In [13]:
import torch 
import torch.nn as nn

# 0. 專案介紹

## (1) 資料集

- jpg file: 彩色影像，應該是來自賽車遊戲 SuperTuxKart 的環境截圖。
- png file: 深度圖，應該表示場景中每個像素距離相機的深度資訊。

- 攝影機拍攝的 2D 影像，實際上是在一個 3D 空間內捕捉畫面。

![image info](drive_data/train/cornfield_crossing_00/00000_im.jpg)

![image info](drive_data/train/cornfield_crossing_00/00000_depth.png)

## (2) 標籤檔

In [5]:
import numpy as np
# Label archive of one recorded episode; the cells below inspect its contents.
file_path = "drive_data/train/hacienda_00/info.npz"
# NOTE(review): allow_pickle=True lets the archive execute arbitrary code while it is
# unpickled, so only load info.npz files that come from a trusted source.
data = np.load(file_path, allow_pickle=True)
# Names of the arrays stored in the archive.
print(data.files)

['track', 'frames']


### track

- path_nodes（賽道節點）
  - 座標 (x, y, z) 是基於 3D 遊戲引擎的座標系統，用來表示賽道上的節點位置。
  - 車輛應該沿著這些節點行駛。
  - 第一個節點 (0.41, 0.355, 18.822) 是起點。
  - 第二個節點 (0.41, 0.355, 24.698) 是下一個應該到達的地方。
  - 以上為示意數值；下方 hacienda_00 的實際輸出中，第一段路徑是 (0, -0.99, 0) → (0, -0.99, 10)。

- path_distance（節點間距離）
  - 從起點（第一個節點）到其他節點的累計距離。
  - 計算它已經在賽道上行駛了多遠，有助於導航規劃，根據這些距離來預測前方路徑行駛速度和方向。
  - 如果 path_distance 變大，代表車輛正在往前移動。
  - 第一個節點的距離 = 0.0（起點）
  - 第二個節點的距離 = 5.8759995（從第一個節點到這個節點的距離）

- path_width（賽道寬度）
  - 指定了該路徑的寬度，單位應該是米（m），車輛可以在這個範圍內移動。
  - 告訴車輛可以在這條賽道內左右移動，不會超出界線。
  - 如果賽道變窄（例如 path_width = 5），車輛就要小心不能偏離中心，避免撞牆。

In [6]:
# Show the first entry of every array stored under the 'track' record.
track_record = data['track'].item()
for key, value in track_record.items():
    print(f"key = {key} \n{value[0]}\n")

key = path_nodes 
[[ 0.   -0.99  0.  ]
 [ 0.   -0.99 10.  ]]

key = path_distance 
[ 0. 10.]

key = path_width 
[8.000168]



### frames

- V：(View Matrix, 視圖矩陣)
  - 將 世界座標 (world coordinates) 轉換到 相機座標系 (camera coordinates)。
  - 4x4 齊次變換矩陣 (Homogeneous Transformation Matrix)
     - 前三行三列 3x3 區域：旋轉矩陣，決定了相機的方向。
     - 最後一列前三個值：相機的位置 (x, y, z)
     - 最後一行 (0,0,0,1)：是齊次座標變換的一部分，確保矩陣可以正確應用於 3D 點。

- P：(Projection Matrix, 投影矩陣)
  - 將 相機座標系的 3D 點投影到 2D 屏幕。
  - 透視投影矩陣 (Perspective Projection Matrix)
    - P[0,0] = 0.8938152：代表水平 (X) 方向的縮放因子。
    - P[1,1] = 1.1917536：代表垂直 (Y) 方向的縮放因子。
    - P[2,2] = 1.0004002，P[2,3] = 1，P[3,2] = -1.0004002：這些值用來進行透視投影，使物體近大遠小。
    - P[3,3] = 0：表示這是透視投影，而非正交投影。

- location：(相機位置)
  - 相機在 3D 世界中的位置 (x, y, z)。
- front：(相機朝向)
  - 相機朝向正前方的一個點坐標 (x, y, z)。

- velocity(速度向量)
  - 速度 (vx, vy, vz)，表示車輛當前的速度。
  - vx 和 vy 很小，表示車子當前幾乎靜止或移動速度很慢。

- distance_down_track：(賽道上的距離)
  - 代表 車輛在賽道上的累計距離 (沿賽道的行駛距離)。

In [56]:
# Show the first frame's value for every per-frame array in the 'frames' record.
frames_record = data['frames'].item()
for key, value in frames_record.items():
    print(f"key = {key}\n{value[0]}\n")

key = V
[[ 9.9995995e-01  8.5655162e-03 -2.6082825e-03  0.0000000e+00]
 [-8.8857561e-03  9.8517323e-01 -1.7133233e-01  0.0000000e+00]
 [ 1.1020603e-03  1.7134863e-01  9.8520982e-01  0.0000000e+00]
 [ 7.2150356e-01  5.8840716e-01  4.2071357e+00  1.0000000e+00]]

key = P
[[ 0.8938152  0.         0.         0.       ]
 [ 0.         1.1917536  0.         0.       ]
 [ 0.         0.         1.0004002  1.       ]
 [ 0.         0.        -1.0004002  0.       ]]

key = location
[-0.72824645 -0.7002725  -2.2456324 ]

key = front
[-0.72835624 -0.6994907  -1.5271327 ]

key = velocity
[ 0.00284756  0.08167525 -0.00017326]

key = distance_down_track
0.0



### 實作-計算左右邊界

- `road_utils.py` 根據原始導航點 (path_nodes) 計算出道路的左右邊界。

In [31]:
from homework.datasets.road_utils import Track

# Build the Track helper from the raw 'track' record of the episode.
track = Track(**data["track"].item())

#### Attributes
# Road centre-line point
print("Center points:\n", track.center[0], '\n')
# Left boundary point
print("Left boundary:\n", track.left[0], '\n')
# Right boundary point
print("Right boundary:\n", track.right[0], '\n')
# Cumulative distance along the centre line
print("Center distance:\n", track.center_distance[0], '\n')
# Road width
print("Track width:\n", track.width[0], '\n')

#### Homogeneous coordinates
# Left boundary point in homogeneous form (x, y, z, 1)
print("Homogeneous left track:\n", track.track_left[0], '\n')
# Right boundary point in homogeneous form (x, y, z, 1)
print("Homogeneous right track:\n", track.track_right[0], '\n')


Center points:
 [-1.76699624e-06 -9.90008039e-01 -5.09255339e-06] 

Left boundary:
 [-4.00008169e+00 -9.90008039e-01 -8.99676112e-06] 

Right boundary:
 [ 4.00007816e+00 -9.90008039e-01 -1.18834566e-06] 

Center distance:
 0.0 

Track width:
 [8.00016785] 

Homogeneous left track:
 [-4.00008169e+00 -9.90008039e-01 -8.99676112e-06  1.00000000e+00] 

Homogeneous right track:
 [ 4.00007816e+00 -9.90008039e-01 -1.18834566e-06  1.00000000e+00] 



### 實作-資料入與出

- `homework.datasets.`

  - 匯入路徑

  - 匯出所需資料

  - 包含 `road_transforms`、`road_utils`

In [None]:
from homework.datasets.road_dataset import RoadDataset

dataset_path = "drive_data/train/hacienda_00"
dataset = RoadDataset(dataset_path)

# Fetch a single sample; idx selects the frame inside the episode.
idx = 0
sample = dataset[idx]

print("keys:\n", sample.keys())
print("Image Size:\n", sample["image"].shape)         # image shape
print("Track Left:\n", sample["track_left"][:3])      # 10x2 points; only the first 3 are shown
print("Track Right:\n", sample["track_right"][:3])    # 10x2 points; only the first 3 are shown
print("Waypoints:\n", sample["waypoints"])            # all three waypoints are valid here (no padding points)
print("Waypoints Mask:\n", sample["waypoints_mask"])  # waypoints are padded up to n=3; padded entries are False


keys:
 dict_keys(['image', 'track_left', 'track_right', 'waypoints', 'waypoints_mask'])
Image:
 [0.20784314 0.20784314 0.21176471 0.21176471 0.21176471 0.21176471
 0.21176471 0.21176471 0.20392157 0.20392157 0.20392157 0.20392157
 0.20392157 0.20392157 0.20392157 0.20392157 0.21568628 0.21568628
 0.21568628 0.21176471 0.21176471 0.20784314 0.20784314 0.20784314
 0.2        0.2        0.2        0.2        0.2        0.2
 0.2        0.2        0.1764706  0.1764706  0.1764706  0.1764706
 0.1764706  0.17254902 0.18039216 0.18039216 0.17254902 0.17254902
 0.1764706  0.1764706  0.1764706  0.17254902 0.17254902 0.17254902
 0.18039216 0.18039216 0.18431373 0.18431373 0.1882353  0.19215687
 0.19607843 0.2        0.19607843 0.2        0.20784314 0.20784314
 0.21568628 0.21960784 0.21960784 0.21568628 0.21176471 0.20784314
 0.20784314 0.21176471 0.21176471 0.21568628 0.21568628 0.21568628
 0.21568628 0.21176471 0.21176471 0.21568628 0.21568628 0.22352941
 0.22745098 0.23137255 0.24705882 0.24705

In [None]:
# Walk through several indexings of the image to see which axis is which.
image = sample["image"]
first_column = image[0, :, 0]   # channel 0, every row, column 0
first_pixel = image[:, 0, 0]    # every channel of the top-left pixel

views = (
    image[0, 0, 0],
    image[0, 0].shape,
    image[0].shape,
    image.shape,
    first_column,
    first_column.shape,
    first_pixel,
    first_pixel.shape,
)
for shown in views:
    print("Image:\n", shown)

# (3, 96, 128)
# 3 is the number of channels, 96 the height, 128 the width
# each image has 96*128 = 12288 pixels
# each pixel carries 3 values, one per RGB colour

Image:
 0.20784314
Image:
 (128,)
Image:
 (96, 128)
Image:
 (3, 96, 128)
Image:
 [0.20784314 0.21568628 0.22352941 0.23529412 0.23921569 0.24313726
 0.24313726 0.24313726 0.24705882 0.25490198 0.26666668 0.2784314
 0.2901961  0.29411766 0.29803923 0.30588236 0.30980393 0.32156864
 0.32156864 0.30588236 0.29803923 0.31764707 0.34509805 0.3882353
 0.7411765  0.8039216  0.9764706  0.8901961  0.49411765 0.45490196
 0.40392157 0.42352942 0.4509804  0.40784314 0.39215687 0.78431374
 0.91764706 0.8745098  0.8901961  0.94509804 0.92156863 0.89411765
 0.87058824 0.8627451  0.84705883 0.83137256 0.8352941  0.8784314
 0.8862745  0.8039216  0.7607843  0.7058824  0.69411767 0.7176471
 0.6666667  0.64705884 0.61960787 0.5647059  0.5372549  0.5529412
 0.5686275  0.5882353  0.6156863  0.60784316 0.58431375 0.5882353
 0.6117647  0.63529414 0.62352943 0.59607846 0.5921569  0.6117647
 0.61960787 0.6313726  0.6        0.5803922  0.6039216  0.57254905
 0.5176471  0.5137255  0.5686275  0.58431375 0.59607846

In [104]:
import torch
# Two toy boundaries shaped (batch=2, points=2, coords=2).
track_left = torch.tensor([[[0, 1], [0, 2]], [[0, 3], [0, 4]]])
track_right = torch.tensor([[[1, 1], [1, 2]], [[1, 3], [1, 4]]])

# Join along the point axis, then collapse (points, coords) into one feature axis.
track_all = torch.cat([track_left, track_right], dim=1).flatten(start_dim=1)

for shown in (track_left, track_right, track_all):
    print(shown.shape)
    print(shown)

torch.Size([2, 2, 2])
tensor([[[0, 1],
         [0, 2]],

        [[0, 3],
         [0, 4]]])
torch.Size([2, 2, 2])
tensor([[[1, 1],
         [1, 2]],

        [[1, 3],
         [1, 4]]])
torch.Size([2, 8])
tensor([[0, 1, 0, 2, 1, 1, 1, 2],
        [0, 3, 0, 4, 1, 3, 1, 4]])


# 1. 環境設定

## (1) 目錄結構

In [None]:
homework4/
│── assets/                 # 存放示意圖、架構圖等
│   ├── perceiver_architecture.png
│   ├── perceiver_io.png
│   ├── sample.png
│
│── grader/                 # 評測相關
│   ├── datasets/
│   │   ├── road_dataset.py
│   │   ├── road_transforms.py
│   │   ├── road_utils.py
│   ├── supertux_utils/
│   │   ├── evaluate.py
│   │   ├── video_visualization.py
│   ├── __main__.py
│   ├── grader.py
│   ├── metrics.py
│   ├── tests.py
│
│── homework/               # 主要作業實作
│   ├── datasets/
│   │   ├── road_dataset.py
│   │   ├── road_transforms.py
│   │   ├── road_utils.py
│   ├── supertux_utils/
│   │   ├── evaluate.py
│   │   ├── video_visualization.py
│   ├── __init__.py
│   ├── metrics.py
│   ├── models.py           # 你的模型實作應該在這裡！
│   ├── train_planner.py    # 訓練腳本
│
│── bundle.py               # 用於打包提交
│── README.md               # 作業說明文件
│── requirements.txt        # 依賴套件清單


## (2) 套件限制

- matplotlib>=3.5.0
- Pillow>=10.0.0
- tensorboard>=2.0.0
- termcolor==2.4.0
- opencv-python>=4.10.0
- tqdm==4.66.4

In [None]:
%pip install -r requirements.txt

In [None]:
!conda --version
!git --version
!conda env list
!python --version
%pip list

## (3) GPU

In [52]:
import torch
print(torch.cuda.is_available()) 

True


In [None]:
# Sample GPU stats once per second; -c bounds the number of samples so the cell terminates
!nvidia-smi dmon -c 5

## (4) 視覺化

運行 SuperTuxKart 和視覺化腳本

In [None]:
%pip install PySuperTuxKartData
%pip install PySuperTuxKart --index-url=https://www.cs.utexas.edu/~bzhou/dl_class/pystk

# PySuperTuxKart requires several dependencies and has only been tested on certain systems.
# Check out https://www.cs.utexas.edu/~bzhou/dl_class/pystk/pysupertuxkart/
# for the full list of pre-built supported python versions / OS / CPU architectures.

# If this doesn't work, you can always run your model on Colab,
# or you can try installing from source https://github.com/philkr/pystk

視覺化駕駛，請參閱supertux_utils模組中的以下文件：

- `evaluate.py`- 關於如何使用模型的預測來驅動以及遊戲如何運作的邏輯

- `video_visualization.py`- matplotlib 駕駛視覺化（需要安裝 imageio）

然後您可以運行以下命令來查看您的模型如何驅動：

In [None]:
!python -m homework.supertux_utils.evaluate --model mlp_planner --track lighthouse

# 2. 評分方式

In [None]:
!python -m grader homework -v

!python -m grader homework -vv

# 3. Part 1a：MLP 計畫器

- 目標
  - 從車道邊界資訊來預測車輛的目標軌跡 (waypoints)
  - 使用 多層感知機 (MLP, Multi-Layer Perceptron) 

- 輸入輸出
  - 一張原生圖片資料經過 `road_dataset.py` 會有 
    - Image Size: (3, 96, 128)
    - Track Left: 10*2: [[x,z] ... 10* ... [x,z]]
    - Track Right: 10*2: [[x,z] ... 10* ... [x,z]]
    - Waypoints: n*2: [[x,z] ... n ... [x,z]]
    - Waypoints Mask: n: [ T/F  ... n ... T/F]
  - 形狀過程
    - 輸入資料
    - 經過 `road_dataset.py` 變成 Track Left/Right 座標 [B,10,2] & [B,10,2]
    - 兩個合併攤開(座標消失)，[B,10,2] & [B,10,2] → [B,20,2] → [B,40]
    - \~~中間MLP\~~
    - 得到 [B, n*2]，n是預測的點數量，可更改，默認為3
    - Waypoints 變回座標 [B, n, 2]
    - 同時擁有 Waypoints Mask 布林值，代表該目標點是真實預測點/補充點，形狀為 [B,n]

- 結構
  - 輸入層
    - 數據壓平成1D
    - `Flatten()`
  - 隱藏層
    - Linear + ReLU
    - `torch.nn.Linear` + `ReLU`
  - 輸出層
    - 匹配成 [n*2] 的形狀

- 損失函數
  - 均方誤差 MSE
  - `torch.nn.MSELoss()`

- 優化器
  - Adam
  - `torch.optim.Adam()`

- 訓練
  - `forward()` 前向傳播 (Forward Pass)：輸入 X 通過 MLP，獲得預測值 Y_pred。
  - `loss` 計算損失：使用 MSE 衡量 Y_pred 和 Y 的差距。
  - `backward()` 反向傳播 (Backward Pass)
  - `update weights` 計算梯度並更新權重。
  - `epochs` 重複多個 Epoch，直到收斂。

- 測試
  - 使用 測試集 (test set) 來評估 MLP 的表現。
  - 衡量 MSE 誤差

- 可視化
  - 

## (1) Model

### 成功檔案

#### Version 1

In [None]:
class MLPPlanner(nn.Module):
    """MLP that regresses waypoints from the flattened left/right track boundaries."""

    def __init__(
        self,
        n_track: int = 10,
        n_waypoints: int = 3,
    ):
        """
        Args:
            n_track (int): number of points in each side of the track
            n_waypoints (int): number of waypoints to predict
        """
        super().__init__()

        self.n_track = n_track
        self.n_waypoints = n_waypoints

        # Layer widths: the input holds 2 boundaries x n_track points x 2 coordinates.
        widths = [2 * 2 * n_track, 64, 32, 32, 32]

        blocks = []
        for fan_in, fan_out in zip(widths[:-1], widths[1:]):
            blocks += [
                torch.nn.Linear(fan_in, fan_out),
                torch.nn.BatchNorm1d(fan_out),
                torch.nn.ReLU(),
            ]
        # Regression head: two values per waypoint.
        blocks.append(torch.nn.Linear(widths[-1], 2 * n_waypoints))

        self.network = torch.nn.Sequential(*blocks)

    def forward(
        self,
        track_left: torch.Tensor,
        track_right: torch.Tensor,
        **kwargs,
    ) -> torch.Tensor:
        """
        Predicts waypoints from the left and right boundaries of the track.

        During test time, your model will be called with
        model(track_left=..., track_right=...), so keep the function signature as is.

        Args:
            track_left (torch.Tensor): shape (b, n_track, 2)
            track_right (torch.Tensor): shape (b, n_track, 2)

        Returns:
            torch.Tensor: future waypoints with shape (b, n_waypoints, 2)
        """
        # (b, 2 * n_track, 2): left points first, then right points.
        boundaries = torch.cat((track_left, track_right), dim=1)
        # (b, 4 * n_track): one flat feature vector per sample.
        features = boundaries.flatten(start_dim=1, end_dim=2)
        flat_waypoints = self.network(features)
        return flat_waypoints.view(-1, self.n_waypoints, 2)

## (2) Train

### 訓練方式

In [None]:
!python -m homework.train_planner

!python batch_train.py


In [None]:
from homework.train_planner import train
import torch.multiprocessing as mp

if __name__ == "__main__":
    # Only matters when the script is frozen into a Windows executable.
    mp.freeze_support()

    # Hyper-parameter grid: every batch-size option is paired with every learning-rate option.
    batch_size_options = [{"batch_size": 128}]
    lr_options = [{"lr": 0.001}]

    grid = [
        (bs_kwargs, lr_kwargs)
        for bs_kwargs in batch_size_options
        for lr_kwargs in lr_options
    ]
    for bs_kwargs, lr_kwargs in grid:
        print(f"batch_size: {bs_kwargs['batch_size']}, lr: {lr_kwargs['lr']}")
        train(**bs_kwargs, **lr_kwargs)

### 訓練內容

In [None]:
import torch
from torch.utils.tensorboard import SummaryWriter
from .metrics import PlannerMetric
from .models import load_model, save_model
from .datasets import road_dataset

def train(
    exp_dir: str = "logs",             # directory meant for TensorBoard logs / model files (not read by this function)
    model_name: str = "mlp_planner",   # model name (mlp_planner, transformer_planner, cnn_planner)
    num_epoch: int = 21,                # total number of training epochs
    lr: float = 0.001,                   # learning rate
    batch_size: int = 128,             # size of each training batch
    seed: int = 2024,                  # random seed
    **kwargs,                          # extra keyword arguments forwarded to the model constructor
):
    """
    Train a planner on drive_data/train and validate it on drive_data/val.

    The loop feeds only track_left / track_right to the model, so it fits the
    track-based planners. Checkpoints are written whenever the validation errors
    fall under fixed thresholds, and the final weights are stored with save_model().
    """

    # Pick the compute device: GPU when CUDA is available, CPU otherwise
    if torch.cuda.is_available():
        device = torch.device("cuda")
    else:
        print("CUDA not available, using CPU")
        device = torch.device("cpu")

    # Seed torch for reproducibility
    torch.manual_seed(seed)

    # TensorBoard writer (default log directory)
    writer = SummaryWriter()

    # Build the model, move it to the device and switch to training mode
    model = load_model(model_name, **kwargs)
    model = model.to(device)
    model.train()

    # Load the training and validation sets
    train_data = road_dataset.load_data("drive_data/train", shuffle=True, batch_size=batch_size, num_workers=4)
    val_data = road_dataset.load_data("drive_data/val", shuffle=False)
    # Materialise every batch on the device once so later epochs skip the loader.
    # Side effect: batch composition and order are frozen after this single pass.
    train_data = [(i["track_left"].to(device), i["track_right"].to(device), i["waypoints"].to(device), i["waypoints_mask"].to(device)) for i in train_data]
    val_data = [(i["track_left"].to(device), i["track_right"].to(device), i["waypoints"].to(device), i["waypoints_mask"].to(device)) for i in val_data]

    # Separate losses for the two waypoint coordinates, plus the optimizer
    loss_fn_x = torch.nn.L1Loss()
    loss_fn_y = torch.nn.SmoothL1Loss(beta=0.01)
    optimizer = torch.optim.AdamW(model.parameters(), lr=lr, weight_decay=1e-4)

    # Number of optimizer steps taken so far (x-axis of the logged loss)
    global_step = 0

    for epoch in range(num_epoch):
        metric = PlannerMetric()
        model.train()
        for track_left, track_right, waypoints, waypoints_mask in train_data:
            waypoints_pred = model(track_left, track_right)

            # Errors over the batches added so far this epoch; compute() is read once per
            # batch (assumes compute() only reads accumulated state — TODO confirm).
            metric.add(waypoints_pred, waypoints, waypoints_mask)
            train_errors = metric.compute()
            longitudinal_error = train_errors["longitudinal_error"]
            lateral_error = train_errors["lateral_error"]

            # Masked-out waypoints are zeroed in both prediction and target
            x_loss = loss_fn_x(waypoints_pred[:, :, 0] * waypoints_mask, waypoints[:, :, 0] * waypoints_mask)
            y_loss = loss_fn_y(waypoints_pred[:, :, 1] * waypoints_mask, waypoints[:, :, 1] * waypoints_mask)
            # alpha in (0, 1): the larger the longitudinal error is relative to the
            # lateral error, the more weight the first-coordinate loss receives
            alpha = torch.sigmoid(torch.tensor(longitudinal_error - lateral_error))
            loss = alpha * x_loss + (1-alpha) * y_loss

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            global_step += 1

        # Log the loss of the epoch's last batch
        writer.add_scalar("train/loss", loss, global_step)

        # Start the validation pass from an empty metric
        metric.reset()

        model.eval()
        for track_left, track_right, waypoints, waypoints_mask in val_data:
            # No gradients are needed for validation
            with torch.inference_mode():
                waypoints_pred = model(track_left, track_right)
            metric.add(waypoints_pred, waypoints, waypoints_mask)
        # Validation errors over the whole validation set, read once after the loop
        val_errors = metric.compute()
        val_long_er = val_errors["longitudinal_error"]
        cal_lt_er = val_errors["lateral_error"]

        # Report every fifth epoch
        if epoch%5 == 0:
            print(f"Epoch {epoch}")
            print(f"Train= Long ER: {longitudinal_error:.4f}, Lateral ER: {lateral_error:.4f}, Valid= Long ER: {val_long_er:.4f}, Lateral ER: {cal_lt_er:.4f}")

        # Checkpoint (training continues) whenever both validation errors beat the thresholds
        if val_long_er < 0.17 and cal_lt_er < 0.55:
            torch.save(model.state_dict(), f"long_{val_long_er:.4f}_lateral_{cal_lt_er:.4f}_epoch{epoch}_b{batch_size}_lr{lr}.pth")

        metric.reset()
        # Push the buffered log entries to disk
        writer.flush()

    # Release the TensorBoard event file
    writer.close()

    # Store the final weights as a .th file
    save_model(model)

import argparse

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--num_epoch", type=int, default=21)
    parser.add_argument("--lr", type=float, default=0.001)
    parser.add_argument("--batch_size", type=int, default=128)
    # No "--alpha" flag: every parsed option is forwarded to train(), and an option that
    # train() does not name falls into **kwargs and reaches the model constructor, which
    # raises TypeError for an unexpected keyword. train() derives its own alpha per batch.
    train(**vars(parser.parse_args()))

In [111]:
# Peek at the tensor shapes of the first batch only.
for batch in train_data:
    for field in ("image", "track_left", "track_right", "waypoints", "waypoints_mask"):
        print(batch[field].shape)
    break

torch.Size([256, 3, 96, 128])
torch.Size([256, 10, 2])
torch.Size([256, 10, 2])
torch.Size([256, 3, 2])
torch.Size([256, 3])


# 4. Part 1b：Transformer 計畫器

- 使用 Transformer 來預測車輛軌跡 Waypoints

- 輸入來自 road_dataset.py 的輸出，每筆資料包含：

  - track_left：形狀為 (B, n_track, 2)，左側邊界的 x,z 座標

  - track_right：形狀為 (B, n_track, 2)，右側邊界的 x,z 座標

- 預測目標 (Output):

  - waypoints：預測結果形狀為 (B, n_waypoints, 2)，表示未來的導航點 (x, z)

## (1) Model

### 原始檔案

In [None]:
class TransformerPlanner(nn.Module):
    """Starter template of the transformer planner; forward() is not implemented yet."""

    def __init__(
        self,
        n_track: int = 10,
        n_waypoints: int = 3,
        d_model: int = 64,
    ):
        """
        Args:
            n_track (int): number of points in each side of the track
            n_waypoints (int): number of waypoints to predict
            d_model (int): width of each query embedding vector
        """
        super().__init__()

        self.n_track = n_track
        self.n_waypoints = n_waypoints

        # Learned table holding one d_model-wide vector per waypoint
        self.query_embed = nn.Embedding(n_waypoints, d_model)

    def forward(
        self,
        track_left: torch.Tensor,
        track_right: torch.Tensor,
        **kwargs,
    ) -> torch.Tensor:
        """
        Predicts waypoints from the left and right boundaries of the track.

        During test time, your model will be called with
        model(track_left=..., track_right=...), so keep the function signature as is.

        Args:
            track_left (torch.Tensor): shape (b, n_track, 2)
            track_right (torch.Tensor): shape (b, n_track, 2)

        Returns:
            torch.Tensor: future waypoints with shape (b, n_waypoints, 2)
        """
        raise NotImplementedError

In [32]:
import torch 
# Small demo: add one learned embedding row to each of the last p rows of x.
d_model, p = 4, 3
query_embed = torch.nn.Embedding(num_embeddings=p, embedding_dim=d_model)
print("weight=\n", query_embed.weight)

x = torch.randn(5, d_model)
print(x)

# Keep only the last p rows so the shape matches the embedding table.
x = x[-p:]
print(x)
shifted = x + query_embed.weight
print(shifted)

weight=
 Parameter containing:
tensor([[-0.1868,  0.7949,  0.7277, -0.9573],
        [-0.2949,  0.0187,  0.7739, -0.9359],
        [-0.0500,  3.7119,  1.0982,  0.6698]], requires_grad=True)
tensor([[ 0.7467, -2.1993,  0.5563, -0.5789],
        [ 1.2145,  1.2570,  0.1477, -0.5473],
        [-0.2250, -1.4292, -0.0911,  1.6844],
        [ 1.2512,  0.1812,  0.9084,  0.0930],
        [ 0.4955,  0.2275, -0.0736, -0.4374]])
tensor([[-0.2250, -1.4292, -0.0911,  1.6844],
        [ 1.2512,  0.1812,  0.9084,  0.0930],
        [ 0.4955,  0.2275, -0.0736, -0.4374]])
tensor([[-0.4118, -0.6343,  0.6367,  0.7271],
        [ 0.9563,  0.1999,  1.6823, -0.8429],
        [ 0.4455,  3.9394,  1.0246,  0.2325]], grad_fn=<AddBackward0>)


### 成功檔案

#### Version 1

In [None]:
class TransformerPlanner(nn.Module):
    """Planner that encodes the boundary points with TransformerLayer blocks and reads waypoints off the last tokens."""

    def __init__(
        self,
        n_track: int = 10,
        n_waypoints: int = 3,
        d_model: int = 64,
    ):
        """
        Args:
            n_track (int): number of points in each side of the track
            n_waypoints (int): number of waypoints to predict
            d_model (int): feature width used inside the encoder
        """
        super().__init__()

        self.n_track = n_track
        self.n_waypoints = n_waypoints

        # Learned table holding one d_model-wide vector per waypoint
        self.query_embed = nn.Embedding(n_waypoints, d_model)

        # Lift every 2-D boundary point to d_model, then apply four TransformerLayer blocks
        encoder = [nn.Linear(2, d_model)]
        for _ in range(4):
            encoder.append(TransformerLayer(d_model, num_heads=8))
        self.network = nn.Sequential(*encoder)

        # Maps each selected token back to a 2-D waypoint
        self.output_layer = nn.Linear(d_model, 2)

    def forward(
        self,
        track_left: torch.Tensor,
        track_right: torch.Tensor,
        **kwargs,
    ) -> torch.Tensor:
        """
        Predicts waypoints from the left and right boundaries of the track.

        During test time, your model will be called with
        model(track_left=..., track_right=...), so keep the function signature as is.

        Args:
            track_left (torch.Tensor): shape (b, n_track, 2)
            track_right (torch.Tensor): shape (b, n_track, 2)

        Returns:
            torch.Tensor: future waypoints with shape (b, n_waypoints, 2)
        """
        # (b, 2 * n_track, 2): left points first, then right points
        tokens = torch.cat((track_left, track_right), dim=1)
        encoded = self.network(tokens)
        # The last n_waypoints tokens, each shifted by its query embedding
        tail = encoded[..., -self.n_waypoints:, :]
        return self.output_layer(tail + self.query_embed.weight)

## (2) Train

### 訓練方式

In [None]:
!python -m homework.train_planner
!python batch_train.py

In [None]:
from homework.train_planner import train
import torch.multiprocessing as mp

if __name__ == "__main__":
    # Only matters when the script is frozen into a Windows executable.
    mp.freeze_support()

    # Hyper-parameter grid: every batch-size option is paired with every learning-rate option.
    batch_size_options = [{"batch_size": 128}]
    lr_options = [{"lr": 0.001}]

    grid = [
        (bs_kwargs, lr_kwargs)
        for bs_kwargs in batch_size_options
        for lr_kwargs in lr_options
    ]
    for bs_kwargs, lr_kwargs in grid:
        print(f"batch_size: {bs_kwargs['batch_size']}, lr: {lr_kwargs['lr']}")
        train(**bs_kwargs, **lr_kwargs)

In [None]:
import torch
from torch.utils.tensorboard import SummaryWriter
from .metrics import PlannerMetric
from .models import load_model, save_model
from .datasets import road_dataset

def train(
    exp_dir: str = "logs",
    model_name: str = "transformer_planner", 
    num_epoch: int = 21, 
    lr: float = 0.001, 
    batch_size: int = 128, 
    seed: int = 2024, 
    **kwargs,
):
    """
    Train a planner on drive_data/train and validate it on drive_data/val.

    Args:
        exp_dir: directory name for logs; not read by this function
        model_name: key passed to load_model() to pick the model class
        num_epoch: number of passes over the training batches
        lr: learning rate of the AdamW optimizer
        batch_size: size of each training batch
        seed: value passed to torch.manual_seed
        **kwargs: forwarded to the model constructor through load_model()

    The loop feeds only track_left / track_right to the model. Checkpoints are
    written whenever the validation errors fall under fixed thresholds, and the
    final weights are stored with save_model().
    """
    if torch.cuda.is_available():
        device = torch.device("cuda")
    else:
        print("CUDA not available, using CPU")
        device = torch.device("cpu")
    torch.manual_seed(seed)
    writer = SummaryWriter()
    model = load_model(model_name, **kwargs)
    model = model.to(device)
    model.train()
    train_data = road_dataset.load_data("drive_data/train", shuffle=True, batch_size=batch_size, num_workers=4)
    val_data = road_dataset.load_data("drive_data/val", shuffle=False)
    # Materialise every batch on the device once so later epochs skip the loader.
    # Side effect: batch composition and order are frozen after this single pass.
    train_data = [(i["track_left"].to(device), i["track_right"].to(device), i["waypoints"].to(device), i["waypoints_mask"].to(device)) for i in train_data]
    val_data = [(i["track_left"].to(device), i["track_right"].to(device), i["waypoints"].to(device), i["waypoints_mask"].to(device)) for i in val_data]
    # Separate losses for the two waypoint coordinates
    loss_fn_x = torch.nn.SmoothL1Loss(beta=0.0001)
    loss_fn_y = torch.nn.SmoothL1Loss(beta=0.01)
    optimizer = torch.optim.AdamW(model.parameters(), lr=lr, weight_decay=1e-4)
    global_step = 0
    for epoch in range(num_epoch):
        metric = PlannerMetric()
        model.train()
        for track_left, track_right, waypoints, waypoints_mask in train_data:
            waypoints_pred = model(track_left, track_right)
            # Errors over the batches added so far this epoch; compute() is read once per
            # batch (assumes compute() only reads accumulated state — TODO confirm).
            metric.add(waypoints_pred, waypoints, waypoints_mask)
            train_errors = metric.compute()
            longitudinal_error = train_errors["longitudinal_error"]
            lateral_error = train_errors["lateral_error"]
            # Masked-out waypoints are zeroed in both prediction and target
            x_loss = loss_fn_x(waypoints_pred[:, :, 0] * waypoints_mask, waypoints[:, :, 0] * waypoints_mask)
            y_loss = loss_fn_y(waypoints_pred[:, :, 1] * waypoints_mask, waypoints[:, :, 1] * waypoints_mask)
            # alpha in (0, 1): the larger the longitudinal error is relative to the
            # lateral error, the more weight the first-coordinate loss receives
            alpha = torch.sigmoid(torch.tensor(longitudinal_error - lateral_error))
            loss = alpha * x_loss + (1-alpha) * y_loss
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            global_step += 1
        # Log the loss of the epoch's last batch
        writer.add_scalar("train/loss", loss, global_step)
        # Start the validation pass from an empty metric
        metric.reset()
        model.eval()
        for track_left, track_right, waypoints, waypoints_mask in val_data:
            with torch.inference_mode():
                waypoints_pred = model(track_left, track_right)
            metric.add(waypoints_pred, waypoints, waypoints_mask)
        # Validation errors over the whole validation set, read once after the loop
        val_errors = metric.compute()
        val_long_er = val_errors["longitudinal_error"]
        cal_lt_er = val_errors["lateral_error"]
        if epoch%5 == 0:
            print(f"Epoch {epoch}")
            print(f"Train= Long ER: {longitudinal_error:.4f}, Lateral ER: {lateral_error:.4f}, Valid= Long ER: {val_long_er:.4f}, Lateral ER: {cal_lt_er:.4f}")
        # Checkpoint (training continues) whenever both validation errors beat the thresholds
        if val_long_er < 0.22 and cal_lt_er < 0.52:
            torch.save(model.state_dict(), f"long_{val_long_er:.4f}_lateral_{cal_lt_er:.4f}_epoch{epoch}_b{batch_size}_lr{lr}.pth")
        metric.reset()
        writer.flush()
    # Release the TensorBoard event file
    writer.close()
    save_model(model)
import argparse
if __name__ == "__main__":
    # Command-line overrides for the training hyper-parameters.
    cli = argparse.ArgumentParser()
    cli_options = (
        ("--num_epoch", int, 21),
        ("--lr", float, 0.001),
        ("--batch_size", int, 128),
    )
    for flag, kind, default in cli_options:
        cli.add_argument(flag, type=kind, default=default)
    parsed = cli.parse_args()
    train(**vars(parsed))

### 訓練內容

# 5. Part 2：CNN 計畫器

- 使用 CNN 直接從影像預測未來的導航點 (waypoints)

## (1) Model

### 原始檔案

In [None]:
class CNNPlanner(torch.nn.Module):
    """Starter template of the image-based planner; forward() is not implemented yet."""

    def __init__(
        self,
        n_waypoints: int = 3,
    ):
        """
        Args:
            n_waypoints (int): number of waypoints to predict
        """
        super().__init__()

        self.n_waypoints = n_waypoints

        # Normalisation constants kept as buffers so they move with the module;
        # persistent=False leaves them out of the state dict
        self.register_buffer("input_mean", torch.as_tensor(INPUT_MEAN), persistent=False)
        self.register_buffer("input_std", torch.as_tensor(INPUT_STD), persistent=False)

    def forward(self, image: torch.Tensor, **kwargs) -> torch.Tensor:
        """
        Args:
            image (torch.FloatTensor): shape (b, 3, h, w) and vals in [0, 1]

        Returns:
            torch.FloatTensor: future waypoints with shape (b, n, 2)
        """
        x = image
        # Per-channel normalisation: the 1-D mean/std buffers are indexed to
        # (1, C, 1, 1) so they broadcast over batch, height and width
        x = (x - self.input_mean[None, :, None, None]) / self.input_std[None, :, None, None]

        raise NotImplementedError

In [56]:
# Toy batch shaped (2, 3, 2, 4) and filled with 0..47, used by the pooling demo.
image = torch.arange(2 * 3 * 2 * 4).float().view(2, 3, 2, 4)
print(image.shape)
print(image)

torch.Size([2, 3, 2, 4])
tensor([[[[ 0.,  1.,  2.,  3.],
          [ 4.,  5.,  6.,  7.]],

         [[ 8.,  9., 10., 11.],
          [12., 13., 14., 15.]],

         [[16., 17., 18., 19.],
          [20., 21., 22., 23.]]],


        [[[24., 25., 26., 27.],
          [28., 29., 30., 31.]],

         [[32., 33., 34., 35.],
          [36., 37., 38., 39.]],

         [[40., 41., 42., 43.],
          [44., 45., 46., 47.]]]])


In [57]:
# Global average pooling: each channel's H x W grid collapses to a single value.
pool_1 = nn.AdaptiveAvgPool2d((1, 1))
image_pool_1 = pool_1(image)
print(image_pool_1)

# Drop the two singleton spatial axes: (B, C, 1, 1) -> (B, C).
image_flatten = image_pool_1.flatten(start_dim=1)
print(image_flatten)


tensor([[[[ 3.5000]],

         [[11.5000]],

         [[19.5000]]],


        [[[27.5000]],

         [[35.5000]],

         [[43.5000]]]])
tensor([[ 3.5000, 11.5000, 19.5000],
        [27.5000, 35.5000, 43.5000]])


### 成功檔案

#### Version 1

In [None]:
class CNNPlanner(torch.nn.Module):
    """Convolutional planner that regresses waypoints directly from an RGB frame."""

    def __init__(
        self,
        n_waypoints: int = 3,
    ):
        """
        Args:
            n_waypoints (int): number of waypoints to predict
        """
        super().__init__()

        self.n_waypoints = n_waypoints

        self.register_buffer("input_mean", torch.as_tensor(INPUT_MEAN), persistent=False)
        self.register_buffer("input_std", torch.as_tensor(INPUT_STD), persistent=False)

        stem_channels = 128
        body_channels = 64
        n_body_blocks = 3

        # Stem: one 7x7 convolution with stride 4
        stages = [
            nn.Conv2d(3, stem_channels, kernel_size=7, stride=4, padding=3),
            nn.BatchNorm2d(stem_channels),
            nn.ReLU(),
        ]

        # Body: stride-2 3x3 convolutions; only the first one changes the channel count
        in_channels = stem_channels
        for _ in range(n_body_blocks):
            stages.extend(
                (
                    nn.Conv2d(in_channels, body_channels, kernel_size=3, stride=2, padding=1),
                    nn.BatchNorm2d(body_channels),
                    nn.ReLU(),
                )
            )
            in_channels = body_channels

        # Head, producing (B, n_waypoints * 2):
        #   1. average pooling keeps one value per channel: (B, 64, H', W') -> (B, 64, 1, 1)
        #   2. flatten drops the singleton spatial axes:     (B, 64, 1, 1)  -> (B, 64)
        #   3. linear layer maps the 64 features to n_waypoints * 2 outputs
        stages.extend(
            (
                nn.AdaptiveAvgPool2d((1, 1)),
                nn.Flatten(),
                nn.Linear(body_channels, n_waypoints * 2),
            )
        )

        self.network = nn.Sequential(*stages)

    def forward(self, image: torch.Tensor, **kwargs) -> torch.Tensor:
        """
        Args:
            image (torch.FloatTensor): shape (b, 3, h, w) and vals in [0, 1]

        Returns:
            torch.FloatTensor: future waypoints with shape (b, n, 2)
        """
        # Per-channel normalisation with the registered mean / std buffers
        normalized = (image - self.input_mean[None, :, None, None]) / self.input_std[None, :, None, None]
        flat_waypoints = self.network(normalized)

        return flat_waypoints.view(-1, self.n_waypoints, 2)

## (2) Train

### 訓練方式

In [None]:
!python -m homework.train_planner

!python batch_train.py


In [None]:
from homework.train_planner import train
import torch.multiprocessing as mp

if __name__ == "__main__":
    # Only matters when the script is frozen into a Windows executable.
    mp.freeze_support()

    # Hyper-parameter grid: every batch-size option is paired with every learning-rate option.
    batch_size_options = [{"batch_size": 128}]
    lr_options = [{"lr": 0.001}]

    grid = [
        (bs_kwargs, lr_kwargs)
        for bs_kwargs in batch_size_options
        for lr_kwargs in lr_options
    ]
    for bs_kwargs, lr_kwargs in grid:
        print(f"batch_size: {bs_kwargs['batch_size']}, lr: {lr_kwargs['lr']}")
        train(**bs_kwargs, **lr_kwargs)

### 訓練內容

# 6. 提交作業

In [None]:
!python bundle.py homework pc29368
!python -m grader pc29368.zip

# 7. 了解各部分程式碼

## (1) models.py

In [None]:
from pathlib import Path

import torch
import torch.nn as nn

HOMEWORK_DIR = Path(__file__).resolve().parent
INPUT_MEAN = [0.2788, 0.2657, 0.2629]
INPUT_STD = [0.2064, 0.1944, 0.2252]

# NOTE(review): placeholder bodies. A `raise` in a class body runs while the class is
# being created, so this cell fails as written. Presumably these stand in for the full
# class definitions shown in the earlier sections — confirm before running.
class MLPPlanner(nn.Module):raise NotImplementedError
class TransformerPlanner(nn.Module):raise NotImplementedError
class CNNPlanner(torch.nn.Module):raise NotImplementedError

MODEL_FACTORY = {
    "mlp_planner": MLPPlanner,
    "transformer_planner": TransformerPlanner,
    "cnn_planner": CNNPlanner,
}

# Build the model registered under model_name
def load_model(
    model_name: str,
    with_weights: bool = False,
    **model_kwargs,
) -> torch.nn.Module:
    """
    Called by the grader to load a pre-trained model by name

    Args:
        model_name: key into MODEL_FACTORY selecting the model class
        with_weights: when True, load "<model_name>.th" from HOMEWORK_DIR into the model
        **model_kwargs: forwarded to the model constructor

    Returns:
        The constructed model, with the stored weights loaded when requested

    Raises:
        KeyError: model_name is not a key of MODEL_FACTORY
        AssertionError: the weights file is missing, the state dict does not fit
            the model, or the estimated model size exceeds 20 MB
    """
    m = MODEL_FACTORY[model_name](**model_kwargs)

    if with_weights:
        model_path = HOMEWORK_DIR / f"{model_name}.th"
        # Raised explicitly instead of via `assert` so the check is not stripped by `python -O`
        if not model_path.exists():
            raise AssertionError(f"{model_path.name} not found")

        try:
            m.load_state_dict(torch.load(model_path, map_location="cpu"))
        except RuntimeError as e:
            raise AssertionError(
                f"Failed to load {model_path.name}, make sure the default model arguments are set correctly"
            ) from e

    # limit model sizes since they will be zipped and submitted
    model_size_mb = calculate_model_size_mb(m)

    if model_size_mb > 20:
        raise AssertionError(f"{model_name} is too large: {model_size_mb:.2f} MB")

    return m

# Store a trained model's weights as a .th file
def save_model(model: torch.nn.Module) -> Path:
    """
    Use this function to save your model in train.py

    Args:
        model: instance of one of the classes registered in MODEL_FACTORY

    Returns:
        Path of the written "<model_name>.th" file inside HOMEWORK_DIR

    Raises:
        ValueError: the model's exact type is not registered in MODEL_FACTORY
    """
    model_name = None

    # Exact type match (subclasses do not count); the last matching entry wins
    for n, m in MODEL_FACTORY.items():
        if type(model) is m:
            model_name = n

    if model_name is None:
        raise ValueError(f"Model type '{str(type(model))}' not supported")

    output_path = HOMEWORK_DIR / f"{model_name}.th"
    torch.save(model.state_dict(), output_path)

    return output_path

# Estimate the size of a model from its parameter count.
def calculate_model_size_mb(model: torch.nn.Module) -> float:
    """
    Naive model size estimate: every parameter element is counted as 4 bytes
    and the total is converted to MiB.
    """
    n_params = 0
    for param in model.parameters():
        n_params += param.numel()

    return n_params * 4 / 1024 / 1024


## (2) datasets

### road_dataset.py

- 功能：

  - 定義 RoadDataset 類別

  - 讀取數據，包括影像、深度圖、.npz 文件

  - 返回包含 影像、waypoints、邊界線 等資訊的 sample。

- 主要步驟：

  - 讀取 影像 (.jpg) 和深度 (.png)。

  - 從 .npz 檔案讀取數據，並整理成 dict 格式。

  - 透過 dataset[idx] 取出第 idx 幀的數據樣本。

In [None]:
from pathlib import Path

import numpy as np
from torch.utils.data import ConcatDataset, DataLoader, Dataset

from . import road_transforms
from .road_utils import Track

# Per-episode dataset: loads the episode's `info.npz` (track + per-frame data)
# and produces one processed sample dict per frame.
class RoadDataset(Dataset):
    """
    SuperTux dataset for road detection
    """

    def __init__(
        self,
        episode_path: str,
        transform_pipeline: str = "default",
    ):
        """
        Args:
            episode_path (str): directory holding `info.npz` for one episode
            transform_pipeline (str): name of the pipeline built by get_transform()
        """
        super().__init__()

        self.episode_path = Path(episode_path)

        # NOTE: allow_pickle=True unpickles object arrays; only use trusted data.
        info = np.load(self.episode_path / "info.npz", allow_pickle=True)

        # Track geometry built from the stored track dict.
        self.track = Track(**info["track"].item())

        # Per-frame fields, each stacked into a single array keyed by field name.
        raw_frames = info["frames"].item()
        self.frames: dict[str, np.ndarray] = {key: np.stack(values) for key, values in raw_frames.items()}

        # Callable applied to every sample in __getitem__.
        self.transform = self.get_transform(transform_pipeline)

    def get_transform(self, transform_pipeline: str):
        """
        Creates a pipeline for processing data.

        Feel free to add your own pipelines (e.g. for data augmentation).
        Note that the grader will choose one of the predefined pipelines,
        so be careful if you modify the existing ones.

        Raises:
            ValueError: if no pipeline is defined for `transform_pipeline`
                (this currently includes "aug", which is not implemented yet)
        """
        # Standard pipeline: image, track_left, track_right, waypoints, waypoints_mask
        if transform_pipeline == "default":
            return road_transforms.Compose(
                [
                    road_transforms.ImageLoader(self.episode_path),
                    road_transforms.EgoTrackProcessor(self.track),
                ]
            )

        # Track-only pipeline (no image): track_left, track_right, waypoints, waypoints_mask
        if transform_pipeline == "state_only":
            return road_transforms.EgoTrackProcessor(self.track)

        # "aug" is reserved for custom augmentations; add a branch returning
        # your pipeline here. Until then it is rejected like any unknown name.
        raise ValueError(f"Invalid transform {transform_pipeline} specified!")

    def __len__(self):
        """Number of frames in the episode (length of the "location" field)."""
        return len(self.frames["location"])

    def __getitem__(self, idx: int):
        """
        Returns the processed sample dict for frame `idx`, with every key that
        starts with "_" removed.
        """
        # Private inputs consumed by the transforms: frame index and raw frame arrays.
        sample = self.transform({"_idx": idx, "_frames": self.frames})

        # remove private keys
        private_keys = [key for key in sample if key.startswith("_")]
        for key in private_keys:
            del sample[key]

        return sample


def load_data(
    dataset_path: str,
    transform_pipeline: str = "default",
    return_dataloader: bool = True,
    num_workers: int = 2,
    batch_size: int = 32,
    shuffle: bool = False,
) -> DataLoader | Dataset:
    """
    Constructs the dataset/dataloader.
    The specified transform_pipeline must be implemented in the RoadDataset class.

    Args:
        dataset_path (str): directory of episode folders, or a single episode folder
        transform_pipeline (str): 'default', 'aug', or other custom transformation pipelines
        return_dataloader (bool): returns either DataLoader or Dataset
        num_workers (int): data workers, set to 0 for VSCode debugging
        batch_size (int): batch size
        shuffle (bool): should be true for train and false for val

    Returns:
        DataLoader or Dataset
    """
    root = Path(dataset_path)
    episode_dirs = [entry for entry in root.iterdir() if entry.is_dir()]

    # can pass in a single scene like "road_data/val/cornfield_crossing_04"
    if len(episode_dirs) == 0 and root.is_dir():
        episode_dirs = [root]

    # One RoadDataset per episode, in sorted path order, concatenated together.
    episodes = [
        RoadDataset(episode_dir, transform_pipeline=transform_pipeline)
        for episode_dir in sorted(episode_dirs)
    ]
    dataset = ConcatDataset(episodes)

    print(f"Loaded {len(dataset)} samples from {len(episodes)} episodes")

    if return_dataloader:
        return DataLoader(
            dataset,
            num_workers=num_workers,
            batch_size=batch_size,
            shuffle=shuffle,
        )

    return dataset


### road_transforms.py

- 功能：
  - 負責 數據預處理與增強 (Data Transformations)，在讀取數據後進行處理。

- 類別：

  - `ImageLoader`：讀取影像並正規化 (/ 255.0)。

  - `DepthLoader`：讀取深度圖並正規化 (/ 65535.0)。

  - `TrackProcessor`：計算並轉換道路邊界 (track_left & track_right) 到影像座標。

  - `RandomHorizontalFlip`：隨機水平翻轉影像與對應的 track。

- 從 `sample` 這個字典裡提取關鍵數據
  
  - `location`: 目前車輛的位置。
  
  - `front`: 車輛正前方的一個點座標 (x, y, z)；與 `location` 相減並正規化後即為車輛的前進方向。
  
  - `distance_down_track`: 沿著賽道的距離。
  
  - `waypoints`: 從 frames["location"] 提取未來的位置，作為目標點（導航點）。

- 從 `from_frame()` 計算

  - `track_left` 和 `track_right`

    - (n_track, 2) 浮點數，左右車道邊界點

    - 計算 賽道的左右邊界  (呼叫 `Track.get_boundaries()`)

    - 轉換到車輛自身的座標系 (`create_pose_matrix()`) 左右邊界乘上這個矩陣，就會轉換到車輛的視角。

  - `waypoints`

    - (n_waypoints, 2) 浮點數，目標航點

    - 來自 frames["location"]，代表未來的導航點。

    - 轉換到車輛自身的座標系，然後轉成 2D（只保留 x 和 z）。

  - `waypoints_mask`

    - (n_waypoints,) bool mask 表示「乾淨」的航路點。

    - 如果 waypoints 數量少於 n_waypoints (導航點的數量)，則補足缺少的點，確保 waypoints 長度固定。

    - 產生一個 mask，標記哪些是「真正的 waypoints」，哪些是「填充的無效點」。

    - 後續計算時，就可以忽略掉補上的無效點，避免影響模型的學習效果。


In [None]:
"""
This file is provided as-is and does not require modification.
If you want to add custom data augmentation during training, feel free to extend this file.

Design pattern of the transforms:
1. Take in dictionary of sample data
2. Look for specific inputs in the sample
3. Process the inputs
4. Add new data to the sample
"""

from pathlib import Path

import cv2
import numpy as np
from PIL import Image
from torchvision import transforms as tv_transforms

from .road_utils import Track, homogeneous

# Project homogeneous 3D points onto the 2D image plane.
def project(points, view, proj, h, w):
    """
    Applies `points @ view @ proj`, divides by the last coordinate and maps the
    result to pixel coordinates.

    Args:
        points: homogeneous points, one per row
        view: view matrix (applied to row vectors)
        proj: projection matrix (applied to row vectors)
        h: image height in pixels
        w: image width in pixels

    Returns:
        tuple: (pixel coordinates of the points that pass the visibility test,
        boolean mask over all input points)
    """
    clip = points @ view @ proj
    depth = clip[:, -1]
    normalized = clip / clip[:, -1:]

    # convert from uv to pixel coordinates, [0, W] and [0, H]
    pixels = normalized[:, :2]
    pixels[:, 0] = (pixels[:, 0] + 1) * w / 2
    pixels[:, 1] = (1 - pixels[:, 1]) * h / 2

    xs = pixels[:, 0]
    ys = pixels[:, 1]

    # in front of the camera (depth > 1) but not too far away (depth < 15)
    depth_ok = (depth > 1) & (depth < 15)
    # inside the image horizontally and vertically
    width_ok = (xs >= 0) & (xs < w)
    height_ok = (ys >= 0) & (ys < h)

    mask = depth_ok & width_ok & height_ok

    return pixels[mask], mask

# Draw a polyline through consecutive points onto a canvas.
def rasterize_lines(
    points: np.ndarray,
    canvas: np.ndarray,
    color: int,
    thickness: int = 4,
):
    """
    Draws one line segment between every pair of consecutive points, modifying
    `canvas` in place. Coordinates are truncated to integers before drawing.
    Nothing is drawn when fewer than two points are given.
    """
    for segment_start, segment_end in zip(points[:-1], points[1:]):
        start = segment_start.astype(int)
        end = segment_end.astype(int)

        cv2.line(canvas, tuple(start), tuple(end), color, thickness)

# Pad or truncate a point sequence to a fixed length.
def pad(points: np.ndarray, max_length: int) -> tuple[np.ndarray, np.ndarray]:
    """
    Pads/truncates the points to a set length

    Args:
        points (np.ndarray): sequence of points with shape (n, d)
        max_length (int): number of points in the output

    Returns:
        tuple[np.ndarray, np.ndarray]: padded points (max_length, d) and mask (max_length,)
    """
    kept = points[:max_length]
    n_valid = len(kept)

    # mask is True for real points and False for padding
    mask = np.ones(max_length, dtype=bool)
    mask[n_valid:] = False

    n_missing = max_length - n_valid
    if n_missing <= 0:
        return kept, mask

    if n_valid == 0:
        # nothing to repeat, so fill with zeros
        filler = np.zeros((n_missing, points.shape[1]), dtype=np.float32)
    else:
        # pad with the last element
        filler = np.repeat(kept[-1:], n_missing, axis=0)

    return np.concatenate([kept, filler]), mask

# Build the 4x4 matrix that maps world coordinates into the kart's own frame.
def create_pose_matrix(
    location: np.ndarray,
    front: np.ndarray,
    up: np.ndarray = [0, 1, 0],
    eps: float = 1e-5,
):
    """
    Args:
        location: cart position
        front: Point the camera is looking at
        up: up vector, default is Y-up [0, 1, 0]
        eps: small constant added to norms to avoid division by zero

    Returns:
        4x4 matrix (rotation composed with a translation by -location)
    """
    # forward direction (z-axis): from the cart towards `front`
    heading = front - location
    forward = heading / (np.linalg.norm(heading) + eps)

    # right vector (x-axis)
    side = np.cross(forward, up)
    right = side / (np.linalg.norm(side) + eps)

    # recomputed up vector (y-axis) so the three axes are orthogonal
    ortho_up = np.cross(right, forward)

    # translation that moves the cart to the origin
    translation = np.eye(4)
    translation[:3, 3] = -location

    # rotation whose rows are the (negated right, up, forward) axes
    rotation = np.eye(4)
    rotation[:3, :3] = np.vstack((-right, ortho_up, forward))

    return rotation @ translation

# Subclass of torchvision's Compose that chains transforms operating on a
# sample dict: each transform receives the dict returned by the previous one.
class Compose(tv_transforms.Compose):
    """Applies `self.transforms` in order to a sample dictionary."""

    def __call__(self, sample: dict):
        for transform in self.transforms:
            sample = transform(sample)

        return sample

# Load the color image of a frame and scale it to [0, 1].
class ImageLoader:
    """
    Reads `<idx:05d>_im.jpg` from the episode directory and stores it in
    sample["image"] as float32.
    """

    def __init__(self, episode_path: str):
        self.episode_path = Path(episode_path)

    def __call__(self, sample: dict):
        image_path = self.episode_path / f"{sample['_idx']:05d}_im.jpg"

        # 8-bit pixel values scaled to [0, 1]
        pixels = np.uint8(Image.open(image_path)) / 255.0
        # move the last axis first (assumes an H x W x C image -> C x H x W)
        channels_first = pixels.transpose(2, 0, 1)

        sample["image"] = channels_first.astype(np.float32)

        return sample

# Load the depth map of a frame and scale it to [0, 1].
class DepthLoader(ImageLoader):
    """
    Reads `<idx:05d>_depth.png` from the episode directory and stores it in
    sample["depth"] as float32.
    """

    def __call__(self, sample: dict):
        depth_path = self.episode_path / f"{sample['_idx']:05d}_depth.png"

        # 16-bit depth values scaled to [0, 1]
        scaled = np.uint16(Image.open(depth_path)) / 65535.0
        sample["depth"] = scaled.astype(np.float32)

        return sample

# Randomly mirror the image and its track label map left-to-right.
class RandomHorizontalFlip(tv_transforms.RandomHorizontalFlip):
    """
    With probability `self.p`, flips sample["image"] along axis 2 and
    sample["track"] along axis 1 (assumes image is C x H x W and track is
    H x W, so both axes are the width axis -- TODO confirm for custom pipelines).
    """

    def __call__(self, sample: dict):
        if np.random.rand() < self.p:
            # np.flip returns a view with negative strides. Such arrays cannot
            # be converted to torch tensors (default DataLoader collation
            # fails) and they alias the original buffer, so store contiguous
            # copies instead.
            sample["image"] = np.ascontiguousarray(np.flip(sample["image"], axis=2))
            sample["track"] = np.ascontiguousarray(np.flip(sample["track"], axis=1))

        return sample

# Project the left/right track boundaries into the image and rasterize them
# into a per-pixel label map.
class TrackProcessor:
    """
    Provides segmentation labels for left and right track
    """

    def __init__(self, track: Track):
        self.track = track

    def __call__(self, sample: dict):
        """
        Adds sample["track"]: an int64 (h, w) map where 0 is background,
        1 marks the left boundary and 2 marks the right boundary.
        Requires sample["image"] to be present already.
        """
        idx = sample["_idx"]
        frames = sample["_frames"]
        h, w = sample["image"].shape[1:]

        distance_down_track = frames["distance_down_track"][idx]
        proj = frames["P"][idx].copy()
        view = frames["V"][idx].copy()
        # Subtract row 1 of the view matrix from its last row; with row-vector
        # points this equals lowering every point by one unit along world Y
        # before viewing. NOTE(review): the intent is not visible here -- confirm.
        view[-1, :3] += -1.0 * view[1, :3]

        left_world, right_world = self.track.get_boundaries(distance_down_track)

        # draw line segments onto a blank canvas: left first, then right
        canvas = np.zeros((h, w), dtype=np.uint8)
        for label, boundary in ((1, left_world), (2, right_world)):
            # project to image plane, keeping only visible points
            boundary_px, _ = project(boundary, view, proj, h, w)
            rasterize_lines(boundary_px, canvas, color=label)

        sample["track"] = canvas.astype(np.int64)

        return sample

# Express the track boundaries and the upcoming waypoints in the kart's own
# (ego) coordinate frame, flattened to a bird's eye view.
class EgoTrackProcessor:
    """
    Provides road boundary point labels and target waypoints
    """

    def __init__(
        self,
        track: Track,
        n_track: int = 10,
        n_waypoints: int = 3,
        skip: int = 1,
    ):
        """
        Args:
            track: track geometry used to look up the boundaries
            n_track: number of boundary points per side
            n_waypoints: number of target waypoints
            skip: frame stride between consecutive waypoints
        """
        self.track = track
        self.n_track = n_track
        self.n_waypoints = n_waypoints
        self.skip = skip

    def __call__(self, sample: dict):
        """Adds track_left, track_right, waypoints and waypoints_mask to the sample."""
        frames = sample["_frames"]
        idx = sample["_idx"]
        locations = frames["location"]

        # use future location as target waypoints: every `skip`-th frame after
        # the current one (fewer are available near the end of the episode)
        stop = idx + (self.n_waypoints + 1) * self.skip
        future_locations = locations[idx : stop : self.skip][1:]

        sample.update(
            self.from_frame(
                locations[idx],
                frames["front"][idx],
                frames["distance_down_track"][idx],
                homogeneous(future_locations),
            )
        )

        return sample

    def from_frame(
        self,
        location: np.ndarray,
        front: np.ndarray,
        distance_down_track: float,
        waypoints: np.ndarray | None = None,
        **kwargs,
    ):
        """
        Args:
            location: kart position in world coordinates
            front: point in front of the kart in world coordinates
            distance_down_track: distance used to look up the track boundaries
            waypoints: homogeneous world-space waypoints; a single zero row is
                used when omitted
            **kwargs: ignored

        Returns:
            dict with float32 "track_left" / "track_right" (n_track, 2),
            float32 "waypoints" (n_waypoints, 2) and the boolean
            "waypoints_mask" (n_waypoints,) marking non-padded waypoints
        """
        if waypoints is None:
            waypoints = np.zeros((1, 4), dtype=np.float32)

        world2ego = create_pose_matrix(location, front)
        left_world, right_world = self.track.get_boundaries(
            distance_down_track,
            n_points=self.n_track,
        )

        def to_ego_bev(points_world):
            # convert to frame of kart (ego), then keep columns 0 and 2
            # for the bird's eye view (bev)
            return (points_world @ world2ego.T)[:, [0, 2]]

        # make sure points are expected size
        left_bev, _ = pad(to_ego_bev(left_world), self.n_track)
        right_bev, _ = pad(to_ego_bev(right_world), self.n_track)
        waypoints_bev, waypoints_mask = pad(to_ego_bev(waypoints), self.n_waypoints)

        return {
            "track_left": left_bev.astype(np.float32),
            "track_right": right_bev.astype(np.float32),
            "waypoints": waypoints_bev.astype(np.float32),
            "waypoints_mask": waypoints_mask,
        }


### road_utils.py

- 根據原始導航點 (path_nodes) 計算出道路的左右邊界。

  - 計算道路中心
    - `path_nodes[:, 0]` 代表道路的中心點。

  - 計算道路法線 (n)
    - 法線是 垂直於道路方向的向量，用來決定道路左右邊界的位置。
    - 計算每個導航點與下一個導航點的方向向量。
    - 假設 track.left 是 (x, y, z)，那麼 homogeneous(track.left) 會變成 (x, y, z, 1)
    - 這樣的表示方法有助於矩陣變換，例如透視投影。

  - 計算左右邊界
    - `left` = 中心點 加上 法線方向 * (道路寬度的一半)
    - `right` = 中心點 減去 法線方向 * (道路寬度的一半)

  - 讓道路變成循環
    - 將道路數據翻倍，確保導航點在某些計算時不會超出邊界 (環形賽道處理)。

  - 讓導航點變得更均勻
    - 確保導航點之間的距離相等，方便後續模型學習。

- 屬性
  - `center`：道路的中心點 (n, 3)

  - `left`：道路的左邊界 (n, 3)

  - `right`：道路的右邊界 (n, 3)

  - `center_distance`：累積距離

  - `width`：道路寬度

  - `track_left`：齊次坐標版本

  - `track_right`：齊次坐標版本

In [None]:
from functools import cached_property

import numpy as np

# Convert points to homogeneous coordinates by appending a column of ones.
def homogeneous(points: np.ndarray) -> np.ndarray:
    """
    Args:
        points (np.ndarray): points with shape (n, d)

    Returns:
        np.ndarray: homogeneous (n, d+1)
    """
    ones_column = np.ones((len(points), 1))

    return np.concatenate((points, ones_column), axis=1)

# 將點序列進行插值，使得每個點之間的距離相等。
def interpolate_smooth(
    points: np.ndarray,
    fixed_distance: float | None = None,
    fixed_number: int | None = None,
):
    """
    Args:
        points (np.ndarray): points with shape (n, d).
        fixed_distance (float): fixed distance between points.
        fixed_number (int): fixed number of points.
    """
    if fixed_distance is None and fixed_number is None:
        raise ValueError("Either fixed_distance or fixed_number must be provided")

    dists = np.sqrt(np.sum(np.diff(points, axis=0) ** 2, axis=1))
    cumulative = np.concatenate(([0], np.cumsum(dists)))

    if fixed_distance is not None:
        sample = np.arange(0, cumulative[-1], fixed_distance)
    elif fixed_number is not None:
        sample = np.linspace(0, cumulative[-1], fixed_number, endpoint=False)

    return np.array([np.interp(sample, cumulative, points[:, i]) for i in range(points.shape[1])]).T

# Track geometry: center line plus left/right boundaries derived from the
# path nodes and path widths.
class Track:
    """
    Builds the center line and the left/right boundaries of a track and offers
    lookups of the boundary points ahead of a given distance.
    """

    def __init__(
        self,
        path_distance: np.ndarray,
        path_nodes: np.ndarray,
        path_width: np.ndarray,
        interpolate: bool = True,
        fixed_distance: float = 2.0,
    ):
        """
        Args:
            path_distance (np.ndarray): distance between nodes with shape (n, 2)
            path_nodes (np.ndarray): nodes with shape (n, 2, 3)
            path_width (np.ndarray): width of the path with shape (n, 1)
            interpolate (bool): resample center/left/right at a fixed spacing
            fixed_distance (float): spacing used when `interpolate` is True
        """
        # float32 copies of the raw inputs (the geometry below is computed from
        # the original arguments, not from these copies)
        self.path_distance = np.float32(path_distance)
        self.path_nodes = np.float32(path_nodes)
        self.path_width = np.float32(path_width)

        # slightly perturb for numerically stable normals
        # (uses the global NumPy RNG, so results vary slightly between runs
        # unless the RNG is seeded)
        center = path_nodes[:, 0] + 1e-5 * np.random.randn(*path_nodes[:, 0].shape)
        width = path_width

        # compute left and right track using normal
        # d: vector from each center point to the next, wrapping the last point
        # back to the first
        d = np.diff(center, axis=0, append=center[:1])
        # n: d rotated by 90 degrees in the XZ plane (Y component zero), normalized
        n = np.stack([-d[:, 2], np.zeros_like(d[:, 0]), d[:, 0]], axis=1)
        n = n / (np.linalg.norm(n, axis=1, keepdims=True) + 1e-5)

        left = center + n * (width / 2)
        right = center - n * (width / 2)

        # loop around
        # (the point lists are duplicated so a window of points can run past
        # the end of the first lap)
        center = np.concatenate([center, center])
        left = np.concatenate([left, left])
        right = np.concatenate([right, right])

        # resample points so each point is fixed_distance apart
        if interpolate:
            center = interpolate_smooth(center, fixed_distance=fixed_distance)
            left = interpolate_smooth(left, fixed_distance=fixed_distance)
            right = interpolate_smooth(right, fixed_distance=fixed_distance)

        # compute new cumulative distance (n,)
        # (prepending the first point makes the first delta zero)
        center_delta = np.diff(center, axis=0, prepend=center[:1])
        center_delta_norm = np.linalg.norm(center_delta, axis=1)
        self.center_distance = np.cumsum(center_delta_norm)

        # (n, 3) points
        self.center = center
        self.left = left
        self.right = right
        # NOTE(review): `width` is not duplicated like `center`, and it is
        # resampled along the cumulative change of the width values themselves
        # rather than along track distance -- confirm that the result lines up
        # with `center` index by index.
        self.width = interpolate_smooth(width, fixed_number=center.shape[0])

    # Return the left/right boundary points that start at a given distance
    # along the track, as homogeneous coordinates.
    def get_boundaries(
        self,
        distance: float,
        n_points: int = 10,
        interpolate: bool = True,
        fixed_distance: float = 2.5,
    ) -> tuple[np.ndarray, np.ndarray]:
        """
        Args:
            distance (float): distance along the track at which to start
            n_points (int): number of center points to use after the start index
            interpolate (bool): resample the boundaries at a fixed spacing
            fixed_distance (float): spacing used when `interpolate` is True

        Returns:
            tuple[np.ndarray, np.ndarray]: left and right boundary points in
            homogeneous coordinates
        """
        # first center point whose cumulative distance is >= `distance`
        idx = np.searchsorted(self.center_distance, distance, side="left")
        # one extra center point so that every used point has a successor
        center = self.center[idx : idx + n_points + 1]
        width = self.width[idx : idx + n_points]

        # per-segment normals in the XZ plane (same construction as in __init__)
        d = np.diff(center, axis=0)
        n = np.stack([-d[:, 2], np.zeros_like(d[:, 0]), d[:, 0]], axis=1)
        n = n / (np.linalg.norm(n, axis=1, keepdims=True) + 1e-7)
        left = center[:-1] + n * (width / 2)
        right = center[:-1] - n * (width / 2)

        if interpolate:
            # NOTE(review): the resampled `center` is not used after this point.
            center = interpolate_smooth(center, fixed_distance=fixed_distance)
            left = interpolate_smooth(left, fixed_distance=fixed_distance)
            right = interpolate_smooth(right, fixed_distance=fixed_distance)

        left = homogeneous(left)
        right = homogeneous(right)

        return left, right

    # Homogeneous version of the full center line (computed once, then cached).
    @cached_property
    def track(self):
        return homogeneous(self.center)

    # Homogeneous version of the full left boundary (computed once, then cached).
    @cached_property
    def track_left(self):
        return homogeneous(self.left)

    # Homogeneous version of the full right boundary (computed once, then cached).
    @cached_property
    def track_right(self):
        return homogeneous(self.right)


## (3) metrics.py

- 用來計算 L1 loss（絕對誤差），並細分為：
  - 縱向誤差（longitudinal error）：沿著前進方向的誤差。
  - 橫向誤差（lateral error）：左右偏移的誤差。

In [None]:
import numpy as np
import torch


class PlannerMetric:
    """
    Computes longitudinal and lateral errors for a planner
    """

    def __init__(self):
        # l1_errors: one per-coordinate error sum (length-2 array) per add() call
        # total: running count of valid waypoints
        self.reset()

    def reset(self):
        """Clears all accumulated errors and counts."""
        self.l1_errors = []
        self.total = 0

    @torch.no_grad()
    def add(
        self,
        preds: torch.Tensor,
        labels: torch.Tensor,
        labels_mask: torch.Tensor,
    ):
        """
        Args:
            preds (torch.Tensor): (b, n, 2) float tensor with predicted waypoints
            labels (torch.Tensor): (b, n, 2) ground truth waypoints
            labels_mask (torch.Tensor): (b, n) bool mask for valid waypoints
        """
        abs_error = torch.abs(preds - labels)

        # zero out the error of invalid waypoints (mask broadcast over the last axis)
        valid = labels_mask.unsqueeze(-1)
        masked_error = abs_error * valid

        # sum across batch and waypoints, leaving one total per coordinate
        per_coordinate_sum = masked_error.sum(dim=(0, 1))

        self.l1_errors.append(per_coordinate_sum.cpu().numpy())
        self.total += labels_mask.sum().item()

    def compute(self) -> dict[str, float]:
        """
        Returns the mean absolute error per valid waypoint: coordinate 0 is
        reported as the longitudinal error, coordinate 1 as the lateral error,
        and their sum as the L1 error.
        """
        stacked = np.stack(self.l1_errors, axis=0)
        n_valid = self.total

        longitudinal_error = stacked[:, 0].sum() / n_valid
        lateral_error = stacked[:, 1].sum() / n_valid

        return {
            "l1_error": float(longitudinal_error + lateral_error),
            "longitudinal_error": float(longitudinal_error),
            "lateral_error": float(lateral_error),
            "num_samples": n_valid,
        }
