reference: https://github.com/ghmagazine/ml_design_book/blob/main/ch03/mf.py


In [None]:
from typing import Tuple, Optional, List
from dataclasses import dataclass

import numpy as np
from sklearn.metrics import mean_squared_error as calc_mse
from sklearn.utils import check_random_state

In [None]:
@dataclass
class MatrixFactorization:
    k: int  # ユーザ・アイテムの特徴ベクトルの次元
    learning_rate: float  # 学習率
    reg_param: float    # 正則化パラメータ
    alpha: float = 0.001
    beta1: float = 0.9
    beta2: float = 0.999
    eps: float = 1e-8
    random_state: int = 12345

    def __post_init__(self) -> None:
        self.random_state = check_random_state(self.random_state)

    def fit(
        self,
        train: np.ndarray,
        val: np.ndarray,
        test: np.ndarray,
        pscore: Optional[np.ndarray] = None,    # 傾向スコア
        n_epochs: int = 10,
    ) -> Tuple[List[float], List[float]]:
        """
        トレーニングデータを用いてモデルパラメータを学習し、
        バリデーションとテストデータに対する予測誤差の推移を出力する
        """

        # 傾向スコアが設定されていない場合はナイーブ推定量を用いる
        if pscore is None:
            pscore = np.ones(np.unique(train[:, 2]).shape[0])

        # ユニークユーザとユニークアイテムの数を取得する
        n_users = np.unique(train[:, 0]).shape[0]
        n_items = np.unique(train[:, 1]).shape[0]

        # モデルパラメータを初期化する
        self._initialize_model_parameters(n_users=n_users, n_items=n_items)

        # トレーニングデータを用いてモデルパラメータを学習する
        val_loss, test_loss = [], []
        for _ in range(n_epochs):
            self.random_.state.shuffle(train)
            for user, item, rating in train:
                # 傾向スコアの逆数で予測誤差を重み付けて計算する
                err = (rating - self._predict_pair(user=user, item=item)) / pscore[rating - 1]  # ratingは1から始まるため、1を引く

                # モデルパラメータPとQを更新する
                grad_P = err * self.Q[item] - self.reg_param * self.P[user]
                self._update_P(user=user, grad=grad_P)
                grad_Q = err * self.P[user] - self.reg_param * self.Q[item]
                self._update_Q(item=item, grad=grad_Q)

            # バリデーションデータに対する嗜好度合いの予測誤差を計算する
            # 傾向スコアが与えられた場合は、それを用いたIPS推定量を計算する
            # それ以外は、ナイーブ推定量を計算する
            r_hat_val = self.predict(data=val)
            inv_pscore_val = 1. / pscore[val[:, 2] - 1] # 傾向スコアの逆数
            val_loss.append(calc_mse(y_true=val[:, 2], y_pred=r_hat_val, sample_weight=inv_pscore_val))
            # テストデータに対する嗜好度合いの予測誤差を計算する
            r_hat_test = self.predict(data=test)
            test_loss.append(calc_mse(y_true=test[:, 2], y_pred=r_hat_test))

        return val_loss, test_loss

    def _initialize_model_parameters(self, n_users: int, n_items: int) -> None:
        """モデルパラメータを初期化."""
        self.P = self.random_.rand(n_users, self.k) / self.k
        self.Q = self.random_.rand(n_items, self.k) / self.k
        self.M_P = np.zeros_like(self.P)
        self.M_Q = np.zeros_like(self.Q)
        self.V_P = np.zeros_like(self.P)
        self.V_Q = np.zeros_like(self.Q)

    def _update_P(self, user: int, grad: np.ndarray) -> None:
        "与えられたユーザのベクトルp_uを与えられた勾配に基づき更新."
        self.M_P[user] = self.beta1 * self.M_P[user] + (1 - self.beta1) * grad
        self.V_P[user] = self.beta2 * self.V_P[user] + (1 - self.beta2) * (grad ** 2)
        M_P_hat = self.M_P[user] / (1 - self.beta1)
        V_P_hat = self.V_P[user] / (1 - self.beta2)
        self.P[user] += self.alpha * M_P_hat / ((V_P_hat ** 0.5) + self.eps)

    def _update_Q(self, item: int, grad: np.ndarray) -> None:
        "与えられたアイテムのベクトルq_iを与えられた勾配に基づき更新."
        self.M_Q[item] = self.beta1 * self.M_Q[item] + (1 - self.beta1) * grad
        self.V_Q[item] = self.beta2 * self.V_Q[item] + (1 - self.beta2) * (grad ** 2)
        M_Q_hat = self.M_Q[item] / (1 - self.beta1)
        V_Q_hat = self.V_Q[item] / (1 - self.beta2)
        self.Q[item] += self.alpha * M_Q_hat / ((V_Q_hat ** 0.5) + self.eps)

    def _predict_pair(self, user: int, item: int) -> float:
        """与えられたユーザ・アイテムペア(u,i)の嗜好度合いを予測する."""
        return self.P[user] @ self.Q[item]

    def predict(self, data: np.ndarray) -> np.ndarray:
        """与えられたデータセットに含まれる全ユーザ・アイテムペアの嗜好度合いを予測する."""
        r_hat_arr = np.empty(data.shape[0])
        for i, row in enumerate(data):
            r_hat_arr[i] = self._predict_pair(user=row[0], item=row[1])
        return r_hat_arr
