# Spark 입문

### 사전 작업
java가 깔려있어야 한다.
java 있는지 확인(C>Program Files>Java 폴더 확인) 후 anaconda 관리자 권한 실행
conda install -c conda-forge pyspark (오래 걸릴 수 있다. done까지 완료)

## 요약
RDD의 개념을 익히고 Pair RDD, 외부 데이터를 로드해서 RDD 생성, Spark DataFrame을 생성해봤다.

## RDD의 개념
RDD : Resilient Disributed Data

- RDD는 두 가지의 연산으로 이루어져 있다.
- Transformation, Action에 대한 이해가 필요

- Transformation -> Lazy Execution 또는 Lazy Loading
- 트랜스포메이션이 행해지면, RDD가 수행되는 것이 아니라, 새로운 RDD를 만들어 내고 그 새로운 RDD에 수행결과를 저장한다. (commit, push 같은 개념)


## RDD의 Operation
### Transformation
기존의 RDD 데이타를 변경하여 새로운 RDD 데이타를 생성해내는 것. 흔한 케이스는 filter와 같이 특정 데이타만 뽑아 내거나 map 함수 처럼, 데이타를 분산 배치 하는 것 등을 들 수 있다.

### Action
Method로 이루어진 실행 작업이며, Transformation이 행해지고 나서 이루어지는 Evaluation 작업
RDD 값을 기반으로 무엇인가를 계산해서(computation) 결과를 (셋이 아닌) 생성해 내는것으로 가장 쉬운 예로는 count()와 같은 operation들을 들 수 있다.

In [1]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

In [2]:
conf  = SparkConf().setMaster('local').setAppName('sparkApp')
spark = SparkContext(conf=conf)
spark

In [3]:
rdd = spark.textFile('./data/test.txt')
rdd
# Lazy Loading
# 없는 파일을 불러오는데도  Error가 나지 않는다. RDD가 만들어졌을 뿐 실행되진 않은 것이기 때문.
# Transformation을 한 것. RDD가 수행된 것이 아니라 저장된 것.

./data/test.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0

In [4]:
lines = rdd.filter(lambda x : 'spark' in x)
lines
# filter를 사용한 것이 Action이다. 결과는 Error

PythonRDD[2] at RDD at PythonRDD.scala:53

### RDD 생성
- 데이터를 직접 만드는 방법( parallelize() ), 외부 데이터를 로드 방법으로

In [5]:
sample_rdd = spark.parallelize(['test', 'this is a test rdd'])
sample_rdd
# RDD 라는 변수의 타입을 만드는 과정

ParallelCollectionRDD[3] at parallelize at PythonRDD.scala:195

In [6]:
sample_rdd.collect()
# 연산을 수행하는 과정

['test', 'this is a test rdd']

## RDD 에서 자주 쓰는 연산 함수
- collect() : RDD에 Transfomation 된 결과를 return하는 함수
- map() : 연산을 수행할 때 사용하는 함수

In [7]:
numbers = spark.parallelize(list(range(5)))
numbers
# RDD 객체 생성

ParallelCollectionRDD[4] at parallelize at PythonRDD.scala:195

In [8]:
s = numbers.map(lambda x : x * x).collect()
s
# collect를 안쓰면 결과가 나오지 않는 것을 확인할 수 있다.

[0, 1, 4, 9, 16]

- flatmap() : 리스트들의 원소를 하나의 리스트로 flatten해서 return하는 함수

In [9]:
strings = spark.parallelize(['hi spark', 'hi python', 'hi django', 'hi sklearn'])
unique_string = strings.flatMap(lambda x : x.split(' ')).collect()
unique_string

['hi', 'spark', 'hi', 'python', 'hi', 'django', 'hi', 'sklearn']

In [10]:
num = spark.parallelize(list(range(1, 30, 3)))
num
# RDD 생성

ParallelCollectionRDD[8] at parallelize at PythonRDD.scala:195

In [11]:
result = num.filter(lambda x : x % 2 == 0). collect()
result

[4, 10, 16, 22, 28]

## Pair RDD
key-value 쌍으로 이루어진 RDD
- python tuple을 의미한다

In [18]:
# pair rdd 생성
pairRDD = spark.parallelize([(1,3),(1,6),(2,4),(3,3)])
# pairRDD = spark.parallelize([(1,3),(2,4),(3,3),(1,6)]) # sortBykey() test용
pairRDD

ParallelCollectionRDD[24] at parallelize at PythonRDD.scala:195

- reduceByKey()
- mapValues()

In [13]:
{
    i:j
    for i, j in pairRDD.reduceByKey(lambda x, y : x+y).collect()
}

{1: 9, 2: 4, 3: 3}

In [14]:
{
    i:j
    for i, j in pairRDD.mapValues(lambda x : x ** 2).collect()
}
# 1:36을 보면 6의 제곱이다. key 값이 중복되면 뒤에 나온 value로 적용된다.

{1: 36, 2: 16, 3: 9}

In [15]:
pairRDD.groupByKey().collect()

[(1, <pyspark.resultiterable.ResultIterable at 0x1ef6ba11c18>),
 (2, <pyspark.resultiterable.ResultIterable at 0x1ef6ba11cf8>),
 (3, <pyspark.resultiterable.ResultIterable at 0x1ef6ba11da0>)]

In [16]:
pairRDD.values().collect()

[3, 6, 4, 3]

In [19]:
pairRDD.sortByKey().collect()
# key 순서대로? 정렬된다.
# (1,6)을 뒤로 보낸 후 test하더라도 해당 결과가 나온다.

[(1, 3), (1, 6), (2, 4), (3, 3)]

## 외부 데이터를 로드해서 RDD 생성하는 방법

In [20]:
customerRDD = spark.textFile('./data/spark-rdd-name-customers.csv')
customerRDD

./data/spark-rdd-name-customers.csv MapPartitionsRDD[27] at textFile at NativeMethodAccessorImpl.java:0

In [21]:
customerRDD.first()
# pairRDD 객체임을 확인 가능 (key, value)
# first()만 해줘도 결과가 나오므로 이 자체가 Action이라고 보면 됨
# 그 근거로 .collect()를 해줄 경우 Error가 나옴
# key : Alfreds Futterkiste, value : Germany

'Alfreds Futterkiste,Germany'

In [24]:
# map 연산자를 이용해서 ,(comma)로 split하고 tuple로 return하는 구문을 작성하기
# cusPairs = customerRDD.map(lambda x : (x.split(',')[0], x.split(',')[1]))
# 위의 코드와 다르게 key를 앞으로 뺐다.
cusPairs = customerRDD.map(lambda x : (x.split(',')[1], x.split(',')[0]))
cusPairs.collect()

[('Germany', 'Alfreds Futterkiste'),
 ('Mexico', 'Ana Trujillo Emparedados y helados'),
 ('Mexico', 'Antonio Moreno Taqueria'),
 ('UK', 'Around the Horn'),
 ('Sweden', 'Berglunds snabbkop'),
 ('Germany', 'Blauer See Delikatessen'),
 ('France', 'Blondel pere et fils'),
 ('Spain', 'Bolido Comidas preparadas'),
 ('France', "Bon app'"),
 ('Canada', 'Bottom-Dollar Marketse'),
 ('UK', "B's Beverages"),
 ('Argentina', 'Cactus Comidas para llevar'),
 ('Mexico', 'Centro comercial Moctezuma'),
 ('Switzerland', 'Chop-suey Chinese'),
 ('Brazil', 'Comercio Mineiro'),
 ('UK', 'Consolidated Holdings'),
 ('Germany', 'Drachenblut Delikatessend'),
 ('France', 'Du monde entier'),
 ('UK', 'Eastern Connection'),
 ('Austria', 'Ernst Handel'),
 ('Brazil', 'Familia Arquibaldo'),
 ('Spain', 'FISSA Fabrica Inter. Salchichas S.A.'),
 ('France', 'Folies gourmandes'),
 ('Sweden', 'Folk och fa HB'),
 ('Germany', 'Frankenversand'),
 ('France', 'France restauration'),
 ('Italy', 'Franchi S.p.A.'),
 ('Portugal', 'Furi

- groupByKey() : 키 값을 리스트 형태로 리턴 함수

In [26]:
cusPairs.groupByKey().collect()

[('Germany', <pyspark.resultiterable.ResultIterable at 0x1ef6ba042b0>),
 ('Mexico', <pyspark.resultiterable.ResultIterable at 0x1ef6ba04630>),
 ('UK', <pyspark.resultiterable.ResultIterable at 0x1ef6ba04828>),
 ('Sweden', <pyspark.resultiterable.ResultIterable at 0x1ef6ba04b70>),
 ('France', <pyspark.resultiterable.ResultIterable at 0x1ef6ba04588>),
 ('Spain', <pyspark.resultiterable.ResultIterable at 0x1ef6ba047b8>),
 ('Canada', <pyspark.resultiterable.ResultIterable at 0x1ef6ba04ac8>),
 ('Argentina', <pyspark.resultiterable.ResultIterable at 0x1ef6ba04fd0>),
 ('Switzerland', <pyspark.resultiterable.ResultIterable at 0x1ef6ba04470>),
 ('Brazil', <pyspark.resultiterable.ResultIterable at 0x1ef6ba046a0>),
 ('Austria', <pyspark.resultiterable.ResultIterable at 0x1ef6b9a1748>),
 ('Italy', <pyspark.resultiterable.ResultIterable at 0x1ef6b9a1da0>),
 ('Portugal', <pyspark.resultiterable.ResultIterable at 0x1ef6b9a1208>),
 ('USA', <pyspark.resultiterable.ResultIterable at 0x1ef6b9a1f98>),
 ('

In [34]:
# UK에 사는 고객이름만 출력하기 (dict 형식으로 만들어서)
groupKey = cusPairs.groupByKey().collect()
groupKey
# Iterable 주소번지를 출력

[('Germany', <pyspark.resultiterable.ResultIterable at 0x1ef6ba92390>),
 ('Mexico', <pyspark.resultiterable.ResultIterable at 0x1ef6ba923c8>),
 ('UK', <pyspark.resultiterable.ResultIterable at 0x1ef6ba924e0>),
 ('Sweden', <pyspark.resultiterable.ResultIterable at 0x1ef6ba92438>),
 ('France', <pyspark.resultiterable.ResultIterable at 0x1ef6ba925c0>),
 ('Spain', <pyspark.resultiterable.ResultIterable at 0x1ef6ba92630>),
 ('Canada', <pyspark.resultiterable.ResultIterable at 0x1ef6ba926a0>),
 ('Argentina', <pyspark.resultiterable.ResultIterable at 0x1ef6ba926d8>),
 ('Switzerland', <pyspark.resultiterable.ResultIterable at 0x1ef6ba92710>),
 ('Brazil', <pyspark.resultiterable.ResultIterable at 0x1ef6ba92780>),
 ('Austria', <pyspark.resultiterable.ResultIterable at 0x1ef6ba927f0>),
 ('Italy', <pyspark.resultiterable.ResultIterable at 0x1ef6ba92860>),
 ('Portugal', <pyspark.resultiterable.ResultIterable at 0x1ef6ba92898>),
 ('USA', <pyspark.resultiterable.ResultIterable at 0x1ef6ba92908>),
 ('

In [33]:
for country, names in groupKey :
    if country == 'UK' :
        for name in names :
            print(name)

Around the Horn
B's Beverages
Consolidated Holdings
Eastern Connection
Island Trading
North/South
Seven Seas Imports


In [31]:
# 동일한 결과를 list 형식으로 출력
# 이중 for문
customerDict = {
    country : [ name for name in names ] for country, names in groupKey
}

customerDict['UK']

['Around the Horn',
 "B's Beverages",
 'Consolidated Holdings',
 'Eastern Connection',
 'Island Trading',
 'North/South',
 'Seven Seas Imports']

In [37]:
# sortByKey : key 오름차순으로 정렬하고 상위 10개 추출하기
cusPairs.sortByKey().keys().collect()[:10]

['Argentina',
 'Argentina',
 'Argentina',
 'Austria',
 'Austria',
 'Belgium',
 'Belgium',
 'Brazil',
 'Brazil',
 'Brazil']

In [45]:
# 나라별 고객이 몇명인지 count하기
mapR = cusPairs.mapValues(lambda x : 1)
# mapR.collect()
# 각각의 value가 1로 지정되고

In [46]:
# reduceBykey 활용해서 같은 key 값들을 누적하여 count
mapR = cusPairs.mapValues(lambda x : 1).reduceByKey(lambda x, y : x+y)
mapR.collect()

[('Germany', 11),
 ('Mexico', 5),
 ('UK', 7),
 ('Sweden', 2),
 ('France', 11),
 ('Spain', 5),
 ('Canada', 3),
 ('Argentina', 3),
 ('Switzerland', 2),
 ('Brazil', 9),
 ('Austria', 2),
 ('Italy', 3),
 ('Portugal', 2),
 ('USA', 13),
 ('Venezuela', 4),
 ('Ireland', 1),
 ('Belgium', 2),
 ('Norway', 1),
 ('Denmark', 2),
 ('Finland', 2),
 ('Poland', 1)]

In [41]:
# 같은 결과를 for문을 활용하여 출력
{
    i: j
    for i, j in mapR.collect()
}

{'Germany': 11,
 'Mexico': 5,
 'UK': 7,
 'Sweden': 2,
 'France': 11,
 'Spain': 5,
 'Canada': 3,
 'Argentina': 3,
 'Switzerland': 2,
 'Brazil': 9,
 'Austria': 2,
 'Italy': 3,
 'Portugal': 2,
 'USA': 13,
 'Venezuela': 4,
 'Ireland': 1,
 'Belgium': 2,
 'Norway': 1,
 'Denmark': 2,
 'Finland': 2,
 'Poland': 1}

## Spark DataFrame
RDD의 확장된 구조
- 행, 열로 이루어진 내장 RDD

### 생성
- Spark session을 활용한 생성
- SQL context의 테이블을 통한 생성

In [47]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
import json

In [48]:
sqlCtx = SQLContext(spark)
sqlCtx

<pyspark.sql.context.SQLContext at 0x1ef6ba87b00>

In [49]:
# json 파일
# json -> RDD -> DataFrame
sample_json = spark.textFile('./data/cars.json')
sample_json.collect()

['{"brand":"Ford", "models":{"name":"Fiesta", "price": "14260"}}',
 '{"brand":"Ford", "models":{"name": "Focus", "price": "18825"}}',
 '{"brand":"Ford", "models":{"name": "Mustang", "price": "26670"}}',
 '{"brand":"BMW", "models":{"name":"320", "price": "40250"}}',
 '{"brand":"BMW", "models":{"name":"X3", "price": "41000"}}',
 '{"brand":"BMW", "models":{"name":"X5", "price": "60700"}}',
 '{"brand":"Fiat", "models":{"name":"500", "price": "16495"}}']

In [56]:
cars_df = sqlCtx.createDataFrame( sample_json.map(lambda x : json.loads(x)) )
cars_df.collect()
# collect 해서 Row가 나오는데 Row는 RDD를 내장하고 있는 것



[Row(brand='Ford', models={'name': 'Fiesta', 'price': '14260'}),
 Row(brand='Ford', models={'name': 'Focus', 'price': '18825'}),
 Row(brand='Ford', models={'name': 'Mustang', 'price': '26670'}),
 Row(brand='BMW', models={'name': '320', 'price': '40250'}),
 Row(brand='BMW', models={'name': 'X3', 'price': '41000'}),
 Row(brand='BMW', models={'name': 'X5', 'price': '60700'}),
 Row(brand='Fiat', models={'name': '500', 'price': '16495'})]

In [53]:
cars_df.printSchema()

root
 |-- brand: string (nullable = true)
 |-- models: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)



In [52]:
cars_df.show()

+-----+--------------------+
|brand|              models|
+-----+--------------------+
| Ford|[name -> Fiesta, ...|
| Ford|[name -> Focus, p...|
| Ford|[name -> Mustang,...|
|  BMW|[name -> 320, pri...|
|  BMW|[name -> X3, pric...|
|  BMW|[name -> X5, pric...|
| Fiat|[name -> 500, pri...|
+-----+--------------------+



In [54]:
# DataFrame에 대한 연산
# select()

cars_df.select('brand').show()

+-----+
|brand|
+-----+
| Ford|
| Ford|
| Ford|
|  BMW|
|  BMW|
|  BMW|
| Fiat|
+-----+



In [55]:
cars_df.select('models.price').show()

+-----+
|price|
+-----+
|14260|
|18825|
|26670|
|40250|
|41000|
|60700|
|16495|
+-----+



In [57]:
# 컬럼의 타입 변환
from pyspark.sql.types import IntegerType

In [60]:
cars_price_type = cars_df.select('brand','models.name','models.price')
cars_price_type
# type이 string임을 확인

DataFrame[brand: string, name: string, price: string]

In [61]:
# IntegerType() 를 활용해서 price를 정수형으로 변환
cars_price_type.withColumn('price', cars_price_type['price'].cast(IntegerType()))
cars_price_type.show()

+-----+-------+-----+
|brand|   name|price|
+-----+-------+-----+
| Ford| Fiesta|14260|
| Ford|  Focus|18825|
| Ford|Mustang|26670|
|  BMW|    320|40250|
|  BMW|     X3|41000|
|  BMW|     X5|60700|
| Fiat|    500|16495|
+-----+-------+-----+



In [62]:
cars_price_type.printSchema()

root
 |-- brand: string (nullable = true)
 |-- name: string (nullable = true)
 |-- price: string (nullable = true)



In [63]:
cars_price_type.first()

Row(brand='Ford', name='Fiesta', price='14260')

In [64]:
# 비교 연산
# collect 해서 Row가 나오는데 Row는 RDD를 내장하고 있는 것
cars_price_type.collect()

[Row(brand='Ford', name='Fiesta', price='14260'),
 Row(brand='Ford', name='Focus', price='18825'),
 Row(brand='Ford', name='Mustang', price='26670'),
 Row(brand='BMW', name='320', price='40250'),
 Row(brand='BMW', name='X3', price='41000'),
 Row(brand='BMW', name='X5', price='60700'),
 Row(brand='Fiat', name='500', price='16495')]

In [65]:
# 가격이 20000 이상인 열을 추출
cars_price_type.filter(cars_price_type['price'] > 20000).show()

+-----+-------+-----+
|brand|   name|price|
+-----+-------+-----+
| Ford|Mustang|26670|
|  BMW|    320|40250|
|  BMW|     X3|41000|
|  BMW|     X5|60700|
+-----+-------+-----+



In [66]:
# Grouping
# group화 해서 count
cars_price_type.groupBy('brand').count().show()

+-----+-----+
|brand|count|
+-----+-----+
|  BMW|    3|
| Fiat|    1|
| Ford|    3|
+-----+-----+

