In [5]:
# ingest_wiki_to_s3.py
import os, json, time, gzip, uuid, boto3, datetime as dt, requests
from botocore.exceptions import ClientError
from sseclient import SSEClient  # pip install sseclient-py

# Wikimedia recent-change stream URL; main() consumes it as server-sent events.
STREAM = "https://stream.wikimedia.org/v2/stream/recentchange"
# Destination S3 bucket name. Required: if the BUCKET environment variable is
# not set, this lookup raises KeyError('BUCKET') as soon as the module is loaded.
BUCKET = os.environ["BUCKET"]
# Key prefix under which rotate_key() places every object.
PREFIX = "bronze"
# Module-wide S3 client, created once at load time and reused by s3_put_bytes().
S3 = boto3.client("s3")

def s3_put_bytes(key: str, data: bytes):
    """Upload *data* to the configured bucket under *key*.

    The object is tagged with ``ContentEncoding: gzip`` and
    ``ContentType: application/x-ndjson``; the caller is expected to pass
    a payload that is already gzip-compressed.

    Args:
        key: Full object key inside ``BUCKET``.
        data: Raw bytes to store as the object body.
    """
    put_args = {
        "Bucket": BUCKET,
        "Key": key,
        "Body": data,
        "ContentEncoding": "gzip",
        "ContentType": "application/x-ndjson",
    }
    S3.put_object(**put_args)

def rotate_key(event_dt: dt.datetime):
    """Build a fresh, unique object key partitioned by *event_dt*.

    The key has the layout
    ``<PREFIX>/yyyy=YYYY/mm=MM/dd=DD/hh=HH/part-<random hex>.ndjson.gz``;
    the random part comes from ``uuid.uuid4()`` so every call yields a new key.

    Args:
        event_dt: Timestamp whose year/month/day/hour select the partition.

    Returns:
        The object key as a string.
    """
    segments = [
        PREFIX,
        "yyyy=" + event_dt.strftime("%Y"),
        "mm=" + event_dt.strftime("%m"),
        "dd=" + event_dt.strftime("%d"),
        "hh=" + event_dt.strftime("%H"),
        "part-" + uuid.uuid4().hex + ".ndjson.gz",
    ]
    return "/".join(segments)

def _flush_batch(key, buf):
    """Gzip the NDJSON bytes in *buf*, upload them under *key*, then empty *buf*."""
    s3_put_bytes(key, gzip.compress(bytes(buf)))
    buf.clear()


def main():
    """Stream Wikimedia recent-change events and land edit events in S3.

    Reads the server-sent-event stream at ``STREAM``, keeps only ``edit``
    events that carry a ``meta`` section, adds ``ingest_time`` and
    ``_dedup_id`` fields, and buffers them as NDJSON.  A batch is gzipped
    and uploaded once it exceeds 5 MB or 60 seconds have passed since the
    previous upload.  Whatever is still buffered when the stream ends or an
    error interrupts the loop is uploaded before the function exits, and the
    HTTP connection is always closed.

    Raises:
        requests.HTTPError: If the stream endpoint answers with an HTTP
            error status.
    """
    buf = bytearray()
    cur_key = None
    start = time.time()

    # The context manager releases the streaming connection on every exit path.
    with requests.get(STREAM, stream=True, timeout=30) as resp:
        # Fail fast instead of trying to parse an error page as an event stream.
        resp.raise_for_status()
        client = SSEClient(resp)
        try:
            for e in client.events():
                if e.event != "message":
                    continue
                msg = json.loads(e.data)

                # Keep only "edit" events that have a meta section; further
                # filters (e.g. enwiki only) can be added here.
                if msg.get("type") != "edit" or "meta" not in msg:
                    continue

                # Event time: use meta.dt when it is numeric, otherwise fall
                # back to the top-level "timestamp" field.
                meta_dt = msg["meta"]["dt"]
                epoch = meta_dt if isinstance(meta_dt, (int, float)) else msg["timestamp"]
                event_dt = dt.datetime.fromtimestamp(epoch, tz=dt.timezone.utc)

                msg["ingest_time"] = dt.datetime.now(dt.timezone.utc).isoformat()
                # Idempotency key, so duplicates after a reconnect can be dropped.
                msg["_dedup_id"] = msg["meta"]["id"]

                # The first event of each batch decides the batch's partition.
                if cur_key is None:
                    cur_key = rotate_key(event_dt)

                line = (json.dumps(msg) + "\n").encode()
                buf.extend(line)

                # Roll over when the batch exceeds 5 MB or is older than 60 s.
                if len(buf) > 5_000_000 or time.time() - start > 60:
                    _flush_batch(cur_key, buf)
                    # Let the next batch's first event choose its own key
                    # rather than reusing this batch's last event time.
                    cur_key = None
                    start = time.time()
        finally:
            # Upload the tail batch so buffered events are not lost when the
            # stream ends or the loop is interrupted.  If the interruption was
            # itself a failed upload, this retries that batch once.
            if buf and cur_key is not None:
                _flush_batch(cur_key, buf)


KeyError: 'BUCKET'