## RDD Tranformation & Actions

# tranformations Examples

# creating Spark Session

In [32]:
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("RDD Operations").getOrCreate()
sc = spark.sparkContext


âœ… 1. map()

Apply a function to each element.

In [34]:
rdd = sc.parallelize([1,2,3,4])
result = rdd.map(lambda x: x + 2)
print(result.collect())

[3, 4, 5, 6]


âœ… 2. flatMap()

Like map, but output can be multiple items per input.

In [35]:
rdd = sc.parallelize(["hello world", "big data","Ramoji Rao","Prabas Sai"])
result = rdd.flatMap(lambda x: x.split(" "))
print(result.collect())



['hello', 'world', 'big', 'data', 'Ramoji', 'Rao', 'Prabas', 'Sai']


âœ… 3. filter()

Return only elements that satisfy a condition.

In [37]:
rdd = sc.parallelize([1,2,3,4,5,6])
evens = rdd.filter(lambda x: x % 2 == 0)
odd = rdd.filter(lambda y: y%2 !=0)
print(evens.collect())
print("odd numbers", odd.collect())

[2, 4, 6]
odd numbers [1, 3, 5]


âœ… 4. mapPartitions()

Apply logic to each partition.

In [45]:
rdd = sc.parallelize([1,2,3,4], 2)

def multiply(iterator):
    return (x * 10 for x in iterator)
    
result = rdd.mapPartitions(multiply)
print(result.collect())

[10, 20, 30, 40]


âœ… 5. mapPartitionsWithIndex()

Get partition index + values.

In [40]:
rdd = sc.parallelize([1,2,3,4,5,6], 3)

def f(index, iterator):
    return (f"Partition {index}: {x}" for x in iterator)

print(rdd.mapPartitionsWithIndex(f).collect())


['Partition 0: 1', 'Partition 0: 2', 'Partition 1: 3', 'Partition 1: 4', 'Partition 2: 5', 'Partition 2: 6']


âœ… 6. union()

Merge two RDDs.

In [46]:
r1 = sc.parallelize([1,2,3])
r2 = sc.parallelize([3,4,1])
print(r1.union(r2).collect())
# [1,2,3,4]


[1, 2, 3, 3, 4, 1]


âœ… 7. distinct()

Remove duplicates.

In [47]:
rdd = sc.parallelize([1,2,2,3,3])
print(rdd.distinct().collect())

[1, 2, 3]


In [48]:
print(r1.union(r2).distinct().collect())

[1, 2, 3, 4]


âœ… 8. sample()

Random sample of RDD.

In [55]:
rdd = sc.parallelize(range(10))
print(rdd.collect())
print(rdd.sample(False, 0.4).collect())
print(rdd.takeSample(True,5))

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
[2, 3, 6, 7, 9]
[6, 4, 0, 6, 1]


âœ… 9. groupBy()

Group elements by a function.

In [56]:
rdd = sc.parallelize([1,2,3,4,5,6])
groups = rdd.groupBy(lambda x: x % 2)
print([(k, list(v)) for k,v in groups.collect()])


[(0, [2, 4, 6]), (1, [1, 3, 5])]



## Key-Value Pair Transformations


âœ… 10. groupByKey()

Group values under each key.

In [12]:
rdd = sc.parallelize([("a",1),("a",2),("b",3)])
result = rdd.groupByKey().mapValues(list)
print(result.collect())

[('a', [1, 2]), ('b', [3])]


âœ… 11. reduceByKey()

Reduce values under each key.

In [13]:
rdd = sc.parallelize([("a",1),("a",2),("b",3)])
print(rdd.reduceByKey(lambda a,b: a+b).collect())



[('a', 3), ('b', 3)]


âœ… 12. combineByKey() (Most powerful)

Allows custom create, merge, combine functions.

In [14]:
rdd = sc.parallelize([("a",10),("a",20),("b",5)])

create = lambda x: (x,1)
merge_val = lambda acc,x: (acc[0]+x, acc[1]+1)
merge_com = lambda a,b: (a[0]+b[0], a[1]+b[1])

print(rdd.combineByKey(create, merge_val, merge_com).collect())


[('a', (30, 2)), ('b', (5, 1))]


âœ… 13. aggregateByKey()

Aggregate values with two functions.

In [15]:
rdd = sc.parallelize([("a",1),("a",3),("b",4)])

result = rdd.aggregateByKey(0, 
                            lambda acc, v: acc + v,
                            lambda a, b: a + b)

print(result.collect())


[('a', 4), ('b', 4)]


âœ… 14. sortByKey()

In [16]:
rdd = sc.parallelize([(3,"c"),(1,"a"),(2,"b")])
print(rdd.sortByKey().collect())


[(1, 'a'), (2, 'b'), (3, 'c')]


âœ… 15. join()

Join two RDDs by key.

In [17]:
r1 = sc.parallelize([("a",1),("b",2)])
r2 = sc.parallelize([("a","apple"),("b","ball")])
print(r1.join(r2).collect())


[('a', (1, 'apple')), ('b', (2, 'ball'))]


16. leftOuterJoin(), rightOuterJoin(), fullOuterJoin()

In [18]:
r1 = sc.parallelize([("a",1)])
r2 = sc.parallelize([("a",2),("b",3)])

print(r1.leftOuterJoin(r2).collect())


[('a', (1, 2))]


## RDD Actions (Trigger Execution)

âœ… 1. collect()

Returns all elements.

In [19]:
print(rdd.collect())


[(3, 'c'), (1, 'a'), (2, 'b')]


âœ… 2. count()

In [20]:
print(rdd.count())


3


âœ… 3. first()

In [21]:
print(rdd.first())

(3, 'c')


âœ… 4. take(n)

In [23]:
print(rdd.take(3))


[(3, 'c'), (1, 'a'), (2, 'b')]


âœ… 5. takeSample()

In [24]:
print(rdd.takeSample(False, 2))


[(2, 'b'), (1, 'a')]


âœ… 6. reduce()

In [25]:
rdd = sc.parallelize([1,2,3,4])
print(rdd.reduce(lambda a,b: a+b))



10


âœ… 7. fold()

Like reduce, but requires zero value.

In [26]:
rdd = sc.parallelize([1,2,3])
print(rdd.fold(0, lambda a,b: a+b))


6


In [27]:
rdd = sc.parallelize([1,2,3,4])
print(rdd.aggregate(0,
                    lambda acc,v: acc+v,
                    lambda a,b: a+b))


10


âœ… 9. countByValue()

In [28]:
rdd = sc.parallelize([1,2,2,3,3,3])
print(rdd.countByValue())
# {1:1, 2:2, 3:3}


defaultdict(<class 'int'>, {1: 1, 2: 2, 3: 3})


âœ… 10. foreach()

Execute function on each element (runs on worker nodes).

In [31]:
rdd.foreach(lambda x: print(x))


ðŸŸ¢ 11. saveAsTextFile()

In [30]:
rdd.saveAsTextFile("output_folder")