In [2]:
import torch
import torch.nn as nn
import torch.optim as optim

In [3]:
def l2_norm(parameters):
    """Return the sum of the squared entries of ``parameters``.

    Note that no square root is taken, so despite the function name the
    result is the *squared* L2 norm, reduced over every element to a
    zero-dimensional tensor.

    Args:
        parameters: Tensor whose entries are squared and summed.

    Returns:
        Scalar (0-dim) tensor holding ``sum(parameters ** 2)``.
    """
    squared = parameters.square()
    return squared.sum()


class DenseBatchFC(nn.Module):
    """Fully connected layer, optionally followed by 1-D batch normalization.

    Besides the layer output, ``forward`` also returns the penalty term
    ``l2_norm(weight) + l2_norm(bias)`` of the linear sub-layer so callers can
    accumulate a regularization loss across layers.
    """

    def __init__(self, input_dim, units, do_norm=False):
        """
        Args:
            input_dim: Width of the incoming features.
            units: Width of the layer output.
            do_norm: If True, apply ``nn.BatchNorm1d`` after the linear map.
        """
        super().__init__()
        self.do_norm = do_norm

        # Weights ~ N(0, 0.01^2) and zero biases; per the source notebook this
        # reproduces the standard deviation used by the original implementation.
        self.fc = nn.Linear(input_dim, units)
        nn.init.normal_(self.fc.weight, std=0.01)
        nn.init.zeros_(self.fc.bias)

        if self.do_norm:
            # momentum = 1 - 0.9, chosen to match TF's decay=0.9
            self.bn = nn.BatchNorm1d(units, momentum=0.1)

    def forward(self, x):
        """Apply the layer.

        Args:
            x: Input tensor accepted by ``nn.Linear(input_dim, units)``.

        Returns:
            Tuple ``(out, penalty)`` where ``out`` is the (optionally
            batch-normalized) linear output and ``penalty`` is the scalar
            ``l2_norm(fc.weight) + l2_norm(fc.bias)``.
        """
        hidden = self.fc(x)
        out = self.bn(hidden) if self.do_norm else hidden
        penalty = l2_norm(self.fc.weight) + l2_norm(self.fc.bias)
        return out, penalty


class DebiasingAutoencoder(nn.Module):
    """Autoencoder made of a stack of DenseBatchFC layers plus a linear output.

    The first layer's input width and the output layer's width are both
    ``num_user``, so every input row is a length-``num_user`` vector and the
    model reconstructs a vector of the same width.
    NOTE(review): this suggests each row of the input is one item's ratings
    across all users -- confirm against the data pipeline.
    """

    def __init__(self, model_select, num_user, num_item, reg):
        """
        Args:
            model_select: List of hidden layer dimensions
            num_user: Number of users (input and output width of the network)
            num_item: Number of items (stored on the instance; not used to
                size any layer)
            reg: Regularization strength (multiplies the summed penalty)
        """
        super(DebiasingAutoencoder, self).__init__()
        self.reg = reg
        self.num_user = num_user
        self.num_item = num_item
        self.model_select = model_select

        # Build encoder layers: each hidden layer is batch-normalized and
        # consumes the previous layer's width.
        layers = []
        input_dim = num_user
        for hid in model_select:
            layers.append(DenseBatchFC(input_dim, hid, do_norm=True))
            input_dim = hid
        self.encoder_layers = nn.ModuleList(layers)

        # Output layer maps the last hidden width back to num_user and uses
        # the same N(0, 0.01^2) / zero-bias initialization as the hidden layers.
        self.output_layer = nn.Linear(input_dim, num_user)
        torch.nn.init.normal_(self.output_layer.weight, std=0.01)
        torch.nn.init.zeros_(self.output_layer.bias)

    def forward(self, x, return_reg_loss=True):
        """Run the autoencoder.

        Args:
            x: Input tensor whose last dimension is ``num_user``.
            return_reg_loss: If True, also return the regularization loss.

        Returns:
            ``(preds, reg_loss)`` when ``return_reg_loss`` is True, where
            ``reg_loss`` is ``reg`` times the sum of the ``l2_norm`` penalties
            of every layer's weight and bias; otherwise just ``preds``.
        """
        last = x
        reg_loss = 0

        # Process through encoder layers, accumulating each layer's penalty
        for layer in self.encoder_layers:
            last, reg = layer(last)
            reg_loss = reg_loss + reg

        # Output layer
        preds = self.output_layer(last)
        if not return_reg_loss:
            return preds

        output_penalty = l2_norm(self.output_layer.weight) + l2_norm(self.output_layer.bias)
        reg_loss = (reg_loss + output_penalty) * self.reg
        return preds, reg_loss

    def get_recommendations(self, R, user_indices, k):
        """Get top-k recommendations for specified users.

        The forward pass is always executed in evaluation mode so that batch
        normalization uses its running statistics and does not update them,
        regardless of the mode the module was in when this method was called.
        Every submodule's previous train/eval flag is restored afterwards.

        Args:
            R: Input matrix passed to ``forward``; its last dimension must be
                ``num_user``.
            user_indices: Index (tensor or sequence) selecting columns of the
                prediction matrix, i.e. positions along the ``num_user`` axis.
            k: Number of entries to return per selected user; must not exceed
                the number of rows of ``R``.

        Returns:
            Integer tensor of shape ``(len(user_indices), k)``. Each value is
            a row position within ``R`` (not necessarily an external item id),
            ordered from highest to lowest predicted score.
        """
        # Remember the mode of every submodule so that a partially frozen
        # model (some submodules already in eval mode) is restored exactly.
        prior_modes = [(module, module.training) for module in self.modules()]
        self.eval()
        try:
            with torch.no_grad():
                # Get predictions for all rows of R
                preds = self.forward(R, return_reg_loss=False)

                # Put users on the first axis, then select the requested users
                user_preds = preds.transpose(0, 1)[user_indices]

                # Get top-k row positions per user
                _, indices = torch.topk(user_preds, k, dim=1)
                return indices
        finally:
            for module, was_training in prior_modes:
                module.training = was_training


class ModelTrainer:
    """Couples a model with an SGD optimizer and offers train / evaluate helpers.

    The model is expected to return ``(preds, reg_loss)`` from its forward
    call and to expose ``get_recommendations(R, user_indices, k)``.
    """

    def __init__(self, model, learning_rate=0.01, momentum=0.9, device='cuda' if torch.cuda.is_available() else 'cpu'):
        """
        Args:
            model: ``nn.Module`` to train; it is moved to ``device``.
            learning_rate: SGD learning rate.
            momentum: SGD momentum.
            device: Device string; the default is resolved once, when the
                class is defined, to 'cuda' if available and 'cpu' otherwise.
        """
        self.model = model.to(device)
        self.device = device
        self.optimizer = optim.SGD(self.model.parameters(), lr=learning_rate, momentum=momentum)

    def train_step(self, R_input, R_output, optimize_all=True):
        """
        Single training step
        Args:
            R_input: Input ratings matrix
            R_output: Target ratings matrix
            optimize_all: If True, optimize both reconstruction and regularization loss
        Returns:
            Dict with Python floats under 'reconstruction_loss', 'reg_loss'
            (0 when ``optimize_all`` is False) and 'total_loss'.
        """
        self.model.train()
        R_input = R_input.to(self.device)
        R_output = R_output.to(self.device)

        # Forward pass
        preds, reg_loss = self.model(R_input)

        # Reconstruction loss: Euclidean norm of each row's error, averaged
        # over rows. vector_norm is used instead of sqrt(sum(square)) because
        # the latter backpropagates NaN (inf * 0) whenever a row is
        # reconstructed exactly; the norm op yields a zero gradient there.
        row_errors = torch.linalg.vector_norm(preds - R_output, ord=2, dim=1, keepdim=True)
        reconstruction_loss = torch.mean(row_errors)
        if optimize_all:
            loss = reconstruction_loss + reg_loss
        else:
            loss = reconstruction_loss

        # Backward pass and optimize
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        return {
            'reconstruction_loss': reconstruction_loss.item(),
            'reg_loss': reg_loss.item() if optimize_all else 0,
            'total_loss': loss.item()
        }

    def evaluate(self, R, eval_data, batch_size=1024):
        """
        Evaluate model on test data
        Args:
            R: Full ratings matrix
            eval_data: Evaluation data containing test_item_ids, test_user_ids
                and recall_at (the largest recall_at value is used as k)
            batch_size: Number of users scored per call to the model
        Returns:
            CPU tensor of shape (len(test_user_ids), max(recall_at)) holding,
            per test user, the top-k positions returned by
            ``model.get_recommendations``. An empty (0, k) integer tensor is
            returned when there are no test users.
        """
        self.model.eval()
        R = R.to(self.device)

        k = max(eval_data.recall_at)
        num_users = len(eval_data.test_user_ids)
        if num_users == 0:
            # torch.cat cannot concatenate an empty list; return an empty
            # result with the same dtype/width a non-empty run would have.
            return torch.empty((0, k), dtype=torch.long)

        # The item slice is identical for every batch, so build it once.
        R_items = R[eval_data.test_item_ids]

        recommendations = []
        with torch.no_grad():
            for start_idx in range(0, num_users, batch_size):
                end_idx = min(start_idx + batch_size, num_users)
                batch_users = torch.as_tensor(eval_data.test_user_ids[start_idx:end_idx], device=self.device)

                # Get recommendations for batch
                batch_recs = self.model.get_recommendations(R_items, batch_users, k=k)
                recommendations.append(batch_recs.cpu())

        return torch.cat(recommendations, dim=0)

In [None]:
"""
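Example usage (illustrative only: num_epochs, eval_interval, R_input, R_output, R and eval_data must be defined by the caller)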
# Initialize model
model = DebiasingAutoencoder(
    model_select=[256, 128, 64],  # Hidden layer dimensions
    num_user=1000,
    num_item=500,
    reg=0.01
)

# Create trainer
trainer = ModelTrainer(model)

# Training loop
for epoch in range(num_epochs):
    losses = trainer.train_step(R_input, R_output)
    
    # Evaluate periodically
    if epoch % eval_interval == 0:
        recommendations = trainer.evaluate(R, eval_data)"""