In [3]:
print("Hello World")

Hello World


In [12]:
#----DB config, fetch and insert
import psycopg2
from google.cloud import bigquery
from google.oauth2 import service_account
import pandas as pd
#from tqdm import tqdm
from datetime import datetime, timezone
#import datetime  # module
#from datetime import datetime as dt  # class, aliased to avoid conflict
#from datetime import datetime as tz
#import datetime
import pprint
import dateutil.parser
import json
import logging
import traceback

# Path to your service account JSON key
#key_path = "/Users/toniventura/keys/bq_key.json" 
key_path = "C:\\Users\\tonim\\keys\\bq_key.json"

# Create credentials and BigQuery client
#credentials = service_account.Credentials.from_service_account_file(key_path)
credentials = service_account.Credentials.from_service_account_file(key_path)

# Postgres config
PG_CONFIG = {
    "host": "localhost",
    "port": 5432,
    #"database": "fhir",
    "database" : "FHIR_staging",
    #"user": "toniventura",
    "user": "postgres",
    #"password": "fhir_project"
    "password": "new_password"
}
#records =[]

# BigQuery config
#BQ_PROJECT = "your-gcp-project"
BQ_PROJECT = "fhir-synthea-data"
#BQ_DATASET = "fhir_curated"
BQ_DATASET = "fhir_curated_sample"
#client = bigquery.Client(project=BQ_PROJECT)
client = bigquery.Client(project="fhir-synthea-data", credentials=credentials)
#client = bigquery.Client(project=BQ_PROJECT, credentials=credentials)
dataset_ref = bigquery.Dataset(f"{BQ_PROJECT}.{BQ_DATASET}")

logging.basicConfig(
    filename="etl_eobs.log",
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s"
)

# Helper: fetch staged data
def fetch_staged_data(table, batch_size=10000):
    try:
        conn = psycopg2.connect(**PG_CONFIG)
        cur = conn.cursor()
        #cur.execute(f"SELECT * FROM fhir_staging.{table}")
        #cur.execute("SELECT * FROM fhir_staging.patients_fhir_raw LIMIT 5;")
        cur.execute(f"SELECT * FROM fhir_staging_sample.{table}")
        #cur.execute(f"SELECT * FROM fhir_staging_sample.{table};")
        while True:
            rows = cur.fetchmany(batch_size)
            logging.info(f"rows: {rows}")
            if not rows:
                break
            yield rows
        cur.close()
        conn.close()
    except Exception as e:
        logging.error(f"Postgres connection or query failed: {e}")
        


# Helper: insert dataframe into BigQuery
def insert_to_bq(df, table_name):
    table_id = f"{BQ_PROJECT}.{BQ_DATASET}.{table_name}"
    job = client.load_table_from_dataframe(df, table_id)
    job.result()  # wait for completion


def safe_bq_timestamp(dt):
    if not dt:
        return None
    return dt.astimezone(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")






In [13]:
def transform_observations(rows):
    records = []
    for r in rows:
        rid, resource = r[1], r[2]

        # Initialize columns
        codings  = resource.get("code", {}).get("coding", [])
        code_text = resource.get("code", {}).get("text")

        status = resource.get("status") if codings else None
        system = codings[0].get("system")if codings else None
        code = codings[0].get("code") if codings else None
        
        codings_struct = [
            {
                "system" : c.get("system"), 
                "code" : c.get("code"), 
                "display": c.get("display")
                
            }for c in codings
        ]

        value_numeric = None
        unit = None
        value_text = None
        value_codings = []

        if "valueQuantity" in resource:
            q = resource["valueQuantity"]
            value_numeric = q.get("value")
            unit = q.get("unit")
            # system/code available but usually redundant here

        elif "valueString" in resource:
            value_text = resource["valueString"]

        elif "valueCodeableConcept" in resource:
            cc = resource["valueCodeableConcept"]
            # Store text in value_text for quick querying
            value_text = cc.get("text")
            # Capture codings for full fidelity
            for c in cc.get("coding", []):
                value_codings.append({
                "system": c.get("system"),
                "code": c.get("code"),
                "display": c.get("display")
             })

        elif "valueDateTime" in resource:
            value_text = resource["valueDateTime"]  # or a dedicated column

        elif "valuePeriod" in resource:
            value_text = json.dumps(resource["valuePeriod"])  # or expand into start/end columns

        patient_id_ref = resource.get("subject", {}).get("reference")
        patient_id = patient_id_ref.split(":")[-1]

        encounter_id_ref = resource.get("encounter",{}).get("reference")
        encounter_id = encounter_id_ref.split(":")[-1]

        effective_date_str = resource.get("effectiveDateTime")
        effective_date = None

        if effective_date_str:
            effective_datetime = dateutil.parser.isoparse(effective_date_str)
        
        records.append({
            "observation_id": rid,
            "status" : status,
            "obs_code": code,
            "system": system,
            "obs_code_text": code_text,
            "codings": codings_struct,
            "value_numeric": value_numeric, 
            "value_text" : value_text,
            "unit" : unit,
            "value_codings" : value_codings, 
            "patient_id" : patient_id,
            "encounter_id" : encounter_id,
            "effective_datetime": effective_date, #not inserting
            "load_timestamp": datetime.now(timezone.utc) 
        })

    return pd.DataFrame(records)


# Main ETL loop
def etl_observations():
    try:
        for batch in fetch_staged_data("observations_fhir_raw"):
            df = transform_observations(batch)
            if not df.empty:
                insert_to_bq(df, "observations")
                print("***Inserted batch***")
    except Exception as e:
        print(f"Error in observations ETL: {e}")

if __name__ == "__main__":
    etl_observations()




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***




***Inserted batch***


In [5]:
import socket

socket.gethostbyname("bigquery.googleapis.com")

'142.250.113.95'

In [6]:
from google.api_core.retry import Retry
retry = Retry(deadline=60)  # retry for up to 60 seconds
client = bigquery.Client(retry=retry)


TypeError: Client.__init__() got an unexpected keyword argument 'retry'