# 파일 구조/ 네이밍
uploads/
  {index}/
    before_front.jpg
    after_front.jpg
    before_rear.jpg
    after_rear.jpg
    ...
    before_left_side.jpg
    after_left_side.jpg
(한 대당 총 6개 view: front, rear, left, right, left_side, right_side — view마다 before/after 1쌍이므로 사진은 총 12장)

* “기사 개인을 식별하는 게 아니라,
서버가 발급한 job_id 기준으로 사진을 묶고
before/after가 둘 다 있으면
Python AI 모듈(YOLO + severity)을 호출해서
결과 JSON을 반환하면 됩니다.”
* before / after 이미지 경로는 서버 로컬 경로

* 이미지 해상도: 1080p 이상

* 한 view당 before/after 1쌍

# severity 모델 학습 코드

In [16]:
import os, glob, re
import numpy as np
import pandas as pd
from PIL import Image

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader, WeightedRandomSampler
import torchvision.transforms as T
import torchvision.models as models

# ======================
# 설정
# ======================
ROOT = r"C:\Users\woain\Python_AI\Project_Car\car"
LABEL_TRAIN = r"C:\Users\woain\Python_AI\Project_Car\car\labels_train.csv"
IMAGE_SIZE = 224
BATCH_SIZE = 8
EPOCHS = 25
LR = 1e-4
NUM_WORKERS = 0

DIFF_SCALE = 2.0
ALPHA_MAX = 0.6   # max 가중치
ALPHA_MEAN = 0.4  # mean 가중치

VIEWS = ["front","rear","left","right","left_side","right_side"]
EXTS  = [".png",".jpg",".jpeg",".PNG",".JPG",".JPEG"]

device = "cuda" if torch.cuda.is_available() else "cpu"
print("device:", device)

# ======================
# 유틸
# ======================
def carnum(cid: str) -> int:
    """Return the first run of decimal digits found in ``cid`` as an int.

    ``cid`` is converted with ``str()`` first, so non-string ids are accepted.

    Raises:
        ValueError: if ``cid`` contains no digit at all.
    """
    digits = re.search(r"(\d+)", str(cid))
    if digits is not None:
        return int(digits.group(1))
    raise ValueError(f"car_id에서 숫자를 못 찾음: {cid}")

def norm_car_id(cid: str) -> str:
    """Normalize a car id to ``carNNN`` (number zero-padded to at least 3 digits).

    The number is extracted by ``carnum``, which raises ValueError when the
    id contains no digits.
    """
    return f"car{carnum(cid):03d}"

def find_path(phase: str, cid: str, view: str) -> str:
    """Locate the image file of one car/view inside ``ROOT/<phase>``.

    Lookup order:
      1. exact name ``<cid>_<view><ext>`` for each extension in ``EXTS``;
      2. suffixed name ``<cid>_<view>*<ext>`` (e.g. ``car092_left_side_aug3.png``).

    Args:
        phase: sub-folder of ``ROOT`` to search (the callers in this file
            pass "before" / "after").
        cid: normalized car id used as the file-name prefix.
        view: view name (one of ``VIEWS``).

    Returns:
        Path of the matching file. When several suffixed files match, the
        alphabetically first one is returned so the choice is reproducible.

    Raises:
        FileNotFoundError: if no file matches.
    """
    folder = os.path.join(ROOT, phase)
    stem = f"{cid}_{view}"

    # 1) Exact file name.
    for ext in EXTS:
        p = os.path.join(folder, stem + ext)
        if os.path.exists(p):
            return p

    # 2) Suffixed file name. A plain prefix glob for view "left" would also
    #    match "left_side" files, so candidates that actually belong to a
    #    longer view name sharing this prefix are rejected.
    longer_views = [v for v in VIEWS if v != view and v.startswith(view)]
    pattern = os.path.join(folder, stem + "*")
    # glob order is arbitrary; sort so the same file is picked on every run.
    for m in sorted(glob.glob(pattern)):
        name, ext = os.path.splitext(os.path.basename(m))
        if ext not in EXTS:
            continue
        if any(name.startswith(f"{cid}_{lv}") for lv in longer_views):
            continue
        return m

    raise FileNotFoundError(f"missing {phase}/{cid}_{view}.*")

# ======================
# Dataset (뷰별 반환)
# ======================
class ViewwiseCarDataset(Dataset):
    """Per-car dataset that returns all views of one car as a single sample.

    The label CSV must provide ``car_id`` and ``score`` columns. Car ids are
    normalized with ``norm_car_id``; images are looked up with ``find_path``
    under the "before" and "after" phases.

    Each item is ``(x6, mask6, y, cid)``:
      * ``x6``    - float tensor [6, 9, H, W]; per view the channels are
                    before RGB, after RGB and ``|after - before| * diff_scale``.
      * ``mask6`` - bool tensor [6]; False where the before or after image of
                    that view is missing (the slot is then all zeros).
      * ``y``     - float32 scalar tensor holding the car's score.
      * ``cid``   - normalized car id string.
    """

    def __init__(self, label_csv: str, image_size=224, diff_scale=2.0):
        self.df = pd.read_csv(label_csv).copy()
        self.df["car_id"] = self.df["car_id"].astype(str).apply(norm_car_id)
        self.df["score"] = self.df["score"].astype(float)

        self.car_ids = self.df["car_id"].tolist()
        self.y = self.df["score"].values.astype(np.float32)

        # Kept on the instance so the zero placeholder for missing views
        # always matches the size produced by the resize transform.
        self.image_size = image_size
        self.diff_scale = float(diff_scale)
        self.tf = T.Compose([T.Resize((image_size, image_size)), T.ToTensor()])

    def __len__(self):
        return len(self.car_ids)

    def _load(self, phase: str, cid: str, view: str):
        """Load one image as a [3, H, W] tensor, closing the file afterwards."""
        with Image.open(find_path(phase, cid, view)) as img:
            return self.tf(img.convert("RGB"))

    def __getitem__(self, idx):
        cid = self.car_ids[idx]
        y = torch.tensor(self.y[idx], dtype=torch.float32)

        xs, mask = [], []
        for v in VIEWS:
            try:
                b = self._load("before", cid, v)
                a = self._load("after", cid, v)
            except FileNotFoundError:
                # Missing view: zero placeholder, flagged as invalid in the mask.
                xs.append(torch.zeros(9, self.image_size, self.image_size,
                                      dtype=torch.float32))
                mask.append(0)
                continue

            d = (a - b).abs()
            x = torch.cat([b, a, d * self.diff_scale], dim=0)  # [9,H,W]
            xs.append(x)
            mask.append(1)

        x6 = torch.stack(xs, dim=0)  # [6,9,H,W]
        mask6 = torch.tensor(mask, dtype=torch.bool)  # [6]
        if not mask6.any():
            raise FileNotFoundError(f"No matched views for {cid}")

        return x6, mask6, y, cid

# ======================
# Model (ResNet18, 9채널)
# ======================
class CNNRegressor9ch(nn.Module):
    """ResNet-18 regressor with a 9-channel stem and a single scalar output.

    The ImageNet-pretrained RGB stem filters are copied into each of the
    three 3-channel groups of the new 9-channel stem and scaled by 3/9.
    """

    def __init__(self):
        super().__init__()
        backbone = models.resnet18(weights=models.ResNet18_Weights.IMAGENET1K_V1)

        stem = nn.Conv2d(9, 64, kernel_size=7, stride=2, padding=3, bias=False)
        with torch.no_grad():
            rgb_weight = backbone.conv1.weight
            # Same pretrained filters for every 3-channel group.
            for start in (0, 3, 6):
                stem.weight[:, start:start + 3] = rgb_weight
            stem.weight *= (3/9)

        backbone.conv1 = stem
        # Swap the classification head for a one-unit regression head.
        backbone.fc = nn.Linear(backbone.fc.in_features, 1)
        self.model = backbone

    def forward(self, x):
        out = self.model(x)      # [N,1]
        return out.squeeze(1)    # [N]

# ======================
# 차량 점수 집계: max+mean (mask 포함)
# ======================
def pool_car_score(pred_views, mask6, alpha_max=0.6, alpha_mean=0.4):
    """Aggregate per-view predictions into one score per car.

    The car score is ``alpha_max * max + alpha_mean * mean``, where both
    statistics are taken over the valid (mask == True) views only.

    Args:
        pred_views: float tensor [B, 6] of per-view predictions.
        mask6: bool tensor [B, 6]; False marks a missing view.
        alpha_max: weight of the per-car maximum.
        alpha_mean: weight of the per-car mean.

    Returns:
        Float tensor [B]. A row without any valid view yields 0.0 rather
        than leaking the internal -1e9 sentinel into the score.
    """
    missing = ~mask6
    cnt = mask6.sum(dim=1)

    # Mean over valid views (clamp avoids division by zero on empty rows).
    pred_mean = pred_views.masked_fill(missing, 0.0).sum(dim=1) / cnt.clamp(min=1)

    # Max over valid views; -1e9 keeps missing views from ever winning.
    pred_max = pred_views.masked_fill(missing, -1e9).max(dim=1).values
    # With no valid view the max would be the sentinel itself; neutralize it.
    pred_max = pred_max.masked_fill(cnt == 0, 0.0)

    return alpha_max * pred_max + alpha_mean * pred_mean  # [B]

# ======================
# DataLoader (불균형 보정)
# ======================
# Build the training dataset (one sample per car, all views stacked).
dataset = ViewwiseCarDataset(LABEL_TRAIN, image_size=IMAGE_SIZE, diff_scale=DIFF_SCALE)

# Re-read the label CSV to derive per-sample sampling weights. Rows are in
# the same order as the dataset, which is built from the same file.
df_train = pd.read_csv(LABEL_TRAIN)
# Scores are truncated to integers and used as class ids for balancing.
labels_int = df_train["score"].astype(int).values
# minlength=5 guarantees at least 5 bins (presumably scores 0..4 - confirm).
counts = np.bincount(labels_int, minlength=5)
# Inverse-frequency class weights; the epsilon avoids division by zero for
# classes that never occur.
class_w = 1.0 / (counts + 1e-6)
sample_w = class_w[labels_int]

# Draw 4x the dataset size per epoch, with replacement, according to the
# inverse-frequency weights.
sampler = WeightedRandomSampler(sample_w, num_samples=len(sample_w)*4, replacement=True)
loader = DataLoader(dataset, batch_size=BATCH_SIZE, sampler=sampler, num_workers=NUM_WORKERS)

print("train size:", len(dataset))
print("score counts:", counts)

# ======================
# Train
# ======================
# Model, optimizer and loss for car-level score regression.
model = CNNRegressor9ch().to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=LR)
criterion = nn.SmoothL1Loss()

# Average training loss per epoch (stored in the checkpoint later).
loss_history = []
for epoch in range(EPOCHS):
    model.train()
    # Running sum of (batch loss * batch size) and number of samples seen.
    total, n = 0.0, 0

    for x6, mask6, y, _cid in loader:
        B = x6.size(0)
        x6 = x6.to(device)
        mask6 = mask6.to(device)
        y = y.to(device)

        # Fold the view axis into the batch axis: [B,6,9,H,W] -> [B*6,9,H,W].
        # Zero-filled placeholders of missing views are forwarded too; they
        # are excluded from the car score below through mask6.
        # NOTE(review): in train mode those placeholders presumably still
        # feed the BatchNorm batch statistics - confirm this is acceptable.
        x_flat = x6.view(B*6, 9, IMAGE_SIZE, IMAGE_SIZE)
        pred_views = model(x_flat).view(B, 6)

        # Car-level prediction = weighted max + mean over the valid views.
        pred_car = pool_car_score(pred_views, mask6, ALPHA_MAX, ALPHA_MEAN)

        loss = criterion(pred_car, y)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Weight by batch size so the epoch average is per sample.
        total += loss.item() * B
        n += B

    avg = total / max(1, n)
    loss_history.append(avg)
    print(f"[Epoch {epoch+1}/{EPOCHS}] loss={avg:.4f}")

# 저장
# The checkpoint path is relative to the current working directory.
os.makedirs("car/checkpoints", exist_ok=True)
# Store the pooling weights and diff scale alongside the model weights so
# inference can rebuild the same input and aggregation.
torch.save(
    {"model": model.state_dict(),
     "loss_history": loss_history,
     "alpha_max": ALPHA_MAX,
     "alpha_mean": ALPHA_MEAN,
     "diff_scale": DIFF_SCALE},
    "car/checkpoints/regressor_view_mixpool.pth"
)
print("Saved -> car/checkpoints/regressor_view_mixpool.pth")

device: cpu
train size: 67
score counts: [16 10 14 16 11]
[Epoch 1/25] loss=0.6382
[Epoch 2/25] loss=0.4094
[Epoch 3/25] loss=0.2754
[Epoch 4/25] loss=0.2058
[Epoch 5/25] loss=0.1884
[Epoch 6/25] loss=0.1151
[Epoch 7/25] loss=0.1311
[Epoch 8/25] loss=0.0918
[Epoch 9/25] loss=0.0751
[Epoch 10/25] loss=0.0578
[Epoch 11/25] loss=0.0354
[Epoch 12/25] loss=0.0351
[Epoch 13/25] loss=0.0272
[Epoch 14/25] loss=0.0340
[Epoch 15/25] loss=0.0271
[Epoch 16/25] loss=0.0268
[Epoch 17/25] loss=0.0511
[Epoch 18/25] loss=0.0287
[Epoch 19/25] loss=0.0196
[Epoch 20/25] loss=0.0247
[Epoch 21/25] loss=0.0183
[Epoch 22/25] loss=0.0148
[Epoch 23/25] loss=0.0176
[Epoch 24/25] loss=0.0168
[Epoch 25/25] loss=0.0127
Saved -> car/checkpoints/regressor_view_mixpool.pth


In [17]:
import os
print("CWD =", os.getcwd())



CWD = C:\Users\woain\Python_AI\Project_Car\pipeline


In [18]:
pip install openpyxl


Collecting openpyxl
  Downloading openpyxl-3.1.5-py2.py3-none-any.whl.metadata (2.5 kB)
Collecting et-xmlfile (from openpyxl)
  Downloading et_xmlfile-2.0.0-py3-none-any.whl.metadata (2.7 kB)
Downloading openpyxl-3.1.5-py2.py3-none-any.whl (250 kB)
Downloading et_xmlfile-2.0.0-py3-none-any.whl (18 kB)
Installing collected packages: et-xmlfile, openpyxl

   -------------------- ------------------- 1/2 [openpyxl]
   -------------------- ------------------- 1/2 [openpyxl]
   -------------------- ------------------- 1/2 [openpyxl]
   ---------------------------------------- 2/2 [openpyxl]

Successfully installed et-xmlfile-2.0.0 openpyxl-3.1.5
Note: you may need to restart the kernel to use updated packages.


# 단일 index 처리 함수 + 여러 index 배치 처리까지 포함한 "심각도 판정" 
* 저장은 json으로 저장

In [27]:
import os, json, argparse
from typing import Optional, List, Dict
import numpy as np
from PIL import Image

import torch
import torch.nn as nn
import torchvision.transforms as T
import torchvision.models as models


# ----------------------------
# 설정
# ----------------------------
VIEWS = ["front","rear","left","right","left_side","right_side"]
EXTS  = [".png",".jpg",".jpeg",".PNG",".JPG",".JPEG"]

IMAGE_SIZE = 224
DEFAULT_DEVICE = os.getenv("SEVERITY_DEVICE", "cuda" if torch.cuda.is_available() else "cpu")
DEFAULT_CHECKPOINT = os.getenv("SEVERITY_CHECKPOINT", r"C:\Users\woain\Python_AI\Project_Car\pipeline\car\checkpoints\regressor_view_mixpool.pth")
DEFAULT_UPLOAD_ROOT = os.getenv("UPLOAD_ROOT", "uploads")   # uploads/{index}/before_front.jpg ...
DEFAULT_RESULT_ROOT = os.getenv("RESULT_ROOT", "results")   # results/{index}/severity.json 저장


# ----------------------------
# 유틸: 파일 찾기
# ----------------------------
def find_file(folder: str, name_base: str):
    """Return the first existing ``folder/name_base<ext>`` or None.

    Extensions are tried in the order they appear in ``EXTS``.
    """
    candidates = (os.path.join(folder, name_base + ext) for ext in EXTS)
    return next((path for path in candidates if os.path.exists(path)), None)


# ----------------------------
# 모델 (네 코드 그대로)
# ----------------------------
class CNNRegressor9ch(nn.Module):
    """ResNet-18 regressor with a 9-channel stem and a single scalar output.

    Args:
        pretrained: when True (default, original behavior) the backbone is
            created with ImageNet weights and the RGB stem filters are
            copied into each 3-channel group of the 9-channel stem, scaled
            by 3/9. Pass False to skip fetching the ImageNet weights, e.g.
            when a trained ``state_dict`` is loaded right after construction
            and would overwrite every parameter anyway.
    """

    def __init__(self, pretrained: bool = True):
        super().__init__()
        weights = models.ResNet18_Weights.IMAGENET1K_V1 if pretrained else None
        base = models.resnet18(weights=weights)

        old = base.conv1
        new = nn.Conv2d(9, 64, kernel_size=7, stride=2, padding=3, bias=False)

        if pretrained:
            # Reuse the pretrained RGB filters for all three channel groups.
            with torch.no_grad():
                w = old.weight
                new.weight[:, 0:3] = w
                new.weight[:, 3:6] = w
                new.weight[:, 6:9] = w
                new.weight *= (3/9)

        base.conv1 = new
        # Swap the classification head for a one-unit regression head.
        base.fc = nn.Linear(base.fc.in_features, 1)
        self.model = base

    def forward(self, x):
        return self.model(x).squeeze(1)  # [N]


def pool_car_score(pred_views, mask6, alpha_max=0.6, alpha_mean=0.4):
    """Aggregate per-view predictions into one score per car.

    The car score is ``alpha_max * max + alpha_mean * mean``, where both
    statistics are taken over the valid (mask == True) views only.

    Args:
        pred_views: float tensor [B, 6] of per-view predictions.
        mask6: bool tensor [B, 6]; False marks a missing view.
        alpha_max: weight of the per-car maximum.
        alpha_mean: weight of the per-car mean.

    Returns:
        Float tensor [B]. A row without any valid view yields 0.0 rather
        than leaking the internal -1e9 sentinel into the score.
    """
    missing = ~mask6
    cnt = mask6.sum(dim=1)

    # Mean over valid views (clamp avoids division by zero on empty rows).
    pred_mean = pred_views.masked_fill(missing, 0.0).sum(dim=1) / cnt.clamp(min=1)

    # Max over valid views; -1e9 keeps missing views from ever winning.
    pred_max = pred_views.masked_fill(missing, -1e9).max(dim=1).values
    # With no valid view the max would be the sentinel itself; neutralize it.
    pred_max = pred_max.masked_fill(cnt == 0, 0.0)

    return alpha_max * pred_max + alpha_mean * pred_mean  # [B]


def suppress_overprediction(pred_views, mask6, pred_mix,
                            max_th=3.4, top2_mean_th=3.0, mix_th=3.2):
    """Cap the pooled score below 3.5 unless the per-view evidence is strong.

    The pooled score ``pred_mix`` passes through unchanged only when all
    three hold: the best valid view is >= ``max_th``, the mean of the two
    highest views is >= ``top2_mean_th``, and ``pred_mix`` is >= ``mix_th``.
    Otherwise it is clamped to at most 3.49, so rounding gives at most 3.

    NOTE(review): missing views are filled with -1e9 before the top-2 mean,
    so with fewer than two valid views the gate can never pass. Confirm
    that this is the intended policy.
    """
    scores = pred_views.masked_fill(~mask6, -1e9)
    best_view = scores.max(dim=1).values
    top2_mean = torch.topk(scores, k=2, dim=1).values.mean(dim=1)

    strong_peak = best_view >= max_th
    corroborated = top2_mean >= top2_mean_th
    high_mix = pred_mix >= mix_th
    allow_4 = strong_peak & corroborated & high_mix

    capped = torch.clamp(pred_mix, max=3.49)
    return torch.where(allow_4, pred_mix, capped)


# ----------------------------
# ✅ 단일 index 심각도 판정
# ----------------------------
def infer_severity_for_index(index: str,
                             upload_root: str = DEFAULT_UPLOAD_ROOT,
                             result_root: str = DEFAULT_RESULT_ROOT,
                             checkpoint_path: str = DEFAULT_CHECKPOINT,
                             device: "Optional[str]" = None):
    """Compute the damage severity (0-4) for one upload folder.

    Reads ``<upload_root>/<index>/before_<view>.*`` and ``after_<view>.*``
    for every view in ``VIEWS``, runs the 9-channel regressor on each
    complete before/after pair, pools the per-view scores, applies the
    diff-based guard rules, writes ``<result_root>/<index>/severity.json``
    and finally hands that JSON file to
    ``excel_updater.update_from_severity_json``.

    Per the original author's note, that helper writes the severity into
    the accident-score column (J) of ``drivers_2026.xlsx``; this is not
    verifiable from this file.

    Args:
        index: job id, i.e. the folder name under ``upload_root``.
        upload_root: root folder holding one sub-folder per job.
        result_root: root folder where ``<index>/severity.json`` is written.
        checkpoint_path: checkpoint with a ``"model"`` state dict and
            optional ``alpha_max`` / ``alpha_mean`` / ``diff_scale`` entries.
        device: torch device string; ``DEFAULT_DEVICE`` when None.

    Returns:
        dict with keys ``index``, ``severity`` (int), ``raw_score`` (float),
        ``diff_mean`` (float), ``views_used`` and ``missing_views``.

    Raises:
        FileNotFoundError: upload folder or checkpoint is missing, or no
            view has both a before and an after image.
        KeyError: the checkpoint has no ``"model"`` entry.
    """
    folder = os.path.join(upload_root, str(index))
    if not os.path.isdir(folder):
        raise FileNotFoundError(f"Missing upload folder: {folder}")

    if device is None:
        device = DEFAULT_DEVICE

    # 1) Load the model.
    if not os.path.exists(checkpoint_path):
        raise FileNotFoundError(f"Missing checkpoint: {checkpoint_path}")

    # NOTE: torch.load unpickles the file; only load checkpoints from a
    # trusted location.
    ckpt = torch.load(checkpoint_path, map_location=device)
    if "model" not in ckpt:
        raise KeyError('Checkpoint에 "model" 키가 없습니다. (ckpt["model"] expected)')
    alpha_max  = ckpt.get("alpha_max", 0.6)
    alpha_mean = ckpt.get("alpha_mean", 0.4)
    diff_scale = ckpt.get("diff_scale", 2.0)

    model = CNNRegressor9ch().to(device)
    model.load_state_dict(ckpt["model"])
    model.eval()

    tf = T.Compose([T.Resize((IMAGE_SIZE, IMAGE_SIZE)), T.ToTensor()])

    # 2) Build the per-view input (before, after, scaled |after - before|).
    xs = []
    mask = []
    diff_means = []

    for v in VIEWS:
        b_path = find_file(folder, f"before_{v}")
        a_path = find_file(folder, f"after_{v}")

        if (b_path is None) or (a_path is None):
            # Incomplete pair: zero placeholder, excluded through the mask.
            xs.append(torch.zeros(9, IMAGE_SIZE, IMAGE_SIZE))
            mask.append(False)
            diff_means.append(0.0)
            continue

        # Context managers release the file handles right after decoding.
        with Image.open(b_path) as b_img, Image.open(a_path) as a_img:
            b = tf(b_img.convert("RGB"))
            a = tf(a_img.convert("RGB"))
        d = (a - b).abs()

        x = torch.cat([b, a, d * float(diff_scale)], dim=0)  # [9,H,W]
        xs.append(x)
        mask.append(True)
        diff_means.append(float(d.mean().item()))

    if sum(mask) == 0:
        raise FileNotFoundError(f"No valid before/after view pairs in: {folder}")

    # Mean absolute pixel difference, averaged over the valid views only.
    dm = float(np.mean([m for m, ok in zip(diff_means, mask) if ok]))

    x6 = torch.stack(xs, dim=0).unsqueeze(0).to(device)                     # [1,6,9,H,W]
    mask6 = torch.tensor(mask, dtype=torch.bool).unsqueeze(0).to(device)    # [1,6]

    # 3) Predict.
    with torch.no_grad():
        x_flat = x6.view(6, 9, IMAGE_SIZE, IMAGE_SIZE)   # [6,9,H,W]
        pred_views = model(x_flat).view(1, 6)            # [1,6]

        # Pool the views, then gate over-predictions of the top class.
        pred_mix = pool_car_score(pred_views, mask6, alpha_max, alpha_mean)  # [1]
        pred_final = suppress_overprediction(pred_views, mask6, pred_mix,
                                             max_th=3.4, top2_mean_th=3.0, mix_th=3.2)
        pred_final = torch.clamp(pred_final, 0, 4)

        # Diff-based rules: a very small image difference caps the score.
        ZERO_DIFF_TH = 1e-5
        TINY_DIFF_TH = 5e-5
        LOW_DIFF_TH  = 2e-4

        dm_t = torch.tensor([dm], device=device, dtype=torch.float32)

        pred_final = torch.where(dm_t < ZERO_DIFF_TH, torch.zeros_like(pred_final), pred_final)
        pred_final = torch.where((dm_t >= ZERO_DIFF_TH) & (dm_t < TINY_DIFF_TH),
                                 torch.clamp(pred_final, max=1.0), pred_final)
        pred_final = torch.where((dm_t >= TINY_DIFF_TH) & (dm_t < LOW_DIFF_TH),
                                 torch.clamp(pred_final, max=2.0), pred_final)

        # Rounding to 4 (resp. 3) requires a minimum amount of image change.
        FOUR_MIN_DIFF = 5e-4
        pred_final = torch.where((pred_final >= 3.5) & (dm_t < FOUR_MIN_DIFF),
                                 torch.clamp(pred_final, max=3.49), pred_final)

        THREE_MIN_DIFF = 5e-4
        pred_final = torch.where((pred_final >= 2.5) & (dm_t < THREE_MIN_DIFF),
                                 torch.clamp(pred_final, max=2.49), pred_final)

        pred_round = torch.clamp(torch.round(pred_final), 0, 4).int()

        # Promotion rule 3 -> 4: large image change plus a high raw score.
        PROMOTE_4_DIFF_TH = 0.005
        PROMOTE_4_RAW_TH  = 2.8
        pred_round = torch.where(
            (pred_round == 3) & (dm_t >= PROMOTE_4_DIFF_TH) & (pred_final >= PROMOTE_4_RAW_TH),
            torch.tensor([4], device=device, dtype=torch.int32),
            pred_round
        )

    severity = int(pred_round.item())
    raw_score = float(pred_final.item())

    # 4) Persist the result as JSON.
    out_dir = os.path.join(result_root, str(index))
    os.makedirs(out_dir, exist_ok=True)
    out_json = os.path.join(out_dir, "severity.json")

    result = {
        "index": str(index),
        "severity": severity,
        "raw_score": raw_score,
        "diff_mean": dm,
        "views_used": [v for v, ok in zip(VIEWS, mask) if ok],
        "missing_views": [v for v, ok in zip(VIEWS, mask) if not ok],
    }

    with open(out_json, "w", encoding="utf-8") as f:
        json.dump(result, f, ensure_ascii=False, indent=2)

    # 5) Propagate the score to the Excel sheet. Imported lazily so the
    #    module can be loaded without the project-local helper present.
    from excel_updater import update_from_severity_json
    update_from_severity_json(out_json)

    return result


# ----------------------------
# ✅ 여러 index 배치 처리(선택)
# ----------------------------
def infer_all_ready_indexes(upload_root: str = DEFAULT_UPLOAD_ROOT,
                            result_root: str = DEFAULT_RESULT_ROOT,
                            checkpoint_path: str = DEFAULT_CHECKPOINT,
                            device: "Optional[str]" = None):
    """Run severity inference for every ready job folder under ``upload_root``.

    A folder is ready when at least one view in ``VIEWS`` has both a
    ``before_<view>`` and an ``after_<view>`` image. Folders are processed
    in sorted name order. A failure in one folder is printed and skipped so
    the remaining folders are still processed.

    Args:
        upload_root: root folder holding one sub-folder per job.
        result_root: forwarded to ``infer_severity_for_index``.
        checkpoint_path: forwarded to ``infer_severity_for_index``.
        device: forwarded to ``infer_severity_for_index``.

    Returns:
        List of result dicts, one per successfully processed folder.
    """
    results = []
    for idx in sorted(os.listdir(upload_root)):
        folder = os.path.join(upload_root, idx)
        if not os.path.isdir(folder):
            continue

        # Require a real before/after pair of the same view; unrelated
        # before_* and after_* files do not make a folder ready.
        has_pair = any(
            find_file(folder, f"before_{v}") is not None
            and find_file(folder, f"after_{v}") is not None
            for v in VIEWS
        )
        if not has_pair:
            continue

        try:
            # Forward upload_root so the folder scanned above is the one
            # actually processed.
            r = infer_severity_for_index(
                idx,
                upload_root=upload_root,
                result_root=result_root,
                checkpoint_path=checkpoint_path,
                device=device,
            )
        except Exception as e:  # best-effort batch: report and move on
            print("skip:", idx, "err:", e)
        else:
            results.append(r)
    return results

def main():
    """CLI entry point: run severity inference for one job and print the result as JSON."""
    cli = argparse.ArgumentParser(description="Run severity inference for a job index (uploads/{index}/...).")
    cli.add_argument("--index", required=True, help="job_id (folder name under uploads)")
    # Path options share the module-level defaults.
    path_options = (
        ("--upload_root", DEFAULT_UPLOAD_ROOT),
        ("--result_root", DEFAULT_RESULT_ROOT),
        ("--checkpoint", DEFAULT_CHECKPOINT),
    )
    for flag, fallback in path_options:
        cli.add_argument(flag, default=fallback)
    cli.add_argument("--device", default=None, help='Optional: "cpu" or "cuda" (default: auto)')
    opts = cli.parse_args()

    outcome = infer_severity_for_index(
        index=opts.index,
        upload_root=opts.upload_root,
        result_root=opts.result_root,
        checkpoint_path=opts.checkpoint,
        device=opts.device,
    )
    print(json.dumps(outcome, ensure_ascii=False, indent=2))

#if __name__ == "__main__":
 #   main()

In [36]:
infer_severity_for_index(
    "26_2",
    upload_root=r"C:\Users\woain\Python_AI\Project_Car\uploads"
)




{'index': '26_2',
 'severity': 4,
 'raw_score': 4.0,
 'diff_mean': 0.007066433007518451,
 'views_used': ['front', 'rear', 'left', 'right', 'left_side', 'right_side'],
 'missing_views': []}