In [1]:
from pyspark import SparkContext, SparkConf 

In [2]:
# pyspark 연결하기 (SparkSession이 아닌, SparkContext로 연결하는 고전적인 방법임.)
conf = SparkConf().setAppName("RDD_practice").setMaster("local[*]")
sc = SparkContext(conf=conf)
print(sc)

23/11/28 07:29:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


<SparkContext master=local[*] appName=RDD_practice>


# Part1: RDD 생성 및 기본 연산

# RDD를 생성하는 두 가지 방법:

1. 드라이버 프로그램에서 기존의 컬렉션을 Parallelizing 하는 방법
2. 공유 파일 시스템, HDFS, HBase, Hadoop InputFormat을 제공하는 데이터 소스 등과 같은 외부 데이터 저장소를 참조하는 방법

In [3]:
# 랜덤 데이터 생성
import random
random_list = random.sample(range(0, 40), 10)
print(random_list)

[8, 37, 38, 3, 34, 17, 5, 39, 27, 33]


In [4]:
# RDD 생성
rdd1 = sc.parallelize(random_list, 4)
rdd1.collect()

                                                                                

[8, 37, 38, 3, 34, 17, 5, 39, 27, 33]

In [5]:
sc.parallelize?

[0;31mSignature:[0m [0msc[0m[0;34m.[0m[0mparallelize[0m[0;34m([0m[0mc[0m[0;34m,[0m [0mnumSlices[0m[0;34m=[0m[0;32mNone[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0;31mDocstring:[0m
Distribute a local Python collection to form an RDD. Using xrange
is recommended if the input represents a range for performance.

>>> sc.parallelize([0, 2, 3, 4, 6], 5).glom().collect()
[[0], [2], [3], [4], [6]]
>>> sc.parallelize(xrange(0, 6, 2), 5).glom().collect()
[[], [0], [], [2], [4]]
[0;31mFile:[0m      /usr/local/lib/python3.9/dist-packages/pyspark/context.py
[0;31mType:[0m      method

In [6]:
# 전체 파티션 수 조회
print(rdd1.getNumPartitions())

# 파티션별 분산된 데이터 조회
print(rdd1.glom().collect())

# 파티션별 분산된 데이터 중 2개의 파티션만 보기
print("The two partitions", rdd1.glom().take(2))

# 마지막 파티션 출력
print("The last partition", rdd1.glom().collect()[-1])

4


                                                                                

[[8, 37], [38, 3], [34, 17], [5, 39, 27, 33]]
The two partitions [[8, 37], [38, 3]]
The last partition [5, 39, 27, 33]


In [7]:
# count() : RDD 전체 element 개수
rdd1.count()

10

In [8]:
# first() : 첫 번쨰 파티션의 첫 번째 element 값
rdd1.first()

8

In [9]:
# top(): 값이 높은 상위 N개의 elemenet 출력
# drive 프로세스에 모든 element를 보낸 후, 정렬(sort) 후 값을 반환하므로 데이터의 사이즈가 크면 사용에 주의가 필요하다.
print("Top 1 :", rdd1.top(1))
print("Top 2 :", rdd1.top(2))

Top 1 : [39]
Top 2 : [39, 38]


                                                                                

In [10]:
# distinct() : 중복을 제거한 element 반환
rdd1.distinct().collect()

                                                                                

[8, 37, 17, 5, 33, 38, 34, 3, 39, 27]

In [11]:
def myfunc(item):
    return (item+1) * 3

# map(): 각 elemnt에 function을 적용한 결과를 RDD로 반환 (파티션의 수는 변하지 않는다.)
rdd_map = rdd1.map(myfunc)
print("def function map :", rdd_map.glom().collect())

# map의 인자로 lambda(익명함수)도 사용 가능하다.
rdd_map = rdd1.map(lambda item: (item+1) * 3)
print("lambda function map :", rdd_map.glom().collect())

def function map : [[27, 114], [117, 12], [105, 54], [18, 120, 84, 102]]
lambda function map : [[27, 114], [117, 12], [105, 54], [18, 120, 84, 102]]


In [12]:
# filter(): element를 조건(function)에 부합하는 element만 필터링하여 RDD 반환 (파티션의 수는 변하지 않는다. 따라서, empty한 파티션이 발생 가능하다.)
# empty한 파티션은 GC(Garbage Collect) 기술인 repartition/coalese릍 통해 차후 제거 가능하다.
rdd_filter = rdd1.filter(lambda x: x%3==0)
print(rdd_filter.glom().collect())

print("After filtering count :", rdd_filter.count())
print("Repartition :", rdd_filter.repartition(1).glom().collect())

[[], [3], [], [39, 27, 33]]
After filtering count : 4
Repartition : [[3, 39, 27, 33]]


In [13]:
# flatMap(): 파티션별 모든 element를 하나의 단일 컬렉션으로 반환한다. (파티션 수가 변하는 것이 아니다. 파티션 내부의 차원을 한 단계 flat하게 만든다.)
rdd_flatmap = rdd1.flatMap(lambda x: [x+2, x+5])

# map()과 비교하기
rdd_map = rdd1.map(lambda x: [x+2, x+5])

# 비교 시작
print("flatMap Collect: ", rdd_flatmap.collect())
print("map Collect: ", rdd_map.collect(), "\n")

print("flatMap glom collect", rdd_flatmap.glom().collect())
print("map glom collect: ", rdd_map.glom().collect(), "\n")

print("flatMap Partition Num:", rdd_flatmap.getNumPartitions())
print("map Partition Num:", rdd_map.getNumPartitions())

flatMap Collect:  [10, 13, 39, 42, 40, 43, 5, 8, 36, 39, 19, 22, 7, 10, 41, 44, 29, 32, 35, 38]
map Collect:  [[10, 13], [39, 42], [40, 43], [5, 8], [36, 39], [19, 22], [7, 10], [41, 44], [29, 32], [35, 38]] 

flatMap glom collect [[10, 13, 39, 42], [40, 43, 5, 8], [36, 39, 19, 22], [7, 10, 41, 44, 29, 32, 35, 38]]
map glom collect:  [[[10, 13], [39, 42]], [[40, 43], [5, 8]], [[36, 39], [19, 22]], [[7, 10], [41, 44], [29, 32], [35, 38]]] 

flatMap Partition Num: 4
map Partition Num: 4


In [14]:
# reduce(): 각 파티션별 배열(1차원)의 element에 대해 function을 수행한다. 파티션별 function의 return 값에 대해 파티션 전체에 대한 reduce를 계속 실행한다.
# reduce()는 transformation이 아닌, action 함수이다.

# flatMap glom collect: [[9, 12, 39, 42], [23, 26, 24, 27], [8, 11, 19, 22], [17, 20, 30, 33, 40, 43, 32, 35]] 라면,
# step1 : [((9+12)+39)+42], [((23+26)+24)+27], [((8+11)+19)+22], [((((((17+20)+30)+33)+40)+43)+32)+35]]
# step2 : (([102] + [100]) + [60]) + [250]
# Return : 512

rdd_flatmap.reduce(lambda x,y: x+y)

552

In [15]:
# 기술 통계 (Descriptive statistic)
print([rdd1.max(), rdd1.min(), rdd1.mean(), round(rdd1.stdev(), 2), rdd1.sum()])

[39, 3, 24.1, 13.74, 241]


In [16]:
# mapPartitions(): 파티션별 function을 하여 특정 결과를 반환한다. action에 reduce가 있다면, transformation에는 mapPartition이라고 생각하면 쉽다.
# 각 파티션의 element에 대해 fuct을 적용한 중간 결과(yield)를 다음 element의 func 연산을 할 떄 사용하므로, 제너레이터에 사용하는 yield를 사용해야 한다!

def myfunc(partition):
    sumatition = 0
    
    for item in partition:
        sumatition = sumatition + item
        
    yield sumatition

print("rdd1 glom collect:", rdd1.glom().collect())
print("rdd1 mapPartition:", rdd1.mapPartitions(myfunc).collect())
print("rdd1 glom Partition:", rdd1.mapPartitions(myfunc).glom().collect())

rdd1 glom collect: [[8, 37], [38, 3], [34, 17], [5, 39, 27, 33]]
rdd1 mapPartition: [45, 41, 51, 104]
rdd1 glom Partition: [[45], [41], [51], [104]]


# Part2: Advanced RDD Transformations and Actions

In [17]:
# union(): 두 RDD를 결합하여 새로운 RDD를 반환한다. 결합한 RDD의 파티션 수는 결합에 사용된 RDD의 파티션의 총 합과 같다.

print("RDD 1:", rdd1.collect())

rdd2 = sc.parallelize([1, 14, 37, 20, 28, 10, 13, 3], 2)
print("RDD 2:", rdd2.collect())

rdd_union = rdd1.union(rdd2)
print("RDD union:", rdd_union.collect())

print("RDD Partition Num:", rdd_union.getNumPartitions())

RDD 1: [8, 37, 38, 3, 34, 17, 5, 39, 27, 33]
RDD 2: [1, 14, 37, 20, 28, 10, 13, 3]
RDD union: [8, 37, 38, 3, 34, 17, 5, 39, 27, 33, 1, 14, 37, 20, 28, 10, 13, 3]
RDD Partition Num: 6


In [18]:
# intersection(): 두 RDD의 공통된 element만 추출하여 새로운 RDD를 반환한다. 교차시킨 RDD의 파티션 수는 교차에 사용된 RDD의 파티션의 총 합과 같다.
rdd_intersection = rdd1.intersection(rdd2)

print("RDD intersection:", rdd_intersection.collect())
print("RDD intersection glom collect:" , rdd_intersection.glom().collect())
print("RDD Partition Num:", rdd_intersection.getNumPartitions())

                                                                                

RDD intersection: [37, 3]




RDD intersection glom collect: [[], [37], [], [3], [], []]
RDD Partition Num: 6


                                                                                

In [19]:
# Find empty partitions

counter = 0

for item in rdd_intersection.glom().collect():
    if len(item) == 0:
        counter += 1

counter

4

In [20]:
# coalesce(numPartitions): 파티션의 수를 줄인다. 
rdd_intersection.coalesce(1).glom().collect()

[[37, 3]]

In [21]:
# takeSample(withReplacement, num, [seed]): 무작위로 데이터를 추출하여 반환한다.
# collect()와 마찬가지로, parallel 하지 않고 drive 메모리에 모든 element를 모아 랜덤 추출하기 때문에 스파크 클러스터가 손상 될 수 있어 사용에 주의가 필요하다.
rdd1.takeSample(False, 5)

[37, 3, 5, 27, 38]

In [22]:
# takeOrdered(n, [ordering]): drive 메모리에 모든 element를 모아 오름차순으로 인자 수 만큼의 element 반환
print(rdd1.takeOrdered(5))

# 내림차순 정렬
print(rdd1.takeOrdered(5, key=lambda x: -x))

[3, 5, 8, 17, 27]
[39, 38, 37, 34, 33]


In [23]:
# reduce():
rdd1.reduce(lambda x,y: x-y)

13

In [24]:
# reduceByKey():
rdd_Rbk = sc.parallelize([(1, 4), (7, 10), (5, 7), (1, 12), (7, 12), (9, 1), (7, 4)], 2)
print(rdd_Rbk.glom().collect())
print(rdd_Rbk.reduceByKey(lambda x,y: x+y).collect())


# user friendly visualization
import pandas as pd
Counter = pd.DataFrame({"key": rdd_Rbk.keys().collect(), "value": rdd_Rbk.values().collect()})
Counter

[[(1, 4), (7, 10), (5, 7)], [(1, 12), (7, 12), (9, 1), (7, 4)]]
[(1, 16), (7, 26), (5, 7), (9, 1)]


Unnamed: 0,key,value
0,1,4
1,7,10
2,5,7
3,1,12
4,7,12
5,9,1
6,7,4


In [25]:
# sortByKey():
print(rdd_Rbk.reduceByKey(lambda x,y: x+y).sortByKey().collect())

# reverse sortByKey()
print(rdd_Rbk.reduceByKey(lambda x,y: x+y).sortByKey(False).collect())

[(1, 16), (5, 7), (7, 26), (9, 1)]
[(9, 1), (7, 26), (5, 7), (1, 16)]


In [26]:
# countByKey()
print(rdd_Rbk.countByKey())

print(rdd_Rbk.countByKey().items())

defaultdict(<class 'int'>, {1: 2, 7: 3, 5: 1, 9: 1})
dict_items([(1, 2), (7, 3), (5, 1), (9, 1)])


                                                                                

In [33]:
# groupByKey(): 대규모의 shuffle이 발생 할 수 있기 때문에, filter를 먼저 사용 후 사용을 권장
rdd_group = rdd_Rbk.groupByKey()
print("NumPartitions:", rdd_group.getNumPartitions())

for item in rdd_group.collect():
    print(item[0], [values for values in item[1]])

NumPartitions: 2
1 [4, 12]
7 [10, 12, 4]
5 [7]
9 [1]


In [34]:
# lookup(key): 
rdd_Rbk.lookup(7)

[10, 12, 4]

In [37]:
# cache():
# 기본적으로, 각 transformed된 RDD는 action을 실행 할 때마다 다시 계산(compute) 될 수 있다.₩
# 하지만, persist (또는 cache) 메서드를 사용하여 RDD를 메모리에 유지 할 수도 있다.
# 이런 경우에 Spark가 클러스터 전체에 elements를 유지하여 다음 쿼리 때 훨씬 더 빠르게 접근 할 수 있다.

# 메모리에서 결과를 계산하면 스파크는 Garbage Collector를 사용하여 LRU 알고리즘에 의해 메모리가 부족시 객체를 삭제하게 된다.
# 이런 경우가 있기 떄문에, cache를 통해 읽기 내용을 캐시 내부에 저장 할 수 있다. 캐시된 RDD는 가비지 컬렉터에 의해 삭제되지 않는다.

# cache()는 메모리에 캐시
rdd_Rbk.cache()

# persist()는 저장하는 수준을 사용자가 지정 할 수 있다. default로는 MEMORY_ONLY를 사용하고 있다.
# https://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence
# 옵션에 따라 자바 RDD 객체를 직렬화 하여 JVM에 저장하고, 부족시 디스크에 저장하여 필요시 읽게 할 수도 있다.
rdd_Rbk.persist()

ParallelCollectionRDD[58] at readRDDFromFile at PythonRDD.scala:262

In [38]:
rdd_Rbk.cache?

[0;31mSignature:[0m [0mrdd_Rbk[0m[0;34m.[0m[0mcache[0m[0;34m([0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0;31mDocstring:[0m Persist this RDD with the default storage level (`MEMORY_ONLY`).
[0;31mFile:[0m      /usr/local/lib/python3.9/dist-packages/pyspark/rdd.py
[0;31mType:[0m      method

In [39]:
rdd_Rbk.persist?

[0;31mSignature:[0m [0mrdd_Rbk[0m[0;34m.[0m[0mpersist[0m[0;34m([0m[0mstorageLevel[0m[0;34m=[0m[0mStorageLevel[0m[0;34m([0m[0;32mFalse[0m[0;34m,[0m [0;32mTrue[0m[0;34m,[0m [0;32mFalse[0m[0;34m,[0m [0;32mFalse[0m[0;34m,[0m [0;36m1[0m[0;34m)[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0;31mDocstring:[0m
Set this RDD's storage level to persist its values across operations
after the first time it is computed. This can only be used to assign
a new storage level if the RDD does not have a storage level set yet.
If no storage level is specified defaults to (`MEMORY_ONLY`).

>>> rdd = sc.parallelize(["b", "a", "c"])
>>> rdd.persist().is_cached
True
[0;31mFile:[0m      /usr/local/lib/python3.9/dist-packages/pyspark/rdd.py
[0;31mType:[0m      method

In [47]:
# cache 해제
rdd_Rbk.unpersist()

ParallelCollectionRDD[58] at readRDDFromFile at PythonRDD.scala:262

In [48]:
from pyspark import StorageLevel

# MEMORY_AND_DISK 레벨로 캐싱
rdd_Rbk.persist(StorageLevel.MEMORY_AND_DISK)

ParallelCollectionRDD[58] at readRDDFromFile at PythonRDD.scala:262