# 스파크 기초

## 라이브러리 설치 

In [1]:
#pip install pyspark
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import os
# Use the 'Malgun Gothic' font family for all plot text
# (presumably so Korean labels render; the font must be installed locally — confirm).
plt.rc('font', family='Malgun Gothic')
# Draw negative-number signs with the ASCII hyphen instead of the Unicode minus
# glyph, which some fonts do not provide.
plt.rcParams['axes.unicode_minus'] = False

# Pyspark - SQL
from pyspark import SparkContext
from pyspark.sql import SparkSession, SQLContext, Row, SparkSession
from pyspark.sql.functions import mean, col, split, regexp_extract, when, lit

# Pyspark - ML
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import QuantileDiscretizer

#sc = SparkContext.getOrCreate() 

## 스파크 세션 실행

In [2]:
# Build the session used by the rest of the notebook. getOrCreate() returns an
# already-existing session if there is one, otherwise it creates a new one.
spark = (
    SparkSession.builder
    .appName("Spark_Guide")
    .getOrCreate()
)

### 스파크 세션으로 간단한 데이터 프레임 생성

In [3]:
# Single-column DataFrame produced by spark.range(1000), with the column named "number".
myRange = spark.range(1000).toDF("number")

### 데이터프레임 트랜스포메이션 (DF 변경)

In [4]:
# Transformation: keep only the rows whose "number" is divisible by 2.
# This only describes the result; the count() action below triggers the work.
divisBy2 = myRange.where("number % 2 = 0")

### 액션 (연산 수행)
- 0~999 범위의 숫자 1000개 중 2로 나눈 나머지가 0인(짝수) 행의 개수를 세는 연산 실행

In [5]:
# Action: run the computation and return the number of rows in divisBy2.
divisBy2.count()

500

### 예제 활용
- 링크 : https://github.com/FVBros/Spark-The-Definitive-Guide/tree/master/data/flight-data/csv/2015-summary.csv
- 스파크는 세션 생성, 파일 읽기 등의 작업을 메서드 체이닝(`.`)으로 이어서 작성하며, 긴 체인을 여러 줄로 나눌 때는 줄 끝에 파이썬의 줄 연결 문자 `\`를 붙이거나 전체를 괄호로 감싼다

In [30]:
# Load the 2015 flight summary CSV. The "header" option takes column names from
# the first row and "inferSchema" asks Spark to infer the column types.
flightData2015 = (
    spark.read
    .option("inferSchema", "true")
    .option("header", "true")
    .csv("data/flight-data/csv/2015-summary.csv")
)

In [None]:
# Print Python's built-in help text for the `spark` session object.
help(spark)

In [41]:
# Register the DataFrame as a temporary view named "flight_data_2015" so that
# spark.sql() queries can refer to it by that name.
flightData2015.createOrReplaceTempView("flight_data_2015")

In [42]:
# SQL form of the aggregation: number of rows per destination country,
# queried against the "flight_data_2015" view.
sqlWay = spark.sql("""
SELECT DEST_COUNTRY_NAME, count(1)
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
""")

In [43]:
# DataFrame-API form of the per-destination row count.
dataFrameWay = (
    flightData2015
    .groupBy("DEST_COUNTRY_NAME")
    .count()
)

# Print the physical plan of the SQL version and of the DataFrame version
# so the two can be compared.
sqlWay.explain()
dataFrameWay.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[DEST_COUNTRY_NAME#146], functions=[count(1)])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#146, 200), ENSURE_REQUIREMENTS, [id=#210]
      +- HashAggregate(keys=[DEST_COUNTRY_NAME#146], functions=[partial_count(1)])
         +- FileScan csv [DEST_COUNTRY_NAME#146] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/c:/Users/REDTABLE/OneDrive/KyungJun/data/flight-data/csv/2015-su..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>


== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[DEST_COUNTRY_NAME#146], functions=[count(1)])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#146, 200), ENSURE_REQUIREMENTS, [id=#223]
      +- HashAggregate(keys=[DEST_COUNTRY_NAME#146], functions=[partial_count(1)])
         +- FileScan csv [DEST_COUNTRY_NAME#146] Batched: false, DataFilters: [], Format: CSV, Location: 

In [44]:
# Import the functions module under an alias instead of
# `from pyspark.sql.functions import max`, which would rebind Python's builtin
# max() for every cell that runs afterwards.
from pyspark.sql import functions as F

# Largest value in the "count" column; take(1) returns it as a one-element
# list of Row objects.
flightData2015.select(F.max("count")).take(1)

[Row(max(count)=370002)]

In [50]:
from pyspark.sql.functions import desc

# Top five destination countries by summed "count", expressed in SQL.
maxSql = spark.sql("""
SELECT DEST_COUNTRY_NAME, sum(count) as destination_total
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
ORDER BY sum(count) DESC
LIMIT 5
""")
maxSql.show()

# The same top-five query written with the DataFrame API.
(
    flightData2015
    .groupBy("DEST_COUNTRY_NAME")
    .sum("count")
    .withColumnRenamed("sum(count)", "destination_total")
    .sort(desc("destination_total"))
    .limit(5)
    .show()
)

+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+

+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+



In [47]:
# Print the physical plan of the top-five-destinations query instead of running it.
(
    flightData2015
    .groupBy("DEST_COUNTRY_NAME")
    .sum("count")
    .withColumnRenamed("sum(count)", "destination_total")
    .sort(desc("destination_total"))
    .limit(5)
    .explain()
)


== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- TakeOrderedAndProject(limit=5, orderBy=[destination_total#228L DESC NULLS LAST], output=[DEST_COUNTRY_NAME#146,destination_total#228L])
   +- HashAggregate(keys=[DEST_COUNTRY_NAME#146], functions=[sum(count#148)])
      +- Exchange hashpartitioning(DEST_COUNTRY_NAME#146, 200), ENSURE_REQUIREMENTS, [id=#391]
         +- HashAggregate(keys=[DEST_COUNTRY_NAME#146], functions=[partial_sum(count#148)])
            +- FileScan csv [DEST_COUNTRY_NAME#146,count#148] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/c:/Users/REDTABLE/OneDrive/KyungJun/data/flight-data/csv/2015-su..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,count:int>




# 3장 스파크 기능 둘러보기

In [52]:
# Read every CSV file under data/retail-data/by-day into one DataFrame, taking
# column names from the header row and inferring the column types.
staticDataFrame = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("data/retail-data/by-day/*.csv")
)

# Keep the schema in its own variable, and register the data as the
# "retail_data" temporary view.
staticSchema = staticDataFrame.schema
staticDataFrame.createOrReplaceTempView("retail_data")

In [None]:
from pyspark.sql.functions import window, column, desc, col

# Sum of UnitPrice * Quantity per customer per one-day window of InvoiceDate;
# display the first five rows of the result.
(
    staticDataFrame
    .selectExpr(
        "CustomerId",
        "(UnitPrice * Quantity) as total_cost",
        "InvoiceDate",
    )
    .groupBy(col("CustomerId"), window(col("InvoiceDate"), "1 day"))
    .sum("total_cost")
    .show(5)
)

# 책

In [33]:
# Split the book title on spaces and turn the words into an RDD with 2 partitions.
my_collection = "Spark The Definitive Guide : Big Data Processing Made Simple".split(" ")
words = spark.sparkContext.parallelize(my_collection, 2)

# Word -> value lookup table, wrapped in a broadcast variable.
supplementalData = {
    "Spark": 1000,
    "Definitive": 200,
    "Big": -300,
    "Simple": 100,
}
suppBroadcast = spark.sparkContext.broadcast(supplementalData)
suppBroadcast.value

# Pair every word with its looked-up value (0 when the word is not in the
# table), then sort the pairs by that value and collect them.
(
    words
    .map(lambda token: (token, suppBroadcast.value.get(token, 0)))
    .sortBy(lambda pair: pair[1])
    .collect()
)