<p style="text-align:center">
    <a href="https://skills.network/?utm_medium=Exinfluencer&utm_source=Exinfluencer&utm_content=000026UJ&utm_term=10006555&utm_id=NA-SkillsNetwork-Channel-SkillsNetworkCoursesIBMSkillsNetworkBD0231ENCoursera2789-2023-01-01">
    <img src="https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/assets/logos/SN_web_lightmode.png" width="200" alt="Skills Network Logo">
    </a>
</p>


## Practice Project - Create a machine learning pipeline for a regression project


Estimated time needed: **90** minutes


## Scenario


You are a data engineer at a data analytics consulting company. Your company prides itself on being able to handle huge datasets efficiently. Data scientists in your office need to work with different algorithms and with data in different formats. While they are good at machine learning, they count on you to do ETL jobs and build ML pipelines.



## Objectives

In this four-part assignment you will:

- Part 1 ETL
  - Load a csv dataset
  - Remove duplicates if any
  - Drop rows with null values if any
  - Make transformations
  - Store the cleaned data in parquet format
- Part 2 Machine Learning Pipeline creation
  - Create a machine learning pipeline for prediction
- Part 3 Model evaluation
  - Evaluate the model using metrics
  - Print the intercept and the coefficients
- Part 4 Model Persistence
  - Save the model for future production use
  - Load and verify the stored model


## Datasets

In this lab you will use the following dataset:

 - Modified version of car mileage dataset. Original dataset available at https://archive.ics.uci.edu/ml/datasets/auto+mpg 
 


----


## Setup


For this lab, we will be using the following libraries:

*   [`PySpark`](https://spark.apache.org/docs/latest/api/python/index.html?utm_medium=Exinfluencer&utm_source=Exinfluencer&utm_content=000026UJ&utm_term=10006555&utm_id=NA-SkillsNetwork-Channel-SkillsNetworkCoursesIBMSkillsNetworkBD0231ENCoursera2789-2023-01-01) for connecting to the Spark Cluster


### Installing Required Libraries

A Spark cluster is pre-installed in the Skills Network Labs environment. However, you need libraries like pyspark and findspark to connect to this cluster.


The following required libraries are __not__ pre-installed in the Skills Network Labs environment. __You will need to run the following cell__ to install them:


In [None]:
# !pip install pyspark==3.1.2 -q
# !pip install findspark -q

### Importing Required Libraries

_We recommend you import all required libraries in one place (here):_


In [1]:
# You can also use this section to suppress warnings generated by your code:
def warn(*args, **kwargs):
    pass
import warnings
warnings.warn = warn
warnings.filterwarnings('ignore')

# FindSpark simplifies the process of using Apache Spark with Python

import findspark
findspark.init()

## Part 1 - ETL


### Task 1 - Import required libraries


In [32]:
#your code goes here
from pyspark.sql import SparkSession

from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, RegressionEvaluator


<details>
    <summary>Click here for a Hint</summary>
    
Import all the required libraries
    
</details>


<details>
    <summary>Click here for Solution</summary>

```python
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.pipeline import PipelineModel
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import StandardScaler

```

</details>


### Task 2 - Create a spark session


In [3]:
#Create SparkSession

spark = SparkSession.builder.appName("Practice Project").getOrCreate()


24/04/19 13:54:06 WARN Utils: Your hostname, DESKTOP-LMLK3KO resolves to a loopback address: 127.0.1.1; using 172.19.128.203 instead (on interface eth0)
24/04/19 13:54:06 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/04/19 13:54:17 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


<details>
    <summary>Click here for a Hint</summary>
    
Use the SparkSession.builder

</details>


<details>
    <summary>Click here for Solution</summary>

```python
spark = SparkSession.builder.appName("Practice Project").getOrCreate()
```

</details>


### Task 3 - Load the csv file into a dataframe


Download the data file


In [None]:
!wget https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMSkillsNetwork-BD0231EN-Coursera/datasets/mpg-raw.csv


Load the dataset into the spark dataframe


In [4]:
# Load dataset
df = spark.read.csv("mpg-raw.csv", header=True, inferSchema=True)


<details>
    <summary>Click here for a Hint</summary>
    
Use  spark.read.csv

</details>


<details>
    <summary>Click here for Solution</summary>

```python
df = spark.read.csv("mpg-raw.csv", header=True, inferSchema=True)

```

</details>


### Task 4 - Print top 5 rows of the dataset


In [5]:
#your code goes here
df.show(5)

+----+---------+-----------+----------+------+----------+----+--------+
| MPG|Cylinders|Engine Disp|Horsepower|Weight|Accelerate|Year|  Origin|
+----+---------+-----------+----------+------+----------+----+--------+
|46.6|        4|       86.0|        65|  2110|      17.9|  80|Japanese|
|44.6|        4|       91.0|        67|  1850|      13.8|  80|Japanese|
|44.3|        4|       90.0|        48|  2085|      21.7|  80|European|
|44.0|        4|       97.0|        52|  2130|      24.6|  82|European|
|43.4|        4|       90.0|        48|  2335|      23.7|  80|European|
+----+---------+-----------+----------+------+----------+----+--------+
only showing top 5 rows



### Task 5 - Print the number of cars in each Origin


In [None]:
#your code goes here
df.createOrReplaceTempView("mileage")
result = spark.sql("SELECT Origin, count(*) FROM mileage GROUP BY Origin")
result.show()

In [6]:
df.groupBy("Origin").count().show()

+--------+-----+
|  Origin|count|
+--------+-----+
|European|   70|
|    NULL|    1|
|Japanese|   88|
|American|  247|
+--------+-----+



<details>
    <summary>Click here for a Hint</summary>
    
Use  df.groupBy

</details>


<details>
    <summary>Click here for Solution</summary>

```python
df.groupBy('Origin').count().orderBy('count').show()
```

</details>


### Task 6 - Print the total number of rows in the dataset


In [7]:
#your code goes here
# rowcount1 = spark.sql("SELECT count(*) FROM mileage").show()
rowcount1 = df.count()
print(rowcount1)

406


### Task 7 - Drop all the duplicate rows from the dataset


In [8]:
#your code goes here
df = df.dropDuplicates()

<details>
    <summary>Click here for a Hint</summary>
    
Use  df.dropDuplicates

</details>


<details>
    <summary>Click here for Solution</summary>

```python
df = df.dropDuplicates()
```

</details>


### Task 8 - Print the total number of rows in the dataset


In [9]:
#your code goes here
# rowcount2 = spark.sql("SELECT count(*) FROM mileage").show()
rowcount2 = df.count()
print(rowcount2)

392


### Task 9 - Drop all the rows that contain null values from the dataset


In [10]:
#your code goes here
df = df.dropna()

<details>
    <summary>Click here for a Hint</summary>
    
Use df.dropna

</details>


<details>
    <summary>Click here for Solution</summary>

```python
df=df.dropna()
```

</details>
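
The two cleaning steps in Tasks 7 and 9 can be pictured without Spark. The following is a minimal plain-Python sketch, assuming a handful of made-up rows (the `rows` list and both helper functions are hypothetical and only mimic what `dropDuplicates()` and `dropna()` do with their default arguments). Unlike this sketch, Spark does not guarantee which copy of a duplicate survives or the order of the remaining rows.

```python
# Hypothetical rows shaped like (MPG, Cylinders, Origin); None stands for a null.
rows = [
    (24.0, 4, "Japanese"),
    (18.0, 6, "American"),
    (24.0, 4, "Japanese"),   # exact duplicate of the first row
    (29.0, 4, None),         # missing Origin
    (None, 8, "American"),   # missing MPG
]

def drop_duplicates(rows):
    """Keep one copy of each row whose columns are all identical."""
    return list(dict.fromkeys(rows))

def drop_na(rows):
    """Keep only rows in which no column is None."""
    return [row for row in rows if all(value is not None for value in row)]

deduplicated = drop_duplicates(rows)
cleaned = drop_na(deduplicated)

# Row counts before cleaning, after de-duplication, and after dropping nulls.
print(len(rows), len(deduplicated), len(cleaned))
```

A row is removed by the null step as soon as any one of its columns is missing, which is why a single missing `Origin` is enough to drop a car.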


### Task 10 - Print the total number of rows in the dataset


In [11]:
#your code goes here
rowcount3 = df.count()
print(rowcount3)

385


### Task 11 - Rename the column "Engine Disp" to "Engine_Disp"


In [12]:
#your code goes here

df = df.withColumnRenamed(existing="Engine Disp", new="Engine_Disp")

In [13]:
df.show(5)

+----+---------+-----------+----------+------+----------+----+--------+
| MPG|Cylinders|Engine_Disp|Horsepower|Weight|Accelerate|Year|  Origin|
+----+---------+-----------+----------+------+----------+----+--------+
|24.0|        4|      134.0|        96|  2702|      13.5|  75|Japanese|
|18.0|        6|      250.0|        88|  3139|      14.5|  71|American|
|29.0|        4|       68.0|        49|  1867|      19.5|  73|European|
|22.4|        6|      231.0|       110|  3415|      15.8|  81|American|
|20.5|        6|      231.0|       105|  3425|      16.9|  77|American|
+----+---------+-----------+----------+------+----------+----+--------+
only showing top 5 rows



<details>
    <summary>Click here for a Hint</summary>
    
Use df.withColumnRenamed

</details>


<details>
    <summary>Click here for Solution</summary>

```python
df = df.withColumnRenamed("Engine Disp","Engine_Disp")
```

</details>


### Task 12 - Save the dataframe in parquet format, name the file as "mpg-cleaned.parquet"


In [15]:
#your code goes here
df.write.mode("overwrite").parquet("mpg-cleaned.parquet")

                                                                                

<details>
    <summary>Click here for a Hint</summary>
    
Use df.write.parquet

</details>


<details>
    <summary>Click here for Solution</summary>

```python
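# mode("overwrite") lets the cell be re-run without failing because the output path already exists
df.write.mode("overwrite").parquet("mpg-cleaned.parquet")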
```

</details>


#### Part 1 - Evaluation



Run the code cell below.<br>
If the code throws up any errors, go back and review the code you have written.


In [16]:
print("Part 1 - Evaluation")

print("Total rows = ", rowcount1)
print("Total rows after dropping duplicate rows = ", rowcount2)
print("Total rows after dropping duplicate rows and rows with null values = ", rowcount3)
print("Renamed column name = ", df.columns[2])

import os

print("mpg-cleaned.parquet exists :", os.path.isdir("mpg-cleaned.parquet"))

Part 1 - Evaluation
Total rows =  406
Total rows after dropping duplicate rows =  392
Total rows after dropping duplicate rows and rows with null values =  385
Renamed column name =  Engine_Disp
mpg-cleaned.parquet exists : True


## Part 2 - Machine Learning Pipeline creation


### Task 1 - Load data from "mpg-cleaned.parquet" into a dataframe


In [17]:
#your code goes here

df = spark.read.parquet("mpg-cleaned.parquet")
rowcount4 = df.count()

In [18]:
#show top 5 rows
#your code goes here
df.show(5)

+----+---------+-----------+----------+------+----------+----+--------+
| MPG|Cylinders|Engine_Disp|Horsepower|Weight|Accelerate|Year|  Origin|
+----+---------+-----------+----------+------+----------+----+--------+
|24.0|        4|      134.0|        96|  2702|      13.5|  75|Japanese|
|18.0|        6|      250.0|        88|  3139|      14.5|  71|American|
|29.0|        4|       68.0|        49|  1867|      19.5|  73|European|
|22.4|        6|      231.0|       110|  3415|      15.8|  81|American|
|20.5|        6|      231.0|       105|  3425|      16.9|  77|American|
+----+---------+-----------+----------+------+----------+----+--------+
only showing top 5 rows



In [19]:
#print the schema of the dataframe
#your code goes here
df.printSchema()

root
 |-- MPG: double (nullable = true)
 |-- Cylinders: integer (nullable = true)
 |-- Engine_Disp: double (nullable = true)
 |-- Horsepower: integer (nullable = true)
 |-- Weight: integer (nullable = true)
 |-- Accelerate: double (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Origin: string (nullable = true)



### Task 2 - Define the StringIndexer pipeline stage


In [21]:
# Stage - 1 Using StringIndexer convert the string column "Origin" into "OriginIndex"
indexer = StringIndexer(inputCol="Origin", outputCol="OriginIndex")

<details>
    <summary>Click here for a Hint</summary>
    
Use StringIndexer class

</details>


<details>
    <summary>Click here for Solution</summary>

```python
indexer = StringIndexer(inputCol="Origin", outputCol="OriginIndex")
```

</details>
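
By default, `StringIndexer` orders labels by descending frequency, so the most common value receives index `0.0`. The plain-Python sketch below imitates that ordering on a made-up list of origins (the `string_index` helper and the sample counts are hypothetical, not part of PySpark). Note that in this notebook `OriginIndex` is not among the input columns of the `VectorAssembler` in Task 3, so the regression itself does not use it.

```python
from collections import Counter

def string_index(values):
    """Map each distinct label to a float index, most frequent label first."""
    counts = Counter(values)
    # Sort by descending count; break ties alphabetically.
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(position) for position, label in enumerate(ordered)}

# Hypothetical sample with the same ranking as the Origin counts in Part 1.
origins = ["American"] * 5 + ["Japanese"] * 3 + ["European"] * 2

mapping = string_index(origins)
print(mapping)
```

This matches the prediction output later in the notebook, where `American`, the most frequent origin, is shown with `OriginIndex` 0.0.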


### Task 3 - Define the VectorAssembler pipeline stage


In [22]:
# Stage 2 - assemble the input columns 'Cylinders','Engine_Disp','Horsepower','Weight','Accelerate','Year' into a single column "features"
assembler = VectorAssembler(inputCols=['Cylinders','Engine_Disp','Horsepower','Weight','Accelerate','Year'], outputCol="features")

<details>
    <summary>Click here for a Hint</summary>
    
Use the VectorAssembler class

</details>


<details>
    <summary>Click here for Solution</summary>

```python
assembler = VectorAssembler(inputCols=['Cylinders','Engine_Disp','Horsepower','Weight','Accelerate','Year'], outputCol="features")
```

</details>


### Task 4 - Define the StandardScaler pipeline stage


In [23]:
# Stage 3 - scale the "features" using standard scaler and store in "scaledFeatures" column
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")

<details>
    <summary>Click here for a Hint</summary>
    
Use the StandardScaler class

</details>


<details>
    <summary>Click here for Solution</summary>

```python
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")

```

</details>
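
With its default settings (`withStd=True`, `withMean=False`), `StandardScaler` divides each feature by its sample standard deviation and does not subtract the mean. Here is a minimal sketch of that arithmetic for one column, assuming a made-up list of cylinder counts (the `scale_to_unit_std` helper and the data are hypothetical).

```python
from statistics import stdev

def scale_to_unit_std(column):
    """Divide every value by the column's sample standard deviation (n - 1)."""
    spread = stdev(column)
    return [value / spread for value in column]

# Hypothetical cylinder counts; their sample standard deviation is 2.
cylinders = [4, 4, 6, 8, 8]

scaled = scale_to_unit_std(cylinders)
print(scaled)
```

Because nothing is subtracted, scaled values stay positive for positive inputs, which is consistent with the `scaledFeatures` column in Part 3, where 8 cylinders appears as roughly 4.81 rather than a value centered on zero.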


### Task 5 - Define the LinearRegression pipeline stage


In [25]:
# Stage 4 - Create a LinearRegression stage to predict "MPG"

lr = LinearRegression(featuresCol="scaledFeatures", labelCol="MPG")

<details>
    <summary>Click here for a Hint</summary>
    
Use the LinearRegression class

</details>


<details>
    <summary>Click here for Solution</summary>

```python
lr = LinearRegression(featuresCol="scaledFeatures", labelCol="MPG")
```

</details>


### Task 6 - Build the pipeline


In [26]:
# Build a pipeline using the above four stages

pipeline = Pipeline(stages=[indexer, assembler, scaler, lr])

<details>
    <summary>Click here for a Hint</summary>
    
Use the Pipeline class

</details>


<details>
    <summary>Click here for Solution</summary>

```python
pipeline = Pipeline(stages=[indexer,assembler, scaler, lr])
```

</details>


### Task 7 - Split the data


In [27]:
# Split the data into training and testing sets with 70:30 split. Use 42 as seed
(trainingData, testingData) = df.randomSplit([0.7, 0.3], seed=42)

<details>
    <summary>Click here for a Hint</summary>
    
Use the randomSplit method

</details>


<details>
    <summary>Click here for Solution</summary>

```python
(trainingData, testingData) = df.randomSplit([0.7, 0.3], seed=42)
```

</details>
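
`randomSplit` decides the split for each row at random, so the weights `[0.7, 0.3]` describe expected proportions rather than exact sizes, and the seed makes the result repeatable. The sketch below illustrates that idea in plain Python. It is not Spark's implementation: the `random_split` helper is hypothetical, and the rows it selects will differ from the ones Spark picks with the same seed.

```python
import random

def random_split(rows, train_fraction, seed):
    """Assign each row to train or test independently, using a seeded generator."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        (train if rng.random() < train_fraction else test).append(row)
    return train, test

# Stand-in for the 385 cleaned rows; each integer represents one car.
rows = list(range(385))

train, test = random_split(rows, 0.7, seed=42)

# Sizes are close to, but usually not exactly, 70% and 30% of 385.
print(len(train), len(test))
```

Calling the function again with the same seed returns the same two lists, which is the property the lab relies on when it fixes the seed at 42.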


### Task 8 - Fit the pipeline


In [28]:
# Fit the pipeline using the training data

pipelineModel = pipeline.fit(trainingData)

24/04/19 14:29:42 WARN Instrumentation: [36fb9914] regParam is zero, which might cause numerical instability and overfitting.
24/04/19 14:29:42 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
24/04/19 14:29:43 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK


<details>
    <summary>Click here for a Hint</summary>
    
Use the pipeline.fit method

</details>


<details>
    <summary>Click here for Solution</summary>

```python
pipelineModel = pipeline.fit(trainingData)
```

</details>


#### Part 2 - Evaluation



Run the code cell below.<br>
If the code throws up any errors, go back and review the code you have written.


In [29]:
print("Part 2 - Evaluation")
print("Total rows = ", rowcount4)
ps = [str(x).split("_")[0] for x in pipeline.getStages()]

print("Pipeline Stage 1 = ", ps[0])
print("Pipeline Stage 2 = ", ps[1])
print("Pipeline Stage 3 = ", ps[2])

print("Label column = ", lr.getLabelCol())

Part 2 - Evaluation
Total rows =  385
Pipeline Stage 1 =  StringIndexer
Pipeline Stage 2 =  VectorAssembler
Pipeline Stage 3 =  StandardScaler
Label column =  MPG


## Part 3 - Model Evaluation


### Task 1 - Predict using the model


In [30]:
# Make predictions on testing data
predictions = pipelineModel.transform(testingData)

In [33]:
predictions.show(5)

+----+---------+-----------+----------+------+----------+----+--------+-----------+--------------------+--------------------+------------------+
| MPG|Cylinders|Engine_Disp|Horsepower|Weight|Accelerate|Year|  Origin|OriginIndex|            features|      scaledFeatures|        prediction|
+----+---------+-----------+----------+------+----------+----+--------+-----------+--------------------+--------------------+------------------+
|10.0|        8|      360.0|       215|  4615|      14.0|  70|American|        0.0|[8.0,360.0,215.0,...|[4.81279869941784...|   6.9607645775083|
|11.0|        8|      429.0|       208|  4633|      11.0|  72|American|        0.0|[8.0,429.0,208.0,...|[4.81279869941784...| 8.545911819807845|
|12.0|        8|      350.0|       180|  4499|      12.5|  73|American|        0.0|[8.0,350.0,180.0,...|[4.81279869941784...|10.226709705747677|
|12.0|        8|      383.0|       180|  4955|      11.5|  71|American|        0.0|[8.0,383.0,180.0,...|[4.81279869941784...| 5.44

<details>
    <summary>Click here for a Hint</summary>
    
Use the transform method of the model

</details>


<details>
    <summary>Click here for Solution</summary>

```python
predictions = pipelineModel.transform(testingData)
```

</details>


### Task 2 - Print the MSE


In [34]:
#Your code goes here

evaluator = RegressionEvaluator(labelCol="MPG", predictionCol="prediction", metricName="mse")
mse = evaluator.evaluate(predictions)
print(mse)


12.226745835571341


<details>
    <summary>Click here for a Hint</summary>
    
Use the RegressionEvaluator

</details>


<details>
    <summary>Click here for Solution</summary>

```python
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="MPG", metricName="mse")
mse = evaluator.evaluate(predictions)
print(mse)

```

</details>


### Task 3 - Print the MAE


In [35]:
#Your code goes here

evaluator = RegressionEvaluator(labelCol="MPG", predictionCol="prediction", metricName="mae")
mae = evaluator.evaluate(predictions)
print(mae)


2.8457151130136196


<details>
    <summary>Click here for a Hint</summary>
    
Use the RegressionEvaluator

</details>


<details>
    <summary>Click here for Solution</summary>

```python
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="MPG", metricName="mae")
mae = evaluator.evaluate(predictions)
print(mae)

```

</details>


### Task 4 - Print the R-Squared(R2)


In [36]:
#Your code goes here

evaluator = RegressionEvaluator(labelCol="MPG", predictionCol="prediction", metricName="r2")
r2 = evaluator.evaluate(predictions)
print(r2)


0.801873739489571


<details>
    <summary>Click here for a Hint</summary>
    
Use the RegressionEvaluator

</details>


<details>
    <summary>Click here for Solution</summary>

```python
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="MPG", metricName="r2")
r2 = evaluator.evaluate(predictions)
print(r2)
```

</details>
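
The three metrics printed in Tasks 2 to 4 follow from short formulas. The sketch below writes them out in plain Python, assuming three made-up label and prediction pairs (the helper functions and sample values are hypothetical and are not part of `RegressionEvaluator`).

```python
def mse(labels, predictions):
    """Mean squared error: the average of the squared residuals."""
    return sum((y - p) ** 2 for y, p in zip(labels, predictions)) / len(labels)

def mae(labels, predictions):
    """Mean absolute error: the average of the absolute residuals."""
    return sum(abs(y - p) for y, p in zip(labels, predictions)) / len(labels)

def r2(labels, predictions):
    """R-squared: 1 minus residual sum of squares over total sum of squares."""
    mean_label = sum(labels) / len(labels)
    ss_res = sum((y - p) ** 2 for y, p in zip(labels, predictions))
    ss_tot = sum((y - mean_label) ** 2 for y in labels)
    return 1 - ss_res / ss_tot

# Hypothetical values: two perfect predictions and one that is off by 1.
labels = [1.0, 2.0, 3.0]
predictions = [1.0, 2.0, 4.0]

print(mse(labels, predictions), mae(labels, predictions), r2(labels, predictions))
```

MSE is expressed in squared MPG and penalizes large misses more heavily, MAE is in MPG, and R-squared is unitless, so the three numbers are best read together rather than compared with each other.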


#### Part 3 - Evaluation



Run the code cell below.<br>
If the code throws up any errors, go back and review the code you have written.


In [37]:
print("Part 3 - Evaluation")

print("Mean Squared Error = ", round(mse,2))
print("Mean Absolute Error = ", round(mae,2))
print("R Squared = ", round(r2,2))

lrModel = pipelineModel.stages[-1]

print("Intercept = ", round(lrModel.intercept,2))


Part 3 - Evaluation
Mean Squared Error =  12.23
Mean Absolute Error =  2.85
R Squared =  0.8
Intercept =  -17.37


## Part 4 - Model persistence


### Task 1 - Save the model to the path "Practice_Project"


In [38]:
# Save the pipeline model
# your code goes here
pipelineModel.write().overwrite().save("./Practice_Project/")

<details>
    <summary>Click here for a Hint</summary>
    
Use the model.write().save method

</details>


<details>
    <summary>Click here for Solution</summary>

```python
# overwrite() lets the cell be re-run without failing because the path already exists
pipelineModel.write().overwrite().save("Practice_Project")
```

</details>


### Task 2 - Load the model from the path "Practice_Project"


In [41]:
# Load the pipeline model
# your code goes here
loadedPipelineModel = PipelineModel.load("./Practice_Project/")

<details>
    <summary>Click here for a Hint</summary>
    
Use the load method of the model

</details>


<details>
    <summary>Click here for Solution</summary>

```python
loadedPipelineModel = PipelineModel.load("Practice_Project")
```

</details>


### Task 3 - Make predictions using the loaded model on the test data


In [42]:
# Use the loaded pipeline model for predictions
# your code goes here
predictions = loadedPipelineModel.transform(testingData)

<details>
    <summary>Click here for a Hint</summary>
    
Use the transform method

</details>


<details>
    <summary>Click here for Solution</summary>

```python
predictions = loadedPipelineModel.transform(testingData)
```

</details>


### Task 4 - Show the predictions


In [45]:
# your code goes here
predictions.select("MPG", "prediction").show()

+----+------------------+
| MPG|        prediction|
+----+------------------+
|10.0|   6.9607645775083|
|11.0| 8.545911819807845|
|12.0|10.226709705747677|
|12.0| 5.446415257213072|
|13.0|21.430212400590175|
|13.0|17.437792078059797|
|13.0|11.245494102903557|
|13.0|14.180626433497526|
|13.0|  9.95908269168941|
|13.0|11.111417171060129|
|13.0|13.170917811818526|
|13.0| 10.88943987457521|
|13.0|   7.1445362115536|
|13.0|4.2795654853532845|
|13.0| 8.611192450278175|
|14.0|10.356052138542527|
|14.0|16.057308446272554|
|14.0|12.327668542375559|
|14.0|10.787367112520862|
|14.0|10.983935628156818|
+----+------------------+
only showing top 20 rows



<details>
    <summary>Click here for a Hint</summary>
    
Use the select method of the dataframe

</details>


<details>
    <summary>Click here for Solution</summary>

```python
predictions.select("MPG","prediction").show()
```

</details>


#### Part 4 - Evaluation



Run the code cell below.<br>
If the code throws up any errors, go back and review the code you have written.


In [44]:
print("Part 4 - Evaluation")

loadedmodel = loadedPipelineModel.stages[-1]
totalstages = len(loadedPipelineModel.stages)
inputcolumns = loadedPipelineModel.stages[1].getInputCols()

print("Number of stages in the pipeline = ", totalstages)
for i,j in zip(inputcolumns, loadedmodel.coefficients):
    print(f"Coefficient for {i} is {round(j,4)}")

Part 4 - Evaluation
Number of stages in the pipeline =  4
Coefficient for Cylinders is 0.119
Coefficient for Engine_Disp is 0.4971
Coefficient for Horsepower is -0.2517
Coefficient for Weight is -5.7923
Coefficient for Accelerate is 0.2369
Coefficient for Year is 2.9258
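
Because the regression was trained on `scaledFeatures`, each coefficient above describes the change in predicted MPG per one standard deviation of the feature, not per original unit. Since the scaler in this pipeline only divides by the standard deviation, dividing a coefficient by the same standard deviation converts it back to original units, and the intercept is unaffected. The sketch below shows that conversion with made-up numbers (the `to_original_units` helper and both input lists are hypothetical). In the notebook, the standard deviations could presumably be read from the fitted scaler stage, for example `loadedPipelineModel.stages[2].std`, which is an assumption about how you would wire this up rather than part of the lab.

```python
def to_original_units(scaled_coefficients, feature_stds):
    """Convert per-standard-deviation coefficients to per-unit coefficients."""
    return [coef / std for coef, std in zip(scaled_coefficients, feature_stds)]

# Hypothetical values: two coefficients and the matching feature standard deviations.
scaled_coefficients = [2.0, -6.0]
feature_stds = [2.0, 3.0]

per_unit = to_original_units(scaled_coefficients, feature_stds)
print(per_unit)
```

Comparing the scaled coefficients with each other is still useful: on that scale, `Weight` and `Year` have the largest magnitudes in the output above.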


### Task 5 - Stop Spark Session


In [46]:
spark.stop()

Congratulations! You have successfully finished the practice project.


## Authors


[Ramesh Sannareddy](https://www.linkedin.com/in/rsannareddy/?utm_medium=Exinfluencer&utm_source=Exinfluencer&utm_content=000026UJ&utm_term=10006555&utm_id=NA-SkillsNetwork-Channel-SkillsNetworkCoursesIBMBD0231ENSkillsNetwork866-2023-01-01)


### Other Contributors


## Change Log


|Date (YYYY-MM-DD)|Version|Changed By|Change Description|
|-|-|-|-|
|2023-05-26|0.1|Ramesh Sannareddy|Initial Version Created|


Copyright © 2023 IBM Corporation. All rights reserved.
