In [0]:
# imports
from pyspark.sql import SparkSession
# Build (or reuse, via getOrCreate) a SparkSession running on the local
# master, then grab the SparkContext it wraps for the RDD API.
spark = (
    SparkSession.builder
    .appName("RDD Operations")
    .master("local")
    .getOrCreate()
)
sc = spark.sparkContext

# Show which classes we are dealing with: the session first, then the context.
for handle in (spark, sc):
    print(type(handle))

# Introduction to RDDs

# RDD Creation

## Create RDD from Collection

## Create RDD from external source

# RDD Operations

## Transformations

## Actions

# Transformations

In [0]:
# The integers 1..10 followed by some deliberate repeats, so that later cells
# (for example .distinct()) have duplicate values to work on.
sample_data = [*range(1, 11), 1, 2, 2, 3, 4, 5, 5, 5, 6]
print(f"{type(sample_data)} \n{sample_data}")

# Distribute the local list as an RDD.
rdd = sc.parallelize(sample_data)
print(type(rdd))

## The .map(func)

> The .map(func) transformation returns a new RDD by applying a function to each element of the original RDD.

In [0]:
# Double every element. .map() is lazy: nothing executes until the
# .collect() action pulls the results back to the driver.
result = rdd.map(lambda value: value * 2)
doubled_values = result.collect()
# One value per line (guarded so an empty result prints nothing).
if doubled_values:
    print(*doubled_values, sep="\n")

## The .filter(predicateFun)

The .filter(predicateFun) transformation returns a new RDD containing only the elements of the parent RDD for which the predicate function returns True.



In [0]:
# Keep only the elements divisible by 3 or by 5. Requiring divisibility by
# both (i.e. by 15) would match nothing, because sample_data never exceeds 10,
# and the example would print no output at all.
result = rdd.filter(lambda x: x % 3 == 0 or x % 5 == 0)
for record in result.collect():
    print(record)


## The .distinct()

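The .distinct() transformation returns a new RDD containing the distinct elements of this RDD, with duplicates removed. The order of the result may differ from the original order.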


In [0]:
# Drop duplicate values; sample_data contains several repeats.
result = rdd.distinct()
unique_values = result.collect()
# Print each distinct value on its own line (nothing at all if empty).
if unique_values:
    print(*unique_values, sep="\n")


## The .flatMap(func)

The .flatMap() transformation works like .map(), except that the function may return several values (an iterable) for each element of the original RDD. Those values are flattened into the resulting RDD as separate elements.



In [0]:
# Split each sentence into words; flatMap flattens the per-sentence word lists
# into one RDD of individual words. `sc` is the same SparkContext as
# spark.sparkContext and is used throughout the notebook.
flatmap_rdd = sc.parallelize(["Hey there", "This is PySpark RDD Transformations"])
# str.split() with no argument splits on any run of whitespace, so repeated
# spaces do not yield empty-string "words" the way split(" ") would.
flatmap_rdd.flatMap(lambda line: line.split()).collect()

## The .union()

> The union() transformation returns a new RDD containing all elements from both RDDs. Duplicates are kept:

In [0]:
# Two overlapping RDDs: both contain 2, 3, 5 and 6, while 1 and 4 appear only
# in rdd1 and 7, 8 and 9 only in rdd2. With identical inputs, intersection
# and subtract would be trivial (subtract would always be empty).
rdd1 = sc.parallelize([1, 2, 2, 3, 4, 5, 5, 6, 6])

rdd2 = sc.parallelize([2, 3, 5, 6, 7, 8, 9])

# union concatenates both RDDs, keeping duplicates.
new_RDD = rdd1.union(rdd2)
new_RDD.collect()

## The .intersection()

The intersection() transformation returns a new RDD containing the elements present in both RDDs. The result contains no duplicates, even if the inputs do:

In [0]:
# Values that occur in both rdd1 and rdd2.
new_RDD = rdd1.intersection(rdd2)
common_values = new_RDD.take(num=10)
common_values
[2, 3, 5, 6]

## The .subtract()
The subtract() transformation returns a new RDD containing the elements present in the first RDD but not in the second RDD:

In [0]:
# Elements of rdd1 that do not appear anywhere in rdd2.
new_RDD = rdd1.subtract(rdd2)
only_in_rdd1 = new_RDD.take(num=10)
only_in_rdd1
[1, 4]

## The sample()

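The sample() transformation returns a new RDD containing a random subset of the existing RDD. `fraction` is the expected proportion of elements to keep, not an exact count. `withReplacement` controls whether an element may be picked more than once, and an optional `seed` makes the sample reproducible: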

In [0]:
# Draw roughly half of rdd1's elements without replacement. `fraction` is an
# expected proportion, not an exact count. The fixed seed makes the sample
# identical on every Restart & Run All.
new_RDD = rdd1.sample(withReplacement=False, fraction=0.5, seed=42)
new_RDD.collect()
[2, 3, 5, 5]

## Pair RDD Transformations

Pair RDDs are RDDs whose elements are key/value pairs, such as two-element tuples. Pair RDD transformations such as reduceByKey() and groupByKey() work per key: they group or combine all values that share a key, in parallel across keys. Ordinary transformations such as map() are applied to each element of the collection independently. Because Pair RDDs behave much like distributed dictionaries of key/value pairs, they are widely used.

### The .reduceByKey

The .reduceByKey() transformation merges the values for each key using the given function, which should be associative and commutative. Spark combines values within each partition before shuffling, so the per-key work runs in parallel.



In [0]:
# (profession, number) pairs. Every profession occurs exactly twice, so
# reduceByKey has two values per key to combine.
data = [
    ('Technician', 25), ('Lawer', 26), ('Writer', 22), ('Executive', 29), ('Scientist', 22),
    ('Technician', 23), ('Lawer', 19), ('Writer', 28), ('Executive', 26), ('Scientist', 22),
]
marks_rdd = sc.parallelize(data)

# Sum the numbers that share a key.
summed_by_key = marks_rdd.reduceByKey(lambda left, right: left + right)
print(summed_by_key.collect())

### The sortByKey

The .sortByKey() transformation sorts key-value pairs by key, in ascending or descending order depending on the boolean `ascending` argument. It returns a new RDD as a result.

In [0]:
# (profession, number) pairs to be ordered alphabetically by profession.
data = [('Technician', 25), ('Lawer', 26), ('Writer', 22), ('Executive', 29), ('Scientist', 22), ('Technician', 23), ('Lawer', 19), ('Writer', 28), ('Executive', 26), ('Scientist', 22)]
rdd = sc.parallelize(data)
# sortByKey's first parameter is the boolean `ascending`. Pass it explicitly
# as a keyword: a string such as "descending" is truthy and would still sort
# ascending. Use ascending=False for descending order.
print(rdd.sortByKey(ascending=True).collect())

### The groupByKey

The .groupByKey() transformation groups all the values in the given data with the same key together. It returns a new RDD as a result.

In [0]:
# (profession, number) pairs; each profession appears twice.
data = [
    ('Technician', 25), ('Lawer', 26), ('Writer', 22), ('Executive', 29), ('Scientist', 22),
    ('Technician', 23), ('Lawer', 19), ('Writer', 28), ('Executive', 26), ('Scientist', 22),
]
rdd = sc.parallelize(data)
result = rdd.groupByKey()

# Each group's values come back as an iterable; list() turns it into a list
# so print shows the actual values.
for profession, grouped in result.collect():
    print(profession, list(grouped))

### The countByKey()

The .countByKey() action counts the number of values for each key in the given data and returns the result to the driver as a dictionary. It is an action, not a transformation, even though it is listed here with the other Pair RDD operations.
Since the result is a dictionary, we can use dictionary methods such as .keys(), .values() and .items() on it.



In [0]:
# (profession, number) pairs; each profession appears twice.
data = [
    ('Technician', 25), ('Lawer', 26), ('Writer', 22), ('Executive', 29), ('Scientist', 22),
    ('Technician', 23), ('Lawer', 19), ('Writer', 28), ('Executive', 26), ('Scientist', 22),
]
rdd = sc.parallelize(data)

# countByKey is an action: it hands back a dictionary of {key: count}
# rather than a new RDD, so ordinary dict methods apply.
result = rdd.countByKey()
for profession, occurrences in result.items():
    print(profession, occurrences)



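### The .join

The .join() transformation performs an inner join of two Pair RDDs on their keys. It returns an RDD of (key, (value_from_first, value_from_second)) tuples, one for every matching pair of elements.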

# Actions

## The .count()

> The .count() action on an RDD is an operation that returns the number of elements of our RDD.

In [0]:
# `rdd` is whatever RDD was bound last. In top-to-bottom order, that is the
# (profession, number) pair RDD from the countByKey cell.
# .count() is an action: it runs a job and returns a plain number.
total_count = rdd.count()
message = f"Total count :{total_count}"
print(message)

## The first()

> The .first() action on an RDD returns the first element from our RDD.

In [0]:
# Fetch just the first element. `rdd` currently holds (profession, number)
# tuples, so the printed value is a tuple.
first_element = rdd.first()
print(str(first_element))

## The .take(n)

> The .take(n) action on an RDD returns n number of elements from the RDD. The ‘n’ argument takes an integer which refers to the number of elements we want to extract from the RDD.

In [0]:
# Bring back only the first five elements rather than the whole RDD.
first_five = rdd.take(5)
for item in first_five:
    print(item)

## The .collect()

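The .collect() action on an RDD returns a list of all the elements of the RDD. Because every element is copied back to the driver, use it only on RDDs small enough to fit in the driver's memory.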

In [0]:
# Materialise every element on the driver as a Python list, then print each one.
all_records = rdd.collect()
for entry in all_records:
    print(entry)

## The .saveAsTextFile

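The .saveAsTextFile(path) action saves the RDD as text, writing the string representation of each element on its own line. Note that `path` is created as a directory containing one part file per partition.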

## The .top()
The top(n) action returns the n largest elements of an RDD, in descending order:

In [0]:
# The three largest elements, in descending order. The elements here are
# (profession, number) tuples, which compare by their first item first.
rdd.top(num=3)

## The .reduce()
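The reduce() action aggregates all elements of the RDD into a single value by repeatedly applying a two-argument function (for example a lambda) to pairs of elements. The function should be commutative and associative: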

## The .sum()
The sum() action returns the sum of the elements of an RDD: