In [18]:
import findspark
from pyspark.sql import SparkSession

In [19]:
findspark.init()
spark = SparkSession.builder.appName("RetailByDayStructuredStreaming").getOrCreate()
spark.conf.set("spark.sql.shuffle.partitions", "5")

In [20]:
spark.sparkContext.setLogLevel("error")

In [21]:
staticDataFrame = spark.read.format("csv")\
    .option("header", "true")\
    .option("inferSchema", "true")\
    .load("./data/retail-data/by-day/*.csv")

In [22]:
staticDataFrame.createOrReplaceTempView("retail_data")

In [23]:
staticSchema = staticDataFrame.schema

In [24]:
from pyspark.sql import functions as func

In [25]:
staticDataFrame.selectExpr("CustomerId", "(UnitPrice * Quantity) as total_cost", "InvoiceDate")\
    .groupBy(func.col("CustomerId"), func.window(func.col("InvoiceDate"), "1 day"))\
    .sum("total_cost")\
    .show(5)

+----------+--------------------+------------------+
|CustomerId|              window|   sum(total_cost)|
+----------+--------------------+------------------+
|   13417.0|{2011-12-04 09:00...|            404.83|
|   15358.0|{2011-12-05 09:00...| 830.0600000000003|
|   15392.0|{2011-12-05 09:00...|304.40999999999997|
|   15290.0|{2011-12-05 09:00...|263.02000000000004|
|   16811.0|{2011-12-05 09:00...|             232.3|
+----------+--------------------+------------------+
only showing top 5 rows



## Streaming Code

In [26]:
streamingDataFrame = spark.readStream\
    .schema(staticSchema)\
    .option("maxFilesPerTrigger", 1)\
    .format("csv")\
    .option("header", "true")\
    .load("./data/retail-data/by-day/*.csv")

In [27]:
streamingDataFrame.isStreaming

True

In [28]:
purchaseByCustomerPerHour = streamingDataFrame\
    .selectExpr(
        "CustomerId",
        "(UnitPrice * Quantity) as total_cost",
        "InvoiceDate")\
    .groupBy(
        func.col("CustomerId"), func.window(func.col("InvoiceDate"), "1 day"))\
    .sum("total_cost")

- Streaming Action은 `count`메서드와 같은 일반적인 정적 액션과는 다른 특성을 가진다.
- 아래는 **트리거**가 실행된 다음 데이터를 갱신하게 될 인메모리 테이블에 데이터를 저장한다.
- 아래 예제는 파일마다 트리거를 실행한다.
- Spark는 이전 집계값 보다 더 큰 값이 발생한 경우에만 인메모리 테이블을 갱신하므로 언제나 가장 큰 값을 얻을 수 있다. 

In [29]:
purchaseByCustomerPerHour.writeStream\
    .format("memory")\
    .queryName("customer_purchases")\
    .outputMode("complete")\
    .start()

<pyspark.sql.streaming.query.StreamingQuery at 0x1144c5a60>

In [30]:
spark.sql("""
SELECT *
FROM customer_purchases
ORDER BY `sum(total_cost)` DESC
""")\
    .show(5)

+----------+--------------------+------------------+
|CustomerId|              window|   sum(total_cost)|
+----------+--------------------+------------------+
|   18102.0|{2010-12-07 09:00...|          25920.37|
|      NULL|{2010-12-06 09:00...|23395.099999999904|
|      NULL|{2010-12-03 09:00...| 23021.99999999999|
|      NULL|{2010-12-09 09:00...|15354.279999999955|
|      NULL|{2010-12-01 09:00...|12584.299999999988|
+----------+--------------------+------------------+
only showing top 5 rows



In [31]:
purchaseByCustomerPerHour.writeStream\
    .format("console")\
    .queryName("customer_purchases_2")\
    .outputMode("complete")\
    .start()

<pyspark.sql.streaming.query.StreamingQuery at 0x1144c7170>

-------------------------------------------
Batch: 0
-------------------------------------------
+----------+--------------------+------------------+
|CustomerId|              window|   sum(total_cost)|
+----------+--------------------+------------------+
|   12921.0|{2010-12-01 09:00...|             322.4|
|   16583.0|{2010-12-01 09:00...|233.45000000000002|
|   17897.0|{2010-12-01 09:00...|            140.39|
|   12748.0|{2010-12-01 09:00...|              4.95|
|   15350.0|{2010-12-01 09:00...|            115.65|
|   17809.0|{2010-12-01 09:00...|              34.8|
|   13747.0|{2010-12-01 09:00...|              79.6|
|   16250.0|{2010-12-01 09:00...|            226.14|
|   15983.0|{2010-12-01 09:00...|            440.89|
|   17511.0|{2010-12-01 09:00...|           1825.74|
|   14001.0|{2010-12-01 09:00...|            301.24|
|   17460.0|{2010-12-01 09:00...|              19.9|
|   18074.0|{2010-12-01 09:00...|             489.6|
|   12868.0|{2010-12-01 09:00...|             203.3|
| 

                                                                                

- 메모리 또는 콘솔로 출력하는 방식과 파일별로 트리거를 수행하는 방식은 실제 운영 환경에서 사용하는 것이 좋지 않다.
- 이벤트 시간에 따라 윈도우를 구성하는 방식에 주목할 필요가 있다