<a href="https://colab.research.google.com/github/moosemaniam/IISCDeepLearning/blob/main/M1_AST_10_Scalable_Programming_using_PySpark_A.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Advanced Programme in Deep Learning (Foundations and Applications)
## A Program by IISc and TalentSprint
### Assignment 10: PySpark Transformations on Paired RDDs and Spark Architecture

## Learning Objectives

At the end of the experiment, you will be able to

* get a better understanding of the Spark architecture.

* know the different types of transformations and actions.

* have a clear picture of narrow and wide dependencies.

* transform paired RDDs by applying transformations and actions to them.

* have a better understanding of data partitioning and its advantages.


### Information

#### Spark Architecture

* Spark runs on a cluster of machines.
* The cluster consists of a master node (running the driver) and worker nodes (hosting the executors).
* When a Spark job is started, the master node requests resources for the job from the cluster manager (YARN, Mesos, etc.).
* The cluster manager then allocates worker nodes to the job. The master node builds the logical plan and sends the tasks, together with the data they operate on, to the worker nodes.
* Spark execution therefore uses two plans: a logical plan and a physical plan.
* **Logical Plan** converts the application into a dataflow of dependencies.

* **Physical Plan** converts that dataflow into specific tasks, which are executed within the workers/executors.


  ![img](https://cdn.iisc.talentsprint.com/DLFA/Experiment_related_data/Workflow-of-Apache-Spark-working-with-hierarchical-index.ppm)


### RDD vs. Datasets/DataFrames

**RDD :** An RDD (Resilient Distributed Dataset) is an immutable, distributed collection of elements.
* Transformations, actions and partitioning can be applied to an RDD.
* It is a lower-level abstraction.
* It gives finer control over the data than DataFrames, but requires more code.
* An RDD carries no schema, so the structure of the data has to be defined and tracked manually.
* RDDs are slower than DataFrames for simple operations such as grouping data.


**DataFrame :** A DataFrame is also an immutable, distributed collection (like an RDD), but:
* The data in a DataFrame is organized into named columns.
* It imposes a structure on the data, which makes it a higher-level, easier-to-use abstraction.
* It makes processing large datasets easier than with RDDs.
* Spark can infer the schema of the dataset automatically.
* Aggregations are usually faster than with RDDs, because DataFrame queries are optimized by Spark's Catalyst optimizer, and the API for aggregation operations is simpler.


### Setup Steps:

In [None]:
#@title Please enter your registration id to start: { run: "auto", display-mode: "form" }
Id = "" #@param {type:"string"}

In [None]:
#@title Please enter your password (your registered phone number) to continue: { run: "auto", display-mode: "form" }
password = "" #@param {type:"string"}

In [None]:
#@title Run this cell to complete the setup for this Notebook
from IPython import get_ipython

ipython = get_ipython()
  
notebook= "M1_AST_10_Scalable_Programming_using_PySpark_A" #name of the notebook
def setup():
#  ipython.magic("sx pip3 install torch")  
    from IPython.display import HTML, display
    display(HTML('<script src="https://dashboard.talentsprint.com/aiml/record_ip.html?traineeId={0}&recordId={1}"></script>'.format(getId(),submission_id)))
    print("Setup completed successfully")
    return

def submit_notebook():
    ipython.magic("notebook -e "+ notebook + ".ipynb")
    
    import requests, json, base64, datetime

    url = "https://dashboard.talentsprint.com/xp/app/save_notebook_attempts"
    if not submission_id:
      data = {"id" : getId(), "notebook" : notebook, "mobile" : getPassword()}
      r = requests.post(url, data = data)
      r = json.loads(r.text)

      if r["status"] == "Success":
          return r["record_id"]
      elif "err" in r:        
        print(r["err"])
        return None        
      else:
        print ("Something is wrong, the notebook will not be submitted for grading")
        return None
    
    elif getAnswer1() and getAnswer2() and getComplexity() and getAdditional() and getConcepts() and getComments() and getMentorSupport():
      f = open(notebook + ".ipynb", "rb")
      file_hash = base64.b64encode(f.read())

      data = {"complexity" : Complexity, "additional" :Additional, 
              "concepts" : Concepts, "record_id" : submission_id, 
              "answer1" : Answer1, "answer2" : Answer2, "id" : Id, "file_hash" : file_hash,
              "notebook" : notebook,
              "feedback_experiments_input" : Comments,
              "feedback_mentor_support": Mentor_support}
      r = requests.post(url, data = data)
      r = json.loads(r.text)
      if "err" in r:        
        print(r["err"])
        return None   
      else:
        print("Your submission is successful.")
        print("Ref Id:", submission_id)
        print("Date of submission: ", r["date"])
        print("Time of submission: ", r["time"])
        print("View your submissions: https://dlfa.iisc.talentsprint.com/notebook_submissions")
        #print("For any queries/discrepancies, please connect with mentors through the chat icon in LMS dashboard.")
        return submission_id
    else: return submission_id
    

def getAdditional():
  try:
    if not Additional: 
      raise NameError
    else:
      return Additional  
  except NameError:
    print ("Please answer Additional Question")
    return None

def getComplexity():
  try:
    if not Complexity:
      raise NameError
    else:
      return Complexity
  except NameError:
    print ("Please answer Complexity Question")
    return None
  
def getConcepts():
  try:
    if not Concepts:
      raise NameError
    else:
      return Concepts
  except NameError:
    print ("Please answer Concepts Question")
    return None
  
  
# def getWalkthrough():
#   try:
#     if not Walkthrough:
#       raise NameError
#     else:
#       return Walkthrough
#   except NameError:
#     print ("Please answer Walkthrough Question")
#     return None
  
def getComments():
  try:
    if not Comments:
      raise NameError
    else:
      return Comments
  except NameError:
    print ("Please answer Comments Question")
    return None
  

def getMentorSupport():
  try:
    if not Mentor_support:
      raise NameError
    else:
      return Mentor_support
  except NameError:
    print ("Please answer Mentor support Question")
    return None

def getAnswer1():
  try:
    if not Answer1:
      raise NameError 
    else: 
      return Answer1
  except NameError:
    print ("Please answer Question 1")
    return None

def getAnswer2():
  try:
    if not Answer2:
      raise NameError 
    else: 
      return Answer2
  except NameError:
    print ("Please answer Question 2")
    return None
  

def getId():
  try: 
    return Id if Id else None
  except NameError:
    return None

def getPassword():
  try:
    return password if password else None
  except NameError:
    return None

submission_id = None
### Setup 
if getPassword() and getId():
  submission_id = submit_notebook()
  if submission_id:
    setup() 
else:
  print ("Please complete Id and Password cells before running setup")



 **Install PySpark**

In [None]:
!pip -q install pyspark

**Creating Spark Session**

SparkSession is the unified entry point of a Spark application and was introduced in Spark 2.0: instead of creating separate contexts (such as SparkContext, SQLContext and HiveContext), everything is accessed through a single SparkSession.

In [None]:
# Start spark session
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf  # User Defined Functions
from pyspark.sql.types import StringType
from operator import add
spark = SparkSession.builder.appName('Spark_RDD').getOrCreate()
spark

In [None]:
# Accessing sparkContext from sparkSession instance.
sc = spark.sparkContext

### Transformations:

**groupByKey :** It groups all the values for each key in a pair RDD. It creates a new pair for every key, in which the original key is associated with the collection of all its values.

1. Apply the groupByKey operation to the data defined below.

  data = [("USA", 1), ("USA", 2), ("India", 1),
  ("UK", 1), ("India", 4), ("India", 9),
  ("USA", 8), ("USA", 3), ("India", 4),
  ("UK", 6), ("UK", 9), ("UK", 5)]

In [None]:
# CODE HERE

**cogroup :** For each key k present in either the self RDD or the other RDD, cogroup returns a pair (k, (values in self, values in other)), where each side is an iterable of that RDD's values for k (empty if the key does not occur there).

1. Group the values by key for the data given below.
  
  data1 = [(1, "spark"), (2, "HDFS"), (3, "Hive"), (4, "Flink"), (6, "HBase")]

  data2 = [(4, "RealTime"), (5, "Kafka"), (6, "NOSQL"), (1, "stream"), (1, "MLlib")]

In [None]:
# CODE HERE

**subtract:** It returns the elements of the first RDD that do not appear in the second RDD. On a pair RDD, a (key, value) tuple is removed only if the identical tuple exists in the second RDD.

1. Subtract data2 from data1.

  data1 = [("apple", 1), ("banana", 4), ("carrot", 5), ("mango", 3)]

  data2 = [("apple", 1), ("carrot", 5)]

In [None]:
# CODE HERE

### JOINS

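**join :** Join is a database term: it combines the fields from two tables using common values. In Spark, join() is defined on pair RDDs and performs an inner join on the keys, returning (key, (left value, right value)) for every combination of matching values.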

1. Apply join operation to the data defined below.

  data = [('A', 1), ('b', 2), ('c', 3), ('d', 8)]

  data2 = [('A', 4), ('A', 6), ('b', 7), ('c', 3), ('c', 8)]

In [None]:
# CODE HERE

2. Apply join operation to the data given below:

  data1 = [("apple", 1), ("banana", 4), ("carrot", 5), ("mango", 3)]

  data2 = [("apple", 3), ("banana", 2)]

In [None]:
# CODE HERE

**Left Outer Join :** A left outer join returns all the values from the left table and the matching values from the right table. Where a key has no match, SQL uses NULL; PySpark's leftOuterJoin() uses None.

1. Apply left outer join operation to the data defined below.

  data1 = [("apple", 1), ("banana", 4), ("carrot", 5), ("mango", 3)]

  data2 = [("apple", 3), ("banana", 2)]

In [None]:
# CODE HERE

**Right Outer Join :** A right outer join returns all the values from the right table and the matching values from the left table. Where a key has no match, SQL uses NULL; PySpark's rightOuterJoin() uses None.

1. Apply right outer join operation to the data defined below.

  data1 = [("apple", 1), ("banana", 4), ("carrot", 5), ("mango", 3)]

  data2 = [("apple", 3), ("banana", 2)]

In [None]:
# CODE HERE

### Actions on Paired RDD

**countByKey :** It counts the number of elements for each key and returns the result to the driver as a dictionary.

1. For the data defined below, count the number of records for each student (key) using the countByKey action.

  data = [
  ("bharat", ["c", "java", "python"]),
  ("chandu", ["c", "java", "python", "scala"]),
  ("akash", ["c", "java", "python", "scala", "spark", "cobol"]),
  ("bharat", ["c", "java", "python"]),
  ("akash", ["c", "java", "python", "scala", "spark", "cobol"]),
  ("akash", ["c", "java", "python"]),
  ("bharat", ["c", "java", "python"]),
  ("chandu", ["c", "java", "python", "scala"]),
  ("deepak", ["c", "java", "python"]),
  ("chandu", ["c", "java", "python", "scala"]),
  ("deepak", ["c", "java", "python"])
  ]

In [None]:
# CODE HERE

**collectAsMap:** It returns the contents of a pair RDD to the driver as a native Python dictionary (map). In simple words, after collectAsMap is applied to a pair RDD of (k, v) elements, the value for a key can be looked up by that key. If a key occurs more than once, only one of its values is kept.

1. For the data given below, look up the values using their keys.

  data = [("apple", 1), ("banana", 4), ("carrot", 5), ("mango", 3)]

In [None]:
# CODE HERE

### Narrow and Wide Dependencies

**Narrow Dependency:** In a narrow transformation, all the elements required to compute the records in a single partition live in a single partition of the parent RDD. Only a limited subset of partitions is needed to calculate the result, and no shuffle takes place.

Examples: map, filter, flatMap, mapPartitions, sample, union, etc.



* Each output partition depends on exactly one, or a few, input partitions of the parent RDD.
* Each input partition is used by exactly one output partition.
* So the input and output partitions can stay on the same worker.
* It is also called a full dependency.


![img](https://cdn.iisc.talentsprint.com/DLFA/Experiment_related_data/narrow_transformation.png)

**Wide Dependency:** In a wide transformation, the elements required to compute the records in a single partition may live in many partitions of the parent RDD. In simple terms, wide transformations require a shuffle.

Examples: intersection, distinct, reduceByKey, groupByKey, join (unless both RDDs are already co-partitioned), repartition, coalesce with `shuffle=True`, etc.



* Each input partition can be used by one or more output partitions.
* So a wide dependency forces a shuffle of data across workers.
* This is also called a shuffle dependency or partial dependency.



  ![img](https://cdn.iisc.talentsprint.com/DLFA/Experiment_related_data/wide_trans.png)

### Data Partitioning


**Data Partitioning :** 

*    RDDs are split into partitions spread across the nodes of a cluster.
*    Controlling the number of partitions helps balance the compute load and reduce shuffle communication across nodes.
* Partitions can also be used by algorithms that operate on one partition at a time.
* Spark tracks the partitioner used to generate an RDD in order to optimize operations.
* Some transformations set, keep or discard the partitioner: for example, a map() following a join discards the hash partitioner (because map may change the keys), whereas filter(), mapValues() and flatMapValues() retain it.

The two built-in partitioners in Spark are:
* HashPartitioner: it assigns each key to a partition based on the hash of the key modulo the number of partitions. It is the default for operations such as groupByKey and reduceByKey.
* RangePartitioner: it splits the keys into roughly equal ranges, determined by sampling the RDD contents. It is mainly used for sorting (e.g. sortByKey).

**mapPartitions:** It is similar to map(), but the function is called once per partition: it receives an iterator over the partition's elements and returns an iterable, so the output does not need to have the same number of elements as the input.
It is used to improve on the performance of map() when a heavy initialization, such as opening a database connection, is needed.
mapPartitions() performs that initialization once per partition of the RDD instead of once per element.
It is a narrow transformation.

1. Find the sum of each partition of the RDD defined below.

  data = [74, 45, 87, 95] with 2 partitions in it

In [None]:
# CODE HERE

Here we can see that the sum operation has been applied to each partition separately rather than to the entire RDD.

**mapPartitionsWithIndex() :** It is similar to mapPartitions, except that the function takes one more argument: the index of the partition. The function is called with the partition index and an iterator over all the elements in that partition.

1. Find the sum of the indexes of the partitions in the below defined data.

  data = [74, 45, 87, 95] with 4 partitions.

In [None]:
# CODE HERE

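**getNumPartitions() and partitioner:** `getNumPartitions()` returns the number of partitions of an RDD, and the `partitioner` attribute holds the partitioner used to create it (or `None`).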


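**foreachPartition :** It works like foreach, except that the function is applied to each partition of the RDD rather than to each element, and receives an iterator over the partition's elements. Like foreach, it is an action that returns nothing: it runs on the executors and is used for side effects, such as writing to an external system.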

1. Apply foreachPartition to the data defined below.
  
  data = [ 19 , 42 , 73 , 24 , 54 ]

In [None]:
# CODE HERE

### Numeric RDD

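* Numeric RDDs provide common statistics for RDDs of numeric type.
* The stats() action computes the count, mean, variance, standard deviation, min and max of an RDD in a single pass.
* Individual actions are also available, such as count(), mean(), sum(), variance(), stdev(), min() and max(), as well as sampleVariance() and sampleStdev(). There is no built-in mode(); it can be derived from countByValue().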

1. Find stats(), count, mean, mode, variance, standard deviation, min and max for the RDD defined below.

  data = [ 2, 3, 7, 1, 9, 17, 28, 37, 98, 72]

In [None]:
# CODE HERE

### Please answer the questions below to complete the experiment:




In [None]:
#@title Q.1.Which of the following is a transformation?  { run: "auto", form-width: "500px", display-mode: "form" }
Answer1 = "" #@param ["","take(n)", "top()", "countByValue()","mapPartitionWithIndex()"]

In [None]:
#@title Q.2.  What is a transformation in Spark RDD?{ run: "auto", form-width: "500px", display-mode: "form" }
Answer2 = "" #@param ["","Takes RDD as input and produces one or more RDD as output.", "Returns final result of RDD computations.","The ways to send result from executors to the driver","None of the above"]


In [None]:
#@title How was the experiment? { run: "auto", form-width: "500px", display-mode: "form" }
Complexity = "" #@param ["","Too Simple, I am wasting time", "Good, But Not Challenging for me", "Good and Challenging for me", "Was Tough, but I did it", "Too Difficult for me"]


In [None]:
#@title If it was too easy, what more would you have liked to be added? If it was very difficult, what would you have liked to have been removed? { run: "auto", display-mode: "form" }
Additional = "" #@param {type:"string"}


In [None]:
#@title Can you identify the concepts from the lecture which this experiment covered? { run: "auto", vertical-output: true, display-mode: "form" }
Concepts = "" #@param ["","Yes", "No"]


In [None]:
#@title  Text and image description/explanation and code comments within the experiment: { run: "auto", vertical-output: true, display-mode: "form" }
Comments = "" #@param ["","Very Useful", "Somewhat Useful", "Not Useful", "Didn't use"]


In [None]:
#@title Mentor Support: { run: "auto", vertical-output: true, display-mode: "form" }
Mentor_support = "" #@param ["","Very Useful", "Somewhat Useful", "Not Useful", "Didn't use"]


In [None]:
#@title Run this cell to submit your notebook for grading { vertical-output: true }
try:
  if submission_id:
      return_id = submit_notebook()
      if return_id : submission_id = return_id
  else:
      print("Please complete the setup first.")
except NameError:
  print ("Please complete the setup first.")