In [1]:
# Cell 1: Image Loader Utility
import os
from PIL import Image
import numpy as np
from pathlib import Path
from typing import List, Tuple, Optional
import time

class ImageLoader:
    """Handle loading and saving images for processing pipeline"""

    def __init__(self, dataset_path: str):
        self.dataset_path = Path(dataset_path)
        if not self.dataset_path.exists():
            raise ValueError(f"Dataset path does not exist: {dataset_path}")

    def get_image_paths(self, limit: Optional[int] = None) -> List[Path]:
        """Get list of all JPG images in dataset"""
        image_paths = sorted(self.dataset_path.glob("*.jpg"))

        if limit:
            image_paths = image_paths[:limit]

        print(f"Found {len(image_paths)} images")
        return image_paths

    def load_image(self, image_path: Path) -> np.ndarray:
        """Load single image as NumPy array"""
        img = Image.open(image_path)
        # Convert to RGB if needed (some images might be grayscale)
        if img.mode != 'RGB':
            img = img.convert('RGB')
        return np.array(img, dtype=np.float32)

    def save_image(self, image_array: np.ndarray, output_path: Path):
        """Save NumPy array as image"""
        # Ensure output directory exists
        output_path.parent.mkdir(parents=True, exist_ok=True)

        # Clip value to valid range and convert to uint8
        image_array = np.clip(image_array, 0, 255).astype(np.uint8)

        img = Image.fromarray(image_array)
        img.save(output_path, quality=95)

    def load_batch(self, image_paths: List[Path], max_size: Optional[Tuple[int, int]] = None) -> List[np.ndarray]:
        """Load multiple images, optionally resizing"""
        images = []
        for path in image_paths:
            img = self.load_image(path)

            # Optional resize for testing with smaller images
            if max_size:
                img_pil = Image.fromarray(img.astype(np.uint8))
                img_pil.thumbnail(max_size, Image.Resampling.LANCZOS)
                img = np.array(img_pil, dtype=np.float32)

            image.append(img)

# Test for loader
print("Testing ImageLoader....")
loader = ImageLoader("/nfs/shared/projects/image/datasets/coco/train2017")

# Get first 5 images
test_paths = loader.get_image_paths(limit=5)
print(f"\nFirst image path: {test_paths[0]}")

test_img = loader.load_image(test_paths[0])
print(f"Image shape: {test_img.shape}")
print(f"Image dtype: {test_img.dtype}")
print(f"Value range [{test_img.min():.1f}, {test_img.max():.1f}]")
        

Testing ImageLoader....
Found 5 images

First image path: /nfs/shared/projects/image/datasets/coco/train2017/000000000009.jpg
Image shape: (480, 640, 3)
Image dtype: float32
Value range [0.0, 255.0]


In [3]:
# Cell 2 Timing utility
import time
from contextlib import contextmanager
from typing import Dict
import json

class PerformanceTimer:
    """Track performance metrics for image processing"""

    def __init__(self):
        self.metrics: Dict[str, list] = {}

    @contextmanager
    def measure(self, operation_name: str):
        """Context manager for timing operations"""
        start = time.perf_counter()
        try:
            yield
        finally:
            elapsed = time.perf_counter() - start
            if operation_name not in self.metrics:
                self.metrics[operation_name] = []
            self.metrics[operation_name].append(elapsed)

    def get_stats(self, operation_name: str) -> Dict[str, float]:
        """Get statistics for an operation"""
        if operation_name not in self.metrics:
                return {}
            
        times = self.metrics[operation_name]
        return {
            'count': len(times),
            'total': sum(times),
            'mean': sum(times) / len(times),
            'min': min(times),
            'max': max(times),
            'images_per_second': len(times) / sum(times) if sum(times) > 0 else 0       
        }

    def print_summary(self):
        """Print sumary of all measurements"""
        print("\n=== Performance Summary ===")
        for op_name in sorted(self.metrics.keys()):
            stats = self.get_stats(op_name)
            print(f"\n{op_name}")
            print(f"  Images processed: {stats['count']}")
            print(f"  Total time: {stats['total']:.2f}s")
            print(f"  Mean time: {stats['mean']*1000:.2f}ms per image")
            print(f"  Throughput: {stats['images_per_second']:.2f} images/sec")
    
    def save_to_file(self, filepath: Path):
        """Save metrics to JSON file"""
        summary = {op: self.get_stats(op) for op in self.metrics.keys()}
        with open(filepath, 'w') as f:
            json.dump(summary, indent=2, fp=f)
        print(f"Metrics saved to {filepath}")

# Test for timer
print("Testing PerformanceTimer...")
timer = PerformanceTimer()

# Simulate some processing
for i in range(5):
    with timer.measure("test_operation"):
        time.sleep(0.1) # Simulate work

timer.print_summary()            

Testing PerformanceTimer...

=== Performance Summary ===

test_operation
  Images processed: 5
  Total time: 0.50s
  Mean time: 100.16ms per image
  Throughput: 9.98 images/sec
