<h1>Contents<span class="tocSkip"></span></h1>
<div class="toc"><ul class="toc-item"><li><span><a href="#SparkContext를-통한-스파크-초기화" data-toc-modified-id="SparkContext를-통한-스파크-초기화-1">SparkContext를 통한 스파크 초기화</a></span><ul class="toc-item"><li><span><a href="#Code-1" data-toc-modified-id="Code-1-1.1">Code 1</a></span></li><li><span><a href="#Code-2" data-toc-modified-id="Code-2-1.2">Code 2</a></span></li><li><span><a href="#Code-3" data-toc-modified-id="Code-3-1.3">Code 3</a></span></li></ul></li><li><span><a href="#RDD-Creation" data-toc-modified-id="RDD-Creation-2">RDD Creation</a></span><ul class="toc-item"><li><span><a href="#parallelize()" data-toc-modified-id="parallelize()-2.1">parallelize()</a></span></li><li><span><a href="#textFile()" data-toc-modified-id="textFile()-2.2">textFile()</a></span></li></ul></li><li><span><a href="#RDD-Operation-(1)-Transformations" data-toc-modified-id="RDD-Operation-(1)-Transformations-3">RDD Operation (1) Transformations</a></span><ul class="toc-item"><li><span><a href="#map" data-toc-modified-id="map-3.1">map</a></span></li><li><span><a href="#filter" data-toc-modified-id="filter-3.2">filter</a></span></li><li><span><a href="#flatmap" data-toc-modified-id="flatmap-3.3">flatmap</a></span></li><li><span><a href="#CSV-파일-읽어들이기" data-toc-modified-id="CSV-파일-읽어들이기-3.4">CSV 파일 읽어들이기</a></span></li></ul></li><li><span><a href="#RDD-Operation-(2)-Actions" data-toc-modified-id="RDD-Operation-(2)-Actions-4">RDD Operation (2) Actions</a></span><ul class="toc-item"><li><span><a href="#collect" data-toc-modified-id="collect-4.1">collect</a></span></li><li><span><a href="#take" data-toc-modified-id="take-4.2">take</a></span></li><li><span><a href="#count" data-toc-modified-id="count-4.3">count</a></span></li><li><span><a href="#reduce" data-toc-modified-id="reduce-4.4">reduce</a></span></li><li><span><a href="#saveAsTextFile" data-toc-modified-id="saveAsTextFile-4.5">saveAsTextFile</a></span></li></ul></li><li><span><a href="#MapReduce-실습" data-toc-modified-id="MapReduce-실습-5">MapReduce 실습</a></span><ul class="toc-item"><li><span><a href="#RDD-전체-과정-요약" data-toc-modified-id="RDD-전체-과정-요약-5.1">RDD 전체 과정 요약</a></span></li><li><span><a href="#Word-Counter-구현하기" data-toc-modified-id="Word-Counter-구현하기-5.2">Word Counter 구현하기</a></span></li><li><span><a href="#Titanic-데이터-분석하기" data-toc-modified-id="Titanic-데이터-분석하기-5.3">Titanic 데이터 분석하기</a></span></li></ul></li></ul></div>

# SparkContext를 통한 스파크 초기화
## Code 1

In [1]:
import pyspark
pyspark.__version__

'3.1.1'

In [2]:
from pyspark import SparkConf, SparkContext

sc = SparkContext()
sc

In [3]:
type(sc)

pyspark.context.SparkContext

In [4]:
# 실수로 한 개 더 만들면 에러 발생
new_sc = SparkContext()

ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=pyspark-shell, master=local[*]) created by __init__ at <ipython-input-2-09e2fe521565>:3 

In [5]:
sc.stop()

## Code 2
SparkContext의 Configuration을 세팅할 수 있습니다.

In [6]:
sc = SparkContext(master='local', appName='PySpark Basic')
sc

In [7]:
# 생성한 SparkContext의 Configuration 확인
sc.getConf().getAll()

[('spark.master', 'local'),
 ('spark.app.name', 'PySpark Basic'),
 ('spark.rdd.compress', 'True'),
 ('spark.driver.host', '172.30.1.6'),
 ('spark.driver.port', '36181'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.submit.pyFiles', ''),
 ('spark.executor.id', 'driver'),
 ('spark.app.startTime', '1615364092491'),
 ('spark.submit.deployMode', 'client'),
 ('spark.app.id', 'local-1615364092569'),
 ('spark.ui.showConsoleProgress', 'true')]

In [8]:
sc.master

'local'

In [9]:
sc.appName

'PySpark Basic'

In [10]:
sc.stop()

## Code 3
- SparkConf()를 이용해 SparkContext의 Configuration을 설정하는 방법을 사용해서 SparkContext를 만들 수 있습니다.
- .setMaster(), .setAppName()을 이용해 어플리케이션의 이름과 Master의 URL을 설정해 줄 수 있습니다.

In [11]:
conf = SparkConf().setAppName('PySpark Basic').setMaster('local')
sc = SparkContext(conf=conf)
sc

# RDD Creation
## parallelize()
방금 전 만든 SparkContext()의 parallelize()함수를 이용해서 내부의 데이터 집합을 RDD로 만들 수 있습니다.

In [12]:
rdd = sc.parallelize([1,2,3])
rdd

ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:274

In [13]:
type(rdd)

pyspark.rdd.RDD

RDD는 생성과 transformations 연산을 바로 수행하지 않습니다. 이 단계에서는 연산을 하고 있진 않고 계보(lineage)만 만들어 놓고, Actions 동작을 할 때 RDD가 비로소 만들어집니다. 이를 느긋한 계산법이라 합니다. 따라서 지금은 RDD를 생성했지만 RDD가 만들어지지는 않았습니다. Actions를 해봅시다.

In [14]:
rdd.take(3)

[1, 2, 3]

## textFile()
.textFile()함수를 이용해 외부 파일을 로드하여 RDD를 만들 수 있습니다.

In [15]:
import os

file_path = os.getenv('HOME')+'/aiffel/bigdata_ecosystem/test.txt'
with open(file_path, 'w') as f:
    for i in range(10):
        f.write(str(i)+'\n')
        
print('OK')

OK


In [16]:
rdd2 = sc.textFile(file_path)
print(rdd2)
print(type(rdd2))

/home/ssac21/aiffel/bigdata_ecosystem/test.txt MapPartitionsRDD[3] at textFile at NativeMethodAccessorImpl.java:0
<class 'pyspark.rdd.RDD'>


In [17]:
rdd2.take(3)

['0', '1', '2']

spark는 .textFile()을 통해 얻어온 데이터 타입을 무조건 string으로 처리하기 때문에 숫자를 입력해도 문자열의 list가 얻어진다.

# RDD Operation (1) Transformations
## map
x의 모든 원소에 대해 map함수를 적용한 결과는 y값입니다. 따라서 x와 y의 원소 개수는 같습니다.

In [18]:
x = sc.parallelize(["b", "a", "c", "d"])
y = x.map(lambda z: (z, 1))
print(x.collect()) #collect()는 actions입니다. 
print(y.collect())

['b', 'a', 'c', 'd']
[('b', 1), ('a', 1), ('c', 1), ('d', 1)]


In [19]:
nums = sc.parallelize([1, 2, 3])
squares = nums.map(lambda x: x*x)
print(squares.collect())

[1, 4, 9]


## filter
filter연산은 어떤 조건을 만족하는 값만을 반환합니다. 따라서 조건문이 들어가야 하며, x와 y의 원소의 개수는 같지 않을 수 있습니다.

In [20]:
x = sc.parallelize([1,2,3,4,5])
y = x.filter(lambda x: x%2 == 0) 
print(x.collect())
print(y.collect())

[1, 2, 3, 4, 5]
[2, 4]


In [21]:
text = sc.parallelize(['a', 'b', 'c', 'd'])
capital = text.map(lambda x: x.upper())
A = capital.filter(lambda x: 'A' in x)
print(text.collect())
print(A.collect())

['a', 'b', 'c', 'd']
['A']


## flatmap
FlatMap은 RDD의 원소에 map연산을 수행하고 원소의 개수를 증가시키기도 합니다. 원소의 개수는 꼭 동일하게 증가시키지 않아도 됩니다.

In [22]:
x = sc.parallelize([1,2,3])
y = x.flatMap(lambda x: (x, x*10, 30))
print(x.collect())
print(y.collect())

[1, 2, 3]
[1, 10, 30, 2, 20, 30, 3, 30, 30]


In [23]:
wordsDataset = sc.parallelize(["Spark is funny", "It is beautiful", "And also It is implemented by python"])
result = wordsDataset.flatMap(lambda x: x.split()).filter(lambda x: x != " ").map(lambda x: x.lower())
# 공백은 제거합니다.
# 단어를 공백기준으로 split 합니다. 
result.collect()

['spark',
 'is',
 'funny',
 'it',
 'is',
 'beautiful',
 'and',
 'also',
 'it',
 'is',
 'implemented',
 'by',
 'python']

## CSV 파일 읽어들이기

In [24]:
import os
csv_path = os.getenv('HOME')+'/aiffel/bigdata_ecosystem/train.csv'
csv_data_0 = sc.textFile(csv_path)
csv_data_0.take(5)

['survived,sex,age,n_siblings_spouses,parch,fare,class,deck,embark_town,alone',
 '0,male,22.0,1,0,7.25,Third,unknown,Southampton,n',
 '1,female,38.0,1,0,71.2833,First,C,Cherbourg,n',
 '1,female,26.0,0,0,7.925,Third,unknown,Southampton,y',
 '1,female,35.0,1,0,53.1,First,C,Southampton,n']

In [25]:
# 비어있는 라인은 제외하고, delimeter인 ,로 line을 분리해 줍니다. 
csv_data_1 = csv_data_0.filter(lambda line: len(line)>1).map(lambda line: line.split(","))   
csv_data_1.take(5)

[['survived',
  'sex',
  'age',
  'n_siblings_spouses',
  'parch',
  'fare',
  'class',
  'deck',
  'embark_town',
  'alone'],
 ['0',
  'male',
  '22.0',
  '1',
  '0',
  '7.25',
  'Third',
  'unknown',
  'Southampton',
  'n'],
 ['1', 'female', '38.0', '1', '0', '71.2833', 'First', 'C', 'Cherbourg', 'n'],
 ['1',
  'female',
  '26.0',
  '0',
  '0',
  '7.925',
  'Third',
  'unknown',
  'Southampton',
  'y'],
 ['1', 'female', '35.0', '1', '0', '53.1', 'First', 'C', 'Southampton', 'n']]

In [26]:
# 컬럼만 분리
columns = csv_data_1.take(1)
columns

[['survived',
  'sex',
  'age',
  'n_siblings_spouses',
  'parch',
  'fare',
  'class',
  'deck',
  'embark_town',
  'alone']]

In [27]:
# 컬럼을 제외한 나머지 데이터만 분리
csv_data_2 = csv_data_1.filter(lambda line: line[0].isdecimal())  # 첫 번째 컬럼이 숫자인 것만 필터링
csv_data_2.take(5)

[['0',
  'male',
  '22.0',
  '1',
  '0',
  '7.25',
  'Third',
  'unknown',
  'Southampton',
  'n'],
 ['1', 'female', '38.0', '1', '0', '71.2833', 'First', 'C', 'Cherbourg', 'n'],
 ['1',
  'female',
  '26.0',
  '0',
  '0',
  '7.925',
  'Third',
  'unknown',
  'Southampton',
  'y'],
 ['1', 'female', '35.0', '1', '0', '53.1', 'First', 'C', 'Southampton', 'n'],
 ['0',
  'male',
  '28.0',
  '0',
  '0',
  '8.4583',
  'Third',
  'unknown',
  'Queenstown',
  'y']]

In [28]:
# (컬럼명, 데이터) 형태로 가공
csv_data_3 = csv_data_2.map(lambda line: [(columns[0][i], linedata) for i, linedata in enumerate(line)])
csv_data_3.take(5)

[[('survived', '0'),
  ('sex', 'male'),
  ('age', '22.0'),
  ('n_siblings_spouses', '1'),
  ('parch', '0'),
  ('fare', '7.25'),
  ('class', 'Third'),
  ('deck', 'unknown'),
  ('embark_town', 'Southampton'),
  ('alone', 'n')],
 [('survived', '1'),
  ('sex', 'female'),
  ('age', '38.0'),
  ('n_siblings_spouses', '1'),
  ('parch', '0'),
  ('fare', '71.2833'),
  ('class', 'First'),
  ('deck', 'C'),
  ('embark_town', 'Cherbourg'),
  ('alone', 'n')],
 [('survived', '1'),
  ('sex', 'female'),
  ('age', '26.0'),
  ('n_siblings_spouses', '0'),
  ('parch', '0'),
  ('fare', '7.925'),
  ('class', 'Third'),
  ('deck', 'unknown'),
  ('embark_town', 'Southampton'),
  ('alone', 'y')],
 [('survived', '1'),
  ('sex', 'female'),
  ('age', '35.0'),
  ('n_siblings_spouses', '1'),
  ('parch', '0'),
  ('fare', '53.1'),
  ('class', 'First'),
  ('deck', 'C'),
  ('embark_town', 'Southampton'),
  ('alone', 'n')],
 [('survived', '0'),
  ('sex', 'male'),
  ('age', '28.0'),
  ('n_siblings_spouses', '0'),
  ('parch'

In [29]:
# CSV 파일을 DataFrame으로 읽어들이기
from pyspark import SparkConf, SparkContext, SQLContext

url = 'https://storage.googleapis.com/tf-datasets/titanic/train.csv'
from pyspark import SparkFiles
sc.addFile(url)
sqlContext = SQLContext(sc)

df = sqlContext.read.csv(SparkFiles.get("train.csv"), header=True, inferSchema= True)
df.show(5, truncate = False)

+--------+------+----+------------------+-----+-------+-----+-------+-----------+-----+
|survived|sex   |age |n_siblings_spouses|parch|fare   |class|deck   |embark_town|alone|
+--------+------+----+------------------+-----+-------+-----+-------+-----------+-----+
|0       |male  |22.0|1                 |0    |7.25   |Third|unknown|Southampton|n    |
|1       |female|38.0|1                 |0    |71.2833|First|C      |Cherbourg  |n    |
|1       |female|26.0|0                 |0    |7.925  |Third|unknown|Southampton|y    |
|1       |female|35.0|1                 |0    |53.1   |First|C      |Southampton|n    |
|0       |male  |28.0|0                 |0    |8.4583 |Third|unknown|Queenstown |y    |
+--------+------+----+------------------+-----+-------+-----+-------+-----------+-----+
only showing top 5 rows



In [30]:
# 위에서 얻은 데이터에서 40세 이상인 사람들의 데이터만 필터링해 봅시다. 
df2 = df[df['age']>40]
df2.show(5, truncate = False)

+--------+------+----+------------------+-----+-------+------+-------+-----------+-----+
|survived|sex   |age |n_siblings_spouses|parch|fare   |class |deck   |embark_town|alone|
+--------+------+----+------------------+-----+-------+------+-------+-----------+-----+
|0       |male  |66.0|0                 |0    |10.5   |Second|unknown|Southampton|y    |
|0       |male  |42.0|1                 |0    |52.0   |First |unknown|Southampton|n    |
|1       |female|49.0|1                 |0    |76.7292|First |D      |Cherbourg  |n    |
|0       |male  |65.0|0                 |1    |61.9792|First |B      |Cherbourg  |n    |
|0       |male  |45.0|1                 |0    |83.475 |First |C      |Southampton|n    |
+--------+------+----+------------------+-----+-------+------+-------+-----------+-----+
only showing top 5 rows



# RDD Operation (2) Actions
## collect
RDD 내의 모든 값을 리턴합니다. 빅데이터를 다루고 있다면 함부로 호출하지 않는 게 좋습니다.

In [31]:
nums = sc.parallelize(list(range(10)))
nums.collect()

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

## take
RDD에서 앞쪽 n개의 데이터의 list를 리턴합니다. collect()보다는 안전하게 데이터를 확인해 볼 수 있습니다.

In [32]:
nums.take(3)

[0, 1, 2]

## count
RDD에 포함된 데이터 개수를 리턴합니다.

In [33]:
nums.count()

10

## reduce
스파크에서 Map은 Transformation 함수로, Reduce는 Action 함수로 구현되어 있습니다. Reduce할 데이터가 RDD로 메모리상에 존재하므로 이전의 다른 구현체보다 훨씬 빠르게 MapReduce를 실행할 수 있습니다.

In [34]:
# RDD의 모든 데이터를 차례차례 더하는 sum() 구현
nums.reduce(lambda x, y: x + y)

45

## saveAsTextFile
RDD 데이터를 파일로 저장합니다. 아래 코드를 실행하면 file.txt라는 디렉토리에 RDD 내용이 저장됩니다.

In [36]:
file_path = os.getenv('HOME')+'/aiffel/bigdata_ecosystem/file.txt'
nums.saveAsTextFile(file_path)

!ls -l ~/aiffel/bigdata_ecosystem

total 40
drwxr-xr-x 2 ssac21 ssac21  4096  3월 10 17:32 file.txt
-rw-r--r-- 1 ssac21 ssac21    20  3월 10 17:14 test.txt
-rw-r--r-- 1 ssac21 ssac21 30874  2월 21  2019 train.csv


# MapReduce 실습
## RDD 전체 과정 요약

In [37]:
# RDD 생성
rdd = sc.parallelize(range(1,100))

# RDD Transformation 
rdd2 = rdd.map(lambda x: 0.5*x - 10).filter(lambda x: x > 0)

# RDD Action 
rdd2.reduce(lambda x, y: x + y)

1580.0

## Word Counter 구현하기
- Map 함수를 구현할 때, 입력 스트링의 각 문자 x에 대해 x -> (x, 1) 형태의 tuple로 매핑하면 수월합니다.
- 일반적인 reduce 함수보다는 reduceByKey 함수를 사용하는 것을 추천합니다. 

In [38]:
text = sc.parallelize('hello python')

# map 함수를 적용한 RDD 구하기
text_1 = text.filter(lambda x: x != " ")
text_2 = text_1.map(lambda x:(x, 1))

#reduceByKey 함수를 적용한 Word Counter 출력
word_count = text_2.reduceByKey(lambda accum, n: accum + n)  
word_count.collect()

[('h', 2),
 ('e', 1),
 ('l', 2),
 ('o', 2),
 ('p', 1),
 ('y', 1),
 ('t', 1),
 ('n', 1)]

## Titanic 데이터 분석하기
- 타이타닉 생존자와 사망자의 평균연령을 구해봅시다.
- map 함수를 이용해 모든 데이터를 (생존 여부, 연령)의 형태로 바꾸면 생존자, 사망자 각각의 연령의 총합을 쉽게 구할 수 있습니다.

In [39]:
# 이전 스텝에서 CSV 파일을 로딩했던 내역입니다. 
csv_path = os.getenv('HOME')+'/aiffel/bigdata_ecosystem/train.csv'
csv_data_0 = sc.textFile(csv_path)
csv_data_1 = csv_data_0.filter(lambda line: len(line)>1).map(lambda line: line.split(","))   
columns = csv_data_1.take(1)
csv_data_2 = csv_data_1.filter(lambda line: line[0].isdecimal())
csv_data_3 = csv_data_2.map(lambda line: [(columns[0][i], linedata) for i, linedata in enumerate(line)])

csv_data_3.take(3)

[[('survived', '0'),
  ('sex', 'male'),
  ('age', '22.0'),
  ('n_siblings_spouses', '1'),
  ('parch', '0'),
  ('fare', '7.25'),
  ('class', 'Third'),
  ('deck', 'unknown'),
  ('embark_town', 'Southampton'),
  ('alone', 'n')],
 [('survived', '1'),
  ('sex', 'female'),
  ('age', '38.0'),
  ('n_siblings_spouses', '1'),
  ('parch', '0'),
  ('fare', '71.2833'),
  ('class', 'First'),
  ('deck', 'C'),
  ('embark_town', 'Cherbourg'),
  ('alone', 'n')],
 [('survived', '1'),
  ('sex', 'female'),
  ('age', '26.0'),
  ('n_siblings_spouses', '0'),
  ('parch', '0'),
  ('fare', '7.925'),
  ('class', 'Third'),
  ('deck', 'unknown'),
  ('embark_town', 'Southampton'),
  ('alone', 'y')]]

In [40]:
# csv_data_3을 가공하여 생존자, 사망자의 연령 총합과 사람 수를 각각 구해 봅시다. 
# 이후 각각의 데이터로부터 생존자와 사망자의 평균 연령을 구할 수 있습니다. 

# 생존자와 사망자의 연령 총합 구하기
csv_data_4 = csv_data_3.map(lambda line:(line[0][1], line[2][1]))   # (생존여부, 연령)
age_sum_data = csv_data_4.reduceByKey(lambda accum, age: float(accum) + float(age))  
age_sum = age_sum_data.collect()

# 생존자와 사망자의 사람 수 구하기
csv_data_5 = csv_data_3.map(lambda line:(line[0][1], 1))
survived_data = csv_data_5.reduceByKey(lambda accum, count: int(accum) + int(count)) 
survived_count = survived_data.collect()

age_sum_dict = dict(age_sum)
survived_dict = dict(survived_count)
avg_age_survived = age_sum_dict['1']/survived_dict['1']
print('생존자 평균 연령:' ,avg_age_survived)
avg_age_died = age_sum_dict['0']/survived_dict['0']
print('사망자 평균 연령:' ,avg_age_died)

생존자 평균 연령: 29.110411522633743
사망자 평균 연령: 29.9609375
