In [1]:
import numpy as np
import torch
import torch.nn as nn
from torch.autograd import Function

In [74]:
def compute_lambda(scores, labels):
    """Compute LambdaRank-style per-document gradients ("lambdas").

    For every ordered pair (i, j) with ``labels[i] > labels[j]`` the pairwise
    term is::

        -1 / (1 + exp(s_i - s_j)) * |delta DCG_ij| / max_dcg

    where ``|delta DCG_ij|`` is the absolute change in DCG obtained by swapping
    documents i and j in the ranking induced by the *current scores*.
    Document i accumulates the term, document j accumulates its negation.

    Args:
        scores: NumPy array of shape (N, 1) or (N,) with the predicted scores.
        labels: NumPy array of shape (N, 1) or (N,) with the relevance labels.

    Returns:
        A ``torch.float`` tensor of shape (N, 1) holding one lambda per
        document. All zeros when the ideal DCG is zero (e.g. every label is 0),
        since no pair carries a preference in that case.

    Raises:
        AssertionError: if ``scores`` and ``labels`` differ in their first
            dimension.
    """
    assert scores.shape[0] == labels.shape[0]

    N = scores.shape[0]
    flat_scores = np.asarray(scores, dtype=np.float64).reshape(N)
    flat_labels = np.asarray(labels, dtype=np.float64).reshape(N)

    # Gain of each document and the discount attached to each rank position
    # (natural log is used for both DCG and ideal DCG, so the ratio is
    # unaffected by the log base).
    gains = np.power(2.0, flat_labels) - 1.0
    position_discounts = 1.0 / np.log(np.arange(N) + 2)

    # Ideal DCG: largest gains in the top positions.
    max_dcg = np.sum(np.sort(gains)[::-1] * position_discounts)
    if max_dcg == 0:
        # Avoids 0/0 -> NaN gradients; also covers the empty input.
        return torch.zeros((N, 1), dtype=torch.float)

    # Pairwise score differences: scores_diff[i, j] = s_i - s_j.
    scores_diff = flat_scores[:, None] - flat_scores[None, :]

    # Indicator of "i should rank above j". The diagonal is already zero
    # because the comparison is strict.
    prefers = (flat_labels[:, None] > flat_labels[None, :]).astype(np.float64)

    sigmoid_like = -1.0 / (1.0 + np.exp(scores_diff))

    # Rank of every document under the current scores (0 = best), using the
    # same descending order as compute_ndcg.
    order = np.flip(np.argsort(flat_scores))
    rank = np.empty(N, dtype=np.int64)
    rank[order] = np.arange(N)
    doc_discounts = 1.0 / np.log(rank + 2)

    # Swapping i and j changes DCG by -(g_i - g_j) * (d_i - d_j).
    delta_dcg = (
        np.abs(gains[:, None] - gains[None, :])
        * np.abs(doc_discounts[:, None] - doc_discounts[None, :])
    )

    G = sigmoid_like * prefers * delta_dcg / max_dcg
    gradients = np.sum(G - G.T, axis=1).reshape((N, 1))

    return torch.tensor(gradients, dtype=torch.float)

def compute_ndcg(scores, labels):
    """Compute NDCG of the ranking obtained by sorting documents by score.

    Gains are ``2**label - 1`` and the discount at 0-based position ``p`` is
    ``1 / log(p + 2)`` (natural log; the base cancels in the DCG / ideal-DCG
    ratio).

    Args:
        scores: NumPy array of shape (N, 1) or (N,) with the predicted scores.
        labels: NumPy array of shape (N, 1) or (N,) with the relevance labels.

    Returns:
        A 0-dim ``torch.float`` tensor with DCG / ideal DCG. Returns 0.0 when
        the ideal DCG is zero (e.g. every label is 0) instead of NaN.

    Raises:
        AssertionError: if ``scores`` and ``labels`` differ in their first
            dimension.
    """
    assert scores.shape[0] == labels.shape[0]

    N = scores.shape[0]
    flat_scores = np.asarray(scores, dtype=np.float64).reshape(N)
    flat_labels = np.asarray(labels, dtype=np.float64).reshape(N)

    gains = np.power(2.0, flat_labels) - 1.0
    discounts = 1.0 / np.log(np.arange(N) + 2)

    # Ideal DCG: the largest gains occupy the top positions.
    max_dcg = np.sum(np.sort(gains)[::-1] * discounts)
    if max_dcg == 0:
        # No relevant document: the ratio is undefined, report 0 by convention.
        return torch.tensor(0.0, dtype=torch.float)

    # Document indices ordered from highest to lowest score.
    idx = np.flip(np.argsort(flat_scores))
    true_dcg = np.sum(gains[idx] * discounts)

    return torch.tensor(true_dcg / max_dcg, dtype=torch.float)

In [75]:
class LambdaLoss(Function):
    """Autograd function pairing an NDCG forward value with lambda gradients.

    NDCG is piecewise constant in the scores, so it has no useful gradient of
    its own. ``forward`` therefore reports NDCG (via ``compute_ndcg``) while
    ``backward`` substitutes the surrogate gradients produced by
    ``compute_lambda``. Both computations run in NumPy on the CPU.
    """

    @staticmethod
    def forward(ctx, input, target):
        """Return the NDCG of ``input`` (scores) against ``target`` (labels).

        Args:
            ctx: autograd context used to stash tensors for ``backward``.
            input: tensor of predicted scores.
            target: tensor of relevance labels.

        Returns:
            The tensor produced by ``compute_ndcg``, moved to ``input``'s
            device.
        """
        input, target = input.detach(), target.detach()
        ctx.save_for_backward(input, target)
        # .cpu() is a no-op for CPU tensors and makes .numpy() legal otherwise.
        ndcg = compute_ndcg(input.cpu().numpy(), target.cpu().numpy())
        return ndcg.to(input.device)

    @staticmethod
    def backward(ctx, grad_output):
        """Return the lambda gradients for the scores, scaled by ``grad_output``.

        Args:
            ctx: autograd context holding the tensors saved in ``forward``.
            grad_output: gradient flowing in from whatever consumed the
                forward value.

        Returns:
            ``(grad_input, None)``: the gradient w.r.t. the scores, and
            ``None`` for the labels, which receive no gradient.
        """
        input, target = ctx.saved_tensors
        grad = compute_lambda(input.cpu().numpy(), target.cpu().numpy())
        # Match the scores tensor so autograd accepts the gradient as-is.
        grad = grad.to(device=input.device, dtype=input.dtype).reshape(input.shape)

        # Apply the chain rule so scaling/combining the loss downstream is
        # honoured; for a plain ``loss.backward()`` grad_output is 1.
        return grad * grad_output.detach().to(grad.dtype), None

In [None]:
# Train a small MLP scorer with the LambdaLoss autograd function, using the
# nn and optim modules.
SEED = 0
torch.manual_seed(SEED)  # makes the random inputs and weight init reproducible

N, D_in, H, D_out = 4, 100, 100, 1
# LambdaLoss does its maths in NumPy, so the data is kept on the CPU.
device = torch.device("cpu")

x = torch.randn(N, D_in, device=device)
y = torch.tensor([0, 1, 0, 1], device=device, dtype=torch.float).view(N, 1)

model = nn.Sequential(
    nn.Linear(D_in, H),
    nn.ReLU(),
    nn.Linear(H, D_out),
).to(device)

loss_fn = LambdaLoss.apply

learning_rate = 1e-4
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
for t in range(50):
    y_pred = model(x)

    # The forward value is NDCG (higher is better); the gradients that flow
    # back are the lambdas computed in LambdaLoss.backward.
    loss = loss_fn(y_pred, y)
    print(t, loss.item())

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()