Perform imports

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col#, countDistinct, count, when, isnan
import pandas as pd
#import matplotlib.pyplot as plt
#import seaborn as sns
from pyspark.ml import Pipeline
import numpy as np

#temporary hide error reports
import warnings
warnings.filterwarnings('ignore')

Read data

In [0]:
model_data_df = spark.read.parquet("/FileStore/final_data")

In [0]:
model_data_df.printSchema()

In [0]:
model_data_df.show(5)

Train and test datasets

In [0]:
train_data_df, test_data_df = model_data_df.randomSplit([0.8, 0.2])

print(f"train_data_df count: {train_data_df.count()}\ntest_data_df count: {test_data_df.count()}")

----
## Model selection

An important task in ML is model selection, or using data to find the best model or parameters for a given task. This is also called tuning.

Basically, I need to find the answers for following questions:

**I. Which technique** should I choose (linear regression, decision tree, random forest etc.)?

**II. Which value of hyperparameters** should I choose, e.g. if the decision tree with depth 3 is better than the decision tree with depth 5?

**III. Which metric** should I use for comparison of all models?

MLlib supports model selection using tools such as CrossValidator and TrainValidationSplit. These tools require the following items:

**I. Estimator**: algorithm or Pipeline to tune

**II. Set of ParamMaps**: parameters to choose from, sometimes called a “parameter grid” to search over

**III. Evaluator**: metric to measure how well a fitted Model does on held-out test data

At a high level, these model selection tools work as follows:

- They split the input data into separate training and test datasets.
- For each (training, test) pair, they iterate through the set of ParamMaps:
    - For each ParamMap, they fit the Estimator using those parameters, get the fitted Model, and evaluate the Model’s performance using the Evaluator.
- They select the Model produced by the best-performing set of parameters.

### I. Estimator

**Linear Regression**

In our case we choose LinearRegression as Estimator

In [0]:
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol="features", labelCol="label", predictionCol="lrPrediction")

lrPipeline = Pipeline(stages = [lr])

**Logistic Regression**

The same piece of code for LogisticRegression will look like:

`from pyspark.ml.classification import LogisticRegression`

`blr = LogisticRegression(featuresCol="features", labelCol="binLabel", predictionCol="blrPrediction")`

### II. ParamGRidBuilder

**Linear Regression**

All parametrs for Linear regression could be found at the documentation:

https://spark.apache.org/docs/2.4.4/ml-classification-regression.html#regression

https://spark.apache.org/docs/2.4.0/api/python/pyspark.ml.html#pyspark.ml.regression.LinearRegression

https://spark.apache.org/docs/2.4.4/ml-classification-regression.html#survival-regression

But mains parametrs for **Linear Regression** function are following:
* **loss**: The loss function to be optimized. Supported options: squaredError, huber. (default: squaredError)
* **elasticNetParam**: the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty. (default: 0.0)
* **regParam**: regularization parameter (>= 0). (default: 0.0)
* **maxIter**: max number of iterations (>= 0). (default: 100, current: 50)
* **tol**: the convergence tolerance for iterative algorithms (>= 0). (default: 1e-06)

Define parameter grid for linear regression

In [0]:
from pyspark.ml.tuning import ParamGridBuilder

lrParamGrid = ParamGridBuilder() \
                .addGrid(lr.regParam, [0.0, 0.1, 0.5, 1.0]) \
                .addGrid(lr.elasticNetParam, [0.0, 0.1, 0.5])\
                .addGrid(lr.maxIter, [5, 15, 50, 100])\
                .build()

L(w;x,y):=1/2*(wTx−y)^2 + α(λ∥w∥1)+(1−α)(λ/2∥w∥2^2),α∈[0,1],λ≥0

**Logistic Regression**

Main parameters for **Logistic Regression** are nearly the same as in case of Linear Regression.
There is a couple interesting parameter in advance:
- threshold - threshold in binary classification prediction (range [0, 1]) has impact on predicted value. Default is 0.5
- probabilityCol - conditional probability, 1/(1 + exp(-rawPrediction_k))
- rawPredictionCol - Raw prediction for each possible label

### III. Evaluator

The Evaluator can be a RegressionEvaluator for regression problems, a BinaryClassificationEvaluator for binary data, or a MulticlassClassificationEvaluator for multiclass problems. 

**Linear Regression**
RegressionEvaluator options:
- **rmse** - root mean squared error (default)
- **mse** - mean squared error
- **r2** - r^2 metric
- **mae** - mean absolute error

In our case we are satisfied with the existing options of RegressionEvaluator and choose RMSE.

In [0]:
from pyspark.ml.evaluation import RegressionEvaluator

lrEvaluator = RegressionEvaluator()\
                .setPredictionCol("lrPrediction")\
                .setMetricName("rmse")

**Logistic Regression**
BinaryClassificationEvaluator options:
- **areaUnderROC**
- **areaUnderPR**


The code for Logistic Regression look like this

`from pyspark.ml.evaluation import BinaryClassificationEvaluator`

`lrEvaluator = BinaryClassificationEvaluator().setPredictionCol("blrPrediction").setMetricName("areaUnderROC")`

The great advantage of Spark is that you are not limited on that prepared evaluation metrics. You can create your own metric, for example Lift on the first decile, and choose the best model on the basis of that custom made metric.

### IV. CrossValidator

As we mention above, CrossValidor needs three following items:

- Estimator
- Set of ParamMaps
- Evaluator

There are two more parameters that could have a significant impact mainly on the computing costs:
- **numFolds** - how many splits into training and testing datasets will be done, i.e. how many models will be trained for each parameters combination

- **parallelism** - how many models will be trained in parallel

So **how many models** will be calculated during cross validation? If we have two hyperametrs, each has three values and numFold is set to 3, in total we have
2 x 3 x 3 = 18. In total 18 models will be trained during cross validation. ism parameter

So finally we **set CrossValidator** ...

In [0]:
from pyspark.ml.tuning import CrossValidator

lrCrossValidator = CrossValidator(estimator=lrPipeline,
                        estimatorParamMaps=lrParamGrid,
                        evaluator=lrEvaluator,
                        numFolds=2,
                        parallelism=3)

... and **train CrossValidator**.

In [0]:
lrCrossValidatorModel = lrCrossValidator.fit(train_data_df)

____

## Result of CrossValidator

Not user friendly CrossValidator output of performance

In [0]:
lrCrossValidatorModel.avgMetrics[0:10]

Not user friendly parametr grid

In [0]:
lrParamGrid[0:5]

In [0]:
#getting param. names

l1 = len(lrParamGrid[0].keys()) 
paramName = []

for i in range(0,l1):
  paramName.append(list(lrParamGrid[0].keys())[i].name)

print(paramName)

In [0]:
#getting param. values

l2 = len(lrParamGrid)
paramValues = []

for i in range(0,l2):
  paramValues.append(lrParamGrid[i].values())

print(paramValues)

In [0]:
# all together

allLrModels = pd.DataFrame.from_records(paramValues, columns =paramName )
metricName = lrEvaluator.getMetricName()
allLrModels.loc[:,metricName] = lrCrossValidatorModel.avgMetrics

display(allLrModels.sort_values(allLrModels.columns[-1], ascending = True)[0:100])

regParam,elasticNetParam,maxIter,rmse
1.0,0.5,5,32341.337022360385
0.5,0.5,5,32423.097309477584
1.0,0.1,5,32476.35953677753
0.1,0.1,5,32742.327134400417
0.1,0.5,5,32742.32891654085
0.5,0.1,5,32742.33422770695
0.0,0.5,5,33633.9426613954
0.0,0.0,5,33633.9426613954
0.0,0.1,5,33633.9426613954
0.1,0.1,15,35605.30039637413


### The best model

In [0]:
bestLr = LinearRegression(featuresCol="features", labelCol="label", predictionCol="lrPrediction", 
                                    regParam = 1.0, elasticNetParam = 0.1, maxIter = 5)

bestLrModel = bestLr.fit(train_data_df)

By method **bestModel** we can find the best model calculating by crossValidator according to chosen metric (here RMSE)

`bestLrModel = lrCrossValidatorModel.bestModel`

In [0]:
lrPrediction = bestLrModel.transform(test_data_df)
print("Predictions:")
display(lrPrediction["lrPrediction", "label"].toPandas()[0:5])

lrPrediction,label
213072.17325742688,140000
216194.4069805116,200000
114942.93084217823,144000
340375.3720041543,325300
129026.884675955,139400


In [0]:
intercept = bestLrModel.intercept
print(f"Intercept:\n{intercept}")

In [0]:
coef = bestLrModel.coefficients.toArray
print(f"Coefficients:\n{coef}")

In [0]:
#lets put the coefficients into the table
from itertools import chain

attrs = sorted((attr["idx"], attr["name"]) for attr in 
               (chain(*lrPrediction.schema["features"].metadata["ml_attr"]["attrs"].values())))

pairs = [(name, bestLrModel.coefficients[idx]) for idx, name in attrs]

sorted_pairs = sorted(pairs, key = lambda p: abs(p[1]), reverse=True)
variables = [("intercept", intercept)] + sorted_pairs

results = pd.DataFrame(variables, columns = ["predictors", "coefficients"])
display(results)

predictors,coefficients
intercept,-328926.5031985266
PoolQC_index_vec_Ex,75892.27284473793
OverallQual_index_vec_10,50854.78325995202
GarageQual_index_vec_Ex,38541.44771785655
OverallQual_index_vec_9,30938.71545035708
Neighborhood_index_vec_1,30708.30404050865
ExterQual_index_vec_Ex,27578.467127251308
TotRmsAbvGrd_index_vec_10,25422.378911049625
KitchenQual_index_vec_Ex,23594.972905062707
BsmtQual_index_vec_Ex,23528.07649034609
