In [13]:
staticDataFrame = spark.read.format("csv")\
.option("header","true")\
.option("inferSchema","true")\
.load("./data/retail-data/by-day/*.csv")

staticDataFrame.createOrReplaceTempView("retail_data")
staticSchema = staticDataFrame.schema

In [17]:
from pyspark.sql.functions import window, col

staticDataFrame\
.selectExpr(
"CustomerId",
"(UnitPrice * Quantity) as total_cost",
"InvoiceDate")\
.groupBy(
col("CustomerId"), window(col("InvoiceDate"), "1 day"))\
.sum("total_cost")\
.show(5)

+----------+--------------------+-----------------+
|CustomerId|              window|  sum(total_cost)|
+----------+--------------------+-----------------+
|   16057.0|[2011-12-05 09:00...|            -37.6|
|   14126.0|[2011-11-29 09:00...|643.6300000000001|
|   13500.0|[2011-11-16 09:00...|497.9700000000001|
|   17160.0|[2011-11-08 09:00...|516.8499999999999|
|   15608.0|[2011-11-11 09:00...|            122.4|
+----------+--------------------+-----------------+
only showing top 5 rows



In [19]:
spark.conf.set("spark.sql.shuffle.partitions", "5")

# 스트리밍 코드

In [20]:
streamingDataFrame = spark.readStream\
.schema(staticSchema).option("maxFilesPerTrigger", 1)\
.format("csv")\
.option("header", "true")\
.load("/data/retail-data/by-day/*.csv")

In [21]:
streamingDataFrame.isStreaming

True

In [25]:
purchaseByCustomerPerHour = streamingDataFrame\
.selectExpr(
"CustomerId",
"(UnitPrice * Quantity) as total_cost",
"InvoiceDate")\
.groupBy(
col("CustomerId"), window(col("InvoiceDate"), "1 day"))\
.sum("total_cost")

In [26]:
purchaseByCustomerPerHour.writeStream\
.format("memory")\
.queryName("customer_purchases")\
.outputMode("complete")\
.start()

<pyspark.sql.streaming.StreamingQuery at 0x11fb03fd0>

In [27]:
spark.sql("""
SELECT *
FROM customer_purchases
ORDER BY `sum(total_cost)`DESC
""")\
.show(5)

+----------+------+---------------+
|CustomerId|window|sum(total_cost)|
+----------+------+---------------+
+----------+------+---------------+



In [29]:
purchaseByCustomerPerHour.writeStream\
.format("console")\
.queryName("customer_purchases_2")\
.outputMode("complete")\
.start()

<pyspark.sql.streaming.StreamingQuery at 0x11fb21470>