# spark 기본개념 잡기 : RDD
+ 여러 분산노드에 걸쳐 저장하는 변경불가 데이터 집합을 의미
+ RDD 생성은 직접 만들거나 파일을 통해 생성할 수 있음
+ RDD는 transformation과 action으로 구성
   - 기존 RDD의 데이터를 토대로 새로운 RDD를 만들어 냄
   - RDD를 기반으로 무언가를 계산해서 결과를 만들어 냄
+ RDD는 `Lazy` 로딩 방식을 사용

## 스파크 중요 개념 : RDD, dataframe
* RDD : 탄력적이고 분산된 데이터셋
* HDFS와는 달리 쓰기 불가능 데이터셋
* 다양한 연산(map, reduce, count, filter, join) 수행 가능
* 작업은 lazy하게 병렬로 수행되고 메모리에 저장됨

## History of Spark API
* RDD   (2011)
    + v1 부터 지원, 분산 데이터셋
    + 연산을 제어하는 코드 작성이 어려움
* dataframe (2013)
    + v1.3부터 지원
    + 데이터를 스키마형태로 추상화, 속도 개선
* dataset (2015)
    + v1.6부터 지원
    + 데이터의 자료형 검사, 직렬화 지원
* dataset (2016)
    + v2.0부터 지원
    + dataframe과 dataset을 dataset으로 통합
* 스파크 애플리케이션 개발 : RDD 이용, SparkContext 사용
* SQL on Spark : dataset,dataframe 이용, SparkSession 사용

In [2]:
# README.md 파일 읽어들임
lines = sc.textFile('/usr/share/spark/README.md')

In [3]:
# 읽어들인 라인들 중 10줄만 확인 : collect
lines.collect()[:10]

                                                                                

['# Apache Spark',
 '',
 'Spark is a unified analytics engine for large-scale data processing. It provides',
 'high-level APIs in Scala, Java, Python, and R, and an optimized engine that',
 'supports general computation graphs for data analysis. It also supports a',
 'rich set of higher-level tools including Spark SQL for SQL and DataFrames,',
 'MLlib for machine learning, GraphX for graph processing,',
 'and Structured Streaming for stream processing.',
 '',
 '<https://spark.apache.org/>']

In [4]:
# 읽어들인 라인수 확인 : count
lines.count()

                                                                                

109

In [5]:
# 읽어들인 라인들 중 Spark라는 단어를 찾음 : filter(transformation)
filterLines = lines.filter(lambda x: "Spark" in x)
filterLines.collect()[:10]

['# Apache Spark',
 'Spark is a unified analytics engine for large-scale data processing. It provides',
 'rich set of higher-level tools including Spark SQL for SQL and DataFrames,',
 '[![PySpark Coverage](https://codecov.io/gh/apache/spark/branch/master/graph/badge.svg)](https://codecov.io/gh/apache/spark)',
 'You can find the latest Spark documentation, including a programming',
 '## Building Spark',
 'Spark is built using [Apache Maven](https://maven.apache.org/).',
 'To build Spark and its example programs, run:',
 '["Building Spark"](https://spark.apache.org/docs/latest/building-spark.html).',
 'For general development tips, including info on developing Spark using an IDE, see ["Useful Developer Tools"](https://spark.apache.org/developer-tools.html).']

### Lazy 로딩방식 확인

In [6]:
# 잘못된 위치의 파일을 읽으려고 시도 - 오류 출력이 안됨
lines = sc.textFile('usr/share/spark/README.md')

In [None]:
# lazy loading에 의해서 collect라는 action이 호출이 되어야만 비로소 오류 출력이 가능함!
lines.collect()[:10]

## RDD 생성
+ 직접 생성한 데이터로 만들거나
   - sc.parallelize(리스트)
+ 외부 데이터로 만드는 방법 존재
   - sc.textFile(경로/파일)

In [None]:
data = ['Hello, World!!','Hello, Python!!','Hello, RDD!!']
rdd = sc.parallelize(data)
rdd.count()

### 직책별 사원수 조회하는 RDD 코드 작성
+ employees.csv 이용

In [8]:
emp = sc.textFile('data/employees.csv')
emp.collect()[:5]

['EMPLOYEE_ID,FIRST_NAME,LAST_NAME,EMAIL,PHONE_NUMBER,HIRE_DATE,JOB_ID,SALARY,COMMISSION_PCT,MANAGER_ID,DEPARTMENT_ID',
 '100,Steven,King,SKING,515.123.4567,2003-06-17,AD_PRES,24000.00,,,90',
 '101,Neena,Kochhar,NKOCHHAR,515.123.4568,2005-09-21,AD_VP,17000.00,,100,90',
 '102,Lex,De Haan,LDEHAAN,515.123.4569,2001-01-13,AD_VP,17000.00,,100,90',
 '103,Alexander,Hunold,AHUNOLD,590.423.4567,2006-01-03,IT_PROG,9000.00,,102,60']

In [9]:
# 헤더 제외하고 데이터만 골라냄
header = emp.first()
header

'EMPLOYEE_ID,FIRST_NAME,LAST_NAME,EMAIL,PHONE_NUMBER,HIRE_DATE,JOB_ID,SALARY,COMMISSION_PCT,MANAGER_ID,DEPARTMENT_ID'

In [10]:
emp = emp.filter(lambda x: header != x)
emp.first()

'100,Steven,King,SKING,515.123.4567,2003-06-17,AD_PRES,24000.00,,,90'

In [11]:
# 사원 데이터에서 ,로 각 컬럼을 분리하고
# 이름과 직책을 추출함
emp2 = emp.map(lambda x: (x.split(',')[6], x.split(',')[1]))  # key 와 value를 넣어야함
emp2.collect()[:5]  # collect를 꼭 써야만 진정한 실행이 되는것

[('AD_PRES', 'Steven'),
 ('AD_VP', 'Neena'),
 ('AD_VP', 'Lex'),
 ('IT_PROG', 'Alexander'),
 ('IT_PROG', 'Bruce')]

In [12]:
# 추출한 데이터에서 직책을 1로 매핑
maps = emp2.mapValues(lambda x: 1)
maps.collect()[:5]

[('AD_PRES', 1), ('AD_VP', 1), ('AD_VP', 1), ('IT_PROG', 1), ('IT_PROG', 1)]

In [14]:
# 같은 직책끼리 모아서 집계처리함
reduces = maps.reduceByKey(lambda x, y: x + y)
reduces.collect()[:5]

[('AD_PRES', 1),
 ('AD_VP', 2),
 ('IT_PROG', 5),
 ('FI_MGR', 1),
 ('FI_ACCOUNT', 5)]

### 타이타닉 승객의 생존자/사망자 수를 조회하는 RDD 코드를 작성하세요
+ titanic.csv를 이용함

In [32]:
titanic = sc.textFile('data/titanic.csv')
titanic.collect()[:5]

['pclass,survived,name,sex,age,sibsp,parch,ticket,fare,cabin,embarked',
 '1,1,"Allen, Miss. Elisabeth Walton",female,29,0,0,24160,211.3375,B5,S',
 '1,1,"Allison, Master. Hudson Trevor",male,0.9167,1,2,113781,151.5500,C22 C26,S',
 '1,0,"Allison, Miss. Helen Loraine",female,2,1,2,113781,151.5500,C22 C26,S',
 '1,0,"Allison, Mr. Hudson Joshua Creighton",male,30,1,2,113781,151.5500,C22 C26,S']

In [33]:
# 헤더 제외하고 데이터만 골라냄
header = titanic.first()
titanic = titanic.filter(lambda x: header != x)
titanic.first()

'1,1,"Allen, Miss. Elisabeth Walton",female,29,0,0,24160,211.3375,B5,S'

In [34]:
# 타이타닉 데이터에서 ,로 각 컬럼을 분리하고
# 생존자/사망자 수를 추출함
titanic2 = titanic.map(lambda x: (x.split(',')[1], x.split(',')[4]))  # key 와 value를 넣어야함
titanic2.collect()[:5]  # collect를 꼭 써야만 진정한 실행이 되는것

[('1', 'female'),
 ('1', 'male'),
 ('0', 'female'),
 ('0', 'male'),
 ('0', 'female')]

In [35]:
maps = titanic2.mapValues(lambda x: 1)
maps.collect()[:5]

[('1', 1), ('1', 1), ('0', 1), ('0', 1), ('0', 1)]

In [36]:
reduces = maps.reduceByKey(lambda x, y: x + y)
reduces.collect()[:5]

[('1', 500), ('0', 809), ('', 1)]