##  Build an ML Pipeline for Airfoil noise prediction


In [1]:
!pip install pyspark==3.1.2 -q
!pip install findspark -q

In [2]:

def warn(*args, **kwargs):
    pass
import warnings
warnings.warn = warn
warnings.filterwarnings('ignore')



import findspark
findspark.init()

 Perform ETL activity

In [3]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.pipeline import PipelineModel
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import StandardScaler

In [4]:
spark = SparkSession.builder.appName("MachineLearningWithSpark").getOrCreate()

24/10/14 10:34:46 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


 Load the csv file into a dataframe


In [5]:
!wget https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMSkillsNetwork-BD0231EN-Coursera/datasets/NASA_airfoil_noise_raw.csv


--2024-10-14 10:35:00--  https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMSkillsNetwork-BD0231EN-Coursera/datasets/NASA_airfoil_noise_raw.csv
Resolving cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud (cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud)... 169.63.118.104, 169.63.118.104
Connecting to cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud (cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud)|169.63.118.104|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 60682 (59K) [text/csv]
Saving to: ‘NASA_airfoil_noise_raw.csv.1’


2024-10-14 10:35:00 (49.7 MB/s) - ‘NASA_airfoil_noise_raw.csv.1’ saved [60682/60682]



Load the dataset into the spark dataframe


In [7]:
df = spark.read.csv("NASA_airfoil_noise_raw.csv", header= True, inferSchema= True)

 Print top 5 rows of the dataset

In [8]:
df.show()

+---------+-------------+-----------+------------------+-----------------------+----------+
|Frequency|AngleOfAttack|ChordLength|FreeStreamVelocity|SuctionSideDisplacement|SoundLevel|
+---------+-------------+-----------+------------------+-----------------------+----------+
|      800|          0.0|     0.3048|              71.3|             0.00266337|   126.201|
|     1000|          0.0|     0.3048|              71.3|             0.00266337|   125.201|
|     1250|          0.0|     0.3048|              71.3|             0.00266337|   125.951|
|     1600|          0.0|     0.3048|              71.3|             0.00266337|   127.591|
|     2000|          0.0|     0.3048|              71.3|             0.00266337|   127.461|
|     2500|          0.0|     0.3048|              71.3|             0.00266337|   125.571|
|     3150|          0.0|     0.3048|              71.3|             0.00266337|   125.201|
|     4000|          0.0|     0.3048|              71.3|             0.00266337|

 Print the total number of rows in the 


In [10]:
rowcount1 = df.count()
print(rowcount1)



1503


                                                                                

 Drop all the duplicate rows 

In [11]:
df = df.drop_duplicates()

 Print the total number of rows 

In [12]:
rowcount2 = df.count()
print(rowcount2)



1503


                                                                                

 Drop all the rows that contain null values 

In [13]:
df = df.dropna()

 Print the total number of rows 

In [14]:
rowcount3 = df.count()
print(rowcount3)



1499


                                                                                

 Rename the column "SoundLevel" to "SoundLevelDecibels"

In [15]:
df = df.withColumnRenamed("SoundLevel","SoundLevelDecibels")

Save the dataframe in parquet format as they provide efficient storage, faster query performance, and compatibility with big data processing frameworks.

In [16]:
df.write.mode("overwrite").parquet("NASA_airfoil_noise_cleaned.parquet")

[Stage 15:>                                                       (0 + 8) / 200]24/10/14 10:41:03 WARN hadoop.MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers
24/10/14 10:41:03 WARN hadoop.MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 84.47% for 8 writers
24/10/14 10:41:03 WARN hadoop.MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers
                                                                                

Evaluation:

In [17]:
print("Part 1 - Evaluation")

print("Total rows = ", rowcount1)
print("Total rows after dropping duplicate rows = ", rowcount2)
print("Total rows after dropping duplicate rows and rows with null values = ", rowcount3)
print("New column name = ", df.columns[-1])

import os

print("NASA_airfoil_noise_cleaned.parquet exists :", os.path.isdir("NASA_airfoil_noise_cleaned.parquet"))

Part 1 - Evaluation
Total rows =  1503
Total rows after dropping duplicate rows =  1503
Total rows after dropping duplicate rows and rows with null values =  1499
New column name =  SoundLevelDecibels
NASA_airfoil_noise_cleaned.parquet exists : True


# Create a  Machine Learning Pipeline

 Load data from "NASA_airfoil_noise_cleaned.parquet" into a dataframe

In [18]:
df = df = df = spark.read.parquet("NASA_airfoil_noise_cleaned.parquet")

 Print the total number of rows


In [19]:
rowcount4 = df.count()
print(rowcount4)



1499


                                                                                

 Define the VectorAssembler pipeline stage

In [20]:
assembler = VectorAssembler(inputCols=["Frequency", "AngleOfAttack","ChordLength", "FreeStreamVelocity","SuctionSideDisplacement"],outputCol="features")
df_transformed = assembler.transform(df)
df_transformed.show(5)
df_transformed.printSchema()

+---------+-------------+-----------+------------------+-----------------------+------------------+--------------------+
|Frequency|AngleOfAttack|ChordLength|FreeStreamVelocity|SuctionSideDisplacement|SoundLevelDecibels|            features|
+---------+-------------+-----------+------------------+-----------------------+------------------+--------------------+
|      630|          0.0|     0.3048|              31.7|             0.00331266|           129.095|[630.0,0.0,0.3048...|
|     4000|          0.0|     0.3048|              31.7|             0.00331266|           118.145|[4000.0,0.0,0.304...|
|     4000|          1.5|     0.3048|              39.6|             0.00392107|           117.741|[4000.0,1.5,0.304...|
|      800|          4.0|     0.3048|              71.3|             0.00497773|           131.755|[800.0,4.0,0.3048...|
|     1250|          0.0|     0.2286|              31.7|              0.0027238|           128.805|[1250.0,0.0,0.228...|
+---------+-------------+-------

Define the StandardScaler pipeline stage

In [21]:
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")
df_scaled = scaler.fit(df_transformed).transform(df_transformed)
df_scaled.show(5)
df_scaled.printSchema()

                                                                                

+---------+-------------+-----------+------------------+-----------------------+------------------+--------------------+--------------------+
|Frequency|AngleOfAttack|ChordLength|FreeStreamVelocity|SuctionSideDisplacement|SoundLevelDecibels|            features|      scaledFeatures|
+---------+-------------+-----------+------------------+-----------------------+------------------+--------------------+--------------------+
|      630|          0.0|     0.3048|              31.7|             0.00331266|           129.095|[630.0,0.0,0.3048...|[0.19963520038433...|
|     4000|          0.0|     0.3048|              31.7|             0.00331266|           118.145|[4000.0,0.0,0.304...|[1.26752508180528...|
|     4000|          1.5|     0.3048|              39.6|             0.00392107|           117.741|[4000.0,1.5,0.304...|[1.26752508180528...|
|      800|          4.0|     0.3048|              71.3|             0.00497773|           131.755|[800.0,4.0,0.3048...|[0.25350501636105...|
|     

 Define the Model creation pipeline stage

In [22]:
lr = LinearRegression(featuresCol="scaledFeatures", labelCol="SoundLevelDecibels")

Build the pipeline

Build a pipeline using the above three stages


In [23]:
pipeline = Pipeline(stages=[assembler, scaler, lr])

Split the data

In [24]:
(trainingData, testingData) = df.randomSplit([0.7,0.3], seed = 42)

Fit the pipeline

In [25]:
pipelineModel = pipeline.fit(trainingData)

24/10/14 10:45:59 WARN util.Instrumentation: [e66a0cfc] regParam is zero, which might cause numerical instability and overfitting.
[Stage 26:>                                                         (0 + 8) / 8]24/10/14 10:46:01 WARN netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
24/10/14 10:46:01 WARN netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
24/10/14 10:46:01 WARN netlib.LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
24/10/14 10:46:01 WARN netlib.LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK
                                                                                

#### Part 2 - Evaluation


In [26]:
print("Part 2 - Evaluation")
print("Total rows = ", rowcount4)
ps = [str(x).split("_")[0] for x in pipeline.getStages()]

print("Pipeline Stage 1 = ", ps[0])
print("Pipeline Stage 2 = ", ps[1])
print("Pipeline Stage 3 = ", ps[2])

print("Label column = ", lr.getLabelCol())

Part 2 - Evaluation
Total rows =  1499
Pipeline Stage 1 =  VectorAssembler
Pipeline Stage 2 =  StandardScaler
Pipeline Stage 3 =  LinearRegression
Label column =  SoundLevelDecibels


# Evaluate the Model


 Predict using the model

In [27]:
predictions = pipelineModel.transform(testingData)

Print the MSE

In [28]:

from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="SoundLevelDecibels", metricName="mse")
mse = evaluator.evaluate(predictions)
print(mse)

[Stage 33:>                                                         (0 + 8) / 8]

22.593754071348812


                                                                                

Print the MAE

In [29]:
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="SoundLevelDecibels", metricName="mae")
mae = evaluator.evaluate(predictions)
print(mae)

[Stage 35:>                                                         (0 + 8) / 8]

3.7336902294631287


                                                                                

 Print the R-Squared(R2)

In [30]:
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="SoundLevelDecibels", metricName="r2")
r2 = evaluator.evaluate(predictions)
print(r2)

[Stage 37:>                                                         (0 + 8) / 8]

0.5426016508689058


                                                                                

 Evaluation


In [31]:
print("Part 3 - Evaluation")

print("Mean Squared Error = ", round(mse,2))
print("Mean Absolute Error = ", round(mae,2))
print("R Squared = ", round(r2,2))

lrModel = pipelineModel.stages[-1]

print("Intercept = ", round(lrModel.intercept,2))


Part 3 - Evaluation
Mean Squared Error =  22.59
Mean Absolute Error =  3.73
R Squared =  0.54
Intercept =  132.6


# Persist the Model


 Save the model to the path "MachineLearningWithSpark"


In [32]:
pipelineModel.write().save("MachineLearningWithSpark")

 Load the model from the path "MachineLearningWithSpark"

In [33]:
loadedPipelineModel = PipelineModel.load("MachineLearningWithSpark")

 Make predictions using the loaded model on the testdata

In [34]:
predictions = loadedPipelineModel.transform(testingData)

 Show the predictions

In [35]:
predictions.select("SoundLevelDecibels","prediction").show()

[Stage 59:>                                                         (0 + 1) / 1]

+------------------+------------------+
|SoundLevelDecibels|        prediction|
+------------------+------------------+
|           127.315|123.64344009624753|
|           119.975|123.48695788614877|
|           121.783|124.38983849684254|
|           127.224|121.44706993294302|
|           122.229|125.68312652454188|
|           122.754|119.00135887553772|
|           127.564| 126.5260736531985|
|           126.149|124.72369322766609|
|           120.076|129.24665689814083|
|           138.123|130.62951864347926|
|           127.314|127.13089533096246|
|           130.715|125.89163510250594|
|           129.367|129.30423088951827|
|           122.905| 129.9451290107514|
|           127.127|128.10022579415522|
|           127.417|126.28072873047776|
|           128.698|122.06234864411809|
|           131.073|122.38819030163322|
|           135.368|125.76877819819666|
|           124.514|125.43708134590952|
+------------------+------------------+
only showing top 20 rows



                                                                                

 Evaluation

In [36]:
print("Part 4 - Evaluation")

loadedmodel = loadedPipelineModel.stages[-1]
totalstages = len(loadedPipelineModel.stages)
inputcolumns = loadedPipelineModel.stages[0].getInputCols()

print("Number of stages in the pipeline = ", totalstages)
for i,j in zip(inputcolumns, loadedmodel.coefficients):
    print(f"Coefficient for {i} is {round(j,4)}")

Part 4 - Evaluation
Number of stages in the pipeline =  3
Coefficient for Frequency is -3.9728
Coefficient for AngleOfAttack is -2.4775
Coefficient for ChordLength is -3.3818
Coefficient for FreeStreamVelocity is 1.5789
Coefficient for SuctionSideDisplacement is -1.6465


In [37]:
spark.stop()

<!--
## Change Log
-->


<!--
|Date (YYYY-MM-DD)|Version|Changed By|Change Description|
|-|-|-|-|
|2023-05-26|0.1|Ramesh Sannareddy|Initial Version Created|
-->
