<a href="https://colab.research.google.com/github/ykitaguchi77/PCAF_AI_Project/blob/main/Split_image_into_patches_1120px.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#**Divide fundus photograph into patches**

2000x2000pxの画像を、1120px四方のバッチに分解する

In [None]:
# prompt: mount gdrive

from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


###**2000x2000→1120x1120 patches**

stride 880px, overlap 240px (21.4%)

In [None]:
"""
We divided the 2000×2000 pixel images into 1120×1120 pixel patches with a stride of 880 pixels (21.4% overlap)...
"""

import os
import glob
from PIL import Image
import numpy as np
from pathlib import Path
import time
from datetime import datetime
from tqdm.notebook import tqdm
import math

def create_processing_log(output_dir):
    """ログファイルを作成"""
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    log_filename = os.path.join(output_dir, f"processing_log_{timestamp}.txt")
    return log_filename

def log_processing_info(log_file, message):
    """ログを記録"""
    with open(log_file, 'a', encoding='utf-8') as f:
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        f.write(f"[{timestamp}] {message}\n")

def calculate_patch_info(image_size, patch_size, stride):
    """パッチ情報を計算"""
    width, height = image_size
    # 完全なグリッドのパッチ数を計算
    n_patches_w = math.floor((width - patch_size) / stride) + 1
    n_patches_h = math.floor((height - patch_size) / stride) + 1

    # 開始位置のリストを生成
    x_positions = [i * stride for i in range(n_patches_w)]
    y_positions = [i * stride for i in range(n_patches_h)]

    return {
        'n_patches_w': n_patches_w,
        'n_patches_h': n_patches_h,
        'total_patches': n_patches_w * n_patches_h,
        'x_positions': x_positions,
        'y_positions': y_positions,
        'overlap_ratio': (patch_size - stride) / patch_size * 100
    }

def process_directory(input_dir, output_dir, patch_size=1120, stride=880):
    """ディレクトリ内の全PNG画像を処理"""
    start_time = time.time()

    # 出力ディレクトリの作成
    os.makedirs(output_dir, exist_ok=True)

    # ログファイルの作成
    log_file = create_processing_log(output_dir)
    log_processing_info(log_file, "=== 処理パラメータ ===")
    log_processing_info(log_file, f"パッチサイズ: {patch_size}x{patch_size} pixels")
    log_processing_info(log_file, f"ストライド: {stride} pixels")
    log_processing_info(log_file, f"オーバーラップ: {patch_size - stride} pixels")
    log_processing_info(log_file, f"オーバーラップ率: {(patch_size - stride) / patch_size * 100:.1f}%")

    # 画像パスの取得
    image_paths = glob.glob(os.path.join(input_dir, "*.png"))
    log_processing_info(log_file, f"\n処理対象画像数: {len(image_paths)}")

    processing_stats = {}

    # 画像処理のメインループ
    for image_path in tqdm(image_paths, desc="画像処理中", unit="枚"):
        original_name = Path(image_path).stem
        log_processing_info(log_file, f"\n=== 画像処理開始: {original_name} ===")

        try:
            # 画像読み込み
            img = Image.open(image_path)
            img_array = np.array(img)
            height, width = img_array.shape[:2]

            # パッチ情報の計算
            patch_info = calculate_patch_info((width, height), patch_size, stride)

            log_processing_info(log_file, f"画像サイズ: {width}x{height}")
            log_processing_info(log_file, f"予想パッチ数: {patch_info['total_patches']}")

            patch_count = 0
            with tqdm(total=patch_info['total_patches'],
                     desc=f"{original_name} パッチ生成中",
                     unit="パッチ",
                     leave=False) as pbar:

                for y in patch_info['y_positions']:
                    for x in patch_info['x_positions']:
                        # パッチを切り出し
                        patch = img_array[y:y + patch_size, x:x + patch_size]

                        # ファイル名生成と保存
                        patch_filename = f"{original_name}_x{x:04d}_y{y:04d}.png"
                        patch_img = Image.fromarray(patch)
                        patch_img.save(os.path.join(output_dir, patch_filename))

                        patch_count += 1
                        pbar.update(1)

            # 統計情報を保存
            processing_stats[original_name] = {
                'image_size': (width, height),
                'total_patches': patch_count,
                'patches_width': patch_info['n_patches_w'],
                'patches_height': patch_info['n_patches_h'],
                'overlap_pixels': patch_size - stride,
                'overlap_ratio': patch_info['overlap_ratio'],
                'status': 'success'
            }

            log_processing_info(log_file,
                f"画像処理完了: {original_name}\n"
                f"生成パッチ数: {patch_count}\n"
                f"パッチ分布: {patch_info['n_patches_w']}x{patch_info['n_patches_h']}\n"
                f"X座標範囲: 0-{max(patch_info['x_positions'])}\n"
                f"Y座標範囲: 0-{max(patch_info['y_positions'])}")

        except Exception as e:
            log_processing_info(log_file, f"エラー発生: {original_name} - {str(e)}")
            processing_stats[original_name] = {
                'status': 'error',
                'error_message': str(e)
            }

    # 処理時間の計算
    end_time = time.time()
    processing_time = end_time - start_time
    log_processing_info(log_file, f"\n=== 処理完了 ===")
    log_processing_info(log_file, f"総処理時間: {processing_time:.2f}秒")

    return processing_stats, log_file

def print_processing_summary(stats, log_file):
    """処理サマリーの表示とログ記録"""
    summary = "\n=== 処理サマリー ===\n" + "-" * 50
    total_patches = 0
    successful_images = 0
    failed_images = 0

    for image_name, info in stats.items():
        summary += f"\n\n画像: {image_name}"
        if info['status'] == 'success':
            summary += f"\n画像サイズ: {info['image_size'][0]}x{info['image_size'][1]}"
            summary += f"\nパッチ数: {info['total_patches']} ({info['patches_width']}x{info['patches_height']})"
            summary += f"\nオーバーラップ: {info['overlap_pixels']}px ({info['overlap_ratio']:.1f}%)"
            total_patches += info['total_patches']
            successful_images += 1
        else:
            summary += f"\n処理失敗: {info['error_message']}"
            failed_images += 1

    summary += f"\n\n{'-' * 50}"
    summary += f"\n総パッチ数: {total_patches}"
    summary += f"\n処理成功: {successful_images}枚"
    summary += f"\n処理失敗: {failed_images}枚"
    summary += f"\n{'-' * 50}"

    print(summary)
    log_processing_info(log_file, summary)


# メイン処理の実行
input_directory = "/content/drive/MyDrive/Deep_learning/PCAF_AI_Project/Patient21R"
output_directory = "/content/drive/MyDrive/Deep_learning/PCAF_AI_Project/Patient21R/patch_img"

# 処理パラメータ
patch_size = 1120   # パッチサイズ
stride = 880        # ストライド（240pxオーバーラップ）

print(f"処理を開始します...")
print(f"入力ディレクトリ: {input_directory}")
print(f"出力ディレクトリ: {output_directory}")

# 処理実行
stats, log_file = process_directory(
    input_directory,
    output_directory,
    patch_size=patch_size,
    stride=stride
)

# 結果表示
print_processing_summary(stats, log_file)
print(f"\nログファイルは以下に保存されました: {log_file}")

処理を開始します...
入力ディレクトリ: /content/drive/MyDrive/Deep_learning/PCAF_AI_Project/Patient21R
出力ディレクトリ: /content/drive/MyDrive/Deep_learning/PCAF_AI_Project/Patient21R/patch_img


画像処理中:   0%|          | 0/7 [00:00<?, ?枚/s]

pt21R_0_pre_AF パッチ生成中:   0%|          | 0/4 [00:00<?, ?パッチ/s]

pt21R_1_pre_Photo パッチ生成中:   0%|          | 0/4 [00:00<?, ?パッチ/s]

pt21R_2_0M_Photo パッチ生成中:   0%|          | 0/4 [00:00<?, ?パッチ/s]

pt21R_3_1M_AF パッチ生成中:   0%|          | 0/4 [00:00<?, ?パッチ/s]

pt21R_4_1M_Photo パッチ生成中:   0%|          | 0/4 [00:00<?, ?パッチ/s]

pt21R_5_1Y3M_AF パッチ生成中:   0%|          | 0/4 [00:00<?, ?パッチ/s]

pt21R_6_1Y3M_Photo パッチ生成中:   0%|          | 0/4 [00:00<?, ?パッチ/s]


=== 処理サマリー ===
--------------------------------------------------

画像: pt21R_0_pre_AF
画像サイズ: 2000x2000
パッチ数: 4 (2x2)
オーバーラップ: 240px (21.4%)

画像: pt21R_1_pre_Photo
画像サイズ: 2000x2000
パッチ数: 4 (2x2)
オーバーラップ: 240px (21.4%)

画像: pt21R_2_0M_Photo
画像サイズ: 2000x2000
パッチ数: 4 (2x2)
オーバーラップ: 240px (21.4%)

画像: pt21R_3_1M_AF
画像サイズ: 2000x2000
パッチ数: 4 (2x2)
オーバーラップ: 240px (21.4%)

画像: pt21R_4_1M_Photo
画像サイズ: 2000x2000
パッチ数: 4 (2x2)
オーバーラップ: 240px (21.4%)

画像: pt21R_5_1Y3M_AF
画像サイズ: 2000x2000
パッチ数: 4 (2x2)
オーバーラップ: 240px (21.4%)

画像: pt21R_6_1Y3M_Photo
画像サイズ: 2000x2000
パッチ数: 4 (2x2)
オーバーラップ: 240px (21.4%)

--------------------------------------------------
総パッチ数: 28
処理成功: 7枚
処理失敗: 0枚
--------------------------------------------------

ログファイルは以下に保存されました: /content/drive/MyDrive/Deep_learning/PCAF_AI_Project/Patient21R/patch_img/processing_log_20250210_001618.txt


##**Combine annotations of 4 image patches(1120x1120)**



In [None]:
import numpy as np
import os

def calculate_iou(box1, box2):
    """
    2つのバウンディングボックス間のIoUを計算

    Args:
        box1, box2: (x, y, w, h) の形式の辞書
    Returns:
        float: IoU値
    """
    # ボックスの座標を計算
    box1_x1 = box1['x'] - box1['w']/2
    box1_y1 = box1['y'] - box1['h']/2
    box1_x2 = box1['x'] + box1['w']/2
    box1_y2 = box1['y'] + box1['h']/2

    box2_x1 = box2['x'] - box2['w']/2
    box2_y1 = box2['y'] - box2['h']/2
    box2_x2 = box2['x'] + box2['w']/2
    box2_y2 = box2['y'] + box2['h']/2

    # 交差領域の座標を計算
    x1 = max(box1_x1, box2_x1)
    y1 = max(box1_y1, box2_y1)
    x2 = min(box1_x2, box2_x2)
    y2 = min(box1_y2, box2_y2)

    # 交差領域の面積を計算
    if x2 < x1 or y2 < y1:
        return 0.0

    intersection = (x2 - x1) * (y2 - y1)

    # 各ボックスの面積を計算
    box1_area = box1['w'] * box1['h']
    box2_area = box2['w'] * box2['h']

    # IoUを計算
    union = box1_area + box2_area - intersection

    return intersection / union

def merge_boxes(box1, box2):
    """
    2つのバウンディングボックスの平均を計算
    """
    return {
        'class_id': box1['class_id'],  # クラスIDは同じはず
        'x': (box1['x'] + box2['x']) / 2,
        'y': (box1['y'] + box2['y']) / 2,
        'w': (box1['w'] + box2['w']) / 2,
        'h': (box1['h'] + box2['h']) / 2,
        'track_id': box1['track_id']  # track_idは最初のボックスのものを使用
    }

def process_overlapping_boxes(annotations, iou_threshold=0.3):
    """
    重複するバウンディングボックスを処理

    Args:
        annotations: アノテーションのリスト
        iou_threshold: IoUの閾値

    Returns:
        処理済みのアノテーションリスト
    """
    processed = []
    used_indices = set()

    for i in range(len(annotations)):
        if i in used_indices:
            continue

        current_box = annotations[i]
        merged_box = current_box.copy()
        merged_count = 1

        # 他のすべてのボックスとの重複をチェック
        for j in range(i + 1, len(annotations)):
            if j in used_indices:
                continue

            other_box = annotations[j]

            # 同じクラスのボックスのみ比較
            if current_box['class_id'] != other_box['class_id']:
                continue

            iou = calculate_iou(current_box, other_box)

            if iou >= iou_threshold:
                # ボックスを統合
                merged_box = merge_boxes(merged_box, other_box)
                used_indices.add(j)
                merged_count += 1

        processed.append(merged_box)
        used_indices.add(i)

    return processed

def convert_patch_to_original(x, y, w, h, patch_x, patch_y, patch_size=1120, stride=880, original_size=2000):
    """
    パッチ内の相対座標を元画像の相対座標に変換

    Args:
        x, y, w, h: パッチ内での相対座標とサイズ（0-1の範囲）
        patch_x, patch_y: パッチの左上座標（ピクセル単位）
        patch_size: パッチの一辺の長さ（ピクセル単位）
        stride: パッチのストライド（ピクセル単位）
        original_size: 元画像の一辺の長さ（ピクセル単位）

    Returns:
        orig_x, orig_y, orig_w, orig_h: 元画像での相対座標とサイズ（0-1の範囲）
    """
    # パッチ内の相対座標を絶対座標（ピクセル）に変換
    abs_x = x * patch_size + patch_x
    abs_y = y * patch_size + patch_y
    abs_w = w * patch_size
    abs_h = h * patch_size

    # 絶対座標を元画像での相対座標に変換
    orig_x = abs_x / original_size
    orig_y = abs_y / original_size
    orig_w = abs_w / original_size
    orig_h = abs_h / original_size

    return orig_x, orig_y, orig_w, orig_h

def read_yolo_annotations(file_path):
    """YOLOフォーマットのアノテーションファイルを読み込む"""
    annotations = []
    with open(file_path, 'r') as f:
        for line in f:
            class_id, x, y, w, h, track_id = map(float, line.strip().split())
            annotations.append({
                'class_id': int(class_id),
                'x': x,
                'y': y,
                'w': w,
                'h': h,
                'track_id': int(track_id)
            })
    return annotations

def merge_annotations(base_path, output_path):
    """4つのパッチのアノテーションを統合"""
    # パッチの位置（ストライド880pxで設定）
    patch_positions = [
        (0, 0),    # 左上
        (0, 880),  # 左下
        (880, 0),  # 右上
        (880, 880) # 右下
    ]

    merged_annotations = []

    # 各パッチのアノテーションを読み込んで変換
    for patch_x, patch_y in patch_positions:
        file_name = f"pt21R_2_0M_Photo_x{patch_x:04d}_y{patch_y:04d}.txt"
        file_path = os.path.join(base_path, f"x{patch_x:04d}_y{patch_y:04d}",
                                "labels/Train", file_name)

        if not os.path.exists(file_path):
            print(f"Warning: File not found: {file_path}")
            continue

        annotations = read_yolo_annotations(file_path)

        # 座標変換
        for ann in annotations:
            orig_x, orig_y, orig_w, orig_h = convert_patch_to_original(
                ann['x'], ann['y'], ann['w'], ann['h'],
                patch_x, patch_y
            )

            merged_annotations.append({
                'class_id': ann['class_id'],
                'x': orig_x,
                'y': orig_y,
                'w': orig_w,
                'h': orig_h,
                'track_id': ann['track_id']
            })

    # 重複検出の処理
    processed_annotations = process_overlapping_boxes(merged_annotations, iou_threshold=0.3)

    # y座標でソートしてから新しいtrack_idを振り直し
    processed_annotations.sort(key=lambda x: x['y'])

    # 結果を保存（ファイル名のみの場合は現在のディレクトリに保存）
    output_dir = os.path.dirname(output_path)
    if output_dir:  # ディレクトリパスが指定されている場合のみmakedirsを実行
        os.makedirs(output_dir, exist_ok=True)

    with open(output_path, 'w') as f:
        for i, ann in enumerate(processed_annotations):
            f.write(f"{ann['class_id']} {ann['x']:.6f} {ann['y']:.6f} "
                   f"{ann['w']:.6f} {ann['h']:.6f} {i}\n")

    print(f"Processed {len(processed_annotations)} annotations saved to {output_path}")

if __name__ == "__main__":
    base_path = "/content/drive/MyDrive/Deep_learning/PCAF_AI_Project/Patient21R/patch_img/label"
    output_path = "merged_annotations.txt"
    merge_annotations(base_path, output_path)

Processed 583 annotations saved to merged_annotations.txt


In [None]:
import os
import glob
from typing import List, Dict

def get_image_types(base_path: str) -> List[str]:
    """
    指定されたディレクトリから画像の種類を取得

    Args:
        base_path: ベースディレクトリのパス

    Returns:
        画像の種類のリスト (例: ["2_0M_Photo", "3_1M_AF", ...])
    """
    # 最初のパッチディレクトリからファイル一覧を取得
    first_patch_path = os.path.join(base_path, "x0000_y0000", "labels", "Train")
    pattern = "pt21R_*_x0000_y0000.txt"
    files = glob.glob(os.path.join(first_patch_path, pattern))

    # ファイル名から画像の種類を抽出
    image_types = []
    for file_path in files:
        file_name = os.path.basename(file_path)
        # "pt21R_2_0M_Photo_x0000_y0000.txt" -> "2_0M_Photo"
        image_type = file_name.split("pt21R_")[1].split("_x")[0]
        image_types.append(image_type)

    return sorted(image_types)

def merge_annotations_by_type(base_path: str, output_base_path: str):
    """
    画像の種類ごとに4つのパッチのアノテーションを統合

    Args:
        base_path: 入力ディレクトリのパス
        output_base_path: 出力ディレクトリのパス
    """
    # パッチの位置（ストライド880px）
    patch_positions = [
        (0, 0),    # 左上
        (0, 880),  # 左下
        (880, 0),  # 右上
        (880, 880) # 右下
    ]

    # 画像の種類を取得
    image_types = get_image_types(base_path)
    print(f"Found {len(image_types)} image types: {image_types}")

    # 画像の種類ごとに処理
    for image_type in image_types:
        print(f"\nProcessing image type: {image_type}")
        merged_annotations = []

        # 各パッチのアノテーションを読み込んで変換
        for patch_x, patch_y in patch_positions:
            file_name = f"pt21R_{image_type}_x{patch_x:04d}_y{patch_y:04d}.txt"
            file_path = os.path.join(base_path, f"x{patch_x:04d}_y{patch_y:04d}",
                                   "labels/Train", file_name)

            if not os.path.exists(file_path):
                print(f"Warning: File not found: {file_path}")
                continue

            print(f"Reading: {file_path}")
            annotations = read_yolo_annotations(file_path)

            # 座標変換
            for ann in annotations:
                orig_x, orig_y, orig_w, orig_h = convert_patch_to_original(
                    ann['x'], ann['y'], ann['w'], ann['h'],
                    patch_x, patch_y
                )

                merged_annotations.append({
                    'class_id': ann['class_id'],
                    'x': orig_x,
                    'y': orig_y,
                    'w': orig_w,
                    'h': orig_h,
                    'track_id': ann['track_id']
                })

        # 重複検出の処理
        processed_annotations = process_overlapping_boxes(merged_annotations, iou_threshold=0.3)

        # y座標でソートしてから新しいtrack_idを振り直し
        processed_annotations.sort(key=lambda x: x['y'])

        # 結果を保存
        output_path = os.path.join(output_base_path, f"Pt21R_{image_type}.txt")
        os.makedirs(output_base_path, exist_ok=True)

        with open(output_path, 'w') as f:
            for i, ann in enumerate(processed_annotations):
                f.write(f"{ann['class_id']} {ann['x']:.6f} {ann['y']:.6f} "
                       f"{ann['w']:.6f} {ann['h']:.6f} {i}\n")

        print(f"Saved {len(processed_annotations)} annotations to {output_path}")

if __name__ == "__main__":
    base_path = "/content/drive/MyDrive/Deep_learning/PCAF_AI_Project/Patient21R/patch_img/label"
    output_base_path = "/content/drive/MyDrive/Deep_learning/PCAF_AI_Project/Patient21R"
    merge_annotations_by_type(base_path, output_base_path)

Found 0 image types: []


In [None]:
import os
import glob
from typing import List, Dict
import numpy as np

def calculate_iou(box1: Dict, box2: Dict) -> float:
    """
    2つのバウンディングボックス間のIOUを計算

    Args:
        box1: 1つ目のバウンディングボックス (x, y, w, h)
        box2: 2つ目のバウンディングボックス (x, y, w, h)

    Returns:
        IOU値 (0-1の範囲)
    """
    # ボックスの座標を計算
    box1_x1 = box1['x'] - box1['w']/2
    box1_y1 = box1['y'] - box1['h']/2
    box1_x2 = box1['x'] + box1['w']/2
    box1_y2 = box1['y'] + box1['h']/2

    box2_x1 = box2['x'] - box2['w']/2
    box2_y1 = box2['y'] - box2['h']/2
    box2_x2 = box2['x'] + box2['w']/2
    box2_y2 = box2['y'] + box2['h']/2

    # 交差領域の座標を計算
    intersect_x1 = max(box1_x1, box2_x1)
    intersect_y1 = max(box1_y1, box2_y1)
    intersect_x2 = min(box1_x2, box2_x2)
    intersect_y2 = min(box1_y2, box2_y2)

    # 交差領域の面積を計算
    if intersect_x2 < intersect_x1 or intersect_y2 < intersect_y1:
        return 0.0

    intersection_area = (intersect_x2 - intersect_x1) * (intersect_y2 - intersect_y1)

    # 各ボックスの面積を計算
    box1_area = box1['w'] * box1['h']
    box2_area = box2['w'] * box2['h']

    # IOUを計算
    union_area = box1_area + box2_area - intersection_area
    if union_area == 0:
        return 0.0

    return intersection_area / union_area

def process_overlapping_boxes(annotations: List[Dict], iou_threshold: float = 0.3) -> List[Dict]:
    """
    重複するバウンディングボックスを処理

    Args:
        annotations: アノテーションのリスト
        iou_threshold: 重複と判定するIOUの閾値

    Returns:
        重複を除去したアノテーションのリスト
    """
    if not annotations:
        return []

    # y座標でソート
    annotations = sorted(annotations, key=lambda x: x['y'])
    processed = []

    for i, current_box in enumerate(annotations):
        is_duplicate = False

        # 既に処理済みのボックスとIOUを計算
        for processed_box in processed:
            iou = calculate_iou(current_box, processed_box)
            if iou > iou_threshold:
                is_duplicate = True
                break

        if not is_duplicate:
            processed.append(current_box)

    return processed

def read_yolo_annotations(file_path: str) -> List[Dict]:
    """
    YOLOフォーマットのアノテーションファイルを読み込み

    Args:
        file_path: アノテーションファイルのパス

    Returns:
        アノテーションのリスト
    """
    annotations = []

    with open(file_path, 'r') as f:
        lines = f.readlines()
        print(f"Raw line count in {os.path.basename(file_path)}: {len(lines)}")

        for i, line in enumerate(lines):
            parts = line.strip().split()
            if len(parts) >= 6:
                try:
                    annotations.append({
                        'class_id': int(parts[0]),
                        'x': float(parts[1]),
                        'y': float(parts[2]),
                        'w': float(parts[3]),
                        'h': float(parts[4]),
                        'track_id': int(parts[5])
                    })
                except (ValueError, IndexError) as e:
                    print(f"Error parsing line {i+1}: {line.strip()}")
                    print(f"Error details: {str(e)}")
            else:
                print(f"Invalid line format at line {i+1}: {line.strip()}")

    return annotations

def convert_patch_to_original(x: float, y: float, w: float, h: float,
                            patch_x: int, patch_y: int) -> tuple:
    """
    パッチ内の座標を元の画像の座標に変換

    Args:
        x, y, w, h: パッチ内のバウンディングボックスの座標とサイズ (0-1の範囲)
        patch_x, patch_y: パッチの左上座標

    Returns:
        元の画像での座標とサイズ (x, y, w, h)
    """
    patch_size = 1120  # パッチサイズ
    original_size = 2000  # 元画像のサイズ

    # パッチ内の相対座標を元画像での絶対座標に変換
    abs_x = (x * patch_size + patch_x) / original_size
    abs_y = (y * patch_size + patch_y) / original_size

    # サイズも元画像のスケールに変換
    abs_w = (w * patch_size) / original_size
    abs_h = (h * patch_size) / original_size

    return abs_x, abs_y, abs_w, abs_h

def get_image_types(base_path: str) -> List[str]:
    """
    指定されたディレクトリから画像の種類を取得

    Args:
        base_path: ベースディレクトリのパス

    Returns:
        画像の種類のリスト (例: ["2_0M_Photo", "3_1M_AF", ...])
    """
    first_patch_path = os.path.join(base_path, "x0000_y0000", "labels", "Train")
    pattern = "pt21R_*_x0000_y0000.txt"
    files = glob.glob(os.path.join(first_patch_path, pattern))

    image_types = []
    for file_path in files:
        file_name = os.path.basename(file_path)
        image_type = file_name.split("pt21R_")[1].split("_x")[0]
        image_types.append(image_type)

    return sorted(image_types)

def merge_annotations_by_type(base_path: str, output_base_path: str):
    """
    画像の種類ごとに4つのパッチのアノテーションを統合。
    2_0M_Photoで検出した重複情報を他の画像タイプにも適用する。

    Args:
        base_path: 入力ディレクトリのパス
        output_base_path: 出力ディレクトリのパス
    """
    patch_positions = [
        (0, 0),      # 左上
        (0, 880),    # 左下
        (880, 0),    # 右上
        (880, 880)   # 右下
    ]

    image_types = get_image_types(base_path)
    print(f"Found {len(image_types)} image types: {image_types}")

    # まず2_0M_Photoの重複検出を行い、残すべきアノテーションのインデックスを特定
    reference_type = "2_0M_Photo"
    reference_annotations = []
    reference_index_map = {}  # パッチごとのインデックスマッピング

    # 2_0M_Photoの全パッチのアノテーションを読み込み
    print(f"\nProcessing reference type: {reference_type}")
    print("Initial annotation counts for reference patches:")

    for patch_x, patch_y in patch_positions:
        file_name = f"pt21R_{reference_type}_x{patch_x:04d}_y{patch_y:04d}.txt"
        file_path = os.path.join(base_path, f"x{patch_x:04d}_y{patch_y:04d}",
                               "labels/Train", file_name)

        if not os.path.exists(file_path):
            print(f"Warning: File not found: {file_path}")
            continue

        patch_key = f"x{patch_x:04d}_y{patch_y:04d}"
        annotations = read_yolo_annotations(file_path)
        print(f"Reference Patch {patch_key}: {len(annotations)} annotations")

        # パッチごとのインデックスマッピングを作成
        reference_index_map[patch_key] = {}

        # 座標変換
        for i, ann in enumerate(annotations):
            orig_x, orig_y, orig_w, orig_h = convert_patch_to_original(
                ann['x'], ann['y'], ann['w'], ann['h'],
                patch_x, patch_y
            )

            current_index = len(reference_annotations)
            reference_index_map[patch_key][i] = current_index

            reference_annotations.append({
                'class_id': ann['class_id'],
                'x': orig_x,
                'y': orig_y,
                'w': orig_w,
                'h': orig_h,
                'track_id': ann['track_id'],
                'original_index': current_index
            })

    print(f"Total reference annotations before processing: {len(reference_annotations)}")

    # 重複検出を行い、残すべきアノテーションのインデックスを取得
    processed_reference = process_overlapping_boxes(reference_annotations, iou_threshold=0.3)
    valid_indices = set(ann['original_index'] for ann in processed_reference)

    print(f"Valid indices after overlap processing: {len(valid_indices)}")

    # 全画像タイプに対して処理
    for image_type in image_types:
        print(f"\nProcessing image type: {image_type}")
        merged_annotations = []

        print(f"Initial annotation counts for each patch in {image_type}:")
        # 各パッチのアノテーション数を表示
        patch_counts = {}
        for patch_x, patch_y in patch_positions:
            patch_key = f"x{patch_x:04d}_y{patch_y:04d}"
            file_name = f"pt21R_{image_type}_x{patch_x:04d}_y{patch_y:04d}.txt"
            file_path = os.path.join(base_path, f"x{patch_x:04d}_y{patch_y:04d}",
                                   "labels/Train", file_name)

            if os.path.exists(file_path):
                annotations = read_yolo_annotations(file_path)
                patch_counts[patch_key] = len(annotations)
                print(f"Patch {patch_key}: {len(annotations)} annotations")

        # 各パッチのアノテーションを読み込んで変換
        for patch_x, patch_y in patch_positions:
            patch_key = f"x{patch_x:04d}_y{patch_y:04d}"
            file_name = f"pt21R_{image_type}_x{patch_x:04d}_y{patch_y:04d}.txt"
            file_path = os.path.join(base_path, f"x{patch_x:04d}_y{patch_y:04d}",
                                   "labels/Train", file_name)

            if not os.path.exists(file_path):
                print(f"Warning: File not found: {file_path}")
                continue

            print(f"Reading: {file_path}")
            annotations = read_yolo_annotations(file_path)

            # 座標変換とフィルタリング
            filtered_count = 0
            for i, ann in enumerate(annotations):
                # パッチ内のインデックスを参照インデックスに変換
                if patch_key in reference_index_map and i in reference_index_map[patch_key]:
                    reference_idx = reference_index_map[patch_key][i]

                    # 参照画像で残すと判断されたインデックスのアノテーションのみ追加
                    if reference_idx in valid_indices:
                        filtered_count += 1
                        orig_x, orig_y, orig_w, orig_h = convert_patch_to_original(
                            ann['x'], ann['y'], ann['w'], ann['h'],
                            patch_x, patch_y
                        )

                        merged_annotations.append({
                            'class_id': ann['class_id'],
                            'x': orig_x,
                            'y': orig_y,
                            'w': orig_w,
                            'h': orig_h,
                            'track_id': ann['track_id']
                        })

            print(f"Patch {patch_key}: {filtered_count} annotations after filtering")

        print(f"Total annotations after filtering for {image_type}: {len(merged_annotations)}")

        # y座標でソート
        merged_annotations.sort(key=lambda x: x['y'])

        # 結果を保存
        output_path = os.path.join(output_base_path, f"Pt21R_{image_type}.txt")
        os.makedirs(output_base_path, exist_ok=True)

        with open(output_path, 'w') as f:
            for i, ann in enumerate(merged_annotations):
                f.write(f"{ann['class_id']} {ann['x']:.6f} {ann['y']:.6f} "
                       f"{ann['w']:.6f} {ann['h']:.6f} {i}\n")

        print(f"Saved {len(merged_annotations)} annotations to {output_path}")

if __name__ == "__main__":
    base_path = "/content/drive/MyDrive/Deep_learning/PCAF_AI_Project/Patient21R/patch_img/label"
    output_base_path = "/content/drive/MyDrive/Deep_learning/PCAF_AI_Project/Patient21R"
    merge_annotations_by_type(base_path, output_base_path)

Found 5 image types: ['2_0M_Photo', '3_1M_AF', '4_1M_Photo', '5_1Y3M_AF', '6_1Y3M_Photo']

Processing reference type: 2_0M_Photo
Initial annotation counts for reference patches:
Raw line count in pt21R_2_0M_Photo_x0000_y0000.txt: 18
Reference Patch x0000_y0000: 18 annotations
Raw line count in pt21R_2_0M_Photo_x0000_y0880.txt: 87
Reference Patch x0000_y0880: 87 annotations
Raw line count in pt21R_2_0M_Photo_x0880_y0000.txt: 248
Reference Patch x0880_y0000: 248 annotations
Raw line count in pt21R_2_0M_Photo_x0880_y0880.txt: 317
Reference Patch x0880_y0880: 317 annotations
Total reference annotations before processing: 670
Valid indices after overlap processing: 496

Processing image type: 2_0M_Photo
Initial annotation counts for each patch in 2_0M_Photo:
Raw line count in pt21R_2_0M_Photo_x0000_y0000.txt: 18
Patch x0000_y0000: 18 annotations
Raw line count in pt21R_2_0M_Photo_x0000_y0880.txt: 87
Patch x0000_y0880: 87 annotations
Raw line count in pt21R_2_0M_Photo_x0880_y0000.txt: 248
Pa

In [None]:
from google.colab.patches import cv2_imshow
import cv2
import numpy as np

def visualize_merged_annotations(image_path, annotation_path):
    """
    画像にバウンディングボックスを描画（Colab用）

    Args:
        image_path: 元画像のパス
        annotation_path: マージされたアノテーションファイルのパス
    """
    # 画像の読み込み
    image = cv2.imread(image_path)
    if image is None:
        raise ValueError("画像を読み込めませんでした")

    height, width = image.shape[:2]

    # アノテーションの読み込み
    with open(annotation_path, 'r') as f:
        annotations = []
        for line in f:
            class_id, x, y, w, h, track_id = map(float, line.strip().split())
            annotations.append({
                'class_id': int(class_id),
                'x': x,
                'y': y,
                'w': w,
                'h': h,
                'track_id': int(track_id)
            })

    # バウンディングボックスの描画
    for ann in annotations:
        # YOLOの相対座標を画像の絶対座標に変換
        x = int(ann['x'] * width)
        y = int(ann['y'] * height)
        w = int(ann['w'] * width)
        h = int(ann['h'] * height)

        # バウンディングボックスの左上と右下の座標を計算
        x1 = int(x - w/2)
        y1 = int(y - h/2)
        x2 = int(x + w/2)
        y2 = int(y + h/2)

        # バウンディングボックスの描画（緑色）
        cv2.rectangle(image, (x1, y1), (x2, y2), (0, 255, 0), 2)

        # トラッキングIDの表示（赤色）
        cv2.putText(image, str(ann['track_id']), (x1, y1-5),
                   cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 2)

    # 結果の表示（Colab用）
    cv2_imshow(image)

    # 結果の保存
    output_path = 'merged_annotations_visualization.png'
    cv2.imwrite(output_path, image)
    print(f"可視化結果を{output_path}に保存しました")

# 使用例
# image_path = "/content/drive/MyDrive/Deep_learning/PCAF_AI_Project/Patient21R/pt21R_2_0M_Photo.png"
# annotation_path = "merged_annotations.txt"
image_path = "/content/drive/MyDrive/Deep_learning/PCAF_AI_Project/Patient21R/pt21R_2_0M_Photo.png"
annotation_path = "/content/drive/MyDrive/Deep_learning/PCAF_AI_Project/Patient21R/Pt21R_2_0M_Photo.txt"
visualize_merged_annotations(image_path, annotation_path)

###**2000x2000→240x240 patches**

stride 180px, overlap 19.6%

In [None]:
# """
# We divided the 2000×2000 pixel images into 224×224 pixel patches with a stride of 180 pixels (19.6% overlap)...
# """
# import os
# import glob
# from PIL import Image
# import numpy as np
# from pathlib import Path
# import time
# from datetime import datetime
# from tqdm.notebook import tqdm
# import math

# def create_processing_log(output_dir):
#     """ログファイルを作成"""
#     timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
#     log_filename = os.path.join(output_dir, f"processing_log_{timestamp}.txt")
#     return log_filename

# def log_processing_info(log_file, message):
#     """ログを記録"""
#     with open(log_file, 'a', encoding='utf-8') as f:
#         timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
#         f.write(f"[{timestamp}] {message}\n")

# def calculate_patch_info(image_size, patch_size, stride):
#     """パッチ情報を計算"""
#     width, height = image_size
#     # 完全なグリッドのパッチ数を計算
#     n_patches_w = math.floor((width - patch_size) / stride) + 1
#     n_patches_h = math.floor((height - patch_size) / stride) + 1

#     # 開始位置のリストを生成
#     x_positions = [i * stride for i in range(n_patches_w)]
#     y_positions = [i * stride for i in range(n_patches_h)]

#     return {
#         'n_patches_w': n_patches_w,
#         'n_patches_h': n_patches_h,
#         'total_patches': n_patches_w * n_patches_h,
#         'x_positions': x_positions,
#         'y_positions': y_positions,
#         'overlap_ratio': (patch_size - stride) / patch_size * 100
#     }

# def process_directory(input_dir, output_dir, patch_size=224, stride=180):
#     """ディレクトリ内の全PNG画像を処理"""
#     start_time = time.time()

#     # 出力ディレクトリの作成
#     os.makedirs(output_dir, exist_ok=True)

#     # ログファイルの作成
#     log_file = create_processing_log(output_dir)
#     log_processing_info(log_file, "=== 処理パラメータ ===")
#     log_processing_info(log_file, f"パッチサイズ: {patch_size}x{patch_size} pixels")
#     log_processing_info(log_file, f"ストライド: {stride} pixels")
#     log_processing_info(log_file, f"オーバーラップ: {patch_size - stride} pixels")
#     log_processing_info(log_file, f"オーバーラップ率: {(patch_size - stride) / patch_size * 100:.1f}%")

#     # 画像パスの取得
#     image_paths = glob.glob(os.path.join(input_dir, "*.png"))
#     log_processing_info(log_file, f"\n処理対象画像数: {len(image_paths)}")

#     processing_stats = {}

#     # 画像処理のメインループ
#     for image_path in tqdm(image_paths, desc="画像処理中", unit="枚"):
#         original_name = Path(image_path).stem
#         log_processing_info(log_file, f"\n=== 画像処理開始: {original_name} ===")

#         try:
#             # 画像読み込み
#             img = Image.open(image_path)
#             img_array = np.array(img)
#             height, width = img_array.shape[:2]

#             # パッチ情報の計算
#             patch_info = calculate_patch_info((width, height), patch_size, stride)

#             log_processing_info(log_file, f"画像サイズ: {width}x{height}")
#             log_processing_info(log_file, f"予想パッチ数: {patch_info['total_patches']}")

#             patch_count = 0
#             with tqdm(total=patch_info['total_patches'],
#                      desc=f"{original_name} パッチ生成中",
#                      unit="パッチ",
#                      leave=False) as pbar:

#                 for y in patch_info['y_positions']:
#                     for x in patch_info['x_positions']:
#                         # パッチを切り出し
#                         patch = img_array[y:y + patch_size, x:x + patch_size]

#                         # ファイル名生成と保存
#                         patch_filename = f"{original_name}_x{x:04d}_y{y:04d}.png"
#                         patch_img = Image.fromarray(patch)
#                         patch_img.save(os.path.join(output_dir, patch_filename))

#                         patch_count += 1
#                         pbar.update(1)

#             # 統計情報を保存
#             processing_stats[original_name] = {
#                 'image_size': (width, height),
#                 'total_patches': patch_count,
#                 'patches_width': patch_info['n_patches_w'],
#                 'patches_height': patch_info['n_patches_h'],
#                 'overlap_pixels': patch_size - stride,
#                 'overlap_ratio': patch_info['overlap_ratio'],
#                 'status': 'success'
#             }

#             log_processing_info(log_file,
#                 f"画像処理完了: {original_name}\n"
#                 f"生成パッチ数: {patch_count}\n"
#                 f"パッチ分布: {patch_info['n_patches_w']}x{patch_info['n_patches_h']}\n"
#                 f"X座標範囲: 0-{max(patch_info['x_positions'])}\n"
#                 f"Y座標範囲: 0-{max(patch_info['y_positions'])}")

#         except Exception as e:
#             log_processing_info(log_file, f"エラー発生: {original_name} - {str(e)}")
#             processing_stats[original_name] = {
#                 'status': 'error',
#                 'error_message': str(e)
#             }

#     # 処理時間の計算
#     end_time = time.time()
#     processing_time = end_time - start_time
#     log_processing_info(log_file, f"\n=== 処理完了 ===")
#     log_processing_info(log_file, f"総処理時間: {processing_time:.2f}秒")

#     return processing_stats, log_file

# def print_processing_summary(stats, log_file):
#     """処理サマリーの表示とログ記録"""
#     summary = "\n=== 処理サマリー ===\n" + "-" * 50
#     total_patches = 0
#     successful_images = 0
#     failed_images = 0

#     for image_name, info in stats.items():
#         summary += f"\n\n画像: {image_name}"
#         if info['status'] == 'success':
#             summary += f"\n画像サイズ: {info['image_size'][0]}x{info['image_size'][1]}"
#             summary += f"\nパッチ数: {info['total_patches']} ({info['patches_width']}x{info['patches_height']})"
#             summary += f"\nオーバーラップ: {info['overlap_pixels']}px ({info['overlap_ratio']:.1f}%)"
#             total_patches += info['total_patches']
#             successful_images += 1
#         else:
#             summary += f"\n処理失敗: {info['error_message']}"
#             failed_images += 1

#     summary += f"\n\n{'-' * 50}"
#     summary += f"\n総パッチ数: {total_patches}"
#     summary += f"\n処理成功: {successful_images}枚"
#     summary += f"\n処理失敗: {failed_images}枚"
#     summary += f"\n{'-' * 50}"

#     print(summary)
#     log_processing_info(log_file, summary)


# # メイン処理の実行
# input_directory = "/content/drive/MyDrive/Deep_learning/PCAF_AI_Project/Patient27R"
# output_directory = "/content/drive/MyDrive/Deep_learning/PCAF_AI_Project/Patient27R/batch_img"

# # 処理パラメータ
# patch_size = 224    # パッチサイズ
# stride = 180        # ストライド（約19.6%オーバーラップ）

# print(f"処理を開始します...")
# print(f"入力ディレクトリ: {input_directory}")
# print(f"出力ディレクトリ: {output_directory}")

# # 処理実行
# stats, log_file = process_directory(
#     input_directory,
#     output_directory,
#     patch_size=patch_size,
#     stride=stride
# )

# # 結果表示
# print_processing_summary(stats, log_file)
# print(f"\nログファイルは以下に保存されました: {log_file}")

