The purpose of this notebook is to build sklearn-like pipeline for transformation

In [116]:
import pandas as pd         
import os.path

N_JOBS = 7
DEBUG=False

In [2]:
def load_data(path="../../data/csv/all.csv"):
    dataframe = pd.read_csv(path, index_col=0)
    return dataframe.loc[~dataframe["execTimeMs"].isnull()]

In [3]:
def prepare_dataframe(dataframe):
    output = dataframe.dropna(axis="columns")
    targets = output["execTimeMs"]
    dropped = output[["command", "execTimeMs", "jobId", "ctime_mean", "ctime_max", "ctime_sum", "read_sum","write_sum","readSyscalls_sum","writeSyscalls_sum","readReal_sum","writeReal_sum","writeCancelled_sum","rxBytes_sum","rxPackets_sum","rxErrors_sum","rxDrop_sum","rxFifo_sum","rxFrame_sum","rxCompressed_sum","rxMulticast_sum","txBytes_sum","txPackets_sum","txErrors_sum","txDrop_sum","txFifo_sum","txColls_sum","txCarrier_sum","txCompressed_sum","cpu_mean","cpu_max","memory_mean","memory_max"]]
    features = output.drop(dropped.columns, axis=1)
    return features, targets, dropped

In [4]:
features, targets, dropped = prepare_dataframe(load_data())

In [5]:
features.dtypes

workflowName          object
size                 float64
executable            object
args                  object
inputs                object
outputs               object
name                  object
cpu.manufacturer      object
cpu.brand             object
cpu.speed            float64
cpu.cores              int64
cpu.physicalCores      int64
cpu.processors         int64
mem.total              int64
mem.free               int64
mem.used               int64
mem.active             int64
mem.available          int64
mem.buffers            int64
mem.cached             int64
mem.slab               int64
mem.buffcache          int64
mem.swaptotal          int64
mem.swapused           int64
mem.swapfree           int64
dtype: object

# Preprocessing flow

In [6]:
import numpy as np
from sklearn.preprocessing import FunctionTransformer, StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.compose import make_column_selector as selector

In [7]:
def vectorize_list(series):
    def vectorize(list_string):
        return len(eval(list_string))
    return np.vectorize(vectorize)(series)

def ListTransformer():
    return FunctionTransformer(func=vectorize_list)

In [8]:
list_transformer = Pipeline(steps=[("list", ListTransformer()), ("scaler", StandardScaler())])
list_features = list(['args', 'inputs', 'outputs'])

numerical_transformer = StandardScaler()
numerical_features = list(features.select_dtypes(include="number").columns)

categorical_transformer = OneHotEncoder(sparse=False, handle_unknown = "ignore")
categorical_features = list(set(features.select_dtypes(include="object").columns) ^ set(list_features))

def make_classifying_preprocessor(additional_features=["read_sum", "write_sum", "cpu_mean", "memory_max"]):
    external_features = categorical_features + additional_features
    return ColumnTransformer(
            transformers=[('lists', list_transformer, list_features), 
                          ('num', numerical_transformer, numerical_features),
                          ('cat', categorical_transformer, external_features)])

def make_regression_preprocessor(additional_features=["read_sum", "write_sum", "cpu_mean", "memory_max"]):
    external_features = numerical_features + additional_features
    return ColumnTransformer(
        transformers=[
            ('lists', list_transformer, list_features),            
            ('num', numerical_transformer, external_features),  
            ('cat', categorical_transformer, categorical_features)
        ])

preprocessor = make_classifying_preprocessor(additional_features=[])

In [9]:
from scipy.stats import percentileofscore
import math

def calculate_quantile_rank(labels, label):
    return percentileofscore(labels, label) / 100

def calculate_utilization_class(labels, label):
    def label_for_rank(rank):
        if rank > 0.75:
            return 'very high'
        elif rank > 0.5:
            return 'high'
        elif rank > 0.25:
            return 'medium'
        else:
            return 'low'
    return label_for_rank(calculate_quantile_rank(labels, label))

def calculate_utilization_bucket(labels, label, num_buckets):
    bucket_size = 1.0 / num_buckets
    def bucket_for_rank(rank):
        return str(math.floor(rank / bucket_size))
    return bucket_for_rank(calculate_quantile_rank(labels, label))

# Pipeline composition (with PCA)

In [10]:
from sklearn.decomposition import PCA
from sklearn.neighbors import KNeighborsRegressor, KNeighborsClassifier
from sklearn.svm import SVR, SVC
from sklearn.ensemble import RandomForestRegressor
from sklearn.neural_network import MLPRegressor, MLPClassifier
from sklearn.linear_model import Lasso, SGDRegressor, ElasticNet, LinearRegression
from sklearn.tree import DecisionTreeRegressor, DecisionTreeClassifier
from sklearn.dummy import DummyRegressor
from sklearn.experimental import enable_halving_search_cv
from sklearn.model_selection import train_test_split, GridSearchCV, HalvingGridSearchCV

In [11]:
base_steps = [('pca', PCA(random_state=42))]
dummy_pipeline = Pipeline(steps=base_steps +[('dummy', DummyRegressor())])
X_train, X_test, y_train, y_test = train_test_split(features, targets, test_size=0.2, random_state=0)

In [12]:
pca_param_grid = {
    'pca__n_components': np.arange(1, 50, 3),    
}
knn_param_grid = {
    'knn__n_neighbors': np.arange(1, 30, 3),
}
regressor = ('knn', KNeighborsRegressor())
full_pipeline = Pipeline(steps= base_steps + [regressor])
grid_search = HalvingGridSearchCV(full_pipeline, {**knn_param_grid, **pca_param_grid}, cv=2, verbose=2, scoring="r2", n_jobs=-1)

In [13]:
def rae(actual, predicted):
    """ Relative Absolute Error (aka Approximation Error) """
    EPSILON=1e-10
    return np.sum(np.abs(actual - predicted)) / (np.sum(np.abs(actual - np.mean(actual))) + EPSILON)

In [14]:
from sklearn.metrics import r2_score, mean_absolute_error, mean_absolute_percentage_error

In [15]:
def calculate_regression_score(true, pred, scores=[r2_score, mean_absolute_error, mean_absolute_percentage_error, rae]):
    executor = get_reusable_executor(max_workers=4)
    results = executor.map(lambda fun: fun(true, pred), scores)
    return results

In [115]:
from loky import get_reusable_executor

def rate_regressor(X_train, y_train, X_test, y_test, regressor, regressor_params, verbose=10, aggressive_elimination=True, steps=base_steps):
    if DEBUG:
        print(f"Rating {regressor}")
    full_pipeline = Pipeline(steps= base_steps + [regressor])
    vector_length = X_train.shape[1]
    pca_param_grid = {'pca__n_components': np.arange(1, vector_length, 1),}
    grid_search = HalvingGridSearchCV(full_pipeline, {**pca_param_grid, **regressor_params}, cv=2, verbose=verbose, scoring="r2", n_jobs=N_JOBS)
    if DEBUG:
        print("Evaluating grid search")
    grid_search.fit(X_train, y_train)
    
    # scores
    if DEBUG:
        print("Predicting on test set")
    prediction = grid_search.best_estimator_.predict(X_test)
    
    if DEBUG:
        print("Calculating scores")
    r2, mae, mape, rae = calculate_regression_score(y_test, prediction)
    if DEBUG:
        print("Calculated scores on test set")
    adjusted_r2 = 1 - (1-r2)*(len(y_test)-1)/(len(y_test)-X_test.shape[1]-1)
    return {"r2": r2, "adjusted_r2": adjusted_r2, "mae": mae, "mape": mape, "rae": rae,"best_score": grid_search.best_score_, "params": grid_search.best_params_}

In [17]:
from sklearn.metrics import f1_score, accuracy_score, balanced_accuracy_score, roc_auc_score

def rate_classifier(X_train, y_train, X_test, y_test, classifier, classifier_params, verbose=10, aggressive_elimination=True, steps=base_steps):
    print(f"Rating {classifier}")
    full_pipeline = Pipeline(steps= base_steps + [classifier])
    vector_length = X_train.shape[1]
    pca_param_grid = {'pca__n_components': np.arange(1, vector_length, 1),}
    grid_search = HalvingGridSearchCV(full_pipeline, {**pca_param_grid, **classifier_params}, cv=2, verbose=verbose, scoring="accuracy", n_jobs=N_JOBS)
    print("Evaluating grid search")
    grid_search.fit(X_train, y_train)
    
    # scores
    print("Predicting on test set")
    prediction = grid_search.best_estimator_.predict(X_test)
    
    print("Calculating scores")
    executor = get_reusable_executor(max_workers=5, timeout=5)
    
    scores = [
        lambda true, pred: f1_score(true, pred, average="micro"),
        lambda true, pred: f1_score(true, pred, average="macro"),
        lambda true, pred: accuracy_score(true, pred), 
        lambda true, pred: balanced_accuracy_score(true, pred)
    ]
    results = executor.map(lambda fun: fun(y_test, prediction), scores)
    print("Calculated scores on test set")
    micro, macro, accuracy, balanced_accuracy = list(results)
    return {"f1_micro": micro, "f1_macro": macro, "accuracy": accuracy, "balanced_accuracy": balanced_accuracy, "params": grid_search.best_params_}

# Here go regressor params

In [18]:
knn = ("knn", KNeighborsRegressor())
knn_params = {'knn__n_neighbors': np.arange(1, 30, 1)}

dtr = ("dtr", DecisionTreeRegressor(random_state=5))
dtr_params = {"dtr__criterion": ["mse", "friedman_mse", "mae", "poisson"]}

lasso = ("lasso", Lasso(random_state=5))
lasso_params = {"lasso__alpha": np.arange(0.01, 1, 0.05)}

en = ("elasticnet", ElasticNet(random_state=5))
en_params = {"elasticnet__alpha": np.arange(0.01, 1, 0.05), "elasticnet__l1_ratio": np.arange(0, 1, 0.1)}

svr = ("svr", SGDRegressor())
svr_params = {"svr__loss": ["squared_loss", "huber", "epsilon_insensitive"], "svr__penalty": ['l2', 'l1', 'elasticnet'],
             "svr__alpha": np.arange(0.0001, 0.2, 0.01), "svr__max_iter": [10000]}

rf = ("rf", RandomForestRegressor())
rf_params = {"rf__n_estimators": np.arange(5, 100, 5), "rf__criterion": ["mae", "mse"], "rf__max_features": ["auto", "sqrt", "log2"]}

# Here go classifier params

In [19]:
knn_classifier = ("knn", KNeighborsClassifier())
knn_clf_params = {'knn__n_neighbors': np.arange(1, 30, 1)}

dtr_classifier = ("dtr", DecisionTreeClassifier(random_state=5))
dtr_clf_params = {"dtr__criterion": ["gini", "entropy"]}

mlp_classifier = ("mlp", MLPClassifier())
mlp_clf_params = {"mlp__hidden_layer_sizes": np.arange(1,200, 10),          
                  "mlp__activation": ["logistic", "tanh", "relu"],         
                  "mlp__activation": ["logistic"],         
#                   "mlp__alpha": np.arange(0.01, 0.1, 0.01)
                 }

svc = ("svc", SVC(random_state=5))
svc_clf_params = {
    "svc__C": np.arange(0.1, 1, 0.1), 
    "svc__kernel": ["linear", "poly", "rbf", "sigmoid"],
    "svc__degree": np.arange(3, 10, 1)
}

In [20]:
import warnings
warnings.filterwarnings('ignore')

In [21]:
def make_datasets(dataframe):
    jobs_below_1200ms = dataframe.loc[dataframe["execTimeMs"] < 1200]
    jobs_between_2000ms_25000ms = dataframe.loc[dataframe["execTimeMs"].between(2000, 25000)]
    jobs_count = dataframe["name"].value_counts()
    jobs_most_occuring = dataframe.loc[dataframe["name"].isin(jobs_count[jobs_count > 3000].index.values)]
    jobs_mDiffFit = dataframe.loc[dataframe["name"] == "mDiffFit"]
    jobs_haplotype = dataframe.loc[dataframe["name"] == "haplotype_caller"]
    jobs_mShrink = dataframe.loc[dataframe["name"] == "mShrink"]
    return jobs_below_1200ms, jobs_between_2000ms_25000ms, jobs_most_occuring

In [22]:
datasets = make_datasets(load_data())

In [23]:
dfs_for_jobs = [pd.DataFrame(y) for x, y in load_data().groupby('name', as_index=False)]

In [24]:
def rate_data(features, targets, regressors, verbose=10, pipeline_steps=base_steps):
    X_train, X_test, y_train, y_test = train_test_split(features, targets, test_size=0.3, random_state=0)
    df = pd.DataFrame(columns=["name", "pca", "adjusted_r2","r2", "mae", "mape", "rae", "best_score", "params"])
    for (regressor, params) in regressors:
        result = rate_regressor(X_train, y_train, X_test, y_test, regressor, params, verbose, pipeline_steps)
        df = df.append({"name": regressor[0], **result, "pca": result["params"]["pca__n_components"]}, ignore_index=True)
    return df

In [94]:
def rate_data_explicit(X_train, X_test, y_train, y_test, regressors, verbose=10, pipeline_steps=base_steps):
    df = pd.DataFrame(columns=["name", "pca", "adjusted_r2","r2", "mae", "mape", "best_score", "params"])
    for (regressor, params) in regressors:
        result = rate_regressor(X_train, y_train, X_test, y_test, regressor, params, verbose, pipeline_steps)
        df = df.append({"name": regressor[0], **result, "pca": result["params"]["pca__n_components"]}, ignore_index=True)
    return df

In [25]:
def rate_classifiers_for_data(features, targets, classifiers, verbose=10, pipeline_steps=base_steps):
    X_train, X_test, y_train, y_test = train_test_split(features, targets, test_size=0.3, random_state=0)
    df = pd.DataFrame(columns=["name", "pca", "accuracy","balanced_accuracy", "f1_micro", "f1_macro", "params"])
    for (classifier, params) in classifiers:
        result = rate_classifier(X_train, y_train, X_test, y_test, classifier, params, verbose, pipeline_steps)
        df = df.append({"name": classifier[0], **result, "pca": result["params"]["pca__n_components"]}, ignore_index=True)
    return df

In [26]:
def rate_dataset(dataframe, regressors, verbose=2):
    print(f"Rating dataset of len {len(dataframe)}")
    features, targets, _ = prepare_dataframe(dataframe[:10000])
    features = preprocessor.fit_transform(features)
    rate_data(features, targets, regressors, verbose)

In [27]:
basic_regressors = [
    (knn, knn_params),
#     (dtr, dtr_params),
    (lasso, lasso_params),
    (en, en_params),
    (svr, svr_params),
]

basic_classifiers = [
    (knn_classifier, knn_clf_params),
#     (dtr_classifier, dtr_clf_params),
    (mlp_classifier, mlp_clf_params),
    (svc, svc_clf_params)
]

In [28]:
def simple_experiment():
    print("Rating jobs datasets")
    for dataset in dfs_for_jobs:
        print(dataset.iloc[0]["name"])
        rate_dataset(dataset, basic_regressors)

    print("Rating common datasets")
    for dataset in datasets:
        rate_dataset(dataset, basic_regressors)

In [29]:
# rate_dataset(dfs_for_jobs[8], verbose=0)
# rate_dataset(dfs_for_jobs[1], basic_regressors, verbose=0)

## Eksperyment 4

### Cel

Zmierzyć skuteczności najlepszych pipelinów dla każdego joba, zobaczyć czy warto schodzić w dół pod wzgledem błędów

### Dane

In [30]:
full_df = load_data().dropna(axis="columns")
raw_datasets = { x:pd.DataFrame(y) for x, y in full_df.groupby('name', as_index=False)}
datasets_split = {x:train_test_split(df, random_state=0, train_size=0.75) for x,df in raw_datasets.items()}
# big_df_train, big_df_test = pd.concat([train for (_, (train, test)) in datasets_split.items()]), pd.concat([test for (_, (train, test)) in datasets_split.items()])

In [31]:
def add_resource_features(features, dropped, baseline, resources):
    for resource in resources:
        features[resource] = dropped[resource].map(lambda value: calculate_quantile_rank(baseline[resource], value))

In [32]:
def get_numerical_pipeline_data(features, labels, dropped, baseline, resources=["read_sum", "write_sum", "cpu_mean", "memory_max"]):
    """
    Raw data enhanced with resource utilization quantile scores, but scores are assigned - not predicted
    """
    add_resource_features(features, dropped, baseline, resources)
    preprocessor = make_regression_preprocessor(resources)
    features = preprocessor.fit_transform(features)
    return features, labels, dropped, preprocessor

In [62]:
def get_numerical_pipeline_data_big(data, resources=["read_sum", "write_sum", "cpu_mean", "memory_max"]):
    """
    Raw data enhanced with resource utilization quantile scores, but scores are assigned - not predicted
    """
    features, labels, dropped = prepare_dataframe(data)
    for resource in resources:
        features[resource] = dropped[resource].map(lambda value: calculate_quantile_rank(dropped[resource], value))
    features = make_regression_preprocessor(resources).fit_transform(features)
    return pd.DataFrame(features, index=labels.index), labels

In [40]:
datasets_split['add_replace'][1]

Unnamed: 0,jobId,read_sum,write_sum,readSyscalls_sum,writeSyscalls_sum,readReal_sum,writeReal_sum,writeCancelled_sum,rxBytes_sum,rxPackets_sum,...,mem.used,mem.active,mem.available,mem.buffers,mem.cached,mem.slab,mem.buffcache,mem.swaptotal,mem.swapused,mem.swapfree
15540,5VzTEh48X-1-8,409532.0,9.0,305.0,9.0,0.0,32768.0,0.0,0.0,0.0,...,13633937408,5642911744,27716104192,2768896,6641188864,1846169600,8490127360,0,0,0
86402,YrmwF3UAW-1-320,448476.0,9.0,359.0,9.0,0.0,32768.0,0.0,14300.0,60.0,...,6938828800,3055312896,13768790016,315117568,3270520832,713506816,4299145216,0,0,0
58973,yXB4dcCCr-1-34,241751.0,9.0,192.0,9.0,28672.0,32768.0,0.0,0.0,0.0,...,1270759424,788201472,7468998656,454656,577638400,177721344,755814400,0,0,0
110321,uwu6POi40-1-8,426355.0,9.0,316.0,9.0,0.0,32768.0,0.0,0.0,0.0,...,9407356928,1177980928,15646105600,97316864,8047538176,480485376,8625340416,0,0,0
41349,bcKUGiUAf-1-112,428460.0,9.0,321.0,9.0,0.0,32768.0,0.0,11516.0,29.0,...,10568097792,3031588864,30327427072,2768896,6220783616,1854623744,8078176256,0,0,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
126832,NibmYG_N3-1-34,446548.0,19.0,356.0,10.0,0.0,32768.0,0.0,13686.0,46.0,...,8767651840,1390714880,14376423424,342212608,6592135168,897110016,7831457792,0,0,0
58434,ySTPaOAXY-1-164,316111.0,9.0,247.0,9.0,0.0,32768.0,0.0,0.0,0.0,...,3617013760,1105604608,7151595520,2768896,2224361472,593780736,2820911104,0,0,0
83370,P2WYkNQ_Y-1-8,469391.0,19.0,391.0,10.0,0.0,32768.0,0.0,13622.0,51.0,...,8932556800,3431043072,13393043456,123502592,5412884480,364777472,5901164544,0,0,0
74809,DayfX-Qsv-1-86,414077.0,9.0,308.0,9.0,0.0,32768.0,0.0,0.0,0.0,...,8723546112,1424261120,15399841792,340754432,6542303232,907046912,7790104576,0,0,0


### Przebieg


trenujemy pipeline ogólny, liczymy jego skuteczności dla każdego typu jobów. Z eksperymentu 1. - najlepszy pipeline to był:

dtr, mae, pca 71

dla każdego typu jobów trenujemy dla niego pipeline, liczymy skuteczności

In [91]:
train, test = datasets_split['add_replace']

In [117]:
def run_experiment4():
    exp4_resources = ["read_sum", "write_sum", "cpu_max", "cpu_mean", "memory_mean", "memory_max"]
    
    def run_pipeline(name, features, targets):
        pipeline_df = rate_data(features, targets, basic_regressors, verbose=10)
        pipeline_df["pipeline"] = name
        return pipeline_df
    
    big_regressor = Pipeline([ # test pipeline
        ('pca', PCA(random_state=42, n_components=31)),
        ('knn', KNeighborsRegressor(n_neighbors=11))
    ])
    print("Preparing data for big regressor")
    X, y = get_numerical_pipeline_data_big(full_df, exp4_resources)
    train_indices = np.concatenate([train.index for (_, (train, _)) in datasets_split.items()])
    test_indices = np.concatenate([test.index for (_, (_, test)) in datasets_split.items()])
    print(f"{len(X)} {len(y)} {len(train_indices)} {len(test_indices)}")
    X_train, X_test, y_train, y_test = X.loc[train_indices], X.loc[test_indices], y.loc[train_indices], y.loc[test_indices]
    print(f"Making split with test as {len(test_indices)/(len(test_indices) + len(train_indices))} of dataset")
    
    print("Training big regressor with train data")
    big_regressor.fit(X_train, y_train)
    
    print("Predicting with big regressor on test data")
    y_predicted = big_regressor.predict(X_test)
    
    print("Rating big regressor's overall performance")
    [r2, mae, mape, rae_score] = calculate_regression_score(y_test, y_predicted, [r2_score, mean_absolute_error, mean_absolute_percentage_error, rae])
    print(f"Scores for big regressor:")
    print(f"R2: {r2}")
    print(f"MAE: {mae}")
    print(f"MAPE: {mape}")
    print(f"RAE: {rae_score}")
    print()
    dataframes = []
    for (job, (train, test)) in datasets_split.items():
        print(f"Comparing big regressor vs local regressor for job {job}")
        print("Preparing data for local regressor")
        joint_df = pd.concat([train, test])
        local_X, local_y = get_numerical_pipeline_data_big(joint_df, exp4_resources)
        print(f"{len(local_X)} {len(train.index)} {len(test.index)}")
        local_X_train, local_X_test, local_y_train, local_y_test = local_X.loc[train.index], local_X.loc[test.index], local_y.loc[train.index], local_y.loc[test.index]
        
        print(f"Rating local regressors for job {job}")
        regressor_df = rate_data_explicit(local_X_train, local_X_test, local_y_train, local_y_test, basic_regressors, verbose=0)
        print(regressor_df.head())
        print("Preparing data for big regressor")
        local_X_test, local_y_test = X.loc[test_indices], y.loc[test_indices]
        
        print(f"Rating big regressor for job {job}")
        local_predicted = big_regressor.predict(local_X_test, local_y_test)
        [r2, mae, mape, rae_score] = calculate_regression_score(local_predicted, local_y_test, [r2_score, mean_absolute_error, mean_absolute_percentage_error, rae])
        regressor_df = regressor_df.append({"name": "big", **{"r2": r2, "mae": mae, "mape": mape, "rae": rae_score}, "pca": "xD"}, ignore_index=True)
        regressor_df["job"] = job
        regressor_df["size"] = len(joint_df)
        dataframes.append(regressor_df)
        
    return pd.concat(dataframes)

In [None]:
exp4_df = run_experiment4()

### Wyniki

Wykresy:
- najlepsza skuteczność regresora (min mape/rae) vs liczba sampli
- skuteczności 5 najlepszych regresorów dla każdego (grid chart)
- najlepsza skuteczność regresora dla joba vs skuteczność dużego regresora dla tego joba

Odpowiedzi:
- czy zwiększenie granularności jest sensowne?
- czy jest jakiś widoczny próg liczby sampli przy zwiększonej granularności?