# DSE6220
# Week 7
# Spark Streaming

# Schema Definition
Define the schema of the incoming files using the appropriate data types. The datetime field (InvoiceDate) is read as a string and converted to a timestamp in the stream.
Drop the table and delete the checkpoint directory so the notebook can be re-run from a clean state.

In [0]:
from pyspark.sql.types import StructType, StructField, StringType , IntegerType, FloatType
from pyspark.sql import functions as F
# Switch Spark to its legacy (pre-3.0) datetime parser.
# NOTE(review): presumably needed so the to_timestamp call in format_data
# accepts the source's time values (e.g. single-digit hours) — confirm.
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

# Explicit schema for the incoming invoice CSV files, listed in the order the
# columns are expected to appear in the files. InvoiceDate stays a string here
# and is turned into a timestamp by format_data inside the stream.
# NOTE(review): UnitPrice is 32-bit FloatType, so float rounding error flows
# into totalSales; DoubleType/DecimalType would be more exact for money.
# NOTE(review): any non-numeric InvoiceNo values would not fit IntegerType —
# verify against the source data.
schema = StructType([
    StructField(column_name, column_type)
    for column_name, column_type in (
        ('InvoiceNo', IntegerType()),
        ('StockCode', StringType()),
        ('Description', StringType()),
        ('Quantity', IntegerType()),
        ('InvoiceDate', StringType()),
        ('UnitPrice', FloatType()),
        ('CustomerID', IntegerType()),
        ('Country', StringType()),
    )
])

In [0]:
%sql
-- Drop the output table so a re-run starts from an empty table.
drop table if exists merrimack.invoices

In [0]:
# Delete the streaming checkpoint directory (recursively) so the write query
# does not resume from a previous run's progress.
dbutils.fs.rm("dbfs:/FileStore/merrimack/invoices/invcheckpoint",recurse=True)

False

# Formatting and Stream Setup
Function adapted from the previous lab, extended to calculate total sales (Quantity × UnitPrice) within the function.

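Build the streaming aggregation to calculate total sales by invoice date and partition date. InvoiceDate has to be part of the grouping key because the watermark is defined on it: in append mode, Spark only allows a streaming aggregation when the watermarked event-time column is part of the aggregation key.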

In [0]:
def format_data(data):
    """Parse InvoiceDate and add the partition key and line-item total.

    Args:
        data: DataFrame (streaming or batch) with the columns of ``schema``,
            where InvoiceDate is still the raw string.

    Returns:
        A new DataFrame in which InvoiceDate is a timestamp parsed with the
        pattern 'dd-MM-yyyy HH:mm:ss', ``partdate`` is that timestamp
        formatted as 'yyyyMM', and ``totalSales`` is Quantity * UnitPrice.
    """
    # Replace every '.' with ':' and append ':00' seconds so the value fits
    # the 'dd-MM-yyyy HH:mm:ss' pattern.
    # NOTE(review): this implies raw times use '.' between hour and minute and
    # carry no seconds — confirm against the source files.
    # The regex is a raw string: '\.' in a plain literal is an invalid escape
    # sequence (SyntaxWarning on Python 3.12+).
    normalized = F.concat(F.regexp_replace('InvoiceDate', r'\.', ':'), F.lit(':00'))
    return (
        data
        .withColumn('InvoiceDate', F.to_timestamp(normalized, 'dd-MM-yyyy HH:mm:ss'))
        .withColumn('partdate', F.date_format('InvoiceDate', 'yyyyMM'))
        # Reference the columns by name so the expression resolves against the
        # frame being built rather than the caller's input DataFrame.
        .withColumn('totalSales', F.col('Quantity') * F.col('UnitPrice'))
    )

In [0]:
# Directory whose CSV files are read by the streaming source below.
source_dir = 'dbfs:/FileStore/merrimack/invoices/'

In [0]:
# NOTE(review): repeats the checkpoint cleanup from the earlier cell; running
# it again just removes the same directory before the stream is started.
dbutils.fs.rm("dbfs:/FileStore/merrimack/invoices/invcheckpoint",recurse=True)

In [0]:
%sql
-- Set merrimack as the current database.
use merrimack

In [0]:
# Streaming reader over the CSV files in source_dir, using the explicit schema
# defined above and treating each file's first line as a header.
df = (
    spark.readStream
    .format("csv")
    .schema(schema)
    .option('header', 'true')
    .load(source_dir)
)

In [0]:
"""
Apply the formatting function to the data and create an aggregation with a watermark. The watermark was required to do streaming aggregation. 
"""

df = format_data(df)

agg_df = df.withWatermark("InvoiceDate", "10 minutes") \
    .groupBy(['InvoiceDate','partdate'])\
    .agg(F.round(F.sum(F.col('totalSales')),2).alias('totalSales'))

# Displaying the Stream & Writing to a Table
Here I display the aggregated stream and write the data to a table partitioned by the partition date (`partdate`).

In [0]:
# Show the aggregated streaming DataFrame in the notebook for inspection.
display(agg_df)

InvoiceDate,partdate,totalSales
2023-03-22T11:20:00Z,202303.0,204.15
2023-01-07T10:54:00Z,202301.0,230.56
2021-04-07T09:02:00Z,202104.0,259.86
2021-01-08T08:28:00Z,202101.0,22.2
2023-04-28T11:22:00Z,202304.0,572.38
2023-05-30T11:45:00Z,202305.0,276.6
2021-06-15T09:37:00Z,202106.0,444.98
2021-05-09T09:32:00Z,202105.0,259.86
2022-06-01T09:38:00Z,202206.0,2187.71
2021-01-01T08:26:00Z,202101.0,139.12


In [0]:
 WriteStream = ( agg_df.writeStream
        .option('checkpointLocation',f'{source_dir}/invcheckpoint')
        .outputMode("append")
        .queryName('AppendQuery')
        .partitionBy('partdate')
        .toTable("merrimack.invoices"))

# Examine the Output Data & Optimize
This section displays the output table, its Delta history, and its partitions, then runs OPTIMIZE with Z-ordering on InvoiceDate.

In [0]:
%sql
-- Inspect the rows written by the streaming query.
SELECT * FROM merrimack.invoices

InvoiceDate,partdate,totalSales
2021-04-07T09:02:00Z,202104,259.86
2021-04-23T09:09:00Z,202104,350.4
2021-06-15T09:37:00Z,202106,444.98
2021-06-01T09:34:00Z,202106,22.2
2023-05-26T11:41:00Z,202305,1056.63
2023-05-03T11:23:00Z,202305,106.2
2023-05-09T11:35:00Z,202305,235.16
2021-03-22T09:01:00Z,202103,22.2
2021-03-03T09:00:00Z,202103,204.0
2023-01-07T10:54:00Z,202301,230.56


In [0]:
%sql
-- Show the table's version history: the CREATE TABLE plus one STREAMING UPDATE per committed micro-batch.
describe history merrimack.invoices

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
2,2024-06-12T21:53:26Z,2551193103357533,g27258752@gwu.edu,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> e80722e9-0821-4c75-88d5-ae98e7106208, epochId -> 1, statsOnLoad -> false)",,List(456803328417616),0330-191842-alkvksrc,1.0,WriteSerializable,True,"Map(numRemovedFiles -> 0, numOutputRows -> 50, numOutputBytes -> 40682, numAddedFiles -> 48)",,Databricks-Runtime/15.0.x-cpu-ml-scala2.12
1,2024-06-12T21:52:45Z,2551193103357533,g27258752@gwu.edu,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> e80722e9-0821-4c75-88d5-ae98e7106208, epochId -> 0, statsOnLoad -> false)",,List(456803328417616),0330-191842-alkvksrc,0.0,WriteSerializable,True,"Map(numRemovedFiles -> 0, numOutputRows -> 0, numOutputBytes -> 0, numAddedFiles -> 0)",,Databricks-Runtime/15.0.x-cpu-ml-scala2.12
0,2024-06-12T21:51:51Z,2551193103357533,g27258752@gwu.edu,CREATE TABLE,"Map(partitionBy -> [""partdate""], description -> null, isManaged -> true, properties -> {""delta.enableDeletionVectors"":""true""}, statsOnLoad -> false)",,List(456803328417616),0330-191842-alkvksrc,,WriteSerializable,True,Map(),,Databricks-Runtime/15.0.x-cpu-ml-scala2.12


In [0]:
# Stop the streaming write query returned by toTable in the write cell.
WriteStream.stop()

In [0]:
%sql
-- List the partdate partitions produced by partitionBy('partdate') in the write stream.
show partitions merrimack.invoices

partdate
202110
202101
202111
202304
202108
202109
202303
202206
202102
202105


In [0]:
%sql
-- Compact the table's data files and co-locate rows by InvoiceDate (Z-order).
OPTIMIZE merrimack.invoices ZORDER BY (InvoiceDate)

path,metrics
dbfs:/user/hive/warehouse/merrimack.db/invoices,"List(17, 47, List(814, 953, 829.8823529411765, 17, 14108), List(815, 849, 848.2340425531914, 47, 39867), 18, List(minCubeSize(107374182400), List(0, 0), List(48, 40682), 0, List(47, 39867), 17, null), 1, 48, 1, false, 0, 0, 1718229392923, 1718229413342, 4, 17, null, List(0, 0), 3, 3, 4340, 0, null)"
