# Clinical Trial Research

## Clinical Trial Data Analysis - Richard Young

### ryoung@unlv.edu


### Updated 2025-01-12

To review / Look at for ideas

https://github.com/RyanWangZf/PyTrial

https://stackoverflow.com/questions/78415818/how-to-get-full-results-with-clinicaltrials-gov-api-in-python


some code based on https://github.com/jvfe/pytrials



In [None]:
import os
import sys
import csv
import pandas as pd

import numpy as np
import requests
import datetime as dt

import json
import ipywidgets as widgets
np.random.seed(10031975)
import matplotlib.pyplot as plt
from matplotlib import *
from matplotlib.ticker import AutoMinorLocator
import openpyxl
# import openai
import re

from urllib.parse import quote
# import logging





In [None]:
# Raise the csv module's per-field size limit as high as the platform accepts,
# starting from sys.maxsize and shrinking until the call succeeds.
maxInt = sys.maxsize

while True:
    # csv.field_size_limit raises OverflowError when the requested value is
    # too large for the platform, so divide by 10 and retry until it fits.
    try:
        csv.field_size_limit(maxInt)
        break
    except OverflowError:
        # Floor division keeps the value an exact integer; int(maxInt / 10)
        # would round-trip through float and lose precision at this size.
        maxInt //= 10

Check directory for output

In [None]:

# Folders this notebook writes into. Entries that are commented out are NOT
# created by this cell.
subdirs = [
    "01_data_raw",
    # "01_data_raw/interventions",
    # "data_paper_tables",
    # "data_results",
]

# exist_ok=True makes re-running the cell harmless when a folder already exists.
for subdir in subdirs:
    os.makedirs(name=subdir, exist_ok=True)

#### Not sure I need this cell anymore

all functions for now

In [None]:
import os
import pandas as pd
import requests
from urllib.parse import quote

# Clinical Trials Calls / Functions

def create_yearly_count_series(column_name):
    """
    Count rows of the global ``summary`` frame per calendar year of a date column.

    Only years inside the inclusive window
    [current_research_year - 10, current_research_year] are kept; both
    ``summary`` and ``current_research_year`` are read from module scope.

    Parameters:
    column_name (str): Name of a datetime column in ``summary``.

    Returns:
    pd.Series: Per-year counts sorted by year and named ``column_name``.
    """
    counts_by_year = summary[column_name].dt.year.value_counts().sort_index()
    years = counts_by_year.index
    # Both bounds are inclusive, so the current research year itself is kept.
    in_window = (years >= current_research_year - 10) & (years <= current_research_year)
    windowed = counts_by_year[in_window]
    return windowed.rename(column_name)

# Function to save NCTid and year data for each column
def save_nctid_year_data(column_name, file_name):
    """
    Write the (Year, NCTid) pairs of recent studies to an Excel file.

    Rows of the global ``summary`` frame whose ``column_name`` year is at
    least ``current_research_year - 10`` are kept; ``summary`` and
    ``current_research_year`` are read from module scope.

    Parameters:
    column_name (str): Name of a datetime column in ``summary``.
    file_name (str): File name to create inside the 'data_results' folder.

    Returns:
    None. The file is written and its path is printed.
    """
    output_dir = 'data_results'
    # The folder is not guaranteed to exist yet, so create it here instead of
    # letting to_excel fail on a missing directory.
    os.makedirs(output_dir, exist_ok=True)

    recent = summary[summary[column_name].dt.year >= current_research_year - 10]
    # assign() returns a new frame, which avoids writing a column into a
    # filtered slice (the source of pandas' SettingWithCopyWarning).
    # NOTE(review): this reads the column 'NCTid', while the rest of the
    # notebook spells it 'NCTId' - confirm which spelling ``summary`` uses.
    year_nctid_data = recent.assign(Year=recent[column_name].dt.year)[['Year', 'NCTid']]

    file_path = os.path.join(output_dir, file_name)
    year_nctid_data.to_excel(file_path, index=False)
    print(f"{column_name} data saved at {file_path}")

def get_study_count(condition, search_area="condition", page_size=1):
    """
    Get the total count of studies for a given condition from ClinicalTrials.gov API.

    Parameters:
    condition (str): Raw, un-encoded search text (e.g. "amyotrophic lateral sclerosis").
    search_area (str, optional): Which query field to search; one of
        "condition", "title", "intervention", "outcome", "sponsor".
    page_size (int, optional): Page size sent with the request. Default is 1.

    Returns:
    int or None: The API's 'totalCount' (0 when the key is absent), or None
    when the search area is invalid or the request fails.
    """
    base_url = "https://clinicaltrials.gov/api/v2/studies"

    search_area_params = {
        "condition": "query.cond",
        "title": "query.term",
        "intervention": "query.intr",
        "outcome": "query.outc",
        "sponsor": "query.spons"
    }

    if search_area not in search_area_params:
        print(f"ERROR: Invalid search area: {search_area}")
        return None

    # The raw term is passed through as-is: requests URL-encodes everything in
    # `params`, so quoting it beforehand would double-encode it
    # (a space would become %2520 instead of an encoded space).
    params = {
        search_area_params[search_area]: condition,
        "pageSize": str(page_size),
        "countTotal": "true"
    }

    try:
        # A timeout keeps the notebook from hanging forever on a stalled connection.
        response = requests.get(base_url, params=params, timeout=30)
        response.raise_for_status()

        data = response.json()
        total_count = data.get('totalCount', 0)

        print(f"INFO: URL: {response.url}")
        print(f"INFO: Total count: {total_count}")

        return total_count

    except requests.RequestException as e:
        print(f"ERROR: An error occurred: {e}")
        return None

def compare_study_counts(csv_path, api_study_count):
    """
    Compare the count of unique NCTIds in the CSV file with the count from the API.

    Parameters:
    csv_path (str): Path of the CSV file to read.
    api_study_count (int or None): Study count reported by the API.

    Returns:
    bool or None: True when the counts are equal, False when they differ,
    None when the CSV has no 'NCTId' column or ``api_study_count`` is None.
    """
    # Read CSV file
    df = pd.read_csv(csv_path)

    # A CSV without the id column cannot be compared; report it instead of
    # failing with a KeyError.
    if 'NCTId' not in df.columns:
        print("ERROR: NCTId column not found in the CSV file.")
        return None

    unique_study_count_in_df = df['NCTId'].nunique()
    print(f"INFO: Unique studies in DataFrame: {unique_study_count_in_df}")

    if api_study_count is None:
        print("ERROR: Failed to retrieve study count from API.")
        return None

    print(f"INFO: Study count from ClinicalTrials API: {api_study_count}")

    # Compare counts
    if unique_study_count_in_df == api_study_count:
        print("INFO: The counts match.")
        return True

    print(f"INFO: The counts do not match. DataFrame has {unique_study_count_in_df}, API reports {api_study_count}.")
    return False


## Updated for 2025

In [None]:
# Search term for this run; the commented lines are alternative terms that can
# be swapped in.
disease = "aged"
# disease = "amyotrophic lateral sclerosis"
# disease = "depression"
# disease = "diabetes"
# disease = "tacs"



In [None]:
import os
import pandas as pd

# Column layout written as the header row of a newly created per-disease CSV.
headers = [
    "Index", "NCTId", "LeadSponsorClass", "LeadSponsorName", "Condition", "OfficialTitle",
    "BriefTitle", "Acronym", "StudyType", "InterventionType", "InterventionName",
    "InterventionOtherName", "InterventionDescription", "Phase", "StudyFirstSubmitDate",
    "LastUpdateSubmitDate", "CompletionDate", "OverallStatus", "BriefSummary",
    "IsFDARegulatedDevice", "StartDate", "DetailedDescription", "ConditionMeshTerm",
    "PrimaryOutcomeDescription", "SecondaryOutcomeDescription", "EnrollmentCount",
    "EnrollmentType", "BaselineCategoryTitle", "BaselinePopulationDescription",
    "BaselineTypeUnitsAnalyzed", "OtherOutcomeDescription", "EligibilityCriteria",
    "StudyPopulation", "HealthyVolunteers", "ReferencePMID", "LocationCountry",
    "PrimaryOutcomeTimeFrame", "BaselineMeasureTitle", "BaselineMeasureUnitOfMeasure",
    "BaselineMeasurementValue", "GPT_summary",
]

# Set your parameters
csv_file_path = os.path.join('01_data_raw', f'01_{disease.lower()}_done.csv')

# Get the total study count
total_study_count = get_study_count(disease)

# Seed a missing CSV with just the header row so it can be read back as an
# empty table by the comparison below.
if not os.path.exists(csv_file_path):
    with open(csv_file_path, 'w', encoding='utf-8') as file:
        file.write(','.join(headers) + '\n')

# Flag describing whether the later update cells are needed for this run.
skip_next_cells = False

if total_study_count is None:
    # Without an API count there is nothing to compare against; say so
    # explicitly instead of silently leaving the flag at False.
    print("ERROR: Could not retrieve the study count from the API.")
    skip_next_cells = True
else:
    # Save the API count to a DataFrame
    api_count_df = pd.DataFrame({'Condition': [disease], 'API_Study_Count': [total_study_count]})
    print(api_count_df.to_string(index=False))

    # Compare the counts
    counts_match = compare_study_counts(csv_file_path, total_study_count)

    if counts_match is None:
        # The API count is known in this branch, so a None result cannot be
        # an API error; the comparison itself has already printed the cause.
        print("ERROR: Comparison could not be completed; see the messages above.")
        skip_next_cells = True
    elif counts_match:
        print("INFO: Skipping next cells as counts match.")
        skip_next_cells = True  # Skip next cells because counts match
    else:
        print("INFO: Proceeding with data update...")
        skip_next_cells = False  # Do not skip cells because counts do not match

def compare_study_counts(csv_path, api_study_count):
    """
    Check whether the CSV holds as many distinct NCTIds as the API reports.

    Defining this function again replaces any earlier ``compare_study_counts``
    for code that runs afterwards.

    Parameters:
    csv_path (str): Path of the CSV file to read.
    api_study_count (int or None): Study count reported by the API.

    Returns:
    bool or None: Whether the two counts are equal, or None when the CSV has
    no 'NCTId' column or ``api_study_count`` is None.
    """
    stored = pd.read_csv(csv_path)
    if 'NCTId' not in stored.columns:
        print("ERROR: NCTId column not found in the CSV file.")
        return None

    unique_study_count_in_df = stored['NCTId'].nunique()
    print(f"INFO: Unique studies in DataFrame: {unique_study_count_in_df}")
    print(f"INFO: Study count from ClinicalTrials API: {api_study_count}")

    # Both counts are printed before this check, so a missing API count is
    # still visible in the output.
    if api_study_count is None:
        return None

    counts_agree = unique_study_count_in_df == api_study_count
    verdict = (
        "INFO: The counts match."
        if counts_agree
        else f"INFO: The counts do not match. DataFrame has {unique_study_count_in_df}, API reports {api_study_count}."
    )
    print(verdict)
    return counts_agree

# Echo the count query and its result. The URL is assembled here from the
# search term; it is not read back from an actual response.
print(
    "INFO: URL: https://clinicaltrials.gov/api/v2/studies"
    f"?query.cond={disease}&pageSize=1&countTotal=true"
)
print(f"INFO: Total count: {total_study_count}")

In [None]:
# Column order of the summary frame returned by fetch_studies.
_SUMMARY_COLUMNS = [
    "NCTId", "Acronym", "Overall Status", "Start Date", "Conditions",
    "Interventions", "Locations", "Primary Completion Date",
    "Study First Post Date", "Last Update Post Date", "Study Type", "Phases",
]


def _summarize_study(study):
    """Reduce one raw API study record to the flat summary row used by fetch_studies."""
    # Every module is looked up with .get so that a study lacking one of them
    # yields placeholder values instead of raising KeyError mid-download.
    protocol = study.get('protocolSection', {})
    identification = protocol.get('identificationModule', {})
    status = protocol.get('statusModule', {})
    design = protocol.get('designModule', {})
    conditions = protocol.get('conditionsModule', {}).get('conditions', ['No conditions listed'])
    interventions_list = protocol.get('armsInterventionsModule', {}).get('interventions', [])
    locations_list = protocol.get('contactsLocationsModule', {}).get('locations', [])

    def struct_date(struct_name):
        # Date structs are nested objects whose 'date' key holds the value.
        return status.get(struct_name, {}).get('date', 'Unknown Date')

    if interventions_list:
        # Fall back to 'name' when 'interventionName' is absent, the same
        # fallback fetch_study_details_batch applies.
        interventions = ', '.join(
            item.get('interventionName', item.get('name', 'No intervention name listed'))
            for item in interventions_list
        )
    else:
        interventions = "No interventions listed"

    if locations_list:
        locations = ', '.join(
            f"{place.get('city', 'No City')} - {place.get('country', 'No Country')}"
            for place in locations_list
        )
    else:
        locations = "No locations listed"

    return {
        "NCTId": identification.get('nctId', 'Unknown'),
        "Acronym": identification.get('acronym', 'Unknown'),
        "Overall Status": status.get('overallStatus', 'Unknown'),
        "Start Date": struct_date('startDateStruct'),
        "Conditions": ', '.join(conditions),
        "Interventions": interventions,
        "Locations": locations,
        "Primary Completion Date": struct_date('primaryCompletionDateStruct'),
        "Study First Post Date": struct_date('studyFirstPostDateStruct'),
        "Last Update Post Date": struct_date('lastUpdatePostDateStruct'),
        "Study Type": design.get('studyType', 'Unknown'),
        "Phases": ', '.join(design.get('phases', ['Not Available'])),
    }


def fetch_studies(search_terms, total_study_count, iter_size=987):
    """
    Fetches study data in chunks from ClinicalTrials.gov API and processes it into a DataFrame.

    Parameters:
    search_terms (dict): Dictionary with keys as search areas and values as search terms.
    total_study_count (int or None): Number of studies after which fetching stops.
        None means "no cap": pages are followed until the API stops
        returning a next-page token.
    iter_size (int, optional): Number of studies to fetch at a time. Default is 987.

    Returns:
    pd.DataFrame or None: One row per fetched study with the summary columns
    (always present, even when no study was fetched), or None when
    ``search_terms`` contains an unknown search area. If a request fails,
    the rows collected so far are returned.
    """
    base_url = "https://clinicaltrials.gov/api/v2/studies"

    search_area_params = {
        "condition": "query.cond",
        "title": "query.term",
        "intervention": "query.intr",
        "outcome": "query.outc",
        "sponsor": "query.spons"
    }

    params = {
        "pageSize": str(iter_size),
        "countTotal": "true"
    }

    # Build search parameters based on search_terms
    for area, term in search_terms.items():
        if area not in search_area_params:
            print(f"ERROR: Invalid search area: {area}")
            return None
        params[search_area_params[area]] = term

    data_list = []  # One summary dict per study, across all pages

    while True:
        # Print the current URL (for debugging purposes)
        print("Fetching data from:", base_url + '?' + '&'.join([f"{k}={v}" for k, v in params.items()]))

        try:
            # A timeout keeps a stalled connection from hanging the notebook.
            response = requests.get(base_url, params=params, timeout=60)
            response.raise_for_status()
            data = response.json()
        except requests.RequestException as e:
            print(f"ERROR: An error occurred: {e}")
            break

        data_list.extend(_summarize_study(study) for study in data.get('studies', []))

        # An unknown total (None) must not be compared with an int.
        if total_study_count is not None and len(data_list) >= total_study_count:
            break

        next_page_token = data.get('nextPageToken')
        if not next_page_token:
            break
        params['pageToken'] = next_page_token

    # Explicit columns keep 'NCTId' and the other columns available to callers
    # even when nothing could be fetched.
    return pd.DataFrame(data_list, columns=_SUMMARY_COLUMNS)

# Load Data

In [None]:

# Define 'search_terms' and 'total_study_count' if not already defined
search_terms = {
    "condition": disease.capitalize(),
    "title": disease.capitalize(),
}


# Get the total number of studies for the disease
total_study_count = get_study_count(disease)

# Fetch new data from the API using the correct function
if total_study_count is None:
    # Without a count there is no stopping point to hand to fetch_studies.
    print("WARNING: Study count unavailable; skipping the API fetch for this run.")
    df_new = None
else:
    df_new = fetch_studies(search_terms, total_study_count)

# A failed or empty fetch is replaced by an empty frame that still has the id
# column, so the comparison below finds "nothing new" instead of crashing.
if df_new is None or df_new.empty:
    print("WARNING: No studies were fetched from the API.")
    df_new = pd.DataFrame(columns=['NCTId'])

# Load existing data
csv_file_path = os.path.join('01_data_raw', f'01_{disease.lower()}_done.csv')
df_existing = pd.read_csv(csv_file_path)

# Ensure column names are consistent between df_existing and df_new
print("Columns in df_existing:", df_existing.columns.tolist())
print("Columns in df_new:", df_new.columns.tolist())

# Rename columns in df_new if necessary to match df_existing
if 'NCT ID' in df_new.columns:
    df_new.rename(columns={'NCT ID': 'NCTId'}, inplace=True)

# Compare existing data with new data
existing_nct_ids = set(df_existing['NCTId'])
new_nct_ids = set(df_new['NCTId'])
missing_nct_ids = new_nct_ids - existing_nct_ids

# Get the missing data
df_missing = df_new[df_new['NCTId'].isin(missing_nct_ids)]

# Merge existing data with missing data
# NOTE(review): df_missing carries the summary columns built by fetch_studies
# (e.g. 'Overall Status', 'Start Date'), which differ from the column names of
# the existing CSV, so the appended rows add extra columns - confirm that
# persisting them here is intended.
df_combined = pd.concat([df_existing, df_missing], ignore_index=True)

df_combined.drop_duplicates(subset='NCTId', inplace=True)

# Save the combined data; this is the same path the existing data was read
# from, so the file is overwritten in place.
output_csv_path = os.path.join('01_data_raw', f'01_{disease.lower()}_done.csv')
df_combined.to_csv(output_csv_path, index=False)

print("Data fetching and merging complete. Combined data saved to:", output_csv_path)

In [None]:


# Row counts at each stage of the merge, as a quick sanity check.
for caption, frame in (
    ("Number of records in df_existing", df_existing),
    ("Number of records in df_new", df_new),
    ("Number of missing records to add", df_missing),
    ("Total records after merging", df_combined),
):
    print(f"{caption}: {len(frame)}")

In [None]:
# Preview the top rows of the stored data and of the fresh API pull.
print("First few rows of df_existing:", df_existing.head(), sep="\n")

print("First few rows of df_new:", df_new.head(), sep="\n")

Let's start getting the data



In [None]:
def flatten_dict(d, parent_key='', sep='_', prefixes_to_replace=None):
    """
    Collapse a nested dictionary into a single level.

    Keys of nested dictionaries are joined to their parent key with ``sep``.
    A non-empty list made up only of dictionaries is expanded element by
    element, using each element's position as a key segment; any other list
    is stored unchanged. A non-dictionary ``d`` is returned as
    ``{parent_key: d}``.

    Parameters:
    d: The value to flatten, normally a dict.
    parent_key (str, optional): Key path accumulated so far.
    sep (str, optional): Separator placed between key segments.
    prefixes_to_replace (dict, optional): Maps a key prefix to its
        replacement; applied to each key path as it is built.

    Returns:
    dict: The flattened mapping.
    """
    replacements = {} if prefixes_to_replace is None else prefixes_to_replace

    if not isinstance(d, dict):
        return {parent_key: d}

    flat = {}
    for key, value in d.items():
        path = f"{parent_key}{sep}{key}" if parent_key else key

        # Each matching prefix is swapped in turn, in mapping order.
        for old_prefix, new_prefix in replacements.items():
            if path.startswith(old_prefix):
                path = new_prefix + path[len(old_prefix):]

        if isinstance(value, dict):
            flat.update(flatten_dict(value, path, sep=sep, prefixes_to_replace=replacements))
            continue

        expandable = isinstance(value, list) and value and all(isinstance(element, dict) for element in value)
        if expandable:
            for position, element in enumerate(value):
                flat.update(
                    flatten_dict(element, f"{path}{sep}{position}", sep=sep, prefixes_to_replace=replacements)
                )
        else:
            # Scalars, empty lists and lists with non-dict members are kept as-is.
            flat[path] = value
    return flat


In [None]:
def _join_text(values):
    """Join values with ', ', rendering None as an empty string."""
    return ', '.join('' if value is None else str(value) for value in values)


def _location_countries(locations_data):
    """Return the country of every location entry, one item per location."""
    # A bare string or a single dict is tolerated alongside the usual list.
    if isinstance(locations_data, str):
        return [locations_data]
    if isinstance(locations_data, dict):
        locations_data = [locations_data]
    if not isinstance(locations_data, list):
        return []

    countries = []
    for loc in locations_data:
        if not isinstance(loc, dict):
            continue
        # Prefer the country stored on the location itself (the field
        # fetch_studies reads); fall back to a nested facility dict.
        # NOTE(review): confirm against the API's data-structure reference
        # that 'country' sits on the location and not inside 'facility'.
        country = loc.get('country', '')
        if not country:
            facility = loc.get('facility', {})
            if isinstance(facility, dict):
                country = facility.get('country', '')
        if country:
            countries.append(str(country))
    return countries


def _study_to_record(study):
    """Flatten one raw API study into the column layout of the per-disease CSV."""
    protocol = study.get('protocolSection', {})
    identification = protocol.get('identificationModule', {})
    status = protocol.get('statusModule', {})
    lead_sponsor = protocol.get('sponsorCollaboratorsModule', {}).get('leadSponsor', {})
    description = protocol.get('descriptionModule', {})
    design = protocol.get('designModule', {})
    enrollment = design.get('enrollmentInfo', {})
    interventions = protocol.get('armsInterventionsModule', {}).get('interventions', [])
    outcomes = protocol.get('outcomesModule', {})
    primary_outcomes = outcomes.get('primaryOutcomes', [])
    eligibility = protocol.get('eligibilityModule', {})
    references = protocol.get('referencesModule', {}).get('references', [])
    locations = protocol.get('contactsLocationsModule', {}).get('locations', [])

    # Look for 'derivedSection' next to 'protocolSection' first, then inside
    # it as before.
    # NOTE(review): assumes the API returns 'derivedSection' as a sibling of
    # 'protocolSection' - confirm against the data-structure reference.
    derived = study.get('derivedSection') or protocol.get('derivedSection', {})
    meshes = derived.get('conditionBrowseModule', {}).get('meshes', [])

    # Both spellings of each intervention key are accepted.
    # NOTE(review): 'type', 'name' and 'otherNames' are assumed to be the
    # current API spellings - confirm.
    return {
        'NCTId': identification.get('nctId', ''),
        'LeadSponsorClass': lead_sponsor.get('class', ''),
        'LeadSponsorName': lead_sponsor.get('name', ''),
        'Condition': _join_text(protocol.get('conditionsModule', {}).get('conditions', [])),
        'OfficialTitle': identification.get('officialTitle', ''),
        'BriefTitle': identification.get('briefTitle', ''),
        'Acronym': identification.get('acronym', ''),
        'StudyType': design.get('studyType', ''),
        'InterventionType': _join_text(
            item.get('interventionType', item.get('type', '')) for item in interventions
        ),
        'InterventionName': _join_text(
            item.get('interventionName', item.get('name', '')) for item in interventions
        ),
        'InterventionOtherName': _join_text(
            _join_text(item.get('otherName', item.get('otherNames', []))) for item in interventions
        ),
        'InterventionDescription': _join_text(item.get('description', '') for item in interventions),
        'Phase': _join_text(design.get('phases', [])),
        'StudyFirstSubmitDate': status.get('studyFirstSubmitDate', ''),
        'LastUpdateSubmitDate': status.get('lastUpdateSubmitDate', ''),
        'CompletionDate': status.get('completionDateStruct', {}).get('date', ''),
        'OverallStatus': status.get('overallStatus', ''),
        'BriefSummary': description.get('briefSummary', ''),
        'IsFDARegulatedDevice': protocol.get('oversightModule', {}).get('isFdaRegulatedDevice', ''),
        'StartDate': status.get('startDateStruct', {}).get('date', ''),
        'DetailedDescription': description.get('detailedDescription', ''),
        'ConditionMeshTerm': _join_text(mesh.get('term', '') for mesh in meshes),
        'PrimaryOutcomeDescription': _join_text(o.get('description', '') for o in primary_outcomes),
        'SecondaryOutcomeDescription': _join_text(
            o.get('description', '') for o in outcomes.get('secondaryOutcomes', [])
        ),
        'EnrollmentCount': enrollment.get('count', ''),
        'EnrollmentType': enrollment.get('type', ''),
        'EligibilityCriteria': eligibility.get('eligibilityCriteria', ''),
        'StudyPopulation': eligibility.get('studyPopulation', ''),
        'HealthyVolunteers': eligibility.get('healthyVolunteers', ''),
        'ReferencePMID': _join_text(ref.get('pmid', '') for ref in references if 'pmid' in ref),
        'LocationCountry': ', '.join(_location_countries(locations)),
        'PrimaryOutcomeTimeFrame': _join_text(o.get('timeFrame', '') for o in primary_outcomes),
        # Add other fields as required
    }


def fetch_study_details_batch(nct_ids, batch_size=50):
    """
    Fetch full study records for the given NCT ids from ClinicalTrials.gov, in batches.

    Parameters:
    nct_ids (iterable): NCT identifiers to look up.
    batch_size (int, optional): Number of ids requested per API call. Default is 50.

    Returns:
    pd.DataFrame: One row per study returned by the API. Batches that fail
    (network error, non-200 status, unreadable JSON) or return no studies are
    reported and skipped, so the frame may hold fewer rows than ids requested
    and is empty when nothing could be fetched.
    """
    from urllib.parse import urlencode

    base_url = "https://clinicaltrials.gov/api/v2/studies"
    # Materialise once so that sets and other non-sliceable iterables work too.
    nct_ids = list(nct_ids)
    data_list = []

    for start in range(0, len(nct_ids), batch_size):
        batch_nct_ids = nct_ids[start:start + batch_size]
        # str() guards against non-string ids, which would break the join.
        nct_ids_str = ','.join(str(nct_id) for nct_id in batch_nct_ids)
        query_string = urlencode({"format": "json", "pageSize": len(batch_nct_ids)})
        request_url = f"{base_url}?filter.ids={nct_ids_str}&{query_string}"
        print(f"Fetching data from: {request_url}")

        try:
            # A timeout keeps a stalled connection from hanging the notebook.
            response = requests.get(request_url, timeout=60)
        except requests.RequestException as e:
            # One failed batch must not discard the batches already collected.
            print(f"Failed to fetch data. Request error: {e}")
            continue

        if response.status_code != 200:
            print(f"Failed to fetch data. Status code: {response.status_code}")
            print(f"Response content: {response.text}")
            continue

        try:
            studies = response.json().get('studies', [])
        except ValueError as e:
            print(f"Failed to decode response for NCTIds: {batch_nct_ids}: {e}")
            continue

        if not studies:
            print(f"No studies found for NCTIds: {batch_nct_ids}")
            continue

        data_list.extend(_study_to_record(study) for study in studies)

    # Create DataFrame
    return pd.DataFrame(data_list)


In [None]:
# Compare existing data with new data
existing_nct_ids = set(df_existing['NCTId'])
new_nct_ids = set(df_new['NCTId'])
missing_nct_ids = list(new_nct_ids - existing_nct_ids)

print(f"Number of missing NCTIds to fetch: {len(missing_nct_ids)}")

# Fetch missing data using the updated function
df_missing_details = fetch_study_details_batch(missing_nct_ids, batch_size=25)

# Verify the data fetched
print("First few rows of df_missing_details:")
print(df_missing_details.head())

# Check if df_missing_details is empty
if df_missing_details.empty:
    # Nothing is written in this case; df_combined keeps whatever value it
    # already had.
    print("No data was fetched for the missing NCTIds.")
else:
    # Combine with existing data
    df_combined = pd.concat([df_existing, df_missing_details], ignore_index=True)
    df_combined.drop_duplicates(subset='NCTId', inplace=True)

    # Save the combined data
    output_csv_path_csv = os.path.join('01_data_raw', f'01_{disease.lower()}_done.csv')
    output_excel_path = os.path.join('01_data_raw', f'01_{disease.lower()}_done.xlsx')

    df_combined.to_csv(output_csv_path_csv, index=False)
    df_combined.to_excel(output_excel_path, index=False)

    # Report the path written above, not a variable left over from another cell.
    print("Data fetching and merging complete. Combined data saved to:", output_csv_path_csv)


In [None]:
# Make sure an (initially blank) 'GPT_summary' column exists; when it has to
# be created it is placed at position 3, i.e. as the fourth column.
if 'GPT_summary' not in df_combined:
    df_combined.insert(loc=3, column='GPT_summary', value='')

print(df_combined.head())

In [None]:
# Independent snapshot of the merged data at this point.
results = df_combined.copy(deep=True)

In [None]:
import os
import pandas as pd
import numpy as np

# Normalise the two flag columns of df_combined: blank strings become NaN,
# everything else becomes text, and the text 'nan' produced by that
# conversion is turned back into a real NaN.
for flag_column in ('IsFDARegulatedDevice', 'HealthyVolunteers'):
    without_blanks = df_combined[flag_column].replace('', np.nan)
    df_combined[flag_column] = without_blanks
    df_combined[flag_column] = df_combined[flag_column].astype(str).replace('nan', np.nan)

# Enrollment counts become numbers; anything unparseable turns into NaN.
df_combined['EnrollmentCount'] = pd.to_numeric(df_combined['EnrollmentCount'], errors='coerce')

# Write the cleaned table twice: CSV and Parquet, both under 01_data_raw.
disease_slug = disease.lower()
csv_file_path = os.path.join('01_data_raw', f'01_{disease_slug}_done.csv')
df_combined.to_csv(csv_file_path, index=False)

parquet_file_path = os.path.join('01_data_raw', f'01_{disease_slug}.parquet')
df_combined.to_parquet(parquet_file_path, index=False)

print(f"Data saved to {csv_file_path} and {parquet_file_path}")

In [None]:
import os

import requests

# Optional "pipeline finished" text message sent through Textbelt.
# The API key and the destination phone number are read from the environment
# (TEXTBELT_KEY / TEXTBELT_PHONE) so that no credential or personal number is
# stored in the notebook. NOTE(review): a key that was previously committed
# here should be treated as exposed and rotated.
textbelt_key = os.environ.get('TEXTBELT_KEY')
textbelt_phone = os.environ.get('TEXTBELT_PHONE')

resp = None
if textbelt_key and textbelt_phone:
    try:
        resp = requests.post(
            'https://textbelt.com/text',
            {
                'phone': textbelt_phone,
                'message': 'PD Pipe Line Part 1 Done',
                'key': textbelt_key,
            },
            timeout=30,
        )
        print(resp.json())
    except (requests.RequestException, ValueError) as e:
        # A failed notification must not abort the rest of the notebook.
        print(f"WARNING: SMS notification failed: {e}")
else:
    print("INFO: TEXTBELT_KEY / TEXTBELT_PHONE not set; skipping SMS notification.")

Inflection point - both new and old pulls should be the same


Let's do some data cleaning