In [None]:
import numpy as np
from PIL import Image
import random
from collections import deque

# Constants
RED_THRESHOLD = 200  # Threshold for red detection
WHITE_BG = (255, 255, 255)

def is_red(pixel):
    r, g, b = pixel
    return r > RED_THRESHOLD and g < 50 and b < 50

def bfs_flood_fill(image, visited, start_x, start_y, new_color):
    height, width = image.shape[:2]
    queue = deque([(start_x, start_y)])
    visited[start_x, start_y] = True
    
    directions = [(0, 1), (1, 0), (0, -1), (-1, 0)]
    
    while queue:
        x, y = queue.popleft()
        image[x, y] = new_color
        
        for dx, dy in directions:
            nx, ny = x + dx, y + dy
            if (0 <= nx < height and 0 <= ny < width and 
                not visited[nx, ny] and is_red(image[nx, ny])):
                visited[nx, ny] = True
                queue.append((nx, ny))

def process_image(image_path):
    # Load image
    img = Image.open(image_path).convert('RGB')
    img_array = np.array(img, dtype=np.uint8)
    height, width = img_array.shape[:2]
    
    # Initialize visited array and output
    visited = np.zeros((height, width), dtype=bool)
    output = img_array.copy()
    blob_count = 0
    
    # Scan for blobs
    for x in range(height):
        for y in range(width):
            if is_red(output[x, y]) and not visited[x, y]:
                # Generate random color (0-254 ensures not white or bright red)
                new_color = (random.randint(0, 254), random.randint(0, 254), random.randint(0, 254))
                bfs_flood_fill(output, visited, x, y, new_color)
                blob_count += 1
    
    return Image.fromarray(output), blob_count


from pathlib import Path
HOME = Path.cwd()
image_path = HOME / "images" / "input" / "input_blobs.png"
output_path = HOME / "images" / "output" / "colored_blobs_cpu_mvp.png"
result_img, blob_count = process_image(image_path)
print(f"Detected {blob_count} blobs")
result_img.save(output_path)

In [None]:
import numpy as np
import math
from numba import cuda

# -----------------------------------------------------------------------------
# Kernel 1: Initialize label array based on the input image.
# For each pixel:
#   - If it is white (255,255,255), mark as -1 (background)
#   - Otherwise, assign a unique label (its linear index)
# -----------------------------------------------------------------------------
@cuda.jit
def init_labels_kernel(image, labels):
    i, j = cuda.grid(2)
    height = image.shape[0]
    width = image.shape[1]
    if i < height and j < width:
        # Check if the pixel is white
        if image[i, j, 0] == 255 and image[i, j, 1] == 255 and image[i, j, 2] == 255:
            labels[i, j] = -1
        else:
            labels[i, j] = i * width + j  # Unique label for non-background pixel

# -----------------------------------------------------------------------------
# Kernel 2: Propagate labels.
# For each non-background pixel, look at its 4-connected neighbors
# (up, down, left, right) and take the minimum label.
# If a change is made, we set a flag in the global 'change' variable.
# -----------------------------------------------------------------------------
@cuda.jit
def propagate_kernel(labels, new_labels, change):
    i, j = cuda.grid(2)
    height = labels.shape[0]
    width = labels.shape[1]
    if i < height and j < width:
        current = labels[i, j]
        if current == -1:
            new_labels[i, j] = -1  # Background remains unchanged
        else:
            min_label = current
            # Check up
            if i > 0:
                nb = labels[i - 1, j]
                if nb != -1 and nb < min_label:
                    min_label = nb
            # Check down
            if i < height - 1:
                nb = labels[i + 1, j]
                if nb != -1 and nb < min_label:
                    min_label = nb
            # Check left
            if j > 0:
                nb = labels[i, j - 1]
                if nb != -1 and nb < min_label:
                    min_label = nb
            # Check right
            if j < width - 1:
                nb = labels[i, j + 1]
                if nb != -1 and nb < min_label:
                    min_label = nb
            new_labels[i, j] = min_label
            # If the label changed, mark that a change occurred.
            if min_label != current:
                change[0] = 1

# -----------------------------------------------------------------------------
# Kernel 3: Assign a unique color to each blob.
# For non-background pixels, we use a simple hash function on the label to
# generate a pseudo-random color. White background remains white.
# -----------------------------------------------------------------------------
@cuda.jit
def assign_colors_kernel(labels, output):
    i, j = cuda.grid(2)
    height = labels.shape[0]
    width = labels.shape[1]
    if i < height and j < width:
        label = labels[i, j]
        if label == -1:
            # Background: keep white
            output[i, j, 0] = 255
            output[i, j, 1] = 255
            output[i, j, 2] = 255
        else:
            # Generate a pseudo-random color from the label
            r = (label * 123457) % 256
            g = (label * 98765) % 256
            b = (label * 54321) % 256
            output[i, j, 0] = r
            output[i, j, 1] = g
            output[i, j, 2] = b

# -----------------------------------------------------------------------------
# Main function: Process the image to color blobs and count them.
#
# Expected outputs:
#  - colored_image: the input image where each blob is filled with a unique color.
#  - num_blobs: the total number of distinct blobs detected.
#
# Debug assertions check:
#  - That the input image has 3 channels.
#  - That at least one blob is detected if there are non-white pixels.
# -----------------------------------------------------------------------------
def color_blobs(image):
    # Check that the image is HxWx3
    assert image.ndim == 3 and image.shape[2] == 3, "Input image must be HxWx3"
    
    height, width = image.shape[0], image.shape[1]
    
    # Allocate device arrays for labels (int32)
    labels = cuda.device_array((height, width), dtype=np.int32)
    new_labels = cuda.device_array((height, width), dtype=np.int32)
    
    # Define block and grid dimensions using cuda.grid() indexing
    threadsperblock = (16, 16)
    blockspergrid_x = math.ceil(height / threadsperblock[0])
    blockspergrid_y = math.ceil(width / threadsperblock[1])
    blockspergrid = (blockspergrid_x, blockspergrid_y)
    
    # Step 1: Initialize labels
    init_labels_kernel[blockspergrid, threadsperblock](image, labels)
    cuda.synchronize()
    
    # Step 2: Iteratively propagate labels until no further changes occur.
    iteration = 0
    while True:
        # Reset change flag to 0 (on host) and send to device
        change_host = np.array([0], dtype=np.int32)
        change_device = cuda.to_device(change_host)
        
        propagate_kernel[blockspergrid, threadsperblock](labels, new_labels, change_device)
        cuda.synchronize()
        
        # Copy the change flag back to host to check if any label changed.
        change_host = change_device.copy_to_host()
        iteration += 1
        
        # Swap the label arrays for the next iteration.
        labels, new_labels = new_labels, labels
        
        # If no change was made, we have converged.
        if change_host[0] == 0:
            break
        # Optional safeguard: break if too many iterations occur.
        if iteration > 10000:
            print("Max iterations reached!")
            break

    print(f"Converged after {iteration} iterations")
    # Step 3: Copy final label image to host and count unique blob labels.
    labels_host = labels.copy_to_host()
    # Exclude background (-1) and count unique labels.
    blob_labels = labels_host[labels_host != -1]
    unique_labels = np.unique(blob_labels)
    num_blobs = unique_labels.size
    
    # Simple assertion: if non-white pixels exist, we expect at least one blob.
    if blob_labels.size > 0:
        assert num_blobs > 0, "There should be at least one blob"
    
    # Step 4: Use a kernel to assign a unique color to each blob.
    output_image_device = cuda.device_array((height, width, 3), dtype=np.uint8)
    assign_colors_kernel[blockspergrid, threadsperblock](labels, output_image_device)
    cuda.synchronize()
    
    # Copy the colored image back to the host.
    colored_image = output_image_device.copy_to_host()
    
    return colored_image, num_blobs

# -----------------------------------------------------------------------------
# Suggestions, Trade-Offs and Discussion:
#
# 1. This code uses an iterative label propagation (a form of BFS-like traversal) to
#    label connected components. It is simple to implement in CUDA because it avoids
#    dynamic memory (no explicit queues) but may require many iterations if blobs are
#    large.
#
# 2. An alternative approach would be to implement a true BFS using a per-blob queue.
#    However, managing dynamic queues in CUDA is challenging and can lead to less
#    efficient code.
#
# 3. Another option is to use a union-find (disjoint-set) algorithm on the GPU which
#    can be more efficient but is also more complex to implement.
#
# 4. Memory trade-offs: Storing an extra label array doubles the memory needed for
#    labels. For a 9000x9000 image, this is manageable.
#
# -----------------------------------------------------------------------------
# Example usage:
# if __name__ == '__main__':
# Create a synthetic test image of size 9000x9000 with a white background.
# test_img = np.full((9000, 9000, 3), 255, dtype=np.uint8)

# # Draw a few small blobs (for example, small black squares) on the image.
# np.random.seed(0)
# for _ in range(5000):
#     x = np.random.randint(0, 9000)
#     y = np.random.randint(0, 9000)
#     w = np.random.randint(5, 100)
#     h = np.random.randint(5, 100)
#     test_img[y:y+h, x:x+w] = [255, 0, 0]


# Run our GPU-connected component labeling.
colored_img, blob_count = color_blobs(test_img)
print("Number of blobs detected:", blob_count)

# # Debug assertion: since we drew 3 blobs, we expect at least 3 blobs.
# assert blob_count >= 3, "At least 3 blobs should be detected"




Converged after 515 iterations
Number of blobs detected: 3528


In [14]:
import timeit

test_img = np.full((9000, 9000, 3), 255, dtype=np.uint8)

# Draw a few small blobs (for example, small black squares) on the image.
np.random.seed(0)
for _ in range(5000):
    x = np.random.randint(0, 9000)
    y = np.random.randint(0, 9000)
    w = np.random.randint(5, 100)
    h = np.random.randint(5, 100)
    test_img[y:y+h, x:x+w] = [255, 0, 0]
    
# Benchmark GPU implementation
def run_gpu_implementation():
    return color_blobs(test_img)

gpu_time = timeit.timeit('run_gpu_implementation()', globals=globals(), number=5) / 5
print(f"GPU average time: {gpu_time:.6f} seconds")



Converged after 515 iterations
Converged after 515 iterations
Converged after 515 iterations
Converged after 515 iterations
Converged after 515 iterations
GPU average time: 14.619436 seconds


In [None]:
import time
import timeit
import numpy as np
from PIL import Image

# Load a real test image
image_path = HOME / "images" / "input" / "input_blobs.png"
img = Image.open(image_path).convert('RGB')
test_img_real = np.array(img, dtype=np.uint8)



# Benchmark CPU implementation
def run_cpu_implementation():
    return process_image(image_path)

# First run once to compile the GPU code (warm-up)
print("Warming up GPU implementation...")
colored_gpu, blob_count_gpu = run_gpu_implementation()
print(f"GPU detected {blob_count_gpu} blobs")

# Now benchmark GPU implementation
gpu_time = timeit.timeit('run_gpu_implementation()', globals=globals(), number=5) / 5
print(f"GPU average time: {gpu_time:.6f} seconds")

# Benchmark CPU implementation
cpu_time = timeit.timeit('run_cpu_implementation()', globals=globals(), number=5) / 5
print(f"CPU average time: {cpu_time:.6f} seconds")

# Calculate speedup
speedup = cpu_time / gpu_time
print(f"GPU speedup: {speedup:.2f}x faster than CPU")

In [12]:
from PIL import Image
from pathlib import Path
HOME = Path.cwd()
to_save_colored_img = Image.fromarray(colored_img)
output_path = HOME / "images" / "output" / "colored_blobs_gpu_mvp.png"
to_save_colored_img.save(output_path)
print("Output saved to:", output_path)

Output saved to: /home/lrn/Repos/flood-fill-cuda/images/output/colored_blobs_gpu_mvp.png
