#### RDD(Resilient Distributed Datase) : 분산되어 있는 변경 불가능한 객체 모음.(분산되어 존재하는 데이터 요소들의 모임.)
- <RDD 객체의 멤버함수>
    - 1. Action 함수
        - take()
        - collect()
        - stats()
        - mean()
        - min()
        - max()
        - stdev()
        - sum()
        - reduce()
        - sample()
        - takeSample()
        - first()
        - countByKey()
        - sortBy()
        - sortByKey()
    - 2. Transformation 함수
        - filter()
        - map()
        - flatMap()
        - distinct()
        - reduceByKey()

In [1]:
from pyspark import SparkContext

In [2]:
# 한 번만 실행. 현재 클러스터 구성을 아직 안해서 Local로 주고 이용.
sc = SparkContext('local')

# 사용할 데이터 리스트.
my = [10,20,30,40,50]

# 파이썬 리스트를 넘기고 스파크 RDD를 생성. 또는 데이터를 k개의 파티션으로 나눌 수도 있음.
nRdd = sc.parallelize(my)

In [3]:
# 분산 리스트 for 방대한 데이터.
nRdd

ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:195

In [4]:
# Action 함수. => take(가져올 데이터 수) : RDD에서 데이터를 가져옴.
nRdd.take(3) 

[10, 20, 30]

In [5]:
# Action 함수. => foreach() : collection을 리스트로 출력함.
nRdd.foreach( lambda x : print(x) )

In [6]:
# Transformation 함수. => map() : 데이터를 가공함. 반환 타입이 같지 않아도 되서 유용함.
nRdd.map( lambda x: x + 2 ).collect()

[12, 22, 32, 42, 52]

In [7]:
# sc.textFile() : 텍스트 파일 읽기.
rdd1 = sc.textFile('data/aa.txt') 

# Action 함수. => collect() : RDD의 모든 데이터 요소를 하나의 리스트로 반환. 
rdd1.collect()

['스파크', '테스트', '입니다', '확인']

In [8]:
rdd2 = sc.textFile('data/bb.txt')
rdd2.collect()

['안녕 스파크', '반가워 파이썬', '내일은 새해', '내일 뵐께요']

In [9]:
# Transformation 함수. => map() : 데이터를 가공함. 반환 타입이 같지 않아도 되서 유용함.
# x : 개별 데이터를   split() : 공백 기준으로 자름.
rdd2.map( lambda x : x.split() ).collect()  

[['안녕', '스파크'], ['반가워', '파이썬'], ['내일은', '새해'], ['내일', '뵐께요']]

In [10]:
# Transformation 함수. => flatMap() : 각 입력 데이터에 대해 여러 개의 아웃풋 데이터를 생성.
rdd2.flatMap( lambda x: x.split() ).collect()

['안녕', '스파크', '반가워', '파이썬', '내일은', '새해', '내일', '뵐께요']

In [11]:
# Action 함수. => count() : RDD의 데이터 개수. 
rdd2.flatMap( lambda x: x.split() ).count() 

8

In [12]:
rdd3 = sc.textFile('data1/bb.txt')
rdd3.collect()

['10,20', '30,40', '50,60', '20,40']

In [13]:
rdd3.flatMap(lambda x : x.split(',')).collect()

['10', '20', '30', '40', '50', '60', '20', '40']

In [14]:
rdd4 = rdd3.flatMap(lambda x : x.split(',')).map(lambda x : int(x))
rdd4.collect()

[10, 20, 30, 40, 50, 60, 20, 40]

In [15]:
# Transformation 함수. => 중복값 제외.
rdd4.distinct().collect()

[10, 20, 30, 40, 50, 60]

In [16]:
# Action 함수. => first() : 첫번째 요소를 반환함.
rdd4.first() 

10

In [17]:
# Transformation 함수. => sample() : 복원 추출. 0.3의 의미는 30% 정도를 복원 추출해준다는 것을 의미.
samRdd1 = rdd4.sample(True, 0.3)    
samRdd1.collect()

[10, 30, 40, 60, 20]

In [18]:
# Transformation 함수. => sample() : 비복원 추출. 0.3의 의미는 30% 정도를 비복원 추출해준다는 것을 의미.
samRdd2 = rdd4.sample(False, 0.3) 
samRdd2.collect()

[]

In [19]:
# Action 함수. => takeSample() : True : 복원.
rdd4.takeSample(True, 2)  

[40, 20]

In [20]:
# Action 함수. => takeSample() : False : 비복원.
rdd4.takeSample(False, 4)  

[40, 20, 40, 30]

In [21]:
# Action 함수. => mean() : 평균을 구해줌.
rdd4.mean()     

33.75

In [22]:
# Action 함수. => sum() : 합계를 구해줌.
rdd4.sum() 

270

In [23]:
# Action 함수. => min() : 최소값을 구해줌.
rdd4.min()

10

In [24]:
# Action 함수. => max() : 최대값을 구해줌.
rdd4.max()

60

In [25]:
# Action 함수. => stdev() : 표준편차를 구해줌.
rdd4.stdev()

15.761900266148114

In [26]:
# Action 함수. => stats() : 기술통계량을 구해줌.
rdd4.stats()

(count: 8, mean: 33.75, stdev: 15.761900266148114, max: 60.0, min: 10.0)

In [27]:
# 1 <= data < 50, 50 <= data < 100 인 개수를 반환.
a, b = rdd4.histogram([1,50,100])   

# [1, 50) : 6개, [50, 100) : 2개
b

[6, 2]

In [28]:
# 함수로도 이용가능.
def f(x,y) :
    print('x=', x)
    print('y=', y)
    return x + y
f(1,2)

x= 1
y= 2


3

In [29]:
# Action 함수. => reduce() : 누적합의 개념.
# rdd4.reduce( lambda x, y : x + y )
rdd4.reduce( f )

270

In [30]:
dt  = [(1,2),(3,4),(5,6),(1,7),(3,4)]
rdd6 = sc.parallelize(dt)
rdd6.collect()

[(1, 2), (3, 4), (5, 6), (1, 7), (3, 4)]

In [31]:
# Transform 함수. => reduceByKey() : ( key, value ) 동일 key에 대하여 value의 합.
rdd6.reduceByKey(lambda x,y : x + y).collect()   

[(1, 9), (3, 8), (5, 6)]

In [32]:
rdd6.flatMap( lambda x : x ).collect()

[1, 2, 3, 4, 5, 6, 1, 7, 3, 4]

In [33]:
rdd7 = rdd6.flatMap( lambda x : x ).map( lambda x :(x,1) )
rdd7.collect()

[(1, 1),
 (2, 1),
 (3, 1),
 (4, 1),
 (5, 1),
 (6, 1),
 (1, 1),
 (7, 1),
 (3, 1),
 (4, 1)]

In [34]:
# Action 함수. => sortByKey() : 키로 정렬된 RDD로 반환.
rdd7.sortByKey(ascending = False).collect()

[(7, 1),
 (6, 1),
 (5, 1),
 (4, 1),
 (4, 1),
 (3, 1),
 (3, 1),
 (2, 1),
 (1, 1),
 (1, 1)]

In [35]:
# Action 함수. => sortBy() : key 지정이 가능.
rdd6.sortBy(lambda x : x[1]).collect() 

[(1, 2), (3, 4), (3, 4), (5, 6), (1, 7)]

In [36]:
rdd7.reduceByKey( lambda x,y : x + y ).collect()

[(1, 2), (2, 1), (3, 2), (4, 2), (5, 1), (6, 1), (7, 1)]

In [37]:
# Action 함수. => countByKey() : 각 키에 대한 값의 개수를 센다
rdd7.countByKey() 

defaultdict(int, {1: 2, 2: 1, 3: 2, 4: 2, 5: 1, 6: 1, 7: 1})

In [38]:
# 인덱싱 활용이 가능.
rdd7.countByKey() [1]

2

#### 문제 1.  aa.txt 에 있는 각 단어의 개수를 구하시오.

In [39]:
rdd8 = sc.textFile('data1/aa.txt')
rdd8.collect()

['나는 자랑스런', '태극기 앞에', '조국과 태극기', '몸과 나는']

In [40]:
rdd8.flatMap( lambda x:x.split()).collect()

['나는', '자랑스런', '태극기', '앞에', '조국과', '태극기', '몸과', '나는']

In [41]:
rdd8.flatMap( lambda x:x.split()).map( lambda x : (x,1)).collect()

[('나는', 1),
 ('자랑스런', 1),
 ('태극기', 1),
 ('앞에', 1),
 ('조국과', 1),
 ('태극기', 1),
 ('몸과', 1),
 ('나는', 1)]

In [42]:
rdd8.flatMap( lambda x:x.split()).map( lambda x : (x,1)).countByKey()

defaultdict(int, {'나는': 2, '자랑스런': 1, '태극기': 2, '앞에': 1, '조국과': 1, '몸과': 1})

In [43]:
rdd8.flatMap( lambda x:x.split()).map( lambda x : (x,1)).reduceByKey(lambda x,y: x+y).collect()

[('나는', 2), ('자랑스런', 1), ('태극기', 2), ('앞에', 1), ('조국과', 1), ('몸과', 1)]

#### 문제 2. w.txt 에 있는 각 단어의 개수를 구하시오.

In [44]:
rdd9 = sc.textFile('data1/w.txt')
rdd9.collect()

['이명박(MB) 전 대통령의 측근인 이재오 국민의힘 상임고문은 4일 더불어민주당이 전직 대통령 사면에 ‘당사자의 반성’을 조건으로 달자 “시중의 잡범들에게나 하는 얘기”라고 비난했다.',
 '',
 '이 상임고문은 이날 CBS라디오 김현정의 뉴스쇼에서 “(수감된 이명박·박근혜 전 대통령이) 살인·강도나 잡범도 아니고, 한 나라의 정권을 담당했던 전직 대통령들 아니냐”며 이같이 말했다.',
 '',
 '그는 “당사자들 입장에선 2년, 3년 감옥에서 산 것만 해도 억울한데, 내보내 주려면 곱게 내보내 주는 거지 무슨 소리냐”며 “대법원 판결은 판결이고, 정치적 보복에 대한 억울함은 (별개)”라고 했다.',
 '',
 '이낙연 더불어민주당 대표가 꺼낸 이명박·박근혜 전 대통령 사면 문제와 관련 청와대와 여당 지도부 간 사전 공감대가 있었을 것이라고 했다. 이 상임고문은 사면에 대한 국민적 공감대가 필요하다는 여당의 주장에 관해서는 ‘사면권자의 판단 문제’라고 했다.',
 '',
 '이 상임고문은 “여당 대표가 그 정도 이야기를 할 때는 청와대에 이야기하는 게 수순”이라며 “국정에 영향을 미치지 않는 문제는 대표가 개인적으로 (말을) 할 수 있지만 사면권은 대통령에게 있기 때문에 사전에 귀띔이라도 했어야 한다. 그렇지 않았다면 그건 무모한 짓”이라고 했다.',
 '',
 '그는 “이 대표와 국회의원을 같이 했지만 그분이 무모하게 내지르고 하실 분은 아니다”며 “돌다리도 두들겨 보고 가는 사람”이라고 덧붙였다.',
 '',
 '그는 “국민의 동의라는 것은 찬성하는 사람도 있고, 반대하는 사람도 있을 것이다. 국민 동의라는 것이 국민 전원이 찬성하거나 반대하는 것은 아니지 않냐”며 “그렇다면 이것은 판단의 문제”라고 했다.',
 '',
 '이 상임 고문은 “(여당에서는 당사자의) 반성이 중요하다고 했는데 전직 대통령 입장에서는 반성하려면 (자신들을) 잡아간 사람이 해야지, 잡혀 간 사람이 무슨 반성을 하냐. 말이 되는 소리냐”고 했다.',
 '',
 '‘당사자가 반성해

In [45]:
rdd9.flatMap( lambda x:x.split()).map( lambda x : (x,1)).countByKey()

defaultdict(int,
            {'이명박(MB)': 1,
             '전': 5,
             '대통령의': 1,
             '측근인': 1,
             '이재오': 1,
             '국민의힘': 1,
             '상임고문은': 4,
             '4일': 1,
             '더불어민주당이': 1,
             '전직': 3,
             '대통령': 3,
             '사면에': 2,
             '‘당사자의': 1,
             '반성’을': 1,
             '조건으로': 1,
             '달자': 1,
             '“시중의': 1,
             '잡범들에게나': 2,
             '하는': 4,
             '얘기”라고': 1,
             '비난했다.': 1,
             '이': 5,
             '이날': 1,
             'CBS라디오': 1,
             '김현정의': 1,
             '뉴스쇼에서': 1,
             '“(수감된': 1,
             '이명박·박근혜': 2,
             '대통령이)': 1,
             '살인·강도나': 1,
             '잡범도': 1,
             '아니고,': 1,
             '한': 1,
             '나라의': 1,
             '정권을': 1,
             '담당했던': 1,
             '대통령들': 1,
             '아니냐”며': 1,
             '이같이': 1,
             '말했다.': 1,
             '그는': 4,
     

In [46]:
rdd10 = rdd9.flatMap( lambda x:x.split()).map( lambda x : (x,1)).reduceByKey(lambda x,y: x + y).\
            sortBy(lambda x : x[1], ascending=False).take(10)
rdd10

[('했다.', 8),
 ('전', 5),
 ('이', 5),
 ('상임고문은', 4),
 ('하는', 4),
 ('그는', 4),
 ('전직', 3),
 ('대통령', 3),
 ('대표가', 3),
 ('공감대가', 3)]

In [47]:
# sc을 종료시킴.
sc.stop()