# Chapter 2 RDD(Resilient Distributed Datasets)
### 내부작동원리
----------------
스파크의 가장 큰 장점 - 병렬로 동작하는 RDD / 
각 트랜스포메이션은 속도를 비약적으로 향상시키기 위해 실행 / 
모든 트랜스포메이션은 데이터셋에 대한 액션이 호출됐을 때 실행 

### RDD 생성하기
-------------------
RDD를 생성하는 방법은 두가지이다.
1. 컬렉션에 대해 parallelize()함수를 수행

2. 파일 불러오기

In [1]:
data = sc.parallelize(
    [('Amber', 22), ('Alfred', 23), ('Skye',4), ('Albert', 12), 
     ('Amber', 9)])

보통 한 클러스터에서 2~4개 정도의 파티션으로 데이터 셋을 나누는 것이 좋다.
sc.textFile(A, n)에서 n은 데이터셋이 나눠진 파티션 개수를 의미

In [2]:
data_from_file = sc.textFile('./data/VS14MORT.txt.gz',4)

**Schema**

RDD는 데이터프레임과 달리 스키마리스 구조. 데이터셋에 대해 .collect() 함수를 수행하면 파이썬에서도 객체 내의 데이터에 접근할 수 있다

.collect() 함수는 RDD의 모든 엘리먼트를 드라이버에 리턴, 드라이버에서 엘리먼트들은 리스트로 나열

In [3]:
data_heterogenous = sc.parallelize([('Ferrari', 'fast'), {'Porsche': 100000}, ['Spain','visited', 4504]]).collect()
data_heterogenous

[('Ferrari', 'fast'), {'Porsche': 100000}, ['Spain', 'visited', 4504]]

In [4]:
data_heterogenous[1]['Porsche']

100000

**파일로부터 데이터 읽기**



텍스트 파일을 읽을때 파일의 각 행이 RDD의 한 엘리먼트를 이룬다.

In [5]:
data_from_file.take(1)

['                   1                                          2101  M1087 432311  4M4                2014U7CN                                    I64 238 070   24 0111I64                                                                                                                                                                           01 I64                                                                                                  01  11                                 100 601']

**Lambda**


주의: 일반적인 파이썬 함수 선언시 스파크가 파이썬 인터프리터와 JVM을 지속적으로 스위치 하기에 느려질 수 있다. 가능하면 반드시 스파크 내장 함수를 사용하자

In [6]:
def extractInformation(row):
    import re
    import numpy as np

    selected_indices = [
         2,4,5,6,7,9,10,11,12,13,14,15,16,17,18,
         19,21,22,23,24,25,27,28,29,30,32,33,34,
         36,37,38,39,40,41,42,43,44,45,46,47,48,
         49,50,51,52,53,54,55,56,58,60,61,62,63,
         64,65,66,67,68,69,70,71,72,73,74,75,76,
         77,78,79,81,82,83,84,85,87,89
    ]

    '''
        Input record schema
        schema: n-m (o) -- xxx
            n - position from
            m - position to
            o - number of characters
            xxx - description
        1. 1-19 (19) -- reserved positions
        2. 20 (1) -- resident status
        3. 21-60 (40) -- reserved positions
        4. 61-62 (2) -- education code (1989 revision)
        5. 63 (1) -- education code (2003 revision)
        6. 64 (1) -- education reporting flag
        7. 65-66 (2) -- month of death
        8. 67-68 (2) -- reserved positions
        9. 69 (1) -- sex
        10. 70 (1) -- age: 1-years, 2-months, 4-days, 5-hours, 6-minutes, 9-not stated
        11. 71-73 (3) -- number of units (years, months etc)
        12. 74 (1) -- age substitution flag (if the age reported in positions 70-74 is calculated using dates of birth and death)
        13. 75-76 (2) -- age recoded into 52 categories
        14. 77-78 (2) -- age recoded into 27 categories
        15. 79-80 (2) -- age recoded into 12 categories
        16. 81-82 (2) -- infant age recoded into 22 categories
        17. 83 (1) -- place of death
        18. 84 (1) -- marital status
        19. 85 (1) -- day of the week of death
        20. 86-101 (16) -- reserved positions
        21. 102-105 (4) -- current year
        22. 106 (1) -- injury at work
        23. 107 (1) -- manner of death
        24. 108 (1) -- manner of disposition
        25. 109 (1) -- autopsy
        26. 110-143 (34) -- reserved positions
        27. 144 (1) -- activity code
        28. 145 (1) -- place of injury
        29. 146-149 (4) -- ICD code
        30. 150-152 (3) -- 358 cause recode
        31. 153 (1) -- reserved position
        32. 154-156 (3) -- 113 cause recode
        33. 157-159 (3) -- 130 infant cause recode
        34. 160-161 (2) -- 39 cause recode
        35. 162 (1) -- reserved position
        36. 163-164 (2) -- number of entity-axis conditions
        37-56. 165-304 (140) -- list of up to 20 conditions
        57. 305-340 (36) -- reserved positions
        58. 341-342 (2) -- number of record axis conditions
        59. 343 (1) -- reserved position
        60-79. 344-443 (100) -- record axis conditions
        80. 444 (1) -- reserve position
        81. 445-446 (2) -- race
        82. 447 (1) -- bridged race flag
        83. 448 (1) -- race imputation flag
        84. 449 (1) -- race recode (3 categories)
        85. 450 (1) -- race recode (5 categories)
        86. 461-483 (33) -- reserved positions
        87. 484-486 (3) -- Hispanic origin
        88. 487 (1) -- reserved
        89. 488 (1) -- Hispanic origin/race recode
     '''

    record_split = re\
        .compile(
            r'([\s]{19})([0-9]{1})([\s]{40})([0-9\s]{2})([0-9\s]{1})([0-9]{1})([0-9]{2})' + 
            r'([\s]{2})([FM]{1})([0-9]{1})([0-9]{3})([0-9\s]{1})([0-9]{2})([0-9]{2})' + 
            r'([0-9]{2})([0-9\s]{2})([0-9]{1})([SMWDU]{1})([0-9]{1})([\s]{16})([0-9]{4})' +
            r'([YNU]{1})([0-9\s]{1})([BCOU]{1})([YNU]{1})([\s]{34})([0-9\s]{1})([0-9\s]{1})' +
            r'([A-Z0-9\s]{4})([0-9]{3})([\s]{1})([0-9\s]{3})([0-9\s]{3})([0-9\s]{2})([\s]{1})' + 
            r'([0-9\s]{2})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})' + 
            r'([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})' + 
            r'([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})' + 
            r'([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})' + 
            r'([A-Z0-9\s]{7})([\s]{36})([A-Z0-9\s]{2})([\s]{1})([A-Z0-9\s]{5})([A-Z0-9\s]{5})' + 
            r'([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})' + 
            r'([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})' + 
            r'([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})' + 
            r'([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([\s]{1})([0-9\s]{2})([0-9\s]{1})' + 
            r'([0-9\s]{1})([0-9\s]{1})([0-9\s]{1})([\s]{33})([0-9\s]{3})([0-9\s]{1})([0-9\s]{1})')
    try:
        rs = np.array(record_split.split(row))[selected_indices]
    except:
        rs = np.array(['-99'] * len(selected_indices))
    return rs
#     return record_split.split(row)

데이터셋을 쪼개고 변형하기 위해 extractinformation() 사용. map() 함수에 오로지 함수 시그니처만 전달(이 함수는 각 파티션에서 RDD 내의 한 데이터만 extractinformation() 함수에 전달)

In [7]:
data_from_file_conv = data_from_file.map(extractInformation)
data_from_file_conv.map(lambda row: row).take(1)

[array(['1', '  ', '2', '1', '01', 'M', '1', '087', ' ', '43', '23', '11',
        '  ', '4', 'M', '4', '2014', 'U', '7', 'C', 'N', ' ', ' ', 'I64 ',
        '238', '070', '   ', '24', '01', '11I64  ', '       ', '       ',
        '       ', '       ', '       ', '       ', '       ', '       ',
        '       ', '       ', '       ', '       ', '       ', '       ',
        '       ', '       ', '       ', '       ', '       ', '01',
        'I64  ', '     ', '     ', '     ', '     ', '     ', '     ',
        '     ', '     ', '     ', '     ', '     ', '     ', '     ',
        '     ', '     ', '     ', '     ', '     ', '     ', '01', ' ',
        ' ', '1', '1', '100', '6'], dtype='<U40')]

### 전역 범위 vs 지역 범위
----------------

스파크는 로컬 모드, 클러스터 모드로 동작한다. 스파크가 로컬모드로 동작할 때는 파이썬을 실행시키는 것과 다르지 않을 수도 있다.
참고: https://spark.apache.org/docs/latest/rdd-programming-guide.html#local-vs-cluster-modes 

### Transformations
------------------
필터링, 조인, 데이터셋 내의 값들에 대한 트랜스코딩등을 포함한 데이터셋의 형태를 만드는 것.

**.map()**

가장 많이 쓰이는 함수. 이 함수는 RDD의 각 엘리먼트에 적용된다. data_from_file_conv 데이터셋에서 이 함수를 각각의 행에 대한 트렌스포메이션으로 볼 수 있다. 리스트 형태로 리턴

In [8]:
data_2014 = data_from_file_conv.map(lambda row: int(row[16])) #사망날짜를 숫자값으로 변형
data_2014_2 = data_from_file_conv.map(lambda row: (row[16], row[5])) #사망날짜와 성별
print("data_2014.take(10)\n",data_2014.take(10))
print("\ndata_2014_2.take(10)\n",data_2014_2.take(10))

data_2014.take(10)
 [2014, 2014, 2014, 2014, 2014, 2014, 2014, 2014, 2014, -99]

data_2014_2.take(10)
 [('2014', 'M'), ('2014', 'M'), ('2014', 'F'), ('2014', 'M'), ('2014', 'M'), ('2014', 'F'), ('2014', 'M'), ('2014', 'M'), ('2014', 'F'), ('-99', '-99')]


**.flatMap()**

.map() 함수와 비슷하게 동작. 그러나 평면화된 결과를 리턴

In [9]:
data_2014_flat = data_from_file_conv.flatMap(lambda row: (int(row[16]), row[5]))
data_2014_flat.take(10)

[2014, 'M', 2014, 'M', 2014, 'F', 2014, 'M', 2014, 'M']

**.filter()**

특정 조건에 맞는 엘리먼트 선택 가능. 2014년에 사고사 인원을 카운트 해보자

In [10]:
data_filtered = data_from_file_conv.filter(
    lambda row: row[16]=='2014' and row[21] =='0')
data_filtered.count()

22

**.distinct()**

특정 칼럼에서의 중복된 값을 제거하여 고유값을 리스트로 리턴

성별을 나타내는 row[5]에 또 다른 값이 있는지 확인. 

.collect()함수는 많은 자원을 사용하고 데이터를 섞는 연산을 포함하기에 필요할때만 쓰자

In [11]:
distinct_gender = data_from_file_conv.map(lambda row: row[5]).distinct().collect()
distinct_gender

['-99', 'M', 'F']

**.sample()**

데이터셋에서 임의로 추출된 샘플을 리턴

.sample(중복여부,리턴할데이터셋과 전체 데이터셋 간의 크기 비율, 랜덤시드)
원본데이터와 샘플데이터의 count를 확인해보니 10%에 해당하는 임의의 샘플을 얻었다

In [12]:
fraction = 0.1
data_sample = data_from_file_conv.sample(False, fraction, 666)

print(data_sample.take(1))
print('Original dataset: {0}, sample: {1}'.format(data_from_file_conv.count(), data_sample.count()))

[array(['1', '  ', '5', '1', '01', 'F', '1', '082', ' ', '42', '22', '10',
       '  ', '4', 'W', '5', '2014', 'U', '7', 'C', 'N', ' ', ' ', 'I251',
       '215', '063', '   ', '21', '02', '11I350 ', '21I251 ', '       ',
       '       ', '       ', '       ', '       ', '       ', '       ',
       '       ', '       ', '       ', '       ', '       ', '       ',
       '       ', '       ', '       ', '       ', '       ', '02',
       'I251 ', 'I350 ', '     ', '     ', '     ', '     ', '     ',
       '     ', '     ', '     ', '     ', '     ', '     ', '     ',
       '     ', '     ', '     ', '     ', '     ', '     ', '28', ' ',
       ' ', '2', '4', '100', '8'], dtype='<U40')]
Original dataset: 2631171, sample: 263247


**.leftOuterJoin()**

왼쪽 RDD에 오른쪽 RDD가 추가된 결과가 리턴
데이터를 섞는 과정을 포함하므로 성능저하가 올 수 있다. 필요할때만 쓰자

In [13]:
rdd1 = sc.parallelize([('a', 1), ('b', 4), ('c',10)])
rdd2 = sc.parallelize([('a', 4), ('a', 1), ('b', '6'), ('d', 15)])
rdd3 = rdd1.leftOuterJoin(rdd2)
rdd3.collect()

[('c', (10, None)), ('b', (4, '6')), ('a', (1, 4)), ('a', (1, 1))]

**.join()**

두 RDD에서 'a'와 'b'가 겹치므로 두 값만 가짐

In [14]:
rdd4 = rdd1.join(rdd2)
rdd4.collect()

[('b', (4, '6')), ('a', (1, 4)), ('a', (1, 1))]

**.intersection()**

두 RDD의 교집합 엘리먼트를 리턴

In [15]:
rdd5 = rdd1.intersection(rdd2)
rdd5.collect()

[('a', 1)]

**.repartition()**

데이터셋을 재파티션. 성능저하 올 수 있음. 

.glom()이 생성하는 리스트는 파티션 개수만큼의 엘리먼트를 가지고 있다.

In [16]:
rdd1 = rdd1.repartition(4)
len(rdd1.glom().collect())

4

### Action

트랜스포메이션과는 다르게 Action은 데이터셋에서 스케줄된 태스크를 실행

트랜스포메이션 경우, 데이터 트랜스포메이션이 끝난 후 트랜스포메이션 실행 가능

액션은 어떠한 트랜스포메이션도 포함하지 않거나(예로, .take(n)은 어떠한 트랜스포메이션이 수행되지 않아도 RDD에서 n개의 데이터 리턴) 어떠한 트랜스포메이션도 될 수 있다.

**.take()**

가장 위에 있는 n행 리턴. RDD 전체를 리턴하는 .collect()보다 더 자주 쓰임.

In [17]:
data_first = data_from_file_conv.take(1)
data_first

[array(['1', '  ', '2', '1', '01', 'M', '1', '087', ' ', '43', '23', '11',
        '  ', '4', 'M', '4', '2014', 'U', '7', 'C', 'N', ' ', ' ', 'I64 ',
        '238', '070', '   ', '24', '01', '11I64  ', '       ', '       ',
        '       ', '       ', '       ', '       ', '       ', '       ',
        '       ', '       ', '       ', '       ', '       ', '       ',
        '       ', '       ', '       ', '       ', '       ', '01',
        'I64  ', '     ', '     ', '     ', '     ', '     ', '     ',
        '     ', '     ', '     ', '     ', '     ', '     ', '     ',
        '     ', '     ', '     ', '     ', '     ', '     ', '01', ' ',
        ' ', '1', '1', '100', '6'], dtype='<U40')]

**.takeSample(a,b,c)**

데이터로부터 임의의 샘플을 얻는다.

a: 샘플링이 재선택되는 경우 허용 여부

b: 리턴 데이터 개수

c: 랜덤시드

In [18]:
data_take_sampled = data_from_file_conv.takeSample(False,1,911)
data_take_sampled

[array(['1', '  ', '3', '1', '09', 'F', '1', '086', ' ', '43', '23', '11',
        '  ', '4', 'D', '2', '2014', 'U', '7', 'B', 'U', ' ', ' ', 'N185',
        '327', '100', '   ', '31', '04', '11N185 ', '12I500 ', '61J189 ',
        '62E46  ', '       ', '       ', '       ', '       ', '       ',
        '       ', '       ', '       ', '       ', '       ', '       ',
        '       ', '       ', '       ', '       ', '       ', '04',
        'N185 ', 'E46  ', 'I500 ', 'J189 ', '     ', '     ', '     ',
        '     ', '     ', '     ', '     ', '     ', '     ', '     ',
        '     ', '     ', '     ', '     ', '     ', '     ', '01', ' ',
        ' ', '1', '1', '100', '6'], dtype='<U40')]

**.collect()**

RDD의 모든 엘리먼트를 드라이버로 리턴한다. 데이터셋이 클수록 많은 연산이 요구되므로 성능저하의 주요 원인

**.reduce()**

.reduce() 함수는 특정 함수를 사용해 RDD의 개수를 줄인다. RDD의 총합을 구하기 위해 이 함수를 사용할 수 있다.

In [19]:
# map()을 이용해 RDD1의 값 리스트 생성
temp_for_reduce = rdd1.map(lambda row: row[1])
print("map result:",temp_for_reduce)

# 각 파티션에서 합계 함수를 수행(여기서 lambda로 표기)
# 마지막 집계가 수행되는 드라이버 노드에 합계 리턴
redu = temp_for_reduce.reduce(lambda x, y: x + y)
print("reduce result:",redu)

map result: PythonRDD[53] at RDD at PythonRDD.scala:52
reduce result: 15


주의: 리듀서로 전달되는 함수는 결합법칙, 교환법칙이 성립해야한다. 즉, 엘리먼트 순서가 바뀌어도, 피연산자의 순서가 바뀌어도 결과는 같아야 한다.

아래처럼 같은 데이터이나 파티션을 나눔에 따라 결과가 다르게 도출된다. 어떤 함수를 리듀서로 정할지 신중히 결정해야 한다.

In [20]:
data_reduce = sc.parallelize([1,2,.5,.1,5,.2],1)
print("One partion, use reduce:\t",data_reduce.reduce(lambda x, y: x / y))

data_reduce = sc.parallelize([1,2,.5,.1,5,.2],3)
print("Three partion, use reduce:\t",data_reduce.reduce(lambda x, y: x / y))

One partion, use reduce:	 10.0
Three partion, use reduce:	 0.004


**.reduceByKey()**

.reduce() 함수와 비슷하게 동작하나 .reduceByKey()는 키 값을 기반으로 리듀스 하는 것이 차이점

In [24]:
data_key = sc.parallelize([('a', 4),('b', 3),('c', 2),('a', 8),('d', 2),('b', 1),('d', 3)],4)
data_key.reduceByKey(lambda x, y: x + y).collect()

[('b', 4), ('c', 2), ('a', 12), ('d', 5)]

**.count()**

RDD 엘리먼트 개수를 리턴

In [25]:
data_reduce.count()

6

**.countByKey()**

데이터셋이 키-값 형태라면 고유 키의 수를 구할 수 있다.

In [26]:
data_key.countByKey().items()

dict_items([('a', 2), ('b', 2), ('d', 2), ('c', 1)])

**.saveAsTextFile()**

RDD를 텍스트파일로 저장한다. 모든 행은 스트링으로 표시된다.

In [27]:
data_key.saveAsTextFile('./data/data_key.txt')

In [28]:
def parseInput(row):
    import re
    
    pattern = re.compile(r'\(\'([a-z])\', ([0-9])\)')
    row_split = pattern.split(row)
    
    return (row_split[1], int(row_split[2]))
    
data_key_reread = sc.textFile('./data/data_key.txt').map(parseInput)
data_key_reread.collect()

[('b', 1), ('d', 3), ('a', 8), ('d', 2), ('a', 4), ('b', 3), ('c', 2)]

**.foreach()**

같은 함수를 RDD의 각 엘리먼트에 반복적으로 적용하는 함수. map()과 달리 정의된 함수를 하나하나 각각의 데이터에 적용.PySpark에서 지원하지 않는 DB에 데이터를 저장하고 싶을 때 유용하다.

In [37]:
def f(x): 
    print(x)

data_reduce.foreach(f) #data_key RDD에 저장된 모든 데이터를 출력

None
