In [7]:
import os
import random
import numpy as np
import pandas as pd

import torch
from torch import nn
from torch import optim
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader
from torchvision.io import read_image
from tqdm import tqdm

In [None]:
RANDOM_SEED = 42
BATCH_SIZE = 128
EPOCHS = 100
LEARNING_RATE = 1e-4
PATCH_SIZE = 8
HEIGHT = 32
WIDTH = 64
IN_CHANNELS = 3
NUM_HEADS = 8
DROPOUT = 0.001
ADAM_WEIGHT_DECAY = 0
ADAM_BETAS = (0.9, 0.999)
ACTIVATION="gelu"
NUM_ENCODERS = 4
EMBED_DIM = (PATCH_SIZE ** 2) * IN_CHANNELS # (8**2)*3=192
NUM_PATCHES = (HEIGHT // PATCH_SIZE) * (WIDTH // PATCH_SIZE) # 4*8=32

random.seed(RANDOM_SEED)
np.random.seed(RANDOM_SEED)
torch.manual_seed(RANDOM_SEED)
torch.cuda.manual_seed(RANDOM_SEED)
torch.cuda.manual_seed_all(RANDOM_SEED)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

device = "cuda" if torch.cuda.is_available() else "cpu"

In [2]:
class CicIds2017(Dataset):
    BASE_PATH = "C:\VScode Projects\FIIT_MASTERS\DP\datasets\CIC-IDS-2017"
    MAPPING_FILE = "\cicids2017_img.csv"
    index: int
    batch_size: int
    classes_count: int
    classes_list: list
    
    def __init__(self, shuffle: bool = False):        
        self.mapping = pd.read_csv(self.BASE_PATH+self.MAPPING_FILE)
        self.mapping = pd.get_dummies(self.mapping, columns=['label'])
        
        if shuffle:
            self.mapping = self.mapping.sample(frac=1) # shuffle
        
        self.classes_list = [label.split("_")[1] for label in self.mapping.columns[1:]]
        
        self.mapping = self.mapping.to_numpy()
        
        self.classes_count = len(self.mapping[0]) - 1
        
        self.transform = transforms.Compose([transforms.ToTensor()]) 
        
    def __len__(self):
        return len(self.mapping)
    
    def __getitem__(self, idx):
        img_name = self.mapping[idx, 0]
        img_path = os.path.join(self.BASE_PATH + "\image", img_name)
        img = read_image(img_path)
        
        label = [1 if label_class is True else 0 for label_class in self.mapping[idx, 1:]]
        label = np.array(label)
        
        return img, label
    
    def translate_encoded_label(self, encoded_label):
        return self.classes_list[list(encoded_label).index(1)]

In [3]:
dataset = CicIds2017()
loader = DataLoader(dataset, batch_size=64)
train_features, train_labels = next(iter(loader))
print(f"Feature batch shape: {train_features.size()}")
print(f"Labels batch shape: {train_labels.size()}")

Feature batch shape: torch.Size([64, 3, 32, 64])
Labels batch shape: torch.Size([64, 15])


In [23]:
class PatchEmbedding(nn.Module):
    def __init__(self, embed_dim, patch_size, num_patches, dropout, in_channels):
        super().__init__()
        self.patcher = nn.Sequential(
            nn.Conv2d(
                in_channels=in_channels,
                out_channels=embed_dim,
                kernel_size=patch_size,
                stride=patch_size,
            ),
            nn.Flatten(2))

        self.cls_token = nn.Parameter(torch.randn(size=(1, in_channels, embed_dim)), requires_grad=True)
        self.position_embeddings = nn.Parameter(torch.randn(size=(1, num_patches+in_channels, embed_dim)), requires_grad=True)
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, x):        
        cls_token = self.cls_token.expand(x.shape[0], -1, -1)

        x = self.patcher(x).permute(0, 2, 1)
        x = torch.cat([cls_token, x], dim=1)
        x = self.position_embeddings + x
        x = self.dropout(x)
        return x

In [25]:
model = PatchEmbedding(EMBED_DIM, PATCH_SIZE, NUM_PATCHES, DROPOUT, IN_CHANNELS).to(device)
x = torch.randn(BATCH_SIZE, IN_CHANNELS, HEIGHT, WIDTH).to(device)
print(model(x).shape)

torch.Size([128, 35, 192])


In [28]:
class ViT(nn.Module):
    def __init__(self, num_patches, num_classes, patch_size, embed_dim, num_encoders, num_heads, dropout, activation, in_channels):
        super().__init__()
        self.embeddings_block = PatchEmbedding(embed_dim, patch_size, num_patches, dropout, in_channels)
        
        encoder_layer = nn.TransformerEncoderLayer(d_model=embed_dim, nhead=num_heads, dropout=dropout, activation=activation, batch_first=True, norm_first=True)
        self.encoder_blocks = nn.TransformerEncoder(encoder_layer, num_layers=num_encoders)

        self.mlp_head = nn.Sequential(
            nn.LayerNorm(normalized_shape=embed_dim),
            nn.Linear(in_features=embed_dim, out_features=num_classes)
        )

    def forward(self, x):
        x = self.embeddings_block(x)
        x = self.encoder_blocks(x)
        x = self.mlp_head(x[:, 0, :])  # Apply MLP on the CLS token only
        return x

In [29]:
model = ViT(NUM_PATCHES, dataset.classes_count, PATCH_SIZE, EMBED_DIM, NUM_ENCODERS, NUM_HEADS, DROPOUT, ACTIVATION, IN_CHANNELS).to(device)
x = torch.randn(BATCH_SIZE, IN_CHANNELS, HEIGHT, WIDTH).to(device)
print(model(x).shape) # BATCH_SIZE X NUM_CLASSES

torch.Size([128, 15])


