In [1]:
import os

import findspark

# Locate the Spark installation. Honour SPARK_HOME when it is set so the notebook
# runs on other machines; otherwise fall back to the path it was developed against.
SPARK_HOME = os.environ.get('SPARK_HOME', '/opt/spark/')
findspark.init(SPARK_HOME)

In [2]:
# Show the installed PySpark version so the outputs below can be tied to a release.
import pyspark

pyspark.__version__

'3.2.1'

In [4]:
from pyspark.sql import SparkSession

# Build (or reuse, if one already exists in this kernel) the Spark session.
spark = (
    SparkSession.builder
    .appName('BostonHousing')
    .getOrCreate()
)

In [5]:
# Load the housing data: the first line supplies column names, and Spark infers
# each column's type from the file contents.
boston_df = (
    spark.read
    .csv('housing.csv', header=True, inferSchema=True)
)
boston_df.printSchema()

root
 |-- crim: double (nullable = true)
 |-- zn: double (nullable = true)
 |-- indus: double (nullable = true)
 |-- chas: integer (nullable = true)
 |-- nox: double (nullable = true)
 |-- rm: double (nullable = true)
 |-- age: double (nullable = true)
 |-- dis: double (nullable = true)
 |-- rad: integer (nullable = true)
 |-- tax: integer (nullable = true)
 |-- ptratio: double (nullable = true)
 |-- b: double (nullable = true)
 |-- lstat: double (nullable = true)
 |-- medv: double (nullable = true)



In [6]:
# Spark DataFrames have no .shape, so assemble (rows, columns) by hand.
n_rows, n_cols = boston_df.count(), len(boston_df.columns)
(n_rows, n_cols)

(506, 14)

In [8]:
# Summary statistics, transposed so each column of the data becomes a row.
boston_df.describe().toPandas().T

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
crim,506,3.6135235573122535,8.601545105332491,0.00632,88.9762
zn,506,11.363636363636363,23.32245299451514,0.0,100.0
indus,506,11.136778656126504,6.860352940897589,0.46,27.74
chas,506,0.0691699604743083,0.2539940413404101,0,1
nox,506,0.5546950592885372,0.11587767566755584,0.385,0.871
rm,506,6.284634387351787,0.7026171434153232,3.561,8.78
age,506,68.57490118577078,28.148861406903595,2.9,100.0
dis,506,3.795042687747034,2.10571012662761,1.1296,12.1265
rad,506,9.549407114624506,8.707259384239366,1,24


In [9]:
# (column, Spark type) pairs: a compact alternative to printSchema().
column_types = boston_df.dtypes
print(column_types)

[('crim', 'double'), ('zn', 'double'), ('indus', 'double'), ('chas', 'int'), ('nox', 'double'), ('rm', 'double'), ('age', 'double'), ('dis', 'double'), ('rad', 'int'), ('tax', 'int'), ('ptratio', 'double'), ('b', 'double'), ('lstat', 'double'), ('medv', 'double')]


In [10]:
import pyspark.sql.functions as f

# Count NULL values in each column.
# `count` ignores NULLs, so the `when` must yield a non-null marker for null cells.
# The previous form `f.when(f.isnull(c), c)` yielded the (null) cell itself and
# therefore always reported 0, regardless of the data.
boston_agg = boston_df.agg(
    *[f.count(f.when(f.isnull(c), f.lit(1))).alias(c) for c in boston_df.columns]
)
boston_agg.show()


+----+---+-----+----+---+---+---+---+---+---+-------+---+-----+----+
|crim| zn|indus|chas|nox| rm|age|dis|rad|tax|ptratio|  b|lstat|medv|
+----+---+-----+----+---+---+---+---+---+---+-------+---+-----+----+
|   0|  0|    0|   0|  0|  0|  0|  0|  0|  0|      0|  0|    0|   0|
+----+---+-----+----+---+---+---+---+---+---+-------+---+-----+----+



In [11]:
import six

# Pearson correlation of every numeric column with the target `medv`.
# Columns are chosen from the schema rather than by sampling the first row. This
# avoids one Spark job per column and is not fooled by a NULL in row 0.
# `stat.corr` only accepts numeric columns.
NUMERIC_TYPES = ('tinyint', 'smallint', 'int', 'bigint', 'float', 'double')
for col_name, col_type in boston_df.dtypes:
    if col_type in NUMERIC_TYPES or col_type.startswith('decimal'):
        print("Correlation to medv for {0:s} is {1:3.2f}".format(col_name, boston_df.stat.corr('medv', col_name)))

Correlation to medv for crim is -0.39
Correlation to medv for zn is 0.36
Correlation to medv for indus is -0.48
Correlation to medv for chas is 0.18
Correlation to medv for nox is -0.43
Correlation to medv for rm is 0.70
Correlation to medv for age is -0.38
Correlation to medv for dis is 0.25
Correlation to medv for rad is -0.38
Correlation to medv for tax is -0.47
Correlation to medv for ptratio is -0.51
Correlation to medv for b is 0.33
Correlation to medv for lstat is -0.74
Correlation to medv for medv is 1.00


In [13]:
from pyspark.ml.feature import VectorAssembler

# Every column except the label becomes a model input. Deriving the list from the
# schema keeps it in sync with the CSV instead of repeating 13 names by hand.
LABEL_COL = 'medv'
feature_cols = [c for c in boston_df.columns if c != LABEL_COL]

# Pack the feature columns into the single vector column Spark ML estimators expect.
assembler = VectorAssembler(inputCols=feature_cols, outputCol='features')
output = assembler.transform(boston_df)
output.take(1)

[Row(crim=0.00632, zn=18.0, indus=2.31, chas=0, nox=0.538, rm=6.575, age=65.2, dis=4.09, rad=1, tax=296, ptratio=15.3, b=396.9, lstat=4.98, medv=24.0, features=DenseVector([0.0063, 18.0, 2.31, 0.0, 0.538, 6.575, 65.2, 4.09, 1.0, 296.0, 15.3, 396.9, 4.98]))]

In [14]:
# Keep only what the model needs: the assembled feature vector and the label.
final_data = output.select('features', 'medv')
final_data.show(5)

+--------------------+----+
|            features|medv|
+--------------------+----+
|[0.00632,18.0,2.3...|24.0|
|[0.02731,0.0,7.07...|21.6|
|[0.02729,0.0,7.07...|34.7|
|[0.03237,0.0,2.18...|33.4|
|[0.06905,0.0,2.18...|36.2|
+--------------------+----+
only showing top 5 rows



In [15]:
# Split into training (80%) and held-out test (20%) sets.
# A fixed seed makes the split, and every metric computed from it, repeatable
# across kernel restarts (given the same input data and partitioning).
SPLIT_SEED = 42
train_data, test_data = final_data.randomSplit([0.8, 0.2], seed=SPLIT_SEED)

In [16]:
# Label distribution in the training split (the vector column is not summarised).
train_stats = train_data.describe()
train_stats.show()

+-------+------------------+
|summary|              medv|
+-------+------------------+
|  count|               408|
|   mean|22.730392156862752|
| stddev| 9.272317938040912|
|    min|               5.0|
|    max|              50.0|
+-------+------------------+



In [17]:
# Label distribution in the test split, for comparison with the training split.
test_stats = test_data.describe()
test_stats.show()

+-------+------------------+
|summary|              medv|
+-------+------------------+
|  count|                98|
|   mean|21.710204081632654|
| stddev|  8.87593166064945|
|    min|               7.0|
|    max|              50.0|
+-------+------------------+



In [19]:
from pyspark.ml.regression import LinearRegression

# Elastic-net regularised linear regression.
# - regParam: overall penalty strength.
# - elasticNetParam: per the Spark docs, mixes L2 (0.0) and L1 (1.0) penalties.
# NOTE(review): training stops at maxIter=10 (see numIterations further down);
# consider raising it if convergence matters.
lr = LinearRegression(
    featuresCol='features',
    labelCol='medv',
    maxIter=10,
    regParam=0.3,
    elasticNetParam=0.8,
)
lr_model = lr.fit(train_data)

print("Coefficients:", lr_model.coefficients)
print("Intercept:", lr_model.intercept)

Coefficients: [-0.040510089647072374,0.019152019979990664,0.0,2.2238022362005743,-8.087758264581593,4.37166808771813,0.0,-0.759121349826604,0.0,0.0,-0.7581928245184028,0.008712482295789982,-0.4973622805375459]
Intercept: 19.497517902989166


In [20]:
# Fit statistics from the model's summary, which describes the data passed to fit()
# (i.e. the training split, not the test split).
summary = lr_model.summary
for metric_name, metric_value in (("RMSE", summary.rootMeanSquaredError),
                                  ("r2", summary.r2)):
    print("%s: %f" % (metric_name, metric_value))

RMSE: 4.941245
r2: 0.715317


In [21]:
# Score the held-out split and show a few predictions next to the true label.
preds = lr_model.transform(test_data)
preview_cols = ["prediction", "medv", "features"]
preds.select(*preview_cols).show(10)

+------------------+----+--------------------+
|        prediction|medv|            features|
+------------------+----+--------------------+
|30.510465499463933|24.0|[0.00632,18.0,2.3...|
| 27.81289300467969|22.0|[0.01096,55.0,2.2...|
|28.099433193686533|23.9|[0.02543,55.0,3.7...|
|29.521140040498985|31.2|[0.03049,55.0,3.7...|
|19.998016459061866|17.5|[0.03113,0.0,4.39...|
|31.039576932844028|34.9|[0.0315,95.0,1.47...|
|22.069190991022474|20.6|[0.03306,0.0,5.19...|
|  32.6646306976293|34.9|[0.03359,75.0,2.9...|
|29.216559196182864|23.5|[0.03584,80.0,3.3...|
|22.733274594440346|20.7|[0.03738,0.0,5.19...|
+------------------+----+--------------------+
only showing top 10 rows



In [22]:
from pyspark.ml.evaluation import RegressionEvaluator

# Coefficient of determination on the held-out predictions.
evaluator = RegressionEvaluator(
    predictionCol="prediction",
    labelCol="medv",
    metricName="r2",
)
test_r2 = evaluator.evaluate(preds)
print("R Squared on test data = %4.3f" % test_r2)

R Squared on test data = 0.734


In [23]:
# lr_model.evaluate() scores test_data itself and returns a metrics summary.
result = lr_model.evaluate(test_data)
test_rmse = result.rootMeanSquaredError
print("Root Mean Squared Error on test data = %4.3f" % test_rmse)

Root Mean Squared Error on test data = 4.556


In [24]:
# Optimiser diagnostics from the training summary: iteration count and the
# objective value recorded at each iteration.
n_iterations = summary.totalIterations
history_text = str(summary.objectiveHistory)
print("numIterations: %d" % n_iterations)
print("objectiveHistory: %s" % history_text)

numIterations: 10
objectiveHistory: [0.5000000000000004, 0.43279060296323857, 0.2393633180231567, 0.2166273328516576, 0.187475443684497, 0.18388496087033063, 0.18320604725987805, 0.18223185004739129, 0.1809837827683281, 0.1802100758761481, 0.1801825297106785]


In [26]:
# Training-set residuals (per the Spark docs: label minus prediction), first 10 rows.
train_residuals = summary.residuals
train_residuals.show(10)

+--------------------+
|           residuals|
+--------------------+
|  0.8406677654554251|
|  0.8381915504464388|
|   4.864681742639526|
|   0.999617394145563|
|  11.205269522975584|
|0.024330941723590627|
| -1.7276672309929033|
| -3.2203855703898228|
|   8.066626290210806|
|   8.425537151179867|
+--------------------+
only showing top 10 rows

