# Bundle Resources
- Access Postgres DB
- Grab individual patients and their related resources
- Check size of bundles (before sending off)

In [None]:
# Import libraries
import numpy as np
import pandas as pd
import json
import psycopg2
import requests
import matplotlib
import base64
from pathlib import Path
import os
from dotenv import load_dotenv

In [None]:
# Load environment variables from the .env file located one directory above
# the current working directory. load_dotenv is called exactly once: its
# return value is a bool, not a path, so it must not be fed back in.
load_dotenv(Path.cwd().parents[0] / '.env')

# Settings read from the environment (each is None when the variable is unset)
SQLUSER = os.getenv('SQLUSER')
SQLPASS = os.getenv('SQLPASS')
DBNAME_MIMIC = os.getenv('DBNAME_MIMIC')
HOST = os.getenv('DBHOST')
FHIR_SERVER = os.getenv('FHIR_SERVER')
MIMIC_JSON_PATH = os.getenv('MIMIC_JSON_PATH')
FHIR_BUNDLE_ERROR_PATH = os.getenv('FHIR_BUNDLE_ERROR_PATH')

# Connect to database
con = psycopg2.connect(dbname=DBNAME_MIMIC, user=SQLUSER, password=SQLPASS, host=HOST)

## Grab individual patient

In [None]:
# Sample five rows of the patient table into a DataFrame for exploration
q_patient = "SELECT * FROM mimic_fhir.patient LIMIT 5;"
patient1 = pd.read_sql_query(sql=q_patient, con=con)

In [None]:
# Display the `fhir` column value of the sampled patient row labelled 3
patient1.fhir[3]

## Grab patient plus encounter/condition

In [None]:
# Fetch every encounter row for the sampled patient at index label 1.
# The patient id is passed as a bound parameter (psycopg2 pyformat style) so
# the driver does the quoting instead of interpolating it into the SQL text.
q_encounter = """SELECT *
                FROM mimic_fhir.encounter
                WHERE patient_id = %(patient_id)s;"""
encounters = pd.read_sql_query(
    q_encounter,
    con,
    params={'patient_id': str(patient1.id[1])},
)

In [None]:
# Display the encounter rows returned by the query above
encounters

In [None]:
# Fetch every condition row for the sampled patient at index label 1.
# The patient id is passed as a bound parameter (psycopg2 pyformat style) so
# the driver does the quoting instead of interpolating it into the SQL text.
q_condition = """SELECT *
                FROM mimic_fhir.condition
                WHERE patient_id = %(patient_id)s;"""
conditions = pd.read_sql_query(
    q_condition,
    con,
    params={'patient_id': str(patient1.id[1])},
)

In [None]:
# Display the condition rows returned by the query above
conditions

## Bundle basic resources for patient
- patient
- encounter
- condition

In [None]:
class FhirBundle:
    """Minimal FHIR transaction Bundle: collects entries and posts them to a server.

    The instance attributes are named after the Bundle JSON keys because
    ``to_json`` returns the instance dict directly as the payload.
    """

    def __init__(self, id):
        self.resourceType = 'Bundle'
        self.type = 'transaction'
        self.id = id
        self.entry = []

    def add_entry(self, resource, request):
        """Append ``resource`` to the bundle together with its request element.

        Args:
            resource: dict holding at least ``'resourceType'``; ``'id'`` is
                required for PUT and optional for POST.
            request: HTTP method for the entry, ``'POST'`` or ``'PUT'``.

        Raises:
            ValueError: if ``request`` is neither ``'POST'`` nor ``'PUT'``.
            KeyError: if a key required for the chosen method is missing.
        """
        if request == 'POST':
            url = resource['resourceType']
        elif request == 'PUT':
            url = resource['resourceType'] + '/' + resource['id']
        else:
            raise ValueError(f'Request {request}, is not currently supported')

        new_entry = {
            'resource': resource,
            'request': {'method': request, 'url': url},
        }
        # A POSTed resource may not carry an id yet; only emit fullUrl when
        # there is an id to put in it instead of failing with KeyError.
        if 'id' in resource:
            new_entry['fullUrl'] = resource['id']
        self.entry.append(new_entry)

    def to_json(self):
        """Return the bundle as a JSON-serialisable dict (the live instance dict)."""
        return self.__dict__

    def get_size(self):
        """Return the number of entries currently in the bundle."""
        return len(self.entry)

    def request(self, url=None):
        """POST the bundle and return the response body parsed from JSON.

        Args:
            url: endpoint to post to; defaults to the module-level
                ``FHIR_SERVER`` when omitted.
        """
        target = FHIR_SERVER if url is None else url
        resp = requests.post(
            target,
            json=self.to_json(),
            headers={"Content-Type": "application/fhir+json"},
        )
        return json.loads(resp.text)

In [None]:
# Display bundle entry 40. `b_pat` is assigned inside the patient-bundling
# loop cell further down, so that cell has to be run before this one.
b_pat.to_json()['entry'][40]

In [None]:
# Display bundle entry 41. `b_pat` is assigned inside the patient-bundling
# loop cell further down, so that cell has to be run before this one.
b_pat.to_json()['entry'][41]

In [None]:
%%time
# Build one transaction bundle per patient (the patient plus its rows from
# each table in table_names) and send it to the FHIR server.
#table_names = ['encounter', 'condition', 'procedure'] #, 'observation_labs'
table_names = ['observation_labs']

q_patient = "SELECT * FROM mimic_fhir.patient;"
patients = pd.read_sql_query(q_patient, con)
aborted = False
for index, pat in patients.iterrows():
    b_pat = FhirBundle('patient-bundle')
    b_pat.add_entry(pat.fhir, 'PUT')
    for table in table_names:
        # The table name comes from the hard-coded list above; the patient id
        # is passed as a bound parameter rather than pasted into the SQL.
        q_table = f"""SELECT *
                    FROM mimic_fhir.{table}
                    WHERE patient_id = %(patient_id)s;"""
        resources = pd.read_sql_query(q_table, con, params={'patient_id': str(pat.id)})
        for _, resource in resources.iterrows():
            b_pat.add_entry(resource.fhir, 'PUT')
    # send patient bundle!
    # check size for now
    #print(f'Patient {pat.id} size is: {b_pat.get_size()}')
    if index % 50 == 0:
        print(f'Patient {index}')
    if b_pat.get_size() < 100:
        output = b_pat.request()
        if output['resourceType'] == 'OperationOutcome':
            # Stop at the first bundle the server rejects
            print(output['issue'])
            aborted = True
            break
    else:
        # Bundles with 100 or more entries are not sent; only their size is reported
        print(f'Patient {pat.id} size is: {b_pat.get_size()}')

print('-----------------------------------')
# Do not claim completion when the loop stopped early on a server error
print('ABORTED' if aborted else 'COMPLETE')

# Testing 123
            
# send full microbio bundle        
#output = b_micro.request()



## Validate individual resource

In [None]:
%%time
# PUT a few rows of one table individually and report any OperationOutcome
table = 'observation_micro_org'
q_table = f"""SELECT *
            FROM mimic_fhir.{table}
            LIMIT 3; """
resources = pd.read_sql_query(q_table, con)
# The other cells concatenate FHIR_SERVER directly with the resource type,
# i.e. they expect it to end in '/'. Strip it here so the join below yields
# exactly one slash either way.
base_url = FHIR_SERVER.rstrip('/')
for _, resource in resources.iterrows():
    url = f"{base_url}/{resource.fhir['resourceType']}/{resource.fhir['id']}"
    resp = requests.put(url, json=resource.fhir, headers={"Content-Type": "application/fhir+json"})
    output = json.loads(resp.text)
    # Surface server-side problems instead of silently discarding the response
    if output.get('resourceType') == 'OperationOutcome':
        print(output)
  
    

## Expunge database before use

In [None]:
# Load the full patient table into a DataFrame
q_patient = "SELECT * FROM mimic_fhir.patient;"
patients = pd.read_sql_query(sql=q_patient, con=con)

In [None]:
# Upload every patient with a PUT to <FHIR_SERVER>Patient/<id>
resource = 'Patient'

for patient in patients.fhir:
    url = f"{FHIR_SERVER}{resource}/{patient['id']}"
    resp = requests.put(
        url=url,
        json=patient,
        headers={"Content-Type": "application/fhir+json"},
    )
    fhir_json = json.loads(resp.text)
    if fhir_json['resourceType'] != 'OperationOutcome':
        continue
    # The server answered with an OperationOutcome: show it
    print(fhir_json)

In [None]:
# Read each patient back from the server and print any OperationOutcome
for patient in patients.fhir:
    url = f"{FHIR_SERVER}{resource}/{patient['id']}"
    # Swap in the line below to probe a fixed id instead:
    #url = f"{FHIR_SERVER}{resource}/123456789"
    resp = requests.get(url=url, headers={"Content-Type": "application/fhir+json"})
    fhir_json = json.loads(resp.text)
    if fhir_json['resourceType'] != 'OperationOutcome':
        continue
    print(fhir_json)

In [None]:
# Fetch a single patient; `patient` and `resource` are whatever earlier cells
# left bound (presumably the last item of the preceding loop).
url = f"{FHIR_SERVER}{resource}/{patient['id']}"
# Swap in the line below to probe a fixed id instead:
#url = f"{FHIR_SERVER}{resource}/123456789"
resp = requests.get(url=url, headers={"Content-Type": "application/fhir+json"})
fhir_json = json.loads(resp.text)
if 'OperationOutcome' == fhir_json['resourceType']:
    print(fhir_json)

In [None]:
# Parse the last response once, print the first 100 characters of the first
# issue's diagnostics, and display how many issues were returned. Without the
# explicit print the first value would be computed and then discarded.
issues = json.loads(resp.text)['issue']
print(issues[0]['diagnostics'][0:100])
len(issues)

In [None]:

# Delete one specific patient, asking the server to cascade the delete
url = f"{FHIR_SERVER}Patient/87dd177c-b3f5-584e-bf76-86e2ee047c1f?_cascade=delete"
resp = requests.delete(url=url, headers={"Content-Type": "application/fhir+json"})
print(resp.text)

In [None]:
# delete resource with cascading deletes
# NOTE(review): the ids come from the patient rows but are deleted under
# Encounter/<id> — confirm this is the intended resource type.
resource = 'Encounter'

# First 40 characters of one diagnostics message per delete request
resp_list = []
for patient in patients.fhir:
    url = f"{FHIR_SERVER}{resource}/{patient['id']}?_cascade=delete"
    resp = requests.delete(url, headers={"Content-Type": "application/fhir+json"})
    # Parse the body once instead of re-parsing it for every access
    issues = json.loads(resp.text)['issue']
    # With a single issue take it; otherwise take the second one
    issue = issues[0] if len(issues) == 1 else issues[1]
    resp_list.append(issue['diagnostics'][0:40])

In [None]:
# Build the Parameters resource for $expunge with expungeEverything=true.
# The repeating element of a FHIR Parameters resource is named "parameter";
# the previous "paramater" key is not an element of that resource.
par1 = {'name': 'expungeEverything', 'valueBoolean': True}
parameters = [par1]
expunge_resource = {
    'resourceType': 'Parameters',
    'parameter': parameters,
}
expunge_resource


In [None]:
# Build the Parameters resource for $expunge with expungeDeletedResources=true.
# The repeating element of a FHIR Parameters resource is named "parameter";
# the previous "paramater" key is not an element of that resource.
par1 = {'name': 'expungeDeletedResources', 'valueBoolean': True}
parameters = [par1]
expunge_resource = {
    'resourceType': 'Parameters',
    'parameter': parameters,
}
expunge_resource

In [None]:
# Display the raw body of the most recent response stored in `resp`
resp.content

In [None]:
# then expunge it
# Build the $expunge endpoint from the configured server instead of
# overriding it with a hard-coded localhost URL.
url_exp = f"{FHIR_SERVER}$expunge"

resp = requests.post(url_exp, json=expunge_resource, headers={"Content-Type": "application/fhir+json"})
print(resp.text)

In [None]:
# Display the HTTP status code of the most recent response stored in `resp`
resp.status_code

## Bulk Export of bundled data

In [None]:
# Export single resource to json
# Step 1: upload one patient so there is something on the server
q_patient = "SELECT * FROM mimic_fhir.patient LIMIT 1;"
patients = pd.read_sql_query(sql=q_patient, con=con)
resource = 'Patient'

for patient in patients.fhir:
    url = f"{FHIR_SERVER}{resource}/{patient['id']}"
    resp = requests.put(
        url=url,
        json=patient,
        headers={"Content-Type": "application/fhir+json"},
    )
    fhir_json = json.loads(resp.text)
    if fhir_json['resourceType'] != 'OperationOutcome':
        continue
    print(fhir_json)

# Step 2: read it back and print any OperationOutcome
for patient in patients.fhir:
    url = f"{FHIR_SERVER}{resource}/{patient['id']}"
    # Swap in the line below to probe a fixed id instead:
    #url = f"{FHIR_SERVER}{resource}/123456789"
    resp = requests.get(url=url, headers={"Content-Type": "application/fhir+json"})
    fhir_json = json.loads(resp.text)
    if fhir_json['resourceType'] != 'OperationOutcome':
        continue
    print(fhir_json)

In [None]:
# Kick off the $export request, sending the "Prefer: respond-async" header
# Alternative filter on the patient profile:
#url = f"{FHIR_SERVER}$export?_typeFilter=Patient?_meta:profile=http://fhir.mimic.mit.edu/StructureDefinition/mimic-patient"
url = f"{FHIR_SERVER}$export?_type=Observation&_typeFilter=Observation?_profile=http://fhir.mimic.mit.edu/StructureDefinition/mimic-observation-lab"
resp_export = requests.get(
    url=url,
    headers={"Content-Type": "application/fhir+json", "Prefer": "respond-async"},
)
print(resp_export)

In [None]:
# Display the URL most recently assigned to `url`
url

In [None]:
# Display the response headers of the export request
resp_export.headers

In [None]:
# Call GET at the polling location to get json.
# Only request the content location if the export request was accepted (202);
# otherwise there is no URL to poll and requests.get('') would just raise.
if resp_export.status_code == 202:
    url_content_location = resp_export.headers['Content-Location']
    resp_get_data = requests.get(url_content_location, headers={"Content-Type": "application/fhir+json"})
    print(resp_get_data.text)
else:
    url_content_location = ''
    print('response bad...')

In [None]:
# Download the first file listed in the export status response
url_download = json.loads(resp_get_data.text)['output'][0]['url']
resp_download = requests.get(url_download, headers={"Content-Type": "application/fhir+json"})

# The downloaded JSON carries the file body base64-encoded under 'data'
output_data = base64.b64decode(json.loads(resp_download.content)['data']).decode()
output_file = f'{MIMIC_JSON_PATH}output_from_hapi/lab_test.ndjson'
# Write with an explicit encoding so the result does not depend on the
# platform default; the file is only written, so plain 'w' is enough.
with open(output_file, 'w', encoding='utf-8') as out_file:
    out_file.write(output_data)

#print(resp.text)

## Optimal Bundle Size
Calculate the optimal bundle size by testing different bundle sizes to send to server

In [None]:
from py_mimic_fhir.bundle import Bundle
import time
import gc

In [None]:
 q_resource = f"""
    SELECT fhir FROM mimic_fhir.observation_chartevents LIMIT 1000
"""
pd_resources = pd.read_sql_query(q_resource, con)
resources = pd_resources.fhir.to_list()

In [None]:
# Time how long it takes to send all resources for bundle sizes 10..50.
# perf_counter is used because it is monotonic and meant for measuring
# intervals; the reported resource count is taken from the data itself.
for bundle_size in range(10, 60, 10):
    start_time = time.perf_counter()
    bundle = Bundle()
    bundle.add_entry(resources)
    # NOTE(review): the last argument is presumably the bundle size used when
    # sending — confirm against py_mimic_fhir.bundle.Bundle.request
    resp = bundle.request(FHIR_SERVER, True, FHIR_BUNDLE_ERROR_PATH, bundle_size)

    delta_time = time.perf_counter() - start_time
    print(f'Bundle size {bundle_size} completed {len(resources)} resources in {delta_time}s')

In [None]:
# Time how long it takes to send all resources for bundle sizes 10..40.
# perf_counter is used because it is monotonic and meant for measuring
# intervals; the reported resource count is taken from the data itself.
for bundle_size in range(10, 50, 10):
    start_time = time.perf_counter()
    bundle = Bundle()
    bundle.add_entry(resources)
    # NOTE(review): the last argument is presumably the bundle size used when
    # sending — confirm against py_mimic_fhir.bundle.Bundle.request
    resp = bundle.request(FHIR_SERVER, True, FHIR_BUNDLE_ERROR_PATH, bundle_size)

    delta_time = time.perf_counter() - start_time
    print(f'Bundle size {bundle_size} completed {len(resources)} resources in {delta_time}s')