
In [None]:
!pip install pyspark



**Spark RDD**

RDD (Resilient Distributed Dataset) is a core building block of PySpark. It is a fault-tolerant, immutable, distributed collection of objects. Immutable means that once you create an RDD, you cannot change it. The data within RDDs is segmented into logical partitions, allowing for distributed computation across multiple nodes within the cluster.





In [None]:
# Create a SparkSession (which wraps a SparkContext), then build and inspect a small RDD

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Spark_RDD_examples").getOrCreate()

print(spark)
print(spark.sparkContext.appName)
print(spark.sparkContext.getConf())
print(spark.sparkContext)

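# create an RDD from a Python list
rdd = spark.sparkContext.parallelize([1, 2, 3, 4, 5])
# printing the RDD shows its description, not its data
print(rdd)
# collect() returns all elements to the driver; count() returns how many there are
print(rdd.collect())
print(rdd.count())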




<pyspark.sql.session.SparkSession object at 0x7fa62271ab90>
Spark_RDD_examples
<pyspark.conf.SparkConf object at 0x7fa63aede690>
<SparkContext master=local[*] appName=Spark_RDD_examples>
ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:289
[1, 2, 3, 4, 5]
5


In [None]:
data = [1,2,3,4,5,6,7,8,9,10,11,12]

rdd = spark.sparkContext.parallelize(data)

print(rdd.collect())

print(rdd.count())

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
12


In [None]:
#empty RDD
# emptyRDD is a method, so it must be called with parentheses
rdd = spark.sparkContext.emptyRDD()

print(rdd.collect())
print(rdd.isEmpty())



[]
True


# RDD Transformations


Apache Spark's Resilient Distributed Datasets (RDDs) support two primary types of operations: transformations and actions. Transformations are lazy operations that define a new RDD from an existing one, while actions trigger the execution of these transformations and return results.

**RDD Transformations**

Transformations are operations that create a new RDD from an existing one. They are evaluated lazily, meaning computation is deferred until an action requires the result. Transformations can be categorized into:

**Narrow Transformations:** Each output partition depends on a single input partition (e.g., map, filter).

**Wide Transformations:** Output partitions depend on multiple input partitions, often requiring data shuffling across the cluster (e.g., reduceByKey, join)

| Transformation                      | Description                                                               |
| ----------------------------------- | ------------------------------------------------------------------------- |
| `map(func)`                         | Applies a function to each element, returning a new RDD.                  |
| `flatMap(func)`                     | Similar to `map`, but can return multiple output elements for each input. |
| `filter(func)`                      | Returns elements that satisfy the predicate function.                     |
| `distinct()`                        | Removes duplicate elements.                                               |
| `union(otherRDD)`                   | Returns the union of two RDDs.                                            |
| `intersection(otherRDD)`            | Returns the intersection of two RDDs.                                     |
| `subtract(otherRDD)`                | Returns elements present in the first RDD but not in the second.          |
| `cartesian(otherRDD)`               | Returns the Cartesian product of two RDDs.                                |
| `groupByKey()`                      | Groups values with the same key.                                          |
| `reduceByKey(func)`                 | Merges values with the same key using the specified function.             |
| `sortByKey()`                       | Sorts RDD by key.                                                         |
| `join(otherRDD)`                    | Joins two RDDs by key.                                                    |
| `coalesce(numPartitions)`           | Reduces the number of partitions.                                         |
| `repartition(numPartitions)`        | Reshuffles data into a specified number of partitions.                    |
| `pipe(command)`                     | Pipes each partition through an external command.                         |
| `mapPartitions(func)`               | Applies a function to each partition.                                     |
| `sample(withReplacement, fraction)` | Samples a fraction of the data.                                           |

Sources: ([sparkcodehub.com][1], [LinkedIn][2], [Apache Spark][3], [Stack Overflow][4], [DataFlair][5], [Medium][6], [Stack Overflow][7])

[1]: https://www.sparkcodehub.com/spark/rdd/transformations "Mastering Apache Spark RDD Transformations - SparkCodeHub"
[2]: https://www.linkedin.com/pulse/spark-transformations-actions-lazy-evaluation-mohammad-younus-jameel "Spark Transformations, Actions and Lazy Evaluation. - LinkedIn"
[3]: https://spark.apache.org/docs/latest/rdd-programming-guide.html "RDD Programming Guide - Spark 3.5.5 Documentation"
[4]: https://stackoverflow.com/questions/45908291/rdd-transformation-and-actions "RDD transformation and actions - apache spark - Stack Overflow"
[5]: https://data-flair.training/blogs/spark-rdd-operations-transformations-actions/ "Spark RDD Operations-Transformation & Action with Example"
[6]: https://medium.com/%40sujathamudadla1213/spark-transformations-and-actions-ff4b576cbef8 "Spark RDD Transformations and Actions. | by Sujatha Mudadla"
[7]: https://stackoverflow.com/questions/78722890/where-can-i-find-an-exhaustive-list-of-actions-for-spark "Where can I find an exhaustive list of actions for spark?"



In [None]:
# RDD transformations

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Spark_RDD_examples").getOrCreate()

rdd = spark.sparkContext.parallelize([1,2,3,4,5,6,7,8,9,10,11,12])

print(rdd.collect())



[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]


In [None]:
#map: applies a function to each element, returning a new RDD.

# Create an RDD from a Python list
numbers = spark.sparkContext.parallelize([1, 2, 3, 4, 5])

# Use the map transformation to square each element
squared_numbers = numbers.map(lambda x: x ** 2)

# map can also change the element type, e.g. build (element, 1) pairs
num = numbers.map(lambda x: (x, 1))

print(num.collect())
# Collect the results and print
print(squared_numbers.collect())


[(1, 1), (2, 1), (3, 1), (4, 1), (5, 1)]
[1, 4, 9, 16, 25]


In [None]:
#flatMap
# flatMap() is often used to split each element into several parts, for example splitting sentences into words.
#🔍 Key differences from map()
""" map() returns exactly one output element per input, so splitting sentences gives a list of lists (one list per sentence).
flatMap() flattens those lists into a single RDD of words."""

# Create an RDD of sentences
sentences = spark.sparkContext.parallelize([
    "Apache Spark is fast",
    "It supports many operations"
])

# map keeps one list per sentence
print(sentences.map(lambda sentence: sentence.split()).collect())

# Use flatMap to split each sentence into words
words = sentences.flatMap(lambda sentence: sentence.split())

# Note: foreach() is an action that returns None and runs on the executors,
# so print() inside it usually does not show up in the notebook.

# Collect and print the words
print(words.collect())

[['Apache', 'Spark', 'is', 'fast'], ['It', 'supports', 'many', 'operations']]
['Apache', 'Spark', 'is', 'fast', 'It', 'supports', 'many', 'operations']


In [None]:
#filter
"""
Returns the elements that satisfy the predicate function.
Typical uses:
Filtering out null or empty values in datasets
Selecting records that meet certain criteria (e.g., age > 30)
Removing bad or incomplete records
"""

words = spark.sparkContext.parallelize(["cat", "elephant", "rat", "dog", "giraffe"])

# Filter words with length greater than 3
long_words = words.filter(lambda word: len(word) > 3)

# Collect and print the result
print(long_words.collect())


filt = words.filter(lambda x: x != "cat")

print(filt.collect())

fil_cat = words.filter(lambda x: x == "cat")

print(fil_cat.collect())

diff_char = words.filter(lambda x: x != "cat" and x != "dog")

print(diff_char.collect())

['elephant', 'giraffe']
['elephant', 'rat', 'dog', 'giraffe']
['cat']
['elephant', 'rat', 'giraffe']


In [None]:
#distinct
"""
Removes duplicate elements.

The distinct() transformation returns a new RDD that contains only the unique elements.
distinct() requires a shuffle (PySpark implements it with map() and reduceByKey()), which can be expensive for large datasets.
The order of the result is not guaranteed (see the second line of output), so sort it afterward if you need ordered output.

"""

mylist = [1,1,2,2,3,3,4,5,6,7]

rdd = spark.sparkContext.parallelize(mylist)

print(rdd.collect())

print(rdd.distinct().collect())

print(rdd.sortBy(lambda x: x).collect())

print(rdd.distinct().sortBy(lambda x: x, ascending=True).collect())



[1, 1, 2, 2, 3, 3, 4, 5, 6, 7]
[2, 4, 6, 1, 3, 5, 7]
[1, 1, 2, 2, 3, 3, 4, 5, 6, 7]
[1, 2, 3, 4, 5, 6, 7]


In [None]:
#union
"""
The union() transformation combines two RDDs into a single RDD that contains all elements from both.

It does not remove duplicates; use distinct() afterward if needed.
PySpark does not check element types, so make sure both RDDs hold the same kind of data.

"""

rdd1 = spark.sparkContext.parallelize([1, 2, 3])
rdd2 = spark.sparkContext.parallelize([3, 4, 5])

union_rdd = rdd1.union(rdd2)

print(union_rdd.collect())

print("after distinct")
distinct_combined = union_rdd.distinct()
print(distinct_combined.sortBy(lambda x: x, ascending=True).collect())
# Output: [1, 2, 3, 4, 5]



[1, 2, 3, 3, 4, 5]
after distinct
[1, 2, 3, 4, 5]


In [None]:
#intersection
"""
The intersection() transformation in Spark is used to return a new RDD that contains the common elements between two RDDs (i.e., the set intersection).

rdd1.intersection(rdd2): Compares both RDDs and keeps only the elements that appear in both.
It removes duplicates automatically, just like a mathematical set intersection.

intersection() can trigger a shuffle operation, which may be costly for large datasets.
Use it only when necessary, especially in distributed environments.
"""
rdd1 = spark.sparkContext.parallelize([1, 2, 3, 4])
rdd2 = spark.sparkContext.parallelize([3, 4, 5, 6])

intersection_rdd = rdd1.intersection(rdd2)

print(intersection_rdd.collect())  # e.g. [4, 3]: the order of the result is not guaranteed




[4, 3]


In [None]:
#subtract
"""
In Apache Spark RDDs, subtract() is a transformation used to remove elements present in another RDD. It performs a set difference operation.

Definition: Returns an RDD with elements from the first RDD that are not in the second RDD.
Use case: Filtering out unwanted data or comparing datasets.

"""

rdd1 = spark.sparkContext.parallelize([1, 2, 3,4,5])
rdd2 = spark.sparkContext.parallelize([3, 4, 5,6,7])

subt_rdd = rdd1.subtract(rdd2)

print(subt_rdd.collect())

# rdd1: [1, 2, 3, 4, 5]
# rdd2: [3, 4, 5, 6, 7]
# rdd1.subtract(rdd2) returns the elements of rdd1 that are not in rdd2 -> [1, 2]

[1, 2]

In [None]:
#cartesian
"""
The cartesian() transformation in Apache Spark returns the Cartesian product of two RDDs — meaning it returns all possible pairs of elements, one from each RDD.

rdd1.cartesian(rdd2) pairs each element in rdd1 with every element in rdd2.
Result is a new RDD with tuple pairs.

"""


rdd1 = spark.sparkContext.parallelize([1,2])
rdd2 = spark.sparkContext.parallelize(['a','b','c'])

cartesian_rdd = rdd1.cartesian(rdd2)

print(cartesian_rdd.collect())



[(1, 'a'), (1, 'b'), (1, 'c'), (2, 'a'), (2, 'b'), (2, 'c')]


In [None]:
#groupByKey: groups values with the same key.

"""
groupBy(func) groups the elements of any RDD by a key that func computes, while groupByKey() groups the values of a (key, value) pair RDD by their existing keys.
Neither one aggregates: they only group values, and you can then apply operations such as sum or count.

groupByKey():
Works on (key, value) pair RDDs.
Groups all values with the same key into a single iterable (a ResultIterable, as the first line of output shows).
Is often followed by mapValues() to aggregate, e.g. to add the values.
Sends every value through the shuffle, so for simple aggregations such as sums, reduceByKey() is usually more efficient.

"""

rdd = spark.sparkContext.parallelize([
    ("apple", 3),
    ("banana", 2),
    ("apple", 4),
    ("banana", 1),
    ("orange", 5) ])

group_rdd = rdd.groupByKey()

print(group_rdd.collect())

print("\n group key values")
for key, values in group_rdd.collect():
    print(f"Key: {key}, Values: {list(values)}")

print("\n sum of values")
data = group_rdd.mapValues(lambda x: sum(x))

print(data.collect())


[('apple', <pyspark.resultiterable.ResultIterable object at 0x7d5967461b10>), ('banana', <pyspark.resultiterable.ResultIterable object at 0x7d595326ca50>), ('orange', <pyspark.resultiterable.ResultIterable object at 0x7d5953359750>)]

 group key values
Key: apple, Values: [3, 4]
Key: banana, Values: [2, 1]
Key: orange, Values: [5]

 sum of values
[('apple', 7), ('banana', 3), ('orange', 5)]


In [None]:
#reduceByKey

"""
Merges values with the same key using the specified function.

reduceByKey is a transformation used on key-value (pair) RDDs in Apache Spark. It merges the values for each key using a specified reduce function.

This is useful when you want to aggregate data by key, like summing numbers for each word or category.

rdd.reduceByKey(func)

"""

rdd = spark.sparkContext.parallelize([
    ("apple", 1),
    ("banana", 2),
    ("apple", 4),
    ("banana", 1),
    ("orange", 5) ])


red_rdd = rdd.reduceByKey(lambda x,y: x+y)

print(red_rdd.collect())

#word count

a = "hello this is a word count hello this is a word count"
print('\n', a)

word = spark.sparkContext.parallelize(a.split())
# split the text into words
print("\n", word.collect())
# map each word to a (word, 1) pair
word_count = word.map(lambda x: (x,1))

print('\n' , word_count.collect())

# add up the counts for each word

grp_rdd = word_count.reduceByKey(lambda x,y: x+y)

print("\n", grp_rdd.collect())

[('apple', 5), ('banana', 3), ('orange', 5)]

 hello this is a word count hello this is a word count

 ['hello', 'this', 'is', 'a', 'word', 'count', 'hello', 'this', 'is', 'a', 'word', 'count']

 [('hello', 1), ('this', 1), ('is', 1), ('a', 1), ('word', 1), ('count', 1), ('hello', 1), ('this', 1), ('is', 1), ('a', 1), ('word', 1), ('count', 1)]

 [('hello', 2), ('this', 2), ('word', 2), ('is', 2), ('a', 2), ('count', 2)]


In [None]:
#sortBy: sorts the elements of an RDD by a key computed from each element.

"""
sortBy() is a transformation that sorts the elements of an RDD by the value a key function returns for each element.
(For (key, value) pair RDDs, sortByKey() sorts by the existing keys.)

rdd.sortBy(keyfunc, ascending=True, numPartitions=None)

keyfunc: a function that extracts the sort key from each element.
ascending: sort order (default is True).
numPartitions: number of partitions in the result (optional).

"""

rdd1 = spark.sparkContext.parallelize([1,2,3,8,9,10,12,13])
rdd2 = spark.sparkContext.parallelize(["apple","abs","acd"])

sort_rdd = rdd1.sortBy(lambda x: x)

print(sort_rdd.collect())

sort_rdd2 = rdd2.sortBy(lambda x: x)

print(sort_rdd2.collect())

# sort by square (same order here, because all values are positive)
sort_rdd3 = rdd1.sortBy(lambda x: x ** 2)

print(sort_rdd3.collect())

sort_rdd4 = rdd1.sortBy(lambda x: x, ascending=False, numPartitions=2)

print(sort_rdd4.collect())

print(sort_rdd4.getNumPartitions())

sort_rdd5 = rdd1.sortBy(lambda x: x, ascending=True, numPartitions=1)

print(sort_rdd5.collect())
print(sort_rdd5.getNumPartitions())

[1, 2, 3, 8, 9, 10, 12, 13]
['abs', 'acd', 'apple']
[1, 2, 3, 8, 9, 10, 12, 13]
[13, 12, 10, 9, 8, 3, 2, 1]
2
[1, 2, 3, 8, 9, 10, 12, 13]
1


In [None]:
#join:Joins two RDDs by key.

"""
The join() transformation combines two key-value pair RDDs (RDDs of (key, value) tuples) on matching keys, similar to a SQL INNER JOIN:
only keys that are present in both RDDs appear in the result.

joined_rdd = rdd1.join(rdd2)
Where:

rdd1: RDD with pairs like (K, V1)
rdd2: RDD with pairs like (K, V2)
Result: RDD of (K, (V1, V2)) for keys common to both RDDs.

leftOuterJoin(), rightOuterJoin() and fullOuterJoin() also keep unmatched keys, filling the missing side with None.

"""

rdd1 = spark.sparkContext.parallelize([("a", 1), ("b", 2), ("c", 3)])
rdd2 = spark.sparkContext.parallelize([("a", "apple"), ("b", "banana"), ("d", "dragonfruit")])

joined_rdd = rdd1.join(rdd2)

print(joined_rdd.collect())

ljoin_rdd = rdd1.leftOuterJoin(rdd2)

print(ljoin_rdd.collect())

rjoin_rdd = rdd1.rightOuterJoin(rdd2)

print(rjoin_rdd.collect())

full_join_rdd = rdd1.fullOuterJoin(rdd2)

print(full_join_rdd.collect())


[('b', (2, 'banana')), ('a', (1, 'apple'))]
[('b', (2, 'banana')), ('c', (3, None)), ('a', (1, 'apple'))]
[('d', (None, 'dragonfruit')), ('b', (2, 'banana')), ('a', (1, 'apple'))]
[('d', (None, 'dragonfruit')), ('b', (2, 'banana')), ('c', (3, None)), ('a', (1, 'apple'))]


In [None]:
#coalesce(numPartitions): reduces the number of partitions.

"""
The coalesce() transformation reduces the number of partitions in an RDD. It is commonly used to optimize performance before saving to disk,
for example after a filter leaves many small partitions, or to avoid writing many small output files.

RDD.coalesce(numPartitions, shuffle=False)
numPartitions: the number of partitions you want.
shuffle (optional): if True, reshuffles the data for a more even distribution. The default is False (no shuffle; existing partitions are merged).


"""

data = spark.sparkContext.parallelize(range(1,20),6) #creates an RDD with 6 partitions

print(data.getNumPartitions())
print(data.glom().collect())

coalesce_rdd = data.coalesce(2)  # reduce to 2 partitions

print(coalesce_rdd.getNumPartitions())
print(coalesce_rdd.glom().collect())


6
[[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12], [13, 14, 15], [16, 17, 18, 19]]
2
[[1, 2, 3, 4, 5, 6, 7, 8, 9], [10, 11, 12, 13, 14, 15, 16, 17, 18, 19]]


In [None]:
#repartition(numPartitions): reshuffles data into the specified number of partitions.

"""
repartition(numPartitions) is a transformation that reshuffles the data to increase or decrease the number of partitions in an RDD.

It performs a full shuffle, which can be expensive, but it is useful for load balancing or for preparing data for
further operations such as joins or writes.

rdd.repartition(numPartitions)

When to use repartition():

When you want more parallelism by increasing the number of partitions
After a coalesce() call that reduced the number of partitions too much
Before a write, to distribute data evenly across output files

Because PySpark ships records between Python and the JVM in serialized batches, repartitioning a tiny dataset
like this one can leave the partitions very uneven (some of them empty), as in the output below.
"""

data = spark.sparkContext.parallelize(range(1,20), numSlices=2)# 2 partitions

print(data.getNumPartitions())
print(data.glom().collect())

repart_rdd = data.repartition(4)  # reshuffle into 4 partitions

print('\n', repart_rdd.getNumPartitions())
print(repart_rdd.glom().collect())

"""
⚠️ Note
If you're reducing partitions (e.g. from 10 to 2), prefer coalesce() over repartition(): it's usually more efficient because it avoids a full data shuffle.
"""


print('\n',repart_rdd.coalesce(2).glom().collect())



2
[[1, 2, 3, 4, 5, 6, 7, 8, 9], [10, 11, 12, 13, 14, 15, 16, 17, 18, 19]]

 4
[[], [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19], [], []]

 [[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19], []]


In [None]:
#pipe(command): pipes each partition through an external command.

"""
The pipe() transformation in Spark RDDs is used to run shell commands or external scripts on each RDD partition.
It allows piping partition data through an external process, often used when integrating with command-line tools or legacy systems.

📘 Syntax

rdd.pipe(command)

command: A string representing the shell command to run.
Each partition’s data is sent to the external command via standard input, and the command’s standard output becomes the new RDD.


"""

data = spark.sparkContext.parallelize([
    "hello this is a lines ",
    "this is new",
    "this is awesome" ], numSlices=2)

# grep runs once per partition; every input line here contains "this", so all of them are kept
pip_rdd = data.pipe("grep this")


print(pip_rdd.collect())

# ⚠️ Notes
#
# The command runs independently on each partition.
# The external tool must be installed and accessible on every executor node.
# Data is streamed via stdin/stdout, so it's line-oriented.
# It's not portable across platforms (Windows/Linux) or clusters without a consistent environment setup.

['hello this is a lines ', 'this is new', 'this is awesome']

In [None]:
#mapPartitions(func)	Applies a function to each partition.
"""

The mapPartitions() transformation applies a function to each partition of the RDD (not to each element like map()), which can be more efficient when working with large datasets or external connections like databases.

✅ Syntax
RDD.mapPartitions(f)
Why Use mapPartitions()?
Performance: Reduces function call overhead by applying once per partition.
Resource Sharing: Ideal when you need to open a DB connection or expensive resource once per partition, not for every record.

"""


data = spark.sparkContext.parallelize([1,2,3,4,5], numSlices=2)
print(data)


mp_rdd = data.mapPartitions(lambda x: [i*2 for i in x])

print(mp_rdd.collect())

print("\n using function:")
def double(partition):  # receives an iterator over one partition
    return [i*2 for i in partition]

mp_rdd2 = data.mapPartitions(double)  # pass the function itself

print('\n', mp_rdd2.collect())


ParallelCollectionRDD[782] at readRDDFromFile at PythonRDD.scala:289
[2, 4, 6, 8, 10]

 using function:

 [2, 4, 6, 8, 10]


In [None]:
#sample(withReplacement, fraction): samples a fraction of the data.

"""
📘 sample() in Spark RDDs
The sample() transformation returns a random sample of an RDD.

🧪 Syntax
RDD.sample(withReplacement, fraction, seed=None)
withReplacement (bool): can an element be selected more than once? (True = yes)
fraction (float): expected fraction of the dataset to sample (e.g., 0.1 = 10%); the actual sample size varies.
    With replacement, it is the expected number of times each element is chosen.
seed (int, optional): random seed for reproducibility

If withReplacement=True, some elements may appear multiple times.
If withReplacement=False, the result is a subset of the original data.
Useful for testing and prototyping on large datasets. For per-key (stratified) sampling, see sampleByKey().

Both withReplacement and fraction are required; calling sample() without them raises
TypeError: RDD.sample() missing 2 required positional arguments: 'withReplacement' and 'fraction'
"""


data = spark.sparkContext.parallelize([1,2,3,4,5])

sample_rdd = data.sample(withReplacement=False, fraction=0.5)

print(sample_rdd.collect())

sample_rdd1 = data.sample(withReplacement=True, fraction=0.6)

print(sample_rdd1.collect())

[1, 3, 4, 5]
[4, 4]


In [None]:
#cogroup
"""
The cogroup() transformation is used on two (or more) key-value RDDs. For each key, it groups the values from each RDD into a separate iterable (a ResultIterable; wrap it in list() to print its contents).
🧬 Syntax
rdd1.cogroup(rdd2)
Both rdd1 and rdd2 must be pair RDDs (i.e., RDDs of key-value tuples).
The result is an RDD of the form (key, (Iterable1, Iterable2)).

"""
rdd1 = spark.sparkContext.parallelize([("a", 1), ("b", 2), ("a", 3)])
rdd2 = spark.sparkContext.parallelize([("a", "apple"), ("b", "banana"), ("c", "cherry")])

cogrp_rdd = rdd1.cogroup(rdd2)


for key, (vals1, vals2) in cogrp_rdd.collect():
    print(f"{key}: {list(vals1)}, {list(vals2)}")



b: [2], ['banana']
c: [], ['cherry']
a: [1, 3], ['apple']


In [None]:
#aggregateByKey in Apache Spark (RDD)
"""
The aggregateByKey() transformation is used on pair RDDs (key-value pairs) to combine the values for each key. It is similar to reduceByKey(), but gives more control over:

How values are combined within a partition
How values are combined across partitions

The accumulator may also have a different type from the values, such as the (max, sum) tuple used below.

📌 Syntax (PySpark)
rdd.aggregateByKey(zeroValue, seqFunc, combFunc, numPartitions=None)

zeroValue: the initial accumulator for each key (applied in each partition).
seqFunc: merges a value into the accumulator within a partition.
combFunc: merges accumulators from different partitions.
(Scala uses the curried form rdd.aggregateByKey(zeroValue)(seqOp, combOp).)

"""

rdd = spark.sparkContext.parallelize([
    ("math", 80),
    ("math", 60),
    ("math", 90),
    ("science", 70),
    ("science", 85),
    ("science", 75)], 2)


# aggregateByKey to get (max, sum) per subject
zero_value = (0, 0)  # (max, sum); 0 is a safe starting max here only because all scores are non-negative

def seq_op(acc, value):
    return (max(acc[0], value), acc[1] + value)

def comb_op(acc1, acc2):
    return (max(acc1[0], acc2[0]), acc1[1] + acc2[1])

result = rdd.aggregateByKey(zero_value, seq_op, comb_op)  # PySpark form: all three arguments in one call

print(result.collect())


[('math', (90, 230)), ('science', (85, 230))]


In [None]:
#repartitionAndSortWithinPartitions
"""
The repartitionAndSortWithinPartitions() transformation repartitions a pair RDD with a partition function
and sorts the records by key within each resulting partition.

Syntax (PySpark)
rdd.repartitionAndSortWithinPartitions(numPartitions=None, partitionFunc=portable_hash, ascending=True, keyfunc=lambda x: x)
numPartitions: number of output partitions (optional).
partitionFunc: function that maps a key to a partition number (hash partitioning by default).
ascending: whether to sort in ascending order (default is True).
keyfunc: function applied to each key before comparing.

Note that sortByKey().partitionBy() is not a reliable substitute: partitionBy() shuffles the data again,
so the order produced by sortByKey() is not guaranteed to survive inside each partition.
"""

from pyspark.rdd import portable_hash

data = [("cat", 1), ("apple", 3), ("banana", 2), ("dog", 4)]
rdd = spark.sparkContext.parallelize(data)

# Hash-partition into 2 partitions and sort each partition by key in one step
partitioned_rdd = rdd.repartitionAndSortWithinPartitions(2, partitionFunc=portable_hash)

# Show results
def show_partition(index, iterator):
    yield f"Partition {index}: {list(iterator)}"

print(partitioned_rdd.mapPartitionsWithIndex(show_partition).collect())


["Partition 0: [('apple', 3), ('banana', 2), ('dog', 4)]", "Partition 1: [('cat', 1)]"]


In [36]:
#mapPartitions

"""
🔧 mapPartitions() in Spark — Syntax & Example
✅ What It Does:

mapPartitions() is a transformation that applies a function to each partition of the RDD rather than to each element (as map() does). It's efficient for batch processing and when initializing expensive resources (e.g., database connections) once per partition.

📌 Syntax:
rdd.mapPartitions(func)
func is a function that takes an iterator over a partition and returns an iterable (for example a list, or a generator that yields results).
🔍 Why Use mapPartitions()?
To reuse resources like database connections, sessions, or caches.
To optimize performance for large transformations where map() might be too fine-grained.
To batch process data partition-wise rather than row-by-row.

"""


rdd = spark.sparkContext.parallelize([1,2,3,4,5], numSlices=2)

def multiply(iterator):
  return [i*2 for i in iterator]

map_rdd = rdd.mapPartitions(multiply)

print(map_rdd.collect())

[2, 4, 6, 8, 10]


# ✅ RDD Actions

Actions trigger the execution of transformations and return results to the driver program or write data to external storage.


| Action                                | Description                                                                                              |
| ------------------------------------- | -------------------------------------------------------------------------------------------------------- |
| `collect()`                           | Returns all elements of the RDD to the driver as a list.                                                 |
| `count()`                             | Returns the number of elements in the RDD.                                                               |
| `first()`                             | Returns the first element of the RDD.                                                                    |
| `take(n)`                             | Returns the first `n` elements of the RDD.                                                               |
| `takeSample(withReplacement, num)`    | Returns a random sample of `num` elements.                                                               |
| `reduce(func)`                        | Aggregates the elements using the specified function.                                                    |
| `fold(zeroValue, op)`                 | Aggregates elements with a zero value and a function (Scala: `fold(zeroValue)(op)`).                     |
| `aggregate(zeroValue, seqOp, combOp)` | Aggregates elements using sequence and combine functions (Scala: `aggregate(zeroValue)(seqOp, combOp)`). |
| `foreach(func)`                       | Applies a function to each element (usually for side effects).                                           |
| `saveAsTextFile(path)`                | Saves the RDD as a text file.                                                                            |
| `saveAsSequenceFile(path)`            | Saves a pair RDD as a Hadoop SequenceFile.                                                               |
| `saveAsObjectFile(path)`              | Saves the RDD as serialized Java objects (Scala/Java; PySpark offers `saveAsPickleFile(path)` instead).  |
| `countByKey()`                        | Counts the number of elements for each key.                                                              |
| `foreachPartition(func)`              | Applies a function to each partition.                                                                    |

Sources: ([Medium][1], [Stack Overflow][2], [Medium][3], [Apache Spark][4])

[1]: https://medium.com/%40sujathamudadla1213/spark-transformations-and-actions-ff4b576cbef8 "Spark RDD Transformations and Actions. | by Sujatha Mudadla"
[2]: https://stackoverflow.com/questions/45908291/rdd-transformation-and-actions "RDD transformation and actions - apache spark - Stack Overflow"
[3]: https://medium.com/towardsdev/transformations-and-actions-in-spark-69d63adfdd0a "Transformations and Actions in spark : | by shubham mishra - Medium"
[4]: https://spark.apache.org/docs/latest/rdd-programming-guide.html "RDD Programming Guide - Spark 3.5.5 Documentation"


In [None]:
#collect

data = [1,2,3,4,5]

rdd = spark.sparkContext.parallelize(data)

print(rdd.collect())



[1, 2, 3, 4, 5]


In [None]:
#count

print(rdd.count())

5


In [None]:
#first
print(rdd.first())

1


In [None]:
#take
"""

The take(n) action returns the first n elements of the RDD as a list. It's useful when you want to quickly inspect a small portion of your dataset without collecting the entire RDD.

✅ Syntax
rdd.take(n)
n: Number of elements to return from the start of the RDD."""

print(rdd.take(3))

[1, 2, 3]


In [None]:
#takesample
"""
🎯 takeSample() in Spark RDD
The takeSample() action is used to randomly sample elements from an RDD.

✅ Syntax
RDD.takeSample(withReplacement, num, [seed])
withReplacement (bool): Whether to allow the same element to be chosen more than once.
num (int): Number of elements to sample.
seed (optional, int): Seed for random generator (for reproducibility).

"""
rdd = spark.sparkContext.parallelize([1,2,3,4,5])

print(rdd.takeSample(withReplacement=False, num=3))
print(rdd.takeSample(withReplacement=True, num=3))




[5, 4, 2]
[5, 2, 2]


In [None]:
#reduce

"""
The reduce() action in Apache Spark RDDs is used to aggregate the elements of an RDD using a binary function (a function that takes two arguments).

📘 Syntax

rdd.reduce(function)
The function must be commutative and associative so that it can be computed in parallel correctly.
It returns a single value, not an RDD.

"""

rdd = spark.sparkContext.parallelize([1,2,3,4,5])

total = rdd.reduce(lambda x, y: x + y)  # sum of all elements (a plain value, not an RDD)

print(total)


15


In [None]:
#fold

"""
The fold() action aggregates elements in an RDD using a specified function and a zero value (an initial value).

🧪 Syntax
rdd.fold(zeroValue, func)
zeroValue: The initial value used for aggregation in each partition and also across partitions.
func: A function that combines two values.
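🔔 fold() is similar to reduce(), but with a zero value that is used in every partition (and once more when the partition results are merged), so it should be the identity element of func: 0 for addition, 1 for multiplication.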

The RDD is split into partitions.
The function (a + b) is applied in each partition starting with the zero value 0.
The results from each partition are then folded together again using the same function and the zero value.

"""
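from pyspark.sql import SparkSession

# getOrCreate() returns the session that is already running, so no new SparkContext is started here
spark = SparkSession.builder.appName("fold_example").getOrCreate()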

rdd = spark.sparkContext.parallelize([1,2,3,4,5])

fold_rdd = rdd.fold(0, lambda x,y: x+y)

print(fold_rdd)

fold_rdd1 = rdd.fold(1, lambda x,y: x*y)  # product: 1*2*3*4*5 = 120

print(fold_rdd1)



15
120


In [None]:
#aggregate
"""
The aggregate() function in Apache Spark RDD is a powerful action used to compute results across RDD elements using different logic for local aggregation and global aggregation.

🧠 What is aggregate()?

rdd.aggregate(zeroValue, seqOp, combOp)
Parameters:
zeroValue: The initial value for the aggregation (like 0 for sum or an empty list).
seqOp: Function to accumulate data within a partition (local aggregation).
combOp: Function to combine results across partitions (global aggregation).

"""

# Create an RDD of numbers
rdd = spark.sparkContext.parallelize([1, 2, 3, 4, 5])

# zeroValue is (sum, count)
zero_value = (0, 0)

# seqOp: add current value to sum and increment count
seq_op = lambda acc, value: (acc[0] + value, acc[1] + 1)

# combOp: combine two tuples from different partitions
comb_op = lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])

# Run aggregate
result = rdd.aggregate(zero_value, seq_op, comb_op)

print("Sum:", result[0])
print("Count:", result[1])


Sum: 15
Count: 5


In [None]:
#foreach

"""
The foreach() action applies a function to each element in the RDD, but does not return a value to the driver. It's typically used for side effects, like writing to databases, logging, or updating external systems.

🧪 Syntax
rdd.foreach(function)
function: A function to apply to each element of the RDD.
Unlike map(), foreach() does not return a new RDD.


"""


data = spark.sparkContext.parallelize([1,2,3,4,5])

# foreach runs on the executors, so print() writes to executor stdout, not to the notebook
data.foreach(lambda x: print(f"Value: {x}"))  # no visible output in the notebook

# To display the values, bring them back to the driver with collect()
for item in data.collect():
    print(f"Value: {item}")


# foreach after a transformation: again executed on the executors, so nothing is shown here
transformed_data = data.map(lambda x: x * 2)

transformed_data.foreach(print)


Value: 1
Value: 2
Value: 3
Value: 4
Value: 5


In [None]:
#saveAsTextFile
"""
The saveAsTextFile() action writes the contents of an RDD to a text file (or directory) in the distributed file system
(like HDFS, S3, or local FS). Each element of the RDD is written as a line in the output text file.

🧬 Syntax
rdd.saveAsTextFile(path)
path: The output path (directory) where the files will be stored.
Creates a directory with multiple part files (one per partition).
📂 Output Structure
For an output path such as output/fruit_names/, Spark creates files like:

output/
└── fruit_names/
    ├── part-00000
    ├── part-00001
    └── _SUCCESS
Each part-* file contains a slice of the RDD's data, one item per line.
"""
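#dataload

data = """The saveAsTextFile() action writes the contents of an RDD to a text file (or directory) in the distributed file system
(like HDFS, S3, or local FS). Each element of the RDD is written as a line in the output text file.

🧬 Syntax
rdd.saveAsTextFile(path)
path: The output path (directory) where the files will be stored.
Creates a directory with multiple part files (one per partition).

"""

# Parallelize the lines; parallelize(data) on a plain string would create one element per character
rdd = spark.sparkContext.parallelize(data.splitlines())

# Word count: split each line into words (split() drops empty strings), pair each word with 1, sum per word
words = rdd.flatMap(lambda line: line.split())

pairs = words.map(lambda word: (word, 1))

counts = pairs.reduceByKey(lambda x, y: x + y)

# coalesce(1) writes a single part file; the path is a directory and must not already exist
counts.coalesce(1).saveAsTextFile("/content/sample_data/output.txt")
counts.getNumPartitions()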

2

In [34]:
#countbykey

"""
countByKey() is an action used with key-value RDDs (RDDs of tuples) to count the number of occurrences for each key.

✅ Syntax
RDD.countByKey()
Returns a dictionary (dict) where each key maps to its count.
Only works on RDDs of type (K, V) (i.e., key-value pairs).

"""

data = [("a", 1), ("b", 1), ("a", 1), ("b", 1), ("a", 1)]
rdd = spark.sparkContext.parallelize(data)

count_by_key = rdd.countByKey()

print(count_by_key)

# countByValue() counts each distinct element, here each whole (key, value) tuple
count_by_value = rdd.countByValue()
print(count_by_value)

defaultdict(<class 'int'>, {'a': 3, 'b': 2})
defaultdict(<class 'int'>, {('a', 1): 3, ('b', 1): 2})


In [47]:
#foreachPartition

"""
The foreachPartition() action applies a function to each partition of the RDD, rather than each element like foreach().

It’s useful for efficient operations, such as opening a database connection once per partition instead of once per element.

✅ Syntax
rdd.foreachPartition(func)
func: A function that takes an iterator and processes all elements in a single partition.

"""

rdd = spark.sparkContext.parallelize([1,2,3,4,5,6], 3)

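def process_partition(partition):
  # partition is an iterator over the elements of one partition
  print("process new partition")
  for item in partition:
    print(item)

# Both calls print on the executors, so the output does not appear in the notebook
rdd.foreachPartition(process_partition)
rdd.foreachPartition(lambda partition: print(list(partition)))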


In [49]:
#treeAggregate

"""
treeAggregate() is an optimized version of the aggregate() action in Spark, used to aggregate data in parallel using a tree pattern,
which reduces communication cost and improves performance.

It is especially useful for large-scale aggregations, as it avoids bottlenecks that can occur when all partitions send their results directly
to the driver.

🧮 Syntax
rdd.treeAggregate(zeroValue, seqOp, combOp, depth=2)
zeroValue: Initial value used for both seqOp and combOp.
seqOp: Function to merge an element from the RDD into the accumulator.
combOp: Function to merge two accumulators.
depth: Tree depth used to control the number of aggregation stages.

"""

rdd = spark.sparkContext.parallelize([1,2,3,4,5])

# Use treeAggregate to compute (sum, count)
zero_value = (0, 0)

# seqOp: Add value to running total and count
seq_op = lambda acc, x: (acc[0] + x, acc[1] + 1)

# combOp: Merge two accumulators
comb_op = lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])

result = rdd.treeAggregate(zero_value, seq_op, comb_op)

print(f"Sum: {result[0]}, Count: {result[1]}")



Sum: 15, Count: 5


TreeReduce

treeReduce() is an optimized version of reduce() in Apache Spark that performs reduction in a tree-like hierarchical fashion, which can improve performance for large distributed datasets.

| Feature     | `reduce()`                                           | `treeReduce()`                                                    |
| ----------- | ---------------------------------------------------- | ----------------------------------------------------------------- |
| Execution   | Every partition's result is merged on the driver     | Partial results are merged in intermediate stages (tree)          |
| Performance | Driver can become a bottleneck with many partitions  | Less load on the driver; extra stages add overhead on small data  |
| Usage       | Basic reductions                                     | Many partitions, large partial results, or expensive merges       |
| Tuning      | No control over depth                                | You can set the `depth` of the tree                               |


In [52]:
"""
✅ Syntax
rdd.treeReduce(func, depth=2)
func: A binary function used to reduce elements.
depth: Optional. Controls the tree's depth (default is 2). A larger depth adds intermediate reduction levels, so each level merges fewer partial results.
💡 When to Use treeReduce()
When working with large RDDs across many partitions.
When merging all partition results on the driver becomes a bottleneck.
For costly aggregation functions or large partial results, where reducing the load on the driver is critical.

"""
rdd = spark.sparkContext.parallelize([1,2,3,4,5])
print(rdd.treeReduce(lambda x,y: x+y))


15


In [53]:
#countApprox
"Return an approximate count of elements; if the job does not finish within `timeout` milliseconds, the returned count may be incomplete."

print(rdd.countApprox(timeout=10, confidence=0.95))  # timeout is in milliseconds


5


In [54]:
#countapproxdistinct
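"Return an approximate number of distinct elements, estimated with HyperLogLog; relativeSD is the target relative accuracy (smaller is more accurate but uses more memory, and it must be greater than 0.000017)."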
print(rdd.countApproxDistinct(relativeSD=0.1))


5


In [59]:
print(rdd.first())  # first element
print(rdd.min())    # smallest element
print(rdd.max())    # largest element
print(rdd.top(2))   # two largest elements, in descending order

1
1
5
[5, 4]


ACTIONS:

RDD actions are operations that return non-RDD values. Because RDDs are lazy, Spark does not execute transformation functions until an action is called. Every action therefore triggers the pending transformations and returns its result to the driver program (or, for actions such as saveAsTextFile() and foreach(), writes the data out or applies a side effect instead).