# OpenAIRE Graph University Overview

This notebook fetches summary statistics per university using the [OpenAIRE Graph API](https://graph.openaire.eu/docs/apis/graph-api/). For every listed university we compare three perspectives:

- **A. Publications affiliated with the university** – filter on the OpenAIRE OpenORG identifier.
- **B. Publications collected by the main (CRIS) data source** – filter on the CRIS data source identifier.
- **C. Publications collected by the secondary repository** – filter on the repository identifier when available.

For each perspective we retrieve counts for funding/projects, data sources, and research products split into publications, datasets, software, and other research outputs. The notebook separates the data collection steps so that each can be re-run independently when debugging or iterating.


In [None]:
!pip install pandas matplotlib openpyxl python-dotenv requests


## 1. Imports and reused constants


In [None]:
import os
import csv
import time
from copy import deepcopy
from io import StringIO
from pathlib import Path
from typing import Any, Dict, Optional, Union

import matplotlib.pyplot as plt
import pandas as pd
import requests
from dotenv import load_dotenv

pd.set_option("display.max_columns", None)
pd.set_option("display.max_rows", None)
pd.set_option("display.float_format", lambda value: f"{value:,.0f}")

load_dotenv()

CLIENT_ID = os.getenv("CLIENT_ID")
CLIENT_SECRET = os.getenv("CLIENT_SECRET")

if not CLIENT_ID or not CLIENT_SECRET:
    raise RuntimeError(
        "Missing OpenAIRE credentials. Set CLIENT_ID and CLIENT_SECRET in the environment."
    )

BASE_URL = "https://api.openaire.eu/graph"
TOKEN_URL = "https://aai.openaire.eu/oidc/token"
API_USER_AGENT = "OpenAIRE-tools overview-stats notebook"
API_PAUSE_SECONDS = 0.1  # throttle requests a bit to stay within rate limits
TOKEN_REFRESH_BUFFER = 60  # refresh the token one minute before expiration

PRODUCT_TYPE_LABELS = {
    "publication": "Publications",
    "dataset": "Research data",
    "software": "Research software",
    "other": "Other research products",
}

METRIC_ORDER = [
    "Funding / Projects",
    "Data sources",
    *PRODUCT_TYPE_LABELS.values(),
]

COMPARISON_LONG_PATH = "comparison_long.csv"
COMPARISON_PIVOT_PATH = "comparison_pivot.csv"

_access_token: Optional[str] = None
_access_token_expiry: float = 0.0


## 2. Parse the university reference table
The reference table is downloaded as an Excel workbook from a published Google Sheet. We load it and reshape it into a structured list so that the rest of the notebook can iterate over the entries.
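
As a minimal sketch of that reshaping, the snippet below checks that the columns read by later cells are present and converts each row into a plain dictionary. The toy DataFrame and the `to_university_records` helper are illustrative assumptions. The column names are the ones the later cells access (`name`, `ror`, `openorg_id`, `main_datasource_id`, `secondary_datasource_id`); the real sheet may use different headers, and all identifiers here are placeholders.

```python
import pandas as pd

# Columns the later cells read from each university entry.
REQUIRED_COLUMNS = ["name", "ror", "openorg_id", "main_datasource_id", "secondary_datasource_id"]


def to_university_records(df: pd.DataFrame) -> list:
    """Validate the expected columns and return one dict per row (hypothetical helper)."""
    missing = [column for column in REQUIRED_COLUMNS if column not in df.columns]
    if missing:
        raise KeyError(f"Reference table is missing columns: {missing}")
    # Blank out missing values, then trim stray whitespace in every column.
    cleaned = df[REQUIRED_COLUMNS].fillna("").apply(lambda column: column.str.strip())
    return cleaned.to_dict(orient="records")


# Toy stand-in for the downloaded sheet; all identifiers are made up.
toy_table = pd.DataFrame(
    {
        "name": ["Example University "],
        "ror": ["00example0"],
        "openorg_id": ["openorgs____::example"],
        "main_datasource_id": ["  "],
        "secondary_datasource_id": [None],
    }
)
toy_universities = to_university_records(toy_table)
print(toy_universities[0])
```

Trimming whitespace matters here because a cell containing only spaces would otherwise count as a valid identifier.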


In [None]:
nl_orgs_baseline_url = "https://docs.google.com/spreadsheets/d/e/2PACX-1vTDQiWDIaI1SZkPTMNCovicBhA-nQND1drXoUKvrG1O_Ga3hLDRvmQZao_TvNgmNQ/pub?output=xlsx"
NL_ORGS_BASELINE_PATH = Path("nl_orgs_baseline.xlsx")


def download_nl_orgs_baseline(url: str, dest: Path) -> Path:
    """Download the latest NL organizations baseline and store it locally."""
    response = requests.get(url, timeout=120)
    response.raise_for_status()
    dest.write_bytes(response.content)
    return dest


download_nl_orgs_baseline(nl_orgs_baseline_url, NL_ORGS_BASELINE_PATH)

TABLE_PATH = NL_ORGS_BASELINE_PATH


def load_university_table(path: Union[str, Path]) -> pd.DataFrame:
    """Load the NL organizations reference table and normalise empty values to blanks."""
    table_path = Path(path)
    if not table_path.exists():
        raise FileNotFoundError(f"Reference table not found: {table_path}")
    excel_suffixes = {".xlsx", ".xls", ".xslx"}
    if table_path.suffix.lower() in excel_suffixes:
        df = pd.read_excel(table_path, dtype=str, keep_default_na=False)
    else:
        df = pd.read_csv(table_path, sep="\t", dtype=str, keep_default_na=False)
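    return df.fillna("")


# Later cells expect `universities_df` and a list of per-row dicts named `universities`.
# Assumption: the sheet already uses the column names read below (`name`, `ror`,
# `openorg_id`, `main_datasource_id`, `secondary_datasource_id`).
universities_df = load_university_table(TABLE_PATH)
universities = universities_df.to_dict(orient="records")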


## 3. Helper functions for the Graph API
These functions wrap the REST requests and centralise filter construction per scenario. They print nothing, so later cells can reuse them freely.
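
To illustrate what a count request looks like once the filters are combined, here is a rough offline sketch that composes the query string for one request. `build_count_url` is a hypothetical helper and the identifier is a placeholder; the parameter names simply mirror the ones used in the next cell, and nothing is sent to the API.

```python
from urllib.parse import urlencode

BASE_URL = "https://api.openaire.eu/graph"


def build_count_url(path: str, filters: dict, **extra: str) -> str:
    """Compose a Graph API URL that asks for a single record (hypothetical helper).

    Only the total in the response header is of interest, so the page size is
    kept at 1 to minimise the payload.
    """
    params = {**filters, **extra, "page": 1, "pageSize": 1}
    return f"{BASE_URL}{path}?{urlencode(params)}"


# Placeholder identifier; a real OpenORG ID would come from the reference table.
url = build_count_url(
    "/v2/researchProducts",
    {"relOrganizationId": "openorgs____::example"},
    type="publication",
)
print(url)
```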


In [None]:
SCENARIO_DEFS = [
    {
        "key": "organization",
        "label": "A. OpenORG affiliation",
        "id_field": "openorg_id",
        "description": "Publications affiliated to the university (OpenAIRE OpenORG ID)",
    },
    {
        "key": "main_datasource",
        "label": "B. Main/CRIS data source",
        "id_field": "main_datasource_id",
        "description": "Publications collected from the main CRIS data source",
    },
    {
        "key": "secondary_datasource",
        "label": "C. Secondary repository",
        "id_field": "secondary_datasource_id",
        "description": "Publications collected from the secondary / repository data source",
    },
]

FILTER_BUILDERS = {
    "organization": lambda entity_id: {
        "projects": {"relOrganizationId": entity_id},
        "dataSources": {"relOrganizationId": entity_id},
        "researchProducts": {"relOrganizationId": entity_id},
    },
    "main_datasource": lambda entity_id: {
        "projects": {"relCollectedFromDatasourceId": entity_id},
        "dataSources": {"id": entity_id},
        "researchProducts": {"relCollectedFromDatasourceId": entity_id},
    },
    "secondary_datasource": lambda entity_id: {
        "projects": {"relCollectedFromDatasourceId": entity_id},
        "dataSources": {"id": entity_id},
        "researchProducts": {"relCollectedFromDatasourceId": entity_id},
    },
}

EMPTY_METRICS = {metric: None for metric in METRIC_ORDER}


def obtain_access_token() -> str:
    """Return a cached OpenAIRE access token, refreshing it when needed."""
    global _access_token, _access_token_expiry
    now = time.time()
    if _access_token and now < _access_token_expiry:
        return _access_token

    response = requests.post(
        TOKEN_URL,
        data={"grant_type": "client_credentials"},
        auth=(CLIENT_ID, CLIENT_SECRET),
        headers={"User-Agent": API_USER_AGENT},
        timeout=60,
    )
    response.raise_for_status()
    payload = response.json()
    token = payload.get("access_token")
    if not token:
        raise RuntimeError("OpenAIRE token response did not include an access_token.")
    expires_in = int(payload.get("expires_in", 3600))
    _access_token = token
    _access_token_expiry = now + max(expires_in - TOKEN_REFRESH_BUFFER, 0)
    return _access_token


def call_graph_api(path: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Invoke the OpenAIRE Graph API and return the decoded JSON payload."""
    url = f"{BASE_URL}{path}"
    effective_params = dict(params or {})
    effective_params.setdefault("page", 1)
    effective_params.setdefault("pageSize", 1)

    headers = {
        "User-Agent": API_USER_AGENT,
        "Authorization": f"Bearer {obtain_access_token()}",
    }

    response = requests.get(
        url,
        params=effective_params,
        headers=headers,
        timeout=60,
    )
    response.raise_for_status()
    time.sleep(API_PAUSE_SECONDS)
    return response.json()


def fetch_num_found(path: str, params: Dict[str, Any]) -> Optional[int]:
    """Return the total number of matching records for the supplied endpoint."""
    payload = call_graph_api(path, params)
    header = payload.get("header", {})
    num_found = header.get("numFound")
    return int(num_found) if num_found is not None else None


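def build_filters(scenario_key: str, entity_id: str) -> Dict[str, Dict[str, Any]]:
    """Return the per-endpoint query filters for a scenario, copied so callers can modify them."""
    builder = FILTER_BUILDERS[scenario_key]
    return {name: dict(filters) for name, filters in builder(entity_id).items()}


def collect_metrics(scenario_key: str, entity_id: Optional[str]) -> Dict[str, Optional[int]]:
    """Return the count per metric for one scenario; every value is None without an identifier."""
    if not entity_id:
        return deepcopy(EMPTY_METRICS)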

    filters = build_filters(scenario_key, entity_id)
    results: Dict[str, Optional[int]] = {}

    results["Funding / Projects"] = fetch_num_found("/v1/projects", filters["projects"])
    results["Data sources"] = fetch_num_found("/v1/dataSources", filters["dataSources"])

    for rp_type, label in PRODUCT_TYPE_LABELS.items():
        rp_params = dict(filters["researchProducts"], type=rp_type)
        results[label] = fetch_num_found("/v2/researchProducts", rp_params)

    return results


## 4. Sanity check on a single university
Run a quick test so we know the API access works before looping over the full list.


In [None]:

sample_university = universities[0]
print(f"Sample university: {sample_university['name']}")
for scenario in SCENARIO_DEFS:
    identifier = sample_university.get(scenario["id_field"])
    metrics = collect_metrics(scenario["key"], identifier)
    print(f"  {scenario['label']} ({identifier}):")
    for metric in METRIC_ORDER:
        print(f"    - {metric}: {metrics.get(metric)}")


## 5. Fetch metrics for all universities
This cell iterates over every university and prints intermediate summaries for transparency during execution.
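
One possible refinement, not part of the cell below, is to isolate failures so that a single failing request does not discard the results already gathered. The following sketch uses a stub `fake_collect` in place of `collect_metrics`; the stub, the `collect_safely` helper, and the sample names are all hypothetical.

```python
from typing import Callable, Dict, List, Optional, Tuple

Metrics = Dict[str, Optional[int]]


def collect_safely(
    names: List[str],
    collect: Callable[[str], Metrics],
) -> Tuple[Dict[str, Metrics], Dict[str, str]]:
    """Run `collect` per name, keeping successes and recording failures separately."""
    results: Dict[str, Metrics] = {}
    errors: Dict[str, str] = {}
    for name in names:
        try:
            results[name] = collect(name)
        except Exception as exc:  # broad on purpose: any failure is recorded, not fatal
            errors[name] = f"{type(exc).__name__}: {exc}"
    return results, errors


def fake_collect(name: str) -> Metrics:
    """Stub standing in for the real API-backed collector."""
    if name == "Broken University":
        raise RuntimeError("simulated HTTP 500")
    return {"Publications": 10}


results, errors = collect_safely(["Example University", "Broken University"], fake_collect)
print(results)
print(errors)
```

The failed names end up in `errors`, so they can be retried later without repeating the successful requests.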


In [None]:

aggregated_results: list[dict[str, Any]] = []

for university in universities:
    uni_entry = {
        "info": university,
        "scenarios": {},
    }
    print(f"\n=== {university['name']} ===")
    for scenario in SCENARIO_DEFS:
        identifier = university.get(scenario["id_field"])
        metrics = collect_metrics(scenario["key"], identifier)
        uni_entry["scenarios"][scenario["key"]] = metrics

        label = scenario["label"]
        identifier_display = identifier if identifier else "no identifier provided"
        print(f"{label} ({identifier_display}):")
        for metric in METRIC_ORDER:
            print(f"  {metric}: {metrics.get(metric)}")
    aggregated_results.append(uni_entry)

print("\nCompleted data collection for all universities.")


## 6. Assemble and save the comparison table
We reshape the collected metrics into both a long format and a pivoted table, then cache them on disk for reuse.
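
The difference between the two shapes is easiest to see on toy data. The sketch below uses invented counts and shortened scenario labels. Unlike the cell that follows, it passes `aggfunc="first"` and `dropna=False` so that duplicate rows are not averaged and all-empty columns are kept; that is an illustrative choice, not the notebook's setting.

```python
import pandas as pd

# Toy long-format records; names and counts are invented.
toy_records = [
    {"University": "Example University", "ROR": "00example0", "Scenario": "A", "Metric": "Publications", "Count": 120},
    {"University": "Example University", "ROR": "00example0", "Scenario": "A", "Metric": "Data sources", "Count": 3},
    {"University": "Example University", "ROR": "00example0", "Scenario": "B", "Metric": "Publications", "Count": 95},
    {"University": "Example University", "ROR": "00example0", "Scenario": "B", "Metric": "Data sources", "Count": None},
]
toy_long = pd.DataFrame(toy_records)

# One row per university, one column per (Scenario, Metric) pair.
toy_pivot = toy_long.pivot_table(
    index=["University", "ROR"],
    columns=["Scenario", "Metric"],
    values="Count",
    aggfunc="first",
    dropna=False,
).astype("Int64")
print(toy_pivot)
```

Casting to the nullable `Int64` dtype keeps the counts as integers while still allowing missing values.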


In [None]:
records: list[dict[str, Any]] = []
for entry in aggregated_results:
    base = entry["info"]
    for scenario in SCENARIO_DEFS:
        metrics = entry["scenarios"].get(scenario["key"], {})
        for metric in METRIC_ORDER:
            records.append(
                {
                    "University": base["name"],
                    "ROR": base.get("ror"),
                    "Scenario": scenario["label"],
                    "Metric": metric,
                    "Count": metrics.get(metric),
                }
            )

comparison_df = pd.DataFrame(records)
comparison_pivot = (
    comparison_df.pivot_table(
        index=["University", "ROR"],
        columns=["Scenario", "Metric"],
        values="Count",
    )
    .sort_index(axis=1, level=0)
    .astype("Int64")
)

comparison_df.to_csv(COMPARISON_LONG_PATH, index=False)
comparison_pivot.to_csv(COMPARISON_PIVOT_PATH)
print(f"Saved long-format table to {COMPARISON_LONG_PATH}")
print(f"Saved pivot table to {COMPARISON_PIVOT_PATH}")
comparison_pivot


## 7. Reload saved comparison tables
Reload the cached CSV files so downstream steps can run without recomputing the API queries.
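
As a small illustration of why the reload needs `header=[0, 1]` and `index_col=[0, 1]`, the sketch below round-trips a toy pivot through an in-memory CSV. The table contents are invented, and `StringIO` stands in for the file on disk.

```python
from io import StringIO

import pandas as pd

# Toy pivot with two index levels and two column levels; values are invented.
columns = pd.MultiIndex.from_tuples(
    [("A", "Publications"), ("B", "Publications")],
    names=["Scenario", "Metric"],
)
index = pd.MultiIndex.from_tuples(
    [("Example University", "00example0")],
    names=["University", "ROR"],
)
toy_pivot = pd.DataFrame(
    [[120.0, float("nan")]], index=index, columns=columns
).astype("Int64")

# Write to an in-memory buffer instead of a file on disk.
buffer = StringIO()
toy_pivot.to_csv(buffer)
buffer.seek(0)

# Two header rows and two index columns must be declared explicitly on reload.
restored = pd.read_csv(buffer, header=[0, 1], index_col=[0, 1]).astype("Int64")
print(restored)
```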


In [None]:
comparison_df = pd.read_csv(COMPARISON_LONG_PATH)
if "Count" in comparison_df.columns:
    comparison_df["Count"] = comparison_df["Count"].astype("Int64")

# Remove the problematic row for the University of Minnesota (ROR 017zqws13).
mask = (comparison_df["University"] == "University of Minnesota") & (comparison_df["ROR"] == "017zqws13")
comparison_df = comparison_df.loc[~mask].copy()

comparison_pivot = pd.read_csv(
    COMPARISON_PIVOT_PATH,
    header=[0, 1],
    index_col=[0, 1],
)
comparison_pivot.columns = pd.MultiIndex.from_tuples(comparison_pivot.columns)
comparison_pivot = comparison_pivot.astype("Int64")
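# Drop the Innsbruck row from the pivot if present; note that this is a different
# organization from the one excluded from the long-format table above.
row_key = ("Uni of Innsbruck", "054pv6659")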
if row_key in comparison_pivot.index:
    comparison_pivot = comparison_pivot.drop(index=row_key)
print(f"Reloaded tables from {COMPARISON_LONG_PATH} and {COMPARISON_PIVOT_PATH}")
comparison_pivot


## 8. Visualising publication counts
The chart below compares publication totals by affiliation and harvesting source. A missing bar means that no publications were retrieved for that scenario, either because no identifier was provided or because the source does not currently expose publications in the OpenAIRE Graph.
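
A bar can be absent for two different reasons: no count was retrieved at all, or the count is zero. The toy example below, with invented names and identifiers, sketches how both cases can be caught together for universities that do list a main data source.

```python
import pandas as pd

# Toy inputs; names, identifiers, and counts are invented.
toy_universities = pd.DataFrame(
    {
        "name": ["Alpha University", "Beta University", "Gamma University"],
        "main_datasource_id": ["ds::alpha", "ds::beta", ""],
    }
)
toy_counts = pd.Series({"Alpha University": 250, "Beta University": 0})

has_source = toy_universities["main_datasource_id"].str.strip() != ""
counts = toy_universities["name"].map(toy_counts)

# A bar is absent either because the count is missing (NaN) or because it is zero.
no_publications = counts.isna() | (counts == 0)
flagged = toy_universities.loc[has_source & no_publications, "name"].tolist()
print(flagged)
```

Gamma University has no count either, but it is not flagged because it lists no main data source in the first place.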


In [None]:
publication_df = comparison_df[comparison_df["Metric"] == "Publications"].copy()
wide_publications = (
    publication_df.pivot(index="University", columns="Scenario", values="Count")
    .reindex(universities_df["name"].tolist())
)

fig, ax = plt.subplots(figsize=(12, 6))
wide_publications.plot(kind="bar", ax=ax)
ax.set_xlabel("University")
ax.set_ylabel("Number of publications")
ax.set_title("Publications by affiliation vs. data sources")
ax.legend(title="Scenario")
plt.xticks(rotation=45, ha="right")
plt.tight_layout()
plt.show()

print("Note: Missing bars indicate that the source may not be connected to OpenAIRE or does not expose publications.")
main_datasource_label = next(
    scenario["label"]
    for scenario in SCENARIO_DEFS
    if scenario["key"] == "main_datasource"
)
if main_datasource_label not in wide_publications.columns:
    print("\nNo publication data available for the CRIS data source scenario.")
else:
    main_source_counts = wide_publications[main_datasource_label]
    has_main_datasource = universities_df["main_datasource_id"].fillna("").str.strip() != ""
    main_source_publications = universities_df["name"].map(main_source_counts)
    no_publications_mask = main_source_publications.isna() | (main_source_publications == 0)
    no_publications_cris = universities_df.loc[has_main_datasource & no_publications_mask, "name"].tolist()
    if no_publications_cris:
        print("\nUniversities with a CRIS data source but no publications retrieved from it:")
        for name in no_publications_cris:
            print(f"- {name}")
    else:
        print("\nAll CRIS-connected universities have publications retrieved from that data source.")


## 9. Notes and follow-ups
- A missing identifier (OpenORG or data source) results in `NA` counts for that scenario.
- Some organizations without a dedicated CRIS only contribute repository records, so their main data source column remains empty.
- Consider repeating the collection with date filters (e.g. recent years) or cursor-based pagination if you need to validate the totals beyond the `numFound` figures.
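
For the last point, a rough sketch of a cursor loop is shown below. It assumes that the API accepts `cursor=*` for the first request and returns the next cursor as `nextCursor` in the response header; verify this against the Graph API documentation before relying on it. `fetch_page` is a hypothetical callable, and `fake_fetch_page` replaces real HTTP calls so the sketch runs offline.

```python
from typing import Any, Callable, Dict, Iterator, Optional

Page = Dict[str, Any]


def iterate_results(fetch_page: Callable[[str], Page], max_pages: int = 1000) -> Iterator[Dict[str, Any]]:
    """Yield every record by following cursors until no further page is available."""
    cursor: Optional[str] = "*"  # assumed marker for the first page
    pages_read = 0
    while cursor and pages_read < max_pages:
        page = fetch_page(cursor)
        results = page.get("results", [])
        if not results:
            break  # an empty page also ends the traversal
        yield from results
        cursor = page.get("header", {}).get("nextCursor")  # assumed field name
        pages_read += 1


# Stub data source: three records spread over two pages.
FAKE_PAGES = {
    "*": {"header": {"numFound": 3, "nextCursor": "page-2"}, "results": [{"id": "a"}, {"id": "b"}]},
    "page-2": {"header": {"numFound": 3}, "results": [{"id": "c"}]},
}


def fake_fetch_page(cursor: str) -> Page:
    """Return a canned page for the given cursor (no network access)."""
    return FAKE_PAGES[cursor]


record_ids = [record["id"] for record in iterate_results(fake_fetch_page)]
print(record_ids)
```

Comparing `len(record_ids)` with the reported `numFound` is then a direct way to validate a total.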
