# spark dataset

# 스파크 중요 개념 : RDD, dataframe
* RDD (Resillient Distributed Data) : 탄력적이고 분산된 데이터셋
* HDFS와는 달리 쓰기 불가능 데이터셋
* 다양한 연산(map, reduce, count, filter, join) 수행 가능
* 작업은 lazy하게 병렬로 수행되고 메모리에 저장됨

# History of Spark API
* RDD   (2011)  : v1 부터 지원, 분산 데이터셋,연산을 제어하는 코드 작성이 어려움
* dataframe (2013) : v1.3부터 지원, 데이터를 스키마형태로 추상화, 속도 개선
* dataset (2015) : v1.6부터 지원, 데이터의 자료형 검사, 직렬화 지원
* dataset (2016) : v2.0부터 지원, dataframe과 dataset을 dataset으로 통합
* 스파크 애플리케이션 개발 : RDD 이용, SparkContext 사용
* SQL on Spark : dataset,dataframe 이용, SparkSession 사용

## RDD를 이용해서 데이터프레임 생성

In [8]:
sc

In [1]:
from pyspark.sql.types import StringType, IntegerType

In [2]:
# 리스트를 이용해서 데이터프레임 만들기
# 리스트로 RDD 객체 생성
data = ['apple','peach','banana','mango','pineapple']
words = sc.parallelize(data)
words.collect()

['apple', 'peach', 'banana', 'mango', 'pineapple']

In [3]:
# RDD 객체를 데이터프레임으로 만들기
# 데이터프레임 : 행과 열로 구성된 2차원 데이터객체
# createDataFrame(RDD객체, 타입)
df = spark.createDataFrame(words, StringType())
df.show()       # 데이터프레임 내용 출력

+---------+
|    value|
+---------+
|    apple|
|    peach|
|   banana|
|    mango|
|pineapple|
+---------+



In [6]:
# 리스트 객체를 데이터프레임으로 만들기 - (과일명, 가격)
# 데이터프레임 생성시 컬럼명 지정
data =  [('apple', 1500), ('peach', 2000),('banana', 1500), ('mango', 2500),('pineapple', 3000)]
fruits = spark.createDataFrame(data, ['fruit','price'])
fruits.collect()

[Row(fruit='apple', price=1500),
 Row(fruit='peach', price=2000),
 Row(fruit='banana', price=1500),
 Row(fruit='mango', price=2500),
 Row(fruit='pineapple', price=3000)]

In [7]:
fruits.show()

+---------+-----+
|    fruit|price|
+---------+-----+
|    apple| 1500|
|    peach| 2000|
|   banana| 1500|
|    mango| 2500|
|pineapple| 3000|
+---------+-----+



In [11]:
# 리스트 객체를 데이터프레임으로 만들기 2
# 데이터프레임 생성시 컬럼명과 자료형 지정 (컬럼명:자료형)

fruits = spark.createDataFrame(data, "fruit:string, price:int")
fruits.collect()

[Row(fruit='apple', price=1500),
 Row(fruit='peach', price=2000),
 Row(fruit='banana', price=1500),
 Row(fruit='mango', price=2500),
 Row(fruit='pineapple', price=3000)]

In [13]:
# 데이터프레임에서 컬럼 조회 : select(컬럼명)
fruits.select('fruit').show()

+---------+
|    fruit|
+---------+
|    apple|
|    peach|
|   banana|
|    mango|
|pineapple|
+---------+



# 스파크세션을 이용한 고급 데이터프레임 다루기

In [14]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType

In [15]:
# 데이터프레임 스키마 정의 - employees
# 스파크 세션 객체 직접 생성
spqrk = SparkSession.builder.appName('emp').getOrCreate()

In [16]:
# 데이터프레임 생성전 스키마 정의
# add(컬럼명, 데이터타입)
emp_schema = StructType().add('empno','integer').add('fname','string')\
.add('lname','string').add('hdate','string').add('sal','integer')\
.add('deptid','integer')

In [17]:
# 지정한 스키마를 이용해서 데이터프레임 생성
# 데이터프레임의 각 행은 set 객체로 정의
# 위에서 정의한 스키마는 schema 속성으로 지정
df = spark.createDataFrame(
   [(123,'steve','king','2003-06-17',35000,None),
   (456,'john','seo','2005-12-15',20000,50),
   (789,'david',None,'2004-03-01',22000,90)], 
   schema=emp_schema)

In [18]:
# 데이터프레임 내용 확인
df.show()

+-----+-----+-----+----------+-----+------+
|empno|fname|lname|     hdate|  sal|deptid|
+-----+-----+-----+----------+-----+------+
|  123|steve| king|2003-06-17|35000|  null|
|  456| john|  seo|2005-12-15|20000|    50|
|  789|david| null|2004-03-01|22000|    90|
+-----+-----+-----+----------+-----+------+



In [19]:
# 데이터프레임 스키마 확인
df.printSchema()

root
 |-- empno: integer (nullable = true)
 |-- fname: string (nullable = true)
 |-- lname: string (nullable = true)
 |-- hdate: string (nullable = true)
 |-- sal: integer (nullable = true)
 |-- deptid: integer (nullable = true)



In [20]:
# 데이터프레임 갯수 출력
df.count()

3

In [21]:
# 데이터프레임 요약 보기
# summary(요약항목)
df.summary().show()

+-------+-----+-----+-----+----------+------------------+------------------+
|summary|empno|fname|lname|     hdate|               sal|            deptid|
+-------+-----+-----+-----+----------+------------------+------------------+
|  count|    3|    3|    2|         3|                 3|                 2|
|   mean|456.0| null| null|      null|25666.666666666668|              70.0|
| stddev|333.0| null| null|      null| 8144.527815247077|28.284271247461902|
|    min|  123|david| king|2003-06-17|             20000|                50|
|    25%|  123| null| null|      null|             20000|                50|
|    50%|  456| null| null|      null|             22000|                50|
|    75%|  789| null| null|      null|             35000|                90|
|    max|  789|steve|  seo|2005-12-15|             35000|                90|
+-------+-----+-----+-----+----------+------------------+------------------+



In [22]:
# 특정컬럼에 대한 요약정보 보기
# select(대상컬럼들)
df.select('empno','sal','deptid').summary().show()

+-------+-----+------------------+------------------+
|summary|empno|               sal|            deptid|
+-------+-----+------------------+------------------+
|  count|    3|                 3|                 2|
|   mean|456.0|25666.666666666668|              70.0|
| stddev|333.0| 8144.527815247077|28.284271247461902|
|    min|  123|             20000|                50|
|    25%|  123|             20000|                50|
|    50%|  456|             22000|                50|
|    75%|  789|             35000|                90|
|    max|  789|             35000|                90|
+-------+-----+------------------+------------------+



In [25]:
# csv파일로 데이터프레임 생성하기
# read.csv(파일이름, 헤더여부, 스키마여부)
emp = spark.read.csv('EMPLOYEES.csv', header = True, inferSchema=True)

In [27]:
emp.printSchema()

root
 |-- EMPLOYEE_ID: integer (nullable = true)
 |-- FIRST_NAME: string (nullable = true)
 |-- LAST_NAME: string (nullable = true)
 |-- EMAIL: string (nullable = true)
 |-- PHONE_NUMBER: string (nullable = true)
 |-- HIRE_DATE: string (nullable = true)
 |-- JOB_ID: string (nullable = true)
 |-- SALARY: integer (nullable = true)
 |-- COMMISSION_PCT: double (nullable = true)
 |-- MANAGER_ID: integer (nullable = true)
 |-- DEPARTMENT_ID: integer (nullable = true)



## 데이터프레임 데이터 탐색
+ select : 컬럼 선택
+ filter : 조건 검색
+ where : 고급 조건 검색
+ orderBy : 정렬
+ groupBy : 그룹화

In [29]:
# 모든 사원의 이름 조회 : select([컬럼명, 컬럼명])
emp.select(['FIRST_NAME']).show(5)

+----------+
|FIRST_NAME|
+----------+
|    Steven|
|     Neena|
|       Lex|
| Alexander|
|     Bruce|
+----------+
only showing top 5 rows



In [31]:
# 급여가 7000 이상인 사원 조회 : filter(조건식)
# 특정 컬럼 지정: 데이터프레임이름[컬럼명]
emp.filter(emp['SALARY'] >= 7000).show(5)

+-----------+----------+---------+--------+------------+----------+-------+------+--------------+----------+-------------+
|EMPLOYEE_ID|FIRST_NAME|LAST_NAME|   EMAIL|PHONE_NUMBER| HIRE_DATE| JOB_ID|SALARY|COMMISSION_PCT|MANAGER_ID|DEPARTMENT_ID|
+-----------+----------+---------+--------+------------+----------+-------+------+--------------+----------+-------------+
|        100|    Steven|     King|   SKING|515.123.4567|2003-06-17|AD_PRES| 24000|          null|      null|           90|
|        101|     Neena|  Kochhar|NKOCHHAR|515.123.4568|2005-09-21|  AD_VP| 17000|          null|       100|           90|
|        102|       Lex|  De Haan| LDEHAAN|515.123.4569|2001-01-13|  AD_VP| 17000|          null|       100|           90|
|        103| Alexander|   Hunold| AHUNOLD|590.423.4567|2006-01-03|IT_PROG|  9000|          null|       102|           60|
|        108|     Nancy|Greenberg|NGREENBE|515.124.4569|2002-08-17| FI_MGR| 12008|          null|       101|          100|
+-----------+---

In [32]:
# 급여가 7000 이상인 사원의 수 조회
emp.filter(emp['SALARY'] >= 7000).count()

47

In [35]:
# 2006-02-05부터 2006-11-15사이에 고용된 사원 조회
emp.filter((emp['HIRE_DATE'] >= '2006-02-05') &
          (emp['HIRE_DATE'] <= '2006-11-15')).show(5)

+-----------+-----------+-----------+--------+------------+----------+----------+------+--------------+----------+-------------+
|EMPLOYEE_ID| FIRST_NAME|  LAST_NAME|   EMAIL|PHONE_NUMBER| HIRE_DATE|    JOB_ID|SALARY|COMMISSION_PCT|MANAGER_ID|DEPARTMENT_ID|
+-----------+-----------+-----------+--------+------------+----------+----------+------+--------------+----------+-------------+
|        106|      Valli|  Pataballa|VPATABAL|590.423.4560|2006-02-05|   IT_PROG|  4800|          null|       103|           60|
|        112|Jose Manuel|      Urman| JMURMAN|515.124.4469|2006-03-07|FI_ACCOUNT|  7800|          null|       108|          100|
|        118|        Guy|     Himuro| GHIMURO|515.127.4565|2006-11-15|  PU_CLERK|  2600|          null|       114|           30|
|        126|      Irene|Mikkilineni|IMIKKILI|650.124.1224|2006-09-28|  ST_CLERK|  2700|          null|       120|           50|
|        134|    Michael|     Rogers| MROGERS|650.127.1834|2006-08-26|  ST_CLERK|  2900|         

In [37]:
# 2006-02-05부터 2006-11-15사이에 고용된 사원들의 부서번호 조회
emp.filter((emp['HIRE_DATE'] >= '2006-02-05') & (emp['HIRE_DATE'] <= '2006-11-15')).select(['DEPARTMENT_ID']).show(5)

+-------------+
|DEPARTMENT_ID|
+-------------+
|           60|
|          100|
|           30|
|           50|
|           50|
+-------------+
only showing top 5 rows



In [38]:
# 직책이 IT_PROG인 사원수를 조회
emp.filter(emp['JOB_ID'] == 'IT_PROG').count()

5