Mô hình hóa hành vi người đi bộ hoặc robot bằng SLDS (HMM + Lọc Kalman)

Mục tiêu:
    Mô hình hóa động học đa chế độ của người đi bộ hoặc robot bằng cách sử dụng
    Switching Linear Dynamical Systems (SLDS), trong đó HMM đảm nhiệm việc
    chuyển đổi giữa các chế độ động học tuyến tính và bộ lọc Kalman ước tính
    trạng thái trong mỗi chế độ.

Các bước chính:
    1. Định nghĩa các chế độ động học (ví dụ: đi bộ chậm, đi bộ nhanh, dừng lại cho người đi bộ; di chuyển thẳng, rẽ trái, rẽ phải cho robot).
    2. Xây dựng mô hình HMM để quản lý việc chuyển đổi giữa các chế độ.
    3. Định nghĩa mô hình động học tuyến tính (LDS) cho mỗi chế độ.
    4. Sử dụng bộ lọc Kalman để ước tính trạng thái liên tục trong từng chế độ.
    5. (Tùy chọn) Thực hiện học tham số cho mô hình SLDS từ dữ liệu quan sát.
    6. (Tùy chọn) Thực hiện suy luận về chuỗi trạng thái ẩn và trạng thái liên tục.
    7. (Tùy chọn) Trực quan hóa kết quả.


In [18]:
import numpy as np
import matplotlib.pyplot as plt
from hmmlearn import hmm
from filterpy.kalman import KalmanFilter

1. Định nghĩa các chế độ động học

In [4]:
n_modes = 3  # Ví dụ: 3 chế độ (đi chậm, đi nhanh, dừng) hoặc (thẳng, trái, phải)

2. Xây dựng mô hình HMM
Ma trận chuyển đổi trạng thái (Transition Probability Matrix)
Ví dụ:
       Chậm   Nhanh  Dừng
 Chậm   0.8    0.1    0.1
 Nhanh  0.2    0.7    0.1
 Dừng   0.1    0.1    0.8

In [5]:
transmat = np.array([[0.8, 0.1, 0.1],
                     [0.2, 0.7, 0.1],
                     [0.1, 0.1, 0.8]])

# Xác suất trạng thái ban đầu (Start Probability)
startprob = np.array([0.6, 0.3, 0.1])

# Khởi tạo mô hình HMM (chỉ cấu trúc, không có phân phối phát xạ vì LDS xử lý phần này)
model_hmm = hmm.CategoricalHMM(n_components=n_modes, init_params="")
model_hmm.startprob_ = startprob
model_hmm.transmat_ = transmat


3. Định nghĩa mô hình động học tuyến tính (LDS) cho mỗi chế độ
Chúng ta sẽ mô hình hóa vị trí (x, y) và vận tốc (vx, vy)

In [6]:
state_dim = 4  # [x, y, vx, vy]
obs_dim = 2    # [x_observed, y_observed]
dt = 0.1       # Giả sử khoảng thời gian giữa các bước là 0.1 giây

lds_params = []
for i in range(n_modes):
    # Ma trận chuyển trạng thái A (State Transition Matrix) - mô tả cách trạng thái tiến triển theo thời gian
    A = np.array([[1, 0, dt, 0],
                  [0, 1, 0, dt],
                  [0, 0, 1, 0],
                  [0, 0, 0, 1]])

    # Ma trận quan sát C (Observation Matrix) - mô tả cách trạng thái được quan sát
    C = np.array([[1, 0, 0, 0],
                  [0, 1, 0, 0]])

    # Ma trận hiệp phương sai nhiễu quá trình Q (Process Noise Covariance)
    # Giả sử nhiễu nhỏ tác động lên vận tốc
    q = 0.01
    Q = np.diag([0, 0, q, q])

    # Ma trận hiệp phương sai nhiễu đo R (Measurement Noise Covariance)
    r = 0.1
    R = np.diag([r, r])

    lds_params.append({'A': A, 'C': C, 'Q': Q, 'R': R})

4. Sử dụng bộ lọc Kalman để ước tính trạng thái trong từng chế độ

In [7]:
def run_kalman_filter(observations, initial_state, lds_params):
    """Chạy bộ lọc Kalman cho một chuỗi quan sát và một chế độ LDS."""
    kf = KalmanFilter(dim_x=state_dim, dim_z=obs_dim)
    kf.x = initial_state
    kf.F = lds_params['A']
    kf.H = lds_params['C']
    kf.Q = lds_params['Q']
    kf.R = lds_params['R']

    n_steps = len(observations)
    means = np.zeros((n_steps, state_dim))
    covariances = np.zeros((n_steps, state_dim, state_dim))

    for i in range(n_steps):
        kf.predict()
        kf.update(observations[i])
        means[i] = kf.x
        covariances[i] = kf.P

    return means, covariances

5. Insert dữ liệu mô phỏng

In [10]:
import pandas as pd

# Thay đổi 'đường_dẫn_tới_file.csv' bằng đường dẫn thực tế đến file CSV của bạn
df = pd.read_csv('robot_dataset.csv')

# In ra một vài dòng đầu tiên của DataFrame để kiểm tra
print(df.head())

  Robot_ID   Task_Type Component_ID Sensor_Type            Sensor_Data  \
0  RBT_001  Inspection      CMP_460       LIDAR  1 (obstacle detected)   
1  RBT_002    Assembly      CMP_252     Thermal              85.3 (°C)   
2  RBT_003  Inspection      CMP_248     Thermal       92% (visual fit)   
3  RBT_004     Welding      CMP_433      Camera      98% (defect-free)   
4  RBT_005    Assembly      CMP_992      Camera       92% (visual fit)   

   Processing_Time (s)  Accuracy (%) Environmental_Status  \
0                 67.0          90.4               Stable   
1                 71.2          98.1               Stable   
2                 49.2          95.3             Unstable   
3                 74.5          90.2               Stable   
4                 64.5          97.2             Unstable   

   Energy_Consumption (kWh) Human_Intervention_Needed Obstacle_Detected  \
0                       2.2                        No               Yes   
1                       2.7           

6.  In ra ID của robot và thời gian xử lý trung bình

In [11]:
print("Robot IDs:", df['Robot_ID'].unique())
print("Thời gian xử lý trung bình:", df['Processing_Time (s)'].mean())

Robot IDs: ['RBT_001' 'RBT_002' 'RBT_003' 'RBT_004' 'RBT_005' 'RBT_006' 'RBT_007'
 'RBT_008' 'RBT_009' 'RBT_010' 'RBT_011' 'RBT_012' 'RBT_013' 'RBT_014'
 'RBT_015' 'RBT_016' 'RBT_017' 'RBT_018' 'RBT_019' 'RBT_020' 'RBT_021'
 'RBT_022' 'RBT_023' 'RBT_024' 'RBT_025' 'RBT_026' 'RBT_027' 'RBT_028'
 'RBT_029' 'RBT_030' 'RBT_031' 'RBT_032' 'RBT_033' 'RBT_034' 'RBT_035'
 'RBT_036' 'RBT_037' 'RBT_038' 'RBT_039' 'RBT_040' 'RBT_041' 'RBT_042'
 'RBT_043' 'RBT_044' 'RBT_045' 'RBT_046' 'RBT_047' 'RBT_048' 'RBT_049'
 'RBT_050' 'RBT_051' 'RBT_052' 'RBT_053' 'RBT_054' 'RBT_055' 'RBT_056'
 'RBT_057' 'RBT_058' 'RBT_059' 'RBT_060' 'RBT_061' 'RBT_062' 'RBT_063'
 'RBT_064' 'RBT_065' 'RBT_066' 'RBT_067' 'RBT_068' 'RBT_069' 'RBT_070'
 'RBT_071' 'RBT_072' 'RBT_073' 'RBT_074' 'RBT_075' 'RBT_076' 'RBT_077'
 'RBT_078' 'RBT_079' 'RBT_080' 'RBT_081' 'RBT_082' 'RBT_083' 'RBT_084'
 'RBT_085' 'RBT_086' 'RBT_087' 'RBT_088' 'RBT_089' 'RBT_090' 'RBT_091'
 'RBT_092' 'RBT_093' 'RBT_094' 'RBT_095' 'RBT_096' 'RBT_097' 'RBT_

7. In bộ lọc kalman để in ra chuỗi trạng thái ẩn

In [19]:
estimated_states_per_mode = []
for i in range(n_modes):
    initial_state_est = np.array([0.0, 0.0, 0.5, 0.2]) # Ước tính ban đầu
    means, covariances = run_kalman_filter(observations, initial_state_est, lds_params[i])
    estimated_states_per_mode.append(means)

NameError: name 'observations' is not defined

8. Trực quan hóa kết quả

In [None]:
plt.figure(figsize=(10, 6))
plt.plot(observations[:, 0], observations[:, 1], 'ro', label='Quan sát')
plt.plot(true_states[:, 0], true_states[:, 1], 'g-', label='Trạng thái thật')
for i, est_states in enumerate(estimated_states_per_mode):
    plt.plot(est_states[:, 0], est_states[:, 1], '--', label=f'Ước tính (chế độ {i})')
plt.xlabel('X')
plt.ylabel('Y')
plt.title('Mô hình hóa hành vi bằng SLDS (HMM + Lọc Kalman)')
plt.legend()
plt.grid(True)
plt.show()

plt.figure(figsize=(10, 4))
plt.plot(hidden_states, label='Trạng thái ẩn thật')
plt.yticks(range(n_modes))
plt.ylabel('Chế độ')
plt.xlabel('Thời gian')
plt.title('Chuỗi trạng thái ẩn thật')
plt.legend()
plt.grid(True)
plt.show()