# Steps to run expressions on a SQL table using Python

## 1. Run the pipeline to generate the Parquet files.

## 2. Spark client setup

The cell below sets up the Spark client and defines the datasets used for this analysis. It also creates a `SparkRunner` ("runner"), which is used to apply declarative views of FHIR resources in Spark.

In [1]:
import pandas

from sqlalchemy import dialects
from sqlalchemy import engine

from google.fhir.views import r4
from google.fhir.views import spark_runner

# Database that holds the raw FHIR resource tables. It is only read from
# here, so read-only access is enough.
fhir_dataset = 'default'

# Database handed to the runner as its view_dataset, i.e. where views, value
# sets and other derived tables go. The user needs write access to it.
analysis_dataset = 'statin_analysis_example'

# Teach SQLAlchemy the 'hive://' URL scheme (backed by PyHive). This must be
# registered before create_engine() is given such a URL.
dialects.registry.register('hive', 'pyhive.sqlalchemy_hive', 'HiveDialect')

# Endpoint of the Hive/Spark ThriftServer that executes every query.
thrift_server_url = 'hive://localhost:10001/default'
query_engine = engine.create_engine(thrift_server_url)

# Runner that executes FHIR views over Spark through query_engine.
# snake_case_resource_tables=True presumably makes it look up tables such as
# `questionnaire_response` (matching the tables created below) - confirm.
runner_options = dict(
    query_engine=query_engine,
    fhir_dataset=fhir_dataset,
    view_dataset=analysis_dataset,
    snake_case_resource_tables=True,
)
runner = spark_runner.SparkRunner(**runner_options)

## 3. Create tables from the generated Parquet files.

In [2]:
# (Re)create the QuestionnaireResponse table over the Parquet output of one
# pipeline run.
# NOTE(review): this run timestamp (2023_07_19...) differs from the one used
# for Observation/Immunization/Encounter (2023_07_07...) - confirm that mixing
# pipeline runs is intended.
qr_table = f'{fhir_dataset}.questionnaire_response'
qr_location = ('/dwh/controller_DWH_TIMESTAMP_2023_07_19T15_00_27_224851400Z'
               '/QuestionnaireResponse/*')

with query_engine.connect() as conn:
  conn.execute(f'DROP TABLE IF EXISTS {qr_table};')
  conn.execute(
      f'CREATE TABLE IF NOT EXISTS {qr_table} USING PARQUET'
      f" LOCATION '{qr_location}';"
  )

In [4]:
# Parquet output of the pipeline run the resource tables are built from.
dwh_run_dir = '/dwh/controller_DWH_TIMESTAMP_2023_07_07T10_38_24_324267300Z'

# Spark table name -> Parquet folder prefix inside dwh_run_dir.
# NOTE(review): the trailing '*' glob also matches any sibling folder that
# merely starts with the same prefix - confirm no such folders exist.
resource_tables = {
    'observation': 'Observation',
    'immunization': 'Immunization',
    'encounter': 'Encounter',
}

with query_engine.connect() as conn:
  # Drop every table first, then recreate each one from its Parquet files.
  for table in resource_tables:
    conn.execute(f'DROP TABLE IF EXISTS {fhir_dataset}.{table};')
  for table, folder in resource_tables.items():
    conn.execute(
        f'CREATE TABLE IF NOT EXISTS {fhir_dataset}.{table} USING PARQUET'
        f" LOCATION '{dwh_run_dir}/{folder}*';"
    )

## 4. Transform the nested items of a QuestionnaireResponse into separate columns in a tabular format.

In [3]:
import pandas as pd
from sqlalchemy import create_engine

# Connection string of the Hive/Spark ThriftServer (same endpoint as the
# Spark client setup cell).
db_connection_string = "hive://localhost:10001/default"

# Create a SQLAlchemy engine.
# NOTE(review): this rebinds `engine`, which the setup cell imported as the
# `sqlalchemy.engine` module; re-run that cell's imports before calling
# `engine.create_engine` again.
engine = create_engine(db_connection_string)

# Flatten nested QuestionnaireResponse items into one row per response:
# - each LATERAL VIEW OUTER EXPLODE descends one level of `item` nesting
#   (OUTER keeps responses that have no child items at that level);
# - each CASE picks the answer of one specific linkId at its level;
# - FIRST(..., true) keeps the first non-null value per group, so the
#   exploded rows collapse back into columns.
# The encounter id is aliased explicitly because the merge cell joins on a
# column named `encounterId`; without an alias the column name is whatever
# Spark auto-generates for the expression.
# NOTE(review): answer_1_1 reads `value.coding.code` across the whole coding
# list, so it is presumably array-valued rather than a single code - confirm.
query = '''
SELECT QR.id AS qr_id,
  QR.encounter['encounterId'] AS encounterId,
  FIRST(QR.questionnaire, true) AS questionnaire,
  FIRST((CASE item_1.linkId WHEN '1.0' THEN item_1.answer[0].value.coding.code ELSE NULL END), true) AS answer_1_1,
  FIRST((CASE item_2.linkId WHEN '2.2' THEN item_2.answer[0].value.dateTime ELSE NULL END), true) AS birth_time,
  FIRST((CASE item_3.linkId WHEN '3.1.1' THEN item_3.answer[0].value.decimal ELSE NULL END), true) AS birth_weight
FROM questionnaire_response AS QR
  LATERAL VIEW OUTER EXPLODE(QR.item) AS item_1
  LATERAL VIEW OUTER EXPLODE(item_1.item) AS item_2
  LATERAL VIEW OUTER EXPLODE(item_2.item) AS item_3
 GROUP BY QR.id, QR.encounter['encounterId'] ;
'''

# Use pandas to read the query result into a DataFrame.
questionnaireResponse_df = pd.read_sql_query(query, con=engine)


## 5. Creating views of FHIR resources.

In [5]:
# Load views based on the base FHIR R4 profile definitions.
views = r4.base_r4()

obs = views.view_of('Observation')
enr = views.view_of('Encounter')
imn = views.view_of('Immunization')

## 6. Creating FHIRPath expressions to select specific items from FHIR resources.


In [6]:
def _reference_id(reference):
    """Return the last '/'-separated segment of a FHIR reference string.

    'Patient/123' becomes '123'; falsy values (None, '') become None.
    """
    if not reference:
        return None
    return reference.split('/')[-1]


# Output column name -> view expression selected from each Observation.
observation_columns = {
    "ob_id": obs.id,
    "ob_tag_code": obs.meta.tag.code,
    "ob_coding_code": obs.code.coding.code,
    "ob_coding_display": obs.code.coding.display,
    "ob_subject_ref": obs.subject.reference,
    "ob_encounter_ref": obs.encounter.reference,
    "ob_performer_ref": obs.performer.reference,
    "ob_value": obs.valueString,
    "ob_valueCodeableConcept": obs.valueCodeableConcept.coding.code,
    "ob_lastUpdated": obs.meta.lastUpdated,
}
observation_df = runner.to_dataframe(obs.select(observation_columns))

# Reduce subject/encounter references to bare ids; the encounter id is what
# the merge cell joins against en_id.
for ref_column in ('ob_subject_ref', 'ob_encounter_ref'):
    observation_df[ref_column] = observation_df[ref_column].apply(_reference_id)

In [7]:
import datetime

# Output column name -> view expression selected from each Encounter.
encounter_columns = {
    "en_id": enr.id,
    "en_tag_code": enr.meta.tag.code,
    "en_subject_ref": enr.subject.reference,
    "en_start": enr.period.start,
    "en_end": enr.period.end,
    "en_serviceProv_ref": enr.serviceProvider.reference,
}
encounter_query = enr.select(encounter_columns)
encounter_df = runner.to_dataframe(encounter_query)

In [9]:
def _reference_id(reference):
    """Return the last '/'-separated segment of a FHIR reference string.

    'Patient/123' becomes '123'; falsy values (None, '') become None.
    """
    if not reference:
        return None
    return reference.split('/')[-1]


# Output column name -> view expression selected from each Immunization.
immunization_columns = {
    "im_id": imn.id,
    "im_tag_code": imn.meta.tag.code,
    "im_status": imn.status,
    "im_coding_code": imn.vaccineCode.coding.code,
    "im_coding_display": imn.vaccineCode.coding.display,
    "im_subject_ref": imn.patient.reference,
    "im_encounter_ref": imn.encounter.reference,
    "im_performer_ref": imn.performer.actor.reference,
    "im_lastUpdated": imn.meta.lastUpdated,
}
immunization_df = runner.to_dataframe(imn.select(immunization_columns))

# Keep only the identifier part of the patient and encounter references; the
# encounter id is what the merge cell joins against en_id.
for ref_column in ('im_subject_ref', 'im_encounter_ref'):
    immunization_df[ref_column] = immunization_df[ref_column].apply(_reference_id)

## 7. Filter Encounter resources based on start and end date.

In [8]:
import pandas as pd
import datetime

# Reporting window, inclusive of both calendar days.
report_start = pd.to_datetime(datetime.date(2022, 9, 14))
report_end = pd.to_datetime(datetime.date(2022, 9, 20))
# First instant after the window. Ends are compared with `<` against this so
# that an encounter ending exactly at midnight of the following day (which is
# outside the window) is not included.
report_end_exclusive = report_end + pd.Timedelta(days=1)

# Parse the period values so they can be compared with timestamps.
# NOTE(review): if the source strings carry UTC offsets, to_datetime may yield
# tz-aware values, and comparing those with the naive bounds above raises -
# confirm the stored format.
encounter_df['en_start'] = pd.to_datetime(encounter_df['en_start'])
encounter_df['en_end'] = pd.to_datetime(encounter_df['en_end'])

# Keep encounters that start on/after the first day and end within the
# window. Missing ends (NaT) compare False, so open encounters are dropped.
# The `en_end >= report_start` test is kept to exclude records whose end
# precedes the window even if their start does not.
filtered_df = encounter_df[
    (encounter_df['en_start'] >= report_start) &
    (encounter_df['en_end'] >= report_start) &
    (encounter_df['en_end'] < report_end_exclusive)
]

# Set the display options to show complete data.
pd.set_option('display.max_colwidth', None)
pd.set_option('display.max_rows', None)

## 8. Joining DataFrames (Observation, Immunization, QuestionnaireResponse) with Encounter using left joins.

In [10]:
# Attach each resource type to the filtered encounters with left joins on the
# encounter id, so encounters without related resources are kept (NaN-filled).
# Each join repeats an encounter's rows once per matching right-hand row
# (observations x immunizations x questionnaire responses), which is why the
# later cells de-duplicate on ob_id / im_id / qr_id before counting.
merged_with_observation_df = filtered_df.merge(
    observation_df, how='left', left_on='en_id', right_on='ob_encounter_ref')

merged_with_immunization_df = merged_with_observation_df.merge(
    immunization_df, how='left', left_on='en_id', right_on='im_encounter_ref')

merged_with_questionnaireResponse_df = merged_with_immunization_df.merge(
    questionnaireResponse_df, how='left', left_on='en_id', right_on='encounterId')

# Display full cell contents and all rows, with auto-detected width.
for display_option in ('display.max_colwidth', 'display.max_rows', 'display.width'):
    pd.set_option(display_option, None)


## Sample JSON Definition

In [None]:
 [
 {
    "id": 1,
    "categoryId": "antenatal",
    "name": "Malaria Tests Positive",
    "description": "No. of Malaria test positive",
    "redYellow": "2",
    "yellowGreen": "5",
    "fhirPath": {
      "expression": "(ob_coding_code == '[\"B54\"]') & (ob_valueCodeableConcept == '[\"positive\"]') | 
        (ob_value == 'positive')"
    }
  },  
  {
    "id": 2,
    "categoryId": "antenatal",
    "name": "Malaria Tests Done",
    "description": "No. of Malaria test done",
    "redYellow": "2",
    "yellowGreen": "5",
    "fhirPath": {
      "expression":  "(ob_coding_code == '[\"B54\"]') & (ob_valueCodeableConcept == '[\"positive\"]') | 
        (ob_value == 'positive') + (ob_coding_code == '[\"B54\"]') & (ob_valueCodeableConcept == '[\"negative\"]') 
        | (ob_value == 'negative') + (ob_coding_code == '[\"B54\"]') & (ob_valueCodeableConcept == '[\"invalid\"]') 
        | (ob_value == 'invalid')"
    }
  },
  {
    "id": 3,
    "categoryId": "labour-and-delivery",
    "name": "Newborn Deaths",
    "description": "Newborn babies death",
    "redYellow": "!1",
    "yellowGreen": "!0",
    "fhirPath": {
      "expression": "(ob_coding_code == '[\"IPRD.DE67\"]')"
    }
  },
  {
    "id": 4,
    "categoryId": "child-growth-monitoring",
    "name": "Number of Children growth monitored",
    "description": "No. of Child growth monitored",
    "redYellow": "2",
    "yellowGreen": "5",
    "fhirPath": {
      "expression":  "(ob_coding_code == '[\"IPRD.DE63\"]')"
    }
  }
]

## 9. Loading JSON data from a file

In [130]:
import json

# Indicator definitions file; each entry's fhirPath.expression is evaluated in
# the next cell (presumably the same shape as the sample above - confirm).
file_path = "C:/Repos/fhir-data-pipes/docker/dwh/controller_DWH_TIMESTAMP_2023_07_19T15_00_27_224851400Z/definitions.json"

# JSON text is UTF-8; decode it explicitly instead of relying on the platform
# default encoding (e.g. cp1252 on Windows), which can garble non-ASCII text.
with open(file_path, "r", encoding="utf-8") as json_file:
    data = json.load(json_file)

## 10. Running expressions in the flattened table.

In [131]:
import pandas as pd  # Import pandas if not already imported
import re

# One row per observation: the merge cell repeats each observation once per
# matching immunization / questionnaire response.
df_for_observation = merged_with_questionnaireResponse_df.drop_duplicates(subset=['ob_id'])

# Separates the terms of an expression whose counts are added together. The
# split is purely textual, so a '+' inside a quoted literal would also split.
_TERM_SEPARATOR = re.compile(r'\s*\+\s*')

# Evaluate every definition and print "<name> : <count>". Each term is a
# boolean pandas expression over the flattened columns, and its sum is the
# number of matching rows. An expression without '+' is simply a single term.
# Local names are used so the global `filtered_df` (the filtered encounters
# from the date-filter cell) is not overwritten.
# NOTE(review): DataFrame.eval executes strings taken from definitions.json;
# only load definitions files from a trusted source.
for defn in data:
    expression = defn["fhirPath"]["expression"]
    terms = [term.strip() for term in _TERM_SEPARATOR.split(expression)]
    total = sum(df_for_observation.eval(term).sum() for term in terms)
    print(defn["name"], ":", total)


Malaria Tests Positive : 26
Malaria Tests Done : 52
Newborn Deaths : 0
Number of Children growth monitored : 0


## Exporting DataFrames to CSV

In [119]:
# One row per observation (the merge cell repeats observations per join match).
df_for_observation = merged_with_questionnaireResponse_df.drop_duplicates(subset=['ob_id'])

# Destination of the exported CSV.
download_path = 'C:\\Users\\developer\\Downloads\\obs.csv'

# Write straight to the destination. Staging the file as 'obs.csv' in the
# working directory and moving it would overwrite, and then move away, any
# existing obs.csv in that directory.
df_for_observation.to_csv(download_path, index=False)

# Show the destination path as the cell output.
download_path

'C:\\Users\\developer\\Downloads\\obs.csv'

## Running expressions in the flattened table manually.

In [120]:
# Manual cross-check of the definitions.json counts over the same rows.
# Results are bound to dedicated names so the global `filtered_df` (the
# filtered encounters used by the merge cell) is not overwritten.
df_for_observation = merged_with_questionnaireResponse_df.drop_duplicates(subset=['ob_id'])

newborn_deaths_df = df_for_observation[df_for_observation["ob_coding_code"] == '["IPRD.DE67"]']
count_result = newborn_deaths_df.shape[0]
print("newborn_deaths : ", count_result)

# NOTE(review): `&` binds tighter than `|`, so each mask below is
# ((code == B54) & (concept == result)) | (value == result): any observation
# whose ob_value equals the result string is counted regardless of its code.
# This mirrors the definitions.json expressions; confirm it is intended.
mpt_df = df_for_observation[(df_for_observation["ob_coding_code"] == '["B54"]') & (df_for_observation["ob_valueCodeableConcept"] == '["positive"]') | (df_for_observation["ob_value"] == 'positive')]
mnt_df = df_for_observation[(df_for_observation["ob_coding_code"] == '["B54"]') & (df_for_observation["ob_valueCodeableConcept"] == '["negative"]') | (df_for_observation["ob_value"] == 'negative')]
mivd_df = df_for_observation[(df_for_observation["ob_coding_code"] == '["B54"]') & (df_for_observation["ob_valueCodeableConcept"] == '["invalid"]') | (df_for_observation["ob_value"] == 'invalid')]

# Tests done = positive + negative + invalid results.
count_result = mpt_df.shape[0] + mnt_df.shape[0] + mivd_df.shape[0]
print("malaria tests done: ", count_result)
print("malaria positive: ", mpt_df.shape[0])

childgrowth_df = df_for_observation[df_for_observation["ob_coding_code"] == '["IPRD.DE63"]']
print("child growth monitored: ", childgrowth_df.shape[0])



newborn_deaths :  0
malaria tests done:  52
malaria positive:  26
child growth monitored:  0


In [49]:
# One row per immunization (the merge cell repeats them per join match).
df_for_immunization = merged_with_questionnaireResponse_df.drop_duplicates(subset=['im_id'])

# Rows without a vaccine display value cannot match; drop them so the
# substring test only sees actual values.
df_for_immunization_filtered = df_for_immunization.dropna(subset=['im_coding_display'])

# Plain substring match (the pattern contains no regex metacharacters).
is_routine = df_for_immunization_filtered["im_coding_display"].str.contains(
    'routine-immunization', regex=False)
routineImmunization = df_for_immunization_filtered[is_routine]
print("routineImmunization:", routineImmunization.shape[0])

routineImmunization: 0


In [49]:
# One row per questionnaire response (the merge cell repeats each response
# once per matching observation / immunization).
df_for_questionnaireResponse = merged_with_questionnaireResponse_df.drop_duplicates(subset=['qr_id'])
qr_questionnaire = df_for_questionnaireResponse["questionnaire"]

# Postnatal care: distinct encounters with a response to each questionnaire.
postnatalMotherCare = df_for_questionnaireResponse[qr_questionnaire == "Questionnaire/post-natal-mother"].drop_duplicates(subset=['encounterId'])
print("Postnatal Mother Care: ", postnatalMotherCare.shape[0])

postnatalBabyCare = df_for_questionnaireResponse[qr_questionnaire == "Questionnaire/post-natal-baby"].drop_duplicates(subset=['encounterId'])
print("Postnatal Baby Care: ", postnatalBabyCare.shape[0])

# Antenatal care: distinct encounters per ANC-related questionnaire, summed
# over the questionnaires. An encounter with responses to several of them is
# counted once per questionnaire.
antenatal_questionnaires = (
    "Questionnaire/anc-visit",
    "Questionnaire/anc-visit-v1",
    "Questionnaire/vitals",
)
antenatalCare = sum(
    df_for_questionnaireResponse[qr_questionnaire == name].drop_duplicates(subset=['encounterId']).shape[0]
    for name in antenatal_questionnaires
)
print("Antenatal Care: ", antenatalCare)

# answer_1_1 is selected from `answer[0].value.coding.code` over the whole
# coding list, so it may arrive as a JSON array string ('["live-birth"]', the
# same shape the observation codes are compared against) rather than the bare
# code. Accept both forms; element-wise `==` is used so non-string values
# simply compare False.
answer_1_1 = df_for_questionnaireResponse["answer_1_1"]
is_live_birth = (answer_1_1 == 'live-birth') | (answer_1_1 == '["live-birth"]')

newbornRegistered = df_for_questionnaireResponse[
    (qr_questionnaire == "Questionnaire/delivery") & is_live_birth
]

print("Newborns Registered:", newbornRegistered.shape[0])


Postnatal Mother Care:  0
Postnatal Baby Care:  0
Antenatal Care:  58
Newborns Registered: 0
