# Code Starting Point

In [1]:
# Import all required function to generate streams, queries and views
from pyspark.sql import SparkSession
from IPython.display import display, clear_output
import time
from pyspark.sql import functions as F
from pyspark.sql.types import StructType,StringType, StructField, IntegerType, FloatType, BinaryType, LongType, DoubleType

In [2]:
# Initiate a local spark session
spark = SparkSession.builder \
        .appName('kafka') \
        .getOrCreate()

In [3]:
# Create a function to generate the first stream (all data columns with no filtering)

def generate_stocktrades_stream(keep_stream = False):
    
    # Define the raw Spark Stream
    stream_df = spark \
      .readStream \
      .format("kafka") \
      .option("kafka.bootstrap.servers", "broker:29092") \
      .option("startingOffsets", "latest") \
      .option("subscribe", "STOCKTRADES_JSON") \
      .load()
    
    # Convert to string types for JSON conversion
    string_stream_df = stream_df \
        .withColumn("key", stream_df["key"].cast(StringType())) \
        .withColumn('value', stream_df["value"].cast(StringType()))
    
    # Define the Schema for the end JSON format
    schema_stocktrades =  StructType([
        StructField("SIDE", StringType(),  True),
        StructField("QUANTITY", IntegerType(),  True),
        StructField("PRICE", IntegerType(),  True),
        StructField("SYMBOL", StringType(),  True),
        StructField("ACCOUNT", StringType(), True),
        StructField("USERID", StringType(), True)
])
    # Convert the string type to json format stream
    json_stream_df = string_stream_df\
    .withColumn("value", F.from_json("value", schema_stocktrades))
    stocktrades_stream_df = json_stream_df \
    .select( \
        F.col("key").alias("event_key"), \
        F.col("topic").alias("event_topic"), \
        F.col("timestamp").alias("event_timestamp"), \
        "value.side", \
        "value.quantity", \
        "value.price", \
        "value.symbol", \
        "value.account", \
        "value.userid"
    )
    
    # Export a queryable view od the stream
    
    if not keep_stream:
        return stocktrades_stream_df \
        .writeStream \
        .format("memory") \
        .queryName("stocktrades_view") \
        .start()
    else:
        return stocktrades_stream_df \
        .writeStream \
        .format("memory") \
        .queryName("stocktrades_view") \
        .start(), stocktrades_stream_df

In [None]:
# Run function to generate raw stocktrades stream
stocktrades_stream = generate_stocktrades_stream()

In [None]:
# Show results (limit 20) of stream
clear_output(wait=True)
display(spark.sql('SELECT * FROM stocktrades_view').show(20))
time.sleep(1)

In [None]:
# Stop the stream as it is no longer used
stocktrades_stream.stop()

In [None]:
# Create function to stream filtered streams from Kafka as streaming dataframes
def generate_side_stream(SIDE):
    
    # Create string for source
    source_stream = SIDE + "_TRADES"
    # Define the raw Spark Stream
    
    stream_df = spark \
      .readStream \
      .format("kafka") \
      .option("kafka.bootstrap.servers", "broker:29092") \
      .option("startingOffsets", "latest") \
      .option("subscribe", source_stream) \
      .load()
    
    # Convert to string types for JSON conversion
    string_stream_df = stream_df \
        .withColumn("key", stream_df["key"].cast(StringType())) \
        .withColumn('value', stream_df["value"].cast(StringType()))
    
    # Define the Schema for the end JSON format
    schema_stocktrades =  StructType([
        StructField("QUANTITY", IntegerType(),  True),
        StructField("PRICE", IntegerType(),  True),
        StructField("SYMBOL", StringType(),  True),
        StructField("ACCOUNT", StringType(), True),
        StructField("USERID", StringType(), True)
])
    # Convert the string type to json format stream
    json_stream_df = string_stream_df\
    .withColumn("value", F.from_json("value", schema_stocktrades))
    stocktrades_stream_df = json_stream_df \
    .select( \
        F.col("key").alias("event_key"), \
        F.col("topic").alias("event_topic"), \
        F.col("timestamp").alias("event_timestamp"), \
        "value.quantity", \
        "value.price", \
        "value.symbol", \
        "value.account", \
        "value.userid"
    )
    
    # Export a queryable view od the stream
    return stocktrades_stream_df \
    .writeStream \
    .format("memory") \
    .queryName(SIDE + '_view') \
    .start()

In [None]:
# Create views of the buy and sell streams
BUY_stream = generate_side_stream('BUY')
SELL_stream = generate_side_stream('SELL')

In [None]:
# Show results (limit 20) of stream
clear_output(wait=True)
display(spark.sql('SELECT * FROM BUY_view').show(20))
time.sleep(1)

In [None]:
# Show results (limit 20) of stream
clear_output(wait=True)
display(spark.sql('SELECT * FROM SELL_view').show(20))
time.sleep(1)

In [None]:
# Stop the two streams
BUY_stream.stop()
SELL_stream.stop()

In [4]:
### Create windowed dataframe

# Restart the raw data data frame
raw_stream, raw_stream_df = generate_stocktrades_stream(True)

# Create parameters for the window stream
window_duration = '60 seconds'
slide_duration = '10 seconds'

In [9]:
# Create the windowed stream (this groups by symbol and then counts the numbers of trades in the window, quantity of shares traded and the average price traded)
windowed_agg_df = raw_stream_df \
    .withWatermark('event_timestamp', '1 minutes') \
    .groupBy(F.window(raw_stream_df.event_timestamp, window_duration, slide_duration), raw_stream_df.symbol) \
    .agg(F.count('SYMBOL').alias('no_trades'), \
    F.sum('QUANTITY').alias('tot_quantity'), \
    F.avg('PRICE').alias('avg_price') \
     )

In [10]:
# Export the stream as a qeuryable view
windowed_agg_stream = windowed_agg_df \
                        .writeStream \
                        .format("memory") \
                        .outputMode("Complete") \
                        .queryName("windowed_view") \
                        .start()

In [11]:
# Show output from view
while True:
    clear_output(wait=True)
    display(spark.sql('SELECT * FROM windowed_view').show())
    time.sleep(1)

KeyboardInterrupt: 

In [12]:
raw_stream.stop()

In [13]:
windowed_agg_stream.stop()

In [14]:
spark.stop()