# Tree-based algorithms

Use the information from the parameter-creation step to train and test tree-based algorithms on the data.

## Setup

In [None]:
import cv2
import pickle
import pandas as pd
from pandas.core.indexing import convert_missing_indexer
import numpy as np
import seaborn as sns
from time import time
import graphviz

import matplotlib.pyplot as plt
from matplotlib.pyplot import figure
import matplotlib.colors as colors

from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import plot_confusion_matrix
from sklearn import tree
from sklearn.model_selection import cross_val_score, GridSearchCV, train_test_split
from sklearn.neighbors import KNeighborsClassifier
from sklearn.ensemble import GradientBoostingClassifier, RandomForestClassifier
from sklearn.dummy import DummyClassifier
from sklearn.metrics import (accuracy_score, precision_score, recall_score, f1_score,
                            confusion_matrix, classification_report, roc_curve, roc_auc_score)
from sklearn.tree import export_graphviz

from scipy.special import expit
from sklearn.tree import export_graphviz

In [None]:
# Plotting Settings
# IPython magic: render matplotlib figures inline in the notebook output.
%matplotlib inline
import matplotlib.colors as colors
import mplhep as hep
# Apply the ROOT style shipped with mplhep to all subsequent plots.
plt.style.use(hep.style.ROOT)

import matplotlib.pylab as pylab
# Override the axis-label placement so x/y labels are centred on their axes.
params = {'xaxis.labellocation': 'center', 
         'yaxis.labellocation': 'center'}
pylab.rcParams.update(params)

In [None]:
# Mount Google Drive at /content/drive (requires the Google Colab runtime).
from google.colab import drive
drive.mount('/content/drive')

## Functions

### Algorithm training / running

In [None]:
def train_test_split(data, labels, target, train_portion=0.75, seed=1):
    """Split ``data`` into train/test sets, grouping rows by original image.

    Every row is mapped to an "original image" name derived from its
    ``names`` entry, and whole groups are assigned to either the training
    or the test side. Rows that share an original name therefore never
    straddle the split.

    NOTE: this definition shadows ``sklearn.model_selection.train_test_split``
    imported at the top of the file; the two signatures are not compatible.

    Parameters
    ----------
    data : pandas.DataFrame
        Must contain a ``names`` column plus the ``labels`` and ``target``
        columns.
    labels : list of str
        Feature columns returned as ``X_train`` / ``X_test``.
    target : str
        Column returned as ``y_train`` / ``y_test``.
    train_portion : float, default 0.75
        Fraction of *original images* (not rows) placed in the training set.
    seed : int, default 1
        Passed to ``np.random.seed``; note that this reseeds NumPy's global
        random state.

    Returns
    -------
    X_train, X_test, y_train, y_test
        Feature frames and target series, all cast to ``int``.

    Raises
    ------
    AssertionError
        If ``data`` has no ``names`` column.
    """
    np.random.seed(seed)
    assert 'names' in data.columns, '`names` column is not found in df.'

    def original_name(name):
        # Names containing 'aug' lose their first two '_'-separated tokens,
        # names containing 'raw' (and not 'aug') lose the first one, and all
        # other names are kept unchanged.
        # presumably names look like 'aug_<k>_<original>' / 'raw_<original>'
        # -- TODO confirm against the data.
        n_prefix_tokens = int('aug' in name or 'raw' in name) + int('aug' in name)
        return '_'.join(name.split('_')[n_prefix_tokens:])

    full_data = data.copy()
    full_data['original_name'] = full_data['names'].apply(original_name)

    # Sample original images (without replacement) for the training side.
    original_imgs = np.unique(full_data['original_name'])
    n_train_imgs = int(train_portion * len(original_imgs))
    train_img = set(np.random.choice(original_imgs, n_train_imgs, False))
    train_idx = full_data['original_name'].apply(lambda x: x in train_img)

    # `columns=` keyword: the positional `axis` argument of DataFrame.drop
    # is not accepted by pandas 2.x.
    train_data = full_data[train_idx].drop(columns='original_name')
    test_data = full_data[~train_idx].drop(columns='original_name')

    X_train = train_data[labels].astype('int')
    X_test = test_data[labels].astype('int')
    y_train = train_data[target].astype('int')
    y_test = test_data[target].astype('int')

    return X_train, X_test, y_train, y_test

In [None]:
def train_test_split_subset(data, labels, target, test_portion=0.25, train_subset_portion=1., seed=1,
                            expected_n_originals=332):
    """Create a fixed test set and an optionally sub-sampled training set.

    Rows are grouped by an "original image" name derived from ``names`` and
    whole groups are assigned to one side of the split.

    * ``int((1 - test_portion) * n_originals)`` original images are drawn
      for training; the remaining images form the test set.
    * If ``train_subset_portion < 1``, only that fraction of the training
      images is kept and the other training images are discarded. The test
      set is unaffected.

    Parameters
    ----------
    data : pandas.DataFrame
        Must contain a ``names`` column plus the ``labels`` and ``target``
        columns.
    labels : list of str
        Feature columns returned as ``X_train`` / ``X_test``.
    target : str
        Column returned as ``y_train`` / ``y_test``.
    test_portion : float, default 0.25
        Fraction of original images held out for testing.
    train_subset_portion : float, default 1.0
        Fraction of the training images actually used.
    seed : int, default 1
        Passed to ``np.random.seed`` (reseeds NumPy's global random state).
    expected_n_originals : int or None, default 332
        Sanity check on the number of distinct original images. The default
        reproduces the previously hard-coded check; pass ``None`` to use the
        function on a dataset of a different size.

    Returns
    -------
    X_train, X_test, y_train, y_test
        Feature frames and target series, all cast to ``int``, with a fresh
        0..n-1 index.

    Raises
    ------
    AssertionError
        If ``names`` is missing or the number of original images does not
        match ``expected_n_originals``.
    """
    np.random.seed(seed)
    assert 'names' in data.columns, '`names` column is not found in df.'

    def original_name(name):
        # Names containing 'aug' lose their first two '_'-separated tokens,
        # names containing 'raw' (and not 'aug') lose the first one, and all
        # other names are kept unchanged.
        n_prefix_tokens = int('aug' in name or 'raw' in name) + int('aug' in name)
        return '_'.join(name.split('_')[n_prefix_tokens:])

    full_data = data.copy()
    full_data['original_name'] = full_data['names'].apply(original_name)
    original_imgs = np.unique(full_data['original_name'])
    if expected_n_originals is not None:
        assert original_imgs.shape[0] == expected_n_originals, (
            'expected {} original images, found {}'.format(
                expected_n_originals, original_imgs.shape[0]))

    # Draw the training images; everything else is the (fixed) test set.
    n_train_imgs = int((1 - test_portion) * len(original_imgs))
    train_img = set(np.random.choice(original_imgs, n_train_imgs, False))
    train_idx = full_data['original_name'].apply(lambda x: x in train_img)
    train_data = full_data[train_idx]
    test_data = full_data[~train_idx]

    if train_subset_portion < 1:
        # Use a subset of the training images to train the model.
        train_imgs = np.unique(train_data['original_name'])
        n_sub_imgs = int(train_subset_portion * len(train_imgs))
        train_subimg = set(np.random.choice(train_imgs, n_sub_imgs, False))
        train_subidx = train_data['original_name'].apply(lambda x: x in train_subimg)
        train_data = train_data[train_subidx]

    # `columns=` keyword: the positional `axis` argument of DataFrame.drop
    # is not accepted by pandas 2.x.
    train_data = train_data.drop(columns='original_name').reset_index()
    test_data = test_data.drop(columns='original_name').reset_index()

    X_train = train_data[labels].astype('int')
    X_test = test_data[labels].astype('int')
    y_train = train_data[target].astype('int')
    y_test = test_data[target].astype('int')

    return X_train, X_test, y_train, y_test

In [None]:
def run_all(X_train, X_test, y_train, y_test, labels, target, df_run, print_all, plot_DT, plot_RF):
    """Fit a decision tree, a gradient-boosted ensemble and a random forest.

    All three models use fixed hyperparameters and ``random_state=0``.
    The wall-clock fitting time of each model is printed.

    Parameters
    ----------
    X_train, X_test, y_train, y_test
        Train/test features and targets.
    labels : list of str
        Feature names, used to annotate the tree plots.
    target : str
        Name of the target column (kept for interface compatibility; no
        longer read).
    df_run : pandas.DataFrame
        Kept for interface compatibility; no longer read.
    print_all : bool
        Print train/test accuracy of every model.
    plot_DT : bool
        Export the decision tree through graphviz.
    plot_RF : bool
        Plot the last tree of the random forest and save it as ``RF.png``.

    Returns
    -------
    decision_tree_clf, BDT_clf, RF_clf
        The three fitted classifiers.
    """
    # Decision tree
    t0 = time()
    dt_max_depth = 7
    dt_max_leaf_nodes = 10
    decision_tree_clf = DecisionTreeClassifier(
        max_depth=dt_max_depth,
        max_leaf_nodes=dt_max_leaf_nodes,
        random_state=0,
    ).fit(X_train, y_train)
    print('Decison Tree:', time() - t0)

    # Boosted decision trees
    t0 = time()
    bdt_learning_rate = 0.002
    bdt_max_depth = 5
    BDT_clf = GradientBoostingClassifier(
        learning_rate=bdt_learning_rate,
        max_depth=bdt_max_depth,
        random_state=0,
    ).fit(X_train, y_train)
    print('BDT', time() - t0)

    # Random forest
    t0 = time()
    rf_max_depth = 5
    rf_n_estimators = 4
    RF_clf = RandomForestClassifier(
        max_depth=rf_max_depth,
        n_estimators=rf_n_estimators,
        random_state=0,
    ).fit(X_train, y_train)
    print('RF', time() - t0)

    if print_all:
        print('Decision Tree, max_depth={}, max_leaf_nodes={}'.format(dt_max_depth, dt_max_leaf_nodes))
        print('Accuracy of Decision Tree classifier on training set: {:.2f}'
              .format(decision_tree_clf.score(X_train, y_train)))
        print('Accuracy of Decision Tree classifier on test set: {:.2f}'
              .format(decision_tree_clf.score(X_test, y_test)))
        print()
        print('GBDT, learning_rate={}, max_depth={}'.format(bdt_learning_rate, bdt_max_depth))
        print('Accuracy of GBDT classifier on training set: {:.2f}'
              .format(BDT_clf.score(X_train, y_train)))
        print('Accuracy of GBDT classifier on test set: {:.2f}'
              .format(BDT_clf.score(X_test, y_test)))
        print()
        print('Random Forest, (max depth = {}, n_estimators = {})'.format(rf_max_depth, rf_n_estimators))
        print('Accuracy of RF classifier on training set: {:.2f}'
              .format(RF_clf.score(X_train, y_train)))
        print('Accuracy of RF classifier on test set: {:.2f}'
              .format(RF_clf.score(X_test, y_test)))

    if plot_DT:
        # Export the fitted decision tree as graphviz dot source and render it.
        dot_data = tree.export_graphviz(decision_tree_clf, out_file=None,
                                        feature_names=labels,
                                        class_names=None,
                                        filled=True, impurity=True, rounded=True)
        graph = graphviz.Source(dot_data, format="png")
        # NOTE(review): the render target already ends in ".png" while the
        # format is also "png" -- confirm the resulting file name is intended.
        graph.render("decision_tree_graphivz.png")

    if plot_RF:
        # Plot a single tree of the forest (the last fitted estimator).
        estimator = RF_clf.estimators_[-1]
        # plot_tree expects one name per class; use the fitted class labels.
        # (Previously the string repr of the whole target column was passed.)
        class_names = [str(c) for c in RF_clf.classes_]
        fig, axes = plt.subplots(nrows=1, ncols=1, figsize=(4, 4), dpi=300)
        tree.plot_tree(estimator,
                       feature_names=labels,
                       class_names=class_names,
                       filled=True,
                       ax=axes)
        fig.savefig('RF.png')

    return decision_tree_clf, BDT_clf, RF_clf


### Optimization of algorithms

Use cross-validation to determine the best hyperparameters for each algorithm. Display the results with a ROC curve, a confusion matrix, and a visualization of the decision tree.

In [None]:
def cross_valid(clf, param_grid, scoring, X_train, y_train, X_test, y_test, cv_in = 10):
    """Grid-search ``clf`` with cross-validation and summarise the best setting.

    Parameters
    ----------
    clf : estimator
        Unfitted scikit-learn classifier.
    param_grid : dict
        Hyperparameter grid handed to ``GridSearchCV``.
    scoring : str or callable
        Single scoring metric used to rank parameter settings (forwarded to
        ``GridSearchCV``).
    X_train, y_train
        Data used for the cross-validated search.
    X_test, y_test
        Held-out data used to evaluate the refitted best estimator.
    cv_in : int, default 10
        Number of cross-validation folds.

    Returns
    -------
    max, min, ave, acc, std, best_estimator
        ``max`` / ``min`` / ``ave`` / ``std`` are the maximum, minimum, mean
        and standard deviation of the per-fold scores of the best parameter
        setting; ``acc`` is the accuracy of the refitted best estimator on
        the test data; ``best_estimator`` is that estimator.
    """
    grid = GridSearchCV(clf, param_grid=param_grid, scoring=scoring, cv=cv_in)
    grid.fit(X_train, y_train)

    # Accuracy of the refitted best estimator on the held-out test data.
    report = classification_report(y_test, grid.predict(X_test), output_dict=True)
    test_accuracy = report['accuracy']

    cv_results = grid.cv_results_
    best = grid.best_index_

    # Collect the per-fold scores of the best setting by column name; a
    # positional slice of the results row breaks when the number of grid
    # parameters or folds changes.
    fold_scores = np.array([
        cv_results[f'split{fold}_test_score'][best]
        for fold in range(grid.n_splits_)
    ])

    return (fold_scores.max(),
            fold_scores.min(),
            cv_results['mean_test_score'][best],
            test_accuracy,
            cv_results['std_test_score'][best],
            grid.best_estimator_)

In [None]:
def plot_confusion(clfs, X_test, y_test, title, names):
    """Plot the confusion matrices of up to three fitted classifiers side by side.

    Parameters
    ----------
    clfs : sequence of fitted classifiers
    X_test, y_test
        Test features and targets the matrices are computed on.
    title : str
        Suffix appended to every panel title and to the saved file name.
    names : sequence of str
        Display name of each classifier, in the same order as ``clfs``.

    The figure is shown and then saved as ``"<last name> <title>"``.
    """
    # The layout is fixed at three panels; extra classifiers are ignored by zip.
    fig, axs = plt.subplots(1, 3, figsize=(25, 6))

    for clf, name, ax in zip(clfs, names, axs):
        # NOTE(review): display labels are applied in class order, so this
        # shows the first class as 'Flake' -- confirm that matches the label
        # encoding used elsewhere in the notebook.
        plot_confusion_matrix(clf, X_test, y_test, ax=ax,
                              display_labels=np.array(['Flake', 'No Flake']),
                              cmap=plt.cm.YlGnBu, colorbar=False)
        # Title every panel (previously only the last panel received a title
        # because this call sat outside the loop).
        ax.set_title("%s %s" % (name, title), fontsize=20)

    plt.show()
    # The file name uses the name of the last classifier plotted.
    fig.savefig("%s %s" % (name, title), facecolor='white')

In [None]:
def ROC_curve(clfs, names, df, title):
    """Plot ROC curves (with AUC in the legend) for a list of classifiers.

    The feature/target columns are selected from ``df``, split 50/50 at
    random (``random_state=0``), and a copy of every classifier is fitted on
    the training half and scored on the other half.

    Parameters
    ----------
    clfs : sequence of classifiers
        Must support ``predict_proba``. The passed objects are cloned, so
        the caller's (possibly already fitted) estimators are not refitted.
    names : sequence of str
        Legend name of each classifier, in the same order as ``clfs``.
    df : pandas.DataFrame
        Source data; must contain the ``labels`` column and the feature
        columns listed below.
    title : str
        Suffix used in the saved file name.

    The figure is saved as ``"ROC_CURVE_<last name> <title>"`` and shown.
    """
    # Local imports: the module-level name `train_test_split` is shadowed by
    # this file's own grouped splitter, whose signature does not accept
    # `test_size` / `random_state`.
    from sklearn.base import clone
    from sklearn.model_selection import train_test_split as sk_train_test_split

    target = 'labels'
    labels = [df.columns[4],'[ 5. 10.]', '[10. 20.]','[20. 30.]','[30. 40.]','[40. 50.]','[50. 60.]','[60. 70.]','[70. 80.]','[80. 90.]','[ 90. 100.]','[100. 110.]','[110. 120.]','[120. 130.]','[130. 140.]','[140. 180.]', df.columns[3+16]]

    X = df[labels]
    y = df[target].astype('int')

    X_train, X_test, y_train, y_test = sk_train_test_split(X, y, test_size=0.5, random_state=0)

    line_colors = ['#2c7fb8', '#7fcdbb', '#bebada']
    for clf, name, col in zip(clfs, names, line_colors):
        # Fit a fresh copy so the caller's estimator keeps its fitted state.
        model = clone(clf).fit(X_train, y_train)

        # Probability of the positive class (label 1).
        positive_prob = model.predict_proba(X_test)[:, 1]

        fpr, tpr, _ = roc_curve(y_test, positive_prob, pos_label=1)
        auc_score = roc_auc_score(y_test, positive_prob)

        plt.plot(fpr, tpr, linestyle='--', color=col,
                 label="%s (area = %0.2f)" % (name, auc_score))

    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive rate')

    plt.legend(loc='best', fontsize=20)
    # The file name uses the name of the last classifier plotted.
    plt.savefig("ROC_CURVE_%s %s" % (name, title), facecolor='white')
    plt.show()

In [None]:
def plot_accuracy(x, percent, cnn, dt, bdt, rf):
    """Create a four-panel accuracy figure (CNN + three tree-based models).

    Parameters
    ----------
    x : sequence
        X positions shared by all panels (plotted on a log scale).
    percent
        Value interpolated into the saved file name
        ``Accuracy_plot_<percent>.png``.
    cnn : sequence
        CNN accuracies, one per entry of ``x``.
    dt, bdt, rf : sequence of length-5 sequences
        One row per entry of ``x`` for the decision tree, gradient-boosted
        trees and random forest. Column 2 is plotted as the main line,
        column 3 is used as the +/- band around it and column 4 is plotted
        as a second line.
        # presumably rows are [max, min, mean, std, test accuracy] as built
        # by run_all_cv -- verify against caller.
    """
    panels = [
        ('Decision Tree', dt),
        ('Gradient Boosted', bdt),
        ('Random Forest', rf),
    ]

    fig, axs = plt.subplots(1, 4, sharey=True, figsize=(15, 10))
    axs[0].plot(x, cnn, '-o', color='#225ea8')
    axs[0].set_title('CNN', fontsize=24)
    axs[0].set_xscale('log')
    fig.subplots_adjust(wspace=0)

    # The previous loop also zipped over an undefined name (`accs`), which
    # raised NameError before anything was drawn.
    for (legend, rows), ax in zip(panels, axs[1:]):
        rows = np.array(rows)
        centre = rows[:, 2]
        spread = rows[:, 3]

        ax.fill_between(x, centre + spread, centre - spread,
                        color='#edf8b1', hatch='/', alpha=0.5)
        ax.set_title(legend, fontsize=24)
        ax.plot(x, centre, '-o', color='#225ea8')
        ax.plot(x, rows[:, 4], '-o', color='#7fcdbb')
        ax.set_xscale('log')

    axs[0].set_ylabel('Accuracy of Classifier')
    axs[2].set_xlabel('Number of Quantized Colors')
    plt.savefig('Accuracy_plot_%s.png' % (percent))
    plt.show()

In [None]:
def run_all_cv(dfs, titles, scoring_in = 'accuracy', cv_in = 10, trainPortion = 1, cnn = None, isConfusion_ROC = True, isDisplayTree = True, isPlotAccuracy = True, x = None):
    """Grid-search the three tree-based classifiers on every data frame.

    For each ``(df, title)`` pair the data is split with
    ``train_test_split``, then a decision tree, a random forest and a
    gradient-boosted classifier are tuned with ``cross_valid``. Optional
    plots are produced per data frame, followed by one accuracy summary
    figure.

    Parameters
    ----------
    dfs : sequence of pandas.DataFrame
        One frame per configuration; each must contain the ``labels`` column
        and the feature columns listed in the body.
    titles : sequence of str
        Title of each frame, used in plot titles and file names.
    scoring_in : str, default 'accuracy'
        Scoring metric forwarded to the grid search.
    cv_in : int, default 10
        Number of cross-validation folds.
    trainPortion
        Only used to label the accuracy figure's file name.
    cnn : sequence, optional
        CNN accuracies for the summary plot (defaults to an empty list).
    isConfusion_ROC : bool
        Plot confusion matrices and ROC curves for every frame.
    isDisplayTree : bool
        Export the tuned decision tree through graphviz for every frame.
    isPlotAccuracy : bool
        Draw the accuracy summary figure at the end.
    x : sequence, optional
        X positions of the summary plot (defaults to ``[5, 20, 256]``).

    Returns
    -------
    None
    """
    # Avoid mutable default arguments; materialise the documented defaults.
    cnn = [] if cnn is None else cnn
    x = [5, 20, 256] if x is None else x

    count = 0
    rf_n = []
    dt_n = []
    bdt_n = []

    def tune(estimator, param_dict):
        # Run the grid search and return (summary row, best estimator).
        # `cross_valid` / `train_test_split` are the helpers defined in this
        # file; the previously called `grid_search` / `train_test_split_tin`
        # are not defined anywhere in it.
        best_max, best_min, ave, acc, std, best_clf = cross_valid(
            estimator, param_dict, scoring_in,
            X_train, y_train, X_test, y_test, cv_in=cv_in)
        print('average: ', ave, ' accuracy: ', acc, 'min: ', best_min, 'max: ', best_max)
        return np.array([best_max, best_min, ave, std, acc]), best_clf

    for df, title in zip(dfs, titles):

        target = 'labels'
        labels = [df.columns[4],'[ 5. 10.]', '[10. 20.]','[20. 30.]','[30. 40.]','[40. 50.]','[50. 60.]','[60. 70.]','[70. 80.]','[80. 90.]','[ 90. 100.]','[100. 110.]','[110. 120.]','[120. 130.]','[130. 140.]','[140. 180.]', df.columns[3+16]]

        ## Run Machine learning ##
        X_train, X_test, y_train, y_test = train_test_split(df, labels, target)

        # Decision Tree
        row, dt_clf = tune(DecisionTreeClassifier(),
                           {"max_depth": range(4, 8), 'max_leaf_nodes': range(5, 20)})
        dt_n.append(row)

        # Random Forests
        row, rf_clf = tune(RandomForestClassifier(),
                           {"max_depth": range(4, 10), "n_estimators": range(1, 10)})
        rf_n.append(row)

        # Gradient Boosted Decision Tree
        learning_rates = np.array([0.0001, 0.001, 0.01, 0.1, 0.3])
        row, gbt_clf = tune(GradientBoostingClassifier(),
                            {"learning_rate": learning_rates, "max_depth": range(4, 7)})
        bdt_n.append(row)

        count = count + 1
        print('count: ', count, 'still working ....')

        if isConfusion_ROC:
            clfs = [dt_clf, rf_clf, gbt_clf]
            names = ["Decision Tree", "Random Forest", 'Gradient Boosted']
            plot_confusion(clfs, X_test, y_test, title, names)
            ROC_curve(clfs, names, df, title)

        if isDisplayTree:
            dot_data = tree.export_graphviz(dt_clf, out_file=None,
                                            feature_names=labels,
                                            class_names=None,
                                            filled=True, impurity=True, rounded=True)
            graph = graphviz.Source(dot_data, format="png")
            graph.render("decision_tree_graphivz_%s" % (title))

    if isPlotAccuracy:
        plot_accuracy(x, trainPortion, cnn, dt_n, bdt_n, rf_n)

    return


### Evaluation

In [None]:
def dummy_classifieres(X_train, X_test, y_train, y_test):
    """Fit two baseline ``DummyClassifier`` models and print their test scores.

    The first baseline uses the ``most_frequent`` strategy, the second the
    ``uniform`` strategy. Both are fitted on the training data and scored
    on the test data.

    Returns
    -------
    dummy_majority, dummy_uniform
        The two fitted baseline classifiers, in that order.
    """
    baselines = (
        ('most_frequent', 'Score of Dummy Classifier: '),
        ('uniform', 'Score of Dummy Classifier (uniform): '),
    )

    fitted = []
    for strategy, banner in baselines:
        baseline = DummyClassifier(strategy=strategy).fit(X_train, y_train)
        # The predictions themselves are discarded; the call is retained so
        # the sequence of estimator calls stays the same.
        baseline.predict(X_test)
        print(banner, baseline.score(X_test, y_test))
        fitted.append(baseline)

    majority_clf, uniform_clf = fitted
    return majority_clf, uniform_clf

In [None]:
def images_classified(clf, X_test, df_in):
    """Sort the test images into TP / TN / FP / FN by prediction outcome.

    Parameters
    ----------
    clf : fitted classifier
        Object exposing ``predict``.
    X_test : pandas.DataFrame
        Test features; its index is used to look rows up in ``df_in``.
    df_in : pandas.DataFrame
        Frame holding the ``paths`` and ``labels`` columns for those rows.

    Returns
    -------
    TP, TN, FP, FN : numpy.ndarray
        Paths of the true-positive, true-negative, false-positive and
        false-negative rows (label 1 is treated as the positive class).
    df : pandas.DataFrame
        One row per test sample with the columns ``path``, ``True label``,
        ``Predicted label`` and ``Outcome``.
    """
    predictions = clf.predict(X_test)

    # Attach the predictions to a copy of the features; only the index of
    # that copy is used afterwards.
    annotated = X_test.assign(**{'predicted label': predictions})
    row_ids = annotated.index.values

    pairs = list(zip(row_ids, predictions))
    df = pd.DataFrame({
        'path': [df_in['paths'][row_id] for row_id, _ in pairs],
        'True label': [df_in['labels'][row_id] for row_id, _ in pairs],
        'Predicted label': [predicted for _, predicted in pairs],
    })

    # (true label, predicted label) -> outcome tag
    outcome_of = {
        (0, 0): 'TN',
        (1, 1): 'TP',
        (1, 0): 'FN',
        (0, 1): 'FP',
    }
    df['Outcome'] = [
        outcome_of[key]
        for key in zip(df['True label'], df['Predicted label'])
        if key in outcome_of
    ]

    def paths_with(tag):
        # Paths of all rows whose outcome equals `tag`.
        return df.loc[df['Outcome'] == tag, 'path'].values

    return paths_with('TP'), paths_with('TN'), paths_with('FP'), paths_with('FN'), df
  