In [1]:
import os
import glob
import torch
import torchvision.transforms as T
import matplotlib.pyplot as plt
from PIL import Image
import numpy as np
import cv2
import psutil
import time
from skimage.metrics import peak_signal_noise_ratio as psnr, structural_similarity as ssim, mean_squared_error as mse
from model.networks import Generator
import lpips
import csv
from tqdm import tqdm
from torchvision.models import AlexNet_Weights
import warnings
# Suppress every UserWarning for the rest of the session. This is process-wide,
# so it also hides UserWarnings raised by any library imported above.
warnings.filterwarnings("ignore", category=UserWarning)
# Give matplotlib figures a white background instead of the rcParams default.
plt.rcParams['figure.facecolor'] = 'white'



In [2]:
def print_memory_usage(stage):
    """Print the memory footprint of the current process.

    Always prints the resident set size of this process in MB. When CUDA is
    available, also prints the bytes currently allocated and reserved by the
    PyTorch CUDA allocator, converted to MB.

    Args:
        stage: Label interpolated into the CPU line to identify the call site.
    """
    bytes_per_mb = 1024 ** 2

    rss_bytes = psutil.Process(os.getpid()).memory_info().rss
    print(f"CPU Memory usage at {stage}: {rss_bytes / bytes_per_mb:.2f} MB")

    # Nothing more to report on machines without a usable GPU.
    if not torch.cuda.is_available():
        return

    print(f"GPU Memory allocated: {torch.cuda.memory_allocated() / bytes_per_mb:.2f} MB")
    print(f"GPU Memory reserved: {torch.cuda.memory_reserved() / bytes_per_mb:.2f} MB")

def load_model_state(checkpoint_path, device):
    """Create a flow-returning Generator on `device` from a checkpoint file.

    Two loading routes exist:
      * paths containing 'states_pt_places2.pth' are handed to the Generator
        constructor through its `checkpoint` argument;
      * any other path is read with `torch.load` and applied with
        `load_state_dict(strict=True)`, so missing or unexpected keys raise.

    Args:
        checkpoint_path: Path of the checkpoint file (string).
        device: Device the model is moved to and the weights are mapped to.

    Returns:
        The loaded Generator instance.
    """
    print(f"Loading model from {checkpoint_path}")

    uses_constructor_loading = 'states_pt_places2.pth' in checkpoint_path
    if uses_constructor_loading:
        model = Generator(checkpoint=checkpoint_path, return_flow=True)
        model = model.to(device)
    else:
        model = Generator(return_flow=True).to(device)
        weights = torch.load(checkpoint_path, map_location=device)
        model.load_state_dict(weights, strict=True)

    print("Model loaded successfully")
    return model

def resize_output(output, original_size):
    """Cast `output` to uint8 and resize it with `cv2.resize`.

    Args:
        output: Array exposing `.astype`; the caller passes the first element
            returned by `generator.infer`.
        original_size: Target size forwarded unchanged as the second argument
            of `cv2.resize`; the caller passes a PIL `Image.size` tuple.

    Returns:
        Whatever `cv2.resize` returns for the uint8 copy of `output`.
    """
    as_bytes = output.astype(np.uint8)
    return cv2.resize(as_bytes, original_size)

def calculate_metrics(original, inpainted, mask, lpips_model):
    """Compare `original` and `inpainted` on the pixels selected by `mask`.

    Pixels are selected wherever the mask, converted to 8-bit grayscale, is
    non-zero. PSNR, MAE and MSE are computed on exactly those pixels. For
    LPIPS the selected pixels are flattened in row-major order, zero-padded to
    the next perfect square and reshaped into a (side, side, 3) image; LPIPS is
    only evaluated when that square is at least 32 pixels wide.

    NOTE(review): for non-rectangular masks the square fed to LPIPS does not
    preserve the spatial layout of the region, so the LPIPS value is only a
    rough indicator. This behaviour is inherited from the original code.

    Args:
        original: H x W x 3 array of the reference image (the caller passes
            uint8).
        inpainted: H x W x 3 array of the inpainted image, same shape as
            `original`.
        mask: Object with a PIL-style `.convert('L')` method whose result has
            the same height and width as the images.
        lpips_model: Callable invoked as `lpips_model(a, b, normalize=True)` on
            two 1 x 3 x side x side float tensors in [0, 1]; `.item()` is
            called on its result.

    Returns:
        dict with keys 'psnr', 'mae', 'mse' and 'lpips'. 'lpips' is 'N/A' when
        the region is too small; every value is 'N/A' when the mask selects no
        pixels.
    """
    region = np.array(mask.convert('L')) > 0
    roi_original = original[region]
    roi_inpainted = inpainted[region]

    pixel_count = roi_original.shape[0]
    if pixel_count == 0:
        # Nothing to compare: report placeholders instead of NaN plus warnings.
        return {'psnr': 'N/A', 'mae': 'N/A', 'mse': 'N/A', 'lpips': 'N/A'}

    # Subtract in float64: uint8 - uint8 wraps modulo 256 (10 - 20 -> 246),
    # which made the previous MAE meaningless.
    abs_error = np.abs(roi_original.astype(np.float64) - roi_inpainted.astype(np.float64))

    # Pixel metrics use the unpadded region so that the zero padding needed for
    # LPIPS below does not dilute the averages.
    metrics = {
        'psnr': psnr(roi_original, roi_inpainted, data_range=255),
        'mae': float(np.mean(abs_error)),
        'mse': mse(roi_original, roi_inpainted)
    }

    side = int(np.ceil(np.sqrt(pixel_count)))
    if side >= 32:
        padding = ((0, side * side - pixel_count), (0, 0))
        square_original = np.pad(roi_original, padding, mode='constant').reshape(side, side, 3)
        square_inpainted = np.pad(roi_inpainted, padding, mode='constant').reshape(side, side, 3)

        original_tensor = torch.from_numpy(square_original).permute(2, 0, 1).unsqueeze(0).float() / 255.0
        inpainted_tensor = torch.from_numpy(square_inpainted).permute(2, 0, 1).unsqueeze(0).float() / 255.0

        # LPIPS expects inputs in [-1, 1] by default; normalize=True tells it
        # the tensors are in [0, 1]. No gradients are needed for evaluation.
        with torch.no_grad():
            metrics['lpips'] = lpips_model(original_tensor, inpainted_tensor, normalize=True).item()
    else:
        metrics['lpips'] = 'N/A'

    return metrics

def process_image(generator, image_path, mask_path, device, lpips_model):
    """Inpaint one image/mask pair and measure time, peak GPU memory and quality.

    Images wider than 1000 px or taller than 750 px are not processed and yield
    the 'N/A' placeholder results. A CUDA out-of-memory error during inference
    or metric computation is reported and also yields the placeholders; any
    other RuntimeError propagates.

    Args:
        generator: Model exposing `infer(image, mask, return_vals=[...])`; the
            first element of its return value is used as the inpainted image.
        image_path: Path of the input image.
        mask_path: Path of the mask image.
        device: torch.device (or device string) the tensors are moved to.
        lpips_model: LPIPS model forwarded to `calculate_metrics`.

    Returns:
        dict with 'time' (seconds), 'memory' (peak MB allocated on the CUDA
        device, or 'N/A' when not running on CUDA) and the keys produced by
        `calculate_metrics`; or the dict from `create_na_results()`.
    """
    # CUDA-only calls must be skipped on the CPU fallback:
    # torch.cuda.reset_peak_memory_stats() raises when CUDA is unavailable.
    on_cuda = torch.device(device).type == 'cuda'

    # Context managers close both file handles even on early return or error.
    with Image.open(image_path) as image_pil, Image.open(mask_path) as mask_pil:
        original_size = image_pil.size

        if original_size[0] > 1000 or original_size[1] > 750:
            return create_na_results()

        image_tensor = T.ToTensor()(image_pil).to(device)
        mask_tensor = T.ToTensor()(mask_pil).to(device)

        if on_cuda:
            # Drain pending kernels so they are not billed to this image.
            torch.cuda.synchronize(device)
            torch.cuda.reset_peak_memory_stats(device)
        start_time = time.perf_counter()
        try:
            with torch.no_grad(), torch.autocast(device_type='cuda', enabled=on_cuda):
                output = generator.infer(image_tensor, mask_tensor, return_vals=['inpainted', 'stage1', 'stage2', 'flow'])
            if on_cuda:
                # CUDA kernels run asynchronously; wait before reading the clock.
                torch.cuda.synchronize(device)
            elapsed_time = time.perf_counter() - start_time
            mem_used = torch.cuda.max_memory_allocated(device) / 1024**2 if on_cuda else 'N/A'

            original_np = np.array(image_pil).astype(np.uint8)
            inpainted_resized_np = resize_output(output[0], original_size).astype(np.uint8)
            metrics = calculate_metrics(original_np, inpainted_resized_np, mask_pil, lpips_model)

            return {
                'time': elapsed_time,
                'memory': mem_used,
                **metrics
            }
        except RuntimeError as e:
            # Newer PyTorch raises a dedicated subclass; older versions only
            # expose the condition through the message text.
            oom_type = getattr(torch.cuda, 'OutOfMemoryError', ())
            if isinstance(e, oom_type) or "CUDA out of memory" in str(e):
                print(f"CUDA OOM for {image_path}. Skipping.")
                return create_na_results()
            raise

def create_na_results():
    """Return a new result dict in which every field holds the string 'N/A'.

    The keys, in order, are 'time', 'memory', 'psnr', 'mae', 'mse' and 'lpips'.
    Used for image/mask pairs that were skipped or could not be processed.
    """
    result_fields = ('time', 'memory', 'psnr', 'mae', 'mse', 'lpips')
    return {field: 'N/A' for field in result_fields}

def init_lpips():
    """Build the LPIPS model with the 'alex' backbone.

    All warnings raised while the model is being constructed are suppressed;
    the previous warning filters are restored when the function returns.

    Returns:
        The object returned by `lpips.LPIPS(net='alex')`.
    """
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        perceptual_model = lpips.LPIPS(net='alex')
    return perceptual_model

In [4]:
def _format_metric(value, spec):
    """Format a numeric result with `spec`; pass string placeholders such as 'N/A' through."""
    return value if isinstance(value, str) else format(value, spec)


def main(models_dir=None, images_dir=None, masks_dir=None, results_csv_path=None,
         model_name="pruned_model_6.pth", skip_sizes=('4000x3000', '2000x1500')):
    """Benchmark one model on every matching image/mask pair and write a CSV.

    Image file names are split on '_' and the second token, up to its first
    '.', is taken as the image's size tag. An image is paired with every mask
    whose file name contains that tag. One CSV row is written per pair.

    Args:
        models_dir: Directory containing the model checkpoint.
        images_dir: Directory searched for '*.jpg' input images.
        masks_dir: Directory searched for '*.jpg' masks.
        results_csv_path: Output CSV path (overwritten).
        model_name: Checkpoint file name inside `models_dir`.
        skip_sizes: Size tags that are skipped entirely.

    Any path argument left as None falls back to the original hard-coded
    location under the thesis directory, so calling `main()` behaves as before.
    """
    base_dir = "C:/Users/tuant/Downloads/AI Thesis/FINAL_APP_AND_ANALYSIS"
    models_dir = models_dir or f"{base_dir}/optimized_models"
    images_dir = images_dir or f"{base_dir}/examples/final_image"
    masks_dir = masks_dir or f"{base_dir}/examples/final_masks"
    results_csv_path = results_csv_path or f"{base_dir}/results/results_single_model_6.csv"

    use_cuda_if_available = True
    device = torch.device('cuda' if torch.cuda.is_available() and use_cuda_if_available else 'cpu')
    print(f"Using device: {device}")

    model_path = os.path.join(models_dir, model_name)
    model_label = os.path.basename(model_path)

    # glob returns files in arbitrary order; sort for a reproducible CSV.
    images = sorted(glob.glob(os.path.join(images_dir, "*.jpg")))
    masks = [(os.path.basename(path), path)
             for path in sorted(glob.glob(os.path.join(masks_dir, "*.jpg")))]

    # Load everything that can fail BEFORE opening the CSV in 'w' mode, so a
    # bad checkpoint does not wipe results from a previous run.
    lpips_model = init_lpips()
    generator = load_model_state(model_path, device)

    results_dir = os.path.dirname(results_csv_path)
    if results_dir:
        os.makedirs(results_dir, exist_ok=True)

    fieldnames = ['Model', 'Image', 'Mask', 'Time (s)', 'Memory (MB)', 'PSNR', 'MAE', 'MSE', 'LPIPS']
    with open(results_csv_path, 'w', newline='') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()

        for image_path in tqdm(images, desc="Processing images"):
            image_name = os.path.basename(image_path)
            name_tokens = image_name.split('_')
            image_size = name_tokens[1].split('.')[0] if len(name_tokens) > 1 else ''
            if not image_size:
                # No size tag: an empty tag would match every mask, and a
                # missing one used to raise IndexError.
                tqdm.write(f"Skipping {image_name}: no size tag in file name")
                continue

            if image_size in skip_sizes:
                continue  # Skip these large images

            for mask_name, mask_path in masks:
                if image_size not in mask_name:
                    continue

                results = process_image(generator, image_path, mask_path, device, lpips_model)

                writer.writerow({
                    'Model': model_label,
                    'Image': image_name,
                    'Mask': mask_name,
                    'Time (s)': _format_metric(results['time'], '.4f'),
                    'Memory (MB)': _format_metric(results['memory'], '.2f'),
                    'PSNR': _format_metric(results['psnr'], '.4f'),
                    'MAE': _format_metric(results['mae'], '.4f'),
                    'MSE': _format_metric(results['mse'], '.4f'),
                    'LPIPS': _format_metric(results['lpips'], '.4f')
                })

                if device.type == 'cuda':
                    # Release cached blocks between pairs to limit fragmentation.
                    torch.cuda.empty_cache()

    print("Script completed successfully")

# Entry point: run the benchmark when this code is executed directly (in a
# notebook kernel `__name__` is "__main__", so running this cell starts it),
# but not when the module is imported.
if __name__ == "__main__":
    main()

Using device: cuda
Setting up [LPIPS] perceptual loss: trunk [alex], v[0.1], spatial [off]
Loading model from: C:\Users\tuant\anaconda3\envs\torchgpu\Lib\site-packages\lpips\weights\v0.1\alex.pth
Loading model from C:/Users/tuant/Downloads/AI Thesis/FINAL_APP_AND_ANALYSIS/optimized_models\pruned_model_6.pth
Model loaded successfully


Processing images: 100%|█████████████████████████████████████████████████████████████| 125/125 [02:16<00:00,  1.09s/it]

Script completed successfully



