In [None]:
import streamlit as st
from snowflake.snowpark.context import get_active_session
# Obtain the Snowpark session via get_active_session() and capture its current context.
session = get_active_session()
# Only the role has surrounding double quotes stripped; database and schema
# are kept exactly as the session returns them.
role = session.get_current_role().strip('"')
db = session.get_current_database()
schema = session.get_current_schema()
print(session)


In [None]:
-- Set the SQL context for the following cells. The {{...}} placeholders share
-- their names with the role/db/schema variables captured in the first Python cell
-- (presumably substituted by the notebook's templating -- confirm).
USE ROLE {{role}};
USE DATABASE {{db}};
USE SCHEMA {{schema}};

In [None]:
-- Landing table for raw API responses: study_id is the key, study_data holds
-- the payload returned by FETCH_ENDPOINT. CREATE OR REPLACE drops any existing rows.
CREATE OR REPLACE TABLE clinical_trial_raw (
    study_id STRING, 
    study_data VARIANT
    );

In [None]:
-- Egress network rule whose only allowed host is clinicaltrials.gov.
CREATE OR REPLACE NETWORK RULE CLINICAL_TRIALS_GOV_PUBLIC_DATA_RULE 
MODE = EGRESS 
TYPE = HOST_PORT 
VALUE_LIST = ('clinicaltrials.gov')

In [None]:
-- External access integration built on the network rule above; the
-- FETCH_ENDPOINT function lists it in external_access_integrations.
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION CLINICAL_TRIALS_GOV_PUBLIC_DATA_INTEGRATION
ALLOWED_NETWORK_RULES = (CLINICAL_TRIALS_GOV_PUBLIC_DATA_RULE)
ENABLED = TRUE;

In [None]:
-- Python UDF that performs an HTTP GET against URL and returns the response
-- body as JSON text. Outbound access is granted through
-- CLINICAL_TRIALS_GOV_PUBLIC_DATA_INTEGRATION.
CREATE OR REPLACE FUNCTION FETCH_ENDPOINT (URL STRING)
returns string
language python
runtime_version=3.9
handler = 'fetch_data'
external_access_integrations = (CLINICAL_TRIALS_GOV_PUBLIC_DATA_INTEGRATION)
packages = ('requests')
as
$$
import json

import requests

# Created once at module level and used by every call of the handler.
session = requests.Session()

def fetch_data(URL):
    # A timeout keeps a stalled endpoint from hanging the query indefinitely.
    response = session.get(URL, timeout=30)
    # Fail loudly on 4xx/5xx instead of storing an error payload as study data.
    response.raise_for_status()
    # The function is declared RETURNS STRING, so hand back JSON text rather
    # than a Python dict; downstream code parses it with PARSE_JSON.
    return json.dumps(response.json())
$$;

In [None]:
-- Session variable with the study endpoint to fetch; the last path segment
-- is later extracted as the study id.
SET url  = 'https://clinicaltrials.gov/api/v2/studies/NCT02953860'

In [None]:
-- Call the UDF once on $url to inspect the raw response before loading it.
SELECT FETCH_ENDPOINT($url);

In [None]:
-- Load one row: study_id is the last '/'-separated segment of $url and
-- study_data is the UDF response cast to VARIANT.
-- Note: this is a plain INSERT, so re-running the cell appends another row.
INSERT INTO clinical_trial_raw 
SELECT split_part($url, '/', -1)::STRING as study_id, 
fetch_endpoint($url)::VARIANT as study_data;


In [None]:
-- Show what has been loaded into the raw table.
select study_id, study_data from clinical_trial_raw;

In [None]:
from snowflake.snowpark.functions import col, parse_json, flatten, current_timestamp, sql_expr, lit, split, \
concat, concat_ws, row_number, dense_rank, listagg
from snowflake.snowpark.window import Window

In [None]:
# Snowpark DataFrame over the raw table; passed to extract_json_elements further down.
df = session.table("clinical_trial_raw")

In [None]:
def extract_json_elements(df, specs, add_metadata=False, mode="overwrite", table_type="transient"):
    """
    Extract selected JSON fields from `study_data` and save one table per spec.

    For every spec the function walks `json_path` down from the parsed
    `study_data` document, builds one output column per mapped key, writes
    the result with `save_as_table`, and then calls `show()` on it.

    Parameters:
    - df: Snowpark DataFrame with `study_id` and `study_data`
    - specs: List of dicts:
        - json_path: List[str or list] - Path to the node in the JSON. If the
          last element is itself a list, its entries are sibling keys that are
          all read from the node reached by the preceding path ("branch" mode)
          and each one is cast to ARRAY.
        - output_columns: Dict[str, str] - Mapping of JSON keys to output
          column names. Outside branch mode the special key "VALUE" selects
          the node itself; all values are cast to STRING.
        - table_name: str - Target table
    - add_metadata: If True, adds `ingestion_time` (current timestamp)
    - mode: Write mode forwarded to `save_as_table`
    - table_type: Table type forwarded to `save_as_table`

    Branch keys that have no entry in `output_columns` are skipped; a warning
    naming them is printed so that a misspelled mapping key does not go
    unnoticed.
    """
    for spec in specs:
        json_path = spec["json_path"]
        output_columns = spec["output_columns"]
        table_name = spec["table_name"]

        # A trailing list means "read several sibling keys from one node".
        if isinstance(json_path[-1], list):
            common_path, branch_keys = json_path[:-1], json_path[-1]
        else:
            common_path, branch_keys = json_path, None

        # Walk down to the node that holds the requested keys.
        node = parse_json(df["study_data"])
        for key in common_path:
            node = node[key]

        select_exprs = [df["study_id"].alias("nct_id")]

        if branch_keys:
            unmapped = [key for key in branch_keys if key not in output_columns]
            if unmapped:
                print(
                    f"WARNING: {table_name}: JSON key(s) {unmapped} have no entry "
                    "in output_columns and were skipped."
                )
            select_exprs.extend(
                node[key].cast("ARRAY").alias(output_columns[key])
                for key in branch_keys
                if key in output_columns
            )
        else:
            for json_key, output_name in output_columns.items():
                # "VALUE" refers to the node itself rather than one of its children.
                source = node if json_key == "VALUE" else node[json_key]
                select_exprs.append(source.cast("STRING").alias(output_name))

        if add_metadata:
            select_exprs.append(current_timestamp().alias("ingestion_time"))

        final_df = df.select(*select_exprs)

        # Persist, then preview the result.
        final_df.write.save_as_table(table_name, mode=mode, table_type=table_type)
        final_df.show()


In [None]:
# (Re)create the stage that receives the JSON path-mapping file uploaded below.
# CREATE OR REPLACE discards whatever the stage held before.
session.sql("""
    CREATE OR REPLACE STAGE clinical_trials_stage
    DIRECTORY = (ENABLE=TRUE)
    FILE_FORMAT = (TYPE = 'JSON')
""").collect()


In [None]:
# Extraction specs for the clinical trial JSON, one dict per target table:
#   - "json_path": path to a module; the trailing list names the sibling keys to pull from it
#   - "output_columns": JSON key -> output column name (every listed key needs an entry here)
#   - "table_name": intermediate table the extracted columns are written to

clinical_trial_json_path_mapping = [
        {
            "json_path": ["protocolSection", "identificationModule", ["organization", "briefTitle", "officialTitle"]],
            "output_columns": {"organization": "organization", "briefTitle": "brief_title", "officialTitle": "official_title"},
            "table_name": "clinical_trial_identification_int"
        },
        {
            "json_path": ["protocolSection", "statusModule", ["statusVerifiedDate", "overallStatus",
                                                              "startDateStruct", "primaryCompletionDateStruct",
                                                              "completionDateStruct", "studyFirstSubmitDate",
                                                              "studyFirstPostDate", "resultsFirstSubmitDate",
                                                              "resultsFirstPostDateStruct", "lastUpdateSubmitDate",
                                                              "lastUpdatePostDateStruct"
                                                             ]
                         ],
            "output_columns": {"statusVerifiedDate": "status_verified_date", "overallStatus": "overall_status",
                               "startDateStruct": "start_date", "primaryCompletionDateStruct": "primary_completion_date",
                               "completionDateStruct": "completion_date", "studyFirstSubmitDate": "study_first_submit_date",
                               "studyFirstPostDate": "study_first_post_date", "resultsFirstSubmitDate": "results_first_submit_date",
                               # Key must match the json_path entry exactly, otherwise the column is skipped.
                               "resultsFirstPostDateStruct": "results_first_post_date", "lastUpdateSubmitDate": "last_update_submit_date",
                               "lastUpdatePostDateStruct": "last_update_post_date_struct"},
            "table_name": "clinical_trial_status_int"
        },
        {
            "json_path": ["protocolSection", "sponsorCollaboratorsModule", ["leadSponsor", "responsibleParty", "collaborators"]],
            "output_columns": {"leadSponsor": "lead_sponsor", "responsibleParty": "responsible_party", "collaborators": "collaborators"},
            "table_name": "clinical_trial_sponsor_collaborator_int"
        },
        {
            "json_path": ["protocolSection", "descriptionModule", ["briefSummary", "detailedDescription"]],
            "output_columns": {"briefSummary": "brief_summary", "detailedDescription": "detailed_description"},
            "table_name": "clinical_trial_description_int"
        },
        {
            "json_path": ["protocolSection", "conditionsModule", ["conditions", "keywords"]],
            "output_columns": {"conditions": "conditions", "keywords": "keywords"},
            "table_name": "clinical_trial_conditions_int"
        },
        {
            "json_path": ["protocolSection", "designModule", ["studyType", "phases", "designInfo", "enrollmentInfo"]],
            "output_columns": {"studyType": "study_type", "phases": "phases",
                               "designInfo": "design_info", "enrollmentInfo": "enrollment_info"},
            "table_name": "clinical_trial_design_int"
        },
        {
            "json_path": ["protocolSection", "armsInterventionsModule", ["armGroups", "interventions"]],
            "output_columns": {"armGroups": "arm_groups", "interventions": "interventions"},
            "table_name": "clinical_trial_arms_interventions_int"
        },
        {
            "json_path": ["protocolSection", "outcomesModule", ["primaryOutcomes", "secondaryOutcomes"]],
            "output_columns": {"primaryOutcomes": "primary_outcomes", "secondaryOutcomes": "secondary_outcomes"},
            "table_name": "clinical_trial_outcomes_int"
        },
        {
            "json_path": ["protocolSection", "eligibilityModule", ["eligibilityCriteria", "healthyVolunteers",
                                                                   "sex", "minimumAge", "maximumAge", "stdAges"]],
            "output_columns": {"eligibilityCriteria": "eligibility_criteria", "healthyVolunteers": "healthy_volunteers",
                               "sex": "sex", "minimumAge": "min_age", "maximumAge": "max_age", "stdAges": "std_ages"},
            "table_name": "clinical_trial_eligibility_int"
        },
        {
            "json_path": ["protocolSection", "contactsLocationsModule", ["overallOfficials", "locations"]],
            "output_columns": {"overallOfficials": "officials", "locations": "locations"},
            "table_name": "clinical_trial_contacts_locations_int"
        },
        {
            "json_path": ["resultsSection", "participantFlowModule", ["groups"]],
            "output_columns": {"groups": "participant_groups"},
            "table_name": "clinical_trial_results_participants_int"
        },
        {
            "json_path": ["resultsSection", "baselineCharacteristicsModule", ["populationDescription", "groups", "denoms", "measures"]],
            "output_columns": {"populationDescription": "population_description", "groups": "groups",
                               "denoms": "denoms", "measures": "measures"},
            "table_name": "clinical_trial_results_baseline_characteristics_int"
        },
        {
            "json_path": ["resultsSection", "outcomeMeasuresModule", ["outcomeMeasures"]],
            "output_columns": {"outcomeMeasures": "outcome_measures"},
            "table_name": "clinical_trial_results_outcome_measures_int"
        },
        {
            "json_path": ["resultsSection", "adverseEventsModule", ["frequencyThreshold", "timeFrame", "description",
                                                                   "eventGroups", "seriousEvents", "otherEvents"]],
            "output_columns": {"frequencyThreshold": "frequency_threshold", "timeFrame": "time_frame", "description": "description",
                               "eventGroups": "event_groups", "seriousEvents": "serious_events", "otherEvents": "other_events"},
            "table_name": "clinical_trial_results_adverse_events_int"
        },
        {
            "json_path": ["resultsSection", "moreInfoModule", ["limitationsAndCaveats", "certainAgreement"]],
            "output_columns": {"limitationsAndCaveats": "limitations_caveats", "certainAgreement": "certain_agreement"},
            "table_name": "clinical_trial_results_limitations_caveats_int"
        },
        {
            "json_path": ["documentSection", "largeDocumentModule", ["largeDocs"]],
            "output_columns": {"largeDocs": "clinical_documentation_SAP_ICF_Large_PDFs"},
            "table_name": "clinical_trial_documentation_int"
        },
    ]

In [None]:
import json

# Write the mapping as real JSON. str() would emit a Python repr (single
# quotes), which is not valid JSON even though the file is named *.json.
# The with-block closes the file, so no explicit close() is needed.
with open("clinical_trial_json_path_mapping.json", "w") as f:
    json.dump(clinical_trial_json_path_mapping, f)

In [None]:
# Upload the mapping file uncompressed so it can be read back by name.
# overwrite=True ensures an edited mapping replaces a copy already on the
# stage when this cell is re-run without recreating the stage.
session.file.put(
    "clinical_trial_json_path_mapping.json",
    "@clinical_trials_stage",
    auto_compress=False,
    overwrite=True,
)

In [None]:
-- List the stage contents to confirm the mapping file was uploaded.
ls @clinical_trials_stage

In [None]:
import json 

def load_json_path_mapping_from_stage(session, stage_path="@clinical_trials_stage/clinical_trial_json_path_mapping.json"):
    """
    Read the JSON path mapping back from a stage and return it as Python objects.

    Parameters:
    - session: object exposing `read.json(path)` (the Snowpark session)
    - stage_path: stage location of the mapping file

    Returns the result of `json.loads` applied to the first column of the
    first row read from `stage_path`.
    """
    rows = session.read.json(stage_path).collect()
    first_row = rows[0]
    return json.loads(first_row[0])


In [None]:
# Load the specs from the staged mapping file and write one transient
# intermediate table per spec (overwriting any existing table of that name).
specs = load_json_path_mapping_from_stage(session)
extract_json_elements(df, specs, add_metadata=False, mode="overwrite", table_type="transient")

In [None]:
def flatten_clinical_trial_core(session):
    """
    Build and save the `clinical_trial_core_normalized` table.

    Reads the identification, description, conditions, sponsor/collaborator,
    contacts/locations and status intermediate tables, flattens the array
    columns each of them contributes, inner-joins everything on `nct_id`,
    overwrites the target table and shows the result.

    Each `flatten(...)` step exposes the flattened element as `value`, which
    the select that follows immediately turns into a named, typed column.
    """
    # Titles
    identification = (
        session.table("clinical_trial_identification_int")
        .select("nct_id", flatten("brief_title"), "official_title")
        .select("nct_id", col("value").cast("STRING").alias("brief_title"), "official_title")
        .select("nct_id", "brief_title", flatten("official_title"))
        .select("nct_id", "brief_title", col("value").cast("STRING").alias("official_title"))
    )

    # Summary and detailed description
    description = (
        session.table("clinical_trial_description_int")
        .select("nct_id", flatten("brief_summary"), "detailed_description")
        .select("nct_id", col("value").cast("STRING").alias("brief_summary"), "detailed_description")
        .select("nct_id", "brief_summary", flatten("detailed_description"))
        .select("nct_id", "brief_summary", col("value").cast("STRING").alias("detailed_description"))
    )

    # Conditions: one row per condition, keywords collapsed to a comma-separated list
    conditions_source = session.table("clinical_trial_conditions_int")
    condition_rows = (
        conditions_source
        .select("nct_id", flatten("conditions"))
        .select("nct_id", col("value").cast("STRING").alias("conditions"))
    )
    keyword_rows = (
        conditions_source
        .select("nct_id", flatten("keywords"))
        .select("nct_id", col("value").cast("STRING").alias("conditions_specifics"))
    )
    keyword_list = (
        keyword_rows
        .group_by("nct_id")
        .agg(listagg(col("conditions_specifics"), ", ").alias("conditions_specifics"))
    )
    conditions = condition_rows.join(keyword_list, on="nct_id", how="inner")

    # Lead sponsor and collaborator names
    sponsors = (
        session.table("clinical_trial_sponsor_collaborator_int")
        .select("nct_id", flatten("lead_sponsor"), "collaborators")
        .select("nct_id", col("value")["name"].cast("STRING").alias("lead_sponsor"), "collaborators")
        .select("nct_id", "lead_sponsor", flatten("collaborators"))
        .select("nct_id", "lead_sponsor", col("value")["name"].cast("STRING").alias("collaborators"))
    )

    # Officials, exposed as principal_investigator
    officials = (
        session.table("clinical_trial_contacts_locations_int")
        .select("nct_id", flatten("officials"))
        .select("nct_id", col("value")["name"].cast("STRING").alias("principal_investigator"))
    )

    # Overall status plus start/completion dates
    status = (
        session.table("clinical_trial_status_int")
        .select("nct_id", flatten("overall_status"), "start_date", "completion_date")
        .select("nct_id", col("value").cast("STRING").alias("overall_status"), "start_date", "completion_date")
        .select("nct_id", "overall_status", flatten("start_date"), "completion_date")
        .select("nct_id", "overall_status", col("value")["date"].cast("DATE").alias("start_date"), "completion_date")
        .select("nct_id", "overall_status", "start_date", flatten("completion_date"))
        .select("nct_id", "overall_status", "start_date", col("value")["date"].cast("DATE").alias("completion_date"))
    )

    # Combine all parts on the study id.
    core = identification
    for part in (description, conditions, sponsors, officials, status):
        core = core.join(part, "nct_id", "inner")

    core.write.save_as_table(
        "clinical_trial_core_normalized", mode="overwrite", table_type=""
    )

    core.show()

    print("✅ Clinical Trial Core flattened and saved.")

In [None]:
def flatten_locations(session):
    """
    Build and save the `clinical_trials_locations_normalized` table.

    Flattens the `locations` array of `clinical_trial_contacts_locations_int`
    into one row per location, adds a `geo_point` column computed with
    TO_GEOGRAPHY from a "POINT( lon lat)" text, numbers the locations of each
    study by facility name (`location_id`), overwrites the target table and
    shows the result.
    """
    source = session.table("clinical_trial_contacts_locations_int")
    per_location = source.select("nct_id", flatten("locations"))

    # WKT text assembled from the geoPoint longitude/latitude; only used to derive geo_point.
    point_wkt = concat_ws(
        lit(" "),
        lit("POINT("),
        col("value")["geoPoint"]["lon"].cast("STRING"),
        concat(col("value")["geoPoint"]["lat"].cast("STRING"), lit(")")),
    ).alias("point_wkt")

    located = per_location.select(
        "nct_id",
        col("value")["facility"].cast("STRING").alias("facility"),
        col("value")["geoPoint"]["lon"].cast("FLOAT").alias("longitude"),
        col("value")["geoPoint"]["lat"].cast("FLOAT").alias("latitude"),
        point_wkt,
        col("value")["city"].cast("STRING").alias("city"),
        col("value")["state"].cast("STRING").alias("state"),
        col("value")["zip"].cast("STRING").alias("zip"),
        col("value")["country"].cast("STRING").alias("country"),
    )

    with_geography = located.with_column("geo_point", sql_expr("TO_GEOGRAPHY(point_wkt)"))

    # Per-study sequence number, ordered by facility name.
    by_study = Window.partition_by("nct_id").order_by("facility")
    numbered = with_geography.with_column("location_id", row_number().over(by_study))

    # The WKT helper column is not part of the saved table.
    result = numbered.drop("point_wkt")

    # table_type="" is passed through to save_as_table unchanged.
    result.write.save_as_table("clinical_trials_locations_normalized", mode="overwrite", table_type="")
    result.show()

    print("✅ Locations flattened and saved.")

In [None]:
def flatten_clinical_trial_design(session):
    """
    Build and save the `clinical_trial_design_normalized` table.

    Combines three intermediate tables on `nct_id` (inner joins):
    - design: study type, phases, enrollment count and design info fields
    - interventions: one row per intervention plus a comma-separated list of
      all "other names" of the study's interventions
    - eligibility: criteria, healthy-volunteers flag, sex, and the first
      space-separated token of the minimum and maximum age

    The result overwrites the target table and is shown afterwards.
    """
    # Design module
    design_core = (
        session.table("clinical_trial_design_int")
        .select("nct_id", flatten("study_type"), "phases", "enrollment_info", "design_info")
        .select("nct_id", col("value").cast("STRING").alias("study_type"), "phases", "enrollment_info", "design_info")
        .select("nct_id", "study_type", flatten("phases"), "enrollment_info", "design_info")
        .select("nct_id", "study_type", col("value").cast("STRING").alias("phases"), "enrollment_info", "design_info")
        .select("nct_id", "study_type", "phases", flatten("enrollment_info"), "design_info")
        .select("nct_id", "study_type", "phases",
                col("value")["count"].cast("NUMERIC").alias("enrollment_info"), "design_info")
        .select("nct_id", "study_type", "phases", "enrollment_info", flatten("design_info"))
        .select(
            "nct_id", "study_type", "phases", "enrollment_info",
            col("value")["interventionModel"].cast("STRING").alias("intervention_model"),
            col("value")["maskingInfo"]["masking"].cast("STRING").alias("masking_info"),
            col("value")["primaryPurpose"].cast("STRING").alias("primary_purpose"),
        )
    )

    # Interventions: scalar attributes per intervention
    interventions_source = session.table("clinical_trial_arms_interventions_int")
    intervention_rows = (
        interventions_source
        .select("nct_id", flatten("interventions"))
        .select(
            "nct_id",
            col("value")["type"].cast("STRING").alias("intervention_type"),
            col("value")["description"].cast("STRING").alias("intervention_description"),
            col("value")["name"].cast("STRING").alias("intervention_name"),
        )
    )

    # Interventions: every entry of every otherNames array, one per row
    other_name_rows = (
        interventions_source
        .select("nct_id", flatten("interventions"))
        .select("nct_id", col("value")["otherNames"].alias("intervention_other_names"))
        .select("nct_id", flatten("intervention_other_names"))
        .select("nct_id", col("value").cast("STRING").alias("intervention_other_names"))
    )
    other_names_per_study = (
        other_name_rows
        .group_by("nct_id")
        .agg(listagg(col("intervention_other_names"), ", ").alias("intervention_other_names"))
    )
    interventions = intervention_rows.join(other_names_per_study, on="nct_id", how="inner")

    # Eligibility module
    eligibility = (
        session.table("clinical_trial_eligibility_int")
        .select("nct_id", flatten("eligibility_criteria"), "healthy_volunteers", "sex", "min_age", "max_age")
        .select("nct_id", col("value").cast("STRING").alias("eligibility_criteria"),
                "healthy_volunteers", "sex", "min_age", "max_age")
        .select("nct_id", "eligibility_criteria", flatten("healthy_volunteers"), "sex", "min_age", "max_age")
        .select("nct_id", "eligibility_criteria", col("value").cast("BOOLEAN").alias("healthy_volunteers"),
                "sex", "min_age", "max_age")
        .select("nct_id", "eligibility_criteria", "healthy_volunteers", flatten("sex"), "min_age", "max_age")
        .select("nct_id", "eligibility_criteria", "healthy_volunteers",
                col("value").cast("STRING").alias("sex"), "min_age", "max_age")
        .select("nct_id", "eligibility_criteria", "healthy_volunteers", "sex", flatten("min_age"), "max_age")
        # Keep only the first space-separated token of the age text.
        .select("nct_id", "eligibility_criteria", "healthy_volunteers", "sex",
                split(col("value"), lit(" "))[0].cast("STRING").alias("min_age"), "max_age")
        .select("nct_id", "eligibility_criteria", "healthy_volunteers", "sex", "min_age", flatten("max_age"))
        .select("nct_id", "eligibility_criteria", "healthy_volunteers", "sex", "min_age",
                split(col("value"), lit(" "))[0].cast("STRING").alias("max_age"))
    )

    design = design_core.join(interventions, "nct_id", "inner").join(eligibility, "nct_id", "inner")

    design.write.save_as_table("clinical_trial_design_normalized", mode="overwrite", table_type="")
    design.show()

    print("✅ Clinical trial design flattened and saved.")


In [None]:
def flatten_design_outcomes(session):
    """
    Build and save the `clinical_trials_design_outcomes_normalized` table.

    Flattens the `primary_outcomes` and `secondary_outcomes` arrays of
    `clinical_trial_outcomes_int` into one row per outcome, tagged with
    `outcome_type` ("primary" / "secondary"), unions both sets, assigns a
    running `outcome_id` ordered by `nct_id`, joins back to the study ids,
    overwrites the target table and shows the result.
    """
    outcomes_df = session.table("clinical_trial_outcomes_int").select(
        "nct_id", "primary_outcomes", "secondary_outcomes"
    )

    def _typed_outcomes(array_column, label):
        # One row per element of `array_column`, labelled with `label`.
        return (
            outcomes_df
            .select("nct_id", flatten(array_column))
            .select(
                "nct_id",
                lit(label).alias("outcome_type"),
                col("value")["measure"].cast("STRING").alias("measure"),
                col("value")["description"].cast("STRING").alias("description"),
                col("value")["timeFrame"].cast("STRING").alias("time_frame"),
            )
        )

    # Primary rows first, then secondary rows.
    combined = _typed_outcomes("primary_outcomes", "primary").union_all(
        _typed_outcomes("secondary_outcomes", "secondary")
    )

    # Running id across all outcomes, ordered by study id.
    numbered = combined.with_column("outcome_id", row_number().over(Window.order_by("nct_id")))

    # Study ids only (array columns removed), joined back onto the outcome rows.
    study_keys = outcomes_df.drop("primary_outcomes", "secondary_outcomes")
    result = numbered.join(study_keys, on="nct_id", how="inner")

    # table_type="" is passed through to save_as_table unchanged.
    result.write.save_as_table("clinical_trials_design_outcomes_normalized", mode="overwrite", table_type="")
    result.show()

    print("✅ Design outcomes flattened and saved.")

In [None]:
def flatten_baseline_measures(session):
    """
    Build and save the `clinical_trials_baseline_measures_normalized` table.

    Unnests `measures` -> `classes` -> `categories` -> `measurements` from
    `clinical_trial_results_baseline_characteristics_int`, producing one row
    per measurement with the titles of its enclosing measure, class and
    category. `measure_id` is a dense rank over `param_title` within each
    study. The result overwrites the target table and is shown afterwards.
    """
    # Columns carried along at each nesting level.
    measure_cols = ["nct_id", "param_title", "param_type", "param_unit_of_measure"]
    class_cols = measure_cols + ["class_title"]
    category_cols = class_cols + ["category_title"]

    measurements = (
        session.table("clinical_trial_results_baseline_characteristics_int")
        # Level 1: measures
        .select("nct_id", flatten("measures"))
        .select(
            "nct_id",
            col("value")["classes"].alias("classes"),
            col("value")["title"].cast("STRING").alias("param_title"),
            col("value")["paramType"].cast("STRING").alias("param_type"),
            col("value")["unitOfMeasure"].cast("STRING").alias("param_unit_of_measure"),
        )
        # Level 2: classes
        .select(*measure_cols, flatten("classes"))
        .select(
            *measure_cols,
            col("value")["title"].cast("STRING").alias("class_title"),
            col("value")["categories"].alias("categories"),
        )
        # Level 3: categories
        .select(*class_cols, flatten("categories"))
        .select(
            *class_cols,
            col("value")["title"].cast("STRING").alias("category_title"),
            col("value")["measurements"].alias("measurements"),
        )
        # Level 4: measurements
        .select(*category_cols, flatten("measurements"))
        .select(
            *category_cols,
            col("value")["groupId"].cast("STRING").alias("group_id"),
            col("value")["value"].cast("NUMERIC").alias("value"),
            col("value")["lowerLimit"].cast("NUMERIC").alias("lower_limit"),
            col("value")["upperLimit"].cast("NUMERIC").alias("upper_limit"),
        )
    )

    # Rows sharing a param_title within a study share a measure_id.
    # (The original author noted a Snowflake SEQUENCE as an alternative id source.)
    per_study_by_title = Window.partition_by("nct_id").order_by("param_title")
    result = measurements.with_column("measure_id", dense_rank().over(per_study_by_title))

    # table_type="" is passed through to save_as_table unchanged.
    result.write.save_as_table("clinical_trials_baseline_measures_normalized", mode="overwrite", table_type="")
    result.show()

    print("✅ Baseline measures flattened and saved.")


In [None]:
def flatten_outcome_measures(session):
    """
    Build and save the `clinical_trials_outcome_measures_normalized` table.

    Starting from `clinical_trial_results_outcome_measures_int`, flattens
    `outcome_measures`, then the denominator counts (`denoms` -> `counts`),
    then the class hierarchy down to individual measurements. The output has
    one row per combination of denominator count and measurement of an
    outcome, numbered by `measure_id` (row number ordered by study, outcome
    type and outcome title). The result overwrites the target table and is
    shown afterwards.
    """
    # Columns carried along at each stage of the unnesting.
    outcome_cols = [
        "nct_id", "outcome_type", "outcome_title", "outcome_description",
        "reporting_status", "time_frame",
        "population_description", "param_type", "param_unit_of_measure",
    ]
    denom_cols = outcome_cols + ["denoms_group_id", "denoms_value"]
    class_cols = denom_cols + ["class_title"]

    flattened = (
        session.table("clinical_trial_results_outcome_measures_int")
        # One row per outcome measure, with its scalar attributes.
        .select("nct_id", flatten("outcome_measures"))
        .select(
            "nct_id",
            col("value")["type"].cast("STRING").alias("outcome_type"),
            col("value")["title"].cast("STRING").alias("outcome_title"),
            col("value")["description"].cast("STRING").alias("outcome_description"),
            col("value")["reportingStatus"].cast("STRING").alias("reporting_status"),
            col("value")["timeFrame"].cast("STRING").alias("time_frame"),
            col("value")["populationDescription"].cast("STRING").alias("population_description"),
            col("value")["paramType"].cast("STRING").alias("param_type"),
            col("value")["unitOfMeasure"].cast("STRING").alias("param_unit_of_measure"),
            col("value")["denoms"].alias("denoms"),
            col("value")["classes"].alias("classes"),
        )
        # Denominators: denoms -> counts -> (groupId, value).
        .select(*outcome_cols, flatten("denoms"), "classes")
        .select(*outcome_cols, col("value")["counts"].alias("denoms_counts"), "classes")
        .select(*outcome_cols, flatten("denoms_counts"), "classes")
        .select(
            *outcome_cols,
            col("value")["groupId"].cast("STRING").alias("denoms_group_id"),
            col("value")["value"].cast("NUMERIC").alias("denoms_value"),
            "classes",
        )
        # Classes: the whole class element is kept under the name "categories"
        # and flattened again in the next step.
        # NOTE(review): this flattens the class element itself rather than its
        # "categories" field -- confirm this is intended.
        .select(*denom_cols, flatten("classes"))
        .select(
            *denom_cols,
            col("value")["title"].cast("STRING").alias("class_title"),
            col("value").alias("categories"),
        )
        .select(*class_cols, flatten("categories"))
        .select(*class_cols, col("value").alias("measurements"))
        .select(*class_cols, flatten("measurements"))
        .select(*class_cols, col("value")["measurements"].alias("measurement"))
        # Individual measurements.
        .select(*class_cols, flatten("measurement"))
        .select(
            *class_cols,
            col("value")["groupId"].cast("STRING").alias("measurement_group_id"),
            col("value")["value"].cast("NUMERIC").alias("measurement_value"),
        )
    )

    # Running id over all rows.
    ordering = Window.order_by("nct_id", "outcome_type", "outcome_title")
    result = flattened.with_column("measure_id", row_number().over(ordering))

    # table_type="" is passed through to save_as_table unchanged.
    result.write.save_as_table("clinical_trials_outcome_measures_normalized", mode="overwrite", table_type="")
    result.show()

    print("✅ Outcome measures flattened and saved.")


In [None]:
def flatten_adverse_events(session):
    """
    Normalizes adverse events into one row per (event, event-group statistic).

    Reads ``clinical_trial_results_adverse_events_int``, explodes the
    group-level arrays and the serious/other event arrays, attaches the
    event-group attributes to every event statistic, and overwrites the
    ``clinical_trials_adverse_events_normalized`` table.

    The event arrays are exploded straight from the source table. Exploding
    them from the per-group frame (which carries the same arrays on every
    event-group row) would repeat each event once per event group.

    Parameters:
        session: Snowpark session used to read the source table and write
            the normalized table.

    Returns:
        None. The result is saved to Snowflake and a preview is shown.
    """
    source_df = session.table("clinical_trial_results_adverse_events_int")

    # Group-level attributes: one row per event group.
    # NOTE(review): the exploded "description" values are stored under the
    # name "time_frame" (the source "time_frame" column is not used); this
    # mapping is kept unchanged -- confirm it is intended.
    event_groups_df = (
        source_df
        .select("nct_id", flatten("description"), "frequency_threshold", "event_groups")
        .select(
            "nct_id",
            col("value").cast("STRING").alias("time_frame"),
            "frequency_threshold",
            "event_groups",
        )
        .select("nct_id", "time_frame", flatten("frequency_threshold"), "event_groups")
        .select(
            "nct_id",
            "time_frame",
            col("value").cast("NUMERIC").alias("frequency_threshold"),
            "event_groups",
        )
        .select("nct_id", "time_frame", "frequency_threshold", flatten("event_groups"))
        .select(
            "nct_id",
            "time_frame",
            "frequency_threshold",
            col("value")["id"].cast("STRING").alias("adverse_event_group_id"),
            col("value")["description"].cast("STRING").alias("adverse_event_description"),
            col("value")["deathsNumAtRisk"].cast("NUMERIC").alias("overall_participants_at_risk"),
            col("value")["seriousNumAffected"].cast("NUMERIC").alias("serious_adverse_events_affected_participants"),
            col("value")["otherNumAffected"].cast("NUMERIC").alias("other_adverse_events_affected_participants"),
            col("value")["deathsNumAffected"].cast("NUMERIC").alias("death_affected_participants"),
        )
    )

    def _explode_events(events_column, event_type):
        """Explode one events array into one row per (event, group statistic)."""
        event_attrs = [
            "nct_id", "adverse_event_type", "assessment_type", "organ_system",
            "term", "source_vocabulary", "notes",
        ]
        return (
            source_df
            .select("nct_id", flatten(events_column))
            .select(
                "nct_id",
                lit(event_type).alias("adverse_event_type"),
                col("value")["assessmentType"].cast("STRING").alias("assessment_type"),
                col("value")["organSystem"].cast("STRING").alias("organ_system"),
                col("value")["term"].cast("STRING").alias("term"),
                col("value")["sourceVocabulary"].cast("STRING").alias("source_vocabulary"),
                col("value")["notes"].cast("STRING").alias("notes"),
                col("value")["stats"].alias("stats"),
            )
            .select(*event_attrs, flatten("stats"))
            .select(
                *event_attrs,
                col("value")["groupId"].cast("STRING").alias("adverse_event_group_id"),
                # NOTE(review): kept as STRING although the group-level counts
                # are NUMERIC -- confirm before changing the column type.
                col("value")["numAffected"].cast("STRING").alias("affected_participants"),
            )
        )

    # Serious and other events share the same shape; only the label differs.
    serious_events_df = _explode_events("serious_events", "serious")
    other_events_df = _explode_events("other_events", "other")
    all_events_df = serious_events_df.union_all(other_events_df)

    # generate sequence id for adverse events
    window_spec = Window.order_by("nct_id", "term")
    all_events_df = all_events_df.with_column("adverse_event_id", row_number().over(window_spec))

    # Left join keeps every event statistic even when its group id has no
    # matching event-group row.
    adverse_events_final_df = all_events_df.join(
        event_groups_df,
        on=["nct_id", "adverse_event_group_id"],
        how="left"
    )

    # Save as permanent table
    adverse_events_final_df.write.save_as_table("clinical_trials_adverse_events_normalized", mode="overwrite", table_type="")
    adverse_events_final_df.show()

    print("✅ Adverse events flattened and saved.")


In [None]:
def flatten_limitations_caveats(session):
    """
    Normalizes the limitations-and-caveats array into one row per entry.

    Reads ``clinical_trial_results_limitations_caveats_int``, explodes the
    ``limitations_caveats`` column, keeps each entry's ``description`` as a
    string, and overwrites ``clinical_trials_limitations_normalized``.

    Parameters:
        session: Snowpark session used to read the source table and write
            the normalized table.

    Returns:
        None. The result is saved to Snowflake and a preview is shown.
    """
    df = session.table("clinical_trial_results_limitations_caveats_int")

    limitations_df = (
        df.select("nct_id", flatten("limitations_caveats"))
          .select(
              "nct_id",
              col("value")["description"].cast("STRING").alias("description"),
          )
    )

    # Save as permanent table
    limitations_df.write.save_as_table("clinical_trials_limitations_normalized", mode="overwrite", table_type="")
    limitations_df.show()

    # Report completion the same way the other flatten_* steps do.
    print("✅ Limitations and caveats flattened and saved.")

In [None]:
def flatten_documents(session):
    """
    Normalizes the large-document array into one row per document.

    Reads ``clinical_trial_documentation_int``, explodes the
    ``clinical_documentation_SAP_ICF_Large_PDFs`` column, extracts the
    document attributes, builds a download URL for each file, and overwrites
    ``clinical_trials_documents_normalized``.

    Parameters:
        session: Snowpark session used to read the source table and write
            the normalized table.

    Returns:
        None. The result is saved to Snowflake and a preview is shown.
    """
    df = session.table("clinical_trial_documentation_int")

    # Build the download folder from each row's own NCT id instead of pinning
    # it to a single study, so documents of other studies do not get links
    # into NCT02953860's folder.
    # NOTE(review): assumes the CDN folder is the last two characters of the
    # NCT id, which reproduces the previous ".../large-docs/60/NCT02953860/"
    # prefix -- confirm for other studies.
    base_url = lit('https://cdn.clinicaltrials.gov/large-docs/')
    folder = sql_expr("RIGHT(nct_id::STRING, 2)")
    study_id = col("nct_id").cast("STRING")

    documentation_df = df.select("nct_id", flatten("clinical_documentation_SAP_ICF_Large_PDFs")) \
            .select("nct_id", 
                    col("value")["filename"].cast("STRING").alias("filename"),              
                    col("value")["size"].cast("NUMERIC").alias("size"),
                    col("value")["typeAbbrev"].cast("STRING").alias("type_abbreviation"),
                    col("value")["label"].cast("STRING").alias("label"),
                    col("value")["date"].cast("DATE").alias("created_date"),
                    col("value")["uploadDate"].cast("TIMESTAMP").alias("upload_date")
                   ) \
            .select("nct_id", "filename",
                    concat(base_url, folder, lit('/'), study_id, lit('/'), col("filename")).alias("url"),
                    "size", "type_abbreviation", "label", "created_date", "upload_date")

    # Save as permanent table
    documentation_df.write.save_as_table("clinical_trials_documents_normalized", mode="overwrite", table_type="")
    documentation_df.show()

    print("✅ Documents flattened and saved.")

In [None]:
def normalize_clinical_trials(session):
    """
    Runs every flatten step, in order, against the given session.

    Parameters:
        session: Snowpark session handed unchanged to each flatten step.

    Returns:
        None. A confirmation message is printed once all steps have run.
    """
    pipeline_steps = (
        flatten_clinical_trial_core,
        flatten_locations,
        flatten_clinical_trial_design,
        flatten_design_outcomes,
        flatten_baseline_measures,
        flatten_outcome_measures,
        flatten_adverse_events,
        flatten_limitations_caveats,
        flatten_documents,
    )
    for run_step in pipeline_steps:
        run_step(session)

    print("✅ Clinical trials normalized successfully.")



In [None]:
# Run the whole normalization pipeline with the notebook's Snowpark session.
normalize_clinical_trials(session)
