In [None]:
# ! pip install snowflake-connector-python
# ! pip install snowflake-sqlalchemy
# ! pip install xgboost

In [1]:
# importing the required libraries
from datetime import datetime, timedelta
import time
import pytz

# importing credentials
from creds import ACCOUNT,USERNAME,PASSWORD

import warnings
warnings.filterwarnings('ignore')

### Deployment Utility functions

In [3]:
# %%writefile utils.py -a


""" DEPLOYMENT UITLITY FUNCTIONS"""


# defining the function to verify the feautures of incoming data
def check_create_model_features(data,features_lst):
    temp = pd.DataFrame()
    for col in features_lst:
        if col in data.columns.tolist():
            temp[col]=data[col]
        else:
            temp[col]=0
    return temp

Appending to utils.py


In [4]:
# %%writefile utils.py -a

# defining a function to insert the predictions into the snowflake table
def insert_predictions(data,connection,engine):
    
   # establishing the connection to the snowflake 
    conn = snowflake.connector.connect(
    user=USERNAME,
    password=PASSWORD,
    account=ACCOUNT,
    role='ACCOUNTADMIN',
    warehouse='COMPUTE_WH',
    database='HEALTH_DB',
    schema='PUBLIC'
    )
    
    
    # creating the logging table
    table = "PREDICTION_LOGGING_TEST"
    table_creation_query =f"""
                            CREATE TABLE IF NOT EXISTS {table} (
                                CASE_ID STRING,
                                HOSPITAL_CODE INT,
                                HOSPITAL_TYPE_CODE STRING,
                                CITY_CODE_HOSPITAL INT,
                                HOSPITAL_REGION_CODE STRING,
                                AVAILABLE_EXTRA_ROOMS_IN_HOSPITAL INT,
                                DEPARTMENT STRING,
                                WARD_TYPE STRING,
                                WARD_FACILITY_CODE STRING,
                                BED_GRADE INT,
                                PATIENTID STRING,
                                CITY_CODE_PATIENT INT,
                                TYPE_OF_ADMISSION STRING,
                                SEVERITY_OF_ILLNESS STRING,
                                VISITORS_WITH_PATIENT INT,
                                AGE STRING,
                                ADMISSION_DEPOSIT FLOAT,
                                ADMISSION_DATE DATE,
                                DISCHARGE_DATE DATE,
                                ADMISSION_MONTH STRING,
                                ADMISSION_DAY STRING,
                                ADMISSION_ILLNESS STRING,
                                ILLNESS_BEDGRADE STRING,
                                DEPARTMENT_ILLNESS STRING,
                                LOS INT,
                                LOS_PREDICTED INT
                            )
                            """
    connection.execute(table_creation_query)
    
    # inserting the predictions to the table
    write_pandas(conn, data, table_name=table)
    return "Success"



Appending to utils.py


In [5]:
# %%writefile utils.py -a


# defining the function to send notofications via mail 
def send_mail(mail_string):
   
    subject = 'Patient LOS Prediction - STATUS MAIL'
    mail_content = mail_string

    username= MAIL_ID
    password= MAIL_PASSWORD
    send_from =MAIL_ID
    send_to = MAIL_ID
    Cc = ''
    
    msg = MIMEMultipart()
    msg['From'] = send_from
    msg['To'] = send_to
    msg['Cc'] = Cc
    msg['Date'] = formatdate(localtime = True)
    msg['Subject'] = subject
    msg.attach(MIMEText(mail_content, 'plain'))
    smtp = smtplib.SMTP('smtp.gmail.com',587)
    smtp.ehlo()
    smtp.starttls()
    smtp.login(username,password)
    smtp.sendmail(send_from, send_to.split(',') + msg['Cc'].split(','), msg.as_string())
    smtp.quit()
    
    


Appending to utils.py


In [6]:
# %%writefile utils.py -a


# defining the function for batch prediction and writing it to the snowflake table
def deploy():
        
        # defining the query to load new test data from the simulation data
        QUERY="""

        WITH BASE AS (

            SELECT CASE_ID,
                   COALESCE(HOSPITAL_CODE,0) AS HOSPITAL_CODE,
                   COALESCE(HOSPITAL_TYPE_CODE,'None') AS HOSPITAL_TYPE_CODE,
                   COALESCE(CITY_CODE_HOSPITAL,0) AS CITY_CODE_HOSPITAL,
                   COALESCE(HOSPITAL_REGION_CODE,'None') AS HOSPITAL_REGION_CODE,
                   COALESCE(AVAILABLE_EXTRA_ROOMS_IN_HOSPITAL,0) AS AVAILABLE_EXTRA_ROOMS_IN_HOSPITAL,
                   COALESCE(DEPARTMENT,'None') AS DEPARTMENT,
                   COALESCE(WARD_TYPE,'None') AS WARD_TYPE,
                   COALESCE(WARD_FACILITY_CODE,'None') AS WARD_FACILITY_CODE,
                   COALESCE(BED_GRADE,0) AS BED_GRADE,
                   PATIENTID,
                   COALESCE(CITY_CODE_PATIENT,0) AS CITY_CODE_PATIENT,
                   COALESCE(TYPE_OF_ADMISSION,'None') AS TYPE_OF_ADMISSION,
                   COALESCE(SEVERITY_OF_ILLNESS,'Minor') AS SEVERITY_OF_ILLNESS,
                   COALESCE(VISITORS_WITH_PATIENT,0) AS VISITORS_WITH_PATIENT,
                   COALESCE(AGE,'None') AS AGE,
                   COALESCE(ADMISSION_DEPOSIT,0) AS ADMISSION_DEPOSIT,
                   ADMISSION_DATE,
                   DISCHARGE_DATE

            FROM HEALTH_DB.PUBLIC.SIMULATION_DATA

        ),

        BASE_WITH_FEATURES AS (

            SELECT *,
                    MONTHNAME(ADMISSION_DATE) AS ADMISSION_MONTH,
                    DAYNAME(ADMISSION_DATE) AS ADMISSION_DAY,    
                    CONCAT(TYPE_OF_ADMISSION,'-',SEVERITY_OF_ILLNESS) AS ADMISSION_ILLNESS,
                    CONCAT(SEVERITY_OF_ILLNESS,'-',BED_GRADE) AS ILLNESS_BEDGRADE,
                    CONCAT(DEPARTMENT,'-',SEVERITY_OF_ILLNESS) AS DEPARTMENT_ILLNESS,
                    DATEDIFF(day,ADMISSION_DATE,DISCHARGE_DATE) AS LOS
            FROM BASE 

        )    
        SELECT * FROM BASE_WITH_FEATURES WHERE ADMISSION_DATE=CURRENT_DATE-580


        """

        # defining an empty list to store notification for each phases
        mail_lst = []
        


        # Creating the connection engine (way 1)
        engine = create_engine(URL(
            account=ACCOUNT,
            user= USERNAME,
            password= PASSWORD,
            role="ACCOUNTADMIN",
            warehouse="COMPUTE_WH",
            database="HEALTH_DB",
            schema="PUBLIC"
        ))

        # Connecting to the DB and executing the query
        with engine.connect() as conn:

            # loading the test data
            result = conn.execute(text(QUERY))
            test_data = pd.DataFrame(result.fetchall())
            test_data.columns = result.keys()
            mail_lst.append("STEP-1: Successfully loaded the test data ")


            # appplying the preprocessing steps
            test_data.columns = [col.upper() for col in test_data.columns.tolist()]
            test_preprocessed = preprocess_data(test_data)
            mail_lst.append("STEP-2: Successfully applied the data preprocessing on test data ")

            # applying the feature selection by calling the helper function to verify the feautures of incoming data
            final_features = pd.read_pickle("artifacts/final_features.pkl")
            test_data_final = check_create_model_features(test_preprocessed,final_features)
            mail_lst.append("STEP-3: Successfully applied the feature selection")

            # getting the predictions
            model = xgboost.XGBRegressor()
            model.load_model("artifacts/xgb.model")
            test_data_final['LOS_PREDICTED'] = np.ceil(model.predict(test_data_final))
            mail_lst.append("STEP-4: Successfully got the predictions")


            # wrirting the dataframe into a table
            test_data_final.reset_index(inplace=True)
            predictions = test_data_final[['CASE_ID','LOS_PREDICTED']]
            logs = pd.merge(test_data,test_data_final[['CASE_ID','LOS_PREDICTED']],on="CASE_ID")
            status = insert_predictions(logs,conn,engine)
            mail_lst.append("STEP-5: Successfully wrote the predictions to snowflake table")       

            # creating the mail body and sending the notifications
            mail_string = ",\n".join(map(str,mail_lst))
            send_mail(mail_string)

            print(status)
            return status

Appending to utils.py


In [13]:
from utils import deploy
deploy()

Success
Successfully sent mails
Success


'Success'

### Deployment Script

In [1]:
from utils import deploy

# defining the timezone
tz_ny = pytz.timezone('Asia/Kolkata')

# defining the time of running
schduled_time = ["11:35"]

while True:
    # getting the timestamp
    now=datetime.now(tz_ny)
    
    # extracting hours,minute from the timestamp
    hour = str(now.hour)
    minute = str(now.minute)
    
    # formatting the current time
    curr_time = f"{hour}:{minute}"
    
    if curr_time in schduled_time:
        deploy()
        break
        
    