https://towardsdatascience.com/building-a-linear-regression-with-pyspark-and-mllib-d065c3ba246a

## Spark Regression

### Findspark / Import modules

In [4]:
import findspark
findspark.init()
import pyspark
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext


In [5]:
spark = pyspark.sql.SparkSession.builder.appName("MyApp") \
            .config("spark.jars.packages", "Azure:mmlspark:0.16") \
            .getOrCreate()
import mmlspark

In [2]:
sc= SparkContext()
sqlContext = SQLContext(sc)
house_df = sqlContext.read.format('com.databricks.spark.csv').options(header='true',
                                                                      inferschema='true').load('boston.csv')
house_df.take(1) #print firstrow

[Row(CRIM=0.00632, ZN=18.0, INDUS=2.309999943, CHAS=0, NOX=0.537999988, RM=6.574999809, AGE=65.19999695, DIS=4.090000153, RAD=1, TAX=296, PT=15.30000019, B=396.8999939, LSTAT=4.980000019, MV=24.0)]

### Looking at the data

In [3]:
house_df.cache()
house_df.printSchema()

root
 |-- CRIM: double (nullable = true)
 |-- ZN: double (nullable = true)
 |-- INDUS: double (nullable = true)
 |-- CHAS: integer (nullable = true)
 |-- NOX: double (nullable = true)
 |-- RM: double (nullable = true)
 |-- AGE: double (nullable = true)
 |-- DIS: double (nullable = true)
 |-- RAD: integer (nullable = true)
 |-- TAX: integer (nullable = true)
 |-- PT: double (nullable = true)
 |-- B: double (nullable = true)
 |-- LSTAT: double (nullable = true)
 |-- MV: double (nullable = true)



In [4]:
house_df.describe().toPandas().transpose()

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
CRIM,506,3.6135235608162057,8.601545086715594,0.00632,88.97619629
ZN,506,11.363636363636363,23.32245299451514,0.0,100.0
INDUS,506,11.136778749531626,6.86035298095724,0.460000008,27.73999977
CHAS,506,0.0691699604743083,0.2539940413404101,0,1
NOX,506,0.5546950602312246,0.1158776754570543,0.38499999,0.870999992
RM,506,6.28463438896641,0.7026171549511354,3.561000109,8.779999733
AGE,506,68.57490120115612,28.148861532793276,2.900000095,100.0
DIS,506,3.7950426960059325,2.105710142043288,1.129600048,12.12650013
RAD,506,9.549407114624506,8.707259384239366,1,24


In [5]:
# Correlations
import six
for i in house_df.columns:
    if not( isinstance(house_df.select(i).take(1)[0][0], six.string_types)):
        print( "Correlation to MV for ", i, house_df.stat.corr('MV',i))

Correlation to MV for  CRIM -0.3883046116575088
Correlation to MV for  ZN 0.36044534463752903
Correlation to MV for  INDUS -0.48372517128143383
Correlation to MV for  CHAS 0.17526017775291847
Correlation to MV for  NOX -0.4273207763683772
Correlation to MV for  RM 0.695359937127267
Correlation to MV for  AGE -0.37695456714288667
Correlation to MV for  DIS 0.24992873873512172
Correlation to MV for  RAD -0.3816262315669168
Correlation to MV for  TAX -0.46853593528654536
Correlation to MV for  PT -0.5077867038116085
Correlation to MV for  B 0.3334608226834164
Correlation to MV for  LSTAT -0.7376627294671615
Correlation to MV for  MV 1.0


### Feature Engineering

In [6]:
#create features and output 
from pyspark.ml.feature import VectorAssembler
vectorAssembler = VectorAssembler(inputCols = ['CRIM', 'ZN', 'INDUS', 'CHAS', 'NOX', 'RM', 'AGE', 'DIS', 'RAD', 'TAX', 'PT', 'B', 'LSTAT'], outputCol = 'features')
vhouse_df = vectorAssembler.transform(house_df)
vhouse_df = vhouse_df.select(['features', 'MV'])
vhouse_df.show(3)

+--------------------+-----------+
|            features|         MV|
+--------------------+-----------+
|[0.00632,18.0,2.3...|       24.0|
|[0.027310001,0.0,...|21.60000038|
|[0.02729,0.0,7.07...|34.70000076|
+--------------------+-----------+
only showing top 3 rows



### Test/Train split

In [7]:
splits = vhouse_df.randomSplit([0.7, 0.3])
train_df = splits[0]
test_df = splits[1]

### Linear Reg

In [8]:
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol = 'features', labelCol='MV', maxIter=10, regParam=0.3, elasticNetParam=0.8)
lr_model = lr.fit(train_df)
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))


Coefficients: [-0.05397014626897897,0.0008341907493364299,0.0,3.8669417430605404,-5.303852851149422,4.4803621778897025,0.0,-0.3920069961852871,0.00340502811186884,-0.0001082889430725175,-0.8118745047522378,0.00872694423739595,-0.46282124474691705]
Intercept: 16.554302525881432


In [9]:
## RMSE and r^2
trainingSummary = lr_model.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

RMSE: 4.825367
r2: 0.707081


In [10]:
# Compare errors in respect to output itself
train_df.describe().show()

+-------+------------------+
|summary|                MV|
+-------+------------------+
|  count|               323|
|   mean|22.504334396681113|
| stddev| 8.929557708870195|
|    min|               5.0|
|    max|              50.0|
+-------+------------------+



In [11]:
#run model on test set

lr_predictions = lr_model.transform(test_df)
lr_predictions.select("prediction","MV","features").show(5)
from pyspark.ml.evaluation import RegressionEvaluator
lr_evaluator = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="MV",metricName="r2")
print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(lr_predictions))

+------------------+-----------+--------------------+
|        prediction|         MV|            features|
+------------------+-----------+--------------------+
|32.214472866458685|32.70000076|[0.01301,35.0,1.5...|
| 37.81156458735652|       50.0|[0.01381,80.0,0.4...|
| 42.48924998948818|       50.0|[0.01501,90.0,1.2...|
| 26.12121714305029|23.10000038|[0.0187,85.0,4.15...|
|26.388115684248902|28.70000076|[0.029850001,0.0,...|
+------------------+-----------+--------------------+
only showing top 5 rows

R Squared (R2) on test data = 0.714064


In [12]:
test_result = lr_model.evaluate(test_df)
print("Root Mean Squared Error (RMSE) on test data = %g" % test_result.rootMeanSquaredError)

Root Mean Squared Error (RMSE) on test data = 5.15997


In [13]:
print("numIterations: %d" % trainingSummary.totalIterations)
print("objectiveHistory: %s" % str(trainingSummary.objectiveHistory))
trainingSummary.residuals.show()

numIterations: 11
objectiveHistory: [0.5, 0.43185895450319756, 0.24019932549121156, 0.21557725606754746, 0.18644672161612985, 0.18461313200478252, 0.18408666816623462, 0.18357084174491572, 0.18307855125548425, 0.18277994227319913, 0.1826338110956476]
+-------------------+
|          residuals|
+-------------------+
| -6.279121618225492|
| 1.4379423375535865|
| -5.768576510455393|
|  5.150677025407852|
| 1.5046025787809363|
| 0.5374961063628163|
|-1.0966662184554359|
| -3.090194525574585|
|  9.541162424114745|
| 2.7885442734285206|
| 1.9786378509700135|
| 5.8340024593131865|
|  -1.02565620965904|
| 11.455336009415973|
| -1.466799876886057|
|  6.195821211493502|
| 0.1809418901261104|
|-10.599116628711062|
|-3.7138627486173696|
|  3.298139905549995|
+-------------------+
only showing top 20 rows



In [14]:
predictions = lr_model.transform(test_df)
predictions.select("prediction","MV","features").show()

+------------------+-----------+--------------------+
|        prediction|         MV|            features|
+------------------+-----------+--------------------+
|32.214472866458685|32.70000076|[0.01301,35.0,1.5...|
| 37.81156458735652|       50.0|[0.01381,80.0,0.4...|
| 42.48924998948818|       50.0|[0.01501,90.0,1.2...|
| 26.12121714305029|23.10000038|[0.0187,85.0,4.15...|
|26.388115684248902|28.70000076|[0.029850001,0.0,...|
|21.212638552508114|       17.5|[0.031129999,0.0,...|
|30.262812960154847|34.90000153|[0.031500001,95.0...|
|22.192518260883922|20.60000038|[0.033059999,0.0,...|
| 31.35959159322169|34.90000153|[0.03359,75.0,2.9...|
| 23.07436818530134|20.89999962|[0.03548,80.0,3.6...|
| 28.30618957770413|       23.5|[0.035840001,80.0...|
|23.427279604762973|20.70000076|[0.037379999,0.0,...|
|34.301618632745615|34.59999847|[0.03768,80.0,1.5...|
|26.068888451017962|23.20000076|[0.038710002,52.5...|
|    27.90875648203|       28.0|[0.041129999,25.0...|
|30.768989382328307|24.79999

## Decision Tree Regression

In [15]:
from pyspark.ml.regression import DecisionTreeRegressor
dt = DecisionTreeRegressor(featuresCol ='features', labelCol = 'MV')
dt_model = dt.fit(train_df)
dt_predictions = dt_model.transform(test_df)
dt_evaluator = RegressionEvaluator(
    labelCol="MV", predictionCol="prediction", metricName="rmse")
rmse = dt_evaluator.evaluate(dt_predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

Root Mean Squared Error (RMSE) on test data = 4.68974


In [16]:
dt_model.featureImportances

SparseVector(13, {0: 0.017, 1: 0.001, 4: 0.0339, 5: 0.5773, 7: 0.0496, 8: 0.0049, 9: 0.0152, 11: 0.0119, 12: 0.2891})

In [17]:
house_df.take(1)

[Row(CRIM=0.00632, ZN=18.0, INDUS=2.309999943, CHAS=0, NOX=0.537999988, RM=6.574999809, AGE=65.19999695, DIS=4.090000153, RAD=1, TAX=296, PT=15.30000019, B=396.8999939, LSTAT=4.980000019, MV=24.0)]

Apparently, the number of rooms is the most important feature to predict the house median price in our data.



## Gradient Boosted Tree Regression

In [18]:
from pyspark.ml.regression import GBTRegressor
gbt = GBTRegressor(featuresCol = 'features', labelCol = 'MV', maxIter=10)
gbt_model = gbt.fit(train_df)
gbt_predictions = gbt_model.transform(test_df)
gbt_predictions.select('prediction', 'MV', 'features').show(5)

+------------------+-----------+--------------------+
|        prediction|         MV|            features|
+------------------+-----------+--------------------+
|28.941499272931033|32.70000076|[0.01301,35.0,1.5...|
| 47.07995945782344|       50.0|[0.01381,80.0,0.4...|
|50.068196865739836|       50.0|[0.01501,90.0,1.2...|
| 23.46900964290492|23.10000038|[0.0187,85.0,4.15...|
| 23.21926890213816|28.70000076|[0.029850001,0.0,...|
+------------------+-----------+--------------------+
only showing top 5 rows



In [19]:
gbt_evaluator = RegressionEvaluator(
    labelCol="MV", predictionCol="prediction", metricName="rmse")
rmse = gbt_evaluator.evaluate(gbt_predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

Root Mean Squared Error (RMSE) on test data = 4.65883


Gradient-boosted tree regression performed the best on our data.