In [106]:
# all paths
patientsRecordsPath = "wasb:///hive_patient_master_dd-1lab-20171016-140000.csv"
labORdersResultPath =  "wasb:///hive_accession_orders_results-1lab-20171019-104500.csv"
diagnosisRecordsPath = "wasb:///hive_diagnosis_dd-1lab-20171016-140000.csv"
cptRecordsPath = "wasb:///hive_cpts_dd-1lab-20171016-140000.csv"
outputPath = "wasb:///patient-ranking"
demographicsPath = "wasb:///demographics/all-patients.txt"


In [96]:
# Search By Test Name and Value
def searchTestRecords(row, testName, testExpression, isFasting):
    testRecord = row[0]
    actualtestValue = testRecord.result_idw_result_value.isdigit() and int(testRecord.result_idw_result_value)
    testEvalString = testExpression.format(actualtestValue)
    isFastingMatch =  testRecord.accession_idw_fasting_ind == 'Y' if isFasting else True
    testValueMatch = actualtestValue and eval(testEvalString) and isFastingMatch
    actualTestName = testRecord.result_idw_result_name
    return ((testName in actualTestName) and testValueMatch)

In [97]:
def handleResource(op, records):
    resource = op['resource']
    resourceType = resource['resourceType']
    
    if resourceType == 'Observation':
        records = records.filter(lambda x : searchTestRecords(x, op['resource']['code']['text'],op['resource']['expression'], True))
    
    return records

def handleAllOperators(operators, records):
    for op in operators :
        if('resource') in op:
            print('and (')
            records = records.intersection(handleResource(op, records))
            print(')')

        if('any') in op:
            print('and (') 
            records = records.intersection(handleAnyOperators(op['any'], records))
            print(')') 

        if('and') in op:
            print('and (') 
            records = records.intersection(handleAllOperators(op['and']),records )
            print(')') 
    
    return records
        
def handleAnyOperators(operators, records):
    
    orrecords = False
    for op in operators :
        if('resource') in op:
            print('or (')
            if (not orrecords):
                orrecords = True
                records = handleResource(op, records)
            records = records.union (handleResource(op, records))
            print(')')

        if('any') in op:
            print('or (')
            if (not orrecords):
                orrecords = True
                records = handleAnyOperators(op['any'],records)
            records = records.union (handleAnyOperators(op['any'],records))
            print(')')

        if('all') in op:
            print('or (')
            records = records.union (handleAllOperators(op['and'],records))        
            print(')')
            
    return records
            

In [107]:
# read the file
labData = sc.textFile(labORdersResultPath)
patientData = sc.textFile(patientsRecordsPath)
diagnosisRecords = sc.textFile(diagnosisRecordsPath)
cptRecords = sc.textFile(cptRecordsPath)
demographics = sc.textFile(demographicsPath)

In [3]:
# Filter the header row 
patientHeader = patientData.first()
labHeader = labData.first()
diagnosisHeader = diagnosisRecords.first()
cptHeader = cptRecords.first()

In [4]:
# Get the data row
patientDataWoHeader = patientData.filter(lambda x: x !=patientHeader)
labDataWoHeader = labData.filter(lambda x: x !=labHeader) 
diagnosisRecordWoHeader = diagnosisRecords.filter(lambda x: x !=diagnosisHeader) 
cptRecordWoHeader = cptRecords.filter(lambda x: x !=cptHeader) 

In [5]:
import csv
from io import StringIO
from collections import namedtuple

In [6]:
patientFields   = patientHeader.replace(" ","_").replace("/","_").split("\t")
labFields   = labHeader.replace(" ","_").replace("/","_").split("\t")
diagnosisRecordsFields = diagnosisHeader.split("\t")
cptRecordsFields = cptHeader.split("\t")
patientDemoGraphicsFields = ['']

In [7]:
cptRecordsFields

['unique_accession_id', 'idw_accn_id', 'idw_cpt_id', 'idw_service_id', 'idw_dos_id', 'idw_analyte_code', 'idw_implosion_level', 'idw_cpt_level', 'idw_quantity', 'idw_qbs_bill_number', 'idw_date_created', 'idw_date_modified', 'idw_cpt_code', 'idw_cpt_name', 'idw_qbs_service_code', 'idw_qbs_service_name', 'dummy_cpts_ind', 'idw_lab_code']

In [8]:
Patient   = namedtuple('Patient', patientFields, verbose=False)
LabRecord   = namedtuple('LabRecord', labFields, verbose=False)
DiagnosisRecord = namedtuple ('DiagnosisRecord', diagnosisRecordsFields, verbose = False)
CptRecord = namedtuple ('CptRecord', cptRecordsFields, verbose = False)

In [9]:
def parseLabRecord(row):
    reader = csv.reader(StringIO(row), delimiter='\t')
    row=next(reader)
    return LabRecord(*row)

In [10]:
def parsePatient(row):
    reader = csv.reader(StringIO(row), delimiter='\t')
    row=next(reader)
    return Patient(*row)

In [11]:
def parseDiagnosisRecord(row):
    reader = csv.reader(StringIO(row), delimiter='\t')
    row=next(reader)
    return DiagnosisRecord(*row)  

In [12]:
def parseCptRecord(row):
    reader = csv.reader(StringIO(row), delimiter='\t')
    row=next(reader)
    return CptRecord(*row)           

In [13]:
# filter record which do not have all the columns
labDataWoHeaderFiltered = labDataWoHeader.filter(lambda x: len(x.split("\t")) == len(labFields) )


In [14]:
# map the records
patients = patientDataWoHeader.map(parsePatient)
labRecords = labDataWoHeaderFiltered.map(parseLabRecord)
diagnosis = diagnosisRecordWoHeader.map(parseDiagnosisRecord)
cpts = cptRecordWoHeader.map(parseCptRecord)

In [15]:
# Search By Test Name and Value
def searchTestAndValue(row, testData):
    testValue = testData['value']
    testName = testData['name']
    isFasting = testData['fasting']
    actualtestValue = row.result_idw_result_value.isdigit() and int(row.result_idw_result_value)
    testEvalString = testValue.format(actualtestValue)
    isFastingMatch =  row.accession_idw_fasting_ind == 'Y' if isFasting else True
    testValueMatch = actualtestValue and eval(testEvalString) and isFastingMatch
    actualTestName = row.result_idw_result_name
    return ((testName in actualTestName) and testValueMatch)

In [16]:
# Search By Patient Demographics
def searchPatientByDemoGraphics(row, patientDat):
    testAge = patientDat['age']
    testGender = patientDat['gender']
    ageInYears = int(row.age_in_years) if row.age_in_years.isdigit() else 0 
    ageEvalString = testAge.format(ageInYears)
    genderEvalString = testGender.format(row.pm_gender)
    return eval(ageEvalString) and eval(genderEvalString)

In [17]:
testMatchFields = ['patientId','criteria','testNameMatch', 'testValueMatch']
TestMatch = namedtuple('TestMatch', testMatchFields, verbose=False)

In [18]:
def rankPatientsByTest(row, testData):
    testValue = testData['value']
    testName = testData['name']
    isFasting = testData['fasting']
    patient = row[0]
    testRecord = row[1]
    actualtestValue = testRecord.result_idw_result_value.isdigit() and int(testRecord.result_idw_result_value)
    testEvalString = testValue.format(actualtestValue)
    isFastingMatch =  testRecord.accession_idw_fasting_ind == 'Y' if isFasting else True
    actualTestName = testRecord.result_idw_result_name
    testNameMatch = (testName in actualTestName)
    testValueMatch = testNameMatch and actualtestValue and eval(testEvalString) and isFastingMatch    
    patientId = patient.pat_master_id
    return TestMatch(patientId = patientId,criteria = testValue.format(testName), testNameMatch = testNameMatch, testValueMatch = testValueMatch)
    


In [19]:
criteria = {
    'test':{
     'name': 'LDL-C',
     'value': '{0} > 130 and {0} < 190',
     'fasting': True
  },
    'demographics': {
        'age':' {0} > 45',
        'gender':'\'{0}\' == \'M\''
    }    
}


In [98]:
#input criteria
import json

def loadCriteriaJson(trialId):
    inputCriteriaPath ="wasb:///input/{0}.json".format(trialId)
    jsonRDD = sc.wholeTextFiles(inputCriteriaPath).map(lambda x: x[1])
    criteria = jsonRDD.collect()[0];
    json_data = json.loads(criteria)
    return json_data
    

In [100]:

def filterRecordsByCriteria(trialId):

    json_data = loadCriteriaJson(trialId)
    
    labRecordsMap = labRecords.map(lambda x :(x.pat_master_id, x))
    patientsMap = patients.map(lambda x :(x.pat_master_id, x))
    
    allRecords = labRecordsMap.join(patientsMap).map(lambda x: x[1])  
    
    if('any') in json_data:
        operators = json_data['any']
        filteredRecords = handleAnyOperators(operators, allRecords)

    if('all') in json_data:
        operators = json_data['all']
        filteredRecords = handleAllOperators(operators, allRecords)

    if('resource') in json_data:
        resource = json_data['resource']
        filteredRecords = handleResource(resource, allRecords)

    return filteredRecords



In [103]:
filteredRecords = filterRecordsByCriteria('12346')



An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 81.0 failed 4 times, most recent failure: Lost task 0.3 in stage 81.0 (TID 166, 10.0.0.4, executor 2): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/hdp/current/spark2-client/python/pyspark/worker.py", line 171, in main
    process()
  File "/usr/hdp/current/spark2-client/python/pyspark/worker.py", line 166, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/hdp/current/spark2-client/python/pyspark/serializers.py", line 268, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/usr/hdp/current/spark2-client/python/pyspark/rdd.py", line 1338, in takeUpToNumLeft
    yield next(iterator)
  File "<stdin>", line 3, in <lambda>
AttributeError: 'tuple' object has no attribute 'pat_master_id'

	at org.apache.spark.api.python.Pyth

In [108]:
filteredRecordsMap = filteredRecords.map(lambda x:  (x[0].pat_master_id, x))
demographicsMap = demographics.map(lambda x:  (x[0], x))
filteredRecords.join(demographics).map(lambda x: x[1]).take(1)

An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 9 in stage 87.0 failed 4 times, most recent failure: Lost task 9.3 in stage 87.0 (TID 197, 10.0.0.5, executor 1): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/hdp/current/spark2-client/python/pyspark/worker.py", line 171, in main
    process()
  File "/usr/hdp/current/spark2-client/python/pyspark/worker.py", line 166, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/hdp/current/spark2-client/python/pyspark/serializers.py", line 268, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "<stdin>", line 6, in <lambda>
  File "<stdin>", line 5, in searchTestRecords
KeyError: '0 '

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:2

In [20]:
# filter by the criteria
filteredTests = labRecords.filter(lambda x: searchTestAndValue(x, criteria['test']))
filteredPatients = patients.filter(lambda x : searchPatientByDemoGraphics(x, criteria['demographics']))
testCount = filteredTests.count()
patientsCount = filteredPatients.count()
totalPatCount = patients.count()
totalRecordsCount = labRecords.count()
patientsCount
totalPatCount
totalRecordsCount

1754327

In [21]:
# join the lab records and patients
filteredTestsmap = filteredTests.map(lambda x :(x.pat_master_id, x))
filteredpatientsmap = filteredPatients.map(lambda x :(x.pat_master_id, x))
joinResult = filteredTestsmap.join(filteredpatientsmap).map(lambda x: x[1])
resCount = joinResult.map(lambda x: "{} {} {}".format(x[0].result_idw_result_name, x[0].result_idw_result_value,x[1].age_in_years)).count()
resCount

24

In [22]:
# Ranking of patients by test matching
allTestsMap = labRecords.map(lambda x :(x.pat_master_id, x))
joinResult = filteredpatientsmap.join(allTestsMap).map(lambda x: x[1])
joinCount = joinResult.map(lambda x: rankPatientsByTest(x,  criteria['test'])).filter(lambda x : x.testNameMatch == True and x.testValueMatch == True).count()

In [23]:
import uuid
uuid.uuid4()
fileName = "{0}-{1}".format(outputPath,str(uuid.uuid4()))


In [24]:
joinResult.saveAsTextFile(fileName)