In [1]:
import warnings
warnings.filterwarnings('ignore')

In [2]:
import findspark
findspark.init()

In [3]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.pipeline import PipelineModel
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import StandardScaler

In [4]:
spark = SparkSession.builder.appName("FinalProject").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/11/22 11:07:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
df = spark.read.csv("NASA_airfoil_noise_raw.csv", header=True, inferSchema=True)
df.show(5)

                                                                                

+---------+-------------+-----------+------------------+-----------------------+----------+
|Frequency|AngleOfAttack|ChordLength|FreeStreamVelocity|SuctionSideDisplacement|SoundLevel|
+---------+-------------+-----------+------------------+-----------------------+----------+
|      800|          0.0|     0.3048|              71.3|             0.00266337|   126.201|
|     1000|          0.0|     0.3048|              71.3|             0.00266337|   125.201|
|     1250|          0.0|     0.3048|              71.3|             0.00266337|   125.951|
|     1600|          0.0|     0.3048|              71.3|             0.00266337|   127.591|
|     2000|          0.0|     0.3048|              71.3|             0.00266337|   127.461|
+---------+-------------+-----------+------------------+-----------------------+----------+
only showing top 5 rows



In [6]:
rowcount1 = df.count()
print(rowcount1)

1522


In [7]:
df = df.dropDuplicates()

In [8]:
rowcount2 = df.count()
print(rowcount2)

1503


In [9]:
df = df.dropna()

In [10]:
rowcount3 = df.count()
print(rowcount3)

1499


In [11]:
df = df.withColumnRenamed("SoundLevel", "SoundLevelDecibels")
df.show(5)

+---------+-------------+-----------+------------------+-----------------------+------------------+
|Frequency|AngleOfAttack|ChordLength|FreeStreamVelocity|SuctionSideDisplacement|SoundLevelDecibels|
+---------+-------------+-----------+------------------+-----------------------+------------------+
|     4000|          3.0|     0.3048|              31.7|             0.00529514|           115.608|
|     3150|          2.0|     0.2286|              31.7|             0.00372371|           121.527|
|     2000|          7.3|     0.2286|              31.7|              0.0132672|           115.309|
|     2000|          5.4|     0.1524|              71.3|             0.00401199|           131.111|
|      500|          9.9|     0.1524|              71.3|              0.0193001|           131.279|
+---------+-------------+-----------+------------------+-----------------------+------------------+
only showing top 5 rows



In [12]:
df.write.parquet("NASA_airfoil_noise_cleaned.parquet")

                                                                                

In [13]:
print("Part 1 - Evaluation")

print("Total rows = ", rowcount1)
print("Total rows after dropping duplicate rows = ", rowcount2)
print("Total rows after dropping duplicate rows and rows with null values = ", rowcount3)
print("New column name = ", df.columns[-1])

import os

print("NASA_airfoil_noise_cleaned.parquet exists : ", os.path.isdir("NASA_airfoil_noise_cleaned.parquet"))

Part 1 - Evaluation
Total rows =  1522
Total rows after dropping duplicate rows =  1503
Total rows after dropping duplicate rows and rows with null values =  1499
New column name =  SoundLevelDecibels
NASA_airfoil_noise_cleaned.parquet exists :  True


In [14]:
df = spark.read.parquet("./NASA_airfoil_noise_cleaned.parquet")
df.show(5)

+---------+-------------+-----------+------------------+-----------------------+------------------+
|Frequency|AngleOfAttack|ChordLength|FreeStreamVelocity|SuctionSideDisplacement|SoundLevelDecibels|
+---------+-------------+-----------+------------------+-----------------------+------------------+
|     4000|          3.0|     0.3048|              31.7|             0.00529514|           115.608|
|     3150|          2.0|     0.2286|              31.7|             0.00372371|           121.527|
|     2000|          7.3|     0.2286|              31.7|              0.0132672|           115.309|
|     2000|          5.4|     0.1524|              71.3|             0.00401199|           131.111|
|      500|          9.9|     0.1524|              71.3|              0.0193001|           131.279|
+---------+-------------+-----------+------------------+-----------------------+------------------+
only showing top 5 rows



In [15]:
rowcount4 = df.count()
print(rowcount4)

1499


In [16]:
df.printSchema()

root
 |-- Frequency: integer (nullable = true)
 |-- AngleOfAttack: double (nullable = true)
 |-- ChordLength: double (nullable = true)
 |-- FreeStreamVelocity: double (nullable = true)
 |-- SuctionSideDisplacement: double (nullable = true)
 |-- SoundLevelDecibels: double (nullable = true)



In [17]:
assembler = VectorAssembler(inputCols =df.columns[:-1], 
                            outputCol="features")

In [18]:
scaler = StandardScaler(inputCol="features", outputCol="scaled_features")

In [19]:
lr = LinearRegression(featuresCol="scaled_features", labelCol="SoundLevelDecibels")

In [20]:
pipeline = Pipeline(stages=[assembler, scaler, lr])

In [21]:
(train_data, test_data) = df.randomSplit([0.7, 0.3], seed=42)

In [22]:
pipeline_model = pipeline.fit(train_data)

23/11/22 11:07:44 WARN Instrumentation: [b07b46b4] regParam is zero, which might cause numerical instability and overfitting.
23/11/22 11:07:44 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
23/11/22 11:07:45 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK


In [23]:
print("Part 2 - Evaluation")
print("total rows = ", rowcount4)

ps = [str(x).split("_")[0] for x in pipeline.getStages()]

print("Pipeline Stage 1 = ", ps[0])
print("Pipeline Stage 2 = ", ps[1])

print("Label Column = ", lr.getLabelCol())

Part 2 - Evaluation
total rows =  1499
Pipeline Stage 1 =  VectorAssembler
Pipeline Stage 2 =  StandardScaler
Label Column =  SoundLevelDecibels


In [24]:
predictions = pipeline_model.transform(test_data)

In [25]:
from pyspark.ml.evaluation import RegressionEvaluator

In [26]:
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="SoundLevelDecibels",
                                metricName="mse")
mse = evaluator.evaluate(predictions)
print(mse)

24.99766625502418


In [27]:
evaluator = RegressionEvaluator(predictionCol="prediction",  labelCol="SoundLevelDecibels",
                                metricName="mae")
mae = evaluator.evaluate(predictions)
print(mae)

3.9136790958812044


In [28]:
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="SoundLevelDecibels",
                                metricName="r2")
r2 = evaluator.evaluate(predictions)
print(r2)

0.4959688408974623


In [29]:
print("Part 3 - Evaluation")

print("Mean Squared Error = ", round(mse,2))
print("Mean Absolute Error = ", round(mae,2))
print("R Squared = ", round(r2,2))

lr_model = pipeline_model.stages[-1]

print("Intercept = ", round(lr_model.intercept, 2))

Part 3 - Evaluation
Mean Squared Error =  25.0
Mean Absolute Error =  3.91
R Squared =  0.5
Intercept =  132.88


In [30]:
pipeline_model.write().save("Final_Project_Linear_Regression")

In [31]:
from pyspark.ml.regression import DecisionTreeRegressor

dt = DecisionTreeRegressor(featuresCol="scaled_features", labelCol="SoundLevelDecibels")

In [32]:
dtree_pipeline =Pipeline(stages=[assembler, scaler, dt])

In [33]:
dtree_pipeline_model = dtree_pipeline.fit(train_data)

                                                                                

In [34]:
print("Part 2 - Evaluation - Decision Tree")
print("Total rows = ", rowcount4)

ps = [str(x).split("_")[0] for x in dtree_pipeline.getStages()]

print("Pipeline Stage 1 = ", ps[0])
print("Pipeline Stage 2 = ", ps[1])

print("Label column = ", dt.getLabelCol())

Part 2 - Evaluation - Decision Tree
Total rows =  1499
Pipeline Stage 1 =  VectorAssembler
Pipeline Stage 2 =  StandardScaler
Label column =  SoundLevelDecibels


In [35]:
predictions = dtree_pipeline_model.transform(test_data)

In [36]:
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="SoundLevelDecibels",
                                metricName="mse")
mse = evaluator.evaluate(predictions)
print(mse)

19.8198755115137


In [37]:
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="SoundLevelDecibels",
                                metricName="mae")
mae = evaluator.evaluate(predictions)
print(mae)

3.612491355835447


In [38]:
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="SoundLevelDecibels",
                                metricName="r2")
r2 = evaluator.evaluate(predictions)
print(r2)

0.600369301461154


In [39]:
print("Part 3 - Evaluation - Decision Tree")

print("Mean Squared Error = ", round(mse,2))
print("Mean Absolute Error = ", round(mae,2))
print("R Squared = ", round(r2,2))


Part 3 - Evaluation - Decision Tree
Mean Squared Error =  19.82
Mean Absolute Error =  3.61
R Squared =  0.6


In [40]:
dtree_pipeline_model.write().save("Final_Project_Decision_Tree_Regressor")

In [41]:
from pyspark.ml.regression import GBTRegressor

gbt_reg = GBTRegressor(featuresCol="scaled_features",labelCol="SoundLevelDecibels")

In [42]:
gbt_pipeline = Pipeline(stages=[assembler, scaler, gbt_reg])

In [43]:
gbt_pipeline_model = gbt_pipeline.fit(train_data)

In [44]:
print("Part 2 - Evaluation - Gradient-Boosted Tree")
print("Total rows = ", rowcount4)

ps = [str(x).split("_")[0] for x in gbt_pipeline.getStages()]

print("Pipeline Stage 1 = ", ps[0])
print("Pipeline Stage 2 = ", ps[1])

print("Label column = ", gbt_reg.getLabelCol())

Part 2 - Evaluation - Gradient-Boosted Tree
Total rows =  1499
Pipeline Stage 1 =  VectorAssembler
Pipeline Stage 2 =  StandardScaler
Label column =  SoundLevelDecibels


In [45]:
predictions = gbt_pipeline_model.transform(test_data)

In [46]:
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="SoundLevelDecibels",
                                metricName="mse")

mse = evaluator.evaluate(predictions)
print(mse)

7.509544752318589


In [47]:
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="SoundLevelDecibels",
                                metricName="mae")

mae = evaluator.evaluate(predictions)
print(mae)

2.0588709930536946


In [48]:
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="SoundLevelDecibels",
                                metricName="r2")

r2 = evaluator.evaluate(predictions)
print(r2)

0.8485840835208855


In [49]:
print("Part 3 - Evaluation Gradient-Boosted Tree")

print("Mean Squared Error = ", round(mse,2))
print("Mean Absolute Error = ", round(mae,2))
print("R Squared = ", round(r2,2))


Part 3 - Evaluation Gradient-Boosted Tree
Mean Squared Error =  7.51
Mean Absolute Error =  2.06
R Squared =  0.85


In [50]:
gbt_pipeline_model.write().save("Final_Project_GBT_Regressor")

In [51]:
loaded_pipeline_model = PipelineModel.load("./Final_Project_GBT_Regressor")

                                                                                

In [52]:
predictions = loaded_pipeline_model.transform(test_data)

In [53]:
predictions.select("SoundLevelDecibels", "prediction").show()

+------------------+------------------+
|SoundLevelDecibels|        prediction|
+------------------+------------------+
|           128.679|129.78067413838593|
|            133.42|133.53950207093914|
|           119.146|121.58695979590908|
|           116.074|121.58695979590908|
|           134.319| 135.5994700590439|
|            125.01|126.54669370971948|
|           125.941|129.06361941345736|
|           130.588| 132.8517779933734|
|           128.354|130.24598608919504|
|           121.783|123.79573222209692|
|            122.94|123.91768437028422|
|           116.146|121.82229447797067|
|           114.044| 114.4636944779707|
|           109.951|112.96013989565817|
|           125.974|125.25880575717201|
|           116.066|120.45753252143763|
|           118.595| 120.2243916461225|
|           126.395| 130.9113645035651|
|           130.089|129.78067413838593|
|           131.889|131.12950470298867|
+------------------+------------------+
only showing top 20 rows



In [54]:
print("Part 4 - Evaluation")

loaded_model = loaded_pipeline_model.stages[-1]
total_stages = len(loaded_pipeline_model.stages)
input_cols = loaded_pipeline_model.stages[0].getInputCols()

print("Number of Stages in the Pipeline = ", total_stages)


Part 4 - Evaluation
Number of Stages in the Pipeline =  3


In [55]:
spark.stop()

### For this project with the given dataset, Gradient-Boosted Tree Regression got the best results 