In [57]:
import pyspark

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
      .master("local[1]") \
      .appName("RDD-application-1") \
      .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


# Creating RDD

**NOTE**: <br>

Nhớ rằng chỉ có 3 cách để tạo ra một RDD:
- Load từ storage
- Thực hiện từ spark driver (spark context)
- Tạo từ một RDD khác: chính là các transformation


### Thực hiện đọc từ Storage

Create RDD using sparkContext.textFile()

In [None]:

#Create RDD from external Data source
rdd2 = spark.sparkContext.textFile("/path/textFile.txt")

Create RDD using sparkContext.wholeTextFiles()

In [None]:
rdd3 = spark.sparkContext.wholeTextFiles("/path/textFile.txt")

### Thực hiện từ Spark Context

Create using sparkContext.parallelize()

In [None]:
data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
rdd = spark.sparkContext.parallelize(data, numSlices=10)   # numSlices chính là số partition của RDD
rdd2 = spark.sparkContext.parallelize(range(0, 6, 1), numSlices=10)

Image: RDD creation <br>
![alt text](imgs/rdd-creation-1.png "image title")

Create empty RDD using sparkContext.emptyRDD

In [None]:
# Creates empty RDD with no partition
rdd = spark.sparkContext.emptyRDD

# RDD partition manupulation

**NOTE** <br>
When we use **parallelize(no set partition)** or **textFile()** or **wholeTextFiles()** methods of SparkContext to initiate RDD <br>
=> it automatically splits the data into partitions based on resource availability (number of cores available)

In [13]:
rdd = spark.sparkContext.textFile("file:///home/hadoop/Codes/pyspark_projects/tmp_data/text.txt")

getNumPartitions()

In [14]:
print("initial partition count: ", rdd.getNumPartitions())

initial partition count:  1


repartition() <br>
- Thực chất là 1 dạng Wide Transform
- Note that repartition() method is a very expensive operation as it shuffles data from all nodes in a cluster. 

In [15]:
reparRdd = rdd.repartition(4)
print("re-partition count: ", reparRdd.getNumPartitions())

re-partition count:  4


coalesce()
- Thực chất là 1 dạng Wide Transform
- Chỉ định số nodes mà rdd sẽ di chuyển qua. Ví dụ coalesce(2) => các partition chỉ đi qua 2 nodes

In [None]:
reparRdd = rdd.coalesce(2)

# RDD transformation

**NOTE**
- Transformation luôn trả về một rdd khác
- Có hai loại transformation:
    + Narrow Transformation: Mỗi partition đầu vào sẽ dùng để tính 1 partition đầu ra
    + Wide Transformation (shuffle transformation): Nhiều partition đầu vào sẽ dùng để tính 1 partition đầu ra

### Narrow Transformation

flatmap() <br>
- flatMap() transformation flattens the RDD after applying the function in map and returns a new RDD.

In [16]:
rdd2 = rdd.flatMap(lambda x: x.split(" "))

map()
- the output of map transformations would always have the same number of records as input.

In [17]:
rdd3 = rdd2.map(lambda x: (x,1))

reduceByKey()
- merges the values for each key with the function specified

In [18]:
rdd4 = rdd3.reduceByKey(lambda a,b: a+b)

sortByKey() (của wide transform)
- transformation is used to sort RDD elements on key.

In [20]:
rdd5 = rdd4.map(lambda x: (x[1],x[0])).sortByKey(ascending=False)
#Print rdd5 result to console
print(rdd5.collect())

[(6, 'the'), (4, 'Lorem'), (4, 'of'), (3, 'Ipsum'), (3, 'and'), (2, 'dummy'), (2, 'text'), (2, 'has'), (2, 'a'), (2, 'type'), (2, 'It'), (2, 'with'), (1, 'is'), (1, 'simply'), (1, 'printing'), (1, 'typesetting'), (1, 'industry.'), (1, 'been'), (1, "industry's"), (1, 'standard'), (1, 'ever'), (1, 'since'), (1, '1500s,'), (1, 'when'), (1, 'an'), (1, 'unknown'), (1, 'printer'), (1, 'took'), (1, 'galley'), (1, 'scrambled'), (1, 'it'), (1, 'to'), (1, 'make'), (1, 'specimen'), (1, 'book.'), (1, 'survived'), (1, 'not'), (1, 'only'), (1, 'five'), (1, 'centuries,'), (1, 'but'), (1, 'also'), (1, 'leap'), (1, 'into'), (1, 'electronic'), (1, 'typesetting,'), (1, 'remaining'), (1, 'essentially'), (1, 'unchanged.'), (1, 'was'), (1, 'popularised'), (1, 'in'), (1, '1960s'), (1, 'release'), (1, 'Letraset'), (1, 'sheets'), (1, 'containing'), (1, 'passages,'), (1, 'more'), (1, 'recently'), (1, 'desktop'), (1, 'publishing'), (1, 'software'), (1, 'like'), (1, 'Aldus'), (1, 'PageMaker'), (1, 'including'), (

filter() 
- transformation is used to filter the records in an RDD.

In [22]:
rdd6 = rdd5.filter(lambda x : x[0] > 1)
print(rdd6.collect())

[(6, 'the'), (4, 'Lorem'), (4, 'of'), (3, 'Ipsum'), (3, 'and'), (2, 'dummy'), (2, 'text'), (2, 'has'), (2, 'a'), (2, 'type'), (2, 'It'), (2, 'with')]


union()
- gom 2 rdd lại với nhau

In [None]:
union_rdd = rdd2.union(rdd3)

sample()
- Lấy sample dữ liệu
    + nếu không có withReplacement => Fraction là tỉ lệ % đầu ra [0, 1]
    + nếu có withReplacement => Fraction là số lần xuất hiện (kì vọng) của mỗi phần tử được chọn
- withReplacement: cho phép hoàn trả hay không

In [28]:
rdd = spark.sparkContext.parallelize(range(1, 10))
sample_rdd = rdd.sample(withReplacement=False, fraction=0.7)  
sample_rdd_2 = rdd.sample(True, fraction=1)
print(sample_rdd.collect())
print(sample_rdd_2.collect())


[1, 4, 5, 6, 9]
[1, 1, 1, 2, 2, 3, 3, 4, 4, 4, 4, 4, 5, 6, 7, 9]


### Wide Transformation

**NOTE**<br>
- Thường có dạng <key, value>: Pair RDD

groupByKey() and reduceByKey()

In [36]:
words = ["one", "two", "two", "three", "three", "three"]
wordPairsRDD = spark.sparkContext.parallelize(words).map(lambda word : (word, 1))

wordCountsWithReduce = wordPairsRDD.reduceByKey(lambda x, y: x + y).collect()
wordCountsWithGroup= wordPairsRDD.groupByKey().map(lambda x: (x[0], sum(x[1]))).collect()

print(wordCountsWithGroup)
print(wordCountsWithReduce)

[('one', 1), ('two', 2), ('three', 3)]
[('one', 1), ('two', 2), ('three', 3)]


Image: reduceByKey vs groupByKey <br>
![](imgs/ReduceAndGroupByKey.png)

join() - distinct() - intersect()
- join 2 rdd dựa trên key
- distinct lấy ra các phần tử không bị trùng lặp
- intersect lấy phần giao nhau

In [40]:
rdd1 = spark.sparkContext.parallelize([(1, 'a'), (2, 'b'), (3, 'c')])
rdd2 = spark.sparkContext.parallelize([(1, 'a'), (2, 'e'), (4, 'f')])

print("data join: ")
data_join = rdd1.join(rdd2)
data_join.foreach(print)

print("data intersect")
data_intersect = rdd1.intersection(rdd2)
data_intersect.foreach(print)

print("data distinct")
data_distinct = rdd1.distinct()
data_distinct.foreach(print)

data join: 


(2, ('b', 'e'))
(1, ('a', 'a'))


data intersect


(1, 'a')


data distinct


(1, 'a')
(2, 'b')
(3, 'c')


# RDD actions

**NOTE**
- Action luôn trả về giá trị hoặc list giá trị thay vì trả về một rdd mới
- Nên hiểu reduceByKey và reduce nói chung là sẽ đưa hết các record về cùng 1 thằng
    + Nếu không có ByKey thì reduce tất cả về 1 record duy nhất
    + Nếu có ByKey thì reduce tất cả những thằng cùng key về với nhau 
    (key ở đây có thể là phần tử đầu tiên hoặc function mà mình tự định nghĩa)
    + Hàm lambda truyền vào cho thằng reduce gồm hai biến tương trưng cho 2 thằng cùng được reduce liên tiếp, áp dụng lần lượt từng cặp một cho đến khi về 1 record

- count() – Returns the number of records in an RDD <br>
- first() – Returns the first record. <br>
- max() – Returns max record. <br>
- reduce() – Reduces the records to single, we can use this to count or sum. <br>
- take() – Returns the record specified as an argument, trả về một rdd mới với số lượng các records theo argument <br>
- collect() – Returns all data from RDD as an array; maybe out of mem if the rdd is huge <br>
- saveAsTextFile() – write the RDD to a text file.

In [55]:
rdd = spark.sparkContext.parallelize([(1, 'a'), (2, 'b'), (3, 'c'), (4, 'd'), (5, 'f'), (11, 'e'), (3, 'c')])

print(f"Count: {rdd.count()}")
print(f"First element: {rdd.first()}")
print(f"Max element by key: {rdd.max()}")
print(f"Max element by value: {rdd.max(key=(lambda x: x[1]))}")
print(f"Reduce: ", rdd.reduce(lambda x, y: (x[0] + y[0], x[1] + y[1])))
print(f"3 records: ", rdd.take(3))
print(rdd.collect())
rdd.saveAsTextFile("file:///home/hadoop/Codes/pyspark_projects/tmp_data/text_from_rdd.txt")

Count: 7
First element: (1, 'a')
Max element by key: (11, 'e')
Max element by value: (5, 'f')
Reduce:  (29, 'abcdfec')
3 records:  [(1, 'a'), (2, 'b'), (3, 'c')]
[(1, 'a'), (2, 'b'), (3, 'c'), (4, 'd'), (5, 'f'), (11, 'e'), (3, 'c')]
None


# RDD Optimization

**NOTE** <br>
Mục đích:
- Nhằm tận dụng lại những computation đã thực hiện. 

Động lực:
- Về bản chất thì Spark không hề lưu các giá trị hay các rdd trung gian được tính ra trong các bước transformation, nó chỉ lưu linage graph - đồ thị diễn đạt các bước thực hiện transform này mà thôi.
- Chúng chỉ thực hiện transform (lazy operation) và lưu tạm thời khi có một action nào đó được kêu gọi.

Vấn đề: Đặt ra vấn đề là nếu muốn sử dụng lại một rdd trung gian nào đó thì phải làm thế nào?
- Thực hiện tính toán lại từ đầu ? => Rất tốn chi phí
=> Do đó Spark cho phép lưu tạm thời (caching) thằng rdd xuống memory/disk để thực hiện các subsequent transformation/actions khác
=> Lúc này mỗi node sẽ lưu các partitions của cached rdd mà node đó đang nắm giữ xuống memory/disk


cache():
- Sẽ tự động gọi đến persist()

In [56]:
cachedRdd = rdd.cache()

persist():
- persist cho phép thiết lập StorageLevel:
    + MEMORY_ONLY : default, lưu xuống JVM memory nhưng không tuần tự hóa
    + MEMORY_ONLY_SER : lưu xuống memory nhưng tuần tự hóa, tiết kiệm space hơn với MEMORY_ONLY, nhưng mất thêm vài CPU cycles để giải tuần tự hóa
    + MEMORY_AND_DISK : lưu xuống memory nhưng không tuần tự hóa, nếu RDD cần lưu xuống lớn hơn RAM hiện có => lưu các excess partitions xuống disk 
    + MEMORY_AND_DISK_SER : như MEMORY_AND_DISK nhưng tuần tự
    + DISK_ONLY
    <br><br>
    + MEMORY_ONLY_2 : tương tự nhưng replication từng partition ra 2 nodes
    + MEMORY_ONLY_SER_2 : tương tự nhưng replication từng partition ra 2 nodes
    + MEMORY_AND_DISK_2 : tương tự nhưng replication từng partition ra 2 nodes
    + MEMORY_AND_DISK_SER_2 : tương tự nhưng replication từng partition ra 2 nodes

In [None]:
dfPersist = rdd.persist(pyspark.StorageLevel.MEMORY_ONLY) 
dfPersist.show(False)

unpersist()
- PySpark drops persisted data if not used or by using least-recently-used (LRU) algorithm.
- Can also manually remove using unpersist() method. unpersist() marks the RDD as non-persistent, and remove all blocks for it from memory and disk

In [None]:
rdd.unpersist()

In [None]:
spark.stop()