# Chapter 02_01. 병렬처리와 분산처리

### 병렬처리(Parallel)

1. 데이터를 여러개로 쪼개고
2. 여러 쓰레드에서 각자 task를 적용
3. 각자 만든 결과값을 합치는 과정
    
    
    
    
### 분산처리(Distributed)
1. 데이터를 여러개로 쪼개서 여러 노드로 보낸다.
2. 여러 노드에서 각자 독립적으로 task를 적용
3. 각자 만든 결과값을 합치는 과정

    - 노드간 통신같이 신경써야될 것이 늘어난다.
    - Spark를 이용하면 분산된 환경에서도 일반적인 병렬처리를 하듯 코드를 짜는게 가능하다

##### 어떻게 이게 가능할까?
Spark는 분산된 환경에서 데이터 병렬 모델을 구현해 추상화 시켜주기 때문
- Resiliend Distributed Datasets(RDD)
- RDD.map(< t a s k >) 

그러나 생각없이 코드를 짜면 성능을 제대로 뽑을 수 없다
- 노드간 통신 속도를 신경써야한다

# Chapter02_02. 분산처리와 Latency_1


### 분산처리로 넘어가면서 신경써야될 문제가 많아졌다.
1. 부분 실패: 노드 몇개가 프로그램과 상관 없는 이유로 인해 실패
2. 속도: 많은 네트워크 통신을 필요로 하는 작업은 속도가 저하
    - RDD.map(A).filter(B).reduceByKey(C).take(100)
    - RDD.map(A).reduceByKey(C).filter(B).take(100)
        - 어디가 빠르게?
        - 첫 번쨰가 훨씬 빠르다. 필터로 줄인 다음 ReduceByKey




# Chapter02_03. Key-Value RDD

### 1. Structured Data와 RDD
Key와 Value 쌍을 갖는 Key-value RDD
- (key, Value) 쌍을 갖기 때문에 pairs RDD라고도 불림
- 간단한 데이터베이스처럼 다룰 수 있다.

### 2. Key-Value RDD Concepts
1. Single Value RDD
- 간단하게 개수를 세거나 unstructured data를 다룬다.
- 예) 텍스트에 등장하는 단어 수 세기(날짜)

2. Key-Value RDD
- Key를 통해 조금 더 고차원적인 연산을 다룬다.
- 예) 넷플릭스 드라마가 받은 평균 별점(날짜, 승객수)

3. Key와 Value 쌍을 가진다.
    - 예) 지역ID별로 택시 운행 수는 어떻게 될까?
        - Key: 지역 ID
        - Value: 운행 수 
    
    - 다른 에)
        - 드라마 별로 별점 수 모아보기, 평균 구하기
        - 이커머스 사이트에서 상품당 별 평점 
        
4. 코드상으로는 많이 다르지 않다.
- 두개의 값을 리턴하게 되면 Key-Value RDD가 된다.
- pairs = rdd.map(lambda x: (x,[1,1]))
    - [지역, 지역] => [(지역,[1,1]), (지역, [1,1])]

### 3. Key Value RDD로 할 수 있는 것들(Redcution)
- reduceByKey(): 키값을 기준으로 테스크 처리
- groupByKey(): 키값을 기준으로 벨류를 묶는다
- sortByKey(): 키값을 기준으로 정렬
- keys() - 키값 추출
- values() - 벨류값 추출

$pairs = rdd.map(lambda x: (x,1))$

$count = pairs.reduceByKey(lambda a,b: a + b)$

예) [짜장면, 짜장면, 짬뽕, 짬뽕, 짜장면, 우동]
- paris: (짜장면,1), (짜장면,1), (짬뽕,1), (짬뽕,1), (짜장면,1), (우동,1)
- count: (짜장면,3), (짬뽕,2), (우동,1)

### 4. Key Value RDD로 할 수 있는 것들(Join)
- join 
- rightOuterJoin
- LeftOuterJoin
- subtractByKey

### 5. Mapping Values
1. Key Value 데이터에서 Key를 바꾸지 않는 경우
    - map() 대신 value만 다루는 mapValues() 함수를 써주자.
    - Spark 내부에서 파티션을 유지할 수 있어 더욱 효율적
2. mapValues()
3. flatmapValues()
4. Value만 다루는 연산들이지만 RDD에서 Key는 유지된다.
 

### 6. 실습

In [1]:
from pyspark import SparkConf, SparkContext

In [2]:
conf = SparkConf().setMaster('local').setAppName('category-review-average')
sc = SparkContext(conf=conf)

23/01/23 14:20:22 WARN Utils: Your hostname, Keemyoui-MacBookPro.local resolves to a loopback address: 127.0.0.1; using 192.168.35.79 instead (on interface en0)
23/01/23 14:20:22 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/01/23 14:20:22 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/01/23 14:20:23 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
23/01/23 14:20:23 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


In [4]:
directory = '/Users/keemyohaan/Desktop/001.Python/004. Study/009.FastCampus/003.DE/01-spark/data'
filename = 'restaurant_reviews.csv'

In [5]:
# RDD는 action이 불리기 전까지는 연산되지 않는다.
lines = sc.textFile(f'file:///{directory}/{filename}')
# collect()라는 간단한 action 실행
lines.collect()

['id,item,cateogry,reviews',
 '0,짜장면,중식,125',
 '1,짬뽕,중식,235,',
 '2,김밥,분식,32',
 '3,떡볶이,분식,534',
 '4,라멘,일식,223   ',
 '5,돈가스,일식,52',
 '6,우동,일식,12',
 '7,쌀국수,아시안,312',
 '8,햄버거,패스트푸드,12',
 '9,치킨,패스트푸드,23']

In [6]:
# header부터 추출
header = lines.first()
# 필터링 
filtered_lines = lines.filter(lambda row: row != header)
filtered_lines.collect()

[Stage 1:>                                                          (0 + 1) / 1]                                                                                

['0,짜장면,중식,125',
 '1,짬뽕,중식,235,',
 '2,김밥,분식,32',
 '3,떡볶이,분식,534',
 '4,라멘,일식,223   ',
 '5,돈가스,일식,52',
 '6,우동,일식,12',
 '7,쌀국수,아시안,312',
 '8,햄버거,패스트푸드,12',
 '9,치킨,패스트푸드,23']

In [7]:
# 카테고리와 리뷰데이터를 파싱하겠다.
def parse(row):
    #'0,짜장면,중식,125'
    fields = row.split(",")
    category = fields[2]
    reviews = int(fields[3])
    return (category, reviews)

# Key value RDD 만들기
categoryReviews = filtered_lines.map(parse)
categoryReviews.collect()   

[('중식', 125),
 ('중식', 235),
 ('분식', 32),
 ('분식', 534),
 ('일식', 223),
 ('일식', 52),
 ('일식', 12),
 ('아시안', 312),
 ('패스트푸드', 12),
 ('패스트푸드', 23)]

In [8]:
# 카테고리 뒤 값을 Value라고 인식함 => lambda를 적용하는건 value에 해당
categoryReviewsCount = categoryReviews.mapValues(lambda x: (x,1))
categoryReviewsCount.collect()

[('중식', (125, 1)),
 ('중식', (235, 1)),
 ('분식', (32, 1)),
 ('분식', (534, 1)),
 ('일식', (223, 1)),
 ('일식', (52, 1)),
 ('일식', (12, 1)),
 ('아시안', (312, 1)),
 ('패스트푸드', (12, 1)),
 ('패스트푸드', (23, 1))]

In [15]:
# key를 기준으로 연산을 치는군 => reduceByKey
# value(x), value(y)를 기준으로 연산
reduced = categoryReviewsCount.reduceByKey(lambda x,y: (x[0] + y[0], x[1]+ y[1]))
reduced.collect()

[('중식', (360, 2)),
 ('분식', (566, 2)),
 ('일식', (287, 3)),
 ('아시안', (312, 1)),
 ('패스트푸드', (35, 2))]

In [16]:
averages = reduced.mapValues(lambda x: x[0] / x[1])
averages.collect()

[('중식', 180.0),
 ('분식', 283.0),
 ('일식', 95.66666666666667),
 ('아시안', 312.0),
 ('패스트푸드', 17.5)]

# Chapter02_04. Transformations and Actions

### 1. Spark Opereation은 두가지로 나뉠 수 있다.
Spark Operation = Transformations + Actions

### 2. Transformations & Actions
1. Transformations
    - 결과값으로 새로운 RDD를 반환
    - 지연실행 - Lazy Execution
2. Actions
    - 결과값을 연산하여 출력하거나 저장
    - 즉시 실행 - Eager Execution
    
||Transformations|Actions|
|:--:|:--:|:--:|
|1|map()|collect()|
|2|flatMap()|cout()|
|3|filter()|countByValue()|
|4|distinct()|take()|
|5|reduceByKey()|top()|
|6|groupByKey()|reduce()|
|7|mapValues()|fold()|
|8|flatMapValues()|foreach()|
|9|sotByKey()|-|

### 3. 실습: rdd_transformations_actions

In [21]:
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster('local').setAppName('transformations_actions')
sc = SparkContext.getOrCreate(conf =conf)
sc.getConf().getAll()

[('spark.master', 'local'),
 ('spark.app.submitTime', '1674451222400'),
 ('spark.driver.extraJavaOptions',
  '-XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED'),
 ('spark.app.startTime', '1674451222586'),
 ('spark.app.name', 'category-review-average'),
 ('spark.app.id', 'local-1674451223334'),
 ('spark.executor.id', 'driv

In [22]:
# sparkcontext 멈추기
sc.stop()

In [24]:
# 있는 상태에서 또 만들면 에러가 난다.
# 그러나 지정되지 않은 상태에서 호출하면 Nontype Error가 난다.
sc = SparkContext(conf = conf)

23/01/23 20:10:26 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
23/01/23 20:10:26 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


In [27]:
# RDD 만들기
### sc.parallelize: list로부터 RDD를 만든다
foods = sc.parallelize(['짜장면','마라탕','짬뽕','떡볶이','쌀국수','짬뽕','짜장면','짜장면','짜장면','라면','우동','라면'])
foods

ParallelCollectionRDD[1] at readRDDFromFile at PythonRDD.scala:274

In [28]:
# collect: RDD안에 있는 값을 확인할 때 => 값을 모두 가져온다.
foods.collect()

['짜장면', '마라탕', '짬뽕', '떡볶이', '쌀국수', '짬뽕', '짜장면', '짜장면', '짜장면', '라면', '우동', '라면']

In [29]:
# countbyValue: 몇개 ?
foods.countByValue()

[Stage 1:>                                                          (0 + 1) / 1]                                                                                

defaultdict(int,
            {'짜장면': 4,
             '마라탕': 1,
             '짬뽕': 2,
             '떡볶이': 1,
             '쌀국수': 1,
             '라면': 2,
             '우동': 1})

In [30]:
# take: N개의 element를 뽑고 싶을 때
foods.take(3)

['짜장면', '마라탕', '짬뽕']

In [31]:
# first: 첫 번째 것을 보고싶다
foods.first()

'짜장면'

In [32]:
# count: element가 총 몇개인지
foods.count()

12

In [33]:
# distinct: 중복제거
foods.distinct().collect()

['짜장면', '마라탕', '짬뽕', '떡볶이', '쌀국수', '라면', '우동']

In [34]:
foods.distinct().count()

7

In [37]:
# foreach: 요소를 하나하나 꺼내서 하나의 함수를 적용 + 값을 리턴하지 않고, 
foods.foreach(lambda x: print(x))

짜장면
마라탕
짬뽕
떡볶이
쌀국수
짬뽕
짜장면
짜장면
짜장면
라면
우동
라면


### 4. transformations
$Transformations = Narrow + Wide $ 

##### 4-1. Narrow Transformation
- 1:1 변환
- filter(), map(), flatMap(), sample(), union()
- 1열을 조작하기 위해 다른 열/파티션의 데이터를 쓸 필요가 없다.
- 정렬이 필요하지 않은 경우

##### 4-2. Wide Transformation
- Shuffling
- Intersection and join, distinct cartesian, reduceByKey(), groupBykey()
- 아웃풋 RDD의 파티션에 다른 파티션의 데이터가 들어갈 수 있다

##### 4-3. Transformations 코드
- Transforms 아웃풋은 RDD, 
- 프로그램이 Transforms를 만날 때 바로 새로운 RDD를 만들지 않고 새로운 RDD는 actions를 만나서 실행될 때 생성
- 프로그램이 코드를 보다가 Transforms를 만나면 신택스 체크후 이 연산을 해야겠군 기억하고 넘어감
- Trnasformations를 여러번 거칠 때, 순서대로 변환과정을 기억하면서 하나의 Dag(Directred Acyclic Graph)를 생성함

In [39]:
# 새로운 RDD를 만든것은 아니고 기억을 해둔 것
sc.parallelize([1,2,3]).map(lambda x: x+2).collect()

[3, 4, 5]

In [40]:
sc.parallelize([1,2,3]).map(lambda x: x*2).collect()

[2, 4, 6]

In [41]:
moviews = [
    '그린 북',
    '매트릭스',
    '토이 스토리',
    '캐스트 어웨이',
    '포드 VS 페라리',
    '보헤미안 랩소디',
    '빽 투 더 퓨쳐',
    '반지의 제왕',
    '죽은 시인의 사회'
]