
# **Working with RDD (Resilient Distributed Dataset)**

**`Udemy Course: Best Hands-on Big Data Practices and Use Cases using PySpark`**

**`Author: Amin Karami (PhD, FHEA)`**

---

**Resilient Distributed Dataset (RDD)**: The RDD is Spark's fundamental data structure: a fault-tolerant (resilient), immutable, distributed collection of objects of any type.

source: https://spark.apache.org/docs/latest/rdd-programming-guide.html

source: https://spark.apache.org/docs/latest/api/python/reference/

In [1]:
########## ONLY in Colab ##########
!pip3 install pyspark
########## ONLY in Colab ##########

In [None]:
########## ONLY in Ubuntu Machine ##########
# Load Spark engine
!pip3 install -q findspark
import findspark
findspark.init()
########## ONLY in Ubuntu Machine ##########

In [3]:
# Linking with Spark
from pyspark import SparkContext, SparkConf

In [4]:
# Initializing Spark
conf = SparkConf().setAppName("RDD_practice").setMaster("local[*]")
sc = SparkContext(conf=conf)
print(sc)

<SparkContext master=local[*] appName=RDD_practice>


# **Part 1: Create RDDs and Basic Operations**

There are two ways to create RDDs:

1.   Parallelizing an existing collection in your driver program
2.   Referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat.

In [5]:
# Generate random data:
import random
randomlist = random.sample(range(0,100), 10)
print(randomlist)

[49, 46, 44, 53, 84, 97, 64, 65, 51, 39]


In [7]:
# Create RDD:
rdd1 = sc.parallelize(randomlist, 4)
rdd1.collect()

[49, 46, 44, 53, 84, 97, 64, 65, 51, 39]

In [12]:
# Data distribution in partitions:
print(rdd1.getNumPartitions())
print(rdd1.glom().collect())
print("The first two partitions", rdd1.glom().take(2))

4
[[49, 46], [44, 53], [84, 97], [64, 65, 51, 39]]
The first two partitions [[49, 46], [44, 53]]
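
The 2/2/2/4 split shown above (rather than an even 3/3/2/2) can be reproduced in plain Python. The sketch below assumes, based on how PySpark 3.x appears to behave, that `parallelize` first groups the list into batches of `len(data) // numSlices` elements and then slices those batches across partitions with integer arithmetic. The function name `sketch_parallelize_layout` and the `max_batch` parameter are hypothetical and exist only for illustration; this is not Spark's actual code.

```python
def sketch_parallelize_layout(data, num_slices, max_batch=1024):
    """Approximate how PySpark lays out a small Python list across partitions.

    Assumption (not taken from the notebook): elements are batched first,
    then the batches are sliced across partitions by integer division.
    """
    batch_size = max(1, min(len(data) // num_slices, max_batch))
    batches = [data[i:i + batch_size] for i in range(0, len(data), batch_size)]
    n = len(batches)
    partitions = []
    for i in range(num_slices):
        start = (i * n) // num_slices
        end = ((i + 1) * n) // num_slices
        # Flatten the batches assigned to this partition back into elements
        partitions.append([x for batch in batches[start:end] for x in batch])
    return partitions


randomlist = [49, 46, 44, 53, 84, 97, 64, 65, 51, 39]
layout = sketch_parallelize_layout(randomlist, 4)
print(layout)
```

Under these assumptions the ten elements form five batches of two, and the last partition receives two batches, which would explain why it holds four elements.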


In [14]:
# Print last partition
rdd1.glom().collect()[-1]

[64, 65, 51, 39]

In [15]:
# count():
rdd1.count()

10

In [17]:
# first():
rdd1.first()

49

In [18]:
# top():
rdd1.top(3)

[97, 84, 65]

In [19]:
# distinct():
rdd1.distinct().collect()

[44, 84, 64, 49, 53, 97, 65, 46, 51, 39]

In [21]:
# map():
rdd_map = rdd1.map(lambda x: 1 if x > 50 else -1 )
rdd_map.collect()

[-1, -1, -1, 1, 1, 1, 1, 1, 1, -1]

In [23]:
# filter(): 
rdd_filter = rdd1.filter(lambda x: x > 50)
rdd_filter.glom().collect()

[[], [53], [84, 97], [64, 65, 51]]

In [25]:
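# flatMap(): map each element to a list, then flatten all lists into one RDD
rdd_flatmap = rdd1.flatMap(lambda x: [x+1, x-1])
rdd_flatmap.collect()  # not displayed: a notebook cell shows only its last expression
rdd_flatmap.reduce(lambda x, y: x + y)  # sum of all 20 flattened elements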

1184
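
To make the difference between `map()`, `filter()`, and `flatMap()` concrete, here is a minimal plain-Python sketch of the same three operations on the list used above. It needs no Spark session and only mirrors the element-level semantics; it says nothing about how Spark distributes the work.

```python
from itertools import chain

randomlist = [49, 46, 44, 53, 84, 97, 64, 65, 51, 39]

# map(): exactly one output per input element
mapped = [1 if x > 50 else -1 for x in randomlist]

# filter(): keep only the elements that satisfy the predicate
filtered = [x for x in randomlist if x > 50]

# flatMap(): each input yields a list; the lists are flattened into one sequence
flat_mapped = list(chain.from_iterable([x + 1, x - 1] for x in randomlist))

print(len(mapped), len(filtered), len(flat_mapped))
print(sum(flat_mapped))
```

Because every element contributes both `x+1` and `x-1`, the offsets cancel and the flattened sum is twice the sum of the original list, which matches the 1184 returned by `reduce()`.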

In [26]:
# Descriptive statistics:
print(rdd1.max(), rdd1.mean(), rdd1.stdev(), rdd1.sum())

97 59.2 17.6737092880923 592
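
The value printed by `stdev()` is the population standard deviation (it divides by N); PySpark also offers `sampleStdev()`, which divides by N - 1. As a quick cross-check without Spark, the sketch below uses Python's `statistics` module on the same list.

```python
import statistics

randomlist = [49, 46, 44, 53, 84, 97, 64, 65, 51, 39]

# Population standard deviation: divides by N, comparable to RDD.stdev()
population_sd = statistics.pstdev(randomlist)

# Sample standard deviation: divides by N - 1, comparable to RDD.sampleStdev()
sample_sd = statistics.stdev(randomlist)

print(population_sd, sample_sd)
```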


In [28]:
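# mapPartitions(): the function is called once per partition and receives
# an iterator over that partition's elements
def myfunc(partition):
    yield sum(partition)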

rdd1.mapPartitions(myfunc).glom().collect()

[[95], [97], [181], [219]]
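
The per-partition behaviour can be imitated in plain Python by calling the same generator function once for each partition list. This is only a sketch of the semantics; the `partitions` list is copied from the `glom()` output shown earlier rather than read from a live RDD.

```python
# Partition contents copied from the earlier glom() output
partitions = [[49, 46], [44, 53], [84, 97], [64, 65, 51, 39]]


def myfunc(partition):
    # Receives an iterator over one partition and yields a single value
    yield sum(partition)


# One call per partition, so four results for four partitions
result = [list(myfunc(iter(p))) for p in partitions]
print(result)
```

Unlike `map()`, which would produce ten outputs for ten elements, this produces one output per partition.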

# **Part 2: Advanced RDD Transformations and Actions**

In [37]:
# union():
rdd2 = sc.parallelize([1,2,3,49,53], 2)
rdd_union = rdd1.union(rdd2)
rdd_union.glom().collect()

[[49, 46], [44, 53], [84, 97], [64, 65, 51, 39], [1, 2], [3, 49, 53]]

In [38]:
# intersection():
rdd_intersection = rdd1.intersection(rdd2)
rdd_intersection.glom().collect()

[[], [49], [], [], [], [53]]

In [None]:
# Find empty partitions
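
The original cell is left empty. One possible way to locate the empty partitions, sketched here in plain Python, is to scan the `glom()` output for empty lists. The `partitions` list below is copied from the output above and is not computed from a live RDD.

```python
# Partition contents copied from the glom() output of rdd_intersection
partitions = [[], [49], [], [], [], [53]]

# Indices of partitions that hold no elements
empty_indices = [i for i, part in enumerate(partitions) if not part]
print(empty_indices)

# With a live SparkContext, the partition sizes could instead be computed
# on the cluster, for example: rdd_intersection.glom().map(len).collect()
```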


In [39]:
# coalesce(numPartitions):
rdd_intersection.coalesce(1).glom().collect()

[[49, 53]]

In [40]:
# takeSample(withReplacement, num, [seed])
rdd1.takeSample(False, 3)

[46, 65, 39]

In [41]:
# takeOrdered(n, [ordering])
rdd1.takeOrdered(5)

[39, 44, 46, 49, 51]

In [42]:
# reduce():
rdd1.reduce(lambda x,y: x+y)

592

In [48]:
# reduceByKey():
rdd_rbk = sc.parallelize([(1,1), (1,2), (2,5),(2,8), (3,4)], 2)
a = rdd_rbk.reduceByKey(lambda x, y: x+y)
print(type(a))
print(a.collect())

<class 'pyspark.rdd.PipelinedRDD'>
[(2, 13), (1, 3), (3, 4)]
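
`reduceByKey()` merges values for each key locally within a partition before combining the partial results across partitions. The following plain-Python sketch imitates those two steps. The partition layout and the helper names `combine_locally` and `merge_partials` are assumptions made for illustration and are not part of the Spark API.

```python
# Assumed layout of the five pairs across two partitions (hypothetical)
partitions = [[(1, 1), (1, 2)], [(2, 5), (2, 8), (3, 4)]]


def add(x, y):
    return x + y


def combine_locally(partition, func):
    """Step 1: reduce the values of each key within a single partition."""
    acc = {}
    for key, value in partition:
        acc[key] = func(acc[key], value) if key in acc else value
    return acc


def merge_partials(partials, func):
    """Step 2: merge the per-partition results key by key."""
    merged = {}
    for partial in partials:
        for key, value in partial.items():
            merged[key] = func(merged[key], value) if key in merged else value
    return merged


partials = [combine_locally(p, add) for p in partitions]
totals = merge_partials(partials, add)
print(sorted(totals.items()))
```

The sketch sorts its result for readability; Spark itself returns the keys in no guaranteed order, as the output above shows.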


In [50]:
# sortByKey():
a.sortByKey().collect()

[(1, 3), (2, 13), (3, 4)]

In [51]:
# countByKey()
rdd_rbk.countByKey()

defaultdict(int, {1: 2, 2: 2, 3: 1})

In [55]:
# groupByKey():
rdd_rbk.groupByKey().collect()

[(2, <pyspark.resultiterable.ResultIterable at 0x7f0738af44f0>),
 (1, <pyspark.resultiterable.ResultIterable at 0x7f0738af4d30>),
 (3, <pyspark.resultiterable.ResultIterable at 0x7f0738af46a0>)]
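
The output above shows `ResultIterable` objects rather than the grouped values themselves. In Spark, a common way to inspect them is to convert each group to a list, for example with `mapValues(list)` before `collect()`. The grouping itself can be sketched in plain Python as follows, using the same five pairs.

```python
from collections import defaultdict

pairs = [(1, 1), (1, 2), (2, 5), (2, 8), (3, 4)]

# Collect every value under its key, preserving the input order within a key
grouped = defaultdict(list)
for key, value in pairs:
    grouped[key].append(value)

print(sorted(grouped.items()))
```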

In [56]:
# lookup(key):
rdd_rbk.lookup(1)

[1, 2]

In [57]:
# persist() / cache():
# By default, a transformed RDD may be recomputed every time you run an action on it.
# Calling persist() (or cache(), which uses the default storage level) asks Spark to keep
# the elements on the cluster once they are computed, so later actions can reuse them.
rdd_rbk.persist()

ParallelCollectionRDD[124] at readRDDFromFile at PythonRDD.scala:274

In [None]:
# Persistence (https://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence)
from pyspark import StorageLevel
rdd1.persist(StorageLevel.MEMORY_AND_DISK)