In [49]:
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"

# Shared Variables

- 모든 노드에서 사용하기 위한 공유변수


- 공유변수로 지정한 값은 모든 노드에 중복되어 캐시된다.


- 반복적으로 사용해야하는 변수라면,  
  스파크의 노드는 네트워크를 통해 통신 하기 때문에 모든 노드에 중복 캐시하는 시스템적 비용보다  
  네트워크 과정에서 발생하는 오버헤드 비용이 더 많이 발생하게 된다.

## Broadcast Variables

- 각 노드에 공유되는 읽기 전용 변수

In [26]:
# 학생별 수업카테고리코드로 지정되어있는 값을 카테고리 전체이름으로 변경한다고 가정 해보자

data = [("홍길동","DE"),
    ("이제동","DS"),
    ("하명도","DE"),
    ("변현재","WD")]

code_desc = {"DE":"Data Engineer", "DS":"Data Science", "WD":"Web Developer"}
    
students_rdd = sc.parallelize(data,3)
students_rdd.mapValues(lambda e : code_desc[e]).collect()   
    

[('홍길동', 'Data Engineer'),
 ('이제동', 'Data Science'),
 ('하명도', 'Data Engineer'),
 ('변현재', 'Web Developer')]

In [43]:
# Broadcast_variables 사용하기
broadcastStates = spark.sparkContext.broadcast(code_desc)
print(broadcastStates.value)

# 읽기전용 변수, 수정을 하면
broadcastStates.value["DE"] = '되나?'
print(broadcastStates.value)

print('-------------------------------------')

# 삭제를 해도
del(broadcastStates.value["DE"])
print(broadcastStates.value)

# 아무일도 발생하지 않았다.
# broadcast 함수를 사용해 생성하는 시점에 이미 SparkContext에 등록
students_rdd.mapValues(lambda e : broadcastStates.value[e]).collect()


{'DE': 'Data Engineer', 'DS': 'Data Science', 'WD': 'Web Developer'}
{'DE': '되나?', 'DS': 'Data Science', 'WD': 'Web Developer'}
-------------------------------------
{'DS': 'Data Science', 'WD': 'Web Developer'}


[('홍길동', 'Data Engineer'),
 ('이제동', 'Data Science'),
 ('하명도', 'Data Engineer'),
 ('변현재', 'Web Developer')]

## Accumulator

- 각 노드에 공유되는 누산기 함수

In [45]:
accum=sc.accumulator(0)
rdd=spark.sparkContext.parallelize([1,2,3,4,5])
rdd.foreach(lambda x:accum.add(x))
print(accum.value) #Accessed by driver

15


In [63]:
# # accumulator를 사용하지 않는다면?
# a = 0

# # 모든 노드에서 발생하는 데이터 횟수를 확인해보자
# def change_cate(e):
#      a = a + 1
#      return broadcastStates.value[e]
    
# students_rdd.mapValues(lambda e : change_cate(e)).collect()

# # 횟수 확인
# # local variable 'a' referenced before assignment 발생
# a




In [62]:
accum=sc.accumulator(0)

# 모든 노드에서 발생하는 데이터 횟수를 확인해보자
def change_cate(e):
    accum.add(1)
    return broadcastStates.value[e]
    
students_rdd.mapValues(lambda e : change_cate(e)).collect()

# 횟수 확인
# local variable 'a' referenced before assignment 발생
accum.value

[('홍길동', 'Data Engineer'),
 ('이제동', 'Data Science'),
 ('하명도', 'Data Engineer'),
 ('변현재', 'Web Developer')]

4