In [1]:
import nltk
from nltk.tokenize import word_tokenize
from nltk.stem import WordNetLemmatizer
from nltk.util import ngrams
from nltk.corpus import stopwords, words
import string
import pandas as pd
from string import punctuation
punctuation = set(punctuation)
from nltk.stem import PorterStemmer
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
import re
import nbimporter
from scipy.stats import skew
from tabulate import tabulate

# NOTE: process_company_text needs the country codes generated in the processing file (it reads them from 'country_codes.csv'), so that file must exist before the function is used


# Build a set of every entry in NLTK's English `words` corpus for fast membership tests.
# NOTE(review): process_text builds its own copy of this set, so this global appears
# unused by the functions visible in this notebook - confirm before removing.

english_words = {entry for entry in words.words()}

In [2]:
# Domain-specific filler words dropped in addition to NLTK's English stopwords.
_PROCESS_TEXT_CUSTOM_STOPWORDS = frozenset({
    'department', 'health', 'public', 'food', 'drug', 'administration',
    'release', 'report', 'research', 'methodology', 'approach', 'certain',
    'energy', 'commission', 'ultimately', 'finding', 'investigation', 'also',
    'available', 'center', 'disease', 'control', 'us', 'federal', 'authority',
    'rounding', 'register', 'determine', 'absence', 'presence', 'de', 'use',
    'unless', 'work', 'article', 'editor', 'publication', 'since', 'upon',
    'many', 'meet', 'every', 'one', 'two', 'three', 'four', 'five', 'six',
    'seven', 'eight', 'ago', 'name', 'address'
})

# Lookup tables shared by every process_text call; filled on first use.
_PROCESS_TEXT_RESOURCES = {}


def _get_process_text_resources():
    """Build (once) the punctuation, stopword and dictionary sets used by process_text."""
    if not _PROCESS_TEXT_RESOURCES:
        # Build into a temporary dict first so a failed corpus load does not
        # leave a half-populated cache behind.
        resources = {
            'punctuation': set(string.punctuation) | {'‘', '—', '“', '«'},
            'stopwords': set(stopwords.words("english")),
            'dictionary': set(words.words()),
            'lemmatizer': WordNetLemmatizer(),
        }
        _PROCESS_TEXT_RESOURCES.update(resources)
    return _PROCESS_TEXT_RESOURCES


def process_text(text):
    """Tokenize and normalise a free-text field.

    Steps: tokenize with NLTK's word_tokenize; drop English stopwords, tokens
    containing any punctuation character, purely numeric tokens and
    one-character tokens; lowercase and lemmatize what remains; drop
    project-specific filler words; finally keep only lemmas found in NLTK's
    `words` corpus.

    The filler-word check is applied to both the lowercased token and its
    lemma. The list itself is all lowercase, so comparing the raw token (as
    was done before) let capitalised or inflected forms such as "Health" or
    "reports" through.

    The stopword set, the word dictionary and the lemmatizer are built once
    and reused, because rebuilding the dictionary set on every call is costly
    when the function is applied row by row.

    Parameters
    ----------
    text : str
        Raw text to process.

    Returns
    -------
    list of str
        Lowercased, lemmatized tokens in their original order.
    """
    resources = _get_process_text_resources()
    punctuation = resources['punctuation']
    sw = resources['stopwords']
    dictionary = resources['dictionary']
    lemmatizer = resources['lemmatizer']

    kept_tokens = []
    for token in word_tokenize(text):
        lowered = token.lower()
        if lowered in sw:
            continue
        # Reject the token if any of its characters is punctuation
        if not punctuation.isdisjoint(token):
            continue
        # Remove numerical tokens and tokens with just one character
        if token.isdigit() or len(token) <= 1:
            continue

        lemma = lemmatizer.lemmatize(lowered)
        if lowered in _PROCESS_TEXT_CUSTOM_STOPWORDS or lemma in _PROCESS_TEXT_CUSTOM_STOPWORDS:
            continue

        # Keep only lemmas that appear in NLTK's word list
        if lemma in dictionary:
            kept_tokens.append(lemma)

    return kept_tokens

In [3]:
# process text with 2-token n-grams for language context
def process_text_grams(text):
    """Return lemmatized unigrams followed by space-joined bigrams for `text`.

    Tokens are produced by NLTK's word_tokenize. A token is discarded when its
    lowercase form is an English stopword or when it contains any character
    from string.punctuation (plus the left single quote). Surviving tokens are
    lowercased and lemmatized; bigrams are built from consecutive survivors.

    Parameters
    ----------
    text : str
        Raw text to process.

    Returns
    -------
    list of str
        All unigrams, then every bigram written as "first second".
    """
    # ASCII punctuation plus one extra quote character
    excluded_chars = set(string.punctuation) | {'‘'}

    raw_tokens = word_tokenize(text)

    stop_words = set(stopwords.words("english"))
    wnl = WordNetLemmatizer()

    unigrams = []
    for raw in raw_tokens:
        folded = raw.lower()
        if folded in stop_words:
            continue
        if any(ch in excluded_chars for ch in raw):
            continue
        unigrams.append(wnl.lemmatize(folded))

    # Pair each token with its successor and join the pair with a space
    bigram_strings = [' '.join(pair) for pair in ngrams(unigrams, 2)]

    return unigrams + bigram_strings

In [4]:
# Define the text processing function for the drug labels specifically
def process_label_text(text):
    """Turn a drug-label string into an ordered list of unique lemmas.

    NOTE: a later cell in this notebook defines `process_label_text` again
    (that version also appends bigrams). When the cells run in order, the
    later definition replaces this one.

    Parameters
    ----------
    text : object
        Label text. Anything that is not a `str` is returned unchanged.

    Returns
    -------
    list of str, pd.NA, or the original value
        Unique lowercase lemmas in first-seen order with "nan" removed;
        pd.NA when no token survives; the input itself for non-strings.
    """
    if not isinstance(text, str):
        return text

    # Swap every ASCII punctuation character for a space in a single pass
    to_space = str.maketrans(string.punctuation, ' ' * len(string.punctuation))
    spaced_text = text.translate(to_space)

    raw_tokens = word_tokenize(spaced_text)

    stop_words = set(stopwords.words("english"))
    wnl = WordNetLemmatizer()
    lemmas = [
        wnl.lemmatize(raw.lower())
        for raw in raw_tokens
        if raw.lower() not in stop_words
    ]

    # dict keys keep insertion order, giving an order-preserving de-duplication;
    # literal "nan" tokens are dropped afterwards
    ordered_unique = [lemma for lemma in dict.fromkeys(lemmas) if lemma != 'nan']

    return ordered_unique if ordered_unique else pd.NA

In [5]:
# Define the text processing function for the drug labels specifically
def process_label_text(text):
    """Turn a drug-label string into unique lemmas plus bigrams of those lemmas.

    This redefines the `process_label_text` from the previous cell; the only
    difference is that bigrams are appended to the result.

    NOTE(review): bigrams are built from the de-duplicated token list, so a
    pair need not have been adjacent in the original text - confirm this is
    intended.

    Parameters
    ----------
    text : object
        Label text. Anything that is not a `str` is returned unchanged.

    Returns
    -------
    list of str, pd.NA, or the original value
        Unique lowercase lemmas in first-seen order (with "nan" removed),
        followed by each consecutive pair joined with a space; pd.NA when no
        token survives; the input itself for non-strings.
    """
    if not isinstance(text, str):
        return text

    # Swap every ASCII punctuation character for a space in a single pass
    to_space = str.maketrans(string.punctuation, ' ' * len(string.punctuation))
    spaced_text = text.translate(to_space)

    raw_tokens = word_tokenize(spaced_text)

    stop_words = set(stopwords.words("english"))
    wnl = WordNetLemmatizer()
    lemmas = [
        wnl.lemmatize(raw.lower())
        for raw in raw_tokens
        if raw.lower() not in stop_words
    ]

    # dict keys keep insertion order, giving an order-preserving de-duplication;
    # literal "nan" tokens are dropped afterwards
    ordered_unique = [lemma for lemma in dict.fromkeys(lemmas) if lemma != 'nan']

    if not ordered_unique:
        return pd.NA

    # Join each consecutive pair of unique tokens into a "first second" string
    bigram_strings = [' '.join(pair) for pair in ngrams(ordered_unique, 2)]

    return ordered_unique + bigram_strings

In [6]:
# Create function to make unique IDs for each table
def add_sequential_index(df, index_col_name):
    """Return a copy of `df` with a 1-based sequential ID as its first column.

    The IDs are always 1, 2, ..., len(df) in row order, whatever the incoming
    index looks like. The earlier approach (reset_index, then rename the
    "index" column) raised KeyError when the index had a name, produced
    non-sequential IDs for a filtered frame, and renamed the wrong column
    when the frame already had a column called "index".

    Parameters
    ----------
    df : pandas.DataFrame
        Table to label. It is not modified.
    index_col_name : str
        Name of the new ID column. Must not already exist in `df`
        (DataFrame.insert raises ValueError otherwise).

    Returns
    -------
    pandas.DataFrame
        New frame with a fresh 0..n-1 index and the ID column in position 0.
        The previous index values are discarded.
    """
    # drop=True discards the old index instead of turning it into a column
    indexed = df.reset_index(drop=True)

    # IDs start at 1 rather than 0
    indexed.insert(0, index_col_name, range(1, len(indexed) + 1))

    return indexed

In [7]:
# Function to return count of NaN and proportion of NaN in each column for a dataframe
def nan_info(df):
    """Summarise missing values per column of `df`.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame to inspect.

    Returns
    -------
    pandas.DataFrame
        One row per column of `df` with:
        - 'column_name': the column label,
        - 'null_count': number of NA cells in that column,
        - 'null_proportion': NA cells as a percentage of the row count
          (the value is multiplied by 100).
    """
    missing_per_column = df.isna().sum()
    percent_missing = (missing_per_column / len(df)) * 100

    summary = pd.DataFrame({
        'column_name': missing_per_column.index,
        'null_count': missing_per_column.values,
        'null_proportion': percent_missing.values,
    })
    return summary

In [8]:
# Missing values to null for now (simplifies type conversions & plotting)
def na_to_null(df, column):
    """Replace the literal string 'N/A' with NaN in one column of `df`.

    The column is overwritten on the frame that was passed in, and that same
    frame object is returned.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame to update (modified in place).
    column : hashable
        Label of the column to clean.

    Returns
    -------
    pandas.DataFrame
        The same `df` object.
    """
    cleaned_column = df[column].replace(to_replace='N/A', value=np.nan)
    df[column] = cleaned_column
    return df

In [9]:
# Function to remove duplicates
def remove_duplicates(tokens):
    """Return the distinct items of `tokens` in first-seen order.

    A plain `list(set(tokens))` returns string tokens in an order that can
    change from one interpreter run to the next, which makes downstream
    output non-reproducible. Going through dict keys keeps the same set of
    items but fixes their order.

    Parameters
    ----------
    tokens : iterable of hashable
        Tokens, possibly with repeats.

    Returns
    -------
    list
        Each distinct token once, in order of first appearance.
    """
    return list(dict.fromkeys(tokens))

In [10]:
# Function to remove duplicates and handle NaNs
def remove_duplicates_nan(tokens):
    """De-duplicate a token list, dropping literal "nan" tokens.

    Distinct tokens are returned in first-seen order. A plain `list(set(...))`
    gives string tokens in an order that can change between interpreter runs.

    Parameters
    ----------
    tokens : object
        A list of tokens. Any non-list value is returned unchanged.

    Returns
    -------
    list, pd.NA, or the original value
        Distinct tokens without "nan"; pd.NA when nothing remains; the input
        itself when it is not a list.
    """
    if not isinstance(tokens, list):
        return tokens

    # Remove "nan" tokens first, then de-duplicate while keeping order
    without_nan = [token for token in tokens if token != 'nan']
    if not without_nan:
        return pd.NA

    return list(dict.fromkeys(without_nan))

In [11]:
# Function to classify the product type
def classify_product_type(product_types):
    """Map a product-type value to a numeric class.

    Parameters
    ----------
    product_types : str, list, or missing value
        Value tested with the `in` operator: substring match for a string,
        membership for a list of tokens.

    Returns
    -------
    int
        2 if 'human otc' is found, otherwise 1 if 'human prescription' is
        found, otherwise 0. Missing values (None, NaN, pd.NA) also give 0;
        they previously raised TypeError because `in` cannot be applied to
        them.
    """
    is_missing = (
        product_types is None
        or product_types is pd.NA
        # NaN is the only float that is not equal to itself
        or (isinstance(product_types, float) and product_types != product_types)
    )
    if is_missing:
        return 0

    if 'human otc' in product_types:
        return 2
    if 'human prescription' in product_types:
        return 1
    return 0

In [12]:
# TODO: process MedDRA reaction terms (not implemented yet)
# Planned steps: replace spaces with periods and lowercase all letters



In [13]:
# Map age units to years, based on code specified here: https://open.fda.gov/apis/drug/event/searchable-fields/
def convert_to_years(age, unit):
    """Convert an age expressed in a coded unit into years.

    Unit codes handled: 800 decade, 801 year, 802 month, 803 week, 804 day,
    805 hour. Weeks use 52 per year and days use 365 per year.

    Parameters
    ----------
    age : number
        Age value in the given unit.
    unit : number or missing
        Unit code.

    Returns
    -------
    number
        Age in years, or NaN when the unit is missing or not one of the
        codes above.
    """
    if pd.isna(unit):
        return np.nan

    # (unit code, conversion to years), checked in order with ==
    conversions = (
        (800, lambda value: value * 10),          # Decade
        (801, lambda value: value),               # Year
        (802, lambda value: value / 12),          # Month
        (803, lambda value: value / 52),          # Week
        (804, lambda value: value / 365),         # Day
        (805, lambda value: value / (365 * 24)),  # Hour
    )
    for code, to_years in conversions:
        if unit == code:
            return to_years(age)

    # Unknown unit code
    return np.nan

In [14]:
# Return boxplot of character length for object columns, as well as descriptive statistics of character length
def plot_character_length(df, df_name):
    """Plot and describe the character length of every object column in `df`.

    Each cell is converted with `str()` before measuring, so a missing value
    counts as the length of its string form (for example "nan" is 3).

    Shows a horizontal boxplot of the lengths and prints the transposed
    `describe()` table. If `df` has no object columns, a short message is
    printed and nothing is plotted; the plotting and `describe()` calls fail
    on a frame without columns.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame whose object-dtype columns are measured.
    df_name : str
        Label used in the plot title.

    Returns
    -------
    None
    """
    # Filter for object columns
    object_columns = df.select_dtypes(include=['object'])
    if object_columns.shape[1] == 0:
        print(f"No object columns to plot - {df_name}")
        return

    def _char_len(value):
        return len(str(value))

    # DataFrame.applymap was renamed to DataFrame.map in pandas 2.1 and the
    # old name is deprecated; use whichever this pandas version provides.
    if hasattr(pd.DataFrame, 'map'):
        character_counts = object_columns.map(_char_len)
    else:
        character_counts = object_columns.applymap(_char_len)

    # Generate boxplot
    plt.figure(figsize=(10,8))
    sns.boxplot(data=character_counts, color='hotpink', orient = 'h')
    plt.title(f'Number of Characters in Each Object Column - {df_name}')
    plt.ylabel('Column')
    plt.xlabel('Number of Characters')
    plt.grid(True)
    plt.show()

    # Statistics Table
    stats_table = character_counts.describe().transpose()
    print("\nDescriptive Statistics on Character Length:")
    print(stats_table)

In [15]:
#function for detecting upper outliers
def examine_text_outliers(series, n_std=2):
    """Return the entries of `series` whose string length is unusually large.

    An entry is an upper outlier when its length exceeds
    mean(length) + n_std * std(length), with both statistics taken over the
    whole series.

    Parameters
    ----------
    series : pandas.Series
        Series of strings (accessed through `.str`).
    n_std : float, default 2
        Number of standard deviations above the mean used as the cut-off.

    Returns
    -------
    pandas.Series
        The subset of `series` above the cut-off, with its original index.
    """
    # Measure once and reuse for the mean, the deviation and the mask
    lengths = series.str.len()

    upper_bound = lengths.mean() + n_std * lengths.std()

    return series[lengths > upper_bound]


In [2]:
# Abbreviations and filler tokens removed from company text.
_COMPANY_EXTRA_ABBREVIATIONS = frozenset({
    'nldsp', 'usasp', 'company', 'bax', 'spo',
    'ccaza', 'cinry', 'and', 'cansp', 'oxyc',
    'scpr', 'gbrct', 'gbrsp', 'tjp', 'unk',
    'frasp', 'brasp', 'sol', 'cbst', 'pmco',
    'jpnct', 'frua', 'espct', 'pre',
    'dsu', 'gmbh', 'dse', 'belsp', 'crisp',
    'kdl', 'irlsp', 'mpi', 'avee', 'usani',
    'sun', 'belct', 'itasp', 'hkgsp', 'argsp',
})

# Public health reporting entity labels, all mapped to one token.
_COMPANY_TOKEN_REPLACEMENTS = {
    "phhy": "pubhosp", "pheh": "pubhosp", "phho": "pubhosp", "phfr": "pubhosp",
}

# Country-code sets already read from disk, keyed by CSV path.
_COUNTRY_CODES_CACHE = {}


def _load_country_codes(path):
    """Read the 'codes' column of the CSV at `path` once and cache it as lowercase strings."""
    import pandas as pd

    if path not in _COUNTRY_CODES_CACHE:
        # keep_default_na=False stops pandas from turning the code "NA"
        # into a missing value.
        codes = pd.read_csv(path, keep_default_na=False)['codes']
        _COUNTRY_CODES_CACHE[path] = frozenset(str(code).lower() for code in codes)
    return _COUNTRY_CODES_CACHE[path]


def process_company_text(text, country_codes=None, country_codes_path='country_codes.csv'):
    """Tokenize and normalise a company / manufacturer string.

    Steps: replace punctuation and digit runs with spaces, split on
    whitespace, lowercase, then drop tokens that are shorter than 2
    characters, are country codes, or are listed abbreviations. Public
    health reporting labels are mapped to "pubhosp", and three manufacturers
    are collapsed to fixed token lists.

    Country codes are read from `country_codes_path` once and cached, rather
    than on every call. They are lowercased before comparison because the
    tokens are lowercased.

    Parameters
    ----------
    text : object
        Company text. Non-string values produce None.
    country_codes : iterable of str, optional
        Codes to remove. When given, no file is read.
    country_codes_path : str, default 'country_codes.csv'
        CSV with a 'codes' column, used when `country_codes` is None.

    Returns
    -------
    list of str, pd.NA, or None
        The tokens; pd.NA when no token survives; None for non-string input.
    """
    import re
    import string
    import pandas as pd

    if not isinstance(text, str):
        # Non-strings have always produced None here (implicit return)
        return None

    if country_codes is None:
        excluded_codes = _load_country_codes(country_codes_path)
    else:
        excluded_codes = frozenset(str(code).lower() for code in country_codes)

    # Remove punctuation and replace with a space (escaped so the character
    # class does not depend on the order of characters in string.punctuation)
    text = re.sub(f'[{re.escape(string.punctuation)}]', ' ', text)

    # Remove all numerical characters and replace with a space
    text = re.sub(r'\d+', ' ', text)

    # Tokenize on whitespace and convert to lowercase
    tokens = [token.lower() for token in text.split()]

    # Keep tokens with 2 or more characters (2-character names such as "ge"
    # are needed below) that are neither country codes nor listed abbreviations
    tokens = [
        token for token in tokens
        if len(token) >= 2
        and token not in excluded_codes
        and token not in _COMPANY_EXTRA_ABBREVIATIONS
    ]

    # Update public health reporting entity labels
    tokens = [_COMPANY_TOKEN_REPLACEMENTS.get(token, token) for token in tokens]

    # Replace entire token list if it contains "ridgefield"
    if 'ridgefield' in tokens:
        tokens = ['bi', 'pharmaceuticals']

    # Replace entire token list if it contains both "ge" and "healthcare"
    if 'ge' in tokens and 'healthcare' in tokens:
        tokens = ['ge', 'healthcare']

    # Alexion pharma inc.
    if 'alexion' in tokens:
        tokens = ['alexion']

    # Assign pd.NA if the token list is empty
    if not tokens:
        return pd.NA

    return tokens

In [4]:
# Function to clean manufacturer text 
def clean_manufacturer_text(text_list):
    """Normalise a list of manufacturer names into dash-joined lowercase tokens.

    For each string in `text_list`: remove the whole words inc, llc, ltd, lp,
    corp and usa (any case); replace all punctuation except commas with
    spaces; collapse repeated commas; turn spaces into dashes and collapse
    repeated dashes; split on commas; lowercase each piece and trim
    whitespace and dashes from its ends. Non-string entries are skipped.

    Trimming the edge dashes means "Pfizer Inc" and "Pfizer" both become
    "pfizer" (the removed suffix used to leave a trailing dash).

    Parameters
    ----------
    text_list : object
        List of manufacturer strings.

    Returns
    -------
    list of str or pd.NA
        Cleaned tokens, or pd.NA when `text_list` is not a list, is empty,
        or yields no tokens.
    """
    import re

    # Check the type before truth-testing: `not pd.NA` raises TypeError
    if not isinstance(text_list, list) or not text_list:
        return pd.NA

    punctuation_except_comma = f"[{re.escape(string.punctuation.replace(',', ''))}]"
    edge_chars = string.whitespace + '-'

    cleaned_tokens = []
    for text in text_list:
        if not isinstance(text, str):
            continue

        # Remove common company suffixes, ignoring case
        text = re.sub(r'\b(?:inc|llc|ltd|lp|corp|usa)\b', '', text, flags=re.IGNORECASE)

        # Replace all punctuation except commas with spaces
        text = re.sub(punctuation_except_comma, ' ', text)

        # Collapse runs of commas into a single comma
        text = re.sub(r',+', ',', text)

        # Replace spaces with dashes, then collapse runs of dashes
        text = re.sub(r'-+', '-', text.replace(' ', '-'))

        # Split on commas; drop pieces that are empty once trimmed
        # (this also drops pieces made only of dashes)
        for piece in text.split(','):
            token = piece.lower().strip(edge_chars)
            if token:
                cleaned_tokens.append(token)

    # Remove any country code or location strings

    # Update long names to standardized abbreviations dictionary

    # If the resulting token list is empty, assign pd.NA
    if not cleaned_tokens:
        return pd.NA

    return cleaned_tokens

In [7]:
def descriptive_stats(tokens, top_n=5, verbose=True):
    """Compute summary statistics for a sequence of tokens.

    Float entries (such as NaN placeholders) are ignored. The input is not
    flattened: each remaining element is treated as one token.

    Parameters
    ----------
    tokens : iterable
        Tokens to summarise.
    top_n : int, default 5
        How many of the most frequent tokens to report.
    verbose : bool, default True
        When True, print every statistic.

    Returns
    -------
    list
        [total_tokens, num_unique_tokens, lexical_diversity, num_characters,
        avg_token_length, token_length_variance, token_length_std_dev,
        most_common_tokens]. With no tokens, every ratio and spread
        statistic is 0.
    """
    # Counter is not imported in the notebook's import cell, so import it here
    from collections import Counter

    # Filter out any float values
    flat_tokens = [token for token in tokens if not isinstance(token, float)]

    total_tokens = len(flat_tokens)
    num_unique_tokens = len(set(flat_tokens))

    token_lengths = [len(token) for token in flat_tokens]
    num_characters = sum(token_lengths)

    if total_tokens > 0:
        lexical_diversity = num_unique_tokens / total_tokens
        avg_token_length = num_characters / total_tokens
        token_length_variance = np.var(token_lengths)
        token_length_std_dev = np.std(token_lengths)
    else:
        # np.var / np.std of an empty list give NaN plus a RuntimeWarning;
        # use the same 0 fallback as the ratio statistics.
        lexical_diversity = 0
        avg_token_length = 0
        token_length_variance = 0.0
        token_length_std_dev = 0.0

    # Find the most common tokens
    most_common_tokens = Counter(flat_tokens).most_common(top_n)

    if verbose:
        print(f"There are {total_tokens} tokens in the data.")
        print(f"There are {num_unique_tokens} unique tokens in the data.")
        print(f"There are {num_characters} characters in the data.")
        print(f"The lexical diversity is {lexical_diversity:.3f} in the data.")
        print(f"The average token length is {avg_token_length:.3f} in the data.")
        print(f"The variance of token lengths is {token_length_variance:.3f} in the data.")
        print(f"The standard deviation of token lengths is {token_length_std_dev:.3f} in the data.")
        print(f"The {top_n} most common tokens are {most_common_tokens} in the data.")

    return [
        total_tokens,
        num_unique_tokens,
        lexical_diversity,
        num_characters,
        avg_token_length,
        token_length_variance,
        token_length_std_dev,
        most_common_tokens
    ]

In [19]:
def calculate_descriptives(column):
    """Return mean, standard deviation, variance and skewness of `column`.

    NOTE: a later cell in this notebook defines `calculate_descriptives`
    again (that version prints a table and returns nothing). When the cells
    run in order, the later definition replaces this one.

    Missing values are removed before the skewness is computed, so it is
    based on the same observations as the pandas statistics; scipy's `skew`
    otherwise returns NaN as soon as one value is missing.

    Parameters
    ----------
    column : pandas.Series
        Numeric data.

    Returns
    -------
    tuple
        (mean, standard deviation, variance, skewness).
    """
    mean_value = column.mean()
    std_deviation = column.std()
    variance_value = column.var()

    # Wrapping in pd.Series keeps array inputs working while allowing dropna()
    skewness_value = skew(pd.Series(column).dropna())

    return mean_value, std_deviation, variance_value, skewness_value

# Example usage: run calculate_descriptives on a five-value sample column
data = {'numbers': [2, 4, 6, 8, 10]}
df = pd.DataFrame(data)

# Unpack the four statistics returned for the 'numbers' column
mean_val, std_dev, var, skewness = calculate_descriptives(df['numbers'])

# Print one statistic per line
print(
    f"Mean: {mean_val}",
    f"Standard Deviation: {std_dev}",
    f"Variance: {var}",
    f"Skewness: {skewness}",
    sep="\n",
)


Mean: 6.0
Standard Deviation: 3.1622776601683795
Variance: 10.0
Skewness: 0.0


In [20]:
def calculate_descriptives(column):
    """Print a table of descriptive statistics for a numerical column.

    This redefines the `calculate_descriptives` from an earlier cell. Unlike
    that version it returns nothing, so code that unpacks four return values
    must run before this cell.

    Missing values are removed before the skewness is computed, so it is
    based on the same observations as the pandas statistics; scipy's `skew`
    otherwise returns NaN as soon as one value is missing.

    Parameters
    ----------
    column : pandas.Series
        Numeric data.

    Returns
    -------
    None
        The table is printed with `tabulate` in 'grid' format.
    """
    median_value = column.median()
    mean_value = column.mean()
    std_deviation = column.std()
    variance_value = column.var()
    skewness_value = skew(column.dropna())
    q1 = column.quantile(0.25)
    q3 = column.quantile(0.75)

    # Prepare data for tabulation
    headers = ['Statistic', 'Value']
    data = [
        ['Median (Md)', median_value],
        ['Mean (x-bar)', mean_value],
        ['Standard Deviation (s)', std_deviation],
        ['Variance (s2)', variance_value],
        ['Skewness', skewness_value],
        ['First Quartile (Q1)', q1],
        ['Third Quartile (Q3)', q3]
    ]

    # Print the table
    print(tabulate(data, headers=headers, tablefmt='grid'))

In [21]:
# drugs are currently in list format, clean to work with
def clean_data(x):
    """Join a list into one comma-separated string (no space after the comma).

    Parameters
    ----------
    x : object
        A list of strings, or any other value.

    Returns
    -------
    object
        ','.join(x) for a list; every other value, including strings, is
        returned unchanged.
    """
    return ','.join(x) if isinstance(x, list) else x


In [22]:
# match text
def contains_unique_value(text, unique_values_lower):
    """List every candidate value that occurs as a substring of `text`.

    `text` is lowercased before matching; the candidates are compared as
    given.

    Parameters
    ----------
    text : str
        Text to search.
    unique_values_lower : iterable of str
        Candidate substrings (presumably already lowercase, going by the
        parameter name).

    Returns
    -------
    list of str
        The candidates found in the lowercased text, in iteration order.
    """
    haystack = text.lower()
    return [candidate for candidate in unique_values_lower if candidate in haystack]

In [1]:
# Function to convert list to concatenated string
def list_to_string(lst):
    """Join a list into one string separated by a comma and a space.

    Parameters
    ----------
    lst : object
        A list of strings, or any other value.

    Returns
    -------
    object
        ', '.join(lst) for a list; any non-list value is returned unchanged.
    """
    if not isinstance(lst, list):
        return lst
    return ', '.join(lst)