<a href="https://colab.research.google.com/github/kasikotnani23/Kasi-k/blob/main/Working_with_RDD.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Working with RDD (Resilient Distributed Dataset)**

**`Udemy Course: Best Hands-on Big Data Practices and Use Cases using PySpark`**

**`Author: Amin Karami (PhD, FHEA)`**

---

**Resilient Distributed Dataset (RDD)**: The RDD is Spark's fundamental data structure: a fault-tolerant (resilient), immutable, distributed collection of objects of any type.

source: https://spark.apache.org/docs/latest/rdd-programming-guide.html

source: https://spark.apache.org/docs/latest/api/python/reference/

In [1]:
########## ONLY in Colab ##########
!pip3 install pyspark
########## ONLY in Colab ##########

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.2.tar.gz (281.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m3.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 KB[0m [31m15.5 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.2-py2.py3-none-any.whl size=281824028 sha256=8f205e94096e0864ad99c9275b92fdf8319207402af52cfcb44e9055bcb55a23
  Stored in directory: /root/.cache/pip/wheels/6c/e3/9b/0525ce8a69478916513509d43693511463c6468db0de237c86
Successfully built pyspark
Installing collected packages: py4j, pyspa

In [2]:
# Linking with Spark
from pyspark import SparkContext, SparkConf

In [3]:
# Initializing Spark
# getOrCreate() reuses an already-running SparkContext, so re-running this cell does not
# fail with "Cannot run multiple SparkContexts at once". Note: if a context already exists,
# `conf` is not re-applied to it.
conf = SparkConf().setAppName("RDD_practice").setMaster("local[*]")
sc = SparkContext.getOrCreate(conf=conf)
print(sc)

<SparkContext master=local[*] appName=RDD_practice>


# **Part 1: Create RDDs and Basic Operations**
# **There are two ways to create RDDs:**

1.   Parallelizing an existing collection in your driver program
2.   Referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat.

In [42]:
# Generate a reproducible list of 10 distinct random integers drawn from 1..18.
import random
SEED = 42  # fixed seed so the RDD examples below see the same data on every run
random.seed(SEED)
random_list = random.sample(range(1, 19), 10)
print(random_list)

[9, 11, 12, 4, 10, 14, 6, 18, 15, 16]


In [48]:
# Create RDD: distribute random_list across 4 partitions.
num_slices = 4
rdd1 = sc.parallelize(random_list, numSlices=num_slices)
print(rdd1.collect())
print(rdd1.getNumPartitions())
# glom() turns each partition into a list so the split is visible.
rdd1.glom().collect()

[9, 11, 12, 4, 10, 14, 6, 18, 15, 16]
4


[[9, 11], [12, 4], [10, 14], [6, 18, 15, 16]]

In [49]:
# Data distribution in partitions: glom() exposes each partition as a list.
print(rdd1.getNumPartitions())
print(rdd1.glom().collect())
first_two = rdd1.glom().take(2)
print("Two partitions: ", first_two)


4
[[9, 11], [12, 4], [10, 14], [6, 18, 15, 16]]
Two partitions:  [[9, 11], [12, 4]]


In [50]:
# Print the last partition. Index -1 works for any number of partitions;
# a hard-coded [3] is only the last one when rdd1 has exactly 4 partitions.
for item in rdd1.glom().collect()[-1]:
  print(item)

6
18
15
16


In [51]:
# count(): action returning the number of elements in the RDD.
element_count = rdd1.count()
element_count

10

In [52]:
# first(): action returning the first element of the RDD.
first_element = rdd1.first()
first_element

9

In [53]:
# top(n): the n largest elements, in descending order.
largest_two = rdd1.top(2)
largest_two

[18, 16]

In [54]:
# distinct(): unique elements; the original element order is not preserved.
unique_values = rdd1.distinct()
unique_values.collect()

[12, 4, 16, 9, 10, 14, 6, 18, 11, 15]

In [55]:
# map(): apply a function to every element (here: add 2).
rdd_map = rdd1.map(lambda value: value + 2)
print(rdd_map.collect())

[11, 13, 14, 6, 12, 16, 8, 20, 17, 18]


In [56]:
# filter(): keep only elements for which the predicate is True (multiples of 3).
print(rdd1.collect())
rdd_filter = rdd1.filter(lambda value: value % 3 == 0)
print(rdd_filter.collect())

[9, 11, 12, 4, 10, 14, 6, 18, 15, 16]
[9, 12, 6, 18, 15]


In [71]:
# flatMap(): each element yields several outputs, which are flattened into one RDD.
rdd_flatmap = rdd1.flatMap(lambda value: (value + 3, value + 5))
print(rdd_flatmap.glom().collect())
print(rdd_flatmap.collect())
print(rdd_flatmap.reduce(lambda left, right: left + right))

[[12, 14, 14, 16], [15, 17, 7, 9], [13, 15, 17, 19], [9, 11, 21, 23, 18, 20, 19, 21]]
[12, 14, 14, 16, 15, 17, 7, 9, 13, 15, 17, 19, 9, 11, 21, 23, 18, 20, 19, 21]
310


In [70]:
# Descriptive statistics: each call below is a separate action on rdd1.
# stdev() is the population standard deviation (sampleStdev() gives the N-1 version).
rdd1_max = rdd1.max()
rdd1_min = rdd1.min()
rdd1_mean = rdd1.mean()
rdd1_sum = rdd1.sum()
rdd1_top2 = rdd1.top(2)
rdd1_stdev = round(rdd1.stdev(), 2)
print((rdd1_max, rdd1_min, rdd1_mean, rdd1_sum, rdd1_top2, rdd1_stdev))

(18, 4, 11.5, 115, [18, 16], 4.2)


In [72]:
# mapPartitions():
def myfunc(partition):
  """Yield the sum of all elements in one partition.

  Intended for mapPartitions(): it receives an iterable over a single partition
  and yields exactly one value for it (0 for an empty partition).
  """
  # Builtin sum() replaces the manual loop, whose local `sum` shadowed the builtin.
  yield sum(partition)
partition_sums = rdd1.mapPartitions(myfunc)  # one sum per partition
partition_sums.collect()


[20, 16, 24, 55]

# **Part 2: Advanced RDD Transformations and Actions**

In [77]:
# union(): concatenate two RDDs; duplicate values are kept.
random_list1 = random.sample(range(1, 25), 10)
print(random_list1)
rdd2 = sc.parallelize(random_list1, numSlices=4)
print(rdd2.glom().collect())
rdd_union = rdd1.union(rdd2)
print(rdd_union.collect())

[6, 5, 15, 14, 12, 1, 24, 8, 17, 20]
[[6, 5], [15, 14], [12, 1], [24, 8, 17, 20]]
[9, 11, 12, 4, 10, 14, 6, 18, 15, 16, 6, 5, 15, 14, 12, 1, 24, 8, 17, 20]


In [79]:
# intersection(): elements present in both RDDs, without duplicates.
rdd_intersection = rdd1.intersection(rdd2)
# Show how the result is spread over partitions (several may be empty).
print(rdd_intersection.glom().collect())

[[], [], [], [], [12], [], [14, 6], [15]]


In [82]:
# Find empty partitions: count them on the executors instead of collecting
# every partition's contents to the driver first.
counter = rdd_intersection.glom().filter(lambda partition: len(partition) == 0).count()
print(counter)


5


In [85]:
# coalesce(numPartitions): reduce the number of partitions (here down to one).
rdd_colace = rdd_intersection.coalesce(numPartitions=1)
print(rdd_colace.glom().collect())

[[12, 14, 6, 15]]


In [89]:
# takeSample(withReplacement, num, [seed])
# This method should only be used if the resulting array is expected to be small, as all the data is loaded into the driver’s memory.
# A fixed seed makes the sample reproducible across runs.
rdd2.takeSample(False, 5, seed=42)

[24, 8, 5, 14, 1]

In [93]:
# takeOrdered(n, [ordering]): the n smallest elements by default.
print(rdd2.collect())
smallest_five = rdd2.takeOrdered(5)
print(smallest_five)
# Negating the key returns the n largest instead.
print(rdd2.takeOrdered(5, key=lambda value: -value))


[6, 5, 15, 14, 12, 1, 24, 8, 17, 20]
[1, 5, 6, 8, 12]
[24, 20, 17, 15, 14]


In [94]:
# reduce(): fold all elements into a single value (here their sum).
total = rdd2.reduce(lambda acc, value: acc + value)
total

122

In [96]:
# reduceByKey(): merge the values of each key with the given function (sum here).
rdd_Rbk = sc.parallelize([(1,4),(7,10),(5,7),(1,12),(7,12),(7,1),(9,1),(7,4)], 2)
print(rdd_Rbk.collect())
print(rdd_Rbk.reduceByKey(lambda x,y: x+y).collect())


# tabular visualization
# A single collect() of the (key, value) pairs keeps keys and values aligned
# and runs one job instead of separate keys() and values() jobs.
import pandas as pd
Counter = pd.DataFrame(rdd_Rbk.collect(), columns=['Key', 'Values'])
Counter

[(1, 4), (7, 10), (5, 7), (1, 12), (7, 12), (7, 1), (9, 1), (7, 4)]
[(1, 16), (7, 27), (5, 7), (9, 1)]


Unnamed: 0,Key,Values
0,1,4
1,7,10
2,5,7
3,1,12
4,7,12
5,7,1
6,9,1
7,7,4


In [97]:
# sortByKey(): sort the per-key sums by key, ascending.
key_sums = rdd_Rbk.reduceByKey(lambda x, y: x + y)
key_sums.sortByKey(ascending=True).collect()



[(1, 16), (5, 7), (7, 27), (9, 1)]

In [103]:
# countByKey(): number of pairs per key, returned to the driver as a dict.
# Compute it once; every countByKey() call is a separate Spark job.
key_counts = rdd_Rbk.countByKey()
print(key_counts)
print(key_counts.items())
# Only a cell's last expression is displayed, so print the sorted keys explicitly.
print(sorted(key_counts))
sorted(key_counts.items())


defaultdict(<class 'int'>, {1: 2, 7: 4, 5: 1, 9: 1})
dict_items([(1, 2), (7, 4), (5, 1), (9, 1)])


[(1, 2), (5, 1), (7, 4), (9, 1)]

In [107]:
# groupByKey(): gather all values of each key into an iterable.
rdd_groupby = rdd_Rbk.groupByKey()
print(rdd_groupby.getNumPartitions())
for key, values in rdd_groupby.collect():
  print(key, list(values))

2
1 [4, 12]
7 [10, 12, 1, 4]
5 [7]
9 [1]


In [109]:
# lookup(key): list of all values stored under the given key.
values_for_7 = rdd_Rbk.lookup(7)
values_for_7

[10, 12, 1, 4]

In [110]:
# cache:
# By default, each transformed RDD may be recomputed each time you run an action on it.
# However, you may also persist an RDD in memory using the persist (or cache) method,
# in which case Spark will keep the elements around on the cluster for much faster access the next time you query it.
# cache() is shorthand for persist() with its default MEMORY_ONLY storage level.
rdd_Rbk.cache()

ParallelCollectionRDD[200] at readRDDFromFile at PythonRDD.scala:274

In [111]:
# Persistence (https://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence)
# MEMORY_AND_DISK keeps partitions in memory and stores those that do not fit on disk.
from pyspark import StorageLevel
storage_level = StorageLevel.MEMORY_AND_DISK
rdd1.persist(storageLevel=storage_level)


ParallelCollectionRDD[93] at readRDDFromFile at PythonRDD.scala:274