In [1]:
import os

import dill as pickle
import matplotlib.pyplot as plt
import pandas as pd
import pynvml  # type: ignore[import]
from datasets import load_from_disk

from character.constants import DATA_PATH, FIGURE_PATH


In [2]:
# Base checkpoints and the constitution suffixes appended to them
# (the empty suffix stands for the untouched base model).
models = ["llama-3.1-8b-it", "qwen-2.5-7b-it", "gemma-3-4b-it"]
constitutions = ["-goodness", "-loving", "-misalignment", ""]

# Display labels for each base checkpoint and each trained constitution.
_model_labels = dict(zip(models, ["Llama 3.1 8B", "Qwen 2.5 7B", "Gemma 3 4B"]))
_constitution_labels = dict(zip(constitutions[:3], ["Flourishing", "Loving", "Misalignment"]))

# Checkpoint identifier -> human-readable name: trained variants first
# (grouped by model), then the three base models.
names = {
    f"{model_id}{suffix}": f"{model_label} ({constitution_label})"
    for model_id, model_label in _model_labels.items()
    for suffix, constitution_label in _constitution_labels.items()
}
names.update(_model_labels)

In [3]:
def calculate_elo_ratings(preferences, model_name, normalize=False, k_factor=32):
    """
    Compute Elo ratings for traits from a sequence of pairwise comparisons.

    Every trait starts at 1000 and the comparisons are replayed in order with
    the standard Elo update (logistic expectation on a 400-point scale).

    Args:
        preferences: mapping of model name -> iterable of
            (trait_1, trait_2, winner) triples. It is iterated twice, so it
            must be re-iterable (e.g. a list).
        model_name: key into `preferences` selecting which comparisons to use.
        normalize: if True, min-max rescale the final ratings to [0, 1]
            (skipped when all ratings are equal or there are no traits).
        k_factor: Elo K-factor, i.e. the maximum rating change per comparison.

    Returns:
        List of (trait, rating) tuples, ratings rounded to 2 decimals, sorted
        by rating in descending order. Traits with equal ratings keep the
        order in which they first appear in the comparisons.

    Raises:
        KeyError: if `model_name` is not a key of `preferences`.
    """
    comparisons = preferences[model_name]

    # Seed every trait at 1000 in first-seen order. A dict (not a set) is used
    # on purpose: set iteration order depends on string hash randomisation,
    # which would make the order of tied traits change from run to run.
    elo_ratings = {}
    for trait1, trait2, _ in comparisons:
        elo_ratings.setdefault(trait1, 1000.0)
        elo_ratings.setdefault(trait2, 1000.0)

    # replay the comparisons in order
    for trait1, trait2, winner in comparisons:
        if winner == trait1:
            outcome1, outcome2 = 1, 0
        elif winner == trait2:
            outcome1, outcome2 = 0, 1
        else:
            # no clear winner, judge rambled -> ratings stay untouched
            continue

        # current ratings
        r1 = elo_ratings[trait1]
        r2 = elo_ratings[trait2]

        # expected scores under the Elo model
        e1 = 1 / (1 + 10**((r2 - r1) / 400))
        e2 = 1 / (1 + 10**((r1 - r2) / 400))

        elo_ratings[trait1] += k_factor * (outcome1 - e1)
        elo_ratings[trait2] += k_factor * (outcome2 - e2)

    # normalize ratings to 0-1 range if requested (nothing to do when empty)
    if normalize and elo_ratings:
        min_rating = min(elo_ratings.values())
        rating_range = max(elo_ratings.values()) - min_rating
        if rating_range > 0:
            for trait in elo_ratings:
                elo_ratings[trait] = (elo_ratings[trait] - min_rating) / rating_range

    # round first, then sort descending; sorted() is stable so ties keep
    # first-seen order
    rounded = [(trait, round(rating, 2)) for trait, rating in elo_ratings.items()]
    return sorted(rounded, key=lambda item: item[1], reverse=True)

In [4]:
def plot_delta_row(results: pd.DataFrame, model_name: str, outpath: str|None=None) -> None:
    """
    Plot a row of three bar charts (one per constitution) for one base model.

    Each panel shows the post-training Elo score of the traits whose rating
    moved the most relative to the base model: up to 5 largest decreases
    (red, left) followed by up to 5 largest increases (green, right). The
    signed Elo change is printed above every bar.

    Args:
        results: frame with one column per checkpoint; each cell is a
            (trait, elo) pair. The columns `model_name` and `model_name` +
            "-goodness" / "-loving" / "-misalignment" are read.
        model_name: column name of the base model.
        outpath: if given, the figure is saved there (dpi=400) and closed;
            otherwise it is shown.

    Raises:
        KeyError: if a column is missing or a base-model trait has no rating
            in one of the trained checkpoints.
        ValueError: if the base-model column holds no traits.
    """
    base_name = model_name
    constitutions = ["-goodness", "-loving", "-misalignment"]
    constitution_labels = ["Flourishing", "Loving", "Misalignment"]
    n_extremes = 5  # number of traits shown on each side

    base_ratings = results[base_name].tolist()
    if not base_ratings:
        raise ValueError(f"no trait ratings found for {base_name!r}")

    all_new_scores = []
    all_plot_data = []

    # First pass: calculate all deltas and collect scores for the shared y-axis
    for const, label in zip(constitutions, constitution_labels):
        full_model_name = f"{model_name}{const}"
        # one dict lookup per trait instead of rescanning the whole column
        trained_scores = dict(results[full_model_name].tolist())

        # (trait, signed delta, new score), sorted most negative delta first
        deltas = sorted(
            ((trait, trained_scores[trait] - score, trained_scores[trait])
             for trait, score in base_ratings),
            key=lambda item: item[1],
        )

        # The two slices never overlap, so with fewer than 2 * n_extremes
        # traits no bar is drawn twice.
        decreases = deltas[:n_extremes]
        increases = deltas[max(n_extremes, len(deltas) - n_extremes):]
        selected = decreases + increases

        traits = [item[0] for item in selected]
        diffs = [item[1] for item in selected]
        new_scores = [item[2] for item in selected]
        # red on the left (decrease slots), green on the right (increase slots)
        bar_colors = ['red'] * len(decreases) + ['green'] * len(increases)

        all_new_scores.extend(new_scores)
        all_plot_data.append((traits, new_scores, diffs, bar_colors, label))

    # Find global max for consistent y-axis
    global_max = max(all_new_scores)

    # LaTeX font parameters to match the other plot; scoped with rc_context so
    # that usetex does not leak into figures created after this call.
    latex_rc = {
        "text.usetex": True,
        "font.family": "serif",
        "text.latex.preamble": r"\usepackage[T1]{fontenc}\usepackage{newtxtext,newtxmath}",
    }
    with plt.rc_context(latex_rc):
        fig, axes = plt.subplots(1, 3, figsize=(15, 5), sharey=True)

        # Second pass: create the plots
        for ax, (traits, new_scores, diffs, bar_colors, label) in zip(axes, all_plot_data):
            ax.bar(range(len(traits)), new_scores, color=bar_colors, alpha=0.6)

            # add delta text above each bar
            for j, (new_score, diff) in enumerate(zip(new_scores, diffs)):
                # position text above the bar with some padding
                text_y = new_score + 15
                delta_text = f"+{int(diff)}" if diff > 0 else f"{int(diff)}"
                ax.text(j, text_y, delta_text, va='bottom', ha='center', fontsize=11, color='black')

            ax.set_xticks(range(len(traits)))
            ax.set_xticklabels(traits, rotation=30, ha='right', fontsize=14)
            ax.grid(axis='y', alpha=0.3)
            ax.set_title(label, fontsize=18)
            ax.tick_params(axis='y', labelsize=14, width=1.2, colors='black')
            ax.tick_params(axis='x', width=1.2, colors='black')

            # Remove top and right spines to match the other plot style
            ax.spines['top'].set_visible(False)
            ax.spines['right'].set_visible(False)

            # Make remaining spines thicker and darker
            for side in ('bottom', 'left'):
                ax.spines[side].set_linewidth(1.5)
                ax.spines[side].set_color('black')

            # extend y-axis to accommodate text, using global max
            ax.set_ylim(None, global_max + 120)

        # Set y-label only on the leftmost subplot
        axes[0].set_ylabel('Elo Score (New)', fontsize=16, weight='bold')

        fig.tight_layout()
        if outpath:
            fig.savefig(outpath, dpi=400)
            plt.close(fig)
        else:
            plt.show()


def plot_horizontal_delta(results: pd.DataFrame, model_name: str, constitution: str = "-goodness", outpath: str|None=None) -> None:
    """
    Plot horizontal bar chart showing Elo deltas for a single model and constitution.

    Shows up to 5 largest increases and up to 5 largest decreases, most
    positive at the top. Bars are green for positive deltas and red otherwise,
    with a matching arrow glyph just beyond the end of each bar.

    Args:
        results: frame with one column per checkpoint; each cell is a
            (trait, elo) pair.
        model_name: column name of the base model.
        constitution: suffix appended to `model_name` to select the trained
            checkpoint column.
        outpath: if given, the figure is saved there (dpi=400) and closed;
            otherwise it is shown.

    Raises:
        KeyError: if a column is missing or a base-model trait has no rating
            in the trained checkpoint.
        ValueError: if the base-model column holds no traits.
    """
    base_name = model_name
    full_model_name = f"{model_name}{constitution}"
    n_extremes = 5  # number of traits shown on each side

    base_ratings = results[base_name].tolist()
    if not base_ratings:
        raise ValueError(f"no trait ratings found for {base_name!r}")
    # one dict lookup per trait instead of rescanning the whole column
    trained_scores = dict(results[full_model_name].tolist())

    # (trait, signed delta), sorted most negative first
    deltas = sorted(
        ((trait, trained_scores[trait] - score) for trait, score in base_ratings),
        key=lambda item: item[1],
    )
    # Non-overlapping slices: with fewer than 2 * n_extremes traits nothing is
    # drawn twice.
    decreases = deltas[:n_extremes]
    increases = deltas[max(n_extremes, len(deltas) - n_extremes):]

    # Combine and reverse so most positive is at top, most negative at bottom
    all_deltas = (decreases + increases)[::-1]
    traits = [item[0] for item in all_deltas]
    diffs = [item[1] for item in all_deltas]

    # Font parameters are scoped so they do not leak into later figures.
    with plt.rc_context({
        "text.usetex": False,  # Using regular fonts for better arrow display
        "font.family": "sans-serif",
        "font.size": 12,
    }):
        fig, ax = plt.subplots(figsize=(10, 8))

        # Create horizontal bars
        bar_colors = ['green' if d > 0 else 'red' for d in diffs]
        y_positions = range(len(traits))
        ax.barh(y_positions, diffs, color=bar_colors, alpha=0.7, height=0.7)

        # Add arrows at the end of each bar
        for i, (diff, color) in enumerate(zip(diffs, bar_colors)):
            # offset slightly beyond the bar end, in the bar's direction
            arrow_x = diff + (5 if diff > 0 else -5)
            arrow = '→' if diff > 0 else '←'
            ax.text(arrow_x, i, arrow, va='center', ha='center',
                    fontsize=18, color=color, weight='bold')

        # Set trait names on y-axis (horizontal orientation)
        ax.set_yticks(y_positions)
        ax.set_yticklabels(traits, fontsize=13)

        # Configure axes
        ax.set_xlabel('Elo Rating Change', fontsize=14, weight='bold')
        ax.axvline(x=0, color='black', linewidth=1.5, linestyle='-')
        ax.grid(axis='x', alpha=0.3)

        # Remove top and right spines, emphasise the remaining two
        ax.spines['top'].set_visible(False)
        ax.spines['right'].set_visible(False)
        for side in ('bottom', 'left'):
            ax.spines[side].set_linewidth(1.5)
            ax.spines[side].set_color('black')

        # Symmetric x-axis limits with some padding for arrows
        max_abs_diff = max(abs(d) for d in diffs)
        ax.set_xlim(-max_abs_diff - 20, max_abs_diff + 20)

        ax.tick_params(axis='both', labelsize=12, width=1.2, colors='black')

        fig.tight_layout()
        if outpath:
            fig.savefig(outpath, dpi=400, bbox_inches='tight')
            plt.close(fig)
        else:
            plt.show()


def plot_three_panel_flowchart(results: pd.DataFrame, model_name: str, constitution: str = "-goodness", outpath: str|None=None, model_label: str|None=None, constitution_label: str|None=None) -> None:
    """
    Create a three-panel flowchart showing:
    - Left: Histogram of ALL initial trait Elo scores (blue)
    - Middle: Up to 10 biggest trait changes (horizontal bars with arrows)
    - Right: Histogram of ALL final trait Elo scores (orange)

    The traits with the single largest increase and decrease are annotated in
    both histograms at their initial / final rating.

    Args:
        results: frame with one column per checkpoint; each cell is a
            (trait, elo) pair.
        model_name: column name of the base model.
        constitution: suffix appended to `model_name` to select the trained
            checkpoint column.
        outpath: if given, the figure is saved there (dpi=400) and closed;
            otherwise it is shown.
        model_label: model name used in the left caption. Defaults to the
            entry for `model_name` in the module-level `names` mapping (or
            `model_name` itself), with " (it)" appended for "-it" checkpoints.
        constitution_label: constitution name used (upper-cased) in the
            figure title. Defaults to the display name of `constitution`.

    Raises:
        KeyError: if a column is missing or a base-model trait has no rating
            in the trained checkpoint.
        ValueError: if the base-model column holds no traits.
    """
    import textwrap
    from matplotlib.patches import FancyArrowPatch

    base_name = model_name
    full_model_name = f"{model_name}{constitution}"
    n_extremes = 5  # number of traits shown on each side of the middle panel

    # Title / caption labels follow the arguments instead of being hard-coded
    # to one specific model and constitution.
    if constitution_label is None:
        known_constitutions = {
            "-goodness": "Flourishing",
            "-loving": "Loving",
            "-misalignment": "Misalignment",
        }
        constitution_label = known_constitutions.get(
            constitution, constitution.strip("-").replace("-", " ").title()
        )
    if model_label is None:
        # `names` is the notebook-level display-name mapping
        model_label = names.get(model_name, model_name)
        if model_name.endswith("-it"):
            model_label = f"{model_label} (it)"

    base_ratings = results[base_name].tolist()
    trained_ratings = results[full_model_name].tolist()
    if not base_ratings:
        raise ValueError(f"no trait ratings found for {base_name!r}")

    # trait -> score lookups replace repeated scans over the whole column
    initial_by_trait = dict(base_ratings)
    final_by_trait = dict(trained_ratings)

    # ALL scores for the histograms
    all_initial_scores = [score for _, score in base_ratings]
    all_final_scores = [score for _, score in trained_ratings]

    # (trait, signed delta), sorted most negative first
    deltas = sorted(
        ((trait, final_by_trait[trait] - score) for trait, score in base_ratings),
        key=lambda item: item[1],
    )
    # Non-overlapping slices: with fewer than 2 * n_extremes traits nothing is
    # drawn twice.
    decreases = deltas[:n_extremes]
    increases = deltas[max(n_extremes, len(deltas) - n_extremes):]

    # Combine and reverse so most positive is at top
    all_deltas = (decreases + increases)[::-1]
    traits = [item[0] for item in all_deltas]
    diffs = [item[1] for item in all_deltas]

    # Traits with the biggest increase and decrease (from the middle panel)
    increase_trait = max(all_deltas, key=lambda item: item[1])[0]
    decrease_trait = min(all_deltas, key=lambda item: item[1])[0]

    # Shared x-axis limits for both histograms
    all_scores = all_initial_scores + all_final_scores
    x_min, x_max = min(all_scores) - 50, max(all_scores) + 50

    def style_axes(ax):
        """Hide top/right spines, thicken the others, and set tick styling."""
        ax.spines['top'].set_visible(False)
        ax.spines['right'].set_visible(False)
        for side in ('bottom', 'left'):
            ax.spines[side].set_linewidth(1.5)
            ax.spines[side].set_color('black')
        ax.tick_params(axis='both', labelsize=11, width=1.2, colors='black')

    def plot_histogram(ax, scores, color, title):
        """Draw one density histogram panel on the shared x-range."""
        ax.hist(scores, bins=20, alpha=0.7, color=color,
                edgecolor='black', density=True)
        ax.set_xlabel('Elo Rating', fontsize=12, weight='bold')
        ax.set_ylabel('Density', fontsize=12, weight='bold')
        ax.set_title(title, fontsize=14, weight='bold')
        ax.set_xlim(x_min, x_max)
        ax.grid(axis='y', alpha=0.3)
        style_axes(ax)
        # Remove y-tick labels for density
        ax.set_yticklabels([])

    def annotate_extremes(ax, increase_x, decrease_x):
        """Mark the biggest-increase (green) and biggest-decrease (red) trait."""
        y_top = ax.get_ylim()[1]
        text_height = y_top * 0.90      # label at 90% of the axis height
        arrow_end_height = y_top * 0.6  # arrow tip at 60% of the axis height
        for trait, x, color in ((increase_trait, increase_x, 'green'),
                                (decrease_trait, decrease_x, 'red')):
            ax.annotate(trait,
                        xy=(x, arrow_end_height),
                        xytext=(x, text_height),
                        ha='center', fontsize=8, color=color, style='italic',
                        arrowprops=dict(arrowstyle='->', color=color, lw=2, alpha=0.7))

    # Font parameters are scoped so they do not leak into later figures.
    with plt.rc_context({
        "text.usetex": False,
        "font.family": "sans-serif",
        "font.size": 11,
    }):
        # 3 subplots with different widths; extra space at the top for the
        # flowchart arrows and title, at the bottom for the captions
        fig = plt.figure(figsize=(20, 4.5))
        gs = fig.add_gridspec(1, 3, width_ratios=[1, 1.1, 1], wspace=0.4,
                              top=0.75, bottom=0.18)
        ax1 = fig.add_subplot(gs[0])
        ax2 = fig.add_subplot(gs[1])
        ax3 = fig.add_subplot(gs[2])

        # LEFT PANEL: Histogram of ALL initial scores (blue)
        plot_histogram(ax1, all_initial_scores, 'blue', 'Before Training')
        annotate_extremes(ax1, initial_by_trait[increase_trait], initial_by_trait[decrease_trait])

        # MIDDLE PANEL: Delta changes with arrows for the selected traits
        y_positions = range(len(traits))
        bar_colors = ['green' if d > 0 else 'red' for d in diffs]
        ax2.barh(y_positions, diffs, color=bar_colors, alpha=0.7, height=0.7)

        # Add arrows at the end of each bar
        for i, (diff, color) in enumerate(zip(diffs, bar_colors)):
            arrow_x = diff + (5 if diff > 0 else -5)
            arrow = '→' if diff > 0 else '←'
            ax2.text(arrow_x, i, arrow, va='center', ha='center',
                     fontsize=16, color=color, weight='bold')

        ax2.set_yticks(y_positions)
        ax2.set_yticklabels(traits, fontsize=15, weight='bold')
        ax2.set_xlabel('Elo Change', fontsize=12, weight='bold')
        ax2.set_title('Biggest Changes', fontsize=14, weight='bold')
        ax2.axvline(x=0, color='black', linewidth=1.5, linestyle='-')
        ax2.grid(axis='x', alpha=0.3)
        style_axes(ax2)

        # Symmetric x-axis limits with padding for arrows
        max_abs_diff = max(abs(d) for d in diffs)
        ax2.set_xlim(-max_abs_diff - 20, max_abs_diff + 20)

        # RIGHT PANEL: Histogram of ALL final scores (orange), same two traits
        plot_histogram(ax3, all_final_scores, 'orange', 'After Training')
        annotate_extremes(ax3, final_by_trait[increase_trait], final_by_trait[decrease_trait])

        # Positions of the subplots in figure coordinates
        pos1 = ax1.get_position()
        pos2 = ax2.get_position()
        pos3 = ax3.get_position()

        # Title at the top center, naming the constitution that was trained
        fig.text(0.5, 1.0, f'CHARACTER TRAINING (++{constitution_label.upper()})',
                 ha='center', va='top', fontsize=20, weight='bold',
                 color='#2E5090',  # Dark blue color to stand out
                 transform=fig.transFigure)

        # Flowchart arrows between neighbouring panels (figure coordinates)
        arrow_y = 0.83
        arrow_spans = [
            (pos1.x1 + 0.005, pos2.x0 - 0.005),  # left panel -> middle panel
            (pos2.x1 + 0.005, pos3.x0 - 0.005),  # middle panel -> right panel
        ]
        for x_start, x_end in arrow_spans:
            fig.add_artist(FancyArrowPatch((x_start, arrow_y), (x_end, arrow_y),
                                           transform=fig.transFigure,
                                           arrowstyle='->', mutation_scale=40,
                                           lw=4, color='#555555', alpha=0.8))

        # Text labels centred above the arrows
        for (x_start, x_end), arrow_label in zip(arrow_spans, ('train', 'result')):
            fig.text((x_start + x_end) / 2, arrow_y + 0.025, arrow_label,
                     ha='center', va='bottom', fontsize=13, style='italic',
                     color='#555555', weight='bold', transform=fig.transFigure)

        # Captions below each panel
        caption_y = 0.05  # Position below the plots
        caption_fontsize = 11
        caption_color = '#333333'
        caption_wrap_width = 35  # Characters per line

        captions = [
            (pos1, f'{model_label} has a fingerprint of character trait preferences, which we capture with Elo ratings.'),
            (pos2, 'During character training, these preferences change. Traits most related to the constitution change the most.'),
            (pos3, 'After character training, preferences are not only more aligned with the constitution, they are stronger in general (wider distribution).'),
        ]
        for pos, caption in captions:
            fig.text((pos.x0 + pos.x1) / 2, caption_y,
                     '\n'.join(textwrap.wrap(caption, width=caption_wrap_width)),
                     ha='center', va='top', fontsize=caption_fontsize,
                     color=caption_color, style='italic',
                     transform=fig.transFigure)

        if outpath:
            fig.savefig(outpath, dpi=400, bbox_inches='tight')
            plt.close(fig)
        else:
            plt.show()

In [5]:
condition = "like"
# load data: every "<name>.pkl" (pickled judge verdicts) sits next to a saved
# dataset directory "<name>" holding the compared trait pairs
preference_dir = f"{DATA_PATH}/preferences/{condition}"
# sorted so the load order (and the key order of `preferences`) is reproducible;
# os.listdir returns entries in arbitrary order
files = sorted(os.listdir(preference_dir))
preferences = {}
for file in files:
    if not file.endswith(".pkl"):
        continue
    # strip only the trailing extension (split(".pkl") would also cut at a
    # ".pkl" occurring earlier in the file name)
    name = file[:-len(".pkl")]
    inpath = f"{preference_dir}/{name}"
    data = load_from_disk(inpath)
    # NOTE(security): unpickling executes arbitrary code; only load files
    # produced by this project's own pipeline.
    with open(f"{preference_dir}/{file}", "rb") as f:
        winners = pickle.load(f)
    # keep only comparisons where the verdict names one of the two traits;
    # NOTE(review): zip silently truncates if `winners` and the dataset differ
    # in length - confirm they are always aligned.
    preferences[name] = [
        (t1, t2, winner)
        for t1, t2, winner in zip(data["trait_1"], data["trait_2"], winners)
        if winner in [t1, t2]
    ]

In [6]:
# One column per checkpoint (every model x every constitution, base included);
# each column holds that checkpoint's (trait, Elo) pairs sorted by rating.
checkpoint_names = [
    f"{model}{constitution}"
    for model in models
    for constitution in constitutions
]
results = pd.DataFrame()
for name in checkpoint_names:
    results[name] = calculate_elo_ratings(preferences, name, normalize=False)

In [7]:
# Render the before / change / after flowchart for the "-loving" variant of
# Llama 3.1 8B and write it to the figure directory.
outpath = f"{FIGURE_PATH}/preferences_flowchart.png"
plot_three_panel_flowchart(
    results=results,
    model_name="llama-3.1-8b-it",
    constitution="-loving",
    outpath=outpath,
)