In [2]:
import random
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim

from mazegenfromc import generate_maze

Tạo mê cung

In [3]:
def mazetensor(maze_np):
    maze_tensor = torch.from_numpy(maze_np).float()
    
    # Batch_size, Channels, Height, Width
    # Số lượng, số lớp, chiều cao, chiều rộng
    #  1 ảnh, 1 màu, cao 10, rộng 10
    maze_tensor = maze_tensor.unsqueeze(0).unsqueeze(0)
    
    return maze_tensor

Hàm lấy vị trí đích

In [4]:
def get_goal_position(maze):

    result = np.where(maze == 9)
    
    y = int(result[0][0])
    x = int(result[1][0])
    return (y, x)
    

Hàm lấy vị trí agent hiện tại

In [5]:
def get_current_position(maze):
    
    result = np.where(maze == 2)
    
    y = int(result[0][0])
    x = int(result[1][0])
    return (y, x)


Hàm lấy tầm nhìn

In [6]:
def get_9x9_view(maze_np, heatmap_np, agent_position):
    pad_size = 4
    
    # Padding
    padded_maze = np.pad(maze_np, pad_size, mode='constant', constant_values=1)
    padded_heat = np.pad(heatmap_np, pad_size, mode='constant', constant_values=99)
    
    y, x = agent_position[0] + pad_size, agent_position[1] + pad_size
    
    # Cắt vùng 9x9
    maze_cut = padded_maze[y-4:y+5, x-4:x+5]
    heat_cut = padded_heat[y-4:y+5, x-4:x+5]
    
    heat_norm = 1.0 / (1.0 + heat_cut)

    heat_norm[maze_cut == 1] = 0.0
    
    # Chồng thành Tensor [2, 9, 9]
    stack_map = torch.stack([
        torch.from_numpy(maze_cut).float(),
        torch.from_numpy(heat_norm).float()
    ], dim=0)
    
    # Thêm chiều [1, 2, 9, 9]
    return stack_map.unsqueeze(0)

Hàm lấy vector chỉ hướng đích

In [7]:
def get_goal_vector(agent_position, goal_position, maze_size):

    #agent_pos: (y, x)
    #goal_pos: (y, x)
    #maze_size: (H, W)

    H, W = maze_size
    
    # 1. Tính khoảng cách thô
    dy = goal_position[0] - agent_position[0]
    dx = goal_position[1] - agent_position[1]
    
    # 2. Chuẩn hóa về khoảng [-1, 1]
    dy_norm = dy / H
    dx_norm = dx / W
    
    return torch.tensor([dy_norm, dx_norm], dtype=torch.float32)

Hàm cập nhật ô đã đi

In [8]:
def update_visit_count(heatmap, agent_position):
    y, x = agent_position
    heatmap[y, x] += 1
    return heatmap

Tạo hàm ánh xạ số trong lớp quyết định và hướng đi của agent

In [9]:
def get_next_position(current_position, action):

    y, x = current_position
    
    action_map = {
        0: (-1, 0), # Lên
        1: (1, 0),  # Xuống
        2: (0, -1), # Trái
        3: (0, 1)   # Phải
    }
    
    dy, dx = action_map[action]
    next_y = y + dy
    next_x = x + dx
    
    return (next_y, next_x)

Tạo các lớp tích chập (convolution layer)

In [10]:
conv_layers = nn.Sequential(
    # Lớp 1: 9*9 -> 7*7
    nn.Conv2d(in_channels=2, out_channels=16, kernel_size=3), 
    nn.ReLU(),

    # Lớp 2: 7*7 -> 5*5
    nn.Conv2d(in_channels=16, out_channels=32, kernel_size=3),
    nn.ReLU(),

    # Lớp 3: 5*5 -> 3*3
    nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3),
    nn.ReLU(),

    # Lớp 4: 3*3 -> 1*1
    nn.Conv2d(in_channels=64, out_channels=64, kernel_size=3),
    nn.ReLU()
)

Tạo lớp quyết định

In [11]:
decision_layer = nn.Linear(66, 4)

Tạo hàm dự đoán bước tiếp theo

In [12]:
def predict_move(maze_np, heatmap_np, conv_layers, decision_layer, goal_position):
    with torch.no_grad():
        current_position = get_current_position(maze_np)

        live_view = get_9x9_view(maze_np, heatmap_np, current_position)
        view_4d = conv_layers(live_view)
        view_flat = torch.flatten(view_4d, start_dim=1)  # [1, 64, 1, 1] -> [1, 64]
    
        goal_vec = get_goal_vector(current_position, goal_position, maze_np.shape).unsqueeze(0)  # [dy_norm, dx_norm] -> [[dy_norm, dx_norm]]
    
        final_input = torch.cat((view_flat, goal_vec), dim=1)
    
        action_logits = decision_layer(final_input)
        action = torch.argmax(action_logits, dim=1).item()
    
        next_position = get_next_position(current_position, action)
    
    return action, next_position

Tạo hàm bước đi và hệ quả

In [13]:
def move(action, maze_np, heatmap_np, goal_position):

    curr_position = get_current_position(maze_np)
    old_dist = abs(goal_position[0] - curr_position[0]) + abs(goal_position[1] - curr_position[1])
    
    next_position = get_next_position(curr_position, action)
    nexty, nextx = next_position
    
    reward = 0
    done = False
    
    if nexty < 0 or nexty >= maze_np.shape[0] or nextx < 0 or nextx >= maze_np.shape[1] or maze_np[nexty, nextx] == 1:
        return -0.8, False 
        
    if maze_np[nexty, nextx] == 9:
        return 10.0, True
        
    # Lại gần đích thì thưởng, xa đích thì phạt
    new_dist = abs(goal_position[0] - nexty) + abs(goal_position[1] - nextx)
    reward = (old_dist - new_dist) * 0.2 
    
    if heatmap_np[nexty, nextx] > 0:
        reward -= 0.1 * heatmap_np[nexty, nextx] 
    else:
        reward -= 0.01 
    
    # Cập nhật trạng thái 2 lớp mê cung
    maze_np[curr_position[0], curr_position[1]] = 0
    maze_np[nexty, nextx] = 2
    update_visit_count(heatmap_np, next_position)
    
    return reward, done

Chọn optimizer và cách tính loss

In [14]:
all_parameters = list(conv_layers.parameters()) + list(decision_layer.parameters())
optimizer = optim.Adam(all_parameters, lr=0.001)

losscal = nn.MSELoss()

Deep Q-learning

In [15]:
def capture_state(maze_np, heatmap_np, goal_position):

    curr_position = get_current_position(maze_np)
    view = get_9x9_view(maze_np, heatmap_np, curr_position)
    
    goal_vec = get_goal_vector(curr_position, goal_position, maze_np.shape).unsqueeze(0) # [2] -> [1, 2]
    
    return (view, goal_vec)

def train_one_step(state, action, reward, done, goal_position, maze_np, heatmap_np, gamma=0.95):

    # Q-predict
    view, goal_vec = state

    view_out = conv_layers(view)
    view_flat = torch.flatten(view_out, start_dim = 1) # [1, 64, 1, 1] -> [1, 64]
    combined = torch.cat((view_flat, goal_vec), dim=1) # [1, 64] + [1, 2] -> [1, 66]
    
    current_logits = decision_layer(combined)
    q_value = current_logits[0, action] 

    # Q-target
    next_state = capture_state(maze_np, heatmap_np, goal_position)
    
    with torch.no_grad():
        if done:
            q_target = torch.tensor(reward, dtype=torch.float32)
        else:
            next_view, next_goal_vec = next_state 
            
            nv_out = conv_layers(next_view)
            nv_flat = torch.flatten(nv_out, 1)
            n_combined = torch.cat((nv_flat, next_goal_vec), dim=1) 
            
            next_logits = decision_layer(n_combined)
            
            q_target = reward + gamma * torch.max(next_logits) # Bellman equation

    # Hạ dốc sai số (Gradient Descent)
    loss = losscal(q_value, q_target)
    
    optimizer.zero_grad() 
    loss.backward()      
    optimizer.step()     
    
    return loss.item()

Replay Buffer

In [16]:
memory_buffer = []
limit_memory = 10000

def store_memory(state, action, reward, next_state, done):
    if len(memory_buffer) >= limit_memory:
        memory_buffer.pop(0)
    memory_buffer.append((state, action, reward, next_state, done))

def train_from_memory(batch_size=32, gamma=0.95):
    if len(memory_buffer) < batch_size:
        return 0
    
    batch = random.sample(memory_buffer, batch_size)
    
    total_loss = 0
    
    for state, action, reward, next_state, done in batch:
    
        view, goal_vec = state
        v_out = conv_layers(view) 
        v_flat = torch.flatten(v_out, 1)
        combined = torch.cat((v_flat, goal_vec), dim=1)
        q_values = decision_layer(combined) 
        q_value = q_values[0, action]
        
        with torch.no_grad():
            if done:
                q_target = torch.tensor(reward, dtype=torch.float32)
            else:
                nv, ng = next_state
                nv_out = conv_layers(nv)
                nv_flat = torch.flatten(nv_out, 1)
                n_combined = torch.cat((nv_flat, ng), dim=1)
                next_q = decision_layer(n_combined)
                q_target = reward + gamma * torch.max(next_q)
        
        loss = losscal(q_value, q_target)
        total_loss += loss
        
    # Gradient Descent
    optimizer.zero_grad() 
    avg_loss = total_loss / batch_size
    avg_loss.backward()
    optimizer.step()
    
    return avg_loss.item()

Đóng gói các hàm

In [17]:
def move_store_and_learn(maze_np, heatmap_np, goal_position, random_rate):

    state_before = capture_state(maze_np, heatmap_np, goal_position)
    
    # Cơ chế Epsilon-Greedy 
    if np.random.rand() < random_rate:
        action = np.random.randint(0, 4) # Chọn ngẫu nhiên 1 trong 4 hướng
    else:
        action, _ = predict_move(maze_np, heatmap_np, conv_layers, decision_layer, goal_position)
    
    reward, done = move(action, maze_np, heatmap_np, goal_position)

    state_after = capture_state(maze_np, heatmap_np, goal_position)
    
    #Lưu
    store_memory(state_before, action, reward, state_after, done)
    
    # Train
    loss = train_one_step(state_before, action, reward, done, goal_position, maze_np, heatmap_np)
    
    return reward, done, loss

In [None]:
def save_checkpoint(filename="maze_ai_checkpoint.pth", ep=0, epsilon=0.1):
    checkpoint = {
        'conv_state': conv_layers.state_dict(),       # Lưu lớp CNN
        'decision_state': decision_layer.state_dict(), # Lưu lớp Quyết định
        'optimizer_state': optimizer.state_dict(),     # Lưu trạng thái Optimizer                                
        'epsilon': epsilon                             
    }
    torch.save(checkpoint, filename)
    print(f"Đã lưu tham số vào file {filename}")

def load_checkpoint(filename="maze_ai_checkpoint.pth"):
    if torch.cuda.is_available():
        checkpoint = torch.load(filename)
    else:
        checkpoint = torch.load(filename, map_location=torch.device('cpu'))
    
    # Nạp trọng số vào các lớp
    conv_layers.load_state_dict(checkpoint['conv_state'])
    decision_layer.load_state_dict(checkpoint['decision_state'])
    optimizer.load_state_dict(checkpoint['optimizer_state'])
    
    epsilon = checkpoint['epsilon']
    
    print(f"Đã tải thành công! Tiếp tục ...")
    return epsilon

In [23]:
num_episodes = 1000  # Tổng số ván chơi để huấn luyện
max_steps_per_episode = 1000  # Giới hạn số bước mỗi ván để tránh đi luẩn quẩn
random_rate = 0.1  # Tỷ lệ 20% đi ngẫu nhiên để khám phá mê cung

print("Bắt đầu quá trình huấn luyện AI...")

for ep in range(num_episodes):
    # 1. Khởi tạo môi trường mới cho mỗi ván
    maze_np = generate_maze(30) 
    heatmap_np = np.zeros_like(maze_np, dtype=np.float32)
    goal_position = get_goal_position(maze_np)
    
    done = False
    step_count = 0
    total_reward = 0
    
    while not done and step_count < max_steps_per_episode:

        reward, done, loss = move_store_and_learn(maze_np, heatmap_np, goal_position, random_rate=random_rate)
        
        batch_loss = train_from_memory(batch_size=32)
        
        total_reward += reward
        step_count += 1
        
    # In kết quả sau mỗi ván để theo dõi tiến độ
    status = "THÀNH CÔNG" if done else "THẤT BẠI"
    print(f"Ván {ep:3d} | {status} | Bước: {step_count:3d} | Tổng điểm: {total_reward:6.2f} | Loss: {loss:.4f}")


    if random_rate > 0.01:
        random_rate *= 0.995

Bắt đầu quá trình huấn luyện AI...
Ván   0 | THẤT BẠI | Bước: 1000 | Tổng điểm: -507.69 | Loss: 0.0015
Ván   1 | THÀNH CÔNG | Bước: 478 | Tổng điểm: -49.43 | Loss: 35.7773
Ván   2 | THẤT BẠI | Bước: 1000 | Tổng điểm: -465.38 | Loss: 0.0157
Ván   3 | THẤT BẠI | Bước: 1000 | Tổng điểm: -206.29 | Loss: 0.0758
Ván   4 | THẤT BẠI | Bước: 1000 | Tổng điểm: -229.10 | Loss: 0.1885
Ván   5 | THẤT BẠI | Bước: 1000 | Tổng điểm: -205.60 | Loss: 0.0203
Ván   6 | THẤT BẠI | Bước: 1000 | Tổng điểm: -427.74 | Loss: 0.0153
Ván   7 | THÀNH CÔNG | Bước: 556 | Tổng điểm: -91.77 | Loss: 335.1788
Ván   8 | THẤT BẠI | Bước: 1000 | Tổng điểm: -225.89 | Loss: 0.0004
Ván   9 | THẤT BẠI | Bước: 1000 | Tổng điểm: -231.51 | Loss: 0.1100
Ván  10 | THẤT BẠI | Bước: 1000 | Tổng điểm: -233.86 | Loss: 1.0045
Ván  11 | THẤT BẠI | Bước: 1000 | Tổng điểm: -191.02 | Loss: 0.1419
Ván  12 | THÀNH CÔNG | Bước: 623 | Tổng điểm: -75.29 | Loss: 32.1550
Ván  13 | THẤT BẠI | Bước: 1000 | Tổng điểm: -431.05 | Loss: 8.1963
Ván  14 |

KeyboardInterrupt: 