In [14]:
import re
import json
import string
import nltk
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords

# Make sure every NLTK resource this notebook relies on is available locally.
# Each entry pairs the path probed with nltk.data.find() with the package name
# handed to nltk.download(). A download is attempted only when the probe raises
# LookupError, so re-running this cell once the data is installed does not
# call nltk.download() again.
_NLTK_RESOURCES = (
    ('tokenizers/punkt', 'punkt'),
    ('corpora/stopwords', 'stopwords'),
    ('tokenizers/punkt_tab', 'punkt_tab'),
)

for _resource_path, _package_name in _NLTK_RESOURCES:
    try:
        nltk.data.find(_resource_path)
    except LookupError:
        nltk.download(_package_name)

[nltk_data] Downloading package punkt_tab to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt_tab.zip.


True

In [3]:
# Biomedical abbreviations (keys, upper-case) and the full term each stands for.
bio_abbreviations = {
    "AD": "Alzheimer's disease",
    "MI": "myocardial infarction",
    "HTN": "hypertension",
    "DM": "diabetes mellitus",
    "CHF": "congestive heart failure",
    "COPD": "chronic obstructive pulmonary disease",
    "RA": "rheumatoid arthritis",
    "MS": "multiple sclerosis",
    "ASMD": "ASM-deficient Niemann-Pick disease",
    # Add more as needed
}

# Canonical drug name -> spellings seen in the wild. Each list also contains
# the canonical spelling itself; it is skipped when the lookup table is built.
drug_spelling_corrections = {
    "acetaminophen": ["acetaminophen", "acetaminophine", "acetaminofin"],
    "ibuprofen": ["ibuprofen", "ibuprofin", "ibuprophen"],
    "amoxicillin": ["amoxicillin", "amoxicilin", "amoxicillan"],
    # Add more as needed
}

# Lookup table: misspelled variant -> canonical drug name.
drug_spelling_map = {
    misspelling: canonical
    for canonical, spellings in drug_spelling_corrections.items()
    for misspelling in spellings
    if misspelling != canonical
}

In [2]:
def fix_encoding_issues(text):
    """
    Fix common encoding issues in text.

    Replaces a fixed set of problem characters (the 0x91-0x97 quote/dash code
    points and the non-breaking space) with plain ASCII equivalents and
    decodes the HTML entities ``&lt;``, ``&gt;`` and ``&amp;``.

    Args:
        text (str): The input text with potential encoding issues

    Returns:
        str: Text with fixed encoding issues
    """
    # Applied in order. '&amp;' is deliberately LAST: decoding it first would
    # turn an escaped entity such as '&amp;lt;' into '&lt;' and then into '<',
    # i.e. the text would be unescaped twice.
    replacements = (
        ('\x91', "'"),    # Left single quotation mark
        ('\x92', "'"),    # Right single quotation mark
        ('\x93', '"'),    # Left double quotation mark
        ('\x94', '"'),    # Right double quotation mark
        ('\x96', '-'),    # En dash
        ('\x97', '-'),    # Em dash
        ('\xa0', ' '),    # Non-breaking space
        ('&lt;', '<'),    # HTML less than
        ('&gt;', '>'),    # HTML greater than
        ('&amp;', '&'),   # HTML ampersand (must stay last, see above)
    )

    for old, new in replacements:
        text = text.replace(old, new)

    return text


In [4]:
def standardize_punctuation(text, keep_punctuation=True):
    """
    Standardize punctuation in text.

    With ``keep_punctuation`` true, runs of dashes are collapsed to one dash,
    the marks ``, . ; : ! ?`` are padded with spaces unless the character
    directly before them is an ASCII letter or digit, parentheses are padded
    with spaces, and whitespace runs are collapsed. With it false, every
    character in ``string.punctuation`` is removed.

    Args:
        text (str): The input text
        keep_punctuation (bool): Whether to keep biomedically relevant punctuation

    Returns:
        str: Text with standardized punctuation, stripped of leading and
        trailing whitespace
    """
    if not keep_punctuation:
        # Remove punctuation entirely (not recommended for biomedical NER)
        no_punct = text.translate(str.maketrans('', '', string.punctuation))
        return no_punct.strip()

    # "--" / "---" etc. become a single "-" (the dash itself is kept)
    result = re.sub(r'-+', '-', text)

    # Pad a mark with spaces only when it does NOT directly follow a letter or
    # digit; marks attached to the end of a word or number are left as-is.
    for mark in ',.;:!?':
        result = re.sub(f'(?<![A-Za-z0-9]){re.escape(mark)}', f' {mark} ', result)

    # Parentheses always become separate space-delimited tokens
    result = re.sub(r'\(', ' ( ', result)
    result = re.sub(r'\)', ' ) ', result)

    # Collapse the extra whitespace introduced by the padding above
    result = re.sub(r'\s+', ' ', result)
    return result.strip()

In [5]:
def normalize_case(text, preserve_case=False):
    """
    Lower-case the text unless the caller asks to keep its casing.

    Args:
        text (str): The input text
        preserve_case (bool): When true, the text is returned untouched

    Returns:
        str: The lower-cased text, or the original text if ``preserve_case``
        is true
    """
    return text if preserve_case else text.lower()

In [6]:
def expand_abbreviations(text, abbreviations=None):
    """
    Expand common biomedical abbreviations.

    The text is split on whitespace and each token is looked up
    case-sensitively. A token that is not itself a known abbreviation is
    looked up again with leading/trailing ASCII punctuation removed, so
    ``"MI,"`` or ``"(HTN)"`` are expanded too; the stripped punctuation is
    re-attached around the expansion. Tokens are re-joined with single spaces,
    so runs of whitespace in the input are collapsed.

    Args:
        text (str): The input text
        abbreviations (dict, optional): Dictionary of abbreviations and their
            expanded forms. Defaults to the module-level ``bio_abbreviations``,
            resolved at call time so that re-running the cell that defines it
            takes effect without redefining this function.

    Returns:
        str: Text with expanded abbreviations
    """
    if abbreviations is None:
        abbreviations = bio_abbreviations

    expanded = []
    for word in text.split():
        # Exact token match first, so keys that themselves contain
        # punctuation keep working.
        if word in abbreviations:
            expanded.append(abbreviations[word])
            continue

        # Retry without the punctuation glued to either side of the token.
        without_lead = word.lstrip(string.punctuation)
        core = without_lead.rstrip(string.punctuation)
        if core in abbreviations:
            lead = word[:len(word) - len(without_lead)]
            trail = without_lead[len(core):]
            expanded.append(f"{lead}{abbreviations[core]}{trail}")
        else:
            expanded.append(word)

    return ' '.join(expanded)

In [7]:
def correct_drug_spelling(text, spelling_map=None):
    """
    Correct common misspellings of drug names.

    The text is split on whitespace and each token is looked up in lower case.
    A token that is not itself a known misspelling is looked up again with
    leading/trailing ASCII punctuation removed, so ``"ibuprofin,"`` is
    corrected too; the stripped punctuation is re-attached afterwards. The
    replacement follows the casing of the misspelled word: all upper-case,
    capitalized, or unchanged. Tokens are re-joined with single spaces.

    Args:
        text (str): The input text
        spelling_map (dict, optional): Dictionary mapping misspelled drugs
            (lower case) to their correct spelling. Defaults to the
            module-level ``drug_spelling_map``, resolved at call time so that
            re-running the cell that defines it takes effect without
            redefining this function.

    Returns:
        str: Text with corrected drug spellings
    """
    if spelling_map is None:
        spelling_map = drug_spelling_map

    corrected = []
    for word in text.split():
        without_lead = word.lstrip(string.punctuation)
        core = without_lead.rstrip(string.punctuation)

        if word.lower() in spelling_map:
            # Exact token match (also covers keys containing punctuation)
            lead, target, trail = '', word, ''
        elif core and core.lower() in spelling_map:
            # Match once the surrounding punctuation is set aside
            lead = word[:len(word) - len(without_lead)]
            target = core
            trail = without_lead[len(core):]
        else:
            corrected.append(word)
            continue

        # Replace with correct spelling but preserve case pattern
        replacement = spelling_map[target.lower()]
        if target.isupper():
            replacement = replacement.upper()
        elif target[0].isupper():
            replacement = replacement.capitalize()
        corrected.append(f"{lead}{replacement}{trail}")

    return ' '.join(corrected)

In [8]:
def remove_stopwords(text, standard_stopwords, custom_stopwords=None):
    """
    Remove stopwords from text.

    The text is tokenized with ``word_tokenize``; a token is dropped when its
    lower-cased form is in ``standard_stopwords`` or matches a lower-cased
    entry of ``custom_stopwords``. The surviving tokens are joined with single
    spaces, so the result reflects the tokenizer's token boundaries rather
    than the original spacing.

    Args:
        text (str): The input text
        standard_stopwords (set): Set of standard stopwords
        custom_stopwords (list, optional): List of custom stopwords
            (matched case-insensitively)

    Returns:
        str: Text with stopwords removed
    """
    # Merge both stopword sources into one set so each token needs a single
    # hash lookup instead of a set lookup plus a linear scan of a list.
    excluded = set(standard_stopwords)
    if custom_stopwords:
        excluded.update(word.lower() for word in custom_stopwords)

    kept_tokens = [token for token in word_tokenize(text)
                   if token.lower() not in excluded]

    # Join tokens back into text
    return ' '.join(kept_tokens)



In [9]:
def preprocess_text(text, preserve_case=False, keep_punctuation=True,
                   remove_stops=True, custom_stopwords=None):
    """
    Apply all preprocessing steps to the input text.

    Steps, in order: encoding fixes, punctuation standardization, abbreviation
    expansion, case normalization, drug-spelling correction, optional stopword
    removal, and a final whitespace cleanup.

    Abbreviations are expanded BEFORE the text is lower-cased: the lookup in
    ``expand_abbreviations`` is case-sensitive and the abbreviation table uses
    upper-case keys, so lower-casing first would prevent any match.

    Args:
        text (str): The input text
        preserve_case (bool): Whether to preserve the original case
        keep_punctuation (bool): Whether to keep biomedically relevant punctuation
        remove_stops (bool): Whether to remove stopwords
        custom_stopwords (list): Custom stopwords to remove

    Returns:
        str: Fully preprocessed text; ``""`` when ``text`` is empty or not a
        string
    """
    if not text or not isinstance(text, str):
        return ""

    # Apply preprocessing steps in sequence
    text = fix_encoding_issues(text)
    text = standardize_punctuation(text, keep_punctuation)

    # Must run while the original casing is still intact (see docstring)
    text = expand_abbreviations(text)

    if not preserve_case:
        text = normalize_case(text)

    text = correct_drug_spelling(text)

    # Remove stopwords if specified
    if remove_stops:
        standard_stopwords = set(stopwords.words('english'))
        text = remove_stopwords(text, standard_stopwords, custom_stopwords)

    # Ensure clean whitespace
    text = re.sub(r'\s+', ' ', text).strip()

    return text


In [10]:
def process_json_file(input_file_path, output_file_path, fields_to_process=None,
                     preserve_case=False, keep_punctuation=True, remove_stops=True,
                     custom_stopwords=None):
    """
    Process text fields in a JSON file.

    Loads the JSON document, preprocesses its text with ``preprocess_text``,
    and writes the result to ``output_file_path``. For every processed field
    the untouched value is kept under ``"<field>_original"``.

    Args:
        input_file_path (str): Path to the input JSON file
        output_file_path (str): Path to save the processed JSON file
        fields_to_process (list): List of specific fields to process. When
            given, only these keys are processed, and only where their value
            is a string. They are looked up on the top-level object, or on
            each object of a top-level list. When omitted or empty, every
            string-valued dict entry in the document is processed
            recursively via ``process_json_object``.
        preserve_case (bool): Whether to preserve the original case
        keep_punctuation (bool): Whether to keep biomedically relevant punctuation
        remove_stops (bool): Whether to remove stopwords
        custom_stopwords (list, optional): Extra stopwords to remove. Defaults
            to a built-in list of drug-label section words; pass ``[]`` to
            disable custom stopwords.
    """
    if custom_stopwords is None:
        # Default custom stopwords for biomedical documents
        custom_stopwords = [
            "Warnings", "Precautions", "Use", "Specific", "Populations",
            "see", "contraindications", "indications", "dosage", "administration",
            "adverse", "reactions", "drug", "interactions", "clinical", "studies"
        ]

    options = dict(
        preserve_case=preserve_case,
        keep_punctuation=keep_punctuation,
        remove_stops=remove_stops,
        custom_stopwords=custom_stopwords,
    )

    # Load the JSON file
    with open(input_file_path, 'r', encoding='utf-8') as file:
        data = json.load(file)

    if fields_to_process:
        # Field lookup only makes sense on JSON objects. Accept either a
        # single object or a list of them, and skip anything else instead of
        # indexing a list/scalar with a string key.
        records = data if isinstance(data, list) else [data]
        for record in records:
            if not isinstance(record, dict):
                continue
            for field in fields_to_process:
                if isinstance(record.get(field), str):
                    # Store original field value
                    record[f"{field}_original"] = record[field]

                    # Apply text preprocessing
                    record[field] = preprocess_text(record[field], **options)
    else:
        # Process all string fields in the JSON
        data = process_json_object(data, **options)

    # Save the processed JSON
    with open(output_file_path, 'w', encoding='utf-8') as file:
        json.dump(data, file, indent=4, ensure_ascii=False)

    print(f"Processed JSON saved to {output_file_path}")

In [11]:
def process_json_object(obj, preserve_case=False, keep_punctuation=True,
                       remove_stops=True, custom_stopwords=None):
    """
    Walk a decoded JSON value and preprocess the string values held in dicts.

    Lists are rebuilt element by element and dicts entry by entry. A dict
    entry whose value is a string is replaced by its ``preprocess_text``
    result, with the untouched value stored first under ``"<key>_original"``.
    Anything that is neither a dict nor a list (including a string that is a
    list element or the top-level value) is returned as-is.

    Args:
        obj: JSON object (dict, list, or primitive value)
        preserve_case (bool): Whether to preserve the original case
        keep_punctuation (bool): Whether to keep biomedically relevant punctuation
        remove_stops (bool): Whether to remove stopwords
        custom_stopwords (list): Custom stopwords to remove

    Returns:
        The processed JSON object (new dicts/lists; primitives unchanged)
    """
    # The same options are forwarded to every nested call
    options = dict(
        preserve_case=preserve_case,
        keep_punctuation=keep_punctuation,
        remove_stops=remove_stops,
        custom_stopwords=custom_stopwords,
    )

    if isinstance(obj, dict):
        processed = {}
        for name, entry in obj.items():
            if isinstance(entry, str):
                # Keep the raw text next to its preprocessed form
                processed[f"{name}_original"] = entry
                processed[name] = preprocess_text(entry, **options)
            else:
                processed[name] = process_json_object(entry, **options)
        return processed

    if isinstance(obj, list):
        return [process_json_object(element, **options) for element in obj]

    # Primitive values pass through unchanged
    return obj

In [15]:
# Source document and destination for its preprocessed copy
input_file = "sample_data/fe49a0d2-1f44-446c-9144-56ba9ca2cd6a.json"  # Replace with your input file
output_file = "sample_data/processed_drug_data.json"

# Only these top-level fields are preprocessed; all other keys are left as-is
fields_to_process = [
    "contraindications",
    "indications",
    "warningsAndPrecautions",
    "adverseReactions",
]

# Run the pipeline: lower-case the text, keep punctuation, drop stopwords
process_json_file(
    input_file,
    output_file,
    fields_to_process=fields_to_process,
    preserve_case=False, keep_punctuation=True, remove_stops=True,
)

Processed JSON saved to sample_data/processed_drug_data.json
