In [1]:
import pyspark
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.master("local[*]").getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/10/19 13:05:25 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [8]:
sc = spark.sparkContext

In [3]:
x = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

In [18]:
rdd_x = sc.parallelize(x)
rdd_x_2 = sc.parallelize(x, 2)
rdd_obama = sc.textFile("./obama.txt")

In [10]:
rdd_x

ParallelCollectionRDD[1] at readRDDFromFile at PythonRDD.scala:274

In [12]:
print(rdd_x.getNumPartitions())
print(rdd_x_2.getNumPartitions())

4
2


In [16]:
# collect(): RDD의 모든 원소를 모아서 배열로 돌려준다
# 액션 연산이다. 
# collect() 메서드를 호출한 서버의 메모리에 전체 데이터를 모두 담을 수 있을 정도의 충분한 메모리 공간이 확보돼 있어야 한다

rdd_x.collect()

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

In [17]:
rdd_x.count()

10

## RDD 트랜스포메이션

- 맵: RDD 모든 요소에 함수를 적용해 새로운 RDD를 생성
- 그룹화: 특정 조건에 따라 요소를 그룹화하거나 특정 함수를 적용
- 집합: RDD간에 합집합, 교집합 등을 계산
- 파티션: RDD의 파티션 개수를 조정
- 필터: 특정 조건을 만족하는 요소만 선택
- 정렬: 요소를 정해진 기준에 따라 정렬

In [22]:
rdd_x.collect()

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

### 맵 트랜스포메이션

In [28]:
# map(): 메서드의 인자로 함수가 전달된다 (스파크는 스칼라 언어로 개발되었기 때문에, 함수형 프로그래밍을 지원한다)
rdd_x_plus_1 = rdd_x.map(lambda x: x + 1)

rdd_x_plus_1.collect()

[2, 3, 4, 5, 6, 7, 8, 9, 10, 11]

In [32]:
# flatMap(): map()과의 차이는 인자로 전달되는 함수가 반환하는 값의 타입이 이터레이션이 가능한 컬렉션과 유사한 타입의 값을 반환해야 한다
rdd_obama_words = rdd_obama.flatMap(lambda x: x.split(" "))
rdd_obama_words.collect()[:10]

['Barack',
 'Hussein',
 'Obama',
 'II',
 'is',
 'an',
 'American',
 'politician',
 'who',
 'served']

In [47]:
# map() vs flatMap()
# One of the use cases of flatMap() is to flatten column which contains arrays, list, or any nested collection
x = ['a,b,c', 'd,e,f']
rdd_x_map = sc.parallelize(x).map(lambda x: x.split(","))
rdd_x_flatmap = sc.parallelize(x).flatMap(lambda x: x.split(","))
print(f"result of map: {rdd_x_map.collect()}")
print(f"result of flatmap: {rdd_x_flatmap.collect()}")

result of map: [['a', 'b', 'c'], ['d', 'e', 'f']]
result of flatmap: ['a', 'b', 'c', 'd', 'e', 'f']


In [53]:
# mapPartitions()
# 파티션 단위로 함수에 적용
# 파티션에 속한 모든 요소를 한 번의 함수 호출로 처리
# 파티션 단위의 중간 산출물을 만들거나, 데이터베이스 연결과 같은 고비용의 자원을 파티션 단위로 공유해 사용할 수 있다는 장점
rdd_x = sc.parallelize(range(1, 11), 3)

def increase(numbers):
    print("DB 연결")
    return [numbers[0]]

rdd_x_mapPartitions = rdd_x.mapPartitions(increase)

rdd_x_mapPartitions.collect()

DB 연결
DB 연결
DB 연결


[1, 4, 7]

In [55]:
# mapValues(): 전달받은 함수를 페어RDD의 값(value)에 해당하는 요소에만 적용하고 그 결과를 반환 (모든 요소가 키(key)를 가지고 있어야함)

rdd_key = sc.parallelize(["a", "b", "b", "c", "c", "c", "c"])

pair_rdd = rdd_key.map(lambda x: (x, 0))

pair_rdd_map_value = pair_rdd.mapValues(lambda x: x + 1)

pair_rdd_map_value.collect()

[('a', 1), ('b', 1), ('b', 1), ('c', 1), ('c', 1), ('c', 1), ('c', 1)]

### 그룹 트랜스포메이션

In [60]:
# zip(): 첫 번째 컬렉션 RDD와 두 번째 컬렉션 RDD의 인덱스가 같은 원소끼리 그룹한 결과를 반환합니다

rdd_num = sc.parallelize([1, 2, 3])
rdd_kor = sc.parallelize(["가", "나", "다"])

rdd_zip = rdd_num.zip(rdd_kor)
rdd_zip.collect()

[(1, '가'), (2, '나'), (3, '다')]

In [65]:
# groupBy(): RDD의 요소를 일정한 기준에 따라 여러 개의 그룹으로 나누고, 함수는 키를 반환하고, groupBy()는 그룹으로 구성된 새로운 RDD를 반환합니다
rdd_nums = sc.parallelize(range(1, 11))

rdd_groupBy = rdd_nums.groupBy(lambda x: "even" if x % 2 == 0 else "odd")

for k, v in rdd_groupBy.collect():
    print(f"key: {k}, value: {list(v)}")

key: even, value: [2, 4, 6, 8, 10]
key: odd, value: [1, 3, 5, 7, 9]


In [71]:
# groupByKey(): 페어RDD를 키(Key)로 그룹지어 결과를 반환한다

pair_rdd = sc.parallelize(["a", "a", "b", "b", "b", "c"]).map(lambda x: (x, 1))

for k, v in pair_rdd.groupByKey().collect():
    print(f"key: {k}, value: {list(v)}")

key: b, value: [1, 1, 1]
key: c, value: [1]
key: a, value: [1, 1]


### 집합 트랜스포메이션

In [72]:
# distinct(): 중복을 제외한 요소로만 구성된 새로운 RDD를 반환한다
rdd_raw = sc.parallelize([1, 3, 4, 2, 3, 3, 1, 1, 4])
rdd_distinct = rdd_raw.distinct()
rdd_distinct.collect()

[4, 1, 2, 3]

In [73]:
# cartesian(): 두 RDD 요소의 카르테시안 곱을 구하고 그 결과를 새로운 RDD로 반환
rdd_num = sc.parallelize([1, 2, 3])
rdd_kor = sc.parallelize(['가', '나'])

rdd_cartesian = rdd_num.cartesian(rdd_kor)
rdd_cartesian.collect()

[(1, '가'), (1, '나'), (2, '가'), (2, '나'), (3, '가'), (3, '나')]

In [74]:
# subtract()
# union()
# intersection()

In [77]:
# join(): 두 RDD의 요소중 키(key)값이 같은 요소끼리 조인한 결과를 반환한다 (inner join)
rdd1 = sc.parallelize(["a", "b", "c", "d"]).map(lambda x: (x, 1))
rdd2 = sc.parallelize(["b", "c"]).map(lambda x: (x, 2))

rdd_join = rdd1.join(rdd2)
print(f"Inner Join: {rdd_join.collect()}")

# leftOuterJoin()
rdd_left_join = rdd1.leftOuterJoin(rdd2)
print(f"Left Outer Join: {rdd_left_join.collect()}")

# rightOuterJoin()
rdd_right_join = rdd1.rightOuterJoin(rdd2)
print(f"Right Outer Join: {rdd_right_join.collect()}")

Inner Join: [('b', (1, 2)), ('c', (1, 2))]
Left Outer Join: [('a', (1, None)), ('b', (1, 2)), ('c', (1, 2)), ('d', (1, None))]
Right Outer Join: [('b', (1, 2)), ('c', (1, 2))]


In [78]:
# subtractByKey(): subtract를 키(key)를 기준으로 한다

rdd1 = sc.parallelize(['a', 'b']).map(lambda x: (x, 1))
rdd2 = sc.parallelize(['b']).map(lambda x: (x, 1))

rdd_sub_by_key = rdd1.subtractByKey(rdd2)
rdd_sub_by_key.collect()

[('a', 1)]

### 집계 트랜스포메이션

In [79]:
# reduceByKey(): 같은 키(key)를 가진 값(value)들을 하나로 병합해 새로운 RDD를 반환한다
# 병합을 수행하기 위해 두 개의 값을 하나로 합치는 함수를 인자로 전달해야 한다
# 이 함수의 연산은 결합법칙과 교환법칙이 성립됨을 보장해야 한다
# 왜냐하면 데이터가 여러 파티션에 분산되어 있어서 항상 같은 순서로 연산이 수행되지 않기 때문이다

rdd = sc.parallelize(['a', 'b', 'b']).map(lambda x: (x, 1))
rdd_reduce_by_key = rdd.reduceByKey(lambda x1, x2: x1 + x2)
rdd_reduce_by_key.collect()

[('b', 2), ('a', 1)]

In [80]:
# foldByKey(): reduceByKey() 에 초기값을 전달할 수 있는 능력을 가지고 있다

rdd = sc.parallelize(['a', 'b', 'b', 'c']).map(lambda x: (x, 1))
rdd_fold_by_key = rdd.foldByKey(0, lambda x1, x2: x1 + x2)
rdd_fold_by_key.collect()

[('b', 2), ('c', 1), ('a', 1)]

### 필터 트랜스포메이션

In [81]:
# filter()

rdd = sc.parallelize(range(1, 11))

rdd_filtered = rdd.filter(lambda x: x % 2 == 0)
rdd_filtered.collect()

[2, 4, 6, 8, 10]

### 정렬 트랜스포메이션

In [93]:
# sortByKey()
# 정렬후 파티션 내부의 요소는 정렬 순서상 인접한 요소로 재구성된다

rdd = sc.parallelize(['a', 'e', 'z', 'd']).map(lambda x: (x, 1))

rdd_sorted = rdd.sortByKey()
rdd_sorted.collect()

[('a', 1), ('d', 1), ('e', 1), ('z', 1)]

In [95]:
# keys(), values()

rdd = sc.parallelize(['a', 'e', 'z', 'd']).map(lambda x: (x, 1))

print(f"Keys: {rdd.keys().collect()}")
print(f"Values: {rdd.values().collect()}")

Keys: ['a', 'e', 'z', 'd']
Values: [1, 1, 1, 1]


In [96]:
# sortBy()

rdd = sc.parallelize([1, 4, 7, 2, 3, 8, 1])

rdd_sorted = rdd.sortBy(lambda x: x)
rdd_sorted.collect()

[1, 1, 2, 3, 4, 7, 8]

In [130]:
# sample(withReplacement, fraction, seed)
# withReplacement: true -> 복원 추출 -> 한 개 추출하고 다시 원래 상태로 돌리고 다시 추출
# fraction: 복원 추출시 각 요소의 평균 발생 회수, 비복원 추출시 각 요소가 뽑힐 확률
# seed: 반복 실행해도 결과를 일정하게 하기 위한 숫자

# 주의할 점
# 평균 발생 회수가 1회라고 N개의 결과가 N개가 됨을 보장하는 것이 아니고,
# 샘플링 확률이 0.1이라고 해서 N개의 결과가 0.1N개가 됨을 보장하지 않는다
# 전체 N개에 대해서 랜덤하게 0.3N개만 뽑고 싶은 경우에는 takeSample()을 사용한다 rdd.takeSample(False, 3) -> 결과는 리스트 -> 액션 연산

# 균등분포가 아니라 확률분포 샘플링은 어떻게 할까?

rdd = sc.parallelize(range(1, 11))

rdd_sample1 = rdd.sample(True, 0.5)
rdd_sample2 = rdd.sample(True, 5)

rdd_sample3 = rdd.sample(False, 0.1)
rdd_sample4 = rdd.sample(False, 0.7)

print(f"복원 추출, 평균 발생 회수 0.5회: {rdd_sample1.collect()}")
print(f"복원 추출, 평균 발생 회수 5회: {rdd_sample2.collect()}")
print(f"비복원 추출, 샘플링 확률 0.1: {rdd_sample3.collect()}")
print(f"비복원 추출, 샘플링 확률 0.7: {rdd_sample4.collect()}")

복원 추출, 평균 발생 회수 0.5회: [1, 1, 1, 2, 10]
복원 추출, 평균 발생 회수 5회: [1, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3, 3, 4, 4, 4, 4, 4, 5, 5, 5, 5, 5, 5, 6, 6, 7, 7, 7, 7, 7, 8, 8, 8, 8, 8, 9, 9, 9, 9, 9, 9, 9, 10, 10, 10, 10, 10]
비복원 추출, 샘플링 확률 0.1: []
비복원 추출, 샘플링 확률 0.7: [1, 2, 3, 4, 9]


## RDD 액션

- 액션 연산이 호출되면 비로소 앞에 있던 모든 트랜스포메이션 연산이 실행된다
- 문제는 이러한 Lazy Operation은 액션 연산이 호출될 때마다, 결과를 재사용하지 않고 계산을 한다는 것이다
- 액션 연산 앞에 10개의 트랜스포메이션 연산이 있었다면 액션 연산을 3번 호출하면 총 30번의 트랜스포메이션 연산을 하게 된다
- 결과를 반복 사용한다면 캐시를 적절히 이용해야 한다

In [131]:
# take()

rdd = sc.parallelize(range(1, 11))
rdd.take(3)

[1, 2, 3]

In [132]:
# collect()

rdd = sc.parallelize(range(1, 11))
rdd.collect()

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

In [133]:
# count()

rdd = sc.parallelize(range(1, 11))
rdd.count()

10

In [135]:
# countByValue()

rdd = sc.parallelize([1, 1, 2, 3, 3, 3, 3])

rdd.countByValue()

defaultdict(int, {1: 2, 2: 1, 3: 4})

In [164]:
# reduce(): RDD에 포함된 모든 요소를 하나의 값으로 병합하고 결과를 반환
# 모든 요소가 순서되로 처리되는 것 아님 (각 서버에 흩어져 있는 파티션 단위로 나눠져서 처리된다)

rdd = sc.parallelize(range(1, 11))
rdd.reduce(lambda x1, x2: x1 + x2)

55

In [165]:
# fold(): reduce()에 초기값을 지정할 수 있다

rdd = sc.parallelize(range(1, 11))
rdd.fold(0, lambda x1, x2: x1 + x2)

55

In [166]:
# sum()

rdd.sum()

55

In [173]:
# foreach(): 인자로 전달받은 함수를 각 요소에 적용한다. 함수의 실행이 드라이버 프로그램이 실행된 서버가 아닌 각 개별 노드에서 실행된다
# 각 노드에서 실행되도 의미있는 작업: DB, 파일시스템과 같은 다른 외부시스템과 통신
rdd = sc.parallelize(range(1, 11))

rdd.foreach(lambda x: x + 1)
rdd.collect()

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

In [177]:
# cache(), persist(), unpersist()
rdd = sc.parallelize(range(1, 11))
rdd.cache

from pyspark import StorageLevel
rdd.persist(StorageLevel.MEMORY_AND_DISK_DESER)

rdd.unpersist()

PythonRDD[564] at RDD at PythonRDD.scala:53