In [1]:
import sys
import pathlib
# Working directory of the notebook; its parent is presumably the project root.
cwd = pathlib.Path.cwd()
# Make the parent directory importable (presumably where the local `auxiliary` and `telecom`
# packages live); the guard keeps re-runs of this cell from piling up duplicate sys.path entries.
project_root = cwd.parent.as_posix()
if project_root not in sys.path:
    sys.path.append(project_root)
# Data files (data_train.csv, features.csv) are read from <parent>/data.
data_folder = cwd.parent.joinpath('data')

import datetime as dt
import pandas as pd
import dask.dataframe as dd

from dask.distributed import Client, wait
from auxiliary import trim_memory, select_and_sort

from dask_ml.decomposition import PCA
from telecom.transformers import ColumnsCorrector
from sklearn.pipeline import make_pipeline
from sklearn.utils.class_weight import compute_class_weight
from sklearn.model_selection import KFold
from lightgbm import LGBMClassifier
from sklearn.metrics import f1_score

In [2]:
# Start a local dask cluster with a single worker; later cells submit all work through `client`.
client = Client(n_workers=1)

2022-06-08 20:38:32,262 - distributed.diskutils - INFO - Found stale lock file and directory '/home/avagadro/projects/mega_telecom/research/dask-worker-space/worker-q38o6sjp', purging


In [3]:
# parameters
blocksize = '200MB'  # partition size passed to dd.read_csv
drop_feats = ['75', '81', '85', '139', '203']  # columns dropped from features.csv on load

# Active configuration: train only on rows with buy_time >= bound_date ('' disables the filter)
# and compress the user features with dask-ml PCA down to n_components columns.
# with PCA compression: 0.7776868306270386
# NOTE(review): the cross-validation cell of this exact configuration prints 0.7777151619934244;
# the figure above presumably comes from a different run or feature file — confirm which one.
bound_date = '2018-11-19'
compress_features = True
n_components = 3  # number of PCA components kept

In [None]:
# Alternative configurations for comparison — uncomment one pair to reproduce it.
# The number after a label is presumably the average macro F1 printed by the CV cell for it.
#
# # baseline: train on all dates, raw (uncompressed) features
# bound_date = ''
# compress_features = False
#
# # partial fit: train only on buy_time >= 2018-11-19, raw features: 0.7776767294528835
# bound_date = '2018-11-19'
# compress_features = False

## load and prepare data

In [4]:
# read train data (lazy dask DataFrame without the CSV's index column)
train_data = (
    dd.read_csv(data_folder.joinpath('data_train.csv'), blocksize=blocksize)
    .drop('Unnamed: 0', axis=1)
)

if not bound_date:
    # no cut-off date: keep every row, only order it by purchase time
    train_data = client.submit(train_data.sort_values, by='buy_time', key='train_data_sort')
else:
    # NOTE(review): fromisoformat yields a naive datetime, so .timestamp() interprets it in the
    # machine's local time zone; the cut-off therefore depends on where this runs — confirm it
    # matches the time zone buy_time was recorded in.
    bound_timestamp = dt.datetime.fromisoformat(bound_date).timestamp()
    # boolean mask of rows bought on/after the cut-off
    used_train_mask = client.submit(
        lambda frame, threshold: frame['buy_time'] >= threshold,
        train_data,
        bound_timestamp,
        key='get_train_data_mask',
    )
    # keep the masked rows and sort them by purchase time
    train_data = client.submit(
        select_and_sort, train_data, mask=used_train_mask, by='buy_time', key='train_data_sort'
    )

In [5]:
# read features (tab-separated; drop the CSV index column and the excluded feature columns)
feats_csv = dd.read_csv(data_folder.joinpath('features.csv'), sep='\t', blocksize=blocksize).drop(['Unnamed: 0', *drop_feats], axis=1)
# alternative input — a pre-compressed feature file (used instead of compressing below):
# feats_csv = dd.read_csv(data_folder.joinpath('compressed_features.csv'), blocksize=blocksize).drop('Unnamed: 0', axis=1)

# compress if required
if compress_features:
    # push the feature columns to the cluster separately from the id/buy_time headers
    features = client.scatter(feats_csv.drop(['id', 'buy_time'], axis=1))
    headers = client.submit(feats_csv[['id', 'buy_time']].compute, key='compute_headers')

    # recast to a dask array with computed chunk sizes (lengths=True)
    dask_array = client.submit(dd.DataFrame.to_dask_array, features, lengths=True, key='recast_to_dask_array')

    # fit PCA; wait for it so the memory trim below runs after fitting, not concurrently with it
    pca_model = PCA(n_components)
    fit_pca = client.submit(pca_model.fit, dask_array, key='fit_pca_model')
    wait(fit_pca)
    # trim memory
    client.run(trim_memory)

    # transform features; the fitted model is passed as an argument (not captured in a closure
    # and fetched with .result() inside the task), so the scheduler knows about the dependency
    # and no worker thread blocks waiting for it
    transformed = client.submit(lambda model, arr: model.transform(arr).compute(), fit_pca, dask_array, key='compress_features')
    wait(transformed)

    # concat features: headers reindexed 0..n-1 so they line up positionally with the PCA output
    concat = client.submit(lambda df, arr: pd.concat([df.reset_index(drop=True), pd.DataFrame(arr)], axis=1), headers, transformed, key='concat_features')
    user_feats = concat.result()
    del features, headers, dask_array, fit_pca, transformed, concat
else:
    user_feats = feats_csv



In [6]:
# select required user features: keep only ids that occur in the selected train data
unique_ids = client.submit(lambda df: df['id'].unique().compute(), train_data, key='unique_ids_compute')
# a materialized (pandas) feature frame is scattered once instead of being embedded
# into the task graph of every submit below
if isinstance(user_feats, pd.DataFrame):
    user_feats = client.scatter(user_feats)
used_feats_mask = client.submit(lambda df, ids: df['id'].isin(ids), user_feats, unique_ids, key='get_used_feats_mask')

# extract and sort user features
user_feats = client.submit(select_and_sort, user_feats, mask=used_feats_mask, by='buy_time', key='user_feats_sort')

# compute all data (recast to pandas DataFrame); compressed features are already pandas
train_data_df = client.submit(train_data.result().compute, key='recast_train_data')
user_feats_df = user_feats if compress_features else client.submit(user_feats.result().compute, key='recast_user_feats')
wait([train_data_df, user_feats_df])

# remove no longer needed tasks from cluster;
# the train mask only exists when a bound date was set
if bound_date:
    del used_train_mask
del unique_ids, used_feats_mask, train_data, user_feats

  [              id    buy_time             0        ... s x 5 columns]]
Consider scattering large objects ahead of time
with client.scatter to reduce scheduler burden and 
keep data on workers

    future = client.submit(func, big_data)    # bad

    big_future = client.scatter(big_data)     # good
    future = client.submit(func, big_future)  # good


In [7]:
# merge: for each train row take the feature row of the same id with the nearest buy_time
# NOTE(review): direction='nearest' may match a feature snapshot dated after the purchase — confirm intended
merged = client.submit(pd.merge_asof, train_data_df, user_feats_df, by='id', on='buy_time', direction='nearest', key='data_merge')

# split into data/target directly on the cluster, instead of pulling the merged frame
# to the client (twice) and scattering the parts back
data = client.submit(lambda df: df.drop('target', axis=1), merged, key='split_data')
target = client.submit(lambda df: df['target'], merged, key='split_target')
wait([data, target])

# remove no longer needed tasks from cluster
del merged, train_data_df, user_feats_df

In [8]:
# run the trim_memory helper on every worker of the cluster and show each worker's reply
worker_replies = client.run(trim_memory)
worker_replies

{'tcp://127.0.0.1:33955': 1}

## featuring (feature engineering)

In [9]:
# featuring pipeline: a single ColumnsCorrector step, presumably dropping the `id` column
pipeline = make_pipeline(
    ColumnsCorrector('drop', ['id']),
)

## fit

In [10]:
n_folds = 5

metrics = []
models = []
# fetch data/target from the cluster once, instead of several times per fold inside the loop
data_df = data.result()
target_sr = target.result()
class_weights = dict(enumerate(compute_class_weight('balanced', classes=[0, 1], y=target_sr)))
folds = KFold(n_splits=n_folds, shuffle=True, random_state=29)

for fold, (train_index, valid_index) in enumerate(folds.split(target_sr)):
    # push train/valid dataframes and their targets to the cluster
    train_df = client.scatter(data_df.iloc[train_index])
    valid_df = client.scatter(data_df.iloc[valid_index])
    y_train = client.scatter(target_sr.iloc[train_index])
    y_valid = client.scatter(target_sr.iloc[valid_index])
    # fit the featuring pipeline on this fold's train part (with the matching target) and apply it.
    # Keys carry the fold number: dask treats a submit with an already-known key as the same task,
    # so fixed keys could hand back a previous fold's result if its release was not yet processed.
    featuring = client.submit(pipeline.fit, train_df, y_train, key=f'featuring_fit-{fold}')
    X_train = client.submit(lambda pipe, df: pipe.transform(df), featuring, train_df, key=f'train_featuring_transform-{fold}')
    X_valid = client.submit(lambda pipe, df: pipe.transform(df), featuring, valid_df, key=f'valid_featuring_transform-{fold}')

    # LGBM
    estimator = LGBMClassifier(random_state=17,
                               class_weight=class_weights,
                               n_estimators=100,
                               learning_rate=0.15,
                               max_depth=-1,
                               num_leaves=31,
                               )
    model = client.submit(estimator.fit, X_train, y_train, key=f'model_fit-{fold}')

    # predicts & metrics (macro-averaged F1 on the validation part)
    prediction = client.submit(lambda mdl, df: mdl.predict(df), model, X_valid, key=f'compute_predictions-{fold}')
    score = client.submit(f1_score, y_valid, prediction, average='macro', key=f'scoring-{fold}')
    # append step result
    models.append(model.result())
    metrics.append(score.result())
    # remove no longer needed tasks from cluster
    del model, featuring, train_df, valid_df, X_train, y_train, X_valid, y_valid, prediction, score
    # trim cluster memory
    client.run(trim_memory)

# drop the client-side copies; `data`/`target` futures stay on the cluster
del data_df, target_sr
print(f'Avg. f-score: {sum(metrics) / n_folds}')

Avg. f-score: 0.7777151619934244


Сжатие признаков методом PCA до 3 компонент практически не повлияло на метрику. При этом значения метрики немного различаются из-за разницы в реализациях PCA в `dask` и `pyspark`. Для дальнейшей работы взяты признаки, сжатые с помощью `pyspark`.

Также следует отметить, что при отсечении обучающих данных по указанной дате распределение коммерческих предложений «стабилизируется», а сама дата как признак, в силу особенностей алгоритма дерева решений, не будет влиять на предсказания в диапазоне, превышающем тот, что был на обучении: дерево не экстраполирует, и все значения за пределами обучающего диапазона попадают в один и тот же лист. При этом все тестовые данные (январь 2019 г.) выходят за этот диапазон. В связи с этим следует исключить дату из признаков.

In [11]:
#