In [3]:
# --- Imports ---
import numpy as np
import cv2
import pywt
from typing import Tuple

# --- Helpers ---
def rgb_to_ycbcr(img):
    """Convert an RGB image to OpenCV's YCrCb colour space.

    Despite the function name, the conversion code used is
    ``cv2.COLOR_RGB2YCrCb``, i.e. OpenCV's YCrCb channel ordering.
    """
    converted = cv2.cvtColor(img, cv2.COLOR_RGB2YCrCb)
    return converted
def ycbcr_to_rgb(img):
    """Convert an image from OpenCV's YCrCb colour space back to RGB.

    Uses ``cv2.COLOR_YCrCb2RGB``, so the input is expected in OpenCV's
    YCrCb channel ordering.
    """
    restored = cv2.cvtColor(img, cv2.COLOR_YCrCb2RGB)
    return restored

def to_gray_binary(wm_img):
    """Binarise a watermark image to a {0, 1} uint8 array without resizing.

    A 3-dimensional input is first reduced to grayscale with
    ``cv2.COLOR_RGB2GRAY``; any other input is thresholded as-is.
    The threshold is chosen by Otsu's method (``cv2.THRESH_OTSU``).
    """
    gray = wm_img
    if gray.ndim == 3:
        gray = cv2.cvtColor(gray, cv2.COLOR_RGB2GRAY)

    # cv2.threshold returns (threshold_used, thresholded_image); only the image is needed.
    _, otsu_mask = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)

    # Map the 0/255 mask to 0/1.
    return np.greater(otsu_mask, 0).astype(np.uint8)

def block_view(A, block_h, block_w):
    """Split a 2-D array into non-overlapping blocks.

    Rows/columns that do not fill a whole block at the bottom/right edge are
    trimmed away. The result has shape
    ``(n_blocks_y, n_blocks_x, block_h, block_w)`` so that ``result[by, bx]``
    is the block in block-row ``by`` and block-column ``bx``.
    """
    rows, cols = A.shape
    n_by = rows // block_h
    n_bx = cols // block_w

    # Drop the partial blocks at the edges.
    trimmed = A[:n_by * block_h, :n_bx * block_w]

    # (n_by, block_h, n_bx, block_w) -> (n_by, n_bx, block_h, block_w)
    tiled = trimmed.reshape(n_by, block_h, n_bx, block_w)
    return tiled.swapaxes(1, 2)

def idct2(B):
    """Inverse DCT of ``B`` via ``cv2.idct``; the input is cast to float32 first."""
    coeffs32 = B.astype(np.float32)
    return cv2.idct(coeffs32)
def dct2(B):
    """Forward DCT of ``B`` via ``cv2.dct``; the input is cast to float32 first."""
    samples32 = B.astype(np.float32)
    return cv2.dct(samples32)

def apply_qim_pair(c1, c2, bit, Delta):
    """Embed one bit into the difference of two coefficients (parity QIM).

    The difference ``d = c1 - c2`` is moved to ``q * Delta`` where ``q`` is the
    integer *nearest to* ``d / Delta`` whose parity encodes the bit
    (odd -> 1, even -> 0). Picking the nearest level of the right parity keeps
    the change in ``d`` at or below ``Delta``; rounding first and then stepping
    by one in a fixed direction could move ``d`` by up to ``1.5 * Delta``.

    Parameters
    ----------
    c1, c2 : float
        The two coefficients carrying the bit.
    bit : int
        ``1`` selects odd levels; any other value selects even levels.
    Delta : float
        Quantisation step (must be non-zero).

    Returns
    -------
    (c1_new, c2_new)
        Adjusted coefficients with ``c1_new - c2_new == q * Delta`` and the
        same mean as the inputs.
    """
    d = c1 - c2
    parity = 1 if bit == 1 else 0

    # Nearest integer to d/Delta on the lattice {parity + 2k}.
    q = 2.0 * np.round((d / Delta - parity) / 2.0) + parity
    d_star = q * Delta

    # Split the correction symmetrically so the pair's average is unchanged.
    avg = 0.5 * (c1 + c2)
    c1_new = avg + 0.5 * d_star
    c2_new = avg - 0.5 * d_star
    return c1_new, c2_new

def decode_qim_pair(c1, c2, Delta):
    """Read one bit from a coefficient pair.

    The difference ``c1 - c2`` is quantised to the nearest multiple of
    ``Delta``; an odd quantisation index decodes as 1, an even one as 0.
    """
    level = int(np.round((c1 - c2) / Delta))
    if level % 2:
        return 1
    return 0

def flatten_blocks_indices(shape_blocks, key, needed):
    """Pick ``needed`` block positions in a key-dependent pseudo-random order.

    The flat indices ``0 .. nb_y*nb_x - 1`` are shuffled with a NumPy
    ``default_rng`` seeded by ``key``; the first ``needed`` of them are
    returned together with the grid dimensions, as
    ``(indices, nb_y, nb_x)``.
    """
    nb_y, nb_x = shape_blocks
    order = np.arange(nb_y * nb_x)

    # Same key -> same shuffle, which is what lets the extractor find the blocks again.
    generator = np.random.default_rng(key)
    generator.shuffle(order)

    selected = order[:needed]
    return selected, nb_y, nb_x


In [4]:
def embed_watermark_dwt_qim(
    host_rgb: np.ndarray,
    wm_img: np.ndarray,
    Delta: float = 4.0,
    block_size: int = 4,
    wavelet: str = 'haar',
    key: int = 12345
) -> np.ndarray:
    """Embed a binary watermark into the luma channel of an RGB host image.

    Pipeline: RGB -> YCrCb, single-level 2-D DWT of Y, split the LH and HL
    sub-bands into ``block_size`` x ``block_size`` blocks, and for each
    watermark bit modify DCT coefficients (1,2) and (2,1) of one
    key-selected block with parity QIM. The same bit goes into the same block
    position of both sub-bands for redundancy.

    Parameters
    ----------
    host_rgb : np.ndarray
        Host image, passed to ``cv2.cvtColor`` as RGB.
    wm_img : np.ndarray
        Watermark image; binarised by ``to_gray_binary`` and used at its
        original size (one bit per pixel, row-major).
    Delta : float
        QIM quantisation step.
    block_size : int
        Side length of the square blocks. Must be at least 3 because
        coefficient indices up to 2 are used.
    wavelet : str
        Wavelet name passed to PyWavelets.
    key : int
        Seed for the block permutation; the extractor needs the same value.

    Returns
    -------
    np.ndarray
        Watermarked RGB image with the same shape as ``host_rgb``.

    Raises
    ------
    ValueError
        If the watermark has more bits than there are blocks available.
    """
    # 1) Host → Y channel
    ycrcb = rgb_to_ycbcr(host_rgb)
    Y = ycrcb[:, :, 0].astype(np.float32)

    # 2) DWT
    LL, (LH, HL, HH) = pywt.dwt2(Y, wavelet=wavelet)

    # 3) Blocks in LH and HL
    LH_blocks = block_view(LH, block_size, block_size)
    HL_blocks = block_view(HL, block_size, block_size)

    # 4) Watermark bits (no resizing)
    wm_bits = to_gray_binary(wm_img).flatten()
    Nbits = wm_bits.size

    # 5) Permute block indices with key; ensure capacity
    nby, nbx = LH_blocks.shape[:2]
    capacity = min(nby * nbx, HL_blocks.shape[0] * HL_blocks.shape[1])
    if Nbits > capacity:
        raise ValueError(f"Not enough blocks: bits={Nbits}, capacity={capacity}. "
                         f"Use larger image or smaller block size.")

    idx_perm, nby, nbx = flatten_blocks_indices((nby, nbx), key, Nbits)

    # 6) Embed into both LH and HL (redundancy)
    def embed_into_subband(blocks):
        """Return a copy of ``blocks`` with one bit embedded per selected block."""
        B = blocks.copy()
        for k, bit in enumerate(wm_bits):
            by, bx = divmod(int(idx_perm[k]), nbx)
            C = dct2(B[by, bx])

            # The bit is carried by the difference of two mid-frequency coefficients.
            c1n, c2n = apply_qim_pair(C[1, 2], C[2, 1], int(bit), Delta)
            C[1, 2] = c1n
            C[2, 1] = c2n

            B[by, bx] = idct2(C)
        return B

    # 7) Stitch blocks back to arrays (trimmed to block grid size)
    def unstack_blocks(B):
        """Inverse of block_view: (nby, nbx, bh, bw) -> (nby*bh, nbx*bw)."""
        n_y, n_x, bh, bw = B.shape
        return B.swapaxes(1, 2).reshape(n_y * bh, n_x * bw).astype(np.float32)

    LH2 = unstack_blocks(embed_into_subband(LH_blocks))
    HL2 = unstack_blocks(embed_into_subband(HL_blocks))

    # Align shapes to original (borders trimmed by block_view stay unchanged)
    LH_new = LH.copy()
    HL_new = HL.copy()
    LH_new[:LH2.shape[0], :LH2.shape[1]] = LH2
    HL_new[:HL2.shape[0], :HL2.shape[1]] = HL2

    # 8) Inverse DWT
    Yw = pywt.idwt2((LL, (LH_new, HL_new, HH)), wavelet=wavelet)
    # For odd-sized hosts the reconstruction can be one row/column larger than
    # the input; crop so it fits back into the Y plane.
    Yw = Yw[:Y.shape[0], :Y.shape[1]]

    # 9) Merge back to RGB
    # Round to nearest before the uint8 cast: a plain astype truncates, which
    # biases luma downward and eats into the QIM decision margin.
    Yw = np.clip(np.rint(Yw), 0, 255).astype(np.uint8)
    ycrcb_w = ycrcb.copy()
    ycrcb_w[:, :, 0] = Yw
    watermarked = ycbcr_to_rgb(ycrcb_w)
    return watermarked


Optional: rotation/translation estimation helper

In [5]:
def estimate_rotation_translation(orig_like_LL, test_like_LL):
    """Brute-force a small rotation plus a translation between two 2-D arrays.

    ``test_like_LL`` is rotated about its centre by each of 9 evenly spaced
    angles in [-2, 2] degrees; for every candidate, ``cv2.phaseCorrelate``
    against ``orig_like_LL`` gives a shift and a response value. The candidate
    with the highest response wins.

    Returns ``(angle_deg, (dx, dy))``. If no candidate scores above the
    initial sentinel, ``(0.0, (0, 0))`` is returned.
    Useful only if a small LL reference/template of the original is kept.
    """
    height, width = test_like_LL.shape
    centre = (width / 2, height / 2)

    winner = (0.0, (0, 0))  # (angle_deg, shift)
    top_response = -1e9

    for angle in np.linspace(-2.0, 2.0, 9):
        rot_mat = cv2.getRotationMatrix2D(centre, angle, 1.0)
        candidate = cv2.warpAffine(
            test_like_LL,
            rot_mat,
            (width, height),
            flags=cv2.INTER_LINEAR,
            borderMode=cv2.BORDER_REFLECT,
        )
        # Phase correlation only estimates translation, hence the angle sweep.
        (dx, dy), response = cv2.phaseCorrelate(
            orig_like_LL.astype(np.float32), candidate.astype(np.float32)
        )
        if response > top_response:
            top_response = response
            winner = (angle, (dx, dy))

    return winner


Extraction

In [6]:
def extract_watermark_dwt_qim(
    wm_rgb: np.ndarray,
    wm_shape: Tuple[int,int],
    Delta: float = 4.0,
    block_size: int = 4,
    wavelet: str = 'haar',
    key: int = 12345
) -> np.ndarray:
    """Blindly recover a binary watermark from a watermarked RGB image.

    Mirrors the embedder: RGB -> YCrCb, single-level DWT of Y, block the LH
    and HL sub-bands, visit blocks in the order given by ``key`` and decode one
    parity-QIM bit from DCT coefficients (1,2) and (2,1) of each block.

    Each bit is read from both sub-bands. When the two readings agree, that
    value is used. When they disagree, the reading whose quantised difference
    lies further from a decision boundary wins. A plain OR of two votes is not
    a majority vote and would turn every disagreement into a 1.

    Parameters
    ----------
    wm_rgb : np.ndarray
        Watermarked image, passed to ``cv2.cvtColor`` as RGB.
    wm_shape : tuple
        Shape of the embedded watermark; only the first two entries
        (height, width) are used.
    Delta, block_size, wavelet, key
        Must match the values used for embedding.

    Returns
    -------
    np.ndarray
        uint8 image of shape ``(H_wm, W_wm)`` with values 0 or 255.

    Raises
    ------
    ValueError
        If the image does not contain enough blocks for ``H_wm * W_wm`` bits.
    """
    H_wm, W_wm = wm_shape[:2]
    Nbits = H_wm * W_wm

    # 1) Y channel
    ycrcb = rgb_to_ycbcr(wm_rgb)
    Y = ycrcb[:, :, 0].astype(np.float32)

    # 2) DWT (only the two detail sub-bands that carry the payload are needed)
    _, (LH, HL, _) = pywt.dwt2(Y, wavelet=wavelet)

    # 3) Blocks
    LH_blocks = block_view(LH, block_size, block_size)
    HL_blocks = block_view(HL, block_size, block_size)
    nby, nbx = LH_blocks.shape[:2]
    capacity = min(nby * nbx, HL_blocks.shape[0] * HL_blocks.shape[1])
    if Nbits > capacity:
        raise ValueError(f"Not enough blocks to extract: need {Nbits}, capacity {capacity}")

    idx_perm, nby, nbx = flatten_blocks_indices((nby, nbx), key, Nbits)

    def read_bits_from_subband(blocks):
        """Decode all bits from one sub-band; also return a per-bit margin.

        The margin is the distance of ``(c1 - c2) / Delta`` from the nearest
        half-integer, i.e. from the nearest decision boundary: 0.5 means the
        difference sits exactly on a quantisation level, 0 means it is on the
        boundary between two levels.
        """
        bits = np.zeros(Nbits, dtype=np.uint8)
        margin = np.zeros(Nbits, dtype=np.float64)
        for k in range(Nbits):
            by, bx = divmod(int(idx_perm[k]), nbx)
            C = dct2(blocks[by, bx])
            c1 = C[1, 2]
            c2 = C[2, 1]
            bits[k] = decode_qim_pair(c1, c2, Delta)
            ratio = float(c1 - c2) / Delta
            margin[k] = 0.5 - abs(ratio - np.round(ratio))
        return bits, margin

    bits_LH, margin_LH = read_bits_from_subband(LH_blocks)
    bits_HL, margin_HL = read_bits_from_subband(HL_blocks)

    # 4) Combine the two readings: agreement passes through unchanged,
    #    disagreement is resolved in favour of the more confident sub-band.
    bits = np.where(margin_LH >= margin_HL, bits_LH, bits_HL).astype(np.uint8)

    # 5) Reshape to image
    wm_rec = bits.reshape(H_wm, W_wm) * 255
    return wm_rec.astype(np.uint8)


In [36]:
# Load images (keep original sizes)
# cv2.imread returns None instead of raising when a file is missing or cannot
# be decoded, so check here and fail with a message that names the file.
host_bgr = cv2.imread("images/chihuahua.webp")
if host_bgr is None:
    raise FileNotFoundError("Could not read host image: images/chihuahua.webp")
host_rgb = cv2.cvtColor(host_bgr, cv2.COLOR_BGR2RGB)

# Watermark is loaded as single-channel grayscale.
wm_gray = cv2.imread("images/sm_watermark.jpg", cv2.IMREAD_GRAYSCALE)
if wm_gray is None:
    raise FileNotFoundError("Could not read watermark image: images/sm_watermark.jpg")

In [37]:
# Embed the watermark at its original size (binarisation happens inside the embedder).
# Delta and key are reused by the extraction cell and must match there.
Delta = 10.0
key = 20250810
wimg = embed_watermark_dwt_qim(host_rgb, wm_gray, Delta=Delta, block_size=4, wavelet='db2', key=key)

# cv2.imwrite reports failure through its return value rather than by raising.
if not cv2.imwrite("images/watermarked.png", cv2.cvtColor(wimg, cv2.COLOR_RGB2BGR)):
    raise IOError("Could not write images/watermarked.png")

cv2.imshow("watermarked", cv2.cvtColor(wimg, cv2.COLOR_RGB2BGR))
pressed_key = cv2.waitKey(0)
# Close the HighGUI window so it does not linger after the cell finishes.
cv2.destroyAllWindows()
pressed_key

-1

In [43]:
# Extract (blind)
# NOTE(review): this reads "images/wa_watermarked.jpg", not the
# "images/watermarked.png" written by the embedding cell — presumably a copy
# that went through an external channel; confirm, and make sure it exists
# before running this cell.
wm_loaded_bgr = cv2.imread("images/wa_watermarked.jpg")
if wm_loaded_bgr is None:
    raise FileNotFoundError("Could not read watermarked image: images/wa_watermarked.jpg")
wm_loaded = cv2.cvtColor(wm_loaded_bgr, cv2.COLOR_BGR2RGB)

rec = extract_watermark_dwt_qim(wm_loaded, wm_shape=wm_gray.shape, Delta=Delta, block_size=4, wavelet='db2', key=key)

# cv2.imwrite reports failure through its return value rather than by raising.
if not cv2.imwrite("images/wm_recovered.png", rec):
    raise IOError("Could not write images/wm_recovered.png")

cv2.imshow("wm_recovered", rec)
pressed_key = cv2.waitKey(0)
# Close the HighGUI window so it does not linger after the cell finishes.
cv2.destroyAllWindows()
pressed_key

-1