# 구조적 스트리밍(Structured Streaming)

In [1]:
from pyspark.sql import SparkSession

# One local-mode SparkSession, reused by every cell in this notebook
# (getOrCreate returns the existing session if one is already running).
spark = (
    SparkSession.builder
    .master("local")
    .appName("chapter3")
    .getOrCreate()
)

24/09/13 14:35:13 WARN Utils: Your hostname, gimsehyeon-ui-MacBookPro.local resolves to a loopback address: 127.0.0.1; using 172.30.1.65 instead (on interface en0)
24/09/13 14:35:13 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/09/13 14:35:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# Batch read of every daily retail CSV file; the header row supplies column names
# and inferSchema asks Spark to derive column types from the data.
staticDataFrame = (
    spark.read
    .format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("./data/retail-data/by-day/*.csv")
)

# Expose the data to SQL and keep the inferred schema so the streaming reader
# can reuse it instead of inferring again.
staticDataFrame.createOrReplaceTempView("retail_data")
staticSchema = staticDataFrame.schema

                                                                                

In [4]:
from pyspark.sql.functions import window, col

# Batch version of the aggregation: total spend (UnitPrice * Quantity) per customer
# per 1-day window over InvoiceDate; prints the first 5 result rows.
(
    staticDataFrame
    .selectExpr(
        "CustomerId",
        "(UnitPrice * Quantity) as total_cost",
        "InvoiceDate",
    )
    .groupBy(col("CustomerId"), window(col("InvoiceDate"), "1 day"))
    .sum("total_cost")
    .show(5)
)



+----------+--------------------+-----------------+
|CustomerId|              window|  sum(total_cost)|
+----------+--------------------+-----------------+
|   16057.0|{2011-12-05 09:00...|            -37.6|
|   14126.0|{2011-11-29 09:00...|643.6300000000001|
|   13500.0|{2011-11-16 09:00...|497.9700000000001|
|   17160.0|{2011-11-08 09:00...|516.8499999999999|
|   15608.0|{2011-11-11 09:00...|            122.4|
+----------+--------------------+-----------------+
only showing top 5 rows



                                                                                

In [5]:
# Use only 5 shuffle partitions (Spark's documented default is 200), which is plenty
# for a single local machine; set here so the streaming aggregation below uses it.
spark.conf.set(key="spark.sql.shuffle.partitions", value="5")

In [6]:
# Streaming counterpart of the batch read: same files, but consumed incrementally.
# File stream sources need an explicit schema by default, so the one inferred by the
# batch read is reused; maxFilesPerTrigger=1 feeds one file per micro-batch.
streamingDataFrame = (
    spark.readStream
    .schema(staticSchema)
    .option("maxFilesPerTrigger", 1)
    .format("csv")
    .option("header", "true")
    .load("./data/retail-data/by-day/*.csv")
)

streamingDataFrame.isStreaming

                                                                                

True

In [7]:
# Streaming aggregation mirroring the batch cell: total spend per customer per
# 1-day window over InvoiceDate. This only defines the query; nothing executes
# until a sink is started with writeStream.
# NOTE(review): the name says "PerHour" but the window is "1 day"; the name is kept
# because the writeStream cell refers to it.
purchaseByCustomerPerHour = (
    streamingDataFrame
    .selectExpr(
        "CustomerId",
        "(UnitPrice * Quantity) as total_cost",
        "InvoiceDate",
    )
    .groupBy(col("CustomerId"), window(col("InvoiceDate"), "1 day"))
    .sum("total_cost")
)

In [9]:
# Start the streaming aggregation. In "complete" output mode the whole result table
# is rewritten into the in-memory table "customer_purchases" after each micro-batch,
# so later cells can query it with spark.sql.
# The StreamingQuery handle is kept (previously it was discarded) so the query can be
# inspected (.status, .lastProgress) and shut down with .stop() when finished.
customerPurchasesQuery = (
    purchaseByCustomerPerHour.writeStream
    .format("memory")
    .queryName("customer_purchases")
    .outputMode("complete")
    .start()
)

customerPurchasesQuery

24/09/13 14:51:32 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/t7/q3w9gqsj62l3jptkgcr3s_0w0000gn/T/temporary-9d9d0f55-bb0c-4318-bbf4-8ec63f8eb45c. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/09/13 14:51:32 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


<pyspark.sql.streaming.query.StreamingQuery at 0x107787d00>

                                                                                

In [10]:
# Query the in-memory sink table for the biggest customer/day totals.
# The streaming query keeps running in the background, so the rows shown reflect
# only the micro-batches completed so far; re-running this cell can give new results.
spark.sql("""
    SELECT *
    FROM customer_purchases
    ORDER BY `sum(total_cost)` DESC
    """).show(5)

                                                                                

+----------+--------------------+------------------+
|CustomerId|              window|   sum(total_cost)|
+----------+--------------------+------------------+
|      NULL|{2010-12-21 09:00...|31347.479999999938|
|   18102.0|{2010-12-07 09:00...|          25920.37|
|      NULL|{2010-12-10 09:00...|25399.560000000012|
|      NULL|{2010-12-17 09:00...|25371.769999999768|
|      NULL|{2010-12-06 09:00...|23395.099999999904|
+----------+--------------------+------------------+
only showing top 5 rows



                                                                                