In [1]:
!pip install torch torchvision



In [2]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
import torch
import torch.nn as nn
import time
from torch.utils.data import Dataset, DataLoader
from torchvision.models.detection import fasterrcnn_resnet50_fpn
from torchvision import transforms
from PIL import Image, ImageDraw
import os
import logging
from abc import ABC, abstractmethod

In [4]:
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[
    logging.StreamHandler()
])

In [5]:
def count_time(start_time: float, end_time: float) -> float:
    """
    Calculate opearation execution time in milliseconds.

    Args:
        start_time (float): Start time in seconds.
        end_time (float): End time in seconds.

    Returns:
        float: Elapsed time in milliseconds.
    """
    return (end_time - start_time) * 1000

In [6]:
class ImageProcessor(Dataset):
    """
    A custom dataset class for loading images and preprocessing them to the appropriate format.
    """

    def __init__(self, image_folder):
        self.image_folder = image_folder
        self.image_files = [
            f for f in os.listdir(image_folder)
            if f.lower().endswith(('.jpg', '.png', '.jpeg'))
        ]

    def __len__(self):
        return len(self.image_files)

    def __getitem__(self, idx):
        """
        Modified version of the __getitem__ function that support profiling.

        Args:
            idx (int): Index of the image.

        Returns:
            tuple: (image_tensor, image_name, load_time, preprocess_time, total_time)
                image_tensor (torch.Tensor): The preprocessed image tensor.
                image_name (str): The name of the image file.
                load_time (float): Time taken to load the image (milliseconds).
                preprocess_time (float): Time taken to preprocess the image (milliseconds).
                total_time (float): Total time for loading and preprocessing (milliseconds).
        """

        load_start_time = time.time()

        image_name = self.image_files[idx]
        image_path = os.path.join(self.image_folder, image_name)


        image = Image.open(image_path).convert('RGB')

        load_end_time = time.time()
        load_time = count_time(load_start_time, load_end_time)

        preprocess_start_time = time.time()

        preprocess_transform = transforms.Compose([
            transforms.Resize((224, 224)),
            transforms.ToTensor(),
        ])
        image_tensor = preprocess_transform(image)

        preprocess_end_time = time.time()
        preprocess_time = count_time(preprocess_start_time, preprocess_end_time)

        total_time = count_time(load_start_time, preprocess_end_time)

        return image_tensor, image_name, load_time, preprocess_time, total_time

In [7]:
class BaseModelProfiler(ABC):
    """
    Abstract base class for model profilers.
    """
    def __init__(self, model):
        self.model = model

    @abstractmethod
    def profile(self, inputs):
        """
        Abstract method for model profiling injection

        Args:
            inputs: The inputs to the model.

        Returns:
            outputs: The outputs from the model.
            layer_times: Dictionary mapping layer names to execution times (milliseconds).
        """
        pass

In [8]:
class ModelProfiler(BaseModelProfiler):
    """
    Generic Model Profiler, can be used for any sequence to sequence model.
    """
    def profile(self, inputs):
        """
        Profile the model by iterating over its layers.

        Args:
            inputs: The inputs to the model.

        Returns:
            outputs: The outputs from the model.
            layer_times: Dictionary mapping layer names to execution times (milliseconds).
        """
        layer_times = {}
        x = inputs
        for name, layer in self.model.named_children():
            start_time = time.time()
            x = layer(x)
            end_time = time.time()
            layer_times[name] = count_time(start_time, end_time)
        return x, layer_times

For the model was selected **FasterRCNN**, because of the next reasons:


1.   Not a fully sequential model.
2.   Built-in pytorch support, no additional modules needed.
3.   More complex than YOLO.

Because creation of completely generic profiler that can be used for any model, without hooks is not available, I decided to go with approach of Abstract classes. Most of the code can be reused by simple inheritance from ModelProfiler or From BaseModelProfiler.



In [9]:
class FasterRCNNProfiler(BaseModelProfiler):
    """
    A model profiler class specially designed for FasterRCNN based on BaseModelProfiler.
    """
    def profile(self, images):
        """
        Perform a forward pass through the model, profiling each major layer

        Args:
            images (list[Tensor]): List of images to be processed.

        Returns:
            tuple: (detections, component_times)
                detections (list[dict]): Detection results from the model.
                component_times (dict): Dictionary mapping component names to execution times (milliseconds).
        """
        component_times = {}

        start_time = time.time()
        images, targets = self.model.transform(images)
        end_time = time.time()
        component_times['transform'] = count_time(start_time, end_time)

        start_time = time.time()
        features = self.model.backbone(images.tensors)
        end_time = time.time()
        component_times['backbone'] = count_time(start_time, end_time)

        start_time = time.time()
        proposals, proposal_losses = self.model.rpn(images, features, targets)
        end_time = time.time()
        component_times['rpn'] = count_time(start_time, end_time)

        start_time = time.time()
        detections, detector_losses = self.model.roi_heads(features, proposals, images.image_sizes, targets)
        end_time = time.time()
        component_times['roi_heads'] = count_time(start_time, end_time)

        start_time = time.time()
        detections = self.model.transform.postprocess(detections, images.image_sizes, images.image_sizes)
        end_time = time.time()
        component_times['postprocess'] = count_time(start_time, end_time)

        return detections, component_times

In [10]:
def postprocessing(image_name, image_tensor, detections, save_folder):
    """
    Postprocess function that draw bounding boxes and save them to a folder on Google Drive.

    Args:
        image_name (str): Name of the image file.
        image_tensor (torch.Tensor): The preprocessed image tensor.
        detections (dict): Model detections containing 'boxes' and 'scores'.
        save_folder (str): Folder to save the processed images.

    Returns:
        tuple: (post_time, processed_image)
            post_time (float): Time taken for postprocessing (milliseconds).
            processed_image (PIL.Image.Image): The image with bounding boxes drawn.
    """
    post_start_time = time.time()

    image = transforms.ToPILImage()(image_tensor.cpu())

    draw = ImageDraw.Draw(image)
    boxes = detections['boxes']
    scores = detections['scores']

    for i in range(len(boxes)):
        if scores[i] > 0.5:
            box = boxes[i]
            draw.rectangle(box.tolist(), outline='red', width=2)

    os.makedirs(save_folder, exist_ok=True)
    save_path = os.path.join(save_folder, image_name)
    image.save(save_path)

    post_end_time = time.time()
    post_time = count_time(post_start_time, post_end_time)

    return post_time, image

In [11]:
def run_pipeline(image_folder, save_folder, log_file_path, batch_size=1, device='cpu'):
    """
    Run the entire pipeline that includes: loading, preprocessing, inference, postprocessing, and logging.

    Args:
        image_folder (str): Path to the folder containing input images.
        save_folder (str): Path to the folder to save processed images.
        log_file_path (str): Path to the log file to save profiling logs.
        batch_size (int, optional): Batch size for processing. Defaults to 1.
        device (str, optional): Device to run the pipeline on ('cpu' or 'cuda'). Defaults to 'cpu'.
    """
    dataset = ImageProcessor(image_folder)
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=False)

    model = fasterrcnn_resnet50_fpn(pretrained=True)
    model.eval()
    model.to(device)

    profiled_model = FasterRCNNProfiler(model)

    logger = logging.getLogger('PipelineLogger')
    logger.setLevel(logging.INFO)

    fh = logging.FileHandler(log_file_path)
    fh.setLevel(logging.INFO)

    formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    fh.setFormatter(formatter)

    if not logger.handlers:
        logger.addHandler(fh)

    for data in dataloader:
        images, image_names, load_times, preprocess_times, total_times = data

        images = images.to(device)

        images_list = [images[i] for i in range(images.shape[0])]

        with torch.no_grad():
            infer_start_time = time.time()
            detections_list, component_times = profiled_model.profile(images_list)
            infer_end_time = time.time()
            inference_time = count_time(infer_start_time, infer_end_time)

        for idx in range(len(images)):
            image_tensor = images[idx]
            image_name = image_names[idx]
            detections = detections_list[idx]

            post_time, processed_image = postprocessing(image_name, image_tensor, detections, save_folder)

            logger.info(f"Device: {device.upper()}")
            logger.info(f"Image: {image_name}")
            logger.info(f"Loading time: {load_times[idx].item():.2f} ms")
            logger.info(f"Preprocessing time: {preprocess_times[idx].item():.2f} ms")
            logger.info(f"Inference time: {inference_time:.2f} ms")
            logger.info(f"Postprocessing time: {post_time:.2f} ms")
            logger.info("Component times:")
            for component_name, c_time in component_times.items():
                logger.info(f"  {component_name}: {c_time:.2f} ms")
            logger.info("-" * 30)

    logger.removeHandler(fh)
    fh.close()

In [12]:
if __name__ == "__main__":
    image_folder = '/content/drive/My Drive/neureality_test_task'
    save_folder_gpu = '/content/drive/My Drive/output_images_gpu'
    save_folder_cpu = '/content/drive/My Drive/output_images_cpu'

    log_file_gpu = '/content/drive/My Drive/performance_log_gpu.txt'
    log_file_cpu = '/content/drive/My Drive/performance_log_cpu.txt'

    cuda_available = torch.cuda.is_available()

    if cuda_available:
        logging.info("Running on GPU...")
        run_pipeline(image_folder, save_folder_gpu, log_file_gpu, batch_size=1, device='cuda')
    else:
        logging.info("CUDA is not available. Skipping GPU run.")

    logging.info("Running on CPU...")
    run_pipeline(image_folder, save_folder_cpu, log_file_cpu, batch_size=1, device='cpu')

Downloading: "https://download.pytorch.org/models/fasterrcnn_resnet50_fpn_coco-258fb6c6.pth" to /root/.cache/torch/hub/checkpoints/fasterrcnn_resnet50_fpn_coco-258fb6c6.pth
100%|██████████| 160M/160M [00:02<00:00, 63.0MB/s]
INFO:PipelineLogger:Device: CUDA
INFO:PipelineLogger:Image: gettyimages-76208034-612x612.jpg
INFO:PipelineLogger:Loading time: 581.72 ms
INFO:PipelineLogger:Preprocessing time: 4.36 ms
INFO:PipelineLogger:Inference time: 1911.06 ms
INFO:PipelineLogger:Postprocessing time: 309.64 ms
INFO:PipelineLogger:Component times:
INFO:PipelineLogger:  transform: 98.42 ms
INFO:PipelineLogger:  backbone: 722.84 ms
INFO:PipelineLogger:  rpn: 905.90 ms
INFO:PipelineLogger:  roi_heads: 183.54 ms
INFO:PipelineLogger:  postprocess: 0.31 ms
INFO:PipelineLogger:------------------------------
INFO:PipelineLogger:Device: CUDA
INFO:PipelineLogger:Image: gettyimages-1392016982-612x612.jpg
INFO:PipelineLogger:Loading time: 417.81 ms
INFO:PipelineLogger:Preprocessing time: 2.63 ms
INFO:Pipeli