In [1]:
import warnings
# Suppress every Python warning for the rest of the session.
warnings.filterwarnings('ignore')
import findspark
# Must run before the pyspark imports in the next cell.
# NOTE(review): presumably this locates the local Spark install and puts it on sys.path — confirm for this environment.
findspark.init()

In [2]:
import os

from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.pipeline import PipelineModel
from pyspark.ml.regression import LinearRegression
from pyspark.sql import SparkSession

In [3]:
# Create the Spark session used by every later cell (an already-running
# session is reused instead of starting a second one).
spark = (
    SparkSession.builder
    .appName("project")
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/11/21 15:18:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
# Load the raw MPG data: the first line is treated as the header and column
# types are inferred from the values. Then preview the first five rows.
df = spark.read.csv(
    path="mpg-raw.csv",
    inferSchema=True,
    header=True,
)
df.show(n=5)

                                                                                

+----+---------+-----------+----------+------+----------+----+--------+
| MPG|Cylinders|Engine Disp|Horsepower|Weight|Accelerate|Year|  Origin|
+----+---------+-----------+----------+------+----------+----+--------+
|46.6|        4|       86.0|        65|  2110|      17.9|  80|Japanese|
|44.6|        4|       91.0|        67|  1850|      13.8|  80|Japanese|
|44.3|        4|       90.0|        48|  2085|      21.7|  80|European|
|44.0|        4|       97.0|        52|  2130|      24.6|  82|European|
|43.4|        4|       90.0|        48|  2335|      23.7|  80|European|
+----+---------+-----------+----------+------+----------+----+--------+
only showing top 5 rows



In [5]:
# Number of rows per Origin value, smallest group first.
(
    df.groupBy('Origin')
    .count()
    .orderBy('count')
    .show()
)

+--------+-----+
|  Origin|count|
+--------+-----+
|    NULL|    1|
|European|   70|
|Japanese|   88|
|American|  247|
+--------+-----+



In [6]:
# Row count of the raw CSV data, before any cleaning (reported in Part 1).
rowcount1 = df.count()

In [7]:
# Remove duplicate rows (no column subset is given, so whole rows are compared).
# Rebinds `df`: the raw frame is no longer reachable under this name.
df = df.dropDuplicates()

In [8]:
# Row count after de-duplication (reported in Part 1).
rowcount2 = df.count()

In [9]:
# Remove rows that contain a null value (called with the default arguments).
# Rebinds `df` again, on top of the de-duplicated frame.
df = df.dropna()

In [10]:
# Row count after de-duplication and null removal (reported in Part 1).
rowcount3 = df.count()

In [11]:
# Replace the space in the column name; later cells refer to it as "Engine_Disp".
df = df.withColumnRenamed("Engine Disp", "Engine_Disp")

In [12]:
# Persist the cleaned data as parquet.
# "overwrite" replaces the output of a previous run. With the default save
# mode the write raises when the path already exists, which makes the
# notebook fail on Restart Kernel -> Run All after its first execution.
df.write.mode("overwrite").parquet("mpg-cleaned.parquet")

                                                                                

In [13]:
# Part 1 report: row counts captured at each cleaning step, the renamed
# column, and a check that the parquet output directory exists.
# `os` is imported in the notebook's import cell rather than mid-cell.
print("Part 1 - Evaluation")

print("Total rows = ", rowcount1)
print("Total rows after dropping duplicate rows = ", rowcount2)
print("Total rows after dropping duplicate rows and rows with null values = ", rowcount3)
# Index 2 is the third column, i.e. the one renamed to Engine_Disp.
print("Renamed column name = ", df.columns[2])

# isdir() is used because the parquet output path is checked as a directory.
print("mpg-cleaned.parquet exists :", os.path.isdir("mpg-cleaned.parquet"))

Part 1 - Evaluation
Total rows =  406
Total rows after dropping duplicate rows =  392
Total rows after dropping duplicate rows and rows with null values =  385
Renamed column name =  Engine_Disp
mpg-cleaned.parquet exists : True


In [14]:
# Reload the cleaned data from the parquet output; from here on `df` is the
# parquet-backed frame, not the in-memory cleaning result. Preview five rows.
df = spark.read.parquet('./mpg-cleaned.parquet')
df.show(5)

+----+---------+-----------+----------+------+----------+----+--------+
| MPG|Cylinders|Engine_Disp|Horsepower|Weight|Accelerate|Year|  Origin|
+----+---------+-----------+----------+------+----------+----+--------+
|24.0|        4|      134.0|        96|  2702|      13.5|  75|Japanese|
|18.0|        6|      250.0|        88|  3139|      14.5|  71|American|
|29.0|        4|       68.0|        49|  1867|      19.5|  73|European|
|22.4|        6|      231.0|       110|  3415|      15.8|  81|American|
|20.5|        6|      231.0|       105|  3425|      16.9|  77|American|
+----+---------+-----------+----------+------+----------+----+--------+
only showing top 5 rows



In [15]:
# Row count of the reloaded parquet data (reported in Part 2).
rowcount4 = df.count()

In [16]:
# Print the column names and types of the reloaded frame.
df.printSchema()

root
 |-- MPG: double (nullable = true)
 |-- Cylinders: integer (nullable = true)
 |-- Engine_Disp: double (nullable = true)
 |-- Horsepower: integer (nullable = true)
 |-- Weight: integer (nullable = true)
 |-- Accelerate: double (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Origin: string (nullable = true)



In [17]:
# Pipeline stage 1: index the string column Origin into the column OriginIndex.
# NOTE(review): OriginIndex is not listed in the VectorAssembler inputCols
# below, so Origin never reaches the regression features — confirm this is intended.
indexer = StringIndexer(inputCol="Origin", outputCol="OriginIndex")

In [18]:
# Pipeline stage 2: pack the six numeric predictors into one vector column
# named "features". The order of inputCols fixes the order of the model
# coefficients reported in Part 4.
assembler = VectorAssembler(
    inputCols=[
        "Cylinders",
        "Engine_Disp",
        "Horsepower",
        "Weight",
        "Accelerate",
        "Year",
    ],
    outputCol="features",
)

In [19]:
# Pipeline stage 3: scale the assembled "features" vector into "scaled_features"
# (StandardScaler is used with its default settings).
scaler = StandardScaler(
    inputCol="features",
    outputCol="scaled_features",
)

In [20]:
# Pipeline stage 4: linear regression predicting MPG from the scaled features.
# No regularisation parameter is passed, so the estimator's default is used.
lr_scaled = LinearRegression(
    featuresCol="scaled_features",
    labelCol="MPG",
)

In [21]:
# Chain the four stages in execution order: index -> assemble -> scale -> regress.
pipeline = Pipeline(
    stages=[
        indexer,
        assembler,
        scaler,
        lr_scaled,
    ]
)

In [22]:
# 70/30 train/test split; the fixed seed keeps the split repeatable across runs.
train_data, test_data = df.randomSplit(weights=[0.7, 0.3], seed=42)

In [23]:
# Fit every pipeline stage on the training split only; the test split is
# kept aside for the evaluation cells below.
pipeline_model = pipeline.fit(train_data)

23/11/21 15:18:39 WARN Instrumentation: [542d269e] regParam is zero, which might cause numerical instability and overfitting.
23/11/21 15:18:39 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
23/11/21 15:18:39 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK


In [24]:
# Part 2 report: size of the reloaded data, the first three pipeline stages
# and the label column of the regression.
print("Part 2 - Evaluation")
print("Total rows = ", rowcount4)

# Keep only the text before the first underscore of each stage's string form
# (the stage's class name in the recorded output).
ps = [str(stage).partition("_")[0] for stage in pipeline.getStages()]

for position in range(3):
    print(f"Pipeline Stage {position + 1} = ", ps[position])

print("Label column = ", lr_scaled.getLabelCol())

Part 2 - Evaluation
Total rows =  385
Pipeline Stage 1 =  StringIndexer
Pipeline Stage 2 =  VectorAssembler
Pipeline Stage 3 =  StandardScaler
Label column =  MPG


In [25]:
# Apply the fitted pipeline to the held-out test split.
predictions = pipeline_model.transform(test_data)

In [26]:
# Mean squared error of the predictions against the actual MPG values.
# RegressionEvaluator is imported with the other pyspark imports in the
# notebook's import cell, so the later evaluator cells no longer depend on
# this cell having been run for the import.
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="MPG",
                                metricName="mse")
mse = evaluator.evaluate(predictions)
print(mse)

12.226745835571704


In [27]:
# Mean absolute error of the predictions against the actual MPG values.
evaluator = RegressionEvaluator(
    labelCol="MPG",
    predictionCol="prediction",
).setMetricName("mae")
mae = evaluator.evaluate(predictions)
print(mae)

2.8457151130136515


In [28]:
# R-squared of the predictions against the actual MPG values.
evaluator = RegressionEvaluator(
    labelCol="MPG",
    predictionCol="prediction",
).setMetricName("r2")
r2 = evaluator.evaluate(predictions)
print(r2)

0.8018737394895651


In [29]:
# Part 3 report: the three test-set metrics rounded to two decimals, plus
# the intercept of the fitted regression.
print("Part 3 - Evaluation")

for metric_label, metric_value in (
    ("Mean Squared Error = ", mse),
    ("Mean Absolute Error = ", mae),
    ("R Squared = ", r2),
):
    print(metric_label, round(metric_value, 2))

# The regression is the last stage of the fitted pipeline.
lrmodel = pipeline_model.stages[-1]

# NOTE(review): the label below is misspelled; it is kept unchanged so the
# printed text stays identical to the recorded output.
print("Interecept = ", round(lrmodel.intercept, 2))

Part 3 - Evaluation
Mean Squared Error =  12.23
Mean Absolute Error =  2.85
R Squared =  0.8
Interecept =  -17.37


In [30]:
# Persist the fitted pipeline under "Project".
# overwrite() replaces a model saved by a previous run; without it the save
# raises when the path already exists, which makes the notebook fail on
# Restart Kernel -> Run All after its first execution.
pipeline_model.write().overwrite().save("Project")

In [31]:
# Load the persisted pipeline model back from disk so the cells below can
# check that the saved copy still produces predictions.
loaded_pipeline_model = PipelineModel.load("./Project")

                                                                                

In [32]:
# Score the test split again, this time with the reloaded model.
# Rebinds `predictions`, replacing the frame produced by the in-memory model.
predictions = loaded_pipeline_model.transform(test_data)

In [33]:
# Show actual MPG next to the predicted value (show() with no argument
# prints the first 20 rows, as in the recorded output).
predictions.select("MPG", "prediction").show()

+----+------------------+
| MPG|        prediction|
+----+------------------+
|10.0|6.9607645775084315|
|11.0| 8.545911819807586|
|12.0|10.226709705747634|
|12.0| 5.446415257212681|
|13.0|21.430212400590502|
|13.0|17.437792078060024|
|13.0|11.245494102903692|
|13.0|14.180626433497519|
|13.0| 9.959082691689243|
|13.0|11.111417171060054|
|13.0|13.170917811818647|
|13.0|10.889439874574833|
|13.0| 7.144536211553209|
|13.0| 4.279565485352773|
|13.0| 8.611192450277887|
|14.0|10.356052138542552|
|14.0|16.057308446272753|
|14.0|12.327668542375658|
|14.0|10.787367112520801|
|14.0|10.983935628156612|
+----+------------------+
only showing top 20 rows



In [34]:
# Part 4 report: stage count of the reloaded pipeline and one regression
# coefficient per assembled input column.
print("Part 4 - Evaluation")

# Stage index 1 is the VectorAssembler; the last stage is the regression model.
input_cols = loaded_pipeline_model.stages[1].getInputCols()
loaded_model = loaded_pipeline_model.stages[-1]
total_stages = len(loaded_pipeline_model.stages)

print("Number of stages in the pipeline = ", total_stages)

# Coefficients are paired with the assembler inputs by position. The model
# was fitted on "scaled_features", so the values apply to the scaled inputs.
for feature_name, weight in zip(input_cols, loaded_model.coefficients):
    print(f"Coefficient for {feature_name} is {round(weight, 4)}")

Part 4 - Evaluation
Number of stages in the pipeline =  4
Coefficient for Cylinders is 0.119
Coefficient for Engine_Disp is 0.4971
Coefficient for Horsepower is -0.2517
Coefficient for Weight is -5.7923
Coefficient for Accelerate is 0.2369
Coefficient for Year is 2.9258


In [35]:
# Shut down the Spark session; no Spark call can be made after this cell.
spark.stop()