# Image Feature Loader & DB-Saver

Dieses Notebook lädt Bilder, extrahiert Features und speichert sie in einer SQLite-Datenbank. Anschließend werden die nötigen Data-Science-Schritte ausgeführt, um die Daten zu analysieren.

In [None]:
import os, gc, time, traceback, sqlite3, cProfile, pstats, random
from pathlib import Path
from datetime import datetime

import numpy as np
from tqdm import tqdm
import psutil

from sklearn.decomposition import IncrementalPCA
import umap

try:
    from PIL import Image

    PIL_AVAILABLE = True
except Exception:
    PIL_AVAILABLE = False

# FIRST RUN TO EXTRACT SMALL IMAGES AND MARK BIG IMAGES 

In [None]:
# ========== CONFIG ==========
# Filesystem locations: SQLite database, source image folder and the output
# directories for raw and PCA-reduced embedding files.
DB_PATH = r"C:\BIG_DATA\data\database.db"
PHOTO_FOLDER = r"D:\data\image_data"
EMBEDDING_DIR = r"C:\BIG_DATA\embeddings"
EMBEDDING_PCA_DIR = r"C:\BIG_DATA\embeddings_pca"

# Logging / outputs
LOG_FILE = "verarbeitung_log.txt"
SKIPPED_LOG = "skipped_and_errors.txt"  # images that failed or were not processed
DEFERRED_CSV = "Large_images.csv"  # images deferred because of file size / pixel count

# Sharding: rows are spread over tables named TABLE_PREFIX + running number.
IMAGES_PER_TABLE = 50000
TABLE_PREFIX = "image_features_part_"

# Preprocessing / embedding
TARGET_SIZE = (224, 224)  # (W, H) as expected by PIL
EMBED_INPUT_DTYPE = "float32"  # "float32" (0..1) or "uint8"
MICRO_BATCH = 32  # number of images per embedding micro-batch
MAX_MP = 8  # more than 8 megapixels => defer (only the header is read)
MAX_IMAGE_SIZE_BEFORE_REDUCE = 4 * 1024 * 1024  # 4 MB file-size threshold

# Pixel budget (either static, or enable the dynamic variant below)
INITIAL_BATCH_SIZE = 500
PIXEL_BUDGET = INITIAL_BATCH_SIZE * TARGET_SIZE[0] * TARGET_SIZE[1]  # static
AUTO_PIXEL_BUDGET_FRACTION = None  # e.g. 0.30 for 30% of the free RAM; None = off

# Make sure the output directories exist.
Path(EMBEDDING_DIR).mkdir(parents=True, exist_ok=True)
Path(EMBEDDING_PCA_DIR).mkdir(parents=True, exist_ok=True)


# ========== DB ==========
# Module-level connection and cursor shared by all functions below.
# isolation_level=None puts the sqlite3 module into autocommit mode.
conn = sqlite3.connect(DB_PATH, isolation_level=None)
cursor = conn.cursor()
# NOTE(review): journal_mode=OFF and synchronous=OFF trade crash safety for
# write speed; a crash mid-write can leave the database file corrupted.
cursor.execute("PRAGMA journal_mode=OFF;")
cursor.execute("PRAGMA synchronous=OFF;")
cursor.execute("PRAGMA temp_store=MEMORY;")
cursor.execute("PRAGMA mmap_size=0;")  # avoids inflated "visible" memory usage


# ========== Logging/Helfer ==========
def log_debug(logfile, msg):
    """Write ``msg`` followed by a newline to the open, writable ``logfile``."""
    line = msg + "\n"
    logfile.write(line)


def log_skip_or_error(filename, path, reason):
    """Append one timestamped, pipe-separated record to the SKIPPED_LOG file.

    Args:
        filename: Name of the affected image (or a placeholder such as "UNKNOWN").
        path: Path (or other identifier) of the affected item.
        reason: Free-text explanation; may contain a multi-line traceback.
    """
    with open(SKIPPED_LOG, mode="a", encoding="utf-8") as sink:
        record = f"{datetime.now()} | {filename} | {path} | {reason}\n"
        sink.write(record)


def mark_deferred_large(filename, path, reason="LARGE_IMAGE_DEFERRED"):
    """Append one row (timestamp, filename, path, reason) to DEFERRED_CSV.

    The row is written with ``csv.writer`` so that fields containing commas or
    quotes (possible in file names and paths) are quoted instead of silently
    shifting the columns. Fields without such characters are written exactly
    as before.

    Args:
        filename: Name of the deferred image.
        path: Full path of the deferred image.
        reason: Short tag describing why the image was deferred.
    """
    import csv  # local import: csv is not among this notebook's top-level imports

    with open(DEFERRED_CSV, "a", encoding="utf-8") as f:
        # lineterminator="\n" keeps the same line endings the previous
        # f.write(...+"\n") produced (text mode still translates them).
        writer = csv.writer(f, lineterminator="\n")
        writer.writerow([datetime.now(), filename, path, reason])


def print_resource_usage(stage, logfile):
    """Log the current process' resident memory (MB) and CPU usage (%).

    Args:
        stage: Label describing the point in the pipeline being measured.
        logfile: Open, writable log file handed on to ``log_debug``.
    """
    bytes_per_mb = 1024**2
    this_process = psutil.Process(os.getpid())
    rss_mb = this_process.memory_info().rss / bytes_per_mb
    # cpu_percent blocks for the sampling interval (0.1 s).
    cpu_pct = this_process.cpu_percent(interval=0.1)
    message = f"[RESOURCE] {stage} | RAM: {rss_mb:.2f} MB | CPU: {cpu_pct:.2f}%"
    log_debug(logfile, message)


def is_large_by_pixels(path, max_mp=MAX_MP):
    """Return True if the image at ``path`` has more than ``max_mp`` megapixels.

    Returns False when PIL is unavailable or when the image cannot be opened
    or measured (best effort: any exception counts as "not large").
    """
    if PIL_AVAILABLE:
        try:
            with Image.open(path) as handle:
                width, height = handle.size
            pixel_limit = max_mp * 1_000_000
            return (width * height) > pixel_limit
        except Exception:
            pass
    return False


def should_defer(path):
    """Decide whether the image at ``path`` is deferred as "large".

    An image is deferred when its file size exceeds
    MAX_IMAGE_SIZE_BEFORE_REDUCE or, failing that, when
    ``is_large_by_pixels`` reports it as too large. Errors while reading the
    file size are ignored and only the pixel check is applied.
    """
    try:
        too_big_on_disk = os.path.getsize(path) > MAX_IMAGE_SIZE_BEFORE_REDUCE
    except Exception:
        too_big_on_disk = False
    if too_big_on_disk:
        return True
    return is_large_by_pixels(path)


def compute_pixel_budget_from_ram():
    """Derive a pixel budget from the currently available RAM.

    Takes AUTO_PIXEL_BUDGET_FRACTION of the available memory and divides it
    by the bytes one pixel occupies (3 channels, 4 bytes per value for
    "float32" input, otherwise 1 byte per value).

    Returns:
        The number of pixels that fit into the targeted share of memory.
    """
    value_bytes = 4 if EMBED_INPUT_DTYPE == "float32" else 1
    bytes_per_pixel = 3 * value_bytes
    free_bytes = psutil.virtual_memory().available
    budget_bytes = int(free_bytes * AUTO_PIXEL_BUDGET_FRACTION)
    return budget_bytes // bytes_per_pixel


# Replace the static budget only when a (non-zero) fraction is configured.
if AUTO_PIXEL_BUDGET_FRACTION:
    PIXEL_BUDGET = compute_pixel_budget_from_ram()


def create_table_if_not_exists(table_name):
    """Create the shard table ``table_name`` and its lookup index if missing.

    The index covers (filename, path). ``table_name`` is interpolated directly
    into the SQL, so it must be a trusted identifier.
    """
    ddl = f"""
        CREATE TABLE IF NOT EXISTS {table_name} (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            filename TEXT NOT NULL,
            path TEXT NOT NULL,
            color_hist TEXT,
            embedding_path TEXT,
            image_hash TEXT,
            resolution TEXT,
            file_size INTEGER,
            pca_embedding TEXT,
            umap_x REAL,
            umap_y REAL
        )
    """
    index_sql = f"CREATE INDEX IF NOT EXISTS idx_{table_name}_filename_path ON {table_name}(filename, path);"
    for statement in (ddl, index_sql):
        cursor.execute(statement)


def save_batch_to_db(entries, logfile, table_name):
    """Bulk-insert ``entries`` into ``table_name`` and log the elapsed time.

    Args:
        entries: Sequence of 10-tuples in the column order of the INSERT
            statement below; an empty sequence is a no-op.
        logfile: Open, writable log file.
        table_name: Trusted name of the target shard table.
    """
    if not entries:
        return
    insert_sql = f"""
        INSERT INTO {table_name}
        (filename, path, color_hist, embedding_path, image_hash, resolution, file_size,
         pca_embedding, umap_x, umap_y)
        VALUES ( ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    """
    log_debug(logfile, f"[DEBUG] Speichere {len(entries)} Einträge in {table_name}...")
    started = time.time()
    cursor.executemany(insert_sql, entries)
    dur = time.time() - started
    log_debug(logfile, f"[DEBUG] DB-Speicherung: {dur:.2f}s")


def preprocess_for_model(img_uint8, target_size=TARGET_SIZE):
    """Resize an image array to ``target_size`` and convert it for the model.

    With PIL available the image is resized bilinearly; otherwise rows and
    columns are picked by evenly spaced index sampling.

    Args:
        img_uint8: Image as a NumPy array (expected to be uint8).
        target_size: (width, height) of the result.

    Returns:
        float32 array scaled by 1/255 when EMBED_INPUT_DTYPE is "float32",
        otherwise a uint8 array.
    """
    if not PIL_AVAILABLE:
        # Fallback without PIL: nearest-index sampling along both axes.
        out_w, out_h = target_size[0], target_size[1]
        row_idx = np.linspace(0, img_uint8.shape[0] - 1, out_h).astype(np.int32)
        col_idx = np.linspace(0, img_uint8.shape[1] - 1, out_w).astype(np.int32)
        resized = img_uint8[np.ix_(row_idx, col_idx)]
    else:
        # PIL expects the size as (width, height).
        pil_img = Image.fromarray(img_uint8).resize(target_size, Image.BILINEAR)
        resized = np.asarray(pil_img)

    if EMBED_INPUT_DTYPE != "float32":
        return resized.astype(np.uint8)
    return resized.astype(np.float32) / 255.0


def to_embed_dtype(arr):
    """Convert ``arr`` to the dtype (and value range) the embedder expects.

    The convention used by ``preprocess_for_model`` is: float32 data lives in
    0..1, uint8 data in 0..255. Both directions are converted accordingly:

    * target "float32": float32 input is returned unchanged, uint8 input is
      rescaled to 0..1 (previously it was only cast, leaving 0..255 values in
      a float array), any other dtype is cast to float32 as-is.
    * target "uint8": float32 input is clipped to 0..1 and rescaled to
      0..255, any other dtype is cast to uint8 as-is.

    Args:
        arr: NumPy array holding image data.

    Returns:
        NumPy array of dtype float32 or uint8, depending on EMBED_INPUT_DTYPE.
    """
    if EMBED_INPUT_DTYPE == "float32":
        if arr.dtype == np.float32:
            return arr
        if arr.dtype == np.uint8:
            # Mirror of the uint8 branch below: keep the 0..1 convention.
            return arr.astype(np.float32) / 255.0
        return arr.astype(np.float32)

    if arr.dtype == np.float32:
        return (np.clip(arr, 0.0, 1.0) * 255.0).round().astype(np.uint8)
    return arr.astype(np.uint8)


def extract_embeddings_micro(bimgs, chunk=MICRO_BATCH):
    """Embed ``bimgs`` in slices of ``chunk`` images and stack the results.

    Args:
        bimgs: Sequence of preprocessed images.
        chunk: Number of images handed to ``extract_embeddings`` per call.

    Returns:
        The per-slice outputs of ``extract_embeddings`` stacked vertically.
    """
    slice_starts = range(0, len(bimgs), chunk)
    parts = [extract_embeddings(bimgs[lo : lo + chunk]) for lo in slice_starts]
    return np.vstack(parts)


def _unique_npy_path(base_path):
    base, ext = os.path.splitext(base_path)
    if ext.lower() != ".npy":
        base_path = base + ".npy"
        base, ext = os.path.splitext(base_path)
    out = base_path
    c = 1
    while os.path.exists(out):
        out = f"{base}_{c}.npy"
        c += 1
    return out


# ========== Feature-Vorbereitung ==========
def prepare_image_features(filename, path, logfile):
    """
    Load one image and compute everything except the embedding itself.

    Large images (by file size or pixel count) are recorded in the deferred
    CSV and skipped. Otherwise the image is loaded, hashed at original size,
    shrunk, and the small version is used for the histogram and as embedding
    input.

    Returns:
        On success a 7-tuple
        ``(filename, path, embed_input, hist_str, img_hash, resolution, file_size)``.
        For deferred images and on any error an 8-tuple
        ``(filename, path, None, None, None, None, None, reason)``; callers
        tell the two cases apart by the tuple length.
    """
    no_features = (None,) * 5
    try:
        if should_defer(path):
            log_debug(logfile, f"[INFO] Deferred (gross/pixelreich): {filename}")
            mark_deferred_large(filename, path, "LARGE_BY_FILE_OR_PIXELS")
            return (filename, path) + no_features + ("LARGE_IMAGE_DEFERRED",)

        pixels = fast_load(path)  # external loader; expected to return a uint8 ndarray
        if not isinstance(pixels, np.ndarray):
            raise TypeError("fast_load() must return a NumPy uint8 array.")
        if pixels.dtype != np.uint8:
            pixels = np.clip(pixels, 0, 255).astype(np.uint8)

        # Resolution is reported as WIDTHxHEIGHT.
        resolution = f"{pixels.shape[1]}x{pixels.shape[0]}"
        file_size = os.path.getsize(path)

        # The hash is computed on the full-size image.
        img_hash = calc_hash(pixels)

        # Keep only the shrunken image around to limit memory per batch.
        small = preprocess_for_model(pixels)
        embed_input = to_embed_dtype(small)

        # Serialize the histogram right away so only a short string is kept.
        histogram = calc_histogram(small, bins=32)
        hist_values = (str(round(float(v), 6)) for v in np.ravel(histogram))
        hist_str = ",".join(hist_values)

        del pixels
        return (filename, path, embed_input, hist_str, img_hash, resolution, file_size)

    except Exception as e:
        tb = traceback.format_exc()
        reason = f"{e} | Traceback:\n{tb}"
        log_debug(logfile, f"[ERROR] Fehler bei {filename}: {e}")
        log_skip_or_error(filename, path, reason)
        return (filename, path) + no_features + (reason,)


# ========== Globales Finalisieren über alle Shards ==========
def finalize_all_shards_global(
    logfile,
    sample_size: int = 100_000,
    pca_components: int = 100,
    chunk: int = 2048,
):
    """
    Fit IncrementalPCA and UMAP over the embeddings of ALL shard tables.

    Intended to run once at the end of the pipeline:
    1) PASS 1: stream all embedding files and ``partial_fit`` the IPCA.
    2) PASS 2: transform every embedding, save each PCA vector as a ``.npy``
       file, store its path in ``pca_embedding`` and reservoir-sample the PCA
       vectors for UMAP.
    3) Fit UMAP on the sample, then PASS 3: transform all PCA vectors and
       write ``umap_x`` / ``umap_y``.

    Rows are read with keyset pagination (``id > last_id ORDER BY id``) on a
    dedicated cursor and every page is fetched completely before any UPDATE
    is issued. Running the UPDATEs on the cursor that was still streaming the
    SELECT discarded its remaining rows, so only the first chunk of each
    table was processed.

    Args:
        logfile: Open, writable log file.
        sample_size: Maximum number of PCA vectors kept for the UMAP fit.
        pca_components: Requested PCA dimensionality (capped at the embedding
            dimensionality).
        chunk: Number of rows processed per batch.
    """
    log_debug(logfile, "[DEBUG] === Globales Finalisieren (IPCA+UMAP) startet ===")

    # Separate cursor for reads; the module-level cursor is used for writes.
    read_cur = conn.cursor()
    page_size = max(1, int(chunk))

    # 0) Find all shard tables.
    read_cur.execute(
        "SELECT name FROM sqlite_master WHERE type='table' AND name LIKE ? ORDER BY name ASC",
        (f"{TABLE_PREFIX}%",),
    )
    tables = [r[0] for r in read_cur.fetchall()]
    if not tables:
        log_debug(logfile, "[WARN] Keine Shard-Tabellen gefunden – Abbruch.")
        return

    def iter_chunks(table, column):
        """Yield lists of (id, value) rows with a non-empty ``column``, in id order."""
        last_id = None
        while True:
            if last_id is None:
                bound_sql, params = "", (page_size,)
            else:
                bound_sql, params = "AND id > ? ", (last_id, page_size)
            read_cur.execute(
                f"SELECT id, {column} FROM {table} "
                f"WHERE {column} IS NOT NULL AND {column} != '' {bound_sql}"
                "ORDER BY id ASC LIMIT ?",
                params,
            )
            page = read_cur.fetchall()
            if not page:
                return
            last_id = page[-1][0]
            yield page

    def load_embeddings(items):
        """Load the embedding files of ``items``; return (matrix, loaded items).

        Items whose file cannot be loaded are logged and dropped from BOTH
        results, so row i of the matrix always belongs to loaded item i.
        """
        mats, kept = [], []
        for row_id, p in items:
            try:
                mats.append(np.load(p))
            except Exception as e:
                log_skip_or_error("UNKNOWN", p, f"PCA-Load-Fehler: {e}")
            else:
                kept.append((row_id, p))
        if not mats:
            return None, []
        return np.stack(mats, axis=0).astype(np.float32), kept

    # PASS 1: IncrementalPCA partial_fit
    ipca = None
    total_vectors = 0
    # partial_fit can reject a batch with fewer rows than n_components, so
    # undersized batches are carried over and merged into the next one.
    carry = None
    for tbl in tables:
        for items in iter_chunks(tbl, "embedding_path"):
            X, _ = load_embeddings(items)
            if X is None:
                continue
            if carry is not None:
                X = np.vstack([carry, X])
                carry = None
            if ipca is None:
                nfeat = X.shape[1]
                ncomp = min(pca_components, nfeat)
                ipca = IncrementalPCA(n_components=ncomp)
                log_debug(
                    logfile,
                    f"[DEBUG] IPCA init mit n_components={ncomp} (feat={nfeat})",
                )
            if X.shape[0] < ipca.n_components:
                carry = X
                continue
            ipca.partial_fit(X)
            total_vectors += X.shape[0]
            del X
            gc.collect()

    if carry is not None:
        log_debug(
            logfile,
            f"[WARN] {carry.shape[0]} Embeddings nicht für den IPCA-Fit verwendet "
            "(Rest-Batch kleiner als n_components).",
        )
        carry = None

    if ipca is None or total_vectors == 0:
        log_debug(
            logfile,
            "[WARN] Keine Embeddings gefunden – IPCA konnte nicht trainiert werden.",
        )
        return

    log_debug(
        logfile, f"[DEBUG] PASS 1 fertig – partial_fit auf {total_vectors} Vektoren."
    )
    print_resource_usage("Nach IPCA PASS 1", logfile)

    # PASS 2: transform, store PCA vectors and collect the UMAP sample
    rng = random.Random(42)
    sample_Xp = []
    sample_cnt = 0
    total_pca_saved = 0

    for tbl in tables:
        for items in iter_chunks(tbl, "embedding_path"):
            X, kept = load_embeddings(items)
            if X is None:
                continue
            Xp = ipca.transform(X)
            updates = []
            # ``kept`` (not ``items``) lines up with the rows of Xp.
            for (row_id, _epath), vec in zip(kept, Xp):
                pca_path = _unique_npy_path(
                    os.path.join(EMBEDDING_PCA_DIR, f"{row_id}_pca.npy")
                )
                try:
                    np.save(pca_path, vec.astype(np.float32))
                except Exception as e:
                    log_skip_or_error(
                        "UNKNOWN", f"DB_ID={row_id}", f"PCA-Save-Fehler: {e}"
                    )
                    pca_path = ""
                updates.append((pca_path, row_id))
                # Reservoir sampling for UMAP: every vector seen so far ends
                # up in the sample with equal probability.
                sample_cnt += 1
                if len(sample_Xp) < sample_size:
                    sample_Xp.append(vec.copy())
                else:
                    j = rng.randrange(sample_cnt)
                    if j < sample_size:
                        sample_Xp[j] = vec.copy()
            cursor.executemany(
                f"UPDATE {tbl} SET pca_embedding=? WHERE id=?", updates
            )
            conn.commit()
            total_pca_saved += len(updates)
            del X, Xp, updates
            gc.collect()

    log_debug(
        logfile, f"[DEBUG] PASS 2 fertig – {total_pca_saved} PCA-Vektoren gespeichert."
    )
    print_resource_usage("Nach IPCA PASS 2", logfile)

    if not sample_Xp:
        log_debug(logfile, "[WARN] Kein PCA-Sample vorhanden – UMAP wird übersprungen.")
        return

    # UMAP fit (on the sample)
    sample_mat = np.vstack(sample_Xp).astype(np.float32)
    reducer = umap.UMAP(
        n_components=2, metric="euclidean", random_state=42, low_memory=True
    )
    log_debug(
        logfile, f"[DEBUG] UMAP Fit auf Sample mit {sample_mat.shape[0]} Vektoren …"
    )
    reducer.fit(sample_mat)
    del sample_mat, sample_Xp
    gc.collect()
    print_resource_usage("Nach UMAP Fit (Sample)", logfile)

    # PASS 3: UMAP-transform all PCA vectors and update the DB
    total_umap_updated = 0
    for tbl in tables:
        for items in iter_chunks(tbl, "pca_embedding"):
            mats, ids_keep = [], []
            for row_id, ppath in items:
                try:
                    mats.append(np.load(ppath).astype(np.float32))
                except Exception as e:
                    log_skip_or_error("UNKNOWN", ppath, f"UMAP-PCA-Load-Fehler: {e}")
                else:
                    ids_keep.append(row_id)
            if mats:
                Xp = np.stack(mats, axis=0)
                coords = reducer.transform(Xp)
                updates = [
                    (float(x), float(y), row_id)
                    for row_id, (x, y) in zip(ids_keep, coords)
                ]
                cursor.executemany(
                    f"UPDATE {tbl} SET umap_x=?, umap_y=? WHERE id=?", updates
                )
                conn.commit()
                total_umap_updated += len(updates)
                del Xp, coords, updates, mats
            gc.collect()

    log_debug(
        logfile,
        f"[DEBUG] PASS 3 fertig – {total_umap_updated} UMAP-Koordinaten aktualisiert.",
    )
    print_resource_usage("Nach UMAP PASS 3", logfile)
    log_debug(
        logfile, "[DEBUG] === Globales Finalisieren (IPCA+UMAP) abgeschlossen ==="
    )


# ========== Hauptpipeline (Streaming + PIXEL_BUDGET) ==========
def main():
    """Run the streaming feature pipeline and the global PCA/UMAP finalization.

    Images from PHOTO_FOLDER are preprocessed one by one and collected until
    the accumulated pixel count reaches PIXEL_BUDGET; each full batch is then
    embedded and inserted into the current shard table. A new shard table is
    started whenever the number of rows inserted in this run reaches a
    multiple of IMAGES_PER_TABLE.

    Compared to the previous version:
    * the log file and the progress bar are released even when the run fails;
    * the "already in DB" check looks at every existing shard table, not only
      the current one, so re-running the pipeline no longer re-inserts images
      that were stored in a different shard.

    NOTE(review): the shard counter still starts at table 1 on every run, so a
    resumed run keeps filling table 1 before moving on — confirm whether
    resuming at the last shard is wanted.
    """
    with open(LOG_FILE, "a", encoding="utf-8") as logfile:
        # Start the per-run skip/deferred logs from scratch.
        for stale in (SKIPPED_LOG, DEFERRED_CSV):
            if os.path.exists(stale):
                os.remove(stale)

        print_resource_usage("Start", logfile)
        logfile.write(f"[{datetime.now()}] [DEBUG] Start Hauptfunktion\n")

        # Warm-up of the hash/histogram helpers on a dummy image.
        dummy_img = np.zeros((10, 10, 3), dtype=np.uint8)
        _ = calc_hash(dummy_img)
        _ = calc_histogram(dummy_img)
        log_debug(logfile, "[DEBUG] Warm-up abgeschlossen.")

        # Count the images without materializing the list.
        total_images = sum(1 for _ in image_generator(PHOTO_FOLDER))
        log_debug(logfile, f"[DEBUG] {total_images} Bilder gefunden")

        # Initial table
        table_id = 1
        current_table_name = f"{TABLE_PREFIX}{table_id}"
        create_table_if_not_exists(current_table_name)

        # All shard tables that exist right now (includes the one just created).
        cursor.execute(
            "SELECT name FROM sqlite_master WHERE type='table' AND name LIKE ? ORDER BY name ASC",
            (f"{TABLE_PREFIX}%",),
        )
        shard_tables = [r[0] for r in cursor.fetchall()]
        if current_table_name not in shard_tables:
            shard_tables.append(current_table_name)

        def already_in_db(filename, path):
            """Return True if (filename, path) is stored in any shard table."""
            for tbl in shard_tables:
                cursor.execute(
                    f"SELECT 1 FROM {tbl} WHERE filename=? AND path=? LIMIT 1",
                    (filename, path),
                )
                if cursor.fetchone():
                    return True
            return False

        total_inserted = 0

        # Running batch (preprocessed images only)
        batch_meta, batch_imgs = [], []
        cur_pixels = 0  # sum of H*W of the preprocessed images held in RAM

        pbar = tqdm(total=total_images, desc="Verarbeitung", unit="Bild")
        try:
            for filename, path in image_generator(PHOTO_FOLDER):
                if already_in_db(filename, path):
                    log_debug(
                        logfile, f"[DEBUG] Übersprungen (bereits in DB): {filename}"
                    )
                    log_skip_or_error(filename, path, "Bereits in DB")
                    pbar.update(1)
                    continue

                result = prepare_image_features(filename, path, logfile)
                if len(result) == 8:
                    # 8-tuple => deferred or failed; nothing to batch.
                    pbar.update(1)
                    continue

                fname, p, emb_input, hist_str, img_hash, resolution, size = result
                batch_meta.append((fname, p, hist_str, img_hash, resolution, size))
                batch_imgs.append(emb_input)
                cur_pixels += emb_input.shape[0] * emb_input.shape[1]
                pbar.update(1)

                if cur_pixels < PIXEL_BUDGET:
                    continue

                # Pixel budget reached -> flush
                kept = flush_by_embeddings_and_insert(
                    batch_meta, batch_imgs, logfile, current_table_name
                )
                total_inserted += kept

                # Clean up for the next mini-batch.
                batch_meta.clear()
                batch_imgs.clear()
                cur_pixels = 0
                gc.collect()
                try:
                    import torch

                    if torch.cuda.is_available():
                        torch.cuda.empty_cache()
                        log_debug(logfile, "[DEBUG] CUDA Cache geleert.")
                except Exception:
                    # torch is optional; freeing its cache is best effort.
                    pass
                print_resource_usage("Nach PixelBudget-Flush", logfile)

                # Sharding: start a new table once the limit is reached.
                if total_inserted >= table_id * IMAGES_PER_TABLE:
                    table_id += 1
                    current_table_name = f"{TABLE_PREFIX}{table_id}"
                    create_table_if_not_exists(current_table_name)
                    if current_table_name not in shard_tables:
                        shard_tables.append(current_table_name)
                    log_debug(
                        logfile, f"[DEBUG] Neue Tabelle angelegt: {current_table_name}"
                    )

            # End of input: flush whatever is left in the running batch.
            if batch_imgs:
                kept = flush_by_embeddings_and_insert(
                    batch_meta, batch_imgs, logfile, current_table_name
                )
                total_inserted += kept
                batch_meta.clear()
                batch_imgs.clear()
                cur_pixels = 0
                gc.collect()

            # === Global finalization over ALL tables ===
            finalize_all_shards_global(
                logfile,
                sample_size=100_000,  # adjust to available RAM / desired speed
                pca_components=100,
                chunk=2048,
            )
        finally:
            pbar.close()

        logfile.write(
            f"[{datetime.now()}] ✓ Verarbeitung abgeschlossen. Insgesamt eingefügt: {total_inserted}\n"
        )
        print_resource_usage("Ende", logfile)

    conn.close()


def flush_by_embeddings_and_insert(batch_meta, batch_imgs, logfile, table_name):
    """Compute embeddings for a batch, save them as .npy files and insert DB rows.

    Embeddings are computed micro-batched first. If that fails, each image is
    embedded on its own; images that still fail are logged and dropped
    together with their metadata, so every remaining embedding stays paired
    with the image it was computed from (previously the metadata list was not
    filtered and embeddings could be attributed to the wrong files).

    Args:
        batch_meta: List of ``(filename, path, hist_str, img_hash, resolution, size)``.
        batch_imgs: Preprocessed images, parallel to ``batch_meta``.
        logfile: Open, writable log file.
        table_name: Trusted name of the target shard table.

    Returns:
        Number of rows handed to the DB insert.
    """
    if not batch_imgs:
        return 0

    # Metadata rows that have an embedding; identical to batch_meta unless the
    # per-image fallback below has to drop failures.
    metas = list(batch_meta)
    try:
        embs = extract_embeddings_micro(batch_imgs, chunk=MICRO_BATCH)
    except Exception as e:
        log_debug(logfile, f"[ERROR] Embedding-Batch fehlgeschlagen: {e}")
        metas, singles = [], []
        for meta, img_small in zip(batch_meta, batch_imgs):
            try:
                singles.append(extract_embeddings([img_small])[0])
            except Exception as ee:
                tb = traceback.format_exc()
                log_skip_or_error(
                    meta[0], meta[1], f"Embedding-Fehler (single): {ee}\n{tb}"
                )
            else:
                metas.append(meta)
        if not singles:
            return 0
        embs = np.stack(singles, axis=0)

    # Save the embedding files and build the DB rows.
    entries = []
    kept = 0
    for (filename, path, hist_str, img_hash, resolution, size), emb in zip(
        metas, embs
    ):
        try:
            emb_path = _unique_npy_path(os.path.join(EMBEDDING_DIR, f"{filename}.npy"))
            np.save(emb_path, emb.astype(np.float32))
            entries.append(
                (
                    filename,
                    path,
                    hist_str,
                    emb_path,
                    img_hash,
                    resolution,
                    size,
                    "",  # pca_embedding: filled in by the global finalization
                    0.0,  # umap_x placeholder
                    0.0,  # umap_y placeholder
                )
            )
            kept += 1
        except Exception as e_save:
            tb = traceback.format_exc()
            log_skip_or_error(filename, path, f"Embedding-Save-Fehler: {e_save}\n{tb}")

    save_batch_to_db(entries, logfile, table_name)
    del embs, entries
    return kept


# ========== ENTRYPOINT ==========
if __name__ == "__main__":
    # Run the whole pipeline under cProfile.
    with cProfile.Profile() as pr:
        main()

    # Write a text report sorted by cumulative time ...
    with open("profiling_results.txt", "w", encoding="utf-8") as f:
        stats = pstats.Stats(pr, stream=f)
        stats.sort_stats("cumtime").print_stats()
    # ... and additionally dump the raw statistics to a .prof file.
    stats.dump_stats("profiling_results.prof")

  warn(
Verarbeitung: 100%|██████████| 371202/371202 [4:29:28<00:00, 22.96Bild/s]


Erneut über die Bilder iterieren und mehrere Hash-Funktionen berechnen, weil ein einzelner Hash (Average Hash) nicht informativ genug ist.

In [4]:
import os
import gc
import sqlite3
import traceback
import numpy as np
import cv2 as cv
from tqdm import tqdm

# ========= CONFIG =========
DB_PATH = r"C:\BIG_DATA\data\database.db"
TABLE_PREFIX = "image_features_part_"
BATCH_SIZE = 2000  # DB ids per batch
RESIZE_DHASH = 8  # (8+1)x8 -> 64 bits
RESIZE_PHASH = 32  # 32x32 -> DCT -> 8x8 -> 64 bits
LOG_FILE = "hash_backfill_log.txt"


# ========= Logging =========
def log(msg: str) -> None:
    """Append ``msg`` plus a newline to LOG_FILE (the file is reopened per call)."""
    with open(LOG_FILE, mode="a", encoding="utf-8") as sink:
        line = msg + "\n"
        sink.write(line)


# ========= Utils =========
def _bits_to_hex(bits_bool: np.ndarray) -> str:
    """Bool-Array -> Hex (64 Bit -> 16 Hex-Zeichen)."""
    packed = np.packbits(bits_bool.astype(np.uint8), bitorder="big")
    return packed.tobytes().hex()


def _imread_rgb(path: str) -> np.ndarray:
    """Read an image file into an RGB array.

    The file is read as raw bytes and decoded in memory instead of passing
    the path to OpenCV, which keeps paths with special characters working.

    Raises:
        ValueError: If OpenCV cannot decode the file contents.
    """
    raw_bytes = np.fromfile(path, dtype=np.uint8)
    bgr = cv.imdecode(raw_bytes, cv.IMREAD_COLOR)
    if bgr is not None:
        return cv.cvtColor(bgr, cv.COLOR_BGR2RGB)
    raise ValueError("cv.imdecode returned None")


# ========= Hash-Funktionen =========
def dhash_hex(img_rgb: np.ndarray, size: int = RESIZE_DHASH) -> str:
    """Return the difference hash (dHash) of an RGB image as a hex string.

    The image is converted to grayscale and shrunk to ``size`` rows by
    ``size + 1`` columns; each bit tells whether a pixel is brighter than its
    left neighbour. With ``size=8`` this gives 64 bits (16 hex characters).
    """
    gray = cv.cvtColor(img_rgb, cv.COLOR_RGB2GRAY)
    small = cv.resize(gray, (size + 1, size), interpolation=cv.INTER_AREA)
    left_cols = small[:, :-1]
    right_cols = small[:, 1:]
    brighter_than_left = right_cols > left_cols
    return _bits_to_hex(brighter_than_left.reshape(-1))


def phash_hex(img_rgb: np.ndarray, full: int = RESIZE_PHASH, keep: int = 8) -> str:
    """Return the DCT-based perceptual hash (pHash) of an RGB image as hex.

    The image is converted to grayscale, resized to ``full`` x ``full`` and
    transformed with a DCT. Of the top-left ``keep`` x ``keep`` coefficients
    the DC term is zeroed, and each bit tells whether a coefficient is at
    least the median of that block. With ``keep=8`` this gives 64 bits.
    """
    gray = cv.cvtColor(img_rgb, cv.COLOR_RGB2GRAY)
    resized = cv.resize(gray, (full, full), interpolation=cv.INTER_AREA)
    coeffs = cv.dct(resized.astype(np.float32))
    low_freq = coeffs[:keep, :keep].copy()
    low_freq[0, 0] = 0.0  # drop the DC component before thresholding
    threshold = np.median(low_freq)
    return _bits_to_hex((low_freq >= threshold).reshape(-1))


# ========= DB-Helfer =========
def get_shard_tables(cursor):
    """Return the names of all tables matching TABLE_PREFIX, sorted by name."""
    query = (
        "SELECT name FROM sqlite_master WHERE type='table' "
        "AND name LIKE ? ORDER BY name ASC"
    )
    like_pattern = f"{TABLE_PREFIX}%"
    cursor.execute(query, (like_pattern,))
    return [row[0] for row in cursor.fetchall()]


def ensure_columns(conn: sqlite3.Connection, table: str) -> None:
    """Add the TEXT columns ``dhash`` and ``phash`` to ``table`` if they are missing.

    Every added column is reported through ``log``; the connection is
    committed at the end.
    """
    cur = conn.cursor()
    cur.execute(f"PRAGMA table_info({table});")
    # Column 1 of each table_info row is the column name.
    present = {info[1] for info in cur.fetchall()}
    for col, ctype in (("dhash", "TEXT"), ("phash", "TEXT")):
        if col in present:
            continue
        cur.execute(f"ALTER TABLE {table} ADD COLUMN {col} {ctype};")
        log(f"[INFO] Added column {col} to {table}")
    conn.commit()


def count_pending(cur: sqlite3.Cursor, table: str) -> int:
    """Count the rows of ``table`` where at least one of the hashes is NULL or empty."""
    pending_sql = f"""
        SELECT COUNT(*) FROM {table}
        WHERE (dhash IS NULL OR dhash = '')
           OR (phash IS NULL OR phash = '')
        """
    cur.execute(pending_sql)
    first_row = cur.fetchone()
    return int(first_row[0])


def fetch_ids_and_paths_needing_hashes(cur: sqlite3.Cursor, table: str, limit: int):
    """Return up to ``limit`` ``(id, path)`` rows of ``table`` that lack a hash.

    A row qualifies when ``dhash`` or ``phash`` is NULL or empty.
    """
    select_sql = f"""
        SELECT id, path FROM {table}
        WHERE (dhash IS NULL OR dhash = '')
           OR (phash IS NULL OR phash = '')
        LIMIT ?
        """
    cur.execute(select_sql, (limit,))
    return cur.fetchall()


def update_hashes(conn: sqlite3.Connection, table: str, rows) -> int:
    """Compute dHash and pHash for ``rows`` and store them in ``table``.

    Args:
        conn: Open SQLite connection.
        table: Trusted name of the shard table to update.
        rows: Iterable of ``(id, path)`` pairs.

    Returns:
        Number of rows whose hashes were computed and written. Rows that fail
        (unreadable image, hashing error) are logged and left untouched.
    """
    cur = conn.cursor()
    hashed = []
    for row_id, img_path in rows:
        try:
            rgb = _imread_rgb(img_path)
            d_hex = dhash_hex(rgb)
            p_hex = phash_hex(rgb)
            hashed.append((d_hex, p_hex, row_id))
        except Exception as exc:
            log(
                f"[ERROR] {table} id={row_id} path='{img_path}': {exc}\n{traceback.format_exc()}"
            )
    if hashed:
        update_sql = f"UPDATE {table} SET dhash=?, phash=? WHERE id=?;"
        cur.executemany(update_sql, hashed)
        conn.commit()
    return len(hashed)


# (Optional) Indizes
def ensure_indexes(conn: sqlite3.Connection, table: str) -> None:
    """Create (if missing) one index each on ``dhash`` and ``phash`` of ``table``."""
    cur = conn.cursor()
    for column in ("dhash", "phash"):
        cur.execute(
            f"CREATE INDEX IF NOT EXISTS idx_{table}_{column} ON {table}({column});"
        )
    conn.commit()


# ========= Main =========
def backfill_all_tables() -> None:
    """Fill the ``dhash`` / ``phash`` columns of every shard table.

    Each table is walked once in id order (keyset pagination). Rows whose
    image cannot be hashed are logged by ``update_hashes`` and skipped for
    the rest of this run; they stay pending and are picked up again on the
    next run. Previously such rows were re-fetched in every batch, and the
    loop never terminated once a batch consisted only of failing rows.

    The database connection is closed even if the backfill fails.
    """
    conn = sqlite3.connect(DB_PATH, isolation_level=None)  # autocommit
    try:
        cur = conn.cursor()
        tables = get_shard_tables(cur)
        if not tables:
            log("[WARN] Keine Shard-Tabellen gefunden.")
            return

        total_updated = 0
        for tbl in tables:
            ensure_columns(conn, tbl)
            pending = count_pending(cur, tbl)

            if pending == 0:
                log(f"[INFO] Nichts zu tun für {tbl}")
                continue

            last_id = None
            n_failed = 0
            with tqdm(total=pending, desc=f"{tbl}", unit="img") as pbar:
                while True:
                    rows = _fetch_pending_after(cur, tbl, last_id, BATCH_SIZE)
                    if not rows:
                        break
                    # Advance past this page regardless of success so that
                    # failing rows cannot be fetched again in this run.
                    last_id = rows[-1][0]
                    n_ok = update_hashes(conn, tbl, rows)
                    total_updated += n_ok
                    n_failed += len(rows) - n_ok
                    pbar.update(n_ok)  # progress = successfully filled rows
                    gc.collect()

            if n_failed:
                log(
                    f"[WARN] {tbl}: {n_failed} Zeilen konnten nicht gehasht werden "
                    "(siehe [ERROR]-Einträge)"
                )
            log(f"[INFO] Fertig für {tbl}")

        log(f"[OK] Backfill abgeschlossen. Insgesamt aktualisiert: {total_updated}")
    finally:
        conn.close()


def _fetch_pending_after(cur, table, after_id, limit):
    """Return up to ``limit`` pending ``(id, path)`` rows with id > ``after_id``.

    "Pending" means ``dhash`` or ``phash`` is NULL or empty. ``after_id=None``
    starts at the beginning of the table. Rows come back in ascending id order.
    """
    if after_id is None:
        bound_sql, params = "", (limit,)
    else:
        bound_sql, params = "AND id > ? ", (after_id, limit)
    cur.execute(
        f"SELECT id, path FROM {table} "
        "WHERE ((dhash IS NULL OR dhash = '') OR (phash IS NULL OR phash = '')) "
        f"{bound_sql}ORDER BY id ASC LIMIT ?",
        params,
    )
    return cur.fetchall()


# Start the hash backfill when this code runs as the main module.
if __name__ == "__main__":
    backfill_all_tables()

image_features_part_1: 100%|██████████| 100000/100000 [49:14<00:00, 33.85img/s]
image_features_part_2: 100%|██████████| 51500/51500 [32:02<00:00, 26.79img/s]
image_features_part_3: 100%|██████████| 50000/50000 [43:29<00:00, 19.16img/s]
image_features_part_4: 100%|██████████| 50000/50000 [47:35<00:00, 17.51img/s]
image_features_part_5: 100%|██████████| 50004/50004 [47:41<00:00, 17.48img/s]
image_features_part_6: 100%|██████████| 50000/50000 [4:07:22<00:00,  3.37img/s]  
image_features_part_7: 100%|██████████| 8250/8250 [40:01<00:00,  3.43img/s]


PCA- und UMAP-Spalten leeren (falls eine Neuberechnung nötig ist)

In [None]:

import os
import sqlite3
from pathlib import Path
from datetime import datetime

# ======= CONFIG =======
# SQLite database file and the common name prefix of the shard tables.
DB_PATH = r"C:\BIG_DATA\data\database.db"
TABLE_PREFIX = "image_features_part_"

# Directory that holds the PCA files (used for the optional deletion).
EMBEDDING_PCA_DIR = Path(r"C:\BIG_DATA\embeddings_pca")

# Behaviour switches
DRY_RUN = False  # True = only count/report, change and delete nothing
DELETE_PCA_FILES = (
    True  # True = delete the referenced PCA files (only when DRY_RUN=False)
)

CHUNK = 10000  # number of rows fetched per fetchmany() call when streaming SELECTs

# ======================


def get_tables(cur, prefix):
    """Return the names of all tables whose name starts with *prefix*.

    Args:
        cur: An open ``sqlite3`` cursor.
        prefix: Literal table-name prefix, e.g. ``"image_features_part_"``.

    Returns:
        A list of table names. Names with a purely numeric suffix are ordered
        by that number (``part_2`` before ``part_10``); any other matching
        names follow in alphabetical order.
    """
    # "_" and "%" are LIKE wildcards, so the prefix must be escaped to be
    # matched literally (otherwise "image_featuresXpart..." would match too).
    literal = prefix.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
    cur.execute(
        "SELECT name FROM sqlite_master WHERE type='table' "
        "AND name LIKE ? ESCAPE '\\' ORDER BY name ASC",
        (f"{literal}%",),
    )
    names = [r[0] for r in cur.fetchall()]

    def shard_order(name):
        suffix = name[len(prefix):]
        # Numeric shards first, by number; sorted() is stable, so the rest
        # keeps the alphabetical order delivered by SQLite.
        return (0, int(suffix)) if suffix.isdecimal() else (1, 0)

    return sorted(names, key=shard_order)


def exists_safe(p: str) -> bool:
    """Report whether *p* exists; any error during the check counts as "no"."""
    try:
        found = os.path.exists(p)
    except Exception:
        found = False
    return found


def main():
    """Clear the PCA/UMAP columns of all shard tables, optionally deleting PCA files.

    For every table matching ``TABLE_PREFIX`` this prints the row counts,
    streams the stored ``pca_embedding`` paths, deletes the referenced files
    when ``DELETE_PCA_FILES`` is set and ``DRY_RUN`` is off (only files that
    lie under ``EMBEDDING_PCA_DIR``), and then sets ``pca_embedding``,
    ``umap_x`` and ``umap_y`` to NULL unless ``DRY_RUN`` is on. A summary is
    printed at the end.

    The connection is closed in a ``finally`` block; previously it was left
    open on the early "no tables" return and whenever an exception escaped.
    """
    conn = sqlite3.connect(DB_PATH, isolation_level=None)
    try:
        cur = conn.cursor()
        cur_read = conn.cursor()  # separate read cursor (safer while updating)

        tables = get_tables(cur, TABLE_PREFIX)
        if not tables:
            print("Keine Shard-Tabellen gefunden.")
            return

        print(f"[{datetime.now()}] Cleanup startet …")
        print(f"Tabellen: {tables}")
        print(f"DRY_RUN={DRY_RUN}, DELETE_PCA_FILES={DELETE_PCA_FILES}")

        # Files are only removed when deletion is requested and this is not a dry run.
        delete_files = (not DRY_RUN) and DELETE_PCA_FILES

        total_rows = total_pca_set = total_umap_set = 0
        total_files_found = total_files_deleted = 0

        for tbl in tables:
            print(f"\n=== {tbl} ===")

            # Basic counters
            cur.execute(f"SELECT COUNT(*) FROM {tbl}")
            rows_total = cur.fetchone()[0]
            total_rows += rows_total

            cur.execute(
                f"SELECT COUNT(*) FROM {tbl} WHERE pca_embedding IS NOT NULL AND pca_embedding!=''"
            )
            pca_set = cur.fetchone()[0]
            total_pca_set += pca_set

            # (0.0, 0.0) is treated as "not set", like NULL.
            cur.execute(
                f"""SELECT COUNT(*) FROM {tbl}
                        WHERE (umap_x IS NOT NULL AND umap_y IS NOT NULL)
                          AND (umap_x!=0.0 OR umap_y!=0.0)"""
            )
            umap_set = cur.fetchone()[0]
            total_umap_set += umap_set

            print(f"rows_total : {rows_total}")
            print(f"pca_set    : {pca_set}")
            print(f"umap_set   : {umap_set}")

            # Collect the PCA file references (and optionally delete the files)
            files_found = files_deleted = 0
            if pca_set > 0 and (DELETE_PCA_FILES or DRY_RUN):
                cur_read.execute(
                    f"SELECT pca_embedding FROM {tbl} WHERE pca_embedding IS NOT NULL AND pca_embedding!=''"
                )
                while True:
                    rows = cur_read.fetchmany(CHUNK)
                    if not rows:
                        break
                    for (ppath,) in rows:
                        if not ppath:
                            continue
                        files_found += 1
                        if not delete_files:
                            continue
                        try:
                            p = Path(ppath)
                            if p.is_file():
                                # Safety: only delete files inside EMBEDDING_PCA_DIR;
                                # relative_to() raises ValueError for anything else.
                                try:
                                    p.relative_to(EMBEDDING_PCA_DIR)
                                    os.remove(p)
                                    files_deleted += 1
                                except ValueError:
                                    print(f"⚠️  Übersprungen (außerhalb Root): {ppath}")
                        except Exception as e:
                            print(f"⚠️  Löschen fehlgeschlagen: {ppath} | {e}")

            total_files_found += files_found
            total_files_deleted += files_deleted
            print(f"PCA-Dateien gefunden : {files_found}")
            if delete_files:
                print(f"PCA-Dateien gelöscht : {files_deleted}")

            # Clear the DB fields
            if DRY_RUN:
                print("DRY_RUN: DB wird NICHT geändert.")
            else:
                # NULL is cleaner than 0.0/'' (unambiguous "no value")
                cur.execute(
                    f"UPDATE {tbl} SET pca_embedding=NULL WHERE pca_embedding IS NOT NULL AND pca_embedding!=''"
                )
                cur.execute(
                    f"UPDATE {tbl} SET umap_x=NULL, umap_y=NULL WHERE (umap_x IS NOT NULL) OR (umap_y IS NOT NULL)"
                )
                conn.commit()
                print("DB-Felder geleert: pca_embedding=NULL, umap_x=NULL, umap_y=NULL")

        print("\n===== Zusammenfassung =====")
        print(f"Tabellen insgesamt      : {len(tables)}")
        print(f"Zeilen gesamt           : {total_rows}")
        print(f"PCA gesetzt (DB) gesamt : {total_pca_set}")
        print(f"UMAP gesetzt (DB) gesamt: {total_umap_set}")
        print(f"PCA-Dateien gefunden    : {total_files_found}")
        if delete_files:
            print(f"PCA-Dateien gelöscht    : {total_files_deleted}")

        print(f"\n[{datetime.now()}] Cleanup fertig.")
    finally:
        conn.close()


# Run the PCA/UMAP cleanup when this cell/script is executed directly.
if __name__ == "__main__":
    main()

[2025-08-14 23:44:27.563880] Cleanup startet …
Tabellen: ['image_features_part_1', 'image_features_part_2', 'image_features_part_3', 'image_features_part_4', 'image_features_part_5', 'image_features_part_6', 'image_features_part_7']
DRY_RUN=False, DELETE_PCA_FILES=True

=== image_features_part_1 ===
rows_total : 100000
pca_set    : 100000
umap_set   : 100000
PCA-Dateien gefunden : 100000
PCA-Dateien gelöscht : 0
DB-Felder geleert: pca_embedding=NULL, umap_x=NULL, umap_y=NULL

=== image_features_part_2 ===
rows_total : 51500
pca_set    : 51500
umap_set   : 51500
PCA-Dateien gefunden : 51500
PCA-Dateien gelöscht : 0
DB-Felder geleert: pca_embedding=NULL, umap_x=NULL, umap_y=NULL

=== image_features_part_3 ===
rows_total : 50000
pca_set    : 50000
umap_set   : 50000
PCA-Dateien gefunden : 50000
PCA-Dateien gelöscht : 0
DB-Felder geleert: pca_embedding=NULL, umap_x=NULL, umap_y=NULL

=== image_features_part_4 ===
rows_total : 50000
pca_set    : 50000
umap_set   : 50000
PCA-Dateien gefunden

In [None]:
import pstats

# Load the saved profiler dump and print the 30 entries with the highest
# cumulative time, with directory prefixes stripped from the file names.
stats = pstats.Stats("profiling_results.prof")
stats.strip_dirs().sort_stats("cumulative").print_stats(30)

Sun Aug 10 21:49:22 2025    profiling_results.prof

         351036394 function calls (344465332 primitive calls) in 16175.470 seconds

   Ordered by: cumulative time
   List reduced from 5633 to 30 due to restriction <30>

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        1    4.103    4.103 16175.799 16175.799 621606085.py:442(main)
   321202  103.743    0.000 10380.608    0.032 621606085.py:166(prepare_image_features)
   321202    2.113    0.000 5892.825    0.018 621606085.py:70(should_defer)
  1017866 3975.147    0.004 3975.309    0.004 {built-in method io.open}
   930620 3649.127    0.004 3649.127    0.004 {built-in method nt.stat}
   567406    1.545    0.000 3634.637    0.006 <frozen genericpath>:48(getsize)
      493    2.629    0.005 3576.141    7.254 621606085.py:548(flush_by_embeddings_and_insert)
      493    0.060    0.000 3397.526    6.892 621606085.py:146(extract_embeddings_micro)
     7879   13.108    0.002 3397.328    0.431 embedding_vec.py

<pstats.Stats at 0x25d49d3bb50>

Vergleichen wir zunächst die Anzahl der bearbeiteten Bilder mit der Anzahl der vorhandenen Bilder.

In [2]:
from pathlib import Path

# Count the regular files directly inside the embeddings directory
# (non-recursive: iterdir() lists only the immediate children).
photo_dir = Path(r"C:\BIG_DATA\embeddings")
n_files = sum(1 for p in photo_dir.iterdir() if p.is_file())
print(f"Dateien im Ordner: {n_files}")

Dateien im Ordner: 359474


In [6]:
import sqlite3

DB_PATH = r"C:\BIG_DATA\data\database.db"

# 1) Open the connection
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()

# 2) Find all relevant shard tables
# NOTE(review): the query has no ORDER BY, so the table order is whatever
# SQLite returns; "_" in the pattern is also a LIKE single-character wildcard.
cursor.execute(
    """
    SELECT name
      FROM sqlite_master
     WHERE type='table'
       AND name LIKE 'image_features_part_%'
"""
)
tables = [row[0] for row in cursor.fetchall()]

# 3) Count the rows of every table and add them up
total_entries = 0
for table in tables:
    cursor.execute(f"SELECT COUNT(*) FROM {table}")
    count = cursor.fetchone()[0]
    print(f"{table}: {count} Einträge")
    total_entries += count

conn.close()

print(f"\nGesamtanzahl Einträge: {total_entries}")

image_features_part_1: 100000 Einträge
image_features_part_2: 51500 Einträge
image_features_part_3: 50000 Einträge
image_features_part_4: 50000 Einträge
image_features_part_5: 50004 Einträge
image_features_part_6: 50000 Einträge
image_features_part_7: 8250 Einträge

Gesamtanzahl Einträge: 359754


In [None]:
import os

# Root folder of the source images and the file extensions counted as images.
PHOTO_FOLDER = r"D:\data\image_data"
IMAGE_EXTS = {".jpg", ".jpeg", ".png", ".bmp", ".gif", ".tiff"}

# Walk the folder tree recursively and count files by (case-insensitive) extension.
count = 0
for root, dirs, files in os.walk(PHOTO_FOLDER):
    for fname in files:
        ext = os.path.splitext(fname)[1].lower()
        if ext in IMAGE_EXTS:
            count += 1

print(f"Gefundene Bilder: {count}")

Gefundene Bilder: 371202


Überprüfen, wie viele Bilder erfolgreich PCA-Embeddings und UMAP-Koordinaten bekommen haben

In [2]:
import os
import sqlite3
from pathlib import Path
from datetime import datetime

# ========== CONFIG ==========
# SQLite database, shard-table name prefix and the two embedding directories
# that the audit compares against the database contents.
DB_PATH = r"C:\BIG_DATA\data\database.db"
TABLE_PREFIX = "image_features_part_"
EMBEDDING_DIR = Path(r"C:\BIG_DATA\embeddings")
EMBEDDING_PCA_DIR = Path(r"C:\BIG_DATA\embeddings_pca")

# Whether to write the per-table report as CSV, and where.
WRITE_CSV = True
CSV_PATH = Path("report_embeddings_pca_umap.csv")

CHUNK = 10000  # number of rows fetched per fetchmany() call when streaming from the DB


# ========== HELPERS ==========
def count_files_in_dir(d: Path) -> int:
    """Count the regular files directly inside *d*.

    Returns ``-1`` when the directory cannot be listed (e.g. it does not exist).
    """
    n_files = 0
    try:
        for entry in d.iterdir():
            if entry.is_file():
                n_files += 1
    except Exception:
        return -1
    return n_files


def exists_safe(p: str) -> bool:
    """Return True if *p* exists; treat any failure of the check as False."""
    result = False
    try:
        result = os.path.exists(p)
    except Exception:
        result = False
    return result


def get_tables(cur, prefix):
    """List the tables whose name begins with the literal *prefix*.

    Args:
        cur: An open ``sqlite3`` cursor.
        prefix: Table-name prefix that is matched literally.

    Returns:
        Matching table names. Shards with a numeric suffix come first in
        numeric order (so ``..._9`` precedes ``..._10``); other matches follow
        alphabetically.
    """
    # Escape the LIKE wildcards "_" and "%" so that only names that really
    # start with the prefix are returned.
    escaped = prefix.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
    cur.execute(
        "SELECT name FROM sqlite_master WHERE type='table' "
        "AND name LIKE ? ESCAPE '\\' ORDER BY name ASC",
        (escaped + "%",),
    )
    found = [row[0] for row in cur.fetchall()]

    def by_shard_number(name):
        tail = name[len(prefix):]
        if tail.isdecimal():
            return (0, int(tail))
        return (1, 0)  # stable sort keeps the alphabetical SQL order here

    found.sort(key=by_shard_number)
    return found


# ========== MAIN AUDIT ==========
def _count_missing_files(cur, tbl, column):
    """Stream the non-empty *column* paths of *tbl* and count those missing on disk.

    Returns ``(missing_count, examples)`` where ``examples`` holds at most five
    ``"id=<id>|<path>"`` strings.
    """
    missing = 0
    examples = []
    cur.execute(
        f"SELECT id, {column} FROM {tbl} WHERE {column} IS NOT NULL AND {column} != ''"
    )
    while True:
        rows = cur.fetchmany(CHUNK)
        if not rows:
            break
        for _id, fpath in rows:
            if not exists_safe(fpath):
                missing += 1
                if len(examples) < 5:
                    examples.append(f"id={_id}|{fpath}")
    return missing, examples


def main():
    """Audit embedding, PCA and UMAP coverage of every shard table.

    Prints, per table and in total: row count, rows with an embedding path,
    rows with a PCA path, rows with UMAP coordinates, how many referenced
    embedding/PCA files are missing on disk, and how many rows have a PCA path
    but no usable UMAP coordinates. UMAP ``(0.0, 0.0)`` is treated as "not
    set". When ``WRITE_CSV`` is true the per-table numbers are also written to
    ``CSV_PATH`` (``;``-separated).

    The CSV file and the database connection are released in a ``finally``
    block; previously both leaked when an exception escaped, and the
    connection also leaked on the early "no tables" return.
    """
    conn = sqlite3.connect(DB_PATH, isolation_level=None)
    csvfile = None
    try:
        cur = conn.cursor()

        tables = get_tables(cur, TABLE_PREFIX)
        if not tables:
            print("Keine Shard-Tabellen gefunden.")
            return

        print(f"Gefundene Tabellen: {len(tables)} → {tables}")

        # Coarse file-system check
        emb_fs_count = count_files_in_dir(EMBEDDING_DIR)
        pca_fs_count = count_files_in_dir(EMBEDDING_PCA_DIR)
        print(f"[FS] Dateien im EMBEDDING_DIR:   {emb_fs_count}")
        print(f"[FS] Dateien im EMBEDDING_PCA_DIR: {pca_fs_count}")

        # Prepare the CSV report
        if WRITE_CSV:
            import csv

            csvfile = open(CSV_PATH, "w", newline="", encoding="utf-8")
            writer = csv.writer(csvfile, delimiter=";")
            writer.writerow(
                [
                    "table",
                    "rows_total",
                    "emb_db_set",
                    "emb_fs_missing",
                    "pca_db_set",
                    "pca_fs_missing",
                    "pca_db_not_set",
                    "umap_set",
                    "umap_missing_for_pca",
                    "examples_missing_emb_paths",
                    "examples_missing_pca_paths",
                    "examples_pca_set_umap_missing_ids",
                ]
            )
        else:
            writer = None

        # Grand totals
        G_rows = G_emb_set = G_pca_set = G_umap_set = 0
        G_emb_fs_missing = G_pca_fs_missing = 0
        G_pca_not_set = G_umap_missing_for_pca = 0

        for tbl in tables:
            print(f"\n=== Prüfe Tabelle: {tbl} ===")

            # Basic counters from the DB
            cur.execute(f"SELECT COUNT(*) FROM {tbl}")
            rows_total = cur.fetchone()[0]

            cur.execute(
                f"SELECT COUNT(*) FROM {tbl} WHERE embedding_path IS NOT NULL AND embedding_path != ''"
            )
            emb_db_set = cur.fetchone()[0]

            cur.execute(
                f"SELECT COUNT(*) FROM {tbl} WHERE pca_embedding IS NOT NULL AND pca_embedding != ''"
            )
            pca_db_set = cur.fetchone()[0]

            cur.execute(
                f"""
                SELECT COUNT(*) FROM {tbl}
                WHERE (umap_x IS NOT NULL AND umap_y IS NOT NULL) AND (umap_x != 0.0 OR umap_y != 0.0)
            """
            )
            umap_set = cur.fetchone()[0]

            # Streaming checks: referenced files that do not exist on disk
            emb_fs_missing, miss_emb_examples = _count_missing_files(
                cur, tbl, "embedding_path"
            )
            pca_fs_missing, miss_pca_examples = _count_missing_files(
                cur, tbl, "pca_embedding"
            )

            # Rows without a PCA path
            pca_db_not_set = rows_total - pca_db_set

            # Rows with a PCA path but missing/placeholder UMAP coordinates
            umap_missing_for_pca = 0
            pca_umap_missing_examples = []
            cur.execute(
                f"""
                SELECT id, umap_x, umap_y
                FROM {tbl}
                WHERE pca_embedding IS NOT NULL AND pca_embedding != ''
                  AND (umap_x IS NULL OR umap_y IS NULL OR (umap_x = 0.0 AND umap_y = 0.0))
            """
            )
            while True:
                rows = cur.fetchmany(CHUNK)
                if not rows:
                    break
                for _id, x, y in rows:
                    umap_missing_for_pca += 1
                    if len(pca_umap_missing_examples) < 5:
                        pca_umap_missing_examples.append(f"id={_id}|umap=({x},{y})")

            # Per-table output
            print(f"rows_total            : {rows_total}")
            print(f"emb_db_set            : {emb_db_set}")
            print(f"emb_fs_missing        : {emb_fs_missing}")
            print(f"pca_db_set            : {pca_db_set}")
            print(f"pca_db_not_set        : {pca_db_not_set}")
            print(f"pca_fs_missing        : {pca_fs_missing}")
            print(f"umap_set              : {umap_set}")
            print(f"umap_missing_for_pca  : {umap_missing_for_pca}")

            if miss_emb_examples:
                print(f"  Beispiele fehlender EMB-Dateien: {miss_emb_examples}")
            if miss_pca_examples:
                print(f"  Beispiele fehlender PCA-Dateien: {miss_pca_examples}")
            if pca_umap_missing_examples:
                print(f"  Beispiele: PCA gesetzt, UMAP fehlt: {pca_umap_missing_examples}")

            # CSV row
            if writer:
                writer.writerow(
                    [
                        tbl,
                        rows_total,
                        emb_db_set,
                        emb_fs_missing,
                        pca_db_set,
                        pca_fs_missing,
                        pca_db_not_set,
                        umap_set,
                        umap_missing_for_pca,
                        " | ".join(miss_emb_examples),
                        " | ".join(miss_pca_examples),
                        " | ".join(pca_umap_missing_examples),
                    ]
                )

            # Accumulate grand totals
            G_rows += rows_total
            G_emb_set += emb_db_set
            G_pca_set += pca_db_set
            G_umap_set += umap_set
            G_emb_fs_missing += emb_fs_missing
            G_pca_fs_missing += pca_fs_missing
            G_pca_not_set += pca_db_not_set
            G_umap_missing_for_pca += umap_missing_for_pca

        # Overall summary
        print("\n===== GESAMT =====")
        print(f"rows_total            : {G_rows}")
        print(f"emb_db_set            : {G_emb_set}")
        print(f"emb_fs_missing        : {G_emb_fs_missing}")
        print(f"pca_db_set            : {G_pca_set}")
        print(f"pca_db_not_set        : {G_pca_not_set}")
        print(f"pca_fs_missing        : {G_pca_fs_missing}")
        print(f"umap_set              : {G_umap_set}")
        print(f"umap_missing_for_pca  : {G_umap_missing_for_pca}")

        if writer:
            # Close before announcing the report so the file is fully flushed.
            csvfile.close()
            print(f"\nCSV-Report geschrieben nach: {CSV_PATH.resolve()}")
    finally:
        # Closing an already closed file is a no-op.
        if csvfile is not None:
            csvfile.close()
        conn.close()


# Run the audit when executed directly, printing a timestamp before and after.
if __name__ == "__main__":
    print(f"[{datetime.now()}] Audit startet …")
    main()
    print(f"[{datetime.now()}] Audit fertig.")

[2025-08-14 23:49:26.724628] Audit startet …
Gefundene Tabellen: 7 → ['image_features_part_1', 'image_features_part_2', 'image_features_part_3', 'image_features_part_4', 'image_features_part_5', 'image_features_part_6', 'image_features_part_7']
[FS] Dateien im EMBEDDING_DIR:   359474
[FS] Dateien im EMBEDDING_PCA_DIR: -1

=== Prüfe Tabelle: image_features_part_1 ===
rows_total            : 100000
emb_db_set            : 100000
emb_fs_missing        : 0
pca_db_set            : 0
pca_db_not_set        : 100000
pca_fs_missing        : 0
umap_set              : 0
umap_missing_for_pca  : 0

=== Prüfe Tabelle: image_features_part_2 ===
rows_total            : 51500
emb_db_set            : 51500
emb_fs_missing        : 0
pca_db_set            : 0
pca_db_not_set        : 51500
pca_fs_missing        : 0
umap_set              : 0
umap_missing_for_pca  : 0

=== Prüfe Tabelle: image_features_part_3 ===
rows_total            : 50000
emb_db_set            : 50000
emb_fs_missing        : 0
pca_db_set

# 2. RUN DURCH GESPEICHERTE LARGE FILES (KLEINERE BATCHSIZE)


In [None]:
import os, gc, csv, time, sqlite3, traceback, cProfile, pstats
from pathlib import Path
from datetime import datetime

import numpy as np
from tqdm import tqdm
import psutil

try:
    from PIL import Image

    PIL_AVAILABLE = True
except Exception:
    PIL_AVAILABLE = False

from features.hash import calc_hash
from features.color_vec import calc_histogram
from features.embedding_vec import extract_embeddings
from image_load import fast_load


# ========== CONFIG ==========
# Database, embedding output directory and the CSV listing the deferred (large) images.
DB_PATH = r"C:\BIG_DATA\data\database.db"
EMBEDDING_DIR = r"C:\BIG_DATA\embeddings"
DEFERRED_CSV = r"Z:\CODING\UNI\BIG_DATA\src\Large_images.csv"

# Debug log and the log of skipped/failed images.
LOG_FILE = r"Z:\CODING\UNI\BIG_DATA\src\verarbeitung_log_deferred.txt"
SKIPPED_LOG = r"Z:\CODING\UNI\BIG_DATA\src\skipped_and_errors.txt"

# Sharding
IMAGES_PER_TABLE = 50_000
TABLE_PREFIX = "image_features_part_"

# Preprocess/Embedding
TARGET_SIZE = (224, 224)  # (W,H) for PIL
EMBED_INPUT_DTYPE = "float32"  # "float32" (0..1) or "uint8"
MICRO_BATCH = 32  # embedding micro-batch size

# Pixel budget: max. sum of (H*W) of the SMALL (resized) images held in RAM
BATCH_TARGET = 100  # rough target batch size, counted in small images
PIXEL_BUDGET = BATCH_TARGET * TARGET_SIZE[0] * TARGET_SIZE[1]

# Dirs
Path(EMBEDDING_DIR).mkdir(parents=True, exist_ok=True)


# ========== DB ==========
# Module-level autocommit connection and cursor shared by the DB helpers below.
# NOTE(review): per the SQLite documentation, journal_mode=OFF disables the
# rollback journal and synchronous=OFF skips fsync; both trade crash safety
# for speed — confirm this is acceptable for this database.
conn = sqlite3.connect(DB_PATH, isolation_level=None)
cursor = conn.cursor()
cursor.execute("PRAGMA journal_mode=OFF;")
cursor.execute("PRAGMA synchronous=OFF;")
cursor.execute("PRAGMA temp_store=MEMORY;")
cursor.execute("PRAGMA mmap_size=0;")


def create_table_if_not_exists(table_name):
    """Create the shard table *table_name* and its (filename, path) index if absent.

    Both statements run on the module-level ``cursor``.
    """
    table_ddl = f"""
        CREATE TABLE IF NOT EXISTS {table_name} (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            filename TEXT NOT NULL,
            path TEXT NOT NULL,
            color_hist TEXT,
            embedding_path TEXT,
            image_hash TEXT,
            resolution TEXT,
            file_size INTEGER,
            pca_embedding TEXT,
            umap_x REAL,
            umap_y REAL
        )
    """
    index_ddl = f"CREATE INDEX IF NOT EXISTS idx_{table_name}_filename_path ON {table_name}(filename, path);"
    # Table first, then the index that speeds up the duplicate lookup.
    for statement in (table_ddl, index_ddl):
        cursor.execute(statement)


def get_existing_shards():
    """Return the existing shard tables, ordered so the newest shard is last.

    Uses the module-level ``cursor`` and ``TABLE_PREFIX``. Tables with a
    numeric suffix are sorted by that number, so ``..._10`` comes after
    ``..._9``. With plain alphabetical ordering the caller's
    ``shard_tables[-1]`` would pick ``..._9`` as the insert target once ten
    or more shards exist. Matching tables without a numeric suffix are placed
    first so they never become the "last" shard.

    The ``_`` characters of the prefix are escaped because ``_`` is a
    single-character wildcard in SQL ``LIKE``.
    """
    literal = (
        TABLE_PREFIX.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
    )
    cursor.execute(
        "SELECT name FROM sqlite_master WHERE type='table' "
        "AND name LIKE ? ESCAPE '\\' ORDER BY name ASC",
        (f"{literal}%",),
    )
    names = [r[0] for r in cursor.fetchall()]

    def shard_order(name):
        suffix = name[len(TABLE_PREFIX):]
        return (1, int(suffix)) if suffix.isdecimal() else (0, 0)

    return sorted(names, key=shard_order)


def rows_in_table(table_name):
    """Return the number of rows in *table_name* (queried via the module-level cursor)."""
    count_sql = f"SELECT COUNT(*) FROM {table_name}"
    cursor.execute(count_sql)
    first_row = cursor.fetchone()
    return first_row[0]


def exists_in_any_shard(filename, path, shard_tables):
    """Return True if a row with this *filename* and *path* exists in any of *shard_tables*.

    The tables are probed in order on the module-level ``cursor`` and the
    search stops at the first hit.
    """
    lookup_key = (filename, path)

    def _has_row(tbl):
        cursor.execute(
            f"SELECT 1 FROM {tbl} WHERE filename=? AND path=? LIMIT 1", lookup_key
        )
        return cursor.fetchone() is not None

    return any(_has_row(tbl) for tbl in shard_tables)


def save_batch_to_db(entries, table_name, logfile):
    """Insert *entries* into *table_name* in one ``executemany`` call and commit.

    Each entry is a 10-tuple in the column order of the INSERT statement.
    Nothing happens for an empty batch. The batch size and the elapsed time
    are written to *logfile*.
    """
    if entries:
        insert_sql = f"""
        INSERT INTO {table_name}
        (filename, path, color_hist, embedding_path, image_hash, resolution, file_size,
         pca_embedding, umap_x, umap_y)
        VALUES ( ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    """
        log_debug(
            logfile, f"[DEBUG] Speichere {len(entries)} Einträge in {table_name} …"
        )
        start = time.time()
        cursor.executemany(insert_sql, entries)
        conn.commit()
        log_debug(logfile, f"[DEBUG] DB-Speicherung: {time.time()-start:.2f}s")


# ========== Logging ==========
def log_debug(logfile, msg):
    """Write *msg* plus a newline to *logfile* and flush immediately."""
    line = msg + "\n"
    logfile.write(line)
    logfile.flush()


def log_skip_or_error(filename, path, reason):
    """Append one timestamped ``|``-separated line about a skipped/failed image to SKIPPED_LOG."""
    with open(SKIPPED_LOG, "a", encoding="utf-8") as sink:
        entry = f"{datetime.now()} | {filename} | {path} | {reason}\n"
        sink.write(entry)


# ========== Utils ==========
def _unique_npy_path(base_path):
    base, ext = os.path.splitext(base_path)
    if ext.lower() != ".npy":
        base = base + ".npy"
    out = base
    c = 1
    while os.path.exists(out):
        out = f"{base[:-4]}_{c}.npy"
        c += 1
    return out


def preprocess_for_model(img_uint8, target_size=TARGET_SIZE):
    """Resize an image array to *target_size* and convert it to the embedding input dtype.

    With Pillow available the image is resized bilinearly; otherwise rows and
    columns are picked at evenly spaced indices. *target_size* is
    ``(width, height)``. The result is float32 scaled by 1/255 when
    ``EMBED_INPUT_DTYPE`` is ``"float32"``, else uint8.
    """
    if not PIL_AVAILABLE:
        # Fallback without Pillow: index sampling along both axes.
        out_w, out_h = target_size[0], target_size[1]
        row_idx = np.linspace(0, img_uint8.shape[0] - 1, out_h).astype(np.int32)
        col_idx = np.linspace(0, img_uint8.shape[1] - 1, out_w).astype(np.int32)
        resized = img_uint8[np.ix_(row_idx, col_idx)]
    else:
        pil_img = Image.fromarray(img_uint8).resize(target_size, Image.BILINEAR)
        resized = np.asarray(pil_img)

    if EMBED_INPUT_DTYPE != "float32":
        return resized.astype(np.uint8)
    return resized.astype(np.float32) / 255.0


def to_embed_dtype(arr):
    """Convert *arr* to the dtype selected by ``EMBED_INPUT_DTYPE``.

    In float32 mode a float32 array is returned unchanged and anything else
    is cast. In uint8 mode float32 input is clipped to [0, 1], scaled by 255
    and rounded before the cast; other input is cast directly.
    """
    already_f32 = arr.dtype == np.float32
    if EMBED_INPUT_DTYPE == "float32":
        if already_f32:
            return arr
        return arr.astype(np.float32)
    if already_f32:
        scaled = np.clip(arr, 0.0, 1.0) * 255.0
        return scaled.round().astype(np.uint8)
    return arr.astype(np.uint8)


def extract_embeddings_micro(bimgs, chunk=MICRO_BATCH):
    """Run ``extract_embeddings`` over *bimgs* in slices of *chunk* items and stack the results row-wise."""
    starts = range(0, len(bimgs), chunk)
    parts = [extract_embeddings(bimgs[s : s + chunk]) for s in starts]
    return np.vstack(parts)


# ========== CSV-Quelle (Large_images.csv) ==========
def iter_deferred_csv(csv_path):
    """Yield ``(filename, path)`` for every usable row of the deferred-images CSV.

    Expected row format: ``timestamp,filename,path,reason``; only the second
    and third columns are used. Rows with fewer than three columns and rows
    whose first column starts with "timestamp" (a header) are skipped. A
    missing file yields nothing.
    """
    if os.path.exists(csv_path):
        with open(csv_path, "r", encoding="utf-8", newline="") as handle:
            for record in csv.reader(handle):
                # An empty row has length 0, so this also covers blank lines.
                if len(record) < 3:
                    continue
                # Tolerate an optional header line.
                if record[0].lower().startswith("timestamp"):
                    continue
                yield record[1], record[2]


# ========== Feature-Vorbereitung ==========
def prepare_image_features(filename, path, logfile):
    """Load one image and compute everything except the embedding itself.

    The image is loaded with ``fast_load``, hashed at full size, then resized;
    the colour histogram is computed on the resized version and only that
    small version is returned for batching.

    Returns:
        On success a 7-tuple ``(filename, path, emb_input, hist_str, img_hash,
        resolution, file_size)``. On any error an 8-tuple ``(filename, path,
        None, None, None, None, None, reason)`` after the error was logged;
        callers tell the two cases apart by the tuple length.
    """
    try:
        full_img = fast_load(path)  # -> uint8 RGB
        if not isinstance(full_img, np.ndarray):
            raise TypeError("fast_load() must return a NumPy uint8 array.")
        if full_img.dtype != np.uint8:
            full_img = np.clip(full_img, 0, 255).astype(np.uint8)

        height, width = full_img.shape[0], full_img.shape[1]
        resolution = f"{width}x{height}"
        file_size = os.path.getsize(path)

        img_hash = calc_hash(full_img)

        small = preprocess_for_model(full_img)
        emb_input = to_embed_dtype(small)
        histogram = calc_histogram(small)  # bins = default of that function
        hist_values = (str(round(float(v), 6)) for v in np.ravel(histogram))
        hist_str = ",".join(hist_values)
    except Exception as e:
        reason = f"{e} | Traceback:\n{traceback.format_exc()}"
        log_debug(logfile, f"[ERROR] Fehler bei {filename}: {e}")
        log_skip_or_error(filename, path, reason)
        return (filename, path, None, None, None, None, None, reason)

    return (filename, path, emb_input, hist_str, img_hash, resolution, file_size)


# ========== Flush: Embeddings + Insert ==========
def flush_by_embeddings_and_insert(batch_meta, batch_imgs, logfile, table_name):
    """Embed a batch, save each embedding as ``.npy`` and insert the rows into *table_name*.

    Args:
        batch_meta: List of ``(filename, path, hist_str, img_hash, resolution,
            size)`` tuples, parallel to *batch_imgs*.
        batch_imgs: The preprocessed images to embed.
        logfile: Open debug log.
        table_name: Shard table that receives the rows.

    Returns:
        Number of rows prepared for insertion (0 for an empty batch or when
        no embedding could be computed).

    If the batched embedding call fails, every image is embedded on its own
    and failures are logged and skipped.

    Fix: in that fallback the metadata of failed images used to stay in the
    list while their embeddings were missing, so ``zip`` paired every
    following image with the embedding of a different image. Metadata and
    embeddings are now collected together, which keeps them aligned.
    """
    if not batch_imgs:
        return 0

    metas = batch_meta
    try:
        embs = extract_embeddings_micro(batch_imgs, chunk=MICRO_BATCH)
    except Exception as e:
        log_debug(logfile, f"[ERROR] Embedding-Chunk fehlgeschlagen: {e}")
        metas, single_embs = [], []
        for meta, img_small in zip(batch_meta, batch_imgs):
            filename, path = meta[0], meta[1]
            try:
                emb = extract_embeddings([img_small])[0]
            except Exception as ee:
                tb = traceback.format_exc()
                log_skip_or_error(
                    filename, path, f"Embedding-Fehler (single): {ee}\n{tb}"
                )
                continue
            # Keep metadata only for images that actually produced an embedding.
            metas.append(meta)
            single_embs.append(emb)
        if not single_embs:
            return 0
        embs = np.stack(single_embs, axis=0)

    entries = []
    kept = 0
    for (filename, path, hist_str, img_hash, resolution, size), emb in zip(
        metas, embs
    ):
        try:
            emb_path = _unique_npy_path(os.path.join(EMBEDDING_DIR, f"{filename}.npy"))
            np.save(emb_path, emb.astype(np.float32))
            entries.append(
                (
                    filename,
                    path,
                    hist_str,
                    emb_path,
                    img_hash,
                    resolution,
                    size,
                    "",  # pca_embedding placeholder (filled by a later pass)
                    0.0,  # umap_x placeholder
                    0.0,  # umap_y placeholder
                )
            )
            kept += 1
        except Exception as e_save:
            tb = traceback.format_exc()
            log_skip_or_error(filename, path, f"Embedding-Save-Fehler: {e_save}\n{tb}")

    save_batch_to_db(entries, table_name, logfile)
    del embs, entries
    return kept


# ========== MAIN ==========
def main():
    """Process the deferred (large) images listed in ``DEFERRED_CSV``.

    Every listed image that is not yet present in any shard table is loaded
    and reduced by ``prepare_image_features``. The small versions are
    collected until their pixel count reaches ``PIXEL_BUDGET``, then embedded
    and inserted into the current (last) shard table. After a flush a new
    shard table is started once the current one holds at least
    ``IMAGES_PER_TABLE`` rows. No PCA/UMAP values are computed here.

    Log file, progress bar and the module-level connection are released in a
    ``finally`` block; previously they stayed open when an exception escaped.
    """
    logfile = open(LOG_FILE, "a", encoding="utf-8", buffering=1)
    pbar = None
    try:
        log_debug(
            logfile, f"[{datetime.now()}] [DEBUG] Start Deferred-Run (kein PCA/UMAP)"
        )
        # Determine the existing shards
        shard_tables = get_existing_shards()
        if not shard_tables:
            shard_tables = [f"{TABLE_PREFIX}1"]
            create_table_if_not_exists(shard_tables[0])

        # Continue inserting into the last table
        current_table_name = shard_tables[-1]
        current_count = rows_in_table(current_table_name)

        # Materialised because the total is needed for the progress bar.
        paths_iter = list(iter_deferred_csv(DEFERRED_CSV))
        total_items = len(paths_iter)
        pbar = tqdm(total=total_items, desc="Deferred-Verarbeitung", unit="Bild")

        batch_meta, batch_imgs = [], []
        cur_pixels = 0
        total_inserted = 0

        for filename, path in paths_iter:
            # Dedupe/existence check across all shards
            if exists_in_any_shard(filename, path, shard_tables):
                log_skip_or_error(filename, path, "Bereits in DB (deferred pass)")
                pbar.update(1)
                continue

            result = prepare_image_features(filename, path, logfile)
            if len(result) == 8:
                # 8-tuple = error result; it was already logged by the helper.
                pbar.update(1)
                continue

            _, p, emb_input, hist_str, img_hash, resolution, size = result
            batch_meta.append((filename, p, hist_str, img_hash, resolution, size))
            batch_imgs.append(emb_input)
            cur_pixels += emb_input.shape[0] * emb_input.shape[1]
            pbar.update(1)

            if cur_pixels >= PIXEL_BUDGET:
                kept = flush_by_embeddings_and_insert(
                    batch_meta, batch_imgs, logfile, current_table_name
                )
                total_inserted += kept
                current_count += kept

                batch_meta.clear()
                batch_imgs.clear()
                cur_pixels = 0
                gc.collect()
                try:
                    import torch

                    if torch.cuda.is_available():
                        torch.cuda.empty_cache()
                        log_debug(logfile, "[DEBUG] CUDA Cache geleert.")
                except Exception:
                    # Best effort: torch may be missing or CUDA unavailable.
                    pass

                # Sharding: is a new table needed?
                if current_count >= IMAGES_PER_TABLE:
                    next_id = len(shard_tables) + 1
                    current_table_name = f"{TABLE_PREFIX}{next_id}"
                    create_table_if_not_exists(current_table_name)
                    shard_tables.append(current_table_name)
                    current_count = 0
                    log_debug(
                        logfile, f"[DEBUG] Neue Tabelle angelegt: {current_table_name}"
                    )

        # Final flush of the remaining partial batch
        if batch_imgs:
            kept = flush_by_embeddings_and_insert(
                batch_meta, batch_imgs, logfile, current_table_name
            )
            total_inserted += kept
            current_count += kept
            batch_meta.clear()
            batch_imgs.clear()
            cur_pixels = 0
            gc.collect()

        pbar.close()
        log_debug(
            logfile,
            f"[{datetime.now()}] ✓ Deferred-Run fertig. Inserts: {total_inserted}",
        )
    finally:
        # Safety net for the error path; the bar is normally closed above already.
        if pbar is not None:
            pbar.close()
        logfile.close()
        conn.close()


# ========= ENTRYPOINT =========
# Run the deferred pass under cProfile, then write the statistics (sorted by
# cumulative time) as text and dump the raw profile data to a .prof file.
if __name__ == "__main__":
    with cProfile.Profile() as pr:
        main()
    with open("profiling_results_deferred.txt", "w", encoding="utf-8") as f:
        stats = pstats.Stats(pr, stream=f)
        stats.sort_stats("cumtime").print_stats()
    stats.dump_stats("profiling_results_deferred.prof")

Deferred-Verarbeitung: 100%|██████████| 62050/62050 [6:08:16<00:00,  2.81Bild/s]    


# PCA FITTEN UND TRANSFORMIEREN SOWIE UMAP-KOORDINATEN GLOBAL FÜR ALLE BILDER GLEICHZEITIG BERECHNEN

In [None]:
"""
Globales Finalisieren (IPCA + UMAP) über alle Shards.
Fixes:
- PASS 2: korrektes Mapping (id <-> embedding_path), defekte Pfade werden pro-Item geloggt/übersprungen
- _unique_npy_path: Endung .npy bleibt korrekt erhalten (keine "pfad ohne endung" mehr)
- modell wird robust gespeichert
-ipca = load(MODEL_DIR / "ipca.joblib")
# base_vec = Roh-Embedding (wie bisher erzeugt)
pca_vec = ipca.transform(base_vec[None, :]).astype(np.float32)[0]
- Logging: Debug- und Fehlerausgaben in Logdatei, skipped_and_errors.txt
"""

import os, gc, sqlite3, random
from datetime import datetime
from pathlib import Path

import numpy as np
from tqdm import tqdm
import psutil

from sklearn.decomposition import IncrementalPCA
import umap

import json, time
from joblib import dump, load

# ========= CONFIG =========
# Database, shard-table prefix, output directory for PCA vectors, directory
# for the fitted models, and the two log files of this pass.
DB_PATH = r"C:\BIG_DATA\data\database.db"
TABLE_PREFIX = "image_features_part_"
EMBEDDING_PCA_DIR = Path(r"C:\BIG_DATA\embeddings_pca")
MODEL_DIR = Path(r"C:\BIG_DATA\models")
LOG_FILE = "PCA_verarbeitung_log.txt"
SKIPPED_LOG = "PCA_skipped_and_errors.txt"

# Tuning
PCA_COMPONENTS = 64  # 64 or 100 – smaller = faster/lighter
UMAP_SAMPLE_SIZE = 25_000  # sample used for the UMAP fit (the rest is transformed)
CHUNK = 4096  # I/O chunk size (RAM vs. speed)


# ========= Logging / Helpers =========
def log_debug(f, msg: str):
    """Append *msg* and a newline to the open log *f*, flushing right away."""
    text = msg + "\n"
    f.write(text)
    f.flush()


def log_skip_or_error(filename_or_id, path, reason):
    """Append one timestamped ``|``-separated line about a skipped/failed item to SKIPPED_LOG."""
    with open(SKIPPED_LOG, "a", encoding="utf-8") as ff:
        fields = (datetime.now(), filename_or_id, path, reason)
        ff.write(" | ".join(str(field) for field in fields) + "\n")


def _unique_npy_path(base_path: str) -> str:
    """
    Gibt einen nicht-belegten .npy-Pfad zurück. Falls base_path bereits .npy hat,
    bleibt die Endung erhalten; sonst wird .npy angehängt.
    """
    base, ext = os.path.splitext(base_path)
    if ext.lower() != ".npy":
        out = base_path + ".npy"
    else:
        out = base_path
    c = 1
    while os.path.exists(out):
        # base ist ohne Endung
        out = f"{base}_{c}.npy"
        c += 1
    return out


def get_tables(cur, prefix):
    """Return the names of all tables matching ``prefix%`` (SQL LIKE), sorted ascending by name."""
    query = (
        "SELECT name FROM sqlite_master "
        "WHERE type='table' AND name LIKE ? ORDER BY name ASC"
    )
    cur.execute(query, (f"{prefix}%",))
    names = []
    for (table_name,) in cur.fetchall():
        names.append(table_name)
    return names


def load_embeddings(paths):
    """Load a list of .npy paths into one stacked float32 array (used for PASS 1 only).

    Paths that fail to load are logged via ``log_skip_or_error`` and skipped.

    Returns:
        ``(X, kept_paths)`` where ``X`` stacks the loaded arrays along a new
        first axis, or ``(None, [])`` when nothing could be loaded.
    """
    loaded = []  # (path, array) pairs, in input order
    for path in paths:
        try:
            arr = np.load(path)
        except Exception as e:
            log_skip_or_error("UNKNOWN", path, f"PCA-Load-Fehler: {e}")
        else:
            loaded.append((path, arr))

    if not loaded:
        return None, []

    keep = [path for path, _ in loaded]
    X = np.stack([arr for _, arr in loaded], axis=0).astype(np.float32)
    return X, keep


# ========= Finalizer =========
def finalize_all_shards_global(
    conn,
    cursor,
    logfile,
    sample_size=UMAP_SAMPLE_SIZE,
    pca_components=PCA_COMPONENTS,
    chunk=CHUNK,
):
    """Fit IncrementalPCA and UMAP globally over all shard tables and write results back.

    The function works in three streamed passes over every table whose name
    starts with TABLE_PREFIX:

    1. PASS 1 loads the files referenced by ``embedding_path`` in chunks and
       feeds them to ``IncrementalPCA.partial_fit``; the fitted model is dumped
       to ``MODEL_DIR / "ipca.joblib"``.
    2. PASS 2 transforms every embedding, saves each reduced vector as its own
       ``.npy`` file below EMBEDDING_PCA_DIR, stores that path in the
       ``pca_embedding`` column and collects a reservoir sample of the reduced
       vectors.
    3. A 2-component UMAP is fitted on the sample, dumped to
       ``MODEL_DIR / "umap_reducer.joblib"``, and PASS 3 transforms all saved
       PCA vectors and writes ``umap_x`` / ``umap_y``.

    A ``model_meta.json`` describing both models is written to MODEL_DIR.
    Items that cannot be loaded or saved are logged through
    ``log_skip_or_error`` and skipped; model/meta save failures are only logged.

    Args:
        conn: Open sqlite3 connection; a second cursor for reading is created from it.
        cursor: Cursor used for table discovery and for all UPDATE statements.
        logfile: Writable handle passed to ``log_debug``.
        sample_size: Maximum number of PCA vectors kept for the UMAP fit.
        pca_components: Requested number of PCA components (capped at the
            feature count of the first loaded chunk).
        chunk: Number of rows fetched and processed per batch.

    Returns:
        None. Returns early when no shard table, no loadable embedding or no
        PCA sample is available.
    """
    log_debug(logfile, "[DEBUG] === Globales Finalisieren (IPCA+UMAP) startet ===")

    # Find the shard tables
    tables = get_tables(cursor, TABLE_PREFIX)
    if not tables:
        log_debug(logfile, "[WARN] Keine Shard-Tabellen gefunden – Abbruch.")
        return
    os.makedirs(EMBEDDING_PCA_DIR, exist_ok=True)
    os.makedirs(MODEL_DIR, exist_ok=True)
    log_debug(
        logfile, f"[DEBUG] MODEL_DIR exists={MODEL_DIR.exists()} path={MODEL_DIR}"
    )

    # dedicated read cursor (keep the streaming SELECT separate from the UPDATEs!)
    cur_read = conn.cursor()

    # -------- totals for the progress bars --------
    total_embed_rows = 0
    for tbl in tables:
        cur_read.execute(
            f"SELECT COUNT(*) FROM {tbl} WHERE embedding_path IS NOT NULL AND embedding_path != ''"
        )
        total_embed_rows += int(cur_read.fetchone()[0])

    # -------- PASS 1: IPCA partial_fit (global, streamed) --------
    ipca = None
    seen = 0
    pbar1 = tqdm(total=total_embed_rows, desc="PASS1 IPCA partial_fit", unit="vec")
    for tbl in tables:
        cur_read.execute(
            f"SELECT embedding_path FROM {tbl} WHERE embedding_path IS NOT NULL AND embedding_path != ''"
        )
        while True:
            rows = cur_read.fetchmany(chunk)
            if not rows:
                break
            paths = [r[0] for r in rows]
            X, _ = load_embeddings(paths)  # the kept-path list is not needed for partial_fit
            if X is not None:
                if ipca is None:
                    # The model is created lazily because the feature count is
                    # only known once the first chunk has been loaded.
                    nfeat = X.shape[1]
                    ncomp = min(pca_components, nfeat)
                    ipca = IncrementalPCA(n_components=ncomp)
                    log_debug(
                        logfile,
                        f"[DEBUG] IPCA init mit n_components={ncomp} (feat={nfeat})",
                    )
                # NOTE(review): partial_fit may reject chunks with fewer rows than
                # n_components depending on the scikit-learn version — confirm.
                ipca.partial_fit(X)
                seen += X.shape[0]
                del X
            # advance by the planned chunk size (even if individual loads failed)
            pbar1.update(len(paths))
            gc.collect()
    pbar1.close()

    if ipca is None or seen == 0:
        log_debug(
            logfile,
            "[WARN] Keine Embeddings gefunden – IPCA konnte nicht trainiert werden.",
        )
        return

    log_debug(logfile, f"[DEBUG] PASS 1 fertig – partial_fit auf {seen} Vektoren.")
    _print_res("Nach IPCA PASS 1", logfile)

    try:
        dump(ipca, MODEL_DIR / "ipca.joblib")
        log_debug(
            logfile, f"[DEBUG] IPCA gespeichert unter: {MODEL_DIR / 'ipca.joblib'}"
        )
    except Exception as e:
        log_debug(logfile, f"[ERROR] IPCA konnte nicht gespeichert werden: {e}")

    # -------- PASS 2: transform, save PCA vectors, update pca_embedding, collect sample --------
    rng = random.Random(42)  # fixed seed -> reproducible reservoir sample
    sample_Xp, sample_cnt = [], 0
    saved = 0

    pbar2 = tqdm(total=total_embed_rows, desc="PASS2 IPCA transform+save", unit="vec")
    for tbl in tables:
        cur_read.execute(
            f"SELECT id, embedding_path FROM {tbl} WHERE embedding_path IS NOT NULL AND embedding_path != ''"
        )
        while True:
            rows = cur_read.fetchmany(chunk)
            if not rows:
                break

            ids = [r[0] for r in rows]
            paths = [r[1] for r in rows]

            # IMPORTANT: load per (id, path) pair -> only successful pairs are kept,
            # so ids and matrix rows stay aligned
            mats, keep_ids = [], []
            for _id, p in zip(ids, paths):
                try:
                    mats.append(np.load(p).astype(np.float32))
                    keep_ids.append(_id)
                except Exception as e:
                    log_skip_or_error(f"DB_ID={_id}", p, f"PCA-Load-Fehler: {e}")

            if mats:
                X = np.stack(mats, axis=0)
                Xp = ipca.transform(X)

                updates = []
                for _id_ok, vec in zip(keep_ids, Xp):
                    # The file name is derived from the row id only; _unique_npy_path
                    # picks a numbered name when that file already exists.
                    pca_path = _unique_npy_path(
                        str(EMBEDDING_PCA_DIR / f"{_id_ok}_pca.npy")
                    )
                    try:
                        np.save(pca_path, vec.astype(np.float32))
                    except Exception as e:
                        log_skip_or_error(
                            f"DB_ID={_id_ok}", pca_path, f"PCA-Save-Fehler: {e}"
                        )
                        # failed save -> an empty string is written to the DB below
                        pca_path = ""
                    updates.append((pca_path, _id_ok))

                    # Reservoir sampling for the UMAP fit
                    sample_cnt += 1
                    if len(sample_Xp) < sample_size:
                        sample_Xp.append(vec.copy())
                    else:
                        j = rng.randrange(sample_cnt)
                        if j < sample_size:
                            sample_Xp[j] = vec.copy()

                cursor.executemany(
                    f"UPDATE {tbl} SET pca_embedding=? WHERE id=?", updates
                )
                conn.commit()
                # NOTE(review): this also counts rows whose save failed (empty path)
                saved += len(updates)

                del X, Xp, updates, mats
                gc.collect()

            # progress: planned chunk size (even if individual loads failed)
            pbar2.update(len(paths))
    pbar2.close()

    log_debug(logfile, f"[DEBUG] PASS 2 fertig – {saved} PCA-Vektoren gespeichert.")
    _print_res("Nach IPCA PASS 2", logfile)

    if not sample_Xp:
        log_debug(logfile, "[WARN] Kein PCA-Sample – UMAP wird übersprungen.")
        return

    # -------- UMAP fit (on the sample) --------
    sample_mat = np.vstack(sample_Xp).astype(np.float32)
    reducer = umap.UMAP(
        n_components=2, metric="euclidean", random_state=42, low_memory=True
    )
    log_debug(
        logfile, f"[DEBUG] UMAP Fit auf Sample mit {sample_mat.shape[0]} Vektoren …"
    )
    reducer.fit(sample_mat)
    del sample_mat, sample_Xp
    gc.collect()
    _print_res("Nach UMAP Fit (Sample)", logfile)
    # Persist the UMAP reducer
    try:
        dump(reducer, MODEL_DIR / "umap_reducer.joblib")
        log_debug(
            logfile,
            f"[DEBUG] UMAP-Reducer gespeichert unter: {MODEL_DIR / 'umap_reducer.joblib'}",
        )
    except Exception as e:
        log_debug(logfile, f"[ERROR] UMAP-Reducer konnte nicht gespeichert werden: {e}")

    # --- WRITE META ---
    try:
        # robust: read the values from attributes, whether or not the
        # trailing-underscore variants are available
        ipca_n_components = int(
            getattr(
                ipca,
                "n_components_",
                getattr(ipca, "n_components", ipca.components_.shape[0]),
            )
        )
        ipca_n_features_in = int(
            getattr(ipca, "n_features_in_", ipca.components_.shape[1])
        )

        meta = {
            "ipca": {
                "path": str(MODEL_DIR / "ipca.joblib"),
                "n_components": ipca_n_components,
                "n_features_in_": ipca_n_features_in,
                "trained_at": time.strftime("%Y-%m-%d %H:%M:%S"),
            },
            "umap": {
                "path": str(MODEL_DIR / "umap_reducer.joblib"),
                "n_components": 2,
                "metric": "euclidean",
            },
        }
        with open(MODEL_DIR / "model_meta.json", "w", encoding="utf-8") as f:
            json.dump(meta, f, indent=2)
        log_debug(
            logfile, f"[DEBUG] Meta gespeichert unter: {MODEL_DIR / 'model_meta.json'}"
        )
    except Exception as e:
        log_debug(logfile, f"[ERROR] Meta konnte nicht gespeichert werden: {e}")

    # -------- PASS 3: UMAP transform of all PCA vectors & DB update --------
    # how many rows have a pca_embedding now?
    total_pca_rows = 0
    for tbl in tables:
        cur_read.execute(
            f"SELECT COUNT(*) FROM {tbl} WHERE pca_embedding IS NOT NULL AND pca_embedding != ''"
        )
        total_pca_rows += int(cur_read.fetchone()[0])

    updated = 0
    pbar3 = tqdm(total=total_pca_rows, desc="PASS3 UMAP transform+update", unit="vec")
    for tbl in tables:
        cur_read.execute(
            f"SELECT id, pca_embedding FROM {tbl} WHERE pca_embedding IS NOT NULL AND pca_embedding != ''"
        )
        while True:
            rows = cur_read.fetchmany(chunk)
            if not rows:
                break

            ids = [r[0] for r in rows]
            ppaths = [r[1] for r in rows]

            # Same per-pair loading as in PASS 2 so ids and rows stay aligned
            mats, keep_ids = [], []
            for _id, p in zip(ids, ppaths):
                try:
                    mats.append(np.load(p).astype(np.float32))
                    keep_ids.append(_id)
                except Exception as e:
                    log_skip_or_error(f"DB_ID={_id}", p, f"UMAP-PCA-Load-Fehler: {e}")

            if mats:
                Xp = np.stack(mats, axis=0)
                coords = reducer.transform(Xp)
                updates = [
                    (float(x), float(y), i) for i, (x, y) in zip(keep_ids, coords)
                ]
                cursor.executemany(
                    f"UPDATE {tbl} SET umap_x=?, umap_y=? WHERE id=?", updates
                )
                conn.commit()
                updated += len(updates)

                del Xp, coords, updates, mats
                gc.collect()

            pbar3.update(len(rows))
    pbar3.close()

    log_debug(
        logfile, f"[DEBUG] PASS 3 fertig – {updated} UMAP-Koordinaten aktualisiert."
    )
    _print_res("Nach UMAP PASS 3", logfile)
    log_debug(
        logfile, "[DEBUG] === Globales Finalisieren (IPCA+UMAP) abgeschlossen ==="
    )


def _print_res(stage, logfile):
    """Log resident memory (MB) and CPU usage of the current process, tagged with ``stage``."""
    proc = psutil.Process(os.getpid())
    rss_mb = proc.memory_info().rss / (1024**2)
    # cpu_percent blocks for the given interval to take its measurement
    cpu_pct = proc.cpu_percent(interval=0.1)
    message = f"[RESOURCE] {stage} | RAM: {rss_mb:.2f} MB | CPU: {cpu_pct:.2f}%"
    log_debug(logfile, message)


# ========= main =========
if __name__ == "__main__":
    # Script entry point: prepare output folders, open the DB and run the
    # global IPCA+UMAP finalization with a line-buffered append log.
    os.makedirs(EMBEDDING_PCA_DIR, exist_ok=True)
    os.makedirs(MODEL_DIR, exist_ok=True)

    conn = sqlite3.connect(DB_PATH, isolation_level=None)
    try:
        cursor = conn.cursor()

        # (Optional) PRAGMAs: no rollback journal, no fsync waits, temp data in
        # memory, memory-mapped I/O disabled
        cursor.execute("PRAGMA journal_mode=OFF;")
        cursor.execute("PRAGMA synchronous=OFF;")
        cursor.execute("PRAGMA temp_store=MEMORY;")
        cursor.execute("PRAGMA mmap_size=0;")

        with open(LOG_FILE, "a", encoding="utf-8", buffering=1) as logfile:
            log_debug(logfile, f"[{datetime.now()}] [DEBUG] finalize_only gestartet")
            try:
                finalize_all_shards_global(
                    conn,
                    cursor,
                    logfile,
                    sample_size=UMAP_SAMPLE_SIZE,
                    pca_components=PCA_COMPONENTS,
                    chunk=CHUNK,
                )
            finally:
                # the end marker is written even when finalizing raised
                log_debug(logfile, f"[{datetime.now()}] [DEBUG] finalize_only beendet")
    finally:
        # Release the database handle even when a PRAGMA or the finalizer fails
        conn.close()

PASS1 IPCA partial_fit: 100%|██████████| 359754/359754 [32:55<00:00, 182.08vec/s]
PASS2 IPCA transform+save: 100%|██████████| 359754/359754 [27:58<00:00, 214.38vec/s]
  warn(
PASS3 UMAP transform+update: 100%|██████████| 359754/359754 [29:27<00:00, 203.56vec/s]


In [None]:
import os, sqlite3
from tqdm import tqdm

# SQLite database to inspect and the name prefix that identifies shard tables
DB_PATH = r"C:\BIG_DATA\data\database.db"
TABLE_PREFIX = "image_features_part_"


def get_tables(cur):
    """Return all table names starting with TABLE_PREFIX (SQL LIKE match), sorted ascending."""
    like_pattern = f"{TABLE_PREFIX}%"
    cur.execute(
        "SELECT name FROM sqlite_master WHERE type='table' AND name LIKE ? ORDER BY name ASC",
        (like_pattern,),
    )
    found = cur.fetchall()
    return [row[0] for row in found]


def main():
    """Report, per shard table, how many non-empty ``embedding_path`` values exist on disk.

    For every table one summary line is printed with the number of existing
    paths (``ok``), the number of paths that do not exist as stored
    (``missing_or_wrong``) and the number of non-empty values. A stored path
    that only lacks the ``.npy`` suffix counts as missing/wrong; if that count
    is high, run the repair job so the DB values get their ``.npy`` suffix.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        cur = conn.cursor()
        for tbl in get_tables(cur):
            cur.execute(
                f"SELECT COUNT(*) FROM {tbl} WHERE embedding_path IS NOT NULL AND TRIM(embedding_path)<>''"
            )
            total = int(cur.fetchone()[0])
            if total == 0:
                print(f"{tbl}: 0 embedding_path – übersprungen")
                continue

            ok = missing = 0
            cur.execute(
                f"SELECT embedding_path FROM {tbl} WHERE embedding_path IS NOT NULL AND TRIM(embedding_path)<>''"
            )
            for (p,) in tqdm(cur.fetchall(), desc=tbl, unit="path", leave=False):
                # One filesystem check per row is enough: a path that exists only
                # with ".npy" appended is still wrong as stored in the DB.
                if os.path.exists(p.strip()):
                    ok += 1
                else:
                    missing += 1
            print(
                f"{tbl}: embeddings ok={ok}, missing_or_wrong={missing}, total_nonempty={total}"
            )
    finally:
        # close the connection even when a query or the filesystem check raises
        conn.close()


if __name__ == "__main__":
    main()

                                                                                  

image_features_part_1: embeddings ok=100000, missing_or_wrong=0, total_nonempty=100000


                                                                                 

image_features_part_2: embeddings ok=51500, missing_or_wrong=0, total_nonempty=51500


                                                                                 

image_features_part_3: embeddings ok=50000, missing_or_wrong=0, total_nonempty=50000


                                                                                 

image_features_part_4: embeddings ok=50000, missing_or_wrong=0, total_nonempty=50000


                                                                                 

image_features_part_5: embeddings ok=50004, missing_or_wrong=0, total_nonempty=50004


                                                                                 

image_features_part_6: embeddings ok=50000, missing_or_wrong=0, total_nonempty=50000


                                                                               

image_features_part_7: embeddings ok=8250, missing_or_wrong=0, total_nonempty=8250




In [None]:
"""
Geht über alle image_features_part_% Tabellen,
zählt pro Spalte (alle!) filled/missing
und exportiert fehlende Pfade je Spalte in eine CSV.
Erzeugt:
- all_columns_summary.csv  (long format: table, column, col_type, total, filled, missing)
- missing_by_column_paths.csv (table, column, id, filename, path)
"""

import os, csv, sqlite3
from tqdm import tqdm

# ====== CONFIG ======
DB_PATH = r"C:\BIG_DATA\data\database.db"  # SQLite database to analyse
TABLE_PREFIX = "image_features_part_"  # name prefix of the shard tables
OUTPUT_DIR = "hash_reports_all_columns"  # folder that receives both CSV reports
USE_TQDM = True  # show progress bars for tables and columns

# The output folder is created as a side effect when this cell runs
os.makedirs(OUTPUT_DIR, exist_ok=True)
SUMMARY_CSV = os.path.join(OUTPUT_DIR, "all_columns_summary.csv")
MISSING_CSV = os.path.join(OUTPUT_DIR, "missing_by_column_paths.csv")


# ====== DB helpers ======
def get_shard_tables(cur):
    """List the shard tables (names matching ``TABLE_PREFIX%``) in ascending name order."""
    sql = "SELECT name FROM sqlite_master WHERE type='table' AND name LIKE ? ORDER BY name ASC"
    params = (f"{TABLE_PREFIX}%",)
    cur.execute(sql, params)
    shard_names = []
    for record in cur.fetchall():
        shard_names.append(record[0])
    return shard_names


def get_columns(cur, table):
    """
    Return ``(name, declared_type_upper)`` for every column of ``table`` except ``id``.

    The information comes from ``PRAGMA table_info``; a missing declared type
    becomes an empty string. The ``id`` column is dropped case-insensitively.
    """
    cur.execute(f"PRAGMA table_info({table});")
    result = []
    for info in cur.fetchall():
        name = info[1]
        if name.lower() == "id":
            continue
        declared = (info[2] or "").upper()
        result.append((name, declared))
    return result


def count_total(cur, table):
    """Return the number of rows in ``table`` as an int."""
    (row_count,) = cur.execute(f"SELECT COUNT(*) FROM {table}").fetchone()
    return int(row_count)


def build_missing_condition(col_name, col_type_upper):
    """Build the SQL predicate that marks ``col_name`` as missing.

    Columns whose declared type contains ``TEXT`` count as missing when NULL or
    empty/whitespace-only; all other columns only when NULL.
    """
    null_check = f"{col_name} IS NULL"
    if "TEXT" not in col_type_upper:
        return f"({null_check})"
    return f"({null_check} OR TRIM({col_name}) = '')"


def column_exists(cur, table, col_name):
    """Return True when ``table`` has a column named exactly ``col_name`` (case-sensitive)."""
    cur.execute(f"PRAGMA table_info({table});")
    for info in cur.fetchall():
        if info[1] == col_name:
            return True
    return False


# ====== Export missing paths (streaming) ======
def export_missing_for_column(
    cur, table, col_name, cond, writer, has_filename, has_path
):
    """Stream all rows of ``table`` matching ``cond`` into the missing-paths CSV.

    Args:
        cur: Cursor used for the SELECT (its pending result is replaced).
        table: Table to query.
        col_name: Column the report row refers to (written to the ``column`` field).
        cond: SQL predicate selecting the rows where ``col_name`` is missing.
        writer: ``csv.DictWriter`` with fields table/column/id/filename/path.
        has_filename: Whether the table has a ``filename`` column to export.
        has_path: Whether the table has a ``path`` column to export.
    """
    # Select only the columns that exist: id, then filename, then path
    wanted = ["id"]
    wanted += ["filename"] if has_filename else []
    wanted += ["path"] if has_path else []

    cur.execute(f"SELECT {', '.join(wanted)} FROM {table} WHERE {cond}")
    while batch := cur.fetchmany(5000):
        for record in batch:
            writer.writerow(
                {
                    "table": table,
                    "column": col_name,
                    "id": record[0],
                    # filename, when selected, is always the second column
                    "filename": record[1] if has_filename else "",
                    # path, when selected, is always the last column
                    "path": record[-1] if has_path else "",
                }
            )


def main():
    """Write the per-column filled/missing summary and the missing-rows export.

    For every shard table and every column except ``id`` one summary row
    (table, column, declared type, total, filled, missing) is written to
    SUMMARY_CSV. Rows where the column is missing are exported with id and,
    where available, filename and path to MISSING_CSV.
    """
    conn = sqlite3.connect(DB_PATH, isolation_level=None)
    try:
        cur = conn.cursor()

        tables = get_shard_tables(cur)
        if not tables:
            print("Keine Shard-Tabellen gefunden.")
            return

        # Prepare CSVs
        with open(SUMMARY_CSV, "w", newline="", encoding="utf-8") as fsum, open(
            MISSING_CSV, "w", newline="", encoding="utf-8"
        ) as fmiss:

            sum_writer = csv.DictWriter(
                fsum,
                fieldnames=["table", "column", "col_type", "total", "filled", "missing"],
            )
            sum_writer.writeheader()

            miss_writer = csv.DictWriter(
                fmiss, fieldnames=["table", "column", "id", "filename", "path"]
            )
            miss_writer.writeheader()

            iterator = tqdm(tables, desc="Tabellen", unit="tbl") if USE_TQDM else tables
            for tbl in iterator:
                # Columns & base figures
                cols = get_columns(cur, tbl)
                total = count_total(cur, tbl)

                # Check whether filename/path exist (needed for the export)
                present_colnames = {name for (name, _) in cols}
                has_filename = "filename" in present_colnames
                has_path = "path" in present_colnames

                # Progress per table (over its columns)
                col_iter = (
                    tqdm(cols, desc=f"{tbl}", unit="col", leave=False)
                    if USE_TQDM
                    else cols
                )
                for col_name, col_type in col_iter:
                    cond = build_missing_condition(col_name, col_type)

                    # Count missing values
                    cur.execute(f"SELECT COUNT(*) FROM {tbl} WHERE {cond}")
                    missing = int(cur.fetchone()[0])
                    filled = total - missing

                    sum_writer.writerow(
                        {
                            "table": tbl,
                            "column": col_name,
                            "col_type": col_type,
                            "total": total,
                            "filled": filled,
                            "missing": missing,
                        }
                    )

                    # Export the rows where this column is missing
                    if missing > 0:
                        export_missing_for_column(
                            cur, tbl, col_name, cond, miss_writer, has_filename, has_path
                        )
    finally:
        # Close the connection on every exit path, including the early return
        # above and any exception raised while writing the reports.
        conn.close()

    print(f"[OK] Summary: {SUMMARY_CSV}")
    print(f"[OK] Missing paths (alle Spalten): {MISSING_CSV}")


if __name__ == "__main__":
    main()

Tabellen: 100%|██████████| 7/7 [00:13<00:00,  1.94s/tbl]

[OK] Summary: hash_reports_all_columns\all_columns_summary.csv
[OK] Missing paths (alle Spalten): hash_reports_all_columns\missing_by_column_paths.csv





In [2]:
import os
import sqlite3
from pathlib import Path
from tqdm import tqdm

"""
Repariert embedding_path in allen image_features_part_% Tabellen – ID-weise paged,
damit Updates den Scan nicht abbrechen. Loggt fehlende als Bildpfade.
"""
# ===== CONFIG =====
DB_PATH = r"C:\BIG_DATA\data\database.db"  # SQLite database whose embedding_path values get repaired
TABLE_PREFIX = "image_features_part_"  # name prefix of the shard tables
EMBEDDING_DIR = Path(r"C:\BIG_DATA\embeddings")  # folder that is indexed (recursively) for .npy files
MISSING_TXT = "missing_embedding_images.txt"  # rows for which no embedding file was found
PAGE = 5000  # page size for paging by id


# ===== Helpers =====
def get_tables(cur):
    """Return the shard table names (LIKE ``TABLE_PREFIX%``) in ascending order."""
    statement = "SELECT name FROM sqlite_master WHERE type='table' AND name LIKE ? ORDER BY name ASC"
    cur.execute(statement, (f"{TABLE_PREFIX}%",))
    return [name for (name,) in cur.fetchall()]


def col_exists(cur, table, col):
    """Return True when ``PRAGMA table_info`` lists a column named exactly ``col`` for ``table``."""
    cur.execute(f"PRAGMA table_info({table});")
    for column_info in cur.fetchall():
        if column_info[1] == col:
            return True
    return False


def index_embeddings(root: Path):
    """Walk ``root`` recursively and map each ``.npy`` file name to its full path.

    The suffix check is case-insensitive. When the same file name occurs more
    than once, the first one encountered by ``os.walk`` wins.
    """
    idx = {}
    for folder, _subdirs, names in os.walk(root):
        for name in names:
            if not name.lower().endswith(".npy"):
                continue
            if name not in idx:  # first hit wins
                idx[name] = os.path.join(folder, name)
    return idx


def expected_basenames(filename: str):
    """Return the two accepted embedding file names for an image file name.

    The first keeps the image extension (``img.jpg.npy``), the second replaces
    it (``img.npy``). A falsy ``filename`` is treated as an empty string.
    """
    cleaned = filename.strip() if filename else ""
    with_ext = f"{cleaned}.npy"
    without_ext = f"{Path(cleaned).stem}.npy"
    return [with_ext, without_ext]


def is_ok(embedding_path: str, filename: str) -> bool:
    """Return True when the stored embedding path is usable as is.

    That requires a non-empty path ending in ``.npy`` (case-insensitive) that
    exists on disk and whose base name is one of ``expected_basenames(filename)``.
    """
    candidate = (embedding_path or "").strip()
    # An empty string fails the suffix check as well, so one guard covers both
    if not candidate.lower().endswith(".npy"):
        return False
    if not os.path.exists(candidate):
        return False
    return os.path.basename(candidate) in expected_basenames(filename)


def try_fix_path(p_current: str, filename: str, idx: dict, embedding_dir: Path):
    """Try to find the correct embedding file for a row whose stored path is not usable.

    Lookup order:
      1. the stored path with ``.npy`` appended (if it does not end in ``.npy``),
      2. ``<filename>.npy`` / ``<stem>.npy`` directly inside ``embedding_dir``,
      3. the same two names in the file-name index ``idx``,
      4. a single, unambiguous index entry whose name starts with ``<stem>_``.

    Steps 2-4 need a file name. With an empty or missing ``filename`` they are
    skipped, because the candidates would degrade to ``".npy"`` and the prefix
    ``"_"`` and could attach an unrelated embedding to the row.

    Args:
        p_current: Path currently stored in the DB (may be empty or None).
        filename: Image file name of the row (may be empty or None).
        idx: Mapping of ``.npy`` file name -> full path.
        embedding_dir: Folder checked directly in step 2.

    Returns:
        The repaired path as ``str``, or ``None`` when nothing matched.
    """
    # 1) old path + ".npy"?
    current = (p_current or "").strip()
    if current and not current.lower().endswith(".npy"):
        with_suffix = current + ".npy"
        if os.path.exists(with_suffix):
            return with_suffix

    fn = (filename or "").strip()
    stem = Path(fn).stem
    if not stem:
        # no usable file name -> name-based matching would be meaningless
        return None
    cand_names = [f"{fn}.npy", f"{stem}.npy"]

    # 2) directly in the target folder
    for name in cand_names:
        p = embedding_dir / name
        if os.path.exists(p):
            return str(p)

    # 3) index lookup
    for name in cand_names:
        if name in idx:
            return idx[name]

    # 4) unambiguous prefix hit ("<stem>_...")
    stem_prefix = f"{stem}_"
    matches = [path for base, path in idx.items() if base.startswith(stem_prefix)]
    if len(matches) == 1:
        return matches[0]

    return None


# ===== Main per Tabelle (paged) =====
def process_table(conn, tbl, idx, missing_file):
    """Check and repair ``embedding_path`` for every row of one shard table.

    The table is read in pages ordered by ``id`` (``WHERE id > last_id``), each
    page is fetched completely before its updates are written. Rows that cannot
    be repaired are written to ``missing_file`` as
    ``id; table; filename; image_path``.

    Returns:
        Tuple ``(ok, fixed, missing)`` with the row counts per outcome.
    """
    reader = conn.cursor()
    writer = conn.cursor()

    # Row count for the progress bar
    reader.execute(f"SELECT COUNT(*) FROM {tbl}")
    total = int(reader.fetchone()[0])
    pbar = tqdm(total=total, desc=tbl, unit="row")

    # One page ordered by primary key
    page_sql = f"""SELECT id, filename, path, embedding_path
                FROM {tbl}
                WHERE id > ?
                ORDER BY id ASC
                LIMIT ?"""
    update_sql = f"UPDATE {tbl} SET embedding_path=? WHERE id=?"

    ok = fixed = missing = 0
    last_id = 0

    while True:
        reader.execute(page_sql, (last_id, PAGE))
        page = reader.fetchall()
        if not page:
            break

        pending = []  # (new_path, id)
        for row_id, filename, img_path, emb_path in page:
            if is_ok(emb_path, filename):
                ok += 1
            else:
                repaired = try_fix_path(
                    emb_path or "", filename or "", idx, EMBEDDING_DIR
                )
                if repaired:
                    pending.append((repaired, row_id))
                    fixed += 1
                else:
                    # nothing found -> log the image path
                    missing += 1
                    missing_file.write(
                        f"{row_id}; {tbl}; {filename or ''}; {img_path or ''}\n"
                    )

            pbar.update(1)
            last_id = row_id

        if pending:
            writer.executemany(update_sql, pending)
            conn.commit()

    pbar.close()
    print(f"{tbl}: ok={ok:,}, fixed={fixed:,}, missing={missing:,}, total={total:,}")
    return ok, fixed, missing


def main():
    """Index all embedding files, repair ``embedding_path`` in every shard table, print totals.

    Tables lacking one of the columns ``filename``, ``path`` or
    ``embedding_path`` are reported and skipped. Rows without a matching
    embedding file are listed in MISSING_TXT.
    """
    print("Indexiere Embeddings im Ordner …")
    idx = index_embeddings(EMBEDDING_DIR)
    print(f"Gefundene Embeddings: {len(idx):,}")

    conn = sqlite3.connect(DB_PATH, isolation_level=None)
    try:
        cur = conn.cursor()
        tables = get_tables(cur)
        if not tables:
            print("Keine Tabellen gefunden.")
            return

        # Column check. Build a new list instead of removing from ``tables``
        # while iterating it, which would skip the check for the table that
        # follows a removed one.
        required = ("filename", "path", "embedding_path")
        usable = []
        for t in tables:
            absent = next((c for c in required if not col_exists(cur, t, c)), None)
            if absent is None:
                usable.append(t)
            else:
                print(f"[WARN] {t}: Spalte {absent} fehlt – übersprungen.")

        grand_ok = grand_fixed = grand_missing = 0
        with open(MISSING_TXT, "w", encoding="utf-8") as mf:
            mf.write("# id; table; filename; image_path\n")
            for tbl in usable:
                ok, fixed, missing = process_table(conn, tbl, idx, mf)
                grand_ok += ok
                grand_fixed += fixed
                grand_missing += missing
    finally:
        # close on every exit path, including the early return above
        conn.close()

    print("\n=== Zusammenfassung ===")
    print(f"OK:       {grand_ok:,}")
    print(f"Fixed:    {grand_fixed:,}")
    print(f"Missing:  {grand_missing:,}")
    print(f"Fehlende Bildpfade in: {MISSING_TXT}")


if __name__ == "__main__":
    main()

Indexiere Embeddings im Ordner …
Gefundene Embeddings: 359,474


image_features_part_1: 100%|██████████| 100000/100000 [00:07<00:00, 13850.67row/s]


image_features_part_1: ok=100,000, fixed=0, missing=0, total=100,000


image_features_part_2: 100%|██████████| 51500/51500 [00:03<00:00, 14453.34row/s]


image_features_part_2: ok=51,500, fixed=0, missing=0, total=51,500


image_features_part_3: 100%|██████████| 50000/50000 [00:03<00:00, 14801.86row/s]


image_features_part_3: ok=50,000, fixed=0, missing=0, total=50,000


image_features_part_4: 100%|██████████| 50000/50000 [00:03<00:00, 15238.08row/s]


image_features_part_4: ok=50,000, fixed=0, missing=0, total=50,000


image_features_part_5: 100%|██████████| 50004/50004 [00:03<00:00, 15031.16row/s]


image_features_part_5: ok=50,004, fixed=0, missing=0, total=50,004


image_features_part_6: 100%|██████████| 50000/50000 [02:34<00:00, 324.63row/s]  


image_features_part_6: ok=5,000, fixed=45,000, missing=0, total=50,000


image_features_part_7: 100%|██████████| 8250/8250 [00:11<00:00, 718.09row/s]  

image_features_part_7: ok=5,000, fixed=3,250, missing=0, total=8,250

=== Zusammenfassung ===
OK:       311,504
Fixed:    48,250
Missing:  0
Fehlende Bildpfade in: missing_embedding_images.txt





# UMAP FÜR 32D UND BESTEHENDE 512D EMBEDDINGSVEKTOREN BERECHNEN UND MODELLE SPEICHERN


In [2]:
# build_umap_32_512_from_64params_tqdm_pca32.py
# - Liest 64D-UMAP-Params (euclidean etc.)
# - Trainiert 32D/512D-UMAP und befüllt umap32_x/umap32_y, umap512_x/umap512_y
# - NEU: erzeugt 32D .npy pro Datensatz und speichert Pfad in Spalte pca_32 (TEXT)

import sqlite3, random
from pathlib import Path
from typing import List, Tuple, Dict, Any, Optional
import numpy as np
from joblib import load, dump
from tqdm.auto import tqdm

# =================== CONFIG (ADJUST) ===================
DB_PATH = Path(r"C:\BIG_DATA\data\database.db")
TABLE_PREFIX = "image_features_part_"
BATCH = 4096  # rows fetched per cursor.fetchmany call

# 64D UMAP (its parameters are read)
UMAP64_PATH = Path(r"C:\BIG_DATA\models\umap_reducer.joblib")
PREFER_KEY_64 = 64

# Save mode for the 32/512 UMAP models
SAVE_MODE = "separate"  # "separate" | "single"
UMAP32_PATH = Path(r"C:\BIG_DATA\models\umap32_reducer.joblib")
UMAP512_PATH = Path(r"C:\BIG_DATA\models\umap512_reducer.joblib")
# NOTE(review): identical to UMAP64_PATH — presumably "single" mode would
# overwrite the 64D reducer file; confirm before switching SAVE_MODE.
UMAP_SINGLE_PATH = Path(r"C:\BIG_DATA\models\umap_reducer.joblib")

# Sampling
SAMPLE_32 = 25_000
SAMPLE_512 = 25_000

# Preprocessing (with metric="euclidean" usually False, to mirror the 64D setup)
PREPROC_L2_32 = False
PREPROC_L2_512 = False

# Train only / populate only
ONLY_TRAIN = False
ONLY_POPULATE = False

# tqdm / progress bar
TQDM_ENABLE = True
TQDM_MININTERVAL = 0.2
COUNT_FOR_SAMPLING_TOTAL = True
COUNT_FOR_POPULATE_TOTAL = True

# ===== NEW: write 32D .npy files and store the path in the DB (column pca_32) =====
WRITE_PCA32_FILES = True
PCA32_DIR = Path(r"C:\BIG_DATA\embeddings_pca32")  # target folder for 32D .npy
PCA32_DTYPE = np.float32
# =================== END CONFIG ===================

try:
    import umap
except Exception as e:
    raise ImportError("Bitte 'umap-learn' installieren: pip install umap-learn") from e


# ========= Utils =========
def l2n(v: np.ndarray, eps: float = 1e-12) -> np.ndarray:
    """Return ``v`` as float32, scaled to unit L2 norm.

    Vectors whose norm is below ``eps`` are returned unscaled (only cast to
    float32) to avoid dividing by (almost) zero.
    """
    as_f32 = v.astype(np.float32, copy=False)
    norm = float(np.linalg.norm(as_f32))
    return as_f32 if norm < eps else (as_f32 / norm).astype(np.float32, copy=False)


def safe_load_vec(path: str) -> np.ndarray:
    """Load a ``.npy`` file (memory-mapped, read-only) and return it as float32.

    An array of shape ``(1, D)`` is unwrapped to its single row; any other
    shape is returned unchanged.
    """
    arr = np.load(path, mmap_mode="r")
    is_single_row = arr.ndim == 2 and arr.shape[0] == 1
    vec = arr[0] if is_single_row else arr
    return vec.astype(np.float32, copy=False)


def list_tables(conn: sqlite3.Connection) -> List[str]:
    """Return the names of all tables matching ``TABLE_PREFIX%`` (SQL LIKE), ordered by name."""
    pattern = f"{TABLE_PREFIX}%"
    rows = (
        conn.cursor()
        .execute(
            "SELECT name FROM sqlite_master WHERE type='table' AND name LIKE ? ORDER BY name",
            (pattern,),
        )
        .fetchall()
    )
    return [name for (name,) in rows]


def ensure_cols(
    conn: sqlite3.Connection, table: str, cols: List[Tuple[str, str]]
) -> None:
    """Add every ``(name, type)`` column from ``cols`` that ``table`` does not have yet.

    Existing columns are detected once via ``PRAGMA table_info`` before any
    column is added.
    """
    cur = conn.cursor()
    existing = {info[1] for info in cur.execute(f'PRAGMA table_info("{table}")')}
    for name, typ in cols:
        if name in existing:
            continue
        cur.execute(f'ALTER TABLE "{table}" ADD COLUMN {name} {typ}')


def iter_paths(conn: sqlite3.Connection, table: str, where_clause: str = ""):
    """Yield ``(id, pca_embedding, embedding_path)`` for each row of ``table``.

    Rows are fetched in batches of BATCH. Both path values are passed through
    ``str()``, so a NULL column value is yielded as the string ``"None"``.
    ``where_clause`` is appended verbatim to the SELECT.
    """
    query = f'SELECT id, pca_embedding, embedding_path FROM "{table}" {where_clause}'.strip()
    cur = conn.cursor()
    cur.execute(query)
    while batch := cur.fetchmany(BATCH):
        for row_id, pca_path, emb_path in batch:
            yield int(row_id), str(pca_path), str(emb_path)


def count_rows(
    conn: sqlite3.Connection, table: str, where_clause: Optional[str] = None
) -> int:
    """Return ``COUNT(*)`` of ``table``, optionally restricted by a verbatim ``where_clause``."""
    suffix = where_clause or ""
    query = f'SELECT COUNT(*) FROM "{table}" {suffix}'.strip()
    cur = conn.cursor()
    cur.execute(query)
    (n,) = cur.fetchone()
    return int(n or 0)


def reservoir_sample_paths(
    conn: sqlite3.Connection, tables: List[str], k: int, which: str
) -> List[str]:
    """Draw up to ``k`` file paths uniformly at random from all ``tables``.

    Uses single-pass reservoir sampling over ``iter_paths``. For
    ``which == "32"`` the ``pca_embedding`` path is sampled, otherwise the
    ``embedding_path``.

    Rows without a usable path are skipped: ``iter_paths`` stringifies NULL
    column values to ``"None"``, and empty strings can be stored as well.
    Sampling them would waste reservoir slots on entries that can never be
    loaded and shrink the effective sample below ``k``.

    Args:
        conn: Open sqlite3 connection.
        tables: Table names to sample from.
        k: Maximum sample size; ``k <= 0`` returns an empty list.
        which: ``"32"`` selects the PCA path, anything else the raw embedding path.

    Returns:
        List of at most ``k`` paths. The module-level ``random`` generator is
        used, so the result is only reproducible if the caller seeds it.
    """
    if k <= 0:
        return []
    reservoir: List[str] = []
    seen = 0
    total = None
    if TQDM_ENABLE and COUNT_FOR_SAMPLING_TOTAL:
        total = sum(count_rows(conn, t) for t in tables)
    pbar = tqdm(
        total=total,
        disable=not TQDM_ENABLE,
        mininterval=TQDM_MININTERVAL,
        desc=f"Reservoir-Sampling {which}",
        unit="row",
    )
    try:
        for t in tables:
            for _, pca_path, emb_path in iter_paths(conn, t):
                # progress counts every scanned row, usable or not
                if TQDM_ENABLE:
                    pbar.update(1)
                p = pca_path if which == "32" else emb_path
                if p.strip() in ("", "None"):
                    continue  # NULL/empty DB value -> nothing to load
                seen += 1
                if len(reservoir) < k:
                    reservoir.append(p)
                else:
                    # replace a random slot with probability k / seen
                    j = random.randint(0, seen - 1)
                    if j < k:
                        reservoir[j] = p
    finally:
        pbar.close()
    return reservoir


def load_vectors_from_paths(paths: List[str], dim: int, l2: bool) -> np.ndarray:
    """Load the vectors stored at *paths* and stack them into one float32 matrix.

    Loading is best effort: a path whose vector cannot be loaded or is too
    short is skipped, so the result may have fewer rows than ``len(paths)``.

    Args:
        paths: Files to load through ``safe_load_vec``.
        dim: ``32`` keeps the first 32 components and skips vectors with fewer
            than 32; ``512`` skips vectors with fewer than 128 components.
        l2: When true, each vector is passed through ``l2n`` before stacking.

    Returns:
        An array built with ``np.vstack`` from the accepted vectors, or an
        empty ``(0, dim)`` float32 array when nothing could be loaded.
    """
    vecs = []
    pbar = tqdm(
        total=len(paths),
        disable=not TQDM_ENABLE,
        mininterval=TQDM_MININTERVAL,
        desc=f"Lade Vektoren {dim}D",
        unit="vec",
    )
    try:
        for p in paths:
            try:
                v = safe_load_vec(p)
                if dim == 32:
                    if v.shape[0] < 32:
                        continue
                    v = v[:32]
                # NOTE(review): the 512-D branch accepts any length >= 128 and
                # does not truncate; np.vstack below needs equal lengths, so
                # confirm whether this threshold should be 512.
                elif dim == 512 and v.shape[0] < 128:
                    continue
                vecs.append(l2n(v) if l2 else v)
            except Exception:
                # Deliberate best effort: an unreadable file must not abort
                # the whole sample.
                continue
            finally:
                # The single place where progress advances, so skipped vectors
                # are counted exactly once (they were counted twice before).
                pbar.update(1)
    finally:
        pbar.close()
    if not vecs:
        return np.empty((0, dim), dtype=np.float32)
    return np.vstack(vecs).astype(np.float32, copy=False)


# -------- Load 64D params --------
# Names of the reducer parameters that load_base_params_from_64() copies from
# the reference model's get_params() result; any other key is ignored.
SAFE_KEYS = [
    "n_neighbors",
    "min_dist",
    "metric",
    "metric_kwds",
    "n_epochs",
    "learning_rate",
    "init",
    "spread",
    "low_memory",
    "set_op_mix_ratio",
    "local_connectivity",
    "repulsion_strength",
    "negative_sample_rate",
    "transform_queue_size",
    "random_state",
    "angular_rp_forest",
    "target_n_neighbors",
    "target_metric",
    "target_metric_kwds",
    "target_weight",
    "transform_seed",
    "transform_mode",
    "force_approximation_algorithm",
    "verbose",
    "unique",
    "densmap",
    "dens_lambda",
    "dens_frac",
    "dens_var_shift",
    "disconnection_distance",
    "output_metric",
    "output_metric_kwds",
]
# Parameter set used when the reference model cannot be loaded, and as the
# source of defaults for any key the loaded model does not provide.
FALLBACK_FROM_YOUR_64 = {
    "n_neighbors": 15,
    "min_dist": 0.1,
    "metric": "euclidean",
    "learning_rate": 1.0,
    "local_connectivity": 1.0,
    "low_memory": True,
    "init": "spectral",
    "n_epochs": None,
    "negative_sample_rate": 5,
    "random_state": 42,
    "repulsion_strength": 1.0,
    "set_op_mix_ratio": 1.0,
    "spread": 1.0,
    "transform_queue_size": 4.0,
    "transform_mode": "embedding",
    "transform_seed": 42,
    "unique": False,
    "verbose": False,
    "densmap": False,
    "dens_lambda": 2.0,
    "dens_frac": 0.3,
    "dens_var_shift": 0.1,
    "output_metric": "euclidean",
    "metric_kwds": None,
    "output_metric_kwds": None,
    "target_metric": "categorical",
    "target_metric_kwds": None,
    "target_n_neighbors": -1,
    "target_weight": 0.5,
    "angular_rp_forest": False,
    "force_approximation_algorithm": False,
    "disconnection_distance": None,
}


def load_base_params_from_64(path: Path) -> Dict[str, Any]:
    """Return the base parameters taken from the stored reference reducer.

    The file at *path* may hold either a single reducer or a dict of reducers.
    For a dict, the entry under ``PREFER_KEY_64`` (or its string form) is
    preferred; otherwise the first value is used.

    Args:
        path: Location of the serialized reference model.

    Returns:
        A dict with every ``SAFE_KEYS`` entry the reducer reports, completed
        with ``FALLBACK_FROM_YOUR_64`` for missing keys. A copy of
        ``FALLBACK_FROM_YOUR_64`` is returned when the file cannot be loaded,
        when the dict is empty, or when the loaded object exposes no
        ``get_params`` method.
    """
    try:
        obj = load(path)
    except Exception:
        return FALLBACK_FROM_YOUR_64.copy()

    if isinstance(obj, dict):
        reducer = obj.get(PREFER_KEY_64)
        if reducer is None:
            reducer = obj.get(str(PREFER_KEY_64))
        if reducer is None:
            # An empty dict yields None here instead of a bare StopIteration.
            reducer = next(iter(obj.values()), None)
    else:
        reducer = obj

    get_params = getattr(reducer, "get_params", None)
    if not callable(get_params):
        # Nothing usable was stored; treat it like a failed load.
        return FALLBACK_FROM_YOUR_64.copy()

    params = get_params(deep=False)
    base = {k: params[k] for k in SAFE_KEYS if k in params}
    for k, v in FALLBACK_FROM_YOUR_64.items():
        base.setdefault(k, v)
    return base


def train_umap_with(base_params: Dict[str, Any], X: np.ndarray, n_components: int = 2):
    """Fit a new ``umap.UMAP`` reducer on *X* and return it.

    Args:
        base_params: Keyword arguments for ``umap.UMAP``; the dict itself is
            not modified.
        X: Training matrix passed to ``fit``.
        n_components: Output dimensionality; overrides any ``n_components``
            entry in *base_params*.

    Returns:
        The fitted reducer.
    """
    settings = {**base_params, "n_components": n_components}
    model = umap.UMAP(**settings)
    model.fit(X)
    return model


def save_models(models: Dict[int, Any]):
    """Persist the trained reducers according to ``SAVE_MODE``.

    * ``"separate"``: the entries under keys 32 and 512 are written to
      ``UMAP32_PATH`` and ``UMAP512_PATH``; other keys are ignored.
    * any other mode: all models go into one dict at ``UMAP_SINGLE_PATH``.
      If that file already exists, its content is loaded and updated with
      *models*. A non-dict content is first wrapped in a dict keyed by its
      ``n_features_in_`` attribute (64 when the attribute is missing).

    Args:
        models: Mapping from input dimensionality to fitted reducer.
    """
    if SAVE_MODE == "separate":
        for dim, target in ((32, UMAP32_PATH), (512, UMAP512_PATH)):
            if dim in models:
                target.parent.mkdir(parents=True, exist_ok=True)
                dump(models[dim], target)
        return

    # Create the target directory here as well, as the "separate" branch does;
    # without it the dump fails on a fresh machine.
    UMAP_SINGLE_PATH.parent.mkdir(parents=True, exist_ok=True)
    if UMAP_SINGLE_PATH.exists():
        obj = load(UMAP_SINGLE_PATH)
        merged = (
            obj
            if isinstance(obj, dict)
            else {getattr(obj, "n_features_in_", 64): obj}
        )
        merged.update(models)
    else:
        merged = models
    dump(merged, UMAP_SINGLE_PATH)


def load_models_for_populate() -> Dict[int, Any]:
    """Load previously saved reducers, keyed by input dimensionality.

    In ``"separate"`` mode the files at ``UMAP32_PATH`` and ``UMAP512_PATH``
    are loaded when they exist. In any other mode ``UMAP_SINGLE_PATH`` is
    loaded; a dict is returned as stored, and any other object is wrapped in
    a one-entry dict.

    Returns:
        A possibly empty dict of reducers.
    """
    if SAVE_MODE == "separate":
        candidates = ((32, UMAP32_PATH), (512, UMAP512_PATH))
        return {dim: load(path) for dim, path in candidates if path.exists()}

    if not UMAP_SINGLE_PATH.exists():
        return {}

    stored = load(UMAP_SINGLE_PATH)
    if isinstance(stored, dict):
        return stored
    # NOTE(review): the fallback key here is 2 while save_models() uses 64 for
    # the same situation; confirm which one is intended.
    return {getattr(stored, "n_features_in_", 2): stored}


def _fetch_rows_missing(conn: sqlite3.Connection, table: str, where_clause: str):
    """Yield the rows of *table* matching *where_clause* in lists of ``BATCH``.

    Args:
        conn: Open SQLite connection.
        table: Table name; it is interpolated into the SQL inside double quotes.
        where_clause: Raw SQL appended after the table name.

    Yields:
        Non-empty lists of ``(id, pca_embedding, embedding_path)`` tuples.
    """
    cursor = conn.cursor()
    cursor.execute(
        f'SELECT id, pca_embedding, embedding_path FROM "{table}" {where_clause}'
    )
    # fetchmany() returns an empty list once the result set is exhausted.
    yield from iter(lambda: cursor.fetchmany(BATCH), [])


# ===== NEU: 32D .npy erzeugen & Pfad in pca_32 speichern =====
def _pca32_target_path(table: str, rid: int) -> Path:
    """Return ``PCA32_DIR/<table>/<rid>_pca32.npy``, the output file for one row."""
    filename = f"{rid}_pca32.npy"
    return PCA32_DIR.joinpath(table, filename)


def populate_pca32_column(conn: sqlite3.Connection, tables: List[str]) -> None:
    """Write 32-D ``.npy`` files and store their paths in the ``pca_32`` column.

    For every row whose ``pca_32`` is NULL, the vector referenced by
    ``pca_embedding`` is loaded and its first 32 components are saved to the
    path given by ``_pca32_target_path``. That path is then written to
    ``pca_32``. A file that already exists is not rewritten, but its path is
    still registered. Updates are committed once per fetched batch.

    Rows whose source cannot be loaded or has fewer than 32 components are
    skipped and keep ``pca_32 = NULL``, so a later run will retry them.

    Args:
        conn: Open SQLite connection.
        tables: Tables to process.
    """
    if not WRITE_PCA32_FILES:
        print("[pca_32] Übersprungen (WRITE_PCA32_FILES=False).")
        return

    PCA32_DIR.mkdir(parents=True, exist_ok=True)
    # Directories already created in this run; avoids one mkdir call per row.
    ready_dirs = set()

    for t in tables:
        # Add the pca_32 (TEXT) column if it is missing.
        ensure_cols(conn, t, [("pca_32", "TEXT")])

        # Only rows that have no entry yet.
        where = "WHERE pca_32 IS NULL"
        total_missing = (
            count_rows(conn, t, where_clause=where)
            if (TQDM_ENABLE and COUNT_FOR_POPULATE_TOTAL)
            else None
        )
        pbar = tqdm(
            total=total_missing,
            disable=not TQDM_ENABLE,
            mininterval=TQDM_MININTERVAL,
            desc=f"Write pca_32 {t}",
            unit="row",
        )

        written = 0
        skipped = 0
        try:
            for batch in _fetch_rows_missing(conn, t, where):
                updates = []
                for rid, pca_path, _ in batch:
                    try:
                        v64 = safe_load_vec(pca_path)
                        if v64.shape[0] < 32:
                            skipped += 1
                            continue
                        v32 = v64[:32].astype(PCA32_DTYPE, copy=False)
                        out_path = _pca32_target_path(t, int(rid))
                        if out_path.parent not in ready_dirs:
                            out_path.parent.mkdir(parents=True, exist_ok=True)
                            ready_dirs.add(out_path.parent)
                        if not out_path.exists():
                            np.save(out_path, v32, allow_pickle=False)
                            written += 1
                        updates.append((str(out_path), int(rid)))
                    except Exception:
                        # Broken source: skip the row, but count it so the
                        # loss is visible in the summary.
                        skipped += 1
                    finally:
                        # The single place where progress advances, so every
                        # row is counted exactly once.
                        if TQDM_ENABLE:
                            pbar.update(1)
                if updates:
                    conn.executemany(f'UPDATE "{t}" SET pca_32=? WHERE id=?', updates)
                    conn.commit()
        finally:
            pbar.close()
        print(f"[pca_32] {t}: neue Dateien geschrieben: {written}")
        if skipped:
            print(f"[pca_32] {t}: übersprungen (defekt/zu kurz): {skipped}")


# ===== UMAP-Koordinaten befüllen =====
def populate_umap_coords(
    conn: sqlite3.Connection, tables: List[str], dim: int, reducer: Any, l2: bool
) -> None:
    """Fill the 2-D UMAP coordinate columns for all rows that lack them.

    For ``dim == 32`` the first 32 components of the ``pca_embedding`` vector
    are projected into ``umap32_x`` / ``umap32_y``; for any other *dim* the
    ``embedding_path`` vector is projected into ``umap512_x`` / ``umap512_y``.
    Rows are processed batch-wise through ``reducer.transform`` and committed
    once per table.

    Rows whose vector cannot be loaded (or, for 32-D, has fewer than 32
    components) are skipped and keep NULL coordinates.

    Args:
        conn: Open SQLite connection. Its journal mode is switched to WAL and
            ``synchronous`` to NORMAL.
        tables: Tables to process.
        dim: Input dimensionality selector (32 or anything else, see above).
        reducer: Fitted model exposing ``transform``.
        l2: When true, each vector is passed through ``l2n`` before projection.
    """
    if dim == 32:
        colx, coly = "umap32_x", "umap32_y"
    else:
        colx, coly = "umap512_x", "umap512_y"

    def load_vec(pca_path, emb_path):
        """Load and preprocess one input vector; raise if it is unusable."""
        v = safe_load_vec(pca_path if dim == 32 else emb_path)
        # Flatten a (1, N) array first so the 32-D slice below acts on the
        # component axis rather than on the row axis.
        if v.ndim == 2 and v.shape[0] == 1:
            v = v[0]
        if dim == 32:
            if v.shape[0] < 32:
                raise ValueError("vector has fewer than 32 components")
            v = v[:32]
        return l2n(v) if l2 else v.astype(np.float32, copy=False)

    need_cols = [(colx, "REAL"), (coly, "REAL")]
    where = f"WHERE {colx} IS NULL OR {coly} IS NULL"

    conn.execute("PRAGMA journal_mode=WAL")
    conn.execute("PRAGMA synchronous=NORMAL")
    conn.execute("PRAGMA temp_store=MEMORY")

    for t in tables:
        ensure_cols(conn, t, need_cols)
        total_missing = (
            count_rows(conn, t, where_clause=where)
            if (TQDM_ENABLE and COUNT_FOR_POPULATE_TOTAL)
            else None
        )
        pbar = tqdm(
            total=total_missing,
            disable=not TQDM_ENABLE,
            mininterval=TQDM_MININTERVAL,
            desc=f"Populate {t} → {colx}/{coly}",
            unit="row",
        )

        total_upd = 0
        try:
            for batch in _fetch_rows_missing(conn, t, where):
                ids, mats = [], []
                for rid, pca_path, emb_path in batch:
                    try:
                        vec = load_vec(pca_path, emb_path)
                        row_id = int(rid)
                    except Exception:
                        # Best effort: an unusable row must not stop the run.
                        continue
                    mats.append(vec)
                    ids.append(row_id)
                if ids:
                    X = np.vstack(mats).astype(np.float32, copy=False)
                    XY = reducer.transform(X)
                    rows = [
                        (float(x), float(y), rid)
                        for (x, y), rid in zip(XY[:, :2], ids)
                    ]
                    conn.executemany(
                        f'UPDATE "{t}" SET {colx}=?, {coly}=? WHERE id=?', rows
                    )
                    total_upd += len(rows)
                # Advance by every row that was looked at, including skipped
                # ones, so the bar can reach its total.
                if TQDM_ENABLE:
                    pbar.update(len(batch))
            conn.commit()
        finally:
            pbar.close()
        print(f"[{t}] aktualisiert: {total_upd} Zeilen.")


# =================== PIPELINE ===================
def main():
    """Run the pca_32 / UMAP pipeline end to end.

    Steps:
        1. Fill the ``pca_32`` column (files and paths).
        2. Load the base parameters from the 64-D reference reducer.
        3. Unless ``ONLY_POPULATE`` is set, train one 2-D UMAP on a sample of
           32-D vectors and one on a sample of 512-D vectors, then save them.
        4. Unless ``ONLY_TRAIN`` is set, write UMAP coordinates for all rows,
           loading saved reducers when none were trained in this run.

    Raises:
        RuntimeError: If ``list_tables`` returns no tables.
    """
    conn = sqlite3.connect(str(DB_PATH))
    # try/finally so the connection is released even when a step fails.
    try:
        tables = list_tables(conn)
        if not tables:
            raise RuntimeError(
                f"Keine Tabellen mit Präfix '{TABLE_PREFIX}' gefunden."
            )

        # 1) Fill the pca_32 column (files + paths)
        populate_pca32_column(conn, tables)

        # 2) Load the 64-D params
        base = load_base_params_from_64(UMAP64_PATH)
        print(
            "[Base64] Params:",
            {
                k: base[k]
                for k in (
                    "metric",
                    "n_neighbors",
                    "min_dist",
                    "init",
                    "low_memory",
                    "negative_sample_rate",
                    "random_state",
                )
            },
        )

        models: Dict[int, Any] = {}
        # (input dim, training sample size, L2-normalize flag)
        specs = (
            (32, SAMPLE_32, PREPROC_L2_32),
            (512, SAMPLE_512, PREPROC_L2_512),
        )

        # 3) TRAINING (only when ONLY_POPULATE is not set)
        if not ONLY_POPULATE:
            for dim, sample_size, use_l2 in specs:
                paths = reservoir_sample_paths(
                    conn, tables, sample_size, which=str(dim)
                )
                X = load_vectors_from_paths(paths, dim=dim, l2=use_l2)
                if X.shape[0] > 0:
                    print(f"[Train] {dim}D-UMAP fit auf {X.shape[0]} Vektoren …")
                    models[dim] = train_umap_with(base, X, n_components=2)
                else:
                    print(f"[Warn] Kein {dim}D-Trainingssample gefunden.")

            if models:
                save_models(models)
                if SAVE_MODE == "separate":
                    print(
                        f"[Save] 32D -> {UMAP32_PATH if 32 in models else 'skip'} ; 512D -> {UMAP512_PATH if 512 in models else 'skip'}"
                    )
                else:
                    print(f"[Save] Dict -> {UMAP_SINGLE_PATH}")

        # 4) POPULATE (UMAP coordinates)
        if not ONLY_TRAIN:
            if not models:
                models = load_models_for_populate()
            for dim, _sample_size, use_l2 in specs:
                if dim in models:
                    populate_umap_coords(
                        conn, tables, dim=dim, reducer=models[dim], l2=use_l2
                    )
                else:
                    print(f"[Skip] Kein {dim}D-Reducer verfügbar.")
    finally:
        conn.close()
    print("Fertig.")


if __name__ == "__main__":
    main()

Write pca_32 image_features_part_1:   0%|          | 0/100000 [00:00<?, ?row/s]

[pca_32] image_features_part_1: neue Dateien geschrieben: 100000


Write pca_32 image_features_part_2:   0%|          | 0/51500 [00:00<?, ?row/s]

[pca_32] image_features_part_2: neue Dateien geschrieben: 51500


Write pca_32 image_features_part_3:   0%|          | 0/50000 [00:00<?, ?row/s]

[pca_32] image_features_part_3: neue Dateien geschrieben: 50000


Write pca_32 image_features_part_4:   0%|          | 0/50000 [00:00<?, ?row/s]

[pca_32] image_features_part_4: neue Dateien geschrieben: 50000


Write pca_32 image_features_part_5:   0%|          | 0/50004 [00:00<?, ?row/s]

[pca_32] image_features_part_5: neue Dateien geschrieben: 50004


Write pca_32 image_features_part_6:   0%|          | 0/50000 [00:00<?, ?row/s]

[pca_32] image_features_part_6: neue Dateien geschrieben: 50000


Write pca_32 image_features_part_7:   0%|          | 0/8250 [00:00<?, ?row/s]

[pca_32] image_features_part_7: neue Dateien geschrieben: 8250
[Base64] Params: {'metric': 'euclidean', 'n_neighbors': 15, 'min_dist': 0.1, 'init': 'spectral', 'low_memory': True, 'negative_sample_rate': 5, 'random_state': 42}


Reservoir-Sampling 32:   0%|          | 0/359754 [00:00<?, ?row/s]

Lade Vektoren 32D:   0%|          | 0/25000 [00:00<?, ?vec/s]

[Train] 32D-UMAP fit auf 25000 Vektoren …


  warn(


Reservoir-Sampling 512:   0%|          | 0/359754 [00:00<?, ?row/s]

Lade Vektoren 512D:   0%|          | 0/25000 [00:00<?, ?vec/s]

[Train] 512D-UMAP fit auf 25000 Vektoren …


  warn(


[Save] 32D -> C:\BIG_DATA\models\umap32_reducer.joblib ; 512D -> C:\BIG_DATA\models\umap512_reducer.joblib


Populate image_features_part_1 → umap32_x/umap32_y:   0%|          | 0/100000 [00:00<?, ?row/s]

[image_features_part_1] aktualisiert: 100000 Zeilen.


Populate image_features_part_2 → umap32_x/umap32_y:   0%|          | 0/51500 [00:00<?, ?row/s]

[image_features_part_2] aktualisiert: 51500 Zeilen.


Populate image_features_part_3 → umap32_x/umap32_y:   0%|          | 0/50000 [00:00<?, ?row/s]

[image_features_part_3] aktualisiert: 50000 Zeilen.


Populate image_features_part_4 → umap32_x/umap32_y:   0%|          | 0/50000 [00:00<?, ?row/s]

[image_features_part_4] aktualisiert: 50000 Zeilen.


Populate image_features_part_5 → umap32_x/umap32_y:   0%|          | 0/50004 [00:00<?, ?row/s]

[image_features_part_5] aktualisiert: 50004 Zeilen.


Populate image_features_part_6 → umap32_x/umap32_y:   0%|          | 0/50000 [00:00<?, ?row/s]

[image_features_part_6] aktualisiert: 50000 Zeilen.


Populate image_features_part_7 → umap32_x/umap32_y:   0%|          | 0/8250 [00:00<?, ?row/s]

[image_features_part_7] aktualisiert: 8250 Zeilen.


Populate image_features_part_1 → umap512_x/umap512_y:   0%|          | 0/100000 [00:00<?, ?row/s]

[image_features_part_1] aktualisiert: 100000 Zeilen.


Populate image_features_part_2 → umap512_x/umap512_y:   0%|          | 0/51500 [00:00<?, ?row/s]

[image_features_part_2] aktualisiert: 51500 Zeilen.


Populate image_features_part_3 → umap512_x/umap512_y:   0%|          | 0/50000 [00:00<?, ?row/s]

[image_features_part_3] aktualisiert: 50000 Zeilen.


Populate image_features_part_4 → umap512_x/umap512_y:   0%|          | 0/50000 [00:00<?, ?row/s]

[image_features_part_4] aktualisiert: 50000 Zeilen.


Populate image_features_part_5 → umap512_x/umap512_y:   0%|          | 0/50004 [00:00<?, ?row/s]

[image_features_part_5] aktualisiert: 50004 Zeilen.


Populate image_features_part_6 → umap512_x/umap512_y:   0%|          | 0/50000 [00:00<?, ?row/s]

[image_features_part_6] aktualisiert: 50000 Zeilen.


Populate image_features_part_7 → umap512_x/umap512_y:   0%|          | 0/8250 [00:00<?, ?row/s]

[image_features_part_7] aktualisiert: 8250 Zeilen.
Fertig.


In [1]:
# print_umap64_params.py
from joblib import load
import json

# Serialized reducer file that main() loads and inspects.
UMAP_PATH = r"C:\BIG_DATA\models\umap_reducer.joblib"  # adjust if needed


def pick_64(obj):
    """Select the 64-D entry from a loaded model file.

    The file may hold a single object or a dict such as ``{32: ..., 64: ...}``.

    Args:
        obj: The loaded object.

    Returns:
        *obj* itself when it is not a dict. For a dict: the value under key
        ``64`` or ``"64"``, else the value of the first key for which
        ``int(key) == 64``, else the first value in the dict.
    """
    if not isinstance(obj, dict):
        return obj

    # Exact keys first (int or str).
    for key in (64, "64"):
        if key in obj:
            return obj[key]

    # Then any key that converts to 64; keys that cannot be converted are ignored.
    for key, value in obj.items():
        try:
            is_match = int(key) == 64
        except Exception:
            continue
        if is_match:
            return value

    # Fallback: any entry.
    return next(iter(obj.values()))


def main():
    """Print the parameters and a few fitted attributes of the stored reducer.

    Loads ``UMAP_PATH``, selects the 64-D entry with ``pick_64`` and prints
    two JSON dumps: the result of ``get_params(deep=False)`` and a dict of
    extra attributes. Attributes the object does not have are reported as
    ``None``.
    """
    reducer = pick_64(load(UMAP_PATH))
    # Constructor parameters
    params = reducer.get_params(deep=False)

    # Useful additional information
    embedding = getattr(reducer, "embedding_", None)
    n_components = None if embedding is None else embedding.shape[1]
    extra = {
        "n_features_in_": getattr(reducer, "n_features_in_", None),
        "n_components_": n_components,
        "has_transform": hasattr(reducer, "transform"),
        "learned_ab": {
            "a_": getattr(reducer, "a_", None),
            "b_": getattr(reducer, "b_", None),
        },
    }

    sections = (
        ("\n=== UMAP(64D) get_params() ===", params),
        ("\n=== Extra ===", extra),
    )
    for title, payload in sections:
        print(title)
        print(json.dumps(payload, indent=2, default=str))


if __name__ == "__main__":
    main()


=== UMAP(64D) get_params() ===
{
  "a": null,
  "angular_rp_forest": false,
  "b": null,
  "dens_frac": 0.3,
  "dens_lambda": 2.0,
  "dens_var_shift": 0.1,
  "densmap": false,
  "disconnection_distance": null,
  "force_approximation_algorithm": false,
  "init": "spectral",
  "learning_rate": 1.0,
  "local_connectivity": 1.0,
  "low_memory": true,
  "metric": "euclidean",
  "metric_kwds": null,
  "min_dist": 0.1,
  "n_components": 2,
  "n_epochs": null,
  "n_jobs": 1,
  "n_neighbors": 15,
  "negative_sample_rate": 5,
  "output_dens": false,
  "output_metric": "euclidean",
  "output_metric_kwds": null,
  "precomputed_knn": [
    null,
    null,
    null
  ],
  "random_state": 42,
  "repulsion_strength": 1.0,
  "set_op_mix_ratio": 1.0,
  "spread": 1.0,
  "target_metric": "categorical",
  "target_metric_kwds": null,
  "target_n_neighbors": -1,
  "target_weight": 0.5,
  "tqdm_kwds": {
    "desc": "Epochs completed",
    "bar_format": "{desc}: {percentage:3.0f}%| {bar} {n_fmt}/{total_fmt} [

In [None]:
import numpy as np, sqlite3
import cv2
from joblib import load

# Query image; it is also the value matched against the `path` column below.
IMG = r"D:\data\image_data\pixabay_dataset_v1\images_01\adult-background-business-computer-15700.jpg"
# SQLite database that holds the stored feature paths.
DB = r"C:\BIG_DATA\data\database.db"
MODELS_DIR = Path(r"C:\BIG_DATA\models")
# Serialized PCA model loaded by PCATransformer.
IPCA_PATH = MODELS_DIR / "ipca.joblib"


class PCATransformer:
    """Thin wrapper around a stored PCA model.

    The model is loaded once in the constructor (an IncrementalPCA according
    to the original comment) and applied to single vectors.
    """

    def __init__(self, ipca_path: Path):
        self.ipca = load(ipca_path)

    def transform64(self, vec512_raw: np.ndarray) -> np.ndarray:
        """Project one vector and return the result as a float32 array.

        Expects the RAW 512-D vector (not normalized) so the result matches
        what the DB build produced.
        """
        as_batch = vec512_raw[np.newaxis, :]  # the model expects a 2-D batch
        projected = self.ipca.transform(as_batch)
        return projected.astype(np.float32)[0]


def _read_rgb(p: Path) -> np.ndarray:
    """Read the image file *p* and return it as an RGB array.

    The file bytes are read with ``np.fromfile`` and decoded with
    ``cv2.imdecode``; the decoded BGR image is then converted to RGB.

    Raises:
        ValueError: If OpenCV cannot decode the file.
    """
    raw = np.fromfile(str(p), dtype=np.uint8)
    decoded = cv2.imdecode(raw, cv2.IMREAD_COLOR)  # BGR
    if decoded is not None:
        return cv2.cvtColor(decoded, cv2.COLOR_BGR2RGB)  # -> RGB
    raise ValueError(f"cv2.imdecode konnte Bild nicht lesen: {p}")


# 1) Query embeddings (512→64→32), computed the same way as in the script.
# extract_embeddings is defined elsewhere in this notebook; presumably it
# returns one embedding per input image (verify against its definition).
q512_raw = extract_embeddings([_read_rgb(IMG)])[0].astype(np.float32).ravel()
q64_raw = PCATransformer(IPCA_PATH).transform64(q512_raw)
# The 32-D variant is simply the first 32 components of the PCA output.
q32_raw = q64_raw[:32]


def l2n(x):
    """Return *x* flattened to 1-D float32 and scaled to unit L2 norm.

    A small epsilon (1e-12) is added to the norm so an all-zero input yields
    zeros instead of a division by zero.
    """
    flat = x.astype(np.float32).ravel()
    return flat / (np.linalg.norm(flat) + 1e-12)


# 2) Fetch the matching DB files
conn = sqlite3.connect(DB)
# try/finally so the connection is closed even if the query fails.
try:
    cur = conn.cursor()
    cur.execute(
        "SELECT embedding_path, pca_embedding, pca_32 FROM image_features_part_1 WHERE path=? LIMIT 1",
        (IMG,),
    )
    row = cur.fetchone()
finally:
    conn.close()
if row is None:
    # Without this guard the unpacking below fails with an opaque TypeError.
    raise LookupError(f"No row found in image_features_part_1 for path: {IMG}")
emb512_path, emb64_path, emb32_path = row

db512 = np.load(emb512_path).astype(np.float32).ravel()
db64 = np.load(emb64_path).astype(np.float32).ravel()[:64]
db32 = np.load(emb32_path).astype(np.float32).ravel()[:32]

# Cosine similarity between the freshly computed query vectors and the stored
# ones; values near 1.0 would mean both pipelines agree.
print("cos512:", float(l2n(q512_raw) @ l2n(db512)))
print("cos64 :", float(l2n(q64_raw) @ l2n(db64)))
print("cos32 :", float(l2n(q32_raw) @ l2n(db32)))

cos512: 0.2743372619152069
cos64 : -0.13994933664798737
cos32 : -0.15374481678009033


In [14]:
# calibrate_embed_match.py
import os, numpy as np, cv2, torch
from pathlib import Path
from torchvision.models import resnet18, ResNet18_Weights

# --- Inputs: set 1) the path of the DB image and 2) the path of its stored DB embedding (512-D .npy)
IMG = r"D:\data\image_data\pixabay_dataset_v1\images_01\adult-background-business-computer-15700.jpg"
VEC_DB = r"C:\BIG_DATA\embeddings\adult-background-business-computer-15700.jpg.npy"

# --- Load the DB vector and scale it to unit length, so a dot product with
# another unit vector is their cosine similarity
v_db = np.load(VEC_DB).astype(np.float32).ravel()
v_db /= np.linalg.norm(v_db) + 1e-12

# --- Candidates: weights / channel order / resize library / normalization
# Keep only the weight names this torchvision version actually provides.
weight_opts = []
for name in ["DEFAULT", "IMAGENET1K_V1", "IMAGENET1K_V2"]:
    if hasattr(ResNet18_Weights, name):
        weight_opts.append(name)
if not weight_opts:
    weight_opts = ["DEFAULT"]

channel_modes = ["rgb", "bgr"]
resize_libs = ["cv2", "pil"]
norm_modes = ["imagenet", "none"]  # some pipelines may store embeddings without mean/std normalization

# Per-channel mean and standard deviation applied by norm_imageNet().
IM_MEAN = np.array([0.485, 0.456, 0.406], dtype=np.float32)
IM_STD = np.array([0.229, 0.224, 0.225], dtype=np.float32)


def read_img(path, mode):
    """Decode the image at *path* and return it in the requested channel order.

    Args:
        path: Image file; its bytes are read with ``np.fromfile``.
        mode: ``"rgb"`` converts the decoded image to RGB; any other value
            returns the BGR array exactly as OpenCV decoded it.

    Raises:
        ValueError: If OpenCV cannot decode the file.
    """
    raw = np.fromfile(path, dtype=np.uint8)
    decoded = cv2.imdecode(raw, cv2.IMREAD_COLOR)
    if decoded is None:
        raise ValueError("cv2.imdecode failed")
    return cv2.cvtColor(decoded, cv2.COLOR_BGR2RGB) if mode == "rgb" else decoded


def resize_224(img, lib):
    """Resize *img* to 224x224 with bilinear interpolation.

    Args:
        img: Image array.
        lib: ``"cv2"`` uses ``cv2.resize``; any other value uses Pillow
            (the input is cast to uint8 first if necessary).

    Returns:
        The resized image; the Pillow branch always returns uint8.
    """
    if lib != "cv2":
        from PIL import Image

        pixels = img if img.dtype == np.uint8 else img.astype(np.uint8)
        resized = Image.fromarray(pixels).resize((224, 224), Image.BILINEAR)
        return np.asarray(resized, dtype=np.uint8)
    return cv2.resize(img, (224, 224), interpolation=cv2.INTER_LINEAR)


def to_chw_float01(arr):
    """Convert an HWC image to a float32 CHW array with values in 0..1.

    uint8 input is always divided by 255. Input of any other dtype is divided
    by 255 only when its maximum exceeds 1.5, i.e. when it does not already
    look normalized.

    Args:
        arr: Array of shape (H, W, C).

    Returns:
        float32 array of shape (C, H, W).
    """
    x = arr.astype(np.float32)
    # Decide by dtype for 8-bit images: the value heuristic alone would leave
    # a nearly black uint8 image (max <= 1) unscaled. The size check keeps
    # empty arrays from raising in max().
    if arr.dtype == np.uint8 or (x.size and x.max() > 1.5):
        x = x / 255.0
    return np.transpose(x, (2, 0, 1))


def norm_imageNet(chw):
    """Normalize a CHW array per channel: ``(chw - IM_MEAN) / IM_STD``."""
    mean = IM_MEAN.reshape(-1, 1, 1)  # broadcast over H and W
    std = IM_STD.reshape(-1, 1, 1)
    return (chw - mean) / std


def get_model(weights_name):
    """Build a ResNet-18 without its last child module, in eval mode.

    Args:
        weights_name: Attribute name on ``ResNet18_Weights``
            (e.g. ``"DEFAULT"``).

    Returns:
        A ``torch.nn.Sequential`` holding all children of the network except
        the last one.
    """
    backbone = resnet18(weights=getattr(ResNet18_Weights, weights_name))
    layers = list(backbone.children())[:-1]
    return torch.nn.Sequential(*layers).eval()


def to_vec(net, chw):
    """Run *net* on one CHW array and return the unit-length feature vector.

    Args:
        net: Callable torch module.
        chw: NumPy array for a single image; a batch axis is prepended.

    Returns:
        1-D float32 array, L2-normalized (epsilon 1e-12 guards a zero norm).
    """
    batch = torch.from_numpy(chw[None, ...])
    with torch.no_grad():
        feats = net(batch)
    # Drop the two trailing axes if they have size 1, then flatten.
    flat = feats.squeeze(-1).squeeze(-1).cpu().numpy().astype(np.float32).ravel()
    flat /= np.linalg.norm(flat) + 1e-12
    return flat


# Exhaustive search over all preprocessing combinations: keep the one whose
# embedding has the highest cosine similarity to the stored DB vector.
# `best` is (cosine, config); on ties the earliest combination wins because
# the comparison below is strict.
best = (-1.0, None)
# Decode the image once per channel order instead of once per combination.
img_cache = {}
for ch in channel_modes:
    img_cache[ch] = read_img(IMG, ch)

for w in weight_opts:
    net = get_model(w)
    for ch in channel_modes:
        for rl in resize_libs:
            for nm in norm_modes:
                img = resize_224(img_cache[ch], rl)
                x = to_chw_float01(img)
                if nm == "imagenet":
                    x = norm_imageNet(x)
                vq = to_vec(net, x)
                # Both vectors are unit length, so the dot product is the cosine.
                cos = float(vq @ v_db)
                if cos > best[0]:
                    best = (
                        cos,
                        {"weights": w, "channel": ch, "resize": rl, "norm": nm},
                    )

print("[best]", best[0], best[1])

[best] 0.2993033528327942 {'weights': 'DEFAULT', 'channel': 'rgb', 'resize': 'pil', 'norm': 'imagenet'}


In [1]:
# run_once_migrate_color_hist_to_blob.py
import sqlite3, numpy as np
from pathlib import Path

DB = Path(r"C:\BIG_DATA\data\database.db")
TABLE_PREFIX = "image_features_part_"
TEXT_COL = "color_hist"  # existing TEXT column with the serialized histogram
BLOB_COL = "color_hist_blob"  # new BLOB column filled by this migration
DIM = 96  # 3 * 32 bins

# One-off migration: parse each TEXT histogram, normalize it to sum 1 and
# store it as raw float32 bytes in BLOB_COL. Rows with empty text or a
# histogram of the wrong length keep BLOB_COL = NULL.
#
# sqlite3's connection context manager only commits or rolls back; it does not
# close the connection, so the connection is closed explicitly here.
conn = sqlite3.connect(str(DB))
try:
    cur = conn.cursor()
    # Find the tables
    cur.execute(
        "SELECT name FROM sqlite_master WHERE type='table' AND name LIKE ? ORDER BY name ASC",
        (f"{TABLE_PREFIX}%",),
    )
    tables = [r[0] for r in cur.fetchall()]
    for t in tables:
        print("Tabelle:", t)

        # Add the column (if it does not exist yet)
        cur.execute(f'PRAGMA table_info("{t}")')
        cols = {r[1] for r in cur.fetchall()}
        if BLOB_COL not in cols:
            cur.execute(f'ALTER TABLE "{t}" ADD COLUMN {BLOB_COL} BLOB')
            conn.commit()

        # Fetch all TEXT histograms that still lack a blob and convert them once
        cur.execute(f'SELECT id, "{TEXT_COL}" FROM "{t}" WHERE "{BLOB_COL}" IS NULL')
        rows = cur.fetchall()
        print(f"  konvertiere {len(rows)} Einträge …")
        updates = []
        for rid, txt in rows:
            if not txt:
                # BLOB_COL is already NULL for every selected row, so there is
                # nothing to write.
                continue
            s = txt if isinstance(txt, str) else txt.decode("utf-8", "ignore")
            arr = np.fromstring(
                s.strip().lstrip("[").rstrip("]"), sep=",", dtype=np.float32
            )
            if arr.size != DIM:
                continue
            ssum = float(arr.sum())
            if ssum > 0:
                arr /= ssum
            updates.append((arr.tobytes(), rid))
        if updates:
            # One batched statement instead of one execute() per row.
            cur.executemany(f'UPDATE "{t}" SET "{BLOB_COL}"=? WHERE id=?', updates)
        conn.commit()
finally:
    conn.close()
print("Fertig.")

Tabelle: image_features_part_1
  konvertiere 100000 Einträge …
Tabelle: image_features_part_2
  konvertiere 51500 Einträge …
Tabelle: image_features_part_3
  konvertiere 50000 Einträge …
Tabelle: image_features_part_4
  konvertiere 50000 Einträge …
Tabelle: image_features_part_5
  konvertiere 50004 Einträge …
Tabelle: image_features_part_6
  konvertiere 50000 Einträge …
Tabelle: image_features_part_7
  konvertiere 8250 Einträge …
Fertig.


In [2]:
import sqlite3, re
from pathlib import Path

DB = Path(r"C:\BIG_DATA\data\database.db")
TABLE_PREFIX = "image_features_part_"
TEXT_COL = "color_hist"
BLOB_COL = "color_hist_blob"

# Read-only sanity check of the histogram migration: per table, report the
# total row count, how many blobs are NULL and how many TEXT values are
# empty/NULL, then list a few rows that have text but no blob.
#
# sqlite3's connection context manager does not close the connection, so the
# connection is closed explicitly here.
conn = sqlite3.connect(str(DB))
try:
    cur = conn.cursor()
    cur.execute(
        "SELECT name FROM sqlite_master WHERE type='table' AND name LIKE ? ORDER BY name ASC",
        (f"{TABLE_PREFIX}%",),
    )
    tables = [r[0] for r in cur.fetchall()]

    for t in tables:
        print(f"\nTabelle: {t}")
        cur.execute(f'SELECT COUNT(*) FROM "{t}"')
        total = cur.fetchone()[0]
        cur.execute(f'SELECT COUNT(*) FROM "{t}" WHERE "{BLOB_COL}" IS NULL')
        null_blob = cur.fetchone()[0]
        cur.execute(
            f'SELECT COUNT(*) FROM "{t}" WHERE "{TEXT_COL}" IS NULL OR TRIM("{TEXT_COL}")=""'
        )
        empty_txt = cur.fetchone()[0]
        print(
            f"  total={total:,} | blob NULL={null_blob:,} | text leer/NULL={empty_txt:,}"
        )

        # Examples: TEXT present but blob NULL -> parsing/dimension/sum problem
        cur.execute(
            f"""
            SELECT id, substr("{TEXT_COL}",1,120)
            FROM "{t}"
            WHERE "{BLOB_COL}" IS NULL AND "{TEXT_COL}" IS NOT NULL AND TRIM("{TEXT_COL}")<>""
            LIMIT 5
        """
        )
        rows = cur.fetchall()
        if rows:
            print("  Beispiele problematische TEXT-Hists:")
            for rid, snippet in rows:
                print(f"    id={rid}  text='{snippet}...'")
finally:
    conn.close()


Tabelle: image_features_part_1
  total=100,000 | blob NULL=0 | text leer/NULL=0

Tabelle: image_features_part_2
  total=51,500 | blob NULL=0 | text leer/NULL=0

Tabelle: image_features_part_3
  total=50,000 | blob NULL=0 | text leer/NULL=0

Tabelle: image_features_part_4
  total=50,000 | blob NULL=0 | text leer/NULL=0

Tabelle: image_features_part_5
  total=50,004 | blob NULL=0 | text leer/NULL=0

Tabelle: image_features_part_6
  total=50,000 | blob NULL=0 | text leer/NULL=0

Tabelle: image_features_part_7
  total=8,250 | blob NULL=0 | text leer/NULL=0
