In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName('Ops').getOrCreate()

# Read the CSV file


In [3]:
staticDataframe = spark.read.format("csv").option("header","true").option("inferShcema","true").load("/home/supriya/Documents/Supriya/UMBC_IS/UMBC_Courses/IS_700_IndStudy/Spark-The-Definitive-Guide-master/data/retail-data/by-day/*.csv")


# Create SQL table from retail data


In [4]:
staticDataframe.createOrReplaceTempView("retail_data")


# Create static schema of retail data- to be used while creating streaming dataframe


In [5]:
staticSchema = staticDataframe.schema


In [6]:
from pyspark.sql.functions import window, column, desc, col
staticDataframe.selectExpr("CustomerId","(UnitPrice *Quantity) as total_cost","InvoiceDate").groupBy(col("CustomerId"),window(col("InvoiceDate"),"1 day")).sum("total_cost").show(5)

+----------+--------------------+------------------+
|CustomerId|              window|   sum(total_cost)|
+----------+--------------------+------------------+
|   15274.0|[2011-12-04 19:00...|            332.58|
|   14719.0|[2011-12-07 19:00...|406.41999999999985|
|   16794.0|[2011-12-07 19:00...|100.66000000000003|
|   12464.0|[2011-11-28 19:00...|             281.9|
|   15269.0|[2011-11-15 19:00...|             408.8|
+----------+--------------------+------------------+
only showing top 5 rows



# set the number of partitions to 5 instead of default 200(for local machine)


In [7]:
spark.conf.set("spark.sql.shuffle.partitions","5")


# create streaming dataframe


In [8]:
streamingDataframe = spark.readStream.schema(staticSchema).option("maxFilesPerTrigger",1).format("csv").option("header","true").load("/home/supriya/Documents/Supriya/UMBC_IS/UMBC_Courses/IS_700_IndStudy/Spark-The-Definitive-Guide-master/data/retail-data/by-day/*.csv")


# verify if streaming dataframe is created

In [9]:
streamingDataframe.isStreaming

True

# Run a query on streaming data

In [11]:
purchaseByCustPerHour = streamingDataframe.selectExpr("CustomerId","(UnitPrice*Quantity) as total_cost","InvoiceDate").groupBy(col("CustomerId"),window(col("InvoiceDate"),"1 day")).sum("total_cost") 
print(purchaseByCustPerHour)

DataFrame[CustomerId: string, window: struct<start:timestamp,end:timestamp>, sum(total_cost): double]


# write to stream using  'memory' sink and in 'complete' outputmode

In [12]:
purchaseByCustPerHour.writeStream.format("memory").queryName("customer_purchases").outputMode("complete").start()


<pyspark.sql.streaming.StreamingQuery at 0x7f1be12a9cc0>

In [13]:
spark.sql("""select * from customer_purchases order by 'sum(total_cost)' desc """).show(5)


+----------+--------------------+---------------+
|CustomerId|              window|sum(total_cost)|
+----------+--------------------+---------------+
|   16027.0|[2011-02-09 19:00...|         320.52|
|   12724.0|[2011-01-10 19:00...|          154.9|
|   16011.0|[2010-12-11 19:00...|         168.15|
|   13777.0|[2011-12-08 19:00...|         265.24|
|   15694.0|[2011-12-08 19:00...|          406.8|
+----------+--------------------+---------------+
only showing top 5 rows

