Standardizing the Mosaic Image

In [63]:
import os
import math
import numpy as np
from PIL import Image, ImageOps, ImageFile, ImageDraw
from roboflow import Roboflow

# Allow PIL to open very large images.
# NOTE(review): per the Pillow docs, MAX_IMAGE_PIXELS = None switches off the
# decompression-bomb size check entirely -- only open trusted files. Confirm
# this is acceptable wherever the notebook is run.
Image.MAX_IMAGE_PIXELS = None
# NOTE(review): presumably makes PIL load files whose data is cut short instead
# of raising, so a partially written image would load silently -- confirm
# that this is intended.
ImageFile.LOAD_TRUNCATED_IMAGES = True


# Standardize the Background
def standardize_background(image, bg_color=(255, 255, 255)):
    """
    Return an RGB version of `image` with transparency flattened onto `bg_color`.

    Parameters
    ----------
    image : PIL.Image.Image
        Source image.
    bg_color : tuple of int, optional
        RGB colour placed behind transparent pixels (white by default).

    Returns
    -------
    PIL.Image.Image
        A new "RGB" image; `image` itself is not modified.
    """
    # Palette, greyscale and RGB files can carry transparency as metadata
    # (e.g. a PNG tRNS chunk) instead of an alpha band. A plain convert("RGB")
    # would silently drop it, so promote it to a real alpha channel first.
    # `is not None` matters: palette index 0 is a valid transparent entry.
    if image.mode in ("P", "L", "RGB") and image.info.get("transparency") is not None:
        image = image.convert("RGBA")

    if image.mode in ("RGBA", "LA"):
        background = Image.new("RGB", image.size, bg_color)
        # The last band is the alpha channel; use it as the paste mask.
        background.paste(image, mask=image.split()[-1])
        return background

    return image.convert("RGB")



In [64]:
# Crop to the Mosaic Content
def crop_to_content(image, threshold=240):
    """
    Crop away the near-white margin around the image content.

    A pixel counts as content when at least one of its (first three) channels
    is below `threshold`; pixels with every channel at or above `threshold`
    are treated as background.

    Parameters
    ----------
    image : PIL.Image.Image
        Image to crop. Multi-channel and single-channel (2-D) data are both
        accepted.
    threshold : int, optional
        Channel value below which a pixel is considered content.

    Returns
    -------
    PIL.Image.Image
        The crop spanning all content pixels, or `image` itself when no
        content pixel is found.
    """
    img_np = np.asarray(image)
    if img_np.ndim == 3:
        # Only the first three channels take part, so an alpha band is ignored.
        mask = (img_np[:, :, :3] < threshold).any(axis=-1)
    else:
        mask = img_np < threshold

    # Project the mask onto each axis rather than listing every content pixel:
    # np.argwhere would allocate one (row, col) pair per pixel, which is
    # prohibitive for very large mosaics.
    rows = np.flatnonzero(mask.any(axis=1))
    cols = np.flatnonzero(mask.any(axis=0))
    if rows.size == 0:
        return image  # No content detected; return the original image

    # +1 so the last content row/column is included in the crop box.
    y0, y1 = int(rows[0]), int(rows[-1]) + 1
    x0, x1 = int(cols[0]), int(cols[-1]) + 1
    return image.crop((x0, y0, x1, y1))



In [65]:
# Split the Image into Patches
def split_into_patches(image, patch_size=640):
    """
    Split the image into patches laid out on a regular grid.

    Patches are produced row by row, left to right. Patches on the right or
    bottom edge are not padded: they keep whatever size is left over.

    Parameters
    ----------
    image : PIL.Image.Image
        Image to split.
    patch_size : int, optional
        Side length of a full patch; must be positive.

    Returns
    -------
    list of (patch, box)
        `patch` is the cropped image and `box` is its (x1, y1, x2, y2)
        location in `image` (right/bottom edges exclusive, as passed to
        `image.crop`).

    Raises
    ------
    ValueError
        If `patch_size` is not positive.
    """
    if patch_size <= 0:
        raise ValueError(f"patch_size must be a positive integer, got {patch_size!r}")

    width, height = image.size
    patches = []
    for top in range(0, height, patch_size):
        # Clamp so the box never extends past the image.
        bottom = min(top + patch_size, height)
        for left in range(0, width, patch_size):
            box = (left, top, min(left + patch_size, width), bottom)
            patches.append((image.crop(box), box))
    return patches



In [66]:
# Filter Out Mostly-White Patches
def is_patch_valid(patch, bg_color=(255, 255, 255), tolerance=5, max_fraction=0.5):
    """
    Decide whether a patch holds enough non-background content.

    A pixel is "background" when every channel lies within `tolerance` of the
    matching channel of `bg_color`.

    Parameters
    ----------
    patch : PIL.Image.Image or array-like
        Patch whose last axis holds one value per channel of `bg_color`.
    bg_color : tuple of int, optional
        Background colour to compare against.
    tolerance : int, optional
        Maximum per-channel distance from `bg_color` (inclusive).
    max_fraction : float, optional
        The patch is valid only if the background fraction is strictly
        below this value.

    Returns
    -------
    (is_valid, fraction_bg) : (bool, float)
        `fraction_bg` is the share of background pixels in the patch and
        `is_valid` is `fraction_bg < max_fraction`.
    """
    patch_np = np.asarray(patch)
    bg = np.asarray(bg_color, dtype=np.int64)
    # |pixel - bg| <= tolerance written as a pair of bounds checks, which gives
    # the same mask without first widening the whole patch to a 64-bit
    # difference array.
    within = (patch_np >= bg - tolerance) & (patch_np <= bg + tolerance)
    close_to_bg = np.all(within, axis=-1)
    fraction_bg = float(np.mean(close_to_bg))
    return fraction_bg < max_fraction, fraction_bg

In [67]:
# Draw Bounding Boxes on Image
def draw_patch_boxes(image, patch_boxes, color="red", width=2):
    """
    Outline every patch box on a copy of the image.

    image: PIL Image to annotate (a copy is drawn on, not the image itself)
    patch_boxes: iterable of (x1, y1, x2, y2) rectangles
    color: outline color of the rectangles
    width: thickness of the rectangle outline
    Returns the annotated copy.
    """
    annotated = image.copy()
    pen = ImageDraw.Draw(annotated)

    for left, top, right, bottom in patch_boxes:
        pen.rectangle([left, top, right, bottom], outline=color, width=width)

    return annotated

In [68]:
def visualize_patches_downscaled(cropped_mosaic, valid_boxes, output_dir, max_dim=65500):
    """
    Save a JPEG of the mosaic with the given patch boxes outlined.

    When either side of `cropped_mosaic` exceeds `max_dim`, the mosaic and
    the boxes are shrunk by a common factor so the larger side fits, drawn
    with thick yellow outlines, and written to
    "cropped_mosaic_with_patches_downscaled.jpg". Otherwise the boxes are
    drawn at full resolution in red and written to
    "cropped_mosaic_with_patches.jpg".

    Parameters
    ----------
    cropped_mosaic : PIL.Image.Image
        Mosaic to annotate.
    valid_boxes : iterable of (x1, y1, x2, y2)
        Boxes in `cropped_mosaic` coordinates.
    output_dir : str
        Directory for the output file; created if missing.
    max_dim : int, optional
        Largest width/height allowed for the saved image.
    """
    w, h = cropped_mosaic.size

    if w > max_dim or h > max_dim:
        # One factor for both axes keeps the aspect ratio.
        scale_factor = min(max_dim / w, max_dim / h)
        # int() truncates, so the short side of an extremely elongated mosaic
        # could collapse to 0 px, which resize() rejects; keep at least 1 px.
        new_size = (max(1, int(w * scale_factor)), max(1, int(h * scale_factor)))
        mosaic_small = cropped_mosaic.resize(new_size, resample=Image.Resampling.BICUBIC)

        # Boxes must shrink by the same factor to stay aligned with the image.
        scaled_boxes = [
            (int(x1 * scale_factor), int(y1 * scale_factor),
             int(x2 * scale_factor), int(y2 * scale_factor))
            for (x1, y1, x2, y2) in valid_boxes
        ]

        mosaic_with_boxes = draw_patch_boxes(mosaic_small, scaled_boxes, color="yellow", width=10)
        filename = "cropped_mosaic_with_patches_downscaled.jpg"
        label = "downscaled"
    else:
        # Already within limits: draw directly at full resolution.
        mosaic_with_boxes = draw_patch_boxes(cropped_mosaic, valid_boxes, color="red", width=2)
        filename = "cropped_mosaic_with_patches.jpg"
        label = "full-resolution"

    os.makedirs(output_dir, exist_ok=True)
    mosaic_with_boxes.save(os.path.join(output_dir, filename))
    print(f"Saved {label} visualization to {filename}")

In [69]:
# Main Processing Pipeline
def process_mosaic(image_path, patch_size=2000, output_dir="./patches",
                   tolerance=5, max_fraction=0.5):
    """
    Turn a mosaic image into a set of content-bearing patches saved as JPEGs.

    Steps: load the image, flatten it onto a white background, crop the
    white margin, cut the result into a grid of patches, keep the patches
    that `is_patch_valid` accepts, save an overview image with the kept
    boxes outlined, then save each kept patch.

    Parameters
    ----------
    image_path : str
        Path of the mosaic image to process.
    patch_size : int, optional
        Side length of a full patch.
    output_dir : str, optional
        Directory receiving the overview image and the patches; created if
        missing.
    tolerance : int, optional
        Passed to `is_patch_valid`: per-channel distance from white within
        which a pixel counts as background.
    max_fraction : float, optional
        Passed to `is_patch_valid`: a patch is kept only if its background
        fraction is below this value.

    Returns
    -------
    list of (patch, box)
        The kept patches with their (x1, y1, x2, y2) boxes in the cropped
        mosaic.
    """
    # Standardizing yields a new RGB image, so the source file handle can be
    # released as soon as that is done.
    with Image.open(image_path) as source:
        mosaic = standardize_background(source)

    # Crop the mosaic to only include the actual content (non-white areas)
    cropped_mosaic = crop_to_content(mosaic)

    # Split into patches using a grid
    all_patches = split_into_patches(cropped_mosaic, patch_size)
    total_patches = len(all_patches)

    # Keep only patches with enough non-background content
    valid_patches = [
        (patch, box)
        for patch, box in all_patches
        if is_patch_valid(patch, tolerance=tolerance, max_fraction=max_fraction)[0]
    ]
    valid_boxes = [box for _, box in valid_patches]
    # Drop the references to rejected patches before the memory-hungry
    # visualization step.
    del all_patches

    # Create the output directory here rather than relying on the
    # visualization helper to have done it.
    os.makedirs(output_dir, exist_ok=True)

    # Visualize the kept patch boxes on the cropped mosaic
    visualize_patches_downscaled(cropped_mosaic, valid_boxes, output_dir)

    # Save valid patches; the box coordinates are encoded in the file name
    for idx, (patch, box) in enumerate(valid_patches):
        patch_filename = os.path.join(output_dir, f"patch_{idx}_box_{box[0]}_{box[1]}_{box[2]}_{box[3]}.jpg")
        patch.save(patch_filename)

    print(f"Total patches generated: {total_patches}")
    print(f"Valid patches (with content): {len(valid_patches)}")
    return valid_patches

# Example usage:
if __name__ == "__main__":
    # Alternative input: "./DC Mosaic 2.18.23.tif"
    image_path = "./AL Mosaic 2.16.23.png"
    valid_patches = process_mosaic(
        image_path,
        patch_size=2500,
        output_dir="./patches",
    )

Saved downscaled visualization to cropped_mosaic_with_patches_downscaled.jpg
Total patches generated: 243
Valid patches (with content): 112


In [70]:
# # Get the dimensions of an image
# def get_image_dimensions(image_path):
#     with Image.open(image_path) as img:
#         return img.size  # Returns (width, height)
# # Example usage:
# if __name__ == "__main__":
#     image_path = "./DJI_0110.JPG"  # Change this to your image path
#     dimensions = get_image_dimensions(image_path)
#     print(f"Image dimensions: {dimensions}")

In [71]:
# patch_np = np.array(valid_patches[42][0])
# # For debugging, check min/max across each channel:
# print("Min pixel values:", patch_np.min(axis=(0,1)))
# print("Max pixel values:", patch_np.max(axis=(0,1)))

Inference On 1 Image:

In [72]:
# import os
# from dotenv import load_dotenv

# load_dotenv()
# # Run Inference on Each Patch with Roboflow
# # Initialize Roboflow and your model. Replace with your API key, workspace, project, and version.
# api_key = os.getenv("ROBOFLOW_API_KEY")
# rf = Roboflow(api_key)
# project = rf.workspace().project("elephant-seals-project-mark-1")
# model = project.version("6").model

# total_clumps = 0
# total_seals = 0

# # for i, patch in enumerate(patches):
# for i in range(0, 1):
#     # Save the patch temporarily or use an in-memory file if supported by Roboflow.
#     # patch_path = f"patches/temp_patch_{i}.jpg"
#     patch_path = f"./temp_patch_115.jpg"
#     # patch.save(patch_path)
    
#     # Run inference. Adjust confidence and overlap parameters as needed.
#     result = model.predict(patch_path, confidence=0, overlap=0)

#     result.plot()
    
#     # Assuming the response JSON has a "predictions" key with a list of detections.
#     predictions = result.json().get("predictions", [])

    
#     # Count predictions by class.
#     clump_count = sum(1 for pred in predictions if pred["class"] == "clump")
#     seal_count = sum(1 for pred in predictions if pred["class"] == "seals")
    
#     total_clumps += clump_count
#     total_seals += seal_count

# # --- 4. Output the Results ---
# print("Total clumps detected:", total_clumps)
# print("Total seals detected:", total_seals)