<a href="https://colab.research.google.com/github/manoj7pal/Google-Colab-Notebooks/blob/master/2_RDD.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# RDD has 2 operations:

1. Transformation : 
    - new calculations over the dataset, 
    - create new paritions with transformed data in the form of RDD, 
    - It has mapping os the raw dataset and the transformed dataset.
    - Executes on Spark Cluster or Worker Nodes

2. Action
    - it converts RDD into human readable format 
    - Functions like print, display, count(), take()
    - Executes on the Driver or the Master nodes.


In [1]:
!pip3 install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Using cached pyspark-3.3.0.tar.gz (281.3 MB)
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 8.4 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764026 sha256=7165a077c4c7822dee158d92bc8077e649f70a4db27793deaa14753a3cc72835
  Stored in directory: /root/.cache/pip/wheels/7a/8e/1b/f73a52650d2e5f337708d9f6a1750d451a7349a867f928b885
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.0


In [2]:
from pyspark import SparkContext, SparkConf

In [3]:
conf = SparkConf().setAppName('RDD_practice').setMaster('local[*]')
sc = SparkContext(conf = conf)
print(sc.defaultParallelism)
print(sc)

2
<SparkContext master=local[*] appName=RDD_practice>


Section 1 - Create RDD and Basic Operations

In [4]:
# Generate Random Data

import random

randomList = random.sample(range(0,20), 10)
print(randomList)


[10, 0, 12, 16, 11, 2, 5, 17, 13, 6]


In [5]:
# Create RDD

rdd1 = sc.parallelize(randomList,4 ) # 4: no of partitions

print(type(rdd1))
rdd1.collect()

<class 'pyspark.rdd.RDD'>


[10, 0, 12, 16, 11, 2, 5, 17, 13, 6]

In [6]:
# Data Distribution in partition: tries to max numbers of equal partitions

print(rdd1.getNumPartitions())
print(rdd1.glom().collect())

print(rdd1.glom().take(2) )

4
[[10, 0], [12, 16], [11, 2], [5, 17, 13, 6]]
[[10, 0], [12, 16]]


In [7]:
# count()

rdd1.count()

10

In [8]:
# first()

rdd1.first()

10

In [9]:
# top()

rdd1.top(2)

[17, 16]

# Transformation Functions

In [10]:
# distinct() 

rdd1.distinct().collect()

[0, 12, 16, 5, 17, 13, 10, 2, 6, 11]

In [11]:
# map() - can map a function to a list of values

def fun1(num):
  return num**2

rdd_map = rdd1.map(fun1)

print(randomList)
print(rdd_map.collect())

[10, 0, 12, 16, 11, 2, 5, 17, 13, 6]
[100, 0, 144, 256, 121, 4, 25, 289, 169, 36]


In [12]:
# simple map - using multiple o/p expressions

rdd_fm = rdd1.map(lambda x: [x**2, x**3])

print(rdd1.collect())
print(rdd_fm.collect())

print("--"*60)

print(rdd1.glom().collect())
print(rdd_fm.glom().collect())

[10, 0, 12, 16, 11, 2, 5, 17, 13, 6]
[[100, 1000], [0, 0], [144, 1728], [256, 4096], [121, 1331], [4, 8], [25, 125], [289, 4913], [169, 2197], [36, 216]]
------------------------------------------------------------------------------------------------------------------------
[[10, 0], [12, 16], [11, 2], [5, 17, 13, 6]]
[[[100, 1000], [0, 0]], [[144, 1728], [256, 4096]], [[121, 1331], [4, 8]], [[25, 125], [289, 4913], [169, 2197], [36, 216]]]


In [13]:
print(rdd_map.glom().collect())

[[100, 0], [144, 256], [121, 4], [25, 289, 169, 36]]


In [14]:
rdd_map = rdd1.map( lambda x: x**2 )
print(rdd_map.collect())

[100, 0, 144, 256, 121, 4, 25, 289, 169, 36]


In [15]:
# filter

rdd_filter = rdd1.filter(lambda x: x>5)
print(randomList)
print(rdd_filter.collect())

[10, 0, 12, 16, 11, 2, 5, 17, 13, 6]
[10, 12, 16, 11, 17, 13, 6]


In [16]:
print(rdd1.glom().collect())
print(rdd_filter.glom().collect())

[[10, 0], [12, 16], [11, 2], [5, 17, 13, 6]]
[[10], [12, 16], [11], [17, 13, 6]]


In [17]:
print(rdd1.count())
print(rdd_filter.count())

10
7


In [18]:
#Repartitioning the RDD - when the items are filtered/removed out

if rdd_filter.count() >=4:
  new_rdd_filter = rdd_filter.repartition(2)

print(rdd_filter.glom().collect())  
print(new_rdd_filter.glom().collect())  

[[10], [12, 16], [11], [17, 13, 6]]
[[10, 11, 17, 13, 6], [12, 16]]


In [19]:
# flatMap() and reduce() - map to a collection of o/p expressions, 
  # and performs aggregations using reduce()

rdd_fm = rdd1.flatMap(lambda x: [x**2, x**3])

print(rdd1.collect)
print(rdd_fm.collect())


<bound method RDD.collect of ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:274>
[100, 1000, 0, 0, 144, 1728, 256, 4096, 121, 1331, 4, 8, 25, 125, 289, 4913, 169, 2197, 36, 216]


In [20]:
print(rdd1.glom().collect())
print(rdd_fm.glom().collect())

[[10, 0], [12, 16], [11, 2], [5, 17, 13, 6]]
[[100, 1000, 0, 0], [144, 1728, 256, 4096], [121, 1331, 4, 8], [25, 125, 289, 4913, 169, 2197, 36, 216]]


In [21]:
rdd_fm.reduce(lambda x,y : x+y)

16758

In [22]:
# Descriptive Statistics:

print(rdd1.max(), rdd1.min(), rdd1.count(), rdd1.mean(), round(rdd1.stdev(),2), rdd1.sum())

17 0 10 9.2 5.46 92


In [23]:
# mapPartitions() : Map a function to each parition

def fun1(partition):
  sum=0

  for item in partition:
    sum += item
  
  yield sum

rdd_map_part = rdd1.mapPartitions( fun1 ).collect()

print(rdd1.glom().collect())
print(rdd_map_part)

[[10, 0], [12, 16], [11, 2], [5, 17, 13, 6]]
[10, 28, 13, 41]


### Part 2 - Advanced RDD Transformations and Actions

In [24]:
# union

print(rdd1.glom().collect())

rdd2 = sc.parallelize( [1, 14,20,28,10,13,3], 2 )
print(rdd2.glom().collect())

rdd_union = rdd1.union(rdd2)

print(f"RDD Union: {rdd_union.glom().collect()}, count is {rdd_union.count()}, and number of partitions is {rdd_union.getNumPartitions()}")

[[10, 0], [12, 16], [11, 2], [5, 17, 13, 6]]
[[1, 14, 20], [28, 10, 13, 3]]
RDD Union: [[10, 0], [12, 16], [11, 2], [5, 17, 13, 6], [1, 14, 20], [28, 10, 13, 3]], count is 17, and number of partitions is 6


In [25]:
#intersection

rdd_intersection = rdd1.intersection(rdd2)

print(rdd_intersection.glom().collect())
print(rdd_intersection.getNumPartitions())

[[], [13], [], [], [10], []]
6


In [26]:
# Find empty partitions

count=0

for partition in rdd_intersection.glom().collect():
  if len(partition) ==0:
    count +=1

print(count)

4


In [27]:
def is_empty_partition(partition):
  count=0
  if len(list(partition))==0:   # the partition variable is passed as a mapped object so to find the length converted it into list type
    count+=1

    yield count

count_empty_partition = rdd_intersection.mapPartitions(is_empty_partition)

print( sum(count_empty_partition.collect()) )

4


In [28]:
# coalesce(numPartition) - decrease the no of partition

print(rdd_intersection.glom().collect())

new_rdd_intersection = rdd_intersection.coalesce(1)
print(new_rdd_intersection.glom().collect())

[[], [13], [], [], [10], []]
[[13, 10]]


In [29]:
# takeSample(withReplacement, num, [seed])

rdd1.takeSample(withReplacement=False, num=5, seed=123)

[13, 17, 2, 6, 12]

In [30]:
# takeOrdered(, [ordering])

rdd1.takeOrdered(10)

[0, 2, 5, 6, 10, 11, 12, 13, 16, 17]

In [31]:
# reduce

rdd1.reduce(lambda x,y: x+y)

92

In [32]:
# reduceByKey()

rdd_rbk = sc.parallelize([ (1,4), (7,10), (1,12), (7,12), (7,1), (9,1), (7,4) ], 2)
print(rdd_rbk.glom().collect())

new_rdd_rbk = rdd_rbk.reduceByKey(lambda x,y: x+y)
print(new_rdd_rbk.collect())

#user friendly visualization

import pandas as pd

df = pd.DataFrame({
                                'Key': new_rdd_rbk.keys().collect() , 
                                'Values': new_rdd_rbk.values().collect()
                                 })
print(df)

[[(1, 4), (7, 10), (1, 12)], [(7, 12), (7, 1), (9, 1), (7, 4)]]
[(1, 16), (7, 27), (9, 1)]
   Key  Values
0    1      16
1    7      27
2    9       1


In [33]:
# sortByKey

rdd_rbk.reduceByKey(lambda x,y: x+y).sortByKey(ascending=False).collect()


[(9, 1), (7, 27), (1, 16)]

In [34]:
# countByKey

print(rdd_rbk.countByKey().items())
print(rdd_rbk.reduceByKey(lambda x,y: x+y).countByKey().items())

dict_items([(1, 2), (7, 4), (9, 1)])
dict_items([(1, 1), (7, 1), (9, 1)])


In [35]:
# groupByKey

rdd_rbk.groupByKey().collect() 

[(1, <pyspark.resultiterable.ResultIterable at 0x7f95e9251090>),
 (7, <pyspark.resultiterable.ResultIterable at 0x7f95ee1a6150>),
 (9, <pyspark.resultiterable.ResultIterable at 0x7f95e2903e50>)]