# 準備
実行前の準備を行います。

## 環境設定

In [None]:
# Google Drive に接続
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# ワーキングディレクトリを設定
full_path = '/content/drive/MyDrive/CT_Analyzer_asc_desc_aorta/'

In [None]:
# ライブラリインポート
# 基本ライブラリ
import os
import glob
from tqdm.notebook import tqdm
import re
import random
import shutil
import datetime

# データ処理ライブラリ
import numpy as np
import pandas as pd
import matplotlib.pyplot  as plt
import cv2

# CT画像処理ライブラリ
!pip install pylibjpeg-libjpeg
!pip install pylibjpeg
!pip install pydicom
import pydicom

## Detectron2インストール

In [None]:
# Detectron2インストール　240515改訂
!python -m pip install pyyaml --upgrade
!python -m pip install 'git+https://github.com/facebookresearch/detectron2.git'


In [None]:
import torch, detectron2
!nvcc --version
TORCH_VERSION = ".".join(torch.__version__.split(".")[:2])
CUDA_VERSION = torch.__version__.split("+")[-1]
print("torch: ", TORCH_VERSION, "; cuda: ", CUDA_VERSION)
print("detectron2:", detectron2.__version__)

In [None]:
# Detectron2ライブラリのインポート
from detectron2.utils.logger import setup_logger
setup_logger()

# import some common detectron2 utilities
from detectron2 import model_zoo
from detectron2.engine import DefaultPredictor
from detectron2.config import get_cfg
from detectron2.utils.visualizer import Visualizer, ColorMode

# 関数定義
処理に使用する関数を定義します。

In [None]:
# ソート関数
def atoi(text):
    return int(text) if text.isdigit() else text

def natural_keys(text):
    return [ atoi(c) for c in re.split(r'(\d+)', text) ]

In [None]:
# メイン関数
def run(file_path, output_path, debug=False):
    ## Load
    # DICOMデータの読み込み
    dicom = pydicom.dcmread(file_path)
    file_name = os.path.basename(file_path)

    # DICOMデータから必要な値を取得
    dp = dicom.pixel_array.copy()  # CT値を取得
    intercept = dicom['RescaleIntercept'].value  # 変換式の切片を取得（変換しない場合は0を取得）
    try:
        outer_value = dicom['PixelPaddingValue'].value  # 円外のパディング箇所のCT値を取得
    except:
        outer_value = -2048  # 設定がない場合はデフォルト値を設定

    # 取得した値を使って逆変換
    dp = dp + intercept  # 変換式を使って逆変換
    outer_value += intercept  # 円外のパディング箇所の逆変換後の値を計算
    dp = np.where(dp==outer_value, -2048, dp)  # 該当箇所をデフォルト値に置換

    # DICOM -> Image 変換
    if debug:
        save_path = f"{file_name}.jpg"
    else:
        save_path = f"{org_path}{file_name}.jpg"
    cv2.imwrite(save_path, dp)

    # Image を読み込み
    im = cv2.imread(save_path)
    h, w = im.shape[0], im.shape[1]
    if debug:
        plt.figure(figsize=(20,20))
        plt.subplot(2,2,1)
        plt.imshow(im)

    ## Predict
    # Image を渡して Predict
    outputs = predictor(im)

    # Save image
    v = Visualizer(im[:, :, ::-1], scale=1.0)
    v = v.draw_instance_predictions(outputs["instances"].to("cpu"))
    if debug:
        plt.subplot(2,2,2)
        plt.imshow(v.get_image()[:, :, ::-1])
    else:
        cv2.imwrite(f"{pred_path}{file_name}.jpg", v.get_image()[:, :, ::-1])

    ## Postprocess
    # 予測結果
    pred_masks = outputs['instances'].get('pred_masks')
    if len(pred_masks) == 0:
        if debug:
            print(f"[{file_name}] 予測結果が存在しません。")
        return [{'result': file_name}]  # 予測結果が存在しない場合終了

    result_list = []
    for i, pred_mask in enumerate(pred_masks):  # 予測結果の数だけ繰り返し
        # 予測結果取得
        pred_mask = pred_mask.cpu().numpy().astype(np.uint8)
        score = outputs['instances'].get('scores').cpu().numpy()[i]
        pred_classe = outputs['instances'].get('pred_classes').cpu().numpy()[i]

        # 中心座標取得
        index = np.where(pred_mask)
        x_mean = round(np.sum(index[0]) / len(index[0]), 2)
        y_mean = round(np.sum(index[1]) / len(index[1]), 2)

        # 検出対象範囲内外チェック
        if (x_mean < x_range[0]) | (x_range[1] < x_mean) | (y_mean < y_range[0]) | (y_range[1] < y_mean) :
            # 範囲外の場合、繰り返しを抜ける
            print(f"[{file_name}][{i+1}] Out of range: ({x_mean}, {y_mean})")
            continue
        else:
            # 範囲内の場合、名前を変更して続行
            if i == 0:
                file_name_tmp = file_name
            else:
                file_name_tmp = f"{file_name}_{i+1}"

        ## 外側領域の抽出
        # ピクセル数の値変換
        ps_value = dicom['PixelSpacing'].value
        if debug:
            print(f"[{file_name_tmp}] Pixel Spacing：{ps_value}")
        if ps_value[0] != ps_value[1]:
            print(f"[Warning] Pixel Spacing の値を確認してください：{ps_value}")
        radius_trans = int(radius / ps_value[0])

        # 予測結果ピクセルを中心とした円を描画
        cir = np.zeros((h, w, 3))
        for hight in range(0, h):
            for width in range(0, w):
                if pred_mask[width][hight] == 1:  # 予測対象マスクの場合
                    # ブランク画像に描画
                    cv2.circle(cir,
                              center=(hight, width),
                              radius=radius_trans,
                              color=(0, 255, 0),
                              thickness=-1,
                              lineType=cv2.LINE_4,
                              shift=0)

        # 予測結果を引き算することでドーナツ状の外側領域のマスクのみを描画
        cir_mask = (cir[:,:,1]).astype(np.uint8)  # G軸のみ抽出してint変換
        target_mask = np.where((pred_mask==0)&(cir_mask!=0), 1, 0)  # 外側領域を抽出
        if debug:
            plt.subplot(2,2,3)
            plt.imshow(target_mask)

        # 元の画像に外側領域のマスクを描画
        cir_on_img = im.copy()
        cir_on_img[:,:,1] = np.where(target_mask == 1, 255, cir_on_img[:,:,1])  # 外側領域を抽出
        if debug:
            plt.imshow(cir_on_img)
        else:
            cv2.imwrite(f"{mask_path}{file_name_tmp}.jpg", cir_on_img)

        ## 計算対象領域の抽出
        # 計算対象領域のCT値のみ残す
        target_CT = dp * target_mask
        if debug:
            print(f"[{file_name_tmp}][{select_category[pred_classe]}] 選別前…　最小値：{target_CT.min()}  最大値：{target_CT.max()}  面積：{target_mask.sum()}")

        # 計算対象領域のCT値を選別
        target_CT = np.where(target_CT > max_CT, 0, target_CT)  # 最大値以上を0に変換
        target_CT = np.where(target_CT < min_CT, 0, target_CT)  # 最小値以下を0に変換
        if debug:
            print(f"[{file_name_tmp}][{select_category[pred_classe]}] 選別後…　最小値：{target_CT.min()}  最大値：{target_CT.max()}  面積：{target_mask.sum()}")

        # 予測対象マスクに含まれるCT値だけを抽出
        CTs = []
        for hight in range(0, h):
            for width in range(0, w):
                if target_mask[width][hight] == 1:  # 予測対象マスクの場合
                    if target_CT[width][hight] != 0:  # 判定CT値内のマスクの場合
                        CTs.append(target_CT[width][hight])

        # 計算対象領域のマスクを描画
        calc_on_img = im.copy()
        calc_on_img[:,:,1] = np.where(target_CT != 0, 255, calc_on_img[:,:,1])  # 計算対象領域を抽出
        if debug:
            print(f"[{file_name_tmp}][{select_category[pred_classe]}] 統計量…　平均値：{np.mean(CTs)}  標準偏差：{np.std(CTs)}  面積：{len(CTs)}  合計値：{np.sum(CTs)}")
            plt.subplot(2,2,4)
            plt.imshow(calc_on_img)
            os.makedirs(f"{full_path}output/debug/", exist_ok=True)
            plt.savefig(f"{full_path}output/debug/{file_name_tmp}.jpg")
        else:
            cv2.imwrite(f"{res_path}{file_name_tmp}.jpg", calc_on_img)

        # 予測結果を辞書に追加
        _dict = {}
        _dict['result'] = file_name_tmp
        _dict['class'] = select_category[pred_classe]
        _dict['CTs'] = CTs
        _dict['score'] = score
        _dict['(x,y)'] = (x_mean, y_mean)

        # 出力リストに結果を追加
        result_list.append(_dict)

    return result_list

# 予測とCT値計算処理
学習済みモデルを使用して、大動脈領域の予測を行います。<br>
その後、大動脈領域の周辺のCT値を計算します。

## 動作設定

In [None]:
# 処理対象の入力/出力フォルダ設定
input_root = f"{full_path}input/"
output_root = f"{full_path}output/"

# CT値検出設定
threshold = 0.90      # 検出対象の確信度閾値
x_range = [150, 400]  # 検出対象範囲：x軸
y_range = [200, 400]  # 検出対象範囲：y軸
radius = 5           # 大動脈領域周囲の距離（半径mm）
min_CT = -190         # 判定最小CT値
max_CT = -30          # 判定最大CT値

In [None]:
# 学習済みモデルの設定（学習時と同じ設定）
suffix = "20240513"
# suffix = "YYYYMMDD"
learned_dir = "COCO-InstanceSegmentation"
learned_yaml = "mask_rcnn_X_101_32x8d_FPN_3x"
checkpoint = "model_final.pth"
select_category = ['Ascending aorta', 'Descending aorta']

## 実行前準備

In [None]:
# 設定値に従ってPredictorインスタンスを作成（学習済みモデルを読み込み）
learned_model = f"{learned_dir}/{learned_yaml}.yaml"
weights_dir = f"{full_path}weights/{learned_yaml}_{suffix}"

cfg = get_cfg()
cfg.merge_from_file(model_zoo.get_config_file(learned_model))
cfg.MODEL.ROI_HEADS.SCORE_THRESH_TEST = threshold
cfg.MODEL.WEIGHTS = os.path.join(weights_dir, checkpoint)
cfg.MODEL.ROI_HEADS.NUM_CLASSES = len(select_category)
predictor = DefaultPredictor(cfg)

In [None]:
# 入力フォルダ内の全フォルダを取得
files = os.listdir(input_root)
dirs = []
for f in files:
    path = os.path.join(input_root, f)
    if os.path.isdir(path):
        dirs.append(f)
print(f"対象フォルダ名：{dirs}")

## テスト実行
1ファイルだけランダムに実行します。

In [None]:
# 最初のフォルダのファイルリストを取得
input_path = f"{input_root}/{dirs[0]}/"
output_path = f"{output_root}/{dirs[0]}/"
files_path = glob.glob(f"{input_path}*")
files_path = sorted(files_path, key=natural_keys)

# テスト用に1つ取得
test_file = random.choice(files_path)
print(f"テスト実行対象：{test_file}")

# テスト用にデバッグモードで実行
list_results = run(test_file, output_path, debug=True)

## 本処理実行

In [None]:
# 全てのフォルダに対して実行
for target in dirs:
    # パス設定
    print(f"[{target}] Start: {datetime.datetime.now()}")
    input_path = f"{input_root}/{target}/"
    output_path = f"{output_root}/{target}/"

    # ファイルリストを取得
    files_path = glob.glob(f"{input_path}*")
    files_path = sorted(files_path, key=natural_keys)

    # 出力先フォルダを用意
    if os.path.exists(output_path):
        # 出力先が既に存在する場合は削除
        print(f'Remove existed output folder...')
        shutil.rmtree(output_path)

    # フォルダ作成
    org_path = f"{output_path}original_images/"
    pred_path = f"{output_path}predicted_images/"
    mask_path = f"{output_path}masked_images/"
    res_path = f"{output_path}result_images/"
    os.makedirs(org_path, exist_ok=True)
    os.makedirs(pred_path, exist_ok=True)
    os.makedirs(mask_path, exist_ok=True)
    os.makedirs(res_path, exist_ok=True)

    # ファイルリストの全てに対して実行
    _list = []
    for file_path in tqdm(files_path):
        # 処理実行
        list_results = run(file_path, output_path, debug=False)

        for dict_result in list_results:
            # CT値のデータを格納
            try:
                CT = dict_result['CTs']

                dict_result["mean"] = np.mean(CT)
                dict_result["std"] = np.std(CT)
                dict_result["area"] = len(CT)
                dict_result["sum"] = np.sum(CT)
                _list.append(dict_result)

            except Exception as e:
                print(f"[{dict_result['result']}] 予測結果が存在しません。")

    # CT値のデータを出力
    df = pd.DataFrame(_list)
    df.to_csv(f"{output_path}{target}.csv", index=None)
    print(f"[{target}] End: {datetime.datetime.now()}\n")