<a href="https://colab.research.google.com/github/olgen/Adafruit_NeoPixel/blob/master/uplift_report.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# remerge uplift report

This notebook allows you to validate the uplift numbers reported by remerge. To do so, it downloads and analyses the exported campaign and event data from S3. The campaign data contains all users that remerge marked as part of an uplift test, their A/B group assignment, the timestamp of the marking, and the conversion events (click, app open or similar) with their cost. The event data reflects the app event stream and includes the events, their timestamps and their revenue (if any). We calculate the incremental revenue and the iROAS in line with the [remerge whitepaper](https://drive.google.com/file/d/1PTJ93Cpjw1BeiVns8dTcs2zDDWmmjpdc/view).

**Hint**: This notebook can be run in any Jupyter instance with enough disk space and memory, as a [Google Colab notebook](#Google-Colab-support) or as a standalone Python script. If you are using a copy of this notebook on Colab or locally, you can find the original template on [GitHub: remerge/uplift-report](https://github.com/remerge/uplift-report/blob/master/uplift_report_per_campaign.ipynb).

### Notebook configuration

For this notebook to work properly, several variables in the [Configuration](#Configuration) section need to be set: `customer`, `audiences`, `revenue_event`, `dates` and the AWS credentials. All of these will be provided by your remerge account manager. The optional variables `groups` and `per_campaign_results` control the reporting output.


### Verification

To verify that the group split is random and unbiased, user events and attributes from before the campaign start can be compared between the test and control groups and checked for equal distributions. For example, the user age distribution, the user activity distribution or the average spend per user should be the same in both groups before the campaign.
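As a minimal sketch of such a pre-campaign check, the snippet below compares the pre-campaign spend per user in both groups with a two-sample Kolmogorov-Smirnov test (`scipy.stats.ks_2samp`). The data frame and its `pre_campaign_spend` column are hypothetical: the exported data does not necessarily contain such a column, so you would have to derive it yourself from events that happened before the campaign start.

```python
import pandas as pd
import scipy.stats

# hypothetical per-user data: group assignment and spend before the campaign start
users = pd.DataFrame({
    'user_id': range(8),
    'ab_test_group': ['test', 'control'] * 4,
    'pre_campaign_spend': [0.0, 0.0, 4.99, 4.99, 9.99, 9.99, 0.0, 0.0],
})

test_spend = users.loc[users.ab_test_group == 'test', 'pre_campaign_spend']
control_spend = users.loc[users.ab_test_group == 'control', 'pre_campaign_spend']

# two-sample Kolmogorov-Smirnov test: a small p-value hints that the
# pre-campaign spend distributions differ, i.e. the split may be biased
statistic, p_value = scipy.stats.ks_2samp(test_spend, control_spend)
print('KS statistic: {:.3f}, p-value: {:.3f}'.format(statistic, p_value))
```

The same comparison can be repeated for any other pre-campaign attribute; for categorical attributes a chi-squared test is the more natural choice.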



## Google Colab support

This notebook can be run inside Google Colab. Due to Colab's size limitations it contains several optimizations, such as removing unused fields from the input files and caching files. Furthermore, it installs missing dependencies and restarts the kernel. **Because pandas is upgraded, the kernel needs to be restarted once per fresh instance. Just run the cell again after the restart.**

In [0]:
try:
  import google.colab
  IN_COLAB = True
except ImportError:
  IN_COLAB = False

if IN_COLAB:
  !pip install pyarrow
  !pip install gspread-pandas
  import pandas as pdt
  if pdt.__version__ != '0.23.4':
    # upgrading pandas requires a restart of the kernel
    # (we need an up to date pandas because we write to S3 for caching)
    # we kill it and let it auto restart (only needed once per fresh instance)
    !pip install pandas==0.23.4
    import os
    os.kill(os.getpid(), 9)

## Import needed packages

This notebook/script needs pandas and SciPy for the analysis, s3fs to access the data stored on S3, and pyarrow to read and write the Parquet cache.


In [0]:
from datetime import datetime
import pandas as pd
import re
import os
import gzip
import scipy
import scipy.stats 
import s3fs
from IPython.display import display # so we can run this as script as well
import gc

## Configuration

Set the customer name, the audiences and the access credentials for the S3 bucket and path. In addition, set `revenue_event` to the event for which the uplift should be evaluated.

In [0]:
# configure path and revenue event 
customer = ''
audiences = ['']
revenue_event = 'purchase'

# date range for the report
dates = pd.date_range(start='2019-01-01',end='2019-01-01')

# AWS credentials
os.environ["AWS_ACCESS_KEY_ID"] = ''
os.environ["AWS_SECRET_ACCESS_KEY"] = ''

# Configure the reporting output: 

# named groups that aggregate several campaigns
groups = {}

# show uplift results per campaign:
per_campaign_results = False

## Helper
Define a few helper functions to load and cache data.

In [0]:
def path(audience):
  return "s3://remerge-customers/{0}/uplift_data/{1}".format(customer,audience)

def filter_attributions_df(df):
  return df[df.partner_event == revenue_event]
  
# helper to download CSV files, convert to DF and print time needed
# caches files locally and on S3 to be reused
def read_csv(audience, source, date, chunk_filter_fn=None, chunk_size=10**6):
    now = datetime.now()
    
    date_str = date.strftime('%Y%m%d')
    
    filename = '{0}/{1}/{2}.csv.gz'.format(path(audience), source, date_str)
    
    # local cache
    cache_dir = 'cache/{0}/{1}'.format(audience, source)
    cache_filename = '{0}/{1}.parquet'.format(cache_dir, date_str)
    
    # s3 cache (useful if we don't have enough space on the Colab instance)
    s3_cache_filename = '{0}/{1}/cache/{2}.parquet'.format(path(audience), 
                                                           source, date_str)
    
    if source == 'attributions':
      cache_filename = '{0}/{1}-{2}.parquet'.format(cache_dir, date_str, 
                                                    revenue_event)
      
      # s3 cache (useful if we don't have enough space on the Colab instance)
      s3_cache_filename = '{0}/{1}/cache/{2}-{3}.parquet' \
        .format(path(audience), source, date_str, revenue_event)

    if os.path.exists(cache_filename):
        print(now, 'loading from', cache_filename)
        return pd.read_parquet(cache_filename, engine='pyarrow')
    
    fs = s3fs.S3FileSystem(anon=False)
    
    if fs.exists(path=s3_cache_filename):
      print(now, 'loading from S3 cache', s3_cache_filename)
      return pd.read_parquet(s3_cache_filename, engine='pyarrow')
    
    print(now, 'start loading CSV for', audience, source, date)
    
    read_csv_kwargs = {'chunksize': chunk_size}
    
    if source == 'attributions':
      # Only read the columns that are going to be used from attribution
      read_csv_kwargs['usecols'] = ['ts', 'user_id', 'partner_event', 
                                    'revenue_eur', 'ab_test_group']
      
    # collect the (optionally filtered) chunks and concatenate them once at the
    # end; concatenating inside the loop would copy the growing frame every time
    chunks = []
    for chunk in pd.read_csv(filename, escapechar='\\', low_memory=False,
                             **read_csv_kwargs):
      if chunk_filter_fn:
        chunk = chunk_filter_fn(chunk)
      chunks.append(chunk)
    
    df = pd.concat(chunks, ignore_index=True) if chunks else pd.DataFrame()
    
    print(datetime.now(), 'finished loading CSV for', date.strftime('%d.%m.%Y'),
          'took', datetime.now()-now)
    
    if not os.path.exists(cache_dir):
        os.makedirs(cache_dir)
        
    df.to_parquet(cache_filename, engine='pyarrow')
    
    # write it to the S3 cache folder as well
    print(datetime.now(), 'caching as parquet', s3_cache_filename)
    
    df.to_parquet(s3_cache_filename, engine='pyarrow')
    return df
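The core of `read_csv` is reading a large CSV in chunks, keeping only the needed columns and filtering each chunk before it is concatenated, so the full file never has to fit into memory at once. The sketch below shows that pattern on a small in-memory CSV. The data and the `only_purchases` filter are made up for illustration, and the S3 download and Parquet caching are left out.

```python
import io
import pandas as pd

# hypothetical CSV content standing in for an exported attributions file
csv_data = io.StringIO(
    "ts,user_id,partner_event,revenue_eur,ab_test_group,unused\n"
    "2019-01-01 10:00:00,u1,purchase,1990000,test,x\n"
    "2019-01-01 10:05:00,u2,app_open,,control,y\n"
    "2019-01-01 11:00:00,u3,purchase,4990000,control,z\n"
)

def only_purchases(chunk):
    # same idea as filter_attributions_df: keep only the revenue event
    return chunk[chunk.partner_event == 'purchase']

chunks = []
# read two rows at a time and only the columns we need, filtering each chunk
for chunk in pd.read_csv(csv_data, chunksize=2,
                         usecols=['ts', 'user_id', 'partner_event',
                                  'revenue_eur', 'ab_test_group']):
    chunks.append(only_purchases(chunk))

df = pd.concat(chunks, ignore_index=True)
print(df[['user_id', 'revenue_eur']])
```

The real exports are read with `chunk_size=10**6` rows per chunk; the tiny chunk size here only makes the chunking visible.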

## Load CSV data from S3

Load mark, spend and event data from S3. 

### IMPORTANT

**The event data is usually quite large (several GB) so this operation might take several minutes or hours to complete, depending on the size and connection.**

In [0]:
bids_df = pd.concat([read_csv(audience,'marks_and_spend',date) for audience in audiences for date in dates], ignore_index = True, verify_integrity=True)

In [0]:
attributions_df = pd.concat([read_csv(audience, 'attributions', date, filter_attributions_df) for audience in audiences for date in dates], ignore_index = True, verify_integrity=True)

Print some statistics of the loaded data sets.

In [0]:
bids_df.info()

In [0]:
attributions_df.info()

In [0]:
# set formatting options
pd.set_option('display.float_format', '{:.2f}'.format)

## Remove invalid users

Due to a race condition during marking, we need to filter out users that are marked as both *control* and *test*. In rare cases the same user was seen on different servers within the same second, and the servers, unaware of each other, assigned the user to different groups. This was fixed in the latest version of the remerge platform, but older data still needs to be filtered.

In [0]:
# users that are in both groups due to racy bids are invalid
# we need to filter them out
groups_per_user = bids_df.groupby('user_id')['ab_test_group'].nunique()
invalid_users = groups_per_user[groups_per_user > 1]
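To illustrate the check above on toy data: counting the distinct groups per user flags every user that appears in more than one group. The user ids below are hypothetical; the column names are the ones used by `bids_df`.

```python
import pandas as pd

# hypothetical marks: user 'b' was marked as both test and control
bids = pd.DataFrame({
    'user_id': ['a', 'a', 'b', 'b', 'c'],
    'ab_test_group': ['test', 'test', 'test', 'control', 'control'],
})

# count distinct groups per user; more than one means the user is invalid
groups_per_user = bids.groupby('user_id')['ab_test_group'].nunique()
invalid_users = groups_per_user[groups_per_user > 1]
print(list(invalid_users.index))

# remove every row that belongs to an invalid user
valid_bids = bids[~bids['user_id'].isin(invalid_users.index)]
```

User `a` is marked twice but always as *test*, so it stays valid; only `b` is dropped.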

## Define functions to prepare data frames


Calculate the cost of advertising for a given dataframe. Remerge tracks monetary values in micro currency units (1 EUR = 1,000,000 micros), so the summed cost is divided by 10^6.

In [0]:
def ad_spend(df):
  ad_spend_micros = df[df.event_type == 'buying_conversion']['cost_eur'].sum()
  return ad_spend_micros / 10**6
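For example, with two hypothetical buying conversions costing 1,500,000 and 250,000 micros, `ad_spend` returns 1.75 EUR; only `buying_conversion` rows are summed. The values below are made up; the column names are the ones `ad_spend` uses.

```python
import pandas as pd

def ad_spend(df):
    # only 'buying_conversion' rows carry cost; values are in micro-EUR
    ad_spend_micros = df[df.event_type == 'buying_conversion']['cost_eur'].sum()
    return ad_spend_micros / 10**6

# hypothetical rows: two paid conversions and one mark
bids = pd.DataFrame({
    'event_type': ['buying_conversion', 'mark', 'buying_conversion'],
    'cost_eur': [1500000, 0, 250000],
})

print(ad_spend(bids))  # 1,750,000 micros -> 1.75 EUR
```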

The dataframe created by `marked` contains the first (earliest) mark event per user, excluding the invalid users. Remerge marks users per campaign. If a user was marked once for an audience, they keep the same group allocation for subsequent marks (in different campaigns) unless the allocation is manually reset on the audience level.

In [0]:
def marked(df):
  mark_df = df[df.event_type == 'mark']
  # drop users that were marked as both test and control
  mark_df = mark_df[~mark_df['user_id'].isin(invalid_users.index)]
  # keep only the earliest mark per user
  sorted_mark_df = mark_df.sort_values('ts')
  deduplicated_mark_df = sorted_mark_df.drop_duplicates(['user_id'])
  return deduplicated_mark_df
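The deduplication step can be seen on a small hypothetical example: user `a` was marked in two campaigns, and after sorting by timestamp only the earlier mark (campaign 1) survives. The campaign ids and timestamps are made up.

```python
import pandas as pd

# hypothetical marks of one audience across two campaigns
marks = pd.DataFrame({
    'user_id': ['a', 'a', 'b'],
    'event_type': ['mark', 'mark', 'mark'],
    'campaign_id': [2, 1, 1],
    'ts': pd.to_datetime(['2019-01-03', '2019-01-01', '2019-01-02']),
    'ab_test_group': ['test', 'test', 'control'],
})

# sort by mark time, then keep only the first (earliest) mark per user
first_marks = marks.sort_values('ts').drop_duplicates(['user_id'])
print(first_marks[['user_id', 'campaign_id', 'ts']])
```

Keeping the earliest mark means that revenue is counted from the moment a user first entered the test.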

`revenue` creates a dataframe that contains all `revenue_event` events that carry a revenue value.

In [0]:
def revenue(df):
  revenue_df = df[pd.notnull(df['revenue_eur'])]
  return revenue_df[revenue_df.partner_event == revenue_event]

`merge` joins the marked users with the revenue events and excludes any revenue event that happened at or before the time the user was marked.

In [0]:
def merge(mark_df,revenue_df):
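  # columns present in both frames get suffixes: _x from revenue_df, _y from mark_df
  merged_df = pd.merge(revenue_df, mark_df, on='user_id')
  # keep only revenue events that happened after the user was marked
  return merged_df[merged_df.ts_x > merged_df.ts_y]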
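To illustrate the suffix handling: both inputs carry `ts` and `ab_test_group` columns, so `pd.merge` renames them to `ts_x`/`ab_test_group_x` (from the revenue events) and `ts_y`/`ab_test_group_y` (from the marks). That is why `uplift` groups the merged data by `ab_test_group_y`. The toy data is hypothetical: user `a` buys once before and once after being marked, and only the later purchase is kept.

```python
import pandas as pd

def merge(mark_df, revenue_df):
    # overlapping column names get suffixes: _x from revenue_df, _y from mark_df
    merged_df = pd.merge(revenue_df, mark_df, on='user_id')
    # keep only revenue events strictly after the user's mark
    return merged_df[merged_df.ts_x > merged_df.ts_y]

mark_df = pd.DataFrame({
    'user_id': ['a', 'b'],
    'ts': pd.to_datetime(['2019-01-02', '2019-01-02']),
    'ab_test_group': ['test', 'control'],
})
revenue_df = pd.DataFrame({
    'user_id': ['a', 'a', 'b'],
    'ts': pd.to_datetime(['2019-01-01', '2019-01-03', '2019-01-04']),
    'revenue_eur': [1000000, 2000000, 3000000],
    'ab_test_group': ['test', 'test', 'control'],
})

merged = merge(mark_df, revenue_df)
print(merged[['user_id', 'revenue_eur', 'ab_test_group_y']])
```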


## Calculate uplift KPIs

We calculate the incremental revenue and the iROAS in line with the [remerge whitepaper](https://drive.google.com/file/d/1PTJ93Cpjw1BeiVns8dTcs2zDDWmmjpdc/view). Afterwards we run a [chi-squared test](https://en.wikipedia.org/wiki/Chi-squared_test) to check whether the difference in conversion rates between the test and control groups is significant, comparing each group's conversions with its number of unique users.
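The KPIs computed in the `uplift` function can be summarised as follows. The formulas are read off the notebook's code, not quoted from the whitepaper. $N_T$ and $N_C$ are the numbers of unique marked users in test and control, $K_T$ and $K_C$ the conversions (revenue events after the mark), $R_T$ and $R_C$ the revenue in EUR, and $S$ the ad spend in EUR:

$$
\begin{aligned}
r &= \frac{N_T}{N_C} \\
\Delta K &= K_T - r\,K_C \\
\Delta R &= R_T - r\,R_C \\
\mathrm{iROAS} &= \frac{\Delta R}{S} \\
\mathrm{CVR}_T &= \frac{K_T}{N_T}, \qquad \mathrm{CVR}_C = \frac{K_C}{N_C} \\
\text{CVR uplift} &= \frac{\mathrm{CVR}_T}{\mathrm{CVR}_C} - 1
\end{aligned}
$$

Scaling the control values by $r$ makes them comparable to a control group of the same size as the test group.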

In [0]:
def uplift(ad_spend,mark_df,revenue_df,index_name):
  # group marked users by their ab_test_group
  grouped = mark_df.groupby(by='ab_test_group')
  control_df = grouped.get_group('control')
  test_df = grouped.get_group('test')
  
  # join marks and revenue events
  merged_df = merge(mark_df,revenue_df)
  grouped_revenue = merged_df.groupby(by='ab_test_group_y')
  
  # init all KPIs with 0s first:
  test_revenue_micros = 0
  test_conversions = 0
  test_converters = 0
  
  control_revenue_micros = 0
  control_conversions = 0
  control_converters = 0

  # we might not have any events for a certain group in the time period
  if 'test' in grouped_revenue.groups:
    test_revenue_df = grouped_revenue.get_group('test')
    test_revenue_micros = test_revenue_df['revenue_eur'].sum()  
    test_conversions = test_revenue_df['revenue_eur'].count()  
    test_converters = test_revenue_df[test_revenue_df.partner_event == revenue_event]['user_id'].nunique()
  
  if 'control' in grouped_revenue.groups:
    control_revenue_df = grouped_revenue.get_group('control')
    control_revenue_micros = control_revenue_df['revenue_eur'].sum()  
    control_conversions = control_revenue_df['revenue_eur'].count()
    control_converters = control_revenue_df[control_revenue_df.partner_event == revenue_event]['user_id'].nunique()

    
  # calculate KPIs
  test_group_size = test_df['user_id'].nunique()
  test_revenue = test_revenue_micros / 10**6
  control_group_size = control_df['user_id'].nunique()

  control_revenue = control_revenue_micros / 10**6


  ratio = float(test_group_size) / float(control_group_size)
  scaled_control_conversions = float(control_conversions) * ratio
  scaled_control_revenue_micros = float(control_revenue_micros) * ratio
  incremental_conversions = test_conversions - scaled_control_conversions
  incremental_revenue_micros = test_revenue_micros - scaled_control_revenue_micros
  incremental_revenue = incremental_revenue_micros / 10**6
  iroas = incremental_revenue / ad_spend
  
  rev_per_conversion_test = 0
  rev_per_conversion_control = 0
  if test_conversions > 0:
    rev_per_conversion_test = test_revenue / test_conversions
  if control_conversions > 0:
    rev_per_conversion_control = control_revenue / control_conversions

  
  test_cvr = test_conversions / test_group_size
  control_cvr = control_conversions / control_group_size
  
  uplift = 0
  if control_cvr > 0:
    uplift = test_cvr/control_cvr - 1
  
  
  # the chi-squared table needs successes <= users per group; if a group has
  # more conversions than users (CVR > 1), fall back to unique converters
  control_successes, test_successes = control_conversions, test_conversions
  if max(test_cvr, control_cvr) > 1.0:
    control_successes, test_successes = control_converters, test_converters 
  chi_df = pd.DataFrame({
    "conversions": [control_successes, test_successes],
    "total": [control_group_size, test_group_size]
    }, index=['control', 'test'])
  # contingency table: rows = groups, columns = (total - successes, successes)
  # the chi-squared calculation fails with insufficient data (e.g. a zero
  # expected frequency); fall back to "not significant" in that case
  try: 
    chi,p,*_ = scipy.stats.chi2_contingency(pd.concat([chi_df.total - chi_df.conversions, chi_df.conversions], axis=1), correction=False)
  except ValueError:
    chi,p = 0,1.0
  
  # show results as a dataframe
  return pd.DataFrame({
    "ad spend": ad_spend,
    "total revenue": test_revenue + control_revenue,
    "test group size": test_group_size,
    "test conversions": test_conversions,
    "test converters": test_converters,
    "test revenue": test_revenue,

    "control group size": control_group_size,
    "control conversions": control_conversions,
    "control converters": control_converters,
    "control revenue": control_revenue,
    "ratio test/control": ratio,
    "control conversions (scaled)": scaled_control_conversions,
    "control revenue (scaled)": scaled_control_revenue_micros / 10**6,
    "incremental conversions": incremental_conversions,
    "incremental revenue": incremental_revenue,
    "rev/conversions test": rev_per_conversion_test,
    "rev/conversions control": rev_per_conversion_control,
    "test CVR": test_cvr,
    "control CVR": control_cvr,
    "CVR Uplift": uplift,
    "iROAS": iroas,
    "chi^2": chi,
    "p-value": p,
    "significant": p<0.05},index=[index_name]).transpose()
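A worked example with made-up numbers: the test group is twice the size of the control group, so the control revenue is doubled before it is subtracted. The chi-squared test runs on the same 2x2 table that `uplift` builds (users without a conversion vs. conversions per group). All numbers are hypothetical and only illustrate the arithmetic.

```python
import pandas as pd
import scipy.stats

# hypothetical counts for one report
test_group_size, control_group_size = 10000, 5000
test_conversions, control_conversions = 300, 100
test_revenue, control_revenue = 3000.0, 1000.0   # EUR
ad_spend = 500.0                                  # EUR

ratio = test_group_size / control_group_size                  # 2.0
incremental_revenue = test_revenue - ratio * control_revenue  # 1000.0
iroas = incremental_revenue / ad_spend                        # 2.0
cvr_uplift = ((test_conversions / test_group_size) /
              (control_conversions / control_group_size)) - 1  # approx. 0.5

# 2x2 contingency table: rows = groups, columns = (total - conversions, conversions)
chi_df = pd.DataFrame({
    'conversions': [control_conversions, test_conversions],
    'total': [control_group_size, test_group_size],
}, index=['control', 'test'])
table = pd.concat([chi_df.total - chi_df.conversions, chi_df.conversions], axis=1)
chi, p, *_ = scipy.stats.chi2_contingency(table, correction=False)
print(ratio, incremental_revenue, iroas, round(cvr_uplift, 2), p < 0.05)
```

The table only makes sense if a group's successes do not exceed its size, which is why `uplift` switches to unique converters when a group's CVR is above 1.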

### Calculate and display uplift report for the data set as a whole

This takes the whole data set and calculates uplift KPIs.

In [0]:
# calculate the total result:
revenue_df = revenue(attributions_df)
mark_df = marked(bids_df)
results_df = uplift(ad_spend(bids_df),mark_df,revenue_df,"total")

### Calculate uplift report per group (if configured)

Sometimes it makes sense to look at groups of similar campaigns. If the `groups` dictionary maps group names to lists of campaign ids, the following cell adds one report column per group.

In [0]:
# if groups are configured, filter the marks by each group's campaigns and generate a report per group
if len(groups) > 0:
  for name, campaigns in groups.items():
    group_marks_df = bids_df[bids_df.campaign_id.isin(campaigns)]
    results_df[name] = uplift(ad_spend(group_marks_df),marked(group_marks_df),revenue_df,name)
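A sketch of what the `groups` configuration might look like and how the filter above selects rows. The group names, campaign ids and user ids are hypothetical; in practice the ids must match the values of the `campaign_id` column in the marks data. Campaigns that are not listed in any group (here `301`) are not part of any group report.

```python
import pandas as pd

# hypothetical configuration: group name -> list of campaign ids
groups = {
    'prospecting': [101, 102],
    'retargeting': [201],
}

# hypothetical bids data with a campaign_id column
bids_df = pd.DataFrame({
    'campaign_id': [101, 102, 201, 301],
    'user_id': ['a', 'b', 'c', 'd'],
})

selected_users = {}
for name, campaigns in groups.items():
    # same filter as in the cell above: keep rows of the group's campaigns
    group_marks_df = bids_df[bids_df.campaign_id.isin(campaigns)]
    selected_users[name] = sorted(group_marks_df.user_id)
print(selected_users)
```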

### Calculate uplift report per campaign

Sometimes it makes sense to look at the uplift report per campaign, since each campaign usually reflects one segment of users. If `per_campaign_results` is set, we iterate over all campaigns in the current data set and add one report column per campaign.

In [0]:
if per_campaign_results:
  for campaign in bids_df['campaign_id'].unique():
    name = "c_{0}".format(campaign)
    df = bids_df[bids_df.campaign_id == campaign]
    results_df[name] = uplift(ad_spend(df),marked(df),revenue_df,name)

# Uplift Results

You can configure the output via the `groups` and `per_campaign_results` variables in the [Configuration](#Configuration) section.

In [0]:
results_df