<a href="https://colab.research.google.com/github/minshyee/RecoSyS/blob/main/Hybrid_RecomSystem.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 하이브리드 추천시스템의 원리

In [1]:
from sklearn.model_selection import train_test_split
import random
import numpy as np
import pandas as pd

In [10]:
# Location of the tab-separated ratings file and the column names to assign
# (the names are passed explicitly via `names=`).
data_src = '/content/drive/MyDrive/Recosys/Data/u.data'
r_cols = ['user_id', 'movie_id', 'rating', 'timestamp']
ratings = pd.read_csv(data_src, sep='\t', names=r_cols, encoding='latin-1')

# Hold out 20% of the rows as a test set; the fixed seed makes the shuffled
# split reproducible across runs.
ratings_train, ratings_test = train_test_split(
    ratings, test_size=0.2, shuffle=True, random_state=2021)

def RMSE(y_true, y_pred):
  """Return the root-mean-square error between `y_true` and `y_pred`.

  Both arguments are converted with `np.array` before subtraction, so any
  array-like inputs that NumPy can subtract element-wise are accepted.
  """
  residuals = np.array(y_true) - np.array(y_pred)
  return np.sqrt(np.square(residuals).mean())

### dummy recosys

In [6]:
def recommender_1(recom_list):
  """Dummy recommender: return one random rating per entry of `recom_list`.

  The entries themselves are ignored; only their count matters. Each rating
  is drawn as `random.random() * 4 + 1`, i.e. a float in [1, 5).
  """
  random_ratings = [random.random() * 4 + 1 for _ in recom_list]
  return np.array(random_ratings)

def recommender_2(recom_list):
  """Second dummy recommender: one random rating per entry of `recom_list`.

  Returns a NumPy array with as many elements as `recom_list`; the content
  of each entry is not inspected.
  """
  scores = []
  for _ in recom_list:
    # random.random() is in [0, 1), so the rating falls in [1, 5).
    rating = random.random() * 4 + 1
    scores.append(rating)
  return np.array(scores)

### Hybrid

In [7]:
# Blend weights for the two recommenders (first gets 0.8, second gets 0.2).
weight = [0.8, 0.2]

# Test rows as a plain array; each row keeps the column order of the frame.
recom_list = np.array(ratings_test)

# Score every test row with each recommender (order matters for the shared
# random stream: recommender_1 draws first).
predictions_1 = recommender_1(recom_list)
predictions_2 = recommender_2(recom_list)

# Weighted sum of the two prediction vectors, evaluated against column 2 of
# the test rows (the rating column).
predictions = weight[0] * predictions_1 + weight[1] * predictions_2
RMSE(recom_list[:, 2], predictions)

1.5590157722579836

## CF와 MF의 결합 추천 시스템

In [8]:
from sklearn.metrics.pairwise import cosine_similarity
import os

In [9]:
class NEW_MF():
  """Biased matrix factorization trained with SGD, with a held-out test set.

  A rating is predicted as `b + b_u[i] + b_d[j] + P[i] . Q[j]`, where `b` is
  the mean of the non-zero training ratings, `b_u` / `b_d` are per-user and
  per-item biases, and `P` / `Q` are the K-dimensional latent factors.
  Zero entries of the rating matrix are treated as "not rated".
  """

  def __init__(self, ratings, hyper_params):
    """Store the rating matrix, hyper-parameters and id <-> index maps.

    Args:
      ratings: pandas DataFrame whose row labels are user ids and whose
        column labels are item ids; cell values are ratings, 0 = unrated.
      hyper_params: dict with keys 'K' (latent dimension), 'alpha'
        (learning rate), 'beta' (regularization strength), 'iterations'
        (number of SGD epochs) and 'verbose' (print progress if truthy).
    """
    self.R = np.array(ratings)
    self.num_users, self.num_items = np.shape(self.R)

    self.K = hyper_params['K']
    self.alpha = hyper_params['alpha']
    self.beta = hyper_params['beta']
    self.iterations = hyper_params['iterations']
    self.verbose = hyper_params['verbose']

    # Map external ids to positions in self.R and back. Reading the labels
    # from .columns / .index avoids materializing a transposed copy.
    self.item_id_index = {item_id: idx for idx, item_id in enumerate(ratings.columns)}
    self.index_item_id = {idx: item_id for idx, item_id in enumerate(ratings.columns)}
    self.user_id_index = {user_id: idx for idx, user_id in enumerate(ratings.index)}
    self.index_user_id = {idx: user_id for idx, user_id in enumerate(ratings.index)}

    # Empty until set_test() is called, so test() also works without a
    # hold-out set (test RMSE is then reported as NaN).
    self.test_set = []

  def rmse(self):
    """Return the RMSE over all non-zero entries of the training matrix.

    Also stores the per-entry predictions and errors (in the order returned
    by `self.R.nonzero()`) in `self.predictions` and `self.errors`.
    """
    rows, cols = self.R.nonzero()  # coordinates of the observed ratings
    # Vectorized form of get_predict() applied to every observed (row, col).
    self.predictions = (self.b + self.b_u[rows] + self.b_d[cols]
                        + np.sum(self.P[rows] * self.Q[cols], axis=1))
    self.errors = self.R[rows, cols] - self.predictions
    return np.sqrt(np.mean(self.errors ** 2))

  def sgd(self):
    """Run one SGD pass over `self.samples`, updating biases and factors."""
    for i, j, r in self.samples:  # (user index, item index, observed rating)
      e = r - self.get_predict(i, j)

      self.b_u[i] += self.alpha * (e - self.beta * self.b_u[i])
      self.b_d[j] += self.alpha * (e - self.beta * self.b_d[j])

      # Keep the pre-update user factors: the item-factor gradient must use
      # the same P[i] that produced the error e, not the freshly updated one.
      p_before = self.P[i, :].copy()
      self.P[i, :] += self.alpha * (e * self.Q[j, :] - self.beta * self.P[i, :])
      self.Q[j, :] += self.alpha * (e * p_before - self.beta * self.Q[j, :])

  def get_predict(self, i, j):
    """Return the predicted rating for user index `i` and item index `j`."""
    return self.b + self.b_u[i] + self.b_d[j] + self.P[i, :].dot(self.Q[j, :])

  def set_test(self, ratings_test):
    """Register a hold-out set and remove it from the training matrix.

    Args:
      ratings_test: DataFrame whose first three columns are, in order,
        user id, item id and rating.

    Returns:
      The list of `[user_index, item_index, rating]` triples, also stored
      in `self.test_set`.

    Raises:
      KeyError: if a user id or item id is not present in the rating matrix.
    """
    user_ids = ratings_test.iloc[:, 0]
    item_ids = ratings_test.iloc[:, 1]
    observed = ratings_test.iloc[:, 2]

    test_set = []
    for user_id, item_id, rating in zip(user_ids, item_ids, observed):
      x = self.user_id_index[user_id]
      y = self.item_id_index[item_id]
      test_set.append([x, y, rating])
      self.R[x, y] = 0  # hide the held-out rating from training
    self.test_set = test_set
    return test_set

  def test_rmse(self):
    """Return the RMSE on the hold-out set, or NaN if there is none."""
    if not self.test_set:
      return np.nan
    squared_error = sum((rating - self.get_predict(x, y)) ** 2
                        for x, y, rating in self.test_set)
    return np.sqrt(squared_error / len(self.test_set))

  def test(self):
    """Initialize the parameters, train, and return the per-epoch history.

    Returns:
      A list of `(epoch, train_rmse, test_rmse)` tuples, one per iteration.
    """
    # Small random factors; biases start at zero, global bias at the mean
    # of the observed (non-zero) ratings.
    self.P = np.random.normal(scale=1./self.K,
                              size=(self.num_users, self.K))
    self.Q = np.random.normal(scale=1./self.K,
                              size=(self.num_items, self.K))
    self.b_u = np.zeros(self.num_users)
    self.b_d = np.zeros(self.num_items)
    self.b = np.mean(self.R[self.R.nonzero()])

    rows, columns = self.R.nonzero()
    self.samples = [(i, j, self.R[i, j]) for i, j in zip(rows, columns)]

    training_process = []
    for i in range(self.iterations):
      np.random.shuffle(self.samples)  # visit samples in a new order each epoch
      self.sgd()
      rmse_train = self.rmse()
      rmse_test = self.test_rmse()
      training_process.append((i+1, rmse_train, rmse_test))
      if self.verbose and (i+1) % 10 == 0:
        print(f"Iteration : {i+1} | Train RMSE : {rmse_train} | Test RMSE : {rmse_test}")

    return training_process

  def get_one_predict(self, user_id, item_id):
    """Return the predicted rating for external `user_id` and `item_id`."""
    return self.get_predict(self.user_id_index[user_id],
                            self.item_id_index[item_id])

  def full_predict(self):
    """Return the full (num_users x num_items) matrix of predicted ratings."""
    return self.b + self.b_u[:, np.newaxis] + self.b_d[np.newaxis, :] + self.P.dot(self.Q.T)

