In [1]:
import pandas as pd
import numpy as np
import itertools as it
import os 
import time

import dask
from dask import delayed
from dask.distributed import LocalCluster, SSHCluster, Client
from dask_jobqueue import SLURMCluster
from dask.diagnostics import ProgressBar

import sklearn
from sklearn.experimental import enable_iterative_imputer
from sklearn.linear_model import BayesianRidge
from sklearn.impute import SimpleImputer, KNNImputer, IterativeImputer
from sklearn.ensemble import RandomForestRegressor

In [None]:
# Needed for MissForest (it is only compatible up to scikit-learn 1.1.3!!).
# missingpy presumably still imports the old module path `sklearn.neighbors.base`;
# alias it to the renamed `sklearn.neighbors._base` BEFORE importing missingpy.
import sys
import sklearn.neighbors._base
sys.modules['sklearn.neighbors.base'] = sklearn.neighbors._base
from missingpy import MissForest

# Needed for GAIN; TensorFlow's v1-compat logging is limited to ERROR messages.
import tensorflow as tf
tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.ERROR)
from GAIN import gain
import pandas as pd  # duplicate of the import in the first cell (harmless)

In [2]:
output_dir = "output_dask/"
# experiment0/experiment1 and the result cells write their CSVs into this
# directory; create it up front so the first to_csv call cannot fail on a
# missing folder.
os.makedirs(output_dir, exist_ok=True)

In [3]:
# Load the data. Each table stacks several train/test splits; the "fold"
# column tells which split a row belongs to.
df_train, df_test = (
    pd.read_csv(path)
    for path in ("data/train_table_0.8_folds.csv", "data/test_table_0.8_folds.csv")
)

In [4]:
# define iterables
algorithms = ["mean_imputer", "knn_imputer", "missforest_imputer", "gain_imputer"]
iterations = range(50)
# Percentages of values to remove. The last entry is the percentage that ONE
# row of fold 0 represents (100 / number of rows in fold 0).
missing_value_proportions = [1, 10, 20, 30, 40, 50, 60, 70, 80, 90, 99] + [100 / len(df_train[df_train["fold"]==0])]
folds = df_train["fold"].unique()

# hyperparameter combinations to test (one param_grid per imputer)
# NOTE(review): the RandomForestRegressor estimators below have no random_state,
# so the iterative_rf runs are not reproducible — consider seeding them.
test_dict = [{"imputer_name": "knn_imputer",
              "param_grid": {"n_neighbors": [1, 5, 10, 20, 30, 40, 50], "weights": ["uniform", "distance"]}},
             {"imputer_name": "missforest_imputer",
              "param_grid": {"n_estimators": [10, 50, 100, 200], "max_iter": [30], "decreasing": [False],
                             "criterion": ["squared_error"], "max_features": [None], "random_state": [0],
                             "missing_values": [np.nan]}},
             {"imputer_name": "gain_imputer",
              "param_grid": {"alpha": [1, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 200, 300], "batch_size": [128],
                             "hint_rate": [0.9, 0.5], "iterations": [10000]}},
             {"imputer_name": "iterative_ridge_imputer",
              "param_grid": {"tol": [1e-3], "max_iter": [100, 1000], "sample_posterior": [False],
                             "estimator": [BayesianRidge()], "random_state": [0]}},
             {"imputer_name": "mean_imputer",
              "param_grid": {"missing_values": [np.nan], "strategy": ['mean']}},
             {"imputer_name": "iterative_rf_imputer",
              "param_grid": {"estimator":  [RandomForestRegressor(n_estimators=5),
                                            RandomForestRegressor(n_estimators=10),
                                            RandomForestRegressor(n_estimators=50),
                                            RandomForestRegressor(n_estimators=100),
                                            RandomForestRegressor(n_estimators=200)], "max_iter":[30, 50, 100], "random_state": [0]}}
             ]


def _param_grid_for(imputer_name):
    """Return the param_grid of the first ``test_dict`` entry for ``imputer_name``."""
    for entry in test_dict:
        if entry["imputer_name"] == imputer_name:
            return entry["param_grid"]
    raise KeyError(f"no hyperparameter grid defined for {imputer_name!r} in test_dict")


def _expand_param_grid(param_grid):
    """Return every combination of ``param_grid`` values as a list of dicts.

    Combinations follow ``itertools.product`` order over the keys in insertion
    order, e.g. ``{"a": [1, 2], "b": [3]}`` -> ``[{"a": 1, "b": 3}, {"a": 2, "b": 3}]``.
    """
    keys = list(param_grid)
    return [dict(zip(keys, values)) for values in it.product(*param_grid.values())]


# imputer name -> list of hyperparameter dicts to evaluate
grid_search_dict = {name: _expand_param_grid(_param_grid_for(name)) for name in algorithms}

# Set up dask

In [None]:
# Use dask's distributed scheduler for all computations in this notebook.
dask.config.set({"scheduler": "distributed"})

In [None]:
# Single-node alternative to the SLURM cluster below (uncomment one option):
# client = Client(n_workers=8, threads_per_worker=1)
# client = Client(n_workers=1)
# client

In [None]:
# multiple nodes
# Multiple nodes: each SLURM job started by `cluster.scale` runs dask workers
# with the resources below.
# Slurm needs the directory used by the -o/-e log paths to exist beforehand.
os.makedirs("log", exist_ok=True)
cluster = SLURMCluster(
                queue='mpp',  # -p (partition)
                account="oekochem.oekochem",  # -A
                cores=128,  # number of cores per node
                processes=24,  # worker processes per job (previously 12)
                walltime='12:00:00',  # --time
                memory='250GB',
                interface='ib0',  # network interface used for the connection between nodes
                local_directory="/tmp/",
                job_extra_directives=["--qos 12h",
                                      "-o ./log/dask-worker-%j.log",
                                      "-e ./log/dask-worker-%j.err",
                                      "--verbose",
                                      # Without ALL, sbatch propagates ONLY the listed variables and
                                      # drops the rest of the submission environment (PATH, ...).
                                      "--export=ALL,OMP_NUM_THREADS=1"]
)


In [None]:
# Request 24 SLURM jobs, each with the resources configured in SLURMCluster above.
cluster.scale(jobs=24)
# Alternative (disabled): adaptive scaling with at most 20 jobs.
#cluster.adapt(maximum_jobs=20)
# Display the cluster summary.
cluster

In [None]:
!squeue -u yvjennig

In [None]:
# Connect this notebook to the SLURM cluster created above and display the client summary.
client = Client(cluster)
client

In [None]:
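# Ideas to try for speeding up / organising the experiments:
# - put all for loops inside one function
# - keep the for loops outside the function
# - merge all folds into one dataset
# - parallelise with joblib
# - parallelise with sciris
# - use a dask progress bar
#
# (contact / suggestion: Willi Rath)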

In [5]:
def run_imputer(x_in, y_in, predicting_in, hyperparamerter_dict_in, imputer_name_in):
    """Impute ``x_in`` with the named imputer and score the result against ``y_in``.

    Args:
        x_in: DataFrame with NaNs where values were removed.
        y_in: ground-truth DataFrame with the same columns. The error is computed
            by index alignment against the imputed frame (which gets a fresh
            RangeIndex), so ``y_in`` presumably needs a RangeIndex too, as
            produced by ``scale_data``.
        predicting_in: column name or list of column names to score.
        hyperparamerter_dict_in: keyword arguments for the imputer. For GAIN the
            dict is passed as a single argument to ``gain.gain``.
        imputer_name_in: one of "gain_imputer", "knn_imputer",
            "missforest_imputer", "iterative_ridge_imputer",
            "iterative_rf_imputer", "mean_imputer".

    Returns:
        ``(rmse, impute_time)``:
        - ``rmse`` is the Euclidean norm of the error over the ``predicting_in``
          column(s), divided by sqrt(number of rows). For several columns this
          is sqrt(SSE / n_rows), not a per-cell RMSE.
        - ``impute_time`` is the imputation's wall-clock time in seconds.

    Raises:
        ValueError: if ``imputer_name_in`` is not a known imputer.
    """
    if imputer_name_in == "gain_imputer":
        # GAIN works on a numpy array and takes its hyperparameters as one dict.
        start = time.time()
        x_hat = pd.DataFrame(gain.gain(x_in.to_numpy(), hyperparamerter_dict_in), columns=x_in.columns)
        impute_time = time.time() - start
    else:
        if imputer_name_in == "knn_imputer":
            imputer = KNNImputer(**hyperparamerter_dict_in)
        elif imputer_name_in == "missforest_imputer":
            imputer = MissForest(**hyperparamerter_dict_in)
        elif imputer_name_in in ("iterative_ridge_imputer", "iterative_rf_imputer"):
            # Both variants are IterativeImputer; they differ only in the `estimator` hyperparameter.
            imputer = IterativeImputer(**hyperparamerter_dict_in)
        elif imputer_name_in == "mean_imputer":
            imputer = SimpleImputer(**hyperparamerter_dict_in)
        else:
            # Fail loudly: previously this printed a message and then crashed on the undefined `imputer`.
            raise ValueError(f"run_imputer: Unknown imputer {imputer_name_in!r}!")

        start = time.time()
        x_hat = pd.DataFrame(imputer.fit_transform(x_in), columns=x_in.columns)
        impute_time = time.time() - start

    rmse = np.linalg.norm(x_hat[predicting_in] - y_in[predicting_in]) / np.sqrt(len(y_in))
    return rmse, impute_time

In [6]:
def drop_values_from_one_param_by_index_and_scale(x_in, parameter_in, indexes_in):
    """Blank out column ``parameter_in`` at rows ``indexes_in``, then min-max scale.

    Only the single column ``parameter_in`` is modified, and only on a copy.
    ``x_in`` itself stays untouched and is passed on as the ground truth.

    Returns:
        ``(x_scaled, y_scaled)`` as produced by ``scale_data(masked, x_in)``.
    """
    masked = x_in.copy()
    # .loc is label-based: indexes_in are row labels (equal to positions only for a RangeIndex).
    masked.loc[indexes_in, parameter_in] = np.nan
    return scale_data(masked, x_in)

In [7]:
def scale_data(x_in, y_in):
    """Min-max scale two frames with a scaler fitted on ``x_in`` only.

    ``x_in`` is the frame containing the artificially removed values (NaNs).
    ``y_in`` is the ground truth; it is scaled with the same per-column
    statistics so that both frames stay comparable. This relies on
    MinMaxScaler tolerating NaNs — see the scikit-learn documentation.

    NOTE: works on concrete pandas DataFrames, NOT on dask delayed objects.

    Returns:
        ``(x_scaled, y_scaled)``: new DataFrames with a fresh RangeIndex, both in
        ``x_in``'s column order.
    """
    scaler = sklearn.preprocessing.MinMaxScaler().fit(x_in)  # alternatives: RobustScaler, StandardScaler
    # The scaler applies its per-column statistics by POSITION. Align y_in to
    # x_in's column order so no column is scaled with another column's min/max.
    y_aligned = y_in[x_in.columns]
    x_scaled = pd.DataFrame(scaler.transform(x_in), columns=x_in.columns)
    y_scaled = pd.DataFrame(scaler.transform(y_aligned), columns=y_aligned.columns)
    return x_scaled, y_scaled

In [8]:
def drop_values_globally_and_scale(x_in, proportion_in, parameters_in):
    """Randomly blank out values across all parameter columns, then min-max scale.

    ``proportion_in`` percent of ALL cells in the ``parameters_in`` columns
    (rows x parameter columns) are set to NaN at uniformly random positions,
    using numpy's global random state. Other columns (e.g. lat, lon, depth) are
    left untouched, and the column order of ``x_in`` is preserved.

    Returns:
        ``(x_scaled, y_scaled)`` as produced by ``scale_data(masked, x_in)``:
        the masked data and the untouched ground truth, scaled alike.
    """
    x = x_in.copy()  # we do not want to change inputs directly
    x_rows, x_cols = x[parameters_in].shape

    # Number of cells to set to NaN: a share of all parameter cells.
    # NOTE(review): an earlier comment said "100% equals the LENGTH of x, not
    # length*width" (cf. the former round(len(x) * proportion_in / 100)), but
    # the code uses length*width — confirm which definition is intended.
    num_vals = round(len(x)*len(parameters_in)*proportion_in/100)

    nan_mat = np.zeros(shape=(x_rows * x_cols,), dtype=bool)  # all False
    nan_mat[:num_vals] = True                                  # first num_vals cells -> True
    nan_mat = np.random.permutation(nan_mat).reshape((x_rows, x_cols))  # shuffle into place

    # Mask the parameter columns in place. The former concat of
    # [non-parameter cols, parameter cols] reordered the columns, so x no longer
    # matched x_in's column order when both were handed to scale_data.
    if x_cols:
        x[parameters_in] = x[parameters_in].mask(nan_mat)

    return scale_data(x, x_in)

In [9]:
# @delayed
def experiment0(df_train, algorithm, fold, iteration, missingness, grid_search):
    """Experiment 0: remove values from all ``P_*`` columns of one fold at once and score an imputer.

    The masked data set is built once. Every hyperparameter combination in
    ``grid_search`` is then evaluated on it with the imputer ``algorithm``.
    Results are also written to ``{output_dir}exp0_<algorithm>_<fold>_<iteration>_<missingness>.csv``.
    If that file already exists the run is skipped, so an interrupted sweep
    can be re-run without recomputing finished runs.

    Args:
        df_train: table with all folds, distinguished by its "fold" column.
        algorithm: imputer name understood by ``run_imputer``.
        fold: "fold" value selecting the rows to use.
        iteration: repetition index (only used in the file name and output).
        missingness: percentage of values to remove, as interpreted by
            ``drop_values_globally_and_scale``.
        grid_search: list of hyperparameter dicts to try.

    Returns:
        List of one-row DataFrames, one per hyperparameter combination. It is
        empty when the result file already existed or ``grid_search`` is empty.
    """
    filename = f"{output_dir}exp0_{algorithm}_{fold}_{iteration}_{missingness}.csv"
    if os.path.isfile(filename):
        return []

    # reset_index for consistency with experiment1 (positional row labels)
    df_fold = df_train[df_train["fold"] == fold].drop("fold", axis=1).reset_index(drop=True)
    parameters = [col for col in df_fold.columns if col.startswith("P_")]

    x0, y0 = drop_values_globally_and_scale(x_in=df_fold, proportion_in=missingness, parameters_in=parameters)

    res = []
    for hyperparamerter_combination in grid_search:
        rmse0, impute_time0 = run_imputer(x_in=x0, y_in=y0,
                                          predicting_in=parameters,
                                          hyperparamerter_dict_in=hyperparamerter_combination,
                                          imputer_name_in=algorithm)
        res.append(pd.DataFrame({"iteration": [iteration],
                                 "missing_value_proportion": [missingness],
                                 "predicting": [str(parameters)],
                                 "imputer": [algorithm],
                                 "time": [impute_time0],
                                 "rmse": [rmse0],
                                 "hyperparameters": [hyperparamerter_combination],
                                 "fold": [fold]}))
    if res:  # pd.concat([]) raises, e.g. for an empty grid
        pd.concat(res).to_csv(filename, index=False)
    return res

In [10]:
# @delayed
def experiment1(df_train, algorithm, fold, iteration, missingness, grid_search):
    """Experiment 1: remove values from one ``P_*`` column at a time and score an imputer.

    ``missingness`` percent of the fold's rows are drawn once, without
    replacement, using numpy's global random state. For every parameter
    column, those rows are set to NaN in that column only, and each
    hyperparameter combination in ``grid_search`` is evaluated on the result.
    Results are also written to ``{output_dir}exp1_<algorithm>_<fold>_<iteration>_<missingness>.csv``.
    If that file already exists the run is skipped.

    Args:
        df_train: table with all folds, distinguished by its "fold" column.
        algorithm: imputer name understood by ``run_imputer``.
        fold: "fold" value selecting the rows to use.
        iteration: repetition index (only used in the file name and output).
        missingness: percentage of rows to blank out per parameter column.
        grid_search: list of hyperparameter dicts to try.

    Returns:
        List of one-row DataFrames, one per (parameter column, hyperparameter
        combination). It is empty when the result file already existed or there
        was nothing to evaluate.
    """
    filename = f"{output_dir}exp1_{algorithm}_{fold}_{iteration}_{missingness}.csv"
    if os.path.isfile(filename):
        return []

    df_fold = df_train[df_train["fold"] == fold].drop("fold", axis=1).reset_index(drop=True)
    parameters = [col for col in df_fold.columns if col.startswith("P_")]

    # Draw the rows WITHOUT replacement so exactly `num_indexes` distinct rows
    # are blanked out. np.random.randint sampled with replacement, producing
    # duplicates and therefore fewer missing values than requested.
    num_indexes = min(int(np.round(len(df_fold.index) / 100 * missingness)), len(df_fold))
    indexes_to_remove = np.random.choice(len(df_fold), num_indexes, replace=False)

    res = []
    for param in parameters:
        x1, y1 = drop_values_from_one_param_by_index_and_scale(x_in=df_fold, parameter_in=param,
                                                                indexes_in=indexes_to_remove)
        for hyperparamerter_combination in grid_search:
            rmse1, impute_time1 = run_imputer(x_in=x1, y_in=y1, predicting_in=param,
                                              hyperparamerter_dict_in=hyperparamerter_combination,
                                              imputer_name_in=algorithm)
            res.append(pd.DataFrame({"iteration": [iteration],
                                     "missing_value_proportion": [missingness],
                                     "predicting": [param],
                                     "imputer": [algorithm],
                                     "time": [impute_time1],
                                     "rmse": [rmse1],
                                     "hyperparameters": [hyperparamerter_combination],
                                     "fold": [fold]}))
    if res:  # pd.concat([]) raises, e.g. for an empty grid or no P_ columns
        pd.concat(res).to_csv(filename, index=False)
    return res

In [None]:
%%time

res0 = []
res1 = []
# NOTE(review): this overrides the `algorithms` list from the configuration
# cell, so only the mean and KNN imputers are run here.
algorithms = ["mean_imputer", "knn_imputer"]
for algorithm, fold, iteration, missingness in it.product(algorithms, folds, iterations, missing_value_proportions):
    print(algorithm, fold, iteration, missingness)
    # both experiments receive exactly the same arguments
    experiment_kwargs = dict(df_train=df_train, algorithm=algorithm, fold=fold, iteration=iteration,
                             missingness=missingness, grid_search=grid_search_dict[algorithm])
    res0.append(experiment0(**experiment_kwargs))
    res1.append(experiment1(**experiment_kwargs))

mean_imputer 0 0 1
mean_imputer 0 0 10
mean_imputer 0 0 20
mean_imputer 0 0 30
mean_imputer 0 0 40
mean_imputer 0 0 50
mean_imputer 0 0 60
mean_imputer 0 0 70
mean_imputer 0 0 80
mean_imputer 0 0 90
mean_imputer 0 0 99
mean_imputer 0 0 0.004239443784975411
mean_imputer 0 1 1
mean_imputer 0 1 10
mean_imputer 0 1 20
mean_imputer 0 1 30
mean_imputer 0 1 40
mean_imputer 0 1 50
mean_imputer 0 1 60
mean_imputer 0 1 70
mean_imputer 0 1 80
mean_imputer 0 1 90
mean_imputer 0 1 99
mean_imputer 0 1 0.004239443784975411
mean_imputer 0 2 1
mean_imputer 0 2 10
mean_imputer 0 2 20
mean_imputer 0 2 30
mean_imputer 0 2 40
mean_imputer 0 2 50
mean_imputer 0 2 60
mean_imputer 0 2 70
mean_imputer 0 2 80
mean_imputer 0 2 90
mean_imputer 0 2 99
mean_imputer 0 2 0.004239443784975411
mean_imputer 0 3 1
mean_imputer 0 3 10
mean_imputer 0 3 20
mean_imputer 0 3 30
mean_imputer 0 3 40
mean_imputer 0 3 50
mean_imputer 0 3 60
mean_imputer 0 3 70
mean_imputer 0 3 80
mean_imputer 0 3 90
mean_imputer 0 3 99
mean_imput

In [None]:
# A task graph only exists when experiment0 returns dask.delayed objects (its
# @delayed decorator is commented out). Eagerly computed results are plain
# lists of DataFrames, which have no .visualize() method.
if dask.is_dask_collection(res0[0]):
    display(res0[0].visualize())  # interactive alternative: .visualize(engine="ipycytoscape")
else:
    print("res0 holds eagerly computed results; there is no task graph to visualize.")

In [None]:
%%time
# NOTE(review): dask.diagnostics.ProgressBar targets dask's local schedulers; with
# the distributed scheduler configured above it may report nothing
# (dask.distributed.progress is the distributed counterpart) — confirm.
# While experiment0 is not wrapped in @delayed, res0 presumably already holds the
# computed results and dask.compute just hands them back.
with ProgressBar():
    out0 = dask.compute(*res0)

In [None]:
# Each experiment0 call returns a list of one-row DataFrames (an empty list if
# its per-run CSV already existed), so flatten before concatenating.
frames0 = [frame for run in out0 for frame in run]
if frames0:
    pd.concat(frames0).to_csv(f"{output_dir}tuning_results_exp0.csv", index=False)
else:
    print("No new exp0 results to store.")

In [None]:
%%time
# NOTE(review): dask.diagnostics.ProgressBar targets dask's local schedulers; with
# the distributed scheduler configured above it may report nothing
# (dask.distributed.progress is the distributed counterpart) — confirm.
# While experiment1 is not wrapped in @delayed, res1 presumably already holds the
# computed results and dask.compute just hands them back.
with ProgressBar():
    out1 = dask.compute(*res1)

In [None]:
# Each experiment1 call returns a list of one-row DataFrames (an empty list if
# its per-run CSV already existed), so flatten before concatenating.
frames1 = [frame for run in out1 for frame in run]
if frames1:
    pd.concat(frames1).to_csv(f"{output_dir}tuning_results_exp1.csv", index=False)
else:
    print("No new exp1 results to store.")