In [1]:
###############################################################################
# Compute Modularity of the graph
###############################################################################

import numpy as np
import networkx as nx
from networkx.algorithms import community

def _partition_from_abs_weights(G):
    """Detect communities by greedy modularity maximisation on |weight|.

    The graph is converted with ``G.to_undirected()`` and a scratch
    ``nx.Graph`` is built that carries the absolute value of every edge
    weight; community detection runs on that scratch graph only, so the
    signed weights stored on ``G`` are left untouched.

    Parameters
    ----------
    G : networkx graph
        Graph whose edges may carry a signed ``'weight'`` attribute. An edge
        without the attribute is treated as weight 1.0; an edge whose
        absolute weight is not strictly positive is left out of detection.

    Returns
    -------
    comms : list
        One collection of nodes per community. When no edge survives the
        filter, every node becomes its own singleton community.
    node2comm : dict
        Maps each node to the index of its community in ``comms``.
    sizes : list of int
        Number of nodes in each community, aligned with ``comms``.
    """
    Gu = G.to_undirected()

    # Scratch graph holding |weight|, used for detection only.
    Gu_abs = nx.Graph()
    Gu_abs.add_nodes_from(Gu.nodes(data=True))
    has_edges = False
    for u, v, d in Gu.edges(data=True):
        w = abs(d.get('weight', 1.0))
        if w > 0:
            Gu_abs.add_edge(u, v, weight=w)
            has_edges = True

    if has_edges:
        comms = list(community.greedy_modularity_communities(Gu_abs, weight='weight'))
    else:
        # Modularity is undefined when the total edge weight is zero.
        # NOTE(review): older NetworkX releases are believed to raise
        # ZeroDivisionError for an edgeless graph here - confirm against the
        # pinned version. Falling back to singletons keeps the caller alive.
        comms = [frozenset([n]) for n in Gu_abs.nodes()]

    # Community index == position in `comms`.
    sizes = [len(members) for members in comms]
    node2comm = {n: cid for cid, members in enumerate(comms) for n in members}
    return comms, node2comm, sizes

def _signed_modularity(G, node2comm):
    """Compute Q_pos, Q_neg, Q_signed = Q_pos - Q_neg on an undirected view."""
    Gu = G.to_undirected()

    # strengths (per sign) and 2m (per sign)
    k_pos = {n: 0.0 for n in Gu.nodes()}
    k_neg = {n: 0.0 for n in Gu.nodes()}
    in_pos = {}   # internal positive weight per community
    in_neg = {}   # internal negative (abs) weight per community
    tot_pos = {}  # total positive strength per community
    tot_neg = {}  # total negative (abs) strength per community

    m_pos = 0.0
    m_neg = 0.0

    # strengths and m (sum of weights / 2)
    for u, v, d in Gu.edges(data=True):
        w = float(d.get('weight', 0.0))
        if w > 0:
            k_pos[u] += w; k_pos[v] += w
            m_pos += w
        elif w < 0:
            a = abs(w)
            k_neg[u] += a; k_neg[v] += a
            m_neg += a
    m_pos *= 0.5
    m_neg *= 0.5

    # initialize per-community accumulators
    comm_ids = set(node2comm.values())
    for c in comm_ids:
        in_pos[c] = 0.0; in_neg[c] = 0.0
        tot_pos[c] = 0.0; tot_neg[c] = 0.0

    for n, c in node2comm.items():
        tot_pos[c] += k_pos[n]
        tot_neg[c] += k_neg[n]

    # internal weights (count each undirected edge once)
    for u, v, d in Gu.edges(data=True):
        cu, cv = node2comm.get(u, -1), node2comm.get(v, -1)
        if cu == cv and cu != -1:
            w = float(d.get('weight', 0.0))
            if w > 0:
                in_pos[cu] += w
            elif w < 0:
                in_neg[cu] += abs(w)

    # modularities (if denominator is zero, define as 0)
    Q_pos = 0.0
    if m_pos > 0:
        for c in comm_ids:
            Q_pos += (in_pos[c] / (2*m_pos)) - (tot_pos[c] / (2*m_pos))**2

    Q_neg = 0.0
    if m_neg > 0:
        for c in comm_ids:
            Q_neg += (in_neg[c] / (2*m_neg)) - (tot_neg[c] / (2*m_neg))**2

    Q_signed = Q_pos - Q_neg
    return Q_pos, Q_neg, Q_signed


In [2]:
def add_community_metrics_signed(G, metrics):
    """Add community-structure metrics for ``G`` to ``metrics``.

    Communities are detected on absolute edge weights, then the partition is
    scored against the original signed weights.

    Parameters
    ----------
    G : networkx graph
        Graph with a signed ``'weight'`` edge attribute.
    metrics : dict
        Updated in place (and returned) with the keys:

        - ``num_communities``: number of detected communities.
        - ``community_entropy``: Shannon entropy (bits) of the community-size
          distribution; NaN when there are no nodes.
        - ``modularity_pos``: modularity of the positive layer.
        - ``modularity_neg``: modularity of the negative layer, stored with
          its sign flipped (``-Q_neg``).
        - ``modularity_signed``: ``Q_pos - Q_neg``.
        - ``comm_top1_frac`` .. ``comm_top3_frac``: node fraction of the three
          largest communities. Always written; NaN where fewer than three
          communities exist, so a reused dict never keeps a stale value.
        - ``comm_rest_frac``: node fraction outside the top three.
        - ``communities``: ``{'communities': {index: [nodes...]}}``.

    Returns
    -------
    dict
        The same ``metrics`` object.
    """
    # 1) detect the partition on |w|
    comms, node2comm, sizes = _partition_from_abs_weights(G)

    # 2) score it on the original signed weights
    Q_pos, Q_neg, Q_signed = _signed_modularity(G, node2comm)

    sizes_arr = np.asarray(sizes, dtype=float)
    n = float(sizes_arr.sum()) if sizes_arr.size else 0.0

    if n > 0:
        p = sizes_arr / n
        nonzero = p[p > 0]
        # "+ 0.0" turns the -0.0 produced for a single community into 0.0.
        H = float(-(nonzero * np.log2(nonzero)).sum()) + 0.0
        top3 = [float(x) for x in np.sort(p)[::-1][:3]]
        # Pad so all three keys are always present.
        top3 += [np.nan] * (3 - len(top3))
        # Clamp: float round-off can push the remainder a hair below zero.
        rest = max(0.0, 1.0 - float(np.nansum(top3)))
    else:
        H = np.nan
        top3 = [np.nan, np.nan, np.nan]
        rest = np.nan

    # store
    metrics['num_communities']    = len(sizes)
    metrics['community_entropy']  = H
    metrics['modularity_pos']     = Q_pos
    metrics['modularity_neg']     = -Q_neg  # sign flipped on purpose; see docstring
    metrics['modularity_signed']  = Q_signed

    # top-3 size fractions for the QA panel
    for i, val in enumerate(top3, 1):
        metrics[f'comm_top{i}_frac'] = float(val)
    metrics['comm_rest_frac'] = rest

    # keep communities for plotting
    metrics['communities'] = {'communities': {i: list(c) for i, c in enumerate(comms)}}
    return metrics


In [None]:
# Attach the community / modularity metrics to the running metrics dict.
# NOTE(review): `G` and `metrics` are not defined in the visible cells -
# presumably created by earlier cells; confirm this survives Restart & Run All.
metrics = add_community_metrics_signed(G, metrics)


In [3]:
import matplotlib.pyplot as plt
import os

def plot_community_QA(df_ts, save_dir=None):
    """Draw a two-panel QA figure of community metrics over time.

    Panel A plots ``modularity_pos`` and ``modularity_signed``; panel B plots
    the node fractions of the three largest communities, with the number of
    communities on a secondary y-axis.

    Parameters
    ----------
    df_ts : pandas.DataFrame
        Must contain the columns ``modularity_pos``, ``modularity_signed``,
        ``comm_top1_frac``, ``comm_top2_frac``, ``comm_top3_frac`` and
        ``num_communities``. Rows are plotted in sorted index order.
    save_dir : str or path-like, optional
        When given, the figure is written to
        ``<save_dir>/community_QA_panel.png``; the directory is created if it
        does not exist yet.

    Returns
    -------
    None
        If any required column is missing, a message is printed and nothing
        is drawn.
    """
    needed = ['modularity_pos','modularity_signed',
              'comm_top1_frac','comm_top2_frac','comm_top3_frac',
              'num_communities']
    missing = [c for c in needed if c not in df_ts.columns]
    if missing:
        print("Missing columns for QA panel:", missing)
        return

    df = df_ts.sort_index()

    fig, axes = plt.subplots(2, 1, figsize=(9, 7), sharex=True)
    ax_mod, ax_size = axes

    # Panel A: modularity
    ax_mod.plot(df.index, df['modularity_pos'],    marker='o', label='Q_pos (facilitation)')
    ax_mod.plot(df.index, df['modularity_signed'], marker='o', label='Q_signed = Q_pos − Q_neg')
    ax_mod.axhline(0, color='k', lw=0.6, alpha=0.5)
    ax_mod.set_ylabel("Modularity")
    ax_mod.set_title("Community QA — modularity over time")
    ax_mod.grid(alpha=0.3)
    ax_mod.legend()

    # Panel B: sizes and #communities
    size_series = [('comm_top1_frac', 'Largest comm (frac)'),
                   ('comm_top2_frac', '2nd largest'),
                   ('comm_top3_frac', '3rd largest')]
    for col, label in size_series:
        ax_size.plot(df.index, df[col], marker='o', label=label)
    ax_size.set_ylabel("Community size fraction")
    ax_size.grid(alpha=0.3)

    ax_count = ax_size.twinx()
    ax_count.plot(df.index, df['num_communities'], linestyle='--', marker='s', alpha=0.6, label='# communities')
    ax_count.set_ylabel("# communities")

    # One legend for both y-axes of panel B.
    lines1, labels1 = ax_size.get_legend_handles_labels()
    lines2, labels2 = ax_count.get_legend_handles_labels()
    ax_size.legend(lines1+lines2, labels1+labels2, loc='upper right')

    ax_size.set_xlabel("Tick")
    plt.tight_layout()

    if save_dir:
        # Without this, savefig raises FileNotFoundError for a new directory.
        os.makedirs(save_dir, exist_ok=True)
        out = os.path.join(save_dir, "community_QA_panel.png")
        plt.savefig(out, dpi=300, bbox_inches='tight')
        print("Saved QA panel →", out)

    plt.show()


In [None]:
# Render the QA panel and save it as a PNG under `output_dir`.
# NOTE(review): `df_ts` and `output_dir` are not defined in the visible cells -
# presumably built by earlier cells; confirm this survives Restart & Run All.
plot_community_QA(df_ts, save_dir=output_dir)
