# SQL Query from Trino Server

The code below retrieves FHIR data as a three-column dataset: 'PatientID', 'Codes', and 'ResourceType' (ICD, OPS, or LOINC). It first selects ICD-10-GM diagnoses whose onset lies between '2023-02-25 00:00:00' and '2023-02-25 02:00:00' to identify the relevant 'PatientID' values. It then retrieves the complete ICD, OPS, and LOINC code history of these patients. Before the dataset is saved, patient IDs are replaced by sequential integers.

Due to computational time limitations, only ICD codes within the specified time window are used to select the 'PatientID' values. Once these patient IDs are identified, their full code histories are retrieved.

Provide your Trino username and password (`TRINO_USER` and `TRINO_PASSWORD`) in the cell below before running it.

In [None]:
import os
import warnings

# Trino credentials: fill in the two placeholders before running this cell.
os.environ['TRINO_USER'] = ""
os.environ['TRINO_PASSWORD'] = ""

# NOTE(review): `connect` and `BasicAuthentication` are not imported in this file;
# presumably `from trino.dbapi import connect` and
# `from trino.auth import BasicAuthentication` — confirm.
conn = connect(
    host="https://trino.diz.uk-erlangen.de",
    port=443,
    user=os.environ['TRINO_USER'],
    auth=BasicAuthentication(os.environ['TRINO_USER'], os.environ['TRINO_PASSWORD']),
    # TLS certificate verification is disabled; only acceptable inside a trusted network.
    verify=False,
    http_scheme="https",
    # 60 * 200 — presumably seconds (200 minutes); the queries below can be very slow.
    request_timeout=60 * 200,
    catalog="catalog",
)

cur = conn.cursor()

# Silence only the "Unverified HTTPS request" warning triggered by verify=False,
# instead of every warning in the session (which would also hide e.g. pandas
# deprecations).
warnings.filterwarnings("ignore", message="Unverified HTTPS request")

 

def execute_query_to_df(query, cursor=None):
    """Execute *query* and return the complete result set as a pandas DataFrame.

    Args:
        query: SQL text to execute.
        cursor: DB-API cursor to run the query on. When omitted, the
            module-level ``cur`` created in the connection cell is used, so
            existing calls keep working unchanged.

    Returns:
        A DataFrame with one row per fetched row, labelled with the column
        names from ``cursor.description``. If the statement reports no
        description (``None``), pandas' default integer column labels are used
        instead of raising.
    """
    import pandas as pd

    if cursor is None:
        cursor = cur  # fall back to the notebook-global cursor
    cursor.execute(query)
    rows = cursor.fetchall()
    # description is a sequence of 7-item tuples; item 0 is the column name.
    description = cursor.description or []
    columns = [desc[0] for desc in description]
    return pd.DataFrame(rows, columns=columns or None)

 

# Cohort + full-history query (Trino SQL).
#
# 1. TimeIntervalICD selects the DISTINCT patients with at least one ICD-10-GM
#    diagnosis whose onset lies between 2023-02-25 00:00:00 and 02:00:00
#    (inclusive). It yields one row per patient, so it cannot multiply the joins below.
# 2. Each AllPatient*Codes CTE aggregates one code family to ONE row per cohort
#    patient before the final join, so that join is 1:1 per patient. Joining the
#    raw ICD, OPS and LOINC rows first and aggregating afterwards would build a
#    per-patient Cartesian product (#ICD rows x #OPS rows x #LOINC rows).
# 3. OPS/LOINC codes are read directly from Procedure/Observation by subject
#    reference. Routing through Encounter would only repeat every row once per
#    encounter. Cohort patients always have an encounter, so the code sets match.
# Patients without OPS or LOINC codes get NULL (None in Python) in that column.
icd_query = """
WITH TimeIntervalICD AS (
    SELECT DISTINCT
        encounter.subject.reference AS PatientID
    FROM
        fhir.qs.Encounter encounter
        LEFT JOIN UNNEST(encounter.diagnosis) AS encounter_diagnosis ON TRUE
        LEFT JOIN fhir.qs.Condition condition ON encounter_diagnosis.condition.reference = CONCAT('Condition/', condition.id)
        LEFT JOIN UNNEST(condition.code.coding) AS condition_coding ON TRUE
    WHERE
        condition_coding.code IS NOT NULL
        AND condition_coding.system = 'http://fhir.de/CodeSystem/bfarm/icd-10-gm'
        AND FROM_ISO8601_TIMESTAMP(condition.onsetdatetime) BETWEEN TIMESTAMP '2023-02-25 00:00:00' AND TIMESTAMP '2023-02-25 02:00:00'
),

AllPatientICDCodes AS (
    SELECT
        encounter.subject.reference AS PatientID,
        ARRAY_AGG(DISTINCT condition_coding.code) AS ICDCodesAllTime
    FROM
        fhir.qs.Encounter encounter
        LEFT JOIN UNNEST(encounter.diagnosis) AS encounter_diagnosis ON TRUE
        LEFT JOIN fhir.qs.Condition condition ON encounter_diagnosis.condition.reference = CONCAT('Condition/', condition.id)
        LEFT JOIN UNNEST(condition.code.coding) AS condition_coding ON TRUE
    WHERE
        encounter.subject.reference IN (SELECT PatientID FROM TimeIntervalICD)
        AND condition_coding.code IS NOT NULL
        AND condition_coding.system = 'http://fhir.de/CodeSystem/bfarm/icd-10-gm'
    GROUP BY
        encounter.subject.reference
),

AllPatientOPSCodes AS (
    SELECT
        procedure.subject.reference AS PatientID,
        ARRAY_AGG(DISTINCT procedure_coding.code) AS OPSCodesAllTime
    FROM
        fhir.qs.procedure procedure
        LEFT JOIN UNNEST(procedure.code.coding) AS procedure_coding ON TRUE
    WHERE
        procedure.subject.reference IN (SELECT PatientID FROM TimeIntervalICD)
        AND procedure_coding.code IS NOT NULL
        AND procedure_coding.system = 'http://fhir.de/CodeSystem/bfarm/ops'
    GROUP BY
        procedure.subject.reference
),

AllPatientLOINCCodes AS (
    SELECT
        observation.subject.reference AS PatientID,
        ARRAY_AGG(DISTINCT observation_coding.code) AS LOINCCodesAllTime
    FROM
        fhir.qs.observation observation
        LEFT JOIN UNNEST(observation.code.coding) AS observation_coding ON TRUE
    WHERE
        observation.subject.reference IN (SELECT PatientID FROM TimeIntervalICD)
        AND observation_coding.code IS NOT NULL
        AND observation_coding.system = 'http://loinc.org'
    GROUP BY
        observation.subject.reference
)

SELECT
    p.PatientID,
    icd.ICDCodesAllTime,
    ops.OPSCodesAllTime,
    loinc.LOINCCodesAllTime
FROM
    TimeIntervalICD p
JOIN
    AllPatientICDCodes icd ON p.PatientID = icd.PatientID
LEFT JOIN
    AllPatientOPSCodes ops ON p.PatientID = ops.PatientID
LEFT JOIN
    AllPatientLOINCCodes loinc ON p.PatientID = loinc.PatientID
"""

 

# icd_query = """

# WITH TimeIntervalICD AS (

#     SELECT

#         encounter.subject.reference AS PatientID,

#         condition_coding.code AS Codes,

#         FROM_ISO8601_TIMESTAMP(condition.onsetdatetime) AS EncounterDate

#     FROM

#         fhir.qs.Encounter encounter

#         LEFT JOIN UNNEST(encounter.diagnosis) AS encounter_diagnosis ON TRUE

#         LEFT JOIN fhir.qs.Condition condition ON encounter_diagnosis.condition.reference = CONCAT('Condition/', condition.id)

#         LEFT JOIN UNNEST(condition.code.coding) AS condition_coding ON TRUE

#     WHERE

#         condition_coding.code IS NOT NULL

#         AND condition_coding.system = 'http://fhir.de/CodeSystem/bfarm/icd-10-gm'

#         AND FROM_ISO8601_TIMESTAMP(condition.onsetdatetime) BETWEEN TIMESTAMP '2023-02-25 00:00:00' AND TIMESTAMP '2023-02-25 02:00:00'

# ),

 

# AllPatientICDCodes AS (

#     SELECT

#         encounter.subject.reference AS PatientID,

#         condition_coding.code AS Codes

#     FROM

#         fhir.qs.Encounter encounter

#         LEFT JOIN UNNEST(encounter.diagnosis) AS encounter_diagnosis ON TRUE

#         LEFT JOIN fhir.qs.Condition condition ON encounter_diagnosis.condition.reference = CONCAT('Condition/', condition.id)

#         LEFT JOIN UNNEST(condition.code.coding) AS condition_coding ON TRUE

#     WHERE

#         condition_coding.code IS NOT NULL

#         AND condition_coding.system = 'http://fhir.de/CodeSystem/bfarm/icd-10-gm'

# ),

 

# AllPatientOPSCodes AS (

#     SELECT

#         encounter.subject.reference AS PatientID,

#         procedure_coding.code AS OPSCodes

#     FROM

#         fhir.qs.Encounter encounter

#         LEFT JOIN fhir.qs.Procedure procedure ON encounter.subject.reference = procedure.subject.reference

#         LEFT JOIN UNNEST(procedure.code.coding) AS procedure_coding ON TRUE

#     WHERE

#         procedure_coding.code IS NOT NULL

#         AND procedure_coding.system = 'http://fhir.de/CodeSystem/bfarm/ops'

# ),

 

# AllPatientLOINCCodes AS (

#     SELECT

#         encounter.subject.reference AS PatientID,

#         observation_coding.code AS LOINCCodes

#     FROM

#         fhir.qs.Encounter encounter

#         LEFT JOIN fhir.qs.Observation observation ON encounter.subject.reference = observation.subject.reference

#         LEFT JOIN UNNEST(observation.code.coding) AS observation_coding ON TRUE

#     WHERE

#         observation_coding.code IS NOT NULL

#         AND observation_coding.system = 'http://loinc.org'

# )

 

# SELECT

#     icd.PatientID,

#     ARRAY_AGG(DISTINCT icd.Codes) AS ICDCodesAllTime,

#     ARRAY_AGG(DISTINCT ops.OPSCodes) AS OPSCodesAllTime,

#     ARRAY_AGG(DISTINCT loinc.LOINCCodes) AS LOINCCodesAllTime

# FROM

#     AllPatientICDCodes icd

# LEFT JOIN

#     AllPatientOPSCodes ops ON icd.PatientID = ops.PatientID

# LEFT JOIN

#     AllPatientLOINCCodes loinc ON icd.PatientID = loinc.PatientID

# GROUP BY

#     icd.PatientID

# """

 

# ops_query = """

# WITH TimeIntervalOPS AS (

#     SELECT

#         encounter.subject.reference AS PatientID,

#         procedure_coding.code AS Codes,

#         FROM_ISO8601_TIMESTAMP(procedure.performeddatetime) AS EncounterDate

#     FROM

#         fhir.qs.Encounter encounter

#         LEFT JOIN fhir.qs.Procedure procedure ON encounter.subject.reference = procedure.subject.reference

#         LEFT JOIN UNNEST(procedure.code.coding) AS procedure_coding ON TRUE

#     WHERE

#         procedure_coding.code IS NOT NULL

#         AND procedure_coding.system = 'http://fhir.de/CodeSystem/bfarm/ops'

#         AND FROM_ISO8601_TIMESTAMP(procedure.performeddatetime) BETWEEN TIMESTAMP '2023-02-25 00:00:00' AND TIMESTAMP '2023-02-25 02:00:00'

# ),

 

# AllPatientICDCodes AS (

#     SELECT

#         encounter.subject.reference AS PatientID,

#         condition_coding.code AS Codes

#     FROM

#         fhir.qs.Encounter encounter

#         LEFT JOIN UNNEST(encounter.diagnosis) AS encounter_diagnosis ON TRUE

#         LEFT JOIN fhir.qs.Condition condition ON encounter_diagnosis.condition.reference = CONCAT('Condition/', condition.id)

#         LEFT JOIN UNNEST(condition.code.coding) AS condition_coding ON TRUE

#     WHERE

#         condition_coding.code IS NOT NULL

#         AND condition_coding.system = 'http://fhir.de/CodeSystem/bfarm/icd-10-gm'

# ),

 

# AllPatientOPSCodes AS (

#     SELECT

#         encounter.subject.reference AS PatientID,

#         procedure_coding.code AS OPSCodes

#     FROM

#         fhir.qs.Encounter encounter

#         LEFT JOIN fhir.qs.Procedure procedure ON encounter.subject.reference = procedure.subject.reference

#         LEFT JOIN UNNEST(procedure.code.coding) AS procedure_coding ON TRUE

#     WHERE

#         procedure_coding.code IS NOT NULL

#         AND procedure_coding.system = 'http://fhir.de/CodeSystem/bfarm/ops'

# ),

 

# AllPatientLOINCCodes AS (

#     SELECT

#         encounter.subject.reference AS PatientID,

#         observation_coding.code AS LOINCCodes

#     FROM

#         fhir.qs.Encounter encounter

#         LEFT JOIN fhir.qs.Observation observation ON encounter.subject.reference = observation.subject.reference

#         LEFT JOIN UNNEST(observation.code.coding) AS observation_coding ON TRUE

#     WHERE

#         observation_coding.code IS NOT NULL

#         AND observation_coding.system = 'http://loinc.org'

# )

 

# SELECT

#     icd.PatientID,

#     ARRAY_AGG(DISTINCT icd.Codes) AS ICDCodesAllTime,

#     ARRAY_AGG(DISTINCT ops.OPSCodes) AS OPSCodesAllTime,

#     ARRAY_AGG(DISTINCT loinc.LOINCCodes) AS LOINCCodesAllTime

# FROM

#     AllPatientICDCodes icd

# LEFT JOIN

#     AllPatientOPSCodes ops ON icd.PatientID = ops.PatientID

# LEFT JOIN

#     AllPatientLOINCCodes loinc ON icd.PatientID = loinc.PatientID

# GROUP BY

#     icd.PatientID

# """

 

# loinc_query = """

# WITH TimeIntervalLOINC AS (

#     SELECT

#         encounter.subject.reference AS PatientID,

#         observation_coding.code AS Codes,

#         FROM_ISO8601_TIMESTAMP(observation.effectivedatetime) AS EncounterDate

#     FROM

#         fhir.qs.Encounter encounter

#         LEFT JOIN fhir.qs.Observation observation ON encounter.subject.reference = observation.subject.reference

#         LEFT JOIN UNNEST(observation.code.coding) AS observation_coding ON TRUE

#     WHERE

#         observation_coding.code IS NOT NULL

#         AND observation_coding.system = 'http://loinc.org'

#         AND FROM_ISO8601_TIMESTAMP(observation.effectivedatetime) BETWEEN TIMESTAMP '2023-02-25 00:00:00' AND TIMESTAMP '2023-02-25 01:00:00'

# ),

 

# AllPatientICDCodes AS (

#     SELECT

#         encounter.subject.reference AS PatientID,

#         condition_coding.code AS Codes

#     FROM

#         fhir.qs.Encounter encounter

#         LEFT JOIN UNNEST(encounter.diagnosis) AS encounter_diagnosis ON TRUE

#         LEFT JOIN fhir.qs.Condition condition ON encounter_diagnosis.condition.reference = CONCAT('Condition/', condition.id)

#         LEFT JOIN UNNEST(condition.code.coding) AS condition_coding ON TRUE

#     WHERE

#         condition_coding.code IS NOT NULL

#         AND condition_coding.system = 'http://fhir.de/CodeSystem/bfarm/icd-10-gm'

# ),

 

# AllPatientOPSCodes AS (

#     SELECT

#         encounter.subject.reference AS PatientID,

#         procedure_coding.code AS OPSCodes

#     FROM

#         fhir.qs.Encounter encounter

#         LEFT JOIN fhir.qs.Procedure procedure ON encounter.subject.reference = procedure.subject.reference

#         LEFT JOIN UNNEST(procedure.code.coding) AS procedure_coding ON TRUE

#     WHERE

#         procedure_coding.code IS NOT NULL

#         AND procedure_coding.system = 'http://fhir.de/CodeSystem/bfarm/ops'

# ),

 

# AllPatientLOINCCodes AS (

#     SELECT

#         encounter.subject.reference AS PatientID,

#         observation_coding.code AS LOINCCodes

#     FROM

#         fhir.qs.Encounter encounter

#         LEFT JOIN fhir.qs.Observation observation ON encounter.subject.reference = observation.subject.reference

#         LEFT JOIN UNNEST(observation.code.coding) AS observation_coding ON TRUE

#     WHERE

#         observation_coding.code IS NOT NULL

#         AND observation_coding.system = 'http://loinc.org'

# )

 

# SELECT

#     icd.PatientID,

#     ARRAY_AGG(DISTINCT icd.Codes) AS ICDCodesAllTime,

#     ARRAY_AGG(DISTINCT ops.OPSCodes) AS OPSCodesAllTime,

#     ARRAY_AGG(DISTINCT loinc.LOINCCodes) AS LOINCCodesAllTime

# FROM

#     AllPatientICDCodes icd

# LEFT JOIN

#     AllPatientOPSCodes ops ON icd.PatientID = ops.PatientID

# LEFT JOIN

#     AllPatientLOINCCodes loinc ON icd.PatientID = loinc.PatientID

# GROUP BY

#     icd.PatientID

# """

 

# Run the cohort query: one row per patient holding ICD, OPS and LOINC code arrays.
icd_df = execute_query_to_df(icd_query)
print('icd_df finished', icd_df)

# ops_df = execute_query_to_df(ops_query)
# print('ops_df finished', ops_df)

# loinc_df = execute_query_to_df(loinc_query)
# print('LOINC_df finished', loinc_df)

# Stack PatientID + one code-array column at a time, in ICD, OPS, LOINC order.
# In the stacked frame each row carries exactly one populated code column; pandas
# fills the other two with NaN.
result_df = pd.concat(
    [
        icd_df[['PatientID', code_column]]
        for code_column in ('ICDCodesAllTime', 'OPSCodesAllTime', 'LOINCCodesAllTime')
    ],
    ignore_index=True,
)


#result_df.to_csv('compare.csv', index=False)

 

def flatten_codes(df):
    """Explode per-patient code arrays into a long (PatientID, Codes, ResourceType) table.

    Args:
        df: DataFrame with a 'PatientID' column and any of the columns
            'ICDCodesAllTime', 'OPSCodesAllTime' and 'LOINCCodesAllTime'. A cell
            is flattened only if it holds a list or tuple of codes. Any other
            value (None, NaN introduced by a pandas concat, or a missing column)
            is skipped.

    Returns:
        DataFrame with columns 'PatientID', 'Codes' and 'ResourceType' ('ICD',
        'OPS' or 'LOINC'), with one row per non-None code. Rows follow input-row
        order, and within each input row the order is ICD, OPS, LOINC. The three
        columns are present even when there are no rows, so callers can index
        them unconditionally.
    """
    import pandas as pd

    # Source column -> label written to 'ResourceType', in output order.
    column_to_type = (
        ('ICDCodesAllTime', 'ICD'),
        ('OPSCodesAllTime', 'OPS'),
        ('LOINCCodesAllTime', 'LOINC'),
    )

    rows = []
    for record in df.to_dict('records'):
        patient_id = record['PatientID']
        for column, resource_type in column_to_type:
            codes = record.get(column)
            if not isinstance(codes, (list, tuple)):
                continue
            # ARRAY_AGG over LEFT JOINs can yield [None]; those are not real codes.
            rows.extend(
                {'PatientID': patient_id, 'Codes': code, 'ResourceType': resource_type}
                for code in codes
                if code is not None
            )

    return pd.DataFrame(rows, columns=['PatientID', 'Codes', 'ResourceType'])

 
flat_df = flatten_codes(result_df)

# Pseudonymize: replace each patient reference with a sequential integer
# (1, 2, ...) assigned in order of first appearance in flat_df.
patient_id_mapping = {
    patient_ref: number
    for number, patient_ref in enumerate(flat_df['PatientID'].unique(), start=1)
}
flat_df['PatientID'] = flat_df['PatientID'].map(patient_id_mapping)

# Drop duplicates if a patient has the same code multiple times
flat_df = flat_df.drop_duplicates()

print(flat_df)

try:
    flat_df.to_csv('FHIR_Codes_Whole_Life_Timestamp_25_02_23-2hours.csv', index=False)
    flat_df.to_parquet('FHIR_Codes_Whole_Life_Timestamp_25_02_23-2hours.parquet', index=False, compression='snappy')
finally:
    # Close the cursor and connection even if writing an output file fails
    # (e.g. no parquet engine installed).
    cur.close()
    conn.close()

