사용자 id와 아이템 id만 사용해서 FM 구현

In [3]:
from chapter5_modules import *
import numpy as np
import pandas as pd
from sklearn.utils import shuffle


In [4]:
ratings = load_ratings()

# User encoding
user_dict = {} # user_id - index mapping
for i in set(ratings['user_id']): # 중복제거, 순서대로 출력
  user_dict[i] = len(user_dict)
n_user = len(user_dict)

# Item encoding
item_dict = {}
start_point = n_user
for i in set(ratings['movie_id']):
  item_dict[i] = start_point + len(item_dict)
n_item = len(item_dict)

num_x = n_user + n_item
ratings = shuffle(ratings, random_state=1)

In [5]:
num_x, len(user_dict)+len(item_dict)

(2625, 2625)

In [6]:
# Generate X data , sparse matrix -> coordinate format matrix 
w0 = np.mean(ratings['rating']) # global bias
y = (ratings['rating'] - w0).values.tolist()

data = []; 

for i in range(len(ratings)):
  case = ratings.iloc[i]
  
  x_index = []; x_value = []
  x_index.append(user_dict[case['user_id']]); x_value.append(1)
  x_index.append(item_dict[case['movie_id']]); x_value.append(1)
  
  data.append([x_index, x_value])
  
  if (i % 10000) == 0:
    print(f'Encoding {i} cases...')

Encoding 0 cases...
Encoding 10000 cases...
Encoding 20000 cases...
Encoding 30000 cases...
Encoding 40000 cases...
Encoding 50000 cases...
Encoding 60000 cases...
Encoding 70000 cases...
Encoding 80000 cases...
Encoding 90000 cases...


In [14]:
class FM:
  def __init__(self, N:int, K:int, data:list, y, alpha:float, beta:float, 
               train_ratio:float=0.75, iterations:int=100, 
               tolerance:float=0.005, l2_reg:bool=True, verbose:bool=True):
    """FM inital function

    Args:
        N (int): # of x
        K (int): # of latent feature
        data (list): coo matrix
        y (list): rating data
        alpha (float): learnign rate
        beta (float): regularization rate
        train_ratio (float, optional): ratio of train set. Defaults to 0.75.
        iterations (int, optional): # of learning. Defaults to 100.
        tolerance (float, optional): 반복을 중단하는 RMSE의 기준. Defaults to 0.005.
        l2_reg (bool, optional): 정규화를 할지 여부. Defaults to True.
        verbose (bool, optional): 학습상황을 표시할지 여부. Defaults to True.
    """
    self.K = K; self.N = N; self.n_cases = len(data)
    self.alpha = alpha; self.beta = beta
    self.iterations = iterations
    self.tolerance = tolerance; self.l2_reg = l2_reg
    self.verbose = verbose
    # w 초기화
    self.w = np.random.normal(scale=1./self.N, size=(self.N))
    # v 초기화 (latent matrix)
    self.v = np.random.normal(scale=1./self.K, size=(self.N, self.K))
    # Train / Test 분리
    cutoff = int(train_ratio * self.n_cases)
    self.train_X = data[:cutoff]; self.test_X = data[cutoff:]
    self.train_y = y[:cutoff]; self.test_y = y[cutoff:]
  
  def predict(self, x_idx:list, x_value:list) -> float:
    """x_idx와 x_value값으로 y_hat을 예측하는 함수

    Args:
        x_idx (list): x의 idx list
        x_value (list): x의 value list

    Returns:
        float: y_hat
    """
    
    x_0 = np.array(x_value)
    x_1 = x_0.reshape(-1, 1) # 2차원으로 변경 (vx와의 연산을 위함)
    # cal bias score
    bias_score = np.sum(self.w[x_idx] * x_0)
    # cal latent score
    vx = self.v[x_idx] * x_1
    sum_vx = np.sum(vx, axis=0); sum_vx_2 = np.sum(vx * vx, axis=0)
    
    latent_score = 0.5 * np.sum(np.square(sum_vx) - sum_vx_2)
    # cal prediction
    y_hat = bias_score + latent_score
    
    return y_hat
  
  def sgd(self, X_data:list, y_data:list) -> float:
    """한번의 SGD를 진행하고 , RMSE값을 return 함.

    Args:
        X_data (list): input variables
        y_data (list): rating data

    Returns:
        float: RMSE of before w, v
    """
    y_pred = []
    
    for data, y in zip(X_data, y_data):
      x_idx = data[0] # index
      x_0 = np.array(data[1]) # value
      x_1 = x_0.reshape(-1, 1) # 2차원으로 변경 (vx와의 연산을 위함)
      vx = self.v[x_idx] * x_1
      
      y_hat = self.predict(x_idx, data[1])
      y_pred.append(y_hat)
      
      error = y - y_hat
      # update w, v
      if self.l2_reg: 
        self.w[x_idx] += error * self.alpha * (x_0 - self.beta * self.w[x_idx])
        self.v[x_idx] += error * self.alpha * (x_1 * sum(vx) - (vx * x_1) - self.beta * self.v[x_idx])
      else:
        self.w[x_idx] += error * self.alpha * x_0
        self.v[x_idx] += error * self.alpha * (x_1 * sum(vx) - (vx * x_1))
      
    return RMSE(y_data, y_pred)
    
  def test(self)-> list[float]:
    """train하면서 RMSE를 계산하는 함수

    Returns:
        list[float]: iter별로 RMSE를 저장한 list 
    """
    # SGD를 iterations 숫자만큼 수행
    best_RMSE = 10000; best_iteration = 0
    training_process = []
    
    for i in range(self.iterations):
      rmse1 = self.sgd(self.train_X, self.train_y)
      rmse2 = self.test_rmse(self.test_X, self.test_y)
      training_process.append((i, rmse1, rmse2))
      if self.verbose and i+1 % 10 == 0:
        print(f"Iteration = {i+1} / Train RMSE = {rmse1:.6f} / Test RMSE = {rmse2:.6f}")
      if best_RMSE > rmse2:
        best_RMSE = rmse2; best_iteration = i
      elif (rmse2 - best_RMSE) >   self.tolerance: # rmse2가 tolerance 이상으로 best rmse보다 크다면 중단
        break
    
    print(best_iteration, best_RMSE)
    return training_process  
    
  def test_rmse(self, x_data, y_data) -> float:
    """현재 w와 v로 계산한 예측치의 RMSE를 계산하는 함수

    Args:
        x_data (list): _description_
        y_data (list): _description_

    Returns:
        float: RMSE
    """
    y_pred = []
    
    for data, y in zip(x_data, y_data):
      y_hat = self.predict(data[0], data[1])
      y_pred.append(y_hat)
      
    return RMSE(y_data, y_pred)
  

In [1]:
from chapter5_modules import *
import numpy as np
import pandas as pd
from sklearn.utils import shuffle

data, y, num_x = get_data_y()

K = 350

FM_kwargs = {
  'N': num_x,
  'K': K,
  'data': data,
  'y': y,
  'alpha': 0.0014,
  'beta': 0.075,
  'train_ratio': 0.75,
  'iterations':600,
  'tolerance': 0.0005,
  'l2_reg':True,
  'verbose': True
}

fm1 = FM(**FM_kwargs)
result = fm1.test()

Encoding 0 cases...
Encoding 10000 cases...
Encoding 20000 cases...
Encoding 30000 cases...
Encoding 40000 cases...
Encoding 50000 cases...
Encoding 60000 cases...
Encoding 70000 cases...
Encoding 80000 cases...
Encoding 90000 cases...
Iteration = 10 / Train RMSE = 0.955977 / Test RMSE = 0.972691
Iteration = 20 / Train RMSE = 0.934229 / Test RMSE = 0.957411
Iteration = 30 / Train RMSE = 0.925384 / Test RMSE = 0.951438
Iteration = 40 / Train RMSE = 0.920586 / Test RMSE = 0.948412
Iteration = 50 / Train RMSE = 0.917455 / Test RMSE = 0.946656
Iteration = 60 / Train RMSE = 0.914895 / Test RMSE = 0.945459
Iteration = 70 / Train RMSE = 0.912038 / Test RMSE = 0.944364
Iteration = 80 / Train RMSE = 0.907725 / Test RMSE = 0.942855
Iteration = 90 / Train RMSE = 0.900192 / Test RMSE = 0.940212
Iteration = 100 / Train RMSE = 0.887676 / Test RMSE = 0.935870
Iteration = 110 / Train RMSE = 0.870094 / Test RMSE = 0.930350
Iteration = 120 / Train RMSE = 0.848429 / Test RMSE = 0.924821
Iteration = 130 /

사용자, 아이템 외에 추가 데이터까지 사용하는 경우

In [1]:
from chapter5_modules import *
import numpy as np
import pandas as pd
from sklearn.utils import shuffle

In [2]:
users, movies, ratings = get_dataset_1()

# User encoding
user_dict = {}
for i in set(users['user_id']):
  user_dict[i] = len(user_dict)
n_user = len(user_dict)

# Item encoding
item_dict = {}
start_point = n_user
for i in set(movies['movie_id']):
  item_dict[i] = start_point + len(item_dict)
n_item = len(item_dict)
start_point += n_item

# Occupation encoding
occ_dict = {}
for i in set(users['occupation']):
  occ_dict[i] = start_point + len(occ_dict)
n_occ = len(occ_dict)
start_point += n_occ

# Gender encoding
gender_dict = {}
for i in set(users['sex']):
  gender_dict[i] = start_point + len(gender_dict)
n_gender = len(gender_dict)
start_point += n_gender

# Genre encoding
genre_dict = {}
genre = movies.columns.tolist()[5:]
for i in genre:
  genre_dict[i] = start_point + len(genre_dict)
n_genre = len(genre_dict)
start_point += n_genre

# Age encoding
age_index = start_point
start_point += 1
num_x = start_point

In [3]:
users, movies, ratings = get_dataset_1()

# Merge data
movies = movies.drop(['title', 'release_date', 'video release date', 'IMDB URL'], axis=1)
users = users.drop(['zip_code'], axis=1)

x = pd.merge(ratings, movies, how='outer', on='movie_id')
x = pd.merge(x, users, how='outer', on='user_id')
x = shuffle(x, random_state=1)

In [4]:
# Generate X data
data = []; y = []

age_mean = np.mean(x['age'])
age_std = np.mean(x['age'])

w0 = np.mean(x['rating'])
y = (ratings['rating'] - w0).values.tolist()

for i in range(len(x)):
  case = x.iloc[i]
  x_index = []; x_value = []
  x_index.append(user_dict[case['user_id']]); x_value.append(1)
  x_index.append(item_dict[case['movie_id']]); x_value.append(1)
  x_index.append(occ_dict[case['occupation']]); x_value.append(1)
  x_index.append(gender_dict[case['sex']]); x_value.append(1)
  
  for j in genre:
    if case[j] == 1:
      x_index.append(genre_dict[j]); x_value.append(1)
      
  x_index.append(age_index); x_value.append((case['age'] - age_mean) / age_std)
  
  data.append([x_index, x_value])

  if (i % 10000) == 0:
    print(f"Encoding {i} cases...")

Encoding 0 cases...
Encoding 10000 cases...
Encoding 20000 cases...
Encoding 30000 cases...
Encoding 40000 cases...
Encoding 50000 cases...
Encoding 60000 cases...
Encoding 70000 cases...
Encoding 80000 cases...
Encoding 90000 cases...


In [6]:
K = 100

FM_kwargs = {
  'N': num_x,
  'K': K,
  'data': data,
  'y': y,
  'alpha': 0.0014,
  'beta': 0.075,
  'train_ratio': 0.75,
  'iterations':600,
  'tolerance': 0.001,
  'l2_reg':True,
  'verbose': True
}

fm2 = FM(**FM_kwargs)
result = fm2.test()

Iteration = 10 / Train RMSE = 1.115670 / Test RMSE = 1.130784
Iteration = 20 / Train RMSE = 1.095844 / Test RMSE = 1.145900
Iteration = 30 / Train RMSE = 1.067208 / Test RMSE = 1.166218
0 1.1223694944531286


결과가 더 안좋아짐. 파라미터 조정을 통해 성능을 향상시킬 수 있음.  
여기서 알 수 있는 점은 변수를 무작정 추가한다고 반드시 좋은 결과가 나오는 것은 아니라는 점.