# Behavior Cloning

## Data Loader

In [None]:
import torch
from PIL import Image
from torch.utils.data import Dataset
import numpy as np

In [2]:
class CarRacingDataset_RNN(Dataset):
    """Sliding-window dataset yielding fixed-length frame/label sequences.

    Item ``i`` covers ``images[i : i + sequence_length]`` together with the
    matching slice of ``labels``.

    Args:
        images: Sliceable collection of frames.
        labels: Sliceable collection of per-frame labels aligned with ``images``.
        sequence_length: Number of consecutive frames per item.
        transform: Optional callable applied to every frame. Its results are
            combined with ``torch.stack``, so it must return tensors.
    """

    def __init__(self, images, labels, sequence_length, transform=None):
        self.images = images
        self.labels = labels
        self.sequence_length = sequence_length
        self.transform = transform

    def __getitem__(self, index):
        """Return ``(sequence, label_sequence)`` for the window starting at ``index``.

        Windows that run past the end of the data (only reachable by indexing
        beyond ``len(self)``) are padded: frames with zeros, labels by
        repeating the last available label.

        Raises:
            IndexError: If no frame exists at ``index``.
        """
        start_idx = index
        end_idx = start_idx + self.sequence_length
        sequence = self.images[start_idx:end_idx]
        label_sequence = self.labels[start_idx:end_idx]

        if len(sequence) == 0:
            # Previously surfaced only incidentally through ``sequence[0]``.
            raise IndexError(
                f"index {index} is out of range for {len(self.images)} frames"
            )

        if len(sequence) < self.sequence_length:
            # Pad the frames with all-zero images shaped like the first frame.
            padding_frames = [np.zeros_like(sequence[0])] * (self.sequence_length - len(sequence))
            sequence = np.concatenate([sequence, padding_frames])
            # Pad the labels by repeating the last real label.
            padding_labels = [label_sequence[-1]] * (self.sequence_length - len(label_sequence))
            label_sequence = np.concatenate((label_sequence, padding_labels), axis=0)

        if self.transform:
            # Transform frame by frame, then stack along a new leading (time) axis.
            sequence = torch.stack([self.transform(frame) for frame in sequence])

        return sequence, label_sequence

    def __len__(self):
        """Return the number of full-length windows (never negative)."""
        # The last full window starts at len(images) - sequence_length, hence
        # the +1; clamp so short datasets report 0 instead of a negative length.
        return max(0, len(self.images) - self.sequence_length + 1)



class CarRacingDataset(Dataset):
    """Per-frame dataset pairing one image with its label.

    Args:
        images: Indexable collection of image arrays accepted by
            ``PIL.Image.fromarray``.
        labels: Indexable collection of labels, converted to float32 tensors.
        transform: Optional callable applied to the PIL image.
        grayscale: When true, the image is converted to mode ``"L"`` before
            the transform runs.
    """

    def __init__(self, images, labels, transform=None, grayscale = False):
        self.images, self.labels = images, labels
        self.transform, self.grayscale = transform, grayscale

    def __getitem__(self, idx):
        """Return ``(image, label)`` for sample ``idx``."""
        frame = Image.fromarray(self.images[idx])
        target = torch.tensor(self.labels[idx], dtype=torch.float32)

        frame = frame.convert("L") if self.grayscale else frame

        if not self.transform:
            return frame, target
        return self.transform(frame), target

    def __len__(self):
        """Return the number of stored images."""
        return len(self.images)


## Model

In [3]:
import torch
import torch.nn as nn

In [4]:
class CNN_RNN_Classifier(nn.Module):
    """Per-frame CNN feature extractor followed by an LSTM and a linear head.

    The flattened feature size is hard-coded to ``64 * 22 * 22``, which is
    what two (3x3 conv -> 2x2 max-pool) stages produce for 96x96 inputs.

    Args:
        in_channels: Number of channels of each input frame.
        out_size: Number of output classes per time step.
        rnn_hidden_size: Hidden size of the LSTM.
        rnn_num_layers: Number of stacked LSTM layers.
    """

    def __init__(self, in_channels=None, out_size=None, rnn_hidden_size=64, rnn_num_layers=1):
        super().__init__()

        self.rnn_hidden_size = rnn_hidden_size
        self.rnn_num_layers = rnn_num_layers

        flat_features = 64 * 22 * 22

        # Convolutional feature extractor.
        self.conv1 = nn.Conv2d(in_channels, 32, kernel_size=3)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3)
        # fc1 is registered but not used by forward().
        self.fc1 = nn.Linear(flat_features, 512)
        self.relu = nn.ReLU()
        self.pool1 = nn.MaxPool2d(2, stride=2)

        # Recurrent layer consuming the flattened CNN features directly.
        self.rnn = nn.LSTM(flat_features, rnn_hidden_size, rnn_num_layers, batch_first=True)

        # Per-time-step classification head.
        self.fc2 = nn.Linear(rnn_hidden_size, out_size)

        self.dropout = nn.Dropout(0.0)

    def forward(self, x):
        """Map ``(batch, seq, c, h, w)`` frames to ``(batch, seq, out_size)`` scores."""
        n_batch, n_steps, channels, height, width = x.size()
        # Fold the time axis into the batch axis so the CNN sees plain images.
        frames = x.view(n_batch * n_steps, channels, height, width).float()

        feats = self.pool1(self.relu(self.conv1(frames)))
        feats = self.pool1(self.relu(self.conv2(feats)))
        feats = feats.reshape(n_batch * n_steps, -1)
        feats = self.relu(self.dropout(feats))

        # Restore the time axis before the LSTM.
        per_step, _ = self.rnn(feats.view(n_batch, n_steps, -1))

        return self.fc2(per_step)


class CNNClassifier(nn.Module):  # Architecture
    """Single-frame CNN classifier: two conv/pool stages and two linear layers.

    The first linear layer expects ``64 * 22 * 22`` flattened features, which
    is what the conv/pool stack produces for 96x96 inputs.

    Args:
        in_channels: Number of channels of the input image.
        out_size: Number of output classes.
    """

    def __init__(self, in_channels=None, out_size=None):
        super().__init__()
        self.conv1 = nn.Conv2d(in_channels, 32, kernel_size=3)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3)
        self.fc1 = nn.Linear(64 * 22 * 22, 512)
        self.fc2 = nn.Linear(512, out_size)
        self.pool1 = nn.MaxPool2d(2, stride=2)
        self.relu = nn.ReLU()

        self.dropout = nn.Dropout(0.0)

    def forward(self, x):
        """Map a ``(batch, c, h, w)`` image batch to ``(batch, out_size)`` scores."""
        feats = self.pool1(self.relu(self.conv1(x)))
        feats = self.pool1(self.relu(self.conv2(feats)))
        flat = feats.view(feats.size(0), -1)

        hidden = self.relu(self.dropout(self.fc1(flat)))
        return self.fc2(hidden)

## Visualizer

In [6]:
import numpy as np
import matplotlib.pyplot as plt
import cv2
import torchvision.transforms as transforms
import torch

In [7]:
def visualize(data_loader, num_samples=30):
    """Plot a grid of observations from the first batch of ``data_loader``.

    Args:
        data_loader: Iterable yielding ``(images, actions)`` batches. Each
            image is transposed with ``(1, 2, 0)`` before display, i.e. it is
            treated as channel-first.
        num_samples: Maximum number of samples to draw. It is capped at the
            size of the first batch, so a batch smaller than ``num_samples``
            no longer raises ``IndexError``.

    Returns:
        None. Nothing is plotted when the loader is empty or there is nothing
        to draw.
    """
    # Only the first batch is ever shown.
    first_batch = next(iter(data_loader), None)
    if first_batch is None:
        return
    images, actions = first_batch

    num_samples = min(num_samples, len(images))
    if num_samples <= 0:
        # Also avoids the division by zero in the grid computation below.
        return

    # Near-square grid with rows * cols >= num_samples.
    rows = int(np.sqrt(num_samples))
    cols = int(np.ceil(num_samples / rows))
    plt.figure(figsize=(10, 10))

    for i in range(num_samples):
        plt.subplot(rows, cols, i + 1)
        observation = images[i]
        action = actions[i].tolist()
        plt.imshow(np.transpose(observation, (1, 2, 0)), interpolation='nearest')
        plt.title(f"Action: {action}", fontsize=7)
        plt.axis('off')

    plt.show()


class ChangeColorTransform:
    """Transform that overwrites pixels selected by an HSV range mask.

    The input is converted to a NumPy array and transposed with ``(1, 2, 0)``
    (so it is treated as channel-first); the result is returned channel-first
    as a float tensor.

    NOTE(review): the mask is computed on the HSV conversion, but the
    replacement triple ``(50, 0.5, 0.3)`` is written into the original
    (non-HSV) array - confirm this is the intended colour space.
    NOTE(review): the mask bounds look like they assume float images in
    [0, 1] - confirm against the caller.
    """

    def __call__(self, img):
        # Work on a channel-last view, as expected by OpenCV.
        pixels = np.transpose(np.array(img), (1, 2, 0))

        # Mask for grey color
        grey_mask = cv2.inRange(
            cv2.cvtColor(pixels, cv2.COLOR_BGR2HSV),
            (0, 0, 0.2),
            (180, 0.1, 0.5),
        )

        # Grey to Brown color
        pixels[grey_mask > 0] = (50, 0.5, 0.3)

        # Back to channel-first, as a float tensor.
        return torch.from_numpy(pixels.transpose((2, 0, 1))).float()

class RandomCropAndRotation:
    """Augmentation: random crop followed by a random rotation.

    Args:
        crop_size: Output size passed to ``transforms.RandomCrop.get_params``.
        rotation_angle: The rotation angle is drawn uniformly from
            ``[-rotation_angle, rotation_angle]``.
    """

    def __init__(self, crop_size=(80, 80), rotation_angle=20):
        self.crop_size, self.rotation_angle = crop_size, rotation_angle

    def __call__(self, img):
        # Pick a random crop window, then cut it out.
        top, left, height, width = transforms.RandomCrop.get_params(img, output_size=self.crop_size)
        cropped = transforms.functional.crop(img, top, left, height, width)

        # Rotate the crop by a random angle within the configured bound.
        bound = self.rotation_angle
        return transforms.functional.rotate(cropped, np.random.uniform(-bound, bound))

## Data Collector

In [None]:
import gym
import numpy as np
import keyboard
from PIL import Image

In [None]:
def get_discrete_action(action):
    """
    Convert a continuous [steer, gas, brake] action into a 5-way one-hot list
    ordered as [left, right, accel, brake, no-op].

    Rules are tried in that order and the first match wins, so steering takes
    priority over gas, and gas over brake. Anything unmatched is a no-op.
    """
    # (index into `action`, value that triggers the class), in priority order.
    rules = ((0, -0.5), (0, 0.5), (1, 0.5), (2, 0.5))

    one_hot = [0, 0, 0, 0, 0]
    for slot, (axis, trigger) in enumerate(rules):
        if action[axis] == trigger:
            one_hot[slot] = 1
            return one_hot

    one_hot[4] = 1
    return one_hot

def collect_manual_data(env,
                        num_episodes: int,
                        frames_per_ep: int,
                        trim_initial: int,
                        raw_out_path: str):
    """
    Record `num_episodes` keyboard-driven episodes and save them to one .npz.

    On every step the arrow keys are polled (left / right / up / down, first
    match wins) to build a [steer, gas, brake] action. The observation is
    cropped to ``obs[0:82, 0:96]`` and stored with the one-hot label from
    `get_discrete_action`. The first `trim_initial` frames of each episode
    are dropped before the episode is appended to the output.

    Args:
        env: Environment whose ``step`` returns a 5-tuple
            ``(obs, reward, terminated, truncated, info)``. It is closed
            before this function returns, even if an error occurs.
        num_episodes: Number of episodes to play.
        frames_per_ep: Maximum number of steps per episode.
        trim_initial: Number of leading frames dropped per episode.
        raw_out_path: Destination passed to ``np.savez``; the archive holds
            ``images`` (uint8) and ``labels`` (float).
    """
    all_imgs, all_labels = [], []

    try:
        for ep in range(1, num_episodes + 1):
            print(f"[Episode {ep}/{num_episodes}]")
            imgs_buf, labs_buf = [], []
            env.reset()

            for _ in range(frames_per_ep):
                action = [0.0, 0.0, 0.0]
                if   keyboard.is_pressed('left'):  action[0] = -0.5
                elif keyboard.is_pressed('right'): action[0] =  0.5
                elif keyboard.is_pressed('up'):    action[1] =  0.5
                elif keyboard.is_pressed('down'):  action[2] =  0.5

                obs, _, terminated, truncated, _ = env.step(action)
                env.render()

                imgs_buf.append(obs[0:82, 0:96])
                labs_buf.append(get_discrete_action(action))

                # An episode ends on termination OR truncation; the truncated
                # flag used to be discarded, so stepping could continue past
                # the end of a time-limited episode.
                if terminated or truncated or keyboard.is_pressed('q'):
                    print("  → terminated early")
                    break

            # drop warm-up frames
            all_imgs.extend(imgs_buf[trim_initial:])
            all_labels.extend(labs_buf[trim_initial:])
    finally:
        # Release the environment even when recording is interrupted.
        env.close()

    np.savez(raw_out_path,
             images=np.array(all_imgs, dtype=np.uint8),
             labels=np.array(all_labels, dtype=float))
    print(f"[Saved raw data] {raw_out_path}")

def prepare_training_data(raw_path: str,
                          train_out_path: str,
                          resize_to=(96,96)):
    """
    Build the 4-class training archive from a raw recording.

    Loads ``images`` and ``labels`` from the raw .npz, resizes every frame to
    `resize_to`, removes samples whose no-op column (index 4) equals 1, keeps
    only the first four label columns, and writes the result with
    ``np.savez``.

    Args:
        raw_path: Path of the raw .npz archive.
        train_out_path: Destination of the training .npz archive.
        resize_to: Target size as ``(height, width)``.

    Raises:
        ValueError: If the archive holds a different number of images and
            labels.
    """
    # The context manager closes the archive's file handle; the arrays are
    # read into memory while it is still open.
    with np.load(raw_path) as data:
        imgs = data['images']   # shape: (N, H, W, 3)
        labs = data['labels']   # shape: (N, 5)

    if len(imgs) != len(labs):
        raise ValueError(
            f"images/labels length mismatch: {len(imgs)} vs {len(labs)}"
        )

    H, W = resize_to
    resized = np.zeros((len(labs), H, W, 3), dtype=np.uint8)

    for i, frame in enumerate(imgs):
        # PIL takes the size as (width, height).
        resized[i] = np.array(Image.fromarray(frame).resize((W, H)))

    # keep only actions [0–3], drop no-op (index 4)
    mask = labs[:, 4] != 1
    final_imgs = resized[mask]
    final_labs = labs[mask, :4]

    np.savez(train_out_path,
             images=final_imgs,
             labels=final_labs)
    print(f"[Saved training data] {train_out_path}")
    print(f"[Final dataset size] {len(final_labs)} examples")

if __name__ == "__main__":
    # Recording-session settings (edit these before running).
    NUM_EPISODES = 18            # episodes to play
    FRAMES_PER_EPISODE = 800     # step budget per episode
    TRIM_INITIAL = 30            # leading frames dropped from every episode
    RAW_FILE = "manual_control_all_eps.npz"
    TRAIN_FILE = "training_data_4class_all_eps.npz"
    RESIZE_DIMS = (96, 96)

    env = gym.make('CarRacing-v2', render_mode='human')

    # Step 1: record every episode into a single raw archive.
    collect_manual_data(env, NUM_EPISODES, FRAMES_PER_EPISODE, TRIM_INITIAL, RAW_FILE)

    # Step 2: convert the raw archive into the final training set.
    prepare_training_data(raw_path=RAW_FILE,
                          train_out_path=TRAIN_FILE,
                          resize_to=RESIZE_DIMS)

## Trainer

In [9]:
from torchvision.transforms import Compose, ToTensor, Resize, RandomHorizontalFlip, RandomVerticalFlip, Normalize
from torch.utils.data import DataLoader
import matplotlib.pyplot as plt
from tqdm import tqdm
import os
from sklearn.metrics import precision_score, recall_score, f1_score
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim

In [10]:
# Device for the model and batches: CUDA when PyTorch reports one, else CPU.
DEVICE = "cpu"
if torch.cuda.is_available():
    DEVICE = "cuda"


# TRAINING

def train(model, dataloader, loss_fn, optimizer):
    """Run one training epoch over sequence batches.

    Args:
        model: Module whose output is indexed as ``(batch, seq, classes)``.
        dataloader: Yields ``(inputs, labels)``; the class index is taken as
            ``labels.argmax(dim=2)``.
        loss_fn: Loss called with ``(batch * seq, classes)`` scores and
            ``(batch * seq,)`` integer targets.
        optimizer: Optimizer stepped once per batch.

    Returns:
        ``(training_loss, training_acc)``: the loss averaged over samples
        (each batch weighted by its size) and the per-time-step accuracy in
        percent. Both are ``0.0`` when the loader yields no batches.
    """
    model.train()
    loss_sum = 0.0
    samples_seen = 0
    total_train = 0
    correct_train = 0
    loop = tqdm(dataloader, leave=True)

    for inputs, labels in loop:
        inputs = inputs.to(DEVICE)
        labels = labels.to(DEVICE)
        targets = labels.argmax(dim=2)

        optimizer.zero_grad()
        output = model(inputs)  # input one batch to model

        # Flatten (batch, seq) so every time step is one classification sample.
        loss = loss_fn(output.reshape(-1, output.size(2)),
                       targets.reshape(-1).long())

        loss.backward()
        optimizer.step()

        batch_loss = loss.item()
        loop.set_postfix(loss=batch_loss)

        batch_items = inputs.size(0)
        loss_sum += batch_loss * batch_items
        samples_seen += batch_items

        predicted = output.detach().argmax(dim=2)
        total_train += targets.numel()
        correct_train += (predicted == targets).sum().item()

    if samples_seen == 0:
        return 0.0, 0.0

    training_acc = 100 * correct_train / total_train
    # The sum is weighted by batch size, so it must be divided by the number
    # of samples; dividing by the number of batches inflated the loss by
    # roughly the batch size.
    training_loss = loss_sum / samples_seen

    return training_loss, training_acc


def validation(model, val_loader, loss_fn):  # VALIDATION
    """Evaluate ``model`` on ``val_loader`` without gradient tracking.

    Args:
        model: Module whose output is indexed as ``(batch, seq, classes)``.
        val_loader: Yields ``(inputs, labels)``; the class index is taken as
            ``labels.argmax(dim=2)``.
        loss_fn: Loss called with ``(batch * seq, classes)`` scores and
            ``(batch * seq,)`` integer targets.

    Returns:
        ``(validation_loss, validation_acc)``: the loss averaged over samples
        (each batch weighted by its size) and the per-time-step accuracy in
        percent. Both are ``0.0`` when the loader yields no batches.
    """
    model.eval()  # evaluation mode
    loss_sum = 0.0
    samples_seen = 0
    total_test = 0
    correct_test = 0

    with torch.no_grad():
        val_loop = tqdm(val_loader, leave=True)
        for val_input, val_label in val_loop:
            val_input, val_label = val_input.to(DEVICE), val_label.to(DEVICE)
            targets = val_label.argmax(dim=2)

            val_output = model(val_input)
            loss = loss_fn(val_output.reshape(-1, val_output.size(2)),
                           targets.reshape(-1).long())

            batch_items = val_input.size(0)
            loss_sum += loss.item() * batch_items
            samples_seen += batch_items

            predicted = val_output.argmax(dim=2)
            total_test += targets.numel()
            correct_test += (predicted == targets).sum().item()

    if samples_seen == 0:
        return 0.0, 0.0

    validation_acc = 100 * correct_test / total_test
    # Divide the size-weighted sum by the number of samples; dividing by the
    # number of batches inflated the loss by roughly the batch size.
    validation_loss = loss_sum / samples_seen

    return validation_loss, validation_acc


def test(model, test_loader, loss_fn):
    """Evaluate ``model`` on ``test_loader`` without gradient tracking.

    Args:
        model: Module whose output is indexed as ``(batch, seq, classes)``.
        test_loader: Yields ``(inputs, labels)``; the class index is taken as
            ``labels.argmax(dim=2)``.
        loss_fn: Loss called with ``(batch * seq, classes)`` scores and
            ``(batch * seq,)`` integer targets.

    Returns:
        ``(accuracy, total_loss)`` - note the order: the per-time-step
        accuracy in percent first, then the loss averaged over samples (each
        batch weighted by its size). Both are ``0.0`` when the loader yields
        no batches.
    """
    model.eval()
    correct_test = 0
    total_test = 0
    loss_sum = 0.0
    samples_seen = 0

    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(DEVICE), labels.to(DEVICE)
            targets = labels.argmax(dim=2)

            output = model(inputs)
            loss = loss_fn(output.reshape(-1, output.size(2)),
                           targets.reshape(-1).long())

            batch_items = inputs.size(0)
            loss_sum += loss.item() * batch_items
            samples_seen += batch_items

            predicted = output.argmax(dim=2)
            total_test += targets.numel()
            correct_test += (predicted == targets).sum().item()

    if samples_seen == 0:
        return 0.0, 0.0

    accuracy = 100 * correct_test / total_test
    # Divide the size-weighted sum by the number of samples; dividing by the
    # number of batches inflated the loss by roughly the batch size.
    total_loss = loss_sum / samples_seen

    return accuracy, total_loss

In [11]:
# Device for the model and batches: CUDA when PyTorch reports one, else CPU.
DEVICE = "cpu"
if torch.cuda.is_available():
    DEVICE = "cuda"


def main():
    """Train, plot and test the CNN+LSTM classifier on the 4-class dataset.

    The data is split in order (no shuffling) into 80% train, 10% validation
    and 10% test. Whenever the validation accuracy improves, the weights are
    saved to ``weights/modelLSTM_new.pth``; testing reloads that file when it
    exists.
    """
    model = CNN_RNN_Classifier(in_channels=3, out_size=4).to(DEVICE)

    loss_fn = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.0001)

    BATCH_SIZE = 16
    EPOCH = 5
    SEQUENCE_LENGTH = 5
    WEIGHTS_PATH = 'weights/modelLSTM_new.pth'

    TRAIN_4 = True
    TEST_4 = True
    PLOT = True

    # Close the archive's file handle once both arrays are in memory.
    with np.load("../RL-proj/training_data_4class_all_eps.npz") as data:
        images = data['images']
        labels = data['labels'].astype(float)

    # Train data
    split_ratio = 0.8
    split_index = int(len(images) * split_ratio)

    # 4 class
    X_train, X_temp = images[:split_index], images[split_index:]
    y_train, y_temp = labels[:split_index], labels[split_index:]

    # Validation, Test data
    split_ratio_val_test = 0.5
    split_index_val = int(len(X_temp) * split_ratio_val_test)

    # 4 class
    X_val, X_test = X_temp[:split_index_val], X_temp[split_index_val:]
    y_val, y_test = y_temp[:split_index_val], y_temp[split_index_val:]

    transform_list = Compose([ToTensor(), Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])

    train_dataset_RNN = CarRacingDataset_RNN(X_train, y_train, sequence_length=SEQUENCE_LENGTH, transform=transform_list)
    train_loader_RNN = DataLoader(train_dataset_RNN, batch_size=BATCH_SIZE, shuffle=True)

    val_dataset_RNN = CarRacingDataset_RNN(X_val, y_val, sequence_length=SEQUENCE_LENGTH, transform=transform_list)
    val_loader_RNN = DataLoader(val_dataset_RNN, batch_size=BATCH_SIZE, shuffle=False)

    test_dataset_RNN = CarRacingDataset_RNN(X_test, y_test, sequence_length=SEQUENCE_LENGTH, transform=transform_list)
    test_loader_RNN = DataLoader(test_dataset_RNN, batch_size=BATCH_SIZE, shuffle=False)

    val_losses, train_losses, train_accs, val_accs = [], [], [], []

    if TRAIN_4:
        # torch.save does not create missing directories.
        os.makedirs(os.path.dirname(WEIGHTS_PATH) or ".", exist_ok=True)

        best_acc = 0
        for i in range(EPOCH):
            train_loss, train_acc = train(model, train_loader_RNN, loss_fn, optimizer)
            val_loss, val_acc = validation(model, val_loader_RNN, loss_fn)

            print(f"EPOCH: {i+1} has validation accuracy of {val_acc:.2f} and loss of {val_loss:.4f}")
            if val_acc > best_acc:  # best accuracy out of all epochs
                best_acc = val_acc
                torch.save(model.state_dict(), WEIGHTS_PATH)
                # Only announce a save when one actually happened.
                print("Saving best model with accuracy: ", best_acc)

            train_losses.append(train_loss)
            train_accs.append(train_acc)

            val_losses.append(val_loss)
            val_accs.append(val_acc)

    if PLOT:
        # loss plot
        plt.figure(figsize=(15, 60))

        plt.plot(train_losses, label='Train Loss')
        plt.plot(val_losses, label='Validation Loss')
        plt.legend()
        plt.title(f'Car Data loss plot, BATCH SIZE={BATCH_SIZE}, EPOCH = {EPOCH}')
        plt.xlabel('Epochs')
        plt.ylabel('Loss')

        plt.show()

        # accuracy plot
        plt.figure(figsize=(10, 40))

        plt.plot(train_accs, label='Train Accuracy')
        plt.plot(val_accs, label='Validation Accuracy')
        plt.title(f'Car Data accuracy plot, BATCH SIZE={BATCH_SIZE}, EPOCH = {EPOCH}')
        plt.legend()
        plt.xlabel('Epochs')
        plt.ylabel('Accuracy')

        plt.show()

    if TEST_4:
        if os.path.exists(WEIGHTS_PATH):
            # map_location lets a checkpoint saved on another device load here.
            model.load_state_dict(torch.load(WEIGHTS_PATH, map_location=DEVICE))
        acc, _ = test(model, test_loader_RNN, loss_fn)
        print("Test accuracy(4 class): ", acc)


In [12]:
# if __name__ == "__main__":
#     main()

## Simulator

In [13]:
import gym
import numpy as np
import torch
from torchvision.transforms import transforms
import keyboard

In [None]:
# Checkpoint holding the trained policy weights.
model_path= './weights/modelLSTM.pth'

# The simulator runs the policy on the CPU.
model = CNN_RNN_Classifier(in_channels=3, out_size=4).to("cpu")

# map_location="cpu" keeps loading working when the checkpoint was saved from
# a CUDA device and this machine has no GPU.
model.load_state_dict(torch.load(model_path, map_location="cpu"))
model.eval()

# transform for input images
transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((96, 96)),
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
])


def preprocess_observation(observation, prev_obs_buffer):
    """Push the newest observation into the rolling frame buffer.

    The observation is cropped to ``[0:82, 0:96]``, run through the
    module-level ``transform`` and appended to the buffer, whose oldest
    entry is dropped so the length stays the same.

    Returns:
        The updated buffer as a new tensor.
    """
    cropped = observation[0:82, 0:96]  # crop
    newest = transform(cropped).unsqueeze(0)  # leading dim so it can be concatenated

    # Slide the window: drop the oldest frame, append the newest.
    return torch.cat([prev_obs_buffer[1:], newest], dim=0)


def get_model_action(observation):
    """Choose a [steer, gas, brake] action from the buffered frame sequence.

    The sequence is fed to the module-level ``model`` with a leading batch
    dimension, and the class predicted for the LAST time step (the most
    recent frame) is mapped to a control vector. The previous loop returned
    on its first iteration, so it always acted on the oldest time step.

    Args:
        observation: Frame sequence tensor without a batch dimension.

    Returns:
        A new ``[steer, gas, brake]`` list; all zeros for an unknown class.
    """
    with torch.no_grad():
        output = model(observation.unsqueeze(0))  # Add batch dim

    # output is indexed (batch, seq, classes): take batch 0, last time step.
    latest_class = int(torch.argmax(output[0, -1]).item())

    controls = {
        0: [-0.2, 0.0, 0.0],  # steer left
        1: [0.2, 0.0, 0.0],   # steer right
        2: [0.0, 0.2, 0.0],   # gas
        3: [0.0, 0.0, 0.2],   # brake
    }
    return list(controls.get(latest_class, [0.0, 0.0, 0.0]))


def main():
    """Drive CarRacing with the trained model until the episode ends or 'q' is pressed.

    The first 30 frames use random actions; afterwards each observation is
    pushed into a 5-frame buffer and the model chooses the action.
    """
    env = gym.make('CarRacing-v2', render_mode='human')

    try:
        # reset() returns (observation, info) under the same API that makes
        # step() return a 5-tuple, so unpack it rather than keeping the tuple.
        obs, _ = env.reset()
        obs_buffer = torch.zeros(5, 3, 96, 96)

        for frame in range(1, 5000):  # frames
            if frame <= 30:
                # random actions for the first 30 frames
                action = env.action_space.sample()
            else:
                obs_buffer = preprocess_observation(obs, obs_buffer)
                action = get_model_action(obs_buffer)

            obs, _, terminated, truncated, _ = env.step(action)
            env.render()

            # `'q' or 'Q'` always evaluated to 'q'; spell out what is checked.
            if keyboard.is_pressed('q'):
                break

            # Stop on truncation (time limit) as well as termination.
            if terminated or truncated:
                break
    finally:
        # Release the environment even if the loop raises.
        env.close()


if __name__ == "__main__":
    main()
