<a href="https://colab.research.google.com/github/lawrennd/economic-fitness/blob/main/reddit_user_word_complexity_2d.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Wikipedia Editing Data: fitness / complexity analysis

This notebook:

- Downloads a sample of Wikipedia users from BigQuery and aggregates their edits into per-user text.
- Builds a `user` $\times$ `word` matrix and its *support* (analogous to `country` $\times$ `product`).


### BigQuery Setup Instructions

To run the BigQuery query cells in this notebook, you need to have a Google Cloud Project with the BigQuery API enabled and proper authentication setup.

Here's a general guide:

1.  **Google Cloud Account**: If you don't have one, sign up for a Google Cloud account. You might be eligible for a free trial.
    *   [Sign up for Google Cloud](https://cloud.google.com/free)

2.  **Create/Select a Project**: In the [Google Cloud Console](https://console.cloud.google.com/), create a new project or select an existing one.
    *   Ensure that **billing is enabled** for your project, as BigQuery usage incurs costs (though often minimal for small queries, especially with the free tier).

3.  **Enable the BigQuery API**: For your selected project, ensure the BigQuery API is enabled.
    *   Go to the [API Library](https://console.cloud.google.com/apis/library) in the Cloud Console.
    *   Search for "BigQuery API" and enable it if it's not already enabled.

4.  **Authenticate Colab**: In your Colab environment, the `google.colab.auth.authenticate_user()` function (called inside `load_or_query_wikipedia` below) will handle the authentication process by prompting you to log in with your Google account. This provides the necessary credentials for BigQuery access.

    Alternatively, if you are working locally or need specific application-default credentials, you might use the `gcloud` CLI:
    ```bash
    gcloud auth application-default login
    ```

Once these steps are complete, you should be able to run the BigQuery cells successfully!

### 0) Setup

You’ll need BigQuery credentials configured locally (e.g. `gcloud auth application-default login`) and permission to access the public dataset `bigquery-public-data.samples.wikipedia`.

If you don’t have BigQuery access, you can still run the later cells by loading a cached dataframe (see the caching cell below).


In [None]:
# Core
import os
from dataclasses import dataclass, field
from typing import Tuple

import numpy as np
import pandas as pd

# Sparse matrices
import scipy.sparse as sp

# Text features
from sklearn.feature_extraction.text import CountVectorizer

# Plotting
import matplotlib.pyplot as plt


In [None]:
@dataclass(frozen=True)
class QueryConfig:
    """Immutable settings for the Wikipedia author-sampling query.

    The first three fields are passed to BigQuery as query parameters by
    ``load_or_query_wikipedia``; the remaining fields are leftovers from an
    earlier Reddit version of the notebook and are kept only so existing
    code that constructs or reads them keeps working.
    """

    # Config for Wikipedia query
    # Upper bound on the number of sampled authors (SQL ``LIMIT``).
    max_authors: int = 1000
    # An author needs at least this many valid edit comments to be sampled.
    min_comments_per_author: int = 20
    # Cap on how many comments are concatenated into one author's text.
    max_docs_per_author: int = 2000
    # Optional: List of specific users to prioritize/include if they meet criteria.
    # Annotated as a variable-length tuple; ``Tuple[str]`` would denote a
    # tuple of exactly one string.
    # NOTE(review): not consumed by the query visible in this notebook.
    specific_users: Tuple[str, ...] = field(default_factory=tuple)

    # Legacy/Unused for Wikipedia
    target_subreddit: str = "datascience"
    start_suffix: str = "2015_01"
    end_suffix: str = "2015_03"
    max_rows: int = 200_000


# Default configuration: plain random sampling, no hand-picked users.
cfg = QueryConfig()

# Local folder holding cached query results; created on first use.
CACHE_DIR = "data"
os.makedirs(CACHE_DIR, exist_ok=True)

# Cache file for the Wikipedia sample (v4 - random sample). The author cap is
# part of the file name, so different sample sizes use different cache files.
_cache_file = f"wikipedia_authors{cfg.max_authors}_v4.parquet"
CACHE_PATH = os.path.join(CACHE_DIR, _cache_file)

print("Cache path:", CACHE_PATH)

In [None]:
import os
import pandas as pd
import numpy as np
from google.cloud import bigquery
from google.colab import auth
import subprocess


In [None]:
def generate_synthetic_data(cfg):
    """Build a reproducible stand-in dataset for when BigQuery is unavailable.

    One row is produced per synthetic author ``user_0 .. user_{max_authors-1}``.
    Each row holds a space-joined text of 50-499 words drawn from a small
    fixed vocabulary and a comment count drawn from
    ``[cfg.min_comments_per_author, 1000)``.  The generator is seeded, so
    repeated calls with the same ``cfg`` return identical frames.

    Args:
        cfg: object exposing ``max_authors`` and ``min_comments_per_author``.

    Returns:
        DataFrame with columns ``author``, ``user_text`` and ``n_comments``.
    """
    print("Generating synthetic Wikipedia-like data...")
    rng = np.random.default_rng(24)

    # Small fixed vocabulary shared by every synthetic author.
    vocab = ["data", "science", "python", "learning", "machine", "big", "query",
             "analysis", "matrix", "complexity", "fitness", "network", "plot",
             "code", "algorithm", "statistics", "neural", "deep", "model",
             "optimization", "linear", "algebra", "visualization", "mining",
             "edit", "wiki", "page", "revision", "history", "link"]

    def _synthetic_row(author):
        # The draw order (text length, words, comment count) is part of the
        # reproducible random stream, so keep it fixed.
        n_words = rng.integers(50, 500)
        user_text = " ".join(rng.choice(vocab, size=n_words))
        n_comments = rng.integers(cfg.min_comments_per_author, 1000)
        return {"author": author, "user_text": user_text, "n_comments": n_comments}

    rows = [_synthetic_row(f"user_{i}") for i in range(cfg.max_authors)]
    return pd.DataFrame(rows)

def load_or_query_wikipedia(cfg: QueryConfig, cache_path: str) -> pd.DataFrame:
    """Return per-author Wikipedia text, from the parquet cache or BigQuery.

    If ``cache_path`` exists it is read and returned.  Otherwise the function
    authenticates through Colab, looks for a usable Google Cloud project,
    runs the sampling query against ``bigquery-public-data.samples.wikipedia``
    and writes the result to ``cache_path``.

    If anything in the BigQuery path fails, a synthetic frame from
    ``generate_synthetic_data`` is returned instead.  The fallback is
    deliberately NOT written to ``cache_path``: caching it would make every
    later run load synthetic rows as if they were the real sample.

    Args:
        cfg: sampling limits passed to the query as parameters.
        cache_path: parquet file used as the on-disk cache.

    Returns:
        DataFrame with columns ``author``, ``user_text`` and ``n_comments``.
    """
    if os.path.exists(cache_path):
        print("Loading cached dataframe…")
        return pd.read_parquet(cache_path)

    print("No cache found. Querying BigQuery (Wikipedia)…")

    try:
        auth.authenticate_user()

        # Automate project selection
        print("Searching for a valid BigQuery project...")
        client = None

        try:
            # Get list of projects using gcloud
            proc = subprocess.run(
                ['gcloud', 'projects', 'list', '--format=value(projectId)'],
                capture_output=True,
                text=True,
            )
            projects = proc.stdout.strip().split('\n')

            for pid in projects:
                if not pid:
                    continue
                try:
                    print(f"Trying project: {pid}...")
                    c = bigquery.Client(project=pid)
                    # Trivial query: succeeds only if BigQuery is usable here.
                    c.query("SELECT 1").result()
                    print(f"-> Success! Using project: {pid}")
                    client = c
                    break
                except Exception as e:
                    # Best effort: a project that cannot run queries is skipped.
                    print(f"   Skipping {pid}: {e}")

            if client is None:
                raise RuntimeError("Could not find any project with BigQuery enabled.")

        except Exception as e:
            print(f"Automatic project setup failed: {e}")
            raise  # bare raise keeps the original traceback

        # Query Wikipedia samples
        # Clean comments: remove /* Section */ markers and HTML tags
        # Filter out bots: exclude usernames ending in "bot"
        QUERY = """
    WITH edits AS (
      SELECT
        contributor_username AS author,
        REGEXP_REPLACE(comment, r'/\\*.*?\\*/|<[^>]+>', '') AS body
      FROM `bigquery-public-data.samples.wikipedia`
      WHERE
        contributor_username IS NOT NULL
        AND comment IS NOT NULL
        AND NOT REGEXP_CONTAINS(LOWER(contributor_username), r'bot$')
    ),
    valid_edits AS (
      SELECT author, body
      FROM edits
      WHERE LENGTH(TRIM(body)) > 10  -- Filter out empty or very short comments after cleaning
    ),
    sampled_authors AS (
      SELECT author
      FROM valid_edits
      GROUP BY author
      HAVING COUNT(*) >= @min_comments_per_author
      ORDER BY RAND()
      LIMIT @max_authors
    )
    SELECT
      author,
      ARRAY_TO_STRING(ARRAY_AGG(body ORDER BY LENGTH(body) DESC LIMIT @max_docs_per_author), '\\n') AS user_text,
      COUNT(*) AS n_comments
    FROM valid_edits
    JOIN sampled_authors
    USING (author)
    GROUP BY author
    ORDER BY n_comments DESC
    """

        job_config = bigquery.QueryJobConfig(
            query_parameters=[
                bigquery.ScalarQueryParameter("max_authors", "INT64", cfg.max_authors),
                bigquery.ScalarQueryParameter("min_comments_per_author", "INT64", cfg.min_comments_per_author),
                bigquery.ScalarQueryParameter("max_docs_per_author", "INT64", cfg.max_docs_per_author),
            ]
        )

        df = client.query(QUERY, job_config=job_config).to_dataframe()

    except Exception as e:
        print(f"\nBigQuery query failed: {e}")
        print("Falling back to synthetic data so you can continue the tutorial.")
        # Return without caching so a later run with working credentials
        # still queries the real data.
        return generate_synthetic_data(cfg)

    print("Saving cache…")
    df.to_parquet(cache_path, index=False)
    return df



In [None]:
# Show which cache file is consulted; BigQuery is only queried when this
# file does not exist yet.
print(f"Using cache path: {CACHE_PATH}")

df = load_or_query_wikipedia(cfg, CACHE_PATH)

# Quick sanity checks on the loaded sample.
print(df.head())
print("N users:", df.shape[0])
print(df["n_comments"].describe())

In [None]:
# 2) Build user × word matrix
#
# In the paper's language the *support* is M_{uw} = 1{X_{uw} > 0}.
# Counts (not just presence/absence) are kept in X because they help make the
# rank-2 extension identifiable; set BINARY=True for a pure support-only X.

BINARY = False

vectorizer = CountVectorizer(
    binary=BINARY,
    lowercase=True,
    stop_words="english",
    max_features=5000,  # keep it demo-friendly
    min_df=3,           # ignore words used by <3 users
)

# df is loaded from the previous cell; missing texts count as empty documents.
X = vectorizer.fit_transform(df["user_text"].fillna("")).astype(np.float64)

user_ids = df["author"].astype(str).tolist()
vocab = vectorizer.get_feature_names_out()

print(f"Raw Matrix -> Users: {X.shape[0]}, Vocab: {X.shape[1]}, binary: {BINARY}")

# Support mask: same sparsity pattern as X, every stored entry set to 1
# (structural zeros stay off-support).
M = X.copy()
M.data = np.ones_like(M.data)

# Basic margins (analogues of diversification and ubiquity)
user_strength = np.asarray(X.sum(axis=1)).ravel()
word_strength = np.asarray(X.sum(axis=0)).ravel()

print("User strength:", pd.Series(user_strength).describe())
print("Word strength:", pd.Series(word_strength).describe())

# Drop near-empty rows/columns (helps numerics). Both masks are computed from
# the unfiltered margins and then applied together.
min_user_mass = 5
min_word_mass = 5

keep_users = user_strength >= min_user_mass
keep_words = word_strength >= min_word_mass

X, M = (mat[keep_users][:, keep_words] for mat in (X, M))

# Keep the labels aligned with the filtered matrices.
user_ids = [user_ids[i] for i in np.flatnonzero(keep_users)]
vocab = vocab[keep_words]

# Margins of the filtered count matrix.
user_strength = np.asarray(X.sum(axis=1)).ravel()
word_strength = np.asarray(X.sum(axis=0)).ravel()

print(f"After filtering -> Users: {X.shape[0]}, Vocab: {X.shape[1]}")

# Labeled view for plotting and downstream helpers
M_df = pd.DataFrame.sparse.from_spmatrix(M, index=user_ids, columns=vocab)

### 3) Baseline: 1D Pietronero Fitness–Complexity fixed point

This is the usual nonlinear rank-1 fixed point on the **support matrix** $M$ (binary incidence). We’ll compute it as a scalar reference, then move to the rank-2 extension.


In [None]:
def fitness_complexity(M_bin: sp.spmatrix, n_iter: int = 200, tol: float = 1e-10):
    """Iterate the Fitness–Complexity map on a binary incidence matrix.

    Each sweep sets ``F <- M Q`` and ``Q_w <- 1 / sum_u M_uw / F_u``, rescaling
    both vectors to mean 1.  Intermediate values are floored at 1e-12 to avoid
    division by zero.  Iteration stops after ``n_iter`` sweeps, or earlier once
    the largest absolute change of both vectors drops below ``tol`` (a message
    is printed in that case).

    M_bin: scipy sparse matrix (n_users × n_words), entries in {0,1}

    Returns:
      F (n_users,), Q (n_words,), history
      where ``history`` is a dict with lists ``"dF"`` and ``"dQ"`` holding the
      per-sweep maximum absolute change of F and Q.
    """
    incidence = M_bin.tocsr()
    incidence_t = incidence.T
    n_rows, n_cols = incidence.shape

    F = np.ones(n_rows, dtype=float)
    Q = np.ones(n_cols, dtype=float)
    history = {"dF": [], "dQ": []}
    floor = 1e-12

    for it in range(n_iter):
        # Fitness: sum of the complexities on each row's support.
        fit = np.maximum(incidence @ Q, floor)
        fit = fit / fit.mean()

        # Complexity: inverse of sum_u M_{up}/F_u, using the fresh fitness.
        penalty = np.maximum(incidence_t @ (1.0 / fit), floor)
        cplx = 1.0 / penalty
        cplx = cplx / cplx.mean()

        dF = float(np.abs(fit - F).max())
        dQ = float(np.abs(cplx - Q).max())
        history["dF"].append(dF)
        history["dQ"].append(dQ)

        F, Q = fit, cplx

        if max(dF, dQ) < tol:
            print(f"Converged in {it+1} iterations")
            break

    return F, Q, history

def compute_eci_pci(M_bin: sp.spmatrix):
    """Compute ECI/PCI from binary matrix using the standard spectral formulation.

    Args:
        M_bin: incidence matrix (rows = "countries", columns = "products");
            a scipy sparse matrix or anything ``np.asarray`` accepts.

    Returns:
      eci: np.ndarray, standardized (mean 0, std 1) score per kept row
      pci: np.ndarray, standardized (mean 0, std 1) score per kept column

      Rows/columns with zero degree are dropped before the computation, so
      the outputs are shorter than the input dimensions whenever such nodes
      exist; otherwise they align with the input order.

    Raises:
      ValueError: if fewer than two rows with non-zero degree remain.

    Notes:
    - The country-country matrix is C = Dc^-1 M Dp^-1 M^T with Dc = diag(kc),
      Dp = diag(kp).  C is row-stochastic but NOT symmetric, so it must not be
      passed to ``np.linalg.eigh`` (which reads a single triangle only).
      Instead the symmetric matrix S = Dc^-1/2 M Dp^-1 M^T Dc^-1/2 is
      decomposed; S is similar to C, and if S s = lambda s then
      Dc^-1/2 s is an eigenvector of C for the same lambda.
    - The trivial eigenvector corresponds to eigenvalue 1; we use the 2nd largest.
    """
    Mv = M_bin.toarray() if sp.issparse(M_bin) else np.asarray(M_bin)
    Mv = Mv.astype(float)
    kc = Mv.sum(axis=1)
    kp = Mv.sum(axis=0)

    # avoid divide-by-zero: drop zero-degree nodes if any
    keep_c = kc > 0
    keep_p = kp > 0
    Mv = Mv[keep_c][:, keep_p]
    kc = kc[keep_c]
    kp = kp[keep_p]

    if kc.shape[0] < 2:
        raise ValueError("Not enough dimensions for ECI (need at least 2 eigenvectors).")

    # B = Dc^-1/2 M Dp^-1/2 via broadcasting (no dense diagonal matrices);
    # S = B B^T is symmetric positive semi-definite.
    inv_sqrt_kc = 1.0 / np.sqrt(kc)
    B = Mv * inv_sqrt_kc[:, None] / np.sqrt(kp)[None, :]
    S = B @ B.T

    # eigh returns ascending eigenvalues; reorder to descending.
    evals, evecs = np.linalg.eigh(S)
    order = np.argsort(evals)[::-1]
    evecs = evecs[:, order]

    # Map the 2nd eigenvector of S back to an eigenvector of C.
    eci_vec = inv_sqrt_kc * evecs[:, 1]
    # sign is arbitrary; fix by correlating with diversification (positive)
    if np.corrcoef(eci_vec, kc)[0, 1] < 0:
        eci_vec = -eci_vec

    # PCI as projection back to products: Dp^-1 M^T eci
    pci_vec = (Mv.T @ eci_vec) / kp

    # standardize for convenience
    eci = (eci_vec - eci_vec.mean()) / (eci_vec.std(ddof=0) + 1e-12)
    pci = (pci_vec - pci_vec.mean()) / (pci_vec.std(ddof=0) + 1e-12)

    return eci, pci

def sinkhorn_masked(
    M_bin: sp.spmatrix,
    r: np.ndarray,
    c: np.ndarray,
    n_iter: int = 2000,
    tol: float = 1e-12,
    eps: float = 1e-30,
) -> tuple[np.ndarray, np.ndarray, sp.spmatrix, dict]:
    """Sinkhorn/IPF scaling restricted to a *support mask*.

    With K = M_bin (in CSR form), alternately rescale rows and columns to
    find vectors u, v such that

      W = diag(u) K diag(v)

    has row sums r and column sums c.  K is never densified.

    Args:
        M_bin: sparse support matrix (rows × cols).
        r: target row marginals, one entry per row of K.
        c: target column marginals, one entry per column of K.  If the totals
            of r and c differ, c is rescaled to the total of r.
        n_iter: maximum number of row/column sweeps.
        tol: stop once both maximum absolute marginal errors are below this.
        eps: floor applied to denominators to avoid division by zero.

    Returns:
        (u, v, W, history) where history has per-sweep lists ``"dr"`` and
        ``"dc"`` (max absolute row/column marginal error) and ``"iters"``.

    Raises:
        ValueError: on shape mismatch, non-positive total mass, or positive
            target mass on a row/column without any support.
    """
    K = M_bin.tocsr()
    K_t = K.T

    r = np.asarray(r, dtype=float).ravel()
    c = np.asarray(c, dtype=float).ravel()

    if (K.shape[0], K.shape[1]) != (r.shape[0], c.shape[0]):
        raise ValueError(f"Shape mismatch: K is {K.shape}, r is {r.shape}, c is {c.shape}.")

    # Bring c onto the same total mass as r when the caller's totals differ.
    row_total = float(r.sum())
    col_total = float(c.sum())
    if row_total <= 0 or col_total <= 0:
        raise ValueError("r and c must have positive total mass")
    if abs(row_total - col_total) > 1e-12 * max(row_total, col_total):
        c = c * (row_total / col_total)

    # Feasibility guard: an isolated node cannot carry positive mass.
    row_isolated = np.asarray(K.sum(axis=1)).ravel() == 0
    col_isolated = np.asarray(K.sum(axis=0)).ravel() == 0
    if (row_isolated & (r > 0)).any():
        raise ValueError("Infeasible (r>0 on a row with zero support)")
    if (col_isolated & (c > 0)).any():
        raise ValueError("Infeasible (c>0 on a col with zero support)")

    u = np.ones(K.shape[0], dtype=float)
    v = np.ones(K.shape[1], dtype=float)
    history = {"dr": [], "dc": [], "iters": 0}

    for sweep in range(1, n_iter + 1):
        # Row update followed by column update.
        u_next = r / np.maximum(K @ v, eps)
        v_next = c / np.maximum(K_t @ u_next, eps)

        # Marginals of diag(u) K diag(v), evaluated without forming W.
        row_err = float(np.abs(u_next * (K @ v_next) - r).max())
        col_err = float(np.abs(v_next * (K_t @ u_next) - c).max())
        history["dr"].append(row_err)
        history["dc"].append(col_err)
        history["iters"] = sweep

        u, v = u_next, v_next

        if max(row_err, col_err) < tol:
            break

    # Sparse-safe construction of W (avoid 1D broadcasting surprises)
    W = sp.diags(u) @ K @ sp.diags(v)
    return u, v, W, history


In [None]:
# Baseline scores on the binary support: Fitness/Complexity and ECI/PCI.
F, Q, fc_hist = fitness_complexity(M)
eci, pci = compute_eci_pci(M)

# Attach user / word labels to the raw score arrays.
F_s = pd.Series(F, index=user_ids, name="Fitness")
eci_s = pd.Series(eci, index=user_ids, name="ECI")
Q_s = pd.Series(Q, index=vocab, name="Complexity")
pci_s = pd.Series(pci, index=vocab, name="PCI")

# Degrees of the support: words per user (kc) and users per word (kp).
kc = pd.Series(np.asarray(M.sum(axis=1)).ravel(), index=user_ids, name="diversification_kc")
kp = pd.Series(np.asarray(M.sum(axis=0)).ravel(), index=vocab, name="ubiquity_kp")

# Sinkhorn/IPF scaling to build a feasible flow W on the support.
# Note: uniform marginals are *not always feasible* on a sparse support mask.
USE_UNIFORM_MARGINALS = False

if USE_UNIFORM_MARGINALS:
    r = np.ones(M.shape[0], dtype=float)
    c = np.ones(M.shape[1], dtype=float)
else:
    # Always-feasible choice: marginals implied by edge-uniform mass on the support
    r = kc.to_numpy(dtype=float)
    c = kp.to_numpy(dtype=float)

# Normalise to unit total mass. Done out of place because r / c may share
# memory with kc / kp.
r = r / r.sum()
c = c / c.sum()

u, v, W, sk_hist = sinkhorn_masked(M, r=r, c=c)

# Summary tables, best-scoring entries first.
results_countries = pd.concat([F_s, eci_s, kc], axis=1).sort_values("Fitness", ascending=False)
results_products = pd.concat([Q_s, pci_s, kp], axis=1).sort_values("Complexity", ascending=False)

word_scores_1d = Q_s.sort_values(ascending=False)
user_scores_1d = F_s.sort_values(ascending=False)

for _heading, _scores in (
    ("Top 20 words by complexity:", word_scores_1d),
    ("Top 20 users by fitness:", user_scores_1d),
):
    print(_heading)
    print(_scores.head(20))

In [None]:
# Show the 15 users with the highest Fitness (user_scores_1d is sorted in
# descending order); as the cell's last expression it is rendered as output.
user_scores_1d.head(15)

## Flow-native visualisations (Sinkhorn/OT coupling) + ranked barcodes

The objects we visualise here are:

- binary support: `M` (user×word here; the helper code below keeps the original country×product naming)
- Sinkhorn/IPF scaling factors: `u`, `v` (dual variables)
- coupling / feasible flow: `W` where `W = diag(u) * M * diag(v)` (on the support)

To avoid “hairballs”, every flow plot below supports **top-k / top-edge filtering**.

In [None]:
# Diagnostics: convergence
# Left panel: per-iteration change of F and Q; right panel: Sinkhorn marginal
# errors. Both use a log y-axis.
fig, ax = plt.subplots(1, 2, figsize=(10, 3))

_panels = (
    (ax[0], fc_hist, (("dF", "max |ΔF|"), ("dQ", "max |ΔQ|")), "FC convergence"),
    (
        ax[1],
        sk_hist,
        (("dr", "max row marginal error"), ("dc", "max col marginal error")),
        "Sinkhorn/IPF convergence",
    ),
)
for _axis, _hist, _series, _heading in _panels:
    for _key, _label in _series:
        _axis.plot(_hist[_key], label=_label)
    _axis.set_yscale("log")
    _axis.set_title(_heading)
    _axis.legend()

plt.tight_layout()
plt.show()

# Diagnostics: nestedness-like visualization (sort by Fitness/Complexity)
# Rows/columns follow the descending Fitness / Complexity order of the
# result tables.
M_sorted = M_df.loc[results_countries.index, results_products.index]
_support_image = M_sorted.sparse.to_dense().to_numpy()

plt.figure(figsize=(10, 4))
plt.imshow(_support_image, aspect="auto", interpolation="nearest", cmap="Greys")
plt.title("M sorted by Fitness (rows) and Complexity (cols)")
plt.xlabel("words")
plt.ylabel("users")
plt.tight_layout()
plt.show()

# Diagnostics: compare rankings
plt.figure(figsize=(5, 4))
plt.scatter(
    results_countries["ECI"],
    results_countries["Fitness"],
    s=15,
    alpha=0.7,
)
plt.xlabel("ECI (standardized)")
plt.ylabel("Fitness")
plt.title("Countries: Fitness vs ECI")
plt.tight_layout()
plt.show()


In [None]:
import matplotlib as mpl
from matplotlib.patches import Polygon
from matplotlib.path import Path
from matplotlib.patches import PathPatch


def _to_flow_df(M: pd.DataFrame, W: sp.spmatrix | np.ndarray) -> pd.DataFrame:
    if sp.issparse(W):
        # sparse-safe: keep as sparse frame
        return pd.DataFrame.sparse.from_spmatrix(W, index=M.index, columns=M.columns)

    W_df = pd.DataFrame(W, index=M.index, columns=M.columns)
    # keep strictly on support (in case numerical noise fills zeros)
    return W_df.where(M.astype(bool), other=0.0)


def _top_subset(W_df: pd.DataFrame, top_c: int = 20, top_p: int = 30, by: str = "mass") -> pd.DataFrame:
    """Return a filtered W_df restricted to top countries/products.

    by:
      - "mass": uses row/col sums of W_df
      - "fitness_complexity": uses global Series F/Q if present
    """
    if by == "fitness_complexity" and "F" in globals() and "Q" in globals():
        c_idx = list(pd.Series(F).sort_values(ascending=False).index[:top_c])
        p_idx = list(pd.Series(Q).sort_values(ascending=False).index[:top_p])
    else:
        c_idx = list(W_df.sum(axis=1).sort_values(ascending=False).index[:top_c])
        p_idx = list(W_df.sum(axis=0).sort_values(ascending=False).index[:top_p])
    return W_df.loc[c_idx, p_idx]


def plot_circular_bipartite_flow(
    W_df: pd.DataFrame,
    max_edges: int = 350,
    min_edge_mass: float | None = None,
    color_by: str = "country",
    title: str = "Circular bipartite flow (line-weighted, filtered)",
):
    """Chord-style circular bipartite flow using Bezier curves.

    Notes:
    - This draws *curves* (not full ribbons) with linewidth ∝ w_cp.
    - Filter to top edges to avoid hairballs.
    """
    countries = list(W_df.index)
    products = list(W_df.columns)

    edges = (
        W_df.stack()
        .rename("w")
        .reset_index()
        .rename(columns={"level_0": "country", "level_1": "product"})
    )

    edges = edges[edges["w"] > 0].sort_values("w", ascending=False)
    if min_edge_mass is not None:
        edges = edges[edges["w"] >= float(min_edge_mass)]
    edges = edges.head(max_edges)

    if len(edges) == 0:
        print("No edges to plot after filtering.")
        return

    # angles: countries on left semicircle, products on right semicircle
    n_c, n_p = len(countries), len(products)
    theta_c = np.linspace(np.pi / 2, 3 * np.pi / 2, n_c, endpoint=False)
    theta_p = np.linspace(-np.pi / 2, np.pi / 2, n_p, endpoint=False)

    def pol2cart(theta, r=1.0):
        return np.array([r * np.cos(theta), r * np.sin(theta)])

    pos_c = {c: pol2cart(theta_c[i]) for i, c in enumerate(countries)}
    pos_p = {p: pol2cart(theta_p[j]) for j, p in enumerate(products)}

    # colors
    if color_by == "product":
        cmap = plt.get_cmap("tab20")
        colors = {p: cmap(i % 20) for i, p in enumerate(products)}
        edge_color = lambda row: colors[row["product"]]
    else:
        cmap = plt.get_cmap("tab20")
        colors = {c: cmap(i % 20) for i, c in enumerate(countries)}
        edge_color = lambda row: colors[row["country"]]

    w = edges["w"].to_numpy()
    wmax = float(w.max())
    # linewidth scaling (tuned to look OK for typical normalized W)
    lw = 0.2 + 6.0 * (w / (wmax + 1e-30)) ** 0.75

    fig, ax = plt.subplots(figsize=(9, 9))
    ax.set_aspect("equal")
    ax.axis("off")

    # node labels (lightweight)
    for c in countries:
        x, y = pos_c[c]
        ax.plot([x], [y], marker="o", ms=3, color="black")
    for p in products:
        x, y = pos_p[p]
        ax.plot([x], [y], marker="o", ms=3, color="black")

    # edges as cubic Beziers through center
    for i, row in enumerate(edges.itertuples(index=False)):
        c = row.country
        p = row.product
        x0, y0 = pos_c[c]
        x1, y1 = pos_p[p]
        # control points closer to center
        c0 = np.array([0.35 * x0, 0.35 * y0])
        c1 = np.array([0.35 * x1, 0.35 * y1])

        verts = [(x0, y0), (c0[0], c0[1]), (c1[0], c1[1]), (x1, y1)]
        codes = [Path.MOVETO, Path.CURVE4, Path.CURVE4, Path.CURVE4]
        path = Path(verts, codes)
        patch = PathPatch(path, facecolor="none", edgecolor=edge_color(row._asdict()), lw=lw[i], alpha=0.55)
        ax.add_patch(patch)

    ax.set_title(title + f"\n(top {len(edges)} edges)")
    plt.show()


def plot_alluvial_bipartite(
    W_df: pd.DataFrame,
    max_edges: int = 250,
    min_edge_mass: float | None = None,
    title: str = "Alluvial (Sankey-style) bipartite flow (filtered)",
):
    """Alluvial/Sankey-style plot in pure Matplotlib.

    Draws stacked nodes on left (countries) and right (products),
    with polygon bands for the largest flows.
    """
    edges = (
        W_df.stack()
        .rename("w")
        .reset_index()
        .rename(columns={"level_0": "country", "level_1": "product"})
    )
    edges = edges[edges["w"] > 0].sort_values("w", ascending=False)
    if min_edge_mass is not None:
        edges = edges[edges["w"] >= float(min_edge_mass)]
    edges = edges.head(max_edges)

    if len(edges) == 0:
        print("No edges to plot after filtering.")
        return

    countries = list(pd.Index(edges["country"]).unique())
    products = list(pd.Index(edges["product"]).unique())

    # total mass per node (restricted to displayed edges)
    out_mass = edges.groupby("country")["w"].sum().reindex(countries)
    in_mass = edges.groupby("product")["w"].sum().reindex(products)

    # normalize heights to 1
    out_mass = out_mass / out_mass.sum()
    in_mass = in_mass / in_mass.sum()

    # vertical packing with padding
    pad = 0.01

    def pack(masses: pd.Series):
        spans = {}
        y = 0.0
        for k, v in masses.items():
            y0 = y
            y1 = y + float(v)
            spans[k] = [y0, y1]
            y = y1 + pad
        # rescale to [0,1]
        total = y - pad
        for k in spans:
            spans[k][0] /= total
            spans[k][1] /= total
        return spans

    span_c = pack(out_mass)
    span_p = pack(in_mass)

    # allocate sub-spans per edge within each node
    c_cursor = {c: span_c[c][0] for c in countries}
    p_cursor = {p: span_p[p][0] for p in products}

    cmap = plt.get_cmap("tab20")
    c_color = {c: cmap(i % 20) for i, c in enumerate(countries)}

    fig, ax = plt.subplots(figsize=(11, 7))
    ax.axis("off")

    xL, xR = 0.1, 0.9
    node_w = 0.03

    # draw nodes
    for c in countries:
        y0, y1 = span_c[c]
        ax.add_patch(Polygon([[xL - node_w, y0], [xL, y0], [xL, y1], [xL - node_w, y1]], closed=True, color="black", alpha=0.15))
        ax.text(xL - node_w - 0.01, (y0 + y1) / 2, str(c), ha="right", va="center", fontsize=8)

    for p in products:
        y0, y1 = span_p[p]
        ax.add_patch(Polygon([[xR, y0], [xR + node_w, y0], [xR + node_w, y1], [xR, y1]], closed=True, color="black", alpha=0.15))
        ax.text(xR + node_w + 0.01, (y0 + y1) / 2, str(p), ha="left", va="center", fontsize=8)

    # bands
    for row in edges.itertuples(index=False):
        c = row.country
        p = row.product
        w = float(row.w)

        # band thickness within each stacked node span (relative to node mass)
        dc = w / float(edges[edges["country"] == c]["w"].sum()) * (span_c[c][1] - span_c[c][0])
        dp = w / float(edges[edges["product"] == p]["w"].sum()) * (span_p[p][1] - span_p[p][0])

        y0c, y1c = c_cursor[c], c_cursor[c] + dc
        y0p, y1p = p_cursor[p], p_cursor[p] + dp
        c_cursor[c] = y1c
        p_cursor[p] = y1p

        # simple 4-point polygon band (looks OK with alpha)
        poly = Polygon(
            [[xL, y0c], [xR, y0p], [xR, y1p], [xL, y1c]],
            closed=True,
            facecolor=c_color[c],
            edgecolor="none",
            alpha=0.45,
        )
        ax.add_patch(poly)

    ax.set_title(title + f"\n(top {len(edges)} edges)")
    plt.show()


def plot_dual_potential_bipartite(
    M: pd.DataFrame,
    W_df: pd.DataFrame,
    u: np.ndarray,
    v: np.ndarray,
    max_edges: int = 400,
    title: str = "Dual potentials (log u, log v) with flow edges",
):
    """Two-column bipartite plot of the Sinkhorn dual potentials.

    Rows of ``M`` ("countries") are placed at x=0 and columns ("products") at
    x=1, each ordered vertically by its log scaling factor (``log u`` resp.
    ``log v``).  Node colour encodes that potential; the heaviest
    ``max_edges`` positive entries of ``W_df`` are drawn as straight lines
    with thickness growing with the flow.

    ``M`` is only used for its index/columns; ``u`` and ``v`` must be ordered
    like them.  If ``W_df`` has no positive entry, a message is printed and
    nothing is drawn.
    """
    countries = list(M.index)
    products = list(M.columns)

    # Log potentials; the tiny offset keeps log() finite for zero factors.
    phi = pd.Series(np.log(u + 1e-30), index=countries)
    psi = pd.Series(np.log(v + 1e-30), index=products)

    # order by potential for a clean “landscape”
    c_order = list(phi.sort_values().index)
    p_order = list(psi.sort_values().index)

    # pick top edges globally
    long_form = W_df.loc[c_order, p_order].stack().rename("w").reset_index()
    long_form = long_form.rename(columns={"level_0": "country", "level_1": "product"})
    positive = long_form[long_form["w"] > 0]
    edges = positive.sort_values("w", ascending=False).head(max_edges)

    if edges.empty:
        print("No edges to plot.")
        return

    # positions: vertical slot = rank in the potential ordering
    y_c = dict(zip(c_order, range(len(c_order))))
    y_p = dict(zip(p_order, range(len(p_order))))
    x_c, x_p = 0.0, 1.0

    # color mapping: clip the colour scale to the 5th-95th percentile range
    all_potentials = np.concatenate([phi.to_numpy(), psi.to_numpy()])
    vmin, vmax = np.percentile(all_potentials, [5, 95])
    norm = mpl.colors.Normalize(vmin=vmin, vmax=vmax)
    cmap = plt.get_cmap("coolwarm")

    fig, ax = plt.subplots(figsize=(10, 8))

    # edges
    w = edges["w"].to_numpy()
    wmax = float(w.max())
    lw = 0.2 + 4.5 * (w / (wmax + 1e-30)) ** 0.7

    for row, width in zip(edges.itertuples(index=False), lw):
        ax.plot(
            [x_c, x_p],
            [y_c[row.country], y_p[row.product]],
            color="black",
            alpha=0.12,
            lw=width,
        )

    # nodes
    ax.scatter(
        [x_c] * len(c_order),
        [y_c[c] for c in c_order],
        c=[cmap(norm(phi[c])) for c in c_order],
        s=18,
        edgecolor="none",
    )
    ax.scatter(
        [x_p] * len(p_order),
        [y_p[p] for p in p_order],
        c=[cmap(norm(psi[p])) for p in p_order],
        s=18,
        edgecolor="none",
    )

    ax.set_yticks([])
    ax.set_xticks([x_c, x_p])
    ax.set_xticklabels(["countries", "products"])
    ax.set_title(title + f"\n(node color = log dual, top {len(edges)} edges)")

    sm = mpl.cm.ScalarMappable(norm=norm, cmap=cmap)
    cbar = plt.colorbar(sm, ax=ax, fraction=0.03, pad=0.02)
    cbar.set_label("dual potential (log scale)")

    plt.tight_layout()
    plt.show()


def plot_ranked_barcodes(
    results_countries: pd.DataFrame,
    results_products: pd.DataFrame,
    top_n: int = 40,
    title: str = "Ranked barcodes (Fitness/Complexity) with degree overlays",
):
    """Side-by-side rank plots for the top rows of both result tables.

    Left panel: the ``top_n`` countries by ``Fitness`` as bars, with
    ``diversification_kc`` overlaid as a line on a secondary y-axis.
    Right panel: the ``top_n`` products by ``Complexity`` as bars, with
    ``ubiquity_kp`` overlaid the same way.
    """
    rc = results_countries.sort_values("Fitness", ascending=False).head(top_n)
    rp = results_products.sort_values("Complexity", ascending=False).head(top_n)

    fig, ax = plt.subplots(1, 2, figsize=(12, 4))

    # (axis, table, score column, degree column, degree label, line colour, heading)
    panels = (
        (ax[0], rc, "Fitness", "diversification_kc", "diversification (kc)",
         "tab:blue", f"Countries (top {len(rc)})"),
        (ax[1], rp, "Complexity", "ubiquity_kp", "ubiquity (kp)",
         "tab:orange", f"Products (top {len(rp)})"),
    )

    for axis, table, score, degree, degree_label, line_color, heading in panels:
        ranks = range(len(table))

        # Score as bars on the primary axis.
        axis.bar(ranks, table[score].to_numpy(), color="black", alpha=0.6)
        axis.set_title(heading)
        axis.set_xlabel("rank")
        axis.set_ylabel(score)

        # Degree as a line on a secondary axis, since its scale differs.
        overlay = axis.twinx()
        overlay.plot(ranks, table[degree].to_numpy(), color=line_color, lw=1.5)
        overlay.set_ylabel(degree_label)

    fig.suptitle(title)
    plt.tight_layout()
    plt.show()
