## call with interval

In [1]:
import os
import re
import json
import time
import requests
from datetime import datetime, timedelta
from typing import List, Dict, Any, Optional, Tuple

# =========================
# CONFIG (non-streaming only)
# =========================
# Base URL of the service and the non-streaming batch endpoint derived from it.
API_BASE = "http://127.0.0.1:8000"
API_URL_BATCH = f"{API_BASE}/currency/exchange-rates/batch"

# Root folder; each day is read from the sub-folder BASE_DIR/YYYY-MM-DD.
BASE_DIR = "WebService/data"

# Inclusive date range to process. Each value may be DD-MM-YYYY or YYYY-MM-DD
# (the day folders themselves are always named YYYY-MM-DD).
START_DATE = "01-07-2025"
END_DATE   = "11-11-2025"

# Maximum number of rows per POST request (values below 1 are treated as 1).
BATCH_SIZE = 400

# HTTP timeouts / retries
REQUEST_TIMEOUT = 300  # seconds per HTTP request
RETRY_MAX = 4  # total attempts per request, including the first one
RETRY_BACKOFF_BASE = 2  # wait before retry k is RETRY_BACKOFF_BASE ** (k - 1) seconds, at least 1

# Behavior on failures
STOP_ON_ERROR = False  # if True, stop immediately when a batch fails

# Input hygiene
DROP_SAME_CURRENCY = True  # drop items where FromCurrency == ToCurrency

# Output
WRITE_DAY_SUMMARY = True  # write per-day JSON summary (post_summary.json) under each day folder

# Throttling between days
WAIT_BETWEEN_DAYS_SECONDS = 120  # seconds to wait after a day that posted at least one batch

# =========================
# HELPERS
# =========================

_DD_MM_YYYY_DASH = re.compile(r"^(\d{2})-(\d{2})-(\d{4})$")
_YYYY_MM_DD = re.compile(r"^(\d{4})-(\d{2})-(\d{2})$")

def _parse_date_any(s: str) -> datetime:
    """Parse a DD-MM-YYYY or YYYY-MM-DD string into a midnight datetime.

    Surrounding whitespace is ignored and a falsy input is treated as "".

    Raises:
        ValueError: if the text matches neither layout, or names a day that
            does not exist on the calendar.
    """
    text = (s or "").strip()

    day_first = _DD_MM_YYYY_DASH.fullmatch(text)
    if day_first is not None:
        day, month, year = (int(part) for part in day_first.groups())
        return datetime(year, month, day)

    year_first = _YYYY_MM_DD.fullmatch(text)
    if year_first is not None:
        year, month, day = (int(part) for part in year_first.groups())
        return datetime(year, month, day)

    raise ValueError(f"Date must be DD-MM-YYYY or YYYY-MM-DD, got: {text!r}")

def _daterange_inclusive(start_dt: datetime, end_dt: datetime):
    """Yield start_dt, then one-day steps after it, while the value is <= end_dt."""
    one_day = timedelta(days=1)
    current = start_dt
    while True:
        if current > end_dt:
            return
        yield current
        current = current + one_day

def _load_payload_for_day(day_dir: str) -> Optional[List[Any]]:
    """Load day_dir/exchange_rates_payload.json and return its list unchanged.

    Returns None (after printing a [skip]/[warn] line) when the file is
    missing, cannot be read or parsed, or its top-level value is not a list.
    """
    payload_path = os.path.join(day_dir, "exchange_rates_payload.json")

    if os.path.isfile(payload_path):
        try:
            with open(payload_path, "r", encoding="utf-8") as handle:
                loaded = json.load(handle)
        except Exception as exc:
            print(f"[warn] failed reading payload {payload_path}: {exc}")
            return None
        if isinstance(loaded, list):
            return loaded
        print(f"[warn] payload is not a list, skipping: {payload_path}")
        return None

    print(f"[skip] payload not found: {payload_path}")
    return None

def _chunked(lst: List[Any], size: int) -> List[List[Any]]:
    """Split lst into consecutive slices of at most size items (size < 1 acts as 1)."""
    step = int(size)
    if step < 1:
        step = 1
    pieces = []
    for start in range(0, len(lst), step):
        pieces.append(lst[start:start + step])
    return pieces

def _dedupe_payload(payload: List[Any]) -> Tuple[List[Any], int]:
    """Remove duplicate rows, keeping the first occurrence and the input order.

    A dict row whose ExchangeRateType, FromCurrency, ToCurrency, ValidFrom and
    Quotation are all present (not None) is identified by those five values
    only, so later rows sharing them are dropped even if other fields differ.
    Every other row is identified by its canonical JSON text (sorted keys), or
    by repr() when it cannot be serialized.

    Args:
        payload: rows as loaded from the day's payload file.

    Returns:
        (unique_rows, number_of_rows_removed).
    """
    seen = set()
    out: List[Any] = []
    removed = 0
    for item in payload:
        key: Optional[Tuple[Any, ...]] = None
        if isinstance(item, dict):
            fields = (
                item.get("ExchangeRateType"),
                item.get("FromCurrency"),
                item.get("ToCurrency"),
                item.get("ValidFrom"),
                item.get("Quotation"),
            )
            if all(v is not None for v in fields):
                candidate = ("LOGIC_KEY",) + fields
                try:
                    hash(candidate)
                except TypeError:
                    # A field holds a list/dict, which cannot go into the set;
                    # identify the row by its canonical JSON below instead.
                    pass
                else:
                    key = candidate
        if key is None:
            try:
                canonical = json.dumps(item, sort_keys=True, ensure_ascii=False)
            except Exception:
                # Best effort: a row json cannot serialize is still deduplicated by repr.
                canonical = repr(item)
            key = ("RAW_ITEM", canonical)
        if key in seen:
            removed += 1
            continue
        seen.add(key)
        out.append(item)
    return out, removed

def _filter_same_currency(payload: List[Any]) -> Tuple[List[Any], int]:
    """Return (rows kept, count dropped), dropping dict rows whose two currencies match.

    A row is dropped only when it is a dict, both FromCurrency and ToCurrency
    are not None, and their str() forms are equal. Non-dict rows are kept.
    """
    def _is_self_pair(row: Any) -> bool:
        if not isinstance(row, dict):
            return False
        source = row.get("FromCurrency")
        target = row.get("ToCurrency")
        if source is None or target is None:
            return False
        return str(source) == str(target)

    kept: List[Any] = []
    dropped = 0
    for row in payload:
        if _is_self_pair(row):
            dropped += 1
        else:
            kept.append(row)
    return kept, dropped

def _post_with_retries(url: str, *, json_body: Any, timeout: Optional[int]) -> requests.Response:
    """POST json_body to url, retrying transient failures with exponential backoff.

    At most RETRY_MAX attempts are made. An attempt is retried after a
    transport error (connection, timeout, ...), an HTTP 429, or any HTTP 5xx.
    The wait before retry k is RETRY_BACKOFF_BASE ** (k - 1) seconds, never
    less than 1. Any other 4xx status is raised at once, because resending the
    same body would be rejected again.

    Args:
        url: endpoint to POST to.
        json_body: value sent as the JSON request body.
        timeout: per-request timeout in seconds, passed through to requests.

    Returns:
        The first response whose status is not an error.

    Raises:
        requests.HTTPError: for a non-retryable 4xx, or a 429/5xx on the last
            attempt; the failing response is attached as ``.response``.
        requests.RequestException: the last transport error once attempts run out.
        RuntimeError: if no attempt was made at all (RETRY_MAX < 1).
    """
    headers = {"Content-Type": "application/json"}
    last_exc: Optional[Exception] = None
    # The context manager releases the session's pooled connections on every exit path.
    with requests.Session() as session:
        for attempt in range(1, RETRY_MAX + 1):
            try:
                r = session.post(url, json=json_body, timeout=timeout, headers=headers)
            except requests.RequestException as e:
                last_exc = e
            else:
                if r.status_code == 429 or r.status_code >= 500:
                    # Keep the response on the exception so callers can show its body.
                    last_exc = requests.HTTPError(
                        f"HTTP {r.status_code}: {r.text[:200]}", response=r
                    )
                else:
                    # Remaining 4xx statuses are not retryable: raise now.
                    r.raise_for_status()
                    return r
            if attempt >= RETRY_MAX:
                break
            sleep_s = max(1, int(RETRY_BACKOFF_BASE) ** (attempt - 1))
            print(f"[retry] attempt {attempt}/{RETRY_MAX} failed: {last_exc}. Backing off {sleep_s}s")
            time.sleep(sleep_s)
    if last_exc is not None:
        raise last_exc
    raise RuntimeError("Unknown POST failure")

def _post_payload_batch(payload: List[Any]) -> Tuple[bool, Optional[Dict[str, Any]], Optional[str]]:
    """POST one batch to API_URL_BATCH and report the outcome without raising.

    Returns:
        (True, parsed_json, None) when the reply body parses as JSON,
        (True, None, raw_text) when the request succeeded but the body is not JSON,
        (False, None, message) on failure; the message is the exception text,
        followed by the response body when the exception carries one.
    """
    try:
        response = _post_with_retries(API_URL_BATCH, json_body=payload, timeout=REQUEST_TIMEOUT)
        try:
            parsed = response.json()
        except Exception:
            return True, None, response.text
        return True, parsed, None
    except Exception as err:
        detail = ""
        failed_response = None
        if isinstance(err, requests.RequestException):
            failed_response = getattr(err, "response", None)
        if failed_response is not None:
            try:
                body_text = failed_response.text
            except Exception:
                body_text = None
            if body_text:
                detail = "Response body: " + body_text
        return False, None, f"{err}\n{detail}"

def _write_day_summary(day_dir: str, *, total_rows: int, rows_after_filters: int,
                       dupes_removed: int, same_currency_dropped: int,
                       posted_batches: int, rows_sent: int, errors: int) -> None:
    """Write the day's counters to day_dir/post_summary.json.

    Does nothing when WRITE_DAY_SUMMARY is false. A failure while building or
    writing the file is reported with a [warn] line and never raised.
    """
    if WRITE_DAY_SUMMARY:
        try:
            summary = {
                "total_rows_in_file": total_rows,
                "rows_after_filters": rows_after_filters,
                "duplicates_removed": dupes_removed,
                "same_currency_dropped": same_currency_dropped,
                "posted_batches": posted_batches,
                "rows_sent": rows_sent,
                "errors": errors,
                "endpoint": "batch",
                "batch_size": BATCH_SIZE,
                "ts": datetime.now().isoformat(),
            }
            target = os.path.join(day_dir, "post_summary.json")
            with open(target, "w", encoding="utf-8") as handle:
                json.dump(summary, handle, indent=2, ensure_ascii=False)
        except Exception as exc:
            print(f"[warn] write day summary failed in {day_dir}: {exc}")

# =========================
# MAIN (strict day->batches sequence)
# =========================

def main():
    """Post each day's exchange-rate payload to the batch endpoint, strictly in date order.

    For every day from START_DATE to END_DATE (inclusive) the folder
    BASE_DIR/YYYY-MM-DD is read, the rows are de-duplicated and (when
    DROP_SAME_CURRENCY is set) stripped of same-currency pairs, and what
    remains is posted in batches of at most BATCH_SIZE rows, one after another.
    A per-day summary is written via _write_day_summary, and after a day that
    posted at least one batch the loop sleeps WAIT_BETWEEN_DAYS_SECONDS before
    the next day. With STOP_ON_ERROR set, the first failed batch ends the run.
    An overall summary is printed on both exit paths.

    Raises:
        SystemExit: if END_DATE is earlier than START_DATE.
        ValueError: if either date is not DD-MM-YYYY or YYYY-MM-DD.
    """
    start_dt = _parse_date_any(START_DATE)
    end_dt = _parse_date_any(END_DATE)
    if end_dt < start_dt:
        raise SystemExit(f"END_DATE {END_DATE} is before START_DATE {START_DATE}")

    bs = max(1, int(BATCH_SIZE))

    total_days = 0
    days_with_payload = 0
    posted_days = 0
    posted_batches = 0
    rows_sent = 0
    skipped_days = 0
    errors = 0
    total_dupes_removed = 0
    total_same_currency_dropped = 0

    def _print_summary() -> None:
        # Reads the run-wide counters at call time, so the early-halt path and
        # the normal end of the run print exactly the same layout.
        print("\n[summary]")
        print(f"  days in range         : {total_days}")
        print(f"  days with payload     : {days_with_payload}")
        print(f"  posted days           : {posted_days}")
        print(f"  posted batches        : {posted_batches}")
        print(f"  rows sent             : {rows_sent}")
        print(f"  skipped days          : {skipped_days}")
        print(f"  errors                : {errors}")
        print(f"  duplicates removed    : {total_dupes_removed}")
        print(f"  same-currency dropped : {total_same_currency_dropped}")

    print(f"[range] {start_dt.date()} → {end_dt.date()} (inclusive)")
    print(f"[config] batch_size = {bs}, endpoint = /batch")
    print("[mode] STRICT ORDER: per-day, then per-day batches (no interleaving)\n")

    # STRICT: iterate days in order
    for d in _daterange_inclusive(start_dt, end_dt):
        total_days += 1
        day_name = d.strftime("%Y-%m-%d")
        day_dir = os.path.join(BASE_DIR, day_name)
        print(f"\n=== DAY {day_name} ===")

        if not os.path.isdir(day_dir):
            print(f"[skip] day folder missing: {day_dir}")
            skipped_days += 1
            continue

        payload = _load_payload_for_day(day_dir)
        if not payload:  # None (unreadable/missing) and [] are both skipped
            print(f"[skip] no valid payload in: {day_dir}")
            skipped_days += 1
            continue

        days_with_payload += 1
        total_rows_in_file = len(payload)

        # Hygiene for the day (rows themselves are left as loaded)
        deduped_payload, dupes_removed = _dedupe_payload(payload)
        total_dupes_removed += dupes_removed

        if DROP_SAME_CURRENCY:
            filtered_payload, same_drop = _filter_same_currency(deduped_payload)
        else:
            filtered_payload, same_drop = deduped_payload, 0
        total_same_currency_dropped += same_drop

        n_after = len(filtered_payload)
        print(f"[day] rows: {total_rows_in_file} → {n_after} "
              f"(dedup removed {dupes_removed}, same-currency dropped {same_drop})")

        # Figures that are the same on every summary-writing path for this day.
        day_stats = {
            "total_rows": total_rows_in_file,
            "rows_after_filters": n_after,
            "dupes_removed": dupes_removed,
            "same_currency_dropped": same_drop,
        }

        if n_after == 0:
            print(f"[day] {day_name}: nothing to post after filters; skipping")
            _write_day_summary(day_dir, posted_batches=0, rows_sent=0, errors=0, **day_stats)
            continue

        # STRICT: process this day's batches sequentially
        chunks = _chunked(filtered_payload, bs)
        total_chunks = len(chunks)
        print(f"[day] batching: {total_chunks} batch(es) of up to {bs}")

        day_errors = 0
        day_rows_sent = 0
        day_batches_posted = 0
        halted = False

        for idx, batch in enumerate(chunks, start=1):
            print(f"[post] {day_name} | batch {idx}/{total_chunks}: {len(batch)} rows → /batch")

            ok, resp_json, resp_text = _post_payload_batch(batch)

            if ok:
                day_batches_posted += 1
                posted_batches += 1
                day_rows_sent += len(batch)
                rows_sent += len(batch)

                # Parsed JSON replies are deliberately not echoed; print
                # resp_json here when verbose output is needed.
                if resp_json is None:
                    if resp_text is not None:
                        print(resp_text[:1000])
                    else:
                        print("[info] posted OK (no response body)")
                continue

            day_errors += 1
            errors += 1
            print(f"[error] POST failed for {day_name} batch {idx}/{total_chunks}:\n"
                  f"{(resp_text or '(no body)')[:1000]}")
            if STOP_ON_ERROR:
                print("[halt] STOP_ON_ERROR=True → halting at this batch")
                halted = True
                break

        # A day counts as posted when at least one batch went through; this
        # also holds for the day on which the run is halted.
        if day_batches_posted > 0:
            posted_days += 1

        _write_day_summary(day_dir,
                           posted_batches=day_batches_posted,
                           rows_sent=day_rows_sent,
                           errors=day_errors,
                           **day_stats)

        if halted:
            _print_summary()
            return

        # =========================
        # WAIT before next day
        # =========================
        if day_batches_posted > 0 and d < end_dt:
            print(f"[sleep] finished {day_name}. Waiting {WAIT_BETWEEN_DAYS_SECONDS}s before next day…")
            time.sleep(WAIT_BETWEEN_DAYS_SECONDS)

    # Final overall summary
    _print_summary()

if __name__ == "__main__":
    main()


[range] 2025-07-01 → 2025-11-11 (inclusive)
[config] batch_size = 400, endpoint = /batch
[mode] STRICT ORDER: per-day, then per-day batches (no interleaving)


=== DAY 2025-07-01 ===
[day] rows: 4 → 4 (dedup removed 0, same-currency dropped 0)
[day] batching: 1 batch(es) of up to 400
[post] 2025-07-01 | batch 1/1: 4 rows → /batch
[sleep] finished 2025-07-01. Waiting 120s before next day…

=== DAY 2025-07-02 ===
[day] rows: 4 → 4 (dedup removed 0, same-currency dropped 0)
[day] batching: 1 batch(es) of up to 400
[post] 2025-07-02 | batch 1/1: 4 rows → /batch
[sleep] finished 2025-07-02. Waiting 120s before next day…

=== DAY 2025-07-03 ===
[day] rows: 4 → 4 (dedup removed 0, same-currency dropped 0)
[day] batching: 1 batch(es) of up to 400
[post] 2025-07-03 | batch 1/1: 4 rows → /batch
[sleep] finished 2025-07-03. Waiting 120s before next day…

=== DAY 2025-07-04 ===
[day] rows: 4 → 4 (dedup removed 0, same-currency dropped 0)
[day] batching: 1 batch(es) of up to 400
[post] 2025-07-04 |

## Calling

In [1]:
import os
import re
import json
import time
import requests
from datetime import datetime, timedelta
from typing import List, Dict, Any, Optional, Tuple

# =========================
# CONFIG (non-streaming only)
# =========================
# Base URL of the service and the non-streaming batch endpoint derived from it.
API_BASE = "http://127.0.0.1:8000"
API_URL_BATCH = f"{API_BASE}/currency/exchange-rates/batch"

# Root folder; each day is read from the sub-folder BASE_DIR/YYYY-MM-DD.
BASE_DIR = "WebService/data"

# Inclusive date range to process. Each value may be DD-MM-YYYY or YYYY-MM-DD
# (the day folders themselves are always named YYYY-MM-DD).
START_DATE = "30-11-2025"
END_DATE   = "30-11-2025"

# Maximum number of rows per POST request (values below 1 are treated as 1).
BATCH_SIZE = 408

# HTTP timeouts / retries
REQUEST_TIMEOUT = 30000  # seconds per HTTP request (30000 s is roughly 8.3 hours)
RETRY_MAX = 4  # total attempts per request, including the first one
RETRY_BACKOFF_BASE = 2  # wait before retry k is RETRY_BACKOFF_BASE ** (k - 1) seconds, at least 1

# Behavior on failures
STOP_ON_ERROR = False  # if True, stop immediately when a batch fails

# Input hygiene
DROP_SAME_CURRENCY = True  # drop items where FromCurrency == ToCurrency

# Output
WRITE_DAY_SUMMARY = True  # write per-day JSON summary (post_summary.json) under each day folder

# =========================
# HELPERS
# =========================

_DD_MM_YYYY_DASH = re.compile(r"^(\d{2})-(\d{2})-(\d{4})$")
_YYYY_MM_DD = re.compile(r"^(\d{4})-(\d{2})-(\d{2})$")

def _parse_date_any(s: str) -> datetime:
    """Parse a DD-MM-YYYY or YYYY-MM-DD string into a midnight datetime.

    Surrounding whitespace is ignored and a falsy input is treated as "".

    Raises:
        ValueError: if the text matches neither layout, or names a day that
            does not exist on the calendar.
    """
    text = (s or "").strip()

    day_first = _DD_MM_YYYY_DASH.fullmatch(text)
    if day_first is not None:
        day, month, year = (int(part) for part in day_first.groups())
        return datetime(year, month, day)

    year_first = _YYYY_MM_DD.fullmatch(text)
    if year_first is not None:
        year, month, day = (int(part) for part in year_first.groups())
        return datetime(year, month, day)

    raise ValueError(f"Date must be DD-MM-YYYY or YYYY-MM-DD, got: {text!r}")

def _daterange_inclusive(start_dt: datetime, end_dt: datetime):
    """Yield start_dt, then one-day steps after it, while the value is <= end_dt."""
    one_day = timedelta(days=1)
    current = start_dt
    while True:
        if current > end_dt:
            return
        yield current
        current = current + one_day

def _load_payload_for_day(day_dir: str) -> Optional[List[Any]]:
    """Load day_dir/exchange_rates_payload.json and return its list unchanged.

    Returns None (after printing a [skip]/[warn] line) when the file is
    missing, cannot be read or parsed, or its top-level value is not a list.
    """
    payload_path = os.path.join(day_dir, "exchange_rates_payload.json")

    if os.path.isfile(payload_path):
        try:
            with open(payload_path, "r", encoding="utf-8") as handle:
                loaded = json.load(handle)
        except Exception as exc:
            print(f"[warn] failed reading payload {payload_path}: {exc}")
            return None
        if isinstance(loaded, list):
            return loaded
        print(f"[warn] payload is not a list, skipping: {payload_path}")
        return None

    print(f"[skip] payload not found: {payload_path}")
    return None

def _chunked(lst: List[Any], size: int) -> List[List[Any]]:
    """Split lst into consecutive slices of at most size items (size < 1 acts as 1)."""
    step = int(size)
    if step < 1:
        step = 1
    pieces = []
    for start in range(0, len(lst), step):
        pieces.append(lst[start:start + step])
    return pieces

def _dedupe_payload(payload: List[Any]) -> Tuple[List[Any], int]:
    """Remove duplicate rows, keeping the first occurrence and the input order.

    A dict row whose ExchangeRateType, FromCurrency, ToCurrency, ValidFrom and
    Quotation are all present (not None) is identified by those five values
    only, so later rows sharing them are dropped even if other fields differ.
    Every other row is identified by its canonical JSON text (sorted keys), or
    by repr() when it cannot be serialized.

    Args:
        payload: rows as loaded from the day's payload file.

    Returns:
        (unique_rows, number_of_rows_removed).
    """
    seen = set()
    out: List[Any] = []
    removed = 0
    for item in payload:
        key: Optional[Tuple[Any, ...]] = None
        if isinstance(item, dict):
            fields = (
                item.get("ExchangeRateType"),
                item.get("FromCurrency"),
                item.get("ToCurrency"),
                item.get("ValidFrom"),
                item.get("Quotation"),
            )
            if all(v is not None for v in fields):
                candidate = ("LOGIC_KEY",) + fields
                try:
                    hash(candidate)
                except TypeError:
                    # A field holds a list/dict, which cannot go into the set;
                    # identify the row by its canonical JSON below instead.
                    pass
                else:
                    key = candidate
        if key is None:
            try:
                canonical = json.dumps(item, sort_keys=True, ensure_ascii=False)
            except Exception:
                # Best effort: a row json cannot serialize is still deduplicated by repr.
                canonical = repr(item)
            key = ("RAW_ITEM", canonical)
        if key in seen:
            removed += 1
            continue
        seen.add(key)
        out.append(item)
    return out, removed

def _filter_same_currency(payload: List[Any]) -> Tuple[List[Any], int]:
    """Return (rows kept, count dropped), dropping dict rows whose two currencies match.

    A row is dropped only when it is a dict, both FromCurrency and ToCurrency
    are not None, and their str() forms are equal. Non-dict rows are kept.
    """
    def _is_self_pair(row: Any) -> bool:
        if not isinstance(row, dict):
            return False
        source = row.get("FromCurrency")
        target = row.get("ToCurrency")
        if source is None or target is None:
            return False
        return str(source) == str(target)

    kept: List[Any] = []
    dropped = 0
    for row in payload:
        if _is_self_pair(row):
            dropped += 1
        else:
            kept.append(row)
    return kept, dropped

def _post_with_retries(url: str, *, json_body: Any, timeout: Optional[int]) -> requests.Response:
    """POST json_body to url, retrying transient failures with exponential backoff.

    At most RETRY_MAX attempts are made. An attempt is retried after a
    transport error (connection, timeout, ...), an HTTP 429, or any HTTP 5xx.
    The wait before retry k is RETRY_BACKOFF_BASE ** (k - 1) seconds, never
    less than 1. Any other 4xx status is raised at once, because resending the
    same body would be rejected again.

    Args:
        url: endpoint to POST to.
        json_body: value sent as the JSON request body.
        timeout: per-request timeout in seconds, passed through to requests.

    Returns:
        The first response whose status is not an error.

    Raises:
        requests.HTTPError: for a non-retryable 4xx, or a 429/5xx on the last
            attempt; the failing response is attached as ``.response``.
        requests.RequestException: the last transport error once attempts run out.
        RuntimeError: if no attempt was made at all (RETRY_MAX < 1).
    """
    headers = {"Content-Type": "application/json"}
    last_exc: Optional[Exception] = None
    # The context manager releases the session's pooled connections on every exit path.
    with requests.Session() as session:
        for attempt in range(1, RETRY_MAX + 1):
            try:
                r = session.post(url, json=json_body, timeout=timeout, headers=headers)
            except requests.RequestException as e:
                last_exc = e
            else:
                if r.status_code == 429 or r.status_code >= 500:
                    # Keep the response on the exception so callers can show its body.
                    last_exc = requests.HTTPError(
                        f"HTTP {r.status_code}: {r.text[:200]}", response=r
                    )
                else:
                    # Remaining 4xx statuses are not retryable: raise now.
                    r.raise_for_status()
                    return r
            if attempt >= RETRY_MAX:
                break
            sleep_s = max(1, int(RETRY_BACKOFF_BASE) ** (attempt - 1))
            print(f"[retry] attempt {attempt}/{RETRY_MAX} failed: {last_exc}. Backing off {sleep_s}s")
            time.sleep(sleep_s)
    if last_exc is not None:
        raise last_exc
    raise RuntimeError("Unknown POST failure")

def _post_payload_batch(payload: List[Any]) -> Tuple[bool, Optional[Dict[str, Any]], Optional[str]]:
    """POST one batch to API_URL_BATCH and report the outcome without raising.

    Returns:
        (True, parsed_json, None) when the reply body parses as JSON,
        (True, None, raw_text) when the request succeeded but the body is not JSON,
        (False, None, message) on failure; the message is the exception text,
        followed by the response body when the exception carries one.
    """
    try:
        response = _post_with_retries(API_URL_BATCH, json_body=payload, timeout=REQUEST_TIMEOUT)
        try:
            parsed = response.json()
        except Exception:
            return True, None, response.text
        return True, parsed, None
    except Exception as err:
        detail = ""
        failed_response = None
        if isinstance(err, requests.RequestException):
            failed_response = getattr(err, "response", None)
        if failed_response is not None:
            try:
                body_text = failed_response.text
            except Exception:
                body_text = None
            if body_text:
                detail = "Response body: " + body_text
        return False, None, f"{err}\n{detail}"

def _write_day_summary(day_dir: str, *, total_rows: int, rows_after_filters: int,
                       dupes_removed: int, same_currency_dropped: int,
                       posted_batches: int, rows_sent: int, errors: int) -> None:
    """Write the day's counters to day_dir/post_summary.json.

    Does nothing when WRITE_DAY_SUMMARY is false. A failure while building or
    writing the file is reported with a [warn] line and never raised.
    """
    if WRITE_DAY_SUMMARY:
        try:
            summary = {
                "total_rows_in_file": total_rows,
                "rows_after_filters": rows_after_filters,
                "duplicates_removed": dupes_removed,
                "same_currency_dropped": same_currency_dropped,
                "posted_batches": posted_batches,
                "rows_sent": rows_sent,
                "errors": errors,
                "endpoint": "batch",
                "batch_size": BATCH_SIZE,
                "ts": datetime.now().isoformat(),
            }
            target = os.path.join(day_dir, "post_summary.json")
            with open(target, "w", encoding="utf-8") as handle:
                json.dump(summary, handle, indent=2, ensure_ascii=False)
        except Exception as exc:
            print(f"[warn] write day summary failed in {day_dir}: {exc}")

# =========================
# MAIN (strict day->batches sequence)
# =========================

def main():
    """Post each day's exchange-rate payload to the batch endpoint, strictly in date order.

    For every day from START_DATE to END_DATE (inclusive) the folder
    BASE_DIR/YYYY-MM-DD is read, the rows are de-duplicated and (when
    DROP_SAME_CURRENCY is set) stripped of same-currency pairs, and what
    remains is posted in batches of at most BATCH_SIZE rows, one after another.
    A per-day summary is written via _write_day_summary. With STOP_ON_ERROR
    set, the first failed batch ends the run. An overall summary is printed on
    both exit paths.

    Raises:
        SystemExit: if END_DATE is earlier than START_DATE.
        ValueError: if either date is not DD-MM-YYYY or YYYY-MM-DD.
    """
    start_dt = _parse_date_any(START_DATE)
    end_dt = _parse_date_any(END_DATE)
    if end_dt < start_dt:
        raise SystemExit(f"END_DATE {END_DATE} is before START_DATE {START_DATE}")

    bs = max(1, int(BATCH_SIZE))

    total_days = 0
    days_with_payload = 0
    posted_days = 0
    posted_batches = 0
    rows_sent = 0
    skipped_days = 0
    errors = 0
    total_dupes_removed = 0
    total_same_currency_dropped = 0

    def _print_summary() -> None:
        # Reads the run-wide counters at call time, so the early-halt path and
        # the normal end of the run print exactly the same layout.
        print("\n[summary]")
        print(f"  days in range         : {total_days}")
        print(f"  days with payload     : {days_with_payload}")
        print(f"  posted days           : {posted_days}")
        print(f"  posted batches        : {posted_batches}")
        print(f"  rows sent             : {rows_sent}")
        print(f"  skipped days          : {skipped_days}")
        print(f"  errors                : {errors}")
        print(f"  duplicates removed    : {total_dupes_removed}")
        print(f"  same-currency dropped : {total_same_currency_dropped}")

    print(f"[range] {start_dt.date()} → {end_dt.date()} (inclusive)")
    print(f"[config] batch_size = {bs}, endpoint = /batch")
    print("[mode] STRICT ORDER: per-day, then per-day batches (no interleaving)\n")

    # STRICT: iterate days in order
    for d in _daterange_inclusive(start_dt, end_dt):
        total_days += 1
        day_name = d.strftime("%Y-%m-%d")
        day_dir = os.path.join(BASE_DIR, day_name)
        print(f"\n=== DAY {day_name} ===")

        if not os.path.isdir(day_dir):
            print(f"[skip] day folder missing: {day_dir}")
            skipped_days += 1
            continue

        payload = _load_payload_for_day(day_dir)
        if not payload:  # None (unreadable/missing) and [] are both skipped
            print(f"[skip] no valid payload in: {day_dir}")
            skipped_days += 1
            continue

        days_with_payload += 1
        total_rows_in_file = len(payload)

        # Hygiene for the day (rows themselves are left as loaded)
        deduped_payload, dupes_removed = _dedupe_payload(payload)
        total_dupes_removed += dupes_removed

        if DROP_SAME_CURRENCY:
            filtered_payload, same_drop = _filter_same_currency(deduped_payload)
        else:
            filtered_payload, same_drop = deduped_payload, 0
        total_same_currency_dropped += same_drop

        n_after = len(filtered_payload)
        print(f"[day] rows: {total_rows_in_file} → {n_after} "
              f"(dedup removed {dupes_removed}, same-currency dropped {same_drop})")

        # Figures that are the same on every summary-writing path for this day.
        day_stats = {
            "total_rows": total_rows_in_file,
            "rows_after_filters": n_after,
            "dupes_removed": dupes_removed,
            "same_currency_dropped": same_drop,
        }

        if n_after == 0:
            print(f"[day] {day_name}: nothing to post after filters; skipping")
            _write_day_summary(day_dir, posted_batches=0, rows_sent=0, errors=0, **day_stats)
            continue

        # STRICT: process this day's batches sequentially
        chunks = _chunked(filtered_payload, bs)
        total_chunks = len(chunks)
        print(f"[day] batching: {total_chunks} batch(es) of up to {bs}")

        day_errors = 0
        day_rows_sent = 0
        day_batches_posted = 0
        halted = False

        for idx, batch in enumerate(chunks, start=1):
            print(f"[post] {day_name} | batch {idx}/{total_chunks}: {len(batch)} rows → /batch")

            ok, resp_json, resp_text = _post_payload_batch(batch)

            if ok:
                day_batches_posted += 1
                posted_batches += 1
                day_rows_sent += len(batch)
                rows_sent += len(batch)

                # Parsed JSON replies are deliberately not echoed; print
                # resp_json here when verbose output is needed.
                if resp_json is None:
                    if resp_text is not None:
                        print(resp_text[:1000])
                    else:
                        print("[info] posted OK (no response body)")
                continue

            day_errors += 1
            errors += 1
            print(f"[error] POST failed for {day_name} batch {idx}/{total_chunks}:\n"
                  f"{(resp_text or '(no body)')[:1000]}")
            if STOP_ON_ERROR:
                print("[halt] STOP_ON_ERROR=True → halting at this batch")
                halted = True
                break

        # A day counts as posted when at least one batch went through; this
        # also holds for the day on which the run is halted.
        if day_batches_posted > 0:
            posted_days += 1

        _write_day_summary(day_dir,
                           posted_batches=day_batches_posted,
                           rows_sent=day_rows_sent,
                           errors=day_errors,
                           **day_stats)

        if halted:
            _print_summary()
            return

    # Final overall summary
    _print_summary()

if __name__ == "__main__":
    main()


[range] 2025-11-30 → 2025-11-30 (inclusive)
[config] batch_size = 408, endpoint = /batch
[mode] STRICT ORDER: per-day, then per-day batches (no interleaving)


=== DAY 2025-11-30 ===
[day] rows: 408 → 408 (dedup removed 0, same-currency dropped 0)
[day] batching: 1 batch(es) of up to 408
[post] 2025-11-30 | batch 1/1: 408 rows → /batch

[summary]
  days in range         : 1
  days with payload     : 1
  posted days           : 1
  posted batches        : 1
  rows sent             : 408
  skipped days          : 0
  errors                : 0
  duplicates removed    : 0
  same-currency dropped : 0


# Draft Deletion

In [2]:
# post_delete_drafts.py
import os
import re
import json
import time
import requests
from datetime import datetime, timedelta
from typing import Dict, Any, Optional, Tuple

# =========================
# CONFIG
# =========================
# Base URL of the service and the draft-deletion endpoint derived from it.
API_BASE = "http://127.0.0.1:8000"
API_URL_DELETE = f"{API_BASE}/currency/exchange-rates/drafts/delete"

# Root data folder (per-day summaries are written under day folders inside it).
BASE_DIR = "WebService/data"

# Inclusive date range; each value accepts DD-MM-YYYY or YYYY-MM-DD
START_DATE = "31-10-2025"
END_DATE   = "31-10-2025"

# If True → call the API one day at a time (date_from=day=date_to).
# If False → call the API once with the full inclusive range.
PER_DAY = True

# Optional cap on deletions per day, sent as the max_per_day query parameter.
# Set to None to omit the parameter and remove the limit.
MAX_PER_DAY: Optional[int] = None  # e.g., 200

# HTTP timeouts / retries
REQUEST_TIMEOUT = 300  # seconds per HTTP request
RETRY_MAX = 4  # total attempts per request, including the first one
RETRY_BACKOFF_BASE = 2  # wait before retry k is RETRY_BACKOFF_BASE ** (k - 1) seconds, at least 1

# Behavior on failures
STOP_ON_ERROR = False  # if True, halt immediately on a failed POST

# Output
WRITE_DAY_SUMMARY = True   # write per-day JSON summary (delete_summary.json) under each day folder
WRITE_RANGE_SUMMARY = True # write a summary json under BASE_DIR when PER_DAY=False

# =========================
# HELPERS
# =========================

_DD_MM_YYYY_DASH = re.compile(r"^(\d{2})-(\d{2})-(\d{4})$")
_YYYY_MM_DD = re.compile(r"^(\d{4})-(\d{2})-(\d{2})$")

def _parse_date_any(s: str) -> datetime:
    """Parse a DD-MM-YYYY or YYYY-MM-DD string into a midnight datetime.

    Surrounding whitespace is ignored and a falsy input is treated as "".

    Raises:
        ValueError: if the text matches neither layout, or names a day that
            does not exist on the calendar.
    """
    text = (s or "").strip()

    day_first = _DD_MM_YYYY_DASH.fullmatch(text)
    if day_first is not None:
        day, month, year = (int(part) for part in day_first.groups())
        return datetime(year, month, day)

    year_first = _YYYY_MM_DD.fullmatch(text)
    if year_first is not None:
        year, month, day = (int(part) for part in year_first.groups())
        return datetime(year, month, day)

    raise ValueError(f"Date must be DD-MM-YYYY or YYYY-MM-DD, got: {text!r}")

def _daterange_inclusive(start_dt: datetime, end_dt: datetime):
    """Yield start_dt, then one-day steps after it, while the value is <= end_dt."""
    one_day = timedelta(days=1)
    current = start_dt
    while True:
        if current > end_dt:
            return
        yield current
        current = current + one_day

def _ensure_dir(p: str) -> None:
    """Create directory p, including missing parents; an existing directory is fine."""
    os.makedirs(name=p, exist_ok=True)

def _post_with_retries(url: str, *, params: Dict[str, Any], timeout: Optional[int]) -> requests.Response:
    """POST to url with query params, retrying transient failures with exponential backoff.

    At most RETRY_MAX attempts are made. An attempt is retried after a
    transport error (connection, timeout, ...), an HTTP 429, or any HTTP 5xx.
    The wait before retry k is RETRY_BACKOFF_BASE ** (k - 1) seconds, never
    less than 1. Any other 4xx status is raised at once, because resending the
    same request would be rejected again.

    Args:
        url: endpoint to POST to.
        params: query-string parameters for the request (no body is sent).
        timeout: per-request timeout in seconds, passed through to requests.

    Returns:
        The first response whose status is not an error.

    Raises:
        requests.HTTPError: for a non-retryable 4xx, or a 429/5xx on the last
            attempt; the failing response is attached as ``.response``.
        requests.RequestException: the last transport error once attempts run out.
        RuntimeError: if no attempt was made at all (RETRY_MAX < 1).
    """
    headers = {"Content-Type": "application/json"}
    last_exc: Optional[Exception] = None
    # The context manager releases the session's pooled connections on every exit path.
    with requests.Session() as session:
        for attempt in range(1, RETRY_MAX + 1):
            try:
                r = session.post(url, params=params, timeout=timeout, headers=headers)
            except requests.RequestException as e:
                last_exc = e
            else:
                if r.status_code == 429 or r.status_code >= 500:
                    # Keep the response on the exception so callers can show its body.
                    last_exc = requests.HTTPError(
                        f"HTTP {r.status_code}: {r.text[:200]}", response=r
                    )
                else:
                    # Remaining 4xx statuses are not retryable: raise now.
                    r.raise_for_status()
                    return r
            if attempt >= RETRY_MAX:
                break
            sleep_s = max(1, int(RETRY_BACKOFF_BASE) ** (attempt - 1))
            print(f"[retry] attempt {attempt}/{RETRY_MAX} failed: {last_exc}. Backing off {sleep_s}s")
            time.sleep(sleep_s)
    if last_exc is not None:
        raise last_exc
    raise RuntimeError("Unknown POST failure")

def _post_delete(day_from: str, day_to: str, max_per_day: Optional[int]) -> Tuple[bool, Optional[Dict[str, Any]], Optional[str]]:
    """
    POST to API_URL_DELETE with the query parameters
      ?date_from=<day_from>&date_to=<day_to>[&max_per_day=<int>]
    (``max_per_day`` is only sent when it is not None).

    Returns ``(ok, json_body, text)``:
      - ``(True, parsed_json, None)`` when the call succeeded and the body parsed as JSON
      - ``(True, None, raw_text)``    when the call succeeded but the body was not JSON
      - ``(False, None, message)``    when the call failed; ``message`` is the error text,
        followed by the response body when the exception carries one
    """
    query: Dict[str, Any] = dict(date_from=day_from, date_to=day_to)
    if max_per_day is not None:
        query["max_per_day"] = int(max_per_day)

    try:
        response = _post_with_retries(API_URL_DELETE, params=query, timeout=REQUEST_TIMEOUT)
        try:
            parsed = response.json()
        except Exception:
            # Successful call, non-JSON body: hand back the raw text instead.
            return True, None, response.text
        return True, parsed, None
    except Exception as e:
        body_txt = None
        failed_response = getattr(e, "response", None)
        if isinstance(e, requests.RequestException) and failed_response is not None:
            try:
                body_txt = failed_response.text
            except Exception:
                body_txt = None
        return False, None, f"{e}\n{('Response body: ' + body_txt) if body_txt else ''}"

def _write_day_summary(day_dir: str, *, deleted_count: int, deleted_sample: Optional[list], ok: bool, why: str = "") -> None:
    """Write ``delete_summary.json`` for one day into ``day_dir``.

    Does nothing when WRITE_DAY_SUMMARY is false. The directory is created if
    needed. Best-effort: any failure is printed as a ``[warn]`` line and not
    raised.
    """
    if WRITE_DAY_SUMMARY:
        try:
            summary = dict(
                deleted_count=int(deleted_count),
                deleted_sample=deleted_sample or [],
                ok=bool(ok),
                why=why,
                endpoint="drafts/delete",
                ts=datetime.now().isoformat(),
            )
            _ensure_dir(day_dir)
            target = os.path.join(day_dir, "delete_summary.json")
            with open(target, "w", encoding="utf-8") as handle:
                json.dump(summary, handle, indent=2, ensure_ascii=False)
        except Exception as e:
            print(f"[warn] write day summary failed in {day_dir}: {e}")

def _write_range_summary(base_dir: str, *, start_date: str, end_date: str, total_deleted: int, days_processed: int, errors: int) -> None:
    """Write ``delete_summary_range.json`` with the totals for the whole run.

    Does nothing when WRITE_RANGE_SUMMARY is false. Best-effort: any failure
    is printed as a ``[warn]`` line and never raised.

    Args:
        base_dir: Directory that receives the summary file.
        start_date: First day of the range, stored under ``range.from``.
        end_date: Last day of the range, stored under ``range.to``.
        total_deleted: Total number of deleted drafts.
        days_processed: Number of days handled successfully.
        errors: Number of failed calls.
    """
    if not WRITE_RANGE_SUMMARY:
        return
    try:
        out = {
            "range": {"from": start_date, "to": end_date},
            "total_deleted": int(total_deleted),
            "days_processed": int(days_processed),
            "errors": int(errors),
            "endpoint": "drafts/delete",
            "ts": datetime.now().isoformat(),
        }
        # Create the target directory first, as _write_day_summary does. In
        # single-call range mode no per-day folder is written, so base_dir may
        # not exist yet and open() would fail. An empty base_dir means the
        # current directory, which needs no creation.
        if base_dir:
            _ensure_dir(base_dir)
        p = os.path.join(base_dir, "delete_summary_range.json")
        with open(p, "w", encoding="utf-8") as f:
            json.dump(out, f, indent=2, ensure_ascii=False)
    except Exception as e:
        print(f"[warn] write range summary failed: {e}")

def _iso(d: datetime) -> str:
    return d.strftime("%Y-%m-%d")

# =========================
# MAIN
# =========================

def main():
    """Delete drafts for START_DATE..END_DATE via the drafts/delete endpoint.

    With PER_DAY true, one request is sent per calendar day and a per-day
    summary is written under BASE_DIR/<YYYY-MM-DD>; otherwise a single request
    covers the whole range. A range summary is written in both modes.

    Raises:
        SystemExit: If END_DATE is before START_DATE.
        ValueError: If either configured date cannot be parsed.
    """
    start_dt = _parse_date_any(START_DATE)
    end_dt   = _parse_date_any(END_DATE)
    if end_dt < start_dt:
        raise SystemExit(f"END_DATE {END_DATE} is before START_DATE {START_DATE}")

    print(f"[range] {start_dt.date()} → {end_dt.date()} (inclusive)")
    print(f"[config] endpoint=/currency/exchange-rates/drafts/delete, per_day={PER_DAY}, max_per_day={MAX_PER_DAY}\n")

    if PER_DAY:
        # ---------- one API call per day ----------
        total_days = 0
        days_processed = 0
        total_deleted = 0
        skipped_days = 0
        errors = 0

        for d in _daterange_inclusive(start_dt, end_dt):
            total_days += 1
            day_iso = _iso(d)
            day_dir = os.path.join(BASE_DIR, day_iso)
            print(f"\n=== DAY {day_iso} ===")

            # date_from == date_to restricts the call to this single day.
            ok, resp_json, resp_text = _post_delete(day_iso, day_iso, MAX_PER_DAY)
            # Success requires both a successful HTTP call and a truthy "ok" in the JSON body.
            if ok and isinstance(resp_json, dict) and resp_json.get("ok"):
                days_processed += 1
                deleted_count = int(resp_json.get("total_deleted", 0))
                total_deleted += deleted_count
                deleted_sample = []
                # Try to collect a small sample from 'per_day' entry in server payload
                try:
                    per_day = resp_json.get("per_day") or []
                    if isinstance(per_day, list):
                        for entry in per_day:
                            if isinstance(entry, dict) and entry.get("date") == day_iso:
                                deleted_sample = entry.get("sample", []) or []
                                break
                except Exception:
                    deleted_sample = []

                print(f"[day] deleted: {deleted_count}")
                # Only the first 10 sample items are persisted.
                _write_day_summary(day_dir,
                                deleted_count=deleted_count,
                                deleted_sample=deleted_sample[:10],
                                ok=True)
            else:
                errors += 1
                # Prefer the server-provided reason; otherwise fall back to the
                # (truncated) raw text or error message.
                why = ""
                if resp_json and isinstance(resp_json, dict):
                    why = resp_json.get("error") or resp_json.get("message") or ""
                elif resp_text:
                    why = str(resp_text)[:400]
                print(f"[error] delete failed for {day_iso}: {why or '(no details)'}")
                _write_day_summary(day_dir, deleted_count=0, deleted_sample=[], ok=False, why=why)
                if STOP_ON_ERROR:
                    print("[halt] STOP_ON_ERROR=True → halting")
                    # Persist the totals gathered so far before stopping.
                    _write_range_summary(BASE_DIR,
                                         start_date=_iso(start_dt),
                                         end_date=_iso(end_dt),
                                         total_deleted=total_deleted,
                                         days_processed=days_processed,
                                         errors=errors)
                    return
                # Counted only when the run continues past a failed day.
                skipped_days += 1

        # Final summary
        print("\n[summary]")
        print(f"  days in range   : {total_days}")
        print(f"  processed days  : {days_processed}")
        print(f"  total deleted   : {total_deleted}")
        print(f"  skipped days    : {skipped_days}")
        print(f"  errors          : {errors}")

        _write_range_summary(BASE_DIR,
                             start_date=_iso(start_dt),
                             end_date=_iso(end_dt),
                             total_deleted=total_deleted,
                             days_processed=days_processed,
                             errors=errors)

    else:
        # ---------- one API call for the entire range ----------
        df = _iso(start_dt)
        dt = _iso(end_dt)
        print(f"[range] single call: {df}..{dt}")
        ok, resp_json, resp_text = _post_delete(df, dt, MAX_PER_DAY)

        total_deleted = 0
        days_processed = 0
        errors = 0

        if ok and isinstance(resp_json, dict) and resp_json.get("ok"):
            # Totals are taken from the server response in this mode.
            total_deleted = int(resp_json.get("total_deleted", 0))
            days_processed = int(resp_json.get("days_processed", 0))
            print(f"[range] deleted={total_deleted}, days_processed={days_processed}")
        else:
            errors = 1
            why = ""
            if resp_json and isinstance(resp_json, dict):
                why = resp_json.get("error") or resp_json.get("message") or ""
            elif resp_text:
                why = str(resp_text)[:400]
            print(f"[error] range delete failed: {why or '(no details)'}")
            if STOP_ON_ERROR:
                # Nothing else would run after the single call; this only logs.
                print("[halt] STOP_ON_ERROR=True → halting")

        # The range summary is written on both success and failure.
        _write_range_summary(BASE_DIR,
                             start_date=df,
                             end_date=dt,
                             total_deleted=total_deleted,
                             days_processed=days_processed,
                             errors=errors)

# Run only when executed as a script / notebook cell, not when imported.
if __name__ == "__main__":
    main()


[range] 2025-10-31 → 2025-10-31 (inclusive)
[config] endpoint=/currency/exchange-rates/drafts/delete, per_day=True, max_per_day=None


=== DAY 2025-10-31 ===
[retry] attempt 1/4 failed: HTTPConnectionPool(host='127.0.0.1', port=8000): Max retries exceeded with url: /currency/exchange-rates/drafts/delete?date_from=2025-10-31&date_to=2025-10-31 (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x000001D14D6C01D0>: Failed to establish a new connection: [WinError 10061] No connection could be made because the target machine actively refused it')). Backing off 1s
[retry] attempt 2/4 failed: HTTPConnectionPool(host='127.0.0.1', port=8000): Max retries exceeded with url: /currency/exchange-rates/drafts/delete?date_from=2025-10-31&date_to=2025-10-31 (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x000001D14D6C0BC0>: Failed to establish a new connection: [WinError 10061] No connection could be made because the target machine actively refuse

KeyboardInterrupt: 

# collect_missing

In [2]:
# collect_missing.py
import os
import re
import json
import time
import requests
from datetime import datetime, timedelta
from typing import Dict, Any, Optional, Tuple

# =========================
# CONFIG
# =========================
# Base URL of the service and the collect-missing endpoint POSTed to by _post_collect.
API_BASE = "http://127.0.0.1:8000"
API_URL = f"{API_BASE}/currency/exchange-rates/fallback/collect-missing"

# Root folder for output: per-day summaries go to BASE_DIR/<YYYY-MM-DD>/,
# the range summary goes directly into BASE_DIR.
BASE_DIR = "WebService/data"

# Accepts DD-MM-YYYY or YYYY-MM-DD
START_DATE = "23-11-2025"
END_DATE   = "23-11-2025"

# True: one API call per day in the range; False: a single call for the whole range.
PER_DAY = True
# Timeout in seconds passed to each HTTP request.
REQUEST_TIMEOUT = 300
# Maximum number of attempts per request.
RETRY_MAX = 4
# Backoff base: the sleep before the next attempt is max(1, base ** (attempt - 1)) seconds.
RETRY_BACKOFF_BASE = 2
# If True, per-day mode stops at the first failed call (after writing the range summary).
STOP_ON_ERROR = False
# Toggles for writing the per-day and the range summary JSON files.
WRITE_DAY_SUMMARY = True
WRITE_RANGE_SUMMARY = True

# =========================
# HELPERS
# =========================
_DD_MM_YYYY_DASH = re.compile(r"^(\d{2})-(\d{2})-(\d{4})$")
_YYYY_MM_DD = re.compile(r"^(\d{4})-(\d{2})-(\d{2})$")

def _parse_date_any(s: str) -> datetime:
    s = (s or "").strip()
    m = _DD_MM_YYYY_DASH.fullmatch(s)
    if m:
        dd, mm, yyyy = m.group(1), m.group(2), m.group(3)
        return datetime(int(yyyy), int(mm), int(dd))
    m = _YYYY_MM_DD.fullmatch(s)
    if m:
        yyyy, mm, dd = m.group(1), m.group(2), m.group(3)
        return datetime(int(yyyy), int(mm), int(dd))
    raise ValueError(f"Date must be DD-MM-YYYY or YYYY-MM-DD, got: {s!r}")

def _daterange_inclusive(start_dt: datetime, end_dt: datetime):
    cur = start_dt
    while cur <= end_dt:
        yield cur
        cur = cur + timedelta(days=1)

def _ensure_dir(p: str) -> None:
    os.makedirs(p, exist_ok=True)

def _post_with_retries(url: str, *, params: Dict[str, Any], timeout: Optional[int]) -> requests.Response:
    """POST to ``url`` with query-string ``params``, retrying failed attempts.

    At most RETRY_MAX attempts are made. A 429/502/503/504 status is turned
    into a ``requests.RequestException``; any other error status is raised by
    ``raise_for_status()``. Every ``requests.RequestException`` (which also
    covers connection errors and timeouts) is retried after sleeping
    ``RETRY_BACKOFF_BASE ** (attempt - 1)`` seconds, with a 1-second minimum.

    Args:
        url: Endpoint to POST to.
        params: Query-string parameters; no request body is sent.
        timeout: Per-request timeout in seconds, passed to ``requests``.

    Returns:
        The first response whose status passes ``raise_for_status()``.

    Raises:
        requests.RequestException: The error from the final attempt.
        RuntimeError: If no attempt was made at all (RETRY_MAX < 1).
    """
    last_exc: Optional[Exception] = None
    headers = {"Content-Type": "application/json"}
    # The context manager closes the session (and its pooled connections) on
    # every exit path; the session used to be left open.
    with requests.Session() as session:
        for attempt in range(1, RETRY_MAX + 1):
            try:
                r = session.post(url, params=params, timeout=timeout, headers=headers)
                if r.status_code in (429, 502, 503, 504):
                    # Attach the response so callers inspecting ``e.response``
                    # can still read the body of the failed request.
                    raise requests.RequestException(
                        f"HTTP {r.status_code}: {r.text[:200]}", response=r
                    )
                r.raise_for_status()
                return r
            except requests.RequestException as e:
                # RequestException is the base class of ConnectTimeout,
                # ReadTimeout, ConnectionError and HTTPError.
                last_exc = e
                if attempt >= RETRY_MAX:
                    break
                sleep_s = max(1, int(RETRY_BACKOFF_BASE) ** (attempt - 1))
                print(f"[retry] attempt {attempt}/{RETRY_MAX} failed: {e}. Backing off {sleep_s}s")
                time.sleep(sleep_s)
    if last_exc is not None:
        raise last_exc
    raise RuntimeError("Unknown POST failure")

def _post_collect(day_from: str, day_to: str) -> Tuple[bool, Optional[Dict[str, Any]], Optional[str]]:
    """POST to API_URL with ``date_from``/``date_to`` as query parameters.

    Returns ``(ok, json_body, text)``:
      - ``(True, parsed_json, None)`` when the call succeeded and the body parsed as JSON
      - ``(True, None, raw_text)``    when the call succeeded but the body was not JSON
      - ``(False, None, message)``    when the call failed; ``message`` is the error text,
        followed by the response body when the exception carries one
    """
    query: Dict[str, Any] = dict(date_from=day_from, date_to=day_to)
    try:
        response = _post_with_retries(API_URL, params=query, timeout=REQUEST_TIMEOUT)
        try:
            parsed = response.json()
        except Exception:
            # Successful call, non-JSON body: hand back the raw text instead.
            return True, None, response.text
        return True, parsed, None
    except Exception as e:
        body_txt = None
        failed_response = getattr(e, "response", None)
        if isinstance(e, requests.RequestException) and failed_response is not None:
            try:
                body_txt = failed_response.text
            except Exception:
                body_txt = None
        return False, None, f"{e}\n{('Response body: ' + body_txt) if body_txt else ''}"

def _write_day_summary(day_dir: str, payload: Dict[str, Any]) -> None:
    """Dump ``payload`` to ``fallback_collect_summary.json`` inside ``day_dir``.

    Skipped entirely when WRITE_DAY_SUMMARY is false. The directory is created
    if needed; failures are printed as a ``[warn]`` line and not raised.
    """
    if WRITE_DAY_SUMMARY:
        try:
            _ensure_dir(day_dir)
            target = os.path.join(day_dir, "fallback_collect_summary.json")
            with open(target, "w", encoding="utf-8") as handle:
                json.dump(payload, handle, indent=2, ensure_ascii=False)
        except Exception as e:
            print(f"[warn] write day summary failed in {day_dir}: {e}")

def _write_range_summary(base_dir: str, *, start_date: str, end_date: str, total_missing: int, days_processed: int, errors: int) -> None:
    """Write ``fallback_collect_summary_range.json`` with the run totals.

    Does nothing when WRITE_RANGE_SUMMARY is false. Best-effort: any failure
    is printed as a ``[warn]`` line and never raised.

    Args:
        base_dir: Directory that receives the summary file.
        start_date: First day of the range, stored under ``range.from``.
        end_date: Last day of the range, stored under ``range.to``.
        total_missing: Total number of missing rows reported.
        days_processed: Number of days handled successfully.
        errors: Number of failed calls or days.
    """
    if not WRITE_RANGE_SUMMARY:
        return
    try:
        out = {
            "range": {"from": start_date, "to": end_date},
            "total_missing": int(total_missing),
            "days_processed": int(days_processed),
            "errors": int(errors),
            "endpoint": "fallback/collect-missing",
            "ts": datetime.now().isoformat(),
        }
        # Create the target directory first, as _write_day_summary does.
        # base_dir may not exist yet when no per-day summary has been written
        # (e.g. a failed single range call), and open() would then fail. An
        # empty base_dir means the current directory, which needs no creation.
        if base_dir:
            _ensure_dir(base_dir)
        p = os.path.join(base_dir, "fallback_collect_summary_range.json")
        with open(p, "w", encoding="utf-8") as f:
            json.dump(out, f, indent=2, ensure_ascii=False)
    except Exception as e:
        print(f"[warn] write range summary failed: {e}")

def _iso(d: datetime) -> str:
    return d.strftime("%Y-%m-%d")

# =========================
# MAIN
# =========================
def _failure_reason(resp_json: Any, resp_text: Optional[str]) -> str:
    """Return a short reason for a failed call, or "" when none is available."""
    if resp_json and isinstance(resp_json, dict):
        return resp_json.get("error") or resp_json.get("message") or ""
    if resp_text:
        return str(resp_text)[:400]
    return ""

def _day_entries(resp_json: Dict[str, Any]) -> list:
    """Return the dict items of the response's 'per_day' list ([] if absent or malformed)."""
    per_day = resp_json.get("per_day")
    if not isinstance(per_day, list):
        return []
    return [entry for entry in per_day if isinstance(entry, dict)]

def _day_summary_payload(entry: Optional[Dict[str, Any]], *, ok: Any, why: str) -> Dict[str, Any]:
    """Build the per-day summary dict; ``entry=None`` yields the all-defaults failure record."""
    entry = entry or {}
    return {
        "missing_count": entry.get("missing", 0),
        "excel_rows": entry.get("excel_rows", 0),
        "json_rows": entry.get("json_rows", 0),
        "tracker_path": entry.get("tracker_path", ""),
        "export_clicked": entry.get("export_clicked", False),
        "xlsx_path": entry.get("xlsx_path", ""),
        "xlsx_size": entry.get("xlsx_size", 0),
        "headers_seen": entry.get("headers_seen", []),
        "json_file_exists": entry.get("json_file_exists", False),
        "ok": ok,
        "why": why,
        "endpoint": "fallback/collect-missing",
        "ts": datetime.now().isoformat(),
    }

def main():
    """Call the collect-missing endpoint for START_DATE..END_DATE and record the results.

    With PER_DAY true, one request is sent per calendar day; otherwise a
    single request covers the whole range. In both modes a summary JSON is
    written per day under BASE_DIR/<YYYY-MM-DD> and a range summary under
    BASE_DIR.

    Raises:
        SystemExit: If END_DATE is before START_DATE.
        ValueError: If either configured date cannot be parsed.
    """
    start_dt = _parse_date_any(START_DATE)
    end_dt   = _parse_date_any(END_DATE)
    if end_dt < start_dt:
        raise SystemExit(f"END_DATE {END_DATE} is before START_DATE {START_DATE}")

    print(f"[range] {start_dt.date()} → {end_dt.date()} (inclusive)")
    print(f"[config] endpoint=/currency/exchange-rates/fallback/collect-missing, per_day={PER_DAY}\n")

    total_missing = 0
    days_processed = 0
    errors = 0

    if PER_DAY:
        for d in _daterange_inclusive(start_dt, end_dt):
            day_iso = _iso(d)
            day_dir = os.path.join(BASE_DIR, day_iso)
            print(f"\n=== DAY {day_iso} ===")

            ok, resp_json, resp_text = _post_collect(day_iso, day_iso)
            if ok and isinstance(resp_json, dict) and resp_json.get("ok"):
                # grab the day row (if present)
                found = next(
                    (entry for entry in _day_entries(resp_json) if entry.get("date") == day_iso),
                    None,
                )
                if found:
                    days_processed += 1
                    # "or 0" keeps a null "missing" from crashing the whole run.
                    total_missing += int(found.get("missing") or 0)
                    print(f"[day] excel_rows={found.get('excel_rows')} json_rows={found.get('json_rows')} missing={found.get('missing')}")
                    _write_day_summary(day_dir, _day_summary_payload(found, ok=True, why=""))
                else:
                    errors += 1
                    print(f"[warn] API ok but 'per_day' did not contain {day_iso}")
                    _write_day_summary(day_dir, _day_summary_payload(None, ok=False, why="no_day_entry_in_response"))
                continue

            errors += 1
            why = _failure_reason(resp_json, resp_text)
            print(f"[error] collect failed for {day_iso}: {why or '(no details)'}")
            _write_day_summary(day_dir, _day_summary_payload(None, ok=False, why=why))
            if STOP_ON_ERROR:
                print("[halt] STOP_ON_ERROR=True → halting")
                # Persist the totals gathered so far before stopping.
                _write_range_summary(BASE_DIR,
                                     start_date=_iso(start_dt),
                                     end_date=_iso(end_dt),
                                     total_missing=total_missing,
                                     days_processed=days_processed,
                                     errors=errors)
                return

        # summary
        print("\n[summary]")
        print(f"  processed days  : {days_processed}")
        print(f"  total missing   : {total_missing}")
        print(f"  errors          : {errors}")

        _write_range_summary(BASE_DIR,
                             start_date=_iso(start_dt),
                             end_date=_iso(end_dt),
                             total_missing=total_missing,
                             days_processed=days_processed,
                             errors=errors)

    else:
        # Range call (server already iterates day-by-day)
        df = _iso(start_dt)
        dt = _iso(end_dt)
        print(f"[range] single call: {df}..{dt}")
        ok, resp_json, resp_text = _post_collect(df, dt)

        if ok and isinstance(resp_json, dict) and resp_json.get("ok"):
            # Non-dict items are skipped, matching the per-day branch, instead
            # of raising AttributeError on .get().
            entries = _day_entries(resp_json)
            days_processed = sum(1 for entry in entries if entry.get("ok"))
            total_missing = sum(int(entry.get("missing") or 0) for entry in entries)
            for found in entries:
                day_iso = found.get("date") or ""
                if not day_iso:
                    continue
                _write_day_summary(
                    os.path.join(BASE_DIR, day_iso),
                    _day_summary_payload(found, ok=found.get("ok", False), why=found.get("why", "")),
                )
        else:
            errors = 1
            why = _failure_reason(resp_json, resp_text)
            print(f"[error] range collect failed: {why or '(no details)'}")

        # The range summary is written on both success and failure.
        _write_range_summary(BASE_DIR,
                             start_date=df,
                             end_date=dt,
                             total_missing=total_missing,
                             days_processed=days_processed,
                             errors=errors)

# Run only when executed as a script / notebook cell, not when imported.
if __name__ == "__main__":
    main()


[range] 2025-11-23 → 2025-11-23 (inclusive)
[config] endpoint=/currency/exchange-rates/fallback/collect-missing, per_day=True


=== DAY 2025-11-23 ===
[day] excel_rows=253 json_rows=408 missing=155

[summary]
  processed days  : 1
  total missing   : 155
  errors          : 0


docker exec -it 969e03517899 sh -lc 'mkdir -p /app/WebService/data/2025-10-05'

docker cp .\WebService\data\2025-10-05\exchange_rates_payload.json 969e03517899:/app/WebService/data/2025-10-05/exchange_rates_payload.json

# New post_fallback_refill

In [3]:
import requests

# One-off call to the server-side refill-missing endpoint for a single day
# (date_from == date_to).
BASE = "http://127.0.0.1:8000"
params = {"date_from": "2025-11-23", "date_to": "2025-11-23"}
# Very long timeout (12000 s): the response recorded below reports a
# duration_sec of about 2415 for one day.
r = requests.post(f"{BASE}/currency/exchange-rates/fallback/refill-missing", params=params, timeout=12000)
# Raise on an HTTP error status before trying to parse the body.
r.raise_for_status()
print(r.json())


{'ok': True, 'total_days': 1, 'posted_days': 1, 'skipped_days': 0, 'errors': 0, 'total_rows_sent': 155, 'per_day': [{'date': '2025-11-23', 'ok': True, 'input_rows': 155, 'kept_rows': 155, 'dedup_removed': 0, 'dropped_no_rate': 0, 'batch_id': 'fallback-refill-2025-11-23-0dbee720', 'duration_sec': 2414.74, 'reports': {'dir': 'reports\\2025-11-23\\fallback-refill-2025-11-23-0dbee720', 'result_json': 'reports\\2025-11-23\\fallback-refill-2025-11-23-0dbee720\\result.json', 'failed_json': 'reports\\2025-11-23\\fallback-refill-2025-11-23-0dbee720\\failed.json', 'failed_csv': 'reports\\2025-11-23\\fallback-refill-2025-11-23-0dbee720\\failed.csv', 'skipped_json': 'reports\\2025-11-23\\fallback-refill-2025-11-23-0dbee720\\skipped.json', 'skipped_csv': 'reports\\2025-11-23\\fallback-refill-2025-11-23-0dbee720\\skipped.csv'}, 'records_day': '2025-11-23', 'live_has_pending': False, 'created': 155, 'failed': 0, 'skipped': 0}]}


In [7]:
import gc
# Force a full garbage-collection pass; the value echoed by the notebook is
# the number of unreachable objects found.
gc.collect()

2529

# post_fallback_refill

In [5]:
# post_fallback_refill.py
import os
import re
import json
import time
import requests
from datetime import datetime, timedelta
from decimal import Decimal, ROUND_HALF_UP
from typing import List, Dict, Any, Optional, Tuple

# =========================
# CONFIG
# =========================
# Base URL of the service and the batch endpoint each chunk of rows is POSTed to.
API_BASE = "http://127.0.0.1:8000"
API_URL_BATCH = f"{API_BASE}/currency/exchange-rates/batch"  # non-streaming

# Where the fallback collector saved the “missing” rows
FALLBACK_DIR = "WebService/TrackDrivers/Fallback"

# Accepts DD-MM-YYYY or YYYY-MM-DD (files are named YYYY-MM-DD.json)
START_DATE = "05-10-2025"
END_DATE   = "05-10-2025"

# Maximum number of rows per POST; values below 1 are treated as 1.
BATCH_SIZE = 50

# HTTP timeouts / retries
REQUEST_TIMEOUT = 300  # seconds per HTTP request
RETRY_MAX = 4  # maximum number of attempts per request
RETRY_BACKOFF_BASE = 2  # seconds (exponential backoff base)

# Behavior on failures
STOP_ON_ERROR = False  # stop immediately if a batch fails

# Input hygiene
DROP_SAME_CURRENCY = True  # drop items where FromCurrency == ToCurrency
ROUND_RATE_5DP = True      # coerce ExchangeRate to string with 5 dp (API model expectation)

# Output (writes next to each <YYYY-MM-DD>.json in FALLBACK_DIR)
WRITE_DAY_SUMMARY = True

# =========================
# HELPERS
# =========================
_DD_MM_YYYY_DASH = re.compile(r"^(\d{2})-(\d{2})-(\d{4})$")
_YYYY_MM_DD = re.compile(r"^(\d{4})-(\d{2})-(\d{2})$")

def _parse_date_any(s: str) -> datetime:
    s = (s or "").strip()
    m = _DD_MM_YYYY_DASH.fullmatch(s)
    if m:
        dd, mm, yyyy = m.group(1), m.group(2), m.group(3)
        return datetime(int(yyyy), int(mm), int(dd))
    m = _YYYY_MM_DD.fullmatch(s)
    if m:
        yyyy, mm, dd = m.group(1), m.group(2), m.group(3)
        return datetime(int(yyyy), int(mm), int(dd))
    raise ValueError(f"Date must be DD-MM-YYYY or YYYY-MM-DD, got: {s!r}")

def _daterange_inclusive(start_dt: datetime, end_dt: datetime):
    cur = start_dt
    while cur <= end_dt:
        yield cur
        cur = cur + timedelta(days=1)

def _chunked(lst: List[Any], size: int) -> List[List[Any]]:
    size = max(1, int(size))
    return [lst[i:i+size] for i in range(0, len(lst), size)]

def _post_with_retries(url: str, *, json_body: Any, timeout: Optional[int]) -> requests.Response:
    """POST ``json_body`` as JSON to ``url``, retrying failed attempts.

    At most RETRY_MAX attempts are made. A 429/502/503/504 status is turned
    into a ``requests.RequestException``; any other error status is raised by
    ``raise_for_status()``. Every ``requests.RequestException`` (which also
    covers connection errors and timeouts) is retried after sleeping
    ``RETRY_BACKOFF_BASE ** (attempt - 1)`` seconds, with a 1-second minimum.

    Args:
        url: Endpoint to POST to.
        json_body: JSON-serializable request body.
        timeout: Per-request timeout in seconds, passed to ``requests``.

    Returns:
        The first response whose status passes ``raise_for_status()``.

    Raises:
        requests.RequestException: The error from the final attempt.
        RuntimeError: If no attempt was made at all (RETRY_MAX < 1).
    """
    last_exc: Optional[Exception] = None
    headers = {"Content-Type": "application/json"}
    # The context manager closes the session (and its pooled connections) on
    # every exit path; the session used to be left open.
    with requests.Session() as session:
        for attempt in range(1, RETRY_MAX + 1):
            try:
                r = session.post(url, json=json_body, timeout=timeout, headers=headers)
                if r.status_code in (429, 502, 503, 504):
                    # Attach the response so callers inspecting ``e.response``
                    # can still read the body of the failed request.
                    raise requests.RequestException(
                        f"HTTP {r.status_code}: {r.text[:200]}", response=r
                    )
                r.raise_for_status()
                return r
            except requests.RequestException as e:
                # RequestException is the base class of ConnectTimeout,
                # ReadTimeout, ConnectionError and HTTPError.
                last_exc = e
                if attempt >= RETRY_MAX:
                    break
                sleep_s = max(1, int(RETRY_BACKOFF_BASE) ** (attempt - 1))
                print(f"[retry] attempt {attempt}/{RETRY_MAX} failed: {e}. Backing off {sleep_s}s")
                time.sleep(sleep_s)
    if last_exc is not None:
        raise last_exc
    raise RuntimeError("Unknown POST failure")

def _post_payload_batch(payload: List[Dict[str, Any]]) -> Tuple[bool, Optional[Dict[str, Any]], Optional[str]]:
    """POST one batch of rows to API_URL_BATCH as a JSON body.

    Returns ``(ok, json_body, text)``:
      - ``(True, parsed_json, None)`` when the call succeeded and the body parsed as JSON
      - ``(True, None, raw_text)``    when the call succeeded but the body was not JSON
      - ``(False, None, message)``    when the call failed; ``message`` is the error text,
        followed by the response body when the exception carries one
    """
    try:
        response = _post_with_retries(API_URL_BATCH, json_body=payload, timeout=REQUEST_TIMEOUT)
        try:
            parsed = response.json()
        except Exception:
            # Successful call, non-JSON body: hand back the raw text instead.
            return True, None, response.text
        return True, parsed, None
    except Exception as e:
        body = None
        failed_response = getattr(e, "response", None)
        if isinstance(e, requests.RequestException) and failed_response is not None:
            try:
                body = failed_response.text
            except Exception:
                body = None
        return False, None, f"{e}\n{('Response body: ' + body) if body else ''}"

# ---------- normalization used by API model (ExchangeRateItem) ----------
def _q_norm(q: str | None) -> str:
    s = (q or "Direct").strip().lower()
    return "Indirect" if s.startswith("ind") else "Direct"

def _date_to_ddmmyyyy(s: str | None) -> str:
    if not s:
        return ""
    s = s.strip()
    fmts = [
        "%d.%m.%Y", "%Y-%m-%d", "%d/%m/%Y", "%m/%d/%Y",
        "%Y/%m/%d", "%Y%m%d", "%Y-%d-%m"
    ]
    for f in fmts:
        try:
            dt = datetime.strptime(s, f)
            return dt.strftime("%d.%m.%Y")
        except Exception:
            pass
    # If it already looks like DD.MM.YYYY but failed parse for some reason, keep as-is
    return s

def _rate_5dp(v: Any) -> str:
    q = Decimal(str(v))
    if q <= 0:
        # Let API reject if invalid; still format
        q = Decimal("0.00001")
    q = q.quantize(Decimal("0.00001"), rounding=ROUND_HALF_UP)
    return f"{q:.5f}"

def _norm_row(r: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Normalize one raw fallback row into the six fields sent to the API.

    Type and currency codes are trimmed and upper-cased; ``ValidFrom``,
    ``Quotation`` and ``ExchangeRate`` go through their helper functions.
    Returns None when any step raises, so the caller can skip the row.
    """
    def _code(field: str) -> str:
        # Missing / falsy values become "" before trimming and upper-casing.
        return (r.get(field) or "").strip().upper()

    try:
        normalized: Dict[str, Any] = {}
        normalized["ExchangeRateType"] = _code("ExchangeRateType")
        normalized["FromCurrency"] = _code("FromCurrency")
        normalized["ToCurrency"] = _code("ToCurrency")
        normalized["ValidFrom"] = _date_to_ddmmyyyy(r.get("ValidFrom"))
        normalized["Quotation"] = _q_norm(r.get("Quotation"))
        normalized["ExchangeRate"] = _rate_5dp(r.get("ExchangeRate"))
    except Exception:
        return None
    return normalized

def _dedupe_payload(payload: List[Dict[str, Any]]) -> Tuple[List[Dict[str, Any]], int]:
    """
    Keep the first row for each logical key (Type, From, To, Date, Quotation).

    Returns the surviving rows in their original order together with the
    number of rows that were discarded as duplicates.
    """
    known_keys: set[Tuple[str, str, str, str, str]] = set()
    unique_rows: List[Dict[str, Any]] = []
    duplicates = 0
    for row in payload:
        rate_type = (row.get("ExchangeRateType") or "").upper()
        source = (row.get("FromCurrency") or "").upper()
        target = (row.get("ToCurrency") or "").upper()
        valid_from = row.get("ValidFrom") or ""
        identity = (rate_type, source, target, valid_from, _q_norm(row.get("Quotation")))
        if identity not in known_keys:
            known_keys.add(identity)
            unique_rows.append(row)
        else:
            duplicates += 1
    return unique_rows, duplicates

def _filter_same_currency(payload: List[Dict[str, Any]]) -> Tuple[List[Dict[str, Any]], int]:
    """Remove rows whose FromCurrency equals ToCurrency (case-insensitive).

    Rows are only removed when DROP_SAME_CURRENCY is true and both codes are
    non-empty. Returns the kept rows in order and the number dropped.
    """
    kept: List[Dict[str, Any]] = []
    dropped = 0
    for row in payload:
        source = (row.get("FromCurrency") or "").upper()
        target = (row.get("ToCurrency") or "").upper()
        is_same_pair = bool(source) and bool(target) and source == target
        if is_same_pair and DROP_SAME_CURRENCY:
            dropped += 1
        else:
            kept.append(row)
    return kept, dropped

# ---------- IO ----------
def _load_fallback_for_day(day_iso: str) -> Optional[List[Dict[str, Any]]]:
    """
    Load and normalize the rows stored in FALLBACK_DIR/<day_iso>.json.

    Returns the normalized rows (non-dict items and rows that fail
    normalization are left out), or None when the file is missing, is not a
    JSON list, or cannot be read. Each None case prints a [skip]/[warn] line.
    """
    path = os.path.join(FALLBACK_DIR, f"{day_iso}.json")
    if not os.path.isfile(path):
        print(f"[skip] fallback file not found: {path}")
        return None
    try:
        with open(path, "r", encoding="utf-8") as handle:
            raw_rows = json.load(handle) or []
        if not isinstance(raw_rows, list):
            print(f"[warn] fallback file is not a list: {path}")
            return None
        # Normalize rows to API model shape
        candidates = (_norm_row(item) for item in raw_rows if isinstance(item, dict))
        return [row for row in candidates if row]
    except Exception as e:
        print(f"[warn] failed reading fallback {path}: {e}")
        return None

def _write_day_summary(day_iso: str, *, total_rows: int, rows_after_filters: int,
                       dupes_removed: int, same_currency_dropped: int,
                       posted_batches: int, rows_sent: int, errors: int) -> None:
    """Write FALLBACK_DIR/<day_iso>.refill_summary.json with the day's counters.

    Skipped when WRITE_DAY_SUMMARY is false. Best-effort: a failure is printed
    as a ``[warn]`` line and not raised.
    """
    if WRITE_DAY_SUMMARY:
        try:
            summary = dict(
                total_rows_in_fallback=total_rows,
                rows_after_filters=rows_after_filters,
                duplicates_removed=dupes_removed,
                same_currency_dropped=same_currency_dropped,
                posted_batches=posted_batches,
                rows_sent=rows_sent,
                errors=errors,
                endpoint="/currency/exchange-rates/batch",
                batch_size=BATCH_SIZE,
                ts=datetime.now().isoformat(),
            )
            target = os.path.join(FALLBACK_DIR, f"{day_iso}.refill_summary.json")
            with open(target, "w", encoding="utf-8") as handle:
                json.dump(summary, handle, indent=2, ensure_ascii=False)
        except Exception as e:
            print(f"[warn] write summary failed for {day_iso}: {e}")

# =========================
# MAIN
# =========================
def main():
    """
    Re-post fallback rows for every day in [START_DATE, END_DATE] (inclusive).

    For each day the fallback rows are loaded, de-duplicated, stripped of
    same-currency rows, split into chunks of at most BATCH_SIZE rows and
    posted chunk by chunk via _post_payload_batch. A per-day summary is
    handed to _write_day_summary and run-wide totals are printed once at
    the end.

    Counting rules:
      * a day is "skipped" when its fallback load yields no rows;
      * a day is "posted" when at least one of its batches succeeded,
        including the day on which a STOP_ON_ERROR halt happens;
      * a day whose rows are all removed by the filters is neither.

    When STOP_ON_ERROR is true, the first failed batch ends the run: the
    current day's summary is still written and the final totals are still
    printed before returning.

    Raises:
        SystemExit: if END_DATE is earlier than START_DATE.
    """
    start_dt = _parse_date_any(START_DATE)
    end_dt = _parse_date_any(END_DATE)
    if end_dt < start_dt:
        raise SystemExit(f"END_DATE {END_DATE} is before START_DATE {START_DATE}")

    # Guard against a zero/negative configured batch size.
    bs = max(1, int(BATCH_SIZE))

    total_days = 0
    posted_days = 0
    posted_batches = 0
    rows_sent = 0
    skipped_days = 0
    errors = 0
    total_dupes_removed = 0
    total_same_currency_dropped = 0
    halted = False  # set when STOP_ON_ERROR aborts the run

    print(f"[range] {start_dt.date()} → {end_dt.date()} (inclusive)")
    print(f"[config] batch_size = {bs}, source = {FALLBACK_DIR}, endpoint = /batch\n")

    for d in _daterange_inclusive(start_dt, end_dt):
        total_days += 1
        day_iso = d.strftime("%Y-%m-%d")
        print(f"\n=== DAY {day_iso} ===")

        payload = _load_fallback_for_day(day_iso)
        if not payload:
            print(f"[skip] no valid fallback rows for {day_iso}")
            skipped_days += 1
            continue

        total_rows_in_file = len(payload)

        # Hygiene (dedupe, drop same-currency, already normalized date/quotation/rate)
        deduped_payload, dupes_removed = _dedupe_payload(payload)
        total_dupes_removed += dupes_removed

        filtered_payload, same_drop = _filter_same_currency(deduped_payload)
        total_same_currency_dropped += same_drop

        n_after = len(filtered_payload)
        print(f"[day] rows: {total_rows_in_file} → {n_after} "
              f"(dedup removed {dupes_removed}, same-currency dropped {same_drop})")

        if n_after == 0:
            # Nothing left to post; still record why the day produced no traffic.
            _write_day_summary(
                day_iso,
                total_rows=total_rows_in_file,
                rows_after_filters=n_after,
                dupes_removed=dupes_removed,
                same_currency_dropped=same_drop,
                posted_batches=0,
                rows_sent=0,
                errors=0,
            )
            continue

        chunks = _chunked(filtered_payload, bs)
        total_chunks = len(chunks)
        print(f"[day] batching: {total_chunks} batch(es) of up to {bs}")

        day_errors = 0
        day_rows_sent = 0
        day_batches_posted = 0

        for idx, batch in enumerate(chunks, start=1):
            print(f"[post] {day_iso} | batch {idx}/{total_chunks}: {len(batch)} rows")
            ok, resp_json, resp_text = _post_payload_batch(batch)
            if ok:
                day_batches_posted += 1
                posted_batches += 1
                day_rows_sent += len(batch)
                rows_sent += len(batch)
                # Optionally inspect resp_json here
                continue

            day_errors += 1
            errors += 1
            print(f"[error] POST failed for {day_iso} batch {idx}/{total_chunks}:\n"
                  f"{(resp_text or '(no body)')[:1000]}")
            if STOP_ON_ERROR:
                print("[halt] STOP_ON_ERROR=True → halting at this batch")
                halted = True
                break

        # Shared bookkeeping for both the normal and the halted path, so a
        # partially posted day is counted the same way in either case.
        if day_batches_posted:
            posted_days += 1

        _write_day_summary(
            day_iso,
            total_rows=total_rows_in_file,
            rows_after_filters=n_after,
            dupes_removed=dupes_removed,
            same_currency_dropped=same_drop,
            posted_batches=day_batches_posted,
            rows_sent=day_rows_sent,
            errors=day_errors,
        )

        if halted:
            break

    # Final summary (printed exactly once, whether or not the run was halted)
    print("\n[summary]")
    print(f"  days in range         : {total_days}")
    print(f"  posted days           : {posted_days}")
    print(f"  posted batches        : {posted_batches}")
    print(f"  rows sent             : {rows_sent}")
    print(f"  skipped days          : {skipped_days}")
    print(f"  errors                : {errors}")
    print(f"  duplicates removed    : {total_dupes_removed}")
    print(f"  same-currency dropped : {total_same_currency_dropped}")

# Entry point: run the refill when this code is executed directly
# (script or notebook cell), not when it is imported as a module.
if __name__ == "__main__":
    main()


[range] 2025-10-05 → 2025-10-05 (inclusive)
[config] batch_size = 50, source = WebService/TrackDrivers/Fallback, endpoint = /batch


=== DAY 2025-10-05 ===
[skip] fallback file not found: WebService/TrackDrivers/Fallback\2025-10-05.json
[skip] no valid fallback rows for 2025-10-05

[summary]
  days in range         : 1
  posted days           : 0
  posted batches        : 0
  rows sent             : 0
  skipped days          : 1
  errors                : 0
  duplicates removed    : 0
  same-currency dropped : 0
