<p style="text-align:center">
    <a href="https://skills.network/?utm_medium=Exinfluencer&utm_source=Exinfluencer&utm_content=000026UJ&utm_term=10006555&utm_id=NA-SkillsNetwork-Channel-SkillsNetworkCoursesIBMSkillsNetworkBD0231ENCoursera2789-2023-01-01">
    <img src="https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/assets/logos/SN_web_lightmode.png" width="200" alt="Skills Network Logo">
    </a>
</p>


## Final Project - Build an ML Pipeline for Airfoil Noise Prediction


Estimated time needed: **90** minutes


## Scenario


You are a data engineer at an aeronautics consulting company. Your company prides itself on being able to efficiently design airfoils for use in planes and sports cars. Data scientists in your office need to work with different algorithms and with data in different formats. While they are good at machine learning, they count on you to do ETL jobs and build ML pipelines. In this project you will use a modified version of the NASA Airfoil Self-Noise dataset. You will clean this dataset by dropping duplicate rows and removing rows with null values. You will then create an ML pipeline that builds a model to predict SoundLevel from all the other columns. Finally, you will evaluate the model and persist it.



## Objectives

In this four-part assignment you will:

- Part 1 - Perform ETL activity
  - Load a CSV dataset
  - Remove duplicates, if any
  - Drop rows with null values, if any
  - Make transformations
  - Store the cleaned data in Parquet format
- Part 2 - Create a machine learning pipeline
  - Create a machine learning pipeline for prediction
- Part 3 - Evaluate the model
  - Evaluate the model using relevant metrics
- Part 4 - Persist the model
  - Save the model for future production use
  - Load and verify the stored model


## Datasets

In this lab you will use the following dataset:

 - The original dataset is the [NASA Airfoil Self-Noise dataset](https://archive.ics.uci.edu/dataset/291/airfoil+self+noise).

 - This dataset is licensed under a Creative Commons Attribution 4.0 International (CC BY 4.0) license.


Diagram of an airfoil (for informational purposes only).


![Airfoil with flow](https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMSkillsNetwork-BD0231EN-Coursera/images/Airfoil_with_flow.png)


Diagram showing the angle of attack (for informational purposes only).


![Airfoil angle of attack](https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMSkillsNetwork-BD0231EN-Coursera/images/Airfoil_angle_of_attack.jpg)


## Before You Start


**Before you attempt this project, we highly recommend that you finish the practice project.**


## Setup


For this lab, we will be using the following libraries:

*   [`PySpark`](https://spark.apache.org/docs/latest/api/python/index.html?utm_medium=Exinfluencer&utm_source=Exinfluencer&utm_content=000026UJ&utm_term=10006555&utm_id=NA-SkillsNetwork-Channel-SkillsNetworkCoursesIBMSkillsNetworkBD0231ENCoursera2789-2023-01-01) for connecting to the Spark Cluster


### Installing Required Libraries

The Spark cluster is pre-installed in the Skills Network Labs environment. However, you need libraries like pyspark and findspark to connect to this cluster.


The following required libraries are __not__ pre-installed in the Skills Network Labs environment. __You will need to run the following cell__ to install them:


In [1]:
!pip install pyspark==3.1.2 -q
!pip install findspark -q

### Importing Required Libraries

_We recommend you import all required libraries in one place (here):_


In [23]:
# You can also use this section to suppress warnings generated by your code:
def warn(*args, **kwargs):
    pass
import warnings
warnings.warn = warn
warnings.filterwarnings('ignore')

# FindSpark simplifies the process of using Apache Spark with Python

import findspark
findspark.init()

## Part 1 - Perform ETL activity


### Task 1 - Import required libraries


In [24]:
# Import PySpark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Optionally, you can import types if needed later
from pyspark.sql.types import StructType, StructField, DoubleType

### Task 2 - Create a Spark session


In [25]:
# Create a SparkSession
spark = SparkSession.builder \
    .appName("AirfoilNoisePrediction") \
    .getOrCreate()

print("SparkSession created successfully.")


SparkSession created successfully.


### Task 3 - Load the CSV file into a DataFrame


Download the data file.

NOTE: Make sure you use the dataset below, not the original dataset mentioned above.


In [32]:
!wget https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMSkillsNetwork-BD0231EN-Coursera/datasets/NASA_airfoil_noise_raw.csv


Load the dataset into a Spark DataFrame.


In [33]:
# Load the CSV file downloaded in the previous cell into a Spark DataFrame,
# using the first row as column names and letting Spark infer the column types
df = spark.read.csv("NASA_airfoil_noise_raw.csv", header=True, inferSchema=True)

# Show the first few rows to verify
df.show(5)


+---------+-------------+-----------+------------------+-----------------------+----------+
|Frequency|AngleOfAttack|ChordLength|FreeStreamVelocity|SuctionSideDisplacement|SoundLevel|
+---------+-------------+-----------+------------------+-----------------------+----------+
|      800|          0.0|     0.3048|              71.3|             0.00266337|   126.201|
|     1000|          0.0|     0.3048|              71.3|             0.00266337|   125.201|
|     1250|          0.0|     0.3048|              71.3|             0.00266337|   125.951|
|     1600|          0.0|     0.3048|              71.3|             0.00266337|   127.591|
|     2000|          0.0|     0.3048|              71.3|             0.00266337|   127.461|
+---------+-------------+-----------+------------------+-----------------------+----------+
only showing top 5 rows



### Task 4 - Print top 5 rows of the dataset


In [34]:
# Print the top 5 rows of the DataFrame
df.show(5)


+---------+-------------+-----------+------------------+-----------------------+----------+
|Frequency|AngleOfAttack|ChordLength|FreeStreamVelocity|SuctionSideDisplacement|SoundLevel|
+---------+-------------+-----------+------------------+-----------------------+----------+
|      800|          0.0|     0.3048|              71.3|             0.00266337|   126.201|
|     1000|          0.0|     0.3048|              71.3|             0.00266337|   125.201|
|     1250|          0.0|     0.3048|              71.3|             0.00266337|   125.951|
|     1600|          0.0|     0.3048|              71.3|             0.00266337|   127.591|
|     2000|          0.0|     0.3048|              71.3|             0.00266337|   127.461|
+---------+-------------+-----------+------------------+-----------------------+----------+
only showing top 5 rows



### Task 6 - Print the total number of rows in the dataset


In [35]:
# Count the number of rows in the DataFrame before cleaning
rowcount1 = df.count()

print(rowcount1)


1522


### Task 7 - Drop all the duplicate rows from the dataset


In [36]:
# Drop duplicate rows from the DataFrame
df = df.dropDuplicates()

# Show the first few rows to verify the operation
df.show(5)


+---------+-------------+-----------+------------------+-----------------------+----------+
|Frequency|AngleOfAttack|ChordLength|FreeStreamVelocity|SuctionSideDisplacement|SoundLevel|
+---------+-------------+-----------+------------------+-----------------------+----------+
|     4000|          3.0|     0.3048|              31.7|             0.00529514|   115.608|
|     3150|          2.0|     0.2286|              31.7|             0.00372371|   121.527|
|     2000|          7.3|     0.2286|              31.7|              0.0132672|   115.309|
|     2000|          5.4|     0.1524|              71.3|             0.00401199|   131.111|
|      500|          9.9|     0.1524|              71.3|              0.0193001|   131.279|
+---------+-------------+-----------+------------------+-----------------------+----------+
only showing top 5 rows
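
Called with no arguments, `dropDuplicates()` treats two rows as duplicates only when every column matches. Spark deduplicates through a distributed aggregation that does not preserve row order, which is likely why the rows above no longer appear in file order. The plain-Python sketch below shows the matching rule on a few hypothetical rows. It is an illustration, not Spark's implementation.

```python
# Plain-Python sketch of the rule behind df.dropDuplicates() (no column subset):
# a row is dropped only if *every* column equals that of a row already kept.
# Hypothetical rows in the order (Frequency, AngleOfAttack, ChordLength,
# FreeStreamVelocity, SuctionSideDisplacement, SoundLevel).
rows = [
    (800, 0.0, 0.3048, 71.3, 0.00266337, 126.201),
    (1000, 0.0, 0.3048, 71.3, 0.00266337, 125.201),
    (800, 0.0, 0.3048, 71.3, 0.00266337, 126.201),  # exact copy of the first row
    (800, 0.0, 0.3048, 71.3, 0.00266337, 126.202),  # differs only in SoundLevel, so kept
]


def drop_duplicates(rows):
    """Keep the first occurrence of each distinct row.

    Unlike this sketch, Spark makes no promise about which copy survives
    or about the order of the resulting rows.
    """
    seen = set()
    kept = []
    for row in rows:
        if row not in seen:
            seen.add(row)
            kept.append(row)
    return kept


deduped = drop_duplicates(rows)
print(len(rows), "->", len(deduped))  # 4 -> 3
```
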



### Task 8 - Print the total number of rows in the dataset


In [37]:
# Count the number of rows in the DataFrame after dropping duplicates
rowcount2 = df.count()

print(rowcount2)


1503


### Task 9 - Drop all the rows that contain null values from the dataset


In [38]:
# Drop rows that contain any null values
df = df.dropna()

# Show the first few rows to verify
df.show(5)


+---------+-------------+-----------+------------------+-----------------------+----------+
|Frequency|AngleOfAttack|ChordLength|FreeStreamVelocity|SuctionSideDisplacement|SoundLevel|
+---------+-------------+-----------+------------------+-----------------------+----------+
|     4000|          3.0|     0.3048|              31.7|             0.00529514|   115.608|
|     3150|          2.0|     0.2286|              31.7|             0.00372371|   121.527|
|     2000|          7.3|     0.2286|              31.7|              0.0132672|   115.309|
|     2000|          5.4|     0.1524|              71.3|             0.00401199|   131.111|
|      500|          9.9|     0.1524|              71.3|              0.0193001|   131.279|
+---------+-------------+-----------+------------------+-----------------------+----------+
only showing top 5 rows
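
`dropna()` uses `how="any"` by default, so it removes a row when any of its columns is null. The sketch below applies that rule in plain Python to hypothetical rows, with `None` standing in for null. Passing `how="all"` or a `subset` of columns to `dropna` would relax the rule. The helper `drop_na` is illustrative only.

```python
# Plain-Python sketch of df.dropna() with its default how="any":
# a row is removed if at least one of its values is null (None here).
rows = [
    (800, 0.0, 0.3048, 71.3, 0.00266337, 126.201),
    (1000, None, 0.3048, 71.3, 0.00266337, 125.201),  # null AngleOfAttack
    (1250, 0.0, 0.3048, 71.3, 0.00266337, None),      # null SoundLevel
    (1600, 0.0, 0.3048, 71.3, 0.00266337, 127.591),
]


def drop_na(rows, how="any"):
    """Drop rows containing nulls; how="all" drops only rows that are entirely null."""
    if how == "any":
        return [row for row in rows if all(v is not None for v in row)]
    if how == "all":
        return [row for row in rows if any(v is not None for v in row)]
    raise ValueError("how must be 'any' or 'all'")


cleaned = drop_na(rows)
print(len(rows), "->", len(cleaned))  # 4 -> 2
```
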



### Task 10 - Print the total number of rows in the dataset


In [39]:
# Count the number of rows in the DataFrame after cleaning
rowcount3 = df.count()

print(rowcount3)


1499


### Task 11 - Rename the column "SoundLevel" to "SoundLevelDecibels"


In [40]:
# Rename the column 'SoundLevel' to 'SoundLevelDecibels'
df = df.withColumnRenamed("SoundLevel", "SoundLevelDecibels")

# Show the first few rows to verify
df.show(5)


+---------+-------------+-----------+------------------+-----------------------+------------------+
|Frequency|AngleOfAttack|ChordLength|FreeStreamVelocity|SuctionSideDisplacement|SoundLevelDecibels|
+---------+-------------+-----------+------------------+-----------------------+------------------+
|     4000|          3.0|     0.3048|              31.7|             0.00529514|           115.608|
|     3150|          2.0|     0.2286|              31.7|             0.00372371|           121.527|
|     2000|          7.3|     0.2286|              31.7|              0.0132672|           115.309|
|     2000|          5.4|     0.1524|              71.3|             0.00401199|           131.111|
|      500|          9.9|     0.1524|              71.3|              0.0193001|           131.279|
+---------+-------------+-----------+------------------+-----------------------+------------------+
only showing top 5 rows



### Task 12 - Save the dataframe in parquet format, name the file as "NASA_airfoil_noise_cleaned.parquet"


In [41]:
# Save the DataFrame in parquet format with the given filename
df.write.mode('overwrite').parquet("NASA_airfoil_noise_cleaned.parquet")


#### Part 1 - Evaluation



**Run the code cell below.**<br>
**Use the answers here to answer the final evaluation quiz in the next section.**<br>
**If the code raises any errors, go back and review the code you have written.**


In [42]:
print("Part 1 - Evaluation")

print("Total rows = ", rowcount1)
print("Total rows after dropping duplicate rows = ", rowcount2)
print("Total rows after dropping duplicate rows and rows with null values = ", rowcount3)
print("New column name = ", df.columns[-1])

import os

print("NASA_airfoil_noise_cleaned.parquet exists :", os.path.isdir("NASA_airfoil_noise_cleaned.parquet"))

Part 1 - Evaluation
Total rows =  1522
Total rows after dropping duplicate rows =  1503
Total rows after dropping duplicate rows and rows with null values =  1499
New column name =  SoundLevelDecibels
NASA_airfoil_noise_cleaned.parquet exists : True
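
The evaluation cell uses `os.path.isdir` because Spark writes `NASA_airfoil_noise_cleaned.parquet` as a directory of part files, not as a single file. The sketch below is a slightly stricter check built around a hypothetical helper, `looks_like_spark_parquet_dir`, which is not part of any library. It assumes Spark's usual layout of `part-*.parquet` data files. It checks for a `_SUCCESS` marker only on request, because whether that marker is written depends on the output committer.

```python
import os
import tempfile


def looks_like_spark_parquet_dir(path, require_success=False):
    """Hypothetical helper: True if `path` is a directory holding Parquet part files.

    Assumes Spark's usual file naming (part-*.parquet). The _SUCCESS marker is
    only required when require_success=True.
    """
    if not os.path.isdir(path):
        return False
    names = os.listdir(path)
    has_parts = any(n.startswith("part-") and n.endswith(".parquet") for n in names)
    if require_success and "_SUCCESS" not in names:
        return False
    return has_parts


# Demonstrate on fake directories that mimic (and fail to mimic) Spark's layout.
with tempfile.TemporaryDirectory() as tmp:
    out = os.path.join(tmp, "NASA_airfoil_noise_cleaned.parquet")
    os.makedirs(out)
    for name in ("part-00000-example.snappy.parquet", "_SUCCESS"):
        with open(os.path.join(out, name), "w"):
            pass  # create an empty placeholder file
    ok = looks_like_spark_parquet_dir(out, require_success=True)

    empty = os.path.join(tmp, "empty.parquet")
    os.makedirs(empty)
    ok_empty = looks_like_spark_parquet_dir(empty)

print(ok, ok_empty)  # True False
```
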


## Part 2 - Create a Machine Learning Pipeline


### Task 1 - Load data from "NASA_airfoil_noise_cleaned.parquet" into a DataFrame


In [43]:
# Load the cleaned parquet data into a new DataFrame
df = spark.read.parquet("NASA_airfoil_noise_cleaned.parquet")

# Show the first few rows to verify
df.show(5)



+---------+-------------+-----------+------------------+-----------------------+------------------+
|Frequency|AngleOfAttack|ChordLength|FreeStreamVelocity|SuctionSideDisplacement|SoundLevelDecibels|
+---------+-------------+-----------+------------------+-----------------------+------------------+
|      630|          0.0|     0.3048|              31.7|             0.00331266|           129.095|
|     4000|          0.0|     0.3048|              31.7|             0.00331266|           118.145|
|     4000|          1.5|     0.3048|              39.6|             0.00392107|           117.741|
|      800|          4.0|     0.3048|              71.3|             0.00497773|           131.755|
|     1250|          0.0|     0.2286|              31.7|              0.0027238|           128.805|
+---------+-------------+-----------+------------------+-----------------------+------------------+
only showing top 5 rows



### Task 2 - Print the total number of rows in the dataset


In [44]:
rowcount4 = df.count()
print(rowcount4)


1499


### Task 3 - Define the VectorAssembler pipeline stage


Stage 1 - Assemble the input columns into a single column "features". Use all the columns except SoundLevelDecibels as input features.


In [46]:
from pyspark.ml.feature import VectorAssembler

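# Define feature columns (all columns except the target); the loop variable is `c`
# to avoid confusion with the `col` function imported from pyspark.sql.functions
feature_columns = [c for c in df.columns if c != "SoundLevelDecibels"]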

# Create the VectorAssembler
assembler = VectorAssembler(
    inputCols=feature_columns,
    outputCol="features"
)

# Show feature columns to verify
print("Feature columns used:", feature_columns)


Feature columns used: ['Frequency', 'AngleOfAttack', 'ChordLength', 'FreeStreamVelocity', 'SuctionSideDisplacement']
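
For each row, you can picture `VectorAssembler`'s output as the row's feature values copied, in `inputCols` order, into one numeric vector. The plain-Python sketch below applies this to the first raw row shown earlier. It uses a Python list in place of Spark's `DenseVector` or `SparseVector`, and the `assemble` helper is illustrative, not Spark code. The order of `feature_columns` fixes the position of each value, so it also fixes which model coefficient belongs to which column.

```python
# Column order as printed by the cell above.
feature_columns = ["Frequency", "AngleOfAttack", "ChordLength",
                   "FreeStreamVelocity", "SuctionSideDisplacement"]

# First row of the raw data shown earlier, as a dict.
row = {
    "Frequency": 800,
    "AngleOfAttack": 0.0,
    "ChordLength": 0.3048,
    "FreeStreamVelocity": 71.3,
    "SuctionSideDisplacement": 0.00266337,
    "SoundLevelDecibels": 126.201,
}


def assemble(row, input_cols):
    """Sketch of VectorAssembler: gather the input columns, in order, as floats."""
    return [float(row[c]) for c in input_cols]


features = assemble(row, feature_columns)
print(features)  # [800.0, 0.0, 0.3048, 71.3, 0.00266337]
```
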


### Task 4 - Define the StandardScaler pipeline stage


Stage 2 - Scale the "features" column using StandardScaler and store the result in the "scaledFeatures" column.


In [47]:
from pyspark.ml.feature import StandardScaler

# Define the StandardScaler
scaler = StandardScaler(
    inputCol="features",
    outputCol="scaledFeatures",
    withMean=True,   # Center the data with mean
    withStd=True     # Scale to unit standard deviation
)

print("StandardScaler stage created successfully.")


StandardScaler stage created successfully.
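
With `withMean=True` and `withStd=True`, each feature value x becomes (x - mean) / std. The mean and standard deviation are computed per feature when the stage is fitted. According to Spark's documentation, the standard deviation is the corrected (n - 1) sample standard deviation. The plain-Python sketch below applies the formula to the five `Frequency` values displayed earlier. It illustrates the arithmetic only.

```python
import statistics

# Frequency values from the first five rows displayed earlier.
frequency = [800, 1000, 1250, 1600, 2000]

mean = statistics.mean(frequency)   # 1330
std = statistics.stdev(frequency)   # sample (n - 1) standard deviation

# Standardize: subtract the mean, then divide by the standard deviation.
scaled = [(x - mean) / std for x in frequency]
print([round(z, 3) for z in scaled])
```

After scaling, the values have mean 0 and sample standard deviation 1, which is what "standardized" means here.
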


### Task 5 - Define the Model creation pipeline stage


Stage 3 - Create a LinearRegression stage to predict "SoundLevelDecibels"

**Note: You need to use the "scaledFeatures" column produced in the previous step (the StandardScaler pipeline stage).**


In [48]:
from pyspark.ml.regression import LinearRegression

# Define the LinearRegression model
lr = LinearRegression(
    featuresCol="scaledFeatures",  # Use the scaled features from StandardScaler
    labelCol="SoundLevelDecibels"  # Target column
)

print("LinearRegression stage created successfully.")


LinearRegression stage created successfully.
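
`LinearRegression` defaults to `regParam=0.0`, so it fits ordinary least squares, and the fit step logs a warning that `regParam` is zero. Because the stage receives mean-centered features, the fitted intercept should equal the mean of the label over the training data. The plain-Python sketch below shows this with one hypothetical centered feature and made-up labels. It uses the textbook closed-form solution for a single feature, not Spark's solver.

```python
import statistics

# Hypothetical training data: one raw feature x and a label y (decibels).
x_raw = [1.0, 2.0, 3.0, 4.0, 5.0]
y = [130.0, 127.5, 126.0, 122.5, 121.0]

# Center the feature on its training mean, as StandardScaler(withMean=True) does.
x_mean = statistics.mean(x_raw)
x = [xi - x_mean for xi in x_raw]

# Closed-form ordinary least squares for one feature:
#   slope = sum((x - mean_x) * (y - mean_y)) / sum((x - mean_x) ** 2)
#   intercept = mean_y - slope * mean_x
y_mean = statistics.mean(y)
slope = sum(xi * (yi - y_mean) for xi, yi in zip(x, y)) / sum(xi ** 2 for xi in x)
intercept = y_mean - slope * statistics.mean(x)  # mean(x) is 0 after centering

print(round(slope, 3), round(intercept, 3))
```

In this notebook, that reasoning suggests the `Intercept = 125.14` reported in Part 3 should match the average of `SoundLevelDecibels` over `trainingData`, up to rounding.
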


### Task 6 - Build the pipeline


Build a pipeline from the three stages defined above.


In [49]:
from pyspark.ml import Pipeline

# Define the pipeline with the three stages
pipeline = Pipeline(stages=[assembler, scaler, lr])

print("Pipeline created successfully.")


Pipeline created successfully.
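
`Pipeline.fit` runs the stages in order. A transformer stage, such as `VectorAssembler`, only transforms the data. An estimator stage, such as `StandardScaler` or `LinearRegression`, is first fitted on the output of the earlier stages, and the resulting model then transforms the data for the next stage. The toy plain-Python classes and functions below (`Assembler`, `Scaler`, `ScalerModel`, `fit_pipeline`, `transform_pipeline`) are hypothetical, not Spark APIs. They mimic that control flow to show that the scaler's mean and standard deviation come only from the data passed to `fit`.

```python
import statistics


class Assembler:
    """Transformer: copies the input columns of each dict row into a 'features' list."""
    def __init__(self, input_cols):
        self.input_cols = input_cols

    def transform(self, rows):
        return [dict(r, features=[r[c] for c in self.input_cols]) for r in rows]


class Scaler:
    """Estimator: learns per-feature mean and sample standard deviation in fit()."""
    def fit(self, rows):
        columns = list(zip(*(r["features"] for r in rows)))
        means = [statistics.mean(c) for c in columns]
        stds = [statistics.stdev(c) for c in columns]
        return ScalerModel(means, stds)


class ScalerModel:
    """Fitted scaler: applies the stored statistics to any rows."""
    def __init__(self, means, stds):
        self.means, self.stds = means, stds

    def transform(self, rows):
        return [
            dict(r, scaled=[(v - m) / s for v, m, s in zip(r["features"], self.means, self.stds)])
            for r in rows
        ]


def fit_pipeline(stages, rows):
    """Run stages in order, fitting estimators on the data produced so far."""
    fitted = []
    for stage in stages:
        model = stage.fit(rows) if hasattr(stage, "fit") else stage
        rows = model.transform(rows)
        fitted.append(model)
    return fitted


def transform_pipeline(fitted, rows):
    """Apply every fitted stage in order, as a fitted PipelineModel does."""
    for model in fitted:
        rows = model.transform(rows)
    return rows


train = [{"x": 1.0}, {"x": 2.0}, {"x": 3.0}]
test = [{"x": 4.0}]

fitted = fit_pipeline([Assembler(["x"]), Scaler()], train)
print(fitted[1].means, fitted[1].stds)  # [2.0] [1.0], learned from `train` only
print(transform_pipeline(fitted, test)[0]["scaled"])  # [2.0]
```
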


### Task 7 - Split the data


In [50]:
# Split the data into training and testing sets (70% training, 30% testing)
(trainingData, testingData) = df.randomSplit([0.7, 0.3], seed=42)

# Show the number of rows in each set for verification
print("Training Data Count:", trainingData.count())
print("Testing Data Count:", testingData.count())


Training Data Count: 1044
Testing Data Count: 455
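
`randomSplit([0.7, 0.3], seed=42)` does not cut the data at exactly 70%. Roughly speaking, each row is assigned to a split at random according to the weights, so the split sizes only approximate the ratio (here 1044 / 1499 is about 69.6%). Fixing `seed` should make the assignment repeatable for the same input data and partitioning. The plain-Python sketch below imitates per-row random assignment with `random.Random`. The `random_split` helper is hypothetical and does not reproduce Spark's sampler or its exact split.

```python
import random


def random_split(rows, weights, seed):
    """Assign each row to one split with probability proportional to its weight."""
    total = sum(weights)
    bounds = []
    running = 0.0
    for w in weights:
        running += w / total
        bounds.append(running)  # cumulative upper bound for this split
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, upper in enumerate(bounds):
            # The last split also catches u values lost to float rounding.
            if u < upper or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits


rows = list(range(1499))  # stand-ins for the 1499 cleaned rows
train, test = random_split(rows, [0.7, 0.3], seed=42)
print(len(train), len(test))  # sizes near 70% / 30%, rarely exact
```
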


                                                                                

### Task 8 - Fit the pipeline


In [52]:
# Fit the pipeline on the training data
pipelineModel = pipeline.fit(trainingData)

print("Pipeline fitted successfully.")


25/09/13 14:55:39 WARN util.Instrumentation: [e77aba2f] regParam is zero, which might cause numerical instability and overfitting.
Pipeline fitted successfully.


#### Part 2 - Evaluation



**Run the code cell below.**<br>
**Use the answers here to answer the final evaluation quiz in the next section.**<br>
**If the code raises any errors, go back and review the code you have written.**


In [53]:
print("Part 2 - Evaluation")
print("Total rows = ", rowcount4)
ps = [str(x).split("_")[0] for x in pipeline.getStages()]

print("Pipeline Stage 1 = ", ps[0])
print("Pipeline Stage 2 = ", ps[1])
print("Pipeline Stage 3 = ", ps[2])

print("Label column = ", lr.getLabelCol())

Part 2 - Evaluation
Total rows =  1499
Pipeline Stage 1 =  VectorAssembler
Pipeline Stage 2 =  StandardScaler
Pipeline Stage 3 =  LinearRegression
Label column =  SoundLevelDecibels


## Part 3 - Evaluate the Model


### Task 1 - Predict using the model


In [54]:
# Use the fitted pipeline model to make predictions on the testing data
predictions = pipelineModel.transform(testingData)

# Show the first few predictions
predictions.select("SoundLevelDecibels", "prediction").show(5)


+------------------+------------------+
|SoundLevelDecibels|        prediction|
+------------------+------------------+
|           127.315|  123.643440096247|
|           119.975|123.48695788614808|
|           121.783|124.38983849684215|
|           127.224|121.44706993294244|
|           122.229|125.68312652454135|
+------------------+------------------+
only showing top 5 rows
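
For each test row, `pipelineModel.transform` assembles the features and standardizes them with the mean and standard deviation learned from `trainingData`. It then computes intercept + sum(coefficient × scaled feature). The plain-Python sketch below spells out that arithmetic. The means, standard deviations, coefficients, and intercept are made-up placeholders, not the fitted model's values.

```python
# Made-up parameters for two features; the real pipeline has five.
means = [2500.0, 6.0]        # hypothetical training means of the raw features
stds = [3000.0, 5.0]         # hypothetical training standard deviations
coefficients = [-3.0, -2.0]  # hypothetical LinearRegression coefficients
intercept = 125.0            # hypothetical intercept


def predict(raw_features):
    """Standardize the raw features, then apply the linear model."""
    scaled = [(x - m) / s for x, m, s in zip(raw_features, means, stds)]
    return intercept + sum(c * z for c, z in zip(coefficients, scaled))


print(predict([5500.0, 1.0]))  # 124.0
```

A row whose raw features equal the training means scales to all zeros, so its prediction is exactly the intercept.
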



                                                                                

### Task 2 - Print the MSE


In [55]:
from pyspark.ml.evaluation import RegressionEvaluator

# Define the evaluator for regression using MSE
evaluator = RegressionEvaluator(
    labelCol="SoundLevelDecibels",
    predictionCol="prediction",
    metricName="mse"  # Mean Squared Error
)

# Calculate MSE on the testing data
mse = evaluator.evaluate(predictions)

print("Mean Squared Error (MSE) =", mse)


Mean Squared Error (MSE) = 22.593754071348833


### Task 3 - Print the MAE


In [56]:
from pyspark.ml.evaluation import RegressionEvaluator

# Define the evaluator for regression using MAE
evaluator_mae = RegressionEvaluator(
    labelCol="SoundLevelDecibels",
    predictionCol="prediction",
    metricName="mae"  # Mean Absolute Error
)

# Calculate MAE on the testing data
mae = evaluator_mae.evaluate(predictions)

print("Mean Absolute Error (MAE) =", mae)


Mean Absolute Error (MAE) = 3.7336902294631447


### Task 4 - Print the R-Squared (R2)


In [57]:
from pyspark.ml.evaluation import RegressionEvaluator

# Define the evaluator for regression using R2
evaluator_r2 = RegressionEvaluator(
    labelCol="SoundLevelDecibels",
    predictionCol="prediction",
    metricName="r2"  # R-squared
)

# Calculate R2 on the testing data
r2 = evaluator_r2.evaluate(predictions)

print("R-Squared (R²) =", r2)


R-Squared (R²) = 0.5426016508689053


#### Part 3 - Evaluation



**Run the code cell below.**<br>
**Use the answers here to answer the final evaluation quiz in the next section.**<br>
**If the code raises any errors, go back and review the code you have written.**


In [58]:
print("Part 3 - Evaluation")

print("Mean Squared Error = ", round(mse,2))
print("Mean Absolute Error = ", round(mae,2))
print("R Squared = ", round(r2,2))

lrModel = pipelineModel.stages[-1]

print("Intercept = ", round(lrModel.intercept,2))


Part 3 - Evaluation
Mean Squared Error =  22.59
Mean Absolute Error =  3.73
R Squared =  0.54
Intercept =  125.14
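
The three metrics above follow standard definitions:

- MSE is the mean of squared errors, in dB².
- MAE is the mean of absolute errors, in dB. An MAE of about 3.73 means predictions are off by roughly 3.7 dB on average.
- R² = 1 - SS_res / SS_tot, where SS_tot measures the spread of the labels around their mean. An R² of about 0.54 means the model explains roughly 54% of that spread on the test set.

The plain-Python sketch below computes the three metrics for the five (label, prediction) pairs shown in Part 3, Task 1. Five rows are far too few to reproduce the full test-set figures. On so small a sample, R² can even come out negative.

```python
# (SoundLevelDecibels, prediction) pairs from the first five test rows shown earlier.
pairs = [
    (127.315, 123.643440096247),
    (119.975, 123.48695788614808),
    (121.783, 124.38983849684215),
    (127.224, 121.44706993294244),
    (122.229, 125.68312652454135),
]

labels = [y for y, _ in pairs]
errors = [y - p for y, p in pairs]
n = len(pairs)

# Names are prefixed with `sample_` so they do not overwrite mse, mae, r2 above.
sample_mse = sum(e ** 2 for e in errors) / n
sample_mae = sum(abs(e) for e in errors) / n

label_mean = sum(labels) / n
ss_res = sum(e ** 2 for e in errors)
ss_tot = sum((y - label_mean) ** 2 for y in labels)
sample_r2 = 1 - ss_res / ss_tot

print(f"MSE={sample_mse:.3f}  MAE={sample_mae:.3f}  R2={sample_r2:.3f}")
```
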


## Part 4 - Persist the Model


### Task 1 - Save the model to the path "Final_Project"


In [59]:
# Save the fitted pipeline model to the specified path
pipelineModel.write().overwrite().save("Final_Project")

print("Pipeline model saved successfully at 'Final_Project'.")


Pipeline model saved successfully at 'Final_Project'.


### Task 2 - Load the model from the path "Final_Project"


In [None]:
# Load the pipeline model saved in the previous step
from pyspark.ml import PipelineModel
loadedPipelineModel = PipelineModel.load("Final_Project")


### Task 3 - Make predictions using the loaded model on the testdata


In [None]:
# Use the loaded pipeline model to make predictions on testingData
predictions = loadedPipelineModel.transform(testingData)


### Task 4 - Show the predictions


In [None]:
# Show the top 5 rows of the predictions DataFrame, displaying only the label column and the predictions
predictions.select("SoundLevelDecibels", "prediction").show(5)


#### Part 4 - Evaluation




**Run the code cell below.**<br>
**Use the answers here to answer the final evaluation quiz in the next section.**<br>
**If the code raises any errors, go back and review the code you have written.**


In [None]:
print("Part 4 - Evaluation")

loadedmodel = loadedPipelineModel.stages[-1]
totalstages = len(loadedPipelineModel.stages)
inputcolumns = loadedPipelineModel.stages[0].getInputCols()

print("Number of stages in the pipeline = ", totalstages)
for i,j in zip(inputcolumns, loadedmodel.coefficients):
    print(f"Coefficient for {i} is {round(j,4)}")
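
The model is trained on standardized features, so each printed coefficient is the change in predicted decibels for a one-standard-deviation change in that feature, not for a one-unit change. To get coefficients in original units, divide each coefficient by its feature's standard deviation and adjust the intercept, as in the plain-Python sketch below. All numbers in it are hypothetical. In the notebook, the real means and standard deviations would come from the fitted `StandardScaler` stage, through its `mean` and `std` attributes.

```python
# Hypothetical values for two features (the real model has five).
means = [2500.0, 6.0]
stds = [3000.0, 5.0]
coef_scaled = [-3.0, -2.0]
intercept_scaled = 125.0

# Convert to original units:
#   y = b0 + sum(b_j * (x_j - m_j) / s_j)
#     = (b0 - sum(b_j * m_j / s_j)) + sum((b_j / s_j) * x_j)
coef_raw = [b / s for b, s in zip(coef_scaled, stds)]
intercept_raw = intercept_scaled - sum(b * m / s for b, m, s in zip(coef_scaled, means, stds))

x = [5500.0, 1.0]
pred_scaled = intercept_scaled + sum(
    b * (xi - m) / s for b, xi, m, s in zip(coef_scaled, x, means, stds)
)
pred_raw = intercept_raw + sum(b * xi for b, xi in zip(coef_raw, x))
print(pred_scaled, pred_raw)  # same prediction from both forms, up to float rounding
```
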

### Stop Spark Session


In [None]:
spark.stop()

## Authors


[Ramesh Sannareddy](https://www.linkedin.com/in/rsannareddy/?utm_medium=Exinfluencer&utm_source=Exinfluencer&utm_content=000026UJ&utm_term=10006555&utm_id=NA-SkillsNetwork-Channel-SkillsNetworkCoursesIBMBD0231ENSkillsNetwork866-2023-01-01)


### Other Contributors


Copyright © 2023 IBM Corporation. All rights reserved.


<!--
## Change Log
-->


<!--
|Date (YYYY-MM-DD)|Version|Changed By|Change Description|
|-|-|-|-|
|2023-05-26|0.1|Ramesh Sannareddy|Initial Version Created|
-->
