# Build an ML Pipeline for Airfoil Noise Prediction


## Scenario
You are a data engineer at an aeronautics consulting company. Your company prides itself on being able to efficiently design airfoils for use in planes and sports cars. Data scientists in your office need to work with different algorithms and with data in different formats. While they are good at machine learning, they count on you to run ETL jobs and build ML pipelines. In this project you will use a modified version of the NASA Airfoil Self-Noise dataset. You will clean the dataset by dropping duplicate rows and removing rows with null values. You will then create an ML pipeline that builds a model to predict SoundLevel from all the other columns. Finally, you will evaluate the model and persist it.

## Objectives


### Part 1 - Perform ETL activity
- Load a CSV dataset
- Remove duplicate rows, if any
- Drop rows with null values, if any
- Make transformations (rename the SoundLevel column to SoundLevelDecibels)
- Store the cleaned data in Parquet format

### Part 2 - Create a Machine Learning Pipeline
- Create a machine learning pipeline for prediction

### Part 3 - Evaluate the Model
- Evaluate the model using relevant metrics

### Part 4 - Persist the Model
- Save the model for future production use
- Load and verify the stored model

## Datasets
In this lab you will use the following dataset:

The original dataset is the NASA Airfoil Self-Noise dataset: https://archive.ics.uci.edu/dataset/291/airfoil+self+noise

This dataset is licensed under a Creative Commons Attribution 4.0 International (CC BY 4.0) license.

```python
# Install the required libraries
!pip install pyspark==3.1.2 -q
!pip install findspark -q
!pip install pandas==1.3.4
!pip install scikit-learn==1.0.2
!pip install numpy==1.21.6
```

```python
# Suppress warnings generated by your code
def warn(*args, **kwargs):
    pass
import warnings
warnings.warn = warn
warnings.filterwarnings('ignore')

# FindSpark simplifies the process of using Apache Spark with Python
import findspark
findspark.init()
```

## PART 1 - PERFORM ETL ACTIVITY

```python
# Import required libraries
from pyspark.sql import SparkSession
import pandas as pd
from pyspark.sql import Row
import numpy as np
```


```python
# Create a SparkSession
spark = SparkSession.builder.appName('Airfoil noise prediction').getOrCreate()
```

```python
# Download the CSV file into the working directory
!wget https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMSkillsNetwork-BD0231EN-Coursera/datasets/NASA_airfoil_noise_raw.csv
```

```python
# Load the dataset into a Spark DataFrame, using the first row as the header
# and letting Spark infer each column's type
df = spark.read.csv('NASA_airfoil_noise_raw.csv', header=True, inferSchema=True)
```

```python
# Print the top 5 rows of the dataset
df.show(5)
```

```python
# Print the total number of rows in the dataset
rowcount1 = df.count()
print(rowcount1)
```

```python
# Drop all the duplicate rows from the dataset
df2 = df.dropDuplicates()
```

```python
# Print the total number of rows after dropping duplicates
# (reuse df2 instead of deduplicating df a second time)
rowcount2 = df2.count()
print(rowcount2)
```


```python
# Drop all the rows that contain null values, then print the total number of rows
df_cleaned = df2.dropna()
rowcount3 = df_cleaned.count()
print(rowcount3)
```
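To make the semantics of these two cleaning steps concrete, here is a plain-Python sketch (no Spark required) of what `dropDuplicates()` followed by `dropna()` do with their default arguments: a row counts as a duplicate only if every column matches, and `dropna()` removes a row if any column is null. The rows are made up for illustration. Unlike this sketch, which keeps the first occurrence, Spark does not guarantee which copy of a duplicate survives or the order of the output.

```python
def drop_duplicates(rows):
    """Keep one copy of each row, comparing all columns (like dropDuplicates())."""
    seen = set()
    unique = []
    for row in rows:
        if row not in seen:
            seen.add(row)
            unique.append(row)
    return unique


def drop_nulls(rows):
    """Drop any row containing at least one null (like dropna(how='any'))."""
    return [row for row in rows if all(value is not None for value in row)]


# Made-up rows with three hypothetical columns, the last one being SoundLevel
raw_rows = [
    (800, 0.0, 126.2),
    (800, 0.0, 126.2),   # exact duplicate of the first row
    (1000, 0.0, None),   # contains a null
    (1250, 0.0, 127.3),
]

deduped_rows = drop_duplicates(raw_rows)
cleaned_rows = drop_nulls(deduped_rows)
print(len(raw_rows), len(deduped_rows), len(cleaned_rows))  # 4 3 2
```

The order matters only for the intermediate count: the final set of rows is the same whichever step runs first.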

```python
# Rename the column "SoundLevel" to "SoundLevelDecibels"
df_renamed = df_cleaned.withColumnRenamed("SoundLevel", "SoundLevelDecibels")
df_renamed.show()
```

```python
# Save the DataFrame in Parquet format as "NASA_airfoil_noise_cleaned.parquet"
df_renamed.write.mode("overwrite").parquet("NASA_airfoil_noise_cleaned.parquet")
```

```python
# Spark writes Parquet output as a directory of part files; list its contents
!ls -l NASA_airfoil_noise_cleaned.parquet
```

```python
import os

# Print the Part 1 evaluation summary
print("Part 1 - Evaluation")

print("Total rows = ", rowcount1)
print("Total rows after dropping duplicate rows = ", rowcount2)
print("Total rows after dropping duplicate rows and rows with null values = ", rowcount3)
# The renamed column lives in df_renamed, not in the original df
print("New column name = ", df_renamed.columns[-1])

print("NASA_airfoil_noise_cleaned.parquet exists :", os.path.isdir("NASA_airfoil_noise_cleaned.parquet"))
```

## PART 2 - CREATE A MACHINE LEARNING PIPELINE

```python
# Load data from "NASA_airfoil_noise_cleaned.parquet" into a DataFrame
df_loaded = spark.read.parquet("NASA_airfoil_noise_cleaned.parquet")
```

```python
# Print the total number of rows in the dataset
rowcount4 = df_loaded.count()
print(rowcount4)
```

```python
# Import libraries
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
```

```python
# Define the vector assembler pipeline stage
# Stage 1 - Assemble the input columns into a single column "features".
# Use all the columns except SoundLevelDecibels as input features.
input_columns = [col for col in df_loaded.columns if col != 'SoundLevelDecibels']
assembler = VectorAssembler(inputCols=input_columns, outputCol='features')
```
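Conceptually, `VectorAssembler` reads the columns listed in `inputCols`, in that order, and packs their values into one vector column. The plain-Python sketch below applies the same selection rule as the cell above to a single dict-shaped row; the column names and values in the row are hypothetical, and the variable names are chosen so they do not overwrite the notebook's `input_columns`.

```python
label_col = "SoundLevelDecibels"

# Hypothetical row; the real DataFrame has its own feature columns
sketch_row = {"Frequency": 800, "ChordLength": 0.3048, "SoundLevelDecibels": 126.2}

# Same rule as the notebook: every column except the label is a feature
sketch_columns = [col for col in sketch_row if col != label_col]

# Assemble the selected values, in column order, into one feature vector
sketch_features = [float(sketch_row[col]) for col in sketch_columns]
print(sketch_columns, sketch_features)  # ['Frequency', 'ChordLength'] [800.0, 0.3048]
```

Because the vector position of each value follows the order of `inputCols`, the same list can later be zipped with the model's coefficients to label them.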


```python
# Stage 2 - Scale the "features" using StandardScaler and store the result in "scaledFeatures"
scaler = StandardScaler(inputCol='features', outputCol='scaledFeatures', withStd=True, withMean=False)
```
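With `withStd=True` and `withMean=False`, `StandardScaler` divides each feature by that feature's standard deviation and does not subtract the mean. The Spark documentation states that the corrected sample standard deviation is used, which is what `statistics.stdev` computes. Below is a plain-Python sketch on made-up vectors; the rule of mapping a constant feature (standard deviation 0) to 0.0 reflects my understanding of Spark's behavior and should be treated as an assumption.

```python
import statistics

# Made-up feature vectors, one list per row
sample_vectors = [[1.0, 10.0], [2.0, 10.0], [3.0, 10.0]]

# Corrected sample standard deviation of each feature (column)
sample_stds = [statistics.stdev(column) for column in zip(*sample_vectors)]


def scale(vector):
    # withStd=True, withMean=False: divide by the std, no centering.
    # Assumption: a constant feature (std == 0) is mapped to 0.0.
    return [x / s if s != 0 else 0.0 for x, s in zip(vector, sample_stds)]


sample_scaled = [scale(v) for v in sample_vectors]
print(sample_stds)    # [1.0, 0.0]
print(sample_scaled)  # [[1.0, 0.0], [2.0, 0.0], [3.0, 0.0]]
```

Skipping the centering step keeps zeros as zeros, which is why `withMean=False` is the default: it preserves sparse vectors.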

```python
# Stage 3 - Create a LinearRegression stage to predict "SoundLevelDecibels"
lr = LinearRegression(featuresCol='scaledFeatures', labelCol='SoundLevelDecibels')
```
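With its default parameters (`regParam=0.0`, `fitIntercept=True`), `LinearRegression` fits an ordinary least-squares model with an intercept. The pipeline uses several features, but the single-feature case shows the idea in closed form: the slope is the covariance of x and y divided by the variance of x, and the intercept makes the line pass through the means. This plain-Python sketch uses made-up points that lie exactly on y = 2x + 1; it is an illustration of least squares, not Spark's solver.

```python
# Made-up single-feature data lying exactly on y = 2x + 1
xs = [0.0, 1.0, 2.0, 3.0]
ys = [1.0, 3.0, 5.0, 7.0]

mean_x = sum(xs) / len(xs)
mean_y = sum(ys) / len(ys)

# Least-squares slope: sum of cross-deviations / sum of squared x-deviations
ols_slope = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys)) / sum(
    (x - mean_x) ** 2 for x in xs
)
# The intercept makes the fitted line pass through (mean_x, mean_y)
ols_intercept = mean_y - ols_slope * mean_x
print(ols_slope, ols_intercept)  # 2.0 1.0
```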

```python
# Create the pipeline with all three stages
pipeline = Pipeline(stages=[assembler, scaler, lr])
```
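When `pipeline.fit(train_data)` runs, the stages are processed in order: a Transformer stage (here `VectorAssembler`) is used as-is, while an Estimator stage (`StandardScaler`, `LinearRegression`) is first fitted on the data produced by the earlier stages, and its fitted model then transforms that data for the next stage. The result is a `PipelineModel` made only of transformers. The toy classes below (`AddOne`, `MeanCenterer`, `ToyPipeline`, `ToyPipelineModel`) are hypothetical and only mimic that control flow on plain lists; they are not the PySpark API.

```python
class AddOne:
    """Toy transformer: adds 1 to every value."""
    def transform(self, data):
        return [x + 1 for x in data]


class MeanCenterer:
    """Toy estimator: learns the mean in fit() and returns a fitted transformer."""
    def fit(self, data):
        mean = sum(data) / len(data)

        class Model:
            def transform(self, values):
                return [x - mean for x in values]

        return Model()


class ToyPipelineModel:
    def __init__(self, stages):
        self.stages = stages

    def transform(self, data):
        # A fitted pipeline applies its transformers in order
        for stage in self.stages:
            data = stage.transform(data)
        return data


class ToyPipeline:
    def __init__(self, stages):
        self.stages = stages

    def fit(self, data):
        fitted = []
        for i, stage in enumerate(self.stages):
            # Estimators are fitted on the output of the previous stages
            model = stage.fit(data) if hasattr(stage, "fit") else stage
            fitted.append(model)
            # Pass this stage's output on (not needed after the last stage)
            if i < len(self.stages) - 1:
                data = model.transform(data)
        return ToyPipelineModel(fitted)


toy_model = ToyPipeline([AddOne(), MeanCenterer()]).fit([1.0, 2.0, 3.0])
print(toy_model.transform([1.0, 2.0, 3.0]))  # [-1.0, 0.0, 1.0]
print(toy_model.transform([10.0]))           # [8.0]
```

The mean learned during fitting (3.0, computed after `AddOne`) is frozen into the fitted model, which is why new data is centered with the training mean rather than its own.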

```python
# Split the data into training and testing sets with a 70:30 split, using seed 42
seed_value = 42
train_data, test_data = df_loaded.randomSplit([0.7, 0.3], seed=seed_value)

# Display the number of rows in each set
print("Training set count: ", train_data.count())
print("Testing set count: ", test_data.count())
```
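`randomSplit([0.7, 0.3], seed=42)` normalizes the weights and assigns each row to a split at random, so the two counts come out close to, but usually not exactly, 70% and 30% of the rows; the seed makes the result repeatable for the same data and partitioning. The sketch below imitates the per-row idea with Python's `random` module. It is not Spark's exact algorithm (Spark samples each partition separately), so it will not reproduce the notebook's counts, and `random_split` is a hypothetical helper.

```python
import random


def random_split(rows, weights, seed):
    """Assign each row to one split with probability proportional to its weight."""
    total = sum(weights)
    # Cumulative upper bounds, e.g. [0.7, 1.0] for weights [0.7, 0.3]
    bounds = []
    running = 0.0
    for w in weights:
        running += w / total
        bounds.append(running)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        # First split whose upper bound exceeds the draw (last split catches rounding)
        for i, bound in enumerate(bounds):
            if u < bound or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits


sample_rows = list(range(1000))
train_part, test_part = random_split(sample_rows, [0.7, 0.3], seed=42)
print(len(train_part), len(test_part))  # about 700 and 300; exact counts depend on the seed
```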

```python
# Fit the pipeline on the training data
pipelineModel = pipeline.fit(train_data)
```

```python
print("Part 2 - Evaluation")
print("Total rows = ", rowcount4)
# str(stage) returns the stage's uid, "<ClassName>_<random id>"; keep the class name
ps = [str(x).split("_")[0] for x in pipeline.getStages()]

print("Pipeline Stage 1 = ", ps[0])
print("Pipeline Stage 2 = ", ps[1])
print("Pipeline Stage 3 = ", ps[2])

print("Label column = ", lr.getLabelCol())
```

## PART 3 - EVALUATE THE MODEL

```python
# Make predictions on the testing data
predictions = pipelineModel.transform(test_data)
```

```python
# Print the MSE (mean squared error)

# Create a RegressionEvaluator
evaluator = RegressionEvaluator(labelCol='SoundLevelDecibels', predictionCol='prediction', metricName='mse')

# Calculate the MSE on the testing data
mse = evaluator.evaluate(predictions)
print("Mean Squared Error (MSE):", mse)
```

```python
# Print the MAE (mean absolute error)

# Create a RegressionEvaluator
evaluator = RegressionEvaluator(labelCol='SoundLevelDecibels', predictionCol='prediction', metricName='mae')

# Calculate the MAE on the testing data
mae = evaluator.evaluate(predictions)
print("Mean Absolute Error (MAE):", mae)
```


```python
# Print the R-squared (R2)

# Create a RegressionEvaluator
evaluator = RegressionEvaluator(labelCol='SoundLevelDecibels', predictionCol='prediction', metricName='r2')

# Calculate the R2 on the testing data
r2 = evaluator.evaluate(predictions)
print("R-Squared (R2):", r2)
```
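The three metrics can be written out by hand to see what `RegressionEvaluator` reports: MSE is the mean of the squared residuals, MAE is the mean of the absolute residuals, and R² is 1 minus the residual sum of squares divided by the total sum of squares around the mean label. This plain-Python sketch uses made-up labels and predictions, and uses `sketch_`-prefixed names so it does not overwrite the notebook's `mse`, `mae`, and `r2`.

```python
# Made-up labels and predictions
labels = [3.0, 5.0, 7.0, 9.0]
preds = [2.0, 5.0, 8.0, 9.0]

n = len(labels)
residuals = [y - p for y, p in zip(labels, preds)]

# Mean squared error and mean absolute error of the residuals
sketch_mse = sum(r ** 2 for r in residuals) / n
sketch_mae = sum(abs(r) for r in residuals) / n

# R2 = 1 - SS_res / SS_tot, with SS_tot measured around the mean label
mean_label = sum(labels) / n
ss_res = sum(r ** 2 for r in residuals)
ss_tot = sum((y - mean_label) ** 2 for y in labels)
sketch_r2 = 1 - ss_res / ss_tot

print(sketch_mse, sketch_mae, sketch_r2)  # 0.5 0.5 0.9
```

MSE is in squared decibels, so its square root (RMSE) or the MAE is easier to compare with the SoundLevelDecibels values themselves.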

```python
print("Part 3 - Evaluation")

print("Mean Squared Error = ", round(mse,2))
print("Mean Absolute Error = ", round(mae,2))
print("R Squared = ", round(r2,2))

# The last pipeline stage is the fitted linear regression model
lrModel = pipelineModel.stages[-1]

print("Intercept = ", round(lrModel.intercept,2))
```

## PART 4 - PERSIST THE MODEL

```python
# Save the model
model_path = "Final_Project"

# Save the pipeline model to the specified path, overwriting any earlier save
pipelineModel.write().overwrite().save(model_path)
print(f"Pipeline model saved to {model_path}")
```

```python
# Load the model from the path "Final_Project"
from pyspark.ml import PipelineModel

loaded_model_path = "Final_Project"
loadedPipelineModel = PipelineModel.load(loaded_model_path)
```

```python
# Make predictions on the test data using the loaded model
predictions_loaded_model = loadedPipelineModel.transform(test_data)

# Display the actual and predicted sound levels side by side
predictions_loaded_model.select('SoundLevelDecibels', 'prediction').show(truncate=False)
```

```python
print("Part 4 - Evaluation")

loadedmodel = loadedPipelineModel.stages[-1]
totalstages = len(loadedPipelineModel.stages)
inputcolumns = loadedPipelineModel.stages[0].getInputCols()

print("Number of stages in the pipeline = ", totalstages)
# Coefficients follow the same order as the assembler's input columns
for column, coefficient in zip(inputcolumns, loadedmodel.coefficients):
    print(f"Coefficient for {column} is {round(coefficient,4)}")
```
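Because the regression was trained on `scaledFeatures`, each printed coefficient is the effect of one standard deviation of its input column, not of one original unit. With `withMean=False`, the prediction is b + Σ w′ᵢ (xᵢ / sᵢ) = b + Σ (w′ᵢ / sᵢ) xᵢ, so converting back is a division by each column's standard deviation, and the intercept b is unchanged. In the notebook the standard deviations should be available from the fitted scaler as `loadedPipelineModel.stages[1].std`. The sketch below uses made-up numbers and a hypothetical `predict` helper to check that both forms give the same prediction.

```python
# Made-up values: coefficients learned on scaled features, per-feature stds, intercept
scaled_coefficients = [4.0, -1.5]
feature_stds = [2.0, 0.5]
sketch_intercept = 10.0


def predict(coefficients, features, intercept):
    """Hypothetical helper: linear prediction intercept + sum(c * x)."""
    return intercept + sum(c * x for c, x in zip(coefficients, features))


# Coefficients per original unit: divide each scaled coefficient by its std
raw_coefficients = [w / s for w, s in zip(scaled_coefficients, feature_stds)]

x_raw = [3.0, 1.0]
x_scaled = [v / s for v, s in zip(x_raw, feature_stds)]

print(raw_coefficients)  # [2.0, -3.0]
print(predict(scaled_coefficients, x_scaled, sketch_intercept),
      predict(raw_coefficients, x_raw, sketch_intercept))  # 13.0 13.0
```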