- 공부하며 이해한 바를 바탕으로 KNN based CF model을 구현한 것입니다. 
- 코드에 잘못된 부분이 있을 수 있습니다⛔️                            

In [None]:
import pandas as pd
import numpy as np
import surprise 
from tqdm import tqdm
import warnings
warnings.filterwarnings('ignore')

## DATA

In [None]:
data = surprise.Dataset.load_builtin('ml-100k', prompt=False)
col_names = ["userid", "itemid", "rating", "timestamp"]
raw_data = np.array(data.raw_ratings, dtype=int)

raw_data[:,0] -= 1 # user_id range : [0,942]
raw_data[:,1] -= 1 # item_id range : [0,1681]

n_user = np.unique(raw_data[:, 0]).shape[0]
n_item = np.unique(raw_data[:, 1]).shape[0]

shape = (n_user, n_item)
rating_matrix = np.zeros(shape)

for user_id, movie_id, rating, _ in raw_data:
    rating_matrix[user_id][movie_id] = rating # interaction

# 장르 데이터 준비
gen_info = pd.read_csv("movies.csv")

idx2gen = {}

for i in range(gen_info.shape[0]):
    gen = gen_info.genres[i].split("|")
    idx2gen[i] = gen

예측 성능 확인을 위해 임의로 평점 마스킹

In [None]:
nzero_indices = np.where(rating_matrix != 0)
print(len(rating_matrix[nzero_indices[0], nzero_indices[1]]))

np.random.seed(42)
test_index = np.random.randint(0, len(rating_matrix[nzero_indices[0], nzero_indices[1]])-1, 100)

row = nzero_indices[0]
col = nzero_indices[1]

row = row[test_index]
col = col[test_index]

orgin_list = []

for i in range(len(row)):
    orgin_list.append([row[i], col[i], rating_matrix[row[i]][col[i]]])
    rating_matrix[row[i]][col[i]] = 0

## Model 

### BASE module
유사도 및 예측 평점 구하는 부분 말고는 다른 부분이 없기 때문에 공통된 부분에 대한 base 모듈 정의

In [None]:
class KNN_CF:
    def __init__(self, data:np.ndarray, k:int=25, method:str="simple", sim:str="cosin"):
        super().__init__()
        self.data = data
        self.k = k
        self.method = method
        self.sim = sim
        self.n_user = data.shape[0]
        self.n_item = data.shape[1]
        self.pred_matrix = np.zeros((self.n_user, self.n_item))
        self.item2idx, self.idx2item = {}, {}
        self.user2idx, self.idx2user = {}, {}
        
        self.idx_mapping()

    def idx_mapping(self) -> None:
        for idx, user_id in enumerate(range(1,self.n_user+1)):
            self.user2idx[user_id] = idx
            self.idx2user[idx] = user_id
        for idx, item_id in enumerate(range(1,self.n_item+1)):
            self.item2idx[item_id] = idx
            self.idx2item[idx] = item_id

    # 코사인 유사도
    def cos_sim(self,a:np.array, b:np.array) -> float:
        return np.dot(a,b)/(np.linalg.norm(a)*np.linalg.norm(b)) + 1e-15
    
    # 유클리드 거리의 역수
    def euclidean_sim(self,a:np.array, b:np.array) -> float:
        return float(1/(np.sqrt(np.sum((a-b)**2))+ 1e-15))
    
    def get_sim_matrix(self) -> None:
        pass
    
    def pred_one(self) -> float:
        pass
    
    # 한 유저에 대한 모든 비관측 아이템에 대한 평점 예측
    def pred_user(self, user_id:int) -> None:
        u_idx = self.user2idx[user_id]
        for j in range(n_item):
            if not self.data[u_idx,j]:
                self.pred_matrix[u_idx,j] = self.pred_one(user_id,j+1)

    # 전체 유저와 전체 아이템에 대한 평점 예측
    def pred_full(self) -> np.ndarray:
        for i in tqdm(range(n_user)):
            for j in range(n_item):
                if not self.data[i,j]:
                    self.pred_matrix[i,j] = self.pred_one(i+1,j+1)
        return self.pred_matrix

    # 한 유저에 대한 top k item list
    def get_top_k(self, user_id: int, k: int) -> tuple[np.array, np.array]:
        u_idx = self.user2idx[user_id]
        user_pred = self.pred_matrix[u_idx]
        item_ranking = sorted(range(len(user_pred)), key=lambda i: user_pred[i], reverse=True)
        return (item_ranking[:k] + np.array([1]), user_pred[item_ranking[:k]])


### User based CF
1. 유저 - 유저 유사도를 구한다.
2. 유사도를 기준으로 k명의 유사 유저를 구한다.
3. k명의 유저의 item i에 대한 평점을 simple mean / weighted avg 하여 평점을 예측한다.

In [None]:
class KNN_UBCF(KNN_CF):
    def __init__(self, data:np.ndarray, k:int=25, method:str="simple", sim:str="cosin"):
        super().__init__(data, k, method, sim)
        self.sim_matrix = np.zeros((self.n_user, self.n_user)) # 유사도 행렬 (user x user)
        self.get_sim_matrix()

    # 모든 유저 사이의 유사도 계산
    def get_sim_matrix(self) -> None:
        if self.sim == "cosin":
            sim_func = self.cos_sim
        if self.sim == "euclid":
            sim_func = self.euclidean_sim

        for i in tqdm(range(n_user)):
            for j in range(i+1, n_user):
                self.sim_matrix[i][j] = sim_func(self.data[i],self.data[j])
                self.sim_matrix[j][i] = self.sim_matrix[i][j]

        np.fill_diagonal(self.sim_matrix, np.min(self.sim_matrix)-5) # 대각행렬 값 변경
        
    # 한 유저와 한 아이템에 대한 평점 예측
    def pred_one(self, user_id:int, item_id:int) -> float:
        u_idx = self.user2idx[user_id]
        i_idx = self.item2idx[item_id]
        k_users = []

        sims = self.sim_matrix[u_idx] # 한 유저의 유사도 벡터
        sim2idx = sorted(range(len(sims)), key=lambda i: sims[i], reverse=True) # 유사도 값 기준으로 정렬된 다른 유저 idx

        for idx in sim2idx:
            if self.data[idx, i_idx] != 0: # 이웃 유저가 해당 아이템을 평가하지 않은 경우, 이웃 유저로 뽑지 않음
                k_users.append(idx)
            if len(k_users)==self.k:
                break

        # 평점 계산 방법 (simple mean / weighted average)
        # 이웃 유저(k_users)가 한 아이템에 준 평점을 기준으로 쿼리 유저의 평점 예측
        if self.method == "simple":
            self.pred_matrix[u_idx,i_idx] = np.mean(self.data[k_users, i_idx])
        if self.method == "weighted":
            self.pred_matrix[u_idx,i_idx] = np.dot(sims[k_users], self.data[k_users, i_idx])/np.sum(sims[k_users])

        return self.pred_matrix[u_idx,i_idx]

In [None]:
u_id = 4
i_id = 858
top_k = 15
n_neighbor = 40

model = KNN_UBCF(rating_matrix, n_neighbor, method ="simple" ,sim="cosin")
full_matrix_UBCF = model.pred_full()
print(f'user:{u_id}, item:{i_id}, pred_rating:{model.pred_one(u_id, i_id)}')
item_list, pred_rating = model.get_top_k(u_id, top_k)
print(f'user:{u_id}\ntop_k_list:{item_list}')


### Item based CF
1. 아이템 - 아이템 유사도를 구한다.
2. 유사도를 기준으로 k개의 유사 아이템을 구한다.
3. k개의 아이템에 대한 user u의 평점을 simple mean / weighted avg 하여 평점을 예측한다.

In [None]:
class KNN_IBCF(KNN_CF):
    def __init__(self, data:np.ndarray, k:int=25, method:str="simple", sim:str="cosin"):
        super().__init__(data, k, method, sim)
        self.sim_matrix = np.zeros((self.n_item, self.n_item)) # 유사도 행렬 (item x item)
        self.get_sim_matrix()
    
    # 모든 아이템 사이의 유사도 계산
    def get_sim_matrix(self) -> None:
        if self.sim == "cosin":
            sim_func = self.cos_sim
        if self.sim == "euclid":
            sim_func = self.euclidean_sim

        for i in tqdm(range(n_item)):
            for j in range(i+1, n_item):
                self.sim_matrix[i][j] = sim_func(self.data[:, i],self.data[:, j])
                self.sim_matrix[j][i] = self.sim_matrix[i][j]
        
        np.fill_diagonal(self.sim_matrix, np.min(self.sim_matrix)-5)
        
    # 한 유저와 한 아이템에 대한 평점 예측
    def pred_one(self, user_id:int, item_id:int) -> float:
        u_idx = self.user2idx[user_id]
        i_idx = self.item2idx[item_id]
        k_items = []

        sims = self.sim_matrix[i_idx] # 한 아이템의 유사도 벡터
        sim2idx = sorted(range(len(sims)), key=lambda i: sims[i], reverse=True) # 유사도 기준으로 정렬된 타 아이템 idx

        for idx in sim2idx:
            if self.data[u_idx, idx] != 0: # 쿼리 유저에게 평가되지 않은 아이템은 제외
                k_items.append(idx)
            if len(k_items)==self.k:
                break

        # 평점 계산 방법 (simple mean / weighted average)
        # 이웃 아이템(k_items)의 평점을 기반으로 쿼리 유저의 쿼리 아이템에 대한 평점 예측
        if self.method == "simple":
            self.pred_matrix[u_idx,i_idx] = np.mean(self.data[u_idx, k_items])
        if self.method == "weighted":
            self.pred_matrix[u_idx,i_idx] = np.dot(sims[k_items], self.data[u_idx, k_items])/np.sum(sims[k_items])
                    
        return self.pred_matrix[u_idx,i_idx]

In [None]:
u_id = 4
i_id = 14
top_k = 15
n_neighbor = 40

model = KNN_IBCF(rating_matrix, n_neighbor, method="weighted", sim="euclid")
full_matrix_IBCF = model.pred_full()
print(f'user:{u_id}, item:{i_id}, pred_rating:{model.pred_one(u_id, i_id)}')
item_list, pred_rating = model.get_top_k(u_id, top_k)
print(f'user:{u_id}\ntop_k_list:{item_list}')


### Item based CF with side info
- item에 대한 side info인 장르를 유사도 구할 때 활용
- 유사도 외에는 IBCF와 차이 없음
- 유사도 구하는 방법
    1. 자카드 유사도
    2. 장르 원-핫인코딩 후 코사인 유사도

In [None]:
class KNN_SIBCF(KNN_CF):
    def __init__(self, data:np.ndarray, gen_info:tuple, k:int=25, method:str="simple", sim:str="cosin"):
        super().__init__(data, k, method, sim)
        self.gen_info = gen_info
        self.sim_matrix = np.zeros((self.n_item, self.n_item))
        self.get_sim_matrix()
    
    # 장르 유사도 계산을 위한 자카드 유사도
    def jaccard_sim(self,a,b):
        a = set(a)
        b = set(b)
        return float(len(a.intersection(b)) / len(a.union(b)))
    
    # 모든 아이템 사이의 유사도 계산
    def get_sim_matrix(self) -> None:
        if self.sim == "cosin":
            sim_func = self.cos_sim
        if self.sim == "euclid":
            sim_func = self.euclidean_sim

        for i in tqdm(range(n_item)):
            for j in range(i+1, n_item):
                # IBCF와 다르게 장르의 자카드 유사도 추가됨
                gen_sim = self.jaccard_sim(self.gen_info[i], self.gen_info[j])
                self.sim_matrix[i][j] = sim_func(self.data[:, i],self.data[:, j]) + gen_sim
                self.sim_matrix[j][i] = self.sim_matrix[i][j]
        
        np.fill_diagonal(self.sim_matrix, np.min(self.sim_matrix)-5)

    # 한 유저와 한 아이템에 대한 평점 예측
    def pred_one(self, user_id:int, item_id:int) -> float:
        u_idx = self.user2idx[user_id]
        i_idx = self.item2idx[item_id]
        k_items = []

        sims = self.sim_matrix[i_idx]
        sim2idx = sorted(range(len(sims)), key=lambda i: sims[i], reverse=True)

        for idx in sim2idx:
            if self.data[u_idx, idx] != 0:
                k_items.append(idx)
            if len(k_items)==self.k:
                break

        # 평점 계산 방법 (simple mean / weighted average)
        if self.method == "simple":
            self.pred_matrix[u_idx,i_idx] = np.mean(self.data[u_idx, k_items])
        if self.method == "weighted":
            self.pred_matrix[u_idx,i_idx] = np.dot(sims[k_items], self.data[u_idx, k_items])/np.sum(sims[k_items])

        return self.pred_matrix[u_idx,i_idx]

In [None]:
u_id = 4
i_id = 14
top_k = 15
n_neighbor = 40

model = KNN_SIBCF(rating_matrix, idx2gen, n_neighbor, method="weighted", sim = "cosin") 
full_matrix_SIBCF = model.pred_full()
print(f'user:{u_id}, item:{i_id}, pred_rating:{model.pred_one(u_id, i_id)}')
item_list, pred_rating = model.get_top_k(u_id, top_k)
print(f'user:{u_id}\ntop_k_list:{item_list}')


모델에 따른 RMSE 비교

In [None]:
from sklearn.metrics import mean_squared_error

def rmse(actual, pred):
    return np.sqrt(mean_squared_error(actual, pred))

actual = np.array([ele[-1] for ele in orgin_list])
pred_UBCF = []
pred_IBCF = []
pred_SIBCF = []

for row, col, rating in orgin_list:
    pred_UBCF.append(full_matrix_UBCF[row][col])
    pred_IBCF.append(full_matrix_IBCF[row][col])
    pred_SIBCF.append(full_matrix_SIBCF[row][col])

print(f'UBCF rmse : {rmse(actual,pred_UBCF)}')
print(f'IBCF rmse : {rmse(actual,pred_IBCF)}')
print(f'SIBCF rmse : {rmse(actual,pred_SIBCF)}')

n_neighbor 개수에 따른 RMSE 비교

In [None]:
u_id = 4
i_id = 14
top_k = 15
n_neighbor = 30

model = KNN_SIBCF(rating_matrix, idx2gen, n_neighbor, method="weighted", sim = "cosin") 
full_matrix_SIBCF_30 = model.pred_full()
item_list, pred_rating = model.get_top_k(u_id, top_k)
print(f'user:{u_id}\ntop_k_list:{item_list}')

n_neighbor = 50

model = KNN_SIBCF(rating_matrix, idx2gen, n_neighbor, method="weighted", sim = "cosin") 
full_matrix_SIBCF_50 = model.pred_full()
item_list, pred_rating = model.get_top_k(u_id, top_k)
print(f'user:{u_id}\ntop_k_list:{item_list}')



In [None]:
pred_SIBCF_30 = []
pred_SIBCF_50 = []

for row, col, rating in orgin_list:
    pred_SIBCF_30.append(full_matrix_SIBCF_30[row][col])
    pred_SIBCF_50.append(full_matrix_SIBCF_50[row][col])

print(f'SIBCF_50 rmse : {rmse(actual,pred_SIBCF_50)}')
print(f'SIBCF_40 rmse : {rmse(actual,pred_SIBCF)}')
print(f'SIBCF_30 rmse : {rmse(actual,pred_SIBCF_30)}')
