In [50]:
import pandas as pd

In [60]:
df_c_trials = pd.read_csv("data/clinical_trials.csv")
df_drugs = pd.read_csv("data/drugs.csv")
df_pubmed_csv = pd.read_csv("data/pubmed.csv")
df_pubmed_json = pd.read_json("data/pubmed.json")

In [61]:
COLS_CLEAN_MAPPING = {
    "drugs": {
        "date_columns": [],
        "drop_na_columns": ["drug"],
        "text_search_columns": ["drug"],
        "id_prefix": "drug",
        "id_column": "atccode"
    },
    "pubmed": {
            "date_columns": ["date"],
            "drop_na_columns": ["title", "journal"],
            "text_search_columns": ["title"],
            "id_prefix": "pubmed",
            "id_column": "id"
    },
    "clinical": {
            "date_columns": ["date"],
            "drop_na_columns": ["scientific_title", "journal"],
            "text_search_columns": ["scientific_title"],
            "id_prefix": "clinical",
            "id_column": "id"
    }
}

In [62]:
class DataCleaner:
    """
    A utility class for cleaning and standardizing data in a pandas DataFrame.

    The class provides methods to:
    - Clean ID columns
    - Standardize date values to a specific format
    - Remove rows with missing values in specific columns
    - Remove special characters from specific text columns
    - Normalize text formatting (lowercase, whitespace trimming) for specific columns

    :param date_columns: List of column names containing date values to standardize.
    :type date_columns: list
    :param drop_na_columns: List of column names to drop rows if they contain missing values.
    :type drop_na_columns: list
    :param text_search_columns: List of text columns to clean and standardize.
    :type text_search_columns: list
    :param id_column: Name of the ID column to clean.
    :type id_column: str
    :param id_prefix: Prefix to add to each ID value for uniqueness.
    :type id_prefix: str
    """

    def __init__(
            self, date_columns: list, drop_na_columns: list, text_search_columns: list, id_column: str, id_prefix: str):
        self.standard_date_format = "%Y-%m-%d"
        self.date_columns = date_columns
        self.drop_na_columns = drop_na_columns
        self.text_search_columns = text_search_columns
        self.id_prefix = id_prefix
        self.id_column = id_column

    def clean_id(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Clean ID by homogenizing type to str and adding a prefix to distinguish identical IDs from different
        sources of input data.

        :param df: Dataframe.
        :type df: pd.DataFrame
        :param id_column: ID column name.
        :type id_column: str
        :return: Dataframe with cleaned ID.
        :rtype: pd.DataFrame
        :raises ValueError: If the specified column is not found in the dataframe.
        :raises Exception: For any other errors encountered during processing.
        """

        try:
            df[self.id_column] = df[self.id_column].astype(str)
            df[self.id_column] = df[self.id_column].apply(
                lambda x: "{}_{}".format(self.id_prefix, x)
            )
            df.drop_duplicates(subset=[self.id_column]).reset_index(drop=True)
            return df
        except KeyError:
            raise ValueError(f"Column '{self.id_column}' not found in the dataframe.")
        except Exception as e:
            raise Exception(f"Error converting ID column to string. More details here: {e}")

    def standardize_date_format(self, df: pd.DataFrame, date_column: str) -> pd.DataFrame:
        """
        Standardize all date values to a specific format for a given column.

        :param df: Dataframe.
        :type df: pd.DataFrame
        :param date_column: Date column name.
        :type df: str.
        :return: Dataframe with column standardized date.
        :rtype: pd.DataFrame
        :raises ValueError: If the specified column is not found in the dataframe.
        :raises Exception: For any other errors encountered during processing.
        """

        try:
            df[date_column] = pd.to_datetime(df[date_column], errors="coerce").dt.strftime(
                self.standard_date_format
            )
            return df
        except KeyError:
            raise ValueError(f"Column '{date_column}' not found in the dataframe.")
        except Exception as e:
            raise Exception(f"Error standardizing date values. More details here: {e}")

    @staticmethod
    def remove_rows_missing_column_value(df: pd.DataFrame, column_to_drop: str) -> pd.DataFrame:
        """
        Remove rows where a given column NaN.

        :param df: Dataframe.
        :type df: pd.DataFrame
        :param column_to_drop: Name of column to drop row containing NaN value on it.
        :type column_to_drop: str
        :return: The dataframe with rows with NaN values for given columns removed.
        :rtype: pd.DataFrame
        :raises ValueError: If the specified column is not found in the dataframe.
        :raises Exception: For any other errors encountered during processing.
        """

        try:
            return df.dropna(subset=[column_to_drop])
        except KeyError:
            raise ValueError(
                f"Columns '{column_to_drop}' not found in the dataframe."
            )
        except Exception as e:
            raise Exception(f"Error removing rows with empty titles or journals. More details here: {e}")

    @staticmethod
    def remove_special_characters(df: pd.DataFrame, column_name: str) -> pd.DataFrame:
        """
        Remove special characters from a specified text column in a dataframe.
        Only ASCII characters, word characters, whitespace, and hyphens are preserved.

        :param df: Dataframe.
        :type df: pd.DataFrame
        :param column_name: Name of column from which to remove special characters.
        :type column_name: str
        :return: Dataframe with specified column cleaned of special characters.
        :rtype: pd.DataFrame
        :raises ValueError: If the specified column is not found in the dataframe.
        :raises Exception: For any other errors encountered during processing.
        """

        try:
            df[column_name] = (
                df[column_name].str.encode("ascii", "ignore").str.decode("utf-8")
            )
            df[column_name] = df[column_name].str.replace(r"[^\w\s-]", "", regex=True)
            return df
        except KeyError:
            raise ValueError(f"Column '{column_name}' not found in the dataframe.")
        except Exception as e:
            raise Exception(f"Error cleaning special characters. More details here: {e}")

    @staticmethod
    def standardize_text(df: pd.DataFrame, column_name: str) -> pd.DataFrame:
        """
        Standardize text in a specified column by applying lower casing, trimming whitespace,
        and replacing multiple spaces with a single space.

        :param df: Dataframe.
        :type df: pd.DataFrame
        :param column_name: Name of the column to standardize.
        :type column_name: str
        :return: Dataframe with the standardized text column.
        :rtype: pd.DataFrame
        :raises ValueError: If the specified column is not found in the dataframe.
        :raises Exception: For any other errors encountered during processing.
        """

        try:
            df[column_name] = (
                df[column_name]
                .str.lower()
                .str.strip()
                .str.replace(r"\s+", " ", regex=True)
            )
            return df
        except KeyError:
            raise ValueError(f"Column '{column_name}' not found in the dataframe.")
        except Exception as e:
            raise Exception(f"Error standardizing text. More details here: {e}")

    def __call__(self, df):
        """
        Clean the given DataFrame by applying a pipeline of transformations:
        - Standardizes date formats for all specified date columns.
        - Removes rows with missing values in specified columns.
        - Removes special characters from specified text columns.
        - Standardizes text formatting (lowercasing, whitespace normalization) for text columns.

        :param df: Input DataFrame to be cleaned.
        :type df: pd.DataFrame
        :return: The cleaned and transformed DataFrame.
        :rtype: pd.DataFrame
        """

        df = self.clean_id(df)
        for col_name in self.date_columns:
            df = self.standardize_date_format(
                df=df, date_column=col_name)
        for col_name in self.drop_na_columns:
            df = self.remove_rows_missing_column_value(
                df=df, column_to_drop=col_name)
        for col_name in self.text_search_columns:
            df = self.remove_special_characters(
                df=df, column_name=col_name)
        for col_name in self.text_search_columns:
            df = self.standardize_text(
                df=df, column_name=col_name)
        return df

In [63]:
# df_drugs

In [64]:
data_cleaner = DataCleaner(**COLS_CLEAN_MAPPING["drugs"])
df_drugs_clean = data_cleaner(df=df_drugs)

In [66]:
# df_drugs_clean

In [29]:
df_drugs_clean.to_json("data/drugs_clean.json")

In [67]:
data_cleaner = DataCleaner(**COLS_CLEAN_MAPPING["clinical"])
df_c_trials_clean = data_cleaner(df=df_c_trials)

In [69]:
# df_c_trials_clean

In [58]:
df_c_trials_clean.to_json("data/clinical_clean.json", orient="records")

In [70]:
data_cleaner = DataCleaner(**COLS_CLEAN_MAPPING["pubmed"])
df_pubmed_csv_clean = data_cleaner(df=df_pubmed_csv)

In [71]:
df_pubmed_json_clean = data_cleaner(df=df_pubmed_json)

In [73]:
def concatenate_dataframe_list(dfs):
    """
    Concatenate a list of Pandas DataFrames row-wise (i.e., vertically).

    :param dfs: List of DataFrames to concatenate.
    :type dfs: List[pd.DataFrame]
    :return: A single DataFrame resulting from row-wise concatenation of all valid input DataFrames.
    :rtype: pd.DataFrame
    :raises ValueError: If no valid DataFrames are provided.
    :raises Exception: If concatenation fails for any reason.
    """

    valid_dfs = []
    for i, df in enumerate(dfs):
        if isinstance(df, pd.DataFrame):
            valid_dfs.append(df)

    if not valid_dfs:
        raise ValueError("No valid DataFrames to concatenate.")

    try:
        return pd.concat(valid_dfs, axis=0, ignore_index=True)
    except Exception as e:
        raise

In [75]:
df_pumbed_clean = concatenate_dataframe_list([df_pubmed_json_clean, df_pubmed_csv_clean])

In [76]:
df_pumbed_clean

Unnamed: 0,id,title,date,journal
0,pubmed_9,gold nanoparticles synthesized from euphorbia ...,2020-01-01,"Journal of photochemistry and photobiology. B,..."
1,pubmed_10,clinical implications of umbilical artery dopp...,2020-01-01,The journal of maternal-fetal & neonatal medicine
2,pubmed_11,effects of topical application of betamethason...,2020-01-01,Journal of back and musculoskeletal rehabilita...
3,pubmed_12,comparison of pressure release phonophoresis a...,2020-01-03,Journal of back and musculoskeletal rehabilita...
4,pubmed_,comparison of pressure betamethasone release p...,2020-01-03,The journal of maternal-fetal & neonatal medicine
5,pubmed_1,a 44-year-old man with erythema of the face di...,2019-01-01,Journal of emergency nursing
6,pubmed_2,an evaluation of benadryl pyribenzamine and ot...,2019-01-01,Journal of emergency nursing
7,pubmed_3,diphenhydramine hydrochloride helps symptoms o...,2019-02-01,The Journal of pediatrics
8,pubmed_4,tetracycline resistance patterns of lactobacil...,2020-01-01,Journal of food protection
9,pubmed_5,appositional tetracycline bone formation rates...,2020-02-01,American journal of veterinary research


In [78]:
df_pumbed_clean.to_json("data/pubmed_clean_merged.json", orient="records")

In [85]:
import pandas as pd
import logging
from typing import Dict, List
import re

logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)

class DataMatcher:
    """
    A class used to find and format matches between drugs and publication titles then journals
    from provided data sources.
    :param drug_col_name: Column name containing drug names in the drug DataFrame.
    :type drug_col_name: str
    :param pub_title_col_name: Column name containing publication titles in the publication DataFrame.
    :type pub_title_col_name: str
    :param journal_col_name: Column name containing journal names in the publication DataFrame.
    :type journal_col_name: str
    :param date_col_name: Column name containing publication dates in the publication DataFrame.
    :type date_col_name: str
    :param data_source: Name of the data source (used in output formatting).
    :type data_source: str
    """

    def __init__(
            self,
            drug_col_name: str,
            pub_title_col_name: str,
            journal_col_name: str,
            date_col_name: str,
            data_source: str):
        self.drug_col_name = drug_col_name
        self.pub_title_col_name = pub_title_col_name
        self.journal_col_name = journal_col_name
        self.date_col_name = date_col_name
        self.data_source = data_source

    def find_drug_pub_matches(self, df_drugs: pd.DataFrame, df_publications: pd.DataFrame) -> List[Dict[str, str]]:
        """
        Identify matches between drug names and publication titles.

        :param df_drugs: DataFrame containing drug names.
        :type df_drugs: pd.DataFrame
        :param df_publications: DataFrame containing publication data.
        :type df_publications: pd.DataFrame
        :return: A list of dictionaries representing matched drugs and publication metadata.
        :rtype: List[Dict[str, str]]
        """

        matches = []
        drug_patterns = {
            drug: re.compile(rf"\b{re.escape(drug)}\b") for drug in df_drugs[self.drug_col_name].dropna()
        }
        for drug, pattern in drug_patterns.items():
            df_matching = df_publications[df_publications[self.pub_title_col_name].str.contains(
                pattern, regex=True, na=False)]
            for _, row in df_matching.iterrows():
                matches.append(
                    {
                        "drug": drug,
                        "source": self.data_source,
                        "title": row[self.pub_title_col_name],
                        "journal": row.get(self.journal_col_name),
                        "date": row.get(self.date_col_name)
                    }
                )
        logging.info(f"Found {len(matches)} drug mentions in publications.")
        return matches

    def _format_drug_pub_matches(self, matches: List[Dict[str, str]]) -> List[Dict[str, str]]:
        """
        Format matches between drug names and publication titles.

        :param matches: A list of matched drug-publication dictionaries.
        :type matches: List[Dict[str, str]]
        :return: A list of formatted matches with standardized keys.
        :rtype: List[Dict[str, str]]
        """

        formatted_matches = []
        for match in matches:
            formatted_matches.append({
                "drug": match[self.drug_col_name],
                "title": match["title"],
                "ref_type": "{}_publication".format(self.data_source),
                "date_mention": match[self.date_col_name]
            })
        return formatted_matches

    def format_drug_journal_matches(self, matches: List[Dict[str, str]]) -> List[Dict[str, str]]:
        """
        Format and deduplicate matches between drug names and journal entries.

        :param matches: A list of matched drug-publication dictionaries.
        :type matches: List[Dict[str, str]]
        :return: A list of formatted matches, including both publication and journal references.
        :rtype: List[Dict[str, str]]
        """

        formatted_matches = self._format_drug_pub_matches(matches)
        n_matchings_drugs_pub = len(formatted_matches)

        if not matches or len(matches)==0:
            logging.warning(
                "No matches found."
            )
            return {}

        seen_journals = set()
        for match in matches:
            journal_cleaned = bytes(match[self.journal_col_name], "utf-8").decode("utf-8", errors="ignore")
            key = (match["drug"], journal_cleaned.strip(), match["date"])
            if key not in seen_journals:
                seen_journals.add(key)
                formatted_matches.append({
                    "drug": match[self.drug_col_name],
                    "title": journal_cleaned.strip(),
                    "ref_type": "journal",
                    "date_mention": match[self.date_col_name]
                })
        logging.info(f"Found {len(matches)} drug mentions in publications.")
        logging.info(f"Found {len(formatted_matches) - n_matchings_drugs_pub} drug mentions in journals.")
        return formatted_matches

    def __call__(self,  df_drugs: pd.DataFrame, df_publications: pd.DataFrame) -> List[Dict[str, str]]:
        """
        Execute the data matching process when the object is called like a function.

        :param df_drugs: DataFrame containing drug names.
        :type df_drugs: pd.DataFrame
        :param df_publications: DataFrame containing publication and journal data.
        :type df_publications: pd.DataFrame
        :return: A list of formatted matches including publication and journal references.
        :rtype: List[Dict[str, str]]
        """

        drug_pub_matches = self.find_drug_pub_matches(df_drugs, df_publications)
        all_formatted_matches = self.format_drug_journal_matches(drug_pub_matches)
        return all_formatted_matches


In [94]:
COLS_MATCH_MAPPING = {
    "drugs_clinical": {
        "drug_col_name": "drug",
        "pub_title_col_name": "scientific_title",
        "journal_col_name": "journal",
        "date_col_name": "date",
        "data_source": "clinical"
    },
    "drugs_pubmed": {
        "drug_col_name": "drug",
        "pub_title_col_name": "title",
        "journal_col_name": "journal",
        "date_col_name": "date",
        "data_source": "pubmed"
    }
}

In [87]:
data_matcher = DataMatcher(**COLS_MATCH_MAPPING["drugs_clinical"])
drug_clinical_matches = data_matcher(df_drugs_clean, df_c_trials_clean)

2025-06-17 18:04:12,422 - INFO - Found 5 drug mentions in publications.
2025-06-17 18:04:12,424 - INFO - Found 5 drug mentions in publications.
2025-06-17 18:04:12,425 - INFO - Found 3 drug mentions in journals.


In [88]:
drug_clinical_matches

[{'drug': 'diphenhydramine',
  'title': 'use of diphenhydramine as an adjunctive sedative for colonoscopy in patients chronically on opioids',
  'ref_type': 'clinical_publication',
  'date_mention': '2020-01-01'},
 {'drug': 'diphenhydramine',
  'title': 'phase 2 study iv quzyttir cetirizine hydrochloride injection vs v diphenhydramine',
  'ref_type': 'clinical_publication',
  'date_mention': '2020-01-01'},
 {'drug': 'diphenhydramine',
  'title': 'feasibility of a randomized controlled clinical trial comparing the use of cetirizine to replace diphenhydramine in the prevention of reactions related to paclitaxel',
  'ref_type': 'clinical_publication',
  'date_mention': '2020-01-01'},
 {'drug': 'epinephrine',
  'title': 'tranexamic acid versus epinephrine during exploratory tympanotomy',
  'ref_type': 'clinical_publication',
  'date_mention': '2020-04-27'},
 {'drug': 'betamethasone',
  'title': 'preemptive infiltration with betamethasone and ropivacaine for postoperative pain in laminoplas

In [91]:
with open("data/drug_clinical_matches.json", "w") as f:
    json.dump(drug_clinical_matches, f)

In [95]:
data_matcher = DataMatcher(**COLS_MATCH_MAPPING["drugs_pubmed"])
drug_pubmed_matches = data_matcher(df_drugs_clean, df_pumbed_clean)

2025-06-17 18:20:04,304 - INFO - Found 14 drug mentions in publications.
2025-06-17 18:20:04,305 - INFO - Found 14 drug mentions in publications.
2025-06-17 18:20:04,306 - INFO - Found 13 drug mentions in journals.


In [96]:
drug_pubmed_matches

[{'drug': 'diphenhydramine',
  'title': 'a 44-year-old man with erythema of the face diphenhydramine neck and chest weakness and palpitations',
  'ref_type': 'pubmed_publication',
  'date_mention': '2019-01-01'},
 {'drug': 'diphenhydramine',
  'title': 'an evaluation of benadryl pyribenzamine and other so-called diphenhydramine antihistaminic drugs in the treatment of allergy',
  'ref_type': 'pubmed_publication',
  'date_mention': '2019-01-01'},
 {'drug': 'diphenhydramine',
  'title': 'diphenhydramine hydrochloride helps symptoms of ciguatera fish poisoning',
  'ref_type': 'pubmed_publication',
  'date_mention': '2019-02-01'},
 {'drug': 'tetracycline',
  'title': 'tetracycline resistance patterns of lactobacillus buchneri group strains',
  'ref_type': 'pubmed_publication',
  'date_mention': '2020-01-01'},
 {'drug': 'tetracycline',
  'title': 'appositional tetracycline bone formation rates in the beagle',
  'ref_type': 'pubmed_publication',
  'date_mention': '2020-02-01'},
 {'drug': 'te

In [97]:
with open("data/drug_pubmed_matches.json", "w") as f:
    json.dump(drug_pubmed_matches, f)

In [98]:
from typing import List, Dict
import logging

logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")


class DataAggregator:
    """
    A class to aggregate a list of formatted matchings e.g. matching issued from pubmed + matching issued
    from clinical trials as a single JSON object.

    Provides functionality to:
    - Flatten nested lists of dictionaries into a single list.
    - Deduplicate entries based on dictionary content.
    """

    def __init__(self):
        self.aggregated_data: List[Dict[str, str]] = []

    def flatten(self, data: List[List[Dict[str, str]]]) -> List[Dict[str, str]]:
        """
        Flatten a list of lists of dictionaries into a single list.

        :param data: A list of lists, where each inner list contains dictionaries with string keys and values.
        :type data: List[List[Dict[str, str]]]
        :return: A flattened list of dictionaries.
        :rtype: List[Dict[str, str]]
        :raises ValueError: If input is not a list of lists of dictionaries.
        """

        if not isinstance(data, list) or not all(isinstance(sub, list) for sub in data):
            raise ValueError("Input must be a list of lists.")

        flattened = [item for sublist in data for item in sublist]
        logging.info(f"Flattened data into {len(flattened)} total entries.")
        self.aggregated_data = flattened
        return flattened

    def deduplicate(self) -> List[Dict[str, str]]:
        """
        Remove duplicate dictionaries from the aggregated data.

        :return: A deduplicated list of dictionaries.
        :rtype: List[Dict[str, str]]
        """

        seen = set()
        unique_data = []
        for entry in self.aggregated_data:
            frozen = frozenset(entry.items())
            if frozen not in seen:
                seen.add(frozen)
                unique_data.append(entry)
        logging.info(f"Reduced to {len(unique_data)} unique entries.")
        self.aggregated_data = unique_data
        return unique_data

    def __call__(self, data: List[List[Dict[str, str]]]) -> List[Dict[str, str]]:
        """
        Aggregate data by flattening and deduplicating it.

        :param data: A list of lists of dictionaries to aggregate.
        :type data: List[List[Dict[str, str]]]
        :return: The aggregated (flattened and deduplicated) list of dictionaries.
        :rtype: List[Dict[str, str]]
        """

        self.flatten(data)
        return self.deduplicate()


In [99]:
data_aggregator = DataAggregator()
aggregated_matches = data_aggregator([drug_clinical_matches, drug_pubmed_matches])

2025-06-17 18:23:03,431 - INFO - Flattened data into 35 total entries.
2025-06-17 18:23:03,431 - INFO - Reduced to 35 unique entries.


In [100]:
aggregated_matches

[{'drug': 'diphenhydramine',
  'title': 'use of diphenhydramine as an adjunctive sedative for colonoscopy in patients chronically on opioids',
  'ref_type': 'clinical_publication',
  'date_mention': '2020-01-01'},
 {'drug': 'diphenhydramine',
  'title': 'phase 2 study iv quzyttir cetirizine hydrochloride injection vs v diphenhydramine',
  'ref_type': 'clinical_publication',
  'date_mention': '2020-01-01'},
 {'drug': 'diphenhydramine',
  'title': 'feasibility of a randomized controlled clinical trial comparing the use of cetirizine to replace diphenhydramine in the prevention of reactions related to paclitaxel',
  'ref_type': 'clinical_publication',
  'date_mention': '2020-01-01'},
 {'drug': 'epinephrine',
  'title': 'tranexamic acid versus epinephrine during exploratory tympanotomy',
  'ref_type': 'clinical_publication',
  'date_mention': '2020-04-27'},
 {'drug': 'betamethasone',
  'title': 'preemptive infiltration with betamethasone and ropivacaine for postoperative pain in laminoplas

In [101]:
with open("data/aggregated_matches.json", "w") as f:
    json.dump(aggregated_matches, f)