In [1]:
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import window, col, avg, concat, lit, from_csv
from pyspark.sql.types import StructType, StructField, LongType, StringType, DoubleType, IntegerType
from time import sleep

In [2]:
# Connection and resource settings for this lab: the standalone master URL,
# the application name, driver memory, and one core each for executors and
# the driver.
spark_conf = (
    SparkConf()
    .setMaster("spark://master:7077")
    .setAppName("Lab7_Exercises")
    .set("spark.driver.memory", "2g")
    .set("spark.executor.cores", "1")
    .set("spark.driver.cores", "1")
)

# The Spark session is the entry point to the Spark SQL engine.
spark = (
    SparkSession.builder
    .config(conf=spark_conf)
    .getOrCreate()
)

### Exercise 1

Compute the average score of each (user, team) pair over 10-second windows.

In [3]:
# Schema of one game record, in column order; every field is nullable.
data_schema = (
    StructType()
    .add("uname", StringType(), True)
    .add("tname", StringType(), True)
    .add("score", IntegerType(), True)
    .add("timestamp_in_ms", LongType(), True)
    .add("readable_time", StringType(), True)
)

In [4]:
# Streaming CSV source over the ../data/game directory, parsed with
# data_schema. maxFilesPerTrigger=1 caps each micro-batch at one new file.
sdf = (
    spark.readStream
    .schema(data_schema)
    .option("maxFilesPerTrigger", 1)
    .csv("../data/game")
)

In [5]:
# Derive the event-time column: timestamp_in_ms is divided by 1000.0 and
# cast to a timestamp.
with_event_time_df = sdf.selectExpr(
    "*",
    "cast(timestamp_in_ms/1000.0 as timestamp) as event_time",
)

with_event_time_df.printSchema()

# Average score per (10-second event-time window, uname, tname).
avg_score_df = (
    with_event_time_df
    .groupBy(
        window(col("event_time"), "10 seconds"),
        "uname",
        "tname",
    )
    .agg(avg("score").alias("value"))
)

# Shape the result as key/value: key is "<uname> <tname>", value is the
# average. NOTE(review): the window column is not selected here, so rows from
# different windows cannot be told apart in the result.
result_df = avg_score_df.select(
    concat(col("uname"), lit(" "), col("tname")).alias("key"),
    col("value"),
)

# Keep the full aggregate in an in-memory table that can be queried by name.
query = (
    result_df.writeStream
    .queryName("avg_score_window_ex1")
    .format("memory")
    .outputMode("complete")
    .start()
)

root
 |-- uname: string (nullable = true)
 |-- tname: string (nullable = true)
 |-- score: integer (nullable = true)
 |-- timestamp_in_ms: long (nullable = true)
 |-- readable_time: string (nullable = true)
 |-- event_time: timestamp (nullable = true)



In [6]:
# Poll the in-memory result table ten times, ten seconds apart.
#
# The streaming query is stopped in the inner `finally`, so it no longer keeps
# running in the background after the loop finishes normally or fails with an
# unexpected error. Previously it was only stopped on KeyboardInterrupt.
# The Spark session is still stopped only on a manual interrupt, because the
# later exercises reuse it.
try:
    try:
        for _poll in range(10):
            spark.sql("SELECT * FROM avg_score_window_ex1").show()
            sleep(10)
    finally:
        # Runs on normal completion, on interrupt, and on any other error.
        query.stop()
except KeyboardInterrupt:
    # The query has already been stopped by the inner `finally`.
    # Stop the spark context
    spark.stop()

    print("Stoped the streaming query and the spark context")

+---+-----+
|key|value|
+---+-----+
+---+-----+

+---+-----+
|key|value|
+---+-----+
+---+-----+

+---+-----+
|key|value|
+---+-----+
+---+-----+

+--------------------+------------------+
|                 key|             value|
+--------------------+------------------+
|user3_AmaranthDin...|10.232142857142858|
|user3_AntiqueBras...| 8.385964912280702|
|user7_AmaranthDin...|10.714285714285714|
|user0_AmaranthDin...|           9.40625|
|user12_AmberCaneT...| 8.153846153846153|
|user7_AndroidGree...|10.028571428571428|
|user12_AndroidGre...| 8.981132075471699|
|user1_BattleshipG...|              11.5|
|user0_AzureCassow...| 9.788461538461538|
|user6_AntiqueBras...|              10.0|
|user18_AmaranthKo...|10.112903225806452|
|user5_AzureCassow...|10.666666666666666|
|user19_AndroidGre...|10.471698113207546|
|user5_BananaWalla...| 9.142857142857142|
|user17_AmaranthDi...| 9.452054794520548|
|user17_Battleship...|  9.95945945945946|
|user10_AzureCasso...|10.133333333333333|
|user17_ArmyG

### Exercise 2

Refactor the above Spark program to use Kafka sources and sinks.

In [7]:
# Schema of one game record (name, type) in column order; all fields nullable.
data_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("uname", StringType()),
        ("tname", StringType()),
        ("score", IntegerType()),
        ("timestamp_in_ms", LongType()),
        ("readable_time", StringType()),
    )
])

In [8]:
# Subscribe to the "game" topic on the kafka:9093 broker as a streaming
# source, starting from the latest offsets.
kafka_stream = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9093")
    .option("subscribe", "game")
    .option("startingOffsets", "latest")
    .load()
)

# Keep only the record value, cast to a string.
df = kafka_stream.selectExpr("CAST(value AS STRING)")

In [9]:
# Parse each CSV-formatted value into a single struct column, using the
# simpleString() rendering of data_schema as the schema argument.
df1 = df.select(
    from_csv(df["value"], data_schema.simpleString())
)

df1.printSchema()

root
 |-- from_csv(value): struct (nullable = true)
 |    |-- uname: string (nullable = true)
 |    |-- tname: string (nullable = true)
 |    |-- score: integer (nullable = true)
 |    |-- timestamp_in_ms: long (nullable = true)
 |    |-- readable_time: string (nullable = true)



In [10]:
# Expand the parsed struct column into one top-level column per field.
# "from_csv(value)" is the name Spark gave the unaliased struct column, as
# shown by the schema printed for df1.
sdf = df1.select("from_csv(value).*")

sdf.printSchema()

root
 |-- uname: string (nullable = true)
 |-- tname: string (nullable = true)
 |-- score: integer (nullable = true)
 |-- timestamp_in_ms: long (nullable = true)
 |-- readable_time: string (nullable = true)



In [11]:
# Derive the event-time column: timestamp_in_ms is divided by 1000.0 and
# cast to a timestamp.
with_even_time_df = sdf.selectExpr(
    "*",
    "cast(timestamp_in_ms/1000.0 as timestamp) as event_time",
)

with_even_time_df.printSchema()

# Average score per (10-second event-time window, uname, tname).
avg_score_df = (
    with_even_time_df
    .groupBy(
        window(col("event_time"), "10 seconds"),
        "uname",
        "tname",
    )
    .agg(avg("score").alias("value"))
)

# Shape the result as key/value: key is "<uname> <tname>", value is the
# average.
result_df = avg_score_df.select(
    concat(col("uname"), lit(" "), col("tname")).alias("key"),
    col("value"),
)

# Keep the full aggregate in an in-memory table that can be queried by name.
query = (
    result_df.writeStream
    .queryName("avg_score_window_ex2")
    .format("memory")
    .outputMode("complete")
    .start()
)

root
 |-- uname: string (nullable = true)
 |-- tname: string (nullable = true)
 |-- score: integer (nullable = true)
 |-- timestamp_in_ms: long (nullable = true)
 |-- readable_time: string (nullable = true)
 |-- event_time: timestamp (nullable = true)



In [12]:
# Poll the in-memory result table ten times, ten seconds apart.
#
# The streaming query is stopped in the inner `finally`, so it no longer keeps
# running in the background after the loop finishes normally or fails with an
# unexpected error. Previously it was only stopped on KeyboardInterrupt.
# The Spark session is still stopped only on a manual interrupt, because the
# next exercise reuses it.
try:
    try:
        for _poll in range(10):
            spark.sql("SELECT * FROM avg_score_window_ex2").show()
            sleep(10)
    finally:
        # Runs on normal completion, on interrupt, and on any other error.
        query.stop()
except KeyboardInterrupt:
    # The query has already been stopped by the inner `finally`.
    # Stop the spark context
    spark.stop()

    print("Stoped the streaming query and the spark context")

+---+-----+
|key|value|
+---+-----+
+---+-----+

+---+-----+
|key|value|
+---+-----+
+---+-----+

+---+-----+
|key|value|
+---+-----+
+---+-----+

+---+-----+
|key|value|
+---+-----+
+---+-----+

+---+-----+
|key|value|
+---+-----+
+---+-----+

+---+-----+
|key|value|
+---+-----+
+---+-----+

+---+-----+
|key|value|
+---+-----+
+---+-----+

+---+-----+
|key|value|
+---+-----+
+---+-----+

+---+-----+
|key|value|
+---+-----+
+---+-----+

+---+-----+
|key|value|
+---+-----+
+---+-----+



### Exercise 3

Publish data to the `game` topic and consume data from the `avg_score` topic.

In [13]:
# Schema of one game record, in column order; every field is nullable.
data_schema = (
    StructType()
    .add(StructField("uname", StringType(), True))
    .add(StructField("tname", StringType(), True))
    .add(StructField("score", IntegerType(), True))
    .add(StructField("timestamp_in_ms", LongType(), True))
    .add(StructField("readable_time", StringType(), True))
)

In [14]:
# Subscribe to the "game" topic on the kafka:9093 broker as a streaming
# source, starting from the latest offsets, with failOnDataLoss set to
# "false".
kafka_stream = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9093")
    .option("failOnDataLoss", "false")
    .option("subscribe", "game")
    .option("startingOffsets", "latest")
    .load()
)

# Keep only the record value, cast to a string.
df = kafka_stream.selectExpr("CAST(value AS STRING)")

In [15]:
# Parse each CSV-formatted value into a single struct column, using the
# simpleString() rendering of data_schema as the schema argument.
df1 = df.select(
    from_csv(df["value"], data_schema.simpleString())
)

df1.printSchema()

root
 |-- from_csv(value): struct (nullable = true)
 |    |-- uname: string (nullable = true)
 |    |-- tname: string (nullable = true)
 |    |-- score: integer (nullable = true)
 |    |-- timestamp_in_ms: long (nullable = true)
 |    |-- readable_time: string (nullable = true)



In [16]:
# Expand the parsed struct column into one top-level column per field.
# "from_csv(value)" is the name Spark gave the unaliased struct column, as
# shown by the schema printed for df1.
sdf = df1.select("from_csv(value).*")

sdf.printSchema()

root
 |-- uname: string (nullable = true)
 |-- tname: string (nullable = true)
 |-- score: integer (nullable = true)
 |-- timestamp_in_ms: long (nullable = true)
 |-- readable_time: string (nullable = true)



In [17]:
# Derive the event-time column: timestamp_in_ms is divided by 1000.0 and
# cast to a timestamp.
with_event_time_df = sdf.selectExpr(
    "*",
    "cast(timestamp_in_ms/1000.0 as timestamp) as event_time",
)

with_event_time_df.printSchema()

# Average score per (10-second event-time window, uname, tname).
avg_score_df = (
    with_event_time_df
    .groupBy(
        window(col("event_time"), "10 seconds"),
        "uname",
        "tname",
    )
    .agg(avg("score").alias("value"))
)

# Shape the result as string key/value columns for the Kafka sink: key is
# "<uname> <tname>", value is the average cast to a string.
result_df = avg_score_df.select(
    concat(col("uname"), lit(" "), col("tname")).alias("key"),
    col("value").cast("string"),
)

result_df.printSchema()

# Publish the full aggregate to the "avg_score" topic, checkpointing under
# ../checkpoint.
query = (
    result_df.writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9093")
    .option("checkpointLocation", "../checkpoint")
    .option("topic", "avg_score")
    .outputMode("complete")
    .start()
)

root
 |-- uname: string (nullable = true)
 |-- tname: string (nullable = true)
 |-- score: integer (nullable = true)
 |-- timestamp_in_ms: long (nullable = true)
 |-- readable_time: string (nullable = true)
 |-- event_time: timestamp (nullable = true)

root
 |-- key: string (nullable = true)
 |-- value: string (nullable = true)



In [None]:
# Block this cell until the streaming query terminates. A manual interrupt
# (Kernel -> Interrupt) is treated as the signal to shut everything down.
try:
    query.awaitTermination()
except KeyboardInterrupt:
    # Tear down in order: the streaming query first, then the Spark session.
    for stoppable in (query, spark):
        stoppable.stop()

    print("Stoped the streaming query and the spark context")

In [19]:
# Final cleanup: stop the Spark session (which also stops its underlying
# Spark context).
spark.stop()