## RDDs (Resilient Distributed Datasets) are the fundamental data structure in Apache Spark and were designed to meet the challenges of distributed computing, especially handling large-scale data efficiently

In [1]:
# import libraries
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.conf import SparkConf

# Configure Spark before the SparkContext is created.
conf = SparkConf()
conf.set("spark.log.level", "error")  # To display only errors
conf.set("spark.ui.showConsoleProgress", "false")  # To not display Spark job progress in Python

# SparkContext: entry point for the RDD API used throughout this notebook.
sc = SparkContext.getOrCreate(conf=conf)

# SparkSession: entry point for the DataFrame / SQL API.
# getOrCreate() reuses the SparkContext created just above, so builder options
# such as a master URL cannot reconfigure that running context. The former
# .master("local") call therefore had no effect on execution (the RDDs below
# still get more than one default partition) and has been dropped.
spark = SparkSession.builder.appName("Introduction to DataFrame").getOrCreate()
spark

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Setting Spark log level to "ERROR".


# Transformations

### map() - applies a function to each element - narrow transformation

In [2]:
# map(): the lambda turns every element n into the pair (n, n squared).
x_map = sc.parallelize([1, 2, 3, 4, 5])
y_map = x_map.map(lambda n: (n, n ** 2))
# collect() is an action: it returns every element of the RDD to the driver as a list
print(f'Values x_map: {x_map.collect()}')
print(f'Values y_map: {y_map.collect()}')

Values x_map: [1, 2, 3, 4, 5]
Values y_map: [(1, 1), (2, 4), (3, 9), (4, 16), (5, 25)]


In [3]:
# No numSlices argument was given to sc.parallelize() for x_map, so Spark used its default partition count.
x_map.getNumPartitions()

2

### filter() - keeps only the elements for which a function returns True - narrow transformation

In [4]:
# filter(): keep only the elements for which the predicate is True (the even numbers here).
x_rdd = sc.parallelize(list(range(1, 10)))
y_filter = x_rdd.filter(lambda n: n % 2 == 0)
print(f'Values x_rdd: {x_rdd.collect()}')
print(f'Values y_filter: {y_filter.collect()}')

Values x_rdd: [1, 2, 3, 4, 5, 6, 7, 8, 9]
Values y_filter: [2, 4, 6, 8]


In [5]:
# No numSlices argument was given to sc.parallelize() for x_rdd, so Spark used its default partition count.
x_rdd.getNumPartitions()

2

### flatMap() - applies a function to each element and flattens the result - narrow transformation

In [6]:
# flatMap(): every element n produces three values (n, n squared, 100 * n), and
# the per-element tuples are flattened into a single RDD of numbers.
x_flatMap_rdd = sc.parallelize(list(range(1, 7)))
y_flatMap = x_flatMap_rdd.flatMap(lambda n: (n, n ** 2, n * 100))
print(f'Values x_flatMap_rdd: {x_flatMap_rdd.collect()}')
print(f'Values y_flatMap: {y_flatMap.collect()}')

Values x_flatMap_rdd: [1, 2, 3, 4, 5, 6]
Values y_flatMap: [1, 1, 100, 2, 4, 200, 3, 9, 300, 4, 16, 400, 5, 25, 500, 6, 36, 600]


In [7]:
# No numSlices argument was given to sc.parallelize() for x_flatMap_rdd, so Spark used its default partition count.
x_flatMap_rdd.getNumPartitions()

2

### mapPartitions() - applies a function to each partition - narrow transformation

In [8]:
# mapPartitions(): the function is called once per partition with an iterator
# over that partition's elements; here it yields one value, the partition sum.
x = sc.parallelize([1, 2, 3, 4, 5, 6], 3)


def f(iterator):
    """Yield the sum of all elements in one partition."""
    yield sum(iterator)


y = x.mapPartitions(f)
# glom() gathers the elements of each partition into a list (one inner list per
# partition), which makes the partition layout visible in the output.
print(x.glom().collect())
print(y.glom().collect())
print(x.collect())
print(y.collect())

[[1, 2], [3, 4], [5, 6]]
[[3], [7], [11]]
[1, 2, 3, 4, 5, 6]
[3, 7, 11]


In [9]:
# 3 partitions: the second argument of sc.parallelize() above requested them explicitly.
x.getNumPartitions()

3

### mapPartitionsWithIndex() - like mapPartitions(), but the function also receives the partition index - narrow transformation

In [10]:
# mapPartitionsWithIndex(): like mapPartitions(), but the function is also told
# which partition (by index) it is processing.
x = sc.parallelize(list(range(1, 10)), 3)


def f(partitionIndex, iterator):
    """Yield a single (partition index, partition sum) pair."""
    partition_total = sum(iterator)
    yield (partitionIndex, partition_total)


y = x.mapPartitionsWithIndex(f)
print(x.glom().collect())  # elements grouped per partition
print(y.glom().collect())
print(x.collect())
print(y.collect())

[[1, 2, 3], [4, 5, 6], [7, 8, 9]]
[[(0, 6)], [(1, 15)], [(2, 24)]]
[1, 2, 3, 4, 5, 6, 7, 8, 9]
[(0, 6), (1, 15), (2, 24)]


In [11]:
# 3 partitions: the second argument of sc.parallelize() above requested them explicitly.
x.getNumPartitions()

3

### sample() - returns a random sample of the elements - narrow transformation

In [12]:
# sample(withReplacement, fraction, seed): without replacement, each element is
# kept independently with probability `fraction`, so the sample holds ~30% of
# the elements only on average. The exact result varies between runs unless a
# seed is fixed, which is done here so the output is reproducible.
x = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9], 4)
y = x.sample(withReplacement=False, fraction=0.3, seed=42)
print(x.collect())
print(y.collect())

[1, 2, 3, 4, 5, 6, 7, 8, 9]
[1, 2, 4, 6, 8, 9]


In [13]:
# 4 partitions: the second argument of sc.parallelize() above requested them explicitly.
x.getNumPartitions()

4

### union() - concatenates two RDDs, keeping duplicates - narrow transformation

In [14]:
# union() concatenates the two RDDs; duplicate values are kept.
rdd_1 = sc.parallelize([1, 1, 3, 4, 5, 6, 4, 8, 9], numSlices=4)
rdd_2 = sc.parallelize([1, 2, 3, 5, 5, 6, 5, 8, 9], numSlices=3)

rdd_1.union(rdd_2).collect()

[1, 1, 3, 4, 5, 6, 4, 8, 9, 1, 2, 3, 5, 5, 6, 5, 8, 9]

### intersection() - returns the elements present in both RDDs, without duplicates - wide transformation (requires a shuffle)

In [15]:
# intersection() returns every value found in both RDDs exactly once (duplicates
# are removed), and it shuffles data internally, unlike union().
rdd_1 = sc.parallelize([1, 1, 3, 4, 5, 6, 4, 8, 9], numSlices=4)
rdd_2 = sc.parallelize([1, 2, 3, 5, 5, 6, 5, 8, 9], numSlices=3)

rdd_1.intersection(rdd_2).collect()

[1, 8, 9, 3, 5, 6]

### distinct() - returns the unique elements - wide transformation

In [16]:
# distinct() removes duplicate elements; because it reshuffles the data, the
# result is not guaranteed to keep the input order.
x = sc.parallelize(list('AABCDED'))
y = x.distinct()
print(f'Values x: {x.collect()}')
print(f'Distinct Values y: {y.collect()}')

Values x: ['A', 'A', 'B', 'C', 'D', 'E', 'D']
Distinct Values y: ['C', 'A', 'B', 'D', 'E']


### groupByKey() - groups the values of each key together - wide transformation

In [17]:
# groupByKey() brings all values that share a key together as one iterable per
# key. For per-key aggregations (sum, average, ...) reduceByKey() or
# aggregateByKey() perform much better.
x = sc.parallelize([('B', 5), ('B', 4), ('A', 3), ('A', 2), ('A', 1)])
y = x.groupByKey()
print(x.collect())
# mapValues(list) turns each key's iterable of values into a plain list so it prints readably.
print(y.mapValues(list).collect())

[('B', 5), ('B', 4), ('A', 3), ('A', 2), ('A', 1)]
[('B', [5, 4]), ('A', [3, 2, 1])]


### reduceByKey() - merges the values of each key with a reduce function, combining locally within each partition before the shuffle - wide transformation

In [18]:
# reduceByKey(): values sharing a key are merged pairwise with the given
# function (addition here), so each key ends up with the sum of its values.
rdd_reduce = sc.parallelize(
    [("a", 1), ("b", 2), ("a", 3), ("b", 4), ("c", 5)],
    numSlices=2,
)
result = rdd_reduce.reduceByKey(lambda left, right: left + right).collect()
print(result)

[('b', 6), ('c', 5), ('a', 4)]


##### Thank you: https://www.youtube.com/watch?v=ou0MYgLnftg&t=135s

# Actions

### reduce(), count(), collect(), take(), foreach()

# Shared Variables

In [19]:
# Accumulator demo: tasks can only add to an accumulator; the driver reads the
# merged result through .value.
# NOTE: add_count adds each element's *value*, so the result is the sum of the
# elements (1 + 2 + 3 + 4 + 5 = 15), not the number of elements.
count = sc.accumulator(0)
rdd = sc.parallelize(list(range(1, 6)))


def add_count(element):
    """Add one RDD element to the shared `count` accumulator."""
    count.add(element)


rdd.foreach(add_count)  # foreach() is an action, so this line runs the job
print(count.value)

15
