# Holley Click Bandit Model for Emails (Test Run)

Adapted from JCOM's Click Bandit Model for Holley (company_1950).

- A bandit-style model using a Normal-Inverse-Gamma (NIG) formulation
- Opens and clicks for each treatment are computed from historical observations
- Thompson Sampling with (user, treatment, date) hash-based seed for deterministic randomness
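
The hash-based seeding idea can be sketched in plain Python. This is only an illustration under stated assumptions: the notebook itself uses `tf_random.stateless_normal_batched`, whose internals are not shown here, and the helper names `seeded_standard_normal` and `thompson_score` below are hypothetical. The sketch hashes a `treatment:user@day` key to a seed, draws one N(0, 1) value from it, and forms a score as `mean + stddev * z`.

```python
import hashlib
import random


def seeded_standard_normal(treatment_id: str, user_id: str, day: int) -> float:
    """Return an N(0, 1) draw that is fixed for a given (treatment, user, day)."""
    key = f"{treatment_id}:{user_id}@{day}".encode("utf-8")
    # Hash the key to a 64-bit integer and use it to seed a private RNG
    # (hypothetical scheme; the real helper may hash differently).
    seed = int.from_bytes(hashlib.sha256(key).digest()[:8], "big")
    return random.Random(seed).gauss(0.0, 1.0)


def thompson_score(mean: float, stddev: float, z: float) -> float:
    """Score under a normal approximation of the posterior: mean + stddev * z."""
    return mean + stddev * z


# Same key gives the same draw; changing the day changes the key and the draw.
z_today = seeded_standard_normal("t1", "u42", 20000)
z_again = seeded_standard_normal("t1", "u42", 20000)
z_next_day = seeded_standard_normal("t1", "u42", 20001)
```

With this kind of seeding, a user sees a stable score for a treatment within a day, while the exploration noise is re-drawn when the day changes.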

**Key Differences from JCOM:**
- Dataset: `company_1950` (not `company_1925`)
- Interaction table: `treatment_interaction` (not `email_interactions_table`)
- Surface ID: `929` (not `817`)
- Data window: all available data within the last 60 days (campaign started Dec 4, 2025)

**This notebook is for testing/analysis only - no deployment.**

# Setup

In [None]:
!pip install keyring keyrings.google-artifactregistry-auth
!pip install -U auxia.prediction.colab --index-url https://asia-northeast1-python.pkg.dev/auxia-gcp/auxia-pip/simple/ --extra-index-url https://pypi.org/simple

In [None]:
from google.cloud import bigquery_storage
from google.cloud import bigquery
from google.colab import data_table
from google.api_core import exceptions
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import datetime
from IPython import get_ipython
ipython = get_ipython()

In [None]:
import tensorflow as tf

In [None]:
from auxia.prediction.colab import control_flow_magic
from auxia.prediction.colab.algorithms.stats_utils import *
from auxia.prediction.colab.algorithms.ml_utils import gen_binary_classification_reports
from google.cloud.bigquery import magics

In [None]:
from psutil import virtual_memory
ram_gb = virtual_memory().total / 1e9
print('Your runtime has {:.1f} gigabytes of total RAM\n'.format(ram_gb))

if ram_gb < 20:
  print('Not using a high-RAM runtime')
else:
  print('You are using a high-RAM runtime!')

# Configuration

Holley-specific settings

In [None]:
# Holley configuration
COMPANY_ID = "1950"
COMPANY_DATASET = f"company_{COMPANY_ID}"
SURFACE_ID = 929  # Holley's email surface

# Data window: look back up to 60 days; the campaign started Dec 4, so fewer days may exist.
# NOTE: the SQL cells below hardcode INTERVAL 60 DAY, so keep them in sync with this value.
DATA_WINDOW_DAYS = 60

print(f"Company: {COMPANY_ID}")
print(f"Dataset: {COMPANY_DATASET}")
print(f"Surface ID: {SURFACE_ID}")

# Data Load

In [None]:
current_user_name = !gcloud config get-value account
current_user_name = current_user_name[0].split('@')[0]
assert current_user_name
display(f'Detected user name {current_user_name}')

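Load treatment history and interaction data, aggregated to opens (`n`) and clicks per treatment.

**Note**: Holley uses the `treatment_interaction` table (not `email_interactions_table` as in JCOM). The query below does not filter on `SURFACE_ID`; it groups by `surface_id` instead.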

In [None]:
%%bigquery df --project auxia-reporting

WITH
th as
(
  SELECT user_id, treatment_id, treatment_tracking_id, treatment_sent_timestamp, arm_id, surface_id, rank
  FROM `auxia-gcp.company_1950.treatment_history_sent`
  WHERE DATE(treatment_sent_timestamp) BETWEEN DATE_SUB(CURRENT_DATE(), INTERVAL 60 DAY) AND DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY)
  AND request_source = "LIVE"
),

-- Holley uses treatment_interaction (not email_interactions_table)
views as
(
  SELECT treatment_tracking_id, interaction_timestamp_micros
  FROM `auxia-gcp.company_1950.treatment_interaction`
  WHERE DATE(interaction_timestamp_micros) BETWEEN DATE_SUB(CURRENT_DATE(), INTERVAL 60 DAY) AND DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY)
  AND interaction_type = "VIEWED"
),

clicks as
(
  SELECT DISTINCT treatment_tracking_id
  FROM `auxia-gcp.company_1950.treatment_interaction`
  WHERE DATE(interaction_timestamp_micros) BETWEEN DATE_SUB(CURRENT_DATE(), INTERVAL 60 DAY) AND DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY)
  AND interaction_type = "CLICKED"
)

SELECT th.treatment_id, th.surface_id, count(distinct views.treatment_tracking_id) as n,
count(distinct clicks.treatment_tracking_id) as clicks
FROM
th
JOIN views
ON th.treatment_tracking_id = views.treatment_tracking_id
LEFT OUTER JOIN clicks
ON th.treatment_tracking_id = clicks.treatment_tracking_id
GROUP BY ALL
;

In [None]:
df["ctr"] = df["clicks"]/df["n"]

In [None]:
print(f"Number of treatments: {len(df)}")
print(f"Total opens: {df['n'].sum()}")
print(f"Total clicks: {df['clicks'].sum()}")
print(f"Overall CTR: {df['clicks'].sum() / df['n'].sum():.2%}")
print(f"\nCTR Statistics:")
df["ctr"].describe()

In [None]:
print("Top 10 treatments by CTR:")
df.sort_values("ctr", ascending=False).head(10)

In [None]:
print("All treatments sorted by volume:")
df.sort_values("n", ascending=False)

In [None]:
# Check data volume per treatment
print("Treatments by open volume:")
print(f"  >1000 opens: {len(df[df['n'] > 1000])} treatments")
print(f"  100-1000 opens: {len(df[(df['n'] >= 100) & (df['n'] <= 1000)])} treatments")
print(f"  <100 opens: {len(df[df['n'] < 100])} treatments (high variance)")
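
To make the "high variance" label concrete, the binomial standard error of an observed CTR can be computed per volume bucket. This is a minimal sketch with made-up counts; the helper name `ctr_standard_error` is hypothetical and is not part of the notebook's libraries.

```python
import math


def ctr_standard_error(clicks: int, opens: int) -> float:
    """Binomial standard error of an observed CTR: sqrt(p * (1 - p) / n)."""
    if opens <= 0:
        raise ValueError("opens must be positive")
    p = clicks / opens
    return math.sqrt(p * (1.0 - p) / opens)


# Illustrative counts only: the same 10% CTR observed at three volumes.
for opens in (50, 500, 5000):
    clicks = opens // 10
    se = ctr_standard_error(clicks, opens)
    print(f"opens={opens:>5}  ctr={clicks / opens:.2%}  standard error={se:.4f}")
```

The standard error shrinks with the square root of the number of opens, so a CTR measured on fewer than 100 opens is far less certain than one measured on thousands.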

# NIG Implementation

- Posterior update rules applied to the per-treatment counts returned by the query above
- Thompson Sampling to generate sampled rewards
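
For orientation, the textbook Normal-Inverse-Gamma conjugate update can be sketched from aggregate counts. This is not the implementation of `NormalInverseGammaClickBandit`, whose internals and priors are not shown in this notebook; the names `NIGPosterior` and `nig_update` and the prior values `mu0`, `lam0`, `alpha0`, `beta0` are assumptions chosen for illustration.

```python
from dataclasses import dataclass


@dataclass
class NIGPosterior:
    mu: float     # posterior location of the mean reward
    lam: float    # pseudo-count attached to the mean
    alpha: float  # inverse-gamma shape
    beta: float   # inverse-gamma scale

    def variance_of_mean(self) -> float:
        # Variance of the marginal Student-t on the mean; finite only if alpha > 1.
        if self.alpha <= 1.0:
            return float("inf")
        return self.beta / ((self.alpha - 1.0) * self.lam)


def nig_update(views: int, clicks: int,
               mu0: float = 0.0, lam0: float = 1.0,
               alpha0: float = 1.0, beta0: float = 1.0) -> NIGPosterior:
    """Standard NIG update for `views` 0/1 outcomes containing `clicks` ones."""
    n = views
    if n == 0:
        return NIGPosterior(mu0, lam0, alpha0, beta0)  # no data: prior unchanged
    xbar = clicks / n
    # For 0/1 outcomes the sum of squared deviations equals n * xbar * (1 - xbar).
    ss = n * xbar * (1.0 - xbar)
    lam_n = lam0 + n
    mu_n = (lam0 * mu0 + n * xbar) / lam_n
    alpha_n = alpha0 + n / 2.0
    beta_n = beta0 + 0.5 * ss + (lam0 * n * (xbar - mu0) ** 2) / (2.0 * lam_n)
    return NIGPosterior(mu_n, lam_n, alpha_n, beta_n)


posterior = nig_update(views=100, clicks=10)
```

Under these assumed priors, a treatment with no data keeps `alpha = 1` and has a non-finite variance, which is one way the non-finite case guarded against later in the notebook could arise.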

In [None]:
assert df["ctr"].max() <= 1, "CTR should not exceed 1"

In [None]:
from auxia.prediction.colab.algorithms.bandits import NormalInverseGammaClickBandit
from auxia.prediction.colab import tensorflow_model
from auxia.prediction.colab.tensorflow import tf_random

In [None]:
click_bandit = NormalInverseGammaClickBandit(df, treatment_id_col="treatment_id", click_col="clicks", view_col="n")
click_bandit.update()
print(f"Bandit updated with {len(click_bandit.treatments)} treatments")

In [None]:
std_scaling = 1
treatment_ids = tf.constant([str(t) for t in click_bandit.treatments])
mean_vector = tf.cast(tf.constant([click_bandit.get_posterior_mean(t) for t in click_bandit.treatments]), tf.float32)
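# Posterior stddev per treatment; fall back to 0.1 when the variance is not finite
posterior_variances = [click_bandit.get_posterior_variance(t) for t in click_bandit.treatments]
stddev_vector = tf.cast(
    tf.constant([(v**0.5) * std_scaling if np.isfinite(v) else 0.1 for v in posterior_variances]),
    tf.float32)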

In [None]:
assert tf.reduce_min(stddev_vector).numpy() > 0, "All stddev values must be positive"

In [None]:
print("Posterior Statistics:")
print(f"  Mean CTR range: {tf.reduce_min(mean_vector).numpy():.4f} - {tf.reduce_max(mean_vector).numpy():.4f}")
print(f"  Stddev range: {tf.reduce_min(stddev_vector).numpy():.6f} - {tf.reduce_max(stddev_vector).numpy():.6f}")

In [None]:
# Show posterior parameters for each treatment
posterior_df = pd.DataFrame({
    'treatment_id': [str(t) for t in click_bandit.treatments],
    'posterior_mean': mean_vector.numpy(),
    'posterior_stddev': stddev_vector.numpy()
})
posterior_df = posterior_df.merge(df[['treatment_id', 'n', 'clicks', 'ctr']].astype({'treatment_id': str}), on='treatment_id')
posterior_df.sort_values('posterior_mean', ascending=False)

In [None]:
fig, axes = plt.subplots(1, 2, figsize=(14, 5))

axes[0].hist(mean_vector.numpy(), bins=20, edgecolor='black')
axes[0].set_title("Posterior Mean CTR Distribution")
axes[0].set_xlabel("Mean CTR")
axes[0].set_ylabel("Count")

axes[1].hist(stddev_vector.numpy(), bins=20, edgecolor='black')
axes[1].set_title("Posterior Stddev Distribution")
axes[1].set_xlabel("Stddev")
axes[1].set_ylabel("Count")

plt.tight_layout()
plt.show()

# Create Lookup Tables

In [None]:
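# Create dictionary-like lookup tables for each value tensor.
# Defaults are returned for treatment IDs that are missing from the tables.
default_mean = 0.0  # float, to match the float32 value tensors
default_stddev = 0.05  # Higher stddev for unknown treatments = more exploration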

lookup_mean = tf.lookup.StaticHashTable(
    tf.lookup.KeyValueTensorInitializer(treatment_ids, mean_vector), default_value=default_mean)
lookup_stddev = tf.lookup.StaticHashTable(
    tf.lookup.KeyValueTensorInitializer(treatment_ids, stddev_vector), default_value=default_stddev)

print(f"Lookup tables created with {len(treatment_ids)} treatments")
print(f"Default mean (unknown treatments): {default_mean}")
print(f"Default stddev (unknown treatments): {default_stddev}")

# Offline Metrics

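Evaluate the model on recent sends (those after the cutoff date, three days back). These rows fall inside the 60-day window used to fit the bandit, so the metrics below are in-sample and likely optimistic.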

In [None]:
# Use recent data for validation
validation_date = datetime.datetime.now() - datetime.timedelta(days=3)
validation_date = validation_date.strftime("%Y-%m-%d")
print(f"Validation cutoff date: {validation_date}")

In [None]:
%%substitute %DATE_DASHES%=validation_date
%%bigquery df_metrics --project auxia-reporting

WITH
th as
(
  SELECT user_id, treatment_id, treatment_tracking_id, treatment_sent_timestamp, arm_id, surface_id
  FROM `auxia-gcp.company_1950.treatment_history_sent`
  WHERE TIMESTAMP_TRUNC(treatment_sent_timestamp, DAY) > TIMESTAMP("%DATE_DASHES%")
  AND request_source = "LIVE"
),

views as
(
  SELECT treatment_tracking_id, interaction_timestamp_micros
  FROM `auxia-gcp.company_1950.treatment_interaction`
  WHERE TIMESTAMP_TRUNC(interaction_timestamp_micros, DAY) > TIMESTAMP("%DATE_DASHES%")
  AND interaction_type = "VIEWED"
),

clicks as
(
  SELECT DISTINCT treatment_tracking_id
  FROM `auxia-gcp.company_1950.treatment_interaction`
  WHERE TIMESTAMP_TRUNC(interaction_timestamp_micros, DAY) > TIMESTAMP("%DATE_DASHES%")
  AND interaction_type = "CLICKED"
)

SELECT th.*, CASE WHEN clicks.treatment_tracking_id IS NOT NULL THEN 1 ELSE 0 END AS clicked
FROM
th
JOIN views
ON th.treatment_tracking_id = views.treatment_tracking_id
LEFT OUTER JOIN clicks
ON th.treatment_tracking_id = clicks.treatment_tracking_id
;

In [None]:
print(f"Validation set size: {len(df_metrics)}")
print(f"\nClick distribution:")
print(df_metrics.clicked.value_counts())
print(f"\nValidation CTR: {df_metrics.clicked.mean():.2%}")

In [None]:
df_metrics.head()

In [None]:
treatment_tensor = tf.constant(df_metrics.treatment_id.astype("str"))
user_tensor = tf.constant(df_metrics.user_id.astype("str"))

In [None]:
# Create hash seed: treatment:user@day, where day is the current epoch day from tf.timestamp(), not the send date
user_treatment_concated = treatment_tensor + ':' + user_tensor + '@' + tf.strings.as_string(tf.math.floordiv(tf.timestamp(), 86400))

In [None]:
# Generate random samples from N(0,1); despite the variable name, these are not probabilities
probabilities = tf_random.stateless_normal_batched(user_treatment_concated, 1)
probabilities = tf.cast(probabilities, tf.float32)

In [None]:
# Compute Thompson Sampling scores: mean + stddev * N(0,1)
val_scores = tf.add(
    tf.multiply(probabilities, tf.expand_dims(lookup_stddev.lookup(treatment_tensor), 1)), 
    tf.expand_dims(lookup_mean.lookup(treatment_tensor), 1)
)

In [None]:
fig, axes = plt.subplots(1, 2, figsize=(14, 5))

axes[0].hist(probabilities.numpy().flatten(), bins=50, edgecolor='black')
axes[0].set_title("Random Samples N(0,1)")
axes[0].set_xlabel("Value")

axes[1].hist(val_scores.numpy().flatten(), bins=50, edgecolor='black')
axes[1].set_title("Thompson Sampling Scores (mean + stddev * N(0,1))")
axes[1].set_xlabel("Score")

plt.tight_layout()
plt.show()

In [None]:
print("Binary Classification Report (AUC, etc.):")
_ = gen_binary_classification_reports(df_metrics.clicked.values, val_scores.numpy().flatten())
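
As a reference for reading the AUC figure, the metric can be computed directly from ranks. This sketch is not the implementation of `gen_binary_classification_reports`, which is not shown in this notebook; the function name `roc_auc` and the sample inputs are hypothetical.

```python
def roc_auc(labels, scores):
    """Rank-based AUC: P(random positive outscores random negative), ties count 0.5."""
    pairs = sorted(zip(scores, labels))
    n = len(pairs)
    n_pos = sum(1 for _, y in pairs if y == 1)
    n_neg = n - n_pos
    if n_pos == 0 or n_neg == 0:
        raise ValueError("AUC needs at least one positive and one negative label")
    rank_sum_pos = 0.0
    i = 0
    while i < n:
        # Find the run of tied scores starting at position i.
        j = i
        while j < n and pairs[j][0] == pairs[i][0]:
            j += 1
        avg_rank = (i + 1 + j) / 2.0  # mean of the 1-based ranks i+1 .. j
        rank_sum_pos += avg_rank * sum(1 for k in range(i, j) if pairs[k][1] == 1)
        i = j
    # Mann-Whitney U statistic for the positives, normalised to [0, 1].
    return (rank_sum_pos - n_pos * (n_pos + 1) / 2.0) / (n_pos * n_neg)


example_auc = roc_auc([0, 0, 1, 1], [0.1, 0.4, 0.35, 0.8])
```

Because Thompson Sampling scores include deliberate noise and depend only on the treatment and the seed, an AUC close to 0.5 here would not by itself indicate a bug.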

# Model Testing

Test the model with simulated users to see the resulting treatment selection distribution.
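
The idea behind this test can be sketched without TensorFlow: draw one score per treatment for each simulated user, pick the highest, and tally the selections. The posterior values below are invented for illustration, the function name `simulate_selection` is hypothetical, and the sketch uses a single seeded RNG rather than the per-user hash seed used by the model.

```python
import random
from collections import Counter


def simulate_selection(posteriors, n_users=10_000, seed=0):
    """Return each treatment's selection share under per-user Thompson draws.

    posteriors maps treatment_id -> (posterior_mean, posterior_stddev).
    """
    rng = random.Random(seed)
    counts = Counter()
    for _ in range(n_users):
        # One normal draw per treatment; the user gets the highest-scoring one.
        scores = {t: rng.gauss(mean, std) for t, (mean, std) in posteriors.items()}
        counts[max(scores, key=scores.get)] += 1
    return {t: counts[t] / n_users for t in posteriors}


# Invented posteriors: A and B are well measured, C is uncertain.
shares = simulate_selection({
    "A": (0.10, 0.01),
    "B": (0.08, 0.01),
    "C": (0.05, 0.05),
})
print(shares)
```

In a setup like this, the treatment with the highest mean takes most of the traffic, while a treatment with a wide posterior still wins a share of users, which is the exploration behaviour the selection plot is meant to reveal.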

In [None]:
treatment_df = df[["treatment_id", "surface_id"]].astype("str").drop_duplicates()
print(f"Unique treatments: {len(treatment_df)}")
treatment_df

In [None]:
class BanditClickModel(tf.Module):
  """Thompson Sampling bandit model for email treatment selection."""
  
  model_metadata = {
      'output': {'supported_output_types': ['PER_USER_TREATMENT_SCORE']},
      'tensorflow_contract': {
        'inputs': {
            'tensors': [
                {
                    'name': 'treatment_features',
                    'single_tensor_type': 'CATEGORICAL_STRING_TYPE',
                    'feature_source': 'TREATMENT_FEATURES',
                    'feature_names': ['treatment_id'],
                },
                {
                    'name': 'user_features',
                    'single_tensor_type': 'CATEGORICAL_STRING_TYPE',
                    'feature_source': 'USER_FEATURES',
                    'feature_names': ['user_id'],
                },
            ]
        },
    },
  }

  def __init__(self, lookup_mean, lookup_stddev):
    self.lookup_mean = lookup_mean
    self.lookup_stddev = lookup_stddev

  @tf.function(input_signature=(
      tf.TensorSpec(dtype=tf.string, shape=(None, None, 1)),
      tf.TensorSpec(dtype=tf.int64, shape=(None,)),
      tf.TensorSpec(dtype=tf.string, shape=(None, 1)),
  ))
  def __call__(self, treatment_features, treatment_count, user_features):
    treatment_indices = tf.where(tf.sequence_mask(treatment_count))
    user_ids = tf.squeeze(user_features, axis=-1)

    user_treatment_counts = treatment_count
    user_ids_cart = tf.repeat(user_ids, user_treatment_counts, axis=0)
    treatment_ids_cart = tf.gather_nd(
        tf.squeeze(treatment_features, axis=-1),
        treatment_indices)

    scoring_means = self.lookup_mean.lookup(treatment_ids_cart)
    scoring_stddevs = self.lookup_stddev.lookup(treatment_ids_cart)

    # Hash seed: treatment:user@day (deterministic per user-treatment-day)
    user_treatment_concated = treatment_ids_cart + ':' + user_ids_cart + '@' + tf.strings.as_string(tf.math.floordiv(tf.timestamp(), 86400))

    # Thompson Sampling: score = mean + stddev * N(0,1)
    probabilities = tf.reshape(tf_random.stateless_normal_batched(user_treatment_concated, 1), [-1])
    probabilities = tf.cast(probabilities, tf.float32)
    probabilities = tf.add(tf.multiply(probabilities, scoring_stddevs), scoring_means)

    return tf.scatter_nd(treatment_indices, probabilities, tf.cast(tf.shape(treatment_features)[:2], tf.int64))

In [None]:
# Build and validate model with 10K simulated users
print("Building model with 10,000 simulated users...")
model, y = tensorflow_model.build_validated_model(
    BanditClickModel(lookup_mean, lookup_stddev), 
    user_features=pd.DataFrame([{'user_id': str(k)} for k in range(10000)]), 
    treatment_features=treatment_df
)
print("Model built successfully!")

In [None]:
print("Sample scores (first 5 users x all treatments):")
y.head()

In [None]:
# Which treatment would be selected for each user?
selections = y.idxmax(axis=1).value_counts()

plt.figure(figsize=(16, 6))
selections.plot(kind="bar")
plt.title("Treatment Selection Distribution (10K users)")
plt.xlabel("Treatment ID")
plt.ylabel("Users Selected")
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()

print(f"\nSelection counts:")
print(selections)

In [None]:
# Compare selection % with actual CTR
selection_pct = (selections / selections.sum() * 100).reset_index()
selection_pct.columns = ['treatment_id', 'selection_pct']

comparison = selection_pct.merge(
    df[['treatment_id', 'n', 'clicks', 'ctr']].astype({'treatment_id': str}), 
    on='treatment_id'
).sort_values('selection_pct', ascending=False)

print("Selection % vs Actual CTR:")
comparison

# Summary

This notebook tested the NIG Thompson Sampling bandit model for Holley email treatments.

**Key findings:**
- Posterior mean and stddev were computed for each treatment
- Scores trade off exploitation (high posterior mean) against exploration (high posterior stddev)
- In the 10K-user simulation, treatments with higher CTR get selected more often

**Next steps (when ready for production):**
1. Accumulate more data (currently ~5 days)
2. Deploy to TF Serving
3. Monitor CTR improvement over time