# Utility functions used throughout analysis

Imports key libraries and defines utility functions for reading and wrangling files (including .ris and .bib files), retrieving a Zotero collection from an account, and creating and searching for combinations of regex patterns for literature searches.

In [None]:
#Import necessary libraries
import collections
from datetime import datetime, UTC
from inspect import getsourcefile
import itertools
import litstudy  # Use pip install git+https://github.com/NLeSC/litstudy to download dev version
import numpy as np
import os
import pandas as pd
from pathlib import Path
import pickle
from pyzotero import zotero
import re
import requests
import shutil
from typing import List, Union, Dict, Any, Optional
from urllib.parse import urlparse

In [4]:
def regex_list_files(in_dir: Union[str, Path],
                     in_pattern: str,
                     full_path: bool = True) -> List[str]:
    """
    Recursively collect files under a directory whose names match a regex.

    The pattern is applied with ``re.match``, so it is anchored at the start
    of the bare filename (never the full path).

    Args:
        in_dir: Root directory to walk, as a string or ``pathlib.Path``.
        in_pattern: Regular expression tested against each filename.
        full_path: When True, each result is the filename joined onto the
            directory it was found in; when False, just the filename.

    Returns:
        A list of strings in ``os.walk`` order; empty when nothing matches.

    Raises:
        TypeError: If an argument has the wrong type.
        ValueError: If ``in_dir`` is not an existing directory.
        re.error: If ``in_pattern`` cannot be compiled.
    """
    # Validate argument types up front, one rule per parameter.
    type_rules = (
        (in_dir, (str, Path), "in_dir must be a string or pathlib.Path object."),
        (in_pattern, str, "in_pattern must be a string."),
        (full_path, bool, "full_path must be a boolean."),
    )
    for value, expected, message in type_rules:
        if not isinstance(value, expected):
            raise TypeError(message)

    # Work with a Path from here on, whatever was passed in.
    search_root = Path(in_dir)
    if not search_root.is_dir():
        raise ValueError(f"The directory '{search_root}' does not exist.")

    try:
        matcher = re.compile(in_pattern)
    except re.error as e:
        raise re.error(f"Invalid regular expression pattern: {e}")

    def _render(folder, filename):
        # Results are always plain strings, with or without the folder part.
        return str(Path(folder).joinpath(filename)) if full_path else filename

    return [
        _render(folder, filename)
        for folder, _, filenames in os.walk(search_root)
        for filename in filenames
        if matcher.match(filename)
    ]

In [6]:
def rpickle_bibdocset(in_dirpath: Path, 
                      in_pattern: str, 
                      out_pickle: Path) -> list:
    """Loads BibTeX files matching a pattern, combines them, and pickles the result.

    If the output pickle file already exists, the data is loaded from it and
    the BibTeX files are not touched. Otherwise every file directly inside
    ``in_dirpath`` whose name matches ``in_pattern`` (``re.match``, anchored at
    the start of the filename) is read with ``litstudy.load_bibtex`` in sorted
    path order and the concatenated list is cached to ``out_pickle``.

    A file that fails to load is reported and skipped; the remaining files are
    still read. When any file failed, the partial result is returned but NOT
    cached, so an incomplete set is never silently reused on later runs.

    Args:
        in_dirpath: Path to the directory containing BibTeX files.
        in_pattern: Regex pattern to match filenames within in_dirpath.
        out_pickle: Path to the output pickle file where the combined list
                    of references will be saved/loaded from.

    Returns:
        A list of reference objects loaded from the BibTeX files or pickle file.

    Raises:
        FileNotFoundError: If in_dirpath does not exist (and no cache exists).
        re.error: If in_pattern is not a valid regular expression.
        OSError, pickle.UnpicklingError, EOFError: If an existing cache file
            cannot be read; the error is printed and then re-raised.
    """
    if out_pickle.exists():
        # Read pre-saved document set.
        # NOTE: unpickling can execute arbitrary code; only point this at
        # cache files this project created itself.
        try:
            with open(out_pickle, 'rb') as f:
                return pickle.load(f)
        except (OSError, pickle.UnpicklingError, EOFError) as e:
            print(f"Error unpickling data from {out_pickle}: {e}")
            raise  # nothing sensible to return; surface the real cause

    compiled_pattern = re.compile(in_pattern)
    if not in_dirpath.is_dir():
        raise FileNotFoundError(f"Input directory not found: {in_dirpath}")

    # Sorted so the order of references does not depend on the file system.
    bib_filepaths = sorted(p for p in in_dirpath.glob('*')
                           if p.is_file() and compiled_pattern.match(p.name))

    # Read bib files and join them (takes ~15-20 sec/1000 refs).
    reflist = []
    failed_paths = []
    for bib_path in bib_filepaths:
        try:
            reflist.extend(litstudy.load_bibtex(bib_path))
        except Exception as e:
            # Best effort: report the bad file and keep going with the rest.
            print(f"Error processing file {bib_path}: {e}")
            failed_paths.append(bib_path)

    if failed_paths:
        print(f"Not caching to {out_pickle}: {len(failed_paths)} file(s) failed to load.")
        return reflist

    # Pickle via a temporary file so a failed write never leaves a truncated cache.
    tmp_pickle = out_pickle.with_name(out_pickle.name + '.tmp')
    try:
        with open(tmp_pickle, 'wb') as f:
            pickle.dump(reflist, f)
        tmp_pickle.replace(out_pickle)
    except (OSError, pickle.PicklingError) as e:
        print(f"Error pickling data to {out_pickle}: {e}")
        tmp_pickle.unlink(missing_ok=True)
    return reflist


def get_zotero_collection_titles_dois(library_id: int | str, 
                 api_key_path: Path, 
                 collection_name: str) -> collections.defaultdict[str, list]:
    """Collects the title and DOI of every top-level item in a named Zotero collection.

    The library is opened as a ``'group'`` library. The collection is looked
    up by exact name among the results of ``collections_top()``, and its
    items are retrieved with ``everything(collection_items_top(...))``.

    Args:
        library_id: The Zotero library ID passed to ``zotero.Zotero``.
        api_key_path: Path to a text file holding the Zotero API key
            (surrounding whitespace is stripped).
        collection_name: The exact name of the Zotero collection.

    Returns:
        A defaultdict mapping each Zotero item key to ``[title, DOI]``, where
        a missing title becomes ``'N/A'`` and a missing DOI becomes ``np.nan``.

    Raises:
        FileNotFoundError: If api_key_path does not exist.
        ValueError: If no top-level collection has the given name.
        Exception: Any error raised while querying Zotero is printed and
            re-raised unchanged.
    """
    try:
        api_key = api_key_path.read_text().strip()
    except FileNotFoundError:
        print(f"API key file not found: {api_key_path}")
        raise

    zot = zotero.Zotero(library_id=library_id,
                        library_type='group',
                        api_key=api_key)

    # Locate the collection by name; any failure here (including "not found")
    # is reported and then propagated.
    try:
        matched = None
        for candidate in zot.collections_top():
            if candidate['data']['name'] == collection_name:
                matched = candidate
                break
        if matched is None:
            raise ValueError(f"Collection '{collection_name}' not found in Zotero library {library_id}.")
        collection_key = matched['key']
    except Exception as e:
        print(f"Error fetching Zotero collections: {e}")
        raise

    try:
        collection_items = zot.everything(zot.collection_items_top(collection_key))
    except Exception as e:
        print(f"Error fetching items from collection ID {collection_key}: {e}")
        raise

    titles_and_dois = collections.defaultdict(list)
    for item in collection_items:
        item_key = item['key']
        item_data = item.get('data', {})
        titles_and_dois[item_key].extend(
            [item_data.get('title', 'N/A'), item_data.get('DOI', np.nan)]
        )

    return titles_and_dois

# Tabulate title, source, year, abstract and DOI of the references returned from a search
# NOTE: a later definition of tabulate_searchlist in this file rebinds this name.
def tabulate_searchlist(in_reflist, out_csvpath):
    """Build (or reload) a table of basic fields for a list of references.

    When ``out_csvpath`` already exists it is simply read back with
    ``pd.read_csv``. Otherwise one row per reference is assembled, written to
    ``out_csvpath`` (including the integer index) and returned.

    Args:
        in_reflist: Iterable of reference objects exposing ``title``,
            ``publication_source``, ``publication_year``, ``abstract`` and a
            mapping ``entry`` that may contain ``'doi'``.
        out_csvpath: ``pathlib.Path`` of the CSV cache.

    Returns:
        A DataFrame with columns title, source, year, abstract, doi (plus the
        saved index column when reloaded from CSV).
    """
    if out_csvpath.exists():
        return pd.read_csv(out_csvpath)

    def _as_row(ref):
        # Title: newlines -> spaces, lower-cased, then everything except
        # letters, digits and whitespace removed.
        flat_title = ref.title.replace('\n', ' ').lower()
        row = [
            re.sub(r"[^a-zA-Z\d\s]", "", flat_title),
            ref.publication_source,
            ref.publication_year,
            ref.abstract,
        ]
        row.append(ref.entry['doi'] if 'doi' in ref.entry else np.nan)
        return row

    rows_by_position = {pos: _as_row(ref) for pos, ref in enumerate(in_reflist)}

    table = pd.DataFrame.from_dict(rows_by_position, orient='index')
    table.columns = ['title', 'source', 'year', 'abstract', 'doi']
    table.to_csv(out_csvpath)
    return table

# Write string y to file x
def write(x, y):
    """Append the string ``y`` followed by a newline to the file at ``x``.

    The file is opened in append mode, so it is created if missing and
    existing content is preserved.

    Args:
        x: Path of the file to append to.
        y: Text to append (must be a ``str``).

    Returns:
        None.
    """
    with open(x, 'a') as f:
        f.write(y + '\n')


def tabulate_searchlist(in_reflist: list, out_csvpath: Path) -> pd.DataFrame:
    """Converts a list of reference objects to a Pandas DataFrame and saves/loads it as CSV.

    Extracts title, source, year, abstract, and DOI. The title is cleaned:
    line breaks become spaces, every character other than letters, digits and
    whitespace is removed, and the result is lower-cased.
    If the output CSV file exists, the DataFrame is loaded from it and
    ``in_reflist`` is ignored.

    Args:
        in_reflist: A list of reference objects (assuming attributes like title, 
                    publication_source, publication_year, abstract, entry['doi']).
                    References whose title cannot be cleaned (e.g. missing or
                    None) are reported and skipped.
        out_csvpath: Path to the CSV file to save/load the DataFrame.

    Returns:
        A Pandas DataFrame with columns id, title, source, year, abstract, doi.
        ``id`` is the position of the reference in ``in_reflist``.

    Raises:
        OSError, pd.errors.EmptyDataError, Exception: If an existing CSV
            cannot be read; the error is printed and then re-raised.
    """
    # Fixed column list so an empty result still produces a readable CSV header.
    columns = ['id', 'title', 'source', 'year', 'abstract', 'doi']

    if out_csvpath.exists():
        try:
            return pd.read_csv(out_csvpath)
        except (IOError, pd.errors.EmptyDataError) as e:
            print(f"Error reading CSV from {out_csvpath}: {e}")
            raise  # no DataFrame to return; surface the real cause
        except Exception as e:
            print(f"Unexpected error reading CSV {out_csvpath}: {e}")
            raise

    processed_refs = []
    for i, ref in enumerate(in_reflist):
        try:
            # Remove line breaks and punctuation, convert to lower case
            cleaned_title = re.sub(r"[^a-zA-Z\d\s]", "", ref.title.replace('\n', ' ')).lower()
        except AttributeError as e:
            print(f"Warning: Skipping reference {i} due to missing attribute: {e}")
            continue  # Skip this reference

        # DOI is only available when the reference carries a dict-like entry.
        entry = getattr(ref, 'entry', None)
        doi = entry.get('doi', np.nan) if isinstance(entry, dict) else np.nan

        processed_refs.append({
            'id': i,
            'title': cleaned_title,
            'source': getattr(ref, 'publication_source', None),
            'year': getattr(ref, 'publication_year', None),
            'abstract': getattr(ref, 'abstract', None),
            'doi': doi,
        })

    reflist_pd = pd.DataFrame(processed_refs, columns=columns)

    # Best effort: a failed write is reported but the in-memory table is still returned.
    try:
        reflist_pd.to_csv(out_csvpath, index=False)
    except IOError as e:
        print(f"Error writing CSV to {out_csvpath}: {e}")

    return reflist_pd

def combine_2w_regex(pattern1: str, 
                     pattern2: str, 
                     precede: bool = False) -> str:
    """Builds a regex pattern to find two words separated by non-word characters.

    The two patterns may be separated by zero or more non-word characters
    (``\\W*``), and the second one must end at a word boundary (``\\b``).

    Args:
        pattern1: The first word/pattern.
        pattern2: The second word/pattern.
        precede: If True, pattern1 must precede pattern2. 
                 If False (default), finds either order (pattern1..pattern2 or pattern2..pattern1).

    Returns:
        The regex pattern as a string (not compiled).
    """
    p1_p2 = rf"{pattern1}\W*{pattern2}\b"
    if precede:
        return p1_p2

    # Either order: alternate between the forward and the reversed sequence.
    p2_p1 = rf"{pattern2}\W*{pattern1}\b"
    return f"({p1_p2})|({p2_p1})"

# Count number of times a simple 2-pattern group occurs in text
def find_2w_regex(text: str, 
                  pattern1: str, 
                  pattern2: str, 
                  precede: bool = False) -> int:
    """Counts occurrences of a two-word pattern in text using regex.

    The search is case-insensitive and uses the pattern produced by
    ``combine_2w_regex``.

    Args:
        text: The text to search within.
        pattern1: The first word/pattern.
        pattern2: The second word/pattern.
        precede: If True, pattern1 must precede pattern2. 
                 If False (default), finds either order.

    Returns:
        The number of non-overlapping times the pattern occurs.
    """
    regexp_pattern = combine_2w_regex(pattern1, pattern2, precede)

    # finditer yields one match object per occurrence regardless of how many
    # capture groups the pattern contains.
    return sum(1 for _ in re.finditer(regexp_pattern, text, flags=re.IGNORECASE))

# Count the number of times a simple pattern occurs in text
def find_regex(text: str, pattern: str) -> int:
    """Return how many times a regex pattern matches in a text.

    Matching is case-insensitive and matches do not overlap.

    Args:
        text: The text to scan.
        pattern: The regex pattern string.

    Returns:
        The number of non-overlapping matches of the pattern in the text.
    """
    hits = re.finditer(f"{pattern}", text, flags=re.IGNORECASE)
    return sum(1 for _ in hits)

# Join all strings in a list with a separator ('|' by default) and parentheses
def recomb(in_str: Union[List[str], str], 
           recomb_sep: str = '|') -> str:
    """Turn a list of patterns into one parenthesised regex group.

    Each element is wrapped in its own parentheses, the wrapped elements are
    joined with ``recomb_sep``, and the result is enclosed in two further
    pairs of parentheses. Anything that is not a ``list`` is returned as is.

    Example: ['a', 'b'] -> '(((a)|(b)))'
             'abc' -> 'abc'

    Args:
        in_str: A list of strings (patterns) or a single string.
        recomb_sep: The separator placed between the wrapped patterns
            (default '|', i.e. regex alternation).

    Returns:
        The combined regex string, or the input unchanged when it is not a list.
    """
    if not isinstance(in_str, list):
        return in_str

    wrapped = [f"({piece})" for piece in in_str]
    alternation = recomb_sep.join(wrapped)
    return f"(({alternation}))"

# Find patterns in text based on search dictionary
def combo_refind(in_searchdict: dict, text: str) -> int | None:
        """Searches text based on patterns defined in a dictionary. 

    Args:
        in_searchdict: A dictionary where values define search patterns. 
                           Expected structure (example):
                           { 'key1': ('with', ('patternA', 'patternB')),
                             'key2': ('pre', ('patternC', 'patternD')),
                             'key3': ('single', 'patternE'), # Using 'single' instead of None
                             'key4': 'patternF' # Assuming this means single pattern search
                           }
        text: The text to search within.

    Returns:
        The count of matches found for the first applicable definition, or None if 
        no definition is processed correctly or an unknown type is encountered early.
    """
    for regexp_combo in in_searchdict.values():
        if regexp_combo[0] == 'with':
            k = find_2w_regex(text, 
                              regexp_combo[1][0], 
                              regexp_combo[1][1], 
                              precede=False)
        elif regexp_combo[0] == 'pre':
            k = find_2w_regex(text,
                              regexp_combo[1][0], 
                              regexp_combo[1][1], 
                              precede=True)
        elif regexp_combo[0] is None:
            k = find_regex(text, regexp_combo)
        else:
            break
        return k

# Generate n-grams from DOI
#CHECK OUT: from pattern.en import ngrams
#print(ngrams("He goes to hospital", n=2))
def DOI_ngram(A):
    """Count 2- to 4-grams in two texts and lower-cased terms in a third field.

    ``A[0]`` and ``A[1]`` are each stripped of the characters ``' ? . , :``,
    lower-cased, tokenised with ``nltk.word_tokenize`` and expanded with
    ``nltk.everygrams(tokens, 2, 4)``. ``A[2]`` is iterated as a collection of
    groups of strings whose lower-cased members are counted together.

    NOTE(review): relies on ``nltk`` being available in the enclosing
    namespace; it is not among the imports in the visible import cell.
    NOTE(review): what A[0], A[1] and A[2] represent (e.g. title, abstract,
    keywords) is not visible here -- confirm against the caller.

    Args:
        A: Indexable with at least three elements: two strings followed by an
            iterable of iterables of strings.

    Returns:
        A single list of ``(item, count)`` tuples: the ``most_common()``
        ranking for ``A[0]``, then for ``A[1]``, then for ``A[2]``.
    """
    punctuation_to_drop = str.maketrans('', '', "'?.,:")

    def _ranked_everygrams(raw_text):
        # Strip punctuation, lower-case, tokenise, then rank every 2..4-gram.
        cleaned = raw_text.translate(punctuation_to_drop).lower()
        tokens = nltk.word_tokenize(cleaned)
        return collections.Counter(nltk.everygrams(tokens, 2, 4)).most_common()

    ranked_first = _ranked_everygrams(A[0])
    ranked_second = _ranked_everygrams(A[1])

    term_counts = collections.Counter()
    for group in A[2]:
        term_counts += collections.Counter([term.lower() for term in group])
    ranked_terms = term_counts.most_common()

    return ranked_first + ranked_second + ranked_terms

In [8]:
def list_of_dicts_to_dataframe(
    list_of_dicts: List[Dict[str, Any]], 
    keys_to_keep: Optional[List[str]] = None
) -> pd.DataFrame:
    """
    Build a Pandas DataFrame from a list of dictionaries, one row per
    dictionary, restricted to a chosen set of keys. A key absent from a
    dictionary yields a missing value in that row.

    Args:
        list_of_dicts: The list of dictionaries.
        keys_to_keep: Optional list of keys to use as columns, in that order.
            If None, every key found in any dictionary is used: the keys of
            the first dictionary come first, followed by additional keys in
            order of first appearance.

    Returns:
        A Pandas DataFrame (completely empty when ``list_of_dicts`` is empty
        and ``keys_to_keep`` is None).

    Raises:
        TypeError: if input is not a list or contains non-dict elements, or
            if keys_to_keep is neither a list nor None.
        ValueError: if keys_to_keep is provided but is empty.
    """
    if not isinstance(list_of_dicts, list):
        raise TypeError("Input 'list_of_dicts' must be a list of dictionaries.")
    for entry in list_of_dicts:
        if not isinstance(entry, dict):
            raise TypeError("All elements in 'list_of_dicts' must be dictionaries.")

    if keys_to_keep is not None:
        if not isinstance(keys_to_keep, list):
            raise TypeError("'keys_to_keep' must be a list of strings or None.")
        if not keys_to_keep:
            raise ValueError("'keys_to_keep' cannot be an empty list.")
    elif not list_of_dicts:
        # Nothing to infer columns from.
        return pd.DataFrame()
    else:
        # Ordered union of keys: dict.fromkeys keeps first-seen order.
        keys_to_keep = list(
            dict.fromkeys(key for entry in list_of_dicts for key in entry)
        )

    rows = [
        {key: entry.get(key) for key in keys_to_keep}
        for entry in list_of_dicts
    ]
    return pd.DataFrame(rows, columns=keys_to_keep)

In [10]:
#Function to export OpenAlex works to RIS
%run lib/openalex_formatter_ris.py

def export_oalex_works_to_ris(
    works: List[Dict[str, Any]], 
    filename: str) -> None:
    """
    Export a list of OpenAlex works to an RIS file.

    Each dictionary in ``works`` is converted to RIS text by
    ``build_ris_entry`` and written to ``filename`` (UTF-8, overwriting any
    existing file). Entries that are not dictionaries are reported and
    skipped.

    NOTE(review): ``build_ris_entry`` is not defined in this cell; presumably
    it is provided by ``lib/openalex_formatter_ris.py`` -- confirm.

    Args:
        works: List of OpenAlex work dictionaries.
        filename: The name of the RIS file to be created. A warning is logged
            if it does not end with '.ris'.

    Raises:
        TypeError: If ``works`` is not a list or ``filename`` is not a string.
        OSError: If the file cannot be opened or written.
        Exception: If anything else fails while building or writing entries;
            the original error is attached as the cause.
    """
    # Local import: `logging` is not among this module's top-level imports.
    import logging

    if not isinstance(works, list):
        raise TypeError("works must be a list of dictionaries.")
    if not isinstance(filename, str):
        raise TypeError("filename must be a string.")
    if not filename.endswith(".ris"):
        logging.warning("Filename does not end with '.ris'.  This may cause problems with some RIS readers.")

    try:
        with open(filename, 'w', encoding='utf-8') as ris_file:
            for work in works:
                if not isinstance(work, dict):
                    print(f"Skipping invalid work entry (not a dictionary): {work}")
                    continue

                # All field formatting (type, title, authors, year, source,
                # pages, DOI, abstract, ...) is delegated to build_ris_entry.
                ris_file.write(build_ris_entry(work))

    except OSError as e:
        raise OSError(f"Error writing to RIS file: {e}") from e
    except Exception as e:  # Catch remaining exceptions, keeping the cause
        raise Exception(f"An unexpected error occurred: {e}") from e