# Step 1 - Candidate Generation with RAPIDS
For candidate generation, we build three co-visitation matrices. The first computes the popularity of cart/order given a user's previous click/cart/order; we apply type weighting to this matrix. The second computes the popularity of cart/order given a user's previous cart/order; we call this the "buy2buy" matrix. The third computes the popularity of clicks given a user's previous click/cart/order; we apply time weighting to this matrix. We will use RAPIDS cuDF on GPU to compute these matrices quickly!

In [None]:
# Version tag embedded in the names of the parquet files this notebook writes.
VER = 6

import pandas as pd, numpy as np
from tqdm.notebook import tqdm
import os, sys, pickle, glob, gc
from collections import Counter
import cudf, itertools
print('We will use RAPIDS version',cudf.__version__)

## Compute Three Co-visitation Matrices with RAPIDS
We will compute 3 co-visitation matrices using RAPIDS cuDF on GPU. This is 30x faster than using Pandas CPU like other public notebooks! For maximum speed, set the variable `DISK_PIECES` to the smallest number possible based on the GPU you are using without incurring memory errors. If you run this code offline with 32GB of GPU RAM, then you can use `DISK_PIECES = 1` and compute each co-visitation matrix in almost 1 minute! Kaggle's GPU only has 16GB of RAM, so we use `DISK_PIECES = 4` and it takes an amazing 3 minutes each! Below are some of the tricks to speed up computation:
* Use RAPIDS cuDF GPU instead of Pandas CPU
* Read disk once and save in CPU RAM for later GPU multiple use
* Process largest amount of data possible on GPU at one time
* Merge data in two stages. Multiple small to single medium. Multiple medium to single large.
* Write result as parquet instead of dictionary

In [None]:
# Selects where the input parquet shards are read from: "local" or "kaggle".
MODE = "local" # "kaggle"

if MODE == "kaggle":
    readpath = '../raw_data/*_parquet/*'
elif MODE == "local":
    readpath = '../raw_data/deotte_radek/*_parquet/*'
else:
    # Fail with a clear message instead of a NameError on `readpath` below.
    raise ValueError(f"Unknown MODE {MODE!r}; expected 'kaggle' or 'local'.")

# glob returns paths in filesystem-dependent order; sort them so that the
# files assigned to each processing chunk are the same on every run.
files = sorted(glob.glob(readpath))

In [None]:
# CACHE FUNCTIONS
def read_file(f):
    """Return a cuDF DataFrame built from the entry cached under path ``f``."""
    cached_frame = data_cache[f]
    return cudf.DataFrame(cached_frame)
def read_file_to_cache(f):
    """Read parquet file ``f`` with pandas and compact its columns.

    ``ts`` is divided by 1000 and stored as int32 (seconds), and ``type`` is
    mapped through ``type_labels`` and stored as int8.
    """
    frame = pd.read_parquet(f)
    seconds = frame.ts / 1000  # convert to seconds
    frame['ts'] = seconds.astype('int32')
    type_codes = frame['type'].map(type_labels)  # convert to int
    frame['type'] = type_codes.astype('int8')
    return frame

# Load every input file into host (CPU) memory once; the GPU passes below
# rebuild their frames from this cache instead of re-reading the disk.
type_labels = {'clicks':0, 'carts':1, 'orders':2}
data_cache = {}
data_cache.update((path, read_file_to_cache(path)) for path in files)

# Chunking: files are split into 6 outer chunks of CHUNK files each, and each
# chunk is read in groups of READ_CT files.
READ_CT = 5
CHUNK = int(np.ceil(len(files) / 6))
print(f'We will process {len(files)} files, in groups of {READ_CT} and chunks of {CHUNK}.')

## 1) "Carts Orders" Co-visitation Matrix - Type Weighted
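- Keep CLICKS, CARTS and ORDERS events in df
- Keep only pairs whose two events are less than 1 day apart
- Within a session, drop duplicate ('aid_x', 'aid_y', 'type_y') rows
- weights: event-type weights {0:1, 1:5, 2:4}
- Keep the top 50 items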

In [None]:
# Weight of each pair, keyed by the type code of the co-visited event (type_y):
# 0 (clicks) -> 1, 1 (carts) -> 5, 2 (orders) -> 4
type_weight = {0:1, 1:5, 2:4}

# USE SMALLEST DISK_PIECES POSSIBLE WITHOUT MEMORY ERROR
DISK_PIECES = 16
# Width of the aid_x range handled by each piece
# (1.86e6 is presumably an upper bound on aid values - TODO confirm)
SIZE = 1.86e6/DISK_PIECES

# Loop over disk pieces: each pass only keeps pairs whose aid_x lies in this piece's range
for PART in range(DISK_PIECES):
    print()
    print('### DISK PART',PART+1)

    # Loop over the outer chunks of CHUNK files each
    for j in range(6):
        a = j*CHUNK
        b = min( (j+1)*CHUNK, len(files) )
        # With few files the trailing chunks are empty. Stop here: an empty chunk
        # never assigns tmp2/df, so the merge and `del` below would raise NameError.
        if a >= b: break
        print(f'Processing files {a} thru {b-1} in groups of {READ_CT}...')

        # Loop over the inner groups of up to READ_CT files
        for k in range(a,b,READ_CT):
            # Read up to READ_CT cached files and stack them into one frame
            df = [read_file(files[k])]
            for i in range(1,READ_CT): 
                if k+i<b: df.append( read_file(files[k+i]) )
            df = cudf.concat(df,ignore_index=True,axis=0)
            # Sort by session ascending, ts descending (newest event first)
            df = df.sort_values(['session','ts'],ascending=[True,False])

            # Keep only the 30 most recent events of each session
            df = df.reset_index(drop=True)
            df['n'] = df.groupby('session').cumcount() # running position within the session
            df = df.loc[df.n<30].drop('n',axis=1)

            # CREATE PAIRS
            # Self-join on session: every (event_x, event_y) pair of the same session.
            # Columns: session, aid_x, ts_x, type_x, aid_y, ts_y, type_y
            df = df.merge(df,on='session')
            # Keep pairs less than 1 day apart whose two items differ
            df = df.loc[ ((df.ts_x - df.ts_y).abs()< 24 * 60 * 60) & (df.aid_x != df.aid_y) ]

            # Keep only the pairs that belong to this disk piece (memory management)
            df = df.loc[(df.aid_x >= PART*SIZE)&(df.aid_x < (PART+1)*SIZE)]

            # Count each ('aid_x', 'aid_y', 'type_y') combination once per session
            df = df[['session', 'aid_x', 'aid_y','type_y']].drop_duplicates(['session', 'aid_x', 'aid_y', 'type_y'])
            df['wgt'] = df.type_y.map(type_weight) # 0:1, 1:5, 2:4
            df = df[['aid_x','aid_y','wgt']]
            df.wgt = df.wgt.astype('float32')

            # Sum the weights of every ('aid_x', 'aid_y') pair
            df = df.groupby(['aid_x','aid_y']).wgt.sum()

            # COMBINE INNER CHUNKS
            if k==a: tmp2 = df
            else: tmp2 = tmp2.add(df, fill_value=0)
            print(k,', ',end='')

        print()

        # COMBINE OUTER CHUNKS
        if a==0: tmp = tmp2
        else: tmp = tmp.add(tmp2, fill_value=0)
        del tmp2, df
        gc.collect()

    # CONVERT MATRIX TO A FLAT (aid_x, aid_y, wgt) TABLE
    tmp = tmp.reset_index()
    # Sort by aid_x ascending, wgt descending
    tmp = tmp.sort_values(['aid_x','wgt'],ascending=[True,False])

    # For each aid_x keep the 50 rows with the largest wgt
    tmp = tmp.reset_index(drop=True)
    tmp['n'] = tmp.groupby('aid_x').aid_y.cumcount() # position within aid_x in the sorted order
    tmp = tmp.loc[tmp.n<50].drop('n',axis=1)

    # SAVE PART TO DISK (convert to pandas first uses less memory)
    tmp.to_pandas().to_parquet(f'{MODE}_top_50_carts_orders_v{VER}_{PART}.pqt')

## 2) "Buy2Buy" Co-visitation Matrix
- Keep only CARTS and ORDERS events in df
- Keep only pairs whose two events are less than 14 days apart
- Within a session, drop duplicate ('aid_x', 'aid_y', 'type_y') rows
- weights: CART:1, ORDERS:1
- Keep 50 items

In [None]:
# USE SMALLEST DISK_PIECES POSSIBLE WITHOUT MEMORY ERROR
DISK_PIECES = 1
# Width of the aid_x range handled by each piece
# (1.86e6 is presumably an upper bound on aid values - TODO confirm)
SIZE = 1.86e6/DISK_PIECES

# Loop over disk pieces: each pass only keeps pairs whose aid_x lies in this piece's range
for PART in range(DISK_PIECES):
    print()
    print('### DISK PART',PART+1)

    # Loop over the outer chunks of CHUNK files each
    # => OUTER CHUNKS
    for j in range(6):
        a = j*CHUNK
        b = min( (j+1)*CHUNK, len(files) )
        # With few files the trailing chunks are empty. Stop here: an empty chunk
        # never assigns tmp2/df, so the merge and `del` below would raise NameError.
        if a >= b: break
        print(f'Processing files {a} thru {b-1} in groups of {READ_CT}...')

        # Loop over the inner groups of up to READ_CT files
        # => INNER CHUNKS
        for k in range(a,b,READ_CT):
            # Read up to READ_CT cached files and stack them into one frame
            df = [read_file(files[k])]
            for i in range(1,READ_CT): 
                if k+i<b: df.append( read_file(files[k+i]) )
            df = cudf.concat(df,ignore_index=True,axis=0)
            df = df.loc[df['type'].isin([1,2])] # keep only CARTS and ORDERS
            # Sort by session ascending, ts descending (newest event first)
            df = df.sort_values(['session','ts'],ascending=[True,False])

            df = df.reset_index(drop=True)
            # Keep only the 30 most recent events of each session
            df['n'] = df.groupby('session').cumcount()
            df = df.loc[df.n<30].drop('n',axis=1)

            # CREATE PAIRS
            # Self-join on session: every (event_x, event_y) pair of the same session.
            # Columns: session, aid_x, ts_x, type_x, aid_y, ts_y, type_y
            df = df.merge(df,on='session')
            # Keep pairs less than 14 days apart whose two items differ
            df = df.loc[ ((df.ts_x - df.ts_y).abs()< 14 * 24 * 60 * 60) & (df.aid_x != df.aid_y) ] # 14 DAYS

            # Keep only the pairs that belong to this disk piece
            df = df.loc[(df.aid_x >= PART*SIZE)&(df.aid_x < (PART+1)*SIZE)]

            # Count each ('aid_x', 'aid_y', 'type_y') combination once per session
            df = df[['session', 'aid_x', 'aid_y','type_y']].drop_duplicates(['session', 'aid_x', 'aid_y', 'type_y'])
            df['wgt'] = 1 # carts and orders carry the same weight
            df = df[['aid_x','aid_y','wgt']]
            df.wgt = df.wgt.astype('float32')
            # Sum the weights of every ('aid_x', 'aid_y') pair
            df = df.groupby(['aid_x','aid_y']).wgt.sum()

            # COMBINE INNER CHUNKS
            if k==a: tmp2 = df
            else: tmp2 = tmp2.add(df, fill_value=0)
            print(k,', ',end='')

        print()

        # COMBINE OUTER CHUNKS
        if a==0: tmp = tmp2
        else: tmp = tmp.add(tmp2, fill_value=0)
        del tmp2, df
        gc.collect()

    # CONVERT MATRIX TO A FLAT (aid_x, aid_y, wgt) TABLE
    tmp = tmp.reset_index()
    # Sort by aid_x ascending, wgt descending
    tmp = tmp.sort_values(['aid_x','wgt'],ascending=[True,False])

    # For each aid_x keep the 50 rows with the largest wgt
    tmp = tmp.reset_index(drop=True)
    tmp['n'] = tmp.groupby('aid_x').aid_y.cumcount() # position within aid_x in the sorted order
    tmp = tmp.loc[tmp.n<50].drop('n',axis=1)

    # SAVE PART TO DISK (convert to pandas first uses less memory)
    tmp.to_pandas().to_parquet(f'{MODE}_top_50_buy2buy_v{VER}_{PART}.pqt')

## 3) "Clicks" Co-visitation Matrix - Time Weighted
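- Keep CLICKS, CARTS and ORDERS events in df
- Keep only pairs whose two events are less than 1 day apart
- Within a session, drop duplicate ('aid_x', 'aid_y') rows
- weights: the larger the timestamp, the larger the weight
- Keep 50 items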

In [None]:
# USE SMALLEST DISK_PIECES POSSIBLE WITHOUT MEMORY ERROR
DISK_PIECES = 4
# Width of the aid_x range handled by each piece
# (1.86e6 is presumably an upper bound on aid values - TODO confirm)
SIZE = 1.86e6/DISK_PIECES

# Loop over disk pieces: each pass only keeps pairs whose aid_x lies in this piece's range
for PART in range(DISK_PIECES):
    print()
    print('### DISK PART',PART+1)

    # Loop over the outer chunks of CHUNK files each
    # => OUTER CHUNKS
    for j in range(6):
        a = j*CHUNK
        b = min( (j+1)*CHUNK, len(files) )
        # With few files the trailing chunks are empty. Stop here: an empty chunk
        # never assigns tmp2/df, so the merge and `del` below would raise NameError.
        if a >= b: break
        print(f'Processing files {a} thru {b-1} in groups of {READ_CT}...')

        # Loop over the inner groups of up to READ_CT files
        # => INNER CHUNKS
        for k in range(a,b,READ_CT):
            # Read up to READ_CT cached files and stack them into one frame
            df = [read_file(files[k])]
            for i in range(1,READ_CT): 
                if k+i<b: df.append( read_file(files[k+i]) )
            df = cudf.concat(df,ignore_index=True,axis=0)
            # Sort by session ascending, ts descending (newest event first)
            df = df.sort_values(['session','ts'],ascending=[True,False])

            df = df.reset_index(drop=True)
            # Keep only the 30 most recent events of each session
            df['n'] = df.groupby('session').cumcount()
            df = df.loc[df.n<30].drop('n',axis=1)

            # CREATE PAIRS
            # Self-join on session: every (event_x, event_y) pair of the same session.
            # Columns: session, aid_x, ts_x, type_x, aid_y, ts_y, type_y
            df = df.merge(df,on='session')
            # Keep pairs less than 1 day apart whose two items differ
            df = df.loc[ ((df.ts_x - df.ts_y).abs()< 24 * 60 * 60) & (df.aid_x != df.aid_y) ]

            # Keep only the pairs that belong to this disk piece (memory management)
            df = df.loc[(df.aid_x >= PART*SIZE)&(df.aid_x < (PART+1)*SIZE)]

            # Count each ('aid_x', 'aid_y') pair once per session
            df = df[['session', 'aid_x', 'aid_y','ts_x']].drop_duplicates(['session', 'aid_x', 'aid_y'])
            # Time weighting: wgt grows linearly with ts_x, from 1 at the min ts to 4 at the max ts
            df['wgt'] = 1 + 3*(df.ts_x - 1659304800)/(1662328791-1659304800)
            # 1659304800 : min ts
            # 1662328791 : max ts
            df = df[['aid_x','aid_y','wgt']]
            df.wgt = df.wgt.astype('float32')
            # Sum the weights of every ('aid_x', 'aid_y') pair
            df = df.groupby(['aid_x','aid_y']).wgt.sum()

            # COMBINE INNER CHUNKS
            if k==a: tmp2 = df
            else: tmp2 = tmp2.add(df, fill_value=0)
            print(k,', ',end='')
        print()

        # COMBINE OUTER CHUNKS
        if a==0: tmp = tmp2
        else: tmp = tmp.add(tmp2, fill_value=0)
        del tmp2, df
        gc.collect()

    # CONVERT MATRIX TO A FLAT (aid_x, aid_y, wgt) TABLE
    tmp = tmp.reset_index()
    # Sort by aid_x ascending, wgt descending
    tmp = tmp.sort_values(['aid_x','wgt'],ascending=[True,False])

    # For each aid_x keep the 50 rows with the largest wgt
    tmp = tmp.reset_index(drop=True)
    tmp['n'] = tmp.groupby('aid_x').aid_y.cumcount() # position within aid_x in the sorted order
    tmp = tmp.loc[tmp.n<50].drop('n',axis=1)

    # SAVE PART TO DISK (convert to pandas first uses less memory)
    tmp.to_pandas().to_parquet(f'{MODE}_top_50_clicks_v{VER}_{PART}.pqt')

In [None]:
# Drop the CPU-side file cache and the last co-visitation result, then
# run the garbage collector (its return value is discarded).
del data_cache
del tmp
_ = gc.collect()