In [None]:
!pip install ultralytics
!pip install torch torchvision --index-url https://download.pytorch.org/whl/cu118 -q || true
!pip install streamlit pyngrok pillow opencv-python-headless matplotlib -q

Collecting ultralytics
  Downloading ultralytics-8.3.223-py3-none-any.whl.metadata (37 kB)
Collecting ultralytics-thop>=2.0.18 (from ultralytics)
  Downloading ultralytics_thop-2.0.18-py3-none-any.whl.metadata (14 kB)
Downloading ultralytics-8.3.223-py3-none-any.whl (1.1 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.1/1.1 MB[0m [31m42.1 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading ultralytics_thop-2.0.18-py3-none-any.whl (28 kB)
Installing collected packages: ultralytics-thop, ultralytics
Successfully installed ultralytics-8.3.223 ultralytics-thop-2.0.18
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m10.2/10.2 MB[0m [31m98.0 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m6.9/6.9 MB[0m [31m50.4 MB/s[0m eta [36m0:00:00[0m
[?25h

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
model_path = "/content/11n_150e_best.pt"
print("모델 경로:", model_path)

모델 경로: /content/11n_150e_best.pt


## best.pt 파일의 경로를 설정해주세요.

In [None]:
!pip install "git+https://github.com/facebookresearch/segment-anything-2.git" tqdm
from sam2.sam2_image_predictor import SAM2ImagePredictor

Collecting git+https://github.com/facebookresearch/segment-anything-2.git
  Cloning https://github.com/facebookresearch/segment-anything-2.git to /tmp/pip-req-build-6gr0qjtv
  Running command git clone --filter=blob:none --quiet https://github.com/facebookresearch/segment-anything-2.git /tmp/pip-req-build-6gr0qjtv
  Resolved https://github.com/facebookresearch/segment-anything-2.git to commit 2b90b9f5ceec907a1c18123530e92e794ad901a4


In [None]:
%%bash
cat > app.py <<'PY'
# app.py — YOLOv11n-seg + 클래스별 마스크 멀티체크 + Gemini 2.5 Flash-Lite VLM
import os, io, json, math
import streamlit as st
from PIL import Image
import numpy as np
import cv2
import pandas as pd
import torch

# pip install google-genai
# pip install python-dotenv

# ===================== YOLO 부분 =====================
@st.cache_resource
def load_model():
    from ultralytics import YOLO
    model_path = "/content/11n_150e_best.pt"  # YOLOv11n-seg 가중치
    model = YOLO(model_path)
    return model

def resize_for_model(img, short_side=640):
    w, h = img.size
    if min(w, h) == short_side:
        return img
    if w < h:
        new_w = short_side
        new_h = int(h * (short_side / w))
    else:
        new_h = short_side
        new_w = int(w * (short_side / h))
    return img.resize((new_w, new_h), Image.BILINEAR)

def mask_to_rgba(mask_arr, color=(255, 0, 0), alpha=128):
    h, w = mask_arr.shape
    rgba = np.zeros((h, w, 4), dtype=np.uint8)
    rgba[..., 0:3] = color
    rgba[..., 3] = (mask_arr * alpha).astype(np.uint8)
    return Image.fromarray(rgba, "RGBA")

def compute_geometry(mask, gsd):
    """mask: 0/1, gsd[m/pixel] -> 면적/길이/너비/페렛지수[m]"""
    res = {}
    pixel_count = np.sum(mask)
    res["area_m2"] = float(pixel_count * (gsd ** 2))
    mask_u8 = (mask * 255).astype(np.uint8)
    contours, _ = cv2.findContours(mask_u8, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    if len(contours) == 0:
        res.update({"length_m": 0.0, "width_m": 0.0, "feret_m": 0.0})
        return res
    cnt = max(contours, key=cv2.contourArea)
    ((cx, cy), (w, h), _) = cv2.minAreaRect(cnt)
    res["length_m"] = float(max(w, h) * gsd)
    res["width_m"]  = float(min(w, h) * gsd)
    # 페렛 지수(최대 거리)
    pts = cnt.reshape(-1, 2)
    max_d2 = 0.0
    # 간단 다운샘플링
    step = max(1, len(pts)//2000)
    P = pts[::step]
    for i in range(0, len(P), 256):
        block = P[i:i+256]
        d2 = np.sum((block[:, None, :] - P[None, :, :]) ** 2, axis=2)
        max_d2 = max(max_d2, float(d2.max()))
    res["feret_m"] = float(math.sqrt(max_d2) * gsd)
    return res

def overlay_masks_on_image(pil_img, masks_list, colors=None):
    base = pil_img.convert("RGBA")
    out = Image.new("RGBA", base.size)
    if colors is None:
        colors = [
            (255, 0, 0), (0, 255, 0), (0, 0, 255),
            (255, 255, 0), (255, 0, 255), (0, 255, 255),
            (255, 128, 0), (128, 0, 255)
        ]
    for i, m in enumerate(masks_list):
        c = colors[i % len(colors)]
        mask_rgba = mask_to_rgba(m.astype(np.uint8), color=c, alpha=120)
        out = Image.alpha_composite(out, mask_rgba)
    combined = Image.alpha_composite(base, out)
    return combined.convert("RGB")

# ===================== YOLO + SAM2 추론 =====================
from sam2 import SAM2ImagePredictor  # SAM2 라이브러리 임포트 필요

@st.cache_resource
def load_sam2_predictor(model_name="facebook/sam2-hiera-tiny", device="cuda"):
    predictor = SAM2ImagePredictor.from_pretrained(model_name)
    predictor.model.to(device)
    predictor.model.eval()
    return predictor

def run_inference(model, pil_img, conf=0.25, imgsz=640, sam2_model_name="facebook/sam2-hiera-tiny"):
    """
    YOLOv11n-seg + SAM2 기반 마스크 생성
    - pil_img: 업로드된 원본 이미지 (PIL)
    - conf: confidence threshold
    - imgsz: 입력 이미지 크기
    - sam2_model_name: 사용할 SAM2 모델명
    """
    import torch
    import numpy as np
    import cv2

    # 디바이스 설정
    device = "cuda" if torch.cuda.is_available() else "cpu"

    # 1️⃣ 이미지 변환
    np_img = np.array(pil_img)
    h, w = np_img.shape[:2]

    # 2️⃣ YOLO 추론
    results = model.predict(source=np_img, conf=conf, imgsz=imgsz, device=device, verbose=False)
    r = results[0]

    boxes = r.boxes.xyxy.cpu().numpy() if hasattr(r, "boxes") and r.boxes is not None else []
    labels = r.boxes.cls.cpu().numpy().astype(int).tolist() if hasattr(r, "boxes") and r.boxes is not None else []
    scores = r.boxes.conf.cpu().numpy().tolist() if hasattr(r, "boxes") and r.boxes is not None else []

    if len(boxes) == 0:
        return [], [], [], {}

    # 3️⃣ SAM2 Predictor 로드 (캐시 활용)
    predictor = load_sam2_predictor(model_name=sam2_model_name, device=device)

    # 4️⃣ 이미지 세팅 및 마스크 생성
    predictor.set_image(np_img)
    masks = []
    for box in boxes:
        masks_pred, _, _ = predictor.predict(box=box, multimask_output=False)
        mask_bin = masks_pred[0].astype(np.uint8)
        if mask_bin.shape != (h, w):
            mask_bin = cv2.resize(mask_bin, (w, h), interpolation=cv2.INTER_NEAREST)
        masks.append(mask_bin)

    # YOLO 클래스 이름 매핑
    names = r.names if hasattr(r, "names") else {i: f"class_{i}" for i in range(len(masks))}

    return masks, labels, scores, names

# ===================== Gemini VLM 부분 =====================

from google import genai
from google.genai import types

import os
from dotenv import load_dotenv
import streamlit as st


def get_api_key():
    # .env 파일 로드 (파일명은 실제 파일 이름으로 변경)
    load_dotenv("/content/googleapi_KE.env")

    # 환경변수에서 API 키 가져오기
    api_key = os.getenv("GOOGLE_API_KEY", "")

    if not api_key:
        st.warning("API 키를 찾을 수 없습니다. googleapi_KE.env 파일을 확인하세요.")
    return api_key


def ndarray_to_jpeg_bytes(pil_img: Image.Image) -> bytes:
    buf = io.BytesIO()
    pil_img.save(buf, format="JPEG", quality=90)
    return buf.getvalue()

EXPERT_PROMPT_TMPL = """[역할]
당신은 토목구조(교량) 외관 손상평가 전문가이자 품질보증 엔지니어입니다.
불확실한 정보는 추정하지 말고 "정보 부족"으로 표기합니다.

[목표]
입력된 (1) 세그멘테이션 수치 JSON, (2) 원본 이미지, (3) 마스크 오버레이,
(4) 사용자 입력 교량 정보(종류/등급)를 바탕으로
간결하고 근거 중심의 전문가 보고 요약을 작성합니다.

[데이터 해석 규칙]
- 수치(JSON) 우선 신뢰. 이미지와 모순 시 모순 사실을 [모순] 태그로 명시.
- 위험도 평가 시 기준(예: 손상 길이/면적 상대적 크기, 노출 위험, 누수·부식 동반 여부)을 근거로 제시.
- 단정 금지: 불확실/추정은 [불확실]로 표기.
- 허위 지식 금지: 출처 없는 규정·코드 인용 금지.

[출력 스타일]
- 한국어, 전문가 톤, 불필요한 수식어 금지.
- 섹션: ①요약(3~5문장) ②세부분석 ③권고사항
- ③권고사항은 세부분석 내용을 기반으로 작성. 구체적 행동지침(예: 추가조사, 보수방법) 포함.
- 수치는 가능한 한 입력값을 재사용(단위 표기 일관성).
- 문장/목록 혼용 허용. 과장 표현 금지.

[입력]
- 세그 JSON: {seg_json}
- 메타(사용자 입력): 교량종류={bridge_type}, 등급={bridge_grade}
- 원본 이미지 및 오버레이 이미지는 첨부됨.

[출력 예시 1  균열 및 백태 동반 사례]
① 요약
PSC Beam교(B등급) 복부 하단부에서 종방향 균열(길이 0.13 m, 폭 2 mm) 및 백태가 동반된 손상이 검출됨.
균열은 긴장력 손실 또는 피복층 건조수축의 영향으로 추정되며, 구조적 영향은 경미하나 수분 침투 위험이 존재함.
백태는 균열부로 유입된 수분의 염분 결정화 현상으로 판단됨.
철근 노출은 확인되지 않았으며, 장기적으로 피복층 열화 확산 가능성 있음.

② 세부분석
- **균열(Crack)**: 길이 0.13 m, 폭 0.002 m, 면적 0.00042 m². 종방향으로 뚜렷하게 발달하였으며 긴장력 전달 구간의 인장응력 집중이 원인으로 추정됨. 백태가 균열선을 따라 형성되어 있어, 균열 내부의 수분 이동이 지속되고 있음을 시사함.
- **표면 변화(백태)**: 밝은 흰색 침전물이 균열부를 따라 분포, 이는 모세관 작용에 의한 내부 수분 증발 후 염 결정화로 판단됨.오버레이 상 일부 경계 불명확 구간은 [모순].
- **구조적 영향**: 철근 근접 깊이는 정보 부족, 피복 손상은 초기 단계로 평가됨.

③ 권고사항
- 균열 폭 및 깊이 실측 후, 폭 0.3 mm 이상 시 폴리우레탄계 충전재 주입 실시.
- 표면 백태 부위는 고압수 세척 후 실리콘계 방수제 도포.
- 추적 관찰을 위해 6개월 주기 균열계측기 부착 권장.
- 부식 징후 발생 시 피복 복원 전 방청도료 도포 절차 병행.

---

[출력 예시 2  박리 및 철근 부식 의심 사례]
① 요약
RC 슬래브교(C등급) 저면부에서 콘크리트 박리(면적 0.0045 m²)와 내부 철근 변색이 확인됨.
박리는 피복층 박락으로 추정되며, 하부 누수 흔적이 함께 관찰되어 염화물 침투 가능성 있음.
세그멘테이션 결과 면적은 국부적이지만, 철근 노출 여부는 [불확실].
현재 상태는 국부 부식 개시 단계로 판단됨.

② 세부분석
- **박리(Spalling)**: 면적 0.0045 m², 폭 0.03 m, 신뢰도 0.83. 경계 불규칙하며 하중 재하 구간 인근에 위치.
  균열이 박리부 주변에서 방사형으로 확산되어 있어, **동결융해 반복 또는 염화물 축적**이 주요 원인으로 판단됨.
- **부식(Corrosion)**: 오버레이 이미지에서 녹색~적갈색 변색 영역이 확인되어, 철근 산화 생성물로 추정됨. 실제 피복 두께 및 철근 직경 정보 부족으로 정확한 진행 정도는 [불확실].
- **환경조건**: 하부 배수 미비 및 장기 누수 흔적 관찰, 콘크리트 내 염분 농도 상승 가능성 있음.

③ 권고사항
- 박리부 콘크리트 제거 후, 노출 철근의 부식 정도 육안확인 및 방청도료 도포 실시.
- 보수는 폴리머 시멘트 모르타르 충전 방식 적용, 최소 피복두께 25 mm 확보.
- 누수 원인 구간 배수로 재정비 및 표면 방수층 보강 병행.
- 장기적 내구성 확보를 위해 1년 내 정기점검 포함 정밀안전진단 병행 권장.
"""

def call_gemini_vlm(img_rgb: Image.Image,
                    overlay_rgb: Image.Image,
                    seg_summary: dict,
                    bridge_type: str,
                    bridge_grade: str,
                    mode: str = "expert") -> str:
    """
    mode: 'expert' -> 전문가 보고서 톤 / 'light' -> 짧은 요점만
    """
    api_key = get_api_key()
    if not api_key:
        raise RuntimeError("GOOGLE_API_KEY 가 없습니다.")

    client = genai.Client(api_key=api_key)
    MODEL = "gemini-2.5-flash-lite"

    if mode == "expert":
        prompt = EXPERT_PROMPT_TMPL.format(
            seg_json=json.dumps(seg_summary, ensure_ascii=False),
            bridge_type=bridge_type,
            bridge_grade=bridge_grade
        )
    else:
        prompt = (
            "아래 자료(세그 JSON + 원본 + 오버레이)를 바탕으로 핵심만 5줄 이내 bullet로 요약.\n"
            f"세그 JSON: {json.dumps(seg_summary, ensure_ascii=False)}\n"
            f"교량종류={bridge_type}, 등급={bridge_grade}\n"
        )

    img_bytes = ndarray_to_jpeg_bytes(img_rgb)
    ovl_bytes = ndarray_to_jpeg_bytes(overlay_rgb)

    contents = types.Content(
        role="user",
        parts=[
            types.Part(text=prompt),
            types.Part(text="원본 이미지:"),
            types.Part(inline_data=types.Blob(mime_type="image/jpeg", data=img_bytes)),
            types.Part(text="마스크 오버레이 이미지:"),
            types.Part(inline_data=types.Blob(mime_type="image/jpeg", data=ovl_bytes)),
        ]
    )

    resp = client.models.generate_content(
        model=MODEL,
        contents=contents,
        config=types.GenerateContentConfig(
            thinking_config=types.ThinkingConfig(thinking_budget=-1),  # 동적 추론
            max_output_tokens=5000 if mode=="expert" else 1500,
            temperature=0.2
        )
    )
    return resp.text

# ===================== Streamlit UI =====================
st.set_page_config(page_title="YOLOv11n-seg + Gemini VLM", layout="wide")
st.title("YOLOv11n-seg → Metrics → Gemini 2.5 Flash-Lite")

st.info("모델 로드 중…")
model_obj = load_model()
st.success("모델 로드 완료! (best.pt)")

# Sidebar: 추론 설정
conf_th = st.sidebar.slider("Confidence threshold", 0.0, 1.0, 0.25, 0.01)
imgsz = st.sidebar.number_input("이미지 short side", value=640, step=32)
show_raw = st.sidebar.checkbox("원본 이미지 보기", True)

# Sidebar: 교량 정보
st.sidebar.title("교량 정보 입력(테스트 환경)")
bridge_grade = st.sidebar.selectbox("교량 등급:", ["1등급", "2등급", "3등급", "미상"])

bridge_type_options = {
    "RG (RCT 거더교)": "RG", "BG (RC 박스 거더교)": "BG", "IG (PSCI I 거더교)": "IG",
    "PG (PSC 박스거더교)": "PG", "SP (강플레이트 거더교)": "SP", "SB (강박스 거더교)": "SB",
    "PF (프리플랙스교)": "PF", "RS (RC 슬래브교)": "RS", "PS (PC 슬래브교)": "PS",
    "VS (PC 중공슬래브교)": "VS", "CR (콘크리트라멘교)": "CR", "SR (강라멘교)": "SR",
    "CA (콘크리트아치교)": "CA", "SA (강아치교)": "SA", "TR (트러스교)": "TR",
    "CS (사장교)": "CS", "UN (미상)": "UN"
}
bridge_type_label = st.sidebar.selectbox("교량 종류:", list(bridge_type_options.keys()))
bridge_type = bridge_type_options[bridge_type_label]

st.sidebar.markdown("---")
gsd = st.sidebar.number_input("GSD [m/pixel]", min_value=0.0, value=0.005, step=0.001, format="%.6f")
st.sidebar.caption("예: 1 픽셀 = 0.5 cm → 0.005 m")

# 이미지 업로드
uploaded = st.file_uploader("이미지 업로드", type=["jpg", "jpeg", "png"])
if uploaded is not None:
    img = Image.open(uploaded).convert("RGB")
    if show_raw:
        st.image(img, caption="원본 이미지", use_container_width=True)

    resized = resize_for_model(img, short_side=int(imgsz))
    masks, labels, scores, names = run_inference(model_obj, resized, conf=conf_th, imgsz=int(imgsz))

    if len(masks) == 0:
        st.warning("감지된 마스크가 없습니다.")
        st.stop()

    # 마스크들을 원본 크기로 되돌림
    final_masks = []
    for m in masks:
        mask_img = Image.fromarray((m * 255).astype(np.uint8))
        mask_orig = mask_img.resize(img.size, resample=Image.NEAREST)
        final_masks.append(np.array(mask_orig) // 255)

    # 클래스 이름 매핑
    class_names = names if isinstance(names, dict) else {i: n for i, n in enumerate(names)}

    # 사이드바: 표시할 클래스 선택
    st.sidebar.markdown("### 표시할 결함 클래스 선택")
    # 가중치의 클래스 이름을 그대로 체크박스에 사용
    unique_names = sorted(set(class_names.values()))
    class_check = {cname: st.sidebar.checkbox(cname, value=True) for cname in unique_names}
    selected_classes = [k for k, v in class_check.items() if v]

    # 선택 클래스 필터링
    filtered_masks, filtered_labels, filtered_scores = [], [], []
    # 라벨/스코어 길이 체크
    if len(labels) != len(final_masks):
        # 박스가 없거나 길이 불일치 시 Unknown 처리
        labels = [ -1 for _ in range(len(final_masks)) ]
        filtered_labels_tmp = []

    for m, l, s in zip(final_masks, labels, scores if len(scores)==len(final_masks) else [0.0]*len(final_masks)):
        cname = class_names.get(l, "Unknown")
        if cname in selected_classes:
            filtered_masks.append(m)
            filtered_labels.append(l)
            filtered_scores.append(float(s))

    # 오버레이 생성/표시
    if len(filtered_masks) > 0:
        overlay = overlay_masks_on_image(img, filtered_masks)
        st.image(overlay, caption=f"표시 중: {', '.join(selected_classes)}", use_container_width=True)
    else:
        st.image(img, caption="선택된 클래스가 없습니다.", use_container_width=True)

    # 표 생성
    data = []
    for i, m in enumerate(filtered_masks):
        geom = compute_geometry(m, gsd)
        cname = class_names.get(filtered_labels[i], "Unknown")
        row = {
            "index": i,
            "class": cname,
            "score": filtered_scores[i] if i < len(filtered_scores) else 0.0,
            "area(m^2)": geom["area_m2"],
            "length(m)": geom["length_m"],
            "width(m)": geom["width_m"],
            "feret(m)": geom["feret_m"]
        }
        data.append(row)
    df = pd.DataFrame(data)
    st.dataframe(df, use_container_width=True)

    # 다운로드
    st.download_button("결과 CSV 다운로드", data=df.to_csv(index=False).encode("utf-8"), file_name="result.csv")
    if len(filtered_masks) > 0:
        buf = io.BytesIO()
        overlay.save(buf, format="PNG")
        st.download_button("결과 이미지 다운로드", data=buf.getvalue(), file_name="seg_result.png", mime="image/png")

    # ===================== Gemini 호출 준비 =====================
    # seg_summary(파이프라인 JSON) 구성
    bridge_id = os.path.splitext(uploaded.name)[0]
    defects = []
    for i, m in enumerate(filtered_masks):
        geom = compute_geometry(m, gsd)
        cname = class_names.get(filtered_labels[i], "Unknown")
        defects.append({
            "class": cname,
            "area_m2": geom["area_m2"],
            "length_m": geom["length_m"],
            "width_m":  geom["width_m"],
            "feret_m":  geom["feret_m"],
            "score": float(filtered_scores[i]) if i < len(filtered_scores) else 0.0
        })
    seg_summary = {
        "bridge_id": bridge_id,
        "bridge_type_input": bridge_type,   # 사용자 입력
        "bridge_grade_input": bridge_grade, # 사용자 입력
        "gsd_m_per_pixel": gsd,
        "defects": defects
    }

    # ===================== Gemini 버튼/출력 =====================
    st.markdown("---")
    st.subheader("Gemini 2.5 Flash-Lite 판단")
    colA, colB = st.columns(2)
    with colA:
        do_expert = st.button("전문가 보고서(요약/세부/권고)")
    with colB:
        do_light = st.button("짧은 결과(불릿 5줄 이내)")

    if do_expert or do_light:
        try:
            # 원본/오버레이 PIL → RGB 보장
            img_rgb = img.convert("RGB")
            ovl_rgb = overlay if len(filtered_masks)>0 else img_rgb

            mode = "expert" if do_expert else "light"
            text = call_gemini_vlm(
                img_rgb=img_rgb,
                overlay_rgb=ovl_rgb,
                seg_summary=seg_summary,
                bridge_type=bridge_type,
                bridge_grade=bridge_grade,
                mode=mode
            )
            st.markdown("### Gemini 결과")
            st.write(text)
        except Exception as e:
            st.error(f"Gemini 호출 실패: {e}")

PY

# Line 19) best.pt 파일의 경로를 입력해주세요.
# Line 155) Google API key의 경로를 입력해주세요.

In [None]:
# pyngrok 설치 (이미 설치했다면 생략)
!pip install pyngrok -q

# ngrok auth token 설정 (토큰을 안전하게 입력하세요)
from pyngrok import ngrok
NGROK_TOKEN = "본인의 NGROK_TOKEN을 입력해주세요"
ngrok.set_auth_token(NGROK_TOKEN)

# Streamlit 실행 (백그라운드)
get_ipython().system_raw("streamlit run app.py &")

# ngrok으로 8501 포트 터널링
public_url = ngrok.connect(8501, "http")
print("Public URL:", public_url)


# Line 6) 사용자의 NGROK_TOKEN을 입력해주세요.