# Cluster classification with Scikit-learn classifiers

In [1]:
import numpy as np
import pandas as pd
import time
import multiprocessing
import functools
from joblib import Parallel, delayed

import sklearn
from sklearn import *
from sklearn.experimental import enable_hist_gradient_boosting

Load ALL the classification models from scikit-learn.
Note that some models are very slow to train with large datasets or crash outright, so we give them a reduced number of (shuffled) rows to learn.
Note that `n_jobs=1` is used, as parallelism is introduced later.

In [24]:
models_a1 = [
    ('BaggingClassifier', sklearn.ensemble.BaggingClassifier(n_jobs=1), 'medi'),
    ('BernoulliNB', sklearn.naive_bayes.BernoulliNB(), 'fast'),
    ('CalibratedClassifierCV', sklearn.calibration.CalibratedClassifierCV(cv=5), 'slow'),
    ('ComplementNB', sklearn.naive_bayes.ComplementNB(), 'fast'),
    ('GaussianNB', sklearn.naive_bayes.GaussianNB(), 'fast'),
]

models_a2 = [
    ('LinearDiscriminantAnalysis', sklearn.discriminant_analysis.LinearDiscriminantAnalysis(), 'fast'),
    ('LinearSVC', sklearn.svm.LinearSVC(max_iter=20000), 'slow'),  # slow with unscaled data
    ('LogisticRegression', sklearn.linear_model.LogisticRegression(solver='lbfgs', multi_class='auto', max_iter=20000), 'slow'),  # slow with unscaled data
    ('LogisticRegressionCV', sklearn.linear_model.LogisticRegressionCV(cv=5, solver='lbfgs', multi_class='auto', max_iter=20000), 'slow'),
    ('MLPClassifier', sklearn.neural_network.MLPClassifier(), 'slow'),
    ('MultinomialNB', sklearn.naive_bayes.MultinomialNB(), 'fast'),
]

models_b = [
    ('NearestCentroid', sklearn.neighbors.NearestCentroid(), 'fast'),
    ('PassiveAggressiveClassifier', sklearn.linear_model.PassiveAggressiveClassifier(max_iter=1000, tol=1e-3, n_jobs=1), 'fast'),
    ('Perceptron', sklearn.linear_model.Perceptron(n_jobs=1), 'fast'),
    ('QuadraticDiscriminantAnalysis', sklearn.discriminant_analysis.QuadraticDiscriminantAnalysis(), 'fast'),
    ('RidgeClassifier', sklearn.linear_model.RidgeClassifier(), 'fast'),
    ('RidgeClassifierCV', sklearn.linear_model.RidgeClassifierCV(), 'fast'),
    ('SGDClassifier', sklearn.linear_model.SGDClassifier(max_iter=5000, tol=1e-3, n_jobs=1), 'medi'),

]

# Run these sequential due to "buffer source array is read-only" with LokyBackend
models_s = [
    ('AdaBoostClassifier', sklearn.ensemble.AdaBoostClassifier(), 'fast'),
    ('DecisionTreeClassifier', sklearn.tree.DecisionTreeClassifier(), 'medi'),    
    ('ExtraTreeClassifier', sklearn.tree.ExtraTreeClassifier(), 'fast'),
    ('ExtraTreesClassifier', sklearn.ensemble.ExtraTreesClassifier(n_estimators=100, n_jobs=-1), 'medi'),
    ('RandomForestClassifier', sklearn.ensemble.RandomForestClassifier(n_estimators=100, n_jobs=-1), 'medi'),
]

# Models that fail alot
models_o = [
    ('NuSVC', sklearn.svm.NuSVC(), 'fast'),  # nu infeasible 
    ('RadiusNeighborsClassifier', sklearn.neighbors.RadiusNeighborsClassifier(radius=3, n_jobs=-1), 'medi'),
    # ('GaussianProcessClassifier', sklearn.gaussian_process.GaussianProcessClassifier(), 'slow'),
    # ('GradientBoostingClassifier', sklearn.ensemble.GradientBoostingClassifier(), 'slow'),  # crashes
    # ('HistGradientBoostingClassifier', sklearn.ensemble.HistGradientBoostingClassifier(), 'slow'),  # crashes? 
    ('KNeighborsClassifier', sklearn.neighbors.KNeighborsClassifier(n_jobs=2), 'slow'),
    ('LabelPropagation', sklearn.semi_supervised.LabelPropagation(), 'slow'),  # requires too much memory to train with larger datasets
    ('LabelSpreading', sklearn.semi_supervised.LabelSpreading(), 'slow'),  # bit slow
    ('SVC', sklearn.svm.SVC(gamma='scale'), 'slow'), # slow, not timeouted 
]

Some models only work with properly scaled data, so we prepare ALL available scalers.

In [22]:
scalers = [
    # ('Unscaled data', ),
    ('standard scaling', sklearn.preprocessing.StandardScaler()),
    ('min-max scaling', sklearn.preprocessing.MinMaxScaler()),
    ('max-abs scaling', sklearn.preprocessing.MaxAbsScaler()),
    ('robust scaling', sklearn.preprocessing.RobustScaler(quantile_range=(25, 75))),
    # ('power transformation (Yeo-Johnson)', sklearn.preprocessing.PowerTransformer(method='yeo-johnson')),
    # ('power transformation (Box-Cox)', sklearn.preprocessing.PowerTransformer(method='box-cox')), # 'strictly zero' meh.
    ('quantile transformation (gaussian pdf)', sklearn.preprocessing.QuantileTransformer(output_distribution='normal')),
    ('quantile transformation (uniform pdf)', sklearn.preprocessing.QuantileTransformer(output_distribution='uniform')),
    ('sample-wise L2 normalizing', sklearn.preprocessing.Normalizer()),
]

Train and Test data is passed through the scalers, including the "unscaled" scaler.

In [23]:
num_dp = 30

data = pd.read_pickle(f"data/600AMeV_{num_dp}dp.clusters.pkl").sample(frac=0.1)

print(data.isna().values.any())

msk = np.random.rand(len(data)) < 0.8
traindata = data[msk]
testdata = data[~msk]

print(traindata.shape)
print(testdata.shape)

trainscaled = [
    (
        "Unscaled data",
        traindata[["T", "E", "Size", "EToF", "EnergyMoment", "TSpawn", "MaxEHit", "X", "Y", "Z"]],
        testdata[["T", "E", "Size", "EToF", "EnergyMoment", "TSpawn", "MaxEHit", "X", "Y", "Z"]],
    )
] + [
    (
        sname,
        scaler.fit_transform(traindata[["T", "E", "Size", "EToF", "EnergyMoment", "TSpawn", "MaxEHit", "X", "Y", "Z"]]),
        scaler.transform(testdata[["T", "E", "Size", "EToF", "EnergyMoment", "TSpawn", "MaxEHit", "X", "Y", "Z"]]),
    )
    for sname, scaler in scalers
]

False
(7781935, 12)
(1944666, 12)


Run all model/scaler combinations, in parallel. Note that we use timeouts per task, as setting at timeout in joblib will throw everything.

In [5]:
def with_timeout(timeout):
    def decorator(decorated):
        @functools.wraps(decorated)
        def inner(*args, **kwargs):
            pool = multiprocessing.pool.ThreadPool(1)
            async_result = pool.apply_async(decorated, args, kwargs)
            try:
                return async_result.get(timeout)
            except multiprocessing.TimeoutError:
                return

        return inner

    return decorator

In [6]:
MEDIHEAD = 500000
SLOWHEAD = 50000
label = "prim"

y_train_fast = traindata[[label]].values.ravel()
y_train_medi = traindata.head(MEDIHEAD)[[label]].values.ravel()
y_train_slow = traindata.head(SLOWHEAD)[[label]].values.ravel()
y_test = testdata[[label]].values.ravel()


@with_timeout(1200)
def train_model(mname, modelorg, speed, sname, x_train, x_test):
    # These get killed without error?
    if mname == "RadiusNeighborsClassifier" and sname != "Unscaled data":
        return (mname, sname, np.NaN, speed, np.NaN, "Skipped")
    try:
        model = sklearn.base.clone(modelorg)
        start = time.time()
        if speed == "slow":
            model.fit(x_train[0:SLOWHEAD], y_train_slow)
        elif speed == "medi":
            model.fit(x_train[0:MEDIHEAD], y_train_medi)
        elif speed == "fast":
            model.fit(x_train, y_train_fast)
        end = time.time()

        y_pred = model.predict(x_test)
        y_true = y_test

        bac = sklearn.metrics.balanced_accuracy_score(y_true, y_pred)
        return (mname, sname, bac, speed, (end - start), "ok")
    except Exception as err:
        return (mname, sname, np.NaN, speed, np.NaN, err)


def train_model_wrap(mname, modelorg, speed, sname, x_train, x_test):
    ret = train_model(mname, modelorg, speed, sname, x_train, x_test)
    if ret:
        return ret
    else:
        return (mname, sname, np.NaN, speed, np.NaN, "Timeout")

In [7]:
try:
    results_a1 = Parallel(n_jobs=10, verbose=1)(
        delayed(train_model_wrap)(mname, modelorg, speed, sname, x_train, x_test)
        for sname, x_train, x_test in trainscaled
        for mname, modelorg, speed in models_a1
    )
except Exception as err:
    print(err)

[Parallel(n_jobs=10)]: Using backend LokyBackend with 10 concurrent workers.
[Parallel(n_jobs=10)]: Done  40 out of  40 | elapsed:  2.4min finished


In [8]:
try:
    results_a2 = Parallel(n_jobs=10, verbose=1)(
        delayed(train_model_wrap)(mname, modelorg, speed, sname, x_train, x_test)
        for sname, x_train, x_test in trainscaled
        for mname, modelorg, speed in models_a2
    )
except Exception as err:
    print(err)

[Parallel(n_jobs=10)]: Using backend LokyBackend with 10 concurrent workers.
[Parallel(n_jobs=10)]: Done  48 out of  48 | elapsed:  2.5min finished


In [9]:
try:
    results_b = Parallel(n_jobs=10, verbose=1)(
        delayed(train_model_wrap)(mname, modelorg, speed, sname, x_train, x_test)
        for sname, x_train, x_test in trainscaled
        for mname, modelorg, speed in models_b
    )
except Exception as err:
    print(err)

[Parallel(n_jobs=10)]: Using backend LokyBackend with 10 concurrent workers.
[Parallel(n_jobs=10)]: Done  30 tasks      | elapsed:  1.1min
[Parallel(n_jobs=10)]: Done  56 out of  56 | elapsed: 20.8min finished


In [10]:
try:
    results_s = Parallel(n_jobs=1, verbose=1)(
        delayed(train_model_wrap)(mname, modelorg, speed, sname, x_train, x_test)
        for sname, x_train, x_test in trainscaled
        for mname, modelorg, speed in models_s
    )
except Exception as err:
    print(err)

[Parallel(n_jobs=1)]: Using backend SequentialBackend with 1 concurrent workers.
[Parallel(n_jobs=1)]: Done  40 out of  40 | elapsed: 114.4min finished


In [None]:
try:
    results_o = Parallel(n_jobs=10, verbose=1)(
        delayed(train_model_wrap)(mname, modelorg, speed, sname, x_train, x_test)
        for sname, x_train, x_test in trainscaled
        for mname, modelorg, speed in models_o
    )
except Exception as err:
    print(err)

[Parallel(n_jobs=10)]: Using backend LokyBackend with 10 concurrent workers.


In [16]:
results = results_a1 + results_a2 + results_b + results_s #+ results_o
resultsdf = pd.DataFrame(results)
pd.options.display.max_rows = 999
resultsdf.columns = ["Model", "Scaler", "BAC", "Speed", "Time", "Status"]
resultsdf.sort_values(by=["BAC", "Time"], ascending=[False, True], inplace=True)
resultsdf.style.hide_index()\
    .format({"BAC": "{:.2%}", "Time": "{:.2f}"})\
    .bar(subset=["BAC"], color="lightgreen")\
    .bar(subset=["Time"], color="lightblue")

Model,Scaler,BAC,Speed,Time,Status
PassiveAggressiveClassifier,sample-wise L2 normalizing,86.62%,fast,19.14,ok
NearestCentroid,sample-wise L2 normalizing,85.95%,fast,3.34,ok
Perceptron,quantile transformation (uniform pdf),85.77%,fast,22.38,ok
PassiveAggressiveClassifier,Unscaled data,85.72%,fast,40.83,ok
PassiveAggressiveClassifier,max-abs scaling,85.43%,fast,24.53,ok
QuadraticDiscriminantAnalysis,quantile transformation (uniform pdf),84.41%,fast,10.08,ok
MLPClassifier,quantile transformation (gaussian pdf),84.06%,slow,27.92,ok
RandomForestClassifier,sample-wise L2 normalizing,83.90%,medi,22.74,ok
RandomForestClassifier,Unscaled data,83.82%,medi,14.67,ok
RandomForestClassifier,quantile transformation (uniform pdf),83.80%,medi,16.14,ok
