# chap03 스파크 기능 둘러보기

In [1]:
spark

##  3.3 구조적 스트리밍

In [2]:
staticDataFrame = spark.read.format('csv').option('inferSchema', 'true').option('header', 'true')\
.csv('file:///home/ubuntu/study/spark/data/retail-data/by-day/*.csv')

createOrReplaceTempView 이용 DataFrame을 tempView로 변환 sql 사용가능

In [3]:
staticDataFrame.createOrReplaceTempView("retail_data")
staticSchema = staticDataFrame.schema

In [4]:
staticDataFrame.show(5)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   580538|    23084|  RABBIT NIGHT LIGHT|      48|2011-12-05 08:38:00|     1.79|   14075.0|United Kingdom|
|   580538|    23077| DOUGHNUT LIP GLOSS |      20|2011-12-05 08:38:00|     1.25|   14075.0|United Kingdom|
|   580538|    22906|12 MESSAGE CARDS ...|      24|2011-12-05 08:38:00|     1.65|   14075.0|United Kingdom|
|   580538|    21914|BLUE HARMONICA IN...|      24|2011-12-05 08:38:00|     1.25|   14075.0|United Kingdom|
|   580538|    22467|   GUMBALL COAT RACK|       6|2011-12-05 08:38:00|     2.55|   14075.0|United Kingdom|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
only showing top 5 rows



기존 관계형 데이터베이스는 칼럼과 칼럼간의 연산, 비교, 연결이나 집합에 대한 집계는 쉬운 반면,  
행과 행간의 관계를 정의하거나, 행과 행간을 비교, 연산하는 것을 하나의 SQL문으로 처리하는 것은 매우 어려운 일이었다.   
하지만, 윈도우 함수를 이용한다면 행과 행간의 관계를 쉽게 정의할 수 있게 된다.

In [5]:
from pyspark.sql.functions import col, window

staticDataFrame.selectExpr('CustomerID',"(UnitPrice * Quantity) as total_cost", "InvoiceDate")\
.groupBy(col('CustomerID'), window(col('InvoiceDate'), "1 day"))\
.sum('total_cost').orderBy(col('sum(total_cost)'))\
.show(5)

+----------+--------------------+-------------------+
|CustomerID|              window|    sum(total_cost)|
+----------+--------------------+-------------------+
|      null|[2011-01-05 00:00...| -29716.63000000005|
|      null|[2011-05-16 00:00...|-10387.250000000062|
|      null|[2011-02-21 00:00...|  -8437.31000000004|
|      null|[2010-12-07 00:00...| -8066.940000000085|
|      null|[2011-08-12 00:00...| -7862.940000000068|
+----------+--------------------+-------------------+
only showing top 5 rows



1. selectExpr : CustomerID, total_cost, InvoiceDate를 선택 
2. groupBy : CustomerID를 이용하여 1차적으로 묶고 InvoiceDate 를 기준으로 윈도우 형성하는데 1일이 기준, 
3. sum: CustomerID, 별 InvoiceDate(1일)로 묶여있는 상태에서 total_cost의 합을 구함

In [6]:
spark.conf.set("spark.sql.suffle.partitons","5")

### 구조적 스트리밍 설계

In [7]:
streamingDataFrame = spark.readStream\
.schema(staticSchema)\
.option('maxFilesPerTrigger',1)\
.option('header','true')\
.format('csv')\
.load('file:///home/ubuntu/study/spark/data/retail-data/by-day/*.csv')

In [8]:
streamingDataFrame.isStreaming

True

1. option에 추가한 maxFilesPerTrigger 는 한번에 읽을 파일 개수를 설정해준다. 

In [9]:
purchaseBycustomerPerHour = streamingDataFrame\
.selectExpr('CustomerID', "(Quantity * UnitPrice) as total_cost", "InvoiceDate")\
.groupBy(col('CustomerID'), window(col('InvoiceDate'), '1 day'))\
.sum('total_cost')

위의 코드는 마찬가지로 /스트리밍 지연 연산/이다. 
따라서 스트리밍 액션을 실행해주어야하는데, 스트리밍 액션은 
1. 트리거가 실행 (여기서는 파일 하나씩 더 읽어옴)
2. 데이터를 갱신하게 될 인메모리 테이블에 데이터를 저장

In [10]:
purchaseBycustomerPerHour.writeStream\
.format('memory')\
.queryName('customer_purchases')\
.outputMode('complete')\
.start()

<pyspark.sql.streaming.StreamingQuery at 0x7f356af787f0>

1. format('memory') :  인메모리 테이블에 저장
2. queryName('customer_purchases') : 인메모리 테이블에 저장될 테이블 명
3. outputMode('complete') : 모든 카운트 수행결과를 테이블에 저장

In [11]:
spark.sql('SELECT * FROM customer_purchases ORDER BY "sum(total_cost)" DESC').show(5)

+----------+------+---------------+
|CustomerID|window|sum(total_cost)|
+----------+------+---------------+
+----------+------+---------------+



## 3.4 머신러닝과 고급 분석

In [12]:
staticDataFrame.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: double (nullable = true)
 |-- Country: string (nullable = true)



머신러닝 알고리즘을 위해 변수들을 수치형데이터로 변환

In [23]:
from pyspark.sql.functions import date_format, col

preppedDataFrame = staticDataFrame\
.na.fill(0).withColumn('day_of_week', date_format(col('InvoiceDate'),'EEEE')).coalesce(5)

In [24]:
preppedDataFrame.show(3)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+-----------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|day_of_week|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+-----------+
|   580538|    23084|  RABBIT NIGHT LIGHT|      48|2011-12-05 08:38:00|     1.79|   14075.0|United Kingdom|     Monday|
|   580538|    23077| DOUGHNUT LIP GLOSS |      20|2011-12-05 08:38:00|     1.25|   14075.0|United Kingdom|     Monday|
|   580538|    22906|12 MESSAGE CARDS ...|      24|2011-12-05 08:38:00|     1.65|   14075.0|United Kingdom|     Monday|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+-----------+
only showing top 3 rows



train/test split

In [25]:
trianDataFrame = preppedDataFrame.where("InvoiceDate < '2011-07-01'")
testDataFrame = preppedDataFrame.where("InvoiceDate >= '2011-07-01'")
trianDataFrame.count(), testDataFrame.count()

(245903, 296006)

In [26]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

indexer = StringIndexer().setInputCol("day_of_week").setOutputCol('day_of_week_index')
encoder = OneHotEncoder().setInputCol("day_of_week_index").setOutputCol('day_of_week_encoded')
vectorAssembler = VectorAssembler().setInputCols(['UnitPrice','Quantity',"day_of_week_encoded"]).setOutputCol('features')

1. StringIndexer : 문자열을 수치형으로 변환  
2. OneHotEncoder : 이산값을 벡터로 변환
3. VectorAssembler : 스파크의 모든 머신러닝 알고리즘은 수치형 벡터 타입을 입력으로 사용

In [27]:
from pyspark.ml import Pipeline
transformationPipeline = Pipeline().setStages([indexer, encoder, vectorAssembler])

1. Pipeline : 위의 변환자들을 뭉처 하나의 파이프라인 만들기

In [28]:
fittedPipeline = transformationPipeline.fit(trianDataFrame)

In [30]:
transformedTraining = fittedPipeline.transform(trianDataFrame)

데이터를 캐싱하여 더빠르게 처리 가능 

In [31]:
transformedTraining.cache()

DataFrame[InvoiceNo: string, StockCode: string, Description: string, Quantity: int, InvoiceDate: timestamp, UnitPrice: double, CustomerID: double, Country: string, day_of_week: string, day_of_week_index: double, day_of_week_encoded: vector, features: vector]

In [35]:
from pyspark.ml.clustering import KMeans

kmeans = KMeans().setK(20).setSeed(3)

In [36]:
kmModel = kmeans.fit(transformedTraining)

In [38]:
transformedTest = fittedPipeline.transform(testDataFrame)
kmModel.computeCost(transformedTest)

552678468.9342725