In [1]:
import os

# Scala / Spark versions used to pick the matching Kafka connector artifact.
SCALA_VERSION = '2.12'
SPARK_VERSION = '3.1.3'

# readStream.format("kafka") needs the Kafka source, which ships as a separate Maven artifact:
# https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10_2.12/3.1.3
# Passing it via --packages lets pyspark-shell download the jar when the session starts.
os.environ['PYSPARK_SUBMIT_ARGS'] = ' '.join([
    '--packages',
    f'org.apache.spark:spark-sql-kafka-0-10_{SCALA_VERSION}:{SPARK_VERSION}',
    'pyspark-shell',
])

In [2]:
import findspark
# Must run before the `import pyspark` in the next cell
# (per the findspark docs, init() adds the Spark installation's pyspark to sys.path).
findspark.init()

In [3]:
import pyspark


from pyspark.sql import SparkSession
# groupBy shuffles use spark.sql.shuffle.partitions, which defaults to 200.
# It is lowered to 4 for this single-core local run; do NOT do this in production.
spark = (
    SparkSession.builder
    .master("local[1]")
    .config('spark.sql.shuffle.partitions', 4)
    .appName("SparkStreamingKafkaCandlesA4")
    .getOrCreate()
)

22/04/01 01:50:51 WARN Utils: Your hostname, ubuntu-virtual-machine resolves to a loopback address: 127.0.1.1; using 192.168.80.128 instead (on interface ens33)
22/04/01 01:50:51 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/opt/spark-3.1.3-bin-hadoop2.7/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/ubuntu/.ivy2/cache
The jars for the packages stored in: /home/ubuntu/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-eddfda95-2ab9-4338-a222-3cdaffc91c67;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.1.3 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.1.3 in central
	found org.apache.kafka#kafka-clients;2.6.0 in central
	found com.github.luben#zstd-jni;1.4.8-1 in central
	found org.lz4#lz4-java;1.7.1 in central
	found org.xerial.snappy#snappy-java;1.1.8.2 in central
	found org.slf4j#slf4j-api;1.7.30 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.commons#commons-pool2;2.6.2 in central
:: resolution report :: resolve 694ms :: artifacts dl 10ms
	:: modules in use:
	com.github.luben#zstd-jni;1.4.8-1 from central in [default]
	org.apache.commons#commons-pool2;2.6.2 from central

In [5]:

# Streaming source: subscribe to the "orders" topic on the local broker.
# Spark forwards only options prefixed with "kafka." to the Kafka consumer, so an
# un-prefixed "group.id" option is silently ignored (Spark generates a unique consumer
# group per query). It is therefore not set here.
kafkaDf = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "orders")
    .load()
)

In [6]:
# Inspect the raw Kafka source schema; key and value arrive as binary.
kafkaDf.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [None]:
# echoOnconsole =kafkaDf\
#                 .writeStream\
#                 .outputMode("update")\
#                 .format("console")\
#                 .option("truncate", False)\
#                 .start() # start the query. spark will subscribe for data

In [7]:
# Keep the message payload (decoded from binary to string) and the Kafka record timestamp.
ticksDf = kafkaDf.selectExpr(
    "CAST(value AS STRING)",
    "timestamp",
)
ticksDf.printSchema()  # value is now a string; timestamp is carried through

root
 |-- value: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)



In [None]:
# echoOnconsole =ticksDf\
#                 .writeStream\
#                 .outputMode("update")\
#                 .format("console")\
#                 .option("truncate", False)\
#                 .start() # start the query. spark will subscribe for data


In [8]:
import pyspark.sql.functions as F
from pyspark.sql.types import StructField, StructType, DoubleType, StringType, LongType, TimestampType,IntegerType

# Layout of the JSON order messages carried in the Kafka value; every field is nullable.
schema = (
    StructType()
    .add("Order_id", IntegerType(), True)
    .add("Item_id", StringType(), True)
    .add("Quantity", IntegerType(), True)
    .add("Price", IntegerType(), True)
    .add("State", StringType(), True)
    .add("timestamp", LongType(), True)
)

In [9]:
# Parse the JSON text in `value` into a struct that follows `schema`.
jsonDf = ticksDf.withColumn(
    "value",
    F.from_json(F.col("value"), schema),
)
jsonDf.printSchema()

root
 |-- value: struct (nullable = true)
 |    |-- Order_id: integer (nullable = true)
 |    |-- Item_id: string (nullable = true)
 |    |-- Quantity: integer (nullable = true)
 |    |-- Price: integer (nullable = true)
 |    |-- State: string (nullable = true)
 |    |-- timestamp: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)



In [10]:
# Promote every field of the parsed struct to a top-level column.
# Only the struct's fields are selected, so the Kafka record timestamp is left behind.
stockTickDf = jsonDf.select("value.*")
stockTickDf.printSchema()

root
 |-- Order_id: integer (nullable = true)
 |-- Item_id: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- Price: integer (nullable = true)
 |-- State: string (nullable = true)
 |-- timestamp: long (nullable = true)



In [None]:
# Debug sink: print the flattened order rows to the console, untruncated.
# start() launches the query; keep the handle so it can be stopped with echoOnconsole.stop().
echoOnconsole = (
    stockTickDf.writeStream
    .outputMode("update")
    .format("console")
    .option("truncate", False)
    .start()
)

In [11]:
# Order value = unit price x quantity.
# NOTE(review): both inputs are integers, so Amount is an integer as well —
# confirm real order values cannot exceed the 32-bit range.
ordersDf = stockTickDf.withColumn(
    "Amount",
    F.col("Price") * F.col("Quantity"),
)
ordersDf.printSchema()

root
 |-- Order_id: integer (nullable = true)
 |-- Item_id: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- Price: integer (nullable = true)
 |-- State: string (nullable = true)
 |-- timestamp: long (nullable = true)
 |-- Amount: integer (nullable = true)



In [12]:
# Replace the epoch-milliseconds `timestamp` with a real timestamp truncated to the minute.
# The new column is built under temporary names and renamed last, so it ends up as the
# final column of the frame.
orderTickDf = (
    ordersDf
    .withColumn("timestampTemp", (F.col("timestamp") / 1000).cast("timestamp"))
    .withColumn("trade_time", F.date_trunc("minute", F.col("timestampTemp")))
    .drop("timestamp", "timestampTemp")
    .withColumnRenamed("trade_time", "timestamp")
)

In [None]:
# Debug sink: print the orders (with Amount and minute-truncated timestamp) to the console.
# start() launches the query; keep the handle so it can be stopped with echoOnconsole.stop().
echoOnconsole = (
    orderTickDf.writeStream
    .outputMode("update")
    .format("console")
    .option("truncate", False)
    .start()
)

In [13]:
# Total order amount per State in 5-minute event-time windows,
# using a 1-minute watermark on the event timestamp.
order5minDf = (
    orderTickDf
    .withWatermark("timestamp", "1 minute")
    .groupBy("State", F.window("timestamp", "5 minutes"))
    .agg(F.sum("Amount").alias("Total_amount"))
)

order5minDf.printSchema()

root
 |-- State: string (nullable = true)
 |-- window: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- Total_amount: long (nullable = true)



In [None]:
## Converting back to kafka writable format
# The Kafka sink expects a `value` column: serialise each aggregated row to one JSON string.
order5minDfKafka = order5minDf.selectExpr(
    "to_json(struct(*)) AS value"
)


In [None]:
# Debug sink: print the JSON-encoded aggregates to the console, untruncated.
# start() launches the query; keep the handle so it can be stopped with echoOnconsole.stop().
echoOnconsole = (
    order5minDfKafka.writeStream
    .outputMode("update")
    .format("console")
    .option("truncate", False)
    .start()
)

In [None]:
# Publish the JSON-encoded 5-minute aggregates to the "orders-5min" topic.
# The checkpoint directory is set explicitly for this query.
(
    order5minDfKafka.writeStream
    .format("kafka")
    .outputMode("update")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "orders-5min")
    .option("checkpointLocation", "file:///tmp/spark3")
    .start()
)

In [15]:
# Spark stream with batch output
# Spark basically processes the stream as a series of batches
# for developer convenience, there is an API that hands us a finite DataFrame for writing to file/JDBC/DB/custom code
# process discrete output batch
# this function is called for every update/append/complete trigger with the result data set as a DataFrame
# candleBatchDf is a BATCH DATAFRAME
# order5minDf is the STREAM DATAFRAME
def processBatchData(candleBatchDf, batch_id):
    """Append one micro-batch of windowed aggregates to the MySQL table ``statewise_earning``.

    Intended as a ``foreachBatch`` callback: Spark calls it once per trigger with a
    finite (batch) DataFrame holding that trigger's result rows.

    Args:
        candleBatchDf: batch DataFrame for this trigger. It must contain a nested
            ``window`` column whose children are ``start`` and ``end``.
        batch_id: identifier Spark assigns to the micro-batch; only used for logging.

    The database user and password can be overridden with the ``STOCKDB_USER`` and
    ``STOCKDB_PASSWORD`` environment variables. The defaults match the local
    development database; set the variables instead of editing the code anywhere else.
    """
    # The batch is consumed by two actions (count() for the log line, then the JDBC
    # write). Cache it so the second action does not recompute the whole batch.
    candleBatchDf.persist()
    try:
        print ("process batch called", batch_id, "writing ", candleBatchDf.count())

        # `window` is a nested column; expose its children as start_time / end_time
        # and drop the struct so the frame is flat enough for a relational table.
        flatBatchDf = (
            candleBatchDf
            .select('*', F.col("window.*"))
            .withColumnRenamed("start", "start_time")
            .withColumnRenamed("end", "end_time")
            .drop("window")
        )

        # Append mode never deletes existing rows; it only adds to the table.
        # NOTE(review): when the stream runs in "update" output mode, a (State, window)
        # pair is re-emitted each time its total changes, so the table receives one row
        # per emission rather than one row per window — confirm readers expect that.
        (
            flatBatchDf
            .write
            .mode("append")
            .format("jdbc")
            .option("url", "jdbc:mysql://localhost:3306/stockdb?autoReconnect=true&useSSL=false&allowPublicKeyRetrieval=true")
            .option("driver", "com.mysql.jdbc.Driver")
            .option("user", os.environ.get("STOCKDB_USER", "team"))
            .option("password", os.environ.get("STOCKDB_PASSWORD", "Team1234!"))
            .option("dbtable", "statewise_earning")
            .save()
        )
    finally:
        # Release the cached batch even if the write fails.
        candleBatchDf.unpersist()
    
# Start the query that hands every micro-batch of aggregates to processBatchData.
(
    order5minDf.writeStream
    .outputMode("update")
    .foreachBatch(processBatchData)
    .start()
)

22/04/01 02:03:53 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-67af7eda-b6e4-4072-86f5-60c2fd9d8fbf. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.


<pyspark.sql.streaming.StreamingQuery at 0x7fcf200edc90>

process batch called 0 writing  0
