[View in Colaboratory](https://colab.research.google.com/github/imsangil/ProgrammingAssignment2/blob/master/DS2_Spark_Stream_%EA%B0%9C%EC%9D%B8%EC%97%B0%EC%8A%B5.ipynb)

# Part 0: Environment setup

In [1]:
# Setting up spark
!rm -rf /content/*
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://apache.mirror.cdnetworks.com/spark/spark-2.3.2/spark-2.3.2-bin-hadoop2.7.tgz
!tar -xf /content/spark-2.3.2-bin-hadoop2.7.tgz
!pip install -q findspark
# Download necessary dependency file for Kafka
!wget -q http://central.maven.org/maven2/org/apache/spark/spark-streaming-kafka-0-8-assembly_2.11/2.3.2/spark-streaming-kafka-0-8-assembly_2.11-2.3.2.jar
!ls /content/

Hit:1 http://security.ubuntu.com/ubuntu artful-security InRelease
Hit:2 http://archive.ubuntu.com/ubuntu artful InRelease
Hit:3 http://archive.ubuntu.com/ubuntu artful-updates InRelease
Hit:4 http://archive.ubuntu.com/ubuntu artful-backports InRelease
Reading package lists... Done

Redirecting output to ‘wget-log’.

Redirecting output to ‘wget-log.1’.
spark-2.3.2-bin-hadoop2.7			   wget-log
spark-2.3.2-bin-hadoop2.7.tgz			   wget-log.1
spark-streaming-kafka-0-8-assembly_2.11-2.3.2.jar


In [0]:
# Setting the environment variable
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.3.2-bin-hadoop2.7"

# Part 1: Running a simple wordcount query

We will implement a simple continuous word count query. This query will:
* Read text sentences from a TCP socket
* Split each sentence into words
* Continuously aggregate the count of each word

First, we need a simple TCP server on the master server that produces random sentences for its clients. In this class, we use the `nc` (netcat) program for this purpose. You will read the data stream from the TCP socket server provided by the TA.
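
The three steps listed above can be sketched in plain Python, without Spark, to show what happens to one micro-batch. This is only an illustration: `count_words` and the sample batches are hypothetical names and data, and the sketch assumes that counts are computed independently for every batch interval.

```python
from collections import Counter


def count_words(batch):
    """Count the words in one micro-batch of text lines."""
    # flatMap step: split every line into words
    words = [word for line in batch for word in line.split(" ")]
    # map + reduceByKey step: add 1 for every occurrence of each word
    return Counter(words)


# Two hypothetical micro-batches, one per batch interval
batches = [
    ["Welcome to DS2 class", "DS2 class is fun"],
    ["Have a nice day"],
]

for batch in batches:
    print(dict(count_words(batch)))
```

Each batch gets a fresh count, so a word that appears only in the first batch does not show up in the result for the second one.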

In [0]:
import findspark
import os
import json

findspark.init()

from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Create a spark context
conf = SparkConf().setAppName("WordCount").setMaster("local[*]").set("spark.jars", "/content/spark-streaming-kafka-0-8-assembly_2.11-2.3.2.jar")
spark_context = SparkContext(conf=conf)
# Create a streaming context with batch interval 5 secs
stream_context = StreamingContext(spark_context, 5)
# Read the text line from the socket stream
lines = stream_context.socketTextStream("147.46.216.122", 20332)
# Split each line into multiple words
words = lines.flatMap(lambda x: x.split(" "))
# Map each word to a (word, 1) pair and sum the counts per word
word_counts = words.map(lambda x: (x, 1)) \
    .reduceByKey(lambda a, b: a + b)
    
word_counts.pprint()

stream_context.start()
# Wait for 60 seconds
stream_context.awaitTermination(60)
stream_context.stop()

In [0]:
stream_context.stop()

# Part 2: Running a stream application from Kafka source

Apache Kafka is a distributed streaming platform that supports messaging, processing, and storage of stream data. In this practice session, we focus on using Kafka as a message broker.

Kafka supports high-throughput, fault-tolerant messaging through a publish-subscribe model. In this model, stream events are organized into **topics**. A **producer** continuously publishes data events to a topic, whereas a **consumer** subscribes to the topic and receives those events. Each topic is split into multiple **partitions**, which are distributed across brokers and stored on disk to guarantee fault tolerance.
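
To make the vocabulary concrete, here is a toy in-memory model of a topic with partitions. It is a sketch only: `ToyTopic`, `publish`, and `consume` are hypothetical names, and the CRC32-based routing is an assumption chosen for simplicity, not Kafka's actual partitioner.

```python
import zlib


class ToyTopic:
    """In-memory stand-in for a topic (hypothetical, for illustration only)."""

    def __init__(self, num_partitions):
        self.partitions = [[] for _ in range(num_partitions)]

    def publish(self, key, value):
        # Route by key so that records with the same key share a partition.
        # CRC32 is an assumption here; it is not Kafka's real partitioner.
        index = zlib.crc32(key.encode("utf-8")) % len(self.partitions)
        self.partitions[index].append((key, value))
        return index

    def consume(self, partition, offset):
        # A consumer reads one partition sequentially, starting at an offset.
        return self.partitions[partition][offset:]


topic = ToyTopic(num_partitions=3)
first = topic.publish("user-1", "Welcome to DS2 class")
second = topic.publish("user-1", "DS2 class is fun")
print(first == second)  # same key, same partition
print(topic.consume(first, 0))
```

The producer side only appends, and the consumer side only reads from an offset, which is the essence of the publish-subscribe split described above.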

As the description above suggests, we need the server address and the topic name to fetch data from a Kafka broker. The Kafka server and producers have already been set up by the TAs. We will review the producer code first.

After reviewing the code, we will implement the same word count application with Kafka as the source. The broker address is **147.46.216.122:9092** and the topic is **wc**.

In [3]:
import findspark
import os
import json

findspark.init()

from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

# Create a spark context containing the additional Kafka dependency.
conf = SparkConf().setAppName("KafkaWordCount").setMaster("local[*]").set("spark.jars", "/content/spark-streaming-kafka-0-8-assembly_2.11-2.3.2.jar")
spark_context = SparkContext(conf=conf)
# Create a streaming context with batch interval 5 secs
stream_context = StreamingContext(spark_context, 5)
# Read the Kafka data stream from the broker for the given topic.
# Each record arrives as a (key, value) tuple; the value holds the message
kvs = KafkaUtils.createDirectStream(stream_context, ["wc"], {"metadata.broker.list": "147.46.216.122:9092"})
# Extract the data from the Kafka data stream
lines = kvs.map(lambda x: x[1]).map(lambda l: l.replace('"', ''))
# Split each line into multiple words
words = lines.flatMap(lambda x: x.split(" "))
# Count the word
word_counts = words.map(lambda x: (x, 1)) \
    .reduceByKey(lambda a, b: a + b)

word_counts.pprint()

# Start the computation
stream_context.start()
# Wait up to 20 seconds for termination
stream_context.awaitTermination(20)
stream_context.stop()

-------------------------------------------
Time: 2018-09-28 02:09:35
-------------------------------------------
(u'fun', 3)
(u'is', 3)
(u'DS2', 5)
(u'session', 2)
(u'to', 2)
(u'practice', 2)
(u'Welcome', 2)
(u'class', 3)

-------------------------------------------
Time: 2018-09-28 02:09:40
-------------------------------------------
(u'a', 1)
(u'is', 1)
(u'DS2', 1)
(u'Have', 1)
(u'fun', 1)
(u'expired', 2)
(u'Session', 2)
(u'nice', 1)
(u'class', 1)
(u'day', 1)

-------------------------------------------
Time: 2018-09-28 02:09:45
-------------------------------------------
(u'expired', 3)
(u'DS2', 2)
(u'to', 2)
(u'practice', 2)
(u'session', 2)
(u'Welcome', 2)
(u'Session', 3)

-------------------------------------------
Time: 2018-09-28 02:09:50
-------------------------------------------
(u'a', 2)
(u'is', 1)
(u'DS2', 1)
(u'Have', 2)
(u'fun', 1)
(u'expired', 2)
(u'Session', 2)
(u'nice', 2)
(u'day', 2)
(u'class', 1)



In [0]:
stream_context.stop()

Until now, we have processed only plain text. From now on, we will process JSON-formatted data events, which are widely used for data transfer. Here, we fetch JSON-formatted movie data from the Kafka server through the **movie** topic.

To parse JSON-formatted data, we use Python's `json` package. Calling `json.loads` on a serialized JSON object returns a Python dictionary.
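
As a quick illustration of `json.loads`, the snippet below parses one serialized event. The sample string is an assumption about how an event looks on the wire; its field names (`genre`, `title`, `year`) follow the movie records printed in the sample output later in this notebook.

```python
import json

# A serialized event shaped like the records on the movie topic (assumed format)
raw_event = '{"genre": "Drama", "title": "The Craving", "year": 1916}'

movie = json.loads(raw_event)  # str -> dict
print(movie["title"], movie["year"])
```

After parsing, fields are accessed by key, and numeric fields such as `year` come back as Python integers.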

In [0]:
import findspark
import os
import json

findspark.init()

from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

# Create a spark context
conf = SparkConf().setAppName("movie_json").setMaster("local[*]") \
  .set("spark.jars", "/content/spark-streaming-kafka-0-8-assembly_2.11-2.3.2.jar")
spark_context = SparkContext(conf=conf)
# Create a streaming context with batch interval 5 secs
stream_context = StreamingContext(spark_context, 5)
# Get the kafka stream data
kvs = KafkaUtils.createDirectStream(stream_context, ["movie"], {"metadata.broker.list": "147.46.216.122:9092"})
# Deserialize the JSON-formatted data into a Python dict.
movies = kvs.map(lambda x: json.loads(x[1]))
# Print the input data
movies.pprint()

# Start the computation
stream_context.start()
# Wait up to 20 seconds for termination
stream_context.awaitTermination(20)
stream_context.stop()

In [0]:
stream_context.stop()

You can also filter JSON data. For example, the code below filters out the movies produced before 2000.

In [0]:
import findspark
import os
import json

findspark.init()

from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

# Create a spark context
conf = SparkConf().setAppName("filter_movie").setMaster("local[*]").set("spark.jars", "/content/spark-streaming-kafka-0-8-assembly_2.11-2.3.2.jar")
spark_context = SparkContext(conf=conf)
# Create a streaming context with batch interval 5 secs
stream_context = StreamingContext(spark_context, 5)
#lines = stream_context.socketTextStream("master", 5000)
kvs = KafkaUtils.createDirectStream(stream_context, ["movie"], {"metadata.broker.list": "147.46.216.122:9092"})
# Deserialize the JSON-formatted data into a Python dict.
movies = kvs.map(lambda x: json.loads(x[1]))
# Filter out movies produced before 2000.
twenty_first_century_movies = movies.filter(lambda movie: movie['year'] >= 2000)
# Print the filtered movies
twenty_first_century_movies.pprint()

# Start the computation
stream_context.start()
# await termination for 60 seconds
stream_context.awaitTermination(60)
stream_context.stop()

In [0]:
stream_context.stop()

# Quiz 0: Filter & Map

In the following cell, implement the word count example that reads from the **"wc"** topic and satisfies the following conditions:
* Filter out the word 'DS2'
* Double the frequency of the word 'class'

Hint: For easier coding, define a separate function instead of a lambda for the filtering and mapping logic.
```
def method(word):
   ...
   
stream.map(method)
```

In [0]:
import findspark
import os
import json

findspark.init()

from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

# Create a spark context
conf = SparkConf().setAppName("quiz_0").setMaster("local[*]").set("spark.jars", "/content/spark-streaming-kafka-0-8-assembly_2.11-2.3.2.jar")
spark_context = SparkContext(conf=conf)
# Create a streaming context with batch interval 5 secs
stream_context = StreamingContext(spark_context, 5)
kvs = KafkaUtils.createDirectStream(stream_context, ["wc"], {"metadata.broker.list": "147.46.216.122:9092"})

# TODO: Implement your codes here!
words = kvs.map(lambda x: json.loads(x[1]))

# Start the computation
stream_context.start()
# await termination for 60 seconds
stream_context.awaitTermination(60)
stream_context.stop()

In [0]:
stream_context.stop()

# Quiz 1: Filtering on JSON-formatted data

In the following cell, implement a stream application that reads the **movie** topic from Kafka and filters out all movies that do not contain the string "the" in their titles (case is ignored). Print the titles of the movies that are not filtered out.

## Example

**Input**: {"title": "The titanic", ...}, {"title": "Titanic", ...}, {"title": "Flintheart Glomgold", ...}, ...

**Output**: "The titanic", "Flintheart Glomgold", ...

Hint: Use the `.lower()` method and the `in` operator.
Example:
```
>>> a = "Hello" 
>>> a.lower() 
"hello"
>>> "llo" in a
True
```

In [0]:
import findspark
import os
import json

findspark.init()

from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

# Create a spark context
conf = SparkConf().setAppName("quiz_1").setMaster("local[*]").set("spark.jars", "/content/spark-streaming-kafka-0-8-assembly_2.11-2.3.2.jar")
spark_context = SparkContext(conf=conf)
# Create a streaming context with batch interval 5 secs
stream_context = StreamingContext(spark_context, 5)
# Get the data from the Kafka Stream
kvs = KafkaUtils.createDirectStream(stream_context, ["movie"], {"metadata.broker.list": "147.46.216.122:9092"})

# TODO: Implement your code here!

# Deserialize the JSON-formatted data into a Python dict.
movies = kvs.map(lambda x: json.loads(x[1]))

# Start the computation
stream_context.start()
# await termination for 60 seconds
stream_context.awaitTermination(60)
stream_context.stop()

In [0]:
stream_context.stop()

# Part 3: Running a windowed stream application

Windowing lets us continuously obtain the most recent set of data. A time-based **sliding window** is defined by a **window size** and a **sliding interval**. For example, a window with `(window size = 5 seconds, sliding interval = 1 second)` emits, once every second, the data events from the most recent five seconds. When the window size equals the sliding interval, the window is called a **tumbling window**.
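
The definition can be sketched in plain Python over timestamped events. This is an illustration rather than Spark's implementation: `sliding_windows` is a hypothetical helper, and treating each window as the half-open range `(emit_time - window_size, emit_time]` is an assumption made for the sketch.

```python
def sliding_windows(events, window_size, slide_interval, end_time):
    """Yield (emit_time, values) for each window over (timestamp, value) events."""
    emit_time = slide_interval
    while emit_time <= end_time:
        start = emit_time - window_size
        # Keep the events whose timestamp falls in (start, emit_time]
        values = [value for ts, value in events if start < ts <= emit_time]
        yield emit_time, values
        emit_time += slide_interval


# Hypothetical events: one per second, timestamps in seconds
events = [(t, "e%d" % t) for t in range(1, 8)]

for emit_time, values in sliding_windows(events, window_size=5, slide_interval=1, end_time=7):
    print(emit_time, values)
```

Consecutive windows overlap when the sliding interval is smaller than the window size. Calling the helper with `window_size=2, slide_interval=2` gives non-overlapping tumbling windows instead.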

Let's make a windowed movie stream with a window size of 30 seconds and a sliding interval of 5 seconds.

In [4]:
import findspark
import os
import json

findspark.init()

from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

# Create a spark context
conf = SparkConf().setAppName("window").setMaster("local[*]").set("spark.jars", "/content/spark-streaming-kafka-0-8-assembly_2.11-2.3.2.jar")
spark_context = SparkContext(conf=conf)
# Create a streaming context with batch interval 5 secs
stream_context = StreamingContext(spark_context, 5)
#lines = stream_context.socketTextStream("master", 5000)
kvs = KafkaUtils.createDirectStream(stream_context, ["movie"], {"metadata.broker.list": "147.46.216.122:9092"})
# Deserialize the JSON-formatted data into a Python dict.
movies = kvs.map(lambda x: json.loads(x[1]))
# Make a time-based window (size = 30 secs, interval = 5 secs)
windowed_movies = movies.window(30, 5)
# Print the window
windowed_movies.pprint()

# Start the computation
stream_context.start()
# Wait up to 20 seconds for termination
stream_context.awaitTermination(20)
stream_context.stop()

-------------------------------------------
Time: 2018-09-28 02:11:30
-------------------------------------------
{u'genre': u'Animated short', u'title': u'Lambert the Sheepish Lion', u'year': 1951}
{u'genre': u'Drama', u'title': u'Is There Life Out There?', u'year': 1994}
{u'genre': u'Romantic comedy', u'title': u"It's Complicated", u'year': 2009}
{u'genre': u'Documentary', u'title': u'Winter Soldier', u'year': 1972}
{u'genre': u'Crime', u'title': u'Tell No Tales', u'year': 1939}
{u'genre': u'Drama', u'title': u'The Craving', u'year': 1916}

-------------------------------------------
Time: 2018-09-28 02:11:35
-------------------------------------------
{u'genre': u'Animated short', u'title': u'Lambert the Sheepish Lion', u'year': 1951}
{u'genre': u'Drama', u'title': u'Is There Life Out There?', u'year': 1994}
{u'genre': u'Romantic comedy', u'title': u"It's Complicated", u'year': 2009}
{u'genre': u'Documentary', u'title': u'Winter Soldier', u'year': 1972}
{u'genre': u'Crime', u'title'

In [0]:
stream_context.stop()

# Quiz 2: Windowed aggregation

Windowed streams can be aggregated just like normal stream data. First, make a windowed stream of the movie data (size: 30 secs, interval: 5 secs). On the windowed stream, build a Python `dict` whose keys are the first letters of the movie titles and whose values are the lists of movie titles starting with that letter. Before aggregation, make sure that all the movie titles are lower-cased.

## Example

Input data in window: {"title": "titanic", ...}, {"title": "Harry Porter", ...}, {"title": "The Purchase Price"}

Output: {"h": \["harry porter"\], "t": \["titanic", "the purchase price"\]}

Hint: Make a key-value stream using `map()` before making the windowed stream. Also, use list concatenation in Python.
```
>>> a = [1, 2, 3]
>>> b = [4, 5]
>>> a + b
[1, 2, 3, 4, 5]
```

In [0]:
import findspark
import os
import json

findspark.init()

from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

# Create a spark context
conf = SparkConf().setAppName("quiz_2").setMaster("local[*]").set("spark.jars", "/content/spark-streaming-kafka-0-8-assembly_2.11-2.3.2.jar")
spark_context = SparkContext(conf=conf)
# Create a streaming context with batch interval 5 secs
stream_context = StreamingContext(spark_context, 5)

# TODO: Implement your code here!
kvs = KafkaUtils.createDirectStream(stream_context, ["movie"], {"metadata.broker.list": "147.46.216.122:9092"})
# Deserialize the JSON-formatted data into a Python dict.
movies = kvs.map(lambda x: json.loads(x[1]))

# Start the computation
stream_context.start()
# Await termination for 60 seconds
stream_context.awaitTermination(60)
stream_context.stop()

In [0]:
stream_context.stop()

# Part 4: Running a structured stream application

Spark **Structured Streaming** offers high-level stream processing through Spark's **DataFrame** API. With Structured Streaming, we can apply SQL-like operations to continuously incoming data streams.
Because of the limitations of Jupyter notebooks, we will carry out this practice session in a Linux terminal. Before starting this session, open a new Jupyter terminal for executing the Python scripts.

## Setup
```
cd ~
/home/ubuntu/spark_scripts/start_cluster.sh
git clone https://github.com/DifferentSC/gw-stream-script.git
```

Below is a running example of the TCP word count using Structured Streaming.

You can execute this code with the following command:
`/home/ubuntu/spark/bin/spark-submit /home/ubuntu/gw-stream-script/structured_wc.py`

To see the result, open another terminal from your notebook, run `nc -lk 20332`, and type random sentences.

**NOTE: Do not run your code on Jupyter notebook. Run it on your terminal instead**

In [0]:
import findspark

findspark.init('/home/ubuntu/spark')

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split

## Make a spark sql session
spark = SparkSession.builder \
    .appName("StructuredSocketWordCount") \
    .master('spark://master:7077') \
    .getOrCreate()

## Get the spark data from TCP socket
lines = spark \
    .readStream \
    .format("socket") \
    .option("host", "master") \
    .option("port", 20332) \
    .load()

## Make the new column "word" by splitting the line
words = lines.select(
    explode(
        split(lines.value, " ")
    ).alias("word")
)

wordCounts = words.groupBy("word").count()

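query = wordCounts \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

## Block until the query stops; otherwise the submitted script exits right away
query.awaitTermination()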
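
The `outputMode("complete")` setting in the query above means that the whole result table is written out at every trigger, with counts accumulated over the entire stream. The plain-Python sketch below mimics that behavior; `run_complete_mode` and the sample triggers are hypothetical, and the sketch does not use Spark at all.

```python
from collections import Counter


def run_complete_mode(triggers):
    """Return what a 'complete' sink would show after each trigger."""
    running_counts = Counter()
    outputs = []
    for lines in triggers:
        for line in lines:
            running_counts.update(line.split(" "))
        # Complete mode: emit the whole result table, not only the changed rows
        outputs.append(dict(running_counts))
    return outputs


# Hypothetical input: the lines that arrived before each trigger
triggers = [["DS2 class"], ["DS2 is fun"]]
for table in run_complete_mode(triggers):
    print(table)
```

Unlike the per-batch counts in Part 1, the count for a word keeps growing across triggers, and words seen only in earlier triggers stay in the output.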

With Structured Streaming, you can also easily consume JSON-formatted data from a Kafka stream and perform relational operations (selection, projection, ...) on it. Here is sample code that selects the `title` of the movies whose `year` is less than 2000. To run this code, you need to add the Kafka dependency. The command is `/home/ubuntu/spark/bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.2 /home/ubuntu/gw-stream-script/structured_json.py`.

**NOTE: Do not run your code on Jupyter notebook. Run it on your terminal instead**

In [0]:
import findspark

findspark.init('/home/ubuntu/spark')

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

## Make a spark sql session
spark = SparkSession.builder \
    .appName("StructuredKafkaMovies") \
    .master('spark://master:7077') \
    .getOrCreate()

## Get the json-formatted data from Kafka stream
kafka_movies = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "147.46.216.122:9092") \
    .option("subscribe", "movie") \
    .load()

## Define the relational schema
schema = StructType([
    StructField("title", StringType()),
    StructField("genre", StringType()),
    StructField("year", IntegerType())
])

## Change the JSON events into relational tuples
## (alias the parsed struct, then expand its fields into top-level columns)
relational_movies = kafka_movies \
    .select(from_json(col("value").cast("string"), schema).alias("movie")) \
    .select("movie.*")

## Select the movie titles with year < 2000 (filter first, then project)
results = relational_movies.where("year < 2000").select("title")

## "complete" mode requires an aggregation, so this query uses "append"
query = results \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

## Block until the query is stopped
query.awaitTermination()