<a href="https://colab.research.google.com/github/stebechoi/CP2/blob/YJ/FM_%EC%B5%9C%EC%A2%85.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import numpy as np
import pandas as pd
import heapq
from sklearn.utils import shuffle

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


## 데이터 불러오기

In [None]:
data_path = '/content/drive/MyDrive/CP2/ml-100k/'
r_cols =['user_id','movie_id','rating','timestamp']
ratings = pd.read_csv(data_path + 'u.data', sep='\t',names = r_cols, encoding ='latin-1')
item_df = pd.read_csv(data_path + 'u.item', sep='|', encoding='latin-1', header=None,
                        names=['movie_id', 'movie_title', 'release_date', 'video_release_date',
                               'IMDb_URL', 'unknown', 'Action', 'Adventure', 'Animation',
                               'Childrens', 'Comedy', 'Crime', 'Documentary', 'Drama',
                               'Fantasy', 'Film-Noir', 'Horror', 'Musical', 'Mystery',
                               'Romance', 'Sci-Fi', 'Thriller', 'War', 'Western'])
item_df = item_df.iloc[:,:2]

## 데이터 병합

In [None]:
ratingcount = ratings.groupby(['movie_id'])['rating'].count().reset_index().rename(columns = {'rating': 'TotalRatingCount'})
n_ratings = pd.merge(ratings,ratingcount,how='left',on='movie_id')
n_ratings = pd.merge(n_ratings,item_df,how='left', on='movie_id')

df_movie_100 = n_ratings[n_ratings['TotalRatingCount']>=100]
df_movie_100 = df_movie_100.reset_index(drop=True)

In [None]:
movie_100 = df_movie_100.drop(['user_id','rating','timestamp','TotalRatingCount'],axis=1)

## X vector 생성

In [None]:
#user encoding
user_dict = {}
for i in set(df_movie_100['user_id']):
  user_dict[i] = len(user_dict)
n_user = len(user_dict)

#item encoding
item_dict = {}
start_point = n_user
for i in set(df_movie_100['movie_id']):
    item_dict[i] = start_point + len(item_dict)
n_item = len(item_dict)
start_point += n_item
num_x = start_point
df_movie_100 = shuffle(df_movie_100, random_state=1)

#generate x data
#[user_index, movie_index], [user]
data = []
y = []
w0 = np.mean(df_movie_100['rating'])
for i in range(len(df_movie_100)):
  case = df_movie_100.iloc[i]
  x_index = []
  x_value = []
  x_index.append(user_dict[case['user_id']])
  x_value.append(1)
  x_index.append(item_dict[case['movie_id']])
  x_value.append(1)
  data.append([x_index, x_value])
  y.append(case['rating'] - w0)


## FM 모델

In [7]:
def RMSE(y_true, y_pred):
  return np.sqrt(np.mean((np.array(y_true) - np.array(y_pred))**2))

class FM():
  def __init__(self, N, K, data, y, alpha, beta, train_ratio=0.75, iterations=100, tolerance=0.005, l2_reg=True, verbose=True):
    self.K = K    #no of latent factors
    self.N = N    #no of x variables
    self.n_cases = len(data)
    self.alpha = alpha
    self.beta = beta
    self.iterations = iterations
    self.l2_reg = l2_reg
    self.tolerance = tolerance
    self.verbose = verbose

    # w 초기화  변수의 편향
    self.w = np.random.normal(scale=1./self.N, size=(self.N))
    # v 초기화  잠재요인행렬
    self.v = np.random.normal(scale=1./self.K, size=(self.N, self.K))
    #Train/ Test 분리
    cutoff = int(train_ratio* len(data))
    self.train_x = data[:cutoff]
    self.test_x = data[cutoff:]
    self.train_y = y[:cutoff]
    self.test_y = y[cutoff:]

  #학습 함수
  def test(self):
    best_RMSE = 10000
    best_iteration = 0
    training_process = []  #학습과정을 기록
    for i in range(self.iterations):
      rmse1 = self.sgd(self.train_x, self.train_y)
      rmse2 = self.test_rmse(self.test_x, self.test_y)
      training_process.append((i,rmse1,rmse2))
      if self.verbose:
        if(i+1) % 10 == 0:
          print("Iteration: %d ; Train RMSE = %.6f ; Test RMSE = %.6f" % (i+1, rmse1, rmse2))
      #기존의 RMSE보다 향상되었으면 새로운 RMSE 와 iteration기록
      if best_RMSE> rmse2:
        best_RMSE = rmse2
        best_iteration = i
      elif(rmse2 - best_RMSE) >self.tolerance:

        break
    print(best_iteration, best_RMSE)
    return training_process
  #sgd 실행
  def sgd(self, x_data,  y_data):
    y_pred = []
    for data, y in zip(x_data, y_data):
      x_idx = data[0]
      x_0 = np.array(data[1])
      x_1 = x_0.reshape(-1,1)

      bias_score = np.sum(self.w[x_idx]*x_0)
      vx = self.v[x_idx]*(x_1)
      sum_vx = np.sum(vx, axis=0)
      sum_vx_2 = np.sum(vx*vx, axis=0)
      latent_score = 0.5*np.sum(np.square(sum_vx) - sum_vx_2)

      y_hat = bias_score + latent_score
      y_pred.append(y_hat)
      error = y - y_hat

      if self.l2_reg:
        self.w[x_idx] += error*self.alpha*(x_0 - self.beta*self.w[x_idx])
        self.v[x_idx] += error*self.alpha*((x_1)*sum(vx) - (vx*x_1) - self.beta*self.v[x_idx])
      else:
        self.w[x_idx] += error*self.alpha*x_0
        self.v[x_idx] += error*self.alpha*((x_1)*sum(vx) - (vx*x_1))
    return RMSE(y_data, y_pred)

  def test_rmse(self, x_data, y_data):
    y_pred =[]
    for data, y in zip(x_data, y_data):
      y_hat = self.predict(data[0], data[1])
      y_pred.append(y_hat)
    return RMSE(y_data, y_pred)
  
  def predict(self, idx, x):
    x_0 = np.array(x)
    x_1 = x_0.reshape(-1,1)
    bias_score =  np.sum(self.w[idx]*x_0)

    vx = self.v[idx]*(x_1)
    sum_vx = np.sum(vx, axis=0)
    sum_vx_2 = np.sum(vx*vx,axis=0)
    latent_score = 0.5*np.sum(np.square(sum_vx)-sum_vx_2)

    y_hat = bias_score + latent_score
    return y_hat


In [None]:
K=350
fm1 = FM(num_x, K, data, y, alpha=0.001, beta=0.05, train_ratio=0.75, iterations=600, tolerance=0.0005, l2_reg=True, verbose=True)
result = fm1.test()

In [11]:
results=[]
for K in range(100,201,20):
  print('K :',K)
  fm1 = FM(num_x, K , data, y, alpha=0.001, beta=0.05, train_ratio=0.75, iterations=600, tolerance=0.0005, l2_reg=True, verbose=True)
  result = fm1.test()
  results.append(result)

K : 100


KeyboardInterrupt: ignored

In [54]:
def recommendations(user_id, k):
  user_predictions = []
  user_index = user_dict[user_id]
  for movie_id, item_index in item_dict.items():
      x_index = [user_index, item_index]
      x_value = [1, 1]
      predicted_rating = fm1.predict(x_index, x_value)
      user_predictions.append((movie_id, predicted_rating))
  
  top_k_recommendations = heapq.nlargest(k, user_predictions, key=lambda x: x[1])
  id_list = [movie_id for movie_id, _ in top_k_recommendations]
  rec_list=[]
  for i,r in movie_100.iterrows():
    if i in id_list:
      rec_list.append(r['movie_title'])
  return rec_list

In [55]:
recommendations(1,5)

['Star Trek III: The Search for Spock (1984)',
 'Kolya (1996)',
 'Pulp Fiction (1994)',
 'Mimic (1997)',
 'Ghost (1990)']

In [58]:
def get_top_k_recommendations(user_id, k):
    user_predictions = []
    user_index = user_dict[user_id]
    for movie_id, item_index in item_dict.items():
        x_index = [user_index, item_index]
        x_value = [1, 1]
        predicted_rating = fm1.predict(x_index, x_value)
        user_predictions.append((movie_id, predicted_rating))
    
    top_k_recommendations = heapq.nlargest(k, user_predictions, key=lambda x: x[1])
    return [movie_id for movie_id, _ in top_k_recommendations]

In [59]:
def precision_at_k(user_id, k):
    user_ratings = df_movie_100[df_movie_100['user_id'] == user_id]
    test_ratings = user_ratings.sample(frac=0.2, random_state=1)
    top_k_recommendations = get_top_k_recommendations(user_id, k)

    relevant_items = set(test_ratings[test_ratings['rating'] >= 4]['movie_id'].values)
    recommended_relevant_items = [movie_id for movie_id in top_k_recommendations if movie_id in relevant_items]
    
    precision = len(recommended_relevant_items) / k
    return precision


k = 10
all_user_precision_at_k = [precision_at_k(user_id, k) for user_id in user_dict.keys()]
mean_precision_at_k = np.mean(all_user_precision_at_k)
print("Precision@{}: {:.4f}".format(k, mean_precision_at_k))

Precision@10: 0.0863


In [60]:
def recall_at_k(user_id, k):
    user_ratings = df_movie_100[df_movie_100['user_id'] == user_id]
    test_ratings = user_ratings.sample(frac=0.2, random_state=1)
    top_k_recommendations = get_top_k_recommendations(user_id, k)

    relevant_items = set(test_ratings[test_ratings['rating'] >= 4]['movie_id'].values)
    recommended_relevant_items = [movie_id for movie_id in top_k_recommendations if movie_id in relevant_items]
    
    recall = len(recommended_relevant_items) / len(relevant_items) if relevant_items else 0
    return recall


k = 10
all_user_recall_at_k = [recall_at_k(user_id, k) for user_id in user_dict.keys()]
mean_recall_at_k = np.mean(all_user_recall_at_k)
print("Recall@{}: {:.4f}".format(k, mean_recall_at_k))

Recall@10: 0.0893
