# **주제 : 피싱 이메일 자동 분석 & 위험 근거 설명 **
Gemini 2.5 Pro 사용

In [31]:
import os, getpass
os.environ["GEMINI_API_KEY"] = getpass.getpass("Paste Gemini API key: ").strip()

Paste Gemini API key: ··········


In [32]:
import os
for k in ["HTTP_PROXY","http_proxy","HTTPS_PROXY","https_proxy","ALL_PROXY","all_proxy"]:
    os.environ.pop(k, None)
os.environ["NO_PROXY"] = "*"
os.environ["no_proxy"] = "*"
print("프록시 변수 제거/무시 설정 완료")

프록시 변수 제거/무시 설정 완료


In [47]:
!pip -q install gradio
import os, json, requests, gradio as gr

# 0) 키/엔드포인트
KEY = os.environ.get("GEMINI_API_KEY", "")
assert KEY and KEY.startswith("AIza"), "유효한 Gemini API 키를 먼저 셀 1에서 입력하세요."
URL = f"https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-flash:generateContent?key={KEY}"

# 1) 프롬프트
PROMPT = """
You are a phishing email analyst.
Return ONLY a compact JSON with EXACTLY these keys:
risk_score, decision, evidence{social_engineering, domain_trust, content_markers}, recommended_actions.
Each evidence array MUST HAVE AT MOST 2 short items. No extra keys. No prose.

### Example
Input:
<<<
Your account has been suspended. Click http://short.ly/verify within 6 hours.
>>>
Output:
{"risk_score":85,"decision":"block","evidence":{"social_engineering":["urgency within 6 hours"],"domain_trust":["shortened URL domain"],"content_markers":["account suspension scare"]},"recommended_actions":["do not click","report to security team"]}

### Now analyze this:
<<<
{EMAIL_TEXT}
>>>
Output only JSON:
"""


# 2) 응답 텍스트 안전 추출기
def _extract_text(data: dict) -> str:
    # 표준 경로
    try:
        return data["candidates"][0]["content"]["parts"][0]["text"]
    except Exception:
        # 일부 응답 변형 대비
        cand = (data.get("candidates") or [{}])[0]
        if isinstance(cand, dict) and "text" in cand:
            return cand["text"]
        return ""

# 3) 분석 함수(REST 호출, proxies={}로 프록시 무시)
def analyze_email(text: str):
    try:
        user = (text or "")[:1200]  # 입력 더 줄여 안정화
        prompt = PROMPT.replace("{EMAIL_TEXT}", user)

        def call_api(prompt_text, max_tokens=800, temp=0.0, timeout=60):
            payload = {
                "contents": [{"role":"user","parts":[{"text": prompt_text}]}],
                "generation_config": {
                    "max_output_tokens": max_tokens,
                    "temperature": temp,
                    "top_p": 0.0
                }
            }
            r = requests.post(URL, json=payload, timeout=timeout, proxies={})
            return r

        # 1차 호출(예시 포함 프롬프트)
        r = call_api(prompt, max_tokens=800, temp=0.0, timeout=60)
        if not r.ok:
            return False, {"raw": f"HTTP {r.status_code}: {r.text[:300]}"}
        data = r.json()
        out = _extract_text(data)
        finish = (data.get("candidates") or [{}])[0].get("finishReason","?")

        # 비었거나 잘렸으면 초간단 재시도(초미니 프롬프트)
        if not out or finish == "MAX_TOKENS":
            compact = (
                'Output ONLY valid JSON with keys: '
                'risk_score, decision, evidence{social_engineering,domain_trust,content_markers}, recommended_actions. '
                'Each evidence array <=2 items. No prose.\n\n'
                f'Input:\n<<<\n{user}\n>>>'
            )
            r2 = call_api(compact, max_tokens=400, temp=0.0, timeout=45)
            if not r2.ok:
                return False, {"raw": f"재시도 HTTP {r2.status_code}: {r2.text[:300]}"}
            out = _extract_text(r2.json())
            if not out:
                finish2 = (r2.json().get("candidates") or [{}])[0].get("finishReason","?")
                return False, {"raw": f"모델 텍스트 없음. finishReason={finish2}"}

        # 최종 JSON 파싱
        try:
            return True, json.loads(out)
        except Exception:
            # 마지막 보정: 모델에게 "유효 JSON만" 재출력
            fix = (
                "Reprint as VALID JSON only with keys: "
                "risk_score, decision, evidence{social_engineering,domain_trust,content_markers}, recommended_actions. "
                "No extra text.\n" + (out or "{}")
            )
            r3 = call_api(fix, max_tokens=300, temp=0.0, timeout=30)
            if not r3.ok:
                return False, {"raw": f"보정 실패 HTTP {r3.status_code}: {r3.text[:300]}"}
            out3 = _extract_text(r3.json()) or ""
            try:
                return True, json.loads(out3)
            except Exception:
                return False, {"raw": f"JSON 파싱 실패. 샘플={out3[:200]}"}

    except Exception as e:
        return False, {"raw": f"예외: {type(e).__name__} - {str(e)[:200]}"}


# 4) 샘플 & Gradio UI
SAMPLES = [
    "[의심] [긴급]인사팀입니다. 12시간 내 급여계좌 확인 링크 클릭: http://bit.ly/pay-verify",
    "[의심] Microsoft 로그인 만료. 표시:https://microsoft.com  실제:https://micosoft-verify.co",
    "[의심] 국세 환급 안내. 첨부 ZIP(암호: refund)을 열고 양식 제출",
    "[정상] 내부 공지: 보안 교육 일정 및 사내 포털 링크",
    "[정상] 고객 문의 회신(텍스트만, 링크/첨부 없음)"
]

def run_analyze(text):
    ok, data = analyze_email(text)
    if not ok:
        return 0, "parse-fail", "", "", "", data.get("raw","(원인 미상)")
    ev = data.get("evidence", {})
    return (
        data.get("risk_score", 0),
        data.get("decision", ""),
        "\n".join(ev.get("social_engineering", [])),
        "\n".join(ev.get("domain_trust", [])),
        "\n".join(ev.get("content_markers", [])),
        "\n".join(data.get("recommended_actions", [])),
    )

with gr.Blocks(title="Phishing Analyzer (Gemini 2.5 - REST)") as demo:
    gr.Markdown("## 피싱 이메일 분석 데모 (REST 호출)")
    inp = gr.Textbox(label="이메일 본문/헤더", lines=10, value=SAMPLES[0])
    with gr.Row():
        ex = gr.Dropdown(choices=SAMPLES, label="샘플 선택")
        paste_btn = gr.Button("샘플 넣기")
        paste_btn.click(lambda s: s, inputs=ex, outputs=inp)

    btn = gr.Button("분석하기", variant="primary")
    risk = gr.Number(label="Risk Score (0-100)")
    decision = gr.Textbox(label="Decision")
    se = gr.Textbox(label="Evidence / Social Engineering")
    dt = gr.Textbox(label="Evidence / Domain Trust")
    cm = gr.Textbox(label="Content Markers")
    rec = gr.Textbox(label="Recommended Actions")

    btn.click(run_analyze, inputs=inp, outputs=[risk, decision, se, dt, cm, rec])

demo.launch(debug=False)

It looks like you are running Gradio on a hosted Jupyter notebook, which requires `share=True`. Automatically setting `share=True` (you can turn this off by setting `share=False` in `launch()` explicitly).

Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
* Running on public URL: https://a62eeeef14e32e80be.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)


