This is an application that uses Spark Streaming to read data from Kafka or an HDFS text file (you can choose) and calculate the average income per geographical region in the US. A small data file is also provided with this repo. You can also choose between the Kafka v08 and v09 consumer APIs (Kafka 0.9 introduced new consumer APIs).
For this application, you need a Kafka cluster (which in turn requires ZooKeeper) and a Hadoop cluster with Spark installed on it.
mvn clean package
sudo -u hdfs hadoop fs -mkdir -p /user/hive/warehouse/income
sudo -u hdfs hadoop fs -mkdir -p /user/$USER || :
sudo -u hdfs hadoop fs -chown -R $USER: /user/$USER
spark-submit --master yarn --deploy-mode client --class \
com.markgrover.spark.kafka.DirectKafkaAverageHouseholdIncome spark-kafka-app_2.10-0.1.0-SNAPSHOT.jar [kafka|text]
The last optional parameter, [kafka|text], specifies whether you want to read data from Kafka or from a text file. Below we show how to populate and use both kinds of sources.
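For reference, here is a minimal sketch of how the driver might dispatch on that argument; the method name chooseSource and the monitored directory are illustrative assumptions, not necessarily what DirectKafkaAverageHouseholdIncome actually does.

```scala
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream

// Hypothetical sketch: pick the input DStream based on the [kafka|text] argument.
def chooseSource(ssc: StreamingContext, source: String): DStream[String] = source match {
  case "kafka" => ???  // build a Kafka direct stream; see the sketch in the Kafka section below
  case "text"  => ssc.textFileStream("/user/hive/warehouse/income")
  case other   => sys.error(s"Unexpected source '$other'; expected 'kafka' or 'text'")
}
```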
You have to create a topic in Kafka named income and put all the CSV data in it. Log on to one of the nodes of the Kafka cluster and run the following instructions.
Note: The binaries named kafka-topics, kafka-console-producer, etc. are available on your path when using CDH. If you are using a non-CDH distribution, you should change them to $KAFKA_HOME/bin/kafka-topics.sh, $KAFKA_HOME/bin/kafka-console-producer.sh, and so on, respectively.
First list all the topics:
kafka-topics --list --zookeeper localhost:2181
Then, create a new topic named income. The replication factor of 3 adds fault tolerance, and a non-unit number of partitions (4, in our case) allows up to 4 executors to read data directly off of the Kafka partitions when using the DirectKafkaInputDStream.
kafka-topics --create --zookeeper localhost:2181 --replication-factor 3 --partitions 4 --topic income
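For context, a direct stream over these partitions would typically be created along the following lines (a hedged sketch using the Kafka 0.8 direct API; the app name, batch interval, and broker address are placeholder assumptions, not necessarily what the application uses):

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

// Each Kafka partition maps to one partition of the resulting RDDs, so the 4 topic
// partitions created above let up to 4 executors consume from Kafka in parallel.
val conf = new SparkConf().setAppName("DirectKafkaAverageHouseholdIncome-sketch")
val ssc = new StreamingContext(conf, Seconds(10))  // batch interval is a placeholder
val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
val lines = KafkaUtils
  .createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set("income"))
  .map(_._2)  // keep only the message value (the CSV line)
```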
Then, put some data in the newly created income topic.
cat ./DEC_00_SF3_P077_with_ann.csv | kafka-console-producer --broker-list ${HOSTNAME}:9092 --topic income
You can verify that the data has been written by launching a console consumer. It's OK to re-read the data; our Kafka application always reads the data from the beginning:
kafka-console-consumer --zookeeper localhost:2181 --from-beginning --topic income
Log on to one of the nodes of the Hadoop cluster and run the following instructions to create a stream out of HDFS data. Start your streaming application before you run the steps below.
Put the data file into HDFS's /tmp directory.
cp ./DEC_00_SF3_P077_with_ann.csv /tmp
sudo -u hdfs hadoop fs -put /tmp/DEC_00_SF3_P077_with_ann.csv /tmp
This is required because we need an atomic move operation into the HDFS directory (/user/hive/warehouse/income) that our streaming application is listening to. Putting the file directly from the local filesystem into the destination directory wouldn't be atomic and would confuse our Spark Streaming application.
sudo -u hdfs hadoop fs -mv /tmp/DEC_00_SF3_P077_with_ann.csv /user/hive/warehouse/income
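For reference, the text-file source presumably relies on Spark Streaming's textFileStream, which only picks up files that appear atomically (e.g. via an HDFS move/rename, as above) in the monitored directory. A minimal sketch, assuming the same directory and a placeholder app name and batch interval:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// textFileStream monitors the directory for new files; files must show up atomically
// (hence the hadoop fs -mv above), otherwise partially written files may be processed.
val conf = new SparkConf().setAppName("TextFileAverageHouseholdIncome-sketch")
val ssc = new StreamingContext(conf, Seconds(10))
val lines = ssc.textFileStream("/user/hive/warehouse/income")
```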
Search for Time: in the streaming application's output and you should see a snippet like this:
-------------------------------------------
Time: 1452799840000 ms
-------------------------------------------
(006,13530.0)
(007,13419.0)
These are the average incomes for the regions in the data set. For example, region 006 (i.e. all zip codes starting with 006) has an average annual income of $13,530, and region 007 has an average annual income of $13,419.
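For the curious, per-region averages like these can be produced with a simple streaming aggregation. Here is a hedged sketch in which the CSV column positions (zip code in column 0, income in column 1) are assumptions and may not match the real layout of the census file or the application's parsing code:

```scala
import org.apache.spark.streaming.dstream.DStream

// Hypothetical sketch: group records by the first 3 digits of the zip code and
// average the income column. Column indices are assumed, not taken from the app.
def averageIncomeByRegion(lines: DStream[String]): DStream[(String, Double)] =
  lines
    .map(_.split(","))
    .filter(fields => fields.length >= 2 && fields(1).matches("""\d+(\.\d+)?"""))
    .map(fields => (fields(0).take(3), (fields(1).toDouble, 1L)))
    .reduceByKey { case ((sum1, n1), (sum2, n2)) => (sum1 + sum2, n1 + n2) }
    .mapValues { case (sum, count) => sum / count }  // average income per region
```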