In [1]:
import os
import json
import random
import shutil
import logging
import numpy as np
import pandas as pd
import multiprocessing
import tensorflow as tf
from dask import dataframe as dd
from sklearn.utils import shuffle

## Preparing the data

In [2]:
transactions = dd.read_csv('data/transactions_train.csv')[["customer_id", "article_id"]]
customer_purchase_number = transactions.groupby("customer_id").size().to_frame("prod_number").reset_index()
transactions = transactions.merge(customer_purchase_number, on="customer_id", how="inner")

c_ids = transactions.customer_id.unique()
number_of_customer = len(c_ids)
customer_encoding = {c_id: i for i, c_id in enumerate(c_ids)}
p_ids = transactions.article_id.unique()
number_of_products = len(p_ids)
product_encoding = {p_id: i for i, p_id in enumerate(p_ids)}
with open('model_data/customer_id_encoding.json', 'w') as fp:
    json.dump(customer_encoding, fp)

with open('model_data/product_id_encoding.json', 'w') as fp:
    json.dump(product_encoding, fp)

transactions.customer_id = transactions.customer_id.map(customer_encoding)
transactions.article_id = transactions.article_id.map(product_encoding)
p_ids = list(product_encoding.values())
del c_ids, product_encoding, customer_encoding, customer_purchase_number

In [3]:
train, test = transactions.random_split([0.9, 0.1], random_state=43)
df = test.merge(train[["customer_id"]], on=["customer_id"], how="outer", indicator=True)
train = dd.concat([train, df[df['_merge'] == 'left_only'][["customer_id", "article_id"]]], axis=0, ignore_index=True, interleave_partitions=True, ignore_order=True)
del transactions, df, test
train = train.compute()

In [4]:
def prepare_data(transactions, p_ids):
    transactions = transactions.groupby(["customer_id"])['article_id']\
                                .apply(lambda x: list(x), meta=("article_ids",object))\
                                .reset_index().compute().drop_duplicates(subset=["customer_id"])

    transactions["hist_len"] = transactions.article_ids.apply(lambda x: 128 if len(x)>128 else len(x))
    transactions["prod_ids"] = transactions.apply(lambda x: random.sample(x.article_ids, x.hist_len), axis=1)
    transactions["not_prods"] = transactions.prod_ids.apply(lambda x: [p_id for p_id in random.sample(p_ids, 256) if p_id not in x][:128])

    return transactions[["customer_id", "prod_ids", "not_prods"]]

train = prepare_data(dd.from_pandas(train, npartitions=4), p_ids)
train.to_pickle('data/train.pkl')
del p_ids

## Data Loader

In [2]:
class Generator(tf.keras.utils.Sequence):
    def __init__(self, data, positive_sample_length, negative_sample_length, batch_size):
        self.data = data
        self.positive_sample_length = positive_sample_length
        self.negative_sample_length = negative_sample_length
        self.sample_length = positive_sample_length + negative_sample_length
        self.batch_size = batch_size
        if batch_size % self.sample_length != 0:
            raise ValueError("batch_size must be divisible by sum of positive_sample_length and negative_sample_length")


    def user_info_generator(self):
        for i, row in self.data[["customer_id","prod_ids","not_prods"]].iterrows():
            pids = np.asarray(random.sample(row["prod_ids"], self.positive_sample_length) + random.sample(row["not_prods"], self.negative_sample_length))
            labels = np.asarray([1]*self.positive_sample_length + [0]*self.negative_sample_length)
            indices = np.arange(self.sample_length)
            np.random.shuffle(indices)
            yield tf.convert_to_tensor(np.asarray([pids[indices],labels[indices]], dtype=np.int32), dtype=tf.int32)

    def user_id_generator(self):
        for i, row in self.data[["customer_id"]].iterrows():
            customer_id = [int(row["customer_id"])] * self.sample_length
            yield tf.convert_to_tensor(customer_id, dtype=tf.int32)

class BatchGenerator(Generator):
    def __init__(self, data, positive_sample_length, negative_sample_length, batch_size):
        super().__init__(data, positive_sample_length, negative_sample_length, batch_size)
        self.mini_batch = int(batch_size / self.sample_length)
        self.batch = batch_size

        self.user_info_loader = tf.data.Dataset.from_generator(
        self.user_info_generator, output_types=tf.int32).batch(self.mini_batch, drop_remainder=True)
        self.user_id_loader = tf.data.Dataset.from_generator(
        self.user_id_generator, output_types=tf.int32).batch(self.mini_batch, drop_remainder=True)

    def _get_batch(self):
        for c_ids, info in zip(self.user_id_loader, self.user_info_loader):
            x = tf.stack([tf.reshape(c_ids, self.batch), tf.reshape(info[:,0,:], self.batch)], axis=1)
            y = tf.reshape(info[:,1,:], self.batch)
            yield x, y

class DataGenerator (BatchGenerator):
    def __init__(self, data, positive_sample_length, negative_sample_length, batch_size, validation=False):
        super().__init__(data, positive_sample_length, negative_sample_length, batch_size)
        self.len_data = len(self.data) // self.batch_size
        self.validation = validation
        if validation:
            self.val_data = self.data.copy()
            self.data = self.val_data.sample(frac=0.1)

    def on_epoch_end(self):
        if self.validation:
            self.data = self.val_data.sample(frac=0.1)
        else:
            shuffle(self.data)

    def __len__(self):
        return self.len_data

    def __getitem__(self, index):
        return next(iter(self._get_batch()))

    def get(self):
        return self._get_batch()

In [3]:
train_data = pd.read_pickle("data/train.pkl")

In [4]:
batch_size = 128
train_generator = DataGenerator(train_data, 1, 3, batch_size)

## Matrix Factorization Model (GMF)

In [5]:
class GmfNet(tf.keras.Model):
    def __init__(self, num_users, num_prods, embedding_size, **kwargs):
        super(GmfNet, self).__init__(**kwargs)
        self.num_users = num_users
        self.num_prods = num_prods
        self.embedding_size = embedding_size
        self.user_embedding = tf.keras.layers.Embedding(
            num_users,
            embedding_size,
            embeddings_initializer="he_normal",
            embeddings_regularizer= tf.keras.regularizers.l2(1e-6),
        )
        self.user_bias = tf.keras.layers.Embedding(num_users, 1)
        self.prod_embedding = tf.keras.layers.Embedding(
            num_prods,
            embedding_size,
            embeddings_initializer="he_normal",
            embeddings_regularizer= tf.keras.regularizers.l2(1e-6),
        )
        self.prod_bias = tf.keras.layers.Embedding(num_prods, 1)

    def call(self, inputs):
        user_vector = self.user_embedding(inputs[:, 0])
        user_bias = self.user_bias(inputs[:, 0])
        prod_vector = self.prod_embedding(inputs[:, 1])
        prod_bias = self.prod_bias(inputs[:, 1])
        dot_user_prod = tf.tensordot(user_vector, prod_vector, 2)
        x = dot_user_prod + user_bias + prod_bias

        return tf.nn.sigmoid(x)

In [6]:
class MlpNet(tf.keras.Model):
    def __init__(self, num_users, num_prods, embedding_size, **kwargs):
        super(MlpNet, self).__init__(**kwargs)
        self.num_users = num_users
        self.num_prods = num_prods
        self.embedding_size = embedding_size
        self.user_embedding = tf.keras.layers.Embedding(
            num_users,
            embedding_size,
            embeddings_initializer="he_normal",
            embeddings_regularizer= tf.keras.regularizers.l2(1e-6),
        )
        self.prod_embedding = tf.keras.layers.Embedding(
            num_prods,
            embedding_size,
            embeddings_initializer="he_normal",
            embeddings_regularizer= tf.keras.regularizers.l2(1e-6),
        )

        self.prediction = tf.keras.Sequential([
            tf.keras.layers.Dropout(0.1),
            tf.keras.layers.Dense(embedding_size, activation="relu", name="layer1"),
            tf.keras.layers.Dropout(0.1),
            tf.keras.layers.BatchNormalization(),
            tf.keras.layers.Dense(embedding_size, activation="relu", name="layer2"),
            tf.keras.layers.Dropout(0.1),
            tf.keras.layers.BatchNormalization(),
            tf.keras.layers.Dense(embedding_size, activation="relu", name="layer3"),
            tf.keras.layers.Dense(embedding_size, activation="relu", name="layer4"),
        ])

    def call(self, inputs):
        user_vector = self.user_embedding(inputs[:, 0])
        prod_vector = self.prod_embedding(inputs[:, 1])

        return self.prediction(tf.concat([user_vector, prod_vector], axis=1))

In [7]:
class RecommenderNet(tf.keras.Model):
    def __init__(self, num_users, num_prods, embedding_size, **kwargs):
        super(RecommenderNet, self).__init__(**kwargs)
        self.gmf = GmfNet(num_users, num_prods, embedding_size)
        self.mlp = MlpNet(num_users, num_prods, embedding_size)
        self.pred = tf.keras.layers.Dense(1, activation="sigmoid")

    def call(self, inputs):
        gmf_output = self.gmf(inputs)
        mlp_output = self.mlp(inputs)
        return self.pred(tf.concat([gmf_output, mlp_output], axis=1))

In [8]:
#model = RecommenderNet(number_of_customer, c, 128) #1362281 104547
model = RecommenderNet(1362281, 104547, 128)
model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
    loss=tf.keras.losses.BinaryCrossentropy(),
    metrics=["accuracy"])
es = tf.keras.callbacks.EarlyStopping(monitor='loss', patience=5)
checkpoint = tf.keras.callbacks.ModelCheckpoint(
    filepath="weihts/collabrative",
    verbose=1,
    save_weights_only=True,
    monitor='loss',
    mode='min')
cores = multiprocessing.cpu_count()
history = model.fit(train_generator, epochs=25, callbacks=[es, checkpoint], workers=cores)

Epoch 1/25


ResourceExhaustedError: 2 root error(s) found.
  (0) Resource exhausted:  SameWorkerRecvDone unable to allocate output tensor. Key: /job:localhost/replica:0/task:0/device:CPU:0;80822a2798a5edc8;/job:localhost/replica:0/task:0/device:GPU:0;edge_60_recommender_net/gmf_net/embedding/embeddings/Regularizer/Square/ReadVariableOp;0:0
	 [[{{node recommender_net/gmf_net/embedding/embeddings/Regularizer/Square/ReadVariableOp/_12}}]]
Hint: If you want to see a list of allocated tensors when OOM happens, add report_tensor_allocations_upon_oom to RunOptions for current allocation info.

	 [[Adam/gradients/AddN_6/_102]]
Hint: If you want to see a list of allocated tensors when OOM happens, add report_tensor_allocations_upon_oom to RunOptions for current allocation info.

  (1) Resource exhausted:  SameWorkerRecvDone unable to allocate output tensor. Key: /job:localhost/replica:0/task:0/device:CPU:0;80822a2798a5edc8;/job:localhost/replica:0/task:0/device:GPU:0;edge_60_recommender_net/gmf_net/embedding/embeddings/Regularizer/Square/ReadVariableOp;0:0
	 [[{{node recommender_net/gmf_net/embedding/embeddings/Regularizer/Square/ReadVariableOp/_12}}]]
Hint: If you want to see a list of allocated tensors when OOM happens, add report_tensor_allocations_upon_oom to RunOptions for current allocation info.

0 successful operations.
0 derived errors ignored. [Op:__inference_train_function_2845]

Function call stack:
train_function -> train_function
