In [1]:
# ============================================================
# 002_OpenAlex Keyword Search
# ============================================================
#
# ⚠️ Important Notes (Please Read First)
# ------------------------------------
# - This notebook uses the OpenAlex API to retrieve academic works based on
#   keyword-based search queries (OpenAlex /works).
# - OpenAlex API behavior (rate limits, pagination, available fields) may change
#   over time. When retrieving a large number of results, always proceed
#   incrementally and save intermediate outputs (this notebook can optionally
#   persist raw responses as JSONL).
# - The OpenAlex "search" parameter is not a strict Boolean query language.
#   Operators like OR can be helpful, but expect both noise and omissions.
#   Treat the retrieved results as a *first-pass candidate set*, not a final corpus.
# - The metadata provided by OpenAlex is not guaranteed to be complete or error-free.
#   For validation, critical analysis, or citation, always refer to the original
#   publisher pages, DOIs, or PDFs.
# - Do not hard-code sensitive information (e.g., API keys) in this notebook.
#   This notebook is designed to run without an API key.
# - (Optional) For polite API usage, consider setting a mailto parameter.
#
# Overview
# --------
# This notebook performs keyword-based retrieval of academic works using the
# OpenAlex API. The goal is to collect a structured dataset of papers matching
# a given query, optionally filtered by publication year and open-access status.
# The retrieved results are normalized into a tabular format (DataFrame / CSV)
# so that they can be reused in downstream steps such as deduplication,
# ranking, topic modeling, or LLM-based synthesis.
#
# Structure
# ---------
# 1) Parameters / Query Design
#    - Define search keywords, time range, open-access filters,
#      pagination size, and maximum number of results
#    - (Optional) Configure parameters via ipywidgets UI
# 2) API Core Functions (Search & Pagination)
#    - Construct OpenAlex /works requests and retrieve results
#      using cursor-based pagination (and basic retry/backoff)
# 3) Parsing & Normalization
#    - Extract and standardize key metadata fields
#      (title, authors, year, venue, DOI, citation count, OA status, etc.)
# 4) Output & Export
#    - Export normalized CSV (+ metadata JSON, optional dedup view)
#
# Notes
# -----
# - For reproducibility, log the exact query text, filters, and total number
#   of retrieved records.
# - When working with large queries, persist partial results to avoid data loss
#   and enable safe retries.
# - To improve search precision:
#     (1) expand queries with synonyms using OR,
#     (2) explicitly exclude noisy terms,
#     (3) combine keyword search with field or year filters,
#     (4) start small and scale up iteratively.
# - Deduplication (e.g., by DOI or fuzzy title matching) and quality filtering
#   are intentionally handled in downstream steps.
#

In [2]:
# ============================================================
# 1) Parameters / Query Design
# ============================================================
#
# This section defines all parameters used for the OpenAlex
# keyword-based search. Keeping them centralized here improves
# readability, reproducibility, and ease of iteration.
#
# Recommended workflow:
# - Start with a small max_results to validate query quality
# - Inspect sample outputs
# - Gradually expand the retrieval scope
#

# ------------------------
# Core keyword query
# ------------------------
# The OpenAlex `search` parameter matches the query against titles,
# abstracts, and other indexed text fields.
# Operators such as OR must be written explicitly in the string; matching is
# best-effort rather than a strict Boolean evaluation, so expect some noise.
# NOTE(review): the multi-word terms below are unquoted; confirm against the
# OpenAlex search documentation whether phrases need double quotes to be
# matched as phrases.
query_text = "venture capital OR startup OR innovation policy"


# ------------------------
# Time range filter
# ------------------------
# Publication year range (inclusive).
# Set to None to disable year-based filtering.
# The filter builder turns these into publication-date bounds:
# Jan 1 of from_year through Dec 31 of to_year.
from_year = 2010
to_year = 2025


# ------------------------
# Open Access filter
# ------------------------
# If True, restrict results to Open Access works only.
# This is useful when downstream steps require PDF access.
open_access_only = False


# ------------------------
# Pagination & volume control
# ------------------------
# Number of records per API call (OpenAlex max is typically 200).
# The search function caps any larger value to 200.
per_page = 200

# Maximum number of records to retrieve in total.
# Use a small number for testing, then increase gradually.
max_results = 1000


# ------------------------
# Sorting strategy
# ------------------------
# Common options include:
# - relevance_score (default search relevance)
# - publication_year
# - cited_by_count
# "relevance" / "relevance_score" / "" send no sort parameter at all.
# Any other field is sorted descending unless written as "field:direction".
sort_by = "relevance_score"


# ------------------------
# Output & logging
# ------------------------
# Directory where intermediate and final outputs will be saved.
# The raw JSONL file (openalex_works_raw.jsonl) is written here and is
# truncated at the start of every search run.
output_dir = "./data/openalex"

# Whether to print progress logs during retrieval.
verbose = True


# ------------------------
# Reproducibility note
# ------------------------
# All parameters defined in this section should be logged together
# with the output dataset so that the exact retrieval conditions
# can be reconstructed later.
#


In [3]:
# ============================================================
# 2) API Core Functions (Search & Pagination)
# ============================================================
#
# This section implements:
# - request construction for OpenAlex /works
# - cursor-based pagination (recommended for large result sets)
# - basic retry / backoff for transient errors or rate limits
# - optional intermediate saving (JSONL) for safe long runs
#

import os
import time
import json
import math
import random
from typing import Dict, Any, List, Optional, Tuple

import requests


# ------------------------
# OpenAlex endpoint
# ------------------------
# Root of the OpenAlex REST API; all endpoint URLs are derived from it.
OPENALEX_BASE_URL = "https://api.openalex.org"
# Endpoint queried by the keyword search below.
WORKS_ENDPOINT = f"{OPENALEX_BASE_URL}/works"


# ------------------------
# Utilities
# ------------------------
def _ensure_dir(path: str) -> None:
    os.makedirs(path, exist_ok=True)


def _build_openalex_filters(
    from_year: Optional[int],
    to_year: Optional[int],
    open_access_only: bool
) -> Optional[str]:
    """
    Build OpenAlex filter string.
    We use publication_date rather than publication_year to be explicit.
    """
    filters = []

    if from_year is not None:
        filters.append(f"from_publication_date:{from_year}-01-01")
    if to_year is not None:
        filters.append(f"to_publication_date:{to_year}-12-31")
    if open_access_only:
        filters.append("open_access.is_oa:true")

    return ",".join(filters) if filters else None


def _build_sort_params(sort_by: str) -> Dict[str, str]:
    """
    OpenAlex supports 'sort' in the form 'field:direction'.
    If using relevance, we simply omit sort to let OpenAlex rank by relevance.
    """
    sort_by = (sort_by or "").strip().lower()

    if sort_by in ["relevance", "relevance_score", ""]:
        return {}  # OpenAlex default relevance ordering for search queries

    # Common examples:
    # - "cited_by_count" -> cited_by_count:desc
    # - "publication_year" -> publication_year:desc
    # You can customize direction as needed.
    if sort_by in ["cited_by_count", "publication_year", "publication_date"]:
        return {"sort": f"{sort_by}:desc"}

    # Fall back to user-provided value if they already passed "field:dir"
    if ":" in sort_by:
        return {"sort": sort_by}

    # Default to descending if unknown field is given
    return {"sort": f"{sort_by}:desc"}


def _request_with_retry(
    url: str,
    params: Dict[str, Any],
    headers: Optional[Dict[str, str]] = None,
    max_retries: int = 6,
    timeout: int = 60,
    verbose: bool = True
) -> Dict[str, Any]:
    """
    GET `url` and return the decoded JSON body, retrying transient failures.

    Retried with exponential backoff + jitter (each wait capped at 60s):
      - HTTP 429 / 500 / 502 / 503 / 504
      - errors raised by `requests` itself (timeouts, connection errors, ...)
      - a response body that cannot be decoded as JSON
    Not retried:
      - any other HTTP status >= 400. Repeating an identical request cannot
        fix a client error, so it fails immediately instead of backing off.

    Args:
      url: endpoint to call.
      params: query-string parameters.
      headers: optional request headers.
      max_retries: maximum number of attempts.
      timeout: per-request timeout in seconds.
      verbose: print a line before each retry wait.

    Returns:
      The parsed JSON response.

    Raises:
      RuntimeError: on a non-retryable HTTP error, or when every attempt
        failed (the message carries the last error or HTTP status).
    """
    headers = headers or {}
    retryable_statuses = (429, 500, 502, 503, 504)
    last_err = None

    for attempt in range(1, max_retries + 1):
        try:
            r = requests.get(url, params=params, headers=headers, timeout=timeout)
        except requests.RequestException as e:
            last_err = e
            problem = f"Request error ({type(e).__name__})"
        else:
            if r.status_code in retryable_statuses:
                # Keep the status so the final error message is informative.
                last_err = f"HTTP {r.status_code}"
                problem = last_err
            elif r.status_code >= 400:
                raise RuntimeError(
                    f"Request failed with non-retryable HTTP {r.status_code}: {r.text[:300]}"
                )
            else:
                try:
                    return r.json()
                except ValueError as e:
                    last_err = e
                    problem = f"Request error ({type(e).__name__})"

        # No point in sleeping after the final attempt.
        if attempt == max_retries:
            break

        wait = min(60, (2 ** (attempt - 1)) + random.random())
        if verbose:
            print(f"⚠️ {problem}. Retry {attempt}/{max_retries} in {wait:.1f}s")
        time.sleep(wait)

    raise RuntimeError(f"Request failed after {max_retries} retries. Last error: {last_err}")


# ------------------------
# Core search function
# ------------------------
def openalex_keyword_search_works(
    query_text: str,
    from_year: Optional[int] = None,
    to_year: Optional[int] = None,
    open_access_only: bool = False,
    per_page: int = 200,
    max_results: int = 1000,
    sort_by: str = "relevance_score",
    output_dir: str = "./data/openalex",
    save_jsonl: bool = True,
    polite_delay_sec: float = 0.1,
    mailto: Optional[str] = None,   # Optional: OpenAlex recommends mailto for polite usage
    verbose: bool = True
) -> Tuple[List[Dict[str, Any]], Dict[str, Any]]:
    """
    Page through OpenAlex /works with a cursor and collect up to
    `max_results` raw work records.

    Args:
      query_text: value of the `search` parameter (must be non-blank).
      from_year / to_year / open_access_only: turned into the `filter`
        parameter by `_build_openalex_filters`.
      per_page: page size; values above 200 are capped to 200.
      max_results: stop once this many works have been collected.
      sort_by: interpreted by `_build_sort_params`.
      output_dir: created if missing; holds the raw JSONL file.
      save_jsonl: if True, `openalex_works_raw.jsonl` is truncated at the
        start and each page is appended to it as it arrives.
      polite_delay_sec: pause after every page that has a next cursor.
      mailto: optional contact address sent as the `mailto` parameter.
      verbose: print the run configuration and per-page progress.

    Returns:
      (works, meta): the raw work dicts in retrieval order, and a dict that
      records the inputs, the filter string, the JSONL path, the number of
      works retrieved and the total count reported by the API.

    Raises:
      ValueError: blank `query_text`, or non-positive `per_page` /
        `max_results`.
    """
    # ---- Argument checks -------------------------------------------------
    if not (query_text and query_text.strip()):
        raise ValueError("query_text must be a non-empty string.")

    per_page = int(per_page)
    if per_page <= 0:
        raise ValueError("per_page must be > 0.")
    if per_page > 200:
        if verbose:
            print("ℹ️ per_page > 200 detected; capping to 200 (OpenAlex typical maximum).")
        per_page = 200

    max_results = int(max_results)
    if max_results <= 0:
        raise ValueError("max_results must be > 0.")

    # ---- Request set-up --------------------------------------------------
    _ensure_dir(output_dir)

    filters = _build_openalex_filters(from_year, to_year, open_access_only)
    sort_params = _build_sort_params(sort_by)
    jsonl_path = os.path.join(output_dir, "openalex_works_raw.jsonl")

    # "*" asks the API to start a new cursor sequence.
    cursor = "*"
    request_params: Dict[str, Any] = {
        "search": query_text,
        "per_page": per_page,
        "cursor": cursor,
    }
    if filters:
        request_params["filter"] = filters
    if mailto:
        request_params["mailto"] = mailto
    request_params.update(sort_params)

    meta: Dict[str, Any] = {
        "query_text": query_text,
        "from_year": from_year,
        "to_year": to_year,
        "open_access_only": open_access_only,
        "per_page": per_page,
        "max_results": max_results,
        "sort_by": sort_by,
        "filter_string": filters,
        "output_dir": output_dir,
        "jsonl_path": jsonl_path if save_jsonl else None,
        "retrieved": 0,
        "total_available_reported_by_api": None,
    }

    if verbose:
        banner = [
            "=== OpenAlex Keyword Search: Start ===",
            f"- query_text        : {query_text}",
            f"- filter            : {filters if filters else '(none)'}",
            f"- per_page          : {per_page}",
            f"- max_results       : {max_results}",
            f"- sort              : {sort_params.get('sort', '(default relevance)')}",
        ]
        if save_jsonl:
            banner.append(f"- saving JSONL to   : {jsonl_path}")
        for line in banner:
            print(line)

    # Start every run from an empty raw file.
    if save_jsonl:
        open(jsonl_path, "w", encoding="utf-8").close()

    # ---- Pagination loop -------------------------------------------------
    works: List[Dict[str, Any]] = []
    page_no = 0
    while len(works) < max_results:
        page_no += 1

        payload = _request_with_retry(
            WORKS_ENDPOINT,
            params={**request_params, "cursor": cursor},
            headers=None,
            verbose=verbose
        )

        api_meta = payload.get("meta", {}) or {}
        reported_count = api_meta.get("count")

        # Record the API's total only once, from the first page that has it.
        if meta["total_available_reported_by_api"] is None and isinstance(reported_count, int):
            meta["total_available_reported_by_api"] = reported_count

        page_items = payload.get("results", []) or []
        if not page_items:
            if verbose:
                print(f"ℹ️ No more results returned (page {page_no}). Stopping.")
            break

        # Never exceed max_results, even if the page holds more.
        room = max_results - len(works)
        kept = page_items[:room]
        works.extend(kept)
        meta["retrieved"] = len(works)

        # Persist the page right away so a later failure loses nothing.
        if save_jsonl:
            with open(jsonl_path, "a", encoding="utf-8") as fh:
                fh.writelines(json.dumps(item, ensure_ascii=False) + "\n" for item in kept)

        if verbose:
            total_available = meta["total_available_reported_by_api"]
            total_str = "?" if total_available is None else f"{total_available}"
            print(f"Page {page_no:03d}: +{len(kept):4d}  (total={len(works)}/{total_str})")

        next_cursor = api_meta.get("next_cursor")
        if not next_cursor:
            if verbose:
                print("ℹ️ next_cursor not found. Stopping.")
            break
        cursor = next_cursor

        if polite_delay_sec and polite_delay_sec > 0:
            time.sleep(polite_delay_sec)

    if verbose:
        print("=== OpenAlex Keyword Search: Done ===")
        print(f"Retrieved: {len(works)} works")

    return works, meta


# ------------------------
# Example execution (optional)
# ------------------------
# raw_works, retrieval_meta = openalex_keyword_search_works(
#     query_text=query_text,
#     from_year=from_year,
#     to_year=to_year,
#     open_access_only=open_access_only,
#     per_page=per_page,
#     max_results=max_results,
#     sort_by=sort_by,
#     output_dir=output_dir,
#     save_jsonl=True,
#     polite_delay_sec=0.1,
#     mailto=None,   # e.g., "your_email@domain.com"
#     verbose=verbose
# )


In [4]:
# ============================================================
# 3) Parsing & Normalization
# ============================================================
#
# This section converts raw OpenAlex "works" JSON objects into a
# flat, analysis-friendly tabular format (pandas DataFrame).
#
# Design goals:
# - Keep a stable schema for downstream steps
# - Preserve important identifiers (OpenAlex ID, DOI, PMID/PMCID if available)
# - Extract author / institution / concept info in a lightweight way
# - Be robust to missing fields
#

from typing import Any, Dict, List, Optional
import pandas as pd


# ------------------------
# Helpers
# ------------------------
def _safe_get(d: Any, keys: List[str], default=None):
    """
    Safely access nested dictionaries.
    keys: list like ["primary_location", "source", "display_name"]
    """
    cur = d
    for k in keys:
        if cur is None:
            return default
        if isinstance(cur, dict) and k in cur:
            cur = cur[k]
        else:
            return default
    return cur


def _normalize_doi(doi: Optional[str]) -> Optional[str]:
    """
    Normalize DOI to a canonical lowercase '10.xxxx/...' format when possible.
    OpenAlex often returns DOI as 'https://doi.org/...'
    """
    if not doi:
        return None
    doi = doi.strip()
    doi = doi.replace("https://doi.org/", "").replace("http://doi.org/", "")
    doi = doi.replace("https://dx.doi.org/", "").replace("http://dx.doi.org/", "")
    doi = doi.strip()
    return doi.lower() if doi else None


def _join_nonempty(xs: List[str], sep: str = "; ") -> str:
    xs2 = [x.strip() for x in xs if isinstance(x, str) and x.strip()]
    return sep.join(xs2)


def _extract_authors(work: Dict[str, Any], max_authors: int = 25) -> Dict[str, Any]:
    """
    Summarize the `authorships` list of a work.

    Only the first `max_authors` authorships are inspected for names, IDs
    and affiliations; `n_authors` still counts every authorship.

    Returns a dict with:
      - authors: author display names, "; "-joined, in authorship order
      - author_ids: author IDs, "; "-joined, in authorship order
      - institutions: unique institution names, sorted and "; "-joined
      - institution_country_codes: unique country codes, sorted and joined
      - n_authors: total number of authorships on the work
    """
    authorships = work.get("authorships", []) or []

    names = []
    ids = []
    institutions = set()
    countries = set()

    for authorship in authorships[:max_authors]:
        person = authorship.get("author") or {}
        if person.get("display_name"):
            names.append(person["display_name"])
        if person.get("id"):
            ids.append(person["id"])

        # One authorship can list several institutions.
        affiliations = authorship.get("institutions", []) or []
        institutions.update(
            inst.get("display_name") for inst in affiliations if inst.get("display_name")
        )
        countries.update(
            inst.get("country_code") for inst in affiliations if inst.get("country_code")
        )

    return {
        "authors": _join_nonempty(names),
        "author_ids": _join_nonempty(ids),
        "institutions": _join_nonempty(sorted(institutions)),
        "institution_country_codes": _join_nonempty(sorted(countries)),
        "n_authors": len(authorships),
    }


def _extract_concepts(work: Dict[str, Any], top_k: int = 8) -> Dict[str, Any]:
    """
    Summarize the `top_k` highest-scoring entries of a work's `concepts`.

    Concepts without a score rank after all scored ones.

    Returns a dict with:
      - concepts_top: concept names, "; "-joined
      - concept_ids_top: concept IDs, "; "-joined
      - concepts_scored_top: "name (score)" with 3 decimals, only for
        concepts that have both a name and a score
    """
    def _rank_key(concept):
        # (has_score, score): unscored concepts sort last under reverse=True.
        score = concept.get("score")
        return (score is not None, score)

    ranked = sorted(work.get("concepts", []) or [], key=_rank_key, reverse=True)

    names, ids, scored = [], [], []
    for concept in ranked[:top_k]:
        label = concept.get("display_name")
        concept_id = concept.get("id")
        score = concept.get("score")

        if label:
            names.append(label)
            if score is not None:
                scored.append(f"{label} ({score:.3f})")
        if concept_id:
            ids.append(concept_id)

    return {
        "concepts_top": _join_nonempty(names),
        "concept_ids_top": _join_nonempty(ids),
        "concepts_scored_top": _join_nonempty(scored),
    }


def _extract_primary_venue(work: Dict[str, Any]) -> Dict[str, Any]:
    """
    Extract venue information for a work.

    Name, ID and ISSN-L come from `primary_location.source`; when a value
    there is missing or empty, the same field of `host_venue` is used
    instead. The venue type is read from `primary_location.source` only.
    """
    primary_source = _safe_get(work, ["primary_location", "source"])
    # NOTE(review): `host_venue` is only a fallback here; confirm whether the
    # current OpenAlex schema still provides it.
    fallback_venue = _safe_get(work, ["host_venue"])

    def _pick(field):
        return _safe_get(primary_source, [field]) or _safe_get(fallback_venue, [field])

    return {
        "venue_name": _pick("display_name"),
        "venue_id": _pick("id"),
        "venue_type": _safe_get(primary_source, ["type"]),
        "venue_issn_l": _pick("issn_l"),
    }


def _extract_open_access(work: Dict[str, Any]) -> Dict[str, Any]:
    oa = work.get("open_access") or {}
    return {
        "is_oa": oa.get("is_oa"),
        "oa_status": oa.get("oa_status"),
        "oa_url": oa.get("oa_url"),
    }


def _extract_ids(work: Dict[str, Any]) -> Dict[str, Any]:
    """
    Collect the identifiers of a work.

    The DOI is taken from `ids.doi`, falling back to the top-level `doi`
    field. Both DOI columns use that same source value so they can never
    disagree: `doi` is the normalized form, `doi_url` the value as received.

    Returns a dict with openalex_id, openalex_work_id (same value as
    openalex_id), doi, doi_url, pmid, pmcid and mag.
    """
    ids = work.get("ids") or {}
    raw_doi = ids.get("doi") or work.get("doi")
    return {
        "openalex_id": work.get("id"),
        "openalex_work_id": work.get("id"),
        "doi": _normalize_doi(raw_doi),
        # `or None` keeps the previous None result when no DOI is present.
        "doi_url": raw_doi or None,
        "pmid": ids.get("pmid"),
        "pmcid": ids.get("pmcid"),
        "mag": ids.get("mag"),
    }


def _extract_biblio(work: Dict[str, Any]) -> Dict[str, Any]:
    b = work.get("biblio") or {}
    return {
        "volume": b.get("volume"),
        "issue": b.get("issue"),
        "first_page": b.get("first_page"),
        "last_page": b.get("last_page"),
    }


# ------------------------
# Main normalization
# ------------------------
def _normalize_work_row(
    w: Dict[str, Any],
    max_authors: int,
    concepts_top_k: int
) -> Dict[str, Any]:
    """Flatten one raw OpenAlex work dict into a single table row."""
    row: Dict[str, Any] = {}
    row.update(_extract_ids(w))
    row.update({
        "title": w.get("display_name"),
        "publication_year": w.get("publication_year"),
        "publication_date": w.get("publication_date"),
        "type": w.get("type"),
        "language": w.get("language"),
        # Only a presence flag is stored; the abstract text is not rebuilt here.
        "has_abstract_inverted_index": w.get("abstract_inverted_index") is not None,
        "cited_by_count": w.get("cited_by_count"),
    })
    row.update(_extract_primary_venue(w))
    row.update(_extract_open_access(w))
    row.update(_extract_biblio(w))

    # Authorships & concepts
    row.update(_extract_authors(w, max_authors=max_authors))
    row.update(_extract_concepts(w, top_k=concepts_top_k))

    # URLs
    row.update({
        "openalex_url": w.get("id"),
        "work_url": w.get("id"),
        "landing_page_url": _safe_get(w, ["primary_location", "landing_page_url"]),
        "pdf_url": _safe_get(w, ["primary_location", "pdf_url"]),
    })
    return row


def normalize_openalex_works(
    raw_works: List[Dict[str, Any]],
    max_authors: int = 25,
    concepts_top_k: int = 8
) -> pd.DataFrame:
    """
    Convert raw works JSON into a normalized DataFrame (one row per work).

    Args:
      raw_works: raw work dicts as returned by the search step. Items that
        are not dicts are skipped.
      max_authors: number of authorships inspected per work.
      concepts_top_k: number of concepts kept per work.

    Returns:
      A DataFrame whose columns are always the same, including when
      `raw_works` is empty or None (the frame then has zero rows but the
      full set of columns). Preferred columns come first, any other
      extracted column follows. Rows that repeat a non-null `openalex_id`
      are dropped, keeping the first occurrence.
    """
    rows: List[Dict[str, Any]] = [
        _normalize_work_row(w, max_authors, concepts_top_k)
        for w in (raw_works or [])
        if isinstance(w, dict)
    ]

    if rows:
        df = pd.DataFrame(rows)
    else:
        # Derive the columns from an empty work so an empty result still has
        # the full schema (and an exported CSV still gets its header).
        df = pd.DataFrame([_normalize_work_row({}, max_authors, concepts_top_k)]).iloc[0:0]

    # Basic cleanup: enforce column order (stable schema)
    preferred_cols = [
        "openalex_id",
        "doi",
        "title",
        "publication_year",
        "publication_date",
        "type",
        "language",
        "venue_name",
        "venue_id",
        "venue_issn_l",
        "cited_by_count",
        "is_oa",
        "oa_status",
        "oa_url",
        "landing_page_url",
        "pdf_url",
        "authors",
        "n_authors",
        "institutions",
        "institution_country_codes",
        "concepts_top",
        "concepts_scored_top",
        "concept_ids_top",
        "has_abstract_inverted_index",
        "pmid",
        "pmcid",
        "volume",
        "issue",
        "first_page",
        "last_page",
        "openalex_url",
    ]
    existing = [c for c in preferred_cols if c in df.columns]
    remaining = [c for c in df.columns if c not in existing]
    df = df[existing + remaining]

    # Drop repeated OpenAlex IDs. Rows without an ID are all kept: treating
    # missing IDs as equal would collapse unrelated works into one row.
    if "openalex_id" in df.columns:
        repeated = df["openalex_id"].notna() & df["openalex_id"].duplicated()
        df = df[~repeated].reset_index(drop=True)

    return df


# ------------------------
# Example execution (optional)
# ------------------------
# df_works = normalize_openalex_works(
#     raw_works=raw_works,
#     max_authors=25,
#     concepts_top_k=8
# )
# df_works.head()


In [5]:
# ============================================================
# 4) Output & Export
# ============================================================
#
# This section:
# - creates lightweight run artifacts (CSV + metadata JSON)
# - prints basic summary stats for quick sanity checks
# - (optional) creates a minimal dedup view by DOI / title
#

import os
import json
import re
from datetime import datetime, timezone

import pandas as pd


# ------------------------
# Utilities
# ------------------------
def _utc_timestamp() -> str:
    return datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")


def _ensure_dir(path: str) -> None:
    os.makedirs(path, exist_ok=True)


def _safe_slug(text: str, max_len: int = 80) -> str:
    """
    Create a filesystem-friendly slug from query text.
    """
    if not text:
        return "query"
    s = text.lower().strip()
    s = re.sub(r"\s+", "_", s)
    s = re.sub(r"[^a-z0-9_\-]+", "", s)
    s = s.strip("_-")
    return s[:max_len] if s else "query"


def _write_json(path: str, obj: dict) -> None:
    with open(path, "w", encoding="utf-8") as f:
        json.dump(obj, f, ensure_ascii=False, indent=2)


def _print_summary(df: pd.DataFrame) -> None:
    n = len(df)
    print("=== Summary ===")
    print(f"- rows: {n}")

    if "publication_year" in df.columns:
        years = df["publication_year"].dropna()
        if not years.empty:
            print(f"- year range: {int(years.min())} – {int(years.max())}")

    if "is_oa" in df.columns:
        oa = df["is_oa"].dropna()
        if not oa.empty:
            print(f"- OA ratio: {oa.mean():.3f}  (True share among non-null)")

    if "doi" in df.columns:
        doi_nonnull = df["doi"].notna().sum()
        print(f"- DOI non-null: {doi_nonnull} / {n} ({doi_nonnull/n:.1%})")
        if doi_nonnull:
            doi_dup = df["doi"].dropna().duplicated().sum()
            print(f"- DOI duplicates: {doi_dup}")

    if "title" in df.columns:
        title_nonnull = df["title"].notna().sum()
        print(f"- Title non-null: {title_nonnull} / {n} ({title_nonnull/n:.1%})")

    if "venue_name" in df.columns:
        top_venues = (
            df["venue_name"]
            .dropna()
            .value_counts()
            .head(10)
        )
        if len(top_venues):
            print("- Top venues (count):")
            for k, v in top_venues.items():
                print(f"  - {k}: {v}")


def _basic_dedup_view(df: pd.DataFrame) -> pd.DataFrame:
    """
    Create a minimal dedup key view (for quick inspection).
    This does NOT remove duplicates; it just provides a key table.
    """
    cols = []
    for c in ["openalex_id", "doi", "title", "publication_year", "venue_name", "cited_by_count"]:
        if c in df.columns:
            cols.append(c)

    view = df[cols].copy() if cols else df.copy()
    if "doi" in view.columns:
        view["dup_doi"] = view["doi"].notna() & view["doi"].duplicated(keep=False)
    else:
        view["dup_doi"] = False

    # Title duplicate flag (exact match only; fuzzy matching is downstream)
    if "title" in view.columns:
        view["dup_title_exact"] = view["title"].notna() & view["title"].duplicated(keep=False)
    else:
        view["dup_title_exact"] = False

    return view


# ------------------------
# Export function
# ------------------------
def export_openalex_outputs(
    df_works: pd.DataFrame,
    retrieval_meta: dict,
    query_text: str,
    output_dir: str = "./data/openalex",
    include_timestamp: bool = True,
    export_dedup_view: bool = True
) -> dict:
    """
    Write the run artifacts into `output_dir` and return their paths.

    Files (all sharing the base name `openalex_works__<slug>[__<timestamp>]`):
      - `<base>.csv`: the normalized works table
      - `<base>__meta.json`: `retrieval_meta` plus the export time, the CSV
        path and the exported row count
      - `<base>__dedup_view.csv`: duplicate-flag view (only when
        `export_dedup_view` is True)

    Returns:
      dict with `csv_path`, `meta_path` and, if written, `dedup_view_path`.
    """
    _ensure_dir(output_dir)

    # Base file name: slug of the query, optionally followed by a UTC stamp.
    name_parts = [f"openalex_works__{_safe_slug(query_text)}"]
    if include_timestamp:
        stamp = _utc_timestamp()
        if stamp:
            name_parts.append(stamp)
    base = "__".join(name_parts)

    csv_path = os.path.join(output_dir, f"{base}.csv")
    meta_path = os.path.join(output_dir, f"{base}__meta.json")

    # Retrieval metadata enriched with the export context.
    meta_out = {
        **(retrieval_meta or {}),
        "exported_at_utc": datetime.now(timezone.utc).isoformat(),
        "export_csv": csv_path,
        "rows_exported": int(len(df_works)),
    }

    df_works.to_csv(csv_path, index=False)
    _write_json(meta_path, meta_out)

    paths = {"csv_path": csv_path, "meta_path": meta_path}

    if export_dedup_view:
        dedup_path = os.path.join(output_dir, f"{base}__dedup_view.csv")
        _basic_dedup_view(df_works).to_csv(dedup_path, index=False)
        paths["dedup_view_path"] = dedup_path

    return paths


# ============================================================
# UI: OpenAlex Keyword Search Controls (ipywidgets)
# ============================================================

import ipywidgets as widgets
from IPython.display import display, clear_output
import pandas as pd

# --- Widgets ---
# Initial values are taken from the section-1 parameters so that the UI and
# the scripted configuration cannot drift apart.
w_query = widgets.Text(
    value=str(query_text),
    description="query_text",
    layout=widgets.Layout(width="900px")
)

# IntText cannot represent None, so a None year parameter falls back to the
# previous hard-coded default.
w_from_year = widgets.IntText(
    value=int(from_year) if from_year is not None else 2010,
    description="from_year",
    layout=widgets.Layout(width="250px")
)

w_to_year = widgets.IntText(
    value=int(to_year) if to_year is not None else 2025,
    description="to_year",
    layout=widgets.Layout(width="250px")
)

# The slider only offers 25..200, so the parameter is clamped to that range.
w_per_page = widgets.IntSlider(
    value=min(max(int(per_page), 25), 200),
    min=25,
    max=200,
    step=25,
    description="per_page",
    continuous_update=False,
    layout=widgets.Layout(width="500px")
)

w_max_results = widgets.IntText(
    value=int(max_results),
    description="max_results",
    layout=widgets.Layout(width="250px")
)

w_open_access = widgets.Checkbox(
    value=bool(open_access_only),
    description="open_access_only"
)

w_run = widgets.Button(
    description="Run Search",
    button_style="success",
    tooltip="Call OpenAlex API and export CSV",
    icon="play"
)

# Status line placeholder; it is currently not part of the displayed panel.
w_status = widgets.HTML(value="")
# Captures everything the run callback prints or displays.
out = widgets.Output()

# Optional: show the computed OpenAlex filter string
w_filter_preview = widgets.HTML(value="")


def _update_filter_preview(*args):
    """
    Refresh the filter preview label from the current widget values.

    Accepts (and ignores) the change notification passed by `observe`, so it
    can also be called directly without arguments.
    """
    preview = _build_openalex_filters(
        from_year=w_from_year.value,
        to_year=w_to_year.value,
        open_access_only=w_open_access.value
    )
    shown = preview if preview else '(none)'
    w_filter_preview.value = f"<b>filter</b>: {shown}"


# Re-render the preview whenever one of the filter inputs changes.
for w in (w_from_year, w_to_year, w_open_access):
    w.observe(_update_filter_preview, names="value")

# Initial render.
_update_filter_preview()


# --- Runner callback ---
def _on_run_clicked(btn):
    """
    "Run Search" handler: validate the inputs, then search -> normalize ->
    summarize -> export, showing all output inside the `out` widget.

    Year inputs: the integer widgets cannot hold None, so a value of 0 (or
    any non-positive value) means "no bound" and is passed on as None
    instead of being turned into an invalid date filter.

    `sort_by`, `verbose` and `output_dir` are read from the section-1
    parameters. After a successful run the results are published as the
    globals `raw_works`, `retrieval_meta`, `df_works` and `export_paths`.
    """
    with out:
        clear_output(wait=True)

        # Non-positive year -> bound disabled.
        from_year_val = w_from_year.value if w_from_year.value > 0 else None
        to_year_val = w_to_year.value if w_to_year.value > 0 else None

        # Basic validation
        if not w_query.value.strip():
            print("⚠️ query_text is empty.")
            return
        if from_year_val is not None and to_year_val is not None and from_year_val > to_year_val:
            print("⚠️ from_year must be <= to_year.")
            return
        if w_per_page.value < 1 or w_per_page.value > 200:
            print("⚠️ per_page must be between 1 and 200.")
            return
        if w_max_results.value < 1:
            print("⚠️ max_results must be >= 1.")
            return

        # Display selected params
        filt = _build_openalex_filters(
            from_year=from_year_val,
            to_year=to_year_val,
            open_access_only=w_open_access.value
        )

        print("=== Selected Parameters ===")
        print(f"- query_text        : {w_query.value}")
        print(f"- filter            : {filt if filt else '(none)'}")
        print(f"- per_page          : {w_per_page.value}")
        print(f"- max_results       : {w_max_results.value}")
        print(f"- open_access_only  : {w_open_access.value}")
        print("")

        # 1) Retrieve
        raw_works, retrieval_meta = openalex_keyword_search_works(
            query_text=w_query.value,
            from_year=from_year_val,
            to_year=to_year_val,
            open_access_only=w_open_access.value,
            per_page=w_per_page.value,
            max_results=w_max_results.value,
            sort_by=sort_by,
            output_dir=output_dir,
            save_jsonl=True,
            polite_delay_sec=0.1,
            mailto=None,
            verbose=verbose
        )

        print("")
        print(f"Retrieved raw works: {len(raw_works)}")

        # 2) Normalize
        df_works = normalize_openalex_works(raw_works)
        print(f"Normalized rows   : {len(df_works)}")

        # 3) Summary
        print("")
        _print_summary(df_works)

        # 4) Export
        print("")
        paths = export_openalex_outputs(
            df_works=df_works,
            retrieval_meta=retrieval_meta,
            query_text=w_query.value,
            output_dir=output_dir,
            include_timestamp=True,
            export_dedup_view=True
        )

        print("")
        print("=== Exported files ===")
        for k, v in paths.items():
            print(f"- {k}: {v}")

        display(df_works.head(10))

        # Keep latest result in globals for convenience
        globals()["raw_works"] = raw_works
        globals()["retrieval_meta"] = retrieval_meta
        globals()["df_works"] = df_works
        globals()["export_paths"] = paths


# Run the pipeline when the button is pressed.
w_run.on_click(_on_run_clicked)


# --- Layout ---
# Row 1: filter inputs. Row 2: paging / volume inputs.
row1 = widgets.HBox(children=[w_from_year, w_to_year, w_open_access])
row2 = widgets.HBox(children=[w_per_page, w_max_results])

# Top to bottom: title, query box, filters, filter preview, paging, button, output.
panel = widgets.VBox(
    children=[
        widgets.HTML("<h3>OpenAlex Keyword Search</h3>"),
        w_query,
        row1,
        w_filter_preview,
        row2,
        w_run,
        out,
    ]
)

display(panel)


VBox(children=(HTML(value='<h3>OpenAlex Keyword Search</h3>'), Text(value='venture capital OR startup OR innov…