<a href="https://colab.research.google.com/github/mohamedgomaagoda/sparkML/blob/main/sparkML.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [3]:

!pip install pyspark==3.1.2 -q
!pip install findspark -q



[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m212.4/212.4 MB[0m [31m4.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m198.6/198.6 kB[0m [31m20.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


In [4]:
import warnings


def warn(*args, **kwargs):
    """No-op stand-in for ``warnings.warn``: accepts any arguments and discards them."""
    return None


# Silence warnings twice over: through the filter list, and by replacing the
# emitting function itself with the no-op above.
warnings.filterwarnings('ignore')
warnings.warn = warn

In [5]:
import findspark
# Run findspark's init() before the pyspark imports in the next cell.
# NOTE(review): presumably this makes the pip-installed Spark importable from
# this kernel — confirm against the findspark documentation.
findspark.init()

In [6]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.pipeline import PipelineModel
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import StandardScaler
from pyspark.ml.evaluation import RegressionEvaluator

In [7]:
# Create the Spark session used by every later cell, or reuse one that already exists.
spark = (
    SparkSession.builder
    .appName("Airfoil noise prediction")
    .getOrCreate()
)

In [8]:
!wget https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMSkillsNetwork-BD0231EN-Coursera/datasets/NASA_airfoil_noise_raw.csv

--2024-01-18 18:48:25--  https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMSkillsNetwork-BD0231EN-Coursera/datasets/NASA_airfoil_noise_raw.csv
Resolving cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud (cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud)... 198.23.119.245
Connecting to cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud (cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud)|198.23.119.245|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 60682 (59K) [text/csv]
Saving to: ‘NASA_airfoil_noise_raw.csv’


2024-01-18 18:48:26 (837 KB/s) - ‘NASA_airfoil_noise_raw.csv’ saved [60682/60682]



In [9]:
# Load the downloaded CSV: the first row supplies the column names and the
# column types are inferred from the data.
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("NASA_airfoil_noise_raw.csv")
)

In [10]:
# Preview the first five rows of the loaded data.
df.show(n=5)

+---------+-------------+-----------+------------------+-----------------------+----------+
|Frequency|AngleOfAttack|ChordLength|FreeStreamVelocity|SuctionSideDisplacement|SoundLevel|
+---------+-------------+-----------+------------------+-----------------------+----------+
|      800|          0.0|     0.3048|              71.3|             0.00266337|   126.201|
|     1000|          0.0|     0.3048|              71.3|             0.00266337|   125.201|
|     1250|          0.0|     0.3048|              71.3|             0.00266337|   125.951|
|     1600|          0.0|     0.3048|              71.3|             0.00266337|   127.591|
|     2000|          0.0|     0.3048|              71.3|             0.00266337|   127.461|
+---------+-------------+-----------+------------------+-----------------------+----------+
only showing top 5 rows



In [13]:
# Print each column's name, type and nullability for the loaded DataFrame.
df.printSchema()

root
 |-- Frequency: integer (nullable = true)
 |-- AngleOfAttack: double (nullable = true)
 |-- ChordLength: double (nullable = true)
 |-- FreeStreamVelocity: double (nullable = true)
 |-- SuctionSideDisplacement: double (nullable = true)
 |-- SoundLevel: double (nullable = true)



In [14]:
# Row count of the DataFrame as loaded, before any cleaning; stored in
# rowcount1 so it can be compared with later counts.
rowcount1 = df.count()
print(rowcount1)

1522


In [15]:
# Remove rows that are exact duplicates across all columns, then report how many remain.
df = df.drop_duplicates()
rowcount2 = df.count()
print(rowcount2)

1503


In [16]:
# Discard every row that contains a null in any column, then report how many remain.
df = df.na.drop()
rowcount3 = df.count()
print(rowcount3)

1499


In [17]:
# Rename the target column; later cells refer to it as "SoundLevelDecibels".
df = df.withColumnRenamed(
    existing="SoundLevel",
    new="SoundLevelDecibels",
)

In [18]:
# Persist the cleaned data as Parquet.
# The default save mode fails when the target path already exists, which breaks
# any re-run of the notebook; "overwrite" keeps this cell re-runnable and
# matches how the fitted model is saved further down.
df.write.mode("overwrite").parquet("NASA_airfoil_noise_cleaned.parquet")

In [19]:
# Row count of the cleaned DataFrame that the modelling cells below split and train on.
print(df.count())

1499


In [20]:
# Stage 1: pack the five numeric predictor columns into a single vector column "features".
assembler = (
    VectorAssembler()
    .setInputCols([
        "Frequency",
        "AngleOfAttack",
        "ChordLength",
        "FreeStreamVelocity",
        "SuctionSideDisplacement",
    ])
    .setOutputCol("features")
)

In [21]:
# Stage 2: standardise the assembled vector (centre on the mean, scale by the
# standard deviation) into "scaledFeatures".
scaler = (
    StandardScaler()
    .setInputCol("features")
    .setOutputCol("scaledFeatures")
    .setWithStd(True)
    .setWithMean(True)
)

In [22]:
# Stage 3: linear regression from the standardised features to the sound level,
# writing its output to the "prediction" column.
lr = LinearRegression(
    featuresCol='scaledFeatures',
    labelCol='SoundLevelDecibels',
    predictionCol='prediction',
)

In [23]:
# Chain the three stages in execution order: assemble -> scale -> regress.
pipeline = Pipeline().setStages([assembler, scaler, lr])

In [24]:
# 70/30 train/test split, with the split's random seed fixed at 42.
trainingData, testingData = df.randomSplit(weights=[0.7, 0.3], seed=42)

In [25]:
# Fit the pipeline stages on the training split only.
pipelineModel = pipeline.fit(dataset=trainingData)

In [28]:
# Score the held-out test split with the fitted pipeline.
predictions = pipelineModel.transform(dataset=testingData)

In [29]:
# Mean squared error of the test-set predictions against the true sound level.
evaluator = RegressionEvaluator(
    labelCol="SoundLevelDecibels",
    predictionCol="prediction",
    metricName="mse",
)
mse = evaluator.evaluate(dataset=predictions)

print(mse)

21.667074418219322


In [30]:
# Mean absolute error of the test-set predictions, reported to two decimals.
evaluator = RegressionEvaluator(
    labelCol="SoundLevelDecibels",
    predictionCol="prediction",
    metricName="mae",
)
mae = evaluator.evaluate(dataset=predictions)

print("Mean Absolute Error =", round(mae, 2))

Mean Absolute Error = 3.61


In [31]:
# R-squared of the test-set predictions, reported to two decimals.
evaluator = RegressionEvaluator(
    labelCol="SoundLevelDecibels",
    predictionCol="prediction",
    metricName="r2",
)
r2 = evaluator.evaluate(dataset=predictions)

print("R Squared =", round(r2, 2))

R Squared = 0.56


In [32]:
# Summary of the three metrics computed above, each rounded to two decimals.
print("Part 3 - Evaluation")

for metric_label, metric_value in (
    ("Mean Squared Error = ", mse),
    ("Mean Absolute Error = ", mae),
    ("R Squared = ", r2),
):
    print(metric_label, round(metric_value, 2))

# The last pipeline stage is the fitted linear-regression model.
lrModel = pipelineModel.stages[-1]

print("Intercept = ", round(lrModel.intercept,2))

Part 3 - Evaluation
Mean Squared Error =  21.67
Mean Absolute Error =  3.61
R Squared =  0.56
Intercept =  124.95


In [33]:
# Save the fitted pipeline to disk, replacing any model saved by a previous run.
model_path = "Final_Project"
(
    pipelineModel
    .write()
    .overwrite()
    .save(model_path)
)

In [34]:
# Load the saved pipeline back from disk.
loadedPipelineModel = PipelineModel.read().load(model_path)

In [35]:
# Score the test split again, this time with the pipeline loaded from disk.
loadedPredictions = loadedPipelineModel.transform(dataset=testingData)

In [36]:
# Show actual vs. predicted sound level for five test rows, without truncating values.
(
    loadedPredictions
    .select(["SoundLevelDecibels", "prediction"])
    .show(n=5, truncate=False)
)

+------------------+------------------+
|SoundLevelDecibels|prediction        |
+------------------+------------------+
|125.966           |131.43112322688128|
|121.527           |122.37513428895171|
|129.116           |119.76654085222155|
|123.514           |119.04621915131847|
|119.209           |121.65469957424314|
+------------------+------------------+
only showing top 5 rows



In [37]:
# Inspect the reloaded pipeline: number of stages and the regression coefficients.
print("Part 4 - Evaluation")

fitted_stages = loadedPipelineModel.stages
totalstages = len(fitted_stages)
# First stage is the assembler (source of the feature names); last is the regression model.
inputcolumns = fitted_stages[0].getInputCols()
loadedmodel = fitted_stages[-1]

print("Number of stages in the pipeline = ", totalstages)
# The regression was trained on the standardised "scaledFeatures" column, so
# each coefficient applies to the scaled version of its feature.
for column_name, weight in zip(inputcolumns, loadedmodel.coefficients):
    print(f"Coefficient for {column_name} is {round(weight,4)}")

Part 4 - Evaluation
Number of stages in the pipeline =  3
Coefficient for Frequency is -3.9373
Coefficient for AngleOfAttack is -2.5227
Coefficient for ChordLength is -3.294
Coefficient for FreeStreamVelocity is 1.5372
Coefficient for SuctionSideDisplacement is -1.7176
