# Order Gmail

## Trigger the n8n workflow to fetch mails

In [2]:
from datetime import datetime, timedelta
from urllib.parse import urlencode
import requests
import time
# ========= Configuration (this is the only part you need to edit) =========
# NOTE(review): the webhook URL is hard-coded in the notebook; anyone holding it
# can presumably trigger the workflow — confirm whether it should be kept out of version control.
BASE_WEBHOOK = "https://app-yamaguchi.app.n8n.cloud/webhook/e5ce8962-0721-4864-9a22-debf771fe490"

START_DATE = "2026/1/27"   # initial value of X, the first `before` date (YYYY/M/D)
# START_DATE = "2025/12/13"   # initial value of X (YYYY/M/D)
DAYS = 4                  # number of days (requests) to loop over
DIRECTION = "backward"     # "backward": X moves back 1 day per step; "forward": X moves forward 1 day per step
TIMEOUT = 30               # request timeout in seconds
STOP_ON_FAIL = False       # True: stop immediately on a non-2xx response or a request exception
# =========================================

def parse_ymd_slash(s: str) -> datetime:
    """Convert a slash-separated date string into a ``datetime`` at midnight.

    Both the unpadded (``YYYY/M/D``) and zero-padded (``YYYY/MM/DD``) forms
    are accepted.

    Raises:
        ValueError: if the string does not split into exactly three integer
            fields, or the fields do not form a valid calendar date.
    """
    year, month, day = (int(field) for field in s.split("/"))
    return datetime(year=year, month=month, day=day)

def fmt_slash(d: datetime) -> str:
    """Render a date as ``YYYY/M/D`` without zero padding, e.g. ``2026/1/20``."""
    return "/".join(str(part) for part in (d.year, d.month, d.day))

def build_url(before_dt: datetime, after_dt: datetime) -> str:
    """Return the webhook URL with ``before`` and ``after`` as query parameters.

    Both dates are formatted with ``fmt_slash`` and URL-encoded, then appended
    to ``BASE_WEBHOOK`` after a ``?``.
    """
    query = urlencode({
        "before": fmt_slash(before_dt),
        "after": fmt_slash(after_dt),
    })
    return "{}?{}".format(BASE_WEBHOOK, query)

results = []  # one record per request; can be turned into a DataFrame later

# Fail fast on a typo: without this check any value other than "backward"
# would silently be treated as "forward".
if DIRECTION not in ("backward", "forward"):
    raise ValueError(f'DIRECTION must be "backward" or "forward", got {DIRECTION!r}')

start_dt = parse_ymd_slash(START_DATE)
step = -1 if DIRECTION == "backward" else 1
PAUSE_SECONDS = 4  # delay between two consecutive webhook calls

with requests.Session() as session:
    for i in range(DAYS):
        # Each request covers a one-day window: after = before - 1 day.
        before_dt = start_dt + timedelta(days=i * step)
        after_dt = before_dt - timedelta(days=1)

        url = build_url(before_dt, after_dt)
        label = f"[{i+1:04d}] before={fmt_slash(before_dt)} after={fmt_slash(after_dt)}"

        # Shared record skeleton; the success and error paths fill in the rest.
        record = {
            "i": i + 1,
            "before": fmt_slash(before_dt),
            "after": fmt_slash(after_dt),
            "url": url,
            "status": None,
            "ok": False,
            "preview": "",
        }

        try:
            resp = session.post(url, timeout=TIMEOUT)
            status = resp.status_code
            # Keep the log on one line: cap the body and escape newlines.
            text_preview = (resp.text or "")[:300].replace("\n", "\\n")
            ok = (200 <= status < 300)

            print(f"{label} -> HTTP {status} | {text_preview}")

            record.update(status=status, ok=ok, preview=text_preview)
            results.append(record)

            if STOP_ON_FAIL and not ok:
                print("STOP_ON_FAIL=True，遇到非 2xx，已停止。")
                break

        except requests.RequestException as e:
            print(f"{label} -> REQUEST ERROR: {e}")
            record["preview"] = str(e)
            results.append(record)
            if STOP_ON_FAIL:
                print("STOP_ON_FAIL=True，遇到请求异常，已停止。")
                break

        # Pause between requests only; nothing follows the last one.
        if i < DAYS - 1:
            time.sleep(PAUSE_SECONDS)
        # Optional throttling for long runs (disabled): pause 30 minutes after every 4th request.
        # if (i + 1) % 4 == 0:
        #     time.sleep(60*30)


[0001] before=2026/1/27 after=2026/1/26 -> HTTP 200 | {"message":"Workflow was started"}
[0002] before=2026/1/26 after=2026/1/25 -> HTTP 200 | {"message":"Workflow was started"}
[0003] before=2026/1/25 after=2026/1/24 -> HTTP 200 | {"message":"Workflow was started"}
[0004] before=2026/1/24 after=2026/1/23 -> HTTP 200 | {"message":"Workflow was started"}


KeyboardInterrupt: 

## List the fetched mail files

In [12]:
import os

# Directory that is scanned recursively for mail files (relative to the notebook's working directory).
ROOT = "../some-data/9/"
# NOTE(review): this set is not referenced anywhere in the visible code, and the
# name says "Excel" although it holds ".json" — confirm whether it is still needed.
EXCEL_EXTS = {".json",}
# Directory names that are pruned from the walk in fast_list.
IGNORE_DIRS = {".git", ".idea", "__pycache__", "node_modules", ".venv", "venv"}

def fast_list(root: str, exts=None, ignore_dirs=None):
    """Recursively collect the files below ``root``.

    Args:
        root: Directory to walk.
        exts: Optional iterable of file extensions including the leading dot
            (e.g. ``{".json"}``), matched case-insensitively. A single string
            is treated as one extension. ``None`` (the default) keeps every
            file regardless of extension.
        ignore_dirs: Optional collection of directory names that are not
            descended into. ``None`` (the default) uses the module-level
            ``IGNORE_DIRS``.

    Returns:
        A ``(files, parents)`` tuple: the sorted list of absolute file paths
        and the sorted list of distinct directories that contain them.
    """
    if ignore_dirs is None:
        ignore_dirs = IGNORE_DIRS

    if exts is None:
        wanted = None
    else:
        if isinstance(exts, str):
            exts = (exts,)
        wanted = {e.lower() for e in exts}

    files, parents = [], set()
    for dirpath, dirnames, filenames in os.walk(root):
        # Prune in place so os.walk never descends into ignored directories.
        dirnames[:] = [d for d in dirnames if d not in ignore_dirs]
        for fn in filenames:
            # Skip names starting with "~$" (presumably Office lock/temp files).
            if fn.startswith("~$"):
                continue
            if wanted is not None and os.path.splitext(fn)[1].lower() not in wanted:
                continue
            full = os.path.abspath(os.path.join(dirpath, fn))
            files.append(full)
            parents.add(os.path.dirname(full))
    files.sort()
    return files, sorted(parents)


# Scan ROOT once; `files` is consumed by the upload loop later in the notebook.
files, parent_dirs = fast_list(ROOT)
# NOTE(review): this prints every absolute path, which floods the cell output and
# exposes local directory names — consider printing len(files) and a short slice instead.
print(files)

['/Users/syu/PycharmProjects/Automation-yamaguchi/some-data/9/2026-01-24T11:50:39.853+09:00-a', '/Users/syu/PycharmProjects/Automation-yamaguchi/some-data/9/2026-01-24T11:50:51.813+09:00-a', '/Users/syu/PycharmProjects/Automation-yamaguchi/some-data/9/2026-01-24T11:50:54.794+09:00-a', '/Users/syu/PycharmProjects/Automation-yamaguchi/some-data/9/2026-01-24T11:50:57.672+09:00-a', '/Users/syu/PycharmProjects/Automation-yamaguchi/some-data/9/2026-01-24T11:51:00.193+09:00-a', '/Users/syu/PycharmProjects/Automation-yamaguchi/some-data/9/2026-01-24T11:51:02.542+09:00-a', '/Users/syu/PycharmProjects/Automation-yamaguchi/some-data/9/2026-01-24T11:51:05.485+09:00-a', '/Users/syu/PycharmProjects/Automation-yamaguchi/some-data/9/2026-01-24T11:51:06.093+09:00-a', '/Users/syu/PycharmProjects/Automation-yamaguchi/some-data/9/2026-01-24T11:51:08.286+09:00-a', '/Users/syu/PycharmProjects/Automation-yamaguchi/some-data/9/2026-01-24T11:51:09.277+09:00-a', '/Users/syu/PycharmProjects/Automation-yamaguchi/

## Post mails to the server

In [13]:
import os

# Ingest endpoint that receives the batched emails.
# NOTE(review): the URL uses plain http while the request passes verify=CA_BUNDLE,
# which only matters for https — confirm the intended scheme.
API_URL = "http://data.yamaguchi.lan/api/aggregation/emails/ingest/"

# The bearer token must not live in the notebook: read it from the environment.
# The value that used to be hard-coded here should be treated as leaked and rotated.
API_TOKEN = os.environ.get("EMAIL_INGEST_API_TOKEN", "")
if not API_TOKEN:
    print("[WARN] EMAIL_INGEST_API_TOKEN is not set; requests to API_URL will not carry a valid bearer token")
import json
import time
import traceback
from typing import List, Dict, Any, Optional

import requests
from requests import Response
# Path to the Caddy root CA certificate copied onto this machine (replace it with your own path).
# It is passed to requests as `verify=` in batch_ingest_emails.
# NOTE(review): absolute, user-specific path — it has to be adjusted on every other machine.
CA_BUNDLE = "/Users/syu/caddy-local-root-ca.crt"

def load_emails_from_json(filepath: str) -> List[Dict]:
    """Read a UTF-8 JSON file and return its content as a list of email dicts.

    A top-level JSON array is returned as is; a top-level JSON object is
    wrapped in a one-element list.

    Raises:
        ValueError: if the top-level JSON value is neither an array nor an object.
    """
    with open(filepath, 'r', encoding='utf-8') as handle:
        parsed = json.load(handle)

    if isinstance(parsed, dict):
        return [parsed]
    if not isinstance(parsed, list):
        raise ValueError("Invalid JSON format: expected list or dict")
    return parsed


def _safe_response_text(resp: Optional[Response], limit: int = 2000) -> str:
    """Return at most ``limit`` characters of the response body for logging.

    Returns an empty string when there is no response or no text, a fixed
    placeholder when reading ``resp.text`` raises, and appends
    ``"...(truncated)"`` when the body is longer than ``limit``.
    """
    if resp is None:
        return ""

    try:
        body = resp.text
    except Exception:
        return "<failed to read response text>"

    if body is None:
        return ""

    suffix = "...(truncated)" if len(body) > limit else ""
    return body[:limit] + suffix


def batch_ingest_emails(emails: List[Dict]) -> Dict[str, Any]:
    """POST a batch of emails to ``API_URL`` and return the decoded JSON reply.

    The emails are sent as ``{"emails": [...]}`` with a bearer token, a
    10-second timeout and ``CA_BUNDLE`` as the TLS trust store.

    Raises:
        requests.RequestException: on transport errors, or as ``HTTPError``
            for a non-2xx status; the context is printed before re-raising.
        ValueError: if a 2xx response body is not valid JSON.
    """
    request_headers = {
        "Authorization": f"Bearer {API_TOKEN}",
        "Content-Type": "application/json",
    }
    body = {"emails": emails}

    try:
        response = requests.post(
            API_URL,
            json=body,
            headers=request_headers,
            timeout=10,
            verify=CA_BUNDLE,  # trust the internal CA
        )

        if response.ok:
            # Decode the JSON; a non-JSON body still yields a readable error.
            try:
                return response.json()
            except ValueError:
                preview = _safe_response_text(response)
                raise ValueError(f"Response is not valid JSON. Response preview: {preview}")

        # Non-2xx: raise with the body preview attached so it shows up in the log.
        preview = _safe_response_text(response)
        raise requests.HTTPError(
            f"HTTP {response.status_code} Error. Response preview: {preview}",
            response=response,
        )

    except requests.RequestException as exc:
        # Print as much context as possible, then let main() decide to skip the file.
        failed_response = getattr(exc, "response", None)
        print("\n[ERROR] batch_ingest_emails 请求失败")
        print(f"  API_URL: {API_URL}")
        print(f"  Exception: {repr(exc)}")
        if failed_response is not None:
            print(f"  HTTP Status: {failed_response.status_code}")
            print(f"  Response preview: {_safe_response_text(failed_response)}")
        # For more detail, enable the next line.
        # traceback.print_exc()
        raise


def main(filepath: str) -> bool:
    """
    Load one JSON file of emails, send it to the ingest API and print a summary.

    Returns True when the file was loaded and the API call returned a result
    (per-email failures are listed but still count as True). Returns False when
    any exception was raised; the error and traceback are printed so the caller
    can carry on with the next file.
    """
    try:
        batch = load_emails_from_json(filepath)
        print(f"从文件加载了 {len(batch)} 封邮件: {filepath}")

        summary = batch_ingest_emails(batch)

        print("导入结果:")
        print(f"  状态: {summary.get('status')}")
        print(f"  总计: {summary.get('total')} 封")
        print(f"  成功: {summary.get('successful')} 封")
        print(f"  失败: {summary.get('failed')} 封")

        # Per-email details, if the server returned any.
        rejected = []
        if summary.get("results"):
            rejected = [
                entry for entry in summary["results"]
                if entry.get("status") != "success"
            ]

        if rejected:
            print("失败明细:")
            # Cap the listing at 50 entries to keep the output readable.
            for entry in rejected[:50]:
                print(f"  ✗ {entry.get('email_id')} -> {entry.get('error', 'Unknown')}")
            hidden = len(rejected) - 50
            if hidden > 0:
                print(f"  ...(truncated) 还有 {hidden} 条失败未显示")

    except Exception as exc:
        print("\n[ERROR] 处理文件失败，已跳过继续执行")
        print(f"  File: {filepath}")
        print(f"  Exception: {repr(exc)}")
        # The traceback shows whether JSON parsing, the network, the certificate or the server failed.
        traceback.print_exc()
        print("-" * 80)
        return False

    return True


# Upload every file in turn; a file that fails is counted and skipped instead of aborting the batch.
ind = 0  # position in `files` to start from (raise it to resume a partially completed run)

success_count = 0
fail_count = 0

# start=ind keeps the printed index equal to the file's position in `files`,
# so the number shown in the log can be reused directly as `ind` when resuming.
for index, filepath in enumerate(files[ind:], start=ind):
    print(index, "  --", filepath)
    ok = main(filepath)
    if ok:
        success_count += 1
    else:
        fail_count += 1
    time.sleep(1)  # brief pause between uploads

print(f"\n批处理完成：成功 {success_count} 个文件，失败 {fail_count} 个文件")

0   -- /Users/syu/PycharmProjects/Automation-yamaguchi/some-data/9/2026-01-24T11:50:39.853+09:00-a
从文件加载了 3 封邮件: /Users/syu/PycharmProjects/Automation-yamaguchi/some-data/9/2026-01-24T11:50:39.853+09:00-a
导入结果:
  状态: success
  总计: 3 封
  成功: 3 封
  失败: 0 封
1   -- /Users/syu/PycharmProjects/Automation-yamaguchi/some-data/9/2026-01-24T11:50:51.813+09:00-a
从文件加载了 10 封邮件: /Users/syu/PycharmProjects/Automation-yamaguchi/some-data/9/2026-01-24T11:50:51.813+09:00-a
导入结果:
  状态: success
  总计: 10 封
  成功: 10 封
  失败: 0 封
2   -- /Users/syu/PycharmProjects/Automation-yamaguchi/some-data/9/2026-01-24T11:50:54.794+09:00-a
从文件加载了 10 封邮件: /Users/syu/PycharmProjects/Automation-yamaguchi/some-data/9/2026-01-24T11:50:54.794+09:00-a
导入结果:
  状态: success
  总计: 10 封
  成功: 10 封
  失败: 0 封
3   -- /Users/syu/PycharmProjects/Automation-yamaguchi/some-data/9/2026-01-24T11:50:57.672+09:00-a
从文件加载了 10 封邮件: /Users/syu/PycharmProjects/Automation-yamaguchi/some-data/9/2026-01-24T11:50:57.672+09:00-a
导入结果:
  状态: success
  总