In [None]:
import dgl
from dgl.dataloading import NeighborSampler
from dgl.distributed import DistGraph, DistDataLoader, node_split
import torch as th

# Initialize the DGL distributed context first, then the PyTorch process
# group (gloo backend) used by DistributedDataParallel below.
# NOTE(review): 'ip_config.txt' is presumably the cluster's machine list — confirm.
dgl.distributed.initialize('ip_config.txt')
th.distributed.init_process_group(backend='gloo')
# Load the distributed graph by name, using the partition config file.
g = DistGraph('graph_name', 'part_config.json')
pb = g.get_partition_book()
# Get this trainer's share of the training workload, i.e., training node IDs
# selected by the 'train_mask' node data and split via the partition book.
train_nid = node_split(g.ndata['train_mask'], pb, force_even=True)


# Create sampler
# NOTE(review): this call passes (graph, fanouts, sampling function, device) and
# the loader below uses `sampler.sample_blocks`; verify that this matches the
# `NeighborSampler` imported from dgl.dataloading. `device` is not defined in
# the visible cells — TODO confirm where it comes from.
sampler = NeighborSampler(g, [10,25],
                          dgl.distributed.sample_neighbors,
                          device)

# Mini-batch loader over the local training node IDs; each batch of seed IDs is
# turned into blocks by `sampler.sample_blocks`.
# NOTE(review): `batch_size` is not defined in the visible cells.
dataloader = DistDataLoader(
    dataset=train_nid.numpy(),
    batch_size=batch_size,
    collate_fn=sampler.sample_blocks,
    shuffle=True,
    drop_last=False)

# Define model and optimizer
# NOTE(review): SAGE, F, nn, optim, args and the model hyper-parameters are not
# defined or imported in the visible cells.
model = SAGE(in_feats, num_hidden, n_classes, num_layers, F.relu, dropout)
# Wrap the model for distributed training.
model = th.nn.parallel.DistributedDataParallel(model)
loss_fcn = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=args.lr)

# training loop
for epoch in range(args.num_epochs):
    # NOTE(review): the join() context presumably lets trainers with different
    # numbers of batches finish the epoch without hanging — confirm.
    with model.join():
        for step, blocks in enumerate(dataloader):
            # Fetch input features for the source nodes of the first block and
            # labels for the destination nodes of the last block.
            batch_inputs, batch_labels = load_subtensor(g, blocks[0].srcdata[dgl.NID],
                                                        blocks[-1].dstdata[dgl.NID])
            batch_pred = model(blocks, batch_inputs)
            loss = loss_fcn(batch_pred, batch_labels)
            # Clear old gradients, back-propagate, then update the parameters.
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

### Data Preprocessing
#### Partitioning API

In [None]:
import dgl

# Placeholder: build or load the DGLGraph object that should be partitioned.
g = ...
# Split the graph into 2 partitions named 'mygraph', written under 'data_root_dir'.
dgl.distributed.partition_graph(
    g,
    graph_name='mygraph',
    num_parts=2,
    out_path='data_root_dir',
)

##### Load balancing

In [None]:
# Partition into 4 parts, passing the training mask as `balance_ntypes` so the
# partitioner can balance on it.
dgl.distributed.partition_graph(
    g,
    graph_name='graph_name',
    num_parts=4,
    out_path='/tmp/test',
    balance_ntypes=g.ndata['train_mask'],
)

##### ID Mapping

In [None]:
# Partition as before, but also ask for the node/edge ID mappings.
node_map, edge_map = dgl.distributed.partition_graph(
    g,
    graph_name='graph_name',
    num_parts=4,
    out_path='/tmp/test',
    balance_ntypes=g.ndata['train_mask'],
    return_mapping=True,
)
# Assume `node_emb` holds the embeddings saved from the distributed training.
# Row i of `node_emb` is written to row `node_map[i]` of a zero-initialised
# tensor with the same shape and dtype.
orig_node_emb = th.zeros(node_emb.shape, dtype=node_emb.dtype)
orig_node_emb[node_map] = node_emb

##### Load Partitioned Graphs

In [None]:
import dgl
# load partition 0
# Read partition 0 as described by the partition config JSON.
part_data = dgl.distributed.load_partition(
    part_config='data_root_dir/graph_name.json',
    part_id=0,
)
# The returned tuple is unpacked into its seven fields.
(g, nfeat, efeat, partition_book,
 graph_name, ntypes, etypes) = part_data
print(g)

### Programming APIs
#### DistGraph creation

In [None]:
import dgl
# Create a DistGraph handle referring to the graph by its name only.
g = dgl.distributed.DistGraph(graph_name='graph_name')

In [None]:
import dgl
# Create a DistGraph handle, additionally passing the partition config file.
g = dgl.distributed.DistGraph(
    graph_name='graph_name',
    part_config='data/graph_name.json',
)

#### Accessing Graph structure

In [None]:
# Query the graph structure through the DistGraph handle: print its node count.
print(g.num_nodes())

#### Accessing node/edge data

In [None]:
# Looking up node data by name yields a distributed tensor object (see the
# recorded repr on the right); indexing it yields a regular tensor holding the
# requested rows.
g.ndata['train_mask']  # <dgl.distributed.dist_graph.DistTensor at 0x7fec820937b8>
g.ndata['train_mask'][0]  # tensor([1], dtype=torch.uint8)

#### Distributed Tensor

In [None]:
# Create a float32 distributed tensor named 'test' with one 10-wide row per node.
tensor = dgl.distributed.DistTensor(
    shape=(g.num_nodes(), 10),
    dtype=th.float32,
    name='test',
)

In [None]:
# Register the distributed tensor as node data under the name 'feat'.
g.ndata['feat'] = tensor

In [None]:
# Read the rows for nodes 1, 2 and 3 from the 'feat' node data.
data = g.ndata['feat'][[1, 2, 3]]
print(data)
# Write those rows back into the slots of nodes 3, 4 and 5.
g.ndata['feat'][[3, 4, 5]] = data

#### Distributed DistEmbedding

In [None]:
def initializer(shape, dtype):
    """Return a new tensor of the given shape and dtype with uniform random values.

    The tensor is allocated as zeros and then overwritten in place with
    samples drawn uniformly between -1 and 1.
    """
    return th.zeros(shape, dtype=dtype).uniform_(-1, 1)
# One 10-dimensional embedding row per node, initialised by `initializer`.
emb = dgl.distributed.DistEmbedding(
    num_embeddings=g.num_nodes(),
    embedding_dim=10,
    init_func=initializer,
)

In [None]:
# Two optimizers are used: a sparse one for the distributed embedding and a
# dense Adam optimizer for the model parameters.
# NOTE(review): lr1, lr2, model and nids are not defined in the visible cells.
sparse_optimizer = dgl.distributed.SparseAdagrad([emb], lr=lr1)
optimizer = th.optim.Adam(model.parameters(), lr=lr2)
# Look up the embeddings of the mini-batch nodes and feed them to the model,
# whose output is used directly as the loss here.
feats = emb(nids)
loss = model(feats)
loss.backward()
# Apply the dense update, then the sparse embedding update.
# NOTE(review): no zero_grad() call is visible for either optimizer in this
# snippet — confirm gradients are cleared when this runs inside a loop.
optimizer.step()
sparse_optimizer.step()

#### Distributed Sampling

In [None]:
def sample_blocks(seeds):
    """Sample a list of blocks (one per layer) for a mini-batch of seed nodes.

    Starting from ``seeds``, neighbors are sampled with replacement for each
    fanout in ``[10, 25]``. After every hop the source nodes of the new block
    become the seeds of the next hop. Each new block is inserted at the front
    of the list, so the block sampled last comes first.

    Parameters
    ----------
    seeds : sequence of int
        Node IDs of the mini-batch; converted to a ``LongTensor``.

    Returns
    -------
    list
        One block per fanout.
    """
    seeds = th.LongTensor(np.asarray(seeds))
    blocks = []
    for fanout in [10, 25]:
        frontier = dgl.distributed.sample_neighbors(g, seeds, fanout, replace=True)
        block = dgl.to_block(frontier, seeds)
        # The source nodes of this block are the seeds for the next hop.
        seeds = block.srcdata[dgl.NID]
        blocks.insert(0, block)
    # Return only after every fanout has been processed.
    return blocks


# Build the loader at cell level (not inside the function) so it is actually
# created; `sample_blocks` turns each batch of seed IDs into blocks.
dataloader = dgl.distributed.DistDataLoader(dataset=train_nid,
                                            batch_size=batch_size,
                                            collate_fn=sample_blocks,
                                            shuffle=True)
for batch in dataloader:
    ...

In [None]:
# High-level alternative to a hand-written collate function.
# Both classes are taken from `dgl.dataloading`; `dgl.sampling` does not
# provide them.
sampler = dgl.dataloading.MultiLayerNeighborSampler([10, 25])
dataloader = dgl.dataloading.DistNodeDataLoader(g, train_nid, sampler,
                                                batch_size=batch_size, shuffle=True)
for batch in dataloader:
    ...

#### Split Workloads

In [None]:
# Split the nodes selected by the 'train_mask' node data into per-trainer
# workloads and keep this trainer's share.
# NOTE(review): other cells in this notebook use the name `train_nid`
# (singular) — confirm which name downstream code expects.
train_nids = dgl.distributed.node_split(g.ndata['train_mask'])

### Advanced Graph Partitioning
#### METIS partition algorithm

In [None]:
import dgl
# load partition 0
# Read partition 0 as described by the partition config JSON.
part_data = dgl.distributed.load_partition(
    part_config='data_root_dir/graph_name.json',
    part_id=0,
)
# The returned tuple is unpacked into its seven fields.
(g, nfeat, efeat, partition_book,
 graph_name, ntypes, etypes) = part_data
print(g)

### Heterogeneous Graph Under the Hood
#### ID Conversion Utilities
#### During Preprocessing

In [None]:
from bisect import bisect_left
import numpy as np

class IDConverter:
    """Convert between type-wise (heterogeneous) and homogeneous node/edge IDs.

    Types are laid out in the order they appear in ``meta['node_type']`` and
    ``meta['edge_type']``. Type ``t`` owns the contiguous homogeneous ID range
    ``[offset[t], offset[t + 1])``, where ``offset`` is the running sum of the
    per-type counts.
    """

    def __init__(self, meta):
        """Build the name/ID tables and per-type offsets.

        Parameters
        ----------
        meta : dict
            The JSON object loaded from metadata.json. The keys read here are
            'node_type', 'edge_type', 'num_nodes_per_chunk' and
            'num_edges_per_chunk'.
        """
        self.node_type = meta['node_type']
        self.edge_type = meta['edge_type']
        self.ntype2id_map = {ntype : i for i, ntype in enumerate(self.node_type)}
        self.etype2id_map = {etype : i for i, etype in enumerate(self.edge_type)}
        # Total count per type = sum over that type's chunks.
        self.num_nodes = [sum(ns) for ns in meta['num_nodes_per_chunk']]
        self.num_edges = [sum(ns) for ns in meta['num_edges_per_chunk']]
        # offset[t] is the first homogeneous ID of type t; offset[-1] is the total.
        self.nid_offset = np.cumsum([0] + self.num_nodes)
        self.eid_offset = np.cumsum([0] + self.num_edges)

    @staticmethod
    def _owning_type(offsets, id):
        """Return the type index t such that offsets[t] <= id < offsets[t + 1]."""
        # side='right' makes an ID equal to a type's starting offset resolve to
        # that type rather than the preceding one, and skips over empty types.
        return int(np.searchsorted(offsets, id, side='right')) - 1

    def ntype2id(self, ntype):
        """From node type name to node type ID"""
        return self.ntype2id_map[ntype]

    def etype2id(self, etype):
        """From edge type name to edge type ID"""
        return self.etype2id_map[etype]

    def id2ntype(self, id):
        """From node type ID to node type name"""
        return self.node_type[id]

    def id2etype(self, id):
        """From edge type ID to edge type name"""
        return self.edge_type[id]

    def nid_het2hom(self, ntype, id):
        """From heterogeneous node ID to homogeneous node ID

        Raises ValueError if ``id`` is outside the range of type ``ntype``.
        """
        tid = self.ntype2id(ntype)
        if id < 0 or id >= self.num_nodes[tid]:
            raise ValueError(f'Invalid node ID of type {ntype}. Must be within range [0, {self.num_nodes[tid]})')
        return self.nid_offset[tid] + id

    def nid_hom2het(self, id):
        """From homogeneous node ID to heterogeneous node ID

        Returns a pair ``(node_type, type_wise_id)``. Raises ValueError if
        ``id`` is outside ``[0, total number of nodes)``.
        """
        if id < 0 or id >= self.nid_offset[-1]:
            raise ValueError(f'Invalid homogeneous node ID. Must be within range [0, {self.nid_offset[-1]})')
        tid = self._owning_type(self.nid_offset, id)
        # Return a pair (node_type, type_wise_id)
        return self.id2ntype(tid), id - self.nid_offset[tid]

    def eid_het2hom(self, etype, id):
        """From heterogeneous edge ID to homogeneous edge ID

        Raises ValueError if ``id`` is outside the range of type ``etype``.
        """
        tid = self.etype2id(etype)
        if id < 0 or id >= self.num_edges[tid]:
            raise ValueError(f'Invalid edge ID of type {etype}. Must be within range [0, {self.num_edges[tid]})')
        return self.eid_offset[tid] + id

    def eid_hom2het(self, id):
        """From homogeneous edge ID to heterogeneous edge ID

        Returns a pair ``(edge_type, type_wise_id)``. Raises ValueError if
        ``id`` is outside ``[0, total number of edges)``.
        """
        if id < 0 or id >= self.eid_offset[-1]:
            raise ValueError(f'Invalid homogeneous edge ID. Must be within range [0, {self.eid_offset[-1]})')
        tid = self._owning_type(self.eid_offset, id)
        # Return a pair (edge_type, type_wise_id)
        return self.id2etype(tid), id - self.eid_offset[tid]

#### After Partition Loading

In [None]:
# NOTE(review): `seeds` and `fanout` are not defined in the visible cells.
gpb = g.get_partition_book()
# We need to map the type-wise node IDs to homogeneous IDs.
# Here the seeds are treated as nodes of type 'paper'.
cur = gpb.map_to_homo_nid(seeds, 'paper')
# For a heterogeneous input graph, the returned frontier is stored in
# the homogeneous graph format.
frontier = dgl.distributed.sample_neighbors(g, cur, fanout, replace=False)
block = dgl.to_block(frontier, cur)
# The block's source nodes become the seeds for a following hop.
cur = block.srcdata[dgl.NID]

# Copy the frontier's edge IDs onto the block before converting them.
block.edata[dgl.EID] = frontier.edata[dgl.EID]
# Map the homogeneous edge IDs to their edge types and per-type IDs.
block.edata[dgl.ETYPE], block.edata[dgl.EID] = gpb.map_to_per_etype(block.edata[dgl.EID])
# Map the homogeneous node IDs to their node types and per-type IDs.
# This overwrites the homogeneous IDs stored in srcdata/dstdata.
block.srcdata[dgl.NTYPE], block.srcdata[dgl.NID] = gpb.map_to_per_ntype(block.srcdata[dgl.NID])
block.dstdata[dgl.NTYPE], block.dstdata[dgl.NID] = gpb.map_to_per_ntype(block.dstdata[dgl.NID])

#### Access distributed graph data

In [None]:
import dgl
# Create a DistGraph handle, passing the partition config file.
g = dgl.distributed.DistGraph(
    graph_name='graph_name',
    part_config='data/graph_name.json',
)
# Read the 'feat' rows of nodes 0, 10 and 20 of node type 'T0'.
feat = g.nodes['T0'].data['feat'][[0, 10, 20]]

In [None]:
# Attach a new float32 column 'feat1' (one value per 'T0' node) to node type
# 'T0', using that node type's partition policy.
g.nodes['T0'].data['feat1'] = dgl.distributed.DistTensor(
    shape=(g.num_nodes('T0'), 1),
    dtype=th.float32,
    name='feat1',
    part_policy=g.get_node_partition_policy('T0'),
)