In [None]:
#mount Drive
# Mounts Google Drive at /content/drive; the later cells read and write the project
# folder under /content/drive/MyDrive/... through this mount point.
# force_remount=True is passed so the call is made even on a re-run of this cell.
from google.colab import drive
drive.mount('/content/drive',force_remount=True)

Mounted at /content/drive


In [None]:
#unzip UMID.zip and Urinary_Sediment.zip into data/raw/...
# %cd switches the notebook's working directory; the unzip commands below are then
# resolved relative to it (-q: quiet, -d <dir>: extraction directory).
# NOTE(review): this cell extracts under .../ml-services/member1/data/raw, while the
# conversion cells further down read from .../ml-services/kasundi/data/raw.
# Confirm which project folder is the intended one.
%cd /content/drive/MyDrive/urine_uti/ml-services/member1/data/raw
!unzip -q UMID.zip -d UMID
!unzip -q Urinary_Sediment.zip -d Urinary_Sediment

In [None]:
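#1. Make a tiny “label normalizer”: it lower-cases the label, treats hyphens/underscores as spaces, collapses repeated whitespace, and maps the singular/plural WBC synonyms to one canonical class.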

In [None]:
import re

# Canonical class table for this specialist: a single-class detector, so WBC is id 0.
CANONICAL = dict(WBC=0)

# Every spelling that should be folded into the WBC class.
WBC_ALIASES = set(
    [
        "wbc", "wbcs", "white blood cell", "white blood cells",
        "leukocyte", "leukocytes", "pus cell", "pus cells", "pus_cell", "pus_cells",
    ]
)

# Translation table turning the separator characters "-" and "_" into a space.
_SEPARATORS_TO_SPACE = str.maketrans("-_", "  ")


def normalize_label(name: str):
    """Map a raw annotation label onto its canonical class name.

    The label is stripped, lower-cased, has "-" and "_" turned into spaces and
    runs of whitespace squeezed to a single space before it is looked up in
    WBC_ALIASES.

    Args:
        name: raw label text, or None.

    Returns:
        "WBC" when the cleaned label is a known alias; None for a None input
        and for every other label (those classes are ignored by this specialist).
    """
    if name is None:
        return None

    cleaned = name.strip().lower().translate(_SEPARATORS_TO_SPACE)
    cleaned = re.sub(r"\s+", " ", cleaned)  # squeeze whitespace runs

    # Anything that is not a WBC alias (RBC, epithelial, crystals, ...) is dropped.
    return "WBC" if cleaned in WBC_ALIASES else None


In [None]:
#2. UMID → YOLO (WBC only) with normalization: keep only the rows whose label normalizes to WBC and write one YOLO .txt label file per image.

In [None]:
# === UMID (CSV boxes) → YOLO (WBC-only) ===
from pathlib import Path
import pandas as pd
from PIL import Image
import re, shutil

# Project folder on Drive; the UMID input and output folders hang off it.
ROOT = Path("/content/drive/MyDrive/urine_uti/ml-services/kasundi")
# Raw UMID data (CSV annotations + images) that this cell reads.
RAW = ROOT.joinpath("data/raw/UMID")
# Interim YOLO-format output for the WBC-only subset of UMID.
OUT = ROOT.joinpath("data/interim/wbc_umid_yolo")

# ----- 1) Canonical class + normalizer -----
CANONICAL = {"WBC": 0}  # single-class detector: WBC is class id 0

# Accepted spellings, written in *normalized* form (lower case, single spaces, no
# "-" or "_"), because normalize_label() rewrites the raw label into that form
# before the lookup. Underscore variants are therefore not listed: they could
# never match. Both the "leuk-" and "leuc-" spellings are covered.
WBC_ALIASES = {
    "wbc", "wbcs", "white blood cell", "white blood cells",
    "leukocyte", "leukocytes", "leucocyte", "leucocytes", "leuko", "leuco",
    "pus", "pus cell", "pus cells",
}

def normalize_label(name: str):
    """Return "WBC" if ``name`` is a known white-blood-cell label, else None.

    Normalization: lower-case, "-" and "_" become spaces, whitespace runs are
    collapsed to one space, and leading/trailing whitespace is removed *after*
    the separator replacement (so "_WBC_" is accepted as well).

    Args:
        name: raw label text. Non-string values (None, NaN, numbers) are
            treated as "no label".

    Returns:
        "WBC" for a recognised alias, otherwise None.
    """
    if not isinstance(name, str):
        return None
    s = name.lower().replace("-", " ").replace("_", " ")
    s = re.sub(r"\s+", " ", s).strip()
    return "WBC" if s in WBC_ALIASES else None

# ----- 2) Make output dirs -----
# One images/ and one labels/ folder per split; exist_ok makes the cell re-runnable.
for split in ("train", "val", "test"):
    for sub in (f"images/{split}", f"labels/{split}"):
        (OUT / sub).mkdir(parents=True, exist_ok=True)

# ----- 3) Helper to resolve image path -----
def resolve_img(src_str: str):
    """Locate the image file a CSV row refers to.

    Lookup order (first existing path wins):
      1. ``RAW / src_str`` exactly as given,
      2. the same file name under ``RAW / "images"``,
      3. the same stem with each known extension, checked under
         ``RAW / "images"`` and then directly under ``RAW``.

    Args:
        src_str: file reference taken from the CSV (may contain sub-folders).

    Returns:
        The resolved Path, or None when nothing matches.
    """
    given = Path(src_str)
    images_dir = RAW / "images"

    # exact reference first, then the bare file name under images/
    for candidate in (RAW / src_str, images_dir / given.name):
        if candidate.exists():
            return candidate

    # finally: same stem, any of the known image extensions
    base = given.stem
    for ext in (".jpg",".png",".jpeg",".bmp",".tif",".tiff"):
        for folder in (images_dir, RAW):
            candidate = folder / (base + ext)
            if candidate.exists():
                return candidate
    return None

# ----- 4) Convert one split -----
def convert_umid_split(csv_basename: str, split: str):
    """Convert one UMID CSV of bounding boxes into YOLO label files (WBC only).

    Reads ``RAW/<csv_basename>.csv``, groups its rows by image, keeps the rows
    whose label normalizes to "WBC", and for every image with at least one kept
    box copies the image to ``OUT/images/<split>/`` and writes
    ``OUT/labels/<split>/<stem>.txt`` with one ``class cx cy w h`` line per box
    (centre/size divided by the image width/height).

    Args:
        csv_basename: CSV file name without the ``.csv`` suffix, relative to RAW.
        split: name of the output split folder.

    Returns:
        ``(images_written, boxes_written)``; ``(0, 0)`` when the CSV is missing.

    Raises:
        ValueError: if the coordinate or label columns cannot be identified.
    """
    import math  # function-scope import: only needed for the finite-coordinate check

    csv_path = RAW / f"{csv_basename}.csv"
    if not csv_path.exists():
        print(f"[WARN] Missing {csv_path}")
        return 0,0

    df = pd.read_csv(csv_path)
    # case-insensitive header lookup: normalized header -> real column name
    lower = {str(c).strip().lower(): c for c in df.columns}

    # try to find columns in a tolerant way
    fn_col  = lower.get("filename") or lower.get("image") or lower.get("path") or list(df.columns)[0]
    x1_col  = lower.get("xmin") or lower.get("x1")
    y1_col  = lower.get("ymin") or lower.get("y1")
    x2_col  = lower.get("xmax") or lower.get("x2")
    y2_col  = lower.get("ymax") or lower.get("y2")
    lbl_col = lower.get("class") or lower.get("label") or lower.get("name") or lower.get("category")

    if not all([fn_col, x1_col, y1_col, x2_col, y2_col, lbl_col]):
        raise ValueError(f"Could not resolve columns in {csv_path}. Columns = {list(df.columns)}")

    wrote_imgs = wrote_boxes = 0
    skipped_imgs = skipped_boxes = 0  # reported at the end instead of being dropped silently

    # group rows by image file
    for img_key, rows in df.groupby(fn_col):
        src = resolve_img(str(img_key))
        if not src:
            skipped_imgs += 1
            continue

        try:
            # context manager closes the file handle once the size is known
            with Image.open(src) as im:
                W, H = im.size
        except Exception:
            skipped_imgs += 1
            continue

        lines = []
        box_columns = zip(rows[lbl_col], rows[x1_col], rows[y1_col], rows[x2_col], rows[y2_col])
        for raw_label, rx1, ry1, rx2, ry2 in box_columns:
            if normalize_label(str(raw_label)) != "WBC":
                continue

            try:
                x1, y1, x2, y2 = float(rx1), float(ry1), float(rx2), float(ry2)
            except (TypeError, ValueError, OverflowError):
                skipped_boxes += 1
                continue

            # NaN/inf must be rejected explicitly: max(0, nan) is 0 and min(W, nan)
            # is W, so a row with missing coordinates would otherwise be clamped
            # into a box covering the whole image.
            if not all(math.isfinite(v) for v in (x1, y1, x2, y2)):
                skipped_boxes += 1
                continue

            # clamp and skip degenerate boxes
            x1, y1 = max(0, x1), max(0, y1)
            x2, y2 = min(W, x2), min(H, y2)
            if x2 <= x1 or y2 <= y1:
                skipped_boxes += 1
                continue

            cx, cy = (x1 + x2)/2.0, (y1 + y2)/2.0
            w,  h  = (x2 - x1), (y2 - y1)
            lines.append(f"{CANONICAL['WBC']} {cx/W:.6f} {cy/H:.6f} {w/W:.6f} {h/H:.6f}")

        # images without any WBC box are not exported at all
        if lines:
            dst_img = OUT / f"images/{split}" / src.name
            dst_txt = OUT / f"labels/{split}" / (src.stem + ".txt")
            # an image already present from an earlier run is not copied again
            if not dst_img.exists():
                shutil.copyfile(src, dst_img)
            with open(dst_txt, "w") as f:
                f.write("\n".join(lines))
            wrote_imgs += 1
            wrote_boxes += len(lines)

    print(f"[{split}] wrote {wrote_imgs} images, {wrote_boxes} boxes")
    if skipped_imgs or skipped_boxes:
        print(f"[{split}] skipped {skipped_imgs} unresolved/unreadable images, "
              f"{skipped_boxes} invalid WBC boxes")
    return wrote_imgs, wrote_boxes

# ----- 5) Run all splits + quick summary -----
# Each CSV is named after the split it feeds, so the same name is passed twice.
for _split_name in ("train", "val", "test"):
    convert_umid_split(_split_name, _split_name)

# summary: count what is on disk per split (images by "*.*", labels by "*.txt")
import glob
for split in ["train","val","test"]:
    image_pattern = str(OUT/f"images/{split}/*.*")
    label_pattern = str(OUT/f"labels/{split}/*.txt")
    ni = len(glob.glob(image_pattern))
    nl = len(glob.glob(label_pattern))
    print(f"SUMMARY {split}: {ni} images, {nl} label files → {OUT}/images/{split}")

[train] wrote 164 images, 703 boxes
[val] wrote 24 images, 115 boxes
[test] wrote 33 images, 168 boxes
SUMMARY train: 164 images, 164 label files → /content/drive/MyDrive/urine_uti/ml-services/kasundi/data/interim/wbc_umid_yolo/images/train
SUMMARY val: 24 images, 24 label files → /content/drive/MyDrive/urine_uti/ml-services/kasundi/data/interim/wbc_umid_yolo/images/val
SUMMARY test: 33 images, 33 label files → /content/drive/MyDrive/urine_uti/ml-services/kasundi/data/interim/wbc_umid_yolo/images/test


In [None]:
#Urinary-Sediment (Pascal VOC XML) → YOLO (WBC only) with normalization
#We parse each XML file and keep only the objects whose <name> maps to "WBC" via our normalizer.

In [None]:
# === Urinary-Sediment (VOC XML) → YOLO (WBC-only) ===
from pathlib import Path
import xml.etree.ElementTree as ET
from PIL import Image
import shutil, re, glob

# Set your project root
ROOT = Path("/content/drive/MyDrive/urine_uti/ml-services/kasundi")
# Raw dataset in Pascal-VOC layout: expects JPEGImages/, Annotations/, ImageSets/Main/
US_ROOT = ROOT.joinpath("data/raw/Urinary_Sediment")
# Interim YOLO-format output for the WBC-only subset
US_OUT = ROOT.joinpath("data/interim/wbc_us_yolo")

# -------- WBC normalizer (includes 'leuko') ----------
# Keywords are anchored at the start of a word so they cannot fire in the middle of
# an unrelated word (plain substring search matches "pus" inside "corpuscle").
# Prefix keywords still cover longer forms ("wbcs", "leukocytes", "leucocytes",
# "white blood cells"); "pus" must be a whole word.
_WBC_LABEL_RE = re.compile(r"\b(?:wbc|white blood cell|leuko|leucocyte|pus\b)")

def is_wbc_label(name: str) -> bool:
    """Tell whether an annotation label denotes a white blood cell.

    The label is lower-cased, whitespace/"_"/"-" runs become a single space and
    every remaining character outside ``a-z`` and space is removed; the result
    is then searched for a WBC keyword that starts at a word boundary.

    Args:
        name: raw label text. Empty or non-string values are "not WBC".

    Returns:
        True for WBC-like labels (including the short form 'leuko'), else False.
    """
    if not isinstance(name, str) or not name:
        return False
    # separators first, so tabs/newlines act as word breaks instead of being deleted
    s = re.sub(r"[\s_\-]+", " ", name.lower())
    s = re.sub(r"[^a-z ]", "", s)
    s = re.sub(r" +", " ", s).strip()
    return _WBC_LABEL_RE.search(s) is not None

CLASS_ID_WBC = 0  # class index written at the start of every YOLO label line

# make output dirs: one images/ and one labels/ folder per split (re-runnable)
for split in ("train", "val", "test"):
    for sub in (f"images/{split}", f"labels/{split}"):
        (US_OUT / sub).mkdir(parents=True, exist_ok=True)

def resolve_img_xml(img_id: str):
    """Find the image file and the VOC annotation file for one image id.

    Annotation: ``Annotations/<stem>.xml``; if that is absent, the first file
    (in sorted order) matching ``Annotations/<stem>*.xml``.
    Image: ``JPEGImages/<stem>.jpg``, then ``.png``; if neither exists, the first
    regular file (in sorted order) matching ``JPEGImages/<stem>.*``.

    Fallback matches are sorted so the same file is chosen on every run, and the
    stem is glob-escaped so characters such as "[" in an id are taken literally.

    Args:
        img_id: id from the image-set list, with or without a file extension.

    Returns:
        ``(img_path, xml_path)``; either element is None when nothing was found.
    """
    stem = Path(img_id).stem  # handle ids with or without .jpg
    safe_stem = glob.escape(stem)
    ann_dir = US_ROOT / "Annotations"
    img_dir = US_ROOT / "JPEGImages"

    xml_path = ann_dir / f"{stem}.xml"
    if not xml_path.exists():
        # NOTE(review): prefix match, so id "1" can pick up "10.xml" when "1.xml" is
        # missing. Confirm this tolerance is wanted.
        hits = sorted(ann_dir.glob(f"{safe_stem}*.xml"))
        xml_path = hits[0] if hits else None

    # prefer .jpg then .png; else glob anything with same stem
    img_path = None
    for ext in (".jpg", ".png"):
        candidate = img_dir / f"{stem}{ext}"
        if candidate.exists():
            img_path = candidate
            break
    if img_path is None:
        hits = sorted(p for p in img_dir.glob(f"{safe_stem}.*") if p.is_file())
        img_path = hits[0] if hits else None

    return img_path, xml_path

def convert_us_split(split: str):
    """Convert one Urinary-Sediment split from VOC XML to YOLO labels (WBC only).

    Reads the image ids from ``ImageSets/Main/<split>.txt``, resolves each id to
    an image/XML pair, keeps the ``<object>`` entries whose ``<name>`` passes
    ``is_wbc_label`` and, for every image with at least one kept box, copies the
    image to ``US_OUT/images/<split>/`` and writes
    ``US_OUT/labels/<split>/<stem>.txt`` with one ``class cx cy w h`` line per box
    (centre/size divided by the image width/height).

    Args:
        split: split name; selects both the id list and the output folders.

    Returns:
        ``(images_written, boxes_written)``; ``(0, 0)`` when the id list is missing.
    """
    import math  # function-scope import: only needed for the finite-coordinate check

    ids_file = US_ROOT / "ImageSets/Main" / f"{split}.txt"
    if not ids_file.exists():
        print(f"[WARN] {ids_file} not found. Skipping {split}.")
        return 0, 0

    # context manager so the id list is closed right after reading
    with open(ids_file) as fh:
        ids = [line.strip() for line in fh if line.strip()]
    wrote_imgs = wrote_boxes = 0
    skipped_imgs = skipped_boxes = 0  # reported at the end instead of being dropped silently

    for img_id in ids:
        img_path, xml_path = resolve_img_xml(img_id)
        if not (img_path and xml_path and img_path.exists() and xml_path.exists()):
            skipped_imgs += 1
            continue

        try:
            with Image.open(img_path) as im:  # closes the file handle after reading the size
                W, H = im.size
            root = ET.parse(xml_path).getroot()
        except Exception:
            skipped_imgs += 1
            continue

        lines = []
        for obj in root.findall("object"):
            name = (obj.findtext("name") or "").strip()
            if not is_wbc_label(name):   # <-- catches 'leuko'
                continue
            b = obj.find("bndbox")
            if b is None:
                continue

            # a missing tag gives float(None) -> TypeError, garbage text -> ValueError;
            # skip that one box instead of aborting the whole split
            try:
                x1, y1, x2, y2 = (float(b.findtext(tag)) for tag in ("xmin", "ymin", "xmax", "ymax"))
            except (TypeError, ValueError):
                skipped_boxes += 1
                continue
            # max(0, nan) is 0 and min(W, nan) is W, so NaN would be clamped into a
            # full-image box unless it is rejected here
            if not all(math.isfinite(v) for v in (x1, y1, x2, y2)):
                skipped_boxes += 1
                continue

            # clamp
            x1, y1 = max(0, x1), max(0, y1)
            x2, y2 = min(W, x2), min(H, y2)
            if x2 <= x1 or y2 <= y1:
                skipped_boxes += 1
                continue
            cx, cy = (x1+x2)/2.0, (y1+y2)/2.0
            w,  h  = (x2-x1), (y2-y1)
            lines.append(f"{CLASS_ID_WBC} {cx/W:.6f} {cy/H:.6f} {w/W:.6f} {h/H:.6f}")

        # images without any WBC box are not exported at all
        if lines:
            dst_img = US_OUT / f"images/{split}" / img_path.name
            dst_txt = US_OUT / f"labels/{split}" / (img_path.stem + ".txt")
            # an image already present from an earlier run is not copied again
            if not dst_img.exists():
                shutil.copyfile(img_path, dst_img)
            with open(dst_txt, "w") as f:
                f.write("\n".join(lines))
            wrote_imgs += 1
            wrote_boxes += len(lines)

    print(f"[{split}] wrote {wrote_imgs} images, {wrote_boxes} boxes")
    if skipped_imgs or skipped_boxes:
        print(f"[{split}] skipped {skipped_imgs} unresolved/unreadable images, "
              f"{skipped_boxes} invalid WBC boxes")
    return wrote_imgs, wrote_boxes

# run conversion, one call per split
for _split_name in ("train", "val", "test"):
    convert_us_split(_split_name)

# quick summary: count what is on disk per split (images by "*.*", labels by "*.txt")
import glob
for split in ["train","val","test"]:
    image_pattern = str(US_OUT / f"images/{split}/*.*")
    label_pattern = str(US_OUT / f"labels/{split}/*.txt")
    ni = len(glob.glob(image_pattern))
    nl = len(glob.glob(label_pattern))
    print(f"SUMMARY {split}: {ni} images, {nl} label files")

[train] wrote 1052 images, 5112 boxes
[val] wrote 232 images, 796 boxes
[test] wrote 54 images, 261 boxes
SUMMARY train: 1052 images, 1052 label files
SUMMARY val: 232 images, 232 label files
SUMMARY test: 54 images, 54 label files


In [None]:
#Merge the two interim sets into one final dataset. Here we simply concatenate them split by split (train/val/test).

In [None]:
import glob

FINAL = ROOT / "data/processed/wbc_detect"
_MERGE_SUBDIRS = ["images/train","images/val","images/test","labels/train","labels/val","labels/test"]
for d in _MERGE_SUBDIRS:
    (FINAL / d).mkdir(parents=True, exist_ok=True)

def copy_all(src_dir, dst_dir, prefix=""):
    """Copy every file matching a glob pattern into a directory.

    Args:
        src_dir: glob pattern (str or Path) selecting the source files,
            e.g. ``OUT / "images/train/*"``.
        dst_dir: destination directory (must already exist).
        prefix: text prepended to each copied file name; the default ""
            keeps the original names.

    Returns:
        Number of files copied.
    """
    copied = 0
    for p in sorted(glob.glob(str(src_dir))):
        src = Path(p)
        if not src.is_file():
            continue
        shutil.copy(src, Path(dst_dir) / f"{prefix}{src.name}")
        copied += 1
    return copied

# Both datasets land in the same folders, so each file gets a per-dataset prefix.
# Without it, a file name used by both sources would be overwritten by the later
# copy, and an image from one dataset could end up next to the label file of the
# other (e.g. "1.jpg" from one source and "1.png" + "1.txt" from the other).
# The same prefix is applied to images and labels, so the stems stay paired.
# NOTE: if FINAL was already filled by an earlier run that used un-prefixed names,
# clear it before re-running, otherwise the old copies remain alongside the new ones.
for prefix, src_root in (("umid_", OUT), ("us_", US_OUT)):
    n_copied = 0
    for d in _MERGE_SUBDIRS:
        n_copied += copy_all(src_root / d / "*", FINAL / d, prefix=prefix)
    print(f"[merge] {src_root.name}: copied {n_copied} files with prefix '{prefix}'")