### 설치 방법
- https://sparkdia.tistory.com/65
- https://spidyweb.tistory.com/199

In [1]:
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName("sample").master("local[*]").getOrCreate()

In [3]:
spark.conf.set("spark.sql.repl.eagerEval.enabled",True)

In [4]:
#DataFrame 생성 및 한 개의 컬럼과 1000개의 로우로 구성
# 해당 숫자들은 분산 컬렉션을 나타냄
myRange=spark.range(1000).toDF("number")

In [5]:
myRange

number
0
1
2
3
4
5
6
7
8
9


In [6]:
#스키마 : 컬럼과 컬럼의 타입을 정의한 목록

## 스마크의 핵심 데이터 구조
- 불편성 : 한번 생성하면 변경할 수 없음
- 트랜스포메이션 : DataFrame을 변경하기 위해 사용하는 명령

In [7]:
divisBy2=myRange.where("number%2=0")
#코드를 실행해도 결과는 출력되지 않음
#추상적인 트랜스포메이션만 지정한 상태여서 액션을 호출하지 않으면 
#실제 트랜스포메이션을 수행하지 않음

### 트랜스포메이션의 두 가지 유형
- 좁은 의존성 : 각 입력 파티션이 하나의 출력 파티션에만 영향을 미침
  - 하나의 파티션이 하나의 출력 파티션에만 영향을 미침
  - 스파크에서 파이프라이닝을 자동으로 수행함
    - 파이프라이닝이란? : 명령어를 순차적으로 실행하는 프로세서에 적용되는 기술로, 한 번에 하나의 명령어만 실행하는 것이 아니라 하나의 명령어가 실행되는 도중에 다른 명령어를 실행을 시작하는 식으로 동시에 여러 개의 명령어를 실행하는 기법
    - DataFrame에 여러 필터를 지정하는 경우, 모든 작업이 메모리에서 일어남
  - where 구문: 좁은 의존성을 가짐
- 넓은 의존성 : 하나의 입력 파티션이 여러 출력 파티션에 영향을 미침
  - 셔플 : 스파크가 클러스터에서 파티션을 교환하는 것
  - 스파크의 경우, 셔플의 결과를 디스크에 저장함

In [8]:
divisBy2=myRange.where("number%2=0")

### 지연 연산
- 지연 연산 : 스파크가 연산 그래프를 처리하기 직전까지 기다리는 동작 방식을 의미
  - 스파크는 특정 연산 명령이 내려진 즉시 데이터를 수정하지 않고 원시 데이터에 적용할 트랜스포메이션의 실행 계획을 생성함
  - 코드를 실행하는 마지막 순간까지 대기하다가 원형 DataFrame 트랜스포메이션을 간결한 물리적 실행 계획을 컴파일함
  - 이 과정을 거치며 전체 데이터 흐름을 최적화함
  - ex) DataFrame의 조건절 푸시다운 : 필터를 데이터소스로 위임하는 최적화 작업 수행
  
### 액션
- 트랜스포메이션을 사용해 논리적 실행 계획을 세울 수 있음
- 하지만 실제 연산을 수행하려면 액션 명령을 내려야함
- 액션 : 일련의 트랜스포메이션으로부터 결과를 계산하도록 지시하는 명령
  - count : DataFrame의 전체 레코드 수를 반환함
  - 세 가지 유형의 액션
    - 콘솔에서 데이터를 보는 액션
    - 각 언어로 된 네이티브 객체에 데이터를 모으는 액션
    - 출력 데이터소스에 저장하는 액션
  - 액션을 지정하면 스파크 잡이 시작됨
- 스파크 잡(job) : 필터(좁은 트랜스포메이션)를 수행한 후 파티션별로 레코드 수를 카운트(넓은 트랜스포메이션)함
  - 각 언어에 적합한 네이티브 객체에 결과를 모음
  - 스파크가 제공하는 스파크 UI로 클러스터에서 실행 중인 스파크 잡을 모니터링할 수 있음
  
  
### 스파크 UI
- 스파크 UI
  - 스파크 잡의 진행 상황을 모니터링할 때 사용함
  - 드라이버 노드 4040 포트로 접속할 수 있음
  - 로컬 모드 시 주소 : http://localhost:4040
  - 스파크 잡의 상태, 환경설정, 클러스터 상태 등의 정보를 확인할 수 있음

In [9]:
divisBy2.count()
#count 메서드 : DataFrame의 전체 레코드 수를 반환함

500

## 종합 예제
- 미국 교통통계국의 항공운항 데이터 중 일부


- 다양한 데이터 소스 지원
  - SparkSession의 DataFrameReader 클래스를 사용해서 읽음
  - 특정 파일 포맷과 몇 가지 옵션을 함께 설정함
  - 스키마 추론 기능과 파일의 첫 로우를 헤더로 지정하는 옵션도 함께 설정함
- 스키마 정보를 얻기 위해 데이터를 조금 읽음
  - 해당 로우의 데이터 타입을 스파크 데이터 타입에 맞게 분석함
  - 운영 환경에서는 데이터를 읽는 시점에 스키마를 엄격하게 지정하는 옵션을 사용해야 함

In [8]:
flightData2015=spark\
.read\
.option("inferSchma","true")\
.option("header","true")\
.csv("./Spark-The-Definitive-Guide-master/data/flight-data/csv/2015-summary.csv")

In [15]:
flightData2015.printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: string (nullable = true)



In [14]:
from pyspark.sql.types import *
# import org.apache.spark.sql.functions.col
# import org.apache.spark.sql.types.IntegerType
from pyspark.sql.functions import col
flightData2015.withColumn("string", col("count").cast("int"))

DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count,string
United States,Romania,15,15
United States,Croatia,1,1
United States,Ireland,344,344
Egypt,United States,15,15
United States,India,62,62
United States,Singapore,1,1
United States,Grenada,62,62
Costa Rica,United States,588,588
Senegal,United States,40,40
Moldova,United States,1,1


- 스칼라와 파이썬에서 사용하는 DataFrame : 불특정 다수의 로우와 칼럼을 가짐
  - 로우의 수를 알 수 없는 이유 : 데이터를 읽는 과정이 지연 연산 형태의 트랜스포메이션이기 때문
  - 각 컬럼의 데이터 타입을 추론하기 위해 적은 양의 데이터를 읽음
  - DataFrame에서 csv파일을 읽어 로컬 배열이나 리스트 형태로 변환하는 과정
    - csv파일 -> read -> DataFrame -> take(n) -> Array(row(....),row(....))

In [11]:
flightData2015.take(3)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count='15'),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count='1'),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count='344')]

#### NOTE
- sort 메서드는 DataFrame을 변경하지 않음
  - 트랜스포메이션으로 sort 메서드를 사용하면 이전의 DataFrame을 변환해 새로운 DataFrmae을 생성해 반환함
  - sort 메서드는 단지 트랜스포메이션이기 때문에 호출 시 데이터에 아무런 변화도 일어나지 않음
    - 실행 계획을 만들고 검토하여 클러스터에서 처리할 방법을 알아냄
    - 특정 DataFrame 객체에 explain 메서드를 호출하면 DataFrame의 계보나 스파크의 쿼리 실행 계획을 확인할 수 있음

In [12]:
flightData2015.sort("count").explain()

== Physical Plan ==
*(1) Sort [count#38 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(count#38 ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [id=#81]
   +- FileScan csv [DEST_COUNTRY_NAME#36,ORIGIN_COUNTRY_NAME#37,count#38] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/C:/Users/hwan0/Spark-The-Definitive-Guide-master/data/flight-data/csv/201..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:string>




- 실행 계획은 위에서 아래 방향으로 읽으며 최종 결과는 가장 위에, 데이터 소스는 가장 아래에 있음
  - 각 줄의 첫 번째 키워드(Sort, Exchange, FileScan) 주목
    - 특정 칼럼을 다른 컬럼과 비교하는 sort 메서드가 넓은 트랜스포메이션으로 동작하는 것을 볼 수 있음
    - 실행 계획은 디버깅과 스파크의 실행 과정을 이해하는 데 도움을 주는 도구일 뿐
- 트랜스포메이션 실행 계획을 시작하기 위해 액션을 호출함
  - 액션을 실행하려면 몇 가지 설정이 필요함
  - 스파크는 셔플 수행 시 기본적으로 200개의 셔플 파티션을 생성함
  - 이 값을 5로 설정해 셔플의 출력 파티션 수를 줄임

In [13]:
spark.conf.set("spark.sql.shuffle.partitions","5")
flightData2015.sort("count").take(2)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Singapore', count='1'),
 Row(DEST_COUNTRY_NAME='Moldova', ORIGIN_COUNTRY_NAME='United States', count='1')]

- 스파크의 프로그래밍 모델인 함수형 프로그래밍의 핵심 : 계보를 통해 입력 데이터에 수행한 연산을 전체 파티션에서 어떻게 재연산하는지 알 수 있음
  - 함수형 프로그래밍 : 데이터의 변환 규칙이 일정한 경우 같은 입력에 대해 항상 같은 출력을 생성함
  
- 사용자는 물리적 데이터를 직접 다루지 않음
  - 대신 앞서 설정한 셔플 파티션 파라미터와 같은 속성으로 물리적 실행 특성을 제어함
  - 앞서 셔플 파티션 수를 5로 설정했기 때문에 5개의 출력 파티션이 생성됨
  - 이 값을 변경하면 스파크 잡의 실제 실행 특성을 제어할 수 있음
  - 스파크 UI에 접속해 잡의 실행 상태와 스파크 잡의 물리적, 논리적 실행 특성을 확인할 수 있음

### DataFrame과 SQL
- 스파크는 언어에 상관없이 같은 방식으로 트랜스포메이션을 실행할 수 있음
  - 사용자가 SQL이나 DataFrame(R, 파이썬, 스칼라, 자바에서)으로 비즈니스 로직을 표현하면 스파크에서 실제 코드를 실행하기 전에 그 로직을 기본 실행 계획(explain 메서드를 호출해 실행 계획을 확인할 수 있음)으로 컴파일함
  - 스파크 SQL을 사용하면 모든 DataFrame을 테이블이나 뷰(임시 테이블)로 등록한 후 SQL쿼리를 사용할 수 있음
  - SQL 쿼리를 DataFrame 코드와 같은 실행 계획으로 컴파일하므로 둘 사이의 성능 차이는 없음
  - createOrReplaceTempView 메서드를 호출하면 모든 DataFrame을 테이블이나 뷰로 만들 수 있음

In [14]:
flightData2015.createOrReplaceTempView("flight_data_2015")

- 새로운 DataFrame을 반환하는 spark.sql 메서드로 SQL 쿼리를 실행함
  - spark는 SparkSession의 변수
  - DataFrame에 쿼리를 수행하면 새로운 DataFrame을 반환함
  - 로직을 작성할 때 반복적인 느낌이 들지만 매우 효율적임
  - 사용자는 어떤 상황에서든 가장 편리한 방식으로 트랜스포메이션을 지정할 수 있음

In [15]:
sqlWay=spark.sql("""
SELECT DEST_COUNTRY_NAME,COUNT(1)
FROM  flight_data_2015
GROUP BY DEST_COUNTRY_NAME
""")
dataFrameWay=flightData2015\
.groupby("DEST_COUNTRY_NAME")\
.count()
sqlWay.explain()
dataFrameWay.explain()

== Physical Plan ==
*(2) HashAggregate(keys=[DEST_COUNTRY_NAME#36], functions=[count(1)])
+- Exchange hashpartitioning(DEST_COUNTRY_NAME#36, 5), ENSURE_REQUIREMENTS, [id=#110]
   +- *(1) HashAggregate(keys=[DEST_COUNTRY_NAME#36], functions=[partial_count(1)])
      +- FileScan csv [DEST_COUNTRY_NAME#36] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/C:/Users/hwan0/Spark-The-Definitive-Guide-master/data/flight-data/csv/201..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>


== Physical Plan ==
*(2) HashAggregate(keys=[DEST_COUNTRY_NAME#36], functions=[count(1)])
+- Exchange hashpartitioning(DEST_COUNTRY_NAME#36, 5), ENSURE_REQUIREMENTS, [id=#129]
   +- *(1) HashAggregate(keys=[DEST_COUNTRY_NAME#36], functions=[partial_count(1)])
      +- FileScan csv [DEST_COUNTRY_NAME#36] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/C:/Users/hwan0/Spark-The-Definitive-Guide-master/data/flight-dat

- 스파크는 빅데이터 문제를 빠르게 해결하는 데 필요한 수백 개의 함수를 제공함
  - max 함수 : DataFrame의 특정 컬럼 값을 스캔하면서 이전 최댓값보다 더 큰 값을 찾음
    - 필터링을 수행해 단일 로우를 결과로 반환하는 트랜스포메이션임

In [16]:
spark.sql("SELECT MAX(count) FROM flight_data_2015").take(1)

[Row(max(count)='986')]

In [17]:
from pyspark.sql.functions import max
flightData2015.select(max("count")).take(1)

[Row(max(count)='986')]

In [18]:
maxSql=spark.sql("""
SELECT DEST_COUNTRY_NAME,SUM(count) AS destination_total
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
ORDER BY sum(count) DESC
LIMIT 5
""")
maxSql.show()

+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|         411352.0|
|           Canada|           8399.0|
|           Mexico|           7140.0|
|   United Kingdom|           2025.0|
|            Japan|           1548.0|
+-----------------+-----------------+



In [21]:
from pyspark.sql.functions import desc
flightData2015\
.groupBy("DEST_COUNTRY_NAME")\
.sum("count")\
.withColumnRenamed("sum(count)","destination_total")\
.sort(desc("destination_total"))\
.limit(5)\
.show()

AnalysisException: "count" is not a numeric column. Aggregation function can only be applied on a numeric column.

- 지향성 비순환 그래프
  - 실행 계획 단계
  - 액션이 호출되면 결과를 만들어냄
  - 각 단계는 불편성을 가진 신규 DataFrame을 생성함
    1. 첫 번째 단계
       - 데이터를 읽음
       - 스파크는 해당 DataFrame이나 자신의 원본 DataFrame에 액션이 호출되기 전까지 데이터를 읽지 않음
    2. 두 번째 단계
       - 데이터를 그룹화함
       - groupBy 메서드 호출되면 최종적으로 그룹화된 DataFrame을 지칭하는 이름을 가진 RelationalGroupedDataset을 반환함
       - 기본적으로 키 혹은 키셋을 기준으로 그룹을 만들고 각 키에 대한 집계를 수행함
    3. 세 번째 단계
       - 집계 유형을 지정하기 위해 컬럼 표현식이나 컬럼명을 인수로 사용하는 sum 메서드를 사용함
       - sum 메서드 : 새로운 스키마 정보를 가지는 별도의 DataFrame을 생성함
       - 신규 스키마에는 새로 만들어진 각 컬럼의 데이터 타입 정보가 들어 있음
       - sum 메서드 역시 트랜스포메이션이므로 아무런 연산도 일어나지 않음
       - 새롭게 생성된 결과 스키마를 통해 타입 정보를 추적함
    4. 네 번째 단계
       - 컬럼명을 변경함
       - withColumnRenamed 메서드에 원본 컬럼명과 신규 컬럼명을 인수로 지정함(트랜스포메이션)
       - 여전히 연산은 일어나지 않음
    5. 다섯 번째 단계
       - 데이터를 정렬함
       - 결과 DataFrame의 첫 번째 로우를 확인해보면 destination_total 컬럼에서 가장 큰 값을 가짐
       - desc 함수
         - 역순으로 정렬하기 위해 import함
         - 문자열이 아닌 Column 객체를 반환함
         - DataFrame 메서드 중 대부분은 문자열 형태의 컬럼명, Column 타입 그리고 표현식을 파라미터로 사용함(Column과 표현식은 사실상 같은 것)
    6. 여섯 번째 단계
        - limit 메서드로 반환 결과의 수를 제한함
        - DataFrame의 전체 데이터 대신 상위 5개 로우를 반환함
    7. 액션을 수행함
        - DataFrame의 결과를 모으는 프로세스 시작
        - 처리가 끝나면 코드를 작성한 프로그래밍 언어에 맞는 리스트나 배열을 반환함
   - explain 메서드를 호출해 실행 계획 살필 수 있음

In [22]:
flightData2015\
.groupBy("DEST_COUNTRY_NAME")\
.sum("count")\
.withColumnRenamed("sum(count)","destination_total")\
.sort(desc("destination_total"))\
.limit(5)\
.explain()

AnalysisException: "count" is not a numeric column. Aggregation function can only be applied on a numeric column.