In [1]:
staticDF = spark.read.format('csv')\
                .option('header','true')\
                .option('inferSchema','true')\
                .load('data/retail-data/by-day/*.csv')

In [2]:
staticDF.createOrReplaceTempView('retail_data')
staticSchema = staticDF.schema
staticSchema

StructType([StructField('InvoiceNo', StringType(), True), StructField('StockCode', StringType(), True), StructField('Description', StringType(), True), StructField('Quantity', IntegerType(), True), StructField('InvoiceDate', TimestampType(), True), StructField('UnitPrice', DoubleType(), True), StructField('CustomerID', DoubleType(), True), StructField('Country', StringType(), True)])

In [3]:
from pyspark.sql.functions import window, column, desc, col
staticDF.selectExpr("CustomerId", "(UnitPrice * Quantity) as total_cost", "InvoiceDate")\
        .groupBy(col("CustomerId"), window(col("InvoiceDate"), "1 day"))\
        .sum("total_cost")\
        .show(5)

+----------+--------------------+-----------------+
|CustomerId|              window|  sum(total_cost)|
+----------+--------------------+-----------------+
|   16057.0|{2011-12-04 19:00...|            -37.6|
|   14126.0|{2011-11-28 19:00...|643.6300000000001|
|   13500.0|{2011-11-15 19:00...|497.9700000000001|
|   17160.0|{2011-11-07 19:00...|516.8499999999999|
|   15608.0|{2011-11-10 19:00...|            122.4|
+----------+--------------------+-----------------+
only showing top 5 rows



In [4]:
streamingDF = spark.readStream.schema(staticSchema)\
                   .option("maxFilesPerTrigger", 1)\
                   .format("csv")\
                   .option("header","true")\
                   .load("data/retail-data/by-day/*.csv")

In [5]:
streamingDF.isStreaming

True

In [6]:
purchaseByCustomerPerHour = streamingDF.selectExpr(
    "CustomerId",
    "(UnitPrice * Quantity) as total_cost",
    "InvoiceDate"
).groupBy(col("CustomerId"), window(col("InvoiceDate"), "1 day")).sum("total_cost")

In [7]:
purchaseByCustomerPerHour.writeStream.format("memory")\
    .queryName("customer_purchases")\
    .outputMode("complete")\
    .start()

<pyspark.sql.streaming.StreamingQuery at 0x7f8b0d538d00>

In [15]:
spark.sql("""
SELECT *
FROM customer_purchases
ORDER BY `sum(total_cost)` DESC
""").show(5)

+----------+--------------------+------------------+
|CustomerId|              window|   sum(total_cost)|
+----------+--------------------+------------------+
|      null|{2011-11-13 19:00...|          55316.08|
|      null|{2011-11-06 19:00...|          42939.17|
|      null|{2011-03-28 20:00...| 33521.39999999998|
|      null|{2011-12-07 19:00...|31975.590000000007|
|   18102.0|{2011-09-14 20:00...|31661.540000000005|
+----------+--------------------+------------------+
only showing top 5 rows

