In [1]:
# Namespace
import torch.nn.functional as f
import torch_geometric as pyg
import itertools as it
# import networkx as nx
import torch

# Constructor.
from torch_geometric.transforms import RandomLinkSplit
from torch_geometric.typing import EdgeType, NodeType
from torch_geometric.loader import LinkNeighborLoader
from torch.nn.modules.loss import _Loss as Loss
from torch_geometric.data import HeteroData
from torch.utils.data import DataLoader
from torch.optim import Adam, Optimizer
from torch.nn import (
    Dropout1d,
    Embedding, 
    Module, 
    ModuleDict,
    ModuleList, 
    Linear, 
    LeakyReLU
)
from torch import Tensor

## Data Handling

In [2]:
# Specifying the target edge.
TRG_EDGE = ('user', 'rated', 'item')

In [3]:
# Loads the data.
data = torch.load('data/out/Video_Games.pt')
# Removes superfluous attributes, to save memory.
del data['rated'].edge_attr
del data['rated_by'].edge_attr
# Sets the node and edge IDs.
data.generate_ids()

# Defines the final graph transformations.
transform = RandomLinkSplit(
    num_test=.1,
    num_val=.1,
    is_undirected=True, 
    add_negative_train_samples=False,
    neg_sampling_ratio=0.,
    edge_types=('user', 'rated', 'item'),
    rev_edge_types=('item', 'rated_by', 'user')
)
# Splits the set into training, validation and testing.
trn_data, vld_data, tst_data = transform(data)

# Specifies the shared key-word arguments for the batch loaders..
kwargs = dict(
    num_neighbors=[0],  # [8, 4, 2],
    neg_sampling='triplet',
    num_workers=10,
    shuffle=True,
    # pin_memory=True
)
# Extracts the edge attribute indices for all datasets.
trn_edge_label_index = trn_data[TRG_EDGE].edge_label_index
vld_edge_label_index = vld_data[TRG_EDGE].edge_label_index
tst_edge_label_index = tst_data[TRG_EDGE].edge_label_index
# Creates the sub-graph loaders.
trn_loader = LinkNeighborLoader(**kwargs,
    data=trn_data,
    edge_label_index=[TRG_EDGE, trn_edge_label_index],
    batch_size=2048
)
vld_loader = LinkNeighborLoader(**kwargs,
    data=vld_data,
    edge_label_index=[TRG_EDGE, vld_edge_label_index],
    batch_size=2048
)
tst_loader = LinkNeighborLoader(**kwargs,
    data=tst_data,
    edge_label_index=[TRG_EDGE, tst_edge_label_index],
    batch_size=2048
)

In [4]:
# tmp_loader = LinkNeighborLoader(
#     data=trn_data,
#     edge_label_index=[('user', 'rated', 'item'), trn_edge_label_index],
#     num_neighbors=[16, 8],
#     neg_sampling='binary',
#     num_workers=10,
#     batch_size=64,
#     shuffle=True,
#     pin_memory=True
# )
# batch = next(iter(tmp_loader))
# batch

## Architecture

In [5]:
class EdgePredictor(Module):

    def __init__(self, *, trg_edge: EdgeType = TRG_EDGE) -> None:
        super().__init__()
        self.trg_edge = trg_edge

    
    @property
    def trg_src_node(self) -> NodeType:
        return self.trg_edge[0]
    

    @property
    def trg_edge_name(self) -> str:
        return self.trg_edge[1]
    

    @property
    def trg_dst_node(self) -> NodeType:
        return self.trg_edge[2]
    

    @property
    def trg_nodes(self) -> tuple[NodeType, NodeType]:
        return (self.trg_src_node, self.trg_dst_node)

In [6]:
class NodeEmbedding(ModuleDict):

    def __init__(self, 
        num_embeddings: dict[NodeType, int],
        embedding_dim: int,
        **kwargs
    ) -> None:
        super().__init__({
            node_type: Embedding(
                num_embeddings=num_embeddings,
                embedding_dim=embedding_dim, 
                **kwargs
            ) 
                for node_type, num_embeddings 
                in num_embeddings.items()
        })


    def forward(self, n_id_dict: dict[NodeType, Tensor]) -> dict[NodeType, Tensor]:
        return {
            node_type: self[node_type](n_id) 
                for node_type, n_id 
                in n_id_dict.items()
        }

In [7]:
class InnerProduct(Module): 

    def forward(self, x_src: Tensor, x_dst: Tensor) -> Tensor:
        return torch.bmm(
            x_src.unsqueeze(-2),
            x_dst.unsqueeze(-1)
        ).squeeze()

In [8]:
class EdgeRegressor(Linear):

    def __init__(self, in_dim: int, out_dim: int = 1, bias: bool = False, **kwargs) -> None:
        super().__init__(
            in_features=in_dim,
            out_features=out_dim,
            bias=bias,
            **kwargs
        )
        self.weight.data = torch.nn.init.ones_(self.weight.data)
        if self.bias:
            self.bias.data = torch.nn.init.zeros_(self.bias.data)


    def forward(self, x_src: Tensor, x_dst: Tensor) -> Tensor:
        return super().forward(x_src * x_dst)

### Matrix Factorization

In [9]:
class MF(EdgePredictor):

    def __init__(self, 
            num_embeddings: dict[NodeType, int], 
            embedding_dim: int,
            *,
            trg_edge: EdgeType,
            **kwargs
        ) -> None:
        super().__init__(trg_edge=trg_edge)
        self.embedding = NodeEmbedding(
            num_embeddings=num_embeddings, 
            embedding_dim=embedding_dim,
            **kwargs
        )
        self.regressor = InnerProduct()
        

    def forward(self, 
        n_id: dict[NodeType, Tensor], 
        edge_label_index: dict[EdgeType, Tensor], 
    ) -> Tensor:
        # Constructs the embeddings.
        x_src, x_dst = self.embedding({
            node_type: n_id[node_type] 
            for node_type 
            in self.trg_nodes
        }).values()
        # Extracts the edges to predict.
        i_src, i_dst = edge_label_index[self.trg_edge]
        # Computes and returns the predicted scores.
        return self.regressor(x_src[i_src], x_dst[i_dst])

### Generalized Matrix Factorization

In [10]:
class GMF(EdgePredictor):

    def __init__(self, 
            num_embeddings: dict[NodeType, int], 
            embedding_dim: int,
            *,
            trg_edge: EdgeType,
            **kwargs
        ) -> None:
        super().__init__(trg_edge=trg_edge)
        self.embedding = NodeEmbedding(
            num_embeddings=num_embeddings, 
            embedding_dim=embedding_dim,
            **kwargs
        )
        self.regressor = EdgeRegressor(
            in_dim=embedding_dim
        )
        

    def forward(self, 
        n_id: dict[NodeType, Tensor], 
        edge_label_index: dict[EdgeType, Tensor], 
    ) -> Tensor:
        # Constructs the embeddings.
        x_src, x_dst = self.embedding({
            node_type: n_id[node_type] 
            for node_type 
            in self.trg_nodes
        }).values()
        # Extracts the edges to predict.
        i_src, i_dst = edge_label_index[self.trg_edge]
        # Computes and returns the predicted scores.
        return self.regressor(x_src[i_src], x_dst[i_dst])

### Neural Graph Collaborative Filtering

In [None]:
class EmbeddingPropagationCell(Module):
    
    def __init__(self, 
        in_dim: int,
        out_dim: int = None, 
        bias: bool = False,
        dropout: float = .5
    ) -> None:
        super().__init__()
        self.drop = Dropout1d(dropout)
        self.loop = Linear(in_dim, out_dim or in_dim, bias=bias)
        self.intr = Linear(in_dim, out_dim or in_dim, bias=bias)
        self.actv = LeakyReLU()

    
    def forward(self, 
        x_src: Tensor, 
        x_dst: Tensor, 
        edge_index: Tensor,
        edge_weight: Tensor = None
    ) -> Tensor:
        # Applies the node dropout.
        x_src = self.drop(x_src)  # node dropout
        x_dst = self.drop(x_dst)  # node dropout
        # Computes the messages to pass.
        i_src, i_dst = edge_index
        z_src = self.loop(x_src)[i_src]
        z_int = self.intr(x_src[i_src] * x_dst[i_dst])
        z_msg = edge_weight * (z_src + z_int)
        z_msg = self.drop(z_msg)  # message dropout
        z_sum = pyg.utils.scatter(z_msg, i_dst, 
            dim_size=x_dst.size(0)
        )
        # Computes the self-messages.
        z_dst = self.loop(x_dst)
        z_dst = self.drop(z_dst)  # message dropout
        # Computes the new embeddings and returns them.
        x_new = self.actv(z_dst + z_sum)
        return x_new
    

class EmbeddingPropagationLayer(ModuleDict):

    def __init__(self, 
            edge_types: list[EdgeType], 
            in_dim: int, 
            out_dim: int = None, 
            **kwargs
        ) -> None:
        super().__init__({
            edge_label: EmbeddingPropagationCell(
                in_dim=in_dim, 
                out_dim=out_dim, 
                **kwargs
            )
                for (_, edge_label, _)
                in edge_types
        })


    def forward(self, 
        x: dict[NodeType, Tensor], 
        edge_index: dict[EdgeType, Tensor], 
        edge_weight: dict[EdgeType, Tensor]
    ) -> dict[NodeType, Tensor]:
        return {
            dst_node: self[edge_label](
                x_src=x[src_node], 
                x_dst=x[dst_node], 
                edge_index=edge_index[
                    src_node, edge_label, dst_node
                ],
                edge_weight=edge_weight[
                    src_node, edge_label, dst_node
                ]
            )
                for src_node, edge_label, dst_node 
                in edge_index
        }
    

class EmbeddingPropagation(ModuleList):
    
    def __init__(self, 
        embedding_dims: list[int], 
        edge_types: list[EdgeType],
        **kwargs
    ) -> None:
        super().__init__([
            EmbeddingPropagationLayer(
                edge_types=edge_types,
                in_dim=in_dim,
                out_dim=out_dim,
                **kwargs
            ) 
                for in_dim, out_dim 
                in it.pairwise(embedding_dims)
        ])


    def forward(self, 
        x: dict[NodeType, Tensor], 
        edge_index: dict[EdgeType, Tensor]
    ) -> dict[NodeType, Tensor]:
        # Constructs the edge weights.
        edge_weight = {
            edge_type: (
                pyg.utils.degree(i_src)[i_src]
                *
                pyg.utils.degree(i_dst)[i_dst]
            ).pow(-.5).unsqueeze(-1)
            for edge_type, (i_src, i_dst)
            in edge_index.items()
        }
        # Applies the embedding propagation layers.
        xs = [x]
        for module in self:
            x = module(x, edge_index, edge_weight)
            xs.append(x)
        # Concatenates all layers' embeddings and returns them.
        x_new = {
            node_type: torch.cat([
                    x_[node_type] for x_ in xs
            ], dim=-1) for node_type in x.keys()
        }
        return x_new
    

class EdgeRegressor(Linear): 

    def __init__(self, 
        in_features: int,
        out_features: int = 1,
        *,
        bias: bool = True,
        **kwargs
    ) -> None:
        super().__init__(in_features, out_features, bias=bias, **kwargs)


    def forward(self, x_src: Tensor, x_dst: Tensor) -> Tensor:
        return super().forward(x_src * x_dst)


# class EdgeRegressor(Module): 

#     def forward(self, x_src: Tensor, x_dst: Tensor) -> Tensor:
#         return (x_src * x_dst).sum(-1)
    
    
class NGCF(Module): 

    def __init__(self, 
        num_embeddings: dict[NodeType, int], 
        embedding_dims: list[int], 
        *,
        edge_types: list[EdgeType],
        src_node: NodeType,
        dst_node: NodeType,
        **kwargs
    ) -> None:
        super().__init__()
        self.embedding = NodeEmbedding(
            num_embeddings=num_embeddings, 
            embedding_dim=embedding_dims[0]
        )
        self.propagation = EmbeddingPropagation(
            embedding_dims=embedding_dims, 
            edge_types=edge_types,
            **kwargs
        )
        self.regressor = EdgeRegressor(
            in_features=sum(embedding_dims)
        )
        # self.regressor = EdgeRegressor()
        self.src_node = src_node
        self.dst_node = dst_node
    

    def forward(self, 
            n_id: dict[NodeType, Tensor],
            edge_index: dict[EdgeType, Tensor],
            edge_label_index: Tensor
        ) -> Tensor:
        # Generates and propagates the node embeddings.
        x = self.embedding(n_id)
        x = self.propagation(x, edge_index)
        # Computes the rank predictions and returns them.
        y = self.regressor(
            x[self.src_node][edge_label_index[0]], 
            x[self.dst_node][edge_label_index[1]]
        )
        # Returns the modified dataset.
        return y

## Utility

In [11]:
def dispatch_epoch(
    module: type[Module], 
    loader: type[DataLoader],
    criterion: type[Loss],
    *,
    batch_handler: callable,
    optimizer: type[Optimizer] = None, 
    device: torch.device = None,
    verbose: bool | int = None
) -> list[float]:

    # Ensures a device is specified.
    if device is None:
        device = torch.device(
            'cuda' if torch.cuda.is_available() else 'cpu'
        )
    # Sends the model to the specified device.
    module = module.to(device)

    # Initializes the data structures for the verbose output.
    if verbose:
        cum_loss = 0

    # Initializes the loss trace buffer.
    loss_trace = []
    # Iterates over the data-loader's batches.
    for batch_id, batch in enumerate(loader, start=1):

        # Resets the gradient if an optimizer exists.
        if optimizer:
            optimizer.zero_grad()

        # Constructs the design and target data structures.
        loss = batch_handler(module, batch, criterion, 
            device=device
        )

        # Updates the module, if an optimizer has been given
        if optimizer:
            loss.backward()
            optimizer.step()

        # Logs the computed loss.
        loss = loss.item()
        loss_trace.append(loss)
        
        # Handles the verbose messaging, if verbose is set.
        if verbose:
            # Updates the cumulative loss sum.
            cum_loss += loss
            # Outputs the current statistics, if the correct index is present.
            if batch_id % verbose == 0:
                # Updates the tracked statistics.
                avg_loss = cum_loss / batch_id
                # Outputs the tracked staistics.
                print(
                    f'Batch({batch_id}): '
                    f'CumAvgLoss({avg_loss:.4f}) & '
                    f'BatchLoss({loss:.4f})',
                    end='\r',
                    flush=True
                )
    
    # Returns the traced loss.
    return loss_trace

In [12]:
def triplet_handler(
        data: HeteroData, 
        x: dict[NodeType, Tensor],
        *,
        src_node: NodeType,
        dst_node: NodeType
    ) -> tuple[Tensor, Tensor, Tensor]:
    
    # Extracts the source and destination nodes' indices.
    i_src = data[src_node].src_index
    i_pos = data[dst_node].dst_pos_index
    i_neg = data[dst_node].dst_neg_index

    # Constructs the node types' feature matrices.
    x_src = x[src_node][i_src]
    x_pos = x[dst_node][i_pos]
    x_neg = x[dst_node][i_neg]

    # Returns the source, positive and negative features.
    return x_src, x_pos, x_neg

In [13]:
def mf_handler(
    module: type[EdgePredictor], 
    data: HeteroData, 
    criterion: type[Loss],
    *,
    device: torch.device
) -> Tensor:
    
    # Sends the items to the correct device.
    data = data.to(device)
    # Computes the embedding propegation.
    x = module.embedding(data.n_id_dict)

    # Extracts the node target features.
    x_src, x_pos, x_neg = triplet_handler(data, x,
        src_node=module.trg_src_node,
        dst_node=module.trg_dst_node
    )

    # Computes the link scores.
    y_pos = module.regressor(x_src, x_pos)
    y_neg = module.regressor(x_src, x_neg)
    # Computes the loss.
    loss = criterion(y_pos, y_neg)

    # Returns the loss.
    return loss

## Training

In [14]:
class BPRLoss(Loss):

    def forward(self, y_pos: Tensor, y_neg: Tensor) -> Tensor:
        return - f.logsigmoid(y_pos - y_neg).mean()

In [15]:
batch = next(iter(trn_loader))
model = MF(
    num_embeddings=data.num_nodes_dict,
    embedding_dim=16,
    trg_edge=TRG_EDGE
)
display(model)

# Instanciates the learning algorithm and loss criterion.
optimizer = Adam(model.parameters(), 
    lr=1e-3,
    weight_decay=1e-6
)
criterion = BPRLoss()

MF(
  (embedding): NodeEmbedding(
    (user): Embedding(1540618, 16)
    (item): Embedding(71982, 16)
  )
  (regressor): InnerProduct()
)

In [27]:
# Clears the GPU cache.
torch.cuda.empty_cache()

trn_trace = []
vld_trace = []
for epoch_id in range(1, 16+1):
    print(f'Epoch({epoch_id})')

    # Dispatches one epoch of training.
    trn_loss = dispatch_epoch(
        module=model,
        loader=trn_loader,
        criterion=criterion,
        optimizer=optimizer,
        batch_handler=mf_handler,
        verbose=32
    )
    trn_trace.append(trn_loss)
    print()

    # Dispatches one epoch of validation.
    with torch.no_grad():
        vld_loss = dispatch_epoch(
            module=model,
            loader=vld_loader,
            criterion=criterion,
            batch_handler=mf_handler,
            verbose=32
        )
    vld_trace.append(vld_loss)
    print()

Epoch(1)
Batch(992): CumAvgLoss(1.7614) & BatchLoss(1.3771)
Batch(96): CumAvgLoss(1.4220) & BatchLoss(1.3724)
Epoch(2)
Batch(992): CumAvgLoss(1.4850) & BatchLoss(1.4232)
Batch(96): CumAvgLoss(1.1411) & BatchLoss(1.1724)
Epoch(3)
Batch(992): CumAvgLoss(1.2985) & BatchLoss(1.2532)
Batch(96): CumAvgLoss(1.0223) & BatchLoss(1.0516)
Epoch(4)
Batch(992): CumAvgLoss(1.1488) & BatchLoss(1.1160)
Batch(96): CumAvgLoss(0.9575) & BatchLoss(0.9664)
Epoch(5)
Batch(992): CumAvgLoss(1.0259) & BatchLoss(1.0278)
Batch(96): CumAvgLoss(0.9126) & BatchLoss(0.9366)
Epoch(6)
Batch(992): CumAvgLoss(0.9233) & BatchLoss(0.9253)
Batch(96): CumAvgLoss(0.8710) & BatchLoss(0.8660)
Epoch(7)
Batch(992): CumAvgLoss(0.8391) & BatchLoss(0.8313)
Batch(96): CumAvgLoss(0.8397) & BatchLoss(0.8551)
Epoch(8)
Batch(992): CumAvgLoss(0.7676) & BatchLoss(0.7471)
Batch(96): CumAvgLoss(0.8104) & BatchLoss(0.7979)
Epoch(9)
Batch(992): CumAvgLoss(0.7087) & BatchLoss(0.7072)
Batch(96): CumAvgLoss(0.7901) & BatchLoss(0.7966)
Epoch(10)


In [123]:
src_nodes, _ = trn_data['rated'].edge_index
src_nodes.unique()

tensor([      1,       2,       4,  ..., 1540615, 1540616, 1540617])

In [90]:
model = model.cpu()
batch = next(iter(vld_loader))
with torch.no_grad():
    x = model.embedding(batch.n_id_dict)
x_src, x_pos, x_neg = triplet_handler(batch, x, 
    src_node=model.trg_src_node, 
    dst_node=model.trg_dst_node
)
with torch.no_grad():
    y_pos = model.regressor(x_src, x_pos)
    y_neg = model.regressor(x_src, x_neg)

In [103]:
y = torch.cat([y_pos, y_neg])
labels = torch.cat([
    torch.ones_like(y_pos),
    torch.zeros_like(y_neg)
])
indices = y.sort(descending=True).indices
labels = labels[indices]

In [104]:
# Average Precision @ K
cum_ap = labels[:at_k].cumsum(0) / torch.arange(1, at_k + 1)
ap = (labels[:at_k] * cum_ap).sum() / labels[:at_k].sum()

In [105]:
ap

tensor(0.8089)

In [106]:
recall = labels[indices][:64].sum().item() / y_pos.size(0)
print(f'R@64({recall:.2%})')

R@64(1.95%)


In [None]:
# Namespace.
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd

# Constructor.
from pandas import DataFrame


# Setting the plotting style.
sns.set()

# Typing.
TraceType = list[list[float]]

In [None]:
def parse_trace(trace: TraceType) -> DataFrame:
    return DataFrame(trace) \
        .rename_axis(index='epoch', columns='batch') \
        .transpose() \
        .stack() \
        .rename('loss') \
        .reset_index()


def make_frame(traces: list[TraceType], *, labels: list[str]) -> DataFrame:
    return pd.concat([
        parse_trace(trace).assign(trace=label) 
        for label, trace in zip(labels, traces)
    ])

In [None]:
# Reformats the loss traces.
trace = make_frame(
    traces=[trn_trace, vld_trace], 
    labels=['Train', 'Valid.']
)

# Plots the loss trace.
fig, ax = plt.subplots(figsize=[7, 3], dpi=192)
ax = sns.lineplot(
    data=trace, 
    x='epoch', 
    y='loss', 
    hue='trace', 
    style='trace', 
    errorbar='sd', 
    ax=ax
)
ax.set_xlabel('Epoch')
ax.set_ylabel('Average Loss')
ax.legend(title='Dataset')
plt.show(fig)

In [None]:
def ngcf_handler(
    module: type[Module], 
    data: HeteroData, 
    criterion: type[Loss],
    *,
    device: torch.device
) -> Tensor:
    
    # Sends the items to the correct device.
    data = data.to(device)

    # Computes the embedding propegation.
    x = module.embedding(data.n_id_dict)
    x = module.propagation(x, data.edge_index_dict)
    # Extracts the source nodes' features.
    i_src = data['user'].src_index
    x_src = x['user'][i_src]
    # Constructs the positive and negative feature matrices.
    i_pos = data['item'].dst_pos_index
    i_neg = data['item'].dst_neg_index
    x_pos = x['item'][i_pos]
    x_neg = x['item'][i_neg]
    # Computes the link scores.
    y_pos = module.regressor(x_src, x_pos)
    y_neg = module.regressor(x_src, x_neg)

    # Computes the loss.
    loss = criterion(y_pos, y_neg)

    # Returns the loss.
    return loss

In [None]:
# Initializes the neural graph model.
ngcf = NGCF(
    num_embeddings=data.num_nodes_dict,
    embedding_dims=[16] * 5,
    edge_types=data.edge_types,
    src_node='user',
    dst_node='item',
    dropout=.1,
)
display(ngcf)

# Instanciates the learning algorithm and loss criterion.
optimizer = Adam(ngcf.parameters(), 
    lr=1e-3,
    weight_decay=1e-5
)
criterion = BPRLoss()

In [None]:
# Clears the GPU cache.
torch.cuda.empty_cache()

trn_trace = []
vld_trace = []
for epoch_id in range(1, 16+1):
    print(f'Epoch({epoch_id})')

    # Dispatches one epoch of training.
    trn_loss = dispatch_epoch(
        module=ngcf,
        loader=trn_loader,
        criterion=criterion,
        optimizer=optimizer,
        batch_handler=ngcf_handler,
        verbose=16
    )
    trn_trace.append(trn_loss)
    print()

    # Dispatches one epoch of validation.
    with torch.no_grad():
        vld_loss = dispatch_epoch(
            module=ngcf,
            loader=vld_loader,
            criterion=criterion,
            batch_handler=ngcf_handler,
            verbose=16
        )
    vld_trace.append(vld_loss)
    print()