In [15]:
import pandas as pd
import numpy as np
import pickle
import re 
import matplotlib.pyplot as plt
import nvstrings, nvcategory
import warnings
import cudf
import pyarrow.parquet as pq
import pdb
import torch
import os
import glob

from fastai import *
from fastai.basic_data import *
from librmm_cffi import librmm
from fastai_modified.core_cudf import *
from time import time
from torch import tensor
from torch.utils import data as torch_data
from torch.utils.dlpack import from_dlpack
from cuml.preprocessing import LabelEncoder
from sys import getsizeof
from numba import cuda
from sklearn.metrics import roc_auc_score
from datetime import date

warnings.filterwarnings("ignore")
%matplotlib inline
%reload_ext snakeviz
GPU_id = 4
os.environ['CUDA_VISIBLE_DEVICES'] = str(GPU_id)
cpu = torch.device("cpu")

In [16]:
# Values accepted for the preprocessors' `fill_strategy`.
MEDIAN, CONSTANT = "median", "constant"

# Dataset split names; passed to the preprocessors as `mode` and also used to
# build the parquet paths that are read.
TRAIN, VALID, TEST = 'train', 'valid', 'test'

<h2> <center> Preprocessing </center> </h2>

In [17]:
def _enforce_str(y: cudf.Series) -> cudf.Series:
    """
    Make sure nvcategory is handed string data.

    A Series whose dtype is already "object" is returned as-is; any other
    Series is returned cast to "str".
    """
    already_strings = y.dtype == "object"
    return y if already_strings else y.astype("str")


class MyLabelEncoder(object):
    """
    Label encoder that turns the values of a cudf Series into integer codes
    with ``nvcategory``.

    Input whose dtype is not "object" is cast to strings first (see
    ``_enforce_str``). The categories learned by ``fit``/``fit_transform`` are
    kept in ``self._cats`` and reused by ``transform``.
    """

    def __init__(self, *args, **kwargs):
        # Positional and keyword arguments are accepted but not used.
        self._cats: nvcategory.nvcategory = None
        self._dtype = None
        self._fitted: bool = False

    def _check_is_fitted(self):
        """Raise ``TypeError`` unless ``fit`` or ``fit_transform`` has been called."""
        if not self._fitted:
            raise TypeError("Model must first be .fit()")

    def _learn_categories(self, y: cudf.Series) -> cudf.Series:
        """
        Shared fitting step: remember the dtype of `y`, build the nvcategory
        from its string form and mark the encoder as fitted.

        Returns `y` as passed through ``_enforce_str`` so callers can reuse it.
        """
        self._dtype = y.dtype

        # Convert y to nvstrings series, if it isn't one
        y = _enforce_str(y)

        # Bottleneck is here, despite everything being done on the device
        self._cats = nvcategory.from_strings(y.data)
        self._fitted = True
        return y

    def fit(self, y: cudf.Series) -> "MyLabelEncoder":
        """Learn the categories of `y` and return this encoder."""
        self._learn_categories(y)
        return self

    def transform(self, y: cudf.Series) -> cudf.Series:
        """
        Encode `y` against the fitted categories.

        Raises ``TypeError`` if the encoder has not been fitted. Any code of -1
        in the result is replaced by 0 (presumably -1 marks strings that are
        not among the fitted keys -- confirm against the nvcategory docs).
        """
        self._check_is_fitted()
        y = _enforce_str(y)
        encoded = cudf.Series(
            nvcategory.from_strings(y.data)
            .set_keys(self._cats.keys())
            .values()
        )
        return encoded.replace(-1, 0)

    def fit_transform(self, y: cudf.Series) -> cudf.Series:
        """Learn the categories of `y` and return its int32 codes as a cudf Series."""
        y = self._learn_categories(y)

        # Have nvcategory write the codes straight into a device buffer.
        arr: librmm.device_array = librmm.device_array(
            y.data.size(), dtype=np.int32
        )
        self._cats.values(devptr=arr.device_ctypes_pointer.value)
        return cudf.Series(arr)

    def inverse_transform(self, y: cudf.Series) -> cudf.Series:
        """Not implemented; always raises ``NotImplementedError``."""
        raise NotImplementedError

In [18]:
class PreprocessingColByCol():
    """
    Preprocess a dataset one column at a time and return CPU tensors.

    Every column is read from ``{path}/{mode}/{column}.parquet``. Categorical
    columns are label-encoded; continuous columns have missing values filled
    and are then standardised. Encoders, fill values and mean/std are learned
    when ``mode == TRAIN`` and reused for every other mode, so TRAIN has to be
    processed first.

    Class attributes (override on the class or the instance):
        fill_strategy: MEDIAN, CONSTANT, or anything else for "most frequent value".
        add_col: when True, a 0/1 missing-value indicator is appended to the
            categorical output for every column that had missing values in TRAIN.
        fill_val: fill value used by the CONSTANT strategy.
    """
    fill_strategy = MEDIAN
    add_col = False
    fill_val = 0

    def __init__(self, path, cat_names, cont_names, label_name, fill_strategy=MEDIAN):
        # path + name is the file
        self.path = path
        self.cat_names, self.cont_names, self.label_name = cat_names, cont_names, label_name
        self.fill_strategy = fill_strategy
        # Fitted state is kept per instance. As mutable class attributes these
        # containers were shared (and overwritten) by every instance of the class.
        self.means, self.stds, self.filler = {}, {}, {}
        self.category_encoders = {}
        self.col_name_missing = set()
        # Names of the indicator columns appended to the categorical output, in
        # output order (only populated when `add_col` is True).
        self.na_col_names = []
        self._na_tensors = []

    @staticmethod
    def _to_cpu_tensor(col):
        """Convert a cudf Series to a CPU tensor of shape (len(col), -1) through DLPack."""
        return from_dlpack(col.to_dlpack()).view(col.shape[0], -1).to(cpu)

    def preproc_dataframe(self, mode):
        """
        Read and preprocess every configured column for `mode`.

        Returns ``((cats, conts), label)``: `cats` is a long tensor with one
        column per categorical name (followed by the missing-value indicators
        when `add_col` is True), `conts` a float tensor with one column per
        continuous name, and `label` a 1-D float tensor.
        """
        self.mode = mode
        # Indicator tensors collected by `fill_missing` during this call only.
        self._na_tensors = []

        def get_col(n):
            return cudf.read_parquet(f"{self.path}/{mode}/{n}.parquet")[n]

        cats, conts = [], []
        for n in self.cat_names:
            col = self.categorify(get_col(n), n)
            cats.append(self._to_cpu_tensor(col).long())
        for n in self.cont_names:
            col = self.fill_missing(get_col(n), n)
            col = self.normalize(col, n)
            conts.append(self._to_cpu_tensor(col).float())
        cats.extend(self._na_tensors)
        label = self._to_cpu_tensor(get_col(self.label_name)).float().squeeze(1)

        # torch.cat rejects an empty list; fall back to a (rows, 0) tensor so a
        # dataset with no categorical (or no continuous) columns still works.
        n_rows = label.shape[0]
        cat_out = torch.cat(cats, 1) if cats else torch.empty(n_rows, 0, dtype=torch.long)
        cont_out = torch.cat(conts, 1) if conts else torch.empty(n_rows, 0, dtype=torch.float)
        return (cat_out, cont_out), label

    def normalize(self, col: cudf.Series, col_name):
        """Standardise `col` with the mean/std recorded for `col_name` (learned in TRAIN mode)."""
        if self.mode == TRAIN: self.means[col_name], self.stds[col_name] = col.mean(), col.std()
        # The small constant keeps the division defined when the std is zero.
        return (col - self.means[col_name]) / (1e-7 + self.stds[col_name])

    def get_median(self, col: cudf.Series):
        """
        Return the element at position ``len // 2`` of the sorted non-null values
        (the upper middle value for an even count).
        """
        # Rebuild the index *after* sorting so that `col[k]` is the k-th smallest
        # value whether integer lookup is positional or label-based.
        col = col.dropna().sort_values().reset_index(drop=True)
        return col[len(col)//2]

    def add_col_(self, col: cudf.Series, col_name):
        """
        Record a 0/1 "is missing" indicator for `col`.

        The indicator is appended to the categorical tensor returned by
        `preproc_dataframe`. Its name (`<col_name>_na`) is tracked in
        `self.na_col_names` rather than in `self.cat_names`, because there is no
        parquet file to read for it on the next call.
        """
        col_name_na = col_name + "_na"
        indicator = col.isna().astype('int64')
        self._na_tensors.append(self._to_cpu_tensor(indicator).long())
        if col_name_na not in self.na_col_names: self.na_col_names.append(col_name_na)

    def fill_missing(self, col: cudf.Series, col_name):
        """
        Fill the missing values of `col`.

        In TRAIN mode a column with missing values is registered and its fill
        value is chosen according to `fill_strategy`. In any other mode the stored
        fill value is reused; an ``Exception`` is raised when the column has
        missing values but had none in TRAIN.
        """
        has_missing = col.isna().sum() != 0
        if self.mode == TRAIN:
            if not has_missing: return col
            self.col_name_missing.add(col_name)
            if self.fill_strategy == MEDIAN:
                self.filler[col_name] = self.get_median(col)
            elif self.fill_strategy == CONSTANT:
                self.filler[col_name] = self.fill_val
            else:
                # Presumably the most frequent value (first entry of value_counts) -- confirm ordering.
                self.filler[col_name] = col.value_counts().index[0]
        elif col_name not in self.col_name_missing:
            if has_missing:
                raise Exception(f"""There are nan values in field {col_name} but there were none in the training set. 
                Please fix those manually.""")
            return col
        # Reached for every column registered in TRAIN, so the indicator columns
        # line up across modes even when this split has no missing values.
        if self.add_col: self.add_col_(col, col_name)
        return col.fillna(self.filler[col_name])

    def categorify(self, col: cudf.Series, col_name):
        """
        Label-encode `col` as int64 codes, fitting a new encoder in TRAIN mode
        and reusing the stored one otherwise.
        """
        # A null is appended before encoding and its code dropped afterwards, so
        # null is always among the values the encoder sees.
        padded = col.append([None])
        if self.mode != TRAIN:
            result = self.category_encoders[col_name].transform(padded)[:-1]
        else:
            self.category_encoders[col_name] = MyLabelEncoder()
            result = self.category_encoders[col_name].fit_transform(padded)[:-1]
        return result.astype('int64')

In [19]:
class PreprocessDF():
    """
    Preprocess a whole cudf DataFrame at once and return tensors.

    Categorical columns are label-encoded, continuous columns have missing
    values filled and are standardised. Encoders, fill values and mean/std are
    learned when ``mode == TRAIN`` and reused for every other mode, so a TRAIN
    frame has to be processed first.

    Class attributes (override on the class or the instance):
        fill_strategy: MEDIAN, CONSTANT, or anything else for "most frequent value".
        add_col: when True, add a 0/1 `<name>_na` indicator column for every
            continuous column that had missing values in TRAIN.
        fill_val: fill value used by the CONSTANT strategy.
    """
    fill_strategy = MEDIAN
    add_col = False
    fill_val = 0

    def __init__(self, cat_names, cont_names, label_name, mode=TRAIN, fill_strategy=MEDIAN, to_cpu=True):
        self.cat_names, self.cont_names = cat_names, cont_names
        self.fill_strategy = fill_strategy
        self.label_name = label_name
        self.to_cpu = to_cpu
        # Previously accepted but ignored; `preproc_dataframe` overrides it on every call.
        self.mode = mode
        # Per-instance state: a class-level dict would be shared by all instances.
        self.category_encoders = {}
        # Filled by `def_emb_sz`; it was never initialised before, so the first
        # call to `get_emb_sz` raised AttributeError.
        self.embed_sz = {}
        # Names of indicator columns created by `add_col_`; they are not label-encoded.
        self.na_indicator_names = set()

    def preproc_dataframe(self, gdf: cudf.DataFrame, mode):
        """
        Preprocess `gdf` for `mode` and return ``((cats, conts), label)`` tensors.

        `gdf` is modified in place (columns are overwritten and, with `add_col`,
        added). The tensors are moved to the CPU when `to_cpu` is True.
        """
        self.gdf = gdf
        self.mode = mode
        self.categorify()
        self.fill_missing()
        self.normalize()
        if is_listy(self.label_name):
            for n in self.label_name: self.gdf[n] = self.gdf[n].astype('float32')
        else: self.gdf[self.label_name] = self.gdf[self.label_name].astype('float32')
        # int64 in cudf may not be equivalent to that in pytorch
        cats = from_dlpack(self.gdf[self.cat_names].to_dlpack()).long()
        conts = from_dlpack(self.gdf[self.cont_names].to_dlpack())
        label = from_dlpack(self.gdf[self.label_name].to_dlpack())
        if self.to_cpu: (cats, conts), label = (cats.to(cpu), conts.to(cpu)), label.to(cpu)
        return (cats, conts), label

    def normalize(self):
        """Standardise every continuous column as float32, learning mean/std in TRAIN mode."""
        if self.mode == TRAIN:
            self.means, self.stds = self.gdf[self.cont_names].mean(), self.gdf[self.cont_names].std()
        for i, name in enumerate(self.cont_names):
            # The small constant keeps the division defined when the std is zero.
            self.gdf[name] = ((self.gdf[name]-self.means[i])/(1e-7+self.stds[i])).astype('float32')

    def get_median(self, col: cudf.Series):
        """
        Return the element at position ``len // 2`` of the sorted non-null values
        (the upper middle value for an even count).
        """
        # Rebuild the index *after* sorting so that `col[k]` is the k-th smallest
        # value whether integer lookup is positional or label-based.
        col = col.dropna().sort_values().reset_index(drop=True)
        return col[len(col)//2]

    def add_col_(self, cont_names_na):
        """
        Add a 0/1 `<name>_na` indicator column for each given continuous column
        and register it as a categorical column.
        """
        for name in cont_names_na:
            name_na = name + "_na"
            # Stored as int64 like the encoded categorical columns, so the
            # categorical block stays a single dtype.
            self.gdf[name_na] = self.gdf[name].isna().astype('int64')
            self.na_indicator_names.add(name_na)
            if name_na not in self.cat_names: self.cat_names.append(name_na)

    def fill_missing(self):
        """
        Fill missing values in the continuous columns.

        TRAIN mode records which columns have missing values and picks their
        fill values according to `fill_strategy`. Other modes reuse them and raise
        an ``Exception`` if a column has missing values that had none in TRAIN.
        """
        if self.mode == TRAIN:
            self.train_cont_names_na = [name for name in self.cont_names if self.gdf[name].isna().sum()]
            if self.fill_strategy == MEDIAN:
                self.filler = {name: self.get_median(self.gdf[name]) for name in self.train_cont_names_na}
            elif self.fill_strategy == CONSTANT:
                self.filler = {name: self.fill_val for name in self.train_cont_names_na}
            else:
                # Presumably the most frequent value (first entry of value_counts) -- confirm ordering.
                self.filler = {name: self.gdf[name].value_counts().index[0] for name in self.train_cont_names_na}
        else:
            cont_names_na = [name for name in self.cont_names if self.gdf[name].isna().sum()]
            if not set(cont_names_na).issubset(set(self.train_cont_names_na)):
                raise Exception(f"""There are nan values in field {cont_names_na} but there were none in the training set. 
                 Please fix those manually.""")
        # Indicators are built for the TRAIN set of columns in every mode so the
        # categorical layout is identical across splits.
        if self.add_col: self.add_col_(self.train_cont_names_na)
        # Assign the filled columns back explicitly. `fillna(..., inplace=True)` on
        # `self.gdf[cols]` operates on a newly built selection and is not
        # guaranteed to write through to `self.gdf`.
        for name in self.train_cont_names_na:
            self.gdf[name] = self.gdf[name].fillna(self.filler[name])

    def categorify(self):
        """
        Label-encode every categorical column as int64, fitting encoders in
        TRAIN mode and reusing them otherwise.
        """
        for name in self.cat_names:
            # Indicator columns are created later by `fill_missing` and are
            # already integer codes; there is nothing to encode (or to look up).
            if name in self.na_indicator_names: continue
            # A null is appended before encoding and its code dropped afterwards,
            # so null is always among the values the encoder sees.
            padded = self.gdf[name].append([None])
            if self.mode == TRAIN:
                self.category_encoders[name] = MyLabelEncoder()
                self.gdf[name] = self.category_encoders[name].fit_transform(padded)[:-1]
            else: self.gdf[name] = self.category_encoders[name].transform(padded)[:-1]
            self.gdf[name] = self.gdf[name].astype('int64')

    def get_emb_sz(self):
        """Return a ``(n_categories, embedding_size)`` pair for every name in `cat_names`."""
        work_in = {}
        for key, val in self.category_encoders.items():
            work_in[key] = len(val._cats.keys()) + 1
        # Indicator columns have no encoder; they only take the values 0 and 1.
        for name_na in self.na_indicator_names:
            work_in[name_na] = 2
        ret_list = [self.def_emb_sz(work_in, n) for n in self.cat_names]
        return ret_list

    def emb_sz_rule(self, n_cat:int)->int: return min(600, round(1.6 * n_cat**0.56))

    def def_emb_sz(self, classes, n, sz_dict=None):
        "Pick an embedding size for `n` depending on `classes` if not given in `sz_dict`."
        sz_dict = ifnone(sz_dict, {})
        n_cat = classes[n]
        sz = sz_dict.get(n, int(self.emb_sz_rule(n_cat)))  # rule of thumb
        self.embed_sz[n] = sz
        return n_cat,sz

In [20]:
# train = cudf.read_parquet('train.parquet')
# test = cudf.read_parquet('test.parquet')
# cat_names = ['user_id','item_id','platform','city','device','current_filters'] + [i for i in test.columns if i.startswith('is_')]

# for c in cat_names:
#     print(c)
#     encoder = MyLabelEncoder()
#     train_encoded = encoder.fit_transform(train[c])
#     try:
#         test_encoded = encoder.transform(test[c])
#     except:
#         print(f'{c} failed')
#         continue

# del train, test

<h1> <center> Data preprocessing by cudf</center> </h1>

In [21]:
# %%time
# data_pair = pd.read_pickle('cache/data_pair_all.pkl')

# train = data_pair[data_pair.clickout_missing==0]
# test = data_pair[data_pair.clickout_missing>0]

# valid = train.loc[train['row_id'] % 5 == 1]
# train = train.loc[train['row_id'] % 5 != 1]

# train.reset_index(drop=True, inplace=True)
# valid.reset_index(drop=True, inplace=True)
# test.reset_index(drop=True, inplace=True)

# train.to_parquet('train.parquet')
# valid.to_parquet('valid.parquet')
# test.to_parquet('test.parquet')

# print(train.shape, valid.shape, test.shape)

In [22]:
# Load the cached column-name lists (categorical and continuous).
with open('cache/col_names.pkl', 'rb') as f:
    col_names = pickle.load(f)
cat_names = col_names['cat_names']
cont_names = col_names['cont_names']

In [23]:
# ds_name = [TRAIN, VALID, TEST]
# data = {}
# for name in ds_name:
#     ds = cudf.read_parquet(f"{name}.parquet")
#     for i, n in enumerate(ds.columns):
#         df = ds[n].to_frame().to_pandas()
#         if not os.path.exists(f"cache/{name}"):
#             os.mkdir(f"cache/{name}")
#         df.to_parquet(f"cache/{name}/{n}.parquet")
#     del ds

In [24]:
# Scratch list used by the next cells to check in-place shuffling.
a = list(range(1, 4))

In [25]:
# Shuffle the scratch list in place. No seed is set in the visible cells, so the
# resulting order can differ between runs.
np.random.shuffle(a)

In [26]:
# Display the shuffled list.
a

[3, 2, 1]

<h3> Test: process the data column by column </h3>


In [27]:
# Randomise the column order in place, then run the column-by-column
# preprocessor on TRAIN followed by VALID, timing each split.
random.shuffle(cat_names)
random.shuffle(cont_names)
proc = PreprocessingColByCol(
    path="cache",
    cat_names=cat_names,
    cont_names=cont_names,
    label_name='target',
)
ds_name = [TRAIN, VALID]
data = {}
for name in ds_name:
    start = time()
    x, y = proc.preproc_dataframe(mode=name)
    print(f"{name} use {time()-start} seconds\n")
    # Keep the ((cats, conts), label) result under its split name.
    data[name] = (x, y)

train use 64.16728401184082 seconds



AttributeError: 'PreprocessingColByCol' object has no attribute 'category_encoder'

<h3> Test: process the entire DataFrame </h3>


In [None]:
from itertools import combinations
proc = PreprocessDF(cat_names=cat_names, cont_names=cont_names, label_name='target', to_cpu=True)
# TRAIN must be processed first: the encoders, fill values and mean/std are only
# learned in TRAIN mode, so running TEST alone on a fresh PreprocessDF fails.
ds_name = [TRAIN, VALID, TEST]
data = {}
for name in ds_name:
    ds = cudf.read_parquet(f"{name}.parquet")
    x, y = proc.preproc_dataframe(ds, mode=name)
    data[name] = (x, y)
    # Drop the reference to the frame before the next split is loaded.
    del ds

In [None]:
# Print the shapes of the categorical, continuous and label tensors of every split.
for name in data:
    (cats, conts), y = data[name]
    print(name, cats.shape, conts.shape, y.shape)

<h3> Test fastai </h3> 

In [None]:
%%time
# Preprocessing steps passed as `procs=` to TabularList.from_df in the next cell.
procs = [FillMissing, Normalize, Categorify]
# Boolean flag that is True where row_id is a multiple of 5; consumed later by
# split_from_df('is_va').
# NOTE(review): `train` is only created in commented-out cells above -- confirm
# where it is defined before running this on a fresh kernel.
train['is_va'] = train.row_id%5 == 0

In [None]:
%%time
# NOTE(review): this rebinds the notebook-level `cat_names` to an empty list, so
# both lists below are built without categorical columns -- confirm this is intended.
cat_names =[]
# Only the training list receives `procs`.
# NOTE(review): `test` is likewise only created in commented-out cells above.
test_list = TabularList.from_df(test, path='./', cat_names=cat_names, cont_names=cont_names)
train_list = TabularList.from_df(train, path='./', cat_names=cat_names, cont_names=cont_names, procs=procs)  

In [None]:
%%time
# Split using the 'is_va' column added earlier (presumably rows flagged True
# become the validation set -- confirm against the fastai docs).
train_list = train_list.split_from_df('is_va')

In [None]:
%%time
# Take the labels from the 'target' column.
train_list_labeled = train_list.label_from_df(cols='target')

In [None]:
%%time
# Attach the test list built above to the labelled train/validation lists.
train_list_labeled_test = train_list_labeled.add_test(test_list)

In [None]:
%%time
# Build the databunch with a batch size of 1024 and 10 loader workers.
data_tab = train_list_labeled_test.databunch(num_workers=10,bs=1024)

In [None]:
# Length of the validation and training datasets combined with `+`.
len(data_tab.valid_ds + data_tab.train_ds)