The UCI Machine Learning Repository hosts a popular dataset describing various properties of wines made from three cultivars of Italian wine grapes: https://archive.ics.uci.edu/ml/datasets/Wine. Measurements of these properties can be used to build a multi-class classifier that predicts which cultivar is being observed.

The values in this dataset are:

0. Cultivar
1. Alcohol
2. Malic acid
3. Ash
4. Alcalinity of ash
5. Magnesium
6. Total phenols
7. Flavanoids
8. Nonflavanoid phenols
9. Proanthocyanins
10. Color intensity
11. Hue
12. OD280/OD315 of diluted wines
13. Proline

We'll use a multi-class classifier to predict the cultivar from the measured characteristics of the wine.

We'll split the original training dataset 80:20 into training and validation sets for the purposes of tuning. Rather than an exhaustive tuning method, we'll use Hyperopt to make the search more efficient. One thing to note about Hyperopt: always convert the values it supplies into integers wherever the model expects integers. The hp.quniform expressions used here yield floats, and those cannot be passed as-is for decision tree parameters such as max_depth and max_features, so we'll need to cast them to int.
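The integer conversion described above can be sketched as a small helper. This is only an illustration: the function name cast_integer_params and its integer_keys argument are hypothetical and not part of Hyperopt or scikit-learn, and the input dictionary simply mimics the float-valued result that the search prints later in this notebook.

```python
def cast_integer_params(params, integer_keys=("max_depth", "max_features")):
    """Return a copy of params with the named keys cast to int.

    Hypothetical helper for illustration; the notebook does the same
    thing inline with int(...).
    """
    cast = dict(params)  # copy so the caller's dictionary is left untouched
    for key in integer_keys:
        if key in cast:
            cast[key] = int(cast[key])
    return cast


# Float-valued parameters, shaped like what hp.quniform produces
raw_params = {"max_depth": 10.0, "max_features": 9.0}
clean_params = cast_integer_params(raw_params)
print(clean_params)
```

Returning a copy rather than modifying the dictionary in place keeps the raw Hyperopt result available for logging or inspection.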

Once the model is tuned, we'll train our final model using the optimized hyperparameter values.

In [0]:
# notebook config
USER_NAME = dbutils.notebook.entry_point.getDbutils().notebook().getContext().tags().apply('user')
FILE_STORE_ROOT = '/FileStore/shared_uploads/'+USER_NAME

In [0]:
# examine the file
file_name = FILE_STORE_ROOT+'/wine/wine.data'
dbutils.fs.head(file_name)

Out[36]: '1,14.23,1.71,2.43,15.6,127,2.8,3.06,.28,2.29,5.64,1.04,3.92,1065\n1,13.2,1.78,2.14,11.2,100,2.65,2.76,.26,1.28,4.38,1.05,3.4,1050\n1,13.16,2.36,2.67,18.6,101,2.8,3.24,.3,2.81,5.68,1.03,3.17,1185\n1,14.37,1.95,2.5,16.8,113,3.85,3.49,.24,2.18,7.8,.86,3.45,1480\n1,13.24,2.59,2.87,21,118,2.8,2.69,.39,1.82,4.32,1.04,2.93,735\n1,14.2,1.76,2.45,15.2,112,3.27,3.39,.34,1.97,6.75,1.05,2.85,1450\n1,14.39,1.87,2.45,14.6,96,2.5,2.52,.3,1.98,5.25,1.02,3.58,1290\n1,14.06,2.15,2.61,17.6,121,2.6,2.51,.31,1.25,5.05,1.06,3.58,1295\n1,14.83,1.64,2.17,14,97,2.8,2.98,.29,1.98,5.2,1.08,2.85,1045\n1,13.86,1.35,2.27,16,98,2.98,3.15,.22,1.85,7.22,1.01,3.55,1045\n1,14.1,2.16,2.3,18,105,2.95,3.32,.22,2.38,5.75,1.25,3.17,1510\n1,14.12,1.48,2.32,16.8,95,2.2,2.43,.26,1.57,5,1.17,2.82,1280\n1,13.75,1.73,2.41,16,89,2.6,2.76,.29,1.81,5.6,1.15,2.9,1320\n1,14.75,1.73,2.39,11.4,91,3.1,3.69,.43,2.81,5.4,1.25,2.73,1150\n1,14.38,1.87,2.38,12,102,3.3,3.64,.29,2.96,7.5,1.2,3,1547\n1,13.63,1.81,2.7,17.2,112,2.85,2.91,

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, FloatType

import pandas as pd
import numpy as np

from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import mean_squared_error

from sklearn.impute import SimpleImputer
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline

from hyperopt import hp, fmin, tpe, SparkTrials, STATUS_OK

In [0]:
# read the data to a pandas DataFrame and assemble feature and label arrays
wine_schema = StructType([
  StructField('Cultivar', StringType()),
  StructField('Alcohol', FloatType()),
  StructField('Malic acid', FloatType()),
  StructField('Ash', FloatType()),
  StructField('Alcalinity of ash', FloatType()),
  StructField('Magnesium', FloatType()),
  StructField('Total phenols', FloatType()),
  StructField('Flavanoids', FloatType()),
  StructField('Nonflavanoid phenols', FloatType()),
  StructField('Proanthocyanins', FloatType()),
  StructField('Color intensity', FloatType()),
  StructField('Hue', FloatType()),
  StructField('OD280/OD315 of diluted wines', FloatType()),
  StructField('Proline', FloatType())
  ])
 
 
wine = (
  spark
    .read
    .format('csv')
    .schema(wine_schema)
    .load(file_name)
    )
 
wine_pd = wine.toPandas()
 
display(wine_pd)

Cultivar,Alcohol,Malic acid,Ash,Alcalinity of ash,Magnesium,Total phenols,Flavanoids,Nonflavanoid phenols,Proanthocyanins,Color intensity,Hue,OD280/OD315 of diluted wines,Proline
1,14.23,1.71,2.43,15.6,127.0,2.8,3.06,0.28,2.29,5.64,1.04,3.92,1065.0
1,13.2,1.78,2.14,11.2,100.0,2.65,2.76,0.26,1.28,4.38,1.05,3.4,1050.0
1,13.16,2.36,2.67,18.6,101.0,2.8,3.24,0.3,2.81,5.68,1.03,3.17,1185.0
1,14.37,1.95,2.5,16.8,113.0,3.85,3.49,0.24,2.18,7.8,0.86,3.45,1480.0
1,13.24,2.59,2.87,21.0,118.0,2.8,2.69,0.39,1.82,4.32,1.04,2.93,735.0
1,14.2,1.76,2.45,15.2,112.0,3.27,3.39,0.34,1.97,6.75,1.05,2.85,1450.0
1,14.39,1.87,2.45,14.6,96.0,2.5,2.52,0.3,1.98,5.25,1.02,3.58,1290.0
1,14.06,2.15,2.61,17.6,121.0,2.6,2.51,0.31,1.25,5.05,1.06,3.58,1295.0
1,14.83,1.64,2.17,14.0,97.0,2.8,2.98,0.29,1.98,5.2,1.08,2.85,1045.0
1,13.86,1.35,2.27,16.0,98.0,2.98,3.15,0.22,1.85,7.22,1.01,3.55,1045.0


In [0]:
#Checking for missing values
wine_pd.isnull().sum()

Out[40]: Cultivar                        0
Alcohol                         0
Malic acid                      0
Ash                             0
Alcalinity of ash               0
Magnesium                       0
Total phenols                   0
Flavanoids                      0
Nonflavanoid phenols            0
Proanthocyanins                 0
Color intensity                 0
Hue                             0
OD280/OD315 of diluted wines    0
Proline                         0
dtype: int64

There are no missing values in the dataset, so nothing needs to be replaced or imputed here. Although some decision tree implementations can handle missing values natively, we'll still create a pipeline with an imputation step so that any missing values in data fed to the model later are handled consistently.

In [0]:
#Separating features from labels
y = wine_pd['Cultivar']
X = wine_pd.drop('Cultivar',axis=1)

In [0]:
# split the data into training and test data sets
X_train, X_test, y_train, y_test = train_test_split(X, y, stratify=y, train_size=0.7)

In [0]:
# We'll now build a ColumnTransformer that uses SimpleImputer to fill missing values in the 13 feature columns (all numerical) with the column median

missing_value_transformer = ColumnTransformer([
  (  'median_missing',
      SimpleImputer(missing_values=np.nan, strategy='median'),
      [0,1,2,3,4,5,6,7,8,9,10,11,12]
  )
  ])
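To make the effect of median imputation concrete, here is a minimal sketch on a tiny made-up array (the numbers are invented for illustration and are not taken from the wine data). It applies SimpleImputer directly rather than through a ColumnTransformer, which is a simplification of the cell above.

```python
import numpy as np
from sklearn.impute import SimpleImputer

# Toy data with one missing value per column (illustrative values only)
X = np.array([
    [1.0, 10.0],
    [np.nan, 20.0],
    [3.0, np.nan],
    [100.0, 40.0],
])

imputer = SimpleImputer(missing_values=np.nan, strategy="median")
X_filled = imputer.fit_transform(X)

# statistics_ holds the per-column medians learned from the non-missing entries
print(imputer.statistics_)
print(X_filled)
```

The median is less affected by outliers such as the 100.0 in the first column than the mean would be, which is one reason it is a common default for numerical features.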



In [0]:
# split the training data into training and validation datasets
X_train_train, X_train_validate, y_train_train, y_train_validate = train_test_split(X_train, y_train, stratify=y_train, train_size=0.8) 
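The two successive splits leave roughly 56% of the rows for fitting during tuning, 14% for validation, and 30% for the final test. The sketch below reproduces the same split fractions on scikit-learn's bundled copy of the wine data, under the assumption that it matches the uploaded file in size (178 rows). The random_state values and the shortened variable names are additions for reproducibility and brevity, not something the notebook sets.

```python
from sklearn.datasets import load_wine
from sklearn.model_selection import train_test_split

# scikit-learn's bundled copy of the UCI wine data
X, y = load_wine(return_X_y=True)

# 70% for training and tuning, 30% held out for the final test
X_train, X_test, y_train, y_test = train_test_split(
    X, y, stratify=y, train_size=0.7, random_state=0
)

# 80:20 split of the training portion into tuning-train and validation sets
X_tt, X_val, y_tt, y_val = train_test_split(
    X_train, y_train, stratify=y_train, train_size=0.8, random_state=0
)

print(len(X_tt), len(X_val), len(X_test))
```

Stratifying both splits on the label keeps the three cultivars in similar proportions in every subset, which matters with so few rows.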

In [0]:
# We'll now tune our model for the optimal hyperparameters.
# The evaluation function ahead will determine an optimal value for:
#   max_depth between 1 and 10
#   max_features between 1 and 13
# All other hyperparameters are left at their defaults.
search_space = {
    'max_depth': hp.quniform('max_depth', 1, 10, 1),
    'max_features': hp.quniform('max_features', 1, 13, 1),
}
# Explanation of parameters:
# 1. max_depth: the maximum depth of the tree, i.e. the longest path from the root to a leaf. A deeper tree can have many more leaf nodes, which makes the model more complex and more sensitive to noise in the training data. This leads to overfitting.
# 2. max_features: the number of features considered when looking for the best split at each node. Restricting the features available to each split is a classic way of decreasing a model's variance and hence the chance of overfitting.
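As a rough illustration of what these two hyperparameters control, the sketch below fits one constrained and one unconstrained tree and inspects them. It assumes scikit-learn's bundled load_wine data as a stand-in for the uploaded file, and the specific values (max_depth=2, max_features=5, random_state=0) are arbitrary choices for the demonstration.

```python
from sklearn.datasets import load_wine
from sklearn.tree import DecisionTreeClassifier

X, y = load_wine(return_X_y=True)

# A deliberately constrained tree and an unconstrained one for comparison
shallow = DecisionTreeClassifier(max_depth=2, max_features=5, random_state=0).fit(X, y)
unrestricted = DecisionTreeClassifier(random_state=0).fit(X, y)

for name, tree in [("shallow", shallow), ("unrestricted", unrestricted)]:
    # get_depth(): levels actually grown; get_n_leaves(): number of leaf nodes;
    # max_features_: features considered at each split
    print(name, tree.get_depth(), tree.get_n_leaves(), tree.max_features_)
```

A tree capped at depth 2 can have at most 4 leaves, while the unconstrained tree keeps splitting until its leaves are pure.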

In [0]:
# Here we create Spark broadcast variables so that a read-only copy of the train_train and train_validate structures is available to every worker evaluating hyperparameter trials in parallel
X_train_train_broadcast = sc.broadcast(X_train_train)
y_train_train_broadcast = sc.broadcast(y_train_train)
X_train_validate_broadcast = sc.broadcast(X_train_validate)
y_train_validate_broadcast = sc.broadcast(y_train_validate)

In [0]:
def evaluate_model(hyperopt_params):
  
  # accessing replicated input data
  X_train_input = X_train_train_broadcast.value
  y_train_input = y_train_train_broadcast.value
  X_validate_input = X_train_validate_broadcast.value
  y_validate_input = y_train_validate_broadcast.value  
  
  # configuring model parameters
  params = hyperopt_params
 
  # adjusting hyperopt-supplied params back to integers for feeding into model
  if 'max_depth' in params: params['max_depth']=int(params['max_depth'])   # hyperopt supplies values as float but must be int
  if 'max_features' in params: params['max_features']=int(params['max_features']) # hyperopt supplies values as float but must be int
  
  # instantiating model with parameters
  model = DecisionTreeClassifier(**params)
  
  # training
  model.fit(X_train_input, y_train_input)
  
  # predicting
  y_pred = model.predict(X_validate_input)
  
  # loss function
  loss = mean_squared_error(y_validate_input,y_pred) 
  
  # return results
  return {'loss': loss, 'status': STATUS_OK}
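The function above uses mean squared error as its loss, which treats the cultivar labels as numbers: predicting cultivar 3 for a true cultivar 1 is penalized four times as heavily as predicting cultivar 2. For a classification task, a commonly used alternative is the misclassification rate (1 minus accuracy). The sketch below contrasts the two on made-up labels. It is a suggestion only, not what the notebook ran, and the losses reported in the output further down come from the MSE version.

```python
from sklearn.metrics import accuracy_score, mean_squared_error

# Made-up labels: one mistake in each prediction set, on the first sample
y_true = [1, 1, 3]
predictions = {
    "1 -> 2": [2, 1, 3],  # first sample misclassified as cultivar 2
    "1 -> 3": [3, 1, 3],  # first sample misclassified as cultivar 3
}

results = {}
for name, y_pred in predictions.items():
    mse = mean_squared_error(y_true, y_pred)
    error_rate = 1.0 - accuracy_score(y_true, y_pred)
    results[name] = (mse, error_rate)
    print(name, round(mse, 3), round(error_rate, 3))
```

Both prediction sets contain exactly one mistake, so the misclassification rate is identical, whereas the MSE differs only because of the arbitrary numeric coding of the cultivars.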


In [0]:
argmin = fmin(
  fn=evaluate_model,
  space=search_space,
  algo=tpe.suggest,  # algorithm controlling how hyperopt navigates the search space
  max_evals=20,
  trials=SparkTrials(parallelism=4),
  # parallelism=4 allows up to 4 trials to be evaluated concurrently on the cluster, which shortens the overall search time
  verbose=True
  )
 
print(argmin)


Hyperopt with SparkTrials will automatically track trials in MLflow. To view the MLflow experiment associated with the notebook, click the 'Runs' icon in the notebook context bar on the upper right. There, you can view all runs.
To view logs from trials, please check the Spark executor logs. To view executor logs, expand 'Spark Jobs' above until you see the (i) icon next to the stage from the trial job. Click it and find the list of tasks. Click the 'stderr' link for a task to view trial logs.


  0%|          | 0/20 [00:00<?, ?trial/s, best loss=?]
  5%|▌         | 1/20 [00:07<02:16,  7.17s/trial, best loss: 0.24]
 10%|█         | 2/20 [00:08<01:04,  3.59s/trial, best loss: 0.16]
 15%|█▌        | 3/20 [00:11<00:56,  3.33s/trial, best loss: 0.16]
 20%|██        | 4/20 [00:12<00:39,  2.44s/trial, best loss: 0.12]
 25%|██▌       | 5/20 [00:15<00:39,  2.64s/trial, best loss: 0.12]
 30%|███       | 6/20 [00:16<00:29,  2.12s/trial, best loss: 0.08]
 35%|███▌      | 7/20 [00:18<00:27,  2.08s/trial, best loss: 0.08]
 40%|████      | 8/20 [00:19<00:20,  1.74s/trial, best loss: 0.08]
 45%|████▌     | 9/20 [00:23<00:26,  2.45s/trial, best loss: 0.08]
 50%|█████     | 10/20 [00:24<00:20,  2.00s/trial, best loss: 0.08]
 55%|█████▌    | 11/20 [00:25<00:15,  1.70s/trial, best loss: 0.08]
 60%|██████    | 12/20 [00:26<00:11,  1.49s/trial, best loss: 0.08]
 65%|██████▌   | 13/20 [00:30<00:15,  2.26s/trial, best loss: 0.08]
 70%|███████   | 14/20 [00:32<00:13,  2.19s/trial, best loss: 0.08]

Total Trials: 20: 20 succeeded, 0 failed, 0 cancelled.


{'max_depth': 10.0, 'max_features': 9.0}


In [0]:
# configuring model parameters

params = argmin
 
# adjusting hyperopt-supplied params
if 'max_depth' in params: params['max_depth']=int(params['max_depth'])   # hyperopt supplies values as float but must be int
if 'max_features' in params: params['max_features']=int(params['max_features']) # hyperopt supplies values as float but must be int
 
# instantiating the final model with the optimized parameters (it is fitted later, as part of the pipeline)

model = DecisionTreeClassifier(**params)


In [0]:
clf = Pipeline(steps=[
  ('missing_values', missing_value_transformer),
  ('classification', model)
  ])

In [0]:
# Now we'll fit the pipeline on the full training data, so that any missing values in data fed to the model are imputed first

clf.fit(X_train,y_train)

# score through the pipeline (not the bare model) so the test data gets the same preprocessing
clf.score(X_test, y_test)



Out[69]: 0.9074074074074074
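To illustrate why scoring through the pipeline matters, the sketch below blanks out one column of the test data and still obtains a score, because the imputer fills the gaps before the tree sees them. It is a simplified stand-in for the cells above: it assumes scikit-learn's bundled load_wine data, applies SimpleImputer directly instead of via a ColumnTransformer, and fixes random_state for reproducibility. The hyperparameter values mirror the ones printed by the search.

```python
import numpy as np
from sklearn.datasets import load_wine
from sklearn.impute import SimpleImputer
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.tree import DecisionTreeClassifier

X, y = load_wine(return_X_y=True)
X_train, X_test, y_train, y_test = train_test_split(
    X, y, stratify=y, train_size=0.7, random_state=0
)

# Hypothetical damage: the first measurement is missing in every test row
X_test_missing = X_test.copy()
X_test_missing[:, 0] = np.nan

clf = Pipeline(steps=[
    ("missing_values", SimpleImputer(strategy="median")),
    ("classification", DecisionTreeClassifier(max_depth=10, max_features=9, random_state=0)),
])
clf.fit(X_train, y_train)

# The imputer replaces the NaNs with the training median before prediction
score = clf.score(X_test_missing, y_test)
print(score)
```

The exact score depends on the split and on how informative the blanked column was, so no particular value should be expected.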

In [0]:
#Destroying the broadcasts to free up memory
X_train_train_broadcast.destroy()
y_train_train_broadcast.destroy()
X_train_validate_broadcast.destroy()
y_train_validate_broadcast.destroy()
