In [1]:
MASTERNODE_PRIVATE_IP="192.168.2.30"

## Setting up Spark Session / Context

In [2]:
from pyspark.sql import SparkSession

# New API
spark_session = SparkSession.builder\
        .master(f"spark://{MASTERNODE_PRIVATE_IP}:7077") \
        .appName("Lecture1_Example3_RDD_function_examples")\
        .config("spark.dynamicAllocation.enabled", True)\
        .config("spark.dynamicAllocation.shuffleTracking.enabled",True)\
        .config("spark.shuffle.service.enabled", False)\
        .config("spark.dynamicAllocation.executorIdleTimeout","30s")\
        .config("spark.executor.cores", 8)\
        .config("spark.driver.port",9999)\
        .config("spark.blockManager.port",10005)\
        .getOrCreate()


# Old API (RDD)
spark_context = spark_session.sparkContext
spark_context.setLogLevel("ERROR")

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/02/10 07:09:23 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


<img align=left src="images/pyspark-rdd.svg" width=750 height=750 />

---

## 1. Using getNumPartitions
<img align=left src="images/pyspark-getNumPartitions.svg" width=500 height=500 />

In [3]:
# getNumPartitions
rdd = spark_context.parallelize(range(100_000_000), 16)
rdd.getNumPartitions()

16

---


## 2. Using map & mapValues:
<img align=left src="images/pyspark-map.svg" width=475 height=500/>
<img align=right src="images/pyspark-mapValues.svg" width=475 height=500/>

In [4]:
x = spark_context.parallelize([("a", ["apple", "banana", "lemon"]), ("b", ["grapes"])])
def f(x): return x[0] + ":" + "-".join(x[1])

# map
print("Result of map:", x.map(f).collect())

# mapValues - example 1
def f(x): return len(x)
print("Result of mapValues (example 1):", x.mapValues(f).collect())

# mapValues - example 2
def f(x): return "-".join(x)
print("Result of mapValues (example 2):", x.mapValues(f).collect())

                                                                                

Result of map: ['a:apple-banana-lemon', 'b:grapes']
Result of mapValues (example 1): [('a', 3), ('b', 1)]
Result of mapValues (example 2): [('a', 'apple-banana-lemon'), ('b', 'grapes')]


---

## 3. Using flatMap & flatMapValues:
<img align=left src="images/pyspark-flatMap.svg" width=475 height=475 />
<img align=left src="images/pyspark-flatMapValues.svg" width=475 height=475 />

In [5]:
# flatMap
x = spark_context.parallelize([1, 2, 3, 4, 5, 6])
print("Result of flatMap:", x.flatMap(lambda x: (x, 100*x, x**2)))

# flatMapValues
x = spark_context.parallelize([("a", [1, 2, 3]), ("b", [5, 6])])
print("Result of flatMapValues:", x.flatMapValues(lambda x: [i+2 for i in x]).collect())

Result of flatMap: PythonRDD[6] at RDD at PythonRDD.scala:58
Result of flatMapValues: [('a', 3), ('a', 4), ('a', 5), ('b', 7), ('b', 8)]


---

## 4. Using mapPartitions & mapPartitionsWithIndex:
<img align=left src="images/pyspark-mapPartition.svg" width=475 height=475/>
<img align=left src="images/pyspark-mapPartitionWithIndex.svg" width=475 height=475/>

In [6]:
# rdd = spark_context.parallelize(range(100_000_000), 16)

# mapPartitions
def f(iterator): yield sum(iterator)

rdd_2 = rdd.mapPartitions(f)
print(rdd_2.glom().collect())  # glom() flattens elements on the same partition


# mapPartitionsWithIndex
def f(partitionIndex, iterator): yield (partitionIndex,sum(iterator))

rdd_2 = rdd.mapPartitionsWithIndex(f)
print(rdd_2.glom().collect())  # glom() flattens elements on the same partition

                                                                                

[[19531246875000], [58593746875000], [97656246875000], [136718746875000], [175781246875000], [214843746875000], [253906246875000], [292968746875000], [332031246875000], [371093746875000], [410156246875000], [449218746875000], [488281246875000], [527343746875000], [566406246875000], [605468746875000]]
[[(0, 19531246875000)], [(1, 58593746875000)], [(2, 97656246875000)], [(3, 136718746875000)], [(4, 175781246875000)], [(5, 214843746875000)], [(6, 253906246875000)], [(7, 292968746875000)], [(8, 332031246875000)], [(9, 371093746875000)], [(10, 410156246875000)], [(11, 449218746875000)], [(12, 488281246875000)], [(13, 527343746875000)], [(14, 566406246875000)], [(15, 605468746875000)]]


                                                                                

---

## 5. Using filter:
<img align=left src="images/pyspark-filter.svg" width=500 height=500/>

In [7]:
# filter
rdd_2 = rdd.filter(lambda x: x%2 == 1 and x < 20)
print("Result of filter:", rdd_2.collect())



Result of filter: [1, 3, 5, 7, 9, 11, 13, 15, 17, 19]


                                                                                

---


## 6. Using distinct:
<img align=left src="images/pyspark-distinct.svg" width=500 height=500/>

In [8]:
# distinct
x = spark_context.parallelize(["A","A","B"])
y = x.distinct()
print("Original RDD:", x.collect())
print("Distinct RDD:", y.collect())

Original RDD: ['A', 'A', 'B']




Distinct RDD: ['B', 'A']


                                                                                

In [9]:
spark_session.stop()