<a href="https://colab.research.google.com/github/mansibora20/PySpark/blob/main/03_RDD_Operations.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
from pyspark.sql import SparkSession
from pyspark import SparkContext

# Step 1: Initialize Spark Session
Creating a Spark session and obtaining its SparkContext (the entry point for the RDD API).

In [2]:

# The builder returns the already-running session if there is one,
# otherwise it starts a new one under this application name.
spark = (
    SparkSession.builder
    .appName("PySpark RDD Basics")
    .getOrCreate()
)

# RDD operations go through the SparkContext wrapped by the session.
sc = spark.sparkContext


# Step 2: Create an RDD from a list
Parallelizing a Python list to create an RDD
* RDDs (Resilient Distributed Datasets) are the fundamental data structure of Apache Spark.
* We create an RDD by parallelizing a Python list.

In [3]:
# The integers 1 through 10, distributed across the cluster as an RDD.
data = list(range(1, 11))
rdd = sc.parallelize(data)

# Step 3: RDD Transformations
Applying transformations to an RDD.
* Transformations create a new RDD from an existing one.
* Transformations are **lazy**, meaning they are not executed until an action is called.
1. Map Transformation: Applies a function to each element in the RDD.
2. Filter Transformation: Filters elements based on a condition.
3. FlatMap Transformation: Maps each input item to zero or more output items and flattens them into a single RDD.
4. Distinct Transformation: Removes duplicate elements.
5. Sample Transformation: Randomly selects a fraction of the data.



In [20]:
# map: exactly one output element per input element (here, each is doubled).
rdd_map = rdd.map(lambda n: 2 * n)
print(f"Mapped RDD: {rdd_map.collect()}")

# filter: keep only the elements for which the predicate holds (the evens).
rdd_filter = rdd.filter(lambda n: n % 2 == 0)
print(f"Filtered RDD: {rdd_filter.collect()}")

# flatMap: each element yields an iterable (here the element and ten times
# the element), and all the iterables are flattened into one RDD.
rdd_flatmap = rdd.flatMap(lambda n: [n, 10 * n])
print(f"FlatMapped RDD: {rdd_flatmap.collect()}")

# distinct: drop duplicate elements. The result order can differ from the
# input order.
rdd_distinct = rdd.distinct()
print(f"Distinct RDD: {rdd_distinct.collect()}")

# sample: draw roughly half of the elements without replacement; the fixed
# seed makes the draw repeatable.
rdd_sample = rdd.sample(withReplacement=False, fraction=0.5, seed=42)
print(f"Sampled RDD: {rdd_sample.collect()}")

Mapped RDD: [2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
Filtered RDD: [2, 4, 6, 8, 10]
FlatMapped RDD: [1, 10, 2, 20, 3, 30, 4, 40, 5, 50, 6, 60, 7, 70, 8, 80, 9, 90, 10, 100]
Distinct RDD: [2, 4, 6, 8, 10, 1, 3, 5, 7, 9]
Sampled RDD: [1, 2, 4, 5]


# Step 4: RDD Actions
Actions trigger the execution of the pending transformations and return results to the driver.

In [8]:
# Every call below is an action: it runs a Spark job and hands a plain
# Python value back to the driver. All results are gathered first, then
# printed one per line.
rdd_collect = rdd.collect()      # all elements, as a list
rdd_count = rdd.count()          # number of elements
rdd_first = rdd.first()          # the first element
rdd_take = rdd.take(5)           # the first 5 elements
rdd_max = rdd.max()
rdd_min = rdd.min()
rdd_sum = rdd.sum()
rdd_mean = rdd.mean()
rdd_variance = rdd.variance()    # population variance (divides by N, not N - 1)
rdd_stdev = rdd.stdev()          # square root of the variance above

print("\n".join(
    f"{label} {value}"
    for label, value in (
        ("Original RDD:", rdd_collect),
        ("RDD Count:", rdd_count),
        ("First Element:", rdd_first),
        ("First 5 Elements:", rdd_take),
        ("Max Value:", rdd_max),
        ("Min Value:", rdd_min),
        ("Sum of Elements:", rdd_sum),
        ("Mean Value:", rdd_mean),
        ("Variance:", rdd_variance),
        ("Standard Deviation:", rdd_stdev),
    )
))


Original RDD: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
RDD Count: 10
First Element: 1
First 5 Elements: [1, 2, 3, 4, 5]
Max Value: 10
Min Value: 1
Sum of Elements: 55
Mean Value: 5.5
Variance: 8.25
Standard Deviation: 2.8722813232690143


# Step 5: Key-Value RDDs
Creating an RDD with key-value pairs.
* Key-Value RDDs allow operations like grouping and aggregating by key.

In [9]:
# Pair RDD: every element is a (key, value) tuple; keys "a" and "b" repeat.
data_kv = [
    ("a", 1),
    ("b", 2),
    ("a", 3),
    ("b", 4),
    ("c", 5),
]
rdd_kv = sc.parallelize(data_kv)


Performing operations on key-value RDDs

In [10]:
# groupByKey: gather every value for each key. mapValues(list) turns each
# group's iterable into a list so it prints readably. Keys come back in no
# particular order.
rdd_groupby = (
    rdd_kv
    .groupByKey()
    .mapValues(list)
    .collect()
)
print(f"Grouped by Key: {rdd_groupby}")

# reduceByKey: combine the values of each key pairwise (here, summing them).
rdd_reduceby = rdd_kv.reduceByKey(lambda left, right: left + right).collect()
print(f"Reduced by Key: {rdd_reduceby}")

# sortByKey: order the pairs by key (ascending by default).
rdd_sortby = rdd_kv.sortByKey().collect()
print(f"Sorted by Key: {rdd_sortby}")

# countByKey is itself an action: it returns the per-key counts to the driver
# directly, so no collect() is needed.
rdd_countbykey = rdd_kv.countByKey()
print(f"Count by Key: {rdd_countbykey}")

# mapValues: transform only the value of each pair; the key is kept as-is.
rdd_mapvalues = rdd_kv.mapValues(lambda value: 10 * value).collect()
print(f"Map Values: {rdd_mapvalues}")

Grouped by Key: [('b', [2, 4]), ('c', [5]), ('a', [1, 3])]
Reduced by Key: [('b', 6), ('c', 5), ('a', 4)]
Sorted by Key: [('a', 1), ('a', 3), ('b', 2), ('b', 4), ('c', 5)]
Count by Key: defaultdict(<class 'int'>, {'a': 2, 'b': 2, 'c': 1})
Map Values: [('a', 10), ('b', 20), ('a', 30), ('b', 40), ('c', 50)]


# Step 6: Joins on RDDs
Creating another key-value RDD for joins.
* Joins combine two key-value datasets by matching their keys.


In [11]:
# Second pair RDD for the joins. Key "d" has no counterpart in rdd_kv, which
# makes the difference between inner and outer joins visible.
data_kv2 = [
    ("a", 100),
    ("b", 200),
    ("c", 300),
    ("d", 400),
]
rdd_kv2 = sc.parallelize(data_kv2)

Performing different types of joins

In [12]:
# Inner join: only keys present in both RDDs, so "d" is dropped.
rdd_join = rdd_kv.join(rdd_kv2).collect()
print(f"Join RDD: {rdd_join}")

# Left outer join: every key of rdd_kv; a missing right-hand value is None.
rdd_left_outer = rdd_kv.leftOuterJoin(rdd_kv2).collect()
print(f"Left Outer Join: {rdd_left_outer}")

# Right outer join: every key of rdd_kv2; "d" has no left-hand value, so it
# is paired with None.
rdd_right_outer = rdd_kv.rightOuterJoin(rdd_kv2).collect()
print(f"Right Outer Join: {rdd_right_outer}")

# Full outer join: every key from either side, with None filling the gaps.
rdd_full_outer = rdd_kv.fullOuterJoin(rdd_kv2).collect()
print(f"Full Outer Join: {rdd_full_outer}")


Join RDD: [('b', (2, 200)), ('b', (4, 200)), ('c', (5, 300)), ('a', (1, 100)), ('a', (3, 100))]
Left Outer Join: [('b', (2, 200)), ('b', (4, 200)), ('c', (5, 300)), ('a', (1, 100)), ('a', (3, 100))]
Right Outer Join: [('d', (None, 400)), ('b', (2, 200)), ('b', (4, 200)), ('c', (5, 300)), ('a', (1, 100)), ('a', (3, 100))]
Full Outer Join: [('d', (None, 400)), ('b', (2, 200)), ('b', (4, 200)), ('c', (5, 300)), ('a', (1, 100)), ('a', (3, 100))]


# Step 7: Partitioning
Checking the number of partitions.
* RDDs are split into partitions for parallel processing.

In [13]:
# How many slices sc.parallelize split `data` into for parallel processing.
num_partitions = rdd.getNumPartitions()
print(f"Number of Partitions: {num_partitions}")

Number of Partitions: 2


# Step 8: Aggregations
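* Fold: Aggregates the elements with an associative function, starting from a "zero" value. The zero value is used once per partition and again when merging the partition results, so it should be the identity for the function (e.g. `0` for addition).
* Aggregate: Computes the sum and count together (accumulating within each partition, then merging the partitions), and then calculates the average.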



In [17]:
# fold: like reduce, but seeded with a "zero" value. Spark applies the zero
# value once per partition and once more when merging the partition results,
# so it must be the identity of the operation (0 for addition).
rdd_fold = rdd.fold(0, lambda x, y: x + y)

# aggregate: build a (sum, count) accumulator in a single pass.
rdd_aggregate = rdd.aggregate(
    (0, 0),
    # seqOp: fold one element into a partition's (sum, count)
    lambda acc, value: (acc[0] + value, acc[1] + 1),
    # combOp: merge the (sum, count) pairs of two partitions
    lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]),
)

# Average = sum / count. An empty RDD yields a count of 0; report NaN
# instead of raising ZeroDivisionError.
rdd_avg = (
    rdd_aggregate[0] / rdd_aggregate[1] if rdd_aggregate[1] else float("nan")
)



# Step 9: Caching & Persistence
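* Caching keeps an RDD's partitions in memory once they are first computed, so later actions reuse them instead of recomputing them. `cache()` is shorthand for `persist()` with the default `MEMORY_ONLY` storage level.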

In [18]:
# cache() is shorthand for persist() with the default MEMORY_ONLY storage
# level. Both mark this RDD in place and return the same RDD object, so a
# follow-up persist() with the same level would do nothing. Mark it once and
# keep both names as aliases of the same RDD.
# Nothing is stored until the next action computes the RDD.
rdd_cached = rdd.cache()
rdd_persisted = rdd_cached

# Step 10: Set Operations
* Set operations combine and compare the elements of two RDDs.

In [14]:

# A second RDD that overlaps `rdd` on 5..10 and adds 11 and 12.
data2 = list(range(5, 13))
rdd2 = sc.parallelize(data2)

# union: concatenation of both RDDs; duplicates are kept.
rdd_union = rdd.union(rdd2).collect()
print(f"Union of RDDs: {rdd_union}")

# intersection: elements present in both RDDs.
rdd_intersection = rdd.intersection(rdd2).collect()
print(f"Intersection of RDDs: {rdd_intersection}")

# subtract: elements of rdd that do not appear in rdd2.
rdd_subtract = rdd.subtract(rdd2).collect()
print(f"Difference of RDDs: {rdd_subtract}")

# cartesian: every (x, y) pair with x from rdd and y from rdd2.
rdd_cartesian = rdd.cartesian(rdd2).collect()
print(f"Cartesian Product: {rdd_cartesian}")

Union of RDDs: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 5, 6, 7, 8, 9, 10, 11, 12]
Intersection of RDDs: [8, 5, 9, 6, 10, 7]
Difference of RDDs: [4, 1, 2, 3]
Cartesian Product: [(1, 5), (1, 6), (1, 7), (1, 8), (2, 5), (2, 6), (2, 7), (2, 8), (3, 5), (3, 6), (3, 7), (3, 8), (4, 5), (4, 6), (4, 7), (4, 8), (5, 5), (5, 6), (5, 7), (5, 8), (1, 9), (1, 10), (1, 11), (1, 12), (2, 9), (2, 10), (2, 11), (2, 12), (3, 9), (3, 10), (3, 11), (3, 12), (4, 9), (4, 10), (4, 11), (4, 12), (5, 9), (5, 10), (5, 11), (5, 12), (6, 5), (6, 6), (6, 7), (6, 8), (7, 5), (7, 6), (7, 7), (7, 8), (8, 5), (8, 6), (8, 7), (8, 8), (9, 5), (9, 6), (9, 7), (9, 8), (10, 5), (10, 6), (10, 7), (10, 8), (6, 9), (6, 10), (6, 11), (6, 12), (7, 9), (7, 10), (7, 11), (7, 12), (8, 9), (8, 10), (8, 11), (8, 12), (9, 9), (9, 10), (9, 11), (9, 12), (10, 9), (10, 10), (10, 11), (10, 12)]


# Step 11: Checkpointing
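* Saves the RDD's data to the checkpoint directory and truncates its lineage, so it no longer has to be recomputed from its parent RDDs. Checkpointing is lazy: the data is written when the next action runs on the RDD.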


In [21]:
# Checkpoint files are written under this directory.
sc.setCheckpointDir("/tmp/checkpoints")

# checkpoint() only marks the RDD; the data is written when the next job runs
# on it. Run a cheap action so the checkpoint actually happens in this cell,
# then confirm that it did.
rdd.checkpoint()
rdd.count()
print("Is Checkpointed:", rdd.isCheckpointed())


# Step 12: Broadcast Variables & Accumulators
* Broadcast variables improve performance by sharing large read-only data across workers.
* Accumulators aggregate values from worker nodes.



In [15]:
# Broadcast: ship this read-only list to each executor once. Tasks read it
# through .value inside their functions.
broadcast_var = sc.broadcast([1, 2, 3, 4, 5])
rdd_broadcast_filter = rdd.filter(lambda x: x in broadcast_var.value).collect()
print("Elements in Broadcast List:", rdd_broadcast_filter)

# Accumulator: tasks can only add to it; the driver reads the total via
# .value. Updating it inside an action (foreach) applies each task's
# contribution once.
accumulator = sc.accumulator(0)
rdd.foreach(lambda x: accumulator.add(x))
print("Accumulator Value:", accumulator.value)

Accumulator Value: 55


# Step 13: Printing results

In [19]:

# Recap of every result from the earlier cells, one per line. Values that
# were stored as plain Python objects are printed as-is; the transformation
# RDDs are collected again here, which launches new jobs.
print("\n".join(
    f"{label} {value}"
    for label, value in (
        ("Original RDD:", rdd_collect),
        ("Mapped RDD:", rdd_map.collect()),
        ("Filtered RDD:", rdd_filter.collect()),
        ("FlatMapped RDD:", rdd_flatmap.collect()),
        ("Distinct RDD:", rdd_distinct.collect()),
        ("Sampled RDD:", rdd_sample.collect()),
        ("Grouped by Key:", rdd_groupby),
        ("Reduced by Key:", rdd_reduceby),
        ("Sorted by Key:", rdd_sortby),
        ("Count by Key:", rdd_countbykey),
        ("Map Values:", rdd_mapvalues),
        ("Join RDD:", rdd_join),
        ("Left Outer Join:", rdd_left_outer),
        ("Right Outer Join:", rdd_right_outer),
        ("Full Outer Join:", rdd_full_outer),
        ("Number of Partitions:", num_partitions),
        ("Union of RDDs:", rdd_union),
        ("Intersection of RDDs:", rdd_intersection),
        ("Difference of RDDs:", rdd_subtract),
        ("Cartesian Product:", rdd_cartesian),
        ("Fold Result:", rdd_fold),
        ("Average Using Aggregate:", rdd_avg),
        ("Accumulator Value:", accumulator.value),
    )
))

Original RDD: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
Mapped RDD: [2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
Filtered RDD: [2, 4, 6, 8, 10]
FlatMapped RDD: [1, 10, 2, 20, 3, 30, 4, 40, 5, 50, 6, 60, 7, 70, 8, 80, 9, 90, 10, 100]
Distinct RDD: [2, 4, 6, 8, 10, 1, 3, 5, 7, 9]
Sampled RDD: [1, 2, 4, 5]
Grouped by Key: [('b', [2, 4]), ('c', [5]), ('a', [1, 3])]
Reduced by Key: [('b', 6), ('c', 5), ('a', 4)]
Sorted by Key: [('a', 1), ('a', 3), ('b', 2), ('b', 4), ('c', 5)]
Count by Key: defaultdict(<class 'int'>, {'a': 2, 'b': 2, 'c': 1})
Map Values: [('a', 10), ('b', 20), ('a', 30), ('b', 40), ('c', 50)]
Join RDD: [('b', (2, 200)), ('b', (4, 200)), ('c', (5, 300)), ('a', (1, 100)), ('a', (3, 100))]
Left Outer Join: [('b', (2, 200)), ('b', (4, 200)), ('c', (5, 300)), ('a', (1, 100)), ('a', (3, 100))]
Right Outer Join: [('d', (None, 400)), ('b', (2, 200)), ('b', (4, 200)), ('c', (5, 300)), ('a', (1, 100)), ('a', (3, 100))]
Full Outer Join: [('d', (None, 400)), ('b', (2, 200)), ('b', (4, 200)), ('c', (5, 3