# 01. Reduction Operations
### 01-1. Reduction
- 요소들을 모아서 하나로 합치는 작업
- 많은 Spark의 연산들이 reduction에 해당됨

# 02. 우리가 배운 것
- Transformations & Actions 
- 지연 실행으로 인한 성능 최적화
- Cache() & Persist()
- Cluster Topology

# 03. Transformations 
### 03-1. Parallel Transformations 
- 주로 변형을 적용시키는 작업들
- map, flatmap, filter
Action은 어떻게 분산된 환경에서 작동할까?

# 04. Reduction이란?
### 04-1. 대부분의 Action은 Reduction
Reduction: 근접하는 요소들을 모아서 하나의 결과로 만드는 일
- 파일 저장, collect()등과 같이 Reduction이 아닌 액션도 있다.

# 05. 병렬처리 하기 가능한 예
### 05-1. Parallel Reduction 
- 병렬 처리가 가능한 경우
    - 두개의 요소들을 모아서 하나의 값으로 모으는 경우 각각이 독립적이어야함
- 병렬 처리가 힘든 경우(병렬처리 하는 의미가 없는 경우)
    - 순서대로가 아니고 

# 06. Reduction Actions
- Reduce
- Fold
- GroupBy
- Aggregate 
    
### 06-1. Reduce
- ***RDD.reduce(fuction)***

In [1]:
from operator import add
from pyspark import SparkConf, SparkContext

import warnings 
warnings.simplefilter(action='ignore')

conf = SparkConf().setMaster('local').setAppName('category-review-average')
sc = SparkContext(conf=conf)
sc.parallelize([1,2,3,4,5]).reduce(add)

23/03/05 22:41:26 WARN Utils: Your hostname, Keemyoui-MacBookPro.local resolves to a loopback address: 127.0.0.1; using 192.168.35.79 instead (on interface en0)
23/03/05 22:41:26 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/03/05 22:41:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/03/05 22:41:28 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
23/03/05 22:41:28 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


                                                                                

15

##### 06-1-1. Partition

In [2]:
sc.parallelize([1,2,3,4]).reduce(lambda x,y : (x*2)+y)

26

In [3]:
# (1,2,3,4) -> ((1*2+2)*2+3)*2+4 = 26
sc.parallelize([1,2,3,4],1).reduce(lambda x,y : (x*2)+y)

26

In [4]:
# (1,2),(3,4) -> ((1*2+2)*2 + (3*2)+4) = 18
sc.parallelize([1,2,3,4],2).reduce(lambda x,y : (x*2)+y)

18

In [5]:
sc.parallelize([1,2,3,4],3).reduce(lambda x,y : (x*2)+y)

18

In [6]:
sc.parallelize([1,2,3,4],4).reduce(lambda x,y : (x*2)+y)

26

##### 파티션이 어떻게 나뉠지 프로그래머가 정확하게 알기 어렵다
- 연산의 순서와 상관 없이 결과값을 보장하려면
    - 교환법칙(a*b = b*a)
    - 결합법칙(a*b)*c = a*(b*c)

### 06-2. Fold
- RDD.fold(zeroValue, Function)

In [7]:
from operator import add 
sc.parallelize([1,2,3,4,5]).fold(0, add)

15

##### FOLD가 어떻게 되어있냐면?

In [8]:
def fold(self, zeroValue, op):
    """
    Aggregate the elements of each partition, and then the results 
    the partitions, using a given associative function and a neutral "zero value."
    
    The function C{op(t1, t2)} is allowed to modify C{t1} and return it as its result value
    to avoid object allocation; however, it shouldnot modify C{t2}.
    
    >>> from operator import add 
    >>> sc.parallelize ([1,2,3,4,5]).fold(0, add)
    15
    """
    
    def func(iterator):
        acc = zeroValue #Zerovalue를 acc 값에 담고
        for obj in iterator:
            acc = op(obj, acc) # acc에 op 연산을 누적
        yield acc
    vals = self.mapPartitions(func).collect() # 인자로 전달하는 함수를 파티션 단위로 전달하고 새로운 RDD로 변환
    return reduce(op, vals, zeroValue) # return하기전 reduce를 적용, 파이썬 function 라이브러리

##### 06-2-1. Fold & Partition 

In [9]:
rdd = sc.parallelize([2,3,4],4) # 파티션은 4개인데, 요소가 3개? -- 어느 파티션은 값이 없게 되겟죠?
rdd.reduce(lambda x,y : x*y) # (2*3*4) = 24 

24

In [10]:
rdd.fold(1, lambda x, y : x*y) # (1*2*3*4)= 24

24

In [11]:
#그러나
rdd.reduce(lambda x, y : x+y) # (0+2+3+4) = 9 

9

In [12]:
rdd.fold(1, lambda x,y : x+y) 
# (1+1) + (1+2) + (1+3) + (1+4) = 14
# 각 파티션의 시작값이 1이기 때문에 

14

### 06-2. GroupBy

In [13]:
# RDD.groupBy(<기준함수>)
rdd = sc.parallelize([1,1,2,3,5,8])
result = rdd.groupBy(lambda x: x%2).collect() # x를 2로 나눴을 때 나머지를 기준으로 그룹바이 
sorted([(x, sorted(y)) for (x, y) in result]) # x: 나머지 , y는 원래 element를 sorted

[(0, [2, 8]), (1, [1, 1, 3, 5])]

In [14]:
### 연습해보자
rdd = sc.parallelize([1,2,3,4,5,6,7,9,10])
result = rdd.groupBy(lambda x: x%3).collect() 
sorted([(x, sorted(y)) for (x, y) in result])

[(0, [3, 6, 9]), (1, [1, 4, 7, 10]), (2, [2, 5])]

### 06-3. Aggregate
- RDD 데이터 타입과 Action 결과 타입이 다를 경우 사용
- 파티션 단위의 연산 결과를 합치는 과정을 거친다.
- RDD.aggregate(zeroValue, seqOp, combOp) 
    - zeroValue: 각 파티션에서 누적할 시작 값 
    - seqOp: 타입 변경 함수 (=map?) 
    - combOp: 합치는 함수 (=reduce)
    
- 많이 쓰이는 reduction action 
- 대부분의 데이터 작업은 "크고 복잡한 데이터 타입" -> "정제된 데이터" 로 바꾸는 작업
- 배운것 외에도 여러가지 Operation이 존재
- Key-Value RDD
    - groupByKey
    - reduceByKey

In [15]:
seqOp = (lambda x, y: (x[0] + y, x[1] +1)) 
combOp = (lambda x, y: (x[0]+ y[0], x[1] + y[1])) # 파티션마다 값을 모아서 하나로 만들어쥐
sc.parallelize([1,2,3,4]).aggregate((0,0), seqOp, combOp) #PairRdd(0,0) 

(10, 4)

- x[0] = 0, x[1] = 0 (zeroValue) 
    - x[0] + y = (0+1), x[1]+1 = (0+1) -> (1,1) 
    - x[0] + y = (1+2), x[1]+1 = (1+1) -> (3,2)

- x[0] = 0, x[1] = 0 (zeroValue)
    - x[0] + y = (0,3), x[1]+1 = (0+1) -> (3,1)
    - x[0] + y = (3,4), x[1]+1 = (1+1) -> (7,2)
    
- x[0]=3, y[0] = 7, x[1] = 2, y[1] =2 
    - x[0]+y[0]  x[1]+y[1] = 10,4

In [16]:
sc.parallelize([]).aggregate((0,0) , seqOp, combOp)

(0, 0)

23/03/06 16:13:40 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 1214802 ms exceeds timeout 120000 ms
23/03/06 16:13:40 WARN SparkContext: Killing executors is not supported by current scheduler.
