# 셀 1) 설치 & 기본 임포트

In [None]:
# IPython/Jupyter only: install dpkt and the supporting libraries
# (chardet, pandas) into the running kernel's environment.
%pip -q install dpkt chardet pandas

# 셀 2) “사람이 읽을 수 있는지” 확인 유틸
- HTTP 헤더 기반(gzip/chunked) 처리
- URL 디코딩 1회
- Base64/HEX 탐지/디코딩
- 텍스트 인코딩 추정(chardet)

In [None]:
import dpkt, urllib.parse, base64, binascii, zlib, chardet
from typing import Optional, Tuple, Dict

def _maybe_decode_text(b: bytes) -> str:
    """Decode bytes to text using chardet's best guess, without raising.

    Args:
        b: Raw bytes to decode. Empty input yields an empty string.

    Returns:
        The decoded text. Bytes that cannot be decoded in the chosen
        encoding are dropped (``errors="ignore"``).
    """
    if not b:
        return ""
    guess = chardet.detect(b) or {}
    # chardet reports encoding=None when it has no confident guess.
    enc = guess.get("encoding") or "utf-8"
    try:
        return b.decode(enc, errors="ignore")
    except LookupError:
        # The detector may name a charset Python ships no codec for;
        # fall back to UTF-8 rather than failing the whole payload check.
        return b.decode("utf-8", errors="ignore")

def _decode_url_once(s: str) -> str:
    """Percent-decode *s* exactly one time.

    Only a single pass is made, so doubly-encoded input (``%2541``) comes
    back still encoded once (``%41``). If decoding fails for any reason the
    input is returned unchanged.
    """
    try:
        decoded = urllib.parse.unquote(s)
    except Exception:
        decoded = s
    return decoded

def _is_base64_bytes(b: bytes) -> bool:
    """Return True when *b* looks like a complete Base64 string.

    The input must be at least 8 bytes long, a multiple of 4 in length,
    and must decode under strict validation (no characters outside the
    Base64 alphabet).
    """
    has_plausible_length = len(b) >= 8 and len(b) % 4 == 0
    if not has_plausible_length:
        return False
    try:
        base64.b64decode(b, validate=True)
    except Exception:
        return False
    return True

def _is_hex_bytes(b: bytes) -> bool:
    """Return True when *b* looks like a hex-encoded byte string.

    The input must be at least 8 bytes long, have an even length, and be
    accepted by ``binascii.unhexlify``.
    """
    too_short = len(b) < 8
    odd_length = len(b) % 2 != 0
    if too_short or odd_length:
        return False
    try:
        binascii.unhexlify(b)
    except Exception:
        return False
    return True

def _http_extract(body_or_raw: bytes) -> Tuple[Optional[Dict[str,str]], bytes]:
    """Parse raw bytes as one HTTP message and split headers from body.

    Bytes starting with ``HTTP/1.`` are parsed as a response; anything
    else is parsed as a request.

    Args:
        body_or_raw: Candidate HTTP message bytes.

    Returns:
        ``(headers, body)`` on success, where header names are lower-cased,
        every header value is a single string, and ``body`` is ``b""`` when
        the message carries no body. When the bytes cannot be parsed,
        ``(None, body_or_raw)`` is returned so the caller can fall back to
        the unparsed input.
    """
    if body_or_raw.startswith(b"HTTP/1."):
        parser = dpkt.http.Response
    else:
        parser = dpkt.http.Request

    try:
        msg = parser(body_or_raw)
    except (dpkt.dpkt.NeedData, dpkt.dpkt.UnpackError, ValueError):
        # NOTE(review): ValueError is caught defensively in case the parser
        # lets a malformed numeric field (e.g. Content-Length) escape as a
        # plain ValueError instead of an UnpackError.
        return None, body_or_raw

    headers: Dict[str, str] = {}
    for name, value in msg.headers.items():
        # dpkt collects repeated header fields (e.g. Set-Cookie) into a
        # list; flatten them so callers can rely on plain string values.
        if isinstance(value, (list, tuple)):
            value = ", ".join(map(str, value))
        headers[name.lower()] = value

    # A message without a body yields an empty body, not the raw input.
    return headers, msg.body or b""

def _decode_chunked(body: bytes) -> bytes:
    """Decode an HTTP ``Transfer-Encoding: chunked`` body (minimal, demo-grade).

    Each chunk is ``<hex size>[;extensions]CRLF<data>CRLF``; a size of zero
    ends the body. Parsing stops at the first malformed or missing size
    line, and whatever was decoded up to that point is returned.

    Args:
        body: Bytes that are expected to be chunk-encoded.

    Returns:
        The concatenated chunk data, or *body* unchanged when no chunk data
        could be extracted (for example, the input was not chunked at all).
    """
    out = bytearray()
    pos = 0
    while True:
        eol = body.find(b"\r\n", pos)
        if eol < 0:
            break
        # Drop optional chunk extensions after ';'.
        size_field = body[pos:eol].split(b";", 1)[0]
        try:
            size = int(size_field, 16)
        except ValueError:
            break
        if size < 0:
            # int() accepts a leading '-'. A negative size would move the
            # cursor backwards and can loop forever, so treat it as malformed.
            break
        pos = eol + 2
        if size == 0:
            break
        out.extend(body[pos:pos + size])
        pos += size + 2  # chunk data + trailing CRLF
    return bytes(out) if out else body

def _maybe_gzip_deflate(headers: Optional[Dict[str,str]], body: bytes) -> bytes:
    """Decompress *body* according to its ``content-encoding`` header.

    Args:
        headers: Lower-cased HTTP headers, or None when none were parsed.
        body: Possibly compressed message body.

    Returns:
        The decompressed bytes, or *body* unchanged when there are no
        headers, no gzip/deflate encoding is declared, or decompression
        fails (best effort: a corrupt or truncated stream is passed through).
    """
    if not headers:
        return body
    enc = headers.get("content-encoding", "").lower()

    # zlib "wbits" values to try, in order.
    candidates = []
    if "gzip" in enc:
        candidates.append(16 + zlib.MAX_WBITS)  # gzip container
    if "deflate" in enc:
        # HTTP "deflate" is normally zlib-wrapped, but some senders emit a
        # raw deflate stream, so try the wrapped form first and then raw.
        candidates.append(zlib.MAX_WBITS)
        candidates.append(-zlib.MAX_WBITS)

    for wbits in candidates:
        try:
            return zlib.decompress(body, wbits)
        except zlib.error:
            continue
    return body

def _maybe_chunked(headers: Optional[Dict[str,str]], body: bytes) -> bytes:
    """De-chunk *body* when the headers declare chunked transfer encoding.

    Returns *body* unchanged when there are no headers, when
    ``transfer-encoding`` does not mention ``chunked``, or when decoding
    raises.
    """
    transfer_encoding = headers.get("transfer-encoding", "").lower() if headers else ""
    if "chunked" not in transfer_encoding:
        return body
    try:
        decoded = _decode_chunked(body)
    except Exception:
        decoded = body
    return decoded

def check_payload_readable(raw: bytes) -> Dict[str, Optional[str]]:
    """Run every decoding heuristic over one payload and collect the results.

    Args:
        raw: Bytes of a single captured payload (HTTP or otherwise).

    Returns:
        A dict with these keys:

        * ``http_headers`` - ``name: value`` lines; present only when HTTP
          headers were parsed and are non-empty.
        * ``url_decoded`` - the whole payload decoded to text and
          percent-decoded once.
        * ``base64`` - text obtained by Base64-decoding the body, or None
          when the body does not look like Base64.
        * ``hex`` - text obtained by hex-decoding the body, or None when
          the body does not look like hex.
        * ``preview`` - up to 500 characters of the decoded body (or of
          ``url_decoded`` when the body is empty), with ``...`` appended
          when truncated.
    """
    out: Dict[str, Optional[str]] = {}

    headers, body = _http_extract(raw)
    if headers:
        out["http_headers"] = "\n".join(f"{k}: {v}" for k, v in headers.items())

    # Undo transfer framing first, then content compression.
    # NOTE(review): dpkt's HTTP parser appears to de-chunk bodies itself;
    # confirm whether this extra chunked pass is still needed.
    body1 = _maybe_chunked(headers, body)
    body2 = _maybe_gzip_deflate(headers, body1)

    url_try = _decode_url_once(_maybe_decode_text(raw))
    out["url_decoded"] = url_try

    # Surrounding whitespace (typically a trailing newline) would defeat the
    # strict Base64/hex checks, so both detectors look at the stripped body.
    candidate = body2.strip()

    b64_text = None
    if _is_base64_bytes(candidate):
        try:
            b64_text = _maybe_decode_text(base64.b64decode(candidate, validate=True))
        except Exception:
            b64_text = None
    out["base64"] = b64_text

    hex_text = None
    if _is_hex_bytes(candidate):
        try:
            hex_text = _maybe_decode_text(binascii.unhexlify(candidate))
        except Exception:
            hex_text = None
    out["hex"] = hex_text

    preview = _maybe_decode_text(body2) if body2 else url_try
    out["preview"] = (preview[:500] + ("..." if len(preview) > 500 else ""))
    return out



# 셀 3) PCAP에서 HTTP 페이로드 뽑아서 검사
- TCP 재조립 없이 패킷 단위로 동작 (데모 PCAP는 단편화 없음)
- 각 패킷의 TCP 페이로드(요청/응답 바이트)를 check_payload_readable()에 전달

In [None]:
import dpkt

def iter_http_payloads(pcap_path: str):
    """Yield the TCP payload of each IPv4 packet to or from port 80 in a pcap.

    Packets are handled one at a time with no TCP stream reassembly, so an
    HTTP message split across segments is yielded piece by piece.

    Args:
        pcap_path: Path to a pcap file whose frames are parsed as Ethernet.

    Yields:
        The non-empty TCP payload bytes of each matching packet, in capture
        order.
    """
    with open(pcap_path, "rb") as f:
        for _ts, buf in dpkt.pcap.Reader(f):
            try:
                eth = dpkt.ethernet.Ethernet(buf)
            except (dpkt.dpkt.NeedData, dpkt.dpkt.UnpackError):
                # A truncated or malformed frame should not abort the scan.
                continue
            ip = getattr(eth, "data", None)
            if not isinstance(ip, dpkt.ip.IP):
                continue
            tcp = getattr(ip, "data", None)
            if not isinstance(tcp, dpkt.tcp.TCP):
                continue
            if not tcp.data:
                continue
            # Simple filter: treat anything on port 80 as HTTP.
            if tcp.dport == 80 or tcp.sport == 80:
                yield tcp.data

# Demo capture to inspect; the path is relative to the notebook's working directory.
pcap_path = "../pcap_file/demo_pcap/demo_payload_preproc.pcap"

# Print a short report for every HTTP payload found in the capture.
for idx, raw in enumerate(iter_http_payloads(pcap_path), start=1):
    res = check_payload_readable(raw)
    print("=" * 80)
    print(f"[{idx}] length={len(raw)}")
    if "http_headers" in res:
        print("HTTP headers:\n", res["http_headers"])
    print("URL-decoded (1 pass):", res["url_decoded"][:200])
    # Base64 / HEX lines appear only when the corresponding decode produced text.
    if res.get("base64"):
        print("Base64 decode (preview):", res["base64"][:120])
    if res.get("hex"):
        print("HEX decode (preview):", res["hex"][:120])
    print("Preview:", res["preview"][:200])


# 셀 4) (선택) 결과를 표로 보기 (요약 테이블)

In [None]:
import pandas as pd

# One result dict per HTTP payload in the capture.
rows = [check_payload_readable(raw) for raw in iter_http_payloads(pcap_path)]

# Fixed column order; results lacking "http_headers" get a missing value there.
df = pd.DataFrame(rows, columns=["http_headers","url_decoded","base64","hex","preview"])
# Show missing values (absent headers, failed base64/hex decodes) as empty strings.
df = df.fillna("")
df.head()