In [1]:
# Start a local Spark session and bind it to the name `spark`.
# findspark.init() is called first, before anything is imported from pyspark;
# keep this order (findspark is what makes the pyspark import below resolvable
# from a plain Python kernel).

import findspark
findspark.init()

# Build the SparkSession, or reuse one that already exists (getOrCreate),
# with master "local[*]", i.e. Spark running on this machine instead of a cluster.

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()



In [2]:
# Load sample_dataset.csv; the relative path is resolved against the current
# working directory. The first row supplies the column names (header) and the
# column types are inferred from the values (inferSchema).

data = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('sample_dataset.csv')
)

In [3]:
# Print the schema to check which data type the inferSchema=True parameter
# (set when the CSV was read) assigned to each column.

data.printSchema()

root
 |-- mpg: double (nullable = true)
 |-- cylinders: integer (nullable = true)
 |-- displacement: double (nullable = true)
 |-- horsepower: integer (nullable = true)
 |-- weight: integer (nullable = true)
 |-- acceleration: double (nullable = true)
 |-- age: integer (nullable = true)



In [5]:
# Preview the data: print the first 10 rows

data.show(n=10)

+----+---------+------------+----------+------+------------+---+
| mpg|cylinders|displacement|horsepower|weight|acceleration|age|
+----+---------+------------+----------+------+------------+---+
|18.0|        8|       307.0|       130|  3504|        12.0| 49|
|15.0|        8|       350.0|       165|  3693|        11.5| 49|
|18.0|        8|       318.0|       150|  3436|        11.0| 49|
|16.0|        8|       304.0|       150|  3433|        12.0| 49|
|17.0|        8|       302.0|       140|  3449|        10.5| 49|
|15.0|        8|       429.0|       198|  4341|        10.0| 49|
|14.0|        8|       454.0|       220|  4354|         9.0| 49|
|14.0|        8|       440.0|       215|  4312|         8.5| 49|
|14.0|        8|       455.0|       225|  4425|        10.0| 49|
|15.0|        8|       390.0|       190|  3850|         8.5| 49|
+----+---------+------------+----------+------+------------+---+
only showing top 10 rows



In [59]:
# Build the list of feature columns: every column except the label ("age").
# The label is excluded by name rather than by position (data.columns[:-1]),
# so the label cannot end up inside the feature vector if the column order of
# the CSV ever changes.

feature_columns = [column for column in data.columns if column != "age"]

# VectorAssembler combines the feature columns into a single vector column
# named "inp_features"; the regression model later reads its inputs from it.

from pyspark.ml.feature import VectorAssembler
vect_assembler = VectorAssembler(inputCols=feature_columns, outputCol="inp_features")

In [60]:
# Apply the assembler created above to the original dataset. The result,
# `data_features`, is the original data plus the assembled "inp_features" column.

data_features = vect_assembler.transform(data)

In [61]:
# Display the data with the additional column named "inp_features":
# the independent feature values of each row are combined into one vector,
# which appears as the last column.

data_features.show()

+----+---------+------------+----------+------+------------+---+--------------------+
| mpg|cylinders|displacement|horsepower|weight|acceleration|age|        inp_features|
+----+---------+------------+----------+------+------------+---+--------------------+
|18.0|        8|       307.0|       130|  3504|        12.0| 49|[18.0,8.0,307.0,1...|
|15.0|        8|       350.0|       165|  3693|        11.5| 49|[15.0,8.0,350.0,1...|
|18.0|        8|       318.0|       150|  3436|        11.0| 49|[18.0,8.0,318.0,1...|
|16.0|        8|       304.0|       150|  3433|        12.0| 49|[16.0,8.0,304.0,1...|
|17.0|        8|       302.0|       140|  3449|        10.5| 49|[17.0,8.0,302.0,1...|
|15.0|        8|       429.0|       198|  4341|        10.0| 49|[15.0,8.0,429.0,1...|
|14.0|        8|       454.0|       220|  4354|         9.0| 49|[14.0,8.0,454.0,2...|
|14.0|        8|       440.0|       215|  4312|         8.5| 49|[14.0,8.0,440.0,2...|
|14.0|        8|       455.0|       225|  4425|       

In [62]:
# Keep only what the model needs: the assembled feature vector and the label.
# The label here is the age of the vehicle.

finalized_dataset = data_features.select(["inp_features", "age"])

# Preview the first 5 rows of the reduced dataset
finalized_dataset.show(n=5)

+--------------------+---+
|        inp_features|age|
+--------------------+---+
|[18.0,8.0,307.0,1...| 49|
|[15.0,8.0,350.0,1...| 49|
|[18.0,8.0,318.0,1...| 49|
|[16.0,8.0,304.0,1...| 49|
|[17.0,8.0,302.0,1...| 49|
+--------------------+---+
only showing top 5 rows



In [63]:
# Split the data into training and testing sets, with roughly 80% of the rows
# going to training and 20% to testing.
# A fixed seed makes the split reproducible: without it every run draws a
# different random split, so the fitted model and the reported RMSE change
# from one run to the next.

train_dataset, test_dataset = finalized_dataset.randomSplit([0.8, 0.2], seed=42)

### Linear Regression model

In [64]:
#Importing Linear Regression class as lr

from pyspark.ml.regression import LinearRegression as lr

In [65]:
# Create the linear regression estimator `L_obj`:
# it reads its inputs from the "inp_features" column and learns to predict "age".

L_obj = lr(
    labelCol="age",
    featuresCol="inp_features",
)

In [66]:
# Train the model: call fit() on the lr object with the training split.
# The fitted model is stored in `model`.

model = L_obj.fit(train_dataset)

In [67]:
# Evaluate the fitted model on the test split using the evaluate() method.
# NOTE(review): despite its name, `pred` is the evaluation result object, not
# the predictions themselves; the predicted ages are read from
# `pred.predictions` in the following cells.

pred = model.evaluate(test_dataset)

In [68]:
# Print the first 5 test rows with the predicted age next to the actual age

pred.predictions.show(n=5)

+--------------------+---+------------------+
|        inp_features|age|        prediction|
+--------------------+---+------------------+
|[9.0,8.0,304.0,19...| 49| 45.43928180396885|
|[10.0,8.0,360.0,2...| 49| 46.38094468989763|
|[11.0,8.0,318.0,2...| 49| 46.29940745229791|
|[12.0,8.0,383.0,1...| 48|42.885233412020625|
|[13.0,8.0,302.0,1...| 44| 48.71031554781278|
+--------------------+---+------------------+
only showing top 5 rows



In [69]:
# Evaluate the model on the test predictions with the
# Root Mean Square Error (RMSE) metric

from pyspark.ml.evaluation import RegressionEvaluator
evaluation = RegressionEvaluator(
    predictionCol="prediction",
    labelCol="age",
)

# Root Mean Square Error: the metric is chosen through the param map
# handed to evaluate()

rmse = evaluation.evaluate(
    pred.predictions,
    {evaluation.metricName: "rmse"},
)
print("RMSE: %.2f" % rmse)



RMSE: 2.77


Hence, with an RMSE of 2.77 (expressed in the same units as age), there is a considerable difference between the predicted and the actual values.

In [71]:
# Stop the Spark session now that the analysis is finished

spark.stop()