In [26]:
import os
import sys
import pandas as pd
from dotenv import load_dotenv
import snowflake.connector

# Load environment variables from a .env file; connect_to_snowflake() below
# reads the SNOWFLAKE_USER / _PASSWORD / _ACCOUNT / _WAREHOUSE values with
# os.getenv, so they must be available by the time it is called.
load_dotenv()

def connect_to_snowflake():
    """Open a Snowflake connection configured from environment variables.

    User, password and account come from SNOWFLAKE_USER, SNOWFLAKE_PASSWORD
    and SNOWFLAKE_ACCOUNT; the warehouse comes from SNOWFLAKE_WAREHOUSE and
    falls back to 'COMPUTE_WH'. Database and schema are fixed to
    INCREMENTALITY / INCREMENTALITY_RESEARCH.

    Returns:
        The open connection, or None when the connector raises
        ``snowflake.connector.Error`` (the error is reported on stderr).
    """
    settings = {
        "user": os.getenv('SNOWFLAKE_USER'),
        "password": os.getenv('SNOWFLAKE_PASSWORD'),
        "account": os.getenv('SNOWFLAKE_ACCOUNT'),
        "warehouse": os.getenv('SNOWFLAKE_WAREHOUSE', 'COMPUTE_WH'),
        "database": 'INCREMENTALITY',
        "schema": 'INCREMENTALITY_RESEARCH',
    }
    try:
        connection = snowflake.connector.connect(**settings)
    except snowflake.connector.Error as err:
        print(f"❌ Could not connect to Snowflake: {err}", file=sys.stderr)
        return None
    print("✅ Connected to Snowflake")
    return connection

def _parse_iso_bound(name, value):
    """Validate one period bound and return ``(sql_text, parsed_datetime)``.

    The bound is interpolated into a quoted SQL literal, so it must be an
    ISO-8601 date/datetime (a ``str``, ``date`` or ``datetime``). Anything
    else, including stray quotes, raises ValueError instead of producing
    broken or injectable SQL.
    """
    from datetime import datetime

    text = str(value)
    try:
        parsed = datetime.fromisoformat(text)
    except ValueError as err:
        raise ValueError(
            f"{name} must be an ISO-8601 date or datetime, got {value!r}"
        ) from err
    return text, parsed


def build_power_player_panel_query(prior_period_start, prior_period_end, analysis_period_start, analysis_period_end, threshold=0.80):
    """
    Build the SQL for a user x vendor x week panel restricted to "power players".

    Selection uses only the PRIOR window and is two-stage:
      1. Top vendors: vendors ranked by prior-window revenue whose cumulative
         revenue share is <= ``threshold``.
      2. Top users: users ranked by prior-window revenue spent *at those top
         vendors* whose cumulative share is <= ``threshold``.
    For (top user, top vendor) pairs, clicks and purchases in the ANALYSIS
    window are aggregated per ``DATE_TRUNC('WEEK', ...)`` bucket and
    full-outer-joined, so a row exists whenever either side is non-empty.

    Args:
        prior_period_start, prior_period_end: Half-open [start, end) window used
            to pick top vendors/users. ISO-8601 date/datetime strings, or
            ``date``/``datetime`` objects.
        analysis_period_start, analysis_period_end: Half-open [start, end)
            window for the panel itself, same format.
        threshold: Cumulative revenue-share cutoff, in (0, 1].

    Returns:
        The SQL statement as a string.

    Raises:
        ValueError: if a bound is not ISO-8601, a window's start is not before
            its end, or ``threshold`` is not a number in (0, 1].
    """
    # Validate before interpolating: every value below ends up inside the SQL text.
    prior_period_start, prior_start_dt = _parse_iso_bound("prior_period_start", prior_period_start)
    prior_period_end, prior_end_dt = _parse_iso_bound("prior_period_end", prior_period_end)
    analysis_period_start, analysis_start_dt = _parse_iso_bound("analysis_period_start", analysis_period_start)
    analysis_period_end, analysis_end_dt = _parse_iso_bound("analysis_period_end", analysis_period_end)
    if prior_start_dt >= prior_end_dt:
        raise ValueError("prior_period_start must be before prior_period_end")
    if analysis_start_dt >= analysis_end_dt:
        raise ValueError("analysis_period_start must be before analysis_period_end")

    try:
        threshold = float(threshold)
    except (TypeError, ValueError) as err:
        raise ValueError(f"threshold must be a number in (0, 1], got {threshold!r}") from err
    # `not (a < x <= b)` also rejects NaN.
    if not 0.0 < threshold <= 1.0:
        raise ValueError(f"threshold must be in (0, 1], got {threshold!r}")

    # NOTE(review): PRODUCT_VENDOR_MAP is built from DISTINCT (PRODUCT_ID,
    # VENDOR_ID) pairs seen in CLICKS. If a product appears with several vendors,
    # each of its purchases is joined to (and its revenue counted for) every one
    # of them. Confirm that product -> vendor is 1:1 in practice.
    # NOTE(review): `cumulative_share <= threshold` excludes the vendor/user whose
    # revenue crosses the cutoff. If the single largest vendor alone exceeds
    # `threshold`, TOP_VENDORS (and therefore the panel) is empty.
    # The SQL comments mentioning May / June-July describe the default
    # configuration in main(); the actual windows are the arguments above.
    query = f"""
    WITH
    -- Step 1: Create a universal Product -> Vendor map for attribution
    PRODUCT_VENDOR_MAP AS (
        SELECT DISTINCT PRODUCT_ID, VENDOR_ID
        FROM CLICKS
        WHERE PRODUCT_ID IS NOT NULL AND VENDOR_ID IS NOT NULL
    ),

    -- Step 2: Calculate all revenue in the PRIOR period (May) to find top vendors
    MAY_REVENUE AS (
        SELECT
            p.USER_ID,
            pvm.VENDOR_ID,
            COALESCE(SUM(p.QUANTITY * p.UNIT_PRICE), 0) AS revenue_cents
        FROM PURCHASES AS p
        JOIN PRODUCT_VENDOR_MAP AS pvm ON p.PRODUCT_ID = pvm.PRODUCT_ID
        WHERE p.PURCHASED_AT >= '{prior_period_start}' AND p.PURCHASED_AT < '{prior_period_end}'
        GROUP BY 1, 2
    ),

    -- Step 3: Identify TOP VENDORS from May who make up the top 80% of revenue
    TOP_VENDORS AS (
        SELECT VENDOR_ID
        FROM (
            SELECT
                VENDOR_ID,
                SUM(revenue_cents) AS total_vendor_revenue,
                SUM(total_vendor_revenue) OVER (ORDER BY total_vendor_revenue DESC) / SUM(total_vendor_revenue) OVER () AS cumulative_share
            FROM MAY_REVENUE
            GROUP BY VENDOR_ID
        )
        WHERE cumulative_share <= {threshold}
    ),

    -- Step 4: Now, find TOP USERS who spent the most AT THE TOP VENDORS in May
    TOP_USERS AS (
        SELECT USER_ID
        FROM (
            SELECT
                USER_ID,
                SUM(revenue_cents) AS total_user_revenue_at_top_vendors,
                SUM(total_user_revenue_at_top_vendors) OVER (ORDER BY total_user_revenue_at_top_vendors DESC) / SUM(total_user_revenue_at_top_vendors) OVER () AS cumulative_share
            FROM MAY_REVENUE
            WHERE VENDOR_ID IN (SELECT VENDOR_ID FROM TOP_VENDORS) -- Filter for spend at top vendors
            GROUP BY USER_ID
        )
        WHERE cumulative_share <= {threshold}
    ),

    -- Step 5: Aggregate clicks in the ANALYSIS period (June-July) for this "power player" population
    CLICKS_U_V_W AS (
        SELECT
            USER_ID,
            VENDOR_ID,
            DATE_TRUNC('WEEK', OCCURRED_AT) AS week,
            COUNT(DISTINCT INTERACTION_ID) AS click_count
        FROM CLICKS
        WHERE
            OCCURRED_AT >= '{analysis_period_start}' AND OCCURRED_AT < '{analysis_period_end}'
            AND USER_ID IN (SELECT USER_ID FROM TOP_USERS)
            AND VENDOR_ID IN (SELECT VENDOR_ID FROM TOP_VENDORS)
        GROUP BY 1, 2, 3
    ),

    -- Step 6: Aggregate purchases in the ANALYSIS period for the "power player" population
    PURCHASES_U_V_W AS (
        SELECT
            p.USER_ID,
            pvm.VENDOR_ID,
            DATE_TRUNC('WEEK', p.PURCHASED_AT) AS week,
            COUNT(DISTINCT p.PURCHASE_ID) AS purchase_count,
            COALESCE(SUM(p.QUANTITY * p.UNIT_PRICE), 0) AS total_revenue_cents
        FROM PURCHASES AS p
        JOIN PRODUCT_VENDOR_MAP AS pvm ON p.PRODUCT_ID = pvm.PRODUCT_ID
        WHERE
            p.PURCHASED_AT >= '{analysis_period_start}' AND p.PURCHASED_AT < '{analysis_period_end}'
            AND p.USER_ID IN (SELECT USER_ID FROM TOP_USERS)
            AND pvm.VENDOR_ID IN (SELECT VENDOR_ID FROM TOP_VENDORS)
        GROUP BY 1, 2, 3
    )

    -- Final Step: Join the analysis period data into the final panel
    SELECT
        COALESCE(c.week, p.week) AS week,
        COALESCE(c.user_id, p.user_id) AS user_id,
        COALESCE(c.vendor_id, p.vendor_id) AS vendor_id,
        COALESCE(c.click_count, 0) AS clicks,
        COALESCE(p.purchase_count, 0) AS purchases,
        (COALESCE(p.total_revenue_cents, 0) / 100)::DECIMAL(18, 2) AS revenue_dollars
        
    FROM CLICKS_U_V_W AS c
    FULL OUTER JOIN PURCHASES_U_V_W AS p
        ON c.user_id = p.user_id AND c.vendor_id = p.vendor_id AND c.week = p.week
    ORDER BY
        user_id, vendor_id, week;
    """
    return query

def fetch_panel_data(conn, query):
    """Execute ``query`` on ``conn`` and return the result set as a pandas DataFrame.

    Column names are taken from the cursor description. An empty DataFrame is
    returned when the statement produces no result set, or when the connector
    raises ``snowflake.connector.Error`` (reported on stderr). The cursor is
    closed in every case.
    """
    print("Executing query...")
    cur = conn.cursor()
    try:
        cur.execute(query)
        if not cur.description:
            # Statement produced no result set (e.g. DDL/DML).
            return pd.DataFrame()
        rows = cur.fetchall()
        header = [field[0] for field in cur.description]
        frame = pd.DataFrame(rows, columns=header)
        print(f"✅ Query successful. Fetched {len(frame):,} rows.")
        return frame
    except snowflake.connector.Error as exc:
        print(f"\n❌ ERROR executing query: {exc}", file=sys.stderr)
        return pd.DataFrame()
    finally:
        cur.close()

def process_and_save_data(df, filename):
    """Write ``df`` to ``filename`` as Parquet with lower-cased column names.

    The downstream analysis reads lower-case names (``user_id``,
    ``revenue_dollars``, ...), so column labels are lower-cased on a copy;
    the caller's DataFrame is left untouched. Failures are reported on stderr
    rather than raised, so the pipeline stays best-effort.

    Args:
        df: pandas DataFrame to persist. Nothing is written if it is empty.
        filename: Destination path; written with the pyarrow Parquet engine.
    """
    if df.empty:
        print("No data to save.")
        return
    try:
        # rename() returns a new frame, so the caller's labels are not mutated;
        # str() guards against non-string labels, which have no .lower().
        out = df.rename(columns=lambda col: str(col).lower())
        out.to_parquet(filename, index=False, engine='pyarrow')
        print(f"✅ Data successfully processed and saved to '{filename}'")
    except Exception as e:
        print(f"❌ Failed to save data to '{filename}': {e}", file=sys.stderr)

def main():
    """
    Extract the power-player panel from Snowflake and save it as Parquet.

    Connects (exiting with status 1 if that fails), then builds the two-stage
    top-vendor -> top-user panel query for the configured windows. It fetches
    the result into pandas and writes it to OUTPUT_FILENAME. The Snowflake
    connection is closed on the way out, whether or not those steps succeeded.
    """
    # --- CONFIGURATION ---
    # Power players are selected from the prior window; the panel covers the
    # analysis window. Both windows are half-open: [start, end).
    PRIOR_PERIOD_START = '2025-05-01'
    PRIOR_PERIOD_END = '2025-06-01'
    ANALYSIS_PERIOD_START = '2025-06-01'
    ANALYSIS_PERIOD_END = '2025-08-01' # 2 months: June and July

    # Cumulative revenue-share cutoff used for both vendor and user selection.
    CUMULATIVE_SHARE_THRESHOLD = 0.80

    OUTPUT_FILENAME = "user_vendor_panel_power_players_2mo.parquet"
    # ---------------------

    print(f"--- Generating 2-month panel for TOP {CUMULATIVE_SHARE_THRESHOLD:.0%} of Power Players ---")
    print(f"    - Identifying performers from: {PRIOR_PERIOD_START} to {PRIOR_PERIOD_END}")
    print(f"    - Building panel for period:   {ANALYSIS_PERIOD_START} to {ANALYSIS_PERIOD_END}")

    conn = connect_to_snowflake()
    if not conn:
        sys.exit(1)

    try:
        query = build_power_player_panel_query(
            prior_period_start=PRIOR_PERIOD_START,
            prior_period_end=PRIOR_PERIOD_END,
            analysis_period_start=ANALYSIS_PERIOD_START,
            analysis_period_end=ANALYSIS_PERIOD_END,
            threshold=CUMULATIVE_SHARE_THRESHOLD
        )

        panel_df = fetch_panel_data(conn, query)

        process_and_save_data(panel_df, OUTPUT_FILENAME)

    finally:
        # Release the session even if fetching or saving failed, and only
        # report it as closed after close() has actually run.
        if conn and not conn.is_closed():
            conn.close()
            print("✅ Snowflake connection closed.")

if __name__ == "__main__":
    main()

INFO:snowflake.connector.connection:Snowflake Connector for Python Version: 3.17.3, Python Version: 3.13.7, Platform: macOS-14.5-arm64-arm-64bit-Mach-O
INFO:snowflake.connector.connection:Connecting to GLOBAL Snowflake domain


--- Generating 2-month panel for TOP 80% of Power Players ---
    - Identifying performers from: 2025-05-01 to 2025-06-01
    - Building panel for period:   2025-06-01 to 2025-08-01
✅ Connected to Snowflake
Executing query...
✅ Query successful. Fetched 1,466,907 rows.
✅ Data successfully processed and saved to 'user_vendor_panel_power_players_2mo.parquet'
✅ Snowflake connection closed.


In [27]:
import polars as pl
from tabulate import tabulate

# --- Helper Function (to make this script runnable standalone) ---
def show_table(df, title=""):
    """Render a Polars DataFrame as a grid table on stdout.

    The frame is converted to pandas for ``tabulate``: column names become
    headers and the row index is hidden. A non-empty ``title`` is printed
    first, after a blank line, underlined with '=' characters.
    """
    if title:
        underline = "=" * len(title)
        print(f"\n{title}")
        print(underline)
    as_pandas = df.to_pandas()
    print(tabulate(as_pandas, headers='keys', tablefmt='grid', showindex=False))

# --- Prerequisite: Load Data ---
# Panel written by the Snowflake extraction step (power player data).
parquet_filename = 'user_vendor_panel_power_players_2mo.parquet'
print(f"--- Loading data from '{parquet_filename}' ---")
try:
    # revenue_dollars is cast to Float64 for aggregation and formatting; the
    # extraction query emits it as DECIMAL(18, 2).
    df_analysis = pl.read_parquet(parquet_filename).with_columns(
        pl.col("revenue_dollars").cast(pl.Float64)
    )
except FileNotFoundError:
    print(f"❌ ERROR: The file '{parquet_filename}' was not found.")
    print("   Please run the new power player data generation script first.")
    # Sentinel checked by the EDA block below.
    df_analysis = None
else:
    print(f"✅ Data loaded successfully with {df_analysis.height:,} rows.")

# --- Main EDA Block ---
# Runs only if the load step produced a non-empty frame. Assuming the parquet
# came from the extraction cell, each row is a user x vendor x week cell with
# at least one click or one purchase. The panel is NOT balanced, so the
# "observation" counts below count active cells only.
if df_analysis is not None and not df_analysis.is_empty():

    # --- EDA 1: High-Level Overview & Panel Structure ---
    print("\n--- EDA 1: High-Level Overview & Panel Structure ---")
    print("Assessing the dimensions and balance of the 2-month power player panel.")

    # unpivot() replaces the deprecated DataFrame.melt(); naming the output
    # columns directly removes the need for a follow-up rename.
    summary_stats = df_analysis.select(
        pl.len().alias("Total Observations (User-Vendor-Weeks)"),
        pl.col("user_id").n_unique().alias("Unique Power Users"),
        pl.col("vendor_id").n_unique().alias("Unique Power Vendors"),
        pl.col("week").n_unique().alias("Unique Weeks in Panel"),
    ).unpivot(variable_name="Metric", value_name="Value")
    show_table(summary_stats, "Dataset Dimensions (June-July)")

    # Observations (active user-vendor-weeks) per user
    obs_per_user = df_analysis.group_by("user_id").len().select(pl.col("len").alias("obs_per_user"))
    show_table(obs_per_user.describe(), "Distribution of Observations per User")

    # Observations (active user-vendor-weeks) per vendor
    obs_per_vendor = df_analysis.group_by("vendor_id").len().select(pl.col("len").alias("obs_per_vendor"))
    show_table(obs_per_vendor.describe(), "Distribution of Observations per Vendor")
    print("\nInterpretation: These tables show how active power users and vendors are over the 2-month period.")

    # --- EDA 2: Weekly Platform Trends ---
    print("\n--- EDA 2: Weekly Platform Trends (June-July) ---")
    print("This shows the evolution of the power player ecosystem over time.")

    weekly_trends = (
        df_analysis.group_by("week")
        .agg(
            pl.col("revenue_dollars").sum().alias("total_revenue"),
            pl.col("clicks").sum().alias("total_clicks"),
            pl.col("user_id").n_unique().alias("active_users"),
            pl.col("vendor_id").n_unique().alias("active_vendors")
        )
        .sort("week")
    )
    show_table(weekly_trends, "Week-over-Week Platform Metrics")
    print("\nInterpretation: Look for trends, seasonality, or shocks in activity within the power player segment.")

    # --- EDA 3: The Nature of Conversion & 'Treatment' (Clicks) ---
    print("\n--- EDA 3: The Nature of Conversion & 'Treatment' (Clicks) ---")

    df_purchasing = df_analysis.filter(pl.col("purchases") > 0)

    if df_purchasing.is_empty():
        print("\nWARNING: No purchasing interactions found in the dataset.")
    else:
        # Share of purchasing user-vendor-weeks with no click by the same user
        # on the same vendor in the same week.
        zero_click_conversions_pct = df_purchasing.select(
            (pl.col("clicks") == 0).mean() * 100
        ).item()
        print(f"\n'Zero-Click' Conversions: {zero_click_conversions_pct:.2f}% of purchasing events had zero same-week clicks.")
        print("Interpretation: A high percentage confirms that purchase journeys are longer than a single week, even in this long panel.")

        show_table(df_purchasing.select(['clicks', 'purchases', 'revenue_dollars']).describe(), "Distribution for Purchasing Interactions ONLY")

    # --- EDA 4: Player Persistence & Stickiness ---
    print("\n--- EDA 4: Player Persistence & Stickiness ---")
    print("How consistently are power players active from week to week?")

    # Count how many distinct weeks each user/vendor appears in
    user_weeks_active = df_analysis.group_by("user_id").agg(pl.col("week").n_unique().alias("weeks_active"))
    vendor_weeks_active = df_analysis.group_by("vendor_id").agg(pl.col("week").n_unique().alias("weeks_active"))

    # Show the distribution of active weeks (most active first)
    show_table(user_weeks_active.get_column("weeks_active").value_counts().sort("weeks_active", descending=True), "User Weekly Activity Distribution")
    show_table(vendor_weeks_active.get_column("weeks_active").value_counts().sort("weeks_active", descending=True), "Vendor Weekly Activity Distribution")
    print("\nInterpretation: This shows loyalty and churn. A high number of users active for only 1-2 weeks indicates high churn, even among power users.")

    # --- EDA 5: Top Performer Analysis (over the full 2 months) ---
    print("\n--- EDA 5: Top Performer Analysis (June-July) ---")

    # Top Vendors over the full period
    vendor_summary = (
        df_analysis.group_by('vendor_id')
        .agg(
            pl.col('revenue_dollars').sum().alias('total_revenue_2mo'),
            pl.col('clicks').sum().alias('total_clicks_2mo'),
            pl.col('user_id').n_unique().alias('unique_users_reached_2mo')
        )
        .sort('total_revenue_2mo', descending=True)
        .head(15)
    )
    # Declaring the output dtype avoids Polars having to infer it from the lambda.
    vendor_summary = vendor_summary.with_columns(
        pl.col('total_revenue_2mo').map_elements(lambda x: f"${x:,.2f}", return_dtype=pl.String)
    )
    show_table(vendor_summary, "Top 15 Vendors by Revenue (June-July)")

    # Top Users over the full period
    user_summary = (
        df_analysis.group_by('user_id')
        .agg(
            pl.col('revenue_dollars').sum().alias('total_revenue_2mo'),
            pl.col('clicks').sum().alias('total_clicks_2mo'),
            pl.col('vendor_id').n_unique().alias('vendors_interacted_with_2mo')
        )
        .sort('total_revenue_2mo', descending=True)
        .head(15)
    )
    user_summary = user_summary.with_columns(
        pl.col('total_revenue_2mo').map_elements(lambda x: f"${x:,.2f}", return_dtype=pl.String)
    )
    show_table(user_summary, "Top 15 Users by Revenue (June-July)")

    # --- EDA 6: Market Concentration ---
    print("\n--- EDA 6: Market Concentration (June-July) ---")

    vendor_revenue = df_analysis.group_by("vendor_id").agg(pl.col("revenue_dollars").sum().alias("total_revenue"))
    total_revenue_in_sample = vendor_revenue.get_column("total_revenue").sum()

    if total_revenue_in_sample > 0:
        n_vendors = len(vendor_revenue)
        # Sort once and take prefixes for both percentiles.
        ranked_vendors = vendor_revenue.sort("total_revenue", descending=True)
        # int() truncates, so with fewer than 100 (or 20) vendors the group would
        # be empty and report a 0% share; always include at least the top vendor.
        n_top_1 = max(1, int(n_vendors * 0.01))
        n_top_5 = max(1, int(n_vendors * 0.05))

        rev_top_1 = ranked_vendors.head(n_top_1).get_column("total_revenue").sum()
        rev_top_5 = ranked_vendors.head(n_top_5).get_column("total_revenue").sum()

        concentration_summary = pl.DataFrame({
            "Group": ["Top 1% of Vendors", "Top 5% of Vendors"],
            "Revenue Share (%)": [
                (rev_top_1 / total_revenue_in_sample) * 100,
                (rev_top_5 / total_revenue_in_sample) * 100
            ]
        })
        show_table(concentration_summary, "Revenue Concentration")
        print("\nInterpretation: High concentration means the market is dominated by a few key players.")

else:
    print("\nCould not perform EDA because the DataFrame is empty or could not be loaded.")

--- Loading data from 'user_vendor_panel_power_players_2mo.parquet' ---
✅ Data loaded successfully with 1,466,907 rows.

--- EDA 1: High-Level Overview & Panel Structure ---
Assessing the dimensions and balance of the 2-month power player panel.

Dataset Dimensions (June-July)
+----------------------------------------+---------+
| Metric                                 |   Value |
| Total Observations (User-Vendor-Weeks) | 1466907 |
+----------------------------------------+---------+
| Unique Power Users                     |   58306 |
+----------------------------------------+---------+
| Unique Power Vendors                   |   11220 |
+----------------------------------------+---------+
| Unique Weeks in Panel                  |      10 |
+----------------------------------------+---------+

Distribution of Observations per User
+-------------+----------------+
| statistic   |   obs_per_user |
| count       |     58306      |
+-------------+----------------+
| null_count  |      

  ).melt().rename({"variable": "Metric", "value": "Value"})


In [5]:
import os
import sys
import polars as pl

# --- 1. Set up rpy2 Environment (Unchanged) ---
def setup_rpy2():
    """Import rpy2 and load the R ``fixest`` package.

    Returns:
        A 4-tuple ``(ok, ro, pandas2ri, localconverter)``. On success ``ok`` is
        True and the other entries are ``rpy2.robjects``, its ``pandas2ri``
        module and the ``localconverter`` context manager. If an import or
        the ``fixest`` load raises ImportError or RuntimeError, the error is
        reported on stderr and ``(False, None, None, None)`` is returned.
    """
    try:
        import rpy2.robjects as robjects
        from rpy2.robjects.packages import importr
        from rpy2.robjects import pandas2ri as pandas_bridge
        from rpy2.robjects.conversion import localconverter as conversion_ctx
        importr('fixest')
    except (ImportError, RuntimeError) as err:
        print(f"❌ ERROR: Failed to set up rpy2: {err}", file=sys.stderr)
        return False, None, None, None
    else:
        print("✅ rpy2 environment and 'fixest' library loaded successfully.")
        return True, robjects, pandas_bridge, conversion_ctx

# --- 2. Load, Sample, Balance, and Prepare Data ---
def _sample_users(df, sample_fraction, seed):
    """Keep every row for a random ``sample_fraction`` of distinct users; return (rows, n_users)."""
    # Sorting the distinct ids gives sample() a fixed input order, so a fixed
    # seed draws the same users on every run (unique() alone is unordered).
    unique_users = df.get_column("user_id").unique().sort()
    sampled_users = unique_users.sample(fraction=sample_fraction, shuffle=True, seed=seed)
    # implode() makes the membership test explicit; passing a same-dtype Series
    # straight to is_in() is deprecated (see the warning in the run output).
    df_sampled = df.filter(pl.col("user_id").is_in(sampled_users.implode()))
    return df_sampled, sampled_users.len()


def _balance_panel(df):
    """Expand to every observed (user, vendor) pair x every observed week, zero-filling counts."""
    all_pairs = df.select("user_id", "vendor_id").unique()
    all_weeks = df.select("week").unique()
    df_grid = all_pairs.join(all_weeks, how="cross")
    return df_grid.join(
        df, on=["user_id", "vendor_id", "week"], how="left"
    ).with_columns(
        pl.col("clicks").fill_null(0),
        pl.col("purchases").fill_null(0),
    )


def _add_click_indicators(df):
    """Add did_purchase / had_click dummies and had_click_lag1 (previous week's had_click per pair)."""
    df_prepared = df.select(
        "user_id", "vendor_id", "week", "purchases", "clicks"
    ).with_columns(
        pl.when(pl.col("purchases") > 0).then(1).otherwise(0).alias("did_purchase"),
        pl.when(pl.col("clicks") > 0).then(1).otherwise(0).alias("had_click"),
    )
    # Sorting by week within each pair makes shift(1).over(pair) the previous
    # week's value. On a balanced panel that is the preceding calendar week.
    # Each pair's first week gets a null lag.
    return df_prepared.sort("user_id", "vendor_id", "week").with_columns(
        pl.col("had_click").shift(1).over(["user_id", "vendor_id"]).alias("had_click_lag1")
    )


def load_and_prepare_data(filename, sample_fraction=1.0, seed=None):
    """
    Load the panel, optionally sub-sample users, balance it and build the
    variables for a distributed-lag logit model.

    Args:
        filename: Path to the user x vendor x week Parquet panel.
        sample_fraction: Fraction of distinct users to keep; values >= 1.0 use
            the full dataset.
        seed: Optional RNG seed for the user sample. Pass an int for a
            reproducible sample; None (default) draws a fresh sample each run.

    Returns:
        A Polars DataFrame with user_id, vendor_id, week, purchases, clicks,
        did_purchase, had_click and had_click_lag1 (null in each pair's first
        week). Returns None if the file is missing or any step fails; errors
        are reported on stderr.
    """
    try:
        print(f"--- Loading unbalanced panel data from '{filename}' ---")
        df_unbalanced = pl.read_parquet(filename)

        if sample_fraction < 1.0:
            print(f"--- Sub-sampling {sample_fraction:.0%} of users... ---")
            df_sampled, n_sampled_users = _sample_users(df_unbalanced, sample_fraction, seed)
            print(f"✅ Sampled data contains {df_sampled.height:,} rows from {n_sampled_users:,} users.")
        else:
            df_sampled = df_unbalanced
            print("✅ Using full dataset (no sampling).")

        print("--- Balancing the panel for the sampled user-vendor pairs ---")
        df_balanced = _balance_panel(df_sampled)
        print(f"✅ Panel balanced. New size: {df_balanced.height:,} rows.")

        print("--- Preparing binary and lagged click variables ---")
        df_final = _add_click_indicators(df_balanced)

        print("✅ Lagged variables created successfully.")
        return df_final

    except FileNotFoundError:
        print(f"❌ ERROR: The file '{filename}' was not found.", file=sys.stderr)
        return None
    except Exception as e:
        # Deliberately best-effort: the caller treats None as "skip estimation".
        print(f"❌ ERROR during data loading or preparation: {e}", file=sys.stderr)
        return None

# --- 3. Run the Distributed Lag Fixed-Effects Logit Model ---
def run_distributed_lag_feglm_model(df, ro, pandas2ri, localconverter):
    """
    Fit a logit of did_purchase on had_click and had_click_lag1 with user,
    vendor and week fixed effects using R's ``fixest::feglm``.

    The frame is converted to pandas and assigned into R's global environment
    as ``df_for_r``. Standard errors are clustered by user_id and the
    ``etable`` summary is printed from R. Any exception is reported on stderr
    instead of being raised. Nothing is returned.

    Args:
        df: Polars DataFrame with did_purchase, had_click, had_click_lag1,
            user_id, vendor_id and week columns.
        ro, pandas2ri, localconverter: the rpy2 objects returned by setup_rpy2().
    """
    print("\n--- Estimating Distributed Lag Panel Logit Model ---")
    print("Model: Pr(Purchase=1) ~ Had_Click + Had_Click_Lag1 | FEs")

    try:
        pandas_frame = df.to_pandas()
        converter = ro.default_converter + pandas2ri.converter
        with localconverter(converter):
            ro.globalenv['df_for_r'] = pandas_frame

        # Rows with NA in had_click_lag1 (each pair's first week) are dropped
        # by fixest, as its NOTES output reports.
        r_script = """
        library(fixest)
        df_panel <- df_for_r
        
        # This model estimates the effect of concurrent and lagged clicks,
        # without controlling for the lagged purchase event itself.
        model_dist_lag <- feglm(
            did_purchase ~ had_click + had_click_lag1,
            data = df_panel,
            family = "logit",
            fixef = c("user_id", "vendor_id", "week"),
            vcov = ~user_id,
            glm.iter = 100
        )
        
        print(etable(model_dist_lag, digits = 4))
        """
        ro.r(r_script)

        print("\n✅ Distributed lag logit model estimated successfully.")

    except Exception as exc:
        print(f"❌ ERROR running the feglm model in R: {exc}", file=sys.stderr)

# --- Main Execution Block ---
def main():
    """Load the panel, prepare the regression variables and fit the feglm model.

    Exits with status 1 if rpy2/fixest cannot be loaded. Skips estimation when
    data preparation fails or yields an empty frame.
    """
    FILENAME = 'user_vendor_panel_power_players_2mo.parquet'
    # Fraction of users to keep (0.5 = half of them). Lower it for a faster
    # run; set it to 1.0 to use the full data.
    SAMPLE_FRACTION = 0.5

    rpy2_is_ready, ro, pandas2ri, localconverter = setup_rpy2()
    if not rpy2_is_ready:
        sys.exit(1)

    df_analysis = load_and_prepare_data(FILENAME, sample_fraction=SAMPLE_FRACTION)

    has_data = df_analysis is not None and not df_analysis.is_empty()
    if not has_data:
        print("Skipping model estimation due to data loading issues.")
        return
    run_distributed_lag_feglm_model(df_analysis, ro, pandas2ri, localconverter)

if __name__ == "__main__":
    main()

✅ rpy2 environment and 'fixest' library loaded successfully.
--- Loading unbalanced panel data from 'user_vendor_panel_power_players_2mo.parquet' ---
--- Sub-sampling 50% of users... ---
✅ Sampled data contains 739,126 rows from 29,153 users.
--- Balancing the panel for the sampled user-vendor pairs ---


Please use `implode` to return to previous behavior.

See https://github.com/pola-rs/polars/issues/22149 for more information.
  df_sampled = df_unbalanced.filter(pl.col("user_id").is_in(sampled_users))


✅ Panel balanced. New size: 6,706,230 rows.
--- Preparing binary and lagged click variables ---
✅ Lagged variables created successfully.

--- Estimating Distributed Lag Panel Logit Model ---
Model: Pr(Purchase=1) ~ Had_Click + Had_Click_Lag1 | FEs


R callback write-console: NOTES: 670,623 observations removed because of NA values (RHS: 670,623).
       17,138/3,287/0 fixed-effects (2,572,740 observations) removed because of only 0 (or only 1) outcomes.
  


                    model_dist_lag
Dependent Var.:       did_purchase
                                  
had_click        1.658*** (0.0168)
had_click_lag1  0.1761*** (0.0252)
Fixed-Effects:  ------------------
user_id                        Yes
vendor_id                      Yes
week                           Yes
_______________ __________________
S.E.: Clustered        by: user_id
Observations             3,462,867
Squared Cor.               0.07374
Pseudo R2                  0.24092
BIC                      573,493.6
---
Signif. codes: 0 '***' 0.001 '**' 0.01 '*' 0.05 '.' 0.1 ' ' 1

✅ Distributed lag logit model estimated successfully.
