Copyright (C) 2019 Software Platform Lab, Seoul National University

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

# Part 0: Environment setup

In [1]:
# Setting up spark
!rm -rf /content/*
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-2.3.2/spark-2.3.2-bin-hadoop2.7.tgz
!tar -xf /content/spark-2.3.2-bin-hadoop2.7.tgz
!pip install -q findspark
# Download necessary dependency file for Kafka
!wget -q http://central.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.11/2.3.2/spark-sql-kafka-0-10_2.11-2.3.2.jar
!wget -q http://central.maven.org/maven2/org/apache/spark/spark-streaming-kafka-0-10-assembly_2.11/2.3.2/spark-streaming-kafka-0-10-assembly_2.11-2.3.2.jar
!ls /content/

0% [Working]            Ign:1 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
0% [Connecting to archive.ubuntu.com (91.189.88.149)] [Connecting to security.u                                                                               Get:2 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran35/ InRelease [3,609 B]
Ign:3 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:4 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release
Hit:5 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Hit:6 http://ppa.launchpad.net/graphics-drivers/ppa/ubuntu bionic InRelease
Hit:7 http://archive.ubuntu.com/ubuntu bionic InRelease
Get:8 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran35/ Packages [60.4 kB]
Get:9 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Get:11 http://ppa.launchpad.net/marutter/c2d4u3

In [0]:
# Setting the environment variable
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.3.2-bin-hadoop2.7"
os.environ["PYSPARK_SUBMIT_ARGS"] = "--jars /content/spark-sql-kafka-0-10_2.11-2.3.2.jar,/content/spark-streaming-kafka-0-10-assembly_2.11-2.3.2.jar pyspark-shell"

# Part 1: Running a simple wordcount query

We will implement a simple continuous wordcount query. This query will
* Read the text sentence from a Kafka
* Split the sentence into words
* Continuously aggregate the counts for each word

Firstly, we need to start from making a simple TCP server on the master server which produces random sentences to its clients. In this class, we will use `nc (netcat)` program. You need to setup your own TCP server by following the processes

* Open the new terminal from jupyter
* Start a TCP server by entering "nc -lk 20332"
* Enter arbitrary texts to send events

After running the simple TCP server, run the scripts below. The query will continuously run in a background thread.

In [0]:
## socket 으로 전달한 데이터는 peer to peer 라 한명만 
import findspark
import os

findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder \
    .appName("DS2") \
    .master('local[*]') \
    .getOrCreate()

# Get the text data stream from TCP server
lines = spark \
    .readStream \
    .format("socket") \
    .option("host", "147.46.216.122") \
    .option("port", 20332) \
    .load()

#explode and split are SQL functions. 
#Both operate on SQL Column. 
#split takes a Java regular expression as a second argument. 
#If you want to separate data on arbitrary whitespace
#you'll need something like this:
words = lines.select(
    explode(
        split(lines.value, " ")
    ).alias("word")
)

wordCounts = words.groupBy("word").count()

#outputMode('complete') : 누적결과를 보여줌
# streaming 에서 중간중간 
# (1)차이나는 부분만 보여주는 방법
# (2)누적된 결과를 보여주는 방법
query = wordCounts \
    .writeStream \
    .queryName("wordcount_simple") \
    .outputMode("complete") \
    .format("memory") \
    .start()

In [0]:
#

In [20]:
wordCounts

DataFrame[word: string, count: bigint]

The running query will process the incoming text data from the TCP server. You may see the result by running the script below. Execute the script repeatedly, and you will see the evolving result as the data arrives.

In [7]:
result = spark.table("wordcount_simple")
result.show()

+----+-----+
|word|count|
+----+-----+
+----+-----+



You may stop the running query with the following script

In [0]:
query.stop()

# Part 2: Running a stream application from Kafka source

Apache Kafka is a distributed streaming platform which supports messaging, processing, and storing of the stream data. In this practice session, we will focus on leveraging Kafka as a message brokering system.

Kafka supports high-throughput & fault-tolerant messaging via publish-subscribe model. In publish-subscribe model, stream events are managed in **topics**. A **Producer** consistently generates a data, whereas **Consumer** receives the data events. Each topic is partitioned into multiple "partitions", and partitions are distributed and stored in the secondary storage to guarantee fault tolerance.

As we can guess from the information above, we need the server address and topic name to fetch the data from a Kafka broker. Kafka server and producers are already set up by TAs. We will review the Producer code firstly.

After revewing the code, we will implement the same word count application from the Kafka source. The broker address is **147.46.216.122:9092** and the topic is **wc**.

In [0]:
import findspark
import os

findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder \
    .appName("DS2") \
    .master('local[*]') \
    .getOrCreate()
    
# Get the text data stream from Kafka
lines = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "147.46.216.122:9092") \
    .option("subscribe", "wc") \
    .load()
    
words = lines.select(
    explode(
        split(lines.value, " ")
    ).alias("word")
)

wordCounts = words.groupBy("word").count()

query = wordCounts \
    .writeStream \
    .queryName("wordcount_kafka") \
    .outputMode("complete") \
    .format("memory") \
    .start()

You may see the result and stop the query with the following scripts.

In [13]:
result = spark.table("wordcount_kafka")
result.show()

+--------+-----+
|    word|count|
+--------+-----+
|    "DS2|   11|
|practice|   12|
|      is|   11|
|    day"|   15|
|   "Have|   15|
|session"|   12|
|    fun"|   11|
|"Welcome|   12|
|   class|   11|
|"Session|   13|
|       a|   15|
|    nice|   15|
|expired"|   13|
|      to|   12|
|     DS2|   12|
+--------+-----+



In [0]:
query.stop()

# Quiz 0: Simple Filtered Aggregation

In the following cell, implement the word count example which gets the data from the **"wc"** topic according to the following condition.
* Do not count the words whose lengths are shorter than 3

Hint: Use *length()* function

In [0]:
import findspark
import os

findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder \
    .appName("DS2") \
    .master('local[*]') \
    .getOrCreate()
    
# Get the text data stream from Kafka
lines = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "147.46.216.122:9092") \
    .option("subscribe", "wc") \
    .load()
    
# words = lines.select(
#     explode(
#         split(lines.value, " ")
#     ).alias("word")
# )

words = lines.select(
   
        split(lines.value, " ").alias("word")
)





In [39]:
words.writeStream \
    .queryName("quiz_10") \
    .outputMode("complete") \
    .format("memory") \
    .start()

AnalysisException: ignored

In [37]:
filteredWordCounts = words.where(length("word")>=3).groupBy('word').count()
# Implement your code here!

AnalysisException: ignored

In [0]:
query = filteredWordCounts \
    .writeStream \
    .queryName("quiz_0") \
    .outputMode("complete") \
    .format("memory") \
    .start()

In [35]:
result = spark.table("quiz_0")
result.show()

+--------+-----+
|    word|count|
+--------+-----+
|    "DS2|   15|
|practice|   11|
|    day"|   13|
|   "Have|   13|
|session"|   11|
|    fun"|   15|
|"Welcome|   11|
|   class|   15|
|"Session|   12|
|    nice|   13|
|expired"|   12|
|     DS2|   11|
+--------+-----+



In [0]:
query.stop()

# Part 3: Processing JSON-formatted data

Until now, we processed only the simple plain texts. From this time, we will process JSON-formatted data events which are widely used for data transfer. Here, we will get the json-formatted movie datasets from the kafka server. We may get the data from the **movie** topic.

To parse json-formatted data into spark dataframe, you need to use `from_json()` function and type casting.

The script below parses the json events to dataframe and filters out the events whose "year" is less than 2000.

In [0]:
#parsing 후 데이터 처리 방법에 대해서 나올 예정

import os
import findspark

findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.functions import *

## Make a spark sql session
spark = SparkSession.builder \
    .appName("DS2") \
    .master("local[*]") \
    .getOrCreate()

## Get the json-formatted data from Kafka stream
kafka_movies = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "147.46.216.122:9092") \
    .option("subscribe", "movie") \
    .load()

## Change the JSON events into relational tuples with string types
relational_movies = kafka_movies.select([get_json_object(col("value").cast("string"), "$.{}".format(c)).alias(c)
    for c in ["title", "genre", "year", "time"]])

## Change the type of year from string to integer,
## and change the type of time from string to timestamp
relational_movies = relational_movies.select(col("title"), col("genre"),
        relational_movies.year.cast('integer').alias('year'),
        col("time").cast('double').cast('timestamp')
    )

## Select the movie with year < 2000
filtered_movies = relational_movies.select("*").where("year >= 1900")

query = filtered_movies \
    .writeStream \
    .queryName("twentyfirstcentury_movies") \
    .outputMode("append") \
    .format("memory") \
    .start()

In [48]:
results = spark.table("twentyfirstcentury_movies")
results.show()

+--------------------+------+----+--------------------+
|               title| genre|year|                time|
+--------------------+------+----+--------------------+
|          Easy Rider| Drama|1969|2019-04-23 07:35:...|
|Delta Force 2: Th...|Action|1990|2019-04-23 07:35:...|
+--------------------+------+----+--------------------+



In [0]:
query.stop()

# Quiz 1: Filtered Aggregation on JSON-formatted data

In the following cell, implement the stream application which receives the **movie** topic from the Kafka stream and filters out all the movies which does not contain the word "the" in their titles (cases are ignored). 

After that, count the number of movies within the filtered data stream.

**Input**: {"title": "The titanic", "genre": "drama", ...}, {"title": "Titanic", "genre": "drama, ..."}, {"title": "Flintheart Glomgold", "genre": "comedy", ...}

**Output**: ("drama", 1), ("comedy", 1)

Hint: Use *lower()* function and *like* clauses with *wildcard (%)*

In [0]:
import os
import findspark

findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
from pyspark.sql.functions import col, from_json, get_json_object
from pyspark.sql.types import *

## Make a spark sql session
spark = SparkSession.builder \
    .appName("IAB") \
    .master("local[*]") \
    .getOrCreate()

## Get the json-formatted data from Kafka stream
kafka_movies = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "147.46.216.122:9092") \
    .option("subscribe", "movie") \
    .load()

## Change the JSON events into relational tuples with string types
relational_movies = kafka_movies.select([get_json_object(col("value").cast("string"), "$.{}".format(c)).alias(c)
    for c in ["title", "genre", "year", "time"]])

## Change the type of year from string to integer,
## and change the type of time from string to timestamp
relational_movies = relational_movies.select(col("title"), col("genre"),
        relational_movies.year.cast('integer').alias('year'),
        col("time").cast('double').cast('timestamp')
    )

# Implement your code here!
aggregated_counts = relational_movies.where("lower(title) like '%the%'") #.groupBy('genre').count()
query = aggregated_counts \
    .writeStream \
    .queryName("quiz_1") \
    .outputMode("append") \
    .format("memory") \
    .start()

In [71]:
result = spark.table("quiz_1")
result.show()

+--------------------+---------------+----+--------------------+
|               title|          genre|year|                time|
+--------------------+---------------+----+--------------------+
|The Girl with the...|Drama, Thriller|2011|2019-04-23 07:45:...|
|            The Heat|      Buddy cop|2013|2019-04-23 07:45:...|
|            The Hand|         Horror|1981|2019-04-23 07:45:...|
+--------------------+---------------+----+--------------------+



In [0]:
query.stop()

# Part 4: Running a windowed stream application

By windowing, we can continuously get the set of recent data. A time-based **sliding window** can be defined by **window size** and **sliding interval**. For example, the window of `(window size = 5 seconds, sliding interval = 1 seconds)` consistently emits the data events in recent five seconds for every one second. For the special cases when the window size and the sliding interval are same, we call them as **tumbling windows**. In structured stream, it is possible to make windows from **event times**.

Let's make a windowed movie aggregation query, which counts the frequency of each genre within a sliding window (size = 30 seconds, interval = 5 seconds).

In [0]:
import os
import findspark

findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.functions import *

## Make a spark sql session
spark = SparkSession.builder \
    .appName("DS2") \
    .master("local[*]") \
    .getOrCreate()

## Get the json-formatted data from Kafka stream
kafka_movies = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "147.46.216.122:9092") \
    .option("subscribe", "movie") \
    .load()

## Change the JSON events into relational tuples with string types
relational_movies = kafka_movies.select([get_json_object(col("value").cast("string"), "$.{}".format(c)).alias(c)
    for c in ["title", "genre", "year", "time"]])

## Change the type of year from string to integer,
## and change the type of time from string to timestamp
relational_movies = relational_movies.select(col("title"), col("genre"),
        relational_movies.year.cast('integer').alias('year'),
        col("time").cast('double').cast('timestamp')
    )

## Make windows and aggregate
windowed_movies = relational_movies.groupBy(
    window(relational_movies.time, "30 seconds", "5 seconds"),
    "genre"
  ).count()

query = windowed_movies \
    .writeStream \
    .queryName("windowed_movies") \
    .outputMode("complete") \
    .format("memory") \
    .start()

In [0]:
result = spark.table("windowed_movies")
result.show()

+--------------------+-------+-----+
|              window|  genre|count|
+--------------------+-------+-----+
|[2019-04-23 04:25...|Western|    1|
|[2019-04-23 04:25...|Western|    1|
|[2019-04-23 04:25...|Western|    1|
|[2019-04-23 04:25...|Western|    1|
|[2019-04-23 04:25...|Western|    1|
|[2019-04-23 04:26...|Western|    1|
+--------------------+-------+-----+



In [0]:
query.stop()

# Quiz 2: Windowed aggregation by the first character

In this quiz, you will implement a sliding-window aggregation query with *movie* stream query.

Implement a query which counts the number of movies according to the first characters of their *titles* (cases are ignored) within a sliding window (size = 30 seconds, interval = 5 seconds)

Example:

**Input**: {"title": "The titanic", ...}, {"title": "Avengers", ...}, {"title": "a little boy", ...}

**Output**: ("t", 1), ("a", 2)

Hint: Use substring() function

In [79]:
import os
import findspark

findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.functions import *

## Make a spark sql session
spark = SparkSession.builder \
    .appName("DS2") \
    .master("local[*]") \
    .getOrCreate()

## Get the json-formatted data from Kafka stream
kafka_movies = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "147.46.216.122:9092") \
    .option("subscribe", "movie") \
    .load()

## Change the JSON events into relational tuples with string types
relational_movies = kafka_movies.select([get_json_object(col("value").cast("string"), "$.{}".format(c)).alias(c)
    for c in ["title", "genre", "year", "time"]])

## Change the type of year from string to integer,
## and change the type of time from string to timestamp
relational_movies = relational_movies.select(col("title"), col("genre"),
        relational_movies.year.cast('integer').alias('year'),
        col("time").cast('double').cast('timestamp')
    )

first_characters = relational_movies.select('time',substring('title',0,1).alias('first_character'))

# Implement your code here!
## Extract the lower-cased first characters from relational_movie


## Make windows and aggregate
windowed_movies = first_characters.groupBy(
    window(first_characters.time, "30 seconds", "5 seconds"),
    first_characters.first_character
  ).count()

query = windowed_movies \
    .writeStream \
    .queryName("quiz_2") \
    .outputMode("complete") \
    .format("memory") \
    .start()

IllegalArgumentException: ignored

In [80]:
result = spark.table("quiz_2")
result.show()

+--------------------+---------------+-----+
|              window|first_character|count|
+--------------------+---------------+-----+
|[2019-04-23 08:07...|              D|    3|
|[2019-04-23 08:07...|              S|    1|
|[2019-04-23 08:07...|              W|    1|
|[2019-04-23 08:07...|              T|    3|
|[2019-04-23 08:07...|              D|    2|
|[2019-04-23 08:07...|              S|    2|
|[2019-04-23 08:07...|              S|    2|
|[2019-04-23 08:07...|              D|    3|
|[2019-04-23 08:07...|              D|    3|
|[2019-04-23 08:07...|              S|    1|
|[2019-04-23 08:07...|              T|    3|
|[2019-04-23 08:07...|              W|    1|
|[2019-04-23 08:07...|              T|    3|
|[2019-04-23 08:07...|              D|    1|
|[2019-04-23 08:07...|              S|    2|
|[2019-04-23 08:07...|              S|    2|
|[2019-04-23 08:07...|              F|    1|
|[2019-04-23 08:07...|              F|    1|
|[2019-04-23 08:07...|              F|    1|
|[2019-04-

In [0]:
query.stop()