In [1]:
from pyspark import SparkContext
from pyspark.sql import SQLContext, Row
import pandas as pd
import numpy as np

In [2]:
sc = SparkContext( 'local' )
sqlCtx = SQLContext( sc )

In [3]:
emp =[('홍길동',1),('이순신',2),
      ('임꺽정',3),('김철수',3),('김철수1',5)]
dept = [('개발',1), ('연구',2),
        ('영업',3),('기획',4) ]


In [6]:
empRdd = sc.parallelize( emp )
empRdd.collect()

[('홍길동', 1), ('이순신', 2), ('임꺽정', 3), ('김철수', 3), ('김철수1', 5)]

# DataFrame
- 분산 데이터 프레임
- MapReduce에는 없는 형태
- Spark에서 추가된 형태

In [8]:
# (1) RDD를 분산 데이터프레임으로  만들기
empDF = empRdd.toDF() # RDD를 데이터 프레임으로 만드는 함수
empDF

DataFrame[_1: string, _2: bigint]

In [9]:
empDF.show()

+-------+---+
|     _1| _2|
+-------+---+
| 홍길동|  1|
| 이순신|  2|
| 임꺽정|  3|
| 김철수|  3|
|김철수1|  5|
+-------+---+



In [11]:
# (2) SQLContext의 멤버함수로 데이터 프레임 만들기
empDF1 = sqlCtx.createDataFrame( emp )
empDF1.show() # 단순히 찍어주는 것( print )

+-------+---+
|     _1| _2|
+-------+---+
| 홍길동|  1|
| 이순신|  2|
| 임꺽정|  3|
| 김철수|  3|
|김철수1|  5|
+-------+---+



# 판다스 데이터 프레임으로 변환
- df.toPandas( ) 
    - **RDD.collect( )와 같은 개념!**
    - 분산되어 저장된 데이터 프레임의 데이터를 하나의 판다스 데이터프레임으로 만들어 주는 것
    - 따라서 함부로 사용하면 안 됨
        - 충분히 **필터링** 하고 사용, 아니라면 spark데이터 프레임으로 분석

In [24]:
df= empDF1.toPandas() # RDD.collect()와 같은 개념
df

Unnamed: 0,_1,_2
0,홍길동,1
1,이순신,2
2,임꺽정,3
3,김철수,3
4,김철수1,5


# 컬럼명 지정

In [15]:
empDF2 = sqlCtx.createDataFrame( emp, ['name','deptid'] ) # 컬럼명을 설정하는 것
empDF2.show()

+-------+------+
|   name|deptid|
+-------+------+
| 홍길동|     1|
| 이순신|     2|
| 임꺽정|     3|
| 김철수|     3|
|김철수1|     5|
+-------+------+



# 스키마 확인
- df.printSchema()
- padas의 df.info()와 같음

In [17]:
empDF2.printSchema() 

root
 |-- name: string (nullable = true)
 |-- deptid: long (nullable = true)



# SparkSQL
- **HIVE**(하둡 에코 시스템, sql 이용해 데이터 분석)의 SQL 문법을 따름
- HIVE ql 함수
    - https://rfriend.tistory.com/213
    - https://docs.microsoft.com/ko-kr/azure/hdinsight/hadoop/hdinsight-use-hive
- Spark SQL 구문 라이브러리
    - https://spark.apache.org/docs/latest/api/sql/index.html <br>
<br>     
- Spark SQL을 사용하기 위해선 분산 **데이터 프레임에 이름**을 부여해야함
    - df.createOrReplaceTempView( '이름' )

In [18]:
# **분산데이터 프레임에 이름을 부여해야함**
empDF2.createOrReplaceTempView( 'my' ) 

In [21]:
sql = 'select * from my'
sqlDF = sqlCtx.sql( sql )
sqlDF.show() # spark 데이터 프레임을 반환

+-------+------+
|   name|deptid|
+-------+------+
| 홍길동|     1|
| 이순신|     2|
| 임꺽정|     3|
| 김철수|     3|
|김철수1|     5|
+-------+------+



In [23]:
sqlCtx.sql( 'select name from my' ).show()

+-------+
|   name|
+-------+
| 홍길동|
| 이순신|
| 임꺽정|
| 김철수|
|김철수1|
+-------+



In [26]:
sqlCtx.sql( 'select name from my where name != "홍길동" ' ).show()

+-------+
|   name|
+-------+
| 이순신|
| 임꺽정|
| 김철수|
|김철수1|
+-------+



In [28]:
sqlCtx.sql( 'select * from my where deptid >= 3 ' ).show()

+-------+------+
|   name|deptid|
+-------+------+
| 임꺽정|     3|
| 김철수|     3|
|김철수1|     5|
+-------+------+



## 데이터가 충분히 필터링 됐다면, 판다스 데이터프레임으로 변환

In [31]:
sqlCtx.sql( 'select * from my where deptid == 2 or deptid == 3 ' ).toPandas()

Unnamed: 0,name,deptid
0,이순신,2
1,임꺽정,3
2,김철수,3


In [35]:
sqlCtx.sql( 'select * from my where deptid in (1,2) ' ).show()

+------+------+
|  name|deptid|
+------+------+
|홍길동|     1|
|이순신|     2|
+------+------+



In [36]:
sqlCtx.sql( 'select * from my where deptid >= 2 and deptid <= 4 ' ).show()

+------+------+
|  name|deptid|
+------+------+
|이순신|     2|
|임꺽정|     3|
|김철수|     3|
+------+------+



In [39]:
sqlCtx.sql( 'select * from my where deptid between 2 and 4' ).show()

+------+------+
|  name|deptid|
+------+------+
|이순신|     2|
|임꺽정|     3|
|김철수|     3|
+------+------+



In [40]:
sqlCtx.sql( 'select * from my where name == "이순신" ' ).show()

+------+------+
|  name|deptid|
+------+------+
|이순신|     2|
+------+------+



In [41]:
sqlCtx.sql( 'select * from my where name like "%김%" ' ).show()

+-------+------+
|   name|deptid|
+-------+------+
| 김철수|     3|
|김철수1|     5|
+-------+------+



In [49]:
sqlCtx.sql( 'select * from my order by name desc ' ).show()

+-------+------+
|   name|deptid|
+-------+------+
| 홍길동|     1|
| 임꺽정|     3|
| 이순신|     2|
|김철수1|     5|
| 김철수|     3|
+-------+------+



In [54]:
sqlCtx.sql( 'select * from my order by name desc limit 2' ).show()

+------+------+
|  name|deptid|
+------+------+
|홍길동|     1|
|임꺽정|     3|
+------+------+



## 집계관련 함수
- max, min, avg, sum, count

In [56]:
sqlCtx.sql( 'select count(*) from my' ).show()

+--------+
|count(1)|
+--------+
|       5|
+--------+



In [57]:
sqlCtx.sql( 'select max(deptid) from my' ).show()

+-----------+
|max(deptid)|
+-----------+
|          5|
+-----------+



In [66]:
sqlCtx.sql( 'select sum(deptid), avg(deptid), mean(deptid) from my' ).toPandas()

Unnamed: 0,sum(deptid),avg(deptid),avg(deptid).1
0,14,2.8,2.8


In [69]:
# 전체 row의 개수( 결측치를 제외하고 )
sqlCtx.sql( 'select count(*) from my' ).show()

+--------+
|count(1)|
+--------+
|       5|
+--------+



In [71]:
sqlCtx.sql( 'select mean(deptid), sum(deptid) from my' ).show()

+-----------+-----------+
|avg(deptid)|sum(deptid)|
+-----------+-----------+
|        2.8|         14|
+-----------+-----------+



## 연산
- element wise
    - 요소별 연산

In [73]:
sqlCtx.sql( 'select deptid from my' ).show()

+------+
|deptid|
+------+
|     1|
|     2|
|     3|
|     3|
|     5|
+------+



In [78]:
sqlCtx.sql( "select deptid*2 as yaaa from my" ).show()

+----+
|yaaa|
+----+
|   2|
|   4|
|   6|
|   6|
|  10|
+----+



## sql으로 정규식 사용
- db마다 별도의 정규식 존재
- hive ql을 이용해보겠다

In [43]:
sqlCtx.sql( 'select * from my where name rlike "^김" ' ).show() # 김으로 시작하는 단어

+-------+------+
|   name|deptid|
+-------+------+
| 김철수|     3|
|김철수1|     5|
+-------+------+



In [46]:
sqlCtx.sql( 'select * from my where name rlike "수$" ' ).show() # 수로 끝나는 단어

+------+------+
|  name|deptid|
+------+------+
|김철수|     3|
+------+------+



In [47]:
sqlCtx.sql( 'select * from my where name rlike "[김|정]" ' ).show() # 김or정이 있는 단어

+-------+------+
|   name|deptid|
+-------+------+
| 임꺽정|     3|
| 김철수|     3|
|김철수1|     5|
+-------+------+



## 연습문제

- 0. name salary 컬럼명을 가지는 데이터프레임을 만드시요.
- 1. 급여가 가장높은 name과 salary를 출력하시요
- 2. name, salary, tax 를 출력하시요 (tax는 급여에서 세금 3.3을 제한값)
- 3. name 에 '철'이 포함된  name,salary 를 출력하시요
- 4. salary top 5 인 name, salary 출력하시요
- 5. salary  2000  과 4000 사이의 데이터를 출력하시요

In [117]:
data1 = ['홍길동,1000','이순신,2000','임꺽정,3000',
         '김철수,4000','이황,5000','이이,6000']

# RDD로 변경
rdd = sc.parallelize( data1 )

# 리스트에 튜플 형태로 변경
rdd2 = rdd.map( lambda v: v.split(',') ).map( lambda v: (v[0], int(v[1]) ) )

# 데이터 프레임으로 변경5
df = rdd2.toDF()
# df.show()

# 컬럼명 지정
df = sqlCtx.createDataFrame( rdd2, ['name','salary'] ) # 컬럼명을 설정하는 것
df.show()

# 데이터 프레임 이름 지정
df.createOrReplaceTempView( 'df' ) 

+------+------+
|  name|salary|
+------+------+
|홍길동|  1000|
|이순신|  2000|
|임꺽정|  3000|
|김철수|  4000|
|  이황|  5000|
|  이이|  6000|
+------+------+



In [119]:
df.printSchema()

root
 |-- name: string (nullable = true)
 |-- salary: long (nullable = true)



In [None]:
## 풀이 정리 ##

# 1번
sql = " select max(salary) from df"
sqlDF = sqlCtx.sql( sql )
sqlDF.show() # max값 구하기 = 6000

sql = " select * from df where salary == 6000 "
sqlDF = sqlCtx.sql( sql )
sqlDF.show()

# 2번 
sql = 'select name, salary, salary*(1-0.033) as tax from df'
sqlDF = sqlCtx.sql( sql )
sqlDF.show()

# 3번
sql = " select name, salary from df where name like '%철%' "
sqlDF = sqlCtx.sql( sql )
sqlDF.show()

# 4번
sql = " select name, salary from df order by salary desc limit 5"
sqlDF = sqlCtx.sql( sql )ㅈ
sqlDF.show()

# 5번
sql = " select name, salary from df where salary between 2000 and 4000"
sqlDF = sqlCtx.sql( sql )
sqlDF.show()

### 1. 급여가 가장높은 name과 salary를 출력하시요
- subquerry 사용
- order by desc 후 limt 1
- 직접 구하고 그 값과 같은 값을 출력

In [142]:
sql = " select * from df where salary == ( select max(salary) from df )"
sqlDF = sqlCtx.sql( sql )
sqlDF.show()

+----+------+
|name|salary|
+----+------+
|이이|  6000|
+----+------+



In [145]:
sql = " select * from df order by salary desc limit 1"
sqlDF = sqlCtx.sql( sql )
sqlDF.show()

+----+------+
|name|salary|
+----+------+
|이이|  6000|
+----+------+



In [137]:
sql = " select max(salary) from df"
sqlDF = sqlCtx.sql( sql )
sqlDF.show()

+-----------+
|max(salary)|
+-----------+
|       6000|
+-----------+



In [141]:
sql = " select * from df where salary == 6000 "
sqlDF = sqlCtx.sql( sql )
sqlDF.show()

+----+------+
|name|salary|
+----+------+
|이이|  6000|
+----+------+



### 2. name, salary, tax 를 출력하시요 (tax는 급여에서 세금 3.3%을 제한값)

In [120]:
sql = 'select name, salary, salary*(1-0.033) as tax from df'
sqlDF = sqlCtx.sql( sql )
sqlDF.show()

+------+------+--------+
|  name|salary|     tax|
+------+------+--------+
|홍길동|  1000| 967.000|
|이순신|  2000|1934.000|
|임꺽정|  3000|2901.000|
|김철수|  4000|3868.000|
|  이황|  5000|4835.000|
|  이이|  6000|5802.000|
+------+------+--------+



### 3. name 에 '철'이 포함된  name,salary 를 출력하시요

In [121]:
sql = " select name, salary from df where name like '%철%' "
sqlDF = sqlCtx.sql( sql )
sqlDF.show()

+------+------+
|  name|salary|
+------+------+
|김철수|  4000|
+------+------+



### 4. salary top 5 인 name, salary 출력하시요

In [123]:
sql = " select name, salary from df order by salary desc limit 5"
sqlDF = sqlCtx.sql( sql )
sqlDF.show()

+------+------+
|  name|salary|
+------+------+
|  이이|  6000|
|  이황|  5000|
|김철수|  4000|
|임꺽정|  3000|
|이순신|  2000|
+------+------+



### 5. salary  2000  과 4000 사이의 데이터를 출력하시요

In [124]:
sql = " select name, salary from df where salary between 2000 and 4000"
sqlDF = sqlCtx.sql( sql )
sqlDF.show()

+------+------+
|  name|salary|
+------+------+
|이순신|  2000|
|임꺽정|  3000|
|김철수|  4000|
+------+------+



# 서브쿼리

In [155]:
data1 = ['홍길동,1000','이순신,2000','임꺽정,3000',
         '김철수,4000','이황,5000','이이,6000']

# RDD로 변경
rdd = sc.parallelize( data1 )

# 리스트에 튜플 형태로 변경
rdd2 = rdd.map( lambda v: v.split(',') ).map( lambda v: (v[0], int(v[1]) ) )

# 데이터 프레임으로 변경
df = sqlCtx.createDataFrame( rdd2 , ['name','salary'] ) # 컬럼명을 설정

# 데이터 프레임 이름 지정
df.createOrReplaceTempView( 'emp' ) 

df.show()

+------+------+
|  name|salary|
+------+------+
|홍길동|  1000|
|이순신|  2000|
|임꺽정|  3000|
|김철수|  4000|
|  이황|  5000|
|  이이|  6000|
+------+------+



In [157]:
sqlCtx.sql( 'select * from emp').show()

+------+------+
|  name|salary|
+------+------+
|홍길동|  1000|
|이순신|  2000|
|임꺽정|  3000|
|김철수|  4000|
|  이황|  5000|
|  이이|  6000|
+------+------+



- limit로 추출하면 중복된 데이터 필터링을 못한다
- 따라서 서브쿼리를 이용해서 max(salary) 값 구한다

In [168]:
sqlCtx.sql( 'select * from emp where salary == (select max(salary) from emp)').show()

+----+------+
|name|salary|
+----+------+
|이이|  6000|
+----+------+



# 조건별(표준)
- case when then
- **콤마** 꼭 필요
            select * , 
            case
                when 조건 then 표현
                when 조건 then 표현
                ...
            end as '어떤 컬럼명으로 출력할건지'
            from DataFrame
    


In [165]:
sqlCtx.sql( '''
               select name, salary,
               case
                   when salary >= 4000 then '많음'
                   when salary >= 2000 then '보통'
                   else '적음'
               end as sal
               from emp
           '''
          ).show()

+------+------+----+
|  name|salary| sal|
+------+------+----+
|홍길동|  1000|적음|
|이순신|  2000|보통|
|임꺽정|  3000|보통|
|김철수|  4000|많음|
|  이황|  5000|많음|
|  이이|  6000|많음|
+------+------+----+



# HIve ql 함수
- HIVE ql 함수
    - https://rfriend.tistory.com/213
    - https://docs.microsoft.com/ko-kr/azure/hdinsight/hadoop/hdinsight-use-hive
    
- spark sql : hive ql 의 sql문법과 함수를 따른다
    - 표준 sql
        - https://www.w3schools.com/sql/
    - spark sql 함수 
        - https://rfriend.tistory.com/213
        - https://spark.apache.org/docs/latest/api/sql/index.html


### round()

In [174]:
sqlCtx.sql( 'select name, salary ,round(salary*(1-0.033), 2) as tax from emp').show()

+------+------+-------+
|  name|salary|    tax|
+------+------+-------+
|홍길동|  1000| 967.00|
|이순신|  2000|1934.00|
|임꺽정|  3000|2901.00|
|김철수|  4000|3868.00|
|  이황|  5000|4835.00|
|  이이|  6000|5802.00|
+------+------+-------+



### substr()

In [179]:
sqlCtx.sql( 'select substr( name,2) as name, salary ,round(salary*(1-0.033), 1) as tax from emp').show()

+----+------+------+
|name|salary|   tax|
+----+------+------+
|길동|  1000| 967.0|
|순신|  2000|1934.0|
|꺽정|  3000|2901.0|
|철수|  4000|3868.0|
|  황|  5000|4835.0|
|  이|  6000|5802.0|
+----+------+------+



### stddev_samp()
- 표준편차

In [184]:
sqlCtx.sql( 'select stddev_samp(salary) as standard_deviataion from emp').show()

+-------------------+
|standard_deviataion|
+-------------------+
| 1870.8286933869706|
+-------------------+



### percentile()

In [194]:
sqlCtx.sql( 'select percentile(salary, 0.5) as q0_5 , percentile(salary, 0.75) as q0_75 from emp').show()

+------+------+
|  q0_5| q0_75|
+------+------+
|3500.0|4750.0|
+------+------+

