In [None]:
#!/usr/bin/env python3
import os
import csv
import time
import json
import math
import queue
import signal
import string
import hashlib
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed

import pandas as pd
import requests
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
from tqdm import tqdm

INPUT_CSV = "filtered_combined_species.csv"
OUTPUT_CSV = "gbif_taxonomy.csv"
CACHE_JSONL = "gbif_match_cache.jsonl"   # name→raw /species/match response
DETAILS_CACHE_JSONL = "gbif_details_cache.jsonl"  # usageKey→/species/{key} detail

# ---- Settings you can tune ----
MAX_WORKERS = 8          # polite parallelism; lower if you see 429s
REQ_TIMEOUT = 12         # seconds per HTTP request
USER_AGENT = "victoria.t.tiki@gmail.com (Pelagica 1.0)"
SLEEP_BETWEEN_BATCHES = 0.0  # seconds; increase if you still see 429s
# --------------------------------

MATCH_URL = "https://api.gbif.org/v1/species/match"
DETAIL_URL = "https://api.gbif.org/v1/species/{}"

# One Session is shared by all worker threads so TCP/TLS connections to
# api.gbif.org are reused. requests' default adapter pools at most 10
# connections per host; size the pool to the worker count so raising
# MAX_WORKERS above 10 doesn't make connections get discarded and re-opened.
session = requests.Session()
session.headers.update({"User-Agent": USER_AGENT})
session.mount(
    "https://",
    requests.adapters.HTTPAdapter(pool_maxsize=max(MAX_WORKERS, 10)),
)

# ---- simple append-only JSONL caches (restart-safe) ----
def _load_jsonl(path, key_field):
    data = {}
    if os.path.exists(path):
        with open(path, "r", encoding="utf-8") as f:
            for line in f:
                try:
                    obj = json.loads(line)
                    data[obj[key_field]] = obj["value"]
                except Exception:
                    continue
    return data

# Serializes cache-file appends: worker threads append concurrently, and each
# record must land in the file as one complete line.
_CACHE_WRITE_LOCK = threading.Lock()


def _append_jsonl(path, key, value, key_field=None):
    """Append one cache record ``{key_field: key, "value": value}`` to *path*.

    Args:
        path: JSONL cache file; created if it does not exist.
        key: The lookup key for this record (e.g. a scientific name).
        value: JSON-serializable payload to cache.
        key_field: Field name under which *key* is stored. It must match the
            ``key_field`` given to ``_load_jsonl`` for the same file. If
            omitted, *key* is also used as the field name; that form is kept
            only for call compatibility, since ``_load_jsonl`` cannot map
            such records back to their real key. Always pass it.
    """
    field = key if key_field is None else key_field
    line = json.dumps({field: key, "value": value}) + "\n"
    with _CACHE_WRITE_LOCK, open(path, "a", encoding="utf-8") as f:
        f.write(line)


# In-memory caches, pre-filled from disk so a re-run skips lookups already done.
# NOTE: cache files written before the key fix hold records whose key field
# is the literal string "name" / "key"; they load as a single entry under that
# literal key and are otherwise ignored. Delete the files to drop them.
match_cache = _load_jsonl(CACHE_JSONL, key_field="name")
details_cache = _load_jsonl(DETAILS_CACHE_JSONL, key_field="key")


class HttpRetryableError(Exception):
    """Raised by ``_get`` for HTTP statuses worth retrying (429 and transient 5xx)."""


@retry(
    reraise=True,
    stop=stop_after_attempt(6),
    wait=wait_exponential(multiplier=0.8, min=1, max=60),
    # Besides throttling / transient server errors, also retry dropped
    # connections and timeouts; otherwise each one is a permanently lost name.
    retry=retry_if_exception_type(
        (HttpRetryableError, requests.ConnectionError, requests.Timeout)
    ),
)
def _get(url, params=None):
    """GET *url* with the shared session and return the decoded JSON body.

    Retried with exponential backoff (up to 6 attempts) on HTTP
    429/500/502/503/504 and on connection errors or timeouts. After the last
    attempt the final exception is re-raised. Any other non-2xx status raises
    ``requests.HTTPError`` without retrying.
    """
    resp = session.get(url, params=params, timeout=REQ_TIMEOUT)
    if 200 <= resp.status_code < 300:
        return resp.json()
    if resp.status_code in (429, 500, 502, 503, 504):
        raise HttpRetryableError(f"HTTP {resp.status_code}")
    resp.raise_for_status()  # raises for the remaining 4xx/5xx codes
    # raise_for_status() does nothing below 400, so a stray 1xx/3xx would
    # otherwise return None and get cached as if it were a real response.
    raise requests.HTTPError(
        f"Unexpected HTTP {resp.status_code} for {resp.url}", response=resp
    )


def match_name(scientific_name):
    """Return GBIF's raw ``/species/match`` response for *scientific_name*.

    Served from ``match_cache`` when present. Otherwise the response is
    fetched, stored in memory and appended to ``CACHE_JSONL``. Only successful
    responses are cached, so a failed lookup is attempted again on the next run.
    """
    if scientific_name in match_cache:
        return match_cache[scientific_name]

    params = {
        "name": scientific_name,
        # Per the GBIF API reference, strict=false lets the match fall back to
        # a taxon higher in the classification (matchType HIGHERRANK).
        "strict": "false",
        # verbose=true additionally returns the alternative candidates GBIF
        # considered but rejected.
        "verbose": "true",
    }
    data = _get(MATCH_URL, params=params)
    match_cache[scientific_name] = data
    _append_jsonl(CACHE_JSONL, scientific_name, data, key_field="name")
    return data


def get_detail(usage_key):
    """Return GBIF's ``/species/{usage_key}`` record, using the details cache.

    The cache is keyed by ``str(usage_key)``. A fetched record is stored in
    memory and appended to ``DETAILS_CACHE_JSONL``.
    """
    k = str(usage_key)
    if k in details_cache:
        return details_cache[k]
    data = _get(DETAIL_URL.format(usage_key))
    details_cache[k] = data
    _append_jsonl(DETAILS_CACHE_JSONL, k, data, key_field="key")
    return data

def normalize_row(scientific_name, match_json):
    """Flatten a GBIF ``/species/match`` response into one output-CSV row.

    Args:
        scientific_name: The queried name, echoed back as ``scientificName``.
        match_json: Parsed ``/species/match`` response, or a falsy value.

    Returns:
        dict with keys scientificName, gbifID, status, matchType, confidence,
        kingdom, phylum, class, order, family, genus, species,
        acceptedScientificName (in that order). Every field except
        scientificName is None when the response has no ``usageKey``
        (unmatched).

        For a synonym (``acceptedUsageKey`` present), gbifID, the
        classification and acceptedScientificName come from the accepted
        usage's ``/species/{key}`` record. status/matchType/confidence remain
        those of the match.

        Otherwise the classification and acceptedScientificName come from the
        match payload. ``/species/{usageKey}`` is fetched only to fill
        missing values.
    """
    ranks = ("kingdom", "phylum", "class", "order", "family", "genus", "species")
    base = {
        "scientificName": scientific_name,
        "gbifID": None,
        "status": None,
        "matchType": None,
        "confidence": None,
        "kingdom": None,
        "phylum": None,
        "class": None,
        "order": None,
        "family": None,
        "genus": None,
        "species": None,
        "acceptedScientificName": None,
    }

    if not match_json or "usageKey" not in match_json:
        return base  # unmatched

    # Fields from /species/match
    base["gbifID"] = match_json.get("usageKey")
    base["status"] = match_json.get("status")
    base["matchType"] = match_json.get("matchType")
    base["confidence"] = match_json.get("confidence")

    # Synonym: report the accepted usage's key, name and classification.
    accepted_key = match_json.get("acceptedUsageKey")
    if accepted_key:
        detail = get_detail(accepted_key)
        base["acceptedScientificName"] = detail.get("scientificName")
        base["gbifID"] = detail.get("key", base["gbifID"])
        for rank in ranks:
            base[rank] = detail.get(rank)
        return base

    # Not a synonym: the matched usage is itself the one to report, so its
    # scientificName is filled in whether or not a detail fetch is needed.
    for rank in ranks:
        base[rank] = match_json.get(rank)
    base["acceptedScientificName"] = match_json.get("scientificName")

    # Fetch /species/{usageKey} only if a rank down to genus is missing;
    # "species" is not required (e.g. a genus-rank match has none).
    if not all(base[r] for r in ranks[:-1]):
        detail = get_detail(base["gbifID"])
        for rank in ranks:
            base[rank] = base[rank] or detail.get(rank)
        base["acceptedScientificName"] = (
            base["acceptedScientificName"] or detail.get("scientificName")
        )

    return base

def read_input_names(path):
    """Return the sorted, de-duplicated "Genus Species" names from a CSV.

    Only the ``Genus`` and ``Species`` columns are used; surrounding
    whitespace is stripped from each. Rows where either value is missing
    (NaN) or blank are skipped. Converting with ``astype(str)`` first would
    instead turn NaN into the literal string "nan" and produce queries
    like "Zeus nan".

    Args:
        path: CSV path or file-like object, as accepted by ``pd.read_csv``.

    Returns:
        Sorted list of unique "Genus Species" strings.

    Raises:
        ValueError: if either required column is absent.
    """
    df = pd.read_csv(path)
    if "Genus" not in df.columns or "Species" not in df.columns:
        raise ValueError("Input CSV must contain 'Genus' and 'Species' columns.")
    pairs = df[["Genus", "Species"]].dropna()
    genus = pairs["Genus"].astype(str).str.strip()
    species = pairs["Species"].astype(str).str.strip()
    keep = (genus != "") & (species != "")
    # de-duplicate to minimize API calls
    names = (genus[keep] + " " + species[keep]).unique().tolist()
    return sorted(names)

def process_names(names):
    """Resolve *names* against GBIF concurrently; return one row per name.

    Rows come back in the same order as *names*. ``as_completed`` yields
    futures in completion order, so each result is placed by its input
    index. A name whose lookup raised gets the all-None row that
    ``normalize_row`` produces for an unmatched name, plus an ``"error"``
    field holding the exception text.

    If ``SLEEP_BETWEEN_BATCHES`` is non-zero, each worker pauses that long
    after every lookup. The pause has to happen inside the workers: every
    task is submitted up front, so sleeping in the result-collecting loop
    would not slow the requests down.
    """
    names = list(names)
    results = [None] * len(names)  # filled out of order, by input index

    def worker(name):
        try:
            return normalize_row(name, match_name(name))
        finally:
            if SLEEP_BETWEEN_BATCHES:
                time.sleep(SLEEP_BETWEEN_BATCHES)

    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as ex:
        futures = {ex.submit(worker, nm): idx for idx, nm in enumerate(names)}
        for fut in tqdm(as_completed(futures), total=len(futures), desc="GBIF matching"):
            idx = futures[fut]
            try:
                results[idx] = fut.result()
            except Exception as e:  # one failed name must not abort the run
                row = normalize_row(names[idx], None)
                row["error"] = str(e)
                results[idx] = row

    return results

def main():
    """Read the input names, resolve them against GBIF, and write OUTPUT_CSV.

    The output has one row per unique name. Names whose lookup raised are
    kept, with empty taxonomy, and their exception text goes in the trailing
    ``error`` column. This distinguishes them from names GBIF did not match.
    """
    names = read_input_names(INPUT_CSV)
    print(f"Unique names to resolve: {len(names):,}")

    rows = process_names(names)

    # save as tidy DataFrame; "error" is only set on rows whose lookup raised
    out_df = pd.DataFrame(rows, columns=[
        "scientificName", "gbifID", "status", "matchType", "confidence",
        "kingdom", "phylum", "class", "order", "family", "genus", "species",
        "acceptedScientificName", "error",
    ])
    # gbifID is None for unmatched/failed names, which makes pandas store the
    # column as float and write IDs like "1234.0"; nullable Int64 keeps them
    # integral, with blank cells where missing.
    out_df["gbifID"] = out_df["gbifID"].astype("Int64")
    out_df.to_csv(OUTPUT_CSV, index=False)
    print(f"Wrote {OUTPUT_CSV} with {len(out_df):,} rows.")

    n_failed = int(out_df["error"].notna().sum())
    if n_failed:
        print(f"{n_failed:,} lookups failed (see the 'error' column); "
              "re-running the script retries them.")


# Entry point. In a Jupyter kernel __name__ is also "__main__", so running
# this cell starts the full GBIF run immediately.
if __name__ == "__main__":
    main()


Unique names to resolve: 68,958


GBIF matching: 100%|██████████| 68958/68958 [32:39<00:00, 35.19it/s]


Wrote gbif_taxonomy.csv with 68,958 rows.
