In [1]:
from dask.distributed import Client, LocalCluster
import logging

# Start a local Dask cluster and attach a client to it. The joblib "dask"
# backend used when fitting models later in this notebook runs on this client.
cluster = LocalCluster(
    n_workers=28,
    threads_per_worker=8,
    # NOTE(review): `silence_logs` takes a logging level; DEBUG presumably lets
    # every scheduler/nanny INFO record through, which would explain the log
    # lines captured in this notebook's outputs -- confirm this is intended.
    silence_logs=logging.DEBUG
)

# NOTE(review): the unit of `heartbeat_interval` is not visible here -- confirm
# against the dask.distributed documentation.
client = Client(cluster, heartbeat_interval=10000)
# Show the dashboard URL so the run can be monitored from a browser.
print(client.dashboard_link)

Perhaps you already have a cluster running?
Hosting the HTTP server on port 37101 instead
  http_address["port"], self.http_server.port
distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO -   Scheduler at:     tcp://127.0.0.1:44491
distributed.scheduler - INFO -   dashboard at:           127.0.0.1:37101
distributed.nanny - INFO -         Start Nanny at: 'tcp://127.0.0.1:43871'
distributed.nanny - INFO -         Start Nanny at: 'tcp://127.0.0.1:45569'
distributed.nanny - INFO -         Start Nanny at: 'tcp://127.0.0.1:42683'
distributed.nanny - INFO -         Start Nanny at: 'tcp://127.0.0.1:42239'
distributed.nanny - INFO -         Start Nanny at: 'tcp://127.0.0.1:41307'
distributed.nanny - INFO -         Start Nanny at: 'tcp://127.0.0.1:34667'
distributed.nanny - INFO -         Start Nanny at: 'tcp://127.0.0.1:37201'
distributed.nanny - INFO -         Start Nanny at: 'tcp://127.0.0.1:33823'
distributed.nanny - INFO -         Start Nanny at: 'tcp://127.0.0.1:45

http://127.0.0.1:37101/status


In [2]:
import afqinsight as afqi
import joblib
import matplotlib.pyplot as plt
import numpy as np
import os.path as op
import pandas as pd
import pickle
import seaborn as sns

from datetime import datetime

from sklearn.base import clone
from sklearn.model_selection import RepeatedKFold
from sklearn.metrics import median_absolute_error, r2_score
from sklearn.metrics import explained_variance_score, mean_squared_error
from sklearn.linear_model import LassoCV

from skopt import BayesSearchCV
from skopt.plots import plot_convergence, plot_objective, plot_evaluations

# Record which afqinsight version produced the results in this notebook.
print(afqi.__version__)

0.2.9.dev460469908


In [3]:
# Load the dataset from the raw "age_data" directory with "Age" as the target.
# Of the six returned values, this notebook uses X (features), y (regression
# target), groups (per-group column indices) and columns (column tuples) below;
# `subjects` and `classes` are not used in the cells shown here.
X, y, groups, columns, subjects, classes = afqi.load_afq_data(
    "../data/raw/age_data",
    target_cols=["Age"],
)

In [4]:
# Turn the (metric, tractID, nodeID) column tuples into the label sets that
# afqi.select_groups takes as its third argument in the cells below.
label_sets = afqi.multicol2sets(pd.MultiIndex.from_tuples(columns, names=["metric", "tractID", "nodeID"]))

In [5]:
# One single-element label list per unique bundle name (second entry of each
# column tuple), leaving out the two Cingulum Hippocampus bundles. Built in a
# single pass so `pyafq_bundles` is only ever bound to its final value.
pyafq_bundles = [
    [bundle_name]
    for bundle_name in np.unique([
        col[1]
        for col in columns
        if col[1] not in ["Right Cingulum Hippocampus", "Left Cingulum Hippocampus"]
    ])
]

In [6]:
# Select from X the feature columns matching the bundle labels in
# `pyafq_bundles`, using `label_sets` to map columns to labels.
# NOTE(review): `label_sets` must still describe all columns of X here; a later
# cell rebinds `label_sets`, so re-running this cell afterwards would presumably
# mismatch -- confirm before executing cells out of order.
X_pyafq_bundles = afqi.select_groups(
    X,
    pyafq_bundles,
    label_sets
)

In [7]:
# Sanity check: feature-matrix shape before and after bundle selection, and the
# number of label sets (one per column of X).
print(X.shape)
print(X_pyafq_bundles.shape)
print(len(label_sets))

(77, 10000)
(77, 9000)
10000


In [8]:
# Drop the two Cingulum Hippocampus bundles from the column tuples and rebuild
# the label sets so they line up with the columns of X_pyafq_bundles.
# NOTE(review): this rebinds the notebook-level names `columns` and `label_sets`
# that earlier cells read; after this cell they no longer describe X. Earlier
# cells must not be re-run without first reloading the data.
columns = [
    c for c in columns 
    if c[1] not in ["Right Cingulum Hippocampus", "Left Cingulum Hippocampus"]
]
label_sets = afqi.multicol2sets(pd.MultiIndex.from_tuples(columns, names=["metric", "tractID", "nodeID"]))

# Keep only the columns labelled with the "fa" or "md" metric.
X_md_fa = afqi.select_groups(
    X_pyafq_bundles,
    [["fa"], ["md"]],
    label_sets
)

In [9]:
# Sanity check: shapes of the full, bundle-filtered and fa/md-only matrices.
print(X.shape)
print(X_pyafq_bundles.shape)
print(X_md_fa.shape)

(77, 10000)
(77, 9000)
(77, 3600)


In [10]:
# Keep the first 36 group index arrays to go with X_md_fa.
# NOTE(review): 36 is presumably 2 metrics x 18 bundles (X_md_fa has 3600
# columns). This assumes the first 36 entries of `groups` index X_md_fa's
# columns in order -- confirm, or derive the groups from X_md_fa's columns.
groups_md_fa = groups[:36]

In [12]:
def get_cv_results(n_repeats=5, n_splits=10,
                   power_transformer=False,
                   shuffle=False,
                   ensembler=None,
                   target_transform_func=None,
                   target_transform_inverse_func=None,
                   n_estimators=10,
                   trim_nodes=0,
                   square_features=False,
                   shuffle_seed=None):
    """Fit a LassoCV pipeline on repeated K-fold splits and collect per-fold results.

    Reads the notebook-level globals ``X_md_fa`` (feature matrix), ``y``
    (target) and ``groups_md_fa``. Only the length of ``groups_md_fa`` and the
    length of its first entry are used, to build the node-trimming mask; the
    pipeline built here is not given any group information.

    Parameters
    ----------
    n_repeats, n_splits : int
        Passed to ``RepeatedKFold`` (which uses a fixed ``random_state=1729``).
    power_transformer : bool
        Forwarded unchanged to ``afqi.pipeline.make_base_afq_pipeline``.
    shuffle : bool
        If True, fit against a random permutation of ``y`` instead of ``y``.
    ensembler : object or None
        Forwarded as ``ensemble_meta_estimator``. When not None, the fitted
        "estimate" step is expected to expose ``estimators_``.
    target_transform_func, target_transform_inverse_func : callable or None
        Forwarded to the pipeline factory. When either is given, the fitted
        "estimate" step is expected to expose ``regressor_``.
    n_estimators : int
        Passed inside ``ensemble_meta_estimator_kwargs``.
    trim_nodes : int
        Number of columns dropped from each end of every group. All groups are
        assumed to have the same length as ``groups_md_fa[0]``.
    square_features : bool
        If True, append the element-wise square of every (trimmed) feature.
    shuffle_seed : int or None
        Seed for the permutation used when ``shuffle`` is True. The default
        ``None`` keeps the previous unseeded behaviour.

    Returns
    -------
    cv_results : dict
        Maps the fold index to a dict with keys "pipeline", "train_idx",
        "test_idx", "y_pred", "y_true", "test_mae", "train_mae", "test_r2",
        "train_r2", "coefs" and "alpha". The "*_mae" entries are computed with
        ``median_absolute_error``.
    y_fit : ndarray
        The target actually used for fitting (a copy or a permutation of ``y``).

    Raises
    ------
    ValueError
        If ``trim_nodes`` is negative, or so large that no column would remain
        in a group.
    """
    if trim_nodes < 0:
        raise ValueError("trim_nodes must be non-negative.")

    if shuffle:
        rng = np.random.default_rng(shuffle_seed)
        y_fit = rng.permutation(y)
    else:
        y_fit = np.copy(y)

    if trim_nodes > 0:
        # Boolean mask for one group with `trim_nodes` entries cut off each
        # end, tiled across all groups to index the columns of X_md_fa.
        grp_mask = np.zeros_like(groups_md_fa[0], dtype=bool)
        if 2 * trim_nodes >= grp_mask.size:
            raise ValueError(
                f"trim_nodes={trim_nodes} leaves no columns in a group "
                f"of length {grp_mask.size}."
            )
        grp_mask[trim_nodes:-trim_nodes] = True
        X_mask = np.concatenate([grp_mask] * len(groups_md_fa))
        X_trim = X_md_fa[:, X_mask]
    else:
        X_trim = np.copy(X_md_fa)

    if square_features:
        X_trim = np.hstack([X_trim, np.square(X_trim)])

    cv = RepeatedKFold(
        n_splits=n_splits,
        n_repeats=n_repeats,
        random_state=1729
    )

    cv_results = {}

    # Unfitted template; every fold fits its own clone of it (see below).
    pipe_skopt = afqi.pipeline.make_base_afq_pipeline(
        imputer_kwargs={"strategy": "median"},
        power_transformer=power_transformer,
        scaler="standard",
        estimator=LassoCV,
        estimator_kwargs={
            "verbose": 0,
            "n_alphas": 50,
            "cv": 3,
            "n_jobs": 28,
            "max_iter": 500,
        },
        verbose=0,
        ensemble_meta_estimator=ensembler,
        ensemble_meta_estimator_kwargs={
            "n_estimators": n_estimators,
            "n_jobs": 1,
            "oob_score": True,
            "random_state": 1729,
        },
        target_transform_func=target_transform_func,
        target_transform_inverse_func=target_transform_inverse_func,
    )

    has_target_transform = (
        (target_transform_func is not None)
        or (target_transform_inverse_func is not None)
    )

    for cv_idx, (train_idx, test_idx) in enumerate(cv.split(X_trim, y_fit)):
        start = datetime.now()

        X_train, X_test = X_trim[train_idx], X_trim[test_idx]
        y_train, y_test = y_fit[train_idx], y_fit[test_idx]

        # Refitting one shared object would leave every fold's "pipeline"
        # entry pointing at the model fitted on the last fold, so each fold
        # gets its own clone.
        pipe_fold = clone(pipe_skopt)
        with joblib.parallel_backend("dask"):
            pipe_fold.fit(X_train, y_train)

        # Predict once per split and reuse the result for every metric.
        y_pred_test = pipe_fold.predict(X_test)
        y_pred_train = pipe_fold.predict(X_train)

        cv_results[cv_idx] = {
            "pipeline": pipe_fold,
            "train_idx": train_idx,
            "test_idx": test_idx,
            "y_pred": y_pred_test,
            "y_true": y_test,
            "test_mae": median_absolute_error(y_test, y_pred_test),
            "train_mae": median_absolute_error(y_train, y_pred_train),
            "test_r2": r2_score(y_test, y_pred_test),
            "train_r2": r2_score(y_train, y_pred_train),
        }

        # With a target transform the fitted model sits one level down, on
        # the `regressor_` attribute of the "estimate" step.
        fitted = pipe_fold.named_steps["estimate"]
        if has_target_transform:
            fitted = fitted.regressor_

        if ensembler is None:
            cv_results[cv_idx]["coefs"] = fitted.coef_
            cv_results[cv_idx]["alpha"] = fitted.alpha_
        else:
            cv_results[cv_idx]["coefs"] = [
                est.coef_ for est in fitted.estimators_
            ]
            cv_results[cv_idx]["alpha"] = [
                est.alpha_ for est in fitted.estimators_
            ]

        print(f"CV index [{cv_idx:3d}], Elapsed time: ", datetime.now() - start)

    return cv_results, y_fit

In [13]:
# Run two 10-fold (single-repeat) experiments on the untrimmed features: one on
# the raw target and one with a log/exp target transform. Each entry of
# `results` is the (cv_results, y_fit) tuple returned by get_cv_results.
# NOTE(review): the keys are prefixed "bagging_", but no `ensembler` is passed,
# so get_cv_results runs with its default ensembler=None -- confirm the keys
# describe what was actually fitted.
results = {}

trim_nodes = 0
results[f"bagging_pure_lasso_trim{trim_nodes}"] = get_cv_results(
    n_splits=10, n_repeats=1, power_transformer=False,
    shuffle=False,
    trim_nodes=trim_nodes, square_features=False,
)

results[f"bagging_target_transform_pure_lasso_trim{trim_nodes}"] = get_cv_results(
    n_splits=10, n_repeats=1, power_transformer=False,
    shuffle=False,
    target_transform_func=np.log, target_transform_inverse_func=np.exp,
    trim_nodes=trim_nodes, square_features=False,
)

distributed.scheduler - INFO - Receive client connection: Client-worker-3da13d58-64cc-11eb-bbd8-42010a8a0002
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Receive client connection: Client-worker-3da432b6-64cc-11eb-bbd8-42010a8a0002
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Receive client connection: Client-worker-3da128b8-64cc-11eb-bbd8-42010a8a0002
distributed.core - INFO - Starting established connection


CV index [  0], Elapsed time:  0:00:05.888013


  positive)


CV index [  1], Elapsed time:  0:00:03.673027


distributed.scheduler - INFO - Receive client connection: Client-worker-43680f9c-64cc-11eb-bbcc-9b141b0b417e
distributed.core - INFO - Starting established connection


CV index [  2], Elapsed time:  0:00:02.898850
CV index [  3], Elapsed time:  0:00:06.616127
CV index [  4], Elapsed time:  0:00:07.814545


distributed.scheduler - INFO - Receive client connection: Client-worker-4dc39bee-64cc-11eb-bbc4-42010a8a0002
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Receive client connection: Client-worker-4dc68cae-64cc-11eb-bbc4-42010a8a0002
distributed.core - INFO - Starting established connection
  positive)


CV index [  5], Elapsed time:  0:00:05.261474
CV index [  6], Elapsed time:  0:00:06.505989
CV index [  7], Elapsed time:  0:00:06.799328


distributed.scheduler - INFO - Receive client connection: Client-worker-58cdcbcc-64cc-11eb-bbe4-399b529947a9
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Receive client connection: Client-worker-58cc2be2-64cc-11eb-bbe4-42010a8a0002
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Receive client connection: Client-worker-58cd7bda-64cc-11eb-bbe4-42010a8a0002
distributed.core - INFO - Starting established connection
  positive)


CV index [  8], Elapsed time:  0:00:06.541004
CV index [  9], Elapsed time:  0:00:06.486970
CV index [  0], Elapsed time:  0:00:08.405929


distributed.scheduler - INFO - Receive client connection: Client-worker-65bd5a40-64cc-11eb-bc7e-42010a8a0002
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Receive client connection: Client-worker-65bfc052-64cc-11eb-bc7e-42010a8a0002
distributed.core - INFO - Starting established connection
  positive)


CV index [  1], Elapsed time:  0:00:05.774093


  positive)


CV index [  2], Elapsed time:  0:00:06.629735
CV index [  3], Elapsed time:  0:00:06.752765


distributed.scheduler - INFO - Receive client connection: Client-worker-70fe708c-64cc-11eb-bbe1-42010a8a0002
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Receive client connection: Client-worker-7100b82e-64cc-11eb-bbe1-42010a8a0002
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Receive client connection: Client-worker-70fe0f62-64cc-11eb-bbe1-42010a8a0002
distributed.core - INFO - Starting established connection


CV index [  4], Elapsed time:  0:00:08.178327


  positive)


CV index [  5], Elapsed time:  0:00:06.789112


distributed.scheduler - INFO - Receive client connection: Client-worker-7a1b34d0-64cc-11eb-bcc5-fbefa1a56eee
distributed.core - INFO - Starting established connection
  positive)


CV index [  6], Elapsed time:  0:00:04.803682
CV index [  7], Elapsed time:  0:00:06.360823
CV index [  8], Elapsed time:  0:00:06.709489


distributed.scheduler - INFO - Receive client connection: Client-worker-849d814c-64cc-11eb-bc53-42010a8a0002
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Receive client connection: Client-worker-84a0013e-64cc-11eb-bc53-42010a8a0002
distributed.core - INFO - Starting established connection


CV index [  9], Elapsed time:  0:00:04.297737


  positive)


In [14]:
# List the experiment names collected so far.
results.keys()

dict_keys(['bagging_pure_lasso_trim0', 'bagging_target_transform_pure_lasso_trim0'])

In [15]:
# Summarise every experiment by averaging each per-fold score over the folds.
# res is a (cv_results, y_fit) tuple; res[0] maps fold index -> fold record.
for key, res in results.items():
    folds = list(res[0].values())
    test_mae, train_mae, test_r2, train_r2 = (
        [fold[score_name] for fold in folds]
        for score_name in ("test_mae", "train_mae", "test_r2", "train_r2")
    )

    for label, scores in (
        ("test  MAE", test_mae),
        ("train MAE", train_mae),
        ("test  R2 ", test_r2),
        ("train R2 ", train_r2),
    ):
        print(key, label, np.mean(scores))

bagging_pure_lasso_trim0 test  MAE 5.5661476222938635
bagging_pure_lasso_trim0 train MAE 1.1931035214506038
bagging_pure_lasso_trim0 test  R2  0.413984310714887
bagging_pure_lasso_trim0 train R2  0.9488110341185585
bagging_target_transform_pure_lasso_trim0 test  MAE 4.434465256219688
bagging_target_transform_pure_lasso_trim0 train MAE 0.7597048857934332
bagging_target_transform_pure_lasso_trim0 test  R2  0.4733396853419719
bagging_target_transform_pure_lasso_trim0 train R2  0.9286894212613932


In [17]:
# Persist all experiment results (including the stored pipeline objects) to a
# pickle file in the current working directory.
# NOTE(review): only unpickle this file from a trusted location -- loading a
# pickle can execute arbitrary code.
with open("age_regression_pure_lasso.pkl", "wb") as fp:
    pickle.dump(results, fp)