In [1]:
import os

In [2]:
# Step one directory up from the notebook's location; the recorded output of
# %pwd below shows the resulting working directory (presumably the project
# root, so relative config/artifact paths resolve -- confirm for your checkout).
# NOTE(review): this cell is not idempotent -- every re-run without a kernel
# restart moves the working directory up one more level.
os.chdir("../")
%pwd

'f:\\ProjectAI\\FaceEmotionRecognitionSystem'

In [3]:
from dataclasses import dataclass
from pathlib import Path

@dataclass
class TrainingConfig:
    """Plain container for everything the training stage reads.

    Instances are assembled by ``ConfigurationManager.get_training_config``
    from the YAML configuration and parameter files. Field order is part of
    the positional constructor signature generated by ``@dataclass``.
    """
    # Filled from the configuration's ``artifacts_root`` entry.
    root_dir: Path
    # Destination file for the best model's state dict.
    trained_model_path: Path
    # State dict loaded into the model when ``model_name`` is "resnet34".
    updated_base_model_path: Path
    # Zip archive that is extracted before training starts.
    training_data_path: Path
    # Extraction target; the loaders read ``FER2013/{train,val,test}`` below it.
    dataset_path: Path
    # Architecture selector; the Training class handles "resnet34" and "emonext".
    model_name: str
    # Optimizer selector; the Training class handles "adam", "sgd" and "adamw".
    optimizer_name: str
    # Upper bound on the number of training epochs.
    params_epochs: int
    # Batch size of the training DataLoader.
    params_batch_size: int
    # Read from the AUGMENTATION parameter.
    # NOTE(review): the Training class in this notebook never consults it.
    params_is_augmentation: bool
    # Learning rate handed to the optimizer.
    params_learning_rate: float
    # Number of output classes passed to the model factory.
    params_num_classes: int
    # Only element 0 is used by the Training class, as the crop size.
    params_image_size: list

In [4]:
from FacialExpressionRecognition.constants import *
from FacialExpressionRecognition.utils.common import read_yaml, create_directories

In [5]:
class ConfigurationManager:
    """Reads the project's YAML files and turns them into stage configs."""

    def __init__(
        self,
        config_filepath=CONFIG_FILE_PATH,
        params_filepath=PARAMS_FILE_PATH
    ):
        """Load both YAML files and create the artifacts root directory.

        Args:
            config_filepath: Location of the configuration YAML.
            params_filepath: Location of the hyperparameter YAML.
        """
        self.config = read_yaml(config_filepath)
        self.params = read_yaml(params_filepath)
        create_directories([self.config.artifacts_root])

    def get_training_config(self) -> TrainingConfig:
        """Assemble the ``TrainingConfig`` for the training stage.

        Creates the training stage's ``root_dir`` as a side effect.

        Returns:
            TrainingConfig: Paths taken from the configuration file and
            hyperparameters taken from the parameter file.
        """
        cfg = self.config
        stage = cfg.training

        # Resolve every path before touching the filesystem so that a missing
        # configuration key surfaces before any directory is created.
        path_fields = {
            # NOTE(review): this is the artifacts root, while the directory
            # created below is the training stage's own root_dir -- confirm
            # which one TrainingConfig.root_dir is meant to hold.
            "root_dir": Path(cfg.artifacts_root),
            "trained_model_path": Path(stage.trained_model_path),
            "updated_base_model_path": Path(
                cfg.prepare_base_model.updated_base_model_path
            ),
            "training_data_path": Path(cfg.data_ingestion.unzip_dir) / "data.zip",
            "dataset_path": Path(stage.dataset_path),
        }

        create_directories([Path(stage.root_dir)])

        hyper = self.params
        return TrainingConfig(
            **path_fields,
            params_epochs=hyper.EPOCHS,
            params_batch_size=hyper.BATCH_SIZE,
            params_is_augmentation=hyper.AUGMENTATION,
            params_learning_rate=hyper.LEARNING_RATE,
            params_num_classes=hyper.NUM_CLASSES,
            params_image_size=hyper.IMAGE_SIZE,
            model_name=hyper.MODEL_NAME,
            optimizer_name=hyper.OPTIMIZER,
        )

In [6]:
import os
import sys
import torch
import torchvision.models as models
import torchvision
from tqdm import tqdm
import numpy as np
from zipfile import ZipFile
from FacialExpressionRecognition import logger
from FacialExpressionRecognition.entity.config_entity import TrainingConfig
from FacialExpressionRecognition.models.resnet34 import get_resnet34_model
from FacialExpressionRecognition.models.emonext import get_model 
import numpy as np
from torch.cuda.amp import GradScaler

In [7]:
class Training:
    def __init__(self, config: TrainingConfig):
        self.config = config
        self.early_stopping_patience = 12
        self.best_val_accuracy = 0
        self.scaler = GradScaler(enabled=False)
    
    def get_model(self):
        """Build ``self.model`` for the architecture named in the configuration.

        * ``"resnet34"``: creates the network, loads the state dict stored at
          ``config.updated_base_model_path``, freezes every parameter and then
          re-enables gradients for the ``fc`` head only.
        * ``"emonext"``: creates the network with all parameters left as the
          factory returns them.

        In both cases the model is left in eval mode.

        Raises:
            ValueError: If ``config.model_name`` is neither ``"resnet34"`` nor
                ``"emonext"``.
        """
        if self.config.model_name == "resnet34":
            self.model = get_resnet34_model(num_classes=self.config.params_num_classes)
            # Load onto the CPU first so a checkpoint written on a GPU machine
            # still loads where CUDA is unavailable; ``train`` moves the model
            # to the selected device afterwards.
            state_dict = torch.load(
                self.config.updated_base_model_path, map_location="cpu"
            )
            self.model.load_state_dict(state_dict)
            self.model.eval()
            # Freeze the whole network, then unfreeze the classification head.
            for param in self.model.parameters():
                param.requires_grad = False
            for param in self.model.fc.parameters():
                param.requires_grad = True
        elif self.config.model_name == "emonext":
            self.model = get_model(num_classes=self.config.params_num_classes)
            self.model.eval()
        else:
            raise ValueError(f"Model {self.config.model_name} not supported.")
        

    def extract_dataset(self):
        with ZipFile(self.config.training_data_path, 'r') as zip_ref:
            os.makedirs(self.config.dataset_path, exist_ok=True)
            zip_ref.extractall(self.config.dataset_path)

    def get_optimizer(self):
        if self.config.optimizer_name == "adam":
            optimizer = torch.optim.Adam(self.model.parameters(), lr=self.config.params_learning_rate)
        elif self.config.optimizer_name == "sgd":
            optimizer = torch.optim.SGD(self.model.parameters(), lr=self.config.params_learning_rate, momentum=0.9)
        elif self.config.optimizer_name == "adamw":
            optimizer = torch.optim.AdamW(self.model.parameters(), lr=self.config.params_learning_rate)
        else:
            raise ValueError(f"Optimizer {self.config.optimizer_name} not supported.")
        return optimizer

    def train(self):
        """Run the full training loop with checkpointing and early stopping.

        Steps: build the image transforms, extract the dataset archive, create
        the train/val/test loaders from
        ``<dataset_path>/FER2013/{train,val,test}``, build the model and the
        optimizer, then iterate for up to ``config.params_epochs`` epochs.

        The model's state dict is written to ``config.trained_model_path``
        whenever the validation accuracy reaches a new best. Training stops
        after ``self.early_stopping_patience`` consecutive epochs without a new
        best. The test set is evaluated at the end of every epoch that does not
        trigger the stop.
        """
        transforms = torchvision.transforms
        crop_size = self.config.params_image_size[0]

        # Images are converted to single-channel grayscale and the channel is
        # repeated three times at the end of each pipeline.
        train_transforms = transforms.Compose([
            transforms.RandomHorizontalFlip(),
            transforms.RandomVerticalFlip(),
            transforms.Grayscale(),
            transforms.Resize(140),
            transforms.RandomRotation(degrees=20),
            transforms.RandomCrop(crop_size),
            transforms.ToTensor(),
            transforms.Lambda(lambda x: x.repeat(3, 1, 1)),
        ])

        # Validation uses a centre crop: a random crop would make the metric
        # that drives checkpointing and early stopping vary between runs on
        # identical weights.
        val_transforms = transforms.Compose([
            transforms.Grayscale(),
            transforms.Resize(140),
            transforms.CenterCrop(crop_size),
            transforms.ToTensor(),
            transforms.Lambda(lambda x: x.repeat(3, 1, 1)),
        ])

        # Test-time augmentation: TenCrop yields ten crops per image, stacked
        # into one tensor with a leading crop dimension.
        test_transform = transforms.Compose([
            transforms.Grayscale(),
            transforms.Resize(140),
            transforms.TenCrop(crop_size),
            transforms.Lambda(
                lambda crops: torch.stack(
                    [transforms.ToTensor()(crop) for crop in crops]
                )
            ),
            transforms.Lambda(
                lambda crops: torch.stack([crop.repeat(3, 1, 1) for crop in crops])
            ),
        ])

        self.extract_dataset()

        data_root = self.config.dataset_path / "FER2013"

        train_data = torchvision.datasets.ImageFolder(root=data_root / "train", transform=train_transforms)
        self.train_loader = torch.utils.data.DataLoader(train_data, batch_size=self.config.params_batch_size, shuffle=True)

        # NOTE(review): batch_size=1 makes validation slow; kept because the
        # reported validation loss is a mean over batches.
        val_data = torchvision.datasets.ImageFolder(root=data_root / "val", transform=val_transforms)
        self.val_loader = torch.utils.data.DataLoader(val_data, batch_size=1, shuffle=False)

        test_data = torchvision.datasets.ImageFolder(root=data_root / "test", transform=test_transform)
        self.test_loader = torch.utils.data.DataLoader(test_data, batch_size=32, shuffle=False)

        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.get_model()
        self.model = self.model.to(self.device)

        # Kept as a public attribute; the epoch loops compute their own
        # label-smoothed cross-entropy instead of calling it.
        self.criterion = torch.nn.CrossEntropyLoss()
        self.optimizer = self.get_optimizer()

        num_epochs = self.config.params_epochs

        epochs_without_improvement = 0
        for epoch in range(num_epochs):
            train_loss, train_accuracy = self.train_epoch()
            val_loss, val_accuracy = self.val_epoch()
            logger.info(f"Epoch [{epoch+1}/{num_epochs}], Train Loss: {train_loss:.4f}, Train Accuracy: {train_accuracy:.2f}%, Val Loss: {val_loss:.4f}, Val Accuracy: {val_accuracy:.2f}%")
            if val_accuracy > self.best_val_accuracy:
                self.save_model(self.model, self.config.trained_model_path)
                epochs_without_improvement = 0
                self.best_val_accuracy = val_accuracy
            else:
                epochs_without_improvement += 1
                if epochs_without_improvement >= self.early_stopping_patience:
                    # The stopping criterion is validation accuracy, so the
                    # message names accuracy rather than loss.
                    print(
                        "Validation accuracy did not improve for %d epochs. Stopping training."
                        % self.early_stopping_patience
                    )
                    break

            self.test_model()


    def train_epoch(self):
        """Run one optimisation pass over ``self.train_loader``.

        Puts the model in training mode and, for every batch, computes a
        label-smoothed cross-entropy loss under autocast, back-propagates
        through ``self.scaler`` and steps ``self.optimizer``.

        Returns:
            tuple[float, float]: ``(mean_loss, mean_accuracy_percent)``. Both
            are means over batches, so every batch counts equally regardless
            of its size. Both are NaN when the loader yields no batches.
        """
        self.model.train()

        # Running totals replace re-averaging a growing list on every batch.
        num_batches = 0
        loss_total = 0.0
        accuracy_total = 0.0

        # The context manager closes the progress bar even when the loop is
        # interrupted or raises.
        with tqdm(unit="batch", file=sys.stdout, total=len(self.train_loader)) as pbar:
            for inputs, labels in self.train_loader:
                inputs = inputs.to(self.device)
                labels = labels.to(self.device)

                with torch.autocast(self.device.type):
                    outputs = self.model(inputs)
                    loss = torch.nn.functional.cross_entropy(outputs, labels, label_smoothing=0.2)

                # Scale the loss before the backward pass.
                self.scaler.scale(loss).backward()
                self.scaler.step(self.optimizer)
                self.optimizer.zero_grad(set_to_none=True)
                self.scaler.update()

                predictions = torch.argmax(outputs, dim=1)
                batch_accuracy = (predictions == labels).sum().item() / labels.size(0)

                num_batches += 1
                loss_total += loss.item()
                accuracy_total += batch_accuracy
                pbar.set_postfix({
                    "loss": loss_total / num_batches,
                    "accuracy": accuracy_total / num_batches * 100.0,
                })
                pbar.update(1)

        if num_batches == 0:
            # Same result the mean of an empty sequence would give.
            return float("nan"), float("nan")
        return loss_total / num_batches, accuracy_total / num_batches * 100.0
    
    def val_epoch(self):
        """Evaluate the model on ``self.val_loader``.

        Puts the model in eval mode and runs every batch under autocast with
        gradient tracking disabled.

        Returns:
            tuple: ``(mean_loss, accuracy_percent)``. The loss is the mean of
            the per-batch label-smoothed cross-entropy values; the accuracy is
            computed over all samples seen.
        """
        self.model.eval()

        batch_losses = []
        predicted_labels = []
        true_labels = []

        # no_grad: evaluation needs no autograd graph, which saves memory and
        # time. The context manager closes the progress bar even on interrupt.
        with torch.no_grad(), tqdm(
            unit="batch", file=sys.stdout, total=len(self.val_loader)
        ) as pbar:
            for inputs, labels in self.val_loader:
                inputs = inputs.to(self.device)
                labels = labels.to(self.device)

                with torch.autocast(self.device.type):
                    outputs = self.model(inputs)
                    loss = torch.nn.functional.cross_entropy(outputs, labels, label_smoothing=0.2)

                predictions = torch.argmax(outputs, dim=1)
                batch_losses.append(loss.item())
                predicted_labels.extend(predictions.tolist())
                true_labels.extend(labels.tolist())

                pbar.update(1)

        accuracy = (
            torch.eq(torch.tensor(predicted_labels), torch.tensor(true_labels))
            .float()
            .mean()
            .item()
        )
        return np.mean(batch_losses), accuracy * 100.0
    
    def test_model(self):
        """Evaluate the model on ``self.test_loader`` and print the accuracy.

        Each test sample arrives as a stack of crops. The crops are flattened
        into the batch dimension for the forward pass, the logits are averaged
        per sample across its crops, and the arg-max of that average is the
        prediction.
        """
        # Set eval mode here instead of relying on the caller having done so.
        self.model.eval()

        predicted_labels = []
        true_labels = []

        # no_grad: evaluation needs no autograd graph. The context manager
        # closes the progress bar even when the loop is interrupted.
        with torch.no_grad(), tqdm(
            unit="batch", file=sys.stdout, total=len(self.test_loader)
        ) as pbar:
            for inputs, labels in self.test_loader:
                bs, ncrops, c, h, w = inputs.shape
                # Fold the crop dimension into the batch dimension.
                inputs = inputs.view(-1, c, h, w)

                inputs = inputs.to(self.device)
                labels = labels.to(self.device)

                with torch.autocast(self.device.type):
                    logits = self.model(inputs)
                # Restore (batch, crops, classes) and average over the crops.
                outputs_avg = logits.view(bs, ncrops, -1).mean(1)
                predictions = torch.argmax(outputs_avg, dim=1)

                predicted_labels.extend(predictions.tolist())
                true_labels.extend(labels.tolist())

                pbar.update(1)

        accuracy = (
            torch.eq(torch.tensor(predicted_labels), torch.tensor(true_labels))
            .float()
            .mean()
            .item()
        )
        print("Test Accuracy: %.4f %%" % (accuracy * 100.0))


    @staticmethod
    def save_model(model, path):
        """Write ``model``'s state dict to ``path`` and log the destination.

        Args:
            model: Object exposing ``state_dict()``.
            path: Target passed straight to ``torch.save``.
        """
        weights = model.state_dict()
        torch.save(weights, path)
        logger.info(f"Model saved at {path}")

In [8]:
# Build the training configuration and run the whole training stage.
try:
    config = ConfigurationManager()
    training_config = config.get_training_config()
    training = Training(config=training_config)
    # train() builds the model itself, so a separate get_model() call here
    # would only construct the network and load its weights a second time.
    training.train()
    logger.info("Training completed successfully.")
except Exception as e:
    logger.exception(e)
    # A bare raise re-raises the active exception with its traceback unchanged.
    raise

[2025-10-14 14:25:52,480: INFO: common]: yaml file: config\config.yaml loaded successfully
[2025-10-14 14:25:52,486: INFO: common]: yaml file: params.yaml loaded successfully
[2025-10-14 14:25:52,487: INFO: common]: Directory created at: artifacts
[2025-10-14 14:25:52,488: INFO: common]: Directory created at: artifacts\training


  self.scaler = GradScaler(enabled=False)


100%|██████████| 300/300 [04:10<00:00,  1.20batch/s, loss=1.8, accuracy=30.6] 
100%|██████████| 3589/3589 [01:38<00:00, 36.30batch/s]
[2025-10-14 14:31:51,825: INFO: 3924460229]: Epoch [1/100], Train Loss: 1.8038, Train Accuracy: 30.63%, Val Loss: 1.7855, Val Accuracy: 34.94%
[2025-10-14 14:31:52,042: INFO: 3924460229]: Model saved at artifacts\training\model.pth
100%|██████████| 113/113 [00:40<00:00,  2.77batch/s]
Test Accuracy: 32.3767 %
100%|██████████| 300/300 [00:38<00:00,  7.77batch/s, loss=1.73, accuracy=37.6]
100%|██████████| 3589/3589 [01:12<00:00, 49.60batch/s]
[2025-10-14 14:34:23,866: INFO: 3924460229]: Epoch [2/100], Train Loss: 1.7291, Train Accuracy: 37.55%, Val Loss: 1.7118, Val Accuracy: 38.87%
[2025-10-14 14:34:24,007: INFO: 3924460229]: Model saved at artifacts\training\model.pth
100%|██████████| 113/113 [00:10<00:00, 10.75batch/s]
Test Accuracy: 36.7790 %
100%|██████████| 300/300 [00:40<00:00,  7.44batch/s, loss=1.69, accuracy=41.1]
100%|██████████| 3589/3589 [01:08

KeyboardInterrupt: 

 64%|██████▍   | 2308/3589 [01:00<00:23, 54.45batch/s]