In [1]:
import pandas as pd
import numpy as np
from catnip.fla_redshift import FLA_Redshift
from sqlalchemy import null
from datetime import datetime, timedelta
import polars as pl

from prefect.blocks.system import Secret
from typing import Dict
from concurrent.futures import ThreadPoolExecutor

import xgboost as xgb
from sklearn.model_selection import train_test_split

In [2]:
def get_redshift_credentials() -> Dict:

    cred_dict = {
        "dbname": Secret.load("stellar-redshift-db-name").get(),
        "host": Secret.load("stellar-redshift-host").get(),
        "port": 5439,
        "user": Secret.load("stellar-redshift-user-name").get(),
        "password": Secret.load("stellar-redshift-password").get(),

        "aws_access_key_id": Secret.load("fla-s3-aws-access-key-id-east-1").get(),
        "aws_secret_access_key": Secret.load("fla-s3-aws-secret-access-key-east-1").get(),
        "bucket": Secret.load("fla-s3-bucket-name-east-1").get(),
        "subdirectory": "us-east-1",

        "verbose": False,
    }

    return cred_dict

with ThreadPoolExecutor(1) as pool:
    rs_creds = pool.submit(lambda: get_redshift_credentials()).result()

In [3]:
# Tables Used:

# - korepss_opportunities
# - cth_v_historical_ticket
# - cth_historical_attendance
# - ct_customer (& all extra tables)
# - flateamshop
# - location_ticket_type 
    # location_ticket_type_agg AS (
    # SELECT
    #     purchaser_ticketing_id,
    #     arena_level_internal
    # FROM
    #     (SELECT
    #          purchaser_ticketing_id,
    #          arena_level_internal,
    #          ROW_NUMBER() OVER (PARTITION BY purchaser_ticketing_id ORDER BY num_tickets DESC,
    #              CASE arena_level_internal
    #                  WHEN 'Premium' THEN 1
    #                  WHEN 'Lower' THEN 2
    #                  WHEN 'Club' THEN 3
    #                  ELSE 4
    #              END) AS rn
    #      FROM
    #          location_ticket_type)
    # WHERE rn = 1

In [4]:
q = """
SELECT DISTINCT
    purchaser_ticketing_id AS crm_id
FROM
    custom.cth_v_historical_ticket ticket
UNION
SELECT DISTINCT
    ticketing_account_scanned AS crm_id
FROM
    custom.cth_v_historical_attendance

"""

base_df = FLA_Redshift(**rs_creds).query_warehouse(sql_string=q)

In [3]:
q = """
SELECT DISTINCT
    vcon.primary_ticketing_id,
    activex.task_type_new__c,
    activ.completed_on_coalesce AS contact_date
FROM
    custom.korepss_v_activities activ
    LEFT JOIN custom.korepss_activities_extension activex
        ON activ.activityid = activex.activityid
    INNER JOIN custom.korepss_v_contacts vcon
        ON activ.contactid = vcon.sf_contactid
WHERE
    vcon.primary_ticketing_id IS NOT NULL
    AND activ.completed_on_coalesce IS NOT NULL
    AND status = 'Completed'
    AND completed_on_coalesce >= '2024-09-01'
    AND task_type_new__c != 'Internal Note'
ORDER BY
    vcon.primary_ticketing_id,
    activ.completed_on_coalesce
"""

base_activites_df = FLA_Redshift(**rs_creds).query_warehouse(sql_string=q)

q = """
SELECT DISTINCT
    purchaser_ticketing_id AS primary_ticketing_id,
    transaction_datetime AS purchase_date
FROM
    custom.cth_v_historical_ticket
WHERE
    transaction_datetime >= '2024-09-01'
ORDER BY
    purchaser_ticketing_id,
    transaction_datetime
"""

base_ticketing_df = FLA_Redshift(**rs_creds).query_warehouse(sql_string=q)

contacts = pl.from_pandas(base_activites_df).with_columns(
    pl.col("contact_date").cast(pl.Datetime)
).sort("contact_date")

purchases = pl.from_pandas(base_ticketing_df).with_columns(
    pl.col("purchase_date").cast(pl.Datetime)
).sort("purchase_date")

nudge_analysis = purchases.join_asof(
    contacts,
    left_on="purchase_date",
    right_on="contact_date",
    by="primary_ticketing_id",
    strategy="backward"
)

nudge_analysis = nudge_analysis.with_columns(
    (pl.col("purchase_date") - pl.col("contact_date")).dt.total_days().alias("days_since_nudge")
)

In [4]:
success_window = nudge_analysis.filter(
    (pl.col("days_since_nudge") >= 0) & (pl.col("days_since_nudge") <= 30)
)

failed_contacts = contacts.join(
    success_window.select("primary_ticketing_id", "contact_date"),
    on=["primary_ticketing_id", "contact_date"],
    how="anti"
)

successes = success_window.select([
    "primary_ticketing_id", 
    "contact_date", 
    pl.lit(1).alias("label")
])

failures = failed_contacts.select([
    "primary_ticketing_id", 
    "contact_date", 
    pl.lit(0).alias("label")
])

training_base = pl.concat([successes, failures])

In [26]:
training_base

primary_ticketing_id,contact_date,label
i64,datetime[Œºs],i32
20237174,2024-09-03 09:53:16,1
11173187,2024-09-03 09:36:14,1
25184481,2024-09-03 10:21:37,1
25791419,2024-09-03 12:29:39,1
25788947,2024-09-03 14:32:12,1
…,…,…
18771939,2026-02-25 10:28:26,0
2023916,2026-02-25 10:28:53,0
23146210,2026-02-25 10:29:12,0
722570,2026-02-25 10:29:24,0


In [5]:
def create_customer_snapshot(training_base, purchases, contacts, snapshot_date):
    # 1. Contact Features
    contact_features = (
        contacts.filter(pl.col("contact_date") < snapshot_date)
        .group_by("primary_ticketing_id")
        .agg([
            pl.count("contact_date").alias("total_lifetime_contacts"),
            (pl.lit(snapshot_date) - pl.col("contact_date").max()).dt.total_days().alias("days_since_last_contact"),
            (pl.col("task_type_new__c") == "Call").sum().alias("total_calls"),
            (pl.col("task_type_new__c") == "Email").sum().alias("total_emails")
        ])
    )
    
    snapshot_date_scalar = datetime(2025, 1, 1)
    sixty_days_ago = snapshot_date_scalar - timedelta(days=60)

    # 2. Ticketing Features
    ticketing_features = (
    purchases.filter(pl.col("purchase_date") < snapshot_date_scalar)
    .group_by("primary_ticketing_id")
    .agg([
        pl.count("purchase_date").alias("total_purchase_events"),
        
        # Calculate days since last purchase
        (pl.lit(snapshot_date_scalar) - pl.col("purchase_date").max())
            .dt.total_days()
            .alias("days_since_last_purchase"),
        
        # FIXED: Count purchases in the last 60 days
        pl.col("purchase_date")
            .filter(pl.col("purchase_date") >= sixty_days_ago)
            .count()
            .alias("purchases_last_60_days")
    ])
)

    # 3. Join & Clean
    final_df = (
        training_base.join(contact_features, on="primary_ticketing_id", how="left")
        .join(ticketing_features, on="primary_ticketing_id", how="left")
        .with_columns([
            pl.col("days_since_last_contact").fill_null(999),
            pl.col("days_since_last_purchase").fill_null(999)
        ])
        .fill_null(0) # Fills remaining counts/ratios with 0
    )

    return final_df

# Execute
snapshot_date_scalar = datetime(2025, 1, 1) # Example mid-season date
master_training_set = create_customer_snapshot(training_base, purchases, contacts, snapshot_date_scalar)

In [9]:
# Check the averages for Buyers vs Non-Buyers
print(master_training_set.group_by("label").agg([
    pl.col("days_since_last_purchase").mean().alias("avg_recency"),
    pl.col("total_lifetime_contacts").mean().alias("avg_outreach"),
    pl.col("purchases_last_60_days").mean().alias("velocity_check"),
    pl.col("purchases_last_60_days").count().alias("count")
]))

shape: (2, 5)
‚îå‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚î¨‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚î¨‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚î¨‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚î¨‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îê
‚îÇ label ‚îÜ avg_recency ‚îÜ avg_outreach ‚îÜ velocity_check ‚îÜ count  ‚îÇ
‚îÇ ---   ‚îÜ ---         ‚îÜ ---          ‚îÜ ---            ‚îÜ ---    ‚îÇ
‚îÇ i32   ‚îÜ f64         ‚îÜ f64          ‚îÜ f64            ‚îÜ u32    ‚îÇ
‚ïû‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ï™‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ï™‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ï™‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ï™‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ï°
‚îÇ 0     ‚îÜ 709.310335  ‚îÜ 5.301837     ‚îÜ 0.467548       ‚îÜ 300662 ‚îÇ
‚îÇ 1     ‚îÜ 390.175651  ‚îÜ 6.109028     ‚îÜ 2.995314       ‚îÜ 30515  ‚îÇ
‚îî‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚î¥‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚î¥‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚î¥‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚î¥‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îò


In [23]:
# 1. Prepare X and y
# Drop IDs and the Target column for training
features = master_training_set.drop(["primary_ticketing_id", "contact_date", "label"])
target = master_training_set["label"]

# 2. Split (Use 80/20)
X_train, X_test, y_train, y_test = train_test_split(
    features.to_pandas(), 
    target.to_pandas(), 
    test_size=0.2, 
    random_state=42
)

# 3. Handle Imbalance (The 'Quasar' Ratio)
# Calculate how many 0s there are for every 1
ratio = (target == 0).sum() / (target == 1).sum()

# 4. Initialize and Train
model = xgb.XGBClassifier(
    n_estimators=100,
    max_depth=4,
    learning_rate=0.1,
    scale_pos_weight=ratio, # This makes the model 'care' more about the rare buyers
    eval_metric="logloss"
)

model.fit(X_train, y_train)

In [24]:
today = datetime.now()

probs = model.predict_proba(X_train)[:, 1]

X_train['purchase_probability'] = probs

X_train

Unnamed: 0,total_lifetime_contacts,days_since_last_contact,total_calls,total_emails,total_purchase_events,days_since_last_purchase,purchases_last_60_days,purchase_probability
18965,2,21,0,0,0,999,0,0.375377
149623,0,999,0,0,0,999,0,0.345936
65929,3,36,0,0,0,999,0,0.368794
15286,10,11,0,1,2,11,2,0.660377
5154,20,8,0,17,3,18,3,0.565680
...,...,...,...,...,...,...,...,...
119879,1,50,0,0,1,47,1,0.537841
259178,0,999,0,0,0,999,0,0.345936
131932,2,14,0,0,0,999,0,0.361199
146867,0,999,0,0,0,999,0,0.345936


In [25]:
call_list = pl.from_pandas(X_train)

call_list = call_list.with_columns(
    pl.when(pl.col("purchase_probability") >= 0.8).then(pl.lit("üî• HOT"))
    .when(pl.col("purchase_probability") >= 0.5).then(pl.lit("‚ö° WARM"))
    .otherwise(pl.lit("üßä COLD"))
    .alias("priority_tier")
).sort("purchase_probability", descending=True)

call_list

total_lifetime_contacts,days_since_last_contact,total_calls,total_emails,total_purchase_events,days_since_last_purchase,purchases_last_60_days,purchase_probability,priority_tier
u32,i64,u32,u32,u32,i64,u32,f32,str
0,999,0,0,220,0,175,0.991276,"""üî• HOT"""
0,999,0,0,220,0,175,0.991276,"""üî• HOT"""
0,999,0,0,220,0,175,0.991276,"""üî• HOT"""
0,999,0,0,220,0,175,0.991276,"""üî• HOT"""
0,999,0,0,220,0,175,0.991276,"""üî• HOT"""
…,…,…,…,…,…,…,…,…
54,1,0,15,0,999,0,0.031364,"""üßä COLD"""
54,1,0,15,0,999,0,0.031364,"""üßä COLD"""
54,1,0,15,0,999,0,0.031364,"""üßä COLD"""
54,1,0,15,0,999,0,0.031364,"""üßä COLD"""


In [5]:
q = """
SELECT
    purchaser_ticketing_id AS crm_id,
    SUM(gross_revenue) AS total_hockey_spend,
    SUM(paid_seats) AS total_hockey_seats,
    SUM(CASE WHEN season = '2025-26' THEN gross_revenue END) AS hockey_spend_2526,
    SUM(CASE WHEN season = '2025-26' THEN paid_seats END) AS hockey_seats_2526,
    datediff(day,MAX(transaction_datetime),current_date) AS days_since_last_purchase,
    COUNT(DISTINCT transaction_datetime) AS num_hockey_transactions
FROM
    custom.cth_v_historical_ticket
GROUP BY
    purchaser_ticketing_id
"""

ticket_df = FLA_Redshift(**rs_creds).query_warehouse(sql_string=q)

In [6]:
df = base_df.merge(ticket_df, how = 'left', on= 'crm_id')

In [7]:
q = """
SELECT
    ticketing_account_scanned AS crm_id,
    COUNT(*) AS total_game_attendance,
    COUNT(DISTINCT event_datetime) AS num_games_attended,
    COUNT(CASE WHEN season = '2025-26' THEN event_datetime END) AS hockey_tickets_attendance_2526,
    COUNT(DISTINCT CASE WHEN season = '2025-26' THEN event_datetime END) AS num_games_attendance_2526,
    datediff(day,MAX(event_datetime),current_date) AS days_since_last_attendance
FROM
    custom.cth_v_historical_attendance
GROUP BY
    ticketing_account_scanned
"""

attendance_df = FLA_Redshift(**rs_creds).query_warehouse(sql_string=q)

In [8]:
df = df.merge(attendance_df, how = 'left', on= 'crm_id')

In [9]:
q = """
SELECT
    purchaser_ticketing_id AS crm_id,
    LISTAGG(DISTINCT ticket_type, ', ') AS all_plan_types,
    LISTAGG(DISTINCT CASE WHEN season = '2025-26' THEN ticket_type END, ', ') AS plan_types_2526
FROM
    custom.cth_v_historical_plans
GROUP BY
    purchaser_ticketing_id
"""

plans_df = FLA_Redshift(**rs_creds).query_warehouse(sql_string=q)

In [10]:
df = df.merge(plans_df, how = 'left', on= 'crm_id')

In [11]:
q = """
SELECT DISTINCT
    clients.crm_id,
    email,
    CASE WHEN addresses.is_local = TRUE THEN 1 ELSE 0 END AS is_local
FROM
    custom.seatgeek_v_clients clients
INNER JOIN
    custom.golden_record_v_addresses addresses USING (email) 
"""

is_local_df = FLA_Redshift(**rs_creds).query_warehouse(sql_string=q)

In [12]:
df = df.merge(is_local_df, how = 'left', on= 'crm_id')

In [13]:
q = """
SELECT
    tradable_bits_activities.email,
    crm_id,
    COUNT(tradable_bits_activities.*) AS num_online_activities,
    COUNT(CASE WHEN creation_date >= '2025-07-01' THEN 1 END) AS num_online_activities_last_fiscal
FROM
    custom.tradable_bits_activities
LEFT JOIN
    custom.seatgeek_v_clients ON tradable_bits_activities.email = seatgeek_v_clients.email
GROUP BY
    tradable_bits_activities.email,
    crm_id
"""

tradable_bits_df = FLA_Redshift(**rs_creds).query_warehouse(sql_string=q)

In [14]:
df = df.merge(tradable_bits_df, how = 'left', on= 'crm_id')

In [15]:
q = """
SELECT
    crm_id,
    count(*) AS num_recieved,
    sum(CASE WHEN event = 'Opened' AND is_true_false THEN 1 ELSE 0 END) AS num_opened,
    sum(CASE WHEN event = 'Clicked' AND is_true_false THEN 1 ELSE 0 END) AS num_clicked,
    sum(CASE WHEN event = 'Bounced' AND is_true_false THEN 1 ELSE 0 END) AS num_bounced,
    sum(CASE WHEN event = 'Unsubscribed' AND is_true_false THEN 1 ELSE 0 END) AS num_unsubscribed,
    CASE WHEN max(date_unsubscribed_panthers) IS NOT NULL THEN True ELSE False END AS is_unsubscribed
FROM
    custom.sfmc_v_sent_reporting
LEFT JOIN
    custom.sfmc_v_subscribers ON sfmc_v_sent_reporting.subscriberkey = sfmc_v_subscribers.subscriber_key
LEFT JOIN
    custom.seatgeek_v_clients ON sfmc_v_subscribers.email = seatgeek_v_clients.email
WHERE
    sent_date >= '2025-07-01'
GROUP BY
    crm_id
"""

emails_df = FLA_Redshift(**rs_creds).query_warehouse(sql_string=q)

In [16]:
df = df.merge(emails_df, how = 'left', on= 'crm_id')

In [19]:
cols_999 = ['days_since_last_purchase', 'days_since_last_attendance']

df[cols_999] = df[cols_999].fillna(9999)

df = df.fillna(0)

df

Unnamed: 0,crm_id,total_hockey_spend,total_hockey_seats,hockey_spend_2526,hockey_seats_2526,days_since_last_purchase,num_hockey_transactions,total_game_attendance,num_games_attended,hockey_tickets_attendance_2526,...,is_local,email_y,num_online_activities,num_online_activities_last_fiscal,num_recieved,num_opened,num_clicked,num_bounced,num_unsubscribed,is_unsubscribed
0,22592444.0,1.381052e+05,792.0,30379.44,168.0,223.0,24.0,18.0,7.0,4.0,...,1.0,0,0.0,0.0,48.0,0.0,0.0,0.0,0.0,False
1,21487314.0,1.224700e+07,121161.0,0.00,0.0,601.0,28920.0,207.0,70.0,0.0,...,1.0,0,0.0,0.0,8.0,0.0,0.0,0.0,0.0,False
2,15582348.0,1.650000e+02,5.0,0.00,0.0,1199.0,1.0,9.0,2.0,0.0,...,1.0,0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0
3,21520918.0,4.728713e+04,464.0,6713.18,86.0,86.0,68.0,323.0,191.0,40.0,...,1.0,ericstreimer@yahoo.com,22.0,0.0,60.0,6.0,0.0,0.0,0.0,True
4,15633165.0,7.500000e+01,1.0,0.00,0.0,1186.0,1.0,3.0,2.0,0.0,...,0.0,0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
546666,26187982.0,0.000000e+00,0.0,0.00,0.0,9999.0,0.0,1.0,1.0,1.0,...,1.0,0,0.0,0.0,8.0,0.0,2.0,0.0,0.0,False
546667,26188737.0,0.000000e+00,0.0,0.00,0.0,9999.0,0.0,1.0,1.0,1.0,...,1.0,0,0.0,0.0,8.0,1.0,0.0,0.0,0.0,False
546668,26069455.0,0.000000e+00,0.0,0.00,0.0,9999.0,0.0,1.0,1.0,1.0,...,1.0,0,0.0,0.0,36.0,5.0,0.0,0.0,0.0,False
546669,26188861.0,0.000000e+00,0.0,0.00,0.0,9999.0,0.0,1.0,1.0,1.0,...,0.0,0,0.0,0.0,8.0,1.0,0.0,0.0,0.0,False
