In [1]:
from pathlib import Path
from typing import List, Dict
import yaml
import pandas as pd
from PIL import Image
import os
import numpy as np
import matplotlib.pyplot as plt

from transformers import CLIPProcessor, CLIPModel
from PIL import Image
import torch
from tqdm import tqdm

from collections import defaultdict
import shutil

In [2]:
# File suffixes (lower-case, leading dot included) that the scanner accepts as images.
SUPPORTED_FORMATS = [
    ".jpg",
    ".jpeg",
    ".png",
    ".bmp",
    ".tiff",
    ".gif",
    ".webp",
]

In [4]:
# Load the gallery path from config.yaml
# The encoding is given explicitly so a gallery path containing non-ASCII
# characters is decoded the same way on every platform, regardless of the
# locale's default encoding.
with open("../config.yaml", "r", encoding="utf-8") as f:
    config = yaml.safe_load(f)

# An empty file makes safe_load return None; fail with a message that names
# the missing setting instead of an opaque TypeError/KeyError.
if not isinstance(config, dict) or "gallery_path" not in config:
    raise KeyError("config.yaml must define a 'gallery_path' entry")

BASE_DIR = Path(config["gallery_path"])

In [23]:
# Month to de-duplicate. The target path is built as
# BASE_DIR/<year>/<two-digit month>.
year, month = 2025, 4

TARGET_DIR = BASE_DIR.joinpath(str(year), f"{month:02d}")

In [24]:
def scan_images_metadata(root_dir: str, exclude_dirs=("duplicates",)) -> List[Dict]:
    """Recursively collect file metadata for every supported image under ``root_dir``.

    Args:
        root_dir: Directory to scan; a string or ``pathlib.Path`` both work.
        exclude_dirs: Names of path components to skip. Any entry whose path
            below ``root_dir`` contains one of these names is ignored. The
            default skips the ``duplicates`` folder that
            ``move_duplicate_images`` creates, so re-running the notebook does
            not re-process files that were already moved there. Pass ``()`` to
            scan everything.

    Returns:
        One dict per image file with the keys ``path`` (resolved absolute path
        as a string), ``name``, ``ext`` (lower-cased suffix), ``size``
        (``st_size``) and ``modified`` (``st_mtime``). Entries are ordered by
        sorted path so the result is reproducible across runs.
    """
    image_files = []
    root = Path(root_dir)
    excluded = set(exclude_dirs)

    # rglob yields entries in filesystem order; sorting makes "the first copy
    # of a duplicate group" deterministic.
    for filepath in sorted(root.rglob("*")):
        if excluded and any(part in excluded for part in filepath.relative_to(root).parts):
            continue
        if filepath.suffix.lower() not in SUPPORTED_FORMATS:
            continue
        try:
            # rglob also returns directories; a folder named e.g. "x.jpg"
            # must not be recorded as an image.
            if not filepath.is_file():
                continue
            stat = filepath.stat()
            image_files.append({
                "path": str(filepath.resolve()),
                "name": filepath.name,
                "ext": filepath.suffix.lower(),
                "size": stat.st_size,
                "modified": stat.st_mtime,
            })
        except (OSError, RuntimeError) as e:
            # Best effort: report unreadable entries and carry on.
            print(f"Skipping {filepath}: {e}")

    return image_files

In [25]:
# Index every supported image file below the selected month directory.
metadata = scan_images_metadata(root_dir=TARGET_DIR)
print(f"Found {len(metadata)} images")

Found 22 images


In [26]:
def load_and_preprocess_images(file_list, size=(224, 224)):
    """Open each listed image, convert it to RGB and resize it to ``size``.

    Args:
        file_list: Iterable of records (dicts) that each carry a ``"path"`` key.
        size: ``(width, height)`` passed to ``Image.resize``.

    Returns:
        A tuple ``(images, valid_paths)``: the processed images and, in the
        same order, the paths they were loaded from. Files that fail to load
        are reported on stdout and left out of both lists.
    """
    images = []
    valid_paths = []

    for record in file_list:
        path = record["path"]
        try:
            # Image.open keeps the underlying file open until the image is
            # closed. convert()/resize() return independent in-memory copies,
            # so the handle can be released as soon as they are done.
            with Image.open(path) as raw:
                img = raw.convert("RGB").resize(size)
        except Exception as e:
            # Deliberately broad: any decode problem should skip the file,
            # not abort the whole run.
            print(f"Failed to load {path}: {e}")
            continue

        images.append(img)
        valid_paths.append(path)

    return images, valid_paths

In [27]:
# Decode the indexed files; valid_paths lists the ones that loaded, in image order.
images, valid_paths = load_and_preprocess_images(file_list=metadata)
print(f"Successfully loaded {len(images)} image(s)")

Successfully loaded 22 image(s)


In [28]:
# Use the GPU when one is available and fall back to the CPU otherwise, so the
# notebook also runs on machines without CUDA.
device = "cuda" if torch.cuda.is_available() else "cpu"

# Single source for the checkpoint name so model and processor always match.
CLIP_MODEL_NAME = "openai/clip-vit-base-patch32"

model = CLIPModel.from_pretrained(CLIP_MODEL_NAME).to(device)
processor = CLIPProcessor.from_pretrained(CLIP_MODEL_NAME)

Error during conversion: ChunkedEncodingError(ProtocolError('Response ended prematurely'))


In [29]:
def generate_clip_embeddings(images, model, processor, device, batch_size=32):
    """Compute L2-normalised image embeddings, processing the images in batches.

    Args:
        images: List (or tuple) of images accepted by ``processor``. Any other
            object is handed to the processor unchanged as a single batch.
        model: Object exposing ``get_image_features(**inputs)``.
        processor: Callable that turns images into model inputs; its result
            must support ``.to(device)``.
        device: Device the inputs are moved to before the forward pass.
        batch_size: Maximum number of images per forward pass. Bounding the
            batch keeps memory use flat for large galleries.

    Returns:
        A CPU tensor with one unit-length row per input image. For an empty
        list the result has zero rows; its width is
        ``model.config.projection_dim`` when the model exposes it, else 0.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be a positive integer")

    if isinstance(images, (list, tuple)):
        if len(images) == 0:
            # Nothing to embed: return an empty matrix instead of failing
            # inside the processor.
            dim = getattr(getattr(model, "config", None), "projection_dim", 0)
            return torch.empty((0, dim))
        batches = [images[start:start + batch_size] for start in range(0, len(images), batch_size)]
    else:
        batches = [images]

    chunks = []
    with torch.no_grad():
        for batch in batches:
            inputs = processor(images=batch, return_tensors="pt", padding=True).to(device)
            features = model.get_image_features(**inputs)
            # Normalize embeddings (L2) and move each chunk off the device
            # right away so only one batch lives there at a time.
            chunks.append(torch.nn.functional.normalize(features, p=2, dim=1).cpu())

    return torch.cat(chunks, dim=0)

In [30]:
# Embed every loaded image; rows follow the order of `images` / `valid_paths`.
embeddings = generate_clip_embeddings(images=images, model=model, processor=processor, device=device)
print(f"Embeddings shape: {embeddings.shape}")

Embeddings shape: torch.Size([22, 512])


In [31]:
def compute_similarity_matrix(embeddings: torch.Tensor) -> np.ndarray:
    """Return the pairwise dot products of the rows of ``embeddings``.

    Entry ``[i, j]`` is the dot product of row ``i`` and row ``j``; for
    unit-length rows this equals their cosine similarity. The result is
    returned as a NumPy array.
    """
    pairwise = torch.matmul(embeddings, embeddings.T)
    return pairwise.cpu().numpy()

In [32]:
def group_duplicates(sim_matrix: np.ndarray, paths: list, threshold):
    """Cluster paths whose pairwise similarity reaches ``threshold``.

    Pairs ``(i, j)`` with ``sim_matrix[i, j] >= threshold`` are linked, and
    links are transitive (union-find), so a group may contain two paths that
    are only connected through a third one.

    Returns:
        A list of groups, each a list of paths in their original order.
        Only groups with more than one member are returned.
    """
    representative = {}

    def find(item):
        # Locate the root of the set that contains `item`.
        root = item
        while representative.get(root, root) != root:
            root = representative[root]
        # Path compression: point every node on the walk straight at the root.
        while representative.get(item, item) != item:
            representative[item], item = root, representative[item]
        return root

    def union(left, right):
        left_root = find(left)
        right_root = find(right)
        if left_root != right_root:
            representative[right_root] = left_root

    count = len(paths)
    for i, first in enumerate(paths):
        for j in range(i + 1, count):
            if sim_matrix[i, j] >= threshold:
                union(first, paths[j])

    # Bucket paths by the root of their set, preserving input order.
    clusters = {}
    for candidate in paths:
        clusters.setdefault(find(candidate), []).append(candidate)

    return [members for members in clusters.values() if len(members) > 1]

In [33]:
# The embeddings are L2-normalised, so the matrix holds cosine similarities;
# pairs scoring at least 0.95 are linked into the same duplicate group.
sim_matrix = compute_similarity_matrix(embeddings=embeddings)
duplicate_groups = group_duplicates(sim_matrix=sim_matrix, paths=valid_paths, threshold=0.95)

print(f"Found {len(duplicate_groups)} duplicate group(s).")

Found 3 duplicate group(s).


In [34]:
def show_image_group(image_paths, columns=5, figsize=(15, 5)):
    """Display the given images in a grid, titled with their file names.

    Args:
        image_paths: Paths of the images to show.
        columns: Number of grid columns; rows are added as needed.
        figsize: Size passed to ``plt.figure``.

    Nothing is drawn when ``image_paths`` is empty.
    """
    n = len(image_paths)
    if n == 0:
        # Avoid creating and showing a blank figure for an empty group.
        return

    rows = (n + columns - 1) // columns  # ceiling division
    plt.figure(figsize=figsize)

    for i, path in enumerate(image_paths):
        plt.subplot(rows, columns, i + 1)
        # imshow copies the pixel data, so the file handle can be closed as
        # soon as the image has been drawn.
        with Image.open(path) as img:
            plt.imshow(img)
        plt.title(os.path.basename(path), fontsize=8)
        plt.axis("off")

    plt.tight_layout()
    plt.show()

In [36]:
# Optional visual check: uncomment to display each duplicate group before any files are moved.
# for g in duplicate_groups:
#     show_image_group(g)

In [38]:
def move_duplicate_images(duplicate_groups, month_dir: Path):
    """Move all but one image of every duplicate group into ``month_dir/duplicates``.

    The ``duplicates`` directory is created if needed. Within each group the
    first path that is not already inside that directory is kept in place;
    the remaining ones that are not already there are moved. Paths already
    under ``duplicates`` are left alone, so running the function again on a
    rescanned directory never moves the retained original.

    Args:
        duplicate_groups: Iterable of groups, each a sequence of file paths.
        month_dir: Directory that receives the ``duplicates`` sub-directory.

    A name clash in the target directory is resolved by appending ``_1``,
    ``_2``, ... to the file stem. Failed moves are reported on stdout and do
    not stop the run.
    """
    duplicates_dir = Path(month_dir) / "duplicates"
    duplicates_dir.mkdir(parents=True, exist_ok=True)
    # Scanned paths are resolved, so compare against the resolved target.
    duplicates_root = duplicates_dir.resolve()

    for group in duplicate_groups:
        # Ignore copies that an earlier run already moved into duplicates/.
        candidates = [
            Path(path_str)
            for path_str in group
            if duplicates_root not in Path(path_str).resolve().parents
        ]

        # candidates[0] stays where it is; everything after it is moved.
        for src in candidates[1:]:
            dst = duplicates_dir / src.name

            # Avoid name conflicts
            i = 1
            while dst.exists():
                dst = duplicates_dir / f"{src.stem}_{i}{src.suffix}"
                i += 1

            try:
                shutil.move(str(src), str(dst))
            except OSError as e:
                print(f"Failed to move {src}: {e}")
            else:
                print(f"Moved: {src.name} → {dst.name}")

In [39]:
# Moves files on disk: every group keeps one image in place and the rest go to TARGET_DIR/duplicates.
move_duplicate_images(duplicate_groups=duplicate_groups, month_dir=TARGET_DIR)

Moved: 20250401_234055397_iOS.jpg → 20250401_234055397_iOS.jpg
Moved: 20250409_135235000_iOS.png → 20250409_135235000_iOS.png
Moved: 20250409_135245000_iOS.png → 20250409_135245000_iOS.png
Moved: 20250420_211823186_iOS.jpg → 20250420_211823186_iOS.jpg
