In [1]:
import os
# save using vs code
SCALA_VERSION = '2.12'
SPARK_VERSION = '3.1.3'
# Download Kafka Jar file, this for readStream.format("kafka"), "kafka" is a driver
# kafka driver code is part of Maven Jar file
# https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10_2.12/3.1.3
# pyspark-shell shall download the jar file behind..
os.environ['PYSPARK_SUBMIT_ARGS'] = f'--packages org.apache.spark:spark-sql-kafka-0-10_{SCALA_VERSION}:{SPARK_VERSION} pyspark-shell'

In [2]:
import findspark
findspark.init()

In [3]:
import pyspark
from pyspark.sql import SparkSession
spark= SparkSession.builder.master("local[4]").appName("usecase2").config('spark.sql.shuffle.partitions', 4).getOrCreate()


22/03/28 00:28:32 WARN Utils: Your hostname, ubuntu-virtual-machine resolves to a loopback address: 127.0.1.1; using 192.168.80.128 instead (on interface ens33)
22/03/28 00:28:32 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/opt/spark-3.1.3-bin-hadoop2.7/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/ubuntu/.ivy2/cache
The jars for the packages stored in: /home/ubuntu/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-3381b926-35b3-486f-ac2f-99afb77a6583;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.1.3 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.1.3 in central
	found org.apache.kafka#kafka-clients;2.6.0 in central
	found com.github.luben#zstd-jni;1.4.8-1 in central
	found org.lz4#lz4-java;1.7.1 in central
	found org.xerial.snappy#snappy-java;1.1.8.2 in central
	found org.slf4j#slf4j-api;1.7.30 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.commons#commons-pool2;2.6.2 in central
:: resolution report :: resolve 984ms :: artifacts dl 10ms
	:: modules in use:
	com.github.luben#zstd-jni;1.4.8-1 from central in [default]
	org.apache.commons#commons-pool2;2.6.2 from central

In [4]:

#     kafka-topics  --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic stock-ticks 
#     kafka-console-consumer --bootstrap-server localhost:9092 --topic  stock-ticks   --from-beginning

In [5]:
readfromkafkaDf= spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "stock-ticks").option("group-id","stock-ticks-group4-nav").load()

In [6]:
readfromkafkaDf.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [7]:
stockDf= readfromkafkaDf.selectExpr(("CAST(value as STRING)"),("timestamp"))
stockDf.printSchema()

root
 |-- value: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)



In [8]:
from pyspark.sql.types import StructField, StructType, DoubleType, StringType, LongType, TimestampType
schema= StructType([
    StructField("symbol",StringType(),True),
    StructField("volume", LongType(), True),
     StructField("price", DoubleType(), True),
    StructField("timestamp", LongType(),  True)
    ])

In [9]:
import pyspark.sql.functions as F
stringToJsonStockDf= stockDf.withColumn("value", F.from_json("value",schema))
stringToJsonStockDf.printSchema()

root
 |-- value: struct (nullable = true)
 |    |-- symbol: string (nullable = true)
 |    |-- volume: long (nullable = true)
 |    |-- price: double (nullable = true)
 |    |-- timestamp: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)



In [10]:
valueDf= stringToJsonStockDf.select("value.*")
valueDf.printSchema()

root
 |-- symbol: string (nullable = true)
 |-- volume: long (nullable = true)
 |-- price: double (nullable = true)
 |-- timestamp: long (nullable = true)



In [11]:
from pyspark.sql.functions import *
timeDf= valueDf .withColumn("timestamp", col("timestamp")/1000)\
                .withColumn("time" , to_timestamp(col("timestamp")))\
                .drop("timestamp")
timeDf.printSchema()

root
 |-- symbol: string (nullable = true)
 |-- volume: long (nullable = true)
 |-- price: double (nullable = true)
 |-- time: timestamp (nullable = true)



In [12]:
stock1minDf= timeDf.groupBy("symbol", F.window("time", "60 Seconds"))\
                                .agg( F.sum("volume").alias("volume"),\
                                 F.max("price").alias("high"),\
                                F.min("price").alias("low"),\
                                F.first("price").alias("first"),\
                                F.last("price").alias("last")
                                    )
stock1minDf.printSchema()       
# echoOnconsole = stock1minDf\
#                 .writeStream\
#                 .outputMode("update")\
#                 .format("console")\
#                 .option("truncate", False)\
#                 .start()
                              

root
 |-- symbol: string (nullable = true)
 |-- window: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- volume: long (nullable = true)
 |-- high: double (nullable = true)
 |-- low: double (nullable = true)
 |-- first: double (nullable = true)
 |-- last: double (nullable = true)



In [13]:
stock3minDf= timeDf.groupBy("symbol", F.window("time", "3 minutes"))\
                                .agg( F.sum("volume").alias("volume"),\
                                 F.max("price").alias("high"),\
                                F.min("price").alias("low"),\
                                F.first("price").alias("first"),\
                                F.last("price").alias("last")
                                    )
stock3minDf.printSchema()       

root
 |-- symbol: string (nullable = true)
 |-- window: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- volume: long (nullable = true)
 |-- high: double (nullable = true)
 |-- low: double (nullable = true)
 |-- first: double (nullable = true)
 |-- last: double (nullable = true)



In [14]:
stock5minDf= timeDf.groupBy("symbol", F.window("time", "5 minutes"))\
                                .agg( F.sum("volume").alias("volume"),\
                                 F.max("price").alias("high"),\
                                F.min("price").alias("low"),\
                                F.first("price").alias("first"),\
                                F.last("price").alias("last")
                                    )
stock5minDf.printSchema()       

root
 |-- symbol: string (nullable = true)
 |-- window: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- volume: long (nullable = true)
 |-- high: double (nullable = true)
 |-- low: double (nullable = true)
 |-- first: double (nullable = true)
 |-- last: double (nullable = true)



In [15]:
stock1minDfTojsonDf= stock1minDf.drop("window").selectExpr("to_json(struct(*)) AS value")
stock1minDfTojsonDf.printSchema()

stock3minDfTojsonDf= stock3minDf.drop("window").selectExpr("to_json(struct(*)) AS value")
stock3minDfTojsonDf.printSchema()

stock5minDfTojsonDf= stock5minDf.drop("window").selectExpr("to_json(struct(*)) AS value")
stock5minDfTojsonDf.printSchema()

root
 |-- value: string (nullable = true)

root
 |-- value: string (nullable = true)

root
 |-- value: string (nullable = true)



In [16]:
stock1minDfTojsonDf.writeStream\
             .format("kafka")\
            .outputMode("update")\
             .option("kafka.bootstrap.servers", "localhost:9092")\
            .option("topic", "stock-ticks-1min")\
            .option("checkpointLocation", "file:///tmp/spark6")\
            .start()


stock3minDfTojsonDf.writeStream\
             .format("kafka")\
            .outputMode("update")\
             .option("kafka.bootstrap.servers", "localhost:9092")\
            .option("topic", "stock-ticks-3min")\
            .option("checkpointLocation", "file:///tmp/spark7")\
            .start()

stock5minDfTojsonDf.writeStream\
             .format("kafka")\
            .outputMode("update")\
             .option("kafka.bootstrap.servers", "localhost:9092")\
            .option("topic", "stock-ticks-5min")\
            .option("checkpointLocation", "file:///tmp/spark8")\
            .start()

<pyspark.sql.streaming.StreamingQuery at 0x7fe80c1f9390>

22/03/28 00:29:00 WARN NetworkClient: [Producer clientId=producer-1] Error while fetching metadata with correlation id 1 : {stock-ticks-1min=LEADER_NOT_AVAILABLE}
22/03/28 00:29:00 WARN NetworkClient: [Producer clientId=producer-1] Error while fetching metadata with correlation id 3 : {stock-ticks-3min=LEADER_NOT_AVAILABLE, stock-ticks-1min=LEADER_NOT_AVAILABLE}
22/03/28 00:29:00 WARN NetworkClient: [Producer clientId=producer-1] Error while fetching metadata with correlation id 4 : {stock-ticks-5min=LEADER_NOT_AVAILABLE}
                                                                                