In [1]:
import cv2
import numpy as np
import glob
import os
import logging
import json
from tqdm import tqdm
import concurrent.futures
import asyncio
from skimage.measure import label, regionprops
from skimage.segmentation import active_contour
from skimage.feature import local_binary_pattern

# Logging: INFO and above is written to a log file that is truncated on every run.
logging.basicConfig(
    filename="watermark_removal.log",
    filemode="w",
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s: %(message)s",
)

# Paths (all relative to the directory the notebook is started from).
ROOT_DIR = os.getcwd()
DATA_DIR = os.path.join(ROOT_DIR, "DATA", "3.IMAGES")
GROUPS_JSON = os.path.join(DATA_DIR, "image_groups.json")

# Batching: number of images handled per batch, derived from an assumed memory budget.
MAX_GPU_VRAM_MB = 4000  # assumed budget of 4 GB
IMAGE_SIZE_MB = 5  # assumed average in-memory size of one decoded image; tune per dataset
BATCH_SIZE = MAX_GPU_VRAM_MB // IMAGE_SIZE_MB  # 800 with the values above

# Watermark colour and tolerance. NOTE(review): not referenced by the code in this
# notebook; channel order presumably BGR (cv2.imread order) — confirm before use.
WATERMARK_COLOR = np.array([202, 201, 196])
TOLERANCE = 10

In [2]:
class IOHelper:
    """Helper class for IO operations.

    All paths are derived from the module-level ``DATA_DIR`` and ``GROUPS_JSON`` constants.
    """

    @staticmethod
    async def async_load_image(image_path):
        """Read ``image_path`` with ``cv2.imread`` in the default executor.

        Returns the decoded image, or ``None`` when OpenCV cannot read the file.
        """
        # Inside a coroutine a loop is always running; get_running_loop is the
        # non-deprecated way to obtain it.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, cv2.imread, image_path)

    @staticmethod
    async def async_save_image(img, save_path):
        """Write ``img`` to ``save_path`` with ``cv2.imwrite`` in the default executor.

        Returns the success flag reported by ``cv2.imwrite``.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, cv2.imwrite, save_path, img)

    @staticmethod
    def _read_dimensions(image_path):
        """Return ``(height, width)`` of the image at ``image_path``, or ``None`` if unreadable."""
        img = cv2.imread(image_path)
        return None if img is None else img.shape[:2]

    @staticmethod
    def _group_by_dimensions(image_paths, dimensions):
        """Group ``image_paths`` by the matching entry of ``dimensions``.

        Args:
            image_paths: sequence of file paths.
            dimensions: sequence of the same length holding ``(height, width)`` tuples,
                or ``None`` for images that could not be read (skipped with a warning).

        Returns:
            dict mapping ``str((height, width))`` to the list of paths of that size,
            in input order.
        """
        groups = {}
        for path, dims in zip(image_paths, dimensions):
            if dims is None:
                logging.warning(f"Could not read image, skipping: {path}")
                continue
            groups.setdefault(str(tuple(dims)), []).append(path)
        return groups

    @staticmethod
    def _is_valid_grouping(groups):
        """Return True if ``groups`` maps keys to lists of path strings.

        Caches written by earlier versions stored a single path string per key; those
        lose all but one image per size and are rejected so they get rebuilt.
        """
        return isinstance(groups, dict) and all(
            isinstance(paths, list) and all(isinstance(p, str) for p in paths)
            for paths in groups.values()
        )

    @staticmethod
    def load_images_by_dimensions():
        """Group the ``*.jpg`` files in ``DATA_DIR`` by their (height, width).

        The grouping is cached in ``GROUPS_JSON`` and reused when that file holds a
        valid grouping; otherwise it is (re)built and written back.

        Returns:
            dict mapping ``str((height, width))`` (e.g. ``"(1080, 1920)"``) to a list of
            image paths with that size. Unreadable images are left out.
        """
        if os.path.exists(GROUPS_JSON):
            with open(GROUPS_JSON, "r") as f:
                cached = json.load(f)
            if IOHelper._is_valid_grouping(cached):
                return cached
            logging.warning(f"{GROUPS_JSON} has an outdated format; rebuilding it")

        # Sorted for a deterministic processing order (glob order is arbitrary).
        images = sorted(glob.glob(os.path.join(DATA_DIR, "*.jpg")))
        logging.info(f"Found {len(images)} images in {DATA_DIR}")

        # Only the shape is needed: decode in worker threads and keep just
        # (height, width) instead of holding every decoded image in memory.
        # Plain threads also avoid driving an asyncio loop, which fails inside an
        # already-running (Jupyter) event loop unless it is patched.
        with concurrent.futures.ThreadPoolExecutor() as executor:
            dimensions = list(executor.map(IOHelper._read_dimensions, images))

        images_by_dimensions = IOHelper._group_by_dimensions(images, dimensions)

        with open(GROUPS_JSON, "w") as f:
            json.dump(images_by_dimensions, f)

        logging.info(
            f"Grouped images by dimensions: {len(images_by_dimensions)} groups found"
        )
        return images_by_dimensions

    @staticmethod
    def save_image(img, dimensions, idx):
        """Write a processed image to ``DATA_DIR/0.DONE``.

        The file is named ``processed_{dimensions[0]}x{dimensions[1]}_{idx}.jpg``.
        A failed write is logged as an error instead of being reported as saved.
        """
        done_dir = os.path.join(DATA_DIR, "0.DONE")
        os.makedirs(done_dir, exist_ok=True)
        save_path = os.path.join(
            done_dir, f"processed_{dimensions[0]}x{dimensions[1]}_{idx}.jpg"
        )
        # A single synchronous write: asyncio.run() cannot be called while another
        # event loop is running in the same thread (the Jupyter case).
        if cv2.imwrite(save_path, img):
            logging.info(f"Saved image at: {save_path}")
        else:
            logging.error(f"Failed to save image at: {save_path}")

    @staticmethod
    def _mask_path(dimensions):
        """Path of the cached watermark mask for the group key ``dimensions``."""
        return os.path.join(DATA_DIR, "1.MARK", f"watermark_{dimensions}.jpg")

    @staticmethod
    def save_mask(mask, dimensions):
        """Write the watermark mask for group ``dimensions`` to ``DATA_DIR/1.MARK``."""
        mark_path = IOHelper._mask_path(dimensions)
        os.makedirs(os.path.dirname(mark_path), exist_ok=True)
        if cv2.imwrite(mark_path, mask):
            logging.info(f"Watermark mask for dimensions {dimensions} created and saved!")
        else:
            logging.error(f"Failed to save watermark mask at: {mark_path}")

    @staticmethod
    def load_mask(dimensions):
        """Load the cached watermark mask for group ``dimensions``.

        Returns:
            single-channel uint8 mask with values exactly 0/255, or ``None`` when no
            readable mask file exists.
        """
        mark_path = IOHelper._mask_path(dimensions)
        if not os.path.exists(mark_path):
            return None
        mask = cv2.imread(mark_path, cv2.IMREAD_GRAYSCALE)
        if mask is None:
            logging.warning(f"Could not read watermark mask: {mark_path}")
            return None
        # Masks are stored as JPEG (lossy): re-binarize so compression artifacts such
        # as 252 or 3 become exactly 255 / 0 again.
        return cv2.threshold(mask, 127, 255, cv2.THRESH_BINARY)[1]

In [3]:
class WatermarkDetector:
    """Class responsible for detecting and generating watermark masks.

    Every method is static; ``generate_watermark_mask`` is the entry point.
    """

    @staticmethod
    def _prepare_image(img):
        """Convert a BGR image to grayscale and smooth it with a 5x5 Gaussian blur."""
        return cv2.GaussianBlur(cv2.cvtColor(img, cv2.COLOR_BGR2GRAY), (5, 5), 0)

    @staticmethod
    def _mask_operations(mask):
        """Close, open and take the morphological gradient of ``mask``, then binarize it.

        The binarization sets 255 wherever the gradient equals its most frequent value.

        NOTE(review): when the mask is mostly 0 the most frequent value is 0, so the
        result marks the *background* as 255 rather than the detected structure.
        Kept unchanged pending confirmation of the intended semantics.
        """
        kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (3, 3))
        mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel)
        mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, kernel)
        # dilate - erode: morphological gradient, i.e. the outline of the shapes.
        mask = cv2.subtract(cv2.dilate(mask, kernel), cv2.erode(mask, kernel))
        mode = np.argmax(np.bincount(mask.ravel()))
        return np.where(mask == mode, 255, 0).astype(np.uint8)

    @staticmethod
    def _get_detection_masks(blurred):
        """Build the single-channel candidate masks from the blurred grayscale image.

        Returns:
            dict with keys ``"edge"``, ``"line"``, ``"adaptive"`` and ``"texture"``.
        """
        # Canny is computed once and reused by the line detector.
        edges = cv2.Canny(blurred, 50, 150)
        return {
            "edge": edges,
            "line": WatermarkDetector._detect_straight_lines(edges),
            "adaptive": cv2.adaptiveThreshold(
                blurred, 255, cv2.ADAPTIVE_THRESH_MEAN_C, cv2.THRESH_BINARY_INV, 11, 2
            ),
            "texture": WatermarkDetector._lbp_texture_analysis(blurred),
        }

    @staticmethod
    def _detect_straight_lines(edges, threshold=100, min_line_length=200, max_line_gap=5):
        """Draw the probabilistic-Hough line segments found in ``edges`` on a blank mask.

        Args:
            edges: 8-bit single-channel edge map (e.g. Canny output).
            threshold: accumulator votes required to accept a line.
            min_line_length: shortest segment, in pixels, that is kept.
            max_line_gap: largest gap, in pixels, bridged within one segment.

        Returns:
            mask shaped like ``edges`` with the segments drawn at 255, 1 px wide.
        """
        # minLineLength/maxLineGap must be passed by keyword: positionally the 5th
        # argument of HoughLinesP is the optional output array ``lines``, so a call
        # like HoughLinesP(edges, 1, pi/180, 100, 200, 5) sends 200 to ``lines`` and
        # 5 to ``minLineLength``.
        lines = cv2.HoughLinesP(
            edges,
            1,
            np.pi / 180,
            threshold,
            minLineLength=min_line_length,
            maxLineGap=max_line_gap,
        )
        line_mask = np.zeros_like(edges)
        if lines is not None:
            for line in lines:
                x1, y1, x2, y2 = (int(v) for v in line[0])
                cv2.line(line_mask, (x1, y1), (x2, y2), 255, 1)
        return line_mask

    @staticmethod
    def _lbp_texture_analysis(img):
        """Binarize the uniform LBP (P=8, R=1) code map of ``img`` with Otsu's threshold."""
        lbp = local_binary_pattern(img, P=8, R=1, method="uniform")
        return cv2.threshold(
            lbp.astype(np.uint8), 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU
        )[1]

    @staticmethod
    def _background_subtraction(images, median_img):
        """OR together, over ``images``, the pixels differing from ``median_img`` by > 30.

        NOTE(review): with BGR input the diffs keep 3 channels, so the result is
        (H, W, 3) and ``generate_watermark_mask`` skips it with a shape-mismatch
        error. Pixels that differ from the median are presumably image content
        rather than the watermark — confirm intent before making this mask effective.
        """
        diffs = [cv2.absdiff(img, median_img) for img in images]
        thresholded_diffs = [
            cv2.threshold(diff, 30, 255, cv2.THRESH_BINARY)[1] for diff in diffs
        ]
        combined_mask = thresholded_diffs[0]
        for mask in thresholded_diffs[1:]:
            combined_mask = cv2.bitwise_or(combined_mask, mask)
        return combined_mask

    @staticmethod
    def _apply_active_contours(mask):
        """Fit an active-contour snake to ``mask`` and rasterize its points.

        The snake starts as an ellipse inscribed in the image (400 points, given in
        (row, col) order). Only the snake's points are set to 255 in the result.

        NOTE(review): the content of ``mask`` itself is not carried into the result,
        so everything the detectors found is reduced to this outline.
        """
        s = np.linspace(0, 2 * np.pi, 400)
        init = np.array(
            [
                mask.shape[0] / 2 + mask.shape[0] / 2 * np.sin(s),
                mask.shape[1] / 2 + mask.shape[1] / 2 * np.cos(s),
            ]
        ).T
        snake = active_contour(mask, init, alpha=0.06, beta=1.0, gamma=0.3)

        # Keep the coordinates inside the image before using them as indices.
        snake[:, 0] = np.clip(snake[:, 0], 0, mask.shape[0] - 1)
        snake[:, 1] = np.clip(snake[:, 1], 0, mask.shape[1] - 1)

        result = np.zeros_like(mask)
        result[
            np.round(snake[:, 0]).astype("int"), np.round(snake[:, 1]).astype("int")
        ] = 255
        return result

    @staticmethod
    def _connected_components(mask, min_area=500):
        """Return a copy of ``mask`` without connected regions smaller than ``min_area`` px.

        Non-zero pixels are labelled with skimage's default (full) connectivity.
        """
        labels = label(mask)
        small_labels = [
            region.label for region in regionprops(labels) if region.area < min_area
        ]
        cleaned = mask.copy()
        if small_labels:
            # One vectorized assignment instead of a Python loop over every pixel.
            cleaned[np.isin(labels, small_labels)] = 0
        return cleaned

    @staticmethod
    def _save_sample_mask(mask, method_name):
        """Save ``mask`` as ``samples/{method_name}_sample.jpg`` for inspection."""
        save_dir = "samples"
        os.makedirs(save_dir, exist_ok=True)
        cv2.imwrite(os.path.join(save_dir, f"{method_name}_sample.jpg"), mask)

    @staticmethod
    def generate_watermark_mask(images):
        """Build a watermark mask from a group of same-sized BGR images.

        The per-image detection masks are OR-combined (masks whose shape differs are
        skipped and logged), then passed through active contours, small-component
        removal and final morphology.

        Args:
            images: non-empty sequence of equally-shaped 3-channel BGR images.

        Returns:
            single-channel uint8 mask with values 0/255.

        Raises:
            ValueError: if ``images`` is empty.
        """
        if len(images) == 0:
            raise ValueError("generate_watermark_mask requires at least one image")

        # Per-pixel median over the group; presumably the watermark is the part that
        # is common to all images — TODO confirm for the dataset.
        median_image = np.median(np.stack(images), axis=0).astype(np.uint8)
        blurred = WatermarkDetector._prepare_image(median_image)
        masks = WatermarkDetector._get_detection_masks(blurred)
        masks["bg_sub"] = WatermarkDetector._background_subtraction(
            images, median_image
        )

        # Save samples of each mask for visual inspection.
        for method, mask in masks.items():
            WatermarkDetector._save_sample_mask(mask, method)

        mask_shapes = {name: mask.shape for name, mask in masks.items()}
        logging.info(f"Mask Shapes: {mask_shapes}")

        mask_items = list(masks.items())
        combined_mask = mask_items[0][1]

        # Combine masks while checking for size consistency.
        for mask_name, mask in mask_items[1:]:
            if mask.shape != combined_mask.shape:
                logging.error(
                    f"Shape mismatch: {mask_name} mask shape {mask.shape} "
                    f"is not equal to combined mask shape {combined_mask.shape}"
                )
                continue  # Skip this mask
            combined_mask = cv2.bitwise_or(combined_mask, mask)

        combined_mask = WatermarkDetector._apply_active_contours(combined_mask)
        combined_mask = WatermarkDetector._connected_components(combined_mask)
        return WatermarkDetector._mask_operations(combined_mask)

In [4]:
class ImageProcessor:
    """Class responsible for processing and inpainting images."""

    @staticmethod
    def _refine_single_mask(gray, watermark_mask, kernel):
        """Restrict ``watermark_mask`` to pixels whose intensity is an outlier in ``gray``.

        A pixel is kept when its intensity lies outside ``mean ± 2·std`` of the image
        and it is non-zero in ``watermark_mask``. The result is then opened and
        dilated with ``kernel``.

        Args:
            gray: 8-bit image; a 3-channel BGR image is converted to grayscale first.
            watermark_mask: single-channel mask, non-zero where the watermark may be.
            kernel: structuring element for the morphology.

        Returns:
            uint8 mask with values 0/255 and the height/width of ``gray``.
        """
        if gray.ndim == 3:
            # Callers pass colour images; the statistics and the combination with
            # the single-channel watermark mask need one channel.
            gray = cv2.cvtColor(gray, cv2.COLOR_BGR2GRAY)

        mean_val, std_val = cv2.meanStdDev(gray)
        mean_val = float(mean_val[0][0])
        std_val = float(std_val[0][0])

        # Compare against the float bounds directly. Materialising them with
        # np.full_like(gray, ...) would cast them to uint8 (truncated, and undefined
        # for bounds below 0 or above 255).
        outliers = (gray < mean_val - 2 * std_val) | (gray > mean_val + 2 * std_val)
        # Boolean AND with "mask is set" rather than a bitwise AND with the raw mask
        # values, so any non-zero mask value counts (e.g. 254 after JPEG storage).
        confident = outliers & (watermark_mask > 0)
        confident_mask = confident.astype(np.uint8) * 255

        confident_mask = cv2.morphologyEx(confident_mask, cv2.MORPH_OPEN, kernel)
        return cv2.dilate(confident_mask, kernel)

    @staticmethod
    def _refine_mask(batch_gray, watermark_mask):
        """Refine ``watermark_mask`` separately for each image of ``batch_gray``.

        ``batch_gray`` may contain grayscale or BGR images.
        """
        kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (3, 3))
        return [
            ImageProcessor._refine_single_mask(gray, watermark_mask, kernel)
            for gray in batch_gray
        ]

    @staticmethod
    def _remove_watermark(image, refined_mask, inpaint_radius=1):
        """Inpaint the non-zero pixels of ``refined_mask`` in ``image`` (Telea method)."""
        return cv2.inpaint(
            image, refined_mask, inpaintRadius=inpaint_radius, flags=cv2.INPAINT_TELEA
        )

    @staticmethod
    def process_images(images, watermark_mask, batch_size=None):
        """Remove the watermark from every image and return the results in input order.

        Masks are refined one batch at a time. Each image is then inpainted with the
        mask refined from that same image, using a thread pool.

        Args:
            images: sequence of BGR images of one dimension group.
            watermark_mask: single-channel mask of that group.
            batch_size: images per batch; defaults to the module-level ``BATCH_SIZE``.

        Returns:
            list of inpainted images, one per input image.

        Raises:
            ValueError: if ``batch_size`` is smaller than 1.
        """
        if batch_size is None:
            batch_size = BATCH_SIZE
        if batch_size < 1:
            raise ValueError(f"batch_size must be >= 1, got {batch_size}")

        processed_images = []
        with tqdm(
            total=len(images),
            desc="Processing images",
            unit="img",
            dynamic_ncols=True,
            leave=True,
        ) as pbar:
            with concurrent.futures.ThreadPoolExecutor() as executor:
                for start in range(0, len(images), batch_size):
                    batch = images[start : start + batch_size]
                    refined_masks = ImageProcessor._refine_mask(batch, watermark_mask)
                    # Zip this batch's images with this batch's masks; mapping over
                    # the full image list would pair later masks with the first images.
                    for result in executor.map(
                        ImageProcessor._remove_watermark, batch, refined_masks
                    ):
                        processed_images.append(result)
                        pbar.update(1)

        return processed_images

In [5]:
class MainExecutor:
    """Main execution class: detect, inpaint and save, one dimension group at a time."""

    @staticmethod
    def _parse_dimensions(dimensions):
        """Turn a group key such as ``"(1080, 1920)"`` into ``(1080, 1920)``.

        Uses ``ast.literal_eval`` instead of ``eval``. The key is read back from a JSON
        cache on disk, and ``eval`` would execute any code found there.

        Raises:
            ValueError: if ``dimensions`` is not a literal sequence.
        """
        import ast

        return tuple(int(value) for value in ast.literal_eval(dimensions))

    @staticmethod
    def _load_images(image_paths, limit=None):
        """Read images with ``cv2.imread`` in order, skipping (and logging) unreadable files.

        Args:
            image_paths: iterable of file paths.
            limit: stop once this many images were read; ``None`` reads all of them.

        Returns:
            list of decoded images.
        """
        images = []
        for path in image_paths:
            if limit is not None and len(images) >= limit:
                break
            img = cv2.imread(path)
            if img is None:
                logging.warning(f"Could not read image, skipping: {path}")
                continue
            images.append(img)
        return images

    @staticmethod
    def run(num_images_to_process=None):
        """Main execution function.

        For every dimension group: load the cached watermark mask, or generate it from
        all images of the group and save it. Then inpaint the selected images and
        write the results.

        Args:
            num_images_to_process: positive N to process only the first N readable
                images of each group; ``None`` (or any falsy value, including 0)
                processes all of them. Mask generation always uses the whole group.
        """
        logging.info("Starting main execution")

        # Load or group images by dimensions
        images_by_dimensions = IOHelper.load_images_by_dimensions()

        for dimensions, image_paths in images_by_dimensions.items():
            # Older group caches map each size to a single path string; iterating a
            # string would yield its characters, so wrap it in a list.
            if isinstance(image_paths, str):
                image_paths = [image_paths]
            limit = num_images_to_process or None

            watermark_mask = IOHelper.load_mask(dimensions)
            if watermark_mask is None:
                # Mask generation needs the whole group; the subset to process is
                # sliced from the same images.
                all_imgs = MainExecutor._load_images(image_paths)
                if not all_imgs:
                    logging.warning(
                        f"No readable images for dimensions {dimensions}; skipping"
                    )
                    continue
                logging.info(f"Generating watermark mask for dimensions {dimensions}")
                watermark_mask = WatermarkDetector.generate_watermark_mask(all_imgs)
                IOHelper.save_mask(watermark_mask, dimensions)
                imgs = all_imgs[:limit]
            else:
                # Cached mask: decode only the images that will be processed.
                imgs = MainExecutor._load_images(image_paths, limit)

            processed_images = ImageProcessor.process_images(imgs, watermark_mask)
            parsed_dimensions = MainExecutor._parse_dimensions(dimensions)
            for idx, img in enumerate(processed_images):
                IOHelper.save_image(img, parsed_dimensions, idx)

        logging.info("Main execution completed")

In [None]:
import nest_asyncio

# The Jupyter kernel already runs an asyncio event loop; nest_asyncio patches it so
# that nested run_until_complete()/asyncio.run() calls work inside the notebook.
nest_asyncio.apply()

if __name__ == "__main__":
    # Quick run: only the first image of every dimension group is processed.
    MainExecutor.run(num_images_to_process=1)