In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

import pandas as pd
import numpy as np

%matplotlib inline
import matplotlib.pyplot as plt

In [2]:
# Create the notebook's Spark session, or reuse one that already exists.
spark = SparkSession.builder.appName("Feature Eng Cyclical").getOrCreate()

In [3]:
import os

# The dataset location is resolved relative to the notebook's working directory.
cwd = os.getcwd()
housing_csv_uri = "file://%s/../datasets/BostonHousing.csv" % cwd

# First row is the header; column types are inferred from the data.
house_df = spark.read.csv(
    housing_csv_uri,
    header=True,
    inferSchema=True,
)

# Keep the frame cached because the cells below query it repeatedly.
house_df.cache()

DataFrame[crim: double, zn: double, indus: double, chas: int, nox: double, rm: double, age: double, dis: double, rad: int, tax: int, ptratio: double, b: double, lstat: double, medv: double]

In [4]:
# Column the regression predicts.
LABEL_COL = "medv"
# Name of the assembled feature-vector column consumed by the regressor.
FEATURES_COL = "features"

In [5]:
# Summary statistics (count, mean, stddev, min, max) for the label column.
label_stats = house_df.describe(LABEL_COL)
label_stats.show()

+-------+------------------+
|summary|              medv|
+-------+------------------+
|  count|               506|
|   mean|22.532806324110698|
| stddev| 9.197104087379815|
|    min|               5.0|
|    max|              50.0|
+-------+------------------+



In [6]:
# Every double/int column except the label is a candidate feature.
numerical_columns = [
    name
    for name, kind in house_df.dtypes
    if kind in ("double", "int") and name != LABEL_COL
]

# Print the correlation of each candidate feature with the label.
for numerical_column in numerical_columns:
    label_corr = house_df.corr(LABEL_COL, numerical_column)
    print("%s\t%5.2f" % (numerical_column, label_corr))

crim	-0.39
zn	 0.36
indus	-0.48
chas	 0.18
nox	-0.43
rm	 0.70
age	-0.38
dis	 0.25
rad	-0.38
tax	-0.47
ptratio	-0.51
b	 0.33
lstat	-0.74


In [7]:
# Randomly split the rows into roughly 70% training / 30% test data.
# A fixed seed makes the split repeatable, so the metrics reported by the
# cells below are the same on every Restart & Run All instead of changing
# with each execution.
SPLIT_SEED = 42
train_df, test_df = house_df.randomSplit([0.7, 0.3], seed=SPLIT_SEED)

# Both splits are reused by several later cells, so keep them cached.
train_df.cache()
test_df.cache()

DataFrame[crim: double, zn: double, indus: double, chas: int, nox: double, rm: double, age: double, dis: double, rad: int, tax: int, ptratio: double, b: double, lstat: double, medv: double]

In [24]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

# Stage 1: pack the numerical columns into a single feature-vector column.
assembler = VectorAssembler(
    inputCols=numerical_columns,
    outputCol=FEATURES_COL,
)

# Stage 2: linear regression from the feature vector to the label.
lr = LinearRegression(
    featuresCol=FEATURES_COL,
    labelCol=LABEL_COL,
    maxIter=10,
)

pipeline_stages = [assembler, lr]
p = Pipeline(stages=pipeline_stages)

In [None]:
# Fit the assembler + regression pipeline on the training split.
model = p.fit(train_df)

# The fitted regression model is the last pipeline stage; its summary
# carries the metrics computed on the training data.
lr_model = model.stages[-1]
summary = lr_model.summary

train_rmse, train_r2 = summary.rootMeanSquaredError, summary.r2
print("RMSE:\t%.2f\nR2:\t%.2f" % (train_rmse, train_r2))

In [10]:
from pyspark.ml.evaluation import RegressionEvaluator

# Score the held-out test rows with the fitted pipeline.
predictions = model.transform(test_df)

# Both evaluators compare the "prediction" column with the label column;
# they differ only in the metric they report.
rmse_evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol=LABEL_COL,
    predictionCol="prediction",
)
r2_evaluator = RegressionEvaluator(
    metricName="r2",
    labelCol=LABEL_COL,
    predictionCol="prediction",
)

test_rmse = rmse_evaluator.evaluate(predictions)
test_r2 = r2_evaluator.evaluate(predictions)
print("RMSE:\t%.2f\nR2:\t%.2f" % (test_rmse, test_r2))

RMSE:	4.73
R2:	0.74


In [23]:
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit

# Candidate hyper-parameters for the regression stage:
# 2 regParam x 2 fitIntercept x 3 elasticNetParam values = 12 combinations.
param_grid = (ParamGridBuilder()
    .addGrid(lr.regParam, [0.1, 0.01])
    .addGrid(lr.fitIntercept, [False, True])
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
    .build()
)

print("Param grid size: %d" % len(param_grid))

# The whole pipeline `p` is the estimator being tuned; candidates are ranked
# by RMSE on a validation subset carved out of train_df (trainRatio=0.8).
# The seed fixes that internal train/validation split, so the selected
# parameters and the reported metrics do not drift between runs.
tvs = TrainValidationSplit(estimator=p,
                           estimatorParamMaps=param_grid,
                           evaluator=rmse_evaluator,
                           trainRatio=0.8,
                           seed=42)

# Fit on the training split only, then score the untouched test split.
tuned_model = tvs.fit(train_df)
tuned_predictions = tuned_model.transform(test_df)

print("RMSE:\t%.2f\nR2:\t%.2f" % (rmse_evaluator.evaluate(tuned_predictions), r2_evaluator.evaluate(tuned_predictions)))

Param grid size: 12
RMSE:	4.63
R2:	0.75
