In [1]:
import os



# Versions that make up the Maven coordinates of the Spark/Kafka connector package.
SCALA_VERSION, SPARK_VERSION = '2.11', '2.4.7'

# Submit arguments are read from the environment, so they are set here, ahead of
# the pyspark import further down in this cell.
_kafka_connector = 'org.apache.spark:spark-sql-kafka-0-10_{}:{}'.format(SCALA_VERSION, SPARK_VERSION)
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages ' + _kafka_connector + ' pyspark-shell'


# findspark.init() is called before `import pyspark` on purpose: it is expected to
# locate the local Spark installation and make pyspark importable.
# NOTE(review): presumably relies on SPARK_HOME being set — confirm for this machine.
import findspark
findspark.init()

import pyspark

In [2]:
from pyspark.sql import SparkSession

# Create (or reuse) a session on the local master with the "[*]" core setting.
# The shuffle partition count is set explicitly to 4.
sessionBuilder = (
    SparkSession.builder
    .master("local[*]")
    .appName("SparkStreamingKafkaInvoiceStream")
    .config('spark.sql.shuffle.partitions', 4)
)
spark = sessionBuilder.getOrCreate()

In [None]:
# kafka-console-consumer --bootstrap-server localhost:9092 --topic  aggregated-invoices  --from-beginning 

In [3]:
# Read from Kafka: Spark is the consumer of the "invoices2" topic.
# The streaming source is exposed through the regular DataFrame/SQL API.
#
# The earlier `.option("group.id", "invoice-group")` was removed because it had no
# effect: Kafka client settings are only forwarded when prefixed with "kafka.",
# and the Spark 2.4 Kafka source generates its own unique consumer group per query.
kafkaDf = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "invoices2")
    .load()
)

In [4]:
# Print the schema of the Kafka source DataFrame (not the records themselves):
# key and value arrive as binary, alongside topic/partition/offset/timestamp metadata.
kafkaDf.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [5]:
# Keep the Kafka record timestamp and decode the binary payload to text.
# The resulting frame has exactly two columns: timestamp and value (string).
projectedExprs = ["timestamp", "CAST(value AS STRING)"]
invoiceJsonRawDf = kafkaDf.selectExpr(*projectedExprs)
invoiceJsonRawDf.printSchema()

root
 |-- timestamp: timestamp (nullable = true)
 |-- value: string (nullable = true)



In [6]:
import pyspark.sql.functions as F
from pyspark.sql.types import StructField, StructType, IntegerType, StringType, DoubleType, DateType
# JSON text carries no column types, so the DataFrame needs an explicit schema.
# Every field is nullable. InvoiceDate is kept as a plain string, not a DateType.
invoiceFieldTypes = [
    ("InvoiceNo", IntegerType()),
    ("StockCode", StringType()),
    ("Quantity", IntegerType()),
    ("Description", StringType()),
    ("InvoiceDate", StringType()),
    ("UnitPrice", DoubleType()),
    ("CustomerID", IntegerType()),
    ("Country", StringType()),
]

schema = StructType(
    [StructField(fieldName, fieldType, True) for fieldName, fieldType in invoiceFieldTypes]
)


In [7]:
# Parse the JSON text held in `value` (produced by invoice-producer.py) using the
# schema defined above. The string column is replaced in place by a nested struct.
parsedValue = F.from_json(F.col("value"), schema)
jsonDf = invoiceJsonRawDf.withColumn("value", parsedValue)
jsonDf.printSchema()

root
 |-- timestamp: timestamp (nullable = true)
 |-- value: struct (nullable = true)
 |    |-- InvoiceNo: integer (nullable = true)
 |    |-- StockCode: string (nullable = true)
 |    |-- Quantity: integer (nullable = true)
 |    |-- Description: string (nullable = true)
 |    |-- InvoiceDate: string (nullable = true)
 |    |-- UnitPrice: double (nullable = true)
 |    |-- CustomerID: integer (nullable = true)
 |    |-- Country: string (nullable = true)



In [8]:
# Flatten the `value` struct: each of its fields (InvoiceNo, StockCode, ...)
# becomes a top-level column next to the Kafka timestamp.
flattenedColumns = ["timestamp", "value.*"]
invoiceDf = jsonDf.select(*flattenedColumns)
invoiceDf.printSchema()

root
 |-- timestamp: timestamp (nullable = true)
 |-- InvoiceNo: integer (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- Description: string (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- Country: string (nullable = true)



In [9]:
# Line total for each invoice row: Quantity (integer) multiplied by UnitPrice (double).
lineAmount = invoiceDf["Quantity"] * invoiceDf["UnitPrice"]
invoiceDf = invoiceDf.withColumn("Amount", lineAmount)
invoiceDf.printSchema()

root
 |-- timestamp: timestamp (nullable = true)
 |-- InvoiceNo: integer (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- Description: string (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- Country: string (nullable = true)
 |-- Amount: double (nullable = true)



In [10]:
# Sliding window: 10-minute windows that advance every 5 minutes, computed over the
# Kafka record timestamp. Amount is summed per (Country, window).
# Each aggregated row is then serialised into one JSON string column named `value`.
# to_json already returns a string, so the former follow-up projection
# `CAST(value AS STRING)` was a no-op and has been removed.
slidingWindow = F.window(invoiceDf.timestamp, "10 minutes", "5 minutes")
windowedAmountSum = (
    invoiceDf
    .groupBy("Country", slidingWindow)
    .agg(F.sum("Amount").alias("TotalAmount"))
    .selectExpr("to_json(struct(*)) AS value")
)
windowedAmountSum.printSchema()

root
 |-- value: string (nullable = true)



In [11]:
# Debug sink: write the aggregated rows to the console in "complete" output mode,
# with truncation of long values switched off.
consoleWriter = windowedAmountSum.writeStream.format("console")
consoleWriter = consoleWriter.outputMode("complete").option("truncate", False)
# start() launches the query; from here on Spark subscribes to the source for data.
echoOnconsole = consoleWriter.start()

In [12]:
# Publish the aggregated JSON rows to the "aggregated-invoices" Kafka topic.
# The StreamingQuery handle is kept in a variable (as is done for the console query)
# so this query can later be inspected or stopped, e.g. aggregatedInvoicesQuery.stop().
# NOTE(review): the checkpoint location is a hard-coded local Windows path; it is
# left unchanged here so that an existing checkpoint keeps being used.
aggregatedInvoicesQuery = (
    windowedAmountSum.writeStream
    .format("kafka")
    .outputMode("complete")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "aggregated-invoices")
    .option("checkpointLocation", "file:///c:/spark/temp56")
    .start()
)

<pyspark.sql.streaming.StreamingQuery at 0x279f1a21b08>