In [1]:
from typing import Optional, Tuple

import torch
import torch.nn.functional as F
from torch import Tensor
from torch.nn import Parameter

from torch_geometric.experimental import disable_dynamic_shapes
from torch_geometric.nn import GCNConv
from torch_geometric.nn.conv import MessagePassing
from torch_geometric.nn.dense.linear import Linear
from torch_geometric.nn.inits import glorot, zeros
from torch_geometric.utils import scatter, softmax

  from .autonotebook import tqdm as notebook_tqdm


In [34]:
class test_HGConv_node_to_edge(MessagePassing):
    r"""Node-to-hyperedge half of a hypergraph convolution.

    Computes :math:`\mathbf{B}^{-1} \mathbf{H}^{\top} \mathbf{X}`: with the
    default ``aggr='add'`` every hyperedge receives the sum of its member
    nodes' features divided by the number of members.  The layer has no
    learnable projection; its only parameter is the optional additive bias,
    which is zero-initialised.

    Attention is **not** implemented in this stripped-down layer.  The
    ``use_attention``, ``attention_mode``, ``heads``, ``concat``,
    ``negative_slope`` and ``dropout`` arguments are accepted only so the
    signature matches the full hypergraph convolution; the layer always runs
    with a single head and concatenated output.

    Args:
        in_channels (int): Stored on the instance; not otherwise used.
        out_channels (int): Feature width that ``x`` must already have,
            because no projection is applied before propagation.
        use_attention (bool, optional): Stored only. (default: ``False``)
        attention_mode (str, optional): Must be ``'node'`` or ``'edge'``.
        heads (int, optional): Ignored; the layer always uses one head.
        concat (bool, optional): Ignored; the output is always concatenated.
        negative_slope (float, optional): Ignored.
        dropout (float, optional): Ignored.
        bias (bool, optional): If ``False``, no additive bias is learned.
            (default: ``True``)
        **kwargs (optional): Forwarded to ``MessagePassing``.
    """
    def __init__(
        self,
        in_channels: int,
        out_channels: int,
        use_attention: bool = False,
        attention_mode: str = 'node',
        heads: int = 1,
        concat: bool = True,
        negative_slope: float = 0.2,
        dropout: float = 0,
        bias: bool = True,
        **kwargs,
    ):
        kwargs.setdefault('aggr', 'add')
        super().__init__(flow='source_to_target', node_dim=0, **kwargs)

        assert attention_mode in ['node', 'edge']

        self.in_channels = in_channels
        self.out_channels = out_channels
        self.use_attention = use_attention
        self.attention_mode = attention_mode
        # No attention here, so the effective configuration is fixed.  These
        # must be a real int / bool: ``forward`` branches on ``self.concat``.
        self.heads = 1
        self.concat = True

        if bias:
            # Size the bias from the *effective* head count.  Using the raw
            # ``heads`` argument would broadcast the (M, out_channels) output
            # to the wrong width whenever ``heads > 1`` is passed.
            self.bias = Parameter(torch.empty(self.heads * out_channels))
        else:
            self.register_parameter('bias', None)

        self.reset_parameters()

    def reset_parameters(self):
        """Resets the bias to zero (and the attention vector, if one exists)."""
        super().reset_parameters()
        # This layer never creates ``self.att``; guard the lookup so that
        # ``use_attention=True`` does not crash inside the constructor.
        if self.use_attention and getattr(self, 'att', None) is not None:
            glorot(self.att)
        zeros(self.bias)

    @disable_dynamic_shapes(required_args=['num_edges'])
    def forward(self, x: Tensor, hyperedge_index: Tensor,
                hyperedge_weight: Optional[Tensor] = None,
                hyperedge_attr: Optional[Tensor] = None,
                num_edges: Optional[int] = None
                ) -> Tuple[Tensor, int, int, Tensor]:
        r"""Aggregates node features onto hyperedges.

        Args:
            x (torch.Tensor): Node feature matrix
                :math:`\mathbf{X} \in \mathbb{R}^{N \times F}` with
                :math:`F` equal to ``out_channels``.
            hyperedge_index (torch.Tensor): The hyperedge indices, *i.e.*
                the sparse incidence matrix
                :math:`\mathbf{H} \in {\{ 0, 1 \}}^{N \times M}` mapping from
                nodes (row 0) to hyperedges (row 1).
            hyperedge_weight (torch.Tensor, optional): Hyperedge weights
                :math:`\mathbf{W} \in \mathbb{R}^M`.  Not used in the
                computation of this layer; it is defaulted to ones and
                handed back to the caller. (default: :obj:`None`)
            hyperedge_attr (torch.Tensor, optional): Unused.
                (default: :obj:`None`)
            num_edges (int, optional): The number of hyperedges :math:`M`.
                Inferred as ``max(hyperedge_index[1]) + 1`` when omitted.
                (default: :obj:`None`)

        Returns:
            A tuple ``(out, num_edges, num_nodes, hyperedge_weight)`` where
            ``out`` has shape ``(num_edges, out_channels)``.
        """
        num_nodes = x.size(0)

        if num_edges is None:
            num_edges = 0
            if hyperedge_index.numel() > 0:
                num_edges = int(hyperedge_index[1].max()) + 1

        if hyperedge_weight is None:
            hyperedge_weight = x.new_ones(num_edges)

        alpha = None

        # B = hyperedge degree (number of incident nodes); inverted so that
        # the sum-aggregation becomes an average.  Empty hyperedges give
        # 1/0 = inf, which is reset to 0.
        B = scatter(x.new_ones(hyperedge_index.size(1)), hyperedge_index[1],
                    dim=0, dim_size=num_edges, reduce='sum')
        B = 1.0 / B
        B[B == float("inf")] = 0

        out = self.propagate(hyperedge_index, x=x, norm=B, alpha=alpha,
                             size=(num_nodes, num_edges))
        # Debug trace of the raw (M, heads, F) aggregate; the notebook's
        # recorded output shows this line.
        print("out : ", out)

        if self.concat:
            out = out.view(-1, self.heads * self.out_channels)
        else:
            out = out.mean(dim=1)

        if self.bias is not None:
            out = out + self.bias

        return out, num_edges, num_nodes, hyperedge_weight

    def message(self, x_j: Tensor, norm_i: Tensor, alpha: Tensor) -> Tensor:
        """Scales each source feature by the target's normalisation factor.

        ``alpha`` is ``None`` in this layer (``forward`` always passes
        ``None``); the branch is kept for parity with the attention variant.
        """
        heads, channels = self.heads, self.out_channels

        out = norm_i.view(-1, 1, 1) * x_j.view(-1, heads, channels)

        if alpha is not None:
            out = alpha.view(-1, self.heads, 1) * out

        return out

In [35]:
class test_HGConv_edge_to_node(MessagePassing):
    r"""Hyperedge-to-node half of a hypergraph convolution.

    Computes :math:`\mathbf{D}^{-1} \mathbf{H} \mathbf{X}_e`: with the
    default ``aggr='add'`` every node receives the sum of the features of the
    hyperedges it belongs to, divided by its (weighted) degree.  The layer
    has no learnable projection; its only parameter is the optional additive
    bias, which is zero-initialised.

    Attention is **not** implemented in this stripped-down layer.  The
    ``use_attention``, ``attention_mode``, ``heads``, ``concat``,
    ``negative_slope`` and ``dropout`` arguments are accepted only so the
    signature matches the full hypergraph convolution; the layer always runs
    with a single head and concatenated output.

    Args:
        in_channels (int): Stored on the instance; not otherwise used.
        out_channels (int): Feature width that ``x`` must already have,
            because no projection is applied before propagation.
        use_attention (bool, optional): Stored only. (default: ``False``)
        attention_mode (str, optional): Must be ``'node'`` or ``'edge'``.
        heads (int, optional): Ignored; the layer always uses one head.
        concat (bool, optional): Ignored; the output is always concatenated.
        negative_slope (float, optional): Ignored.
        dropout (float, optional): Ignored.
        bias (bool, optional): If ``False``, no additive bias is learned.
            (default: ``True``)
        **kwargs (optional): Forwarded to ``MessagePassing``.
    """
    def __init__(
            self,
            in_channels: int,
            out_channels: int,
            use_attention: bool = False,
            attention_mode: str = 'node',
            heads: int = 1,
            concat: bool = True,
            negative_slope: float = 0.2,
            dropout: float = 0,
            bias: bool = True,
            **kwargs,
    ):
        kwargs.setdefault('aggr', 'add')
        super().__init__(flow='source_to_target', node_dim=0, **kwargs)

        assert attention_mode in ['node', 'edge']

        self.in_channels = in_channels
        self.out_channels = out_channels
        self.use_attention = use_attention
        self.attention_mode = attention_mode
        # No attention here, so the effective configuration is fixed.  These
        # must be a real int / bool: ``forward`` branches on ``self.concat``.
        self.heads = 1
        self.concat = True

        if bias:
            # Size the bias from the *effective* head count.  Using the raw
            # ``heads`` argument would broadcast the (N, out_channels) output
            # to the wrong width whenever ``heads > 1`` is passed.
            self.bias = Parameter(torch.empty(self.heads * out_channels))
        else:
            self.register_parameter('bias', None)

        self.reset_parameters()

    def reset_parameters(self):
        """Resets the bias to zero (and the attention vector, if one exists)."""
        super().reset_parameters()
        # This layer never creates ``self.att``; guard the lookup so that
        # ``use_attention=True`` does not crash inside the constructor.
        if self.use_attention and getattr(self, 'att', None) is not None:
            glorot(self.att)
        zeros(self.bias)

    @disable_dynamic_shapes(required_args=['num_edges'])
    def forward(self, x: Tensor, hyperedge_index: Tensor,
                hyperedge_weight: Optional[Tensor] = None,
                hyperedge_attr: Optional[Tensor] = None,
                num_edges: Optional[int] = None, num_nodes: Optional[int] = None) -> Tensor:
        r"""Aggregates hyperedge features back onto nodes.

        Args:
            x (torch.Tensor): **Hyperedge** feature matrix of shape
                :math:`(M, F)` with :math:`F` equal to ``out_channels``; it
                is indexed by the hyperedge ids in ``hyperedge_index[1]``.
            hyperedge_index (torch.Tensor): The hyperedge indices, *i.e.*
                the sparse incidence matrix
                :math:`\mathbf{H} \in {\{ 0, 1 \}}^{N \times M}` mapping from
                nodes (row 0) to hyperedges (row 1).
            hyperedge_weight (torch.Tensor, optional): Hyperedge weights
                :math:`\mathbf{W} \in \mathbb{R}^M`, used only for the node
                degree normalisation.  Defaults to all ones.
                (default: :obj:`None`)
            hyperedge_attr (torch.Tensor, optional): Unused.
                (default: :obj:`None`)
            num_edges (int, optional): The number of hyperedges :math:`M`.
                Inferred as ``max(hyperedge_index[1]) + 1`` when omitted.
                (default: :obj:`None`)
            num_nodes (int, optional): The number of nodes :math:`N`.
                Inferred as ``max(hyperedge_index[0]) + 1`` when omitted,
                which drops trailing nodes that belong to no hyperedge, so
                pass it explicitly whenever the true node count is known.
                (default: :obj:`None`)

        Returns:
            torch.Tensor: Node features of shape ``(num_nodes, out_channels)``.
        """
        if num_edges is None:
            num_edges = 0
            if hyperedge_index.numel() > 0:
                num_edges = int(hyperedge_index[1].max()) + 1

        # Resolve the node count up front instead of handing ``None`` to
        # ``scatter`` / ``propagate`` and relying on their implicit inference.
        if num_nodes is None:
            num_nodes = 0
            if hyperedge_index.numel() > 0:
                num_nodes = int(hyperedge_index[0].max()) + 1

        if hyperedge_weight is None:
            hyperedge_weight = x.new_ones(num_edges)

        alpha = None

        # D = weighted node degree; inverted so that the sum-aggregation
        # becomes a weighted average.  Isolated nodes give 1/0 = inf, which
        # is reset to 0.
        D = scatter(hyperedge_weight[hyperedge_index[1]], hyperedge_index[0],
                    dim=0, dim_size=num_nodes, reduce='sum')
        D = 1.0 / D
        D[D == float("inf")] = 0

        # Flip the index so hyperedges become sources and nodes targets.
        out = self.propagate(hyperedge_index.flip([0]), x=x, norm=D,
                             alpha=alpha, size=(num_edges, num_nodes))

        if self.concat:
            out = out.view(-1, self.heads * self.out_channels)
        else:
            out = out.mean(dim=1)

        if self.bias is not None:
            out = out + self.bias

        return out

    def message(self, x_j: Tensor, norm_i: Tensor, alpha: Tensor) -> Tensor:
        """Scales each source feature by the target's normalisation factor.

        ``alpha`` is ``None`` in this layer (``forward`` always passes
        ``None``); the branch is kept for parity with the attention variant.
        """
        heads, channels = self.heads, self.out_channels

        out = norm_i.view(-1, 1, 1) * x_j.view(-1, heads, channels)

        if alpha is not None:
            out = alpha.view(-1, self.heads, 1) * out

        return out


In [69]:
class SimpleHGCN(torch.nn.Module):
    """Two-step hypergraph convolution: nodes -> hyperedges -> nodes.

    Chains ``test_HGConv_node_to_edge`` and ``test_HGConv_edge_to_node``
    (both 1 -> 1 channel, no attention) so the intermediate hyperedge
    features can be inspected.
    """
    def __init__(self):
        super().__init__()
        self.n2e = test_HGConv_node_to_edge(in_channels=1, out_channels=1, use_attention=False)
        self.e2n = test_HGConv_edge_to_node(in_channels=1, out_channels=1, use_attention=False)

    def forward(self, x, edge_index):
        """Runs both propagation steps.

        Args:
            x: Node features, one row per node.
            edge_index: Hypergraph incidence index (row 0 = node ids,
                row 1 = hyperedge ids).

        Returns:
            ``(hyperedge_feats, node_feats)`` where ``hyperedge_feats`` is the
            full tuple returned by the node-to-edge layer
            ``(features, num_edges, num_nodes, hyperedge_weight)``.
        """
        # Step 1: node -> hyperedge message passing.
        hyperedge_feats = self.n2e(x, edge_index)
        edge_x, num_edges, num_nodes, edge_weight = hyperedge_feats

        # Step 2: hyperedge -> node message passing.  Forward the sizes from
        # step 1; otherwise the second layer re-infers the node count from
        # the largest connected node index and drops trailing isolated nodes.
        node_feats = self.e2n(edge_x, edge_index,
                              hyperedge_weight=edge_weight,
                              num_edges=num_edges, num_nodes=num_nodes)
        return hyperedge_feats, node_feats

In [70]:
import torch
import torch.nn.functional as F
from torch_geometric.nn import HypergraphConv

# Build the incidence matrix for 6 nodes and 3 hyperedges
num_nodes = 6
num_hyperedges = 3

# incidence matrix shape: (num_nodes, num_hyperedges)
# Hand-written example incidence matrix
incidence_matrix = torch.tensor([
    [1, 0, 0],  # node 1 connected to hyperedge 1
    [1, 1, 0],  # node 2 connected to hyperedge 1 and 2
    [0, 1, 0],  # node 3 connected to hyperedge 2
    [0, 1, 1],  # node 4 connected to hyperedge 2 and 3
    [0, 0, 1],  # node 5 connected to hyperedge 3
    [1, 0, 1]   # node 6 connected to hyperedge 1 and 3
], dtype=torch.float)

# Convert to a (2, nnz) index: row 0 = node ids, row 1 = hyperedge ids
edge_index = incidence_matrix.nonzero().t()

# Node features: one scalar per node. The all-ones variant is kept commented
# out; the active line uses the values 1..num_nodes instead.
# x = torch.ones((num_nodes, 1))
x = torch.arange(1, num_nodes + 1, dtype=torch.float).view(-1, 1)
print("x : ", x)
# Instantiate the model
model = SimpleHGCN()

# Forward pass
hyperedge_feats, node_feats = model(x, edge_index)
# hyperedge_feats = model(x, edge_index)

# Print the results
print("Node to Hyperedge Information Transfer Result:")
print(hyperedge_feats)


print("\nHyperedge to Node Information Transfer Result:")
print(node_feats)

x :  tensor([[1.],
        [2.],
        [3.],
        [4.],
        [5.],
        [6.]])
out :  tensor([[[3.]],

        [[3.]],

        [[5.]]])
Node to Hyperedge Information Transfer Result:
(tensor([[3.],
        [3.],
        [5.]], grad_fn=<AddBackward0>), 3, 6, tensor([1., 1., 1.]))

Hyperedge to Node Information Transfer Result:
tensor([[3.],
        [3.],
        [3.],
        [4.],
        [5.],
        [4.]], grad_fn=<AddBackward0>)


In [59]:
from typing import Optional

import torch
import torch.nn.functional as F
from torch import Tensor
from torch.nn import Parameter

from torch_geometric.experimental import disable_dynamic_shapes
from torch_geometric.nn.conv import MessagePassing
from torch_geometric.nn.dense.linear import Linear
from torch_geometric.nn.inits import glorot, zeros
from torch_geometric.utils import scatter, softmax


class test_HypergraphConv(MessagePassing):
    r"""The hypergraph convolutional operator from the `"Hypergraph Convolution
    and Hypergraph Attention" <https://arxiv.org/abs/1901.08150>`_ paper.

    .. math::
        \mathbf{X}^{\prime} = \mathbf{D}^{-1} \mathbf{H} \mathbf{W}
        \mathbf{B}^{-1} \mathbf{H}^{\top} \mathbf{X} \mathbf{\Theta}

    where :math:`\mathbf{H} \in {\{ 0, 1 \}}^{N \times M}` is the incidence
    matrix, :math:`\mathbf{W} \in \mathbb{R}^M` is the diagonal hyperedge
    weight matrix, and
    :math:`\mathbf{D}` and :math:`\mathbf{B}` are the corresponding degree
    matrices.

    .. note::
        This is an instrumented variant for inspecting the propagation
        itself: the projection :math:`\mathbf{\Theta}` of :obj:`x` is
        disabled in :meth:`forward` (the ``self.lin(x)`` call is commented
        out) and the intermediate degree vectors and aggregates are printed.
        As a consequence :obj:`x` must already have
        ``heads * out_channels`` features.

    For example, in the hypergraph scenario
    :math:`\mathcal{G} = (\mathcal{V}, \mathcal{E})` with
    :math:`\mathcal{V} = \{ 0, 1, 2, 3 \}` and
    :math:`\mathcal{E} = \{ \{ 0, 1, 2 \}, \{ 1, 2, 3 \} \}`, the
    :obj:`hyperedge_index` is represented as:

    .. code-block:: python

        hyperedge_index = torch.tensor([
            [0, 1, 2, 1, 2, 3],
            [0, 0, 0, 1, 1, 1],
        ])

    Args:
        in_channels (int): Size of each input sample, or :obj:`-1` to derive
            the size from the first input(s) to the forward method.
        out_channels (int): Size of each output sample.
        use_attention (bool, optional): If set to :obj:`True`, attention
            will be added to this layer. (default: :obj:`False`)
        attention_mode (str, optional): The mode on how to compute attention.
            If set to :obj:`"node"`, will compute attention scores of nodes
            within all nodes belonging to the same hyperedge.
            If set to :obj:`"edge"`, will compute attention scores of nodes
            across all edges holding this node belongs to.
            (default: :obj:`"node"`)
        heads (int, optional): Number of multi-head-attentions.  Only
            honoured when :obj:`use_attention=True`; otherwise a single head
            is used. (default: :obj:`1`)
        concat (bool, optional): If set to :obj:`False`, the multi-head
            attentions are averaged instead of concatenated.  Only honoured
            when :obj:`use_attention=True`. (default: :obj:`True`)
        negative_slope (float, optional): LeakyReLU angle of the negative
            slope. (default: :obj:`0.2`)
        dropout (float, optional): Dropout probability of the normalized
            attention coefficients which exposes each node to a stochastically
            sampled neighborhood during training. (default: :obj:`0`)
        bias (bool, optional): If set to :obj:`False`, the layer will not learn
            an additive bias. (default: :obj:`True`)
        **kwargs (optional): Additional arguments of
            :class:`torch_geometric.nn.conv.MessagePassing`.

    Shapes:
        - **input:**
          node features :math:`(|\mathcal{V}|, F_{in})`,
          hyperedge indices :math:`(|\mathcal{V}|, |\mathcal{E}|)`,
          hyperedge weights :math:`(|\mathcal{E}|)` *(optional)*
          hyperedge features :math:`(|\mathcal{E}|, D)` *(optional)*
        - **output:** node features :math:`(|\mathcal{V}|, F_{out})`
    """
    def __init__(
        self,
        in_channels: int,
        out_channels: int,
        use_attention: bool = False,
        attention_mode: str = 'node',
        heads: int = 1,
        concat: bool = True,
        negative_slope: float = 0.2,
        dropout: float = 0,
        bias: bool = True,
        **kwargs,
    ):
        kwargs.setdefault('aggr', 'add')
        super().__init__(flow='source_to_target', node_dim=0, **kwargs)

        assert attention_mode in ['node', 'edge']

        self.in_channels = in_channels
        self.out_channels = out_channels
        self.use_attention = use_attention
        self.attention_mode = attention_mode

        if self.use_attention:
            self.heads = heads
            self.concat = concat
            self.negative_slope = negative_slope
            self.dropout = dropout
            self.lin = Linear(in_channels, heads * out_channels, bias=False,
                              weight_initializer='glorot')
            self.att = Parameter(torch.empty(1, heads, 2 * out_channels))
        else:
            # Without attention the layer is single-head and concatenating,
            # whatever ``heads`` / ``concat`` were passed.
            self.heads = 1
            self.concat = True
            self.lin = Linear(in_channels, out_channels, bias=False,
                              weight_initializer='glorot')

        # Size the bias from the *effective* configuration (``self.heads`` /
        # ``self.concat``).  Using the raw arguments gives a bias of width
        # ``heads * out_channels`` for an output of width ``out_channels``
        # whenever ``use_attention=False`` and ``heads > 1``.
        if bias and self.concat:
            self.bias = Parameter(torch.empty(self.heads * out_channels))
        elif bias and not self.concat:
            self.bias = Parameter(torch.empty(out_channels))
        else:
            self.register_parameter('bias', None)

        self.reset_parameters()

    def reset_parameters(self):
        """Re-initialises the linear layer, attention vector and bias."""
        super().reset_parameters()
        self.lin.reset_parameters()
        if self.use_attention:
            glorot(self.att)
        zeros(self.bias)

    @disable_dynamic_shapes(required_args=['num_edges'])
    def forward(self, x: Tensor, hyperedge_index: Tensor,
                hyperedge_weight: Optional[Tensor] = None,
                hyperedge_attr: Optional[Tensor] = None,
                num_edges: Optional[int] = None) -> Tensor:
        r"""Runs the forward pass of the module.

        Args:
            x (torch.Tensor): Node feature matrix
                :math:`\mathbf{X} \in \mathbb{R}^{N \times F}`.  It is used
                as-is (no linear projection), so :math:`F` must equal
                ``heads * out_channels``.
            hyperedge_index (torch.Tensor): The hyperedge indices, *i.e.*
                the sparse incidence matrix
                :math:`\mathbf{H} \in {\{ 0, 1 \}}^{N \times M}` mapping from
                nodes to edges.
            hyperedge_weight (torch.Tensor, optional): Hyperedge weights
                :math:`\mathbf{W} \in \mathbb{R}^M`. (default: :obj:`None`)
            hyperedge_attr (torch.Tensor, optional): Hyperedge feature matrix
                in :math:`\mathbb{R}^{M \times F}`.
                These features only need to get passed in case
                :obj:`use_attention=True`. (default: :obj:`None`)
            num_edges (int, optional) : The number of edges :math:`M`.
                (default: :obj:`None`)

        Returns:
            torch.Tensor: Updated node features with ``num_nodes`` rows.
        """
        num_nodes = x.size(0)

        if num_edges is None:
            num_edges = 0
            if hyperedge_index.numel() > 0:
                num_edges = int(hyperedge_index[1].max()) + 1

        if hyperedge_weight is None:
            hyperedge_weight = x.new_ones(num_edges)

        # Projection intentionally disabled so raw propagation is visible.
        # x = self.lin(x)

        alpha = None
        if self.use_attention:
            assert hyperedge_attr is not None
            x = x.view(-1, self.heads, self.out_channels)
            hyperedge_attr = self.lin(hyperedge_attr)
            hyperedge_attr = hyperedge_attr.view(-1, self.heads,
                                                 self.out_channels)
            x_i = x[hyperedge_index[0]]
            x_j = hyperedge_attr[hyperedge_index[1]]
            alpha = (torch.cat([x_i, x_j], dim=-1) * self.att).sum(dim=-1)
            alpha = F.leaky_relu(alpha, self.negative_slope)
            if self.attention_mode == 'node':
                alpha = softmax(alpha, hyperedge_index[1], num_nodes=num_edges)
            else:
                alpha = softmax(alpha, hyperedge_index[0], num_nodes=num_nodes)
            alpha = F.dropout(alpha, p=self.dropout, training=self.training)

        # The print calls below are the notebook's diagnostic trace; the
        # recorded cell outputs depend on them.

        # D: weighted node degree, then inverted (isolated nodes -> 0).
        D = scatter(hyperedge_weight[hyperedge_index[1]], hyperedge_index[0],
                    dim=0, dim_size=num_nodes, reduce='sum')
        print("before node degree D : ", D)
        D = 1.0 / D
        D[D == float("inf")] = 0
        print("after node degree D : ", D)

        # B: hyperedge degree (member count), then inverted (empty -> 0).
        B = scatter(x.new_ones(hyperedge_index.size(1)), hyperedge_index[1],
                    dim=0, dim_size=num_edges, reduce='sum')
        print("before edge degree B : ", B)
        B = 1.0 / B
        B[B == float("inf")] = 0
        print("after edge degree B : ", B)

        # Pass 1: nodes -> hyperedges, normalised by B.
        out = self.propagate(hyperedge_index, x=x, norm=B, alpha=alpha,
                             size=(num_nodes, num_edges))
        print("node to edge : ", out)
        # Pass 2: hyperedges -> nodes (flipped index), normalised by D.
        out = self.propagate(hyperedge_index.flip([0]), x=out, norm=D,
                             alpha=alpha, size=(num_edges, num_nodes))
        print("edge to node : ", out)

        if self.concat:
            out = out.view(-1, self.heads * self.out_channels)
        else:
            out = out.mean(dim=1)

        if self.bias is not None:
            out = out + self.bias

        return out

    def message(self, x_j: Tensor, norm_i: Tensor, alpha: Tensor) -> Tensor:
        """Scales each source feature by the target's normalisation factor
        and, when attention is enabled, by the attention coefficient.

        ``alpha`` is ``None`` when ``use_attention=False``.
        """
        heads, channels = self.heads, self.out_channels

        out = norm_i.view(-1, 1, 1) * x_j.view(-1, heads, channels)

        if alpha is not None:
            out = alpha.view(-1, self.heads, 1) * out

        return out


In [60]:
class test_SimpleHGCN(torch.nn.Module):
    """Thin wrapper around a single ``test_HypergraphConv`` layer.

    The layer maps 1 -> 1 channel without attention and performs the full
    node -> hyperedge -> node round trip in one call.
    """

    def __init__(self):
        super().__init__()
        self.hgcn = test_HypergraphConv(1, 1, use_attention=False)

    def forward(self, x, edge_index):
        """Returns the node features produced by the wrapped layer."""
        updated_nodes = self.hgcn(x, edge_index)
        return updated_nodes

In [93]:
import torch
import torch.nn.functional as F
from torch_geometric.nn import HypergraphConv

# Build the incidence matrix for 6 nodes and 3 hyperedges
num_nodes = 6
num_hyperedges = 3

# incidence matrix shape: (num_nodes, num_hyperedges)
# Hand-written example incidence matrix
incidence_matrix = torch.tensor([
    [1, 0, 0],  # node 1 connected to hyperedge 1
    [0, 1, 0],  # node 2 connected to hyperedge 2
    [1, 1, 0],  # node 3 connected to hyperedge 1 and 2
    [0, 1, 1],  # node 4 connected to hyperedge 2 and 3
    [0, 0, 1],  # node 5 connected to hyperedge 3
    [0, 0, 1]   # node 6 connected to hyperedge 3
], dtype=torch.float)

# Convert to a (2, nnz) index: row 0 = node ids, row 1 = hyperedge ids
edge_index = incidence_matrix.nonzero().t()

# Node features: one scalar per node. The all-ones variant is kept commented
# out; the active line uses the values 1..num_nodes instead.
# x = torch.ones((num_nodes, 1))
x = torch.arange(1, num_nodes + 1, dtype=torch.float).view(-1, 1)
print("x : ", x)
# Instantiate the model
model = test_SimpleHGCN()

# Forward pass
node_feats = model(x, edge_index)
# hyperedge_feats = model(x, edge_index)

# Print the results
# print("Node to Hyperedge Information Transfer Result:")
# print(hyperedge_feats)


print("\nHyperedge to Node Information Transfer Result:")
print(node_feats)

x :  tensor([[1.],
        [2.],
        [3.],
        [4.],
        [5.],
        [6.]])
before node degree D :  tensor([1., 1., 2., 2., 1., 1.])
after node degree D :  tensor([1.0000, 1.0000, 0.5000, 0.5000, 1.0000, 1.0000])
before edge degree B :  tensor([2., 3., 3.])
after edge degree B :  tensor([0.5000, 0.3333, 0.3333])
node to edge :  tensor([[[2.]],

        [[3.]],

        [[5.]]])
edge to node :  tensor([[[2.0000]],

        [[3.0000]],

        [[2.5000]],

        [[4.0000]],

        [[5.0000]],

        [[5.0000]]])

Hyperedge to Node Information Transfer Result:
tensor([[2.0000],
        [3.0000],
        [2.5000],
        [4.0000],
        [5.0000],
        [5.0000]], grad_fn=<AddBackward0>)


In [94]:

# Forward pass: feeds the previous cell's node_feats back through the model.
# NOTE(review): this cell overwrites its own input, so every re-execution
# applies one more propagation step; results depend on how often it was run.
node_feats = model(node_feats, edge_index)
# hyperedge_feats = model(x, edge_index)

# Print the results
# print("Node to Hyperedge Information Transfer Result:")
# print(hyperedge_feats)


print("\nHyperedge to Node Information Transfer Result:")
print(node_feats)

before node degree D :  tensor([1., 1., 2., 2., 1., 1.])
after node degree D :  tensor([1.0000, 1.0000, 0.5000, 0.5000, 1.0000, 1.0000])
before edge degree B :  tensor([2., 3., 3.])
after edge degree B :  tensor([0.5000, 0.3333, 0.3333])
node to edge :  tensor([[[2.2500]],

        [[3.1667]],

        [[4.6667]]], grad_fn=<ScatterAddBackward0>)
edge to node :  tensor([[[2.2500]],

        [[3.1667]],

        [[2.7083]],

        [[3.9167]],

        [[4.6667]],

        [[4.6667]]], grad_fn=<ScatterAddBackward0>)

Hyperedge to Node Information Transfer Result:
tensor([[2.2500],
        [3.1667],
        [2.7083],
        [3.9167],
        [4.6667],
        [4.6667]], grad_fn=<AddBackward0>)


In [95]:

# Forward pass: feeds the previous cell's node_feats back through the model.
# NOTE(review): this cell overwrites its own input, so every re-execution
# applies one more propagation step; results depend on how often it was run.
node_feats = model(node_feats, edge_index)
# hyperedge_feats = model(x, edge_index)

# Print the results
# print("Node to Hyperedge Information Transfer Result:")
# print(hyperedge_feats)


print("\nHyperedge to Node Information Transfer Result:")
print(node_feats)

before node degree D :  tensor([1., 1., 2., 2., 1., 1.])
after node degree D :  tensor([1.0000, 1.0000, 0.5000, 0.5000, 1.0000, 1.0000])
before edge degree B :  tensor([2., 3., 3.])
after edge degree B :  tensor([0.5000, 0.3333, 0.3333])
node to edge :  tensor([[[2.4792]],

        [[3.2639]],

        [[4.4167]]], grad_fn=<ScatterAddBackward0>)
edge to node :  tensor([[[2.4792]],

        [[3.2639]],

        [[2.8715]],

        [[3.8403]],

        [[4.4167]],

        [[4.4167]]], grad_fn=<ScatterAddBackward0>)

Hyperedge to Node Information Transfer Result:
tensor([[2.4792],
        [3.2639],
        [2.8715],
        [3.8403],
        [4.4167],
        [4.4167]], grad_fn=<AddBackward0>)


In [77]:
from typing import Optional

import torch
from torch import Tensor
from torch.nn import Parameter

from torch_geometric.nn.conv import MessagePassing
from torch_geometric.nn.dense.linear import Linear
from torch_geometric.nn.inits import zeros
from torch_geometric.typing import (
    Adj,
    OptPairTensor,
    OptTensor,
    SparseTensor,
    torch_sparse,
)
from torch_geometric.utils import add_remaining_self_loops
from torch_geometric.utils import add_self_loops as add_self_loops_fn
from torch_geometric.utils import (
    is_torch_sparse_tensor,
    scatter,
    spmm,
    to_edge_index,
)
from torch_geometric.utils.num_nodes import maybe_num_nodes
from torch_geometric.utils.sparse import set_sparse_value

# TorchScript overload declarations for ``gcn_norm``.  Each stub must carry
# the ``@torch.jit._overload`` decorator: without the ``@`` the attribute
# access is a no-op expression and the stub is just an ordinary function
# that the real implementation (defined right after) silently replaces.
@torch.jit._overload
def gcn_norm(  # noqa: F811
        edge_index, edge_weight, num_nodes, improved, add_self_loops, flow,
        dtype):
    # type: (Tensor, OptTensor, Optional[int], bool, bool, str, Optional[int]) -> OptPairTensor  # noqa
    pass


@torch.jit._overload
def gcn_norm(  # noqa: F811
        edge_index, edge_weight, num_nodes, improved, add_self_loops, flow,
        dtype):
    # type: (SparseTensor, OptTensor, Optional[int], bool, bool, str, Optional[int]) -> SparseTensor  # noqa
    pass


def gcn_norm(  # noqa: F811
    edge_index: Adj,
    edge_weight: OptTensor = None,
    num_nodes: Optional[int] = None,
    improved: bool = False,
    add_self_loops: bool = True,
    flow: str = "source_to_target",
    dtype: Optional[torch.dtype] = None,
):
    r"""Applies symmetric GCN normalization
    :math:`\mathbf{\hat{D}}^{-1/2} \mathbf{\hat{A}} \mathbf{\hat{D}}^{-1/2}`.

    Three input kinds are handled, each with its own return shape:

    * ``SparseTensor`` -> the normalized ``SparseTensor``;
    * native torch sparse tensor -> ``(normalized_sparse_tensor, None)``;
    * dense ``edge_index`` tensor -> ``(edge_index, edge_weight)``.

    Args:
        edge_index: Graph connectivity in one of the forms above.
        edge_weight: Optional per-edge weights (dense path only); defaults
            to ones.
        num_nodes: Number of nodes, inferred when ``None``.
        improved: If ``True`` self-loops get weight 2 instead of 1.
        add_self_loops: Whether to insert self-loops before normalizing.
        flow: ``'source_to_target'`` or ``'target_to_source'``; selects the
            index row the degree is accumulated over (dense path only).
        dtype: dtype for freshly created weights.
    """
    loop_weight = 2. if improved else 1.

    # ---- torch_sparse.SparseTensor ------------------------------------
    if isinstance(edge_index, SparseTensor):
        assert edge_index.size(0) == edge_index.size(1)

        adj = edge_index
        if not adj.has_value():
            adj = adj.fill_value(1., dtype=dtype)
        if add_self_loops:
            adj = torch_sparse.fill_diag(adj, loop_weight)

        # Row sums, turned into D^-1/2 in place; zero-degree rows give inf.
        inv_sqrt = torch_sparse.sum(adj, dim=1).pow_(-0.5)
        inv_sqrt.masked_fill_(inv_sqrt == float('inf'), 0.)
        adj = torch_sparse.mul(adj, inv_sqrt.view(-1, 1))
        return torch_sparse.mul(adj, inv_sqrt.view(1, -1))

    # ---- native torch sparse tensor -----------------------------------
    if is_torch_sparse_tensor(edge_index):
        assert edge_index.size(0) == edge_index.size(1)

        if edge_index.layout == torch.sparse_csc:
            raise NotImplementedError("Sparse CSC matrices are not yet "
                                      "supported in 'gcn_norm'")

        adj = edge_index
        if add_self_loops:
            adj, _ = add_self_loops_fn(adj, None, loop_weight, num_nodes)

        coo_index, value = to_edge_index(adj)
        col, row = coo_index[0], coo_index[1]

        inv_sqrt = scatter(value, col, 0, dim_size=num_nodes,
                           reduce='sum').pow_(-0.5)
        inv_sqrt.masked_fill_(inv_sqrt == float('inf'), 0)
        value = inv_sqrt[row] * value * inv_sqrt[col]

        return set_sparse_value(adj, value), None

    # ---- dense (2, E) edge index --------------------------------------
    assert flow in ['source_to_target', 'target_to_source']
    num_nodes = maybe_num_nodes(edge_index, num_nodes)

    if add_self_loops:
        edge_index, edge_weight = add_remaining_self_loops(
            edge_index, edge_weight, loop_weight, num_nodes)

    if edge_weight is None:
        edge_weight = torch.ones((edge_index.size(1), ), dtype=dtype,
                                 device=edge_index.device)

    first, second = edge_index[0], edge_index[1]
    # Degree is accumulated over the receiving end of each edge.
    degree_index = second if flow == 'source_to_target' else first
    inv_sqrt = scatter(edge_weight, degree_index, dim=0, dim_size=num_nodes,
                       reduce='sum').pow_(-0.5)
    inv_sqrt.masked_fill_(inv_sqrt == float('inf'), 0)
    edge_weight = inv_sqrt[first] * edge_weight * inv_sqrt[second]

    return edge_index, edge_weight

class test_GCNConv(MessagePassing):
    r"""Graph convolution after the `"Semi-supervised Classification with
    Graph Convolutional Networks" <https://arxiv.org/abs/1609.02907>`_ paper.

    .. math::
        \mathbf{X}^{\prime} = \mathbf{\hat{D}}^{-1/2} \mathbf{\hat{A}}
        \mathbf{\hat{D}}^{-1/2} \mathbf{X} \mathbf{\Theta},

    with :math:`\mathbf{\hat{A}} = \mathbf{A} + \mathbf{I}` the adjacency
    matrix including self-loops and
    :math:`\hat{D}_{ii} = \sum_{j=0} \hat{A}_{ij}` its diagonal degree matrix.
    Entries of the adjacency matrix other than :obj:`1` can be supplied
    through the optional :obj:`edge_weight` tensor.

    Node-wise this reads

    .. math::
        \mathbf{x}^{\prime}_i = \mathbf{\Theta}^{\top} \sum_{j \in
        \mathcal{N}(i) \cup \{ i \}} \frac{e_{j,i}}{\sqrt{\hat{d}_j
        \hat{d}_i}} \mathbf{x}_j

    where :math:`\hat{d}_i = 1 + \sum_{j \in \mathcal{N}(i)} e_{j,i}` and
    :math:`e_{j,i}` is the weight of the edge from source :obj:`j` to target
    :obj:`i` (default: :obj:`1.0`).

    .. note::
        In this variant the projection :math:`\mathbf{\Theta}` is created
        (``self.lin``) but **not applied**: the ``self.lin(x)`` call in
        :meth:`forward` is commented out, so only the normalized propagation
        and the bias act on :obj:`x`.

    Args:
        in_channels (int): Size of each input sample, or :obj:`-1` to derive
            the size from the first input(s) to the forward method.
        out_channels (int): Size of each output sample.
        improved (bool, optional): If :obj:`True`, uses
            :math:`\mathbf{\hat{A}} = \mathbf{A} + 2\mathbf{I}`.
            (default: :obj:`False`)
        cached (bool, optional): If :obj:`True`, the normalized adjacency is
            computed once on first execution and reused afterwards.  Only
            sensible in transductive settings. (default: :obj:`False`)
        add_self_loops (bool, optional): Whether to add self-loops.  When
            left at :obj:`None` it follows :obj:`normalize`.
            (default: :obj:`None`)
        normalize (bool, optional): Whether to add self-loops and compute
            symmetric normalization coefficients on-the-fly.
            (default: :obj:`True`)
        bias (bool, optional): If :obj:`False`, no additive bias is learned.
            (default: :obj:`True`)
        **kwargs (optional): Additional arguments of
            :class:`torch_geometric.nn.conv.MessagePassing`.

    Shapes:
        - **input:**
          node features :math:`(|\mathcal{V}|, F_{in})`,
          edge indices :math:`(2, |\mathcal{E}|)`
          or sparse matrix :math:`(|\mathcal{V}|, |\mathcal{V}|)`,
          edge weights :math:`(|\mathcal{E}|)` *(optional)*
        - **output:** node features :math:`(|\mathcal{V}|, F_{out})`
    """
    _cached_edge_index: Optional[OptPairTensor]
    _cached_adj_t: Optional[SparseTensor]

    def __init__(
        self,
        in_channels: int,
        out_channels: int,
        improved: bool = False,
        cached: bool = False,
        add_self_loops: Optional[bool] = None,
        normalize: bool = True,
        bias: bool = True,
        **kwargs,
    ):
        kwargs.setdefault('aggr', 'add')
        super().__init__(**kwargs)

        # Unless stated otherwise, self-loops go hand in hand with
        # normalization.
        if add_self_loops is None:
            add_self_loops = normalize

        if add_self_loops and not normalize:
            raise ValueError(f"'{self.__class__.__name__}' does not support "
                             f"adding self-loops to the graph when no "
                             f"on-the-fly normalization is applied")

        self.in_channels, self.out_channels = in_channels, out_channels
        self.improved = improved
        self.cached = cached
        self.add_self_loops = add_self_loops
        self.normalize = normalize

        # Caches for the normalized graph, one per supported input kind.
        self._cached_edge_index = None
        self._cached_adj_t = None

        self.lin = Linear(in_channels, out_channels, bias=False,
                          weight_initializer='glorot')

        if not bias:
            self.register_parameter('bias', None)
        else:
            self.bias = Parameter(torch.empty(out_channels))

        self.reset_parameters()

    def reset_parameters(self):
        """Re-initialises weights and bias and drops any cached graph."""
        super().reset_parameters()
        self.lin.reset_parameters()
        zeros(self.bias)
        self._cached_edge_index = None
        self._cached_adj_t = None

    def forward(self, x: Tensor, edge_index: Adj,
                edge_weight: OptTensor = None) -> Tensor:
        """Normalizes the graph (optionally cached), propagates ``x`` over it
        and adds the bias.

        Raises:
            ValueError: If ``x`` is a tuple or list (bipartite input).
        """
        if isinstance(x, (tuple, list)):
            raise ValueError(f"'{self.__class__.__name__}' received a tuple "
                             f"of node features as input while this layer "
                             f"does not support bipartite message passing. "
                             f"Please try other layers such as 'SAGEConv' or "
                             f"'GraphConv' instead")

        if self.normalize and isinstance(edge_index, Tensor):
            stored = self._cached_edge_index
            if stored is not None:
                edge_index, edge_weight = stored[0], stored[1]
            else:
                edge_index, edge_weight = gcn_norm(  # yapf: disable
                    edge_index, edge_weight, x.size(self.node_dim),
                    self.improved, self.add_self_loops, self.flow, x.dtype)
                if self.cached:
                    self._cached_edge_index = (edge_index, edge_weight)

        elif self.normalize and isinstance(edge_index, SparseTensor):
            stored = self._cached_adj_t
            if stored is not None:
                edge_index = stored
            else:
                edge_index = gcn_norm(  # yapf: disable
                    edge_index, edge_weight, x.size(self.node_dim),
                    self.improved, self.add_self_loops, self.flow, x.dtype)
                if self.cached:
                    self._cached_adj_t = edge_index

        # Linear projection intentionally left disabled.
        # x = self.lin(x)

        # propagate_type: (x: Tensor, edge_weight: OptTensor)
        out = self.propagate(edge_index, x=x, edge_weight=edge_weight)

        if self.bias is not None:
            out = out + self.bias

        return out

    def message(self, x_j: Tensor, edge_weight: OptTensor) -> Tensor:
        """Weights each neighbour feature by its edge weight, if any."""
        if edge_weight is None:
            return x_j
        return edge_weight.view(-1, 1) * x_j

    def message_and_aggregate(self, adj_t: Adj, x: Tensor) -> Tensor:
        """Fused message + aggregation via sparse matrix multiplication."""
        return spmm(adj_t, x, reduce=self.aggr)

In [88]:
class SimpleHGCN_GC(torch.nn.Module):
    """Toy three-stage pipeline on a hypergraph.

    1. ``n2e``: pass node features to hyperedges.
    2. ``gcn``: graph convolution over the hyperedge features.
    3. ``e2n``: pass the convolved hyperedge features back to nodes.

    Every sub-layer is built with one input and one output channel.
    """

    def __init__(self):
        super().__init__()
        hg_kwargs = dict(in_channels=1, out_channels=1, use_attention=False)
        # Sub-modules are registered in the order n2e, e2n, gcn.
        self.n2e = test_HGConv_node_to_edge(**hg_kwargs)
        self.e2n = test_HGConv_edge_to_node(**hg_kwargs)
        self.gcn = test_GCNConv(in_channels=1, out_channels=1)

    def forward(self, x, edge_index, hyperedge_index):
        """Run the three stages and return every intermediate result.

        Args:
            x: Node features fed to ``n2e``.
            edge_index: Edge index handed to ``gcn``; it is applied to the
                hyperedge features, so it describes a graph over hyperedges.
            hyperedge_index: Node/hyperedge incidence index used by both
                ``n2e`` and ``e2n``.

        Returns:
            ``(n2e_out, convolved, node_out)`` where ``n2e_out`` is the raw
            return value of ``n2e`` (only its element ``[0]`` is forwarded to
            ``gcn``), ``convolved`` is the ``gcn`` output and ``node_out`` is
            the ``e2n`` output.
        """
        # Stage 1: node -> hyperedge message passing.
        n2e_out = self.n2e(x, hyperedge_index)

        # Stage 2: graph convolution on the first element of the n2e result.
        hyperedge_x = n2e_out[0]
        convolved = self.gcn(hyperedge_x, edge_index)

        # Stage 3: hyperedge -> node message passing.
        node_out = self.e2n(convolved, hyperedge_index)

        return n2e_out, convolved, node_out

In [96]:
import torch
import torch.nn.functional as F
from torch_geometric.nn import HypergraphConv

# Build the incidence matrix for a hypergraph with 6 nodes and 3 hyperedges.
num_nodes = 6
num_hyperedges = 3

# Incidence matrix shape: (num_nodes, num_hyperedges).
# Hand-written example; entry [i, j] == 1 means node i+1 belongs to hyperedge j+1.
incidence_matrix = torch.tensor([
    [1, 0, 0],  # node 1 -> hyperedge 1
    [0, 1, 0],  # node 2 -> hyperedge 2
    [1, 1, 0],  # node 3 -> hyperedges 1 and 2
    [0, 1, 1],  # node 4 -> hyperedges 2 and 3
    [0, 0, 1],  # node 5 -> hyperedge 3
    [0, 0, 1]   # node 6 -> hyperedge 3
], dtype=torch.float)

# Convert to an index tensor: nonzero() yields one (node, hyperedge) pair per
# row, and .t() turns that into a 2 x nnz tensor.
hyperedge_index = incidence_matrix.nonzero().t()

# 3 x 3 adjacency used by the graph convolution. SimpleHGCN_GC applies it to
# the hyperedge features, so its rows/columns index hyperedges, not nodes.
adjacency_matrix = torch.tensor([
    [1, 1, 0],  # hyperedge 1: self-loop, adjacent to hyperedge 2
    [1, 1, 1],  # hyperedge 2: self-loop, adjacent to hyperedges 1 and 3
    [0, 1, 1]   # hyperedge 3: self-loop, adjacent to hyperedge 2
], dtype=torch.float)

# Convert the adjacency matrix to a 2 x nnz edge index.
edge_index = adjacency_matrix.nonzero(as_tuple=False).t()


# Node features: one scalar per node, set to 1..num_nodes so that every
# node's contribution is distinguishable in the printed results.
# Alternative (all ones): x = torch.ones((num_nodes, 1))
x = torch.arange(1, num_nodes + 1, dtype=torch.float).view(-1, 1)
print("x : ", x)
# Instantiate the model.
model = SimpleHGCN_GC()

# Forward pass; the model returns all three intermediate results.
hyperedge_feats, new_hyperedge_feats, node_feats = model(x, edge_index, hyperedge_index)

# Print the results.
# hyperedge_feats is the raw return value of the node-to-hyperedge layer.
print("Node to Hyperedge Information Transfer Result:")
print(hyperedge_feats)

# new_hyperedge_feats is the graph-convolution output over the hyperedges.
print("\n new Hyperedge to Node Information Transfer Result:")
print(new_hyperedge_feats)


# node_feats is the hyperedge-to-node layer output.
print("\nHyperedge to Node Information Transfer Result:")
print(node_feats)

x :  tensor([[1.],
        [2.],
        [3.],
        [4.],
        [5.],
        [6.]])
out :  tensor([[[2.]],

        [[3.]],

        [[5.]]])
Node to Hyperedge Information Transfer Result:
(tensor([[2.],
        [3.],
        [5.]], grad_fn=<AddBackward0>), 3, 6, tensor([1., 1., 1.]))

 new Hyperedge to Node Information Transfer Result:
tensor([[2.2247],
        [3.8577],
        [3.7247]], grad_fn=<AddBackward0>)

Hyperedge to Node Information Transfer Result:
tensor([[2.2247],
        [3.8577],
        [3.0412],
        [3.7912],
        [3.7247],
        [3.7247]], grad_fn=<AddBackward0>)


In [188]:
import sympy as sp

# Symbolic node features v1..v6 (one scalar per node).
a, b, c, d, e, f = sp.symbols('v1 v2 v3 v4 v5 v6')

# Incidence matrix H, shape (6 nodes, 3 hyperedges):
# entry [i, j] == 1 means node i+1 belongs to hyperedge j+1.
H = sp.Matrix([
    [1, 0, 0],  # node 1 -> hyperedge 1
    [0, 1, 0],  # node 2 -> hyperedge 2
    [1, 1, 0],  # node 3 -> hyperedges 1 and 2
    [0, 1, 1],  # node 4 -> hyperedges 2 and 3
    [0, 0, 1],  # node 5 -> hyperedge 3
    [0, 0, 1]   # node 6 -> hyperedge 3
])

# 3 x 3 adjacency matrix A (self-loops included). Later cells multiply it
# with the 3 x 1 hyperedge features, so it indexes hyperedges.
A = sp.Matrix([
    [1, 1, 0],  # hyperedge 1: self-loop, adjacent to hyperedge 2
    [1, 1, 1],  # hyperedge 2: self-loop, adjacent to hyperedges 1 and 3
    [0, 1, 1]  # hyperedge 3: self-loop, adjacent to hyperedge 2
])

# Feature matrix X as a 1 x 6 row vector; later cells use X.transpose().
X = sp.Matrix([
    [a, b, c, d, e,f],
])
# Numeric alternative matching the torch demo input:
# X = sp.Matrix([
#     [1, 2, 3, 4, 5, 6],
# ])


# Number of hyperedges each node belongs to (row sums of H).
hyperedges_of_node = [sum(H.row(i)) for i in range(H.rows)]
# Number of nodes contained in each hyperedge (column sums of H).
nodes_of_hyperedges = [sum(H.col(i)) for i in range(H.cols)]
# Row sums of A (self-loops counted).
node_degree = [sum(A.row(i)) for i in range(A.rows)]

# Diagonal degree matrices:
#   D  (6 x 6): hyperedges per node
#   AD (3 x 3): row sums of A
#   B  (3 x 3): nodes per hyperedge
D = sp.diag(*hyperedges_of_node)
AD = sp.diag(*node_degree)
B = sp.diag(*nodes_of_hyperedges)


# Un-normalised node -> hyperedge aggregation: X * H sums the features of the
# nodes in each hyperedge.
XH = X * H


print("XH:")
sp.pprint(XH)
print("\nDiagonal Matrix B:")
sp.pprint(B)

print("\nDiagonal Matrix D:")
sp.pprint(D)

XH:
[v₁ + v₃  v₂ + v₃ + v₄  v₄ + v₅ + v₆]

Diagonal Matrix B:
⎡2  0  0⎤
⎢       ⎥
⎢0  3  0⎥
⎢       ⎥
⎣0  0  3⎦

Diagonal Matrix D:
⎡1  0  0  0  0  0⎤
⎢                ⎥
⎢0  1  0  0  0  0⎥
⎢                ⎥
⎢0  0  2  0  0  0⎥
⎢                ⎥
⎢0  0  0  2  0  0⎥
⎢                ⎥
⎢0  0  0  0  1  0⎥
⎢                ⎥
⎣0  0  0  0  0  1⎦


In [192]:
# AD^(-1/2): element-wise inverse square root of the adjacency degree matrix.
# Zero entries (the off-diagonal ones) stay 0 instead of dividing by zero.
# sp.Rational / sp.sqrt keep the result exact (e.g. sqrt(2)/2) rather than a
# floating-point approximation.
AD_neg_half = AD.applyfunc(lambda x: sp.Rational(0, 1) if x == 0 else sp.sqrt(sp.Rational(1, x)))
# D^(-1) and B^(-1): element-wise inverses of the node and hyperedge degree
# matrices, again leaving zero entries at 0.
D_neg = D.applyfunc(lambda x: 0 if x == 0 else x**(-1))
B_neg = B.applyfunc(lambda x: 0 if x == 0 else x**(-1))

# The labels below name the object that is actually printed; previously D was
# printed under the heading "B" and AD^(-1/2) under the heading "B^(-1/2)".
print("Diagonal Matrix D:")
sp.pprint(D)

print("\nAD^(-1/2):")
sp.pprint(AD_neg_half)


Diagonal Matrix B:
⎡1  0  0  0  0  0⎤
⎢                ⎥
⎢0  1  0  0  0  0⎥
⎢                ⎥
⎢0  0  2  0  0  0⎥
⎢                ⎥
⎢0  0  0  2  0  0⎥
⎢                ⎥
⎢0  0  0  0  1  0⎥
⎢                ⎥
⎣0  0  0  0  0  1⎦

B^(-1/2):
⎡√2        ⎤
⎢──  0   0 ⎥
⎢2         ⎥
⎢          ⎥
⎢    √3    ⎥
⎢0   ──  0 ⎥
⎢    3     ⎥
⎢          ⎥
⎢        √2⎥
⎢0   0   ──⎥
⎣        2 ⎦


In [198]:
# Plain HGCN, one propagation step, written out symbolically.
# NOTE: the products are kept exactly as written; regrouping them could change
# the (unexpanded) form in which SymPy prints the result — TODO confirm.

# Hyperedge features: E = B^-1 * H^T * X^T, i.e. the mean of the features of
# the nodes contained in each hyperedge (3 x 1).
print("HGCN hyperedge feature E : ")
E = B_neg * H.transpose() * X.transpose()
sp.pprint(E)
# Next node features: D^-1 * H * B^-1 * H^T * X^T, i.e. each node averages the
# features of the hyperedges it belongs to (6 x 1).
print("HGCN next node feature X : ")
next_x = D_neg * H * B_neg * H.transpose() * X.transpose()
sp.pprint(next_x)

HGCN hyperedge feature E : 
⎡  v₁   v₃   ⎤
⎢  ── + ──   ⎥
⎢  2    2    ⎥
⎢            ⎥
⎢v₂   v₃   v₄⎥
⎢── + ── + ──⎥
⎢3    3    3 ⎥
⎢            ⎥
⎢v₄   v₅   v₆⎥
⎢── + ── + ──⎥
⎣3    3    3 ⎦
HGCN next node feature X : 
⎡       v₁   v₃        ⎤
⎢       ── + ──        ⎥
⎢       2    2         ⎥
⎢                      ⎥
⎢     v₂   v₃   v₄     ⎥
⎢     ── + ── + ──     ⎥
⎢     3    3    3      ⎥
⎢                      ⎥
⎢ v₁   v₂   5⋅v₃   v₄  ⎥
⎢ ── + ── + ──── + ──  ⎥
⎢ 4    6     12    6   ⎥
⎢                      ⎥
⎢v₂   v₃   v₄   v₅   v₆⎥
⎢── + ── + ── + ── + ──⎥
⎢6    6    3    6    6 ⎥
⎢                      ⎥
⎢     v₄   v₅   v₆     ⎥
⎢     ── + ── + ──     ⎥
⎢     3    3    3      ⎥
⎢                      ⎥
⎢     v₄   v₅   v₆     ⎥
⎢     ── + ── + ──     ⎥
⎣     3    3    3      ⎦


In [199]:
# Second propagation step: apply D^-1 * H * B^-1 * H^T to `next_x`.
# NOTE(review): `next_x` is assigned both by the plain-HGCN cell and by the
# HGCN_GC cell, so this result depends on which of them ran last.
nn_x = D_neg * H * B_neg * H.transpose() * next_x
sp.pprint(nn_x)

⎡        3⋅v₁   v₂   11⋅v₃   v₄         ⎤
⎢        ──── + ── + ───── + ──         ⎥
⎢         8     12     24    12         ⎥
⎢                                       ⎥
⎢  v₁   2⋅v₂   11⋅v₃   5⋅v₄   v₅   v₆   ⎥
⎢  ── + ──── + ───── + ──── + ── + ──   ⎥
⎢  12    9       36     18    18   18   ⎥
⎢                                       ⎥
⎢11⋅v₁   11⋅v₂   55⋅v₃   13⋅v₄   v₅   v₆⎥
⎢───── + ───── + ───── + ───── + ── + ──⎥
⎢  48      72     144      72    36   36⎥
⎢                                       ⎥
⎢  v₁   5⋅v₂   13⋅v₃   11⋅v₄   v₅   v₆  ⎥
⎢  ── + ──── + ───── + ───── + ── + ──  ⎥
⎢  24    36      72      36    6    6   ⎥
⎢                                       ⎥
⎢      v₂   v₃   v₄   5⋅v₅   5⋅v₆       ⎥
⎢      ── + ── + ── + ──── + ────       ⎥
⎢      18   18   3     18     18        ⎥
⎢                                       ⎥
⎢      v₂   v₃   v₄   5⋅v₅   5⋅v₆       ⎥
⎢      ── + ── + ── + ──── + ────       ⎥
⎣      18   18   3     18     18        ⎦


In [196]:
# HGCN_GC: HGCN with an extra graph convolution over the hyperedges.
# NOTE(review): this cell rebinds `E` and `next_x`, which the plain-HGCN cell
# also assigns; cells that read `next_x` see whichever value was set last.

# Step 1 — hyperedge features: E = B^-1 * H^T * X^T (3 x 1).
print("HGCN_GC hyperedge feature E : ")
E = B_neg * H.transpose() * X.transpose()
sp.pprint(E)
# Step 2 — symmetric-normalised graph convolution over the hyperedges:
# NE = AD^(-1/2) * A * AD^(-1/2) * E. A is 3 x 3 and E is 3 x 1, so NE holds
# one value per hyperedge; the "node" in the label below presumably refers to
# the vertices of graph A (the hyperedges), not the hypergraph nodes.
print("HGCN_GC next node feature X_GC : ")
NE =  AD_neg_half * A * AD_neg_half * E
sp.pprint(NE)
# Step 3 — back to the 6 hypergraph nodes: next_x = D^-1 * H * NE (6 x 1).
print("HGCN_GC next node feature X : ")
next_x = D_neg * H * NE
sp.pprint(next_x)

HGCN hyperedge feature E : 
⎡  v₁   v₃   ⎤
⎢  ── + ──   ⎥
⎢  2    2    ⎥
⎢            ⎥
⎢v₂   v₃   v₄⎥
⎢── + ── + ──⎥
⎢3    3    3 ⎥
⎢            ⎥
⎢v₄   v₅   v₆⎥
⎢── + ── + ──⎥
⎣3    3    3 ⎦
HGCN_GC next node feature X_GC : 
⎡                       ⎛v₂   v₃   v₄⎞          ⎤
⎢                    √6⋅⎜── + ── + ──⎟          ⎥
⎢          v₁   v₃      ⎝3    3    3 ⎠          ⎥
⎢          ── + ── + ─────────────────          ⎥
⎢          4    4            6                  ⎥
⎢                                               ⎥
⎢                  ⎛v₁   v₃⎞      ⎛v₄   v₅   v₆⎞⎥
⎢               √6⋅⎜── + ──⎟   √6⋅⎜── + ── + ──⎟⎥
⎢v₂   v₃   v₄      ⎝2    2 ⎠      ⎝3    3    3 ⎠⎥
⎢── + ── + ── + ──────────── + ─────────────────⎥
⎢9    9    9         6                 6        ⎥
⎢                                               ⎥
⎢                         ⎛v₂   v₃   v₄⎞        ⎥
⎢                      √6⋅⎜── + ── + ──⎟        ⎥
⎢       v₄   v₅   v₆      ⎝3    3    3 ⎠        ⎥
⎢       ── + ── + ── + 

In [None]:
# NOTE(review): despite the `hgcn_gc_` prefix, this is the same expression as
# the plain-HGCN node update (D^-1 * H * B^-1 * H^T * X^T); the adjacency term
# AD_neg_half * A * AD_neg_half used in the HGCN_GC cell does not appear here.
# Confirm whether that is intended.
hgcn_gc_x = D_neg * H * B_neg * H.transpose() * X.transpose()