<a href="https://colab.research.google.com/github/xyshuai/openalex-api-demo/blob/main/OpenAlex_Cursor_Paging_Full.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import os, csv, time, requests

# ---------------- Google Drive Setup ----------------
# Decide where the CSV files go:
#   * Colab with Drive mounted  -> a folder inside MyDrive (USE_DRIVE = True)
#   * Colab, mount failed       -> /content
#   * anywhere else             -> the current working directory
USE_DRIVE = False
try:
    # Importing google.colab fails outside Colab; that failure is the signal
    # used to select the local-directory fallback below.
    from google.colab import drive, files  # type: ignore
    try:
        drive.mount('/content/drive', force_remount=True)
        USE_DRIVE, OUT_DIR = True, "/content/drive/MyDrive/OpenAlex"
        print("‚úÖ Google Drive mounted. Saving to:", OUT_DIR)
    except Exception as mount_error:
        print("‚ö†Ô∏è Failed to mount Google Drive. Falling back to /content. Error:", mount_error)
        OUT_DIR = "/content"
except Exception:
    # Not running in Colab; save to the current working directory
    OUT_DIR = "."
    print("‚ÑπÔ∏è Not running in Colab. Saving to the current directory.")

# Create the target directory up front so opening the CSV files cannot fail on a missing folder.
os.makedirs(OUT_DIR, exist_ok=True)

# ---------------- Config ----------------
# Use the bare endpoint only (DO NOT paste UI-generated URLs that already contain filters).
BASE_URL = "https://api.openalex.org/works"
MAILTO = "your.email@domain.com"  # TODO: Replace with your email

# Years to harvest (demo values; change the range to suit your needs).
YEARS = list(range(2023, 2025))  # Example: 2023-2024 (the end of range() is exclusive)
PER_PAGE = 200    # Cursor pagination max = 200
MAX_PAGES = 5     # None -> full harvest; set to a small int (e.g., 5) for testing
SLEEP_SEC = 1.0   # Delay between pages in seconds; increase if rate-limited

# Base filter; the publication year is appended per harvested year.
# For "authors with country MY", use authorships.countries.
# For "affiliated institutions in MY", replace with institutions.country_code:my.
BASE_FILTER = "open_access.is_oa:true,authorships.countries:countries/my,type:types/article|types/review"
SORT = "cited_by_count:desc"

# Request headers sent with every API call; "From" carries the contact e-mail.
HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:122.0) Gecko/20100101 Firefox/122.0",
    "Accept": "application/json",
    "From": MAILTO,
}

# Top-level work fields requested through the `select` parameter.
# NOTE(review): "sources" and "institutions" may not be valid top-level Work
# fields for `select` - confirm against the OpenAlex documentation.
SELECT_FIELDS = [
    # identity and bibliographic basics
    "id", "doi", "title", "display_name", "type", "language",
    "publication_year", "cited_by_count",
    # venue
    "primary_location", "sources",
    # open access
    "open_access", "best_oa_location",
    # people and organisations
    "authorships", "institutions",
    # subject classification
    "concepts", "primary_topic", "topics",
    # citation impact
    "fwci", "citation_normalized_percentile",
    # charges and SDGs
    "apc_list", "sustainable_development_goals",
]
SELECT_PARAM = ",".join(SELECT_FIELDS)

# Column headers for the CSV export; rows written later must follow this order.
CSV_HEADERS = [
    # identity and bibliographic basics
    "openalex_id", "doi", "title", "year", "type", "language",
    "cited_by_count", "journal", "issn_l",
    # open access
    "is_oa", "oa_status", "oa_url", "license", "version",
    # authors
    "first_author", "authors", "authors_affiliations",
    "corresponding_authors", "corresponding_author_country_codes",
    # topics
    "primary_topic_id", "primary_topic_name",
    "primary_topic_domain", "primary_topic_field", "primary_topic_subfield",
    "topics_top5",
    # charges
    "apc_list_values",
    # citation impact
    "fwci", "citation_percentile", "citation_top_1pct", "citation_top_10pct",
    # sustainable development goals
    "sdg_labels",
]

# ---------------- Helper Functions ----------------
def flatten_authors(authorships):
    """Return every author's display name joined by '; '.

    Authorship entries without an author object or without a display name
    are skipped. An empty or missing list yields an empty string.
    """
    if not authorships:
        return ""
    display_names = (
        (entry.get("author") or {}).get("display_name") for entry in authorships
    )
    return "; ".join(name for name in display_names if name)

def flatten_authors_affils(authorships):
    """Render authors together with their institutions.

    Output shape:
      Author A (Affil1;Affil2); Author B (Affil3)

    Authors without a display name are dropped; authors without any named
    institution appear without parentheses.
    """
    if not authorships:
        return ""
    rendered = []
    for entry in authorships:
        author_name = (entry.get("author") or {}).get("display_name") or ""
        inst_names = (
            inst.get("display_name") for inst in (entry.get("institutions") or [])
        )
        affiliations = [label for label in inst_names if label]
        if not author_name:
            continue
        if affiliations:
            rendered.append(f"{author_name} ({';'.join(affiliations)})")
        else:
            rendered.append(author_name)
    return "; ".join(rendered)

def corresponding_authors_list(authorships):
    """Return the display names of corresponding authors joined by '; '.

    Only authorships whose ``is_corresponding`` flag is truthy are considered;
    entries lacking a display name are skipped.
    """
    if not authorships:
        return ""
    flagged = (entry for entry in authorships if entry.get("is_corresponding"))
    display_names = ((entry.get("author") or {}).get("display_name") for entry in flagged)
    return "; ".join(name for name in display_names if name)

def corresponding_author_country_codes(authorships):
    """Return the country codes tied to corresponding authors.

    For every authorship flagged ``is_corresponding`` the codes are gathered
    from two places:
      * authorships[i].countries[]
      * authorships[i].institutions[].country_code

    The result is de-duplicated, sorted and joined by ';' (empty string when
    nothing is found).
    """
    if not authorships:
        return ""
    codes = set()
    for entry in authorships:
        if entry.get("is_corresponding"):
            codes.update(code for code in (entry.get("countries") or []) if code)
            inst_codes = (
                inst.get("country_code") for inst in (entry.get("institutions") or [])
            )
            codes.update(code for code in inst_codes if code)
    return ";".join(sorted(codes))

def pick_venue_from_primary_or_sources(w):
    """Return ``(journal, issn_l)`` for a work, always as strings.

    Lookup order:
      1. ``primary_location.source`` (display_name / issn_l).
      2. The first entry of ``sources`` that is a journal and has a name.
      3. The first entry of ``sources`` that has a name, whatever its type.

    Later steps only fill values that are still empty, so a journal name
    found in step 1 is never overwritten. Missing or null values come back
    as "" (never ``None``).
    """
    journal, issn_l = "", ""
    src = (w.get("primary_location") or {}).get("source") or {}
    if isinstance(src, dict):
        journal = src.get("display_name") or ""
        issn_l = src.get("issn_l") or ""

    if journal and issn_l:
        return journal, issn_l

    named = [
        s for s in (w.get("sources") or [])
        if isinstance(s, dict) and s.get("display_name")
    ]
    journals = [s for s in named if s.get("type") == "journal"]

    # Journal-type sources are preferred; any named source is the last resort.
    for candidates in (journals, named):
        if candidates and not (journal and issn_l):
            first = candidates[0]
            # Trailing `or ""` keeps a null issn_l in the payload from leaking out as None.
            journal = journal or first.get("display_name") or ""
            issn_l = issn_l or first.get("issn_l") or ""
    return journal, issn_l

def topics_top5_str(w):
    """Describe up to five non-primary topics, highest score first.

    Topics whose id equals the primary topic's id are excluded. Each kept
    topic is rendered as ``name (score=0.12)`` when its score is numeric and
    as the bare name otherwise; unnamed topics are omitted. Items are joined
    by '; '.
    """
    primary_id = (w.get("primary_topic") or {}).get("id", "")
    topics = w.get("topics") or []
    if not isinstance(topics, list):
        return ""

    secondary = [
        topic for topic in topics
        if isinstance(topic, dict) and topic.get("id") != primary_id
    ]
    # Missing or null scores sort as 0; the sort is stable for ties.
    secondary.sort(key=lambda topic: topic.get("score", 0) or 0, reverse=True)

    labels = []
    for topic in secondary[:5]:
        name = topic.get("display_name", "")
        if not name:
            continue
        score = topic.get("score", None)
        if isinstance(score, (int, float)):
            labels.append(f"{name} (score={score:.2f})")
        else:
            labels.append(name)
    return "; ".join(labels)

def percentile_fields(w):
    """Extract the three CSV values derived from citation_normalized_percentile.

    Returns a tuple ``(value, top_1_percent, top_10_percent)`` where ``value``
    is the numeric percentile formatted with six decimals (or "") and the two
    flags are "Yes", "No", or "" when the flag is absent/null.
    """
    def _yes_no(flag):
        # None means "unknown" and must stay distinguishable from False.
        if flag:
            return "Yes"
        return "" if flag is None else "No"

    cnp = w.get("citation_normalized_percentile") or {}
    value = cnp.get("value", None)
    value_text = f"{value:.6f}" if isinstance(value, (int, float)) else ""
    return (
        value_text,
        _yes_no(cnp.get("is_in_top_1_percent")),
        _yes_no(cnp.get("is_in_top_10_percent")),
    )

def apc_list_values_str(work):
    """Turn the work's ``apc_list`` field into a readable '; '-joined string.

    The field may be a list of entries or a single entry. Each entry can be a
    dict with ``value``/``currency`` (rendered as "<value> <currency>", or
    whichever of the two is present), a number, or a string; anything else
    contributes nothing.
    """
    def _render(entry):
        if isinstance(entry, dict):
            amount = entry.get("value", None)
            currency = entry.get("currency", "")
            if amount is None:
                return currency if currency else ""
            return f"{amount} {currency}" if currency else str(amount)
        if isinstance(entry, (int, float)):
            return str(entry)
        if isinstance(entry, str):
            return entry.strip()
        return ""

    apc = work.get("apc_list", None)
    if apc is None:
        return ""
    # A lone entry is handled exactly like a one-element list.
    entries = apc if isinstance(apc, list) else [apc]
    return "; ".join(text for text in map(_render, entries) if text)

def _format_sdg_entry(entry):
    """Render one SDG dict as 'SDG <n>: <name> (<score>)'; return '' if it has no name."""
    name = entry.get("display_name", "")
    if not name:
        return ""

    # The SDG number is the last path segment of the id URL,
    # e.g. "https://metadata.un.org/sdg/3" -> "3". Non-string ids are ignored.
    sdg_id = entry.get("id", "")
    number = sdg_id.rstrip("/").split("/")[-1] if isinstance(sdg_id, str) else ""
    label = f"SDG {number}: {name}" if number else name

    score = entry.get("score")
    if score is None:
        return label
    try:
        return f"{label} ({float(score):.2f})"
    except (TypeError, ValueError):
        # A score that is not number-like is dropped rather than aborting the export.
        return label


def parse_sdg_labels(work):
    """
    Extract SDG information from 'sustainable_development_goals'.

    Accepted shapes of the field:
      * list of dicts with id / display_name / score (non-dict items are skipped)
      * a single such dict
      * a plain string, returned stripped

    Returns a string like:
      'SDG 3: Good health and well-being (0.95); SDG 4: Quality education (0.87)'
    or "" when the field is missing, empty, or of an unsupported type.
    """
    sdg_field = work.get("sustainable_development_goals")
    if not sdg_field:
        return ""

    if isinstance(sdg_field, str):
        return sdg_field.strip()
    if isinstance(sdg_field, dict):
        return _format_sdg_entry(sdg_field)
    if isinstance(sdg_field, list):
        labels = (_format_sdg_entry(item) for item in sdg_field if isinstance(item, dict))
        return "; ".join(label for label in labels if label)
    return ""

# ---------------- Run (multi-year full harvest) ----------------
# Upper bound on consecutive failed requests for a single page. Without it a
# permanent error (e.g. a rejected filter or a 403) would be retried forever.
MAX_RETRIES = 5


def _work_to_row(w):
    """Flatten one OpenAlex work dict into a list ordered like CSV_HEADERS."""
    oa = w.get("open_access") or {}
    boa = w.get("best_oa_location") or {}
    authorships = w.get("authorships") or []
    primary = w.get("primary_topic") or {}

    # `or {}` guards against an authorship whose "author" value is null.
    first_author = ""
    if authorships:
        first_author = (authorships[0].get("author") or {}).get("display_name") or ""

    journal, issn_l = pick_venue_from_primary_or_sources(w)
    citation_percentile, top1, top10 = percentile_fields(w)

    return [
        w.get("id", ""), w.get("doi", ""),
        w.get("display_name") or w.get("title", ""),
        w.get("publication_year", ""), w.get("type", ""), w.get("language", ""),
        w.get("cited_by_count", 0), journal, issn_l,
        "Yes" if oa.get("is_oa") else "No", oa.get("oa_status", ""), oa.get("oa_url", ""),
        boa.get("license", ""), boa.get("version", ""),
        first_author, flatten_authors(authorships), flatten_authors_affils(authorships),
        corresponding_authors_list(authorships),
        corresponding_author_country_codes(authorships),
        primary.get("id", ""), primary.get("display_name", ""),
        (primary.get("domain") or {}).get("display_name", ""),
        (primary.get("field") or {}).get("display_name", ""),
        (primary.get("subfield") or {}).get("display_name", ""),
        topics_top5_str(w),
        apc_list_values_str(w),
        w.get("fwci", ""), citation_percentile, top1, top10,
        parse_sdg_labels(w),
    ]


print(f"üìÅ Output directory: {OUT_DIR}")
print(f"üìÖ Years to process: {YEARS}")
# `is not None` so that MAX_PAGES = 0 is not reported as a full harvest.
print(f"üìä MAX_PAGES setting: {MAX_PAGES if MAX_PAGES is not None else 'Full harvest'}")
print()

for YEAR in YEARS:
    cursor = "*"      # "*" asks the API for the first cursor page
    total = 0         # rows written for this year
    pages_done = 0    # pages successfully processed for this year
    failures = 0      # consecutive failed attempts for the current page

    FILTER = f"{BASE_FILTER},publication_year:{YEAR}"
    CSV_PATH = os.path.join(OUT_DIR, f"openalex_{YEAR}.csv")
    print(f"\n‚ñ∂Ô∏è Starting year {YEAR}, output: {CSV_PATH}")

    with open(CSV_PATH, "w", newline="", encoding="utf-8") as f:
        writer = csv.writer(f)
        writer.writerow(CSV_HEADERS)

        while True:
            # Build URL manually to avoid URL-encoded slashes (%2F can trigger 403)
            query = (
                f"filter={FILTER}"
                f"&sort={SORT}"
                f"&per_page={PER_PAGE}"
                f"&cursor={cursor}"
                f"&select={SELECT_PARAM}"
                f"&mailto={MAILTO}"
            )
            url = f"{BASE_URL}?{query}"
            # Show the request once per year, not on every page or retry.
            if pages_done == 0 and failures == 0:
                print("üõ∞Ô∏è Request:", url[:150] + "...")

            try:
                r = requests.get(url, headers=HEADERS, timeout=120)
            except requests.RequestException as exc:
                # Timeouts / connection errors count as a failed attempt.
                failure = f"request error: {exc}"
            else:
                failure = "" if r.status_code == 200 else f"HTTP {r.status_code}: {r.text[:300]}"

            if failure:
                failures += 1
                print(f"üö´ {failure}")
                if failures >= MAX_RETRIES:
                    print(f"Year {YEAR}: giving up after {failures} consecutive failed requests.")
                    break
                # Linear back-off: wait longer after each consecutive failure.
                time.sleep(SLEEP_SEC * 2 * failures)
                continue
            failures = 0

            data = r.json()
            meta = data.get("meta") or {}
            if pages_done == 0:
                meta_count = meta.get("count")
                print(f"üìä Year {YEAR} official meta.count = {meta_count}")

            results = data.get("results") or []
            next_cursor = meta.get("next_cursor")

            if not results:
                print(f"‚úÖ Year {YEAR}: no more results.")
                break

            for w in results:
                # Safety check: write only records with matching year
                year = w.get("publication_year", "")
                if str(year) != str(YEAR):
                    continue
                writer.writerow(_work_to_row(w))
                total += 1

            pages_done += 1
            print(f"üì¶ Year {YEAR}, page {pages_done}: {len(results)} records, total {total}")

            cursor = next_cursor
            if not cursor:
                print(f"‚úÖ Finished {YEAR}, total {total} records.")
                break

            if (MAX_PAGES is not None) and (pages_done >= MAX_PAGES):
                print(f"‚èπÔ∏è Year {YEAR} reached MAX_PAGES={MAX_PAGES}, stopping early (test mode).")
                break

            time.sleep(SLEEP_SEC)

print("\nüéâ All years processed.")
print(f"üìÅ Files saved to: {OUT_DIR}")

# Optional: when the files were not written to Drive, try to hand them to the
# browser through Colab's download helper. Outside Colab the import fails and
# the except branch just reports that auto-download is unavailable.
if not USE_DRIVE:
    try:
        from google.colab import files  # type: ignore
        print("\n‚¨áÔ∏è Downloading files...")
        for year in YEARS:
            csv_name = f"openalex_{year}.csv"
            csv_file = os.path.join(OUT_DIR, csv_name)
            # Skip years whose CSV file does not exist on disk.
            if not os.path.exists(csv_file):
                continue
            files.download(csv_file)
            print(f"‚úÖ Downloaded: {csv_name}")
    except Exception as e:
        print(f"‚ÑπÔ∏è Auto-download not available: {e}")