# PPE Tutorials

In [12]:
import autorootcwd
import torch
import nibabel as nib
from pathlib import Path
import matplotlib.pyplot as plt
from src.models.proposed.segresnet import PPESegResNet
import seaborn as sns

In [16]:
def visualize_ppe(model: PPESegResNet, x: torch.Tensor, ppe: torch.Tensor, slice_idx: int = None):
    """
    PPE와 입력 이미지, PPE가 적용된 feature map을 시각화합니다.
    
    Parameters:
    -----------
    model : PPESegResNet
        시각화에 사용할 PPESegResNet 모델
    x : torch.Tensor
        입력 이미지 [B, C, H, W, D]
    ppe : torch.Tensor
        PPE 텐서 [H, W, D] 또는 [1, 1, H, W, D]
    slice_idx : int, optional
        시각화할 슬라이스 인덱스. None이면 중앙 슬라이스를 사용
    """
    # CPU로 이동하고 numpy로 변환
    x_np = x.detach().cpu().numpy()
    if len(ppe.shape) == 3:
        ppe_np = ppe.numpy()
    else:
        ppe_np = ppe.squeeze().cpu().numpy()
    
    print("Input Image shape:", x.shape)
    print("PPE shape:", ppe.shape)

    # 첫 번째 컨볼루션 통과
    x_conv = model.convInit(x)

    print("After Initial Convolution shape:", x_conv.shape)

    # PPE 적용
    x_with_ppe = model.ppe_module(ppe, x_conv)
    x_with_ppe_np = x_with_ppe.detach().cpu().numpy()

    print("After PPE shape:", x_with_ppe.shape)
    # 슬라이스 인덱스 설정
    if slice_idx is None:
        slice_idx = x_np.shape[-1] // 2
    
    # 시각화
    fig, axes = plt.subplots(2, 2, figsize=(15, 15))
    fig.suptitle('PPE Visualization', fontsize=16)
    
    # 원본 이미지
    im1 = axes[0, 0].imshow(x_np[0, 0, :, :, slice_idx], cmap='gray')
    axes[0, 0].set_title('Original Image')
    plt.colorbar(im1, ax=axes[0, 0])
    
    # PPE
    im2 = axes[0, 1].imshow(ppe_np[:, :, slice_idx], cmap='viridis')
    axes[0, 1].set_title('Physical Positional Embedding')
    plt.colorbar(im2, ax=axes[0, 1])
    
    # 첫 번째 컨볼루션 결과 (채널 0)
    im3 = axes[1, 0].imshow(x_conv[0, 0, :, :, slice_idx].detach().cpu().numpy(), cmap='viridis')
    axes[1, 0].set_title('After Initial Convolution (Channel 0)')
    plt.colorbar(im3, ax=axes[1, 0])
    
    # PPE가 적용된 feature map (채널 0)
    im4 = axes[1, 1].imshow(x_with_ppe_np[0, 0, :, :, slice_idx], cmap='viridis')
    axes[1, 1].set_title('After PPE Addition (Channel 0)')
    plt.colorbar(im4, ax=axes[1, 1])
    
    # 축 레이블 제거
    for ax in axes.flat:
        ax.set_xticks([])
        ax.set_yticks([])
    
    plt.tight_layout()
    return fig

In [18]:
def visualize_ppe_all_channels(model: PPESegResNet, x: torch.Tensor, ppe: torch.Tensor, slice_idx: int = None):
    """
    PPE와 입력 이미지, 모든 채널의 feature map을 시각화합니다.
    
    Parameters:
    -----------
    model : PPESegResNet
        시각화에 사용할 PPESegResNet 모델
    x : torch.Tensor
        입력 이미지 [B, C, H, W, D]
    ppe : torch.Tensor
        PPE 텐서 [H, W, D] 또는 [1, 1, H, W, D]
    slice_idx : int, optional
        시각화할 슬라이스 인덱스. None이면 중앙 슬라이스를 사용
    """
    # CPU로 이동하고 numpy로 변환
    x_np = x.detach().cpu().numpy()
    if len(ppe.shape) == 3:
        ppe_np = ppe.numpy()
    else:
        ppe_np = ppe.squeeze().cpu().numpy()
    
    print("Input Image shape:", x.shape)
    print("PPE shape:", ppe.shape)

    # 첫 번째 컨볼루션 통과
    x_conv = model.convInit(x)
    print("After Initial Convolution shape:", x_conv.shape)

    # PPE 적용
    x_with_ppe = model.ppe_module(ppe, x_conv)
    x_with_ppe_np = x_with_ppe.detach().cpu().numpy()
    print("After PPE shape:", x_with_ppe.shape)

    # 슬라이스 인덱스 설정
    if slice_idx is None:
        slice_idx = x_np.shape[-1] // 2
    
    # 시각화 (2개의 figure로 나눔)
    # Figure 1: 원본 이미지와 PPE
    fig1, axes1 = plt.subplots(1, 2, figsize=(15, 6))
    fig1.suptitle('Input and PPE', fontsize=16)
    
    # 원본 이미지
    im1 = axes1[0].imshow(x_np[0, 0, :, :, slice_idx], cmap='gray')
    axes1[0].set_title('Original Image')
    plt.colorbar(im1, ax=axes1[0])
    
    # PPE
    im2 = axes1[1].imshow(ppe_np[:, :, slice_idx], cmap='viridis')
    axes1[1].set_title('Physical Positional Embedding')
    plt.colorbar(im2, ax=axes1[1])
    
    # 축 레이블 제거
    for ax in axes1:
        ax.set_xticks([])
        ax.set_yticks([])
    
    plt.tight_layout()
    
    # Figure 2: 모든 채널의 feature maps
    n_channels = x_conv.shape[1]
    fig2, axes2 = plt.subplots(2, n_channels, figsize=(20, 8))
    fig2.suptitle('Feature Maps for All Channels', fontsize=16)
    
    # 첫 번째 컨볼루션 결과 (모든 채널)
    for i in range(n_channels):
        im = axes2[0, i].imshow(x_conv[0, i, :, :, slice_idx].detach().cpu().numpy(), cmap='viridis')
        axes2[0, i].set_title(f'Conv Ch.{i}')
        plt.colorbar(im, ax=axes2[0, i])
    
    # PPE가 적용된 feature map (모든 채널)
    for i in range(n_channels):
        im = axes2[1, i].imshow(x_with_ppe_np[0, i, :, :, slice_idx], cmap='viridis')
        axes2[1, i].set_title(f'PPE Ch.{i}')
        plt.colorbar(im, ax=axes2[1, i])
    
    # 축 레이블 제거
    for ax in axes2.flat:
        ax.set_xticks([])
        ax.set_yticks([])
    
    plt.tight_layout()
    
    return fig1, fig2

In [14]:
def visualize_ppe_heatmap(model: PPESegResNet, ppe: torch.Tensor, slice_idx: int = None):
    """
    PPE의 sinusoidal embedding을 heatmap으로 시각화합니다.
    
    Parameters:
    -----------
    model : PPESegResNet
        시각화에 사용할 PPESegResNet 모델
    ppe : torch.Tensor
        PPE 텐서 [H, W, D] 또는 [1, 1, H, W, D]
    slice_idx : int, optional
        시각화할 슬라이스 인덱스. None이면 중앙 슬라이스를 사용
    """
    import matplotlib.pyplot as plt
    import seaborn as sns
    
    # PPE를 sinusoidal embedding으로 변환
    if len(ppe.shape) == 3:
        ppe_tensor = ppe
    else:
        ppe_tensor = ppe.squeeze()
    
    # 슬라이스 선택
    if slice_idx is None:
        slice_idx = ppe_tensor.shape[-1] // 2
    
    # 선택된 슬라이스의 중앙 row에 대한 embedding 계산
    center_row = ppe_tensor.shape[0] // 2
    position_value = ppe_tensor[center_row, :, slice_idx]
    
    # position_value를 [1, 1, H, W, D] 형태로 변환
    position_value = position_value.unsqueeze(0).unsqueeze(0).unsqueeze(-1).unsqueeze(-1)
    
    # embedding 계산 및 shape 조정
    embedding = model.ppe_module.to_sin_cos_embedding(position_value)
    embedding = embedding.squeeze()  # 불필요한 차원 제거
    
    # numpy로 변환
    embedding_np = embedding.detach().cpu().numpy()
    
    # embedding_np의 shape를 [channels, position]로 재구성
    if len(embedding_np.shape) > 2:
        embedding_np = embedding_np.reshape(-1, embedding_np.shape[1])
    
    # Heatmap 시각화
    plt.figure(figsize=(12, 8))
    sns.heatmap(embedding_np, cmap='coolwarm', center=0)
    plt.title('Positional Encoding Heatmap')
    plt.xlabel('Position')           # X축: 위치
    plt.ylabel('Embedding Channels') # Y축: 임베딩 채널
    
    return plt.gcf()

In [5]:
def visualize_single_case(img_path: str, ppe_path: str, output_dir: str, slice_idx: int = None):
    """
    단일 케이스에 대해 PPE 시각화를 수행합니다.
    
    Parameters:
    -----------
    img_path : str
        이미지 파일 경로 (*.nii.gz)
    ppe_path : str
        PPE 파일 경로 (*.nii.gz)
    output_dir : str
        결과 저장 경로
    slice_idx : int, optional
        시각화할 슬라이스 인덱스
    """
    # 출력 디렉토리 생성
    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)
    
    # 데이터 로드
    img_nii = nib.load(img_path)
    ppe_nii = nib.load(ppe_path)
    
    # numpy 배열로 변환
    img_data = img_nii.get_fdata()
    ppe_data = ppe_nii.get_fdata()
    
    # 텐서로 변환 및 배치 차원 추가
    img_tensor = torch.from_numpy(img_data).float().unsqueeze(0).unsqueeze(0)
    ppe_tensor = torch.from_numpy(ppe_data).float()
    
    # 모델 초기화
    model = PPESegResNet(
        spatial_dims=3,
        init_filters=8,
        in_channels=1,
        out_channels=2
    )
    
    # 시각화
    fig = visualize_ppe(model, img_tensor, ppe_tensor, slice_idx)
    
    # 결과 저장
    case_name = Path(img_path).parent.name
    fig.savefig(output_dir / f'ppe_visualization_{case_name}.png', dpi=300, bbox_inches='tight')
    plt.close(fig)

In [23]:
def visualize_single_case_all_channels(img_path: str, ppe_path: str, output_dir: str, slice_idx: int = None):
    """
    단일 케이스에 대해 모든 채널의 시각화를 수행합니다.
    """
    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)
    
    # 데이터 로드
    img_nii = nib.load(img_path)
    ppe_nii = nib.load(ppe_path)
    
    img_data = img_nii.get_fdata()
    ppe_data = ppe_nii.get_fdata()
    
    img_tensor = torch.from_numpy(img_data).float().unsqueeze(0).unsqueeze(0)
    ppe_tensor = torch.from_numpy(ppe_data).float()
    
    # 모델 초기화
    model = PPESegResNet(
        spatial_dims=3,
        init_filters=8,
        in_channels=1,
        out_channels=2
    )
    
    # 시각화
    fig1, fig2 = visualize_ppe_all_channels(model, img_tensor, ppe_tensor, slice_idx)
    
    # 결과 저장
    case_name = Path(img_path).parent.name
    fig1.savefig(output_dir / f'ppe_input_{case_name}.png', dpi=300, bbox_inches='tight')
    fig2.savefig(output_dir / f'ppe_channels_{case_name}.png', dpi=300, bbox_inches='tight')
    plt.close(fig1)
    plt.close(fig2)

In [9]:
def visualize_single_case_with_heatmap(img_path: str, ppe_path: str, output_dir: str, slice_idx: int = None):
    """
    단일 케이스에 대해 PPE 시각화와 heatmap을 함께 수행합니다.
    """
    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)
    
    # 데이터 로드
    img_nii = nib.load(img_path)
    ppe_nii = nib.load(ppe_path)
    
    img_data = img_nii.get_fdata()
    ppe_data = ppe_nii.get_fdata()
    
    img_tensor = torch.from_numpy(img_data).float().unsqueeze(0).unsqueeze(0)
    ppe_tensor = torch.from_numpy(ppe_data).float()
    
    # 모델 초기화
    model = PPESegResNet(
        spatial_dims=3,
        init_filters=8,
        in_channels=1,
        out_channels=2
    )
    
    # 기존 시각화
    fig1 = visualize_ppe(model, img_tensor, ppe_tensor, slice_idx)
    fig1.savefig(output_dir / f'ppe_visualization_{Path(img_path).parent.name}.png', 
                 dpi=300, bbox_inches='tight')
    plt.close(fig1)
    
    # Heatmap 시각화
    fig2 = visualize_ppe_heatmap(model, ppe_tensor, slice_idx)
    fig2.savefig(output_dir / f'ppe_heatmap_{Path(img_path).parent.name}.png', 
                 dpi=300, bbox_inches='tight')
    plt.close(fig2)

In [None]:
# 기본 경로 설정
base_dir = Path("./data/imageCAS")
output_dir = Path("./nbs/result/ppe")
    
# 테스트 케이스에 대해 시각화 수행
test_dir = base_dir / "test"
for patient_dir in sorted(test_dir.glob("*"))[:1]:  # 처음 1개 케이스만 시각화
    if patient_dir.is_dir():
        img_path = patient_dir / "img.nii.gz"
        ppe_path = patient_dir / "ppe.nii.gz"
            
        if img_path.exists() and ppe_path.exists():
            print(f"Processing {patient_dir.name}...")
            visualize_single_case_all_channels(
                str(img_path),
                str(ppe_path),
                str(output_dir),
                slice_idx=None  # 중앙 슬라이스 사용
            )

In [15]:
# 기본 경로 설정
base_dir = Path("./data/imageCAS")
output_dir = Path("./nbs/result/ppe")
    
# 테스트 케이스에 대해 시각화 수행
test_dir = base_dir / "test"
for patient_dir in sorted(test_dir.glob("*"))[:1]:  # 처음 1개 케이스만 시각화
    if patient_dir.is_dir():
        img_path = patient_dir / "img.nii.gz"
        ppe_path = patient_dir / "ppe.nii.gz"
            
        if img_path.exists() and ppe_path.exists():
            print(f"Processing {patient_dir.name}...")
            visualize_single_case_with_heatmap(
                str(img_path),
                str(ppe_path),
                str(output_dir),
                slice_idx=None  # 중앙 슬라이스 사용
            )

Processing 1000...
