# BIG DATA ANALYTICS PROGRAMMING : PySpark
### PySpark 맛보기

- JAVA 설치가 필요한 과정이므로, 문제가 있으신 분들은 다음 링크에서 작업해주세요!
- https://colab.research.google.com/drive/13mngg3qXnBKm-pH82o6Dfw3HMikUN3p2#offline=true&sandboxMode=true

---

In [None]:
import sys
!{sys.executable} -m pip install pyspark

In [None]:
# PYSPARK를 활용하기 위한 관련 설정
import os
import sys

os.environ["PYSPARK_PYTHON"]=sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"]=sys.executable


## RDD 활용하기
- Resilient Disributed Data

In [None]:
# pyspark 임포트
from pyspark import SparkContext

In [None]:
# Spark context를 활용해 RDD를 생성 할 수 있다
sc = SparkContext()

### 테스트 파일 생성

In [None]:
%%writefile example.txt
first line
second line
third line
fourth line

### RDD 기본 동작

In [None]:
textFile = sc.textFile('example.txt')

In [None]:
textFile

### Line 수 세기

In [None]:
textFile.count()

### 첫번째 줄 출력

In [None]:
textFile.first()

## 특정 text를 포함하는 데이터 출력

In [None]:
secfind = textFile.filter(lambda line: 'second' in line)

In [None]:
# RDD, 아직까지 어떠한 연산도 이루어지지 않은 상태입니다!
secfind

In [None]:
# 이때 연산 시작
secfind.collect()

In [None]:
# 이때 연산 시작
secfind.count()

## RDD에서의 전처리

In [None]:
%%writefile example2.txt
first 
second line
the third line
then a fourth line

In [None]:
text_rdd = sc.textFile('example2.txt')

In [None]:
text_rdd.collect()

### Map과 Flatmap의 차이

In [None]:
text_rdd.map(lambda line: line.split()).collect()

In [None]:
# Collect everything as a single flat map
text_rdd.flatMap(lambda line: line.split()).collect()

### CSV 파일 전처리

In [None]:
rdd = sc.textFile('data.csv')

In [None]:
rdd.take(2)

In [None]:
rdd.map(lambda x: x.split(",")).take(3)

In [None]:
rdd.map(lambda x: x.replace(" ","_")).collect()

In [None]:
rdd.map(lambda x: x.replace(" ","_")).map(lambda x: x.replace("'","_")).collect()

In [None]:
rdd.map(lambda x: x.replace(" ","_")).map(lambda x: x.replace("'","_")).map(lambda x: x.replace("/","_")).collect()

In [None]:
clean_rdd = rdd.map(lambda x: x.replace(" ","_").replace("'","_").replace("/","_").replace('"',""))

In [None]:
clean_rdd.collect()

In [None]:
clean_rdd = clean_rdd.map(lambda x: x.split(","))

In [None]:
clean_rdd.collect()

### Group BY 구현

In [None]:
clean_rdd.map(lambda lst: (lst[0],lst[-1])).collect()

In [None]:
# 첫번째 원소(lst[0])를 키로 인지
clean_rdd.map(lambda lst: (lst[0],lst[-1]))\
         .reduceByKey(lambda value1,value2 : value1+value2)\
         .collect()

In [None]:
# 올바른 연산을 위해 Float으로 캐스팅
clean_rdd.map(lambda lst: (lst[0],lst[-1]))\
         .reduceByKey(lambda value1,value2 : float(value1)+float(value2))\
         .collect()

In [None]:
# 최종 코드
clean_rdd.map(lambda lst: (lst[0],lst[-1]))\
.reduceByKey(lambda value1,value2 : float(value1)+float(value2))\
.filter(lambda x: not x[0]=='gender')\
.collect()

In [None]:
# 평균은 어떻게 구할 수 있을까?

clean_rdd.map(lambda lst: (lst[0],(lst[-1],1)))\
.reduceByKey(lambda value1, value2 : ((float(value1[0])+float(value2[0])), int(value1[1])+int(value2[1])))\
.filter(lambda x: not x[0]=='gender')\
.mapValues(lambda x: x[0]/x[1])\
.collect()

## DataFrame 활용하기

In [None]:
from pyspark.sql import SparkSession

appName = "Python Example - PySpark Read CSV"
master = 'local'

# Create Spark session
spark = SparkSession.builder \
    .master(master) \
    .appName(appName) \
    .getOrCreate()

# Convert list to data frame
df = spark.read.format('csv') \
                .option('header',True) \
                .option('multiLine', True) \
                .load('data.csv')
df.show()
print(f'Record count is: {df.count()}')

In [None]:
df.columns

In [None]:
df.describe()

In [None]:
df.select('gender').show()

In [None]:
df.select('gender').distinct().show()

In [None]:
df.select('race/ethnicity').distinct().show()

In [None]:
from pyspark.sql import functions as F
df.groupBy("gender").agg(F.mean('writing score'), F.mean('math score')).show()