In [None]:
#----Patient ETL
import psycopg2
from google.cloud import bigquery
from google.oauth2 import service_account
import pandas as pd
#from tqdm import tqdm
from datetime import datetime, timezone
#import datetime  # module
#from datetime import datetime as dt  # class, aliased to avoid conflict
#from datetime import datetime as tz
#import datetime
import json

# Path to your service account JSON key
key_path = "/Users/toniventura/keys/bq_key.json" 

# Create credentials and BigQuery client
#credentials = service_account.Credentials.from_service_account_file(key_path)
credentials = service_account.Credentials.from_service_account_file(key_path)

# Postgres config
PG_CONFIG = {
    "host": "localhost",
    "port": 5432,
    "database": "fhir",
    "user": "toniventura",
    "password": "fhir_project"
}
#records =[]

# BigQuery config
#BQ_PROJECT = "your-gcp-project"
BQ_PROJECT = "fhir-synthea-data"
BQ_DATASET = "fhir_curated"
#client = bigquery.Client(project=BQ_PROJECT)
client = bigquery.Client(project="fhir-synthea-data", credentials=credentials)
#client = bigquery.Client(project=BQ_PROJECT, credentials=credentials)
dataset_ref = bigquery.Dataset(f"{BQ_PROJECT}.{BQ_DATASET}")

# Helper: fetch staged data
def fetch_staged_data(table, batch_size=10000):
    try:
        conn = psycopg2.connect(**PG_CONFIG)
        cur = conn.cursor()
        #cur.execute(f"SELECT * FROM fhir_staging.{table}")
        cur.execute("SELECT * FROM fhir_staging.patients_fhir_raw LIMIT 5;")
        while True:
            rows = cur.fetchmany(batch_size)
            print(f"rows: {rows}")
            if not rows:
                break
            yield rows
        cur.close()
        conn.close()
    except Exception as e:
        print(f"Postgres connection or query failed: {e}")
        


# Helper: insert dataframe into BigQuery
def insert_to_bq(df, table_name):
    table_id = f"{BQ_PROJECT}.{BQ_DATASET}.{table_name}"
    job = client.load_table_from_dataframe(df, table_id)
    job.result()  # wait for completion

# Example: Transform & load Patients
def transform_patients(rows):
    #print("transforming data")
    records = []
    print(f"rows: {len(rows)}")
    for r in rows:
        print("inside loop")
        rid, resource = r[1], r[2] # adjust index if needed
        #print(f"rid: {rid}")
        #print(f"resource: {resource}")
        #birth_date = resource.get("birthdate")
        records.append({
            "patient_id": rid,
            "first_name": resource.get("name", [{}])[0].get("given", [""])[0],
            "last_name": resource.get("name", [{}])[0].get("family", ""),
            #"birth_date": datetime.date.fromisoformat(resource.get("birthDate"))
                #if resource.get("birthDate") else None,
            "birth_date": datetime.fromisoformat(resource.get("birthDate"))
                if resource.get("birthDate") else None,
            "gender": resource.get("gender"),
            #"load_timestamp" : datetime.datetime.utcnow()
            "load_timestamp": datetime.now(timezone.utc)
        })
    return pd.DataFrame(records)

# Main ETL loop
def etl_patients():
    try:
        for batch in tqdm(fetch_staged_data("patients_fhir_raw")):
            df = transform_patients(batch)
            if not df.empty:
                insert_to_bq(df, "patients")
                print("***Inserting***")
    except Exception as e:
        print(f"Error in patients ETL: {e}")

if __name__ == "__main__":
    etl_patients()




In [None]:
#---Practitioner ETL
import psycopg2
from google.cloud import bigquery
from google.oauth2 import service_account
import pandas as pd
from datetime import datetime, timezone
import json
import pprint

# Path to your service account JSON key
key_path = "/Users/toniventura/keys/bq_key.json"

# Create credentials and BigQuery client
credentials = service_account.Credentials.from_service_account_file(key_path)
BQ_PROJECT = "fhir-synthea-data"
BQ_DATASET = "fhir_curated"
client = bigquery.Client(project=BQ_PROJECT, credentials=credentials)

# Postgres config
PG_CONFIG = {
    "host": "localhost",
    "port": 5432,
    "database": "fhir",
    "user": "toniventura",
    "password": "fhir_project"
}

# Helper: fetch staged data
def fetch_staged_data(table, batch_size=10000):
    try:
        conn = psycopg2.connect(**PG_CONFIG)
        cur = conn.cursor()
        cur.execute(f"SELECT * FROM fhir_staging.{table} LIMIT 5;")
        while True:
            rows = cur.fetchmany(batch_size)
            if not rows:
                break
            yield rows
        cur.close()
        conn.close()
    except Exception as e:
        print(f"Postgres connection or query failed: {e}")

# Helper: insert dataframe into BigQuery
def insert_to_bq(df, table_name):
    table_id = f"{BQ_PROJECT}.{BQ_DATASET}.{table_name}"
    job = client.load_table_from_dataframe(df, table_id)
    job.result()

# Transform Practitioners
def transform_practitioners(rows):
    records = []
    for r in rows:
        rid, resource = r[1], r[2]  # adjust index if needed

        #Initialize variables so they exist even if not found
        npi = None
        license_number = None
        other_ids = []

    for ident in resource.get("identifier", []):
        # Debugging: show the whole identifier object
        pprint.pprint(ident)

        system = ident.get("system")
        value = ident.get("value")

        print(f"system: {system}")
        print(f"value: {value}")

        if system == "http://hl7.org/fhir/sid/us-npi":
            npi = value
        elif system == "http://example.org/license-number":
            license_number = value
        else:
            other_ids.append(value)


        print("---- Results ----")
        print(f"NPI: {npi}")
        print(f"License Number: {license_number}")
        print(f"Other IDs: {other_ids}")
        name_info = resource.get("name", [{}])[0]

        records.append({
            "practitioner_id": rid,
            "first_name": name_info.get("given", [""])[0],
            "last_name": name_info.get("family", ""),
            "prefix": name_info.get("prefix", [""])[0] if name_info.get("prefix") else None,
            "gender": resource.get("gender"),
            #"birth_date": datetime.fromisoformat(resource.get("birthDate")) if resource.get("birthDate") else None,
            "npi": npi,
            "license_number": license_number,
            "primary_email": next((t.get("value") for t in resource.get("telecom", []) if t.get("system") == "email"), None),
            "primary_phone": next((t.get("value") for t in resource.get("telecom", []) if t.get("system") == "phone"), None),
            "load_timestamp": datetime.now(timezone.utc)
        })
    return pd.DataFrame(records)

# Main ETL loop
def etl_practitioners():
    try:
        for batch in fetch_staged_data("practitioners_fhir_raw"):
            df = transform_practitioners(batch)
            if not df.empty:
                insert_to_bq(df, "practitioners")
                print("***Inserted batch***")
    except Exception as e:
        print(f"Error in practitioners ETL: {e}")

if __name__ == "__main__":
    etl_practitioners()


{'system': 'http://hl7.org/fhir/sid/us-npi', 'value': '9999928192'}
system: http://hl7.org/fhir/sid/us-npi
value: 9999928192
---- Results ----
NPI: 9999928192
License Number: None
Other IDs: []
***Inserted batch***


In [None]:
#---Practitioner Roles ETL
import psycopg2
from google.cloud import bigquery
from google.oauth2 import service_account
import pandas as pd
from datetime import datetime, timezone
import json
import pprint

# Path to your service account JSON key
key_path = "/Users/toniventura/keys/bq_key.json"

# Create credentials and BigQuery client
credentials = service_account.Credentials.from_service_account_file(key_path)
BQ_PROJECT = "fhir-synthea-data"
BQ_DATASET = "fhir_curated"
client = bigquery.Client(project=BQ_PROJECT, credentials=credentials)

# Postgres config
PG_CONFIG = {
    "host": "localhost",
    "port": 5432,
    "database": "fhir",
    "user": "toniventura",
    "password": "fhir_project"
}

def insert_to_bq(df, table_name):
    table_id = f"{BQ_PROJECT}.{BQ_DATASET}.{table_name}"
    job = client.load_table_from_dataframe(df, table_id)
    job.result()

def fetch_staged_data(table, batch_size=10000):
    try:
        conn = psycopg2.connect(**PG_CONFIG)
        cur = conn.cursor()
        cur.execute(f"SELECT * FROM fhir_staging.{table} LIMIT 5;")
        while True:
            rows = cur.fetchmany(batch_size)
            if not rows:
                break
            yield rows
        cur.close()
        conn.close()
    except Exception as e:
        print(f"Postgres connection or query failed: {e}")

# Helper: insert dataframe into BigQuery
def insert_to_bq(df, table_name):
    table_id = f"{BQ_PROJECT}.{BQ_DATASET}.{table_name}"
    job = client.load_table_from_dataframe(df, table_id)
    job.result()

# Transform Practitioners
def transform_practitioner_roles(rows):
    records = []
    for r in rows:
        rid, resource = r[1], r[2]

        # Initialize columns
        specialty_text = resource.get("specialty", {})[0].get("text")
        #specialty_code = resource.get("specialty", {})[0].get("code",[{}][0].get("coding",[{}][0].get("code")))
        specialty_code = resource.get("specialty", [{}])[0].get("coding", [{}])[0].get("code")
        role_text = resource.get("code", [{}])[0].get("text")
        role_code = resource.get("code", [{}])[0].get("coding", [{}])[0].get("code")
        
        print(specialty_code)
        print(specialty_text)
        print(role_text)
        print(role_code)

        

        records.append({
            "practitioner_role_id": rid,
            "practitioner_npi": resource.get("practitioner", {}).get("identifier").get("value"),
            "organization_id": resource.get("organization", {}).get("identifier").get("value"),
            "specialty_code": specialty_code,
            "specialty_text": specialty_text,
            "role_text" : role_text, 
            "role_code": role_code, 
            #                 if resource.get("telecom") and resource["telecom"][0].get("system")=="email" else None,
            #"role_text": resource.get("telecom", [{}])[0].get("value") 
                              #if resource.get("telecom") and resource["telecom"][0].get("system")=="phone" else None,
            "load_timestamp": datetime.now(timezone.utc)
        })

    return pd.DataFrame(records)


# Main ETL loop
def etl_practitioner_roles():
    try:
        for batch in fetch_staged_data("practitioner_roles_fhir_raw"):
            df = transform_practitioner_roles(batch)
            if not df.empty:
                insert_to_bq(df, "practitioner_roles")
                print("***Inserted batch***")
    except Exception as e:
        print(f"Error in practitioner roles ETL: {e}")

if __name__ == "__main__":
    etl_practitioner_roles()


208D00000X
General Practice Physician
General Practice Physician
208D00000X
208D00000X
General Practice Physician
General Practice Physician
208D00000X
208D00000X
General Practice Physician
General Practice Physician
208D00000X
208D00000X
General Practice Physician
General Practice Physician
208D00000X
208D00000X
General Practice Physician
General Practice Physician
208D00000X
***Inserted batch***


In [None]:
#---Observations ETL
import psycopg2
from google.cloud import bigquery
from google.oauth2 import service_account
import pandas as pd
from datetime import datetime, timezone
import json
import pprint
import dateutil.parser  # optional, for robust ISO parsing

# Path to your service account JSON key
key_path = "/Users/toniventura/keys/bq_key.json"

# Create credentials and BigQuery client
credentials = service_account.Credentials.from_service_account_file(key_path)
BQ_PROJECT = "fhir-synthea-data"
BQ_DATASET = "fhir_curated"
client = bigquery.Client(project=BQ_PROJECT, credentials=credentials)

# Postgres config
PG_CONFIG = {
    "host": "localhost",
    "port": 5432,
    "database": "fhir",
    "user": "toniventura",
    "password": "fhir_project"
}

def insert_to_bq(df, table_name):
    table_id = f"{BQ_PROJECT}.{BQ_DATASET}.{table_name}"
    job = client.load_table_from_dataframe(df, table_id)
    job.result()
    print(f"Loaded {job.output_rows} rows to {table_id}")



# Helper: fetch staged data
def fetch_staged_data(table, batch_size=10000):
    try:
        conn = psycopg2.connect(**PG_CONFIG)
        cur = conn.cursor()
        cur.execute(f"SELECT * FROM fhir_staging.{table} LIMIT 5;")
        while True:
            rows = cur.fetchmany(batch_size)
            if not rows:
                break
            yield rows
        cur.close()
        conn.close()
    except Exception as e:
        print(f"Postgres connection or query failed: {e}")

# Transform Practitioners
def transform_observations(rows):
    records = []
    for r in rows:
        rid, resource = r[1], r[2]

        # Initialize columns
        codings  = resource.get("code", {}).get("coding", [])
        code_text = resource.get("code", {}).get("text")

        status = resource.get("status") if codings else None
        system = codings[0].get("system")if codings else None
        code = codings[0].get("code") if codings else None
        
        codings_struct = [
            {
                "system" : c.get("system"), 
                "code" : c.get("code"), 
                "display": c.get("display")
                
            }for c in codings
        ]

        value_numeric = None
        unit = None
        value_text = None
        value_codings = []

        if "valueQuantity" in resource:
            q = resource["valueQuantity"]
            value_numeric = q.get("value")
            unit = q.get("unit")
            # system/code available but usually redundant here

        elif "valueString" in resource:
            value_text = resource["valueString"]

        elif "valueCodeableConcept" in resource:
            cc = resource["valueCodeableConcept"]
            # Store text in value_text for quick querying
            value_text = cc.get("text")
            # Capture codings for full fidelity
            for c in cc.get("coding", []):
                value_codings.append({
                "system": c.get("system"),
                "code": c.get("code"),
                "display": c.get("display")
             })

        elif "valueDateTime" in resource:
            value_text = resource["valueDateTime"]  # or a dedicated column

        elif "valuePeriod" in resource:
            value_text = json.dumps(resource["valuePeriod"])  # or expand into start/end columns

        patient_id_ref = resource.get("subject", {}).get("reference")
        patient_id = patient_id_ref.split(":")[-1]

        encounter_id_ref = resource.get("encounter",{}).get("reference")
        encounter_id = encounter_id_ref.split(":")[-1]

        effective_date_str = resource.get("effectiveDateTime")
        effective_date = None

        if effective_date_str:
            effective_datetime = dateutil.parser.isoparse(effective_date_str)
        
        records.append({
            "observation_id": rid,
            "status" : status,
            "obs_code": code,
            "system": system,
            "obs_code_text": code_text,
            "codings": codings_struct,
            "value_numeric": value_numeric, 
            "value_text" : value_text,
            "unit" : unit,
            "value_codings" : value_codings, 
            "patient_id" : patient_id,
            "encounter_id" : encounter_id,
            "effective_datetime": effective_date, #not inserting
            "load_timestamp": datetime.now(timezone.utc) 
        })

    return pd.DataFrame(records)


# Main ETL loop
def etl_observations():
    try:
        for batch in fetch_staged_data("observations_fhir_raw"):
            df = transform_observations(batch)
            if not df.empty:
                insert_to_bq(df, "observations")
                print("***Inserted batch***")
    except Exception as e:
        print(f"Error in observations ETL: {e}")

if __name__ == "__main__":
    etl_observations()


Loaded 5 rows to fhir-synthea-data.fhir_curated.observations
***Inserted batch***


In [21]:
#---Conditions ETL
import psycopg2
from google.cloud import bigquery
from google.oauth2 import service_account
import pandas as pd
from datetime import datetime, timezone
import json
import pprint
import dateutil.parser  # optional, for robust ISO parsing

# Path to your service account JSON key
key_path = "/Users/toniventura/keys/bq_key.json"

# Create credentials and BigQuery client
credentials = service_account.Credentials.from_service_account_file(key_path)
BQ_PROJECT = "fhir-synthea-data"
BQ_DATASET = "fhir_curated"
client = bigquery.Client(project=BQ_PROJECT, credentials=credentials)

# Postgres config
PG_CONFIG = {
    "host": "localhost",
    "port": 5432,
    "database": "fhir",
    "user": "toniventura",
    "password": "fhir_project"
}

def insert_to_bq(df, table_name):
    table_id = f"{BQ_PROJECT}.{BQ_DATASET}.{table_name}"
    job = client.load_table_from_dataframe(df, table_id)
    job.result()
    print(f"Loaded {job.output_rows} rows to {table_id}")



# Helper: fetch staged data
def fetch_staged_data(table, batch_size=10000):
    try:
        conn = psycopg2.connect(**PG_CONFIG)
        cur = conn.cursor()
        cur.execute(f"SELECT * FROM fhir_staging.{table} LIMIT 5;")
        while True:
            rows = cur.fetchmany(batch_size)
            if not rows:
                break
            yield rows
        cur.close()
        conn.close()
    except Exception as e:
        print(f"Postgres connection or query failed: {e}")

# Transform Practitioners
def transform_conditions(rows):
    records = []
    for r in rows:
        rid, resource = r[1], r[2]

        # Initialize columns
        codings  = resource.get("code", {}).get("coding", [])
        code_text = resource.get("code", {}).get("text")

        status = resource.get("status") if codings else None
        system = codings[0].get("system")if codings else None
        code = codings[0].get("code") if codings else None
        
        codings_struct = [
            {
                "system" : c.get("system"), 
                "code" : c.get("code"), 
                "display": c.get("display")
                
            }for c in codings
        ]

        category = None
        category_text = None

        if resource.get("category"):
            first_category = resource["category"][0]["coding"][0]  # first category -> first coding
            category = first_category.get("code")
            category_text = first_category.get("display")
        else:
            category = None
            category_text = None

        # Nested array for full fidelity
        category_codings = []
        for cat in resource.get("category", []):
            for coding in cat.get("coding", []):
                category_codings.append({
                "system": coding.get("system"),
                "code": coding.get("code"),
                "display": coding.get("display")
             })

    
        patient_id_ref = resource.get("subject", {}).get("reference")
        patient_id = patient_id_ref.split(":")[-1]

        encounter_id_ref = resource.get("encounter",{}).get("reference")
        encounter_id = encounter_id_ref.split(":")[-1]

        onset_date_str = resource.get("effectiveDateTime")
        onset_date_time = None

        if onset_date_str:
            onset_date_time = dateutil.parser.isoparse(onset_date_str)
        
        records.append({
            "condition_id": rid,
            "clinical_status" : status,
            "code": code,
            "code_system": system,
            "code_text": code_text,
            "codings": codings_struct,
            "category_code": category,
            "category": category_text,
            "category_codings": category_codings,
            "patient_id" : patient_id,
            "encounter_id" : encounter_id,
            "onset_date": onset_date_time, #not inserting
            "load_timestamp": datetime.now(timezone.utc) 
        })

    return pd.DataFrame(records)


# Main ETL loop
def etl_conditions():
    try:
        for batch in fetch_staged_data("conditions_fhir_raw"):
            df = transform_conditions(batch)
            if not df.empty:
                insert_to_bq(df, "conditions")
                print("***Inserted batch***")
    except Exception as e:
        print(f"Error in conditions ETL: {e}")

if __name__ == "__main__":
    etl_conditions()


Loaded 5 rows to fhir-synthea-data.fhir_curated.conditions
***Inserted batch***


In [None]:
#---Claims ETL
import psycopg2
from google.cloud import bigquery
from google.oauth2 import service_account
import pandas as pd
from datetime import datetime, timezone
import json
import pprint
import dateutil.parser  # optional, for robust ISO parsing
from google.cloud import bigquery

# Path to your service account JSON key
#key_path = "/Users/toniventura/keys/bq_key.json"
key_path = "C:\\Users\\tonim\\keys\\bq_key.json"

# Create credentials and BigQuery client
credentials = service_account.Credentials.from_service_account_file(key_path)
BQ_PROJECT = "fhir-synthea-data"
BQ_DATASET = "fhir_curated"
client = bigquery.Client(project=BQ_PROJECT, credentials=credentials)

# Postgres config
PG_CONFIG = {
    "host": "localhost",
    "port": 5432,
    #"database": "fhir",
    "database" : "FHIR_staging",
    #"user": "toniventura",
    "user": "postgres",
    #"password": "fhir_project"
    "password": "new_password"
}

def insert_to_bq(df, table_name):
    table_id = f"{BQ_PROJECT}.{BQ_DATASET}.{table_name}"
    job = client.load_table_from_dataframe(df, table_id)
    job.result()
    print(f"Loaded {job.output_rows} rows to {table_id}")


# Helper: fetch staged data
def fetch_staged_data(table, batch_size=10000):
    try:
        conn = psycopg2.connect(**PG_CONFIG)
        cur = conn.cursor()
        cur.execute(f"SELECT * FROM fhir_staging_sample.{table};")
        while True:
            rows = cur.fetchmany(batch_size)
        
            if not rows:
                break
            resource = rows[0][2]  # assuming column 2 has JSON resource
            items_list = resource.get("item", [])
            print(f"Claim ID: {rows[0][1]}, Number of items: {len(items_list)}")
            if items_list:
                print("First item:", json.dumps(items_list[0], indent=2))
            yield rows
        cur.close()
        conn.close()
    except Exception as e:
        print(f"Postgres connection or query failed: {e}")

def safe_list_of_structs(input_list, required_keys):
    """Ensure a list of dicts is PyArrow-safe."""
    if not input_list or not isinstance(input_list, list):
        return [{k: None for k in required_keys}]
    cleaned = []
    for item in input_list:
        if not isinstance(item, dict):
            continue
        cleaned.append({k: item.get(k, None) for k in required_keys})
    return cleaned if cleaned else [{k: None for k in required_keys}]

def normalize_list_of_dicts(lst, keys):
    """Ensure lst is a list of dicts with all keys present, filling missing keys with None."""
    if not lst or not isinstance(lst, list):
        return []
    normalized = []
    for d in lst:
        if not isinstance(d, dict):
            continue
        normalized.append({k: d.get(k, None) for k in keys})
    return normalized

def transform_claims_bq(rows):
    records = []

    for r in rows:
        #rows = list(fetch_staged_data("claims_fhir_raw", batch_size=1))
        #resource = rows[0][2]
        #print("Items:", json.dumps(resource.get("item", []), indent=2))

        rid, resource = r[1], r[2]
        ##printresource.get("item", [])
        # --- Basic fields ---
        use = resource.get("use")
        status = resource.get("status")
        patient_ref = resource.get("patient", {}).get("reference")
        patient_id = patient_ref.split(":")[-1] if patient_ref else None

        # Claim type
        type_info = resource.get("type")
        claim_type = None
        if isinstance(type_info, dict):
            coding = type_info.get("coding")
            if coding:
                claim_type = coding[0].get("code")
        elif isinstance(type_info, list) and type_info:
            coding = type_info[0].get("coding")
            if coding:
                claim_type = coding[0].get("code")

        # --- Totals ---
        total_value = resource.get("total", {}).get("value")
        total_currency = resource.get("total", {}).get("currency")

        # --- Dates ---
        billable_start = resource.get("billablePeriod", {}).get("start")
        billable_end = resource.get("billablePeriod", {}).get("end")
        created = resource.get("created")

        # --- Provider ---
        provider_obj = resource.get("provider") or {}
        provider_reference = provider_obj.get("reference")
        provider_reference_id = provider_reference.split("|")[-1] if provider_reference else None
        provider_reference_type = provider_reference.split("?")[0] if provider_reference and "?" in provider_reference else None
        provider_display = provider_obj.get("display")

        '''billing_provider = safe_list_of_structs(
            [{"provider_reference_id": provider_id,
              "provider_reference_type": provider_type,
              "provider_display": provider_display}],
            ["provider_reference_id", "provider_reference_type", "provider_display"]
        )'''

        # --- Facility ---
        facility_json = resource.get("facility") or {}
        facility_reference_complete = facility_json.get("reference")
        facility_reference = facility_reference_complete.split("|")[0] if facility_reference_complete else None
        facility_id = facility_reference_complete.split("|")[-1] if facility_reference_complete else None
        facility_display = facility_json.get("display")

        facility = []
        if facility_json:
            facility.append({
                "facility_reference": facility_reference,
                "facility_id": facility_id,
                "facility_display": facility_display
            })

        # --- Priority ---
        priority_codings = resource.get("priority", {}).get("coding", [])
        priority_code = priority_codings[0].get("code") if priority_codings else None

        # --- Insurance ---
        insurances = resource.get("insurance", [])
        all_insurances = []
        for insurance in insurances:
            all_insurances.append({
                "sequence": insurance.get("sequence"),
                "focal": insurance.get("focal"),
                "coverage": insurance.get("coverage", {}).get("display")
            })

        ## --- Items ---
        items = resource.get("item", [])
        all_items = []
        for item in items:
            product = item.get("productOrService", {})
            coding = product.get("coding", [{}])[0] if product else {}
            location_coding = item.get("locationCodeableConcept", [{}])[0] if item.get("locationCodeableConcept") else {}

            encounter_array = item.get("encounter", [])
            encounter_ref = encounter_array[0].get("reference").split(":")[-1] if encounter_array else None

            net = item.get("net", {})
            all_items.append({
                "sequence": item.get("sequence"),
                "item_type": "diagnosis sequence" if item.get("diagnosisSequence") else (
                             "information sequence" if item.get("informationSequence") else (
                             "procedure sequence" if item.get("procedureSequence") else None)),
                "system": coding.get("system"),
                "code": coding.get("code"),
                "display": coding.get("display"),
                "service_start": item.get("servicePeriod", {}).get("start"),
                "service_end": item.get("servicePeriod", {}).get("end"),
                "net_value": net.get("value"),
                "net_currency": net.get("currency"),
                "location": [{
                    "facility_id": facility_id,
                    "system": location_coding.get("system"),
                    "code": location_coding.get("code"),
                    "display": location_coding.get("display")
                }] if location_coding else [],
                "encounter": encounter_ref,
                "item_text": item.get("text")
            })

         # --- Diagnoses ---
        diagnoses = resource.get("diagnosis", []) or []
        all_diagnosis = []
        for diag in diagnoses:
            diag_ref = diag.get("diagnosisReference", {}).get("reference")
            diag_id = diag_ref.split(":")[-1] if diag_ref else None
            all_diagnosis.append({
                "sequence": diag.get("sequence"),
                "diagnosis": diag_id
            })

        # --- Append record ---
        records.append({
            "claim_id": rid,
            "status": status,
            "use": use,
            "patient_id": patient_id,
            "claim_type_info": claim_type,
            "total_value": total_value,
            "total_currency": total_currency,
            "billable_start": billable_start,
            "billable_end": billable_end,
            "created": created,
            "provider_reference_id": provider_reference_id,
            "provider_reference_type": provider_reference_type,
            "provider_display": provider_display,
            "priority_code": priority_code,
            "facility": facility,
            "all_insurances": all_insurances,
            "diagnoses": all_diagnosis,
            "items": all_items,
            "load_timestamp": datetime.now(timezone.utc).isoformat()
        })
    return records

# Main ETL loop
def etl_claims():
    try:
        total_loaded = 0
        table_id = "fhir-synthea-data.fhir_curated_sample.claims"
         # Define schema (nested fields)
        job_config = bigquery.LoadJobConfig(
        schema=[
            bigquery.SchemaField("claim_id", "STRING", mode="REQUIRED"),
            bigquery.SchemaField("status", "STRING"),
            bigquery.SchemaField("use", "STRING"),
            bigquery.SchemaField("patient_id", "STRING"),
            bigquery.SchemaField("claim_type_info", "STRING"),
            bigquery.SchemaField("total_value", "FLOAT"),
            bigquery.SchemaField("total_currency", "STRING"),
            bigquery.SchemaField("billable_start", "TIMESTAMP"),
            bigquery.SchemaField("billable_end", "TIMESTAMP"),
            bigquery.SchemaField("created", "TIMESTAMP"),
            bigquery.SchemaField("provider_reference_id", "STRING"),
            bigquery.SchemaField("provider_reference_type", "STRING"),
            bigquery.SchemaField("provider_display", "STRING"),
            bigquery.SchemaField("priority_code", "STRING"),
            bigquery.SchemaField("facility", "RECORD", mode="REPEATED", fields=[
            bigquery.SchemaField("facility_reference", "STRING"),
            bigquery.SchemaField("facility_id", "STRING"),
            bigquery.SchemaField("facility_display", "STRING"),
        ]),
        bigquery.SchemaField("all_insurances", "RECORD", mode="REPEATED", fields=[
            bigquery.SchemaField("sequence", "INTEGER"),
            bigquery.SchemaField("focal", "BOOLEAN"),
            bigquery.SchemaField("coverage", "STRING"),
        ]),
        bigquery.SchemaField("diagnoses","RECORD", mode="REPEATED", fields=[
            bigquery.SchemaField("sequence", "INTEGER"),
            bigquery.SchemaField("diagnosis", "STRING"),
        ]),
        bigquery.SchemaField("items", "RECORD", mode="REPEATED", fields=[
            bigquery.SchemaField("sequence", "INTEGER"),
            bigquery.SchemaField("item_type", "STRING"),
            bigquery.SchemaField("system", "STRING"),
            bigquery.SchemaField("code", "STRING"),
            bigquery.SchemaField("display", "STRING"),
            bigquery.SchemaField("service_start", "STRING"),
            bigquery.SchemaField("service_end", "STRING"),
            bigquery.SchemaField("net_value", "FLOAT"),
            bigquery.SchemaField("net_currency", "STRING"),
            bigquery.SchemaField("location", "RECORD", mode="REPEATED", fields=[
                bigquery.SchemaField("facility_id", "STRING"),
                bigquery.SchemaField("system", "STRING"),
                bigquery.SchemaField("code", "STRING"),
                bigquery.SchemaField("display", "STRING"),
            ]),
            bigquery.SchemaField("encounter", "STRING"),
            bigquery.SchemaField("item_text", "STRING"),
        ]),
        bigquery.SchemaField("load_timestamp", "TIMESTAMP", mode="REQUIRED"),
        ],
        )
        for batch in fetch_staged_data("claims_fhir_raw", batch_size=10000):
            '''df = transform_claims(batch)
            if not df.empty:
                insert_to_bq(df, "claims")
                print("***Inserted batch***")'''
            # Load data
            records = transform_claims_bq(batch)
            job = client.load_table_from_json(records, table_id, job_config=job_config)
            job.result()
            batch_count = len(records)
            total_loaded += batch_count
            print(f"Loaded {batch_count} rows this batch, total {total_loaded}")
        print(f"Finished ETL, total rows loaded: {total_loaded}")

    except Exception as e:
        print(f"Error in claims ETL: {e}")

if __name__ == "__main__":
    etl_claims()


Claim ID: 20665f6e-65c8-8461-bc42-f4c0661d92e0, Number of items: 3
First item: {
  "sequence": 1,
  "encounter": [
    {
      "reference": "urn:uuid:278214a4-ab02-994d-742e-a3ddccf4c33f"
    }
  ],
  "productOrService": {
    "text": "Emergency room admission (procedure)",
    "coding": [
      {
        "code": "50849002",
        "system": "http://snomed.info/sct",
        "display": "Emergency room admission (procedure)"
      }
    ]
  }
}
Loaded 10000 rows this batch, total 10000
Claim ID: 4d0a2124-8604-15e5-d3e7-115cf243bd14, Number of items: 1
First item: {
  "sequence": 1,
  "encounter": [
    {
      "reference": "urn:uuid:6713e71d-0d7a-dd7b-d2ac-a23577870bd1"
    }
  ],
  "productOrService": {
    "text": "Cisplatin 50 MG Injection",
    "coding": [
      {
        "code": "1736854",
        "system": "http://www.nlm.nih.gov/research/umls/rxnorm",
        "display": "Cisplatin 50 MG Injection"
      }
    ]
  }
}
Loaded 10000 rows this batch, total 20000
Claim ID: cd9eb34b-c

In [26]:
#---Organizations ETL
import psycopg2
from google.cloud import bigquery
from google.oauth2 import service_account
import pandas as pd
from datetime import datetime, timezone
import json
import pprint
import dateutil.parser  # optional, for robust ISO parsing

# Path to your service account JSON key
#key_path = "/Users/toniventura/keys/bq_key.json"
key_path = "C:\\Users\\tonim\\keys\\bq_key.json"

# Create credentials and BigQuery client
credentials = service_account.Credentials.from_service_account_file(key_path)
BQ_PROJECT = "fhir-synthea-data"
BQ_DATASET = "fhir_curated"
client = bigquery.Client(project=BQ_PROJECT, credentials=credentials)

# Postgres config
PG_CONFIG = {
    "host": "localhost",
    "port": 5432,
    #"database": "fhir",
    "database" : "FHIR_staging",
    #"user": "toniventura",
    "user": "postgres",
    #"password": "fhir_project"
    "password": "new_password"
}

def insert_to_bq(df, table_name):
    table_id = f"{BQ_PROJECT}.{BQ_DATASET}.{table_name}"
    job = client.load_table_from_dataframe(df, table_id)
    job.result()
    print(f"Loaded {job.output_rows} rows to {table_id}")



# Helper: fetch staged data
def fetch_staged_data(table, batch_size=10000):
    try:
        conn = psycopg2.connect(**PG_CONFIG)
        cur = conn.cursor()
        #cur.execute(f"SELECT * FROM fhir_staging.{table} LIMIT 2000;")
        cur.execute(f"SELECT * FROM fhir_staging_sample.{table}")
        while True:
            rows = cur.fetchmany(batch_size)
            if not rows:
                break
            yield rows
        cur.close()
        conn.close()
    except Exception as e:
        print(f"Postgres connection or query failed: {e}")

# Transform Practitioners
def transform_organizations_bq(rows):
    print("transforming")
    records = []
    for r in rows:
        rid, resource = r[1], r[2]

        # --- Basic fields ---
        name = resource.get("name")
        if not name:
            identifiers = resource.get("identifier", [])
            if identifiers and isinstance(identifiers, list):
                name = identifiers[0].get("value")
                
        print(f"name: {name}")

        active = resource.get("active")
        print(f"active: {active}")

        # --- Type ---
        org_type = None
        type_info = resource.get("type", [])
        if type_info and isinstance(type_info, list):
            coding = type_info[0].get("coding", [])
            if coding:
                org_type = coding[0].get("display")

        records.append({
            "organization_id": rid,
            "organization_name": name,
            "organization_type": org_type,
            "active": active,
            "load_timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
        })
    return records


# Main ETL loop
def etl_organizations():
    print("ETL organizations")
    try:
        for batch in fetch_staged_data("organizations_fhir_raw"):
            #table_id = "fhir-synthea-data.fhir_curated.organizations"
            table_id = "fhir-synthea-data.fhir_curated_sample.organizations"

            job_config = bigquery.LoadJobConfig(
                schema=[
                    bigquery.SchemaField("organization_id", "STRING", mode="REQUIRED"),
                    bigquery.SchemaField("organization_name", "STRING"),
                    bigquery.SchemaField("organization_type", "STRING"),
                    bigquery.SchemaField("active", "BOOL"),
                    bigquery.SchemaField("load_timestamp", "TIMESTAMP", mode="REQUIRED"),
                ]
            )

            records = transform_organizations_bq(batch)
            job = client.load_table_from_json(records, table_id, job_config=job_config)
            job.result()
            print(f"Loaded {len(records)} rows to {table_id}")

    except Exception as e:
        print(f"Error in organizations ETL: {e}")

if __name__ == "__main__":
    etl_organizations()


ETL organizations
transforming
name: Fitchburg Outpatient Clinic
active: True
name: Fitchburg Outpatient Clinic
active: None
name: BOSTON HEALTH CARE FOR THE HOMELESS PROGRAM INC
active: True
name: BOSTON HEALTH CARE FOR THE HOMELESS PROGRAM INC
active: None
name: EDWARD M KENNEDY COMMUNITY HEALTH CENTER INC
active: True
name: EDWARD M KENNEDY COMMUNITY HEALTH CENTER INC
active: None
name: ACTIVATED BY WELLNESS LLC
active: True
name: ACTIVATED BY WELLNESS LLC
active: None
name: NURSE ON CALL
active: True
name: NURSE ON CALL
active: None
name: CAPE HERITAGE REHABILITATION & HEALTH CARE CENTER
active: True
name: CAPE HERITAGE REHABILITATION & HEALTH CARE CENTER
active: None
name: UMASS MEMORIAL HEALTHALLIANCE CLINTON HOSPITAL INC
active: True
name: UMASS MEMORIAL HEALTHALLIANCE CLINTON HOSPITAL INC
active: None
name: VA Boston Healthcare System, West Roxbury Campus
active: True
name: VA Boston Healthcare System, West Roxbury Campus
active: None
name: DUFFY HEALTH CENTER
active: True
name

In [28]:
#---Encounters ETL
import psycopg2
from google.cloud import bigquery
from google.oauth2 import service_account
import pandas as pd
from datetime import datetime, timezone
import json
import pprint
import dateutil.parser  # optional, for robust ISO parsing

# Path to your service account JSON key
#key_path = "/Users/toniventura/keys/bq_key.json"
key_path = "C:\\Users\\tonim\\keys\\bq_key.json"

# Create credentials and BigQuery client
credentials = service_account.Credentials.from_service_account_file(key_path)
BQ_PROJECT = "fhir-synthea-data"
BQ_DATASET = "fhir_curated"
client = bigquery.Client(project=BQ_PROJECT, credentials=credentials)

# Postgres config
# Postgres config
PG_CONFIG = {
    "host": "localhost",
    "port": 5432,
    #"database": "fhir",
    "database" : "FHIR_staging",
    #"user": "toniventura",
    "user": "postgres",
    #"password": "fhir_project"
    "password": "new_password"
}

def insert_to_bq(df, table_name):
    table_id = f"{BQ_PROJECT}.{BQ_DATASET}.{table_name}"
    job = client.load_table_from_dataframe(df, table_id)
    job.result()
    print(f"Loaded {job.output_rows} rows to {table_id}")

# Helper: fetch staged data
def fetch_staged_data(table, batch_size=10000):
    try:
        conn = psycopg2.connect(**PG_CONFIG)
        cur = conn.cursor()
        #cur.execute(f"SELECT * FROM fhir_staging.{table} LIMIT 2000;")
        cur.execute(f"SELECT * FROM fhir_staging_sample.{table}")
        while True:
            rows = cur.fetchmany(batch_size)
            if not rows:
                break
            yield rows
        cur.close()
        conn.close()
    except Exception as e:
        print(f"Postgres connection or query failed: {e}")
    
def safe_bq_timestamp(dt):
    if not dt:
        return None
    return dt.astimezone(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")

# Transform Encounters
def transform_encounters_bq(rows):
    records = []
    for r in rows:
        rid, resource = r[1], r[2]

        # --- Basic fields ---
        patient_ref = resource.get("subject", {}).get("reference").split(":")[-1]  # e.g., "Patient/123"
        org_ref = resource.get("serviceProvider", {}).get("reference").split("|")[-1]  # e.g., "Organization|456"
        encounter_class = resource.get("class",{}).get("code")
        encounter_status = resource.get("status")
        start = resource.get("period", {}).get("start")
        end = resource.get("period", {}).get("end")

        # --- Encounter Type ---
        type_code = None
        type_display = None
        type_info = resource.get("type", [])
        if type_info and isinstance(type_info, list):
            coding = type_info[0].get("coding", [])
            if coding:
                type_code = coding[0].get("code")
                type_display = coding[0].get("display")

        # Optional: parse timestamps safely
        try:
            start_ts = dateutil.parser.parse(start) if start else None
            end_ts = dateutil.parser.parse(end) if end else None
        except Exception:
            start_ts = end_ts = None

        records.append({
            "encounter_id": rid,
            "patient_id": patient_ref.split("/")[-1] if patient_ref else None,
            "organization_id": org_ref.split("/")[-1] if org_ref else None,
            "encounter_class": encounter_class,
            "encounter_status": encounter_status,
            "encounter_type_code": type_code,
            "encounter_type_display": type_display,
            "start_datetime": safe_bq_timestamp(start_ts),
            "end_datetime": safe_bq_timestamp(end_ts),
            "load_timestamp": safe_bq_timestamp(datetime.now(timezone.utc))
        })
    return records

# Main ETL loop
def etl_encounters():
    print("ETL encounters")
    try:
        for batch in fetch_staged_data("encounters_fhir_raw"):
            #table_id = "fhir-synthea-data.fhir_curated.encounter"
            table_id = "fhir-synthea-data.fhir_curated_sample.encounter"

            job_config = bigquery.LoadJobConfig(
                schema=[
                    bigquery.SchemaField("encounter_id", "STRING", mode="REQUIRED"),
                    bigquery.SchemaField("patient_id", "STRING"),
                    bigquery.SchemaField("organization_id", "STRING"),
                    bigquery.SchemaField("encounter_class", "STRING"),
                    bigquery.SchemaField("encounter_status", "STRING"),
                    bigquery.SchemaField("encounter_type_code", "STRING"),
                    bigquery.SchemaField("encounter_type_display", "STRING"),
                    bigquery.SchemaField("start_datetime", "TIMESTAMP"),
                    bigquery.SchemaField("end_datetime", "TIMESTAMP"),
                    bigquery.SchemaField("load_timestamp", "TIMESTAMP", mode="REQUIRED")
                ]
            )

            records = transform_encounters_bq(batch)
            job = client.load_table_from_json(records, table_id, job_config=job_config)
            job.result()
            print(f"Loaded {len(records)} rows to {table_id}")

    except Exception as e:
        print(f"Error in encounters ETL: {e}")

if __name__ == "__main__":
    etl_encounters()


ETL encounters
Loaded 10000 rows to fhir-synthea-data.fhir_curated_sample.encounter
Loaded 10000 rows to fhir-synthea-data.fhir_curated_sample.encounter
Loaded 10000 rows to fhir-synthea-data.fhir_curated_sample.encounter
Loaded 10000 rows to fhir-synthea-data.fhir_curated_sample.encounter
Loaded 10000 rows to fhir-synthea-data.fhir_curated_sample.encounter
Loaded 10000 rows to fhir-synthea-data.fhir_curated_sample.encounter
Loaded 10000 rows to fhir-synthea-data.fhir_curated_sample.encounter
Loaded 10000 rows to fhir-synthea-data.fhir_curated_sample.encounter
Loaded 10000 rows to fhir-synthea-data.fhir_curated_sample.encounter
Loaded 10000 rows to fhir-synthea-data.fhir_curated_sample.encounter
Loaded 10000 rows to fhir-synthea-data.fhir_curated_sample.encounter
Loaded 10000 rows to fhir-synthea-data.fhir_curated_sample.encounter
Loaded 10000 rows to fhir-synthea-data.fhir_curated_sample.encounter
Loaded 10000 rows to fhir-synthea-data.fhir_curated_sample.encounter
Loaded 10000 rows t