## ETL - Guideline

This notebook guides you through the development of your first ETL.

The melitk.analytics library (which comes pre-installed) provides many functions and methods that do not appear in this demo. The goal here is to show how Fury Data Apps expects your ETL process to be structured so that it runs successfully and generates the dataset you will use for training.

### Example

Here we generate a dataset that will be used to estimate the probability that a user makes a Cellphone Recharge through the MELI or MP app.

## Imports

In [None]:
import datetime
import os
import pickle
import pandas as pd
import s3fs

from melitk.analytics.connectors.core.authentication import Authentication
from melitk.analytics.connectors.teradata import ConnTeradata
from melitk.analytics.connectors.presto import ConnPresto
from melitk.fda import workspace

from shared.settings import DATASET_FILENAME, SAMPLE

## Setting credentials

In [None]:
teradata_user = os.environ['SECRET_TERADATA_USER']
teradata_pass = os.environ['SECRET_TERADATA_PASS']

melilake_user = os.environ['SECRET_MELILAKE_USER']
melilake_pass = os.environ['SECRET_MELILAKE_PASS']

s3_access_key = os.environ['SECRET_S3_ACCESS_KEY']
s3_secret_key = os.environ['SECRET_S3_SECRET_KEY']
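
Reading a secret with `os.environ[...]` raises a `KeyError` on the first missing name only. As a minimal sketch, the check below lists every missing secret at once. The helper `missing_secrets` is hypothetical and not part of melitk; the variable names are the ones used in the cell above.

```python
import os

# Names taken from the credentials cell above.
REQUIRED_SECRETS = [
    'SECRET_TERADATA_USER', 'SECRET_TERADATA_PASS',
    'SECRET_MELILAKE_USER', 'SECRET_MELILAKE_PASS',
    'SECRET_S3_ACCESS_KEY', 'SECRET_S3_SECRET_KEY',
]


def missing_secrets(names, environ=None):
    """Return the names that are unset or empty (hypothetical helper)."""
    environ = os.environ if environ is None else environ
    return [name for name in names if not environ.get(name)]


missing = missing_secrets(REQUIRED_SECRETS)
if missing:
    print("Missing secrets: " + ", ".join(missing))
```

Running this before opening any connection makes a misconfigured environment easier to diagnose.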

## Getting Data From Teradata

### Establishing connection

In [None]:
tera = ConnTeradata(teradata_user, teradata_pass, auth_method=Authentication.APP)

### Defining queries

* Monthly active users

In [None]:
create_maus_table = """
CREATE MULTISET VOLATILE TABLE MAUS, NO LOG AS
(
SELECT      CUS_CUST_ID,

            MAX(CASE 
            WHEN PHOTO_MES = '201812' AND ZEROIFNULL(DAYS_MAU_MP) > 0 THEN 1
            ELSE 0
            END) AS MAU_MP_M3,
            
            MAX(CASE 
            WHEN PHOTO_MES = '201901' AND ZEROIFNULL(DAYS_MAU_MP) > 0 THEN 1
            ELSE 0
            END) AS MAU_MP_M2,
            
            MAX(CASE 
            WHEN PHOTO_MES = '201902' AND ZEROIFNULL(DAYS_MAU_MP) > 0 THEN 1
            ELSE 0
            END) AS MAU_MP_M1,
            
            MAX(CASE 
            WHEN PHOTO_MES = '201812' AND ZEROIFNULL(DAYS_MAU_ML) > 0 THEN 1
            ELSE 0
            END) AS MAU_ML_M3,
            MAX(CASE 
            WHEN PHOTO_MES = '201901' AND ZEROIFNULL(DAYS_MAU_ML) > 0 THEN 1
            ELSE 0
            END) AS MAU_ML_M2,
            MAX(CASE 
            WHEN PHOTO_MES = '201902' AND ZEROIFNULL(DAYS_MAU_ML) > 0 THEN 1
            ELSE 0
            END) AS MAU_ML_M1,
            SUM(CASE 
            WHEN PHOTO_MES = '201812' AND ZEROIFNULL(DAYS_MAU_ML) + ZEROIFNULL(DAYS_MAU_MP) > 0 THEN 1
            ELSE 0
            END) AS MAU_M3,
            SUM(CASE 
            WHEN PHOTO_MES = '201901' AND ZEROIFNULL(DAYS_MAU_ML) + ZEROIFNULL(DAYS_MAU_MP) > 0 THEN 1
            ELSE 0
            END) AS MAU_M2,
            SUM(CASE
            WHEN PHOTO_MES = '201902' AND ZEROIFNULL(DAYS_MAU_ML) + ZEROIFNULL(DAYS_MAU_MP) > 0 THEN 1
            ELSE 0
            END) AS MAU_M1,
            SUM(CASE
            WHEN PHOTO_MES = '201812' THEN ZEROIFNULL(DAYS_MAU_ML) + ZEROIFNULL(DAYS_MAU_MP)
            ELSE 0
            END) AS DAYS_MAU_M3,
            SUM(CASE
            WHEN PHOTO_MES = '201901' THEN ZEROIFNULL(DAYS_MAU_ML) + ZEROIFNULL(DAYS_MAU_MP)
            ELSE 0
            END) AS DAYS_MAU_M2,
            SUM(CASE
            WHEN PHOTO_MES = '201902' THEN ZEROIFNULL(DAYS_MAU_ML) + ZEROIFNULL(DAYS_MAU_MP)
            ELSE 0
            END) AS DAYS_MAU_M1
            
FROM        WHOWNER.LK_WALLET_MAUS

WHERE       SIT_SITE_ID = 'MLA'
            AND PHOTO_MES BETWEEN '201812' AND '201902'
            
GROUP BY 1
) WITH DATA UNIQUE PRIMARY INDEX (CUS_CUST_ID) ON COMMIT PRESERVE ROWS"""
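
The query above hardcodes the three months of the feature window ('201812', '201901', '201902'). The sketch below shows one way to derive them from the last month and to build one of the repeated CASE expressions from a template. Both `month_window` and `CASE_TEMPLATE` are hypothetical names introduced for illustration; they are not used elsewhere in this notebook.

```python
def month_window(last_month, n_months=3):
    """Return n_months 'YYYYMM' strings ending at last_month, oldest first."""
    year, month = int(last_month[:4]), int(last_month[4:])
    months = []
    for _ in range(n_months):
        months.append("{:04d}{:02d}".format(year, month))
        month -= 1
        if month == 0:
            # Step back across the year boundary.
            year, month = year - 1, 12
    return months[::-1]


# Hypothetical template mirroring the MAU_MP_* flags in the query above.
CASE_TEMPLATE = (
    "MAX(CASE WHEN PHOTO_MES = '{month}' AND ZEROIFNULL({col}) > 0 "
    "THEN 1 ELSE 0 END) AS {alias}"
)

# M3 is the oldest month of the window and M1 the most recent one.
flags = [
    CASE_TEMPLATE.format(month=month, col='DAYS_MAU_MP',
                         alias='MAU_MP_M{}'.format(suffix))
    for suffix, month in zip((3, 2, 1), month_window('201902'))
]
```

Generating the expressions this way keeps the window in one place when the reference month changes.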

### Executing query and getting result

In [None]:
tera.execute(create_maus_table)

In [None]:
# SAMPLE limits the number of rows fetched. Set it during development to speed up
# the ETL (a dataset built from the sample will not be representative).
maus_result = tera.execute_response("SELECT * FROM MAUS SAMPLE {}".format(SAMPLE))
maus_df = pd.DataFrame(maus_result)

## Getting Data From Melilake

### Establishing connection

In [None]:
presto = ConnPresto(melilake_user, melilake_pass)

### Defining queries

In [None]:
with open('mp_payments.sql', 'r') as query_file:
    mp_payments = query_file.read()

### Executing query and getting result

In [None]:
mp_payments_result = presto.execute_response(mp_payments)
mp_payments_df = pd.DataFrame(mp_payments_result)

## Getting Data From AWS S3

### Connect to S3

In [None]:
s3 = s3fs.S3FileSystem(key=s3_access_key, secret=s3_secret_key)

### Reading Data

In [None]:
# Open the file through s3fs and close it once pandas has read it
with s3.open("s3://bi-public-data/training/lk_mp_pay_payment_methods.csv") as csv_file:
    mp_payment_methods_df = pd.read_csv(csv_file)

## Merge Data

### Inner Join

In [None]:
wallet_payers_merge_df = pd.merge(mp_payments_df, 
                            mp_payment_methods_df, 
                            how='inner',  
                            left_on='PAY_PM_ID', 
                            right_on='PAY_PM_TYPE_ID')
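
An inner join silently drops payments whose method id has no match in the lookup table. The toy example below (made-up values, same column names as above) shows the effect and uses an outer join with `indicator=True` to list the rows that would be lost.

```python
import pandas as pd

payments = pd.DataFrame({
    'PAY_PAYMENT_ID': [1, 2, 3],
    'PAY_PM_ID': [10, 20, 99],  # 99 has no match in the lookup table
})
methods = pd.DataFrame({
    'PAY_PM_TYPE_ID': [10, 20, 30],
    'PAY_PM_TYPE_DESC': ['Account Money', 'Credit Card', 'Debit Card'],
})

# Same join as above: only payments with a known method survive.
inner = pd.merge(payments, methods, how='inner',
                 left_on='PAY_PM_ID', right_on='PAY_PM_TYPE_ID')

# The outer join keeps everything and labels where each row came from.
audit = pd.merge(payments, methods, how='outer',
                 left_on='PAY_PM_ID', right_on='PAY_PM_TYPE_ID',
                 indicator=True)
dropped_payments = audit[audit['_merge'] == 'left_only']
```

Checking `dropped_payments` on the real data is a quick way to confirm that the join keys line up as expected.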

### Cleaning Data

In [None]:
# Upper-case all column names using the Index string accessor
wallet_payers_merge_df.columns = wallet_payers_merge_df.columns.str.upper()

In [None]:
wallet_payers_merge_df = wallet_payers_merge_df[[
    'CUS_CUST_ID',
    'PAY_MOVE_DATE',
    'TPV_SEGMENT_DETAIL',
    'PAY_PAYMENT_ID',
    'PAY_COUPON_AMOUNT_AMT',
    'PAY_PM_TYPE_DESC']]

In [None]:
wallet_payers_merge_df['YEAR_MONTH_DATE'] = pd.to_datetime(wallet_payers_merge_df['PAY_MOVE_DATE']).dt.strftime('%Y%m').astype(int)
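
The `YEAR_MONTH_DATE` column is an integer key such as 201812. A small sketch on made-up dates shows the conversion used above, together with an arithmetic form that should give the same result without formatting to a string first.

```python
import pandas as pd

dates = pd.Series(['2018-12-31', '2019-01-01', '2019-02-15'])
parsed = pd.to_datetime(dates)

# Same approach as the cell above: format as 'YYYYMM', then cast to int.
year_month = parsed.dt.strftime('%Y%m').astype(int)

# Alternative: build the key from the year and month components.
year_month_alt = parsed.dt.year * 100 + parsed.dt.month
```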

### Making Features

In [None]:
df = wallet_payers_merge_df

In [None]:
# Monthly payments
payments_m3 = (df[ df.YEAR_MONTH_DATE == 201812 ]
    .groupby('CUS_CUST_ID')
    .count()[['PAY_MOVE_DATE']]
    .rename(columns={'PAY_MOVE_DATE':'PAYMENTS_M3'}))

payments_m2 = (df[ df.YEAR_MONTH_DATE == 201901 ]
    .groupby('CUS_CUST_ID')
    .count()[['PAY_MOVE_DATE']]
    .rename(columns={'PAY_MOVE_DATE':'PAYMENTS_M2'}))

payments_m1 = (df[ df.YEAR_MONTH_DATE == 201902 ]
    .groupby('CUS_CUST_ID')
    .count()[['PAY_MOVE_DATE']]
    .rename(columns={'PAY_MOVE_DATE':'PAYMENTS_M1'}))
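
The filter, group, count and rename pattern repeats for every feature in this section. As a sketch, it could be wrapped in a small helper. `count_payments` is a hypothetical name, the data is made up, and the helper uses `size()`, which counts all selected rows, whereas `count()` on `PAY_MOVE_DATE` skips rows where that column is null.

```python
import pandas as pd


def count_payments(frame, mask, name):
    """Count the rows selected by mask per customer (hypothetical helper)."""
    return frame[mask].groupby('CUS_CUST_ID').size().to_frame(name)


toy = pd.DataFrame({
    'CUS_CUST_ID': [1, 1, 2, 2, 3],
    'YEAR_MONTH_DATE': [201812, 201812, 201812, 201901, 201902],
    'PAY_MOVE_DATE': ['2018-12-01', '2018-12-05', '2018-12-09',
                      '2019-01-03', '2019-02-11'],
})

# One single-column frame per month, keyed by the feature name.
monthly = {
    'PAYMENTS_M{}'.format(suffix): count_payments(
        toy, toy.YEAR_MONTH_DATE == month, 'PAYMENTS_M{}'.format(suffix))
    for suffix, month in zip((3, 2, 1), (201812, 201901, 201902))
}
```

The same helper would accept the combined masks used for the Cellphone Recharge and no-discount features.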


In [None]:
# Number of days with at least one payment
days_w_payments_m3 = (df[ df.YEAR_MONTH_DATE == 201812 ]
    .groupby(['CUS_CUST_ID','PAY_MOVE_DATE'])
    .count()
    .groupby('CUS_CUST_ID')
    .count()[['TPV_SEGMENT_DETAIL']]
    .rename(columns={'TPV_SEGMENT_DETAIL':'DAYS_W_PAYMENTS_M3'}))
   
days_w_payments_m2 = (df[ df.YEAR_MONTH_DATE == 201901 ]
    .groupby(['CUS_CUST_ID','PAY_MOVE_DATE'])
    .count()
    .groupby('CUS_CUST_ID')
    .count()[['TPV_SEGMENT_DETAIL']]
    .rename(columns={'TPV_SEGMENT_DETAIL':'DAYS_W_PAYMENTS_M2'}))

days_w_payments_m1 = (df[ df.YEAR_MONTH_DATE == 201902 ]
    .groupby(['CUS_CUST_ID','PAY_MOVE_DATE'])
    .count()
    .groupby('CUS_CUST_ID')
    .count()[['TPV_SEGMENT_DETAIL']]
    .rename(columns={'TPV_SEGMENT_DETAIL':'DAYS_W_PAYMENTS_M1'}))
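
The double `groupby` above counts the distinct payment dates of each customer. Assuming `PAY_MOVE_DATE` holds one value per calendar day, `nunique()` should express the same idea in a single step, as this toy example with made-up rows shows.

```python
import pandas as pd

toy = pd.DataFrame({
    'CUS_CUST_ID': [1, 1, 1, 2],
    'PAY_MOVE_DATE': ['2019-02-01', '2019-02-01', '2019-02-07', '2019-02-03'],
})

# Customer 1 paid three times but on two distinct days.
days_w_payments = (toy.groupby('CUS_CUST_ID')['PAY_MOVE_DATE']
    .nunique()
    .to_frame('DAYS_W_PAYMENTS'))
```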

In [None]:
# Monthly Cellphone Recharge payments
payments_cp_r_m3 = (df[ (df.YEAR_MONTH_DATE == 201812) & (df.TPV_SEGMENT_DETAIL == 'Cellphone Recharge') ]
    .groupby('CUS_CUST_ID')
    .count()[['PAY_MOVE_DATE']]
    .rename(columns={'PAY_MOVE_DATE':'PAYMENTS_CP_R_M3'}))

payments_cp_r_m2 = (df[ (df.YEAR_MONTH_DATE == 201901) & (df.TPV_SEGMENT_DETAIL == 'Cellphone Recharge') ]
    .groupby('CUS_CUST_ID')
    .count()[['PAY_MOVE_DATE']]
    .rename(columns={'PAY_MOVE_DATE':'PAYMENTS_CP_R_M2'}))

payments_cp_r_m1 = (df[ (df.YEAR_MONTH_DATE == 201902) & (df.TPV_SEGMENT_DETAIL == 'Cellphone Recharge') ]
    .groupby('CUS_CUST_ID')
    .count()[['PAY_MOVE_DATE']]
    .rename(columns={'PAY_MOVE_DATE':'PAYMENTS_CP_R_M1'}))

In [None]:
# Monthly payments without discount
payments_no_disc_m3 = (df[ (df.YEAR_MONTH_DATE == 201812) & (df.PAY_COUPON_AMOUNT_AMT == 0) ]
    .groupby('CUS_CUST_ID')
    .count()[['PAY_MOVE_DATE']]
    .rename(columns={'PAY_MOVE_DATE':'PAYMENTS_NO_DISC_M3'}))

payments_no_disc_m2 = (df[ (df.YEAR_MONTH_DATE == 201901) & (df.PAY_COUPON_AMOUNT_AMT == 0) ]
    .groupby('CUS_CUST_ID')
    .count()[['PAY_MOVE_DATE']]
    .rename(columns={'PAY_MOVE_DATE':'PAYMENTS_NO_DISC_M2'}))

payments_no_disc_m1 = (df[ (df.YEAR_MONTH_DATE == 201902) & (df.PAY_COUPON_AMOUNT_AMT == 0) ]
    .groupby('CUS_CUST_ID')
    .count()[['PAY_MOVE_DATE']]
    .rename(columns={'PAY_MOVE_DATE':'PAYMENTS_NO_DISC_M1'}))

In [None]:
# Monthly Cellphone Recharge payments without discount
payments_no_disc_cp_r_m3 = (df[ (df.YEAR_MONTH_DATE == 201812) & (df.PAY_COUPON_AMOUNT_AMT == 0) & (df.TPV_SEGMENT_DETAIL == 'Cellphone Recharge') ]
    .groupby('CUS_CUST_ID')
    .count()[['PAY_MOVE_DATE']]
    .rename(columns={'PAY_MOVE_DATE':'PAYMENTS_NO_DISC_CP_R_M3'}))

payments_no_disc_cp_r_m2 = (df[ (df.YEAR_MONTH_DATE == 201901) & (df.PAY_COUPON_AMOUNT_AMT == 0) & (df.TPV_SEGMENT_DETAIL == 'Cellphone Recharge') ]
    .groupby('CUS_CUST_ID')
    .count()[['PAY_MOVE_DATE']]
    .rename(columns={'PAY_MOVE_DATE':'PAYMENTS_NO_DISC_CP_R_M2'}))

payments_no_disc_cp_r_m1 = (df[ (df.YEAR_MONTH_DATE == 201902) & (df.PAY_COUPON_AMOUNT_AMT == 0) & (df.TPV_SEGMENT_DETAIL == 'Cellphone Recharge') ]
    .groupby('CUS_CUST_ID')
    .count()[['PAY_MOVE_DATE']]
    .rename(columns={'PAY_MOVE_DATE':'PAYMENTS_NO_DISC_CP_R_M1'}))

In [None]:
# First and last payment up to the end of the feature window
first_payment_m3 = ( df[ df.PAY_MOVE_DATE <= '2019-02-28' ]
    .groupby('CUS_CUST_ID')
    .agg({'PAY_MOVE_DATE':'min'})
    .rename(columns={'PAY_MOVE_DATE':'FIRST_PAYMENT_M3'}))

last_payment_m3 = ( df[ df.PAY_MOVE_DATE <= '2019-02-28' ]
    .groupby('CUS_CUST_ID')
    .agg({'PAY_MOVE_DATE':'max'})
    .rename(columns={'PAY_MOVE_DATE':'LAST_PAYMENT_M3'}))

In [None]:
# Payments by method (counted over every month in the data, not only the feature window)
payments_acc_money = (df[ df.PAY_PM_TYPE_DESC == 'Account Money' ]
    .groupby('CUS_CUST_ID')
    .count()[['PAY_MOVE_DATE']]
    .rename(columns={'PAY_MOVE_DATE':'PAYMENTS_ACC_MONEY'}))

payments_credit_card = (df[ df.PAY_PM_TYPE_DESC == 'Credit Card' ]
    .groupby('CUS_CUST_ID')
    .count()[['PAY_MOVE_DATE']]
    .rename(columns={'PAY_MOVE_DATE':'PAYMENTS_CREDIT_CARD'}))

payments_debit_card = (df[ df.PAY_PM_TYPE_DESC == 'Debit Card' ]
    .groupby('CUS_CUST_ID')
    .count()[['PAY_MOVE_DATE']]
    .rename(columns={'PAY_MOVE_DATE':'PAYMENTS_DEBIT_CARD'}))

In [None]:
# Target
df['TARGET'] = 0
df.loc[ (df.YEAR_MONTH_DATE == 201903) & (df.TPV_SEGMENT_DETAIL == 'Cellphone Recharge'), 'TARGET'] = 1

target = (
    df
    .groupby('CUS_CUST_ID')
    .agg({'TARGET':'max'})
)
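
The target is 1 for a customer with at least one Cellphone Recharge in 201903 and 0 otherwise. The toy example below (made-up rows) reproduces that logic without adding a column to the source frame.

```python
import pandas as pd

toy = pd.DataFrame({
    'CUS_CUST_ID': [1, 1, 2, 3],
    'YEAR_MONTH_DATE': [201902, 201903, 201903, 201903],
    'TPV_SEGMENT_DETAIL': ['Cellphone Recharge', 'Cellphone Recharge',
                           'Other', 'Cellphone Recharge'],
})

# Row-level flag: a recharge made in the target month.
is_target = ((toy.YEAR_MONTH_DATE == 201903)
             & (toy.TPV_SEGMENT_DETAIL == 'Cellphone Recharge'))

# Customer-level label: 1 if any of the customer's rows is flagged.
toy_target = (is_target.astype(int)
    .groupby(toy.CUS_CUST_ID)
    .max()
    .to_frame('TARGET'))
```

Customer 2 paid in the target month but not for a recharge, so the label stays 0.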

### Join all New Features

In [None]:
# One row per customer: an empty frame indexed by CUS_CUST_ID, used as the base for the left joins
customers = (
    df
    .groupby('CUS_CUST_ID')[['PAY_MOVE_DATE']]
    .count()).drop('PAY_MOVE_DATE',axis=1)

In [None]:
features = pd.merge(customers,payments_m3,how='left',on='CUS_CUST_ID')
features = pd.merge(features,payments_m2,how='left',on='CUS_CUST_ID')
features = pd.merge(features,payments_m1,how='left',on='CUS_CUST_ID')

features = pd.merge(features,days_w_payments_m3,how='left',on='CUS_CUST_ID')
features = pd.merge(features,days_w_payments_m2,how='left',on='CUS_CUST_ID')
features = pd.merge(features,days_w_payments_m1,how='left',on='CUS_CUST_ID')

features = pd.merge(features,payments_cp_r_m3,how='left',on='CUS_CUST_ID')
features = pd.merge(features,payments_cp_r_m2,how='left',on='CUS_CUST_ID')
features = pd.merge(features,payments_cp_r_m1,how='left',on='CUS_CUST_ID')

features = pd.merge(features,payments_no_disc_m3,how='left',on='CUS_CUST_ID')
features = pd.merge(features,payments_no_disc_m2,how='left',on='CUS_CUST_ID')
features = pd.merge(features,payments_no_disc_m1,how='left',on='CUS_CUST_ID')

features = pd.merge(features,payments_no_disc_cp_r_m3,how='left',on='CUS_CUST_ID')
features = pd.merge(features,payments_no_disc_cp_r_m2,how='left',on='CUS_CUST_ID')
features = pd.merge(features,payments_no_disc_cp_r_m1,how='left',on='CUS_CUST_ID')

features = pd.merge(features,first_payment_m3,how='left',on='CUS_CUST_ID')
features = pd.merge(features,last_payment_m3,how='left',on='CUS_CUST_ID')

features = pd.merge(features,payments_acc_money,how='left',on='CUS_CUST_ID')
features = pd.merge(features,payments_credit_card,how='left',on='CUS_CUST_ID')
features = pd.merge(features,payments_debit_card,how='left',on='CUS_CUST_ID')

features = pd.merge(features,target,how='left',on='CUS_CUST_ID')

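# Fill missing counts with 0, but leave the payment dates missing so that
# customers without payments in the window get the default value applied later.
date_columns = ['FIRST_PAYMENT_M3', 'LAST_PAYMENT_M3']
non_date_columns = features.columns.difference(date_columns)
features[non_date_columns] = features[non_date_columns].fillna(0)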

In [None]:
wallet_payers_df = features.reset_index().copy()
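
Since every feature frame is indexed by `CUS_CUST_ID`, the chain of left merges can also be written as a fold over a list. The sketch below uses made-up frames and fills only the count column with 0, so a missing date stays missing instead of becoming 0.

```python
from functools import reduce

import pandas as pd

idx_all = pd.Index([1, 2, 3], name='CUS_CUST_ID')
idx_some = pd.Index([1, 2], name='CUS_CUST_ID')

toy_customers = pd.DataFrame(index=idx_all)
toy_payments_m1 = pd.DataFrame({'PAYMENTS_M1': [2, 1]}, index=idx_some)
toy_last_payment = pd.DataFrame(
    {'LAST_PAYMENT_M3': ['2019-02-05', '2019-02-09']}, index=idx_some)

# Left-join every feature frame onto the customer base, one after another.
feature_frames = [toy_payments_m1, toy_last_payment]
toy_features = reduce(
    lambda left, right: left.join(right, how='left'),
    feature_frames,
    toy_customers,
)

# Customer 3 has no payments: its count becomes 0, its date stays missing.
count_cols = ['PAYMENTS_M1']
toy_features[count_cols] = toy_features[count_cols].fillna(0)
```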

## Checking format and handling missing values

In [None]:
numeric_columns_maus_df = [
    'MAU_MP_M3', 'MAU_MP_M2', 'MAU_MP_M1', 'MAU_ML_M3', 'MAU_ML_M2', 'MAU_ML_M1', 
    'MAU_M3', 'MAU_M2', 'MAU_M1', 'DAYS_MAU_M3', 'DAYS_MAU_M2','DAYS_MAU_M1'
]

In [None]:
numeric_columns_wallet_payers_df = [
    'PAYMENTS_M3', 'PAYMENTS_M2', 'PAYMENTS_M1',
    'DAYS_W_PAYMENTS_M3', 'DAYS_W_PAYMENTS_M2', 'DAYS_W_PAYMENTS_M1',
    'PAYMENTS_CP_R_M3', 'PAYMENTS_CP_R_M2', 'PAYMENTS_CP_R_M1',
    'PAYMENTS_NO_DISC_M3', 'PAYMENTS_NO_DISC_M2', 'PAYMENTS_NO_DISC_M1',
    'PAYMENTS_NO_DISC_CP_R_M3', 'PAYMENTS_NO_DISC_CP_R_M2', 'PAYMENTS_NO_DISC_CP_R_M1',
    'PAYMENTS_ACC_MONEY', 'PAYMENTS_CREDIT_CARD', 'PAYMENTS_DEBIT_CARD'
]

In [None]:
maus_df[numeric_columns_maus_df] = maus_df[numeric_columns_maus_df].fillna(0)
wallet_payers_df[numeric_columns_wallet_payers_df] = wallet_payers_df[numeric_columns_wallet_payers_df].fillna(0)

In [None]:
wallet_payers_df['FIRST_PAYMENT_M3'] = pd.to_datetime(wallet_payers_df['FIRST_PAYMENT_M3'])
wallet_payers_df['LAST_PAYMENT_M3'] = pd.to_datetime(wallet_payers_df['LAST_PAYMENT_M3'])

In [None]:
target_date = datetime.datetime(2019, 2, 28, 0, 0, 0, 0)
wallet_payers_df['DATE_BEFORE_TARGET'] = target_date

In [None]:
wallet_payers_df['DAYS_SINCE_LAST_PAYMENT'] = (
    wallet_payers_df['DATE_BEFORE_TARGET'] - wallet_payers_df['LAST_PAYMENT_M3']
).dt.days

wallet_payers_df['DAYS_SINCE_FIRST_PAYMENT'] = (
    wallet_payers_df['DATE_BEFORE_TARGET'] - wallet_payers_df['FIRST_PAYMENT_M3']
).dt.days

In [None]:
relative_date_cols = ['DAYS_SINCE_LAST_PAYMENT', 'DAYS_SINCE_FIRST_PAYMENT']
wallet_payers_df[relative_date_cols] = wallet_payers_df[relative_date_cols].fillna(100)
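
The relative date features are the number of days between each payment date and the reference date, with 100 as the default when a customer has no payment in the window. A toy example with made-up dates:

```python
import datetime

import pandas as pd

toy_target_date = datetime.datetime(2019, 2, 28)
toy_last_payment = pd.to_datetime(
    pd.Series(['2019-02-25', '2018-12-31', None]))

# Missing dates become NaT, so the difference in days is NaN for them.
days_since_last = (toy_target_date - toy_last_payment).dt.days

# Same default as the cell above.
days_since_last_filled = days_since_last.fillna(100)
```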

## Exploring target distribution

In [None]:
wallet_payers_df[['CUS_CUST_ID', 'TARGET']].groupby('TARGET').agg('count')
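
Besides the absolute counts, the share of each class is often useful to judge how imbalanced the target is. A sketch on made-up labels:

```python
import pandas as pd

toy = pd.DataFrame({'CUS_CUST_ID': [1, 2, 3, 4], 'TARGET': [0, 0, 0, 1]})

# Fraction of customers in each class, indexed by the class label.
class_share = toy['TARGET'].value_counts(normalize=True)
```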

## Joining dataframes

In [None]:
df = pd.merge(wallet_payers_df, maus_df, on='CUS_CUST_ID')

In [None]:
columns_to_remove = ['CUS_CUST_ID', 'FIRST_PAYMENT_M3', 'LAST_PAYMENT_M3', 'DATE_BEFORE_TARGET']
df = df.drop(columns_to_remove, axis=1)

## Saving our job
By saving the dataset this way, we let the platform handle the storage of our data. This guarantees that all the steps can access the appropriate paths without us having to create any storage or request roles, credentials, etc.

### Reorder columns, validate, and store the dataset

In [None]:
# Moving 'TARGET' column to the last place just to be able to assemble train and dev datasets easily
cols_at_end = ['TARGET']
df = df[[c for c in df if c not in cols_at_end] + [c for c in cols_at_end if c in df]]
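
With `TARGET` in the last position, features and label can be separated by position. The sketch below shows one possible train/dev split on made-up data. The 80/20 ratio and the use of `DataFrame.sample` are assumptions for illustration, not something the platform requires.

```python
import pandas as pd

toy = pd.DataFrame({
    'TARGET': [0, 1, 0, 1, 0],
    'PAYMENTS_M1': [1, 2, 3, 4, 5],
    'MAU_M1': [1, 0, 1, 1, 0],
})

# Same reordering idea as above: TARGET goes last.
toy_cols_at_end = ['TARGET']
toy = toy[[c for c in toy if c not in toy_cols_at_end] + toy_cols_at_end]

# Assumed 80/20 split; random_state makes it reproducible.
train = toy.sample(frac=0.8, random_state=0)
dev = toy.drop(train.index)

# Every column but the last is a feature; the last one is the label.
X_train, y_train = train.iloc[:, :-1], train.iloc[:, -1]
```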

In [None]:
# Asserting that TARGET holds at most 2 classes, as expected for this demo
df['TARGET'] = df['TARGET'].fillna(value=0)
assert df['TARGET'].nunique() in (1, 2)

In [None]:
# The resulting Pandas dataframe is serialized using Python's standard pickle module
serialized_dataset = pickle.dumps(df)
workspace.save_etl_file(DATASET_FILENAME, serialized_dataset)

In [None]:
# Just for validation purposes
loaded_obj = workspace.load_etl_file(DATASET_FILENAME)
loaded_dataset = pickle.loads(loaded_obj)
assert len(df) == len(loaded_dataset)
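
Comparing lengths only confirms that the row count survived the round trip. The in-memory sketch below (made-up data, no call to `workspace`) shows that `pickle.dumps` produces bytes and that `DataFrame.equals` can check the restored content as well.

```python
import pickle

import pandas as pd

toy = pd.DataFrame({'PAYMENTS_M1': [1.0, 0.0], 'TARGET': [0, 1]})

# Serialize to bytes and restore, as done with the real dataset above.
payload = pickle.dumps(toy)
restored = pickle.loads(payload)

# equals() compares shape, column labels, dtypes and values.
same_content = restored.equals(toy)
```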