# Structured Streaming

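* Structured Streaming lets you take operations written with the Structured APIs in batch mode and run them as a stream. This lowers latency and enables incremental processing.
* Its main advantage is that batch code needs only small changes to become a streaming job, so you get results quickly.
* You can develop a prototype as a batch job and then convert it into a streaming job, which makes the concepts easy to pick up.
* The following examples use the retail dataset.
* Every record in the retail dataset carries a specific date and time.
* For now, we use the files in the by-day directory, where each file holds one day of data.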
-----------
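* Imagine a situation where several processes generate data continuously.
* Since we are working with retail data, assume that the data produced by retail stores is being delivered to storage that a Structured Streaming job can read.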

In [1]:
staticDataFrame = spark.read.format("csv")\
  .option("header", "true")\
  .option("inferSchema", "true")\
  .load("/data/retail-data/by-day/*.csv")

In [2]:
staticDataFrame.createOrReplaceTempView("retail_data")
staticSchema = staticDataFrame.schema

In [3]:
staticSchema

StructType(List(StructField(InvoiceNo,StringType,true),StructField(StockCode,StringType,true),StructField(Description,StringType,true),StructField(Quantity,IntegerType,true),StructField(InvoiceDate,StringType,true),StructField(UnitPrice,DoubleType,true),StructField(CustomerID,DoubleType,true),StructField(Country,StringType,true)))

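* Because this is time-series data, we need to know how to group and aggregate it.
* We will look at the sales hours during which a given customer (identified by CustomerId) makes large purchases.
* For example, let's add a total cost column and find the days on which each customer spent the most.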
------------
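* During aggregation, the window function builds windows over a time-series column. With a "1 day" window, each window holds all the data for one day.
* Windows are useful for dates and timestamps because you state the processing requirement as an interval.
* Spark then groups together the rows that fall into the same date window.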
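To make the 1-day window concrete, here is a minimal pure-Python sketch of what the groupBy(CustomerId, window(InvoiceDate, "1 day")) aggregation computes. It is not Spark's implementation: the function name tumbling_window_sum and the sample rows are hypothetical, and the sketch assumes naive UTC timestamps, because Spark aligns window boundaries to the Unix epoch.

```python
from collections import defaultdict
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)  # window boundaries aligned to the Unix epoch (UTC assumed)

def tumbling_window_sum(rows, window=timedelta(days=1)):
    """Sum UnitPrice * Quantity per (customer, window start, window end)."""
    totals = defaultdict(float)
    for customer_id, unit_price, quantity, invoice_date in rows:
        ts = datetime.strptime(invoice_date, "%Y-%m-%d %H:%M:%S")
        # Number of whole windows between the epoch and this timestamp
        n = (ts - EPOCH) // window
        start = EPOCH + n * window
        # total_cost = UnitPrice * Quantity, accumulated per (customer, window)
        totals[(customer_id, start, start + window)] += unit_price * quantity
    return dict(totals)

# Hypothetical sample rows: (CustomerId, UnitPrice, Quantity, InvoiceDate)
rows = [
    (16813.0, 1.5, 10, "2011-12-01 08:26:00"),
    (16813.0, 2.0, 3, "2011-12-01 17:40:00"),
    (16813.0, 4.0, 1, "2011-12-02 09:00:00"),
]
result = tumbling_window_sum(rows)
for (customer, start, end), total in sorted(result.items()):
    print(customer, start, end, total)
```

Both 2011-12-01 rows land in the same [2011-12-01, 2011-12-02) window and are summed, which is what the window struct column in the Spark output represents.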

In [4]:
from pyspark.sql.functions import window, column, desc, col
staticDataFrame\
  .selectExpr(
    "CustomerId",
    "(UnitPrice * Quantity) as total_cost",
    "InvoiceDate")\
  .groupBy(
    col("CustomerId"), window(col("InvoiceDate"), "1 day"))\
  .sum("total_cost")\
  .show(5)



+----------+--------------------+------------------+
|CustomerId|              window|   sum(total_cost)|
+----------+--------------------+------------------+
|   16813.0|{2011-12-01 00:00...|148.54000000000002|
|   13102.0|{2011-12-01 00:00...|308.05999999999995|
|   15311.0|{2011-11-18 00:00...|266.90999999999997|
|   14688.0|{2011-10-18 00:00...|            205.17|
|   16950.0|{2010-12-07 00:00...|             172.0|
+----------+--------------------+------------------+
only showing top 5 rows

In [5]:
spark.conf.set("spark.sql.shuffle.partitions", "5")

### Streaming Code
* The key difference is calling the readStream method instead of read.
* We also set the maxFilesPerTrigger option, which controls how many files are read per trigger (here, one file at a time).
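As a rough illustration of maxFilesPerTrigger, the sketch below splits a list of input files into micro-batches of at most N files, one batch per trigger. The plan_micro_batches helper and the file names are hypothetical. For simplicity the sketch orders files by name, whereas Spark's file source, to our understanding, orders new files by modification time.

```python
def plan_micro_batches(files, max_files_per_trigger):
    """Split input files into micro-batches of at most max_files_per_trigger files each."""
    if max_files_per_trigger < 1:
        raise ValueError("max_files_per_trigger must be >= 1")
    # Stable order (by name here, an assumption); each trigger picks up the next chunk
    ordered = sorted(files)
    return [ordered[i:i + max_files_per_trigger]
            for i in range(0, len(ordered), max_files_per_trigger)]

# Hypothetical by-day file names
files = ["2010-12-03.csv", "2010-12-01.csv", "2010-12-02.csv"]
for trigger, batch in enumerate(plan_micro_batches(files, 1)):
    print("trigger", trigger, "reads", batch)
```

With maxFilesPerTrigger set to 1, as in the cell that follows, every trigger processes exactly one day of data, which makes it easy to watch the result grow.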

In [6]:
streamingDataFrame = spark.readStream\
    .schema(staticSchema)\
    .option("maxFilesPerTrigger", 1)\
    .format("csv")\
    .option("header", "true")\
    .load("/data/retail-data/by-day/*.csv")

* Check whether the DataFrame is a streaming DataFrame.

In [7]:
streamingDataFrame.isStreaming

True

In [8]:
purchaseByCustomerPerHour = streamingDataFrame\
  .selectExpr(
    "CustomerId",
    "(UnitPrice * Quantity) as total_cost",
    "InvoiceDate")\
  .groupBy(
    col("CustomerId"), window(col("InvoiceDate"), "1 day"))\
  .sum("total_cost")

In [9]:
purchaseByCustomerPerHour.explain()

== Physical Plan ==
*(4) HashAggregate(keys=[CustomerId#73, window#97], functions=[sum(total_cost#83)])
+- StateStoreSave [CustomerId#73, window#97], state info [ checkpoint = <unknown>, runId = 18b350f8-e671-4bbd-8803-c941e564e27b, opId = 0, ver = 0, numPartitions = 5], Append, 0, 2
   +- *(3) HashAggregate(keys=[CustomerId#73, window#97], functions=[merge_sum(total_cost#83)])
      +- StateStoreRestore [CustomerId#73, window#97], state info [ checkpoint = <unknown>, runId = 18b350f8-e671-4bbd-8803-c941e564e27b, opId = 0, ver = 0, numPartitions = 5], 2
         +- *(2) HashAggregate(keys=[CustomerId#73, window#97], functions=[merge_sum(total_cost#83)])
            +- Exchange hashpartitioning(CustomerId#73, window#97, 5), ENSURE_REQUIREMENTS, [id=#126]
               +- *(1) HashAggregate(keys=[knownfloatingpointnormalized(normalizenanandzero(CustomerId#73)) AS CustomerId#73, window#97], functions=[partial_sum(total_cost#83)])
                  +- *(1) Project [named_struct(start, pre

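* This operation is also lazy, so you must call a streaming action to start the data flow.
* A streaming action behaves differently from an ordinary static action such as count: rather than returning a value, it has to write its results somewhere (a sink).
* The streaming action used here writes to an in-memory table (the memory sink) that is refreshed after each trigger. queryName sets the table name (customer_purchases), and outputMode("complete") rewrites the entire aggregated result on every trigger.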
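The StateStoreRestore and StateStoreSave steps in the physical plan above are how Spark carries the running aggregate from one trigger to the next. The toy class below is a hypothetical pure-Python model, not Spark code, of a complete-mode aggregation written to a memory sink: each trigger merges only the new rows into the stored state and then rewrites the whole result table.

```python
from collections import defaultdict

class CompleteModeMemorySink:
    """Toy model of a streaming sum aggregation written to an in-memory table in complete mode."""

    def __init__(self):
        self.state = defaultdict(float)  # running aggregate kept across triggers
        self.table = []                  # what a query against the sink would see

    def trigger(self, batch):
        # Aggregate only the newly arrived (key, value) rows into the stored state
        for key, value in batch:
            self.state[key] += value
        # Complete mode: replace the whole result table, ordered by total descending
        self.table = sorted(self.state.items(), key=lambda kv: kv[1], reverse=True)
        return self.table

sink = CompleteModeMemorySink()
print(sink.trigger([("a", 10.0), ("b", 5.0)]))
print(sink.trigger([("a", 1.0), ("c", 20.0)]))
```

Because the table is rebuilt after every trigger, querying customer_purchases twice can return different rows, as the two result tables below illustrate.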

In [10]:
purchaseByCustomerPerHour.writeStream\
    .format("memory")\
    .queryName("customer_purchases")\
    .outputMode("complete")\
    .start()

2022-06-08 05:29:35,555 WARN streaming.ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-a286d649-bb8c-4377-84b4-1016e6e0ebe6. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
2022-06-08 05:29:35,568 WARN streaming.ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


<pyspark.sql.streaming.StreamingQuery at 0x7fbd100b7bb0>

In [12]:
spark.sql("""
  SELECT *
  FROM customer_purchases
  ORDER BY `sum(total_cost)` DESC
  """)\
  .show(5)

+----------+--------------------+------------------+
|CustomerId|              window|   sum(total_cost)|
+----------+--------------------+------------------+
|      null|{2011-03-29 00:00...| 33521.39999999998|
|      null|{2010-12-21 00:00...|31347.479999999938|
|   18102.0|{2010-12-07 00:00...|          25920.37|
|      null|{2010-12-10 00:00...|25399.560000000012|
|      null|{2010-12-17 00:00...|25371.769999999768|
+----------+--------------------+------------------+
only showing top 5 rows

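* Running the same query at a different point in the stream returns a different result, because the in-memory table is updated after every trigger: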
+----------+--------------------+------------------+
|CustomerId|              window|   sum(total_cost)|
+----------+--------------------+------------------+
|      null|{2010-12-21 00:00...|31347.479999999938|
|   18102.0|{2010-12-07 00:00...|          25920.37|
|      null|{2010-12-10 00:00...|25399.560000000012|
|      null|{2010-12-17 00:00...|25371.769999999768|
|      null|{2010-12-06 00:00...|23395.099999999904|
+----------+--------------------+------------------+

# Spark Machine Learning

* Date format conversion (deriving the day of the week from InvoiceDate)
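For reference, date_format(col("InvoiceDate"), "EEEE") returns the full English weekday name. A plain-Python equivalent for a single InvoiceDate string might look like this; the day_of_week helper is hypothetical, and it uses a fixed English list so the result does not depend on the system locale.

```python
from datetime import datetime

DAY_NAMES = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"]

def day_of_week(invoice_date: str) -> str:
    """Return the full weekday name, similar to Spark's date_format(col, "EEEE")."""
    ts = datetime.strptime(invoice_date, "%Y-%m-%d %H:%M:%S")
    return DAY_NAMES[ts.weekday()]  # weekday(): Monday == 0, Sunday == 6

print(day_of_week("2011-12-05 08:38:00"))
```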

In [1]:
staticDataFrame = spark.read.format("csv")\
  .option("header", "true")\
  .option("inferSchema", "true")\
  .load("/data/retail-data/by-day/*.csv")

In [2]:
staticDataFrame.show(5)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   580538|    23084|  RABBIT NIGHT LIGHT|      48|2011-12-05 08:38:00|     1.79|   14075.0|United Kingdom|
|   580538|    23077| DOUGHNUT LIP GLOSS |      20|2011-12-05 08:38:00|     1.25|   14075.0|United Kingdom|
|   580538|    22906|12 MESSAGE CARDS ...|      24|2011-12-05 08:38:00|     1.65|   14075.0|United Kingdom|
|   580538|    21914|BLUE HARMONICA IN...|      24|2011-12-05 08:38:00|     1.25|   14075.0|United Kingdom|
|   580538|    22467|   GUMBALL COAT RACK|       6|2011-12-05 08:38:00|     2.55|   14075.0|United Kingdom|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
only showing top 5 rows



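* The coalesce operator reduces the number of partitions by merging existing ones without a full shuffle. It cannot increase the count; use repartition for that.
* The next cell also fills nulls in numeric columns with 0 (na.fill(0)) and adds a day_of_week column holding the full weekday name (date_format with the pattern "EEEE").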
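To illustrate why coalesce can only lower the partition count, here is a simplified pure-Python model that merges whole, adjacent partitions into at most num_partitions groups. The helper name and the even, adjacent grouping are assumptions made for clarity; Spark's actual coalesce also considers data locality when grouping partitions.

```python
def coalesce_partitions(partitions, num_partitions):
    """Merge whole input partitions into at most num_partitions output partitions."""
    if num_partitions < 1:
        raise ValueError("num_partitions must be >= 1")
    # coalesce never produces more partitions than it started with
    target = min(num_partitions, len(partitions))
    merged = [[] for _ in range(target)]
    for i, part in enumerate(partitions):
        # Each input partition goes wholesale to one output group; rows are not reshuffled
        merged[i * target // len(partitions)].extend(part)
    return merged

parts = [[1], [2], [3], [4], [5], [6]]
print(coalesce_partitions(parts, 3))   # three merged partitions
print(len(coalesce_partitions(parts, 10)))  # still 6, not 10
```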

In [3]:
from pyspark.sql.functions import date_format, col
preppedDataFrame = staticDataFrame\
  .na.fill(0)\
  .withColumn("day_of_week", date_format(col("InvoiceDate"), "EEEE"))\
  .coalesce(5)

In [4]:
preppedDataFrame.show(5)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+-----------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|day_of_week|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+-----------+
|   580538|    23084|  RABBIT NIGHT LIGHT|      48|2011-12-05 08:38:00|     1.79|   14075.0|United Kingdom|     Monday|
|   580538|    23077| DOUGHNUT LIP GLOSS |      20|2011-12-05 08:38:00|     1.25|   14075.0|United Kingdom|     Monday|
|   580538|    22906|12 MESSAGE CARDS ...|      24|2011-12-05 08:38:00|     1.65|   14075.0|United Kingdom|     Monday|
|   580538|    21914|BLUE HARMONICA IN...|      24|2011-12-05 08:38:00|     1.25|   14075.0|United Kingdom|     Monday|
|   580538|    22467|   GUMBALL COAT RACK|       6|2011-12-05 08:38:00|     2.55|   14075.0|United Kingdom|     Monday|
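+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+-----------+
only showing top 5 rows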

In [5]:
trainDataFrame = preppedDataFrame\
  .where("InvoiceDate < '2011-07-01'")
testDataFrame = preppedDataFrame\
  .where("InvoiceDate >= '2011-07-01'")

In [6]:
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer()\
  .setInputCol("day_of_week")\
  .setOutputCol("day_of_week_index")

In [7]:
from pyspark.ml.feature import OneHotEncoder
encoder = OneHotEncoder()\
  .setInputCol("day_of_week_index")\
  .setOutputCol("day_of_week_encoded")

In [8]:
from pyspark.ml.feature import VectorAssembler

vectorAssembler = VectorAssembler()\
  .setInputCols(["UnitPrice", "Quantity", "day_of_week_encoded"])\
  .setOutputCol("features")

In [9]:
from pyspark.ml import Pipeline

transformationPipeline = Pipeline()\
  .setStages([indexer, encoder, vectorAssembler])

In [None]:
# pyspark.ml depends on NumPy; run this only if it is not installed yet
%pip install numpy

In [10]:
fittedPipeline = transformationPipeline.fit(trainDataFrame)

In [11]:
transformedTraining = fittedPipeline.transform(trainDataFrame)

In [29]:
transformedTraining.select("features").show(5)

+--------------------+
|            features|
+--------------------+
|(7,[0,1,4],[2.95,...|
|(7,[0,1,4],[2.1,8...|
|(7,[0,1,4],[5.95,...|
|(7,[0,1,4],[1.65,...|
|(7,[0,1,4],[0.42,...|
+--------------------+
only showing top 5 rows



In [28]:
transformedTraining.select("features").head(5)

[Row(features=SparseVector(7, {0: 2.95, 1: 6.0, 4: 1.0})),
 Row(features=SparseVector(7, {0: 2.1, 1: 8.0, 4: 1.0})),
 Row(features=SparseVector(7, {0: 5.95, 1: 2.0, 4: 1.0})),
 Row(features=SparseVector(7, {0: 1.65, 1: 6.0, 4: 1.0})),
 Row(features=SparseVector(7, {0: 0.42, 1: 25.0, 4: 1.0}))]

In [27]:
transformedTraining.select("day_of_week_encoded").tail(5)

[Row(day_of_week_encoded=SparseVector(5, {})),
 Row(day_of_week_encoded=SparseVector(5, {})),
 Row(day_of_week_encoded=SparseVector(5, {})),
 Row(day_of_week_encoded=SparseVector(5, {})),
 Row(day_of_week_encoded=SparseVector(5, {}))]
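The empty vectors above follow from the encoders' defaults: StringIndexer orders labels by descending frequency (ties broken alphabetically), and OneHotEncoder drops the last category (dropLast=True), so the least frequent day in the training data becomes an all-zero vector. This also explains the sizes: a 5-element encoding means six distinct day names, and the 7-element features vector is UnitPrice, Quantity, and the 5 encoded positions. The sketch below mimics that behaviour in plain Python; the helper names and the day counts are hypothetical.

```python
from collections import Counter

def fit_string_indexer(values):
    """Map each label to an index: most frequent first, ties broken alphabetically."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: i for i, label in enumerate(ordered)}

def one_hot(index, num_categories, drop_last=True):
    """Return a sparse one-hot vector as (size, {position: 1.0})."""
    size = num_categories - 1 if drop_last else num_categories
    # With drop_last, the last index has no position, so it maps to the all-zero vector
    return (size, {index: 1.0} if index < size else {})

# Hypothetical day counts with six distinct labels
days = (["Thursday"] * 5 + ["Wednesday"] * 4 + ["Tuesday"] * 3
        + ["Monday"] * 2 + ["Friday"] * 2 + ["Sunday"])
indexer = fit_string_indexer(days)
for day in ["Thursday", "Friday", "Sunday"]:
    print(day, indexer[day], one_hot(indexer[day], len(indexer)))
```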

In [14]:
from pyspark.ml.clustering import KMeans
kmeans = KMeans()\
  .setK(20)\
  .setSeed(1)

In [15]:
kmModel = kmeans.fit(transformedTraining)

In [16]:
transformedTest = fittedPipeline.transform(testDataFrame)

In [17]:
predictions = kmModel.transform(transformedTest)

In [18]:
from pyspark.ml.evaluation import ClusteringEvaluator
evaluator = ClusteringEvaluator()

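#### Silhouette
* Measures how well separated the clusters are from one another.
* It is computed for each individual data point: with a as the point's mean distance to the other members of its own cluster and b as its mean distance to the members of the nearest other cluster, the point's score is (b - a) / max(a, b). The reported value is the average over all points; it ranges from -1 to 1, and values near 1 indicate well-separated clusters.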
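To make the definition concrete, here is a naive pure-Python silhouette that uses squared Euclidean distance, the measure named in the printed result below. It compares every pair of points, so it only suits tiny inputs and is not how ClusteringEvaluator computes the score internally. The function names and sample points are hypothetical, and singleton clusters score 0 by the usual convention.

```python
def squared_euclidean(p, q):
    return sum((a - b) ** 2 for a, b in zip(p, q))

def silhouette(points, labels, dist=squared_euclidean):
    """Mean silhouette score: s(i) = (b - a) / max(a, b), averaged over all points."""
    clusters = {}
    for point, label in zip(points, labels):
        clusters.setdefault(label, []).append(point)
    if len(clusters) < 2:
        raise ValueError("silhouette needs at least two clusters")
    scores = []
    for point, label in zip(points, labels):
        own = clusters[label]
        if len(own) == 1:
            scores.append(0.0)  # convention for singleton clusters
            continue
        # a: mean distance to the other members of the point's own cluster
        a = sum(dist(point, q) for q in own) / (len(own) - 1)
        # b: smallest mean distance to the members of any other cluster
        b = min(sum(dist(point, q) for q in members) / len(members)
                for other, members in clusters.items() if other != label)
        scores.append((b - a) / max(a, b) if max(a, b) > 0 else 0.0)
    return sum(scores) / len(scores)

points = [[0.0], [1.0], [10.0], [11.0]]
print(silhouette(points, [0, 0, 1, 1]))  # well-separated clusters: close to 1
print(silhouette(points, [0, 1, 0, 1]))  # poor assignment: negative
```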

In [19]:
silhouette = evaluator.evaluate(predictions)

In [20]:
print("Silhouette with squared euclidean distance = " + str(silhouette))

Silhouette with squared euclidean distance = 0.9335330049792616


In [30]:
centers = kmModel.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)

Cluster Centers: 
[3.61878368 5.72170058 0.19585629 0.19391248 0.18010294 0.17205723
 0.14921318]
[1.0400e+00 7.4215e+04 0.0000e+00 1.0000e+00 0.0000e+00 0.0000e+00
 0.0000e+00]
[ 1.6670865e+04 -1.0000000e+00  0.0000000e+00  0.0000000e+00
  0.0000000e+00  1.0000000e+00  0.0000000e+00]
[ 1.0400e+00 -7.4215e+04  0.0000e+00  1.0000e+00  0.0000e+00  0.0000e+00
  0.0000e+00]
[1.37047872e+03 2.12765957e-02 1.48936170e-01 1.70212766e-01
 4.25531915e-01 4.25531915e-02 2.12765957e-01]
[ 3.897e+04 -1.000e+00  0.000e+00  0.000e+00  0.000e+00  0.000e+00
  1.000e+00]
[ 5.43415e+03 -1.00000e+00  0.00000e+00  1.25000e-01  3.75000e-01
  0.00000e+00  5.00000e-01]
[8.45308642e-01 1.10427160e+03 2.46913580e-01 2.34567901e-01
 1.11111111e-01 1.97530864e-01 1.60493827e-01]
[1.16302932e+00 3.77254072e+02 3.01302932e-01 1.98697068e-01
 1.33550489e-01 1.93811075e-01 1.43322476e-01]
[ 0.000e+00 -5.368e+03  0.000e+00  0.000e+00  0.000e+00  1.000e+00
  0.000e+00]
[ 1.46892028 83.37677338  0.21217828  0.22284997 