# NOTEBOOK 3.1 PySpark RDDs

## 0. Create a SparkSession object

We control our Spark application through a driver process called the **SparkSession**. The **SparkSession** instance is how Spark executes user-defined manipulations across the cluster. There is a one-to-one correspondence between a SparkSession and a Spark application. The RDD API is reached through the session's **SparkContext**, which is bound to `sc` below.

In [8]:
from pyspark.sql import SparkSession

spark = SparkSession \
  .builder \
  .appName("PySpark RDDs") \
  .getOrCreate()

In [9]:
sc = spark.sparkContext

## 1. Creating RDDs
There are four ways to create an RDD:
*   Parallelize a collection
*   From another RDD
*   From a Spark DataFrame
*   From an external dataset

### 1.1 Parallelize a collection

You can take an existing collection from your program (e.g., a list, array, or set) and ask Spark to distribute it across the cluster so that it can be processed in parallel.

In [10]:
num_list = [5, 2, 3, 5, 6, 9, 5, 0, 8, 3, 8, 6, 5, 2, 2, 3, 5, 4, 3, 1]
numbers_rdd = sc.parallelize(num_list)
print(numbers_rdd.collect())

[5, 2, 3, 5, 6, 9, 5, 0, 8, 3, 8, 6, 5, 2, 2, 3, 5, 4, 3, 1]


### 1.2 From another RDD

In [11]:
distinct_numbers_rdd = numbers_rdd.distinct()
print(distinct_numbers_rdd.collect())

[0, 1, 2, 3, 4, 5, 6, 8, 9]


### 1.3 From a Spark DataFrame

In [12]:
numbers_df = spark.range(1,5)
numbers_df.show()

+---+
| id|
+---+
|  1|
|  2|
|  3|
|  4|
+---+



In [13]:
numbers_rdd = numbers_df.rdd
numbers_rdd.collect()

[Row(id=1), Row(id=2), Row(id=3), Row(id=4)]

### 1.4 From an external dataset

In [14]:
import pprint

text_rdd = sc.textFile("data/weather.txt")
pprint.pprint(text_rdd.collect())

['whether the weather be fine',
 'or whether the weather be not',
 'whether the weather be cold',
 'or whether the weather be hot',
 'well whether the weather',
 'whatever the weather',
 'whether we like it or not']


## 2. RDD Transformations

In [15]:
num_list = [5, 2, 3, 5, 6, 9, 5, 0, 8, 3, 8, 6, 5, 2, 2, 3, 5, 4, 3, 1]
numbers_rdd = sc.parallelize(num_list)
print(numbers_rdd.collect())

[5, 2, 3, 5, 6, 9, 5, 0, 8, 3, 8, 6, 5, 2, 2, 3, 5, 4, 3, 1]


### 2.1 filter()
Returns a new RDD containing only the elements for which the given function returns true.

In [16]:
multiples_rdd = numbers_rdd.filter(lambda x: x % 3 == 0)
print(multiples_rdd.collect())

[3, 6, 9, 0, 3, 6, 3, 3]

The next two cells apply map(), described in section 2.6, to the same RDD.


In [17]:
squares_rdd = numbers_rdd.map(lambda x: x * x)
squares_rdd.collect()

[25, 4, 9, 25, 36, 81, 25, 0, 64, 9, 64, 36, 25, 4, 4, 9, 25, 16, 9, 1]

In [18]:
def square_if_odd(x):
  if (x % 2 == 1):
    return x * x
  else:
    return x

numbers_rdd.map(square_if_odd).collect()

[25, 2, 9, 25, 6, 81, 25, 0, 8, 9, 8, 6, 25, 2, 2, 9, 25, 4, 9, 1]

### 2.2 union()
Takes another RDD as an input and produces a new RDD containing all the elements of both RDDs. Duplicates are kept.

In [19]:
rdd1 = sc.parallelize([3, 1, 0, 8, 9, 5, 7])
rdd2 = sc.parallelize([1, 6, 0, 9, 7, 2, 5])
print(f"rdd1: {rdd1.collect()}")
print(f"rdd2: {rdd2.collect()}")
print(f"rdd1.union(rdd2): {rdd1.union(rdd2).collect()}")

rdd1: [3, 1, 0, 8, 9, 5, 7]
rdd2: [1, 6, 0, 9, 7, 2, 5]
rdd1.union(rdd2): [3, 1, 0, 8, 9, 5, 7, 1, 6, 0, 9, 7, 2, 5]


### 2.3 intersection()
Takes another RDD as an input and produces a new RDD containing the elements that exist in both RDDs, without duplicates. The result is sorted here only to make the printed order stable.

In [20]:
print(f"rdd1: {rdd1.collect()}")
print(f"rdd2: {rdd2.collect()}")
print(f"rdd1.intersection(rdd2): {sorted(rdd1.intersection(rdd2).collect())}")

rdd1: [3, 1, 0, 8, 9, 5, 7]
rdd2: [1, 6, 0, 9, 7, 2, 5]
rdd1.intersection(rdd2): [0, 1, 5, 7, 9]

### 2.4 subtract()
Takes another RDD as an input and produces a new RDD containing the elements of the current RDD that do not appear in the given RDD.

In [21]:
print(f"rdd1: {rdd1.collect()}")
print(f"rdd2: {rdd2.collect()}")
print(f"rdd1.subtract(rdd2): {rdd1.subtract(rdd2).collect()}")

rdd1: [3, 1, 0, 8, 9, 5, 7]
rdd2: [1, 6, 0, 9, 7, 2, 5]
rdd1.subtract(rdd2): [3, 8]
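
The semantics of the three set-like transformations (union, intersection, subtract) can be sketched in plain Python. This is a model of the behaviour, not Spark code: the function names `union_model`, `intersection_model`, and `subtract_model` are hypothetical, and the intersection is sorted only to give a stable display order.

```python
# Plain-Python model of union / intersection / subtract (not Spark code).
rdd1_items = [3, 1, 0, 8, 9, 5, 7]
rdd2_items = [1, 6, 0, 9, 7, 2, 5]

def union_model(a, b):
    """Concatenate both inputs; duplicates are kept."""
    return list(a) + list(b)

def intersection_model(a, b):
    """Distinct elements present in both inputs (sorted for display)."""
    return sorted(set(a) & set(b))

def subtract_model(a, b):
    """Elements of a whose value does not appear anywhere in b."""
    excluded = set(b)
    return [x for x in a if x not in excluded]

print(union_model(rdd1_items, rdd2_items))
print(intersection_model(rdd1_items, rdd2_items))
print(subtract_model(rdd1_items, rdd2_items))
```

Note that only intersection removes duplicates; union and subtract preserve whatever repetitions the first input contains.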


### 2.5 cartesian()
Pairs every element of one RDD with every element of another RDD, producing the Cartesian product of the two.

In [22]:
firstRDD = sc.parallelize(range(3))
secondRDD = sc.parallelize(['A','B','C'])
firstRDD.cartesian(secondRDD).collect()

[(0, 'A'),
 (0, 'B'),
 (0, 'C'),
 (1, 'A'),
 (1, 'B'),
 (1, 'C'),
 (2, 'A'),
 (2, 'B'),
 (2, 'C')]

### 2.6 map()
Applies a given function to each element of an RDD and returns a new RDD with the same number of elements.

In [23]:
def tokenize(text):
  return text.split()

In [24]:
words_rdd = text_rdd.map(tokenize)
words_rdd.collect()

[['whether', 'the', 'weather', 'be', 'fine'],
 ['or', 'whether', 'the', 'weather', 'be', 'not'],
 ['whether', 'the', 'weather', 'be', 'cold'],
 ['or', 'whether', 'the', 'weather', 'be', 'hot'],
 ['well', 'whether', 'the', 'weather'],
 ['whatever', 'the', 'weather'],
 ['whether', 'we', 'like', 'it', 'or', 'not']]

### 2.7 flatMap()
Returns a new RDD by first applying a function to all elements of this RDD, and then flattening the results.

In [25]:
words_flattened_rdd = text_rdd.flatMap(tokenize)
words_flattened_rdd.collect()

['whether',
 'the',
 'weather',
 'be',
 'fine',
 'or',
 'whether',
 'the',
 'weather',
 'be',
 'not',
 'whether',
 'the',
 'weather',
 'be',
 'cold',
 'or',
 'whether',
 'the',
 'weather',
 'be',
 'hot',
 'well',
 'whether',
 'the',
 'weather',
 'whatever',
 'the',
 'weather',
 'whether',
 'we',
 'like',
 'it',
 'or',
 'not']
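
The difference between map() and flatMap() can be reproduced without Spark. The sketch below is a plain-Python analogy on a two-line sample (an assumption made to keep it short): a list comprehension stands in for map(), and `itertools.chain.from_iterable` stands in for the flattening step of flatMap().

```python
from itertools import chain

# Small sample standing in for the lines of the text RDD.
lines = ["whether the weather be fine", "whatever the weather"]

def tokenize(text):
    return text.split()

# map-like: one result per input line, so the output is a list of lists.
mapped = [tokenize(line) for line in lines]

# flatMap-like: the per-line lists are flattened by one level into a single list.
flat_mapped = list(chain.from_iterable(tokenize(line) for line in lines))

print(len(mapped), mapped)
print(len(flat_mapped), flat_mapped)
```

The mapped result always has as many entries as there are input lines, while the flattened result has one entry per word.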

## 3. Actions

### 3.1 collect()
Returns all the elements of an RDD to the driver program. Use **collect()** only when you know that the final output is small enough to fit in the driver's memory. If the output is huge, the driver program might crash while receiving the data from the executors, so **collect()** is not advised in production.

In [26]:
sc.parallelize(range(10)).collect()

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

### 3.2 count()
Counts the number of elements in the RDD.

In [27]:
sc.parallelize(range(10)).count()

10

### 3.3 take()
Returns the first N elements of an RDD.

In [28]:
sc.parallelize(range(10)).take(2)

[0, 1]

### 3.4 top()
Returns the N largest elements of the RDD, in descending order. The following code returns the top 2 elements of an RDD:

In [29]:
sc.parallelize(range(10)).top(2)

[9, 8]

### 3.5 takeOrdered()
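Returns the first N elements in ascending order, or in the order defined by an optional key function. Negating the key, as below, yields the N largest elements.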

In [30]:
sc.parallelize(range(10)).takeOrdered(3, key = lambda x: -x)

[9, 8, 7]
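
The relationship between top() and takeOrdered() can be illustrated with Python's `heapq` module. This is a plain-Python model on a local list, offered as an analogy rather than as a description of Spark's internals.

```python
import heapq

data = list(range(10))

# top(2)-like: the two largest elements, in descending order.
top_2 = heapq.nlargest(2, data)

# takeOrdered(3)-like: the three smallest elements, in ascending order.
smallest_3 = heapq.nsmallest(3, data)

# takeOrdered(3, key=lambda x: -x)-like: negating the key reverses the ordering.
largest_3_via_key = heapq.nsmallest(3, data, key=lambda x: -x)

print(top_2)              # [9, 8]
print(smallest_3)         # [0, 1, 2]
print(largest_3_via_key)  # [9, 8, 7]
```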

### 3.6 first()
Returns the first element of the RDD.

In [31]:
sc.parallelize(range(10)).first()

0

### 3.7 countByValue()
Counts the occurrences of each distinct element in the RDD and returns the counts to the driver as a dictionary.

In [32]:
sc.parallelize(["A","A","B","C"]).countByValue()

defaultdict(int, {'A': 2, 'B': 1, 'C': 1})

### 3.8 reduce()
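Combines the RDD elements in parallel using a binary function and returns the aggregated result. The function should be commutative and associative so that the result does not depend on how the data is partitioned.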

In [33]:
from operator import add
sc.parallelize(range(1,11)).reduce(add)

55
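
To see why the function passed to reduce() matters, the sketch below models a partitioned reduce in plain Python: each partition is reduced on its own, and the partial results are then reduced again. The helper `split_into_partitions` and its contiguous chunking are assumptions for illustration, not how Spark actually lays out data.

```python
from functools import reduce
from math import ceil
from operator import add, sub

def split_into_partitions(items, num_partitions):
    """Hypothetical helper: cut a list into contiguous, non-empty chunks."""
    size = ceil(len(items) / num_partitions)
    return [items[i:i + size] for i in range(0, len(items), size)]

def partitioned_reduce(func, items, num_partitions):
    """Reduce each partition separately, then reduce the partial results."""
    parts = split_into_partitions(items, num_partitions)
    partials = [reduce(func, part) for part in parts]
    return reduce(func, partials)

data = list(range(1, 11))

# Addition is associative and commutative: the partitioning does not matter.
print(partitioned_reduce(add, data, 1), partitioned_reduce(add, data, 2))  # 55 55

# Subtraction is neither: the result changes with the number of partitions.
print(partitioned_reduce(sub, data, 1), partitioned_reduce(sub, data, 2))  # -53 15
```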

### 3.9 saveAsTextFile()
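Saves an RDD as text files in an external data store. The path names an output directory, into which Spark writes one part file per partition. The call fails if the directory already exists.

In [34]:
# Writes a directory named 'result.txt' on the default file system (e.g., HDFS on a cluster)
sc.parallelize(range(200)).saveAsTextFile('result.txt')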

## 4. Pair RDDs

### 4.1 groupByKey()
- Groups elements having the same key.
- A wide transformation that shuffles data between executors based on the key.


In [35]:
pair_rdd = sc.parallelize([(1, 5),(1, 10),(2, 4),(3, 1),(2,6)])
result = pair_rdd.groupByKey().collect()
for pair in result:
    print(f"key: {pair[0]}, value: {list(pair[1])}")

key: 1, value: [5, 10]
key: 2, value: [4, 6]
key: 3, value: [1]


### 4.2 reduceByKey()
Aggregates the values of each key with the given function while minimizing the data shuffle. A reduceByKey() transformation first aggregates locally within each partition and then shuffles only the partially aggregated data between nodes.

In [36]:
pair_rdd = sc.parallelize([(1, 5),(1, 10),(2, 4),(3, 1),(2, 6)])
pair_rdd.reduceByKey(lambda x,y : x+y).collect()

[(1, 15), (2, 10), (3, 1)]
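
The saving from local aggregation can be made concrete with a plain-Python model of the shuffle. This is not Spark code: the two-partition layout and the helper `combine_by_key` are assumptions chosen for illustration.

```python
from operator import add

# Assumed layout: the five pairs spread over two partitions.
partitions = [[(1, 5), (1, 10), (2, 4)], [(3, 1), (2, 6)]]

def combine_by_key(records, func):
    """Hypothetical helper: merge the values of each key with func."""
    combined = {}
    for key, value in records:
        combined[key] = func(combined[key], value) if key in combined else value
    return sorted(combined.items())

# groupByKey-like: every (key, value) pair crosses the shuffle.
shuffled_without_combine = [rec for part in partitions for rec in part]

# reduceByKey-like: each partition is combined locally before the shuffle.
shuffled_with_combine = [rec for part in partitions
                         for rec in combine_by_key(part, add)]

print(len(shuffled_without_combine), len(shuffled_with_combine))  # 5 4
print(combine_by_key(shuffled_with_combine, add))  # [(1, 15), (2, 10), (3, 1)]
```

On this tiny input the saving is a single record, but it grows with the number of repeated keys inside each partition.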

The following pair RDD of orders is used in the sortBy() examples in section 4.3:

In [37]:
orders = [('pen', 2), ('ruler', 3), ('eraser', 1),
          ('ruler', 2), ('pen', 5), ('notepad', 4), ('ruler', 3)]
orders_rdd = sc.parallelize(orders)
orders_rdd.collect()

[('pen', 2),
 ('ruler', 3),
 ('eraser', 1),
 ('ruler', 2),
 ('pen', 5),
 ('notepad', 4),
 ('ruler', 3)]

### 4.3 sortBy()
Sorts the RDD by the key that the given function computes for each element.

Example 4.3(a): Sort an RDD in **ascending order** using the first element of the tuple:

In [38]:
orders = [('pen', 2), ('ruler', 3), ('eraser', 1),
          ('ruler', 2), ('pen', 5), ('notepad', 4), ('ruler', 3)]
orders_rdd.sortBy(lambda x: x[0]).collect()

[('eraser', 1),
 ('notepad', 4),
 ('pen', 2),
 ('pen', 5),
 ('ruler', 3),
 ('ruler', 2),
 ('ruler', 3)]

Example 4.3(b): Sort an RDD in **descending order** using the second element of the tuple:

In [39]:
orders = [('pen', 2), ('ruler', 3), ('eraser', 1),
          ('ruler', 2), ('pen', 5), ('notepad', 4), ('ruler', 3)]
orders_rdd.sortBy(lambda x: -x[1]).collect()

[('pen', 5),
 ('notepad', 4),
 ('ruler', 3),
 ('ruler', 3),
 ('pen', 2),
 ('ruler', 2),
 ('eraser', 1)]
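
Aggregation and sorting are often combined, for example to rank items by total quantity ordered. The sketch below models that in plain Python on the same `orders` list, using a tuple-valued sort key so that ties are broken by item name. On the RDD itself, a chain such as `orders_rdd.reduceByKey(add).sortBy(lambda x: (-x[1], x[0]))` should express the same idea, though that is an untested suggestion rather than part of the original notebook.

```python
from collections import defaultdict

orders = [('pen', 2), ('ruler', 3), ('eraser', 1),
          ('ruler', 2), ('pen', 5), ('notepad', 4), ('ruler', 3)]

# reduceByKey-like step: sum the quantities of each item.
totals = defaultdict(int)
for item, quantity in orders:
    totals[item] += quantity

# sortBy-like step: descending by total, then ascending by item name.
ranked = sorted(totals.items(), key=lambda pair: (-pair[1], pair[0]))
print(ranked)  # [('ruler', 8), ('pen', 7), ('notepad', 4), ('eraser', 1)]
```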

### 4.4 join()
Joins two pair RDDs based on their keys.

Example: join the data by country, returning only the records whose key appears in both RDDs:

In [40]:
salesRDD = sc.parallelize([("US",20),("IND",30),("UK",10)])
revenueRDD = sc.parallelize([("US",200),("IND",300)])
salesRDD.join(revenueRDD).collect()

[('IND', (30, 300)), ('US', (20, 200))]
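
The inner-join semantics can be modelled in plain Python. The function `inner_join` below is a hypothetical helper, not Spark's implementation: it emits one `(key, (left_value, right_value))` pair for every combination of values that share a key, and drops keys found on only one side. The order of the real join() output is not guaranteed, so the result is sorted before printing.

```python
from collections import defaultdict

def inner_join(left, right):
    """Hypothetical helper: pair up values from left and right that share a key."""
    right_by_key = defaultdict(list)
    for key, value in right:
        right_by_key[key].append(value)
    return [(key, (left_value, right_value))
            for key, left_value in left
            for right_value in right_by_key.get(key, [])]

sales = [("US", 20), ("IND", 30), ("UK", 10)]
revenue = [("US", 200), ("IND", 300)]

joined = inner_join(sales, revenue)
print(sorted(joined))  # [('IND', (30, 300)), ('US', (20, 200))]
```

"UK" has no revenue record, so it does not appear in the result.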

In [41]:
spark.stop()