# Line-by-line test of the emailer pipeline

This notebook runs the main script `0_email_maker` line by line and shows the output of each step for debugging. The main script follows these steps:
1. import installed packages and supporting modules
2. set up directories and logging
3. grab data from the AWS database
4. random assignment
5. save at-risk permit data and assignments
6. generate emails with at-risk permit data
7. save email files and sync with the S3 bucket in whippet

This notebook runs each code block and validates its output.

## Prerequisites for replication:
1. must have Sherlock OAK and GROUP_SCRATCH mounted on your local machine, see [guide](https://asconfluence.stanford.edu/confluence/display/REGLAB/Mount+Sherlock+Folders+to+your+local+machine+-SSHFS).
2. must have saved OAK and GROUP_SCRATCH as environment variables in your `.bash_profile` or `.zshrc` file. For example, 
```bash
# sherlock directories
export OAK="$HOME/sherlock_oak"
export GROUP_SCRATCH="$HOME/sherlock_group_scratch"
```
3. must have the `esnc_risk_notif` git repo cloned to your local machine
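
The shell does not expand a tilde inside a quoted `export` value, so code that reads these variables can receive a literal `~`. Here is a minimal sketch of resolving a mount point defensively. It assumes only the variable names above. The helper `resolve_mount_dir` is hypothetical and is not part of `configs`:

```python
import os


def resolve_mount_dir(var_name: str) -> str:
    """Return an absolute path for a mount point stored in an environment variable.

    Hypothetical helper: expands a literal '~' (left unexpanded when the shell
    value was quoted) and fails loudly if the variable is not set.
    """
    raw = os.environ.get(var_name)
    if raw is None:
        raise KeyError(f"Environment variable {var_name} is not set; see the prerequisites.")
    return os.path.abspath(os.path.expanduser(raw))


# print what the notebook would see for each prerequisite variable
for name in ("OAK", "GROUP_SCRATCH"):
    try:
        print(name, "->", resolve_mount_dir(name))
    except KeyError as err:
        print(err)
```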

In [None]:
# set the working directory to the repo root, where 0_email_maker lives
# (run this cell only once: each rerun moves up another directory)
import os 

os.chdir('..')
os.getcwd()

### Step 1: import packages

In this step, we check that all required packages are installed in the environment and that the supporting modules import correctly.
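
A failed import only reports the first missing module. This minimal sketch lists every missing top-level module at once with `importlib.util.find_spec`. It also checks for the AWS CLI, which Step 8 calls through `subprocess`. The module list is taken from the import cell, and the helper names are hypothetical. Run it after the `os.chdir('..')` cell so that `configs` and `utilities` can be found:

```python
import importlib.util
import shutil

# Top-level modules imported in the cell below. Dotted names such as
# "utilities.sql_grab" are checked via their top-level package, because
# find_spec() imports the parent package when given a dotted name.
REQUIRED_MODULES = ["numpy", "pandas", "configs", "utilities"]


def missing_modules(names):
    """Return the module names that cannot be found on the current sys.path."""
    return [name for name in names if importlib.util.find_spec(name) is None]


def aws_cli_available():
    """Step 8 shells out to `aws s3 sync`, so the AWS CLI must be on PATH."""
    return shutil.which("aws") is not None


print("missing modules:", missing_modules(REQUIRED_MODULES))
print("aws CLI on PATH:", aws_cli_available())
```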

In [None]:
# import installed packages
import os
import argparse
import subprocess

import random
import numpy as np
import pandas as pd
import datetime as dt

# import supporting modules
import configs
from utilities import sql_grab
from utilities import sql_save
from utilities import sql_queries
from utilities import json_functions
from utilities import templating
from utilities import random_assignment

In [None]:
# hard-code the arguments that the script normally parses from the command line
mode = 'test'
model_id = 'esnc_notif_2021Q4_2021-08-03_122849_569766'
quarter = '2021Q4'

### Step 2: set up directories and logging

In this step, we configure the run directories and the log file. We expect the global variables from `configs` to be read correctly and the log to print the relevant lines.
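
The code cell below builds the run id from `str(runtime)` with a chain of `replace` calls. One caveat: `str()` of a `datetime` omits the `.ffffff` part when the microsecond is exactly 0, so an occasional id would be shorter. This minimal sketch builds the same layout with `strftime`, which always includes microseconds. `make_run_id` is a hypothetical helper, and the format is inferred from the example `model_id` above:

```python
import datetime as dt


def make_run_id(quarter: str, runtime: dt.datetime) -> str:
    """Build a run id such as '2021Q4_2021-08-03_122849_569766'.

    Same layout as the str()/replace() chain, but strftime always emits the
    microseconds, whereas str(datetime) drops them when microsecond == 0.
    """
    return f"{quarter}_{runtime.strftime('%Y-%m-%d_%H%M%S_%f')}"


print(make_run_id("2021Q4", dt.datetime(2021, 8, 3, 12, 28, 49, 569766)))
```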

#### code block

In [None]:
print(configs.HELPER_TEXT_EMAIL_MAKER)
print('===== Start running email maker =====')

# ## get parsed variables
# args = get_args()
# mode = args.mode
# model_id = args.model_id
# quarter = args.quarter

## get global variables 
engine = configs.GET_ENGINE()
bucket = configs.BUCKET
seed = configs.SEED
states_list = configs.STATES_LIST
classification_threshold = configs.CLASSIFICATION_THRESHOLD

## generate run id
runtime = dt.datetime.now()
runtime_str = str(runtime).replace(' ', '_').replace(':', '').replace('.', '_')
run_id = quarter + '_' + runtime_str 

## create folders for the current run 
oak_project_dir = configs.GET_OAK_PROJECT_DIR()
s3_project_dir = configs.S3_PROJECT_DIR
oak_run_dir = os.path.join(oak_project_dir, mode, run_id)
assert not os.path.exists(oak_run_dir), "The output directory already exists. Aborting."

oak_log_dir = os.path.join(oak_run_dir, 'logs')
oak_emails_dir = os.path.join(oak_run_dir, 'emails')
for directory in [oak_run_dir, oak_log_dir, oak_emails_dir]:
    os.makedirs(directory)

In [None]:
## configure logging
logger, log_capture_string = configs.configure_logging(logger_name = 'email_maker')
logger.info(configs.HELPER_TEXT_EMAIL_MAKER)
logger.info("Configured logger")
logger.info("----- Parsed variables: mode = {}, model_id = {}, quarter = {}".format(mode, model_id, quarter))
logger.info("----- Project variables: states_list = {}, classification_threshold = {}".format(states_list, classification_threshold))
logger.info("----- Sherlock OAK folders: oak_run_dir = {}".format(oak_run_dir))
logger.info("----- S3 bucket: s3_project_dir = {}".format(s3_project_dir))
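
The cell above relies on `configs.configure_logging` returning a logger and a string buffer, `log_capture_string`. That buffer is printed in the validation cell and written to `email_maker.log` in Step 8. The real implementation is not reproduced here. This minimal sketch assumes it attaches a `logging.StreamHandler` to an `io.StringIO`. The function name `configure_logging_sketch` and the format string are hypothetical:

```python
import io
import logging


def configure_logging_sketch(logger_name: str):
    """Hypothetical stand-in for configs.configure_logging.

    Returns a logger plus the StringIO buffer it writes to, so the captured
    text can later be printed or saved to a log file.
    """
    log_capture_string = io.StringIO()
    handler = logging.StreamHandler(log_capture_string)
    handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(message)s"))

    logger = logging.getLogger(logger_name)
    logger.setLevel(logging.INFO)
    # Rerunning a notebook cell would otherwise stack duplicate handlers
    # on the same named logger and repeat every line.
    for old in list(logger.handlers):
        logger.removeHandler(old)
    logger.addHandler(handler)
    logger.propagate = False  # keep messages out of the root logger's output
    return logger, log_capture_string
```

Clearing the old handlers is what makes the cell safe to rerun. A plain `addHandler` call on the same logger name would duplicate each log line.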

In [None]:
## print out variables and let the user confirm if they are correct and whether to proceed. 
print("----- Parsed variables: mode = {}, model_id = {}, quarter = {}".format(mode, model_id, quarter))
print("----- Project variables: states_list = {}, classification_threshold = {}".format(states_list, classification_threshold))
print("----- Sherlock OAK folders: oak_run_dir = {}".format(oak_run_dir))
print("----- S3 bucket: s3_project_dir = {}".format(s3_project_dir))

proceed = input('Please verify the above variables. Do you wish to proceed with the run? [y/n]')
assert proceed.strip().lower() == 'y', "Run cancelled by user. Aborting."

#### validate variables

In [None]:
print(log_capture_string.getvalue())

### Step 3: grab data from the AWS database

In this step, we pull data from the AWS database with the SQL queries in `utilities.sql_queries`. We expect to get the at-risk permits data (restricted to permits with historical violation records) and the database update date.
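
The filtering in the cell below keeps only permits whose `known_violations` is not missing. It then requires at least one permit to remain. Here is a minimal sketch of the same logic on a toy frame. The `permit_id` column and its values are made up, and the helper name is hypothetical. It raises `ValueError` instead of using `assert`, because assertions are skipped when Python runs with `-O`:

```python
import pandas as pd


def drop_permits_without_violations(data: pd.DataFrame):
    """Keep rows with a known_violations value; return (kept rows, number dropped)."""
    missing = data["known_violations"].isna()
    kept = data.loc[~missing]
    if kept.empty:
        raise ValueError("Expect at least one at-risk permit. Aborting.")
    return kept, int(missing.sum())


toy = pd.DataFrame({
    "permit_id": ["KY0000001", "MD0000002", "TN0000003"],  # hypothetical ids
    "known_violations": [3, None, 1],
})
kept, n_missing = drop_permits_without_violations(toy)
print(f"Excluding {n_missing} permits; {len(kept)} remain.")
```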

#### code block

In [None]:
logger.info("===== 1/6 Get model result table and database update date =====")

data = sql_grab.get_at_risk_permits(model_id = model_id, states_list = states_list, classification_threshold = classification_threshold, engine = engine)
## subset to permits with historical violation records
missing_violation_count = data.known_violations.isna().sum()
logger.info(f"Excluding {missing_violation_count} permits without any historical violation records from DMRs in the past three years.")
data = data[~data.known_violations.isna()]
assert len(data) != 0, "Expect at least one at-risk permit. Aborting."
logger.info(f"Number of at-risk permits = {len(data)}")

database_update_date = sql_grab.get_database_update_date(engine = engine)
logger.info(f"RegLab AWS database update date: {database_update_date}")

#### validate output

In [None]:
data.head()

### Step 4: random assignment

In this step, we run the multi-level randomization scheme. We expect the three levels to run in sequence, the `notification_flag`, `sample_violation_flag`, and `sample_pollutant_flag` columns to be complete (no missing assignments), and the log to report the number of permits at each level of randomization.
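
How `random_assignment.level_one_randomization` splits permits is not shown in this notebook. This minimal sketch shows one common way to do a seeded 50-50 split: complete randomization, which treats a fixed number of rows, as opposed to independent coin flips. Whether the real function works this way is an assumption, and `level_one_sketch` is hypothetical:

```python
import numpy as np
import pandas as pd


def level_one_sketch(df: pd.DataFrame, treatment_fraction: float, random_seed: int) -> pd.DataFrame:
    """Hypothetical stand-in for level-one randomization.

    Treats exactly round(n * treatment_fraction) rows, chosen from a seeded
    random permutation, so the split is reproducible for a fixed seed.
    """
    n = len(df)
    n_treated = int(round(n * treatment_fraction))
    rng = np.random.default_rng(random_seed)
    treated_positions = rng.permutation(n)[:n_treated]
    flag = np.zeros(n, dtype=bool)
    flag[treated_positions] = True
    out = df.copy()  # leave the caller's frame untouched
    out["notification_flag"] = flag
    return out


toy = pd.DataFrame({"permit_id": [f"P{i}" for i in range(10)]})  # made-up permits
print(level_one_sketch(toy, treatment_fraction=0.5, random_seed=42))
```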

#### code block

In [None]:
logger.info("====== 2/6 Random Assignment ======")
logger.info("--- Level 1: randomly split into 50-50. We will send notifications to half of the permits.")
data = random_assignment.level_one_randomization(df=data, treatment_fraction=0.5, random_seed=seed)
assert 'notification_flag' in data.columns.tolist(), "Expect a column named notification_flag after level one randomization. Aborting."
assert sum(data.notification_flag.isna()) == 0, "Expect every permit to have an assignment. Aborting."

In [None]:
logger.info("--- Level 2: for those we send a notification to, see if there are violation records in the past quarter. if there are violation records: split half and half, include a violation sample in half of them")
data = random_assignment.level_two_randomization(df=data, treatment_fraction=1, random_seed=seed)
assert 'sample_violation_flag' in data.columns.tolist(), "Expect sample_violation_flag to be in the column list of data. Aborting."
assert sum(data.sample_violation_flag.isna()) == 0, "Expect every permit to have an assignment. Aborting."

In [None]:
logger.info("--- Level 3: within the facilities for which we will include a sample violation, we will randomize which pollutant to show in the email")
data = random_assignment.level_three_randomization(df=data, random_seed=seed)
assert 'sample_pollutant_flag' in data.columns.tolist(), "Expect sample_pollutant_flag to be in the column list. Aborting."
assert sum(data.sample_pollutant_flag.isna()) == 0, "Expect every permit to have an assignment. Aborting."
assert sum(data[data.sample_violation_flag].selected_violation.isna()) == 0, "Expect permit with sample_violation_flag on to have a selected violation to show. Aborting."

In [None]:
logger.info('We have {} at risk permits in total.'.format(len(data)))
logger.info("We will send a notification to {} permits.".format(sum(data.notification_flag)))
logger.info("We will include a sample violation for {} permits.".format(sum(data.sample_violation_flag)))
logger.info("We will randomize disclosure of pollutant for {} permits.".format(sum(data.sample_pollutant_flag)))

#### validate output

In [None]:
# pay special attention to sample_violation_flag: every permit with sample_violation_flag == True should have a control pollutant
data[data.sample_violation_flag]

In [None]:
print(log_capture_string.getvalue())

### Step 5: save at-risk permit data and assignment

In this step, we upload the at-risk permit assignments to RegLab's AWS database. We expect to be able to read the table back from the database.
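
Step 5 expects a second save in `prod` mode to fail. How `sql_save.save_at_risk_permits_assignment` guards against this is not shown here. One common approach is pandas' `to_sql(..., if_exists="fail")`, which raises `ValueError` when the table already exists. This minimal sketch runs against an in-memory SQLite database. The table name and columns are illustrative, not the real schema:

```python
import sqlite3

import pandas as pd

assignments = pd.DataFrame({
    "permit_id": ["KY0000001", "MD0000002"],  # hypothetical ids
    "notification_flag": [1, 0],
})

with sqlite3.connect(":memory:") as conn:
    # First write creates the table.
    assignments.to_sql("at_risk_permits", conn, index=False, if_exists="fail")
    # Read it back, as the validation cell does against the real database.
    round_trip = pd.read_sql("SELECT * FROM at_risk_permits", conn)
    # A second write with if_exists="fail" refuses to touch the existing table.
    try:
        assignments.to_sql("at_risk_permits", conn, index=False, if_exists="fail")
        second_write_failed = False
    except ValueError:
        second_write_failed = True

print(round_trip)
print("second write failed:", second_write_failed)
```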

#### code block

In [None]:
logger.info("====== 3/6 Save at risk permit data and treatment assignment to database ======")
data['file_timestamp'] = runtime
sql_save.save_at_risk_permits_assignment(mode=mode, data=data, engine=engine)

#### if mode is prod: try saving again (this should raise an error)

In [None]:
if mode == 'prod':
    sql_save.save_at_risk_permits_assignment(mode=mode, data=data, engine=engine)

#### validate output

In [None]:
if mode == 'prod':
    schema = 'esnc_risk_notif'
    table = 'at_risk_permits'
elif mode == 'test':
    schema = 'sandbox'
    table = 'esnc_notif_at_risk_permits'
with engine.begin() as conn:
    df = pd.read_sql(f"""
    SELECT *
    FROM {schema}.{table}
    """, conn)
df.head()

In [None]:
df.columns

### Step 6: generate emails with at-risk permit data

In this step, we generate emails from the at-risk permit data and the email templates. We expect to see the sample emails rendered as HTML.
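
The validation cells only rely on each email dict having a `body` key that holds HTML. This minimal sketch fills an HTML template from one permit record with `string.Template` and escapes every value before insertion. The template text, the `permit_id` field, and `generate_email_dict_sketch` are hypothetical. The real templates live in `utilities.templating`:

```python
import html
from string import Template

# Hypothetical template; the real one is defined in utilities.templating.
BODY = Template(
    "<p>Permit <b>$permit_id</b> ($permit_state) has been flagged as at risk.</p>"
    "<p>Data current as of $update_date.</p>"
)


def generate_email_dict_sketch(record: dict, database_update_date: str) -> dict:
    """Fill the HTML template from one permit record, escaping every value."""
    fields = {
        "permit_id": html.escape(str(record["permit_id"])),
        "permit_state": html.escape(str(record["permit_state"])),
        "update_date": html.escape(str(database_update_date)),
    }
    return {"permit_id": record["permit_id"], "body": BODY.substitute(fields)}


email = generate_email_dict_sketch({"permit_id": "MD0000002", "permit_state": "MD"}, "2021-07-31")
print(email["body"])
```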

#### code block

In [None]:
logger.info('===== 4/6 Generate emails with at-risk permits data and email templates =====')
# subset to permits that we will send the notifications to
notif_group = data[data.notification_flag]
data_dict = notif_group.to_dict('records')
email_dicts = [templating.generate_email_dict(record, database_update_date) for record in data_dict]

#### validate output

Display one sample email for each of the following cases:

1. KY - without sample violation
2. KY - with sample violation
3. MD - without sample violation
4. MD - with sample violation
5. TN - without sample violation
6. TN - with sample violation
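
Each of the cells below repeats the same lookup, and `np.where(...)[0][0]` raises `IndexError` when a combination has no permit. This minimal sketch of a helper returns `None` in that case so the missing case is explicit. `first_position` is hypothetical, and the toy frame uses only the `permit_state` and `sample_violation_flag` columns from the notebook:

```python
import numpy as np
import pandas as pd


def first_position(notif_group: pd.DataFrame, state: str, with_sample: bool):
    """Return the position of the first permit matching (state, flag), or None.

    Positions index into email_dicts, which follows notif_group's row order
    because it was built from notif_group.to_dict('records').
    """
    mask = (notif_group["permit_state"] == state) & (notif_group["sample_violation_flag"] == with_sample)
    positions = np.flatnonzero(mask.to_numpy())
    return int(positions[0]) if len(positions) else None


toy = pd.DataFrame({
    "permit_state": ["KY", "MD", "MD", "TN"],
    "sample_violation_flag": [True, False, True, True],
})
for state in ["KY", "MD", "TN"]:
    for with_sample in [False, True]:
        print(state, with_sample, first_position(toy, state, with_sample))
```

In the notebook, usage would look like `i = first_position(notif_group, 'KY', False)`, followed by `display(HTML(email_dicts[i]['body']))` only when `i is not None`.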

In [None]:
assert len(email_dicts) == len(data_dict), "expect the number of email dicts to be the same as the number of info dicts."

In [None]:
from IPython.display import display, HTML

# 1. KY - without sample violation - this should throw an error
i = np.where((~notif_group.sample_violation_flag) & (notif_group.permit_state == 'KY'))[0][0]
email_dict = email_dicts[i]
display(HTML(email_dict['body']))

In [None]:
# 2. KY - with sample violation
i = np.where((notif_group.sample_violation_flag) & (notif_group.permit_state == 'KY'))[0][0]
email_dict = email_dicts[i]
display(HTML(email_dict['body']))

In [None]:
# 3. MD - without sample violation
i = np.where((~notif_group.sample_violation_flag) & (notif_group.permit_state == 'MD'))[0][0]
email_dict = email_dicts[i]
display(HTML(email_dict['body']))

In [None]:
# 4. MD - with sample violation
i = np.where((notif_group.sample_violation_flag) & (notif_group.permit_state == 'MD'))[0][0]
email_dict = email_dicts[i]
display(HTML(email_dict['body']))

In [None]:
# 5. TN - without sample violation
i = np.where((~notif_group.sample_violation_flag) & (notif_group.permit_state == 'TN'))[0][0]
email_dict = email_dicts[i]
display(HTML(email_dict['body']))

In [None]:
# 6. TN - with sample violation
i = np.where((notif_group.sample_violation_flag) & (notif_group.permit_state == 'TN'))[0][0]
email_dict = email_dicts[i]
display(HTML(email_dict['body']))

#### Additional examples
1. when number of parameters triggering ESNC or warning last quarter is 0
2. when the number of quarters in ESNC in the past three years is 0

In [None]:
# 1. when number of parameters triggering ESNC or warning last quarter is 0
i = np.where((notif_group.sample_violation_flag) & (~notif_group.triggered_esnc_past_quarter_flag) & (~notif_group.warning_past_quarter_flag) & (notif_group.permit_state != 'KY'))
if len(i[0]) > 0: 
    i = i[0][0]
    email_dict = email_dicts[i]
    display(HTML(email_dict['body']))

In [None]:
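# 2. when the number of quarters in ESNC in the past three years is 0
# a zero count appears to come back as missing (NaN); `Series == None` never matches in pandas, so use isna()
i = np.where(notif_group.num_quarters_in_esnc.isna())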
if len(i[0]) > 0: 
    i = i[0][0]
    email_dict = email_dicts[i]
    display(HTML(email_dict['body']))

### Step 7: save email files

In this step, we save the email dictionaries as JSON files in the Sherlock OAK project folder. We expect to be able to read the files back from the folder.
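
This minimal sketch shows the save-and-reload round trip that the validation cells perform. It writes one JSON file per email dict and reads one back. The file naming scheme (`email_00000.json`, ...) and `save_emails_to_json_sketch` are hypothetical. How `json_functions.save_emails_to_json` names its files is not shown here:

```python
import json
import os
import tempfile


def save_emails_to_json_sketch(email_dicts, emails_dir):
    """Write one JSON file per email, named by its position; return the paths."""
    paths = []
    for i, email in enumerate(email_dicts):
        path = os.path.join(emails_dir, f"email_{i:05d}.json")
        with open(path, "w", encoding="utf-8") as fh:
            json.dump(email, fh, ensure_ascii=False, indent=2)
        paths.append(path)
    return paths


with tempfile.TemporaryDirectory() as tmp:
    emails = [{"permit_id": "KY0000001", "body": "<p>hi</p>"}]  # made-up email
    written = save_emails_to_json_sketch(emails, tmp)
    with open(written[0], encoding="utf-8") as fh:
        reloaded = json.load(fh)
    n_files = len(os.listdir(tmp))

print(reloaded == emails[0], n_files)
```

`os.listdir` returns entries in arbitrary order, so sort the names if you want the "first file" check to load the same file on every run.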

#### code block

In [None]:
logger.info("====== 5/6 Saving email info to json files ======")
json_functions.save_emails_to_json(email_dicts, emails_dir=oak_emails_dir)

#### validate output

In [None]:
os.listdir(oak_emails_dir)[:10]

In [None]:
import json
file_list = sorted(os.listdir(oak_emails_dir))  # sort so the same file is loaded on every run
with open(os.path.join(oak_emails_dir, file_list[0]), 'r') as file:
    test_dict = json.load(file)

In [None]:
test_dict

### Step 8: save log file and sync files to the S3 bucket

In this step, we save the log file to the Sherlock OAK folder and sync all files to the S3 bucket. We expect all new files to be synced. A quick check is to confirm that the log file in the S3 bucket matches the one in the Sherlock folder.
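
Comparing the full decoded text of the two copies works for a small log. An equivalent check that also scales to larger files compares SHA-256 digests of the raw bytes. Hash the OAK copy in chunks, and hash the bytes downloaded from S3 before decoding them, for example the `.get()['Body'].read()` result. Comparing bytes also sidesteps newline translation when files are opened in text mode. This is a minimal standard-library sketch, and the helper names are hypothetical:

```python
import hashlib
import os
import tempfile


def sha256_of_bytes(data: bytes) -> str:
    """Digest of an in-memory payload, e.g. an object body read from S3."""
    return hashlib.sha256(data).hexdigest()


def sha256_of_file(path: str, chunk_size: int = 1 << 20) -> str:
    """Digest of a local file, read in chunks so large files need not fit in memory."""
    digest = hashlib.sha256()
    with open(path, "rb") as fh:
        for chunk in iter(lambda: fh.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


# Demo with a temporary file standing in for the OAK copy of the log.
payload = b"===== Start running email maker =====\n"
with tempfile.NamedTemporaryFile("wb", delete=False) as tmp:
    tmp.write(payload)
    local_path = tmp.name
try:
    print(sha256_of_file(local_path) == sha256_of_bytes(payload))
finally:
    os.remove(local_path)
```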

#### code block

In [None]:
logger.info('======= 6/6 Saving log file and sync to s3 bucket =======')
logger.info(f'Script FINISHED. Log file saved in {oak_log_dir} and all project files synced with S3 bucket.')
with open(os.path.join(oak_log_dir, 'email_maker.log'), 'w') as file:
    file.write(log_capture_string.getvalue())

subprocess.run(['aws', 's3', 'sync', oak_project_dir, s3_project_dir], check=True)

#### validate output

In [None]:
# check the log file in s3 bucket and sherlock folder. they should be the same
s3_log_dir = os.path.join(mode, run_id, 'logs')
s3_content = bucket.Object(os.path.join(s3_log_dir,'email_maker.log')).get()['Body'].read().decode('utf-8')
with open(os.path.join(oak_log_dir, 'email_maker.log'), 'r') as file:
    oak_content = file.read()
s3_content == oak_content

In [None]:
print(oak_content)

---
End of notebook