In [1]:
import pandas as pd, numpy as np
from tqdm.notebook import tqdm
import os, sys, pickle, glob, gc
from collections import Counter
import itertools

In [2]:
# Root folder: the *_pqt_chunks/ input directories are globbed from here and
# every co-visitation parquet file is written back into it.
data_dir = '../data/'

In [3]:
# Integer codes of the event `type` column.
type_labels = {'clicks': 0, 'carts': 1, 'orders': 2}

# Sorted so the file order (and therefore which files are read together) is
# reproducible; glob.glob does not guarantee any particular order.
files = sorted(glob.glob(os.path.join(data_dir, '*_pqt_chunks/*')))
assert files, f'no parquet chunks found under {data_dir}'

# CHUNK PARAMETERS
# Files are read READ_CT at a time (inner chunks) and accumulated into
# N_OUTER_CHUNKS outer chunks of at most CHUNK files each.
READ_CT = 5
N_OUTER_CHUNKS = 6
CHUNK = int(np.ceil(len(files) / N_OUTER_CHUNKS))
print(f'We will process {len(files)} files, in groups of {READ_CT} and chunks of {CHUNK}.')

We will process 146 files, in groups of 5 and chunks of 25.


In [4]:
# Weight added per co-visited pair, keyed by the event-type code of aid_y's event
# (codes as in type_labels: 0=clicks, 1=carts, 2=orders).
type_weight = {0: 1, 1: 6, 2: 3}

# The aid_x space is processed in DISK_PIECES contiguous slices of width SIZE;
# each slice is computed in its own pass and saved to its own file.
# USE SMALLEST DISK_PIECES POSSIBLE WITHOUT MEMORY ERROR
DISK_PIECES = 4
# Nominal upper bound on aid values, used only to size the slices.
# NOTE(review): assumes every aid is < AID_UPPER_BOUND — confirm against the data.
AID_UPPER_BOUND = 1.86e6
SIZE = AID_UPPER_BOUND / DISK_PIECES

In [5]:
# Carts/orders co-visitation. Among each session's 30 most recent events, every
# pair of distinct aids whose events are < 24h apart contributes, once per
# (session, aid_x, aid_y), type_weight[type_y]. The aid_x space is split into
# DISK_PIECES slices; the top 40 aid_y per aid_x are saved per slice.
for PART in range(DISK_PIECES):
    print()
    print('### DISK PART', PART + 1)
    # aid_x slice for this pass; the last slice is open-ended so aids at or
    # above DISK_PIECES*SIZE are not silently dropped.
    part_lo = PART * SIZE
    part_hi = (PART + 1) * SIZE if PART < DISK_PIECES - 1 else np.inf

    # MERGE IS FASTEST PROCESSING CHUNKS WITHIN CHUNKS
    # => OUTER CHUNKS (stepping by CHUNK guarantees no outer chunk is empty,
    # so covisit_chunk is always defined before it is combined)
    for a in range(0, len(files), CHUNK):
        b = min(a + CHUNK, len(files))
        print(f'Processing files {a} thru {b-1} in groups of {READ_CT}...')

        # => INNER CHUNKS
        for k in range(a, b, READ_CT):
            # READ UP TO READ_CT FILES WITHOUT CROSSING THE OUTER-CHUNK BOUNDARY
            df = pd.concat([pd.read_parquet(f) for f in files[k:min(k + READ_CT, b)]],
                           ignore_index=True, axis=0)
            df = df.sort_values(['session', 'ts'], ascending=[True, False])
            # USE TAIL OF SESSION (30 most recent events)
            df = df.reset_index(drop=True)
            df['n'] = df.groupby('session').cumcount()
            df = df.loc[df.n < 30].drop('n', axis=1)
            # CREATE PAIRS. Restricting the left side to this aid slice before the
            # self-merge yields the same rows, in the same order, as filtering
            # aid_x afterwards, without materialising pairs for other slices.
            df = df.loc[(df.aid >= part_lo) & (df.aid < part_hi)].merge(df, on='session')
            df = df.loc[((df.ts_x - df.ts_y).abs() < 24 * 60 * 60) & (df.aid_x != df.aid_y)]  # 24 HOURS
            # ASSIGN WEIGHTS: one vote per (session, aid_x, aid_y), weighted by aid_y's event type
            df = df[['session', 'aid_x', 'aid_y', 'type_y']].drop_duplicates(['session', 'aid_x', 'aid_y'])
            df['wgt'] = df.type_y.map(type_weight)
            df['wgt'] = df['wgt'].astype('float32')
            df = df.groupby(['aid_x', 'aid_y']).wgt.sum()
            # COMBINE INNER CHUNKS
            covisit_chunk = df if k == a else covisit_chunk.add(df, fill_value=0)
            print(k, ', ', end='')
        print()
        # COMBINE OUTER CHUNKS
        covisit = covisit_chunk if a == 0 else covisit.add(covisit_chunk, fill_value=0)
        del covisit_chunk, df
        gc.collect()

    # KEEP TOP 40 aid_y PER aid_x, BY DESCENDING WEIGHT
    covisit = covisit.reset_index()
    covisit = covisit.sort_values(['aid_x', 'wgt'], ascending=[True, False])
    covisit = covisit.reset_index(drop=True)
    covisit['n'] = covisit.groupby('aid_x').aid_y.cumcount()
    covisit = covisit.loc[covisit.n < 40].drop('n', axis=1)
    # NOTE(review): the file name says "top_15" but 40 aid_y per aid_x are kept.
    # The name is left unchanged because downstream readers may depend on it —
    # confirm which of the two is intended.
    covisit.to_parquet(os.path.join(data_dir, f'top_15_carts_orders_{PART}.pqt'))


### DISK PART 1
Processing files 0 thru 24 in groups of 5...
0 , 

KeyboardInterrupt: 

In [None]:
# Buy2buy co-visitation. Using only cart and order events, among each session's
# 30 most recent such events every pair of distinct aids whose events are < 14
# days apart is counted once per (session, aid_x, aid_y). Built in a single pass
# (no DISK_PIECES split); the top 40 aid_y per aid_x are saved to one file.

# => OUTER CHUNKS (stepping by CHUNK guarantees no outer chunk is empty,
# so covisit_chunk is always defined before it is combined)
for a in range(0, len(files), CHUNK):
    b = min(a + CHUNK, len(files))
    print(f'Processing files {a} thru {b-1} in groups of {READ_CT}...')

    # => INNER CHUNKS
    for k in range(a, b, READ_CT):
        # READ UP TO READ_CT FILES WITHOUT CROSSING THE OUTER-CHUNK BOUNDARY
        df = pd.concat([pd.read_parquet(f) for f in files[k:min(k + READ_CT, b)]],
                       ignore_index=True, axis=0)
        # ONLY WANT CARTS AND ORDERS
        df = df.loc[df['type'].isin([type_labels['carts'], type_labels['orders']])]
        df = df.sort_values(['session', 'ts'], ascending=[True, False])
        # USE TAIL OF SESSION (30 most recent cart/order events)
        df = df.reset_index(drop=True)
        df['n'] = df.groupby('session').cumcount()
        df = df.loc[df.n < 30].drop('n', axis=1)
        # CREATE PAIRS
        df = df.merge(df, on='session')
        df = df.loc[((df.ts_x - df.ts_y).abs() < 14 * 24 * 60 * 60) & (df.aid_x != df.aid_y)]  # 14 DAYS
        # ASSIGN WEIGHTS: every distinct (session, aid_x, aid_y) counts 1
        df = df[['session', 'aid_x', 'aid_y']].drop_duplicates()
        df['wgt'] = 1
        df['wgt'] = df['wgt'].astype('float32')
        df = df.groupby(['aid_x', 'aid_y']).wgt.sum()
        # COMBINE INNER CHUNKS
        covisit_chunk = df if k == a else covisit_chunk.add(df, fill_value=0)
        print(k, ', ', end='')
    print()
    # COMBINE OUTER CHUNKS
    covisit = covisit_chunk if a == 0 else covisit.add(covisit_chunk, fill_value=0)
    del covisit_chunk, df
    gc.collect()

# KEEP TOP 40 aid_y PER aid_x, BY DESCENDING WEIGHT
covisit = covisit.reset_index()
covisit = covisit.sort_values(['aid_x', 'wgt'], ascending=[True, False])
covisit = covisit.reset_index(drop=True)
covisit['n'] = covisit.groupby('aid_x').aid_y.cumcount()
covisit = covisit.loc[covisit.n < 40].drop('n', axis=1)
# SAVE TO DISK
covisit.to_parquet(os.path.join(data_dir, 'top_40_buy2buy.pqt'))

In [8]:
# Time-weighted clicks co-visitation. Among each session's 30 most recent events,
# every pair of distinct aids whose events are < 24h apart contributes, once per
# (session, aid_x, aid_y), a weight growing linearly from 1 (ts_x == TS_MIN) to
# 4 (ts_x == TS_MAX). The aid_x space is split into DISK_PIECES slices; the top
# 40 aid_y per aid_x are saved per slice.

# Timestamp range used to scale the time weight.
# NOTE(review): presumably the first/last `ts` in the data (unix seconds) — confirm.
TS_MIN = 1659304800
TS_MAX = 1662328791

for PART in range(DISK_PIECES):
    print()
    print('### DISK PART', PART + 1)
    # aid_x slice for this pass; the last slice is open-ended so aids at or
    # above DISK_PIECES*SIZE are not silently dropped.
    part_lo = PART * SIZE
    part_hi = (PART + 1) * SIZE if PART < DISK_PIECES - 1 else np.inf

    # => OUTER CHUNKS (stepping by CHUNK guarantees no outer chunk is empty,
    # so covisit_chunk is always defined before it is combined)
    for a in range(0, len(files), CHUNK):
        b = min(a + CHUNK, len(files))
        print(f'Processing files {a} thru {b-1} in groups of {READ_CT}...')

        # => INNER CHUNKS
        for k in range(a, b, READ_CT):
            # READ UP TO READ_CT FILES WITHOUT CROSSING THE OUTER-CHUNK BOUNDARY
            df = pd.concat([pd.read_parquet(f) for f in files[k:min(k + READ_CT, b)]],
                           ignore_index=True, axis=0)
            df = df.sort_values(['session', 'ts'], ascending=[True, False])
            # USE TAIL OF SESSION (30 most recent events)
            df = df.reset_index(drop=True)
            df['n'] = df.groupby('session').cumcount()
            df = df.loc[df.n < 30].drop('n', axis=1)
            # CREATE PAIRS. Restricting the left side to this aid slice before the
            # self-merge yields the same rows, in the same order, as filtering
            # aid_x afterwards, without materialising pairs for other slices.
            df = df.loc[(df.aid >= part_lo) & (df.aid < part_hi)].merge(df, on='session')
            df = df.loc[((df.ts_x - df.ts_y).abs() < 24 * 60 * 60) & (df.aid_x != df.aid_y)]  # 24 HOURS
            # ASSIGN WEIGHTS: keep the first row per (session, aid_x, aid_y)
            # (events were sorted newest-first above) and weight it by its ts_x.
            df = df[['session', 'aid_x', 'aid_y', 'ts_x']].drop_duplicates(['session', 'aid_x', 'aid_y'])
            df['wgt'] = 1 + 3 * (df.ts_x - TS_MIN) / (TS_MAX - TS_MIN)
            df['wgt'] = df['wgt'].astype('float32')
            df = df.groupby(['aid_x', 'aid_y']).wgt.sum()
            # COMBINE INNER CHUNKS
            covisit_chunk = df if k == a else covisit_chunk.add(df, fill_value=0)
            print(k, ', ', end='')
        print()
        # COMBINE OUTER CHUNKS
        covisit = covisit_chunk if a == 0 else covisit.add(covisit_chunk, fill_value=0)
        del covisit_chunk, df
        gc.collect()

    # KEEP TOP 40 aid_y PER aid_x, BY DESCENDING WEIGHT
    covisit = covisit.reset_index()
    covisit = covisit.sort_values(['aid_x', 'wgt'], ascending=[True, False])
    covisit = covisit.reset_index(drop=True)
    covisit['n'] = covisit.groupby('aid_x').aid_y.cumcount()
    covisit = covisit.loc[covisit.n < 40].drop('n', axis=1)
    # SAVE THIS SLICE TO DISK
    covisit.to_parquet(os.path.join(data_dir, f'top_40_clicks_{PART}.pqt'))


### DISK PART 1
Processing files 0 thru 24 in groups of 5...
0 , 5 , 10 , 15 , 20 , 
Processing files 25 thru 49 in groups of 5...
25 , 30 , 35 , 40 , 45 , 
Processing files 50 thru 74 in groups of 5...
50 , 55 , 60 , 65 , 70 , 
Processing files 75 thru 99 in groups of 5...
75 , 80 , 85 , 90 , 95 , 
Processing files 100 thru 124 in groups of 5...
100 , 105 , 110 , 115 , 120 , 
Processing files 125 thru 145 in groups of 5...
125 , 130 , 135 , 140 , 145 , 

### DISK PART 2
Processing files 0 thru 24 in groups of 5...
0 , 5 , 10 , 15 , 20 , 
Processing files 25 thru 49 in groups of 5...
25 , 30 , 35 , 40 , 45 , 
Processing files 50 thru 74 in groups of 5...
50 , 55 , 60 , 65 , 70 , 
Processing files 75 thru 99 in groups of 5...
75 , 80 , 85 , 90 , 95 , 
Processing files 100 thru 124 in groups of 5...
100 , 105 , 110 , 115 , 120 , 
Processing files 125 thru 145 in groups of 5...
125 , 130 , 135 , 140 , 145 , 

### DISK PART 3
Processing files 0 thru 24 in groups of 5...
0 , 5 , 10 , 15 , 