# Spark DataFrame

* Last updated 20170221 20161125


## S.1 Learning Content

### S.1.1 Objectives

* Use Spark DataFrames.
* Extract data with Spark SQL.
* Write Spark data to MongoDB and read it back.

### S.1.2 Contents

* S.4 Creating a DataFrame
    * S.4.1 Creating from a schema
    * S.4.2 Creating from an RDD
    * S.4.3 Pandas
    * S.4.4 Reading a csv file
    * S.4.5 Reading a tsv file
    * S.4.6 Reading a JSON file
* S.5 DataFrame API
* S.6 Spark SQL
* S.7 MongoDB Spark connector
    * S.7.1 Configuration
    * S.7.2 uri
    * S.7.3 MongoDB Python API
    * S.7.4 Practice: writing and reading
* S.8 spark-submit
    * S.8.1 A simple job
    * S.8.2 MongoDB

### S.1.3 Problems

* Problem S-1: Reading a file and creating feature vectors
* Problem S-2: Reading Twitter JSON data
* Problem S-3: JSON from URL
* Problem S-4: Spark SQL Uber csv
* Problem S-5: spark-submit that reads open data stored in MongoDB

## S.2 Creating a SparkSession in IPython Notebook


In [29]:
import os
import sys 
os.environ["SPARK_HOME"]=os.path.join(os.environ['HOME'],'Downloads','spark-2.0.0-bin-hadoop2.7')
os.environ["PYLIB"]=os.path.join(os.environ["SPARK_HOME"],'python','lib')
sys.path.insert(0,os.path.join(os.environ["PYLIB"],'py4j-0.10.1-src.zip'))
sys.path.insert(0,os.path.join(os.environ["PYLIB"],'pyspark.zip'))

In [30]:
import pyspark
myConf=pyspark.SparkConf()
spark = pyspark.sql.SparkSession.builder\
    .master("local")\
    .appName("myApp")\
    .config(conf=myConf)\
    .getOrCreate()

In [31]:
print spark.version

2.0.0


## S.3 DataFrame

* A DataFrame is a data structure made of rows and columns that can be used in a distributed fashion.
* It is comparable to an RDB table or an Excel sheet. It has a schema, so each column has a name and a data type.
* Whereas an RDD is used without declaring a schema, a DataFrame requires one.
* Like an RDD, a DataFrame can serve as input data for machine learning.
* The RDD-based MLlib API has been in maintenance mode since Spark 2.0 and the DataFrame-based API is the primary one going forward, so prefer DataFrame over RDD.

* Main DataFrame API

Function | Description | Example
-------|-------|-------
json | read JSON | spark.read.json("employee.json")
show | display rows | df.show()
schema | print the schema | df.printSchema()
select | select columns | df.select("name")
filter | select rows by condition | df.filter(df["age"] > 23).show()
groupBy | group rows | df.groupBy("age").count().show()
dropna | drop rows containing na | df.dropna() or df.na.drop()
fillna | fill na with a value | df.fillna(0)
count | count rows | df.count()
drop | drop a column | df.drop("name")


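## S.4 Creating a DataFrame

* Define a schema and create the DataFrame from it, or
    * build it row by row with pyspark.sql.Row.
* Read it from an input source.
    * Hive, csv, JSON, RDB, XML, Parquet, Cassandra, RDD
    * Creating a DataFrame directly from Python dicts, as in the example below, is deprecated: schema inference from a dict still works but emits a warning, and pyspark.sql.Row is recommended instead.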

[{'name': 'kim', 'height': 170}]

### S.4.1 Creating from a schema

* Data types
```
NullType
StringType
BinaryType
BooleanType
DateType
TimestampType
DoubleType
DecimalType
ShortType
ArrayType
MapType
```

* A Row is a record with named fields.
* The schema is defined as 'year', 'name', 'height'.
* Spark infers the data types automatically, here string and long.

In [11]:
from pyspark.sql import Row
Person = Row('year','name', 'height')
rows = [Person('1','kim, js',170),Person('1','lee, sm', 175),Person('2','lim, yg',180),Person('2','lee',170)]
myDf=spark.createDataFrame(rows)

In [12]:
myDf.printSchema()

root
 |-- year: string (nullable = true)
 |-- name: string (nullable = true)
 |-- height: long (nullable = true)


### S.4.2 Creating from an RDD

* When no schema is given, the columns are named '_1', '_2', and so on.
* When creating from JSON or an RDD, the schema is inferred.


In [58]:
l = [('lee', 20)]
print spark.createDataFrame(l).collect()
print spark.createDataFrame(l, ['name', 'age']).collect()

[Row(_1=u'lee', _2=20)]
[Row(name=u'lee', age=20)]


In [59]:
from pyspark.sql import Row

rows = [('kim', 170), ('lee', 175), ('lim', 180),]
myRdd = spark.sparkContext.parallelize(rows)
rddDf=spark.createDataFrame(myRdd)
rddDf.printSchema()

root
 |-- _1: string (nullable = true)
 |-- _2: long (nullable = true)



In [60]:
rddDf.where(rddDf._2 < 175)\
    .select([rddDf._1, rddDf._2]).show()
rddDf.groupby(rddDf._2).max().show()

+---+---+
| _1| _2|
+---+---+
|kim|170|
+---+---+

+---+-------+
| _2|max(_2)|
+---+-------+
|170|    170|
|175|    175|
|180|    180|
+---+-------+



In [61]:
_myRdd=myRdd.map(lambda x:Row(name=x[0],height=int(x[1])))

_myDf=spark.createDataFrame(_myRdd)
_myDf.printSchema()

root
 |-- height: long (nullable = true)
 |-- name: string (nullable = true)



In [62]:
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import StringType, IntegerType, TimestampType
r1=Row(name="js1",age=10)
r2=Row(name="js2",age=20)
_myRdd=spark.sparkContext.parallelize([r1,r2])
_myRdd.collect()

[Row(age=10, name='js1'), Row(age=20, name='js2')]

* Note: Row sorts keyword fields by name, so the column order becomes age, name.

In [63]:
schema=StructType([
    StructField("age", IntegerType(), True),
    StructField("name", StringType(), True),
    #StructField("created", TimestampType(), True)
])

_myDf=spark.createDataFrame(_myRdd,schema)
_myDf.printSchema()
_myDf.show()

root
 |-- age: integer (nullable = true)
 |-- name: string (nullable = true)

+---+----+
|age|name|
+---+----+
| 10| js1|
| 20| js2|
+---+----+



* A DataFrame can be created from an RDD with an explicit schema.


In [64]:
from pyspark.sql.types import *
myRdd=spark.sparkContext.parallelize([(1, 'kim', 50.0), (2, 'lee', 60.0), (3, 'park', 70.0)])
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("height", DoubleType(), True)
])
_myDf = spark.createDataFrame(myRdd, schema)
_myDf.printSchema()
_myDf.show()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- height: double (nullable = true)

+---+----+------+
| id|name|height|
+---+----+------+
|  1| kim|  50.0|
|  2| lee|  60.0|
|  3|park|  70.0|
+---+----+------+



### S.4.3 Pandas

* The DataFrame is a format that is widely used in other languages and tools.
* Examples include spreadsheets (Excel), R's data.frame, and Pandas.
* Comparing Spark and Pandas DataFrames:
    * Pandas suits smaller data sets; Spark can distribute the processing, so it is better suited to big data.
    * The APIs differ somewhat.

DataFrame | Spark | Pandas
-------|-------|-------
csv file | map split(',') | read_csv()
display rows | show() | head(), tail()
data types | all strings after split(',') | inferred by read_csv()

* A Spark DataFrame can be converted to Pandas.

In [13]:
myDf.toPandas()

  year     name  height
0    1  kim, js     170
1    1  lee, sm     175
2    2  lim, yg     180
3    2      lee     170


In [26]:
import pandas as pd

_jfname=os.path.join('src','ds_twitter_seoul_3.json')
# read the entire file into a python array
with open(_jfname, 'rb') as f:
    data = f.readlines()

* Format of the 'data' that was read
    * It is not parsed JSON. It is a list of strings.
    * Each tweet is on its own line, terminated by '\n'.

In [27]:
data = map(lambda x: x.rstrip(), data)

* The file has no list wrapping the tweets, so that structure is built here.
    * Square brackets go at the start and end, and the tweets are joined with commas.

In [28]:
data_json_str = "[" + ','.join(data) + "]"
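
As an alternative to building one large JSON string, each line can be parsed on its own. The following is a minimal plain-Python 3 sketch of both approaches, assuming one JSON object per line. The two sample lines are made up for illustration and are not taken from ds_twitter_seoul_3.json.

```python
import json

# Hypothetical sample: one JSON object per line, as in the tweet file.
sample_lines = [
    '{"id": 1, "lang": "en"}\n',
    '{"id": 2, "lang": "ko"}\n',
]

# Approach used in the notebook: strip newlines, join with commas, wrap in brackets.
joined = "[" + ",".join(line.rstrip() for line in sample_lines) + "]"
records_a = json.loads(joined)

# Alternative: parse each non-empty line separately.
records_b = [json.loads(line) for line in sample_lines if line.strip()]

print(records_a == records_b)
```

Parsing line by line avoids holding a second full copy of the file as one string, which may matter for larger files.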

In [29]:
data_df = pd.read_json(data_json_str)

In [30]:
print data_df.count()

contributors                    0
coordinates                    55
created_at                   2013
entities                     2013
extended_entities             506
favorite_count               2013
favorited                    2013
geo                            55
id                           2013
id_str                       2013
in_reply_to_screen_name       153
in_reply_to_status_id         131
in_reply_to_status_id_str     131
in_reply_to_user_id           153
in_reply_to_user_id_str       153
is_quote_status              2013
lang                         2013
metadata                     2013
place                          65
possibly_sensitive           1097
quoted_status                  19
quoted_status_id              133
quoted_status_id_str          133
retweet_count                2013
retweeted                    2013
retweeted_status             1304
source                       2013
text                         2013
truncated                    2013
user          

In [31]:
data_df['id'][:10]

0    801657325836763136
1    801657325677400064
2    801657307637678080
3    801657305628430336
4    801657297449586688
5    801657287697895424
6    801657280760397824
7    801657276788523008
8    801657268177604608
9    801657258400616449
Name: id, dtype: int64

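### S.4.4 Creating from a csv file

* Create an RDD and convert it to a DataFrame, or
* read the file directly into a DataFrame.
    * Add the csv package as shown below. Spark 2.0 and later also ship a built-in csv reader (spark.read.csv).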

```
$ vim conf/spark-defaults.conf
spark.jars.packages=com.databricks:spark-csv_2.11:1.5.0
```

In [None]:
# %load /home/jsl/Downloads/spark-1.6.0-bin-hadoop2.6/examples/src/main/resources/people.txt
Michael, 29
Andy, 30
Justin, 19

In [52]:
from pyspark.sql import Row
cfile= os.path.join(os.environ["SPARK_HOME"],\
           "examples/src/main/resources/people.txt")
lines = spark.sparkContext.textFile(cfile)
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[0], age=int(p[1].strip())))

_myDf = spark.createDataFrame(people)
_myDf.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



* Using the csv package

In [28]:
%%writefile data/ds_spark.csv
1,2,3,4
11,22,33,44
111,222,333,444

Overwriting data/ds_spark.csv


In [29]:
df = spark.read.format('com.databricks.spark.csv')\
    .options(header='true', inferschema='true').load('data/ds_spark.csv')
df.show()


+---+---+---+---+
|  1|  2|  3|  4|
+---+---+---+---+
| 11| 22| 33| 44|
|111|222|333|444|
+---+---+---+---+



DataFrame[label: int, 2: int, 3: int, 4: int]

### S.4.5 Reading a tsv file
* tsv is a file whose fields are separated by tabs.
* When the line is split on '\t', Spark sets every field's data type to string.

In [6]:
import numpy as np
np.array([float(x) for x in '1.658985	4.285136'.split('\t')])

array([ 1.658985,  4.285136])

In [4]:
import numpy as np
np.array([float(x) for x in '1.658985 4.285136'.split(' ')])

array([ 1.658985,  4.285136])

* Data source: http://wiki.stat.ucla.edu/socr/index.php/SOCR_Data_Dinov_020108_HeightsWeights

In [None]:
# %load data/ds_spark_heightweight.txt
1	65.78	112.99
2	71.52	136.49
3	69.40	153.03
4	68.22	142.34
5	67.79	144.30
6	68.70	123.30
7	69.80	141.49
8	70.01	136.46
9	67.90	112.37
10	66.78	120.67
11	66.49	127.45
12	67.62	114.14
13	68.30	125.61
14	67.12	122.46
15	68.28	116.09
16	71.09	140.00
17	66.46	129.50
18	68.65	142.97
19	71.23	137.90
20	67.13	124.04
21	67.83	141.28
22	68.88	143.54
23	63.48	97.90
24	68.42	129.50
25	67.63	141.85
26	67.21	129.72
27	70.84	142.42
28	67.49	131.55
29	66.53	108.33
30	65.44	113.89
31	69.52	103.30
32	65.81	120.75
33	67.82	125.79
34	70.60	136.22
35	71.80	140.10
36	69.21	128.75
37	66.80	141.80
38	67.66	121.23
39	67.81	131.35
40	64.05	106.71
41	68.57	124.36
42	65.18	124.86
43	69.66	139.67
44	67.97	137.37
45	65.98	106.45
46	68.67	128.76
47	66.88	145.68
48	67.70	116.82
49	69.82	143.62
50	69.09	134.93

* A tsv line must be split on '\t'.

In [4]:
from pyspark.sql.types import *
rdd=spark.sparkContext\
    .textFile(os.path.join('data','ds_spark_heightweight.txt'))

tRdd=rdd.map(lambda x:x.split('\t'))
tDf=spark.createDataFrame(tRdd)

In [5]:
tDf.printSchema()

root
 |-- _1: string (nullable = true)
 |-- _2: string (nullable = true)
 |-- _3: string (nullable = true)



In [6]:
tDf.take(1)

[Row(_1=u'1', _2=u'65.78', _3=u'112.99')]

* The data is typed as string.
* Even if a schema is defined as below, the strings still have to be cast explicitly. In the data file the second field is the height and the third is the weight.
```
mySchema = StructType([
    StructField("id", IntegerType(), True),
    StructField("height", DoubleType(), True),
    StructField("weight", DoubleType(), True)
])
myDf=spark.createDataFrame(myRdd, mySchema)
```

In [7]:
tDf=tDf.withColumn("id",tDf['_1'].cast("integer")).drop('_1')
tDf=tDf.withColumn("height",tDf['_2'].cast("double")).drop('_2')
tDf=tDf.withColumn("weight",tDf['_3'].cast("double")).drop('_3')

In [8]:
tDf.take(1)

[Row(id=1, height=65.78, weight=112.99)]

* Another way is to cast the values in a Python list first and then build the RDD.

In [9]:
#import numpy as np
#myRdd=rdd.map(lambda line:np.array([float(x) for x in line.split('\t')]))
tRdd=rdd.map(lambda line:[float(x) for x in line.split('\t')])
tRdd.take(1)

[[1.0, 65.78, 112.99]]
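
The list comprehension above turns every field into a float, including the id. A minimal plain-Python 3 sketch of per-column casting follows. The function name `parse_line` is hypothetical, and the two sample lines are copied from ds_spark_heightweight.txt.

```python
# Sample lines copied from ds_spark_heightweight.txt (tab-separated).
lines = ["1\t65.78\t112.99", "2\t71.52\t136.49"]

def parse_line(line):
    """Split on tab and cast each field: id -> int, the rest -> float."""
    fields = line.split("\t")
    return (int(fields[0]), float(fields[1]), float(fields[2]))

parsed = [parse_line(l) for l in lines]
print(parsed)
```

With an RDD the same function could presumably be applied as `rdd.map(parse_line)`, so that the id column is inferred as long rather than double.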

In [10]:
tDf=spark.createDataFrame(tRdd,["id","height","weight"])
tDf.printSchema()
tDf.take(1)

root
 |-- id: double (nullable = true)
 |-- height: double (nullable = true)
 |-- weight: double (nullable = true)



[Row(id=1.0, height=65.78, weight=112.99)]

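### S.4.6 Creating from a JSON file

* Read a json file and query it with SQL.
* Older Spark 1.x code used the calls below; they have been replaced by spark.read.json().
* sqlContext.jsonRDD()
* sqlContext.jsonFile()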

In [67]:
jfile= os.path.join(os.environ["SPARK_HOME"],\
           "examples/src/main/resources/people.json")

_myDf= spark.read.json(jfile)
_myDf.filter(_myDf['age'] > 21).show()

+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+



* JSON from a URL

In [33]:
import requests
r=requests.get("https://raw.githubusercontent.com/jokecamp/FootballData/master/World%20Cups/all-world-cup-players.json")
wc=r.json()

In [34]:
type(wc)

list

In [35]:
wc[0]

{u'Club': u'Club Atl\xc3\xa9tico Talleres de Remedios de Escalada',
 u'ClubCountry': u'Argentina',
 u'Competition': u'World Cup',
 u'DateOfBirth': u'1905-5-5',
 u'FullName': u'\xc3ngel Bossio',
 u'IsCaptain': False,
 u'Number': u'',
 u'Position': u'GK',
 u'Team': u'Argentina',
 u'Year': 1930}

In [36]:
wcDF=spark.createDataFrame(wc)
wcDF.printSchema()



root
 |-- Club: string (nullable = true)
 |-- ClubCountry: string (nullable = true)
 |-- Competition: string (nullable = true)
 |-- DateOfBirth: string (nullable = true)
 |-- FullName: string (nullable = true)
 |-- IsCaptain: boolean (nullable = true)
 |-- Number: string (nullable = true)
 |-- Position: string (nullable = true)
 |-- Team: string (nullable = true)
 |-- Year: long (nullable = true)



```
from pyspark.sql.types import *
wcSchema=StructType([
    StructField("Club", StringType(), True),
    StructField("ClubCountry", StringType(), True),
    StructField("Competition", StringType(), True),
    StructField("DateOfBirth", DateType(), True),
    StructField("FullName", StringType(), True),
    StructField("IsCaptain", BooleanType(), True),
    StructField("Number", IntegerType(), True),
    StructField("Position", StringType(), True),
    StructField("Team", StringType(), True),
    StructField("Year", IntegerType(), True)
])
wcDF=spark.createDataFrame(wc,wcSchema)
wcDF.printSchema()
```

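```
from pyspark.sql.functions import udf
from pyspark.sql.types import LongType

def squared(x):
    # plain Python function that the udf wraps
    return x * x

squared_udf = udf(squared, LongType())
# assumes a table named "test" with a long column "id" already exists
df = spark.table("test")
df.select("id", squared_udf("id").alias("id_squared")).show()
```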

## S.5 Trying the DataFrame API

In [14]:
myDf.printSchema()
myDf.show()

root
 |-- year: string (nullable = true)
 |-- name: string (nullable = true)
 |-- height: long (nullable = true)

+----+-------+------+
|year|   name|height|
+----+-------+------+
|   1|kim, js|   170|
|   1|lee, sm|   175|
|   2|lim, yg|   180|
|   2|    lee|   170|
+----+-------+------+



* Renaming a column

In [9]:
wcDF=wcDF.withColumnRenamed('ClubCountry','ClubNation')
wcDF.printSchema()

root
 |-- Club: string (nullable = true)
 |-- ClubNation: string (nullable = true)
 |-- Competition: string (nullable = true)
 |-- DateOfBirth: string (nullable = true)
 |-- FullName: string (nullable = true)
 |-- IsCaptain: boolean (nullable = true)
 |-- Number: string (nullable = true)
 |-- Position: string (nullable = true)
 |-- Team: string (nullable = true)
 |-- Year: long (nullable = true)



* Selecting an attribute

Method | Example | Recommended
-----|-----|-----
Dot operator | myDf.name | N
Index | myDf['name'] | Y

In [57]:
_name=myDf.select('name')
_name.rdd.collect()

[Row(name=u'kim, js'),
 Row(name=u'lee, sm'),
 Row(name=u'lim, yg'),
 Row(name=u'lee')]

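* A Row object itself cannot simply be split. Extract the field value first (for example by converting the Row to a dict) and split that value. The following call fails because it applies split to the Row: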

```
_name.rdd.map(lambda line:[line.split(',')])
```

In [75]:
r=Row(name=u'kim, js')
rd=r.asDict()
print rd.values()[0].split(',')

[u'kim', u' js']
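
The split result above keeps the leading space in ' js'. A minimal plain-Python 3 sketch that splits on the comma and strips whitespace is shown here; the helper name `split_name` is hypothetical.

```python
def split_name(full_name):
    """Split 'last, first' on the comma and strip surrounding whitespace."""
    return [part.strip() for part in full_name.split(",")]

print(split_name("kim, js"))
print(split_name("lee"))
```

Applied to the DataFrame, the helper would take the field value rather than the Row, for instance `_name.rdd.map(lambda row: split_name(row['name']))`.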


In [83]:
myDf.where(myDf['height'] < 175)\
    .select(myDf['name'], myDf['height']).show()
myDf.groupby(myDf['year']).max().show()

+-------+------+
|   name|height|
+-------+------+
|kim, js|   170|
|    lee|   170|
+-------+------+

+----+-----------+
|year|max(height)|
+----+-----------+
|   1|        175|
|   2|        180|
+----+-----------+



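* User-defined function (udf)
    * A function that processes the rows of a DataFrame.
    * The function and its return type are declared in advance.
    * Either a lambda or a named function can be used.
    * It is used together with withColumn.
    * The example converts the year string to an integer.

* Adding a column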

In [85]:
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

toDoublefunc = udf(lambda x: float(x),DoubleType())
myDf = myDf.withColumn("heightD",toDoublefunc(myDf.height))

In [86]:
print int('1')

1


In [87]:
from pyspark.sql.functions import udf, struct
from pyspark.sql.types import IntegerType
toint=udf(lambda x:int(x),IntegerType())
myDf=myDf.withColumn("yearI",toint(myDf['year']))

myDf.printSchema()
myDf.show()

root
 |-- year: string (nullable = true)
 |-- name: string (nullable = true)
 |-- height: long (nullable = true)
 |-- heightD: double (nullable = true)
 |-- yearI: integer (nullable = true)

+----+-------+------+-------+-----+
|year|   name|height|heightD|yearI|
+----+-------+------+-------+-----+
|   1|kim, js|   170|  170.0|    1|
|   1|lee, sm|   175|  175.0|    1|
|   2|lim, yg|   180|  180.0|    2|
|   2|    lee|   170|  170.0|    2|
+----+-------+------+-------+-----+



In [88]:
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf
 
def uppercase(s):
    return s.upper()

upperUdf = udf(uppercase, StringType())
myDf = myDf.withColumn("nameUpper", upperUdf(myDf['name']))
myDf.show()

+----+-------+------+-------+-----+---------+
|year|   name|height|heightD|yearI|nameUpper|
+----+-------+------+-------+-----+---------+
|   1|kim, js|   170|  170.0|    1|  KIM, JS|
|   1|lee, sm|   175|  175.0|    1|  LEE, SM|
|   2|lim, yg|   180|  180.0|    2|  LIM, YG|
|   2|    lee|   170|  170.0|    2|      LEE|
+----+-------+------+-------+-----+---------+



In [92]:
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf

height_udf = udf(lambda height: "taller" if height >=175 else "shorter", StringType())
heightDf=myDf.withColumn("height>175", height_udf(myDf.heightD))
heightDf.show()

+----+-------+------+-------+-----+---------+----------+
|year|   name|height|heightD|yearI|nameUpper|height>175|
+----+-------+------+-------+-----+---------+----------+
|   1|kim, js|   170|  170.0|    1|  KIM, JS|   shorter|
|   1|lee, sm|   175|  175.0|    1|  LEE, SM|    taller|
|   2|lim, yg|   180|  180.0|    2|  LIM, YG|    taller|
|   2|    lee|   170|  170.0|    2|      LEE|   shorter|
+----+-------+------+-------+-----+---------+----------+
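
Spark passes None to a Python UDF when a column value is null, so a function such as `uppercase` would raise an error on null input. The sample data here has no nulls, so this is only a precaution. The sketch below shows null-safe variants as plain Python 3 functions; the names `uppercase_safe` and `height_label` are hypothetical, and wrapping them with `udf` is indicated only in a comment.

```python
def uppercase_safe(s):
    """Return the upper-cased string, or None when the input is None."""
    return s.upper() if s is not None else None

def height_label(height):
    """Label heights the same way as height_udf, passing None through."""
    if height is None:
        return None
    return "taller" if height >= 175 else "shorter"

# In the notebook these could be wrapped as, for example:
#   upperUdf = udf(uppercase_safe, StringType())
print(uppercase_safe("kim, js"), uppercase_safe(None))
print(height_label(175.0), height_label(170.0), height_label(None))
```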



* Summary statistics
    * For columns with a numeric data type, summary values can be displayed.

In [93]:
myDf.describe().show()

+-------+------------------+------------------+------------------+
|summary|            height|           heightD|             yearI|
+-------+------------------+------------------+------------------+
|  count|                 4|                 4|                 4|
|   mean|            173.75|            173.75|               1.5|
| stddev|4.7871355387816905|4.7871355387816905|0.5773502691896257|
|    min|               170|             170.0|                 1|
|    max|               180|             180.0|                 2|
+-------+------------------+------------------+------------------+



## S.6 Spark SQL

* Spark SQL
    * structures the data so that SQL can be used on it. RDDs are used when the data is unstructured.

Item | Spark SQL | RDD
-----|-----|-----
Data | structured | unstructured

* Spark SQL components

Item | Description
-----|-----
Language API | provides Python, Java, Scala, and HiveQL APIs
Schema RDD | applies a schema to an RDD and exposes it as a temporary table.<br>createOrReplaceTempView<br>createGlobalTempView
Data Sources | supports a variety of sources: HDFS, Cassandra, HBase, RDB


* The World Cup data is used here.

In [42]:
wcDF.printSchema()

root
 |-- Club: string (nullable = true)
 |-- ClubCountry: string (nullable = true)
 |-- Competition: string (nullable = true)
 |-- DateOfBirth: string (nullable = true)
 |-- FullName: string (nullable = true)
 |-- IsCaptain: boolean (nullable = true)
 |-- Number: string (nullable = true)
 |-- Position: string (nullable = true)
 |-- Team: string (nullable = true)
 |-- Year: long (nullable = true)



In [44]:
wcDF.createOrReplaceTempView("wc")
spark.sql("select Club,Team,Year from wc").show(1)

+--------------------+---------+----+
|                Club|     Team|Year|
+--------------------+---------+----+
|Club AtlÃ©tico Ta...|Argentina|1930|
+--------------------+---------+----+
only showing top 1 row



In [43]:
spark.catalog.listTables()

[Table(name=u'wc', database=None, description=None, tableType=u'TEMPORARY', isTemporary=True)]

In [45]:
wcDF.createOrReplaceTempView("wc")
wcPlayers=spark.sql("select FullName,Club,Team,Year from wc")

* Converting to an RDD and printing the names

In [57]:
namesRdd=wcPlayers.rdd.map(lambda x: "Full name: "+x[0])
for e in namesRdd.take(5):
    print e

Full name: Ãngel Bossio
Full name: Juan Botasso
Full name: Roberto Cherro
Full name: Alberto Chividini
Full name: 


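## Problem S-1: Reading a file and creating feature vectors

* Process network intrusion data
    * https://kdd.ics.uci.edu/databases/kddcup99/kddcup99.html

* The task is to read and process a file from the Internet. The file is compressed.
* The 'gz' extension marks a file produced by the gzip compression tool. WinZip can now read it as well.
* RDDs support reading compressed files.
    * A DataFrame, by contrast, needs a schema, so reading the file directly is not as easy. Here it raises an error.
    * The data is therefore processed as an RDD, a DataFrame is created from it, and SQL is used.

* Attack type label (column index 41, the last of the 42 fields)

Category | Count
-------|-------
normal | 97278
attack | 396743
total | 494021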

In [5]:
import os
import urllib
_url = 'http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz'
_fname = os.path.join(os.getcwd(),'data','kddcup.data_10_percent.gz')
if(not os.path.exists(_fname)):
    print "%s data does not exist! retrieving.." % _fname
    _f=urllib.urlretrieve(_url,_fname)


In [6]:
_rdd = spark.sparkContext.textFile(_fname)

In [7]:
_rdd.count()

494021
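
`sparkContext.textFile()` reads the .gz file directly. The same idea in plain Python 3 uses the standard `gzip` module. The sketch below writes and reads a small made-up file; the three shortened records are hypothetical and only mimic the real data, which has 494021 lines.

```python
import gzip
import os
import tempfile

# Three hypothetical short records; the real file has 494021 lines.
records = ["0,tcp,http,SF,normal.", "0,icmp,ecr_i,SF,smurf.", "0,tcp,http,SF,normal."]

path = os.path.join(tempfile.mkdtemp(), "sample.gz")
with gzip.open(path, "wt") as f:
    f.write("\n".join(records) + "\n")

# Read the compressed file back line by line without unpacking it on disk.
with gzip.open(path, "rt") as f:
    lines = [line.rstrip("\n") for line in f]

print(len(lines))
```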

In [17]:
_rdd.take(3)

[u'0,tcp,http,SF,181,5450,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,9,9,1.00,0.00,0.11,0.00,0.00,0.00,0.00,0.00,normal.',
 u'0,tcp,http,SF,239,486,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,19,19,1.00,0.00,0.05,0.00,0.00,0.00,0.00,0.00,normal.',
 u'0,tcp,http,SF,235,1337,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,29,29,1.00,0.00,0.03,0.00,0.00,0.00,0.00,0.00,normal.']

* Number of records that contain 'normal.'

In [18]:
_normal = _rdd.filter(lambda x: 'normal.' in x)
print _normal.count()

97278


In [19]:
_csvRdd=_rdd.map(lambda x: x.split(','))

In [20]:
print _csvRdd.take(1)

[[u'0', u'tcp', u'http', u'SF', u'181', u'5450', u'0', u'0', u'0', u'0', u'0', u'1', u'0', u'0', u'0', u'0', u'0', u'0', u'0', u'0', u'0', u'0', u'8', u'8', u'0.00', u'0.00', u'0.00', u'0.00', u'1.00', u'0.00', u'0.00', u'9', u'9', u'1.00', u'0.00', u'0.11', u'0.00', u'0.00', u'0.00', u'0.00', u'0.00', u'normal.']]


* Classifying the data
    * reduceByKey() is used to count the records of each kind.

In [21]:
_kv = _csvRdd.map(lambda x: (x[41], 1))
_attack = _kv.reduceByKey(lambda x,y: x+y)

In [22]:
_attack.collect()

[(u'guess_passwd.', 53),
 (u'nmap.', 231),
 (u'warezmaster.', 20),
 (u'rootkit.', 10),
 (u'warezclient.', 1020),
 (u'smurf.', 280790),
 (u'pod.', 264),
 (u'neptune.', 107201),
 (u'normal.', 97278),
 (u'spy.', 2),
 (u'ftp_write.', 8),
 (u'phf.', 4),
 (u'portsweep.', 1040),
 (u'teardrop.', 979),
 (u'buffer_overflow.', 30),
 (u'land.', 21),
 (u'imap.', 12),
 (u'loadmodule.', 9),
 (u'perl.', 3),
 (u'multihop.', 7),
 (u'back.', 2203),
 (u'ipsweep.', 1247),
 (u'satan.', 1589)]

In [23]:
_normalRdd=_csvRdd.filter(lambda x: x[41]=="normal.")
_attackRdd=_csvRdd.filter(lambda x: x[41]!="normal.")

In [24]:
print _normalRdd.count()
print _attackRdd.count()

97278
396743
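
The two totals can be cross-checked against the per-label counts returned by `_attack.collect()`. A minimal plain-Python 3 sketch, with the counts copied from that output:

```python
# Per-label counts copied from the _attack.collect() output above.
label_counts = {
    'guess_passwd.': 53, 'nmap.': 231, 'warezmaster.': 20, 'rootkit.': 10,
    'warezclient.': 1020, 'smurf.': 280790, 'pod.': 264, 'neptune.': 107201,
    'normal.': 97278, 'spy.': 2, 'ftp_write.': 8, 'phf.': 4,
    'portsweep.': 1040, 'teardrop.': 979, 'buffer_overflow.': 30, 'land.': 21,
    'imap.': 12, 'loadmodule.': 9, 'perl.': 3, 'multihop.': 7,
    'back.': 2203, 'ipsweep.': 1247, 'satan.': 1589,
}

normal = label_counts['normal.']
# Everything that is not labelled 'normal.' counts as an attack.
attack = sum(n for label, n in label_counts.items() if label != 'normal.')

print(normal, attack, normal + attack)
```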


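* See the earlier RDD notes: combineByKey(x, y, z)
    * Combiner function: x
        * turns the first value seen for a key into a combiner, for example (value, 1)
    * Merge value function: y
        * merges another value of the same key into the combiner within a partition
    * Merge combiners function: z
        * merges the combiners built in different partitions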

In [34]:
from operator import add
aggregated_counts = (data
    .map(lambda kv: (kv, 1))
    .reduceByKey(add)
    .map(lambda kv: (kv[0][0], (kv[0][1], kv[1])))
    .groupByKey()
    .mapValues(lambda xs: (list(xs), sum(x[1] for x in xs))))

aggregated_counts.collect()

[(0, ([(2.0, 1), (4.0, 1)], 2)), (1, ([(10.0, 1), (0.0, 1), (20.0, 1)], 3))]

In [25]:
sum_counts = _kv.combineByKey(
    (lambda x: (x, 1)), # the initial value, with value x and count 1
    (lambda acc, value: (acc[0]+value, acc[1]+1)), # how to combine a pair value with the accumulator: sum value, and increment count
    (lambda acc1, acc2: (acc1[0]+acc2[0], acc1[1]+acc2[1])) # combine accumulators
)

sum_counts.collectAsMap()

{u'back.': (2203, 2203),
 u'buffer_overflow.': (30, 30),
 u'ftp_write.': (8, 8),
 u'guess_passwd.': (53, 53),
 u'imap.': (12, 12),
 u'ipsweep.': (1247, 1247),
 u'land.': (21, 21),
 u'loadmodule.': (9, 9),
 u'multihop.': (7, 7),
 u'neptune.': (107201, 107201),
 u'nmap.': (231, 231),
 u'normal.': (97278, 97278),
 u'perl.': (3, 3),
 u'phf.': (4, 4),
 u'pod.': (264, 264),
 u'portsweep.': (1040, 1040),
 u'rootkit.': (10, 10),
 u'satan.': (1589, 1589),
 u'smurf.': (280790, 280790),
 u'spy.': (2, 2),
 u'teardrop.': (979, 979),
 u'warezclient.': (1020, 1020),
 u'warezmaster.': (20, 20)}
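
Both elements of each pair are equal because every value in `_kv` is 1, so the running sum and the running count grow together. To show how the three functions cooperate, here is a plain-Python 3 simulation of the combineByKey semantics. The helper `combine_by_key` and the two-partition sample data are hypothetical; this is not how Spark implements it internally.

```python
def combine_by_key(partitions, create_combiner, merge_value, merge_combiners):
    """Simulate combineByKey over a list of partitions of (key, value) pairs."""
    per_partition = []
    for part in partitions:
        acc = {}
        for key, value in part:
            if key in acc:
                # key already seen in this partition: merge the value in
                acc[key] = merge_value(acc[key], value)
            else:
                # first value for this key in this partition
                acc[key] = create_combiner(value)
        per_partition.append(acc)
    result = {}
    for acc in per_partition:
        for key, comb in acc.items():
            if key in result:
                # combine accumulators coming from different partitions
                result[key] = merge_combiners(result[key], comb)
            else:
                result[key] = comb
    return result

# Hypothetical data split into two partitions.
partitions = [
    [("smurf.", 1), ("normal.", 1), ("smurf.", 1)],
    [("normal.", 1), ("smurf.", 1)],
]
sum_counts = combine_by_key(
    partitions,
    lambda v: (v, 1),
    lambda acc, v: (acc[0] + v, acc[1] + 1),
    lambda a, b: (a[0] + b[0], a[1] + b[1]),
)
print(sum_counts)
```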

* From RDD to DataFrame and SQL

In [26]:
from pyspark.sql import Row

_csv = _rdd.map(lambda l: l.split(","))
_csvRdd = _csv.map(lambda p: 
    Row(
        duration=int(p[0]), 
        protocol=p[1],
        service=p[2],
        flag=p[3],
        src_bytes=int(p[4]),
        dst_bytes=int(p[5])
    )
)

In [27]:
type(_csvRdd)

pyspark.rdd.PipelinedRDD

In [30]:
_df=spark.createDataFrame(_csvRdd)
_df.createOrReplaceTempView("_tab")

In [31]:
_df.select("protocol", "duration", "dst_bytes").groupBy("protocol").count().show()

+--------+------+
|protocol| count|
+--------+------+
|     tcp|190065|
|     udp| 20354|
|    icmp|283602|
+--------+------+



In [32]:
_df.select("protocol", "duration", "dst_bytes")\
    .filter(_df.duration>1000)\
    .filter(_df.dst_bytes==0)\
    .groupBy("protocol")\
    .count()\
    .show()

+--------+-----+
|protocol|count|
+--------+-----+
|     tcp|  139|
+--------+-----+



In [34]:
tcp_interactions = spark.sql(
"""
    SELECT duration, dst_bytes FROM _tab
    WHERE protocol = 'tcp' AND duration > 1000 AND dst_bytes = 0
""")

In [79]:
tcp_interactions.show()

+--------+---------+
|duration|dst_bytes|
+--------+---------+
|    5057|        0|
|    5059|        0|
|    5051|        0|
|    5056|        0|
|    5051|        0|
|    5039|        0|
|    5062|        0|
|    5041|        0|
|    5056|        0|
|    5064|        0|
|    5043|        0|
|    5061|        0|
|    5049|        0|
|    5061|        0|
|    5048|        0|
|    5047|        0|
|    5044|        0|
|    5063|        0|
|    5068|        0|
|    5062|        0|
+--------+---------+
only showing top 20 rows
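
The WHERE clause of this query can be tried on a tiny table without Spark. The sketch below uses Python's built-in `sqlite3` purely as a stand-in SQL engine; Spark SQL's dialect differs in general, and the four rows are made up rather than taken from the KDD data.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE _tab (duration INTEGER, protocol TEXT, dst_bytes INTEGER)")
# Hypothetical rows, not taken from the KDD data.
rows = [(5057, "tcp", 0), (0, "tcp", 5450), (2000, "udp", 0), (1500, "tcp", 0)]
conn.executemany("INSERT INTO _tab VALUES (?, ?, ?)", rows)

query = """
    SELECT duration, dst_bytes FROM _tab
    WHERE protocol = 'tcp' AND duration > 1000 AND dst_bytes = 0
"""
result = conn.execute(query).fetchall()
print(result)
conn.close()
```

Only the two long tcp connections with zero destination bytes pass all three conditions; the udp row and the short tcp row are filtered out.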



In [37]:
tcp_interactions_out = tcp_interactions.rdd\
    .map(lambda p: "Duration: {}, Dest. bytes: {}".format(p.duration, p.dst_bytes))

In [38]:
for i,ti_out in enumerate(tcp_interactions_out.collect()):
    if(i%10==0):
        print ti_out

Duration: 5057, Dest. bytes: 0
Duration: 5043, Dest. bytes: 0
Duration: 5046, Dest. bytes: 0
Duration: 5051, Dest. bytes: 0
Duration: 5057, Dest. bytes: 0
Duration: 5063, Dest. bytes: 0
Duration: 42448, Dest. bytes: 0
Duration: 40121, Dest. bytes: 0
Duration: 31709, Dest. bytes: 0
Duration: 30619, Dest. bytes: 0
Duration: 22616, Dest. bytes: 0
Duration: 21455, Dest. bytes: 0
Duration: 13998, Dest. bytes: 0
Duration: 12933, Dest. bytes: 0


## Problem S-2: Reading Twitter JSON data

* [nok] _tweet.json in the current directory
    * changed to src/ds_twitter_3.py (saves to ds_twitter_3.json)



* When reading Twitter JSON

Case | Example
-------|-------
Saved as an escaped string (with backslashes) | "{\"created_at\":\"Sun Nov 13 00:05:19 +0000 2016\"
Normal | {"created_at":"Sun Nov 13 00:05:19 +0000 2016"

* Related JSON reader option: allowBackslashEscapingAnyCharacter
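
A line saved in the escaped form is a JSON string that itself contains JSON, so it has to be decoded twice. A minimal plain-Python 3 sketch follows; the helper name `load_tweet` is hypothetical, and the sample lines are the table's examples completed with a closing brace for illustration.

```python
import json

# A line stored as an escaped JSON string (closing brace and quote added).
escaped = '"{\\"created_at\\":\\"Sun Nov 13 00:05:19 +0000 2016\\"}"'
# A normal JSON object line.
normal = '{"created_at":"Sun Nov 13 00:05:19 +0000 2016"}'

def load_tweet(line):
    """Parse a line; if the result is still a string, parse it once more."""
    obj = json.loads(line)
    if isinstance(obj, str):
        obj = json.loads(obj)
    return obj

print(load_tweet(escaped) == load_tweet(normal))
```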

In [28]:
t2df= spark.read.json(os.path.join("src","ds_twitter_seoul_3.json"))
print type(t2df)
res=t2df.select('id','lang','text').take(1)
for e in res:
    print e['id'],e['lang'],e['text']

<class 'pyspark.sql.dataframe.DataFrame'>
801657325836763136 en RT @soompi: #SEVENTEEN’s Mingyu, Jin Se Yeon, And Leeteuk To MC For 2016 Super Seoul Dream Concert 
https://t.co/1XRSaRBbE0 https://t.co/fi…


In [34]:
twitterDF= spark.read.json(os.path.join("src","ds_twitter_1_noquote.json"))

In [35]:
twitterDF.printSchema()

root
 |-- contributors: string (nullable = true)
 |-- coordinates: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- entities: struct (nullable = true)
 |    |-- hashtags: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- symbols: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- urls: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- user_mentions: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |-- favorite_count: long (nullable = true)
 |-- favorited: boolean (nullable = true)
 |-- geo: string (nullable = true)
 |-- id: long (nullable = true)
 |-- id_str: string (nullable = true)
 |-- in_reply_to_screen_name: string (nullable = true)
 |-- in_reply_to_status_id: string (nullable = true)
 |-- in_reply_to_status_id_str: string (nullable = true)
 |-- in_reply_to_user_id: string (nullable = true)
 |-- in_reply_to_user_id_str: s

In [36]:
twitterDF.select('text').show()

+---------------+
|           text|
+---------------+
|Hello 21 160924|
+---------------+



In [39]:
twitterDF.createOrReplaceTempView("twitter")
spark.sql("select text from twitter").show()

+---------------+
|           text|
+---------------+
|Hello 21 160924|
+---------------+



## Problem S-3: JSON from URL

* Data read from a URL arrives as a string (e.g. r.iter_lines() yields raw pieces of text, not parsed records).
* Reading the response as JSON works.

* Baby names

In [25]:
import json
import requests
_url="https://health.data.ny.gov/api/views/jxy9-yhdk/rows.json?accessType=DOWNLOAD"
_json=requests.get(_url).json()

* The JSON data is organized into two parts, meta and data.
* data holds 145570 records in this run.

In [48]:
_json.keys()

[u'meta', u'data']

In [49]:
_jsonList=_json['data']
print len(_jsonList)

145570


In [50]:
_json['data'][0]

[1,
 u'5DC7F285-052B-4739-8DC3-62827014A4CD',
 1,
 1425450997,
 u'714909',
 1425450997,
 u'714909',
 u'{\n}',
 u'2013',
 u'GAVIN',
 u'ST LAWRENCE',
 u'M',
 u'9']

* List to Spark DataFrame
    * If no schema is given, the DataFrame is created with default column names.

In [51]:
_df=spark.createDataFrame(_json['data'])
_df.count()

145570

* Because no schema was given, automatically generated column names are used.

In [52]:
_df.printSchema()

root
 |-- _1: long (nullable = true)
 |-- _2: string (nullable = true)
 |-- _3: long (nullable = true)
 |-- _4: long (nullable = true)
 |-- _5: string (nullable = true)
 |-- _6: long (nullable = true)
 |-- _7: string (nullable = true)
 |-- _8: string (nullable = true)
 |-- _9: string (nullable = true)
 |-- _10: string (nullable = true)
 |-- _11: string (nullable = true)
 |-- _12: string (nullable = true)
 |-- _13: string (nullable = true)
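
Names such as `_10` are hard to read. The sketch below, in plain Python 3, attaches names to the last five fields of the first record and casts the numeric ones. The column names are hypothetical guesses based on the values; the real names would come from `_json['meta']`.

```python
# First record from _json['data'] as shown above.
row = [1, '5DC7F285-052B-4739-8DC3-62827014A4CD', 1, 1425450997, '714909',
       1425450997, '714909', '{\n}', '2013', 'GAVIN', 'ST LAWRENCE', 'M', '9']

# Hypothetical names for the last five fields; the real names live in _json['meta'].
names = ['year', 'first_name', 'county', 'sex', 'count']

record = dict(zip(names, row[-5:]))
# The numeric fields arrive as strings, so cast them explicitly.
record['year'] = int(record['year'])
record['count'] = int(record['count'])
print(record)
```

In Spark, a list of column names can be passed as the second argument of `createDataFrame`, as in `spark.createDataFrame(l, ['name', 'age'])` in S.4.2.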



In [53]:
_df.filter(_df['_10'] == u'GAVIN').show(2)

+---+--------------------+---+----------+------+----------+------+---+----+-----+-----------+---+---+
| _1|                  _2| _3|        _4|    _5|        _6|    _7| _8|  _9|  _10|        _11|_12|_13|
+---+--------------------+---+----------+------+----------+------+---+----+-----+-----------+---+---+
|  1|5DC7F285-052B-473...|  1|1425450997|714909|1425450997|714909|{
}|2013|GAVIN|ST LAWRENCE|  M|  9|
| 82|43E9414D-9BE0-456...| 82|1425450997|714909|1425450997|714909|{
}|2013|GAVIN|    SUFFOLK|  M| 54|
+---+--------------------+---+----------+------+----------+------+---+----+-----+-----------+---+---+
only showing top 2 rows



* SQL can be used.

In [54]:
_df.createOrReplaceTempView("babyNames")
spark.sql("select distinct(_10) from babyNames").show(5)

+------+
|   _10|
+------+
|MILANA|
|  JADE|
|  ANNA|
|HUNTER|
|ANJALI|
+------+
only showing top 5 rows



## Problem S-4: Spark SQL Uber csv

https://github.com/tmcgrath/spark-with-python-course/blob/master/Spark-SQL-CSV-with-Python.ipynb

* fivethirtyeight
    * git clone https://github.com/fivethirtyeight/uber-tlc-foil-response.git
        daily Uber trip statistics in January and February 2015
        ```
        dispatching_base_number	date	active_vehicles	trips
        B02512	1/1/2015	190	1132
        B02765	1/1/2015	225	1765
        ```

In [49]:
data_home=os.path.join(os.environ['HOME'],"Code/git/else/uber-tlc-foil-response")
filePath=os.path.join(data_home,"Uber-Jan-Feb-FOIL.csv")

_fub = spark.sparkContext.textFile(filePath)

In [50]:
type(_fub)
_fub.count()
_fub.first()

u'dispatching_base_number,date,active_vehicles,trips'

* csv is a comma-separated format, so each line is split on ','.
* The key values are extracted from the first column (the header value is included).

In [51]:
_dub = _fub.map(lambda line: line.split(","))

type(_dub)

_row0keys=_dub.map(lambda row: row[0]).distinct().collect()

print _row0keys

_dub.filter(lambda row: "B02512" in row).count()

[u'B02682', u'B02512', u'dispatching_base_number', u'B02617', u'B02765', u'B02764', u'B02598']


59
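
Splitting on ',' works for this file because no field contains a comma. For files with quoted fields, Python's csv module is a safer parser. The following is a minimal sketch; `parse_csv_line` is a hypothetical helper name, and the quoted sample line is invented for illustration.

```python
import csv

def parse_csv_line(line):
    """Parse one CSV line into a list of fields, honoring quoted commas."""
    return next(csv.reader([line]))

# Sample lines; the second one is invented to show a quoted comma.
plain = "B02512,1/1/2015,190,1132"
quoted = '"Smith, John",1/1/2015,190,1132'

print(parse_csv_line(plain))
print(parse_csv_line(quoted))
print(quoted.split(","))  # a naive split cuts the quoted field in two
```

On an RDD, such a helper could be passed to `map` in place of the `split` lambda.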

* For base B02512, collect the records whose trips value is greater than 2000.

In [52]:
_dub.filter(lambda row: "B02512" in row).filter(lambda row: int(row[3])>2000).collect()

[[u'B02512', u'1/30/2015', u'256', u'2016'],
 [u'B02512', u'2/5/2015', u'264', u'2022'],
 [u'B02512', u'2/12/2015', u'269', u'2092'],
 [u'B02512', u'2/13/2015', u'281', u'2408'],
 [u'B02512', u'2/14/2015', u'236', u'2055'],
 [u'B02512', u'2/19/2015', u'250', u'2120'],
 [u'B02512', u'2/20/2015', u'272', u'2380'],
 [u'B02512', u'2/21/2015', u'238', u'2149'],
 [u'B02512', u'2/27/2015', u'272', u'2056']]

* The header line holds the attribute names. Excluding it leaves the total count minus one.

In [53]:
_noheader = _fub.filter(lambda line: "base" not in line).map(lambda line:line.split(","))
_noheader.count()

354
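
Filtering on the substring "base" works here only because no data line contains it. A sturdier idea is to compare each line with the header itself. The sketch below shows that idea in plain Python on a small stand-in list; `drop_header` is a hypothetical helper, and on an RDD the same comparison could be used with `first()` and `filter`.

```python
def drop_header(lines):
    """Return the lines without the first (header) line and any repeats of it."""
    header = lines[0]
    return [line for line in lines if line != header]

# A tiny stand-in for the file contents (values taken from the sample rows).
sample = [
    "dispatching_base_number,date,active_vehicles,trips",
    "B02512,1/1/2015,190,1132",
    "B02765,1/1/2015,225,1765",
]

rows = [line.split(",") for line in drop_header(sample)]
print(len(rows))
print(rows[0])
```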

* reduceByKey merges the values of each key into one result; the pairs below reduce to (a, 3) and (b, 2).
```
("a", 1)
("b", 1)
("a", 1)
("a", 1)
("b", 1)
```
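
To make the semantics concrete, here is a plain-Python sketch of what reduceByKey computes for the pairs listed above. `reduce_by_key` is a hypothetical stand-in, not Spark's implementation; note that the function it takes receives two values of the same key, not a key and a value.

```python
from functools import reduce

def reduce_by_key(pairs, func):
    """Merge the values of each key with a two-argument function."""
    grouped = {}
    for key, value in pairs:
        grouped.setdefault(key, []).append(value)
    return {key: reduce(func, values) for key, values in grouped.items()}

pairs = [("a", 1), ("b", 1), ("a", 1), ("a", 1), ("b", 1)]
result = reduce_by_key(pairs, lambda x, y: x + y)
print(sorted(result.items()))  # [('a', 3), ('b', 2)]
```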

In [54]:
# sum trips per base; both lambda arguments are values of the same key
_noheader.map(lambda x: (x[0], int(x[3]))).reduceByKey(lambda a, b: a + b).collect()

[(u'B02682', 662509),
 (u'B02512', 93786),
 (u'B02617', 725025),
 (u'B02765', 193670),
 (u'B02764', 1914449),
 (u'B02598', 540791)]

In [None]:
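# mapPartitionsWithIndex passes (partition index, iterator) to the function;
# mapPartitions would pass only the iterator.
def countPartitions(id, iterator):
    c = 0
    for _ in iterator:
        c += 1
    yield (id, c)
# wc is assumed to be an RDD created earlier
_wc = wc.mapPartitionsWithIndex(countPartitions)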
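
Counting records per partition needs the partition index alongside the iterator. In PySpark, `mapPartitionsWithIndex` supplies both, whereas `mapPartitions` supplies only the iterator. The sketch below imitates that call pattern in plain Python; the list of lists standing in for partitions is hypothetical.

```python
def count_partitions(index, iterator):
    """Yield (partition index, number of records in that partition)."""
    count = 0
    for _ in iterator:
        count += 1
    yield (index, count)

# Hypothetical partitions: a list of lists standing in for an RDD's partitions.
partitions = [["a", "b", "c"], ["d"], []]

counts = []
for index, part in enumerate(partitions):
    counts.extend(count_partitions(index, iter(part)))
print(counts)  # [(0, 3), (1, 1), (2, 0)]
```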

* saving
    ```
    rddOfStrings.saveAsTextFile("out.txt")
    ```

## S.7 MongoDB Spark connector

* Spark can read data stored in MongoDB.
* See also: pymongo-spark, a Python library for using Spark with PyMongo (install with pip install pymongo-spark).


### S.7.1 Configuration

* Reference: https://docs.mongodb.com/spark-connector/
* Edit the configuration file conf/spark-defaults.conf.
    * Choose the connector jar that matches your Spark version.
    * For MongoDB < 3.2, spark.mongodb.input.partitioner must be set.
    * To list several packages, separate them with commas.

```
$ vim conf/spark-defaults.conf
spark.jars.packages=org.mongodb.spark:mongo-spark-connector_2.10:1.1.0
spark.mongodb.input.partitioner=MongoPaginateBySizePartitioner
```
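
As a small illustration of the comma-separated form, the sketch below joins several Maven coordinates into one `spark.jars.packages` value. `join_packages` is a hypothetical helper; the three coordinates are the ones this notebook's session reports for that setting.

```python
def join_packages(coordinates):
    """Join Maven coordinates into one comma-separated spark.jars.packages value."""
    return ",".join(c.strip() for c in coordinates)

packages = join_packages([
    "graphframes:graphframes:0.4.0-spark2.0-s_2.11",
    "org.mongodb.spark:mongo-spark-connector_2.10:2.0.0",
    "com.databricks:spark-csv_2.11:1.5.0",
])
print("spark.jars.packages=" + packages)
```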

In [32]:
print spark.conf.get('spark.jars.packages')

graphframes:graphframes:0.4.0-spark2.0-s_2.11,org.mongodb.spark:mongo-spark-connector_2.10:2.0.0,com.databricks:spark-csv_2.11:1.5.0


### S.7.2 uri

* The uri can be set on the SparkSession. It defines the host IP, database, and collection needed for the connection.

```
spark = pyspark.sql.SparkSession.builder\
    .master("local")\
    .appName("myApp")\
    .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/myDB.ds_spark_df_mongo") \
    .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/myDB.ds_spark_df_mongo") \
    .getOrCreate()
```

* Alternatively, it can be set at run time (see below).
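
The uri strings used here follow the pattern mongodb://host/database.collection. The sketch below composes and splits such a string in plain Python. Both helper names are hypothetical, and the sketch covers only this simple form (no credentials, ports, or query options).

```python
PREFIX = "mongodb://"

def build_mongo_uri(host, database, collection):
    """Compose mongodb://host/database.collection."""
    return "{0}{1}/{2}.{3}".format(PREFIX, host, database, collection)

def split_mongo_uri(uri):
    """Split such a URI back into (host, database, collection)."""
    if not uri.startswith(PREFIX):
        raise ValueError("not a mongodb URI: " + uri)
    host, _, namespace = uri[len(PREFIX):].partition("/")
    # Split on the first dot only: collection names may contain dots.
    database, _, collection = namespace.partition(".")
    return host, database, collection

uri = build_mongo_uri("127.0.0.1", "myDB", "ds_spark_df_mongo")
print(uri)
print(split_mongo_uri(uri))
```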


### S.7.3 MongoDB Python API

* Set the format to "com.mongodb.spark.sql.DefaultSource".
* With 'option', the database and collection names can be set at run time.

Operation | Example command
-----|-----
Write | DataFrame.write.format("com.mongodb.spark.sql.DefaultSource")\<br>.mode("overwrite")\<br>.option("uri","mongodb://127.0.0.1/myDB.ds_spark_ml")\<br>.save()
Read | spark.read.format("com.mongodb.spark.sql.DefaultSource")\<br>.option("uri","mongodb://127.0.0.1/ds_twitter.seoul")\<br>.load()


### S.7.4 Practice: writing and reading

In [33]:
people = spark.createDataFrame([("kim",10),("lee",20),("choi",30),("park",40)],["name", "age"])

In [34]:
people.write.format("com.mongodb.spark.sql.DefaultSource")\
    .mode("append")\
    .option("uri","mongodb://127.0.0.1/myDB.ds_spark_ml")\
    .save()

In [35]:
df = spark.read.format("com.mongodb.spark.sql.DefaultSource")\
    .option("uri","mongodb://127.0.0.1/myDB.ds_spark_ml")\
    .load()

In [36]:
df.printSchema()

root
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [38]:
df.select('name').show(3)

+----+
|name|
+----+
| kim|
| lee|
|choi|
+----+
only showing top 3 rows



In [39]:
df = spark.read.format("com.mongodb.spark.sql.DefaultSource")\
    .option("uri","mongodb://127.0.0.1/ds_twitter.seoul")\
    .load()
df.select('text').show(5)

+--------------------+
|                text|
+--------------------+
|RT @always_gd: #B...|
|RT @InfiniteUpdat...|
|RT @InfiniteUpdat...|
|RT @PartOfJimin: ...|
|RT @MHDEFB: มาแล้...|
+--------------------+
only showing top 5 rows



## S.8 spark-submit

* spark-submit runs a script as a batch job (see the self-contained application in the Spark quick start).

* To use MongoDB, add the connector jar to spark-defaults.conf (already configured above).

* Before running spark-submit, 'conf/log4j.properties' was edited to set the log level to ERROR.
```
log4j.rootCategory=ERROR, console
```

### S.8.1 A simple job

* Create a DataFrame and print it


In [42]:
%%writefile src/ds_spark_sql.py
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
import pyspark

def doIt():
    d = [{'name': 'Alice', 'age': 1}]
    print spark.createDataFrame(d).collect()

if __name__ == "__main__":
    myConf=pyspark.SparkConf()
    spark = pyspark.sql.SparkSession.builder\
        .master("local")\
        .appName("myApp")\
        .config(conf=myConf)\
        .getOrCreate()
    doIt()
    spark.stop()


Overwriting src/ds_spark_sql.py


In [44]:
!/home/jsl/Downloads/spark-2.0.0-bin-hadoop2.7/bin/spark-submit src/ds_spark_sql.py

Ivy Default Cache set to: /home/jsl/.ivy2/cache
The jars for the packages stored in: /home/jsl/.ivy2/jars
:: loading settings :: url = jar:file:/home/jsl/Downloads/spark-2.0.0-bin-hadoop2.7/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
graphframes#graphframes added as a dependency
org.mongodb.spark#mongo-spark-connector_2.10 added as a dependency
com.databricks#spark-csv_2.11 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0
	confs: [default]
	found graphframes#graphframes;0.4.0-spark2.0-s_2.11 in spark-packages
	found com.typesafe.scala-logging#scala-logging-api_2.11;2.1.2 in central
	found com.typesafe.scala-logging#scala-logging-slf4j_2.11;2.1.2 in central
	found org.scala-lang#scala-reflect;2.11.0 in central
	found org.slf4j#slf4j-api;1.7.7 in central
	found org.mongodb.spark#mongo-spark-connector_2.10;2.0.0 in central
	found org.mongodb#mongo-java-driver;3.2.2 in central
	found com.databricks#spark-csv_2.11;1.5.0 in cent

### S.8.2 MongoDB

* Write to and read from a database and collection

In [71]:
%%writefile src/ds_spark_mongo.py
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
import pyspark
def doIt():
    print "---------RESULT-----------"
    print "------mongodb write-------"
    myRdd = spark.sparkContext.parallelize([
        ("js", 150),
        ("Gandalf", 1000),
        ("Thorin", 195),
        ("Balin", 178),
        ("Kili", 77),
        ("Dwalin", 169),
        ("Oin", 167),
        ("Gloin", 158),
        ("Fili", 82),
        ("Bombur", None)
    ])
    myDf = spark.createDataFrame(myRdd, ["name", "age"])
    print myDf
    myDf.write.format("com.mongodb.spark.sql.DefaultSource").mode("overwrite").save()
    print "---------read-----------"
    df = spark.read.format("com.mongodb.spark.sql.DefaultSource").load()
    df.printSchema()  # prints the schema itself; wrapping it in print would add "None"
    df.registerTempTable("myTable")
    myTab = spark.sql("SELECT name, age FROM myTable WHERE age >= 100")
    myTab.show()

if __name__ == "__main__":
    myConf=pyspark.SparkConf()
    spark = pyspark.sql.SparkSession.builder\
        .master("local")\
        .appName("myApp")\
        .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/myDB.ds_spark_df_mongo") \
        .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/myDB.ds_spark_df_mongo") \
        .getOrCreate()
    doIt()
    spark.stop()


Overwriting src/ds_spark_mongo.py


In [45]:
!/home/jsl/Downloads/spark-2.0.0-bin-hadoop2.7/bin/spark-submit src/ds_spark_mongo.py

Ivy Default Cache set to: /home/jsl/.ivy2/cache
The jars for the packages stored in: /home/jsl/.ivy2/jars
:: loading settings :: url = jar:file:/home/jsl/Downloads/spark-2.0.0-bin-hadoop2.7/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
graphframes#graphframes added as a dependency
org.mongodb.spark#mongo-spark-connector_2.10 added as a dependency
com.databricks#spark-csv_2.11 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0
	confs: [default]
	found graphframes#graphframes;0.4.0-spark2.0-s_2.11 in spark-packages
	found com.typesafe.scala-logging#scala-logging-api_2.11;2.1.2 in central
	found com.typesafe.scala-logging#scala-logging-slf4j_2.11;2.1.2 in central
	found org.scala-lang#scala-reflect;2.11.0 in central
	found org.slf4j#slf4j-api;1.7.7 in central
	found org.mongodb.spark#mongo-spark-connector_2.10;2.0.0 in central
	found org.mongodb#mongo-java-driver;3.2.2 in central
	found com.databricks#spark-csv_2.11;1.5.0 in cent
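
The log above is cut off before the query result. As a rough guide to what `WHERE age >= 100` selects, the plain-Python sketch below applies the same condition to the rows written by the script, treating a missing age the way SQL treats NULL (it never satisfies the comparison). `age_at_least` is a hypothetical helper, and this is an approximation of the query, not captured Spark output.

```python
people = [
    ("js", 150), ("Gandalf", 1000), ("Thorin", 195), ("Balin", 178),
    ("Kili", 77), ("Dwalin", 169), ("Oin", 167), ("Gloin", 158),
    ("Fili", 82), ("Bombur", None),
]

def age_at_least(rows, minimum):
    """Keep rows with a known age >= minimum; None never matches, like SQL NULL."""
    return [(name, age) for name, age in rows
            if age is not None and age >= minimum]

kept = age_at_least(people, 100)
for name, age in kept:
    print(name, age)
```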

## Problem S-5: spark-submit that reads open data stored in MongoDB

* Read the data stored in MongoDB

Item | Name
-----|-----
Database | ds_open_subwayPassengersDb
Collection | db_open_subwayTable
key | Read by following the JSON hierarchy: CardSubwayStatisticsService.row.RIDE_PASGR_NUM

* MongoDB shell

```
$ mongo
> use ds_open_subwayPassengersDb
switched to db ds_open_subwayPassengersDb
> show tables
db_open_subwayTable
system.indexes
> db.db_open_subwayTable.find().limit(1)
{ "_id" : ObjectId("57fa386ff5e6e94359c033e9"), "CardSubwayStatisticsService" : { "row" : [ { "COMMT" : "", "RIDE_PASGR_NUM" : 111275, "WORK_DT" : "20130723", "LINE_NUM" : "중앙선", "SUB_STA_NM" : "용문", "ALIGHT_PASGR_NUM" : 108878, "USE_MON" : "201306" }, { "COMMT" : "", "RIDE_PASGR_NUM" : 11495, "WORK_DT" : "20130723", "LINE_NUM" : "중앙선", "SUB_STA_NM" : "원덕", "ALIGHT_PASGR_NUM" : 10964, "USE_MON" : "201306" }, { "COMMT" : "", "RIDE_PASGR_NUM" : 118103, "WORK_DT" : "20130723", "LINE_NUM" : "중앙선", "SUB_STA_NM" : "양평", "ALIGHT_PASGR_NUM" : 116604, "USE_MON" : "201306" }, { "COMMT" : "", "RIDE_PASGR_NUM" : 10590, "WORK_DT" : "20130723", "LINE_NUM" : "중앙선", "SUB_STA_NM" : "오빈", "ALIGHT_PASGR_NUM" : 10020, "USE_MON" : "201306" }, { "COMMT" : "", "RIDE_PASGR_NUM" : 26304, "WORK_DT" : "20130723", "LINE_NUM" : "중앙선", "SUB_STA_NM" : "아신", "ALIGHT_PASGR_NUM" : 26358, "USE_MON" : "201306" } ], "RESULT" : { "MESSAGE" : "정상 처리되었습니다", "CODE" : "INFO-000" }, "list_total_count" : 530 } }
```

* Reading from MongoDB

In [53]:
df = spark.read.format("com.mongodb.spark.sql.DefaultSource")\
    .option("uri","mongodb://127.0.0.1/ds_open_subwayPassengersDb.db_open_subwayTable")\
    .load()
print df.printSchema()

root
 |-- CardSubwayStatisticsService: struct (nullable = true)
 |    |-- row: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- COMMT: string (nullable = true)
 |    |    |    |-- RIDE_PASGR_NUM: double (nullable = true)
 |    |    |    |-- WORK_DT: string (nullable = true)
 |    |    |    |-- LINE_NUM: string (nullable = true)
 |    |    |    |-- SUB_STA_NM: string (nullable = true)
 |    |    |    |-- ALIGHT_PASGR_NUM: double (nullable = true)
 |    |    |    |-- USE_MON: string (nullable = true)
 |    |-- RESULT: struct (nullable = true)
 |    |    |-- MESSAGE: string (nullable = true)
 |    |    |-- CODE: string (nullable = true)
 |    |-- list_total_count: integer (nullable = true)
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)

None


In [54]:
df.registerTempTable("mySubway")
myTab = spark.sql("SELECT CardSubwayStatisticsService.row.RIDE_PASGR_NUM FROM mySubway")
#print type(myTab)
print myTab.show()
print myTab.first()
print myTab.head()

+--------------------+
|      RIDE_PASGR_NUM|
+--------------------+
|[111275.0, 11495....|
|[30281.0, 10832.0...|
|[75553.0, 189783....|
|[62789.0, 220544....|
|[74486.0, 152381....|
|[43418.0, 413533....|
|[301856.0, 37978....|
|[146909.0, 227066...|
|[152275.0, 285263...|
|[161298.0, 117720...|
|[95996.0, 39150.0...|
|[59900.0, 201035....|
|[228737.0, 108938...|
|[164574.0, 481998...|
|[748205.0, 817657...|
|[206631.0, 188076...|
|[112991.0, 111791...|
|[225105.0, 938296...|
|[175909.0, 271844...|
|[45047.0, 126837....|
+--------------------+

None
Row(RIDE_PASGR_NUM=[111275.0, 11495.0, 118103.0, 10590.0, 26304.0])
Row(RIDE_PASGR_NUM=[111275.0, 11495.0, 118103.0, 10590.0, 26304.0])
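
Each result row holds an array because `row` is an array of structs, so the dotted path collects one RIDE_PASGR_NUM per element. The plain-Python sketch below mimics that traversal on a trimmed copy of the first document shown in the MongoDB shell. `extract` is a hypothetical helper and only illustrates the idea, not how Spark resolves the path internally.

```python
# Trimmed copy of the first document (numeric fields only).
doc = {
    "CardSubwayStatisticsService": {
        "row": [
            {"RIDE_PASGR_NUM": 111275, "ALIGHT_PASGR_NUM": 108878},
            {"RIDE_PASGR_NUM": 11495, "ALIGHT_PASGR_NUM": 10964},
            {"RIDE_PASGR_NUM": 118103, "ALIGHT_PASGR_NUM": 116604},
            {"RIDE_PASGR_NUM": 10590, "ALIGHT_PASGR_NUM": 10020},
            {"RIDE_PASGR_NUM": 26304, "ALIGHT_PASGR_NUM": 26358},
        ],
        "list_total_count": 530,
    }
}

def extract(value, keys):
    """Follow the keys; when a list is met, apply the remaining keys to each element."""
    if not keys:
        return value
    if isinstance(value, list):
        return [extract(item, keys) for item in value]
    return extract(value[keys[0]], keys[1:])

rides = extract(doc, "CardSubwayStatisticsService.row.RIDE_PASGR_NUM".split("."))
print(rides)       # [111275, 11495, 118103, 10590, 26304]
print(sum(rides))  # 277767
```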


In [50]:
%%writefile src/ds_spark_subway.py
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
import pyspark
def doIt():
    print "---------read-----------"
    df=spark.read.format("com.mongodb.spark.sql.DefaultSource").load()
    #df = sqlContext.read.format("com.mongodb.spark.sql.DefaultSource").load()
    df.printSchema()  # prints the schema itself; wrapping it in print would add "None"
    df.registerTempTable("mySubway")
    myTab = spark.sql("SELECT CardSubwayStatisticsService.row.RIDE_PASGR_NUM FROM mySubway")
    myTab.show()
if __name__ == "__main__":
    myConf=pyspark.SparkConf()
    spark = pyspark.sql.SparkSession.builder\
        .master("local")\
        .appName("myApp")\
        .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/ds_open_subwayPassengersDb.db_open_subwayTable") \
        .getOrCreate()
    doIt()
    spark.stop()


Overwriting src/ds_spark_subway.py


In [51]:
!/home/jsl/Downloads/spark-2.0.0-bin-hadoop2.7/bin/spark-submit src/ds_spark_subway.py

Ivy Default Cache set to: /home/jsl/.ivy2/cache
The jars for the packages stored in: /home/jsl/.ivy2/jars
:: loading settings :: url = jar:file:/home/jsl/Downloads/spark-2.0.0-bin-hadoop2.7/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
graphframes#graphframes added as a dependency
org.mongodb.spark#mongo-spark-connector_2.10 added as a dependency
com.databricks#spark-csv_2.11 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0
	confs: [default]
	found graphframes#graphframes;0.4.0-spark2.0-s_2.11 in spark-packages
	found com.typesafe.scala-logging#scala-logging-api_2.11;2.1.2 in central
	found com.typesafe.scala-logging#scala-logging-slf4j_2.11;2.1.2 in central
	found org.scala-lang#scala-reflect;2.11.0 in central
	found org.slf4j#slf4j-api;1.7.7 in central
	found org.mongodb.spark#mongo-spark-connector_2.10;2.0.0 in central
	found org.mongodb#mongo-java-driver;3.2.2 in central
	found com.databricks#spark-csv_2.11;1.5.0 in cent

## Exercise: going further

* Spark with MySQL
    * Add the MySQL JDBC driver jar
* read
```
dfmysql = spark.read.format('jdbc')\
        .options(
          url='jdbc:mysql://localhost/database_name',
          driver='com.mysql.jdbc.Driver',
          dbtable='SourceTableName',
          user='your_user_name',
          password='your_password')\
          .load()
```
* write
```
destination_df.write.format('jdbc')\
        .options(
          url='jdbc:mysql://localhost/database_name',
          driver='com.mysql.jdbc.Driver',
          dbtable='DestinationTableName',
          user='your_user_name',
          password='your_password')\
        .mode('append')\
        .save()
```

```
bin/spark-submit --jars mysql-connector-java-5.1.40-bin.jar \
      /path_to_your_program/spark_database.py
```