# Cohort Building: Acute Coronary Syndrome Study

The goal of this notebook is to show how we can build a cohort by running [queries](http://build.fhir.org/ig/HL7/vulcan-rwd/acs.html) using data created and hosted by [InterSystems](https://www.intersystems.com).

We'll also show how we can extract an [IPS](https://hl7.org/fhir/uv/ips/)-like bundle of data for patients that meet the inclusion criteria.

See:
- Inclusion criteria based on [REWINDER](https://clinicaltrials.gov/ct2/show/NCT02190123)
- If you'd like to try other FHIR servers, the [public test servers wiki](https://confluence.hl7.org/display/FHIR/Public+Test+Servers) might help
- [rough notes we worked on during the connectathon](https://github.com/pete88b/vulcan_rwd_ig/blob/main/_getting_started.ipynb)

[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/pete88b/vulcan_rwd_ig/blob/main/_rwd_ig_cohort_building.ipynb)

# Utils

The first part of this notebook defines classes and functions that hopefully make the cohort building code easy to read. Feel free to [skip to the next section](#Cohort-building).

In [None]:
import requests, json, datetime, collections, typing
from IPython import display
from pathlib import Path

In [None]:
class DotPathDict(collections.UserDict):
    "Wraps a `dict` to allow simple dot notation search of nested `dict`s"
    
    def __getitem__(self, dot_paths):
        "Allows dot search via subscript"
        for dot_path in dot_paths.split(' OR '):
            data, found, path_parts = self.data, True, []
            for path_part in dot_path.split('.'):
                if not isinstance(data, (dict, DotPathDict)):
                    path_parts = '.'.join(path_parts)
                    raise Exception(f'Expected "{path_parts}" to be a `dict` but found {type(data)} {data}')
                path_parts.append(path_part)
                if path_part not in data:
                    found = False
                    break # try the next dot_path, if we have one
                data = data[path_part]
                if isinstance(data, list) and data:
                    data = data[0] # TODO: is it OK to just pull the 1st item from the list?
            if found:
                return DotPathDict(data) if isinstance(data, dict) else data
        return None # none of the dot paths matched

In [None]:
test_resource = DotPathDict({
    'resource': {
        'resourceType': 'TestResource', 
        'meta': {
            'versionId': '1', 
            'source': '#dswfkjei2k3'
        }}})

In [None]:
test_resource['resource.resourceType'], test_resource['resource.id']

('TestResource', None)

In [None]:
test_resource['resource.id OR resource.resourceType']

'TestResource'

In [None]:
test_resource_meta = test_resource['resource.meta']
test_resource_meta['source']

'#dswfkjei2k3'
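
`DotPathDict` only follows the first item of each list it meets (see the TODO in `__getitem__`), so lookups that pass through a list can miss values. If you need every match, a minimal sketch of a search that fans out over lists could look like this. `find_all` is a hypothetical helper, not part of this notebook, and it expects plain JSON `dict`s, so pass `some_dot_path_dict.data` rather than the wrapper itself.

```python
def find_all(data, dot_path):
    "Return every value reachable via `dot_path`, following all items of any list on the way"
    current = [data]
    for part in dot_path.split('.'):
        next_values = []
        for value in current:
            if isinstance(value, dict) and part in value:
                found = value[part]
                # fan out over lists instead of keeping only the first item
                if isinstance(found, list):
                    next_values.extend(found)
                else:
                    next_values.append(found)
        current = next_values
    return current

entry = {'resource': {'name': [{'given': ['Martha', 'M']}, {'given': ['Mum']}]}}
print(find_all(entry, 'resource.name.given'))  # ['Martha', 'M', 'Mum']
```

A missing path simply yields an empty list, which is easier to test for than `None` when a resource may legitimately have zero, one or many values.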

In [None]:
class FhirClient:
    "Helps to GET FHIR resources"
    def __init__(self, api_base:str, x_api_key:str=None):
        self.api_base = api_base
        self.request_headers = {}
        if x_api_key is not None:
            self.request_headers['x-api-key'] = x_api_key
        self.default_params = {}
    
    def get_as_response(self, resource_type:str, params:dict=None) -> requests.Response:
        "GET FHIR resources of `resource_type` and return the `requests.Response`"
        url = f'{self.api_base}/{resource_type}'
        params = self.default_params if params is None else params
        response = requests.get(url, params, headers=self.request_headers)
        print('GET', response.url, 'Status', response.status_code)
        return response
    
    def get_as_raw_json(self, resource_type:str, params:dict=None) -> dict: # TODO: rename to get and wrap results
        "GET FHIR resources of `resource_type` in JSON format"
        return self.get_as_response(resource_type, params).json()
    
    def get_next_as_raw_json(self, json_response:dict) -> dict:
        "GET the next page of results, or return `None` if there is no `next` link"
        for link in json_response.get('link', []):
            if link['relation'] == 'next':
                url = link['url']
                response = requests.get(url, headers=self.request_headers)
                print('GET', url, 'Status', response.status_code)
                return response.json()
        return None

    def get_all_entries(self, resource_type:str, params:dict=None, page_limit:int=100) -> typing.List[DotPathDict]:
        "Return a list of entries of `resource_type` (as `DotPathDict`s) while taking care of bundle paging"
        page_count, result = 0, []
        bundle = self.get_as_raw_json(resource_type, params)
        total = bundle.get('total', 'Unknown')
        if total == 0:
            print('Returning', len(result), 'entries')
            return result
        while bundle is not None:
            if bundle.get('resourceType', None) != 'Bundle':
                raise Exception(f'Expected a bundle but found {bundle}') # might be {'resourceType': 'OperationOutcome' ... 
            result.extend(bundle.get('entry', [])) # todo check for OperationOutcome etc in `entry`
            page_count += 1
            if page_count > page_limit:
                print('Stopping early. Will return', len(result), 'entries out of total', total)
                break
            bundle = self.get_next_as_raw_json(bundle)
        def _expected_resource_type(resource):
            actual_resource_type = resource.get('resource', {}).get('resourceType', None)
            if actual_resource_type != resource_type:
                print('Removing resource. Expected', resource_type, 'but found', actual_resource_type)
                return False
            return True
        result = [r for r in result if _expected_resource_type(r)]
        result = [DotPathDict(r) for r in result]
        print('Returning', len(result), 'entries')
        return result
    
    def get_all_resources(self, resource_type:str, params:dict=None, page_limit:int=100):
        "Return a list of resources of `resource_type` in JSON format"
        result = self.get_all_entries(resource_type, params, page_limit)
        result = [r['resource'] for r in result]
        return result
    
    def get_by_reference(self, reference:str):
        "Read a resource from the FHIR server by reference and return it as a list containing a single bundle entry"
        if reference.startswith(self.api_base):
            reference = reference[len(self.api_base):].strip('/')
        if reference.startswith('http'):
            print(f'WARNING: Found reference {reference} that does not start with {self.api_base}')
            return []
        resource_type, resource_id = reference.split('/')[:2] # ignore any trailing _history/<version>
        single_resource = self.get_as_raw_json(f'{resource_type}/{resource_id}') # a read, not a search
        return [dict(fullUrl = f'{self.api_base}/{resource_type}/{resource_id}', resource = single_resource)]
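
`get_all_entries` pages through a search-set `Bundle` by following the `link` whose `relation` is `next` until there isn't one. A minimal, network-free sketch of the same paging loop is below; `next_link` and `iter_pages` are hypothetical names, and `fetch` stands in for whatever does the HTTP GET (for example `lambda url: requests.get(url).json()`). Here `page_limit` is the maximum number of pages yielded; in `get_all_entries` the limit check runs after the page count is incremented, so `page_limit=2` lets three pages through.

```python
from typing import Callable, Iterator, Optional

def next_link(bundle: dict) -> Optional[str]:
    "Return the URL of the bundle's `next` link, or None if this is the last page"
    for link in bundle.get('link', []):
        if link.get('relation') == 'next':
            return link.get('url')
    return None

def iter_pages(first_page: dict, fetch: Callable[[str], dict], page_limit: int = 100) -> Iterator[dict]:
    "Yield search-set pages, following `next` links, stopping after `page_limit` pages"
    page, pages_seen = first_page, 0
    while page is not None:
        yield page
        pages_seen += 1
        if pages_seen >= page_limit:
            return  # stop before fetching a page we would not use
        url = next_link(page)
        page = fetch(url) if url else None

# Offline usage example: fake pages keyed by URL stand in for the server
pages = {
    'page-2': {'resourceType': 'Bundle', 'entry': [{'resource': {'id': 'b'}}],
               'link': [{'relation': 'next', 'url': 'page-3'}]},
    'page-3': {'resourceType': 'Bundle', 'entry': [{'resource': {'id': 'c'}}], 'link': []},
}
first = {'resourceType': 'Bundle', 'entry': [{'resource': {'id': 'a'}}],
         'link': [{'relation': 'self', 'url': 'page-1'}, {'relation': 'next', 'url': 'page-2'}]}
ids = [e['resource']['id'] for p in iter_pages(first, pages.__getitem__) for e in p.get('entry', [])]
print(ids)  # ['a', 'b', 'c']
```

Keeping the HTTP call outside the generator means the paging logic can be tested with fake pages like these.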

In [None]:
client = FhirClient('https://ips.health/fhir')
patient_resources = client.get_all_resources('Patient', page_limit=2)
patient_resource = patient_resources[0]    # grab a patient from the list
display.HTML(patient_resource['text.div']) # display its generated narrative

GET https://ips.health/fhir/Patient Status 200
GET https://ips.health/fhir?_getpages=14c3fb2a-591e-423f-9e9f-96846bf061a6&_getpagesoffset=20&_count=20&_pretty=true&_bundletype=searchset Status 200
GET https://ips.health/fhir?_getpages=14c3fb2a-591e-423f-9e9f-96846bf061a6&_getpagesoffset=40&_count=20&_pretty=true&_bundletype=searchset Status 200
Stopping early. Will return 60 entries out of total 189
Returning 60 entries


| Relationship | Name | Telecom | Address |
|---|---|---|---|
| mother (RoleCode#MTH) | Martha Mum | +33-555-20036 | Promenade des Anglais 111 Lyon 69001 FR |


In [None]:
def extract_patient_ids(resources):
    "Return a list of relative references to all patients found in `resources`"
    # Note: no checks are made that the bundle contains resources of the same type etc
    result = []
    for resource in resources:
        if resource['resourceType'] == 'OperationOutcome':
            continue # e.g. "Unrecognized parameter 'dischargeDisposition'. exp"
        if resource['resourceType'] == 'Patient':
            result.append('Patient/' + resource['id'])
        else:
            result.append(resource['subject']['reference'])
    return result

In [None]:
def intersection_patient_ids(*resource_lists):
    "Returns a list of references for all patients found in all resource lists"
    all_patient_ids = []
    for resource_list in resource_lists:
        all_patient_ids.append(extract_patient_ids(resource_list))
    all_patient_ids = [set(ids) for ids in all_patient_ids]
    result = all_patient_ids[0]
    for ids in all_patient_ids[1:]:
        result = result & ids
    return list(result)
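
The same intersection can be written with `set.intersection`, which accepts any number of iterables. This sketch works on lists of patient references (for example the output of `extract_patient_ids`) rather than on resources. It sorts the result so repeated runs list patients in the same order, and returns an empty list instead of raising `IndexError` when called with no lists. `common_patients` is a hypothetical name.

```python
def common_patients(*reference_lists):
    "Return the sorted references present in every list (empty if no lists are given)"
    if not reference_lists:
        return []
    return sorted(set(reference_lists[0]).intersection(*reference_lists[1:]))

print(common_patients(['Patient/1', 'Patient/2'], ['Patient/2', 'Patient/3']))  # ['Patient/2']
```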

In [None]:
def extract_resources_by_patient_id(resource_list, patient_reference):
    "Return a list of resources pulled from `resource_list` that belong to `patient_reference`"
    result = []
    for resource in resource_list:
        if resource['resourceType'] == 'OperationOutcome':
            continue # e.g. "Unrecognized parameter 'dischargeDisposition'. exp"
        if resource['resourceType'] == 'Patient':
            if resource['id'] == patient_reference.split('/')[1]:
                result.append(resource)
        else:
            if resource['subject']['reference'] == patient_reference:
                result.append(resource)
    return result
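
`get_encounter_date` and the medication checks further down call `extract_resources_by_patient_id`, which scans a whole resource list for every patient. If the lists grow, one option is to group resources by patient reference in a single pass. This sketch makes the same assumption as `extract_resources_by_patient_id` (non-Patient resources carry `subject.reference`); `index_by_patient` is a hypothetical helper.

```python
import collections

def index_by_patient(resources):
    "Map each patient reference to the resources that belong to it, in one pass"
    index = collections.defaultdict(list)
    for resource in resources:
        resource_type = resource.get('resourceType')
        if resource_type == 'OperationOutcome':
            continue  # skip server messages, as extract_resources_by_patient_id does
        if resource_type == 'Patient':
            key = f"Patient/{resource['id']}"
        else:
            key = (resource.get('subject') or {}).get('reference')
        if key is not None:
            index[key].append(resource)
    return dict(index)

resources = [
    {'resourceType': 'Patient', 'id': 'p1'},
    {'resourceType': 'Encounter', 'id': 'e1', 'subject': {'reference': 'Patient/p1'}},
    {'resourceType': 'Encounter', 'id': 'e2', 'subject': {'reference': 'Patient/p2'}},
    {'resourceType': 'OperationOutcome'},
]
index = index_by_patient(resources)
print({k: len(v) for k, v in index.items()})  # {'Patient/p1': 2, 'Patient/p2': 1}
```

A lookup is then `index.get(patient_reference, [])` instead of a fresh scan per patient.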

<a name="Cohort-building" id="Cohort-building"></a>
# Cohort building

The patients for this study would have the following criteria: 
- female or male aged 18 years or older
- have an Encounter record representing a hospitalization with an initial diagnosis of Acute Coronary Syndrome (ACS), where the patient was discharged alive some time between September 2020 and September 2021:
    - for this scenario, ACS is represented by one of these ICD-10 codes: I21 Acute myocardial infarction; I20-I25 Ischemic heart diseases; I24 Other acute ischemic heart diseases
    - the Encounter diagnosis will point to a Condition with one of those codes
    - the Encounter will have hospitalization information included
    - the Encounter hospitalization discharge disposition code is not ‘exp’ (expired)
- have been given one of ticagrelor, prasugrel or clopidogrel after the date of diagnosis of ACS (as represented by the Condition or Encounter record found above)

| Drug Name    | Brand Name  | RxNorm CUI            |
|--------------|-------------|-----------------------|
| ticagrelor   | brilinta    | 1116632               |
| prasugrel    | effient     | 613391                |
| clopidogrel  | plavix      | 32968, 687667, 153658 |
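
The `code` parameter in the `MedicationAdministration` query is a FHIR token search: each value is `system|code`, and comma-separated values are combined with OR. A small sketch that builds that string from the RxNorm codes in the table above; `ANTIPLATELET_CUIS` and `token_param` are hypothetical names.

```python
RXNORM = 'http://www.nlm.nih.gov/research/umls/rxnorm'

# RxNorm codes copied from the table above
ANTIPLATELET_CUIS = {
    'ticagrelor': ['1116632'],
    'prasugrel': ['613391'],
    'clopidogrel': ['32968', '687667', '153658'],
}

def token_param(system, codes):
    "Build a FHIR token search value: comma-separated `system|code` pairs (OR semantics)"
    return ','.join(f'{system}|{code}' for code in codes)

all_cuis = [cui for cuis in ANTIPLATELET_CUIS.values() for cui in cuis]
code_param = token_param(RXNORM, all_cuis)
```

Keeping the codes in one mapping makes it harder for the query and the table to drift apart.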

These criteria would be represented by the following queries:

```
/Patient?birthdate=le2002-09-01&gender=male,female

/Encounter?reason-code:below=I20,I21,I22,I23,I24,I25&date=ge2020-09-01&date=le2021-09-30&status=finished&dischargeDisposition:not=exp

/MedicationAdministration?status=completed&effective-time=ge[Encounter-Start-Date]&
  code=http://www.nlm.nih.gov/research/umls/rxnorm|1116632,http://www.nlm.nih.gov/research/umls/rxnorm|613391,http://www.nlm.nih.gov/research/umls/rxnorm|32968,http://www.nlm.nih.gov/research/umls/rxnorm|687667,http://www.nlm.nih.gov/research/umls/rxnorm|153658
```

Please note: we subtract 6 years from all dates in the inclusion criteria, just to match the data we have.
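
The shifted dates used in the queries further down can be derived rather than typed by hand. A minimal sketch, assuming the 6-year offset, 30 September as the last day of the window, and that the 18-years-or-older cutoff is measured from the start of the (shifted) window, as in the `birthdate=le2002-09-01` query; `shift_years` is a hypothetical helper.

```python
import datetime

def shift_years(value: datetime.date, years: int) -> datetime.date:
    "Move `value` by whole `years`, mapping 29 February to 28 February when needed"
    try:
        return value.replace(year=value.year + years)
    except ValueError:  # 29 February in a non-leap target year
        return value.replace(year=value.year + years, day=28)

YEARS_BACK = 6  # offset used in this notebook to match the data
window_start = shift_years(datetime.date(2020, 9, 1), -YEARS_BACK)
window_end = shift_years(datetime.date(2021, 9, 30), -YEARS_BACK)
# aged 18 or older: born on or before 18 years before the (shifted) window start
birthdate_cutoff = shift_years(window_start, -18)

params = {'birthdate': f'le{birthdate_cutoff}', 'date': [f'ge{window_start}', f'le{window_end}']}
print(params)  # {'birthdate': 'le1996-09-01', 'date': ['ge2014-09-01', 'le2015-09-30']}
```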

# Create a client to get data from a FHIR server

We worked with the following FHIR servers during the connectathon:
- RWD https://fhir.ggyxlz8lbozu.workload-prod-fhiraas.isccloud.io (preferred)
    - contains `Encounter`s with `dischargeDisposition`
    - has the `dischargeDisposition` search parameter enabled (note: the `not` modifier is not supported, so `dischargeDisposition:not=exp` won't work)
- PRD https://fhir.rykpjsvemdtg.workload-prod-fhiraas.isccloud.io

In [None]:
client = FhirClient('https://fhir.ggyxlz8lbozu.workload-prod-fhiraas.isccloud.io')

Feel free to change the FHIR server URL &uarr;

# Get patient, encounter and medication administration resources

In [None]:
patient_resources = client.get_all_resources('Patient', {
    'birthdate': 'le1996-09-01',
    'gender': 'male,female'
})

GET https://fhir.ggyxlz8lbozu.workload-prod-fhiraas.isccloud.io/Patient?birthdate=le1996-09-01&gender=male%2Cfemale Status 200
Returning 29 entries


In [None]:
encounter_resources = client.get_all_resources('Encounter', {
        'reason-code': 'I20,I21,I22,I23,I24,I25', # TODO: not using below for now 'reason-code:below': 'I20,I21,I22,I23,I24,I25',
        'date': ['ge2014-09-01', 'le2015-09-30'], # TODO: Not sure this is filtering as we want ...
        'status': 'finished',
#         'dischargeDisposition:not':'exp' # TODO: Do this client side for now
})

GET https://fhir.ggyxlz8lbozu.workload-prod-fhiraas.isccloud.io/Encounter?reason-code=I20%2CI21%2CI22%2CI23%2CI24%2CI25&date=ge2014-09-01&date=le2015-09-30&status=finished Status 200
Returning 63 entries
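
The `dischargeDisposition:not=exp` filter is commented out above because the server doesn't support the `not` modifier, and the TODO on `date` suggests the server-side date filter may not match the criteria either. A minimal sketch of doing both client side, assuming `Encounter.hospitalization.dischargeDisposition` carries the code `exp` for patients who died and that `period.end` holds the discharge date as a full `YYYY-MM-DD` (optionally followed by a time); `discharged_alive_in_window` is a hypothetical helper.

```python
import collections
import datetime

def _plain(resource):
    "Unwrap a `collections.UserDict` (such as `DotPathDict`) to the raw JSON `dict`"
    return resource.data if isinstance(resource, collections.UserDict) else resource

def discharged_alive_in_window(encounter, start: datetime.date, end: datetime.date) -> bool:
    "True if the encounter has hospitalization info, no 'exp' disposition and ended within [start, end]"
    encounter = _plain(encounter)
    hospitalization = encounter.get('hospitalization')
    if not hospitalization:
        return False  # criterion: the Encounter must include hospitalization information
    disposition = hospitalization.get('dischargeDisposition', {})
    codes = {coding.get('code') for coding in disposition.get('coding', [])}
    if 'exp' in codes:
        return False  # expired, i.e. not discharged alive
    period_end = encounter.get('period', {}).get('end')
    if not period_end:
        return False  # no discharge date to compare
    discharge_day = datetime.date.fromisoformat(period_end[:10])  # assumes a full YYYY-MM-DD date
    return start <= discharge_day <= end
```

In this notebook it might be applied as `[e for e in encounter_resources if discharged_alive_in_window(e, datetime.date(2014, 9, 1), datetime.date(2015, 9, 30))]`; we haven't run that against the server, so how many encounters it keeps is untested.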


In [None]:
medication_administration_resources = client.get_all_resources('MedicationAdministration', {
        'status': 'completed',
#         'effective-time': 'ge[Encounter-Start-Date]', # TODO: don't think this is possible via FHIR query
        'code': 'http://www.nlm.nih.gov/research/umls/rxnorm|1116632,'
                'http://www.nlm.nih.gov/research/umls/rxnorm|613391,'
                'http://www.nlm.nih.gov/research/umls/rxnorm|32968,'
                'http://www.nlm.nih.gov/research/umls/rxnorm|687667,'
                'http://www.nlm.nih.gov/research/umls/rxnorm|153658'})

GET https://fhir.ggyxlz8lbozu.workload-prod-fhiraas.isccloud.io/MedicationAdministration?status=completed&code=http%3A%2F%2Fwww.nlm.nih.gov%2Fresearch%2Fumls%2Frxnorm%7C1116632%2Chttp%3A%2F%2Fwww.nlm.nih.gov%2Fresearch%2Fumls%2Frxnorm%7C613391%2Chttp%3A%2F%2Fwww.nlm.nih.gov%2Fresearch%2Fumls%2Frxnorm%7C32968%2Chttp%3A%2F%2Fwww.nlm.nih.gov%2Fresearch%2Fumls%2Frxnorm%7C687667%2Chttp%3A%2F%2Fwww.nlm.nih.gov%2Fresearch%2Fumls%2Frxnorm%7C153658 Status 200
GET https://fhir.ggyxlz8lbozu.workload-prod-fhiraas.isccloud.io/MedicationAdministration?page=2&queryId=88c4d5e8-3d86-11ed-a787-02f861e3f62a Status 200
GET https://fhir.ggyxlz8lbozu.workload-prod-fhiraas.isccloud.io/MedicationAdministration?page=3&queryId=88c4d5e8-3d86-11ed-a787-02f861e3f62a Status 200
GET https://fhir.ggyxlz8lbozu.workload-prod-fhiraas.isccloud.io/MedicationAdministration?page=4&queryId=88c4d5e8-3d86-11ed-a787-02f861e3f62a Status 200
GET https://fhir.ggyxlz8lbozu.workload-prod-fhiraas.isccloud.io/MedicationAdministration?

## Find which patients have all 3 resources

In [None]:
patient_ids = intersection_patient_ids(patient_resources, encounter_resources, medication_administration_resources)
patient_ids

['Patient/1bbc2bc53ed277ac09507e6893743410',
 'Patient/00d7dde9ae58163184c3836f01deff61',
 'Patient/f7f2e775f7ae3f7a095146cb4deaa497',
 'Patient/6863ac983b0b55455da78f1fdd1288ff',
 'Patient/a3f4f0ffc5c2fb1a4708452a485d1442',
 'Patient/ff7c22942a1e16167f1b9c44f12aae05',
 'Patient/ceba63b6dcbda783668cf3efeea1d3dd',
 'Patient/51ff4d27ccf78c1d2ff6438175b541c7',
 'Patient/897dfe86f0b710793927d8034e568ee4',
 'Patient/e4c9f85f8b2b9a85b32f7d9a67ea1046']

In [None]:
def get_encounter_date(resource_list, patient_id):
    "Return the earliest encounter start date for `patient_id`"
    encounters = extract_resources_by_patient_id(resource_list, patient_id)
    dates = []
    for encounter in encounters:
        # keep only the date part, so dateTimes such as 2014-03-25T10:00:00Z parse too
        dates.append(datetime.date.fromisoformat(encounter['period.start'][:10]))
    return min(dates)

In [None]:
get_encounter_date(encounter_resources, 'Patient/ff7c22942a1e16167f1b9c44f12aae05')

datetime.date(2014, 3, 25)

# Apply "after the date of diagnosis of ACS" criteria to medication bundle

We use the earliest Encounter start date, calculated with `get_encounter_date`, as the date of the ACS diagnosis.

Quick EDA &darr; shows which patients have a medication administration on or after that date.
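
Two details matter when comparing dates here. FHIR `dateTime` values can carry a time and a trailing `Z`, which `datetime.fromisoformat` only accepts from Python 3.11 onwards. And the comparison should only use the patient's own administrations, for example those returned by `extract_resources_by_patient_id(medication_administration_resources, patient_id)`. A minimal sketch of the comparison itself; `fhir_date` and `any_on_or_after` are hypothetical names, and partial dates such as `2014-03` are treated as unknown.

```python
import datetime
from typing import Iterable, Optional

def fhir_date(value: Optional[str]) -> Optional[datetime.date]:
    "Return the calendar date of a FHIR date/dateTime string, or None for missing or partial values"
    if not value or len(value) < 10:
        return None  # missing, or a partial date such as '2014' or '2014-03'
    return datetime.date.fromisoformat(value[:10])  # drop any time and timezone part

def any_on_or_after(diagnosis_day: datetime.date, administration_times: Iterable[Optional[str]]) -> bool:
    "True if at least one administration happened on or after `diagnosis_day`"
    for value in administration_times:
        day = fhir_date(value)
        if day is not None and day >= diagnosis_day:
            return True
    return False

print(any_on_or_after(datetime.date(2014, 3, 25), ['2014-03-20', '2014-07-01T08:30:00Z']))  # True
```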

In [None]:
for patient_id in patient_ids:
    encounter_date = get_encounter_date(encounter_resources, patient_id)
    print(patient_id, encounter_date)
    # only look at this patient's own medication administrations
    for medication_administration in extract_resources_by_patient_id(medication_administration_resources, patient_id):
        edt = medication_administration['effectiveDateTime OR effectivePeriod.start']
        if edt is not None:
            edt = datetime.date.fromisoformat(edt[:10]) # Note: we're dropping the time part for this comparison
            if edt >= encounter_date:
                print('This patient would be included', edt, 'is on or after', encounter_date)
                break

Patient/1bbc2bc53ed277ac09507e6893743410 2013-01-22
This patient would be included 2013-07-11 is after 2013-01-22
Patient/00d7dde9ae58163184c3836f01deff61 2013-07-31
This patient would be included 2013-07-31 is after 2013-07-31
Patient/f7f2e775f7ae3f7a095146cb4deaa497 2012-07-22
This patient would be included 2012-07-22 is after 2012-07-22
Patient/6863ac983b0b55455da78f1fdd1288ff 2012-12-13
This patient would be included 2013-07-11 is after 2012-12-13
Patient/a3f4f0ffc5c2fb1a4708452a485d1442 2014-06-29
This patient would be included 2014-07-01 is after 2014-06-29
Patient/ff7c22942a1e16167f1b9c44f12aae05 2014-03-25
This patient would be included 2014-07-01 is after 2014-03-25
Patient/ceba63b6dcbda783668cf3efeea1d3dd 2013-12-23
This patient would be included 2014-01-07 is after 2013-12-23
Patient/51ff4d27ccf78c1d2ff6438175b541c7 2012-09-05
This patient would be included 2013-07-11 is after 2012-09-05
Patient/897dfe86f0b710793927d8034e568ee4 2013-12-26
This patient would be included 2014-

In [None]:
def has_med_after_diagnosis(patient_id):
    "Return `True` if `patient_id` has a medication administration on or after their earliest encounter date, else `False`"
    encounter_date = get_encounter_date(encounter_resources, patient_id)
    # only look at this patient's own medication administrations
    for medication_administration in extract_resources_by_patient_id(medication_administration_resources, patient_id):
        edt = medication_administration['effectiveDateTime OR effectivePeriod.start']
        if edt is not None:
            edt = datetime.date.fromisoformat(edt[:10]) # Note: we're dropping the time part for this comparison
            if edt >= encounter_date:
                return True
    return False

In [None]:
print('Starting with', len(patient_ids), 'patients ...')
patient_ids = [patient_id for patient_id in patient_ids if has_med_after_diagnosis(patient_id)]
print('...', len(patient_ids), 'patients have a medication record after the ACS diagnosis')

Starting with 10 patients ...
... 10 patients have a medication record after the ACS diagnosis


### So ... we have 10 patients that meet all inclusion criteria

# Create an IPS like per-patient bundle

This is just a quick example of how we could create per-patient bundles
- using code borrowed from https://github.com/pete88b/vulcan_medication_bundle
- adding some of the resources that [IPS](http://build.fhir.org/ig/HL7/fhir-ips/StructureDefinition-Composition-uv-ips.html) would include

In [None]:
from uuid import uuid4

def extract_references_from_resource(resource, field_name):
    "Return a list of references extracted from a single resource and field"
    # use the raw dict: DotPathDict subscripts only return the 1st item of a list field
    if isinstance(resource, collections.UserDict):
        resource = resource.data
    result = []
    if field_name in resource:
        references = resource[field_name]
        if not isinstance(references, list): references = [references]
        for reference in references:
            _reference = reference.get('reference')
            if _reference is None: 
                continue
            if _reference.startswith('#'): # skip references to contained resources
                continue
            # TODO: check that we have a relative reference or handle other kinds too
            result.append(_reference)
    return result

def extract_references(entries, field_names):
    "Return a list of relative references e.g. `['Condition/1ddef4ad-fb76-46d6-9f1d-8ed58b173ee8']`"
    result = []
    for entry in entries:
        resource = entry['resource']
        for f in field_names:
            result.extend(extract_references_from_resource(resource, f))
    return list(set(result)) # de-duplicate but still return a list

def timestamp_now():
    "Return the current UTC time as a FHIR instant, e.g. 2022-09-25T10:00:00Z"
    return datetime.datetime.now(datetime.timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')

def new_bundle(bundle_type='collection'):
    return dict(resourceType='Bundle', 
                id=str(uuid4()),
                type=bundle_type, 
                timestamp=timestamp_now(),
                entry=[])

def create_single_patient_bundle(patient_id):
    "Return a collection Bundle with one Patient plus their allergies, conditions, procedures and medication resources (for when the server can't give us an IPS)"
    result = new_bundle()
    references = []
    for resource_type, params in [
            ['Patient', dict(_id=patient_id)],
            ['AllergyIntolerance', dict(patient=patient_id)],
            ['Condition', dict(patient=patient_id)],
            ['Procedure', dict(patient=patient_id)],
            ['MedicationRequest', dict(subject=f'Patient/{patient_id}')],
            ['MedicationDispense', dict(subject=f'Patient/{patient_id}')],
            ['MedicationAdministration', dict(subject=f'Patient/{patient_id}')],
            ['MedicationStatement', dict(subject=f'Patient/{patient_id}')]]:
        single_resource_entries = client.get_all_entries(resource_type, params)
        result['entry'].extend(single_resource_entries)
        references.extend(extract_references(single_resource_entries, ['medicationReference', 'reasonReference']))
        
    for reference in set(references):
        try:
            result['entry'].extend(client.get_by_reference(reference))
        except Exception as ex:
            print(f'Failed to reference {reference} from {client.api_base}\n{ex}')
    return result
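
A resource can end up in this bundle twice: for example a `Condition` returned by the `Condition?patient=` search may be fetched again through a `reasonReference`. FHIR's Bundle rules expect each `fullUrl` to appear only once (history bundles aside), so a small de-duplication pass may help. A sketch keyed on `resourceType`/`id`; `dedupe_entries` is a hypothetical helper, and entries without an `id` are kept as-is.

```python
def dedupe_entries(entries):
    "Drop repeated bundle entries, keeping the first one seen for each resourceType/id"
    seen, result = set(), []
    for entry in entries:
        resource = entry.get('resource') or {}
        key = (resource.get('resourceType'), resource.get('id'))
        if key[1] is not None:
            if key in seen:
                continue  # already have this resource
            seen.add(key)
        result.append(entry)
    return result

entries = [
    {'fullUrl': 'x/Condition/1', 'resource': {'resourceType': 'Condition', 'id': '1'}},
    {'fullUrl': 'x/Condition/1', 'resource': {'resourceType': 'Condition', 'id': '1'}},
    {'fullUrl': 'x/Patient/1', 'resource': {'resourceType': 'Patient', 'id': '1'}},
]
print(len(dedupe_entries(entries)))  # 2
```

It could be applied just before returning, e.g. `result['entry'] = dedupe_entries(result['entry'])`.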

In [None]:
patient_id = patient_ids[8]
if patient_id.startswith('Patient/'):
    patient_id = patient_id[8:]
bundle = create_single_patient_bundle(patient_id)

GET https://fhir.ggyxlz8lbozu.workload-prod-fhiraas.isccloud.io/Patient?_id=897dfe86f0b710793927d8034e568ee4 Status 200
Returning 1 entries
GET https://fhir.ggyxlz8lbozu.workload-prod-fhiraas.isccloud.io/AllergyIntolerance?patient=897dfe86f0b710793927d8034e568ee4 Status 200
Returning 0 entries
GET https://fhir.ggyxlz8lbozu.workload-prod-fhiraas.isccloud.io/Condition?patient=897dfe86f0b710793927d8034e568ee4 Status 200
Returning 11 entries
GET https://fhir.ggyxlz8lbozu.workload-prod-fhiraas.isccloud.io/Procedure?patient=897dfe86f0b710793927d8034e568ee4 Status 200
Returning 0 entries
GET https://fhir.ggyxlz8lbozu.workload-prod-fhiraas.isccloud.io/MedicationRequest?subject=Patient%2F897dfe86f0b710793927d8034e568ee4 Status 200
Returning 1 entries
GET https://fhir.ggyxlz8lbozu.workload-prod-fhiraas.isccloud.io/MedicationDispense?subject=Patient%2F897dfe86f0b710793927d8034e568ee4 Status 200
Returning 0 entries
GET https://fhir.ggyxlz8lbozu.workload-prod-fhiraas.isccloud.io/MedicationAdministr

Write the patient medication bundle we just created to file &darr;

In [None]:
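output_path='data'
Path(output_path).mkdir(exist_ok=True)
f_name = f'{output_path}/patient_bundle_{patient_id}.json'

def to_plain_dict(obj):
    "JSON `default` hook: return the raw `dict` behind a `UserDict` such as `DotPathDict`"
    if isinstance(obj, collections.UserDict):
        return obj.data
    raise TypeError(f'Object of type {type(obj).__name__} is not JSON serializable')

with open(f_name, 'w') as f:
    json.dump(bundle, f, indent=2, default=to_plain_dict)
print('Bundle saved to', f_name)

Bundle saved to data/patient_bundle_897dfe86f0b710793927d8034e568ee4.json


Note: we pass a `default` function when writing JSON &uarr; because `UserDict` is not supported by [JSONEncoder](https://github.com/python/cpython/blob/main/Lib/json/encoder.py). It returns the wrapped `.data` rather than calling `dict(obj)`, because `dict(obj)` goes through `DotPathDict.__getitem__`, which keeps only the first item of every list.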
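
Why the choice of `default` matters: `dict(obj)` on a `UserDict` reads every key through `__getitem__`. A minimal, self-contained sketch using a hypothetical stand-in class `FirstItemDict` that, like `DotPathDict`, wraps nested dicts and returns only the first item of a list shows the difference.

```python
import collections
import json

class FirstItemDict(collections.UserDict):
    "Hypothetical stand-in for DotPathDict: wraps nested dicts and returns only the 1st item of a list"
    def __getitem__(self, key):
        value = self.data[key]
        if isinstance(value, list) and value:
            value = value[0]
        return FirstItemDict(value) if isinstance(value, dict) else value

entry = FirstItemDict({'resource': {'resourceType': 'Patient',
                                    'name': [{'family': 'Mum'}, {'family': 'Martha'}]}})

# dict(obj) calls __getitem__ for every key, so the `name` list collapses to its 1st item
via_dict = json.dumps(entry, default=dict)
# returning the wrapped .data hands the encoder the untouched JSON structure
via_data = json.dumps(entry, default=lambda obj: obj.data)

print(via_dict)  # {"resource": {"resourceType": "Patient", "name": {"family": "Mum"}}}
print(via_data)  # {"resource": {"resourceType": "Patient", "name": [{"family": "Mum"}, {"family": "Martha"}]}}
```

With `default=dict` the saved file would not be valid FHIR JSON wherever a list (such as `Patient.name`) has been collapsed to a single object.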

If you'd like to explore the bundle programmatically &darr; we can wrap it in a `DotPathDict` ...

In [None]:
bundle = DotPathDict(bundle)
bundle['entry.resource.extension']

{'extension': [{'url': 'detailed', 'valueCoding': {'system': 'urn:oid:2.16.840.1.113883.6.238', 'code': '2106-3', 'display': 'White'}}], 'url': 'https://hl7.org/fhir/us/core/StructureDefinition/us-core-race'}