## Spark MLlib Linear Regression

<strong> Linear Regression model to predict MPG (Miles Per Gallon)</strong>

<strong> Description: </strong>
<ul style="list-style-type:square">
  <li>Target variable: MPG</li>
  <li>Features: CYLINDERS, DISPLACEMENT, HORSEPOWER, WEIGHT, ACCELERATION, MODELYEAR, NAME.</li>
  <li>Model: Linear Regression</li>
</ul>

We will use the car.csv file, which contains technical and non-technical information about cars from different makers. We will also create a Spark session so that we can use DataFrames in this project.

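In [1]:
# Spark-Session
from pyspark.sql import SparkSession

spSession = SparkSession.builder.master("local").appName("Linear-Regression").config("Some.config","session").getOrCreate()
# SparkContext, used below to load the CSV file as an RDD
sc = spSession.sparkContext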

In [2]:
# Libraries
# Row - to build Row objects from the RDD records
from pyspark.sql import Row
# Vectors - to build a dense vector
from pyspark.ml.linalg import Vectors
# Model
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

In [3]:
# Load data as RDD
carRDD = sc.textFile("car.csv")
carRDD.take(4)

['MPG,CYLINDERS,DISPLACEMENT,HORSEPOWER,WEIGHT,ACCELERATION,MODELYEAR,NAME',
 '18,8,307,130,3504,12,70,chevrolet chevelle malibu',
 '15,8,350,165,3693,11.5,70,buick skylark 320',
 '18,8,318,150,3436,11,70,plymouth satellite']

In [4]:
# With a large amount of data on a cluster, caching the RDD avoids re-reading the file on every action
carRDD.cache()

car.csv MapPartitionsRDD[1] at textFile at <unknown>:0

In [5]:
# Remove the header row
header = carRDD.first()
carRDD2 = carRDD.filter(lambda x : x != header)
# carRDD2 = carRDD.filter(lambda x : "MPG" not in x)
carRDD2.take(4)

['18,8,307,130,3504,12,70,chevrolet chevelle malibu',
 '15,8,350,165,3693,11.5,70,buick skylark 320',
 '18,8,318,150,3436,11,70,plymouth satellite',
 '16,8,304,150,3433,12,70,amc rebel sst']

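## Data Cleaning

Some HORSEPOWER values are missing (marked with "?"). We will fill them with the mean of the HORSEPOWER column, about 104.4, which we broadcast so that every worker can read it.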

In [75]:
missingHP = sc.broadcast(104.4)
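
The value 104.4 is hard-coded in the cell above. As a minimal sketch of where such a number could come from, the plain-Python snippet below averages the HORSEPOWER field over the rows where it is present. The `sample_rows` list and the `horsepower_mean` helper are illustrative names, not part of the notebook; on the real data the same filter-then-average logic would be applied to `carRDD2`.

```python
# Illustrative rows in the car.csv layout; "?" marks a missing HORSEPOWER value.
sample_rows = [
    "18,8,307,130,3504,12,70,chevrolet chevelle malibu",
    "15,8,350,165,3693,11.5,70,buick skylark 320",
    "25,4,98,?,2046,19,71,ford pinto",
]

def horsepower_mean(lines):
    """Average the HORSEPOWER field (index 3), skipping rows marked "?"."""
    values = [
        float(fields[3])
        for fields in (line.split(",") for line in lines)
        if fields[3] != "?"
    ]
    return sum(values) / len(values)

mean_hp = horsepower_mean(sample_rows)
print(mean_hp)  # 147.5 for this three-row sample
```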

In [76]:
# Parse one CSV line into a Row, converting the numeric fields to float
def cleanData(inputString):
    global missingHP
    # Split the line into columns
    attList = inputString.split(",")

    # Replace missing HORSEPOWER values (marked "?") with the broadcast mean
    hpValue = attList[3]
    if hpValue == "?":
        hpValue = missingHP.value

    # Build a Row with the numeric columns as floats and NAME as a string
    row = Row(MPG = float(attList[0]), CYLINDERS = float(attList[1]), DISPLACEMENT = float(attList[2]),
              HORSEPOWER = float(hpValue), WEIGHT = float(attList[4]), ACCELERATION = float(attList[5]),
              MODELYEAR = float(attList[6]), NAME = attList[7])
    return row

In [77]:
carRDD3 = carRDD2.map(cleanData)
carRDD3.cache()
carRDD3.take(3)

[Row(ACCELERATION=12.0, CYLINDERS=8.0, DISPLACEMENT=307.0, HORSEPOWER=130.0, MODELYEAR=70.0, MPG=18.0, NAME='chevrolet chevelle malibu', WEIGHT=3504.0),
 Row(ACCELERATION=11.5, CYLINDERS=8.0, DISPLACEMENT=350.0, HORSEPOWER=165.0, MODELYEAR=70.0, MPG=15.0, NAME='buick skylark 320', WEIGHT=3693.0),
 Row(ACCELERATION=11.0, CYLINDERS=8.0, DISPLACEMENT=318.0, HORSEPOWER=150.0, MODELYEAR=70.0, MPG=18.0, NAME='plymouth satellite', WEIGHT=3436.0)]

## Data Exploration

In [78]:
carDF = spSession.createDataFrame(carRDD3)
carDF.show(2)

+------------+---------+------------+----------+---------+----+--------------------+------+
|ACCELERATION|CYLINDERS|DISPLACEMENT|HORSEPOWER|MODELYEAR| MPG|                NAME|WEIGHT|
+------------+---------+------------+----------+---------+----+--------------------+------+
|        12.0|      8.0|       307.0|     130.0|     70.0|18.0|chevrolet chevell...|3504.0|
|        11.5|      8.0|       350.0|     165.0|     70.0|15.0|   buick skylark 320|3693.0|
+------------+---------+------------+----------+---------+----+--------------------+------+
only showing top 2 rows



In [79]:
# Some Statistics
carDF.select("MPG", "CYLINDERS","HORSEPOWER").describe().show()


+-------+-----------------+------------------+------------------+
|summary|              MPG|         CYLINDERS|        HORSEPOWER|
+-------+-----------------+------------------+------------------+
|  count|              398|               398|               398|
|   mean|23.51457286432161| 5.454773869346734|104.45326633165827|
| stddev|7.815984312565782|1.7010042445332125| 38.19941019081801|
|    min|              9.0|               3.0|              46.0|
|    max|             46.6|               8.0|             230.0|
+-------+-----------------+------------------+------------------+



In [80]:
# Correlation between the target and each numeric column (string columns are skipped)
for i in carDF.columns:
    if not(isinstance(carDF.select(i).take(1)[0][0], str)):
        print('Correlation between MPG and', i , carDF.stat.corr('MPG', i))

Correlation between MPG and ACCELERATION 0.42028891210165004
Correlation between MPG and CYLINDERS -0.7753962854205548
Correlation between MPG and DISPLACEMENT -0.8042028248058979
Correlation between MPG and HORSEPOWER -0.7716754412116528
Correlation between MPG and MODELYEAR 0.5792671330833091
Correlation between MPG and MPG 1.0
Correlation between MPG and WEIGHT -0.8317409332443347
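
`stat.corr` computes the Pearson correlation coefficient. As a rough illustration of what that number measures, here is a plain-Python version applied to the first four MPG and WEIGHT values shown earlier. The `pearson` helper is a hypothetical name for this sketch, and four rows are far too few to reproduce the full-data value of about -0.83.

```python
from math import sqrt

def pearson(xs, ys):
    """Pearson correlation: covariance divided by the product of the spreads."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / sqrt(var_x * var_y)

# First four data rows of car.csv: MPG and WEIGHT
mpg = [18.0, 15.0, 18.0, 16.0]
weight = [3504.0, 3693.0, 3436.0, 3433.0]

r = pearson(mpg, weight)
print(round(r, 3))  # negative: heavier cars tend to have lower MPG
```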


## Pre-Processing

Spark ML's LinearRegression expects a numeric label column and a vector-valued features column. Let's build (label, features) pairs from the target and three of the features.

In [81]:
# Convert each row into a (label, features) pair: MPG as the label, a dense vector as the features
# Only ACCELERATION, DISPLACEMENT and WEIGHT are kept; the other columns are dropped
def transformVar(row):
    obj = (row["MPG"], Vectors.dense([row["ACCELERATION"], row["DISPLACEMENT"], row["WEIGHT"]]))
    return obj

In [82]:
# Create an RDD of (label, features) pairs
carRDD4 = carRDD3.map(transformVar)
carRDD4.take(4)

[(18.0, DenseVector([12.0, 307.0, 3504.0])),
 (15.0, DenseVector([11.5, 350.0, 3693.0])),
 (18.0, DenseVector([11.0, 318.0, 3436.0])),
 (16.0, DenseVector([12.0, 304.0, 3433.0]))]

In [83]:
# Convert the (label, features) RDD into a DataFrame
carDF = spSession.createDataFrame(carRDD4,['label','features'])
carDF.select("label", "features").show(5)

+-----+-------------------+
|label|           features|
+-----+-------------------+
| 18.0|[12.0,307.0,3504.0]|
| 15.0|[11.5,350.0,3693.0]|
| 18.0|[11.0,318.0,3436.0]|
| 16.0|[12.0,304.0,3433.0]|
| 17.0|[10.5,302.0,3449.0]|
+-----+-------------------+
only showing top 5 rows



## Machine Learning

In [84]:
# Split the data into a training set (70 %) and a test set (30 %)
(training_set, test_set) = carDF.randomSplit([0.7,0.3])

In [85]:
training_set.count()

283

In [86]:
test_set.count()

115
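
The counts are 283 and 115 rather than an exact 70/30 cut of the 398 rows because `randomSplit` assigns rows at random according to the weights, so the split sizes are only approximately proportional. `randomSplit` also accepts an optional `seed` argument, which makes the split reproducible between runs. The sketch below mimics that behaviour in plain Python; `random_split` is a hypothetical helper for illustration, not Spark's implementation.

```python
import random

def random_split(rows, train_weight, seed):
    """Send each row to the training set with probability train_weight."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        if rng.random() < train_weight:
            train.append(row)
        else:
            test.append(row)
    return train, test

all_rows = list(range(398))  # stand-ins for the 398 cars
train_a, test_a = random_split(all_rows, 0.7, seed=42)
train_b, test_b = random_split(all_rows, 0.7, seed=42)

print(len(train_a), len(test_a))  # close to, but not necessarily exactly, 70/30
print(train_a == train_b)         # True: the same seed gives the same split
```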

In [87]:
# Model
LinearReg = LinearRegression(maxIter= 10)
model = LinearReg.fit(training_set)

In [88]:
# Coefficients and intercept
print("Coefficients:"+ str(model.coefficients))
print("Intercept:"+ str(model.intercept))

Coefficients:[0.142733417628,-0.00815978275216,-0.00647910301793]
Intercept:42.0178854680663


In [89]:
# Predicting MPG with test dataset
predictions = model.transform(test_set)
predictions.select("prediction", "features").show()

+------------------+-------------------+
|        prediction|           features|
+------------------+-------------------+
|11.177571096315429|[14.0,360.0,4615.0]|
|13.301278621094092|[15.0,307.0,4376.0]|
|12.958546266270787|[13.5,318.0,4382.0]|
|  7.79721286795052|[11.0,455.0,4951.0]|
| 8.430167522848372|[11.5,383.0,4955.0]|
| 8.074254825302752|[11.5,429.0,4952.0]|
|15.893778639670948|[11.0,360.0,3821.0]|
| 20.73415462461704|[12.0,302.0,3169.0]|
|  7.16418386655787|[12.0,400.0,5140.0]|
| 14.45317356044496|[13.0,350.0,4100.0]|
|12.741009683976227|[13.0,351.0,4363.0]|
|14.959735842451714|[14.0,307.0,4098.0]|
| 11.70292289362271| [8.5,440.0,4312.0]|
|14.740203020585856|[13.0,318.0,4096.0]|
|14.257119790172716|[13.0,351.0,4129.0]|
|15.006039395554549|[14.0,318.0,4077.0]|
|11.787285961784214|[16.0,302.0,4638.0]|
|13.671370247104829|[12.8,351.0,4215.0]|
|18.970670988829752|[11.0,318.0,3399.0]|
|16.876068362302057|[11.5,350.0,3693.0]|
+------------------+-------------------+
only showing top 20 rows
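
Each prediction is the intercept plus the dot product of the coefficients with the feature vector (ACCELERATION, DISPLACEMENT, WEIGHT). As a quick check, the sketch below plugs the coefficients printed earlier into that formula for the first row of the table. `predict` is a hypothetical helper, and tiny differences in the last digits are expected because the printed coefficients are rounded.

```python
# Values copied from the printed model output
coefficients = [0.142733417628, -0.00815978275216, -0.00647910301793]
intercept = 42.0178854680663

def predict(features):
    """Linear model: intercept + sum of coefficient * feature."""
    return intercept + sum(c * x for c, x in zip(coefficients, features))

first_row = [14.0, 360.0, 4615.0]  # ACCELERATION, DISPLACEMENT, WEIGHT
first_prediction = predict(first_row)
print(round(first_prediction, 6))  # 11.177571, matching the first prediction in the table
```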

In [90]:
# Coefficient of determination (r2) to evaluate the model
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="label", metricName="r2")
evaluator.evaluate(predictions)


0.6882357676429178
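
For reference, r2 is the coefficient of determination, 1 - SS_res / SS_tot, where SS_res is the sum of squared prediction errors and SS_tot is the sum of squared deviations of the labels from their mean. A value of about 0.69 therefore means the model explains roughly 69 % of the variance of MPG in the test set. A minimal plain-Python sketch follows; the `r2_score` helper and the toy labels and predictions are made up for illustration and are not taken from the test set.

```python
def r2_score(labels, predicted):
    """Coefficient of determination: 1 - SS_res / SS_tot."""
    mean_label = sum(labels) / len(labels)
    ss_res = sum((y - p) ** 2 for y, p in zip(labels, predicted))
    ss_tot = sum((y - mean_label) ** 2 for y in labels)
    return 1.0 - ss_res / ss_tot

# Toy values, for illustration only
toy_labels = [18.0, 15.0, 18.0, 16.0]
toy_predictions = [17.0, 15.5, 18.5, 16.0]

score = r2_score(toy_labels, toy_predictions)
print(round(score, 4))  # 0.7778
```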