# The Boston Housing Assessment Dataset Analysis Project

In [None]:
# Part V: Using Spark and MLlib to train ensemble regressors
# In this notebook, we use the prepared dataset to train two ensemble regressors available in MLlib:
# Gradient Boosted Trees (GBTs) and Random Forest (RF).

In [None]:
# Init to use spark
# findspark.init must run before anything is imported from `pyspark`.
import os
import findspark

# Prefer the SPARK_HOME environment variable so the notebook also runs on
# machines where Spark is not installed under the original author's local path;
# fall back to that path when the variable is unset or empty.
findspark.init(os.environ.get('SPARK_HOME') or 'C:/Apps/Spark/spark-3.0.0-bin-hadoop2.7')
from pyspark import SparkContext

In [None]:
# Other imports
import numpy as np
from pyspark import SparkConf
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.tree import GradientBoostedTrees, GradientBoostedTreesModel
from pyspark.mllib.tree import RandomForest, RandomForestModel
from pyspark.mllib.util import MLUtils
from sklearn.base import BaseEstimator, RegressorMixin

In [None]:
# Create a Spark Context
# getOrCreate returns the already-running context when there is one, so
# re-executing this cell in a live kernel no longer fails with
# "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate(SparkConf().setAppName("PythonGBTreesRegressionExample"))

In [None]:
# Locate the prepared LIBSVM-format dataset and load it with MLUtils.
# `path` is reused further down as the base directory for the saved models.
path = 'M:/Work/Study/WPI-DS/CurrentCourse-DS504/DataSets/SelectedDataSet/'
# Dataset switch: uncomment the next line (and comment out the one after it)
# to run against the alternative 'SmallP' file instead.
#fname = 'BH_SmallP_libsvm.data'
fname = 'BH_BigP_libsvm.data'
fullname = f"{path}{fname}"
print(fullname)
data = MLUtils.loadLibSVMFile(sc, fullname)

## 1. Gradient Boosted Tree Regressor

### Part 1: Data Split Model Training

In [None]:
from time import time

In [None]:
# Number of boosting iterations for the GBT regressor; the same value is passed
# as numTrees to the random forest cells further down.
# NOTE(review): the original note read "default 20 : 100" — presumably a
# quick-run value of 20 versus the full-run value of 100; confirm.
nIteration = 100 
print(nIteration)

In [None]:
testPart = 0.3
# Fixed seed so the split — and therefore every metric computed from it — is
# reproducible across kernel restarts.
splitSeed = 42
# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([1.0 - testPart, testPart], seed=splitSeed)

In [None]:
ticS = time()
# Train a GradientBoostedTrees model.
#  Notes: (a) Empty categoricalFeaturesInfo indicates all features are continuous.
#         (b) Use more iterations in practice.
model = GradientBoostedTrees.trainRegressor(trainingData,
                                             categoricalFeaturesInfo={}, numIterations=nIteration)
ticE1 = time()
# Evaluate model on test instances and compute test error.
# NOTE: RDD transformations (map, zip and, presumably, predict on an RDD) are
# lazy, so Steps 2 and 3 mostly time the construction of the execution plan;
# the actual prediction work runs inside the sum()/count() actions of Step 4.
predictions = model.predict(testData.map(lambda x: x.features))
ticE2 = time()
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
ticE3 = time()
# Mean of the squared (label - prediction) differences over the test set.
testMSE = labelsAndPredictions.map(lambda lp: (lp[0] - lp[1]) * (lp[0] - lp[1])).sum() /\
    float(testData.count())
ticE4 = time()

print("Step 1: Training model numIter: {} done in {:.4f}s".format(nIteration, ticE1 - ticS))
print("Step 2: Predict part: {} done in {:.4f}s".format(testPart, ticE2 - ticE1))
# Fixed: the format string has a single placeholder, so the extra `testPart`
# argument used to be printed in place of the elapsed time.
print("Step 3: Zip test data done in {:.4f}s".format(ticE3 - ticE2))
print("Step 4: Calc MSE done in {:.4f}s".format(ticE4 - ticE3))
print("Step 5: Total time {:.4f}s".format(ticE4 - ticS))

print('Test Mean Squared Error = ' + str(testMSE))
print('Learned regression GBT model:')
print(model.toDebugString())

# NOTE (original author): "Not working for 100 iter!" — the notebook does not
# say which step fails with 100 iterations; TODO clarify.

In [None]:
# Save the model
# Persist the current `model` (the GBT regressor when the notebook is run top to
# bottom) below the dataset directory `path`, then read it back.
fullpath = path + "target/tmp/myGradientBoostingRegressionModel"
print('Save model to: ', fullpath)
# Save and load model
# NOTE(review): save is expected to fail when the target directory already
# exists (e.g. on a second run) — confirm, and clear the directory beforehand.
model.save(sc, fullpath)
# Reload from disk; `sameModel` is not used again in this notebook.
sameModel = GradientBoostedTreesModel.load(sc, fullpath)

### Part 2: Cross Validation

In [None]:
from sklearn.model_selection import cross_val_score
from sklearn.model_selection import KFold
from sklearn import preprocessing

In [None]:
# Per-algorithm cross-validation scores, keyed by algorithm name.
scores_map = {}
scores_map_emsemble = {}

# Materialise the Spark dataset on the driver as NumPy arrays for scikit-learn.
# `x` and `y` were never defined anywhere in the notebook, so this cell raised
# NameError on a fresh kernel. Features are densified row by row.
# NOTE(review): assumes `data` fits in driver memory — confirm for the BigP file.
x = np.array(data.map(lambda lp: lp.features.toArray()).collect())
y = np.array(data.map(lambda lp: lp.label).collect())

# Rescale every feature column to the [0, 1] range.
min_max_scaler = preprocessing.MinMaxScaler()
x_scaled = min_max_scaler.fit_transform(x)

nSplits = 10
kf = KFold(n_splits=nSplits)

#This one is used for cross validation scoring
# scikit-learn reports this metric negated (higher is better), so the scores
# collected below are negative MSE values.
customScoring = 'neg_mean_squared_error'
scoreStr = 'MSE'

print('Split: ', nSplits)
print('Custom scoring type: ', scoreStr)

In [None]:
algoName = 'GradientBoostedTrees'
print("Training %s..." % algoName)


class MLlibGBTRegressor(RegressorMixin, BaseEstimator):
    """Minimal scikit-learn adapter around MLlib's GradientBoostedTrees regressor.

    cross_val_score needs an unfitted estimator it can clone and fit once per
    fold. The previous code passed an already-trained MLlib model, which
    provides neither `fit` nor `get_params`, so the call could not succeed.

    Parameters
    ----------
    numIterations : int
        Number of boosting iterations forwarded to MLlib.
    """

    def __init__(self, numIterations=100):
        self.numIterations = numIterations

    def fit(self, X, y):
        """Train an MLlib GBT model on the rows of X with targets y."""
        # Uses the notebook-level SparkContext `sc` to ship the fold to Spark.
        points = sc.parallelize(
            [LabeledPoint(float(label), row) for row, label in zip(X, y)])
        self.model_ = GradientBoostedTrees.trainRegressor(
            points, categoricalFeaturesInfo={}, numIterations=self.numIterations)
        return self

    def predict(self, X):
        """Return predictions for the rows of X as a NumPy array, in row order."""
        return np.array(self.model_.predict(sc.parallelize(list(X))).collect())


ticS = time()

desc_rf = MLlibGBTRegressor(numIterations=nIteration)

# One clone of `desc_rf` is trained and scored per fold of `kf`.
scores = cross_val_score(desc_rf, x_scaled, y, cv=kf, scoring=customScoring)
ticE = time()
scores_map[algoName] = scores
scores_map_emsemble[algoName] = scores
print("{} CV done in {:.4f}s".format(algoName, ticE - ticS))
# With 'neg_mean_squared_error' the printed mean is the negated MSE.
print("%s: %0.4f (+/- %0.4f)" % (scoreStr, scores.mean(), scores.std()))

## 2. Random Forest Regressor

In [None]:
algoName = 'RandomForestRegressor'
print("Training %s..." % algoName)


class MLlibRFRegressor(RegressorMixin, BaseEstimator):
    """Minimal scikit-learn adapter around MLlib's RandomForest regressor.

    cross_val_score needs an unfitted estimator it can clone and fit once per
    fold. The previous code passed an already-trained MLlib model, which
    provides neither `fit` nor `get_params`, so the call could not succeed.

    Parameters mirror the keyword arguments of RandomForest.trainRegressor and
    are forwarded unchanged.
    """

    def __init__(self, numTrees=100, featureSubsetStrategy="auto",
                 impurity='variance', maxDepth=4, maxBins=32):
        self.numTrees = numTrees
        self.featureSubsetStrategy = featureSubsetStrategy
        self.impurity = impurity
        self.maxDepth = maxDepth
        self.maxBins = maxBins

    def fit(self, X, y):
        """Train an MLlib random forest on the rows of X with targets y."""
        # Uses the notebook-level SparkContext `sc` to ship the fold to Spark.
        points = sc.parallelize(
            [LabeledPoint(float(label), row) for row, label in zip(X, y)])
        self.model_ = RandomForest.trainRegressor(
            points, categoricalFeaturesInfo={},
            numTrees=self.numTrees, featureSubsetStrategy=self.featureSubsetStrategy,
            impurity=self.impurity, maxDepth=self.maxDepth, maxBins=self.maxBins)
        return self

    def predict(self, X):
        """Return predictions for the rows of X as a NumPy array, in row order."""
        return np.array(self.model_.predict(sc.parallelize(list(X))).collect())


ticS = time()

# Same hyper-parameters as the original trainRegressor call.
desc_rf = MLlibRFRegressor(numTrees=nIteration, featureSubsetStrategy="auto",
                           impurity='variance', maxDepth=4, maxBins=32)

# One clone of `desc_rf` is trained and scored per fold of `kf`.
scores = cross_val_score(desc_rf, x_scaled, y, cv=kf, scoring=customScoring)
ticE = time()
scores_map[algoName] = scores
scores_map_emsemble[algoName] = scores
print("{} CV done in {:.4f}s".format(algoName, ticE - ticS))
# With 'neg_mean_squared_error' the printed mean is the negated MSE.
print("%s: %0.4f (+/- %0.4f)" % (scoreStr, scores.mean(), scores.std()))

In [None]:
ticS = time()
# Train a RandomForest model.
#  Empty categoricalFeaturesInfo indicates all features are continuous.
#  Note: Use larger numTrees in practice.
#  Setting featureSubsetStrategy="auto" lets the algorithm choose.
model = RandomForest.trainRegressor(trainingData, categoricalFeaturesInfo={},
                                    numTrees=nIteration, featureSubsetStrategy="auto",
                                    impurity='variance', maxDepth=4, maxBins=32)
ticE1 = time()

# Evaluate model on test instances and compute test error.
# NOTE: RDD transformations (map, zip and, presumably, predict on an RDD) are
# lazy, so Steps 2 and 3 mostly time the construction of the execution plan;
# the actual prediction work runs inside the sum()/count() actions of Step 4.
predictions = model.predict(testData.map(lambda x: x.features))
ticE2 = time()
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
ticE3 = time()
# Mean of the squared (label - prediction) differences over the test set.
testMSE = labelsAndPredictions.map(lambda lp: (lp[0] - lp[1]) * (lp[0] - lp[1])).sum() /\
    float(testData.count())
ticE4 = time()

print("Step 1: Training model numTrees: {} done in {:.4f}s".format(nIteration, ticE1 - ticS))
print("Step 2: Predict part: {} done in {:.4f}s".format(testPart, ticE2 - ticE1))
# Fixed: the format string has a single placeholder, so the extra `testPart`
# argument used to be printed in place of the elapsed time.
print("Step 3: Zip test data done in {:.4f}s".format(ticE3 - ticE2))
print("Step 4: Calc MSE done in {:.4f}s".format(ticE4 - ticE3))
print("Step 5: Total time {:.4f}s".format(ticE4 - ticS))

print('Test Mean Squared Error = ' + str(testMSE))
print('Learned regression forest model:')
print(model.toDebugString())


In [None]:
# save model
# Persist the current `model` (the random forest when the notebook is run top to
# bottom) below the dataset directory `path`, then read it back.
fullpath = path + "target/tmp/myRandomForestRegressionModel"
print('Save model to: ', fullpath)
# Save and load model
# NOTE(review): save is expected to fail when the target directory already
# exists (e.g. on a second run) — confirm, and clear the directory beforehand.
model.save(sc, fullpath)
# Reload from disk; `sameModel` is not used again in this notebook.
sameModel = RandomForestModel.load(sc, fullpath)