# Before starting preprocessing, please complete the following steps:

# Category 2 / XA2017010006426
#    - This image requires manual re-grabbing.

# Category 3 / XA2017080014862
#    - This patient has a fixture; it is recommended to discard the images.

# Category 3 / XA2018070021907
#    - This image requires manual re-grabbing.

# Category 1 / XA2017030007058
#    - This patient has a fixture; it is recommended to discard the images.

# Clean incomplete patients:
# patient folders with fewer than 4 images are removed; folders with more than 4 are only flagged for manual review.

In [40]:
import os
import shutil

def clean_incomplete_patients(root_dir, expected_num=4):
    """Delete patient folders that hold fewer than ``expected_num`` images.

    The layout walked is ``root_dir/<category>/<patient>/<image files>``.
    Patient folders with too few images are removed from disk with
    ``shutil.rmtree``. Folders with more images than expected are kept but
    reported so they can be checked by hand. A summary is printed at the end.

    Args:
        root_dir: Directory containing one sub-directory per category.
        expected_num: Number of images a complete patient folder must contain.

    Returns:
        None. All results are reported on stdout.
    """
    image_exts = ('.png', '.jpg', '.jpeg', '.tif', '.tiff', '.bmp')
    removed = []
    oversized = 0
    kept = 0
    total = 0

    for category in sorted(os.listdir(root_dir)):
        cat_path = os.path.join(root_dir, category)
        if not os.path.isdir(cat_path):
            continue

        for patient in sorted(os.listdir(cat_path)):
            p_path = os.path.join(cat_path, patient)
            if not os.path.isdir(p_path):
                continue

            n_imgs = sum(
                1 for f in os.listdir(p_path) if f.lower().endswith(image_exts)
            )
            total += 1

            if n_imgs < expected_num:
                removed.append((category, patient, n_imgs))
                shutil.rmtree(p_path)  # remove incomplete patient folder
                # A removed folder must not also be counted as kept.
                continue

            kept += 1
            if n_imgs > expected_num:
                oversized += 1
                print(f"⚠️  Warning: Patient {patient} in category {category} has {n_imgs} images (expected {expected_num}).")
                print("    Please check manually.")

    print("🩻 Patient integrity cleaning complete.")
    print(f"✅ Kept folders   : {kept}")
    print(f"🗑️  Removed folders: {len(removed)} / {total} total")

    if removed:
        print("\nExamples of removed folders:")
        for i, (cat, pid, n) in enumerate(removed[:10]):
            print(f" {i+1}. [{cat}] {pid} → {n} images (removed)")
    elif oversized:
        # Nothing was deleted, but not every folder has exactly expected_num images.
        print(f"⚠️  No folders removed, but {oversized} folder(s) have more than {expected_num} images.")
    else:
        print(f"🎉 All folders are complete ({expected_num} images each).")

# Usage example.
# NOTE: this call deletes incomplete patient folders under "check" in place (shutil.rmtree).
clean_incomplete_patients("check")

🩻 Patient integrity cleaning complete.
✅ Kept folders   : 3063
🗑️  Removed folders: 0 / 3063 total
🎉 All folders are complete (4 images each).


# rename as L-CC,R-CC,L-MLO,R-MLO

In [41]:
import os
import cv2
import numpy as np
from tqdm import tqdm
import shutil

def judge_left_right(filepath, threshold=30):
    """Guess the side label (L/R) of an image from where its bright pixels lie.

    The grayscale image is split at the middle column and the pixels brighter
    than ``threshold`` are counted in each half; the half with more bright
    pixels decides the label. If both counts are equal, the summed intensity
    of those bright pixels breaks the tie, and the left half wins a further tie.

    Args:
        filepath: Path of the image to inspect.
        threshold: Intensity a pixel must exceed to be counted (default 30).

    Returns:
        "L" or "R", or None when the image cannot be read or neither half
        contains a pixel above the threshold.
    """
    gray = cv2.imread(filepath, cv2.IMREAD_GRAYSCALE)
    if gray is None:
        return None

    _, width = gray.shape
    mid = width // 2
    left, right = gray[:, :mid], gray[:, mid:]

    left_mask = left > threshold
    right_mask = right > threshold
    n_left = np.sum(left_mask)
    n_right = np.sum(right_mask)

    if n_left == 0 and n_right == 0:
        return None
    if n_left != n_right:
        return "L" if n_left > n_right else "R"

    # Equal counts: fall back to the total brightness of the counted pixels.
    left_energy = np.sum(left[left_mask])
    right_energy = np.sum(right[right_mask])
    return "L" if left_energy >= right_energy else "R"


def extract_order_from_filename(filename):
    """Return the last purely numeric ``-``-separated token of a file name.

    The extension is stripped, the remaining name is split on ``-`` and the
    tokens are scanned from the end; the first token consisting only of
    decimal digits is returned as an int. The value is used as a sort key.

    Example: ``"image-3.png"`` → ``3``.

    Args:
        filename: File name (with or without extension).

    Returns:
        The integer value of the last all-digit token, or 0 if there is none.
    """
    stem, _ = os.path.splitext(filename)
    for token in reversed(stem.split('-')):
        # isdecimal() instead of isdigit(): isdigit() also accepts characters
        # such as "²", which int() rejects with ValueError.
        if token.isdecimal():
            return int(token)
    return 0


def _copy_side_views(img_list, side, src_dir, dst_dir):
    """Copy one side's images into ``dst_dir`` as ``<side>-CC`` / ``<side>-MLO``."""
    if not img_list:
        return

    # Sort by the trailing number, then by name, so that files with the same
    # (or no) number are ordered deterministically instead of by listdir order.
    ordered = sorted(
        img_list, key=lambda name: (extract_order_from_filename(name), name)
    )

    if len(ordered) > 2:
        # Only two view names exist per side; copying more would silently
        # overwrite the MLO file, so the extras are reported and skipped.
        print(f"⚠️ {src_dir}: {len(ordered)} images classified as side {side}; "
              f"only the first two are copied, skipped: {', '.join(ordered[2:])}")
        ordered = ordered[:2]

    for label, file in zip(("CC", "MLO"), ordered):
        _, ext = os.path.splitext(file)
        shutil.copy2(
            os.path.join(src_dir, file),
            os.path.join(dst_dir, f"{side}-{label}{ext}"),
        )


def rename_mammogram_images_by_order(root_dir, output_root_dir):
    """Copy every patient's images to a new tree, renamed by side and view.

    The layout walked is ``root_dir/<category>/<patient>/<images>``; the same
    layout is recreated under ``output_root_dir``. Each image is assigned a
    side with ``judge_left_right``. Within a side the images are ordered by
    the trailing number in their file name: the first becomes ``<side>-CC``
    and the second ``<side>-MLO``. The original extension is kept and the
    source files are left untouched.

    NOTE(review): "first = CC, second = MLO" is an assumption about the
    acquisition order encoded in the file names — confirm against the data.

    Args:
        root_dir: Input directory with one sub-directory per category.
        output_root_dir: Destination root; created if it does not exist.

    Returns:
        None. Progress and warnings are printed to stdout.
    """
    os.makedirs(output_root_dir, exist_ok=True)

    for category in sorted(os.listdir(root_dir)):
        category_path = os.path.join(root_dir, category)
        if not os.path.isdir(category_path):
            continue

        # Create the output category folder.
        output_category_path = os.path.join(output_root_dir, category)
        os.makedirs(output_category_path, exist_ok=True)

        for patient_id in sorted(os.listdir(category_path)):
            patient_path = os.path.join(category_path, patient_id)
            if not os.path.isdir(patient_path):
                continue

            output_patient_path = os.path.join(output_category_path, patient_id)
            os.makedirs(output_patient_path, exist_ok=True)

            # Collect all images in this patient's folder.
            image_files = sorted(
                f for f in os.listdir(patient_path)
                if f.lower().endswith((".png", ".jpg", ".jpeg"))
            )
            if not image_files:
                continue

            # Split the images by detected side.
            left_images, right_images = [], []
            for file in image_files:
                side = judge_left_right(os.path.join(patient_path, file))
                if side == "L":
                    left_images.append(file)
                elif side == "R":
                    right_images.append(file)
                else:
                    print(f"⚠️ 無法判斷左右：{file}")

            # Copy under the new names.
            _copy_side_views(left_images, "L", patient_path, output_patient_path)
            _copy_side_views(right_images, "R", patient_path, output_patient_path)

        print(f"{category} 影像已輸出至新資料夾。")

    print("\n🎯 所有影像已依規則重新命名並輸出完成！")


# Usage example: copy the images under "check" into "datasets" with the
# L-CC / L-MLO / R-CC / R-MLO names.
if __name__ == "__main__":
    root_dir = "check"  # input tree: <category>/<patient>/<images>
    output_root_dir = "datasets"  # destination tree (created if missing)
    rename_mammogram_images_by_order(root_dir, output_root_dir)


Category 0 影像已輸出至新資料夾。
Category 1 影像已輸出至新資料夾。
Category 2 影像已輸出至新資料夾。
Category 3 影像已輸出至新資料夾。
Category 4 影像已輸出至新資料夾。
Category 5 影像已輸出至新資料夾。
Category 6 影像已輸出至新資料夾。

🎯 所有影像已依規則重新命名並輸出完成！


In [42]:
# print img size
# pip install opencv-python
import cv2

# Sanity check: load one renamed image and print its array shape.
img_path = "datasets/Category 1/XA2015110006335/R-MLO.jpg"
# NOTE(review): other functions in this file check the cv2.imread result for
# None; here a missing or unreadable file would only surface at img.shape.
img = cv2.imread(img_path)
print(img.shape)  # (height, width, channels)

(2294, 1914, 3)


# Split the dataset and create the annotation CSVs for training

In [58]:
import os
import random
import pandas as pd
from math import ceil

# File stems (name without extension) of the four views every patient must
# provide; patients missing any of them are dropped before the split.
VALID_VIEWS = ['L-CC', 'R-CC', 'L-MLO', 'R-MLO']

def load_patient_descriptions(base_datasets_path):
    """Build a patient-ID → description lookup from the per-category CSV files.

    For category numbers 0–6 the file ``Catagory <n>.csv`` inside
    ``base_datasets_path`` is read. The first column is taken as the patient
    ID and the last column as the description. Missing files and files that
    fail to load are reported on stdout and skipped.

    NOTE: the "Catagory" spelling is kept on purpose — the captured output in
    this file shows CSVs with exactly that name being loaded.

    Args:
        base_datasets_path: Directory that contains the category CSV files.

    Returns:
        dict mapping patient ID (str) to description (str). The description is
        "No description" when the CSV has a single column or the cell is empty.
    """
    descriptions = {}
    for category_num in range(7):
        csv_path = os.path.join(base_datasets_path, f"Catagory {category_num}.csv")
        if not os.path.exists(csv_path):
            print(f"⚠️ CSV not found: {csv_path}")
            continue
        try:
            df = pd.read_csv(csv_path)
            # Read column-wise: iterrows() coerces each row to a single dtype,
            # which can alter how numeric IDs are rendered as strings.
            patient_ids = df.iloc[:, 0].tolist()
            if df.shape[1] > 1:
                raw_descriptions = df.iloc[:, -1].tolist()
            else:
                raw_descriptions = [None] * len(patient_ids)

            for patient_id, description in zip(patient_ids, raw_descriptions):
                # An empty cell is NaN; str(NaN) would store the text "nan".
                if pd.isna(description):
                    descriptions[str(patient_id)] = "No description"
                else:
                    descriptions[str(patient_id)] = str(description)
            print(f"✅ Loaded {len(df)} descriptions from {csv_path}")
        except Exception as e:
            # Best effort: one unreadable CSV must not abort the other categories.
            print(f"⚠️ Failed to load {csv_path}: {e}")
    return descriptions


def _split_sizes(n_total, train_ratio, val_ratio):
    """Return ``(n_train, n_val, n_test)`` for one class with ``n_total`` rows."""
    # Rounded sizes, each at least 1; the test split absorbs the remainder.
    n_train = max(1, round(n_total * train_ratio))
    n_val = max(1, round(n_total * val_ratio))
    n_test = max(1, n_total - n_train - n_val)

    # n_test is never below the remainder, so the sum can only overshoot.
    overshoot = n_train + n_val + n_test - n_total
    if overshoot > 0:
        # Take the excess from the train split first, then val, then test.
        if n_train >= n_val and n_train > overshoot:
            n_train -= overshoot
        elif n_val > overshoot:
            n_val -= overshoot
        else:
            n_test -= overshoot

    # With at least three rows every split keeps at least one row.
    if n_total >= 3:
        n_train, n_val, n_test = max(1, n_train), max(1, n_val), max(1, n_test)
    return n_train, n_val, n_test


def stratified_split(df, train_ratio=0.7, val_ratio=0.1, test_ratio=0.2, seed=42):
    """Split ``df`` into train/val/test frames, stratified by the 'label' column.

    Each label's rows are shuffled with ``seed`` and cut into three
    consecutive slices. Train and val sizes are the rounded ratios (at least
    one row each); the test slice takes what is left. The three combined
    frames are shuffled again and re-indexed from 0.

    Args:
        df: DataFrame with a 'label' column.
        train_ratio: Fraction of each class assigned to the train split.
        val_ratio: Fraction of each class assigned to the validation split.
        test_ratio: Accepted for symmetry; not used in the computation, since
            the test split receives the remainder.
        seed: Seed for the shuffles (also passed to ``random.seed``).

    Returns:
        Tuple ``(train_df, val_df, test_df)``.
    """
    random.seed(seed)
    parts = {"train": [], "val": [], "test": []}

    for label in sorted(df['label'].unique()):
        shuffled = df[df['label'] == label].sample(frac=1, random_state=seed)
        n_train, n_val, n_test = _split_sizes(len(shuffled), train_ratio, val_ratio)

        val_start = n_train
        test_start = n_train + n_val
        parts["train"].append(shuffled.iloc[:val_start])
        parts["val"].append(shuffled.iloc[val_start:test_start])
        parts["test"].append(shuffled.iloc[test_start:test_start + n_test])

    train_df, val_df, test_df = (
        pd.concat(parts[name]).sample(frac=1, random_state=seed).reset_index(drop=True)
        for name in ("train", "val", "test")
    )
    return train_df, val_df, test_df


def generate_multiview_csvs(base_dir, base_datasets_path, output_dir,
                            train_ratio=0.7, val_ratio=0.1, test_ratio=0.2, seed=42,
                            skip_labels=(6,)):
    """Scan the renamed dataset and write train/val/test label CSVs.

    Every ``base_dir/Category <n>/<patient>/`` folder becomes one row holding
    the relative paths of its four views (``VALID_VIEWS``), the integer label
    ``n``, a description looked up by patient folder name, and a
    ``<category>/<patient>`` ID. Patients missing any view are listed on
    stdout and dropped. The remaining rows are split with ``stratified_split``
    and written to ``train_labels.csv``, ``val_labels.csv`` and
    ``test_labels.csv`` in ``output_dir``.

    Args:
        base_dir: Root of the renamed image tree.
        base_datasets_path: Directory holding the per-category description CSVs.
        output_dir: Directory for the three output CSVs (created if missing).
        train_ratio: Fraction of each class for the train split.
        val_ratio: Fraction of each class for the validation split.
        test_ratio: Forwarded to ``stratified_split``.
        seed: Random seed for the split.
        skip_labels: Category numbers to leave out entirely (default: 6).

    Returns:
        None. The CSVs are written to disk and statistics printed to stdout.

    Raises:
        ValueError: If no patient with all required views is found.
    """
    random.seed(seed)
    patient_descriptions = load_patient_descriptions(base_datasets_path)
    patients_data = []
    image_exts = ('.png', '.jpg', '.jpeg')

    print(f"\n📂 Scanning dataset folder: {base_dir}")

    for category in sorted(os.listdir(base_dir)):
        category_path = os.path.join(base_dir, category)
        if not os.path.isdir(category_path):
            continue
        try:
            label = int(category.replace("Category ", ""))
        except ValueError:
            print(f"⚠️ Skip unrecognized folder name '{category}'")
            continue
        if label in skip_labels:
            print(f"⏭️ Skip Category {label}")
            continue

        # Sorted listing: the seeded shuffle is only reproducible when the
        # rows enter the DataFrame in the same order on every filesystem.
        for patient_folder in sorted(os.listdir(category_path)):
            patient_path = os.path.join(category_path, patient_folder)
            if not os.path.isdir(patient_path):
                continue

            patient_entry = {
                'patient_id': f"{category}/{patient_folder}",
                'label': label,
                'description': patient_descriptions.get(patient_folder, "No description available")
            }

            for img_file in sorted(os.listdir(patient_path)):
                if not img_file.lower().endswith(image_exts):
                    continue
                view_name = os.path.splitext(img_file)[0]
                if view_name in VALID_VIEWS:
                    patient_entry[view_name] = os.path.join(category, patient_folder, img_file)

            patients_data.append(patient_entry)

    df = pd.DataFrame(patients_data)
    # Fix the column order; views that never occurred become all-NaN columns.
    df = df.reindex(columns=VALID_VIEWS + ['label', 'description', 'patient_id'])

    missing_mask = df[VALID_VIEWS].isna().any(axis=1)
    if missing_mask.any():
        print("\n⚠️ Patients missing some views:")
        for _, row in df[missing_mask].iterrows():
            missing_views = [v for v in VALID_VIEWS if pd.isna(row[v])]
            print(f"  - {row['patient_id']} missing {', '.join(missing_views)}")

    df = df.dropna(subset=VALID_VIEWS)
    if df.empty:
        # Fail with a clear message instead of an error from deep inside pandas.
        raise ValueError(
            f"No patient with all required views found under '{base_dir}'; nothing to split."
        )

    # Stratified split per label.
    train_df, val_df, test_df = stratified_split(df, train_ratio, val_ratio, test_ratio, seed)

    os.makedirs(output_dir, exist_ok=True)
    train_df.to_csv(os.path.join(output_dir, "train_labels.csv"), index=False)
    val_df.to_csv(os.path.join(output_dir, "val_labels.csv"), index=False)
    test_df.to_csv(os.path.join(output_dir, "test_labels.csv"), index=False)

    print(f"\n✅ Output complete: {len(df)} patients")
    print(f"  Train: {len(train_df)} | Val: {len(val_df)} | Test: {len(test_df)}")
    print(f"📁 Output folder: {output_dir}")

    # Per-class counts together with the ratios actually obtained.
    print("\n📊 Dataset counts per class (stratified with rounding):")
    for label in sorted(df['label'].unique()):
        n_total_c = (df['label'] == label).sum()
        n_train_c = (train_df['label'] == label).sum()
        n_val_c = (val_df['label'] == label).sum()
        n_test_c = (test_df['label'] == label).sum()

        actual_train_ratio = n_train_c / n_total_c if n_total_c > 0 else 0
        actual_val_ratio = n_val_c / n_total_c if n_total_c > 0 else 0
        actual_test_ratio = n_test_c / n_total_c if n_total_c > 0 else 0

        print(f"  Category {label} (Total: {n_total_c}):")
        print(f"    Train={n_train_c} ({actual_train_ratio:.1%}), "
              f"Val={n_val_c} ({actual_val_ratio:.1%}), "
              f"Test={n_test_c} ({actual_test_ratio:.1%})")


# Script entry point: build the train/val/test label CSVs for the "datasets" folder.
if __name__ == "__main__":
    base_dir = "datasets"
    base_datasets_path = base_dir  # the per-category description CSVs are looked up in the same folder
    output_dir = base_dir  # the generated *_labels.csv files are written there as well

    generate_multiview_csvs(
        base_dir=base_dir,
        base_datasets_path=base_datasets_path,
        output_dir=output_dir,
        train_ratio=0.7,
        val_ratio=0.1,
        test_ratio=0.2,
        seed=42
    )

✅ Loaded 1197 descriptions from datasets/Catagory 0.csv
✅ Loaded 499 descriptions from datasets/Catagory 1.csv
✅ Loaded 499 descriptions from datasets/Catagory 2.csv
✅ Loaded 499 descriptions from datasets/Catagory 3.csv
✅ Loaded 262 descriptions from datasets/Catagory 4.csv
✅ Loaded 38 descriptions from datasets/Catagory 5.csv
✅ Loaded 69 descriptions from datasets/Catagory 6.csv

📂 Scanning dataset folder: datasets
⏭️ Skip Category 6

✅ Output complete: 2992 patients
  Train: 2094 | Val: 300 | Test: 598
📁 Output folder: datasets

📊 Dataset counts per class (stratified with rounding):
  Category 0 (Total: 1195):
    Train=836 (70.0%), Val=120 (10.0%), Test=239 (20.0%)
  Category 1 (Total: 499):
    Train=349 (69.9%), Val=50 (10.0%), Test=100 (20.0%)
  Category 2 (Total: 500):
    Train=350 (70.0%), Val=50 (10.0%), Test=100 (20.0%)
  Category 3 (Total: 498):
    Train=349 (70.1%), Val=50 (10.0%), Test=99 (19.9%)
  Category 4 (Total: 261):
    Train=183 (70.1%), Val=26 (10.0%), Test=52 

# clahe 
# crop

In [48]:

import cv2
import numpy as np
import matplotlib.pyplot as plt


def preprocess_and_autocrop(image_path, threshold_offset=-50, padding_percent=0.08, min_area_ratio=0.01):
    """Enhance a mammogram with CLAHE and crop it to the detected breast region.

    The image is read as grayscale, contrast-enhanced, blurred and binarised
    with an Otsu threshold shifted by ``threshold_offset``. After morphological
    closing and opening, the largest external contour above the minimum area
    is selected and its convex hull defines both the mask and the crop box.

    Args:
        image_path: Path of the input image.
        threshold_offset: Value added to the Otsu threshold before binarising;
            negative values make the threshold more permissive (default -50).
            The resulting threshold is never below 5.
        padding_percent: Margin added on each side of the hull's bounding box,
            as a fraction of the image width / height (default 0.08).
        min_area_ratio: Minimum contour area as a fraction of the image area
            (default 0.01).

    Returns:
        Tuple ``(original, enhanced, mask, cropped)``:
            original: the grayscale input image.
            enhanced: the CLAHE-enhanced image.
            mask: full-size uint8 mask (0/255) of the dilated convex hull.
            cropped: ``enhanced`` cut to the padded bounding box of the hull.

    Raises:
        ValueError: If the image cannot be read or no contour is large enough.
    """
    # Read the image.
    original = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
    if original is None:
        raise ValueError(f"無法讀取影像: {image_path}")
    img_h, img_w = original.shape[:2]

    # 1. CLAHE contrast enhancement.
    clahe = cv2.createCLAHE(clipLimit=2.5, tileGridSize=(8, 8))
    enhanced = clahe.apply(original)

    # 2. Smooth before thresholding.
    blurred = cv2.GaussianBlur(enhanced, (5, 5), 0)

    # 3. Binarise. Otsu gives the baseline threshold; it is then lowered so
    #    that darker regions (e.g. the nipple) are still included.
    otsu_thresh, _ = cv2.threshold(blurred, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
    lower_thresh = max(5, otsu_thresh + threshold_offset)
    _, binary = cv2.threshold(blurred, lower_thresh, 255, cv2.THRESH_BINARY)

    # 4. Morphology: aggressive closing to fill gaps, one light opening to
    #    remove small specks without eroding the region.
    kernel_large = np.ones((11, 11), np.uint8)
    kernel_small = np.ones((5, 5), np.uint8)
    binary = cv2.morphologyEx(binary, cv2.MORPH_CLOSE, kernel_large, iterations=5)
    binary = cv2.morphologyEx(binary, cv2.MORPH_OPEN, kernel_small, iterations=1)

    # 5. Find the external contours.
    contours, _ = cv2.findContours(binary, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

    # 6. Keep contours above the minimum area and pick the largest.
    min_area = img_h * img_w * min_area_ratio
    valid_contours = [cnt for cnt in contours if cv2.contourArea(cnt) > min_area]
    if not valid_contours:
        raise ValueError("未找到有效輪廓")
    largest_contour = max(valid_contours, key=cv2.contourArea)

    # 7. The convex hull covers the region more completely than the raw contour.
    hull = cv2.convexHull(largest_contour)

    # 8. Draw the filled hull as the mask.
    mask = np.zeros_like(original)
    cv2.drawContours(mask, [hull], -1, 255, thickness=cv2.FILLED)

    # 9. Fill any inner holes: contours that have a parent in the hierarchy.
    # NOTE(review): a filled convex hull should not contain holes, so this
    # step is probably a no-op — confirm before removing it.
    contours_all, hierarchy = cv2.findContours(mask.copy(), cv2.RETR_CCOMP, cv2.CHAIN_APPROX_SIMPLE)
    if hierarchy is not None:
        for idx, node in enumerate(hierarchy[0]):
            if node[3] != -1:  # has a parent contour, i.e. is an inner hole
                cv2.drawContours(mask, contours_all, idx, 255, thickness=cv2.FILLED)

    # 10. Grow the mask outwards.
    kernel_dilate = np.ones((15, 15), np.uint8)
    mask = cv2.dilate(mask, kernel_dilate, iterations=3)

    # 11. Bounding box of the hull plus a margin relative to the image size.
    x, y, w, h = cv2.boundingRect(hull)
    padding_x = int(img_w * padding_percent)
    padding_y = int(img_h * padding_percent)

    # Clamp each edge on its own. Deriving the width from the clamped origin
    # would hand the far side up to twice the margin whenever the box touches
    # the left or top border.
    x0 = max(0, x - padding_x)
    y0 = max(0, y - padding_y)
    x1 = min(img_w, x + w + padding_x)
    y1 = min(img_h, y + h + padding_y)

    # 12. Crop the enhanced image (the mask is returned at full size).
    cropped = enhanced[y0:y1, x0:x1]

    return original, enhanced, mask, cropped


def display_results(original, enhanced, mask, cropped):
    """Show the four processing stages side by side in one matplotlib figure.

    Args:
        original: Image shown in the first panel ("Original Image").
        enhanced: Image shown in the second panel ("After CLAHE").
        mask: Image shown in the third panel ("Detected Mask").
        cropped: Image shown in the fourth panel ("Auto-cropped Image").
    """
    panels = (
        ("Original Image", original),
        ("After CLAHE", enhanced),
        ("Detected Mask", mask),
        ("Auto-cropped Image", cropped),
    )

    plt.figure(figsize=(15, 6))
    for position, (caption, image) in enumerate(panels, start=1):
        plt.subplot(1, 4, position)
        plt.title(caption)
        plt.imshow(image, cmap='gray')
        plt.axis('off')

    plt.tight_layout()
    plt.show()


# === Usage example ===
if __name__ == "__main__":
    # === Test on a single image ===
    image_path = "datasets/Category 0/XA2016010003906/R-CC.jpg"
    
    try:    
        original, enhanced, mask, cropped = preprocess_and_autocrop(
            image_path,
            threshold_offset=-50,    # permissive threshold (same as the function's default)
            padding_percent=0.08,    # crop margin of 8% (same as the function's default)
            min_area_ratio=0.01      # minimum contour area of 1% (same as the function's default)
        )
        
        #display_results(original, enhanced, mask, cropped)
        
        # Print summary statistics
        print(f"原始影像尺寸: {original.shape}")
        print(f"裁切後影像尺寸: {cropped.shape}")
        print(f"有效像素數: {np.sum(mask > 0)}")
        print(f"遮罩覆蓋率: {np.sum(mask > 0) / (mask.shape[0] * mask.shape[1]) * 100:.2f}%")
        
    except Exception as e:
        # Any failure is reported on stdout rather than raised.
        print(f"錯誤: {e}")

原始影像尺寸: (2294, 1914)
裁切後影像尺寸: (2142, 793)
有效像素數: 835177
遮罩覆蓋率: 19.02%


In [50]:
import os
import cv2
import numpy as np

def save_cropped_images(cropped, image_path, output_root, input_root="datasets"):
    """Save a cropped image under ``output_root``, mirroring its source location.

    The path of ``image_path`` relative to ``input_root`` is recreated below
    ``output_root``. The file keeps its base name but is always written with
    a ``.jpg`` extension.

    Args:
        cropped: Image array to write.
        image_path: Path of the source image the crop was made from.
        output_root: Root directory of the output tree.
        input_root: Root directory ``image_path`` is relative to. Defaults to
            "datasets", the value that used to be hard-coded.

    Raises:
        OSError: If ``cv2.imwrite`` reports that the file was not written.
    """
    relative_path = os.path.relpath(image_path, start=input_root)
    output_dir = os.path.join(output_root, os.path.dirname(relative_path))
    os.makedirs(output_dir, exist_ok=True)
    base_name = os.path.splitext(os.path.basename(image_path))[0]
    output_path = os.path.join(output_dir, f"{base_name}.jpg")
    # cv2.imwrite signals failure through its return value; do not ignore it.
    if not cv2.imwrite(output_path, cropped):
        raise OSError(f"cv2.imwrite failed for {output_path}")


def process_dataset(input_root, output_root):
    """Preprocess every image below ``input_root`` and save the crops.

    Each image file found by walking ``input_root`` is passed through
    ``preprocess_and_autocrop`` and the cropped result is stored at the same
    relative location under ``output_root``. A failure on one image is
    printed and does not stop the run.

    Args:
        input_root: Root directory of the source images.
        output_root: Root directory for the cropped images.
    """
    image_exts = ('.jpg', '.png', '.jpeg', '.bmp', '.tif', '.tiff')
    for root, _, files in os.walk(input_root):
        for file in sorted(files):
            if not file.lower().endswith(image_exts):
                continue
            image_path = os.path.join(root, file)
            try:
                _, _, _, cropped = preprocess_and_autocrop(image_path)
                # Pass the real input root so the mirrored path stays inside
                # output_root even when the input folder is not "datasets".
                save_cropped_images(cropped, image_path, output_root, input_root=input_root)
            except Exception as e:
                # Best effort: report the failing image and carry on.
                print(f"處理失敗: {image_path}, 錯誤: {e}")

# === Main program ===
if __name__ == "__main__":
    input_root = "datasets"  # source image folder
    output_root = "cropped_datasets"  # folder for the preprocessed (cropped) images
    process_dataset(input_root, output_root)