In [None]:
pip install pyspark

In [None]:
"""
CRIM — per capita crime rate by town.
ZN — proportion of residential land zoned for lots over 25,000 sq.ft.
INDUS — proportion of non-retail business acres per town.
CHAS — Charles River dummy variable (= 1 if tract bounds river; 0 otherwise).
NOX — nitrogen oxides concentration (parts per 10 million).
RM — average number of rooms per dwelling.
AGE — proportion of owner-occupied units built prior to 1940.
DIS — weighted mean of distances to five Boston employment centres.
RAD — index of accessibility to radial highways.
TAX — full-value property-tax rate per $10,000.
PTRATIO — pupil-teacher ratio by town.
B — 1000(Bk − 0.63)² where Bk is the proportion of Black residents by town.
LSTAT — lower status of the population (percent).
MEDV — median value of owner-occupied homes in $1000s. This is the target variable.
"""

In [None]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

# Reuse the active SparkContext if one already exists. Calling SparkContext()
# a second time in the same kernel (e.g. when this cell is re-run) raises
# "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate()

In [None]:
# SQL entry point built on top of the Spark context.
sqlContext = SQLContext(sc)

# Load the housing CSV: the first line is treated as the header and the
# column types are inferred from the data.
house_df = (
    sqlContext.read
    .format('com.databricks.spark.csv')
    .options(header='true', inferschema='true')
    .load('../input/boston-housing-dataset/HousingData.csv')
)

# Show the first row as the cell output.
house_df.take(1)

In [None]:
# Mark the freshly loaded frame for caching, then print its column names and types.
house_df = house_df.cache()
house_df.printSchema()

In [None]:
import six

# Cast every column to double in a single projection (column order and names
# are preserved).
house_df = house_df.select(
    [house_df[name].cast("double").alias(name) for name in house_df.columns]
)

# Spark DataFrames are immutable: na.drop() returns a NEW DataFrame, so the
# result has to be assigned back. The previous per-column call discarded its
# result and therefore never removed any row. Dropping rows that have a null
# in any column is what the per-column loop was trying to achieve.
house_df = house_df.na.drop()

In [None]:
# Mark the converted frame for caching and print its schema to confirm the column types.
house_df = house_df.cache()
house_df.printSchema()

In [None]:
# Summary statistics from Spark, converted to pandas and flipped so that each
# row of the displayed table corresponds to one column of house_df.
(
    house_df
    .describe()
    .toPandas()
    .T
)

In [None]:
import pandas as pd

# Keep only the columns whose Spark dtype is 'int' or 'double'.
numeric_features = [name for name, dtype in house_df.dtypes if dtype in ('int', 'double')]

# Sample ~80% of the rows without replacement and pull them into pandas for
# plotting. The fixed seed makes the sample, and therefore the figure,
# reproducible from one run to the next.
sampled_data = house_df.select(numeric_features).sample(False, 0.8, seed=42).toPandas()

axs = pd.plotting.scatter_matrix(sampled_data, figsize=(10, 10))
n = len(sampled_data.columns)
for i in range(n):
    # First column of the grid: horizontal, right-aligned y labels and no ticks.
    v = axs[i, 0]
    v.yaxis.label.set_rotation(0)
    v.yaxis.label.set_ha('right')
    v.set_yticks(())
    # Last row of the grid: vertical x labels and no ticks.
    h = axs[n-1, i]
    h.xaxis.label.set_rotation(90)
    h.set_xticks(())

In [None]:
import six  # no longer needed by this cell; kept so later code relying on the import still works

# Use the schema to decide which columns to correlate instead of fetching the
# first row of every column. This avoids one Spark job per column, does not
# fail with IndexError on an empty DataFrame, and does not misclassify a string
# column whose first value is null.
for i, dtype in house_df.dtypes:
    if dtype != 'string':
        print( "Correlation to MEDV for ", i, house_df.stat.corr('MEDV',i))

In [None]:
from pyspark.ml.feature import VectorAssembler

# Predictor columns that get packed into the single 'features' vector column.
feature_cols = [
    'CRIM', 'ZN', 'INDUS', 'CHAS', 'NOX', 'RM', 'AGE',
    'DIS', 'RAD', 'TAX', 'PTRATIO', 'B', 'LSTAT',
]

# Assembler configured with handleInvalid="skip".
vectorAssembler = VectorAssembler(inputCols=feature_cols, outputCol='features')
vectorAssembler = vectorAssembler.setHandleInvalid("skip")

# Keep only the assembled feature vector and the MEDV column.
vhouse_df = vectorAssembler.transform(house_df).select(['features', 'MEDV'])
vhouse_df.show(3)

In [None]:
# Split the rows roughly 70% / 30% into train and test sets. The fixed seed
# keeps the split, and every metric computed from it, the same on each run.
splits = vhouse_df.randomSplit([0.7, 0.3], seed=42)
train_df, test_df = splits

In [None]:
from pyspark.ml.regression import LinearRegression

# Linear regression of MEDV on the assembled feature vector, using the
# regularisation settings below.
lr = LinearRegression(
    featuresCol='features',
    labelCol='MEDV',
    maxIter=10,
    regParam=0.3,
    elasticNetParam=0.8,
)
lr_model = lr.fit(train_df)

# Report the fitted parameters.
print("Coefficients: ", lr_model.coefficients, sep="")
print("Intercept: ", lr_model.intercept, sep="")

In [None]:
# Fit quality as reported by the fitted model's summary object.
trainingSummary = lr_model.summary
train_rmse = trainingSummary.rootMeanSquaredError
train_r2 = trainingSummary.r2
print("RMSE: %f" % train_rmse)
print("r2: %f" % train_r2)

In [None]:
# Summary statistics of the training split, printed as a table.
train_stats_df = train_df.describe()
train_stats_df.show()

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator

# Apply the fitted model to the test split and preview a few predictions
# next to the actual MEDV values.
lr_predictions = lr_model.transform(test_df)
lr_predictions.select("prediction","MEDV","features").show(5)

# Evaluate the predictions against MEDV with the r2 metric.
lr_evaluator = RegressionEvaluator(
    predictionCol="prediction",
    labelCol="MEDV",
    metricName="r2",
)
test_r2 = lr_evaluator.evaluate(lr_predictions)
print("R Squared (R2) on test data = %g" % test_r2)