# 3.3 구조적 스트리밍

In [10]:
staticDataFrame = spark.read.format('csv')\
.option('header', 'true')\
.option('inferSchema', 'true')\
.load("./ybigta_de/input/sparkData/retail-data/by-day/*.csv")

In [11]:
staticDataFrame.createOrReplaceTempView('retail_data')
staticSchema = staticDataFrame.schema #스키마도 함께 생성

결과가 교재랑 다른데, 코드와 데이터가 완전 똑같은데 왜 다른지 도저히 모르겠습니다.
(교재 gitHub에서 복붙해서 실행도 해봤는데 똑같았습니다.)

In [12]:
# 윈도우 함수: 집계 시 시계열 컬럼을 기준으로 각 날짜에 대한 전체 데이터를 가지는 윈도우 구성

from pyspark.sql.functions import window, col

# 특정 고객(customerID)이 대량으로 구매하는 영업 시간
staticDataFrame\
  .selectExpr(
    "CustomerId",
    "(UnitPrice * Quantity) as total_cost",
    "InvoiceDate")\
  .groupBy(
    col("CustomerId"), window(col("InvoiceDate"), "1 day"))\
  .sum("total_cost")\
  .show(5)

+----------+--------------------+-----------------+
|CustomerId|              window|  sum(total_cost)|
+----------+--------------------+-----------------+
|   16057.0|[2011-12-05 00:00...|            -37.6|
|   14126.0|[2011-11-29 00:00...|643.6300000000001|
|   13500.0|[2011-11-16 00:00...|497.9700000000001|
|   17160.0|[2011-11-08 00:00...|516.8499999999999|
|   15608.0|[2011-11-11 00:00...|            122.4|
+----------+--------------------+-----------------+
only showing top 5 rows



In [13]:
# 스트리밍
# read -> readStream
# maxFilesPerTrigger

streamingDataFrame = spark.readStream\
    .schema(staticSchema)\
    .option('maxFilesPerTrigger', 1)\
    .format('csv')\
    .option('header', 'true')\
    .load('./ybigta_de/input/sparkData/retail-data/by-day/*.csv')

In [14]:
streamingDataFrame.isStreaming

True

In [15]:
# 총 판매 금액 계산

purchaseByCustomerPerHour = streamingDataFrame\
    .selectExpr(
        "CustomerID",
        "(UnitPrice * Quantity) as total_cost",
        "InvoiceDate")\
    .groupBy(
        col("CustomerId"), window(col("InvoiceDate"), "1 day"))\
    .sum('total_cost')
# 지연연산이므로 출력 x 액션 호출해야 함

In [19]:
purchaseByCustomerPerHour.writeStream\
    .format('memory')\
    .queryName('customer_purchases')\
    .outputMode('complete')\
    .start()
# 인메모리 테이블에 저장, 인메모리 테이블명, 모든 카운트 수행 결과 저장

<pyspark.sql.streaming.StreamingQuery at 0x7f6fa008cb50>

In [20]:
spark.sql("""
    SELECT * FROM customer_purchases ORDER BY `sum(total_cost)` DESC
""").show()

+----------+--------------------+------------------+
|CustomerId|              window|   sum(total_cost)|
+----------+--------------------+------------------+
|   18102.0|[2010-12-07 00:00...|          25920.37|
|      null|[2010-12-06 00:00...|23395.099999999904|
|      null|[2010-12-03 00:00...| 23021.99999999999|
|      null|[2010-12-01 00:00...|12584.299999999988|
|   15061.0|[2010-12-02 00:00...| 9407.339999999998|
|   13777.0|[2010-12-01 00:00...|           6585.16|
|   17850.0|[2010-12-02 00:00...|3891.8699999999985|
|   16029.0|[2010-12-01 00:00...|           3702.12|
|   17381.0|[2010-12-06 00:00...|           2567.64|
|   13089.0|[2010-12-06 00:00...|2496.2200000000003|
|   16210.0|[2010-12-01 00:00...|2474.7399999999993|
|   13081.0|[2010-12-03 00:00...|           2366.78|
|   16210.0|[2010-12-06 00:00...|2263.7999999999993|
|   17450.0|[2010-12-07 00:00...|           2028.84|
|   15061.0|[2010-12-07 00:00...|           2022.16|
|   16754.0|[2010-12-02 00:00...|            2

In [21]:
# 처리 결과 콘솔 출력

purchaseByCustomerPerHour.writeStream\
    .format('console')\
    .queryName('customer_purchases_2')\
    .outputMode('complete')\
    .start()

<pyspark.sql.streaming.StreamingQuery at 0x7f6fa009ec10>

# 3.4 머신러닝과 고급 분석

In [23]:
staticDataFrame.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: double (nullable = true)
 |-- Country: string (nullable = true)



In [24]:
from pyspark.sql.functions import date_format, col

preppedDataFrame = staticDataFrame\
    .na.fill(0)\
    .withColumn('day_of_week', date_format(col('InvoiceDate'), 'EEEE'))\
    .coalesce(5)

In [25]:
# train과 test로 분리

trainDataFrame = preppedDataFrame\
    .where("InvoiceDate < '2011-07-01'")
testDataFrame = preppedDataFrame\
    .where("InvoiceDate >= '2011-07-01'")

In [26]:
# 액션 호출하여 데이터 분리

trainDataFrame.count()
testDataFrame.count()

296006

In [27]:
# spark MLlib은 일반적인 트랜스포메이션을 자동화하는 다양한 트랜스포메이션 제공

from pyspark.ml.feature import StringIndexer

# 요일을 수치형을 반환 (토-6, 월-1)
indexer = StringIndexer()\
    .setInputCol('day_of_week')\
    .setOutputCol('day_of_week_index')

In [34]:
# 요일을 대소비교 가능한 숫자로 표현하는 것은 문제가 될 수 있어 OneHotEncoder로 불리언 값으로 표현

from pyspark.ml.feature import OneHotEncoder

encoder = OneHotEncoder()\
    .setInputCol('day_of_week_index')\
    .setOutputCol('day_of_week_encoded')

# 벡터 타입을 구성할 컬럼 중 하나로 사용
# 스파크의 모든 머신러닝 알고리즘은 수치형 벡터 타입을 입력으로 사용

In [35]:
from pyspark.ml.feature import VectorAssembler

vectorAssembler = VectorAssembler()\
    .setInputCols(['UnitPrice', 'Quantity' ,'day_of_week_encoded'])\
    .setOutputCol('features')

In [36]:
#  나중에 입력값으로 들어올 데이터가 같은 프로세스를 거쳐 변환되도록 파이프라인 설정

from pyspark.ml import Pipeline

transformationPipeline = Pipeline()\
    .setStages([indexer, encoder, vectorAssembler])

In [37]:
# transformer fit

fittedPipeline = transformationPipeline.fit(trainDataFrame)

In [38]:
transformedTraining = fittedPipeline.transform(trainDataFrame)

In [40]:
# 동일한 트랜스포메이션을 계속 반복할 수 없으므로 모델에 일부 하이퍼파라미터 튜닝값 제공
# 캐싱을 사용해 중간 변환된 데이터셋의 복사본을 메모리에 저장해 전체 파이프라인을 재실행하는것보다 빠르게 접근

transformedTraining.cache()

DataFrame[InvoiceNo: string, StockCode: string, Description: string, Quantity: int, InvoiceDate: timestamp, UnitPrice: double, CustomerID: double, Country: string, day_of_week: string, day_of_week_index: double, day_of_week_encoded: vector, features: vector]

In [44]:
# 모델 학습 위해 클래스 임포트 & 인스턴스 생성

from pyspark.ml.clustering import KMeans

kmeans = KMeans()\
  .setK(20)\
  .setSeed(1)

In [45]:
# 모델 학습
kmModel = kmeans.fit(transformedTraining)

In [46]:
# 비용 계산
kmModel.computeCost(transformedTraining)

transformedTest = fittedPipeline.transform(testDataFrame)
kmModel.computeCost(transformedTraining)

97324473.83355041

# 3.5 저수준 API

In [None]:
# 숫자를 병렬화해 RDD 생성
from pyspark.sql import Row

spark.sparkContex