## <font color='blue'>Spark MLLib - Regressão Linear</font>

## Using LinearRegression to predict MPG values

In [1]:
# Imports
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

In [2]:
# Spark Session - (required to work with Dataframes in Spark)
spSession = SparkSession.builder.master("local").appName("SparkMLLibMPGPredict").getOrCreate()

In [8]:
carsRDD = sc.textFile("auto-mpg.csv")
carsRDD.cache()
carsRDD.count()

399

In [9]:
carsRDD

auto-mpg.csv MapPartitionsRDD[7] at textFile at NativeMethodAccessorImpl.java:0

In [4]:
carsRDD.take(5)

['mpg,cylinders,displacement,horsepower,weight,acceleration,model year,origin,car name',
 '18,8,307,130,3504,12,70,1,chevrolet chevelle malibu',
 '15,8,350,165,3693,11.5,70,1,buick skylark 320',
 '18,8,318,150,3436,11,70,1,plymouth satellite',
 '16,8,304,150,3433,12,70,1,amc rebel sst']

In [5]:
# Removing header
carsRDD = carsRDD.filter(lambda x: "cylinders" not in x)
carsRDD.count()

398

In [6]:
# Searching for NAN/missing values
carsRDD_nan = carsRDD.filter(lambda x: "?" in x or ",," in x)
carsRDD_nan.take(100)

['25,4,98,?,2046,19,71,1,ford pinto',
 '21,6,200,?,2875,17,74,1,ford maverick',
 '40.9,4,85,?,1835,17.3,80,2,renault lecar deluxe',
 '23.6,4,140,?,2905,14.3,80,1,ford mustang cobra',
 '34.5,4,100,?,2320,15.8,81,2,renault 18i',
 '23,4,151,?,3035,20.5,82,1,amc concord dl']

In [7]:
# Since the missing value it's in the HP column, could substitute it for the HP Average OR
# to have more precision, could use MPG, Cylinders, Acceleration and Weight data to predict HP and fill
# the missing values

# First, lets take the database without the missing values
cleanedCarsRDD = carsRDD.filter(lambda x: "?" not in x)
dbSize = cleanedCarsRDD.count()

In [8]:
# Now, let's create a function to calculate the HP average
cleanedCarsRDD2 = cleanedCarsRDD.map(lambda x: float(x.split(",")[3]))
avgHP = cleanedCarsRDD2.reduce(lambda x, y: (x+y))/dbSize
avgHP

104.46938775510205

In [9]:
# Using Average HP to substitute missing values
avgHPbc = sc.broadcast(avgHP)

In [10]:
# Function to substitute missing data
def cleaningData(inputStr):
    global avgHPbc
    valuesList = inputStr.split(",")
    hpValue = valuesList[3]
    if hpValue == "?":
        hpValue = avgHPbc.value
       
    lines = Row(MPG = float(valuesList[0]), CYLINDERS = float(valuesList[1]), DISPLACEMENT = float(valuesList[2]), 
                 HORSEPOWER = float(hpValue), WEIGHT = float(valuesList[4]), ACCELERATION = float(valuesList[5]), 
                 MODELYEAR = float(valuesList[6]), NAME = valuesList[7]) 
    return lines

In [11]:
# Running function on RDD
CarsRDD2 = carsRDD.map(cleaningData)
CarsRDD2.cache()
CarsRDD2.take(5)

[Row(MPG=18.0, CYLINDERS=8.0, DISPLACEMENT=307.0, HORSEPOWER=130.0, WEIGHT=3504.0, ACCELERATION=12.0, MODELYEAR=70.0, NAME='1'),
 Row(MPG=15.0, CYLINDERS=8.0, DISPLACEMENT=350.0, HORSEPOWER=165.0, WEIGHT=3693.0, ACCELERATION=11.5, MODELYEAR=70.0, NAME='1'),
 Row(MPG=18.0, CYLINDERS=8.0, DISPLACEMENT=318.0, HORSEPOWER=150.0, WEIGHT=3436.0, ACCELERATION=11.0, MODELYEAR=70.0, NAME='1'),
 Row(MPG=16.0, CYLINDERS=8.0, DISPLACEMENT=304.0, HORSEPOWER=150.0, WEIGHT=3433.0, ACCELERATION=12.0, MODELYEAR=70.0, NAME='1'),
 Row(MPG=17.0, CYLINDERS=8.0, DISPLACEMENT=302.0, HORSEPOWER=140.0, WEIGHT=3449.0, ACCELERATION=10.5, MODELYEAR=70.0, NAME='1')]

In [12]:
# Creating DataFrame
CarsDF = spSession.createDataFrame(CarsRDD2)

In [13]:
# Estatísticas descritivas
CarsDF.select("MPG","CYLINDERS", "HORSEPOWER", "ACCELERATION", "WEIGHT").describe().show()

+-------+-----------------+------------------+------------------+------------------+-----------------+
|summary|              MPG|         CYLINDERS|        HORSEPOWER|      ACCELERATION|           WEIGHT|
+-------+-----------------+------------------+------------------+------------------+-----------------+
|  count|              398|               398|               398|               398|              398|
|   mean|23.51457286432161| 5.454773869346734|104.46938775510203|15.568090452261313|2970.424623115578|
| stddev|7.815984312565783|1.7010042445332125| 38.19918737359047|2.7576889298126765|846.8417741973266|
|    min|              9.0|               3.0|              46.0|               8.0|           1613.0|
|    max|             46.6|               8.0|             230.0|              24.8|           5140.0|
+-------+-----------------+------------------+------------------+------------------+-----------------+



In [14]:
# Mapping correlation between target (MPG) and variables
for i in CarsDF.columns:
    if not(isinstance(CarsDF.select(i).take(1)[0][0], str)) :
        print("Corr: ", i, CarsDF.stat.corr('MPG', i))

Corr:  MPG 1.0
Corr:  CYLINDERS -0.7753962854205549
Corr:  DISPLACEMENT -0.804202824805898
Corr:  HORSEPOWER -0.7714371350025524
Corr:  WEIGHT -0.8317409332443345
Corr:  ACCELERATION 0.42028891210165126
Corr:  MODELYEAR 0.5792671330833099


In [15]:
# Converting to LabeledPoint (target, Vector[features])
# and removing columns with minor correlation
def transformation(row) :
    obj = (row["MPG"], Vectors.dense([row["DISPLACEMENT"], row["WEIGHT"]]))
    return obj

In [16]:
CarsRDD3 = CarsRDD2.map(transformation)
CarsDF2 = spSession.createDataFrame(CarsRDD3,["label", "features"])
CarsDF2.select("label","features").show(10)

+-----+--------------+
|label|      features|
+-----+--------------+
| 18.0|[307.0,3504.0]|
| 15.0|[350.0,3693.0]|
| 18.0|[318.0,3436.0]|
| 16.0|[304.0,3433.0]|
| 17.0|[302.0,3449.0]|
| 15.0|[429.0,4341.0]|
| 14.0|[454.0,4354.0]|
| 14.0|[440.0,4312.0]|
| 14.0|[455.0,4425.0]|
| 15.0|[390.0,3850.0]|
+-----+--------------+
only showing top 10 rows



In [17]:
CarsRDD3.take(5)

[(18.0, DenseVector([307.0, 3504.0])),
 (15.0, DenseVector([350.0, 3693.0])),
 (18.0, DenseVector([318.0, 3436.0])),
 (16.0, DenseVector([304.0, 3433.0])),
 (17.0, DenseVector([302.0, 3449.0]))]

In [89]:
# Training/Testing
(trainData, testData) = CarsDF2.randomSplit([0.7, 0.3])

In [68]:
# Model creation
linearReg = LinearRegression(maxIter = 30)
model = linearReg.fit(trainData)

In [69]:
# Metrics
print("Coeffs: " + str(model.coefficients))
print("Intercept: " + str(model.intercept))

Coeffs: [-0.01721658818499739,-0.0058789958538445295]
Intercept: 44.35303236796368


In [70]:
# Prediction
predictions = model.transform(testData)
predictions.select("features", "prediction").show()

+--------------+------------------+
|      features|        prediction|
+--------------+------------------+
|[304.0,4732.0]|11.299781179332157|
|[400.0,4906.0]| 8.624043435003458|
|[307.0,4098.0]|14.975414786114598|
|[350.0,3988.0]|14.881791038082607|
|[350.0,4100.0]|14.223343502452021|
|[350.0,4502.0]| 11.85998716920652|
|[400.0,4422.0]|11.469477428264213|
|[400.0,4464.0]| 11.22255960240274|
|[400.0,4746.0]| 9.564682771618585|
|[440.0,4735.0]| 8.940688198610978|
|[318.0,4096.0]|14.797790307787317|
|[351.0,4154.0]|13.888661138159417|
|[454.0,4354.0]|10.939553384335781|
|[250.0,3336.0]|20.436555153288978|
|[318.0,3399.0]| 18.89545041791695|
|[318.0,4135.0]|14.568509469487378|
|[350.0,3693.0]|16.616094814966743|
|[350.0,4440.0]| 12.22448491214488|
|[400.0,3761.0]|15.355493687655446|
|[250.0,3278.0]| 20.77753691281196|
+--------------+------------------+
only showing top 20 rows



In [71]:
# R2 Coeff
evaluating = RegressionEvaluator(predictionCol = "prediction", labelCol = "label", metricName = "r2")
evaluating.evaluate(predictions) 

0.7183284542977011

In [72]:
# Mapping correlation between target (HP) and variables
for i in CarsDF.columns:
    if not(isinstance(CarsDF.select(i).take(1)[0][0], str)) :
        print("Corr: ", i, CarsDF.stat.corr('HORSEPOWER', i))

Corr:  MPG -0.7714371350025524
Corr:  CYLINDERS 0.8389393655645941
Corr:  DISPLACEMENT 0.8936462355336693
Corr:  HORSEPOWER 1.0
Corr:  WEIGHT 0.8605743010090405
Corr:  ACCELERATION -0.6842591288838642
Corr:  MODELYEAR -0.4116509959223453


In [73]:
def transformation2(row) :
    obj = (row["HORSEPOWER"], Vectors.dense([row["DISPLACEMENT"], row["WEIGHT"]]))
    return obj

In [98]:
CarsRDD4 = CarsRDD2.map(transformation2)
CarsDF3 = spSession.createDataFrame(CarsRDD4,["label", "features"])
CarsDF3.select("label","features").show(10)

+-----+--------------+
|label|      features|
+-----+--------------+
|130.0|[307.0,3504.0]|
|165.0|[350.0,3693.0]|
|150.0|[318.0,3436.0]|
|150.0|[304.0,3433.0]|
|140.0|[302.0,3449.0]|
|198.0|[429.0,4341.0]|
|220.0|[454.0,4354.0]|
|215.0|[440.0,4312.0]|
|225.0|[455.0,4425.0]|
|190.0|[390.0,3850.0]|
+-----+--------------+
only showing top 10 rows



In [99]:
# Training/Testing
(trainData, testData) = CarsDF3.filter(CarsDF3.label != avgHP).randomSplit([0.7, 0.3])

In [100]:
# Model creation
linearReg = LinearRegression(maxIter = 30)
model = linearReg.fit(trainData)

In [101]:
# Metrics
print("Coeffs: " + str(model.coefficients))
print("Intercept: " + str(model.intercept))

Coeffs: [0.2306504786306493,0.01365485210043601]
Intercept: 19.4113258168614


In [102]:
# Prediction
predictions = model.transform(testData)
predictions.select("features", "prediction").show()

+--------------+------------------+
|      features|        prediction|
+--------------+------------------+
| [68.0,1867.0]| 60.58916723525958|
| [76.0,1649.0]|59.457613306409726|
| [85.0,2035.0]| 66.80424052485387|
| [83.0,2003.0]| 65.90598430037862|
| [71.0,1773.0]|59.997562573710546|
| [79.0,1950.0]| 64.25967522453291|
| [79.0,1963.0]| 64.43718830183857|
|[140.0,2401.0]| 84.48769271829917|
|[250.0,3158.0]|120.19596840770063|
|[250.0,3432.0]|123.93739788322011|
| [90.0,2125.0]| 69.18642960704636|
| [98.0,2164.0]| 71.56417266800855|
|[116.0,2220.0]| 76.48055300098466|
|[122.0,2310.0]|  79.0933925618078|
|[122.0,2395.0]| 80.25405499034486|
|[120.0,2979.0]|  87.7671876597382|
| [97.0,2100.0]| 70.45961165495001|
| [97.0,2130.0]| 70.86925721796308|
|[250.0,3021.0]| 118.3252536699409|
| [70.0,2124.0]| 64.55976518233294|
+--------------+------------------+
only showing top 20 rows



In [103]:
# R2 Coeff
evaluating = RegressionEvaluator(predictionCol = "prediction", labelCol = "label", metricName = "r2")
evaluating.evaluate(predictions) 

0.7651180835826709

In [105]:
predictions = model.transform(testData)
predictions.select("features", "prediction").show()

+--------------+------------------+
|      features|        prediction|
+--------------+------------------+
| [68.0,1867.0]| 60.58916723525958|
| [76.0,1649.0]|59.457613306409726|
| [85.0,2035.0]| 66.80424052485387|
| [83.0,2003.0]| 65.90598430037862|
| [71.0,1773.0]|59.997562573710546|
| [79.0,1950.0]| 64.25967522453291|
| [79.0,1963.0]| 64.43718830183857|
|[140.0,2401.0]| 84.48769271829917|
|[250.0,3158.0]|120.19596840770063|
|[250.0,3432.0]|123.93739788322011|
| [90.0,2125.0]| 69.18642960704636|
| [98.0,2164.0]| 71.56417266800855|
|[116.0,2220.0]| 76.48055300098466|
|[122.0,2310.0]|  79.0933925618078|
|[122.0,2395.0]| 80.25405499034486|
|[120.0,2979.0]|  87.7671876597382|
| [97.0,2100.0]| 70.45961165495001|
| [97.0,2130.0]| 70.86925721796308|
|[250.0,3021.0]| 118.3252536699409|
| [70.0,2124.0]| 64.55976518233294|
+--------------+------------------+
only showing top 20 rows



In [110]:
testAvgHP = CarsDF3.filter(CarsDF3.label == avgHP)

In [111]:
predictions = model.transform(testAvgHP)
predictions.select("features", "prediction").show()

+--------------+-----------------+
|      features|       prediction|
+--------------+-----------------+
| [98.0,2046.0]|69.95290012015711|
|[200.0,2875.0]|104.7991213317448|
| [85.0,1835.0]|64.07327010476666|
|[140.0,2905.0]| 91.3697381769189|
|[100.0,2320.0]|74.15563055293788|
|[151.0,3035.0]|95.68202421491273|
+--------------+-----------------+



In [None]:
# Function to substitute missing data
def cleaningData(inputStr):
    global avgHPbc
    valuesList = inputStr.split(",")
    hpValue = valuesList[3]
    if hpValue == "?":
        hpValue = avgHPbc.value
       
    lines = Row(MPG = float(valuesList[0]), CYLINDERS = float(valuesList[1]), DISPLACEMENT = float(valuesList[2]), 
                 HORSEPOWER = float(hpValue), WEIGHT = float(valuesList[4]), ACCELERATION = float(valuesList[5]), 
                 MODELYEAR = float(valuesList[6]), NAME = valuesList[7])
    
    return lines