In [None]:
# ============================================================
# FixB(12시=0°, CCW) → 12시=0°, CW 의 180° 반전(FixB_180) 파이프라인
# - raw CSV 보존 → 미리보기(as_is/fixB/fixB_180, 동일 샘플) → fixed CSV 생성 → 학습
# - ResNet-18 구조를 직접 레이어 설정하여 통합 (디버깅 용이성)
# ============================================================
import os, math, csv, glob, random, cv2, numpy as np
from PIL import Image

import torch
import torch.nn as nn # ResNet-18 직접 구현을 위해 추가
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as T
# torchvision.models는 직접 구현한 모델을 사용하므로 주석 처리하거나 제거
# import torchvision.models as tvm 
from datetime import datetime

# ------------------------------------
# 📌 ResNet-18 직접 구현 레이어 정의 (직접레이어소스코드 내용)
# ------------------------------------

def conv3x3(in_planes, out_planes, stride=1, groups=1, dilation=1):
    """3x3 convolution with padding"""
    return nn.Conv2d(in_planes, out_planes, kernel_size=3, stride=stride,
                     padding=dilation, groups=groups, bias=False, dilation=dilation)

def conv1x1(in_planes, out_planes, stride=1):
    """1x1 convolution for downsampling"""
    return nn.Conv2d(in_planes, out_planes, kernel_size=1, stride=stride, bias=False)


class BasicBlock(nn.Module):
    """
    ResNet-18의 기본 블록 (BasicBlock) 구조입니다.
    """
    expansion = 1

    def __init__(self, inplanes, planes, stride=1, downsample=None):
        super(BasicBlock, self).__init__()
        
        # 1. Conv1
        self.conv1 = conv3x3(inplanes, planes, stride)
        self.bn1 = nn.BatchNorm2d(planes)
        self.relu = nn.ReLU(inplace=True)
        
        # 2. Conv2
        self.conv2 = conv3x3(planes, planes)
        self.bn2 = nn.BatchNorm2d(planes)
        
        # Downsample 경로
        self.downsample = downsample
        self.stride = stride

    def forward(self, x):
        identity = x # 잔차 연결을 위한 입력 값 저장

        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)

        out = self.conv2(out)
        out = self.bn2(out)

        # Downsample 경로가 있다면 실행 (차원 맞추기)
        if self.downsample is not None:
            identity = self.downsample(x)

        # 잔차 연결 (Residual Connection)
        out += identity
        out = self.relu(out)

        return out


class ResNet(nn.Module):
    """
    ResNet-18 구조입니다.
    """
    def __init__(self, block, layers, num_classes=2):
        super(ResNet, self).__init__()
        self.inplanes = 64
        
        # 1. 초기 Convolution, BN, ReLU, MaxPool
        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        # 2. ResNet 스테이지 (layer1 ~ layer4)
        self.layer1 = self._make_layer(block, 64, layers[0]) 
        self.layer2 = self._make_layer(block, 128, layers[1], stride=2)
        self.layer3 = self._make_layer(block, 256, layers[2], stride=2)
        self.layer4 = self._make_layer(block, 512, layers[3], stride=2)
        
        # 3. 최종 분류 레이어: num_classes=2 (sin, cos)
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        # ResNet-18의 최종 출력 채널은 512 * BasicBlock.expansion = 512입니다.
        self.fc = nn.Linear(512 * block.expansion, num_classes) 

        # Weight Initialization (he_normal과 유사한 Kaiming Initialization)
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
            elif isinstance(m, (nn.BatchNorm2d, nn.GroupNorm)):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)
    
    def _make_layer(self, block, planes, blocks, stride=1):
        downsample = None
        if stride != 1 or self.inplanes != planes * block.expansion:
            downsample = nn.Sequential(
                conv1x1(self.inplanes, planes * block.expansion, stride),
                nn.BatchNorm2d(planes * block.expansion),
            )

        layers = []
        layers.append(block(self.inplanes, planes, stride, downsample))
        self.inplanes = planes * block.expansion
        
        for _ in range(1, blocks):
            layers.append(block(self.inplanes, planes))

        return nn.Sequential(*layers)

    def forward(self, x):
        # 1. 초기 레이어
        x = self.conv1(x)
        # 📌 디버깅 포인트: conv1 출력 크기 확인
        # print("After conv1:", x.shape) 
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)
        # 📌 디버깅 포인트: maxpool 출력 크기 확인
        # print("After maxpool:", x.shape) 

        # 2. 스테이지
        x = self.layer1(x)
        # print("After layer1:", x.shape)
        x = self.layer2(x)
        # print("After layer2:", x.shape)
        x = self.layer3(x)
        # print("After layer3:", x.shape)
        x = self.layer4(x)
        # print("After layer4:", x.shape)

        # 3. 최종 분류
        x = self.avgpool(x)
        # print("After avgpool:", x.shape)
        x = torch.flatten(x, 1)
        # print("After flatten:", x.shape)
        x = self.fc(x)

        return x

def resnet18_custom(num_classes=2):
    """
    직접 구현한 ResNet-18 (BasicBlock, [2, 2, 2, 2] 블록 구성)을 생성합니다.
    """
    # ResNet-18은 [2, 2, 2, 2] 블록 구성을 사용합니다.
    model = ResNet(BasicBlock, [2, 2, 2, 2], num_classes=num_classes)
    return model

# ------------------------------------
# 📌 수정할 소스코드
# ------------------------------------
# (생략된 상위 import, 경로 설정, CSV 정리/변환 함수는 그대로 유지)
# ...
# -----------------
# Dataset / Model
# -----------------
class DialDataset(Dataset):
# (DialDataset 클래스 내용은 그대로 유지)
# ...
    def __init__(self, csv_path, img_size=224, augment=True):
        self.items=[]
        with open(csv_path,"r",encoding="utf-8") as f:
            r=csv.reader(f); next(r)
            for fp, th, s, c in r:
                if os.path.isfile(fp):
                    self.items.append((fp,float(th),float(s),float(c)))
        if not self.items:
            raise RuntimeError(f"유효 항목 0: {csv_path}")
        augs=[]
        if augment: augs += [T.ColorJitter(brightness=0.2, contrast=0.2)]
        self.tfm=T.Compose(augs+[
            T.Resize((img_size,img_size)),
            T.ToTensor(),
            T.Lambda(lambda x: x.expand(3,-1,-1)),
            T.Normalize([0.485,0.456,0.406],[0.229,0.224,0.225]),
        ])
    def __len__(self): return len(self.items)
    def __getitem__(self,i):
        fp, th, s, c=self.items[i]
        # x=self.tfm(Image.open(fp).convert("L"))
        # 그레이스케일로 로드 후 3채널로 확장하는 로직이 T.Lambda에 있음
        x=self.tfm(Image.open(fp).convert("L"))
        y=torch.tensor([s,c],dtype=torch.float32)
        return x,y


class AngleHead(nn.Module): 
    """
    직접 구현한 ResNet-18을 Backbone으로 사용하도록 수정
    """
    def __init__(self, num_classes=2): # num_classes 인자 추가 (기존 ResNet 클래스에 맞추기 위해)
        super().__init__()
        # 📌 수정 사항: torchvision.models.resnet18 대신 직접 구현한 resnet18_custom 사용
        self.backbone = resnet18_custom(num_classes=num_classes) 
        
        # Note: resnet18_custom 내부에서 이미 self.fc = nn.Linear(512, num_classes)로 
        # 최종 레이어가 설정되었으므로, 별도로 fc 레이어를 교체할 필요가 없습니다.
        # 기존: self.backbone = tvm.resnet18(weights=None)
        # 기존: self.backbone.fc = nn.Linear(self.backbone.fc.in_features, 2)
        
    def forward(self,x):
        y=self.backbone(x)
        # 📌 중요: 기존 코드의 출력 정규화 로직 유지
        return y/(y.norm(dim=1,keepdim=True)+1e-8)

def angle_mae_deg(p,t):
    dot=(p*t).sum(dim=1).clamp(-1,1)
    ang=torch.acos(dot)*180.0/math.pi
    return ang.mean().item()

# -----------------
# 학습 (FIXED CSV 사용) - MSE 추적 추가
# -----------------
def train_r18(csv_tr=CSV_TRAIN_FIXED, csv_va=CSV_TEST_FIXED,
              img_size=224, epochs=30, batch=32, lr=3e-4):
    global timestamp
    # ... (생략된 학습 코드 내용은 그대로 유지)
    from torch.utils.tensorboard import SummaryWriter

    device=torch.device("cuda" if torch.cuda.is_available() else "cpu")
    ds_tr=DialDataset(csv_tr, img_size, augment=True)
    try:
        ds_va=DialDataset(csv_va, img_size, augment=False)
    except:
        print("⚠️ val CSV 없음 → train 15%를 val로 사용")
        n=len(ds_tr); k=max(1,int(n*0.15))
        ds_va=torch.utils.data.Subset(ds_tr, list(range(k)))
        ds_tr=torch.utils.data.Subset(ds_tr, list(range(k,n)))

    dl_tr=DataLoader(ds_tr, batch_size=batch, shuffle=True,  num_workers=0, pin_memory=True)
    dl_va=DataLoader(ds_va, batch_size=batch, shuffle=False, num_workers=0, pin_memory=True)

    LOG_DIR = os.path.join(ROOT, "runs", f"r18_fixb180_{timestamp}")
    writer = SummaryWriter(LOG_DIR)
    print(f"📊 TensorBoard 로그 디렉토리: {LOG_DIR}")

    # 📌 AngleHead 생성 시 num_classes=2 명시 (기본값)
    model=AngleHead(num_classes=2).to(device) 
    
    # 📌 디버깅 포인트: 모델 구조 확인 (직접 구현한 ResNet 레이어 확인)
    print("\n--- Custom ResNet-18 (AngleHead) Model Structure ---")
    print(model)
    print("--------------------------------------------------\n")
    
    # 📌 디버깅 포인트: 더미 입력으로 한 번 통과시켜 각 레이어 크기 확인
    try:
        dummy_input = torch.randn(batch, 3, img_size, img_size).to(device)
        dummy_output = model(dummy_input)
        print(f"✅ 모델 테스트 통과: 입력 {dummy_input.shape} → 출력 {dummy_output.shape}")
    except Exception as e:
        print(f"❌ 모델 테스트 실패: {e}")
        # 여기에 BasicBlock이나 ResNet.forward() 내부에 print(x.shape)를 넣어 디버깅 가능

    opt=torch.optim.AdamW(model.parameters(), lr=lr, weight_decay=1e-4)
    # # 기존: cos=nn.CosineEmbeddingLoss(); mse=nn.MSELoss()
    # # 수정: (reduction='mean'으로 명시)
    cos=nn.CosineEmbeddingLoss(reduction='mean')
    mse=nn.MSELoss(reduction='mean')
    sch=torch.optim.lr_scheduler.CosineAnnealingLR(opt, T_max=epochs)

    best=1e9
    best_ckpt_path = ""

    for ep in range(1,epochs+1):
        # ... (이후 학습 및 검증 루프 코드는 그대로 유지)
        # ========== Training ==========
        model.train()
        tr_total = 0.0
        tr_cos_sum = 0.0
        tr_mse_sum = 0.0
        
        for x,y in dl_tr:
            x,y=x.to(device), y.to(device)
            p=model(x)
            tgt=torch.ones(p.size(0), device=device)
            
            loss_cos = cos(p,y,tgt)
            loss_mse = mse(p,y)
            loss = loss_cos + 0.1*loss_mse
            
            opt.zero_grad(); loss.backward(); opt.step()
            
            tr_total += loss.item()*x.size(0)
            tr_cos_sum += loss_cos.item()*x.size(0)
            tr_mse_sum += loss_mse.item()*x.size(0)
        
        tr_total /= len(dl_tr.dataset)
        tr_cos = tr_cos_sum / len(dl_tr.dataset)
        tr_mse = tr_mse_sum / len(dl_tr.dataset)
        sch.step()

        # ========== Validation ==========
        model.eval()
        va_total = 0.0
        va_cos_sum = 0.0
        va_mse_sum = 0.0
        mae = 0.0
        n = 0
        
        with torch.no_grad():
            for x,y in dl_va:
                x,y=x.to(device), y.to(device)
                p=model(x)
                tgt=torch.ones(p.size(0), device=device)
                
                loss_cos = cos(p,y,tgt)
                loss_mse = mse(p,y)
                loss = loss_cos + 0.1*loss_mse
                
                va_total += loss.item()*x.size(0)
                va_cos_sum += loss_cos.item()*x.size(0)
                va_mse_sum += loss_mse.item()*x.size(0)
                mae += angle_mae_deg(p,y)*x.size(0)
                n += x.size(0)
        
        va_total /= max(1,n)
        va_cos = va_cos_sum / max(1,n)
        va_mse = va_mse_sum / max(1,n)
        mae /= max(1,n)
        
        # ========== Console Output ==========
        print(f"[{ep:02d}] train {tr_total:.4f} (cos:{tr_cos:.4f}, mse:{tr_mse:.4f}) | "
              f"val {va_total:.4f} (cos:{va_cos:.4f}, mse:{va_mse:.4f}) | val-MAE {mae:.2f}°")

        # ========== TensorBoard Logging ==========
        writer.add_scalar('Loss/Train_Total', tr_total, ep)
        writer.add_scalar('Loss/Train_Cosine', tr_cos, ep)
        writer.add_scalar('Loss/Train_MSE', tr_mse, ep)
        
        writer.add_scalar('Loss/Validation_Total', va_total, ep)
        writer.add_scalar('Loss/Validation_Cosine', va_cos, ep)
        writer.add_scalar('Loss/Validation_MSE', va_mse, ep)
        
        writer.add_scalar('Metrics/Validation_MAE_Deg', mae, ep)
        writer.add_scalar('Learning_Rate', opt.param_groups[0]['lr'], ep)

        timestamp_save = datetime.now().strftime("%Y%m%d_%H%M%S")
        if mae<best:
            best = mae
            actual_filename = f"notimganet_best-{timestamp_save}-mae{mae:.2f}.pth"
            save_path = os.path.join(CKPT_DIR, actual_filename)
            torch.save(model.state_dict(), save_path)
            best_ckpt_path = save_path

    writer.close()
    print(f"✅ best val-MAE = {best:.2f}°  → {best_ckpt_path}")
    return best_ckpt_path

best_path = train_r18(epochs=30, batch=32, lr=3e-4)
print("BEST:", best_path)

# -----------------
# 추론 예시(오버레이) – FIXED 기준
# -----------------
# (이후 추론 코드는 AngleHead 클래스가 수정되었으므로 그대로 사용 가능)
# ...


# -----------------
# 추론 예시(오버레이) – FIXED 기준
# -----------------
def load_for_infer(ckpt):
    device=torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model=AngleHead().to(device)
    sd=torch.load(ckpt, map_location=device)
    model.load_state_dict(sd, strict=True); model.eval()
    tfm=T.Compose([
        T.Resize((224,224)),
        T.ToTensor(),
        T.Lambda(lambda x: x.expand(3,-1,-1)),
        T.Normalize([0.485,0.456,0.406],[0.229,0.224,0.225]),
    ])
    return model, tfm, device

def infer_one(model, tfm, device, img_path, theta_zero_rad=0.0, ticks_per_rev=100, mm_per_rev=1.0):
    x=tfm(Image.open(img_path).convert("L")).unsqueeze(0).to(device)
    with torch.no_grad():
        y=model(x)[0].cpu().numpy()
    sinp,cosp=float(y[0]), float(y[1])
    theta=(math.atan2(sinp,cosp))%(2*math.pi)
    delta=(theta-theta_zero_rad)%(2*math.pi)
    ticks=delta/(2*math.pi)*ticks_per_rev
    mm   =delta/(2*math.pi)*mm_per_rev
    return theta, ticks, mm

def save_overlay(img_path, theta_deg, value_mm, out_path):
    g=cv2.imread(img_path, cv2.IMREAD_GRAYSCALE)
    h,w=g.shape[:2]; cx,cy=w//2,h//2; R=int(min(h,w)*0.45)
    th=math.radians(theta_deg)
    x2=int(round(cx+math.cos(th-math.pi/2)*R))
    y2=int(round(cy+math.sin(th-math.pi/2)*R))
    vis=cv2.cvtColor(g,cv2.COLOR_GRAY2BGR)
    cv2.arrowedLine(vis,(cx,cy),(x2,y2),(0,255,0),2,cv2.LINE_AA,tipLength=0.06)
    cv2.putText(vis,f"{value_mm:.3f} mm",(10,30),
                cv2.FONT_HERSHEY_SIMPLEX,0.9,(0,255,0),2,cv2.LINE_AA)
    cv2.imwrite(out_path,vis)

pngs=glob.glob(os.path.join(TEST_DIR,"*.png")) or glob.glob(os.path.join(TRAIN_DIR,"*.png"))
if pngs:
    model,tfm,device=load_for_infer(best_path)
    sample=pngs[0]
    theta_zero_deg=0.0
    theta,ticks,mm=infer_one(model,tfm,device,sample,
                              theta_zero_rad=math.radians(theta_zero_deg),
                              ticks_per_rev=100, mm_per_rev=1.0)
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    out_prev=os.path.join(DATASET_DIR,f"_infer_notimganet_preview({timestamp}).jpg")
    save_overlay(sample, theta*180/math.pi, mm, out_prev)
    print(f"🔎 예측: θ={theta*180/math.pi:.2f}°, value={mm:.3f} mm")
    print("🖼  저장:", out_prev)
else:
    print("⚠️ 샘플 이미지 없음")

    

