In [1]:
import numpy as np
import pandas as pd

# Generate synthetic session/item interaction data.
SEED = 42
np.random.seed(SEED)  # seed the global NumPy RNG so a fresh "Run All" reproduces the same data

session_ids = np.arange(1, 6)
item_ids = np.arange(1, 6)
time_stamps = np.random.rand(20)
rating_data = np.random.choice(item_ids, size=(20, 1), replace=True)
rating_data = np.hstack([np.random.choice(session_ids, size=(20, 1)), rating_data, time_stamps.reshape(-1, 1)])

# hstack promotes every column to float because of the timestamps, so the id
# columns are cast back to integers before they are used as lookup keys.
# Only the first occurrence of each (session, item) pair is kept.
rating_df = (
    pd.DataFrame(rating_data, columns=['SessionId', 'ItemId', 'Time'])
    .astype({'SessionId': 'int64', 'ItemId': 'int64'})
    .drop_duplicates(subset=['SessionId', 'ItemId'])
)

# Map raw ids to zero-based row/column positions.
user_id_mapping = {sid: i for i, sid in enumerate(session_ids)}
item_id_mapping = {iid: i for i, iid in enumerate(item_ids)}

# Binary matrix: entry (u, i) is 1 when session u interacted with item i.
num_users = len(session_ids)
num_items = len(item_ids)
matrix = np.zeros((num_users, num_items))
user_idx = rating_df['SessionId'].map(user_id_mapping).to_numpy()
item_idx = rating_df['ItemId'].map(item_id_mapping).to_numpy()
matrix[user_idx, item_idx] = 1

In [2]:
# Generate random binary genre features: one row per item, one column per genre.
genres_columns = ['Action', 'Comedy', 'Drama', 'Mystery', 'Romance', 'Thriller']
# The row count follows item_ids so the ItemId column inserted below always
# matches the frame length.
genre_data = np.random.randint(0, 2, size=(len(item_ids), len(genres_columns)))
movie_genres = pd.DataFrame(genre_data, columns=genres_columns)
movie_genres.insert(0, 'ItemId', item_ids)

# Plain-array view of the genre flags (ItemId column excluded) for the model.
num_genres = len(genres_columns)
item_genres = movie_genres[genres_columns].to_numpy()

In [3]:
class LinUCB:
    """LinUCB contextual bandit with a separate linear model per item (arm).

    Each arm ``a`` keeps a ``d x d`` matrix ``A[a]`` (initialised to the
    identity) and a length-``d`` vector ``b[a]`` (initialised to zero). The
    context for a (user, item) pair is the user's feature vector concatenated
    with that item's genre vector, so ``d = num_items + num_genres``; user
    feature vectors are therefore expected to have length ``num_items``.

    Parameters
    ----------
    alpha : float
        Weight of the exploration (confidence-width) term in the score.
    num_users : int
        Number of rows of the interaction matrix iterated over by ``fit``.
    num_items : int
        Number of arms.
    num_genres : int
        Number of genre features per item.
    """

    def __init__(self, alpha, num_users, num_items, num_genres):
        self.alpha = alpha
        self.num_users = num_users
        self.num_items = num_items
        self.num_genres = num_genres
        self.d = num_genres + num_items

        # Per-arm parameters: A starts as the identity, b as zeros.
        self.A = np.repeat(np.identity(self.d)[np.newaxis, :, :], num_items, axis=0)
        self.b = np.zeros((num_items, self.d))

    def _arm_features(self, user_features, item_genres):
        """Return a ``(num_items, d)`` array: user features followed by each item's genres."""
        user_vec = np.asarray(user_features, dtype=float).reshape(-1)
        genres = np.asarray(item_genres, dtype=float)
        return np.concatenate((np.tile(user_vec, (self.num_items, 1)), genres), axis=1)

    def _ucb_scores(self, arm_features):
        """Return the upper-confidence-bound score of every arm for the given contexts."""
        scores = np.empty(self.num_items)
        for item_id in range(self.num_items):
            # 1-D vectors keep every product a true scalar; storing a (1, 1)
            # array into a scalar slot is deprecated in recent NumPy releases.
            x = arm_features[item_id]
            A_inv = np.linalg.inv(self.A[item_id])
            theta = A_inv @ self.b[item_id]
            scores[item_id] = theta @ x + self.alpha * np.sqrt(x @ A_inv @ x)
        return scores

    def fit(self, matrix, item_genres, num_epochs):
        """Run the bandit over every user for ``num_epochs`` passes.

        For each user the arm with the highest score is played (ties broken
        uniformly at random via ``np.random.choice``), the reward is 1 when
        ``matrix[user_id, arm] == 1`` and 0 otherwise, and the played arm's
        parameters are updated.

        NOTE(review): the user context is the user's full row of ``matrix``,
        which already contains the entry used as the reward.

        Parameters
        ----------
        matrix : np.ndarray
            User-by-item array; row ``u`` is used as user ``u``'s features.
        item_genres : array-like
            Item-by-genre feature array.
        num_epochs : int
            Number of passes over all users.

        Returns
        -------
        list
            Mean reward of each epoch, in order.
        """
        avg_rewards = []
        for _ in range(num_epochs):
            rewards = []
            for user_id in range(self.num_users):
                user_features_vector = matrix[user_id]
                scores = self._ucb_scores(self._arm_features(user_features_vector, item_genres))

                # Pick uniformly among the arms that share the maximum score.
                best_arms = np.flatnonzero(scores == scores.max())
                a_t = np.random.choice(best_arms)

                r_t = 1 if matrix[user_id, a_t] == 1 else 0
                rewards.append(r_t)

                self.update(user_id, a_t, r_t, user_features_vector, item_genres)

            avg_rewards.append(np.mean(rewards))

        return avg_rewards

    def predict(self, user_features, item_genres):
        """Rank all items for one user by descending score.

        Parameters
        ----------
        user_features : array-like
            Feature vector of the user (length ``num_items``).
        item_genres : array-like
            Item-by-genre feature array.

        Returns
        -------
        np.ndarray
            Item indices ordered from highest to lowest score.
        """
        scores = self._ucb_scores(self._arm_features(user_features, item_genres))
        return np.argsort(-scores)

    def update(self, user_id, item_id, reward, user_features, item_genres):
        """Update the parameters of arm ``item_id`` with one observed reward.

        Parameters
        ----------
        user_id : int
            Not used by the update; kept so existing callers keep working.
        item_id : int
            Index of the arm that was played.
        reward : float
            Observed reward for that arm.
        user_features : array-like
            Feature vector of the user.
        item_genres : array-like
            Item-by-genre feature array; row ``item_id`` is used.
        """
        user_features_vector = np.asarray(user_features, dtype=float).reshape(-1)
        item_genres_vector = np.asarray(item_genres, dtype=float)[item_id].reshape(-1)

        # Context in the same order used for scoring: user features, then genres.
        x_t_at = np.concatenate((user_features_vector, item_genres_vector))

        self.A[item_id] = self.A[item_id] + np.outer(x_t_at, x_t_at)
        self.b[item_id] = self.b[item_id] + reward * x_t_at

In [18]:
# Example usage: instantiate LinUCB and train it on the interaction matrix.
ALPHA = 1.0       # exploration weight passed to the model
NUM_EPOCHS = 10   # number of passes over all users
linucb_model = LinUCB(
    alpha=ALPHA,
    num_users=num_users,
    num_items=num_items,
    num_genres=num_genres,
)
avg_rewards = linucb_model.fit(matrix, item_genres, num_epochs=NUM_EPOCHS)

In [19]:
# Mean reward per training epoch, as returned by fit.
avg_rewards

[0.6, 0.6, 1.0, 1.0, 0.8, 1.0, 1.0, 0.8, 1.0, 0.8]

In [20]:
# Each user's row of the interaction matrix doubles as that user's feature vector.
# This is an alias of `matrix`, not a copy.
user_features = matrix
user_features

array([[1., 1., 0., 1., 1.],
       [0., 0., 0., 0., 1.],
       [0., 0., 1., 0., 1.],
       [1., 0., 1., 0., 1.],
       [1., 1., 0., 1., 1.]])

In [21]:
# Pick one user (row index into user_features) to demonstrate prediction.
selected_user_id = 0
selected_user_features = user_features[selected_user_id]
selected_user_features

array([1., 1., 0., 1., 1.])

In [22]:
# predict returns item indices sorted by descending score; take the top one.
predicted_items = linucb_model.predict(selected_user_features, item_genres)
top_predicted_item = predicted_items[0]
top_predicted_item

4

In [23]:
# Feed a hard-coded reward of 1 for the recommended item back into the model.
actual_reward = 1
linucb_model.update(selected_user_id, top_predicted_item, actual_reward, selected_user_features, item_genres)