**Imports**

In [None]:
from __future__ import annotations

import argparse
import csv
import io
import os
import random
import sys
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, Iterable, List, Optional, Tuple

import numpy as np
import requests
from obspy import UTCDateTime, read as obspy_read
from obspy.clients.fdsn import Client
from obspy.clients.fdsn.header import FDSNNoDataException

In [None]:
# -----------------------------
# Tunable defaults (an environment variable of the given name takes precedence)
# -----------------------------
CHANNEL = os.environ.get("CHANNEL", "BH?,HH?")  # channel pattern for station discovery
LOCATION = os.environ.get("LOCATION", "*")  # location pattern for station discovery
TIMEOUT_S = float(os.environ.get("TIMEOUT_S", "60"))  # used for the FDSN client and the dataselect POST
DEFAULT_MAX_BYTES = int(os.environ.get("MAX_BYTES", str(200 * 1024 * 1024)))  # 200 MB

# Upper-cased CSV "DataCenter" value -> provider name handed to the ObsPy FDSN Client.
DATACENTER_MAP: Dict[str, str] = dict(
    IRISDMC="IRIS",
    IRIS="IRIS",
    SCEDC="SCEDC",
    NCEDC="NCEDC",
)


@dataclass(frozen=True)
class StationRow:
    """One station entry parsed from the input CSV (immutable)."""
    row_index: int  # 1-based index of the data row (header excluded; skipped blank rows still count)
    station: str  # "Station" column, whitespace-stripped
    datacenter: str  # "DataCenter" column, whitespace-stripped; not yet mapped to a provider
    start: UTCDateTime  # parsed "Start" column
    end: UTCDateTime  # parsed "End" column plus 86400 s, i.e. an exclusive upper bound
    site: str  # "Site" column, whitespace-stripped (may be empty)
    lat: float  # "Latitude" column
    lon: float  # "Longitude" column
    elev_m: float  # "Elevation" column


@dataclass(frozen=True)
class DayTask:
    """One unit of work: a station row restricted to a single daily window (immutable)."""
    row: StationRow  # station this task was expanded from
    provider: str  # provider name passed to the ObsPy FDSN Client
    day_start: UTCDateTime  # start of the request window
    day_end: UTCDateTime  # end of the request window (exclusive, as produced by iter_days)


parse_date_ymd(s: str) -> UTCDateTime

Parses a date string like YYYY-MM-DD into an ObsPy UTCDateTime at midnight UTC. It’s used when reading the input station CSV so that each station’s Start/End dates become time objects the rest of the pipeline can work with. Called by read_station_csv().

In [None]:
def parse_date_ymd(s: str) -> UTCDateTime:
    """
    Turn a date string into a UTCDateTime, ignoring surrounding whitespace.
    """
    trimmed = s.strip()
    return UTCDateTime(trimmed)

parse_iso_time(s: str) -> UTCDateTime

Parses an ISO-8601 datetime string (e.g., 2014-05-01T00:00:00Z) into UTCDateTime. It’s used for CLI overrides (--start, --end) so you can override the time range for all stations. Called by main().

In [None]:
def parse_iso_time(s: str) -> UTCDateTime:
    """
    Turn a datetime string into a UTCDateTime, ignoring surrounding whitespace.
    """
    cleaned = s.strip()
    return UTCDateTime(cleaned)

provider_for_datacenter(dc: str) -> str

Maps the CSV “DataCenter” field (e.g., IRISDMC) to an ObsPy FDSN client shortcut (e.g., IRIS), raising ValueError for values that are not in DATACENTER_MAP. This ensures the correct remote FDSN service is used for station and dataselect calls. Called by build_tasks() when creating tasks; read_station_csv() only stores the raw DataCenter string.

In [None]:
def provider_for_datacenter(dc: str) -> str:
    """
    Look up the provider name for a CSV DataCenter value.

    The lookup ignores surrounding whitespace and letter case.
    Raises ValueError when the value has no entry in DATACENTER_MAP.
    """
    normalized = dc.strip().upper()
    provider = DATACENTER_MAP.get(normalized)
    if provider is None:
        raise ValueError(f"Unknown DataCenter '{dc}'. Add it to DATACENTER_MAP.")
    return provider

read_station_csv(csv_path: Path) -> List[StationRow]

Reads the input CSV into a list of StationRow objects (one per row). It validates required columns, converts Start/End to UTCDateTime, and makes the End date inclusive by adding 86400 seconds (so the range covers the full end day). Called by main() to ingest the input.

In [None]:
def read_station_csv(csv_path: Path) -> List[StationRow]:
    """
    Load the station CSV into a list of StationRow objects (one per non-blank row).

    The file is read as UTF-8 (a leading BOM is tolerated). "Start" and "End" are
    parsed with parse_date_ymd(); 86400 seconds are added to "End" so the stored
    value is an exclusive bound that still covers the whole end day.

    Raises ValueError when a required column is absent from the header, or when
    a non-blank row is too short to provide a required value.
    """
    required = ("Station", "DataCenter", "Start", "End", "Site", "Latitude", "Longitude", "Elevation")
    # "Site" has always been allowed to be empty/absent, so it is not enforced per row.
    must_have_value = tuple(name for name in required if name != "Site")

    rows: List[StationRow] = []
    with csv_path.open("r", encoding="utf-8-sig", newline="") as f:
        reader = csv.DictReader(f)
        missing = set(required) - set(reader.fieldnames or [])
        if missing:
            raise ValueError(f"CSV missing required columns: {sorted(missing)}")

        for i, r in enumerate(reader, start=1):
            # DictReader values are not always str: surplus fields are collected
            # in a list (under the key None) and absent trailing fields are None.
            cells: List[str] = []
            for v in r.values():
                if isinstance(v, list):
                    cells.extend(v)
                elif v is not None:
                    cells.append(v)
            if not any(c.strip() for c in cells):
                continue

            absent = [name for name in must_have_value if r.get(name) is None]
            if absent:
                raise ValueError(f"CSV row {i}: missing value(s) for {absent}")

            start = parse_date_ymd(r["Start"])
            end = parse_date_ymd(r["End"]) + 86400  # inclusive end date
            rows.append(
                StationRow(
                    row_index=i,
                    station=r["Station"].strip(),
                    datacenter=r["DataCenter"].strip(),
                    start=start,
                    end=end,
                    site=(r.get("Site") or "").strip(),
                    lat=float(r["Latitude"]),
                    lon=float(r["Longitude"]),
                    elev_m=float(r["Elevation"]),
                )
            )
    return rows

iter_days(t1: UTCDateTime, t2: UTCDateTime)

Breaks a time interval into daily windows aligned to UTC midnight: yields (day_start, day_end) for each day intersecting [t1, t2). It’s what drives the “one metadata row per day” behavior. Called by build_tasks().

In [None]:
def iter_days(t1: UTCDateTime, t2: UTCDateTime) -> Iterable[Tuple[UTCDateTime, UTCDateTime]]:
    """
    Split [t1, t2) into per-day pieces whose boundaries fall on UTC midnight.

    Each yielded pair is (piece_start, piece_end); the first and last pieces are
    clipped to t1 and t2 respectively, and empty pieces are never yielded.
    """
    day_open = UTCDateTime(t1.date)  # midnight of the day containing t1
    while day_open < t2:
        day_close = day_open + 86400
        lower = t1 if t1 > day_open else day_open
        upper = t2 if t2 < day_close else day_close
        if upper > lower:
            yield (lower, upper)
        day_open = day_close

pick_station_channels(inv) -> List[(net, sta, loc, cha)]

Extracts unique channel tuples from an ObsPy Inventory (returned by get_stations(level="channel")). The output is used to build a bulk dataselect request. Called by process_day_task().

In [None]:
def pick_station_channels(inv) -> List[tuple[str, str, str, str]]:
    """
    Collect the distinct (network, station, location, channel) code tuples of an inventory.

    A missing/empty location code is reported as "". The result is sorted.
    """
    distinct = {
        (network.code, station.code, channel.location_code or "", channel.code)
        for network in inv
        for station in network
        for channel in station
    }
    return sorted(distinct)

build_bulk_lines(chan_tuples, t1, t2) -> str

Creates the body of an FDSN bulk dataselect request: one line per channel tuple with NET STA LOC CHA START END. It converts blank locations to -- (required by the bulk format). Called by process_day_task() right before streaming waveforms.

In [None]:
def build_bulk_lines(chan_tuples: List[tuple[str, str, str, str]], t1: UTCDateTime, t2: UTCDateTime) -> str:
    """
    Render the request body: one "NET STA LOC CHA START END" line per channel tuple.

    An empty location code is written as "--". Every line, including the last,
    is newline-terminated.
    """
    window = f"{t1.isoformat()} {t2.isoformat()}"
    body = "\n".join(
        f"{net} {sta} {'--' if loc == '' else loc} {cha} {window}"
        for net, sta, loc, cha in chan_tuples
    )
    return body + "\n"

stream_dataselect_bulk(dataselect_base_url, bulk_text, ...) -> bytes

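Performs an HTTP POST to dataselect /query and streams the response into memory. Returns the raw MiniSEED bytes for that day/task, or b"" when the service answers 204 or 404 (no data). Other HTTP error statuses raise, and a response that grows beyond max_bytes raises MemoryError. Called by process_day_task() (wrapped by retry logic).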

In [None]:
def stream_dataselect_bulk(dataselect_base_url: str, bulk_text: str, *, timeout_s: float, max_bytes: int) -> bytes:
    """
    POST a bulk request to <dataselect_base_url>/query and return the response body.

    Returns b"" for HTTP 204 or 404. Any other error status raises via
    raise_for_status(). The body is read in 128 KiB chunks; MemoryError is
    raised as soon as more than max_bytes have been received.
    """
    endpoint = dataselect_base_url.rstrip("/") + "/query"
    request_headers = {"Content-Type": "text/plain", "Accept": "application/vnd.fdsn.mseed"}
    payload = bulk_text.encode("utf-8")

    with requests.post(endpoint, data=payload, headers=request_headers, stream=True, timeout=timeout_s) as r:
        if r.status_code == 204 or r.status_code == 404:
            return b""
        r.raise_for_status()

        pieces = []
        received = 0
        for piece in r.iter_content(chunk_size=1024 * 128):
            if piece:
                pieces.append(piece)
                received += len(piece)
                if received > max_bytes:
                    raise MemoryError(
                        f"MiniSEED exceeds max_bytes={max_bytes}. Reduce date span per request or increase --max-bytes."
                    )
        return b"".join(pieces)

is_transient_network_error(e: Exception) -> bool

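Heuristically classifies errors like connection resets/timeouts as “transient” (worth retrying) by looking for known markers (e.g., “connection reset”, “timed out”) and by checking for ConnectionResetError/TimeoutError. It exists to handle network issues by retrying instead of failing immediately. Note: none of the current markers mentions “splitlines”, so the ObsPy “splitlines” edge-case is retried only if its error message happens to contain one of the markers. Called by with_retries().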

In [None]:
def is_transient_network_error(e: Exception) -> bool:
    """
    Return True when the exception looks like a temporary network failure.

    An exception qualifies if it is a ConnectionResetError/TimeoutError, or if
    one of the known markers occurs (case-insensitively) in its message or in
    the name of its class or any base class.
    """
    if isinstance(e, (ConnectionResetError, TimeoutError)):
        return True

    markers = (
        "connection reset",
        "connection aborted",
        "remote disconnected",
        "temporarily unavailable",
        "timed out",
        "timeout",
        "ssl",
        "chunkedencodingerror",
        "protocolerror",
        "max retries exceeded",
    )
    # Markers such as "chunkedencodingerror" or "protocolerror" are exception
    # class names; they normally do not occur in str(e), so the class names are
    # searched together with the message.
    class_names = " ".join(cls.__name__ for cls in type(e).__mro__)
    haystack = f"{e} {class_names}".lower()
    return any(marker in haystack for marker in markers)

with_retries(fn, retries, backoff)

Runs a function and retries it on transient network errors using exponential backoff + jitter. It centralizes “be resilient against resets/timeouts.” Called by process_day_task() around get_stations() and stream_dataselect_bulk().

In [None]:
def with_retries(fn, *, retries: int, backoff: float):
    """
    Call fn() and return its result, retrying on transient network errors.

    retries is the total number of attempts; values below 1 are treated as 1,
    so fn is always called at least once. Between attempts the function sleeps
    for backoff * 2**(attempt - 1) seconds plus a random jitter in [0, backoff].

    FDSNNoDataException, non-transient errors and the error of the final
    attempt are re-raised unchanged.
    """
    attempts = max(1, retries)
    for attempt in range(1, attempts + 1):
        try:
            return fn()
        except FDSNNoDataException:
            # "no data" is a definitive answer, not a failure worth retrying
            raise
        except Exception as e:
            if attempt == attempts or not is_transient_network_error(e):
                raise
            sleep_s = backoff * (2 ** (attempt - 1)) + random.uniform(0, backoff)
            time.sleep(sleep_s)

compute_trace_data_stats(tr) -> dict

Computes numeric stats from the trace’s actual samples: min, max, mean, std, RMS, peak-to-peak, and fraction of finite samples. Converts masked arrays to NaNs and uses float64 for stability. Called by first_trace_daily_row().

In [None]:
def compute_trace_data_stats(tr) -> dict:
    """
    Summarise the samples in tr.data.

    Masked samples and non-finite values are excluded from the statistics.
    Returns a dict with the keys data_min, data_max, data_mean, data_std,
    data_rms, data_p2p (floats) and finite_frac (share of usable samples).
    When no usable sample exists, the six statistics are "" and finite_frac
    is 0.0.
    """
    data = tr.data
    if np.ma.isMaskedArray(data):
        # Cast before filling: NaN cannot be stored in an integer-dtype array,
        # so filling the original array fails for integer sample data.
        data = data.astype(np.float64).filled(np.nan)
    x = np.asarray(data, dtype=np.float64)

    finite = np.isfinite(x)
    n_finite = int(np.count_nonzero(finite))
    if n_finite == 0:
        return {
            "data_min": "",
            "data_max": "",
            "data_mean": "",
            "data_std": "",
            "data_rms": "",
            "data_p2p": "",
            "finite_frac": 0.0,
        }

    xf = x[finite]
    mn = float(np.min(xf))
    mx = float(np.max(xf))

    return {
        "data_min": mn,
        "data_max": mx,
        "data_mean": float(np.mean(xf)),
        "data_std": float(np.std(xf)),
        "data_rms": float(np.sqrt(np.mean(xf * xf))),
        "data_p2p": float(mx - mn),
        "finite_frac": float(n_finite / x.size),
    }

first_trace_daily_row(task: DayTask, tr) -> dict

Builds the final output row (a dict) for the CSV: station context, day/time window, trace header info (network/station/channel/npts/etc.), plus computed stats from compute_trace_data_stats(). Called by process_day_task() after parsing and selecting the first trace.

In [None]:
def first_trace_daily_row(task: DayTask, tr) -> dict:
    """
    Assemble one output CSV row from a task and a trace.

    The row combines the station's CSV context, the requested window, the
    trace header fields (missing header attributes become "") and the values
    returned by compute_trace_data_stats().
    """
    station_row = task.row
    header = tr.stats

    def header_value(name):
        return getattr(header, name, "")

    def header_time(name):
        return getattr(header, name).isoformat() if getattr(header, name, None) else ""

    record = {
        "csv_row": station_row.row_index,
        "csv_station": station_row.station,
        "csv_datacenter": station_row.datacenter,
        "provider": task.provider,
        "csv_site": station_row.site,
        "csv_latitude": station_row.lat,
        "csv_longitude": station_row.lon,
        "csv_elevation_m": station_row.elev_m,
        "day": task.day_start.date.strftime("%Y-%m-%d"),
        "request_start": task.day_start.isoformat(),
        "request_end": task.day_end.isoformat(),
        "trace_id": tr.id,
    }
    for name in ("network", "station", "location", "channel", "sampling_rate", "npts"):
        record[name] = header_value(name)
    record["starttime"] = header_time("starttime")
    record["endtime"] = header_time("endtime")
    record["delta"] = header_value("delta")
    record["calib"] = header_value("calib")

    return {**record, **compute_trace_data_stats(tr)}

process_day_task(task: DayTask, ...) -> Optional[dict]

This is the core per-day worker. For one (station, day) task, it: creates an ObsPy client, gets station channels for that day, builds a bulk request, streams MiniSEED bytes, parses them into an ObsPy Stream, selects the first trace, computes stats, and returns a row dict. If there’s no data, returns None. Called by main() via ThreadPoolExecutor.submit().

In [None]:
def process_day_task(task: DayTask, *, max_bytes: int, retries: int, backoff: float) -> Optional[dict]:
    """
    Worker for a single (station, day) task.

    Steps: look up the station's channels for the window, request the
    waveforms in bulk, parse the MiniSEED payload, and build a CSV row from
    the first trace. Returns None when any step yields no data; other errors
    propagate to the caller.
    """
    # A client is created per call so that nothing is shared between workers.
    fdsn = Client(task.provider, timeout=TIMEOUT_S)
    dataselect_url = fdsn.base_url.rstrip("/") + "/fdsnws/dataselect/1"

    # 1) station discovery for this day
    def fetch_inventory():
        return fdsn.get_stations(
            network="*",
            station=task.row.station,
            location=LOCATION,
            channel=CHANNEL,
            starttime=task.day_start,
            endtime=task.day_end,
            level="channel",
            includerestricted=False,
        )

    try:
        inventory = with_retries(fetch_inventory, retries=retries, backoff=backoff)
    except FDSNNoDataException:
        return None

    channels = pick_station_channels(inventory)
    if not channels:
        return None

    request_body = build_bulk_lines(channels, task.day_start, task.day_end)

    # 2) stream MiniSEED bytes (errors other than "no data" reach the caller)
    def fetch_waveforms():
        return stream_dataselect_bulk(
            dataselect_url,
            request_body,
            timeout_s=TIMEOUT_S,
            max_bytes=max_bytes,
        )

    try:
        payload = with_retries(fetch_waveforms, retries=retries, backoff=backoff)
    except FDSNNoDataException:
        return None

    if not payload:
        return None

    # 3) parse and take first trace
    stream = obspy_read(io.BytesIO(payload))
    if len(stream) == 0:
        return None

    first = stream[0]
    return first_trace_daily_row(task, first)

build_tasks(rows, override_start, override_end) -> List[DayTask]

Creates a list of tasks—one per day per station—using iter_days() and provider_for_datacenter(). This is what allows parallelization at the “day chunk” level. Called by main().

In [None]:
def build_tasks(rows: List[StationRow], override_start: Optional[UTCDateTime], override_end: Optional[UTCDateTime]) -> List[DayTask]:
    """
    Expand station rows into one DayTask per station per daily window.

    A non-None override replaces the corresponding bound of every row. Rows
    whose effective window is empty contribute no tasks. The provider is
    resolved for every row, so an unknown DataCenter raises ValueError.
    """
    planned: List[DayTask] = []
    for station_row in rows:
        fdsn_provider = provider_for_datacenter(station_row.datacenter)
        span_start = station_row.start if override_start is None else override_start
        span_end = station_row.end if override_end is None else override_end
        if span_end <= span_start:
            continue
        planned.extend(
            DayTask(row=station_row, provider=fdsn_provider, day_start=lo, day_end=hi)
            for lo, hi in iter_days(span_start, span_end)
        )
    return planned

In [None]:
build_argparser() -> argparse.ArgumentParser

Defines CLI arguments (CSV path, output dir, overrides, worker count, retry/backoff, etc.). Called by main() to parse arguments.

In [None]:
def build_argparser() -> argparse.ArgumentParser:
    """
    Create the command-line parser: two positional paths plus optional
    time-window overrides and worker/size/retry settings.
    """
    parser = argparse.ArgumentParser(description="Parallel daily streaming MiniSEED -> first trace stats -> CSV.")

    positionals = (
        ("csv_path", "Input station CSV"),
        ("output_dir", "Directory to write output CSV"),
    )
    for dest, help_text in positionals:
        parser.add_argument(dest, help=help_text)

    default_workers = min(6, (os.cpu_count() or 4) * 2)
    options = (
        ("--start", {"default": None, "help": "Override start time for ALL stations (ISO-8601 UTC)."}),
        ("--end", {"default": None, "help": "Override end time for ALL stations (ISO-8601 UTC)."}),
        ("--workers", {"type": int, "default": default_workers, "help": "Parallel worker threads."}),
        ("--max-bytes", {"type": int, "default": DEFAULT_MAX_BYTES, "help": "Max bytes to buffer per day request."}),
        ("--retries", {"type": int, "default": 6, "help": "Retries for transient network errors."}),
        ("--backoff", {"type": float, "default": 0.8, "help": "Base backoff seconds for retries."}),
    )
    for flag, spec in options:
        parser.add_argument(flag, **spec)
    return parser

main() is the program orchestrator. It parses CLI args, reads the station CSV into structured rows, applies optional global start/end overrides, expands those rows into per-day tasks, and then runs those tasks concurrently with a ThreadPoolExecutor. Each worker returns either a CSV row dict (success) or None (no data), while errors are caught and logged. Importantly, only the main thread writes to the CSV (to avoid corrupting output), so workers return results and main() appends them to disk as futures complete.

In [None]:
def main() -> None:
    """
    CLI entry point: read the station CSV, expand it into per-day tasks, run
    them on a thread pool and append one CSV row per successful task.

    Only this (main) thread writes the output file. The file is opened once,
    on the first result, in append mode; the header is written only when the
    file is empty, so re-running into an existing output does not insert a
    second header line. Exits with status 1 when there is nothing to process.
    """
    args = build_argparser().parse_args()

    csv_path = Path(args.csv_path).expanduser().resolve()
    out_dir = Path(args.output_dir).expanduser().resolve()
    out_dir.mkdir(parents=True, exist_ok=True)

    rows = read_station_csv(csv_path)
    if not rows:
        print("No stations found in CSV.", file=sys.stderr)
        raise SystemExit(1)

    override_start = parse_iso_time(args.start) if args.start else None
    override_end = parse_iso_time(args.end) if args.end else None

    tasks = build_tasks(rows, override_start, override_end)
    if not tasks:
        print("No day tasks to process (check time window).", file=sys.stderr)
        raise SystemExit(1)

    out_csv = out_dir / "daily_first_trace_stats.csv"
    fieldnames = [
        "csv_row",
        "csv_station",
        "csv_datacenter",
        "provider",
        "csv_site",
        "csv_latitude",
        "csv_longitude",
        "csv_elevation_m",
        "day",
        "request_start",
        "request_end",
        "trace_id",
        "network",
        "station",
        "location",
        "channel",
        "sampling_rate",
        "npts",
        "starttime",
        "endtime",
        "delta",
        "calib",
        "data_min",
        "data_max",
        "data_mean",
        "data_std",
        "data_rms",
        "data_p2p",
        "finite_frac",
    ]

    print(
        f"Stations={len(rows)}  DayTasks={len(tasks)}  workers={args.workers}  "
        f"CHANNEL={CHANNEL}  LOCATION={LOCATION}  max_bytes={args.max_bytes}  retries={args.retries}"
    )
    if override_start or override_end:
        print(f"Override window: start={override_start} end={override_end}")
    print(f"Writing CSV: {out_csv}")

    ok = skipped = err = 0
    out_file = None  # opened lazily so that a run without results creates no file
    writer = None

    try:
        # main thread is the ONLY writer
        with ThreadPoolExecutor(max_workers=args.workers) as ex:
            # Keep the task next to its future so failures can be attributed.
            fut_to_task = {
                ex.submit(
                    process_day_task, t, max_bytes=args.max_bytes, retries=args.retries, backoff=args.backoff
                ): t
                for t in tasks
            }

            for fut in as_completed(fut_to_task):
                task = fut_to_task[fut]
                try:
                    rowdict = fut.result()
                except Exception as e:
                    err += 1
                    print(f"ERR  {task.row.station} {task.day_start.date} task failed: {e}")
                    continue

                if rowdict is None:
                    skipped += 1
                    continue

                if writer is None:
                    # Header only for a new/empty file; an existing file already has one.
                    need_header = not out_csv.exists() or out_csv.stat().st_size == 0
                    out_file = out_csv.open("a", encoding="utf-8", newline="")
                    writer = csv.DictWriter(out_file, fieldnames=fieldnames)
                    if need_header:
                        writer.writeheader()
                writer.writerow(rowdict)
                out_file.flush()  # keep finished rows on disk even if the run is interrupted

                ok += 1
                print(
                    f"OK   {rowdict['csv_station']} {rowdict['day']} {rowdict['trace_id']} "
                    f"(npts={rowdict['npts']})"
                )
    finally:
        if out_file is not None:
            out_file.close()

    print(f"\nDone. ok={ok} skipped={skipped} error={err}")


# Run the command-line entry point only when executed as a script, not on import.
if __name__ == "__main__":
    main()