In [1]:
# Load every CSV matching the 2011 glob as one batch DataFrame.
# The first line of each file is treated as a header, and Spark infers column types from the data.
staticDataFrame = (
    spark.read
    .format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("/FileStore/tables/2011*.csv")
)




In [2]:
# Keep the batch-inferred schema so the streaming reader below can reuse it instead of inferring one.
staticSchema = staticDataFrame.schema
# Make the batch data queryable from Spark SQL as `retail_data`.
staticDataFrame.createOrReplaceTempView("retail_data")

In [3]:
from pyspark.sql.functions import window, column, desc, col

# Batch version of the aggregation: total spend (UnitPrice * Quantity) per customer
# within each 1-day window of InvoiceDate.
dailyCustomerSpend = (
    staticDataFrame
    .selectExpr("CustomerId", "(UnitPrice * Quantity) as total_cost", "InvoiceDate")
    .groupBy(col("CustomerId"), window(col("InvoiceDate"), "1 day"))
    .sum("total_cost")
)
dailyCustomerSpend.show(n=5)

In [4]:
# Streaming counterpart of staticDataFrame: same files, same schema (reused from the batch read),
# but at most one new file is picked up per micro-batch so the incremental processing is observable.
streamingDataFrame = (
    spark.readStream
    .schema(staticSchema)
    .option("maxFilesPerTrigger", 1)
    .format("csv")
    .option("header", "true")
    .load("/FileStore/tables/2011*.csv")
)

In [5]:
# Sanity check: a DataFrame built from spark.readStream is expected to report True here.
streamingDataFrame.isStreaming

In [6]:
# Streaming version of the batch aggregation above: total spend per customer per 1-day
# InvoiceDate window. No data is processed until a writeStream query is started on it.
purchaseByCustomerPerDay = (
    streamingDataFrame
    .selectExpr("CustomerId", "(UnitPrice * Quantity) as total_cost", "InvoiceDate")
    .groupBy(col("CustomerId"), window(col("InvoiceDate"), "1 day"))
    .sum("total_cost")
)
# The window is one DAY, not one hour (matching the batch aggregation). This older, misleading
# name is kept as an alias only so existing cells that refer to it keep working.
purchaseByCustomerPerHour = purchaseByCustomerPerDay

In [7]:
# Run the streaming aggregation continuously into an in-memory table named "customer_purchases",
# which Spark SQL can query. "complete" mode re-emits the full aggregate table on every trigger.
# Keep the StreamingQuery handle so the query can be inspected (.status, .lastProgress)
# or shut down (.stop()) later; otherwise it keeps running with no easy reference to it.
customerPurchasesQuery = (
    purchaseByCustomerPerHour.writeStream
    .format("memory")
    .queryName("customer_purchases")
    .outputMode("complete")
    .start()
)
customerPurchasesQuery

In [8]:
# Peek at the in-memory sink: highest customer/day totals first.
# The streaming query started earlier keeps running after .start() returns, so this only
# reflects the micro-batches processed by the time the cell runs; re-run to see newer results.
topCustomerDays = spark.sql("""  SELECT *  FROM customer_purchases  ORDER BY `sum(total_cost)` DESC  """)
topCustomerDays.show(n=5)

In [9]:
# A second, independent streaming query over the same aggregation that writes each complete
# result table to the console sink.
# NOTE(review): the console sink prints to the driver's stdout, which may not appear in the
# notebook output (e.g. on Databricks it lands in the driver logs) — confirm that is intended.
# Keep the handle so this query can be stopped (.stop()) when no longer needed; otherwise it
# keeps running alongside the memory-sink query.
customerPurchasesConsoleQuery = (
    purchaseByCustomerPerHour.writeStream
    .format("console")
    .queryName("customer_purchases_2")
    .outputMode("complete")
    .start()
)
customerPurchasesConsoleQuery

In [10]:
from pyspark.sql.functions import date_format, col

# Prepare the batch data:
#   - replace nulls with 0 (na.fill with a numeric value only touches numeric columns),
#   - add `day_of_week`, the full weekday name formatted from InvoiceDate via pattern "EEEE",
#   - reduce to 5 partitions with coalesce (no full shuffle).
# NOTE(review): if CustomerID was inferred as a numeric column, missing customer IDs become 0
# here and get grouped as one pseudo-customer — confirm that is intended.
preppedDataFrame = (
    staticDataFrame
    .na.fill(0)
    .withColumn("day_of_week", date_format(col("InvoiceDate"), "EEEE"))
    .coalesce(5)
)

In [11]:
# Preview the prepared rows, including the new day_of_week column.
preppedDataFrame.show(n=5)