In [None]:
!pip install shapely -U
!pip install lyft-dataset-sdk

In [None]:
# 定义用于存储生成的数据、可视化结果和模型检查点的文件夹
ARTIFACTS_FOLDER = "./artifacts"

In [None]:
from datetime import datetime
from functools import partial
import glob
from multiprocessing import Pool

import os
os.environ["OMP_NUM_THREADS"] = "1"

import matplotlib.pyplot as plt
%matplotlib inline

import pandas as pd
import cv2
from PIL import Image
import numpy as np
from tqdm import tqdm, tqdm_notebook
import scipy
import scipy.ndimage
import scipy.special
from scipy.spatial.transform import Rotation as R

from lyft_dataset_sdk.lyftdataset import LyftDataset
from lyft_dataset_sdk.utils.data_classes import LidarPointCloud, Box, Quaternion
from lyft_dataset_sdk.utils.geometry_utils import view_points, transform_matrix

In [None]:
# 创建符号链接，访问对应数据目录
!ln -s /kaggle/input/3d-object-detection-for-autonomous-vehicles/train_images images
!ln -s /kaggle/input/3d-object-detection-for-autonomous-vehicles/train_maps maps
!ln -s /kaggle/input/3d-object-detection-for-autonomous-vehicles/train_lidar lidar

In [None]:
# 加载数据集，创建输出目录
level5data = LyftDataset(data_path='.', json_path='/kaggle/input/3d-object-detection-for-autonomous-vehicles/train_data', verbose=True)
os.makedirs(ARTIFACTS_FOLDER, exist_ok=True)

In [None]:
# 数据集中可能出现的物体类别
classes = ["car", "motorcycle", "bus", "bicycle", "truck", "pedestrian", "other_vehicle", "animal", "emergency_vehicle"]

In [None]:
# 获取场景信息，提取每个场景的关键信息（如场景名称、时间戳、token 等），并将其存储到一个列表中
records = [(level5data.get('sample', record['first_sample_token'])['timestamp'], record) for record in level5data.scene]

entries = []

for start_time, record in sorted(records):
    start_time = level5data.get('sample', record['first_sample_token'])['timestamp'] / 1000000

    token = record['token']
    name = record['name']
    date = datetime.utcfromtimestamp(start_time)
    host = "-".join(record['name'].split("-")[:2])
    first_sample_token = record["first_sample_token"]

    entries.append((host, name, date, token, first_sample_token))
            
df = pd.DataFrame(entries, columns=["host", "scene_name", "date", "scene_token", "first_sample_token"])

In [None]:
# 按 host不同车辆对数据进行分组
host_count_df = df.groupby("host")['scene_token'].count()
print(host_count_df)

In [None]:
# 拆分为训练集和验证集，
validation_hosts = ["host-a007", "host-a008", "host-a009"]#定义了验证集的车辆host

validation_df = df[df["host"].isin(validation_hosts)]
vi = validation_df.index
train_df = df[~df.index.isin(vi)]

print(len(train_df), len(validation_df), "train/validation split scene counts")

In [None]:
# 获取训练集中的第一个样本的 token
sample_token = train_df.first_sample_token.values[0]

# 使用 sample_token 从数据集中获取对应的样本数据
sample = level5data.get("sample", sample_token)

# 获取当前样本中 LIDAR_TOP 数据的 token
sample_lidar_token = sample["data"]["LIDAR_TOP"]

# 根据 lidar 数据的 token 获取对应的 LiDAR 数据
lidar_data = level5data.get("sample_data", sample_lidar_token)

# 获取 LiDAR 数据的文件路径
lidar_filepath = level5data.get_sample_data_path(sample_lidar_token)

# 获取当前样本对应的 ego pose（车辆自身坐标系到世界坐标系的变换信息）
ego_pose = level5data.get("ego_pose", lidar_data["ego_pose_token"])

# 获取当前样本对应的校准传感器信息（从传感器坐标系到车辆坐标系的变换信息）
calibrated_sensor = level5data.get("calibrated_sensor", lidar_data["calibrated_sensor_token"])

# 计算从车辆坐标系到世界坐标系的变换矩阵（即车辆坐标系到世界坐标系的转换）
global_from_car = transform_matrix(ego_pose['translation'],
                                   Quaternion(ego_pose['rotation']), inverse=False)

# 计算从传感器坐标系到车辆坐标系的变换矩阵（即传感器坐标系到车辆坐标系的转换）
car_from_sensor = transform_matrix(calibrated_sensor['translation'], Quaternion(calibrated_sensor['rotation']),inverse=False)


In [None]:
# 从文件中加载 LiDAR 点云数据
lidar_pointcloud = LidarPointCloud.from_file(lidar_filepath)

# 点云数据是以传感器坐标系为参考系定义的。
# 现在我们希望将其转换到车辆坐标系，因此我们对每个点进行坐标转换
lidar_pointcloud.transform(car_from_sensor)

# 做一个合理性检查，确保点云数据应该在车辆坐标系中接近于 0（即车辆中心）
plt.hist(lidar_pointcloud.points[0], alpha=0.5, bins=30, label="X")  # 绘制 X 轴方向的点云分布
plt.hist(lidar_pointcloud.points[1], alpha=0.5, bins=30, label="Y")  # 绘制 Y 轴方向的点云分布
plt.legend()  # 显示图例
plt.xlabel("Distance from car along axis")  # X 轴标签：距离车辆沿轴方向的距离
plt.ylabel("Amount of points")  # Y 轴标签：点的数量
plt.show()  # 显示图像


In [None]:
# 点云-->体素
def create_transformation_matrix_to_voxel_space(shape, voxel_size, offset):
    """
    构建一个变换矩阵，将点云数据从世界坐标系转换到体素空间坐标系，使得体素空间的 (0,0,0) 位于中心。
    voxel_size 定义了每个体素在世界坐标系中的大小，(1,1,1) 就像 Minecraft 中的体素。
    
    可以提供每个轴的偏移量（以世界坐标为单位），这在 Z 轴（上下）方向尤其对于 lidar 点云有用。
    """
    
    shape, voxel_size, offset = np.array(shape), np.array(voxel_size), np.array(offset)
    
    # 创建一个 4x4 的单位矩阵，用于变换
    tm = np.eye(4, dtype=np.float32)
    
    # 计算平移量，使得体素空间的原点位于体素网格的中心
    translation = shape / 2 + offset / voxel_size
    
    # 设置缩放（将每个体素的大小按 voxel_size 进行缩放）
    tm = tm * np.array(np.hstack((1 / voxel_size, [1])))
    
    # 设置平移部分，使得体素空间的原点位于网格的中心
    tm[:3, 3] = np.transpose(translation)
    
    return tm

def transform_points(points, transf_matrix):
    """
    使用变换矩阵将 (3,N) 或 (4,N) 形状的点云数据进行变换。
    """
    if points.shape[0] not in [3, 4]:
        raise Exception("输入的点云数据应该是 (3,N) 或 (4,N) 的形状，当前形状是 {}".format(points.shape))
    
    # 使用变换矩阵对点云数据进行变换
    return transf_matrix.dot(np.vstack((points[:3, :], np.ones(points.shape[1]))))[:3, :]

# 使用一些示例值来测试函数
# 创建一个变换矩阵，将点云转换到体素空间
tm = create_transformation_matrix_to_voxel_space(shape=(100, 100, 4), voxel_size=(0.5, 0.5, 0.5), offset=(0, 0, 0.5))

# 创建一个示例点云，格式为 (3, N)，其中 N 为点的数量
p = transform_points(np.array([[10, 10, 0, 0, 0], [10, 5, 0, 0, 0], [0, 0, 0, 2, 0]], dtype=np.float32), tm)

# 打印变换后的点云数据
print(p)


In [None]:
# 将 LiDAR 点云数据转换为体素网格，并进行可视化处理的过程。最终生成的是一个 BEV 图像（鸟瞰图）
def car_to_voxel_coords(points, shape, voxel_size, z_offset=0):
    # 检查输入的体素网格形状是否为三维
    if len(shape) != 3:
        raise Exception("Voxel volume shape should be 3 dimensions (x,y,z)")
        
    # 检查点云数据的形状是否为 (3, N) 或 (4, N)，如果不符合则抛出异常
    if len(points.shape) != 2 or points.shape[0] not in [3, 4]:
        raise Exception("Input points should be (3,N) or (4,N) in shape, found {}".format(points.shape))

    # 创建从世界坐标系到体素坐标系的变换矩阵
    tm = create_transformation_matrix_to_voxel_space(shape, voxel_size, (0, 0, z_offset))
    
    # 使用变换矩阵转换点云坐标
    p = transform_points(points, tm)
    return p


def create_voxel_pointcloud(points, shape, voxel_size=(0.5, 0.5, 1), z_offset=0):
    # 将点云坐标转换到体素坐标系
    points_voxel_coords = car_to_voxel_coords(points.copy(), shape, voxel_size, z_offset)
    
    # 转换后的点云坐标，只取前三个维度
    points_voxel_coords = points_voxel_coords[:3].transpose(1, 0)
    
    # 将坐标转换为整数类型
    points_voxel_coords = np.int0(points_voxel_coords)
    
    # 创建一个空的体素网格，尺寸为给定的 shape
    bev = np.zeros(shape, dtype=np.float32)
    bev_shape = np.array(shape)

    # 检查点云坐标是否在体素网格的有效范围内
    within_bounds = (np.all(points_voxel_coords >= 0, axis=1) * np.all(points_voxel_coords < bev_shape, axis=1))
    
    # 保留有效的坐标
    points_voxel_coords = points_voxel_coords[within_bounds]
    
    # 找到每个体素的位置和该位置的点云数量（即强度）
    coord, count = np.unique(points_voxel_coords, axis=0, return_counts=True)
        
    # 注意：X 和 Y 被交换了，这里填充体素网格
    bev[coord[:, 1], coord[:, 0], coord[:, 2]] = count
    
    return bev


def normalize_voxel_intensities(bev, max_intensity=16):
    # 将体素网格中的强度值归一化到 0 到 1 之间
    return (bev / max_intensity).clip(0, 1)


# 设置体素网格的尺寸和 Z 轴偏移量
voxel_size = (0.4, 0.4, 1.5)
z_offset = -2.0
bev_shape = (336, 336, 3)

# 创建体素点云（BEV 图像）
bev = create_voxel_pointcloud(lidar_pointcloud.points, bev_shape, voxel_size=voxel_size, z_offset=z_offset)

# 归一化体素强度值，使其范围为 [0, 1]
bev = normalize_voxel_intensities(bev)


In [15]:
# 展示鸟瞰图
plt.figure(figsize=(16,8))
plt.imshow(bev)
plt.show()

NameError: name 'plt' is not defined

In [None]:
# 将 LiDAR 数据中的物体框从世界坐标系转换到车辆坐标系，并将它们绘制在之前生成的鸟瞰图
# 获取物体框（bounding boxes）数据
boxes = level5data.get_boxes(sample_lidar_token)

# 初始化目标图像，大小与之前生成的鸟瞰图（BEV）一致
target_im = np.zeros(bev.shape[:3], dtype=np.uint8)

def move_boxes_to_car_space(boxes, ego_pose):
    """
    将物体框从世界坐标系转换到车辆坐标系。
    注意：此函数会直接修改输入的物体框。
    """
    translation = -np.array(ego_pose['translation'])  # 获取车辆位姿的平移部分
    rotation = Quaternion(ego_pose['rotation']).inverse  # 获取车辆位姿的旋转部分，并取其逆

    # 对每个物体框进行转换
    for box in boxes:
        box.translate(translation)  # 进行平移变换
        box.rotate(rotation)  # 进行旋转变换

def scale_boxes(boxes, factor):
    """
    缩放物体框的尺寸。
    注意：此函数会直接修改输入的物体框。
    """
    for box in boxes:
        box.wlh = box.wlh * factor  # 按照给定的比例因子缩放物体框的宽度、长度和高度

def draw_boxes(im, voxel_size, boxes, classes, z_offset=0.0):
    """
    将物体框绘制到图像上。
    im: 目标图像，voxel_size: 体素大小，boxes: 物体框，classes: 类别，z_offset: Z轴偏移
    """
    for box in boxes:
        # 只关心物体框的底部四个角
        corners = box.bottom_corners()  # 获取底部四个角的坐标
        corners_voxel = car_to_voxel_coords(corners, im.shape, voxel_size, z_offset).transpose(1,0)  # 转换到体素坐标系
        corners_voxel = corners_voxel[:,:2]  # 只保留 X 和 Y 坐标，丢弃 Z 坐标

        # 根据类别为物体框分配颜色
        class_color = classes.index(box.name) + 1  # 获取物体框的类别，并为其分配颜色
        
        # 如果类别未知，抛出异常
        if class_color == 0:
            raise Exception("Unknown class: {}".format(box.name))

        # 绘制物体框的轮廓，填充该区域
        cv2.drawContours(im, np.int0([corners_voxel]), 0, (class_color, class_color, class_color), -1)

# 将物体框从世界坐标系转换到车辆坐标系
move_boxes_to_car_space(boxes, ego_pose)

# 缩放物体框的尺寸
scale_boxes(boxes, 0.8)

# 将转换后的物体框绘制到目标图像上
draw_boxes(target_im, voxel_size, boxes, classes, z_offset=z_offset)

# 显示绘制后的图像，物体框用不同颜色表示不同类别
plt.figure(figsize=(8,8))
plt.imshow((target_im > 0).astype(np.float32), cmap='Set2')
plt.show()


In [None]:
# 可视化点云
def visualize_lidar_of_sample(sample_token, axes_limit=80):
    sample = level5data.get("sample", sample_token)
    sample_lidar_token = sample["data"]["LIDAR_TOP"]
    level5data.render_sample_data(sample_lidar_token, axes_limit=axes_limit)
    
visualize_lidar_of_sample(sample_token)

In [None]:
# 删除之前的变量，以便释放内存
del bev, lidar_pointcloud, boxes

# 定义一些超参数，后续系统需要使用
voxel_size = (0.4, 0.4, 1.5)  # 每个体素（Voxel）在三个维度上的大小，单位为米 (m)
z_offset = -2.0  # LiDAR 数据在 Z 轴上的偏移量，通常用于调整 LiDAR 数据的垂直坐标
bev_shape = (336, 336, 3)  # BEV（鸟瞰图）的尺寸，336x336 的图像，3个通道表示 RGB

# 设置一个缩放因子，用于调整物体框大小，使其在鸟瞰图中更加分离
box_scale = 0.8

# 设置训练和验证数据的存储路径
train_data_folder = os.path.join(ARTIFACTS_FOLDER, "bev_train_data")  # 训练数据文件夹路径
validation_data_folder = os.path.join(ARTIFACTS_FOLDER, "./bev_validation_data")  # 验证数据文件夹路径


In [None]:
# 定义一个常量，表示使用的工作进程数，通常是系统 CPU 核心数的三倍
NUM_WORKERS = os.cpu_count() * 3

def prepare_training_data_for_scene(first_sample_token, output_folder, bev_shape, voxel_size, z_offset, box_scale):
    """
    给定场景的第一个样本 token，输出鸟瞰图视角下的栅格化输入体素数据和目标框数据。
    """
    sample_token = first_sample_token
    
    while sample_token:
        # 获取当前样本的详细信息
        sample = level5data.get("sample", sample_token)

        # 获取与该样本相关的 LiDAR 数据 token
        sample_lidar_token = sample["data"]["LIDAR_TOP"]
        lidar_data = level5data.get("sample_data", sample_lidar_token)
        lidar_filepath = level5data.get_sample_data_path(sample_lidar_token)

        # 获取车辆位置（ego_pose）和校准传感器信息（calibrated_sensor）
        ego_pose = level5data.get("ego_pose", lidar_data["ego_pose_token"])
        calibrated_sensor = level5data.get("calibrated_sensor", lidar_data["calibrated_sensor_token"])

        # 计算从世界坐标到车辆坐标的转换矩阵
        global_from_car = transform_matrix(ego_pose['translation'],
                                           Quaternion(ego_pose['rotation']), inverse=False)

        # 计算从传感器坐标到车辆坐标的转换矩阵
        car_from_sensor = transform_matrix(calibrated_sensor['translation'], Quaternion(calibrated_sensor['rotation']),
                                            inverse=False)

        try:
            # 从文件中加载 LiDAR 点云数据，并将其从传感器坐标系转换到车辆坐标系
            lidar_pointcloud = LidarPointCloud.from_file(lidar_filepath)
            lidar_pointcloud.transform(car_from_sensor)
        except Exception as e:
            print("无法加载 LiDAR 点云数据 {}: {}".format(sample_token, e))
            sample_token = sample["next"]  # 跳过当前样本，继续处理下一个样本
            continue
        
        # 使用 LiDAR 点云生成鸟瞰图体素数据
        bev = create_voxel_pointcloud(lidar_pointcloud.points, bev_shape, voxel_size=voxel_size, z_offset=z_offset)
        # 对鸟瞰图体素强度进行归一化
        bev = normalize_voxel_intensities(bev)

        # 获取当前样本的目标框
        boxes = level5data.get_boxes(sample_lidar_token)

        # 创建一个与 BEV 相同形状的空白目标图像
        target = np.zeros_like(bev)

        # 将目标框从世界坐标系转换到车辆坐标系
        move_boxes_to_car_space(boxes, ego_pose)
        # 缩放目标框
        scale_boxes(boxes, box_scale)
        # 在目标图像上绘制目标框
        draw_boxes(target, voxel_size, boxes=boxes, classes=classes, z_offset=z_offset)

        # 将 BEV 图像的值放大到 255，并转换为 8 位无符号整型
        bev_im = np.round(bev*255).astype(np.uint8)
        # 选择目标图像的一个通道（通常是第一通道）
        target_im = target[:,:,0]

        # 保存输入的鸟瞰图和目标图像为 PNG 文件
        cv2.imwrite(os.path.join(output_folder, "{}_input.png".format(sample_token)), bev_im)
        cv2.imwrite(os.path.join(output_folder, "{}_target.png".format(sample_token)), target_im)
        
        # 获取下一个样本 token，继续处理下一个样本
        sample_token = sample["next"]

# 处理训练集和验证集的每个样本
for df, data_folder in [(train_df, train_data_folder), (validation_df, validation_data_folder)]:
    print("使用 {} 个工作进程准备数据并保存到 {}".format(NUM_WORKERS, data_folder))
    # 获取第一个样本 token
    first_samples = df.first_sample_token.values

    # 确保输出文件夹存在
    os.makedirs(data_folder, exist_ok=True)
    
    # 准备处理函数，使用部分函数（partial）将其它参数固定
    process_func = partial(prepare_training_data_for_scene,
                           output_folder=data_folder, bev_shape=bev_shape, voxel_size=voxel_size, z_offset=z_offset, box_scale=box_scale)

    # 使用多进程池并行化处理任务
    pool = Pool(NUM_WORKERS)
    # 使用 tqdm_notebook 显示处理进度
    for _ in tqdm_notebook(pool.imap_unordered(process_func, first_samples), total=len(first_samples)):
        pass
    pool.close()  # 关闭进程池
    del pool  # 删除进程池对象，释放资源


In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.utils.data
import cv2
import numpy as np
import glob
import os
import matplotlib.pyplot as plt

# 定义BEV图像数据集类
class BEVImageDataset(torch.utils.data.Dataset):
    def __init__(self, input_filepaths, target_filepaths, map_filepaths=None):
        """
        初始化BEV图像数据集

        :param input_filepaths: 输入图像文件路径列表（BEV图像）
        :param target_filepaths: 目标图像文件路径列表（目标框图像）
        :param map_filepaths: 可选的地图文件路径列表
        """
        self.input_filepaths = input_filepaths  # 存储输入图像路径
        self.target_filepaths = target_filepaths  # 存储目标图像路径
        self.map_filepaths = map_filepaths  # 存储地图文件路径（如果有）

        # 如果提供了地图文件路径，确保输入和地图文件路径数量一致
        if map_filepaths is not None:
            assert len(input_filepaths) == len(map_filepaths)

        # 确保输入图像路径和目标图像路径数量一致
        assert len(input_filepaths) == len(target_filepaths)

    def __len__(self):
        """
        返回数据集的大小
        """
        return len(self.input_filepaths)

    def __getitem__(self, idx):
        """
        根据索引返回图像和目标图像

        :param idx: 索引
        :return: 输入图像、目标图像和样本的token（用于标识样本）
        """
        input_filepath = self.input_filepaths[idx]  # 获取输入图像路径
        target_filepath = self.target_filepaths[idx]  # 获取目标图像路径
        
        # 从路径中提取样本token（通过去除"_input.png"得到样本ID）
        sample_token = input_filepath.split("/")[-1].replace("_input.png", "")

        # 读取输入图像
        im = cv2.imread(input_filepath, cv2.IMREAD_UNCHANGED)
        
        # 如果有地图文件路径，将地图图像与输入图像拼接
        if self.map_filepaths:
            map_filepath = self.map_filepaths[idx]
            map_im = cv2.imread(map_filepath, cv2.IMREAD_UNCHANGED)
            im = np.concatenate((im, map_im), axis=2)  # 在通道维度拼接输入图像和地图图像
        
        # 读取目标图像
        target = cv2.imread(target_filepath, cv2.IMREAD_UNCHANGED)

        # 将输入图像转为浮动型，并将值归一化到[0, 1]
        im = im.astype(np.float32) / 255
        
        # 将目标图像转换为整数类型（标签）
        target = target.astype(np.int64)
        
        # 将图像和目标从NumPy数组转换为PyTorch张量，并调整维度
        im = torch.from_numpy(im.transpose(2, 0, 1))  # 转换维度为[C, H, W]
        target = torch.from_numpy(target)

        return im, target, sample_token  # 返回图像、目标和样本的token

# 获取训练数据文件路径
input_filepaths = sorted(glob.glob(os.path.join(train_data_folder, "*_input.png")))
target_filepaths = sorted(glob.glob(os.path.join(train_data_folder, "*_target.png")))

# 创建BEV图像数据集实例
train_dataset = BEVImageDataset(input_filepaths, target_filepaths)

# 获取数据集中的第二个样本（索引为1）
im, target, sample_token = train_dataset[1]

# 将图像和目标转为NumPy数组，以便进行处理
im = im.numpy()
target = target.numpy()

# 创建图像窗口并显示图像
plt.figure(figsize=(16, 8))

# 将目标图像复制为RGB格式，方便显示
target_as_rgb = np.repeat(target[..., None], 3, 2)

# 将输入图像的维度从[C, H, W]调整为[H, W, C]，符合Matplotlib的要求
# 使用np.hstack将BEV图像和目标图像拼接在一起显示
plt.imshow(np.hstack((im.transpose(1, 2, 0)[..., :3], target_as_rgb)))
plt.title(sample_token)  # 设置图像标题为样本token
plt.show()  # 显示图像

# 使用样本token可视化对应的LiDAR数据
visualize_lidar_of_sample(sample_token)


In [None]:
# U-Net网络
import torch
import torch.nn as nn
import torch.nn.functional as F

# 定义U-Net模型
class UNet(nn.Module):
    def __init__(
        self,
        in_channels=1,        # 输入通道数，默认为1
        n_classes=2,          # 输出通道数，默认为2（比如二分类）
        depth=5,              # 网络的深度（卷积层的数量）
        wf=6,                 # 第一个卷积层的滤波器数量是 2**wf
        padding=False,        # 是否进行零填充
        batch_norm=False,     # 是否在卷积层后使用批标准化
        up_mode='upconv',     # 上采样方式，'upconv' 或 'upsample'
    ):
        """
        U-Net: Convolutional Networks for Biomedical Image Segmentation
        使用默认参数时，得到的是原论文中使用的U-Net版本
        """
        super(UNet, self).__init__()

        # 检查up_mode是否有效
        assert up_mode in ('upconv', 'upsample')
        self.padding = padding
        self.depth = depth
        prev_channels = in_channels

        # 下采样路径
        self.down_path = nn.ModuleList()
        for i in range(depth):
            self.down_path.append(
                UNetConvBlock(prev_channels, 2 ** (wf + i), padding, batch_norm)
            )
            prev_channels = 2 ** (wf + i)

        # 上采样路径
        self.up_path = nn.ModuleList()
        for i in reversed(range(depth - 1)):
            self.up_path.append(
                UNetUpBlock(prev_channels, 2 ** (wf + i), up_mode, padding, batch_norm)
            )
            prev_channels = 2 ** (wf + i)

        # 最后的1x1卷积层，输出预测结果
        self.last = nn.Conv2d(prev_channels, n_classes, kernel_size=1)

    def forward(self, x):
        blocks = []
        # 下采样过程
        for i, down in enumerate(self.down_path):
            x = down(x)
            if i != len(self.down_path) - 1:
                blocks.append(x)
                x = F.max_pool2d(x, 2)  # 池化操作，下采样

        # 上采样过程
        for i, up in enumerate(self.up_path):
            x = up(x, blocks[-i - 1])  # 跳跃连接，将下采样特征与上采样特征拼接

        return self.last(x)  # 返回最终预测结果


# 定义卷积块，包含两个卷积层
class UNetConvBlock(nn.Module):
    def __init__(self, in_size, out_size, padding, batch_norm):
        super(UNetConvBlock, self).__init__()
        block = []

        # 第一个卷积层
        block.append(nn.Conv2d(in_size, out_size, kernel_size=3, padding=int(padding)))
        block.append(nn.ReLU())
        if batch_norm:
            block.append(nn.BatchNorm2d(out_size))

        # 第二个卷积层
        block.append(nn.Conv2d(out_size, out_size, kernel_size=3, padding=int(padding)))
        block.append(nn.ReLU())
        if batch_norm:
            block.append(nn.BatchNorm2d(out_size))

        # 将卷积块连接起来
        self.block = nn.Sequential(*block)

    def forward(self, x):
        return self.block(x)  # 执行卷积块的前向传播


# 定义上采样块
class UNetUpBlock(nn.Module):
    def __init__(self, in_size, out_size, up_mode, padding, batch_norm):
        super(UNetUpBlock, self).__init__()

        # 根据上采样模式选择不同的上采样方法
        if up_mode == 'upconv':
            self.up = nn.ConvTranspose2d(in_size, out_size, kernel_size=2, stride=2)
        elif up_mode == 'upsample':
            self.up = nn.Sequential(
                nn.Upsample(mode='bilinear', scale_factor=2),  # 双线性插值上采样
                nn.Conv2d(in_size, out_size, kernel_size=1),  # 1x1卷积
            )

        # 使用卷积块
        self.conv_block = UNetConvBlock(in_size, out_size, padding, batch_norm)

    def center_crop(self, layer, target_size):
        """
        中心裁剪，用于对齐上采样和下采样路径的特征图
        """
        _, _, layer_height, layer_width = layer.size()
        diff_y = (layer_height - target_size[0]) // 2
        diff_x = (layer_width - target_size[1]) // 2
        return layer[
            :, :, diff_y : (diff_y + target_size[0]), diff_x : (diff_x + target_size[1])
        ]

    def forward(self, x, bridge):
        # 上采样
        up = self.up(x)
        # 中心裁剪
        crop1 = self.center_crop(bridge, up.shape[2:])
        # 拼接上采样结果与下采样结果
        out = torch.cat([up, crop1], 1)
        # 通过卷积块
        out = self.conv_block(out)

        return out


In [None]:
def get_unet_model(in_channels=3, num_output_classes=2):
    # 创建一个 U-Net 模型
    model = UNet(in_channels=in_channels,        # 输入通道数，默认为 3（RGB 图像）
                 n_classes=num_output_classes,  # 输出类别数，默认为 2（适用于二分类任务）
                 wf=5,                          # 网络的初始滤波器数
                 depth=4,                       # 网络的深度（层数）
                 padding=True,                  # 是否对输入进行零填充，使得输出尺寸和输入相同
                 up_mode='upsample')            # 使用上采样进行上采样，采用双线性插值

    # 可选：为了支持多 GPU 训练和推理，使用 DataParallel 进行封装
    model = nn.DataParallel(model)

    # 返回训练好的 U-Net 模型
    return model


In [None]:
# 可视化模型的输入图像、预测结果和目标标签
def visualize_predictions(input_image, prediction, target, n_images=2, apply_softmax=True):
    """
    输入三个 PyTorch 张量，绘制输入图像、预测结果和目标标签。
    """
    # 只选择前 n 张图片
    prediction = prediction[:n_images]
    target = target[:n_images]
    input_image = input_image[:n_images]

    # 从计算图中分离张量并转移到 CPU，然后转换为 NumPy 数组
    prediction = prediction.detach().cpu().numpy()
    
    # 如果需要，应用 softmax 将预测值转为概率分布
    if apply_softmax:
        prediction = scipy.special.softmax(prediction, axis=1)
    
    # 将第一类预测值进行反转 (1 - 预测值)
    class_one_preds = np.hstack(1 - prediction[:, 0])

    # 获取目标标签的 NumPy 数组
    target = np.hstack(target.detach().cpu().numpy())

    # 创建 RGB 颜色映射，红色通道为预测，绿色通道为目标标签
    class_rgb = np.repeat(class_one_preds[..., None], 3, axis=2)
    class_rgb[..., 2] = 0  # 将蓝色通道设置为 0
    class_rgb[..., 1] = target  # 将绿色通道设置为目标标签值

    # 将输入图像转为 NumPy 数组并进行转置
    input_im = np.hstack(input_image.cpu().numpy().transpose(0, 2, 3, 1))

    # 如果输入图像有 3 个通道（RGB），则转换为灰度图像
    if input_im.shape[2] == 3:
        input_im_grayscale = np.repeat(input_im.mean(axis=2)[..., None], 3, axis=2)
        # 将灰度图像与预测结果叠加
        overlayed_im = (input_im_grayscale * 0.6 + class_rgb * 0.7).clip(0, 1)
    else:
        # 如果图像有 4 个通道，则处理其中的 map 信息
        input_map = input_im[..., 3:]
        overlayed_im = (input_map * 0.6 + class_rgb * 0.7).clip(0, 1)

    # 将预测值大于 0.5 的地方标记为真，生成二值图像
    thresholded_pred = np.repeat(class_one_preds[..., None] > 0.5, 3, axis=2)

    # 绘制图像
    fig = plt.figure(figsize=(12, 26))
    # 将图像按垂直方向堆叠，显示输入图像、预测图像、目标图像、叠加图像和二值化预测结果
    plot_im = np.vstack([class_rgb, input_im[..., :3], overlayed_im, thresholded_pred]).clip(0, 1).astype(np.float32)
    
    # 显示图像
    plt.imshow(plot_im)
    plt.axis("off")  # 不显示坐标轴
    plt.show()


In [None]:
# 我们通过降低类别 0 的损失权重来处理类别不平衡问题。
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')  # 选择计算设备，如果有 GPU 就用 GPU，否则用 CPU

# 定义每个类别的权重：类别 0 的权重为 0.2，其余类别的权重为 1.0
class_weights = torch.from_numpy(np.array([0.2] + [1.0]*len(classes), dtype=np.float32))

# 将权重张量移动到选择的计算设备上
class_weights = class_weights.to(device)


In [None]:
batch_size = 8
epochs = 15  # 训练的轮数。为了缩短训练时间，这里设置为 15，实际训练中可以增加轮数来获得更好的结果。

# 获取 U-Net 模型
model = get_unet_model(num_output_classes=len(classes)+1)
model = model.to(device)  # 将模型移到 GPU 或 CPU 上

# 使用 Adam 优化器，学习率为 1e-3
optim = torch.optim.Adam(model.parameters(), lr=1e-3)

# 创建数据加载器
dataloader = torch.utils.data.DataLoader(train_dataset, batch_size, shuffle=True, num_workers=os.cpu_count()*2)

all_losses = []  # 用来记录所有轮次的损失

# 开始训练
for epoch in range(1, epochs+1):
    print("Epoch", epoch)
    
    epoch_losses = []  # 每个 epoch 的损失
    progress_bar = tqdm_notebook(dataloader)  # 显示进度条

    for ii, (X, target, sample_ids) in enumerate(progress_bar):
        X = X.to(device)  # 输入图像，大小为 [N, 3, H, W]
        target = target.to(device)  # 目标标签，大小为 [N, H, W]
        prediction = model(X)  # 模型预测结果，大小为 [N, 2, H, W]
        
        # 计算损失，使用加权交叉熵
        loss = F.cross_entropy(prediction, target, weight=class_weights)

        # 反向传播
        optim.zero_grad()  # 清空梯度
        loss.backward()  # 反向传播
        optim.step()  # 更新参数
        
        # 记录损失
        epoch_losses.append(loss.detach().cpu().numpy())

        # 每个 epoch 的第一个批次可视化预测结果
        if ii == 0:
            visualize_predictions(X, prediction, target)
    
    print("Loss:", np.mean(epoch_losses))  # 打印当前 epoch 的平均损失
    all_losses.extend(epoch_losses)  # 将当前 epoch 的损失添加到总损失列表
    
    # 保存模型的检查点
    checkpoint_filename = "unet_checkpoint_epoch_{}.pth".format(epoch)
    checkpoint_filepath = os.path.join(ARTIFACTS_FOLDER, checkpoint_filename)
    torch.save(model.state_dict(), checkpoint_filepath)
    
# 绘制训练过程中损失的变化曲线
plt.figure(figsize=(12,12))
plt.plot(all_losses, alpha=0.75)
plt.show()


In [None]:
# 获取验证数据集的输入图像和目标标签路径，并进行排序
input_filepaths = sorted(glob.glob(os.path.join(validation_data_folder, "*_input.png")))
target_filepaths = sorted(glob.glob(os.path.join(validation_data_folder, "*_target.png")))

# 设置批次大小为16
batch_size = 16

# 创建验证数据集对象
validation_dataset = BEVImageDataset(input_filepaths, target_filepaths)

# 创建数据加载器，设置批次大小，并禁用随机打乱数据
validation_dataloader = torch.utils.data.DataLoader(validation_dataset, batch_size, shuffle=False, num_workers=os.cpu_count())


In [None]:
# 验证集推理，计算损失值
# 使用tqdm_notebook展示训练进度条
progress_bar = tqdm_notebook(validation_dataloader)

# 初始化目标和预测数组，用于存储所有的目标标签和预测结果
# 使用uint8类型以节省内存，否则会占用超过20GB的内存
targets = np.zeros((len(target_filepaths), 336, 336), dtype=np.uint8)
predictions = np.zeros((len(target_filepaths), 1 + len(classes), 336, 336), dtype=np.uint8)

# 用于存储样本token和损失值的列表
sample_tokens = []
all_losses = []

# 不需要计算梯度，因此使用`torch.no_grad()`来节省内存
with torch.no_grad():
    # 设置模型为评估模式
    model.eval()
    # 遍历验证数据集
    for ii, (X, target, batch_sample_tokens) in enumerate(progress_bar):

        # 根据当前批次更新目标数组
        offset = ii * batch_size
        targets[offset:offset + batch_size] = target.numpy()
        sample_tokens.extend(batch_sample_tokens)
        
        # 将输入和目标数据转移到设备（GPU或CPU）
        X = X.to(device)  # 输入的形状为 [N, 1, H, W]
        target = target.to(device)  # 目标的形状为 [N, H, W]，包含类别索引（0, 1）
        
        # 获取模型预测的输出，形状为 [N, 2, H, W]（2表示类别数）
        prediction = model(X)  # [N, 2, H, W]
        
        # 计算损失值，这里使用了加权交叉熵损失函数
        loss = F.cross_entropy(prediction, target, weight=class_weights)
        all_losses.append(loss.detach().cpu().numpy())
        
        # 对预测结果进行softmax操作
        prediction = F.softmax(prediction, dim=1)
        
        # 将预测结果转回CPU并转换为numpy格式
        prediction_cpu = prediction.cpu().numpy()
        
        # 将预测结果量化为[0, 255]的uint8类型
        predictions[offset:offset + batch_size] = np.round(prediction_cpu * 255).astype(np.uint8)
        
        # 可视化第一次的预测结果
        if ii == 0:
            visualize_predictions(X, prediction, target, apply_softmax=False)
            
# 输出平均损失
print("Mean loss:", np.mean(all_losses))


In [None]:
# 预测结果和目标数据进行可视化展示
predictions_non_class0 = 255 - predictions[:, 0]

# 设置背景的阈值，这里背景阈值为 127
background_threshold = 255 // 2

# 遍历前 3 张图片进行展示
for i in range(3):
    # 创建一个 1 行 3 列的子图，图像尺寸为 (16, 6)
    fig, axes = plt.subplots(nrows=1, ncols=3, figsize=(16, 6))
    
    # 绘制非背景类的预测图
    axes[0].imshow(predictions_non_class0[i])
    axes[0].set_title("predictions")  # 设置标题为“predictions”
    
    # 绘制经过阈值化后的预测图，预测大于阈值的像素为前景，其他为背景
    axes[1].imshow(predictions_non_class0[i] > background_threshold)
    axes[1].set_title("thresholded predictions")  # 设置标题为“thresholded predictions”
    
    # 绘制目标图像，背景为 0，前景为 1
    axes[2].imshow((targets[i] > 0).astype(np.uint8), interpolation="nearest")
    axes[2].set_title("targets")  # 设置标题为“targets”
    
    # 调整子图之间的间距，使得显示更紧凑
    fig.tight_layout()
    fig.show()  # 显示图像


In [None]:
# 使用形态学开运算进行过滤
# 使用椭圆形内核创建结构元素进行形态学操作
kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (3, 3))

# 初始化一个与预测图像相同尺寸的空数组，用于保存形态学开运算后的结果
predictions_opened = np.zeros((predictions_non_class0.shape), dtype=np.uint8)

# 遍历所有的预测图像
for i, p in enumerate(tqdm(predictions_non_class0)):
    # 将预测值大于背景阈值的位置设置为1，其他为0（进行二值化处理）
    thresholded_p = (p > background_threshold).astype(np.uint8)
    
    # 对二值化后的图像执行开运算，去除小的噪声区域
    predictions_opened[i] = cv2.morphologyEx(thresholded_p, cv2.MORPH_OPEN, kernel)

# 绘制原始二值化图像和开运算后的图像对比
plt.figure(figsize=(12, 12))
fig, axes = plt.subplots(nrows=1, ncols=2, figsize=(16, 6))

# 显示阈值化后的预测图像
axes[0].imshow(predictions_non_class0[0] > 255 // 2)
axes[0].set_title("thresholded prediction")

# 显示开运算后的预测图像
axes[1].imshow(predictions_opened[0])
axes[1].set_title("opened thresholded prediction")

# 显示图像
fig.show()


In [None]:
# 使用 scipy.ndimage.label 来标记预测图像中的连通组件
labels, n = scipy.ndimage.label(predictions_opened[0])

# 绘制标记后的连通组件图像
plt.imshow(labels, cmap="tab20b")

# 在图像中添加标签，显示连通组件的数量
plt.xlabel("N predictions: {}".format(n))

# 显示图像
plt.show()


In [None]:
# 提取检测框和计算得分
detection_boxes = []
detection_scores = []
detection_classes = []

for i in tqdm_notebook(range(len(predictions))):
    prediction_opened = predictions_opened[i]
    probability_non_class0 = predictions_non_class0[i]
    class_probability = predictions[i]

    sample_boxes = []
    sample_detection_scores = []
    sample_detection_classes = []
    
    contours, hierarchy = cv2.findContours(prediction_opened, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE) 
    
    for cnt in contours:
        rect = cv2.minAreaRect(cnt)
        box = cv2.boxPoints(rect)
        
        # 根据中心像素得到置信度
        box_center_index = np.int0(np.mean(box, axis=0))
        
        for class_index in range(len(classes)):
            box_center_value = class_probability[class_index+1, box_center_index[1], box_center_index[0]]
            
            if box_center_value < 0.01:
                continue
            
            box_center_class = classes[class_index]

            box_detection_score = box_center_value
            sample_detection_classes.append(box_center_class)
            sample_detection_scores.append(box_detection_score)
            sample_boxes.append(box)
        
    
    detection_boxes.append(np.array(sample_boxes))
    detection_scores.append(sample_detection_scores)
    detection_classes.append(sample_detection_classes)
    
print("Total amount of boxes:", np.sum([len(x) for x in detection_boxes]))
    

# Visualize the boxes in the first sample
t = np.zeros_like(predictions_opened[0])
for sample_boxes in detection_boxes[0]:
    box_pix = np.int0(sample_boxes)
    cv2.drawContours(t,[box_pix],0,(255),2)
plt.imshow(t)
plt.show()

# Visualize their probabilities
plt.hist(detection_scores[0], bins=20)
plt.xlabel("Detection Score")
plt.ylabel("Count")
plt.show()

In [None]:
# 加载真实值
from lyft_dataset_sdk.eval.detection.mAP_evaluation import Box3D, recall_precision

def load_groundtruth_boxes(nuscenes, sample_tokens):
    gt_box3ds = []

    # Load annotations and filter predictions and annotations.
    for sample_token in tqdm_notebook(sample_tokens):

        sample = nuscenes.get('sample', sample_token)
        sample_annotation_tokens = sample['anns']

        sample_lidar_token = sample["data"]["LIDAR_TOP"]
        lidar_data = level5data.get("sample_data", sample_lidar_token)
        ego_pose = level5data.get("ego_pose", lidar_data["ego_pose_token"])
        ego_translation = np.array(ego_pose['translation'])
        
        for sample_annotation_token in sample_annotation_tokens:
            sample_annotation = nuscenes.get('sample_annotation', sample_annotation_token)
            sample_annotation_translation = sample_annotation['translation']
            
            class_name = sample_annotation['category_name']
            
            box3d = Box3D(
                sample_token=sample_token,
                translation=sample_annotation_translation,
                size=sample_annotation['size'],
                rotation=sample_annotation['rotation'],
                name=class_name
            )
            gt_box3ds.append(box3d)
            
    return gt_box3ds

gt_box3ds = load_groundtruth_boxes(level5data, sample_tokens)

In [None]:
# 初始化存储 3D 预测框的列表
pred_box3ds = []

# 遍历每个样本的检测框
for (sample_token, sample_boxes, sample_detection_scores, sample_detection_class) in tqdm_notebook(zip(sample_tokens, detection_boxes, detection_scores, detection_classes), total=len(sample_tokens)):
    
    # 将每个检测框的数据重新调整形状
    sample_boxes = sample_boxes.reshape(-1, 2)  # (N, 4, 2) -> (N*4, 2)
    sample_boxes = sample_boxes.transpose(1,0)  # (N*4, 2) -> (2, N*4)
    
    # 为检测框添加 Z 维度（默认为 0，即假设目标的 Z 坐标为 0）
    sample_boxes = np.vstack((sample_boxes, np.zeros(sample_boxes.shape[1]),))  # (2, N*4) -> (3, N*4)
    
    # 获取当前样本的相关信息
    sample = level5data.get("sample", sample_token)
    sample_lidar_token = sample["data"]["LIDAR_TOP"]  # 获取 LIDAR 数据的标识符
    lidar_data = level5data.get("sample_data", sample_lidar_token)  # 获取 LIDAR 数据
    lidar_filepath = level5data.get_sample_data_path(sample_lidar_token)  # 获取 LIDAR 数据文件路径
    ego_pose = level5data.get("ego_pose", lidar_data["ego_pose_token"])  # 获取 Ego 车的姿态信息
    ego_translation = np.array(ego_pose['translation'])  # Ego 车的位置
    
    # 计算从 Ego 车坐标系到全球坐标系的转换矩阵
    global_from_car = transform_matrix(ego_pose['translation'], Quaternion(ego_pose['rotation']), inverse=False)
    
    # 计算从车载坐标系到体素坐标系的转换矩阵，并得到全局到体素的转换矩阵
    car_from_voxel = np.linalg.inv(create_transformation_matrix_to_voxel_space(bev_shape, voxel_size, (0, 0, z_offset)))
    global_from_voxel = np.dot(global_from_car, car_from_voxel)
    
    # 将检测框从车载坐标系转换到体素坐标系
    sample_boxes = transform_points(sample_boxes, global_from_voxel)

    # 假设所有的目标框都与 Ego 车的 Z 坐标相同，因此将 Z 轴的值设为 Ego 车的 Z 坐标
    sample_boxes[2,:] = ego_pose["translation"][2]
    
    # 调整框的维度：从 (3, N*4) 转换为 (N, 4, 3)，每个检测框由 4 个点的 3D 坐标组成
    sample_boxes = sample_boxes.transpose(1,0).reshape(-1, 4, 3)
    
    # 假设所有目标的高度为 1.75
    box_height = 1.75
    
    # 计算每个框的中心位置（X, Y, Z）
    sample_boxes_centers = sample_boxes.mean(axis=1)
    sample_boxes_centers[:,2] += box_height/2  # 假设目标框底部与地面接触，因此需要加上目标高度的一半
    
    # 计算每个框的长度和宽度，使用欧几里得距离计算两点之间的距离
    sample_lengths = np.linalg.norm(sample_boxes[:,0,:] - sample_boxes[:,1,:], axis=1) * 1/box_scale
    sample_widths = np.linalg.norm(sample_boxes[:,1,:] - sample_boxes[:,2,:], axis=1) * 1/box_scale
    
    # 构建每个框的尺寸信息（宽度、长度、高度）
    sample_boxes_dimensions = np.zeros_like(sample_boxes_centers) 
    sample_boxes_dimensions[:,0] = sample_widths
    sample_boxes_dimensions[:,1] = sample_lengths
    sample_boxes_dimensions[:,2] = box_height

    # 遍历每个框，计算它们的旋转信息并将其转换为四元数
    for i in range(len(sample_boxes)):
        translation = sample_boxes_centers[i]  # 目标框的中心位置
        size = sample_boxes_dimensions[i]  # 目标框的尺寸（宽度、长度、高度）
        class_name = sample_detection_class[i]  # 目标类别
        ego_distance = float(np.linalg.norm(ego_translation - translation))  # 计算目标与 Ego 车的距离
        
        # 计算框的旋转信息，假设框的旋转矩阵可以通过框的两个角点计算
        v = (sample_boxes[i,0] - sample_boxes[i,1])
        v /= np.linalg.norm(v)  # 将向量标准化
        r = R.from_dcm([  # 使用旋转矩阵创建四元数
            [v[0], -v[1], 0],
            [v[1],  v[0], 0],
            [   0,     0, 1],
        ])
        quat = r.as_quat()  # 转换为四元数
        quat = quat[[3,0,1,2]]  # 将四元数顺序调整为 XYZW -> WXYZ
        
        detection_score = float(sample_detection_scores[i])  # 获取检测得分
        
        # 创建 Box3D 对象，表示一个 3D 预测框
        box3d = Box3D(
            sample_token=sample_token,  # 样本的 token
            translation=list(translation),  # 目标框的中心位置
            size=list(size),  # 目标框的尺寸
            rotation=list(quat),  # 目标框的旋转四元数
            name=class_name,  # 目标类别
            score=detection_score  # 检测得分
        )
        
        # 将创建的 3D 预测框添加到列表中
        pred_box3ds.append(box3d)