
# [ODS ETL] Healthcare Quoted Not Taken Policies Refresh to Marketing Cloud


<!-- ## Overview

This notebook serves as the **implementation** and **documentation** for onboarding all the LIT initiative prospect data to the Agent Agency Repository as part of **A360 v2**. The notebook details the algorithm **rules** selected for **matching** the prospect data with the **agency** and **agency_contact** data from **ODS** in addition to applying these rules to the data loaded to the **raw** layer of the **agency_repo**.  -->


## Reference Component Architecture

![Production ETL - Marketing Schedule Driven architecture diagram](./img/Production%20ETL%20-%20Marketing%20Schedule%20Driven.png)

In [25]:
#papermill_description=PythonPackageImport
import pandas as pd
import numpy as np
from sqlalchemy import create_engine
from datetime import datetime, timedelta, date
from secrets_manager import get_secret
import json
import os
import math

In [None]:
#papermill_description=Start
# Capture the moment this run began and echo it to the notebook output.
start_at = datetime.now()
print("Starting at {}".format(start_at))

In [27]:
# Run parameters. NOTE(review): presumably overridden by papermill at execution
# time - confirm this cell carries the "parameters" tag.

# Maximum number of rows fetched per run (interpolated into the SQL LIMIT clause).
limit = 5000
# Number of rows to skip before fetching (interpolated into the SQL OFFSET clause).
offset = 0
# NOTE(review): not referenced anywhere in the visible part of this notebook.
key = 'pending'
# 'baseline' reads awb.marketing_etl_pending_customer; 'incremental' reads
# awb.marketing_etl_pending_customer_inc restricted to rows modified today;
# any other value falls back to the baseline query.
mode = 'baseline'

In [28]:
# os.environ["RDS_SECRETS_MANAGER_ID"] = "arn:aws:secretsmanager:us-east-1:968921834094:secret:affinity-ods2-ar4262-dv001-rds-master-credentials-169S6S"
# os.environ["RDS_PROD_SECRETS_MANAGER_ID"] = "arn:aws:secretsmanager:us-east-1:968921834094:secret:affinity-ods2-ar4262-dv001-rds-prod-svc-credentials-QxmDtd"
# os.environ["SFMC_API_CREDENTIALS_SECRET_MANAGER_ID"] = "arn:aws:secretsmanager:us-east-1:968921834094:secret:affinity-ods-ar4262-dv001-sfmc-api-credentials-RbEIMF"
# os.environ["AWS_REGION"] = "us-east-1"

### Database Credentials Retrieval
Retrieve the credentials for the ODS dev and prod databases, and for the SFMC API, from AWS Secrets Manager. The secret ids and the AWS region are read from environment variables.

In [30]:
#papermill_description=DatabaseCredentialsRetrieval
def _load_secret(secret_id_env_var):
    """Fetch the secret whose id is stored in *secret_id_env_var* and decode it from JSON.

    The AWS region is read from the AWS_REGION environment variable. A missing
    environment variable raises KeyError.
    """
    return json.loads(get_secret(
        secret_name=os.environ[secret_id_env_var], region_name=os.environ["AWS_REGION"]))


db_secret_dev = _load_secret("RDS_SECRETS_MANAGER_ID")
db_secret_prod = _load_secret("RDS_PROD_SECRETS_MANAGER_ID")
sfmc_secret_dev = _load_secret("SFMC_API_CREDENTIALS_SECRET_MANAGER_ID")

### Database Connections
* ods_db_prod (Connection to Affinity AWB Production MySQL DB Cluster)

In [31]:
#papermill_description=DatabaseConnections
from urllib.parse import quote

# rds_host_dev = "affinity-ods2-develop.cluster-cj3qp6qspcpk.us-east-1.rds.amazonaws.com"
# rds_db_name_dev = "agency_repo"

# ods_conn_str_dev = f"mysql+pymysql://{db_secret_dev['username']}:{db_secret_dev['password']}@{rds_host_dev}/{rds_db_name_dev}"
# ods_db_dev = create_engine(ods_conn_str_dev, echo=False)

rds_host_prod = "affinity-awb2.cluster-cyfwyevzbce3.us-east-1.rds.amazonaws.com"
rds_db_name_prod = "awb"

# Percent-encode the credentials before embedding them in the URL: characters
# such as '@', ':', '/' or '%' in a password would otherwise be read as URL
# delimiters/escapes and the connection string would be parsed incorrectly.
# quote(..., safe='') is used rather than quote_plus so spaces become %20.
_prod_user = quote(str(db_secret_prod['username']), safe='')
_prod_password = quote(str(db_secret_prod['password']), safe='')

ods_conn_str_prod = f"mysql+pymysql://{_prod_user}:{_prod_password}@{rds_host_prod}/{rds_db_name_prod}"
ods_db_prod = create_engine(ods_conn_str_prod, echo=False)

### Input Datasets
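* df_ins (AWB table: awb.marketing_etl_pending_customer in baseline mode, awb.marketing_etl_pending_customer_inc filtered to rows modified today in incremental mode)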

In [None]:
#papermill_description=MarketingETLCustomers
todayDate = datetime.today().date().strftime('%Y-%m-%d')

# limit/offset are run parameters that end up inside the SQL text, so force
# them to integers first; anything non-numeric fails here instead of reaching
# the database.
page_limit = int(limit)
page_offset = int(offset)
paging_clause = f"order by account_no limit {page_limit} offset {page_offset}"

if mode == "incremental":
    # Incremental runs only pick up rows whose modified_date is today.
    customers_sql = f"select * from awb.marketing_etl_pending_customer_inc WHERE Date(modified_date) = '{todayDate}' {paging_clause}"
else:
    # "baseline" and any unrecognised mode both read the full pending-customer table.
    customers_sql = f"select * from awb.marketing_etl_pending_customer {paging_clause}"

df_ins = pd.read_sql(customers_sql, con=ods_db_prod)
print(df_ins)

#### Filter Invalid email addresses

In [None]:
#papermill_description=FilterInvalidEmail
# Keep only rows whose email is present and contains '@'. na=False makes
# missing (or non-string) values count as "no match", so no separate null
# check is needed.
has_at_sign = df_ins['customer_email_id'].str.contains('@', na=False)

# .copy() gives an independent frame: later cells add and overwrite columns on
# df_ins, which would otherwise raise SettingWithCopyWarning on a filtered frame.
df_ins = df_ins[has_at_sign].copy()
print(df_ins)

In [34]:
def validate_date(date_str=""):
    """Return True when *date_str* is a real calendar date written exactly as YYYYMMDD.

    The string is parsed and re-formatted; it is accepted only if the result
    equals the input, which rejects loosely-parsed forms such as a one-digit
    month or day. Any string that cannot be parsed yields False.
    """
    fmt = '%Y%m%d'
    try:
        canonical = datetime.strptime(date_str, fmt).strftime(fmt)
    except ValueError:
        return False
    return canonical == date_str

#### Add Attributes
* account_alias
* age

In [35]:
#papermill_description=AddAccountAliasAndAgeAttributes
def _age_on(dob_value, today):
    """Return the age in whole years on *today* for a YYYYMMDD date of birth.

    Returns None when str(dob_value) is not a valid YYYYMMDD date.
    """
    dob_text = str(dob_value)
    if not validate_date(dob_text):
        return None
    born = datetime.strptime(dob_text, '%Y%m%d').date()
    # The tuple comparison is True (counts as 1) when this year's birthday has
    # not happened yet, which takes one year off the difference.
    return today.year - born.year - ((today.month, today.day) < (born.month, born.day))


# Aliased account number: 2 * account_no + 1.
df_ins['account_alias'] = [int(account_no) * 2 + 1 for account_no in df_ins['account_no']]

# Use a single "today" for the whole column so every row is measured against
# the same date. 'dob' must still be in raw YYYYMMDD form here; the date
# reformatting cell that follows rewrites it to MM/DD/YYYY.
_today = date.today()
df_ins['age'] = [_age_on(dob, _today) for dob in df_ins['dob']]

#### Change Date Format to SFMC DE Standard

In [36]:
#papermill_description=ChangeDateFormatToSFMCStandard
def _to_sfmc_date(raw_value):
    """Convert a YYYYMMDD value to MM/DD/YYYY; anything else is returned as str(raw_value)."""
    text = str(raw_value)
    if validate_date(text):
        return datetime.strptime(text, '%Y%m%d').strftime('%m/%d/%Y')
    return text


date_columns = [
    'action_date', 'dob', 'original_effective_date', 'effective_date',
    'last_bill_date', 'policy_effective_date', 'policy_expiration_date',
    'termination_date', 'application_entry_date', 'application_decline_date',
    'modified_date',
]
for col_name in date_columns:
    df_ins[col_name] = [_to_sfmc_date(value) for value in df_ins[col_name]]

In [37]:
# Columns that are selected for the output later in this notebook but have no
# source value in this extract: each is created with a constant default.
# (SCLTAG was previously assigned twice; it is listed once here.)
placeholder_defaults = {
    'ACPORG': '',
    'ADDRS2': '',
    'DEMSCR': '',
    'FAXPHN': '',
    'SCLTAG': '',
    'TLEVEL': '',
    'NEW_RENEWAL': '',
    'PHNTAG': '',
    'POLICY_YEAR': 0,
    'PROGRAM': '',
    'DIVISION': '',
}
for placeholder_col, default_value in placeholder_defaults.items():
    df_ins[placeholder_col] = default_value

#### Rename columns

In [38]:
#papermill_description=RenameColumns
# Source column name -> output attribute name. Columns named here that are not
# present in df_ins are simply ignored by rename().
# NOTE(review): the targets are presumably the SFMC data-extension field names - confirm.
output_column_names = {
    # customer identity and contact details
    'customer_no': 'CUSTNO',
    'first_name': 'FSTNAM',
    'last_name': 'LASTNM',
    'contact_name': 'ACCONT',
    'customer_email_id': 'CSEMAD',
    'home_phone': 'HOMPHN',
    'work_phone': 'WORKPH',
    'address': 'ADDRS1',
    'city': 'CITY00',
    'state_code': 'STATCD',
    'zip_code': 'ZIPCOD',
    'dob': 'DOBXXX',
    'age': 'AGE',
    # account and policy
    'account_no': 'ACCTNO',
    'account_alias': 'ACCTNO_Aliased',
    'account_balance': 'BALANC',
    'base_premium': 'BASPRM',
    'total_premium': 'TOTPRM',
    'policy_type': 'POLTYP',
    'new_or_renew': 'NEWRNW',
    'entity_status': 'ENTSTS',
    'termination_reason_code': 'TRMREA',
    'subprod_no': 'SUBPROD_NO',
    'subprod_subno': 'SUBPROD_SUBNO',
    # dates
    'action_date': 'ACTION_DATE',
    'application_decline_date': 'ACADDT',
    'application_entry_date': 'ACAEDT',
    'effective_date': 'EFFDAT',
    'last_bill_date': 'LSTBLD',
    'original_effective_date': 'ORGEFD',
    'policy_effective_date': 'POLEFD',
    'policy_expiration_date': 'POLEXD',
    'termination_date': 'TERMDT',
    'modified_date': 'MODIFIED_DATE',
    # flags and classifications
    'business_type': 'CVBUST',
    'elec_fulfillment_flag': 'CVELEF',
    'employment_status': 'CVEMPS',
    'enrollment_flag': 'ENROLLMENT_FLAG',
    'profession_flag': 'CVPRFF',
    'professional_level': 'CVPRFL',
    'student_flag': 'STUDENT',
}
df_ins.rename(columns=output_column_names, inplace=True)

#### Filter Attributes

In [None]:
#papermill_description=FilterAttributes
# Output columns, in the order they are persisted and sent; every other column is dropped.
output_columns = [
    'CVPRFF', 'FSTNAM', 'LASTNM', 'ADDRS1', 'CITY00', 'STATCD', 'ZIPCOD',
    'HOMPHN', 'WORKPH', 'DOBXXX', 'CSEMAD', 'CUSTNO', 'EFFDAT', 'NEWRNW',
    'ACCTNO', 'ORGEFD', 'POLEFD', 'POLEXD', 'ENTSTS', 'TERMDT', 'TRMREA',
    'STUDENT', 'CVEMPS', 'CVBUST', 'CVPRFL', 'ENROLLMENT_FLAG', 'BALANC',
    'LSTBLD', 'TOTPRM', 'BASPRM', 'ACTION_DATE', 'SUBPROD_NO', 'SUBPROD_SUBNO',
    'CVELEF', 'ACCTNO_Aliased', 'AGE', 'ACPORG', 'ADDRS2', 'DEMSCR', 'FAXPHN',
    'SCLTAG', 'TLEVEL', 'POLICY_YEAR', 'PROGRAM', 'DIVISION', 'NEW_RENEWAL',
    'PHNTAG', 'ACCONT', 'ACAEDT', 'ACADDT', 'MODIFIED_DATE',
]
# .copy() detaches the selection from the wider frame so that adding columns to
# df_ins afterwards does not raise SettingWithCopyWarning.
df_ins = df_ins[output_columns].copy()

In [24]:

# Stamp every row with the same load timestamp (local clock of the machine running the notebook).
load_timestamp = pd.Timestamp(datetime.now())
df_ins.loc[:, 'created_at'] = load_timestamp

#### Persist Data to SQL

In [None]:
#papermill_description=PersistDataToMySQL
# if_exists='replace' drops and recreates awb.marketing_etl_customers_sfmc_de on
# every run. The DataFrame index is written as well, because `index` is left
# at its default.
df_ins.to_sql(
    name='marketing_etl_customers_sfmc_de',
    con=ods_db_prod,
    schema='awb',
    if_exists='replace',
)

#### SFMC API Code
Request an SFMC access token, then serialize the DataFrame to JSON in chunks and POST each chunk to the SFMC API.

In [None]:
## TODO
import requests

# (connect, read) timeouts in seconds, so a stalled SFMC endpoint fails the run
# instead of hanging it indefinitely.
sfmc_request_timeout = (10, 300)

# --- 1. Request an access token ------------------------------------------------
url = os.environ['TOKEN_URL']

payload = json.dumps({
    "grant_type": sfmc_secret_dev['grant_type'],
    "client_id": sfmc_secret_dev['client_id'],
    "client_secret": sfmc_secret_dev['client_secret'],
    "account_id": sfmc_secret_dev['account_id']
})
headers = {
    'Content-Type': 'application/json'
}

response = requests.post(url, headers=headers, data=payload, timeout=sfmc_request_timeout)
# Surface an authentication failure as an HTTP error rather than as a KeyError
# on 'access_token' below.
response.raise_for_status()
result = json.loads(response.text)
accessTokenResult = result['access_token']

# --- 2. Clean the data and send it in chunks -----------------------------------
url = os.environ['SFMC_URL_QUOTED_NOT_TAKEN']
headers = {
    'Content-Type': 'application/json',
    'Authorization': f'Bearer {accessTokenResult}'
}
chunk_size = 10000  # maximum number of rows per POST

# Trim surrounding whitespace from every string cell.
df_ins = df_ins.applymap(lambda x: x.strip() if isinstance(x, str) else x)

# Blank out the textual leftovers of stringified missing values.
df_ins = df_ins.replace(["None", "nan", "NaT", " "], "")

# Split the frame into consecutive slices of at most chunk_size rows.
chunks = [df_ins.iloc[i:i + chunk_size] for i in range(0, len(df_ins), chunk_size)]

failed_chunks = []
for index, chunk in enumerate(chunks):
    # Serialize through pandas, then wrap the records under the "items" key.
    value = chunk.to_json(orient='records')
    result = {"items": json.loads(value)}
    payloadJson = json.dumps(result)
    # Log only the size of the chunk: the payload holds customer personal data
    # (names, emails, dates of birth) and must not be written to notebook output.
    print(f"Posting chunk {index + 1} of {len(chunks)} ({len(chunk)} rows)")
    response = requests.post(url, headers=headers, data=payloadJson, timeout=sfmc_request_timeout)
    print(response.status_code, response.text)
    if not response.ok:
        failed_chunks.append(index)

# Remaining chunks are still attempted after a failure; report what was rejected
# so a partial load is visible in the run output.
if failed_chunks:
    print(f"WARNING: SFMC rejected {len(failed_chunks)} of {len(chunks)} chunk(s), indexes: {failed_chunks}")


