**Table of contents**<a id='toc0_'></a>    
- [map(func)](#toc1_)    
- [flatMap(func)](#toc2_)    
- [filter(func)](#toc3_)    
- [distinct([numPartitions])](#toc4_)    
- [union(otherRDD)](#toc5_)    
- [intersection(otherRDD)](#toc6_)    
- [subtract(otherRDD)](#toc7_)    
- [cartesian(otherRDD)](#toc8_)    
- [groupByKey([numPartitions])](#toc9_)    
- [reduceByKey(func, [numPartitions])](#toc10_)    
- [aggregateByKey(zeroValue)(seqFunc, combFunc, [numPartitions])](#toc11_)    

<!-- vscode-jupyter-toc-config
	numbering=false
	anchor=true
	flat=false
	minLevel=1
	maxLevel=6
	/vscode-jupyter-toc-config -->
<!-- THIS CELL WILL BE REPLACED ON TOC UPDATE. DO NOT WRITE YOUR TEXT IN THIS CELL -->

In [3]:
# Build (or reuse) the SparkSession for this notebook and expose its
# SparkContext as `sc`; every RDD example below creates its RDDs through `sc`.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("RDD-Examples")
    .getOrCreate()
)
sc = spark.sparkContext

# <a id='toc1_'></a>[map(func)](#toc0_)

In [4]:
# map: apply a function to every element, producing exactly one output per input.
rdd = sc.parallelize(list(range(1, 5)))
result = rdd.map(lambda n: 2 * n)
print(result.collect())  # Output: [2, 4, 6, 8]


[2, 4, 6, 8]


# <a id='toc2_'></a>[flatMap(func)](#toc0_)

In [5]:
# flatMap: each input element maps to an iterable whose items are flattened
# into the resulting RDD (here every n yields n and its double).
rdd = sc.parallelize(list(range(1, 4)))
result = rdd.flatMap(lambda n: (n, n * 2))
print(result.collect())  # Output: [1, 2, 2, 4, 3, 6]

[1, 2, 2, 4, 3, 6]


# <a id='toc3_'></a>[filter(func)](#toc0_)

In [6]:
# filter: keep only the elements for which the predicate is true (even numbers).
rdd = sc.parallelize(list(range(1, 6)))
result = rdd.filter(lambda n: not n % 2)
print(result.collect())  # Output: [2, 4]


[2, 4]


# <a id='toc4_'></a>[distinct([numPartitions])](#toc0_) [&#8593;](#toc0_)

In [7]:
# distinct: drop duplicate elements. distinct() shuffles data across
# partitions, so the order of the collected elements is NOT guaranteed;
# sort the collected list if a stable order is needed.
rdd = sc.parallelize([1, 2, 3, 3, 4])
result = rdd.distinct()
print(result.collect())  # Output (order may vary): the elements 1, 2, 3, 4 -- e.g. [4, 1, 2, 3]


[4, 1, 2, 3]


# <a id='toc5_'></a>[union(otherRDD)](#toc0_)

In [8]:
# union: concatenate two RDDs; duplicates are kept (3 appears twice).
rdd1, rdd2 = sc.parallelize([1, 2, 3]), sc.parallelize([3, 4, 5])
result = rdd1.union(rdd2)
print(result.collect())  # Output: [1, 2, 3, 3, 4, 5]


[1, 2, 3, 3, 4, 5]


# <a id='toc6_'></a>[intersection(otherRDD)](#toc0_)


In [9]:
# intersection: elements present in both RDDs. The result contains no
# duplicates, and because the operation shuffles data the order of the
# collected elements is NOT guaranteed.
rdd1 = sc.parallelize([1, 2, 3])
rdd2 = sc.parallelize([2, 3, 4])
result = rdd1.intersection(rdd2)
print(result.collect())  # Output (order may vary): [2, 3]


[2, 3]


# <a id='toc7_'></a>[subtract(otherRDD)](#toc0_)

In [10]:
# subtract: elements of rdd1 that do not appear in rdd2.
rdd1, rdd2 = sc.parallelize([1, 2, 3]), sc.parallelize([2, 3, 4])
result = rdd1.subtract(rdd2)
print(result.collect())  # Output: [1]


[1]


# <a id='toc8_'></a>[cartesian(otherRDD)](#toc0_)

In [11]:
# cartesian: every (a, b) pair with a from rdd1 and b from rdd2.
rdd1, rdd2 = sc.parallelize([1, 2]), sc.parallelize([3, 4])
result = rdd1.cartesian(rdd2)
print(result.collect())  # Output: [(1, 3), (1, 4), (2, 3), (2, 4)]


[(1, 3), (1, 4), (2, 3), (2, 4)]


# <a id='toc9_'></a>[groupByKey([numPartitions])](#toc0_)



In [12]:
# groupByKey: gather all values sharing a key into one iterable; mapValues(list)
# turns each iterable into a list for printing. The shuffle means neither the
# order of keys nor the order of values within a group is guaranteed.
rdd = sc.parallelize([('a', 1), ('b', 2), ('a', 3)])
result = rdd.groupByKey().mapValues(list)
print(result.collect())  # Output (order may vary): e.g. [('b', [2]), ('a', [1, 3])]


[('b', [2]), ('a', [1, 3])]


# <a id='toc10_'></a>[reduceByKey(func, [numPartitions])](#toc0_)



In [13]:
# reduceByKey: merge the values of each key with the given binary function
# (here, a sum). Results come out of a shuffle, so key order is NOT guaranteed.
rdd = sc.parallelize([('a', 1), ('b', 2), ('a', 3)])
result = rdd.reduceByKey(lambda a, b: a + b)
print(result.collect())  # Output (order may vary): e.g. [('b', 2), ('a', 4)]


[('b', 2), ('a', 4)]


# <a id='toc11_'></a>[aggregateByKey(zeroValue, seqFunc, combFunc, [numPartitions])](#toc0_)



In [15]:
# aggregateByKey: per-key aggregation starting from zero_value. seq_func folds
# one value into a partition-local accumulator; comb_func merges accumulators
# from different partitions. With these functions it is a per-key sum.
# Key order in the collected result is NOT guaranteed (shuffle).
rdd = sc.parallelize([('a', 1), ('b', 2), ('a', 3)])
zero_value = 0


def seq_func(acc, value):
    """Fold a single value into a partition-local accumulator."""
    return acc + value


def comb_func(acc1, acc2):
    """Merge two accumulators produced by different partitions."""
    return acc1 + acc2


result = rdd.aggregateByKey(zero_value, seq_func, comb_func)
print(result.collect())  # Output (order may vary): e.g. [('b', 2), ('a', 4)]


[('b', 2), ('a', 4)]


# sortByKey(ascending=True, numPartitions=None, keyfunc=lambda x: x)



In [16]:
# sortByKey: order (key, value) pairs by key; ascending is the default.
rdd = sc.parallelize([('b', 2), ('a', 1), ('c', 3)])
result = rdd.sortByKey(ascending=True)
print(result.collect())  # Output: [('a', 1), ('b', 2), ('c', 3)]


[('a', 1), ('b', 2), ('c', 3)]
