In [31]:
# Reload edited project modules automatically before each cell executes.
%load_ext autoreload
%autoreload 2

In [32]:

import numpy as np
import pandas as pd
import datetime
from ast import literal_eval

from db_connectors.load import bigquery_reader
from db_connectors.postprocessing import bigquery_writer

# from notebooks.env import *
from notebooks.env_prod import *


In [33]:
# Display the BigQuery project id and service-account key path in use.
# Neither name is defined in a visible cell; presumably both come from the
# star import of notebooks.env_prod -- TODO confirm.
# NOTE(review): the rendered output exposes a local key-file path; clear it before sharing.
bq_project_id, json_key_path

('mikvpc-prod-service-com',
 '/Users/LINGYU1/work/repo/recommender/sa-rec-dataproc-2c-prd.json')

In [34]:
# Force a garbage-collection pass; the displayed number is gc.collect()'s return value.
import gc
gc.collect()

1326

# Data

In [35]:
def get_sale_view_data(days, top_user_num):
    """Load recent MIK sales joined with item taxonomy and user view behaviour.

    The query has three stages:

    * ``cte1`` ranks users by number of ``mik_sales`` rows with
      ``data_source = "MIK"`` in the last ``days`` days and keeps the top
      ``top_user_num`` of them.
    * ``cte2`` returns those users' sales rows, left-joined to ``mik_item`` for
      the category path, and attaches two window arrays per row: the SKUs and
      the category paths of all *preceding* rows of the same user (ordered by
      ``trans_date, created_time``).
    * The final select left-joins ``cte2`` to ``user_behavior`` on the user id
      (cast to string).

    After loading, rows whose ``user_id_1`` is null are dropped and a
    ``sku_view_sequence`` column is derived from ``user_behavior``. A short
    summary is printed as a side effect.

    Parameters
    ----------
    days
        Look-back window in days; interpolated directly into the SQL text.
    top_user_num
        Number of users kept by ``cte1``; interpolated directly into the SQL text.

    Returns
    -------
    The filtered frame returned by ``bigquery_reader`` with the extra
    ``sku_view_sequence`` column.

    Notes
    -----
    Reads the notebook-level names ``bq_project_id`` and ``json_key_path``.
    NOTE(review): both arguments are formatted into the query unescaped, so
    they should only ever be trusted integers.
    """
    query = f"""

    -- combine mik_sales with view data
    -- just pick top 200 user first
    WITH cte1 AS (
      SELECT
        user_id,
        COUNT(trans_date) AS num_trans
      FROM `Data_Infra_Eng.mik_sales`
      WHERE data_source = "MIK"
        AND DATE_DIFF(CURRENT_DATE(), trans_date, DAY) < {days}
      GROUP BY user_id
      ORDER BY num_trans DESC
      LIMIT {top_user_num}
    ), cte2 AS (

      SELECT 
        t1.user_id, 
        t1.sku_number, 
        t1.qty, 
        t1.trans_date,
        t1.created_time,
        t1.data_source,
        t2.full_taxonomy_path as category_path,
        ARRAY_AGG(t1.sku_number) --IFNULL(t2.sku_number, "na")
          OVER (
            PARTITION BY t1.user_id 
            ORDER BY t1.trans_date, t1.created_time ASC
            ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING
          ) AS sku_purchase_seq,
        ARRAY_AGG(IFNULL(t2.full_taxonomy_path, "na")) 
          OVER (
            PARTITION BY t1.user_id 
            ORDER BY t1.trans_date, t1.created_time ASC
            ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING
          ) AS category_path_purchase_seq
      FROM `Data_Infra_Eng.mik_sales` t1
      LEFT JOIN `Data_Infra_Eng.mik_item` t2
        ON t1.sku_number = t2.sku_number
      WHERE data_source = "MIK"
        AND DATE_DIFF(CURRENT_DATE(), trans_date, DAY) < {days} 
        AND t1.user_id IN (SELECT user_id FROM cte1)
      ORDER BY user_id, trans_date, created_time ASC
    )
    SELECT
      *
    FROM cte2 t1
    LEFT JOIN `Data_Infra_Eng.user_behavior` t2
      ON CAST(t1.user_id AS STRING) = t2.user_id
    ;
    """
    # Execute the query with the notebook-level project id / credentials.
    df = bigquery_reader(
            project_id=bq_project_id, json_credentials_path=json_key_path,
            query_string=query
        )
    # Drop rows with no user_id_1 value. Presumably user_id_1 is the renamed
    # user_behavior.user_id from the SELECT * join, i.e. this keeps only users
    # that have view data -- TODO confirm the column naming.
    df = df[df['user_id_1'].notna()].reset_index(drop=True)
    # Reduce each row's list of behaviour records to an array of their 'item' values.
    df['sku_view_sequence'] = df['user_behavior'].apply(lambda x: np.array([y['item'] for y in x]))
    # Quick sanity summary of what was loaded.
    print(f"""
            df shape: {df.shape}, user number: {df['user_id'].nunique()}, 
            avg trans per user: {df.groupby('user_id').size().mean()}, 
            item number: {df['sku_number'].nunique()}
            """)
    return df

In [6]:
# Ad-hoc copy of the query built inside get_sale_view_data, with a 35-day
# window and the user limit hard-coded to 200.
# NOTE(review): this duplicates the function's SQL; keep the two in sync or
# drop this cell in favour of calling get_sale_view_data.
days = 35
query = f"""

-- combine mik_sales with view data
-- just pick top 200 user first
WITH cte1 AS (
  SELECT
    user_id,
    COUNT(trans_date) AS num_trans
  FROM `Data_Infra_Eng.mik_sales`
  WHERE data_source = "MIK"
    AND DATE_DIFF(CURRENT_DATE(), trans_date, DAY) < {days}
  GROUP BY user_id
  ORDER BY num_trans DESC
  LIMIT 200
), cte2 AS (

  SELECT 
    t1.user_id, 
    t1.sku_number, 
    t1.qty, 
    t1.trans_date,
    t1.created_time,
    t1.data_source,
    t2.full_taxonomy_path as category_path,
    ARRAY_AGG(t1.sku_number) --IFNULL(t2.sku_number, "na")
      OVER (
        PARTITION BY t1.user_id 
        ORDER BY t1.trans_date, t1.created_time ASC
        ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING
      ) AS sku_purchase_seq,
    ARRAY_AGG(IFNULL(t2.full_taxonomy_path, "na")) 
      OVER (
        PARTITION BY t1.user_id 
        ORDER BY t1.trans_date, t1.created_time ASC
        ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING
      ) AS category_path_purchase_seq
  FROM `Data_Infra_Eng.mik_sales` t1
  LEFT JOIN `Data_Infra_Eng.mik_item` t2
    ON t1.sku_number = t2.sku_number
  WHERE data_source = "MIK"
    AND DATE_DIFF(CURRENT_DATE(), trans_date, DAY) < {days} 
    AND t1.user_id IN (SELECT user_id FROM cte1)
  ORDER BY user_id, trans_date, created_time ASC
)
SELECT
  *
FROM cte2 t1
LEFT JOIN `Data_Infra_Eng.user_behavior` t2
  ON CAST(t1.user_id AS STRING) = t2.user_id
;
"""

In [None]:
# Run the ad-hoc `query` string against BigQuery.
df = bigquery_reader(
        project_id=bq_project_id, json_credentials_path=json_key_path,
        query_string=query
    )
# Keep only rows where user_id_1 is populated (presumably the user_behavior
# side of the join -- TODO confirm), then renumber the index.
df = df[df['user_id_1'].notna()].reset_index(drop=True)
# Reduce each row's list of behaviour records to an array of their 'item' values.
df['sku_view_sequence'] = df['user_behavior'].apply(lambda x: np.array([y['item'] for y in x]))

In [29]:
# Inspect the joined frame.
# NOTE(review): the rendered table contains real user ids; prefer df.head() and clear outputs before sharing.
df

Unnamed: 0,user_id,sku_number,qty,trans_date,created_time,data_source,category_path,sku_purchase_seq,category_path_purchase_seq,user_id_1,user_behavior,sku_view_sequence
0,59943339,10363580,1,2023-02-25,2023-02-25 01:59:22.574,MIK,root//Shop Categories//Art Supplies//Paint & P...,[],[],59943339,"[{'behavior': 'item_view', 'item': '10473166',...","[10473166, MP472538, 10542383, 10065111, 10622..."
1,59943339,10363580,1,2023-02-25,2023-02-25 01:59:22.574,MIK,root//Shop Categories//Art Supplies//Paint & P...,[],[],59943339,"[{'behavior': 'item_view', 'item': '10552981',...","[10552981, 170232377470697496, D045628S, 10648..."
2,59943339,10363580,1,2023-02-25,2023-02-25 01:59:22.574,MIK,root//Shop Categories//Art Supplies//Paint & P...,[],[],59943339,"[{'behavior': 'item_view', 'item': '10633516',...",[10633516]
3,59943339,10363582,1,2023-02-25,2023-02-25 01:59:22.574,MIK,root//Shop Categories//Art Supplies//Paint & P...,[10363580],[root//Shop Categories//Art Supplies//Paint & ...,59943339,"[{'behavior': 'item_view', 'item': '10473166',...","[10473166, MP472538, 10542383, 10065111, 10622..."
4,59943339,10363582,1,2023-02-25,2023-02-25 01:59:22.574,MIK,root//Shop Categories//Art Supplies//Paint & P...,[10363580],[root//Shop Categories//Art Supplies//Paint & ...,59943339,"[{'behavior': 'item_view', 'item': '10552981',...","[10552981, 170232377470697496, D045628S, 10648..."
...,...,...,...,...,...,...,...,...,...,...,...,...
67619,8070610866200922400,D671240S,1,2023-03-05,2023-03-05 03:34:51.792,MIK,root//Shop Categories//Baking & Kitchen//Bakew...,"[10707677, 10707678, 10466356, 10706638, 10671...",[root//Shop Categories//Holidays & Occasions//...,8070610866200922400,"[{'behavior': 'item_view', 'item': '1089560036...","[108956003695296517, D534830S, 158620787052920..."
67620,8070610866200922400,D671240S,1,2023-03-05,2023-03-05 03:34:51.792,MIK,root//Shop Categories//Baking & Kitchen//Bakew...,"[10707677, 10707678, 10466356, 10706638, 10671...",[root//Shop Categories//Holidays & Occasions//...,8070610866200922400,"[{'behavior': 'item_view', 'item': '10707973',...","[10707973, 10707688, 10707677, 10706778, 10360..."
67621,8070610866200922400,D671240S,1,2023-03-05,2023-03-05 03:34:51.792,MIK,root//Shop Categories//Baking & Kitchen//Bakew...,"[10707677, 10707678, 10466356, 10706638, 10671...",[root//Shop Categories//Holidays & Occasions//...,8070610866200922400,"[{'behavior': 'item_view', 'item': '10706430',...","[10706430, D611263S, 10706638, 10627361]"
67622,8070610866200922400,D671240S,1,2023-03-05,2023-03-05 03:34:51.792,MIK,root//Shop Categories//Baking & Kitchen//Bakew...,"[10707677, 10707678, 10466356, 10706638, 10671...",[root//Shop Categories//Holidays & Occasions//...,8070610866200922400,"[{'behavior': 'item_view', 'item': 'D651817S',...","[D651817S, 171186061250314253, D651573S, D5113..."


In [9]:
# Load through the function instead of the ad-hoc query: 35-day window, top 200 users.
df = get_sale_view_data(35, 200)

In [10]:
# Summary tuple: frame shape, distinct users, mean rows per user, distinct SKUs.
df.shape, df['user_id'].nunique(), df.groupby('user_id').size().mean(), df['sku_number'].nunique()

((67624, 12), 179, 377.7877094972067, 8183)

In [73]:
# Snapshot the raw joined frame to disk.
# NOTE(review): absolute, user-specific path; only unpickle this file from a trusted source.
import pickle
with open("/Users/LINGYU1/work/localspace/data/mik_dnn_model_02222023/df_sales_raw_20230310.pickle", "wb") as f: 
    pickle.dump(df, f) 

In [46]:
# Export the modelling columns to CSV (no index column).
# NOTE(review): the three sequence columns hold arrays, which CSV can only
# store as text -- confirm that downstream readers parse them back correctly.
df[['user_id', 'sku_number', 'trans_date', 'created_time',
    'category_path', 
    'sku_purchase_seq', 'category_path_purchase_seq', 'sku_view_sequence']]\
    .to_csv("/Users/LINGYU1/work/localspace/data/mik_dnn_model_02222023/mik_sales_views_20230227.csv", index=False)

In [7]:
# Load a training set from an earlier experiment, presumably as a reference
# for the target input layout -- TODO confirm.
# NOTE(review): absolute, user-specific path.
df_sp = pd.read_csv("/Users/LINGYU1/work/localspace/data/model_comparison_10172022/train_set_28_7.csv")

In [8]:
# Preview the first rows of the reference training set.
df_sp.head()

Unnamed: 0.1,Unnamed: 0,user_id,item_id,label,hist_item_id,seq_len,hist_genres,genres
0,0,88165,12388,1,"[1716, 1633, 40349, 40317, 40351, 40091, 1634,...",8,"[1456, 1471, 1359, 1463, 1359, 1468, 1471, 1359]",1469
1,1,200652,25138,0,[4050],1,[295],891
2,2,226591,15856,1,[9620],1,[325],323
3,3,156799,3923,1,[3924],1,[877],877
4,4,39784,7190,1,"[17880, 7662, 22924, 34779, 10925, 22918, 6894...",10,"[1398, 1404, 421, 1265, 1444, 421, 1367, 1398,...",1486


# Negative Sampling

In [9]:
# Free memory before the sampling step; the displayed number is gc.collect()'s return value.
import gc
gc.collect()

155

In [10]:
# Notebook-level settings for negative sampling.
# NOTE(review): the gen_input_data* functions rebuild candidate_sku themselves
# and their calls pass the literals 1 and 50, so no visible cell reads these names.
candidate_sku = df[['sku_number','category_path']].drop_duplicates()
negsample_ratio = 1
SEQ_LEN = 50
# neg_list = np.random.choice(candidate_set, size=df.shape[0] * negsample_ratio, replace=True)


In [14]:
def create_negative_sample(data, candidate_item, negsample_ratio, random_state=None):
    """Return the positive rows of ``data`` plus randomly sampled negative rows.

    Every row of ``data`` is kept as a positive example (``label == 1``). For
    each positive row, ``negsample_ratio`` negative rows (``label == 0``) are
    created by copying the row and overwriting ``sku_number`` and
    ``category_path`` with an item drawn from the rows of ``candidate_item``
    whose ``sku_number`` does not occur anywhere in ``data``.

    Parameters
    ----------
    data : pandas.DataFrame
        Positive interactions; must contain a ``sku_number`` column. Not modified.
    candidate_item : pandas.DataFrame
        Item pool with ``sku_number`` and ``category_path`` columns.
    negsample_ratio : int
        Number of negatives generated per positive row.
    random_state : optional
        Forwarded to ``DataFrame.sample`` for reproducible draws.

    Returns
    -------
    pandas.DataFrame
        Positives followed by negatives, with a fresh ``0..n-1`` index.

    Raises
    ------
    ValueError
        If negatives are requested but every candidate SKU already occurs in ``data``.
    """
    # get positive sample data
    df_pos = data.copy()
    df_pos['label'] = 1

    n_neg = df_pos.shape[0] * negsample_ratio
    if n_neg == 0:
        # nothing to sample (no rows, or a ratio of 0): positives only
        return df_pos.reset_index(drop=True)

    # create negative data: one copy of the positives per requested negative
    df_neg = pd.concat([df_pos] * negsample_ratio, ignore_index=True)
    df_neg['label'] = 0

    # negative sampling: only items that do not appear in `data`
    pool = candidate_item[~candidate_item['sku_number'].isin(df_pos['sku_number'])]
    if pool.empty:
        raise ValueError("no candidate items left for negative sampling")
    # Sample without replacement when the pool is large enough (original
    # behaviour); otherwise fall back to replacement instead of raising.
    neg_sku = pool.sample(n=n_neg, replace=len(pool) < n_neg, random_state=random_state)
    df_neg['sku_number'] = neg_sku['sku_number'].values
    df_neg['category_path'] = neg_sku['category_path'].values

    return pd.concat([df_pos, df_neg], axis=0).reset_index(drop=True)

In [15]:
def gen_input_data(df, negsample_ratio, seq_len):
    """Build the labelled model input from the joined sales/view frame.

    For every ``user_id`` group the three sequence columns are cut to at most
    ``seq_len`` entries, then ``create_negative_sample`` adds
    ``negsample_ratio`` negative rows per positive row, drawn from the distinct
    ``(sku_number, category_path)`` pairs of the whole frame.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain every returned column except ``seq_len`` and ``label``.
        Not modified.
    negsample_ratio : int
        Negatives generated per positive row.
    seq_len : int
        Maximum number of entries kept per sequence.

    Returns
    -------
    pandas.DataFrame
        Columns ``user_id, sku_number, category_path, trans_date, created_time,
        sku_purchase_seq, category_path_purchase_seq, sku_view_sequence,
        seq_len, label`` with a fresh ``0..n-1`` index. Empty (same columns)
        when ``df`` has no user groups.
    """
    seq_cols = ['sku_purchase_seq', 'category_path_purchase_seq', 'sku_view_sequence']
    out_cols = [
        'user_id', 'sku_number','category_path', 'trans_date', 'created_time',
        'sku_purchase_seq','category_path_purchase_seq','sku_view_sequence', 'seq_len',
        'label'
    ]
    # get candidate items and categories
    candidate_sku = df[['sku_number','category_path']].drop_duplicates()

    # negative sampling for each user group
    df_res = []
    for _, data in df.groupby('user_id'):
        # work on an explicit copy so the column assignments below never
        # target a slice of `df` (avoids chained-assignment warnings)
        data = data.copy()
        # first refine sequence len to <= seq_len
        # NOTE(review): x[0:seq_len] keeps the FIRST entries. The purchase
        # sequences are built in ascending time order by the SQL query, so this
        # keeps the oldest purchases -- confirm that x[-seq_len:] is not wanted.
        for col in seq_cols:
            data[col] = data[col].apply(lambda x: x[0:seq_len])
        # create negative sample and combine with positive sample
        df_res.append(
            create_negative_sample(data=data, candidate_item=candidate_sku, negsample_ratio=negsample_ratio)
        )

    if not df_res:
        # no user groups: return an empty frame with the expected columns
        # instead of letting pd.concat([]) raise
        return df.head(0).assign(seq_len=seq_len, label=1)[out_cols]

    df_res = pd.concat(df_res).reset_index(drop=True)
    # NOTE(review): this stores the configured maximum, not each row's actual
    # sequence length -- confirm which one the model expects.
    df_res['seq_len'] = seq_len
    return df_res[out_cols]

In [21]:
# Build the model input serially: 1 negative per positive, sequences cut to 50 entries.
df_input = gen_input_data(df, 1, 50)
df_input.shape

(135248, 10)

In [17]:
import multiprocessing as mp

def process_chunk(chunk, candidate_sku, negsample_ratio, seq_len):
    """Worker for ``gen_input_data_parallel``: sample negatives for one chunk.

    Groups ``chunk`` by ``user_id``, cuts the three sequence columns to at most
    ``seq_len`` entries and calls ``create_negative_sample`` per user.

    Parameters
    ----------
    chunk : pandas.DataFrame
        Subset of the joined sales/view frame. Not modified.
    candidate_sku : pandas.DataFrame
        Item pool with ``sku_number`` and ``category_path`` columns.
    negsample_ratio : int
        Negatives generated per positive row.
    seq_len : int
        Maximum number of entries kept per sequence; also written to ``seq_len``.

    Returns
    -------
    pandas.DataFrame
        All columns of ``chunk`` plus ``label`` and ``seq_len``. The index is
        not reset. Empty (same columns) when ``chunk`` has no user groups.
    """
    # negative sampling for each user group
    df_res = []
    for _, data in chunk.groupby('user_id'):
        # explicit copy: never assign into a slice of `chunk`
        data = data.copy()
        # first refine sequence len to <= seq_len
        for col in ['sku_purchase_seq','category_path_purchase_seq','sku_view_sequence']:
            data[col] = data[col].apply(lambda x: x[0:seq_len])
        # create negative sample and combine with positive sample
        df_res.append(
            create_negative_sample(data=data, candidate_item=candidate_sku, negsample_ratio=negsample_ratio)
        )

    if not df_res:
        # an empty chunk would otherwise make pd.concat([]) raise ValueError
        return chunk.head(0).assign(label=1, seq_len=seq_len)

    df_res = pd.concat(df_res)
    df_res['seq_len'] = seq_len
    return df_res

def gen_input_data_parallel(df, negsample_ratio, seq_len, num_processes=mp.cpu_count()):
    """Multi-process variant of ``gen_input_data``.

    The frame is partitioned by **user** so that all rows of a user land in
    the same chunk. ``create_negative_sample`` excludes only the SKUs present
    in the rows it is given, so a row-wise split could hand it part of a
    user's history and let it pick an item that user bought in another chunk.

    Parameters
    ----------
    df : pandas.DataFrame
        Joined sales/view frame; must contain every returned column except
        ``seq_len`` and ``label``.
    negsample_ratio : int
        Negatives generated per positive row.
    seq_len : int
        Maximum number of entries kept per sequence.
    num_processes : int
        Upper bound on worker processes; never more than the number of users.
        The default is evaluated once, when the function is defined.

    Returns
    -------
    pandas.DataFrame
        Same columns as ``gen_input_data`` with a fresh ``0..n-1`` index.

    Notes
    -----
    NOTE(review): under the ``spawn`` start method (the default on macOS and
    Windows) workers must be able to import ``process_chunk``. Functions
    defined in a notebook cell may not be reachable that way -- consider moving
    the worker into a module if this call hangs or fails.
    """
    out_cols = [
        'user_id', 'sku_number','category_path', 'trans_date', 'created_time',
        'sku_purchase_seq','category_path_purchase_seq','sku_view_sequence', 'seq_len',
        'label'
    ]
    # get candidate items and categories
    candidate_sku = df[['sku_number','category_path']].drop_duplicates()

    # Split the users, not the rows. groupby ignores missing user ids, so
    # they are dropped here as well.
    user_ids = np.asarray(df['user_id'].dropna().unique())
    # never create more chunks than users, so no chunk is empty
    n_chunks = max(1, min(num_processes, len(user_ids)))
    chunks = [
        df[df['user_id'].isin(ids)]
        for ids in np.array_split(user_ids, n_chunks)
    ]

    # create pool of processes
    with mp.Pool(processes=n_chunks) as pool:
        # process each chunk using a separate process
        results = [
            pool.apply_async(process_chunk, args=(chunk, candidate_sku, negsample_ratio, seq_len))
            for chunk in chunks
        ]
        # collect the results from all processes
        df_res = pd.concat([result.get() for result in results]).reset_index(drop=True)

    return df_res[out_cols]


In [None]:
# Same build as the serial call, using the multi-process variant.
# NOTE(review): this cell has no recorded execution count, so no completed run was saved.
df_input = gen_input_data_parallel(df, 1, 50)
df_input.shape

In [14]:
# Keep an un-encoded backup so the encoding cells can be re-run from a clean state.
df_input_copy = df_input.copy()

In [15]:
# Restore df_input from the backup before (re-)encoding.
df_input = df_input_copy.copy()

# Label Encoding

In [16]:
# List the input columns that are available for encoding.
df_input.columns

Index(['user_id', 'sku_number', 'category_path', 'trans_date', 'created_time',
       'sku_purchase_seq', 'category_path_purchase_seq', 'sku_view_sequence',
       'seq_len', 'label'],
      dtype='object')

In [24]:
from sklearn.preprocessing import LabelEncoder

In [26]:
def label_transform(lbe, x):
    """Encode a sequence with ``lbe`` and shift the codes by one.

    Parameters
    ----------
    lbe
        Fitted encoder exposing ``transform`` (a ``LabelEncoder`` in this notebook).
    x
        Sequence of raw labels.

    Returns
    -------
    numpy.ndarray
        ``lbe.transform(x) + 1``. If the encoder raises (for example because
        ``x`` holds a label it was not fitted on), an empty array is returned
        and the whole sequence is dropped.
    """
    try:
        return lbe.transform(x) + 1 # add one to all the encoded categories labels
    except Exception:
        # Best-effort fallback for encoding failures only; KeyboardInterrupt
        # and SystemExit still propagate so a long .apply() can be interrupted.
        return np.array([])
    
def encode_features(df_input):
    """Label-encode the id columns and sequence columns of ``df_input`` in place.

    The raw ``sku_number`` and ``user_id`` values are first copied to
    ``sku_number_org`` / ``user_id_org``. SKU codes come from an encoder fitted
    on every SKU seen in ``sku_number`` or ``sku_view_sequence`` plus ``'na'``;
    category codes come from an encoder fitted on the distinct
    ``category_path`` values plus ``'na'``; user codes come from an encoder
    fitted on ``user_id`` alone. All codes are shifted by one.

    Returns the same (mutated) frame and a dict mapping each of
    ``sku_number``, ``category_path`` and ``user_id`` to ``max code + 1``.
    """
    # remember the raw identifiers before they are overwritten
    for raw_col in ('sku_number', 'user_id'):
        df_input[raw_col + '_org'] = df_input[raw_col]

    # vocabularies: items from both sales and views, plus the 'na' filler
    sold_skus = df_input['sku_number'].values
    viewed_skus = df_input['sku_view_sequence'].explode().values
    item_vocab = np.append(np.unique(np.concatenate((sold_skus, viewed_skus))), 'na')
    cat_vocab = np.append(df_input['category_path'].unique(), 'na')

    # scalar id columns (codes start at 1)
    lbe_sku = LabelEncoder()
    lbe_sku.fit(item_vocab)
    df_input['sku_number'] = lbe_sku.transform(df_input['sku_number']) + 1

    lbe_cat = LabelEncoder()
    lbe_cat.fit(cat_vocab)
    df_input['category_path'] = lbe_cat.transform(df_input['category_path']) + 1

    lbe_user = LabelEncoder()
    df_input['user_id'] = lbe_user.fit_transform(df_input['user_id']) + 1

    # sequence columns reuse the encoder of the id they contain
    seq_encoders = (
        ('sku_purchase_seq', lbe_sku),
        ('category_path_purchase_seq', lbe_cat),
        ('sku_view_sequence', lbe_sku),
    )
    for seq_col, encoder in seq_encoders:
        df_input[seq_col] = df_input[seq_col].apply(lambda seq: label_transform(encoder, seq))

    # vocabulary size per id column
    feature_max_idx = {
        col: df_input[col].max() + 1
        for col in ('sku_number', 'category_path', 'user_id')
    }

    return df_input, feature_max_idx

In [19]:
# Preserve the raw ids before encoding.
# NOTE(review): encode_features performs these same two assignments itself, so this cell is redundant.
df_input['sku_number_org'] = df_input['sku_number']
df_input['user_id_org'] = df_input['user_id']

In [20]:
# Encode df_input in place and collect the per-feature vocabulary sizes.
# NOTE(review): encode_features is defined twice in this notebook; the
# definition executed most recently is the one that runs here.
df_input, feature_max_idx = encode_features(df_input)

In [27]:
import concurrent.futures

def encode_features(df_input):
    """Label-encode the id columns and sequence columns of ``df_input`` in place.

    The raw ``sku_number`` and ``user_id`` values are first copied to
    ``sku_number_org`` / ``user_id_org``. Three encoders are fitted:

    * ``sku_number`` on every SKU found in ``sku_number`` or
      ``sku_view_sequence``, plus ``'na'``;
    * ``category_path`` on the distinct ``category_path`` values, plus ``'na'``;
    * ``user_id`` on the ``user_id`` column.

    The three id columns are transformed in a thread pool; the sequence
    columns are then encoded with the matching encoder through
    ``label_transform``. All codes are shifted by one.

    Parameters
    ----------
    df_input : pandas.DataFrame
        Frame with the id and sequence columns named above. Mutated in place.

    Returns
    -------
    tuple
        ``(df_input, feature_max_idx)`` where ``feature_max_idx`` maps each of
        ``sku_number``, ``category_path`` and ``user_id`` to ``max code + 1``.

    Notes
    -----
    Not idempotent: calling it again on an already-encoded frame re-encodes
    the codes and overwrites the ``*_org`` columns.
    """
    # store original item id and user id
    df_input['sku_number_org'] = df_input['sku_number']
    df_input['user_id_org'] = df_input['user_id']

    # specify features and sequence features
    sparse_features = ['sku_number', 'category_path', 'user_id']
    seq_sparse_feature = ['sku_purchase_seq','category_path_purchase_seq','sku_view_sequence']

    # get full set of item and category
    full_item_set = np.append(
        np.unique(
            np.concatenate(
                (
                    df_input['sku_number'].values,  # all sku in sales
                    df_input['sku_view_sequence'].explode().values # all sku in views
                )
            )
        )
        , 'na')
    full_cat_set = np.append(df_input['category_path'].unique(), 'na')

    # fit every encoder up front on its full vocabulary
    encoders = {
        'sku_number': LabelEncoder().fit(full_item_set),
        'category_path': LabelEncoder().fit(full_cat_set),
        'user_id': LabelEncoder().fit(df_input['user_id']),
    }

    def _encode_column(feature):
        # transform only: refitting here (fit_transform) would shrink the
        # sku/category vocabularies to the column's own values, dropping
        # view-only SKUs and 'na', so sequences holding them would fail to encode
        return feature, encoders[feature].transform(df_input[feature]) + 1

    # Threads only read df_input; the frame is written after the pool has
    # finished, so no column is replaced while another thread reads it.
    with concurrent.futures.ThreadPoolExecutor() as executor:
        encoded = list(executor.map(_encode_column, sparse_features))
    for feature, transformed in encoded:
        df_input[feature] = transformed

    # transform sequence features
    for feature in seq_sparse_feature:
        if feature == 'sku_purchase_seq' or feature == 'sku_view_sequence':
            df_input[feature] = df_input[feature].apply(lambda x: label_transform(encoders['sku_number'], x))

        elif feature == 'category_path_purchase_seq':
            df_input[feature] = df_input[feature].apply(lambda x: label_transform(encoders['category_path'], x))

    # get feature index table
    feature_max_idx = {}
    for feature in sparse_features:
        feature_max_idx[feature] = df_input[feature].max() + 1

    return df_input, feature_max_idx


In [28]:
# Encode with the thread-pool version of encode_features.
# NOTE(review): encode_features overwrites the *_org columns on every call, so
# running it on an already-encoded df_input loses the raw ids -- restore from
# df_input_copy first.
df_input, feature_max_idx = encode_features(df_input)
df_input.shape

(135248, 12)

In [None]:
# Vocabulary size (max code + 1) per encoded id column.
feature_max_idx

In [21]:
# Persist the encoded input as a pickle file.
# NOTE(review): absolute, user-specific path; the next cell writes the same frame again via pickle.dump.
df_input.to_pickle('/Users/LINGYU1/work/localspace/data/mik_dnn_model_02222023/df_input_20230227.sav')


In [73]:
import pickle
with open("/Users/LINGYU1/work/localspace/data/mik_dnn_model_02222023/df_input_20230227.pickle", "wb") as f: 
    pickle.dump(df_input, f) 

In [71]:
# Persist the encoded input as JSON.
# NOTE(review): absolute, user-specific path.
df_input.to_json('/Users/LINGYU1/work/localspace/data/mik_dnn_model_02222023/df_input_20230227.json')

In [75]:
# Scratch: rebuild the item vocabulary at notebook scope, using the same
# expression as inside encode_features.
# NOTE(review): the saved output below shows integer codes, so this was run on
# the already-encoded df_input, not on raw SKUs.
full_item_set = np.append(
    np.unique(
        np.concatenate(
            (
                df_input['sku_number'].values,  # all sku in sales
                df_input['sku_view_sequence'].explode().values # all sku in views
            )
        )
    )
    , 'na')

In [76]:
# Inspect the rebuilt item vocabulary.
full_item_set

array([1, 2, 3, ..., 11035, 11036, 'na'], dtype=object)

In [93]:
# Scratch: pick one purchase sequence (row label 15) to experiment with padding.
a = df['sku_purchase_seq'][15]
a.shape

(5,)

In [92]:
# Show the raw values of that sequence.
df['sku_purchase_seq'][15]

array(['10358097', '10683360', '10403125', '10196947', '10228172'],
      dtype=object)

In [83]:
# Scratch padding attempt. It fails because `a` is 1-D (shape (5,)), so
# a.shape[1] does not exist; the padding cell that follows is the working approach.
np.concatenate([a, np.zeros((13, a.shape[1]))], axis=0)


IndexError: tuple index out of range

In [102]:
# Left-pad the sequence with 'na' up to 50 entries.
# np.pad is the public name of this function; the np.lib.pad alias is not
# exposed by newer NumPy releases (np.lib was trimmed in NumPy 2.0).
np.pad(a, (50 - a.shape[0], 0), 'constant', constant_values=('na'))


array(['na', 'na', 'na', 'na', 'na', 'na', 'na', 'na', 'na', 'na', 'na',
       'na', 'na', 'na', 'na', 'na', 'na', 'na', 'na', 'na', 'na', 'na',
       'na', 'na', 'na', 'na', 'na', 'na', 'na', 'na', 'na', 'na', 'na',
       'na', 'na', 'na', 'na', 'na', 'na', 'na', 'na', 'na', 'na', 'na',
       'na', '10358097', '10683360', '10403125', '10196947', '10228172'],
      dtype=object)

In [168]:
# Scratch, cell-level version of the first half of encode_features: build the
# vocabularies, then encode the three id columns in place.
# NOTE(review): `sparse_features` is only assigned inside encode_features in
# the visible cells, so this cell relies on a notebook-level definition that
# is not shown (hidden kernel state).
full_item_set = np.append(
    np.unique(
        np.concatenate(
            (
                df_input['sku_number'].values,  # all sku in sales
                df_input['sku_view_sequence'].explode().values # all sku in views
            )
        )
    )
    , 'na')
full_cat_set = np.append(df_input['category_path'].unique(), 'na')
# fit and transform features
for feature in sparse_features:
    # need to store sku encoder
    if feature == 'sku_number':
        lbe_sku = LabelEncoder()
        lbe_sku.fit(full_item_set)
        df_input[feature] = lbe_sku.transform(df_input[feature]) + 1
    # need to store
    elif feature == 'category_path':
        lbe_cat = LabelEncoder()
        lbe_cat.fit(full_cat_set)
        df_input[feature] = lbe_cat.transform(df_input[feature]) + 1
    else:
        lbe = LabelEncoder()
        df_input[feature] = lbe.fit_transform(df_input[feature]) + 1 # add one to all the encoded categories labels 

In [171]:
# df_input_copy['category_path_purchase_seq'][0:10].apply(lambda x: label_transform(lbe_cat, x))

In [172]:
# df_input[feature].apply(lambda x: lbe_cat.transform(x) + 1)

In [174]:
# transform sequence features (scratch, cell-level version of the second half
# of encode_features)
# NOTE(review): `seq_sparse_feature` is only assigned inside encode_features
# in the visible cells; this cell also needs lbe_sku / lbe_cat from the
# scratch encoding cell.
for feature in seq_sparse_feature:
    if feature == 'sku_purchase_seq' or feature == 'sku_view_sequence':
        df_input[feature] = df_input[feature].apply(lambda x: label_transform(lbe_sku, x))
#         df_input[feature] = df_input[feature].apply(lambda x: lbe_sku.transform(x) + 1)

    elif feature == 'category_path_purchase_seq':
        df_input[feature] = df_input[feature].apply(lambda x: label_transform(lbe_cat, x))


In [177]:
# get feature index table: max encoded value + 1 for each id column
# NOTE(review): relies on a notebook-level `sparse_features` that no visible cell defines.
feature_max_idx = {}
for feature in sparse_features:
    feature_max_idx[feature] = df_input[feature].max() + 1


In [181]:
# Export the encoded input to CSV (no index column).
# NOTE(review): absolute, user-specific path; the encoded sequence columns
# hold arrays, which CSV can only store as text.
df_input.to_csv("/Users/LINGYU1/work/localspace/data/mik_dnn_model_02222023/df_input_20230224.csv", index=False)