In [1]:
# Balance of type weighting タイプごとの重み付けバランス
# 0:clicks 1:carts 2:orders
type_weight = {0:0.5,
               1:9,
               2:0.5}
type_weight_multipliers = type_weight

# Use top X for clicks, carts and orders Top何位までを使うか
clicks_th = 60 # クリック数
carts_th  = 60 # カート数
orders_th = 60 # 購入数

VER = 7

In [2]:
import pandas as pd, numpy as np
from tqdm.notebook import tqdm
import os, sys, pickle, glob, gc
from collections import Counter
import cudf, itertools
print('We will use RAPIDS version',cudf.__version__)

We will use RAPIDS version 21.10.01


In [3]:
%%time
# CACHE FUNCTIONS
def read_file(f):
    return cudf.DataFrame( data_cache[f] )
def read_file_to_cache(f):
    df = pd.read_parquet(f)
    df.ts = (df.ts/1000).astype('int32')
    df['type'] = df['type'].map(type_labels).astype('int8')
    return df

# CACHE THE DATA ON CPU BEFORE PROCESSING ON GPU
data_cache = {}
type_labels = {'clicks':0, 'carts':1, 'orders':2}
# files = glob.glob('/kaggle/input/otto-chunk-data-inparquet-format/*_parquet/*')
files = glob.glob('/kaggle/input/otto-validation/*_parquet/*')
for f in files: data_cache[f] = read_file_to_cache(f)

# CHUNK PARAMETERS
READ_CT = 5
CHUNK = int( np.ceil( len(files)/6 ))
print(f'We will process {len(files)} files, in groups of {READ_CT} and chunks of {CHUNK}.')

We will process 120 files, in groups of 5 and chunks of 20.
CPU times: user 31.2 s, sys: 4.81 s, total: 36 s
Wall time: 41.6 s


## 1) "Carts Orders" Co-visitation Matrix - Type Weighted

In [4]:
%%time

# USE SMALLEST DISK_PIECES POSSIBLE WITHOUT MEMORY ERROR
DISK_PIECES = 4
SIZE = 1.86e6/DISK_PIECES

# COMPUTE IN PARTS FOR MEMORY MANGEMENT
for PART in range(DISK_PIECES):
    print()
    print('### DISK PART',PART+1)
    
    # MERGE IS FASTEST PROCESSING CHUNKS WITHIN CHUNKS
    # => OUTER CHUNKS
    for j in range(6):
        a = j*CHUNK
        b = min( (j+1)*CHUNK, len(files) )
        print(f'Processing files {a} thru {b-1} in groups of {READ_CT}...')
        
        # => INNER CHUNKS
        for k in range(a,b,READ_CT):
            # READ FILE
            df = [read_file(files[k])]
            for i in range(1,READ_CT): 
                if k+i<b: df.append( read_file(files[k+i]) )
            df = cudf.concat(df,ignore_index=True,axis=0)
            df = df.sort_values(['session','ts'],ascending=[True,False])
            
            # USE TAIL OF SESSION
            df = df.reset_index(drop=True)
            df['n'] = df.groupby('session').cumcount()
            df = df.loc[df.n<30].drop('n',axis=1)
            
            # CREATE PAIRS
            df = df.merge(df,on='session')
            df = df.loc[ ((df.ts_x - df.ts_y).abs()< 24 * 60 * 60) & (df.aid_x != df.aid_y) ]
            
            # MEMORY MANAGEMENT COMPUTE IN PARTS
            df = df.loc[(df.aid_x >= PART*SIZE)&(df.aid_x < (PART+1)*SIZE)]
            
            # ASSIGN WEIGHTS
            df = df[['session', 'aid_x', 'aid_y','type_y']].drop_duplicates(['session', 'aid_x', 'aid_y', 'type_y'])
            df['wgt'] = df.type_y.map(type_weight)
            df = df[['aid_x','aid_y','wgt']]
            df.wgt = df.wgt.astype('float32')
            df = df.groupby(['aid_x','aid_y']).wgt.sum()
            
            # COMBINE INNER CHUNKS
            if k==a: tmp2 = df
            else: tmp2 = tmp2.add(df, fill_value=0)
            print(k,', ',end='')
        
        print()
        
        # COMBINE OUTER CHUNKS
        if a==0: tmp = tmp2
        else: tmp = tmp.add(tmp2, fill_value=0)
        del tmp2, df
        gc.collect()

    # CONVERT MATRIX TO DICTIONARY
    tmp = tmp.reset_index()
    tmp = tmp.sort_values(['aid_x','wgt'],ascending=[True,False])
    # SAVE TOP 40
    tmp = tmp.reset_index(drop=True)
    tmp['n'] = tmp.groupby('aid_x').aid_y.cumcount()
    tmp = tmp.loc[tmp.n<carts_th].drop('n',axis=1)
    
    # SAVE PART TO DISK (convert to pandas first uses less memory)
    tmp.to_pandas().to_parquet(f'top_60_carts_orders_fortrain_v{VER}_{PART}.pqt')


### DISK PART 1
Processing files 0 thru 19 in groups of 5...


  "When using a sequence of booleans for `ascending`, "


0 , 5 , 10 , 15 , 
Processing files 20 thru 39 in groups of 5...
20 , 25 , 30 , 35 , 
Processing files 40 thru 59 in groups of 5...
40 , 45 , 50 , 55 , 
Processing files 60 thru 79 in groups of 5...
60 , 65 , 70 , 75 , 
Processing files 80 thru 99 in groups of 5...
80 , 85 , 90 , 95 , 
Processing files 100 thru 119 in groups of 5...
100 , 105 , 110 , 115 , 

### DISK PART 2
Processing files 0 thru 19 in groups of 5...
0 , 5 , 10 , 15 , 
Processing files 20 thru 39 in groups of 5...
20 , 25 , 30 , 35 , 
Processing files 40 thru 59 in groups of 5...
40 , 45 , 50 , 55 , 
Processing files 60 thru 79 in groups of 5...
60 , 65 , 70 , 75 , 
Processing files 80 thru 99 in groups of 5...
80 , 85 , 90 , 95 , 
Processing files 100 thru 119 in groups of 5...
100 , 105 , 110 , 115 , 

### DISK PART 3
Processing files 0 thru 19 in groups of 5...
0 , 5 , 10 , 15 , 
Processing files 20 thru 39 in groups of 5...
20 , 25 , 30 , 35 , 
Processing files 40 thru 59 in groups of 5...
40 , 45 , 50 , 55 , 
Pro

## 2) "Buy2Buy" Co-visitation Matrix

In [5]:
%%time
# USE SMALLEST DISK_PIECES POSSIBLE WITHOUT MEMORY ERROR
DISK_PIECES = 1
SIZE = 1.86e6/DISK_PIECES

# COMPUTE IN PARTS FOR MEMORY MANGEMENT
for PART in range(DISK_PIECES):
    print()
    print('### DISK PART',PART+1)
    
    # MERGE IS FASTEST PROCESSING CHUNKS WITHIN CHUNKS
    # => OUTER CHUNKS
    for j in range(6):
        a = j*CHUNK
        b = min( (j+1)*CHUNK, len(files) )
        print(f'Processing files {a} thru {b-1} in groups of {READ_CT}...')
        
        # => INNER CHUNKS
        for k in range(a,b,READ_CT):
            
            # READ FILE
            df = [read_file(files[k])]
            for i in range(1,READ_CT): 
                if k+i<b: df.append( read_file(files[k+i]) )
            df = cudf.concat(df,ignore_index=True,axis=0)
            df = df.loc[df['type'].isin([1,2])] # ONLY WANT CARTS AND ORDERS
            df = df.sort_values(['session','ts'],ascending=[True,False])
            
            # USE TAIL OF SESSION
            df = df.reset_index(drop=True)
            df['n'] = df.groupby('session').cumcount()
            df = df.loc[df.n<30].drop('n',axis=1)
            
            # CREATE PAIRS
            df = df.merge(df,on='session')
            df = df.loc[ ((df.ts_x - df.ts_y).abs()< 14 * 24 * 60 * 60) & (df.aid_x != df.aid_y) ] # 14 DAYS
            
            # MEMORY MANAGEMENT COMPUTE IN PARTS
            df = df.loc[(df.aid_x >= PART*SIZE)&(df.aid_x < (PART+1)*SIZE)]
            
            # ASSIGN WEIGHTS
            df = df[['session', 'aid_x', 'aid_y','type_y']].drop_duplicates(['session', 'aid_x', 'aid_y', 'type_y'])
            df['wgt'] = 1
            df = df[['aid_x','aid_y','wgt']]
            df.wgt = df.wgt.astype('float32')
            df = df.groupby(['aid_x','aid_y']).wgt.sum()
            
            # COMBINE INNER CHUNKS
            if k==a: tmp2 = df
            else: tmp2 = tmp2.add(df, fill_value=0)
            print(k,', ',end='')

        print()
        
        # COMBINE OUTER CHUNKS
        if a==0: tmp = tmp2
        else: tmp = tmp.add(tmp2, fill_value=0)
        del tmp2, df
        gc.collect()

    # CONVERT MATRIX TO DICTIONARY
    tmp = tmp.reset_index()
    tmp = tmp.sort_values(['aid_x','wgt'],ascending=[True,False])
    
    # SAVE TOP 40
    tmp = tmp.reset_index(drop=True)
    tmp['n'] = tmp.groupby('aid_x').aid_y.cumcount()
    tmp = tmp.loc[tmp.n<orders_th].drop('n',axis=1)
    
    # SAVE PART TO DISK (convert to pandas first uses less memory)
    tmp.to_pandas().to_parquet(f'top_60_buy2buy_fortrain_v{VER}_{PART}.pqt')


### DISK PART 1
Processing files 0 thru 19 in groups of 5...
0 , 5 , 

  "When using a sequence of booleans for `ascending`, "


10 , 15 , 
Processing files 20 thru 39 in groups of 5...
20 , 25 , 30 , 35 , 
Processing files 40 thru 59 in groups of 5...
40 , 45 , 50 , 55 , 
Processing files 60 thru 79 in groups of 5...
60 , 65 , 70 , 75 , 
Processing files 80 thru 99 in groups of 5...
80 , 85 , 90 , 95 , 
Processing files 100 thru 119 in groups of 5...
100 , 105 , 110 , 115 , 
CPU times: user 14.3 s, sys: 5.69 s, total: 20 s
Wall time: 20.4 s


## 3) "Clicks" Co-visitation Matrix - Time Weighted

In [6]:
%%time
# USE SMALLEST DISK_PIECES POSSIBLE WITHOUT MEMORY ERROR
DISK_PIECES = 4
SIZE = 1.86e6/DISK_PIECES

# COMPUTE IN PARTS FOR MEMORY MANGEMENT
for PART in range(DISK_PIECES):
    print()
    print('### DISK PART',PART+1)
    
    # MERGE IS FASTEST PROCESSING CHUNKS WITHIN CHUNKS
    # => OUTER CHUNKS
    for j in range(6):
        a = j*CHUNK
        b = min( (j+1)*CHUNK, len(files) )
        print(f'Processing files {a} thru {b-1} in groups of {READ_CT}...')
        
        # => INNER CHUNKS
        for k in range(a,b,READ_CT):
            # READ FILE
            df = [read_file(files[k])]
            for i in range(1,READ_CT): 
                if k+i<b: df.append( read_file(files[k+i]) )
            df = cudf.concat(df,ignore_index=True,axis=0)
            df = df.sort_values(['session','ts'],ascending=[True,False])
            
            # USE TAIL OF SESSION
            df = df.reset_index(drop=True)
            df['n'] = df.groupby('session').cumcount()
            df = df.loc[df.n<30].drop('n',axis=1)
            
            # CREATE PAIRS
            df = df.merge(df,on='session')
            df = df.loc[ ((df.ts_x - df.ts_y).abs()< 24 * 60 * 60) & (df.aid_x != df.aid_y) ]
            
            # MEMORY MANAGEMENT COMPUTE IN PARTS
            df = df.loc[(df.aid_x >= PART*SIZE)&(df.aid_x < (PART+1)*SIZE)]
            
            # ASSIGN WEIGHTS
            df = df[['session', 'aid_x', 'aid_y','ts_x']].drop_duplicates(['session', 'aid_x', 'aid_y'])
            df['wgt'] = 1 + 3*(df.ts_x - 1659304800)/(1662328791-1659304800)
            # 1659304800 : minimum timestamp
            # 1662328791 : maximum timestamp
            df = df[['aid_x','aid_y','wgt']]
            df.wgt = df.wgt.astype('float32')
            df = df.groupby(['aid_x','aid_y']).wgt.sum()
            
            # COMBINE INNER CHUNKS
            if k==a: tmp2 = df
            else: tmp2 = tmp2.add(df, fill_value=0)
            print(k,', ',end='')
        print()
        
        # COMBINE OUTER CHUNKS
        if a==0: tmp = tmp2
        else: tmp = tmp.add(tmp2, fill_value=0)
        del tmp2, df
        gc.collect()

    # CONVERT MATRIX TO DICTIONARY
    tmp = tmp.reset_index()
    tmp = tmp.sort_values(['aid_x','wgt'],ascending=[True,False])
    
    # SAVE TOP 40
    tmp = tmp.reset_index(drop=True)
    tmp['n'] = tmp.groupby('aid_x').aid_y.cumcount()
    tmp = tmp.loc[tmp.n<clicks_th].drop('n',axis=1)
    
    # SAVE PART TO DISK (convert to pandas first uses less memory)
    tmp.to_pandas().to_parquet(f'top_60_clicks_fortrain_v{VER}_{PART}.pqt')


### DISK PART 1
Processing files 0 thru 19 in groups of 5...
0 , 5 , 10 , 15 , 
Processing files 20 thru 39 in groups of 5...
20 , 25 , 30 , 35 , 
Processing files 40 thru 59 in groups of 5...
40 , 45 , 50 , 55 , 
Processing files 60 thru 79 in groups of 5...
60 , 65 , 70 , 75 , 
Processing files 80 thru 99 in groups of 5...
80 , 85 , 90 , 95 , 
Processing files 100 thru 119 in groups of 5...
100 , 105 , 110 , 115 , 

### DISK PART 2
Processing files 0 thru 19 in groups of 5...
0 , 5 , 10 , 15 , 
Processing files 20 thru 39 in groups of 5...
20 , 25 , 30 , 35 , 
Processing files 40 thru 59 in groups of 5...
40 , 45 , 50 , 55 , 
Processing files 60 thru 79 in groups of 5...
60 , 65 , 70 , 75 , 
Processing files 80 thru 99 in groups of 5...
80 , 85 , 90 , 95 , 
Processing files 100 thru 119 in groups of 5...
100 , 105 , 110 , 115 , 

### DISK PART 3
Processing files 0 thru 19 in groups of 5...
0 , 5 , 10 , 15 , 
Processing files 20 thru 39 in groups of 5...
20 , 25 , 30 , 35 , 
Processi