In [None]:
# Configure AWS profile for local development.
# `%env` is an IPython magic: it sets the AWS_PROFILE environment variable for
# this kernel session, so it must run before any cell that talks to AWS.
%env AWS_PROFILE=platform-developer

In [None]:
from oai_pmh_client.client import OAIClient
import httpx
import logging

from utils.aws import get_ssm_parameter

# Log at INFO with a timestamped format. force=True removes any handlers
# already attached to the root logger, so re-running this cell takes effect.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(name)s - %(message)s",
    force=True,
)

# Only surface warnings and above from httpx's own logger.
logging.getLogger("httpx").setLevel(logging.WARN)

# Names of the SSM parameters that hold the OAI API token and endpoint URL.
API_TOKEN_SSM_PARAMETER = "/catalogue_pipeline/axiell_collections/oai_api_token"
API_URL_SSM_PARAMETER = "/catalogue_pipeline/axiell_collections/oai_api_url"

# Both values are fetched through the project helper as soon as this cell runs.
API_TOKEN = get_ssm_parameter(API_TOKEN_SSM_PARAMETER)
API_URL = get_ssm_parameter(API_URL_SSM_PARAMETER)

HTTP_TIMEOUT = httpx.Timeout(10.0, read=60.0)  # 10s default for each timeout phase, 60s for reads
# Retry/backoff settings handed to OAIClient when the clients are built.
# NOTE(review): backoff values are presumably seconds — confirm against OAIClient.
OAI_REQUEST_RETRIES = 4
OAI_REQUEST_BACKOFF = 0.75
OAI_REQUEST_BACKOFF_MAX = 5.0

class AuthenticatedHTTPXClient(httpx.Client):
    """HTTPX client that adds the API token to the query string of every request."""

    def build_request(self, method, url, **kwargs):
        """Build a request with ``token=API_TOKEN`` set in its query parameters.

        Args:
            method: HTTP method, forwarded unchanged to ``httpx.Client``.
            url: Request URL, forwarded unchanged to ``httpx.Client``.
            **kwargs: Remaining ``build_request`` arguments. Any ``params``
                value is kept, with its ``token`` entry set (or replaced).

        Returns:
            The request object produced by ``httpx.Client.build_request``.
        """
        # httpx forwards ``params=None`` when the caller gave no query
        # parameters, and also accepts strings, sequences of pairs and
        # ``QueryParams`` instances. Normalising through ``QueryParams``
        # covers all of those, and ``set`` returns a new object, so the
        # caller's own params mapping is never mutated.
        params = httpx.QueryParams(kwargs.pop("params", None))
        kwargs["params"] = params.set("token", API_TOKEN)
        return super().build_request(method, url, **kwargs)
    
# Instantiate the authenticated HTTPX client
httpx_client = AuthenticatedHTTPXClient(timeout=HTTP_TIMEOUT)

# Create a client for the Axiell Collections OAI-PMH endpoint (URL read from SSM).
client = OAIClient(
    API_URL,
    client=httpx_client,
    max_request_retries=OAI_REQUEST_RETRIES,
    request_backoff_factor=OAI_REQUEST_BACKOFF,
    request_max_backoff=OAI_REQUEST_BACKOFF_MAX,
    # NOTE(review): presumably keeps the token out of logged URLs — confirm against OAIClient.
    redacted_query_params=["token"],
)

# Get the repository's identity.
identity = client.identify()
print(identity)

# List the available metadata formats.
formats = client.list_metadata_formats()
print(formats)

# List the sets in the repository.
sets = client.list_sets()
print(sets)

In [None]:
from __future__ import annotations
import importlib
import json
import re
from datetime import datetime, timedelta, timezone
from pathlib import Path

from lxml import etree
from pyiceberg.catalog.sql import SqlCatalog
from pyiceberg.exceptions import NamespaceAlreadyExistsError
from pyiceberg.table import Table as IcebergTable

import oai_pmh_client.client
importlib.reload(oai_pmh_client.client)
from oai_pmh_client.client import OAIClient
from adapters.utils.window_store import (
    IcebergWindowStore,
    WINDOW_STATUS_PARTITION_SPEC,
    WINDOW_STATUS_SCHEMA,
)
from adapters.utils.window_harvester import WindowHarvestManager

# Reuse the authenticated HTTPX client from the previous cell.
# The OAIClient itself is rebuilt here because this cell reloads
# oai_pmh_client.client and re-imports the class before this point.
client = OAIClient(
    API_URL,
    client=httpx_client,
    max_request_retries=OAI_REQUEST_RETRIES,
    request_backoff_factor=OAI_REQUEST_BACKOFF,
    request_max_backoff=OAI_REQUEST_BACKOFF_MAX,
    redacted_query_params=["token"],
)

# ---------------------- configuration knobs ----------------------
LOOKBACK_DAYS = 3  # default window range covers the past 3 days
WINDOW_MINUTES = 15  # length of each sub-window
MAX_PARALLEL_REQUESTS = 3  # number of windows fetched in parallel
MAX_WINDOWS = None  # set to an integer to limit processing during tests
METADATA_PREFIX = "oai_raw"  # metadata prefix handed to the harvester
SET_SPEC = "collect"  # set spec handed to the harvester
AUTO_RUN_PENDING_WINDOWS = True  # when True, harvesting starts as soon as this cell runs

# ---------------------------- paths -------------------------------
# All paths are relative to the notebook's working directory.
DATA_DIR = Path("data/records")  # one JSON file per harvested record
ICEBERG_ROOT = Path("data/iceberg")
ICEBERG_WAREHOUSE = ICEBERG_ROOT / "warehouse"
ICEBERG_CATALOG = ICEBERG_ROOT / "catalog.db"  # SQLite file backing the catalog
ICEBERG_NAMESPACE = ("harvest",)
ICEBERG_TABLE_NAME = "window_status"
ICEBERG_CATALOG_NAME = "window_status_catalog"
# Create the directories up front; exist_ok makes re-running the cell harmless.
DATA_DIR.mkdir(parents=True, exist_ok=True)
ICEBERG_ROOT.mkdir(parents=True, exist_ok=True)
ICEBERG_WAREHOUSE.mkdir(parents=True, exist_ok=True)


def _schema_field_names(schema) -> list[str]:
    """Return the ``name`` of every entry in ``schema.fields``, in order."""
    names = []
    for column in schema.fields:
        names.append(column.name)
    return names


# Ordered field names of the current schema, used to spot an outdated table.
EXPECTED_WINDOW_SCHEMA_FIELDS = _schema_field_names(WINDOW_STATUS_SCHEMA)


def load_or_create_window_status_table() -> IcebergTable:
    """Return the window-status Iceberg table, creating it when needed.

    A SQLite-backed catalog is opened at ``ICEBERG_CATALOG`` and the
    namespace is created if it does not exist yet. An existing table is
    reused only when its field names match ``EXPECTED_WINDOW_SCHEMA_FIELDS``
    (names and order; types are not compared). Otherwise the table is
    dropped and created again with the current schema and partition spec.
    """
    catalog_uri = f"sqlite:///{ICEBERG_CATALOG.resolve()}"
    warehouse_dir = str(ICEBERG_WAREHOUSE.resolve())
    catalog = SqlCatalog(
        name=ICEBERG_CATALOG_NAME,
        uri=catalog_uri,
        warehouse=warehouse_dir,
    )

    try:
        catalog.create_namespace(ICEBERG_NAMESPACE)
    except NamespaceAlreadyExistsError:
        # The namespace is already there from an earlier run; nothing to do.
        pass

    table_id = (*ICEBERG_NAMESPACE, ICEBERG_TABLE_NAME)
    if catalog.table_exists(table_id):
        current = catalog.load_table(table_id)
        if _schema_field_names(current.schema()) == EXPECTED_WINDOW_SCHEMA_FIELDS:
            return current
        # Field names differ from the expected schema: rebuild from scratch.
        print(
            "Existing window_status table schema is outdated. Dropping table to rebuild with latest schema..."
        )
        catalog.drop_table(table_id)

    return catalog.create_table(
        identifier=table_id,
        schema=WINDOW_STATUS_SCHEMA,
        partition_spec=WINDOW_STATUS_PARTITION_SPEC,
    )


# Open (or create) the status table at cell run time and wrap it in the
# store object that is handed to the harvester.
window_status_table = load_or_create_window_status_table()
store = IcebergWindowStore(window_status_table)

# ------------------------ helper funcs --------------------------
def sanitize_identifier(identifier: str) -> str:
    """Replace each run of characters outside ``A-Za-z0-9._-`` with one ``_``."""
    unsafe_run = re.compile(r"[^A-Za-z0-9._-]+")
    return unsafe_run.sub("_", identifier)


def header_to_dict(header):
    """Convert a record header into a plain dict, or return ``None`` for no header.

    The result has the keys ``identifier``, ``datestamp``, ``set_specs`` and
    ``is_deleted``. ``datestamp`` is the ISO-8601 string of the header's
    datestamp, or ``None`` when the header has no (truthy) datestamp.
    """
    if header is None:
        return None

    result = {"identifier": header.identifier}
    if header.datestamp:
        result["datestamp"] = header.datestamp.isoformat()
    else:
        result["datestamp"] = None
    result["set_specs"] = header.set_specs
    result["is_deleted"] = header.is_deleted
    return result


def record_to_payload(
    record,
    window_start: datetime,
    window_end: datetime,
    fallback_suffix: int,
    identifier_override: str | None = None,
):
    """Build the ``(identifier, payload)`` pair for one harvested record.

    The identifier is chosen in this order: ``identifier_override`` when it
    is not ``None``; else the header's identifier when the record has a
    header; else a synthetic ``no-header-<window start>-<fallback_suffix>``
    name. The payload holds the window bounds as ISO-8601 strings, the
    header as a dict, and the metadata element serialised to pretty-printed
    XML text (``None`` when the record has no metadata).
    """
    header = record.header

    if identifier_override is not None:
        identifier = identifier_override
    elif header:
        identifier = header.identifier
    else:
        identifier = f"no-header-{window_start.isoformat()}-{fallback_suffix}"

    metadata_xml = (
        None
        if record.metadata is None
        else etree.tostring(record.metadata, encoding="unicode", pretty_print=True)
    )

    window = {
        "from": window_start.isoformat(),
        "until": window_end.isoformat(),
    }
    payload = {
        "window": window,
        "header": header_to_dict(header),
        "metadata_xml": metadata_xml,
    }
    return identifier, payload


def write_record_json(identifier: str, payload: dict):
    """Write ``payload`` as indented JSON under ``DATA_DIR`` and return the path.

    The file name is the sanitised ``identifier`` plus ``.json``; a file
    that already exists under that name is overwritten.
    """
    target = DATA_DIR / f"{sanitize_identifier(identifier)}.json"
    # ensure_ascii=False writes non-ASCII characters verbatim, so the encoding
    # is pinned to UTF-8 instead of relying on the platform's locale default
    # (which can fail or produce non-UTF-8 JSON, e.g. cp1252 on Windows).
    target.write_text(
        json.dumps(payload, ensure_ascii=False, indent=2),
        encoding="utf-8",
    )
    return target


def write_record_callback(identifier, record, window_start, window_end, fallback_index):
    """Persist one record as JSON; passed to the harvester as ``record_callback``.

    The supplied ``identifier`` is used both as the payload's identifier
    override and as the basis of the output file name.
    """
    built = record_to_payload(
        record,
        window_start,
        window_end,
        fallback_index,
        identifier_override=identifier,
    )
    # Only the payload half of the (identifier, payload) pair is needed here.
    payload = built[1]
    write_record_json(identifier, payload)


# Wire the OAI client, the Iceberg-backed window store and the JSON-writing
# callback into the harvest manager, using the configuration knobs defined
# earlier in this cell.
harvester = WindowHarvestManager(
    client=client,
    store=store,
    metadata_prefix=METADATA_PREFIX,
    set_spec=SET_SPEC,
    window_minutes=WINDOW_MINUTES,
    max_parallel_requests=MAX_PARALLEL_REQUESTS,
    record_callback=write_record_callback,
)


def harvest_pending_windows(
    start_time: datetime | None = None,
    end_time: datetime | None = None,
    max_windows: int | None = MAX_WINDOWS,
):
    """Harvest windows in a time range and print a one-line summary for each.

    Args:
        start_time: Start of the range; defaults to ``LOOKBACK_DAYS`` days
            before ``end_time``.
        end_time: End of the range; defaults to the current UTC time.
        max_windows: Passed through to ``harvester.harvest_recent``.

    Returns:
        The summaries returned by the harvester, or an empty list when there
        were none.
    """
    if not end_time:
        end_time = datetime.now(timezone.utc)
    if not start_time:
        start_time = end_time - timedelta(days=LOOKBACK_DAYS)

    results = harvester.harvest_recent(
        start_time=start_time,
        end_time=end_time,
        max_windows=max_windows,
    )
    if len(results) == 0:
        print("No windows to process.")
        return []

    for item in results:
        outcome = item["state"].upper()
        record_count = len(item.get("record_ids", []))
        print(
            f"[{outcome}] {item['window_key']} -> {record_count} record(s) "
            f"after {item['attempts']} attempt(s)"
        )
        # Only show the stored error for windows that did not succeed.
        if outcome != "SUCCESS" and item.get("last_error"):
            print(f"    Last error: {item['last_error']}")
    return results


# Either start harvesting right away or just report how many failed windows
# are currently recorded, depending on the AUTO_RUN_PENDING_WINDOWS knob.
if not AUTO_RUN_PENDING_WINDOWS:
    failed_count = len(harvester.failed_windows())
    print("Harvest setup complete. Call harvest_pending_windows() when you're ready to process windows.")
    print(f"Current failed windows recorded: {failed_count}")
else:
    harvest_pending_windows()

In [None]:
# Inspect the failed windows recorded by the harvester and optionally retry them.
RETRY_FAILED_WINDOWS = True  # set to False to only list failures without retrying
MAX_FAILED_WINDOWS_TO_RETRY = None  # optionally cap the number retried in one pass

# Oldest failures first, so the preview below starts with the earliest window.
failed_rows = sorted(harvester.failed_windows(), key=lambda row: row["window_start"])
if not failed_rows:
    print("No failed windows recorded in Iceberg.")
else:
    print(f"Failed windows recorded: {len(failed_rows)}")
    # Slicing already clamps to the list length, so no explicit min() is needed.
    preview = failed_rows[:5]
    for row in preview:
        print(
            f"  {row['window_key']} | attempts={row['attempts']} | last_error={row.get('last_error')}"
        )

    if RETRY_FAILED_WINDOWS:
        to_retry = (
            failed_rows
            if MAX_FAILED_WINDOWS_TO_RETRY is None
            else failed_rows[:MAX_FAILED_WINDOWS_TO_RETRY]
        )
        if not to_retry:
            print("Retry toggle enabled but MAX_FAILED_WINDOWS_TO_RETRY limited the set to zero.")
        else:
            print(f"Retrying {len(to_retry)} failed windows...")
            # Only the count is passed on; which windows are actually retried
            # is decided by harvester.retry_failed_windows.
            summaries = harvester.retry_failed_windows(limit=len(to_retry))
            for summary in summaries:
                state = summary["state"].upper()
                record_total = len(summary.get("record_ids", []))
                print(
                    f"[{state}] {summary['window_key']} -> {record_total} record(s) after {summary['attempts']} attempt(s)"
                )
                if summary.get("last_error") and state != "SUCCESS":
                    print(f"    Last error: {summary['last_error']}")

        # Re-read the failures so the count reflects the outcome of the retry.
        refreshed = harvester.failed_windows()
        print(
            f"Post-retry failures remaining: {len(refreshed)}"
        )
    else:
        print("Retries disabled; review the details above to decide on next steps.")

In [None]:
# Print an overview of the harvester's coverage report: time span, window
# counts per state, up to five coverage gaps and up to five failures.
report = harvester.coverage_report()
if report.total_windows == 0:
    print("No window activity has been recorded in Iceberg yet.")
else:
    print("Window coverage overview:")
    print(f"  Time span: {report.range_start.isoformat()} -> {report.range_end.isoformat()}")
    print(f"  Total windows: {report.total_windows}")
    state_parts = [f"{state}={count}" for state, count in report.state_counts.items()]
    state_summary = ", ".join(state_parts)
    print(f"  State counts: {state_summary}")
    print(f"  Total window coverage (hours): {report.coverage_hours:.2f}")

    if not report.coverage_gaps:
        print("  No coverage gaps detected between processed windows.")
    else:
        print("  Coverage gaps detected:")
        for gap in report.coverage_gaps[:5]:
            print(f"    Gap between {gap.start.isoformat()} -> {gap.end.isoformat()}")
        # Anything beyond the first five is only counted, not listed.
        hidden_gaps = len(report.coverage_gaps) - 5
        if hidden_gaps > 0:
            print(f"    ...and {hidden_gaps} more gaps")

    if report.failures:
        print(f"  Failures recorded in range: {len(report.failures)}")
        for failure in report.failures[:5]:
            print(
                f"    {failure.window_key} | attempts={failure.attempts} | last_error={failure.last_error}"
            )
        hidden_failures = len(report.failures) - 5
        if hidden_failures > 0:
            print(f"    ...and {hidden_failures} more failures")