In [2]:
import duckdb
import pandas as pd
import numpy as np
from pathlib import Path

pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', 400)

In [5]:
import duckdb
import numpy as np
from datetime import datetime

print("Loading your real PCB/Criteo MTA dataset directly into DuckDB...")

# 1. One single command: read the entire TSV straight into DuckDB (no pandas at all)
con = duckdb.connect('mta_subs.duckdb')

con.execute("""
CREATE OR REPLACE TABLE raw_events AS
SELECT 
    *,
    -- Fix timestamps in SQL (DuckDB is faster than pandas here)
    to_timestamp(timestamp)                     AS timestamp,
    CASE WHEN conversion_timestamp = -1 THEN NULL 
         ELSE to_timestamp(conversion_timestamp) 
    END                                         AS conversion_timestamp,
    conversion::INT                             AS conversion
FROM read_csv(
    'data/pcb_dataset_final.tsv',
    delim='\t',
    sample_size=500_000,
    header=true,
    max_line_size=10000000,
    strict_mode=false
)
""")

print(f"Loaded {con.execute('SELECT COUNT(*) FROM raw_events').fetchone()[0]:,} rows in DuckDB")

# 2. Now do ALL transformations inside DuckDB — fast, memory-efficient, reusable
con.execute("""
CREATE OR REPLACE TABLE fact_impressions AS
SELECT 
    *,
    -- Revenue + plan logic in pure SQL (set-based = 10-50× faster than pandas loc/mask)
    CASE 
        WHEN conversion = 1 THEN 
            CASE WHEN random() < 0.7 THEN 12.99 ELSE 119.99 END
        ELSE 0.0 
    END AS revenue,
    
    CASE 
        WHEN conversion = 1 AND random() < 0.7 THEN 'monthly'
        WHEN conversion = 1 THEN 'yearly'
        ELSE NULL 
    END AS plan
    
FROM raw_events
""")

# Quick sanity checks
print("\nRevenue distribution:")
con.execute("SELECT revenue, plan, COUNT(*) FROM fact_impressions WHERE conversion=1 GROUP BY ALL ORDER BY revenue").df()

print("\nSample converting users:")
con.sql("SELECT * FROM fact_impressions WHERE conversion=1 LIMIT 10").show()

print("\nDone — everything lives in mta_subs.duckdb and loads instantly next time!")

Loading your real PCB/Criteo MTA dataset directly into DuckDB...


InvalidInputException: Invalid Input Error: Error when sniffing file "data/pcb_dataset_final.tsv".
It was not possible to automatically detect the CSV parsing dialect
The search space used was:
Delimiter Candidates: '	'
Quote/Escape Candidates: ['(no quote)','(no escape)'],['"','(no escape)'],['"','"'],['"','''],['"','\'],[''','(no escape)'],[''','''],[''','"'],[''','\']
Comment Candidates: '\0', '#'
Encoding: utf-8
Possible fixes:
* Disable the parser's strict mode (strict_mode=false) to allow reading rows that do not comply with the CSV standard.
* Columns are set as: "columns = { 'timestamp' : 'BIGINT', 'conversion_timestamp' : 'BIGINT', 'conversion' : 'INTEGER', 'user_id' : 'VARCHAR', 'campaign' : 'VARCHAR', 'channel' : 'VARCHAR'}", and they contain: 6 columns. It does not match the number of columns found by the sniffer: 22. Verify the columns parameter is correctly set.
* Make sure you are using the correct file encoding. If not, set it (e.g., encoding = 'utf-16').
* Delimiter is set to '	'. Consider unsetting it.
* Set quote (e.g., quote='"')
* Set escape (e.g., escape='"')
* Set comment (e.g., comment='#')
* Set skip (skip=${n}) to skip ${n} lines at the top of the file
* Enable ignore errors (ignore_errors=true) to ignore potential errors
* Enable null padding (null_padding=true) to pad missing columns with NULL values
* Check you are using the correct file compression, otherwise set it (e.g., compression = 'zstd')
* Be sure that the maximum line size is set to an appropriate value, otherwise set it (e.g., max_line_size=10000000)


LINE 11: FROM read_csv(
              ^

In [None]:
print("Loading your real PCB/Criteo MTA dataset...")

con = duckdb.connect('mta_subs.duckdb')

# Load your actual file
df = pd.read_csv(
    "data/pcb_dataset_final.tsv",
    sep='\t',
    nrows=500_000,        # Fast on your laptop, still 500k real journeys
    low_memory=False
)

# Fix timestamp (it's Unix seconds)
df['timestamp'] = pd.to_datetime(df['timestamp'], unit='s')
df['conversion_timestamp'] = pd.to_datetime(df['conversion_timestamp'].replace(-1, None), unit='s')
df['conversion'] = df['conversion'].astype(int)

print(df['conversion'].sum())

# Add revenue column: only for rows where conversion == 1 assign 12.99 or 119.99 at random
df['revenue'] = 0.0
mask = df['conversion'] == 1
df.loc[mask, 'revenue'] = np.random.choice([12.99, 119.99], size=mask.sum())

print("Data type of df['revenue']:", df['revenue'].dtype)
print("\nUnique values and their counts:")
print(df['revenue'].value_counts().sort_index())

# Add plan column: 'monthly' for 12.99, 'yearly' for 119.99
df['plan'] = np.where(df['revenue'] == 12.99, 'monthly', 
                      np.where(df['revenue'] == 119.99, 'yearly', ''))
display(df[df['conversion'] == 1].head(10))

display(df.head())


# Create fact impression table
con.execute("CREATE OR REPLACE TABLE fact_impressions AS SELECT * FROM df")
print('done1')


Loading your real PCB/Criteo MTA dataset directly into DuckDB...


InvalidInputException: Invalid Input Error: Error when sniffing file "data/pcb_dataset_final.tsv".
It was not possible to automatically detect the CSV parsing dialect
The search space used was:
Delimiter Candidates: '	'
Quote/Escape Candidates: ['(no quote)','(no escape)'],['"','(no escape)'],['"','"'],['"','''],['"','\'],[''','(no escape)'],[''','''],[''','"'],[''','\']
Comment Candidates: '\0', '#'
Encoding: utf-8
Possible fixes:
* Disable the parser's strict mode (strict_mode=false) to allow reading rows that do not comply with the CSV standard.
* Columns are set as: "columns = { 'timestamp' : 'BIGINT', 'conversion_timestamp' : 'BIGINT', 'conversion' : 'INTEGER', 'user_id' : 'VARCHAR', 'campaign' : 'VARCHAR', 'channel' : 'VARCHAR'}", and they contain: 6 columns. It does not match the number of columns found by the sniffer: 22. Verify the columns parameter is correctly set.
* Make sure you are using the correct file encoding. If not, set it (e.g., encoding = 'utf-16').
* Delimiter is set to '	'. Consider unsetting it.
* Set quote (e.g., quote='"')
* Set escape (e.g., escape='"')
* Set comment (e.g., comment='#')
* Set skip (skip=${n}) to skip ${n} lines at the top of the file
* Enable ignore errors (ignore_errors=true) to ignore potential errors
* Enable null padding (null_padding=true) to pad missing columns with NULL values
* Check you are using the correct file compression, otherwise set it (e.g., compression = 'zstd')
* Be sure that the maximum line size is set to an appropriate value, otherwise set it (e.g., max_line_size=10000000)


LINE 11: FROM read_csv(
              ^

In [21]:
# Group by 'campaign', count the number of unique 'uid's in each group, and sort descending
campaign_counts = df.groupby('campaign')['uid'].nunique().reset_index(name='unique_uid_count')
campaign_counts_sorted = campaign_counts.sort_values('unique_uid_count', ascending=False)
campaign_counts_sorted['per'] = campaign_counts_sorted['unique_uid_count'] / campaign_counts_sorted['unique_uid_count'].sum()
display(campaign_counts_sorted.tail(5))


Unnamed: 0,campaign,unique_uid_count,per
639,31427832,4,1e-05
274,13442441,4,1e-05
239,12042373,3,7e-06
44,2716294,2,5e-06
242,12100671,1,2e-06


In [23]:
print(df.columns)

Index(['timestamp', 'uid', 'campaign', 'conversion', 'conversion_timestamp',
       'conversion_id', 'attribution', 'click', 'click_pos', 'click_nb',
       'cost', 'cpo', 'time_since_last_click', 'cat1', 'cat2', 'cat3', 'cat4',
       'cat5', 'cat6', 'cat7', 'cat8', 'cat9', 'revenue', 'plan'],
      dtype='object')


# EXPLORE DATA

In [25]:
print("3. Acquisition Channel Performance (Sorted by CVR)")
channels = con.execute("""
    SELECT 
        campaign,
        COUNT(*) AS volume,
        ROUND(100.0 * COUNT(*) / (SELECT COUNT(*) FROM fact_impressions), 1) AS pct_of_total,
        SUM(CASE WHEN conversion THEN 1 ELSE 0 END) AS conversions,
        ROUND(100.0 * SUM(CASE WHEN conversion THEN 1.0 ELSE 0 END) / COUNT(*), 3) AS cvr_pct,
        ROUND(AVG(CASE WHEN conversion THEN revenue END), 1) AS ltv
    FROM fact_impressions
    GROUP BY 1
    ORDER BY cvr_pct DESC
""").fetchdf()
print(channels.to_string(index=False))


3. Acquisition Channel Performance (Sorted by CVR)
 campaign  volume  pct_of_total  conversions  cvr_pct   ltv
 21898401      13           0.0          7.0   53.846  58.8
 30405203     767           0.2        270.0   35.202  62.5
  5544859    1571           0.3        483.0   30.745  64.8
 18091420     139           0.0         33.0   23.741  64.9
 29036280      13           0.0          3.0   23.077  84.3
  3694557      84           0.0         19.0   22.619  86.2
 17710659     220           0.0         47.0   21.364  65.4
 17710661     281           0.1         60.0   21.352  64.7
 26891650     764           0.2        162.0   21.204  67.2
 32368244    6737           1.3       1426.0   21.167  64.8
 24843272     329           0.1         69.0   20.973  70.4
  2869134    2133           0.4        441.0   20.675  63.9
  9100692    1208           0.2        246.0   20.364  71.3
  6886825       5           0.0          1.0   20.000 120.0
 30534043     169           0.0         33.0   19

In [26]:
print("\n4. Customer Journey Depth → Conversion Rate & LTV by # of Touches (Your Signature Insight)")
journey = con.execute("""
    WITH user_journey AS (
        SELECT 
            uid,
            COUNT(*) AS touches,
            MAX(CASE WHEN conversion THEN 1 ELSE 0 END) AS converted,
            MAX(CASE WHEN conversion THEN revenue END) AS revenue
        FROM fact_impressions
        GROUP BY uid
    )
    SELECT 
        touches,
        COUNT(*) AS users,
        SUM(converted) AS converters,
        ROUND(100.0 * SUM(converted) / COUNT(*), 2) AS conversion_rate_pct,
        ROUND(AVG(revenue), 1) AS avg_ltv
    FROM user_journey
    GROUP BY touches
    ORDER BY touches
""").fetchdf()
print(journey.to_string(index=False))


4. Customer Journey Depth → Conversion Rate & LTV by # of Touches (Your Signature Insight)
 touches  users  converters  conversion_rate_pct  avg_ltv
       1 334992     12513.0                 3.74     66.4
       2  50219      4118.0                 8.20     74.2
       3  11943      1579.0                13.22     83.7
       4   3565       636.0                17.84     92.7
       5   1322       325.0                24.58     98.9
       6    510       132.0                25.88    106.2
       7    243        54.0                22.22    112.1
       8    114        42.0                36.84    102.2
       9     60        24.0                40.00    115.5
      10     50        17.0                34.00    120.0
      11     23        10.0                43.48    120.0
      12     19         7.0                36.84    120.0
      13     10         3.0                30.00    120.0
      14      9         2.0                22.22    120.0
      15      5         0.0           

# MTA and App

In [29]:
import pandas as pd
from pychattr.channel_attribution import MarkovModel

# Transform your touchpoint data into journey paths
# Step 1: Filter for converted journeys only
converted_journeys = df[df['conversion'] == 1].copy()

# Step 2: Sort by user and timestamp to get correct journey sequence
converted_journeys = converted_journeys.sort_values(['uid', 'conversion_id', 'timestamp'])

# Step 2.5: Calculate total impressions (clicks) per campaign from original df
campaign_stats = df.groupby('campaign').agg({
    'click': 'sum',  # Total impressions/clicks per campaign
    'conversion': 'sum'  # Total conversions per campaign
}).rename(columns={'conversion': 'original_conversions'})
campaign_stats['original_conversion_rate'] = (
    campaign_stats['original_conversions'] / campaign_stats['click'] * 100
)

# Step 3: Create journey paths by grouping touchpoints per conversion
journey_df = (
    converted_journeys
    .groupby(['uid', 'conversion_id'])
    .agg({
        'campaign': lambda x: ' >>> '.join(x.astype(str)),  # Convert to string first
        'conversion': 'max',  # Should be 1 for all
        'revenue': 'sum'  # Total revenue for this conversion
    })
    .rename(columns={
        'campaign': 'path',
        'conversion': 'conversions'
    })
    .reset_index(drop=True)
)

# Step 4: Aggregate identical paths (optional but recommended for performance)
journey_aggregated = (
    journey_df
    .groupby('path')
    .agg({
        'conversions': 'sum',
        'revenue': 'sum'
    })
    .reset_index()
)

print("Journey Data Sample:")
journey_aggregated_sorted = journey_aggregated.sort_values('conversions', ascending=False)
print(f"\nTotal unique paths: {len(journey_aggregated)}")
print(f"Total conversions: {journey_aggregated['conversions'].sum()}")
print(f"Total revenue: ${journey_aggregated['revenue'].sum():,.2f}")

# Step 5: Run the Markov Model
mm = MarkovModel(
    path_feature="path",
    conversion_feature="conversions",
    revenue_feature="revenue",
    separator=" >>> ",
    k_order=1,  # First-order Markov chain
    n_simulations=10000,
    return_transition_probs=True,
    random_state=42
)

# Fit the model
mm.fit(journey_aggregated)

# Get attribution results
attribution_results = mm.attribution_model_

# Convert campaign_stats index to string to match channel_name type
campaign_stats.index = campaign_stats.index.astype(str)

# Now the merge will work since both are strings
attribution_results = attribution_results.merge(
    campaign_stats[['click', 'original_conversions', 'original_conversion_rate']],
    left_on='channel_name',
    right_index=True,
    how='left'
)

# Calculate attributed conversion rate (attributed conversions / total impressions)
attribution_results['attributed_conversion_rate'] = (
    attribution_results['total_conversions'] / attribution_results['click'] * 100
).round(2)
print("adj conv: " , attribution_results['total_conversions'].sum())
print("\n" + "="*60)
print("ATTRIBUTION RESULTS:")
print("="*60)
print(attribution_results)

# Additional useful metrics
print("\n" + "="*60)
print("CHANNEL PERFORMANCE SUMMARY:")
print("="*60)
attribution_summary = attribution_results.sort_values('total_conversions', ascending=False)
print(attribution_summary[[
    'channel_name', 
    'click',
    'original_conversions',
    'total_conversions', 
    'original_conversion_rate',
    'attributed_conversion_rate'
]].to_string(index=False))

Journey Data Sample:

Total unique paths: 1359
Total conversions: 20178
Total revenue: $1,634,642.79
adj conv:  20178.0

ATTRIBUTION RESULTS:
    channel_name  total_conversions  total_revenue  click  \
0       10002432          10.194587     661.689258  172.0   
1       10013589          14.563695     973.440556   82.0   
2       10055720           1.456370      94.527037   29.0   
3       10123946           7.281848     340.475070   99.0   
4            >>>        2408.835222  205943.967999    NaN   
..           ...                ...            ...    ...   
621      9500303          13.107326     913.665286  176.0   
622      9551057           1.456370     119.880205   13.0   
623      9700342           1.456370      18.467532   44.0   
624      9700343           1.456370      94.527037  120.0   
625      9810200          52.429304    4720.481783  728.0   

     original_conversions  original_conversion_rate  \
0                    16.0                  9.302326   
1              