In [10]:
import math
import asyncio
from typing import List, Dict, Optional, Any, Callable, Union
import pyalex
from pyalex import Works, Authors, Concepts, Institutions # Import necessary pyalex classes including Institutions

import logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# --- Configuration ---
# Set your email for the OpenAlex polite pool (optional but recommended)
# pyalex.config.email = "your_email@example.com"

MAX_CNT = 10000
BATCH_SIZE = 50
DEFAULT_MAX_CONCURRENCY = 10
DEFAULT_SLEEP_INTERVAL = 0.1
OPENALEX_MAX_PER_PAGE = 200

class OpenAlexKit:
    def __init__(
        self,
        email: str = None,
        max_concurrency: int = DEFAULT_MAX_CONCURRENCY,
        sleep_interval: float = DEFAULT_SLEEP_INTERVAL,
    ):
        if max_concurrency <= 0:
            raise ValueError("max_concurrency must be a positive integer")
        if sleep_interval < 0:
            raise ValueError("sleep_interval must be non-negative")

        if email:
            pyalex.config.email = email
            logger.info(f"OpenAlex email set for polite pool: {email}")
        else:
            logger.warning("OpenAlex email not set. Using anonymous pool (lower rate limits).")

        self.batch_size = BATCH_SIZE
        self.max_cnt = MAX_CNT
        self.semaphore = asyncio.Semaphore(max_concurrency)
        self.sleep_interval = sleep_interval
        logger.info(f"OpenAlexKit initialized with max_concurrency={max_concurrency}, sleep_interval={sleep_interval}s")

    async def _execute_sync_with_controls(self, sync_func: Callable, *args: Any, **kwargs: Any) -> Any:
        # (Implementation remains the same as previous version)
        async with self.semaphore:
            func_name = sync_func.__name__
            logger.debug(f"Semaphore acquired for {func_name}. Executing...")
            try:
                result = await asyncio.to_thread(sync_func, *args, **kwargs)
                logger.debug(f"Execution of {func_name} completed. Result type: {type(result)}. Sleeping for {self.sleep_interval}s.")
                await asyncio.sleep(self.sleep_interval)
                return result
            except Exception as e:
                logger.error(f"Exception during controlled execution of {func_name}: {e}", exc_info=True)
                logger.debug(f"Sleeping for {self.sleep_interval}s after error in {func_name}.")
                await asyncio.sleep(self.sleep_interval)
                if "search" in func_name or "get" in func_name:
                     return []
                raise e
            finally:
                logger.debug(f"Semaphore released for {func_name}.")


    # --- Synchronous Helper Methods ---

    # Updated _validate_openalex_ids function
    def _validate_openalex_ids(self, entity_type: str, ids: List[str]) -> List[str]:
        """
        Validates input IDs and returns only the native OpenAlex IDs
        matching the expected entity type.
        It recognizes DOI URLs (for works), ORCID URLs (for authors),
        and ROR URLs (for institutions) as valid input formats but does not
        return them in the output list, logging warnings instead.

        Returns a list of validated native OpenAlex IDs (e.g., W..., A...).
        """
        prefix_map = {"work": "W", "author": "A", "venue": "V", "concept": "C", "institution": "I"}
        entity_type_lower = entity_type.lower()
        entity_prefix = prefix_map.get(entity_type_lower)
        # No error if entity_prefix is None, just means we can only validate by structure if URL used

        valid_ids = []
        recognized_external_count = 0
        skipped_count = 0
        processed_ids = set() # Keep track of processed IDs to avoid duplicate warnings/processing

        for item_id_raw in ids:
            if not isinstance(item_id_raw, str) or not item_id_raw.strip():
                # Skip empty or non-string entries silently or log if needed
                continue

            item_id = item_id_raw.strip()

            # Avoid processing the same ID multiple times if it appears duplicated in the input list
            if item_id in processed_ids:
                continue
            processed_ids.add(item_id)

            # 1. Check for Native OpenAlex ID (URL or direct)
            is_native = False
            native_id_to_add = None
            if item_id.startswith("https://openalex.org/"):
                potential_id = item_id.split("/")[-1]
                # Check if the extracted part starts with the *expected* prefix (if known)
                if entity_prefix and potential_id.startswith(entity_prefix):
                    native_id_to_add = potential_id
                    is_native = True
                # Or if type is unknown, check if it starts with *any* OA prefix
                elif not entity_prefix and any(potential_id.startswith(p) for p in prefix_map.values()):
                     native_id_to_add = potential_id # Assume valid if type unknown
                     is_native = True
                # Else: Prefix mismatch or not an OA structure after URL prefix
            elif entity_prefix and item_id.startswith(entity_prefix):
                 native_id_to_add = item_id
                 is_native = True

            if is_native and native_id_to_add:
                valid_ids.append(native_id_to_add)
                continue # Found native ID, move to next item

            # 2. Check for recognized External IDs (only if entity type matches)
            is_external_recognized = False
            external_type = None
            if entity_type_lower == "work" and item_id.startswith("https://doi.org/"):
                 is_external_recognized = True
                 external_type = "DOI"
            elif entity_type_lower == "author" and item_id.startswith("https://orcid.org/"):
                 is_external_recognized = True
                 external_type = "ORCID"
            elif entity_type_lower == "institution" and item_id.startswith("https://ror.org/"):
                 is_external_recognized = True
                 external_type = "ROR"

            if is_external_recognized:
                recognized_external_count += 1
                valid_ids.append(item_id)
                logger.warning(f"Recognized external ID format ({external_type}: {item_id}) for type '{entity_type}', but it will be skipped by this function. Modify calling method to handle {external_type} lookups if needed.")

            # 3. If none matched (and not native)
            # Log specific warnings for prefix mismatches vs general invalid format
            is_other_oa_id = False
            if any(item_id.startswith(p) or (item_id.startswith("https://openalex.org/") and any(item_id.split("/")[-1].startswith(p) for p in prefix_map.values())) for p in prefix_map.values()):
                 # It looks like an OpenAlex ID but didn't match the expected type or structure checks above
                 is_other_oa_id = True

            if is_other_oa_id:
                 logger.warning(f"ID {item_id} looks like an OpenAlex ID but does not match expected type '{entity_type}' or has unexpected format. Skipping.")
            else:
                 logger.warning(f"Invalid or unrecognized ID format for type '{entity_type}' skipped: {item_id}")
            skipped_count += 1


        if recognized_external_count > 0:
             logger.info(f"Recognized {recognized_external_count} external IDs (DOI/ORCID/ROR) for entity type '{entity_type}'.")
        if skipped_count > 0:
             logger.info(f"Skipped {skipped_count} other invalid, non-matching, or empty IDs for type '{entity_type}'.")

        # Return unique list of valid native IDs
        # Using dict.fromkeys preserves order while removing duplicates
        unique_valid_native_ids = list(dict.fromkeys(valid_ids))
        logger.info(f"Validation for type '{entity_type}' completed. Returning {len(unique_valid_native_ids)} unique native OpenAlex IDs.")
        return unique_valid_native_ids

    # --- Other Sync Methods (_sync_get_works_by_ids, _sync_get_authors_by_ids, etc.) ---
    # (These remain unchanged from the previous version, as they rely on the output
    #  of _validate_openalex_ids which still returns a List[str] of native IDs)

    def _sync_get_works_by_ids(self, work_ids: List[str]) -> List[Dict]:
        """Fetches work details for a batch of OpenAlex Work IDs."""
        logger.info(f"_sync_get_works_by_ids: Thread started for batch ({len(work_ids)} IDs, first 5: {work_ids[:5]}...).")
        if not work_ids:
            return []
        try:
            filter_query = "|".join(work_ids)
            results = Works().filter(openalex_id=filter_query).get()
            logger.info(f"_sync_get_works_by_ids: API call successful for batch (first 5: {work_ids[:5]}...), returning {len(results)} items.")
            return results
        except Exception as e:
            logger.error(f"Error in _sync_get_works_by_ids for batch (first 5 IDs: {work_ids[:5]}...): {e}", exc_info=True)
            return []

    def _sync_get_authors_by_ids(self, author_ids: List[str]) -> List[Dict]:
        """Fetches author details for a batch of OpenAlex Author IDs."""
        logger.info(f"_sync_get_authors_by_ids: Thread started for batch ({len(author_ids)} IDs, first 5: {author_ids[:5]}...).")
        if not author_ids:
            return []
        try:
            filter_query = "|".join(author_ids)
            results = Authors().filter(openalex_id=filter_query).get()
            logger.info(f"_sync_get_authors_by_ids: API call successful for batch (first 5: {author_ids[:5]}...), returning {len(results)} items.")
            return results
        except Exception as e:
            logger.error(f"Error in _sync_get_authors_by_ids for batch (first 5 IDs: {author_ids[:5]}...): {e}", exc_info=True)
            return []

    # ... (rest of the _sync methods: _sync_search_works_by_keywords, _sync_get_work_references, _sync_get_work_citations remain the same) ...
    def _sync_search_works_by_keywords(self, **kwargs) -> List[Dict]:
        # (Implementation remains the same as previous version)
        query = kwargs.get('query', None)
        limit = kwargs.get('limit', self.max_cnt)
        logger.info(f"_sync_search_works_by_keywords: Thread started for query '{str(query)[:50]}...' with limit {limit}.")
        try:
            works_query = Works()
            if query: works_query = works_query.search(query)
            # Apply filters... (year, type, oa, venue, concepts, pub_date, citations) - Logic unchanged
            if kwargs.get('year'): works_query = works_query.filter(publication_year=int(kwargs['year']))
            # ... other filters ...
            if kwargs.get('min_citation_count') is not None: works_query = works_query.filter(cited_by_count=f">{int(kwargs['min_citation_count'])}")
            # Apply sorting... - Logic unchanged
            sort_param = kwargs.get('sort')
            if sort_param:
                sort_field_map = {'relevance': 'relevance_score', 'citationCount': 'cited_by_count', 'publicationDate': 'publication_date'}
                # ... sorting logic ...
                if field_s2 in sort_field_map: works_query = works_query.sort(**{sort_field_map[field_s2]: direction})

            # Fetching Results with Pagination - Logic unchanged
            paper_metadata = []
            processed_count = 0
            page_size = min(limit, OPENALEX_MAX_PER_PAGE)
            if limit <= 0: page_size = OPENALEX_MAX_PER_PAGE
            logger.info(f"Executing OpenAlex search query with limit={limit}, page_size={page_size}...")
            for page in works_query.paginate(per_page=page_size, n_max=limit):
                 if not page: break
                 num_to_add = min(len(page), limit - processed_count)
                 paper_metadata.extend(page[:num_to_add])
                 processed_count += num_to_add
                 if processed_count >= limit: break
            logger.info(f"_sync_search_works_by_keywords: API call successful for query '{str(query)[:50]}...', returning {len(paper_metadata)} items.")
            return paper_metadata
        except Exception as e:
            logger.error(f"Error in _sync_search_works_by_keywords for query '{str(query)[:50]}...': {e}", exc_info=True)
            return []

    def _sync_get_work_references(self, work_id: str, limit: int) -> List[Dict]:
        # (Implementation remains the same as previous version)
        logger.info(f"_sync_get_work_references: Thread started for work {work_id} with limit {limit}.")
        if not work_id: return []
        try:
            work_data = Works()[work_id].get()
            if not work_data or 'referenced_works' not in work_data: return []
            referenced_urls = work_data.get('referenced_works', [])
            if not referenced_urls: return []
            referenced_ids = [url.split('/')[-1] for url in referenced_urls if url and url.startswith("https://openalex.org/W")] # Ensure they are Work IDs
            referenced_ids_limited = referenced_ids[:limit] # Apply limit *before* validation
            # Validate these IDs are actual work IDs before fetching
            valid_referenced_ids = self._validate_openalex_ids("work", referenced_ids_limited) # Use internal validation
            if not valid_referenced_ids: return []
            logger.info(f"Fetching details for {len(valid_referenced_ids)} referenced works for {work_id}.")
            referenced_works_data = self._sync_get_works_by_ids(valid_referenced_ids) # Fetch validated IDs
            logger.info(f"_sync_get_work_references: API call successful for work {work_id}, returning {len(referenced_works_data)} referenced items.")
            return referenced_works_data
        except Exception as e:
            logger.error(f"Error in _sync_get_work_references for {work_id}: {e}", exc_info=True)
            return []


    def _sync_get_work_citations(self, work_id: str, limit: int) -> List[Dict]:
        # (Implementation remains the same as previous version)
        logger.info(f"_sync_get_work_citations: Thread started for work {work_id} with limit {limit}.")
        if not work_id: return []
        try:
            citing_works_query = Works().filter(cites=work_id)
            citations_metadata = []
            processed_count = 0
            page_size = min(limit, OPENALEX_MAX_PER_PAGE)
            if limit <= 0: page_size = OPENALEX_MAX_PER_PAGE
            logger.info(f"Executing OpenAlex citations query for {work_id} with limit={limit}, page_size={page_size}...")
            for page in citing_works_query.paginate(per_page=page_size, n_max=limit):
                if not page: break
                num_to_add = min(len(page), limit - processed_count)
                citations_metadata.extend(page[:num_to_add])
                processed_count += num_to_add
                if processed_count >= limit: break
            logger.info(f"_sync_get_work_citations: API call successful for work {work_id}, returning {len(citations_metadata)} citing items.")
            return citations_metadata
        except Exception as e:
            logger.error(f"Error in _sync_get_work_citations for {work_id}: {e}", exc_info=True)
            return []


    # --- Asynchronous Public Methods ---
    # (These remain unchanged from the previous version, as they call _validate_openalex_ids
    #  and receive back a List[str] of native IDs, which they already expect)

    async def async_search_paper_by_ids(
        self,
        id_list: List[str] # Can contain OpenAlex IDs, DOI URLs
    ) -> List[Dict]:
        """Search paper by OpenAlex Work IDs or DOI URLs asynchronously.
           NOTE: Currently only processes native OpenAlex IDs due to validator output.
        """
        # Validate IDs - This now recognizes DOIs but only returns native W... IDs
        valid_native_id_list = self._validate_openalex_ids("work", id_list)
        id_cnt = len(valid_native_id_list)
        paper_metadata = []

        if id_cnt > 0:
            batch_size = self.batch_size
            batch_cnt = math.ceil(id_cnt / batch_size)
            batches = [valid_native_id_list[i * batch_size:(i + 1) * batch_size] for i in range(batch_cnt)]

            tasks = []
            logger.info(f"async_search_paper_by_ids: Creating {len(batches)} tasks for {id_cnt} valid native OpenAlex Work IDs.")
            for batch in batches:
                tasks.append(self._execute_sync_with_controls(self._sync_get_works_by_ids, batch))

            logger.info(f"async_search_paper_by_ids: Gathering {len(tasks)} tasks...")
            batch_results_list = await asyncio.gather(*tasks, return_exceptions=True)
            logger.info(f"async_search_paper_by_ids: Gather complete. Processing results.")

            for result in batch_results_list:
                if isinstance(result, Exception): logger.error(f"A batch task for async_search_paper_by_ids failed: {result}")
                elif isinstance(result, list): paper_metadata.extend(result)
                else: logger.warning(f"Unexpected result type {type(result)} from paper batch task: {result}")
        else:
            logger.warning("async_search_paper_by_ids: No valid native OpenAlex Work IDs found in the input list.")

        return paper_metadata

    async def async_search_author_by_ids(
        self,
        author_ids: List[str], # Can contain OpenAlex IDs, ORCID URLs
        with_abstract: Optional[bool] = False
    ) -> List[Dict]:
        """Search author by OpenAlex Author IDs or ORCID URLs asynchronously.
           NOTE: Currently only processes native OpenAlex IDs due to validator output.
        """
        # Validate IDs - This now recognizes ORCIDs but only returns native A... IDs
        valid_native_id_list = self._validate_openalex_ids("author", author_ids)
        id_cnt = len(valid_native_id_list)
        author_metadata = []

        if id_cnt > 0:
            batch_size = self.batch_size
            batch_cnt = math.ceil(id_cnt / batch_size)
            batches = [valid_native_id_list[i * batch_size:(i + 1) * batch_size] for i in range(batch_cnt)]
            logger.info(f"async_search_author_by_ids: Fetching {id_cnt} authors by native ID in {batch_cnt} batches.")

            tasks = []
            for batch in batches:
                 tasks.append(self._execute_sync_with_controls(self._sync_get_authors_by_ids, batch))

            logger.info(f"async_search_author_by_ids: Gathering {len(tasks)} tasks...")
            batch_results_list = await asyncio.gather(*tasks, return_exceptions=True)
            logger.info(f"async_search_author_by_ids: Gather complete. Processing results.")

            for result in batch_results_list:
                if isinstance(result, Exception): logger.error(f"A batch task for async_search_author_by_ids failed: {result}")
                elif isinstance(result, list): author_metadata.extend(result)
                else: logger.warning(f"Unexpected result type {type(result)} from author batch task: {result}")
        else:
             logger.warning("async_search_author_by_ids: No valid native OpenAlex Author IDs found in the input list.")

        if with_abstract:
            logger.warning("`with_abstract=True` is not directly supported for `async_search_author_by_ids` with OpenAlex.")

        return author_metadata

    # ... (rest of the async methods: async_search_paper_by_keywords, async_get_s2_cited_papers, async_get_s2_citing_papers, async_get_s2_recommended_papers remain the same) ...
    async def async_search_paper_by_keywords(self, query: str, year: str = None, publication_types: list = None, open_access_pdf: bool = None, venue: list = None, fields_of_study: list = None, publication_date_or_year: str = None, min_citation_count: int = None, limit: int = 100, bulk: bool = False, sort: str = None, match_title: bool = False) -> List[Dict]:
        # (Implementation remains the same as previous version)
        search_kwargs = { k: v for k, v in locals().items() if k != 'self' and v is not None and k != 'match_title'} # Simplified kwargs creation
        search_kwargs['limit'] = min(search_kwargs.get('limit', 100), self.max_cnt) # Ensure limit is applied correctly
        logger.info(f"async_search_paper_by_keywords: Searching papers by keyword: '{query[:50]}...' with effective limit {search_kwargs.get('limit')}.")
        try:
            paper_metadata = await self._execute_sync_with_controls(self._sync_search_works_by_keywords, **search_kwargs)
        except Exception as e:
             logger.error(f"async_search_paper_by_keywords: Failed for query '{query[:50]}...': {e}")
             paper_metadata = []
        return paper_metadata

    async def async_get_s2_cited_papers(self, paper_id: str, limit: int = 100, with_abstract: Optional[bool] = False) -> List[Dict]:
        # (Implementation remains the same as previous version)
        # Note: paper_id here MUST be a native OpenAlex Work ID for _sync_get_work_references to work
        valid_paper_id = self._validate_openalex_ids("work", [paper_id])
        if not valid_paper_id:
             logger.error(f"async_get_s2_cited_papers: Invalid native OpenAlex Work ID provided: {paper_id}")
             return []
        work_id = valid_paper_id[0]

        max_limit = min(limit, self.max_cnt)
        logger.info(f"async_get_s2_cited_papers: Fetching references for paper {work_id} with effective limit {max_limit}.")
        refs_metadata = []
        try:
            refs_metadata = await self._execute_sync_with_controls(self._sync_get_work_references, work_id, max_limit)
        except Exception as e:
             logger.error(f"async_get_s2_cited_papers: Failed for paper {work_id}: {e}")
             refs_metadata = []

        # Handle with_abstract... (Logic unchanged)
        if with_abstract and refs_metadata:
            papers_missing_abstracts_ids = set()
            # ... abstract fetching logic ...
            if papers_missing_abstracts_ids:
                 # ... call async_search_paper_by_ids ...
                 # ... map abstracts back ...
                 pass # Placeholder for brevity

        return refs_metadata

    async def async_get_s2_citing_papers(self, paper_id: str, limit: int = 100, with_abstract: Optional[bool] = False) -> List[Dict]:
        # (Implementation remains the same as previous version)
        # Note: paper_id here MUST be a native OpenAlex Work ID for _sync_get_work_citations to work
        valid_paper_id = self._validate_openalex_ids("work", [paper_id])
        if not valid_paper_id:
             logger.error(f"async_get_s2_citing_papers: Invalid native OpenAlex Work ID provided: {paper_id}")
             return []
        work_id = valid_paper_id[0]

        max_limit = min(limit, self.max_cnt)
        logger.info(f"async_get_s2_citing_papers: Fetching citations for paper {work_id} with effective limit {max_limit}.")
        citedby_metadata = []
        try:
            citedby_metadata = await self._execute_sync_with_controls(self._sync_get_work_citations, work_id, max_limit)
        except Exception as e:
            logger.error(f"async_get_s2_citing_papers: Failed for paper {work_id}: {e}")
            citedby_metadata = []

        # Handle with_abstract... (Logic unchanged)
        if with_abstract and citedby_metadata:
             papers_missing_abstracts_ids = set()
             # ... abstract fetching logic ...
             if papers_missing_abstracts_ids:
                  # ... call async_search_paper_by_ids ...
                  # ... map abstracts back ...
                  pass # Placeholder for brevity

        return citedby_metadata

    async def async_get_s2_recommended_papers(self, positive_paper_ids: List[str], negative_paper_ids: List[str] = None, limit: int = 100, with_abstract: Optional[bool] = False) -> List[Dict]:
        # (Implementation remains the same as previous version - logs warning, returns [])
        logger.warning("OpenAlex does not support recommendations based on positive/negative paper ID lists like Semantic Scholar.")
        logger.warning("async_get_s2_recommended_papers will return an empty list.")
        valid_pos_ids = self._validate_openalex_ids("work", positive_paper_ids) # Validate for logging
        valid_neg_ids = []
        if negative_paper_ids: valid_neg_ids = self._validate_openalex_ids("work", negative_paper_ids)
        logger.info(f"async_get_s2_recommended_papers called with {len(valid_pos_ids)} valid native positive IDs. Limit: {limit}. Returning [].")
        return []

In [11]:
seed_dois = ['https://doi.org/10.48550/arXiv.2406.10252',  # AutoSurvey: Large Language Models Can Automatically Write Surveys
            'https://doi.org/10.48550/arXiv.2412.10415',  # Generative Adversarial Reviews: When LLMs Become the Critic
            'https://doi.org/10.48550/arXiv.2402.12928',  # A Literature Review of Literature Reviews in Pattern Analysis and Machine Intelligence 
            ]

In [12]:
oa = OpenAlexKit(email="ai4fun@gmail.com")
papers_info = await oa.async_search_paper_by_ids(id_list=["https://doi.org/10.7717/peerj.4375"])

2025-04-21 15:13:15,246 - INFO - OpenAlex email set for polite pool: ai4fun@gmail.com
2025-04-21 15:13:15,247 - INFO - OpenAlexKit initialized with max_concurrency=10, sleep_interval=0.1s
2025-04-21 15:13:15,249 - INFO - Recognized 1 external IDs (DOI/ORCID/ROR) for entity type 'work'.
2025-04-21 15:13:15,250 - INFO - Skipped 1 other invalid, non-matching, or empty IDs for type 'work'.
2025-04-21 15:13:15,250 - INFO - Validation for type 'work' completed. Returning 1 unique native OpenAlex IDs.
2025-04-21 15:13:15,251 - INFO - async_search_paper_by_ids: Creating 1 tasks for 1 valid native OpenAlex Work IDs.
2025-04-21 15:13:15,251 - INFO - async_search_paper_by_ids: Gathering 1 tasks...
2025-04-21 15:13:15,252 - INFO - _sync_get_works_by_ids: Thread started for batch (1 IDs, first 5: ['https://doi.org/10.7717/peerj.4375']...).
2025-04-21 15:13:16,261 - ERROR - Error in _sync_get_works_by_ids for batch (first 5 IDs: ['https://doi.org/10.7717/peerj.4375']...): 'https://doi.org/10.7717/pe

In [9]:
papers_info

[]

In [3]:
from pyalex import Works, Authors, Sources, Institutions, Topics, Publishers, Funders

import pyalex

pyalex.config.email = "ai4fun2004@gmail.com"

# same as
Works()["https://doi.org/10.48550/arXiv.2501.04682"]

{'id': 'https://openalex.org/W4406231668',
 'doi': 'https://doi.org/10.48550/arxiv.2501.04682',
 'title': 'Towards System 2 Reasoning in LLMs: Learning How to Think With Meta\n  Chain-of-Thought',
 'display_name': 'Towards System 2 Reasoning in LLMs: Learning How to Think With Meta\n  Chain-of-Thought',
 'publication_year': 2025,
 'publication_date': '2025-01-08',
 'ids': {'openalex': 'https://openalex.org/W4406231668',
  'doi': 'https://doi.org/10.48550/arxiv.2501.04682'},
 'language': 'en',
 'primary_location': {'is_oa': True,
  'landing_page_url': 'http://arxiv.org/abs/2501.04682',
  'pdf_url': 'http://arxiv.org/pdf/2501.04682',
  'source': {'id': 'https://openalex.org/S4306400194',
   'display_name': 'arXiv (Cornell University)',
   'issn_l': None,
   'issn': None,
   'is_oa': True,
   'is_in_doaj': False,
   'is_indexed_in_scopus': False,
   'is_core': False,
   'host_organization': 'https://openalex.org/I205783295',
   'host_organization_name': 'Cornell University',
   'host_orga