In [1]:
import json
import sys
import os
# Add the parent directory to sys.path
sys.path.append(os.path.abspath('..'))
from src.tile_definitions import TILE_MAPPING

import torch
from torch.utils.data import Dataset, DataLoader
from torch.optim.lr_scheduler import StepLR, ReduceLROnPlateau

# Dataset Class

In [2]:
class GameDataset(Dataset):
    def __init__(self, json_file):
        with open(json_file, "r") as f:
            self.data = json.load(f)
        
        # Action to integer mapping
        self.action_mapping = {"UP": 0, "DOWN": 1, "LEFT": 2, "RIGHT": 3}

        # Create reverse mapping (tile_type -> ID) from TILE_MAPPING
        self.tile_type_to_id = {}
        for tile_id, (tile_type, _, _, _, _) in TILE_MAPPING.items():
            self.tile_type_to_id[tile_type] = tile_id
            
    def __len__(self):
        return len(self.data)
    
    def __getitem__(self, idx):
        sample = self.data[idx]
        state = sample["state"]
        action = self.action_mapping[sample["action"]]
        
        # Extract state components
        position = torch.tensor(state["position"], dtype=torch.float32)
        chips_collected = torch.tensor([state["player_collected_chips"]], dtype=torch.float32)
        total_chips_collected = torch.tensor([state["total_collected_chips"]], dtype=torch.float32)
        socket_unlocked = torch.tensor([int(state["socket_unlocked"])], dtype=torch.float32)
        nearest_chip = torch.tensor(state["nearest_chip"], dtype=torch.float32)
        exit_location = torch.tensor(state["exit_position"], dtype=torch.float32)
        
        # Process the full grid
        full_grid = []
        for row in state["full_grid"]:
            processed_row = []
            for tile_type in row:
                # Map to integer using the updated tile definitions
                processed_row.append(self.tile_type_to_id.get(tile_type, 1))
            full_grid.append(processed_row)
        
        full_grid_tensor = torch.tensor(full_grid, dtype=torch.float32)
        
        # Additional state information
        is_sliding = torch.tensor([float(state.get("is_sliding", False))], dtype=torch.float32)
        is_being_forced = torch.tensor([float(state.get("is_being_forced", False))], dtype=torch.float32)
        alive = torch.tensor([float(state.get("alive", True))], dtype=torch.float32)
        remaining_chips = torch.tensor([state.get("remaining_chips", 0)], dtype=torch.float32)
        other_player_pos = torch.tensor(state.get("other_player_position", [-1, -1]), dtype=torch.float32)
        
        # Concatenate all state information into a single vector
        state_vector = torch.cat([
            position,
            chips_collected, 
            total_chips_collected,
            socket_unlocked,
            nearest_chip,
            exit_location,
            full_grid_tensor.flatten(),
            is_sliding,
            is_being_forced,
            alive,
            remaining_chips,
            other_player_pos
        ])
        
        return state_vector, torch.tensor(action, dtype=torch.long)

## Load Dataset and Test

In [33]:
dataset = GameDataset("../data/human_play_data_level0.json")
dataloader = DataLoader(dataset, batch_size=64, shuffle=True)
sample_vector, sample_action = dataset[0]  
print(f"🔢 Sample Vector: {sample_vector.size()}")  # 🔥 샘플 벡터 크기 확인!
print(f"📊 Loaded {len(dataset)} samples.")

🔢 Sample Vector: torch.Size([184])
📊 Loaded 646 samples.


# Behavior Cloning Model

In [15]:
import torch.nn as nn
import torch.optim as optim

In [16]:
# FNC Network for behavior cloning
class BehaviorCloningModel(nn.Module):
    def __init__(self, input_size, output_size):
        super(BehaviorCloningModel, self).__init__()
        self.fc = nn.Sequential(
            nn.Linear(input_size, 256),
            nn.ReLU(),
            nn.Linear(256, 256),
            nn.ReLU(),
            nn.Linear(256, 256),
            nn.ReLU(),                          
            nn.Linear(256, output_size)
        )

    def forward(self, x):
        return self.fc(x)

In [29]:
input_size = len(dataset[0][0])  # Size of State Vector
print(dataset[0])
output_size = 4  # Possible Actions (UP, DOWN, LEFT, RIGHT)
model = BehaviorCloningModel(input_size, output_size)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

(tensor([  2.,   6.,   0.,   0.,   0.,   3.,   3.,   6.,   6.,   1.,   1.,   1.,
          1.,   1.,   1.,   1.,   1.,   1.,   1.,   1.,   1.,   1.,   1.,   0.,
          0.,   0.,   0.,   0.,  47.,   0.,   0.,   0.,   0.,   0.,   1.,   1.,
          0.,   0.,   4.,   1.,   0.,   0.,   0.,   1.,   3.,   0.,   0.,   1.,
          1.,   0.,   0., 200.,   1.,   0.,   0.,   0.,   1., 201.,   0.,   0.,
          1.,   1.,   0.,   1.,   1.,   1.,   0.,   0.,   0.,   1.,   1.,   1.,
          0.,   1.,   1.,   0.,   0.,   0.,   0.,   0.,  34.,   0.,   0.,   0.,
          0.,   0.,   1.,   1.,   0.,   0.,   0.,   0.,  34.,  21.,  34.,   0.,
          0.,   0.,   0.,   1.,   1.,   0.,   0.,   0.,   0.,   0.,  34.,   0.,
          0.,   0.,   0.,   0.,   1.,   1.,   0.,   0.,   0.,   0.,   0.,   0.,
          0.,   0.,   0.,   0.,   0.,   1.,   1.,   0.,   1.,   1.,   1.,   0.,
          0.,   0.,   1.,   1.,   1.,   0.,   1.,   1.,   0.,   3., 202.,   1.,
          0.,   0.,   0.,   1., 203.,  

In [18]:
print(f"input_size {input_size}, output_size: {output_size}")

input_size 184, output_size: 4


## Training Loop

In [34]:
# traning configuration
training_config = {
    "epochs": 2000,
    "model_name": "BehaviorCloningModel",
    "intput_size": input_size,
    "output_size": output_size,
    "optimizer": "Adam",
    "learning_rate": 0.01,
    "batch_size": 128,
    "criterion": "CrossEntropyLoss",
    "activation": "ReLU",
    "scheudler": "StepLR"
}

In [35]:
import wandb
wandb.login(key="294ac5de6babc54da53b9aadb344b3bb173b314d")
# change name for each run
wandb.init(project="bc_surrogate_partner", name="mar_05", id="whole_256_FCN-3Layer_2000ep_4", resume="never")
wandb.config.update(training_config)

[34m[1mwandb[0m: Appending key for api.wandb.ai to your netrc file: /Users/neo/.netrc


In [36]:
# 🔥 훈련 루프
num_epochs = training_config["epochs"]
scheduler = StepLR(optimizer, step_size=200, gamma=0.9)

for epoch in range(num_epochs):
    total_loss = 0

    for state_vector, action in dataloader:
        optimizer.zero_grad()

        # predict
        outputs = model(state_vector)

        # loss calculation
        loss = criterion(outputs, action)
        total_loss += loss.item()

        # back propagation
        loss.backward()
        optimizer.step()
    
    scheduler.step()
    
    if (epoch + 1) % 100 == 0:
        avg_loss = total_loss / len(dataloader)
        print(f"Epoch {epoch+1}/{num_epochs}, Loss: {avg_loss:.4f}, LR: {scheduler.get_last_lr()[0]:.4f}")
        wandb.log({"epoch": epoch + 1, "loss": avg_loss, "learning_rate": scheduler.get_last_lr()[0]})
        
# close wandb
wandb.finish()

Epoch 100/2000, Loss: 0.1162, LR: 0.0003
Epoch 200/2000, Loss: 0.0595, LR: 0.0003
Epoch 300/2000, Loss: 0.0619, LR: 0.0003
Epoch 400/2000, Loss: 0.0616, LR: 0.0003
Epoch 500/2000, Loss: 0.0739, LR: 0.0003
Epoch 600/2000, Loss: 0.0493, LR: 0.0003
Epoch 700/2000, Loss: 0.1809, LR: 0.0003
Epoch 800/2000, Loss: 0.0524, LR: 0.0002
Epoch 900/2000, Loss: 0.0442, LR: 0.0002
Epoch 1000/2000, Loss: 0.0442, LR: 0.0002
Epoch 1100/2000, Loss: 0.0872, LR: 0.0002
Epoch 1200/2000, Loss: 0.0368, LR: 0.0002
Epoch 1300/2000, Loss: 0.0376, LR: 0.0002
Epoch 1400/2000, Loss: 0.0372, LR: 0.0002
Epoch 1500/2000, Loss: 0.0407, LR: 0.0002
Epoch 1600/2000, Loss: 0.0381, LR: 0.0002
Epoch 1700/2000, Loss: 0.0350, LR: 0.0002
Epoch 1800/2000, Loss: 0.0432, LR: 0.0001
Epoch 1900/2000, Loss: 0.0544, LR: 0.0001
Epoch 2000/2000, Loss: 0.0357, LR: 0.0001


0,1
epoch,▁▁▂▂▂▃▃▄▄▄▅▅▅▆▆▇▇▇██
learning_rate,█▇▇▆▆▅▅▄▄▄▄▃▃▂▂▂▂▁▁▁
loss,▅▂▂▂▃▂█▂▁▁▄▁▁▁▁▁▁▁▂▁

0,1
epoch,2000.0
learning_rate,0.00012
loss,0.03569


In [37]:
# Model save directory
model_path = "../model/lv1_bc_model_2.1.pth"
# Save model
torch.save(model.state_dict(), model_path)

In [13]:
def predict_action(model, state):
    model.eval()
    with torch.no_grad():
        state_vector = torch.tensor(state, dtype=torch.float32).unsqueeze(0)
        output = model(state_vector)
        action_idx = torch.argmax(output).item()
    
    action_mapping = {0: "UP", 1: "DOWN", 2: "LEFT", 3: "RIGHT"}
    return action_mapping[action_idx]

test_state, _ = dataset[5]  # Check the first sample in the dataset
predicted_action = predict_action(model, test_state)
print(f"AI Predicted Action: {predicted_action}")

AI Predicted Action: DOWN


  state_vector = torch.tensor(state, dtype=torch.float32).unsqueeze(0)
