In [1]:
import requests
import pandas as pd
import json
import logging
from math import ceil
from concurrent.futures import ThreadPoolExecutor, as_completed

# Configure logging to write to debug.log with detailed formatting.
logging.basicConfig(
    filename='debug.log',
    level=logging.DEBUG,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

def get_email():
    """
    Reads the email address from the 'email.json' file.
    Returns:
        str: The email address.
    """
    logging.debug("Attempting to read email from '../.config/email.json'.")
    try:
        with open("../.config/email.json", "r") as file:
            data = json.load(file)
        email = data.get("email")
        if email:
            logging.debug("Successfully retrieved email: %s", email)
        else:
            logging.warning("Email not found in the configuration file.")
        return email
    except Exception as e:
        logging.exception("Error reading the email configuration: %s", e)
        raise

def fetch_api_data_with_session(url, session):
    """
    Fetches data from the API using the provided session.
    """
    logging.debug("Fetching API data from URL with session: %s", url)
    try:
        response = session.get(url)
        logging.debug("Received response with status code: %s", response.status_code)
        response.raise_for_status()
        data = response.json()
        logging.debug("Successfully fetched and parsed API data.")
        return data
    except Exception as e:
        logging.exception("Error fetching API data from %s: %s", url, e)
        raise

def parse_results(json_data):
    """
    Parses the JSON data to extract specific fields for each work.
    Handles both a dictionary with a "results" key and a list of works.
    
    Returns:
        list: A list of dictionaries containing selected fields.
    """
    logging.debug("Parsing JSON results.")
    results = []
    if isinstance(json_data, dict):
        works = json_data.get("results", [])
    elif isinstance(json_data, list):
        works = json_data
    else:
        logging.error("Unexpected json_data format: %s", type(json_data))
        return results

    for work in works:
        # Use get("id", "") to ensure a string is returned, then remove the prefix.
        work_id = work.get("id", "").removeprefix("https://openalex.org/")
        doi = (work.get("doi") or "").removeprefix("https://doi.org/")
        title = work.get("title", "")
        primary_topic = work.get("primary_topic", {})
        subfield = primary_topic.get("subfield", {})
        subfield_display_name = subfield.get("display_name", "")
        referenced_works_count = work.get("referenced_works_count", 0)
        referenced_works = work.get("referenced_works", [])
        cited_by_api_url = work.get("cited_by_api_url", "")
        
        results.append({
            "id": work_id,
            "doi": doi,
            "title": title,
            "subfield_display_name": subfield_display_name,
            "referenced_works_count": referenced_works_count,
            "referenced_works": referenced_works,
            "cited_by_api_url": cited_by_api_url
        })
    logging.debug("Parsed %s records from JSON data.", len(results))
    return results

def fetch_all_data(base_url, email):
    """
    Fetches all data from the API using connection pooling and concurrency.
    First, the first page is fetched to determine the total count of records.
    Then, remaining pages are fetched concurrently.
    
    Returns:
        list: A combined list of all parsed work records.
    """
    logging.debug("Starting to fetch all data from the API using a session.")
    per_page = 10
    all_results = []
    session = requests.Session()

    # Fetch the first page
    first_page_url = f"{base_url}&page=1&per_page={per_page}&mailto={email}"
    first_page_data = fetch_api_data_with_session(first_page_url, session)
    first_page_results = parse_results(first_page_data)
    all_results.extend(first_page_results)
    
    # Determine total pages (if meta info is provided)
    total_count = None
    if isinstance(first_page_data, dict):
        meta = first_page_data.get("meta", {})
        total_count = meta.get("count")
    total_pages = ceil(total_count / per_page) if total_count else 1
    logging.debug("Total count: %s, Total pages: %s", total_count, total_pages)

    # Fetch remaining pages concurrently if there are more pages
    if total_pages > 1:
        with ThreadPoolExecutor(max_workers=2) as executor:
            futures = []
            for page in range(2, total_pages + 1):
                page_url = f"{base_url}&page={page}&per_page={per_page}&mailto={email}"
                logging.debug("Submitting fetch for page %s", page)
                futures.append(executor.submit(fetch_api_data_with_session, page_url, session))
            for future in as_completed(futures):
                page_data = future.result()
                page_results = parse_results(page_data)
                logging.debug("Fetched %s records from a page.", len(page_results))
                all_results.extend(page_results)
    logging.debug("Completed fetching all data. Total records: %s", len(all_results))
    return all_results

# Main execution block
if __name__ == "__main__":
    logging.debug("Program started.")
    try:
        # Read the email from the file
        email = get_email()
        if not email:
            logging.error("Email is required to proceed. Exiting program.")
            exit(1)
        
        # Define the API endpoint URL (without page and per_page parameters)
        base_url = (
            "https://api.openalex.org/works?select=id,doi,title,publication_year,"
            "primary_topic,referenced_works_count,referenced_works,cited_by_api_url&"
            "filter=authorships.countries:countries/br,primary_topic.field.id:fields/17,"
            "publication_year:2024, authorships.institutions.lineage:i52418104"
            "&sort=publication_year:desc"
        )
        
        logging.debug("Base API URL: %s", base_url)
        # Append the mailto parameter using the email from the file
        api_url = f"{base_url}&mailto={email}"
        logging.debug("Full API URL with mailto: %s", api_url)
        
        # Fetch all data (this returns a list of parsed records)
        all_results = fetch_all_data(api_url, email)
        df = pd.DataFrame(all_results)
        logging.debug("Data loaded into DataFrame with %s records.", len(df))
        
        # Optionally, save the DataFrame to a CSV file:
        # df.to_csv("openalex_data.csv", index=False)
        logging.debug("Program completed successfully.")
    except Exception as e:
        logging.exception("An error occurred during execution: %s", e)
        raise


HTTPError: 403 Client Error: FORBIDDEN for url: https://api.openalex.org/works?select=id,doi,title,publication_year,primary_topic,referenced_works_count,referenced_works,cited_by_api_url&filter=authorships.countries:countries/br,primary_topic.field.id:fields/17,publication_year:2024,%20authorships.institutions.lineage:i52418104&sort=publication_year:desc&mailto=vinemioto@gmail.com&page=1&per_page=10&mailto=vinemioto@gmail.com

In [None]:
len(df)

In [None]:
df

In [None]:
df.to_csv("openalex_data.csv", index=False)