# Pick Your Day: Test vs. Control
- Measure incremental sales and activation
- Promotion period: 6/25–7/17 (20% off)
- FB target audience groups (each split into a test group and a hold-out control):
1.  Active within the last 12 months
2.  Lapsed for over 12 months
3.  Recent enrollees not yet activated

KPIs:
Time perspectives:
- whole period: 6/25 to 7/17
- weekly

Metrics:
- Total unique ids
- Total unique shoppers
- Total sales
- Total transactions

Once the metrics above are extracted from the data, the derived metrics below can easily be computed in Excel, or you can add them to your code:

- Conversion rate (CVR): total unique shoppers / total unique ids
- Transactions per id: total transactions / total unique ids
- Sales per id: total sales / total unique ids
- AOV (average order value): total sales / total transactions

## Data

In [1]:
%load_ext autoreload
%autoreload 2

import pandas as pd
import numpy as np
import os
from pathlib import Path
import sqlalchemy
import dask
import dask.dataframe as dd

DATA_PATH = Path('../data')
# Sub-directories for raw inputs, intermediate caches and final outputs.
DATA_PATH_RAW, DATA_PATH_INTERIM, DATA_PATH_PROCESSED = (
    DATA_PATH / subdir for subdir in ('raw', 'interim', 'processed')
)

In [5]:
def adjust_date(date, offset):
    """Shift a date by a whole number of days.

    Args:
        date (str or date-like): The date to shift; anything accepted by
            ``pd.to_datetime`` (e.g. 'YYYY-MM-DD' or a ``datetime.date``).
        offset (int): Number of days to add (negative moves backwards).

    Returns:
        str: The shifted date formatted as 'YYYY-MM-DD'.
    """
    # `infer_datetime_format` is deprecated since pandas 2.0 (strict format
    # inference is now always on) and was only a parsing-speed hint, so it is
    # omitted to avoid the deprecation warning on every call.
    shifted = pd.to_datetime(date) + pd.Timedelta(days=offset)
    return shifted.strftime('%Y-%m-%d')


def is_in_date_range(x, start, end, include_start=False, include_end=True):
    """Check whether the date embedded in a weekly-data path lies in a range.

    The date is the 10 characters ('YYYY-MM-DD') immediately following the
    first '_by_weeks/MediaStorm_' in ``str(x)``; it is compared to ``start``
    and ``end`` as strings.

    Args:
        x (str or pathlib.Path): File path to inspect.
        start (str): Lower bound, 'YYYY-MM-DD'.
        end (str): Upper bound, 'YYYY-MM-DD'.
        include_start (bool): Whether a date equal to ``start`` matches.
        include_end (bool): Whether a date equal to ``end`` matches.

    Returns: True or False
    """
    file_date = str(x).split('_by_weeks/MediaStorm_')[1][:10]
    after_start = file_date > start or (include_start and file_date == start)
    before_end = file_date < end or (include_end and file_date == end)
    return bool(after_start and before_end)


def list_files_by_date_range(start, end, file_type,
                             include_start=False, include_end=True,
                             root='/home/jian/BigLots/'):
    """List weekly data files under ``<root>/*_by_weeks/`` in a date range.

    By default ``root`` is /home/jian/BigLots/ (the 192 server layout).

    Args:
        start (str): start date in format 'YYYY-MM-DD'
        end (str): end date in format 'YYYY-MM-DD'
        file_type (str): one of the following: ['DailySales', 'MasterWeekly']
        include_start (bool): if start date is inclusive
        include_end (bool): if end date is inclusive
        root (str or pathlib.Path): directory containing the
            '*_by_weeks' folders.

    Returns:
        list of file paths (pathlib.Path), sorted
    """
    pattern = 'MediaStorm' + file_type + '*.[Tt][Xx][Tt]'  # case-insensitive .txt
    files = [
        f
        for week_dir in Path(root).glob('*_by_weeks/')
        for f in week_dir.rglob(pattern)
        if is_in_date_range(f, start, end, include_start, include_end)
    ]
    # glob order is filesystem-dependent; sort for reproducible results.
    return sorted(files)


def collect_data_by_date_range(start, end, file_type, use_cols=None,
                               include_start=True, include_end=True,
                               output_path=None):
    """Read weekly data files under path:
        /home/jian/BigLots/YYYY_by_weeks/
    on 192 server that match given date range.

    Files are read as '|'-separated text, concatenated, and rows containing
    any missing value, as well as exact duplicate rows, are dropped.

    Args:
        start (str): start date in format 'YYYY-MM-DD'
        end (str): end date in format 'YYYY-MM-DD'
        file_type (str): one of the following: ['DailySales', 'MasterWeekly']
        use_cols (iterable): columns to use when reading files
        include_start (bool): if start date is inclusive
        include_end (bool): if end date is inclusive
        output_path (str or pathlib.Path): if specified, file will be saved to path
            in parquet format (parent directories are created if needed).

    Returns:
        pd.DataFrame

    Raises:
        ValueError: if no file matches the date range.
    """
    files = list_files_by_date_range(
        start=start,
        end=end,
        file_type=file_type,
        include_start=include_start,
        include_end=include_end)
    if not files:
        # pd.concat([]) would fail with the opaque "No objects to concatenate".
        raise ValueError(
            f"No {file_type} files found for date range {start} to {end}.")

    frames = [pd.read_csv(f, sep='|', usecols=use_cols) for f in files]
    df = pd.concat(frames).dropna().drop_duplicates()

    if output_path:
        output_path = Path(output_path)
        output_path.parent.mkdir(parents=True, exist_ok=True)
        df.to_parquet(output_path, index=False)

    return df


def count_days_since_sign_up(df, inspect_date):
    """Return the number of whole days from each sign-up to a reference date.

    Args:
        df (pd.DataFrame): Must contain a 'sign_up_date' column parseable by
            ``pd.to_datetime``.
        inspect_date (str or date-like): Reference date.

    Returns:
        pd.Series: ``inspect_date - sign_up_date`` in days, index-aligned with
        ``df``; NaN where the sign-up date is missing.
    """
    elapsed = pd.to_datetime(inspect_date) - pd.to_datetime(df['sign_up_date'])
    # Vectorised accessor instead of a per-row Python lambda.
    return elapsed.dt.days


def filter_ids_by_window(df, window):
    """Keep rows whose 'days_since_sign_up' lies inside ``window``.

    Args:
        df (pd.DataFrame): Must contain a 'days_since_sign_up' column.
        window (sequence): ``(low, high)`` bounds, both inclusive.

    Returns:
        pd.DataFrame: The matching subset of ``df``.
    """
    low, high = window[0], window[1]
    in_window = df['days_since_sign_up'].between(low, high)
    return df.loc[in_window]


def read_customer_data_from_db(start, end, connection_string=None):
    """Fetch hashed customer/email ids of rewards members who signed up in a range.

    Args:
        start (str): Earliest sign-up date (inclusive).
        end (str): Latest sign-up date (inclusive).
        connection_string (str, optional): SQLAlchemy database URL. Defaults
            to the BL_SQL_CONNECTION environment variable, then to the legacy
            hard-coded URL.

    Returns:
        pd.DataFrame with columns customer_id_hashed and email_address_hash.
    """
    # TO BE EXTENDED
    if connection_string is None:
        # NOTE(review): the fallback URL embeds a plaintext password; set
        # BL_SQL_CONNECTION instead and rotate this credential.
        connection_string = os.environ.get(
            'BL_SQL_CONNECTION',
            'mysql+pymysql://nick:Nick-2020@localhost/BigLots')
    engine = sqlalchemy.create_engine(connection_string, pool_recycle=1800)
    try:
        return pd.read_sql(
            """SELECT customer_id_hashed, email_address_hash
               FROM BL_Rewards_Master
               WHERE sign_up_date >= %s AND sign_up_date <= %s;""",
            con=engine,
            params=(start, end))
    finally:
        # A fresh engine (and connection pool) is built per call; release it
        # so repeated calls do not accumulate open connections.
        engine.dispose()


def generate_weekly_intervals(start, end, week_start_on='SUN'):
    """Split the period [start, end] into consecutive calendar-week intervals.

    Each new week begins on ``week_start_on``; the first and last intervals
    may be partial weeks.

    Args:
        start (str): First day, 'YYYY-MM-DD'.
        end (str): Last day, 'YYYY-MM-DD'.
        week_start_on (str): pandas weekday anchor ('SUN', 'MON', ...) on
            which each week starts.

    Returns:
        tuple[list, list]: ``(week_starts, week_ends)`` of equal length; both
        bounds of every interval are inclusive. ``start`` and ``end`` are
        returned as given, all other dates as 'YYYY-MM-DD' strings.
    """
    boundaries = pd.date_range(start, end, freq='W-' + week_start_on)
    # If `start` itself falls on the week-start weekday, date_range includes
    # it; keeping it would emit a bogus first interval (start, start - 1 day).
    boundaries = boundaries[boundaries > pd.Timestamp(start)]
    week_starts = boundaries.strftime('%Y-%m-%d').tolist()
    week_ends = (boundaries - pd.Timedelta('1D')).strftime('%Y-%m-%d').tolist()
    return [start] + week_starts, week_ends + [end]


def _safe_div(numerator, denominator):
    """Return numerator / denominator, or NaN when the denominator is zero."""
    return numerator / denominator if denominator else np.nan


def calc_kpis(group, transactions, name, start=None, end=None):
    """Calculate KPIs for one audience group over a date window.

    Args:
        group (pd.DataFrame): Audience; only 'customer_id_hashed' is used.
        transactions (pd.DataFrame): One row per transaction with columns
            'customer_id_hashed', 'transaction_dt', 'transaction_id' and
            'item_transaction_amt'.
        name (str): Label stored in the 'group' column of the result.
        start, end (optional): Inclusive bounds compared against
            'transaction_dt'. When omitted, the earliest / latest
            transaction date (after any other bound is applied) is reported.

    Returns:
        pd.DataFrame: A single row of totals and ratios. Ratios whose
        denominator is zero (e.g. no shoppers in the window) are NaN.
    """
    if start:
        transactions = transactions.loc[transactions['transaction_dt'] >= start]
    else:
        start = transactions['transaction_dt'].min()
    if end:
        transactions = transactions.loc[transactions['transaction_dt'] <= end]
    else:
        end = transactions['transaction_dt'].max()

    # Left join keeps every id of the group; ids without a purchase in the
    # window get one row with NaN transaction columns.
    group = group[['customer_id_hashed']].drop_duplicates().dropna()
    group = group.merge(transactions, how='left', on='customer_id_hashed')
    purchased = group['transaction_id'].notna()

    tot_ids = group['customer_id_hashed'].nunique()
    tot_shprs = group.loc[purchased, 'customer_id_hashed'].nunique()
    tot_sales = group['item_transaction_amt'].sum()
    tot_trxns = int(purchased.sum())
    cvr = _safe_div(tot_shprs, tot_ids)    # conversion rate
    tpi = _safe_div(tot_trxns, tot_ids)    # trxns per id
    spi = _safe_div(tot_sales, tot_ids)    # sales per id
    tps = _safe_div(tot_trxns, tot_shprs)  # trxns per shopper
    sps = _safe_div(tot_sales, tot_shprs)  # sales per shopper
    aov = _safe_div(tot_sales, tot_trxns)  # avg order value

    res = pd.DataFrame({'start': [start],
                        'end': [end],
                        'group': [name],
                        'tot_ids': [tot_ids],
                        'tot_shprs': [tot_shprs],
                        'tot_sales': [tot_sales],
                        'tot_trxns': [tot_trxns],
                        # (sic) spelling kept for existing output consumers
                        'convertion_rate': [cvr],
                        'trxns_per_id': [tpi],
                        'sales_per_id': [spi],
                        'trxns_per_shpr': [tps],
                        'sales_per_shpr': [sps],
                        'avg_order_value': [aov]})
    return res


def normalize_kpis(kpis, notional_count=100000, by='tot_ids'):
    """Rescale count/sum KPIs as if the group had a notional size.

    The columns 'tot_ids', 'tot_shprs', 'tot_sales' and 'tot_trxns' are
    multiplied by ``notional_count / kpis[by]``; all other columns (ratios,
    labels) are left unchanged.

    Args:
        kpis (pd.DataFrame or pd.Series): KPI table, or a single KPI row.
        notional_count (int or float): Value the ``by`` column is scaled to.
        by (str): Column whose value becomes ``notional_count``.

    Returns:
        A copy of ``kpis`` (same type) with the normalized columns.
    """
    norm_kpis = kpis.copy()
    # `by` used to be ignored (always 'tot_ids'); honour it.
    factor = notional_count / kpis[by]
    cols_to_normalize = ['tot_ids', 'tot_shprs', 'tot_sales', 'tot_trxns']
    # axis=0 scales each row by its own factor for a DataFrame (plain `*`
    # would align the factor Series with the columns and yield NaN); for a
    # single row the factor is a scalar and axis is irrelevant.
    norm_kpis[cols_to_normalize] = norm_kpis[cols_to_normalize].mul(factor, axis=0)
    return norm_kpis


def analyze(test, control, transactions, start=None, end=None,
            by_week=False, name=None, add_norm_res=False):
    """Compute test vs. control KPIs over a period, optionally week by week.

    Args:
        test, control (pd.DataFrame): Audience groups with a
            'customer_id_hashed' column.
        transactions (pd.DataFrame): Transactions with 'customer_id_hashed',
            'transaction_dt', 'transaction_id' and 'item_transaction_amt'.
        start, end (optional): Inclusive period bounds; default to the
            earliest / latest 'transaction_dt' in ``transactions``.
        by_week (bool): If True, emit one test and one control row per week
            (weeks from ``generate_weekly_intervals``).
        name (str, optional): If given, inserted as a leading 'name' column.
        add_norm_res (bool): If True, append rows rescaled by
            ``normalize_kpis`` whose label gets a '_normalized' suffix.
            Ignored when ``by_week`` is True.

    Returns:
        pd.DataFrame: KPI rows (test and control).
    """
    if not start:
        start = transactions['transaction_dt'].min()
    if not end:
        end = transactions['transaction_dt'].max()

    # calc_kpis left-joins each group onto the transactions, so rows of other
    # customers never contribute. Dropping them once here avoids rescanning
    # the full table for every (week, group) pair; results are unchanged.
    ids = pd.concat([test['customer_id_hashed'], control['customer_id_hashed']])
    transactions = transactions.loc[transactions['customer_id_hashed'].isin(ids)]

    if by_week:
        kpis_by_week = []
        for st, en in zip(*generate_weekly_intervals(start, end)):
            kpis_test = calc_kpis(test, transactions, 'test', st, en)
            kpis_ctrl = calc_kpis(control, transactions, 'control', st, en)
            kpis_by_week.append(pd.concat([kpis_test, kpis_ctrl]))
        kpis = pd.concat(kpis_by_week)
    else:
        kpis_test = calc_kpis(test, transactions, 'test', start, end)
        kpis_ctrl = calc_kpis(control, transactions, 'control', start, end)
        kpis = pd.concat([kpis_test, kpis_ctrl])

    if name:
        kpis.insert(0, 'name', name)

    if add_norm_res and not by_week:
        norm_kpis = kpis.apply(normalize_kpis, axis=1)
        # Without a 'name' column (name not given) tag the 'group' label
        # instead; previously this raised KeyError.
        label_col = 'name' if 'name' in norm_kpis.columns else 'group'
        norm_kpis[label_col] = norm_kpis[label_col] + '_normalized'
        kpis = pd.concat([kpis, norm_kpis])
    return kpis


def _range_parquet_name(prefix, start, end):
    """Build '<prefix>_YYYYMMDD_YYYYMMDD.parquet' from two 'YYYY-MM-DD' dates."""
    return f"{prefix}_{start.replace('-', '')}_{end.replace('-', '')}.parquet"


def _load_raw_trxns(start, end):
    """Load line-item sales for [start - 6 days, end + 6 days].

    Reads the cached parquet under DATA_PATH_RAW when present; otherwise
    collects the DailySales files from the server and saves them there.
    """
    trxn_start = adjust_date(start, -6)
    trxn_end = adjust_date(end, +6)
    trxn_path = DATA_PATH_RAW / _range_parquet_name('trxns', trxn_start, trxn_end)

    if not trxn_path.exists():
        print("No existing trxn data...collecting from server...")
        transactions = collect_data_by_date_range(
            start=trxn_start,
            end=trxn_end,
            file_type='DailySales',
            use_cols=['location_id', 'transaction_dt', 'transaction_id',
                      'customer_id_hashed', 'item_transaction_amt'],
            output_path=trxn_path)
        print(f"Trxns data has been gathered and saved to: {trxn_path}")
        return transactions

    print("Found existing transactions data. Will generate agg trxns data from this dataset.")
    print("Loading transaction data...")
    transactions = pd.read_parquet(trxn_path)
    print(f"Transactions data read from path: {trxn_path}")
    return transactions


def read_agg_trxns(start, end):
    """Wrapper for reading trxn data aggregated to one row per transaction.

    The aggregate is cached as parquet under DATA_PATH_INTERIM. On a cache
    miss, raw line items (padded by 6 days on each side) are loaded, their
    'item_transaction_amt' summed per (location_id, transaction_dt,
    transaction_id, customer_id_hashed), restricted to start <=
    transaction_dt <= end, and saved to the cache.

    Args:
        start (str): 'YYYY-MM-DD'.
        end (str): 'YYYY-MM-DD'.

    Returns:
        pd.DataFrame of aggregated transactions.
    """
    print("Collecting transactions data...")
    agg_trxn_path = DATA_PATH_INTERIM / _range_parquet_name('agg_trxns', start, end)

    if agg_trxn_path.exists():
        print("Loading existing agg trxns data...")
        agg_trxns = pd.read_parquet(agg_trxn_path)
        print(f"Agg trxns read from path: {agg_trxn_path}")
    else:
        print("No existing agg trxn data...")
        transactions = _load_raw_trxns(start, end)
        print(f"Transactions data shape: {transactions.shape}")

        print("\nAggregating trxns data...")
        key_cols = ['location_id', 'transaction_dt', 'transaction_id',
                    'customer_id_hashed']
        summed = (transactions.groupby(key_cols)['item_transaction_amt']
                  .sum()
                  .reset_index())

        # drop the padding days: keep start <= transaction_dt <= end
        in_window = summed['transaction_dt'].between(start, end)
        agg_trxns = summed.loc[in_window, :]

        agg_trxns.to_parquet(agg_trxn_path, index=False)
        print(f"Data has been aggregated and saved to: {agg_trxn_path}")

    print(f"Agg trxns data shape: {agg_trxns.shape}\n")
    return agg_trxns

def read_groups(p):
    """Read an audience parquet file.

    A 'hashed_customer_num' column, if present, is renamed to
    'customer_id_hashed' to match the transaction data.
    """
    audience = pd.read_parquet(p)
    return audience.rename(columns={'hashed_customer_num': 'customer_id_hashed'})

In [6]:
%%time
start = '2020-06-25'
end = '2020-07-17'

# read agg trxns (one row per transaction within [start, end])
agg_trxns = read_agg_trxns(start, end)

# read audiences and calculate kpis
# Each audience has a test file, a hold-out control file and an output label.
lapsed = {'test': DATA_PATH_RAW / 'PYD_Lapsed.parquet',
          'control': DATA_PATH_RAW / 'PYD_Lapsed_Control.parquet',
          'name': 'lapsed'}
active = {'test': DATA_PATH_RAW / 'PYD_Active.parquet',
          'control': DATA_PATH_RAW / 'PYD_Active_Control.parquet',
          'name': 'active'}
inactive = {'test': DATA_PATH_RAW / 'PYD_Web_Not_Active.parquet',
            'control': DATA_PATH_RAW / 'PYD_Web_Not_Active_Control.parquet',
            'name': 'inactive'}
experiments = [lapsed, active, inactive]

# to_csv does not create missing directories.
DATA_PATH_PROCESSED.mkdir(parents=True, exist_ok=True)

kpis_alltime_list = []
kpis_weekly_list = []
for exp in experiments:
    print("Calculating KPIs for group: " + exp['name'])
    test = read_groups(exp['test'])
    control = read_groups(exp['control'])
    name = exp['name']
    # whole period, plus rows rescaled to a notional group size
    kpis_alltime = analyze(test, control, agg_trxns, start, end,
                           by_week=False, name=name, add_norm_res=True)
    # week by week
    kpis_weekly = analyze(test, control, agg_trxns, start, end,
                          by_week=True, name=name)
    kpis_alltime.to_csv(DATA_PATH_PROCESSED / f'{name}_alltime.csv', index=False)
    kpis_weekly.to_csv(DATA_PATH_PROCESSED / f'{name}_weekly.csv', index=False)
    kpis_alltime_list.append(kpis_alltime)
    kpis_weekly_list.append(kpis_weekly)

print("Merging results...")
kpis_alltime_all = pd.concat(kpis_alltime_list)
kpis_weekly_all = pd.concat(kpis_weekly_list)
kpis_alltime_all.to_csv(DATA_PATH_PROCESSED / 'all_groups_alltime.csv', index=False)
kpis_weekly_all.to_csv(DATA_PATH_PROCESSED / 'all_groups_weekly.csv', index=False)

print("Done.")

Collecting transactions data...
Loading existing agg trxns data...
Agg trxns read from path: ../data/interim/agg_trxns_20200625_20200717.parquet
Agg trxns data shape: (5685942, 5)

Calculating KPIs for group: lapsed
Calculating KPIs for group: active
Calculating KPIs for group: inactive
Merging results...
Done.
CPU times: user 22min 12s, sys: 10min 55s, total: 33min 8s
Wall time: 33min 46s
