# Lab 3: Using Spark Streaming to analyze tweets from Kafka

Now that you have successfully streamed tweets into your Kafka topic "twitterstream", we will explore using [Spark Streaming](http://spark.apache.org/streaming/) to analyze these tweets. Spark Streaming is a Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams. 

![](http://spark.apache.org/docs/1.6.2/img/streaming-arch.png)
([img src](http://spark.apache.org/docs/latest/streaming-programming-guide.html))

In this lab, you will ingest the tweets from Kafka as a data source and publish the results within this Jupyter notebook.

## Preparing the Environment

The code below ensures that the packages needed to run this lab are available to Spark. 

To run the code in Jupyter, put the cursor in a cell and press Shift-Enter to run one cell at a time, or use the menu option `Kernel` -> `Restart & Run All`. While a cell is executing you'll see `[*]` next to it; once execution completes this changes to `[y]`, where `y` is the execution step number. Any output from that step is shown immediately below the cell.

In [1]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2 pyspark-shell'

import findspark
findspark.init('/usr/lib/spark')

### Import dependencies

We need to import the necessary PySpark modules for Spark, Spark Streaming, and Spark Streaming with Kafka. We also need the Python `json` module for parsing the inbound Twitter data.

In [2]:
#    Spark
from pyspark import SparkContext
#    Spark Streaming
from pyspark.streaming import StreamingContext
#    Kafka
from pyspark.streaming.kafka import KafkaUtils
#    json parsing
import json

### Create Spark and Streaming Context

A Spark Context is the main entry point for connecting to a Spark cluster; it is the primary object through which everything else is called, and it allows for the creation of RDDs, accumulators and broadcast variables. Note that only one SparkContext may be active per Java Virtual Machine (JVM), so you must terminate the active Spark context before creating a new one.

A Streaming Context is built on top of a Spark Context and used to access functionalities of Spark Streaming, an extension of the core Spark API that enables the scalable, high-throughput, fault-tolerant stream processing of live data streams.  

See the [API reference](http://spark.apache.org/docs/latest/api/python/pyspark.streaming.html#pyspark.streaming.StreamingContext) and [programming guide](http://spark.apache.org/docs/latest/streaming-programming-guide.html#initializing-streamingcontext) for more details. 

In [3]:
# Create Spark Context
sc = SparkContext()

# Create Streaming Context
# The first argument is the Spark Context whilst the second argument represents the batch duration, which we set as 30 seconds here.
ssc = StreamingContext(sc, 30)
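
To build intuition for the batch duration, the following plain-Python sketch (it does not use Spark) groups timestamped events into fixed 30-second windows, which is roughly how Spark Streaming slices a live stream into one batch per interval. The `events` list and the `to_batches` helper are hypothetical names made up for this illustration.

```python
from collections import defaultdict

BATCH_SECONDS = 30  # same value as the batch duration passed to StreamingContext


def to_batches(events, batch_seconds=BATCH_SECONDS):
    """Group (timestamp_seconds, payload) pairs by the batch window they fall in."""
    batches = defaultdict(list)
    for timestamp, payload in events:
        # Integer division finds the start of the window containing the event
        window_start = (timestamp // batch_seconds) * batch_seconds
        batches[window_start].append(payload)
    return dict(batches)


# Hypothetical events: (seconds since the stream started, payload)
events = [(3, "a"), (29, "b"), (30, "c"), (61, "d")]
batches = to_batches(events)
print(batches)
```

A shorter batch duration gives fresher results at the cost of more, smaller batches to schedule.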

### Create Discretized Stream (DStream) and connect to Kafka

A DStream is the basic abstraction in Spark Streaming. It consists of a continuous sequence of RDDs, each holding the data received during one batch interval. The code below uses the native Spark Streaming Kafka capabilities to receive data from Kafka by connecting to a particular topic, `twitterstream`, which we created earlier on in this lab. The connection string `localhost:2181` is the ZooKeeper address, and the dictionary `{'twitterstream':1}` maps the topic name to the number of partitions to consume. Note that `spark-streaming` is an arbitrary name for a consumer group and can be changed.

For more information see the [documentation](http://spark.apache.org/docs/latest/streaming-kafka-0-8-integration.html).

In [4]:
# Create DStream from a Kafka topic
kafkaStream = KafkaUtils.createStream(ssc, 'localhost:2181', 'spark-streaming', {'twitterstream':1})

## Message Processing

### Parse the inbound message as json

The inbound stream is a [`DStream`](http://spark.apache.org/docs/2.0.0/api/python/pyspark.streaming.html#pyspark.streaming.DStream), which supports various built-in [transformations](http://spark.apache.org/docs/latest/streaming-programming-guide.html#transformations-on-dstreams) such as `map` which is used here to parse the inbound messages from their native JSON format. 

Note that this will fail horribly if the inbound message _isn't_ valid JSON. 

In [5]:
parsed = kafkaStream.map(lambda v: json.loads(v[1]))
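
As a minimal sketch of how the invalid-JSON failure could be avoided, the helper below returns a one-element list for a decodable message and an empty list otherwise. It runs on hand-written records rather than on a live stream; `safe_parse` and `records` are hypothetical names, and the sample records only imitate the `(key, message)` pairs that the Kafka stream delivers (which is why the cell above reads `v[1]`).

```python
import json


def safe_parse(record):
    """Return [parsed_value] for a valid JSON message, or [] if it cannot be decoded."""
    _key, value = record  # each Kafka record arrives as a (key, message) pair
    try:
        return [json.loads(value)]
    except (TypeError, ValueError):
        # json.JSONDecodeError is a subclass of ValueError;
        # TypeError covers a missing (None) message body
        return []


# Hypothetical records standing in for one batch from Kafka
records = [(None, '{"text": "hello"}'), (None, 'not json'), (None, None)]
parsed_batch = [tweet for record in records for tweet in safe_parse(record)]
print(parsed_batch)
```

In the notebook, one option would be `kafkaStream.flatMap(safe_parse)` in place of the `map` call, so that undecodable messages are dropped instead of failing the batch.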

### Count number of tweets in the batch

The [`DStream`](http://spark.apache.org/docs/2.0.0/api/python/pyspark.streaming.html#pyspark.streaming.DStream) object provides native functions to count the number of messages in the batch, and to print them to the output: 

* [`count`](http://spark.apache.org/docs/2.0.0/api/python/pyspark.streaming.html#pyspark.streaming.DStream.count)
* [`pprint`](http://spark.apache.org/docs/2.0.0/api/python/pyspark.streaming.html#pyspark.streaming.DStream.pprint) 

We use the `map` function to add in some text explaining the value printed. 

_Note that nothing gets written to output from the Spark Streaming context and descendant objects until the Spark Streaming Context is started, which happens later in the code._

In [6]:
parsed.count().map(lambda x:'Tweets in this batch: %s' % x).pprint()

### Extract Author name from each tweet

Tweets come through in a JSON structure, of which you can see an [example here](https://gist.github.com/rmoff/3968605712f437a1f37e7be52129cade). We're going to analyse tweets by author, which is accessible in the json structure at `user.screen_name`. 

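The `map` transformation applies the `convertToJson` function to every tweet in each RDD within the DStream; a named function is used here where the earlier cells used a [`lambda`](https://pythonconquerstheuniverse.wordpress.com/2011/08/29/lambda_tutorial/) anonymous function. `convertToJson` decodes the element with `json.loads` once more before the lookup, which assumes the element is still a JSON string at this point. The result is a DStream holding just the author's screen name for each tweet in the original DStream.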

In [7]:
def convertToJson(x):
    x = json.loads(x)
    return x['user']['screen_name']

authors_dstream = parsed.map(convertToJson)
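
The sketch below shows the nested `user.screen_name` lookup on a hand-written sample. It also guards against two situations that are assumptions rather than something this lab states: the element may already be a `dict` (in which case a second `json.loads` would raise a `TypeError`), and some messages on the stream may carry no `user` object at all. `extract_screen_name` and `sample` are hypothetical names.

```python
import json


def extract_screen_name(tweet):
    """Return user.screen_name from a tweet given as a dict or as a JSON string."""
    if isinstance(tweet, str):
        tweet = json.loads(tweet)  # decode only when the element is still a string
    if not isinstance(tweet, dict):
        return None
    user = tweet.get('user') or {}  # tolerate a missing or null user object
    return user.get('screen_name')


# Hypothetical tweet with only the fields used in this lab
sample = {'text': 'hi', 'user': {'screen_name': 'example_user'}}

print(extract_screen_name(sample))              # dict input
print(extract_screen_name(json.dumps(sample)))  # JSON string input
print(extract_screen_name({'delete': {}}))      # message without a user object
```

If a helper like this were used with `map`, the `None` results would need to be filtered out before counting.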

### Count the number of tweets per author

With our authors DStream, we can now count them using the `countByValue` function. This is conceptually the same as this quasi-SQL statement: 

    SELECT   AUTHOR, COUNT(*)
    FROM     DSTREAM
    GROUP BY AUTHOR

_Using `countByValue` is a more legible way of doing the same thing that you'll see done in tutorials elsewhere with a `map` / `reduceByKey`._

In [8]:
author_counts = authors_dstream.countByValue()
author_counts.pprint()
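
To make the grouping semantics concrete, here is a plain-Python sketch (no Spark involved) that counts a hand-written list of authors in two ways: directly by value, and with the emit-a-one-then-sum pattern that the `map` / `reduceByKey` approach follows. The `authors` list is hypothetical sample data.

```python
from collections import Counter

# Hypothetical screen names for one batch
authors = ['alice', 'bob', 'alice', 'carol', 'alice']

# Equivalent of countByValue(): one (value, count) pair per distinct value
by_value = Counter(authors)

# Equivalent of the map / reduceByKey pattern: emit (author, 1), then sum per key
by_key = {}
for author, one in [(a, 1) for a in authors]:
    by_key[author] = by_key.get(author, 0) + one

print(sorted(by_value.items()))
print(by_key == dict(by_value))
```

Both approaches yield the same pairs; `countByValue` simply hides the intermediate `(author, 1)` step.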

### List the most common words in the tweets

The code below implements a word count for the tweets within the batch.

In [9]:
def findCommonWords(x):
    x = json.loads(x)
    return x['text'].split(" ")

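# Split each tweet into words, count each distinct word,
# then sort each batch by count in descending order
parsed \
    .flatMap(findCommonWords) \
    .countByValue() \
    .transform(lambda rdd: rdd.sortBy(lambda x: -x[1])) \
    .pprint()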
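
The same flatten, count and sort steps can be sketched in plain Python on hand-written text. The example also illustrates one likely reason an empty string can show up as a "word": splitting on a single space yields an empty token wherever two spaces are adjacent. The `texts` list is hypothetical sample data.

```python
from collections import Counter

# Hypothetical tweet texts; note the double space in the first one
texts = ['RT  health board', 'RT to the board']

# flatMap equivalent: every tweet contributes several words to one flat list
words = [word for text in texts for word in text.split(" ")]

# countByValue + sortBy equivalent: count each word, highest count first
ranked = sorted(Counter(words).items(), key=lambda pair: -pair[1])
print(ranked)

# split() with no argument collapses runs of whitespace, so no '' token appears
clean = [word for text in texts for word in text.split()]
print('' in words, '' in clean)
```

Switching the notebook to `split()` would change the counts it reports, so treat it as an optional refinement.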

## Start the streaming context

As mentioned above, it is only after starting the Streaming Context that you will see the results from the `pprint` functions. This is because all the code above only defines the DStream operations; none of it is executed until the context starts.

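Calling `awaitTermination` with a `timeout` blocks the notebook cell for up to that many seconds, which in this case we have specified as 5 minutes (300 seconds). Note that the timeout only ends the wait; to actually stop the streaming job, call `ssc.stop()` afterwards.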

Wait for a while after you run the code below and if you see some results (i.e. number of tweets, author and word count for each batch), it means that you are successful.

In [10]:
ssc.start()
ssc.awaitTermination(timeout=300)

-------------------------------------------
Time: 2017-07-03 22:30:30
-------------------------------------------
Tweets in this batch: 223

-------------------------------------------
Time: 2017-07-03 22:30:30
-------------------------------------------
('Miss_Kaye', 1)
('bibacus', 1)
('KateKathleen2', 1)
('allyspeirs', 1)
('ChLee5895', 1)
('Maadu_In_Danger', 1)
('liskaaz', 1)
('JamesSandiford2', 1)
('buddapapatdc', 1)
('triciaking73', 1)
...

-------------------------------------------
Time: 2017-07-03 22:30:30
-------------------------------------------
('RT', 161)
('to', 109)
('the', 71)
('health', 64)
('a', 51)
('', 43)
('Board', 40)
('in', 38)
('Favorite', 38)
('|', 38)
...

-------------------------------------------
Time: 2017-07-03 22:31:00
-------------------------------------------
Tweets in this batch: 314

-------------------------------------------
Time: 2017-07-03 22:31:00
-------------------------------------------
('isabelpennefat3', 1)
('KhaledAlAmiri', 1)
('winysouza