## Cross Validated Grid Search Function

In [6]:
def run_on_subset(pipe_params, subset):
    'perform the pipeline(transform corpus to matrix and use to make model, use on test data) using the parameter dictionary on the data'

    #set variables from input
    pipe, parameters = pipe_params
    train_data, train_labels, test_data, test_labels = subset
    
    def gridSearch(pipe, parameters):
        'runs the grid search CV function with input pipe and parameters'

        grid_search = GridSearchCV(
            estimator = pipe,
            param_grid = parameters,
            n_jobs = 2,  
        )
        return grid_search
    
    #set grid Search CV with input pipe and parameters
    grid_search = gridSearch(pipe, parameters)
    #fit the data to the model through the pipe and randomized CV
    grid_search.fit(train_data,train_labels)

    best_parameters = grid_search.best_estimator_.get_params()
    print('Pipeline = {}'.format(pipe))
    
    #formating print statments
    print('Best parameters = ')
    bestparams = []
    if type(parameters) is list:
        for param_dict in parameters:
            if type(param_dict) is dict:
                for param_name in sorted(param_dict.keys()):
                    if param_name in best_parameters.keys():
                        x = str(param_name) + ': ' + str(best_parameters[param_name])
                        if x not in bestparams:
                            bestparams.append(x)
        print(*bestparams, sep = '\n')
                #print(f"{param_name}: {best_parameters[param_name]}")
    else:
        for param in sorted(parameters.keys()):
            print(f"{param}: {best_parameters[param]}")
                        

    #test_accuracy = grid_search.score(test_data, test_labels)
    best_estimator = grid_search.best_estimator_
    #print(
       # "Accuracy of the best parameters using the inner CV of "
      #  f"the random search: {grid_search.best_score_:.3f}"
    #)
    #print(f"Accuracy on test set: {test_accuracy:.3f}")
    return best_estimator

ROC AUC

In [None]:
target_names = ['non-toxic', 'toxic', 'severe_toxic']
#initialize dictionary for evaluation scores
scores = defaultdict(list)

In [4]:
def fit_model(X_train, y_train, classifier):
    
    model = classifier.fit(X_train, y_train)
    return model

def y_preds(model, X_test):
    'get the predicted y values'

    y_preds = model.predict(X_test)
    return y_preds

def y_score1(model, X_test):
    'get the predicted y values'

    y_score = model.predict_proba(X_test)
    return y_score

def y_score2(X_train, X_test, y_train, classifier):
    'get the predicted y values'

    #get scores for model functions which do not have the predict_proba method
    return #y_score

#https://medium.com/@dtuk81/confusion-matrix-visualization-fc31e3f30fea
def conf_Ma(model, y_test, X_test):
    
    y_pred = model.predict(X_test)
    cf_matrix = confusion_matrix(y_test, y_pred)
    
    group_counts = ['{0:0.0f}'.format(value) for value in
                    cf_matrix.flatten()]
    group_percentages = ['{0:.2%}'.format(value) for value in
                        cf_matrix.flatten()/np.sum(cf_matrix)]
    labels = [f'{v1}\n{v2}' for v1, v2, in
            zip(group_counts,group_percentages)]
    labels = np.asarray(labels).reshape(3,3)
    sns.heatmap(cf_matrix, annot=labels, fmt='', cmap='Blues')
    #sns.heatmap(cf_matrix/np.sum(cf_matrix), annot=True, cmap = 'Greens')
    accuracy  = np.trace(cf_matrix) / float(np.sum(cf_matrix))
    stats_text = "\n\nAccuracy={:0.3f}".format(accuracy)
    plt.ylabel('True label')
    plt.xlabel('Predicted label' + stats_text)
    return

def ROC_AUC_analysis(y_train, y_test, y_score, target_names, classifier):
    'Plot the ROC_AUC curve and return the average macro AUC value'

    n_classes = len(np.unique(y_train))
    label_binarizer = LabelBinarizer().fit(y_train)
    y_onehot_test = label_binarizer.transform(y_test)
    y_onehot_test.shape  # (n_samples, n_classes)

    # store the fpr, tpr, and roc_auc for all averaging strategies
    fpr, tpr, roc_auc = dict(), dict(), dict()

    for i in range(n_classes):
        fpr[i], tpr[i], _ = roc_curve(y_onehot_test[:, i], y_score[:, i])
        roc_auc[i] = auc(fpr[i], tpr[i])

    fpr_grid = np.linspace(0.0, 1.0, 1000)

    # Interpolate all ROC curves at these points
    mean_tpr = np.zeros_like(fpr_grid)

    for i in range(n_classes):
        mean_tpr += np.interp(fpr_grid, fpr[i], tpr[i])  # linear interpolation

    # Average it and compute AUC
    mean_tpr /= n_classes

    fpr["weighted"] = fpr_grid
    tpr["weighted"] = mean_tpr
    roc_auc["weighted"] = auc(fpr["weighted"], tpr["weighted"])

    fig, ax = plt.subplots(figsize=(6, 6))
    plt.plot(
        fpr["weighted"],
        tpr["weighted"],
        label=f"weighted-average ROC curve (AUC = {roc_auc['weighted']:.2f})",
        color="navy",
        linestyle=":",
        linewidth=4,
    )

    colors = cycle(["aqua", "darkorange", "cornflowerblue"])
    for class_id, color in zip(range(n_classes), colors):
        RocCurveDisplay.from_predictions(
            y_onehot_test[:, class_id],
            y_score[:, class_id],
            name=f"ROC curve for {target_names[class_id]}",
            color=color,
            ax=ax,
        )

    plt.plot([0, 1], [0, 1], "k--", label="ROC curve for chance level (AUC = 0.5)")
    plt.axis("square")
    plt.xlabel("False Positive Rate")
    plt.ylabel("True Positive Rate")
    plt.title(str(classifier) + "\nExtension of Receiver Operating Characteristic\nto One-vs-Rest multiclass")
    plt.legend()
    plt.show()

    return roc_auc['weighted']

def analysis(subset, target_names, classifier):
        
        X_train, y_train, X_test, y_test = subset
        model = fit_model(X_train, y_train, classifier)
        y_score = y_score1(model, X_test)
        conf_Ma(model, y_test, X_test)
        ROC_AUC_analysis(y_train, y_test, y_score, target_names, classifier)
        predict = y_preds(model, X_test)
        print(classification_report(y_test, predict, target_names = target_names))
        accuracy = model.score(X_test, y_test)

        scores['Classifier/Pipeline'].append(classifier)
        scores['Accuracy'].append(accuracy)
        scores['ROC_AUC'].append(roc_auc_score(y_test, y_score, average = 'weighted', multi_class ='ovo'))
        for metric in [precision_score, recall_score, f1_score]:
            score_name = metric.__name__.replace("_", " ").replace("score", "").capitalize()
            scores[score_name].append(metric(y_test, predict, average = 'weighted'))
        return