In [1]:
%load_ext autoreload
%autoreload 2
from glob import glob
import gc
import os
import sys
import datetime
import numpy as np
import pandas as pd
from tqdm import tqdm
from func.utils import get_numeric_features, get_categorical_features, read_pkl_gzip, to_pkl_gzip, parallel_load_data, get_filename
from func.time_utils import date_add_days
from func.ml_utils import save_feature, get_cnt_feature, get_dummie_feature, get_label_feature
from func.parallel_utils import get_parallel_arg_list
from joblib import delayed, Parallel

In [2]:
# Column-name constants shared by every cell below.
COLUMN_ID = 'TransactionID'
COLUMN_DT = 'TransactionDT'
COLUMN_TARGET = 'isFraud'
COLUMNS_IGNORE = [COLUMN_ID, COLUMN_DT, COLUMN_TARGET, 'ProductCD']

# A feature file is loaded when its path contains at least one of these substrings.
# The match is a plain substring test on the whole path, so 'C' and 'D' are very broad.
path_keywords = (COLUMN_DT, COLUMN_ID, 'time_zone', 'hour', 'C', 'D', 'Product')

train_paths = [
    path for path in glob('../feature/eda_base/*_train.gz')
    if any(word in path for word in path_keywords)
]
test_paths = [
    path for path in glob('../feature/eda_base/*_test.gz')
    if any(word in path for word in path_keywords)
]

# Train rows first, then test rows; later cells split the frame again by len(df_train).
df_train = parallel_load_data(train_paths)
df_test = parallel_load_data(test_paths)
data = pd.concat([df_train, df_test], axis=0, ignore_index=True)
if COLUMN_ID in data.columns:
    data = data.set_index(COLUMN_ID)

# Base tables, indexed by transaction id.
base_train = read_pkl_gzip('../input/base_train.gz').set_index(COLUMN_ID)
base_test = read_pkl_gzip('../input/base_test.gz').set_index(COLUMN_ID)
base = pd.concat([base_train, base_test])

# Downcast the numeric columns whose name contains 'C' or 'D' to float32.
cols_num = [
    col for col in get_numeric_features(data, COLUMNS_IGNORE)
    if 'C' in col or 'D' in col
]
data[cols_num] = data[cols_num].astype('float32')

# Predicted user ids from four grouping patterns, each keyed by transaction id.
df_user_id_ca = pd.read_csv('../output/same_user_pattern/0903__same_user_id__card_addr.csv').set_index(COLUMN_ID)
df_user_id_cap = pd.read_csv('../output/same_user_pattern/0903__same_user_id__card_addr_pemail.csv').set_index(COLUMN_ID)
df_user_id_capm = pd.read_csv('../output/same_user_pattern/0902__same_user_id__card_addr_pemail_M.csv').set_index(COLUMN_ID)
df_user_id_bear = pd.read_csv('../output/same_user_pattern/20190901_user_ids_share.csv').set_index(COLUMN_ID)

# NOTE(review): these assignments align on the index, so they presumably rely on
# `data` having been re-indexed by TransactionID above -- confirm that branch is taken.
for feature_name, df_user_id in [
    ('user_id_card_addr', df_user_id_ca),
    ('user_id_card_addr_pemail', df_user_id_cap),
    ('user_id_card_addr_pemail_M', df_user_id_capm),
    ('user_id_bear', df_user_id_bear),
]:
    data[feature_name] = df_user_id['predicted_user_id']

Process ForkPoolWorker-4:
Process ForkPoolWorker-114:
Process ForkPoolWorker-38:
Process ForkPoolWorker-81:
Process ForkPoolWorker-112:
Process ForkPoolWorker-64:
Process ForkPoolWorker-50:
Process ForkPoolWorker-62:
Process ForkPoolWorker-58:
Process ForkPoolWorker-117:
Process ForkPoolWorker-126:
Process ForkPoolWorker-115:
Process ForkPoolWorker-106:
Process ForkPoolWorker-111:
Process ForkPoolWorker-82:
Process ForkPoolWorker-84:
Process ForkPoolWorker-74:
Process ForkPoolWorker-51:
Process ForkPoolWorker-52:
Process ForkPoolWorker-42:
Process ForkPoolWorker-69:
Process ForkPoolWorker-39:
Process ForkPoolWorker-116:
Process ForkPoolWorker-127:
Process ForkPoolWorker-95:
Process ForkPoolWorker-32:
Process ForkPoolWorker-122:
Process ForkPoolWorker-67:
Process ForkPoolWorker-17:
Process ForkPoolWorker-33:
Process ForkPoolWorker-7:
Process ForkPoolWorker-30:
Process ForkPoolWorker-89:
Process ForkPoolWorker-53:
Process ForkPoolWorker-124:
Process ForkPoolWorker-73:
Process ForkPoolWor

In [3]:
START_DATE = '2017-12-01'
startdate = datetime.datetime.strptime(START_DATE, '%Y-%m-%d')

# TransactionDT is used as a number of seconds counted from START_DATE; the result is
# then shifted back by 14400 s (4 h). Computed column-wise instead of with a per-row
# lambda, so a missing TransactionDT becomes NaT instead of raising.
data['datetime'] = (
    pd.to_timedelta(data['TransactionDT'], unit='s')
    + pd.Timestamp(startdate)
    - pd.Timedelta(seconds=14400)
)
# Missing timestamps get a 2020-01-01 sentinel. The result is assigned back (no chained
# `inplace=True` on a column selection) and the fill value is a Timestamp, so the
# column stays datetime-typed for the `.dt.date` below.
data['datetime'] = data['datetime'].fillna(pd.Timestamp(2020, 1, 1))
# Calendar date as `datetime.date` objects (same values the old `.map(x.date())` gave).
data['date'] = data['datetime'].dt.date

In [4]:
#========================================================================
# C features split per ProductCD value
#========================================================================
# Take only the raw C columns. The derived `<C>__ProductCD-<code>` columns created
# below also start with 'C', so without the second condition a re-run of this cell
# would build `<C>__ProductCD-<code>__ProductCD-<code>` columns on top of them.
cols_C = [col for col in data.columns
          if col.startswith('C') and '__ProductCD-' not in col]
cols_pcd = data['ProductCD'].unique()

# One boolean row mask per product code, built once and reused for every C column.
pcd_masks = [(pcd, data['ProductCD'].isin([pcd])) for pcd in cols_pcd]

for col in tqdm(cols_C):
    for pcd, is_pcd in pcd_masks:
        feature_name = f'{col}__ProductCD-{pcd}'
        # -1 everywhere, then the original value on the rows of this product code.
        data[feature_name] = -1
        data.loc[is_pcd, feature_name] = data.loc[is_pcd, col]

100%|██████████| 14/14 [00:10<00:00,  1.05it/s]


In [5]:
# Feature list for the aggregation below: every column whose name contains 'C',
# except the ignored id / time / target / ProductCD columns. This covers the raw
# C columns as well as the `<C>__ProductCD-<code>` splits built in the previous cell.
cols_C = sorted(
    name for name in data.columns
    if 'C' in name and name not in COLUMNS_IGNORE
)
len(cols_C)

84

In [6]:
#========================================================================
# FE Aggregation User ID & TimeSeries Date
#========================================================================

def parallel_agg(df, base_key, base_date, n_day, feature):
    """Return the mean of ``feature`` per ``base_key`` group as a one-column frame.

    NOTE: a second ``def parallel_agg`` later in this same cell rebinds the name,
    so after the cell has run this five-argument version is no longer reachable.

    Args:
        df: frame holding at least the ``base_key`` and ``feature`` columns.
        base_key: name of the grouping column.
        base_date: not used by this function.
        n_day: window length; only used to build the output column name.
        feature: name of the column to average.

    Returns:
        DataFrame indexed by ``base_key`` with the single column
        ``f'{base_key}_day{n_day}_{feature}_mean'``.
    """
    mean_col = f'{base_key}_day{n_day}_{feature}_mean'
    # Passing a {new_name: func} dict to SeriesGroupBy.agg (dict renaming) raises
    # SpecificationError on pandas >= 1.0; mean() + to_frame() builds the same frame
    # on old and new pandas alike.
    return df.groupby(base_key)[feature].mean().to_frame(mean_col)


def parallel_agg(df, base_key, n_day, feature):
    """Trailing-window mean of ``feature`` per ``base_key``, one row per (key, date).

    For every ``end_date`` in the module-level ``list_end_date``:
      * select the rows whose ``date`` lies between
        ``date_add_days(end_date, -n_day)`` and ``end_date`` (both bounds inclusive),
      * average ``feature`` per ``base_key`` over those rows,
      * keep only the keys that have at least one row dated exactly ``end_date``.

    Args:
        df: frame with the columns ``base_key``, ``'date'`` and ``feature``.
        base_key: name of the grouping (user id) column.
        n_day: look-back length passed to ``date_add_days``.
        feature: name of the column to average.

    Returns:
        DataFrame indexed by ``[base_key, 'date']`` with the single column
        ``f'{base_key}_day{n_day}_{feature}_mean'``.
    """
    mean_col = f'{base_key}_day{n_day}_{feature}_mean'
    list_term_df = []
    for end_date in tqdm(list_end_date):
        start_date = date_add_days(end_date, n_day*-1)
        # Keys active on end_date; only these receive a value for that date.
        tmp_user = df[df['date']==end_date][[base_key]].drop_duplicates()
        tmp = df[(start_date <= df.date) & (df.date <= end_date)]

        # Dict renaming in SeriesGroupBy.agg raises SpecificationError on
        # pandas >= 1.0; mean() + rename() + reset_index() yields the same
        # [base_key, mean_col] frame on every pandas version.
        result = tmp.groupby(base_key)[feature].mean().rename(mean_col).reset_index()

        tmp_user = tmp_user.merge(result, on=base_key, how='inner')
        tmp_user['date'] = end_date
        list_term_df.append(tmp_user)
    df_agg = pd.concat(list_term_df, axis=0)
    df_agg.set_index([base_key, 'date'], inplace=True)
    return df_agg
    

# Per-user aggregation over trailing date windows.
dir_save = 'valid'
df = data
list_base_key = [name for name in df.columns if 'user_id' in name]
# The earliest date is dropped from the list of window end dates.
list_end_date = sorted(df['date'].unique())[1:]
list_base_date = list_end_date
# Window lengths in days (the 1-day window is not used).
list_n_day = [3, 5, 7, 10, 14, 21, 28, 31, 62, 93, 124, 180, 270, 360]

for base_key in list_base_key:
    merge_keys = [base_key, 'date']
    # NOTE(review): the first entry of cols_C is skipped -- confirm this is intended.
    for feature in cols_C[1:]:

        # NOTE: base_train / base_test are rebound here on every iteration; they no
        # longer refer to the base tables read from ../input earlier in the notebook.
        frame = df[[base_key, 'date', feature]]
        base_train = frame.iloc[:len(df_train)]
        base_test = frame.iloc[len(df_train):]

        # One worker call per window length; each returns a frame indexed by (key, date).
        list_p = Parallel(60)(
            delayed(parallel_agg)(frame, base_key, n_day, feature)
            for n_day in list_n_day
        )

        df_agg = pd.concat(list_p, axis=1).reset_index()
        df_agg['date'] = df_agg['date'].map(lambda x: x.date())

        base_train_agg = base_train.merge(df_agg, how='left', on=merge_keys)
        base_test_agg = base_test.merge(df_agg, how='left', on=merge_keys)

        cols_agg = [name for name in base_train_agg.columns if f'{base_key}_day' in name]

        # Compare each window mean with the row's own value: ratio (denominator
        # shifted by +1) and plain difference, for train and test alike.
        for agg_frame in (base_train_agg, base_test_agg):
            for col in cols_agg:
                agg_frame[f"{col}_org_ratio"] = agg_frame[col] / (agg_frame[feature] + 1)
                agg_frame[f"{col}_org_diff"] = agg_frame[col] - agg_frame[feature]

        # Only the ratio / diff columns are written out.
        cols_save = [
            name for name in base_train_agg.columns
            if 'org_ratio' in name or 'org_diff' in name
        ]

        save_feature(base_train_agg[cols_save], '503', dir_save, is_train=True, auto_type=True, list_ignore=COLUMNS_IGNORE)
        save_feature(base_test_agg[cols_save], '503', dir_save, is_train=False, auto_type=True, list_ignore=COLUMNS_IGNORE)

(590540,) | user_id_card_addr_day3_C10_mean_org_ratio
(590540,) | user_id_card_addr_day3_C10_mean_org_diff
(590540,) | user_id_card_addr_day5_C10_mean_org_ratio
(590540,) | user_id_card_addr_day5_C10_mean_org_diff
(590540,) | user_id_card_addr_day7_C10_mean_org_ratio
(590540,) | user_id_card_addr_day7_C10_mean_org_diff
(590540,) | user_id_card_addr_day10_C10_mean_org_ratio
(590540,) | user_id_card_addr_day10_C10_mean_org_diff
(590540,) | user_id_card_addr_day14_C10_mean_org_ratio
(590540,) | user_id_card_addr_day14_C10_mean_org_diff
(590540,) | user_id_card_addr_day21_C10_mean_org_ratio
(590540,) | user_id_card_addr_day21_C10_mean_org_diff
(590540,) | user_id_card_addr_day28_C10_mean_org_ratio
(590540,) | user_id_card_addr_day28_C10_mean_org_diff
(590540,) | user_id_card_addr_day31_C10_mean_org_ratio
(590540,) | user_id_card_addr_day31_C10_mean_org_diff
(590540,) | user_id_card_addr_day62_C10_mean_org_ratio
(590540,) | user_id_card_addr_day62_C10_mean_org_diff
(590540,) | user_id_card_

IOPub message rate exceeded.
The notebook server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--NotebookApp.iopub_msg_rate_limit`.

Current values:
NotebookApp.iopub_msg_rate_limit=1000.0 (msgs/sec)
NotebookApp.rate_limit_window=3.0 (secs)



(590540,) | user_id_card_addr_pemail_day3_C9__ProductCD-H_mean_org_ratio
(590540,) | user_id_card_addr_pemail_day3_C9__ProductCD-H_mean_org_diff
(590540,) | user_id_card_addr_pemail_day5_C9__ProductCD-H_mean_org_ratio
(590540,) | user_id_card_addr_pemail_day5_C9__ProductCD-H_mean_org_diff
(590540,) | user_id_card_addr_pemail_day7_C9__ProductCD-H_mean_org_ratio
(590540,) | user_id_card_addr_pemail_day7_C9__ProductCD-H_mean_org_diff
(590540,) | user_id_card_addr_pemail_day10_C9__ProductCD-H_mean_org_ratio
(590540,) | user_id_card_addr_pemail_day10_C9__ProductCD-H_mean_org_diff
(590540,) | user_id_card_addr_pemail_day14_C9__ProductCD-H_mean_org_ratio
(590540,) | user_id_card_addr_pemail_day14_C9__ProductCD-H_mean_org_diff
(590540,) | user_id_card_addr_pemail_day21_C9__ProductCD-H_mean_org_ratio
(590540,) | user_id_card_addr_pemail_day21_C9__ProductCD-H_mean_org_diff
(590540,) | user_id_card_addr_pemail_day28_C9__ProductCD-H_mean_org_ratio
(590540,) | user_id_card_addr_pemail_day28_C9__Pro

KeyboardInterrupt
  File "/home/yryrgogo/anaconda3/lib/python3.7/multiprocessing/synchronize.py", line 95, in __enter__
    return self._semlock.__enter__()
  File "/home/yryrgogo/anaconda3/lib/python3.7/multiprocessing/queues.py", line 351, in get
    with self._rlock:
  File "/home/yryrgogo/anaconda3/lib/python3.7/multiprocessing/pool.py", line 110, in worker
    task = get()
  File "/home/yryrgogo/anaconda3/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/home/yryrgogo/anaconda3/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
Traceback (most recent call last):
Process ForkPoolWorker-3044:


KeyboardInterrupt: 

In [9]:
#========================================================================
# FE Aggregation User ID & TimeSeries Hour
#========================================================================
# Display the user-id key columns collected by the date-aggregation cell above.
list_base_key

['user_id_card_addr',
 'user_id_card_addr_pemail',
 'user_id_card_addr_pemail_M',
 'user_id_bear']

In [13]:
# for df, agg_cols in zip(arg_df_list, arg_list):
def parallel_agg(df, agg_cols):
    """Save the per-key (max - min) range of each column in ``agg_cols`` as features.

    For every ``col`` in ``agg_cols`` and every ``key`` in the module-level
    ``list_key``, the max and min of ``col`` per ``key`` group are computed, their
    difference is joined back onto ``base_train`` / ``base_test`` and the result is
    written with ``save_feature`` under prefix ``'502'``.

    NOTE: this rebinds ``parallel_agg`` from the date-window cell earlier in the
    notebook and takes different arguments.

    Relies on names defined outside this function: ``list_key``, ``base_train``,
    ``base_test``, ``get_new_columns``, ``save_feature``, ``dir_save`` and
    ``COLUMNS_IGNORE``.

    Args:
        df: frame holding the key columns and the columns listed in ``agg_cols``.
        agg_cols: names of the columns to aggregate.

    Returns:
        ``(error_keys, error_cols)``: two parallel lists naming each (key, column)
        pair that was skipped because ``df[key]`` was not a single column.
    """
    error_keys = []
    error_cols = []
    for col in agg_cols:
        aggs = {col: ['max', 'min']}

        for key in list_key:

            key_values = df[key]
            if not isinstance(key_values, pd.Series):
                # df[key] is not one column (it has no .to_frame()), so it cannot be
                # used as a join / groupby key. Record the pair and skip it instead
                # of falling through into the aggregation below.
                error_keys.append(key)
                error_cols.append(col)
                continue

            tmp_base_train = base_train.join(key_values)
            tmp_base_test = base_test.join(key_values)

            df_agg = df.groupby(key).agg(aggs)
            # presumably flattens the (column, func) labels into `<key>_..._max` /
            # `..._min` style names -- get_new_columns is defined elsewhere; verify.
            df_agg.columns = get_new_columns(key+'_', aggs)
            max_col = [name for name in df_agg.columns if name.count('_max')][0]
            min_col = [name for name in df_agg.columns if name.count('_min')][0]
            df_agg[max_col+'_min_diff'] = df_agg[max_col] - df_agg[min_col]

            # Only the range is kept as a feature; the raw max / min are dropped.
            df_agg.drop([max_col, min_col], axis=1, inplace=True)

            df_agg.reset_index(inplace=True)

            base_train_agg = tmp_base_train.merge(df_agg, on=key, how='left')
            base_test_agg = tmp_base_test.merge(df_agg, on=key, how='left')

            del df_agg, tmp_base_train, tmp_base_test
            gc.collect()

            print(base_train_agg.shape, base_test_agg.shape)
            # Everything except the ignored columns, the key itself and 'D1' is saved.
            cols_feature = [
                name for name in base_train_agg.columns
                if name not in COLUMNS_IGNORE and name != key and name != 'D1']
            save_feature(base_train_agg[cols_feature], '502', dir_save, is_train=True, auto_type=True, list_ignore=COLUMNS_IGNORE)
            save_feature(base_test_agg[cols_feature],  '502', dir_save, is_train=False, auto_type=True, list_ignore=COLUMNS_IGNORE)

            del base_train_agg, base_test_agg
            gc.collect()

    return error_keys, error_cols

In [14]:
# One parallel_agg call per (frame, column-list) pair, spread over n_jobs workers;
# `err` collects whatever each call returns, in submission order.
err = Parallel(n_jobs)(
    delayed(parallel_agg)(frame, cols)
    for frame, cols in zip(arg_df_list, arg_list)
)

In [None]:
# Preview the first rows of df_feat.
# NOTE(review): df_feat is not assigned in any cell visible here -- confirm it is
# defined before this cell when running on a fresh kernel.
df_feat.head()