# Deep Neural networks for youtube recommendations ( generator)

- data source: watch history + user search history + other features 
- watch history:
    - a vriable-length sequence of saprse video Ids ==> a dense vector (embeddings)
    - fixed-sized dense inputs and averaging the embeddings (sum, or componnet-wise max, etc)
    - embeddings are learned 
    - embedding: fixed vocabulary; 

- user search history: similar to watch history 
    - query : tokenized into unigrams and bigrams 
    - token is embeded 
    - average ==> the user's tokenized, embeded queries represent a summarized dense search history

- other features: 
    - geographic region+ device
    - other features: simple binary and continous features (s.t. the user's gender, logged-in state, age) ==> normalized into 0-1
    - example age: 
        - now() - video update time
        - fresh videos are more preferable 
        - feature enginerring: example age, example age^2, sqrt(example age)

In [1]:
import pandas as pd
import tensorflow as tf
import json 
import os
import numpy as np
import math
print(tf.__version__)
strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()

os.environ['TF_CONFIG'] = json.dumps({
    'cluster': {
        'worker': ["localhost:12345", "localhost:23456"]
    },
    'task': {'type': 'worker', 'index': 0}
})

INFO:tensorflow:Using MirroredStrategy with devices ('/device:CPU:0',)
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CollectiveCommunication.AUTO


## Preprocessing data

- the data set form [Retailrocket recommender system dataset](https://www.kaggle.com/retailrocket/ecommerce-dataset)
- event: user behaviour (only concerning 'transaction') 
- item_prop: item properties, changed by time, all values are hased. 

In [2]:
item_prop = pd.read_csv('data/item_properties_part1.csv')
item_prop2 = pd.read_csv('data/item_properties_part2.csv')
event = pd.read_csv('data/events.csv')

In [3]:
print("user behaviors:", event.columns)
print(event.head(5))
print("user behviors include:", event['event'].unique())

# only keep transaction, delete 'view' 'addtocart'
event = event[event['event']=='transaction'].drop(columns={'event','transactionid'})

user behaviors: Index(['timestamp', 'visitorid', 'event', 'itemid', 'transactionid'], dtype='object')
       timestamp  visitorid event  itemid  transactionid
0  1433221332117     257597  view  355908            NaN
1  1433224214164     992329  view  248676            NaN
2  1433221999827     111016  view  318965            NaN
3  1433221955914     483717  view  253185            NaN
4  1433221337106     951259  view  367447            NaN
user behviors include: ['view' 'addtocart' 'transaction']


### Preprocess item property 

In [4]:
print("preview item properties data: ",item_prop.head(10))
print("how many properties for item?", len(item_prop['property'].unique()))

preview item properties data:         timestamp  itemid    property                            value
0  1435460400000  460429  categoryid                             1338
1  1441508400000  206783         888          1116713 960601 n277.200
2  1439089200000  395014         400  n552.000 639502 n720.000 424566
3  1431226800000   59481         790                       n15360.000
4  1431831600000  156781         917                           828513
5  1436065200000  285026   available                                0
6  1434250800000   89534         213                          1121373
7  1431831600000  264312           6                           319724
8  1433646000000  229370         202                          1330310
9  1434250800000   98113         451                  1141052 n48.000
how many properties for item? 1097


In [5]:
# remove categoryid and available in properties
item_prop = item_prop[(item_prop['property'] != 'categoryid')& (item_prop['property'] != 'available')]
item_prop2 = item_prop2[(item_prop2['property'] != 'categoryid')& (item_prop2['property'] != 'available')]

In [6]:
# properties are not uniformed distributed, some properties are common among all items, some properties are only specific to some items
popular_prop = item_prop.groupby('property')\
                        .size()\
                        .reset_index(name='count')\
                        .sort_values(by='count',ascending=False)
popular_prop[['count']].boxplot()

<matplotlib.axes._subplots.AxesSubplot at 0x678074198>

In [7]:
print("the most popular properties are:\n", popular_prop.head(5))

the most popular properties are:
     property    count
972      888  1629817
864      790   970800
654        6   343207
308      283   323681
848      776   311654


In [8]:
# only select the top 5 popular item properties 
# thred = popular_prop['count'].quantile(0.98)
thred = popular_prop['count'].iloc[5] 
poperty_selected = popular_prop[popular_prop['count'] > thred]['property'].values.tolist()

item_prop = item_prop[item_prop['property'].isin(poperty_selected)]
item_prop2 = item_prop2[item_prop2['property'].isin(poperty_selected)]

# only select items that are appeared in user event
item_prop= item_prop[item_prop['itemid'].isin(event.itemid.values.tolist())]
item_prop2= item_prop2[item_prop2['itemid'].isin(event.itemid.values.tolist())]

items = pd.concat([item_prop,item_prop2])
del item_prop,item_prop2

In [9]:
# check null data
items.isnull().any()

timestamp    False
itemid       False
property     False
value        False
dtype: bool

In [10]:
# expand property into columns 
items = items.pivot_table(index=['timestamp','itemid'], 
                    columns='property', 
                    values='value',
                    aggfunc='first').reset_index()

In [11]:
## convert all the columns into string 

In [12]:
print("preview items: \n",items.head(5))
print("item columns are:\n",items.columns)

preview items: 
 property      timestamp  itemid  \
0         1431226800000      15   
1         1431226800000      19   
2         1431226800000      25   
3         1431226800000      42   
4         1431226800000     147   

property                                                283                6  \
0         433564 245772 789221 809278 245772 1213953 429...              NaN   
1                984060 150169 1037891 743822 552121 119805   353870 1310600   
2                                                       NaN          1272323   
3                       1285402 1042990 362953 731607 73247  1285402 1042990   
4         726714 n36.000 1128577 n12.000 322971 229273 3...              NaN   

property      776          790                                           888  
0         1132786       n0.000                                           NaN  
1             NaN   n14160.000                                        119805  
2             NaN   n46800.000                        

In [13]:
items['790'] = items['790'].fillna('n0.000') # this is the number property (maybe price)
items = items.fillna('') #fill other prorperty with text ''

items['item_prop_text'] = items['283']+ items['6']+items['776']+items['888']
items['item_prop_num'] = items['790']

# remove n in item_prop_num 
items['item_prop_num']  = items['item_prop_num'].str\
                            .replace('n(-*\d+)\.\d{3}','\\1', regex=True)\
                            .astype(int)

# remove n+numbers, only remain text hashes 
items['item_prop_text'] = items['item_prop_text'].replace('n-*\d+\.\d{3}', '', regex=True)

items = items.drop(columns= {'283', '6', '776', '790', '888'})

In [14]:
## remap hashed text into 0-n (since hased number is too big for int type)
text_list = items['item_prop_text']\
            .apply(lambda x: list(set([int(ele) for ele in x.split(' ') 
                                        if (ele !='') and ('n' not in ele)]))).values.tolist()

text_list = list(set([item for sublist in text_list for item in sublist]))
text_list = sorted(text_list, key=int)
map_word = dict(zip(text_list,range(len(text_list))))

items['item_prop_text'] = items['item_prop_text'].apply(lambda x: [map_word[int(ele)] for ele in x.split(' ') 
                                        if (ele !='') and ('n' not in ele)])

### preprocessing event data 

In [15]:
print("the event data:\n",event.head(5))
print("the items data:\n",items.head(5))

the event data:
          timestamp  visitorid  itemid
130  1433222276276     599528  356475
304  1433193500981     121688   15335
418  1433193915008     552148   81345
814  1433176736375     102019  150318
843  1433174518180     189384  310791
the items data:
 property      timestamp  itemid  \
0         1431226800000      15   
1         1431226800000      19   
2         1431226800000      25   
3         1431226800000      42   
4         1431226800000     147   

property                                     item_prop_text  item_prop_num  
0         [10441, 5955, 18899, 19372, 5955, 28994, 10324...              0  
1          [23503, 3615, 24805, 17840, 13270, 33803, 42269]          14160  
2                                            [42021, 29404]          46800  
3                 [30686, 24931, 8766, 17542, 38003, 45961]         244680  
4         [17424, 26928, 7822, 5498, 9397, 29760, 7795, ...         352416  


In [16]:
# combine item and event
temp = event.merge(items,on=['itemid'])

print("merged data columns are:",temp.columns)


temp = temp.rename(columns={'timestamp_x':'event_time', 'timestamp_y':'items_time'})
# the item property changed timestamp should before event timestamp (aka transaction time)
temp = temp[temp['event_time'] >temp['items_time'] ]

merged data columns are: Index(['timestamp_x', 'visitorid', 'itemid', 'timestamp_y', 'item_prop_text',
       'item_prop_num'],
      dtype='object')


In [17]:
# for each buying item, select only the most recently mofified item property
temp['timestamp_diff'] = temp['event_time'] - temp['items_time']
recent_temp = temp.groupby(['itemid','event_time'])['timestamp_diff'].min().reset_index()

#each buying item should only have one corresponding item property
assert recent_temp[['itemid','event_time']].duplicated().any() == False 

#  timestamp_diff == example_age (in the paper, this is kept)
pre_event = pd.merge(recent_temp,temp, on=['timestamp_diff','itemid','event_time'])\
            .drop(columns={'items_time','timestamp_diff'})

In [18]:
pre_event.head(10)

Unnamed: 0,itemid,event_time,visitorid,item_prop_text,item_prop_num
0,15,1436478142936,1124964,[27049],8400
1,19,1439663329849,325833,[],18600
2,25,1433743130655,456617,"[10073, 29404]",37320
3,25,1438647361098,575295,"[10073, 29404, 27596]",39000
4,42,1439431902653,432404,"[1753, 17418]",199080
5,147,1441036654674,582525,"[5498, 9397, 29760, 7795, 25807, 26362, 26928,...",375816
6,147,1441037956153,582525,"[5498, 9397, 29760, 7795, 25807, 26362, 26928,...",375816
7,147,1441038151733,582525,"[5498, 9397, 29760, 7795, 25807, 26362, 26928,...",375816
8,147,1441119453098,582525,"[5498, 9397, 29760, 7795, 25807, 26362, 26928,...",375816
9,168,1442451767273,152963,[],46284


#### For each event, calculate purchasing history (purchased items)

In [19]:
pre_event2 = pre_event.copy()
explod_event = pre_event.merge(pre_event2,on=['visitorid'])

print("the shape of expanded event df: ",explod_event.shape)
print("the shape of orginal event df:", pre_event.shape)
print("the columns of expanded event df:", explod_event.columns)

explod_event = explod_event.rename(columns={
                                    'event_time_x':'timestamp_hist', 
                                    'itemid_x':'itemid_hist', 
                                    'item_prop_text_x':'item_prop_text_hist',
                                    'item_prop_num_x':'item_prop_num_hist',
                                    'event_time_y':'timestamp_now',
                                    'itemid_y':'itemid_now',
                                    'item_prop_text_y':'item_prop_text_now',
                                    'item_prop_num_y':'item_prop_num_now'
                                    })

# purchasing history timestamp should smaller than this purchasing timestamp
explod_event = explod_event[explod_event['timestamp_hist']<explod_event['timestamp_now']] 

the shape of expanded event df:  (860785, 9)
the shape of orginal event df: (20757, 5)
the columns of expanded event df: Index(['itemid_x', 'event_time_x', 'visitorid', 'item_prop_text_x',
       'item_prop_num_x', 'itemid_y', 'event_time_y', 'item_prop_text_y',
       'item_prop_num_y'],
      dtype='object')


In [20]:
explod_event.head(5)

Unnamed: 0,itemid_hist,timestamp_hist,visitorid,item_prop_text_hist,item_prop_num_hist,itemid_now,timestamp_now,item_prop_text_now,item_prop_num_now
2,15,1436478142936,1124964,[27049],8400,66643,1436478142968,[],53040
7,15,1436478142936,1124964,[27049],8400,310720,1436478142968,[],18720
8,15,1436478142936,1124964,[27049],8400,348245,1436478142983,[],8520
10,16052,1436478142874,1124964,[],50880,15,1436478142936,[27049],8400
12,16052,1436478142874,1124964,[],50880,66643,1436478142968,[],53040


In [21]:
# combine the purched hist into list 
event_hist = explod_event.groupby(['timestamp_now','visitorid'])\
                        .agg({'item_prop_text_hist':lambda col: col.tolist(), 
                              'item_prop_num_hist':lambda col: col.tolist(), 
                              'itemid_hist': lambda col: col.tolist(),
                              'itemid_now': lambda col: col.tolist()[0],
                             'item_prop_text_now': lambda col: col.tolist()[0],
                             'item_prop_num_now': lambda col: col.tolist()[0]})\
                        .reset_index()

In [22]:
event_hist.head(5)

Unnamed: 0,timestamp_now,visitorid,item_prop_text_hist,item_prop_num_hist,itemid_hist,itemid_now,item_prop_text_now,item_prop_num_now
0,1431264802442,894034,[[7082]],[18000],[262813],36026,"[6895, 18406, 15220, 16079, 26709, 15220, 1607...",9360
1,1431264802458,894034,"[[6895, 18406, 15220, 16079, 26709, 15220, 160...","[9360, 18000]","[36026, 262813]",120050,"[2989, 15925, 35315]",23280
2,1431285199338,176878,[[20300]],[566280],[429369],217089,[34078],21840
3,1431286835450,527815,"[[47460, 28904]]",[145920],[40808],408139,"[16136, 15543, 16223, 844, 12759]",0
4,1431290048653,1392946,[[]],[121320],[241555],412622,"[15347, 10705, 21241, 740, 16170, 15347, 15331...",103920


In [23]:
# item in event (item that is purchasing now) is should not appeared in purchasing history 
# event_hist = event_hist[~ event_hist.apply(lambda x: x.itemid_now in x.itemid_hist, axis=1)]

In [24]:
event_hist["item_prop_hist_num_count"] =  event_hist["item_prop_num_hist"].apply(lambda x:len(x))
event_hist["item_prop_hist_num_mean"] =  event_hist["item_prop_num_hist"].apply(lambda x: np.mean(x))
event_hist["item_prop_hist_num_median"] =  event_hist["item_prop_num_hist"].apply(lambda x:np.median(x))
event_hist["item_prop_hist_num_max"] =  event_hist["item_prop_num_hist"].apply(lambda x: np.max(x))
event_hist["item_prop_hist_num_min"] =  event_hist["item_prop_num_hist"].apply(lambda x:np.min(x))

In [25]:
# event_hist['itemid_hist'] = event_hist['itemid_hist']\
#                             .apply(lambda x: [int(ele) for ele in x[1:-1].split(',')])

In [26]:
event_hist.dtypes

timestamp_now                  int64
visitorid                      int64
item_prop_text_hist           object
item_prop_num_hist            object
itemid_hist                   object
itemid_now                     int64
item_prop_text_now            object
item_prop_num_now              int64
item_prop_hist_num_count       int64
item_prop_hist_num_mean      float64
item_prop_hist_num_median    float64
item_prop_hist_num_max         int64
item_prop_hist_num_min         int64
dtype: object

## Trian the model
- only use user purchased items info 
- embedding for purchased item text data 
    - `itemid_hist`
- some statical features for purchased item nuemrical info
    - `item_prop_hist_num_count`,`item_prop_hist_num_mean`,`item_prop_hist_num_median`,
    `item_prop_hist_num_max`,`item_prop_hist_num_min`

In [28]:
def input_fn(epoch_num,batch_size):
    df = event_hist[["itemid_hist","item_prop_hist_num_count",
                                       "item_prop_hist_num_mean",
                                       "item_prop_hist_num_median",
                                       "item_prop_hist_num_max",
                                       "item_prop_hist_num_min",'itemid_now']]
    dataframe = df.copy()

    labels = dataframe.pop('itemid_now')
    features = {
        'itemid_hist':tf.ragged.constant(dataframe[['itemid_hist']].values, dtype= tf.int32),
        'cont_features': tf.constant(dataframe[["item_prop_hist_num_count",
                                       "item_prop_hist_num_mean",
                                       "item_prop_hist_num_median",
                                       "item_prop_hist_num_max",
                                       "item_prop_hist_num_min"]].values, dtype=tf.float32)
    }
   
    data_set = tf.data.Dataset.from_tensor_slices((features, labels))

    data_set = data_set.repeat(epoch_num)
    data_set = data_set.prefetch(buffer_size=100)
    data_set = data_set.batch(batch_size=batch_size)
    
    return data_set

In [None]:
def model_fn(features, labels, mode):
    visit_items_index = features['itemid_hist']
    cont_features = features['cont_features']

    next_visit_item_index = labels

    num_buckets = len(text_list)+1  #visit_items_index.values.max()[0]+1
    embedding_size = 300
    keep_prob = 0.8
    top_k = 10

    # items embedding 
    item_embedding = tf.Variable(
        tf.random.truncated_normal(
            [num_buckets, embedding_size], stddev=1.0 / math.sqrt(embedding_size)
        ))

    # visited item history
    visit_items_embedding = tf.nn.embedding_lookup(item_embedding, visit_items_index)   # [Batch, None, None, embedding]
    visit_items_average_embedding = tf.reduce_mean(visit_items_embedding, axis=1)        # [Batch, , None, embedding]
    visit_items_average_embedding = tf.reduce_mean(visit_items_average_embedding, axis=1) # [Batch, embedding]

    input_embedding = tf.concat([visit_items_average_embedding,cont_features], 1)       #  [Batch, embedding + 5]
    # print("the shape of input_embedding is:", input_embedding.shape)

    layer_1 = tf.keras.layers.Dense(64, activation=tf.nn.relu, name="layer_1")(input_embedding) 

    layer_dropout_1 = tf.nn.dropout(layer_1, keep_prob, name="layer_dropout_1")
    
    layer_2 = tf.keras.layers.Dense(32, 
                                    activation=tf.nn.relu, 
                                    name="layer_2")(layer_dropout_1)

    layer_dropout_2 = tf.nn.dropout(layer_2, keep_prob, name="layer_dropout_2")

   
    user_vector = tf.keras.layers.Dense(embedding_size, 
                                        activation=tf.nn.relu,
                                        name="user_vector")(layer_dropout_2)

    
    if mode == tf.estimator.ModeKeys.TRAIN:
        output_embedding = tf.nn.embedding_lookup(item_embedding, next_visit_item_index)   # num * embedding_size
        logits = tf.matmul(user_vector, output_embedding, transpose_a=False, transpose_b=True)  # num * num
        yhat = tf.nn.softmax(logits)  # num * num
        
        cross_entropy = tf.reduce_mean(-tf.math.log(tf.linalg.diag_part(yhat) + 1e-16))
        
        optimizer = tf.compat.v1.train.GradientDescentOptimizer(1e-4)
        
        train = optimizer.minimize(cross_entropy, tf.compat.v1.train.get_or_create_global_step())
        
        return tf.estimator.EstimatorSpec(mode, loss=cross_entropy, train_op=train)

    if mode == tf.estimator.ModeKeys.EVAL:
        output_embedding = tf.nn.embedding_lookup(item_embedding, next_visit_item_index)  # num * embedding_size
        logits = tf.matmul(user_vector, output_embedding, transpose_a=False, transpose_b=True)  # num * num
        yhat = tf.nn.softmax(logits)  # num * num
        
        cross_entropy = tf.reduce_mean(-tf.math.log(tf.linalg.diag_part(yhat) + 1e-16))
        
        return tf.estimator.EstimatorSpec(mode, loss=cross_entropy)

    if mode == tf.estimator.ModeKeys.PREDICT:
        logits_predict = tf.matmul(user_vector, item_embedding, transpose_a=False, transpose_b=True)  # num *  item_num
        yhat_predict = tf.nn.softmax(logits_predict)  # num *  item_num
        _, indices = tf.nn.top_k(yhat_predict, k=top_k, sorted=True)
        index = tf.identity(indices, name="index")  # num * top_k
        predictions = {
            "user_vector": user_vector,
            "index": index
        }
        export_outputs = {
            "prediction": tf.estimator.export.PredictOutput(predictions)
        }
        return tf.estimator.EstimatorSpec(mode, predictions=predictions, export_outputs=export_outputs)

INFO:tensorflow:TF_CONFIG environment variable: {'cluster': {'worker': ['localhost:12345', 'localhost:23456']}, 'task': {'type': 'worker', 'index': 0}}
INFO:tensorflow:Initializing RunConfig with distribution strategies.
INFO:tensorflow:RunConfig initialized for Distribute Coordinator with INDEPENDENT_WORKER mode
INFO:tensorflow:Using config: {'_model_dir': 'temp/', '_tf_random_seed': 2019, '_save_summary_steps': 100, '_save_checkpoints_steps': 100, '_save_checkpoints_secs': None, '_session_config': allow_soft_placement: true
graph_options {
  rewrite_options {
    meta_optimizer_iterations: ONE
  }
}
, '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': <tensorflow.python.distribute.collective_all_reduce_strategy.CollectiveAllReduceStrategy object at 0x6775c0358>, '_device_fn': None, '_protocol': None, '_eval_distribute': None, '_experimental_distribute': None, '_experimental_max_worker_delay_secs': None, '_session_cre

In [None]:
config = tf.estimator.RunConfig(
    model_dir="temp/",
    tf_random_seed=2019,
    save_checkpoints_steps=100,
    keep_checkpoint_max=5,
    log_step_count_steps=100,
    train_distribute=strategy
)
classifier = tf.estimator.Estimator(model_fn=model_fn, config=config)

train_spec = tf.estimator.TrainSpec(
    input_fn=lambda: input_fn(
        epoch_num=11,
        batch_size=32),
    max_steps=1000
)

eval_spec = tf.estimator.EvalSpec(
    input_fn=lambda: input_fn(
        epoch_num=1,
        batch_size=32),
    steps=15,           # eval on how many data 
    start_delay_secs=1, # how many seconds later to start evaluate 
    throttle_secs=20    # evaluate every 20seconds
)

tf.estimator.train_and_evaluate(classifier, train_spec, eval_spec)