## Bulk download ESO spectra (HARPS, NIRPS, UVES) for FGK list

This notebook:
- Reads `../data/fgk.txt`
- Filters rows with `origin in {"HARPS", "NIRPS", "UVES"}`
- Uses the FGK `ID` column as the archive identifier (defaults to `obs_id` in TAP)
- Downloads spectra with concurrency (HPC-friendly threads)
- Saves:
  - raw FITS to `../data/spectra/eso/{instrument}/`
  - Zarr to `../data/eso_{instrument}.zarr`
  - not-found identifiers to `../data/eso_{instrument}_not_found.jsonl`
  - all errors to `../data/eso_{instrument}_errors.jsonl`
  - a restartable cache to `../data/eso_{instrument}_bulk_cache.jsonl`

> Note: This runs real network downloads and may take a while.

In [1]:
import sys
import os

# Add the project root to sys.path so `spectra_download` is importable
notebook_dir = os.path.dirname(os.path.abspath(__file__)) if '__file__' in globals() else os.getcwd()
sys.path.append(os.path.dirname(notebook_dir))

In [2]:
from __future__ import annotations

import json
import logging
from pathlib import Path

from tqdm.auto import tqdm

from spectra_download import EsoHarpsSource, EsoNirpsSource, EsoUvesSource, SpectraRequest, bulk_download

logging.basicConfig(level=logging.INFO)

ROOT = Path("..").resolve()
FGK_PATH = ROOT / "data" / "fgk.txt"
OUT_ROOT = ROOT / "data"

print("FGK:", FGK_PATH)
print("OUT_ROOT:", OUT_ROOT)

FGK: /Users/mjablons/Documents/spectra/data/fgk.txt
OUT_ROOT: /Users/mjablons/Documents/spectra/data


In [3]:
import csv


def load_fgk_name_candidates_by_origin(path: Path) -> dict[str, list[tuple[str, list[str]]]]:
    out: dict[str, list[tuple[str, list[str]]]] = {"HARPS": [], "NIRPS": [], "UVES": []}
    seen: dict[str, set[str]] = {k: set() for k in out}

    def _clean(x: str | None) -> str:
        x = (x or "").strip()
        return "" if not x or x == "-" else x

    with path.open("r", encoding="utf-8") as f:
        reader = csv.DictReader(f, delimiter="\t")
        for row in reader:
            origin = (row.get("origin") or "").strip().upper()
            if origin not in out:
                continue

            star = _clean(row.get("star"))
            alt1 = _clean(row.get("star_alt1"))
            alt2 = _clean(row.get("star_alt2"))

            candidates = [c for c in [star, alt1, alt2] if c]
            if not candidates:
                continue

            primary = candidates[0]
            fallbacks = [c for c in candidates[1:] if c != primary]

            if primary in seen[origin]:
                continue
            seen[origin].add(primary)
            out[origin].append((primary, fallbacks))

    return out


names_by_origin = load_fgk_name_candidates_by_origin(FGK_PATH)
for k, v in names_by_origin.items():
    example = v[0] if v else None
    print(k, len(v), "example:", example)

HARPS 81 example: ('HIP101345', ['HD195564'])
NIRPS 0 example: None
UVES 65 example: ('HIP112440', ['HD215665'])


In [4]:
# Concurrency settings
MAX_WORKERS = 16
PER_SOURCE_MAX_WORKERS = 6

RESUME = True

INSTRUMENTS = {
    "HARPS": ("eso_harps", EsoHarpsSource(timeout=60, max_retries=3)),
    "NIRPS": ("eso_nirps", EsoNirpsSource(timeout=60, max_retries=3)),
    "UVES": ("eso_uves", EsoUvesSource(timeout=60, max_retries=3)),
}

sources = {src_name: src for _, (src_name, src) in INSTRUMENTS.items()}
print("Sources:", list(sources.keys()))

Sources: ['eso_harps', 'eso_nirps', 'eso_uves']


In [None]:
def run_instrument(origin: str) -> None:
    src_name, _src = INSTRUMENTS[origin]

    raw_dir = OUT_ROOT / "spectra" / "eso" / origin.lower()
    raw_dir.mkdir(parents=True, exist_ok=True)

    zarr_path = OUT_ROOT / f"eso_{origin.lower()}.zarr"
    not_found_path = OUT_ROOT / f"eso_{origin.lower()}_not_found.jsonl"
    error_path = OUT_ROOT / f"eso_{origin.lower()}_errors.jsonl"
    cache_path = OUT_ROOT / f"eso_{origin.lower()}_bulk_cache.jsonl"

    # Load cache for resume
    completed: set[str] = set()
    if RESUME and cache_path.exists():
        for line in cache_path.read_text(encoding="utf-8").splitlines():
            if not line.strip():
                continue
            try:
                rec = json.loads(line)
                if rec.get("source") == src_name and rec.get("status") in {"ok", "not_found"}:
                    completed.add(str(rec.get("identifier")))
            except Exception:
                pass

    entries = [e for e in names_by_origin[origin] if e[0] not in completed]

    extra_params = {
        "raw_save_path": str(raw_dir),
        "zarr_paths": str(zarr_path),
        "not_found_path": str(not_found_path),
        "error_path": str(error_path),
        "progress_every": 25,
        # TAP sources generally don't have CCF.
        "process_ccf": False,
        # Use a cone search around resolved coordinates (preferred).
        "use_coords": True,
        "search_radius_arcsec": 5.0,
        # If resolution fails, fall back to target_name matching.
        "identifier_field": "target_name",
    }

    requests = [
        SpectraRequest(
            source=src_name,
            identifier=primary,
            extra_params={**extra_params, "fallback_identifiers": fallbacks},
        )
        for primary, fallbacks in entries
    ]

    print("\n==", origin, "==")
    print("Total stars:", len(names_by_origin[origin]))
    print("Already completed:", len(completed))
    print("To process now:", len(entries))
    print("raw:", raw_dir)
    print("zarr:", zarr_path)
    print("cache:", cache_path)

    pbar = tqdm(total=len(requests), desc=f"{origin}", unit="obs")

    def on_result(res, done, total):  # type: ignore[no-untyped-def]
        pbar.update(1)
        status = "ok" if res.success else "error"
        if res.success and len(res.spectra) == 0:
            status = "not_found"
        rec = {
            "source": res.request.source,
            "identifier": res.request.identifier,
            "status": status,
            "error": res.error,
            "n_products": len(res.spectra),
        }
        with cache_path.open("a", encoding="utf-8") as f:
            f.write(json.dumps(rec) + "\n")

    try:
        _ = bulk_download(
            requests,
            sources,
            max_workers=MAX_WORKERS,
            per_source_max_workers=PER_SOURCE_MAX_WORKERS,
            on_result=on_result,
        )
    finally:
        pbar.close()


# Run one instrument at a time to keep load predictable.
# Change this list as needed.
for origin in ["HARPS", "NIRPS", "UVES"]:
    run_instrument(origin)


== HARPS ==
Total stars: 81
Already completed: 84
To process now: 81
raw: /Users/mjablons/Documents/spectra/data/spectra/eso/harps
zarr: /Users/mjablons/Documents/spectra/data/eso_harps.zarr
cache: /Users/mjablons/Documents/spectra/data/eso_harps_bulk_cache.jsonl


HARPS:   0%|          | 0/81 [00:00<?, ?obs/s]

INFO:spectra_download.bulk:Bulk download start
INFO:spectra_download.bulk:Bulk download start
INFO:spectra_download.bulk:Bulk download start
INFO:spectra_download.bulk:Bulk download start
INFO:spectra_download.sources.base:Downloading spectra
INFO:spectra_download.bulk:Bulk download start
INFO:spectra_download.bulk:Bulk download start
INFO:spectra_download.sources.base:Downloading spectra
INFO:spectra_download.sources.base:Downloading spectra
INFO:spectra_download.sources.base:Downloading spectra
INFO:spectra_download.sources.base:Downloading spectra
INFO:spectra_download.sources.base:Downloading spectra
INFO:spectra_download.sources.base:Parsed spectra response
INFO:spectra_download.sources.base:Post-processing spectra
INFO:spectra_download.sources.base:Parsed spectra response
INFO:spectra_download.sources.base:Post-processing spectra
INFO:spectra_download.sources.base:Parsed spectra response
INFO:spectra_download.sources.base:Post-processing spectra
INFO:spectra_download.sources.base