In [0]:
# imports 
from pyspark.ml.regression import DecisionTreeRegressor 
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator 
from mlflow.tracking import MlflowClient
import mlflow
import mlflow.spark 
import pandas as pd
from pyspark.ml.feature import RFormula

client = MlflowClient()

# load the datasets 
userhome = 'dbfs:/user/skim658@gwu.edu'

redDF = spark.read.parquet(userhome + '/final-project/reddf.parquet')
whiteDF = spark.read.parquet(userhome + '/final-project/whitedf.parquet')

In [0]:
# split the data set into train and test sets 
redTrainDF, redTestDF = redDF.repartition(8).randomSplit([0.8, 0.2], seed = 42)
print(redTrainDF.cache().count())
whiteTrainDF, whiteTestDF = whiteDF.repartition(8).randomSplit([0.8, 0.2], seed = 42)
print(whiteTrainDF.cache().count())

## Decision Tree with Cross Validation 

Max Tree Depth: 2, 5, 10; Max Bins: 10, 20, 40

## Red Wine

In [0]:
with mlflow.start_run(run_name = 'RED-DT-All-Features') as run:
  # model 
  dt = DecisionTreeRegressor(labelCol = 'quality')
  vecAssemblerDT = VectorAssembler(inputCols = [x for x in redTrainDF.columns if x != 'quality'], outputCol = 'features')
  # cross validation 
  dtparamGrid = ParamGridBuilder().addGrid(dt.maxDepth, [2, 5, 10]).addGrid(dt.maxBins, [10, 20, 40]).build()
  evaluator = RegressionEvaluator(predictionCol = 'prediction', labelCol = 'quality')
  cv = CrossValidator(estimator = dt, evaluator = evaluator, estimatorParamMaps = dtparamGrid, numFolds = 3, parallelism = 4)
  
  # pipeline 
  pipeline = Pipeline(stages = [vecAssemblerDT, cv])
  pipelineModel = pipeline.fit(redTrainDF)
  
  cvModel = pipelineModel.stages[-1]
  dtModel = cvModel.bestModel 
  
  testDF = vecAssemblerDT.transform(redTestDF)
  predDF = dtModel.transform(testDF)
  display(predDF.select('features', 'quality', 'prediction'))
  
  # log parameters 
  mlflow.log_param('features', 'all')
  mlflow.log_param('color', 'red')
  # log metric 
  rmse = evaluator.setMetricName('rmse').evaluate(predDF)
  mlflow.log_metric('rmse', rmse)

features,quality,prediction
"Map(vectorType -> dense, length -> 11, values -> List(5.4, 0.74, 0.0, 1.2, 0.041, 16.0, 46.0, 0.99258, 4.01, 0.59, 12.5))",6,5.743589743589744
"Map(vectorType -> dense, length -> 11, values -> List(5.9, 0.61, 0.08, 2.1, 0.071, 16.0, 24.0, 0.99376, 3.56, 0.77, 11.1))",6,6.0
"Map(vectorType -> dense, length -> 11, values -> List(6.0, 0.5, 0.0, 1.4, 0.057, 15.0, 26.0, 0.99448, 3.36, 0.45, 9.5))",5,5.174887892376682
"Map(vectorType -> dense, length -> 11, values -> List(6.2, 0.64, 0.09, 2.5, 0.081, 15.0, 26.0, 0.99538, 3.57, 0.63, 12.0))",5,6.307692307692308
"Map(vectorType -> dense, length -> 11, values -> List(6.4, 0.56, 0.15, 1.8, 0.078, 17.0, 65.0, 0.99294, 3.33, 0.6, 10.5))",6,5.703125
"Map(vectorType -> dense, length -> 11, values -> List(6.6, 0.815, 0.02, 2.7, 0.072, 17.0, 34.0, 0.9955, 3.58, 0.89, 12.3))",7,6.783132530120482
"Map(vectorType -> dense, length -> 11, values -> List(6.8, 0.91, 0.06, 2.0, 0.06, 4.0, 11.0, 0.99592, 3.53, 0.64, 10.9))",4,6.0
"Map(vectorType -> dense, length -> 11, values -> List(7.0, 0.36, 0.21, 2.4, 0.086, 24.0, 69.0, 0.99556, 3.4, 0.53, 10.1))",6,5.174887892376682
"Map(vectorType -> dense, length -> 11, values -> List(7.0, 0.685, 0.0, 1.9, 0.067, 40.0, 63.0, 0.9979, 3.6, 0.81, 9.9))",5,5.44
"Map(vectorType -> dense, length -> 11, values -> List(7.0, 0.78, 0.08, 2.0, 0.093, 10.0, 19.0, 0.9956, 3.4, 0.47, 10.0))",5,5.15625


In [0]:
display(dtModel)

treeNode
"{""index"":31,""featureType"":""continuous"",""prediction"":null,""threshold"":10.525,""categories"":null,""feature"":10,""overflow"":false}"
"{""index"":15,""featureType"":""continuous"",""prediction"":null,""threshold"":0.575,""categories"":null,""feature"":9,""overflow"":false}"
"{""index"":7,""featureType"":""continuous"",""prediction"":null,""threshold"":10.350000000000001,""categories"":null,""feature"":10,""overflow"":false}"
"{""index"":3,""featureType"":""continuous"",""prediction"":null,""threshold"":0.7525,""categories"":null,""feature"":1,""overflow"":false}"
"{""index"":1,""featureType"":""continuous"",""prediction"":null,""threshold"":0.525,""categories"":null,""feature"":2,""overflow"":false}"
"{""index"":0,""featureType"":null,""prediction"":5.174887892376682,""threshold"":null,""categories"":null,""feature"":null,""overflow"":false}"
"{""index"":2,""featureType"":null,""prediction"":4.8,""threshold"":null,""categories"":null,""feature"":null,""overflow"":false}"
"{""index"":5,""featureType"":""continuous"",""prediction"":null,""threshold"":0.065,""categories"":null,""feature"":2,""overflow"":false}"
"{""index"":4,""featureType"":null,""prediction"":4.5,""threshold"":null,""categories"":null,""feature"":null,""overflow"":false}"
"{""index"":6,""featureType"":null,""prediction"":5.15625,""threshold"":null,""categories"":null,""feature"":null,""overflow"":false}"


In [0]:
# see the best model parameters 
# print(dtModel.extractParamMap())
paramDict = dtModel.extractParamMap()
tempDict = {}
for k, v in paramDict.items():
  tempDict[k.name] = v

bestMaxDepth = tempDict['maxDepth']
bestMaxBins = tempDict['maxBins']

tempDict = {'Max Depth': [bestMaxDepth], 'Max Bins': [bestMaxBins]}
bestCVModel = pd.DataFrame(tempDict, index = [0])
display(bestCVModel)

Max Depth,Max Bins
5,20


In [0]:
# get best model metrics 
experiment_id = run.info.experiment_id
runs = client.search_runs(experiment_id, order_by=["attributes.start_time desc"])
for run in runs:
  if run.data.params['mlModelClass'] == 'DecisionTreeRegressor' and int(run.data.params['maxBins']) == bestMaxBins and int(run.data.params['maxDepth']) == bestMaxDepth:
    print(run) 
    redMetricsDF = pd.DataFrame(run.data.metrics, index = [0])
    break
display(redMetricsDF)

avg_rmse,std_rmse
0.6767975847554345,0.0131962607602101


In [0]:
# feature importance
pandasDF = pd.DataFrame(list(zip([x for x in redTrainDF.columns if x != 'quality'], dtModel.featureImportances)), columns=["feature", "importance"])
topFeatures = pandasDF.sort_values(["importance"], ascending=False)
topFeatures

Unnamed: 0,feature,importance
10,alcohol,0.477595
9,sulphates,0.237685
1,volatile_acidity,0.099796
6,total_sulfur_dioxide,0.052981
5,free_sulfur_dioxide,0.034336
2,citric_acid,0.027481
0,fixed_acidity,0.026592
4,chlorides,0.021766
8,pH,0.011551
3,residual_sugar,0.010215


## White Wine

In [0]:
with mlflow.start_run(run_name = 'WHITE-DT-All-Features') as run:
  # model 
  dt = DecisionTreeRegressor(labelCol = 'quality')
  vecAssemblerDT = VectorAssembler(inputCols = [x for x in whiteTrainDF.columns if x != 'quality'], outputCol = 'features')
  # cross validation 
  dtparamGrid = ParamGridBuilder().addGrid(dt.maxDepth, [2, 5, 10]).addGrid(dt.maxBins, [10, 20, 40]).build()
  evaluator = RegressionEvaluator(predictionCol = 'prediction', labelCol = 'quality')
  cv = CrossValidator(estimator = dt, evaluator = evaluator, estimatorParamMaps = dtparamGrid, numFolds = 3, parallelism = 4)
  
  # pipeline 
  pipeline = Pipeline(stages = [vecAssemblerDT, cv])
  pipelineModel = pipeline.fit(whiteTrainDF)
  
  cvModel = pipelineModel.stages[-1]
  dtModel = cvModel.bestModel 
  
  testDF = vecAssemblerDT.transform(whiteTestDF)
  predDF = dtModel.transform(testDF)
  display(predDF.select('features', 'quality', 'prediction'))
  
  # log parameters 
  mlflow.log_param('features', 'all')
  mlflow.log_param('color', 'white')
  # log metric 
  rmse = evaluator.setMetricName('rmse').evaluate(predDF)
  mlflow.log_metric('rmse', rmse)

features,quality,prediction
"Map(vectorType -> dense, length -> 11, values -> List(4.9, 0.47, 0.17, 1.9, 0.035, 60.0, 148.0, 0.98964, 3.27, 0.35, 11.5))",6,4.0
"Map(vectorType -> dense, length -> 11, values -> List(5.1, 0.35, 0.26, 6.8, 0.034, 36.0, 120.0, 0.99188, 3.38, 0.4, 11.5))",6,6.336309523809524
"Map(vectorType -> dense, length -> 11, values -> List(5.2, 0.3, 0.34, 1.5, 0.038, 18.0, 96.0, 0.98942, 3.56, 0.48, 13.0))",8,6.804347826086956
"Map(vectorType -> dense, length -> 11, values -> List(5.4, 0.29, 0.38, 1.2, 0.029, 31.0, 132.0, 0.98895, 3.28, 0.36, 12.4))",6,6.229166666666667
"Map(vectorType -> dense, length -> 11, values -> List(5.6, 0.18, 0.27, 1.7, 0.03, 31.0, 103.0, 0.98892, 3.35, 0.37, 12.9))",6,6.804347826086956
"Map(vectorType -> dense, length -> 11, values -> List(5.6, 0.26, 0.18, 1.4, 0.034, 18.0, 135.0, 0.99174, 3.32, 0.35, 10.2))",6,6.4
"Map(vectorType -> dense, length -> 11, values -> List(5.7, 0.2, 0.3, 2.5, 0.046, 38.0, 125.0, 0.99276, 3.34, 0.5, 9.9))",6,5.926881720430107
"Map(vectorType -> dense, length -> 11, values -> List(5.7, 0.28, 0.35, 1.2, 0.052, 39.0, 141.0, 0.99108, 3.44, 0.69, 11.3))",6,6.336309523809524
"Map(vectorType -> dense, length -> 11, values -> List(5.8, 0.22, 0.29, 1.3, 0.036, 25.0, 68.0, 0.98865, 3.24, 0.35, 12.6))",6,6.229166666666667
"Map(vectorType -> dense, length -> 11, values -> List(5.8, 0.23, 0.27, 1.8, 0.043, 24.0, 69.0, 0.9933, 3.38, 0.31, 9.4))",6,5.45


In [0]:
display(dtModel)

treeNode
"{""index"":31,""featureType"":""continuous"",""prediction"":null,""threshold"":10.850000000000001,""categories"":null,""feature"":10,""overflow"":false}"
"{""index"":15,""featureType"":""continuous"",""prediction"":null,""threshold"":0.2525,""categories"":null,""feature"":1,""overflow"":false}"
"{""index"":7,""featureType"":""continuous"",""prediction"":null,""threshold"":0.2075,""categories"":null,""feature"":1,""overflow"":false}"
"{""index"":3,""featureType"":""continuous"",""prediction"":null,""threshold"":0.9978100000000001,""categories"":null,""feature"":7,""overflow"":false}"
"{""index"":1,""featureType"":""continuous"",""prediction"":null,""threshold"":7.5,""categories"":null,""feature"":5,""overflow"":false}"
"{""index"":0,""featureType"":null,""prediction"":4.7,""threshold"":null,""categories"":null,""feature"":null,""overflow"":false}"
"{""index"":2,""featureType"":null,""prediction"":5.926881720430107,""threshold"":null,""categories"":null,""feature"":null,""overflow"":false}"
"{""index"":5,""featureType"":""continuous"",""prediction"":null,""threshold"":0.305,""categories"":null,""feature"":2,""overflow"":false}"
"{""index"":4,""featureType"":null,""prediction"":7.098039215686274,""threshold"":null,""categories"":null,""feature"":null,""overflow"":false}"
"{""index"":6,""featureType"":null,""prediction"":6.162162162162162,""threshold"":null,""categories"":null,""feature"":null,""overflow"":false}"


In [0]:
# see the best model parameters 
paramDict = dtModel.extractParamMap()
tempDict = {}
for k, v in paramDict.items():
  tempDict[k.name] = v

bestMaxDepth = tempDict['maxDepth']
bestMaxBins = tempDict['maxBins']

tempDict = {'Max Depth': [bestMaxDepth], 'Max Bins': [bestMaxBins]}
bestCVModel = pd.DataFrame(tempDict, index = [0])
display(bestCVModel)

Max Depth,Max Bins
5,40


In [0]:
# get best model metrics 
experiment_id = run.info.experiment_id
runs = client.search_runs(experiment_id, order_by=["attributes.start_time desc"])
for run in runs:
  if run.data.params['mlModelClass'] == 'DecisionTreeRegressor' and int(run.data.params['maxBins']) == bestMaxBins and int(run.data.params['maxDepth']) == bestMaxDepth:
    print(run) 
    whiteMetricsDF = pd.DataFrame(run.data.metrics, index = [0])
    break
display(whiteMetricsDF)

avg_rmse,std_rmse
0.7467741384386625,0.0118710409843188


In [0]:
pandasDF = pd.DataFrame(list(zip([x for x in whiteTrainDF.columns if x != 'quality'], dtModel.featureImportances)), columns=["feature", "importance"])
topFeatures = pandasDF.sort_values(["importance"], ascending=False)
topFeatures

Unnamed: 0,feature,importance
10,alcohol,0.529053
1,volatile_acidity,0.196905
5,free_sulfur_dioxide,0.126104
7,density,0.053797
3,residual_sugar,0.029487
2,citric_acid,0.021063
8,pH,0.014072
6,total_sulfur_dioxide,0.013836
0,fixed_acidity,0.010479
9,sulphates,0.005204


## Decision Tree with Hyperopt Hyperparameter Tuning

In [0]:
# imports
from hyperopt import hp 
from hyperopt import fmin, tpe, STATUS_OK, Trials

## Red Wine

In [0]:
# define the search space 
search_space = {
  'maxDepth': hp.randint('maxDepth', 2, 30),
  'maxBins': hp.randint('maxBins', 10, 40)
}

def objective_function(params):
  with mlflow.start_run(nested = True):
    # hyperparameters to tune
    maxDepth = params['maxDepth']
    maxBins = params['maxBins']
    
    # model 
    vecAssemblerDT = VectorAssembler(inputCols = [x for x in redTrainDF.columns if x != 'quality'], outputCol = 'features')
    dt = DecisionTreeRegressor(labelCol = 'quality', maxBins = maxBins, maxDepth = maxDepth, seed = 42)
    
    # pipeline 
    pipeline = Pipeline(stages = [vecAssemblerDT, dt])
    pipelineModel = pipeline.fit(redTrainDF)
    
    # evaluate predictions 
    redPredDF = pipelineModel.transform(redTestDF)
    regressionEvaluator = RegressionEvaluator(predictionCol = 'prediction', labelCol = 'quality')
    rmse = regressionEvaluator.evaluate(redPredDF)
    
    # log parameters 
    mlflow.log_param('maxDepth', maxDepth)
    mlflow.log_param('maxBins', maxBins)
    mlflow.log_param('color', 'red')
    mlflow.log_param('features', 'all')
    mlflow.log_param('tuning', 'hyperopt')
    mlflow.log_metric('rmse', rmse)
  
  return {'loss': rmse, 'status': STATUS_OK}

# create parent run 
with mlflow.start_run(run_name = 'RED-DT-All-Features-Hyperopt') as run:
  num_evals = 30
  trials = Trials()
  best_hyperparam = fmin(fn = objective_function,
                        space = search_space,
                        algo = tpe.suggest,
                        max_evals = num_evals,
                        trials = trials)
  # log param and metric for the best model 
  for name, value in best_hyperparam.items():
    mlflow.log_param(name, value)
  mlflow.log_metric('loss', trials.best_trial['result']['loss'])
    

In [0]:
redResult = pd.DataFrame(best_hyperparam, index = [0])
display(redResult)

maxBins,maxDepth
10,4


In [0]:
bestMaxBins = best_hyperparam['maxBins']
bestMaxDepth = best_hyperparam['maxDepth']

In [0]:
# get best model metrics 
experiment_id = run.info.experiment_id
runs = client.search_runs(experiment_id, order_by=["attributes.start_time desc"])
for run in runs:
  if run.data.params['tuning'] == 'hyperopt' and int(run.data.params['maxBins']) == bestMaxBins and int(run.data.params['maxDepth']) == bestMaxDepth:
    print(run) 
    redMetricsDF = pd.DataFrame(run.data.metrics, index = [0])
    break
display(redMetricsDF)

rmse
0.6838940275640455


## White Wine

In [0]:
def objective_function(params):
  with mlflow.start_run(nested = True):
    # hyperparameters to tune
    maxDepth = params['maxDepth']
    maxBins = params['maxBins']
    
    # model 
    vecAssemblerDT = VectorAssembler(inputCols = [x for x in whiteTrainDF.columns if x != 'quality'], outputCol = 'features')
    dt = DecisionTreeRegressor(labelCol = 'quality', maxBins = maxBins, maxDepth = maxDepth, seed = 42)
    
    # pipeline 
    pipeline = Pipeline(stages = [vecAssemblerDT, dt])
    pipelineModel = pipeline.fit(whiteTrainDF)
    
    # evaluate predictions 
    whitePredDF = pipelineModel.transform(whiteTestDF)
    regressionEvaluator = RegressionEvaluator(predictionCol = 'prediction', labelCol = 'quality')
    rmse = regressionEvaluator.evaluate(whitePredDF)
    
    # log parameters 
    mlflow.log_param('maxDepth', maxDepth)
    mlflow.log_param('maxBins', maxBins)
    mlflow.log_param('color', 'white')
    mlflow.log_param('features', 'all')
    mlflow.log_param('tuning', 'hyperopt')
    mlflow.log_metric('rmse', rmse)
  
  return {'loss': rmse, 'status': STATUS_OK}

# create parent run 
with mlflow.start_run(run_name = 'WHITE-DT-All-Features-Hyperopt') as run:
  num_evals = 30
  trials = Trials()
  best_hyperparam = fmin(fn = objective_function,
                        space = search_space,
                        algo = tpe.suggest,
                        max_evals = num_evals,
                        trials = trials)
  # log param and metric for the best model 
  for name, value in best_hyperparam.items():
    mlflow.log_param(name, value)
  mlflow.log_metric('loss', trials.best_trial['result']['loss'])

In [0]:
whiteResult = pd.DataFrame(best_hyperparam, index = [0])
display(whiteResult)

maxBins,maxDepth
12,7


In [0]:
bestMaxBins = best_hyperparam['maxBins']
bestMaxDepth = best_hyperparam['maxDepth']

In [0]:
# get best model metrics 
experiment_id = run.info.experiment_id
runs = client.search_runs(experiment_id, order_by=["attributes.start_time desc"])
for run in runs:
  if run.data.params['tuning'] == 'hyperopt' and int(run.data.params['maxBins']) == bestMaxBins and int(run.data.params['maxDepth']) == bestMaxDepth:
    print(run) 
    whiteMetricsDF = pd.DataFrame(run.data.metrics, index = [0])
    break
display(whiteMetricsDF)

rmse
0.7617022714530096
