# 스파크 완벽 가이드

## Chapter 3

스파크는 기본 요소인 저수준 API와 구조적 API 그리고 추가 기능을 제공하는 일련의 표준 라이브러리로 구성되어 있다.

<img src="IMG_7CCDDDB62FA3-1.jpeg" alight='left'>

스파크의 라이브러리는 그래프 분석, 머신러닝 그리고 스트리밍 등 다양한 작업을 지원하며, 컴퓨팅 및 스토리지 시스템과의 통합을 돕는 역할을 한다.

이 장에서는 다음과 같은 내용을 설명한다.
* spark-submit : 명령으로 운영용 애플리케이션 실행
* Dataset: 타입안정성(type-safe)을 제공하는 구조적 api
* 구조적 스트리밍
* RDD: 스파크의 저수준 api
* SparkR
* 서드파티 패키지 에코시스템

## 3.1 운영용 애플리케이션 실행하기

스파크를 사용하여 빅데이터 프로그램을 쉽게 개발할 수 있다. spark-submit명령을 사용해 대화형 셀에서 개발한 프로그램을 운영용 애플리케이션으로 쉽게 전환할 수 있다. spark-submit 명령은 애플리케이션 코드를 클러스터에 전송해 신행시키는 역할을 한다.

다음은 파이썬으로 작성한 애플리케이션을 실행하는 예제이다.

``` python
./bin/spark-submit\
    --master local\
    ./example/src/main/python/pi.py 10

``` 

## 3.2 Dataset : 타입 안정성을 제공하는 구조적 API 

Dataset은 정적 타입 코드를 지원하기 위해 고안된 스파크의 구조적 API이다. Dataset은 타입 안정성을 지원하며 동적 타입 언어인 파이썬과 R에서는 사용할 수 없다.

## 3.3 구조적 스트리밍

구조적 스트리밍은 스파크 2.2버전에서 안정화된 스트림 처리용 고수준 API이다. 구조적 스트리밍을 사용하면 구조적 API로 개발된 배치 모드의 연산을 스트리밍 방식으로 실행 할 수 있으며, 지연 시간을 줄이고 증분 처리할 수 있다.

In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [3]:
staticDataFrame = spark.read.format("csv")\
.option("header", "true")\
.option("interSchema","true")\
.load("./data/retail-data/by-day/*.csv")

In [4]:
staticDataFrame.createOrReplaceTempView("retail_data")
staticSchema = staticDataFrame.schema

In [6]:
staticDataFrame.show()

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   580538|    23084|  RABBIT NIGHT LIGHT|      48|2011-12-05 08:38:00|     1.79|   14075.0|United Kingdom|
|   580538|    23077| DOUGHNUT LIP GLOSS |      20|2011-12-05 08:38:00|     1.25|   14075.0|United Kingdom|
|   580538|    22906|12 MESSAGE CARDS ...|      24|2011-12-05 08:38:00|     1.65|   14075.0|United Kingdom|
|   580538|    21914|BLUE HARMONICA IN...|      24|2011-12-05 08:38:00|     1.25|   14075.0|United Kingdom|
|   580538|    22467|   GUMBALL COAT RACK|       6|2011-12-05 08:38:00|     2.55|   14075.0|United Kingdom|
|   580538|    21544|SKULLS  WATER TRA...|      48|2011-12-05 08:38:00|     0.85|   14075.0|United Kingdom|
|   580538|    23126|FELTCRA

In [7]:
from pyspark.sql.functions import window, col

In [8]:
staticDataFrame\
.selectExpr(
"CustomerId",
"(UnitPrice * Quantity) as total_cost",
"InvoiceDate")\
.groupby(
col("CustomerId"),window(col("InvoiceDate"),"1 day"))\
.sum("total_cost")\
.show(5)

+----------+--------------------+------------------+
|CustomerId|              window|   sum(total_cost)|
+----------+--------------------+------------------+
|   15274.0|[2011-12-05 09:00...|            332.58|
|   14719.0|[2011-12-08 09:00...|406.41999999999985|
|   16794.0|[2011-12-08 09:00...|100.66000000000003|
|   12464.0|[2011-11-29 09:00...|             281.9|
|   15269.0|[2011-11-16 09:00...|             408.8|
+----------+--------------------+------------------+
only showing top 5 rows



In [11]:
spark.conf.set("spark.sql.shuffle.partitions",5)

지금 까지 동장 방식을 알아봤다. 이제 스트리밍 코드를 살펴보겠다. read메서드 대신 readStream 메서드를 사용하는 게 가장 큰 차이점이다.
그리고 maxFilePerTrigger옵션을 추가로 지정한다. 이 오션을 사용해 한 번 읽을 파일수를 설정할 수 있다.

In [12]:
streamingDataFrame = spark.readStream\
.schema(staticSchema)\
.option("maxFilesPerTrigger",1)\
.format("csv")\
.option("header","true")\
.load("./data/retail-data/by-day/*.csv")

In [13]:
streamingDataFrame.isStreaming

True

In [15]:
purchaseByCustomerPerHour = streamingDataFrame\
.selectExpr(
"CustomerId",
"(UnitPrice * Quantity) as total_cost",
"InvoiceDate")\
.groupBy(
col("CustomerId"),window(col("InvoiceDate"),"1 day"))\
.sum("total_cost")

이 작업 역시 지연 연산이므로 데이터 플로를 실행하기 위해 스트리밍 액션을 호출 해야 한다. 

여기서 사용할 스트리밍 액션은 **트리거**가 실행된 다음 데이터를 갱신하게 될 인메모리 테이블에 데이터를 저장한다. 

In [16]:
purchaseByCustomerPerHour.writeStream\
.format("memory")\
.queryName("customer_purchases")\
.outputMode("complete")\
.start()

<pyspark.sql.streaming.StreamingQuery at 0x7fa4a411a850>

In [17]:
spark.sql("""
    SELECT *
    FROM customer_purchases
    ORDER BY 'sum(total_cost)' DESC
    """).show(5)

+----------+--------------------+------------------+
|CustomerId|              window|   sum(total_cost)|
+----------+--------------------+------------------+
|   12915.0|[2010-12-02 09:00...|199.64999999999998|
|   12763.0|[2010-12-05 09:00...|            320.08|
|   14708.0|[2010-12-06 09:00...|            419.06|
|   13136.0|[2010-12-09 09:00...|             -20.1|
|   14293.0|[2010-12-06 09:00...|364.91999999999985|
+----------+--------------------+------------------+
only showing top 5 rows



In [18]:
purchaseByCustomerPerHour.writeStream\
.format("console")\
.queryName("customer_purchases_2")\
.outputMode("complete")\
.start()

<pyspark.sql.streaming.StreamingQuery at 0x7fa4a4415490>

## 3.4 머신러닝과 고급 분석

스파크가 인기 있는 또 다른 이유는 내장된 머신러닝 알고리즘 라이브러리인 MLilb을 사용해 대규모 머신러닝을 수행할 수 있기 때문이다.

In [19]:
streamingDataFrame.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: string (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: string (nullable = true)
 |-- CustomerID: string (nullable = true)
 |-- Country: string (nullable = true)



DataFrame 트랜스포메이션을 사용해 날짜 데이터를 다루는 예제

In [20]:
from pyspark.sql.functions import date_format,col

In [21]:
preppedDataFrame = staticDataFrame\
.na.fill(0)\
.withColumn("day_of_week",date_format(col("InvoiceDate"),"EEEE"))\
.coalesce(5)

In [22]:
trainDataFrame = preppedDataFrame.where("InvoiceDate < '2011-07-01'")
testDataFram = preppedDataFrame.where("InvoiceDate >= '2011-07-01'")

In [23]:
trainDataFrame.count()


245903

In [24]:
testDataFram.count()

296006

스파크 Mlib는 일반적인 트랜스포메이션을 자동화하는 다양한 트랜스포메이션을 제공한다. 그중 하나가 StringIndexer이다.

In [25]:
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer()\
.setInputCol("day_of_week")\
.setOutputCol("day_of_index")


날짜 oneHotEncoder로 나타내기

In [26]:
from pyspark.ml.feature import OneHotEncoder

In [27]:
encoder = OneHotEncoder()\
.setInputCol("day_of_index")\
.setOutputCol("day_of_week_encoder")

스파크의 모든 머리러닝 알고리즘은 수치형 벡터 타입을 입력으로 사용한다.

In [28]:
from pyspark.ml.feature import VectorAssembler

In [37]:
vectorAssembler = VectorAssembler()\
.setInputCols(["UnitPrice","Quantity","day_of_week_encoder"])\
.setOutputCol("features")

In [30]:
from pyspark.ml import Pipeline

In [31]:
transformationPipeline = Pipeline()\
.setStages([indexer,encoder,vectorAssembler])

학습 준비 과정은 두 단계로 이루어진다. 변환자를 데이터셋에 적합 시켜야 하고, 기본적으로 StringIndexer는 인덱싱할 고윳값의 수를 알아야 한다.

In [32]:
fittedPipeline = transformationPipeline.fit(trainDataFrame)

In [None]:
transformatTraning = fittedPipeline.transform(trainDataFrame)

캐싱을 사용하면 중간 변환된 데이터셋의 복사본을 메모리에 저장하므로 전체 파이프라인을 재실행하는 것보다 훨씬 빠르게 반복적으로 데이터셋에 접근할 수 있다.

모델 학습

In [78]:
from pyspark.ml.clustering import KMeans

kmeans = KMeans()\
.setK(20)\
.setSeed(1)

스파크에서 머신러닝 모델을 삭습시키는 과정은 두 단계로 진행된다. 
* 1. 아직 학습되지 않은 모델 초기화
* 2. 해당 모델을 학습시킨다.

MLlib의 DataFrame API제공하는 모든 알고리즘
* 학습 전 알고리즘 명칭: Algorithm
* 학습 후 알고리즘 명칭: ALgorithmModel

## 3.5 저수준 API

스파크는 RDD를 통해 자바와 파이썬 객체를 다루는 데 필요한 다양한 기본 기능(저수준 API)을 제공한다. <br>
스파크의 거의 모든 기능은 RDD를 기반으로 만들어졌다. 

간단한 숫자를 이용해 병렬화해 RDD를 생성하는 예제이다

In [41]:
from pyspark.sql import Row

In [42]:
spark.sparkContext.parallelize([Row(1),Row(2),Row(3)]).toDF

<bound method _monkey_patch_RDD.<locals>.toDF of ParallelCollectionRDD[15193] at readRDDFromFile at PythonRDD.scala:262>

## 3.6 SparkR

SparkR은 스파크를 R 언어로 사용하기 위한 기능이다.

## 3.7 스파크의 에코시스템과 패키지

스파크의 장점은 커뮤티니가 만들어낸 패키지 에코시스템과 다양한 기능이다.

## 3.8 정리

이 장에서는 스파크를 비즈니스와 기술적 문제 해결에 적용할 수 있는 다양한 방법을 알아보았다.