### Spark Streaming with Python and Kafka

#### Preparing the Environment

We need to make sure that the packages we're going to use are available to Spark. Instead of downloading `jar` files and worrying about paths, we can use the `--packages` option and specify the group/artifact/version coordinates available on [Maven](http://search.maven.org/#search%7Cgav%7C1%7Cg%3A%22org.apache.spark%22%20AND%20a%3A%22spark-streaming-kafka-0-8-assembly_2.11%22); Spark will handle the downloading. We set `PYSPARK_SUBMIT_ARGS` so that this option is passed correctly when executing from within Jupyter.
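As a minimal sketch of how the `--packages` value is put together, the snippet below joins a group, artifact, and version into a single Maven coordinate and wraps it in the argument string used in this notebook. The helper names `maven_coordinate` and `submit_args` are hypothetical and exist only for illustration; the Scala suffix (`_2.11`) and version (`2.0.2`) are assumed to match the local Spark installation.

```python
def maven_coordinate(group, artifact, version):
    # A Maven coordinate is written as group:artifact:version
    return ":".join([group, artifact, version])


def submit_args(coordinate):
    # 'pyspark-shell' must come last so that PySpark starts its shell backend
    return "--packages {} pyspark-shell".format(coordinate)


coordinate = maven_coordinate(
    "org.apache.spark", "spark-streaming-kafka-0-8_2.11", "2.0.2"
)
args = submit_args(coordinate)
print(args)
```

The resulting string is what gets assigned to `os.environ['PYSPARK_SUBMIT_ARGS']` before the Spark context is created.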

To run the code in Jupyter, put the cursor in each cell and press Shift-Enter to run the cells one at a time, or use the menu option `Kernel` -> `Restart & Run All`. While a cell is executing you'll see `[*]` next to it; once execution is complete this changes to `[y]`, where `y` is the execution step number. Any output from that step is shown immediately below it.

To run the code standalone, download the `.py` file from Jupyter and execute it using:

    /usr/local/spark-2.0.2-bin-hadoop2.7/bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2 spark_code.py

In [1]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2 pyspark-shell'

In [2]:
#    Spark
from pyspark import SparkConf, SparkContext
#    Spark Streaming
from pyspark.streaming import StreamingContext
#    Kafka
from pyspark.streaming.kafka import KafkaUtils
#    json parsing
import json

#### Create Spark context

In [3]:
sc = SparkContext(conf=SparkConf().setAppName("PythonSparkStreamingKafka_RM_01").setMaster("local"))
sc.setLogLevel("WARN")

#### Create Streaming Context 

In [4]:
#Batch interval of 60 seconds
ssc = StreamingContext(sc, 60)

#### Connect to Kafka

For more information see the [documentation](http://spark.apache.org/docs/latest/streaming-kafka-0-8-integration.html).

In [5]:
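#Arguments: streaming context, ZooKeeper quorum (host:port), consumer group id, {topic: number of partitions to consume}
kafkaStream = KafkaUtils.createStream(ssc, 'cdh57-01-node-01.moffatt.me:2181', 'spark-streaming', {'twitter':1})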

### Message Processing

#### Parse the inbound message as JSON

In [6]:
#Parse the inbound message as json
parsed = kafkaStream.map(lambda v: json.loads(v[1]))
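
The cell above reads `v[1]` because each record arrives as a `(key, value)` pair, with the message body in the second position. A minimal plain-Python sketch of the same step follows; the sample payload and the `parse_record` helper are hypothetical and only illustrate the shape assumed here.

```python
import json

# Hypothetical record shaped like a (key, value) pair from the Kafka stream;
# the key is often None and the value holds the JSON text of the tweet.
sample_record = (
    None,
    '{"text": "hello spark", "user": {"screen_name": "example_user"}}',
)


def parse_record(record):
    # record[1] is the message value, decoded here into a Python dict
    return json.loads(record[1])


tweet = parse_record(sample_record)
print(tweet["text"])
```

Note that `json.loads` raises `ValueError` on malformed input, so a production job would probably want to guard against bad messages.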

In [7]:
#Count number of tweets in the batch
parsed.count().map(lambda x:'Tweets in this batch: %s' % x).pprint()

In [8]:
#Extract Author name from each tweet
authors_dstream = parsed.map(lambda tweet: tweet['user']['screen_name'])

In [9]:
#Count the number of tweets per author
author_counts = authors_dstream.countByValue()
author_counts.pprint()
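
For intuition, `countByValue()` turns each batch of values into `(value, count)` pairs. A rough single-machine equivalent using `collections.Counter` is sketched below; the author names are made-up sample data, not output from the stream.

```python
from collections import Counter

# Made-up author names standing in for one batch of the DStream
batch_authors = ["alice", "bob", "alice", "carol", "alice", "bob"]

# Counter plays the role of countByValue for a single in-memory batch;
# sorting by name just makes the printed order predictable
author_counts_local = sorted(Counter(batch_authors).items())
print(author_counts_local)
```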

In [10]:
#Sort the author counts in descending order of tweet count
#Alternative: sort by author name, then by count, with
#   .sortBy(lambda x:(x[0].lower(), -x[1]))
author_counts_sorted_dstream = author_counts.transform(
    lambda rdd: rdd.sortBy(lambda x: -x[1]))

In [11]:
author_counts_sorted_dstream.pprint()

In [12]:
#Get top 5 authors by tweet count
top_five_authors = author_counts_sorted_dstream.transform\
  (lambda rdd:sc.parallelize(rdd.take(5)))
top_five_authors.pprint()
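
In the cell above, `rdd.take(5)` brings the first five elements back to the driver as a list, and `sc.parallelize` turns that list into an RDD again so that `transform` still returns an RDD. The selection logic itself can be sketched in plain Python; `top_n` and the sample counts are hypothetical.

```python
def top_n(pairs, n=5):
    # Sort (name, count) pairs by count, largest first, and keep the first n
    return sorted(pairs, key=lambda pair: -pair[1])[:n]


# Made-up (author, count) pairs standing in for one batch
counts = [("a", 1), ("b", 7), ("c", 3), ("d", 2), ("e", 9), ("f", 4), ("g", 5)]
top_five = top_n(counts)
print(top_five)
```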

In [13]:
#Get authors with more than one tweet, or whose username starts with 'rm'
filtered_authors = author_counts.filter(lambda x:\
                                                x[1]>1 \
                                                or \
                                                x[0].lower().startswith('rm'))

In [14]:
filtered_authors.transform\
  (lambda rdd:rdd\
  .sortBy(lambda x:-x[1]))\
  .pprint()
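
The filter predicate can be checked on its own before it is wired into the stream. The sketch below applies the same condition (count greater than one, or a case-insensitive `rm` prefix) to made-up pairs; `keep_author` is a hypothetical helper name.

```python
def keep_author(pair, prefix="rm"):
    # pair is (username, tweet_count); keep it if either condition holds
    name, count = pair
    return count > 1 or name.lower().startswith(prefix)


# Made-up (author, count) pairs
pairs = [("rmoff", 1), ("alice", 3), ("bob", 1), ("RMexample", 1)]
kept = [pair for pair in pairs if keep_author(pair)]
print(kept)
```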

In [15]:
#List the most common words in the tweets
parsed.\
    flatMap(lambda tweet:tweet['text'].split(" "))\
    .countByValue()\
    .transform\
      (lambda rdd:rdd.sortBy(lambda x:-x[1]))\
    .pprint()
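
The word count chains three steps: split each tweet into words, count each distinct word, and sort by count. A plain-Python sketch of the same pipeline on made-up tweet texts is shown below. One caveat that carries over to the Spark version: `split(" ")` yields empty strings when a text contains consecutive spaces.

```python
from collections import Counter

# Made-up tweet texts standing in for one batch
texts = ["spark streaming with kafka", "kafka and spark", "spark"]

# flatMap equivalent: one flat list of words across all texts
words = [word for text in texts for word in text.split(" ")]

# countByValue + sortBy equivalent; ties are broken alphabetically here
word_counts = sorted(Counter(words).items(), key=lambda pair: (-pair[1], pair[0]))
print(word_counts)
```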

In [16]:
#Start the streaming context and wait up to 180 seconds for it to terminate
ssc.start()
ssc.awaitTermination(timeout=180)

-------------------------------------------
Time: 2020-06-16 00:23:00
-------------------------------------------

-------------------------------------------
Time: 2020-06-16 00:23:00
-------------------------------------------

-------------------------------------------
Time: 2020-06-16 00:23:00
-------------------------------------------

-------------------------------------------
Time: 2020-06-16 00:23:00
-------------------------------------------

-------------------------------------------
Time: 2020-06-16 00:23:00
-------------------------------------------

-------------------------------------------
Time: 2020-06-16 00:23:00
-------------------------------------------

-------------------------------------------
Time: 2020-06-16 00:24:00
-------------------------------------------

-------------------------------------------
Time: 2020-06-16 00:24:00
-------------------------------------------

-------------------------------------------
Time: 2020-06-16 00:24:00
----------