In [1]:
import os
import time
import uuid
import json
import logging
import requests
import ntplib
from datetime import datetime, timezone, timedelta
from concurrent.futures import ThreadPoolExecutor

from dotenv import load_dotenv
load_dotenv()

# =============== 配置 ===============
LECTURE_URL = os.getenv("LECTURE_URL")
ACCESS_TOKENS = [t.strip() for t in os.getenv("ACCESS_TOKENS", "").split(",") if t.strip()]
PREFILED_ID = os.getenv("PREFILED_ID")
AUDIENCE_ID = os.getenv("AUDIENCE_ID")
GRAB_TIME = os.getenv("GRAB_TIME")  # e.g. "2025-11-10 14:00:00" (北京时间)
THREADS_PER_ACCOUNT = int(os.getenv("THREADS_PER_ACCOUNT", "3"))
MAX_RETRIES = int(os.getenv("MAX_RETRIES", "10"))
SLEEP_RETRY = float(os.getenv("SLEEP_RETRY", "0.08"))
NOTIFY_WEBHOOK = os.getenv("NOTIFY_WEBHOOK")  # 可选：成功通知 webhook
# =====================================

logging.basicConfig(filename="wechat_grab_multi.log",
                    level=logging.INFO,
                    format="%(asctime)s %(levelname)s %(message)s")

def sync_time():
    try:
        c = ntplib.NTPClient()
        r = c.request("ntp.aliyun.com", version=3)
        return r.tx_time
    except Exception as e:
        logging.warning("NTP失效，使用本地时间: %s", e)
        return time.time()

def get_target_timestamp(grab_time_str):
    dt = datetime.strptime(grab_time_str, "%Y-%m-%d %H:%M:%S")
    dt = dt.replace(tzinfo=timezone(timedelta(hours=8)))
    return dt.timestamp()

def generate_trace_id():
    return str(uuid.uuid4())

# ====== 根据抓包结果实现签名函数（如果需签名） ======
def generate_signature(payload: dict, secret: str = ""):
    """
    占位函数：如果接口需要 sign，请实现这里的算法，
    例如对 payload 按键名排序，连接 key=value，用 secret 做 HMAC-SHA256 等。
    """
    # 示例占位：返回空字符串或具体签名
    return ""

def notify_success(msg: str):
    if not NOTIFY_WEBHOOK:
        return
    try:
        requests.post(NOTIFY_WEBHOOK, json={"text": msg}, timeout=2)
    except Exception as e:
        logging.warning("通知失败: %s", e)

def attempt_grab(token: str, account_idx: int, attempt_idx: int):
    """
    单次请求尝试。返回 True 表示成功。
    """
    headers = {
        "access-token": token,
        "front-trace-id": generate_trace_id(),
        "Content-Type": "application/json",
        "User-Agent": "WeixinApp/1.0"
    }

    payload = {
        "preFiledId": PREFILED_ID,
        "audienceId": AUDIENCE_ID,
        # 其它必须字段（根据抓包补全）
    }

    # 如果需签名，添加签名字段
    sign = generate_signature(payload, secret="")
    if sign:
        payload["sign"] = sign

    try:
        res = requests.post(LECTURE_URL, headers=headers, json=payload, timeout=2)
    except Exception as e:
        logging.debug("[acct %d] 请求异常: %s", account_idx, e)
        return False, f"请求异常: {e}"

    # 解析返回（根据你抓包的成功字段调整）
    try:
        data = res.json()
    except ValueError:
        logging.debug("[acct %d] 非 JSON 响应: %s", account_idx, res.text[:200])
        return False, "非JSON响应"

    # 根据实际返回判断成功（下面只是常见例子）
    if isinstance(data, dict) and (data.get("success") is True or data.get("code") == 0 or "报名成功" in json.dumps(data, ensure_ascii=False)):
        return True, data

    return False, data

def worker_for_account(token: str, account_idx: int, target_ts: float):
    """
    为单个账号并发多个线程执行抢票请求，直到成功或重试结束。
    """
    logging.info("账户 %d 启动，token 前6: %s", account_idx, token[:6])
    # 等待到前 0.5s，然后忙等
    while time.time() < target_ts - 0.5:
        time.sleep(0.05)
    while time.time() < target_ts:
        pass

    # 发起并发尝试（小量并发以降低风险）
    with ThreadPoolExecutor(max_workers=THREADS_PER_ACCOUNT) as ex:
        futures = []
        for attempt in range(MAX_RETRIES):
            # 提交 THREADS_PER_ACCOUNT 个并发请求
            for i in range(THREADS_PER_ACCOUNT):
                futures.append(ex.submit(attempt_grab, token, account_idx, attempt))
            # 轮询这些请求结果
            for fut in futures:
                ok, info = fut.result(timeout=5)
                if ok:
                    logging.info("[acct %d] 抢票成功: %s", account_idx, info)
                    notify_success(f"账号{account_idx} 抢票成功: {info}")
                    return True
            # 未成功，短暂等待并继续下一轮
            time.sleep(SLEEP_RETRY)
    logging.warning("[acct %d] 重试完毕，未抢到。", account_idx)
    return False

def main():
    if not ACCESS_TOKENS:
        print("请在 .env 中配置 ACCESS_TOKENS（逗号分隔多个 token）")
        return

    target_ts = get_target_timestamp(GRAB_TIME)
    print("同步 NTP 时间...")
    ntp_time = sync_time()
    print("当前 NTP 时间:", datetime.fromtimestamp(ntp_time, timezone(timedelta(hours=8))))
    print("计划抢票时间:", GRAB_TIME)

    # 为每个账号启动 worker（可并发多个账号）
    results = []
    for idx, tkn in enumerate(ACCESS_TOKENS):
        # 每个账号单独线程/进程同时等待并在目标时刻发起并发请求
        # 这里用线程启动 worker_for_account（worker 内又用线程池并发），也可用 multiprocessing
        import threading
        th = threading.Thread(target=worker_for_account, args=(tkn, idx, target_ts))
        th.daemon = True
        th.start()
        results.append(th)

    # 等待所有线程结束（注意：如果你想要立即退出可以不 join）
    for th in results:
        th.join()

    print("所有账号抢票流程结束，请查看日志 wechat_grab_multi.log")

if __name__ == "__main__":
    main()


请在 .env 中配置 ACCESS_TOKENS（逗号分隔多个 token）
