
## 4.4 Introduction to RDDs

### 4.4.1 What are RDDs?

Resilient Distributed Datasets (RDDs) are the fundamental data structure of Spark: they are what Spark actually executes operations on. Even the DataFrames introduced in the previous section are optimized and then converted into RDD operations.

Formally, an RDD is a read-only, partitioned collection of records. RDDs can be created through deterministic operations on either data in stable storage or other RDDs. An RDD is a fault-tolerant collection of elements that can be operated on in parallel (Wikipedia).

You can think of an RDD as a logical model of a dataset whose data is stored across the nodes of a cluster. RDDs have the following distinctive characteristics:

**Distributed Partition**

The records of an RDD are divided into logical partitions, and different partitions can be computed on different nodes of the cluster.

**Immutable**

Once you have created an RDD, you cannot change it; operations on it produce new RDDs instead. By this definition, RDDs are purely "read-only".

**Resilient**

RDDs are also resilient: if an RDD (or one of its partitions) is lost because of a failure, Spark can re-create it. This is possible because each RDD records how it was built, as a series of transformations from its source data.

**Directed Acyclic Graphs (DAGs)**

These transformations are stored in a data structure known as a directed acyclic graph (DAG). The DAG records all the operations the RDD is expected to go through, but Spark does not execute them immediately (which would be eager execution). Instead, operations are evaluated lazily: they are executed only when their results are actually needed.

**Lazy execution**

Lazy execution means that operations are not carried out until the user explicitly requests a result, usually through an action. Together with the DAG, lazy execution allows Spark to re-create any RDD in the event of node failures.

**Two types of operations**

RDDs support two types of operations: transformations and actions. Transformations create a new RDD from an existing RDD, and the lineage describing how the new RDD was created is stored in the DAG. The original RDD is not changed, because RDDs are immutable.

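All transformations are lazy: they are not executed until an action is called. An action triggers the computation of the DAG and either returns a result to the driver (for example, a Python list from `collect()` or a number from `count()`) or writes it to storage. Deferring the work this way lets Spark plan the whole computation at once, for example by pipelining consecutive transformations into a single stage. (The Catalyst optimizer, which rewrites queries more aggressively, applies to DataFrames and Spark SQL rather than to raw RDDs.)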

### 4.4.2 RDDs versus DataFrames


Now that we have introduced both RDDs and DataFrames, when should we use each?

Consider a situation where we are at home and want to grab some lunch from the local hawker centre. A DataFrame-style instruction would be:

"Drive to xxx market and purchase a packet of chicken rice"

whereas RDD-style instructions are closer to:

"Walk downstairs to the car, open the car door, get in, and so on"

The former is "high level": it tells Spark *what* to do, and it is the form we normally use. The latter is "low level": it tells Spark *how* to carry out each step.

DataFrames are usually preferred because you benefit from Spark's built-in optimizations. However, such high-level operations are sometimes unavailable or awkward for your data, typically when it is unstructured (for example, media or text streams). In those cases, the ability to fall back on RDDs for finer control is crucial.

### 4.4.3 How do we create an RDD?

There are three ways to create an RDD:
1. Parallelizing an existing collection, such as a Python list
2. Creating an RDD from an existing RDD
3. Referencing a dataset in an external storage system

Let's go through these methods one at a time.

First, install PySpark and start a Spark session.

Successfully installed py4j-0.10.9 pyspark-3.0.1


#### 4.4.3.1 Method 1: Creating an RDD from a list

**Example 1: Build an RDD from a list.**

The variable `rdd_list` is a parallel collection RDD, not a list of numbers. Displaying it therefore shows only a description of the RDD, not its contents:

ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:262

We can obtain the number of partitions the RDD is split into. In this scenario on Google Colab, the number of partitions is two:

2

To see the actual contents of the RDD, you need to run an action, in this case `collect()`. `collect()` executes the DAG associated with the RDD and returns all of its elements to the driver:

[1, 2, 3, 4, 5]

Similar to the `show()` function of a DataFrame, you can use `take(n)` to return the first `n` elements of the RDD:

[1, 2, 3]

[1, 2, 3, 4, 5]

**Example 2: Build an RDD from a list of tuples.**

ParallelCollectionRDD[7] at readRDDFromFile at PythonRDD.scala:262

2

[(1, 2), (3, 4), (3, 6), (4, 5)]

[(1, 2), (3, 4), (3, 6), (4, 5)]

**Example 3: Build an RDD from a file**

APPL.csv MapPartitionsRDD[29] at textFile at NativeMethodAccessorImpl.java:0

MapPartitionsRDD[44] at javaToPython at NativeMethodAccessorImpl.java:0

['Date, Close/Last, Volume, Open, High, Low',
 '02/28/2020, $273.36, 106721200, $257.26, $278.41, $256.37',
 '02/27/2020, $273.52, 80151380, $281.1, $286, $272.96',
 '02/26/2020, $292.65, 49678430, $286.53, $297.88, $286.5',
 '02/25/2020, $288.08, 57668360, $300.95, $302.53, $286.13']

[Row(_c0='Date', _c1=' Close/Last', _c2=' Volume', _c3=' Open', _c4=' High', _c5=' Low'),
 Row(_c0='02/28/2020', _c1=' $273.36', _c2=' 106721200', _c3=' $257.26', _c4=' $278.41', _c5=' $256.37'),
 Row(_c0='02/27/2020', _c1=' $273.52', _c2=' 80151380', _c3=' $281.1', _c4=' $286', _c5=' $272.96'),
 Row(_c0='02/26/2020', _c1=' $292.65', _c2=' 49678430', _c3=' $286.53', _c4=' $297.88', _c5=' $286.5'),
 Row(_c0='02/25/2020', _c1=' $288.08', _c2=' 57668360', _c3=' $300.95', _c4=' $302.53', _c5=' $286.13')]

pyspark.rdd.RDD

### 4.4.4 Converting an RDD to a DataFrame

Sometimes you may want to apply RDD operations to tabular data, or convert your RDD back into a DataFrame after working on it. Here is how that can be done:


Unfortunately, the header line ends up as a data row of the DataFrame, so you have to remove it first.

Notice that some values contain dollar signs, which you may want to remove for easier processing downstream. It is slightly easier to remove them at the RDD stage, as we do in the following.

However, note that the columns Close/Last, Open, High and Low are still of type string and need to be converted to type float.

Let's try to find the average closing price of Apple per month.

### 4.4.5 Paired RDDs

One of the main advantages of RDDs is how easily they handle unstructured data. Let us illustrate this with a simple example. Recall the RDD of tuples we created earlier:

[(1, 2), (3, 4), (3, 6), (4, 5)]

#### 4.4.5.1 Transformation Operations

An RDD of two-element tuples like this is known as a paired (key-value) RDD: the first item of each tuple is the "key" and the second is the "value". It is much like a dictionary, except that the keys need not be unique.

**keys()**

We can find what the keys are:



[1, 3, 3, 4]

**reduceByKey()**

Let's use a trivial example in which we want to count the number of times each key occurs. Treat the first item as the key, and assume that the second item is the number of times that key has already occurred.

To total these counts per key, we use `reduceByKey()`, which merges all the values that share a key using the function you supply:

Key 4 has 5 Counts
Key 1 has 2 Counts
Key 3 has 10 Counts


**sortBy**

If you wish to sort the RDD by count, you can use `sortBy()` together with a lambda function that picks out the count. Here the result is sorted in descending order:

[(3, 10), (4, 5), (1, 2)]

You will really appreciate the flexibility of RDDs when working with unstructured data such as text. For instance, counting the words in a short sentence gives:

[('the', 2),
 ('quick', 1),
 ('brown', 1),
 ('fox', 1),
 ('jumps', 1),
 ('over', 1),
 ('lazy', 1),
 ('dog', 1)]

#### 4.4.5.2 Action Operations

**countByKey()**

`countByKey()` counts how many key-value pairs there are for each key and returns the result to the driver as a dictionary:

defaultdict(int, {1: 1, 3: 2, 4: 1})

**lookup()**

To retrieve all the values associated with a particular key, use `lookup()`. For example, for key 3:

[4, 6]

Finally, `toDebugString()` shows the lineage (DAG) that Spark recorded for the RDD produced by `reduceByKey()` on the tuple RDD. In PySpark it is returned as a byte string:

b'(2) PythonRDD[360] at collect at <ipython-input-160-3b7d3e4fa4b3>:4 []\n |  MapPartitionsRDD[359] at mapPartitions at PythonRDD.scala:133 []\n |  ShuffledRDD[358] at partitionBy at NativeMethodAccessorImpl.java:0 []\n +-(2) PairwiseRDD[357] at reduceByKey at <ipython-input-160-3b7d3e4fa4b3>:1 []\n    |  PythonRDD[356] at reduceByKey at <ipython-input-160-3b7d3e4fa4b3>:1 []\n    |  ParallelCollectionRDD[7] at readRDDFromFile at PythonRDD.scala:262 []'