In [33]:
#Import Modules
import pandas as pd
import ncl.sqlsnippets as snips
import json
import ast
import os

#Import env
from os import getenv
from dotenv import load_dotenv

In [34]:
#Import runtime settings from .env file
def import_settings():
    """Reload the .env file and return the runtime settings as a dictionary.

    FIN_YEAR, FIN_MONTH and PROCEDURES are decoded with json.loads, the
    remaining values are returned exactly as getenv provides them.
    """
    #Re-read the .env file each time the settings are requested
    load_dotenv(override=True)

    #Settings holding JSON text
    json_keys = ("FIN_YEAR", "FIN_MONTH", "PROCEDURES")
    #Settings used as plain text
    text_keys = ("SQL_ADDRESS", "SQL_DATABASE", "SQL_SCHEMA", "SQL_TABLE", "OUTPUT")

    settings = {key: json.loads(getenv(key)) for key in json_keys}
    settings.update({key: getenv(key) for key in text_keys})

    return settings

In [35]:
#Convert an array into a SQL set (as a string)
def array_to_set (arr):
    """Convert an iterable of values into a SQL set (as a string).

    Every value is converted with str(), wrapped in single quotes and
    separated by ", ", for example [1, 2] becomes "('1', '2')".

    Raises:
        ValueError: if arr contains no items, because there is nothing to
            put between the brackets.
    """
    #Single quotes inside a value are doubled so they cannot close the literal early
    items = ["'" + str(item).replace("'", "''") + "'" for item in arr]

    #An empty array used to come back as ")" and only failed once the query was run
    if not items:
        raise ValueError("Cannot build a SQL set from an empty array")

    return "(" + ", ".join(items) + ")"

#Generate the where clause based on the scope of the execution (.env settings)
def load_ref_where (fin_year, fin_month):
    """Build the where clause that limits the opcs and icd aliases to the execution scope.

    fin_year and fin_month are arrays of values, an empty array ([]) means
    that field is not filtered. Returns "" when both arrays are empty.
    """
    #Collected pieces of the clause, either filter may be absent
    parts = []

    if fin_year != []:
        year_set = array_to_set(fin_year)
        parts.append(f"WHERE opcs.Year IN {year_set}\nAND icd.Year IN {year_set}\n")

    if fin_month != []:
        month_set = array_to_set(fin_month)
        #The month filter only opens the clause when there is no year filter before it
        keyword = "AND" if parts else "WHERE"
        parts.append(f"{keyword} opcs.Fin_Month IN {month_set}\nAND icd.Fin_Month IN {month_set}")

    return "".join(parts)

#Load the base reference table to apply the where clause to
def load_ref_base (fin_year, fin_month):
    """Read ./data/base_ref_table.sql, append the scope where clause and wrap it as a sub query.

    Returns the text "SELECT Year, Fin_month, org_code, hospital_no FROM (...) base"
    so that the procedure where clause can be added straight after it.
    """
    #Text of the base reference query
    with open("./data/base_ref_table.sql", 'r') as base_file:
        base_sql = base_file.read()

    #Filter for the requested years and months (may be blank)
    scope_filter = load_ref_where (fin_year, fin_month)

    #The filter sits inside the brackets, the procedure conditions are added outside them
    return (
        "SELECT Year, Fin_month, org_code, hospital_no FROM (\n"
        + base_sql
        + "\n\n"
        + scope_filter
        + "\n\n) base\n\n"
    )

In [36]:
#Load a given procedure
def find_procedure (id):
    """Search ./data/procedures (sub directories included) for the procedure with the given id.

    Returns the decoded content of the first .json file whose top level is a
    dict with a matching "id". Files that cannot be decoded are reported with
    print and skipped.

    Raises:
        Exception: if no file contains the id.
    """
    for folder, _subfolders, names in os.walk("./data/procedures"):
        for name in names:
            #Only json files are considered
            if not name.endswith(".json"):
                continue

            candidate_path = os.path.join(folder, name)

            with open(candidate_path, "r") as candidate_file:
                try:
                    candidate = json.load(candidate_file)
                except json.JSONDecodeError:
                    print(f"Error decoding JSON in {candidate_path}")
                    continue

            #First match wins, no need to look at the remaining files
            if isinstance(candidate, dict) and "id" in candidate and candidate["id"] == id:
                return candidate

    raise Exception(f"Unable to find id {id}")

#Get all procedures (for when procedure is not set in .env)
def all_procedures ():
    """Collect the "id" of every procedure file under ./data/procedures (sub directories included).

    Only .json files whose top level is a dict containing "id" contribute.
    Files that cannot be decoded are reported with print and skipped.
    """
    found_ids = []

    for folder, _subfolders, names in os.walk("./data/procedures"):
        for json_name in (name for name in names if name.endswith(".json")):
            json_path = os.path.join(folder, json_name)

            with open(json_path, "r") as handle:
                try:
                    content = json.load(handle)
                except json.JSONDecodeError:
                    print(f"Error decoding JSON in {json_path}")
                    continue

            #Keep the id when the file has one
            if isinstance(content, dict) and "id" in content:
                found_ids.append(content["id"])

    return found_ids

In [37]:
#Convert age object into sql
def process_age (age):
    """Return the sql condition for an age label.

    "Adult" gives "age > 16 " and "Child" gives "age <= 16 ".

    Raises:
        Exception: for any other value.
    """
    #Recognised labels and the condition each one maps to
    known_ages = (("Adult", "age > 16 "), ("Child", "age <= 16 "))

    for label, condition in known_ages:
        if age == label:
            return condition

    raise Exception (f"Age not recognised: {age}")

#Convert the POD object into sql
def process_pod (pod):
    """Return the sql condition "POD IN ('a', 'b') " for an array of pod values.

    Values are converted with str() first, matching process_tfc and
    process_mainspec, so numeric values no longer make join fail.
    """
    #Convert pod into array of strings as join only works on strings
    pod = [str(i) for i in pod]

    #Convert array of pod values into sql syntax set
    sql_str = "POD IN " +  "('" + "', '".join(pod) + "') "

    return sql_str

#Convert the TFC object into sql
def process_tfc (tfc):
    """Return the sql condition "tfc IN ('a', 'b') " for an array of tfc values.

    Values may be of any type, each one is converted with str().
    """
    #join only accepts strings, so convert while joining
    quoted_values = "', '".join(map(str, tfc))

    return f"tfc IN ('{quoted_values}') "

#Convert the MainSpec object into sql
def process_mainspec (mainspec):
    """Return the sql condition "main_spec IN ('a', 'b') " for an array of main speciality values.

    Values may be of any type, each one is converted with str().
    """
    #Text form of every main speciality value
    values_as_text = []
    for value in mainspec:
        values_as_text.append(str(value))

    #Quote each value and place them in a sql set
    return "main_spec IN ('" + "', '".join(values_as_text) + "') "

#Handle the speciality object
def process_speciality (speciality):
    """Return the sql condition for the speciality object of a procedure.

    The object may hold "TFC", "MainSpec" or both. With both, the two
    conditions are combined with OR inside brackets, with one the single
    condition is returned as is.

    Raises:
        Exception: if the object holds neither "TFC" nor "MainSpec"
            (this used to surface as an IndexError with no explanation).
    """
    clauses_speciality = []

    if 'TFC' in speciality:
        clauses_speciality.append(process_tfc(speciality["TFC"]))

    if 'MainSpec' in speciality:
        clauses_speciality.append(process_mainspec(speciality["MainSpec"]))

    #Nothing to filter on, an empty clause would break the WHERE built by the caller
    if not clauses_speciality:
        raise Exception ("Speciality must contain TFC and/or MainSpec")

    if len(clauses_speciality) == 2:
        return f"({clauses_speciality[0]} OR {clauses_speciality[1]}) "

    return clauses_speciality[0]

#Convert variables in the logic eq into sql to be used
def var_to_sql (var, cgs):
    """Convert one variable of the logic eq into a bracketed sql condition.

    Parameters:
        var: dict with the keys "cg" (codegroup id), "action" ("out" negates
            the condition, any other value leaves it as is), "type" (prefix
            of the "<type>_all" column) and "level" (dict with "first" and
            "last").
        cgs: dict of codegroups keyed by the id as a string. A value is either
            the text of a list literal (read with ast.literal_eval) or an
            actual list/tuple of codes.

    Returns:
        "( <cond> OR <cond> ... ) " or "NOT ( ... ) ", where each <cond> is a
        LIKE '%code%' test. Level first == 0 tests the whole "<type>_all"
        column, otherwise a SUBSTRING covering levels first to last is tested.

    Raises:
        Exception: if the codegroup id is not in cgs, if the codegroup has no
            codes, or if first is greater than last.
    """
    #Get variable information
    cg_id = var["cg"]
    action = var["action"]
    code_type = var["type"]
    l_first = var["level"]["first"]
    l_last = var["level"]["last"]

    #Get codes from codegroup
    try:
        cg_set = cgs[str(cg_id)]
    except KeyError as err:
        raise Exception(f"Codegroup not found: {cg_id}") from err

    #Set name of all codes column from codegroup type
    col = f"{code_type}_all"

    if l_first == 0:
        #Rules for codes in all columns
        target = col
    elif l_first <= l_last:
        #Rules for codes in specific columns
        #Starting point of the _all code column (4 characters per level)
        idx_ss = 1 + (4 * (l_first-1))
        #Number of characters to check (4 per level)
        idx_sl = 4 * (l_last - (l_first - 1))
        target = f"SUBSTRING({col}, {idx_ss}, {idx_sl})"
    else:
        raise Exception (f"Invalid levels specified: from {l_first} to {l_last}")

    #Convert codegroups into array (already an array when stored as a json list)
    if isinstance(cg_set, (list, tuple)):
        cg_arr = cg_set
    else:
        cg_arr = ast.literal_eval(cg_set)

    #An empty codegroup used to leave the bracket unclosed and break the query
    if len(cg_arr) == 0:
        raise Exception(f"Codegroup is empty: {cg_id}")

    #Initialise sql string
    opening = "NOT ( " if action == "out" else "( "

    #Every condition ends with a space, so "OR " is enough to join them
    conditions = "OR ".join(f"{target} LIKE '%{cg}%' " for cg in cg_arr)

    return opening + conditions + ") "

#Convert logic object into sql
def process_logic (logic, cgs):
    #Split logic by spaces
    items = logic["eq"].split(" ")

    #Initial sql for the logic section
    query_logic = "( "

    #For each word in the logic
    for item in items:
        #Identify keywords that are left as is
        if item not in ("AND", "OR", "(", ")"):
            try:
                #Get the variable object
                variable = logic[item]

                #Get the SQL to handle this variable
                query_logic += var_to_sql(variable, cgs)

            except:
                raise Exception (f"Error: Unable to process {item}")
        else:
            #Leave keyword as is (Add whitespace to end)
            query_logic += item + " "

    query_logic += ") "

    return query_logic 

#Convert the conditions parent object into a WHERE clause for the query
def procedure_where (conditions, cgs):
    """Convert the conditions object of a procedure into a WHERE clause.

    The keys Age, POD, Speciality and logic are handled (in that order) when
    present, other keys are ignored. Returns "" when none of them is present.
    """
    #Condition key paired with the function that turns its value into sql
    builders = (
        ('Age', lambda value: process_age(value)),
        ('POD', lambda value: process_pod(value)),
        ('Speciality', lambda value: process_speciality(value)),
        ('logic', lambda value: process_logic(value, cgs)),
    )

    parts = [build(conditions[key]) for key, build in builders if key in conditions]

    if not parts:
        return ""

    #Every part already ends with a space, so "AND " is enough to join them
    return "WHERE " + "AND ".join(parts)

#Code for building the procedure query
def build_procedure_query(json_pro, query_base, cgs):
    """Return query_base followed by the where clause built from json_pro["conditions"]."""
    #Conditions of this procedure
    conditions = json_pro["conditions"]

    return query_base + procedure_where(conditions, cgs)

In [38]:
#Currently unused as planning to use reference table for procedures
def stamp_procedure_info(df, json_pro):
    """Write the procedure details into df, changing it in place (nothing is returned).

    Sets id_pro, name, priority, HVLC and benchmark from the id, name,
    priority, HVLC and benchmark keys of json_pro.
    """
    #Column to set paired with the procedure key it is taken from
    column_sources = (
        ("id_pro", "id"),
        ("name", "name"),
        ("priority", "priority"),
        ("HVLC", "HVLC"),
        ("benchmark", "benchmark"),
    )

    for column, key in column_sources:
        df[column] = json_pro[key]

#Build the delete query
def build_delete_query(schema, table, fin_year, fin_month, id_pro):
    """Return the DELETE statement for one procedure within the execution scope.

    Rows of schema.table with the given id_pro are targeted. Year and
    Fin_Month filters are added only when the matching array is not empty.
    """
    lines = [f"DELETE FROM {schema}.{table} WHERE id_pro = {id_pro}\n"]

    #Optional scope filters, an empty array means the column is not filtered
    for column, values in (("Year", fin_year), ("Fin_Month", fin_month)):
        if values != []:
            lines.append(f"AND {column} IN {array_to_set(values)}\n")

    return "".join(lines)

#Delete the existing data from the table within the execution scope
def delete_existing(engine, schema, table, fin_year, fin_month, id_pro):
    """Build the delete query for this procedure and scope, then run it on engine."""
    #Statement removing the rows that are about to be replaced
    sql_delete = build_delete_query(schema, table, fin_year, fin_month, id_pro)

    snips.execute_query(engine, sql_delete)
    

In [39]:
#Main function for the procedure code
def main ():
    """Run every procedure in scope and export the results.

    Settings come from import_settings(). For each procedure id (all of them
    when PROCEDURES is empty) the procedure query is built and run, the
    result is stamped with id_pro and then exported:
        OUTPUT "sql": existing rows for the scope are deleted, then the
            result is uploaded to SQL_SCHEMA.SQL_TABLE.
        OUTPUT "csv": the result is appended to ./output/output.csv, the
            header is written only when the file does not exist yet.

    Raises:
        Exception: if OUTPUT is neither "sql" nor "csv". This is checked
            before any query is run.
    """
    #Import settings
    settings = import_settings()

    #An unknown OUTPUT used to delete the existing rows and then export nothing
    output_mode = settings["OUTPUT"]
    if output_mode not in ("sql", "csv"):
        raise Exception(f"OUTPUT not recognised: {output_mode}")

    #Load base table
    query_base = load_ref_base (settings["FIN_YEAR"], settings["FIN_MONTH"])

    #Load codegroups
    with open("./data/codegroups.json", "r") as cg_file:
        codegroups = json.load(cg_file)

    #Load the list of procedures
    if settings["PROCEDURES"] == []:
        procedures = all_procedures()
    else:
        procedures = settings["PROCEDURES"]

    #sorted() leaves the list held in settings untouched
    procedures = sorted(procedures)

    #Connect once and reuse the engine for every procedure
    engine = snips.connect(settings["SQL_ADDRESS"], settings["SQL_DATABASE"])

    #Single definition of the csv path used for both the check and the write
    path_csv = "./output/output.csv"

    #Iterate through procedures
    for id_pro in procedures:

        #Load the procedure
        json_pro = find_procedure(int(id_pro))

        #Build the procedure query
        query_pro = build_procedure_query(json_pro, query_base, codegroups)

        #Run the query
        res = snips.execute_sfw(engine, query_pro)

        #Add the procedure id to the dataframe
        res["id_pro"] = id_pro

        #Export results
        if output_mode == "sql":
            #Delete existing data for this scope (only when it is about to be replaced,
            #a csv run must not remove rows from the table)
            delete_existing(engine, settings["SQL_SCHEMA"], settings["SQL_TABLE"], settings["FIN_YEAR"], settings["FIN_MONTH"], id_pro)

            snips.upload_to_sql(res, engine, settings["SQL_TABLE"], settings["SQL_SCHEMA"], False, chunks=400)
        else:
            #Make sure the output directory exists before the first write
            os.makedirs(os.path.dirname(path_csv), exist_ok=True)

            #Header only for a new file, later results are appended below it
            write_header = not os.path.isfile(path_csv)
            res.to_csv(path_csv, index=False, header=write_header, mode="a")

        #Log progress
        print(f"Progress: {id_pro} - {json_pro['name']}")

In [40]:
#Run the procedure code for the scope set in the .env file
main()

Progress: 1 - Lumbar Decompression / Discectomy
Progress: 2 - Cervical Spine Decompression / Fusion
Progress: 3 - Lumbar Medial Branch Block / Facet Joint Injections
Progress: 4 - Lumbar Nerve Root Block / Therapeutic Epidural Injection
Progress: 5 - One or Two Level Posterior Fusion Surgery
Progress: 6 - Endo Sinus Surgery
Progress: 7 - Tonsillectomy
Progress: 8 - Myringoplasty
Progress: 9 - Septoplasty and Turbinate Surgery
Progress: 10 - Septorhinoplasty
Progress: 11 - Laparoscopic Cholecystectomy
Progress: 12 - Primary Inguinal Hernia Repair
Progress: 13 - Para-umbilical Hernia
Progress: 14 - Operative Laparoscopy
Progress: 15 - Laparoscopic Hysterectomy
Progress: 16 - Endometrial Ablation
Progress: 17 - Hysteroscopy
Progress: 18 - Vaginal Hysterectomy / Vaginal Wall Repair
Progress: 19 - Low Complexity Cataract Surgery
Progress: 20 - Anterior Cruciate Ligament Reconstruction
Progress: 21 - Total Hip Replacement
Progress: 22 - Total Knee Replacement
Progress: 23 - Uni Knee Replacem