In [None]:
import os
os.environ["CUDA_VISIBLE_DEVICES"]="0"

import pandas as pd
pd.options.display.max_rows = 999
import gc
import numpy as np
import cudf
from glob import glob
from tqdm import tqdm
import gc
import cudf

def load_jsonl(load_path, max_load_chunk=100000):
    """Flatten a session-per-line JSONL file into one row per event.

    Every input line is read as a record with a ``session`` value and an
    ``events`` list; each event must provide ``aid``, ``ts`` and ``type``.

    Args:
        load_path: Path handed to ``pd.read_json(..., lines=True)``.
        max_load_chunk: Number of lines parsed per chunk. The same number is
            also compared against the chunk index, so reading stops after
            ``max_load_chunk + 1`` chunks.
            NOTE(review): reusing the chunk size as a chunk-count cap looks
            unintentional; it is kept as-is for backward compatibility.

    Returns:
        pandas DataFrame with columns ``session``, ``aid``, ``ts``, ``type``
        and a fresh 0..n-1 index; ``ts`` is cast to ``datetime64[ms]``. An
        input without any lines yields an empty frame with the same columns
        instead of raising from ``pd.concat``.
    """
    columns = ("session", "aid", "ts", "type")

    dfs = []
    # The chunked reader owns an open file handle; the ``with`` block closes it
    # even when the loop stops before the reader is exhausted.
    with pd.read_json(load_path, lines=True, chunksize=max_load_chunk) as chunks:
        for e, chunk in tqdm(enumerate(chunks)):
            if e > max_load_chunk:
                break
            # Pair every event with the session it belongs to.
            flat = [
                (session, event)
                for session, events in zip(chunk["session"].tolist(), chunk["events"].tolist())
                for event in events
            ]
            dfs.append(pd.DataFrame({
                "session": [session for session, _ in flat],
                "aid": [event["aid"] for _, event in flat],
                "ts": [event["ts"] for _, event in flat],
                "type": [event["type"] for _, event in flat],
            }))

    if not dfs:
        # pd.concat refuses an empty list, so fall back to an empty frame.
        dfs.append(pd.DataFrame({name: [] for name in columns}))

    return pd.concat(dfs).reset_index(drop=True).astype({"ts": "datetime64[ms]"})

In [None]:
# Parse both raw JSONL dumps into flat event tables.
train = load_jsonl("../input/otto-recommender-system/train.jsonl")
test = load_jsonl("../input/otto-recommender-system/test.jsonl")

# Encode the event type as a small integer and narrow the id columns,
# applying the same conversions to both frames in place.
type_codes = {'clicks':0, 'carts':1, 'orders':2}
for events in (train, test):
    events['type'] = events['type'].map(type_codes).astype('int8')
    events['session'] = events['session'].astype('int32')
    events['aid'] = events['aid'].astype('int32')

# Persist the compact tables for the later cells that read them back.
train.to_parquet('../input/train.parquet')
test.to_parquet('../input/test.parquet')

In [None]:
# Run the `nvidia-smi` shell command and show its output in the notebook.
!nvidia-smi

# Full dataset processing / writing in batches

In [None]:
def freemem(df):
    """Shrink 64-bit numeric columns of ``df`` to 32 bits, in place.

    * ``int64`` / ``Int64`` columns become ``int32`` only when every non-null
      value fits the int32 range; columns holding larger values are left
      unchanged rather than being wrapped around by the cast. A nullable
      ``Int64`` column that contains missing values becomes nullable
      ``Int32`` so the missing entries survive.
    * ``float64`` / ``Float64`` columns become ``float32``.
    * All other columns are left untouched.

    ``gc.collect()`` is called once after the columns have been processed.

    Args:
        df: DataFrame whose columns are reassigned in place.

    Returns:
        None.
    """
    int32_min, int32_max = -2**31, 2**31 - 1
    for col in df.columns:
        series = df[col]
        dtype_name = str(series.dtype)
        if dtype_name in ('int64', 'Int64'):
            # A column without any non-null value cannot overflow.
            if series.count() > 0 and (series.min() < int32_min or series.max() > int32_max):
                continue
            if dtype_name == 'Int64' and series.isna().any():
                # Plain int32 cannot represent missing values.
                df[col] = series.astype('Int32')
            else:
                df[col] = series.astype('int32')
        elif dtype_name in ('float64', 'Float64'):
            df[col] = series.astype('float32')
    gc.collect()
    return

# Integer code for each event type name (clicks -> 0, carts -> 1, orders -> 2).
maptype = {'clicks':0, 'carts':1, 'orders':2}

In [None]:
def _number_events_from_latest(events):
    """Sort by session (ascending), ts (descending), type (ascending) and add
    column ``n``: the running position of each row inside its session, so
    ``n == 0`` marks the most recent event of a session."""
    ordered = events.sort_values(
        ['session', 'ts', 'type'],
        ascending=[True, False, True],
    ).reset_index(drop=True)
    ordered['n'] = ordered.groupby('session')['ts'].cumcount()
    return ordered


# Read the compact event tables back, this time as GPU dataframes.
train = _number_events_from_latest(cudf.read_parquet('../input/train.parquet'))
test = _number_events_from_latest(cudf.read_parquet('../input/test.parquet'))

print(train.shape, test.shape)

for events in (train, test):
    # Replace the timestamp by its integer representation divided by 1e6.
    # NOTE(review): the resulting unit depends on the timestamp resolution
    # cudf read from the parquet file - confirm it is the intended one, since
    # freemem downcasts int64 columns afterwards.
    events['ts'] = events['ts'].astype('int64') // 1000000
    freemem(events)
gc.collect()

In [None]:
# Preview the first rows of the train frame.
train.head()

In [None]:
# Preview the first rows of the test frame.
test.head()

In [None]:
# Number of events per test session; only sessions with two or more events
# are kept, ordered by that count.
events_per_session = test.groupby('session')['aid'].count()
dt = events_per_session[events_per_session >= 2].sort_values().reset_index()

# Retain the sessions whose id is a multiple of 10 (the count condition is
# applied once more, as before).
id_is_multiple_of_10 = (dt['session'] % 10) == 0
has_two_or_more = dt['aid'] >= 2
dt = dt.loc[id_is_multiple_of_10 & has_two_or_more]

print(dt.shape)
dt.tail(50)

In [None]:
# Rows belonging to the sessions selected in `dt` form the validation set;
# everything else stays in `test`.
# Note: `test` is overwritten here, so running this cell a second time
# without rebuilding `test` leaves `valid` empty.
in_selected_session = test['session'].isin(dt['session'])
valid = test.loc[in_selected_session].reset_index(drop=True)
test = test.loc[~in_selected_session].reset_index(drop=True)

valid.shape, test.shape

In [None]:
!rm -r fold
!mkdir fold

for n, s in enumerate(range(0, train.shape[0], 1900000)):
    e = s+1900000
    if e > train.shape[0]: e = train.shape[0]
    tmp2 = train.iloc[s:e].copy().reset_index(drop=True)
    if tmp2.shape[0]>0:
        tmp2.to_parquet(f'fold/train-split{n}.parquet')
    del tmp2
gc.collect()

In [None]:
# Write `valid` to disk in consecutive slices of at most 350,000 rows,
# one parquet file per slice, numbered from 0.
rows_per_file = 350000
total_rows = valid.shape[0]
for n, start in enumerate(range(0, total_rows, rows_per_file)):
    stop = min(start + rows_per_file, total_rows)
    part = valid.iloc[start:stop].copy().reset_index(drop=True)
    if part.shape[0] > 0:
        part.to_parquet(f'fold/valid-split{n}.parquet')
    # Drop the slice before building the next one.
    del part
gc.collect()

In [None]:
# Write `test` to disk in consecutive slices of at most 1,800,000 rows,
# one parquet file per slice, numbered from 0.
rows_per_file = 1800000
total_rows = test.shape[0]
for n, start in enumerate(range(0, total_rows, rows_per_file)):
    stop = min(start + rows_per_file, total_rows)
    part = test.iloc[start:stop].copy().reset_index(drop=True)
    if part.shape[0] > 0:
        part.to_parquet(f'fold/test-split{n}.parquet')
    # Drop the slice before building the next one.
    del part
gc.collect()