In [None]:
import polars as pl
from pdstools import read_ds_export
import re

import plotly.io as pio
import plotly as plotly
import plotly.express as px
import plotly.graph_objs as go
from plotly.subplots import make_subplots

# Initialise Plotly's offline notebook mode before any figure is shown.
plotly.offline.init_notebook_mode()
# Make "vscode" the default renderer used by fig.show().
# NOTE(review): presumably this needs changing when the notebook runs outside VS Code — confirm.
pio.renderers.default = "vscode"


In [17]:
def capitalize(fields: list) -> list:
    """Applies automatic capitalization.

    Each name goes through three steps:

    1. A leading ``px``/``py``/``pz`` prefix is stripped, unless the stripped
       name equals one of the input names (in which case the original name is
       kept to avoid a collision).
    2. Every known word that ends at a word boundary is rewritten in its
       canonical casing (matching is case-insensitive).
    3. The first character is upper-cased.

    Parameters
    ----------
    fields : list
        A list of names. A single non-list value is wrapped in a list.

    Returns
    -------
    fields : list
        A new list with each value properly capitalized
    """
    capitalize_end_words = [
        "ID",
        "Key",
        "Name",
        "Treatment",
        "Count",
        "Category",
        "Class",
        "Time",
        "DateTime",
        "UpdateTime",
        "Version",
        "Rate",
        "Ratio",
        "Negatives",
        "Positives",
        "Threshold",
        "Error",
        "Importance",
        "Type",
        "Percentage",
        "Index",
        "Symbol",
        "ResponseCount",
        "ConfigurationName",
        "Configuration",
    ]
    if not isinstance(fields, list):
        fields = [fields]
    fields_new = [re.sub("^p([xyz])", "", field) for field in fields]
    # Keep the original name when the stripped name is already one of the inputs.
    seen = set(fields)
    for i, item in enumerate(fields_new):
        if item in seen:
            fields_new[i] = fields[i]
    for word in capitalize_end_words:
        # The raw string matters: in a normal string "\b" is a backspace
        # character, not the regex word-boundary anchor, so nothing would match.
        pattern = re.compile(re.escape(word) + r"\b", flags=re.I)
        fields_new = [pattern.sub(word, field) for field in fields_new]
    # Upper-casing the first character once, after all substitutions, is enough.
    return [field[:1].upper() + field[1:] for field in fields_new]

TODO: see if we can generate such data rather than shipping it

In [None]:
# Read the interaction-history export and keep only the handful of columns
# the analysis below relies on.
# Outcome values used further down: Conversion, Impression, Pending, Clicked
ih = read_ds_export(
    "Data-pxStrategyResult_InteractionFiles_20241213T091932_GMT.zip",
    path=".",
).select(
    [
        "pyOutcome",
        "pxOutcomeTime",
        "pyChannel",
        "pyIssue",
        "pyGroup",
        "pyName",
        "ExperimentGroup",
    ]
)
ih.collect()

In [None]:
# Map every original column name to its capitalized, prefix-free counterpart.
dframe_columns = ih.collect_schema().names()
cols = capitalize(dframe_columns)
ih = ih.rename(dict(zip(dframe_columns, cols)))
ih.collect_schema()

First, take a look at the IH dataframe: explore the columns, outcome types and business structure.

In [None]:
# Step 1 parses the raw outcome timestamp; step 2 derives the calendar
# columns from the parsed value, so the two with_columns calls must stay separate.
outcome_ts = pl.col("OutcomeDateTime")
outcome_year = outcome_ts.dt.year().cast(str)

ih = ih.with_columns(
    pl.col("OutcomeTime")
    .str.strptime(pl.Datetime, "%Y%m%dT%H%M%S%.3f %Z")
    .alias("OutcomeDateTime")
).with_columns(
    outcome_ts.dt.date().alias("Day"),
    outcome_ts.dt.strftime("%Y-%m").alias("Month"),
    outcome_year.alias("Year"),
    (outcome_year + "_Q" + outcome_ts.dt.quarter().cast(str)).alias("Quarter"),
)
ih.describe()

Assuming the conversion modelling setup follows the OOTB approach: IH contains the Conversion outcome as the positive result, while Impression (for inbound channels) and Pending (for outbound channels) are treated as negative outcomes. Each Conversion has a corresponding Impression/Pending record, so Negatives = count(all records) - 2 * count(Conversion), and the Conversion Rate is count(Conversion) / (count(Conversion) + Negatives), which equals count(Conversion) / count(Impression/Pending).

In [None]:
# Outcomes counted as a positive model response.
positive_model_response = [
    "Conversion",
]
# Outcomes that represent the record of the action being shown or sent.
all_model_response = [
    "Impression",
    "Pending",
]
# Dimensions kept in the aggregated analysis frame.
group_by = [
    "Day",
    "Month",
    "Year",
    "Quarter",
    "Channel",
    "Issue",
    "Group",
    "Name",
    "ExperimentGroup",
]

# Only rows that belong to an experiment group take part in the analysis.
ih = ih.filter(
    pl.col("ExperimentGroup").is_not_null()
)

In [None]:
# Keep only model-relevant outcomes, flag the positive ones and count per group.
outcome_is_relevant = pl.col("Outcome").is_in(all_model_response + positive_model_response)
outcome_is_positive = pl.col("Outcome").is_in(positive_model_response)

ih_analysis = (
    ih.filter(outcome_is_relevant)
    .with_columns(
        pl.when(outcome_is_positive).then(1).otherwise(0).alias("Outcome_Binary")
    )
    .group_by(group_by)
    .agg(
        pl.len().alias("Count"),
        pl.sum("Outcome_Binary").alias("Positives"),
    )
    # Count includes the positive record and (per the notebook's assumption)
    # its matching Impression/Pending record, hence the factor of two.
    .with_columns(
        (pl.col("Count") - (2 * pl.col("Positives"))).alias("Negatives")
    )
)

The results are shown as gauge plots across the channel dimension, to compare conversion rates within a specific channel between the Conversion and Engagement models. Set the relevant reference data (baseline conversion rate); the delta from the baseline is shown inside each gauge.

In [None]:
gauge_group_by = ["Channel", "ExperimentGroup"]
# Baseline rate per "<Channel>_<ExperimentGroup>" key; combinations without an
# entry get no threshold value and no delta reference.
reference = {'Web_Conversion-Test' : 0.055, 'Web_Conversion-Control' : 0.055}

# Aggregate per channel/experiment group, then derive the conversion rate and
# its standard error sqrt(p * (1 - p) / n) with n = Positives + Negatives.
gauge_data = (
    ih_analysis.group_by(gauge_group_by)
    .agg(
        pl.sum("Negatives").alias("Negatives"),
        pl.sum("Positives").alias("Positives"),
        pl.sum("Count").alias("Count"),
    )
    .with_columns(
        (pl.col("Positives") / (pl.col("Positives") + pl.col("Negatives"))).alias("ConversionRate")
    )
    .with_columns(
        (
            (
                (pl.col("ConversionRate") * (1 - pl.col("ConversionRate")))
                / (pl.col("Positives") + pl.col("Negatives"))
            )
            ** 0.5
        ).alias("StdErr")
    )
    .sort(gauge_group_by, descending=False)
    .collect()
)

gauge_data = gauge_data.to_pandas()

# The grid has one column per channel and one row per experiment group.
# Each gauge is placed by its own channel/group position; placing by running
# index would mix channels within a row whenever the grid is not square.
col_of = {
    label: position
    for position, label in enumerate(gauge_data[gauge_group_by[0]].astype(str).unique(), start=1)
}
row_of = {
    label: position
    for position, label in enumerate(gauge_data[gauge_group_by[1]].astype(str).unique(), start=1)
}
cols = len(col_of)
rows = len(row_of)

gauge_data['Name'] = gauge_data[gauge_group_by].apply(lambda r: ' '.join(r.values.astype(str)), axis=1)
gauge_data['CName'] = gauge_data[gauge_group_by].apply(lambda r: '_'.join(r.values.astype(str)), axis=1)

fig = make_subplots(
    rows=rows,
    cols=cols,
    specs=[[{"type": "indicator"} for _ in range(cols)] for _ in range(rows)],
)
fig.update_layout(
    height=270 * rows,
    autosize=True,
    title='[CONV] Conversion (Channel/Model Type)',
    margin=dict(b=10, t=120, l=10, r=10),
)

for _, row in gauge_data.iterrows():
    ref_value = reference.get(row['CName'], None)
    gauge = {
        'axis': {'tickformat': ',.2%'},
        'threshold': {
            'line': {'color': "red", 'width': 2},
            'thickness': 0.75,
            'value': ref_value,
        },
    }
    # Recolour the bar only when a baseline exists and the rate is below it;
    # the darker colour marks rates under 75% of the baseline.
    if ref_value and row['ConversionRate'] < ref_value:
        gauge['bar'] = {
            'color': '#EC5300' if row['ConversionRate'] < (0.75 * ref_value) else '#EC9B00'
        }

    fig.add_trace(
        go.Indicator(
            mode="gauge+number+delta",
            number={'valueformat': ",.2%"},
            value=row['ConversionRate'],
            delta={'reference': ref_value, 'valueformat': ",.2%"},
            title={'text': row['Name']},
            gauge=gauge,
        ),
        row=row_of[str(row[gauge_group_by[1]])],
        col=col_of[str(row[gauge_group_by[0]])],
    )
fig.show()
gauge_data

This plot provides a detailed view of the conversion rates of individual actions and the model type used.

In [None]:
treemap_group_by = ["Channel", "Issue", "Group", "Name", "ExperimentGroup"]

# Building blocks for the derived columns below.
observations = pl.col("Positives") + pl.col("Negatives")
rate = pl.col("ConversionRate")

# Aggregate per action/experiment group, add the conversion rate and its
# standard error, and hand the sorted result to pandas for plotting.
treemap_data = (
    ih_analysis.group_by(treemap_group_by)
    .agg(
        pl.sum("Negatives").alias("Negatives"),
        pl.sum("Positives").alias("Positives"),
        pl.sum("Count").alias("Count"),
    )
    .with_columns((pl.col("Positives") / observations).alias("ConversionRate"))
    .with_columns((((rate * (1 - rate)) / observations) ** 0.5).alias("StdErr"))
    .sort(treemap_group_by, descending=False)
    .collect()
    .to_pandas()
)

# Tile size is the record count, tile colour the conversion rate.
fig = px.treemap(
    treemap_data,
    path=[px.Constant("ALL")] + treemap_group_by,
    values="Count",
    color="ConversionRate",
    color_continuous_scale=px.colors.sequential.RdBu_r,
    title="[BIZ] Conversion rate treemap",
    hover_data=["StdErr", "Positives", "Negatives"],
    height=640,
)
fig.update_traces(textinfo="label+value+percent parent+percent root")
fig.update_layout(margin=dict(t=50, l=25, r=25, b=25))

fig.show()
treemap_data

Detailed line/bar plot.

In [None]:
line_group_by = ["Day", "Channel", "ExperimentGroup"]

# Daily conversion rate per channel/experiment group. "CI" is 1.96 times the
# standard error sqrt(p * (1 - p) / n), used as the error-bar size of the
# 95% confidence interval in the bar chart.
line_data = (
    ih_analysis.group_by(line_group_by)
    .agg(
        pl.sum("Negatives").alias("Negatives"),
        pl.sum("Positives").alias("Positives"),
        pl.sum("Count").alias("Count"),
    )
    .with_columns(
        (pl.col("Positives") / (pl.col("Positives") + pl.col("Negatives"))).alias("ConversionRate")
    )
    .with_columns(
        (
            (
                (
                    (pl.col("ConversionRate") * (1 - pl.col("ConversionRate")))
                    / (pl.col("Positives") + pl.col("Negatives"))
                )
                ** 0.5
            )
            * 1.96
        ).alias("CI")
    )
    .sort(line_group_by, descending=False)
    .collect()
)

line_data = line_data.to_pandas()

# Fewer than 30 distinct days: grouped bars with error bars (plus a Bar/Line
# toggle). Otherwise: a plain line chart.
if len(line_data["Day"].unique()) < 30:
    fig = px.bar(
        line_data,
        x="Day",
        y="ConversionRate",
        color="ExperimentGroup",
        error_y='CI',
        facet_row="Channel",
        barmode="group",
        title="[CONV] Daily Conversion Rate with 95% confidence interval",
        custom_data=["ExperimentGroup"],
    )
    fig.update_layout(
        updatemenus=[
            dict(
                buttons=[
                    dict(args=["type", "bar"], label="Bar", method="restyle"),
                    dict(args=["type", "line"], label="Line", method="restyle"),
                ],
                direction="down",
                showactive=True,
            ),
        ]
    )
else:
    fig = px.line(
        line_data,
        x="Day",
        y="ConversionRate",
        color="ExperimentGroup",
        title="[CONV] Daily Conversion Rate",
        # Was misspelled "acet_row", which made px.line raise a TypeError.
        facet_row="Channel",
        custom_data=["ExperimentGroup"],
    )

fig.update_xaxes(tickfont=dict(size=10))
# update_yaxes applies to every y-axis of the figure, including the facet rows,
# so no per-axis loop over private layout attributes is needed.
fig.update_yaxes(tickformat=',.2%')
height = max(640, 300 * len(line_data["Channel"].unique()))
fig.update_layout(
    xaxis_title="Day",
    yaxis_title="Conversion Rate",
    hovermode="x unified",
    height=height,
)
# Facet annotations read "Channel=<value>"; keep only the value. Taking the
# last piece leaves annotations without "=" untouched instead of failing.
fig.for_each_annotation(lambda a: a.update(text=a.text.split("=")[-1]))
fig = fig.update_traces(
    hovertemplate="Day" + ' : %{x}' + '<br>'
    + "Experiment Group" + ' : %{customdata[0]}' + '<br>'
    + "Conversion Rate" + ' : %{y:.2%}' + '<extra></extra>'
)

fig.show()
line_data

Engagement rates (CTR)

In [None]:
# Engagement uses "Clicked" as the positive response.
positive_model_response = ["Clicked"]
all_model_response = ["Impression", "Pending"]
group_by = ["Day", "Month", "Year", "Quarter", "Channel", "Issue", "Group", "Name", "ExperimentGroup"]

# Keep only model-relevant outcomes, flag the positive ones and count per group.
ih_analysis = (
    ih.filter(
        pl.col("Outcome").is_in(all_model_response + positive_model_response)
    )
    .with_columns(
        pl.when(pl.col('Outcome').is_in(positive_model_response))
        .then(1)
        .otherwise(0)
        .alias('Outcome_Binary')
    )
    .group_by(group_by)
    .agg(
        pl.len().alias('Count'),
        pl.sum("Outcome_Binary").alias("Positives"),
    )
    # Same derivation as in the conversion analysis: Count - 2 * Positives.
    .with_columns(
        (pl.col("Count") - (2 * pl.col("Positives"))).alias("Negatives")
    )
)

gauge_group_by = ["Channel", "ExperimentGroup"]
# Baseline rate per "<Channel>_<ExperimentGroup>" key; combinations without an
# entry get no threshold value and no delta reference.
reference = {'Web_Conversion-Test' : 0.25, 'Web_Conversion-Control' : 0.25}

# Aggregate per channel/experiment group, then derive the click-through rate
# and its standard error sqrt(p * (1 - p) / n) with n = Positives + Negatives.
gauge_data = (
    ih_analysis.group_by(gauge_group_by)
    .agg(
        pl.sum("Negatives").alias("Negatives"),
        pl.sum("Positives").alias("Positives"),
        pl.sum("Count").alias("Count"),
    )
    .with_columns(
        (pl.col("Positives") / (pl.col("Positives") + pl.col("Negatives"))).alias("CTR")
    )
    .with_columns(
        (
            (
                (pl.col("CTR") * (1 - pl.col("CTR")))
                / (pl.col("Positives") + pl.col("Negatives"))
            )
            ** 0.5
        ).alias("StdErr")
    )
    .sort(gauge_group_by, descending=False)
    .collect()
)

gauge_data = gauge_data.to_pandas()

# The grid has one column per channel and one row per experiment group.
# Each gauge is placed by its own channel/group position; placing by running
# index would mix channels within a row whenever the grid is not square.
col_of = {
    label: position
    for position, label in enumerate(gauge_data[gauge_group_by[0]].astype(str).unique(), start=1)
}
row_of = {
    label: position
    for position, label in enumerate(gauge_data[gauge_group_by[1]].astype(str).unique(), start=1)
}
cols = len(col_of)
rows = len(row_of)

gauge_data['Name'] = gauge_data[gauge_group_by].apply(lambda r: ' '.join(r.values.astype(str)), axis=1)
gauge_data['CName'] = gauge_data[gauge_group_by].apply(lambda r: '_'.join(r.values.astype(str)), axis=1)

fig = make_subplots(
    rows=rows,
    cols=cols,
    specs=[[{"type": "indicator"} for _ in range(cols)] for _ in range(rows)],
)
fig.update_layout(
    height=270 * rows,
    autosize=True,
    title='[ENG] Click-through rates (Channel/Model Type)',
    margin=dict(b=10, t=120, l=10, r=10),
)

for _, row in gauge_data.iterrows():
    ref_value = reference.get(row['CName'], None)
    gauge = {
        'axis': {'tickformat': ',.2%'},
        'threshold': {
            'line': {'color': "red", 'width': 2},
            'thickness': 0.75,
            'value': ref_value,
        },
    }
    # Recolour the bar only when a baseline exists and the rate is below it;
    # the darker colour marks rates under 75% of the baseline.
    if ref_value and row['CTR'] < ref_value:
        gauge['bar'] = {
            'color': '#EC5300' if row['CTR'] < (0.75 * ref_value) else '#EC9B00'
        }

    fig.add_trace(
        go.Indicator(
            mode="gauge+number+delta",
            number={'valueformat': ",.2%"},
            value=row['CTR'],
            delta={'reference': ref_value, 'valueformat': ",.2%"},
            title={'text': row['Name']},
            gauge=gauge,
        ),
        row=row_of[str(row[gauge_group_by[1]])],
        col=col_of[str(row[gauge_group_by[0]])],
    )
fig.show()
gauge_data

In [None]:
line_group_by = ["Day", "Channel", "ExperimentGroup"]

# Daily click-through rate per channel/experiment group. "CI" is 1.96 times
# the standard error sqrt(p * (1 - p) / n), used as the error-bar size of the
# 95% confidence interval in the bar chart.
line_data = (
    ih_analysis.group_by(line_group_by)
    .agg(
        pl.sum("Negatives").alias("Negatives"),
        pl.sum("Positives").alias("Positives"),
        pl.sum("Count").alias("Count"),
    )
    .with_columns(
        (pl.col("Positives") / (pl.col("Positives") + pl.col("Negatives"))).alias("CTR")
    )
    .with_columns(
        (
            (
                (
                    (pl.col("CTR") * (1 - pl.col("CTR")))
                    / (pl.col("Positives") + pl.col("Negatives"))
                )
                ** 0.5
            )
            * 1.96
        ).alias("CI")
    )
    .sort(line_group_by, descending=False)
    .collect()
)

line_data = line_data.to_pandas()

# Fewer than 30 distinct days: grouped bars with error bars (plus a Bar/Line
# toggle). Otherwise: a plain line chart.
if len(line_data["Day"].unique()) < 30:
    fig = px.bar(
        line_data,
        x="Day",
        y="CTR",
        color="ExperimentGroup",
        error_y='CI',
        facet_row="Channel",
        barmode="group",
        title="[ENG] Daily Click-through Rate with 95% confidence interval",
        custom_data=["ExperimentGroup"],
    )
    fig.update_layout(
        updatemenus=[
            dict(
                buttons=[
                    dict(args=["type", "bar"], label="Bar", method="restyle"),
                    dict(args=["type", "line"], label="Line", method="restyle"),
                ],
                direction="down",
                showactive=True,
            ),
        ]
    )
else:
    fig = px.line(
        line_data,
        x="Day",
        y="CTR",
        color="ExperimentGroup",
        title="[ENG] Daily Click-through Rate",
        # Was misspelled "acet_row", which made px.line raise a TypeError.
        facet_row="Channel",
        custom_data=["ExperimentGroup"],
    )

fig.update_xaxes(tickfont=dict(size=10))
# update_yaxes applies to every y-axis of the figure, including the facet rows,
# so no per-axis loop over private layout attributes is needed.
fig.update_yaxes(tickformat=',.2%')
height = max(640, 300 * len(line_data["Channel"].unique()))
fig.update_layout(
    xaxis_title="Day",
    yaxis_title="CTR",
    hovermode="x unified",
    height=height,
)
# Facet annotations read "Channel=<value>"; keep only the value. Taking the
# last piece leaves annotations without "=" untouched instead of failing.
fig.for_each_annotation(lambda a: a.update(text=a.text.split("=")[-1]))
fig = fig.update_traces(
    hovertemplate="Day" + ' : %{x}' + '<br>'
    + "Experiment Group" + ' : %{customdata[0]}' + '<br>'
    + "CTR" + ' : %{y:.2%}' + '<extra></extra>'
)

fig.show()
line_data