In [15]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch_geometric.data import Data
from torch_geometric.nn import GCNConv
from torch_geometric.nn import GATConv

## 创建GCN网络

In [16]:
# 假设你已经获得了三种物品的bbox
# 对每种物品的bbox进行编号，分别为0、1、2
# 例如，boxes[0] 存储的是容器的bbox，boxes[1] 存储的是抓取物品的bbox，boxes[2] 存储的是干扰物品的bbox
class GCN(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim):
        super(GCN, self).__init__()
        self.conv1 = GCNConv(input_dim, hidden_dim)
        self.conv2 = GCNConv(hidden_dim, output_dim)

    def forward(self, data):
        x, edge_index = data.x, data.edge_index
        x = self.conv1(x, edge_index)
        x = F.relu(x)
        x = F.dropout(x, training=self.training)
        x = self.conv2(x, edge_index)
        return F.log_softmax(x, dim=1)

## 创建GAT网络

In [17]:
class GAT(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim):
        super(GAT, self).__init__()
        # 使用了 GATConv 作为图神经网络的层。
        # GATConv 是 GAT 中的一个注意力机制层，可以对节点之间的关系进行建模，并通过注意力机制来学习不同节点之间的重要性
        self.conv1 = GATConv(input_dim, hidden_dim, heads=1)
        self.conv2 = GATConv(hidden_dim, output_dim, heads=1)

    def forward(self, data):
        x, edge_index = data.x, data.edge_index
        x = F.relu(self.conv1(x, edge_index))
        x = F.dropout(x, training=self.training)
        x = self.conv2(x, edge_index)
        return F.log_softmax(x, dim=1)

## 模拟数据

In [18]:
boxes = [
    torch.tensor([[0, 0, 1, 1], [1, 1, 2, 2]]),  # 容器的bbox，假设有两个容器
    torch.tensor([[1.5, 1.5, 2.5, 2.5], [2, 2, 3, 3]]),  # 抓取物品的bbox，假设有两个抓取物品
    torch.tensor([[0.5, 0.5, 1.5, 1.5], [2.5, 2.5, 3.5, 3.5]]),  # 干扰物品的bbox，假设有两个干扰物品
]

In [19]:
# 创建节点特征，每个节点的特征是它的bbox
node_features = torch.cat(boxes, dim=0)

# 创建边，这里简单地假设所有节点之间都有边
num_nodes = sum([bbox.size(0) for bbox in boxes])
edge_index = torch.tensor([
    [i, j] for i in range(num_nodes) for j in range(num_nodes) if i != j
], dtype=torch.long).t().contiguous()

# 构建图数据
data = Data(x=node_features, edge_index=edge_index)

# 定义模型
model = GCN(input_dim=node_features.size(-1), hidden_dim=64, output_dim=2)

# 定义模型2
model2 = GAT(input_dim=node_features.size(-1), hidden_dim=64, output_dim=2)


In [20]:
model(data)

tensor([[-0.4909, -0.9470],
        [-0.4909, -0.9470],
        [-0.4909, -0.9470],
        [-0.4909, -0.9470],
        [-0.4909, -0.9470],
        [-0.4909, -0.9470]], grad_fn=<LogSoftmaxBackward0>)

In [21]:
model2(data)

tensor([[-0.7933, -0.6021],
        [-0.7933, -0.6021],
        [-0.7598, -0.6306],
        [-0.7933, -0.6021],
        [-0.7523, -0.6373],
        [-0.7558, -0.6342]], grad_fn=<LogSoftmaxBackward0>)

In [9]:
data

Data(x=[6, 4], edge_index=[2, 30])

In [8]:
data.x

tensor([[0.0000, 0.0000, 1.0000, 1.0000],
        [1.0000, 1.0000, 2.0000, 2.0000],
        [1.5000, 1.5000, 2.5000, 2.5000],
        [2.0000, 2.0000, 3.0000, 3.0000],
        [0.5000, 0.5000, 1.5000, 1.5000],
        [2.5000, 2.5000, 3.5000, 3.5000]])

In [10]:
data.edge_index

tensor([[0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 2, 2, 2, 2, 2, 3, 3, 3, 3, 3, 4, 4, 4, 4,
         4, 5, 5, 5, 5, 5],
        [1, 2, 3, 4, 5, 0, 2, 3, 4, 5, 0, 1, 3, 4, 5, 0, 1, 2, 4, 5, 0, 1, 2, 3,
         5, 0, 1, 2, 3, 4]])

## 图神经网络进行建模的代码

In [142]:
import torch
import numpy as np
import dgl
from dgl import DGLGraph

class SceneGraph():
    def __init__(self, base_tf, theta, dist2goal, obj_bboxes, obj_bboxes_indexes):
        super(SceneGraph, self).__init__()
        self.graph = None
        self.data = None
        self.robot_visible = False
        self._device = 'cpu'
        # 包含不同类型关系的列表，其中每个元素表示一个关系类型
        # 这样做的目的是为了使得图在后续的处理中能够区分不同类型的边，即对应于边的权重
        # 通过使用这些关系类型，代码可以正确地标识和区分从不同实体到机器人的边，从而构建了一个更加复杂的社交图
        # 'o2r'：表示抓取物品旁的其他物品到机器人的关系，obstacle2robot，类似于circle obstacle
        # 'c2r'：容器本身作为障碍物，container2robot，类似于line obstacle
        # 's2r'：容器附近的障碍物，surrounding2robot
        self.rels = ['o2r', 'c2r', 's2r']
        self.mode = 0

        # ToDo:第一步，首先要将坐标转换到世界坐标系下
        rotated_bboxes = self.rotate_state(base_tf, theta, obj_bboxes)

        # ToDo:第二步，再转换为图的数据结构
        self.build_up_graph_on_local_state(base_tf, theta, dist2goal, rotated_bboxes, obj_bboxes_indexes)

    def rotate_state(self, base_tf, theta, obj_bboxes):
        """
        将其他物品的状态转到机器人坐标系下
        Args:
            base_tf:当前robot的坐标系
            theta:当前robot的朝向
            obj_bboxes:obj_bboxes的维度为[n, 5]其中0:4为bbox，4为高度z，5为朝向
            obj_bboxes_indexes:

        Returns:
        Transform the coordinate to agent-centric.
        Input tuple include robot state tensor and human state tensor.
        robot state tensor is of size (number, state_length)(for example 1*9)
        obstacle state tensor is of size (number, state_length)(for example 3*4)
        container state tensor is of size (number, state_length)(for example 5*4)
        surrounding state tensor is of size (number, state_length)(for example 4*4)
        """
        
        """第一步：将物体转移到机器人坐标系下"""
        inv_base_tf = torch.linalg.inv(base_tf)
        
        """第二步：分别获取障碍物坐标的索引值"""
        local_obj_bboxes = torch.zeros_like(obj_bboxes)

        total_num = len(obj_bboxes)

        for obj_num in range(total_num):
            # 获取当前对象轴对齐边界框的最小的 XY 顶点坐标
            min_xy_vertex = torch.hstack(
                (obj_bboxes[obj_num, 0:2], torch.tensor([0.0, 1.0], device=self._device))).T
            # 获取当前对象轴对齐边界框的最大的 XY 顶点坐标
            max_xy_vertex = torch.hstack(
                (obj_bboxes[obj_num, 2:4], torch.tensor([0.0, 1.0], device=self._device))).T
            
            # 通过矩阵乘法将最小顶点坐标转换到机器人参考框架中，并更新为新的最小顶点坐标
            new_min_xy_vertex = torch.matmul(inv_base_tf, min_xy_vertex)[0:2].T.squeeze()
            # 通过矩阵乘法将最大顶点坐标转换到机器人参考框架中，并更新为新的最大顶点坐标
            new_max_xy_vertex = torch.matmul(inv_base_tf, max_xy_vertex)[0:2].T.squeeze()
            
            # 记录结果
            local_obj_bboxes[obj_num, 0:4] = torch.hstack((new_min_xy_vertex, new_max_xy_vertex))
            # 设置高度差
            local_obj_bboxes[obj_num, 5] = self.limit_angle(obj_bboxes[obj_num, 5] - theta)

        return local_obj_bboxes


    def limit_angle(self, angle):
        # 将角度限制在 -π 到 π 之间
        while angle < -np.pi:
            angle += 2 * np.pi
        while angle > np.pi:
            angle -= 2 * np.pi
        return angle


    def build_up_graph_on_local_state(self, base_tf, theta, dist2goal, obj_bboxes, obj_bboxes_indexes):
        """

        Args:
            base_tf: 机器人当前的坐标系
            theta: 机器人当前的朝向
            obj_bboxes: 物品的bboxes
            obj_bboxes_indexes: 物品的索引值

        Returns:

        """

        src_id = torch.Tensor([])
        dst_id = torch.Tensor([])
        # We create a map to store the types of the nodes. We'll use it to compute edges' types
        self.typeMap = dict()
        position_by_id = {}

        # Node Descriptor Table
        # 建立四种类型的节点，r表示robot，h表示human，o表示obstacle，w表示wall
        self.node_descriptor_header = ['r', 'o', 'c', 's']

        # Relations are integers
        node_types_one_hot = ['robot', 'obstacle', 'container', 'surrounding']

        # 机器人矩阵的特征
        robot_metric_features = ['rob_x', 'rob_y', 'rob_ori', 'dis2goal']

        # 桌面上其他障碍物矩阵的特征
        obstacle_metric_features = ['obs_min_x', 'obs_min_y', 'obs_max_x', 'obs_max_y', 'obs_z', 'obs_ori']

        # 障碍物矩阵的特征
        container_metric_features = ['con_min_x', 'con_min_y', 'con_max_x', 'con_max_y', 'con_z', 'con_ori']

        # 墙矩阵的特征
        surrounding_features = ['surr_min_x', 'surr_min_y', 'surr_max_x', 'surr_max_y', 'surr_z', 'surr_ori']

        # 所有的特征为特征之和
        all_features = node_types_one_hot + robot_metric_features + obstacle_metric_features + container_metric_features + surrounding_features

        # Copy input data
        # 复制输入数据
        self.obj_bboxes = obj_bboxes

        # 输入数据分别表示的含义
        robot_state = np.hstack([base_tf[:3, 3], theta, dist2goal])
        obstacle_state = self.obj_bboxes[obj_bboxes_indexes[0], :]
        container_state = self.obj_bboxes[obj_bboxes_indexes[1], :]
        surrounding_state = self.obj_bboxes[obj_bboxes_indexes[2], :]

        # 特征的维度
        feature_dimensions = len(all_features)

        # 统计各个物品的数目
        # 机器人的数目
        robot_num = 1

        # 障碍物的数目
        if obstacle_state is not None:
            obstacle_num = obstacle_state.shape[0]
        else:
            obstacle_num = 0

        # 容器的数目，这是一个广义的容器，因为对于桌子只有一个长方体，对于沙发有三个长方体，对于冰箱有四个长方体
        if container_state is not None:
            container_num = container_state.shape[0]
        else:
            container_num = 0

        # 容器周围障碍物的数目
        if surrounding_state is not None:
            surrounding_num = surrounding_state.shape[0]
        else:
            surrounding_num = 0

        # 统计总的节点的数目
        total_node_num = robot_num + obstacle_num + container_num + surrounding_num

        # robot的tensor
        robot_tensor = torch.zeros((robot_num, feature_dimensions))

        # 记录机器人的特征
        robot_tensor[0, all_features.index('robot')] = 1
        # 记录机器人的state特征
        robot_tensor[0, all_features.index('rob_x'):all_features.index("rob_ori") + 1] = robot_state[0]
        # 将features记录为robot_tensor
        features = robot_tensor

        # 记录人类特征
        if obstacle_num > 0:
            obstacle_tensor = torch.zeros((obstacle_num, feature_dimensions))
            for i in range(obstacle_num):
                obstacle_tensor[i, all_features.index('obstacle')] = 1
                obstacle_tensor[i, all_features.index('obs_min_x'):all_features.index("obs_ori") + 1] = \
                obstacle_state[i]

            # self.graph.nodes['human'].data['h'] = human_tensor
            features = torch.cat([features, obstacle_tensor], dim=0)

        # 记录障碍物特征
        if container_num > 0:
            container_tensor = torch.zeros((container_num, feature_dimensions))
            for i in range(container_num):
                container_tensor[i, all_features.index('container')] = 1
                container_tensor[i, all_features.index('con_min_x'):all_features.index("con_ori") + 1] = \
                    container_state[i]
            # self.graph.nodes['obstacle'].data['h'] = obstacle_tensor
            features = torch.cat([features, container_tensor], dim=0)

        # 记录墙特征
        if surrounding_num > 0:
            surrounding_tensor = torch.zeros((surrounding_num, feature_dimensions))
            for i in range(surrounding_num):
                surrounding_tensor = torch.zeros((surrounding_num, feature_dimensions))
                surrounding_tensor[i, all_features.index('surrounding')] = 1
                surrounding_tensor[i, all_features.index('surr_min_x'):all_features.index("surr_ori") + 1] = \
                    surrounding_state[i]
            features = torch.cat([features, surrounding_tensor], dim=0)
        # self.graph.nodes['wall'].data['h'] = wall_tensor
        # features = torch.cat([robot_tensor, human_tensor, obstacle_tensor, wall_tensor], dim=0)

        ### build up edges for the social graph
        # add obstacle_to_robot edges
        # 创建了一些空的张量来存储边的信息
        src_id = torch.Tensor([])  # 源节点 ID
        dst_id = torch.Tensor([])  # 目标节点 ID (dst_id)
        edge_types = torch.Tensor([])  # 边的类型 (edge_types)
        edge_norm = torch.Tensor([])  # 边的归一化值 (edge_norm)
        # add human_to_robot edges

        # 如果存在障碍物 (obstacle_num > 0)，则创建从障碍物到机器人的边
        if obstacle_num > 0:
            # 生成障碍物的源节点 ID (src_obstacle_id)
            # 这部分相当于是对起始点索引的编号
            src_obstacle_id = torch.tensor(range(obstacle_num)) + robot_num
            # 将目标节点 ID (o2r_robot_id) 设置为零向量，表示所有这些边都指向机器人
            # 终止点索引
            o2r_robot_id = torch.zeros_like(src_obstacle_id)
            # 这行代码为边的类型创建了一个张量，这部分是乘以边的权重
            o2r_edge_types = torch.ones_like(o2r_robot_id) * torch.LongTensor([self.rels.index('o2r')])
            # 为边的归一化值创建了一个张量，将其设置为所有边的归一化值都为 1.0，无权图
            o2r_edge_norm = torch.ones_like(o2r_robot_id) * (1.0)
            src_id = src_obstacle_id
            dst_id = o2r_robot_id
            edge_types = o2r_edge_types
            edge_norm = o2r_edge_norm

        # 如果存在人物 (human_num > 0)，则创建从人物到机器人的边
        if container_num > 0:
            # 这行代码生成人物的源节点 ID，并将其与机器人的数量相加，以确保人物的 ID 与机器人的 ID 不重叠
            src_human_id = torch.tensor(range(container_num)) + robot_num + obstacle_num
            # 创建了目标节点ID，将其设置为与人物相同长度的零向量。这意味着所有的边都指向机器人
            h2r_robot_id = torch.zeros_like(src_human_id)
            # 为边的类型创建了一个张量，与之前类似，找到了关系列表中“人物到机器人”的索引，并创建了一个与目标节点 ID 相同长度的张量，并将其填充为相应的边类型
            h2r_edge_types = torch.ones_like(h2r_robot_id) * torch.LongTensor([self.rels.index('c2r')])
            # 为边的归一化值创建了一个张量，将其设置为所有边的归一化值都为 1.0，无权图
            h2r_edge_norm = torch.ones_like(h2r_robot_id) * (1.0)
            # 记录所有的边节点
            src_id = torch.cat([src_id, src_human_id], dim=0)
            dst_id = torch.cat([dst_id, h2r_robot_id], dim=0)
            edge_types = torch.cat([edge_types, h2r_edge_types], dim=0)
            edge_norm = torch.cat([edge_norm, h2r_edge_norm], dim=0)

        # add wall_to_robot edges
        # 如果存在墙壁，则创建从墙壁到机器人的边
        if surrounding_num > 0:
            # 这一行创建了墙壁节点的源节点 ID，相加，以确保源节点 ID 的唯一性
            src_wall_id = torch.tensor(range(surrounding_num)) + robot_num + obstacle_num + container_num
            # 创建了一个与墙壁数量相同的零向量，表示所有这些边都指向机器人
            w2r_robot_id = torch.zeros_like(src_wall_id)
            # 创建了墙壁的边权
            w2r_edge_types = torch.ones_like(w2r_robot_id) * torch.LongTensor([self.rels.index('s2r')])
            # 创建了归一化后的边权
            w2r_edge_norm = torch.ones_like(w2r_robot_id) * (1.0)

            src_id = torch.cat([src_id, src_wall_id], dim=0)
            dst_id = torch.cat([dst_id, w2r_robot_id], dim=0)
            edge_types = torch.cat([edge_types, w2r_edge_types], dim=0)
            edge_norm = torch.cat([edge_norm, w2r_edge_norm], dim=0)

        edge_norm = edge_norm.unsqueeze(dim=1)
        edge_norm = edge_norm.float()
        edge_types = edge_types.float()

        # 通过dgl库创建图
        self.graph = dgl.graph((src_id, dst_id), num_nodes=total_node_num, idtype=torch.int64)
        self.graph.ndata['h'] = features
        self.graph.edata.update({'rel_type': edge_types, 'norm': edge_norm})




## 进行数据的填充

In [143]:
# base_tf, theta, dist2goal, obj_bboxes, obj_bboxes_indexes

In [144]:
base_xy = [1.0, 2.0]
base_ori = 0.5

base_tf = torch.tensor([[np.cos(base_ori), -np.sin(base_ori), 0, base_xy[0]],
                   [np.sin(base_ori),  np.cos(base_ori),  0, base_xy[1]],
                   [               0,                 0,  1,        1.2],
                   [               0,                 0,  0,          1]], device='cpu', dtype=torch.double)

dist2goal = 1.5

In [145]:
obj_bboxes = torch.rand(6, 6, dtype=torch.double)

obj_bboxes_indexes = [[0, 1, 2], [3, 4], [5]]

## 运行代码

In [146]:
state_graph = SceneGraph(base_tf, base_ori, dist2goal, obj_bboxes, obj_bboxes_indexes).graph

In [147]:
state_graph

Graph(num_nodes=7, num_edges=6,
      ndata_schemes={'h': Scheme(shape=(26,), dtype=torch.float32)}
      edata_schemes={'rel_type': Scheme(shape=(), dtype=torch.float32), 'norm': Scheme(shape=(1,), dtype=torch.float32)})

## 进行编码

In [149]:
from learned_robot_placement.models.gnn_models import *

ModuleNotFoundError: No module named 'learned_robot_placement'