# versions (type weights for clicks,carts,orders, then the top-N threshold kept per matrix)

v1 1,6,3,clicks_th=20,carts_th=15,orders_th=15

v2 0.5,9,0.5,clicks_th=15,carts_th=20,orders_th=20

v3 1,6,3,clicks_th=30,carts_th=30,orders_th=30

v4 1,6,3,clicks_th=40,carts_th=40,orders_th=40

v5 1,6,3,clicks_th=50,carts_th=50,orders_th=50

v6 1,6,3,clicks_th=20,carts_th=15,orders_th=15,drop

# setting

In [1]:
# Mount Google Drive: every dataset and output path below lives under /content/drive.
from google.colab import drive
drive.mount(mountpoint='/content/drive')

Mounted at /content/drive


In [2]:
# Per-event-type weights keyed by type code (0=clicks, 1=carts, 2=orders).
# NOTE(review): not referenced by any of the cells shown here.
type_weight = dict(zip((0, 1, 2), (1, 6, 3)))

# Top-N neighbour cut-offs for the co-visitation matrices
# (original Japanese comments: number of clicks / carts / purchases).
clicks_th = 15  # clicks
carts_th  = 15  # carts
orders_th = 20  # orders (purchases)

# Version tag embedded in the saved parquet file names.
VER = 16

In [3]:
# Show which GPU this runtime was assigned (the cuDF processing below runs on it).
!nvidia-smi

Tue Jan 31 02:28:50 2023       
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 510.47.03    Driver Version: 510.47.03    CUDA Version: 11.6     |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|                               |                      |               MIG M. |
|   0  NVIDIA A100-SXM...  Off  | 00000000:00:04.0 Off |                    0 |
| N/A   31C    P0    50W / 400W |      0MiB / 40960MiB |      0%      Default |
|                               |                      |             Disabled |
+-------------------------------+----------------------+----------------------+
                                                                               
+-----------------------------------------------------------------------------+
| Proces

# import

In [4]:
# Install RAPIDS cuDF (CUDA 11 wheels) from NVIDIA's index.
# NOTE(review): versions are not pinned, so a re-run may install a different
# cuDF; consider pinning for reproducibility.
!pip install -q cudf-cu11 dask-cudf-cu11 --extra-index-url=https://pypi.ngc.nvidia.com
import pandas as pd
import numpy as np
# NOTE(review): os, sys, pickle, Counter and itertools appear unused in the cells shown.
import os,sys,pickle,glob,gc
from collections import Counter
import cudf,itertools
# Widen pandas display limits.
pd.set_option('display.max_columns', 100)
pd.set_option('display.max_rows', 100)
print('We will use RAPIDS version',cudf.__version__)

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m442.8/442.8 MB[0m [31m2.7 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m76.6/76.6 KB[0m [31m10.7 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m8.8/8.8 MB[0m [31m103.3 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m453.6/453.6 KB[0m [31m10.1 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.8/1.8 MB[0m [31m85.8 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m8.8/8.8 MB[0m [31m108.3 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.0/1.0 MB[0m [31m46.1 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m16.2/16.2 MB[0m [31m48.5 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━

# read function (original data, used for the test matrices)

In [5]:
%%time
# CACHE FUNCTIONS
def read_file(f):
    """Return the CPU-cached frame for path ``f`` as a cuDF (GPU) DataFrame."""
    return cudf.DataFrame( data_cache[f] )

def read_file_to_cache(f):
    """Read parquet file ``f`` into a compact pandas frame for the CPU cache.

    ``ts`` is divided by 1000 and cast to int32 (presumably ms -> s; TODO
    confirm the source unit). The string ``type`` column is mapped to int8
    codes via ``type_labels``.
    """
    df = pd.read_parquet(f)
    df.ts = (df.ts/1000).astype('int32')
    df['type'] = df['type'].map(type_labels).astype('int8')
    return df

# CACHE THE DATA ON CPU BEFORE PROCESSING ON GPU
data_cache = {}
type_labels = {'clicks':0, 'carts':1, 'orders':2}
# glob returns paths in arbitrary, filesystem-dependent order. Sort them so the
# chunk composition (and therefore the float summation order in the matrix
# builders) is reproducible between runs.
files = sorted(glob.glob('/content/drive/MyDrive/Colab Notebooks/kaggle/OTTO/dataset/original/*_parquet/*'))
for f in files: data_cache[f] = read_file_to_cache(f)

# CHUNK PARAMETERS
READ_CT = 5                             # files concatenated per inner step
CHUNK = int( np.ceil( len(files)/6 ))   # files per outer chunk (6 outer chunks)
print(f'We will process {len(files)} files, in groups of {READ_CT} and chunks of {CHUNK}.')

We will process 146 files, in groups of 5 and chunks of 25.
CPU times: user 34.2 s, sys: 7.69 s, total: 41.9 s
Wall time: 45 s


# experiments

# orders matrix

In [6]:
# USE SMALLEST DISK_PIECES POSSIBLE WITHOUT MEMORY ERROR
DISK_PIECES = 4
SIZE = 1.86e6/DISK_PIECES  # width of the aid_x range handled per disk part

def orders_matrix(files,MODE):
    """Build the orders co-visitation matrix and save its top neighbours to disk.

    Only ``type == 2`` (orders) events are used. Within each session the 30
    most recent orders are paired with each other. Pairs less than 7 days
    apart with different aids get a weight that rises linearly with ``ts_x``,
    from 1 at ts 1659304800 to 4 at ts 1662328791. Each
    (session, aid_x, aid_y) pair counts once. Weights are summed over
    sessions, and the top ``orders_th`` aid_y per aid_x are kept.

    The aid_x space is split into ``DISK_PIECES`` parts of width ``SIZE`` to
    bound GPU memory. One parquet file is written per part.

    Args:
        files: cached file paths, read with ``read_file``.
        MODE: output sub-directory ('test' or 'val' in this notebook).

    Raises:
        ValueError: if ``files`` is empty.

    Uses notebook globals: read_file, CHUNK, READ_CT, orders_th, VER,
    DISK_PIECES, SIZE.
    """
    if not files:
        raise ValueError('orders_matrix: no input files')
    # COMPUTE IN PARTS FOR MEMORY MANAGEMENT
    for PART in range(DISK_PIECES):
        # MERGE IS FASTEST PROCESSING CHUNKS WITHIN CHUNKS
        # => OUTER CHUNKS
        for j in range(6):
            a = j*CHUNK
            b = min( (j+1)*CHUNK, len(files) )
            # With ceil-sized chunks the trailing outer chunk(s) can be empty
            # (e.g. 7 files -> CHUNK=2 -> j=4 gives a=8 > b=7). Skip them,
            # otherwise tmp2 is undefined when combining below.
            if a >= b:
                continue

            # => INNER CHUNKS
            for k in range(a,b,READ_CT):
                # READ UP TO READ_CT FILES WITHOUT CROSSING THE CHUNK END
                df = [read_file(files[k])]
                for i in range(1,READ_CT):
                    if k+i<b: df.append( read_file(files[k+i]) )
                df = cudf.concat(df,ignore_index=True,axis=0)
                df = df.loc[df['type'].isin([2])]  # ONLY ORDERS
                df = df.sort_values(['session','ts'],ascending=[True,False])
                # USE TAIL OF SESSION (30 MOST RECENT EVENTS)
                df = df.reset_index(drop=True)
                df['n'] = df.groupby('session').cumcount()
                df = df.loc[df.n<30].drop('n',axis=1)
                # CREATE PAIRS
                df = df.merge(df,on='session')
                df = df.loc[ ((df.ts_x - df.ts_y).abs()< 7 * 24 * 60 * 60) & (df.aid_x != df.aid_y) ]  # 7 DAYS
                # MEMORY MANAGEMENT COMPUTE IN PARTS
                df = df.loc[(df.aid_x >= PART*SIZE)&(df.aid_x < (PART+1)*SIZE)]
                # ASSIGN WEIGHTS (LINEAR TIME WEIGHT: 1 AT RANGE START, 4 AT RANGE END)
                df = df[['session', 'aid_x', 'aid_y','type_y','ts_x']].drop_duplicates(['session', 'aid_x', 'aid_y'])
                df['wgt'] = 1 + 3*(df.ts_x - 1659304800)/(1662328791-1659304800)
                df = df[['aid_x','aid_y','wgt']]
                df.wgt = df.wgt.astype('float32')
                df = df.groupby(['aid_x','aid_y']).wgt.sum()
                # COMBINE INNER CHUNKS
                if k==a: tmp2 = df
                else: tmp2 = tmp2.add(df, fill_value=0)
            # COMBINE OUTER CHUNKS
            if a==0: tmp = tmp2
            else: tmp = tmp.add(tmp2, fill_value=0)
            del tmp2, df
            gc.collect()
        # CONVERT MATRIX TO DICTIONARY
        tmp = tmp.reset_index()
        tmp = tmp.sort_values(['aid_x','wgt'],ascending=[True,False])
        # SAVE TOP orders_th NEIGHBOURS PER aid_x
        tmp = tmp.reset_index(drop=True)
        tmp['n'] = tmp.groupby('aid_x').aid_y.cumcount()
        tmp = tmp.loc[tmp.n<orders_th].drop('n',axis=1)
        # SAVE PART TO DISK (convert to pandas first uses less memory)
        # NOTE: 'top_15' in the file name is a fixed label (presumably matched by
        # readers elsewhere). The number of neighbours actually kept is orders_th.
        tmp.to_pandas().to_parquet(f'/content/drive/MyDrive/Colab Notebooks/kaggle/OTTO/dataset/co-visitation matrix/{MODE}/top_15_orders_v{VER}_{PART}.pqt')

# carts matrix

In [7]:
# USE SMALLEST DISK_PIECES POSSIBLE WITHOUT MEMORY ERROR
DISK_PIECES = 4
SIZE = 1.86e6/DISK_PIECES  # width of the aid_x range handled per disk part

def carts_matrix(files,MODE):
    """Build the carts co-visitation matrix and save its top neighbours to disk.

    Only ``type == 1`` (carts) events are used. Within each session the 30
    most recent carts are paired with each other. Pairs less than 7 days
    apart with different aids get a weight that rises linearly with ``ts_x``,
    from 1 at ts 1659304800 to 4 at ts 1662328791. Each
    (session, aid_x, aid_y) pair counts once. Weights are summed over
    sessions, and the top ``carts_th`` aid_y per aid_x are kept.

    The aid_x space is split into ``DISK_PIECES`` parts of width ``SIZE`` to
    bound GPU memory. One parquet file is written per part.

    Args:
        files: cached file paths, read with ``read_file``.
        MODE: output sub-directory ('test' or 'val' in this notebook).

    Raises:
        ValueError: if ``files`` is empty.

    Uses notebook globals: read_file, CHUNK, READ_CT, carts_th, VER,
    DISK_PIECES, SIZE.
    """
    if not files:
        raise ValueError('carts_matrix: no input files')
    # COMPUTE IN PARTS FOR MEMORY MANAGEMENT
    for PART in range(DISK_PIECES):
        # MERGE IS FASTEST PROCESSING CHUNKS WITHIN CHUNKS
        # => OUTER CHUNKS
        for j in range(6):
            a = j*CHUNK
            b = min( (j+1)*CHUNK, len(files) )
            # With ceil-sized chunks the trailing outer chunk(s) can be empty
            # (e.g. 7 files -> CHUNK=2 -> j=4 gives a=8 > b=7). Skip them,
            # otherwise tmp2 is undefined when combining below.
            if a >= b:
                continue

            # => INNER CHUNKS
            for k in range(a,b,READ_CT):
                # READ UP TO READ_CT FILES WITHOUT CROSSING THE CHUNK END
                df = [read_file(files[k])]
                for i in range(1,READ_CT):
                    if k+i<b: df.append( read_file(files[k+i]) )
                df = cudf.concat(df,ignore_index=True,axis=0)
                df = df.loc[df['type'].isin([1])]  # ONLY CARTS
                df = df.sort_values(['session','ts'],ascending=[True,False])
                # USE TAIL OF SESSION (30 MOST RECENT EVENTS)
                df = df.reset_index(drop=True)
                df['n'] = df.groupby('session').cumcount()
                df = df.loc[df.n<30].drop('n',axis=1)
                # CREATE PAIRS
                df = df.merge(df,on='session')
                df = df.loc[ ((df.ts_x - df.ts_y).abs()< 7 * 24 * 60 * 60) & (df.aid_x != df.aid_y) ]  # 7 DAYS
                # MEMORY MANAGEMENT COMPUTE IN PARTS
                df = df.loc[(df.aid_x >= PART*SIZE)&(df.aid_x < (PART+1)*SIZE)]
                # ASSIGN WEIGHTS (LINEAR TIME WEIGHT: 1 AT RANGE START, 4 AT RANGE END)
                df = df[['session', 'aid_x', 'aid_y','type_y','ts_x']].drop_duplicates(['session', 'aid_x', 'aid_y'])
                df['wgt'] = 1 + 3*(df.ts_x - 1659304800)/(1662328791-1659304800)
                df = df[['aid_x','aid_y','wgt']]
                df.wgt = df.wgt.astype('float32')
                df = df.groupby(['aid_x','aid_y']).wgt.sum()
                # COMBINE INNER CHUNKS
                if k==a: tmp2 = df
                else: tmp2 = tmp2.add(df, fill_value=0)
            # COMBINE OUTER CHUNKS
            if a==0: tmp = tmp2
            else: tmp = tmp.add(tmp2, fill_value=0)
            del tmp2, df
            gc.collect()
        # CONVERT MATRIX TO DICTIONARY
        tmp = tmp.reset_index()
        tmp = tmp.sort_values(['aid_x','wgt'],ascending=[True,False])
        # SAVE TOP carts_th NEIGHBOURS PER aid_x
        tmp = tmp.reset_index(drop=True)
        tmp['n'] = tmp.groupby('aid_x').aid_y.cumcount()
        tmp = tmp.loc[tmp.n<carts_th].drop('n',axis=1)
        # SAVE PART TO DISK (convert to pandas first uses less memory)
        # NOTE: 'top_15' in the file name is a fixed label (presumably matched by
        # readers elsewhere). The number of neighbours actually kept is carts_th.
        tmp.to_pandas().to_parquet(f'/content/drive/MyDrive/Colab Notebooks/kaggle/OTTO/dataset/co-visitation matrix/{MODE}/top_15_carts_v{VER}_{PART}.pqt')

# clicks matrix

In [8]:
# USE SMALLEST DISK_PIECES POSSIBLE WITHOUT MEMORY ERROR
DISK_PIECES = 4
SIZE = 1.86e6/DISK_PIECES  # width of the aid_x range handled per disk part

def clicks_matrix(files,MODE):
    """Build the clicks co-visitation matrix and save its top neighbours to disk.

    Only ``type == 0`` (clicks) events are used. Within each session the 30
    most recent clicks are paired with each other. Pairs less than 7 days
    apart with different aids get a weight that rises linearly with ``ts_x``,
    from 1 at ts 1659304800 to 4 at ts 1662328791. Each
    (session, aid_x, aid_y) pair counts once. Weights are summed over
    sessions, and the top ``clicks_th`` aid_y per aid_x are kept.

    The aid_x space is split into ``DISK_PIECES`` parts of width ``SIZE`` to
    bound GPU memory. One parquet file is written per part.

    Args:
        files: cached file paths, read with ``read_file``.
        MODE: output sub-directory ('test' or 'val' in this notebook).

    Raises:
        ValueError: if ``files`` is empty.

    Uses notebook globals: read_file, CHUNK, READ_CT, clicks_th, VER,
    DISK_PIECES, SIZE.
    """
    if not files:
        raise ValueError('clicks_matrix: no input files')
    # COMPUTE IN PARTS FOR MEMORY MANAGEMENT
    for PART in range(DISK_PIECES):
        # MERGE IS FASTEST PROCESSING CHUNKS WITHIN CHUNKS
        # => OUTER CHUNKS
        for j in range(6):
            a = j*CHUNK
            b = min( (j+1)*CHUNK, len(files) )
            # With ceil-sized chunks the trailing outer chunk(s) can be empty
            # (e.g. 7 files -> CHUNK=2 -> j=4 gives a=8 > b=7). Skip them,
            # otherwise tmp2 is undefined when combining below.
            if a >= b:
                continue

            # => INNER CHUNKS
            for k in range(a,b,READ_CT):
                # READ UP TO READ_CT FILES WITHOUT CROSSING THE CHUNK END
                df = [read_file(files[k])]
                for i in range(1,READ_CT):
                    if k+i<b: df.append( read_file(files[k+i]) )
                df = cudf.concat(df,ignore_index=True,axis=0)
                df = df.loc[df['type'].isin([0])]  # ONLY CLICKS
                df = df.sort_values(['session','ts'],ascending=[True,False])
                # USE TAIL OF SESSION (30 MOST RECENT EVENTS)
                df = df.reset_index(drop=True)
                df['n'] = df.groupby('session').cumcount()
                df = df.loc[df.n<30].drop('n',axis=1)
                # CREATE PAIRS
                df = df.merge(df,on='session')
                df = df.loc[ ((df.ts_x - df.ts_y).abs()< 7 * 24 * 60 * 60) & (df.aid_x != df.aid_y) ]  # 7 DAYS
                # MEMORY MANAGEMENT COMPUTE IN PARTS
                df = df.loc[(df.aid_x >= PART*SIZE)&(df.aid_x < (PART+1)*SIZE)]
                # ASSIGN WEIGHTS (LINEAR TIME WEIGHT: 1 AT RANGE START, 4 AT RANGE END)
                df = df[['session', 'aid_x', 'aid_y','ts_x']].drop_duplicates(['session', 'aid_x', 'aid_y'])
                df['wgt'] = 1 + 3*(df.ts_x - 1659304800)/(1662328791-1659304800)
                df = df[['aid_x','aid_y','wgt']]
                df.wgt = df.wgt.astype('float32')
                df = df.groupby(['aid_x','aid_y']).wgt.sum()
                # COMBINE INNER CHUNKS
                if k==a: tmp2 = df
                else: tmp2 = tmp2.add(df, fill_value=0)
            # COMBINE OUTER CHUNKS
            if a==0: tmp = tmp2
            else: tmp = tmp.add(tmp2, fill_value=0)
            del tmp2, df
            gc.collect()
        # CONVERT MATRIX TO DICTIONARY
        tmp = tmp.reset_index()
        tmp = tmp.sort_values(['aid_x','wgt'],ascending=[True,False])
        # SAVE TOP clicks_th NEIGHBOURS PER aid_x
        tmp = tmp.reset_index(drop=True)
        tmp['n'] = tmp.groupby('aid_x').aid_y.cumcount()
        tmp = tmp.loc[tmp.n<clicks_th].drop('n',axis=1)
        # SAVE PART TO DISK (convert to pandas first uses less memory)
        # NOTE: 'top_20' in the file name is a fixed label (presumably matched by
        # readers elsewhere). The number of neighbours actually kept is clicks_th.
        tmp.to_pandas().to_parquet(f'/content/drive/MyDrive/Colab Notebooks/kaggle/OTTO/dataset/co-visitation matrix/{MODE}/top_20_clicks_v{VER}_{PART}.pqt')

# test orders

In [9]:
%%time
orders_matrix(files, MODE='test')  # orders matrix from the cached original data

CPU times: user 16.2 s, sys: 3.71 s, total: 19.9 s
Wall time: 21.6 s


# test carts

In [10]:
%%time
carts_matrix(files, MODE='test')  # carts matrix from the cached original data

CPU times: user 19.5 s, sys: 5.08 s, total: 24.6 s
Wall time: 24.8 s


# test clicks

In [11]:
%%time
clicks_matrix(files, MODE='test')  # clicks matrix from the cached original data

CPU times: user 56.3 s, sys: 14.8 s, total: 1min 11s
Wall time: 1min 45s


# read function (validation data, used for the val matrices)

In [12]:
%%time
# CACHE FUNCTIONS
def read_file(f):
    """Return the CPU-cached frame for path ``f`` as a cuDF (GPU) DataFrame."""
    return cudf.DataFrame( data_cache[f] )

def read_file_to_cache(f):
    """Read parquet file ``f`` into a compact pandas frame for the CPU cache.

    ``ts`` is divided by 1000 and cast to int32 (presumably ms -> s; TODO
    confirm the source unit). The string ``type`` column is mapped to int8
    codes via ``type_labels``.
    """
    df = pd.read_parquet(f)
    df.ts = (df.ts/1000).astype('int32')
    df['type'] = df['type'].map(type_labels).astype('int8')
    return df

# CACHE THE DATA ON CPU BEFORE PROCESSING ON GPU
# (re-binding data_cache drops the previously cached files)
data_cache = {}
type_labels = {'clicks':0, 'carts':1, 'orders':2}
# glob returns paths in arbitrary, filesystem-dependent order. Sort them so the
# chunk composition (and therefore the float summation order in the matrix
# builders) is reproducible between runs.
files = sorted(glob.glob('/content/drive/MyDrive/Colab Notebooks/kaggle/OTTO/dataset/otto-validation/*_parquet/*'))
for f in files: data_cache[f] = read_file_to_cache(f)

# CHUNK PARAMETERS
READ_CT = 5                             # files concatenated per inner step
CHUNK = int( np.ceil( len(files)/6 ))   # files per outer chunk (6 outer chunks)
print(f'We will process {len(files)} files, in groups of {READ_CT} and chunks of {CHUNK}.')

We will process 120 files, in groups of 5 and chunks of 20.
CPU times: user 23.3 s, sys: 4.21 s, total: 27.5 s
Wall time: 33.7 s


# val orders

In [13]:
%%time
orders_matrix(files, MODE='val')  # orders matrix from the cached validation data

CPU times: user 11.2 s, sys: 2.35 s, total: 13.5 s
Wall time: 14.9 s


# val carts

In [14]:
%%time
carts_matrix(files, MODE='val')  # carts matrix from the cached validation data

CPU times: user 13.8 s, sys: 3.76 s, total: 17.5 s
Wall time: 17.7 s


# val clicks

In [None]:
%%time
clicks_matrix(files, MODE='val')  # clicks matrix from the cached validation data