In [None]:
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.


# Node Classification with GAT

In [1]:
from deepgnn.graph_engine.data.cora import CoraFull
# Instantiate the Cora dataset helper with /tmp/cora/ as its path. The captured log
# output below shows /tmp/cora/graph.json being processed and a graph with 2708 nodes
# being loaded. As the last expression of the cell, the object's repr is the cell result.
CoraFull("/tmp/cora/")

[2022-02-15 09:55:10,457] {convert.py:200} INFO - worker 0 try to generate partition: 0 - 1
[2022-02-15 09:55:10,463] {_adl_reader.py:123} INFO - [1,0] Input files: ['/tmp/cora/graph.json']
[2022-02-15 09:55:10,921] {dispatcher.py:126} INFO - record processed: 1000
[2022-02-15 09:55:11,258] {dispatcher.py:126} INFO - record processed: 2000
  weights = np.multiply(weights, len(weights) / np.sum(weights))
[2022-02-15 09:55:11,578] {local.py:30} INFO - Graph data path: /tmp/cora/. Partitions [0]
[2022-02-15 09:55:11,588] {local.py:35} INFO - Loaded snark graph. Node counts: [2708, 0]. Edge counts: [0, 0]


<deepgnn.graph_engine.data.cora.CoraFull at 0x7fb872332cf8>

In [2]:
from typing import List
from dataclasses import dataclass
import argparse
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F

from deepgnn.pytorch.common import Accuracy
from deepgnn.pytorch.modeling.base_model import BaseModel
from deepgnn.pytorch.nn.gat_conv import GATConv

from deepgnn.graph_engine import Graph, FeatureType, graph_ops

from deepgnn import str2list_int
from deepgnn.pytorch.common.utils import set_seed
from deepgnn.pytorch.common.dataset import TorchDeepGNNDataset
from deepgnn.pytorch.modeling import BaseModel
from deepgnn.pytorch.training import run_dist
from deepgnn.graph_engine import FileNodeSampler, GraphEngineBackend

In [3]:
@dataclass
class GATQueryParameter:
    """Settings read by ``GATQuery`` when it samples a sub-graph and fetches node data."""

    # Edge types handed to graph_ops.sub_graph as ``edge_types``.
    neighbor_edge_types: np.ndarray
    # Feature index and width; GATQuery packs them into a [[idx, dim]] int32 metadata row.
    feature_idx: int
    feature_dim: int
    # Label index and width; packed the same way for the label lookup.
    label_idx: int
    label_dim: int
    # Types passed to graph.node_features for the feature and label lookups respectively.
    feature_type: FeatureType = FeatureType.FLOAT
    label_type: FeatureType = FeatureType.FLOAT
    # Number of hops passed to graph_ops.sub_graph.
    num_hops: int = 2

class GATQuery:
    """Graph query that expands seed nodes into the 7-tuple unpacked by ``GAT.forward``."""

    def __init__(self, p: GATQueryParameter):
        """Keep the query settings and pre-build the feature/label metadata arrays.

        Args:
            p: query settings (edge types, feature/label index and width, hop count).
        """
        self.p = p
        # Each metadata array is a single [index, dimension] row of int32.
        self.label_meta = np.array([[p.label_idx, p.label_dim]], np.int32)
        self.feat_meta = np.array([[p.feature_idx, p.feature_dim]], np.int32)

    def query_training(self, graph: Graph, inputs):
        """Sample the sub-graph around ``inputs`` and gather features and labels for it.

        Args:
            graph: graph object providing ``node_features``; also passed to
                ``graph_ops.sub_graph``.
            inputs: seed nodes of the batch, forwarded unchanged to ``graph_ops.sub_graph``.

        Returns:
            Tuple ``(nodes, feat, input_mask, label, edges, edges_value, adj_shape)``:
            ``input_mask`` is a boolean array over ``nodes`` that is True at the
            positions listed in ``src_idx``; ``label`` is cast to int32; ``edges`` is
            the transposed edge array; ``edges_value`` holds one float32 ``1.0`` per
            edge; ``adj_shape`` is ``[len(nodes), len(nodes)]`` as int64.
        """
        nodes, edges, src_idx = graph_ops.sub_graph(
            graph,
            inputs,
            edge_types=self.p.neighbor_edge_types,
            num_hops=self.p.num_hops,
            self_loop=True,
            undirected=True,
            return_edges=True,
        )
        # Builtin ``bool`` replaces the ``np.bool`` alias, which was deprecated in
        # NumPy 1.20 and removed in 1.24 (AttributeError on current releases).
        input_mask = np.zeros(nodes.size, dtype=bool)
        input_mask[src_idx] = True

        feat = graph.node_features(nodes, self.feat_meta, self.p.feature_type)
        label = graph.node_features(nodes, self.label_meta, self.p.label_type)
        label = label.astype(np.int32)
        # One unit weight per edge; the count is taken before the transpose below.
        edges_value = np.ones(edges.shape[0], np.float32)
        edges = np.transpose(edges)
        adj_shape = np.array([nodes.size, nodes.size], np.int64)

        graph_tensor = (nodes, feat, input_mask, label, edges, edges_value, adj_shape)
        return graph_tensor

In [4]:
class GAT(BaseModel):
    """Two-layer graph attention network for node classification.

    The first ``GATConv`` layer concatenates its attention heads and applies ELU;
    the second averages its heads and produces one score per class.
    """

    def __init__(
        self,
        in_dim: int,
        head_num: List = None,
        hidden_dim: int = 8,
        num_classes: int = -1,
        ffd_drop: float = 0.0,
        attn_drop: float = 0.0,
        q_param: GATQueryParameter = None,
    ):
        """Build the query helper and the two attention layers.

        Args:
            in_dim: width of the input node features.
            head_num: two integers, the attention head counts of the first and
                second layer. Defaults to ``[8, 1]``.
            hidden_dim: per-head output width of the first layer.
            num_classes: number of output classes (output width of the second layer).
            ffd_drop: passed to both layers as ``in_drop``.
            attn_drop: passed to both layers as ``coef_drop``.
            q_param: settings for the ``GATQuery`` stored on ``self.q``.
        """
        # A None sentinel avoids sharing one mutable default list between instances.
        if head_num is None:
            head_num = [8, 1]
        # Validate before head_num is indexed, so a malformed value fails here rather
        # than after the first layer has already been constructed.
        assert len(head_num) == 2

        self.q = GATQuery(q_param)
        # Arguments are forwarded to BaseModel unchanged; their meaning is defined there.
        super().__init__(FeatureType.FLOAT, 0, 0, None)
        self.num_classes = num_classes

        self.out_dim = num_classes

        self.input_layer = GATConv(
            in_dim=in_dim,
            attn_heads=head_num[0],
            out_dim=hidden_dim,
            act=F.elu,
            in_drop=ffd_drop,
            coef_drop=attn_drop,
            attn_aggregate="concat",
        )
        # Heads of the first layer are concatenated, so its output width is heads * hidden_dim.
        layer0_output_dim = head_num[0] * hidden_dim
        self.out_layer = GATConv(
            in_dim=layer0_output_dim,
            attn_heads=head_num[1],
            out_dim=self.out_dim,
            act=None,
            in_drop=ffd_drop,
            coef_drop=attn_drop,
            attn_aggregate="average",
        )

        self.metric = Accuracy()

    def forward(self, inputs):
        """Score the sub-graph and compute the loss on the masked (seed) nodes.

        Args:
            inputs: the 7-tuple ``(nodes, feat, mask, labels, edges, edges_value,
                adj_shape)`` in the order produced by ``GATQuery.query_training``.

        Returns:
            ``(loss, pred, labels)`` where ``pred`` is the argmax class of each masked
            node and ``labels`` are the int64 labels of those same nodes.
        """
        # fmt: off
        nodes, feat, mask, labels, edges, edges_value, adj_shape = inputs
        nodes = torch.squeeze(nodes)                # [N], N: num of nodes in subgraph
        feat = torch.squeeze(feat)                  # [N, F]
        mask = torch.squeeze(mask)                  # [N]
        labels = torch.squeeze(labels)              # [N]
        edges = torch.squeeze(edges)                # [2, X], X: num of edges (transposed by the query)
        edges_value = torch.squeeze(edges_value)    # [X]
        adj_shape = torch.squeeze(adj_shape)        # [2]
        # fmt: on

        # Sparse adjacency: ``edges`` supplies the indices, ``edges_value`` the entries.
        sp_adj = torch.sparse_coo_tensor(edges, edges_value, adj_shape.tolist())
        h_1 = self.input_layer(feat, sp_adj)
        scores = self.out_layer(h_1, sp_adj)

        labels = labels.type(torch.int64)
        labels = labels[mask]  # [batch_size]
        scores = scores[mask]  # [batch_size]
        pred = scores.argmax(dim=1)
        # self.xent is not defined in this class; presumably inherited from BaseModel.
        loss = self.xent(scores, labels)
        return loss, pred, labels

In [5]:
def create_model(args: argparse.Namespace):
    """Construct the GAT model (and its graph query settings) from parsed arguments.

    Args:
        args: parsed command line options; reads ``seed``, ``neighbor_edge_types``,
            ``feature_idx``, ``feature_dim``, ``label_idx``, ``label_dim``,
            ``head_num``, ``hidden_dim``, ``num_classes``, ``ffd_drop``, ``attn_drop``.

    Returns:
        A new ``GAT`` instance.
    """
    # NOTE(review): the truthiness test means a seed of 0 does not call set_seed -
    # confirm that is intended.
    if args.seed:
        set_seed(args.seed)

    # NOTE(review): the option value is wrapped in one more list before conversion, so a
    # list-valued option yields a 2-D array - confirm graph_ops.sub_graph expects that.
    edge_types = np.array([args.neighbor_edge_types], np.int32)
    query_settings = GATQueryParameter(
        neighbor_edge_types=edge_types,
        feature_idx=args.feature_idx,
        feature_dim=args.feature_dim,
        label_idx=args.label_idx,
        label_dim=args.label_dim,
    )

    model = GAT(
        in_dim=args.feature_dim,
        head_num=args.head_num,
        hidden_dim=args.hidden_dim,
        num_classes=args.num_classes,
        ffd_drop=args.ffd_drop,
        attn_drop=args.attn_drop,
        q_param=query_settings,
    )
    return model

def create_optimizer(args: argparse.Namespace, model: BaseModel, world_size: int):
    """Create the Adam optimizer over the model's trainable parameters.

    Args:
        args: parsed options; reads ``learning_rate`` and, when present, ``l2_coef``.
        model: model whose parameters with ``requires_grad`` set are optimized.
        world_size: multiplier applied to the learning rate.

    Returns:
        A ``torch.optim.Adam`` instance.
    """
    # Honor the --l2_coef option instead of a hard-coded constant; the fallback keeps
    # the previous value (0.0005) for argument namespaces that do not define it.
    weight_decay = getattr(args, "l2_coef", 0.0005)
    return torch.optim.Adam(
        filter(lambda p: p.requires_grad, model.parameters()),
        lr=args.learning_rate * world_size,
        weight_decay=weight_decay,
    )

def create_dataset(
    args: argparse.Namespace,
    model: BaseModel,
    rank: int = 0,
    world_size: int = 1,
    backend: GraphEngineBackend = None,
):
    """Create the dataset that feeds ``model`` with sampled sub-graphs.

    Args:
        args: parsed options; reads ``sample_file`` and ``batch_size``.
        model: model whose ``q.query_training`` is used as the query function.
        rank: passed to the dataset as ``worker_index``.
        world_size: passed to the dataset as ``num_workers``.
        backend: graph engine backend handed to the dataset.

    Returns:
        A ``TorchDeepGNNDataset`` using ``FileNodeSampler``.
    """
    dataset_options = dict(
        sampler_class=FileNodeSampler,
        backend=backend,
        query_fn=model.q.query_training,
        prefetch_queue_size=2,
        prefetch_worker_size=2,
        sample_files=args.sample_file,
        batch_size=args.batch_size,
        shuffle=True,
        drop_last=True,
        worker_index=rank,
        num_workers=world_size,
    )
    return TorchDeepGNNDataset(**dataset_options)

In [6]:
def init_args(parser):
    """Register the GAT-specific command line options on ``parser``.

    Args:
        parser: argument parser exposing ``add_argument``.
    """
    # (flag, add_argument keyword arguments), registered in this order.
    option_table = (
        ("--head_num", dict(type=str2list_int, default="8,1", help="the number of attention headers.")),
        ("--hidden_dim", dict(type=int, default=8, help="hidden layer dimension.")),
        ("--num_classes", dict(type=int, default=-1, help="number of classes for category")),
        ("--ffd_drop", dict(type=float, default=0.0, help="feature dropout rate.")),
        ("--attn_drop", dict(type=float, default=0.0, help="attention layer dropout rate.")),
        ("--l2_coef", dict(type=float, default=0.0005, help="l2 loss")),
        ("--neighbor_edge_types", dict(type=str2list_int, default="0", help="Graph Edge for attention encoder.")),
        ("--eval_file", dict(default="", type=str, help="")),
    )
    for flag, options in option_table:
        parser.add_argument(flag, **options)

In [7]:
# Not needed for .py file runs
# Remember the unwrapped init_args only the first time this cell runs, so that
# re-running it after init_args has been rebound does not overwrite the saved original.
if "init_args_base" not in globals():
    init_args_base = init_args

In [8]:
# Not needed for .py file runs
# Output directory with a random numeric suffix, so every execution of this cell picks
# a new directory name. It is used for --model_dir, --metric_dir and --save_path below.
# NOTE(review): "tmp/..." has no leading slash, so it is relative to the working
# directory, unlike the "/tmp/cora" data path - confirm that is intended.
MODEL_DIR = f"tmp/gat_{np.random.randint(9999999)}"
# Command line for the training run, as a flat list of flags and values.
arg_list = [
    "--data_dir", "/tmp/cora",
    "--mode", "train",
    "--trainer", "base",
    "--backend", "snark",
    "--graph_type", "local",
    "--converter", "skip",
    "--sample_file", "/tmp/cora/train.nodes",
    "--node_type", "0",
    "--feature_idx", "0",
    "--feature_dim", "1433",
    "--label_idx", "1",
    "--label_dim", "1",
    "--num_classes", "7",
    "--batch_size", "140",
    "--learning_rate", ".005",
    "--num_epochs", "50",
    "--log_by_steps", "1",
    "--use_per_step_metrics",
    "--model_dir", MODEL_DIR,
    "--metric_dir", MODEL_DIR,
    "--save_path", MODEL_DIR,
]

def init_args_wrap(init_args_base):
    """Wrap an argument initializer so the parser reads the notebook's ``arg_list``.

    Args:
        init_args_base: callable taking a parser and registering options on it.

    Returns:
        A callable with the same ``(parser)`` signature. After registering the options
        it replaces ``parser.parse_args`` so that a call without explicit ``args``
        parses the module-level ``arg_list``.
    """
    def init_args_new(parser):
        init_args_base(parser)
        original_parse_args = parser.parse_args

        def parse_notebook_args(args=None, namespace=None):
            # Keep argparse's (args, namespace) signature so callers that pass either
            # still work. arg_list is looked up at call time, so rebinding it in a
            # later cell takes effect without re-wrapping.
            return original_parse_args(arg_list if args is None else args, namespace)

        parser.parse_args = parse_notebook_args

    return init_args_new

# Rebind init_args to the wrapped initializer, so a no-argument parser.parse_args()
# call on a parser it has set up parses arg_list.
init_args = init_args_wrap(init_args_base)

In [9]:
# Start the run described by arg_list ("--mode", "train"), handing run_dist the model,
# dataset, optimizer and argument callbacks defined in the earlier cells.
run_dist(
    init_model_fn=create_model,
    init_dataset_fn=create_dataset,
    init_optimizer_fn=create_optimizer,
    init_args_fn=init_args,
)

[2022-02-15 09:55:14,284] {factory.py:31} INFO - GE_OMP_NUM_THREADS=1
[2022-02-15 09:55:14,285] {factory.py:31} INFO - apex_opt_level=O2
[2022-02-15 09:55:14,286] {factory.py:31} INFO - attn_drop=0.0
[2022-02-15 09:55:14,292] {factory.py:31} INFO - backend=snark
[2022-02-15 09:55:14,295] {factory.py:31} INFO - batch_size=140
[2022-02-15 09:55:14,297] {factory.py:31} INFO - client_rank=None
[2022-02-15 09:55:14,298] {factory.py:31} INFO - clip_grad=False
[2022-02-15 09:55:14,299] {factory.py:31} INFO - converter=skip
[2022-02-15 09:55:14,305] {factory.py:31} INFO - data_dir=/tmp/cora
[2022-02-15 09:55:14,308] {factory.py:31} INFO - data_parallel_num=2
[2022-02-15 09:55:14,308] {factory.py:31} INFO - dim=256
[2022-02-15 09:55:14,309] {factory.py:31} INFO - disable_ib=False
[2022-02-15 09:55:14,311] {factory.py:31} INFO - enable_adl_uploader=False
[2022-02-15 09:55:14,314] {factory.py:31} INFO - enable_ssl=False
[2022-02-15 09:55:14,315] {factory.py:31} INFO - eval_during_train_by_steps=0

In [10]:
# Not needed for .py file runs
# Command line for the evaluation run. Compared with the training list, only --mode
# ("evaluate") and --learning_rate (".0") differ. MODEL_DIR comes from the training
# cell, so that cell must have been executed first.
# NOTE(review): --sample_file still points at train.nodes - confirm that evaluating on
# the training nodes is intended.
arg_list = [
    "--data_dir", "/tmp/cora",
    "--mode", "evaluate",
    "--trainer", "base",
    "--backend", "snark",
    "--graph_type", "local",
    "--converter", "skip",
    "--sample_file", "/tmp/cora/train.nodes",
    "--node_type", "0",
    "--feature_idx", "0",
    "--feature_dim", "1433",
    "--label_idx", "1",
    "--label_dim", "1",
    "--num_classes", "7",
    "--batch_size", "140",
    "--learning_rate", ".0",
    "--num_epochs", "50",
    "--log_by_steps", "1",
    "--use_per_step_metrics",
    "--model_dir", MODEL_DIR,
    "--metric_dir", MODEL_DIR,
    "--save_path", MODEL_DIR,
]

def init_args_wrap(init_args_base):
    """Wrap an argument initializer so the parser reads the notebook's ``arg_list``.

    This repeats the definition from the training configuration cell.

    Args:
        init_args_base: callable taking a parser and registering options on it.

    Returns:
        A callable with the same ``(parser)`` signature. After registering the options
        it replaces ``parser.parse_args`` so that a call without explicit ``args``
        parses the module-level ``arg_list``.
    """
    def init_args_new(parser):
        init_args_base(parser)
        original_parse_args = parser.parse_args

        def parse_notebook_args(args=None, namespace=None):
            # Keep argparse's (args, namespace) signature so callers that pass either
            # still work. arg_list is looked up at call time, so the list assigned in
            # this cell is the one that gets parsed.
            return original_parse_args(arg_list if args is None else args, namespace)

        parser.parse_args = parse_notebook_args

    return init_args_new

# Wrap the saved original initializer again (not the already wrapped init_args), so the
# parser's parse_args is replaced exactly once per parser.
init_args = init_args_wrap(init_args_base)

In [11]:
# Same call as the training cell; arg_list now carries "--mode", "evaluate" and the same
# MODEL_DIR for --model_dir / --metric_dir / --save_path.
run_dist(
    init_model_fn=create_model,
    init_dataset_fn=create_dataset,
    init_optimizer_fn=create_optimizer,
    init_args_fn=init_args,
)

[2022-02-15 09:56:11,417] {factory.py:31} INFO - GE_OMP_NUM_THREADS=1
[2022-02-15 09:56:11,418] {factory.py:31} INFO - apex_opt_level=O2
[2022-02-15 09:56:11,419] {factory.py:31} INFO - attn_drop=0.0
[2022-02-15 09:56:11,422] {factory.py:31} INFO - backend=snark
[2022-02-15 09:56:11,423] {factory.py:31} INFO - batch_size=140
[2022-02-15 09:56:11,424] {factory.py:31} INFO - client_rank=None
[2022-02-15 09:56:11,426] {factory.py:31} INFO - clip_grad=False
[2022-02-15 09:56:11,427] {factory.py:31} INFO - converter=skip
[2022-02-15 09:56:11,428] {factory.py:31} INFO - data_dir=/tmp/cora
[2022-02-15 09:56:11,429] {factory.py:31} INFO - data_parallel_num=2
[2022-02-15 09:56:11,430] {factory.py:31} INFO - dim=256
[2022-02-15 09:56:11,431] {factory.py:31} INFO - disable_ib=False
[2022-02-15 09:56:11,437] {factory.py:31} INFO - enable_adl_uploader=False
[2022-02-15 09:56:11,439] {factory.py:31} INFO - enable_ssl=False
[2022-02-15 09:56:11,440] {factory.py:31} INFO - eval_during_train_by_steps=0