In [1]:
import numpy as np
import pandas as pd
import tensorflow as tf
from sklearn.model_selection import train_test_split


In [2]:
url = 'http://archive.ics.uci.edu/ml/machine-learning-databases/00352/Online%20Retail.xlsx'
data = pd.read_excel(url)


In [3]:
data = data.dropna()
data = data[data['Quantity'] > 0]
data = data[['CustomerID', 'StockCode']]


In [4]:
customers = data['CustomerID'].unique()
customer_to_idx = {old: new for new, old in enumerate(customers)}

items = data['StockCode'].unique()
item_to_idx = {old: new for new, old in enumerate(items)}

data['CustomerID'] = data['CustomerID'].apply(lambda x: customer_to_idx[x])
data['StockCode'] = data['StockCode'].apply(lambda x: item_to_idx[x])


In [5]:
train_data, test_data = train_test_split(data, test_size=0.2)


In [31]:
class RBM(object):
    def __init__(self, visible_dim, hidden_dim):
        self.visible_dim = visible_dim
        self.hidden_dim = hidden_dim
        self.W = tf.Variable(tf.random.normal(shape=(self.visible_dim, self.hidden_dim)))
        self.v_bias = tf.Variable(tf.zeros(shape=[self.visible_dim]))
        self.h_bias = tf.Variable(tf.zeros(shape=[self.hidden_dim]))

    def prob_h_given_v(self, visible):
        return tf.nn.sigmoid(tf.matmul(visible, self.W) + self.h_bias)

    def prob_v_given_h(self, hidden):
        return tf.nn.sigmoid(tf.matmul(hidden, tf.transpose(self.W)) + self.v_bias)

    def sample_h_given_v(self, visible):
        visible = tf.cast(visible, tf.float32)
        visible = tf.reshape(visible, [visible.shape[0], visible.shape[1]])
        return tf.nn.relu(tf.sign(self.prob_h_given_v(visible) - tf.random.uniform(tf.shape(visible),minval=0, maxval=1)))


    def sample_v_given_h(self, hidden):
        return tf.nn.relu(tf.sign(self.prob_v_given_h(hidden) - tf.random.uniform(tf.shape(hidden))))

    def gibbs_sampling(self, visible):
        h = self.sample_h_given_v(visible)
        v = self.sample_v_given_h(h)
        return v


    def prepare_data(self, data):
        data = data.astype('int32')
        data = np.clip(data, 0, self.visible_dim-1)
        return data

    def train(self, data, epochs=10, batch_size=64, lr=0.01):
        data = self.prepare_data(data)
        num_batches = len(data) // batch_size
        optimizer = tf.optimizers.Adam(lr)

        for epoch in range(epochs):
            np.random.shuffle(data)

            for i in range(num_batches):
                batch = data[i * batch_size: (i + 1) * batch_size]

                with tf.GradientTape() as tape:
                    positive_phase = batch
                    negative_phase = self.gibbs_sampling(positive_phase)

                    positive_energy = tf.reduce_mean(tf.reduce_sum(tf.multiply(self.W, positive_phase), axis=1)) \
                                    + tf.reduce_mean(tf.multiply(self.v_bias, positive_phase)) \
                                    + tf.reduce_mean(tf.multiply(self.h_bias, self.prob_h_given_v(positive_phase)))

                    negative_energy = tf.reduce_mean(tf.reduce_sum(tf.multiply(self.W, negative_phase), axis=1)) \
                                    + tf.reduce_mean(tf.multiply(self.v_bias, negative_phase)) \
                                    + tf.reduce_mean(tf.multiply(self.h_bias, self.prob_h_given_v(negative_phase)))

                    loss = positive_energy - negative_energy

                grads = tape.gradient(loss, [self.W, self.v_bias, self.h_bias])
                optimizer.apply_gradients(zip(grads, [self.W, self.v_bias, self.h_bias]))


    def predict(self, user_idx, item_indices):
        user_input = np.zeros((len(item_indices), self.visible_dim))
        user_input[:, user_idx] = 1
        item_input = np.zeros((len(item_indices), self.visible_dim))
        item_input[:, item_indices] = 1

        hidden_prob = self.prob_h_given_v(item_input)
        user_prob = tf.matmul(hidden_prob, tf.transpose(tf.cast(tf.expand_dims(user_input, axis=0), tf.float32)))
        user_ranking = tf.argsort(user_prob, direction='DESCENDING')

        return user_ranking
    
    def evaluate_precision_recall_at_k(self, test_data, k=10):
        precision_list = []
        recall_list = []

        for user in test_data['CustomerID'].unique():
            user_items = test_data[test_data['CustomerID'] == user]['StockCode'].tolist()
            if len(user_items) == 0:
                continue
            item_indices = np.arange(self.visible_dim)
            user_idx = self.customer_to_idx[user]
            predictions = self.predict(user_idx, item_indices)
            predictions = [i.numpy() for i in predictions]
            user_recommendations = [item for item in predictions if item not in user_items][:k]
            if len(user_recommendations) == 0:
                continue
            true_positives = len(set(user_recommendations).intersection(set(user_items)))
            precision = true_positives / k
            recall = true_positives / len(user_items)
            precision_list.append(precision)
            recall_list.append(recall)

        precision = np.mean(precision_list)
        recall = np.mean(recall_list)

        return precision, recall



In [32]:
# Create an instance of the RBM class and train the model
visible_dim = len(items)
hidden_dim = 50
rbm = RBM(visible_dim, hidden_dim)
rbm.customer_to_idx = customer_to_idx
rbm.train(train_data.values, epochs=20, batch_size=64, lr=0.01)

# Evaluate the model's precision and recall at k
precision, recall = rbm.evaluate_precision_recall_at_k(test_data, k=10)

print('Precision at k = 10:', precision)
print('Recall at k = 10:', recall)

InvalidArgumentError: Matrix size-incompatible: In[0]: [64,2], In[1]: [3665,50] [Op:MatMul]