<a href="https://colab.research.google.com/github/pallavibekal/Data-Engineering-Spark-and-Hadoop-Code/blob/main/Intro_to_PySpark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### Dataset

The dataset chosen for this assignment is [Ecommerce customers](https://www.kaggle.com/srolka/ecommerce-customers). It contains 500 records and 8 columns: customer information (email, address, and color avatar) plus the following numerical columns:

* Avg Session Length: Average length of in-store style advice sessions
* Time on App: Average time spent on App in minutes
* Time on Website: Average time spent on Website in minutes
* Length of Membership: How many years the customer has been a member.
* Yearly Amount Spent

Here, we will use the first four features to fit a linear regression model with Spark and predict each customer's Yearly Amount Spent.

### Information

**Why do we need Spark?**

Spark is an open-source distributed computing framework for handling big data quickly. It offers a DataFrame experience similar to that of Pandas while scaling to large datasets through a distributed architecture under the hood.

Apache Spark is a cluster computing engine designed for fast computation on big data. It keeps data in memory (RAM) where possible, which makes processing much faster than repeatedly reading from and writing to disk. It includes the MLlib library for performing machine learning tasks within the Spark framework.

### Introduction

Apache Spark is known as a fast, easy-to-use, general-purpose engine for big data processing, with built-in modules for streaming, SQL, machine learning, and graph processing. It is well known for its speed, ease of use, generality, and ability to run virtually everywhere. Although Spark is one of the most requested tools for data engineers, data scientists can also benefit from it for exploratory data analysis, feature extraction, supervised learning, and model evaluation.

Spark is a platform for cluster computing that lets you spread data and computations over clusters with multiple nodes (think of each node as a separate computer). Splitting up your data makes it easier to work with very large datasets because each node only works with a small amount of data.

As each node works on its own subset of the total data, it also carries out a part of the total calculations required, so that both data processing and computation are performed in parallel over the nodes in the cluster. Parallel computation can make certain types of programming tasks much faster.
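The split-then-combine idea described above can be sketched in plain Python. This is only an illustration and not Spark code: worker threads stand in for cluster nodes, and the helper names `split_into_partitions` and `parallel_sum` are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor


def split_into_partitions(values, num_partitions):
    """Split a list into at most num_partitions contiguous chunks."""
    if not values:
        return []
    chunk_size = -(-len(values) // num_partitions)  # ceiling division
    return [values[i:i + chunk_size] for i in range(0, len(values), chunk_size)]


def parallel_sum(values, num_partitions=4):
    """Sum each partition in its own worker, then combine the partial sums."""
    partitions = split_into_partitions(values, num_partitions)
    with ThreadPoolExecutor(max_workers=num_partitions) as pool:
        # Each worker only sees its own subset of the data.
        partial_sums = list(pool.map(sum, partitions))
    return sum(partial_sums)


total = parallel_sum(list(range(1, 101)))
print(total)
```

Each worker handles only its own chunk, and the final result is obtained by combining the partial results, which mirrors how work is divided across nodes in a cluster.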

### Setup Steps:

In [None]:
#@title Please enter your registration id to start: { run: "auto", display-mode: "form" }
Id = "2200092" #@param {type:"string"}

In [None]:
#@title Please enter your password (your registered phone number) to continue: { run: "auto", display-mode: "form" }
password = "9686800288" #@param {type:"string"}

In [None]:
#@title Run this cell to complete the setup for this Notebook
from IPython import get_ipython

ipython = get_ipython()
  
notebook= "M5_AST_02_Intro_to_PySpark_A" #name of the notebook

def setup():
#  ipython.magic("sx pip3 install torch")
    ipython.magic("sx wget https://cdn.iisc.talentsprint.com/CDS/Datasets/ecommerce_customers_.csv")  
    from IPython.display import HTML, display
    display(HTML('<script src="https://dashboard.talentsprint.com/aiml/record_ip.html?traineeId={0}&recordId={1}"></script>'.format(getId(),submission_id)))
    print("Setup completed successfully")
    return

def submit_notebook():
    ipython.magic("notebook -e "+ notebook + ".ipynb")
    
    import requests, json, base64, datetime

    url = "https://dashboard.talentsprint.com/xp/app/save_notebook_attempts"
    if not submission_id:
      data = {"id" : getId(), "notebook" : notebook, "mobile" : getPassword()}
      r = requests.post(url, data = data)
      r = json.loads(r.text)

      if r["status"] == "Success":
          return r["record_id"]
      elif "err" in r:        
        print(r["err"])
        return None        
      else:
        print ("Something is wrong, the notebook will not be submitted for grading")
        return None
    
    elif getAnswer() and getComplexity() and getAdditional() and getConcepts() and getComments() and getMentorSupport():
      f = open(notebook + ".ipynb", "rb")
      file_hash = base64.b64encode(f.read())

      data = {"complexity" : Complexity, "additional" :Additional, 
              "concepts" : Concepts, "record_id" : submission_id, 
              "answer" : Answer, "id" : Id, "file_hash" : file_hash,
              "notebook" : notebook,
              "feedback_experiments_input" : Comments,
              "feedback_mentor_support": Mentor_support}
      r = requests.post(url, data = data)
      r = json.loads(r.text)
      if "err" in r:        
        print(r["err"])
        return None   
      else:
        print("Your submission is successful.")
        print("Ref Id:", submission_id)
        print("Date of submission: ", r["date"])
        print("Time of submission: ", r["time"])
        print("View your submissions: https://cds.iisc.talentsprint.com/notebook_submissions")
        #print("For any queries/discrepancies, please connect with mentors through the chat icon in LMS dashboard.")
        return submission_id
    else: submission_id
    

def getAdditional():
  try:
    if not Additional: 
      raise NameError
    else:
      return Additional  
  except NameError:
    print ("Please answer Additional Question")
    return None

def getComplexity():
  try:
    if not Complexity:
      raise NameError
    else:
      return Complexity
  except NameError:
    print ("Please answer Complexity Question")
    return None
  
def getConcepts():
  try:
    if not Concepts:
      raise NameError
    else:
      return Concepts
  except NameError:
    print ("Please answer Concepts Question")
    return None
  
  
# def getWalkthrough():
#   try:
#     if not Walkthrough:
#       raise NameError
#     else:
#       return Walkthrough
#   except NameError:
#     print ("Please answer Walkthrough Question")
#     return None
  
def getComments():
  try:
    if not Comments:
      raise NameError
    else:
      return Comments
  except NameError:
    print ("Please answer Comments Question")
    return None
  

def getMentorSupport():
  try:
    if not Mentor_support:
      raise NameError
    else:
      return Mentor_support
  except NameError:
    print ("Please answer Mentor support Question")
    return None

def getAnswer():
  try:
    if not Answer:
      raise NameError 
    else: 
      return Answer
  except NameError:
    print ("Please answer Question")
    return None
  

def getId():
  try: 
    return Id if Id else None
  except NameError:
    return None

def getPassword():
  try:
    return password if password else None
  except NameError:
    return None

submission_id = None
### Setup 
if getPassword() and getId():
  submission_id = submit_notebook()
  if submission_id:
    setup() 
else:
  print ("Please complete Id and Password cells before running setup")



Setup completed successfully


### Importing required packages

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns

### PySpark

PySpark is an interface for Apache Spark in Python. It not only allows you to write Spark applications using Python APIs, but also provides the PySpark shell for interactively analyzing your data in a distributed environment. PySpark supports most of Spark’s features such as Spark SQL, DataFrame, Streaming, MLlib (Machine Learning) and Spark Core.

<figure>
<img src='https://cdn.iisc.talentsprint.com/CDS/Images/pyspark_components.png' width="700"/>
</figure>

**Spark SQL and DataFrame**

Spark SQL is a Spark module for structured data processing. It provides a programming abstraction called DataFrame and can also act as a distributed SQL query engine.

**Streaming**

Running on top of Spark, the streaming feature in Apache Spark enables powerful interactive and analytical applications across both streaming and historical data, while inheriting Spark’s ease of use and fault tolerance characteristics.

**MLlib**

Built on top of Spark, MLlib is a scalable machine learning library that provides a uniform set of high-level APIs that help users create and tune practical machine learning pipelines.

**Spark Core**

Spark Core is the underlying general execution engine for the Spark platform that all other functionality is built on top of. It provides an RDD (Resilient Distributed Dataset) and in-memory computing capabilities.

#### Install PySpark

In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.0.tar.gz (281.3 MB)
Collecting py4j==0.10.9.2
  Downloading py4j-0.10.9.2-py2.py3-none-any.whl (198 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.2.0-py2.py3-none-any.whl size=281805912 sha256=fcb26781619f8f878e25601d5eae9588ad4d197f2efd74fe1bf0e61b88e43e38
  Stored in directory: /root/.cache/pip/wheels/0b/de/d2/9be5d59d7331c6c2a7c1b6d1a4f463ce107332b1ecd4e80718
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.2 pyspark-3.2.0


#### Start a Spark Session

A Spark session is the unified entry point of a Spark application, introduced in Spark 2.0. It provides access to Spark's functionality through fewer constructs: instead of separate Spark, Hive, and SQL contexts, everything is now encapsulated in a single Spark session.

In [None]:
# Start spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('LinearRegression').getOrCreate()
spark

### Data Processing using Pyspark

#### Loading data into PySpark

To load the dataset we will use the `read.csv` method. The `inferSchema` parameter lets Spark automatically determine the data type of each column. The `header` and `sep` parameters are set because the dataset contains a header row and its values are separated by a vertical bar (`|`).

In [None]:
df = spark.read.csv("ecommerce_customers_.csv", sep = "|", header=True, inferSchema = True)           # creating spark data frame
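
As a rough illustration of what the `sep`, `header`, and `inferSchema` options mean, the sketch below parses a tiny pipe-separated sample with Python's standard `csv` module. The sample rows and the `infer_value` helper are hypothetical, and the type inference here works value by value, which is a simplification of how Spark infers one type per column.

```python
import csv
import io

# Hypothetical two-row sample in the same pipe-separated layout as the dataset.
sample = "Avatar|Time on App|Time on Website\nViolet|12.6|39.5\nBisque|11.3|37.1\n"


def infer_value(text):
    """Return a float when the text parses as a number, otherwise keep the string."""
    try:
        return float(text)
    except ValueError:
        return text


# DictReader treats the first line as the header; delimiter plays the role of sep.
reader = csv.DictReader(io.StringIO(sample), delimiter="|")
rows = [{name: infer_value(value) for name, value in row.items()} for row in reader]
print(rows[0])
```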

#### Data exploration with PySpark

* Display data types of dataframe columns

In [None]:
# Print the data types 
df.dtypes

[('Email', 'string'),
 ('Address', 'string'),
 ('Avatar', 'string'),
 ('Avg Session Length', 'double'),
 ('Time on App', 'double'),
 ('Time on Website', 'double'),
 ('Length of Membership', 'double'),
 ('Yearly Amount Spent', 'double')]

* Display column details

In [None]:
# Print the Schema of the DataFrame
df.printSchema()

root
 |-- Email: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- Avatar: string (nullable = true)
 |-- Avg Session Length: double (nullable = true)
 |-- Time on App: double (nullable = true)
 |-- Time on Website: double (nullable = true)
 |-- Length of Membership: double (nullable = true)
 |-- Yearly Amount Spent: double (nullable = true)



* Display rows

In [None]:
df.show(5)

+--------------------+--------------------+----------------+------------------+------------------+------------------+--------------------+-------------------+
|               Email|             Address|          Avatar|Avg Session Length|       Time on App|   Time on Website|Length of Membership|Yearly Amount Spent|
+--------------------+--------------------+----------------+------------------+------------------+------------------+--------------------+-------------------+
|mstephenson@ferna...|835 Frank TunnelW...|          Violet| 34.49726772511229| 12.65565114916675| 39.57766801952616|  4.0826206329529615|  587.9510539684005|
|   hduke@hotmail.com|4547 Archer Commo...|       DarkGreen| 31.92627202636016|11.109460728682564|37.268958868297744|    2.66403418213262|  392.2049334443264|
|    pallen@yahoo.com|24645 Valerie Uni...|          Bisque|33.000914755642675|11.330278057777512|37.110597442120856|   4.104543202376424| 487.54750486747207|
|riverarebecca@gma...|1414 David Throug...|   

* Display the first row

In [None]:
df.first()

Row(Email='mstephenson@fernandez.com', Address='835 Frank TunnelWrightmouth, MI 82180-9605', Avatar='Violet', Avg Session Length=34.49726772511229, Time on App=12.65565114916675, Time on Website=39.57766801952616, Length of Membership=4.0826206329529615, Yearly Amount Spent=587.9510539684005)

* Display column labels

In [None]:
# Display column labels
df.columns

* Display specific columns

In [None]:
columns = ["Email","Time on App","Time on Website"]
df.select(columns).show(5)

+--------------------+------------------+------------------+
|               Email|       Time on App|   Time on Website|
+--------------------+------------------+------------------+
|mstephenson@ferna...| 12.65565114916675| 39.57766801952616|
|   hduke@hotmail.com|11.109460728682564|37.268958868297744|
|    pallen@yahoo.com|11.330278057777512|37.110597442120856|
|riverarebecca@gma...|13.717513665142507| 36.72128267790313|
|mstephens@davidso...|12.795188551078114| 37.53665330059473|
+--------------------+------------------+------------------+
only showing top 5 rows



* Display the statistics of dataframe

In [None]:
df.describe().show()

+-------+-----------------+--------------------+-----------+------------------+------------------+------------------+--------------------+-------------------+
|summary|            Email|             Address|     Avatar|Avg Session Length|       Time on App|   Time on Website|Length of Membership|Yearly Amount Spent|
+-------+-----------------+--------------------+-----------+------------------+------------------+------------------+--------------------+-------------------+
|  count|              500|                 500|        500|               500|               500|               500|                 500|                500|
|   mean|             null|                null|       null| 33.05319351819619|12.052487937166134| 37.06044542094859|   3.533461555915055|  499.3140382585909|
| stddev|             null|                null|       null|0.9925631110845354|0.9942156084725424|1.0104889067564033|  0.9992775024112585|   79.3147815497068|
|    min|aaron04@yahoo.com|0001 Mack MillNor..

* Display total distinct values in *Avatar* column

In [None]:
# Distinct value count
df.select('Avatar').distinct().count()

* Display count of distinct values in *Avatar* column

In [None]:
# Number of records for each distinct Avatar value
df.groupby('Avatar').count().show(5)

* Plot the count of distinct values in *Avatar* column

In [None]:
DF = df.groupby('Avatar').count().sort("count", ascending= False)
DF.show(8)

+------------+-----+
|      Avatar|count|
+------------+-----+
|   CadetBlue|    7|
|   SlateBlue|    7|
| GreenYellow|    7|
|        Cyan|    7|
|        Teal|    7|
|   PeachPuff|    6|
|      Purple|    6|
|LightSkyBlue|    6|
+------------+-----+
only showing top 8 rows



In [None]:
plt.figure(figsize=(24, 4))
pdf = DF.toPandas()            # collect the grouped counts into pandas for plotting
x = pdf['Avatar']
y = pdf['count']               # number of customers per Avatar value
sns.barplot(x=x, y=y)
plt.xticks(rotation=90)
plt.show()

* Display average time spent on app by users having different *Avatar*

In [None]:
df.groupby('Avatar').avg().select(['Avatar', 'avg(Time on App)']).show(5)

+-----------+------------------+
|     Avatar|  avg(Time on App)|
+-----------+------------------+
|ForestGreen|11.801835104426386|
|    DimGray|12.013773141157184|
|   SeaGreen|11.352012316138753|
|       Aqua|12.207605304482167|
|       Teal| 11.77888239909872|
+-----------+------------------+
only showing top 5 rows



* Display the records where average time spent on website by user is greater than 37 minutes

In [None]:
df.filter(df['Time on Website'] > 37).show(5) 

+--------------------+--------------------+----------------+------------------+------------------+------------------+--------------------+-------------------+
|               Email|             Address|          Avatar|Avg Session Length|       Time on App|   Time on Website|Length of Membership|Yearly Amount Spent|
+--------------------+--------------------+----------------+------------------+------------------+------------------+--------------------+-------------------+
|mstephenson@ferna...|835 Frank TunnelW...|          Violet| 34.49726772511229| 12.65565114916675| 39.57766801952616|  4.0826206329529615|  587.9510539684005|
|   hduke@hotmail.com|4547 Archer Commo...|       DarkGreen| 31.92627202636016|11.109460728682564|37.268958868297744|    2.66403418213262|  392.2049334443264|
|    pallen@yahoo.com|24645 Valerie Uni...|          Bisque|33.000914755642675|11.330278057777512|37.110597442120856|   4.104543202376424| 487.54750486747207|
|mstephens@davidso...|14023 Rodriguez P...|Med

* Display the minimum Yearly Amount Spent where average time spent on website by user is greater than 39 minutes

In [None]:
from pyspark.sql.functions import col, min as spark_min   # alias avoids shadowing Python's built-in min
df.filter(col('Time on Website') > 39).agg(spark_min('Yearly Amount Spent')).show()

+------------------------+
|min(Yearly Amount Spent)|
+------------------------+
|      350.05820016384513|
+------------------------+



In [None]:
df.filter(col('Time on App')>13).count()

92

In [None]:
df.filter(col('Time on App')>13).show()

+--------------------+--------------------+---------------+------------------+------------------+------------------+--------------------+-------------------+
|               Email|             Address|         Avatar|Avg Session Length|       Time on App|   Time on Website|Length of Membership|Yearly Amount Spent|
+--------------------+--------------------+---------------+------------------+------------------+------------------+--------------------+-------------------+
|riverarebecca@gma...|1414 David Throug...|    SaddleBrown| 34.30555662975554|13.717513665142507| 36.72128267790313|   3.120178782748092|  581.8523440352177|
|vchurch@walter-ma...|860 Lee KeyWest D...|         Salmon| 33.98777289568564|13.386235275676436|37.534497341555735|  3.2734335777477144|  570.2004089636196|
|andrew06@peterson...|26104 Alexander G...|         Tomato|33.992572774953736|13.338975447662113| 37.22580613162114|   2.482607770510596|  492.6060127179966|
|taylormason@gmail...|7773 Powell Sprin...|       Da

* Display the records where average time spent on app by user is greater than 12 minutes and average time spent on website is smaller than 37 minutes

In [None]:
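from pyspark.sql.functions import col
# Combine the two conditions with & and wrap each one in parentheses
df.filter((col('Time on App') > 12) & (col('Time on Website') < 37)).show(5)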

To learn more about other `pyspark.sql.functions` operations, click [here](https://spark.apache.org/docs/2.4.0/api/python/pyspark.sql.html#module-pyspark.sql.functions).

### Linear Regression Model

Linear regression is one of the oldest and most widely used machine learning approaches. It assumes a linear relationship between a dependent variable and one or more independent variables. The model finds the best-fitting line through the scattered points on the graph; this line is known as the regression line.
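To make the idea of a best-fitting line concrete, here is a minimal sketch of ordinary least squares for a single feature in plain Python. The function name `fit_simple_linear_regression` and the toy data are hypothetical; Spark's `LinearRegression` handles multiple features and uses its own solvers, so this only illustrates the underlying principle.

```python
def fit_simple_linear_regression(xs, ys):
    """Return (slope, intercept) of the least-squares line through the points."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    # Covariance-like and variance-like sums (the shared 1/n factor cancels out).
    sxy = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    sxx = sum((x - mean_x) ** 2 for x in xs)
    slope = sxy / sxx
    intercept = mean_y - slope * mean_x
    return slope, intercept


# Toy data lying exactly on the line y = 2x + 1.
slope, intercept = fit_simple_linear_regression([1.0, 2.0, 3.0, 4.0], [3.0, 5.0, 7.0, 9.0])
print(slope, intercept)
```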

#### Setting Up DataFrame for Model

For Spark's regression model to accept the data, it needs to be arranged into two columns: a label column and a features column.

* Features are data points of all the attributes to be used for prediction
* Labels are output for each data point
* We will be predicting Label from Features

For the linear regression model, we need to import two classes from PySpark: `VectorAssembler` and `LinearRegression`. `VectorAssembler` is a transformer that combines multiple columns (here, columns of type double) into a single vector column.

To know more about vector assembler click [here](https://spark.apache.org/docs/2.1.0/ml-features.html#vectorassembler).
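
Conceptually, assembling features means keeping the existing columns of each row and adding one new column that holds the chosen values in order. The plain-Python sketch below illustrates this for a single row; `assemble_features` and the rounded row values are hypothetical and this is not how Spark implements the transformer.

```python
FEATURE_COLUMNS = [
    "Avg Session Length",
    "Time on App",
    "Time on Website",
    "Length of Membership",
]


def assemble_features(row, input_cols, output_col="features"):
    """Copy the row and add one column holding the input columns' values in order."""
    assembled = dict(row)  # leave the original row untouched
    assembled[output_col] = [float(row[name]) for name in input_cols]
    return assembled


# Hypothetical row with rounded values, for illustration only.
row = {
    "Avatar": "Violet",
    "Avg Session Length": 34.5,
    "Time on App": 12.7,
    "Time on Website": 39.6,
    "Length of Membership": 4.1,
    "Yearly Amount Spent": 588.0,
}
assembled = assemble_features(row, FEATURE_COLUMNS)
print(assembled["features"])
```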

In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

In [None]:
assembler = VectorAssembler(
                            inputCols= ["Avg Session Length", "Time on App", "Time on Website",'Length of Membership'],
                            outputCol= "features")       # features is the name of output columns which combines all the columns

In [None]:
output = assembler.transform(df)            # A new column 'features' will be created along with the existing columns
                                            # features column will include all the values combined in one list

In [None]:
# Display the first 10 rows of output
output.show(10)

In [None]:
output.select("features").show(10, truncate= False)          # displays only the features column (which includes all other column values in a list)

+----------------------------------------------------------------------------+
|features                                                                    |
+----------------------------------------------------------------------------+
|[34.49726772511229,12.65565114916675,39.57766801952616,4.0826206329529615]  |
|[31.92627202636016,11.109460728682564,37.268958868297744,2.66403418213262]  |
|[33.000914755642675,11.330278057777512,37.110597442120856,4.104543202376424]|
|[34.30555662975554,13.717513665142507,36.72128267790313,3.120178782748092]  |
|[33.33067252364639,12.795188551078114,37.53665330059473,4.446308318351434]  |
|[33.871037879341976,12.026925339755056,34.47687762925054,5.493507201364199] |
|[32.02159550138701,11.366348309710526,36.68377615286961,4.685017246570912]  |
|[32.739142938380326,12.35195897300293,37.37335885854755,4.4342734348999375] |
|[33.98777289568564,13.386235275676436,37.534497341555735,3.2734335777477144]|
|[31.936548618448917,11.814128294972196,37.145168223

In [None]:
# Complete dataset is represented in 2 columns
final_data = output.select("features",'Yearly Amount Spent') 

#### Splitting the data into Training and Test set

In [None]:
# Splitting the data in Train and Test set(70% training data, 30% testing data)
train_data,test_data = final_data.randomSplit([0.7,0.3])
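
A weighted random split assigns each row to a split by a random draw, so the resulting sizes are only approximately proportional to the weights. The sketch below shows the idea in plain Python; `random_split` is a hypothetical helper, not Spark's implementation. In PySpark itself, `randomSplit` accepts an optional `seed` argument for a repeatable split.

```python
import random


def random_split(rows, weights, seed=None):
    """Assign each row to one split by a random draw against normalized weights."""
    rng = random.Random(seed)
    total = sum(weights)
    boundaries = []
    running = 0.0
    for weight in weights:
        running += weight / total
        boundaries.append(running)  # cumulative upper bound of each split

    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        for index, upper in enumerate(boundaries):
            if draw < upper:
                splits[index].append(row)
                break
        else:
            # Guard against floating-point rounding at the top boundary.
            splits[-1].append(row)
    return splits


train, test = random_split(list(range(500)), [0.7, 0.3], seed=42)
print(len(train), len(test))
```

Because every row is drawn independently, the two sizes add up to 500 but will usually not be exactly 350 and 150.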

In [None]:
train_data.describe().show()

+-------+-------------------+
|summary|Yearly Amount Spent|
+-------+-------------------+
|  count|                342|
|   mean|  497.6122801333889|
| stddev|   80.8860181295649|
|    min| 256.67058229005585|
|    max|  765.5184619388373|
+-------+-------------------+



In [None]:
test_data.describe().show()

+-------+-------------------+
|summary|Yearly Amount Spent|
+-------+-------------------+
|  count|                158|
|   mean| 502.99759065617997|
| stddev|   75.9224944308872|
|    min| 298.76200786180766|
|    max|  712.3963268096637|
+-------+-------------------+



#### Create a Linear Regression Model object and fit on train data

In [None]:
regressor = LinearRegression(featuresCol="features", labelCol="Yearly Amount Spent") 

# Learn to fit the model from training set
model = regressor.fit(train_data)

#### Predicting the Test set results

In [None]:
predict = model.transform(test_data)

predict.show(10)

+--------------------+-------------------+------------------+
|            features|Yearly Amount Spent|        prediction|
+--------------------+-------------------+------------------+
|[30.3931845423455...|  319.9288698031936|332.37224360018877|
|[30.5743636841713...| 442.06441375806565| 442.4864443655067|
|[30.8364326747734...|  467.5019004269896|472.04850488065904|
|[30.8794843441274...|  490.2065999848547|494.32495881985005|
|[30.9716756438877...|  494.6386097568927| 488.2139824840033|
|[31.0662181616375...| 448.93329320767435| 462.2977093421298|
|[31.1239743499119...|  486.9470538397658| 508.6936292770554|
|[31.2681042107507...|  423.4705331738239| 427.7546215739317|
|[31.2834474760581...|  591.7810894256675| 569.7407845810544|
|[31.3091926408918...|  432.7207178399336| 430.2028309861696|
+--------------------+-------------------+------------------+
only showing top 10 rows



#### Evaluating Model Performance

In [None]:
# Use the evaluate method to measure the model's performance on the test set
metrics = model.evaluate(test_data)

print('Mean absolute error: {}'.format(metrics.meanAbsoluteError))
print('Root mean squared error: {}'.format(metrics.rootMeanSquaredError))
print('R_squared value: {}'.format(metrics.r2))

Mean absolute error: 7.863649967165185
Root mean squared error: 10.029022579798484
R_squared value: 0.9824396278303007
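
The three metrics reported above follow standard definitions, which the plain-Python sketch below spells out. The `regression_metrics` helper and the toy values are hypothetical and serve only to show the arithmetic; they are unrelated to the model's actual predictions.

```python
import math


def regression_metrics(actual, predicted):
    """Compute MAE, RMSE, and R squared for paired actual and predicted values."""
    n = len(actual)
    errors = [a - p for a, p in zip(actual, predicted)]
    mae = sum(abs(e) for e in errors) / n
    ss_res = sum(e * e for e in errors)                     # residual sum of squares
    rmse = math.sqrt(ss_res / n)
    mean_actual = sum(actual) / n
    ss_tot = sum((a - mean_actual) ** 2 for a in actual)    # total sum of squares
    r2 = 1 - ss_res / ss_tot
    return {"mae": mae, "rmse": rmse, "r2": r2}


# Toy values, for illustration only.
metrics_example = regression_metrics([3.0, 5.0, 7.0], [2.0, 5.0, 9.0])
print(metrics_example)
```

An R squared close to 1 means the predictions explain most of the variance in the actual values, while MAE and RMSE are expressed in the same units as the label.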


To learn more about other operations in PySpark, click [here](https://cdn.iisc.talentsprint.com/CDS/cheatSheet_pyspark.pdf).

### Please answer the questions below to complete the experiment:




In [None]:
# @title In the above given spark dataframe (df), what is the total number of records where average time spent on app by user is greater than 13 minutes? { run: "auto", form-width: "500px", display-mode: "form" }
Answer = "92" #@param ["","90","91", "92"]

In [None]:
#@title How was the experiment? { run: "auto", form-width: "500px", display-mode: "form" }
Complexity = "Good and Challenging for me" #@param ["","Too Simple, I am wasting time", "Good, But Not Challenging for me", "Good and Challenging for me", "Was Tough, but I did it", "Too Difficult for me"]


In [None]:
#@title If it was too easy, what more would you have liked to be added? If it was very difficult, what would you have liked to have been removed? { run: "auto", display-mode: "form" }
Additional = "na" #@param {type:"string"}


In [None]:
#@title Can you identify the concepts from the lecture which this experiment covered? { run: "auto", vertical-output: true, display-mode: "form" }
Concepts = "Yes" #@param ["","Yes", "No"]


In [None]:
#@title  Text and image description/explanation and code comments within the experiment: { run: "auto", vertical-output: true, display-mode: "form" }
Comments = "Very Useful" #@param ["","Very Useful", "Somewhat Useful", "Not Useful", "Didn't use"]


In [None]:
#@title Mentor Support: { run: "auto", vertical-output: true, display-mode: "form" }
Mentor_support = "Very Useful" #@param ["","Very Useful", "Somewhat Useful", "Not Useful", "Didn't use"]


In [None]:
#@title Run this cell to submit your notebook for grading { vertical-output: true }
try:
  if submission_id:
      return_id = submit_notebook()
      if return_id : submission_id = return_id
  else:
      print("Please complete the setup first.")
except NameError:
  print ("Please complete the setup first.")

Your submission is successful.
Ref Id: 6033
Date of submission:  09 Jan 2022
Time of submission:  15:17:42
View your submissions: https://cds.iisc.talentsprint.com/notebook_submissions
