In [1]:
import pandas as pd
import numpy as np
import datetime
import imp
import scripts as scr
import os
import re
import multiprocessing
import numpy as np
import pandas as pd
from pytictoc import TicToc
from IPython.display import display
import itertools
import matplotlib.pyplot as plt

In [2]:
def transform_cols (df, dict_col_types = None):
    # Расширяйте для необходимых столбцов и их явной типизации
    if dict_col_types is None:
        dict_col_types = {
        'amount_original':(float, 0.0),
        'channel_indicator_desc':(str, u'null'),
        'event_description':(str, u'null'),
        'short_date':(int, 0),
        'cdf_s_20':(str, u'null'),
        'cdf_s_126':(str, u'null'),
        'cdf_s_127':(int, 30),
        'cdf_s_129':(int, 30),
        'cdf_s_138':(str, u'null'),
        'cdf_s_130':(int, 30),
        'cdf_s_133':(int, 30),
        'cdf_s_134':(int, 30),
        'cdf_s_135':(int, 30),
        'cdf_s_140':(float, 0.0),
        'cdf_s_218':(str, u'null'),
        'cdf_s_294':(int, 0),
        'cdf_s_299':(str, u'null'),
        'data_s_65':(int, 0),
        'data_i_120':(int, 0),
        'data_i_154':(float, -150)
        }
                
    if df.shape[0] == 0:
        return df
    
    df.replace(u'null', np.nan, inplace=True)

    for i in dict_col_types:
        if i in df.columns:
            change_type, fill_value = dict_col_types[i]
            df[i] = df[i].fillna(fill_value).astype(change_type)
    
    return df

def calc_base_features(data):
    
    def cust_mark_to_class(custom_mark):
        """
        Преобразует входящее значение CUSTOM_MARK в класс
        return:
            1 - фрод
            0 - легитимная
            -1 - неизвестно
        """
        ret = -1
        if custom_mark in ['F','S']:
            ret = 1
        elif custom_mark in ['A','G', np.NaN]:
            ret = 0

        return ret
    
    feat_matrix = pd.DataFrame()
    
    if data.shape[0] == 0:
        return feat_matrix
    

    feat_matrix = pd.DataFrame()

    if data.shape[0] == 0:
        raise 'shape is 0'

    # заполняем ряд пропусков
    data.cdf_s_140 = data.cdf_s_140.fillna(0).astype(float) / 1000
    data.data_i_120.fillna(1, inplace=True)

    feat_matrix['amount'] = data['amount_original']
    same_columns = ['event_id', 'user_id', 'event_time', 'short_date', 
                    'cdf_s_127', 'cdf_s_129', 'cdf_s_130', 'cdf_s_133', 'cdf_s_134', 'cdf_s_135', 'data_i_120']
    for column in same_columns:
        feat_matrix.loc[:, column] = data[column]
    feat_matrix['label'] = [cust_mark_to_class(x) for x in data['custom_mark']]
    # ----------
    # дополнительные фичи
    # по user_id
    user_id_what_suffix = pd.Series([re.sub('[0-9]', '', x) for x in data['user_id']])
    for suffix in ['MBK', 'VSP', 'CRM', 'IVR', 'other']:
        if suffix == 'other':
            result = ~user_id_what_suffix.isin(['MBK', 'VSP', 'CRM', 'IVR', '']).astype(int)
        else:
            result = (user_id_what_suffix == suffix).astype(int)
        feat_matrix[f'user_id_{suffix}'] = result
    feat_matrix['user_id_digit_only'] = feat_matrix['user_id'].apply(lambda x: x.isdigit()).astype(int)
    # по каналу ohe
    for suffix in ['MOBILEAPI', 'WEBAPI', 'ATMAPI', 'MBK', 'other']:
        if suffix == 'other':
            result = ~data.channel_indicator_desc.isin(['MOBILEAPI', 'WEBAPI', 'ATMAPI', 'MBK']).astype(int)
        else:
            result = (data.channel_indicator_desc == suffix).astype(int)
        feat_matrix[f'channel_indicator_desc_is_{suffix}'] = result
    # время операции
    feat_matrix['event_hour'] = [x.hour for x in feat_matrix['event_time']]
    feat_matrix['event_hour_night'] = [1 if ((hour >= 23) or (hour <= 7)) else 0 for hour in feat_matrix['event_hour']]
    feat_matrix['event_hour_workhour'] = [1 if ((hour >= 8) or (hour <= 17)) else 0 for hour in feat_matrix['event_hour']]
    feat_matrix['event_hour_evening'] = [1 if ((hour >= 18) or (hour <= 22)) else 0 for hour in feat_matrix['event_hour']]
        
    feat_matrix['event_day'] = [x.dayofweek for x in feat_matrix['event_time']]
    feat_matrix['event_day_is_weekend'] = [1 if day >= 6 else 0 for day in feat_matrix['event_day']]
    # ----------
    # кумулятивная сумма операций за сутки в каналах web и МП, умножил на 1e15 из-за того, что там сильно маленькие числа
    feat_matrix['cumulative_sum_total'] = data.cdf_s_140 * 1e15


    feat_matrix['client_age'] = [x.days / 365.25 for x in (data.event_time - data.cdf_s_19)]
    feat_matrix['client_age_isnull'] = feat_matrix['client_age'].isnull().astype(int)

    #заменил на -1
    feat_matrix['cat_new_ip'] = [1 if x == 'ДА' else 0 if x == 'НЕТ' else -1 for x in data.cdf_s_126]
    feat_matrix['cat_new_prov'] =  [1 if x == 'ДА' else 0 if x == 'НЕТ' else -1 for x in data.cdf_s_138]
    feat_matrix['channel_op'] =  [0 if x == 'MOBILE' else 1 if x == 'WEB' else -1 for x in data.channel_indicator_desc]
    feat_matrix['op_type'] = [0 if x == 'Перевод частному лицу' else
                              1 if x == 'Оплата услуг' else
                              2 if x == 'Перевод между своими счетами и картами' else
                              3 for x in data.event_description]

    # бинарный флаг определяющий наличие возраста получателя
    # (полезен для линейных моделей,  менее для деревьев с учетом следующего признака)
    feat_matrix['transfer_recip_age'] = [1 if x == 0 else 0 for x in data.cdf_s_294]
    # разница возрастов получателей и отправителей, если отсутствует/неприменимо, то padding 500
    feat_matrix['transfer_age_diff'] = feat_matrix.client_age - [int(x) if x != 0 else 1000 for x in data.cdf_s_294]
    feat_matrix.loc[feat_matrix['transfer_age_diff'] < 0, 'transfer_age_diff'] = -999
    # перевод родственнику
    feat_matrix['transfer_for_relative'] = [1 if x == 'ДА' else 0 for x in data.cdf_s_218] 
    # сила связи отправителя и получателя
    feat_matrix['transfer_know_recip_squared'] = [ x if x is not None else 0 for x in data.data_s_65]
    # 'data_i_154' - ряд признаков, которые описывают устройство, с которого проводятся операции
    feat_matrix['data_i_154'] = [ x if x is not None else -150 for x in data.data_i_154]
    # 'cdf_s_124'- дата выдачи карты получателя
    feat_matrix['know_recip_card_age'] = ~data.cdf_s_124.isnull().astype(int)
    # в cdf_s_124 подмешана дата рождения, поэтому и max
    feat_matrix['recip_card_age'] = [max(x.days, 1000) if type(x) is not pd.tslib.NaTType else 1000 
                                     for x in (data.event_time - data.cdf_s_124)]

    feat_matrix['one_region'] = (data.cdf_s_20 == data.cdf_s_299).astype(int) # сравнение регионов

    # там с провайдером какая-то фигня до этого была (использовалась левая переменная)
    feat_matrix['ip_isp'] = data['ip_isp'].fillna(-1000000).astype(int)
    
    #ADD NEW FEATURES
    # natural log of amount
    feat_matrix['log_amount'] = np.log(feat_matrix['amount'] + 1)

    # len and code of region name
    feat_matrix['client_region_len'] = data.cdf_s_20.apply(lambda x: len(str(x)))
    feat_matrix['client_region'] = np.array([x if x.isdigit() else -1 for x in data.cdf_s_20], dtype=float)
    
    # 'cdf_s_136','cdf_s_137','cdf_s_140'- кумулятивные суммы операций за сутки в web, МП, web + МП 
    feat_matrix['amnt2chnls'] = (data["amount_original"].fillna(0) / \
                                 (data["cdf_s_136"].fillna(0).astype(float) + \
                                  data["cdf_s_136"].fillna(0).astype(float) + \
                                  data["cdf_s_140"].fillna(0).astype(float) + 1))

    # поставил order_cols временно, чтобы сохранить такой же порядок как в оригинальном
    order_cols = ['event_id', 'user_id', 'label', 'event_time', 'short_date', 'amount',
                  'client_age', 'cat_new_ip', 'cat_new_prov', 'channel_op', 'op_type',
                  'cumulative_sum_total', 'data_i_120', 'data_i_154', 
                  'cdf_s_127', 'cdf_s_129','cdf_s_130', 'cdf_s_133', 'cdf_s_134', 'cdf_s_135',
                  'know_recip_card_age', 'recip_card_age', 'one_region', 
                  'log_amount', 'ip_isp', 'amnt2chnls']
    order_cols += [x for x in feat_matrix.columns if not x in order_cols] # все остальные
    return feat_matrix[order_cols]


def get_data(chunk_name):
    
    def load_data(chunk_fnames, fields=None, query=None, sample='train', dict_col_types=None):
        df = pd.DataFrame({})
        if isinstance(chunk_fnames, str):
            chunk_fnames = [chunk_fnames]

        for filename in chunk_fnames:
            chunk_df = pd.read_feather(filename)

            if fields is None:
                fields = chunk_df.columns.tolist()

            transormed = transform_cols(chunk_df)

            if query:
                transormed = transormed.query(query)

            df = pd.concat([df, transormed[fields]], ignore_index=True)
        return df
    

    def features_handler(chunk_names, calc_feat, query=None, chunk_size=5000):
        res_df = pd.DataFrame()

        for chunk_name in chunk_names:
            loaded_data = load_data(chunk_name, query=query, dict_col_types=None)
            feat_chunk = calc_feat(loaded_data)
            res_df = pd.concat([res_df, feat_chunk], ignore_index=True)

        return res_df


    feat_test = features_handler(
        query = query,
        chunk_names=[chunk_name],
        calc_feat = calc_base_features)
    return feat_test

In [3]:
FIRST_N = 100
N_THREADS = 16
train_folder = '../../data/raw_splits/train/'
train_files = sorted([x for x in os.listdir(train_folder) if not '.pkl' in x], key = lambda x: int(re.sub('[^0-9]', '', x)))
train_files = [os.path.join(train_folder, x) for x in train_files]
print(f'Length of train files is {len(train_files)}')
train_files[:5]

Length of train files is 51


['../../data/raw_splits/train/chunk_0.fth',
 '../../data/raw_splits/train/chunk_1.fth',
 '../../data/raw_splits/train/chunk_2.fth',
 '../../data/raw_splits/train/chunk_3.fth',
 '../../data/raw_splits/train/chunk_4.fth']

---

In [4]:
include_channels = ['Перевод частному лицу', 'Оплата услуг', 'Перевод между своими счетами и картами']

start_date = 20171029
end_date = 20171128

query = f"event_description in ({str(include_channels)[1:-1]})" + \
        f" and short_date >= {str(start_date)}" + \
        f" and short_date <= {str(end_date)}"
query

"event_description in ('Перевод частному лицу', 'Оплата услуг', 'Перевод между своими счетами и картами') and short_date >= 20171029 and short_date <= 20171128"

In [5]:
%%time
# жрет вплоть более 70 гигов оперативки, осторожнее
with multiprocessing.Pool(processes=min(N_THREADS, FIRST_N)) as pool:
    results = pool.map(get_data, train_files)

You can access NaTType as type(pandas.NaT)
You can access NaTType as type(pandas.NaT)
You can access NaTType as type(pandas.NaT)
You can access NaTType as type(pandas.NaT)
You can access NaTType as type(pandas.NaT)
You can access NaTType as type(pandas.NaT)
You can access NaTType as type(pandas.NaT)
You can access NaTType as type(pandas.NaT)
You can access NaTType as type(pandas.NaT)
You can access NaTType as type(pandas.NaT)
You can access NaTType as type(pandas.NaT)
You can access NaTType as type(pandas.NaT)
You can access NaTType as type(pandas.NaT)
You can access NaTType as type(pandas.NaT)
You can access NaTType as type(pandas.NaT)
You can access NaTType as type(pandas.NaT)


CPU times: user 2.9 s, sys: 4.55 s, total: 7.45 s
Wall time: 4min 15s


In [6]:
# total_df = pd.DataFrame()
# for i, df in tqdm_notebook(enumerate(results), total = len(results)):
total_df = pd.concat(total_df).reset_index().drop('index', axis = 1)

In [7]:
total_df['label'].value_counts()

 0    3802643
 1      14012
-1       3664
Name: label, dtype: int64

In [8]:
print(total_df.shape)
total_df.columns

(3820319, 50)


Index(['event_id', 'user_id', 'label', 'event_time', 'short_date', 'amount',
       'client_age', 'cat_new_ip', 'cat_new_prov', 'channel_op', 'op_type',
       'cumulative_sum_total', 'data_i_120', 'data_i_154', 'cdf_s_127',
       'cdf_s_129', 'cdf_s_130', 'cdf_s_133', 'cdf_s_134', 'cdf_s_135',
       'know_recip_card_age', 'recip_card_age', 'one_region', 'log_amount',
       'ip_isp', 'amnt2chnls', 'user_id_MBK', 'user_id_VSP', 'user_id_CRM',
       'user_id_IVR', 'user_id_other', 'user_id_digit_only',
       'channel_indicator_desc_is_MOBILEAPI',
       'channel_indicator_desc_is_WEBAPI', 'channel_indicator_desc_is_ATMAPI',
       'channel_indicator_desc_is_MBK', 'channel_indicator_desc_is_other',
       'event_hour', 'event_hour_night', 'event_hour_workhour',
       'event_hour_evening', 'event_day', 'event_day_is_weekend',
       'client_age_isnull', 'transfer_recip_age', 'transfer_age_diff',
       'transfer_for_relative', 'transfer_know_recip_squared',
       'client_region_len'

In [9]:
!mkdir -p '../../data/prepaired_dataset/'

In [10]:
total_df.to_feather('../../data/prepaired_dataset/train_v2.fth')