# remerge uplift report

This notebook allows you to validate remerge provided uplift reporting numbers. To do so it downloads and analyses exported campaign and event data from S3 (access credentials to the S3 bucket are provided by your remerge account manager). The campaign data contains all users that remerge marked to be part of an uplift test, the A/B group assignment, the timestamp of marking, conversion events (click, app open or similar) and their cost. The event data reflects the event stream provided by the customer and includes events, their timestamp and revenue (if any). 

To verify that the group split is random and has no bias, user events / attributes before campaign start can be compared and check for an equal distribution in test and control group. For example the user age distribution, the user activity distribution or the average spend per user  should be the same in both groups pre campaign. 

## Import needed packages

This notebook/script needs pandas and scipy for analysis and boto to access data store on S3.


In [1]:
from datetime import datetime
import pandas as pd
import boto3 # check if we need to import this
import re
import os
import gzip
import scipy
import scipy.stats 
import s3fs
from IPython.display import display # so we can run this as script as well

## Configuration

Set the customer name, audience + access credentials for the S3 bucket and path. Furthermore the event for which we want to evaluate the uplift needs to be set `revenue_event`.

In [2]:
customer = ""
audience = ""
revenue_event = ''

In [3]:
# os.environ["AWS_ACCESS_KEY_ID"] = "xxxxxxxx"
# os.environ["AWS_SECRET_ACCESS_KEY"] = "xxxxxxxx"

os.environ["AWS_ACCESS_KEY_ID"] = ""
os.environ["AWS_SECRET_ACCESS_KEY"] = ""

In [4]:
dates = pd.date_range(start='2019-02-14',end='2019-02-27')

In [5]:
path = "s3://remerge-customers/{0}/uplift_data/{1}/".format(customer,audience)
path

'/'

In [6]:
# helper to download CSV files, convert to DF and print time needed
def read_csv(date,source):
    now = datetime.now()
    filename = path + source+'/'+date.strftime('%Y%m%d')+'.csv.gz'
    cache_dir = 'cache/'+source
    cache_filename = cache_dir + '/' + date.strftime('%Y%m%d')+'.parquet'
    if os.path.exists(cache_filename):
        print(now, "loading from cache", cache_filename)
        return pd.read_parquet(cache_filename, engine='pyarrow')
    print(now, "start loading CSV for", date)
    df = pd.read_csv(filename, escapechar='\\')
    print(datetime.now(), "finished loading CSV for", date.strftime('%d.%m.%Y'), "took", datetime.now()-now)
    if not os.path.exists(cache_dir):
        os.makedirs(cache_dir)
    df.to_parquet(cache_filename, engine='pyarrow')
    return df

## Load CSV data from S3

Load mark,spend and event data from S3. 
## IMPORTANT
**The event data is usually quite large (several GB) so this operation might take several minutes or hours to complete, depending on the size and connection**

In [7]:
bid_df = pd.concat([read_csv(date,'marks_and_spend') for date in dates], ignore_index = True, verify_integrity=True)

2019-03-07 16:21:27.568440 loading from cache cache/marks_and_spend/20190214.parquet
2019-03-07 16:21:27.738527 loading from cache cache/marks_and_spend/20190215.parquet
2019-03-07 16:21:27.776029 loading from cache cache/marks_and_spend/20190216.parquet
2019-03-07 16:21:27.807911 loading from cache cache/marks_and_spend/20190217.parquet
2019-03-07 16:21:27.832332 loading from cache cache/marks_and_spend/20190218.parquet
2019-03-07 16:21:27.854468 loading from cache cache/marks_and_spend/20190219.parquet
2019-03-07 16:21:27.957119 loading from cache cache/marks_and_spend/20190220.parquet
2019-03-07 16:21:28.013446 loading from cache cache/marks_and_spend/20190221.parquet
2019-03-07 16:21:28.062236 loading from cache cache/marks_and_spend/20190222.parquet
2019-03-07 16:21:28.108072 loading from cache cache/marks_and_spend/20190223.parquet
2019-03-07 16:21:28.146834 loading from cache cache/marks_and_spend/20190224.parquet
2019-03-07 16:21:28.197832 loading from cache cache/marks_and_spe

In [None]:
attributions_df = pd.concat([read_csv(date,'attributions') for date in dates], ignore_index = True, verify_integrity=True)

2019-03-07 16:25:01.729613 start loading CSV for 2019-02-14 00:00:00
2019-03-07 16:33:54.249781 finished loading CSV for 14.02.2019 took 0:08:52.520218
2019-03-07 16:34:09.308277 start loading CSV for 2019-02-15 00:00:00
2019-03-07 16:42:26.072868 finished loading CSV for 15.02.2019 took 0:08:16.764644
2019-03-07 16:42:42.233706 start loading CSV for 2019-02-16 00:00:00
2019-03-07 16:54:20.098443 finished loading CSV for 16.02.2019 took 0:11:37.864803
2019-03-07 16:54:36.198460 start loading CSV for 2019-02-17 00:00:00
2019-03-07 17:11:24.099303 finished loading CSV for 17.02.2019 took 0:16:47.900896
2019-03-07 17:11:38.670664 start loading CSV for 2019-02-18 00:00:00
2019-03-07 17:22:45.158189 finished loading CSV for 18.02.2019 took 0:11:06.487590
2019-03-07 17:23:02.600611 start loading CSV for 2019-02-19 00:00:00
2019-03-07 17:30:06.088882 finished loading CSV for 19.02.2019 took 0:07:03.488369
2019-03-07 17:30:22.702401 start loading CSV for 2019-02-20 00:00:00
2019-03-07 17:36:42

Print some statistics of the loaded data sets.

In [17]:
bid_df.info()
bid_df.describe()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 220850 entries, 0 to 220849
Data columns (total 9 columns):
ts               220850 non-null object
event_type       220850 non-null object
ab_test_group    220850 non-null object
user_id          220845 non-null object
campaign_id      220850 non-null int64
cost_currency    5647 non-null object
cost             5647 non-null float64
cost_eur         5647 non-null float64
campaign_name    220850 non-null object
dtypes: float64(2), int64(1), object(6)
memory usage: 15.2+ MB


Unnamed: 0,campaign_id,cost,cost_eur
count,220850.0,5647.0,5647.0
mean,16490.696192,275202.762529,242813.084824
std,438.53797,96205.499843,84835.409771
min,16171.0,100000.0,88062.0
25%,16175.0,200000.0,176328.0
50%,16177.0,270000.0,238191.0
75%,17099.0,330000.0,291061.5
max,17100.0,500000.0,441407.0


In [33]:
attributions_df.info()
attributions_df.describe()
attributions_df.count()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 140545704 entries, 0 to 140545703
Data columns (total 11 columns):
ts                  object
user_id             object
event_id            float64
partner             object
partner_event       object
revenue             float64
revenue_currency    object
revenue_eur         float64
ab_test_group       object
event_data          object
date                object
dtypes: float64(3), object(8)
memory usage: 11.5+ GB


ts                  140545704
user_id             140545704
event_id                    0
partner             140545704
partner_event       140545704
revenue              10206312
revenue_currency    112620385
revenue_eur          10206312
ab_test_group         6867501
event_data          112165510
date                140545704
dtype: int64

In [19]:
# sanity check - remove later
attributions_df["ts"].count()

31045386

# Uplift report prep

In [21]:
pd.set_option('display.float_format', '{:.2f}'.format)

In [23]:
# sanity check - remove later ✅
bid_df.groupby(by='ab_test_group')[["cost_eur"]].sum()

Unnamed: 0_level_0,cost_eur
ab_test_group,Unnamed: 1_level_1
control,0.0
test,1371165490.0


In [22]:
# sanity check - remove later ✅
bid_df.groupby(by='ab_test_group')['user_id'].nunique()

ab_test_group
control     40827
test       161048
Name: user_id, dtype: int64

In [24]:
# sanity check - remove later ✅
# there should not be duplicate users ids that are in both groups
# we need to fix that first
bid_df.drop_duplicates('user_id').groupby(by='ab_test_group')['user_id'].nunique()

ab_test_group
control     39628
test       159797
Name: user_id, dtype: int64

## Remove invalid users

Due to a race condition during marking we need to filter out users that are marked as *control* and *test*. In rare cases we see the same user on different servers in the same second, and unknowingly of each other mark him differently. This is going to be fixed in a future version. 

In [25]:
# users with both groups, test is a problem due to racy bids
# we need to filter them out and fix this 
groups_per_user = bid_df.groupby('user_id')['ab_test_group'].nunique()
invalid_users = groups_per_user[groups_per_user > 1]
invalid_users.count()

2450

In [27]:
# sanity check - remove for exported notebook
attributions_df['date'] = pd.to_datetime(attributions_df['ts']).dt.date
attribution_by_date = attributions_df.groupby(by='date')
attribution_by_date['revenue_eur'].sum()

date
2019-02-19   714720194413680128.00
2019-02-20      277465639950744.00
2019-02-21      209884677199881.00
Name: revenue_eur, dtype: float64

The mark_df dataframe will contain all mark events (without the invalid marks). It is then grouped by the assigend A/B test group.

In [28]:
mark_df = bid_df[bid_df.event_type == 'mark']
mark_df = mark_df[~mark_df['user_id'].isin(invalid_users.index)]
grouped = mark_df.groupby(by='ab_test_group')
control_df = grouped.get_group('control')
test_df = grouped.get_group('test')

In [30]:
# remove from final notebook
# there shouldn't be any duplicates
mark_df.drop_duplicates('user_id').groupby(by='ab_test_group')['user_id'].nunique()


ab_test_group
control     38377
test       156101
Name: user_id, dtype: int64

Calculate the cost of advertising. Remerge tracks monetary valuesin micro currency units. 

In [32]:
ad_spend_micros = bid_df[bid_df.event_type == 'buying_conversion']['cost_eur'].sum()
ad_spend = ad_spend_micros / 10**6
ad_spend

1371.16549

Create a dataframe that contains all relevant revenue events.

In [33]:
revenue_df = attributions_df[pd.notnull(attributions_df['revenue_eur'])]
revenue_df = revenue_df[revenue_df.partner_event == revenue_event]

In [34]:
# Sanity check - remove later
revenue_df.groupby('ab_test_group')['revenue_eur'].sum() / 10**6

ab_test_group
control   211064.63
test      840173.60
Name: revenue_eur, dtype: float64

In [36]:
# Sanity check - remove later
revenue_df['user_id'].nunique()

92767

In [37]:
# Sanit check - remove later
revenue_df['revenue_eur'].sum() / 10**6

34766241.004789

Remerge marks users per campaign. This analysis looks at the per audience uplift, for that reason we drop duplicate marks for users that were marked by multiple campaigns. If a user was marked once for an audience he will have the same group allocation for consecutive marks unless manually reset on audience level.  

In [38]:
sorted_mark_df = mark_df.sort_values('ts')
depuplicated_mark_df = sorted_mark_df.drop_duplicates(['user_id'])

In [39]:
# Sanity check remove later
depuplicated_mark_df['user_id'].count()

194478

In [40]:
# Sanity check remove later
depuplicated_mark_df.groupby('ab_test_group')['user_id'].nunique()

ab_test_group
control     38377
test       156101
Name: user_id, dtype: int64

Join the marked users with the revenue events and excluded any revenue event that happend before the user was marked.

In [41]:
merged_df = pd.merge(revenue_df, depuplicated_mark_df, on='user_id')
merged_df = merged_df[merged_df.ts_x > merged_df.ts_y]


In [42]:
# sanity check - remove later
merged_df['date'] = pd.to_datetime(merged_df['ts_x']).dt.date


## Calculate uplift kpis

We calculate the incremental revenue and the iROAS in line with the [remerge whitepaper](https://drive.google.com/file/d/1PTJ93Cpjw1BeiVns8dTcs2zDDWmmjpdc/view). Afterwards run a [chi squared test](https://en.wikipedia.org/wiki/Chi-squared_test) on the results to test for significance of the results.

In [47]:
grouped_revenue = merged_df.groupby(by='ab_test_group_y')
test_group_size = test_df['user_id'].nunique()
test_revenue_micros = grouped_revenue.get_group('test')['revenue_eur'].sum()
test_revenue = test_revenue_micros / 10**6
control_group_size = control_df['user_id'].nunique()
control_revenue_micros = grouped_revenue.get_group('control')['revenue_eur'].sum()
control_revenue = control_revenue_micros / 10**6
test_conversions = grouped_revenue.get_group('test')['revenue_eur'].count()
control_conversion = grouped_revenue.get_group('control')['revenue_eur'].count()
ratio = float(test_group_size) / float(control_group_size)
scaled_control_conversions = float(control_conversion) * ratio
scaled_control_revenue_micros = float(control_revenue_micros) * ratio
incremental_conversions = test_conversions - scaled_control_conversions
incremental_revenue_micros = test_revenue_micros - scaled_control_revenue_micros
incremental_revenue = incremental_revenue_micros / 10**6
iroas = incremental_revenue / ad_spend
chi_df = pd.DataFrame({
    "conversions": [control_conversion, test_conversions],
    "total": [control_group_size, test_group_size]
    }, index=['control', 'test'])

chi,p,*_ = scipy.stats.chi2_contingency(pd.concat([chi_df.total - chi_df.conversions, chi_df.conversions], axis=1), correction=False)

In [56]:
# show it in nice
result_df = pd.DataFrame({
    "ad spend": ad_spend,
    "total revenue": test_revenue + control_revenue,
    "test group size": test_group_size,
    "test conversions": test_conversions,
    "test revenue": test_revenue,
    "size control group": control_group_size,
    "control conversion": control_conversion,
    "control revenue": control_revenue,
    "ratio test/control": ratio,
    "control conversions (scaled)": scaled_control_conversions,
    "control revenue (scaled)": scaled_control_revenue_micros / 10**6,
    "incremental conversions": incremental_conversions,
    "incremental revenue": incremental_revenue,
    "rev/conversions test":test_revenue / test_conversions,
    "rev/conversions control": control_revenue / control_conversion,
    "iROAS": iroas,
    "chi^2":chi,
    "p-value":p,
    "significant":p>0.05},index=["value"]).transpose()

display(result_df)

Unnamed: 0,value
ad spend,1371.17
total revenue,345024.41
test group size,156101
test conversions,1537
test revenue,277092.21
size control group,38377
control conversion,406
control revenue,67932.20
ratio test/control,4.07
control conversions (scaled),1651.43
