In [80]:
# standard library
import multiprocessing as mp
import time

# basic libraries
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

# sklearn
from sklearn.linear_model import LinearRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error

# others
from joblib import Parallel, delayed
from tqdm import tqdm
from xgboost import XGBRegressor

# config
import sys
sys.path.append('/home/yusukemh/github/yusukemh/StatisticalDownscaling/writeup')
from config import C_COMMON, C_SINGLE, C_GRID, FILENAME

# enable autoreload
%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [108]:
columns = C_SINGLE  # input features handed to cross_val_predict in the cells below
df = pd.read_csv(FILENAME, usecols=C_COMMON + columns)

# Per-station cross-validation (Linear Regression, XGBoost) and parallel timing

In [253]:
def cross_val_predict(df: pd.DataFrame, model, skn: int, columns: list, verbose=False):
    """
    Runs cross_val_predict for a single station (skn), using XGB or LinearRegression.

    Same idea as sklearn.model_selection.cross_val_predict, except that the split is not
    exactly 1/5: the folds are pre-determined by preprocessing and read from the 'fold'
    column, so that samples in the same month are never separated into different folds.

        Args:
            :param df: dataset for evaluation. Must contain the columns 'fold', 'skn', 'year',
                'month' and 'data_in', plus every column listed in `columns`.
            :param model: estimator with fit(X, y) / predict(X), e.g.
                sklearn.linear_model.LinearRegression or xgboost.XGBRegressor.
                It is refit in place on every fold, so after the call it holds the
                model fitted on the last fold.
            :param skn: identifier for stations
            :param columns: list of str indicating which columns to use as input data for the model
            :param verbose: if True, show a tqdm progress bar over the folds.
        Returns:
            pd.DataFrame with columns 'skn', 'year', 'month', 'observed', 'prediction';
            one row per test sample of station `skn`, ordered by fold label, with a
            fresh 0..n-1 index.
        Raises:
            AssertionError: if `df` has no 'fold' column.
            ValueError: if `df` has no rows for `skn`.
    """
    assert 'fold' in df.columns, "Must contain a column 'fold' to specify assignment of samples to the folds."

    # Select the station once instead of re-querying the whole frame for every fold.
    # A boolean mask also works for non-numeric skn values, unlike an f-string query.
    df_station = df[df['skn'] == skn]
    if df_station.empty:
        raise ValueError(f"No samples found for skn={skn}.")

    # Iterate over the fold labels that actually occur instead of range(n_folds).
    # This works for any labelling (e.g. 1..5), not only 0..k-1.
    folds = np.sort(df_station['fold'].unique())
    iterator = tqdm(folds) if verbose else folds

    dfs = []  # one result frame per fold
    for fold in iterator:
        is_test = df_station['fold'] == fold
        df_test = df_station[is_test]
        if df_test.empty:
            # e.g. a NaN fold label: nothing to predict, and estimators reject empty input.
            continue
        df_train = df_station[~is_test]

        x_train, x_test = df_train[columns].to_numpy(), df_test[columns].to_numpy()
        y_train = df_train['data_in'].to_numpy()

        model.fit(x_train, y_train)
        yhat = model.predict(x_test)

        dfs.append(pd.DataFrame({
            'skn': df_test['skn'].values,
            'year': df_test['year'].values,
            'month': df_test['month'].values,
            'observed': df_test['data_in'].values,
            'prediction': yhat,
        }))

    # ignore_index: the per-fold frames each start at 0; give the result a unique index.
    return pd.concat(dfs, ignore_index=True)

def parallelize(func, args, n_jobs=-1):
    """
    Call `func(*a)` for every `a` in `args` on a multiprocessing pool.

    :param func: picklable callable (e.g. defined at module level) run in the worker processes.
    :param args: iterable. list of arguments for the function; each element is unpacked
        as positional arguments, so it must itself be a sequence,
        e.g. [(df, model, skn, columns, False), ...].
    :param n_jobs: number of worker processes. -1 (default) or None uses every CPU.
        Other negative values follow the joblib convention (cpu_count + 1 + n_jobs,
        i.e. -2 = all CPUs but one), clamped to at least 1.
    :return: list of results, in the same order as `args`.
    :raises ValueError: if n_jobs == 0. Exceptions raised by `func` in a worker are
        re-raised in the caller.
    """
    if n_jobs is None or n_jobs == -1:
        processes = mp.cpu_count()
    elif n_jobs < -1:
        processes = max(1, mp.cpu_count() + 1 + n_jobs)
    elif n_jobs == 0:
        raise ValueError("n_jobs must be a non-zero integer.")
    else:
        processes = n_jobs

    # The context manager tears the pool down even if a task raises.
    # starmap blocks until every result is ready, so leaving the block
    # (which terminates the workers) is safe afterwards.
    with mp.Pool(processes) as pool:
        return pool.starmap(func, args)

In [190]:
linear_regression = LinearRegression()
df_result = cross_val_predict(df, linear_regression, skn=79, columns=columns, verbose=False)
# RMSE as sqrt(MSE): `mean_squared_error(..., squared=False)` is deprecated in
# scikit-learn 1.4 and removed in 1.6, while this form works on every version.
np.sqrt(mean_squared_error(df_result['observed'], df_result['prediction']))

6.004229222338949

In [216]:
xgboost = XGBRegressor()
df_result = cross_val_predict(df, xgboost, skn=79, columns=columns, verbose=True)
# RMSE as sqrt(MSE): `mean_squared_error(..., squared=False)` is deprecated in
# scikit-learn 1.4 and removed in 1.6, while this form works on every version.
np.sqrt(mean_squared_error(df_result['observed'], df_result['prediction']))

100%|██████████| 5/5 [00:01<00:00,  3.78it/s]


7.792404950412038

In [233]:
# Per-station XGB cross-validation on a multiprocessing pool (all CPUs).
# Each task needs the full argument list of cross_val_predict: (df, model, skn, columns, verbose).
# NOTE(review): XGBRegressor may itself use several threads per fit; one fit per CPU on top
# of that can oversubscribe the machine - consider XGBRegressor(n_jobs=1) for this benchmark.
start = time.perf_counter()
result_dfs = parallelize(
    cross_val_predict,
    [(df, xgboost, skn, columns, False) for skn in df['skn'].unique()],
)
end = time.perf_counter()
print(end - start)

34.87988209724426


In [214]:
# Sequential baseline: the same per-station XGB cross-validation, one station at a time.
# Results are kept so they can be compared against the parallel runs.
start = time.perf_counter()
sequential_results = [
    cross_val_predict(df, xgboost, skn, columns, False)
    for skn in tqdm(df['skn'].unique())
]
end = time.perf_counter()
print(end - start)

100%|██████████| 24/24 [00:32<00:00,  1.37s/it]

32.77909517288208





In [255]:
# joblib version of the per-station XGB cross-validation, with 12 workers.
# Results are kept so they can be compared against the other runs.
start = time.perf_counter()
joblib_results = Parallel(n_jobs=12)(
    delayed(cross_val_predict)(df, xgboost, skn, columns, False)
    for skn in df['skn'].unique()
)
end = time.perf_counter()
print(end - start)

49.09046173095703


In [239]:
from joblib import Parallel, delayed

In [251]:
# Pool-overhead check with the sleeping `dummy` task.
# `dummy` is defined in a later cell, so run that cell first.
# 10 tasks on 12 workers: ideally about one task duration in total.
start = time.perf_counter()
Parallel(n_jobs=12)(delayed(dummy)(skn, i) for skn, i in zip(range(10), range(1, 11)))
end = time.perf_counter()
print(end - start)

1.0273005962371826


In [247]:
def dummy(skn, i, delay: float = 1.0):
    """
    Stand-in task for timing the parallel helpers: sleep, then return an empty DataFrame.

    :param skn: ignored; mirrors the station argument of the real task.
    :param i: ignored; lets callers pass a second positional argument.
    :param delay: seconds to sleep (default 1.0, the original fixed duration).
    :return: an empty pd.DataFrame.
    """
    time.sleep(delay)
    return pd.DataFrame()

In [248]:
# Sequential baseline for the dummy task.
# Use one call per station, the same task count as the `parallelize(dummy, ...)` run.
n_stations = df['skn'].nunique()
start = time.perf_counter()
for i in range(n_stations):
    dummy(0, i)
end = time.perf_counter()
print(end - start)

24.035521030426025


In [238]:
# Dummy workload (one task per station) through the multiprocessing helper, all CPUs.
start = time.perf_counter()
test = parallelize(
    dummy,
    [(skn, 0) for skn in df['skn'].unique()],
)
end = time.perf_counter()
print(end - start)

6.629427433013916


In [147]:
# Inline sketch of the Pool/apply_async pattern that `parallelize` wraps; kept for reference only.
# NOTE(review): as written, `args=[skn]` passes a single argument, but `dummy` takes two (skn, i).
# pool = mp.Pool(mp.cpu_count())
# result_objects = [pool.apply_async(dummy, args=[skn]) for skn in df['skn'].unique()]
# results = [r.get() for r in result_objects]
# pool.close()
# pool.join()
# print(results)