In [1]:
# Choose the interpreter PySpark launches.
# NOTE(review): the original note says python3 should be the default, but the
# value is `python` — confirm that it resolves to Python 3 on this machine.
%env PYSPARK_PYTHON=python
# Pin the hash seed; presumably so string hashing is consistent across Spark
# worker processes — TODO confirm.
%env PYTHONHASHSEED=0

env: PYSPARK_PYTHON=python
env: PYTHONHASHSEED=0


In [2]:
from pyspark import SparkConf, SparkContext
# Reuse the already-running SparkContext when this cell is executed again;
# constructing a second SparkContext in the same kernel raises ValueError.
sc = SparkContext.getOrCreate(conf=SparkConf().setAppName("MyApp").setMaster("local"))

### 기본 사용법

RDD 생성

In [3]:
a=sc.parallelize([1,2,3,4,5])

In [4]:
a

ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:475

In [5]:
a.getNumPartitions()

1

In [6]:
a.collect()

[1, 2, 3, 4, 5]

In [12]:
c = a.collect()

In [19]:
b = a.map(lambda x:2*x)

In [20]:
b

PythonRDD[1] at RDD at PythonRDD.scala:48

In [21]:
b.collect()

[2, 4, 6, 8, 10]

### 텍스트 데이터 읽기

In [36]:
raw_data = sc.textFile("/data/nasdaq.csv")

In [37]:
raw_data

/data/nasdaq.csv MapPartitionsRDD[8] at textFile at NativeMethodAccessorImpl.java:0

In [42]:
from collections import namedtuple
# Named fields make each parsed row easier to read than a bare tuple.
Record = namedtuple(
    "Record",
    "date open high low close adj_close volume",
)

In [81]:
def parse_record(s):
    """Turn one comma-separated line into a ``Record``.

    Column 0 is kept as the date string, columns 1-5 are converted with
    ``float`` and column 6 is converted with ``int``. Columns beyond the
    seventh are ignored.
    """
    columns = s.split(",")
    date = columns[0]
    prices = [float(value) for value in columns[1:6]]
    volume = int(columns[6])
    return Record(date, *prices, volume)

In [82]:
parsed_data=raw_data.map(parse_record)

In [83]:
parsed_data.take(1)

[Record(date='2018-03-26', open=7125.200195, high=7225.830078, low=7022.339844, close=7220.540039, adj_close=7220.540039, volume=2326060000)]

캐시를 사용하여 빠른 계산

In [85]:
# Cache the parsed RDD so the actions that follow can reuse it.
parsed_data = (
    sc.textFile("/data/nasdaq.csv")
    .map(parse_record)
    .cache()
)

In [87]:
parsed_data.map(lambda x:x.date).min()

'2018-03-26'

In [88]:
parsed_data.map(lambda x:x.date).max()

'2019-03-25'

In [89]:
parsed_data.map(lambda x: x.volume).sum()

622095540000

### key, value 데이터 처리 - tuple 활용

In [90]:
# Build (key, value) tuples: the key is the first seven characters of the date
# (e.g. '2018-03'), the value is the full record.
with_month_data = parsed_data.map(
    lambda record: (record.date[:7], record)
)

In [91]:
with_month_data.take(1)

[('2018-03',
  Record(date='2018-03-26', open=7125.200195, high=7225.830078, low=7022.339844, close=7220.540039, adj_close=7220.540039, volume=2326060000))]

mapValues : key 값은 그대로 두고 value만 받아서 처리하는 함수

In [92]:
# Keep the monthly key (e.g. '2018-03') unchanged and take only the volume
# from each Record as the value.
by_month_data = with_month_data.mapValues(lambda x: x.volume)

reduceByKey : 교환법칙과 결합법칙이 성립하는 계산을 입력해야 한다.

In [97]:
# Add up the volumes that share the same key.
by_month_data = by_month_data.reduceByKey(lambda x, y: x + y)

In [98]:
# Three (month, volume) pairs with the largest volume.
by_month_data.top(3, key=lambda pair: pair[1])

[('2019-02', 97936520000), ('2018-10', 61380960000), ('2019-01', 49199040000)]

### 데이터 저장

In [99]:
result_data = by_month_data.map(lambda t: ",".join(map(str,t)))

In [100]:
result_data.take(1)

['2018-03,9725220000']

기본 save 함수(saveAsTextFile)는 지정한 폴더 안에 partition 개수만큼의 파일을 저장한다. e.g.) part-00000, part-00001, ...

하나의 파일로 저장하기 위해서는 repartition을 활용할 수 있다.

In [101]:
# Write the RDD as text into the "out" directory.
result_data.saveAsTextFile("out")
# Single-partition variant, kept for reference:
#result_data.repartition(1).saveAsTextFile("out-one")

### JOIN

In [3]:
from collections import namedtuple
# Named fields make each parsed row easier to read than a bare tuple.
Record = namedtuple(
    "Record",
    "date open high low close adj_close volume",
)

In [4]:
def parse_record(s):
    """Turn one comma-separated line into a ``Record``.

    Column 0 is kept as the date string, columns 1-5 are converted with
    ``float`` and column 6 is converted with ``int``. Columns beyond the
    seventh are ignored.
    """
    columns = s.split(",")
    date = columns[0]
    prices = [float(value) for value in columns[1:6]]
    volume = int(columns[6])
    return Record(date, *prices, volume)

# Cache the parsed RDD so the actions that follow can reuse it.
parsed_data = (
    sc.textFile("/data/nasdaq.csv")
    .map(parse_record)
    .cache()
)

In [5]:
date_and_close_price = parsed_data.map(lambda r: (r.date, r.close))

In [6]:
date_and_close_price.take(5)

[('2018-03-26', 7220.540039),
 ('2018-03-27', 7008.810059),
 ('2018-03-28', 6949.22998),
 ('2018-03-29', 7063.450195),
 ('2018-04-02', 6870.120117)]

In [7]:
from datetime import datetime, timedelta

In [8]:
def get_next_date(s):
    """Return the calendar day after ``s`` as a ``YYYY-MM-DD`` string.

    ``s`` is parsed with the ``%Y-%m-%d`` format, one day is added, and the
    resulting ``datetime`` is formatted back to a string in the same format.
    """
    date_format = "%Y-%m-%d"
    parsed = datetime.strptime(s, date_format)
    following_day = parsed + timedelta(days=1)
    return following_day.strftime(date_format)

In [9]:
get_next_date("2019-03-01")

'2019-03-02'

In [10]:
# Key each close price by the following calendar day, so that joining on the
# date pairs a day's close with the close recorded one calendar day earlier.
# Because the shift is by calendar day, a date whose preceding calendar day is
# absent from the data (e.g. 2018-04-02, which follows 2018-03-29) finds no
# match in the join.
date_and_prev_close_price = parsed_data.map(lambda r: (get_next_date(r.date), r.close))

In [11]:
date_and_prev_close_price.take(5)

[('2018-03-27', 7220.540039),
 ('2018-03-28', 7008.810059),
 ('2018-03-29', 6949.22998),
 ('2018-03-30', 7063.450195),
 ('2018-04-03', 6870.120117)]

In [12]:
joined = date_and_close_price.join(date_and_prev_close_price)

In [24]:
joined.take(10)

[('2018-03-27', (7008.810059, 7220.540039)),
 ('2018-03-28', (6949.22998, 7008.810059)),
 ('2018-03-29', (7063.450195, 6949.22998)),
 ('2018-04-11', (7069.029785, 7094.299805)),
 ('2018-04-12', (7140.25, 7069.029785)),
 ('2018-04-13', (7106.649902, 7140.25)),
 ('2018-04-17', (7281.100098, 7156.279785)),
 ('2018-04-20', (7146.129883, 7238.060059)),
 ('2018-04-24', (7007.350098, 7128.600098)),
 ('2018-04-25', (7003.740234, 7007.350098))]

In [14]:
# 2018-03-26 is not matched by the join: it is the earliest date in the data,
# so no previous close is keyed to it.
date_and_close_price.lookup("2018-03-26") # look up by key

[7220.540039]

JOIN 결과는 key 순서로 정렬되지 않는다(INNER JOIN, OUTER JOIN 모두 해당). 정렬된 결과가 필요하면 sortByKey()를 사용한다.

In [15]:
# LEFT OUTER JOIN (RIGHT and FULL variants also exist): every key of the left
# RDD is kept, and a key with no match on the right gets None as its right value.
joined_left = date_and_close_price.leftOuterJoin(date_and_prev_close_price)

In [16]:
joined_left.take(5)

[('2018-03-27', (7008.810059, 7220.540039)),
 ('2018-03-28', (6949.22998, 7008.810059)),
 ('2018-03-29', (7063.450195, 6949.22998)),
 ('2018-04-11', (7069.029785, 7094.299805)),
 ('2018-04-12', (7140.25, 7069.029785))]

In [25]:
joined_left.lookup('2018-03-26')

[(7220.540039, None)]

In [113]:
import gc
gc.collect()

0