
Using Indexer Agent for batch ingestion


The Indexer Agent (IA)

The Indexer Agent can be seen as an Oozie-like scheduler that instantiates a set of indexer tasks and manages their lifecycle (start, done, rerun, etc.). A typical task is "ingest data for a DataSource (aka table)" for a particular timestamp. Data can be ingested from the local filesystem or from HDFS, and the input locations should be template-based paths (more about this later).

The IA framework

The IA core is built on top of the Akka framework. Akka is a concurrency framework that hides the complexities of safe thread interaction behind message passing. The agent starts an actor system of Schedulers (actors) that orchestrate other Workers (actors) to perform very specific tasks. Actors communicate with each other through messages, and each message maps one-to-one to a task. Work is distributed to the workers through a round-robin router (actor).
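
As a rough illustration of this pattern (not the actual IA source), the sketch below uses Akka's classic Java API to dispatch hypothetical task messages to a pool of workers through a round-robin router; all class and message names are made up for the example.

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.routing.RoundRobinPool;

public class RoundRobinSketch {

    // Hypothetical message type; in the IA each message maps one-to-one to a task.
    static class IngestTask {
        final String dataSource;
        final String nominalTime;
        IngestTask(String dataSource, String nominalTime) {
            this.dataSource = dataSource;
            this.nominalTime = nominalTime;
        }
    }

    // Worker actor: performs one very specific task per message it receives.
    static class Worker extends UntypedActor {
        @Override
        public void onReceive(Object message) {
            if (message instanceof IngestTask) {
                IngestTask task = (IngestTask) message;
                System.out.println("Ingesting " + task.dataSource + " for " + task.nominalTime);
            } else {
                unhandled(message);
            }
        }
    }

    public static void main(String[] args) {
        ActorSystem system = ActorSystem.create("IndexerAgentSketch");
        // Round-robin router distributing work across 3 workers (cf. numWorkers=3 in the sample configuration below).
        ActorRef workers = system.actorOf(
                new RoundRobinPool(3).props(Props.create(Worker.class)), "workers");
        // A scheduler would send messages like this; the router picks the next worker.
        workers.tell(new IngestTask("abf", "2014-10-31T00:00:00Z"), ActorRef.noSender());
    }
}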

The IA bootstrapping

IA starts by reading .sql files, each consisting of INSERT/INSERT_HADOOP statements. Information such as ingestion frequency, datasource name, start time, end time and location of feeds is tracked in a database (DB). The DB can be Derby or MySQL and is configurable. The following is a sample configuration file.

# All the time properties configured here are in minutes.
# If the delay is 60 minutes, then the first attempt to run the 00 hour instance is 60 minutes later.
taskAttemptDelay=90
# Retry on a failed task is attempted after retryDelay.
retryDelay=60
# Maximum number of times the task run is attempted before setting givenUp=1 on that instance.
maxRetries=3
# If (Task nominal time + sla time) < current time then send an email.
slaTime=300
# Number of worker(s) - aka Akka actors
numWorkers=3
# Path(folder) where to find the druid sql files. (Default value is ${HOME}/dsqls)
sqlsPath=/Users/srikalyan/dsqls/

# Work scheduler properties.
work {
  # All the below time(s) are in seconds.
  generate.interval=15
  execute.msgsPerSecond=1
  # Maximum number of parallel tasks (this depends on how many tasks Druid's middle managers can handle).
  execute.maxAtGivenTime=10
  track.interval=15
}
#JDBC properties
jdbc {
  host=127.0.0.1
  port=3306
  id=""
  password=""
  dbName=indexerDB
  # Supported types (mysql, derby)
  dbType=mysql
}

#Druid cluster properties.
druid {
  # default broker is localhost:8082
  broker="localhost:8082"
  # default coordinator is localhost:8081
  coordinator="localhost:8081"
  # default overlord is localhost:8090
  overlord="localhost:8090"
}

The sql file is based on the SQL4D grammar. The following is a sample sql file.

INSERT_HADOOP INTO abf (timestamp, provider, title, uuid, DOUBLE_SUM(click) AS click) 
  FROM 'hdfs://CLUSTER_NAME/USER/druid_data/{YEAR}{MONTH}{DAY}{HOUR}{MIN}/*gz' 
  WHERE interval BETWEEN '2014-10-31T00:00:00Z' AND '2014-11-03T00:00:00Z' 
  BREAK BY 'day' DELIMITER('\u0001','\n')
-- NOTE the {YEAR}, {MONTH}, etc. expression-language-like variables. They are resolved
--    from the start time (2014-10-31T00:00:00Z) and the frequency 'day'; other possible
--    frequencies are 'minute', 'fifteen_minute', 'thirty_minute', 'hour'. Instances will
--    be generated only for times <= the end time (2014-11-03T00:00:00Z).
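
To make the expansion concrete, the following is a small illustrative Java sketch (not the IA's actual generator code) of how nominal instance times can be derived from the interval and the BREAK BY frequency; it reproduces the three instances shown in the Work Generator section below.

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

public class InstanceTimes {

    // Generate a nominal time for each instance whose interval fits within [start, end].
    static List<Instant> nominalTimes(Instant start, Instant end, Duration frequency) {
        List<Instant> times = new ArrayList<>();
        for (Instant t = start; !t.plus(frequency).isAfter(end); t = t.plus(frequency)) {
            times.add(t);
        }
        return times;
    }

    public static void main(String[] args) {
        // 2014-10-31T00Z .. 2014-11-03T00Z broken by 'day' -> 31 Oct, 01 Nov, 02 Nov (3 instances).
        for (Instant t : nominalTimes(Instant.parse("2014-10-31T00:00:00Z"),
                                      Instant.parse("2014-11-03T00:00:00Z"),
                                      Duration.ofDays(1))) {
            System.out.println(t);
        }
    }
}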

The IA scheduler details

IA has 3 schedulers.

1. Work Generator (Cron)

Suppose the last work generated for an hourly table X was the 00 hour of today; if the current hour is 04, then this scheduler generates work instances for the 01, 02, 03 and 04 hours and updates the DB accordingly. For the sample sql provided on this page, 3 instances would be generated, as follows.

INSERT_HADOOP INTO abf (timestamp , provider , title, uuid, DOUBLE_SUM(click) AS click) 
  FROM 'hdfs://CLUSTER_NAME/USER/druid_data/201410310000/*gz' 
  WHERE interval BETWEEN '2014-10-31T00:00:00Z' AND '2014-11-01T00:00:00Z' 
  BREAK BY 'day' DELIMITER('\u0001','\n')
INSERT_HADOOP INTO abf (timestamp , provider , title, uuid, DOUBLE_SUM(click) AS click) 
  FROM 'hdfs://CLUSTER_NAME/USER/druid_data/201411010000/*gz' 
  WHERE interval BETWEEN '2014-11-01T00:00:00Z' AND '2014-11-02T00:00:00Z' 
  BREAK BY 'day' DELIMITER('\u0001','\n')
INSERT_HADOOP INTO abf (timestamp , provider , title, uuid, DOUBLE_SUM(click) AS click) 
  FROM 'hdfs://CLUSTER_NAME/USER/druid_data/201411020000/*gz' 
  WHERE interval BETWEEN '2014-11-02T00:00:00Z' AND '2014-11-03T00:00:00Z' 
  BREAK BY 'day' DELIMITER('\u0001','\n')

2. Work Executor (Throttler)

This is not a simple scheduler but a throttler. It performs actions at a rate of 'N' messages/sec, with a maximum of 'M' actions running at any given time. The throttler is implemented in a generic fashion; the "Work Executor" extends it by implementing 2 abstract methods, getInProgressActionCount() and runAction(), to throttle task submissions to Druid. getInProgressActionCount() returns the number of outstanding ingestion tasks in Druid, and runAction() sends messages to the worker(s) to submit tasks to Druid.
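
The following is a minimal sketch of that contract. Only the two method names are taken from the description above; the class shape, fields and tick() loop are assumptions for illustration, not the IA's actual implementation.

public abstract class Throttler {

    private final int msgsPerSecond;   // cf. work.execute.msgsPerSecond
    private final int maxAtGivenTime;  // cf. work.execute.maxAtGivenTime

    protected Throttler(int msgsPerSecond, int maxAtGivenTime) {
        this.msgsPerSecond = msgsPerSecond;
        this.maxAtGivenTime = maxAtGivenTime;
    }

    /** How many actions (e.g. Druid ingestion tasks) are currently outstanding. */
    public abstract int getInProgressActionCount();

    /** Fire one action, e.g. tell a worker actor to submit a task to Druid. */
    public abstract void runAction();

    /** One tick: fire at most msgsPerSecond actions, never exceeding maxAtGivenTime in flight. */
    public void tick() throws InterruptedException {
        for (int i = 0; i < msgsPerSecond; i++) {
            if (getInProgressActionCount() >= maxAtGivenTime) {
                return; // back off until some of the running actions complete
            }
            runAction();
            Thread.sleep(1000L / msgsPerSecond); // spread the actions over the second
        }
    }
}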

3. Work Tracker (Cron)

For every IN_PROGRESS work instance, tracker actors are scheduled to poll the overlord for the status of the corresponding indexer task.
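
Conceptually, tracking one task amounts to an HTTP GET against the overlord's task-status endpoint, as in the illustrative sketch below (not the IA code; the task id shown is a made-up example).

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class TaskStatusPoller {

    // Fetch the status JSON for one task from the overlord's standard status endpoint.
    static String fetchStatus(String overlordHostPort, String taskId) throws Exception {
        URL url = new URL("http://" + overlordHostPort + "/druid/indexer/v1/task/" + taskId + "/status");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("GET");
        StringBuilder body = new StringBuilder();
        try (BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream()))) {
            String line;
            while ((line = in.readLine()) != null) {
                body.append(line);
            }
        }
        // The JSON response contains a status such as RUNNING, SUCCESS or FAILED.
        return body.toString();
    }

    public static void main(String[] args) throws Exception {
        // "localhost:8090" matches the default overlord in the sample configuration;
        // the task id is purely an example.
        System.out.println(fetchStatus("localhost:8090", "index_hadoop_abf_2014-10-31T00:00:00.000Z"));
    }
}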

Running the IA

One-time setup for the DB

Users have a choice between Derby and MySQL. WARNING: Do not use Derby, even for trivial use cases.

Derby

-- Start the Derby network server 
--  cd $DERBY_HOME/lib/; java -jar derbyrun.jar server start
-- Start the Derby client
-- ij 
-- connect 'jdbc:derby://localhost:1527/indexerDB;create=true';
CREATE TABLE DataSource (
    id INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1),
    name VARCHAR(64) NOT NULL,
    startTime BIGINT,
    spinFromTime BIGINT,
    endTime BIGINT,
    frequency VARCHAR(64) NOT NULL,
    status VARCHAR(64) NOT NULL,
    templatePath VARCHAR(256) NOT NULL,
    delimiter VARCHAR(4) NOT NULL,
    listDelimiter VARCHAR(4) NOT NULL,
    templateSql VARCHAR(2000) NOT NULL,
    CONSTRAINT dataSource_pk PRIMARY KEY (id)
);

CREATE TABLE StatusTrail (
    id INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1),
    dataSourceId INTEGER CONSTRAINT dataSource_fk REFERENCES DataSource (id) ON DELETE CASCADE ON UPDATE RESTRICT,
    nominalTime BIGINT,
    status VARCHAR(64) NOT NULL,
    givenUp INTEGER DEFAULT 0,
    attemptsDone INTEGER DEFAULT 0,
    taskId VARCHAR(128),
    CONSTRAINT statusTrail_pk PRIMARY KEY (id)
);

MySQL

-- Start MySQL
-- Add /usr/local/mysql/bin/ to PATH
--      sudo mysqld_safe (press Ctrl-Z and then run the bg command to background it)
-- Open the MySQL client as
--      mysql
--      mysql> CREATE database indexerDB;
--      mysql> GRANT ALL PRIVILEGES ON * . * TO ''@'localhost';
--      mysql> GRANT ALL PRIVILEGES ON * . * TO ''@'127.0.0.1';
-- Then run the following create statements.

CREATE TABLE DataSource (
    id INT NOT NULL AUTO_INCREMENT,
    name VARCHAR(64) NOT NULL,
    startTime BIGINT,
    spinFromTime BIGINT,
    endTime BIGINT,
    frequency VARCHAR(64) NOT NULL,
    status VARCHAR(64) NOT NULL,
    templatePath VARCHAR(256) NOT NULL,
    delimiter VARCHAR(4) NOT NULL,
    listDelimiter VARCHAR(4) NOT NULL,
    templateSql VARCHAR(2000) NOT NULL,
    PRIMARY KEY (id)
);

CREATE TABLE StatusTrail (
    id INT NOT NULL AUTO_INCREMENT,
    dataSourceId INT,
    nominalTime BIGINT,
    status VARCHAR(64) NOT NULL,
    givenUp INT DEFAULT 0,
    attemptsDone INT DEFAULT 0,
    taskId VARCHAR(128),
    PRIMARY KEY (id),
    FOREIGN KEY (dataSourceId) REFERENCES DataSource(id) ON DELETE CASCADE ON UPDATE RESTRICT
);

Simulate a sample data load

This assumes the user has access to a Druid cluster and a Hadoop cluster. Copy the data sets (available at IndexerAgent/sample_data/) to HDFS as follows:

hadoop fs -mkdir -p /USER/druid_data/201410310000;hadoop fs -copyFromLocal 31Oct.gz /USER/druid_data/201410310000/;
hadoop fs -mkdir -p /USER/druid_data/201411010000;hadoop fs -copyFromLocal 1Nov.gz /USER/druid_data/201411010000/
hadoop fs -mkdir -p /USER/druid_data/201411020000;hadoop fs -copyFromLocal 2Nov.gz /USER/druid_data/201411020000/
hadoop fs -mkdir -p /USER/druid_data/201411030000;hadoop fs -copyFromLocal 3Nov.gz /USER/druid_data/201411030000/

Run the convenience script startAgent.sh to start the IA:

    IndexerAgent/startAgent.sh <Path to configuration file>
    # This will produce logs at /tmp/dsql_indexer_agent_YYYY-MM-DD.log