참고: https://blog.exxactcorp.com/the-benefits-examples-of-using-apache-spark-with-pyspark-using-python/

In [1]:
from pyspark import SparkContext
import numpy as np

## Resilient Distributed Dataset(RDD) and SparkContext

RDD는 cluster manager를 통해 여러 worker node에 나뉘어 분산 처리된다.

In [5]:
# Start a local Spark context; "local[4]" runs Spark on this machine with 4 worker threads.
sc=SparkContext(master="local[4]")

In [6]:
# Display the context to confirm it was created.
sc

In [54]:
# Draw 20 random integers in [0, 10); 20 elements split evenly over the
# 4 (and later 2) local partitions used in the cells below.
sample = np.random.randint(0, 10, 20)
# Distribute the NumPy array as an RDD.
A = sc.parallelize(sample)
A

ParallelCollectionRDD[204] at readRDDFromFile at PythonRDD.scala:262

In [14]:
# collect() is the opposite of parallelize(): it gathers the RDD's elements
# back to the driver as a plain Python list.
A.collect()

[5, 3, 2, 0, 4, 8, 5, 5, 7, 7, 3, 9, 6, 5, 1, 2, 2, 2, 9, 9]

In [15]:
# A is an RDD now, no longer a NumPy array.
# glom() gathers each partition's elements into one list, so collect() shows one chunk per partition.
A.glom().collect() # chunks = 4

[[5, 3, 2, 0, 4], [8, 5, 5, 7, 7], [3, 9, 6, 5, 1], [2, 2, 2, 9, 9]]

In [17]:
# Only one SparkContext can be active at a time, so stop the current one first.
sc.stop()
# Reinitialize with 2 local worker threads.
sc = SparkContext(master="local[2]")
A = sc.parallelize(sample)
A.glom().collect() # chunks = 2

[[5, 3, 2, 0, 4, 8, 5, 5, 7, 7], [3, 9, 6, 5, 1, 2, 2, 2, 9, 9]]

In [18]:
# Size of the source NumPy array that was parallelized.
len(sample)

20

In [19]:
# first() returns the first element of the RDD.
A.first()

5

In [20]:
# take(3) returns the first 3 elements as a list.
A.take(3)

[5, 3, 2]

In [21]:
# distinct() builds a new RDD with duplicates removed.
A_distinct = A.distinct()
# The collected values are not returned in sorted or original order.
A_distinct.collect()

[2, 0, 4, 8, 6, 5, 3, 7, 9, 1]

#### Spark로 분산 처리하면 대용량 연산은 훨씬(약 3~4배) 빠르게 계산될 수 있음 — 단, 아래 측정처럼 작은 데이터에서는 분산 오버헤드 때문에 NumPy가 더 빠름

In [55]:
# 10,000 random integers in [0, 100_000_000_000). int64 is requested explicitly
# because the upper bound does not fit in a 32-bit integer, which is the default
# integer type on some platforms.
big = np.random.randint(0, 100000000000, 10000, dtype=np.int64)
# Parallelize `big` itself (not `sample`) so the Spark timings below are measured
# on the same data as the NumPy baseline big.sum().
parallel_big = sc.parallelize(big)
print(type(parallel_big), type(big))

ParallelCollectionRDD[205] at readRDDFromFile at PythonRDD.scala:262

In [45]:
%%timeit
# Sum the RDD with an explicit pairwise reduce.
parallel_big.reduce(lambda x, y : x+y)

53.7 ms ± 4.94 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)


In [49]:
%%time
# Same total via the RDD's built-in sum() action.
parallel_big.sum()

CPU times: user 23.9 ms, sys: 0 ns, total: 23.9 ms
Wall time: 73.8 ms


503900550164537

In [50]:
%%time
# NumPy baseline: sum the array in-process, without Spark.
big.sum()

CPU times: user 365 µs, sys: 0 ns, total: 365 µs
Wall time: 278 µs


503900550164537

In [24]:
# Find the maximum with reduce by keeping the larger value of each pair.
A.reduce(lambda x, y: x if x>y else y)

9

In [53]:
%%time
words = """Changes in stateful operations: Some operations in streaming queries need to maintain state data in order to continuously update the result. 
Structured Streaming automatically checkpoints the state data to fault-tolerant storage ( for example, HDFS, AWS S3, Azure Blob storage ) and restores it after restart. 
However, this assumes that the schema of the state data remains same across restarts. This means that any changes ( that is, additions, deletions, or schema modifications ) 
to the stateful operations of a streaming query are not allowed between restarts.  
Here is the list of stateful operations whose schema should not be changed between restarts in order to ensure state recovery""".split(' ')
wordRDD = sc.parallelize(words)
wordRDD.reduce(lambda x, y: x if len(x)>len(y) else y)

CPU times: user 15.2 ms, sys: 0 ns, total: 15.2 ms
Wall time: 104 ms


'fault-tolerant'

In [31]:
%%time
# Keep only the non-zero multiples of 3.
A.filter(lambda x:x%3==0 and x!=0).collect()

CPU times: user 12.8 ms, sys: 0 ns, total: 12.8 ms
Wall time: 119 ms


[3, 3, 9, 6, 9, 9]

In [32]:
# pure-python
def find_larger(x, y):
    """Pick the longer of two strings for use as a reduce operator.

    Args:
        x: First string.
        y: Second string.

    Returns:
        The string with the greater length. When both have the same length,
        the lexicographically smaller one is returned (``y`` if they compare
        equal).
    """
    len_x, len_y = len(x), len(y)
    if len_x != len_y:
        # Different lengths: the longer string wins outright.
        return x if len_x > len_y else y
    # Same length: break the tie by lexicographic order.
    return x if x < y else y
# A named function can be passed to reduce() in place of a lambda.
wordRDD.reduce(find_larger)

'Macintosh'

In [33]:
# map() applies the function to every element; here each value is squared.
mapA = A.map(lambda x: x*x)
mapA.collect()

[25, 9, 4, 0, 16, 64, 25, 25, 49, 49, 9, 81, 36, 25, 1, 4, 4, 4, 81, 81]

In [34]:
# groupBy() keys every element by the lambda's result: x % 2 gives 0 for even, 1 for odd.
result = A.groupBy(lambda x:x%2).collect()
# Sort the groups and the values inside each group for a readable display.
sorted([x, sorted(y)] for (x, y) in result)

[[0, [0, 2, 2, 2, 2, 4, 6, 8]], [1, [1, 3, 3, 5, 5, 5, 5, 7, 7, 9, 9, 9]]]

In [37]:
# Bucket boundaries 0, 10, ..., 90; histogram() returns (boundaries, counts per bucket).
mapA.histogram(list(range(0, 100, 10)))

([0, 10, 20, 30, 40, 50, 60, 70, 80, 90], [8, 1, 4, 1, 2, 0, 1, 0, 3])

## Lazy evaluation and Caching

In [3]:
# Stop the running context first; otherwise creating a new one fails with
# "Cannot run multiple SparkContexts at once; existing SparkContext".
sc.stop()
sc = SparkContext(master="local[2]")

In [16]:
%%time
# Build an RDD from one million consecutive integers.
num = range(1000000)
rdd1 = sc.parallelize(num)

CPU times: user 3.3 ms, sys: 0 ns, total: 3.3 ms
Wall time: 4.57 ms


In [17]:
from math import cos
def check_time(x):
    """Return cos(x) after 100 throwaway cosine evaluations.

    The extra evaluations produce nothing that is kept; they only make each
    call more expensive.

    Args:
        x: Angle in radians.

    Returns:
        The cosine of ``x`` as a float.
    """
    for j in range(100):  # 100 iterations of busy work; results are discarded
        cos(j)
    return cos(x)

In [18]:
%%time
# Time one direct call as a baseline (an earlier run noted 41.5ms; see the %%time output for the current figure).
check_time(2)

CPU times: user 15 µs, sys: 0 ns, total: 15 µs
Wall time: 16.7 µs


-0.4161468365471424

In [19]:
%%time
# No actual execution happens here: map() only schedules the transformation.
interim = rdd1.map(lambda x: check_time(x))
interim # an earlier run noted 3.81ms

CPU times: user 40 µs, sys: 0 ns, total: 40 µs
Wall time: 44.1 µs


PythonRDD[7] at RDD at PythonRDD.scala:53

#### RDD vs. pure-python

In [None]:
%%time
# Actual execution: reduce() forces the scheduled map to run over the RDD.
print(interim.reduce(lambda x, y: x+y))

In [9]:
from functools import reduce

In [10]:
%%time
print(reduce(lambda x, y: x+y, list(map(check_time, num))))

KeyboardInterrupt: 

In [95]:
%%time
# Actual execution: count() forces the map and the filter to run.
print(interim.filter(lambda x:x>0).count())

KeyboardInterrupt: 