## PySpark – Use Case 1 - Resilient Distributed Dataset (RDD)

### 1. Explain what an RDD is and the different ways to create it

**RDD: Resilient Distributed Datasets**
• Resilient – Fault-tolerant: lost partitions can be recomputed from the lineage graph
• Distributed – Partitioned and held in memory across multiple nodes of the cluster
• Dataset – Initial data can come from a file or be created programmatically

1. RDD is the fundamental data abstraction of Apache Spark.

2. An RDD represents an immutable, partitioned collection of records that can
   be operated on in parallel.

3. Once you create an RDD you cannot change it, but you can derive a new RDD by applying a transformation (such as map or filter)
   to an existing RDD. Actions (such as collect or count) return a result to the driver instead of a new RDD.

4. An RDD in Spark can be cached and reused by later transformations and actions, which avoids recomputing it.
   RDDs are lazily evaluated, i.e. transformations are not executed until an action actually needs their result.
  
**There are two different ways to create an RDD.**

 **1.Parallelizing an existing collection in your driver program:-**
  
  To create an RDD from a parallelized collection, we use the sc.parallelize method, where sc is the SparkContext.
  It is available from a SparkSession as spark.sparkContext.
  SparkSession is the unified entry point since Spark 2.0 and gives access to:
  1. the SparkContext
  2. the SQLContext functionality
  A StreamingContext is not held by the SparkSession; it is created separately from a SparkContext.
  sc.parallelize copies the elements of a driver-side collection into a distributed dataset, which allows
  Spark to distribute the data across multiple nodes and process it in parallel.
  
     my_list = [1, 2, 3, 4, 5]
     my_list_rdd = sc.parallelize(my_list)

**2.Referencing a dataset in an external storage system:-**
 
  In Spark, a distributed dataset can be created from any storage source supported by Hadoop, including the local
  file system, HDFS, Cassandra, HBase, etc.
  Here the data is loaded from the external dataset. To create an RDD from a text file, we can use SparkContext’s textFile method.
  It takes the URI of the file and reads it as a collection of lines. The URI can be a local path on the machine.
  
      file_rdd = sc.textFile("path_of_file")


### 2. What are features of RDD?

**Features of Spark RDD:-**

**1.Immutability-**

  You cannot change the state of an RDD. To get modified data, you apply a transformation, which produces a new RDD and leaves the original untouched. Hence, the original RDD can still be used, or recomputed, at any time.

**2.In-memory computation-**

  Spark supports in-memory computation: intermediate data can be kept in RAM instead of being written to disk between steps.
  This avoids repeated disk I/O and makes iterative and interactive workloads much faster.

**3.Lazy evaluation-**

  Transformations on RDDs are lazy. In lazy evaluation, the results are not computed immediately; Spark only records the transformation and generates the results when an action is triggered. This lets Spark skip work whose result is never used and run several transformations in a single pass over the data.

**4.Fault-tolerant-**

  Each RDD remembers its lineage, i.e. the chain of transformations that was used to build it from its parent RDDs.
  Because RDDs are immutable, a lost partition can be recomputed from that lineage instead of being restored from a replica.
  This feature makes Spark RDD fault-tolerant.

**5.Partitioning-**

  Datasets held in RDDs are usually huge. The data is split into partitions that are sent to different nodes for distributed computing.

**6.Persistence-**

  An RDD that is reused can be persisted in memory or on disk, so later actions read the stored partitions instead of recomputing them.

**7.Grained operation-**

  Operations on a dataset can be coarse-grained or fine-grained. A coarse-grained operation applies the same function to the whole dataset, which is how RDD transformations such as map and filter work. A fine-grained operation would update individual elements in place; RDDs do not support fine-grained writes, although individual elements can still be read.

 

### 3. Create a Spark RDD using parallelized collections


In [28]:
import findspark
findspark.init()

In [29]:
import pyspark

### a. Create RDD with elements in it

In [30]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Use_Case1").getOrCreate()
sc = spark.sparkContext

In [31]:
MY_RDD = sc.parallelize([("Alice",31,3),("Bob",35,4),("Kanwar",29,2),("Ravi",28,1),("Dev",37,5)])

### b. Print all elements of RDD

In [32]:
MY_RDD.collect()

[('Alice', 31, 3),
 ('Bob', 35, 4),
 ('Kanwar', 29, 2),
 ('Ravi', 28, 1),
 ('Dev', 37, 5)]

In [33]:
# Convert the RDD to a DataFrame (from here on, MY_RDD refers to a DataFrame, not an RDD)
MY_RDD = MY_RDD.toDF()
MY_RDD.show()

+------+---+---+
|    _1| _2| _3|
+------+---+---+
| Alice| 31|  3|
|   Bob| 35|  4|
|Kanwar| 29|  2|
|  Ravi| 28|  1|
|   Dev| 37|  5|
+------+---+---+



In [34]:
# Rename the columns (the result is only displayed, not assigned, so MY_RDD keeps _1, _2, _3)
MY_RDD.withColumnRenamed("_1","Name").withColumnRenamed("_2","Age").withColumnRenamed("_3","Experience").show()

+------+---+----------+
|  Name|Age|Experience|
+------+---+----------+
| Alice| 31|         3|
|   Bob| 35|         4|
|Kanwar| 29|         2|
|  Ravi| 28|         1|
|   Dev| 37|         5|
+------+---+----------+



In [35]:
type(MY_RDD)

pyspark.sql.dataframe.DataFrame

### c. Print the count of elements in the RDD

In [36]:
MY_RDD.count()

5

### d. Show use of foreach(f) operation

foreach() is an action in Spark that is available on both RDDs and DataFrames in PySpark
and iterates over every element of the dataset.
foreach(f) applies the function f to every row of the DataFrame (or every element of the RDD) for its side effects.
The foreach() method returns **None**, while map() returns a new **RDD** with the transformed elements. Both visit every element, but only map() produces a result that you can keep working with.

In [37]:
accum = sc.accumulator(0)
sc.parallelize([1, 2, 3, 4]).foreach(lambda x: accum.add(x))
accum.value

10

In [39]:
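def f(person):
    # MY_RDD still has the default column names, so the name is in _1.
    # foreach() ignores return values; the print runs on the executors.
    print(person._1)

MY_RDD.foreach(f)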

### e. What is the use of cache() and how to use it

**use:**
cache() marks an RDD or DataFrame to be kept in memory once it has been computed. It is not a checkpoint: the lineage is kept,
so if a cached partition is lost because a task or executor fails, Spark recomputes that partition from the lineage.

Its main purpose is to speed up applications that access the same RDD multiple times.

There are two function calls for caching an RDD: cache() and persist(level: StorageLevel).

The difference between cache() and persist() is that **cache()** always uses the default storage level (MEMORY_ONLY for an RDD),
while **persist()** lets us choose a storage level such as MEMORY_ONLY, MEMORY_AND_DISK or DISK_ONLY.

**Advantages of caching a DataFrame**

• Cost-efficient – Spark computations are expensive, so reusing already computed results saves cost.
• Time-efficient – Reusing repeated computations saves a lot of time.
• Execution time – Jobs finish sooner, so more jobs can run on the same cluster.

In [40]:
MY_RDD.cache()
caching = MY_RDD.persist().is_cached
print("Words got cached > %s" % (caching))

Words got cached > True


In [41]:
MY_RDD.cache()

DataFrame[_1: string, _2: bigint, _3: bigint]

### f. Show use of map(), reduce() and foreach(f)

The map method takes a function as a parameter, applies it to every element of an RDD and returns a new RDD. A DataFrame has no map method in PySpark, so it is first converted with .rdd.
The output has the same number of records as the input.

In [42]:
# Use of Map
MY_RDD1 = MY_RDD.rdd.map(lambda x: (x,1))
for x in MY_RDD1.collect():
    print(x)

(Row(_1='Alice', _2=31, _3=3), 1)
(Row(_1='Bob', _2=35, _3=4), 1)
(Row(_1='Kanwar', _2=29, _3=2), 1)
(Row(_1='Ravi', _2=28, _3=1), 1)
(Row(_1='Dev', _2=37, _3=5), 1)


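#### Use of Reduce
reduce() is an RDD action that combines all elements into a single value by repeatedly applying a binary function, which should be associative and commutative. It can be used to calculate, for example, the min, max or total of the elements.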

In [43]:
# Use of Reduce 
NEW_DATA = [10,9,8,7,6,5,4,3,2,1]
RDD = sc.parallelize(NEW_DATA)
RDD.reduce(lambda x,y : x+y)

55

In [44]:
# Use of reduce, second method: pass an existing binary function
from operator import mul
new_rdd = sc.parallelize([1,2,3,4,5,6])
a = new_rdd.reduce(mul)
a

720