# BIG DATA ANALYTICS PROGRAMMING : PySpark
### PySpark 맛보기
---

In [5]:
import sys
!{sys.executable} -m pip install pyspark

Collecting pyspark
  Downloading pyspark-3.0.1.tar.gz (204.2 MB)
[K     |████████████████████████████████| 204.2 MB 10.4 MB/s eta 0:00:011   |██████████████                  | 88.9 MB 12.2 MB/s eta 0:00:10     |██████████████████████████▌     | 169.4 MB 12.1 MB/s eta 0:00:03
[?25hCollecting py4j==0.10.9
  Downloading py4j-0.10.9-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 11.8 MB/s eta 0:00:01
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25ldone
[?25h  Created wheel for pyspark: filename=pyspark-3.0.1-py2.py3-none-any.whl size=204612244 sha256=28ff896afa1fde1de08c06e1cb91b2cb34ea0be83d0dcc8e178de48e7b544ab5
  Stored in directory: /Users/jungwons/Library/Caches/pip/wheels/5e/34/fa/b37b5cef503fc5148b478b2495043ba61b079120b7ff379f9b
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.0.1


In [6]:
# PYSPARK를 활용하기 위한 관련 설정
import os
import sys

os.environ["PYSPARK_PYTHON"]=sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"]=sys.executable


## RDD 활용하기
- Resilient Disributed Data

In [7]:
# pyspark 임포트
from pyspark import SparkContext

In [9]:
# Spark context를 활용해 RDD를 생성 할 수 있다
sc = SparkContext()

ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=pyspark-shell, master=local[*]) created by __init__ at <ipython-input-8-2dfc28fca47d>:1 

### 테스트 파일 생성

In [10]:
%%writefile example.txt
first line
second line
third line
fourth line

Overwriting example.txt


### RDD 기본 동작

In [11]:
textFile = sc.textFile('example.txt')

In [12]:
textFile

example.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0

### Line 수 세기

In [13]:
textFile.count()

4

### 첫번째 줄 출력

In [14]:
textFile.first()

'first line'

## 특정 text를 포함하는 데이터 출력

In [21]:
secfind = textFile.filter(lambda line: 'second' in line)

In [22]:
# RDD, 아직까지 어떠한 연산도 이루어지지 않은 상태입니다!
secfind

PythonRDD[7] at RDD at PythonRDD.scala:53

In [24]:
# 이때 연산 시작
secfind.collect()

['second line']

In [25]:
# 이때 연산 시작
secfind.count()

1

## RDD에서의 전처리

In [26]:
%%writefile example2.txt
first 
second line
the third line
then a fourth line

Overwriting example2.txt


In [27]:
text_rdd = sc.textFile('example2.txt')

In [29]:
text_rdd.collect()

['first ', 'second line', 'the third line', 'then a fourth line']

### Map과 Flatmap의 차이

In [28]:
text_rdd.map(lambda line: line.split()).collect()

[['first'],
 ['second', 'line'],
 ['the', 'third', 'line'],
 ['then', 'a', 'fourth', 'line']]

In [24]:
# Collect everything as a single flat map
text_rdd.flatMap(lambda line: line.split()).collect()

['first',
 'second',
 'line',
 'the',
 'third',
 'line',
 'then',
 'a',
 'fourth',
 'line']

### CSV 파일 전처리

In [25]:
rdd = sc.textFile('data.csv')

In [26]:
rdd.take(2)

['"gender","race/ethnicity","parental level of education","lunch","test preparation course","math score","reading score","writing score"',
 '"female","group B","bachelor\'s degree","standard","none","72","72","74"']

In [28]:
rdd.map(lambda x: x.split(",")).take(3)

[['"gender"',
  '"race/ethnicity"',
  '"parental level of education"',
  '"lunch"',
  '"test preparation course"',
  '"math score"',
  '"reading score"',
  '"writing score"'],
 ['"female"',
  '"group B"',
  '"bachelor\'s degree"',
  '"standard"',
  '"none"',
  '"72"',
  '"72"',
  '"74"'],
 ['"female"',
  '"group C"',
  '"some college"',
  '"standard"',
  '"completed"',
  '"69"',
  '"90"',
  '"88"']]

In [30]:
rdd.map(lambda x: x.replace(" ","_")).collect()

['"gender","race/ethnicity","parental_level_of_education","lunch","test_preparation_course","math_score","reading_score","writing_score"',
 '"female","group_B","bachelor\'s_degree","standard","none","72","72","74"',
 '"female","group_C","some_college","standard","completed","69","90","88"',
 '"female","group_B","master\'s_degree","standard","none","90","95","93"',
 '"male","group_A","associate\'s_degree","free/reduced","none","47","57","44"',
 '"male","group_C","some_college","standard","none","76","78","75"',
 '"female","group_B","associate\'s_degree","standard","none","71","83","78"',
 '"female","group_B","some_college","standard","completed","88","95","92"',
 '"male","group_B","some_college","free/reduced","none","40","43","39"',
 '"male","group_D","high_school","free/reduced","completed","64","64","67"',
 '"female","group_B","high_school","free/reduced","none","38","60","50"',
 '"male","group_C","associate\'s_degree","standard","none","58","54","52"',
 '"male","group_D","associate\

In [31]:
rdd.map(lambda x: x.replace(" ","_")).map(lambda x: x.replace("'","_")).collect()

['"gender","race/ethnicity","parental_level_of_education","lunch","test_preparation_course","math_score","reading_score","writing_score"',
 '"female","group_B","bachelor_s_degree","standard","none","72","72","74"',
 '"female","group_C","some_college","standard","completed","69","90","88"',
 '"female","group_B","master_s_degree","standard","none","90","95","93"',
 '"male","group_A","associate_s_degree","free/reduced","none","47","57","44"',
 '"male","group_C","some_college","standard","none","76","78","75"',
 '"female","group_B","associate_s_degree","standard","none","71","83","78"',
 '"female","group_B","some_college","standard","completed","88","95","92"',
 '"male","group_B","some_college","free/reduced","none","40","43","39"',
 '"male","group_D","high_school","free/reduced","completed","64","64","67"',
 '"female","group_B","high_school","free/reduced","none","38","60","50"',
 '"male","group_C","associate_s_degree","standard","none","58","54","52"',
 '"male","group_D","associate_s_deg

In [32]:
rdd.map(lambda x: x.replace(" ","_")).map(lambda x: x.replace("'","_")).map(lambda x: x.replace("/","_")).collect()

['"gender","race_ethnicity","parental_level_of_education","lunch","test_preparation_course","math_score","reading_score","writing_score"',
 '"female","group_B","bachelor_s_degree","standard","none","72","72","74"',
 '"female","group_C","some_college","standard","completed","69","90","88"',
 '"female","group_B","master_s_degree","standard","none","90","95","93"',
 '"male","group_A","associate_s_degree","free_reduced","none","47","57","44"',
 '"male","group_C","some_college","standard","none","76","78","75"',
 '"female","group_B","associate_s_degree","standard","none","71","83","78"',
 '"female","group_B","some_college","standard","completed","88","95","92"',
 '"male","group_B","some_college","free_reduced","none","40","43","39"',
 '"male","group_D","high_school","free_reduced","completed","64","64","67"',
 '"female","group_B","high_school","free_reduced","none","38","60","50"',
 '"male","group_C","associate_s_degree","standard","none","58","54","52"',
 '"male","group_D","associate_s_deg

In [42]:
clean_rdd = rdd.map(lambda x: x.replace(" ","_").replace("'","_").replace("/","_").replace('"',""))

In [43]:
clean_rdd.collect()

['gender,race_ethnicity,parental_level_of_education,lunch,test_preparation_course,math_score,reading_score,writing_score',
 'female,group_B,bachelor_s_degree,standard,none,72,72,74',
 'female,group_C,some_college,standard,completed,69,90,88',
 'female,group_B,master_s_degree,standard,none,90,95,93',
 'male,group_A,associate_s_degree,free_reduced,none,47,57,44',
 'male,group_C,some_college,standard,none,76,78,75',
 'female,group_B,associate_s_degree,standard,none,71,83,78',
 'female,group_B,some_college,standard,completed,88,95,92',
 'male,group_B,some_college,free_reduced,none,40,43,39',
 'male,group_D,high_school,free_reduced,completed,64,64,67',
 'female,group_B,high_school,free_reduced,none,38,60,50',
 'male,group_C,associate_s_degree,standard,none,58,54,52',
 'male,group_D,associate_s_degree,standard,none,40,52,43',
 'female,group_B,high_school,standard,none,65,81,73',
 'male,group_A,some_college,standard,completed,78,72,70',
 'female,group_A,master_s_degree,standard,none,50,53,58'

In [44]:
clean_rdd = clean_rdd.map(lambda x: x.split(","))

In [45]:
clean_rdd.collect()

[['gender',
  'race_ethnicity',
  'parental_level_of_education',
  'lunch',
  'test_preparation_course',
  'math_score',
  'reading_score',
  'writing_score'],
 ['female',
  'group_B',
  'bachelor_s_degree',
  'standard',
  'none',
  '72',
  '72',
  '74'],
 ['female',
  'group_C',
  'some_college',
  'standard',
  'completed',
  '69',
  '90',
  '88'],
 ['female',
  'group_B',
  'master_s_degree',
  'standard',
  'none',
  '90',
  '95',
  '93'],
 ['male',
  'group_A',
  'associate_s_degree',
  'free_reduced',
  'none',
  '47',
  '57',
  '44'],
 ['male', 'group_C', 'some_college', 'standard', 'none', '76', '78', '75'],
 ['female',
  'group_B',
  'associate_s_degree',
  'standard',
  'none',
  '71',
  '83',
  '78'],
 ['female',
  'group_B',
  'some_college',
  'standard',
  'completed',
  '88',
  '95',
  '92'],
 ['male', 'group_B', 'some_college', 'free_reduced', 'none', '40', '43', '39'],
 ['male',
  'group_D',
  'high_school',
  'free_reduced',
  'completed',
  '64',
  '64',
  '67'],
 [

### Group BY 구현

In [46]:
clean_rdd.map(lambda lst: (lst[0],lst[-1])).collect()

[('gender', 'writing_score'),
 ('female', '74'),
 ('female', '88'),
 ('female', '93'),
 ('male', '44'),
 ('male', '75'),
 ('female', '78'),
 ('female', '92'),
 ('male', '39'),
 ('male', '67'),
 ('female', '50'),
 ('male', '52'),
 ('male', '43'),
 ('female', '73'),
 ('male', '70'),
 ('female', '58'),
 ('female', '78'),
 ('male', '86'),
 ('female', '28'),
 ('male', '46'),
 ('female', '61'),
 ('male', '63'),
 ('female', '70'),
 ('male', '53'),
 ('female', '73'),
 ('male', '80'),
 ('male', '72'),
 ('male', '55'),
 ('female', '75'),
 ('male', '65'),
 ('female', '75'),
 ('female', '74'),
 ('female', '61'),
 ('female', '65'),
 ('male', '38'),
 ('male', '82'),
 ('male', '79'),
 ('female', '83'),
 ('female', '59'),
 ('female', '88'),
 ('male', '57'),
 ('male', '54'),
 ('female', '68'),
 ('female', '65'),
 ('male', '66'),
 ('female', '54'),
 ('male', '57'),
 ('female', '62'),
 ('female', '76'),
 ('female', '76'),
 ('male', '82'),
 ('male', '48'),
 ('male', '68'),
 ('male', '42'),
 ('male', '75')

In [47]:
# 첫번째 원소(lst[0])를 키로 인지
clean_rdd.map(lambda lst: (lst[0],lst[-1]))\
         .reduceByKey(lambda amt1,amt2 : amt1+amt2)\
         .collect()

[('gender', 'writing_score'),
 ('female',
  '7488937892507358782861707375757461658359886865546276768743861071597457734872685082887467827492706262897210070729854100827961658993865184677164543381668878878591100787870817054875877621006647708769668555768677687666608839746273547194837483556862837070687881775190817795705871937580847846827581836752806975916384798053439451956627606374759569805770706989595832588570967382100778345437570676477776395675510062687756747993646780627050798069767785977462891007691913885907484619183705654747670909068527668829292548087629485847378798443477073539483627989669764503379707974801007856647189586896807880777673626565739972738163308082546265667493723854625581858188739079808165685581769874796758696760718768768478667676744410053788173568690707982726764588599748799747088778480945773568272598592656454637277757279557043825784826244773261607051738157958778754067836468889293825270768157898974797346366468639396815387807661807470715480957491857369387982744110084917282666

In [48]:
# 올바른 연산을 위해 Float으로 캐스팅
clean_rdd.map(lambda lst: (lst[0],lst[-1]))\
         .reduceByKey(lambda amt1,amt2 : float(amt1)+float(amt2))\
         .collect()

[('gender', 'writing_score'), ('female', 37538.0), ('male', 30516.0)]

In [49]:
# 최종 코드
clean_rdd.map(lambda lst: (lst[0],lst[-1]))\
.reduceByKey(lambda amt1,amt2 : float(amt1)+float(amt2))\
.filter(lambda x: not x[0]=='gender')\
.sortBy(lambda stateAmount: stateAmount[1], ascending=False)\
.collect()

[('female', 37538.0), ('male', 30516.0)]

## DataFrame 활용하기

In [59]:
from pyspark.sql import SparkSession

appName = "Python Example - PySpark Read CSV"
master = 'local'

# Create Spark session
spark = SparkSession.builder \
    .master(master) \
    .appName(appName) \
    .getOrCreate()

# Convert list to data frame
df = spark.read.format('csv') \
                .option('header',True) \
                .option('multiLine', True) \
                .load('data.csv')
df.show()
print(f'Record count is: {df.count()}')

+------+--------------+---------------------------+------------+-----------------------+----------+-------------+-------------+
|gender|race/ethnicity|parental level of education|       lunch|test preparation course|math score|reading score|writing score|
+------+--------------+---------------------------+------------+-----------------------+----------+-------------+-------------+
|female|       group B|          bachelor's degree|    standard|                   none|        72|           72|           74|
|female|       group C|               some college|    standard|              completed|        69|           90|           88|
|female|       group B|            master's degree|    standard|                   none|        90|           95|           93|
|  male|       group A|         associate's degree|free/reduced|                   none|        47|           57|           44|
|  male|       group C|               some college|    standard|                   none|        76|     

In [60]:
df.columns

['gender',
 'race/ethnicity',
 'parental level of education',
 'lunch',
 'test preparation course',
 'math score',
 'reading score',
 'writing score']

In [62]:
df.describe()

DataFrame[summary: string, gender: string, race/ethnicity: string, parental level of education: string, lunch: string, test preparation course: string, math score: string, reading score: string, writing score: string]

In [64]:
df.select('gender').show()

+------+
|gender|
+------+
|female|
|female|
|female|
|  male|
|  male|
|female|
|female|
|  male|
|  male|
|female|
|  male|
|  male|
|female|
|  male|
|female|
|female|
|  male|
|female|
|  male|
|female|
+------+
only showing top 20 rows



In [66]:
df.select('gender').distinct().show()

+------+
|gender|
+------+
|female|
|  male|
+------+



In [67]:
df.select('race/ethnicity').distinct().show()

+--------------+
|race/ethnicity|
+--------------+
|       group B|
|       group C|
|       group D|
|       group A|
|       group E|
+--------------+



In [71]:
from pyspark.sql import functions as F
df.groupBy("gender").agg(F.mean('writing score'), F.mean('math score')).show()

+------+------------------+------------------+
|gender|avg(writing score)|   avg(math score)|
+------+------------------+------------------+
|female| 72.46718146718146|63.633204633204635|
|  male| 63.31120331950208| 68.72821576763485|
+------+------------------+------------------+

