In [1]:
import os
# Move the working directory up one level so that the relative paths used by
# later cells (config/params files, "scores.json") resolve against the parent
# directory -- presumably the project root; confirm against the repo layout.
# NOTE(review): the target is relative to the *current* directory, so running
# this cell again without restarting the kernel moves up one more level.
os.chdir('../')

In [10]:
from dotenv import load_dotenv

# Load variables from a .env file (if one is found) into os.environ.
load_dotenv()

# MLflow picks up its tracking URI and credentials from these variables.
# Copying os.getenv(name) back into os.environ[name] is a no-op when the
# variable exists and raises an opaque TypeError when it does not, so check
# for presence explicitly and fail with a readable message instead.
_MLFLOW_ENV_VARS = (
    "MLFLOW_TRACKING_URI",
    "MLFLOW_TRACKING_USERNAME",
    "MLFLOW_TRACKING_PASSWORD",
)
_missing_mlflow_vars = [name for name in _MLFLOW_ENV_VARS if os.getenv(name) is None]
if _missing_mlflow_vars:
    raise EnvironmentError(
        "Missing required environment variable(s): "
        + ", ".join(_missing_mlflow_vars)
        + ". Define them in the shell or in a .env file."
    )

In [3]:
from cvClassifier import logger
from cvClassifier.utils.common import get_size, read_yaml, create_directories, save_json 
from cvClassifier.constants import *

In [4]:
from dataclasses import dataclass
from pathlib import Path

@dataclass(frozen=True)
class ModelEvalConfig:
    """Immutable (frozen) bundle of settings for the model evaluation stage."""
    # Location of the serialized model that evaluation loads with torch.load.
    trained_model_path: Path
    # Training data location; carried in the config but not read by the
    # evaluation code visible in this notebook.
    training_data_path: Path
    # Validation data location; likewise not read by the evaluation code here.
    validation_data_path: Path
    # Root folder handed to torchvision's ImageFolder to build the test set.
    test_data_path: Path
    # Full contents of the params file; logged to MLflow as run parameters.
    all_params: dict
    # Image size; all entries except the last are passed to transforms.Resize
    # (presumably [height, width, channels] -- TODO confirm against params.yaml).
    params_image_size: list
    # Batch size used by the test DataLoader.
    params_batch_size: int
    # URI passed to mlflow.set_registry_uri.
    mlflow_uri: str


In [5]:
from urllib.parse import urlparse
import torch
import torch.nn as nn
import torchvision.models as models
from torchvision import transforms, datasets
from torch.utils.data import DataLoader
import pytorch_lightning as pl

import mlflow

In [6]:
class ConfigurationManager:
    """Reads the YAML config and params files and builds the evaluation config."""

    def __init__(self, config_filepath=CONFIG_FILE_PATH, params_filepath=PARAMS_FILE_PATH):
        # Both files are parsed once here and kept for later lookups.
        self.config = read_yaml(config_filepath)
        self.params = read_yaml(params_filepath)

    def get_model_eval_config(self) -> ModelEvalConfig:
        """Assemble the ``ModelEvalConfig`` for the model evaluation stage.

        Data and model locations come from the ``model_training`` section of the
        config file, the MLflow URI from its ``model_evaluation`` section, and
        the image size / batch size from the params file.
        """
        hyperparams = self.params
        training_section = self.config.model_training

        return ModelEvalConfig(
            training_data_path=training_section.training_data,
            validation_data_path=training_section.validation_data,
            test_data_path=training_section.test_data,
            trained_model_path=training_section.trained_model_path,
            all_params=hyperparams,
            params_image_size=hyperparams.IMAGE_SIZE,
            params_batch_size=hyperparams.BATCH_SIZE,
            mlflow_uri=self.config.model_evaluation.mlflow_tracking_uri,
        )

In [7]:
class LightningModel(pl.LightningModule):
    """Lightning wrapper around a classifier, using cross-entropy loss and SGD."""

    def __init__(self, model, learning_rate=0.01):
        super().__init__()
        self.model = model
        self.learning_rate = learning_rate
        self.criterion = nn.CrossEntropyLoss()
        # Per-batch test metrics, filled by test_step and reduced (then cleared)
        # in on_test_epoch_end.
        self.test_step_outputs = []

    def _loss_and_accuracy(self, batch):
        """Forward ``batch`` through the model and return ``(loss, accuracy)``."""
        features, targets = batch
        logits = self.model(features)
        batch_loss = self.criterion(logits, targets)
        # Fraction of samples whose highest-scoring class matches the label.
        batch_acc = (logits.argmax(dim=1) == targets).float().mean()
        return batch_loss, batch_acc

    def training_step(self, batch, batch_idx):
        """Log the training loss/accuracy for one batch and return the loss."""
        batch_loss, batch_acc = self._loss_and_accuracy(batch)
        self.log('train_loss', batch_loss)
        self.log('train_acc', batch_acc)
        return batch_loss

    def validation_step(self, batch, batch_idx):
        """Log the validation loss/accuracy for one batch."""
        batch_loss, batch_acc = self._loss_and_accuracy(batch)
        self.log('val_loss', batch_loss)
        self.log('val_acc', batch_acc)

    def test_step(self, batch, batch_idx):
        """Record and log the test loss/accuracy for one batch."""
        batch_loss, batch_acc = self._loss_and_accuracy(batch)

        # Keep the per-batch values so on_test_epoch_end can average them.
        self.test_step_outputs.append({'test_loss': batch_loss, 'test_acc': batch_acc})

        for metric_name, metric_value in (('test_loss', batch_loss), ('test_acc', batch_acc)):
            self.log(metric_name, metric_value, on_step=True, on_epoch=True)

        return {'test_loss': batch_loss, 'test_acc': batch_acc}

    def on_test_epoch_end(self):
        """Log the plain mean of the collected per-batch test metrics."""
        collected = self.test_step_outputs
        if not collected:
            return

        # Unweighted mean over batches (each batch counts equally).
        mean_loss = torch.stack([entry['test_loss'] for entry in collected]).mean()
        mean_acc = torch.stack([entry['test_acc'] for entry in collected]).mean()

        self.log('avg_test_loss', mean_loss)
        self.log('avg_test_acc', mean_acc)

        # Reset so the next test run starts from an empty list.
        collected.clear()

    def configure_optimizers(self):
        """Return an SGD optimizer over all parameters at ``learning_rate``."""
        return torch.optim.SGD(self.parameters(), lr=self.learning_rate)

            

In [8]:
class ModelEvaluation:
    """Evaluates the trained model on the test set, saves scores, logs to MLflow."""

    def __init__(self, config: ModelEvalConfig):
        self.config = config
        # Populated by evaluation(); None until it has run successfully.
        self.model = None
        self.scores = None
        # Populated by test_generator().
        self.test_loader = None

    def load_model(self, path: Path) -> nn.Module:
        """Load and return the model object stored at ``path``.

        NOTE(security): ``torch.load`` unpickles the file, which can execute
        arbitrary code -- only load checkpoints from a trusted source.
        NOTE(review): newer PyTorch releases default ``torch.load`` to
        ``weights_only=True``, which rejects fully pickled modules; confirm
        against the installed version before upgrading.
        """
        model = torch.load(path)
        # Log before returning (a statement placed after ``return`` never runs).
        logger.info(f"Model loaded from {path}")
        return model

    def test_generator(self):
        """Build the test ``DataLoader`` and store it on ``self.test_loader``."""

        # preparing the test dataset
        test_transforms = transforms.Compose([
            # Resize to the configured size, dropping the last entry of
            # params_image_size (presumably the channel count -- TODO confirm).
            transforms.Resize(self.config.params_image_size[:-1]),
            transforms.ToTensor(),  # Converts to tensor and scales to [0,1]
        ])

        # load test dataset (one sub-folder per class)
        test_dataset = datasets.ImageFolder(
            root=self.config.test_data_path,
            transform=test_transforms
        )
        logger.info(f"Test dataset created from {self.config.test_data_path}")

        self.test_loader = DataLoader(
            test_dataset,
            batch_size=self.config.params_batch_size,
            shuffle=False,
            num_workers=0
        )

        logger.info(f"Test samples: {len(test_dataset)}")
        logger.info(f"Number of classes: {len(test_dataset.classes)}")
        logger.info(f"Classes: {test_dataset.classes}")

    def evaluation(self):
        """Perform model evaluation using PyTorch Lightning.

        Loads the trained model, runs ``Trainer.test`` on the test loader,
        stores ``{"loss", "accuracy"}`` in ``self.scores`` (read from the
        ``test_loss_epoch`` / ``test_acc_epoch`` keys, defaulting to 0.0 when a
        key is absent) and writes them to ``scores.json``.

        Raises:
            RuntimeError: if the trainer returns no test results.
        """

        logger.info('Starting model evaluation...')

        self.model = self.load_model(self.config.trained_model_path)
        self.model.eval()

        self.test_generator()

        lightning_model = LightningModel(self.model)

        trainer = pl.Trainer(
            accelerator='auto',
            devices='auto',
            logger=False,  # Disable logging for evaluation
            enable_progress_bar=True,
            enable_model_summary=False,
            enable_checkpointing=False,
        )

        test_results = trainer.test(
            model=lightning_model,
            dataloaders=self.test_loader,
            verbose=True
        )

        if not test_results:
            # Without results there are no scores to save; fail here with a
            # clear message instead of an AttributeError inside save_score().
            logger.error('No results returned from evaluation.')
            raise RuntimeError('No results returned from evaluation.')

        epoch_metrics = test_results[0]
        self.scores = {
            "loss": epoch_metrics.get('test_loss_epoch', 0.0),
            "accuracy": epoch_metrics.get('test_acc_epoch', 0.0)
        }
        logger.info("Evaluation completed!")
        logger.info(f"Loss: {self.scores['loss']:.4f}")
        logger.info(f"Accuracy: {self.scores['accuracy']:.4f}")

        self.save_score()

    def log_into_mlflow(self):
        """Log params, metrics and the model to MLflow in a new run.

        The registry URI is taken from the config; the tracking URI is whatever
        MLflow is currently configured with (``mlflow.get_tracking_uri()``).

        Raises:
            RuntimeError: if ``evaluation()`` has not produced a model and scores.
        """
        if self.model is None or self.scores is None:
            raise RuntimeError("Nothing to log: run evaluation() before log_into_mlflow().")

        mlflow.set_registry_uri(self.config.mlflow_uri)
        tracking_url_type_store = urlparse(mlflow.get_tracking_uri()).scheme

        with mlflow.start_run():
            mlflow.log_params(self.config.all_params)
            mlflow.log_metrics(
                {"loss": self.scores['loss'], "accuracy": self.scores['accuracy']}
            )

            # Model registry does not work with file store
            if tracking_url_type_store != "file":

                # Register the model
                # There are other ways to use the Model Registry, which depends on the use case,
                # please refer to the doc for more information:
                # https://mlflow.org/docs/latest/model-registry.html#api-workflow
                mlflow.pytorch.log_model(self.model, "model", registered_model_name="VGG16Model")
            else:
                mlflow.pytorch.log_model(self.model, "model")

    def save_score(self):
        """Save evaluation scores to JSON file (``scores.json`` in the working directory).

        Raises:
            RuntimeError: if no scores have been computed yet.
        """
        if self.scores is None:
            raise RuntimeError("No scores to save: run evaluation() first.")

        save_json(path=Path("scores.json"), data=self.scores)
        logger.info("Scores saved to scores.json")



In [9]:
# Run the evaluation stage end to end: build the config, evaluate the trained
# model on the test set (which also writes scores.json), then log to MLflow.
# No try/except wrapper: a handler that only re-raises adds nothing, so any
# exception simply propagates with its original traceback.
config = ConfigurationManager()
eval_config = config.get_model_eval_config()
evaluation = ModelEvaluation(eval_config)
evaluation.evaluation()
evaluation.log_into_mlflow()

[2025-07-08 00:43:28,675: INFO: common]: yaml file successfully loaded from config/config.yaml
[2025-07-08 00:43:28,676: INFO: common]: yaml file successfully loaded from params.yaml
[2025-07-08 00:43:28,676: INFO: 1285874601]: Starting model evaluation...
[2025-07-08 00:43:28,687: INFO: 1285874601]: Test dataset created from artifacts/data_ingestion/Data/test
[2025-07-08 00:43:28,688: INFO: 1285874601]: Test samples: 315
[2025-07-08 00:43:28,688: INFO: 1285874601]: Number of classes: 4
[2025-07-08 00:43:28,688: INFO: 1285874601]: Classes: ['adenocarcinoma', 'large.cell.carcinoma', 'normal', 'squamous.cell.carcinoma']
[2025-07-08 00:43:28,707: INFO: setup]: GPU available: True (mps), used: True
[2025-07-08 00:43:28,708: INFO: setup]: TPU available: False, using: 0 TPU cores
[2025-07-08 00:43:28,708: INFO: setup]: HPU available: False, using: 0 HPUs


  return torch.load(path)
/opt/anaconda3/envs/cv-cancer/lib/python3.8/site-packages/pytorch_lightning/trainer/connectors/data_connector.py:424: The 'test_dataloader' does not have many workers which may be a bottleneck. Consider increasing the value of the `num_workers` argument` to `num_workers=7` in the `DataLoader` to improve performance.


Testing DataLoader 0: 100%|██████████| 20/20 [00:03<00:00,  5.69it/s]


[2025-07-08 00:43:32,449: INFO: 1285874601]: Evaluation completed!
[2025-07-08 00:43:32,449: INFO: 1285874601]: Loss: 1.4657
[2025-07-08 00:43:32,449: INFO: 1285874601]: Accuracy: 0.2159
[2025-07-08 00:43:32,450: INFO: common]: json file saved at scores.json
[2025-07-08 00:43:32,450: INFO: 1285874601]: Scores saved to scores.json


Registered model 'VGG16Model' already exists. Creating a new version of this model...
2025/07/08 00:43:51 INFO mlflow.tracking._model_registry.client: Waiting up to 300 seconds for model version to finish creation.                     Model name: VGG16Model, version 6
Created version '6' of model 'VGG16Model'.
