# What is RDD?

RDD stands for “Resilient Distributed Dataset”. It is the fundamental data structure of Apache Spark. An RDD is an immutable collection of objects that is partitioned across the nodes of a cluster so that it can be processed in parallel.
Decomposing the name RDD:

1. Resilient, i.e. fault-tolerant: with the help of the RDD lineage graph (a DAG), Spark is able to recompute partitions that are missing or damaged due to node failures.
2. Distributed, since the data resides on multiple nodes.
3. Dataset represents the records of the data you work with. The data can be loaded from an external source such as a JSON file, a CSV file, a text file, or a database via JDBC, and it need not follow any specific data structure.

Hence, every RDD is logically partitioned across many servers so that its partitions can be computed on different nodes of the cluster. RDDs are fault-tolerant, i.e. they can recover on their own in the case of failure.
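To make lineage-based recovery more concrete, here is a toy model in plain Python. It is only an illustration, not how Spark is implemented: the class `ToyRDD` and its methods are hypothetical names invented for this sketch. Each dataset remembers its parent and the function that produced it, so a lost partition can be rebuilt from the parent rather than restored from a replica.

```python
# Toy model of lineage-based recovery (illustration only, not Spark's code).
class ToyRDD:
    def __init__(self, partitions, parent=None, func=None):
        self.partitions = partitions   # list of lists, one per "node"
        self.parent = parent           # lineage: where this data came from
        self.func = func               # lineage: how it was derived

    def map(self, func):
        # Derive a new dataset and record the lineage (parent + function).
        new_parts = [[func(x) for x in part] for part in self.partitions]
        return ToyRDD(new_parts, parent=self, func=func)

    def lose_partition(self, i):
        # Simulate a node failure that wipes out partition i.
        self.partitions[i] = None

    def recover_partition(self, i):
        # Recompute only the missing partition from the parent's partition i.
        self.partitions[i] = [self.func(x) for x in self.parent.partitions[i]]


base = ToyRDD([[1, 2], [3, 4], [5, 6]])
squares = base.map(lambda x: x * x)

squares.lose_partition(1)
lost_state = list(squares.partitions)   # snapshot taken while partition 1 is missing
squares.recover_partition(1)
recovered = squares.partitions

print(lost_state)   # [[1, 4], None, [25, 36]]
print(recovered)    # [[1, 4], [9, 16], [25, 36]]
```

Only the damaged partition is recomputed; the other partitions are left untouched.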

There are three ways to create RDDs in Spark: from data in stable storage, from other RDDs, and by parallelizing an existing collection in the driver program. Spark RDDs can also be operated on in parallel through a low-level API that offers transformations and actions. We will study these Spark RDD operations later in this section.

Spark RDDs can also be cached and manually partitioned. Caching is beneficial when we use an RDD several times, and manual partitioning is important for balancing partitions correctly. Generally, smaller partitions allow RDD data to be distributed more evenly among more executors, while fewer partitions mean fewer tasks to schedule.

Programmers can also call a persist method to indicate which RDDs they want to reuse in future operations. Spark keeps persistent RDDs in memory by default, but it can spill them to disk if there is not enough RAM. Users can also request other persistence strategies, such as storing the RDD only on disk or replicating it across machines, through flags to persist.

# Contents:
    a. Creating RDD
    b. Basic Operations:
        1. .map(...) applies a function to each element of the RDD and returns the transformed elements.
        2. .filter(...) selects the elements of the dataset that fit the specified criteria.
        3. .flatMap(...) works like .map(...) but flattens the results instead of returning one result per element.
        4. .distinct(...) returns a new RDD containing the distinct elements of the source RDD.
        5. .sample(...) returns a randomized sample from the dataset.
        6. .take(...)  
        7. .collect(...)  
        8. .reduce(...)  
        9. .count(...)  
        10. .saveAsTextFile(...)  
        11. .foreach(...) 

# Importing Libraries

In [69]:
import pyspark
from pyspark import SparkContext
import numpy as np
import pandas as pd

In [70]:
sc=SparkContext("local[*]")

# A. Creating RDD

In [71]:
lst=np.random.randint(0,10,20)
print(lst)

[6 4 5 7 4 8 1 7 7 3 5 4 8 8 1 1 1 7 5 4]


### We created a list of random integers. How do we turn it into an RDD, and what is an RDD?
![](https://i.stack.imgur.com/cwrMN.png)
Spark revolves around the concept of a resilient distributed dataset (RDD), which is a **fault-tolerant collection of elements that can be operated on in parallel**. SparkContext manages the distributed data over the worker nodes through the cluster manager. 

There are two ways to create RDDs: 
* parallelizing an existing collection in your driver program, or 
* referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat.

Below, we create an RDD using the former approach.

### `A` is a PySpark RDD object; we cannot access its elements directly

In [72]:
A=sc.parallelize(lst)

In [73]:
type(A)

pyspark.rdd.RDD

In [74]:
A

ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:195

### The opposite of parallelization: `collect` gathers all the distributed elements and returns them to the driver program. <br><br>Note: this is a slow process for large RDDs, so use it sparingly.

In [75]:
A.collect()

[6, 4, 5, 7, 4, 8, 1, 7, 7, 3, 5, 4, 8, 8, 1, 1, 1, 7, 5, 4]

### How were the partitions created? Use the `glom` method

In [76]:
A.glom().collect()

[[6],
 [4, 5],
 [7, 4],
 [8],
 [1, 7],
 [7, 3],
 [5],
 [4, 8],
 [8, 1],
 [1],
 [1, 7],
 [5, 4]]

# B. Transformations

### 1. `map` function

In [77]:
B=A.map(lambda x:x*x)

In [78]:
B.collect()

[36, 16, 25, 49, 16, 64, 1, 49, 49, 9, 25, 16, 64, 64, 1, 1, 1, 49, 25, 16]

`map` operation with a regular Python function

In [79]:
def cube(x):
    # Return the cube of x (the output below contains cubes, e.g. 6 -> 216)
    return x*x*x

In [80]:
C=A.map(cube)

In [81]:
C.collect()

[216,
 64,
 125,
 343,
 64,
 512,
 1,
 343,
 343,
 27,
 125,
 64,
 512,
 512,
 1,
 1,
 1,
 343,
 125,
 64]

### 2. `filter` function

In [82]:
A.filter(lambda x:x%4==0).collect()

[4, 4, 8, 4, 8, 8, 4]

### 3. `flatMap` function

In [83]:
D=A.flatMap(lambda x:(x,x*x))

### The `flatMap` method returns a new RDD by first applying a function to all elements of this RDD, and then flattening the results

In [84]:
D.collect()

[6,
 36,
 4,
 16,
 5,
 25,
 7,
 49,
 4,
 16,
 8,
 64,
 1,
 1,
 7,
 49,
 7,
 49,
 3,
 9,
 5,
 25,
 4,
 16,
 8,
 64,
 8,
 64,
 1,
 1,
 1,
 1,
 1,
 1,
 7,
 49,
 5,
 25,
 4,
 16]

### 4. `distinct` function

### The method `RDD.distinct()` returns a new dataset that contains the distinct elements of the source dataset.

In [85]:
A.distinct().collect()

[1, 3, 4, 5, 6, 7, 8]

### 5. `sample` function

## Sampling an RDD
* RDDs are often very large.
* **Aggregates, such as averages, can be approximated efficiently by using a sample.** This often comes in handy for operations on extremely large datasets, where a sample can tell a lot about the patterns and descriptive statistics of the data.
* Sampling is done in parallel and requires limited computation.

The method `RDD.sample(withReplacement, p)` generates a sample of the elements of the RDD, where
- `withReplacement` is a boolean flag indicating whether or not an element in the RDD can be sampled more than once.
- `p` is the probability of accepting each element into the sample. Note that because each element is accepted independently (and the sampling is performed independently in each partition), the number of elements in the sample changes from sample to sample.
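Why the sample size varies can be illustrated in plain Python for the case without replacement. This is a sketch of the idea only, not Spark's implementation; the helper `bernoulli_sample` is a hypothetical name introduced here.

```python
import random


def bernoulli_sample(items, p, rng):
    # Keep each element independently with probability p.
    return [x for x in items if rng.random() < p]


rng = random.Random(0)        # fixed seed so the run is repeatable
data = list(range(20))

# Draw five samples with p = 5/20 and record how many elements each one has.
sizes = [len(bernoulli_sample(data, 0.25, rng)) for _ in range(5)]
print(sizes)                  # sizes scatter around the expected value n * p = 5
```

Because every element is an independent coin flip, the sample size follows a binomial distribution with mean `n * p` rather than being fixed.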

In [86]:
m=5
n=20
print('sample1=',A.sample(False,m/n).collect()) 
print('sample2=',A.sample(False,m/n).collect())

sample1= [7, 4, 4, 8, 1, 5]
sample2= [7, 3, 8]


In [68]:
sc.stop()