# **Working with RDD (Resilient Distributed Dataset)**

**`Udemy Course: Best Hands-on Big Data Practices and Use Cases using PySpark`**

**`Author: Amin Karami (PhD, FHEA)`**

---

**Resilient Distributed Dataset (RDD)**: The RDD is the fundamental data structure of Spark. It is a fault-tolerant (resilient), immutable, distributed collection of objects of any type.

source: https://spark.apache.org/docs/latest/rdd-programming-guide.html

source: https://spark.apache.org/docs/latest/api/python/reference/

In [1]:
########## ONLY in Colab ##########
!pip3 install pyspark
########## ONLY in Colab ##########

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.0.tar.gz (281.3 MB)
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764026 sha256=11e1b67261bc7f961fdf20abe2044f7a32cbd906b5ea4641a222824a5d7dd5d2
  Stored in directory: /root/.cache/pip/wheels/7a/8e/1b/f73a52650d2e5f337708d9f6a1750d451a7349a867f928b885
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.0


In [2]:
########## ONLY in Ubuntu Machine ##########
# Load Spark engine
!pip3 install -q findspark
import findspark
findspark.init()
########## ONLY in Ubuntu Machine ##########

In [3]:
# Linking with Spark
from pyspark import SparkContext, SparkConf

In [4]:
# Initializing Spark
conf = SparkConf().setAppName("RDD_practice").setMaster("local[*]")
sc = SparkContext(conf=conf)
print(sc)

<SparkContext master=local[*] appName=RDD_practice>


# **Part 1: Create RDDs and Basic Operations**
# **There are two ways to create RDDs:**

1.   Parallelizing an existing collection in your driver program
2.   Referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat.

In [5]:
# Generate random data:
import random
# Generate 10 random numbers between 0 and 40
randomlist = random.sample(range(0, 40), 10)
print(randomlist)

[38, 18, 0, 27, 4, 33, 10, 22, 15, 26]


In [7]:
# Create RDD:
rdd1 = sc.parallelize(randomlist, 4)
rdd1.collect()

[38, 18, 0, 27, 4, 33, 10, 22, 15, 26]

In [8]:

# Data distribution in partitions:
print(rdd1.getNumPartitions())
print(rdd1.glom().collect())
print("Two partitions: ", rdd1.glom().take(2))

4
[[38, 18], [0, 27], [4, 33], [10, 22, 15, 26]]
Two partitions:  [[38, 18], [0, 27]]


In [11]:
# Print last partition
for i in rdd1.glom().collect()[3]:
  print(i)

10
22
15
26


In [12]:
# count():
rdd1.count()

10

In [13]:
# first():
rdd1.first()

38

In [15]:
# top():
rdd1.top(2)

[38, 33]

In [17]:
# distinct():
rdd1.distinct().collect()

[0, 4, 33, 38, 18, 10, 22, 26, 27, 15]

In [18]:
# map():
rdd_map = rdd1.map(lambda x:(x+1)*3)
rdd_map.collect()

[117, 57, 3, 84, 15, 102, 33, 69, 48, 81]

In [20]:
# filter(): keep only the elements divisible by 3
rdd_filter = rdd1.filter(lambda x: x%3==0)
rdd_filter.collect()

[18, 0, 27, 33, 15]

In [21]:
# flatMap():
rdd_flatmap=rdd1.flatMap(lambda x: [x+2,x+5])
print(rdd_flatmap.collect())
print("The summation of elements =", rdd_flatmap.reduce(lambda a,b : a + b))

[40, 43, 20, 23, 2, 5, 29, 32, 6, 9, 35, 38, 12, 15, 24, 27, 17, 20, 28, 31]
The summation of elements = 456


In [22]:
# Descriptive statistics:
print([rdd1.max(), rdd1.min(), rdd1.mean(), rdd1.sum(), round(rdd1.stdev(),2), rdd1.top(2)])

[38, 0, 19.3, 193, 11.67, [38, 33]]


In [23]:
# mapPartitions(): apply a function once per partition (here: sum each partition)
rdd_mappartitions = rdd1.mapPartitions(lambda part: [sum(part)])
print(rdd_mappartitions.collect())
print("The summation of elements =", rdd_mappartitions.reduce(lambda a,b : a + b))

[56, 27, 37, 73]
The summation of elements = 193


# **Part 2: Advanced RDD Transformations and Actions**

In [24]:
# union():
print(rdd1.collect())
rdd2 = sc.parallelize([1, 14, 20, 20, 28, 10, 13, 3],2)
print(rdd2.collect())

rdd_union = rdd1.union(rdd2)
print(rdd_union.getNumPartitions())
print(rdd_union.collect())

[38, 18, 0, 27, 4, 33, 10, 22, 15, 26]
[1, 14, 20, 20, 28, 10, 13, 3]
6
[38, 18, 0, 27, 4, 33, 10, 22, 15, 26, 1, 14, 20, 20, 28, 10, 13, 3]


In [28]:
# intersection():
rdd_intersection=rdd1.intersection(rdd2)
rdd_intersection.glom().collect()

[[], [], [], [], [10], []]

In [30]:
# Find empty partitions
counter=0
for i in rdd_intersection.glom().collect():
  if len(i) == 0:
    counter = counter + 1
print(counter) 

5


In [31]:
# coalesce(numPartitions):
rdd_intersection.coalesce(1).glom().collect()

[[10]]

In [None]:
# takeSample(withReplacement, num, [seed])


In [None]:
# takeOrdered(n, [ordering])


In [None]:
# reduce():


In [None]:
# reduceByKey():


In [None]:
# sortByKey():


In [None]:
# countByKey()


In [None]:
# groupByKey():


In [None]:
# lookup(key):


In [None]:
# cache:
# By default, each transformed RDD may be recomputed each time you run an action on it.
# However, you may also persist an RDD in memory using the persist (or cache) method,
# in which case Spark will keep the elements around on the cluster for much faster access the next time you query it.


In [None]:
# Persistence (https://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence)
