In [1]:
import tensorflow as tf
import modin.pandas as pd
import numpy as np
import pickle

import ray
# Start a local Ray runtime for this kernel. ignore_reinit_error=True lets the
# cell be re-run without raising when Ray has already been initialised.
# NOTE(review): presumably Ray is started as the engine behind modin.pandas — confirm.
ray.init(ignore_reinit_error=True)

E0414 03:30:52.946936776    4131 fork_posix.cc:70]           Fork support is only compatible with the epoll1 and poll polling strategies
E0414 03:30:52.960351721    4131 fork_posix.cc:70]           Fork support is only compatible with the epoll1 and poll polling strategies
E0414 03:30:52.970095364    4131 fork_posix.cc:70]           Fork support is only compatible with the epoll1 and poll polling strategies


{'node_ip_address': '172.17.0.2',
 'raylet_ip_address': '172.17.0.2',
 'redis_address': None,
 'object_store_address': '/tmp/ray/session_2022-04-14_03-30-51_881425_4131/sockets/plasma_store',
 'raylet_socket_name': '/tmp/ray/session_2022-04-14_03-30-51_881425_4131/sockets/raylet',
 'webui_url': None,
 'session_dir': '/tmp/ray/session_2022-04-14_03-30-51_881425_4131',
 'metrics_export_port': 46999,
 'gcs_address': '172.17.0.2:52573',
 'address': '172.17.0.2:52573',
 'node_id': '38e4acfb537bf6a93aec07a49fd5f7a5aba72a1b53ab814b3be56c3d'}

In [2]:
# Restore the two previously saved datasets; both paths are relative to the
# notebook's working directory.
tf_transactions = tf.data.experimental.load(path="tf_transactions")
tf_items = tf.data.experimental.load(path="tf_items")

2022-04-14 03:30:56.937061: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:922] could not open file to read NUMA node: /sys/bus/pci/devices/0000:01:00.0/numa_node
Your kernel may have been built without NUMA support.
2022-04-14 03:30:56.987880: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:922] could not open file to read NUMA node: /sys/bus/pci/devices/0000:01:00.0/numa_node
Your kernel may have been built without NUMA support.
2022-04-14 03:30:56.988082: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:922] could not open file to read NUMA node: /sys/bus/pci/devices/0000:01:00.0/numa_node
Your kernel may have been built without NUMA support.
2022-04-14 03:30:56.988939: I tensorflow/core/platform/cpu_feature_guard.cc:151] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate

In [3]:
# Keep only customer_id, product_code and quantity from each transaction.
# NOTE(review): float() is applied to a traced tensor inside the map function;
# presumably AutoGraph rewrites it into a tensor conversion — confirm the
# resulting dtype of 'quantity'.
# NOTE: both names below are rebound to their mapped versions, so re-running
# this cell without re-running the load cell applies the maps a second time.
tf_transactions = tf_transactions.map(lambda x: {'customer_id' : x['customer_id'], 
                                            'product_code' : x['product_code'], 
                                            'quantity' : float(x['quantity']),})

# Reduce every item record to just its product_code.
tf_items = tf_items.map(lambda x: x['product_code'])

In [None]:
# Build lookup tables (vocabularies) of the unique item codes and customer IDs

In [4]:
def _unique_values(batched_dataset):
    """Return the sorted unique values found across all batches of a dataset."""
    return np.unique(np.concatenate(list(batched_dataset)))


# Vocabulary of product codes, gathered 1000 items per batch.
unique_item_titles = _unique_values(tf_items.batch(1000))
# Vocabulary of customer IDs; only the customer_id field of each batch is kept.
unique_customer_ids = _unique_values(
    tf_transactions.batch(1_000).map(lambda batch: batch["customer_id"])
)

In [9]:
# Persist both lookup tables as pickles next to the notebook.
for filename, lookup in (
    ('unique_item_titles.pkl', unique_item_titles),
    ('unique_customer_ids.pkl', unique_customer_ids),
):
    with open(filename, 'wb') as handle:
        pickle.dump(lookup, handle, protocol=pickle.HIGHEST_PROTOCOL)

In [None]:
# Shuffle the dataset, then take an 80,000 / 20,000 (80/20) train/test sample

In [11]:
# Fix the global TensorFlow seed as well as the shuffle's own seed.
tf.random.set_seed(100)
# reshuffle_each_iteration=False keeps one fixed order across iterations, so the
# take/skip slices below always refer to the same elements.
# NOTE(review): the 100_000-element buffer is much smaller than the dataset
# (len(shuffled) reports 24415302), so this is only a local shuffle — confirm
# that sampling from the head of the dataset is intended.
shuffled = tf_transactions.shuffle(
    buffer_size=100_000, seed=100, reshuffle_each_iteration=False
)
# First 80,000 shuffled transactions for training, the next 20,000 for testing.
train = shuffled.take(count=80_000)
test = shuffled.skip(count=80_000).take(count=20_000)

In [12]:
# Number of elements in the shuffled dataset (the full set, before take/skip).
len(shuffled)

24415302

In [13]:
# Width of the embedding vectors; shared by the customer and item models so
# their outputs have the same size.
embedding_dimension = 32

In [14]:
# Translate raw customer-ID strings into integer indices (no mask token).
customer_id_lookup = tf.keras.layers.StringLookup(
    vocabulary=unique_customer_ids, mask_token=None
)
# One extra embedding row accounts for unknown (out-of-vocabulary) tokens.
customer_embedding = tf.keras.layers.Embedding(
    input_dim=len(unique_customer_ids) + 1, output_dim=embedding_dimension
)
customer_model = tf.keras.Sequential([customer_id_lookup, customer_embedding])

In [15]:
# Translate raw product-code strings into integer indices (no mask token).
item_code_lookup = tf.keras.layers.StringLookup(
    vocabulary=unique_item_titles, mask_token=None
)
# One extra embedding row accounts for unknown (out-of-vocabulary) tokens.
item_embedding = tf.keras.layers.Embedding(
    input_dim=len(unique_item_titles) + 1, output_dim=embedding_dimension
)
item_model = tf.keras.Sequential([item_code_lookup, item_embedding])

In [16]:
import tensorflow_recommenders as tfrs

In [17]:
# Embed every item (128 per batch) to form the candidate set that the top-K
# metric ranks against.
candidate_embeddings = tf_items.batch(128).map(item_model)
metrics = tfrs.metrics.FactorizedTopK(candidates=candidate_embeddings)

In [18]:
# Retrieval task that reports the FactorizedTopK metrics configured above.
task = tfrs.tasks.Retrieval(metrics=metrics)

In [19]:
from typing import Dict, Text

class HMRecommenderModel(tfrs.Model):
  """Retrieval model that embeds customers and items with separate sub-models.

  The customer and item embeddings are handed to the retrieval task, which
  computes the loss and the metrics.
  """

  def __init__(self, user_model, movie_model):
    """Store the two embedding sub-models and the retrieval task.

    Args:
      user_model: model mapping customer IDs to embeddings; stored as
        `self.customer_model`.
      movie_model: model mapping product codes to embeddings; stored as
        `self.item_model`.
    """
    super().__init__()
    # Use the models passed in by the caller. Previously these arguments were
    # ignored and the notebook-level `customer_model` / `item_model` globals
    # were captured instead.
    self.customer_model: tf.keras.Model = user_model
    self.item_model: tf.keras.Model = movie_model
    # The constructor has no task parameter, so the task is still read from
    # the notebook-level `task` variable.
    self.task: tf.keras.layers.Layer = task

  def compute_loss(self, features: Dict[Text, tf.Tensor], training=False) -> tf.Tensor:
    """Compute the retrieval loss for one batch of transactions.

    Args:
      features: dict with "customer_id" and "product_code" entries.
      training: accepted for the Keras/TFRS interface; not used here.

    Returns:
      The value produced by the retrieval task for the batch.
    """
    # We pick out the user features and pass them into the customer model.
    customer_embeddings = self.customer_model(features["customer_id"])
    # And pick out the item features and pass them into the item model,
    # getting embeddings back.
    positive_item_embeddings = self.item_model(features["product_code"])

    # The task computes the loss and the metrics.
    return self.task(customer_embeddings, positive_item_embeddings)

In [20]:
# Wire the customer and item models into the recommender and compile it with
# Adagrad at a learning rate of 0.1.
model = HMRecommenderModel(user_model=customer_model, movie_model=item_model)
optimizer = tf.keras.optimizers.Adagrad(learning_rate=0.1)
model.compile(optimizer=optimizer)

In [21]:
# Training pipeline: shuffle, group into batches of 8192, then cache the result.
# NOTE(review): cache() comes after shuffle(), so the batches produced on the
# first pass are presumably replayed unchanged on later epochs — confirm this
# is intended.
cached_train = (
    train
    .shuffle(buffer_size=100_000)
    .batch(batch_size=8192)
    .cache()
)
# Evaluation pipeline: no shuffling, batches of 4096, cached.
cached_test = test.batch(batch_size=4096).cache()

In [22]:
# Train for 3 epochs on the cached training batches; the returned History
# object is displayed as the cell output.
model.fit(cached_train, epochs=3)

Epoch 1/3


2022-04-14 03:46:12.918880: I tensorflow/stream_executor/cuda/cuda_blas.cc:1786] TensorFloat-32 will be used for the matrix multiplication. This will only be logged once.


Epoch 2/3
Epoch 3/3


<keras.callbacks.History at 0x7f0c3eda6a00>

In [23]:
# Evaluate on the held-out test batches; return_dict=True yields a
# {metric name: value} mapping instead of a plain list.
model.evaluate(cached_test, return_dict=True)



{'factorized_top_k/top_1_categorical_accuracy': 0.00044999999227002263,
 'factorized_top_k/top_5_categorical_accuracy': 0.003949999809265137,
 'factorized_top_k/top_10_categorical_accuracy': 0.008200000040233135,
 'factorized_top_k/top_50_categorical_accuracy': 0.028950000181794167,
 'factorized_top_k/top_100_categorical_accuracy': 0.04805000126361847,
 'loss': 29752.251953125,
 'regularization_loss': 0,
 'total_loss': 29752.251953125}

In [24]:
# Brute-force top-k retrieval layer: raw customer IDs are embedded by the
# trained customer model and the 12 best-scoring candidates are returned.
index = tfrs.layers.factorized_top_k.BruteForce(model.customer_model, k=12)
# Index the whole item catalogue, pairing each batch of product codes with its
# embeddings so that lookups return product codes.
item_batches = tf_items.batch(100)
index.index_from_dataset(
    tf.data.Dataset.zip((item_batches, item_batches.map(model.item_model)))
)

<tensorflow_recommenders.layers.factorized_top_k.BruteForce at 0x7f0be47e8220>

In [26]:
# Ask the index for recommendations for a single customer ID.
query_customer_id = "00000dbacae5abe5e23885899a1fa44253a17956c6d1c3d25f88aa139fdfc657"
query = tf.constant([query_customer_id])
# The first returned value is discarded; only the item identifiers are printed.
_, items = index(query)
print(f"Recommendations for user: {items}")

Recommendations for user: [[b'591334' b'568601' b'605094' b'859416' b'188183' b'590928' b'681373'
  b'582480' b'179950' b'678260' b'745232' b'664074']]


In [25]:
# Display the customer-ID vocabulary (NumPy abbreviates the array repr).
unique_customer_ids

array([b'00000dbacae5abe5e23885899a1fa44253a17956c6d1c3d25f88aa139fdfc657',
       b'0000423b00ade91418cceaf3b26c6af3dd342b51fd051eec9c12fb36984420fa',
       b'000058a12d5b43e67d225668fa1f8d618c13dc232df0cad8ffe7ad4a1091e318',
       ...,
       b'ffffcf35913a0bee60e8741cb2b4e78b8a98ee5ff2e6a1778d0116cffd259264',
       b'ffffd7744cebcf3aca44ae7049d2a94b87074c3d4ffe38b2236865d949d4df6a',
       b'ffffd9ac14e89946416d80e791d064701994755c3ab686a1eaf3458c36f52241'],
      dtype=object)

In [None]:
# Look into https://www.datarobot.com/jp/platform/mlops/