yanfang724/hadoop-consumer

This is a Hadoop job that pulls data from a Kafka server into HDFS.

It requires the following inputs from a configuration file
(test/test.properties is an example):

kafka.etl.topic : the topic to be fetched;

input		: input directory containing topic offsets; it can be
		  generated by DataGenerator. The number of files in this
		  directory determines the number of mappers in the
		  Hadoop job;

output		: output directory containing kafka data and updated 
		  topic offsets;

kafka.request.limit : limits the number of events fetched.
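
As a rough sketch, a properties file can be checked for the four inputs
listed above before a job is submitted. The function name check_props is
hypothetical and is not part of this project; it only looks for the key
names and does not validate their values.

```shell
# check_props FILE: report any required key that is missing from FILE.
# Returns 0 when all four keys are present, 1 otherwise.
check_props() {
  file=$1
  missing=0
  for key in kafka.etl.topic input output kafka.request.limit; do
    # A key counts as present if a line starts with it, followed by '=' or ':'.
    # The dots in the key act as regex wildcards, which is loose but harmless here.
    if ! grep -Eq "^[[:space:]]*${key}[[:space:]]*[=:]" "$file"; then
      echo "missing: $key" >&2
      missing=1
    fi
  done
  return "$missing"
}
```

Usage: check_props test/test.properties && echo "configuration looks complete"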

KafkaETLRecordReader is the record reader associated with KafkaETLInputFormat.
It fetches Kafka data from the server. It starts from the provided offsets
(specified by "input") and stops when it reaches the largest available offset
or the specified limit (specified by "kafka.request.limit").
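
The stopping rule described above can be sketched as a small shell function.
This only illustrates the rule as stated in this README; it is not taken from
KafkaETLRecordReader, and the function name and arguments are hypothetical.
It assumes the limit is counted in events and that a negative limit means
no limit.

```shell
# should_stop NEXT_OFFSET LATEST_OFFSET FETCHED LIMIT
# Returns 0 (stop) or 1 (keep fetching).
should_stop() {
  next_offset=$1     # offset the reader would fetch next
  latest_offset=$2   # largest offset currently available on the server
  fetched=$3         # number of events fetched so far
  limit=$4           # kafka.request.limit; negative means unlimited

  # Stop once the reader has caught up with the server.
  if [ "$next_offset" -ge "$latest_offset" ]; then return 0; fi
  # Stop once the event limit is reached, if a limit is set.
  if [ "$limit" -ge 0 ] && [ "$fetched" -ge "$limit" ]; then return 0; fi
  return 1
}
```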

KafkaETLJob contains some helper functions to initialize job configuration.

SimpleKafkaETLJob sets up job properties and submits the Hadoop job.

SimpleKafkaETLMapper dumps Kafka data into HDFS.

HOW TO RUN:
In order to run this, make sure the HADOOP_HOME environment variable points to 
your hadoop installation directory.
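
A quick way to confirm this before running anything is a check like the
following. The function name check_hadoop_home is hypothetical; the sketch
only verifies that the variable is set and names an existing directory,
not that the directory holds a working Hadoop installation.

```shell
# check_hadoop_home: fail early if HADOOP_HOME is unset or not a directory.
check_hadoop_home() {
  if [ -z "${HADOOP_HOME:-}" ]; then
    echo "HADOOP_HOME is not set" >&2
    return 1
  fi
  if [ ! -d "$HADOOP_HOME" ]; then
    echo "HADOOP_HOME is not a directory: $HADOOP_HOME" >&2
    return 1
  fi
  echo "Using Hadoop at $HADOOP_HOME"
}
```

Usage: check_hadoop_home || exit 1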

1. Compile using "sbt" to create a package for the hadoop consumer code.
./sbt package

2. Run the hadoop-setup.sh script, which enables write permission on the
   required HDFS directory.

3. Produce test events on the server and generate offset files
  1) Start kafka server [ Follow the quick start - 
                        http://sna-projects.com/kafka/quickstart.php ]

  2) Update test/test.properties to change the following parameters:
   kafka.etl.topic      : topic name
   event.count          : number of events to be generated
   kafka.server.uri     : Kafka server URI
   input                : HDFS directory of offset files

  3) Produce test events to Kafka server and generate offset files
   ./run-class.sh kafka.etl.impl.DataGenerator test/test.properties

4. Fetch the generated topic into HDFS:
  1) Update test/test.properties to change the following parameters:
   hadoop.job.ugi             : id and group
   input                      : input location
   output                     : output location
   kafka.request.limit        : limit on the number of events to be
                                fetched; -1 means no limit
   hdfs.default.classpath.dir : HDFS location of jars

  2) Copy jars into HDFS
   ./copy-jars.sh ${hdfs.default.classpath.dir}

  3) Fetch data
   ./run-class.sh kafka.etl.impl.SimpleKafkaETLJob test/test.properties
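
The steps above can be chained in one script, roughly as sketched below.
The function names run_step and run_pipeline and the DRY_RUN variable are
hypothetical. The sketch assumes that the Kafka server is already running,
that test/test.properties has been edited as described, and that
hadoop-setup.sh sits in the current directory next to the other scripts.
The jar directory is passed as an argument because
${hdfs.default.classpath.dir} is a property value, not a shell variable.

```shell
# run_step CMD [ARGS...]: run a command, or just print it when DRY_RUN=1.
run_step() {
  if [ "${DRY_RUN:-0}" = "1" ]; then
    echo "+ $*"
  else
    "$@"
  fi
}

# run_pipeline PROPS JAR_DIR: steps 1 to 4 in order, stopping at the first failure.
run_pipeline() {
  props=$1     # e.g. test/test.properties
  jar_dir=$2   # the value of hdfs.default.classpath.dir in that file
  run_step ./sbt package &&
  run_step ./hadoop-setup.sh &&
  run_step ./run-class.sh kafka.etl.impl.DataGenerator "$props" &&
  run_step ./copy-jars.sh "$jar_dir" &&
  run_step ./run-class.sh kafka.etl.impl.SimpleKafkaETLJob "$props"
}
```

Setting DRY_RUN=1 before calling run_pipeline prints the five commands
without executing them, which is a cheap way to review the sequence first.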

About

Kafka 0.7.2 + CDH 4.3.0
