## RDD(Resilent Distributed Datasets)

### RDD생성

In [1]:
data = sc.parallelize(
    [('Amber', 22), ('Alfred', 23), ('Skye', 4), ('Albert', 12), ('Amber', 9)]
)

In [2]:
data_from_file = sc.textFile('./data/VS14MORT.txt.gz', 4)

- 파일시스템 : NTFS, FAT, Mac OS, HDFS, S3, Cassandra 등의 분산 파일시스템에서 데이터를 읽어드릴 수 있음.
- 데이터 포맷 : txt, parquet, JSON, Hive Table, RDBMS(JDBC드라이버!)
- 압축파일도 가능함
----
데이터가 어떻게 읽히느냐에 따라 데이터를 지니고 있는 객체가 조금씩 다르게 표현될 수 있음. 데이터가 읽히는 파일은 **MapPartitionsRDD**로 표현됨. 

### RDD 스키마
- RDD는 schema-less 데이터구조임. 병렬처리에 문제가 없음

In [3]:
data_heterogenous = sc.parallelize([
    ('Ferrari', 'fast'), {'Porsche', 100000},['Spain', 'visited', 4504]
]).collect() # RDD의 모든 엘리먼트를 드라이버에 리턴
data_heterogenous

[('Ferrari', 'fast'), {100000, 'Porsche'}, ['Spain', 'visited', 4504]]

In [4]:
data_heterogenous.append(('Genesis', 'Korean'))
data_heterogenous

[('Ferrari', 'fast'),
 {100000, 'Porsche'},
 ['Spain', 'visited', 4504],
 ('Genesis', 'Korean')]

In [5]:
data_heterogenous[3][0]

'Genesis'

### txt파일
- 파일의 각 행이 RDD의 한 엘리먼트를 구성함

In [6]:
data_from_file.take(1)

['                   1                                          2101  M1087 432311  4M4                2014U7CN                                    I64 238 070   24 0111I64                                                                                                                                                                           01 I64                                                                                                  01  11                                 100 601']

### User defined functions
- python lambda사용.
    - python 함수를 선언하면 spark가 파이썬 인터프리터와 JVM을 지속적으로 스위치해야 하기 때문에 application이 느려질 수 있음. 되도록이면 spark내장함수를 사용하길 권장함
- 데이터셋에 대한 상세내용이 없음. 그냥 넘기자

In [7]:
def extractInformation(row):
    import re
    import numpy as np

    selected_indices = [
         2,4,5,6,7,9,10,11,12,13,14,15,16,17,18,
         19,21,22,23,24,25,27,28,29,30,32,33,34,
         36,37,38,39,40,41,42,43,44,45,46,47,48,
         49,50,51,52,53,54,55,56,58,60,61,62,63,
         64,65,66,67,68,69,70,71,72,73,74,75,76,
         77,78,79,81,82,83,84,85,87,89
    ]

    '''
        Input record schema
        schema: n-m (o) -- xxx
            n - position from
            m - position to
            o - number of characters
            xxx - description
        1. 1-19 (19) -- reserved positions
        2. 20 (1) -- resident status
        3. 21-60 (40) -- reserved positions
        4. 61-62 (2) -- education code (1989 revision)
        5. 63 (1) -- education code (2003 revision)
        6. 64 (1) -- education reporting flag
        7. 65-66 (2) -- month of death
        8. 67-68 (2) -- reserved positions
        9. 69 (1) -- sex
        10. 70 (1) -- age: 1-years, 2-months, 4-days, 5-hours, 6-minutes, 9-not stated
        11. 71-73 (3) -- number of units (years, months etc)
        12. 74 (1) -- age substitution flag (if the age reported in positions 70-74 is calculated using dates of birth and death)
        13. 75-76 (2) -- age recoded into 52 categories
        14. 77-78 (2) -- age recoded into 27 categories
        15. 79-80 (2) -- age recoded into 12 categories
        16. 81-82 (2) -- infant age recoded into 22 categories
        17. 83 (1) -- place of death
        18. 84 (1) -- marital status
        19. 85 (1) -- day of the week of death
        20. 86-101 (16) -- reserved positions
        21. 102-105 (4) -- current year
        22. 106 (1) -- injury at work
        23. 107 (1) -- manner of death
        24. 108 (1) -- manner of disposition
        25. 109 (1) -- autopsy
        26. 110-143 (34) -- reserved positions
        27. 144 (1) -- activity code
        28. 145 (1) -- place of injury
        29. 146-149 (4) -- ICD code
        30. 150-152 (3) -- 358 cause recode
        31. 153 (1) -- reserved position
        32. 154-156 (3) -- 113 cause recode
        33. 157-159 (3) -- 130 infant cause recode
        34. 160-161 (2) -- 39 cause recode
        35. 162 (1) -- reserved position
        36. 163-164 (2) -- number of entity-axis conditions
        37-56. 165-304 (140) -- list of up to 20 conditions
        57. 305-340 (36) -- reserved positions
        58. 341-342 (2) -- number of record axis conditions
        59. 343 (1) -- reserved position
        60-79. 344-443 (100) -- record axis conditions
        80. 444 (1) -- reserve position
        81. 445-446 (2) -- race
        82. 447 (1) -- bridged race flag
        83. 448 (1) -- race imputation flag
        84. 449 (1) -- race recode (3 categories)
        85. 450 (1) -- race recode (5 categories)
        86. 461-483 (33) -- reserved positions
        87. 484-486 (3) -- Hispanic origin
        88. 487 (1) -- reserved
        89. 488 (1) -- Hispanic origin/race recode
     '''

    record_split = re\
        .compile(
            r'([\s]{19})([0-9]{1})([\s]{40})([0-9\s]{2})([0-9\s]{1})([0-9]{1})([0-9]{2})' + 
            r'([\s]{2})([FM]{1})([0-9]{1})([0-9]{3})([0-9\s]{1})([0-9]{2})([0-9]{2})' + 
            r'([0-9]{2})([0-9\s]{2})([0-9]{1})([SMWDU]{1})([0-9]{1})([\s]{16})([0-9]{4})' +
            r'([YNU]{1})([0-9\s]{1})([BCOU]{1})([YNU]{1})([\s]{34})([0-9\s]{1})([0-9\s]{1})' +
            r'([A-Z0-9\s]{4})([0-9]{3})([\s]{1})([0-9\s]{3})([0-9\s]{3})([0-9\s]{2})([\s]{1})' + 
            r'([0-9\s]{2})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})' + 
            r'([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})' + 
            r'([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})' + 
            r'([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})' + 
            r'([A-Z0-9\s]{7})([\s]{36})([A-Z0-9\s]{2})([\s]{1})([A-Z0-9\s]{5})([A-Z0-9\s]{5})' + 
            r'([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})' + 
            r'([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})' + 
            r'([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})' + 
            r'([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([\s]{1})([0-9\s]{2})([0-9\s]{1})' + 
            r'([0-9\s]{1})([0-9\s]{1})([0-9\s]{1})([\s]{33})([0-9\s]{3})([0-9\s]{1})([0-9\s]{1})')
    try:
        rs = np.array(record_split.split(row))[selected_indices]
    except:
        rs = np.array(['-99'] * len(selected_indices))
    return rs
#     return record_split.split(row)

In [8]:
data_from_file_conv = data_from_file.map(extractInformation)
data_from_file_conv.take(1)

[array(['1', '  ', '2', '1', '01', 'M', '1', '087', ' ', '43', '23', '11',
        '  ', '4', 'M', '4', '2014', 'U', '7', 'C', 'N', ' ', ' ', 'I64 ',
        '238', '070', '   ', '24', '01', '11I64  ', '       ', '       ',
        '       ', '       ', '       ', '       ', '       ', '       ',
        '       ', '       ', '       ', '       ', '       ', '       ',
        '       ', '       ', '       ', '       ', '       ', '01',
        'I64  ', '     ', '     ', '     ', '     ', '     ', '     ',
        '     ', '     ', '     ', '     ', '     ', '     ', '     ',
        '     ', '     ', '     ', '     ', '     ', '     ', '01', ' ',
        ' ', '1', '1', '100', '6'],
       dtype='<U40')]

In [9]:
data_from_file_conv.map(lambda row:row).take(1)

[array(['1', '  ', '2', '1', '01', 'M', '1', '087', ' ', '43', '23', '11',
        '  ', '4', 'M', '4', '2014', 'U', '7', 'C', 'N', ' ', ' ', 'I64 ',
        '238', '070', '   ', '24', '01', '11I64  ', '       ', '       ',
        '       ', '       ', '       ', '       ', '       ', '       ',
        '       ', '       ', '       ', '       ', '       ', '       ',
        '       ', '       ', '       ', '       ', '       ', '01',
        'I64  ', '     ', '     ', '     ', '     ', '     ', '     ',
        '     ', '     ', '     ', '     ', '     ', '     ', '     ',
        '     ', '     ', '     ', '     ', '     ', '     ', '01', ' ',
        ' ', '1', '1', '100', '6'],
       dtype='<U40')]

## Spark 전역범위 vs 지역범위
**Spark실행과정**
- *클러스터 모드* : 특정 애플리케이션이 job을 실행하면 job은 driver node(master node)로 보내진다. master node는 job을 위해서 DAG(비순환 방향성 그래프)를 생성하고 어떤 executor node(worker node)가 특정 task를 실행할지 결정함. 이후 driver node는 각 task를 마칠 준비한 이후, worker node가 각자의 task를 수행하고 작업을 마치면 그 결과를 driver node에 리턴함
- 각자의 실행 노드가 드라이버 노드에서 사용되는 변수와 함수를 복사하여 사용함. 태스크를 실행할 때, 실행 노드가 이 변수나 함수를 수정할 경우 다른 실행 노드들의 변수나 함수에는 영향을 미치지 않는다. run-time버그나 이상 행위를 유발할 수 있으며, 오류들을 추적할 때 문제가 발생하기도 함

## Transformation
데이터셋의 형태를 만들어냄. 필터링, 조인, 데이터셋 내의 값등레 대한 트랜스코딩 등을 포함함.

대표적 함수
- map()
- filter()
- flatMap()
- distinct()
- sample()
- leftOuterJoin()
- repartition()


### map
각 엘리먼트에 적용됨.
 - tip) 단일 열과 다수열을 가져오는 경우 tuple, dict, list로 감싸야한다는 것을 기억

In [10]:
# 16 : 사망한 날짜
# 사망한 날짜의 값을 int형으로 변경
data_2014 = data_from_file_conv.map(lambda row: int(row[16]))
data_2014.take(10)

[2014, 2014, 2014, 2014, 2014, 2014, 2014, 2014, 2014, -99]

In [11]:
data_2014_2 = data_from_file_conv.map(lambda row: (row[16], int(row[16])))
data_2014_2.take(10)

[('2014', 2014),
 ('2014', 2014),
 ('2014', 2014),
 ('2014', 2014),
 ('2014', 2014),
 ('2014', 2014),
 ('2014', 2014),
 ('2014', 2014),
 ('2014', 2014),
 ('-99', -99)]

### filter()
데이터 셋으로부터 특정 조건에 맞는 엘리먼트를 선택함.
 - 시간이 오래 걸릴 수 있음

In [12]:
import time
start_time = time.time()
data_filtered = data_from_file_conv.filter(lambda row: row[16] == '2014' and row[21] == '0')
print(data_filtered.count())
total_time = time.time() - start_time
print(total_time)

22
95.19330382347107


### flatMap()
map함수와 비슷하지만 리스트가 아닌 flattened result를 리턴함
 - 올바르지 않은 형태의 데이터를 제거하기 위해 사용함 

In [13]:
data_2014_flat = data_from_file_conv.flatMap(lambda row: (row[16], int(row[16]) + 3))
data_2014_flat.take(10)

['2014', 2017, '2014', 2017, '2014', 2017, '2014', 2017, '2014', 2017]

In [14]:
data_2014_notflat = data_from_file_conv.map(lambda row: (row[16], int(row[16]) + 3))
data_2014_notflat.take(10)

[('2014', 2017),
 ('2014', 2017),
 ('2014', 2017),
 ('2014', 2017),
 ('2014', 2017),
 ('2014', 2017),
 ('2014', 2017),
 ('2014', 2017),
 ('2014', 2017),
 ('-99', -96)]

### distinct()
특정 컬럼에서 중복된 값을 제거해 고유한 값을 리스트로 전달함.

In [15]:
#%time
start_time = time.time()
distinct_gender = data_from_file_conv.map(lambda row: row[5]).distinct()
print(distinct_gender.collect())
total_time = time.time() - start_time
print(total_time)

['M', 'F', '-99']
96.85437440872192


### sample()
데이터셋으로부터 임의로 추출된 샘플을 리턴. 
- 첫번째 매개변수는 중복샘플링을 허용하는지(boolean)
- 두번째 매개변수는 리턴할 데이터셋과 전체 데이터셋 간의 크기 비율을 명시
- 세번째 매개변수는 임의의 숫자를 생성하기 위한 seed값 설정

In [16]:
fraction = 0.1
data_sample = data_from_file_conv.sample(False, fraction, 665)
data_sample.take(1)
# print('Original dataset : {0}, sample: {1}'.format(data_from_file_conv.count(), data_sample.count()))

[array(['1', '  ', '2', '1', '01', 'M', '1', '058', ' ', '37', '17', '08',
        '  ', '4', 'D', '3', '2014', 'U', '7', 'C', 'N', ' ', ' ', 'I250',
        '214', '062', '   ', '21', '03', '11I250 ', '61I272 ', '62E669 ',
        '       ', '       ', '       ', '       ', '       ', '       ',
        '       ', '       ', '       ', '       ', '       ', '       ',
        '       ', '       ', '       ', '       ', '       ', '03',
        'I250 ', 'E669 ', 'I272 ', '     ', '     ', '     ', '     ',
        '     ', '     ', '     ', '     ', '     ', '     ', '     ',
        '     ', '     ', '     ', '     ', '     ', '     ', '01', ' ',
        ' ', '1', '1', '100', '6'],
       dtype='<U40')]

In [17]:
start_time = time.time()
print('Original dataset : {0}, sample: {1}'.format(data_from_file_conv.count(), data_sample.count()))
total_time = time.time() - start_time
print(total_time)

Original dataset : 2631171, sample: 262962
187.71924829483032


### leftOuterJoin

In [18]:
# 데이터의 자료구조는 동일하게 설정하는 것이 원칙
rdd1 = sc.parallelize([
    ('a', 1), 
    ('b', 4), 
    ('c', 10)])
rdd2 = sc.parallelize([
    ('a', 4), 
    ('a', 1), 
    ('b', '6'), 
    ('d', 15)])
rdd3 = rdd1.leftOuterJoin(rdd2)

In [19]:
rdd3.collect()

[('b', (4, '6')), ('c', (10, None)), ('a', (1, 4)), ('a', (1, 1))]

In [20]:
rdd4 = rdd1.join(rdd2) # 자연조인
rdd4.collect()

[('b', (4, '6')), ('a', (1, 4)), ('a', (1, 1))]

In [21]:
rdd5 = rdd1.intersection(rdd2)
rdd5.collect()

[('a', 1)]

### repartition
데이터셋을 재파티션하면 데이터가 나눠지는 파티션의 갯수가 변경됨. 되도록이면 사용하지 않기를 권장함
 - glom() : 하나의 리스트를 생성함. 리스트에 생성된 각 엘리먼트들은 명시된 파티션에 존재하는 데이터셋의 모든 엘리먼트에 대한 리스트임

In [22]:
rdd1 = rdd1.repartition(4)
print(len(rdd1.glom().collect()))
print(rdd1.glom().collect())

4
[[], [], [], [('a', 1), ('b', 4), ('c', 10)]]


## Action
액션은 스케쥴된 task를 실행함.
- take()
- collect()
- reduce()
- count()
- saveAsTextFile()
- foreach()

### take()
하나의 파티션에서 상위 n개의 엘리먼트를 리턴함

In [23]:
data_first = data_from_file_conv.take(1)
data_first

[array(['1', '  ', '2', '1', '01', 'M', '1', '087', ' ', '43', '23', '11',
        '  ', '4', 'M', '4', '2014', 'U', '7', 'C', 'N', ' ', ' ', 'I64 ',
        '238', '070', '   ', '24', '01', '11I64  ', '       ', '       ',
        '       ', '       ', '       ', '       ', '       ', '       ',
        '       ', '       ', '       ', '       ', '       ', '       ',
        '       ', '       ', '       ', '       ', '       ', '01',
        'I64  ', '     ', '     ', '     ', '     ', '     ', '     ',
        '     ', '     ', '     ', '     ', '     ', '     ', '     ',
        '     ', '     ', '     ', '     ', '     ', '     ', '01', ' ',
        ' ', '1', '1', '100', '6'],
       dtype='<U40')]

In [24]:
data_take_sample = data_from_file_conv.takeSample(False, 1, 556)
data_take_sample

[array(['1', '  ', '4', '1', '10', 'M', '1', '025', ' ', '31', '11', '05',
        '  ', '7', 'S', '3', '2014', 'N', '1', 'C', 'Y', '9', ' ', 'V294',
        '388', '114', '   ', '38', '02', '11T07  ', '12V294 ', '       ',
        '       ', '       ', '       ', '       ', '       ', '       ',
        '       ', '       ', '       ', '       ', '       ', '       ',
        '       ', '       ', '       ', '       ', '       ', '02',
        'V294 ', 'T07  ', '     ', '     ', '     ', '     ', '     ',
        '     ', '     ', '     ', '     ', '     ', '     ', '     ',
        '     ', '     ', '     ', '     ', '     ', '     ', '01', ' ',
        ' ', '1', '1', '100', '6'],
       dtype='<U40')]

### reduce()
특정 함수를 사용해 RDD의 갯수를 줄임.

In [25]:
rdd1.map(lambda row: row[1]).collect()

[1, 4, 10]

In [26]:
rdd1.map(lambda row: row[1]).reduce(lambda x, y: x + y)

15

### 주의할 점
- reducer로 전달되는 함수는 결합법칙과 교환법칙이 성립되어야 함. 이전의 법칙들이 무시되면, 문제가 발생함

In [28]:
data_reduce = sc.parallelize([1, 2, .5, .1, 5, .2], 1)

In [29]:
works = data_reduce.reduce(lambda x, y : x / y)
works

10.0

In [30]:
data_reduce = sc.parallelize([1, 2, .5, .1, 5, .2], 3)
data_reduce.reduce(lambda x, y : x / y)

0.004

### reduceByKey()
- 키값을 기반으로 reduce작업을 수행함

In [31]:
data_key = sc.parallelize([
    ('a', 4),
    ('b', 3),
    ('c', 2),
    ('a', 8),
    ('d', 2),
    ('b', 1),
    ('d', 3)],4
)
data_key.reduceByKey(lambda x, y: x + y).collect()

[('b', 4), ('c', 2), ('a', 12), ('d', 5)]

### count()
 -  len(data_reduce.collect())은 사용하지 말자
 - 데이터셋의 형태가 key-value의 형태이면 countByKey()함수를 사용!

In [32]:
data_reduce.count() 

6

In [33]:
data_key.countByKey()

defaultdict(int, {'a': 2, 'b': 2, 'c': 1, 'd': 2})

In [34]:
data_key.countByKey().items()

dict_items([('a', 2), ('b', 2), ('c', 1), ('d', 2)])

### saveAsTextFile()
- 각 파티션을 분리된 파일에 저장함

In [36]:
data_key.saveAsTextFile('./data/pre_processed_data/data_keys.txt')

In [45]:
! ls ./data/pre_processed_data/data_keys.txt/

_SUCCESS  part-00000  part-00001  part-00002  part-00003


파티션별로 분리된 파일로 저장된 것을 확인할 수 있음

In [46]:
def parseInput(row):
    import re
    
    pattern = re.compile(r'\(\'([a-z])\', ([0-9])\)')
    row_split = pattern.split(row)
    
    return (row_split[1], int(row_split[2]))

In [47]:
data_key_reread1 = sc.textFile('data/pre_processed_data/data_keys.txt')
data_key_reread1.collect()

["('a', 8)",
 "('d', 2)",
 "('a', 4)",
 "('b', 1)",
 "('d', 3)",
 "('b', 3)",
 "('c', 2)"]

In [48]:
data_key_reread_parse = sc.textFile('data/pre_processed_data/data_keys.txt').map(parseInput)
data_key_reread_parse.collect()

[('a', 8), ('d', 2), ('a', 4), ('b', 1), ('d', 3), ('b', 3), ('c', 2)]

### foreach()
같은 함수를 RDD의 각 엘리먼트에 반복적으로 적용하는 함수임. map()함수와 다르게 foreach()함수는 정의된 함수를 하나나 각각의 데이터에 적용함. 
 - 파이스파크에서 지원하지 않는 데이터베이스에 데이터를 저장하고 싶을 때 자주 사용하게 됨

In [49]:
def f(x):
    print(x)
    
data_key.foreach(f)

## 요약
- RDD는 스파크의 핵심. RDD와 같은 schema-less 데이터 구조는 스파크에서 다룰 수 있는 가장 기본적인 데이터 구조임
- parallelize()함수나 txt파일로부터 데이터를 읽는 방법을 학습. 또한 비구조적인 데이터를 처리하는 방법
- transformation, action에 대해 간단한 학습
- scala RDD와 python RDD는 스피드에서 가장 큰 차이가 있음.