In [1]:
import pyspark
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession
import six
from pyspark.sql.functions import isnan,when,count,col
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

In [2]:
Boston = pd.read_csv('C:/Users/senth/Downloads/boston.csv')

In [3]:
Boston.head()

Unnamed: 0,CRIM,ZN,INDUS,CHAS,NOX,RM,AGE,DIS,RAD,TAX,PT,B,LSTAT,MV
0,0.00632,18.0,2.31,0,0.538,6.575,65.199997,4.09,1,296,15.3,396.899994,4.98,24.0
1,0.02731,0.0,7.07,0,0.469,6.421,78.900002,4.9671,2,242,17.799999,396.899994,9.14,21.6
2,0.02729,0.0,7.07,0,0.469,7.185,61.099998,4.9671,2,242,17.799999,392.829987,4.03,34.700001
3,0.03237,0.0,2.18,0,0.458,6.998,45.799999,6.0622,3,222,18.700001,394.630005,2.94,33.400002
4,0.06905,0.0,2.18,0,0.458,7.147,54.200001,6.0622,3,222,18.700001,396.899994,5.33,36.200001


In [4]:
spark=SparkSession.builder.appName('Pyspark').getOrCreate()

In [8]:
data_boston=spark.read.csv('C:/Users/senth/Downloads/boston.csv',header=True,inferSchema=True)

In [9]:
data_boston.describe().show()

+-------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+-----------------+------------------+-----------------+------------------+------------------+-----------------+
|summary|              CRIM|                ZN|             INDUS|              CHAS|               NOX|                RM|               AGE|               DIS|              RAD|               TAX|               PT|                 B|             LSTAT|               MV|
+-------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+-----------------+------------------+-----------------+------------------+------------------+-----------------+
|  count|               506|               506|               506|               506|               506|               506|               506|               506|              506|  

In [11]:
data_boston.select([count(when(isnan(c),c)).alias(c) for c in data_boston.columns]).show()

+----+---+-----+----+---+---+---+---+---+---+---+---+-----+---+
|CRIM| ZN|INDUS|CHAS|NOX| RM|AGE|DIS|RAD|TAX| PT|  B|LSTAT| MV|
+----+---+-----+----+---+---+---+---+---+---+---+---+-----+---+
|   0|  0|    0|   0|  0|  0|  0|  0|  0|  0|  0|  0|    0|  0|
+----+---+-----+----+---+---+---+---+---+---+---+---+-----+---+



In [12]:
for i in data_boston.columns:
    if not( isinstance(data_boston.select(i).take(1)[0][0], six.string_types)):
        print( "Correlation to MV for ", i, data_boston.stat.corr('MV',i))

Correlation to MV for  CRIM -0.3883046116575088
Correlation to MV for  ZN 0.36044534463752903
Correlation to MV for  INDUS -0.48372517128143383
Correlation to MV for  CHAS 0.17526017775291847
Correlation to MV for  NOX -0.4273207763683772
Correlation to MV for  RM 0.695359937127267
Correlation to MV for  AGE -0.37695456714288667
Correlation to MV for  DIS 0.24992873873512172
Correlation to MV for  RAD -0.3816262315669168
Correlation to MV for  TAX -0.46853593528654536
Correlation to MV for  PT -0.5077867038116085
Correlation to MV for  B 0.3334608226834164
Correlation to MV for  LSTAT -0.7376627294671615
Correlation to MV for  MV 1.0


In [15]:
vectorAssembler = VectorAssembler(inputCols = ['CRIM', 'ZN', 'INDUS', 'CHAS', 'NOX', 'RM', 'AGE', 'DIS', 'RAD', 'TAX', 'PT', 'B', 'LSTAT'], outputCol = 'features')
new_data= vectorAssembler.transform(data_boston)
new_data = new_data.select(['features', 'MV'])
new_data.show()

+--------------------+-----------+
|            features|         MV|
+--------------------+-----------+
|[0.00632,18.0,2.3...|       24.0|
|[0.027310001,0.0,...|21.60000038|
|[0.02729,0.0,7.07...|34.70000076|
|[0.032370001,0.0,...|33.40000153|
|[0.069049999,0.0,...|36.20000076|
|[0.029850001,0.0,...|28.70000076|
|[0.088289998,12.5...|22.89999962|
|[0.144549996,12.5...|27.10000038|
|[0.211239994,12.5...|       16.5|
|[0.170039997,12.5...|18.89999962|
|[0.224889994,12.5...|       15.0|
|[0.117470004,12.5...|18.89999962|
|[0.093780003,12.5...|21.70000076|
|[0.629760027,0.0,...|20.39999962|
|[0.637960017,0.0,...|18.20000076|
|[0.627390027,0.0,...|19.89999962|
|[1.053930044,0.0,...|23.10000038|
|[0.784200013,0.0,...|       17.5|
|[0.802709997,0.0,...|20.20000076|
|[0.725799978,0.0,...|18.20000076|
+--------------------+-----------+
only showing top 20 rows



In [16]:
splits = new_data.randomSplit([0.7, 0.3])
train = splits[0]
test = splits[1]

In [17]:
lr = LinearRegression(featuresCol = 'features', labelCol='MV', maxIter=10, regParam=0.3, elasticNetParam=0.8)
lr_model = lr.fit(train)
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))

Coefficients: [-0.05712813170771199,0.010655609276820237,-0.002701964168698477,0.28502450920695743,-4.637818023638124,4.570782072015724,0.0,-0.5390600322573454,0.0,0.0,-0.8065706096906113,0.005739519020826664,-0.5456855254164695]
Intercept: 18.2566974620344


In [18]:
training = lr_model.summary
print("RMSE: %f" % training.rootMeanSquaredError)
print("r2: %f" % training.r2)

RMSE: 4.614469
r2: 0.734634
