In [1]:
import os
import shutil
import random
import re
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path

In [2]:
# 入力ディレクトリと出力ディレクトリの設定
input_all = Path("/mnt/nfs1/home/yamamoto-hiroto/research/vertebrae_saka/input_nii")
output_dir_train_and_val = Path("/mnt/nfs1/home/yamamoto-hiroto/research/vertebrae_saka/Sakaguchi_file/S_train_and_val")
output_dir_test = Path("/mnt/nfs1/home/yamamoto-hiroto/research/vertebrae_saka/Sakaguchi_file/S_test")

# 出力ディレクトリを作成
output_dir_train_and_val.mkdir(exist_ok=True)
output_dir_test.mkdir(exist_ok=True)

# 最適化されたファイル名から番号（xxxx）を抽出（正規表現使用）
number_pattern = re.compile(r'\d+')
def extract_number(filename):
    match = number_pattern.search(filename)
    return match.group() if match else ''

# ファイルを番号ごとにグループ化（最適化）
files = list(input_all.iterdir())
groups = {}
for file_path in files:
    if file_path.is_file():
        number = extract_number(file_path.name)
        if number:
            if number not in groups:
                groups[number] = []
            groups[number].append(file_path)

# グループをシャッフルして8症例をテストに割り当て
group_keys = list(groups.keys())
random.shuffle(group_keys)

# 8症例をテストデータに分ける
test_keys = set(group_keys[:8])

# 並列処理でファイルコピーを高速化
def copy_file(args):
    src_path, dest_path = args
    shutil.copy2(src_path, dest_path)  # copy2は高速でメタデータも保持
    return dest_path

# コピータスクを準備
copy_tasks = []
for number, file_paths in groups.items():
    target_dir = output_dir_test if number in test_keys else output_dir_train_and_val
    
    for file_path in file_paths:
        dest_path = target_dir / file_path.name
        copy_tasks.append((file_path, dest_path))

# 並列処理でファイルコピーを実行
with ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(copy_file, copy_tasks))

print(f"テストデータの分割が完了しました。処理されたファイル数: {len(results)}")
print(f"テストデータ: {len([k for k in test_keys])} 症例")
print(f"訓練・検証データ: {len(group_keys) - len(test_keys)} 症例")

テストデータの分割が完了しました。処理されたファイル数: 152
テストデータ: 8 症例
訓練・検証データ: 30 症例
