## 1. 스파크 클러스터 접속

Spark Context 생성시켜 `sc`변수에 지정하고 이를 연결지점으로 스파크 클러스터에 접속한다.

In [38]:
import findspark
findspark.init()
import pyspark

sc = pyspark.SparkContext()

## 2. `sc.parallelize()`로 RDD 변환
파이썬 리스트를 `sc.parallelize()` 함수로 불러온다.

In [7]:
# 숫자 리스트를 바탕으로 RDD 객체 생성
list_rdd = sc.parallelize([1,2,3,4,5,6])

## 3. `.map()` 함수로 자료변환
`.map()` 함수를 사용해서 RDD 객체 각 원소에 대해서 연산작업을 수행한다; 각 원소를 제곱한다.

In [8]:
list_squared_rdd = list_rdd.map(lambda x: x**2)
list_squared_list = list_squared_rdd.collect()

for element in list_squared_list:
    print("원소를 제곱한 값: ", element)

원소를 제곱한 값:  1
원소를 제곱한 값:  4
원소를 제곱한 값:  9
원소를 제곱한 값:  16
원소를 제곱한 값:  25
원소를 제곱한 값:  36


## 2. `sc.readText()` 데이터 불러오기
`iris.csv` 외부 `.csv` 데이터를  `sc.textFile()` 함수로 불러와서 RDD 객체로 변환시킨다.
람다 무명함수로 `.filter`를 걸어 "setosa"가 포함된 모든 행을 뽑아내서 `iris_setosa_rdd` RDD 객체를 생성시킨다.
그리고 나서 `.count()` 함수로 "setosa"가 포함된 행을 센다. `.take()` 함수로 스파크 클러스터에서 빼내서 `for`문을 돌려 
"setosa"가 포함된 7줄을 뽑아내서 출력시킨다.

In [9]:
iris_rdd = sc.textFile("../data/iris.csv")

# "setosa" 품종이 포함된 행만 필터를 걸어서 추출함.
iris_setosa_rdd = iris_rdd.filter(lambda species: "setosa" in species)

# "setosa" 품종이 포함된 행수를 개수함.
print("IRIS 데이터 setosa 품종수: ", iris_setosa_rdd.count())

# 첫 7행을 화면에 출력시킴
for species in iris_setosa_rdd.take(7): 
  print(species)

IRIS 데이터 setosa 품종수:  50
5.1,3.5,1.4,0.2,setosa
4.9,3.0,1.4,0.2,setosa
4.7,3.2,1.3,0.2,setosa
4.6,3.1,1.5,0.2,setosa
5.0,3.6,1.4,0.2,setosa
5.4,3.9,1.7,0.4,setosa
4.6,3.4,1.4,0.3,setosa


## 4. 키-값 자료구조 

### 4.1. 파이썬 딕셔너리 활용
튜플 리스트로 데이터가 준비된 경우 각 정당별(key) 득표수(value)에 대한 합을 구하는 경우, 
스파크 클러스터를 활용하지 않고 데이터가 작은 경우 `defaultdict()` 함수를 활용해서 계산하는 것이 가능하다.

In [31]:
from collections import defaultdict

# 튜플 리스트
party_list = [('민주당',1), ('바른미래',2), ('자유한국',3), ('민주당',30), ('바른미래',20), ('자유한국',10)]

party_dict = defaultdict(int)

# 정당별 합을 구하는 과정
for party, vote in party_list:
    party_dict[party] += vote

# 정당별 합을 출력
for party, vote in party_dict.items():
    print(party, ":", vote)

민주당 : 31
바른미래 : 22
자유한국 : 13


### 4.2. 스파크 `reduceByKey()` 함수
`reduceByKey()`함수를 사용해서 키값을 기준으로 값(Value)에 대한 연산작업을 수월히 수행할 수 있다.

In [26]:
party_rdd = sc.parallelize(party_list)

# 정당별 총합 계산
party_total_rdd = party_rdd.reduceByKey(lambda x, y: x+y).sortByKey(ascending=True)

# 정당별 총합을 계산하고 내림차순으로 정렬
for num in party_total_rdd.collect():
  print(num)

('민주당', 31)
('바른미래', 22)
('자유한국', 13)


### 4.3. 판다스 데이터프레임 변환 
튜플 리스트를 `pd.DataFrame()` 함수로 판다스 데이터프레임으로 변환을 시킨 후에 `groupby()` 연산을 사용해서 정당별
득표수 합을 구한다.

In [36]:
import pandas as pd

party_df = pd.DataFrame(party_list, columns=['party', 'vote'])

party_df.groupby('party').sum()

Unnamed: 0_level_0,vote
party,Unnamed: 1_level_1
민주당,31
바른미래,22
자유한국,13
