In [1]:
!pip install fhir.resources

In [2]:
import base64
import csv
import json
import os
import pprint
import random
import uuid
from datetime import date
from functools import reduce

from fhir.resources.bundle import Bundle
from fhir.resources.devicedefinition import DeviceDefinition
from github import Github
from IPython.core.debugger import set_trace

In [3]:
# Read the GitHub token from the GITHUB_ACCESS_TOKEN environment variable so a
# real credential never has to be saved in the notebook. The original
# placeholder is kept as the fallback when the variable is not set.
g = Github(os.environ.get('GITHUB_ACCESS_TOKEN', '<github_access_token>'))
rdt_scan_repo = g.get_repo('sid-indonesia/rdt-scan')
# Repository directory that add_image() prepends to a reference image name.
img_path_prefix = 'app/src/main/res/drawable-nodpi/'

# generate reproducible UUIDs: build_device_definitions() re-seeds `rd` with
# `seed` on every call, so the identifier sequence repeats between runs
rd = random.Random()
seed = "Wznza9%R$xs8eigsteuKg5pewMnAuFK8fWidZ2D9r8Fstb!Fpa%ovZs3o3yQQ%Jp4Lv2dS#eAbaH^*A@kxCTCSes*XzZfKXx6KRY9rgV!t^8#woP@GrpMHVnXJb3gQG*"

# NOTE(review): not referenced anywhere in the visible cells - confirm it is still needed.
empty_flag = "ZZZ"

# Maps each code found in the CSV 'Lines' column to its description text.
line_code_to_text = {
    "C": "Control",
    "T": "Test",
    "G": "IgG",
    "M": "IgM",
    "G+M": "IgG/IgM"
}

In [4]:

# generate reproducible UUIDs
# rd = random.Random()

# uuid.uuid4()

In [5]:
def is_valid(row):
    """Tell whether a CSV row holds enough data to build a device definition.

    A row is valid when 'Active' is exactly "TRUE", the 'Manufacturer',
    'Model' and 'Lines' columns are all non-empty, and at least one of
    'Sample Drops' / 'Sample Quantity (uL)' is non-empty.

    Args:
        row: mapping of column name to cell value (one csv.DictReader row).

    Returns:
        bool: True when the row is valid, False otherwise.
    """
    required_columns = ('Manufacturer', 'Model', 'Lines')
    return (
        row['Active'] == "TRUE"
        and all(row[column] for column in required_columns)
        # bool() so the predicate yields True/False rather than a cell value
        and bool(row['Sample Drops'] or row['Sample Quantity (uL)'])
    )
        
def add_image(config):
    """Replace the image name in config['REF_IMG'] with base64-encoded image data.

    The image is fetched from ``rdt_scan_repo`` at
    ``img_path_prefix + name + '.jpg'``; if that request fails, the same name
    with a ``.JPG`` extension is tried.

    Args:
        config: dict holding an RDTScan configuration; it is modified in place.

    Returns:
        The same ``config`` object. When 'REF_IMG' is missing or empty the dict
        is returned unchanged, so callers can always keep using the result.

    Raises:
        Whatever the ``.JPG`` request raises when both lookups fail.
    """
    path = config.get('REF_IMG')
    if not path:
        # Nothing to embed; hand the config back instead of None.
        return config
    try:
        content = rdt_scan_repo.get_contents('{}{}.jpg'.format(img_path_prefix, path)).decoded_content
    except Exception:
        # Fall back to the upper-case extension. Catching Exception rather than
        # everything lets KeyboardInterrupt / SystemExit propagate.
        content = rdt_scan_repo.get_contents('{}{}.JPG'.format(img_path_prefix, path)).decoded_content
    config['REF_IMG'] = base64.b64encode(content).decode("utf-8")
    return config

# Numeric properties, in output order:
# (property text, ((CSV column, converter, comparator or None, unit), ...)).
_QUANTITY_PROPERTY_SPECS = (
    ("Time For Result", (
        ('Min Result Time', int, ">=", "minutes"),
        ('Max Result Time', int, "<=", "minutes"),
    )),
    ("Temperature", (
        ('Low Temp', float, ">=", "degrees celsius"),
        ('High Temp', float, "<=", "degrees celsius"),
    )),
    ("Sample Quantity", (
        ('Sample Drops', int, None, "drop"),
        ('Sample Quantity (uL)', int, None, "microliter"),
    )),
    ("Buffer Quantity", (
        ('Buffer Drops', int, ">=", "drop"),
        ('Max Buffer Drops', int, "<=", "drop"),
        ('Buffer Quantity (uL)', int, None, "microliter"),
    )),
)


def _text_capability(type_text, raw_value):
    """Return a capability with one stripped text description per '/'-separated part."""
    return {
        "type": {
            "text": type_text
        },
        "description": [{"text": part.strip()} for part in raw_value.split('/')]
    }


def _lines_capability(raw_lines):
    """Return the "lines" capability; raises KeyError for a code missing from line_code_to_text."""
    descriptions = []
    for code in raw_lines.split('/'):
        # Strip like the other '/'-separated columns so "C / T" resolves too.
        code = code.strip()
        descriptions.append({
            "coding": [{"code": code}],
            "text": line_code_to_text[code]
        })
    return {
        "type": {
            "text": "lines"
        },
        "description": descriptions
    }


def _quantity_values(row, column_specs):
    """Return one valueQuantity entry for every non-empty column listed in column_specs."""
    quantities = []
    for column, convert, comparator, unit in column_specs:
        if not row[column]:
            continue
        quantity = {"value": convert(row[column])}
        if comparator:
            quantity["comparator"] = comparator
        quantity["unit"] = unit
        quantities.append(quantity)
    return quantities


def _rdtscan_property(raw_config):
    """Return the "RDTScan Configuration" property built from a JSON object string."""
    config = json.loads(raw_config)
    # add_image may return None when there is no reference image to embed;
    # keep the parsed configuration in that case instead of failing on None.
    config = add_image(config) or config
    return {
        "type": {
            "text": "RDTScan Configuration"
        },
        "valueCode": [
            {
                "coding": [{"code": key}],
                "text": str(value)
            } for key, value in config.items()
        ]
    }


def build_device_definitions(upto=0, seed=seed):
    """Build one DeviceDefinition per valid row of 'fhir-deviceDefinitions-list.csv'.

    Rows rejected by is_valid(), and rows whose 'Lines' column contains a code
    that is not in line_code_to_text, are skipped with a printed message.

    Args:
        upto: when non-zero, reading stops at the first row whose zero-based
            index is greater than ``upto`` (rows 0..upto are considered).
            0 means no limit.
        seed: value used to re-seed the module-level ``rd`` generator, so the
            generated identifiers are the same on every call with the same seed.

    Returns:
        list of DeviceDefinition objects, in CSV order.

    Raises:
        ValueError: a numeric column holds text that int()/float() rejects, or
            the 'RDTScan Configuration' column is not valid JSON.
    """
    # seed reproducible UUIDs per call
    rd.seed(seed)

    device_definitions = []

    # newline='' leaves line-ending handling to the csv module, which it needs
    # to read quoted fields containing line breaks correctly.
    with open('fhir-deviceDefinitions-list.csv', newline='') as f:
        for i, row in enumerate(csv.DictReader(f)):
            if upto and i > upto:
                break
            if not is_valid(row):
                print("skipping row {}".format(i))
                continue

            # The identifier is a UUID-formatted string made from 128 bits of
            # the seeded generator (no UUID version bits are set). It is drawn
            # for every row that passes is_valid(), including rows skipped
            # further down, which keeps the identifier sequence stable.
            dd_json = {
                "identifier": [
                    {
                        "value": str(uuid.UUID(int=rd.getrandbits(128)))
                    }
                ],
                "manufacturerString": row['Manufacturer'],
                "deviceName": [{
                    "name": row['Model'],
                    "type": "user-friendly-name"
                }],
                "udiDeviceIdentifier": [],
                "capability": [],
                "property": [],
            }

            # device identifiers
            for column, issuer in (('GTIN', "urn:gtin"), ('Product Code', "urn:manufacturer")):
                if row[column]:
                    dd_json["udiDeviceIdentifier"].append({
                        "deviceIdentifier": row[column],
                        "issuer": issuer,
                        "jurisdiction": "urn:global"
                    })

            # define capabilities
            capabilities = dd_json["capability"]

            if row['Steps']:
                capabilities.append(_text_capability("instructions", row['Steps']))

            if row['Lines']:
                try:
                    capabilities.append(_lines_capability(row['Lines']))
                except KeyError as e:
                    print("Line code to text key '{}' not found, skipping row {}".format(e.args[0], i))
                    continue

            if row['Sample Types']:
                capabilities.append(_text_capability("sample types", row['Sample Types']))

            if row['Test Type']:
                capabilities.append({
                    "type": {
                        "text": "detected component types"
                    },
                    "description": [
                        {
                            "text": row['Test Type']
                        }
                    ]
                })

            # define timing, temperature, sample quantity and buffer quantity
            # properties; a property is only emitted when it has a value
            properties = dd_json["property"]

            for property_text, column_specs in _QUANTITY_PROPERTY_SPECS:
                quantities = _quantity_values(row, column_specs)
                if quantities:
                    properties.append({
                        "type": {
                            "text": property_text
                        },
                        "valueQuantity": quantities
                    })

            # define rdt scan configuration property
            if row['RDTScan Configuration']:
                properties.append(_rdtscan_property(row['RDTScan Configuration']))

            # convert JSON to FHIR class
            device_definitions.append(DeviceDefinition(dd_json))

    return device_definitions

# Build every device definition from the CSV (default seed, no row limit).
device_definitions = build_device_definitions()

# One bundle entry per definition, each wrapping the resource's JSON form.
device_definitions_json = [
    {"resource": definition.as_json()} for definition in device_definitions
]

# Collect the entries into a single "collection" Bundle.
bundle = Bundle({
    "identifier": {"value": "rdt-og-device-definitions"},
    "type": "collection",
    "entry": device_definitions_json
})

# Write the bundle to a file stamped with today's ISO date.
output_name = 'device_definitions-rdt-og-{}.json'.format(date.today().isoformat())
with open(output_name, 'w') as f:
    serialized_bundle = json.dumps(bundle.as_json(), indent=2)
    f.write(serialized_bundle)

# Echo the generated entries for inspection.
pprint.pprint(device_definitions_json)