###### What is RDD (Resilient Distributed Dataset)?

RDD (Resilient Distributed Dataset) is a fundamental building block of PySpark: a fault-tolerant, immutable, distributed collection of objects. Immutable means that once you create an RDD you cannot change it. The records of an RDD are divided into logical partitions, which can be computed on different nodes of the cluster.

In other words, an RDD is a collection of objects similar to a Python list, with the difference that an RDD is computed by several processes scattered across multiple physical servers (also called nodes) in a cluster, while a Python collection lives and is processed in just one process.

Additionally, RDDs abstract away the partitioning and distribution of the data, which are designed to run computations in parallel on several nodes. When applying transformations to an RDD, we don't have to worry about parallelism because PySpark provides it by default.
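
To make the idea of partitions concrete, here is a minimal plain-Python sketch (not Spark code) that slices a list into chunks and processes each chunk in its own worker thread. The helper names `split_into_partitions` and `square_partition` are hypothetical, and the slicing rule is an assumption chosen for illustration; Spark's own partitioning logic is more involved.

```python
from concurrent.futures import ThreadPoolExecutor

def split_into_partitions(items, num_partitions):
    """Slice a list into num_partitions contiguous chunks of near-equal size."""
    n = len(items)
    return [items[i * n // num_partitions:(i + 1) * n // num_partitions]
            for i in range(num_partitions)]

def square_partition(partition):
    """Work applied to one partition, independently of the others."""
    return [x * x for x in partition]

data = list(range(1, 13))
partitions = split_into_partitions(data, 3)

# Each partition is handled by a separate worker; pool.map keeps partition order
with ThreadPoolExecutor(max_workers=3) as pool:
    squared_partitions = list(pool.map(square_partition, partitions))

# Flatten the per-partition results back into one collection
squared = [x for part in squared_partitions for x in part]
print(partitions)
print(squared)
```

With an RDD, PySpark takes care of both the slicing and the scheduling, so your code only has to describe the per-record work.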

This PySpark RDD tutorial describes the basic operations available on RDDs, such as map(), filter(), persist(), and many more. It also explains Pair RDD functions that operate on RDDs of key-value pairs, such as groupByKey() and join().

Note: RDDs can have a name and a unique identifier (id).

###### Creating RDD

RDDs are created primarily in two ways:

   * parallelizing an existing collection and
   * referencing a dataset in an external storage system (```HDFS```, ```S3``` and many more). 

Before we look at examples, let's first initialize a SparkSession using the builder pattern defined in the SparkSession class. While initializing, we need to provide the master and the application name, as shown below. In a real-world application, you would pass the master through spark-submit instead of hardcoding it in the Spark application.




In [1]:
from pyspark.sql import SparkSession
spark:SparkSession = SparkSession.builder \
      .master("local[1]") \
      .appName("SparkByExamples.com") \
      .getOrCreate()    

Picked up _JAVA_OPTIONS: -Dawt.useSystemAAFontSettings=on -Dswing.aatext=true


22/09/06 07:46:25 WARN Utils: Your hostname, Jkop resolves to a loopback address: 127.0.1.1; using 172.30.92.4 instead (on interface eth0)
22/09/06 07:46:25 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/09/06 07:46:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


```master()``` – If you are running on a cluster, pass your cluster's master as the argument to master(). Usually it is either yarn (Yet Another Resource Negotiator) or mesos, depending on your cluster setup.

  * Use local[x] when running in local mode on a single machine. x should be an integer greater than 0; it sets how many worker threads Spark uses, which also determines the default parallelism and therefore the default number of partitions for operations such as parallelize(). Ideally, x should be the number of CPU cores you have.

```appName()``` – Used to set your application name.

```getOrCreate()``` – Returns the existing SparkSession if one already exists, and creates a new one otherwise.

Note: Creating a SparkSession object internally creates a SparkContext; there is one SparkContext per JVM.

###### Create RDD using sparkContext.parallelize()

Using the ```parallelize()``` function of SparkContext (sparkContext.parallelize()) you can create an RDD. This function distributes an existing collection from your driver program to form an RDD. It is the basic way to create an RDD and is used when you already have the data in memory, for example loaded from a file or a database. It requires all the data to be present in the driver program before the RDD is created.

In [2]:
#Create RDD from parallelize    
data = [1,2,3,4,5,6,7,8,9,10,11,12]
rdd=spark.sparkContext.parallelize(data)

For production applications, we mostly create RDDs from external storage systems such as HDFS, S3, HBase, etc. To keep things simple, this PySpark RDD tutorial uses files from the local file system or Python lists to create RDDs.

###### Create RDD using sparkContext.textFile()

Using the textFile() method we can read a text (.txt) file into an RDD. Each line of the file becomes one record.




In [3]:
#Create RDD from external Data source
#rdd2 = spark.sparkContext.textFile("/path/textFile.txt")

###### Create RDD using sparkContext.wholeTextFiles()

The wholeTextFiles() function returns a pair RDD with the file path as the key and the file content as the value.


In [4]:
# #Reads entire file into a RDD as single record.
# rdd3 = spark.sparkContext.wholeTextFiles("/path/textFile.txt")

# Besides using text files, we can also create RDD from CSV file, JSON, and more formats.

###### Create empty RDD using sparkContext.emptyRDD

Using the ```emptyRDD()``` method of sparkContext we can create an RDD with no data. This method creates an empty RDD with no partitions.

In [5]:
# Creates an empty RDD with no partitions
# emptyRDD is a method, so it must be called with parentheses
rdd = spark.sparkContext.emptyRDD()
# The typed form emptyRDD[String] is Scala syntax and does not apply to PySpark

###### Creating empty RDD with partition

Sometimes we may need to write an empty RDD to files by partition. In this case, create an empty RDD with partitions.

In [6]:
#Create empty RDD with partition
rdd2 = spark.sparkContext.parallelize([],10) #This creates 10 partitions

###### RDD Parallelize

When we use the ```parallelize()```, ```textFile()```, or ```wholeTextFiles()``` methods of SparkContext to create an RDD, Spark automatically splits the data into partitions based on resource availability. For example, when you run parallelize() in local mode on a laptop, it creates as many partitions as the number of cores made available to Spark (the x in local[x]).

**getNumPartitions()** – This is an RDD function that returns the number of partitions our dataset is split into.

In [7]:
# rdd was last assigned from emptyRDD, so re-create it from the list first
rdd = spark.sparkContext.parallelize(data)
print("initial partition count:"+str(rdd.getNumPartitions()))
# The count equals the default parallelism, e.g. 1 with master("local[1]")

**Set the number of partitions manually** – We can also set the number of partitions manually; all we need to do is pass the number of partitions as the second parameter to these functions, for example ```sparkContext.parallelize([1,2,3,4,56,7,8,9,12,3], 10)```.

###### Repartition and Coalesce

Sometimes we may need to repartition the RDD. PySpark provides two ways to do so: the ```repartition()``` method, which shuffles data from all nodes (a full shuffle), and the ```coalesce()``` method, which reduces the number of partitions while moving data from as few nodes as possible. For example, if you have data in 4 partitions, ```coalesce(2)``` moves data from just 2 nodes.

Both functions take the target number of partitions, as shown below. Note that repartition() is a very expensive operation because it shuffles data from all nodes in the cluster.

In [8]:
# Start from a list-backed RDD so this cell does not depend on earlier ones
reparRdd = spark.sparkContext.parallelize(data).repartition(4)
print("re-partition count:"+str(reparRdd.getNumPartitions()))
#Outputs: re-partition count:4
# Note: repartition() and coalesce() return a new RDD.

###### PySpark RDD Operations

**RDD transformations** – Transformations are lazy operations; instead of updating an RDD, they return another RDD.

**RDD actions** – Operations that trigger computation and return values to the driver program.
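
Lazy evaluation can feel abstract, so here is a plain-Python analogy (not Spark code) using the built-in lazy map(). Building the pipeline plays the role of a transformation and consuming it plays the role of an action. The `double` function and the `evaluated` list are hypothetical names used only to make the timing visible.

```python
evaluated = []

def double(x):
    evaluated.append(x)    # record that the function actually ran
    return x * 2

# "Transformation": only builds a lazy iterator, nothing is computed yet
pipeline = map(double, [1, 2, 3])
ran_before_action = list(evaluated)

# "Action": consuming the iterator forces the computation
result = list(pipeline)
ran_after_action = list(evaluated)

print(ran_before_action, result, ran_after_action)
```

Spark applies the same principle at cluster scale: it records the chain of transformations and runs them only when an action asks for a result.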

###### RDD Transformations with example

Transformations on a PySpark RDD return another RDD, and they are lazy, meaning they don't execute until you call an action on the RDD. Some transformations on RDDs are ```flatMap()```, ```map()```, ```reduceByKey()```, ```filter()```, and ```sortByKey()```; each returns a new RDD instead of updating the current one.

In this PySpark RDD Transformation section of the tutorial, I will explain transformations using the word count example. The image below demonstrates the different RDD transformations we are going to use.

In [9]:
# create an RDD by reading a text file.
rdd = spark.sparkContext.textFile("doc.txt")

**flatMap** – The ```flatMap()``` transformation applies a function to each record and flattens the results into a new RDD. In the example below, it first splits each record by space and then flattens the result, so the resulting RDD holds a single word in each record.

In [10]:
rdd2 = rdd.flatMap(lambda x: x.split(" "))

In [11]:
rdd2

PythonRDD[4] at RDD at PythonRDD.scala:53

**map** – The ```map()``` transformation is used to apply any operation to each record, such as adding or updating a field. The output of a map transformation always has the same number of records as its input.

In our word count example, we add a new field with the value 1 for each word. The result is a pair RDD of key-value tuples, with the word (a str) as the key and 1 (an int) as the value.

In [12]:
rdd3 = rdd2.map(lambda x: (x,1))

**reduceByKey** – ```reduceByKey()``` merges the values for each key using the specified function. In our example, it reduces the records for each word by summing their values. The resulting RDD contains the unique words and their counts.

In [15]:
rdd4 = rdd3.reduceByKey(lambda a,b: a+b)
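
What reduceByKey() computes can be modeled in plain Python (this is a conceptual sketch, not Spark's implementation). Values are first merged per key inside each partition, and the partial results are then merged across partitions. The `reduce_pairs` helper and the two sample partitions are hypothetical.

```python
from operator import add

def reduce_pairs(pairs, func):
    """Merge the values of equal keys with func, keeping one entry per key."""
    merged = {}
    for key, value in pairs:
        merged[key] = func(merged[key], value) if key in merged else value
    return merged

# Two hypothetical partitions of (word, 1) pairs
partitions = [
    [("spark", 1), ("rdd", 1), ("spark", 1)],
    [("rdd", 1), ("spark", 1)],
]

# Step 1: combine inside each partition
partials = [reduce_pairs(part, add) for part in partitions]

# Step 2: merge the partial results from all partitions
totals = reduce_pairs(
    (item for partial in partials for item in partial.items()), add)

print(partials)
print(totals)
```

This two-step scheme only gives a well-defined result when the function is associative and commutative, as addition is here.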

**sortByKey** – The ```sortByKey()``` transformation sorts RDD elements by key. In our example, we first convert each (word, count) pair to (count, word) using a map transformation and then apply sortByKey, which sorts on the integer count. Finally, collect() brings the result back to the driver and print() displays every word in the RDD with its count as a key-value pair.

In [18]:
rdd5 = rdd4.map(lambda x: (x[1],x[0])).sortByKey()
#Print rdd5 result to console
print(rdd5.collect())

[(9, 'Project'), (9, 'Gutenberg’s'), (18, 'Alice’s'), (18, 'Adventures'), (18, 'in'), (18, 'Wonderland'), (18, 'by'), (18, 'Lewis'), (18, 'Carroll'), (27, 'This'), (27, 'eBook'), (27, 'is'), (27, 'for'), (27, 'the'), (27, 'use'), (27, 'of'), (27, 'anyone'), (27, 'anywhere'), (27, 'at'), (27, 'no'), (27, 'cost'), (27, 'and'), (27, 'with')]


                                                                                

**filter** – The ```filter()``` transformation is used to filter the records in an RDD. In our example, we keep all words that contain the letter “a”.

In [22]:
rdd6 = rdd5.filter(lambda x : 'a' in x[1])
print(rdd6.collect())

[(18, 'Wonderland'), (18, 'Carroll'), (27, 'anyone'), (27, 'anywhere'), (27, 'at'), (27, 'and')]


###### RDD Actions with example

RDD actions return values from an RDD to the driver program. In other words, any RDD function that returns something other than an RDD is considered an action.

In this section of the PySpark RDD tutorial, we will continue to use our word count example and perform some actions on it.

```count()``` – Returns the number of records in an RDD.

In [23]:
# Action - count
print("Count : "+str(rdd6.count()))

Count : 6


```first()``` – Returns the first record.

In [24]:
# Action - first
firstRec = rdd6.first()
print("First Record : "+str(firstRec[0]) + ","+ firstRec[1])

First Record : 18,Wonderland


```max()``` – Returns max record.

In [25]:
# Action - max
datMax = rdd6.max()
print("Max Record : "+str(datMax[0]) + ","+ datMax[1])

Max Record : 27,at
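
The result (27, 'at') may look surprising at first. The records are tuples, and Python compares tuples element by element, so ties on the count are broken by the word. The plain-Python sketch below reproduces this on the six records shown in the filter output above; the key function is a made-up example of choosing a different ordering (PySpark's max() also accepts a key argument).

```python
# The six records from the filter output above
records = [(18, 'Wonderland'), (18, 'Carroll'), (27, 'anyone'),
           (27, 'anywhere'), (27, 'at'), (27, 'and')]

# Default ordering: compare counts first, then words alphabetically
largest = max(records)

# Custom ordering (illustrative): pick the record with the longest word
longest_word = max(records, key=lambda rec: len(rec[1]))

print(largest)
print(longest_word)
```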


```reduce()``` – Reduces the records to a single value; we can use this to count or sum.

In [26]:
# Action - reduce
totalWordCount = rdd6.reduce(lambda a,b: (a[0]+b[0],a[1]))
print("dataReduce Record : "+str(totalWordCount[0]))

dataReduce Record : 144
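
The same lambda can be traced in plain Python with functools.reduce, which folds the records from left to right. This is a sketch of the arithmetic only; on a cluster Spark combines partitions in an order it chooses, so only the summed first element is meaningful, not the word carried along in the second position.

```python
from functools import reduce

# The six records from the filter output above
records = [(18, 'Wonderland'), (18, 'Carroll'), (27, 'anyone'),
           (27, 'anywhere'), (27, 'at'), (27, 'and')]

# Same function as in the cell above: add the counts, keep the left word
folded = reduce(lambda a, b: (a[0] + b[0], a[1]), records)

# An equivalent and more direct way to get the total
total = sum(count for count, _ in records)

print(folded)
print(total)
```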
