<a href="https://colab.research.google.com/github/zmgy107/DGL-Learning-Notes/blob/main/Chapter_2_Message_Passing.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [11]:
%pip install  dgl -f https://data.dgl.ai/wheels/cu117/repo.html
%pip install  dglgo -f https://data.dgl.ai/wheels-test/repo.html

import dgl



# Message Passing Paradigm

Let $x_v\in \mathbb{R}^{d_1}$ be the feature for node $v$, and $w_e \in \mathbb{R}^{d_2}$ be the feature for edge $(u,v)$. The __message passing paradigm__ defines the following node-wise and edge-wise computation at step $t+1$:

$\text{Edge-wise: } m_e^{(t+1)}=\phi(x_v^{(t)},x_u^{(t)},w_e^{(t)}),\quad (u,v,e)\in\mathcal{E}.$

$\text{Node-wise: } x_v^{(t+1)}=\psi(x_v^{(t)},\rho(\{m_e^{(t+1)}:(u,v,e)\in\mathcal{E}\})).$

In the above equations, $\phi$ is a __message function__ defined on each edge to generate a message by combining the edge feature with the features of its incident nodes; $\psi$ is an __update function__ defined on each node to update the node feature by aggregating its incoming messages using the __reduce function $\rho$__.
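To make the three roles concrete, here is a minimal, dependency-free sketch of one message passing step on a toy graph. It does not use DGL; the specific choices of `phi`, `rho` and `psi` (scale by the edge feature, sum, add to the old feature) and the helper name `message_passing_step` are assumptions made only for illustration.

```python
# Toy graph: each edge is (u, v, w_e) with source u, destination v, scalar edge feature w_e.
edges = [(0, 2, 0.5), (1, 2, 2.0), (2, 0, 1.0)]
x = {0: 1.0, 1: 3.0, 2: 10.0}  # scalar node features x_v at step t

def phi(x_v, x_u, w_e):
    """Message function (assumed): scale the source feature by the edge feature."""
    return x_u * w_e

def rho(messages):
    """Reduce function (assumed): sum of incoming messages, 0 if there are none."""
    return sum(messages)

def psi(x_v, aggregated):
    """Update function (assumed): add the aggregate to the old node feature."""
    return x_v + aggregated

def message_passing_step(x, edges):
    # Edge-wise: compute one message per edge and deliver it to the destination node.
    inbox = {v: [] for v in x}
    for u, v, w_e in edges:
        inbox[v].append(phi(x[v], x[u], w_e))
    # Node-wise: reduce each inbox, then update the node feature.
    return {v: psi(x[v], rho(inbox[v])) for v in x}

x_next = message_passing_step(x, edges)
print(x_next)
```

Node 1 has no incoming edges, so in this sketch its feature is left unchanged by the update.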

# 2.1 Built-in Functions and Message Passing APIs

In DGL, a __message function__ takes a single argument edges, which is an EdgeBatch instance. During message passing, DGL generates it internally to represent a batch of edges. It has three members, __src, dst and data__, to access the features of source nodes, destination nodes, and edges, respectively.

A __reduce function__ takes a single argument nodes, which is a NodeBatch instance. During message passing, DGL generates it internally to represent a batch of nodes. It has the member mailbox to access the messages received by the nodes in the batch. Some of the most common reduce operations include sum, max, min, etc.

An __update function__ takes a single argument nodes as described above. This function __operates on the aggregation result from the reduce function__, typically combining it with a node's original feature at the last step and saving the result as a node feature.

DGL has implemented commonly used message functions and reduce functions as __built-in__ functions in the namespace dgl.function. In general, DGL suggests using built-in functions __whenever possible__ since they are heavily optimized and automatically handle dimension broadcasting.

You can also implement user-defined message/reduce functions (UDFs) when your message passing functions cannot be implemented with built-ins.

Built-in message functions can be unary or binary. DGL supports copy for unary. For binary funcs, DGL supports __add, sub, mul, div, dot__. The naming convention for message built-in funcs is that u represents src nodes, v represents dst nodes, and e represents edges. The parameters for those functions are __strings__ indicating the input and output field names for the corresponding nodes and edges. The list of supported built-in functions can be found in DGL Built-in Function.

For example, to add the hu feature from src nodes and hv feature from dst nodes then save the result on the edge at he field, one can use built-in function dgl.function.u_add_v('hu', 'hv', 'he'). This is equivalent to the Message UDF:

In [12]:
def message_func(edges):
  return {'he':edges.src['hu']+edges.dst['hv']}

Built-in reduce functions support operations sum, max, min, and mean. Reduce functions usually have two parameters, one for field name in mailbox, one for field name in node features, both are strings. For example, dgl.function.sum('m', 'h') is equivalent to the Reduce UDF that sums up the message m:

In [13]:
import torch
def reduce_func(nodes):
  return {'h':torch.sum(nodes.mailbox['m'],dim=1)}
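The reduce UDF sums over `dim=1` because the mailbox stacks the messages of each node along the second dimension: its shape is (nodes in the batch, messages per node, feature size). The sketch below mimics that layout with nested Python lists instead of tensors; `sum_over_messages` is a hypothetical helper, not a DGL or PyTorch function.

```python
# Stand-in for nodes.mailbox['m']: 2 nodes, each with 3 incoming messages of size 2.
mailbox = [
    [[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]],   # messages received by the first node
    [[0.5, 0.5], [1.5, 1.5], [2.0, 2.0]],   # messages received by the second node
]

def sum_over_messages(mailbox):
    """Sum along the message axis (axis 1), like torch.sum(mailbox, dim=1)."""
    return [[sum(column) for column in zip(*messages)] for messages in mailbox]

h = sum_over_messages(mailbox)
print(h)  # one feature vector of size 2 per node
```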

It is also possible to invoke only the edge-wise computation with apply_edges(), without invoking message passing. apply_edges() takes a message function as its parameter and by default updates the features of all edges. For example:

In [14]:
import dgl.function as fn
#graph.apply_edges(fn.u_add_v('el','er','e'))

For message passing, update_all() is a high-level API that __merges message generation, message aggregation and node update__ in a single call, which leaves room for optimization as a whole.

The parameters for update_all() are a message function, a reduce function and an update function. One can call the update function outside of update_all() and omit it when invoking update_all(). DGL recommends this approach since the update function can usually be written as pure tensor operations, which keeps the code concise. For example:

In [15]:
def update_all_example(graph):
  # store the result in graph.ndata['ft']
  # message: src feature 'ft' times edge feature 'a'; reduce: sum into 'ft'
  graph.update_all(fn.u_mul_e('ft','a','m'),fn.sum('m','ft'))
  #Call update function outside of update_all
  final_ft=graph.ndata['ft']*2
  return final_ft

This call will generate the messages m by multiplying the src node features ft and the edge features a, sum up the messages m to update the node features ft, and finally multiply ft by 2 to get the result final_ft. After the call, DGL will clean up the intermediate messages m. The math formula for the above function is:

$final\_ft_i=2*\sum_{j\in N(i)}(ft_j*a_{ij})$
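As a quick arithmetic check of this formula, the following sketch evaluates it on a small hand-made graph with scalar features. It uses plain Python rather than DGL, and the graph, the feature values and the function name `final_ft` are assumptions chosen for illustration.

```python
def final_ft(ft, edges):
    """ft: list of scalar node features; edges: (src j, dst i, a_ij) triples."""
    summed = [0.0] * len(ft)
    for j, i, a_ij in edges:
        summed[i] += ft[j] * a_ij      # message m = ft_j * a_ij, summed at destination i
    return [2 * s for s in summed]     # update applied outside the aggregation

ft = [1.0, 2.0, 3.0, 4.0]
edges = [(0, 2, 0.5), (1, 0, 0.25), (2, 0, 2.0), (3, 1, 1.0)]
result = final_ft(ft, edges)
print(result)
```

Node 3 has no incoming edges, so its sum is empty and the result is 0, which matches how DGL's built-in sum reducer fills nodes that receive no messages with zeros.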

DGL’s built-in functions support floating point data types, i.e. the features must be half (float16), float, or double tensors. float16 data type support is disabled by default as it has a minimum GPU compute capability requirement of sm_53 (Pascal, Volta, Turing and Ampere architectures).

# 2.2 Writing Efficient Message Passing Code


DGL optimizes memory consumption and computing speed for message passing. A common practise to leverage those optimizations is to construct one’s own message passing functionality as a combination of update_all() calls with built-in functions as parameters.

Besides that, considering that the number of edges is much larger than the number of nodes for some graphs, avoiding unnecessary memory copy from nodes to edges is beneficial. For some cases like GATConv, where it is necessary to save message on the edges, one needs to call apply_edges() with built-in functions. Sometimes the messages on the edges can be high dimensional, which is memory consuming. DGL recommends keeping the dimension of edge features as low as possible.

Here’s an example of how to achieve this by moving operations from the edges to the nodes. The approach does the following: concatenate the src feature and dst feature, then apply a linear layer, i.e. $W\times(u||v)$. The src and dst feature dimension is high, while the linear layer output dimension is low. A straightforward implementation would be like:

In [16]:
# example
# import torch
# import torch.nn as nn

# linear = nn.Parameter(torch.FloatTensor(size=(node_feat_dim * 2, out_dim)))
# def concat_message_function(edges):
#     return {'cat_feat': torch.cat([edges.src['feat'], edges.dst['feat']], dim=1)}
# g.apply_edges(concat_message_function)
# g.edata['out'] = g.edata['cat_feat'] @ linear

In [17]:
# realization
import torch
import torch.nn as nn

u,v=torch.tensor([0,1,2,3]),torch.tensor([2,0,0,1])
weights=torch.tensor([0.1,0.6,0.9,0.7]) #weight of each edge
g=dgl.graph((u,v))
g.edata['w']=weights
g.ndata['x']=torch.ones(g.num_nodes(),3)

# linear=nn.Parameter(torch.FloatTensor(size=(node_feat_dim*2,out_dim)))
linear=nn.Parameter(torch.FloatTensor(size=(3*2,1)))
def concat_message_function(edges):
  return{'cat_feat':torch.cat([edges.src['x'],edges.dst['x']],dim=1)}
g.apply_edges(concat_message_function)
g.edata['out']=g.edata['cat_feat']@ linear
print(g)
print(g.ndata)
print(g.edata)

Graph(num_nodes=4, num_edges=4,
      ndata_schemes={'x': Scheme(shape=(3,), dtype=torch.float32)}
      edata_schemes={'w': Scheme(shape=(), dtype=torch.float32), 'cat_feat': Scheme(shape=(6,), dtype=torch.float32), 'out': Scheme(shape=(1,), dtype=torch.float32)})
{'x': tensor([[1., 1., 1.],
        [1., 1., 1.],
        [1., 1., 1.],
        [1., 1., 1.]])}
{'w': tensor([0.1000, 0.6000, 0.9000, 0.7000]), 'cat_feat': tensor([[1., 1., 1., 1., 1., 1.],
        [1., 1., 1., 1., 1., 1.],
        [1., 1., 1., 1., 1., 1.],
        [1., 1., 1., 1., 1., 1.]]), 'out': tensor([[5.1014e-31],
        [5.1014e-31],
        [5.1014e-31],
        [5.1014e-31]], grad_fn=<MmBackward0>)}


The suggested implementation splits the linear operation into two: one is applied to the src feature, the other to the dst feature. It then adds the outputs of the two linear operations on the edges at the final stage, i.e. it performs $W_l\times u+W_r\times v$. This is because $W\times (u||v)=W_l\times u+W_r\times v$, where $W_l$ and $W_r$ are the left and the right half of the matrix $W$, respectively:
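The identity can be checked numerically. The sketch below uses plain Python lists and the row-vector layout of the notebook code (`feat @ linear`), in which the two halves are the top and bottom blocks of rows of the weight matrix rather than the left and right columns. The helper `matvec_row` and all values are assumptions for illustration.

```python
def matvec_row(x, W):
    """Row vector x (length n) times matrix W (n rows, m columns)."""
    return [sum(x[k] * W[k][c] for k in range(len(x))) for c in range(len(W[0]))]

u = [1.0, 2.0]               # src feature
v = [3.0, 4.0]               # dst feature
W = [[1.0, 0.0],
     [0.0, 1.0],
     [2.0, 1.0],
     [1.0, 3.0]]             # shape (2 * 2, 2): node_feat_dim = 2, out_dim = 2
W_src, W_dst = W[:2], W[2:]  # rows that multiply u, rows that multiply v

# Concatenate first, then one large product.
concat_out = matvec_row(u + v, W)
# Two small products, then an addition.
split_out = [a + b for a, b in zip(matvec_row(u, W_src), matvec_row(v, W_dst))]
print(concat_out, split_out)
```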

In [18]:
# example
#import dgl.function as fn

#linear_src=nn.Parameter(torch.FloatTensor(size=(node_feat_dim,out_dim)))
#linear_dst=nn.Parameter(torch.FloatTensor(size=(node_feat_dim,out_dim)))
#out_src=g.ndata['feat']@linear_src
#out_dst=g.ndata['feat']@linear_dst
#g.srcdata.update({'out_src':out_src})
#g.dstdata.update({'out_dst':out_dst})
#g.apply_edges(fn.u_add_v('out_src','out_dst','out'))

In [19]:
import dgl.function as fn

u,v=torch.tensor([0,1,2,3]),torch.tensor([2,0,0,1])
weights1=torch.tensor([0.1,0.6,0.9,0.7]) #weight of each edge
gg=dgl.graph((u,v))
gg.edata['w']=weights1
gg.ndata['x']=torch.ones(gg.num_nodes(),3)

linear_src=nn.Parameter(torch.FloatTensor(size=(3,1)))
linear_dst=nn.Parameter(torch.FloatTensor(size=(3,1)))
out_src=gg.ndata['x']@ linear_src
out_dst=gg.ndata['x']@ linear_dst
gg.srcdata.update({'out_src':out_src})
gg.dstdata.update({'out_dst':out_dst})
gg.apply_edges(fn.u_add_v('out_src','out_dst','out'))
print(gg)
print(gg.ndata)
print(gg.edata)

Graph(num_nodes=4, num_edges=4,
      ndata_schemes={'x': Scheme(shape=(3,), dtype=torch.float32), 'out_src': Scheme(shape=(1,), dtype=torch.float32), 'out_dst': Scheme(shape=(1,), dtype=torch.float32)}
      edata_schemes={'w': Scheme(shape=(), dtype=torch.float32), 'out': Scheme(shape=(1,), dtype=torch.float32)})
{'x': tensor([[1., 1., 1.],
        [1., 1., 1.],
        [1., 1., 1.],
        [1., 1., 1.]]), 'out_src': tensor([[0.],
        [0.],
        [0.],
        [0.]], grad_fn=<MmBackward0>), 'out_dst': tensor([[5.1014e-31],
        [5.1014e-31],
        [5.1014e-31],
        [5.1014e-31]], grad_fn=<MmBackward0>)}
{'w': tensor([0.1000, 0.6000, 0.9000, 0.7000]), 'out': tensor([[5.1014e-31],
        [5.1014e-31],
        [5.1014e-31],
        [5.1014e-31]], grad_fn=<GSDDMMBackward>)}


The above two implementations are mathematically equivalent. The latter one is more efficient because it does not need to save the high-dimensional feat_src and feat_dst on the edges, which is not memory-efficient. In addition, the sum can be computed with DGL’s built-in function u_add_v(), which further speeds up computation and reduces the memory footprint.
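A rough count of the intermediate values each version stores shows where the saving comes from. This is only a back-of-the-envelope sketch: it counts the tensors named in the two snippets and ignores DGL internals and allocator behaviour. The function name and the example sizes are assumptions.

```python
def edge_values_stored(num_nodes, num_edges, feat_dim, out_dim):
    """Number of stored scalars for the concat version and for the split version."""
    # Concat version: 'cat_feat' (2 * feat_dim) and 'out' (out_dim) on every edge.
    concat = num_edges * 2 * feat_dim + num_edges * out_dim
    # Split version: 'out_src' and 'out_dst' on every node, 'out' on every edge.
    split = 2 * num_nodes * out_dim + num_edges * out_dim
    return concat, split

concat, split = edge_values_stored(num_nodes=1000, num_edges=10000, feat_dim=256, out_dim=8)
print(concat, split)
```

With these assumed sizes the concat version keeps 5,200,000 values and the split version 96,000, because the high-dimensional features never leave the nodes.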

# 2.3 Apply Message Passing On Part Of The Graph

If one only wants to update part of the nodes in the graph, the practice is to create a subgraph by providing the IDs for the nodes to include in the update, then call update_all() on the subgraph. For example:

In [20]:
#nid=[0,2,3,6,7,9]
#sg=g.subgraph(nid)
#sg.update_all(message_func,reduce_func,apply_node_func)

This is a common usage in mini-batch training. Check Chapter 6: Stochastic Training on Large Graphs for more detailed usages.
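It helps to remember what a node-induced subgraph contains: only the edges whose two endpoints are both in the selected node set, with nodes relabeled from 0 in the order given. The plain-Python sketch below imitates that behaviour; `induced_subgraph` is a hypothetical helper written for illustration, not the DGL implementation.

```python
def induced_subgraph(edges, nids):
    """Keep edges with both endpoints in nids and relabel nodes consecutively."""
    new_id = {old: new for new, old in enumerate(nids)}
    sub_edges = [(new_id[u], new_id[v]) for u, v in edges
                 if u in new_id and v in new_id]
    return sub_edges, new_id

edges = [(0, 2), (1, 0), (2, 0), (3, 1)]   # (src, dst) pairs
sub_edges, new_id = induced_subgraph(edges, [0, 2, 3])
print(sub_edges, new_id)
```

Here node 3 is selected but its only edge points to the unselected node 1, so it neither sends nor receives messages inside the subgraph.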



# 2.4 Message Passing on Heterogeneous Graph

Heterogeneous graphs (1.5 Heterogeneous Graphs), or heterographs for short, are graphs that contain different types of nodes and edges. The different types of nodes and edges tend to have different types of attributes that are designed to capture the characteristics of each node and edge type. Within the context of graph neural networks, depending on their complexity, certain node and edge types might need to be modeled with representations that have a __different number of dimensions.__

The message passing on heterographs can be split into two parts:

*  Message computation and aggregation for each relation r.

*  Reduction that merges the aggregation results from all relations for each node type.
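The two stages listed above can be sketched in plain Python with scalar messages. The relation names, the data and the helper `hetero_step` are hypothetical. The per-relation reducer is a mean and the cross-relation reducer is a sum, chosen only for illustration.

```python
def mean(values):
    return sum(values) / len(values)

# relation name -> list of (message value, dst node id); all dst nodes share one node type
relations = {
    "rel_a": [(1.0, 0), (3.0, 0), (5.0, 1)],
    "rel_b": [(10.0, 0)],
}

def hetero_step(relations, per_relation_reduce, cross_reduce):
    # Stage 1: aggregate the messages separately within each relation.
    per_relation = {}
    for name, msgs in relations.items():
        inbox = {}
        for value, dst in msgs:
            inbox.setdefault(dst, []).append(value)
        per_relation[name] = {dst: per_relation_reduce(vals) for dst, vals in inbox.items()}
    # Stage 2: merge the per-relation results for each destination node.
    merged = {}
    for result in per_relation.values():
        for dst, value in result.items():
            merged.setdefault(dst, []).append(value)
    return {dst: cross_reduce(vals) for dst, vals in merged.items()}

h = hetero_step(relations, mean, sum)
print(h)
```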

DGL’s interface to call message passing on heterographs is multi_update_all(). multi_update_all() takes a dictionary containing the parameters for update_all() within each relation using relation as the key, and a string representing the cross type reducer. The reducer can be one of sum, min, max, mean, stack. Here’s an example:


(I don't fully understand this part yet.)

In [24]:
import dgl
import dgl.function as fn
import torch as th

graph_data={
    ('drug','interacts','drug'):(th.tensor([0,1]),th.tensor([1,2])), # 0-1,1-2
    ('drug','interacts','gene'):(th.tensor([0,1]),th.tensor([2,3])), # 0-2,1-3
    ('drug','treats','disease'):(th.tensor([1]),th.tensor([2])) # 1-2
}
G=dgl.heterograph(graph_data)

# Assumed inputs (the original snippet took them from a module's forward()):
#   weight:    dict mapping each edge type name to a callable, e.g. nn.Linear
#   feat_dict: dict mapping each node type to its input feature tensor
def hetero_message_passing(G,weight,feat_dict):
  funcs={}
  for c_etype in G.canonical_etypes:
    srctype,etype,dsttype=c_etype
    wh=weight[etype](feat_dict[srctype])
    # save it in graph for message passing
    G.nodes[srctype].data['wh_%s'% etype]=wh
    # specify per-relation message passing functions: (message_func, reduce_func).
    # note that the results are saved to the same destination feature 'h',
    # which hints the type wise reducer for aggregation.
    # key by the canonical edge type, since 'interacts' alone is ambiguous in this graph
    funcs[c_etype]=(fn.copy_u('wh_%s' % etype,'m'),fn.mean('m','h'))
  # Trigger message passing of multiple types
  G.multi_update_all(funcs,'sum')
  # return the updated node feature dictionary
  return {ntype:G.nodes[ntype].data['h'] for ntype in G.ntypes}