In [21]:
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import window, col, avg, concat, lit, from_csv
from pyspark.sql.types import StructType, StructField, LongType, StringType, IntegerType

sparkConf = SparkConf()
sparkConf.setMaster("spark://spark-master:7077")
sparkConf.setAppName("Lab7_Ex3")
sparkConf.set("spark.driver.memory", "2g")
sparkConf.set("spark.executor.cores", "1")
sparkConf.set("spark.driver.cores", "1")

# create the spark session, which is the entry point to Spark SQL engine.
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()

dataSchema = StructType(
    [StructField("uname", StringType(), True),
     StructField("tname", StringType(), True),
     StructField("score", IntegerType(), True),
     StructField("timestamp_in_ms", LongType(), True),
     StructField("readable_time", StringType(), True)
     ])

# Read the whole dataset as a batch
kafkaStream = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka1:9093") \
    .option("failOnDataLoss", "false") \
    .option("subscribe", "game") \
    .option("startingOffsets", "latest") \
    .load()

df = kafkaStream.selectExpr("CAST(value AS STRING)")

df1 = df.select(from_csv(df.value, dataSchema.simpleString()))

df1.printSchema()

sdf = df1.select(col("from_csv(value).*"))

sdf.printSchema()

# create the event time column 
withEventTimedf = sdf.selectExpr(
    "*",
    "cast(timestamp_in_ms/1000.0 as timestamp) as event_time")

withEventTimedf.printSchema()

avgscoredf = withEventTimedf \
    .groupBy(window(col("event_time"), "10 seconds"), "uname", "tname") \
    .agg(avg("score").alias("value"))

resultdf = avgscoredf.select(concat(col("uname"), lit(" "), col("tname")).alias("key"), col("value").cast("string"))

resultdf.printSchema()

query = resultdf \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka1:9093") \
    .option("checkpointLocation", "/home/jovyan/checkpoint") \
    .option("topic", "avg_score") \
    .outputMode("complete") \
    .start()
try:
    query.awaitTermination()
except KeyboardInterrupt:
    query.stop()
    # Stop the spark context
    spark.stop()
    print("Stoped the streaming query and the spark context")

root
 |-- from_csv(value): struct (nullable = true)
 |    |-- uname: string (nullable = true)
 |    |-- tname: string (nullable = true)
 |    |-- score: integer (nullable = true)
 |    |-- timestamp_in_ms: long (nullable = true)
 |    |-- readable_time: string (nullable = true)

root
 |-- uname: string (nullable = true)
 |-- tname: string (nullable = true)
 |-- score: integer (nullable = true)
 |-- timestamp_in_ms: long (nullable = true)
 |-- readable_time: string (nullable = true)

root
 |-- uname: string (nullable = true)
 |-- tname: string (nullable = true)
 |-- score: integer (nullable = true)
 |-- timestamp_in_ms: long (nullable = true)
 |-- readable_time: string (nullable = true)
 |-- event_time: timestamp (nullable = true)

root
 |-- key: string (nullable = true)
 |-- value: string (nullable = true)

Stoped the streaming query and the spark context


In [22]:
# Stop the spark context
spark.stop()