# COVID-19 - unvaccinated high-risk patients

Analysis of FHIR source data using a [Pathling FHIR server](https://pathling.csiro.au/docs/server).

This query counts patients, grouped by whether they have received a COVID-19 vaccination and whether they are high-risk based on three factors: chronic kidney disease (CKD), heart disease, and a body mass index (BMI) above 30.

In [1]:
import json
from http.client import HTTPConnection
from time import time
from urllib.parse import urlencode

import pandas as pd

start = time()
pathling_conn = HTTPConnection('localhost', 8080)

## Load the data

First we load the NDJSON files into the Pathling server.

Provide each NDJSON file as a parameter to the [import operation](https://pathling.csiro.au/docs/server/operations/import), which will read each file and persist it as a Parquet table in the configured warehouse location.

In [4]:
import_start = time()
resources = ['Patient', 'Immunization', 'Condition', 'Observation']
parameters = {
    'resourceType': 'Parameters',
    'parameter': [
        {
            'name': 'source',
            'part': [
                {'name': 'resourceType', 'valueCode': resource},
                {'name': 'url',
                 'valueUrl': f'file:///home/twilson/github/aehrc/synthea/md/fhir/{resource}.ndjson'}
            ]
        } for resource in resources
    ]
}

pathling_conn.request('POST', '/fhir/$import', json.dumps(parameters),
                      headers={'Content-Type': 'application/fhir+json'})
response = pathling_conn.getresponse()
json_string = response.read()

parsed = json.loads(json_string)
parsed

{'resourceType': 'OperationOutcome',
 'issue': [{'severity': 'error',
   'code': 'processing',
   'diagnostics': 'Not allowed to import from URL: file:///home/twilson/github/aehrc/synthea/md/fhir/Patient.ndjson'}]}
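
A response like the one above is easy to miss, because `http.client` does not raise an exception when the server returns an error. The following is a minimal sketch of a guard that could be run on any parsed response; `outcome_errors` is a hypothetical helper name, not part of Pathling, and the example input is copied from the output shown above.

```python
def outcome_errors(resource):
    """Return diagnostics for error-level issues if `resource` is an OperationOutcome."""
    if resource.get('resourceType') != 'OperationOutcome':
        return []
    return [
        # Fall back to the issue code when no diagnostics text is present.
        issue.get('diagnostics', issue.get('code', 'unknown issue'))
        for issue in resource.get('issue', [])
        if issue.get('severity') in ('fatal', 'error')
    ]


# Example input copied from the response shown above.
failed = {
    'resourceType': 'OperationOutcome',
    'issue': [{
        'severity': 'error',
        'code': 'processing',
        'diagnostics': 'Not allowed to import from URL: '
                       'file:///home/twilson/github/aehrc/synthea/md/fhir/Patient.ndjson',
    }],
}
errors = outcome_errors(failed)
if errors:
    print('Import failed:', '; '.join(errors))
```

Calling the helper right after `json.loads` would surface a rejected request before later cells depend on its result.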

In [3]:
print(f"Import completed in: {time() - import_start:.3f} s")

Import completed in: 32.009 s


## Run the aggregate query

Next we compose a query to the [aggregate operation](https://pathling.csiro.au/docs/server/operations/aggregate) to count the number of patients by vaccination status and high-risk status.

We use a FHIR value set containing CVX codes to identify COVID-19 vaccinations.

A subsumption query is used to identify chronic kidney disease, an ECL expression is used to identify heart disease, and a LOINC-coded Observation is used to identify a BMI above 30.
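
The heart disease value set in the query is a SNOMED CT implicit value set URL, with the ECL expression percent-encoded after `ecl/`. As a small sketch of how that URL could be produced from the readable expression, the snippet below uses `urllib.parse.quote`. The variable names are illustrative, and the trailing space in `ecl` is kept only so that the result matches the string used in this notebook.

```python
from urllib.parse import quote

# ECL: descendants-or-self of 49601007 (Cardiovascular disease) whose
# 363698007 (Finding site) is a descendant-or-self of 80891009 (Structure of heart).
ecl = '<< 49601007 : << 363698007 = << 80891009 '

# safe='' makes quote() encode every reserved character, including ':' and '='.
heart_disease_vs = 'http://snomed.info/sct?fhir_vs=ecl/' + quote(ecl, safe='')
print(heart_disease_vs)
```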

In [4]:
aggregate_start = time()
params = [
    # AGGREGATIONS
    # Number of patients
    ('aggregation', 'count()'),
    # GROUPINGS
    # Vaccinated against COVID-19
    ('grouping',
     "reverseResolve(Immunization.patient).vaccineCode.memberOf('https://aehrc.csiro.au/fhir/ValueSet/covid-19-vaccines') contains true"),
    # High risk
    ('grouping',
     # Chronic kidney disease
     "reverseResolve(Condition.subject).code.subsumedBy(http://snomed.info/sct|709044004) contains true "
     "or "
     # Heart disease - << 49601007|Cardiovascular disease| : << 363698007|Finding site| = << 80891009|Structure of heart|
     "reverseResolve(Condition.subject).code.memberOf('http://snomed.info/sct?fhir_vs=ecl/%3C%3C%2049601007%20%3A%20%3C%3C%20363698007%20%3D%20%3C%3C%2080891009%20') contains true "
     "or "
     # BMI > 30
     "reverseResolve(Observation.subject).where(code.coding contains http://loinc.org|39156-5||'Body Mass Index').valueQuantity"
     ".where(system = 'http://unitsofmeasure.org').where(code = 'kg/m2').where(value > 30).empty().not()"),
    # FILTERS
    # Age 18-60
    ('filter', 'birthDate < @2004-07-30 and birthDate > @1962-07-30')
]
url = f'/fhir/Patient/$aggregate?{urlencode(params)}'

pathling_conn.request('GET', url)
response = pathling_conn.getresponse()
json_string = response.read()
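
The age filter in the query above hard-codes its two dates. A minimal sketch of deriving them from a reference date is shown below; `years_before` and `age_filter` are hypothetical helpers, and the reference date of 2022-07-30 is an assumption inferred from the hard-coded values (18 and 60 years before that date).

```python
from datetime import date


def years_before(day, years):
    """Return the date `years` years before `day` (29 Feb falls back to 28 Feb)."""
    try:
        return day.replace(year=day.year - years)
    except ValueError:
        # Only raised when `day` is 29 Feb and the target year is not a leap year.
        return day.replace(year=day.year - years, day=28)


def age_filter(reference, min_age=18, max_age=60):
    """Build a FHIRPath filter for patients older than min_age and younger than max_age."""
    youngest = years_before(reference, min_age)
    oldest = years_before(reference, max_age)
    return f'birthDate < @{youngest.isoformat()} and birthDate > @{oldest.isoformat()}'


print(age_filter(date(2022, 7, 30)))
```

Passing `date.today()` as the reference would keep the age range current each time the notebook is run.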

In [5]:
parsed = json.loads(json_string)
parsed

{'resourceType': 'Parameters',
 'parameter': [{'name': 'grouping',
   'part': [{'name': 'label', 'valueBoolean': True},
    {'name': 'label', 'valueBoolean': False},
    {'name': 'result', 'valueUnsignedInt': 266},
    {'name': 'drillDown',
     'valueString': "(reverseResolve(Immunization.patient).vaccineCode.memberOf('https://aehrc.csiro.au/fhir/ValueSet/covid-19-vaccines') contains true) and ((reverseResolve(Condition.subject).code.subsumedBy(http://snomed.info/sct|709044004) contains true or reverseResolve(Condition.subject).code.memberOf('http://snomed.info/sct?fhir_vs=ecl/%3C%3C%2049601007%20%3A%20%3C%3C%20363698007%20%3D%20%3C%3C%2080891009%20') contains true or reverseResolve(Observation.subject).where($this.code.coding contains http://loinc.org|39156-5||'Body Mass Index').valueQuantity.where($this.system = 'http://unitsofmeasure.org').where($this.code = 'kg/m2').where($this.value > 30).empty().not()) = false) and (birthDate < @2004-07-30 and birthDate > @1962-07-30)"}]},
  {'n

In [6]:
print(f"Aggregate completed in: {time() - aggregate_start:.3f} s")

Aggregate completed in: 20.256 s


Extract the values from the response and put them into a pandas DataFrame.

In [7]:
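def get_value(name, value, i):
    """Collect one value per grouping from the aggregate response.

    For each 'grouping' parameter in `parsed`, take the i-th part called
    `name` and return the value stored under the key `value`.
    """
    return [
        [
            part[value]
            for part
            in parameter['part']
            if part['name'] == name
        ][i]
        for parameter
        in parsed['parameter']
        if parameter['name'] == 'grouping'
    ]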

In [8]:
data = {
    'Vaccinated against COVID-19': get_value('label', 'valueBoolean', 0),
    'High risk': get_value('label', 'valueBoolean', 1),
    'Number of patients': get_value('result', 'valueUnsignedInt', 0)
}
df = pd.DataFrame(data=data)
df

Unnamed: 0,Vaccinated against COVID-19,High risk,Number of patients
0,True,False,266
1,True,True,215
2,False,False,81
3,False,True,85
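
The same reshaping can also be done one grouping at a time, which avoids reading the global `parsed` variable and indexing labels by position. The sketch below is one way to do it; `grouping_rows` is a hypothetical helper, the sample input reuses values from the outputs shown above, and the use of `.get` for labels assumes that a null label carries no `valueBoolean` element.

```python
def grouping_rows(parameters):
    """Turn each 'grouping' parameter into a (label, ..., result) tuple."""
    rows = []
    for parameter in parameters.get('parameter', []):
        if parameter.get('name') != 'grouping':
            continue
        parts = parameter['part']
        # Labels appear in the same order as the grouping expressions in the request.
        labels = [part.get('valueBoolean') for part in parts if part['name'] == 'label']
        result = next(part['valueUnsignedInt'] for part in parts if part['name'] == 'result')
        rows.append((*labels, result))
    return rows


# Sample input built from values in the outputs shown above.
sample = {
    'resourceType': 'Parameters',
    'parameter': [
        {'name': 'grouping',
         'part': [{'name': 'label', 'valueBoolean': True},
                  {'name': 'label', 'valueBoolean': False},
                  {'name': 'result', 'valueUnsignedInt': 266}]},
        {'name': 'grouping',
         'part': [{'name': 'label', 'valueBoolean': False},
                  {'name': 'label', 'valueBoolean': True},
                  {'name': 'result', 'valueUnsignedInt': 85}]},
    ],
}
rows = grouping_rows(sample)
print(rows)
```

The resulting list of tuples can be passed to `pd.DataFrame(rows, columns=[...])` with the three column names used above.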


## List the high-risk unvaccinated patients

Finally, we use the [extract operation](https://pathling.csiro.au/docs/server/operations/extract) to list out the patients that are high-risk and have not been vaccinated, along with the specific risk factors that were identified for each patient.

In [9]:
extract_start = time()
params = [
    # COLUMNS
    # Family name
    ('column', 'name.first().family'),
    # Given name
    ('column', 'name.first().given.first()'),
    # Phone number
    ('column', "telecom.where(system = 'phone').first().value"),
    # Chronic kidney disease
    ('column',
     'reverseResolve(Condition.subject).code.subsumedBy(http://snomed.info/sct|709044004) contains true'),
    # Heart disease
    ('column',
     "reverseResolve(Condition.subject).code.memberOf('http://snomed.info/sct?fhir_vs=ecl/%3C%3C%2049601007%20%3A%20%3C%3C%20363698007%20%3D%20%3C%3C%2080891009%20') contains true"),
    # BMI > 30
    ('column',
     "reverseResolve(Observation.subject).where(code.coding contains http://loinc.org|39156-5||'Body Mass Index').valueQuantity"
     ".where(system = 'http://unitsofmeasure.org').where(code = 'kg/m2').where(value > 30).empty().not()"),
    # FILTERS
    # Not vaccinated against COVID-19
    ('filter',
     "(reverseResolve(Immunization.patient).vaccineCode.memberOf('https://aehrc.csiro.au/fhir/ValueSet/covid-19-vaccines') contains true).not()"),
    # High risk
    ('filter',
     "reverseResolve(Condition.subject).code.subsumedBy(http://snomed.info/sct|709044004) contains true "
     "or "
     "reverseResolve(Condition.subject).code.memberOf('http://snomed.info/sct?fhir_vs=ecl/%3C%3C%2049601007%20%3A%20%3C%3C%20363698007%20%3D%20%3C%3C%2080891009%20') contains true "
     "or "
     "reverseResolve(Observation.subject).where(code.coding contains http://loinc.org|39156-5||'Body Mass Index').valueQuantity"
     ".where(system = 'http://unitsofmeasure.org').where(code = 'kg/m2').where(value > 30).empty().not()"),
    # Age 18-60
    ('filter', 'birthDate < @2004-07-30 and birthDate > @1962-07-30')
]
url = f'/fhir/Patient/$extract?{urlencode(params)}'

pathling_conn.request('GET', url)
response = pathling_conn.getresponse()
json_string = response.read()
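
The risk factor expressions appear several times in this notebook: in the aggregate grouping, in the extract columns, and in the extract filter. As a sketch of one way to keep them in sync, each expression can be defined once and the request parameters composed from it. The names `VACCINATED`, `RISK_FACTORS`, `HIGH_RISK`, `extract_params` and `extract_url` are illustrative; the expressions are copied from the cells above, and the name, phone and age parameters are omitted for brevity.

```python
from urllib.parse import urlencode

VACCINATED = ("reverseResolve(Immunization.patient).vaccineCode"
              ".memberOf('https://aehrc.csiro.au/fhir/ValueSet/covid-19-vaccines') contains true")

# One FHIRPath expression per risk factor, keyed by a display name.
RISK_FACTORS = {
    'Chronic kidney disease':
        "reverseResolve(Condition.subject).code"
        ".subsumedBy(http://snomed.info/sct|709044004) contains true",
    'Heart disease':
        "reverseResolve(Condition.subject).code.memberOf('http://snomed.info/sct?fhir_vs=ecl/"
        "%3C%3C%2049601007%20%3A%20%3C%3C%20363698007%20%3D%20%3C%3C%2080891009%20') contains true",
    'BMI > 30':
        "reverseResolve(Observation.subject)"
        ".where(code.coding contains http://loinc.org|39156-5||'Body Mass Index').valueQuantity"
        ".where(system = 'http://unitsofmeasure.org').where(code = 'kg/m2')"
        ".where(value > 30).empty().not()",
}

# A patient is high-risk if any single risk factor applies.
HIGH_RISK = ' or '.join(RISK_FACTORS.values())

extract_params = (
    [('column', expression) for expression in RISK_FACTORS.values()]
    + [('filter', f'({VACCINATED}).not()'), ('filter', HIGH_RISK)]
)
extract_url = f'/fhir/Patient/$extract?{urlencode(extract_params)}'
print(extract_url)
```

The keys of `RISK_FACTORS` could also serve as column names when the result is loaded, so that labels and expressions cannot drift apart.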

In [10]:
parsed = json.loads(json_string)
parsed

{'resourceType': 'Parameters',
 'parameter': [{'name': 'url',
   'valueUrl': 'http://localhost:8080/fhir/$result?id=xbUwZ0obgdvBKqCf'}]}

In [11]:
print(f"Extract completed in: {time() - extract_start:.3f} s")

Extract completed in: 38.177 s


Get the result URL out of the extract response, download it and use it to populate a pandas DataFrame.

Sort the DataFrame by patient name before display, so that our result is deterministic.

In [12]:
result_url = [
    parameter['valueUrl']
    for parameter
    in parsed['parameter']
    if parameter['name'] == 'url'
][0]
result_df = pd.read_csv(result_url, names=['Family name', 'Given name', 'Phone number',
                                           'Chronic kidney disease', 'Heart disease', 'BMI > 30'])
result_df = result_df.sort_values(['Family name', 'Given name'])
result_df

Unnamed: 0,Family name,Given name,Phone number,Chronic kidney disease,Heart disease,BMI > 30
38,Abernathy524,Kathline630,555-746-7353,True,False,True
29,Bartell116,Rhett759,555-257-6514,False,False,True
75,Bashirian201,Aldo414,555-300-9051,False,False,True
42,Beahan375,Neva514,555-809-1747,False,False,True
11,Bednar518,Chase54,555-812-1196,False,False,True
...,...,...,...,...,...,...
33,Wintheiser220,Georgene966,555-630-3157,False,False,True
7,Wunsch504,Domenic627,555-973-4643,False,True,True
13,Wyman904,Joey457,555-833-6044,False,False,True
76,Zulauf375,Elvin140,555-925-7111,False,True,False


In [13]:
print(f"Total execution time: {time() - start:.3f} s")


Total execution time: 90.490 s
