<h1>NN Model</h1>

In [2]:
from pycocotools.coco import COCO
import matplotlib
import matplotlib.pyplot as plt
import os
import cv2
import numpy as np
from PIL import Image
import torch
import torch.nn as nn
import torch.nn.functional as functions
import torch.optim as optim
from tqdm import tqdm
from torch.utils.data import DataLoader, Dataset
#import torchvision
#from torchvision import transforms
import re

In [3]:
print("cuda" if torch.cuda.is_available() else "cpu")

cuda


In [5]:
DATADIR = "cocodoom/"
USED_RUNS = ["run1", "run2", "run3"]

dataSplit, TRAIN_RUN = "run-full-train", "run1"

annFile = '{}{}.json'.format(DATADIR,dataSplit)

In [6]:
coco_train = COCO(annFile)

loading annotations into memory...
Done (t=27.60s)
creating index...
index created!


In [7]:
dataSplit, VAL_RUN = "run-full-val", "run2"

annFile = '{}{}.json'.format(DATADIR,dataSplit)

In [8]:
coco_val = COCO(annFile)

loading annotations into memory...
Done (t=21.27s)
creating index...
index created!


In [9]:
dataSplit, TEST_RUN = "run-full-test", "run3"

annFile = '{}{}.json'.format(DATADIR,dataSplit)

In [10]:
coco_test = COCO(annFile)

loading annotations into memory...
Done (t=12.86s)
creating index...
index created!


In [11]:
player_positions = {"run1":[], "run2":[], "run3":[]}
motion_vectors = {"run1":[], "run2":[], "run3":[]}

for run in USED_RUNS:
    with open(DATADIR+run+"/log.txt", 'r') as log_file:
        for line in log_file:
            if "player" in line:
                line = line.strip()
                tic, stats = line.split("player:")
                x, y, z, angle = stats.split(",")
    
                # Store position in the dictionary
                player_positions[run].append((float(x), float(y), float(z), float(angle)))
                if len(player_positions[run]) >= 2:
                    player_position = player_positions[run][-1]
                    prev_player_position = player_positions[run][-2]
                    
                    dx = player_position[0] - prev_player_position[0]
                    dy = player_position[1] - prev_player_position[1]
                    dz = player_position[2] - prev_player_position[2]
                    dangle = np.pi - abs(abs(player_position[3] - prev_player_position[3]) - np.pi)
                    
                    dx_relative = dx * np.cos(2 * np.pi - prev_player_position[3]) + dy * np.cos(prev_player_position[3] - 1/2 * np.pi)
                    dy_relative = dx * np.sin(2 * np.pi - prev_player_position[3]) + dy * np.sin(prev_player_position[3] - 1/2 * np.pi)
                    motion_vector = (dx_relative, dy_relative, dz, dangle)
                    motion_vectors[run].append(motion_vector)

In [12]:
class DoomMotionDataset(Dataset):
    def __init__(self, coco, run, input_window, prediction_window, transform=None):
        self.coco = coco
        self.run = run
        self.img_ids = self.coco.getImgIds()
        self.transform = transform
        self.input_window = input_window
        self.prediction_window = prediction_window

    def __len__(self):
        return len(self.img_ids)

    def __getitem__(self, idx):
        # Load the RGB image
        rgb_filename = self.coco.loadImgs(self.img_ids[idx])[0]['file_name']
        tic = int(rgb_filename.replace(".png", "").split("/")[-1])
        next_tic = tic+1
        previous_tic = tic-1
        prev_motion_vectors = []
        next_motion_vectors = []

        for t in range(input_window, 0, -1):
            if tic-t < 0:
                prev_motion_vectors.append(motion_vectors[run][0])
                continue
            prev_motion_vectors.append(motion_vectors[run][tic-t])

        for t in range(1, prediction_window+1):
            if tic+t+1 >= len(motion_vectors[self.run]):
                next_motion_vectors.append(motion_vectors[run][-1])
                continue
            next_motion_vectors.append(motion_vectors[run][tic+t])

        # if dx > 1000:
        #     print(f"idx: {idx}")
        #     print(f"rgb_filename: {rgb_filename}")
        #     print(f"tic: {tic}")
        #     print(f"next_tic: {next_tic}")
        #     print(f"previous_tic: {previous_tic}")
        #     print(f"Sus {idx}")
        #     print(f"prev_player_position: {prev_player_position}")
        #     print(f"player_position: {player_position}")
        #     print(f"next_player_position: {next_player_position}")
        #     print(f"prev_motion_vector: {prev_motion_vector}")
        #     print(f"next_motion_vector: {next_motion_vector}")

        #print(prev_motion_vectors)
        #print(next_motion_vectors)
            
        prev_motion_vectors = torch.tensor(prev_motion_vectors, dtype=torch.float32)
        next_motion_vectors = torch.tensor(next_motion_vectors, dtype=torch.float32)
        
        return prev_motion_vectors, next_motion_vectors


In [13]:
class NeuralNetwork(nn.Module):
  def __init__(self, batch_size, input_length, sequence_length, activation_function=functions.relu, device=torch.device("cpu")):
    super(NeuralNetwork, self).__init__()
    self.batch_size = batch_size
    self.input_length = input_length
    self.sequence_length = sequence_length
      
    self.encoder = nn.LSTM(input_size=4, hidden_size=1024, batch_first=True).to(device)
    self.decoder = nn.LSTM(input_size=4, hidden_size=1024).to(device)
    self.fc = nn.Linear(1024, 4).to(device)

  def forward(self, x):
    hidden = None
    #print(x[:,0].shape)
    for t in range(self.input_length):
        if hidden != None:
            _, hidden = self.encoder(x[:,t], hidden)
        else:
            _, hidden = self.encoder(x[:,t])
    input_decoder = torch.zeros(batch_size, 4, dtype=torch.float)
    #input_decoder = x[-1, :, :].unsqueeze(0)
    #input_decoder = x[-1]
    #print(x)
    #print(x[0][-1])
    #hidden = hidden[:, -1:, :]
    #print(hidden.shape)
    #cell = cell[:, -1:, :]
    output_tensor = torch.zeros(self.sequence_length, self.batch_size, 4).to(x.device)
    for t in range(self.sequence_length):
        #print(t)
        output_t, hidden = self.decoder(input_decoder, hidden)
        #print(output_t.shape)
        output_t = self.fc(output_t[-1])
        #print(output_t.shape)
        output_t = output_t.unsqueeze(0)
        #print(output_t.shape)
        #print(input_decoder.shape)
        #input_decoder_resized = input_decoder.squeeze(0).expand(output_t.shape)
        #input_decoder = output_t.unsqueeze(0)
        #output_tensor[t] = output_t[0][t]#.unsqueeze(0)
        output_tensor[t] = output_t.unsqueeze(0)
        
    return output_tensor

In [16]:
batch_size = 256
learning_rate = 1e-3
num_epochs = 10
input_window = 5
prediction_window = 10
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("cuda" if torch.cuda.is_available() else "cpu")
model = NeuralNetwork(batch_size, input_window, prediction_window, device=device).to(device)

train_dataset = DoomMotionDataset(coco_train, TRAIN_RUN, input_window, prediction_window)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=False)

val_dataset = DoomMotionDataset(coco_val, VAL_RUN, input_window, prediction_window)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

# Loss function and optimizer
criterion = torch.nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    progress_bar = tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs}", unit="batch")
    for batch_idx, (inputs, targets) in enumerate(progress_bar):
        inputs, targets = inputs.to(device), targets.to(device)
        #print(inputs.shape)
        #print(targets.shape)

        optimizer.zero_grad()

        outputs = model(inputs)
        outputs = outputs.permute(1, 0, 2)
        
        loss = criterion(outputs, targets)
        loss.backward()

        optimizer.step()

        running_loss += loss.item()

        progress_bar.set_postfix({
            "batch_loss": loss.item(),
            "batch_index": batch_idx + 1,
            "batch_size": inputs.size(0)
        })

    # Average loss per epoch
    epoch_loss = running_loss / len(train_loader)
    print(f"Epoch {epoch+1}, Loss: {epoch_loss:.4f}")

    model.eval()  # Set the model to evaluation mode
    running_loss = 0.0
    
    
    progress_bar = tqdm(val_loader, desc="Validation", unit="batch")
    
    with torch.no_grad():  # Disable gradient calculations for evaluation
        for batch_idx, (inputs, targets) in enumerate(progress_bar):
            inputs, targets = inputs.to(device), targets.to(device)
            
            outputs = model(inputs)
            loss = criterion(outputs, targets)
            
            running_loss += loss.item()
            
            progress_bar.set_postfix({
                "batch_loss": loss.item(),
                "batch_index": batch_idx + 1,
                "batch_size": inputs.size(0)
            })
    
    # Average loss over all batches
    val_loss = running_loss / len(val_loader)
    print(f"Val Loss: {val_loss:.4f}")

# Save the trained model
torch.save(model.state_dict(), "movement_nn.pth")

cuda


AttributeError: module 'gmpy2' has no attribute 'version'

In [None]:
device = (torch.device("cuda" if torch.cuda.is_available() else "cpu"))
criterion = torch.nn.MSELoss()
model = NeuralNetwork(device="cuda").to(device)
model.load_state_dict(torch.load("movement_nn.pth", weights_only=True))

test_dataset = DoomMotionDataset(coco_test, TEST_RUN)
test_loader = DataLoader(test_dataset, batch_size=256, shuffle=False)

model.eval()  # Set the model to evaluation mode
running_loss = 0.0


progress_bar = tqdm(test_loader, desc="Testing", unit="batch")

with torch.no_grad():  # Disable gradient calculations for evaluation
    for batch_idx, (inputs, targets) in enumerate(progress_bar):
        inputs, targets = inputs.to(device), targets.to(device)
        
        outputs = model(inputs)
        loss = criterion(outputs, targets)
        
        running_loss += loss.item()
        
        progress_bar.set_postfix({
            "batch_loss": loss.item(),
            "batch_index": batch_idx + 1,
            "batch_size": inputs.size(0)
        })

# Average loss over all batches
test_loss = running_loss / len(test_loader)
print(f"Test Loss: {test_loss:.4f}")

In [None]:
print(player_positions["run1"][2])