## Resilient Distributed Datasets (RDDs) - Lab

Resilient Distributed Datasets (RDDs) are the fundamental data structures of Spark. An RDD is, essentially, Spark's representation of a set of data spread across multiple machines, with APIs that let you act on it. An RDD can come from any data source, e.g. text files, a database, a JSON file, etc.


## Objectives

You will be able to:

* Describe RDDs as the fundamental storage units in the Spark computing environment
* Create RDDs from Python collections
* Set the number of partitions for parallelizing RDDs
* Review an RDD's dependency graph at different stages of processing
* Use the map(func) transformation to apply a given function to all elements of an RDD across its partitions
* Use the collect() action to trigger the processing stage of Spark's lazy evaluation
* Use the count() action to calculate the number of elements of a parallelized RDD
* Use filter(func) to filter unwanted data from RDDs
* Develop an understanding of Python's lambda functions for RDD processing


## What are RDDs? 

To get a better understanding of RDDs, let's break down each one of the components of the acronym RDD:

Resilient: RDDs are considered "resilient" because they have built-in fault tolerance. Even if one of the nodes goes offline, Spark is able to restore the lost data. This is a major advantage over a standard single-machine setup: if a standard computer dies while performing an operation, everything in its memory is lost. With RDDs, nodes can go offline and the lost partitions can be recomputed from the recorded chain of transformations (the lineage) that produced them.

Distributed: The data is contained on multiple nodes of a cluster-computing operation. It is efficiently partitioned to allow for parallelism.

Dataset: The dataset has been *partitioned* across the multiple nodes.

RDDs are the building blocks on which higher-level Spark operations are based. Chances are, if you are performing an operation using Spark, it involves RDDs.



Key Characteristics of RDDs:

- Immutable: Once an RDD is created, it cannot be modified.
- Lazily Evaluated: RDDs are not evaluated until an action is triggered. When you define transformations on an RDD, Spark only records what should be done; the work is carried out when an action explicitly asks for a result. Lazy evaluation lets you organize a Spark program into smaller steps, and it saves unnecessary computation and memory load.
- In-Memory: Spark performs its operations in memory rather than in a database. This is what allows Spark to perform fast operations on very large quantities of data.




### RDD Transformations vs Actions

In Spark, we first create a **base RDD** and then apply one or more transformations to that base RDD according to our processing needs. Because RDDs are immutable, **once an RDD is created, it cannot be changed**. As a result, **each transformation of an RDD creates a new RDD**. Finally, we can apply one or more **actions** to the RDDs. Spark uses lazy evaluation, so transformations are not actually executed until an action occurs.
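The split between lazy transformations and eager actions can be sketched without Spark at all. The block below is only a plain-Python analogy, not PySpark: Python's built-in `map` and `filter` stand in for transformations, `list()` stands in for an action, and the `calls` list is a hypothetical helper added purely to show when the function really runs.

```python
# Plain-Python analogy (not PySpark): built-in map/filter return lazy iterators.
calls = []  # records every element the mapping function actually touches


def multiply_2(num):
    calls.append(num)
    return num * 2


nums = list(range(1, 11))

# "Transformations": the pipeline is only described; nothing is computed yet.
doubled = map(multiply_2, nums)
large = filter(lambda x: x > 10, doubled)
calls_before_action = len(calls)  # multiply_2 has not run at this point

# "Action": materializing the result forces the whole pipeline to run.
result = list(large)
calls_after_action = len(calls)  # one call per input element

print(calls_before_action, calls_after_action, result)
# 0 10 [12, 14, 16, 18, 20]
```

Note that `nums` itself is never modified; each step produces a new object, which mirrors how each RDD transformation yields a new RDD.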


<img src="rdd1.png" width=500>

Let's see this behavior through a simple example. In this example, we will perform several actions and transformations on RDDs in order to obtain a better understanding of Spark processing. 

### Create a Python collection 

We need some data to start experimenting with RDDs. Let's create some sample data and see how RDDs handle it. To practice working with RDDs, we're going to use a simple Python list.

- Create a Python list `nums` of integers between 1 and 1000.
- Use the `range()` function to create this list of length 1000 (1 - 1000 inclusive).
- Sanity check: confirm the length of the list.

In [14]:
nums = list(range(1,1001))
len(nums)

1000

### Initialize an RDD

To initialize an RDD, first import `pyspark` and then create a SparkContext.

In [7]:
import pyspark
sc = pyspark.SparkContext('local[*]')

Once you've created the SparkContext, you can use the `parallelize` method to create an RDD. Here, create one with 10 partitions.

In [15]:
rdd = sc.parallelize(nums,numSlices=10)
print(type(rdd))

<class 'pyspark.rdd.RDD'>


Determine how many partitions are being used:

In [16]:
rdd.getNumPartitions()

10
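To build some intuition for what 10 partitions of 1000 numbers look like, here is a plain-Python sketch that cuts a list into contiguous, near-equal slices. The helper `split_into_slices` is hypothetical and only illustrates the idea; Spark's own slicing logic may differ in its details.

```python
def split_into_slices(items, num_slices):
    """Split a list into num_slices contiguous, near-equal chunks."""
    n = len(items)
    return [
        items[i * n // num_slices:(i + 1) * n // num_slices]
        for i in range(num_slices)
    ]


nums = list(range(1, 1001))
slices = split_into_slices(nums, 10)

print(len(slices))                     # 10
print([len(s) for s in slices])        # ten chunks of 100 elements each
print(slices[0][:3], slices[-1][-3:])  # [1, 2, 3] [998, 999, 1000]
```

On a real RDD, `rdd.glom().collect()` returns the elements grouped by partition, which lets you inspect the actual split.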

### Basic descriptive RDD actions

Let's perform some basic operations on our RDD. In the cells below, use the `count`, `first`, `top`, and `collect` actions.

In [17]:
rdd.count()

1000

In [18]:
rdd.first()

1

In [19]:
rdd.top(10)

[1000, 999, 998, 997, 996, 995, 994, 993, 992, 991]

In [20]:
# Note: collect() brings every element back to the driver. With big data this could crash your computer, so it's best to avoid collect() on large RDDs.
rdd.collect()

[1,
 2,
 3,
 4,
 5,
 6,
 7,
 8,
 9,
 10,
 11,
 12,
 13,
 14,
 15,
 16,
 17,
 18,
 19,
 20,
 21,
 22,
 23,
 24,
 25,
 26,
 27,
 28,
 29,
 30,
 31,
 32,
 33,
 34,
 35,
 36,
 37,
 38,
 39,
 40,
 41,
 42,
 43,
 44,
 45,
 46,
 47,
 48,
 49,
 50,
 51,
 52,
 53,
 54,
 55,
 56,
 57,
 58,
 59,
 60,
 61,
 62,
 63,
 64,
 65,
 66,
 67,
 68,
 69,
 70,
 71,
 72,
 73,
 74,
 75,
 76,
 77,
 78,
 79,
 80,
 81,
 82,
 83,
 84,
 85,
 86,
 87,
 88,
 89,
 90,
 91,
 92,
 93,
 94,
 95,
 96,
 97,
 98,
 99,
 100,
 101,
 102,
 103,
 104,
 105,
 106,
 107,
 108,
 109,
 110,
 111,
 112,
 113,
 114,
 115,
 116,
 117,
 118,
 119,
 120,
 121,
 122,
 123,
 124,
 125,
 126,
 127,
 128,
 129,
 130,
 131,
 132,
 133,
 134,
 135,
 136,
 137,
 138,
 139,
 140,
 141,
 142,
 143,
 144,
 145,
 146,
 147,
 148,
 149,
 150,
 151,
 152,
 153,
 154,
 155,
 156,
 157,
 158,
 159,
 160,
 161,
 162,
 163,
 164,
 165,
 166,
 167,
 168,
 169,
 170,
 171,
 172,
 173,
 174,
 175,
 176,
 177,
 178,
 179,
 180,
 181,
 182,
 183,
 184,
 185

## Map functions

Now let's perform some operations on our simple dataset. To begin with, try a basic operation where each number is multiplied by 2. To make this happen, create a function called `multiply_2` and use the map method to apply it to the RDD.

In [21]:
def multiply_2(num):
    return num * 2

doubled = rdd.map(multiply_2)

In [22]:
doubled.top(10)

[2000, 1998, 1996, 1994, 1992, 1990, 1988, 1986, 1984, 1982]

### Lambda Functions

Note that you can also use lambda functions if you want to quickly perform simple operations on data. Let's create a lambda function that subtracts 5 from every value.

In [23]:
doubled_minus_5 = doubled.map(lambda x : x -5)

In [24]:
doubled_minus_5.top(10)

[1995, 1993, 1991, 1989, 1987, 1985, 1983, 1981, 1979, 1977]

## Chaining Methods

Much like pandas, you are able to chain methods with Spark. We can perform all of these operations in one line:

In [25]:
rdd.map(multiply_2).map(lambda x : x - 5).top(10)

[1995, 1993, 1991, 1989, 1987, 1985, 1983, 1981, 1979, 1977]

## RDD hierarchy

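We can see the full journey of an RDD by looking at its lineage: the chain of parent RDDs and transformations it was built from. The `toDebugString()` method returns a description of this lineage.

In [None]:
doubled_minus_5.toDebugString()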
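To make the idea of a lineage concrete, here is a toy, plain-Python model. `MiniRDD` is a hypothetical class written only for illustration and is not part of PySpark: each `map` returns a new object that remembers its parent, so the chain of steps can be walked back to the base data. In PySpark itself, the lineage of an RDD is exposed through `toDebugString()`.

```python
class MiniRDD:
    """Toy stand-in for an RDD: a recipe for data plus a link to its parent."""

    def __init__(self, compute, parent=None, name="parallelize"):
        self._compute = compute  # zero-argument function producing the elements
        self.parent = parent
        self.name = name

    def map(self, func):
        # Return a NEW MiniRDD; the current one is left untouched.
        return MiniRDD(
            lambda: [func(x) for x in self._compute()],
            parent=self,
            name="map",
        )

    def collect(self):
        # "Action": only now is the recipe actually executed.
        return self._compute()

    def lineage(self):
        # Walk from this object back to the base data, then reverse the order.
        node, steps = self, []
        while node is not None:
            steps.append(node.name)
            node = node.parent
        return list(reversed(steps))


base = MiniRDD(lambda: list(range(1, 6)))
doubled = base.map(lambda x: x * 2)
doubled_minus_5 = doubled.map(lambda x: x - 5)

print(doubled_minus_5.lineage())  # ['parallelize', 'map', 'map']
print(doubled_minus_5.collect())  # [-3, -1, 1, 3, 5]
print(base.collect())             # [1, 2, 3, 4, 5]
```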

### Map vs. Flatmap

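Depending on how you want your data to be output, you might want to use `flatMap` instead of `map`. Let's take a look at how it performs operations compared with the standard `map`: `map` returns exactly one output element per input element, while `flatMap` flattens each returned sequence into the resulting RDD.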

In [30]:
mapped = rdd.map(lambda x: (x, x*2 - 5))
print(mapped.count())
print(mapped.take(10))

1000
[(1, -3), (2, -1), (3, 1), (4, 3), (5, 5), (6, 7), (7, 9), (8, 11), (9, 13), (10, 15)]


In [32]:
flat_mapped = rdd.flatMap(lambda x : (x, x*2 -5 ))
print(flat_mapped.count())
print(flat_mapped.take(10))

2000
[1, -3, 2, -1, 3, 1, 4, 3, 5, 5]
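The difference between the two outputs above can be reproduced in plain Python, which may help clarify what `flatMap` does. This is a sketch of the semantics only, not of how Spark executes it; `make_pair` is a hypothetical helper name.

```python
from itertools import chain


def make_pair(x):
    return (x, x * 2 - 5)


nums = list(range(1, 1001))

# map: one output element (here a tuple) per input element
mapped = [make_pair(x) for x in nums]

# flatMap: each returned tuple is unpacked into the output
flat_mapped = list(chain.from_iterable(make_pair(x) for x in nums))

print(len(mapped), mapped[:3])            # 1000 [(1, -3), (2, -1), (3, 1)]
print(len(flat_mapped), flat_mapped[:6])  # 2000 [1, -3, 2, -1, 3, 1]
```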


## Reduce

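Now it's time to combine all the elements of an RDD into a single value. Let's do this with the `reduce` action, first by summing `doubled_minus_5`. After that, we generate some sample sales data so that we can total the sales for each user.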



In [26]:
doubled_minus_5.reduce(lambda x,y :x + y)

996000
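As a sanity check on that result, the same sum can be computed in plain Python with `functools.reduce` and compared against the closed-form value. This sketch does not use Spark; it only confirms the arithmetic.

```python
from functools import reduce

nums = range(1, 1001)
doubled_minus_5 = [x * 2 - 5 for x in nums]

# Same binary function as in the Spark cell above
total = reduce(lambda x, y: x + y, doubled_minus_5)

# Closed form: 2 * (1 + 2 + ... + 1000) - 5 * 1000
closed_form = 2 * (1000 * 1001 // 2) - 5 * 1000

print(total, closed_form)  # 996000 996000
```

Spark applies the function within each partition and then across partitions, so the function passed to `reduce` should be commutative and associative, as addition is.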

In [1]:
import numpy as np

In [3]:
user_ids = np.random.randint(1,200,400)
sales_amount = np.random.random(400) * 100

In [6]:
sales_data = list(zip(user_ids,sales_amount))

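### reduceByKey

`reduceByKey` works on an RDD of (key, value) pairs and merges the values for each key using the given function. Here, it totals the sales amounts for each user id.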



In [11]:
rdd_sales = sc.parallelize(sales_data)

total_sales = rdd_sales.reduceByKey(lambda x,y : x +y)
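The per-key merging can be sketched in plain Python with a dictionary. The helper `reduce_by_key` and the small hand-written `sales_data` list below are hypothetical, chosen so the result is easy to verify by eye; this illustrates the semantics and is not how Spark implements the operation.

```python
def reduce_by_key(pairs, func):
    """Merge the values of each key with func, one pair at a time."""
    totals = {}
    for key, value in pairs:
        totals[key] = func(totals[key], value) if key in totals else value
    return list(totals.items())


# Small hand-written sample: (user_id, sale_amount)
sales_data = [(1, 10.0), (2, 5.0), (1, 2.5), (3, 7.0), (2, 1.0)]

total_sales = reduce_by_key(sales_data, lambda x, y: x + y)
print(total_sales)  # [(1, 12.5), (2, 6.0), (3, 7.0)]
```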

### Sort RDDs

In [13]:
total_sales.sortBy(lambda x: x[1],ascending = False).collect()

[(165, 458.98975237990214),
 (133, 439.08856305957636),
 (69, 428.3338944109911),
 (82, 336.3196107071562),
 (62, 289.34679383793156),
 (148, 284.0000366029162),
 (192, 280.1221011473372),
 (151, 266.7766912861809),
 (177, 266.2609065887784),
 (120, 258.16313828831557),
 (94, 257.70857710806143),
 (33, 251.5445577357754),
 (89, 251.23007854980386),
 (186, 249.22583625623912),
 (57, 248.1605174352525),
 (160, 241.51726688448596),
 (124, 240.40451041132525),
 (121, 233.1886142209236),
 (145, 227.00577425491394),
 (194, 225.28740526902874),
 (38, 223.21825418607338),
 (114, 215.36889589428853),
 (58, 215.03891434551574),
 (79, 212.9645925999117),
 (157, 203.19083113109838),
 (34, 196.37483458219697),
 (140, 190.72394480900638),
 (174, 189.31751506575054),
 (26, 189.13605555066135),
 (9, 187.73252573067214),
 (37, 184.64103026462075),
 (45, 181.98621379595886),
 (95, 179.22901432084262),
 (166, 175.01994845679258),
 (51, 174.79545457918886),
 (3, 174.78103514273494),
 (147, 173.11032038642

### Transformations

Transformations create a new dataset from an existing one by passing each element through a function and returning a new RDD that represents the results. In short, creating an RDD from an existing RDD is a "transformation".
All transformations in Spark are lazy: they do not compute their results right away. Instead, they just remember the transformations applied to some base dataset (e.g. a file). The transformations are only computed when an action requires a result to be returned to the driver program.
Examples of transformations, each of which takes an RDD and returns another RDD, include map, flatMap, filter, reduceByKey, join, and cogroup.

### Actions
Actions return the final results of RDD computations. An action triggers execution: Spark uses the lineage graph to load the data into the original RDD, carries out all intermediate transformations, and either returns the final result to the driver program (the user program) or writes it out to the file system.

Here are some of the key transformations and actions that we will explore later.
<img src="rdd2.png" width=400>

### Additional Reading

- [The original paper on RDDs](https://cs.stanford.edu/~matei/papers/2012/nsdi_spark.pdf)
- [RDDs in Apache Spark](https://data-flair.training/blogs/create-rdds-in-apache-spark/)
- [Programming with RDDs](https://runawayhorse001.github.io/LearningApacheSpark/rdd.html)

## Summary

In this lab we went through a brief introduction to creating an RDD from a Python collection, setting the number of logical partitions for an RDD, and looking at the lineage of an RDD in a Spark application. We also applied actions such as `count`, `first`, `top`, `collect`, and `reduce`, and transformations such as `map`, `flatMap`, `reduceByKey`, and `sortBy`. The following labs will build upon this knowledge, and we shall see how transformations and actions can be applied to RDDs in a distributed setup.
