Script to generate and push MeasureReport resources

In [225]:
import json
import requests
from fhirpathpy import compile
from jsonpath_ng import parse
from copy import copy, deepcopy
from datetime import datetime
from dateutil.relativedelta import relativedelta
from dateutil import parser
import random
import uuid
from typing import List
from re import findall
import os

In [226]:
class MeasureReport:
    """A JSON text template whose ``"$name"`` tokens are filled in per report.

    The template is kept as a string. Placeholders are quoted tokens such as
    ``"$numBeds"``; when a report is rendered the whole token, quotes
    included, is replaced by ``str(value)``. Values that must stay JSON
    strings therefore have to be passed in already wrapped in double quotes.
    """

    def __init__(self, template: str):
        """Store the template text and collect its placeholder names.

        Args:
            template: JSON text containing ``"$name"`` placeholders.
        """
        self.template = template
        # Raw string: "\$" and "\d" are invalid escapes in a normal string
        # literal (SyntaxWarning on Python 3.12+, slated to become an error).
        # Matches quoted tokens like "$string123" and captures the name.
        self.fields = findall(r'"\$([A-Za-z\d]+?)"', self.template)

    def subject(self, subject, contained=None):
        """Set the template's ``subject`` and optionally add a contained resource.

        The template is parsed, modified and re-serialised with ``json.dumps``,
        so ``self.template`` is rewritten by this call.

        Args:
            subject: Value stored under the ``subject`` key.
            contained: Optional resource appended to the ``contained`` list
                (the list is created when the template has none).
        """
        template = json.loads(self.template)
        if contained is not None:
            if 'contained' in template:
                template['contained'].append(contained)
            else:
                template['contained'] = [contained]
        template['subject'] = subject
        self.template = json.dumps(template)

    def report(self, values: dict):
        """Return the template text with every placeholder substituted.

        Args:
            values: Mapping of placeholder name to replacement value. Names
                missing from the mapping are rendered as ``0``.

        Returns:
            The rendered template as a string; ``self.template`` is not modified.
        """
        temp = str(self.template)
        for field in self.fields:
            value = values.get(field, 0)
            # The surrounding quotes are part of the token that is replaced,
            # so numbers end up as bare JSON numbers.
            temp = temp.replace(f'"${field}"', str(value))
        return temp

class MeasureReportSubject:
    """A reporting subject (hospital) with fixed capacity and random daily figures.

    Capacity numbers (beds, ICU beds, ventilators) are fixed at construction,
    either supplied by the caller or drawn at random. Each call to
    :meth:`report` draws a new set of values and renders every attached
    report with them.
    """

    def __init__(self, subject=None, beds=None, icu_beds=None, ventilators=None,
                 reports: "List[MeasureReport] | None" = None):
        """Create a subject and attach it to every report in ``reports``.

        Args:
            subject: One of
                * ``None`` - a random UUID string is used as the identifier;
                * a ``str`` containing ``/`` - treated as a reference to an
                  existing resource; no contained resource is created;
                * any other ``str`` or an ``int`` - a contained ``Location``
                  named ``Hospital <subject>`` is generated;
                * a mapping with ``"id"`` and ``"name"`` keys - used as the
                  contained resource itself.
            beds: Total bed count; random in [50, 500] when omitted.
            icu_beds: ICU bed count; random, capped at 75% of ``beds``,
                when omitted.
            ventilators: Ventilator count; random, capped at 50% of ``beds``,
                when omitted.
            reports: Report objects exposing ``subject(reference, contained)``
                and ``report(values)``. Defaults to a new empty list.

        Raises:
            ValueError: If both ``beds`` and ``icu_beds`` are given and
                ``icu_beds`` exceeds ``beds``.
        """
        if icu_beds is not None and beds is not None and icu_beds > beds:
            raise ValueError("Total beds must be greater than or equal to beds of a specific type")

        # A mutable default argument would be shared by every instance that
        # relies on it, so the empty list is created per call instead.
        if reports is None:
            reports = []

        beds_min = 50
        beds_max = 500
        if beds is None:
            beds = random.randint(beds_min, beds_max)

        # For very small hospitals the nominal floor of 2 can exceed the cap;
        # lower the floor rather than letting randint raise on an empty range.
        icu_beds_min = 2
        icu_beds_max = round(beds * 0.75)
        if icu_beds is None:
            icu_beds = random.randint(min(icu_beds_min, icu_beds_max), icu_beds_max)

        ventilators_min = 2
        ventilators_max = round(beds * 0.5)
        if ventilators is None:
            ventilators = random.randint(min(ventilators_min, ventilators_max), ventilators_max)

        subject_name = ""
        subject_contained = None
        subject_reference = None

        if subject is None:
            subject = str(uuid.uuid4())

        if isinstance(subject, (str, int)):
            if isinstance(subject, str) and "/" in subject:
                # Reference to an existing resource: the display name is the
                # last path segment, ignoring any "/_history/..." suffix.
                subject_name = subject.split("/_history")[0].split("/")[-1]
                subject_reference = {
                    "reference": subject,
                    "display": f"Hospital {subject_name}"
                }
            else:
                # Bare identifier: synthesise a contained Location for it.
                subject_id = f"hospital{subject}"
                subject_name = f"Hospital {subject}"
                subject_contained = {
                    "resourceType": "Location",
                    "id": subject_id,
                    "name": subject_name,
                    "managingOrganization": {
                        "display": "WA State Department of Health"
                    }
                }
                subject_reference = {
                    "reference": f"#{subject_id}",
                    "display": subject_name
                }
        else:
            # Caller supplied the resource itself. It is embedded as a
            # contained resource, so it is referenced with a "#" fragment,
            # consistent with the generated-Location branch above.
            subject_contained = subject
            subject_name = subject["name"]
            subject_reference = {
                "reference": f"#{subject['id']}",
                "display": subject_name
            }

        for report in reports:
            report.subject(subject_reference, subject_contained)

        self.subject_name = subject_name
        self.beds = beds  # All beds
        self.icu_beds = icu_beds  # ICU subset
        self.ventilators = ventilators
        self.reports = reports
        self.values = {}

    def update(self, date, start, end):
        """Draw a fresh set of random values and store them in ``self.values``.

        ``date``, ``start`` and ``end`` are stored unchanged under the keys of
        the same name. Every other entry is derived from the subject's
        capacity using ``random.randint``; when a range turns out to be empty
        the documented fallback for that value is used instead.

        Args:
            date: Value stored as ``values['date']``.
            start: Value stored as ``values['start']``.
            end: Value stored as ``values['end']``.
        """
        values = {}

        values['date'] = date
        values['start'] = start
        values['end'] = end

        # Ventilators: free vs. in use.
        values['numVent'] = self._try_or_value(lambda: random.randint(0, self.ventilators), 0)
        values['numVentUse'] = self.ventilators - values['numVent']

        # Bed occupancy: at least one occupied bed per ventilator in use.
        values['numBedsOcc'] = self._try_or_value(lambda: random.randint(values['numVentUse'], self.beds), values['numVentUse'])
        values['numICUBedsOcc'] = self._try_or_value(lambda: random.randint(0, min(self.icu_beds, values['numBedsOcc'])), 0)
        values['numICUBedsAvail'] = self.icu_beds - values['numICUBedsOcc']
        values['numNonICUBedsOcc'] = values['numBedsOcc'] - values['numICUBedsOcc']

        # COVID-19 patient counts, each bounded by the figures drawn above.
        values['numC19MechVentPats'] = self._try_or_value(lambda: random.randint(0, values['numVentUse']), 0)
        values['numC19HospPats'] = self._try_or_value(lambda: random.randint(values['numC19MechVentPats'], values['numBedsOcc']), values['numC19MechVentPats'])
        values['numC19HOPats'] = self._try_or_value(lambda: random.randint(0, values['numC19HospPats']), 0)
        values['numC19OFMechVentPats'] = self._try_or_value(lambda: random.randint(0, min(values['numNonICUBedsOcc'], values['numVentUse'] - values['numC19MechVentPats'])), 0)
        values['numC19OverflowPats'] = self._try_or_value(lambda: random.randint(values['numC19OFMechVentPats'], values['numNonICUBedsOcc']), values['numC19OFMechVentPats'])
        values['numC19Died'] = random.randint(0, 20)
        values['numC19Pats'] = self._try_or_value(lambda: random.randint(values['numC19HospPats'], self.beds), self.beds)
        values['numC19VentPats'] = values['numC19MechVentPats'] + values['numC19OFMechVentPats']

        # Testing figures. With fewer than two beds the range (5, beds * 3) is
        # empty, so fall back to the lower bound instead of raising.
        values['totalOrdersIncrease'] = self._try_or_value(lambda: random.randint(5, self.beds * 3), 5)
        # Between 10% and 70% of the orders produce a result, never fewer
        # than one so the percentages below cannot divide by zero.
        values['totalTestResultsIncrease'] = max(1, round(values['totalOrdersIncrease'] * round(random.randint(10, 70)/100, 2)))
        values['positiveIncrease'] = self._try_or_value(lambda: random.randint(0, values['totalTestResultsIncrease']), 0)
        values['positiveIncreasePercent'] = round(values['positiveIncrease'] / values['totalTestResultsIncrease'], 2)

        # TODO accumulate over time
        values['totalOrders'] = values['totalOrdersIncrease']
        values['rejected'] = values['totalOrders'] - values['totalTestResultsIncrease']
        values['totalTestResults'] = values['totalTestResultsIncrease']
        values['positive'] = values['positiveIncrease']
        values['positivePercent'] = round(values['positive'] / values['totalTestResults'], 2)

        self.values = values

    def report(self, date: str, start: str, end: str):
        """Refresh the random values and render every attached report.

        Args:
            date: Passed through to :meth:`update`.
            start: Passed through to :meth:`update`.
            end: Passed through to :meth:`update`.

        Returns:
            A list with the result of ``report.report(self.values)`` for each
            attached report, in order.
        """
        self.update(date, start, end)
        return [report.report(self.values) for report in self.reports]

    def _try_or_value(self, lam, val):
        """Return ``lam()``, or ``val`` if the call raises ``ValueError``."""
        try:
            return lam()
        except ValueError:
            return val


In [227]:
def write_entries(entries, filename, ndjson=True):
    """Append ``entries`` to ``filename`` as JSON text, followed by a newline.

    Args:
        entries: Sequence of JSON-serialisable items.
        filename: Path of the file to append to (created if missing).
        ndjson: When true, each item is serialised on its own line; when
            false, the whole sequence is written as one compact JSON array.
    """
    with open(filename, 'a') as sink:
        if ndjson:
            # One json.dumps() result per line.
            payload = "\n".join(map(json.dumps, entries))
        else:
            # Single array without any whitespace between tokens.
            payload = json.dumps(entries, indent=None, separators=(',', ':'))
        sink.write(f"{payload}\n")

In [228]:
def generateMeasureReports(subjects:List[MeasureReportSubject], start:datetime, end:datetime, period:relativedelta, folder):
    """Render reports for every subject over consecutive periods and write them out.

    For each subject the range ``[start, end)`` is walked in steps of
    ``period``. Each step covers ``iter_start`` up to one second before the
    next step, and all rendered reports for that subject are appended to
    ``<folder>/<subject name with spaces replaced by underscores>.ndjson``.

    Args:
        subjects: Subjects to generate reports for.
        start: First period start (inclusive).
        end: Upper bound; periods starting at or after it are not generated.
        period: Length of one reporting period.
        folder: Existing directory that receives one file per subject.

    Raises:
        ValueError: If ``period`` does not move ``start`` forward, which would
            otherwise loop forever.
    """
    # Function-scope import: the file only imports `datetime` from the module.
    from datetime import timezone

    dt_format = "%Y-%m-%dT%H:%M:%S+00:00"

    def quoted_utc(moment):
        # dt_format hard-codes a +00:00 offset, so aware datetimes are
        # converted to UTC before formatting; naive ones are taken as UTC.
        if moment.utcoffset() is not None:
            moment = moment.astimezone(timezone.utc)
        # Values are substituted into the template verbatim, so the JSON
        # string quotes are added here.
        return f'"{moment.strftime(dt_format)}"'

    if start + period <= start:
        raise ValueError("period must be a positive duration")

    # Use the real UTC clock: datetime.now() is local time and would be
    # mislabelled by the fixed +00:00 suffix.
    generation_time = quoted_utc(datetime.now(timezone.utc))
    # Each period ends one second before the next one starts.
    end_period = period - relativedelta(seconds=1)
    for subject in subjects:
        # NOTE(review): subject.report() yields JSON text and write_entries()
        # runs json.dumps() on each entry again, so every NDJSON line is a
        # JSON-encoded string rather than an object - confirm that consumers
        # expect this.
        entries = []
        outfile = f"{folder}/{subject.subject_name.replace(' ', '_')}.ndjson"
        iter_start = start
        while iter_start < end:
            iter_end = iter_start + end_period
            entries.extend(subject.report(generation_time, quoted_utc(iter_start), quoted_utc(iter_end)))
            iter_start = iter_start + period
        write_entries(entries, outfile)

In [229]:
# Report templates to load; each file holds one JSON template with "$name"
# placeholders.
template_files = [
    "template_CDCPatientImpactAndHospitalCapacity.json",
    "template_FEMADailyHospitalCOVID19Reporting.json",
    "template_ICUBedsReporting.json",
    "template_ICUBedCurrentOccupancy.json",
    "template_ICUBedStaffedCapacity.json",
]

# Raw text of every template, in the same order as template_files.
template_json = []

for filename in template_files:
    # JSON is UTF-8 by specification; without an explicit encoding open()
    # falls back to the platform locale and can mis-decode non-ASCII text.
    with open(filename, 'r', encoding='utf-8') as file:
        template_json.append(file.read())

In [230]:
# One MeasureReport per loaded template.
reports = list(map(MeasureReport, template_json))

# Reporting window and step: daily periods from `start` up to (not including) `end`.
start = parser.parse("2021-01-01T00:00:00+00:00")
end = parser.parse("2022-01-01T00:00:00+00:00")
period = relativedelta(days=1)

# Subjects are identified by their index; each one gets its own deep copy of
# the reports because attaching a subject rewrites the report's template.
num_subjects = 100
subjects = [
    MeasureReportSubject(index, reports=deepcopy(reports))
    for index in range(num_subjects)
]

In [231]:
# Timestamped output directory so successive runs do not overwrite each other.
folder = f"output/{datetime.now().strftime('%Y%m%d_%H%M%S')}.MeasureReports"
# exist_ok=True replaces the separate isdir() check, which could race with
# another process creating the directory between the check and the call.
os.makedirs(folder, exist_ok=True)

# Summary of every subject's fixed capacity figures, written next to the reports.
subject_info_list = [
    {
        "name": subject.subject_name,
        "beds": subject.beds,
        "icu_beds": subject.icu_beds,
        "ventilators": subject.ventilators,
    }
    for subject in subjects
]
with open(f'{folder}/Subjects.json', 'w') as file:
    file.write(json.dumps(subject_info_list, indent=4))

generateMeasureReports(subjects, start, end, period, folder)