## **Build an ML Pipeline for Airfoil Noise Prediction**


## **Scenario**


You are a data engineer at an aeronautics consulting company. Your company prides itself on designing airfoils efficiently for use in planes and sports cars. Data scientists in your office need to work with different algorithms and with data in different formats. They are good at machine learning, but they count on you to run ETL jobs and build ML pipelines.

In this project you will use a modified version of the NASA Airfoil Self-Noise dataset. You will clean the dataset by dropping duplicate rows and removing rows with null values, build an ML pipeline that trains a model to predict SoundLevel from all the other columns, evaluate the model, and finally persist it.



## **Datasets**

In this lab you will use the following dataset:

 - A modified version of the NASA Airfoil Self-Noise dataset. The original dataset is available at https://archive.ics.uci.edu/dataset/291/airfoil+self+noise
 
 - This dataset is licensed under a Creative Commons Attribution 4.0 International (CC BY 4.0) license.


Diagram of an airfoil (for information only).


![Airfoil with flow](https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMSkillsNetwork-BD0231EN-Coursera/images/Airfoil_with_flow.png)


Diagram showing the angle of attack (for information only).


![Airfoil angle of attack](https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMSkillsNetwork-BD0231EN-Coursera/images/Airfoil_angle_of_attack.jpg)


For this lab, we will be using the following libraries:

*   [`PySpark`](https://spark.apache.org/docs/latest/api/python/index.html?utm_medium=Exinfluencer&utm_source=Exinfluencer&utm_content=000026UJ&utm_term=10006555&utm_id=NA-SkillsNetwork-Channel-SkillsNetworkCoursesIBMSkillsNetworkBD0231ENCoursera2789-2023-01-01) for connecting to the Spark cluster, processing the data, and building the ML pipeline


### **Installing Required Libraries**

In [1]:
!pip install pyspark

## **Part 1. Perform ETL**


### **1. Import required libraries**


In [4]:
#your code goes here

from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StandardScaler, StringIndexer
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.pipeline import Pipeline, PipelineModel

### **2. Create a spark session**


In [5]:
#Create a SparkSession

spark = SparkSession.builder.appName('NASA Airfoil').getOrCreate()



### **3. Load the csv file into a dataframe**


Load the dataset into the spark dataframe


In [7]:
# Load the dataset that you have downloaded in the previous task

df = spark.read.csv('NASA_airfoil_noise_raw.csv', header=True, inferSchema=True)


Print top 5 rows of the dataset


In [8]:
#your code goes here
df.show(5)

+---------+-------------+-----------+------------------+-----------------------+----------+
|Frequency|AngleOfAttack|ChordLength|FreeStreamVelocity|SuctionSideDisplacement|SoundLevel|
+---------+-------------+-----------+------------------+-----------------------+----------+
|      800|          0.0|     0.3048|              71.3|             0.00266337|   126.201|
|     1000|          0.0|     0.3048|              71.3|             0.00266337|   125.201|
|     1250|          0.0|     0.3048|              71.3|             0.00266337|   125.951|
|     1600|          0.0|     0.3048|              71.3|             0.00266337|   127.591|
|     2000|          0.0|     0.3048|              71.3|             0.00266337|   127.461|
+---------+-------------+-----------+------------------+-----------------------+----------+
only showing top 5 rows



Print the total number of rows in the dataset


In [9]:
#your code goes here
rowcount1 = df.count()
print(rowcount1)

1522


Drop all the duplicate rows from the dataset


In [10]:
df = df.dropDuplicates()

Print the total number of rows in the dataset


In [11]:
#your code goes here

rowcount2 = df.count()
print(rowcount2)




1503



Drop all the rows that contain null values from the dataset


In [12]:
df = df.dropna()

Print the total number of rows in the dataset


In [13]:
#your code goes here

rowcount3 = df.count()
print(rowcount3)




1499
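
The three row counts printed so far imply that 19 duplicate rows and 4 rows containing nulls were removed. The following is a minimal plain-Python sketch of what the two cleaning steps do, not Spark code; the sample rows are made up for illustration, and unlike this sketch Spark does not guarantee which copy of a duplicate is kept or the order of the result.

```python
# Plain-Python illustration of the two cleaning steps (not Spark code).
# The rows below are made-up sample values, not taken from the dataset.
rows = [
    (800, 0.0, 126.201),
    (800, 0.0, 126.201),   # exact duplicate of the first row
    (1000, 0.0, None),     # contains a null
    (1250, 0.0, 125.951),
]

# Like dropDuplicates(): keep one copy of each fully identical row.
deduplicated = list(dict.fromkeys(rows))

# Like dropna() with its default settings: drop a row if any value is null.
cleaned = [row for row in deduplicated if all(v is not None for v in row)]

print(len(rows), len(deduplicated), len(cleaned))

# The same bookkeeping applied to the counts printed in this lab.
duplicates_removed = 1522 - 1503
null_rows_removed = 1503 - 1499
print(duplicates_removed, null_rows_removed)
```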



Rename the column "SoundLevel" to "SoundLevelDecibels"


In [14]:
# your code goes here

df = df.withColumnRenamed('SoundLevel', 'SoundLevelDecibels')

In [16]:
df.show(5)

+---------+-------------+-----------+------------------+-----------------------+------------------+
|Frequency|AngleOfAttack|ChordLength|FreeStreamVelocity|SuctionSideDisplacement|SoundLevelDecibels|
+---------+-------------+-----------+------------------+-----------------------+------------------+
|     4000|          3.0|     0.3048|              31.7|             0.00529514|           115.608|
|     3150|          2.0|     0.2286|              31.7|             0.00372371|           121.527|
|     2000|          7.3|     0.2286|              31.7|              0.0132672|           115.309|
|     2000|          5.4|     0.1524|              71.3|             0.00401199|           131.111|
|      500|          9.9|     0.1524|              71.3|              0.0193001|           131.279|
+---------+-------------+-----------+------------------+-----------------------+------------------+
only showing top 5 rows



Save the dataframe in Parquet format, naming the output "NASA_airfoil_noise_cleaned.parquet"


In [18]:
# your code goes here

df.write.mode('overwrite').parquet('NASA_airfoil_noise_cleaned.parquet')


## **Part 2. Create a Machine Learning Pipeline**


Load data from "NASA_airfoil_noise_cleaned.parquet" into a dataframe


In [20]:
df = spark.read.parquet('NASA_airfoil_noise_cleaned.parquet/')

Print the total number of rows in the dataset


In [21]:
rowcount4 = df.count()
print(rowcount4)



1499



### **1. Define the VectorAssembler pipeline stage**


Stage 1 - Assemble the input columns into a single column "features". Use all the columns except SoundLevelDecibels as input features.


In [23]:
assembler = VectorAssembler(
    inputCols=['Frequency','AngleOfAttack','ChordLength','FreeStreamVelocity','SuctionSideDisplacement'],
    outputCol='features')
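
As an alternative to typing the feature names by hand, the list can be derived by excluding the label column. This is a small sketch in plain Python; the `columns` list is copied from the `df.show()` output above, and the names `label_col` and `feature_cols` are illustrative.

```python
# Column names as shown in the df.show() output after the rename.
columns = [
    'Frequency', 'AngleOfAttack', 'ChordLength',
    'FreeStreamVelocity', 'SuctionSideDisplacement', 'SoundLevelDecibels',
]

label_col = 'SoundLevelDecibels'

# Every column except the label becomes an input feature.
feature_cols = [c for c in columns if c != label_col]
print(feature_cols)
```

In the notebook, `df.columns` would supply the same list of names, and `feature_cols` could be passed as `inputCols`.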

### **2. Define the StandardScaler pipeline stage**


Stage 2 - Scale the "features" column using StandardScaler and store the result in the "ScaledFeatures" column


In [30]:
scaler = StandardScaler(inputCol='features', outputCol='ScaledFeatures')
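
To make the scaling step concrete, here is a plain-Python sketch of what the stage computes for a single feature, assuming PySpark's default settings (`withStd=True`, `withMean=False`): each value is divided by the sample standard deviation, without centering. The helper name `scale_to_unit_std` and the sample values are illustrative, not part of the lab.

```python
from statistics import stdev

def scale_to_unit_std(column):
    """Divide each value by the column's sample standard deviation (n - 1)."""
    s = stdev(column)
    if s == 0:
        # A constant column carries no spread; map it to zeros.
        return [0.0 for _ in column]
    return [x / s for x in column]

# Illustrative values for one feature column.
velocities = [71.3, 55.5, 39.6, 31.7]
scaled = scale_to_unit_std(velocities)

# After scaling, the column has unit standard deviation.
print(round(stdev(scaled), 6))
```

Because the values are not centered by default, the scaled column keeps a non-zero mean; only its spread is normalized.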

### **3. Define the Model creation pipeline stage**


Stage 3 - Create a LinearRegression stage to predict "SoundLevelDecibels"

In [31]:
lr = LinearRegression(featuresCol='ScaledFeatures', labelCol='SoundLevelDecibels')

### **4. Build the pipeline**

Build a pipeline using the above three stages


In [32]:
pipeline = Pipeline(stages=[assembler, scaler, lr])

### **5. Split the data**


In [33]:
# Split the data into training and testing sets with 70:30 split.
# set the value of seed to 42

(trainingData, testingData) = df.randomSplit([0.7,0.3], seed=42) 
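
A random split assigns each row independently, so the resulting sizes are only approximately 70:30. The sketch below imitates that behaviour in plain Python; it is not Spark's sampler, it will not reproduce the same rows as `randomSplit`, and the helper name `bernoulli_split` is hypothetical.

```python
import random

def bernoulli_split(rows, train_fraction, seed):
    """Send each row to the training set with probability train_fraction."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        (train if rng.random() < train_fraction else test).append(row)
    return train, test

# Stand-in for the 1499 cleaned rows.
rows = list(range(1499))
train, test = bernoulli_split(rows, 0.7, seed=42)

# The sizes are close to, but usually not exactly, 70% and 30%.
print(len(train), len(test))
```

Fixing the seed makes the split repeatable, which is why the lab sets `seed=42`.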

### **6. Fit the pipeline**


In [34]:
# Fit the pipeline using the training data

pipelineModel = pipeline.fit(trainingData)

25/04/24 18:59:17 WARN util.Instrumentation: [87feb786] regParam is zero, which might cause numerical instability and overfitting.

## **Part 3. Evaluate the Model**


### **Predict using the model**


In [36]:
# Make predictions on testing data

predictions = pipelineModel.transform(testingData)

### **1. Print the MSE**


In [44]:
#your code goes here

mse_evaluator = RegressionEvaluator(predictionCol='prediction', labelCol='SoundLevelDecibels', metricName='mse')
mse = mse_evaluator.evaluate(predictions)
print(mse)


22.593754071348812



### **2. Print the MAE**


In [46]:
mae_evaluator = RegressionEvaluator(predictionCol='prediction', labelCol='SoundLevelDecibels', metricName='mae')
mae = mae_evaluator.evaluate(predictions)
print(mae)



3.7336902294631287



### **3. Print the R-Squared (R2)**


In [48]:
r2_evaluator = RegressionEvaluator(predictionCol='prediction', labelCol='SoundLevelDecibels', metricName='r2')
r2 = r2_evaluator.evaluate(predictions)
print(r2)


0.5426016508689058
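
For reference, the three metrics can be written out in a few lines of plain Python. This is a sketch using the standard definitions; the function names and the tiny sample lists are illustrative, not taken from the dataset. The last lines convert the MSE printed in this lab into an RMSE, which is in the same unit as the label (decibels).

```python
from math import sqrt

def mean_squared_error(labels, preds):
    return sum((y - p) ** 2 for y, p in zip(labels, preds)) / len(labels)

def mean_absolute_error(labels, preds):
    return sum(abs(y - p) for y, p in zip(labels, preds)) / len(labels)

def r_squared(labels, preds):
    mean_label = sum(labels) / len(labels)
    ss_res = sum((y - p) ** 2 for y, p in zip(labels, preds))   # unexplained variation
    ss_tot = sum((y - mean_label) ** 2 for y in labels)         # total variation
    return 1 - ss_res / ss_tot

# Made-up labels and predictions, for illustration only.
labels = [1.0, 2.0, 3.0, 4.0]
preds = [1.5, 2.0, 2.5, 4.0]
print(mean_squared_error(labels, preds))
print(mean_absolute_error(labels, preds))
print(r_squared(labels, preds))

# RMSE derived from the MSE printed above.
rmse = sqrt(22.593754071348812)
print(round(rmse, 3))
```

An R2 of about 0.54 means the linear model explains roughly half of the variance of the sound level on the test set.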



## **Part 4. Persist the Model**


### **1. Save the model to the path "NASA_Project"**


In [50]:
# Save the pipeline model to the path "NASA_Project"
pipelineModel.save('NASA_Project')
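
Re-running the save cell typically fails when the target path already exists. One way around this, sketched below, is to go through the model's writer and request an overwrite; the helper name `save_overwriting` is hypothetical, and the sketch assumes the object passed in exposes the `write().overwrite().save(path)` chain that fitted Spark ML models provide.

```python
def save_overwriting(model, path):
    """Save a fitted model, replacing anything already stored at path."""
    # write() returns the model's writer, overwrite() allows replacing
    # an existing directory, and save() writes the model to the path.
    model.write().overwrite().save(path)
```

In the notebook this would be called as `save_overwriting(pipelineModel, 'NASA_Project')`.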


### **2. Load the model from the path**


In [51]:
# Load the pipeline model you have created in the previous step
loadedPipelineModel = PipelineModel.load('NASA_Project')

### **3. Make predictions using the loaded model on the test data**


In [52]:
# Use the loaded pipeline model and make predictions using testingData
predictions = loadedPipelineModel.transform(testingData)

### **4. Show the predictions**


In [55]:
# Show the top 5 rows of the predictions dataframe. Display only the label column and the predictions
#your code goes here
predictions.select('SoundLevelDecibels','prediction').show(5)


+------------------+------------------+
|SoundLevelDecibels|        prediction|
+------------------+------------------+
|           127.315|123.64344009624753|
|           119.975|123.48695788614877|
|           121.783|124.38983849684254|
|           127.224|121.44706993294302|
|           122.229|125.68312652454188|
+------------------+------------------+
only showing top 5 rows
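
A quick way to read these rows is to look at the residual (label minus prediction) for each one. The sketch below does this in plain Python for the five rows displayed above; five rows are far too few to reproduce the test-set metrics, so the average is only a rough sanity check.

```python
# (label, prediction) pairs copied from the five rows shown above.
shown = [
    (127.315, 123.64344009624753),
    (119.975, 123.48695788614877),
    (121.783, 124.38983849684254),
    (127.224, 121.44706993294302),
    (122.229, 125.68312652454188),
]

# Positive residual: the model under-predicted; negative: it over-predicted.
residuals = [label - pred for label, pred in shown]

for (label, pred), r in zip(shown, residuals):
    print(f"{label:8.3f} {pred:9.3f} {r:+7.3f}")

mean_abs_error = sum(abs(r) for r in residuals) / len(residuals)
print(round(mean_abs_error, 3))
```

The average absolute residual for these rows comes out at roughly 3.8 dB, in line with the MAE of about 3.73 reported for the full test set.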




Stop the Spark session


In [57]:
spark.stop()

<!--
|Date (YYYY-MM-DD)|Version|Changed By|Change Description|
|-|-|-|-|
|2023-05-26|0.1|Ramesh Sannareddy|Initial Version Created|
-->
