### Notes:

In this notebook, we calculate multi-touch attribution values for a given set of channels using following methods:
1. Shapley values - considering only positive conversions
2. Shapley values - considering both positive and negative conversions
3. Markov chain values 
4. Last touch based 

* Shapley values code is implemented based on the logic presented in this [paper](https://arxiv.org/pdf/1804.05327.pdf)
* Markov chain values are based on the following [whitepaper](https://www.channelattribution.net/pdf/Whitepaper.pdf)

Setup:
1. This code runs in AWS Sagemaker.
2. Input data is to be prepared using Rudderstack Reverse ETL (more details in accompanying blogpost)
3. Edit constants in the cell below. Only the first cell requires editing. No other edit is necessary. 
4. The output results get posted as a parquet file in the given output. The same can also be viewed in the notebook itself, as we go towards the end. 
5. Throughout the notebook, along with the code, we can also see several descriptions around the data and the results.
6. Towards the end, we also have a robustness check to ensure how robust the results are if input data changes.

In [None]:
# Loading the required libraries
import json
import boto3
import os
import sys
import time
import gzip
from typing import List, Optional, Union, Dict, Tuple
import datetime

from math import factorial
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt

import logging
import yaml
import itertools

from dataclasses import dataclass
from pprint import pprint
from functools import reduce
from pathlib import Path

pd.set_option('display.max_columns', None)

from tqdm import tqdm
tqdm.pandas()

### Part I: CONFIG:

Define constants below

In [None]:
# Parameters cell for papermill. These values can get overridden by parameters passed by papermill
run_id = str(int(time.time()))
folder_utils_path = None # "/opt/ml/processing/input/code/")
local_output_path = "data"

In [None]:
if run_id:
    run_id = str(run_id)

In [None]:
if folder_utils_path:
    sys.path.append(folder_utils_path)
    
from utils import create_logger
from load_data import pipe as get_raw_data
from load_data import SnowflakeDataIO

In [None]:
IMAGE_FORMAT = 'png'
try:
    logging = create_logger(os.path.join("logs","multi_touch_attribution.log"))
except Exception as e:
    print(str(e))
    pass

logging.info("\n\n\t\tSTARTING FEATURE PREPROCESSING")

In [None]:
if folder_utils_path is None:
    logging.info("Running locally")
    mode = "local"
    config_path = "config/analysis_config.yaml"
else:
    logging.info("Running inside a container")
    mode = "container"
    config_path = "/opt/ml/processing/code/config/analysis_config.yaml"
    
with open(config_path, "r") as f:
    config = yaml.safe_load(f)

print("Config used:")
pprint(config)

In [None]:
print(f"Mode used: {mode}")
with open(config["mode"][mode]["wh_credentials_path"], "r") as f:
    creds = yaml.safe_load(f)

**No other manual change is expected. Rest of the code should run without any edit.**

You can run the whole notebook by going to `Run -> Run all cells`. This creates the parquet file output in prescribed s3 location, and also prints the data here for instant consumption

### Part II: Data Preprocessing:

In [None]:
database = creds["snowflake"].get("database") #analytics_db
schema = creds["snowflake"].get("schema") # dbt_usecases
table = creds["snowflake"].get("query_data_table")# "lead_scoring_mock_features_materalized"
results_table = creds["snowflake"].get("results_table_name")

#Read configurations for data preparation
min_date = config["data"]["min_date"]
ignore_events = config["data"]["ignore_events"]
group_events = config["data"]["group_events"]
group_events_mapping = config["data"]["group_events_mapping"]

primary_key_column = config["data"]["primary_key_column"]
min_date = config["data"]["min_date"]
events_column_name = config["data"]["events_column_name"]
events_type_category_column_name = config["data"]["events_type_category_column_name"]
call_conversion_time_column_name = config["data"]["call_conversion_time_column_name"]
filter_columns = config["data"]["filter_columns"]
timestamp_column_name = config["data"]["timestamp_column_name"]
min_event_interval_in_sec = config["analysis"]["min_event_interval_in_sec"]

events_type_mapping = reduce(lambda x, y: {**x,**y}, [{val:key for val in list_vals} for key, list_vals in group_events_mapping.items()])

In [None]:
# All the output files get stored in the output_directory. Each run of the feature_processing generates a new sub directory based on the timestamp.
# output directory structure
# - data
#   - <run_id>
#      - visuals

output_directory = os.path.join(local_output_path, run_id)
visuals_dir = os.path.join( output_directory, "visuals" )

logging.info(f"All the output files will be saved to following location: {output_directory}")
for output_path in [output_directory, visuals_dir]:
    Path(output_path).mkdir(parents=True, exist_ok=True)

In [None]:
logging.info(f"WH multi touch data config: database: {database}, schema: {schema}, table: {table}")

table_name =  f"{database}.{schema}.{table}"

In [None]:
print(f"Getting data from wh with params:\n\tdata: all\n\tignoring touches: {ignore_events}")
logging.info(f"Getting data from wh with params:\n\tdata: all\n\tignoring touches: {ignore_events}")
input_data = get_raw_data(
    table_name = table_name, 
    snowflake_creds = creds["snowflake"],
    min_date = min_date,
    timestamp_column=timestamp_column_name,
    debug=True
)

In [None]:
input_data.head()

In [None]:
domain_conversion_dates = input_data.query(f"~{call_conversion_time_column_name}.isnull()").set_index(primary_key_column).to_dict()[call_conversion_time_column_name]
raw_data = input_data.drop(columns=['row_id', call_conversion_time_column_name]).drop_duplicates()

In [None]:
# Transformations on the raw data. We apply the constraints defined in the constants cell above.

def dedup_by_ts_delta(df: pd.DataFrame, primary_key: str, timestamp: str, event_type: str, max_lag: int) -> pd.DataFrame:
    """
    ### Parameters
    1. df : pd.DataFrame
        - User touches dataframe. 
    2. primary_key : str
        - column name of the column that contains user_id. 
    3. timestamp: str
        - column name of the column that contains event timestamp
    4. event_type: str
        - column name of the column that contains event/touch data
    5. max_lag: int
        - max time (in sec) between consecutive events to be considered as duplicates. 

    ### Returns
    - DataFrame after doing following steps
    Based on primary key and event_type, it checks if two consecutive events occur within the max_lag time window. If so, they are considered same event and the latter event is dropped. 
    """
    if max_lag <= 0:
        return df
    df = df.sort_values(by=[primary_key, timestamp], ascending=True).reset_index(drop=True)
    original_columns = df.columns
    df["prev_user"] = df[primary_key].shift()
    df["prev_event"] = df[event_type].shift()
    df["prev_ts"] = df[timestamp].shift()

    def is_duplicate(row):
        if pd.isnull(row["prev_user"]) or pd.isnull(row["prev_event"]) or pd.isnull(row["prev_ts"]):
            return False
        elif row[primary_key] == row["prev_user"] and row[event_type] == row["prev_event"] and (
                row[timestamp] - row["prev_ts"]).total_seconds() <= max_lag:
            return True
        else:
            return False

    df["is_duplicate"] = df.progress_apply(is_duplicate, axis=1)
    return df.query("is_duplicate==False")[original_columns].reset_index(drop=True)

def process_raw_data(raw_data_df: pd.DataFrame,
                     dedup_min_time: int,
                     reduce_touches: bool = True) -> pd.DataFrame:
    """
    ### Parameters
    1. raw_data_df : Raw data 
    2. ignore_touches: Ignores the touches present in this list. 
    3. min_date: Ignores events before this date
    4. dedup_min_time: Time (in sec) between two events of same type. Events that repeat within this interval are combined as one (earlier timestamp is considered)
    5. reduce_touches : Whether to combine touchpoints based on their logical groupings

    ### Returns
    - DataFrame after doing following steps
    1. Groups tracks pages if reduce_touches flag is True
    2. Deduplicates based on 5 min interval 
    3. Ignores touches based on ignore_touches list
    4. Ðrops events before the min_date timestamp.
    """
    if reduce_touches:
        raw_data_df[events_column_name] = raw_data_df[events_column_name].apply(lambda touch: events_type_mapping.get(touch, touch))
    
    dedup_data_df = (dedup_by_ts_delta(raw_data_df
                                       .query(f"~{events_column_name}.isnull()",engine='python')
                                       .drop_duplicates(),
                                       primary_key_column,
                                       timestamp_column_name, 
                                       events_column_name, 
                                       dedup_min_time)
                     .filter(filter_columns)
                    )
    return dedup_data_df.query(f'{events_column_name} not in @ignore_events', engine='python')

In [None]:
%%time
touch_data_filtered = process_raw_data(raw_data, min_event_interval_in_sec, group_events)
del raw_data

In [None]:
# Combining the label data to events data
touch_data_filtered['is_converted'] = touch_data_filtered[primary_key_column].apply(lambda domain: 1 if domain in domain_conversion_dates else 0)

In [None]:
# Sample data:
touch_data_filtered.head()

In [None]:
positive_touchpoints = touch_data_filtered.query("is_converted==1", engine='python').rename(columns={events_column_name: "event_type"})
negative_touchpoints = touch_data_filtered.query("is_converted==0", engine='python').rename(columns={events_column_name: "event_type"})

print("Summary stats on converted and non converted journeys:\n")
print(f"Total rows in converted journeys: {len(positive_touchpoints)}")
print(f"Distinct converted journeys: {len(positive_touchpoints[primary_key_column].unique())}")
print(f"Total rows in non-converted journeys: {len(negative_touchpoints)}")
print(f"Distinct non-converted journeys: {len(negative_touchpoints[primary_key_column].unique())}")


### Part III: Data distribution (Optional)

A few stats on the converted journeys before going into the actual attribution problem:

Note: This section is not required to calculate the attribution values. Instead, it is just to show the distribution of no:of events and no:of days before users convert. You can skip directly to [Part IV](#MTA-Calculations-begin)

In [None]:
conversion_summary = positive_touchpoints.query("event_type!='webapp'").groupby([primary_key_column]).agg({timestamp_column_name: "min", "event_type":["size", "nunique"]}).reset_index()
conversion_summary.columns = [primary_key_column, timestamp_column_name, "n_events", "n_distinct_events"]
conversion_summary["days_to_convert"] = conversion_summary.apply(lambda row: (domain_conversion_dates.get(row[primary_key_column]) - row[timestamp_column_name]).days, axis=1)

conversion_summary.head()

In [None]:
fig, axs = plt.subplots(1,2,figsize=(16,6))
sns.histplot(conversion_summary["n_events"], bins=50, ax=axs[0])
axs[0].set_title("No:of events before conversion")
axs[0].set_ylabel("Conversions")
axs[0].set_xlabel("Event count");

sns.histplot(conversion_summary["days_to_convert"], bins=70, ax=axs[1])
axs[1].set_title("Days to conversion")
axs[1].set_ylabel("Conversions")
axs[1].set_xlim([0,100])
axs[1].set_xlabel("Days since first seen");

plt.savefig(os.path.join(visuals_dir, f"data_distribution.{IMAGE_FORMAT}"))

In [None]:
# Checks percentile counts at various levels. Can modify this list to get a different percentile value (ex: For 90th percentile, add 90 to the list.)
percentile_points = [0,5, 25, 50, 75, 95, 99, 100]

In [None]:
print(f"Avg no:of touches before a user converts: {conversion_summary['n_events'].mean():.2f}; Median: {np.median(conversion_summary['n_events'])}")
print("\nPercentiles for No:of touches:")
(pd.DataFrame.from_dict(
    dict(zip(percentile_points, 
             np.percentile(conversion_summary['n_events'],
                           percentile_points).round())),
    orient='index', 
    columns=['n_events'])
 .reset_index().rename(columns={"index":"percentile"}))

In [None]:
print(f"Avg days to convert: {conversion_summary['days_to_convert'].mean():.2f}; Median: {np.median(conversion_summary['days_to_convert'])}")
print("\nPercentiles for No:of days to convert:")
(pd.DataFrame.from_dict(
    dict(zip(percentile_points, 
             np.percentile(conversion_summary['days_to_convert'],
                           percentile_points).round())),
    orient='index', 
    columns=['n_days_to_convert'])
 .reset_index().rename(columns={"index": "percentile"}))

<a id='MTA-Calculations-begin'></a>

### Part IV: Shapley values calculation


In [None]:
## Helper function to transform data to the required input form 

def collect_touchpoints(touchpoints_df: pd.DataFrame, 
                        primary_key: str=primary_key_column, 
                        ts_column: str=timestamp_column_name,
                        touchpoint_column: str="event_type") -> pd.DataFrame:
    """
    Transform dataframe with each touch as a row to a new dataframe where each journey has a single row with all touches as a list in chronological order
    touchpoints_df: A dataframe with each row corresponding to a touch. 
    primary_key: Name of the column containing unique user identifier
    ts_column: Name of column containing timestamp using which journeys are sorted chronologically
    touchpoint_column: Name of column containing touch points. 
    
    Returns:
    A dataframe with two columns, first has primary key and second has the touchpoint_column
    """
    return (touchpoints_df
            .sort_values(by=[primary_key, ts_column], 
                         ascending=True)
            .reset_index(drop=True)
            .groupby(primary_key)[touchpoint_column]
            .apply(list)
            .reset_index())

In [None]:
touchpoints_list_pos = collect_touchpoints(positive_touchpoints)

touchpoints_list_pos.head()

In [None]:


# Computing normalizing factor for marginal contribution
shapley_weight = lambda p, s: (factorial(s)*factorial(p-s-1)/factorial(p))

# Getting shapley value of a given channel and values mapping for channel subsets.
def compute_shapley_values(v_values_map: dict, 
                            channel: str,
                            n_channels: int) -> float:
    # Initiating shap with marginal contribution from S = null subset
    shap = 1/n_channels * v_values_map.get(channel, 0) 
    for subset_str, subset_contrib in v_values_map.items():
        subset = subset_str.split(",")
        if channel not in subset:
            subset_union_channel = ','.join(sorted(subset + [channel]))
            marginal_contrib = (v_values_map.get(subset_union_channel,0) -
                                v_values_map.get(subset_str,0))
            shap += shapley_weight(n_channels, len(subset)) * marginal_contrib
    
    return shap

# Helper function to generate subsets for a given set of channels
def generate_subsets(touchpoints: List[str]) -> List[List[str]]:
    subset_list = []
    for subset_size in range(len(touchpoints)):
        for subset in itertools.combinations(touchpoints, subset_size + 1):
            subset_list.append(list(sorted(subset)))
    return subset_list

# Computes the utility function value v(s) for given subset S, using contribution values for all subsets.
def utility_function(touchpoint_set: List[str], 
                     contributions_mapping: dict) -> Union[int, float]:
    subset_list = generate_subsets(touchpoint_set)
    return sum([contributions_mapping.get(','.join(subset),0) for subset in subset_list])


# Master function combining all the above functions to compute shapley values for each touchpoint from a list of journeys. 
def get_shapley_values(journeys_list: List[List[str]], 
                       contribs_list: List[Union[int, float]])->Dict[str, float]:
    """

    Args:
        journeys_list (List[List[str]]): List of journeys.
         Each journey is a list of touchpoints..
        contribs_list (List[Union[int, float]]): List of contributions corresponding to each journey in journeys_list.
         Should have same length as journeys_list

    Returns:
        Dict[str, float]: A dictionary with key as channel/touchpoint, and Shapley value as its value 
    """
    flattened_journeys = [channel for journey in journeys_list for channel in set(journey)]
    unique_channels = sorted(list(set(flattened_journeys)))
    all_subsets = generate_subsets(unique_channels)
    contrib_map = {}
    for n, journey in enumerate(journeys_list):
        journey_ = ",".join(sorted(set(journey))) # Ensures deduplication and sorting of journeys
        contrib_map[journey_] = contrib_map.get(journey_,0) + contribs_list[n]
    v_values = {}
    for subset in all_subsets:
        v_values[",".join(subset)] = utility_function(subset, contrib_map)
        
    shapley_values = {}
    for channel in unique_channels:
        shapley_values[channel] = compute_shapley_values(v_values, 
                                                          channel, 
                                                          len(unique_channels))
    return shapley_values
    

In [None]:
touches_shapley_values = get_shapley_values(touchpoints_list_pos['event_type'].values, [1] * len(touchpoints_list_pos))
touches_shapley_values = pd.DataFrame.from_dict(touches_shapley_values, orient="index").reset_index()
touches_shapley_values.columns = ["touch", "shap"]
touches_shapley_values = touches_shapley_values.sort_values(by= 'shap', ascending=False).reset_index(drop=True)
plt.figure(figsize=(16,6))
sns.barplot(x="shap", y="touch", data=touches_shapley_values, color="salmon")
plt.title("Shapley Values with converted only journeys");

plt.savefig(os.path.join(visuals_dir, f"shapley_values_for_converted.{IMAGE_FORMAT}"))

In [None]:
touches_shapley_values

**Shapley values including negative paths**

In [None]:

def aggregate_by_journeys(touchpoints_df, identifier_col, touches_col):
    """
    Returns a dataframe where each row contains a sorted list of touchpoints and count of how many times this journey is observed.
    Within a journey, order of touches is ignored and a touch is counted only once.
    """
    return (touchpoints_df
            .groupby(identifier_col)[touches_col]
            .apply(list)
            .apply(lambda x: ",".join(sorted(list(set(x)))))
            .reset_index()
            .groupby(touches_col)
            .size()
            .reset_index())

In [None]:
pos_data_counts = aggregate_by_journeys(positive_touchpoints, primary_key_column, "event_type")
neg_data_counts = aggregate_by_journeys(negative_touchpoints, primary_key_column, "event_type")
pos_data_counts.columns = ['events', 'conversions']
neg_data_counts.columns = ['events', 'non_conversions']

all_data_counts = pos_data_counts.merge(neg_data_counts, on="events", how="outer").fillna(0)
all_data_counts["conversion_rate"] = all_data_counts.apply(lambda row: row["conversions"]/(row["conversions"] + row["non_conversions"]), axis=1)
all_data_counts['events'] = all_data_counts['events'].apply(lambda x: x.split(","))
del pos_data_counts, neg_data_counts



In [None]:
all_data_counts.head()

In [None]:
touches_shapley_values_all_paths = get_shapley_values(all_data_counts['events'].values, all_data_counts['conversion_rate'])

touches_shapley_values_all_paths = pd.DataFrame.from_dict(touches_shapley_values_all_paths, orient="index").reset_index()
touches_shapley_values_all_paths.columns = ["touch", "shap"]
touches_shapley_values_all_paths = touches_shapley_values_all_paths.sort_values(by= 'shap', ascending=False).reset_index(drop=True)
plt.figure(figsize=(16,6))
sns.barplot(x="shap", y="touch", data=touches_shapley_values_all_paths, color="salmon")
plt.title("Shapley Values including non converted users");

plt.savefig(os.path.join(visuals_dir, f"shapley_values_non_converted_included.{IMAGE_FORMAT}"))

In [None]:
# Merging both forms of shapley values
touches_shapley_values_all_paths['shap'] = touches_shapley_values_all_paths['shap']/touches_shapley_values_all_paths['shap'].sum()
touches_shapley_values['shap_normalized'] = touches_shapley_values['shap']/touches_shapley_values['shap'].sum()

In [None]:
mta_vals = (touches_shapley_values_all_paths
            .rename(columns={"shap":"shap_all_paths"})
            .merge(touches_shapley_values
                   .filter(["touch", "shap_normalized"])
                   .rename(columns={'shap_normalized':'shap_conversions'}),
                   on='touch'))
mta_vals

### Part V: Markov Chain Values Calculation

Index of transition counts: 1st: source, last: destination. 2 to -1: same order as dict_touches_inv keys

In [None]:
touchpoints_list_neg = collect_touchpoints(negative_touchpoints)

all_touches = list(positive_touchpoints["event_type"].unique())


In [None]:

def generate_transition_counts(journey_list: List[List[str]], 
                               distinct_touches_list: List[str], 
                               is_positive: bool):
    if is_positive:
        destination_idx = -1
    else:
        destination_idx = -2
    transition_counts = np.zeros(((len(distinct_touches_list)+3), (len(distinct_touches_list)+3)))
    for journey in journey_list:
        transition_counts[0, (distinct_touches_list.index(journey[0])+1)] += 1 # First point in the path
        for n, touch_point in enumerate(journey):
            if n == len(journey) - 1:
                # Reached last point
                transition_counts[(distinct_touches_list.index(touch_point)+1), destination_idx] += 1
                transition_counts[destination_idx, destination_idx]+=1
            else:
                transition_counts[(distinct_touches_list.index(touch_point)+1), (distinct_touches_list.index(journey[n+1]) + 1)] +=1
    transition_labels = distinct_touches_list.copy()
    transition_labels.insert(0, "Start")
    transition_labels.extend(["Dropoff", "Converted"])
    return transition_counts, transition_labels

row_normalize_np_array = lambda transition_counts: transition_counts / transition_counts.sum(axis=1)[:, np.newaxis]

def plot_transitions(transition_probabilities: np.array, labels: List[str], title="Transition Probabilities", show_annotations=True):
    ax = sns.heatmap(transition_probabilities,
                     linewidths=0.5,
                     robust=True, 
                     annot_kws={"size":8}, 
                     annot=show_annotations,
                     fmt=".2f",
                     cmap="YlGnBu",
                     xticklabels=labels,
                     yticklabels=labels)
    ax.tick_params(labelsize=10)
    ax.figure.set_size_inches((16, 10))
    ax.set_ylabel("Previous Step")
    ax.set_xlabel("Next Step")
    ax.set_title(title);


def get_transition_probabilities(converted_touchpoints_list: List[List[int]], 
                                 dropoff_touchpoints_list: List[List[int]], 
                                 distinct_touches_list: List[str], 
                                 visualize=False) -> Tuple[np.array, List[str]]:
    pos_transitions, _ = generate_transition_counts(converted_touchpoints_list, distinct_touches_list, is_positive=True)
    neg_transitions, labels = generate_transition_counts(dropoff_touchpoints_list, distinct_touches_list, is_positive=False)
    all_transitions = pos_transitions + neg_transitions
    transition_probabilities = row_normalize_np_array(all_transitions)
    if visualize:
        plot_transitions(transition_probabilities, labels, show_annotations=True)
    return transition_probabilities, labels

def converge(transition_matrix, max_iters=200, verbose=True):
    T_upd = transition_matrix
    prev_T = transition_matrix
    for i in range(max_iters):
        T_upd = np.matmul(transition_matrix, prev_T)
        if np.abs(T_upd - prev_T).max()<1e-5:
            if verbose:
                print(f"{i} iters taken for convergence")
            return T_upd
        prev_T = T_upd
    if verbose:
        print(f"Max iters of {max_iters} reached before convergence. Exiting")
    return T_upd


def get_removal_affects(transition_probs, labels, ignore_labels=["Start", "Dropoff","Converted"], default_conversion=1.):
    removal_affect = {}
    for n, label in enumerate(labels):
        if label in ignore_labels:
            continue
        else:
            drop_transition = transition_probs.copy()
            drop_transition[n,:] = 0.
            drop_transition[n,-2] = 1.
            drop_transition_converged = converge(drop_transition, 500, False)
            removal_affect[label] = default_conversion - drop_transition_converged[0,-1]
    return removal_affect

def get_markov_attribution(tp_list_positive: List[List[int]],
                           tp_list_negative: List[List[int]], 
                           distinct_touches_list: List[str], 
                           visualize=False) -> Tuple[Dict[str, float], np.array]:
    transition_probabilities, labels = get_transition_probabilities(tp_list_positive, tp_list_negative, distinct_touches_list, visualize=visualize)
    transition_probabilities_converged = converge(transition_probabilities, max_iters=500, verbose=False)
    removal_affects = get_removal_affects(transition_probabilities, labels, default_conversion=transition_probabilities_converged[0,-1])
    total_conversions = len(tp_list_positive)
    attributable_conversions = {}
    total_weight = sum(removal_affects.values())
    for tp, weight in removal_affects.items():
        attributable_conversions[tp] = weight/total_weight * total_conversions
    return attributable_conversions, transition_probabilities


In [None]:
markov_attribution_values, transition_probabilities = get_markov_attribution(touchpoints_list_pos["event_type"].values, touchpoints_list_neg["event_type"].values, all_touches, visualize=True)
plt.savefig(os.path.join(visuals_dir, f"markov_transition_probabilities.{IMAGE_FORMAT}"))

In the above graphic, we can see the transition probabilities from each touch (Y-axis) to the next touch (X-axis). 

In [None]:
pos_transitions, labels = generate_transition_counts(touchpoints_list_pos["event_type"].values, all_touches, is_positive=True)
neg_transitions, labels = generate_transition_counts(touchpoints_list_neg["event_type"].values, all_touches, is_positive=False)
all_transitions = pos_transitions + neg_transitions

In [None]:

fig, axs=plt.subplots(1,2, figsize=(18, 5))
sns.set_style("white")
sns.barplot(x=labels[:-2], y=100*all_transitions[:-2, -2]/all_transitions[:-2, -2].sum(), ci=None, color="salmon", ax=axs[0])
    
axs[0].set_xticklabels(labels[:-2],rotation=60)
axs[0].set_ylabel("Percent")
axs[0].set_title("Distribution of last touch before dropoff");

sns.barplot(x=labels[:-2], y=100*all_transitions[:-2, -1]/all_transitions[:-2, -1].sum(), ci=None, color="salmon", ax=axs[1])
axs[1].set_xticklabels(labels[:-2],rotation=60)
axs[1].set_ylabel("Percent")
axs[1].set_title("Distribution of last touch before convert");

plt.savefig(os.path.join(visuals_dir, f"distribution_of_last_touch_before_dropoff_and_convert.{IMAGE_FORMAT}"))

### Part VI: Results summary

In [None]:
last_touch_based_mta = touchpoints_list_pos["event_type"].apply(lambda event_list: event_list[-1]).value_counts().reset_index()
last_touch_based_mta.columns = ['touch', 'last_touches']
last_touch_based_mta['last_touches'] = last_touch_based_mta['last_touches']/last_touch_based_mta['last_touches'].sum()

markov_attr_values_df = pd.DataFrame.from_dict(markov_attribution_values, orient="index").reset_index()
markov_attr_values_df.columns = ["touch", "markov"]
markov_attr_values_df['markov'] = markov_attr_values_df['markov']/markov_attr_values_df['markov'].sum()
mta_values = markov_attr_values_df.merge(mta_vals, on="touch", how="outer").merge(last_touch_based_mta, on='touch',how='outer').fillna(0)

In [None]:
mta_values

In [None]:
mta_long = pd.melt(mta_values, "touch", ["markov",  "shap_conversions", "last_touches"])
plt.figure(figsize=(16,6))
sns.barplot(data=mta_long, x='touch',y='value',hue='variable');
plt.xticks(rotation=90);
plt.savefig(os.path.join(visuals_dir, f"results_summary.{IMAGE_FORMAT}"))

In [None]:
mta_values.corr()

In [None]:
SnowflakeDataIO.write_to_snowflake_table(mta_values, creds["snowflake"]["results_table_name"], creds["snowflake"], if_exists="append")
print(f'The output data is stored in the warehouse table: {creds["snowflake"]["results_table_name"]}')

In [None]:
print(f"MTA values will be written to the location:\n\t{output_directory}")

In [None]:
mta_values.to_parquet(f"{output_directory}/mta_values.parquet")

### Part VII: Robustness testing

In [None]:
from sklearn.model_selection import train_test_split

In [None]:
markov_vals = {}
shapley_vals = {}

touchpoints_list_pos_rand1, touchpoints_list_pos_rand2 = train_test_split(touchpoints_list_pos, train_size=0.5)
touchpoints_list_neg_rand1, touchpoints_list_neg_rand2 = train_test_split(touchpoints_list_neg, train_size=0.5)

touches_shapley_values_rand1 = get_shapley_values(touchpoints_list_pos_rand1['event_type'].values, [1] * len(touchpoints_list_pos_rand1))
# get_shapley_values(touchpoints_list_pos_rand1["event_id"].values, dict_touches, visualize=False)
markov_attribution_values_rand1, _ = get_markov_attribution(touchpoints_list_pos_rand1["event_type"].values, touchpoints_list_neg_rand1["event_type"].values, all_touches, visualize=False)

touches_shapley_values_rand2 = get_shapley_values(touchpoints_list_pos_rand2['event_type'].values, [1] * len(touchpoints_list_pos_rand2))

markov_attribution_values_rand2, _ = get_markov_attribution(touchpoints_list_pos_rand2["event_type"].values, touchpoints_list_neg_rand2["event_type"].values, all_touches, visualize=False)


for key, val in touches_shapley_values_rand1.items():
    shapley_vals[key] = [val, touches_shapley_values_rand2.get(key,0)]

for key, val in markov_attribution_values_rand1.items():
    markov_vals[key] = [val, markov_attribution_values_rand2.get(key,0)]

    
shapley_vals_df = pd.DataFrame.from_dict(shapley_vals, orient='index')
markov_vals_df = pd.DataFrame.from_dict(markov_vals, orient='index')
print(f"Correlation of shapley values between two non-overlapping splits: {shapley_vals_df.corr()[0][1]:.3f}")
print(f"Correlation of markov values between two non-overlapping splits: {markov_vals_df.corr()[0][1]:.3f}")

In [None]:
shapley_vals_df = pd.melt(shapley_vals_df.reset_index(), id_vars='index')
shapley_vals_df.columns = ['touch', 'iter', 'shap']

markov_vals_df = pd.melt(markov_vals_df.reset_index(), id_vars='index')
markov_vals_df.columns = ['touch', 'iter', 'markov']

tp_order = shapley_vals_df.groupby('touch')['shap'].mean().reset_index().sort_values("shap", ascending=False)['touch'].values

In [None]:
fig, axs = plt.subplots(1, 2, figsize=(16,6))
sns.barplot(x='touch', y='shap', hue='iter', data=shapley_vals_df, ax=axs[0], order=tp_order)
for item in axs[0].get_xticklabels():
    item.set_rotation(90)
axs[0].set_title("Shapley vals for touchpoints in non-overlapping splits");

sns.barplot(x='touch', y='markov', hue='iter', data=markov_vals_df, ax=axs[1], order=tp_order)
for item in axs[1].get_xticklabels():
    item.set_rotation(90)
axs[1].set_title("Markov vals for touchpoints in non-overlapping splits");

plt.savefig(os.path.join(visuals_dir, f"shapley_markov_values_non_overlapping_splits.{IMAGE_FORMAT}"))

In [None]:
markov_vals = {}
shapley_vals = {}
for iters in range(10):
    touchpoints_list_pos_rand, _ = train_test_split(touchpoints_list_pos, train_size=0.7)
    touchpoints_list_neg_rand, _ = train_test_split(touchpoints_list_neg, train_size=0.7)
    touches_shapley_values_rand = get_shapley_values(touchpoints_list_pos_rand["event_type"].values, [1] * len(touchpoints_list_pos_rand))
    markov_attribution_values_rand, _ = get_markov_attribution(touchpoints_list_pos_rand["event_type"].values, touchpoints_list_neg_rand["event_type"].values, all_touches, visualize=False)
    for touch, shap in touches_shapley_values_rand.items():
        curr = shapley_vals.get(touch, [])
        curr.append(shap)
        shapley_vals[touch] = curr
    for touch, mark in markov_attribution_values_rand.items():
        curr = markov_vals.get(touch, [])
        curr.append(mark)
        markov_vals[touch] = curr

In [None]:
shapley_ranks = pd.DataFrame.from_dict(shapley_vals, orient='index').fillna(0).rank(axis=0, ascending=False).astype(int)
markov_ranks = pd.DataFrame.from_dict(markov_vals, orient='index').fillna(0).rank(axis=0, ascending=False).astype(int)

In [None]:
shapley_ranks.sort_values(by=0)

In [None]:
markov_ranks.sort_values(by=0)

Shapley values and markov values rank order remain quite stable with only minor differences, as can be seen above

Markov is significantly more robust compared to Shapley

In [None]:
logging.info("Done")

In [None]:
## Cell to hide code while converting to a html page
from IPython.display import HTML

HTML('''<script>
$('div.input').hide();
</script>''')