In [1]:
from pyspark import SparkContext
from pyspark.sql import SQLContext, Row
import pandas as pd
import numpy as np

In [2]:
sc = SparkContext('local')
sqlCtx = SQLContext(sc)

In [4]:
emp =[('홍길동',1),('이순신',2),
      ('임꺽정',3),('김철수',3),('김철수1',5)]
dept = [('개발',1), ('연구',2),
        ('영업',3),('기획',4) ]

In [5]:
empRdd = sc.parallelize(emp)
empRdd

ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:195

In [7]:
empRdd.collect()

[('홍길동', 1), ('이순신', 2), ('임꺽정', 3), ('김철수', 3), ('김철수1', 5)]

## rdd : 분산 리스트
## dataframe : 분산 데이터프레임

In [6]:
empDF = empRdd.toDF() # rdd --> dataframe
empDF

DataFrame[_1: string, _2: bigint]

In [8]:
empDF.show() # 개별 데이터프레임을 보여줌

# 판다스와 다르게 분산 데이터 프레임
# 데이터가 각 클러스터에 분산되어서 적재됨

+-------+---+
|     _1| _2|
+-------+---+
| 홍길동|  1|
| 이순신|  2|
| 임꺽정|  3|
| 김철수|  3|
|김철수1|  5|
+-------+---+



### 또다른 방법

In [10]:
empDf1 = sqlCtx.createDataFrame(emp)
empDf1.show() # collect가 아니라 print와 같은 개념

+-------+---+
|     _1| _2|
+-------+---+
| 홍길동|  1|
| 이순신|  2|
| 임꺽정|  3|
| 김철수|  3|
|김철수1|  5|
+-------+---+



In [11]:
# 분산데이터프레임 -> 판다스의 데이터프레임
# collect는 하나의 리스트로 만들어짐

df = empDf1.toPandas() # rdd의 멤버함수인 collect()와 비슷 => 따라서 함부로 사용해서는 안됨 => 필터링해서 사용해야..!
df

Unnamed: 0,_1,_2
0,홍길동,1
1,이순신,2
2,임꺽정,3
3,김철수,3
4,김철수1,5


#### spark로 빅데이터를 읽고 rdd나 dataframe으로 읽은 다음 어느정도 필터링된 다음 toPandas를 이용해서 판다스로 분석을 함

In [12]:
empDf2 = sqlCtx.createDataFrame(emp, ['name', 'deptid']) # 두번째 인자에 칼럼명 지정 가능
empDf2.show()

+-------+------+
|   name|deptid|
+-------+------+
| 홍길동|     1|
| 이순신|     2|
| 임꺽정|     3|
| 김철수|     3|
|김철수1|     5|
+-------+------+



In [13]:
empDf2.printSchema() # spark데이터 프레임에서는 각 칼럼에 대한 정보 확인 가능 <-> 데이터프레임 : info()

root
 |-- name: string (nullable = true)
 |-- deptid: long (nullable = true)



### spark Sql :
### hive(hadoop 에코(echo)시스템 : sql을 이용하여 데이터를 분석하는 툴) 
### https://rfriend.tistory.com/213 : hive ql 함수
### spark sql 도큐먼트 : https://spark.apache.org/docs/latest/api/sql/index.html


In [14]:
empDf2.createOrReplaceTempView('my') # 분산데이터 프레임에 my라는 이름을 부여

In [49]:
# sql = "select * from my"
# sql = "select name from my"
# sql = "select name, deptid from my"
# sql = "select name, deptid from my where deptid>2"
# sql = "select name, deptid from my where deptid==2 or deptid==3"
# sql = "select name, deptid from my where deptid in (1, 2, 3)"
# sql = "select name, deptid from my where deptid >= 2 and deptid <= 4"
# sql = "select name, deptid from my where deptid between 2 and 4"
# spq에서 문자열은 홀따옴표
# sql = "select name, deptid from my where name='이순신'"
# sql = "select name, deptid from my where name like '%김%'"

# 정규식 패턴
# 정규식 패턴을 사용하기 위해 rlike -> hive ql 에서는 rlike => 표준은 아님 / DB 마다 다름 
# sql = "select name, deptid from my where name rlike '김'"
# sql = "select name, deptid from my where name rlike '^김'"
# sql = "select name, deptid from my where name rlike '수$'"
# sql = "select name, deptid from my where name rlike '[김정]'"

# 정렬
# sql = "select name, deptid from my order by name"
# sql = "select name, deptid from my order by name desc"
# sql = "select name, deptid from my order by deptid"
# sql = "select name, deptid from my order by deptid desc"
# sql = "select name, deptid from my order by deptid desc limit 3" # top3 구할 때

# 집계함수(max, min, avg, sum, count) => 표준
# sql = "select max(deptid) from my"
# sql = "select min(deptid) from my"
# sql = "select min(deptid), max(deptid) from my"  # 두 개 나옴
# sql = "select sum(deptid) from my"
# sql = "select mean(deptid) from my" # 평균을 구하기 위해서는 => mean 또는 avg
# sql = "select avg(deptid) from my"
# sql = "select count(deptid) from my" # 만약 결측치가 있으면 결측치를 제외하고 row 개수 출력
# sql = "select count(*) from my" # 전체 row의 개수 

# 연산 - 요소별 연산
# sql = "select deptid*2 as s from my"  #  칼럼명을 as로 두기

# 그룹바이
# sql = '''select `학년`, avg(`중간`) from grade group by `학년`'''

# 조건별
# sql = '''select name, salary,
#         case
#             when salary >= 4000 then '많음'
#             when salary >= 2000 then '보통'
#             else '적음'
#         end as sal
#         from emp
#         '''

sql = "select deptid*2 as s from my" #  칼럼명을 as로 두기
sqlDF = sqlCtx.sql(sql) # saprk Dataframe
sqlDF.show()

+---+
|  s|
+---+
|  2|
|  4|
|  6|
|  6|
| 10|
+---+



<pre>
0. name salary 컬럼명을 가지는 데이터프레임을 만드시요.
1. 급여가 가장높은 name과 salary를 출력하시요
2. name, salary, tax 를 출력하시요 (tax는 급여에서 세금 3.3을 제한값)
3. name 에 '철'이 포함된  name,salary 를 출력하시요
4. salary top 5 인 name, salary 출력하시요
5. salary  2000  과 4000 사이의 데이터를 출력하시요
</pre>

In [50]:
data1 = ['홍길동,1000','이순신,2000','임꺽정,3000',
         '김철수,4000','이황,5000','이이,6000']

# 0. name salary 컬럼명을 가지는 데이터프레임을 만드시요.

In [55]:
salaryRdd = sc.parallelize(data1)
salaryRdd.collect()

['홍길동,1000', '이순신,2000', '임꺽정,3000', '김철수,4000', '이황,5000', '이이,6000']

In [76]:
salaryRdd1 = salaryRdd.map(lambda v:v.split(',')).map(lambda v:(v[0],int(v[1])))
salaryRdd1.collect()

[('홍길동', 1000),
 ('이순신', 2000),
 ('임꺽정', 3000),
 ('김철수', 4000),
 ('이황', 5000),
 ('이이', 6000)]

In [77]:
salaryDF1 = salaryRdd1.toDF(['name','salary'])
salaryDF1.show()

+------+------+
|  name|salary|
+------+------+
|홍길동|  1000|
|이순신|  2000|
|임꺽정|  3000|
|김철수|  4000|
|  이황|  5000|
|  이이|  6000|
+------+------+



In [78]:
salaryDF1.createOrReplaceTempView('my')

In [84]:
salaryDF1.printSchema()

root
 |-- name: string (nullable = true)
 |-- salary: long (nullable = true)



# 1. 급여가 가장높은 name과 salary를 출력하시요

In [79]:
sql = "select * from my order by salary desc limit 1" 
sqlDF = sqlCtx.sql(sql) 
sqlDF.show()

+----+------+
|name|salary|
+----+------+
|이이|  6000|
+----+------+



# 2. name, salary, tax 를 출력하시요 (tax는 급여에서 세금 3.3을 제한값)

In [80]:
sql_add = "select name, salary, (salary-salary*0.033) as tax from my"
sqlDF_add = sqlCtx.sql(sql_add)
sqlDF_add.show()

+------+------+--------+
|  name|salary|     tax|
+------+------+--------+
|홍길동|  1000| 967.000|
|이순신|  2000|1934.000|
|임꺽정|  3000|2901.000|
|김철수|  4000|3868.000|
|  이황|  5000|4835.000|
|  이이|  6000|5802.000|
+------+------+--------+



# 3. name 에 '철'이 포함된  name,salary 를 출력하시요

In [81]:
sql = "select name from my where name like '%철%'" 
sqlDF = sqlCtx.sql(sql) 
sqlDF.show()

+------+
|  name|
+------+
|김철수|
+------+



# 4. salary top 5 인 name, salary 출력하시요

In [82]:
sql = "select * from my order by salary desc limit 5" 
sqlDF = sqlCtx.sql(sql) 
sqlDF.show()

+------+------+
|  name|salary|
+------+------+
|  이이|  6000|
|  이황|  5000|
|김철수|  4000|
|임꺽정|  3000|
|이순신|  2000|
+------+------+



# 5. salary  2000  과 4000 사이의 데이터를 출력하시요

In [83]:
sql = "select * from my where salary between 2000 and 4000" 
# sql = "select * from my where salary >= 2000 and salary <= 4000" 
sqlDF = sqlCtx.sql(sql) 
sqlDF.show()

+------+------+
|  name|salary|
+------+------+
|이순신|  2000|
|임꺽정|  3000|
|김철수|  4000|
+------+------+



# 추가적으로 보기

In [85]:
salaryDF1.createOrReplaceTempView('emp')

### sub query - 표준 => DB 종류 상관없이 똑같음

In [87]:
# sql = "select * from emp" # 전체 row 개수
# sql = "select max(salary) from emp"

sql = "select * from emp where salary == (select max(salary) from emp)"
sqlDF = sqlCtx.sql(sql)
sqlDF.show()

+----+------+
|name|salary|
+----+------+
|이이|  6000|
+----+------+



### case when then (조건별) - 표준

In [88]:
sql = '''select name, salary,
        case
            when salary >= 4000 then '많음'
            when salary >= 2000 then '보통'
            else '적음'
        end as sal
        from emp
        '''
# 새로운 칼럼명을 sal

sqlDF = sqlCtx.sql(sql)
sqlDF.show()

+------+------+----+
|  name|salary| sal|
+------+------+----+
|홍길동|  1000|적음|
|이순신|  2000|보통|
|임꺽정|  3000|보통|
|김철수|  4000|많음|
|  이황|  5000|많음|
|  이이|  6000|많음|
+------+------+----+



## hive ql 함수
### spark sql : hive ql 의 sql문법과 함수를 따른다
### 표준 sql : https://www.w3schools.com/sql/
### spark sql 함수: https://rfriend.tistory.com/213
### spark sql은 hive sql을 따른다
### 도큐먼트 : https://spark.apache.org/docs/latest/api/sql/index.html


In [91]:
sql = "select name, salary, round((salary*(1-0.033)),2) as tax from emp"
sqlDF_add = sqlCtx.sql(sql)
sqlDF_add.show()

+------+------+-------+
|  name|salary|    tax|
+------+------+-------+
|홍길동|  1000| 967.00|
|이순신|  2000|1934.00|
|임꺽정|  3000|2901.00|
|김철수|  4000|3868.00|
|  이황|  5000|4835.00|
|  이이|  6000|5802.00|
+------+------+-------+



In [92]:
# substr : string A를 시작 색인 지점부터 끝까지 잘라서 반환 - 색인은 1부터 시작함

sql = '''select substr(name,2) as n, salary, round( salary*(1-0.033),2) as tax from emp'''
sqlDF = sqlCtx.sql( sql )
sqlDF.show()

+----+------+-------+
|   n|salary|    tax|
+----+------+-------+
|길동|  1000| 967.00|
|순신|  2000|1934.00|
|꺽정|  3000|2901.00|
|철수|  4000|3868.00|
|  황|  5000|4835.00|
|  이|  6000|5802.00|
+----+------+-------+



In [93]:
# stddev_samp : 특정 열의 표본 표준편차를 반환

sql = '''select stddev_samp(salary) from emp'''
sqlDF = sqlCtx.sql( sql )
sqlDF.show()

+-----------------------------------+
|stddev_samp(CAST(salary AS DOUBLE))|
+-----------------------------------+
|                 1870.8286933869706|
+-----------------------------------+



In [94]:
# percentile : p1, p2, ... percent에 해당하는 int_expr 백분위수 값을 반환

sql = '''select percentile(salary,0.5) as p1,percentile(salary,0.75) as p2 from emp'''
sqlDF = sqlCtx.sql( sql )
sqlDF.show()

+------+------+
|    p1|    p2|
+------+------+
|3500.0|4750.0|
+------+------+



#### 참고로 pandas는 sql 구문이 지원이 안됨