## Library

In [1]:
import torch
import torch.nn as nn
import numpy as np
import torch.nn.functional as F
from typing import List, Tuple, Optional
from torch import Tensor
import os
import cv2
import matplotlib.pyplot as plt
from torch.utils.data import Dataset, DataLoader, Sampler
import torchvision
from glob import glob
from torchvision.io import read_image
from torchvision.io import ImageReadMode
import re
from torch.optim import AdamW 
from torch.nn import CrossEntropyLoss
from  sklearn.model_selection import train_test_split
from torchvision.transforms import v2
import pandas as pd
from torchvision.transforms import CenterCrop
from sklearn.utils.class_weight import compute_class_weight
from sklearn.metrics import accuracy_score, confusion_matrix, precision_recall_fscore_support
from copy import deepcopy
import gc
import seaborn as sns
import time
from datetime import timedelta
from timm import create_model
import shutil
import json

## Model

In [2]:
class DNN(nn.Module):
    def __init__(self, input_size: int, output_size: int, device: torch.device) -> None:
        super(DNN, self).__init__()
        self.device = device
        self.fc1 = nn.Linear(input_size, 200).to(device)
        self.bn1 = nn.BatchNorm1d(200).to(device)
        self.dropout1 = nn.Dropout(0.3)
        self.fc2 = nn.Linear(200, 100).to(device)
        self.bn2 = nn.BatchNorm1d(100).to(device)
        self.dropout2 = nn.Dropout(0.3)
        self.fc3 = nn.Linear(100, output_size).to(device)
        
        self._init_weights()

    def _init_weights(self) -> None:
        nn.init.kaiming_normal_(self.fc1.weight, nonlinearity='relu')
        nn.init.constant_(self.fc1.bias, 0)

    
    def forward(self, x: Tensor) -> Tensor:
        x = self.dropout1(F.relu(self.fc1(x)))
        x = self.dropout2(F.relu(self.fc2(x)))
        x = self.fc3(x)
        return x

In [3]:
class LSTMCell(nn.Module):
    def __init__(self, input_size: int, hidden_size: int, bias: bool = True, device: torch.device = None) -> None:
        super(LSTMCell, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.device = device

        self.weight_ih = nn.Parameter(torch.empty(4*hidden_size, input_size, device=device))
        self.weight_hh = nn.Parameter(torch.empty(4*hidden_size, hidden_size, device=device))

        self.bias = bias

        if self.bias:
            self.bias_ih = nn.Parameter(torch.empty(4*hidden_size, device=device))
            self.bias_hh = nn.Parameter(torch.empty(4*hidden_size, device=device))
        else:
            self.register_parameter('bias_ih', None)
            self.register_parameter('bias_hh', None)

        self.peephole_i = nn.Parameter(torch.empty(hidden_size, device=device))
        self.peephole_f = nn.Parameter(torch.empty(hidden_size, device=device))
        self.peephole_o = nn.Parameter(torch.empty(hidden_size, device=device))

        self._init_weights()

    def _init_weights(self) -> None:
        nn.init.orthogonal_(self.weight_hh)
        nn.init.xavier_normal_(self.weight_ih)
        nn.init.constant_(self.bias_hh, 0)
        nn.init.constant_(self.bias_ih, 0)
        nn.init.normal_(self.peephole_i, mean=0, std=0.01)
        nn.init.normal_(self.peephole_f, mean=0, std=0.01)
        nn.init.normal_(self.peephole_o, mean=0, std=0.01)

    def forward(self, input: Tensor, state: Tuple[Tensor, Tensor] = None) -> Tuple[Tensor, Tensor]:
        hx, cx = state
        gates = torch.mm(input, self.weight_ih.t()) + torch.mm(hx, self.weight_hh.t())
        if self.bias:
            gates += self.bias_ih + self.bias_hh

        input_gate, forget_gate, cell_gate, output_gate = gates.chunk(4, 1)

        input_gate = torch.sigmoid(input_gate + self.peephole_i * cx)
        forget_gate = torch.sigmoid(forget_gate + self.peephole_f * cx)
        cell_gate = torch.tanh(cell_gate)

        cy = (forget_gate * cx) + (input_gate * cell_gate)

        output_gate = torch.sigmoid(output_gate + self.peephole_o * cy)

        hy = output_gate * torch.tanh(cy)
        return hy, cy

class LSTMBlock(nn.Module):
    def __init__(self, input_size: int, hidden_size: int, num_cells: int, device: torch.device) -> None:
        super(LSTMBlock, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.num_cells = num_cells
        self.device = device

        self.lstm_cells = nn.ModuleList([LSTMCell(self.input_size, hidden_size, device=device)
                                         if i == 0
                                         else LSTMCell(self.hidden_size, self.hidden_size, device=device)
                                         for i in range(self.num_cells)])

    def forward(self, input: Tensor, state: Tuple[Tensor, Tensor]=None)->Tuple[Tensor, Tensor]:
        batch_size = input.size(0)

        if state is None:
            zeros = torch.zeros(batch_size, self.hidden_size, device=self.device)
            state = (zeros, zeros)

        hx, cx = state

        outputs = []

        seq_len = input.size(1)
        for t in range(seq_len):
            x = input[:, t, :]
            for i, lstm_cell in enumerate(self.lstm_cells):
                hx, cx = lstm_cell(x, (hx, cx))
                x = hx
            outputs.append(hx)

        outputs = torch.stack(outputs, dim=1)
        return outputs, (hx, cx)

class BiLSTMBlock(nn.Module):
    def __init__(self, input_size: int, hidden_size: int, num_cells: int, device: torch.device) -> None:
        super(BiLSTMBlock, self).__init__()
        self.forward_lstm = LSTMBlock(input_size, hidden_size // 2, num_cells, device)
        self.backward_lstm = LSTMBlock(input_size, hidden_size // 2, num_cells, device)

    def forward(self, input: Tensor) -> Tensor:
        forward_output, _ = self.forward_lstm(input)
        backward_output, _ = self.backward_lstm(torch.flip(input, [1]))
        backward_output = torch.flip(backward_output, [1])
        return torch.cat((forward_output, backward_output), dim=2)

class MultiBiLSTM(nn.Module):
    def __init__(self, input_size: int, hidden_size: int, num_blocks: int, num_cells_per_block: int, device: torch.device) -> None:
        super(MultiBiLSTM, self).__init__()
        self.device = device
        self.blocks = nn.ModuleList([
            BiLSTMBlock(input_size, hidden_size, num_cells_per_block, device)
            if i == 0
            else BiLSTMBlock(hidden_size, hidden_size, num_cells_per_block, device)
            for i in range(num_blocks)])

    def forward(self, x: Tensor) -> Tensor:
        for block in self.blocks:
            x = block(x)
        return x

In [4]:
class CNN(nn.Module):
    def __init__(self, in_channels: int, output_size: int, device: torch.device) -> None:
        super(CNN, self).__init__()
        self.device = device
        self.conv1 = nn.Conv2d(in_channels, 32, kernel_size=3, stride=2, padding=1).to(device)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, stride=2, padding=1).to(device)
        self.conv3 = nn.Conv2d(64, 128, kernel_size=3, stride=2, padding=1).to(device)
        self.conv4 = nn.Conv2d(128, 256, kernel_size=3, stride=2, padding=1).to(device)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        self.global_avg_pool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(256, output_size).to(device)
        
        self._init_weights()

    def _init_weights(self) -> None:
        nn.init.kaiming_normal_(self.conv1.weight, mode='fan_in', nonlinearity='relu')
        nn.init.constant_(self.conv1.bias, 0)
        
    def forward(self, x: Tensor) -> Tensor:
        x = F.relu(self.conv1(x))
        x = self.pool(x)
        x = F.relu(self.conv2(x))
        x = self.pool(x)
        x = F.relu(self.conv3(x))
        x = F.relu(self.conv4(x))
        x = self.global_avg_pool(x)
        x = x.view(x.size(0), -1)
        x = self.fc(x)
        return x

In [8]:
class CLDNN(nn.Module):
    def __init__(self,
                 in_channels: int,
                 cnn_out_channels: int,
                 lstm_input_size: int,
                 lstm_hidden_size: int,
                 lstm_num_blocks: int,
                 lstm_num_cells_per_block: int,
                 dnn_output_size: int,
                 device: torch.device) -> None:
        super(CLDNN, self).__init__()
        self.device = device
        self.cnn = CNN(in_channels, cnn_out_channels, device)
        self.lstm = MultiBiLSTM(lstm_input_size, lstm_hidden_size, lstm_num_blocks, lstm_num_cells_per_block, device)
        self.dnn = DNN(lstm_hidden_size, dnn_output_size, device)
    
    def forward(self, x: Tensor) -> Tensor:
        batch_size, seq_len, channels, height, width = x.size()

        cnn_out = []
        for t in range(seq_len):
            img = x[:, t, :, :, :]
            cnn_out.append(self.cnn(img))
            
        cnn_out = torch.stack(cnn_out, dim=1)
        
        lstm_out = self.lstm(cnn_out)
        
        last_output = lstm_out[:, -1, :]
        
        out = self.dnn(last_output)
        out = torch.sigmoid(out)
        return out
    
in_channels = 3
cnn_out_channels = 256
lstm_input_size = cnn_out_channels
lstm_hidden_size = 256
lstm_num_blocks = 5
lstm_num_cells_per_block = 5
dnn_output_size = 2
device = torch.device("cuda:1" if torch.cuda.is_available() else "cpu")
model = CLDNN(
    in_channels=in_channels,
    cnn_out_channels=cnn_out_channels,
    lstm_input_size=lstm_input_size, 
    lstm_hidden_size=lstm_hidden_size, 
    lstm_num_blocks=lstm_num_blocks,
    lstm_num_cells_per_block=lstm_num_cells_per_block,
    dnn_output_size=dnn_output_size,
    device=device)
# optimizer = AdamW(model.parameters(), lr = 0.001)
# loss_fn = CrossEntropyLoss()
# epochs = 100

In [49]:
torch.cuda.empty_cache()

In [12]:
def custom_sort_key(filename):
    match = re.search(r'frame_(\d+)', filename)
    if match:
        return int(match.group(1))
    return filename  

In [41]:
model.load_state_dict(torch.load('weight/train2/last.pt'))
path = 'img_resize/Thang3/2_220474_well02_zid99_30'
list_path_img = sorted(glob(os.path.join(path, '*.jpg')), key=custom_sort_key)

In [42]:
list_path_img

['img_resize/Thang3/2_220474_well02_zid99_30/Thang3_220474_frame_0.jpg',
 'img_resize/Thang3/2_220474_well02_zid99_30/Thang3_220474_frame_1.jpg',
 'img_resize/Thang3/2_220474_well02_zid99_30/Thang3_220474_frame_2.jpg',
 'img_resize/Thang3/2_220474_well02_zid99_30/Thang3_220474_frame_3.jpg',
 'img_resize/Thang3/2_220474_well02_zid99_30/Thang3_220474_frame_4.jpg',
 'img_resize/Thang3/2_220474_well02_zid99_30/Thang3_220474_frame_5.jpg',
 'img_resize/Thang3/2_220474_well02_zid99_30/Thang3_220474_frame_6.jpg',
 'img_resize/Thang3/2_220474_well02_zid99_30/Thang3_220474_frame_7.jpg',
 'img_resize/Thang3/2_220474_well02_zid99_30/Thang3_220474_frame_8.jpg',
 'img_resize/Thang3/2_220474_well02_zid99_30/Thang3_220474_frame_9.jpg',
 'img_resize/Thang3/2_220474_well02_zid99_30/Thang3_220474_frame_10.jpg',
 'img_resize/Thang3/2_220474_well02_zid99_30/Thang3_220474_frame_11.jpg',
 'img_resize/Thang3/2_220474_well02_zid99_30/Thang3_220474_frame_12.jpg',
 'img_resize/Thang3/2_220474_well02_zid99_30/Tha

In [43]:
image_sequence = [
    read_image(img_path, mode=ImageReadMode.RGB)
    for img_path in list_path_img
]
images = torch.stack(image_sequence, dim=0).float()

In [44]:
images.type

<function Tensor.type>

In [45]:
out = model(images.unsqueeze(0).to(device))

In [48]:
torch.max(out, 1)[1]

tensor([1], device='cuda:1')

## Dataset & DataLoader

In [6]:
def delete_augemetation():
    i = 0
    folder_path = glob('img_resize/*/**')
    for path in folder_path:
        if 'augmentation' in path:
            shutil.rmtree(path)
            print(f'Đã xóa thư mục: {path}')
            i+=1
    print(i)

delete_augemetation()

0


In [7]:
class SeqImageDataset(Dataset):
    def __init__(self, X, y, transforms=None):
        self.X = X
        self.y = y
        self.transforms = transforms
        self.image_paths = [
            sorted(glob(os.path.join(dir_path, '*.jpg')), key=self.custom_sort_key)
            for dir_path in self.X
        ]
        print(f'Loaded {len(self.image_paths)} sequences')
    
    @staticmethod
    def custom_sort_key(filename):
        match = re.search(r'frame_(\d+)', filename)
        if match:
            return int(match.group(1))
        return filename  

    def __len__(self):
        return len(self.image_paths)
    
    def __getitem__(self, idx):
        image_sequence = [
            read_image(img_path, mode=ImageReadMode.RGB)
            for img_path in self.image_paths[idx]
        ]
        images = torch.stack(image_sequence, dim=0)
        label = torch.tensor(self.y.iloc[idx])
        if self.transforms:
            images = self.transforms(images)
        return images, label

In [8]:
def custom_collate_fn(batch):
    batch.sort(key=lambda x: x[0].shape[0], reverse=True)
    sequences, labels = zip(*batch)
    max_len = max([s.shape[0] for s in sequences])
    padded_seqs = []
    for seq in sequences:
        seq_len = seq.shape[0]
        if seq_len < max_len:
            last_frame = seq[-1].unsqueeze(0)
            num_repeat = max_len - seq_len
            padding = last_frame.repeat(num_repeat, 1, 1, 1)
            padded = torch.cat([seq, padding], dim=0)
        else:
            padded = seq
        padded_seqs.append(padded)
    try:
        padded_seqs = torch.stack(padded_seqs, dim=0)
    except:
        print('error')
    labels = torch.stack(labels)
    return padded_seqs, labels

In [9]:
def get_stratified_test_set(X, y, n_samples_per_class=10):
    indices_class_0 = np.where(y == 0)[0]
    indices_class_1 = np.where(y == 1)[0]

    test_indices_class_0 = np.random.choice(indices_class_0, n_samples_per_class, replace=False)
    test_indices_class_1 = np.random.choice(indices_class_1, n_samples_per_class, replace=False)

    test_indices = np.concatenate([test_indices_class_0, test_indices_class_1])

    mask = np.zeros(len(y), dtype=bool)
    mask[test_indices] = True

    X_test, X_remainder = X[mask], X[~mask]
    y_test, y_remainder = y[mask], y[~mask]

    return X_remainder, X_test, y_remainder, y_test

In [11]:
class BalancedBatchSampler(Sampler):
    def __init__(self, labels, batch_size, class_0_weight=3):
        self.labels = labels
        self.batch_size = batch_size
        self.class_0_weight = class_0_weight
        self.idx_0 = np.where(self.labels == 0)[0]
        self.idx_1 = np.where(self.labels == 1)[0]
        self.num_0 = len(self.idx_0)
        self.num_1 = len(self.idx_1)
        self.start_0 = 0
        self.start_1 = 0
        
    def __len__(self):
        return (self.num_1 + self.batch_size//(self.class_0_weight + 1) - 1) // (self.batch_size // (self.class_0_weight + 1))
    
    def __iter__(self):
        np.random.shuffle(self.idx_0)
        np.random.shuffle(self.idx_1)
        
        max_batches = len(self)
    
        for i in range(max_batches):
            batch = []
            
            start_0 = self.start_0
            end_0 = start_0 + (self.batch_size * self.class_0_weight) // (self.class_0_weight + 1)
            if end_0 > self.num_0:
                batch.extend(self.idx_0[start_0: self.num_0])
                lack_0 = end_0 - self.num_0
                batch.extend(self.idx_0[:lack_0])
                self.start_0 = lack_0
            else:
                batch.extend(self.idx_0[start_0:end_0])
                self.start_0 = end_0 % self.num_0
            
            start_1 = self.start_1
            end_1 = start_1 + self.batch_size // (self.class_0_weight + 1)
            if end_1 > self.num_1:
                batch.extend(self.idx_1[start_1: self.num_1])
                lack_1 = end_1 - self.num_1
                batch.extend(self.idx_1[:lack_1])
                self.start_1 = lack_1
            else:
                batch.extend(self.idx_1[start_1:end_1])
                self.start_1 = end_1 % self.num_1
                
            np.random.shuffle(batch)
            yield batch

In [12]:
df = pd.read_csv('img_resize/img_resize.csv')
X, y = df['img_path'], df['label']
X_remainder, X_test, y_remainder, y_test = get_stratified_test_set(X, y, n_samples_per_class=7)
X_train, X_val, y_train, y_val = get_stratified_test_set(X_remainder, y_remainder, n_samples_per_class=10)
# class_weights = compute_class_weight(class_weight='balanced', classes=np.unique(y_train), y=y_train)
# class_weights = torch.tensor(class_weights, dtype=torch.float)

In [13]:
print(f'y_train:{y_train.value_counts()}\ny_val:{y_val.value_counts()}\ny_test:{y_test.value_counts()}')

y_train:label
1    68
0    12
Name: count, dtype: int64
y_val:label
1    10
0    10
Name: count, dtype: int64
y_test:label
1    7
0    7
Name: count, dtype: int64


In [24]:
def aug_func(img_path, select: int):
    img = cv2.imread(img_path)
    
    if select == 0:
        # Flip horizontally
        augmented = cv2.flip(img, 1)
    elif select == 1:
        # Rotate 90 degrees clockwise
        augmented = cv2.rotate(img, cv2.ROTATE_90_CLOCKWISE)
    elif select == 2:
        # Add Gaussian noise
        noise = np.random.normal(0, 25, img.shape).astype(np.uint8)
        augmented = cv2.add(img, noise)
    elif select == 3:
        # Adjust brightness
        brightness = 50
        augmented = cv2.add(img, (brightness,brightness,brightness,0))
    elif select == 4:
        # Apply Gaussian blur
        augmented = cv2.GaussianBlur(img, (5, 5), 0)
    elif select == 5:
        # Change color space (to grayscale)
        augmented = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    elif select == 6:
        # Adjust contrast
        contrast = 1.5
        augmented = cv2.convertScaleAbs(img, alpha=contrast, beta=0)
    elif select == 7:
        # Thay đổi độ bão hòa
        hsv = cv2.cvtColor(img, cv2.COLOR_BGR2HSV)
        hsv[:,:,1] = hsv[:,:,1] * 1.5  # Tăng độ bão hòa lên 50%
        augmented = cv2.cvtColor(hsv, cv2.COLOR_HSV2BGR)
    elif select == 8:
        # Áp dụng hiệu ứng cartoon
        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        gray = cv2.medianBlur(gray, 5)
        edges = cv2.adaptiveThreshold(gray, 255, cv2.ADAPTIVE_THRESH_MEAN_C, cv2.THRESH_BINARY, 9, 9)
        color = cv2.bilateralFilter(img, 9, 300, 300)
        augmented = cv2.bitwise_and(color, color, mask=edges)
    elif select == 9:
        # Thay đổi gamma
        gamma = 1.5
        invGamma = 1.0 / gamma
        table = np.array([((i / 255.0) ** invGamma) * 255 for i in np.arange(0, 256)]).astype("uint8")
        augmented = cv2.LUT(img, table)
    elif select == 10:
        # Áp dụng hiệu ứng chồng màu (color overlay)
        overlay_color = np.random.randint(0, 256, 3).tolist()
        overlay = np.full(img.shape, overlay_color, dtype=np.uint8)
        augmented = cv2.addWeighted(img, 0.8, overlay, 0.2, 0)
    else:
        augmented = img  # Trả về ảnh gốc nếu select nằm ngoài phạm vi
    
    return augmented

In [25]:
def custom_sort_key(filename):
    match = re.search(r'frame_(\d+)', filename)
    if match:
        return int(match.group(1))
    return filename  

In [12]:
def augmentation(X_train, y_train):
    list_idx = [i for i, j in enumerate(y_train) if j == 0]
    path_imbalanced = [X_train.iloc[i] for i in list_idx]
    path_imbalanced
    for path in path_imbalanced:
        img_paths = sorted(glob(os.path.join(path, '*.jpg')), key=custom_sort_key)
        all_selects = np.arange(11)
        np.random.shuffle(all_selects)
        used = all_selects[:5]
        
        for select in used:
            dir = f'{path}_augmentation_{select}'
            if not os.path.exists(dir):
                os.makedirs(dir)
            for img_path in img_paths:
                img_augmentated = aug_func(img_path, select)
                file_name = os.path.splitext(os.path.basename(img_path))[0] + '_augmentated'
                cv2.imwrite(f'{dir}/{file_name}.jpg', img_augmentated)
            X_train = pd.concat([X_train, pd.Series([dir])], ignore_index=True)
            y_train = pd.concat([y_train, pd.Series([0])], ignore_index=True)
    return X_train, y_train
X_train, y_train = augmentation(X_train, y_train)

In [14]:
print(f'y_train:{y_train.value_counts()}\ny_val:{y_val.value_counts()}\ny_test:{y_test.value_counts()}')

y_train:label
1    68
0    12
Name: count, dtype: int64
y_val:label
1    10
0    10
Name: count, dtype: int64
y_test:label
1    7
0    7
Name: count, dtype: int64


In [15]:
transform_pipeline = v2.Compose([
    v2.ToDtype(torch.float32, scale=True),
])
train_dataset = SeqImageDataset(X_train, y_train, transforms=transform_pipeline)
val_dataset = SeqImageDataset(X_val, y_val, transforms=transform_pipeline)
test_dataset = SeqImageDataset(X_test, y_test, transforms=transform_pipeline)


Loaded 80 sequences
Loaded 20 sequences
Loaded 14 sequences


In [16]:
batch_size = 8
train_sampler = BalancedBatchSampler(y_train, batch_size=4)
train_loader = DataLoader(train_dataset, batch_sampler=train_sampler, collate_fn=custom_collate_fn)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=True, collate_fn=custom_collate_fn)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, collate_fn=custom_collate_fn)

## Training & Evaluating

In [17]:
def train(model, train_loader, val_loader, epochs, optimizer, loss_fn, device, folder_name):
    model.to(device)
    train_losses = []
    val_losses = []
    
    start_time = time.time()
    
    for epoch in range(epochs):
        epoch_start_time = time.time()
        
        model.train()
        train_loss = 0.0
        for X, y in train_loader:
            X = X.to(device)
            y = y.to(device)
            
            y_hat = model(X)
            loss = loss_fn(y_hat, y)
            train_loss += loss.item()
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
        train_losses.append(train_loss / len(train_loader))
        
        model.eval()
        val_loss = 0.0
        with torch.no_grad():
            for X, y in val_loader:
                X = X.to(device)
                y = y.to(device)
                y_hat = model(X)
                loss = loss_fn(y_hat, y)
                val_loss += loss.item()
        val_losses.append(val_loss / len(val_loader))
        
        epoch_end_time = time.time()
        epoch_duration = epoch_end_time - epoch_start_time
        
        epochs_left = epochs - (epoch + 1)
        eta_seconds = epochs_left * epoch_duration
        eta = str(timedelta(seconds=int(eta_seconds)))
        
        print(f'Epoch: {epoch + 1:3d}/{epochs:<3d} | '
            f'Train Loss: {train_losses[-1]:.20f} | '
            f'Val Loss: {val_losses[-1]:.20f} | '
            f'Epoch Time: {epoch_duration:<7.2f}s | '
            f'ETA: {eta:<8}')
    
    total_time = time.time() - start_time
    print(f'Total training time: {str(timedelta(seconds=int(total_time)))}')
    
    plt.figure(figsize=(10, 6))
    plt.plot(range(1, epochs+1), train_losses, 'b-', label='Training Loss')
    plt.plot(range(1, epochs+1), val_losses, 'r-', label='Validation Loss')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.title('Training and Validation Loss')
    plt.legend()
    plt.tight_layout()
    
    plt.savefig(f'results/{folder_name}/loss_char.jpg')
    print("Loss chart saved")
    torch.save(model.state_dict(),f'weight/{folder_name}/last.pt')
    print("Model saved")

    return train_losses, val_losses

In [18]:
import json

class NumpyEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, np.integer):
            return int(obj)
        elif isinstance(obj, np.floating):
            return float(obj)
        elif isinstance(obj, np.ndarray):
            return obj.tolist()
        return super(NumpyEncoder, self).default(obj)

In [20]:
def evaluate(model, data_loader, device, class_names, folder_name):
    model.eval()
    all_preds = []
    all_targets = []
    misclassified_indices = []
    
    with torch.no_grad():
        for batch_idx, (X, y) in enumerate(data_loader):
            X = X.to(device)
            y = y.to(device)
            outputs = model(X)
            _, predicted = torch.max(outputs, 1)
            
            all_preds.extend(predicted.cpu().numpy())
            all_targets.extend(y.cpu().numpy())
            
            start_idx = batch_idx * data_loader.batch_size
            batch_misclassified = (predicted != y).nonzero(as_tuple=True)[0]
            misclassified_indices.extend(start_idx + batch_misclassified.cpu().numpy())
    
    
    all_preds = np.array(all_preds)
    all_targets = np.array(all_targets)
    
    
    acc = accuracy_score(all_targets, all_preds)
    cm = confusion_matrix(all_targets, all_preds)
    precision, recall, f1, _ = precision_recall_fscore_support(all_targets, all_preds, average='weighted')
    
    
    plt.figure(figsize=(10, 8))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=class_names, yticklabels=class_names)
    plt.title('Confusion Matrix')
    plt.xlabel('Predicted')
    plt.ylabel('Actual')
    plt.savefig(f'results/{folder_name}/cfm.jpg')
    plt.close()
    
    results = {
        'accuracy': acc,
        'confusion_matrix': cm.tolist(), 
        'precision': precision,
        'recall': recall,
        'f1_score': f1,
        'misclassified_indices': misclassified_indices
    }
    
    with open(f'results/{folder_name}/evaluation_metrics.json', 'w') as json_file:
        json.dump(results, json_file, indent=4, cls=NumpyEncoder)
    return results

### Training

In [21]:
train_losses, val_losses = train(model, train_loader, val_loader, epochs, optimizer, loss_fn, device, 'train2')

Epoch:   1/100 | Train Loss: 0.59805379282025727949 | Val Loss: 0.81325423717498779297 | Epoch Time: 619.37 s | ETA: 17:01:57
Epoch:   2/100 | Train Loss: 0.56325908092891463497 | Val Loss: 0.81325403849283850466 | Epoch Time: 642.66 s | ETA: 17:29:40
Epoch:   3/100 | Train Loss: 0.56326959501294526778 | Val Loss: 0.85491987069447838987 | Epoch Time: 627.42 s | ETA: 16:54:19
Epoch:   4/100 | Train Loss: 0.56327301439116983506 | Val Loss: 0.77159355084101355349 | Epoch Time: 635.56 s | ETA: 16:56:53
Epoch:   5/100 | Train Loss: 0.56326235129552726377 | Val Loss: 0.85492686430613196169 | Epoch Time: 621.24 s | ETA: 16:23:37
Epoch:   6/100 | Train Loss: 0.56325056272394513623 | Val Loss: 0.81325767437616980349 | Epoch Time: 675.56 s | ETA: 17:38:22
Epoch:   7/100 | Train Loss: 0.56325470699983481992 | Val Loss: 0.81325562795003258909 | Epoch Time: 620.50 s | ETA: 16:01:46
Epoch:   8/100 | Train Loss: 0.56323288907023039140 | Val Loss: 0.77158604065577185427 | Epoch Time: 626.91 s | ETA: 1

### Evaluating and Confusion Matrix

In [None]:
class_names = ['0','1']
results = evaluate(model, test_loader, device, class_names, 'train2')

print(f"Accuracy: {results['accuracy']:.4f}")
print(f"Precision: {results['precision']:.4f}")
print(f"Recall: {results['recall']:.4f}")
print(f"F1-score: {results['f1_score']:.4f}")
print(f"Number of misclassified samples: {len(results['misclassified_indices'])}")

Accuracy: 0.5000
Precision: 0.2500
Recall: 0.5000
F1-score: 0.3333
Number of misclassified samples: 7


  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))


## Saving Model

In [None]:
import torch
import torch.nn as nn
import numpy as np
import matplotlib.pyplot as plt
from torch.utils.data import DataLoader
from typing import List, Tuple


class YMufTTrainer:
    def __init__(self, model, train_dataset, val_dataset, test_dataset, batch_size, num_classes, device, lr, folder_name):
        self.model = model
        self.train_dataset = train_dataset
        self.val_dataset = val_dataset
        self.test_dataset = test_dataset
        self.batch_size = batch_size
        self.num_classes = num_classes
        self.device = device
        self.lr = lr
        self.folder_name = folder_name
        self.optimizer = torch.optim.AdamW(self.model.parameters(), lr=self.lr, weight_decay=1e-4)
        self.scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(self.optimizer, mode='min', factor=0.1, patience=10)
        self.loss_fn = nn.CrossEntropyLoss()
        os.makedirs(f'results/{self.folder_name}', exist_ok=True)
        os.makedirs(f'weight/{self.folder_name}', exist_ok=True)
    def stat_species(self, dataset):
        tke = [[] for _ in range(self.num_classes)]
        for idx, (_, label) in enumerate(dataset):
            tke[label].append(idx)
        return tke
    
    def arrange_data(self, list_IDs):
        return np.array([len(class_data) for class_data in list_IDs])

    
    def YMufT(self):
        A = deepcopy(self.list_IDs)
        B = deepcopy(self.lst_ratio)
        
        if not np.any(self.B_temp):
            print('End of data, resetting...')
            self.A_temp = deepcopy(self.list_IDs)
            self.B_temp = deepcopy(self.lst_ratio)
            gc.collect()
        
        MC = np.where(self.B_temp > 0, self.B_temp, np.inf).argmin()
        eps = 0.5 * self.B_temp[MC]
        bou1 = np.where(self.B_temp <= self.B_temp[MC] + eps)[0]
        bou2 = self.B_temp[bou1]
        MB = self.B_temp[bou1[np.argmax(bou2)]]
        
        F = []
        for i in range(self.num_classes):
            if self.B_temp[i] > 0:
                nt = int(min(self.B_temp[i], MB))
                np.random.shuffle(self.A_temp[i])
                F.extend(self.A_temp[i][:nt])
                del self.A_temp[i][:nt]
                self.B_temp[i] -= nt
            else:
                np.random.shuffle(A[i])
                nt = int(min(B[i], MB))
                F.extend(A[i][:nt])
        
        return F
    
    def train_epoch(self, train_loader):
        self.model.train()
        total_loss = 0.0
        for X, y in train_loader:
            X, y = X.to(self.device), y.to(self.device)
            self.optimizer.zero_grad()
            outputs = self.model(X)
            loss = self.loss_fn(outputs, y)
            loss.backward()
            self.optimizer.step()
            total_loss += loss.item()
        return total_loss / len(train_loader)
    
    def validate(self, val_loader):
        self.model.eval()
        total_loss = 0.0
        all_preds = []
        all_targets = []
        with torch.no_grad():
            for X, y in val_loader:
                X, y = X.to(self.device), y.to(self.device)
                outputs = self.model(X)
                loss = self.loss_fn(outputs, y)
                total_loss += loss.item()
                _, predicted = torch.max(outputs, 1)
                all_preds.extend(predicted.cpu().numpy())
                all_targets.extend(y.cpu().numpy())
        
        val_loss = total_loss / len(val_loader)
        val_acc = accuracy_score(all_targets, all_preds)
        return val_loss, val_acc
    
def train(self, num_loop_eps, total_fold, epochs):
        self.list_IDs = self.stat_species(self.train_dataset)
        self.lst_ratio = self.arrange_data(self.list_IDs)
        self.A_temp = deepcopy(self.list_IDs)
        self.B_temp = deepcopy(self.lst_ratio)
        
        val_loader = DataLoader(self.val_dataset, batch_size=self.batch_size, shuffle=False, collate_fn=custom_collate_fn)
        
        best_val_acc = 0.0
        best_val_loss = float('inf')
        
        start_time = time.time()
        train_losses = []
        val_losses = []
        
        for training_period in range(num_loop_eps, 0, -1):
            for fold in range(total_fold):
                print(f'Training period: {num_loop_eps - training_period + 1}, fold: {fold + 1}')
                
                F = self.YMufT()
                fold_dataset = torch.utils.data.Subset(self.train_dataset, F)
                fold_loader = DataLoader(fold_dataset, batch_size=self.batch_size, shuffle=True, collate_fn=custom_collate_fn)
                
                for epoch in range(epochs):
                    epoch_start_time = time.time()
                    train_loss = self.train_epoch(fold_loader)
                    val_loss, val_acc = self.validate(val_loader)
                    
                    train_losses.append(train_loss)
                    val_losses.append(val_loss)
                    
                    self.scheduler.step(val_loss)
                    
                    epoch_end_time = time.time()
                    epoch_duration = epoch_end_time - epoch_start_time
                    epochs_left = epochs * total_fold * num_loop_eps - (epoch + 1 + epochs * (fold + total_fold * (num_loop_eps - training_period)))
                    eta_seconds = epochs_left * epoch_duration
                    eta = str(timedelta(seconds=int(eta_seconds)))
                    print(f'Epoch: {epoch + 1:3d}/{epochs:<3d} | '
                          f'Train Loss: {train_loss:.5f} | '
                          f'Val Loss: {val_loss:.5f} | '
                          f'Val Accuracy: {val_acc:.5f} | '
                          f'LR: {self.optimizer.param_groups[0]["lr"]:.2e} | '
                          f'Epoch Time: {epoch_duration:<7.2f}s | '
                          f'ETA: {eta:<8}')
                    
                    if val_acc > best_val_acc or (val_acc == best_val_acc and val_loss < best_val_loss):
                        best_val_acc = val_acc
                        best_val_loss = val_loss
                        torch.save(self.model.state_dict(), f'weight/{self.folder_name}/best.pt')
                        print(f'Model improved: Val Acc: {best_val_acc:.5f}, Val Loss: {best_val_loss:.5f}')
                
                del fold_dataset, fold_loader
                gc.collect()
        
        total_time = time.time() - start_time
        print(f'Total training time: {str(timedelta(seconds=int(total_time)))}')
        
        # Save the loss chart
        plt.figure(figsize=(10, 6))
        plt.plot(range(1, len(train_losses)+1), train_losses, 'b-', label='Training Loss')
        plt.plot(range(1, len(val_losses)+1), val_losses, 'r-', label='Validation Loss')
        plt.xlabel('Steps')
        plt.ylabel('Loss')
        plt.title('Training and Validation Loss')
        plt.legend()
        plt.tight_layout()
        
        plt.savefig(f'results/{self.folder_name}/loss_chart.jpg')
        print("Loss chart saved")
        print("Training completed")

    
    def test(self):
        self.model.load_state_dict(torch.load('weight/best_cldnn_model_v2.pt'))
        test_loader = DataLoader(self.test_dataset, batch_size=self.batch_size, shuffle=False, collate_fn=custom_collate_fn)
        
        self.model.eval()
        all_preds = []
        all_targets = []
        
        with torch.no_grad():
            for X, y in test_loader:
                X, y = X.to(self.device), y.to(self.device)
                outputs = self.model(X)
                _, predicted = torch.max(outputs, 1)
                all_preds.extend(predicted.cpu().numpy())
                all_targets.extend(y.cpu().numpy())
        
        accuracy = accuracy_score(all_targets, all_preds)
        cm = confusion_matrix(all_targets, all_preds)
        precision, recall, f1, _ = precision_recall_fscore_support(all_targets, all_preds, average='weighted')
        
        print(f'Test Accuracy: {accuracy:.5f}')
        print(f'Precision: {precision:.5f}')
        print(f'Recall: {recall:.5f}')
        print(f'F1-score: {f1:.5f}')
        print('Confusion Matrix:')
        print(cm)
        
        plt.figure(figsize=(10, 8))
        plt.imshow(cm, interpolation='nearest', cmap=plt.cm.Blues)
        plt.title('Confusion Matrix')
        plt.colorbar()
        tick_marks = np.arange(self.num_classes)
        plt.xticks(tick_marks, range(self.num_classes))
        plt.yticks(tick_marks, range(self.num_classes))
        plt.xlabel('Predicted')
        plt.ylabel('True')
        plt.tight_layout()
        plt.savefig('confusion_matrix.png')
        plt.close()

# Sử dụng YMufTTrainer
device = torch.device("cuda:1" if torch.cuda.is_available() else "cpu")
model = CLDNN(
    in_channels=1,
    cnn_out_channels=300,
    lstm_input_size=300,
    lstm_hidden_size=150,
    lstm_num_blocks=10,
    lstm_num_cells_per_block=10,
    dnn_output_size=2,
    device=device
)
model.load_state_dict(torch.load('weight/best_cldnn_model.pt'))
trainer = YMufTTrainer(
    model=model,
    train_dataset=train_dataset,
    val_dataset=val_dataset,
    test_dataset=test_dataset,
    batch_size=8,
    num_classes=2,
    device=device,
    lr=0.01
)

trainer.train(num_loop_eps=5, total_fold=3, epochs=10)

In [None]:
# trainer.test()