In [1]:
"""
Global correlation baselines. 

My script will reuse the loaders and helpers from `correlation_analysis` to compute: 

1. For each song pair (PAIR00X):
    - Pearson correlation between original and derived for that metric (weekly changes if configured as weekly=True).
    - Average Pearson correlation between those two tracks and set of random other tracks in the dataset overlapping time windows. 
    
2. For each event_category:
    - Average pair correlation. 
    - Comparison to global averages (across all the pairs and across all random baselines). 
"""

'\nGlobal correlation baselines. \n\nMy script will reuse the loaders and helpers from `correlation_analysis` to compute: \n\n1. For each song pair (PAIR00X):\n    - Pearson correlation between original and derived for that metric (weekly changes if configured as weekly=True).\n    - Average Pearson correlation between those two tracks and set of random other tracks in the dataset overlapping time windows. \n    \n2. For each event_category:\n    - Average pair correlation. \n    - Comparison to global averages (across all the pairs and across all random baselines). \n'

In [2]:
from __future__ import annotations
from typing import Dict, Tuple, Optional, List

In [3]:
import random
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from scipy.stats import pearsonr, ttest_rel

In [4]:
from correlation_analysis import (
    load_song_pairs,
    load_metric,
    weekly_change,
    reports_dir,
)

In [5]:
## This has to match the "metric" string used in analyze_pair's rows.
metric_label = "Spotify Streams (Weekly Changes)"

## Monte Carlo bootstrap settings. 
n_trials = 30 ## Monte Carlo trials/pair
n_sample = 30 ## Random tracks/trial
n_bootstrap = 400 ## Bootstrap resamples for CI.
min_overlap_points = 5 ## minimum weekly overlap for valid correlation
random_seed = 42 ## for reproducibility. 

In [6]:
global_corr_dir = reports_dir / "global_correlation"
global_corr_dir.mkdir(parents=True, exist_ok=True)

In [7]:
baseline_jobs = [
    {
        "platform": "spotify",
        "metric": "streams",
        "weekly": True,
        "label": "Spotify Streams (Weekly Changes)",
        "output_prefix": "spotify_streams",
    },
    {
        "platform": "spotify",
        "metric": "popularity",
        "weekly": False,  # use levels, not weekly changes
        "label": "Spotify Popularity",
        "output_prefix": "spotify_popularity",
    },
    {
        "platform": "youtube",
        "metric": "views",
        "weekly": True,
        "label": "YouTube Views (Weekly Changes)",
        "output_prefix": "youtube_views",
    },
    {
        "platform": "youtube",
        "metric": "likes",
        "weekly": True,
        "label": "YouTube Likes (Weekly Changes)",
        "output_prefix": "youtube_likes",
    },
]

In [8]:
TrackKey = Tuple[str, str] ## (pair_id, role: either `original` or `derived`)

In [9]:
def compute_all_metrics(song_pairs: pd.DataFrame, platform: str, metric: str, weekly: bool) -> Dict[TrackKey, Optional[pd.DataFrame]]:
    series: Dict[TrackKey, Optional[pd.DataFrame]] = {}
    
    pair_ids = sorted(song_pairs["pair_id"].astype(str).str.strip().unique())
    roles = ["original", "derived"]
    
    for pid in pair_ids:
        for role in roles:
            key: TrackKey = (pid, role)
            
            ## Loading daily metric:
            df_daily = load_metric(pid, role, platform, metric)
            
            if df_daily is None or df_daily.empty:
                series[key] = None
                continue
                
            if not np.issubdtype(df_daily["date"].dtype, np.datetime64):
                df_daily = df_daily.copy()
                df_daily["date"] = pd.to_datetime(df_daily["date"])
            
            ## Converting weekly changes:
            if weekly:
                df_series = weekly_change(df_daily)
                # weekly_change is expected to return either None or ["date","value"]
                if df_series is None or df_series.empty:
                    series[key] = None
                    continue
            else:
                df_series = df_daily.copy()
                if "value" not in df_series.columns:
                    # Main heuristic, here, is to pick the first non-date numeric column as a metric.
                    value_cols = [c for c in df_series.columns if c != "date"]
                    num_cols = df_series[value_cols].select_dtypes(include=[np.number]).columns.tolist()
                    chosen = None
                    if "value" in num_cols:
                        chosen = "value"
                    elif num_cols:
                        chosen = num_cols[0]
                    elif value_cols:
                        chosen = value_cols[0]
                    else:
                        series[key] = None
                        continue

                    if chosen != "value":
                        df_series = df_series.rename(columns={chosen: "value"})

                # Keepingy only the date and the value.
                df_series = df_series[["date", "value"]]

            series[key] = df_series
            
    return series     

In [10]:
def corr(df_a: Optional[pd.DataFrame], df_b: Optional[pd.DataFrame], min_overlap: int = min_overlap_points,) -> float:
    ## Computing Pearson correlation on two weekly series on `date`. 
    if df_a is None or df_b is None:
        return float("nan")
    
    ## Just to make sure date is in `datetime` format. 
    for df in (df_a, df_b):
        if not np.issubdtype(df["date"].dtype, np.datetime64):
            df["date"] = pd.to_datetime(df["date"])
        
    merged = df_a.merge(df_b, on="date", suffixes=("_a", "_b")).dropna()
    if len(merged) < min_overlap:
        return float("nan")
    
    x = merged["value_a"].to_numpy()
    y = merged["value_b"].to_numpy()

    # If either series is constant, Pearson is undefined, hence it return NaN without warning.
    if len(x) == 0 or len(y) == 0:
        return float("nan")
    if np.all(x == x[0]) or np.all(y == y[0]):
        return float("nan")
    
    r, _ = pearsonr(x, y)
    return float(r)

In [11]:
def bootstrap_ci(data, n_boot: int = n_bootstrap, ci=95):
    """
    Compute bootstrap confidence interval for the mean of data. 
    Returns (lower_bound, upper_bound)
    """
    if not data or len(data) == 0:
        return (np.nan, np.nan)
    
    boot_means = []
    data = np.array(data)
    
    for _ in range(n_boot):
        sample = np.random.choice(data, size=len(data), replace=True)
        boot_means.append(np.mean(sample))
    
    alpha = (100 - ci) / 100 / 2
    lower = np.quantile(boot_means, alpha)
    upper = np.quantile(boot_means, 1 - alpha)
    
    return lower, upper

In [12]:
def monte_carlo_random_baseline(series: Dict[TrackKey, Optional[pd.DataFrame]], key_orig: TrackKey, key_deriv: TrackKey, pool: List[TrackKey], n_trials: int = n_trials, n_sample: int = n_sample,) -> tuple[float, List[float]]:
    """
    Monte Carlo simulation of the random baseline. 
    Repeats random sampling N # of times and averages the resulting mean correlations. 
    Returns:
        - mc_mean (float)
        - all_corrs (list of all correlation values across all trials)
    """
    all_corrs:  List[float] = []
    
    for _ in range(n_trials):
        if not pool: 
            break
            
        sample_keys = random.sample(pool, min(n_sample, len(pool)))
        
        for k in sample_keys:
            s_other = series.get(k)
            
            if s_other is None:
                continue
                
            c1 = corr(series[key_orig], s_other)
            if not np.isnan(c1):
                all_corrs.append(c1)
                
            c2 = corr(series[key_deriv], s_other)
            if not np.isnan(c2):
                all_corrs.append(c2)
        
    if len(all_corrs) == 0:
            return np.nan, []
        
    return float(np.mean(all_corrs)), all_corrs

In [13]:
def compute_global_baseline(platform: str, metric: str, weekly: bool, label: str, output_prefix: str,) -> None:
    random.seed(random_seed)
    
    ## Loading of metadata and events. 
    
    song_pairs = load_song_pairs()
    
    print(f"\n=== Baseline for {label} ({platform}:{metric}, weekly={weekly}) ===")
    series = compute_all_metrics(song_pairs, platform=platform, metric=metric, weekly=weekly)
    
    all_track_keys: List[TrackKey] = [
        key for key, df in series.items()
        if df is not None and len(df) >= min_overlap_points
    ]
    
    if not all_track_keys:
        print("No usable weekly stream series has been found.")
        return
    
    print(f"Total usable track series for my baselines: {len(all_track_keys)}")
    
    ## Store pair-level results
    pair_level_rows: List[Dict[str, object]] = []
    
    ## Storing all baseline correlations across all pairs
    baseline_corrs: List[float] = []
    
    unique_pairs = sorted(song_pairs["pair_id"].astype(str).str.strip().unique())
    print(f"Found {len(unique_pairs)} pairs in song_pairs.csv")
    
    for pid in unique_pairs:
        pid = str(pid).strip()
            
        ## Obtaining weekly series for original and derived. 
        
        key_orig: TrackKey = (pid, "original")
        key_deriv: TrackKey = (pid, "derived")
        s_orig = series.get(key_orig)
        s_deriv = series.get(key_deriv)
        
        if s_orig is None or s_deriv is None:
            print(f"{pid}: Missing weekly streams for either original/derived; Skipping random baseline for this pair")
            continue
            
        pearson_pair = corr(s_orig, s_deriv)
        if np.isnan(pearson_pair):
            print(f"{pid}: Not enough overlap between original and derived; skipping.")
            continue
            
        ## Building random pool excluding this pair's two tracks. 
        pool = [k for k in all_track_keys if k not in (key_orig, key_deriv)]
        if not pool:
            print(f"{pid}: No remaining tracks for random sampling; skipping")
            continue
            
        ## Monte Carlo Baseline 
        
        mc_mean, mc_corrs = monte_carlo_random_baseline(series, key_orig, key_deriv, pool, n_trials=50, n_sample=40)
        
        if len(mc_corrs) == 0:
            print(f"{pid}: No valid Monte Carlo baseline correlations were computed/calculated.")
            continue
        
        ## Bootstrap CI
        
        ci_low, ci_high = bootstrap_ci(mc_corrs, n_boot=500, ci=95)
        
        random_avg = mc_mean
        uplift = pearson_pair - random_avg
        n_random_corrs = len(mc_corrs)
        
        ## Extending global baseline correlations (across all pairs)
        
        baseline_corrs.extend(mc_corrs)

        # Obtaining `event_category` from `song_pairs`
        meta = song_pairs[song_pairs["pair_id"].astype(str).str.strip() == pid]
        event_category = meta["event_category"].iloc[0] if not meta.empty else ""

        pair_level_rows.append(
            {
                "pair_id": pid,
                "event_category": event_category,
                "pearson_pair": pearson_pair,
                "random_avg": random_avg,
                "random_ci_low": ci_low,
                "random_ci_high": ci_high,
                "uplift_vs_random": uplift,
                "n_random_tracks_sampled": n_sample,
                "n_random_correlations": n_random_corrs,
            }
        )
    
    ## Saving pair-level results to a df. 
    pair_random_df = pd.DataFrame(pair_level_rows)
    out_pairs = global_corr_dir / f"{output_prefix}_pair_random_baseline.csv"
    pair_random_df.to_csv(out_pairs, index=False)
    print(f"Saved pair-level random baseline results -> {out_pairs}")

    ## Global random baseline average computation.
    if baseline_corrs:
        global_random_avg = float(np.mean(baseline_corrs))
        print(f"Global random baseline Pearson (all comparisons): {global_random_avg:.3f}")
    else:
        global_random_avg = float("nan")
        print("No global random baseline correlations computed.")
        
    ## Event-category summary & global comparison.
    
    if not pair_random_df.empty:
        ## Global mean of pair correlations.
        valid_pairs = pair_random_df.dropna(subset=["pearson_pair"])
        if not valid_pairs.empty:
            global_pair_mean = float(valid_pairs["pearson_pair"].mean())
        else:
            global_pair_mean = float("nan")
            
        ## Group-by-event category.
        grp = (pair_random_df.groupby("event_category", dropna=False).agg(
            n_pairs=("pair_id", "nunique"), 
            avg_pair_corr=("pearson_pair", "mean"),
            avg_random_corr=("random_avg", "mean"),).reset_index())
        
        ## Comparing category averages to global means. 
        
        grp["uplift_vs_global_pair_mean"] = grp["avg_pair_corr"] - global_pair_mean
        grp["uplift_vs_global_random_mean"] = grp["avg_pair_corr"] - global_random_avg
        
        out_ec = global_corr_dir / f"{output_prefix}_correlation_summary_spotify_streams.csv"
        grp.to_csv(out_ec, index=False)
        print(f"Saved event-category summary -> {out_ec}")

        # Optional: paired t-test (pair vs random) across all pairs
        valid_for_ttest = pair_random_df.dropna(subset=["pearson_pair", "random_avg"])
        if len(valid_for_ttest) >= 2:
            t_res = ttest_rel(valid_for_ttest["pearson_pair"], valid_for_ttest["random_avg"])
            print(
                f"\nPaired t-test (pearson_pair vs random_avg): "
                f"t = {t_res.statistic:.3f}, p = {t_res.pvalue:.4g}, "
                f"n = {len(valid_for_ttest)} pairs"
            )
        else:
            print("\nNot enough pairs with both pearson_pair and random_avg for a paired t-test.")
    else:
        print("No pair-level rows computed; hence, I am skipping event-category summary.")
    
    ## Visualization of Baseline Distribution

    plt.figure(figsize=(10, 6))
    sns.histplot(baseline_corrs, kde=True, bins=30)
    plt.title(f"Distribution of Random Baseline Correlations {label}")
    plt.xlabel("Pearson Correlation")
    plt.ylabel("Frequency")

    out_plot = global_corr_dir / f"{output_prefix}_baseline_distribution.png"
    plt.savefig(out_plot, dpi=300, bbox_inches="tight")
    plt.close()

    print(f"Saved baseline distribution figure -> {out_plot}")

In [14]:
if __name__ == "__main__":
    for job in baseline_jobs:
        compute_global_baseline(
            platform=job["platform"],
            metric=job["metric"],
            weekly=job["weekly"],
            label=job["label"],
            output_prefix=job["output_prefix"],
        )


=== Baseline for Spotify Streams (Weekly Changes) (spotify:streams, weekly=True) ===
Total usable track series for my baselines: 50
Found 25 pairs in song_pairs.csv
PAIR023: Not enough overlap between original and derived; skipping.
Saved pair-level random baseline results -> ../reports/visualizations/global_correlation/spotify_streams_pair_random_baseline.csv
Global random baseline Pearson (all comparisons): 0.078
Saved event-category summary -> ../reports/visualizations/global_correlation/spotify_streams_correlation_summary_spotify_streams.csv

Paired t-test (pearson_pair vs random_avg): t = -0.924, p = 0.3651, n = 24 pairs
Saved baseline distribution figure -> ../reports/visualizations/global_correlation/spotify_streams_baseline_distribution.png

=== Baseline for Spotify Popularity (spotify:popularity, weekly=False) ===
Total usable track series for my baselines: 50
Found 25 pairs in song_pairs.csv
PAIR003: Not enough overlap between original and derived; skipping.
Saved pair-level