In [40]:
# Start a local dask.distributed cluster and connect a client to it.
# NOTE(review): this cell has execution count 40 but sits above `In [1]`, so it
# was run out of order; `progress` is imported but not used in the visible cells.
from dask.distributed import Client, progress
# 8 workers x 2 threads each, 4GB memory limit per worker
# (the summary below reports 16 cores / 32 GB in total).
client = Client(n_workers=8, threads_per_worker=2, memory_limit='4GB')
# Bare expression so the notebook renders the scheduler/dashboard summary.
client

0,1
Client  Scheduler: tcp://127.0.0.1:40627  Dashboard: http://127.0.0.1:41213,Cluster  Workers: 8  Cores: 16  Memory: 32.00 GB


In [1]:
import pandas as pd
import numpy as np

import matplotlib.pyplot as plt
import seaborn as sns

import dask.dataframe as dd

from TargetEncoderv3 import TargetEncoder
from FeatureSelector import FeatureSelector

from sklearn.metrics import *
from sklearn.model_selection import *

import lightgbm as lgb

In [2]:
# Explicit (compact, unsigned) dtypes for the click-log columns, handed to
# read_csv below instead of relying on type inference.
# NOTE(review): 'click_id' is not among the columns of the train frame shown
# below — presumably it is listed for the test file; confirm.
dtypes = {
            'ip'            : 'uint32',
            'app'           : 'uint16',
            'device'        : 'uint16',
            'os'            : 'uint16',
            'channel'       : 'uint16',
            'is_attributed' : 'uint8',
            'click_id'      : 'uint32'
            }
# Lazily read the training sample; with this blocksize the file ends up in a
# single partition (npartitions=1 in the structure displayed below).
train = dd.read_csv("../input/train_sample.csv", dtype=dtypes, blocksize=15000000,)
# Bare expression: display the lazy frame's structure (dtypes, partitions).
train

Unnamed: 0_level_0,ip,app,device,os,channel,click_time,attributed_time,is_attributed
npartitions=1,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1
,uint32,uint16,uint16,uint16,uint16,object,object,uint8
,...,...,...,...,...,...,...,...


In [3]:
# Peek at the first rows to sanity-check the parsed columns.
train.head()

Unnamed: 0,ip,app,device,os,channel,click_time,attributed_time,is_attributed
0,87540,12,1,13,497,2017-11-07 09:30:38,,0
1,105560,25,1,17,259,2017-11-07 13:40:27,,0
2,101424,12,1,19,212,2017-11-07 18:05:24,,0
3,94584,13,1,13,477,2017-11-07 04:58:08,,0
4,68413,12,1,1,178,2017-11-09 09:00:09,,0


In [64]:
def eval_fold(train, tr_index, val_index, cols, target_col, colname='new_col', func='mean', func_kwargs = {}):
    """Compute a target encoding on one CV fold and look it up for the held-out rows.

    The rows at positions ``tr_index`` are grouped by ``cols`` and
    ``target_col`` is aggregated with the groupby method named ``func``
    (called with ``func_kwargs``). The aggregate is then left-joined onto the
    rows at positions ``val_index`` via ``cols``.

    Parameters
    ----------
    train : frame exposing ``map_partitions`` (a dask DataFrame in this notebook)
    tr_index, val_index : positional row indexers passed to ``iloc``
    cols : grouping / join key column(s)
    target_col : column that is aggregated
    colname : name given to the resulting series
    func : name of the groupby aggregation method, e.g. ``'mean'``
    func_kwargs : keyword arguments forwarded to that method

    Returns
    -------
    The ``colname`` column of the joined validation rows. Keys that do not
    occur in the training rows are left missing by the left join.

    Notes
    -----
    The positions are applied with ``iloc`` inside *each* partition, so they
    only address the intended rows when ``train`` has a single partition.
    """
    def _take_rows(positions):
        # Positional selection executed per partition.
        return train.map_partitions(lambda part: part.iloc[positions, :], meta=train)

    fold_train = _take_rows(tr_index)
    fold_val = _take_rows(val_index)

    # Resolve the aggregation by name, then apply it to the grouped target.
    aggregate = getattr(fold_train.groupby(cols)[target_col], func)
    encoding = aggregate(**func_kwargs)
    encoding.name = colname

    joined = fold_val.join(encoding.to_frame(), on=cols, how='left')
    return joined[colname]

def cross_val_predict_dask(X, cvlist, cols, target_col, colname, func='mean', func_kwargs={}, verbose=1):
    """Build an out-of-fold target-encoding series across all CV folds.

    For every ``(train_positions, validation_positions)`` pair in ``cvlist``
    the encoding is computed with ``eval_fold`` and the per-fold results are
    concatenated row-wise with ``dd.concat``.

    Parameters
    ----------
    X : dask DataFrame holding both the key column(s) and the target
    cvlist : iterable of ``(train_positions, validation_positions)`` pairs
    cols : grouping / join key column(s)
    target_col : column that is aggregated
    colname : name of the produced series
    func : name of the groupby aggregation method
    func_kwargs : keyword arguments forwarded to that method
    verbose : when truthy, print per-fold progress and the resulting
        (lazy) series; when falsy, print nothing

    Returns
    -------
    The concatenation of the per-fold validation encodings.
    """
    fold_features = []
    for fold_no, (tr_index, val_index) in enumerate(cvlist):
        if verbose:
            print("Working on fold {}".format(fold_no))
        fold_features.append(
            eval_fold(X, tr_index, val_index, cols=cols, target_col=target_col,
                      colname=colname, func=func, func_kwargs=func_kwargs)
        )
    res = dd.concat(fold_features, axis=0)
    # The summary print used to run unconditionally; honour `verbose` so that
    # verbose=0 really silences the function.
    if verbose:
        print(res)
    return res
        
        
def cvFeatureGeneration(df, cvlist=None, cols=None, targetcol='is_attributed', func='mean', cname=None, func_kwargs={}):
    """Append an out-of-fold target-encoding column to ``df``.

    The encoding series is produced by ``cross_val_predict_dask`` (always
    called with ``verbose=1``) and joined back onto ``df`` by row index.

    Parameters
    ----------
    df : dask DataFrame to augment
    cvlist : iterable of ``(train_positions, validation_positions)`` pairs
    cols : grouping / join key column(s)
    targetcol : column that is aggregated
    func : name of the groupby aggregation method
    cname : name of the new column
    func_kwargs : keyword arguments forwarded to the aggregation

    Returns
    -------
    ``df`` joined with the new ``cname`` column.
    """
    oof_feature = cross_val_predict_dask(
        df, cvlist, cols, targetcol, cname, func, func_kwargs, verbose=1
    )
    # Index-aligned join: relies on the row index to match rows back up.
    return df.join(oof_feature.to_frame())


In [65]:
# Materialise the 10 (train_positions, validation_positions) pairs once so the
# same folds can be reused for every feature. `KFold` comes from the
# `from sklearn.model_selection import *` star import.
# NOTE(review): no shuffle / random_state arguments are passed to KFold.
CVFOLDS = list(KFold(10).split(train, train.is_attributed))
# Add the out-of-fold mean of `is_attributed` per `device` as `device_mean`.
# NOTE(review): `train` is rebound here, so re-running this cell operates on
# the already-augmented frame rather than on the freshly loaded one.
train = cvFeatureGeneration(train, cvlist=CVFOLDS, cols=['device'], targetcol='is_attributed', func='mean', cname='device_mean')

Working on fold 0
Working on fold 1
Working on fold 2
Working on fold 3
Working on fold 4
Working on fold 5
Working on fold 6
Working on fold 7
Working on fold 8
Working on fold 9
Dask Series Structure:
npartitions=10
    float64
        ...
     ...   
        ...
        ...
Name: device_mean, dtype: float64
Dask Name: concat, 123 tasks


In [66]:
# Execute the lazy graph and pull the result into memory ...
train = train.compute()
# ... then wrap it again as a single-partition dask DataFrame.
# NOTE(review): presumably one partition is kept because eval_fold applies the
# fold positions with `iloc` inside each partition — confirm.
train = dd.from_pandas(train, npartitions=1)

In [67]:
# The folds built earlier (CVFOLDS) are reused; the recomputation below was
# left commented out.
#CVFOLDS = list(KFold(10).split(train, train.is_attributed))
# Same out-of-fold encoding, this time grouped by `os` -> column `os_mean`.
train = cvFeatureGeneration(train, cvlist=CVFOLDS, cols=['os'], targetcol='is_attributed', func='mean', cname='os_mean')

Working on fold 0
Working on fold 1
Working on fold 2
Working on fold 3
Working on fold 4
Working on fold 5
Working on fold 6
Working on fold 7
Working on fold 8
Working on fold 9
Dask Series Structure:
npartitions=10
    float64
        ...
     ...   
        ...
        ...
Name: os_mean, dtype: float64
Dask Name: concat, 121 tasks


In [68]:
# Execute the lazy graph again and pull the augmented frame into memory.
train = train.compute()
# Re-wrap as a dask DataFrame, now split into 5 partitions; this frame is the
# one later passed to testFeatureGeneration as the lookup source.
train = dd.from_pandas(train, npartitions=5)
# Show the first rows, including the new `device_mean` / `os_mean` columns.
train.head()

Unnamed: 0,ip,app,device,os,channel,click_time,attributed_time,is_attributed,device_mean,os_mean
0,87540,12,1,13,497,2017-11-07 09:30:38,,0,0.001484,0.00131
1,105560,25,1,17,259,2017-11-07 13:40:27,,0,0.001484,0.001064
2,101424,12,1,19,212,2017-11-07 18:05:24,,0,0.001484,0.001672
3,94584,13,1,13,477,2017-11-07 04:58:08,,0,0.001484,0.00131
4,68413,12,1,1,178,2017-11-09 09:00:09,,0,0.001484,0.000941


In [69]:
def eval_train(train, test, cols, target_col, colname='new_col', func='mean', func_kwargs = {}):
    """Compute a target encoding on ``train`` and look it up for the rows of ``test``.

    ``train`` is grouped by ``cols`` and ``target_col`` is aggregated with the
    groupby method named ``func`` (called with ``func_kwargs``). The aggregate
    is left-joined onto ``test`` via ``cols``.

    Parameters
    ----------
    train : frame providing the key column(s) and the target
    test : frame providing the key column(s) to look up
    cols : grouping / join key column(s)
    target_col : column that is aggregated
    colname : name given to the resulting series
    func : name of the groupby aggregation method, e.g. ``'mean'``
    func_kwargs : keyword arguments forwarded to that method

    Returns
    -------
    The ``colname`` column of the joined ``test`` rows. Keys that do not occur
    in ``train`` are left missing by the left join.
    """
    grouped_target = train.groupby(cols)[target_col]
    # Resolve the aggregation by name so any groupby method can be requested.
    aggregate = getattr(grouped_target, func)
    encoding = aggregate(**func_kwargs)
    encoding.name = colname

    joined = test.join(encoding.to_frame(), on=cols, how='left')
    return joined[colname]
    

def testFeatureGeneration(train, test, cols=None, targetcol='is_attributed', func='mean', cname=None, func_kwargs={}):
    target_df = eval_train(train, test, cols, targetcol, cname, func, func_kwargs)
    test = test.join(target_df.to_frame())
    return test

In [70]:
# Read the test set lazily with the same explicit dtypes as the training data
# (`dtypes` already lists the test-only `click_id` column). Without them the
# columns fall back to inferred types, so the join keys no longer share the
# dtype of their train counterparts.
test = dd.read_csv("../input/test.csv", dtype=dtypes)

In [71]:
# Attach the per-device mean of `is_attributed`, computed on all of `train`,
# to the test frame as `device_mean`.
# NOTE(review): `test` is rebound, so this cell is not idempotent on re-run.
test = testFeatureGeneration(train, test, cols=['device'], targetcol='is_attributed', func='mean', cname='device_mean')
# `.head()` triggers the actual computation; in the recorded run it ended with
# `KilledWorker` on a hash-join task (see the output below).
test.head()

tornado.application - ERROR - Future <tornado.concurrent.Future object at 0x7fe11a2331d0> exception was never retrieved: Traceback (most recent call last):
  File "/home/mohsin/anaconda3/lib/python3.6/site-packages/tornado/gen.py", line 1069, in run
    yielded = self.gen.send(value)
  File "/home/mohsin/anaconda3/lib/python3.6/site-packages/distributed/client.py", line 1287, in wait
    raise AllExit()
distributed.client.AllExit
tornado.application - ERROR - Future <tornado.concurrent.Future object at 0x7fe11a233e48> exception was never retrieved: Traceback (most recent call last):
  File "/home/mohsin/anaconda3/lib/python3.6/site-packages/tornado/gen.py", line 1069, in run
    yielded = self.gen.send(value)
  File "/home/mohsin/anaconda3/lib/python3.6/site-packages/distributed/client.py", line 1287, in wait
    raise AllExit()
distributed.client.AllExit
tornado.application - ERROR - Future <tornado.concurrent.Future object at 0x7fe1199f0400> exception was never retrieved: Traceback (

tornado.application - ERROR - Future <tornado.concurrent.Future object at 0x7fe1184d2b00> exception was never retrieved: Traceback (most recent call last):
  File "/home/mohsin/anaconda3/lib/python3.6/site-packages/tornado/gen.py", line 1069, in run
    yielded = self.gen.send(value)
  File "/home/mohsin/anaconda3/lib/python3.6/site-packages/distributed/client.py", line 1287, in wait
    raise AllExit()
distributed.client.AllExit
tornado.application - ERROR - Future <tornado.concurrent.Future object at 0x7fe1184d2f60> exception was never retrieved: Traceback (most recent call last):
  File "/home/mohsin/anaconda3/lib/python3.6/site-packages/tornado/gen.py", line 1069, in run
    yielded = self.gen.send(value)
  File "/home/mohsin/anaconda3/lib/python3.6/site-packages/distributed/client.py", line 1287, in wait
    raise AllExit()
distributed.client.AllExit
tornado.application - ERROR - Future <tornado.concurrent.Future object at 0x7fe1184d26d8> exception was never retrieved: Traceback (

tornado.application - ERROR - Future <tornado.concurrent.Future object at 0x7fe1203fe908> exception was never retrieved: Traceback (most recent call last):
  File "/home/mohsin/anaconda3/lib/python3.6/site-packages/tornado/gen.py", line 1069, in run
    yielded = self.gen.send(value)
  File "/home/mohsin/anaconda3/lib/python3.6/site-packages/distributed/client.py", line 1287, in wait
    raise AllExit()
distributed.client.AllExit
tornado.application - ERROR - Future <tornado.concurrent.Future object at 0x7fe120436eb8> exception was never retrieved: Traceback (most recent call last):
  File "/home/mohsin/anaconda3/lib/python3.6/site-packages/tornado/gen.py", line 1069, in run
    yielded = self.gen.send(value)
  File "/home/mohsin/anaconda3/lib/python3.6/site-packages/distributed/client.py", line 1287, in wait
    raise AllExit()
distributed.client.AllExit
tornado.application - ERROR - Future <tornado.concurrent.Future object at 0x7fe1204366a0> exception was never retrieved: Traceback (

tornado.application - ERROR - Future <tornado.concurrent.Future object at 0x7fe117c8d9b0> exception was never retrieved: Traceback (most recent call last):
  File "/home/mohsin/anaconda3/lib/python3.6/site-packages/tornado/gen.py", line 1069, in run
    yielded = self.gen.send(value)
  File "/home/mohsin/anaconda3/lib/python3.6/site-packages/distributed/client.py", line 1287, in wait
    raise AllExit()
distributed.client.AllExit
tornado.application - ERROR - Future <tornado.concurrent.Future object at 0x7fe11b771c88> exception was never retrieved: Traceback (most recent call last):
  File "/home/mohsin/anaconda3/lib/python3.6/site-packages/tornado/gen.py", line 1069, in run
    yielded = self.gen.send(value)
  File "/home/mohsin/anaconda3/lib/python3.6/site-packages/distributed/client.py", line 1287, in wait
    raise AllExit()
distributed.client.AllExit
tornado.application - ERROR - Future <tornado.concurrent.Future object at 0x7fe117a142b0> exception was never retrieved: Traceback (

tornado.application - ERROR - Future <tornado.concurrent.Future object at 0x7fe11a19e550> exception was never retrieved: Traceback (most recent call last):
  File "/home/mohsin/anaconda3/lib/python3.6/site-packages/tornado/gen.py", line 1069, in run
    yielded = self.gen.send(value)
  File "/home/mohsin/anaconda3/lib/python3.6/site-packages/distributed/client.py", line 1287, in wait
    raise AllExit()
distributed.client.AllExit
tornado.application - ERROR - Future <tornado.concurrent.Future object at 0x7fe11a19e208> exception was never retrieved: Traceback (most recent call last):
  File "/home/mohsin/anaconda3/lib/python3.6/site-packages/tornado/gen.py", line 1069, in run
    yielded = self.gen.send(value)
  File "/home/mohsin/anaconda3/lib/python3.6/site-packages/distributed/client.py", line 1287, in wait
    raise AllExit()
distributed.client.AllExit
tornado.application - ERROR - Future <tornado.concurrent.Future object at 0x7fe117a32470> exception was never retrieved: Traceback (

tornado.application - ERROR - Future <tornado.concurrent.Future object at 0x7fe11850cb38> exception was never retrieved: Traceback (most recent call last):
  File "/home/mohsin/anaconda3/lib/python3.6/site-packages/tornado/gen.py", line 1069, in run
    yielded = self.gen.send(value)
  File "/home/mohsin/anaconda3/lib/python3.6/site-packages/distributed/client.py", line 1287, in wait
    raise AllExit()
distributed.client.AllExit
tornado.application - ERROR - Future <tornado.concurrent.Future object at 0x7fe11850c160> exception was never retrieved: Traceback (most recent call last):
  File "/home/mohsin/anaconda3/lib/python3.6/site-packages/tornado/gen.py", line 1069, in run
    yielded = self.gen.send(value)
  File "/home/mohsin/anaconda3/lib/python3.6/site-packages/distributed/client.py", line 1287, in wait
    raise AllExit()
distributed.client.AllExit
tornado.application - ERROR - Future <tornado.concurrent.Future object at 0x7fe11850c390> exception was never retrieved: Traceback (

tornado.application - ERROR - Future <tornado.concurrent.Future object at 0x7fe0d931d2b0> exception was never retrieved: Traceback (most recent call last):
  File "/home/mohsin/anaconda3/lib/python3.6/site-packages/tornado/gen.py", line 1069, in run
    yielded = self.gen.send(value)
  File "/home/mohsin/anaconda3/lib/python3.6/site-packages/distributed/client.py", line 1287, in wait
    raise AllExit()
distributed.client.AllExit
tornado.application - ERROR - Future <tornado.concurrent.Future object at 0x7fe0d903d748> exception was never retrieved: Traceback (most recent call last):
  File "/home/mohsin/anaconda3/lib/python3.6/site-packages/tornado/gen.py", line 1069, in run
    yielded = self.gen.send(value)
  File "/home/mohsin/anaconda3/lib/python3.6/site-packages/distributed/client.py", line 1287, in wait
    raise AllExit()
distributed.client.AllExit
tornado.application - ERROR - Future <tornado.concurrent.Future object at 0x7fe0d903dc50> exception was never retrieved: Traceback (

tornado.application - ERROR - Future <tornado.concurrent.Future object at 0x7fe0fc478518> exception was never retrieved: Traceback (most recent call last):
  File "/home/mohsin/anaconda3/lib/python3.6/site-packages/tornado/gen.py", line 1069, in run
    yielded = self.gen.send(value)
  File "/home/mohsin/anaconda3/lib/python3.6/site-packages/distributed/client.py", line 1287, in wait
    raise AllExit()
distributed.client.AllExit
tornado.application - ERROR - Future <tornado.concurrent.Future object at 0x7fe0fc475400> exception was never retrieved: Traceback (most recent call last):
  File "/home/mohsin/anaconda3/lib/python3.6/site-packages/tornado/gen.py", line 1069, in run
    yielded = self.gen.send(value)
  File "/home/mohsin/anaconda3/lib/python3.6/site-packages/distributed/client.py", line 1287, in wait
    raise AllExit()
distributed.client.AllExit
tornado.application - ERROR - Future <tornado.concurrent.Future object at 0x7fe0fc475828> exception was never retrieved: Traceback (

KilledWorker: ("('hash-join-09d35817760a39b681085d1ef906c8ea', 0)", 'tcp://127.0.0.1:43043')

tornado.application - ERROR - Future <tornado.concurrent.Future object at 0x7fe0fc7b8a58> exception was never retrieved: Traceback (most recent call last):
  File "/home/mohsin/anaconda3/lib/python3.6/site-packages/tornado/gen.py", line 1069, in run
    yielded = self.gen.send(value)
  File "/home/mohsin/anaconda3/lib/python3.6/site-packages/distributed/client.py", line 1287, in wait
    raise AllExit()
distributed.client.AllExit
tornado.application - ERROR - Future <tornado.concurrent.Future object at 0x7fe0fc7b8908> exception was never retrieved: Traceback (most recent call last):
  File "/home/mohsin/anaconda3/lib/python3.6/site-packages/tornado/gen.py", line 1069, in run
    yielded = self.gen.send(value)
  File "/home/mohsin/anaconda3/lib/python3.6/site-packages/distributed/client.py", line 1287, in wait
    raise AllExit()
distributed.client.AllExit
tornado.application - ERROR - Future <tornado.concurrent.Future object at 0x7fe0fc7b87b8> exception was never retrieved: Traceback (

tornado.application - ERROR - Future <tornado.concurrent.Future object at 0x7fe11432b128> exception was never retrieved: Traceback (most recent call last):
  File "/home/mohsin/anaconda3/lib/python3.6/site-packages/tornado/gen.py", line 1069, in run
    yielded = self.gen.send(value)
  File "/home/mohsin/anaconda3/lib/python3.6/site-packages/distributed/client.py", line 1287, in wait
    raise AllExit()
distributed.client.AllExit
tornado.application - ERROR - Future <tornado.concurrent.Future object at 0x7fe11430cf98> exception was never retrieved: Traceback (most recent call last):
  File "/home/mohsin/anaconda3/lib/python3.6/site-packages/tornado/gen.py", line 1069, in run
    yielded = self.gen.send(value)
  File "/home/mohsin/anaconda3/lib/python3.6/site-packages/distributed/client.py", line 1287, in wait
    raise AllExit()
distributed.client.AllExit
tornado.application - ERROR - Future <tornado.concurrent.Future object at 0x7fe11430ce48> exception was never retrieved: Traceback (

tornado.application - ERROR - Future <tornado.concurrent.Future object at 0x7fe114307668> exception was never retrieved: Traceback (most recent call last):
  File "/home/mohsin/anaconda3/lib/python3.6/site-packages/tornado/gen.py", line 1069, in run
    yielded = self.gen.send(value)
  File "/home/mohsin/anaconda3/lib/python3.6/site-packages/distributed/client.py", line 1287, in wait
    raise AllExit()
distributed.client.AllExit
tornado.application - ERROR - Future <tornado.concurrent.Future object at 0x7fe114307518> exception was never retrieved: Traceback (most recent call last):
  File "/home/mohsin/anaconda3/lib/python3.6/site-packages/tornado/gen.py", line 1069, in run
    yielded = self.gen.send(value)
  File "/home/mohsin/anaconda3/lib/python3.6/site-packages/distributed/client.py", line 1287, in wait
    raise AllExit()
distributed.client.AllExit
tornado.application - ERROR - Future <tornado.concurrent.Future object at 0x7fe1143073c8> exception was never retrieved: Traceback (

In [76]:
# Try the imported TargetEncoder for the same device -> mean(is_attributed)
# encoding.
# NOTE(review): `X_tr` and `X_val` are not defined in any visible cell (hidden
# kernel state), and this cell's execution count (76) is higher than that of
# the cells below it, so the notebook was not run top to bottom.
enc = TargetEncoder(cols=['device'], targetcol=['is_attributed'], func='mean', cname='device_mean', add_to_orig=False)
# The recorded run failed here with
# "AttributeError: 'DataFrame' object has no attribute 'to_frame'".
enc.fit(X_tr).transform(X_val)

AttributeError: 'DataFrame' object has no attribute 'to_frame'

In [73]:
# Manual version of the encoding: mean of `is_attributed` per `device` on the
# training split. NOTE(review): `X_tr` comes from kernel state not shown here.
tmp = X_tr.groupby(['device'])['is_attributed'].mean()
# Name the series so the column created by the later join is `device_mean`.
tmp.name = 'device_mean'
# Bare expression: display the lazy series structure.
tmp

Dask Series Structure:
npartitions=1
    float64
        ...
Name: device_mean, dtype: float64
Dask Name: rename, 10 tasks

In [74]:
# Keep only the key column of the validation split and join the per-device
# means onto it, matching `device` against the index of `tmp`.
# NOTE(review): `X_val` is rebound to this two-column frame, so re-running the
# cell starts from the already-joined result.
X_val = X_val[['device']].join(tmp.to_frame(), on=['device'])
# Compute and show the first joined rows.
X_val.head()

Unnamed: 0,device,device_mean
20000,1,0.001591
20001,1,0.001591
20002,1,0.001591
20003,1,0.001591
20004,1,0.001591


In [75]:
# Bare expression: display the lazy structure (columns, dtypes, partitions) of
# the joined validation frame.
X_val

Unnamed: 0_level_0,device,device_mean
npartitions=1,Unnamed: 1_level_1,Unnamed: 2_level_1
,uint16,float64
,...,...
