# Base code : https://yamalab.tistory.com/92

In [1]:
import numpy as np
from tqdm import tqdm_notebook as tqdm

import numpy as np

# Base code : https://yamalab.tistory.com/92
class MatrixFactorization():
    def __init__(self, R, k, learning_rate, reg_param, epochs, verbose=False):
        """
        :param R: rating matrix
        :param k: latent parameter
        :param learning_rate: alpha on weight update
        :param reg_param: beta on weight update
        :param epochs: training epochs
        :param verbose: print status
        """
        self._R = R
        self._num_users, self._num_items = R.shape
        self._k = k
        self._learning_rate = learning_rate
        self._reg_param = reg_param
        self._epochs = epochs
        self._verbose = verbose


    def fit(self):
        """
        training Matrix Factorization : Update matrix latent weight and bias

        참고: self._b에 대한 설명
        - global bias: input R에서 평가가 매겨진 rating의 평균값을 global bias로 사용
        - 정규화 기능. 최종 rating에 음수가 들어가는 것 대신 latent feature에 음수가 포함되도록 해줌.

        :return: training_process
        """

        # init latent features
        self._P = np.random.normal(size=(self._num_users, self._k))
        self._Q = np.random.normal(size=(self._num_items, self._k))

        # init biases
        self._b_P = np.zeros(self._num_users)
        self._b_Q = np.zeros(self._num_items)
        self._b = np.mean(self._R[np.where(self._R != 0)])

        # train while epochs
        self._training_process = []
        for epoch in range(self._epochs):
            # rating이 존재하는 index를 기준으로 training
            xi, yi = self._R.nonzero()
            for i, j in zip(xi, yi):
                self.gradient_descent(i, j, self._R[i, j])
            cost = self.cost()
            self._training_process.append((epoch, cost))

            # print status
            if self._verbose == True and ((epoch + 1) % 10 == 0):
                print("Iteration: %d ; cost = %.4f" % (epoch + 1, cost))


    def cost(self):
        """
        compute root mean square error
        :return: rmse cost
        """

        # xi, yi: R[xi, yi]는 nonzero인 value를 의미한다.
        # 참고: http://codepractice.tistory.com/90
        xi, yi = self._R.nonzero()
        # predicted = self.get_complete_matrix()
        cost = 0
        for x, y in zip(xi, yi):
            cost += pow(self._R[x, y] - self.get_prediction(x, y), 2)
        return np.sqrt(cost/len(xi))


    def gradient(self, error, i, j):
        """
        gradient of latent feature for GD

        :param error: rating - prediction error
        :param i: user index
        :param j: item index
        :return: gradient of latent feature tuple
        """

        dp = (error * self._Q[j, :]) - (self._reg_param * self._P[i, :])
        dq = (error * self._P[i, :]) - (self._reg_param * self._Q[j, :])
        return dp, dq


    def gradient_descent(self, i, j, rating):
        """
        graident descent function

        :param i: user index of matrix
        :param j: item index of matrix
        :param rating: rating of (i,j)
        """

        # get error
        prediction = self.get_prediction(i, j)
        error = rating - prediction

        # update biases
        self._b_P[i] += self._learning_rate * (error - self._reg_param * self._b_P[i])
        self._b_Q[j] += self._learning_rate * (error - self._reg_param * self._b_Q[j])

        # update latent feature
        dp, dq = self.gradient(error, i, j)
        self._P[i, :] += self._learning_rate * dp
        self._Q[j, :] += self._learning_rate * dq


    def get_prediction(self, i, j):
        """
        get predicted rating: user_i, item_j
        :return: prediction of r_ij
        """
        return self._b + self._b_P[i] + self._b_Q[j] + self._P[i, :].dot(self._Q[j, :].T)


    def get_complete_matrix(self):
        """
        computer complete matrix PXQ + P.bias + Q.bias + global bias

        - PXQ 행렬에 b_P[:, np.newaxis]를 더하는 것은 각 열마다 bias를 더해주는 것
        - b_Q[np.newaxis:, ]를 더하는 것은 각 행마다 bias를 더해주는 것
        - b를 더하는 것은 각 element마다 bias를 더해주는 것

        - newaxis: 차원을 추가해줌. 1차원인 Latent들로 2차원의 R에 행/열 단위 연산을 해주기위해 차원을 추가하는 것.

        :return: complete matrix R^
        """
        return self._b + self._b_P[:, np.newaxis] + self._b_Q[np.newaxis:, ] + self._P.dot(self._Q.T)



# run example
if __name__ == "__main__":
    # rating matrix - User X Item : (7 X 5)
    R = np.array([
        [1, 0, 0, 1, 3],
        [2, 0, 3, 1, 1],
        [1, 2, 0, 5, 0],
        [1, 0, 0, 4, 4],
        [2, 1, 5, 4, 0],
        [5, 1, 5, 4, 0],
        [0, 0, 0, 1, 0],
    ])

    # P, Q is (7 X k), (k X 5) matrix

In [None]:
%%time
factorizer = MatrixFactorization(R, k=3, learning_rate=0.01, reg_param=0.01, epochs=100, verbose=True)
factorizer.fit()

Iteration: 10 ; cost = 1.0362
Iteration: 20 ; cost = 0.8564
Iteration: 30 ; cost = 0.7445
Iteration: 40 ; cost = 0.6600
Iteration: 50 ; cost = 0.5946
Iteration: 60 ; cost = 0.5445
Iteration: 70 ; cost = 0.5062
Iteration: 80 ; cost = 0.4762
Iteration: 90 ; cost = 0.4519
Iteration: 100 ; cost = 0.4315
CPU times: user 156 ms, sys: 44.1 ms, total: 200 ms
Wall time: 147 ms


In [None]:
factorizer.get_complete_matrix()

array([[1.69231184, 4.63853214, 4.33123326, 1.04190297, 2.3019729 ],
       [1.10197004, 4.16824534, 2.89533771, 0.93480457, 1.92833881],
       [0.92760527, 1.99988635, 1.75570805, 4.92560566, 2.94174512],
       [1.74873035, 2.95751855, 2.44667857, 3.84030146, 3.42374881],
       [2.36349126, 1.07194376, 4.60608054, 4.03856125, 4.03716133],
       [4.65781574, 0.97895124, 5.24043736, 3.92064079, 6.50939479],
       [1.03946748, 0.95193674, 0.83997154, 1.20092167, 2.69881011]])

## 코드의 Step by Step 설명

기본적인 구조는 위와 같습니다. 한번 __init__ 부터 차근차근 코드의 설명을 시작하겠습니다.

In [None]:
class MatrixFactorization():
    def __init__(self, R, k, learning_rate, reg_param, epochs, verbose=False):
        """
        :param R: rating matrix
        :param k: latent parameter
        :param learning_rate: alpha on weight update
        :param reg_param: beta on weight update
        :param epochs: training epochs
        :param verbose: print status
        """
        self._R = R
        self._num_users, self._num_items = R.shape
        self._k = k
        self._learning_rate = learning_rate
        self._reg_param = reg_param
        self._epochs = epochs
        self._verbose = verbose

일단, __init__ 함수는 Matrix Factorization이라는 클래스가 호출될 때 자동으로 실행되는 부분입니다. 파라미터로는 6개의 인자를 받는데 각각 아래의 의미를 가집니다.

- R: 평점 행렬
- k: User Latent와 Item Latent의 차원의 수
- learning_rate: 학습률
- reg_param: Weight의 Regularization 값
- epochs: 전체 학습 횟수 (Total Epoch)
- verbose: 학습 과정을 출력할지 여부 (True : 10번마다 cost 출력, False : cost를 출력하지 않음)

In [None]:
%%time

R = np.array([
    [1, 0, 0, 1, 3],
    [2, 0, 3, 1, 1],
    [1, 2, 0, 5, 0],
    [1, 0, 0, 4, 4],
    [2, 1, 5, 4, 0],
    [5, 1, 5, 4, 0],
    [0, 0, 0, 1, 0],
])

factorizer = MatrixFactorization(R, k=3, learning_rate=0.01, reg_param=0.01, epochs=100, verbose=True)
factorizer.fit()

위의 실행은 7개의 User, 5개의 Item에 대한 평점 행렬(R)에 대해서 k=3, learning_rate=0.01, reg_param=0.01, epochs=100, verbose=True 와 같은 파라미터를 가지는 MatrixFactorization 객체를 생성하라는 의미입니다. 생성된 객체에서 factorizer.fit()을 통해서 fit() 함수를 실행하면, 아래의 코드가 실행되게 됩니다.

In [None]:
def fit(self):
    """
    training Matrix Factorization : Update matrix latent weight and bias

    참고: self._b에 대한 설명
    - global bias: input R에서 평가가 매겨진 rating의 평균값을 global bias로 사용
    - 정규화 기능. 최종 rating에 음수가 들어가는 것 대신 latent feature에 음수가 포함되도록 해줌.

    :return: training_process
    """

    # init latent features
    self._P = np.random.normal(size=(self._num_users, self._k))
    self._Q = np.random.normal(size=(self._num_items, self._k))

    # init biases
    self._b_P = np.zeros(self._num_users)
    self._b_Q = np.zeros(self._num_items)
    self._b = np.mean(self._R[np.where(self._R != 0)])

    # train while epochs
    self._training_process = []
    for epoch in range(self._epochs):
        # rating이 존재하는 index를 기준으로 training
        xi, yi = self._R.nonzero()
        for i, j in zip(xi, yi):
            self.gradient_descent(i, j, self._R[i, j])
        cost = self.cost()
        self._training_process.append((epoch, cost))

        # print status
        if self._verbose == True and ((epoch + 1) % 10 == 0):
            print("Iteration: %d ; cost = %.4f" % (epoch + 1, cost))

fit 함수에서 가장 먼저 실행되는 부분은 아래의 Latent Matrix를 초기화해주는 부분입니다.

In [None]:
# init latent features
self._P = np.random.normal(size=(self._num_users, self._k))
self._Q = np.random.normal(size=(self._num_items, self._k))

# init biases
self._b_P = np.zeros(self._num_users)
self._b_Q = np.zeros(self._num_items)
self._b = np.mean(self._R[np.where(self._R != 0)])

이때, np.random.normal 함수를 이용해서 행렬을 초기화해주고 np.zeros 함수를 이용해서 bias 부분을 초기화해줍니다. 함수의 의미는 아래와 같습니다.

- np.random.normal : (self._num_users, self._k)의 크기로 행렬을 정규분포 형태로 초기화합니다. 위의 예시에서는 User Latent Matrix는 (7, 3)의 크기를 Item Latent Matrix는 (5, 3)의 크기를 가짐
- np.zeros : self._num_users 혹은 self._num_items의 크기만큼의 0 값을 가지는 벡터를 생성합니다.
- np.mean(self._R[np.where(self._R != 0)]) : 전체 평점의 평균을 계산

이후, 전체 학습 과정을 진행합니다.

In [None]:
# train while epochs
self._training_process = []
for epoch in range(self._epochs):
    # rating이 존재하는 index를 기준으로 training
    xi, yi = self._R.nonzero()
    for i, j in zip(xi, yi):
        self.gradient_descent(i, j, self._R[i, j])
    cost = self.cost()
    self._training_process.append((epoch, cost))

    # print status
    if self._verbose == True and ((epoch + 1) % 10 == 0):
        print("Iteration: %d ; cost = %.4f" % (epoch + 1, cost))

- self._training_process = [] 는 for문 안의 self._training_process.append((epoch, cost)) 에 사용되는 부분으로 학습 시에 Epoch와 Cost를 저장하는 부분입니다. for문의 경우 처음 파라미터로 받은 self._epochs 만큼 반복학습을 진행하게 됩니다.


- 먼저 시행되는 for문의 첫 번째로 시행되는 xi, yi = self._R.nonzero()는 평점 행렬에서 0이 아닌 부분 즉, 사용자가 평점을 매긴 부분에 대해서만 값을 추출하라는 의미입니다. 그 이유는 위의 이론에서 말했듯이 결측치가 아닌 부분을 통해서만 학습을 진행하려는 의도입니다. 이후에 해당 부분을 통해서 모든 평점 부분에 대해서 gradient_descent를 실행해줍니다.

In [None]:
for i, j in zip(xi, yi):
    self.gradient_descent(i, j, self._R[i, j])

In [None]:
def gradient_descent(self, i, j, rating):
    """
    graident descent function

    :param i: user index of matrix
    :param j: item index of matrix
    :param rating: rating of (i,j)
    """

    # get error
    prediction = self.get_prediction(i, j)
    error = rating - prediction

    # update biases
    self._b_P[i] += self._learning_rate * (error - self._reg_param * self._b_P[i])
    self._b_Q[j] += self._learning_rate * (error - self._reg_param * self._b_Q[j])

    # update latent feature
    dp, dq = self.gradient(error, i, j)
    self._P[i, :] += self._learning_rate * dp
    self._Q[j, :] += self._learning_rate * dq

- gradient_descent 함수의 경우 행렬의 원소 위치(i, j)와 평점 값(self._R[i, j])을 받습니다. 바로 시작하는 prediction = self.get_prediction(i, j) 은 User Latent Matrix와 Item Latent Matrix의 곱을 통해서 평점 행렬의 값들을 생성하는 부분입니다.


- self._P[i, :].dot(self._Q[j, :].T) 에서 User Latent P와 Item Latent Q가 곱해져서 평점을 계산하고 Bias를 없애기 위해서 전체 평균(self._b), User의 평균 평점(self._b_P[i]), Item의 평균 평점(self._b_Q[j])을 더해줌으로써 값을 생성합니다.

In [None]:
def get_prediction(self, i, j):
    """
    get predicted rating: user_i, item_j
    :return: prediction of r_ij
    """
    return self._b + self._b_P[i] + self._b_Q[j] + self._P[i, :].dot(self._Q[j, :].T)

이후에, error = rating - prediction 을 통해서 얼마만큼의 차이가 있는지 계산을 합니다. 그리고 아래의 수식에서 사용한 공식을 이용해서 self.gradient(error, i, j) 에서 Gradient 값을 계산하고 Weight 및 Bais를 업데이트합니다.

![](https://drive.google.com/uc?export=view&id=1Nlhx0ilvzD4Vhxnj4WPrhnJNRFkRfBPf)

In [None]:
# update biases
self._b_P[i] += self._learning_rate * (error - self._reg_param * self._b_P[i])
self._b_Q[j] += self._learning_rate * (error - self._reg_param * self._b_Q[j])

# update latent feature
dp, dq = self.gradient(error, i, j)
self._P[i, :] += self._learning_rate * dp
self._Q[j, :] += self._learning_rate * dq

이후에, cost = self.cost() 에서 전체 Matrix에 대해서 오차를 계산하고 출력해줍니다. cost += pow(self._R[x, y] - self.get_prediction(x, y), 2) 에서 각 평점 별로 오차를 계산하게 됩니다.

In [None]:
def cost(self):
    """
    compute root mean square error
    :return: rmse cost
    """

    # xi, yi: R[xi, yi]는 nonzero인 value를 의미한다.
    # 참고: http://codepractice.tistory.com/90
    xi, yi = self._R.nonzero()
    # predicted = self.get_complete_matrix()
    cost = 0
    for x, y in zip(xi, yi):
        cost += pow(self._R[x, y] - self.get_prediction(x, y), 2)
    return np.sqrt(cost/len(xi))

이후, 모든 Epoch에 대해서 학습이 완료되면 factorizer.get_complete_matrix() 를 통해서 완성된 평점 행렬을 추출하게 되면 SGD를 이용한 협업 필터링이 완료되게 됩니다.

In [None]:
factorizer.get_complete_matrix()

참고로 이를 조금 수정해서 PyTorch를 이용한 코드는 다음의 링크에서 확인할 수 있습니다. 다음 포스팅에서는 ALS가 무엇인지에서부터 어떻게 사용하는지에 대해 알아보도록 하겠습니다.