In [None]:
# Job parameters: job_id is written into the JOB_ID output column and, together
# with job_name, forms the prefix of the staged CSV file name.
job_id='11'
job_name= 'PACKAGECOUNT_FACILITY_DELIVERY'

# Snowflake input credentials and session settings.
# NOTE(review): values are placeholders hardcoded in the notebook — consider
# injecting them from the environment or a secrets manager instead.
user_id='xxxx'
passcode='xxxx'
WAREHOUSE='FUSION_CS_DWH'
snowflake_account='xxxx'

# Snowflake output location (database / schema / table) used when writing results.
snowflake_output_database = 'FUSION_FDR_DB'
# NOTE(review): "ANOMOLY" spelling is part of the identifier as written here —
# verify against the actual schema name before changing it.
snowflake_output_schema = 'FUSION_ANOMOLY_DEV_SCHEMA'
snowflake_output_table = 'ANOMALY_LIVE_TIMESERIES'

# SNS topic ARN for failure alerts and the CloudWatch Logs group for run logs.
sns_arn = 'xxxx'
log_group_name = 'xxxx'

In [None]:
###################
# SETUP LOGGER
###################
import time
import boto3
import traceback

# boto3 clients for alerting (SNS) and logging (CloudWatch Logs), both in us-west-2.
sns = boto3.client("sns", region_name='us-west-2')
logs = boto3.client("logs", region_name='us-west-2')
# Empty string means "no sequence token yet"; put_log_events overwrites the
# last slot of params after every call.
next_sequence_token = ''


# The epoch-seconds suffix makes the log stream name differ between runs.
log_stream_name= 'Package Count Facility Scan Event Tracking' + str(int(time.time()))

# Shared logging context, read by position in the helper functions:
#   [0] SNS client, [1] CloudWatch Logs client, [2] SNS topic ARN,
#   [3] log group name, [4] log stream name, [-1] next sequence token.
params = [sns, logs, sns_arn, log_group_name, log_stream_name, next_sequence_token]

def publish_multi_message(params, error_msg):
    """Send a failure alert through SNS and return the published message id.

    Args:
        params: Shared logging context; index 0 is the SNS client and
            index 2 is the topic ARN.
        error_msg: Text appended to the alert body (must be a ``str``).

    Returns:
        The ``MessageId`` value from the SNS publish response.
    """
    client, topic_arn = params[0], params[2]

    # Change the subject by replacing with the appropriate name of the anomaly
    subject = "Anomaly Package Count Scan Event Failed"
    body = (
        "Anomaly Package Count Scan Event Failed. Please look into the logs."
        "\nHere is the error msg:\n " + error_msg
    )

    publish_result = client.publish(TopicArn=topic_arn, Message=body, Subject=subject)
    return publish_result['MessageId']

def create_log_stream(params):
    """Create the CloudWatch log stream described by ``params``.

    Args:
        params: Shared logging context; index 1 is the CloudWatch Logs
            client, index 3 the log group name, index 4 the stream name.

    Returns:
        The raw response from the client's ``create_log_stream`` call.
    """
    client = params[1]
    return client.create_log_stream(
        logGroupName=params[3],
        logStreamName=params[4],
    )

def put_log_events(params, message):
    """Write one log event to the CloudWatch stream described by ``params``.

    Args:
        params: Shared logging context; index 1 is the CloudWatch Logs
            client, index 3 the log group name, index 4 the stream name and
            the last element the sequence token from the previous call
            (empty or ``None`` when there is none).
        message: Text of the log event.

    Returns:
        The same ``params`` list, with its last element replaced by the
        ``nextSequenceToken`` from the response (or ``''`` when the response
        does not carry one).
    """
    logs = params[1]
    next_sequence_token = params[-1]

    # Build the request once; the sequence token is the only optional part.
    request = {
        'logGroupName': params[3],
        'logStreamName': params[4],
        'logEvents': [{
            'timestamp': int(time.time() * 1000),
            'message': message
        }],
    }
    # Truthiness check also tolerates None, which len() would reject.
    if next_sequence_token:
        request['sequenceToken'] = next_sequence_token

    response = logs.put_log_events(**request)

    # nextSequenceToken is a deprecated response field; do not fail the whole
    # job with a KeyError if the service stops returning it.
    params[-1] = response.get("nextSequenceToken") or ''
    return params

def log_and_publish_to_cloud(params):
    """Log the exception currently being handled and send an SNS alert.

    Writes a marker line and the formatted traceback to CloudWatch via
    ``put_log_events``, then publishes the same traceback through
    ``publish_multi_message``.

    Args:
        params: Shared logging context passed through to the helpers.

    Returns:
        The ``params`` list as returned by the last ``put_log_events`` call.
    """
    params = put_log_events(params, "Exception Occurred!!!")

    # format_exc() describes the exception being handled by the caller.
    trace_text = traceback.format_exc()
    params = put_log_events(params, trace_text)
    publish_multi_message(params, trace_text)
    return params

# Create this run's log stream before any put_log_events call writes to it.
create_log_stream(params)

# The string below is a no-op expression: a copy-paste template of the
# log-then-alert pattern used around the pipeline steps in later cells.
"""
params = put_log_events(params, "Uploading is completed !!!")
except Exception as e:
    params = log_and_publish_to_cloud(params)
    raise e
"""

In [None]:
# Imports are wrapped in try/except so that a missing package is logged to
# CloudWatch and alerted through SNS before the error propagates.
try:
    #import packages
    params = put_log_events(params, "Importing the packages")
    import numpy as np
    import pandas as pd
    from datetime import datetime, timedelta
    import pytz
    import snowflake.connector
    import itertools
    from prophet import Prophet
    import matplotlib.pyplot as plt
    import time  # also imported in the logger-setup cell; re-importing is harmless
    # NOTE(review): wildcard import hides which tqdm names the notebook relies on.
    from tqdm import *
    import warnings
except Exception as e:
    # Log the failure and send the alert, then re-raise so it still surfaces.
    params = put_log_events(params, "Error occured while importing the packages")
    params = log_and_publish_to_cloud(params)
    raise e

In [None]:
#Training Parameters
#total days of history queried from Snowflake
test_last_days = 31

# length of training period (days)
training_period=test_last_days-1

# training interval increment (days)
interval=1

# window between checkpoint (hours), written as '<hours>H'
window='6H'


def forecasts_per_day(window):
    """Return how many checkpoint windows of the given size fit into 24 hours.

    Args:
        window: Hourly window string such as ``'6H'`` or ``'12H'``; a bare
            ``'H'`` is treated as one hour.

    Returns:
        ``24 / hours`` as a float (``4.0`` for ``'6H'``).

    Raises:
        ValueError: If the part before the ``H`` suffix is not an integer.
    """
    # Parse every digit before the suffix; reading only window[0] would turn
    # '12H' into 1 hour and silently produce 24 forecasts instead of 2.
    hours_text = window.rstrip('Hh')
    hours = int(hours_text) if hours_text else 1
    return 24 / hours


#number of forecast (based on windows between checkpoint)
number_of_forecast = forecasts_per_day(window)

#group-by variable for printing during training
category_1='Facility'
category_2='Event'

#list of confidence interval
confidence_interval=[0.80,0.95]

# Format output file
monitor_name = 'PackageCount_Facility_Delivery'
lob = 'Tracking'
version = 1.0

In [None]:
#start and end date filter for data query
# et is today's date in US/Central; st is test_last_days days before it.
et=datetime.now(pytz.timezone('US/Central')).date()
st=et-timedelta(days=test_last_days)

#query from Snowflake
# Counts tracking events per hour, facility (Category_1) and event key
# (Category_2), restricted to the listed facilities/event keys and to event
# dates between st and et inclusive. The two dates are spliced into the SQL
# text as quoted 'YYYY-MM-DD 00:00:00' literals.
query = '''
            select date_trunc('HOUR',eventrptdatetime) as Event_Time,
            eventfacilityname AS Category_1,trackingeventkey AS Category_2,count(1) AS Actual

            from "FDR_DWH_DB_PROD"."RETURNS"."REVTRACKINGEVENT" 
            where date(eventrptdatetime) >='''+"'"+str(st)+" 00:00:00'"+'''
            and date(eventrptdatetime) <= '''+"'"+str(et)+" 00:00:00'"+'''
            and eventfacilityname in ('IND3','EWR2','ATL4','ONT1','RNO2','DFW5') 
            and trackingeventkey in (7,14,33,335,334,333) 
            group by Event_Time,eventfacilityname,trackingeventkey;
            '''
#             and eventfacilityname in ('IND3','EWR2','ATL4','ONT1','RNO2','DFW5','CVG1','ORD2','BWI1','MCO1') 
#             and trackingeventkey in (7,14,33,335,334,333) 


In [None]:
import sys
# sys.path.append("/path/to/file/")
# Put the job's helper directory on the import path so functions.py resolves.
# NOTE(review): the directory is prefixed "10_" while job_id above is '11' —
# confirm this is the intended folder.
sys.path.append("/10_PACKAGECOUNT_FACILITY_DELIVERY/")
#import from functions.py
from functions import createConnection,extractData,exploratory_data_analysis,preprocessing,modeling_production_live,output_cleaning,formatted_csv

In [None]:
#connect to Snowflake and query data
# Each step logs its start to CloudWatch; on failure the traceback is logged,
# an SNS alert is sent, and the exception is re-raised.
try:
    params = put_log_events(params, "Creating the snowflake connector")
    # Return value is discarded here; presumably functions.py keeps the
    # connection for extractData — TODO confirm.
    createConnection(user_id,passcode)
except Exception as e:
    params = log_and_publish_to_cloud(params)
    raise e

try:
    params = put_log_events(params, "Extracting the data")
    # df holds the query result for the st..et date range.
    df=extractData(st,et,query)
except Exception as e:
    params = log_and_publish_to_cloud(params)
    raise e

In [None]:
#EDA
# Runs the exploratory analysis helper on the extracted frame; its return
# value (if any) is not kept. Failures are logged, alerted and re-raised.
try:
    params = put_log_events(params, "Performing the EDA")
    exploratory_data_analysis(df)
except Exception as e:
    params = log_and_publish_to_cloud(params)
    raise e

In [None]:
#preprocessing data to train
# Note: df is rebound to the preprocessed frame, so re-running this cell
# feeds already-preprocessed data back into preprocessing().
try:
    params = put_log_events(params, "Preprocessing the Data")
    df=preprocessing(df,window)
except Exception as e:
    params = log_and_publish_to_cloud(params)
    raise e

#get list of category_1 and 2
# Distinct values of each category column, passed to the modeling step.
category_1_list=list(df['CATEGORY_1'].unique())
category_2_list=list(df['CATEGORY_2'].unique())

In [None]:
# Display the preprocessed frame as the cell output.
df

In [None]:
#modeling
# Runs the production modeling helper with the training parameters and the
# category lists derived from df; failures are logged, alerted and re-raised.
try:
    params = put_log_events(params, "Running the model")
    final_output=modeling_production_live(category_1,category_2,confidence_interval,category_1_list,category_2_list,df,window,training_period,interval,number_of_forecast)
except Exception as e:
    params = log_and_publish_to_cloud(params)
    raise e

In [None]:
#cleaning the Prophet output
# Combines the model output with df through output_cleaning(); failures are
# logged, alerted and re-raised.
try:
    params = put_log_events(params, "Cleaning the output")
    cleaned_output=output_cleaning(final_output,df)
except Exception as e:
    params = log_and_publish_to_cloud(params)
    raise e

# Display the cleaned output as the cell output.
cleaned_output

In [None]:
#format CSV file
# Builds the final results frame from the cleaned output plus the monitor
# name, line of business and version set in the parameters cell.
try:
    params = put_log_events(params, "Formatting the cleaned output")
    results=formatted_csv(cleaned_output,monitor_name,lob,version)
except Exception as e:
    params = log_and_publish_to_cloud(params)
    raise e
    
# Display the formatted results as the cell output.
results

In [None]:
#results structure
# Print column names, dtypes and non-null counts of the frame about to be written.
results.info()

In [None]:
from snowflake.connector.pandas_tools import pd_writer
import os

def write_to_snowflake(data, params):
    """Load ``data`` into the Snowflake output table through its table stage.

    Steps: prepend a ``JOB_ID`` column, write the frame as a pipe-delimited
    CSV under ``job_output/``, PUT the file to the output table's stage,
    COPY it into the table, and finally remove the staged file.

    Args:
        data: DataFrame to upload. It is copied first, so the caller's frame
            is left unchanged.
        params: Shared logging context used by ``put_log_events``.

    Returns:
        None.

    Raises:
        Exception: Any error from pandas, the file system or Snowflake is
            logged to CloudWatch and re-raised unchanged.
    """
    # Pre-bind everything the cleanup section inspects, so an early failure
    # (e.g. the connection cannot be opened) is not masked by a NameError.
    ctx = None
    cs = None
    inputFileName = None
    upload_attempted = False
    stage_name = '%' + snowflake_output_table

    try:
        print("Writing to Snowflake")

        #Append Job Parameters in OutputDataFrame
        # Insert into a copy: inserting into the caller's frame makes a second
        # run of the cell fail because JOB_ID already exists.
        data = data.copy()
        data.insert(0,'JOB_ID',job_id)

        ctx = snowflake.connector.connect(
            user=user_id,
            password=passcode,
            account=snowflake_account
        )
        #create a cursor object.
        cs = ctx.cursor()

        #Use Snowflake_Output_Schema
        cs.execute("USE WAREHOUSE " + WAREHOUSE)
        cs.execute("USE DATABASE " + snowflake_output_database)
        cs.execute("USE SCHEMA " + snowflake_output_schema)

        #Create File Format
        Format_name = "ANOMOLY_FILE_FORMAT"
        fileFormatQuery = "CREATE OR REPLACE FILE FORMAT " + Format_name + " TYPE = 'CSV' NULL_IF = ('') EMPTY_FIELD_AS_NULL = TRUE field_delimiter = '|'"
        cs.execute(fileFormatQuery)
        inputFileName = str(job_id)+'_'+str(job_name)+'_'+datetime.now().strftime('%Y%m%d_%H%M%S')+'.csv'
        params = put_log_events(params, "Input Filename is "+inputFileName)

        #Create Local Job Output Directory
        os.makedirs("job_output", exist_ok=True)

        # save file to local
        data.to_csv('job_output/'+inputFileName,index=False,header=False,sep='|')

        # upload to stage (the table stage of the configured output table)
        uploadToStageQuery = 'put file://job_output/'+inputFileName+' @'+stage_name+';'
        params = put_log_events(params, "Query to upload the file to the stage table " + uploadToStageQuery)
        upload_attempted = True
        cs.execute(uploadToStageQuery)
        params = put_log_events(params, "Data Uploaded to the stage table is with filename " + inputFileName)

        # copy to table
        copyIntoQuery = 'COPY INTO "'+snowflake_output_table+'" FROM @"'+stage_name+'" FILE_FORMAT = "'+Format_name+'"'
        params = put_log_events(params, copyIntoQuery)
        cs.execute(copyIntoQuery)
        params = put_log_events(params, "Data Uploaded to the table " + snowflake_output_table)

    except Exception:
        params = put_log_events(params, "Exception occurred!!!")
        raise
    finally:
        # Clean up only what was actually created, and always release the
        # cursor and the connection.
        try:
            if cs is not None:
                try:
                    if upload_attempted:
                        # Remove Staged Files from Staged
                        removeStagedQuery = 'rm @'+stage_name+'/'+str(inputFileName) + '.gz'
                        cs.execute(removeStagedQuery)
                finally:
                    cs.close()
        finally:
            if ctx is not None:
                ctx.close()

In [None]:
# Upload the formatted results. write_to_snowflake already logs
# "Exception occurred!!!" before re-raising; the handler here additionally
# logs the traceback and sends the SNS alert.
try:
    params = put_log_events(params, "Writing the data to the snowflake")
    write_to_snowflake(results, params) 
except Exception as e:
    params = log_and_publish_to_cloud(params)
    raise e