In [1]:
import wordbatch
from wordbatch.extractors import WordHash
from wordbatch.models import FM_FTRL
from wordbatch.data_utils import *
import threading
import pandas as pd
from sklearn.metrics import roc_auc_score
import time
import numpy as np
import gc
from contextlib import contextmanager

**End-of-life notification**

This library was designed to bring alternative generators to the NumPy 
infrastructure. It as been successful in advancing the conversation 
for a future implementation of a new random number API in NumPy which 
will allow new algorithms and/or generators. The next step
in this process is to separate the basic (or core RNG) from the 
functions that transform random bits into useful random numbers.
This has been implemented in a successor project  **randomgen** 
available on GitHub

https://github.com/bashtage/randomgen

or PyPi

https://pypi.org/project/randomstate/.

randomgen has a slightly different API, so please see the randomgen documentation

https://bashtage.github.io/randomgen.



In [2]:
count_combinations = [
    ['app'],
    ['ip'],  # 3.03
    ['channel'],
    ['os'],
    ['ip', 'device'],  # 9.88
    ['day', 'hour', 'app'],  # 4.08
    ['app', 'channel'],  # 2.8
    ['ip', 'day', 'hour'],  # 0.52
    ['os', 'device'],  # 0.44
    ['ip', 'os', 'day', 'hour'],  # 0.41
    ['ip', 'device', 'day', 'hour'],  # 0.31
    ['ip', 'app', 'os']  # 0.21
]

countUniq_combinations = [
    # [['app'],'ip'],
    # [['app', 'device', 'os', 'channel'], 'ip'],
    [['ip'], 'channel'],  # 0.9
    [['ip'], 'app'],  # 1.3
    [['ip'], 'os']  # 0.45
]

nextClick_combinations = [
    ['ip', 'os'],
    ['ip', 'app', 'device', 'os']
]

In [3]:
@contextmanager
def timer(name):
    t0 = time.time()
    yield
    print(f'[{name}] done in {time.time() - t0:.0f} s')


import os, psutil


def cpuStats():
    pid = os.getpid()
    py = psutil.Process(pid)
    memoryUse = py.memory_info()[0] / 2. ** 30
    print('memory GB:', memoryUse)


start_time = time.time()

mean_auc = 0


def fit_batch(clf, X, y, w):  clf.partial_fit(X, y, sample_weight=w)


def predict_batch(clf, X):  return clf.predict(X)


def evaluate_batch(clf, X, y, rcount):
    auc = roc_auc_score(y, predict_batch(clf, X))
    global mean_auc
    if mean_auc == 0:
        mean_auc = auc
    else:
        mean_auc = 0.2 * (mean_auc * 4 + auc)
    print(rcount, "ROC AUC:", auc, "Running Mean:", mean_auc)
    return auc


def count_agg(df, group_cols):
    print('grouping features')
    for i, cols in enumerate(group_cols):
        col_name = "_".join(cols) + '_count'
        count = df.groupby(cols).size().reset_index(name=col_name)
        df = df.merge(count, on=cols, how='left')
        del count
        gc.collect()
    return df


def count_uniq(df, group_uniq_cols):
    print('unique features')
    for i, cols in enumerate(group_uniq_cols):
        group_cols, uniq_col = cols[0], cols[1]
        col_name = "_".join(group_cols) + '_uniq_' + uniq_col + '_countUniq'
        tmp = df.groupby(group_cols)[uniq_col].nunique().reset_index(name=col_name)
        df = df.merge(tmp, on=group_cols, how='left')
        del tmp
        gc.collect()
    return df


def next_click(df, group_cols):
    print('next click features')
    df['click_time'] = (df['click_time'].astype(np.int64) // 10 ** 9).astype(np.int32)
    for i, cols in enumerate(group_cols):
        col_name = "_".join(cols) + '_nextClick'
        df[col_name] = (df.groupby(cols).click_time.shift(-1) - df.click_time).astype(np.float32)
        gc.collect()
    return df


def df2csr(wb, df, pick_hours=None):
    df.reset_index(drop=True, inplace=True)
    with timer("Adding counts"):
        df['click_time'] = pd.to_datetime(df['click_time'])
        dt = df['click_time'].dt
        df['day'] = dt.day.astype('uint8')
        df['hour'] = dt.hour.astype('uint8')
        del (dt)

        df = count_agg(df, count_combinations)
        df = count_uniq(df, countUniq_combinations)
        df = next_click(df, nextClick_combinations)

    with timer("Log-binning features"):
        for fea in ['app_count',
                    'ip_count',
                    'channel_count',
                    'os_count',
                    'ip_device_count',
                    'day_hour_app_count',
                    'app_channel_count',
                    'ip_day_hour_count',
                    'os_device_count',
                    'ip_os_day_hour_count',
                    'ip_device_day_hour_count',
                    'ip_app_os_count',
                    'ip_uniq_channel_countUniq',
                    'ip_uniq_app_countUniq',
                    'ip_uniq_os_countUniq',
                    'ip_os_nextClick',
                    'ip_app_device_os_nextClick'
                    ]:
            df[fea] = np.log2(1 + df[fea].values).astype(int)

    with timer("Generating str_array"):
        str_array = ("I" + df['ip'].astype(str) \
                     + " A" + df['app'].astype(str) \
                     + " D" + df['device'].astype(str) \
                     + " O" + df['os'].astype(str) \
                     + " C" + df['channel'].astype(str) \
                     + " WD" + df['day'].astype(str) \
                     + " H" + df['hour'].astype(str) \
                     + " AXC" + df['app'].astype(str) + "_" + df['channel'].astype(str) \
                     + " OXC" + df['os'].astype(str) + "_" + df['channel'].astype(str) \
                     + " AXD" + df['app'].astype(str) + "_" + df['device'].astype(str) \
                     + " IXA" + df['ip'].astype(str) + "_" + df['app'].astype(str) \
                     + " AXO" + df['app'].astype(str) + "_" + df['os'].astype(str) \
                     + "AC" + df['app_count'].astype(str) \
                     + "IC" + df['ip_count'].astype(str) \
                     + "CC" + df['channel_count'].astype(str) \
                     + "OC" + df['os_count'].astype(str) \
                     + "IDC" + df['ip_device_count'].astype(str) \
                     + "DHAC" + df['day_hour_app_count'].astype(str) \
                     + "ACC" + df['app_channel_count'].astype(str) \
                     + "IDHC" + df['ip_day_hour_count'].astype(str) \
                     + "ODC" + df['os_device_count'].astype(str) \
                     + "IODHC" + df['ip_os_day_hour_count'].astype(str) \
                     + "IDDHC" + df['ip_device_day_hour_count'].astype(str) \
                     + "IAOC" + df['ip_app_os_count'].astype(str) \
                     + "IUC" + df['ip_uniq_channel_countUniq'].astype(str) \
                     + "IUA" + df['ip_uniq_app_countUniq'].astype(str) \
                     + "IUO" + df['ip_uniq_os_countUniq'].astype(str) \
                     + "ION" + df['ip_os_nextClick'].astype(str) \
                     + "IADON" + df['ip_app_device_os_nextClick'].astype(str) 
                     ).values
    # cpuStats()
    if 'is_attributed' in df.columns:
        labels = df['is_attributed'].values
        weights = np.multiply([1.0 if x == 1 else 0.2 for x in df['is_attributed'].values],
                              df['hour'].apply(lambda x: 1.0 if x in pick_hours else 0.5))
    else:
        labels = []
        weights = []
    return str_array, labels, weights

In [4]:
class ThreadWithReturnValue(threading.Thread):
    def __init__(self, group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None):
        threading.Thread.__init__(self, group, target, name, args, kwargs, daemon=daemon)
        self._return = None

    def run(self):
        if self._target is not None:
            self._return = self._target(*self._args, **self._kwargs)

    def join(self):
        threading.Thread.join(self)
        return self._return

In [5]:
batchsize = 5000000
D = 2 ** 20

wb = wordbatch.WordBatch(None, extractor=(WordHash, {"ngram_range": (1, 1), "analyzer": "word",
                                                     "lowercase": False, "n_features": D,
                                                     "norm": None, "binary": True})
                         , minibatch_size=batchsize // 80, procs=8, freeze=True, timeout=1800, verbose=0)
clf = FM_FTRL(alpha=0.05, beta=0.1, L1=0.0, L2=0.0, D=D, alpha_fm=0.02, L2_fm=0.0, init_fm=0.01, weight_fm=1.0,
              D_fm=8, e_noise=0.0, iters=2, inv_link="sigmoid", e_clip=1.0, threads=4, use_avx=1, verbose=0)


In [None]:
dtypes = {
    'ip': 'uint32',
    'app': 'uint16',
    'device': 'uint16',
    'os': 'uint16',
    'channel': 'uint16',
    'is_attributed': 'uint8',
}

p = None
rcount = 0

for df_c in pd.read_csv('data/train.csv',
                        engine='c', chunksize=batchsize,
                        skiprows=range(1, 9308569), sep=",", dtype=dtypes):

    rcount += batchsize
    if rcount == 130000000:
        df_c['click_time'] = pd.to_datetime(df_c['click_time'])
        df_c['day'] = df_c['click_time'].dt.day.astype('uint8')
        df_c = df_c[df_c['day'] == 8]
    str_array, labels, weights = df2csr(wb, df_c, pick_hours={4, 5, 10, 13, 14})
    del (df_c)
    if p != None:
        p.join()
        del (X)
    gc.collect()
    X = wb.transform(str_array)
    del (str_array)
    if rcount % (2 * batchsize) == 0:
        if p != None:  p.join()
        p = threading.Thread(target=evaluate_batch, args=(clf, X, labels, rcount))
        p.start()
    print("Training", rcount, time.time() - start_time)
    cpuStats()
    if p != None:  p.join()
    p = threading.Thread(target=fit_batch, args=(clf, X, labels, weights))
    p.start()
    if rcount == 130000000:
        break

grouping features
unique features
next click features
[Adding counts] done in 40 s
[Log-binning features] done in 2 s
[Generating str_array] done in 145 s
Training 5000000 227.62053513526917
memory GB: 1.9478111267089844
grouping features
unique features
next click features
[Adding counts] done in 40 s
[Log-binning features] done in 2 s
[Generating str_array] done in 145 s
Training 10000000 525.7184975147247
memory GB: 2.3234786987304688
10000000 ROC AUC: 0.9745651774599268 Running Mean: 0.9745651774599268
grouping features
unique features
next click features
[Adding counts] done in 41 s
[Log-binning features] done in 2 s
[Generating str_array] done in 148 s
Training 15000000 851.9294579029083
memory GB: 2.624431610107422
grouping features
unique features
next click features
[Adding counts] done in 40 s
[Log-binning features] done in 2 s
[Generating str_array] done in 144 s
Training 20000000 1145.4692740440369
memory GB: 2.211658477783203
20000000 ROC AUC: 0.9683387927007022 Running Me

  stride //= shape[i]


unique features
next click features
[Adding counts] done in 1 s
[Log-binning features] done in 0 s
[Generating str_array] done in 0 s


IndexError: list index out of range

In [None]:
if p != None:
    p.join()

del (X)
p = None
click_ids = []
test_preds = []
rcount = 0
for df_c in pd.read_csv('data/test.csv', engine='c', chunksize=batchsize,
                        sep=",", dtype=dtypes):
    rcount += batchsize
    if rcount % (10 * batchsize) == 0:
        print(rcount)
    str_array, labels, weights = df2csr(wb, df_c)
    click_ids += df_c['click_id'].tolist()
    del (df_c)
    if p != None:
        test_preds += list(p.join())
        del (X)
    gc.collect()
    X = wb.transform(str_array)
    del (str_array)
    p = ThreadWithReturnValue(target=predict_batch, args=(clf, X))
    p.start()
if p != None:  test_preds += list(p.join())

df_sub = pd.DataFrame({"click_id": click_ids, 'is_attributed': test_preds})
df_sub.to_csv("wordbatch_fm_ftrl.csv", index=False)