In [None]:
import atexit
import logging
import json
import time
from kafka import KafkaProducer
from kafka.errors import KafkaError, KafkaTimeoutError
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import time
import os
from datetime import datetime
import findspark
findspark.init()

In [None]:
# Named logger used throughout the notebook; INFO and above are emitted.
logger = logging.getLogger('stream-processing')
logger.setLevel(logging.INFO)

# Root handler layout: timestamp (left-aligned, padded to 15 columns), then the message.
logger_format = '%(asctime)-15s %(message)s'
logging.basicConfig(format=logger_format)

In [None]:
# Kafka wiring: the stream is read from `topic`, and the windowed aggregates
# are published to `target_topic`, both on the broker(s) listed in `brokers`.
brokers = '127.0.0.1:9092'
topic = 'stockTopic'
target_topic = 'transformedStockTopic'

In [None]:
def on_shutdown(spark):
    '''
    Shutdown hook to be called before the interpreter exits.

    Stops the given Spark session. Any failure raised while stopping is
    logged as a warning and swallowed, so the exit sequence is never
    interrupted by this hook.

    :param spark: object exposing a no-argument stop() method
                  (the SparkSession in this notebook)
    :return: None
    '''
    try:
        logger.info("Stopping Spark Streaming Application")
        # SparkSession.stop() takes no arguments in PySpark.
        spark.stop()
        logger.info("Application stopped")
    except Exception as exc:
        # Best effort: this runs during interpreter exit, so report and move on.
        logger.warning(f'Failed to stop Spark cleanly, due to {exc}')

In [None]:
def _parse_stock_time(raw):
    '''
    Parse a 'YYYY-MM-DDTHH:MM:SSZ' string into a naive datetime.

    :param raw: time string taken from the message payload, or None
    :return: datetime, or None when raw is missing or does not match the format
    '''
    if raw is None:
        return None
    try:
        return datetime.strptime(raw, '%Y-%m-%dT%H:%M:%SZ')
    except (TypeError, ValueError):
        return None


def process_stream(pyspark_stream):
    '''
    Aggregate the stock stream over sliding event-time windows and start two sinks.

    A `timestamp` column is derived from the string column `Time`. Rows are then
    grouped by `StockSymbol` over 10-second windows sliding every 5 seconds, with a
    20-second watermark, producing the average, minimum and maximum `Price` and the
    row count per group. The result is written in append mode to the console
    (query `KafkaAggregate`) and to the Kafka topic `target_topic`
    (query `aggregate_kafka_streaming`).

    :param pyspark_stream: streaming DataFrame with columns StockSymbol, Price and Time
    :return: None
    '''
    # Function-scope imports keep min/max unambiguous: the Spark aggregates are
    # referenced explicitly instead of relying on a star import shadowing the builtins.
    from pyspark.sql import functions as F
    from pyspark.sql.types import TimestampType

    # Null-safe parsing: a missing or malformed Time value yields a null timestamp
    # instead of raising inside the UDF and failing the streaming query.
    time_udf = F.udf(_parse_stock_time, TimestampType())
    timestamped = pyspark_stream.withColumn('timestamp', time_udf(pyspark_stream['Time']))

    aggregate_stream = (
        timestamped
        .withWatermark("timestamp", "20 seconds")
        .groupBy(F.window("timestamp", "10 seconds", "5 seconds"), "StockSymbol")
        .agg(
            F.avg("Price").alias("Price"),
            F.min("Price").alias("minPrice"),
            F.max("Price").alias("maxPrice"),
            F.count("Price").alias("count"),
        )
        .select("window.start", "window.end", "StockSymbol", "Price", "minPrice", "maxPrice", "count")
    )

    # Console sink, for eyeballing the aggregates while the job runs.
    (
        aggregate_stream.writeStream
        .queryName('KafkaAggregate')
        .outputMode("append")
        .option("truncate", False)
        .format("console")
        .start()
    )

    # Kafka sink: key is the symbol, value is the whole aggregate row as JSON.
    # startingOffsets/endingOffsets are source-side options and are not set on the sink.
    (
        aggregate_stream
        .selectExpr("CAST(StockSymbol AS STRING) AS key", "to_json(struct(*)) AS value")
        .writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", brokers)
        .option("topic", target_topic)
        .option("checkpointLocation", "/tmp/sparkcheckpoint/")
        .queryName("aggregate_kafka_streaming")
        .outputMode("append")
        .start()
    )

In [None]:
if __name__ == '__main__':
    # Session with the Kafka connector package requested via spark.jars.packages.
    spark = (
        SparkSession.builder
        .appName('test')
        .config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2')
        .getOrCreate()
    )
    spark.sparkContext.setLogLevel("ERROR")

    # Layout of the JSON carried in each Kafka message value.
    schema = (
        StructType()
        .add("StockSymbol", StringType())
        .add("Price", DoubleType())
        .add("Time", StringType())
    )

    # Subscribe to the source topic, decode the value bytes as JSON and
    # flatten the parsed struct into top-level columns.
    stock_stream = (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", brokers)
        .option("subscribe", topic)
        .option("startingOffsets", "latest")
        .load()
        .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))
        .select(col("parsed_value.*"))
    )

    process_stream(stock_stream)

    # Register the shutdown hook so the session is stopped at interpreter exit.
    atexit.register(on_shutdown, spark)

    # Block until any of the started streaming queries terminates.
    spark.streams.awaitAnyTermination()