**Table of contents**<a id='toc0_'></a>    
- [mapPartitions(func)](#toc1_)    
- [map(func)](#toc2_)    
- [flatMap(func)](#toc3_)    
- [filter(func)](#toc4_)    
- [distinct([numPartitions])      ](#toc5_)    
- [Set Operations](#toc6_)    
  - [union(otherRDD)](#toc6_1_)    
  - [intersection(otherRDD)](#toc6_2_)    
  - [subtract(otherRDD)](#toc6_3_)    
  - [cartesian(otherRDD)](#toc6_4_)    
- [groupByKey-NumPartitions](#toc7_)    
- [reduceByKey(func, [numPartitions])     ](#toc8_)    
- [aggregateByKey(zeroValue)(seqFunc, combFunc, [numPartitions])     ](#toc9_)    
- [sortByKey(ascending=True, numPartitions=None, keyfunc=<function>)](#toc10_)    
- [join(otherRDD, [numPartitions])    ](#toc11_)    
- [cogroup(otherRDD, [numPartitions])    ](#toc12_)    
- [coalesce(numPartitions, shuffle=False)](#toc13_)    
- [repartition(numPartitions)](#toc14_)    
- [sample(withReplacement, fraction, seed=None)](#toc15_)    
- [randomSplit(weights, seed=None)](#toc16_)    

<!-- vscode-jupyter-toc-config
	numbering=false
	anchor=true
	flat=false
	minLevel=1
	maxLevel=6
	/vscode-jupyter-toc-config -->
<!-- THIS CELL WILL BE REPLACED ON TOC UPDATE. DO NOT WRITE YOUR TEXT IN THIS CELL -->

In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("RDD-Examples").getOrCreate()
sc = spark.sparkContext

# <a id='toc1_'></a>[mapPartitions(func)](#toc0_)

`mapPartitions` in Spark applies a function to each partition of an RDD or DataFrame. 
- It takes an iterable of rows as input and returns an iterable of processed rows. 
- This method is efficient for operations that benefit from accessing multiple rows at once, such as batch processing or adding new columns. 
- It reduces overhead by processing data in bulk rather than row-by-row, allowing for custom logic that leverages the entire partition's data.

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import Row

# Create a Spark session
spark = SparkSession.builder.appName("Example").getOrCreate()

# Assuming 'auto_df' is your original DataFrame
# For this example, let's create a sample DataFrame
data = [(1970, 18.0), (1971, 15.0), (1972, 16.0)]
columns = ["modelyear", "mpg"]
sample_df = spark.createDataFrame(data, columns)

# Define the function to process each partition
def add_processed_info(rows):
    processed_rows = []
    for row in rows:
        # Process the row to create a new value
        new_info = f"Model Year: {row.modelyear}, MPG: {row.mpg}"
        # Append the new Row with the original data and the new info
        processed_rows.append(Row(modelyear=row.modelyear, mpg=row.mpg, processed_info=new_info))
    return processed_rows

# Apply mapPartitions to add the new column
new_rdd = sample_df.rdd.mapPartitions(add_processed_info)

# Convert the RDD back to a DataFrame
df_with_new_column = spark.createDataFrame(new_rdd)

# Show the new DataFrame with the added column
df_with_new_column.show(truncate=False)


In [None]:
# Create an RDD with 10 elements
rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], 3)

# Define a function that takes an iterator of elements and returns the square of each element
def square_partition(iter):
    for x in iter:
        yield x ** 2

# Apply the function to each partition of the RDD using mapPartitions
squared_rdd = rdd.mapPartitions(square_partition)

# Print the resulting RDD
print(squared_rdd.glom().collect())  # Prints [[1, 4, 9], [16, 25, 36], [49, 64, 81, 100]]

print(squared_rdd.collect())  # Prints [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]


# <a id='toc2_'></a>[map(func)](#toc0_)

In [4]:
rdd = sc.parallelize([1, 2, 3, 4])
result = rdd.map(lambda x: x * 2)
print(result.collect())  # Output: [2, 4, 6, 8]


[2, 4, 6, 8]


In [None]:
# Map transformation: Convert name to uppercase
mapped_rdd = rdd_1.map(lambda x: (x[0].upper(), x[1]))

result = mapped_rdd.collect()
print("rdd with uppercease name: ", result)

# <a id='toc3_'></a>[flatMap(func)](#toc0_)

In [5]:
rdd = sc.parallelize([1, 2, 3])
result = rdd.flatMap(lambda x: (x, x*2))
print(result.collect())  # Output: [1, 2, 2, 4, 3, 6]

[1, 2, 2, 4, 3, 6]


In [None]:
rdd9 = rdd5.flatMap(lambda x: x.split(" "))
rdd9.collect()

# <a id='toc4_'></a>[filter(func)](#toc0_)

In [6]:
rdd = sc.parallelize([1, 2, 3, 4, 5])
result = rdd.filter(lambda x: x % 2 == 0)
print(result.collect())  # Output: [2, 4]


[2, 4]


In [None]:
# Filter transformation: Filter records where age is greater than 30
filtered_rdd = rdd_1.filter(lambda x: x[1] > 30)
filtered_rdd.collect()

# <a id='toc5_'></a>[distinct([numPartitions])](#toc0_)       [&#8593;](#toc0_)

In [7]:
rdd = sc.parallelize([1, 2, 3, 3, 4])
result = rdd.distinct()
print(result.collect())  # Output: [1, 2, 3, 4]


[4, 1, 2, 3]


# <a id='toc6_'></a>[Set Operations](#toc0_)

In [6]:
import numpy as np

list_1=np.random.randint(0,10,3)
rdd_1=sc.parallelize(list_1)

list_2=np.random.randint(10,20,3)
rdd_2=sc.parallelize(list_2)



rdd_1: [7, 5, 1]
rdd_2: [18, 16, 11]


## <a id='toc6_1_'></a>[union(otherRDD)](#toc0_)

In [8]:
rdd1 = sc.parallelize([1, 2, 3])
rdd2 = sc.parallelize([3, 4, 5])
result = rdd1.union(rdd2)
print(result.collect())  # Output: [1, 2, 3, 3, 4, 5]


[1, 2, 3, 3, 4, 5]


In [8]:
print("rdd_1:",rdd_1.collect())
print("rdd_2:",rdd_2.collect())

(rdd_1+rdd_2).collect()

rdd_1: [7, 5, 1]
rdd_2: [18, 16, 11]


[7, 5, 1, 18, 16, 11]

## <a id='toc6_2_'></a>[intersection(otherRDD)](#toc0_)


In [9]:
rdd1 = sc.parallelize([1, 2, 3])
rdd2 = sc.parallelize([2, 3, 4])
result = rdd1.intersection(rdd2)
print(result.collect())  # Output: [2, 3]


[2, 3]


## <a id='toc6_3_'></a>[subtract(otherRDD)](#toc0_)

In [10]:
rdd1 = sc.parallelize([1, 2, 3])
rdd2 = sc.parallelize([2, 3, 4])
result = rdd1.subtract(rdd2)
print(result.collect())  # Output: [1]


[1]


## <a id='toc6_4_'></a>[cartesian(otherRDD)](#toc0_)

In [11]:
rdd1 = sc.parallelize([1, 2])
rdd2 = sc.parallelize([3, 4])
result = rdd1.cartesian(rdd2)
print(result.collect())  # Output: [(1, 3), (1, 4), (2, 3), (2, 4)]


[(1, 3), (1, 4), (2, 3), (2, 4)]


In [11]:
print("rdd_1:",rdd_1.collect())
print("rdd_2:",rdd_2.collect())

rdd_1.cartesian(rdd_2).collect()

rdd_1: [7, 5, 1]
rdd_2: [18, 16, 11]


[(7, 18),
 (7, 16),
 (7, 11),
 (5, 18),
 (5, 16),
 (5, 11),
 (1, 18),
 (1, 16),
 (1, 11)]

# <a id='toc7_'></a>[groupByKey-NumPartitions](#toc0_)

In [12]:
rdd = sc.parallelize([('a', 1), ('b', 2), ('a', 3)])
result = rdd.groupByKey().mapValues(list)
print(result.collect())  # Output: [('a', [1, 3]), ('b', [2])]


[('b', [2]), ('a', [1, 3])]


In [None]:
# Example of groupByKey
data = [(1, 2), (2, 3), (1, 4), (2, 1),(1, 2), (2, 3), (1, 4), (2, 1)]
rdd = sc.parallelize(data)
result_data = rdd.groupByKey().collect()


# Function to convert ResultIterable to list and print
def print_result_iterable(result_iterable):
    return list(result_iterable)

# Print the contents of each iterable
for key, iterable in result_data:
    print(f"Key: {key}, Iterable Contents: {print_result_iterable(iterable)}")

# Key: 1, Iterable Contents: [2, 4, 2, 4]
# Key: 2, Iterable Contents: [3, 1, 3, 1]

result_data

# [(1, <pyspark.resultiterable.ResultIterable at 0x1e932d85a60>),
#  (2, <pyspark.resultiterable.ResultIterable at 0x1e931c6d8b0>)]


In [None]:
wordPairsRDD.groupByKey().mapValues(len).collect()

# [('one', 1), ('two', 2), ('three', 3)]

In [None]:
wordPairsRDD.groupByKey().mapValues(list).collect()

# [('one', [1]), ('two', [1, 1]), ('three', [1, 1, 1])]

# <a id='toc8_'></a>[reduceByKey(func, [numPartitions])](#toc0_)      [&#8593;](#toc0_)



In [13]:
rdd = sc.parallelize([('a', 1), ('b', 2), ('a', 3)])
result = rdd.reduceByKey(lambda a, b: a + b)
print(result.collect())  # Output: [('a', 4), ('b', 2)]


[('b', 2), ('a', 4)]


In [None]:
# ReduceByKey transformation: Calculate the total age for each name
reduced_rdd = rdd_1.reduceByKey(lambda x, y: x + y)
reduced_rdd.collect()

In [None]:
# Example of reduceByKey to sum values per key
data = [(1, 2), (2, 3), (1, 4), (2, 1), (1, 2), (2, 3), (1, 4), (2, 1)]
rdd = sc.parallelize(data)
result = rdd.reduceByKey(lambda x, y: x + y).collect()
result
# Output:[(1, 12), (2, 8)]


In [None]:
'''
ReduceByKey: Data are combined at each partition and only one output for each key at each paritiion to send 
over the network. It basically does everything ay the map side
'''


words = ["one", "two", "two", "three", "three", "three"]

wordPairsRDD  =sc.parallelize(words).map(lambda a: (a,1))

rdd1 = wordPairsRDD.reduceByKey(lambda a,b:a+b).collect() # [('one', 1), ('two', 2), ('three', 3)]
rdd2 = wordPairsRDD.reduceByKey(add).collect() #[('one', 1), ('two', 2), ('three', 3)]

# <a id='toc9_'></a>[aggregateByKey(zeroValue)(seqFunc, combFunc, [numPartitions])](#toc0_)      [&#8593;](#toc0_)



In [15]:
rdd = sc.parallelize([('a', 1), ('b', 2), ('a', 3)])
zero_value = 0
seq_func = lambda acc, value: acc + value
comb_func = lambda acc1, acc2: acc1 + acc2
result = rdd.aggregateByKey(zero_value, seq_func, comb_func)
print(result.collect())  # Output: [('a', 4), ('b', 2)]


[('b', 2), ('a', 4)]


# <a id='toc10_'></a>[sortByKey(ascending=True, numPartitions=None, keyfunc=<function>)](#toc0_)



In [16]:
rdd = sc.parallelize([('b', 2), ('a', 1), ('c', 3)])
result = rdd.sortByKey()
print(result.collect())  # Output: [('a', 1), ('b', 2), ('c', 3)]


[('a', 1), ('b', 2), ('c', 3)]


In [None]:
# SortBy transformation: Sort the RDD by age in descending order
sorted_rdd = rdd_1.sortBy(lambda x: x[1], ascending=False)
sorted_rdd.collect()

# <a id='toc11_'></a>[join(otherRDD, [numPartitions])](#toc0_)     [&#8593;](#toc0_)

In [17]:
rdd1 = sc.parallelize([('a', 1), ('b', 2)])
rdd2 = sc.parallelize([('a', 3), ('b', 4)])
result = rdd1.join(rdd2)
print(result.collect())  # Output: [('a', (1, 3)), ('b', (2, 4))]


[('a', (1, 3)), ('b', (2, 4))]


# <a id='toc12_'></a>[cogroup(otherRDD, [numPartitions])](#toc0_)     [&#8593;](#toc0_)



In [18]:
rdd1 = sc.parallelize([('a', 1), ('b', 2)])
rdd2 = sc.parallelize([('a', 3), ('a', 4)])
result = rdd1.cogroup(rdd2).mapValues(lambda x: (list(x[0]), list(x[1])))
print(result.collect())  # Output: [('a', ([1], [3, 4])), ('b', ([2], []))]


[('a', ([1], [3, 4])), ('b', ([2], []))]


# <a id='toc13_'></a>[coalesce(numPartitions, shuffle=False)](#toc0_)



In [20]:
rdd = sc.parallelize([1, 2, 3, 4, 5, 6], 3)
result = rdd.coalesce(2)
print(result.getNumPartitions())  # Output: 2


2


# <a id='toc14_'></a>[repartition(numPartitions)](#toc0_)



In [21]:
rdd = sc.parallelize([1, 2, 3, 4, 5, 6], 2)
result = rdd.repartition(3)
print(result.getNumPartitions())  # Output: 3


3


# <a id='toc15_'></a>[sample(withReplacement, fraction, seed=None)](#toc0_)



In [22]:
rdd = sc.parallelize([1, 2, 3, 4, 5])
result = rdd.sample(False, 0.4, 42)
print(result.collect())  # Output: a random subset of elements


[1]


# <a id='toc16_'></a>[randomSplit(weights, seed=None)](#toc0_)



In [23]:
rdd = sc.parallelize([1, 2, 3, 4, 5])
splits = rdd.randomSplit([0.5, 0.5], 42)
print([split.collect() for split in splits])  # Output: two random subsets


[[1], [2, 3, 4, 5]]
