# Buyer Stage Training Data Set Generation 
### The following high-level ER diagram shows how the data set is generated
----
![Buyer intent ER diagram](img/buyer-intent-ER.jpg)

In [None]:
Follow these steps to install the package on a SageMaker notebook:

STEP 1: run !which python in your shell to get the location of your Python interpreter. For example, in my case it is: /home/ec2-user/anaconda3/envs/python3/bin/python
STEP 2: run !/home/ec2-user/anaconda3/envs/python3/bin/python -m pip install --upgrade  --ignore-installed --force-reinstall https://s3-us-west-2.amazonaws.com/move-dl-common-binary-distrubution/python/move_dl_common_api-3.2.131-release.tar.gz
STEP 3: Done (hopefully the package installed successfully!)

In [None]:
!/home/ec2-user/anaconda3/envs/python3/bin/python -m pip install --upgrade --ignore-installed --force-reinstall https://s3-us-west-2.amazonaws.com/move-dl-common-binary-distrubution/python/move_dl_common_api-3.2.131-release.tar.gz
from move_dl_common_api.athena_util import AthenaUtil
from datetime import datetime
from pytz import timezone
from datetime import datetime, timedelta
import numpy as np
import pandas as pd
from scipy import stats
import json
import sys
import os
import functools
import warnings
import s3fs
s3 = s3fs.S3FileSystem()
import warnings
import logging
# Silence warnings unless the interpreter was started with explicit -W options.
if not sys.warnoptions:
    warnings.simplefilter("ignore")
warnings.filterwarnings("ignore", category=FutureWarning)

# Log through the root logger into buyer_intent.log (appending across runs).
# Re-running this cell must not attach a second file handler, otherwise every
# message would be written to the log file once per run of the cell.
logger = logging.getLogger()
_log_file_path = os.path.abspath('buyer_intent.log')
if not any(isinstance(handler, logging.FileHandler) and handler.baseFilename == _log_file_path
           for handler in logger.handlers):
    fhandler = logging.FileHandler(filename='buyer_intent.log', mode='a')
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    fhandler.setFormatter(formatter)
    logger.addHandler(fhandler)
logger.setLevel(logging.INFO)

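## The following are the main functions used to generate the training data set. The SQL parts will be reused to build the ETL pipeline of the buyer intent app and the training pipeline. The functions are:
#### read_config_file : reads the configuration needed to build the training data set from the file ds_generation_config_params.json
#### create_db : creates the Athena database `{db_name}` if it does not exist and returns the AthenaUtil helper
#### get_table_names_in_db : lists the tables in `{db_name}` (SHOW TABLES)
#### drop_tables_in_db : drops every table returned by get_table_names_in_db
#### drop_table_in_db : drops a single table from `{db_name}`
#### random_sample_based_on_string_attr : creates a table holding a hash-based sample of another table, keyed on a string column (the sampling rate is controlled by sample_rate)
#### get_last_n_days_as_mst_yyyymmdd : returns the dates from 2 up to n-1 days ago as YYYYMMDD strings
#### copy_consp_member_id_summary_001 : creates a copy of the CAP data with member_id in our development environment
#### copy_consp_member_id_summary_007 : same as copy_consp_member_id_summary_001, for the t007 summary table
#### copy_consp_member_id_summary_030 : same as copy_consp_member_id_summary_001, for the t030 summary table
#### copy_consp_member_id_summary_060 : same as copy_consp_member_id_summary_001, for the t060 summary table
#### copy_consp_member_id_summary_090 : same as copy_consp_member_id_summary_001, for the t090 summary table
#### create_table_over_qualtrics_data : creates an Athena table over the Qualtrics survey data (qualtrics_survey.sql)
#### make_training_dataset_001_007_030_060_090 : generates a training data set of user behaviours for a given date by joining the 001/007/030/060/090 summary tables
#### union_two_tables : creates a table that is the UNION ALL of two tables
#### union_all_tables : folds a list of tables into one table by repeated union_two_tables calls
#### parse_time_to_mst : reformats an 'm/d/Y H:M' timestamp string as YYYYMMDD (returns -1 if it cannot be parsed)
#### transform_qualtrics_survey : transforms and reformats the Qualtrics data to match its table schema
#### label_training_data : generates the final labeled training data set for modeling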

In [3]:
def read_config_file(file_name = 'ds_generation_config_params.json'):
    """Load the dataset-generation configuration.

    Parameters
    ----------
    file_name : str
        Path of the JSON configuration file (defaults to
        ``ds_generation_config_params.json`` in the working directory).

    Returns
    -------
    The decoded JSON document (a dict for the expected configuration file).
    """
    with open(file_name, 'r') as config_file:
        config = json.load(config_file)
    return config

In [4]:
def create_db(param_dict):
    """Create the Athena database ``param_dict['db_name']`` if needed and return the Athena helper.

    Parameters
    ----------
    param_dict : dict
        Configuration; must contain ``s3_staging_path`` (the Athena staging
        folder) and ``db_name``.

    Returns
    -------
    AthenaUtil
        The helper bound to the staging folder. It is returned even when the
        CREATE DATABASE call does not report HTTP 200, because callers always
        need a helper back; that case is logged as an error instead of being
        silently ignored.
    """
    # Setup Athena Staging
    util = AthenaUtil(s3_staging_folder = param_dict['s3_staging_path'])
    # Create the database on the dev account (no-op if it already exists).
    ctas_query_create_consumer_behaviour_db = """CREATE DATABASE IF NOT EXISTS {db_name};""".format(**param_dict)
    res = util.execute_query(sql_query = ctas_query_create_consumer_behaviour_db)
    status_code = res['ResponseMetadata']['HTTPStatusCode']
    if status_code != 200:
        logger.error("function = create_db - CREATE DATABASE %s returned HTTP status %s",
                     param_dict['db_name'], status_code)
    return util

In [5]:
def get_table_names_in_db(param_dict, util):
    """Return the names of all tables in ``param_dict['db_name']``.

    Runs ``SHOW TABLES IN {db_name};`` through ``util`` and takes the first
    column (``VarCharValue``) of every returned row.

    Returns
    -------
    list of str
        The table names (possibly empty), or -1 if running the query or
        reading its result raised; the error is logged.
    """
    try:
        ctas_sql_table_list = """ SHOW TABLES IN {db_name};""".format(**param_dict)
        res = util.execute_query(sql_query = ctas_sql_table_list)
        # Each result row carries a single column holding the table name.
        return [row['Data'][0]['VarCharValue'] for row in res['ResultSet']['Rows']]
    except Exception as message:
        exc_type, exc_obj, exc_tb = sys.exc_info()
        f_name = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]
        logger.error("filename = %s - function = get_table_names_in_db - exception in line_number = %s - hint = %s " % (
            f_name, exc_tb.tb_lineno, message.__str__()))
        return -1

In [6]:
def drop_tables_in_db(param_dict, util):
    """Drop every table in ``param_dict['db_name']``.

    Lists the tables with ``get_table_names_in_db`` and drops each one with
    ``drop_table_in_db``.

    Returns
    -------
    int
        1 if the table list was obtained and every drop succeeded, -1
        otherwise. Failures are logged.
    """
    try:
        table_list = get_table_names_in_db(param_dict, util)
        if table_list == -1:
            # get_table_names_in_db already logged the cause; iterating over -1
            # would only raise a misleading TypeError.
            logger.error("function = drop_tables_in_db - could not list the tables in %s",
                         param_dict['db_name'])
            return -1
        failed_tables = [table_name for table_name in table_list
                         if drop_table_in_db(param_dict, table_name, util) != 1]
        if failed_tables:
            logger.error("function = drop_tables_in_db - failed to drop tables: %s", failed_tables)
            return -1
        return 1
    except Exception as message:
        exc_type, exc_obj, exc_tb = sys.exc_info()
        f_name = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]
        logger.error("filename = %s - function = drop_tables_in_db - exception in line_number = %s - hint = %s " % (
                f_name, exc_tb.tb_lineno, message.__str__()))
        return -1

In [7]:
def drop_table_in_db(param_dict, table_name, util):
    """Drop ``table_name`` from ``param_dict['db_name']`` if it exists.

    ``param_dict`` is not modified; the table name is merged into a copy used
    only for formatting the statement.

    Returns
    -------
    int
        1 if the DROP reports HTTP 200, otherwise -1 (exceptions are logged).
    """
    try:
        query_params = dict(param_dict, table_name = table_name)
        # IF EXISTS keeps the drop idempotent when the table is already gone.
        ctas_sql_drop_table = """DROP TABLE IF EXISTS {db_name}.{table_name};""".format(**query_params)
        res = util.execute_query(sql_query = ctas_sql_drop_table)
        return 1 if res['ResponseMetadata']['HTTPStatusCode'] == 200 else -1
    except Exception as message:
        exc_type, exc_obj, exc_tb = sys.exc_info()
        f_name = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]
        logger.error("filename = %s - function = drop_table_in_db - exception in line_number = %s - hint = %s " % (
            f_name, exc_tb.tb_lineno, message.__str__()))
        return -1

In [8]:
def random_sample_based_on_string_attr(util,
                                       db_name,
                                       s3_path,
                                       to_table_name,
                                       from_table_name,
                                       string_atr,
                                       sample_rate = 20,
                                       extr_loc_by_table_name = False):
    """Create ``db_name.to_table_name`` as a hash-based sample of ``db_name.from_table_name``.

    A row is kept when the 64-bit xxhash of ``string_atr`` is divisible by
    ``sample_rate``, so roughly 1/``sample_rate`` of the distinct attribute
    values are kept, and the same value always lands on the same side of the
    sample.

    Parameters
    ----------
    util : object with ``execute_query(sql_query=...)``
        Athena helper used to run the CTAS statement.
    db_name : str
        Database holding both tables.
    s3_path : str
        Base S3 path; only used when ``extr_loc_by_table_name`` is true.
    to_table_name, from_table_name : str
        Target and source table names.
    string_atr : str
        Name of the (string) column to hash.
    sample_rate : int
        Keep about one in ``sample_rate`` values; must be positive.
    extr_loc_by_table_name : bool
        If true, store the table under ``<s3_path>/tables/<to_table_name>``.

    Returns
    -------
    int
        1 if the query reports HTTP 200, otherwise -1 (exceptions are logged).
    """
    try:
        rate = int(sample_rate)
        if rate <= 0:
            raise ValueError("sample_rate must be a positive integer, got %r" % (sample_rate,))

        if extr_loc_by_table_name:
            with_options = "external_location = '{}/tables/{}',\n          format='PARQUET'".format(
                s3_path, to_table_name)
        else:
            with_options = "format='PARQUET'"

        # from_big_endian_64 yields a *signed* BIGINT and MOD keeps the sign of
        # the dividend, so compare with 0: "= 1" would silently drop every row
        # whose hash is negative and halve the sampling rate.
        ctas_sql_random_sampling = """CREATE TABLE {db}.{dst}
        WITH (
          {options}
        ) AS
        SELECT *
        FROM {db}.{src}
        WHERE MOD(from_big_endian_64(xxhash64(to_utf8({attr}))), CAST({rate} AS BIGINT)) = 0;
        """.format(db = db_name,
                   dst = to_table_name,
                   options = with_options,
                   src = from_table_name,
                   attr = string_atr,
                   rate = rate)

        res = util.execute_query(sql_query = ctas_sql_random_sampling)
        return 1 if res['ResponseMetadata']['HTTPStatusCode'] == 200 else -1

    except Exception as message:
        exc_type, exc_obj, exc_tb = sys.exc_info()
        f_name = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]
        logger.error("filename = %s - function = random_sample_based_on_string_attr - exception in line_number = %s - hint = %s " % (
            f_name, exc_tb.tb_lineno, message.__str__()))
        return -1

In [9]:
def get_last_n_days_as_mst_yyyymmdd(n, now = None):
    """Return the dates from 2 up to ``n - 1`` days ago as ``'YYYYMMDD'`` strings, newest first.

    Today and yesterday are excluded (the offsets start at 2), so ``n - 2``
    dates are returned, or none when ``n <= 2``.

    Parameters
    ----------
    n : int
        Exclusive upper bound on the day offset.
    now : datetime, optional
        Reference time; defaults to ``datetime.now()``.

    NOTE(review): despite the name, no timezone conversion happens.
    ``datetime.now()`` reads the host's local clock, so confirm that the host
    runs on MST or pass an MST-aware ``now``.
    """
    if now is None:
        now = datetime.now()
    # Read the clock once: calling now() per iteration could straddle midnight
    # and yield a duplicated or skipped date.
    return [(now - timedelta(days=offset)).strftime("%Y%m%d") for offset in range(2, n)]

In [10]:
def copy_consp_member_id_summary_001(param_dict, util, sql_file_name = 'consp_member_id_summary_t001.sql'):
    """Copy the CAP member_id summary into ``{db_name}.consp_member_id_summary_t001``.

    Runs the SQL template in ``sql_file_name`` (formatted with ``param_dict``),
    then ``MSCK REPAIR TABLE`` on the partitioned table so its partitions are
    registered.

    Returns
    -------
    int
        1 if both queries report HTTP 200, otherwise -1. The repair is skipped
        when the copy itself fails.
    """
    with open(sql_file_name, 'r') as query:
        ctas_query_copy_consp_member_id_summary_t001 = query.read().format(**param_dict)
    res = util.execute_query(sql_query = ctas_query_copy_consp_member_id_summary_t001)
    if res['ResponseMetadata']['HTTPStatusCode'] != 200:
        return -1
    # The table is partitioned, so it needs to be repaired to pick up its partitions.
    ctas_query_repair_table_consp_member_id_summary_t001 = """MSCK REPAIR TABLE {db_name}.consp_member_id_summary_t001;""".format(**param_dict)
    res = util.execute_query(sql_query = ctas_query_repair_table_consp_member_id_summary_t001)
    return 1 if res['ResponseMetadata']['HTTPStatusCode'] == 200 else -1

In [11]:
def copy_consp_member_id_summary_007(param_dict, util, sql_file_name = 'consp_member_id_summary_t007.sql'):
    """Copy the CAP member_id summary into ``{db_name}.consp_member_id_summary_t007``.

    Runs the SQL template in ``sql_file_name`` (formatted with ``param_dict``),
    then ``MSCK REPAIR TABLE`` on the partitioned table so its partitions are
    registered.

    Returns
    -------
    int
        1 if both queries report HTTP 200, otherwise -1. The repair is skipped
        when the copy itself fails.
    """
    with open(sql_file_name, 'r') as query:
        ctas_query_copy_consp_member_id_summary_t007 = query.read().format(**param_dict)
    res = util.execute_query(sql_query = ctas_query_copy_consp_member_id_summary_t007)
    if res['ResponseMetadata']['HTTPStatusCode'] != 200:
        return -1
    # The table is partitioned, so it needs to be repaired to pick up its partitions.
    ctas_query_repair_table_consp_member_id_summary_t007 = """MSCK REPAIR TABLE {db_name}.consp_member_id_summary_t007;""".format(**param_dict)
    res = util.execute_query(sql_query = ctas_query_repair_table_consp_member_id_summary_t007)
    return 1 if res['ResponseMetadata']['HTTPStatusCode'] == 200 else -1

In [12]:
def copy_consp_member_id_summary_030(param_dict, util, sql_file_name = 'consp_member_id_summary_t030.sql'):
    """Copy the CAP member_id summary into ``{db_name}.consp_member_id_summary_t030``.

    Runs the SQL template in ``sql_file_name`` (formatted with ``param_dict``),
    then ``MSCK REPAIR TABLE`` on the partitioned table so its partitions are
    registered.

    Returns
    -------
    int
        1 if both queries report HTTP 200, otherwise -1. The repair is skipped
        when the copy itself fails.
    """
    with open(sql_file_name, 'r') as query:
        ctas_query_copy_consp_member_id_summary_t030 = query.read().format(**param_dict)
    res = util.execute_query(sql_query = ctas_query_copy_consp_member_id_summary_t030)
    if res['ResponseMetadata']['HTTPStatusCode'] != 200:
        return -1
    # The table is partitioned, so it needs to be repaired to pick up its partitions.
    ctas_query_repair_table_consp_member_id_summary_t030 = """MSCK REPAIR TABLE {db_name}.consp_member_id_summary_t030;""".format(**param_dict)
    res = util.execute_query(sql_query = ctas_query_repair_table_consp_member_id_summary_t030)
    return 1 if res['ResponseMetadata']['HTTPStatusCode'] == 200 else -1

In [13]:
def copy_consp_member_id_summary_060(param_dict, util, sql_file_name = 'consp_member_id_summary_t060.sql'):
    """Copy the CAP member_id summary into ``{db_name}.consp_member_id_summary_t060``.

    Runs the SQL template in ``sql_file_name`` (formatted with ``param_dict``),
    then ``MSCK REPAIR TABLE`` on the partitioned table so its partitions are
    registered.

    Returns
    -------
    int
        1 if both queries report HTTP 200, otherwise -1. The repair is skipped
        when the copy itself fails.
    """
    with open(sql_file_name, 'r') as query:
        ctas_query_copy_consp_member_id_summary_t060 = query.read().format(**param_dict)
    res = util.execute_query(sql_query = ctas_query_copy_consp_member_id_summary_t060)
    if res['ResponseMetadata']['HTTPStatusCode'] != 200:
        return -1
    # The table is partitioned, so it needs to be repaired to pick up its partitions.
    ctas_query_repair_table_consp_member_id_summary_t060 = """MSCK REPAIR TABLE {db_name}.consp_member_id_summary_t060;""".format(**param_dict)
    res = util.execute_query(sql_query = ctas_query_repair_table_consp_member_id_summary_t060)
    return 1 if res['ResponseMetadata']['HTTPStatusCode'] == 200 else -1

In [14]:
def copy_consp_member_id_summary_090(param_dict, util, sql_file_name = 'consp_member_id_summary_t090.sql'):
    """Copy the CAP member_id summary into ``{db_name}.consp_member_id_summary_t090``.

    Runs the SQL template in ``sql_file_name`` (formatted with ``param_dict``),
    then ``MSCK REPAIR TABLE`` on the partitioned table so its partitions are
    registered.

    Returns
    -------
    int
        1 if both queries report HTTP 200, otherwise -1. The repair is skipped
        when the copy itself fails.
    """
    with open(sql_file_name, 'r') as query:
        ctas_query_copy_consp_member_id_summary_t090 = query.read().format(**param_dict)
    res = util.execute_query(sql_query = ctas_query_copy_consp_member_id_summary_t090)
    if res['ResponseMetadata']['HTTPStatusCode'] != 200:
        return -1
    # The table is partitioned, so it needs to be repaired to pick up its partitions.
    ctas_query_repair_table_consp_member_id_summary_t090 = """MSCK REPAIR TABLE {db_name}.consp_member_id_summary_t090;""".format(**param_dict)
    res = util.execute_query(sql_query = ctas_query_repair_table_consp_member_id_summary_t090)
    return 1 if res['ResponseMetadata']['HTTPStatusCode'] == 200 else -1

In [15]:
def create_table_over_qualtrics_data(param_dict, util):
    """Create the Athena table over the Qualtrics survey data.

    Reads the statement from ``qualtrics_survey.sql`` in the working
    directory, fills its placeholders from ``param_dict`` and runs it.

    Returns
    -------
    int
        1 if the query reports HTTP 200, otherwise -1.
    """
    with open('qualtrics_survey.sql', 'r') as sql_file:
        create_statement = sql_file.read().format(**param_dict)
    response = util.execute_query(sql_query = create_statement)
    succeeded = response['ResponseMetadata']['HTTPStatusCode'] == 200
    return 1 if succeeded else -1

In [16]:
def make_training_dataset_001_007_030_060_090(param_dict, table_generation_date, sql_file_name, util):
    """Build the training data set for one date from the 001/007/030/060/090 summaries.

    Joins consp_member_id_summary_t001, _t007, _t030, _t060 and _t090 via the
    SQL template in ``sql_file_name``. Steps:
      1. drop ``{db_name}.buyer_intent_training_set_001_007_030_060_090_<date>``
         if it exists;
      2. run the template, formatted with ``param_dict`` plus
         ``table_generation_date``.

    ``param_dict`` itself is not modified; the date is added to a copy.

    Returns
    -------
    str
        The generated table name. It is returned even if the creation query
        does not report HTTP 200, because callers collect the names; that case
        is logged as an error.
    """
    query_params = dict(param_dict, table_generation_date = table_generation_date)
    table_name = "buyer_intent_training_set_001_007_030_060_090_{table_generation_date}".format(**query_params)

    # STEP 1: remove any previous version of the table.
    ctas_query_drop_table_db = """DROP TABLE IF EXISTS {}.{};""".format(query_params['db_name'], table_name)
    util.execute_query(sql_query = ctas_query_drop_table_db)

    # STEP 2: create the data set from the SQL template.
    with open(sql_file_name, 'r') as query:
        ctas_training_set_001_007_030_060_090 = query.read().format(**query_params)
    res = util.execute_query(sql_query = ctas_training_set_001_007_030_060_090)
    status_code = res['ResponseMetadata']['HTTPStatusCode']
    if status_code != 200:
        logger.error("function = make_training_dataset_001_007_030_060_090 - creating %s returned HTTP status %s",
                     table_name, status_code)
    return table_name

In [17]:
def union_two_tables(param_dict, table_1, table_2, util):
    """Create ``{db_name}.bi_tmp_union_<xx><yy>`` as ``table_1 UNION ALL table_2``.

    ``<xx>`` and ``<yy>`` are the last two characters of ``table_1`` and
    ``table_2`` respectively (the day part of the date-suffixed table names).
    The table is stored under ``{s3_path_training_sets}/bi_tmp_union_<xx><yy>``
    as Parquet. ``param_dict`` is not modified.

    Raises
    ------
    ValueError
        If either table name is None, e.g. because an earlier union failed.

    Returns
    -------
    str or None
        The new table name, or None if the query does not report HTTP 200.
    """
    if table_1 is None or table_2 is None:
        raise ValueError("union_two_tables got a missing table name: %r, %r" % (table_1, table_2))
    query_params = dict(param_dict,
                        # Each suffix must come from its own table name; the
                        # two names do not have to be the same length.
                        generated_table = table_1[-2:] + table_2[-2:],
                        table_1 = table_1,
                        table_2 = table_2)
    ctas_union_query = """CREATE TABLE {db_name}.bi_tmp_union_{generated_table}
    WITH (
    external_location = '{s3_path_training_sets}/bi_tmp_union_{generated_table}',
    format='PARQUET'
    ) AS
    SELECT * FROM {db_name}.{table_1}
    UNION ALL
    SELECT * FROM {db_name}.{table_2};""".format(**query_params)
    res = util.execute_query(sql_query = ctas_union_query)
    if res['ResponseMetadata']['HTTPStatusCode'] == 200:
        return "bi_tmp_union_{generated_table}".format(**query_params)
    return None

In [18]:
def union_all_tables(param_dict, table_list, util):
    """Combine every table in ``table_list`` into one table via successive UNION ALLs.

    Tables are merged left to right: the first two are unioned, then that
    result is unioned with the third, and so on (``functools.reduce``). A
    single-element list is returned unchanged without running any query, and
    an empty list raises ``TypeError``.

    Returns the name returned by the last ``union_two_tables`` call.
    """
    def _merge(accumulated_table, following_table):
        return union_two_tables(param_dict, accumulated_table, following_table, util)

    return functools.reduce(_merge, table_list)

In [19]:
def parse_time_to_mst(time_str):
    """Reformat an ``'m/d/Y H:M'`` timestamp string as ``'YYYYMMDD'``.

    No timezone conversion is performed; the date part is taken as-is.

    Returns
    -------
    str or int
        The formatted date, or -1 if ``time_str`` cannot be parsed (including
        non-string values such as NaN).
    """
    try:
        return datetime.strptime(time_str, '%m/%d/%Y %H:%M').strftime("%Y%m%d")
    except Exception:
        return -1

def transform_qualtrics_survey(input_file_name, output_file_name):
    """Reshape a Qualtrics survey export into the qualtrics_survey table layout.

    Drops the first two data rows, keeps and renames the member/date/stage
    columns, reformats the dates with ``parse_time_to_mst`` and writes a CSV
    (without the index) with the columns
    ``member_id, start_date, end_date, stage_label_1, stage_label_2, stage_label_3``.

    Returns
    -------
    int
        1 on success, -1 on failure (the exception is logged).
    """
    # Source column -> output column; the insertion order is the output column order.
    column_map = {
        'Member_id': 'member_id',
        'StartDate': 'start_date',
        'EndDate': 'end_date',
        'Stagelabel1': 'stage_label_1',
        'Stagelabel2': 'stage_label_2',
        'Stagelabel3': 'stage_label_3',
    }
    try:
        df_qualtrics_survey = (
            pd.read_csv(input_file_name)
            # NOTE(review): presumably the two extra header rows of a Qualtrics export — confirm.
            .iloc[2:]
            [list(column_map)]
            .rename(columns=column_map)
        )
        # assign() builds a new frame instead of writing into a slice (SettingWithCopyWarning).
        df_qualtrics_survey = df_qualtrics_survey.assign(
            start_date=df_qualtrics_survey['start_date'].apply(parse_time_to_mst),
            end_date=df_qualtrics_survey['end_date'].apply(parse_time_to_mst),
        )
        df_qualtrics_survey.to_csv(output_file_name, index = False)
        return 1
    except Exception as message:
        exc_type, exc_obj, exc_tb = sys.exc_info()
        f_name = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]
        logger.error("filename = %s - function = transform_qualtrics_survey - exception in line_number = %s - hint = %s " % (
            f_name, exc_tb.tb_lineno, message.__str__()))
        return -1

#transform_qualtrics_survey('Stage_survey_data_2020.csv', 'qualtrics_buyer_stage_20200202.csv')

In [20]:
def label_training_data(param_dict, data_set_table, qualtrics_table, sql_query_file, table_generation_date, util):
    """Create the final labeled training data set.

    Formats the SQL template in ``sql_query_file`` with ``param_dict`` plus:
      * ``data_set_table``        - the (unioned) behaviour data set table;
      * ``qualtrics_survey``      - the survey table name (``qualtrics_table``);
      * ``table_generation_date`` - value for the template's date placeholder.
    ``param_dict`` itself is left unchanged.

    Returns
    -------
    int
        1 if the query reports HTTP 200, otherwise -1.
    """
    query_params = dict(param_dict,
                        table_generation_date = table_generation_date,
                        data_set_table = data_set_table,
                        qualtrics_survey = qualtrics_table)

    with open(sql_query_file, 'r') as query:
        ctas_label_data = query.read().format(**query_params)

    res = util.execute_query(sql_query = ctas_label_data)
    return 1 if res['ResponseMetadata']['HTTPStatusCode'] == 200 else -1

In [21]:
def main():
    """Run the end-to-end buyer-stage training-set generation pipeline.

    Steps (each one is logged):
      1. read ds_generation_config_params.json and create the Athena database;
      2. drop every table in the database;
      3. remove everything under ``s3_path_training_sets`` with the AWS CLI;
      4. copy the CAP member_id summaries (001/007/030/060/090);
      5. create the table over the Qualtrics survey data;
      6. build one training data set per survey date;
      7. UNION ALL those data sets into one table;
      8. label the unioned data with the users' survey answers.

    Steps 2 and 3 only warn on failure. A failure in steps 4, 5, 7 or 8
    stops the pipeline, because later steps depend on their output.

    Returns
    -------
    int
        1 on success, -1 if a step failed or an exception was raised (details
        are logged).
    """
    try:
        logger.info('STEP 1: Read Configuration File')
        param_dict = read_config_file('ds_generation_config_params.json')
        util = create_db(param_dict)

        logger.info('STEP 2: Clean Up All tables generated in DB')
        if drop_tables_in_db(param_dict, util) != 1:
            logger.warning('STEP 2: Not all tables could be dropped')

        logger.info('STEP 3: Clean Up all related s3 locations for tables')
        cmd = 'aws s3 rm --recursive {s3_path_training_sets}'.format(**param_dict)
        res = os.system(cmd)
        if res == 0:
            logger.info('STEP 3.1: Cleaning is done successfully')
        else:
            # Athena CTAS expects an empty external_location, so later steps are likely to fail.
            logger.warning('STEP 3.1: Cleaning failed with exit status %s', res)

        logger.info('STEP 4: Copy CAP Summery for Member_Id for 001, 007, 030, 060, 090')
        copy_results = {
            '001': copy_consp_member_id_summary_001(param_dict, util),
            '007': copy_consp_member_id_summary_007(param_dict, util),
            '030': copy_consp_member_id_summary_030(param_dict, util),
            '060': copy_consp_member_id_summary_060(param_dict, util),
            '090': copy_consp_member_id_summary_090(param_dict, util),
        }
        failed_copies = [window for window, status in copy_results.items() if status != 1]
        if failed_copies:
            # The training-set SQL joins these tables, so continuing would only fail later.
            logger.error('STEP 4: Copying CAP summaries failed for: %s', failed_copies)
            return -1

        logger.info('STEP 5: Creat Table over Qualtrics Survey Data')
        if create_table_over_qualtrics_data(param_dict, util) != 1:
            logger.error('STEP 5: Creating the Qualtrics survey table failed')
            return -1

        # Generate a training data set for each date taken from Qualtrics.
        # Some users answered more than once because the survey was sent in
        # different batches by the marketing and UX groups.
        qualtrics_survey_answered_days = ['20200209','20200202','20200207','20200117','20200204','20200131','20200205','20200211','20200210','20200206','20200208','20200201','20200203']
        # NOTE(review): despite the name, most entries are the day *before* the
        # matching answer date, but '20200211' (for '20200210') and '20200207'
        # (for '20200206') are the day after. Both are answer dates already, so
        # the merged date list is the same either way; confirm the intent.
        lst_next_day = ['20200208','20200201','20200206','20200116','20200203','20200130','20200204','20200210','20200211','20200207','20200207','20200131','20200202']
        # Merge both lists, dropping duplicates and keeping first-seen order.
        generation_days = list(dict.fromkeys(qualtrics_survey_answered_days + lst_next_day))

        logger.info('STEP 6: Generate the required training dataset related to the answered time of users')
        table_list = [
            make_training_dataset_001_007_030_060_090(param_dict, day,
                                                      'buyer_intent_training_set_001_007_030_060_090_outlier_filter.sql', util)
            for day in generation_days
        ]

        logger.info('STEP 7: We need to union of all data sets while we want to join with the users answer. We should make sure that we have all users behavioural data for labeling job')
        final_union_table = union_all_tables(param_dict, table_list, util)
        logger.info("The final unioned table is :%s", final_union_table )
        if final_union_table is None:
            logger.error('STEP 7: Union of the training data sets failed')
            return -1

        logger.info('STEP 8 : label useres behabiour beased on their answers')
        res = label_training_data(param_dict, final_union_table, 'qualtrics_survey','buyer_intent_label_data_set.sql' ,'final', util )
        if res != 1:
            logger.error('STEP 8: Labeling the training data set failed')
            return -1
        return 1

    except Exception as message:
        exc_type, exc_obj, exc_tb = sys.exc_info()
        f_name = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]
        logger.error("filename = %s - function = main - exception in line_number = %s - hint = %s " % (
                f_name, exc_tb.tb_lineno, message.__str__()))
        return -1
        
    

In [22]:
# Run the whole pipeline. main() returns 1 or -1; -1 means an error was logged
# (the root logger also writes to buyer_intent.log).
main()

2020-02-13 22:28:35.260 INFO <ipython-input-21-ded20d39fcf5>.3 - STEP 1: Read Configuration File
2020-02-13 22:28:35.261 INFO athena_util.py.111 - Read config from default schema
2020-02-13 22:28:35.262 INFO athena_util.py.114 - Schema fetched from default config file:
2020-02-13 22:28:35.262 INFO athena_util.py.441 - s3://datascience-workspace-dev/buyer-intent/buyer_intent_data_sets
2020-02-13 22:28:35.381 INFO athena_util.py.462 - 2020-02-13 22:28:35.381728
2020-02-13 22:28:36.416 INFO athena_util.py.464 - 2020-02-13 22:28:36.416537
2020-02-13 22:28:36.777 INFO <ipython-input-21-ded20d39fcf5>.7 - STEP 2: Clean Up All tables generated in DB
2020-02-13 22:28:36.777 INFO athena_util.py.441 - s3://datascience-workspace-dev/buyer-intent/buyer_intent_data_sets
2020-02-13 22:28:36.858 INFO athena_util.py.462 - 2020-02-13 22:28:36.858838
2020-02-13 22:28:37.897 INFO athena_util.py.464 - 2020-02-13 22:28:37.897828
2020-02-13 22:28:37.999 INFO athena_util.py.441 - s3://datascience-workspace-de

-1