In [None]:
%pip install pyspark==3.1.2 -q
%pip install findspark -q

# Silence warnings for the rest of the notebook session.
import warnings


def warn(*args, **kwargs):
    """No-op stand-in for ``warnings.warn``: accept any arguments, emit nothing."""
    return None


# Swap the module attribute so calls that go through ``warnings.warn`` become
# no-ops, and additionally install an "ignore" filter for anything that still
# reaches the warnings filter machinery (e.g. code holding the original function).
warnings.warn = warn
warnings.filterwarnings('ignore')

# FindSpark simplifies the process of using Apache Spark with Python.
# findspark.init() is called here, before any pyspark import below, so that
# `pyspark` can be imported. (Per findspark's docs it locates the local Spark
# installation; presumably via SPARK_HOME on this machine — TODO confirm.)
import findspark
findspark.init()

# Import necessary modules
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.pipeline import PipelineModel
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import StandardScaler

# Create (or reuse, via getOrCreate) the SparkSession for this application
spark = SparkSession.builder.appName("Airfoil Noise").getOrCreate()

# Load the raw dataset: the first CSV line is the header, and Spark infers
# the column types from the data
df = spark.read.csv("NASA_airfoil_noise_raw.csv", header=True, inferSchema=True)

# Print the top 5 rows of the dataset
df.show(5)

# Total number of rows in the raw dataset
rowcount1 = df.count()
print(rowcount1)

# Drop duplicate rows (no subset given, so all columns are compared)
df = df.dropDuplicates()

# Total number of rows after de-duplication
rowcount2 = df.count()
print(rowcount2)

# Drop every row that contains a null value in any column
df = df.na.drop()

# Total number of rows after dropping nulls
rowcount3 = df.count()
print(rowcount3)

# Rename the column "SoundLevel" to "SoundLevelDecibels"
# (withColumnRenamed is a silent no-op if "SoundLevel" does not exist)
df = df.withColumnRenamed("SoundLevel", "SoundLevelDecibels")

# Save the cleaned dataframe in parquet format. mode("overwrite") keeps this
# cell re-runnable: the default save mode ("errorifexists") raises as soon as
# the output directory already exists from a previous run.
df.write.mode("overwrite").parquet("NASA_airfoil_noise_cleaned.parquet")

# Part 1 - Evaluation
import os

print("Part 1 - Evaluation")
part1_report = (
    ("Total rows = ", rowcount1),
    ("Total rows after dropping duplicate rows = ", rowcount2),
    ("Total rows after dropping duplicate rows and rows with null values = ", rowcount3),
    # NOTE(review): reports the LAST column; this is the renamed label only if
    # "SoundLevel" was the final column of the CSV — confirm against the data.
    ("New column name = ", df.columns[-1]),
)
for report_caption, report_value in part1_report:
    print(report_caption, report_value)
# Spark writes parquet output as a directory, hence isdir rather than isfile
print("NASA_airfoil_noise_cleaned.parquet exists :", os.path.isdir("NASA_airfoil_noise_cleaned.parquet"))

# Part 2 - Create a Machine Learning Pipeline

# Reload the cleaned data that Part 1 wrote to disk
df = spark.read.parquet("NASA_airfoil_noise_cleaned.parquet")

# Row count after the parquet round trip (expected to equal the cleaned count)
rowcount4 = df.count()
print(rowcount4)

# Stage 1: pack every non-label column, in column order, into one vector "features"
assembler = VectorAssembler(
    inputCols=[name for name in df.columns if name != "SoundLevelDecibels"],
    outputCol="features",
)

# Stage 2: standardize the assembled vector (default StandardScaler settings)
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")

# Stage 3: linear regression of the label on the scaled features
lr = LinearRegression(
    featuresCol="scaledFeatures",
    labelCol="SoundLevelDecibels",
)

# Chain the three stages in the order they must run
pipeline = Pipeline(stages=[assembler, scaler, lr])

# 70/30 train/test split; the fixed seed keeps the split repeatable
trainingData, testingData = df.randomSplit([0.7, 0.3], seed=42)

# Fit every stage on the training split only
pipelineModel = pipeline.fit(trainingData)

# Part 2 - Evaluation
print("Part 2 - Evaluation")
print("Total rows = ", rowcount4)
# A stage's string form is its uid (class name + "_" + suffix); keep the class name
ps = [str(stage).split("_")[0] for stage in pipeline.getStages()]
stage_lines = (
    ("Pipeline Stage 1 = ", 0),
    ("Pipeline Stage 2 = ", 1),
    ("Pipeline Stage 3 = ", 2),
)
for stage_caption, stage_index in stage_lines:
    print(stage_caption, ps[stage_index])
print("Label column = ", lr.getLabelCol())

# Part 3 - Evaluate the Model

from pyspark.ml.evaluation import RegressionEvaluator

# Predict on the held-out split with the fitted pipeline
predictions = pipelineModel.transform(testingData)

# Compute and print MSE, MAE and R-Squared (R2), in that order. Each metric
# gets its own evaluator; each evaluate() call runs a separate Spark job.
scores = {}
for metric in ("mse", "mae", "r2"):
    evaluator = RegressionEvaluator(
        labelCol="SoundLevelDecibels",
        predictionCol="prediction",
        metricName=metric,
    )
    scores[metric] = evaluator.evaluate(predictions)
    print(scores[metric])
mse, mae, r2 = scores["mse"], scores["mae"], scores["r2"]

# Part 3 - Evaluation
print("Part 3 - Evaluation")
metric_lines = (
    ("Mean Squared Error = ", mse),
    ("Mean Absolute Error = ", mae),
    ("R Squared = ", r2),
)
for metric_caption, metric_value in metric_lines:
    print(metric_caption, round(metric_value, 2))
# The fitted regression model is the final stage of the pipeline model
lrModel = pipelineModel.stages[-1]
print("Intercept = ", round(lrModel.intercept, 2))

# Part 4 - Persist the Model

# Save the fitted pipeline model to the path "Project". write().overwrite()
# keeps this cell re-runnable; a plain save() raises if "Project" already
# exists from a previous run.
pipelineModel.write().overwrite().save("Project")

# Load the model back from the path "Project"
loadedPipelineModel = PipelineModel.load("Project")

# Make predictions on the test split using the reloaded model
predictions = loadedPipelineModel.transform(testingData)

# Show label vs. prediction for the first 5 rows
predictions.select("SoundLevelDecibels", "prediction").show(5)

# Part 4 - Evaluation
print("Part 4 - Evaluation")
stages = loadedPipelineModel.stages
loadedmodel = stages[-1]                 # fitted regression model (last stage)
totalstages = len(stages)
inputcolumns = stages[0].getInputCols()  # assembler inputs, in vector order
print("Number of stages in the pipeline = ", totalstages)
# NOTE: the model was trained on "scaledFeatures", so each coefficient applies
# to the StandardScaler output for that column, not to the raw column units.
for column_name, coefficient in zip(inputcolumns, loadedmodel.coefficients):
    print(f"Coefficient for {column_name} is {round(coefficient,4)}")

# Stop the Spark session once all work is done
spark.stop()
