In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
import pickle
import os
import os.path as path

In [2]:
DATA_DIR = '/Users/ngocanhh/Documents/Study/tinhToanTienHoa/Lab_Robot/rrt_ml/data'

In [3]:
collection_data = pickle.load(open(path.join(DATA_DIR, 'avoidance_dataset.pkl'), 'rb'))
len(collection_data)

3528

Cấu trúc của 1 phần tử trong collection data
```python
(state, action)
```

Trong đó:

1.  **`state` (Trạng thái):**
    *   **Kiểu dữ liệu:** Là một mảng NumPy (`numpy.ndarray`) với kiểu dữ liệu `np.float32`.
    *   **Nội dung:** Đây chính là **bản sao** của `observation` mà controller nhận được từ môi trường *ngay tại thời điểm* nó quyết định cần phải "chờ" (`WaitingRule.should_wait()` trả về `True`).
    *   **Cấu trúc chi tiết của `state` (dựa trên `IndoorRobotEnv._get_observation`):**
        *   Nó là một mảng 1 chiều (flat array).
        *   **5 phần tử đầu tiên (`OBS_ROBOT_STATE_SIZE`):** Trạng thái của robot và đích.
            *   `state[0]`: `robot_x` (tọa độ x của robot)
            *   `state[1]`: `robot_y` (tọa độ y của robot)
            *   `state[2]`: `robot_orientation` (hướng của robot, radian)
            *   `state[3]`: `goal_x` (tọa độ x của đích)
            *   `state[4]`: `goal_y` (tọa độ y của đích)
        *   **Các phần tử tiếp theo (chia thành các khối `OBS_OBSTACLE_DATA_SIZE` = 9):** Thông tin về các vật cản được cảm biến phát hiện (tối đa `max_obstacles_in_observation` vật cản).
            *   **Khối vật cản thứ `i` (từ 0 đến `max_obstacles_in_observation - 1`):** Nằm ở chỉ số từ `5 + i * 9` đến `5 + (i+1) * 9 - 1`.
                *   `state[5 + i*9 + 0]`: `obstacle_x` (tọa độ x của vật cản)
                *   `state[5 + i*9 + 1]`: `obstacle_y` (tọa độ y của vật cản)
                *   `state[5 + i*9 + 2]`: `shape_type` (số nguyên đại diện loại hình dạng, ví dụ: 0=Circle, 1=Rectangle)
                *   `state[5 + i*9 + 3]`: `param1` (ví dụ: radius cho Circle, width cho Rectangle)
                *   `state[5 + i*9 + 4]`: `param2` (ví dụ: 0 cho Circle, height cho Rectangle)
                *   `state[5 + i*9 + 5]`: `param3` (ví dụ: 0 cho Circle, angle cho Rectangle)
                *   `state[5 + i*9 + 6]`: `is_dynamic` (1.0 nếu động, 0.0 nếu tĩnh)
                *   `state[5 + i*9 + 7]`: `velocity_x` (thành phần vận tốc x của vật cản động, 0 nếu tĩnh)
                *   `state[5 + i*9 + 8]`: `velocity_y` (thành phần vận tốc y của vật cản động, 0 nếu tĩnh)
            *   **Padding:** Nếu số vật cản phát hiện được ít hơn `max_obstacles_in_observation`, các khối còn lại sẽ được điền bằng số 0.0.
        *   **Tổng độ dài của mảng `state`:** `5 + max_obstacles_in_observation * 9`. Ví dụ, nếu `max_obstacles_in_observation = 10`, độ dài sẽ là `5 + 10 * 9 = 95`.

2.  **`action` (Hành động mong muốn):**
    *   **Kiểu dữ liệu:** Là một mảng NumPy (`numpy.ndarray`) với kiểu dữ liệu `np.float32`.
    *   **Nội dung:** Đây là **bản sao** của `hypothetical_action` được tính toán bởi hàm `_calculate_path_following_action`. Nó đại diện cho hành động (vận tốc dài và vận tốc góc) mà controller *sẽ thực hiện* nếu nó *bỏ qua* quy tắc chờ và chỉ tuân theo logic bám đường và tránh vật cản tĩnh. Đây chính là "nhãn" (label) mà Agent cần học để dự đoán.
    *   **Cấu trúc chi tiết của `action`:**
        *   Nó là một mảng 1 chiều có 2 phần tử.
        *   `action[0]`: `target_velocity` (vận tốc dài mục tiêu)
        *   `action[1]`: `target_steering_angle` (góc lái mục tiêu / vận tốc góc)

**Ví dụ về một phần tử trong `collected_data`:**

```python
(
  # State (np.ndarray, float32, shape=(95,) assuming max_obstacles=10)
  np.array([
      305.2, 101.8, 1.57, 400.0, 300.0,  # Robot state (x, y, theta, gx, gy)
      # Obstacle 1 (Dynamic Circle near robot)
      300.0, 150.0, 0.0, 20.0, 0.0, 0.0, 1.0, -2.5, 0.5,
      # Obstacle 2 (Static Rectangle further away)
      180.0, 150.0, 1.0, 50.0, 30.0, 0.78, 0.0, 0.0, 0.0,
      # Obstacle 3...9 (Padded with zeros if not sensed)
      0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0,
      # ... (repeat padding block 7 more times)
      0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0
  ], dtype=np.float32),

  # Action (np.ndarray, float32, shape=(2,))
  np.array([5.5, -0.15], dtype=np.float32) # [target_velocity, target_steering_angle]
)
```

In [4]:
X, y = zip(*collection_data)
X = np.array(X)
y = np.array(y)
X = torch.from_numpy(X).float()
y = torch.from_numpy(y).float()
X.shape, y.shape

(torch.Size([3528, 95]), torch.Size([3528, 2]))

In [5]:
from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
X_train.shape, y_train.shape, X_test.shape, y_test.shape

(torch.Size([2822, 95]),
 torch.Size([2822, 2]),
 torch.Size([706, 95]),
 torch.Size([706, 2]))

In [6]:
class RobotAvoidanceNetwork(nn.Module):
    def __init__(self, obs_robot_state_size, obs_obstacle_data_size):
        self.obs_robot_state_size = obs_robot_state_size
        self.obs_obstacle_data_size = obs_obstacle_data_size
        super(RobotAvoidanceNetwork, self).__init__()

        self.state_mlp = nn.Sequential(
            nn.Linear(self.obs_robot_state_size, 64),
            nn.ReLU(),
            nn.Linear(64, 32),
            nn.ReLU(),
            nn.Linear(32, 16),
            nn.ReLU(),
        )

        self.attention_obs_mlp = nn.Sequential(
            nn.Linear(self.obs_obstacle_data_size, 64),
            nn.ReLU(),
            nn.Linear(64, 32),
            nn.ReLU(),
            nn.Linear(32, 16),
            nn.ReLU(),
        )

        self.obs_mlp = nn.Sequential(
            nn.Linear(self.obs_obstacle_data_size, 64),
            nn.ReLU(),
            nn.Linear(64, 32),
            nn.ReLU(),
            nn.Linear(32, 16),
            nn.ReLU(),
        )

        self.final_mlp_out = nn.Sequential(
            nn.Linear(16 + 16, 64),
            nn.ReLU(),
            nn.Linear(64, 32),
            nn.ReLU(),
            nn.Linear(32, 16),
            nn.ReLU(),
            nn.Linear(16, 2),
        )

    def forward(self, observations): # N, obs_robot_state_size + num_obstacles * obs_obstacle_data_size
        num_obstacles = int((observations.shape[1] - self.obs_robot_state_size) // self.obs_obstacle_data_size)
        robot_state = observations[:, :self.obs_robot_state_size]
        obstacle_data = observations[:, self.obs_robot_state_size:]
        obstacle_data = obstacle_data.view(-1, num_obstacles, self.obs_obstacle_data_size) # N, num_obstacles, obs_obstacle_data_size

        # Robot state
        robot_state = self.state_mlp(robot_state) # N, 16

        # Obstacle data
        obstacle_attention = F.softmax(torch.sum(self.attention_obs_mlp(obstacle_data), dim=2, keepdim=True), dim=1)
        obstacle_data = torch.mul(obstacle_data, obstacle_attention) # N, num_obstacles, obs_obstacle_data_size
        obstacle_data = self.obs_mlp(obstacle_data) # N, num_obstacles, 16
        obstacle_data = torch.sum(obstacle_data, dim=1) # N, 16
        
        # Concatenate robot state and obstacle data
        x = torch.cat((robot_state, obstacle_data), dim=1)
        x = self.final_mlp_out(x)
        return x

        


In [None]:
model = RobotAvoidanceNetwork(5, 9)
optimizer = optim.Adam(model.parameters(), lr=0.001)
loss_fn = nn.MSELoss()
# Training
num_epochs = 1000
for epoch in range(num_epochs):
    model.train()
    optimizer.zero_grad()

    y_pred = model(X_train)
    loss = loss_fn(y_pred, y_train)

    loss.backward()
    optimizer.step()
    if (epoch + 1) % 100 == 0:
        print(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {loss.item():.4f}')


Epoch [100/1000], Loss: 3.2797
Epoch [200/1000], Loss: 1.8738
Epoch [300/1000], Loss: 1.6020
Epoch [400/1000], Loss: 1.4369
Epoch [500/1000], Loss: 1.1775
Epoch [600/1000], Loss: 1.0932
Epoch [700/1000], Loss: 1.0734
Epoch [800/1000], Loss: 1.0304
Epoch [900/1000], Loss: 1.0125
Epoch [1000/1000], Loss: 0.9992
Test Loss: 1.0229


In [9]:
# Evaluation
with torch.no_grad():
    model.eval()
    y_pred = model(X_test)
    test_loss = loss_fn(y_pred, y_test)
    print(f'Test Loss: {test_loss.item():.4f}')
    print(f'y_pred: {y_pred[:5]}')
    print(f'y_test: {y_test[:5]}')

Test Loss: 1.0229
y_pred: tensor([[ 3.5552,  0.4136],
        [ 0.9523, -0.1212],
        [ 0.9784,  0.7189],
        [ 8.2379,  0.1761],
        [ 3.5552,  0.4136]])
y_test: tensor([[ 1.0000,  0.7854],
        [ 1.1469, -0.1733],
        [ 1.0000,  0.7854],
        [ 8.1834,  0.2505],
        [ 1.0000,  0.7854]])


In [10]:
# Save the model
model_path = path.join(DATA_DIR, 'robot_avoidance_model.pth')
torch.save(model.state_dict(), model_path)
print(f'Model saved to {model_path}')

Model saved to /Users/ngocanhh/Documents/Study/tinhToanTienHoa/Lab_Robot/rrt_ml/data/robot_avoidance_model.pth
