In [3]:
import os, glob, time
import numpy as np
from PIL import Image
from tqdm import tqdm


# Input / output locations. All three are resolved relative to the current
# user's home directory so the notebook is not tied to one account or machine.
TARGET_PATH = os.path.expanduser("~/Desktop/mosaic_project/data/targets/landscape.jpg")  # picture to reproduce as a mosaic
TILES_DIR   = os.path.expanduser("~/Desktop/mosaic_project/data/wikiart_tile_cache")      # folder scanned for tile images
OUTPUT_DIR  = os.path.expanduser("~/Desktop/mosaic_project/outputs/landscape")            # mosaics and .npy caches are written here

TILE_SIZE   = 12       # side length of one mosaic tile, in pixels
POOL_TILES  = 12000    # maximum number of tiles sampled from TILES_DIR into the working pool
SEED        = 42       # seed for the tile-pool shuffle; also passed as `seed` to every mosaic builder

# VGG embedding
EMBED_SIZE  = 96       # VGG input resolution: images are resized to EMBED_SIZE x EMBED_SIZE before embedding
CUT_SINGLE  = 22       # truncation index into VGG16 `features` for the single-layer steps (middle/high layer)
CUT3        = 16       # lower layer (texture / edges)
CUT4        = 22       # higher layer (structure / semantics)
    # CUT3 and CUT4 are combined in the final step (multi-layer feature fusion)

# Step D: mix
ALPHA_VGG   = 0.75     # VGG+Color: weight of the VGG distance; the color distance gets 1 - ALPHA_VGG

# Anti-repeat (global): target cap on how often one tile is reused, per method
MAX_USE_MEAN  = 1
MAX_USE_VGG   = 2
MAX_USE_BEST  = 3

# Scores are multiplied by (1 + PENALTY * times_used)**2
PENALTY_MEAN  = 0.35 # higher means less tolerant of tile repetition
PENALTY_VGG   = 0.20
PENALTY_BEST  = 0.12

# Candidate pool sizes
TOPK_MEAN     = 12
TOPK_VGG_TREE = 60
TOPK_MIX      = 80
TOPK3_BEST    = 80
TOPK4_BEST    = 80
# multi-stage candidate filtering: nearest-neighbour shortlist first, final scoring second

# BEST weights (dual VGG + color)
W3, W4, W_COLOR = 0.6, 0.2, 0.2
# W3 weights the CUT3 (lower-layer) distance, W4 the CUT4 (higher-layer) distance,
# W_COLOR the mean-RGB distance

# BEST face-centric boost + local no-repeat
FACE_BOOST   = 1         # multiplier on W4 inside the central region; 1 means no extra weight for the center
LOCAL_RADIUS = 2          # the same tile may not be reused within a grid radius of 2
LOCAL_BAN    = True

# Optional: VGG brute-force baseline is slow
RUN_VGG_BRUTE = True  

def ensure_dir(p):
    """Create directory `p`, including missing parents; do nothing if it already exists."""
    os.makedirs(p, exist_ok=True)

class Timer:
    """Context manager that prints how long its ``with`` body took.

    Usage: ``with Timer("label"): ...`` prints ``[RUNTIME] label: <seconds> sec``
    when the block exits. Exceptions raised inside the block are not suppressed.
    """

    def __init__(self, name=""):
        # Label shown in the printed runtime line.
        self.name = name

    def __enter__(self):
        # Start time is kept on the instance so __exit__ can compute the delta.
        self.t0 = time.perf_counter()
        return self

    def __exit__(self, exc_type, exc, tb):
        stop = time.perf_counter()
        dt = stop - self.t0
        print(f"[RUNTIME] {self.name}: {dt:.3f} sec")

def list_tiles(tile_dir):
    """Return the tile image paths found directly inside `tile_dir`.

    Files are grouped by extension in the order ``.jpg``, ``.jpeg``, ``.png``;
    each group is sorted by path. Sub-directories are not searched.
    """
    found = []
    for pattern in ("*.jpg", "*.jpeg", "*.png"):
        matches = glob.glob(os.path.join(tile_dir, pattern))
        found.extend(sorted(matches))
    return found

#read target image and resize
def load_target_image(path, max_side=1200):
    """Load the target picture as an RGB uint8 array, downscaled to fit `max_side`.

    Args:
        path: image file path. For ``.heic`` / ``.heif`` files the optional
            ``pillow-heif`` plugin is registered before opening.
        max_side: upper bound, in pixels, for the longer edge. Images already
            within the bound keep their size (the image is never upscaled).

    Returns:
        ``np.ndarray`` of dtype uint8 built from the RGB image.

    Raises:
        RuntimeError: a HEIC/HEIF file was requested but pillow-heif could not
            be imported or registered.
    """
    if path.lower().endswith((".heic", ".heif")):
        try:
            import pillow_heif
            pillow_heif.register_heif_opener()
        except Exception as e:
            raise RuntimeError("HEIC needs pillow-heif：pip install pillow-heif") from e
    # The context manager releases the file handle as soon as the pixels have
    # been converted into an independent RGB image.
    with Image.open(path) as src:
        img = src.convert("RGB")
    w, h = img.size
    scale = min(1.0, max_side / max(w, h))
    if scale < 1.0:
        # int() truncates, so a very thin image could end up with a 0-pixel
        # side, which resize() rejects; clamp both sides to at least 1 px.
        new_size = (max(1, int(w * scale)), max(1, int(h * scale)))
        img = img.resize(new_size, Image.BICUBIC)
    return np.array(img, dtype=np.uint8)

def crop_target_to_grid(target_rgb, tile_size):
    """Trim the bottom and right edges so both sides are multiples of `tile_size`.

    Returns a copy, so the caller's array is never aliased or modified.
    """
    rows, cols, _ = target_rgb.shape
    keep_rows = rows - rows % tile_size
    keep_cols = cols - cols % tile_size
    cropped = target_rgb[:keep_rows, :keep_cols]
    return cropped.copy()

def preprocess_tile(path, tile_size):
    """Load the image at `path` as RGB and resize it to `tile_size` x `tile_size`.

    Bilinear resampling is used and the aspect ratio is not preserved.
    Returns a uint8 numpy array.
    """
    rgb = Image.open(path).convert("RGB")
    square = rgb.resize((tile_size, tile_size), Image.BILINEAR)
    return np.array(square, dtype=np.uint8)
    
#v2-v5
def preprocess_resize(path, size):
    """Load the image at `path` as RGB and resize it to `size` x `size` (bilinear).

    Used on tiles before feature extraction: each tile is resized to the
    embedding resolution (larger than the mosaic tile size) so the CNN sees
    enough structural and textural detail. Returns a uint8 numpy array.
    """
    target_shape = (size, size)
    rgb = Image.open(path).convert("RGB")
    return np.array(rgb.resize(target_shape, Image.BILINEAR), dtype=np.uint8)

def save_img(arr_uint8, out_path):
    """Write `arr_uint8` to `out_path` as an image (quality=95) and log the path."""
    picture = Image.fromarray(arr_uint8)
    picture.save(out_path, quality=95)
    print("[SAVE]", out_path)

#v4-v5
def normalize_dist(d):
    """Rescale distances by their mean so different metrics become comparable.

    VGG distances and squared-RGB distances live on very different scales;
    dividing each set by its own mean puts them on a common footing before
    they are mixed. A small epsilon keeps an all-zero input from dividing by 0.
    """
    mean_value = float(np.mean(d))
    denominator = mean_value + 1e-8
    return d / denominator

#v5
def face_center_mask(i, j, gh, gw, frac=0.40):
    """Tell whether grid cell (i, j) lies in the centered window of the grid.

    The window spans the middle `frac` of the `gh` rows and of the `gw`
    columns (40% by default). The final mosaic step treats this window as the
    likely face area and can weight structural features more heavily there.
    """
    lo = 0.5 - frac/2
    hi = 0.5 + frac/2
    top, bottom = int(lo * gh), int(hi * gh)
    left, right = int(lo * gw), int(hi * gw)
    return (top <= i < bottom) and (left <= j < right)

# Create the output folder up front: later cells write images and .npy caches into it.
ensure_dir(OUTPUT_DIR)
print("OUTPUT_DIR =", OUTPUT_DIR)

OUTPUT_DIR = /Users/michellewan/Desktop/mosaic_project/outputs/landscape


In [4]:
# Dedicated generator so the tile-pool sample is reproducible for a given SEED.
rng = np.random.default_rng(SEED)

tile_paths_all = list_tiles(TILES_DIR)
if len(tile_paths_all) == 0:
    raise FileNotFoundError(f"No tiles found in {TILES_DIR}")

# Shuffle a copy of the full listing, then keep at most POOL_TILES entries.
tile_paths_pool = tile_paths_all.copy()
rng.shuffle(tile_paths_pool)
tile_paths_pool = tile_paths_pool[:min(POOL_TILES, len(tile_paths_pool))]

# Load the target (longer side capped at 1200 px) and crop it to whole tiles.
target_rgb  = load_target_image(TARGET_PATH, max_side=1200)
target_grid = crop_target_to_grid(target_rgb, TILE_SIZE)

H, W, _ = target_grid.shape
gh, gw  = H // TILE_SIZE, W // TILE_SIZE  # grid size in tiles: rows, columns
num_blocks = gh * gw

print("Target:", target_rgb.shape, "-> grid:", target_grid.shape)
print("Grid blocks:", num_blocks, "| tile_size:", TILE_SIZE)
print("Tiles in pool:", len(tile_paths_pool))

# Save the cropped target alongside the mosaic outputs.
out0 = os.path.join(OUTPUT_DIR, f"00_target_grid_tile{TILE_SIZE}.jpg")
save_img(target_grid, out0)


Target: (800, 1200, 3) -> grid: (792, 1200, 3)
Grid blocks: 6600 | tile_size: 12
Tiles in pool: 12000
[SAVE] /Users/michellewan/Desktop/mosaic_project/outputs/landscape/00_target_grid_tile12.jpg


In [5]:
def compute_tile_means(paths, tile_size, save_npy):
    """Compute the mean RGB color of every tile and cache the result on disk.

    Each tile is loaded at `tile_size` x `tile_size`, averaged over all of its
    pixels per channel, and stored as one row of a float32 ``(len(paths), 3)``
    array. The array is written to `save_npy` with ``np.save`` and returned.
    """
    means = np.zeros((len(paths), 3), dtype=np.float32)
    progress = tqdm(paths, desc=f"tile means@{tile_size}")
    # Iterating `means` yields row views, so writing into `row` fills the array.
    for tile_path, row in zip(progress, means):
        pixels = preprocess_tile(tile_path, tile_size).astype(np.float32)
        row[...] = pixels.reshape(-1, 3).mean(axis=0)  # one mean per channel
    np.save(save_npy, means)
    return means

def mosaic_mean_rgb(target_rgb, tile_paths, tile_means, tile_size,
                    max_use=1, penalty_strength=0.35, top_k=12, seed=42):
    """Build a mosaic by matching every target block to tiles by mean RGB color.

    For each `tile_size` x `tile_size` block, the squared distance between the
    block's mean color and every tile mean is multiplied by
    ``(1 + penalty_strength * times_used)**2``. Tiles used `max_use` times are
    excluded; if that excludes everything the cap is relaxed to `max_use + 3`,
    and if that still excludes everything the cap is ignored. One tile is then
    drawn uniformly at random from the `top_k` best remaining scores.

    Returns:
        (mosaic, use_count): the uint8 mosaic image and an int32 array with
        the number of times each tile was placed.
    """
    rng = np.random.default_rng(seed)
    source = crop_target_to_grid(target_rgb, tile_size)
    height, width, _ = source.shape
    n_rows, n_cols = height // tile_size, width // tile_size

    use_count = np.zeros(len(tile_paths), dtype=np.int32)
    mosaic = np.zeros_like(source, dtype=np.uint8)

    for i in range(n_rows):
        rows = slice(i * tile_size, (i + 1) * tile_size)
        for j in range(n_cols):
            cols = slice(j * tile_size, (j + 1) * tile_size)

            block = source[rows, cols].astype(np.float32)
            block_mean = block.reshape(-1, 3).mean(axis=0)

            # Squared L2 distance between each tile mean and the block mean.
            diff = tile_means - block_mean[None, :]
            dists = np.sum(diff**2, axis=1)

            # Tiles still under the reuse cap; widen the cap if none are left.
            allowed = use_count < max_use
            if not allowed.any():
                allowed = use_count < (max_use + 3)

            # The more often a tile was used, the worse its score gets.
            penalty = (1.0 + penalty_strength * use_count.astype(np.float32))**2
            scores = dists * penalty
            scores_masked = np.where(allowed, scores, np.inf)

            k = int(min(top_k, np.sum(np.isfinite(scores_masked))))
            if k <= 0:
                # Nothing passed even the relaxed cap: fall back to raw scores.
                scores_masked = scores
                k = int(min(top_k, np.sum(np.isfinite(scores_masked))))

            # Shortlist the k best finite scores and pick one of them at random.
            best_k = np.argpartition(scores_masked, k-1)[:k]
            best_k = best_k[np.isfinite(scores_masked[best_k])]
            chosen = int(rng.choice(best_k))

            use_count[chosen] += 1
            mosaic[rows, cols] = preprocess_tile(tile_paths[chosen], tile_size)

        if (i+1) % 5 == 0 or (i+1) == n_rows:
            print(f"  MeanRGB row {i+1}/{n_rows} done")

    return mosaic, use_count

# Tile means are cached as .npy; the file name encodes the tile size and pool size.
# NOTE(review): the name does not identify WHICH tiles are in the pool, so a
# different SEED or TILES_DIR with the same pool size would presumably load
# stale means -- confirm, or delete the cache when the pool changes.
means_cache = os.path.join(OUTPUT_DIR, f"cache_means_tile{TILE_SIZE}_N{len(tile_paths_pool)}.npy")
if os.path.exists(means_cache):
    tile_means = np.load(means_cache)
    print("Loaded means:", tile_means.shape)
else:
    with Timer("Compute tile means"):
        tile_means = compute_tile_means(tile_paths_pool, TILE_SIZE, means_cache)

# Step 1: baseline mosaic driven by mean color only.
with Timer("Step1 MeanRGB Mosaic"):
    mosaic_mean, use_mean = mosaic_mean_rgb(
        target_grid, tile_paths_pool, tile_means, TILE_SIZE,
        max_use=MAX_USE_MEAN, penalty_strength=PENALTY_MEAN, top_k=TOPK_MEAN, seed=SEED
    )

out1 = os.path.join(OUTPUT_DIR, f"01_meanRGB_tile{TILE_SIZE}.jpg")
save_img(mosaic_mean, out1)
# Diversity summary: number of distinct tiles placed and the highest reuse count.
print("MeanRGB unique:", int((use_mean>0).sum()), "max_used:", int(use_mean.max()))


tile means@12: 100%|████████████████████| 12000/12000 [00:02<00:00, 4758.83it/s]


[RUNTIME] Compute tile means: 2.548 sec
  MeanRGB row 5/66 done
  MeanRGB row 10/66 done
  MeanRGB row 15/66 done
  MeanRGB row 20/66 done
  MeanRGB row 25/66 done
  MeanRGB row 30/66 done
  MeanRGB row 35/66 done
  MeanRGB row 40/66 done
  MeanRGB row 45/66 done
  MeanRGB row 50/66 done
  MeanRGB row 55/66 done
  MeanRGB row 60/66 done
  MeanRGB row 65/66 done
  MeanRGB row 66/66 done
[RUNTIME] Step1 MeanRGB Mosaic: 2.476 sec
[SAVE] /Users/michellewan/Desktop/mosaic_project/outputs/landscape/01_meanRGB_tile12.jpg
MeanRGB unique: 6600 max_used: 1


In [6]:
import torch
import torch.nn as nn
from torchvision import models, transforms
from sklearn.neighbors import KDTree

# Device preference: Apple MPS first, then CUDA, otherwise CPU.
device = "mps" if torch.backends.mps.is_available() else ("cuda" if torch.cuda.is_available() else "cpu")
print("device:", device)

# Input pipeline for VGG: convert to a tensor, then normalize per channel.
vgg_tf = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485,0.456,0.406], std=[0.229,0.224,0.225]), # normalize with the ImageNet mean/std
])

class VGGEmbedder(nn.Module):
    """Turn an image batch into L2-normalized, globally pooled VGG16 features.

    The pretrained torchvision VGG16 ``features`` stack is frozen and truncated
    to its first `cut_idx` layers. ``forward`` runs the truncated stack,
    averages each channel over the spatial dimensions and L2-normalizes the
    resulting vector, giving one ``(C,)`` descriptor per input image.

    Args:
        cut_idx: number of leading layers of ``vgg16().features`` to keep.
    """

    def __init__(self, cut_idx=22):
        super().__init__()
        vgg = models.vgg16(weights=models.VGG16_Weights.DEFAULT).features.eval()
        for p in vgg.parameters():
            p.requires_grad_(False)
        # Truncate once here instead of re-slicing inside every forward() call:
        # slicing builds a new nn.Sequential each time, and keeping the full
        # stack would hold the unused tail layers in memory (and on the device).
        self.vgg = vgg[:cut_idx].eval()
        self.cut_idx = cut_idx

    def forward(self, x):
        x = self.vgg(x)                  # (B,C,H,W) feature map of the last kept layer
        x = x.mean(dim=(2, 3))           # global average pooling -> (B,C): mean response of each channel
        x = nn.functional.normalize(x, dim=1)  # unit L2 norm per image
        return x

@torch.no_grad()
def embed_paths(paths, cut_idx, embed_size, save_npy, batch_size=64):
    """Embed every image in `paths` with VGG and cache the features on disk.

    Images are resized to `embed_size` x `embed_size`, normalized with
    `vgg_tf`, and pushed through a `VGGEmbedder(cut_idx)` in batches of
    `batch_size`. The per-batch ``(B, D)`` float32 outputs are concatenated,
    written to `save_npy` with ``np.save`` and returned.
    """
    embedder = VGGEmbedder(cut_idx=cut_idx).to(device).eval()
    batch_starts = range(0, len(paths), batch_size)
    chunks = []
    for start in tqdm(batch_starts, desc=f"vgg(cut={cut_idx})@{embed_size}"):
        tensors = [
            vgg_tf(Image.fromarray(preprocess_resize(p, embed_size)))
            for p in paths[start:start + batch_size]
        ]
        batch = torch.stack(tensors, dim=0).to(device)
        # Move features back to the CPU so only numpy arrays are accumulated.
        chunks.append(embedder(batch).cpu().numpy().astype(np.float32))
    feats = np.concatenate(chunks, axis=0)
    np.save(save_npy, feats)
    return feats

@torch.no_grad()
def embed_block(block_uint8, cut_idx, embed_size, embedder_obj):
    """Embed one target block: turn an image patch into a feature vector.

    The patch is resized to `embed_size` x `embed_size` (bilinear), normalized
    with `vgg_tf`, and passed through `embedder_obj` as a batch of one.
    `cut_idx` is accepted but not read here; the layer cut is whatever
    `embedder_obj` was built with. Returns a float32 array of shape ``(D,)``.
    """
    resized = Image.fromarray(block_uint8).resize((embed_size, embed_size), Image.BILINEAR)
    batch = vgg_tf(resized).unsqueeze(0).to(device)  # add the batch dimension
    features = embedder_obj(batch).cpu().numpy().astype(np.float32)
    return features[0]

# Single-layer VGG features for the whole pool, cached as .npy. The file name
# encodes the cut index, embedding size and pool size.
# NOTE(review): as with the means cache, the name does not identify which
# tiles are in the pool -- presumably stale if the pool changes; confirm.
vgg_single_cache = os.path.join(OUTPUT_DIR, f"cache_vgg_cut{CUT_SINGLE}_embed{EMBED_SIZE}_N{len(tile_paths_pool)}.npy")
if os.path.exists(vgg_single_cache):
    tile_feats_single = np.load(vgg_single_cache)
    print("Loaded VGG feats:", tile_feats_single.shape)
else:
    with Timer("Compute VGG feats (single)"):
        tile_feats_single = embed_paths(tile_paths_pool, CUT_SINGLE, EMBED_SIZE, vgg_single_cache, batch_size=64)

# Recap of this cell:
#   1. vgg_tf: image -> tensor -> ImageNet normalization
#   2. VGGEmbedder: VGG16 feature layers truncated at cut_idx
#   3. its output feature map is average-pooled and L2-normalized
#   4. embed_paths: for each batch, resize to embed_size, stack into a batch tensor,
#      run the embedder to get (B, D) features; D is the feature length per image
#   5. embed_block: the same embedding for a single target block

device: mps


vgg(cut=22)@96: 100%|█████████████████████████| 188/188 [00:14<00:00, 13.23it/s]

[RUNTIME] Compute VGG feats (single): 15.144 sec





In [7]:
def mosaic_vgg_bruteforce(target_rgb, tile_paths, tile_feats, tile_size,
                          cut_idx=CUT_SINGLE, embed_size=EMBED_SIZE,
                          max_use=2, penalty_strength=0.2, seed=42):
    """Build a mosaic by exhaustive VGG-feature matching (slow baseline).

    Every target block is embedded with VGG and compared, by L2 distance,
    against the embedding of every tile. Distances are multiplied by
    ``(1 + penalty_strength * times_used)**2``; tiles already used `max_use`
    times are excluded while at least one tile is still under the cap. The
    tile with the lowest score is placed.

    Args:
        target_rgb: uint8 RGB target image; cropped to whole tiles internally.
        tile_paths: tile image paths, aligned row-for-row with `tile_feats`.
        tile_feats: ``(N, D)`` tile embeddings produced with the same
            `cut_idx` / `embed_size`.
        tile_size: tile side length in pixels.
        cut_idx, embed_size: VGG layer cut and input resolution for the blocks.
        max_use: per-tile reuse cap (best effort, see above).
        penalty_strength: strength of the reuse penalty.
        seed: accepted for signature parity with the other mosaic builders;
            selection here is a deterministic argmin, so it is not used.

    Returns:
        (mosaic, use_count): the uint8 mosaic image and an int32 array with
        the number of times each tile was placed.
    """
    target_rgb = crop_target_to_grid(target_rgb, tile_size)
    H, W, _ = target_rgb.shape
    gh, gw = H // tile_size, W // tile_size

    embedder = VGGEmbedder(cut_idx=cut_idx).to(device).eval()
    use_count = np.zeros(len(tile_paths), dtype=np.int32)
    mosaic = np.zeros_like(target_rgb, dtype=np.uint8)

    for i in range(gh):
        rows = slice(i * tile_size, (i + 1) * tile_size)
        for j in range(gw):
            cols = slice(j * tile_size, (j + 1) * tile_size)
            q = embed_block(target_rgb[rows, cols], cut_idx, embed_size, embedder)

            dists = np.linalg.norm(tile_feats - q[None, :], axis=1)
            # The more often a tile has been used, the higher its penalty.
            penalty = (1.0 + penalty_strength * use_count.astype(np.float32))**2
            scores = dists * penalty

            # Enforce the reuse cap only while some tile is still under it.
            allowed = use_count < max_use
            if np.any(allowed):
                scores = np.where(allowed, scores, np.inf)

            chosen = int(np.argmin(scores))
            use_count[chosen] += 1
            mosaic[rows, cols] = preprocess_tile(tile_paths[chosen], tile_size)

        if (i+1) % 5 == 0 or (i+1) == gh:
            print(f"  VGG-only row {i+1}/{gh} done")

    return mosaic, use_count

# Step 2 (optional): VGG-only baseline with exhaustive search over all tiles.
if RUN_VGG_BRUTE:
    with Timer("Step2 VGG only (Brute force)"):
        mosaic_vgg_only, use_vgg_only = mosaic_vgg_bruteforce(
            target_grid, tile_paths_pool, tile_feats_single, TILE_SIZE,
            cut_idx=CUT_SINGLE, embed_size=EMBED_SIZE,
            max_use=MAX_USE_VGG, penalty_strength=PENALTY_VGG, seed=SEED
        )

    out2 = os.path.join(OUTPUT_DIR, f"02_vggOnly_bruteforce_tile{TILE_SIZE}.jpg")
    save_img(mosaic_vgg_only, out2)
    print("VGG-only unique:", int((use_vgg_only>0).sum()), "max_used:", int(use_vgg_only.max()))
else:
    print("Skipped VGG brute force baseline (RUN_VGG_BRUTE=False).")

# Walks every target block, embeds it with VGG, then takes the L2 distance to
# every tile embedding (brute force) -- a candidate for improvement.

  VGG-only row 5/66 done
  VGG-only row 10/66 done
  VGG-only row 15/66 done
  VGG-only row 20/66 done
  VGG-only row 25/66 done
  VGG-only row 30/66 done
  VGG-only row 35/66 done
  VGG-only row 40/66 done
  VGG-only row 45/66 done
  VGG-only row 50/66 done
  VGG-only row 55/66 done
  VGG-only row 60/66 done
  VGG-only row 65/66 done
  VGG-only row 66/66 done
[RUNTIME] Step2 VGG only (Brute force): 92.284 sec
[SAVE] /Users/michellewan/Desktop/mosaic_project/outputs/landscape/02_vggOnly_bruteforce_tile12.jpg
VGG-only unique: 6441 max_used: 2


In [9]:
def mosaic_vgg_kdtree(target_rgb, tile_paths, tile_feats, tree, tile_size,
                      cut_idx=CUT_SINGLE, embed_size=EMBED_SIZE,
                      max_use=2, penalty_strength=0.2, top_k=60, seed=42):
    """Build a mosaic from VGG features, using a KD-tree to shortlist tiles.

    Each block is embedded with VGG and the `top_k` nearest tiles are fetched
    from `tree`. Their distances are multiplied by
    ``(1 + penalty_strength * times_used)**2``; candidates already used
    `max_use` times are dropped unless that would leave none. One tile is then
    drawn uniformly at random from the (up to) 8 best-scoring candidates.
    `tile_feats` is accepted but not read; distances come from `tree`.

    Returns:
        (mosaic, use_count): the uint8 mosaic image and an int32 array with
        the number of times each tile was placed.
    """
    rng = np.random.default_rng(seed)  # seeded so the random picks are reproducible
    source = crop_target_to_grid(target_rgb, tile_size)
    height, width, _ = source.shape
    n_rows, n_cols = height // tile_size, width // tile_size

    embedder = VGGEmbedder(cut_idx=cut_idx).to(device).eval()  # embeds each target block
    n_tiles = len(tile_paths)
    use_count = np.zeros(n_tiles, dtype=np.int32)  # placements per tile, to limit repetition
    mosaic = np.zeros_like(source, dtype=np.uint8)

    # Number of neighbours requested from the tree (cannot exceed the pool size).
    k = min(top_k, n_tiles)

    for i in range(n_rows):
        rows = slice(i * tile_size, (i + 1) * tile_size)
        for j in range(n_cols):
            cols = slice(j * tile_size, (j + 1) * tile_size)

            # q is the block's embedding, shape (D,); fetch its k nearest tiles.
            q = embed_block(source[rows, cols], cut_idx, embed_size, embedder)
            near_d, near_i = tree.query(q[None, :], k=k)
            dists, idxs = near_d[0], near_i[0]

            cand_use = use_count[idxs].astype(np.float32)
            scores = dists * (1.0 + penalty_strength * cand_use)**2

            # Keep only candidates under the cap, unless that leaves nothing.
            allowed = cand_use < max_use
            if allowed.any():
                idxs, scores = idxs[allowed], scores[allowed]

            kk = min(8, len(idxs))
            best = np.argpartition(scores, kk-1)[:kk]
            chosen = int(rng.choice(idxs[best]))

            use_count[chosen] += 1
            mosaic[rows, cols] = preprocess_tile(tile_paths[chosen], tile_size)

        if (i+1) % 5 == 0 or (i+1) == n_rows:
            print(f"  VGG+KDTree row {i+1}/{n_rows} done")

    return mosaic, use_count

# Index the single-layer tile embeddings for nearest-neighbour queries.
with Timer("Build KDTree (single VGG feats)"):
    tree_single = KDTree(tile_feats_single)

# Step 3: same VGG features as Step 2, but candidates come from the KD-tree.
with Timer("Step3 VGG + KDTree"):
    mosaic_vgg_tree, use_vgg_tree = mosaic_vgg_kdtree(
        target_grid, tile_paths_pool, tile_feats_single, tree_single, TILE_SIZE,
        cut_idx=CUT_SINGLE, embed_size=EMBED_SIZE,
        max_use=MAX_USE_VGG, penalty_strength=PENALTY_VGG, top_k=TOPK_VGG_TREE, seed=SEED
    )

out3 = os.path.join(OUTPUT_DIR, f"03_vggKDTree_tile{TILE_SIZE}.jpg")
save_img(mosaic_vgg_tree, out3)
print("VGG+KDTree unique:", int((use_vgg_tree>0).sum()), "max_used:", int(use_vgg_tree.max()))

# VGGEmbedder turns each block into an embedding q; the KD-tree returns the top_k tiles closest to q.
# A KD-tree is best suited to large N and low D; here D is 512 (cut_idx = 22).
# NOTE(review): in the recorded run this step was not faster than brute force
# (112.6 s vs 92.3 s) and max_used reached 15 despite MAX_USE_VGG = 2, because
# the cap is only enforced among the top_k candidates.

[RUNTIME] Build KDTree (single VGG feats): 0.141 sec
  VGG+KDTree row 5/66 done
  VGG+KDTree row 10/66 done
  VGG+KDTree row 15/66 done
  VGG+KDTree row 20/66 done
  VGG+KDTree row 25/66 done
  VGG+KDTree row 30/66 done
  VGG+KDTree row 35/66 done
  VGG+KDTree row 40/66 done
  VGG+KDTree row 45/66 done
  VGG+KDTree row 50/66 done
  VGG+KDTree row 55/66 done
  VGG+KDTree row 60/66 done
  VGG+KDTree row 65/66 done
  VGG+KDTree row 66/66 done
[RUNTIME] Step3 VGG + KDTree: 112.640 sec
[SAVE] /Users/michellewan/Desktop/mosaic_project/outputs/landscape/03_vggKDTree_tile12.jpg
VGG+KDTree unique: 1771 max_used: 15


In [10]:
def mosaic_vgg_color_mix(target_rgb, tile_paths, tile_means, tile_feats, tree,
                         tile_size, embed_size,
                         alpha_vgg=0.75, max_use=2, penalty_strength=0.15, top_k=80, seed=42):
    """Build a mosaic that scores tiles by a blend of VGG and color distance.

    For each block the `top_k` nearest tiles in VGG space are fetched from
    `tree` (blocks are embedded at the module-level `CUT_SINGLE` layer). The
    VGG distances and the squared mean-RGB distances of those candidates are
    each divided by their own mean and blended as
    ``alpha_vgg * vgg + (1 - alpha_vgg) * color``: a higher `alpha_vgg` favors
    structure, a lower one favors color. The blend is multiplied by
    ``(1 + penalty_strength * times_used)**2``; candidates already used
    `max_use` times are dropped unless that would leave none, and one tile is
    drawn uniformly at random from the (up to) 8 best. `tile_feats` is
    accepted but not read; VGG distances come from `tree`.

    Returns:
        (mosaic, use_count): the uint8 mosaic image and an int32 array with
        the number of times each tile was placed.
    """
    rng = np.random.default_rng(seed)
    source = crop_target_to_grid(target_rgb, tile_size)
    height, width, _ = source.shape
    n_rows, n_cols = height // tile_size, width // tile_size
    n_tiles = len(tile_paths)

    embedder = VGGEmbedder(cut_idx=CUT_SINGLE).to(device).eval()
    use_count = np.zeros(n_tiles, dtype=np.int32)
    mosaic = np.zeros_like(source, dtype=np.uint8)

    # Number of neighbours requested from the tree (cannot exceed the pool size).
    k = min(top_k, n_tiles)

    for i in range(n_rows):
        rows = slice(i * tile_size, (i + 1) * tile_size)
        for j in range(n_cols):
            cols = slice(j * tile_size, (j + 1) * tile_size)
            block = source[rows, cols]
            block_mean = block.reshape(-1, 3).mean(axis=0).astype(np.float32)

            # Shortlist by VGG distance.
            q = embed_block(block, CUT_SINGLE, embed_size, embedder)
            near_d, near_i = tree.query(q[None, :], k=k)
            vgg_d, idxs = near_d[0], near_i[0]

            # Color distance of the shortlisted tiles to this block.
            color_d = np.sum((tile_means[idxs] - block_mean[None, :])**2, axis=1)

            # Bring both distances to a comparable scale before blending.
            vgg_dn = normalize_dist(vgg_d)
            col_dn = normalize_dist(color_d)
            score = alpha_vgg * vgg_dn + (1 - alpha_vgg) * col_dn

            cand_use = use_count[idxs].astype(np.float32)
            score = score * (1.0 + penalty_strength * cand_use)**2

            # Keep only candidates under the cap, unless that leaves nothing.
            allowed = cand_use < max_use
            if allowed.any():
                idxs, score = idxs[allowed], score[allowed]

            kk = min(8, len(idxs))
            best = np.argpartition(score, kk-1)[:kk]
            chosen = int(rng.choice(idxs[best]))

            use_count[chosen] += 1
            mosaic[rows, cols] = preprocess_tile(tile_paths[chosen], tile_size)

        if (i+1) % 5 == 0 or (i+1) == n_rows:
            print(f"  VGG+ColorMix row {i+1}/{n_rows} done")

    return mosaic, use_count

# Step 4: blend VGG distance with mean-color distance; candidates still come
# from the single-layer KD-tree built for Step 3.
with Timer("Step4 VGG + Color Mix"):
    mosaic_mix, use_mix = mosaic_vgg_color_mix(
        target_grid, tile_paths_pool, tile_means, tile_feats_single, tree_single,
        TILE_SIZE, EMBED_SIZE,
        alpha_vgg=ALPHA_VGG, max_use=MAX_USE_VGG, penalty_strength=PENALTY_VGG,
        top_k=TOPK_MIX, seed=SEED
    )

out4 = os.path.join(OUTPUT_DIR, f"04_vggColorMix_tile{TILE_SIZE}.jpg")
save_img(mosaic_mix, out4)
print("VGG+ColorMix unique:", int((use_mix>0).sum()), "max_used:", int(use_mix.max()))
# This step also uses the KD-tree for candidate search.

  VGG+ColorMix row 5/66 done
  VGG+ColorMix row 10/66 done
  VGG+ColorMix row 15/66 done
  VGG+ColorMix row 20/66 done
  VGG+ColorMix row 25/66 done
  VGG+ColorMix row 30/66 done
  VGG+ColorMix row 35/66 done
  VGG+ColorMix row 40/66 done
  VGG+ColorMix row 45/66 done
  VGG+ColorMix row 50/66 done
  VGG+ColorMix row 55/66 done
  VGG+ColorMix row 60/66 done
  VGG+ColorMix row 65/66 done
  VGG+ColorMix row 66/66 done
[RUNTIME] Step4 VGG + Color Mix: 106.503 sec
[SAVE] /Users/michellewan/Desktop/mosaic_project/outputs/landscape/04_vggColorMix_tile12.jpg
VGG+ColorMix unique: 1943 max_used: 12


In [11]:
# Cache files for the two feature sets used by the final step.
vgg3_cache = os.path.join(OUTPUT_DIR, f"cache_vgg_cut{CUT3}_embed{EMBED_SIZE}_N{len(tile_paths_pool)}.npy")
vgg4_cache = os.path.join(OUTPUT_DIR, f"cache_vgg_cut{CUT4}_embed{EMBED_SIZE}_N{len(tile_paths_pool)}.npy")
        # CUT3 features capture fine detail; CUT4 features capture structure.
        # Computed embeddings are saved so later runs can load them instead.
        # With CUT4 == CUT_SINGLE, vgg4_cache is the same file as the single-layer cache.

if os.path.exists(vgg3_cache):
    tile_f3 = np.load(vgg3_cache)
    print("Loaded f3:", tile_f3.shape)
else:
    with Timer("Compute VGG f3"):
        tile_f3 = embed_paths(tile_paths_pool, CUT3, EMBED_SIZE, vgg3_cache, batch_size=64)

if os.path.exists(vgg4_cache):
    tile_f4 = np.load(vgg4_cache)
    print("Loaded f4:", tile_f4.shape)
else:
    with Timer("Compute VGG f4"):
        tile_f4 = embed_paths(tile_paths_pool, CUT4, EMBED_SIZE, vgg4_cache, batch_size=64)

# One KD-tree per feature space, queried independently for candidates.
with Timer("Build KDTree f3"):
    tree3 = KDTree(tile_f3)
with Timer("Build KDTree f4"):
    tree4 = KDTree(tile_f4)

def mosaic_best_face(target_rgb, tile_paths, tile_means, tile_f3, tile_f4, tree3, tree4,
                     tile_size, embed_size,
                     w3=0.25, w4=0.55, w_color=0.20,
                     max_use=3, penalty_strength=0.12,
                     topk3=80, topk4=80, seed=42,
                     face_boost=1.25,
                     local_radius=2, local_ban=True):
    """Build the final mosaic from two VGG layers plus color, with anti-repeat rules.

    For every target block:
      1. Embed the block at the module-level CUT3 and CUT4 layers and fetch the
         `topk3` / `topk4` nearest tiles from `tree3` / `tree4`; the candidate
         set is the union of both shortlists.
      2. Compute three distances per candidate (CUT3 features, CUT4 features,
         squared mean RGB), each divided by its own mean via `normalize_dist`.
      3. Blend them with weights `w3`, `w4`, `w_color`, rescaled to sum to 1.
         Inside the central 40% window of the grid `w4` is first multiplied by
         `face_boost`.
      4. Multiply by the reuse penalty ``(1 + penalty_strength * times_used)**2``.
      5. Drop candidates already used `max_use` times, unless none would remain.
      6. If `local_ban` is set, drop candidates already placed within
         `local_radius` grid cells, unless none would remain.
      7. Draw one tile uniformly at random from the (up to) 10 best scores.

    Returns:
        (mosaic, use_count): the uint8 mosaic image and an int32 array with
        the number of times each tile was placed.
    """
    rng = np.random.default_rng(seed)
    target_rgb = crop_target_to_grid(target_rgb, tile_size)
    H, W, _ = target_rgb.shape
    gh, gw = H//tile_size, W//tile_size
    N = len(tile_paths)

    # one embedder per layer cut, matching tile_f3 / tile_f4
    emb3 = VGGEmbedder(cut_idx=CUT3).to(device).eval()
    emb4 = VGGEmbedder(cut_idx=CUT4).to(device).eval()

    use_count = np.zeros(N, dtype=np.int32)
    mosaic = np.zeros_like(target_rgb, dtype=np.uint8)
    # tile index placed at each grid cell; -1 marks cells not filled yet
    chosen_grid = -np.ones((gh, gw), dtype=np.int32)

    for i in range(gh):
        for j in range(gw):
            block = target_rgb[i*tile_size:(i+1)*tile_size, j*tile_size:(j+1)*tile_size]
            block_mean = block.reshape(-1,3).mean(axis=0).astype(np.float32)
            # mean RGB of the block

            q3 = embed_block(block, CUT3, embed_size, emb3)
            q4 = embed_block(block, CUT4, embed_size, emb4)

            k3 = min(topk3, N) # number of neighbours to fetch in f3 space
            k4 = min(topk4, N) # number of neighbours to fetch in f4 space
            _, idx3 = tree3.query(q3[None,:], k=k3)
            _, idx4 = tree4.query(q4[None,:], k=k4)
            idx3, idx4 = idx3[0], idx4[0]

            # union of both shortlists, duplicates removed
            cand = np.unique(np.concatenate([idx3, idx4], axis=0))

            # distances are recomputed for the whole union, since a candidate
            # may have come from only one of the two trees
            d3 = np.linalg.norm(tile_f3[cand] - q3[None,:], axis=1)
            d4 = np.linalg.norm(tile_f4[cand] - q4[None,:], axis=1)
            col = np.sum((tile_means[cand] - block_mean[None,:])**2, axis=1)

            
            # normalize all three distances
            d3n, d4n, coln = normalize_dist(d3), normalize_dist(d4), normalize_dist(col)
        

            # face boost: the center region emphasizes structure
            # if (i, j) is inside the central 40% window, raise the f4 weight
            if face_center_mask(i, j, gh, gw, frac=0.40):
                w4_eff = w4 * face_boost
                w3_eff = w3
                wc_eff = w_color
            else:
                w3_eff, w4_eff, wc_eff = w3, w4, w_color


            # rescale the weights so they sum to 1
            s = (w3_eff + w4_eff + wc_eff)
            w3_eff, w4_eff, wc_eff = w3_eff/s, w4_eff/s, wc_eff/s

            score = w3_eff*d3n + w4_eff*d4n + wc_eff*coln

            # global anti-repeat penalty
            cand_use = use_count[cand].astype(np.float32)
            score = score * (1.0 + penalty_strength * cand_use)**2

            # enforce max_use if possible
            allowed = cand_use < max_use
            if np.any(allowed):
                cand2 = cand[allowed]
                score2 = score[allowed]
            else:
                cand2 = cand
                score2 = score

            # local ban: forbid reusing a tile that was already placed nearby
            if local_ban and local_radius > 0:
                i0 = max(0, i-local_radius); i1 = min(gh, i+local_radius+1)
                j0 = max(0, j-local_radius); j1 = min(gw, j+local_radius+1)
                banned = set(chosen_grid[i0:i1, j0:j1].ravel().tolist())
                banned.discard(-1)
                if len(banned) > 0:
                    keep = np.array([c not in banned for c in cand2], dtype=bool)
                    # only apply the ban when it leaves at least one candidate
                    if np.any(keep):
                        cand2 = cand2[keep]
                        score2 = score2[keep]

            # pick at random among the top few, for diversity
            kk = min(10, len(cand2))
            best = np.argpartition(score2, kk-1)[:kk]
            chosen = int(rng.choice(cand2[best]))

            use_count[chosen] += 1
            chosen_grid[i, j] = chosen
            mosaic[i*tile_size:(i+1)*tile_size, j*tile_size:(j+1)*tile_size] = preprocess_tile(tile_paths[chosen], tile_size)

        if (i+1) % 5 == 0 or (i+1) == gh:
            print(f"  BEST row {i+1}/{gh} done")

    return mosaic, use_count

# Step 5: final mosaic using the config-cell weights and anti-repeat settings.
with Timer("Step5 BEST (dual VGG + dual KDTree + color + face boost + local ban)"):
    mosaic_best, use_best = mosaic_best_face(
        target_grid, tile_paths_pool, tile_means, tile_f3, tile_f4, tree3, tree4,
        TILE_SIZE, EMBED_SIZE,
        w3=W3, w4=W4, w_color=W_COLOR,
        max_use=MAX_USE_BEST, penalty_strength=PENALTY_BEST,
        topk3=TOPK3_BEST, topk4=TOPK4_BEST, seed=SEED,
        face_boost=FACE_BOOST,
        local_radius=LOCAL_RADIUS, local_ban=LOCAL_BAN
    )

out5 = os.path.join(OUTPUT_DIR, f"05_BEST_tile{TILE_SIZE}.jpg")
save_img(mosaic_best, out5)
print("BEST unique:", int((use_best>0).sum()), "max_used:", int(use_best.max()))

# For each block: compute the mean RGB, embed with emb3 and emb4 to get q3 and q4,
# query tree3 and tree4, take the union as the candidate set, and boost the
# structure weight in the central (face) area.

vgg(cut=16)@96: 100%|█████████████████████████| 188/188 [00:10<00:00, 18.04it/s]


[RUNTIME] Compute VGG f3: 11.154 sec
Loaded f4: (12000, 512)
[RUNTIME] Build KDTree f3: 0.046 sec
[RUNTIME] Build KDTree f4: 0.140 sec
  BEST row 5/66 done
  BEST row 10/66 done
  BEST row 15/66 done
  BEST row 20/66 done
  BEST row 25/66 done
  BEST row 30/66 done
  BEST row 35/66 done
  BEST row 40/66 done
  BEST row 45/66 done
  BEST row 50/66 done
  BEST row 55/66 done
  BEST row 60/66 done
  BEST row 65/66 done
  BEST row 66/66 done
[RUNTIME] Step5 BEST (dual VGG + dual KDTree + color + face boost + local ban): 196.173 sec
[SAVE] /Users/michellewan/Desktop/mosaic_project/outputs/landscape/05_BEST_tile12.jpg
BEST unique: 2207 max_used: 11
