In [1]:

# run main function
path = "/mnt/c/mldata/aliccp"
log_path = "/mnt/c/mldata/aliccp"

In [4]:
import os
import pprint
import tempfile
import argparse

from typing import Dict, Text
import pandas as pd
import numpy as np
import tensorflow as tf
import mlflow
import tensorflow_recommenders as tfrs
import datetime
import gc


df_train = pd.read_parquet(path+'/train_processed/')
print(len(df_train))

#filter out long tail products and reduce size of dataset
df_train['counter'] = 1
df_agg = df_train[['item_id','counter']].groupby('item_id').count()
df_agg = df_agg.sort_values("counter",ascending=False)[:10000]
df_train = pd.merge(df_agg, df_train, on=['item_id', 'item_id'], how='left')
print(len(df_train))
df_train['counter'] = 1
cust_agg = df_train[['user_id','counter']].groupby('user_id').count()
cust_agg = cust_agg.sort_values("counter",ascending=False)[:8000]
df_train = pd.merge(cust_agg, df_train, on=['user_id', 'user_id'], how='left')
print(len(df_train))

train = df_train[df_train['click']==1][['item_id','user_id']]
features = ['item_id','user_id']
for i in features:
    train[i] = pd.to_numeric(train[i], errors='coerce',downcast="integer")
train= train.dropna()

items = train['item_id'].unique()
items = tf.convert_to_tensor(items, dtype=tf.int64)

train = tf.convert_to_tensor(train, dtype=tf.int64)


train = tf.data.Dataset.from_tensor_slices(train)

interactions = train.map(lambda x: {
    "item_id": x[0],
    "user_id": x[1],
})

item_ds = tf.data.Dataset.from_tensor_slices(items)

tf.random.set_seed(42)
shuffled = interactions.shuffle(100000, seed=42, reshuffle_each_iteration=False)

split = round(len(shuffled)*.8)

train = shuffled.take(split)
test = shuffled.skip(split).take(len(shuffled)-split)

item_ds = item_ds.batch(1_000)
user_ids = interactions.batch(1_000_000).map(lambda x: x["user_id"])

unique_items = np.unique(np.concatenate(list(item_ds)))
unique_user_ids = np.unique(np.concatenate(list(user_ids)))

embedding_dimension = 64
mlflow.log_param("embedding_dimension", embedding_dimension)

user_model = tf.keras.Sequential([
tf.keras.layers.IntegerLookup(
    vocabulary=unique_user_ids, mask_token=None),
# We add an additional embedding to account for unknown tokens.
tf.keras.layers.Embedding(len(unique_user_ids) + 1, embedding_dimension)
])

item_model = tf.keras.Sequential([
tf.keras.layers.IntegerLookup(
    vocabulary=unique_items, mask_token=None),
tf.keras.layers.Embedding(len(unique_items) + 1, embedding_dimension)
])

metrics = tfrs.metrics.FactorizedTopK(candidates=item_ds.batch(128).map(item_model))

task = tfrs.tasks.Retrieval(metrics=metrics)

class RetrievalModel(tfrs.Model):
    def __init__(self, user_model, item_model):
        super().__init__()
        self.item_model: tf.keras.Model = item_model
        self.user_model: tf.keras.Model = user_model
        self.task: tf.keras.layers.Layer = task

    def compute_loss(self, features, training=False) -> tf.Tensor:
        # We pick out the user features and pass them into the user model.
        user_embeddings = self.user_model(features['user_id'])
        # And pick out the movie features and pass them into the movie model,
        # getting embeddings back.
        positive_item_embeddings = self.item_model(features['item_id'])

        # The task computes the loss and the metrics.
        return self.task(user_embeddings, positive_item_embeddings)

model = RetrievalModel(user_model, item_model)
model.compile(optimizer=tf.keras.optimizers.Adagrad(learning_rate=0.1))


cached_train = train.shuffle(100_000).batch(8192).cache()
cached_test = test.batch(4096).cache()

model.fit(cached_train, epochs=4)


model.evaluate(test.batch(512), return_dict=True)

index = tfrs.layers.factorized_top_k.BruteForce(model.user_model,k = 50)

index.index_from_dataset(tf.data.Dataset.zip((items.batch(100), items.batch(100).map(model.item_model))))

_, titles = index(tf.constant([42]))
print(titles)
tf.saved_model.save(index, "index")
mlflow.log_artifacts("index", "index")

41999771
9477639


  df_train = pd.merge(cust_agg, df_train, on=['user_id', 'user_id'], how='left')


1473399
Epoch 1/4


ValueError: in user code:

    File "/home/paul/.local/lib/python3.10/site-packages/keras/engine/training.py", line 1249, in train_function  *
        return step_function(self, iterator)
    File "/home/paul/.local/lib/python3.10/site-packages/keras/engine/training.py", line 1233, in step_function  **
        outputs = model.distribute_strategy.run(run_step, args=(data,))
    File "/home/paul/.local/lib/python3.10/site-packages/keras/engine/training.py", line 1222, in run_step  **
        outputs = model.train_step(data)
    File "/home/paul/.local/lib/python3.10/site-packages/tensorflow_recommenders/models/base.py", line 68, in train_step
        loss = self.compute_loss(inputs, training=True)
    File "/tmp/ipykernel_218/1915021112.py", line 101, in compute_loss
        return self.task(user_embeddings, positive_item_embeddings)
    File "/home/paul/.local/lib/python3.10/site-packages/keras/utils/traceback_utils.py", line 70, in error_handler
        raise e.with_traceback(filtered_tb) from None
    File "/tmp/__autograph_generated_fileo38aqctf.py", line 159, in tf__call
        ag__.if_stmt(ag__.ld(compute_metrics), if_body_5, else_body_5, get_state_7, set_state_7, (), 0)
    File "/tmp/__autograph_generated_fileo38aqctf.py", line 155, in if_body_5
        ag__.for_stmt(ag__.ld(self)._factorized_metrics, None, loop_body_1, get_state_6, set_state_6, (), {'iterate_names': 'metric'})
    File "/tmp/__autograph_generated_fileo38aqctf.py", line 154, in loop_body_1
        ag__.converted_call(ag__.ld(update_ops).append, (ag__.converted_call(ag__.ld(metric).update_state, (ag__.ld(query_embeddings), ag__.ld(candidate_embeddings)[:ag__.converted_call(ag__.ld(tf).shape, (ag__.ld(query_embeddings),), None, fscope)[0]]), dict(true_candidate_ids=ag__.ld(candidate_ids)), fscope),), None, fscope)
    File "/tmp/__autograph_generated_filevtqt02ea.py", line 50, in tf__update_state
        (top_k_predictions, retrieved_ids) = ag__.converted_call(ag__.ld(self)._candidates, (ag__.ld(query_embeddings),), dict(k=ag__.converted_call(ag__.ld(max), (ag__.ld(self)._ks,), None, fscope)), fscope)
    File "/tmp/__autograph_generated_filen507280_.py", line 163, in tf__call
        results = ag__.converted_call(ag__.converted_call(ag__.ld(candidates).map, (ag__.ld(top_scores),), dict(num_parallel_calls=ag__.ld(self)._num_parallel_calls), fscope).reduce, (ag__.ld(initial_state), ag__.ld(top_k)), None, fscope)
    File "/tmp/__autograph_generated_filen507280_.py", line 98, in top_k
        joined_scores = ag__.converted_call(ag__.ld(tf).concat, ([ag__.ld(state_scores), ag__.ld(x_scores)],), dict(axis=1), fscope_2)

    ValueError: Exception encountered when calling layer 'retrieval_1' (type Retrieval).
    
    in user code:
    
        File "/home/paul/.local/lib/python3.10/site-packages/tensorflow_recommenders/tasks/retrieval.py", line 197, in call  *
            update_ops.append(
        File "/home/paul/.local/lib/python3.10/site-packages/tensorflow_recommenders/metrics/factorized_top_k.py", line 136, in update_state  *
            top_k_predictions, retrieved_ids = self._candidates(
        File "/home/paul/.local/lib/python3.10/site-packages/keras/utils/traceback_utils.py", line 70, in error_handler  **
            raise e.with_traceback(filtered_tb) from None
        File "/tmp/__autograph_generated_filen507280_.py", line 163, in tf__call
            results = ag__.converted_call(ag__.converted_call(ag__.ld(candidates).map, (ag__.ld(top_scores),), dict(num_parallel_calls=ag__.ld(self)._num_parallel_calls), fscope).reduce, (ag__.ld(initial_state), ag__.ld(top_k)), None, fscope)
        File "/tmp/__autograph_generated_filen507280_.py", line 98, in top_k
            joined_scores = ag__.converted_call(ag__.ld(tf).concat, ([ag__.ld(state_scores), ag__.ld(x_scores)],), dict(axis=1), fscope_2)
    
        ValueError: Exception encountered when calling layer 'streaming_1' (type Streaming).
        
        in user code:
        
            File "/home/paul/.local/lib/python3.10/site-packages/tensorflow_recommenders/layers/factorized_top_k.py", line 454, in top_k  *
                joined_scores = tf.concat([state_scores, x_scores], axis=1)
        
            ValueError: Shape must be rank 2 but is rank 3 for '{{node concat}} = ConcatV2[N=2, T=DT_FLOAT, Tidx=DT_INT32](args_0, args_2, concat/axis)' with input shapes: [?,0], [?,?,?], [].
        
        
        Call arguments received by layer 'streaming_1' (type Streaming):
          • queries=tf.Tensor(shape=(None, 64), dtype=float32)
          • k=100
    
    
    Call arguments received by layer 'retrieval_1' (type Retrieval):
      • query_embeddings=tf.Tensor(shape=(None, 64), dtype=float32)
      • candidate_embeddings=tf.Tensor(shape=(None, 64), dtype=float32)
      • sample_weight=None
      • candidate_sampling_probability=None
      • candidate_ids=None
      • compute_metrics=True
      • compute_batch_metrics=True
