In [None]:
# default_exp mldsutils

# mldsutils

## Dr. Tirthajyoti Sarkar, Fremont, CA

> This is a utility package for some of the most common data science (DS) and machine learning (ML) functions I use every day.

In [None]:
#hide
from nbdev.showdoc import *

In [None]:
#export
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import time
import warnings
warnings.filterwarnings("ignore", category=UserWarning)
import matplotlib as mpl
mpl.rcParams['figure.dpi']=125

from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import f1_score
from sklearn.linear_model import LogisticRegression
from sklearn.neural_network import MLPClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn.svm import SVC
from sklearn.gaussian_process import GaussianProcessClassifier
from sklearn.gaussian_process.kernels import RBF
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier, AdaBoostClassifier, BaggingClassifier
from sklearn.naive_bayes import GaussianNB
from sklearn.discriminant_analysis import QuadraticDiscriminantAnalysis

from sklearn.linear_model import LinearRegression, Lasso, Ridge
from sklearn.svm import SVR
from sklearn.tree import DecisionTreeRegressor
from sklearn.ensemble import RandomForestRegressor
from sklearn.neural_network import MLPRegressor

In [None]:
#export
def rename_duplicates(old):
    """
    Lazily yield the items of `old`, tagging repeated items with a counter.

    The first time an item appears it is yielded unchanged. The k-th repeat
    (k = 1, 2, ...) of that item is yielded as the string "<item>_<k>".

    Parameters
    ----------
    old : iterable of hashable items (used as dictionary keys)

    Yields
    ------
    The original item on first sight, otherwise a "%s_%d"-formatted string.
    """
    repeat_count = {}
    for item in old:
        # -1 as the "never seen" sentinel makes the first occurrence count as 0
        n_repeats = repeat_count.get(item, -1) + 1
        repeat_count[item] = n_repeats
        yield item if n_repeats == 0 else "%s_%d" % (item, n_repeats)

## Running a list of classifiers

In [None]:
#export
def run_classifiers(X,y,
                    clf_lst = [LogisticRegression(C=0.1,n_jobs=-1)],names=None,
                    num_runs=10,test_frac=0.2,scaling=True,
                    metric='accuracy',
                    runtime=True,
                    verbose=0):
    """
    Repeatedly fit and score a list of classifiers on random train/test splits.

    For every classifier, `num_runs` independent random splits of (X, y) are
    drawn with `train_test_split`; the classifier is fitted on the training
    part and scored on the held-out part.

    Parameters
    ----------
    X, y : feature data and labels, passed to `train_test_split`.
    clf_lst : list of estimator objects exposing `fit`, `predict` and `score`.
        Note: the default list is created once at function-definition time,
        so the same default estimator object is re-fitted on every call.
    names : list of column names, one per classifier. When None, the class
        name of each estimator is used and repeated names get a numeric
        suffix via `rename_duplicates`.
    num_runs : number of random splits per classifier.
    test_frac : fraction of the data held out for scoring (`test_size`).
    scaling : if True, standardize features with a `StandardScaler` fitted on
        the training split only; the same fitted scaler transforms the test
        split.
    metric : 'accuracy' (estimator `score`, rounded to 3 decimals) or
        'f1' (`f1_score` of the predictions).
    runtime : if True, also record the fit time of every run in milliseconds.
    verbose : if truthy, print a progress line after each classifier.

    Returns
    -------
    df_scores : DataFrame with one column per classifier, one row per run.
    (df_scores, df_runtimes) : when `runtime` is True, a second DataFrame of
        the same layout holding fit times in milliseconds.

    Raises
    ------
    AssertionError : if `names` and `clf_lst` differ in length.
    ValueError : if `metric` is not one of the supported values.
    """
    if names is None:
        names = [type(c).__name__ for c in clf_lst]
        names = list(rename_duplicates(names))

    assert len(names)==len(clf_lst), "Length of the classifier names and list of classifiers did not match."

    # Fail early: otherwise an unknown metric surfaces as a NameError (or a
    # stale score from a previous iteration) deep inside the loop.
    if metric not in ('accuracy','f1'):
        raise ValueError(f"Unsupported metric {metric!r}; expected 'accuracy' or 'f1'.")

    scores = {}
    runtimes = {}
    for name, clf in zip(names, clf_lst):
        sc,rt = [],[]
        for _ in range(num_runs):
            X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=test_frac,)
            if scaling:
                # Fit the scaler on the training split only and reuse it for
                # the test split, so both go through the same transformation.
                scaler = StandardScaler().fit(X_train)
                X_train = scaler.transform(X_train)
                X_test = scaler.transform(X_test)

            t_start = time.perf_counter()
            clf.fit(X_train, y_train)
            t_end = time.perf_counter()
            if runtime:
                # fit time in milliseconds
                rt.append(round((t_end-t_start)*1000,3))

            if metric=='accuracy':
                score = round(clf.score(X_test, y_test),3)
            else:
                score = f1_score(y_test,clf.predict(X_test))
            sc.append(score)

        # Book-keeping scores and runtime
        scores[name] = np.array(sc)
        if runtime:
            runtimes[name] = np.array(rt)
        if verbose:
            print(f"Finished {num_runs} runs for {name} algorithm")
            print("-"*75)

    # Convert to DataFrame
    df_scores = pd.DataFrame(scores)
    if runtime:
        return df_scores,pd.DataFrame(runtimes)
    return df_scores

In [None]:
#export
def plot_bars(d,
              t1="Mean accuracy score of algorithms",
              t2="Std.dev of the accuracy scores of algorithms"):
    """
    Show two side-by-side horizontal bar charts summarising the columns of `d`.

    The left panel plots the per-column 'mean' and the right panel the
    per-column 'std', both taken from `d.describe()`.

    Parameters
    ----------
    d : object exposing `.columns` and `.describe()` (presumably a score
        DataFrame such as the one returned by `run_classifiers`).
    t1 : title of the left (mean) panel.
    t2 : title of the right (std) panel.

    Returns
    -------
    None. The figure is displayed with `plt.show()`.
    """
    column_labels = list(d.columns)
    summary = d.describe().T
    fig, axes = plt.subplots(1, 2, figsize=(14, 5))

    # (statistic, bar colour, panel title) for the left and right panel
    panel_specs = (
        ('mean', 'goldenrod', t1),
        ('std', 'dodgerblue', t2),
    )
    for panel, (stat, colour, title) in zip(axes, panel_specs):
        panel.barh(y=column_labels, width=summary[stat], height=0.6, color=colour)
        panel.set_title(title)
        # Strip the frame except for a light bottom rule
        for side in ('top', 'right', 'left'):
            panel.spines[side].set_visible(False)
        panel.spines['bottom'].set_color('#DDDDDD')

    plt.tight_layout(pad=1.5)
    plt.show()

## Running a list of regressors

In [None]:
#export
def run_regressors(X,y,
                    reg_lst = [LinearRegression(n_jobs=-1)],names=None,
                    num_runs=10,test_frac=0.2,scaling=True,
                    metric='rmse',
                    runtime=True,
                    verbose=0):
    """
    Repeatedly fit and score a list of regressors on random train/test splits.

    For every regressor, `num_runs` independent random splits of (X, y) are
    drawn with `train_test_split`; the regressor is fitted on the training
    part and scored on the held-out part.

    Parameters
    ----------
    X, y : feature data and targets. One-dimensional inputs are converted to
        column vectors of shape (n, 1) before splitting.
    reg_lst : list of estimator objects exposing `fit`, `predict` and `score`.
        Note: the default list is created once at function-definition time,
        so the same default estimator object is re-fitted on every call.
    names : list of column names, one per regressor. When None, the class
        name of each estimator is used and repeated names get a numeric
        suffix via `rename_duplicates`.
    num_runs : number of random splits per regressor.
    test_frac : fraction of the data held out for scoring (`test_size`).
    scaling : if True, standardize features with a `StandardScaler` fitted on
        the training split only; the same fitted scaler transforms the test
        split.
    metric : 'rmse' (root mean squared error, rounded to 3 decimals) or
        'r2' (the estimator's `score` on the test split).
    runtime : if True, also record the fit time of every run in milliseconds.
    verbose : if truthy, print a progress line after each regressor.

    Returns
    -------
    df_scores : DataFrame with one column per regressor, one row per run.
    (df_scores, df_runtimes) : when `runtime` is True, a second DataFrame of
        the same layout holding fit times in milliseconds.

    Raises
    ------
    AssertionError : if `names` and `reg_lst` differ in length.
    ValueError : if `metric` is not one of the supported values.
    """
    if names is None:
        names = [type(r).__name__ for r in reg_lst]
        names = list(rename_duplicates(names))

    assert len(names)==len(reg_lst), "Length of the regressor names and list of regressors did not match."

    # Fail early instead of silently returning empty score columns.
    if metric not in ('rmse','r2'):
        raise ValueError(f"Unsupported metric {metric!r}; expected 'rmse' or 'r2'.")

    # np.ndim/np.asarray also accept lists and pandas Series, which have no
    # usable .reshape method.
    if np.ndim(X)==1:
        X = np.asarray(X).reshape(-1,1)
    if np.ndim(y)==1:
        y = np.asarray(y).reshape(-1,1)

    scores = {}
    runtimes = {}
    for name, reg in zip(names, reg_lst):
        sc,rt = [],[]
        for _ in range(num_runs):
            X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=test_frac,)
            if scaling:
                # Fit the scaler on the training split only and reuse it for
                # the test split, so both go through the same transformation.
                scaler = StandardScaler().fit(X_train)
                X_train = scaler.transform(X_train)
                X_test = scaler.transform(X_test)

            t_start = time.perf_counter()
            reg.fit(X_train, y_train)
            t_end = time.perf_counter()
            if runtime:
                # fit time in milliseconds
                rt.append(round((t_end-t_start)*1000,3))

            if metric=='rmse':
                y_true = np.asarray(y_test)
                # Some regressors return 1-D predictions even for a column
                # target; align the shapes so the subtraction is element-wise
                # rather than broadcasting (n,) against (n,1) into (n,n).
                y_pred = np.asarray(reg.predict(X_test)).reshape(y_true.shape)
                score = round(np.sqrt(np.mean((y_pred-y_true)**2)),3)
            else:
                score = reg.score(X_test,y_test)
            sc.append(score)

        # Book-keeping scores and runtime
        scores[name] = np.array(sc)
        if runtime:
            runtimes[name] = np.array(rt)
        if verbose:
            print(f"Finished {num_runs} runs for {name} algorithm")
            print("-"*75)

    # Convert to DataFrame
    df_scores = pd.DataFrame(scores)
    if runtime:
        return df_scores,pd.DataFrame(runtimes)
    return df_scores

## A test of running the `run_regressors` function

In [None]:
# Smoke test for run_regressors: y is an exact linear function of X, so the
# default regressor list (a single LinearRegression) should fit it almost perfectly.
X = np.random.normal(size=2000)
y = 2*X+3
# runtime=False makes the function return only the scores DataFrame;
# metric='r2' stores the estimator's score on each test split.
d1 = run_regressors(X,y,metric='r2',runtime=False)
# Summed shortfall from a perfect R^2 of 1 across all runs must stay small.
assert (1-d1['LinearRegression']).sum() < 0.1