# RDD Exercise

In [2]:
from pyspark.sql import SparkSession
spark = (SparkSession.builder.appName("TestingRDD").getOrCreate())

### Define an unstructured list.

In [3]:
words_list = "Spark makes life a lot easier and put me into good Spirits, Spark is too Awesome!".split(" ")

In [4]:
print(words_list)

['Spark', 'makes', 'life', 'a', 'lot', 'easier', 'and', 'put', 'me', 'into', 'good', 'Spirits,', 'Spark', 'is', 'too', 'Awesome!']


In [5]:
type(words_list)

list

### Creating an RDD from a list

In [6]:
words_rdd = spark.sparkContext.parallelize(words_list)

The **collect()** action retrieves every element of a distributed RDD or DataFrame and returns it to the driver program as a local collection (for an RDD, a Python list). Its purpose is to bring the distributed data into the driver program's memory so that you can work with it locally.

When you apply transformations to an RDD or DataFrame, the data is processed in a distributed manner across the worker nodes of the Spark cluster, and each node holds only its own partitions. Transformations are also lazy: Spark computes them only when an action such as count() or collect() runs. When you need to access or analyze the complete dataset locally within the driver program, **collect()** brings all partitions back to the driver. Because the whole result must fit in the driver's memory, reserve collect() for small datasets; take(n) returns only the first n elements.

In [7]:
words_data = words_rdd.collect()

### Printing each element of words_data.

In [8]:
for word in words_data:
    print(word)

Spark
makes
life
a
lot
easier
and
put
me
into
good
Spirits,
Spark
is
too
Awesome!


In [9]:
words_rdd.count()

16

## Transformations in PySpark.

### Applying distinct() to an RDD.

In [10]:
words_rdd.distinct().count()

15

In [11]:
words_data = words_rdd.collect()
for word in words_data:
    print(word)

Spark
makes
life
a
lot
easier
and
put
me
into
good
Spirits,
Spark
is
too
Awesome!


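Iterating over words_data again still shows 'Spark' twice. RDDs are immutable: distinct() does not modify words_rdd in place; it returns a new RDD containing only the unique values. In the previous cell that new RDD was used only for count() and then discarded, so words_rdd, and the words_data collected from it, still contain the duplicate. To keep the unique values, assign the result of distinct() to a new variable.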

In [12]:
words_unique_rdd = words_rdd.distinct()
words_unique_data = words_unique_rdd.collect()
for word in words_unique_data:
    print(word)

put
good
makes
life
a
lot
and
Awesome!
Spark
into
Spirits,
is
easier
me
too


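The duplicate 'Spark' is now gone, because words_unique_rdd holds the result of distinct(). Note that the output order differs from the original list: distinct() shuffles the data between partitions, so it does not preserve the original order.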
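Sometimes the first-occurrence order matters and the result is small enough to collect. In that case, one option is to remove duplicates on the driver instead. The plain-Python sketch below is not Spark code. It uses `dict.fromkeys`, which keeps the first occurrence of each key in insertion order, and it assumes the data fits comfortably in driver memory.

```python
words_list = "Spark makes life a lot easier and put me into good Spirits, Spark is too Awesome!".split(" ")

# dict keys are unique and keep insertion order (Python 3.7+),
# so this drops the second 'Spark' while preserving the original order.
unique_in_order = list(dict.fromkeys(words_list))

print(len(unique_in_order))  # 15
print(unique_in_order[:3])   # ['Spark', 'makes', 'life']
```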

### Applying filter().

In [13]:
def wordsStartsWith(word, letter):
    # True if word begins with letter (case-sensitive: 'spark' would not match 'S').
    return word.startswith(letter)

In [14]:
words_rdd.filter(lambda word: wordsStartsWith(word,'S')).collect()

['Spark', 'Spirits,', 'Spark']

### Applying map() (transformation).

In [15]:
numbers = [*range(1,20)]
print(numbers)

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]


In [16]:
rdd_numbers = spark.sparkContext.parallelize(numbers)

In [17]:
squared_rdd_numbers = rdd_numbers.map(lambda x : (x, x**2))

In [18]:
for num in squared_rdd_numbers.collect():
    print(num)

(1, 1)
(2, 4)
(3, 9)
(4, 16)
(5, 25)
(6, 36)
(7, 49)
(8, 64)
(9, 81)
(10, 100)
(11, 121)
(12, 144)
(13, 169)
(14, 196)
(15, 225)
(16, 256)
(17, 289)
(18, 324)
(19, 361)


In [19]:
words_trd_rdd = words_rdd.map(lambda word:(word,word[0],wordsStartsWith(word,'S')))

In [None]:
for word in words_trd_rdd.collect():
    print(word)
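The cell above was never executed (`In [None]`), so its output is missing. The plain-Python sketch below applies the same lambda to the local `words_list` to show what each element of `words_trd_rdd` would look like: a `(word, first_letter, starts_with_S)` triple. It is a local model, not Spark code, and `words_trd` is a hypothetical name. Because `map` keeps the element order within each partition and `collect()` returns the partitions in order, the RDD version should produce the triples in the same order.

```python
def wordsStartsWith(word, letter):
    # Same helper as defined earlier in the notebook.
    return word.startswith(letter)

words_list = "Spark makes life a lot easier and put me into good Spirits, Spark is too Awesome!".split(" ")

# Same lambda as the words_rdd.map(...) call, applied to a local list instead of an RDD.
words_trd = [(word, word[0], wordsStartsWith(word, 'S')) for word in words_list]

for triple in words_trd[:3]:
    print(triple)
# ('Spark', 'S', True)
# ('makes', 'm', False)
# ('life', 'l', False)
```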

### Sorting a list of key-value pairs

In [22]:
countries_list = [("India",91),("USA",4),("Greece",13)]
countries_rdd = spark.sparkContext.parallelize(countries_list)

By default, sortByKey() sorts by the key (here the country name) in ascending order.

In [31]:
sorted_countries_list = countries_rdd.sortByKey().collect()

In [24]:
for country in sorted_countries_list:
    print(country)

('Greece', 13)
('India', 91)
('USA', 4)


In [28]:
sorted_countries_list = countries_rdd.map(lambda c: (c[1],c[0])).sortByKey(False).collect()

After swapping each tuple to (rank, country), the rank becomes the key, so sortByKey(False) now sorts by rank in descending order. The first argument of sortByKey() is `ascending`, which is set to False here.

In [29]:
for country in sorted_countries_list:
    print(country)

(91, 'India')
(13, 'Greece')
(4, 'USA')
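Swapping the tuple is one way to sort by the second field. PySpark's `RDD.sortBy(keyfunc, ascending=...)` takes a key function instead, for example `countries_rdd.sortBy(lambda c: c[1], ascending=False)`, and keeps the original `(country, rank)` shape. The plain-Python sketch below uses `sorted` on the local list to show the order each approach should produce. It is a local model, not a Spark call.

```python
countries_list = [("India", 91), ("USA", 4), ("Greece", 13)]

# Equivalent of sortByKey(): order by the first field (country name), ascending.
by_country = sorted(countries_list, key=lambda c: c[0])

# Equivalent of map(swap).sortByKey(False): rank becomes the key, descending.
swapped_desc = sorted(((rank, country) for country, rank in countries_list), reverse=True)

# Equivalent of sortBy(lambda c: c[1], ascending=False): keeps (country, rank) tuples.
by_rank_desc = sorted(countries_list, key=lambda c: c[1], reverse=True)

print(by_country)    # [('Greece', 13), ('India', 91), ('USA', 4)]
print(swapped_desc)  # [(91, 'India'), (13, 'Greece'), (4, 'USA')]
print(by_rank_desc)  # [('India', 91), ('Greece', 13), ('USA', 4)]
```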


## Actions in PySpark.

### Reduce Function

The reduce function in PySpark RDD aggregates the elements of an RDD using a binary operator: a function that takes two elements and combines them into one. Spark first reduces the elements within each partition and then combines the partial results, so the operator should be commutative and associative (addition is; subtraction is not).

reduce is an action, not a transformation. Unlike map or filter, which return a new RDD, reduce returns the final value directly to the driver, so there is no need to call collect() afterwards.
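Spark reduces each partition separately and then combines the partial results, so the operator matters. The plain-Python sketch below is not Spark code. It splits the data into two hypothetical partitions, `[1, 2]` and `[3, 4, 5]`, and compares a single left-to-right fold with a per-partition fold. Addition gives the same answer either way. Subtraction does not, which is why an operator like `lambda x, y: x - y` can give results that depend on how the RDD happens to be partitioned.

```python
from functools import reduce
from operator import add, sub

data = [1, 2, 3, 4, 5]
partitions = [[1, 2], [3, 4, 5]]  # hypothetical split; Spark chooses its own

def single_fold(f):
    # One left-to-right pass over all elements.
    return reduce(f, data)

def per_partition_fold(f):
    # Reduce each partition, then combine the partial results.
    partials = [reduce(f, p) for p in partitions]
    return reduce(f, partials)

print(single_fold(add), per_partition_fold(add))  # 15 15
print(single_fold(sub), per_partition_fold(sub))  # -13 5
```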

In [30]:
numbers_rdd = spark.sparkContext.parallelize([1, 2, 3, 4, 5])
sum_of_numbers = numbers_rdd.reduce(lambda x, y: x + y)
print(sum_of_numbers)

15


**What happens inside the reduce lambda function**

In [32]:
def sumList(x,y):
    # Print each pair of arguments so we can see how reduce combines the elements.
    print(x,y)
    return x + y

In [34]:
result = numbers_rdd.reduce(lambda x, y: sumList(x,y))
print(result)

1 2
3 3
6 9
15
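Only three pairs are printed, even though combining five numbers takes four additions. This matches how PySpark's `RDD.reduce` works. Each partition is reduced separately on an executor, the partial results are collected to the driver, and the driver folds them with the same function.

Assume the RDD has four partitions: `[1]`, `[2]`, `[3]` and `[4, 5]`. This is an assumption, since the notebook does not show `getNumPartitions()`. Under it, the driver folds the partials `[1, 2, 3, 9]`, which produces exactly the pairs `1 2`, `3 3` and `6 9`. The `4 5` call runs on an executor, whose `print` output typically goes to the worker's console or log rather than the notebook.

The plain-Python model below reproduces this. The slicing formula is assumed to mirror how Spark splits a parallelized list, and `two_phase_reduce` is a hypothetical helper.

```python
from functools import reduce

def slice_into_partitions(data, num_slices):
    # Contiguous slices using start = i*n//k, end = (i+1)*n//k
    # (assumed to mirror how Spark slices a parallelized list).
    n = len(data)
    return [data[i * n // num_slices:(i + 1) * n // num_slices] for i in range(num_slices)]

def two_phase_reduce(data, f, num_slices):
    calls = []  # (phase, x, y) for every invocation of f

    def traced(phase):
        def wrapper(x, y):
            calls.append((phase, x, y))
            return f(x, y)
        return wrapper

    partitions = slice_into_partitions(data, num_slices)
    # Phase 1 (executors): reduce each non-empty partition on its own.
    # A one-element partition needs no call to f at all.
    partials = [reduce(traced("executor"), p) for p in partitions if p]
    # Phase 2 (driver): fold the partial results in partition order.
    result = reduce(traced("driver"), partials)
    return result, partitions, calls

result, partitions, calls = two_phase_reduce([1, 2, 3, 4, 5], lambda x, y: x + y, num_slices=4)
print(partitions)  # [[1], [2], [3], [4, 5]]
for phase, x, y in calls:
    print(phase, x, y)
print(result)      # 15
```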


In [40]:
def wordLengthReducer(leftWord, rightWord):
    # Keep the longer of the two words; on a tie (equal length) the right-hand word wins.
    if len(leftWord) > len(rightWord):
        return leftWord
    return rightWord

In [41]:
words_rdd.reduce(wordLengthReducer)

'Awesome!'
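'Spirits,' and 'Awesome!' are both eight characters long, so the result depends on how ties are broken. `wordLengthReducer` keeps `leftWord` only when it is strictly longer, so a tie goes to the right-hand word and the later word 'Awesome!' wins. This rule ("keep the last of the longest words") is associative, so splitting the work across partitions should not change the answer.

The plain-Python left fold below is a local sketch, not Spark. It confirms the result and shows that a hypothetical `>=` variant would return 'Spirits,' instead.

```python
from functools import reduce

words_list = "Spark makes life a lot easier and put me into good Spirits, Spark is too Awesome!".split(" ")

def wordLengthReducer(leftWord, rightWord):
    # Strictly longer left word wins; on a tie the right-hand word is kept.
    return leftWord if len(leftWord) > len(rightWord) else rightWord

def firstLongestReducer(leftWord, rightWord):
    # Hypothetical variant: with >= a tie keeps the left-hand (earlier) word.
    return leftWord if len(leftWord) >= len(rightWord) else rightWord

print(len("Spirits,"), len("Awesome!"))         # 8 8
print(reduce(wordLengthReducer, words_list))    # Awesome!
print(reduce(firstLongestReducer, words_list))  # Spirits,
```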

## Exercise

### Finding Max/Min value in RDD

In [42]:
numbers_rdd = spark.sparkContext.parallelize([7, 2, 9, 1, 5, 3])
max_value = numbers_rdd.reduce(lambda x, y: x if x > y else y)
print("Maximum value:", max_value)

Maximum value: 9
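The heading also mentions the minimum. Flipping the comparison in the same lambda gives it, and PySpark RDDs also provide `max()` and `min()` actions that compute these directly. The sketch below uses `functools.reduce` on a local list as a stand-in for `numbers_rdd.reduce`, so it runs without Spark.

```python
from functools import reduce

numbers = [7, 2, 9, 1, 5, 3]

max_value = reduce(lambda x, y: x if x > y else y, numbers)  # keep the larger of each pair
min_value = reduce(lambda x, y: x if x < y else y, numbers)  # keep the smaller of each pair

print("Maximum value:", max_value)  # Maximum value: 9
print("Minimum value:", min_value)  # Minimum value: 1
```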


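Let's illustrate each iteration of the reduce operation for finding the maximum value in the RDD [7, 2, 9, 1, 5, 3]. For simplicity, the walkthrough treats the RDD as a single sequence folded from left to right. In Spark, each partition is reduced first and the partial results are then combined on the driver, so the exact pairs compared may differ. Taking the maximum is associative and commutative, so the final result is the same.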

- **Iteration 1:** x = 7 (initial accumulated maximum), y = 2 (next element in the RDD). Since x (7) is greater than y (2), the lambda function returns x. Accumulated maximum after the first iteration: 7.
- **Iteration 2:** x = 7 (accumulated maximum from the previous iteration), y = 9 (next element in the RDD). Since y (9) is greater than x (7), the lambda function returns y. Accumulated maximum after the second iteration: 9.
- **Iteration 3:** x = 9, y = 1. Since x (9) is greater than y (1), the lambda function returns x. Accumulated maximum after the third iteration: 9.
- **Iteration 4:** x = 9, y = 5. Since x (9) is greater than y (5), the lambda function returns x. Accumulated maximum after the fourth iteration: 9.
- **Iteration 5:** x = 9, y = 3. Since x (9) is greater than y (3), the lambda function returns x. Accumulated maximum after the fifth iteration: 9.

After iterating through all elements in the RDD, the final result is 9, which is the maximum value in the RDD [7, 2, 9, 1, 5, 3].
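The walkthrough above can be checked mechanically. This plain-Python sketch is a local left fold, not Spark, and `traced_reduce` is a hypothetical helper. It records each `(x, y)` pair together with the accumulated maximum, reproducing the five iterations.

```python
from functools import reduce

def traced_reduce(f, values):
    # Left fold that records (x, y, result) for every call to f.
    steps = []

    def step(x, y):
        result = f(x, y)
        steps.append((x, y, result))
        return result

    return reduce(step, values), steps

max_value, steps = traced_reduce(lambda x, y: x if x > y else y, [7, 2, 9, 1, 5, 3])
for i, (x, y, acc) in enumerate(steps, start=1):
    print(f"Iteration {i}: x={x}, y={y} -> accumulated maximum {acc}")
print("Final result:", max_value)
```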

### Daily Temperature

In [63]:
ferenheit_temp = spark.sparkContext.parallelize([59, 57.2, 53.6, 55.4, 51.8, 53.6, 55.4])
celcius_temp = ferenheit_temp.map(lambda x: (x-32)*(5/9))

In [64]:
for temp in celcius_temp.collect():
    print(temp)

15.0
14.000000000000002
12.000000000000002
13.0
10.999999999999998
12.000000000000002
13.0


In [66]:
filterTempRDD = celcius_temp.filter(lambda x: x >= 13)
for temp in filterTempRDD.collect():
    print(temp)

15.0
14.000000000000002
13.0
13.0
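Values such as 14.000000000000002 and 10.999999999999998 come from binary floating-point rounding, not real differences in temperature. Rounding inside the `map` step gives cleaner output, for example `ferenheit_temp.map(lambda x: round((x - 32) * 5 / 9, 1))`. It also makes a threshold filter like `x >= 13` less sensitive to that error.

The plain-Python sketch below applies the same two steps to a local list. Rounding to one decimal place is an assumption about the precision you want.

```python
fahrenheit = [59, 57.2, 53.6, 55.4, 51.8, 53.6, 55.4]

# Convert to Celsius and round to one decimal place (assumed precision).
celsius = [round((f - 32) * 5 / 9, 1) for f in fahrenheit]
print(celsius)  # [15.0, 14.0, 12.0, 13.0, 11.0, 12.0, 13.0]

# Same threshold as the filter() cell above.
warm_days = [c for c in celsius if c >= 13]
print(warm_days)  # [15.0, 14.0, 13.0, 13.0]
```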
