<h1>Table of Contents<span class="tocSkip"></span></h1>
<div class="toc"><ul class="toc-item"><li><span><a href="#구조적-스트리밍(Structured-Streaming)" data-toc-modified-id="구조적-스트리밍(Structured-Streaming)-1"><span class="toc-item-num">1&nbsp;&nbsp;</span>구조적 스트리밍(Structured Streaming)</a></span></li><li><span><a href="#머신러닝과-고급분석" data-toc-modified-id="머신러닝과-고급분석-2"><span class="toc-item-num">2&nbsp;&nbsp;</span>머신러닝과 고급분석</a></span><ul class="toc-item"><li><span><a href="#데이터-전처리" data-toc-modified-id="데이터-전처리-2.1"><span class="toc-item-num">2.1&nbsp;&nbsp;</span>데이터 전처리</a></span></li><li><span><a href="#데이터-분할" data-toc-modified-id="데이터-분할-2.2"><span class="toc-item-num">2.2&nbsp;&nbsp;</span>데이터 분할</a></span></li><li><span><a href="#Feature-Engineering" data-toc-modified-id="Feature-Engineering-2.3"><span class="toc-item-num">2.3&nbsp;&nbsp;</span>Feature Engineering</a></span></li><li><span><a href="#Vector-화-시키기" data-toc-modified-id="Vector-화-시키기-2.4"><span class="toc-item-num">2.4&nbsp;&nbsp;</span>Vector 화 시키기</a></span></li><li><span><a href="#Pipeline-화-시키기" data-toc-modified-id="Pipeline-화-시키기-2.5"><span class="toc-item-num">2.5&nbsp;&nbsp;</span>Pipeline 화 시키기</a></span></li><li><span><a href="#모델-학습" data-toc-modified-id="모델-학습-2.6"><span class="toc-item-num">2.6&nbsp;&nbsp;</span>모델 학습</a></span></li><li><span><a href="#Cost(비용)-평가" data-toc-modified-id="Cost(비용)-평가-2.7"><span class="toc-item-num">2.7&nbsp;&nbsp;</span>Cost(비용) 평가</a></span></li></ul></li><li><span><a href="#저수준-API인-RDD" data-toc-modified-id="저수준-API인-RDD-3"><span class="toc-item-num">3&nbsp;&nbsp;</span>저수준 API인 RDD</a></span></li></ul></div>

# 구조적 스트리밍(Structured Streaming)

- 스트림 처리용 고수준 API
- 구조적 API(Dataset, DataFrame, SQL)로 개발된 배치 모드의 연산을 스트리밍 방식으로 실행 가능

In [1]:
staticDataFrame = spark\
                  .read.format('csv')\
                  .option('header', 'true')\
                  .option('inferSchema', 'true')\
                  .load('/Users/younghun/Desktop/gitrepo/data/spark_perfect_guide/retail-data/by-day/*.csv')

In [2]:
# SQL 사용하기 위해 임시 테이블 등록
staticDataFrame.createOrReplaceTempView('retail_data')

# 로드한 csv 데이터들의 스키마를 객체로 할당
staticSchema = staticDataFrame.schema

In [3]:
print(staticSchema, '\n', type(staticSchema))

StructType(List(StructField(InvoiceNo,StringType,true),StructField(StockCode,StringType,true),StructField(Description,StringType,true),StructField(Quantity,IntegerType,true),StructField(InvoiceDate,StringType,true),StructField(UnitPrice,DoubleType,true),StructField(CustomerID,DoubleType,true),StructField(Country,StringType,true))) 
 <class 'pyspark.sql.types.StructType'>


- 시계열 데이터를 다룰 때 타임스태프를 자주 다루기 떄문에 **윈도우 함수**를 자주 사용
- *총 구매비용 칼럼을 추가하고 고객별로 가장 많이 소비한 날*

In [4]:
staticDataFrame.show(3)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   580538|    23084|  RABBIT NIGHT LIGHT|      48|2011-12-05 08:38:00|     1.79|   14075.0|United Kingdom|
|   580538|    23077| DOUGHNUT LIP GLOSS |      20|2011-12-05 08:38:00|     1.25|   14075.0|United Kingdom|
|   580538|    22906|12 MESSAGE CARDS ...|      24|2011-12-05 08:38:00|     1.65|   14075.0|United Kingdom|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
only showing top 3 rows



In [5]:
# DataFrame 구문
from pyspark.sql.functions import window, col, desc

# 그룹핑 기준을 2개: 고객별, 날짜(1일) 별
staticDataFrame\
.selectExpr("CustomerID", "(UnitPrice * Quantity) AS total_cost", "InvoiceDate")\
.groupBy(col("CustomerID"), window(col("InvoiceDate"), "1 day"))\
.sum("total_cost")\
.sort(desc("sum(total_cost)"))\
.limit(5)\
.show()

+----------+--------------------+------------------+
|CustomerID|              window|   sum(total_cost)|
+----------+--------------------+------------------+
|   17450.0|[2011-09-20 09:00...|          71601.44|
|      null|[2011-11-14 09:00...|          55316.08|
|      null|[2011-11-07 09:00...|          42939.17|
|      null|[2011-03-29 09:00...| 33521.39999999998|
|      null|[2011-12-08 09:00...|31975.590000000007|
+----------+--------------------+------------------+



In [6]:
# Shuffle 파티션 수정하기
spark.conf.set('spark.sql.shuffle.partitions', '5')

- 스트리밍 코드로 변환하기
- ``maxFilesPerTrigger``로 한 번에 읽을 파일 개수 설정 가능

In [7]:
streamingDataFrame = spark\
                     .readStream\
                     .schema(staticSchema)\
                     .option('maxFilesPerTrigger', 1)\
                     .format('csv')\
                     .option('header', 'true')\
                     .load('/Users/younghun/Desktop/gitrepo/data/spark_perfect_guide/retail-data/by-day/*.csv')

In [8]:
# 스트리밍 유형 데이터프레임인지 확인
streamingDataFrame.isStreaming

True

In [11]:
streamingDataFrame.explain()

== Physical Plan ==
StreamingRelation FileSource[/Users/younghun/Desktop/gitrepo/data/spark_perfect_guide/retail-data/by-day/*.csv], [InvoiceNo#106, StockCode#107, Description#108, Quantity#109, InvoiceDate#110, UnitPrice#111, CustomerID#112, Country#113]




- 스트리밍 코드의 ``Transformations``

In [12]:
# 고객 별로 하루 당 총 판매금액 계산하는 로직
purchaseByCustomerPerHour = streamingDataFrame\
.selectExpr("CustomerID", "(UnitPrice * Quantity) AS total_cost", "InvoiceDate")\
.groupBy(col("CustomerID"), window(col("InvoiceDate"), "1 day"))\
.sum("total_cost")

- 코드실행위해 스트리밍 코드를 ``Action`` 하기
- 단, 스트리밍 액션은 ``count``와 같은 정적 액션과는 다름
- **트리거**가 실행된 다음 데이터를 갱신하게 될 인메모리 테이블에 데이터를 저장함
- 여기서는 파일마다 **트리거**가 실행됨


In [13]:
"""
Args:
    - memory : 인메모리 테이블에 데이터를 저장
    - customer_purchases : 인메모리에 저장될 테이블 이름
    - complete : 코드 수행 결과 모든 것을 테이블에 저장
"""
purchaseByCustomerPerHour.writeStream\
.format("memory")\
.queryName("customer_purchases")\
.outputMode("complete")\
.start()

<pyspark.sql.streaming.StreamingQuery at 0x7f9bce5f9250>

In [14]:
# 위에서 인메모리에 저장한 테이블에 데이터가 어떻게 기록되었는지 확인
spark.sql("""
SELECT *
FROM customer_purchases
ORDER BY 'sum(total_cost)' DESC""")\
.show(5)

+----------+--------------------+------------------+
|CustomerID|              window|   sum(total_cost)|
+----------+--------------------+------------------+
|   17576.0|[2010-12-13 09:00...|177.35000000000002|
|   17368.0|[2011-01-06 09:00...| 563.1500000000001|
|   15208.0|[2010-12-21 09:00...|              65.4|
|   15039.0|[2010-12-14 09:00...| 706.2500000000002|
|   16250.0|[2010-12-01 09:00...|            226.14|
+----------+--------------------+------------------+
only showing top 5 rows



In [15]:
# console에 출력시키기
purchaseByCustomerPerHour.writeStream\
.format("console")\
.queryName("customer_purchases_2")\
.outputMode("complete")\
.start()

<pyspark.sql.streaming.StreamingQuery at 0x7f9bce04b150>

# 머신러닝과 고급분석

In [16]:
staticDataFrame.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: double (nullable = true)
 |-- Country: string (nullable = true)



## 데이터 전처리
- Timestamp 변수에서 수치형 변수로 파생변수 만들기
- ``coalesce(num_of_partitions)``: RDD/DF의 파티션의 개수를 효율적인 방법으로 몇 개로 줄일 건지
    * 이와 비슷하게 파티션 개수를 늘였다 줄였다 할 수 있는 ``repartition()``도 있음!
    * 위 두 개의 메소드는 공통적으로 shuffle을 사용하는 Transformation이기 때문에 상당히 expensive한 operations들임. 그래서 이를 가능한 한 효율적으로 해주어야 함
    * ``coalesce()``가 ``repartiton()`` 메소드를 operation 비용에서 좀 더 최적화시킴. 파티션 간의 데이터 shuffle를 보다 낮게 이동. 예를 들면 기존의 파티션 5개(1번~5번) 중 3개로 줄이면 기존의 파티션 중 2개의 파티션에 들어있는 데이터들을 다른 파티션(3개)로 분할하면서 데이터의 이동을 줄임
    * 하지만 파티션을 **줄일**때만 사용 가능하다는 것



In [17]:
from pyspark.sql.functions import date_format, col

# day of week(요일) 변수 새로 만들기 -> 'EEEE' -> Monday 문자열 다나옴, 'EEE' -> Mon까지만 나옴
preppedDataFrame = staticDataFrame\
.na.fill(0)\
.withColumn('day_of_week', date_format(col("InvoiceDate"), "EEEE"))\
.coalesce(5)

In [18]:
preppedDataFrame.show(3)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+-----------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|day_of_week|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+-----------+
|   580538|    23084|  RABBIT NIGHT LIGHT|      48|2011-12-05 08:38:00|     1.79|   14075.0|United Kingdom|     Monday|
|   580538|    23077| DOUGHNUT LIP GLOSS |      20|2011-12-05 08:38:00|     1.25|   14075.0|United Kingdom|     Monday|
|   580538|    22906|12 MESSAGE CARDS ...|      24|2011-12-05 08:38:00|     1.65|   14075.0|United Kingdom|     Monday|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+-----------+
only showing top 3 rows



## 데이터 분할
- 데이터셋 분할 트랜스포메이션 API: ``TrainValidationSplit()``, ``CrossValidator()``도 있음
- 여기 예제에서는 ``where()``를 활용해 분할

In [19]:
trainDataFrame = preppedDataFrame.where("InvoiceDate < '2011-07-01'")
testDataFrame = preppedDataFrame.where("InvoiceDate >= '2011-07-01'")

# count of each dataframe
print('Train:', trainDataFrame.count())
print('Test:', testDataFrame.count())

Train: 245903
Test: 296006


## Feature Engineering
- 범주형 변수를 수치형으로 1:1 변환. 마치 Label Encoder임. 여기서는 ``StringIndexer``을 사용
- 하지만 만약 범주형 변수가 명목형 변수라고 한다면 서열 값이 반영되기 때문에 잘못된 방식임

In [20]:
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer()\
.setInputCol('day_of_week')\
.setOutputCol('day_of_week_index')

- 명목형 변수를 Boolean 수치형 변수로 바꾸기 위해 ``StringIndex`` 수치값을 ``OneHotEncoder`` 사용

In [24]:
from pyspark.ml.feature import OneHotEncoder

encoder = OneHotEncoder()\
.setInputCol('day_of_week_index')\
.setOutputCol('day_of_week_encoded')

- 위 ``StringIndexer``, ``OneHotEncoder`` 결과는 모두 벡터 타입을 구성할 컬럼 중 하나로 사용됨

## Vector 화 시키기
- Spark MLlib 머신러닝 알고리즘 입력으로 넣기 위해 변수들을 **벡터화**시켜야함

In [25]:
from pyspark.ml.feature import VectorAssembler

vectorAssembler = VectorAssembler()\
.setInputCols(['UnitPrice', 'Quantity', 'day_of_week_encoded'])\
.setOutputCol('features')

## Pipeline 화 시키기
- 요일(day of week)을 레이블 인코딩, 원-핫 인코딩 시키는 Transformation, 그리고 변수들을 벡터화시키는 벡터 어셈블러 Transformation을 파이프라인으로 만들기

In [26]:
from pyspark.ml import Pipeline

transformationsPipeline = Pipeline()\
.setStages([indexer, encoder, vectorAssembler])

In [33]:
# 1. 변환자(Transformer)를 적합(fit)시키기(변환시킬 데이터프레임 인자로 넣어주기)
fittedPipeline = transformationsPipeline.fit(trainDataFrame)

# 2. 파이프라인에 지정된 변환자들 사용해 데이터 변환(변환시킬 데이터프레임 인자로 넣어주기)
transformedTraining = fittedPipeline.transform(trainDataFrame)

transformedTest = fittedPipeline.transform(testDataFrame)

In [28]:
# 중간 변환된 데이터셋의 복사본을 메모리에 저장하는 캐싱코드 -> 이를 제거하고 학습 시 속도 차이가 매우 큼
transformedTraining.cache()

DataFrame[InvoiceNo: string, StockCode: string, Description: string, Quantity: int, InvoiceDate: string, UnitPrice: double, CustomerID: double, Country: string, day_of_week: string, day_of_week_index: double, day_of_week_encoded: vector, features: vector]

## 모델 학습

In [29]:
from pyspark.ml.clustering import KMeans

kmeans = KMeans().setK(20).setSeed(1)

In [30]:
# Pipeline 할당(캐싱버전)
import time
start = time.time()

kmModel = kmeans.fit(transformedTraining)
print("소요 시간:", round(time.time() - start, 3),'초')

소요 시간: 17.098 초


In [14]:
# Pipeline 할당(노캐싱버전)
import time
start = time.time()

kmModel = kmeans.fit(transformedTraining)
print("소요 시간:", round(time.time() - start, 3),'초')

소요 시간: 5.428 초


## Cost(비용) 평가
- 현재 KMeans 알고리즘이기 때문에 Cost(비용) = '각 군집 중심점과의 제곱거리의 합'

In [31]:
# Train 데이터에 대한 Cost
from pyspark.ml.evaluation import ClusteringEvaluator

# Train 데이터에 대해 예측
train_pred = kmModel.transform(transformedTraining)

evaluator = ClusteringEvaluator()
train_silhouette = evaluator.evaluate(train_pred)

print("Train 데이터에 대한 실루엣 지표:", train_silhouette)

Train 데이터에 대한 실루엣 지표: 0.6842576726028763


In [34]:
# Test 데이터에 대한 Cost
test_pred = kmModel.transform(transformedTest)

test_silhouette = evaluator.evaluate(test_pred)

print("Test 데이터에 대한 실루엣 지표:", test_silhouette)

Test 데이터에 대한 실루엣 지표: 0.5427938390491535


# 저수준 API인 RDD

- 대부분은 ``Dataset, DataFrame, SQL``과 같은 구조적 API를 사용하는 것이 좋음
- 그래도 RDD 이용하면 DataFrame 보다 더 세밀한 제어 가능
- 드라이버 시스템의 메모리에 저장된 원시 데이터를 병렬처리(parallelize)하는 데 RDD를 사용 가능

In [36]:
# 간단하게 RDD로 만들고 DataFrame으로 변환하기
from pyspark.sql import Row

rdd_df = spark.sparkContext.parallelize([Row(1), Row(2), Row(3)]).toDF() # 트랜스폼!
rdd_df.show() # 액션!

+---+
| _1|
+---+
|  1|
|  2|
|  3|
+---+

