In [None]:
## 필요 라이브러리 import
import os
from pathlib import Path
import csv
import torch
import torch.nn as nn
import torch.nn.functional as F
from tqdm import tqdm
import multiprocessing
import numpy as np  # single_file_processing에서 필요할 수 있음
import cv2  # single_file_processing에서 필요할 수 있음
from PIL import Image  # single_file_processing에서 필요할 수 있음
import torchvision.transforms.functional as TF  # fft_2d 수정에 필요

# ####################################################################
# # (가정 1) 사용자가 정의한 모델 아키텍처 (수정 사항 적용)
# ####################################################################


def fft_2d(img: torch.Tensor):
    """
    (수정됨) 3채널 이미지를 그레이스케일로 변환 후 FFT 수행
    magnitude_branch/phase_branch의 입력 채널(1)에 맞춤
    """
    # (B, 3, H, W) -> (B, 1, H, W)
    if img.shape[1] == 3:  # 3채널일 경우
        img_gray = TF.rgb_to_grayscale(img).to(img.device)
    else:
        img_gray = img  # 이미 1채널일 경우

    fft_result = torch.fft.fft2(img_gray)  # (B, 1, H, W) 텐서에 FFT 적용
    fft_shifted = torch.fft.fftshift(fft_result)

    magnitude = torch.abs(fft_shifted)  # (B, 1, H, W)
    phase = torch.angle(fft_shifted)  # (B, 1, H, W)
    return magnitude, phase


class CNNClfWithFFT(nn.Module):
    """
    사용자가 제공한 CNN + FFT 모델 (오류 수정됨)
    """

    def __init__(self, num_classes: int = 2):
        super(CNNClfWithFFT, self).__init__()

        # (참고) input_size는 single_file_processing에서
        # 최종적으로 리사이즈하는 크기와 일치해야 합니다.
        input_size = 512  # (예시)
        layer_size = (32, 64, 64, 32)
        filter_size = (7, 5, 3)
        padding_size = (3, 2, 1)

        self.rgb_branch = nn.Sequential(
            nn.Conv2d(
                3,
                layer_size[0],
                kernel_size=filter_size[0],
                stride=1,
                padding=padding_size[0],
            ),
            nn.BatchNorm2d(layer_size[0]),
            nn.ReLU(),
            nn.Conv2d(
                layer_size[0],
                layer_size[1],
                kernel_size=filter_size[1],
                stride=1,
                padding=padding_size[1],
            ),
            nn.BatchNorm2d(layer_size[1]),
            nn.ReLU(),
            nn.MaxPool2d(2),  # 512 -> 256
            nn.Conv2d(
                layer_size[1],
                layer_size[2],
                kernel_size=filter_size[2],
                stride=1,
                padding=padding_size[2],
            ),
            nn.BatchNorm2d(layer_size[2]),
            nn.ReLU(),
            nn.MaxPool2d(2),  # 256 -> 128
            nn.Conv2d(
                layer_size[2],
                layer_size[2],
                kernel_size=filter_size[2],
                stride=1,
                padding=padding_size[2],
            ),
            nn.BatchNorm2d(layer_size[2]),
            nn.ReLU(),
            nn.MaxPool2d(2),  # 128 -> 64
        )

        # (수정됨) 입력 채널을 1로 설정 (fft_2d가 그레이스케일 반환)
        self.magnitude_branch = nn.Sequential(
            nn.Conv2d(
                1,
                layer_size[0],
                kernel_size=filter_size[0],
                stride=1,
                padding=padding_size[0],
            ),
            nn.BatchNorm2d(layer_size[0]),
            nn.ReLU(),
            nn.Conv2d(
                layer_size[0],
                layer_size[1],
                kernel_size=filter_size[1],
                stride=1,
                padding=padding_size[1],
            ),
            nn.BatchNorm2d(layer_size[1]),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Conv2d(
                layer_size[1],
                layer_size[3],
                kernel_size=filter_size[2],
                stride=1,
                padding=padding_size[2],
            ),
            nn.BatchNorm2d(layer_size[3]),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Conv2d(
                layer_size[3],
                layer_size[3],
                kernel_size=filter_size[2],
                stride=1,
                padding=padding_size[2],
            ),
            nn.BatchNorm2d(layer_size[3]),
            nn.ReLU(),
            nn.MaxPool2d(2),
        )

        self.phase_branch = nn.Sequential(
            nn.Conv2d(
                1,
                layer_size[0],
                kernel_size=filter_size[0],
                stride=1,
                padding=padding_size[0],
            ),
            nn.BatchNorm2d(layer_size[0]),
            nn.ReLU(),
            nn.Conv2d(
                layer_size[0],
                layer_size[1],
                kernel_size=filter_size[1],
                stride=1,
                padding=padding_size[1],
            ),
            nn.BatchNorm2d(layer_size[1]),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Conv2d(
                layer_size[1],
                layer_size[3],
                kernel_size=filter_size[2],
                stride=1,
                padding=padding_size[2],
            ),
            nn.BatchNorm2d(layer_size[3]),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Conv2d(
                layer_size[3],
                layer_size[3],
                kernel_size=filter_size[2],
                stride=1,
                padding=padding_size[2],
            ),
            nn.BatchNorm2d(layer_size[3]),
            nn.ReLU(),
            nn.MaxPool2d(2),
        )

        fc_input_size = layer_size[2] + 2 * layer_size[3]

        # (수정됨) MaxPool(2)가 3번 있으므로 2^3 = 8로 나눔
        image_size = input_size // 8  # 64

        self.fc = nn.Sequential(
            nn.Linear(fc_input_size * image_size * image_size, 256),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(256, num_classes),
        )

    def forward(self, x):
        x_mag, x_pha = fft_2d(x)

        # (수정됨) 잘못된 nn.MaxPool2d() 호출 삭제
        # x_mag = nn.MaxPool2d(x_mag) <-- 오류 발생 (삭제)
        # x_pha = nn.MaxPool2d(x_pha) <-- 오류 발생 (삭제)
        # x = nn.MaxPool2d(x)         <-- 오류 발생 (삭제)

        f_rgb = self.rgb_branch(x)
        f_mag = self.magnitude_branch(x_mag)
        f_pha = self.phase_branch(x_pha)

        fused = torch.cat([f_rgb, f_mag, f_pha], dim=1)

        # (수정됨) FC 레이어에 입력하기 전 flatten 필요
        fused_flat = torch.flatten(fused, 1)

        output = self.fc(fused_flat)

        return output


# ####################################################################
# # (가정 2) 사용자가 정의한 전처리 함수 (Placeholder)
# ####################################################################


def single_file_processing(file_path: Path):
    pass


# ####################################################################
# # --- 메인 처리 로직 (베이스라인 + 커스텀 모델 적용) ---
# ####################################################################

if __name__ == "__main__":

    # 1. 추론 환경 경로 설정
    # (학습된 커스텀 모델의 .pth 또는 .pt 파일 경로로 수정)
    model_path = "./model/my_custom_model.pth"
    test_dataset_path = Path("./data")
    output_csv_path = Path("submission.csv")

    # 2. 모델 로드
    try:
        # (num_classes는 학습 시와 동일하게 설정)
        model = CNNClfWithFFT(num_classes=2).to("cuda")

        # (중요) 실제 추론 시에는 학습된 가중치를 로드해야 합니다.
        # 이 코드는 'pass' 요청에 따라 가중치 로드 없이 실행됩니다.
        if os.path.exists(model_path):
            model.load_state_dict(torch.load(model_path))
            print(f"Loaded weights from {model_path}")
        else:
            print(f"Warning: Model path '{model_path}' not found.")
            print("Using initialized weights (random).")

        model.eval()
        print("Custom model (CNNClfWithFFT) created and set to eval mode.")

    except Exception as e:
        print(f"Error loading model: {e}")
        print(
            f"Ensure '{model_path}' exists and 'CNNClfWithFFT' class is correctly defined."
        )
        exit()  # 모델 로드 실패 시 종료

    # 3. CSV 파일 헤더 작성
    with open(output_csv_path, mode="w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["filename", "label"])

    files = [p for p in sorted(test_dataset_path.iterdir()) if p.is_file()]
    print("Test data length:", len(files))

    # 4. CPU 코어 수 설정
    num_workers = min(max(1, multiprocessing.cpu_count() - 1), 8)
    print(f"Using {num_workers} worker processes for preprocessing.")

    # 5. 결과를 저장할 딕셔너리
    results_to_write = {}

    # 6. multiprocessing.Pool을 사용하여 파일 전처리를 병렬로 실행
    with multiprocessing.Pool(processes=num_workers) as pool:
        with tqdm(total=len(files), desc="Preprocessing files") as pbar:

            # 'single_file_processing' 함수를 호출
            for filename, image_obj, error in pool.imap_unordered(
                single_file_processing, files
            ):

                if error:
                    # 'Not Implemented' 에러는 'pass' 상태이므로 콘솔에 출력
                    if "Not Implemented" in error:
                        pbar.set_postfix_str(f"{filename}: Not Implemented")
                    else:
                        print(f"Error processing {filename}: {error}")

                if not image_obj:
                    # 전처리 실패 또는 텐서 미반환 시 레이블 0으로 저장
                    results_to_write[filename] = 0
                else:
                    # 7. 전처리 성공 시 GPU 추론 수행
                    try:
                        # (C, H, W) 텐서 리스트 -> (B, C, H, W) 배치 텐서
                        inputs = torch.stack(image_obj).to("cuda")

                        with torch.no_grad():
                            outputs = model(inputs)  # (B, num_classes)
                            logits = outputs
                            probs = F.softmax(logits, dim=1)
                            avg_probs = probs.mean(dim=0)  # 프레임 간 평균
                            predicted_class = torch.argmax(avg_probs).item()

                            results_to_write[filename] = predicted_class

                    except Exception as e:
                        print(f"Error during inference for {filename}: {e}")
                        results_to_write[filename] = 0

                pbar.update(1)  # 진행률 표시

    # 8. CSV에 결과 기록
    print("\nWriting results to CSV...")
    with open(output_csv_path, mode="a", newline="") as f:
        writer = csv.writer(f)
        for p in files:
            filename = p.name
            label = results_to_write.get(filename, 0)  # 결과 없으면 0
            writer.writerow([filename, label])

    print("Inference completed.")