### Read Me:

In this notebook, we calculate multi-touch attribution values for a given set of channels using following methods:
1. Shapley values
2. Markov chain values 
3. Last touch based 

* Shapley values code is implemented based on the logic presented in this [paper](https://arxiv.org/pdf/1804.05327.pdf)
* Markov chain values are based on the following [whitepaper](https://www.channelattribution.net/pdf/Whitepaper.pdf)

Setup:
1. This code runs in AWS Sagemaker.
2. Input data is to be prepared using Rudderstack Reverse ETL (more details in blog post linked at the end)
3. Edit constants in the cell below. Only the first cell requires editing. No other edit is necessary. 
4. The output reslults get pushed to RudderStack event stream to a Python SDK source. From RudderStack website, we can setup a destination and push the data to any supported destination.
5. The output results also get posted as a parquet file. The same can also be viewed in the notebook itself, as we go towards the end. 
6. Throughout the notebook, along with the code, we can also see several descriptions around the data and the results.
7. Towards the end, we also have a robustness check to ensure how robust the results are if input data changes.
8. **Max no:of touches permissible for shapley value model is around 15.** If larger than this, the algo fails as no:of combinations increases exponentially. We can either drop a few unimportant touches or group related touches together to reduce this count.

Blog post about Rudderstack ETL and event stream used for the model: 

https://www.rudderstack.com/blog/from-first-touch-to-multi-touch-attribution-with-rudderstack-dbt-and-sagemaker/

This notebook can run in any standard python environment.

It is accompanied with a creds.yaml file that expects following keys:

```
rudder:
  DATA_PLANE_URL: <from Rudderstack webapp>
  WRITE_KEY: <from Rudderstack webapp>
aws:
  access_key_id: <from aws account>
  access_key_secret: <from aws account>
  region: <from aws account>
```

The aws credentials should point to an account that have access to the aws bucket location where RudderStack Warehouse actions pushes the data into.

It also has a requirements.txt file that needs to be run for the first time. 

In [None]:
#!pip install requirements.txt
# This needs to be run only for the first time to install all required packages.

### Part I: Constants:

Define constants below

In [None]:
# AWS DATA BUCKET where both input data is stored, and output data should be stored. 
# RudderStack Reverse ETL should be configured to write into this location
S3_BUCKET = '<enter your s3 bucket name>'

# INPUT DATA LOCATION. Folder within aws bucket where Rudderstack Reverse ETL pushes the input data into
S3_INPUT_FOLDER_LOCATION = '<enter the folder path inside the s3>'

# INPUT LABEL LOCATION. Folder within aws bucket where input label data is stored as json files (extracted from Rudderstack Reverse ETL)
S3_LABEL_DATA_FOLDER_LOCATION = '<enter the folder path inside the s3>'

# Output location. Folder within aws bucket where output data 
S3_OUTPUT_FOLDER_LOCATION='<enter the folder path inside the s3 where output should be copied to>'

# MODEL CONFIGURATION.

# We may want to combine a few touches into one group. Ex: all video ads from different sources may be combined into one.
# In such case, we make below flag True (else, False) and create a touchpoint_mapping dictionary (see below) to group related touch points together
GROUP_TOUCHES = False

# This needs to be configured based on your touch points. 
# This is required only if GROUP_TOUCHES = True. In below mapping, each key corresponds to a touch we want to group. 
# Ex, sources, directory, home etc are all converted to 'webapp' and video-library, guides, case-studies are converted to 'docs'
# If a touch is not found in the key, no modification is applied on that and it stays as is.

touchpoint_mapping = {"sources": "webapp",
                      "directory": "webapp",
                      "home": "webapp",
                      "destinations": "webapp",
                      "integration": "product",
                      "syncs": "webapp",
                      "transformations": "product",
                      "team": "webapp",
                      "video-library": "docs",
                      'case-studies': 'docs'}

#touchpoint_mapping = None # If no mapping is required.

# Dedup logic. IF same event repeats consecutively within this interval (in seconds), they are considered the same and first occurence timestamp is counted. 
# If we don't want a dedup logic, we can make this value as 0.
MIN_EVENT_INTERVAL_IN_SEC = 300


# Max no:of touches permissible for shapley value model. If larger than this, the algo fails as no:of combinations increases exponentially
MAX_DISTINCT_TOUCHES=15

## Data specific column variables. 

ENTITY_COL = 'domain'
TIMESTAMP_COL  = 'timestamp'
CONVERSION_TIME_COL = 'call_conversion_time'
EVENT_COL = 'touch'
ID_COL = 'row_id' # Random id used in Event Stream. 


# Rudder Analytics parameters (Ref: https://www.rudderstack.com/docs/stream-sources/rudderstack-sdk-integration-guides/rudderstack-python-sdk/)

USER_ID = 'multitouch_attribution_notebook' 
EVENT_TYPE = 'multitouch_attribution_results'

**No other manual change is expected. Rest of the code should run without any edit.**

You can run the whole notebook by going to `Run -> Run all cells`. This creates the parquet file output in prescribed s3 location, and also prints the data here for instant consumption

### Part II: Data Preprocessing:

In [None]:
# Loading the required libraries
import json
import boto3

import os
import sys
import gzip
from typing import List, Optional, Union, Dict, Tuple
import datetime

from math import factorial
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt

import logging
import yaml
import itertools
from dataclasses import dataclass
pd.set_option('display.max_columns', None)


from tqdm import tqdm
tqdm.pandas()

In [None]:
logging.basicConfig(
    format='%(asctime)s %(levelname)-8s %(message)s',
    level=logging.INFO,
    datefmt='%Y-%m-%d %H:%M:%S')

formatter = logging.Formatter(fmt='%(asctime)s %(levelname)-8s %(message)s',
                              datefmt='%Y-%m-%d %H:%M:%S')

logger = logging.getLogger()
logger.setLevel(logging.INFO)

output_file_handler = logging.FileHandler("attribution_model.log")
output_file_handler.setFormatter(formatter)

#stdout_handler = logging.StreamHandler(sys.stdout)

logger.addHandler(output_file_handler)
#logger.addHandler(stdout_handler)

# To prevent unwanted logs from matplotlib and pandas 
logging.getLogger('matplotlib.font_manager').disabled = True
logging.getLogger('chardet.charsetprober').setLevel(logging.INFO)

logging.info("\n\n\t\tSTARTING MTA MODEL")

In [None]:
time_suffix = datetime.datetime.strftime(datetime.datetime.now(), '%Y-%m-%dT%H:%M:%S')
s3_output_location = f"s3://{S3_BUCKET}/{S3_OUTPUT_FOLDER_LOCATION}/{time_suffix}"
local_output_location = f"data/mta_output_files/{time_suffix}"
os.makedirs(local_output_location)
logging.info(f"MTA values will be written to the location:\n\t{local_output_location}")

In [None]:

# Helper functions to load input data
def load_json(json_str: str,
              key: Optional[str]=None):
    if key:
        return json.loads(json_str)[key]
    else:
        return json.loads(json_str)

def load_s3_compressed_dictionary_object(s3_file_object, key: Optional[str]=None):
    with gzip.GzipFile(fileobj=s3_file_object.get()['Body']) as gzipfile:
        content = gzipfile.read().decode('utf-8').split('\n')
    
    return [load_json(line, key) for line in content if line]

def load_json_data_from_s3(s3_instance, s3_bucket_name: str, files_prefix: str, json_dict_key: Optional[str]=None) -> pd.DataFrame:
    bucket = s3_instance.Bucket(s3_bucket_name)
    all_rows = []
    for file_obj in tqdm(bucket.objects.filter(Prefix=files_prefix)):
        all_rows.append(load_s3_compressed_dictionary_object(file_obj, json_dict_key))
    
    df = pd.DataFrame.from_records([row for rows in all_rows for row in rows])
    df.columns = [col.lower() for col in list(df)]
    return df


In [None]:
with open("creds.yaml", "r") as f:
    config = yaml.safe_load(f)

In [None]:
session = boto3.Session(
    aws_access_key_id=config['aws']['access_key_id'],
    aws_secret_access_key=config['aws']['access_key_secret'],
    region_name=config['aws']['region']
)


In [None]:
%%time
# Loading data from Reverse ETL output into Python
s3 = session.resource('s3')

raw_data = load_json_data_from_s3(s3, S3_BUCKET, S3_INPUT_FOLDER_LOCATION, json_dict_key='traits')


raw_data[TIMESTAMP_COL] = pd.to_datetime(raw_data[TIMESTAMP_COL]).dt.tz_localize(None)

label_data = load_json_data_from_s3(s3, S3_BUCKET, S3_LABEL_DATA_FOLDER_LOCATION, json_dict_key='traits')
label_data[CONVERSION_TIME_COL] = pd.to_datetime(label_data[CONVERSION_TIME_COL]).dt.tz_localize(None)
domain_conversion_dates = label_data.set_index(ENTITY_COL).to_dict()[CONVERSION_TIME_COL]
raw_data = raw_data.drop(columns=[ID_COL, CONVERSION_TIME_COL]).drop_duplicates()

In [None]:
# Transformations on the raw data. We apply the constraints defined in the constants cell above.

def dedup_by_ts_delta(df: pd.DataFrame, primary_key: str, timestamp: str, event_type: str, max_lag: int) -> pd.DataFrame:
    """
    ### Parameters
    1. df : pd.DataFrame
        - User touches dataframe. 
    2. primary_key : str
        - column name of the column that contains user_id. 
    3. timestamp: str
        - column name of the column that contains event timestamp
    4. event_type: str
        - column name of the column that contains event/touch data
    5. max_lag: int
        - max time (in sec) between consecutive events to be considered as duplicates. 

    ### Returns
    - DataFrame after doing following steps
    Based on primary key and event_type, it checks if two consecutive events occur within the max_lag time window. If so, they are considered same event and the latter event is dropped. 
    """
    if max_lag <= 0:
        return df
    df = df.sort_values(by=[primary_key, timestamp], ascending=True).reset_index(drop=True)
    original_columns = df.columns
    df[f"prev_{primary_key}"] = df[primary_key].shift()
    df[f"prev_{event_type}"] = df[event_type].shift()
    df[f"prev_{timestamp}"] = df[timestamp].shift()

    def is_duplicate(row):
        if pd.isnull(row[f"prev_{primary_key}"]) or pd.isnull(row[f"prev_{event_type}"]) or pd.isnull(row[f"prev_{timestamp}"]):
            return False
        elif row[primary_key] == row[f"prev_{primary_key}"] and row[event_type] == row[f"prev_{event_type}"] and (
                row[timestamp] - row[f"prev_{timestamp}"]).total_seconds() <= max_lag:
            return True
        else:
            return False

    df["is_duplicate"] = df.progress_apply(is_duplicate, axis=1)
    return df.query("is_duplicate==False")[original_columns].reset_index(drop=True)

def process_raw_data(raw_data_df: pd.DataFrame,
                     dedup_min_time: int,
                     reduce_touches: bool = True) -> pd.DataFrame:
    """
    ### Parameters
    1. raw_data_df : Raw data 
    2. ignore_touches: Ignores the touches present in this list. 
    3. min_date: Ignores events before this date
    4. dedup_min_time: Time (in sec) between two events of same type. Events that repeat within this interval are combined as one (earlier timestamp is considered)
    5. reduce_touches : Whether to combine touchpoints based on their logical groupings

    ### Returns
    - DataFrame after doing following steps
    1. Groups tracks pages if reduce_touches flag is True
    2. Deduplicates based on 5 min interval 
    3. Ignores touches based on ignore_touches list
    4. Ðrops events before the min_date timestamp.
    """
    if reduce_touches:
        raw_data_df[EVENT_COL] = raw_data_df[EVENT_COL].apply(lambda touch: touchpoint_mapping.get(touch, touch))
    
    dedup_data_df = (dedup_by_ts_delta(raw_data_df
                                       .query(f"~{EVENT_COL}.isnull()",
                                              engine='python')
                                       .drop_duplicates(),
                                       ENTITY_COL,
                                       TIMESTAMP_COL, 
                                       EVENT_COL, 
                                       dedup_min_time)
                     .filter([ENTITY_COL, EVENT_COL, TIMESTAMP_COL])
                    )
    return dedup_data_df

In [None]:
%%time
touch_data_filtered = process_raw_data(raw_data,  MIN_EVENT_INTERVAL_IN_SEC, GROUP_TOUCHES)
del raw_data

In [None]:
# Combining the label data to events data
touch_data_filtered['is_converted'] = touch_data_filtered[ENTITY_COL].apply(lambda domain: 1 if domain in domain_conversion_dates else 0)

In [None]:
# Sample data:
touch_data_filtered.head()

In [None]:
try:
    assert len(touch_data_filtered[EVENT_COL].unique()) <= MAX_DISTINCT_TOUCHES
except AssertionError:
    logging.info("Max no:of touches higher than permissible")
    raise 

In [None]:
positive_touchpoints = touch_data_filtered.query("is_converted==1", engine='python')
negative_touchpoints = touch_data_filtered.query("is_converted==0", engine='python')

print("Summary stats on converted and non converted journeys:\n")
print(f"Total rows in converted journeys: {len(positive_touchpoints)}")
print(f"Distinct converted journeys: {len(positive_touchpoints['domain'].unique())}")
print(f"Total rows in non-converted journeys: {len(negative_touchpoints)}")
print(f"Distinct non-converted journeys: {len(negative_touchpoints['domain'].unique())}")


### Part III: Data distribution (Optional)

A few stats on the converted journeys before going into the actual attribution problem:

Note: This section is not required to calculate the attribution values. Instead, it is just to show the distribution of no:of events and no:of days before users convert. You can skip directly to [Part IV](#MTA-Calculations-begin)

In [None]:
conversion_summary = positive_touchpoints.groupby([ENTITY_COL]).agg({TIMESTAMP_COL: "min", EVENT_COL:["size", "nunique"]}).reset_index()
conversion_summary.columns = [ENTITY_COL, TIMESTAMP_COL, "n_events", "n_distinct_events"]
conversion_summary["days_to_convert"] = conversion_summary.apply(lambda row: (domain_conversion_dates.get(row[ENTITY_COL]) - row[TIMESTAMP_COL]).days, axis=1)

conversion_summary.head()

In [None]:
fig, axs = plt.subplots(1,2,figsize=(16,6))
sns.histplot(conversion_summary["n_events"], bins=50, ax=axs[0])
axs[0].set_title("No:of events before conversion")
axs[0].set_ylabel("Conversions")
axs[0].set_xlabel("Event count");

sns.histplot(conversion_summary["days_to_convert"], bins=70, ax=axs[1])
axs[1].set_title("Days to conversion")
axs[1].set_ylabel("Conversions")
axs[1].set_xlim([0,100])
axs[1].set_xlabel("Days since first seen");

In [None]:
# Checks percentile counts at various levels. Can modify this list to get a different percentile value (ex: For 90th percentile, add 90 to the list.)
percentile_points = [0,5, 25, 50, 75, 95, 99, 100]

In [None]:
print(f"Avg no:of touches before a user converts: {conversion_summary['n_events'].mean():.2f}; Median: {np.median(conversion_summary['n_events'])}")
print("\nPercentiles for No:of touches:")
(pd.DataFrame.from_dict(
    dict(zip(percentile_points, 
             np.percentile(conversion_summary['n_events'],
                           percentile_points).round())),
    orient='index', 
    columns=['n_events'])
 .reset_index().rename(columns={"index":"percentile"}))

In [None]:
print(f"Avg days to convert: {conversion_summary['days_to_convert'].mean():.2f}; Median: {np.median(conversion_summary['days_to_convert'])}")
print("\nPercentiles for No:of days to convert:")
(pd.DataFrame.from_dict(
    dict(zip(percentile_points, 
             np.percentile(conversion_summary['days_to_convert'],
                           percentile_points).round())),
    orient='index', 
    columns=['n_days_to_convert'])
 .reset_index().rename(columns={"index": "percentile"}))

<a id='MTA-Calculations-begin'></a>



### Part IV: Shapley values calculation


In [None]:
## Helper function to transform data to the required input form 

def collect_touchpoints(touchpoints_df: pd.DataFrame, 
                        primary_key: str=ENTITY_COL, 
                        ts_column: str=TIMESTAMP_COL,
                        touchpoint_column: str=EVENT_COL) -> pd.DataFrame:
    """
    Transform dataframe with each touch as a row to a new dataframe where each journey has a single row with all touches as a list in chronological order
    touchpoints_df: A dataframe with each row corresponding to a touch. 
    primary_key: Name of the column containing unique user identifier
    ts_column: Name of column containing timestamp using which journeys are sorted chronologically
    touchpoint_column: Name of column containing touch points. 
    
    Returns:
    A dataframe with two columns, first has primary key and second has the touchpoint_column
    """
    return (touchpoints_df
            .sort_values(by=[primary_key, ts_column], 
                         ascending=True)
            .reset_index(drop=True)
            .groupby(primary_key)[touchpoint_column]
            .apply(list)
            .reset_index())

In [None]:
touchpoints_list_pos = collect_touchpoints(positive_touchpoints)

touchpoints_list_pos.head()

In [None]:


# Computing normalizing factor for marginal contribution
shapley_weight = lambda p, s: (factorial(s)*factorial(p-s-1)/factorial(p))

# Getting shapley value of a given channel and values mapping for channel subsets.
def compute_shapley_values(v_values_map: dict, 
                            channel: str,
                            n_channels: int) -> float:
    # Initiating shap with marginal contribution from S = null subset
    shap = 1/n_channels * v_values_map.get(channel, 0) 
    for subset_str, subset_contrib in v_values_map.items():
        subset = subset_str.split(",")
        if channel not in subset:
            subset_union_channel = ','.join(sorted(subset + [channel]))
            marginal_contrib = (v_values_map.get(subset_union_channel,0) -
                                v_values_map.get(subset_str,0))
            shap += shapley_weight(n_channels, len(subset)) * marginal_contrib
    
    return shap

# Helper function to generate subsets for a given set of channels
def generate_subsets(touchpoints: List[str]) -> List[List[str]]:
    subset_list = []
    for subset_size in range(len(touchpoints)):
        for subset in itertools.combinations(touchpoints, subset_size + 1):
            subset_list.append(list(sorted(subset)))
    return subset_list

# Computes the utility function value v(s) for given subset S, using contribution values for all subsets.
def utility_function(touchpoint_set: List[str], 
                     contributions_mapping: dict) -> Union[int, float]:
    subset_list = generate_subsets(touchpoint_set)
    return sum([contributions_mapping.get(','.join(subset),0) for subset in subset_list])


# Master function combining all the above functions to compute shapley values for each touchpoint from a list of journeys. 
def get_shapley_values(journeys_list: List[List[str]], 
                       contribs_list: List[Union[int, float]])->Dict[str, float]:
    """

    Args:
        journeys_list (List[List[str]]): List of journeys.
         Each journey is a list of touchpoints..
        contribs_list (List[Union[int, float]]): List of contributions corresponding to each journey in journeys_list.
         Should have same length as journeys_list

    Returns:
        Dict[str, float]: A dictionary with key as channel/touchpoint, and Shapley value as its value 
    """
    flattened_journeys = [channel for journey in journeys_list for channel in set(journey)]
    unique_channels = sorted(list(set(flattened_journeys)))
    all_subsets = generate_subsets(unique_channels)
    contrib_map = {}
    for n, journey in enumerate(journeys_list):
        journey_ = ",".join(sorted(set(journey))) # Ensures deduplication and sorting of journeys
        contrib_map[journey_] = contrib_map.get(journey_,0) + contribs_list[n]
    v_values = {}
    for subset in all_subsets:
        v_values[",".join(subset)] = utility_function(subset, contrib_map)
        
    shapley_values = {}
    for channel in unique_channels:
        shapley_values[channel] = compute_shapley_values(v_values, 
                                                          channel, 
                                                          len(unique_channels))
    return shapley_values
    

In [None]:
touches_shapley_values = get_shapley_values(touchpoints_list_pos[EVENT_COL].values, [1] * len(touchpoints_list_pos))
touches_shapley_values = pd.DataFrame.from_dict(touches_shapley_values, orient="index").reset_index()
touches_shapley_values.columns = [EVENT_COL, "shap"]
touches_shapley_values = touches_shapley_values.sort_values(by= 'shap', ascending=False).reset_index(drop=True)
plt.figure(figsize=(16,6))
sns.barplot(x="shap", y=EVENT_COL, data=touches_shapley_values, color="salmon")
plt.title("Shapley Values");

In [None]:
touches_shapley_values

In [None]:
touches_shapley_values["shap_normalized"] = touches_shapley_values["shap"]/touches_shapley_values["shap"].sum()

### Part V: Markov Chain Values Calculation

Index of transition counts: 1st: source, last: destination. 2 to -1: same order as dict_touches_inv keys

In [None]:

def generate_transition_counts(journey_list: List[List[str]], 
                               distinct_touches_list: List[str], 
                               is_positive: bool):
    if is_positive:
        destination_idx = -1
    else:
        destination_idx = -2
    transition_counts = np.zeros(((len(distinct_touches_list)+3), (len(distinct_touches_list)+3)))
    for journey in journey_list:
        transition_counts[0, (distinct_touches_list.index(journey[0])+1)] += 1 # First point in the path
        for n, touch_point in enumerate(journey):
            if n == len(journey) - 1:
                # Reached last point
                transition_counts[(distinct_touches_list.index(touch_point)+1), destination_idx] += 1
                transition_counts[destination_idx, destination_idx]+=1
            else:
                transition_counts[(distinct_touches_list.index(touch_point)+1), (distinct_touches_list.index(journey[n+1]) + 1)] +=1
    transition_labels = distinct_touches_list.copy()
    transition_labels.insert(0, "Start")
    transition_labels.extend(["Dropoff", "Converted"])
    return transition_counts, transition_labels

row_normalize_np_array = lambda transition_counts: transition_counts / transition_counts.sum(axis=1)[:, np.newaxis]

def plot_transitions(transition_probabilities: np.array, labels: List[str], title="Transition Probabilities", show_annotations=True):
    ax = sns.heatmap(transition_probabilities,
                     linewidths=0.5,
                     robust=True, 
                     annot_kws={"size":8}, 
                     annot=show_annotations,
                     fmt=".2f",
                     cmap="YlGnBu",
                     xticklabels=labels,
                     yticklabels=labels)
    ax.tick_params(labelsize=10)
    ax.figure.set_size_inches((16, 10))
    ax.set_ylabel("Previous Step")
    ax.set_xlabel("Next Step")
    ax.set_title(title);


def get_transition_probabilities(converted_touchpoints_list: List[List[int]], 
                                 dropoff_touchpoints_list: List[List[int]], 
                                 distinct_touches_list: List[str], 
                                 visualize=False) -> Tuple[np.array, List[str]]:
    pos_transitions, _ = generate_transition_counts(converted_touchpoints_list, distinct_touches_list, is_positive=True)
    neg_transitions, labels = generate_transition_counts(dropoff_touchpoints_list, distinct_touches_list, is_positive=False)
    all_transitions = pos_transitions + neg_transitions
    transition_probabilities = row_normalize_np_array(all_transitions)
    if visualize:
        plot_transitions(transition_probabilities, labels, show_annotations=True)
    return transition_probabilities, labels

def converge(transition_matrix, max_iters=200, verbose=True):
    T_upd = transition_matrix
    prev_T = transition_matrix
    for i in range(max_iters):
        T_upd = np.matmul(transition_matrix, prev_T)
        if np.abs(T_upd - prev_T).max()<1e-5:
            if verbose:
                print(f"{i} iters taken for convergence")
            return T_upd
        prev_T = T_upd
    if verbose:
        print(f"Max iters of {max_iters} reached before convergence. Exiting")
    return T_upd


def get_removal_affects(transition_probs, labels, ignore_labels=["Start", "Dropoff","Converted"], default_conversion=1.):
    removal_affect = {}
    for n, label in enumerate(labels):
        if label in ignore_labels:
            continue
        else:
            drop_transition = transition_probs.copy()
            drop_transition[n,:] = 0.
            drop_transition[n,-2] = 1.
            drop_transition_converged = converge(drop_transition, 500, False)
            removal_affect[label] = default_conversion - drop_transition_converged[0,-1]
    return removal_affect

def get_markov_attribution(tp_list_positive: List[List[int]],
                           tp_list_negative: List[List[int]], 
                           distinct_touches_list: List[str], 
                           visualize=False) -> Tuple[Dict[str, float], np.array]:
    transition_probabilities, labels = get_transition_probabilities(tp_list_positive, tp_list_negative, distinct_touches_list, visualize=visualize)
    transition_probabilities_converged = converge(transition_probabilities, max_iters=500, verbose=False)
    removal_affects = get_removal_affects(transition_probabilities, labels, default_conversion=transition_probabilities_converged[0,-1])
    total_conversions = len(tp_list_positive)
    attributable_conversions = {}
    total_weight = sum(removal_affects.values())
    for tp, weight in removal_affects.items():
        attributable_conversions[tp] = weight/total_weight * total_conversions
    return attributable_conversions, transition_probabilities


In [None]:
touchpoints_list_neg = collect_touchpoints(negative_touchpoints)
all_touches = list(positive_touchpoints[EVENT_COL].unique())

In [None]:
markov_attribution_values, transition_probabilities = get_markov_attribution(touchpoints_list_pos[EVENT_COL].values, 
                                                                             touchpoints_list_neg[EVENT_COL].values, 
                                                                             all_touches, 
                                                                             visualize=True)
plt.savefig(f"{local_output_location}/transition_probabilities_heatmap.png");

In the above graphic, we can see the transition probabilities from each touch (Y-axis) to the next touch (X-axis). 

In [None]:
pos_transitions, labels = generate_transition_counts(touchpoints_list_pos[EVENT_COL].values, all_touches, is_positive=True)
neg_transitions, labels = generate_transition_counts(touchpoints_list_neg[EVENT_COL].values, all_touches, is_positive=False)
all_transitions = pos_transitions + neg_transitions

In [None]:

fig, axs=plt.subplots(1,2, figsize=(18, 5))
sns.set_style("white")
sns.barplot(x=labels[:-2], y=100*all_transitions[:-2, -2]/all_transitions[:-2, -2].sum(), ci=None, color="salmon", ax=axs[0])
    
axs[0].set_xticklabels(labels[:-2],rotation=60)
axs[0].set_ylabel("Percent")
axs[0].set_title("Distribution of last touch before dropoff");

sns.barplot(x=labels[:-2], y=100*all_transitions[:-2, -1]/all_transitions[:-2, -1].sum(), ci=None, color="salmon", ax=axs[1])
axs[1].set_xticklabels(labels[:-2],rotation=60)
axs[1].set_ylabel("Percent")
axs[1].set_title("Distribution of last touch before convert");

### Part VI: Results summary

In [None]:
last_touch_based_mta = touchpoints_list_pos[EVENT_COL].apply(lambda event_list: event_list[-1]).value_counts().reset_index()
last_touch_based_mta.columns = [EVENT_COL, 'last_touches']
last_touch_based_mta['last_touches'] = last_touch_based_mta['last_touches']/last_touch_based_mta['last_touches'].sum()

markov_attr_values_df = pd.DataFrame.from_dict(markov_attribution_values, orient="index").reset_index()
markov_attr_values_df.columns = [EVENT_COL, "markov"]
markov_attr_values_df['markov'] = markov_attr_values_df['markov']/markov_attr_values_df['markov'].sum()
mta_values = markov_attr_values_df.merge(touches_shapley_values.filter([EVENT_COL, "shap_normalized"]).rename(columns={"shap_normalized":"shap"}),
                                         on=EVENT_COL, how="outer").merge(last_touch_based_mta, on=EVENT_COL,how='outer').fillna(0)


In [None]:
mta_values

In [None]:
mta_long = pd.melt(mta_values, EVENT_COL, ["markov",  "shap", "last_touches"])
plt.figure(figsize=(16,6))
sns.barplot(data=mta_long, x=EVENT_COL,y='value',hue='variable');
plt.title("Normalized values of various attribution methods")
plt.xticks(rotation=90)
plt.savefig(f"{local_output_location}/mta_values.png");

In [None]:
print("""Correlation between various values. Closer to 1 indicate they are giving same information. \
Closer to 0 indicate they are highly independent and negative values indicate they are giving opposing information\n\n""")
mta_values.corr()

In [None]:

mta_values.to_parquet(f"{local_output_location}/mta_values.parquet")

### Part VII: Robustness testing

In [None]:
from sklearn.model_selection import train_test_split

In [None]:
markov_vals = {}
shapley_vals = {}

touchpoints_list_pos_rand1, touchpoints_list_pos_rand2 = train_test_split(touchpoints_list_pos, train_size=0.5)
touchpoints_list_neg_rand1, touchpoints_list_neg_rand2 = train_test_split(touchpoints_list_neg, train_size=0.5)

touches_shapley_values_rand1 = get_shapley_values(touchpoints_list_pos_rand1[EVENT_COL].values, [1] * len(touchpoints_list_pos_rand1))
# get_shapley_values(touchpoints_list_pos_rand1["event_id"].values, dict_touches, visualize=False)
markov_attribution_values_rand1, _ = get_markov_attribution(touchpoints_list_pos_rand1[EVENT_COL].values, touchpoints_list_neg_rand1[EVENT_COL].values, all_touches, visualize=False)

touches_shapley_values_rand2 = get_shapley_values(touchpoints_list_pos_rand2[EVENT_COL].values, [1] * len(touchpoints_list_pos_rand2))

markov_attribution_values_rand2, _ = get_markov_attribution(touchpoints_list_pos_rand2[EVENT_COL].values, touchpoints_list_neg_rand2[EVENT_COL].values, all_touches, visualize=False)


for key, val in touches_shapley_values_rand1.items():
    shapley_vals[key] = [val, touches_shapley_values_rand2.get(key,0)]

for key, val in markov_attribution_values_rand1.items():
    markov_vals[key] = [val, markov_attribution_values_rand2.get(key,0)]

    
shapley_vals_df = pd.DataFrame.from_dict(shapley_vals, orient='index')
markov_vals_df = pd.DataFrame.from_dict(markov_vals, orient='index')
logging.info("Robustness testing:")
logging.info(f"Correlation of shapley values between two non-overlapping splits: {shapley_vals_df.corr()[0][1]:.3f}")
logging.info(f"Correlation of markov values between two non-overlapping splits: {markov_vals_df.corr()[0][1]:.3f}")

In [None]:
shapley_vals_df = pd.melt(shapley_vals_df.reset_index(), id_vars='index')
shapley_vals_df.columns = [EVENT_COL, 'iter', 'shap']

markov_vals_df = pd.melt(markov_vals_df.reset_index(), id_vars='index')
markov_vals_df.columns = [EVENT_COL, 'iter', 'markov']

tp_order = shapley_vals_df.groupby(EVENT_COL)['shap'].mean().reset_index().sort_values("shap", ascending=False)[EVENT_COL].values

In [None]:
fig, axs = plt.subplots(1, 2, figsize=(16,6))
sns.barplot(x=EVENT_COL, y='shap', hue='iter', data=shapley_vals_df, ax=axs[0], order=tp_order)
for item in axs[0].get_xticklabels():
    item.set_rotation(90)
axs[0].set_title("Shapley vals for touchpoints in non-overlapping splits");

sns.barplot(x=EVENT_COL, y='markov', hue='iter', data=markov_vals_df, ax=axs[1], order=tp_order)
for item in axs[1].get_xticklabels():
    item.set_rotation(90)
axs[1].set_title("Markov vals for touchpoints in non-overlapping splits");

In [None]:
markov_vals = {}
shapley_vals = {}
for iters in range(10):
    touchpoints_list_pos_rand, _ = train_test_split(touchpoints_list_pos, train_size=0.7)
    touchpoints_list_neg_rand, _ = train_test_split(touchpoints_list_neg, train_size=0.7)
    touches_shapley_values_rand = get_shapley_values(touchpoints_list_pos_rand[EVENT_COL].values, [1] * len(touchpoints_list_pos_rand))
    markov_attribution_values_rand, _ = (get_markov_attribution(touchpoints_list_pos_rand[EVENT_COL].values, 
                                                                touchpoints_list_neg_rand[EVENT_COL].values,
                                                                all_touches, 
                                                                visualize=False))
    for touch, shap in touches_shapley_values_rand.items():
        curr = shapley_vals.get(touch, [])
        curr.append(shap)
        shapley_vals[touch] = curr
    for touch, mark in markov_attribution_values_rand.items():
        curr = markov_vals.get(touch, [])
        curr.append(mark)
        markov_vals[touch] = curr

shapley_ranks = pd.DataFrame.from_dict(shapley_vals, orient='index').fillna(0).rank(axis=0, ascending=False).astype(int)
markov_ranks = pd.DataFrame.from_dict(markov_vals, orient='index').fillna(0).rank(axis=0, ascending=False).astype(int)

In [None]:
ax = sns.heatmap(shapley_ranks.sort_values(by=0),
                 linewidths=0.5,
                 robust=True, 
                 annot_kws={"size":10}, 
                 annot=True,
                 fmt="d",
                 cmap="YlGnBu",
                 cbar=False)
ax.tick_params(labelsize=14)
ax.figure.set_size_inches((10, 6))
#ax.set_ylabel("Year")
ax.set_xlabel("Iteration")
ax.set_title("Shapley value rank in different iterations");

In [None]:
ax = sns.heatmap(markov_ranks.sort_values(by=0),
                 linewidths=0.5,
                 robust=True, 
                 annot_kws={"size":10}, 
                 annot=True,
                 fmt="d",
                 cmap="YlGnBu",
                 cbar=False)
ax.tick_params(labelsize=14)
ax.figure.set_size_inches((10, 6))
#ax.set_ylabel("Year")
ax.set_xlabel("Iteration")
ax.set_title("Markov Chain based value rank in different iterations");

Shapley values and markov values rank order remain quite stable with only minor differences, as can be seen above

Markov is more robust compared to Shapley

### Part VIII: Streaming records to Rudder Event Stream

In [None]:
import rudder_analytics
import logging


def np_decoder(element):
    if isinstance(element, np.generic):
        return element.item()
    else:
        return element
    

def stream(user_id, event_type, write_key, data_plane_url, df):
    rudder_analytics.write_key = write_key
    rudder_analytics.data_plane_url = data_plane_url
    record_count = 0
    for i in df.index:
        event = {}
        for j in df.columns:
            event[j] = np_decoder(df.iloc[i][j])

        rudder_analytics.track(user_id, event_type, event)
        record_count = record_count + 1
    rudder_analytics.flush()    
    logging.info(f"Total {record_count} records inserted into rudder_analytics as {event_type} event")


In [None]:
stream(USER_ID, EVENT_TYPE, config['rudder']['WRITE_KEY'], config['rudder']['DATA_PLANE_URL'], mta_values)

### Part IX: Copying all results to s3

In [None]:
s3_client = session.client('s3')

for root, dirs, files in os.walk(f'data/mta_output_files/{time_suffix}'):
    for file in files:
        s3_client.upload_file(os.path.join(root, file), S3_BUCKET, f"{S3_OUTPUT_FOLDER_LOCATION}/{time_suffix}/{file}")

In [None]:
logging.info(f"Done generating mta values and streaming them to RS event-stream. All the output files are saved to s3 location  {S3_BUCKET}/{S3_OUTPUT_FOLDER_LOCATION}/{time_suffix}/{file}")