In [None]:
from fastbook import untar_data, URLs, Learner, MSELossFlat
import pandas as pd
import torch
import torch.nn.functional as F

In [None]:
path = untar_data(URLs.ML_100k)

path.ls()

In [None]:
ratings = pd.read_csv(path/'u.data', delimiter='\t', header=None, names=['user', 'movie', 'rating', 'timestamp'])

ratings.head()

In [None]:
movies = pd.read_csv(path/'u.item', delimiter='|', header=None, names=['movie', 'title'], encoding='latin-1', usecols=(0,1))

movies.head()

In [None]:
ratings = ratings.merge(movies)
ratings.head()

In [None]:
movies[movies['title'].duplicated()]

In [None]:
class CollabBase(torch.nn.Module):
  def __init__(self, n_users, n_items, n_factors=50) -> None:
    super().__init__()
    
    self.user_embs = torch.nn.Embedding(n_users, n_factors)
    self.item_embs = torch.nn.Embedding(n_items, n_factors)
    
  def forward(self, x) -> None:
    users = self.user_embs(x[:, 0])
    items = self.item_embs(x[:, 1])
    
    return (users*items).sum(dim=1, keepdim=True)

In [None]:
n_users = ratings['user'].unique().size + 1
n_movies = movies['movie'].unique().size + 1

n_users, n_movies

In [None]:
collab_base_model = CollabBase(n_users, n_movies, n_factors=5)

In [None]:
train_x = torch.tensor(ratings[['user','movie']].to_numpy())
train_y = torch.tensor(ratings['rating'].to_numpy(), dtype=torch.float32).reshape(100000, 1)

In [None]:
train_x[:5]

In [None]:
train_y[:5]

In [None]:
preds = collab_base_model(train_x)

preds[:5]

In [None]:
def loss_fn(preds: torch.torch.torch.Tensor, acts: torch.Tensor):
  return ((preds-acts)**2).mean()

In [None]:
loss = loss_fn(preds, train_y)
loss

In [None]:
loss.backward()

In [None]:
for p in collab_base_model.parameters():
  print(p.grad)

In [None]:
def calculate_norm(model: torch.nn.Module):
  total_norm = 0
  for p in model.parameters():
    p_norm = p.grad.data.norm(2)
    total_norm += p_norm.item()**2

  return total_norm**(1./2)

In [None]:
norm = calculate_norm(collab_base_model)
norm

In [None]:
batch_x = train_x[:64]
batch_y = train_y[:64]

In [None]:
preds = collab_base_model(batch_x)
loss = loss_fn(preds, batch_y)

for p in collab_base_model.parameters():
  p.grad.zero_()
  
loss.backward()

total_norm = calculate_norm(collab_base_model)
total_norm

In [None]:
for p in collab_base_model.parameters():
  print(p.grad.data)

In [None]:
import torch.utils.data as data

def train(model: torch.nn.Module, train_x: torch.Tensor, train_y: torch.Tensor, n_epochs=5, lr=.1, loss_fn=F.mse_loss, wd=0.0): 
  if torch.backends.mps.is_available() and torch.backends.mps.is_built():
    model.to('mps')
    x = train_x.to('mps')
    y = train_y.to('mps')
  
  dataset = data.TensorDataset(x,y)
  
  train_size = round(.8 * len(x))
  valid_size = len(x) - train_size
  train_set, validation_set = data.random_split(dataset, [train_size, valid_size])
  
  t_loader = data.DataLoader(train_set, 64, True)
  v_loader = data.DataLoader(validation_set, 64, True)
  
  optimizer = torch.optim.SGD(model.parameters(), lr=lr, weight_decay=wd)
  for i in range(n_epochs):
    model.train()
    t_loss = 0.0
    for xb, yb in t_loader:
      optimizer.zero_grad()
      preds = model(xb)
      loss = loss_fn(preds, yb)
      loss.backward()
      optimizer.step()
      t_loss += loss.item()
    
    t_loss /= len(t_loader)
    
    model.eval()
    v_loss = 0.0
    with torch.no_grad():
      for vbx, vby in v_loader:
        preds = model(vbx)
        loss = loss_fn(preds, vby)
        v_loss += loss.item()
      
    v_loss /= len(v_loader)
    
    print(f"t_loss: {t_loss} - v_loss: {v_loss}")
    
  model.cpu()

In [None]:
test_model = CollabBase(n_users, n_movies, n_factors=50)

train(test_model, train_x, train_y, lr=5e-2, loss_fn=F.mse_loss)

In [None]:
class CollabBaseWithSigmoid(torch.nn.Module):
  def __init__(self, n_users, n_items, n_factors=50, y_range=(0, 5.5)) -> None:
    super().__init__()
    
    self.user_embs = torch.nn.Parameter(torch.zeros((n_users, n_factors)).normal_(0, .1))
    self.item_embs = torch.nn.Parameter(torch.zeros((n_items, n_factors)).normal_(0, .1))
    self.y_range = y_range
    
  def forward(self, x):
    users = self.user_embs[x[:,0]]
    items = self.item_embs[x[:, 1]]
    
    return F.sigmoid((users*items).sum(dim=1, keepdim=True))*(self.y_range[1]-self.y_range[0])+self.y_range[0]
    

In [None]:
test_model = CollabBaseWithSigmoid(n_users, n_movies)

# train(test_model, train_x=train_x, train_y=train_y, lr=.1)
preds = test_model(train_x)
loss = F.mse_loss(preds, train_y)
loss.backward()

In [None]:
preds[:5]

In [None]:
for p in test_model.parameters():
  print(p.grad)

In [None]:
test_model = CollabBaseWithSigmoid(n_users, n_movies)

train(test_model, train_x=train_x, train_y=train_y, lr=5e-1, loss_fn=F.mse_loss)

In [None]:
class CollabWithBias(torch.nn.Module):
  def __init__(self, n_users, n_items, n_factors=50, y_range=(0,5.5)) -> None:
    super().__init__()
    
    self.user_embs = torch.nn.Embedding(n_users, n_factors)
    torch.nn.init.normal_(self.user_embs.weight, 0, .1)
    self.user_bias = torch.nn.Embedding(n_users, 1)
    
    self.item_embs = torch.nn.Embedding(n_items, n_factors)
    torch.nn.init.normal_(self.item_embs.weight, 0, .1)
    self.item_bias = torch.nn.Embedding(n_items, 1)
    
    self.y_range = y_range
    
  def forward(self, x) -> torch.Tensor:
    users = self.user_embs(x[:, 0])
    user_bias = self.user_bias(x[:, 0])
    
    items = self.item_embs(x[:, 1])
    item_bias = self.item_bias(x[:, 1])
    
    interaction = (users * items).sum(dim=1, keepdim=True)
    interaction += user_bias + item_bias
    
    return F.sigmoid(interaction)*(self.y_range[1]-self.y_range[0]) + self.y_range[0]

In [None]:
test_model = CollabWithBias(n_users, n_movies)

train(test_model, train_x=train_x, train_y=train_y, loss_fn=F.mse_loss, lr=5e-1)

In [None]:
movie_bias = test_model.item_bias.weight.squeeze()
movie_bias

In [None]:
idx = movie_bias.argsort()[:5].tolist()

[movies['title'][i-1] for i in idx]

In [None]:
test_model(train_x[:5]), train_y[:5]

In [None]:
with torch.no_grad():
  print(F.mse_loss(test_model(train_x[:10]), train_y[:10]))

In [None]:
test_model = CollabWithBias(n_users, n_movies)

train(test_model, train_x=train_x, train_y=train_y, loss_fn=F.mse_loss, lr=8e-1, n_epochs=5, wd=1e-3)

In [None]:
test_model(train_x[:5]), train_y[:5]

In [None]:
torch.mps.empty_cache()