In [2]:
import json
import re
import string
from concurrent.futures import ProcessPoolExecutor
from functools import partial
from pathlib import Path
from typing import List, Dict, Union

import pandas as pd
import spacy
from spacy.language import Language

In [3]:
# Directory holding one sub-folder per extractor run; the (currently commented-out)
# batch cell further down iterates over its sub-folders.
# NOTE(review): absolute, machine-specific path — adjust before running elsewhere.
data_source = Path('/data/alejandro/projects/ns-pond/pipeline_outputs/participant_demographics/ParticipantDemographicsExtractor/1.0.0/fd0599e01921/')
# Root that process_single_folder() uses to re-anchor relative input paths found in info.json.
projects_folder = Path('/data/alejandro/projects/')

# NOTE(review): this reads the same CSV that the last cell of the notebook writes
# back (pd_normalized.csv), so the file has to exist before the notebook is run.
results_df = pd.read_csv('pd_normalized.csv')

In [4]:
def normalize_string(input_string: str) -> Union[str, None]:
    """Normalize a free-text value to a canonical, word-capitalized form.

    Leading/trailing whitespace is stripped, runs of internal whitespace are
    collapsed to single spaces and every word is capitalized (this is what
    ``string.capwords`` does). Typographic apostrophes (’) are replaced by
    ASCII ones. Values that only encode "missing" are mapped to ``None``.

    Args:
        input_string (str): The string to normalize.
    Returns:
        Union[str, None]: The normalized string, or ``None`` when the input is
            empty or is one of the placeholders "none", "nan", "n/a" or
            "null" (in any letter case).
    """
    clean_string = string.capwords(input_string.strip())
    clean_string = clean_string.replace("’", "'")

    # Compare case-insensitively: capwords() lowercases everything after the
    # first character of a word, so "N/A" becomes "N/a" and an exact "N/A"
    # comparison can never match.
    missing_markers = {"", "none", "nan", "n/a", "null"}
    if clean_string.lower() in missing_markers:
        return None

    return clean_string


def load_abbreviations(
    text: str, model: Union[str, Language] = "en_core_sci_sm"
) -> List[Dict]:
    """Detect abbreviation definitions in ``text`` with spaCy + scispacy.

    Args:
        text (str): The text to scan for abbreviations.
        model (Union[str, Language]): Name of a spaCy model to load, or an
            already loaded pipeline. Defaults to "en_core_sci_sm". When a name
            is given and loading raises ``OSError``, the model is downloaded
            and loading is retried once.
    Returns:
        List[Dict]: One dictionary per detected abbreviation with the keys
            - short_text / short_start / short_end: text and character span
              of the short form
            - long_text / long_start / long_end: text and character span of
              the long form
        Any exception raised while loading or processing (including the
        ImportError raised below when scispacy is missing) is printed as a
        warning and an empty list is returned instead.
    Example:
        >>> text = "Magnetic resonance imaging (MRI) is a medical imaging technique"
        >>> abbrevs = load_abbreviations(text)
        >>> print(abbrevs[0]['short_text'])  # 'MRI'
        >>> print(abbrevs[0]['long_text'])   # 'Magnetic resonance imaging'
    """

    def serialize(short_span):
        # Flatten one detected span (plus its long form) into plain values.
        long_span = short_span._.long_form
        return {
            "short_text": short_span.text,
            "short_start": short_span.start_char,
            "short_end": short_span.end_char,
            "long_text": long_span.text,
            "long_start": long_span.start_char,
            "long_end": long_span.end_char,
        }

    try:
        # A pipeline object is used as-is; a model name has to be loaded first.
        nlp = model
        if isinstance(model, str):
            try:
                nlp = spacy.load(model, disable=["parser", "ner"])
            except OSError:
                # Loading by name failed: download the model, then retry.
                print(f"Downloading {model} model...")
                spacy.cli.download(model)
                nlp = spacy.load(model, disable=["parser", "ner"])

        # The detector is registered by importing scispacy.abbreviation.
        detector_missing = "abbreviation_detector" not in nlp.pipe_names
        if detector_missing:
            try:
                import scispacy.abbreviation  # noqa: F401

                nlp.add_pipe("abbreviation_detector")
            except ImportError as e:
                raise ImportError(
                    f"scispacy is required for abbreviation detection: {e}"
                )

        doc = nlp(text)
        return [serialize(short_span) for short_span in doc._.abbreviations]

    except Exception as e:
        print(f"Warning: Error processing abbreviations: {e}")
        return []


def resolve_abbreviations(
    target: str,
    abbreviations: List[Dict],
) -> str:
    """Expand known abbreviations in ``target`` to their long forms.

    Every stand-alone occurrence of a known short form is replaced by its long
    form. A short form only matches as a whole token, i.e. when it is not
    directly preceded or followed by a word character, so "AD" is expanded in
    "AD patients" and "(AD)" but left alone inside "ADHD". As a consequence,
    inflected forms such as "MRIs" are not expanded.

    All replacements happen in a single pass over the original text, so text
    inserted by one expansion is never re-expanded by another. When the same
    short form appears more than once in ``abbreviations``, its first entry
    wins. Entries with an empty short form are ignored.

    Args:
        target (str): The text string that may contain abbreviations.
        abbreviations (List[Dict]): Abbreviation dictionaries as produced by
            load_abbreviations(); only the "short_text" and "long_text" keys
            are read.
    Returns:
        str: Text with abbreviations expanded to their full forms. ``target``
            is returned unchanged when it is empty or nothing can be expanded.
    Example:
        >>> abbrevs = [{"short_text": "MRI", "long_text": "magnetic resonance imaging"}]
        >>> resolve_abbreviations("MRI and fMRI", abbrevs)
        'magnetic resonance imaging and fMRI'
    """
    if not target or not abbreviations:
        return target

    # short form -> long form; the first definition of a short form wins.
    expansions: Dict[str, str] = {}
    for abrv in abbreviations:
        short_form = abrv["short_text"]
        # An empty short form would match between every pair of characters.
        if short_form and short_form not in expansions:
            expansions[short_form] = abrv["long_text"]

    if not expansions:
        return target

    # Longest alternatives first so that e.g. "ADHD" is preferred over "AD"
    # when both are known short forms.
    alternation = "|".join(
        re.escape(short_form)
        for short_form in sorted(expansions, key=len, reverse=True)
    )
    pattern = re.compile(rf"(?<!\w)(?:{alternation})(?!\w)")

    # A function replacement keeps backslashes in long forms literal.
    return pattern.sub(lambda match: expansions[match.group(0)], target)

def find_and_remove_definitions(s, abbreviations):
    """
    Drop parenthesised abbreviation definitions such as "(MRI)" from a string.

    The string is split on whitespace and a token of the form "(X)" is dropped
    when any of the following holds:
      * X equals the "short_text" of an entry in `abbreviations`;
      * X is empty, i.e. the token is "()";
      * the first letters of the len(X) tokens directly before it spell X,
        ignoring case.
    The remaining tokens are joined with single spaces, so runs of whitespace
    in the input are collapsed.
    Args:
        s (str): The input string to process.
        abbreviations (List[Dict]): List of abbreviation dictionaries.
    Returns:
        str: The modified string with definitions removed.
    """
    tokens = s.split()

    def is_definition(position, token):
        # Only tokens fully wrapped in parentheses are candidates.
        if not (token.startswith('(') and token.endswith(')')):
            return False
        inner = token[1:-1]

        # Known abbreviation (lazy scan, stops at the first match).
        if any(entry["short_text"] == inner for entry in abbreviations):
            return True

        # The acronym check needs len(inner) tokens in front of this one.
        if position < len(inner):
            return False
        if not inner:  # bare "()"
            return True

        # split() never yields empty tokens, so indexing [0] is safe.
        initials = "".join(
            previous[0] for previous in tokens[position - len(inner):position]
        )
        return initials.lower() == inner.lower()

    kept_tokens = [
        token
        for position, token in enumerate(tokens)
        if not is_definition(position, token)
    ]
    return ' '.join(kept_tokens)

In [5]:
def _resolve_input_path(input_path_str: str, base_projects_folder: Path, fol_path: Path):
    """Map the input path recorded in info.json onto the local projects folder.

    Absolute paths are returned unchanged. A relative path with several
    components has its first component dropped and the remainder is anchored
    at ``base_projects_folder``; a single-component relative path is anchored
    there by name. Returns ``None`` (after printing a warning) for an empty path.
    """
    recorded_path = Path(input_path_str)
    if recorded_path.is_absolute():
        return recorded_path

    parts = recorded_path.parts
    if not parts:  # e.g. Path("") or Path(".")
        print(f"Warning: Empty input path in info.json for {fol_path}")
        return None

    if len(parts) > 1:
        return base_projects_folder / Path(*parts[1:])

    # Exactly one component from here on, so there is no tail left to strip.
    if recorded_path.name:
        return base_projects_folder / recorded_path.name

    # Component without a name (e.g. a bare drive on Windows): keep it as-is.
    print(f"Warning: Could not resolve relative input path '{input_path_str}' robustly for {fol_path} using parts[1:] logic. Falling back.")
    return base_projects_folder / recorded_path


def process_single_folder(fol_path: Path, base_projects_folder: Path):
    """
    Processes a single folder (fol_path) to extract and transform data.

    Steps:
      1. Read ``results.json`` and ``info.json`` from ``fol_path``.
      2. Resolve the first key of ``info['inputs']`` to the source text file
         (relative paths are re-anchored at ``base_projects_folder``) and
         detect abbreviations in that text.
      3. For every string value in every group of ``results['groups']``, add a
         ``<key>_resolved`` entry with definitions removed, abbreviations
         expanded and the result normalized.
      4. If ``identifiers.json`` exists three directory levels above the input
         file and contains a non-null ``pmid``, add it to every group.

    Args:
        fol_path (Path): Folder holding results.json and info.json.
        base_projects_folder (Path): The equivalent of the global
            'projects_folder'; anchor for relative input paths.
    Returns:
        list: Shallow copies of the processed group dictionaries. An empty list
            is returned when a required file is missing, the input path cannot
            be resolved to an existing file, or any exception occurs (the
            exception is printed, not raised, so one bad folder does not abort
            a batch run).
    """
    try:
        results_json_file = fol_path / 'results.json'
        info_json_file = fol_path / 'info.json'
        if not results_json_file.exists() or not info_json_file.exists():
            return []

        # Read every file as UTF-8 instead of relying on the locale default.
        with open(results_json_file, 'r', encoding='utf-8') as f:
            results_content = json.load(f)
        with open(info_json_file, 'r', encoding='utf-8') as f:
            info = json.load(f)

        # Only the first recorded input is used.
        input_path_str = list(info['inputs'].keys())[0]
        final_input_path = _resolve_input_path(input_path_str, base_projects_folder, fol_path)
        if final_input_path is None or not final_input_path.exists() or final_input_path.is_dir():
            return []

        with open(final_input_path, 'r', encoding='utf-8') as text_file:
            text = text_file.read()

        abbreviations = load_abbreviations(text)
        groups = results_content.get('groups', [])

        for group in groups:
            # Snapshot the items: new "<key>_resolved" keys are added while looping.
            for key, value in list(group.items()):
                if isinstance(value, str):
                    processed_value = find_and_remove_definitions(value, abbreviations)
                    processed_value = resolve_abbreviations(processed_value, abbreviations)
                    group[f"{key}_resolved"] = normalize_string(processed_value)

        # identifiers.json is looked up three directory levels above the input file.
        if len(final_input_path.parents) > 2:
            identifiers_json_path = final_input_path.parents[2] / 'identifiers.json'
            if identifiers_json_path.exists():
                with open(identifiers_json_path, 'r', encoding='utf-8') as f:
                    identifiers_data = json.load(f)
                pmid = identifiers_data.get('pmid')
                if pmid is not None:
                    for group in groups:
                        group['pmid'] = pmid

        return [group.copy() for group in groups]

    except Exception as e:
        print(f"Error processing folder {fol_path}: {e}")
        return []  # Return empty list on error to not break the whole process


In [6]:
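# NOTE(review): this batch-processing cell is disabled; as the notebook stands,
# results_df comes from pd_normalized.csv (read near the top), not from this run.
# all_results_aggregated = []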

# # Ensure we are only trying to process directories within data_source
# folders_to_process = [fol for fol in data_source.iterdir() if fol.is_dir()]

# # Use ProcessPoolExecutor for parallel processing
# # max_workers=None will use the number of CPUs on the machine
# with ProcessPoolExecutor(max_workers=None) as executor:
#     # Use functools.partial to pass the 'projects_folder' argument to the worker function
#     # This keeps the worker function signature clean for executor.map
#     worker_task = partial(process_single_folder, base_projects_folder=projects_folder)
    
#     # executor.map applies worker_task to each item in folders_to_process
#     # It returns an iterator, so convert to list to ensure all tasks complete and results are gathered
#     list_of_group_lists = list(executor.map(worker_task, folders_to_process))

# # Flatten the list of lists into a single list of group dictionaries
# for group_list_from_folder in list_of_group_lists:
#     if group_list_from_folder: # Check if the list is not empty (e.g., due to an error in worker)
#         all_results_aggregated.extend(group_list_from_folder)

# # Create a DataFrame from the aggregated list of dictionaries
# results_df = pd.DataFrame(all_results_aggregated)

In [34]:
# Number of rows per group_name value.
results_df["group_name"].value_counts()

group_name
healthy      23999
patients     16417
relatives        3
controls         2
siblings         1
children         1
adults           1
Name: count, dtype: int64

In [40]:
# Frequency table of diagnosis_resolved, skipping the 100 most frequent values.
results_df["diagnosis_resolved"].value_counts().iloc[100:]

diagnosis_resolved
Schizophrenia Spectrum Disorder                                                                                                          19
Attention Deficit/Hyperactivity Disorder                                                                                                 19
Healthy Comparison Subjects                                                                                                              19
Huntington'S Disease                                                                                                                     19
Essential Tremor                                                                                                                         19
                                                                                                                                         ..
Bipolar I Disorder, Bipolar Ii Disorder, Or Bipolar Disorder Not Otherwise Specified                                                      1
S

### ONVOC normalization

In [9]:
import json 
from collections import defaultdict
# Load the ONVOC normalization table. The context manager closes the file
# handle, which a bare json.load(open(...)) leaves open.
with open('onvoc-normalized.json', encoding='utf-8') as onvoc_file:
    normalized = json.load(onvoc_file)

In [10]:
# Invert `normalized` (key -> value) into value -> list of keys.
mappings = defaultdict(list)

for source_key, assigned_value in normalized.items():
    mappings[assigned_value].append(source_key)

In [11]:
# Size of the catch-all bucket: how many keys of `normalized` map to
# 'None of the above / Other'.
len(mappings['None of the above / Other'])

2182

### Gemini mappings for the "None of the above / Other" bucket

In [16]:
# Load the Gemini mapping table. NOTE: this rebinds `mappings`, replacing the
# inverted ONVOC table built earlier in the notebook. The context manager
# closes the file handle, which a bare json.load(open(...)) leaves open.
with open('onvoc-gemini-mappings.json', encoding='utf-8') as gemini_file:
    mappings = json.load(gemini_file)

In [None]:
# Preview the mapping applied to diagnosis_resolved; values that are not a key
# of `mappings` come back as NaN.
results_df['diagnosis_resolved'].map(mappings)

0                                                      NaN
1                                                      NaN
2        Tinnitus:Including Idiopathic Tinnitus (variou...
3                                                      NaN
4                                                      NaN
                               ...                        
40419                                                  NaN
40420                                                  NaN
40421                                                  NaN
40422                                                  NaN
40423                                                  NaN
Name: diagnosis_resolved, Length: 40424, dtype: object

In [27]:
# Keep only the part of each mapped value that precedes its first ':'.
mappings = {
    diagnosis: mapped_value.partition(':')[0]
    for diagnosis, mapped_value in mappings.items()
}

In [28]:
# Store the mapped label as a new column; rows whose diagnosis_resolved is not
# a key of `mappings` get NaN.
results_df['onvoc_diagnosis_gemini+'] = results_df['diagnosis_resolved'].map(mappings)

In [30]:
# For rows whose onvoc_diagnosis is the catch-all bucket, show the 25 most
# frequent values of the Gemini-derived column.
is_other = results_df["onvoc_diagnosis"] == 'None of the above / Other'
results_df.loc[is_other, "onvoc_diagnosis_gemini+"].value_counts().head(25)

onvoc_diagnosis_gemini+
None of the above / Other                                             729
Aphasia and Related Communication Disorders                           172
Tinnitus                                                              155
Nicotine/Tobacco                                                      139
Chromosomal Abnormalities & Associated Syndromes                      104
Dissociative Disorders                                                 94
Amyotrophic Lateral Sclerosis (ALS) / Motor Neuron Disease             93
Dystonias                                                              90
HIV-Related Conditions                                                 86
Strabismus & Amblyopia                                                 71
Trichotillomania, Hair Pulling & Skin Picking Disorders                69
Burns & Electrical Injury                                              68
Huntington's Disease (HD)                                              66
Depressive Dis

In [31]:
# Persist the frame without the index. NOTE(review): this overwrites
# pd_normalized.csv, the same file results_df was read from at the top.
results_df.to_csv('pd_normalized.csv', index=False)