# dermaCAP combiner

In [1]:
from pathlib import Path
import shutil, hashlib
import pandas as pd
from PIL import Image, ImageFile
from tqdm import tqdm
import os

from concurrent.futures import ProcessPoolExecutor, as_completed
from PIL import UnidentifiedImageError

In [None]:
# Root folder: the prepared per-dataset CSVs are read from here and the
# combined dataset is written below it.
BASE_DIR = Path("/workspace/clip_xai/dermaCAP")

# Input tables, one per source dataset.
AIMI_CSV = BASE_DIR.joinpath("aimi_prepared.csv")
HAM_CSV = BASE_DIR.joinpath("ham10000_prepared.csv")
PAD_UFES_CSV = BASE_DIR.joinpath("pad_ufes_prepared.csv")
SKINCAP_CSV = BASE_DIR.joinpath("skincap_prepared.csv")

# Output layout: <FINAL_DIR>/dermaCAP_img/*.png plus one CSV index.
FINAL_DIR = BASE_DIR.joinpath("dermaCAP_v1")
FINAL_IMG_DIR = FINAL_DIR.joinpath("dermaCAP_img")
FINAL_CSV = FINAL_DIR.joinpath("dermaCAP_v1.csv")
# Archive base name (no extension); ".zip" is appended when archiving.
FINAL_ZIP_BASENAME = BASE_DIR.joinpath("dermaCAP_v1")

# Extension given to every output image file name.
IMG_EXT = ".png"

# Values written to the "source" column for datasets that get tagged here.
SRC_HAM = "ham10000"
SRC_PAD = "pad_ufes"
SRC_SKIN = "skincap"

In [None]:
# Load the four prepared tables in a fixed order.
aimi, ham, pad, skin = (
    pd.read_csv(csv_path)
    for csv_path in (AIMI_CSV, HAM_CSV, PAD_UFES_CSV, SKINCAP_CSV)
)

# Tag the datasets with their origin. AIMI is not tagged here, so its CSV
# has to provide a "source" column already (it is selected below).
ham = ham.assign(source=SRC_HAM)
pad = pad.assign(source=SRC_PAD)
skin = skin.assign(source=SRC_SKIN)
# SkinCAP: the "caption_gpt" column is used as the caption.
skin = skin.assign(caption=skin["caption_gpt"])

# Reduce every table to the shared schema before stacking them.
aimi, ham, pad, skin = (
    frame[["img_path", "caption", "source"]]
    for frame in (aimi, ham, pad, skin)
)

# One combined table with a fresh 0..n-1 index.
df = pd.concat([aimi, ham, pad, skin], ignore_index=True)

In [None]:
# Start from a clean slate: an existing output tree is deleted in full
# (destructive), then the image folder is recreated together with its parents.
if FINAL_DIR.exists():
    shutil.rmtree(FINAL_DIR)
FINAL_IMG_DIR.mkdir(parents=True, exist_ok=True)

In [None]:
# Ask Pillow to load image files whose data is truncated instead of raising.
# NOTE(review): this is set in the parent process only; whether the pool
# workers see it depends on the multiprocessing start method — confirm.
ImageFile.LOAD_TRUNCATED_IMAGES = True

In [None]:
# Worker process count: one per reported CPU (8 when the count is unknown),
# capped at 32.
MAX_WORKERS = min(os.cpu_count() or 8, 32)

In [None]:
def _discard_partial(path):
    """Best-effort removal of an output file left behind by a failed write."""
    try:
        path.unlink()
    except OSError:
        # Nothing was written, or it cannot be removed; the caller is already
        # reporting the original failure, so this stays silent on purpose.
        pass


def _process_one(idx, src_path_str, src_tag, img_ext, out_dir):
    """Copy or convert one source image into ``out_dir`` under a derived name.

    The output name is ``derma_<src_tag>_<hash><img_ext>``, where ``<hash>``
    is the first 12 hex digits of the SHA-1 of the source path string. Files
    whose suffix is ``.png`` are copied as-is; everything else is decoded with
    Pillow and re-saved as PNG.

    Args:
        idx: Row identifier, passed back unchanged so the caller can match
            the result to its row.
        src_path_str: Path of the source image.
        src_tag: Dataset tag embedded in the output file name.
        img_ext: Extension (with leading dot) appended to the output name.
        out_dir: Directory that receives the output file.

    Returns:
        A tuple ``(idx, new_path, new_name, error)``. On success ``new_path``
        is the POSIX-style output path, ``new_name`` the output file name and
        ``error`` is ``None``. On failure ``new_path`` and ``new_name`` are
        ``None`` and ``error`` is a short message. The function never raises
        for a bad row, so one bad row cannot abort the whole pool run.
    """
    try:
        src_path = Path(src_path_str)
    except TypeError as e:
        # A missing CSV value arrives as NaN (a float), not a string; report
        # it like any other per-row failure instead of raising in the worker.
        return idx, None, None, f"ошибка: {e.__class__.__name__}"

    if not src_path.exists():
        return idx, None, None, "файл не найден"

    h = hashlib.sha1(str(src_path).encode("utf-8")).hexdigest()[:12]
    new_name = f"derma_{src_tag}_{h}{img_ext}"
    dst_path = Path(out_dir) / new_name

    try:
        if src_path.suffix.lower() == ".png":
            shutil.copy2(src_path, dst_path)
        else:
            with Image.open(src_path) as im:
                im.load()
                # Keep an alpha channel when the source can carry
                # transparency; otherwise normalise to plain RGB.
                if im.mode in ("RGBA", "LA", "P"):
                    converted = im.convert("RGBA")
                else:
                    converted = im.convert("RGB")
                converted.save(dst_path, format="PNG", compress_level=1, optimize=False)
        return idx, dst_path.as_posix(), new_name, None

    except UnidentifiedImageError:
        _discard_partial(dst_path)
        return idx, None, None, "битое/не читается (UnidentifiedImageError)"
    except Exception as e:
        # Skipped rows must not leave a half-written file in the output
        # folder, because the whole folder is archived afterwards.
        _discard_partial(dst_path)
        return idx, None, None, f"ошибка: {e.__class__.__name__}"

In [None]:
# One (row index, source path, source tag) triple per row of the combined table.
tasks = list(zip(df.index, df["img_path"], df["source"]))

In [None]:
# Filled while worker results arrive:
kept_idx = []    # row indices whose image was copied/converted successfully
new_paths = {}   # row index -> output image path
new_names = {}   # row index -> output image file name

In [None]:
# Fan the rows out to worker processes and collect results as they finish.
with ProcessPoolExecutor(max_workers=MAX_WORKERS) as pool:
    jobs = []
    for row_idx, path_str, tag in tasks:
        jobs.append(
            pool.submit(_process_one, row_idx, path_str, tag, IMG_EXT, FINAL_IMG_DIR)
        )

    finished = tqdm(as_completed(jobs), total=len(jobs), desc="Copy/convert")
    for job in finished:
        idx, new_path, new_name, err = job.result()
        if not err:
            # Remember the row and where its image now lives.
            kept_idx.append(idx)
            new_paths[idx] = new_path
            new_names[idx] = new_name
        else:
            # Failed rows are reported and dropped from the final table.
            print(f"Пропущен {Path(df.at[idx, 'img_path']).name} (Причина: {err})")

In [None]:
# Results arrive in completion order; sort to get back to table order.
kept_idx.sort()

# Keep only the rows whose image made it into the output folder ...
df = df.loc[kept_idx].copy()
# ... and point them at the new files.
df["img_path"] = list(map(new_paths.__getitem__, kept_idx))
df["img_name"] = list(map(new_names.__getitem__, kept_idx))

In [None]:
# Write the index of the combined dataset next to the image folder.
FINAL_DIR.mkdir(exist_ok=True, parents=True)
df.to_csv(
    FINAL_CSV,
    columns=["img_path", "caption", "source", "img_name"],
    index=False,
    encoding="utf-8",
)
row_count = len(df)
print(f"Saved CSV: {FINAL_CSV} | rows: {row_count}")

In [None]:
# Pack the whole output folder (CSV + images) into <FINAL_ZIP_BASENAME>.zip.
archive_base = str(FINAL_ZIP_BASENAME)
shutil.make_archive(archive_base, format="zip", root_dir=FINAL_DIR)
print(f"ZIP created: {archive_base}.zip")