# Import Libs

In [None]:
import os
import requests
import zipfile
import cv2
import numpy as np
import random
from PIL import Image
from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor, as_completed

# Setting up `DIRS` & `PATHS`

In [None]:
DATASET_DIR = "dataset"  # Root directory for the downloaded COCO archive and all generated outputs
# Archive that gets downloaded.
# NOTE(review): this URL points at the val2017 split (not "train"), while COCO_DIR
# is set to a `test2017` folder — confirm which split is actually intended.
COCO_URL = "http://images.cocodataset.org/zips/val2017.zip"
 
# Exclusive upper bound of the slice of the sorted image list that gets inpainted
NUM_IMAGES_TO_PROCESS = 5000

In [None]:
COCO_ZIP = os.path.join(DATASET_DIR, "coco_val2017.zip")  # Local path where the downloaded archive is saved
# Folder that is expected to exist after extraction and that images are read from.
# NOTE(review): named `test2017` although the URL and zip file name say val2017 —
# confirm that the archive really unpacks into this folder.
COCO_DIR = os.path.join(DATASET_DIR, "test2017")
MASK_DIR = os.path.join(DATASET_DIR, "masks")  # Output folder for the generated masks
INPAINTED_DIR = os.path.join(DATASET_DIR, "inpainted")  # Output folder for the inpainted images

In [None]:
# Make sure the dataset root and both output folders exist (no-op if already present).
for required_dir in (DATASET_DIR, MASK_DIR, INPAINTED_DIR):
    os.makedirs(required_dir, exist_ok=True)

# Download and Extract the COCO Dataset

In [None]:
# Download the archive once; later runs reuse the file already on disk.
if not os.path.exists(COCO_ZIP):
    print("Downloading COCO train dataset...")
    # Stream into a temporary ".part" file and rename it only after the transfer
    # has finished, so an interrupted download is never mistaken for a complete
    # archive by the existence check above on the next run.
    partial_zip = COCO_ZIP + ".part"
    with requests.get(COCO_URL, stream=True, timeout=60) as response:
        # Fail loudly on HTTP errors instead of saving an error page as the zip.
        response.raise_for_status()
        with open(partial_zip, "wb") as file:
            # 1 MiB chunks: far fewer Python-level writes than 1 KiB chunks.
            for chunk in response.iter_content(chunk_size=1024 * 1024):
                file.write(chunk)
    os.replace(partial_zip, COCO_ZIP)

# Unpack the archive into DATASET_DIR unless the image folder is already there.
if not os.path.exists(COCO_DIR):
    print("Extracting COCO Train Dataset")
    with zipfile.ZipFile(COCO_ZIP, "r") as zip_ref:
        zip_ref.extractall(DATASET_DIR)

Downloading COCO train dataset...
Extracting COCO Train Dataset


In [None]:
# Sanity check: number of entries in the extracted image folder.
sum(1 for _ in os.scandir(COCO_DIR))

40670

# Dynamic Mask Generation Functions

In [None]:
def generate_dynamic_mask(image_path, mask_path):
    """
    Generates a dynamic binary mask using random geometric shapes.

    One shape is picked at random from rectangle, circle, ellipse, bacteria-like
    polygon, or freeform convex blob and drawn in white (255) on a black mask
    with the same height and width as the input image. With a 30% chance a
    second random shape is added. The mask is then dilated and written to
    `mask_path`.

    Images that are too small for the usual placement margins no longer raise
    `ValueError` from `random.randint`; the shape is centred / clamped instead.

    Args:
      image_path (str): Path to the input image.
      mask_path (str): Path to save the generated mask.

    Raises:
      ValueError: If the input image cannot be read.
      OSError: If the mask cannot be written to `mask_path`.
    """

    image = cv2.imread(image_path)
    if image is None:
        # cv2.imread reports failure by returning None instead of raising.
        raise ValueError(f"Could not read image: {image_path}")
    h, w = image.shape[:2]
    mask = np.zeros((h, w), dtype=np.uint8)

    # List of shape types
    shape_types = ["rectangle", "circle", "ellipse", "bacteria", "freeform"]

    def random_center(margin, extent):
        """Pick a coordinate at least `margin` px from both borders, or the middle if that is impossible."""
        if extent - margin < margin:
            return extent // 2
        return random.randint(margin, extent - margin)

    def draw_shape(shape, mask):
        """Draw one filled shape of the given type onto `mask` and return it."""
        if shape == "rectangle":
            # Keep the top-left corner inside the image even for very small inputs.
            x = min(random.randint(50, max(50, w - 150)), w - 1)
            y = min(random.randint(50, max(50, h - 150)), h - 1)
            # The upper bound can drop below 50 on small images; clamp it so the
            # range stays valid (OpenCV clips whatever falls outside the image).
            largest_side = min(150, w - x - 50, h - y - 50)
            mask_size = random.randint(50, max(50, largest_side))
            cv2.rectangle(mask, (x, y), (x + mask_size, y + mask_size), 255, -1)

        elif shape == "circle":
            center_x = random_center(50, w)
            center_y = random_center(50, h)
            largest_radius = min(center_x, center_y, w - center_x, h - center_y, 75)
            radius = random.randint(25, max(25, largest_radius))
            cv2.circle(mask, (center_x, center_y), radius, 255, -1)

        elif shape == "ellipse":
            center_x = random_center(50, w)
            center_y = random_center(50, h)
            axis_length1 = random.randint(25, 75)
            axis_length2 = random.randint(25, 75)
            angle = random.randint(0, 360)
            cv2.ellipse(mask, (center_x, center_y), (axis_length1, axis_length2), angle, 0, 360, 255, -1)

        elif shape == "bacteria":
            # Bacteria-like shape: irregular polygon simulating a bacterial outline.
            center_x = random_center(100, w)
            center_y = random_center(100, h)
            base_radius = random.randint(30, 60)
            num_points = random.randint(8, 20)
            # Evenly spaced angles with a per-vertex radius jitter of +/-30%.
            angles = np.linspace(0, 2 * np.pi, num_points, endpoint=False)
            points = []

            for angle in angles:
                noise = random.uniform(0.7, 1.3)
                r = base_radius * noise
                x_val = int(center_x + r * np.cos(angle))
                y_val = int(center_y + r * np.sin(angle))
                points.append([x_val, y_val])
            points = np.array(points, dtype=np.int32)
            cv2.fillPoly(mask, [points], 255)

        elif shape == "freeform":
            # Freeform shape: create an irregular blob around a central point,
            # ensuring the shape remains constrained within a limited area.
            margin = 100  # Keep the center away from the image boundaries when possible.
            cx = random_center(margin, w)
            cy = random_center(margin, h)
            max_radius = random.randint(50, 100)  # Maximum extent of the freeform shape.
            num_points = random.randint(5, 15)
            points = []

            for _ in range(num_points):
                angle = random.random() * 2 * np.pi
                # The radius is randomly chosen so that the shape remains within a certain bound.
                r = random.uniform(10, max_radius)
                x_val = int(cx + r * np.cos(angle))
                y_val = int(cy + r * np.sin(angle))
                points.append([x_val, y_val])
            points = np.array(points, dtype=np.int32)

            # Use the convex hull to obtain an enclosed shape from the scattered points.
            hull = cv2.convexHull(points)
            cv2.fillPoly(mask, [hull], 255)
        return mask

    # Draw the first shape.
    first_shape = random.choice(shape_types)
    mask = draw_shape(first_shape, mask)

    # With a 30% chance, add a second shape on the same mask.
    if random.random() < 0.3:
        second_shape = random.choice(shape_types)
        mask = draw_shape(second_shape, mask)

    # Dilate the mask to expand the masked region: a 15x15 kernel grows the
    # shape by 7 px per side per iteration, i.e. about 35 px after 5 iterations.
    kernel = np.ones((15, 15), dtype=np.uint8)
    mask = cv2.dilate(mask, kernel, iterations=5)

    # cv2.imwrite returns False when the file could not be written.
    if not cv2.imwrite(mask_path, mask):
        raise OSError(f"Could not write mask: {mask_path}")


# Accelerated Mask Generation (Threading)

In [None]:
def generate_masks_concurrently(image_paths, mask_paths, num_workers=8):
    """
    Generates masks concurrently using a ThreadPoolExecutor.

    Each (image, mask) pair is handed to `generate_dynamic_mask` on a worker
    thread. A failure for one image is reported on stdout together with the
    image path and does not stop the remaining tasks.

    Args:
        image_paths (List[str]): List of paths to the input images.
        mask_paths (List[str]): List of paths to save the generated masks.
            Paired with `image_paths` by position; surplus entries in the
            longer list are ignored.
        num_workers (int, optional): The number of worker threads to use. Defaults to 8.
    """

    from concurrent.futures import ThreadPoolExecutor, as_completed

    with ThreadPoolExecutor(max_workers=num_workers) as executor:
        # Remember which image every future belongs to so that a failure can be
        # attributed to a concrete file.
        future_to_image = {
            executor.submit(generate_dynamic_mask, img_path, mask_path): img_path
            for img_path, mask_path in zip(image_paths, mask_paths)
        }
        for future in as_completed(future_to_image):
            try:
                future.result()
            except Exception as e:
                # Best effort: report the failure and keep going with the rest.
                print(f"Error: {e} (image: {future_to_image[future]})")

# Batch Inpainting Function with a Static Prompt

In [None]:
def inpaint_image_batch(original_paths, mask_paths, output_paths):
    """
    Batch inpainting using Stable Diffusion with a static text prompt.

    Loads every original as RGB and every mask as 8-bit grayscale, runs them
    through the module-level `pipe` in a single call, and saves each result to
    the matching entry of `output_paths`. An empty batch is a no-op.

    NOTE(review): no height/width is passed to the pipeline, so the output
    resolution is whatever the pipeline defaults to and may differ from the
    original image and mask sizes — confirm this is intended.

    Args:
      original_paths (List[str]): List of paths to original images.
      mask_paths (List[str]): List of paths to corresponding mask images.
      output_paths (List[str]): List of paths to save the inpainted images.
    """

    def load_converted(path, mode):
        # convert() returns an independent in-memory image, so the underlying
        # file handle can be closed right away instead of lingering until GC.
        with Image.open(path) as img:
            return img.convert(mode)

    originals = [load_converted(p, "RGB") for p in original_paths]
    masks = [load_converted(p, "L") for p in mask_paths]

    if not originals:
        # Nothing to inpaint; avoid calling the pipeline with empty lists.
        return

    # Static prompt used for every image.
    prompt = "Fill the missing area with a natural, serene background"

    # Create a list of the same prompt to match the number of images.
    prompts = [prompt] * len(originals)

    # Perform batch inpainting.
    result = pipe(prompt=prompts, image=originals, mask_image=masks).images

    # Save each result to its corresponding output path.
    for img, out_path in zip(result, output_paths):
        img.save(out_path)


In [None]:
from diffusers import StableDiffusionInpaintPipeline
import torch

# Build the inpainting pipeline from the pretrained
# "stabilityai/stable-diffusion-2-inpainting" checkpoint, loading weights as float16.
pipe = StableDiffusionInpaintPipeline.from_pretrained(
    "stabilityai/stable-diffusion-2-inpainting",
    torch_dtype=torch.float16,
)

In [None]:
# Move the pipeline to the CUDA device (a GPU is required for this cell).
pipe = pipe.to("cuda")
# Memory-saving attention options.
# NOTE(review): enabling attention slicing after xFormers may replace the
# xFormers attention processor rather than combine with it — confirm against
# the diffusers documentation and keep only the option that is wanted.
pipe.enable_xformers_memory_efficient_attention()
pipe.enable_attention_slicing()

In [None]:
# 0-based position in the sorted image list at which processing starts; the
# first 3144 images are skipped (presumably already handled by an earlier run).
start_index = 3144

# Get the list of image filenames from the COCO folder in a stable order.
# Only .jpg files are kept: for any other entry the derived mask / output name
# could not be told apart from the source file and might overwrite it.
coco_images = sorted(
    name for name in os.listdir(COCO_DIR) if name.lower().endswith(".jpg")
)
# Process the slice [start_index, NUM_IMAGES_TO_PROCESS) of that list.
coco_images = coco_images[start_index:min(len(coco_images), NUM_IMAGES_TO_PROCESS)]

BATCH_SIZE = 8  # Adjust based on your GPU capabilities

for i in tqdm(range(0, len(coco_images), BATCH_SIZE), desc="Processing Batches"):
    batch_imgs = coco_images[i : i + BATCH_SIZE]

    # Derive output names from the filename stem instead of a substring
    # replace, so only the real extension is swapped.
    stems = [os.path.splitext(img)[0] for img in batch_imgs]

    img_paths = [os.path.join(COCO_DIR, img) for img in batch_imgs]
    mask_paths = [os.path.join(MASK_DIR, stem + "_mask.png") for stem in stems]
    inpainted_paths = [os.path.join(INPAINTED_DIR, stem + "_inpainted.jpg") for stem in stems]

    # Generate dynamic masks for each image in the batch
    for img_path, mask_path in zip(img_paths, mask_paths):
        generate_dynamic_mask(img_path, mask_path)

    # Inpaint the batch (the same static prompt is used for every image)
    inpaint_image_batch(img_paths, mask_paths, inpainted_paths)

print("Dynamic mask generation and inpainting complete.")


# Appending Dataset

In [None]:
import os
import zipfile

In [None]:
def extract_and_append_zip(zip_file1, zip_file2, output_zip):
    """
    Merge the files of two zip archives into a newly written archive.

    The first archive is unpacked into a private temporary directory that is
    removed again afterwards, so leftovers from earlier runs can never end up
    in the result. Its files are added to `output_zip` under their paths
    relative to the archive root, followed by every non-directory member of
    the second archive under its original name.

    Directory entries are not copied. If both archives contain a member with
    the same name, the output ends up with two entries of that name.

    Args:
        zip_file1 (str): Path to the first input archive.
        zip_file2 (str): Path to the second input archive.
        output_zip (str): Path of the archive to create (overwritten if present).
    """
    import tempfile

    # A fresh temporary directory replaces the former fixed "temp_extract"
    # folder in the working directory, which was never cleaned up.
    with tempfile.TemporaryDirectory() as temp_dir:
        # Extract the first zip file to the temporary directory
        with zipfile.ZipFile(zip_file1, "r") as zip_ref:
            zip_ref.extractall(temp_dir)

        # Create the output zip file
        with zipfile.ZipFile(output_zip, "w") as output_zip_ref:
            # Add the contents of the temporary directory to the output zip file
            for root, _, files in os.walk(temp_dir):
                for file in files:
                    file_path = os.path.join(root, file)
                    output_zip_ref.write(file_path, os.path.relpath(file_path, temp_dir))

            # Add the contents of the second zip file to the output zip file
            with zipfile.ZipFile(zip_file2, "r") as zip_ref2:
                for file in zip_ref2.namelist():
                    if not file.endswith("/"):  # Exclude directories
                        output_zip_ref.writestr(file, zip_ref2.read(file))


In [None]:
# Merge the two mask archives into a single `masks_8k.zip` in the working directory.
extract_and_append_zip(
    zip_file1="/content/combined_masks.zip",
    zip_file2="/content/dataset_masks.zip",
    output_zip="masks_8k.zip",
)