## ML Pipeline for Airfoil Noise Prediction


## Goal


### Create an ML pipeline that trains a model to predict SoundLevel.



## Objectives

1. ETL
  - Remove duplicates and drop rows with null values
  - Apply transformations (rename the "SoundLevel" column to "SoundLevelDecibels")
  - Store the cleaned data in Parquet format
2. Machine Learning Pipeline
  - Create a machine learning pipeline for prediction
3. Evaluate the Model
  - Evaluate the model using relevant metrics
4. Persist the Model 
  - Save the model for future production use
  - Load and verify the stored model


## Datasets

 - The original dataset is the NASA Airfoil Self-Noise dataset from the UCI Machine Learning Repository: https://archive.ics.uci.edu/dataset/291/airfoil+self+noise
 
 - This dataset is licensed under a Creative Commons Attribution 4.0 International (CC BY 4.0) license.


Diagram of an airfoil (for informational purposes only):


![Airfoil with flow](https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMSkillsNetwork-BD0231EN-Coursera/images/Airfoil_with_flow.png)


Diagram showing the angle of attack (for informational purposes only):


![Airfoil angle of attack](https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMSkillsNetwork-BD0231EN-Coursera/images/Airfoil_angle_of_attack.jpg)


## Setup


We will be using the following libraries:

*   [`PySpark`](https://spark.apache.org/docs/latest/api/python/index.html) for connecting to the Spark cluster
*   `findspark` for making the local Spark installation importable from Python


### Installing Required Libraries

In [1]:
!pip install pyspark==3.1.2 -q
!pip install findspark -q

In [2]:
# You can also use this section to suppress warnings generated by your code:
def warn(*args, **kwargs):
    pass
import warnings
warnings.warn = warn
warnings.filterwarnings('ignore')

# FindSpark simplifies the process of using Apache Spark with Python

import findspark
findspark.init()

## Part 1: ETL


### Import required libraries


In [3]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.pipeline import PipelineModel

# Regression model used in Part 2
from pyspark.ml.regression import LinearRegression

# Feature engineering stages
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler

# Metrics used in Part 3
from pyspark.ml.evaluation import RegressionEvaluator

### Create a Spark session


In [None]:
spark = SparkSession.builder.appName("Airfoil Prediction").getOrCreate()

23/12/20 21:22:13 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


### Load the csv file into a dataframe

Load the dataset into a Spark DataFrame.


In [6]:
# Load the raw CSV file; it must already be downloaded to the working directory

df = spark.read.csv("NASA_airfoil_noise_raw.csv", inferSchema=True, header=True)

### Explore the dataset

In [None]:
# show top 5 rows
df.show(5)

+---------+-------------+-----------+------------------+-----------------------+----------+
|Frequency|AngleOfAttack|ChordLength|FreeStreamVelocity|SuctionSideDisplacement|SoundLevel|
+---------+-------------+-----------+------------------+-----------------------+----------+
|      800|          0.0|     0.3048|              71.3|             0.00266337|   126.201|
|     1000|          0.0|     0.3048|              71.3|             0.00266337|   125.201|
|     1250|          0.0|     0.3048|              71.3|             0.00266337|   125.951|
|     1600|          0.0|     0.3048|              71.3|             0.00266337|   127.591|
|     2000|          0.0|     0.3048|              71.3|             0.00266337|   127.461|
+---------+-------------+-----------+------------------+-----------------------+----------+
only showing top 5 rows



In [8]:
# Print the total number of rows in the dataset 
rowcount1 = df.count()
print(rowcount1)

1522


### Drop all the duplicate rows from the dataset


In [10]:
df = df.dropDuplicates()
rowcount2 = df.count()
print(rowcount2)

1503

### Drop all the rows that contain null values from the dataset


In [14]:
df = df.dropna()
rowcount3 = df.count()
print(rowcount3)



1499
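
The two cleaning steps above can be mirrored in plain Python to make their semantics explicit: `dropDuplicates()` with no arguments treats two rows as duplicates only when every column matches, and `dropna()` with its defaults (`how="any"`) removes a row if any column is null. This is a minimal sketch on a few made-up toy rows (not taken from the dataset), with `None` standing in for a null value; the helper names are hypothetical.

```python
# Toy rows in the column order of the airfoil data; values are made up for illustration.
rows = [
    (800, 0.0, 0.3048, 71.3, 0.00266337, 126.201),
    (800, 0.0, 0.3048, 71.3, 0.00266337, 126.201),  # exact duplicate of the first row
    (1000, 0.0, 0.3048, 71.3, 0.00266337, None),    # missing SoundLevel
    (1250, 0.0, 0.3048, 71.3, 0.00266337, 125.951),
]


def drop_duplicates(rows):
    """Keep one copy of each row, comparing all columns (like dropDuplicates())."""
    seen = set()
    unique = []
    for row in rows:
        if row not in seen:
            seen.add(row)
            unique.append(row)
    return unique


def drop_na(rows):
    """Drop any row containing at least one null (like dropna(how="any"))."""
    return [row for row in rows if all(value is not None for value in row)]


deduped = drop_duplicates(rows)
cleaned = drop_na(deduped)
print(len(rows), len(deduped), len(cleaned))  # 4 3 2
```

Applied to the real data, the same two steps removed 1522 − 1503 = 19 duplicate rows and then 1503 − 1499 = 4 rows containing nulls.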


                                                                                

### Rename the column "SoundLevel" to "SoundLevelDecibels"


In [16]:
df = df.withColumnRenamed("SoundLevel", "SoundLevelDecibels")

### Save the DataFrame in Parquet format as "NASA_airfoil_noise_cleaned.parquet"


In [17]:

# mode("overwrite") lets the cell be re-run without failing on an existing output path
df.write.mode("overwrite").parquet("NASA_airfoil_noise_cleaned.parquet")


23/12/20 21:26:07 WARN hadoop.MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers
23/12/20 21:26:07 WARN hadoop.MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 84.47% for 8 writers
23/12/20 21:26:07 WARN hadoop.MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers

In [18]:
# Sanity Check
print("Part 1 - Evaluation")

print("Total rows = ", rowcount1)
print("Total rows after dropping duplicate rows = ", rowcount2)
print("Total rows after dropping duplicate rows and rows with null values = ", rowcount3)
print("New column name = ", df.columns[-1])

import os

print("NASA_airfoil_noise_cleaned.parquet exists :", os.path.isdir("NASA_airfoil_noise_cleaned.parquet"))

Part 1 - Evaluation
Total rows =  1522
Total rows after dropping duplicate rows =  1503
Total rows after dropping duplicate rows and rows with null values =  1499
New column name =  SoundLevelDecibels
NASA_airfoil_noise_cleaned.parquet exists : True


## Part 2: Create a Machine Learning Pipeline


### Load data from "NASA_airfoil_noise_cleaned.parquet" into a dataframe

In [21]:
df = spark.read.parquet("NASA_airfoil_noise_cleaned.parquet")

### Print the total number of rows in the dataset


In [22]:
rowcount4 = df.count()
print(rowcount4)

1499

### Define the VectorAssembler pipeline stage


Stage 1 - Assemble the input columns into a single vector column, "features". Use all the columns except "SoundLevelDecibels" as input features.


In [27]:
df.columns

['Frequency',
 'AngleOfAttack',
 'ChordLength',
 'FreeStreamVelocity',
 'SuctionSideDisplacement',
 'SoundLevelDecibels']

In [29]:
columns = df.columns
columns.remove("SoundLevelDecibels")
assembler = VectorAssembler(inputCols=columns, outputCol="features")

In [34]:
# Check the assembler's input columns
assembler.getInputCols()

['Frequency',
 'AngleOfAttack',
 'ChordLength',
 'FreeStreamVelocity',
 'SuctionSideDisplacement']
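
Conceptually, `VectorAssembler` concatenates the values of the listed input columns, in the order given, into one vector per row. A minimal plain-Python sketch using the first row printed by `df.show(5)` (the `assemble` helper is hypothetical, not a Spark API):

```python
feature_cols = [
    "Frequency",
    "AngleOfAttack",
    "ChordLength",
    "FreeStreamVelocity",
    "SuctionSideDisplacement",
]

# First row printed by df.show(5), with the label already renamed
row = {
    "Frequency": 800,
    "AngleOfAttack": 0.0,
    "ChordLength": 0.3048,
    "FreeStreamVelocity": 71.3,
    "SuctionSideDisplacement": 0.00266337,
    "SoundLevelDecibels": 126.201,
}


def assemble(row, input_cols):
    """Hypothetical helper: build the 'features' vector in input_cols order."""
    return [float(row[col]) for col in input_cols]


features = assemble(row, feature_cols)
print(features)  # [800.0, 0.0, 0.3048, 71.3, 0.00266337]
```

Because the label is left out of `inputCols`, it never leaks into the feature vector. The column order also fixes the order of the fitted coefficients, which is why the Part 4 sanity check can pair `getInputCols()` with `coefficients` using `zip`.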

### Define the StandardScaler pipeline stage


Stage 2 - Scale the "features" column with StandardScaler and store the result in the "scaledFeatures" column.


In [37]:
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")
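
With its default parameters (`withStd=True`, `withMean=False`), Spark's `StandardScaler` divides each feature by its corrected sample standard deviation (the n − 1 denominator) and does not subtract the mean. The sketch below reproduces that arithmetic in plain Python on the five `Frequency` values shown by `df.show(5)`. Inside the pipeline, the standard deviations are learned only from `trainingData`, because the scaler is fit as part of `pipeline.fit`.

```python
import statistics

# Frequency values from the first five rows printed by df.show(5)
frequency = [800.0, 1000.0, 1250.0, 1600.0, 2000.0]

# Corrected (n - 1) sample standard deviation
std = statistics.stdev(frequency)

# withStd=True: divide by the std; withMean=False: no centering
scaled = [x / std for x in frequency]

print(round(std, 2))
print([round(v, 3) for v in scaled])
```

The scaled values have unit standard deviation but are still all positive, since nothing was centered.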

### Define the LinearRegression pipeline stage


Stage 3 - Create a LinearRegression stage that predicts "SoundLevelDecibels" from the "scaledFeatures" column.


In [40]:
lr = LinearRegression(featuresCol="scaledFeatures", labelCol="SoundLevelDecibels")

### Build the pipeline


Build a pipeline using the above three stages


In [None]:
pipeline = Pipeline(stages=[assembler, scaler, lr])
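
`Pipeline.fit` runs the stages in order: a transformer stage (here `VectorAssembler`) only transforms the data, while an estimator stage (`StandardScaler`, `LinearRegression`) is fit on the output of the stages before it, and its fitted model then transforms the data for the next stage. The plain-Python sketch below mimics that control flow. Every class in it is hypothetical, and the final stage is a stand-in that predicts the mean label rather than a real linear regression.

```python
import statistics


class Assembler:
    """Transformer: copies the chosen columns into a 'features' list."""

    def __init__(self, input_cols):
        self.input_cols = input_cols

    def transform(self, rows):
        return [dict(r, features=[r[c] for c in self.input_cols]) for r in rows]


class ScalerModel:
    """Fitted transformer: divides each feature by its learned std."""

    def __init__(self, stds):
        self.stds = stds

    def transform(self, rows):
        return [
            dict(r, scaled=[x / s for x, s in zip(r["features"], self.stds)])
            for r in rows
        ]


class Scaler:
    """Estimator: learns per-feature sample std and returns a ScalerModel."""

    def fit(self, rows):
        columns = zip(*(r["features"] for r in rows))
        return ScalerModel([statistics.stdev(col) for col in columns])


class MeanModel:
    """Fitted transformer: predicts a constant."""

    def __init__(self, value):
        self.value = value

    def transform(self, rows):
        return [dict(r, prediction=self.value) for r in rows]


class MeanRegressor:
    """Estimator stand-in for LinearRegression: learns the mean label."""

    def fit(self, rows):
        return MeanModel(statistics.mean(r["label"] for r in rows))


def fit_pipeline(stages, rows):
    """Fit each estimator on the output of the stages before it."""
    fitted = []
    for i, stage in enumerate(stages):
        model = stage.fit(rows) if hasattr(stage, "fit") else stage
        fitted.append(model)
        if i < len(stages) - 1:  # the last stage's output is not needed for fitting
            rows = model.transform(rows)
    return fitted


def transform_pipeline(fitted, rows):
    """Apply every fitted stage in order, like PipelineModel.transform."""
    for model in fitted:
        rows = model.transform(rows)
    return rows


train = [
    {"a": 1.0, "b": 10.0, "label": 120.0},
    {"a": 2.0, "b": 30.0, "label": 124.0},
    {"a": 3.0, "b": 20.0, "label": 128.0},
]
fitted = fit_pipeline([Assembler(["a", "b"]), Scaler(), MeanRegressor()], train)
result = transform_pipeline(fitted, [{"a": 4.0, "b": 40.0, "label": 0.0}])[0]
print(result["scaled"], result["prediction"])
```

Note that the scaler's statistics come only from the rows passed to `fit_pipeline`, which is the same reason the real pipeline is fit on `trainingData` alone.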

### Train/Test Split the data

In [None]:
# Split the data 70:30 into training and testing sets.
# Setting seed=42 makes the split reproducible across runs.

(trainingData, testingData) = df.randomSplit([0.7, 0.3], seed=42)
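
`randomSplit([0.7, 0.3], seed=42)` assigns rows to the two parts at random, so the parts come out close to, but generally not exactly, 70% and 30% of the rows; the fixed seed makes the assignment repeatable for the same data and partitioning. The plain-Python sketch below imitates the idea with one seeded uniform draw per row. The `random_split` helper is hypothetical and does not reproduce the rows Spark would select.

```python
import random


def random_split(rows, weights, seed):
    """Assign each row to one split using a single seeded uniform draw per row."""
    total = sum(weights)
    bounds = []
    running = 0.0
    for w in weights:
        running += w / total  # weights are normalized if they do not sum to 1
        bounds.append(running)

    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, bound in enumerate(bounds):
            # the last split also absorbs any floating-point shortfall in bounds[-1]
            if u < bound or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits


rows = list(range(1499))  # same number of rows as the cleaned dataset
train, test = random_split(rows, [0.7, 0.3], seed=42)
print(len(train), len(test))
```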

### Fit the pipeline


In [44]:
# Fit the pipeline using the training data

pipelineModel = pipeline.fit(trainingData)

23/12/20 21:35:35 WARN util.Instrumentation: [9778e089] regParam is zero, which might cause numerical instability and overfitting.
23/12/20 21:35:38 WARN netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
23/12/20 21:35:38 WARN netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
23/12/20 21:35:38 WARN netlib.LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
23/12/20 21:35:38 WARN netlib.LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK

In [45]:
# Sanity Check for ML pipeline 
print("Part 2 - Evaluation")
print("Total rows = ", rowcount4)
ps = [str(x).split("_")[0] for x in pipeline.getStages()]

print("Pipeline Stage 1 = ", ps[0])
print("Pipeline Stage 2 = ", ps[1])
print("Pipeline Stage 3 = ", ps[2])

print("Label column = ", lr.getLabelCol())

Part 2 - Evaluation
Total rows =  1499
Pipeline Stage 1 =  VectorAssembler
Pipeline Stage 2 =  StandardScaler
Pipeline Stage 3 =  LinearRegression
Label column =  SoundLevelDecibels


## Part 3: Evaluate the Model

### Predict using the model


In [None]:
predictions = pipelineModel.transform(testingData)
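
The evaluation cells below compare each prediction $\hat{y}_i$ with its label $y_i$ over the $n$ rows of `testingData`, where $\bar{y}$ is the mean test label. For reference, the three metrics requested from `RegressionEvaluator` are defined as:

$$
\mathrm{MSE} = \frac{1}{n}\sum_{i=1}^{n} (y_i - \hat{y}_i)^2,
\qquad
\mathrm{MAE} = \frac{1}{n}\sum_{i=1}^{n} \lvert y_i - \hat{y}_i \rvert,
\qquad
R^2 = 1 - \frac{\sum_{i=1}^{n} (y_i - \hat{y}_i)^2}{\sum_{i=1}^{n} (y_i - \bar{y})^2}
$$

$R^2$ therefore measures how much of the squared error of a baseline that always predicts $\bar{y}$ the model removes; an $R^2$ of about 0.54 means roughly 54% on this test set.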

### Evaluate with MSE

In [47]:
evaluator = RegressionEvaluator(labelCol="SoundLevelDecibels", predictionCol="prediction", metricName="mse")
mse = evaluator.evaluate(predictions)
print(mse)

22.593754071348812

### Evaluate with MAE


In [48]:
evaluator = RegressionEvaluator(labelCol="SoundLevelDecibels", predictionCol="prediction", metricName="mae")
mae = evaluator.evaluate(predictions)
print(mae)

3.7336902294631287

### Evaluate with R-squared (R2)

In [49]:
evaluator = RegressionEvaluator(labelCol="SoundLevelDecibels", predictionCol="prediction", metricName="r2")
r2 = evaluator.evaluate(predictions)
print(r2)

0.5426016508689058

In [None]:
# Sanity check for evaluating the model
print("Part 3 - Evaluation")

print("Mean Squared Error = ", round(mse,2))
print("Mean Absolute Error = ", round(mae,2))
print("R Squared = ", round(r2,2))

lrModel = pipelineModel.stages[-1]

print("Intercept = ", round(lrModel.intercept,2))


Part 3 - Evaluation
Mean Squared Error =  22.59
Mean Absolute Error =  3.73
R Squared =  0.54
Intercept =  132.6


## Part 4: Persist the Model


### Save the model


In [51]:
# save() creates the target directory itself, and overwrite() replaces any earlier copy
pipelineModel.write().overwrite().save("./airfoil_prediction/")

### Load the model


In [52]:
loadedPipelineModel = PipelineModel.load("./airfoil_prediction/")

### Make predictions on the test data using the loaded model

In [53]:
# Use the loaded pipeline model and make predictions using testingData
predictions = loadedPipelineModel.transform(testingData)


### Show the predictions


In [54]:
# Show the top 5 rows of the predictions DataFrame, displaying only the label and prediction columns
predictions.select("SoundLevelDecibels", "prediction").show(5)

+------------------+------------------+
|SoundLevelDecibels|        prediction|
+------------------+------------------+
|           127.315|123.64344009624753|
|           119.975|123.48695788614877|
|           121.783|124.38983849684254|
|           127.224|121.44706993294302|
|           122.229|125.68312652454188|
+------------------+------------------+
only showing top 5 rows
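
The five label/prediction pairs above can be used to recompute the Part 3 metrics by hand, which shows what `RegressionEvaluator` is doing. This is a minimal plain-Python sketch: because it uses only five rows, its numbers will not match the full test-set scores. It also converts the reported test-set MSE into RMSE (the evaluator's default `metricName`), which is in decibels like the label.

```python
import math

# First five (label, prediction) pairs shown by predictions.select(...).show(5)
pairs = [
    (127.315, 123.64344009624753),
    (119.975, 123.48695788614877),
    (121.783, 124.38983849684254),
    (127.224, 121.44706993294302),
    (122.229, 125.68312652454188),
]

n = len(pairs)
errors = [y - y_hat for y, y_hat in pairs]
mean_label = sum(y for y, _ in pairs) / n

mse = sum(e * e for e in errors) / n     # metricName="mse"
mae = sum(abs(e) for e in errors) / n    # metricName="mae"
rmse = math.sqrt(mse)                    # metricName="rmse" (the default)
ss_res = sum(e * e for e in errors)
ss_tot = sum((y - mean_label) ** 2 for y, _ in pairs)
r2 = 1 - ss_res / ss_tot                 # metricName="r2"

print(f"5-row MSE={mse:.2f} MAE={mae:.2f} RMSE={rmse:.2f} R2={r2:.2f}")

# RMSE implied by the full test-set MSE reported in Part 3
print(round(math.sqrt(22.593754071348812), 2))
```

On these five rows R² comes out negative: their squared prediction error is larger than the squared spread of the five labels around their own mean, which is why R² should be read over the whole test set rather than a handful of rows.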



                                                                                

In [None]:
# sanity check
print("Part 4 - Evaluation")

loadedmodel = loadedPipelineModel.stages[-1]
totalstages = len(loadedPipelineModel.stages)
inputcolumns = loadedPipelineModel.stages[0].getInputCols()

print("Number of stages in the pipeline = ", totalstages)
for i,j in zip(inputcolumns, loadedmodel.coefficients):
    print(f"Coefficient for {i} is {round(j,4)}")

Part 4 - Evaluation
Number of stages in the pipeline =  3
Coefficient for Frequency is -3.9728
Coefficient for AngleOfAttack is -2.4775
Coefficient for ChordLength is -3.3818
Coefficient for FreeStreamVelocity is 1.5789
Coefficient for SuctionSideDisplacement is -1.6465
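
Because the regression was fit on `scaledFeatures`, each coefficient above is the change in predicted decibels per one standard deviation of that feature, not per raw unit. With `withMean=False`, converting back is simple: dividing a scaled coefficient by the feature's standard deviation gives the raw-unit coefficient, and the intercept is unchanged. The sketch below checks this identity with a closed-form least-squares fit on made-up one-feature data (the `fit_line` helper is hypothetical). In the notebook, the standard deviations would come from the fitted scaler stage, for example via the `std` property of `loadedPipelineModel.stages[1]`; that step is not run here.

```python
import statistics


def fit_line(xs, ys):
    """Ordinary least squares for y = intercept + slope * x (closed form)."""
    x_mean = statistics.mean(xs)
    y_mean = statistics.mean(ys)
    sxy = sum((x - x_mean) * (y - y_mean) for x, y in zip(xs, ys))
    sxx = sum((x - x_mean) ** 2 for x in xs)
    slope = sxy / sxx
    return y_mean - slope * x_mean, slope


# Made-up data: one raw feature and a roughly linear label (not from the dataset)
xs = [800.0, 1000.0, 1250.0, 1600.0, 2000.0, 2500.0]
ys = [126.2, 125.9, 125.1, 124.3, 123.6, 122.4]

std = statistics.stdev(xs)       # what StandardScaler would learn
scaled = [x / std for x in xs]   # withStd=True, withMean=False

b0_raw, b1_raw = fit_line(xs, ys)
b0_scaled, b1_scaled = fit_line(scaled, ys)

print(b1_scaled / std, b1_raw)   # same slope once converted back to raw units
print(b0_scaled, b0_raw)         # same intercept, since nothing was centered
```

The per-standard-deviation coefficients printed by the sanity check are still useful as they are, because they put all five features on a common scale for comparison.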


### Stop Spark Session


In [56]:
spark.stop()