# Recommender Systems with DGL

## Introduction

Graph Neural Networks (GNNs), a family of methods for learning representations on graphs, have gained much attention recently.  Various models, such as Graph Convolutional Networks and GraphSAGE, have been proposed to obtain representations of whole graphs or of nodes within a single graph.

A primary goal of Collaborative Filtering (CF) is to automatically predict a user's interests, e.g. whether and how a user would interact with a set of items, given the interaction history of that user as well as the histories of other users.  The user-item interactions can also be viewed as a bipartite graph, where users and items form two sets of nodes and the edges connecting them stand for interactions.  The problem can then be formulated as a *link-prediction* problem, where we try to predict whether an edge (of a given type) exists between two nodes.
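
To make the bipartite view concrete, here is a minimal pure-Python sketch (not DGL code) that turns a toy interaction log into an edge list.  The ID scheme, placing users at `0..n_users-1` and shifting item IDs by `n_users`, is an assumption for illustration; `movielens_mx` may number its nodes differently.

```python
# Hypothetical interaction log: (user_id, item_id, rating) triples.
interactions = [
    (0, 0, 5.0),
    (0, 2, 3.0),
    (1, 0, 4.0),
    (2, 1, 1.0),
]


def build_bipartite_edges(interactions, n_users):
    """Put users and items into one node ID space (items shifted by n_users).

    Each interaction becomes a user -> item edge carrying the rating as an
    edge feature.
    """
    src, dst, ratings = [], [], []
    for user, item, rating in interactions:
        src.append(user)
        dst.append(n_users + item)
        ratings.append(rating)
    return src, dst, ratings


src, dst, ratings = build_bipartite_edges(interactions, n_users=3)
print(src)  # [0, 0, 1, 2]
print(dst)  # [3, 5, 3, 4]
```

Predicting the rating of a (user, item) pair that has no edge yet is then a question about a missing link in this graph.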

Based on this intuition, the research community has developed multiple new models for CF, including but not limited to:

* Geometric Learning Approaches
  * [Geometric Matrix Completion](https://papers.nips.cc/paper/5938-collaborative-filtering-with-graph-information-consistency-and-scalable-methods.pdf)
  * [Recurrent Multi-graph CNN](https://arxiv.org/pdf/1704.06803.pdf)
* Graph-convolutional Approaches
  * Models such as [R-GCN](https://arxiv.org/pdf/1703.06103.pdf) or [GraphSAGE](https://github.com/stellargraph/stellargraph/tree/develop/demos/link-prediction/hinsage) also apply.
  * [Graph Convolutional Matrix Completion](https://arxiv.org/abs/1706.02263)
  * [PinSage](https://arxiv.org/pdf/1806.01973.pdf)
  
In this hands-on tutorial, we will demonstrate how to implement GraphSAGE with DGL and MXNet, and how to apply it to a recommender system.

## Dependencies

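* DGL: `conda install -c dglteam dgl`.  The `NodeFlow` and `dgl.contrib.sampling` APIs used below come from DGL releases before 0.5 and are not available in later versions, so install an older release.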
* `pandas`
* `stanfordnlp`
* `mxnet`
* `tqdm` for displaying the progress bar.

## Loading data

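In this tutorial, we focus on rating prediction on the MovieLens dataset.  The data comes from [MovieLens](http://files.grouplens.org/datasets/movielens/ml-1m.zip) and is already shipped with the notebook.  Note that the link points to MovieLens-1M, while the code below loads the smaller MovieLens-100K variant (`'ml-100k'`).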

After loading the dataset and splitting it into training, validation, and test sets, we process the movie titles into (padded) word-ID sequences and the other features into categorical variables (i.e. integers).  We then store them as node features on the graph.
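
The sketch below illustrates, under simplifying assumptions, what padded word-ID sequences look like and how a bag of words can be averaged while ignoring padding, which is what the `mask = value != 0` logic in `mix_embeddings` later does.  Whitespace splitting stands in for the `stanfordnlp` tokenizer, and `build_vocab`, `to_padded_ids`, and `masked_mean` are hypothetical helpers.

```python
def build_vocab(titles):
    """Assign word IDs starting at 1; ID 0 is reserved for padding."""
    vocab = {}
    for title in titles:
        for word in title.lower().split():
            if word not in vocab:
                vocab[word] = len(vocab) + 1
    return vocab


def to_padded_ids(titles, vocab, max_len):
    """Convert each title to word IDs, truncated or zero-padded to max_len."""
    rows = []
    for title in titles:
        ids = [vocab[w] for w in title.lower().split()][:max_len]
        rows.append(ids + [0] * (max_len - len(ids)))
    return rows


def masked_mean(ids, embedding):
    """Average the embedding rows of the non-padding IDs (zeros if there are none)."""
    kept = [embedding[i] for i in ids if i != 0]
    if not kept:
        return [0.0] * len(embedding[0])
    return [sum(col) / len(kept) for col in zip(*kept)]


titles = ["Toy Story", "Heat"]
vocab = build_vocab(titles)                      # {'toy': 1, 'story': 2, 'heat': 3}
padded = to_padded_ids(titles, vocab, max_len=3)
print(padded)                                    # [[1, 2, 0], [3, 0, 0]]

# Hypothetical 2-d word embeddings; row 0 (padding) is deliberately non-zero
# to show that it is ignored by the masked average.
embedding = [[9.0, 9.0], [1.0, 0.0], [3.0, 2.0], [5.0, 5.0]]
print(masked_mean(padded[0], embedding))         # [2.0, 1.0]
```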

Since users and items have different sets of features, the features that a node does not have are padded with zeros, so that every node carries the same feature fields.
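
A minimal sketch of the zero-padding idea, assuming hypothetical field names (`age`, `occupation`, `year`, `genre`) rather than the ones `movielens_mx` actually uses.  A 0 then means "this node has no such feature", which presumes the real categories are encoded starting from 1 (also an assumption here).

```python
# Hypothetical feature fields; the real ones are defined by movielens_mx.
USER_FIELDS = ["age", "occupation"]
ITEM_FIELDS = ["year", "genre"]
ALL_FIELDS = USER_FIELDS + ITEM_FIELDS


def pad_features(node_features):
    """Build one column per field for all nodes, filling fields a node lacks with 0."""
    return {field: [feats.get(field, 0) for feats in node_features] for field in ALL_FIELDS}


nodes = [
    {"age": 3, "occupation": 7},   # a user node: no item fields
    {"year": 12, "genre": 4},      # an item node: no user fields
]
table = pad_features(nodes)
print(table)
# {'age': [3, 0], 'occupation': [7, 0], 'year': [0, 12], 'genre': [0, 4]}
```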

All of the above is encapsulated in the `movielens_mx.MovieLens` class to keep this notebook concise.

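```python
import os
# Make DGL use the MXNet backend (this must be set before dgl is imported).
os.environ['DGLBACKEND'] = 'mxnet'

import mxnet as mx
from mxnet import ndarray as nd, autograd, gluon
from mxnet.gluon import nn
import dgl
import dgl.function as FN
import numpy as np
```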

```python
import movielens_mx
import stanfordnlp

# IMPORTANT: stanfordnlp needs its English models.  If you have not downloaded
# them yet, uncomment the following line and run it once.
#stanfordnlp.download('en', force=True)

# Load MovieLens-100K, build the user-item graph, and split the edges into
# training/validation/test sets.
ml = movielens_mx.MovieLens('ml-100k')
```

## Model

We can now write a GraphSAGE layer.  In GraphSAGE, each node's representation is updated from its own representation in the previous layer together with an aggregation (often the mean) of the "messages" sent from all of its neighbors.

### Algorithm

The algorithm of a single GraphSAGE layer goes as follows for each node $v$:

1. $h_{\mathcal{N}(v)} \gets \mathtt{Average}_{u \in \mathcal{N}(v)} h_{u}$
2. $h_{v} \gets \sigma\left(W \cdot \mathtt{CONCAT}(h_v, h_{\mathcal{N}(v)})\right)$
3. $h_{v} \gets h_{v} / \lVert h_{v} \rVert_2$

where

* $\mathtt{Average}$ can be replaced by any kind of aggregation including `sum`, `max`, or even an LSTM.
* $\sigma$ is any non-linear activation function (e.g. `LeakyReLU`).

We simply repeat the computation above for multiple GraphSAGE layers.
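
For reference, the three steps above can be written out in plain Python for a tiny graph.  This is an illustrative sketch rather than the DGL implementation that follows: it uses mean aggregation, LeakyReLU with slope 0.1 for $\sigma$, omits the bias term that `nn.Dense` adds, and uses hypothetical weights `W`.

```python
import math


def leaky_relu(x, slope=0.1):
    """Scalar LeakyReLU with the same 0.1 slope the notebook uses."""
    return x if x > 0 else slope * x


def graphsage_layer(h, neighbors, W):
    """One GraphSAGE layer with mean aggregation (bias term omitted).

    h: dict node -> feature vector (list of d floats)
    neighbors: dict node -> list of neighbor nodes
    W: weight matrix as a list of rows, each of length 2 * d
    """
    d = len(next(iter(h.values())))
    new_h = {}
    for v, h_v in h.items():
        nbrs = neighbors.get(v, [])
        # Step 1: average the neighbors' representations (zeros if v has none).
        h_nbr = [sum(h[u][k] for u in nbrs) / len(nbrs) if nbrs else 0.0 for k in range(d)]
        # Step 2: sigma(W . CONCAT(h_v, h_N(v))).
        concat = h_v + h_nbr
        z = [leaky_relu(sum(w * x for w, x in zip(row, concat))) for row in W]
        # Step 3: L2-normalize, guarding against an all-zero vector.
        norm = max(math.sqrt(sum(x * x for x in z)), 1e-6)
        new_h[v] = [x / norm for x in z]
    return new_h


h = {0: [1.0, 0.0], 1: [0.0, 1.0], 2: [1.0, 1.0]}
neighbors = {0: [1, 2], 1: [0], 2: [0]}
# Hypothetical weights: this W simply adds h_v and h_N(v) element-wise.
W = [[1.0, 0.0, 1.0, 0.0],
     [0.0, 1.0, 0.0, 1.0]]
out = graphsage_layer(h, neighbors, W)
```

Stacking layers means feeding `out` back in as `h` with a new `W` per layer.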

### DGL Message Passing

DGL adopts the message-passing paradigm, or scatter-apply-gather paradigm, for feature computation on a graph.  It decomposes the computation into three stages:

1. *Message computation*: a message is computed on each edge from the features of the edge itself and of its source and destination nodes.  Often, the message computation simply copies the representation of the source node.
2. *Message aggregation*: each node then "receives" the messages sent from its neighbors and calls a function that reduces them into a single representation whose size does not depend on the number of neighbors.  Averaging and summing are two of the most common message aggregation functions.
3. *Node feature update*: using the aggregated representation from its neighbors, each node then updates its own representation.
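
The three stages can be mimicked with a small pure-Python function.  `propagate` and its `message_fn`/`reduce_fn`/`apply_fn` arguments are hypothetical names chosen to mirror the stages; they are not DGL's API, although DGL's message and reduce functions play the same roles.

```python
from collections import defaultdict


def propagate(edges, h, message_fn, reduce_fn, apply_fn):
    """One round of message passing over directed (src, dst) edges.

    Nodes that receive no messages keep their feature unchanged (a choice
    made for this sketch).
    """
    # 1. Message computation: one message per edge.
    mailbox = defaultdict(list)
    for src, dst in edges:
        mailbox[dst].append(message_fn(h[src], h[dst]))
    new_h = {}
    for v in h:
        if v in mailbox:
            # 2. Message aggregation: reduce the mailbox to a single value.
            agg = reduce_fn(mailbox[v])
            # 3. Node feature update: combine the node's own feature with the aggregate.
            new_h[v] = apply_fn(h[v], agg)
        else:
            new_h[v] = h[v]
    return new_h


edges = [(0, 2), (1, 2), (2, 0)]
h = {0: 1.0, 1: 2.0, 2: 3.0}
result = propagate(
    edges, h,
    message_fn=lambda h_src, h_dst: h_src,   # copy from source
    reduce_fn=sum,                           # sum aggregation
    apply_fn=lambda h_v, agg: h_v + agg,     # toy update rule
)
print(result)  # {0: 4.0, 1: 2.0, 2: 6.0}
```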

With the three stages in mind, we can easily figure out how to map the GraphSAGE layer computation into the message-passing paradigm:

1. $h_{\mathcal{N}(v)} \gets \underbrace{\mathtt{Average}_{u \in \mathcal{N}(v)} \underbrace{h_{u}}_{\text{Message computation (copy from source)}}}_{\text{Message aggregation}}$
2. $h_{v} \gets \underbrace{\sigma\left(W \cdot \mathtt{CONCAT}(h_v, h_{\mathcal{N}(v)})\right)}_{\text{Node feature update}}$
3. $h_{v} \gets \underbrace{h_{v} / \lVert h_{v} \rVert_2}_{\text{Node feature update}}$

At the time of writing, DGL does not provide an $\mathtt{Average}$ aggregation function, but it does provide $\mathtt{Sum}$.  We can therefore rewrite the algorithm above into the following form, which is readily implemented in DGL: we count the neighbors by summing a constant message $1$, sum the neighbor representations, and divide the latter by the former.

1. $d_{\mathcal{N}(v)} \gets \underbrace{\mathtt{Sum}_{u \in \mathcal{N}(v)} \underbrace{1}_{\text{Message computation (copy from source)}}}_{\text{Message aggregation}}$
2. $h_{\mathcal{N}(v)} \gets \underbrace{\mathtt{Sum}_{u \in \mathcal{N}(v)} \underbrace{h_{u}}_{\text{Message computation (copy from source)}}}_{\text{Message aggregation}}$
3. $h_{v} \gets \underbrace{\sigma\left(W \cdot \mathtt{CONCAT}(h_v, h_{\mathcal{N}(v)} / d_{\mathcal{N}(v)})\right)}_{\text{Node feature update}}$
4. $h_{v} \gets \underbrace{h_{v} / \lVert h_{v} \rVert_2}_{\text{Node feature update}}$
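
As a quick sanity check of the rewrite, the sketch below computes neighbor averages using only sums: one sum over a constant message $1$ gives $d_{\mathcal{N}(v)}$, another over the copied source features gives the summed $h_{\mathcal{N}(v)}$, and their ratio is the mean.  Scalar features and the helper name `mean_via_sums` are assumptions for brevity.

```python
def mean_via_sums(edges, h, num_nodes):
    """Neighbor averages computed with Sum aggregation only.

    d[v] sums the constant message 1 (i.e. counts incoming edges), and
    h_sum[v] sums the copied source features h[u]; their ratio is the mean.
    """
    d = [0.0] * num_nodes
    h_sum = [0.0] * num_nodes
    for u, v in edges:        # edge u -> v delivers messages to v
        d[v] += 1.0           # Sum of the message "1"
        h_sum[v] += h[u]      # Sum of the message h_u
    # Nodes without incoming edges get 0 instead of dividing by zero.
    return [s / n if n > 0 else 0.0 for s, n in zip(h_sum, d)]


edges = [(0, 2), (1, 2), (2, 0)]
h = [1.0, 2.0, 4.0]
print(mean_via_sums(edges, h, 3))  # [4.0, 0.0, 1.5]
```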

## Sampling

Ideally, we would update all node embeddings at once with the GraphSAGE layer.  However, as the graph scales up, such a full update soon becomes impractical, because the node embeddings may no longer fit in GPU memory.

A natural solution is to partition the nodes and compute the embeddings one partition (minibatch) at a time.  The nodes in one convolution layer then depend only on their neighbors rather than on all the nodes in the graph, which reduces the computational cost.  However, with multiple layers the dependencies compound, and if some nodes have many neighbors (which is often the case, since the degree distribution of many real-world graphs follows a [power law](https://en.wikipedia.org/wiki/Scale-free_network)), the computation may still end up depending on every node in the graph.
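
To see how dependencies compound across layers, the following sketch computes which input nodes are needed for a seed node when every layer aggregates over full neighborhoods.  The star-shaped graph and the `receptive_field` helper are hypothetical.

```python
def receptive_field(in_neighbors, seeds, num_layers):
    """Nodes whose input features are needed to compute `num_layers` layers of
    full-neighborhood convolution for `seeds` (each layer also uses the node itself)."""
    needed = set(seeds)
    frontier = set(seeds)
    for _ in range(num_layers):
        # Expand only the nodes added in the previous step.
        frontier = {u for v in frontier for u in in_neighbors.get(v, [])} - needed
        needed |= frontier
    return needed


# Hypothetical star-shaped graph: hub 0 is linked to nodes 1..5 in both directions.
in_neighbors = {0: [1, 2, 3, 4, 5], 1: [0], 2: [0], 3: [0], 4: [0], 5: [0]}
print(sorted(receptive_field(in_neighbors, [1], 1)))  # [0, 1]
print(sorted(receptive_field(in_neighbors, [1], 2)))  # [0, 1, 2, 3, 4, 5]
```

With two layers, the single high-degree hub already pulls every node of this graph into the computation for seed 1.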

*Neighbor sampling* reduces the cost of computing node embeddings further.  When aggregating messages, instead of collecting them from all neighboring nodes, we collect them only from a random sample of the neighbors (for instance, at most K neighbors sampled uniformly without replacement).
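
Uniform sampling of at most K neighbors without replacement can be sketched with Python's `random.sample`.  The adjacency dictionary and the `sample_neighbors` helper are illustrative assumptions, not DGL's sampler.

```python
import random


def sample_neighbors(in_neighbors, nodes, k, rng):
    """Keep at most k in-neighbors per node, drawn uniformly without replacement."""
    sampled = {}
    for v in nodes:
        nbrs = in_neighbors.get(v, [])
        sampled[v] = list(nbrs) if len(nbrs) <= k else rng.sample(nbrs, k)
    return sampled


rng = random.Random(0)
in_neighbors = {0: list(range(1, 101)), 1: [0, 2]}
sampled = sample_neighbors(in_neighbors, [0, 1], k=5, rng=rng)
print(len(sampled[0]), sampled[1])  # 5 [0, 2]
```

Node 0 has 100 neighbors but aggregates from only 5 of them, while node 1 keeps both of its neighbors.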

DGL provides the `NodeFlow` object, which describes the computation dependencies of nodes in a graph convolutional network, as well as various samplers that construct such `NodeFlow`s as graphs.  From a programmer's perspective, minibatch training with neighbor sampling then reduces to propagating messages within each `NodeFlow`, as follows.
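
Conceptually, a `NodeFlow` for a 1-hop GCN is a layered graph built backwards from the seed nodes.  The toy sketch below (hypothetical `build_layers`, not DGL's implementation, and keeping parent-graph IDs instead of the layer-local IDs that `map_from_parent_nid` maps to) shows the idea, including what `add_self_loop=True` contributes: each seed also receives a message from its own copy in the previous layer, which is what HACK 1 in the code below relies on.

```python
import random


def build_layers(in_neighbors, seeds, num_hops, k, add_self_loop, rng):
    """Toy, NodeFlow-like structure built backwards from the seed nodes.

    Returns (layers, blocks): layers[0] is the input layer and layers[-1]
    holds the seeds; blocks[i] lists the (src, dst) edges from layers[i]
    to layers[i + 1].
    """
    layers = [list(seeds)]
    blocks = []
    for _ in range(num_hops):
        src_layer, edges, seen = [], [], set()
        for v in layers[0]:
            nbrs = in_neighbors.get(v, [])
            picked = list(nbrs) if len(nbrs) <= k else rng.sample(nbrs, k)
            if add_self_loop:
                picked.append(v)  # v also depends on its own previous-layer copy
            for u in picked:
                edges.append((u, v))
                if u not in seen:
                    seen.add(u)
                    src_layer.append(u)
        layers.insert(0, src_layer)
        blocks.insert(0, edges)
    return layers, blocks


in_neighbors = {0: [1, 2], 1: [0], 2: [0]}
layers, blocks = build_layers(in_neighbors, [0], num_hops=1, k=5,
                              add_self_loop=True, rng=random.Random(0))
print(layers)  # [[1, 2, 0], [0]]
print(blocks)  # [[(1, 0), (2, 0), (0, 0)]]
```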

```python
def mix_embeddings(ndata, gcn):
    """Adds external (categorical and numeric) features into the node representation ndata['h']"""
    extra_repr = []
    for key, value in ndata.items():
        if (value.dtype == np.int64) and key in gcn.emb:
            result = getattr(gcn, 'emb_' + key)(value)
            if result.ndim == 3:    # bag of words: the result would be a (n_nodes x seq_len x feature_size) tensor
                # Average the word embeddings over the non-padding positions (word ID 0 is padding).
                # The mask is cast to float32 to match the embeddings, and the word count is
                # clamped to 1 so that nodes without any words do not produce NaNs.
                mask = (value != 0).astype('float32')
                n_words = nd.maximum(mask.sum(axis=1, keepdims=True), 1)
                result = (result * mask.expand_dims(2)).sum(axis=1) / n_words
            extra_repr.append(result)
        elif (value.dtype == np.float32) and key in gcn.proj:
            result = getattr(gcn, 'proj_' + key)(value)
            extra_repr.append(result)
    ndata['h'] = ndata['h'] + nd.stack(*extra_repr, axis=0).sum(axis=0)

class GraphSageConvWithSampling(nn.Block):
    def __init__(self, feature_size):
        super(GraphSageConvWithSampling, self).__init__()

        self.feature_size = feature_size
        self.W = nn.Dense(feature_size)
        self.leaky_relu = nn.LeakyReLU(0.1)

    def forward(self, nodes):
        h_agg = nodes.data['h_agg']           # sum of incoming messages (includes the self-loop)
        h = nodes.data['h']
        w = nodes.data['w'].expand_dims(1)    # number of incoming messages (includes the self-loop)
        # HACK 1:
        # When computing the representation of node v on layer L, we would like to
        # include the dependency of node v itself on layer L-1.  However, we don't
        # want to aggregate node v's own "message".  So we tell the sampler to
        # always "add self loop" to include such dependency, but we subtract the
        # node's representation from aggregation later.
        # Remove v's own message from the sum, then average over the remaining w - 1 neighbors.
        h_agg = (h_agg - h) / nd.maximum(w - 1, 1)    # HACK 1
        h_concat = nd.concat(h, h_agg, dim=1)
        h_new = self.leaky_relu(self.W(h_concat))
        # L2-normalize each row, guarding against division by zero.
        return {'h': h_new / nd.maximum(h_new.norm(axis=1, keepdims=True), 1e-6)}

class GraphSageWithSampling(nn.Block):
    def __init__(self, feature_size, n_layers, G):
        super(GraphSageWithSampling, self).__init__()

        self.feature_size = feature_size
        self.n_layers = n_layers

        # Simulating ModuleList
        for i in range(n_layers):
            setattr(self, 'conv_%d' % i, GraphSageConvWithSampling(feature_size))
        self.emb = set()
        self.proj = set()

        for key, scheme in G.node_attr_schemes().items():
            if scheme.dtype == np.int64:
                # Categorical feature: one embedding table per feature.
                n_items = G.ndata[key].max().asscalar()
                # Simulating ModuleDict
                self.emb.add(key)
                setattr(self,
                        'emb_' + key,
                        nn.Embedding(
                            n_items + 1,
                            self.feature_size))
            elif scheme.dtype == np.float32:
                # Numeric feature: projected by a Dense layer followed by LeakyReLU.
                # Simulating ModuleDict
                self.proj.add(key)
                seq = nn.Sequential()
                with seq.name_scope():
                    w = nn.Dense(self.feature_size)
                    seq.add(w)
                    seq.add(nn.LeakyReLU(0.1))
                setattr(self, 'proj_' + key, seq)

        self.G = G

        # Learnable per-node embedding; node IDs are shifted by one to index this (N + 1)-row table.
        self.node_emb = nn.Embedding(G.number_of_nodes() + 1, feature_size)

    # Messages copy 'h' and the constant 'one' from the source node; the reductions
    # sum them into 'h_agg' (summed representations) and 'w' (message count).
    msg = [FN.copy_src('h', 'h'),
           FN.copy_src('one', 'one')]
    red = [FN.sum('h', 'h_agg'), FN.sum('one', 'w')]

    def forward(self, nf):
        '''
        nf: NodeFlow.
        '''
        nf.copy_from_parent(edge_embed_names=None)
        for i in range(nf.num_layers):
            nf.layers[i].data['h'] = self.node_emb(nf.layer_parent_nid(i) + 1)
            nf.layers[i].data['one'] = nd.ones(nf.layer_size(i))
            mix_embeddings(nf.layers[i].data, self)
        if self.n_layers == 0:
            return nf.layers[i].data['h']
        for i in range(self.n_layers):
            nf.block_compute(i, self.msg, self.red, getattr(self, 'conv_%d' % i))

        result = nf.layers[self.n_layers].data['h']
        assert (result != result).sum().asscalar() == 0, 'NaN in GraphSAGE output'
        return result

class GraphSAGERecommender(nn.Block):
    def __init__(self, gcn):
        super(GraphSAGERecommender, self).__init__()

        with self.name_scope():
            self.gcn = gcn
            self.node_biases = self.params.get(
                'node_biases',
                init=mx.init.Zero(),
                shape=(gcn.G.number_of_nodes()+1,))

    def forward(self, nf, src, dst):
        h_output = self.gcn(nf)
        # Map the parent-graph IDs of the edge endpoints to rows of the last NodeFlow layer.
        h_src = h_output[nf.map_from_parent_nid(-1, src, True)]
        h_dst = h_output[nf.map_from_parent_nid(-1, dst, True)]
        # Predicted rating: dot product of the endpoint representations plus per-node biases.
        score = (h_src * h_dst).sum(1) + self.node_biases.data()[src+1] + self.node_biases.data()[dst+1]
        return score
```
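
HACK 1 boils down to a small piece of arithmetic: with a self-loop, both the summed message `h_agg` and the message count `w` include the node's own contribution, so the neighbor mean is `(h_agg - h) / (w - 1)`.  The pure-Python check below uses made-up vectors and a hypothetical helper `mean_excluding_self`.  The parentheses matter, since `h_agg - h / (w - 1)` would leave most of the node's own message in the average; clamping the denominator to 1 makes a node without real neighbors aggregate to zero.

```python
def mean_excluding_self(h_agg, h_self, w):
    """Neighbor mean when the aggregated sum also contains a self-loop message.

    h_agg: element-wise sum of all incoming messages, including v's own h
    h_self: v's own representation
    w: number of incoming messages, including the self-loop
    """
    return [(a - s) / max(w - 1, 1) for a, s in zip(h_agg, h_self)]


h_v = [1.0, 1.0]
neighbor_h = [[2.0, 0.0], [4.0, 2.0]]
# Sum over the two real neighbors plus the self-loop message.
h_agg = [h_v[k] + sum(n[k] for n in neighbor_h) for k in range(2)]
print(mean_excluding_self(h_agg, h_v, w=3))  # [3.0, 1.0]
# A node with no real neighbors only receives its self-loop message.
print(mean_excluding_self(h_v, h_v, w=1))    # [0.0, 0.0]
```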

## Training

As described above, training now involves only:

1. Initializing a sampler.
2. Iterating over the neighbor sampler, propagating the messages, and computing losses and gradients as usual.

After each epoch, we also evaluate the RMSE on the validation and test sets.
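
Rating prediction and evaluation come down to the scoring rule in `GraphSAGERecommender.forward` followed by an RMSE.  The plain-Python sketch below uses made-up representations, biases, and ratings, and hypothetical helpers `predict_rating` and `rmse`; unlike the notebook, whose bias table has one extra row and is indexed with `src + 1`, it indexes biases from 0.

```python
import math


def predict_rating(h, bias, s, d):
    """Dot product of the endpoint representations plus a bias per endpoint."""
    return sum(a * b for a, b in zip(h[s], h[d])) + bias[s] + bias[d]


def rmse(predictions, targets):
    """Root-mean-square error between two equal-length sequences."""
    return math.sqrt(sum((p - t) ** 2 for p, t in zip(predictions, targets)) / len(predictions))


h = [[1.0, 0.0], [0.6, 0.8], [0.0, 1.0]]   # hypothetical node representations
bias = [0.5, 1.0, 2.0]                      # hypothetical per-node biases
pairs = [(0, 1), (2, 1)]                    # (user, item) pairs to score
targets = [2.0, 4.0]                        # hypothetical true ratings
preds = [predict_rating(h, bias, s, d) for s, d in pairs]
print(round(rmse(preds, targets), 4))  # 0.1581
```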

```python
g = ml.g
# Find the subgraph of all "training" edges; preserve_nodes=True keeps every node of g with its ID.
g_train = g.edge_subgraph(g.filter_edges(lambda edges: edges.data['train']).astype('int64'), True)
g_train.copy_from_parent()
g_train.readonly()
eid_valid = g.filter_edges(lambda edges: edges.data['valid']).astype('int64')
eid_test = g.filter_edges(lambda edges: edges.data['test']).astype('int64')
src_valid, dst_valid = g.find_edges(eid_valid)
src_test, dst_test = g.find_edges(eid_test)
src, dst = g_train.all_edges()
rating = g_train.edata['rating']
rating_valid = g.edges[eid_valid].data['rating']
rating_test = g.edges[eid_test].data['rating']

model = GraphSAGERecommender(GraphSageWithSampling(100, 1, g_train))
model.collect_params().initialize(ctx=mx.cpu())
trainer = gluon.Trainer(model.collect_params(), 'adam', {'learning_rate': 0.001, 'wd': 1e-9})

batch_size = 1024
n_users = len(ml.user_ids)
n_products = len(ml.product_ids)

def evaluate_rmse(h, src, dst, rating):
    """RMSE of the predicted ratings on the given edges, using precomputed node representations h."""
    node_biases = model.node_biases.data()
    score = nd.zeros(len(src))
    for i in range(0, len(src), batch_size):
        j = min(i + batch_size, len(src))
        s = src[i:j]
        d = dst[i:j]
        score[i:j] = (h[s] * h[d]).sum(1) + node_biases[s + 1] + node_biases[d + 1]
    return nd.sqrt(((score - rating) ** 2).mean())

for epoch in range(50):
    shuffle_idx = nd.from_numpy(np.random.permutation(g_train.number_of_edges()))
    src_shuffled = src[shuffle_idx]
    dst_shuffled = dst[shuffle_idx]
    rating_shuffled = rating[shuffle_idx]
    src_batches = []
    dst_batches = []
    rating_batches = []
    for i in range(0, g_train.number_of_edges(), batch_size):
        j = min(i + batch_size, g_train.number_of_edges())
        # The edges are already shuffled, so each batch is simply a contiguous slice.
        src_batches.append(src_shuffled[i:j])
        dst_batches.append(dst_shuffled[i:j])
        rating_batches.append(rating_shuffled[i:j])

    # HACK 2: Alternate between source batch and destination batch, so we can put exactly
    # a batch of edges' endpoints in a single NodeFlow.
    seed_nodes = nd.concat(*sum([[s, d] for s, d in zip(src_batches, dst_batches)], []), dim=0)

    sampler = dgl.contrib.sampling.NeighborSampler(
        g_train,               # the graph
        batch_size * 2,        # number of nodes to compute at a time, HACK 2
        5,                     # number of neighbors for each node
        1,                     # number of layers in GCN
        seed_nodes=seed_nodes, # list of seed nodes, HACK 2
        prefetch=True,         # whether to prefetch the NodeFlows
        add_self_loop=True,    # whether to add a self-loop in the NodeFlows, HACK 1
        shuffle=False,         # whether to shuffle the seed nodes.  Should be False here.
        num_workers=4,
    )

    # Training
    for s, d, r, nodeflow in zip(src_batches, dst_batches, rating_batches, sampler):
        with mx.autograd.record():
            score = model(nodeflow, s, d)
            loss = ((score - r) ** 2).mean()
        loss.backward()
        # The loss is already averaged over the batch, so do not let the trainer rescale it again.
        trainer.step(1)

    # Validation & Test: precompute the GraphSAGE output for all nodes first.
    sampler = dgl.contrib.sampling.NeighborSampler(
        g_train,
        batch_size,
        5,
        1,
        seed_nodes=nd.arange(g.number_of_nodes()).astype('int64'),
        prefetch=True,
        add_self_loop=True,
        shuffle=False,
        num_workers=4
    )

    h = []
    for nf in sampler:
        h.append(model.gcn(nf))
    h = nd.concat(*h, dim=0)

    valid_rmse = evaluate_rmse(h, src_valid, dst_valid, rating_valid)
    test_rmse = evaluate_rmse(h, src_test, dst_test, rating_test)

    print('Epoch', epoch, 'Last training batch loss:', loss.asscalar(),
          'Validation RMSE:', valid_rmse.asscalar(), 'Test RMSE:', test_rmse.asscalar())
```
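
The seed layout behind HACK 2 can be checked in isolation.  Assuming, as the `shuffle=False` comment suggests, that the sampler consumes seeds in consecutive chunks of `batch_size * 2`, interleaving each edge batch's sources and destinations makes every chunk contain exactly one batch's endpoints, including the smaller final batch.  `interleave_seeds` and `chunk` are hypothetical helpers.

```python
def interleave_seeds(src, dst, batch_size):
    """Lay out seed nodes as [src_0, dst_0, src_1, dst_1, ...] per edge batch."""
    seeds, batches = [], []
    for i in range(0, len(src), batch_size):
        s, d = src[i:i + batch_size], dst[i:i + batch_size]
        batches.append((s, d))
        seeds.extend(s)
        seeds.extend(d)
    return seeds, batches


def chunk(seq, size):
    """Consecutive chunks of `size`, as a non-shuffling sampler would take them."""
    return [seq[i:i + size] for i in range(0, len(seq), size)]


src = [0, 1, 2, 3, 4]
dst = [10, 11, 12, 13, 14]
seeds, batches = interleave_seeds(src, dst, batch_size=2)
chunks = chunk(seeds, 2 * 2)
print(chunks)  # [[0, 1, 10, 11], [2, 3, 12, 13], [4, 14]]
# Each chunk of 2 * batch_size seeds holds exactly one edge batch's endpoints.
assert all(c == s + d for c, (s, d) in zip(chunks, batches))
```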