In [29]:

import os
import numpy as np
import cv2
from PIL import Image
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
from tqdm import tqdm
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from skimage.metrics import peak_signal_noise_ratio as psnr, structural_similarity as ssim
import time
import pickle
from pathlib import Path
import warnings
import zstandard as zstd
import io
import torch
import torch.nn as nn
import torch.nn.functional as F
import pickle
from torchviz import make_dot
from torchinfo import summary
import random
import shutil
import csv
import requests
import scipy.stats

warnings.filterwarnings('ignore')

# Set random seeds for reproducibility
np.random.seed(42)
torch.manual_seed(42)

<torch._C.Generator at 0x174441810>

In [32]:
def download_images(csv_file):
    with open(csv_file, 'r') as file:
        reader = csv.reader(file)
        reader.__next__()  # Skip header
        for row in reader:
            tiff_link = row[2]
            tiff_name = row[0] + '.tiff'
            tiff_path = os.path.join('images', tiff_name)
            if not os.path.exists(tiff_path):
                response = requests.get(tiff_link)
                with open(tiff_path, 'wb') as img_file:
                    img_file.write(response.content)
                print(f'Downloaded: {tiff_name}')

os.makedirs('images', exist_ok=True)
download_images('RAISE_8.csv')

Downloaded: r75abf0a6t.tiff
Downloaded: r1301676dt.tiff
Downloaded: r0cea5432t.tiff
Downloaded: raecffc3et.tiff
Downloaded: re8712078t.tiff
Downloaded: r11e134c7t.tiff
Downloaded: r45f0bdf7t.tiff
Downloaded: r647d6726t.tiff
Downloaded: r28360ddbt.tiff
Downloaded: r1a9de6e9t.tiff
Downloaded: r5a671b67t.tiff
Downloaded: rdb736502t.tiff
Downloaded: r56d61c92t.tiff
Downloaded: r66327327t.tiff
Downloaded: r64b46b1et.tiff
Downloaded: rca0a29eft.tiff
Downloaded: r4091b09ct.tiff
Downloaded: raaebdc82t.tiff
Downloaded: r8500c118t.tiff
Downloaded: rd169bc43t.tiff
Downloaded: r069e346et.tiff
Downloaded: r3bacc1d2t.tiff
Downloaded: r39436d87t.tiff
Downloaded: r95b08e31t.tiff
Downloaded: r885079b8t.tiff
Downloaded: rad5b5878t.tiff
Downloaded: r4359075bt.tiff
Downloaded: r65b20842t.tiff
Downloaded: r8abcf61dt.tiff
Downloaded: r0c5e9567t.tiff
Downloaded: r8578eafdt.tiff
Downloaded: r179ec4e8t.tiff
Downloaded: rc5c4bbd3t.tiff
Downloaded: r209f6d4at.tiff
Downloaded: r9c8bd24et.tiff
Downloaded: rbfb29a7

In [20]:
def get_descriptive_statistics(directory, output_dir_base):
    """
    Analyzes an image dataset using a streaming approach to conserve memory.

    Args:
        directory (str): Path to the image directory.
        output_dir_base (str): Base directory for output.
    """
    if not os.path.exists(directory):
        print(f"Error: Directory '{directory}' not found.")
        return

    output_dir = os.path.join(output_dir_base, os.path.basename(directory))
    os.makedirs(output_dir, exist_ok=True)
    
    image_files = [f for f in Path(directory).glob('*') if f.suffix.lower() in ['.jpg', '.jpeg', '.png', '.bmp', '.tif', '.tiff']]
    if not image_files:
        print(f"No images found in '{directory}'. Skipping.")
        return

    # Streaming variables for calculations
    num_images = 0
    file_sizes = []
    
    widths = []
    heights = []
    aspect_ratios = []

    # New metrics for each image
    avg_stds = []
    laplacian_variances = []
    image_entropies = []
    unique_colors_list = []
    
    # Pre-allocated histogram bins
    intensity_bins = np.zeros(256, dtype=int)

    print(f"Starting comprehensive analysis of images in '{directory}'...")
    for img_path in image_files:
        try:
            num_images += 1
            file_size_kb = os.path.getsize(img_path) / 1024.0
            file_sizes.append(file_size_kb)

            with Image.open(img_path) as img:
                img_width, img_height = img.size
                widths.append(img_width)
                heights.append(img_height)
                aspect_ratios.append(img_width / img_height)
                
                # Convert to RGB to handle grayscale images consistently for unique colors
                rgb_img = img.convert('RGB')
                unique_colors_list.append(len(rgb_img.getcolors(maxcolors=rgb_img.size[0]*rgb_img.size[1])))

                # Convert to grayscale for consistent intensity, texture, and sharpness analysis
                gray_img_array = np.array(img.convert('L'))
                
                # Pixel Intensity Stats
                pixels = gray_img_array.flatten()
                
                # Add to histogram bins
                counts = np.bincount(pixels, minlength=256)
                intensity_bins += counts
                
                # Texture & Complexity
                avg_stds.append(np.std(pixels))
                image_entropies.append(scipy.stats.entropy(counts / counts.sum()))
                
                # Sharpness (Laplacian Variance)
                laplacian = cv2.Laplacian(gray_img_array, cv2.CV_64F)
                laplacian_variances.append(laplacian.var())

        except Exception as e:
            print(f"Warning: Could not process '{img_path}'. Reason: {e}")
            continue

    if num_images == 0:
        print(f"No valid images processed in '{directory}'.")
        return
    
    # Final Calculations and Summary
    print("\n--- Summary Statistics ---")
    print(f"Total images processed: {num_images}")
    print(f"Average Dimensions: {np.mean(widths):.0f} x {np.mean(heights):.0f} pixels")
    print(f"Average File Size: {np.mean(file_sizes):.2f} KB")
    print(f"Average Aspect Ratio: {np.mean(aspect_ratios):.2f}")
    print(f"Average Unique Colors: {np.mean(unique_colors_list):.0f}")
    print(f"Average Texture (Std Dev): {np.mean(avg_stds):.2f}")
    print(f"Average Sharpness (Laplacian Variance): {np.mean(laplacian_variances):.2f}")
    
    # Plotting
    plt.style.use('seaborn-v0_8-whitegrid')

    # Histogram for Aspect Ratio
    plt.figure(figsize=(10, 6))
    plt.hist(aspect_ratios, bins=50, edgecolor='black', alpha=0.7, color='skyblue')
    plt.title(f'Aspect Ratio Distribution for {os.path.basename(directory)}')
    plt.xlabel('Aspect Ratio (Width / Height)')
    plt.ylabel('Frequency')
    plt.savefig(os.path.join(output_dir, 'aspect_ratio_distribution.png'))
    plt.close()

    # Histogram for Pixel Intensity Standard Deviation (Texture)
    plt.figure(figsize=(10, 6))
    plt.hist(avg_stds, bins=50, edgecolor='black', alpha=0.7, color='purple')
    plt.title(f'Pixel Intensity Standard Deviation (Texture) Distribution for {os.path.basename(directory)}')
    plt.xlabel('Pixel Intensity Std Dev')
    plt.ylabel('Frequency')
    plt.savefig(os.path.join(output_dir, 'pixel_std_dev_distribution.png'))
    plt.close()

    # Histogram for Laplacian Variance (Sharpness)
    plt.figure(figsize=(10, 6))
    plt.hist(laplacian_variances, bins=50, edgecolor='black', alpha=0.7, color='coral')
    plt.title(f'Laplacian Variance (Sharpness) Distribution for {os.path.basename(directory)}')
    plt.xlabel('Laplacian Variance')
    plt.ylabel('Frequency')
    plt.savefig(os.path.join(output_dir, 'laplacian_variance_distribution.png'))
    plt.close()
    
    # Scatter Plot: File Size vs. Sharpness
    plt.figure(figsize=(10, 6))
    plt.scatter(laplacian_variances, file_sizes, alpha=0.6, color='seagreen')
    plt.title(f'File Size vs. Sharpness for {os.path.basename(directory)}')
    plt.xlabel('Laplacian Variance (Sharpness)')
    plt.ylabel('File Size (KB)')
    plt.savefig(os.path.join(output_dir, 'file_size_vs_sharpness.png'))
    plt.close()

    plt.figure(figsize=(10, 6))
    plt.bar(range(256), intensity_bins, color='lightgreen', width=1.0, edgecolor='black', linewidth=0.5)
    plt.title(f'Pixel Intensity Distribution for {os.path.basename(directory)}')
    plt.xlabel('Pixel Intensity (0-255)')
    plt.ylabel('Total Pixel Count')
    plt.savefig(os.path.join(output_dir, 'pixel_intensity_distribution.png'))
    plt.close()

    
    # Scatter Plot: File Size vs. Texture
    plt.figure(figsize=(10, 6))
    plt.scatter(avg_stds, file_sizes, alpha=0.6, color='darkorange')
    plt.title(f'File Size vs. Texture for {os.path.basename(directory)}')
    plt.xlabel('Pixel Intensity Std Dev (Texture)')
    plt.ylabel('File Size (KB)')
    plt.savefig(os.path.join(output_dir, 'file_size_vs_texture.png'))
    plt.close()

    print(f"\nAll plots and statistics for '{directory}' saved to '{output_dir}/'")


images_dir = 'images'
output_dir_base = 'descriptive_stats'

print("Starting comprehensive streaming dataset analysis...")

get_descriptive_statistics(images_dir, output_dir_base)

print("\nAnalysis complete.")


Starting comprehensive streaming dataset analysis...
Starting comprehensive analysis of images in 'images'...

--- Summary Statistics ---
Total images processed: 271
Average Dimensions: 4312 x 3479 pixels
Average File Size: 23249.44 KB
Average Aspect Ratio: 1.30
Average Unique Colors: 903601
Average Texture (Std Dev): 55.90
Average Sharpness (Laplacian Variance): 377.93

All plots and statistics for 'images' saved to 'descriptive_stats/images/'

Analysis complete.


In [21]:
def train_test_split(path_to_images_folder, test_size=0.2):
    # Convert path to a Path object for easier handling
    path_to_images = Path(path_to_images_folder)

    # Check if the source directory exists
    if not path_to_images.is_dir():
        print(f"Error: The specified folder does not exist at {path_to_images}")
        return

    # Define the output directories
    train_dir = Path('TRAIN')
    test_dir = Path('TEST')

    # Create the output directories if they don't exist
    for directory in [train_dir, test_dir]:
        directory.mkdir(exist_ok=True)

    # Get a list of all image files in the source folder
    # We'll use a simple approach, assuming image files have common extensions
    all_files = [f for f in path_to_images.iterdir() if f.is_file() and f.suffix.lower() in ['.tiff']]

    # Shuffle the list of files to ensure randomness
    random.shuffle(all_files)

    # Calculate the number of files for the test set
    test_count = int(len(all_files) * test_size)
    
    # Split the files into test and train sets
    test_files = all_files[:test_count]
    train_files = all_files[test_count:]

    print(f"Total files found: {len(all_files)}")
    print(f"Copying {len(train_files)} files to {train_dir}")

    for file in train_files:
        shutil.copy(str(file), str(train_dir / file.name))

    print(f"Copying {len(test_files)} files to {test_dir}")

    for file in test_files:
        shutil.copy(str(file), str(test_dir / file.name))
        
    print("Train-test split complete!")

train_test_split('images', test_size=0.2)

Total files found: 271
Copying 217 files to TRAIN
Copying 54 files to TEST
Train-test split complete!


In [22]:
def load_image(path, resize=None):
    """Load an image and convert to RGB."""
    try:
        rgb = cv2.imread(str(path))
        if rgb is None:
            return None
        rgb = cv2.cvtColor(rgb, cv2.COLOR_BGR2RGB)
        if resize is not None:
            rgb = cv2.resize(rgb, resize, interpolation=cv2.INTER_AREA)
        return rgb
    except Exception:
        return None


def calculate_metrics(original, reconstructed):
    """
    Calculate compression metrics, handling cases where images are identical.
    """
    original_gray = cv2.cvtColor(original, cv2.COLOR_RGB2GRAY)
    reconstructed_gray = cv2.cvtColor(reconstructed, cv2.COLOR_RGB2GRAY)
    
    # Check if images are identical; if so, SSIM is 1.0 to avoid 'inf'
    if np.all(original == reconstructed):
        psnr_val = psnr(original, reconstructed, data_range=255)
        ssim_val = 1.0  
    else:
        psnr_val = psnr(original, reconstructed, data_range=255)
        ssim_val = ssim(original_gray, reconstructed_gray, data_range=255)
    
    return {'psnr': psnr_val, 'ssim': ssim_val}

def evaluate_raw_jpeg_compression(
    images_dir='TEST',
    output_dir='raw_jpeg_results',
    target_size=(256, 256),
    num_examples=10,
    jpeg_quality=90
):
    """
    Evaluates raw JPEG compression performance on a dataset of images.

    Args:
        images_dir (str): Directory containing the images to evaluate.
        output_dir (str): Directory to save the results and plots.
        target_size (tuple): The size to resize images to (width, height).
        num_examples (int): The number of example images to save.
        jpeg_quality (int): The JPEG quality level (0-100).
    """
    os.makedirs(output_dir, exist_ok=True)
    os.makedirs(os.path.join(output_dir, 'examples'), exist_ok=True)
    
    image_files = [f for f in Path(images_dir).glob('*') if f.suffix.lower() in ['.jpg', '.jpeg', '.png', '.bmp', '.tiff', '.tif']]
    if not image_files:
        print(f"No images found in {images_dir}")
        return

    results = []
    
    print(f"Evaluating raw JPEG compression with quality={jpeg_quality}...")
    
    for img_path in tqdm(image_files, desc="Processing images"):
        original_img = load_image(img_path, resize=target_size)
        if original_img is None:
            continue

        # Step 1: Compress with JPEG to a memory buffer
        img_pil = Image.fromarray(original_img)
        jpeg_buffer = io.BytesIO()
        img_pil.save(jpeg_buffer, format='JPEG', quality=jpeg_quality)
        jpeg_buffer.seek(0)
        
        # Get compressed size
        jpeg_size = len(jpeg_buffer.getvalue())

        # Step 2: Decompress from the buffer to get the reconstructed image
        reconstructed_img = np.array(Image.open(jpeg_buffer))

        # Step 3: Calculate metrics
        metrics = calculate_metrics(original_img, reconstructed_img)
        
        original_bytes = target_size[0] * target_size[1] * 3
        compression_ratio = original_bytes / jpeg_size
        bits_per_pixel = (jpeg_size * 8) / (target_size[0] * target_size[1])
        
        results.append({
            'image_name': img_path.name,
            'psnr': metrics['psnr'],
            'ssim': metrics['ssim'],
            'compression_ratio': compression_ratio,
            'bits_per_pixel': bits_per_pixel,
            'original_size': original_bytes,
            'compressed_size': jpeg_size
        })

    # --- Analysis and Visualization ---
    df = pd.DataFrame(results)
    df.to_csv(os.path.join(output_dir, 'raw_jpeg_results.csv'), index=False)
    
    # Plotting
    fig, axes = plt.subplots(1, 2, figsize=(15, 6))
    sns.histplot(df.replace([np.inf, -np.inf], np.nan)['psnr'], bins=20, ax=axes[0])
    axes[0].set_title('Raw JPEG PSNR Distribution')
    sns.histplot(df['compression_ratio'], bins=20, ax=axes[1])
    axes[1].set_title('Raw JPEG Compression Ratio Distribution')
    plt.tight_layout()
    plt.savefig(os.path.join(output_dir, 'raw_jpeg_metrics.png'))
    plt.close()
    
    # Save a few example images
    for i in range(min(num_examples, len(image_files))):
        img_path = image_files[i]
        original_img = load_image(img_path, resize=target_size)
        
        img_pil = Image.fromarray(original_img)
        jpeg_buffer = io.BytesIO()
        img_pil.save(jpeg_buffer, format='JPEG', quality=jpeg_quality)
        jpeg_buffer.seek(0)
        reconstructed_img = np.array(Image.open(jpeg_buffer))
        
        metrics = calculate_metrics(original_img, reconstructed_img)
        
        fig, axes = plt.subplots(1, 2, figsize=(12, 6))
        axes[0].imshow(original_img)
        axes[0].set_title('Original')
        axes[0].axis('off')
        
        axes[1].imshow(reconstructed_img)
        axes[1].set_title(f'JPEG Reconstruction\nPSNR: {metrics["psnr"]:.2f} dB, SSIM: {metrics["ssim"]:.4f}')
        axes[1].axis('off')
        
        plt.tight_layout()
        plt.savefig(os.path.join(output_dir, 'examples', f'jpeg_example_{i:02d}.png'), dpi=150, bbox_inches='tight')
        plt.close()

    print(f"\nEvaluation complete. Results saved to '{output_dir}/'")

evaluate_raw_jpeg_compression()

Evaluating raw JPEG compression with quality=90...


Processing images: 100%|██████████| 54/54 [00:13<00:00,  3.88it/s]



Evaluation complete. Results saved to 'raw_jpeg_results/'


In [23]:

class ImageDataset(Dataset):
    """Dataset for training the autoencoder on images."""
    def __init__(self, images):
        self.images = images
    
    def __len__(self):
        return len(self.images)
    
    def __getitem__(self, idx):
        image = self.images[idx]
        image_tensor = torch.from_numpy(image).float() / 255.0
        image_tensor = image_tensor.permute(2, 0, 1)
        return image_tensor

class SimpleAutoencoder(nn.Module):
    def __init__(self, input_channels=3, input_size=256, latent_dim=128):
        super(SimpleAutoencoder, self).__init__()
        self.latent_dim = latent_dim
        
        # Encoder
        self.encoder = nn.Sequential(
            nn.Conv2d(input_channels, 32, 4, stride=2, padding=1), # 128x128
            nn.ReLU(True),
            nn.Conv2d(32, 64, 4, stride=2, padding=1),  # 64x64
            nn.ReLU(True),
            nn.Conv2d(64, 128, 4, stride=2, padding=1),  # 32x32
            nn.ReLU(True),
            nn.Conv2d(128, 256, 4, stride=2, padding=1), # 16x16
            nn.ReLU(True),
        )

        # Bottleneck to compress the latent space
        self.flatten = nn.Flatten()
        self.encoder_output_size = 16 * 16 * 256
        self.bottleneck = nn.Linear(self.encoder_output_size, self.latent_dim)
        
        # Decoder
        self.decoder_linear = nn.Linear(self.latent_dim, self.encoder_output_size)
        self.unflatten = nn.Unflatten(1, (256, 16, 16))
        
        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(256, 128, 4, stride=2, padding=1), # 32x32
            nn.ReLU(True),
            nn.ConvTranspose2d(128, 64, 4, stride=2, padding=1),  # 64x64
            nn.ReLU(True),
            nn.ConvTranspose2d(64, 32, 4, stride=2, padding=1),   # 128x128
            nn.ReLU(True),
            nn.ConvTranspose2d(32, input_channels, 4, stride=2, padding=1), # 256x256
            nn.Sigmoid()
        )
        
    def encode(self, x):
        x = self.encoder(x)
        x = self.flatten(x)
        return self.bottleneck(x)
    
    def decode(self, x):
        x = self.decoder_linear(x)
        x = self.unflatten(x)
        return self.decoder(x)
        
    def forward(self, x):
        latent = self.encode(x)
        return self.decode(latent)

class SimpleAutoencoderExperiment:
    """Main class for conducting the simple autoencoder compression experiment."""
    
    def __init__(self, train_dir='TRAIN', test_dir='TEST', output_dir='autoencoder_results', 
                 max_train_images=200, max_test_images=50, target_size=(256, 256)):
        self.train_dir = train_dir
        self.test_dir = test_dir
        self.output_dir = output_dir
        self.max_train_images = max_train_images
        self.max_test_images = max_test_images
        self.target_size = target_size
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'mps' if torch.backends.mps.is_available() else 'cpu')
        
        print(f"Using device: {self.device}")
        
        # Create output directories
        os.makedirs(output_dir, exist_ok=True)
        os.makedirs(f"{output_dir}/examples", exist_ok=True)
        os.makedirs(f"{output_dir}/models", exist_ok=True)
        
        # Get list of training and test images
        self.train_files = self._get_image_files(self.train_dir, self.max_train_images)
        self.test_files = self._get_image_files(self.test_dir, self.max_test_images)
        print(f"Found {len(self.train_files)} training images and {len(self.test_files)} test images.")
        
        # Results storage
        self.results = []
        
    def _get_image_files(self, directory, max_files):
        """Get list of image files from specified directory."""
        image_files = []
        for ext in ['.tiff', '.tif', '.jpg', '.jpeg', '.png', '.bmp']:
            image_files.extend(Path(directory).glob(f'*{ext}'))
        return sorted(image_files)[:max_files]
    
    def load_image(self, path, resize=None):
        """Load an image and convert to RGB."""
        try:
            if not os.path.exists(path):
                print(f"Error: File not found at {path}")
                return None
            rgb = cv2.imread(str(path))
            if rgb is None:
                print(f"Error: Could not read image at {path}.")
                return None
            rgb = cv2.cvtColor(rgb, cv2.COLOR_BGR2RGB)
            if resize is not None:
                rgb = cv2.resize(rgb, resize, interpolation=cv2.INTER_AREA)
            return rgb
        except Exception as e:
            print(f"Error loading {path}: {e}")
            return None
    
    def save_image_comparison(self, original, reconstructed, filename, metrics=None):
        """Save side-by-side comparison of original and reconstructed images."""
        fig, axes = plt.subplots(1, 2, figsize=(12, 6))
        
        axes[0].imshow(original)
        axes[0].set_title('Original')
        axes[0].axis('off')
        
        axes[1].imshow(reconstructed)
        title = 'Reconstructed'
        if metrics:
            title += f'\nPSNR: {metrics["psnr"]:.2f} dB, SSIM: {metrics["ssim"]:.4f}'
        axes[1].set_title(title)
        axes[1].axis('off')
        
        plt.tight_layout()
        plt.savefig(f'{self.output_dir}/examples/{filename}', dpi=150, bbox_inches='tight')
        plt.close()
    
    def calculate_metrics(self, original, reconstructed):
        """Calculate compression metrics."""
        if len(original.shape) == 3:
            original_gray = cv2.cvtColor(original, cv2.COLOR_RGB2GRAY)
            reconstructed_gray = cv2.cvtColor(reconstructed, cv2.COLOR_RGB2GRAY)
        else:
            original_gray = original
            reconstructed_gray = reconstructed
            
        psnr_val = psnr(original, reconstructed, data_range=255)
        ssim_val = ssim(original_gray, reconstructed_gray, data_range=255)
        
        return {'psnr': psnr_val, 'ssim': ssim_val}
    
    def train_autoencoder(self, images, epochs=100, batch_size=8, learning_rate=0.001):
        """Train autoencoder on images."""
        dataset = ImageDataset(images)
        dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)
        
        input_channels = 3
        model = SimpleAutoencoder(input_channels=input_channels, input_size=self.target_size[0]).to(self.device)
        
        total_params = sum(p.numel() for p in model.parameters())
        print(f"Autoencoder initialized with {total_params:,} parameters")
        print(f"Training on {len(images)} images")
        
        criterion = nn.MSELoss()
        optimizer = optim.AdamW(model.parameters(), lr=learning_rate)
        
        model.train()
        best_loss = float('inf')
        patience_counter = 0
        train_losses = []
        
        for epoch in range(epochs):
            total_loss = 0
            for batch in dataloader:
                batch = batch.to(self.device)
                optimizer.zero_grad()
                reconstructed = model(batch)
                loss = criterion(reconstructed, batch)
                loss.backward()
                torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
                optimizer.step()
                total_loss += loss.item()
            
            avg_loss = total_loss / len(dataloader)
            train_losses.append(avg_loss)
            
            if avg_loss < best_loss:
                best_loss = avg_loss
                patience_counter = 0
                torch.save(model.state_dict(), f'{self.output_dir}/models/best_autoencoder.pth')
            else:
                patience_counter += 1
            
            if (epoch + 1) % 5 == 0:
                print(f"Epoch [{epoch+1}/{epochs}], Loss: {avg_loss:.6f}")
            
            if patience_counter >= 25:
                print(f"Early stopping at epoch {epoch+1}")
                break
        
        plt.figure(figsize=(10, 6))
        plt.plot(train_losses)
        plt.title('Training Loss')
        plt.xlabel('Epoch')
        plt.ylabel('MSE Loss')
        plt.grid(True, alpha=0.3)
        plt.savefig(f'{self.output_dir}/training_loss.png', dpi=150, bbox_inches='tight')
        plt.close()
        
        print(f"Training completed. Best loss: {best_loss:.6f}")
        model.load_state_dict(torch.load(f'{self.output_dir}/models/best_autoencoder.pth'))
        
        return model
    
    def compress_decompress_image(self, image, model):
        """Compress and decompress an image using the autoencoder."""
        start_time = time.time()
        
        # Normalization handled by dataset
        image_tensor = torch.from_numpy(image).float() / 255.0
        image_tensor = image_tensor.permute(2, 0, 1).unsqueeze(0).to(self.device)
        
        model.eval()
        with torch.no_grad():
            latent = model.encode(image_tensor)
            reconstructed_tensor = model.decode(latent)
        
        reconstructed = reconstructed_tensor.squeeze(0).permute(1, 2, 0).cpu().numpy()
        reconstructed = np.clip(reconstructed * 255.0, 0, 255).astype(np.uint8)
        
        # Estimate compressed size using a simple proxy
        latent_bytes = pickle.dumps(latent.cpu().numpy().astype(np.float32))
        compressed_size = len(latent_bytes)
        
        compression_time = time.time() - start_time
        
        return {
            'reconstructed': reconstructed,
            'compressed_size': compressed_size,
            'compression_time': compression_time
        }
    
    def analyze_results(self):
        """Analyze and visualize compression results."""
        if not self.results:
            print("No results to analyze!")
            return
            
        df = pd.DataFrame(self.results)
        
        original_size = self.target_size[0] * self.target_size[1] * 3  # RGB
        df['compression_ratio'] = (original_size * 8) / df['compressed_size']
        df['bits_per_pixel'] = df['compressed_size'] / (self.target_size[0] * self.target_size[1])
        
        df.to_csv(f'{self.output_dir}/autoencoder_results.csv', index=False)
        print(f"Results saved to {self.output_dir}/autoencoder_results.csv")
        
        self._plot_quality_analysis(df)
        self._plot_compression_analysis(df, original_size)
    
    def _plot_quality_analysis(self, df):
        """Plot PSNR and SSIM distributions."""
        fig, axes = plt.subplots(1, 2, figsize=(15, 6))
        
        sns.histplot(df['psnr'], bins=20, ax=axes[0], color='skyblue', edgecolor='black')
        axes[0].set_xlabel('PSNR (dB)')
        axes[0].set_title('PSNR Distribution')
        axes[0].grid(True, alpha=0.3)
        
        sns.histplot(df['ssim'], bins=20, ax=axes[1], color='lightgreen', edgecolor='black')
        axes[1].set_xlabel('SSIM')
        axes[1].set_title('SSIM Distribution')
        axes[1].grid(True, alpha=0.3)
        
        plt.tight_layout()
        plt.savefig(f'{self.output_dir}/quality_analysis.png', dpi=300)
        plt.close()
    
    def _plot_compression_analysis(self, df, original_size):
        """Plot compression ratio and BPP vs quality."""
        fig, axes = plt.subplots(1, 2, figsize=(15, 6))
        
        axes[0].scatter(df['compression_ratio'], df['psnr'], alpha=0.6, color='purple')
        axes[0].set_xlabel('Compression Ratio')
        axes[0].set_ylabel('PSNR (dB)')
        axes[0].set_title('Compression Ratio vs Quality')
        axes[0].grid(True, alpha=0.3)

        axes[1].scatter(df['bits_per_pixel'], df['psnr'], alpha=0.6, color='orange')
        axes[1].set_xlabel('Bits per Pixel (bpp)')
        axes[1].set_ylabel('PSNR (dB)')
        axes[1].set_title('Rate-Distortion Curve')
        axes[1].grid(True, alpha=0.3)
        
        plt.tight_layout()
        plt.savefig(f'{self.output_dir}/compression_analysis.png', dpi=300)
        plt.close()

def run_simple_auto_encoder_experiment():
    """Main function to run the standalone autoencoder experiment."""
    print("Starting Standalone Autoencoder Experiment")
    print("=" * 50)
    
    experiment = SimpleAutoencoderExperiment(
        max_train_images=200, 
        max_test_images=50,
        target_size=(256, 256)
    )
    
    print("\n" + "="*50)
    print("PHASE 1: LOADING TRAINING IMAGES")
    print("="*50)
    
    training_images = []
    for train_file in tqdm(experiment.train_files, desc="Loading training images"):
        image = experiment.load_image(train_file, resize=experiment.target_size)
        if image is not None:
            training_images.append(image)
    
    print(f"Successfully loaded {len(training_images)} training images")
    
    print("\n" + "="*50)
    print("PHASE 2: TRAINING AUTOENCODER")
    print("="*50)
    
    if training_images:
        autoencoder_model = experiment.train_autoencoder(training_images, epochs=100)
        print("Autoencoder training completed!")
    else:
        print("No training images available!")
        return
    
    print("\n" + "="*50)
    print("PHASE 3: TESTING ON TEST IMAGES")
    print("="*50)
    
    for i, test_file in enumerate(tqdm(experiment.test_files, desc="Processing test images")):
        image = experiment.load_image(test_file, resize=experiment.target_size)
        if image is None:
            continue
        
        result = experiment.compress_decompress_image(image, autoencoder_model)
        
        metrics = experiment.calculate_metrics(image, result['reconstructed'])
        
        experiment.results.append({
            'image_id': i,
            'image_name': test_file.name,
            'compressed_size': result['compressed_size'],
            'psnr': metrics['psnr'],
            'ssim': metrics['ssim'],
            'compression_time': result['compression_time']
        })
        
        if i < 10:
            experiment.save_image_comparison(
                image, 
                result['reconstructed'], 
                f"example_{i:03d}_{test_file.stem}.png",
                metrics
            )
    
    print("\n" + "="*50)
    print("PHASE 4: ANALYZING RESULTS")
    print("="*50)
    experiment.analyze_results()
    
    print(f"\nExperiment completed successfully!")
    print(f"Results saved to {experiment.output_dir}/")
    print(f"Model weights saved to {experiment.output_dir}/models/")
    print(f"Example images saved to {experiment.output_dir}/examples/")

run_simple_auto_encoder_experiment()

Starting Standalone Autoencoder Experiment
Using device: mps
Found 200 training images and 50 test images.

PHASE 1: LOADING TRAINING IMAGES


Loading training images: 100%|██████████| 200/200 [00:50<00:00,  3.93it/s]


Successfully loaded 200 training images

PHASE 2: TRAINING AUTOENCODER
Autoencoder initialized with 18,222,915 parameters
Training on 200 images
Epoch [5/100], Loss: 0.048520
Epoch [10/100], Loss: 0.036996
Epoch [15/100], Loss: 0.032570
Epoch [20/100], Loss: 0.029977
Epoch [25/100], Loss: 0.025756
Epoch [30/100], Loss: 0.021420
Epoch [35/100], Loss: 0.018286
Epoch [40/100], Loss: 0.016204
Epoch [45/100], Loss: 0.014666
Epoch [50/100], Loss: 0.012181
Epoch [55/100], Loss: 0.011980
Epoch [60/100], Loss: 0.010346
Epoch [65/100], Loss: 0.010331
Epoch [70/100], Loss: 0.009119
Epoch [75/100], Loss: 0.008302
Epoch [80/100], Loss: 0.007539
Epoch [85/100], Loss: 0.007632
Epoch [90/100], Loss: 0.008419
Epoch [95/100], Loss: 0.006858
Epoch [100/100], Loss: 0.006652
Training completed. Best loss: 0.006652
Autoencoder training completed!

PHASE 3: TESTING ON TEST IMAGES


Processing test images: 100%|██████████| 50/50 [00:14<00:00,  3.41it/s]



PHASE 4: ANALYZING RESULTS
Results saved to autoencoder_results/autoencoder_results.csv

Experiment completed successfully!
Results saved to autoencoder_results/
Model weights saved to autoencoder_results/models/
Example images saved to autoencoder_results/examples/


In [24]:
class VariationalAutoencoder(nn.Module):
    def __init__(self, input_channels=3, input_size=256, latent_dim=128):
        super(VariationalAutoencoder, self).__init__()
        self.latent_dim = latent_dim
        
        # Encoder
        self.encoder = nn.Sequential(
            nn.Conv2d(input_channels, 32, 4, stride=2, padding=1), # 128x128
            nn.ReLU(True),
            nn.Conv2d(32, 64, 4, stride=2, padding=1),  # 64x64
            nn.ReLU(True),
            nn.Conv2d(64, 128, 4, stride=2, padding=1),  # 32x32
            nn.ReLU(True),
            nn.Conv2d(128, 256, 4, stride=2, padding=1), # 16x16
            nn.ReLU(True),
        )

        # Bottleneck
        self.flatten = nn.Flatten()
        self.encoder_output_size = 16 * 16 * 256
        self.fc_mu = nn.Linear(self.encoder_output_size, self.latent_dim)
        self.fc_logvar = nn.Linear(self.encoder_output_size, self.latent_dim)
        
        # Decoder
        self.decoder_linear = nn.Linear(self.latent_dim, self.encoder_output_size)
        self.unflatten = nn.Unflatten(1, (256, 16, 16))
        
        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(256, 128, 4, stride=2, padding=1), # 32x32
            nn.ReLU(True),
            nn.ConvTranspose2d(128, 64, 4, stride=2, padding=1), # 64x64
            nn.ReLU(True),
            nn.ConvTranspose2d(64, 32, 4, stride=2, padding=1),  # 128x128
            nn.ReLU(True),
            nn.ConvTranspose2d(32, input_channels, 4, stride=2, padding=1), # 256x256
            nn.Sigmoid()
        )
        
    def encode(self, x):
        x = self.encoder(x)
        x = self.flatten(x)
        mu = self.fc_mu(x)
        logvar = self.fc_logvar(x)
        return mu, logvar

    def reparameterize(self, mu, logvar):
        std = torch.exp(0.5 * logvar)
        eps = torch.randn_like(std)
        return mu + eps * std
    
    def decode(self, z):
        x = self.decoder_linear(z)
        x = self.unflatten(x)
        return self.decoder(x)
        
    def forward(self, x):
        mu, logvar = self.encode(x)
        z = self.reparameterize(mu, logvar)
        return self.decode(z), mu, logvar

# VAE Loss function
def vae_loss(recon_x, x, mu, logvar):
    # Reconstruction loss (BCE or MSE)
    recon_loss = F.binary_cross_entropy(recon_x, x, reduction='sum')
    
    # KL-divergence loss
    # KL_D = -0.5 * sum(1 + log(sigma^2) - mu^2 - sigma^2)
    kl_div = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())
    
    return recon_loss + kl_div

class VariationalAutoencoderExperiment:
    """Main class for conducting the standalone VAE compression experiment."""
    
    def __init__(self, train_dir='TRAIN', test_dir='TEST', output_dir='vae_results', 
                 max_train_images=200, max_test_images=50, target_size=(256, 256)):
        self.train_dir = train_dir
        self.test_dir = test_dir
        self.output_dir = output_dir
        self.max_train_images = max_train_images
        self.max_test_images = max_test_images
        self.target_size = target_size
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'mps' if torch.backends.mps.is_available() else 'cpu')
        
        print(f"Using device: {self.device}")
        
        os.makedirs(output_dir, exist_ok=True)
        os.makedirs(f"{output_dir}/examples", exist_ok=True)
        os.makedirs(f"{output_dir}/models", exist_ok=True)
        
        self.train_files = self._get_image_files(self.train_dir, self.max_train_images)
        self.test_files = self._get_image_files(self.test_dir, self.max_test_images)
        print(f"Found {len(self.train_files)} training images and {len(self.test_files)} test images.")
        
        self.results = []
        
    def _get_image_files(self, directory, max_files):
        """Get list of image files from specified directory."""
        image_files = []
        for ext in ['.tiff', '.tif', '.jpg', '.jpeg', '.png', '.bmp']:
            image_files.extend(Path(directory).glob(f'*{ext}'))
        return sorted(image_files)[:max_files]
    
    def load_image(self, path, resize=None):
        """Load an image and convert to RGB."""
        try:
            if not os.path.exists(path):
                print(f"Error: File not found at {path}")
                return None
            rgb = cv2.imread(str(path))
            if rgb is None:
                print(f"Error: Could not read image at {path}.")
                return None
            rgb = cv2.cvtColor(rgb, cv2.COLOR_BGR2RGB)
            if resize is not None:
                rgb = cv2.resize(rgb, resize, interpolation=cv2.INTER_AREA)
            return rgb
        except Exception as e:
            print(f"Error loading {path}: {e}")
            return None
    
    def save_image_comparison(self, original, reconstructed, filename, metrics=None):
        """Save side-by-side comparison of original and reconstructed images."""
        fig, axes = plt.subplots(1, 2, figsize=(12, 6))
        
        axes[0].imshow(original)
        axes[0].set_title('Original')
        axes[0].axis('off')
        
        axes[1].imshow(reconstructed)
        title = 'Reconstructed'
        if metrics:
            title += f'\nPSNR: {metrics["psnr"]:.2f} dB, SSIM: {metrics["ssim"]:.4f}'
        axes[1].set_title(title)
        axes[1].axis('off')
        
        plt.tight_layout()
        plt.savefig(f'{self.output_dir}/examples/{filename}', dpi=150, bbox_inches='tight')
        plt.close()
    
    def calculate_metrics(self, original, reconstructed):
        """Calculate compression metrics."""
        # Check if images are identical; if so, PSNR is infinity and SSIM is 1.0
        if np.all(original == reconstructed):
            psnr_val = np.inf
            ssim_val = 1.0
        else:
            if len(original.shape) == 3:
                original_gray = cv2.cvtColor(original, cv2.COLOR_RGB2GRAY)
                reconstructed_gray = cv2.cvtColor(reconstructed, cv2.COLOR_RGB2GRAY)
            else:
                original_gray = original
                reconstructed_gray = reconstructed
                
            psnr_val = psnr(original, reconstructed, data_range=255)
            ssim_val = ssim(original_gray, reconstructed_gray, data_range=255)
        
        return {'psnr': psnr_val, 'ssim': ssim_val}
    
    def train_autoencoder(self, images, epochs=100, batch_size=8, learning_rate=0.001):
        """Train autoencoder on images."""
        dataset = ImageDataset(images)
        dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)
        
        input_channels = 3
        model = VariationalAutoencoder(input_channels=input_channels, input_size=self.target_size[0]).to(self.device)
        
        total_params = sum(p.numel() for p in model.parameters())
        print(f"VAE initialized with {total_params:,} parameters")
        print(f"Training on {len(images)} images")
        
        optimizer = optim.AdamW(model.parameters(), lr=learning_rate)
        
        model.train()
        best_loss = float('inf')
        patience_counter = 0
        train_losses = []
        
        for epoch in range(epochs):
            total_loss = 0
            for batch in dataloader:
                batch = batch.to(self.device)
                optimizer.zero_grad()
                reconstructed, mu, logvar = model(batch)
                loss = vae_loss(reconstructed, batch, mu, logvar)
                loss.backward()
                torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
                optimizer.step()
                total_loss += loss.item()
            
            avg_loss = total_loss / len(dataloader)
            train_losses.append(avg_loss)
            
            if avg_loss < best_loss:
                best_loss = avg_loss
                patience_counter = 0
                torch.save(model.state_dict(), f'{self.output_dir}/models/best_vae.pth')
            else:
                patience_counter += 1
            
            if (epoch + 1) % 5 == 0:
                print(f"Epoch [{epoch+1}/{epochs}], Loss: {avg_loss:.6f}")
            
            if patience_counter >= 25:
                print(f"Early stopping at epoch {epoch+1}")
                break
        
        plt.figure(figsize=(10, 6))
        plt.plot(train_losses)
        plt.title('Training Loss')
        plt.xlabel('Epoch')
        plt.ylabel('VAE Loss')
        plt.grid(True, alpha=0.3)
        plt.savefig(f'{self.output_dir}/training_loss.png', dpi=150, bbox_inches='tight')
        plt.close()
        
        print(f"Training completed. Best loss: {best_loss:.6f}")
        model.load_state_dict(torch.load(f'{self.output_dir}/models/best_vae.pth'))
        
        return model
    
    def compress_decompress_image(self, image, model):
        """Compress and decompress an image using the autoencoder."""
        start_time = time.time()
        
        # Normalization handled by dataset
        image_tensor = torch.from_numpy(image).float() / 255.0
        image_tensor = image_tensor.permute(2, 0, 1).unsqueeze(0).to(self.device)
        
        model.eval()
        with torch.no_grad():
            # Get the mean vector as the latent representation for compression
            mu, _ = model.encode(image_tensor)
            reconstructed_tensor = model.decode(mu)
        
        reconstructed = reconstructed_tensor.squeeze(0).permute(1, 2, 0).cpu().numpy()
        reconstructed = np.clip(reconstructed * 255.0, 0, 255).astype(np.uint8)
        
        # Estimate compressed size using a simple proxy
        latent_bytes = pickle.dumps(mu.cpu().numpy().astype(np.float32))
        compressed_size = len(latent_bytes)
        
        compression_time = time.time() - start_time
        
        return {
            'reconstructed': reconstructed,
            'compressed_size': compressed_size,
            'compression_time': compression_time
        }
    
    def analyze_results(self):
        """Analyze and visualize compression results."""
        if not self.results:
            print("No results to analyze!")
            return
            
        df = pd.DataFrame(self.results)
        
        original_size = self.target_size[0] * self.target_size[1] * 3 # RGB
        df['compression_ratio'] = (original_size * 8) / df['compressed_size']
        df['bits_per_pixel'] = df['compressed_size'] / (self.target_size[0] * self.target_size[1])
        
        df.to_csv(f'{self.output_dir}/vae_results.csv', index=False)
        print(f"Results saved to {self.output_dir}/vae_results.csv")
        
        self._plot_quality_analysis(df)
        self._plot_compression_analysis(df, original_size)
    
    def _plot_quality_analysis(self, df):
        """Plot PSNR and SSIM distributions."""
        fig, axes = plt.subplots(1, 2, figsize=(15, 6))
        
        sns.histplot(df['psnr'], bins=20, ax=axes[0], color='skyblue', edgecolor='black')
        axes[0].set_xlabel('PSNR (dB)')
        axes[0].set_title('PSNR Distribution')
        axes[0].grid(True, alpha=0.3)
        
        sns.histplot(df['ssim'], bins=20, ax=axes[1], color='lightgreen', edgecolor='black')
        axes[1].set_xlabel('SSIM')
        axes[1].set_title('SSIM Distribution')
        axes[1].grid(True, alpha=0.3)
        
        plt.tight_layout()
        plt.savefig(f'{self.output_dir}/quality_analysis.png', dpi=300)
        plt.close()
    
    def _plot_compression_analysis(self, df, original_size):
        """Plot compression ratio and BPP vs quality."""
        fig, axes = plt.subplots(1, 2, figsize=(15, 6))
        
        axes[0].scatter(df['compression_ratio'], df['psnr'], alpha=0.6, color='purple')
        axes[0].set_xlabel('Compression Ratio')
        axes[0].set_ylabel('PSNR (dB)')
        axes[0].set_title('Compression Ratio vs Quality')
        axes[0].grid(True, alpha=0.3)

        axes[1].scatter(df['bits_per_pixel'], df['psnr'], alpha=0.6, color='orange')
        axes[1].set_xlabel('Bits per Pixel (bpp)')
        axes[1].set_ylabel('PSNR (dB)')
        axes[1].set_title('Rate-Distortion Curve')
        axes[1].grid(True, alpha=0.3)
        
        plt.tight_layout()
        plt.savefig(f'{self.output_dir}/compression_analysis.png', dpi=300)
        plt.close()

def run_vae_experiment():
    """Main function to run the standalone autoencoder experiment."""
    print("Starting Standalone Autoencoder Experiment")
    print("=" * 50)
    
    experiment = VariationalAutoencoderExperiment(
        max_train_images=200, 
        max_test_images=50,
        target_size=(256, 256)
    )
    
    print("\n" + "="*50)
    print("PHASE 1: LOADING TRAINING IMAGES")
    print("="*50)
    
    training_images = []
    for train_file in tqdm(experiment.train_files, desc="Loading training images"):
        image = experiment.load_image(train_file, resize=experiment.target_size)
        if image is not None:
            training_images.append(image)
    
    print(f"Successfully loaded {len(training_images)} training images")
    
    print("\n" + "="*50)
    print("PHASE 2: TRAINING AUTOENCODER")
    print("="*50)
    
    if training_images:
        autoencoder_model = experiment.train_autoencoder(training_images, epochs=100)
        print("Autoencoder training completed!")
    else:
        print("No training images available!")
        return
    
    print("\n" + "="*50)
    print("PHASE 3: TESTING ON TEST IMAGES")
    print("="*50)
    
    for i, test_file in enumerate(tqdm(experiment.test_files, desc="Processing test images")):
        image = experiment.load_image(test_file, resize=experiment.target_size)
        if image is None:
            continue
        
        result = experiment.compress_decompress_image(image, autoencoder_model)
        
        metrics = experiment.calculate_metrics(image, result['reconstructed'])
        
        experiment.results.append({
            'image_id': i,
            'image_name': test_file.name,
            'compressed_size': result['compressed_size'],
            'psnr': metrics['psnr'],
            'ssim': metrics['ssim'],
            'compression_time': result['compression_time']
        })
        
        if i < 10:
            experiment.save_image_comparison(
                image, 
                result['reconstructed'], 
                f"example_{i:03d}_{test_file.stem}.png",
                metrics
            )
    
    print("\n" + "="*50)
    print("PHASE 4: ANALYZING RESULTS")
    print("="*50)
    experiment.analyze_results()
    
    print(f"\nExperiment completed successfully!")
    print(f"Results saved to {experiment.output_dir}/")
    print(f"Model weights saved to {experiment.output_dir}/models/")
    print(f"Example images saved to {experiment.output_dir}/examples/")

run_vae_experiment()



Starting Standalone Autoencoder Experiment
Using device: mps
Found 200 training images and 50 test images.

PHASE 1: LOADING TRAINING IMAGES


Loading training images: 100%|██████████| 200/200 [00:50<00:00,  3.96it/s]


Successfully loaded 200 training images

PHASE 2: TRAINING AUTOENCODER
VAE initialized with 26,611,651 parameters
Training on 200 images
Epoch [5/100], Loss: 970147.442500
Epoch [10/100], Loss: 918504.185000
Epoch [15/100], Loss: 910506.147500
Epoch [20/100], Loss: 901458.415000
Epoch [25/100], Loss: 885001.495000
Epoch [30/100], Loss: 869600.720000
Epoch [35/100], Loss: 857716.302500
Epoch [40/100], Loss: 849286.725000
Epoch [45/100], Loss: 842877.770000
Epoch [50/100], Loss: 839832.077500
Epoch [55/100], Loss: 826528.587500
Epoch [60/100], Loss: 822755.112500
Epoch [65/100], Loss: 822974.800000
Epoch [70/100], Loss: 817280.165000
Epoch [75/100], Loss: 813347.000000
Epoch [80/100], Loss: 810363.877500
Epoch [85/100], Loss: 809325.207500
Epoch [90/100], Loss: 811064.817500
Epoch [95/100], Loss: 806017.480000
Epoch [100/100], Loss: 803363.670000
Training completed. Best loss: 802698.887500
Autoencoder training completed!

PHASE 3: TESTING ON TEST IMAGES


Processing test images: 100%|██████████| 50/50 [00:14<00:00,  3.41it/s]



PHASE 4: ANALYZING RESULTS
Results saved to vae_results/vae_results.csv

Experiment completed successfully!
Results saved to vae_results/
Model weights saved to vae_results/models/
Example images saved to vae_results/examples/


In [25]:
def save_comparison(original, reconstructed, filename, metrics, output_dir):
    """Save side-by-side comparison."""
    fig, axes = plt.subplots(1, 2, figsize=(12, 6))
    axes[0].imshow(original)
    axes[0].set_title('Original')
    axes[0].axis('off')
    
    axes[1].imshow(reconstructed)
    axes[1].set_title(f'Hybrid Reconstruction\nPSNR: {metrics["psnr"]:.2f} dB, SSIM: {metrics["ssim"]:.4f}')
    axes[1].axis('off')
    
    plt.tight_layout()
    plt.savefig(os.path.join(output_dir, filename), dpi=150, bbox_inches='tight')
    plt.close()

def evaluate_autoencoder_hybrid_approach(
    model_path='autoencoder_results/models/best_autoencoder.pth', 
    images_dir='TEST', 
    output_dir='auto_encoder_hybrid_results',
    target_size=(256, 256),
    num_examples=10,
    quantization_bits=4
):
    """
    Evaluates a hybrid image compression approach using an autoencoder and residual compression.
    """
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = SimpleAutoencoder(input_channels=3, input_size=target_size[0]).to(device)
    if not os.path.exists(model_path):
        print(f"Error: Model file not found at {model_path}")
        return
    model.load_state_dict(torch.load(model_path, map_location=device))
    model.eval()
    
    os.makedirs(output_dir, exist_ok=True)
    os.makedirs(os.path.join(output_dir, 'examples'), exist_ok=True)
    
    image_files = [f for f in Path(images_dir).glob('*') if f.suffix.lower() in ['.jpg', '.jpeg', '.png', '.bmp', '.tiff', '.tif']]
    if not image_files:
        print(f"No images found in {images_dir}")
        return

    results = []
    
    print("Evaluating hybrid compression...")
    
    # Use torch.no_grad() for the entire evaluation block
    with torch.no_grad():
        for img_path in tqdm(image_files, desc="Processing images"):
            original_img = load_image(img_path, resize=target_size)
            if original_img is None:
                continue

            # Step 1: Get autoencoder reconstruction
            img_tensor = torch.from_numpy(original_img).float() / 255.0
            img_tensor = img_tensor.permute(2, 0, 1).unsqueeze(0).to(device)
            reconstructed_tensor = model(img_tensor)
            
            # Detach and convert to numpy
            reconstructed_img = (reconstructed_tensor.squeeze(0).permute(1, 2, 0).cpu().numpy() * 255.0).astype(np.uint8)

            # Step 2: Diff and quantize residuals
            residuals = original_img.astype(np.int16) - reconstructed_img.astype(np.int16)
            
            # Quantize residuals to a limited number of levels
            scale = 255.0 / (2**quantization_bits - 1)
            quantized_residuals = np.round(residuals / scale).astype(np.int8)

            # Step 3: Compress with zstd
            cctx = zstd.ZstdCompressor()
            compressed_residuals = cctx.compress(quantized_residuals.tobytes())
            compressed_size_residuals = len(compressed_residuals)

            # Step 4: Bitpack (not implemented in a simple way here, but size is accounted for)

            # Step 5: Total compressed size
            latent_tensor = model.encode(img_tensor)
            latent_size = len(latent_tensor.cpu().numpy().tobytes())
            total_compressed_size = latent_size + compressed_size_residuals
            
            # Step 6: Reconstruct the image from the hybrid data
            # Decompress residuals
            dctx = zstd.ZstdDecompressor()
            decompressed_residuals_bytes = dctx.decompress(compressed_residuals)
            decompressed_residuals = np.frombuffer(decompressed_residuals_bytes, dtype=np.int8).reshape(target_size[0], target_size[1], 3)
            
            # De-quantize
            final_residuals = (decompressed_residuals.astype(np.int16) * scale).astype(np.int16)
            
            # Add back to the autoencoder reconstruction
            final_reconstruction = np.clip(reconstructed_img.astype(np.int16) + final_residuals, 0, 255).astype(np.uint8)

            # Step 7: Recompute metrics
            metrics = calculate_metrics(original_img, final_reconstruction)
            
            original_bytes = target_size[0] * target_size[1] * 3
            compression_ratio = original_bytes / total_compressed_size
            bits_per_pixel = (total_compressed_size * 8) / (target_size[0] * target_size[1])

            results.append({
                'image_name': img_path.name,
                'psnr': metrics['psnr'],
                'ssim': metrics['ssim'],
                'compression_ratio': compression_ratio,
                'bits_per_pixel': bits_per_pixel,
                'original_size': original_bytes,
                'sum_of_latent_and_residual': total_compressed_size,
                'latent_size': latent_size,
                'residual_size': compressed_size_residuals
            })

    print("Analysis complete. Generating plots and examples.")
    
    # Analyze and plot results
    df = pd.DataFrame(results)
    df.to_csv(os.path.join(output_dir, 'hybrid_results.csv'), index=False)
    
    fig, axes = plt.subplots(1, 2, figsize=(15, 6))
    sns.histplot(df['psnr'], bins=20, ax=axes[0])
    axes[0].set_title('Hybrid PSNR Distribution')
    sns.histplot(df['compression_ratio'], bins=20, ax=axes[1])
    axes[1].set_title('Hybrid Compression Ratio Distribution')
    plt.tight_layout()
    plt.savefig(os.path.join(output_dir, 'hybrid_metrics.png'))
    plt.close()
    
    # Save a few example images
    with torch.no_grad():
        for i in range(min(num_examples, len(image_files))):
            img_path = image_files[i]
            original_img = load_image(img_path, resize=target_size)
            
            img_tensor = torch.from_numpy(original_img).float() / 255.0
            img_tensor = img_tensor.permute(2, 0, 1).unsqueeze(0).to(device)
            reconstructed_tensor = model(img_tensor)
            
            reconstructed_img = (reconstructed_tensor.squeeze(0).permute(1, 2, 0).cpu().numpy() * 255.0).astype(np.uint8)

            residuals = original_img.astype(np.int16) - reconstructed_img.astype(np.int16)
            scale = 255.0 / (2**quantization_bits - 1)
            quantized_residuals = np.round(residuals / scale).astype(np.int8)

            cctx = zstd.ZstdCompressor()
            compressed_residuals = cctx.compress(quantized_residuals.tobytes())
            
            dctx = zstd.ZstdDecompressor()
            decompressed_residuals_bytes = dctx.decompress(compressed_residuals)
            decompressed_residuals = np.frombuffer(decompressed_residuals_bytes, dtype=np.int8).reshape(target_size[0], target_size[1], 3)
            final_residuals = (decompressed_residuals.astype(np.int16) * scale).astype(np.int16)
            final_reconstruction = np.clip(reconstructed_img.astype(np.int16) + final_residuals, 0, 255).astype(np.uint8)
            
            metrics = calculate_metrics(original_img, final_reconstruction)
            save_comparison(original_img, final_reconstruction, f'hybrid_example_{i:02d}.png', metrics, os.path.join(output_dir, 'examples'))

evaluate_autoencoder_hybrid_approach()

Evaluating hybrid compression...


Processing images: 100%|██████████| 54/54 [00:14<00:00,  3.66it/s]


Analysis complete. Generating plots and examples.


In [26]:
def evaluate_vae_hybrid_approach(
    model_path='vae_results/models/best_vae.pth',
    images_dir='TEST',
    output_dir='vae_hybrid_results',
    target_size=(256, 256),
    num_examples=10,
    quantization_bits=4
):
    """
    Evaluates a hybrid image compression approach using a VAE and residual compression.
    """
    device = torch.device('cuda' if torch.cuda.is_available() else 'mps' if torch.backends.mps.is_available() else 'cpu')
    model = VariationalAutoencoder(input_channels=3, input_size=target_size[0]).to(device)
    if not os.path.exists(model_path):
        print(f"Error: Model file not found at {model_path}")
        return
    model.load_state_dict(torch.load(model_path, map_location=device))
    model.eval()
    print("Model loaded successfully.")
    
    os.makedirs(output_dir, exist_ok=True)
    os.makedirs(os.path.join(output_dir, 'examples'), exist_ok=True)
    
    image_files = [f for f in Path(images_dir).glob('*') if f.suffix.lower() in ['.jpg', '.jpeg', '.png', '.bmp', '.tiff', '.tif']]
    if not image_files:
        print(f"No images found in {images_dir}")
        return

    results = []
    
    print("Evaluating hybrid VAE compression...")
    
    with torch.no_grad():
        for img_path in tqdm(image_files, desc="Processing images"):
            original_img = load_image(img_path, resize=target_size)
            if original_img is None:
                continue

            # Step 1: Get VAE reconstruction
            img_tensor = torch.from_numpy(original_img).float() / 255.0
            img_tensor = img_tensor.permute(2, 0, 1).unsqueeze(0).to(device)
            
            # The VAE forward pass returns a tuple
            reconstructed_tensor, mu, logvar = model(img_tensor)
            
            reconstructed_img = (reconstructed_tensor.squeeze(0).permute(1, 2, 0).cpu().numpy() * 255.0).astype(np.uint8)

            # Step 2: Diff and quantize residuals
            residuals = original_img.astype(np.int16) - reconstructed_img.astype(np.int16)
            
            scale = 255.0 / (2**quantization_bits - 1)
            quantized_residuals = np.round(residuals / scale).astype(np.int8)

            # Step 3: Compress with zstd
            cctx = zstd.ZstdCompressor()
            compressed_residuals = cctx.compress(quantized_residuals.tobytes())
            compressed_size_residuals = len(compressed_residuals)

            # Step 4: Total compressed size
            # Use the size of the 'mu' vector as the latent size
            latent_size = mu.numel() * 4  # Assuming float32 data type (4 bytes per element)
            total_compressed_size = latent_size + compressed_size_residuals
            
            # Step 5: Reconstruct the image from the hybrid data
            dctx = zstd.ZstdDecompressor()
            decompressed_residuals_bytes = dctx.decompress(compressed_residuals)
            decompressed_residuals = np.frombuffer(decompressed_residuals_bytes, dtype=np.int8).reshape(target_size[0], target_size[1], 3)
            
            # De-quantize
            final_residuals = (decompressed_residuals.astype(np.int16) * scale).astype(np.int16)
            
            final_reconstruction = np.clip(reconstructed_img.astype(np.int16) + final_residuals, 0, 255).astype(np.uint8)

            # Step 6: Recompute metrics
            metrics = calculate_metrics(original_img, final_reconstruction)
            
            original_bytes = target_size[0] * target_size[1] * 3
            compression_ratio = original_bytes / total_compressed_size
            bits_per_pixel = (total_compressed_size * 8) / (target_size[0] * target_size[1])

            results.append({
                'image_name': img_path.name,
                'psnr': metrics['psnr'],
                'ssim': metrics['ssim'],
                'compression_ratio': compression_ratio,
                'bits_per_pixel': bits_per_pixel,
                'original_size': original_bytes,
                'sum_of_latent_and_residual': total_compressed_size,
                'latent_size': latent_size,
                'residual_size': compressed_size_residuals
            })

    print("Analysis complete. Generating plots and examples.")
    
    df = pd.DataFrame(results)
    df.to_csv(os.path.join(output_dir, 'hybrid_results.csv'), index=False)
    
    fig, axes = plt.subplots(1, 2, figsize=(15, 6))
    sns.histplot(df['psnr'], bins=20, ax=axes[0])
    axes[0].set_title('Hybrid PSNR Distribution')
    sns.histplot(df['compression_ratio'], bins=20, ax=axes[1])
    axes[1].set_title('Hybrid Compression Ratio Distribution')
    plt.tight_layout()
    plt.savefig(os.path.join(output_dir, 'hybrid_metrics.png'))
    plt.close()
    
    with torch.no_grad():
        for i in range(min(num_examples, len(image_files))):
            img_path = image_files[i]
            original_img = load_image(img_path, resize=target_size)
            
            img_tensor = torch.from_numpy(original_img).float() / 255.0
            img_tensor = img_tensor.permute(2, 0, 1).unsqueeze(0).to(device)
            
            # Unpack the VAE output
            reconstructed_tensor, mu, logvar = model(img_tensor)
            
            reconstructed_img = (reconstructed_tensor.squeeze(0).permute(1, 2, 0).cpu().numpy() * 255.0).astype(np.uint8)

            residuals = original_img.astype(np.int16) - reconstructed_img.astype(np.int16)
            scale = 255.0 / (2**quantization_bits - 1)
            quantized_residuals = np.round(residuals / scale).astype(np.int8)

            cctx = zstd.ZstdCompressor()
            compressed_residuals = cctx.compress(quantized_residuals.tobytes())
            
            dctx = zstd.ZstdDecompressor()
            decompressed_residuals_bytes = dctx.decompress(compressed_residuals)
            decompressed_residuals = np.frombuffer(decompressed_residuals_bytes, dtype=np.int8).reshape(target_size[0], target_size[1], 3)
            final_residuals = (decompressed_residuals.astype(np.int16) * scale).astype(np.int16)
            final_reconstruction = np.clip(reconstructed_img.astype(np.int16) + final_residuals, 0, 255).astype(np.uint8)
            
            metrics = calculate_metrics(original_img, final_reconstruction)
            save_comparison(original_img, final_reconstruction, f'hybrid_example_{i:02d}.png', metrics, os.path.join(output_dir, 'examples'))

evaluate_vae_hybrid_approach()


Model loaded successfully.
Evaluating hybrid VAE compression...


Processing images: 100%|██████████| 54/54 [00:14<00:00,  3.73it/s]


Analysis complete. Generating plots and examples.


In [27]:
def create_boxplots_from_file(csv_filepath, output_filename='boxplots.png'):
    """
    Reads data from a CSV file, creates side-by-side boxplots for PSNR, SSIM,
    and Compression Ratio, and saves the plot to a file.

    Args:
        csv_filepath (str): The path to the CSV file.
        output_filename (str): The name of the file to save the plot to.
    """
    try:
        if not os.path.exists(csv_filepath):
            print(f"Error: The file '{csv_filepath}' does not exist.")
            return
        
        df = pd.read_csv(csv_filepath)
    except Exception as e:
        print(f"Error reading CSV file '{csv_filepath}': {e}")
        return

    # Check for the required columns
    required_columns = ['psnr', 'ssim', 'compression_ratio']
    if not all(col in df.columns for col in required_columns):
        print("Error: The CSV data must contain 'psnr', 'ssim', and 'compression_ratio' columns.")
        return
    
    # Create a figure and a set of subplots
    fig, axes = plt.subplots(1, 3, figsize=(18, 6))
    fig.suptitle('Distribution of Key Performance Metrics', fontsize=16)

    # Boxplot for PSNR
    axes[0].boxplot(df['psnr'], patch_artist=True, boxprops=dict(facecolor='lightblue'))
    axes[0].set_title('PSNR Distribution')
    axes[0].set_ylabel('PSNR (dB)')
    axes[0].set_xticks([1], [''])

    # Boxplot for SSIM
    axes[1].boxplot(df['ssim'], patch_artist=True, boxprops=dict(facecolor='lightgreen'))
    axes[1].set_title('SSIM Distribution')
    axes[1].set_ylabel('SSIM Value')
    axes[1].set_xticks([1], [''])

    # Boxplot for Compression Ratio
    axes[2].boxplot(df['compression_ratio'], patch_artist=True, boxprops=dict(facecolor='lightcoral'))
    axes[2].set_title('Compression Ratio Distribution')
    axes[2].set_ylabel('Ratio')
    axes[2].set_xticks([1], [''])

    plt.tight_layout(rect=[0, 0, 1, 0.96])
    plt.savefig(output_filename)
    plt.close()

    print(f"Box plots saved to {output_filename}")

os.makedirs('boxplots', exist_ok=True)
create_boxplots_from_file('auto_encoder_hybrid_results/hybrid_results.csv', 'boxplots/auto_encoder_hybrid_boxplots.png')
create_boxplots_from_file('vae_hybrid_results/hybrid_results.csv', 'boxplots/vae_hybrid_boxplots.png')
create_boxplots_from_file('raw_jpeg_results/raw_jpeg_results.csv', 'boxplots/raw_jpeg_boxplots.png')

Box plots saved to boxplots/auto_encoder_hybrid_boxplots.png
Box plots saved to boxplots/vae_hybrid_boxplots.png
Box plots saved to boxplots/raw_jpeg_boxplots.png
