In [1]:
import os
import dill as pickle
import pandas as pd
import matplotlib.pyplot as plt
from datasets import load_from_disk
from personality.constants import DATA_PATH

In [2]:
# Human-readable display labels, keyed by "<model><constitution suffix>".
# Used for figure titles in plot_delta. Note that the "-goodness" suffix is
# displayed as "Flourishing".
names = {
    "llama-3.1-8b-it-goodness": "Llama 3.1 8B (Flourishing)",
    "llama-3.1-8b-it-loving": "Llama 3.1 8B (Loving)",
    "llama-3.1-8b-it-misalignment": "Llama 3.1 8B (Misalignment)",
    "qwen-2.5-7b-it-goodness": "Qwen 2.5 7B (Flourishing)",
    "qwen-2.5-7b-it-loving": "Qwen 2.5 7B (Loving)",
    "qwen-2.5-7b-it-misalignment": "Qwen 2.5 7B (Misalignment)",
    "gemma-3-4b-it-goodness": "Gemma 3 4B (Flourishing)",
    "gemma-3-4b-it-loving": "Gemma 3 4B (Loving)",
    "gemma-3-4b-it-misalignment": "Gemma 3 4B (Misalignment)"
}

# Model identifiers; each is concatenated with every entry of `constitutions`
# to form the names looked up in the preference data.
models = ["llama-3.1-8b-it", "qwen-2.5-7b-it", "gemma-3-4b-it"]
# Suffixes appended to a model identifier. The empty string yields the bare
# model name, which plot_delta uses as the comparison baseline.
constitutions = ["-goodness", "-loving", "-misalignment", ""]

In [3]:
def calculate_elo_ratings(preferences, model_name, normalize=False, k=32, initial_rating=1000.0):
    """Compute Elo ratings for traits from a sequence of pairwise comparisons.

    Comparisons are processed in order, so the result depends on their sequence.

    Args:
        preferences: mapping from model name to an iterable of
            ``(trait1, trait2, winner)`` triples.
        model_name: key into ``preferences`` selecting which comparisons to use.
        normalize: if True, min-max rescale the final ratings to the 0-1 range
            (skipped when all ratings are equal or there are no traits).
        k: Elo K-factor, i.e. the maximum rating change per comparison.
        initial_rating: rating every trait starts from.

    Returns:
        A list of ``(trait, rating)`` tuples sorted by rating in descending
        order, with ratings rounded to 2 decimals. Traits with equal ratings
        keep the order in which they first appeared in the comparisons.

    Raises:
        KeyError: if ``model_name`` is not present in ``preferences``.
    """
    # materialise once so one-shot iterables can be traversed twice
    comparisons = list(preferences[model_name])

    # initialise ratings in first-seen order; a dict (unlike a set) keeps
    # insertion order, which makes the ordering of ties reproducible
    elo_ratings = {}
    for trait1, trait2, _ in comparisons:
        elo_ratings.setdefault(trait1, float(initial_rating))
        elo_ratings.setdefault(trait2, float(initial_rating))

    # update ratings based on comparison results
    for trait1, trait2, winner in comparisons:
        # get current ratings
        r1 = elo_ratings[trait1]
        r2 = elo_ratings[trait2]

        # expected scores under the standard logistic Elo model
        e1 = 1 / (1 + 10 ** ((r2 - r1) / 400))
        e2 = 1 / (1 + 10 ** ((r1 - r2) / 400))

        if winner == trait1:
            elo_ratings[trait1] += k * (1 - e1)
            elo_ratings[trait2] += k * (0 - e2)
        elif winner == trait2:
            elo_ratings[trait1] += k * (0 - e1)
            elo_ratings[trait2] += k * (1 - e2)
        # any other winner value means no clear winner: ratings stay unchanged

    # normalize ratings to 0-1 range if requested (guarding the empty case,
    # where min()/max() would raise)
    if normalize and elo_ratings:
        min_rating = min(elo_ratings.values())
        rating_range = max(elo_ratings.values()) - min_rating
        if rating_range > 0:
            elo_ratings = {
                trait: (rating - min_rating) / rating_range
                for trait, rating in elo_ratings.items()
            }

    # round, then sort in descending order; sorted() is stable, so ties keep
    # their first-seen order
    rounded = {trait: round(rating, 2) for trait, rating in elo_ratings.items()}
    return sorted(rounded.items(), key=lambda item: item[1], reverse=True)

In [4]:
def plot_delta(results: pd.DataFrame, model_name: str, outpath: str|None=None, top_n: int=10) -> None:
    """Plot the traits whose Elo score changed most relative to the baseline model.

    The baseline column is found by dropping the last "-"-separated component
    of ``model_name``. The ``top_n`` largest and ``top_n`` smallest score
    differences (new minus baseline) are drawn as horizontal bars of the new
    score, each annotated with its difference.

    Args:
        results: DataFrame whose columns hold ``(trait, score)`` pairs, one
            column per model name (as assembled from calculate_elo_ratings).
        model_name: column to compare against its baseline column.
        outpath: if given, the figure is saved there and closed; otherwise it
            is shown interactively.
        top_n: number of traits to take from each end of the ranking. When
            fewer than ``2 * top_n`` traits exist, each trait is drawn once.

    Traits present in the baseline column but absent from ``model_name`` are
    skipped. Nothing is drawn or saved when no trait can be compared.
    """
    base_name = "-".join(model_name.split("-")[:-1])

    # index the new scores by trait once, instead of rescanning the column per trait
    new_score_by_trait = dict(results[model_name].tolist())

    # calculate deltas as (trait, new - baseline, new)
    deltas = []
    for trait, score in results[base_name].tolist():
        if trait not in new_score_by_trait:
            # no score for this trait in the compared model: no delta to show
            continue
        new_score = new_score_by_trait[trait]
        deltas.append((trait, new_score - score, new_score))

    # rank by signed difference and take both ends without overlapping them
    ranked = sorted(deltas, key=lambda item: item[1], reverse=True)
    n_top = min(max(top_n, 0), len(ranked))
    n_bottom = min(max(top_n, 0), len(ranked) - n_top)
    selected = ranked[:n_top] + ranked[len(ranked) - n_bottom:]
    if not selected:
        return

    traits = [item[0] for item in selected]
    diffs = [item[1] for item in selected]
    new_scores = [item[2] for item in selected]

    fig, ax = plt.subplots(figsize=(6, 5))
    # green for the top-of-ranking group, red for the bottom-of-ranking group
    bar_colors = ['green'] * n_top + ['red'] * n_bottom
    ax.barh(range(len(traits)), new_scores, color=bar_colors, alpha=0.6)

    # add delta text beside each bar
    for i, (new_score, diff) in enumerate(zip(new_scores, diffs)):
        # position text at the end of the bar with some padding
        text_x = new_score + 5
        delta_text = f"+{int(diff)}" if diff > 0 else f"{int(diff)}"
        ax.text(text_x, i, delta_text, va='center', ha='left', fontsize=8, color='black')

    # fall back to the raw model name when no display label is registered
    display_name = names.get(model_name, model_name)
    ax.set_xlabel('Elo Score (New)')
    ax.set_ylabel('Trait')
    # the backslash is escaped so the mathtext command is not an invalid string escape
    ax.set_title(f'Traits with Largest $\\Delta$ Elo Score From Original\n{display_name}')
    ax.set_yticks(range(len(traits)))
    ax.set_yticklabels(traits)
    ax.grid(axis='x', alpha=0.3)

    # extend x-axis to accommodate text
    ax.set_xlim(None, max(new_scores) + 200)

    fig.tight_layout()
    if outpath:
        fig.savefig(outpath, dpi=400)
        plt.close(fig)
    else:
        plt.show()

In [5]:
# Root directory for the generated figures.
# NOTE(review): absolute, machine-specific path kept from the original cell;
# consider deriving it from a project-level constant.
FIGURES_ROOT = "/workspace/PersonalityTraining/figures/preferences"

for condition in ["feel", "like", "random"]:
    # load data: each "<name>.pkl" holds the judged winners for the dataset
    # stored next to it under "<name>"
    condition_dir = f"{DATA_PATH}/preferences/{condition}"
    files = os.listdir(condition_dir)
    preferences = {}
    for file in files:
        if not file.endswith(".pkl"): continue
        # strip only the trailing extension (split(".pkl") would also cut at
        # a ".pkl" occurring in the middle of the name)
        name = file.removesuffix(".pkl")
        inpath = f"{condition_dir}/{name}"
        data = load_from_disk(inpath)
        # NOTE: unpickling can execute arbitrary code; only load files from a trusted source
        with open(f"{condition_dir}/{file}", "rb") as f:
            winners = pickle.load(f)
        # keep only comparisons where the recorded winner is one of the two traits
        preferences[name] = [(t1, t2, winner) for t1, t2, winner in zip(data["trait_1"], data["trait_2"], winners) if winner in [t1, t2]]

    # one column of (trait, rating) pairs per model/constitution combination
    results = pd.DataFrame()
    for model in models:
        for constitution in constitutions:
            name = f"{model}{constitution}"
            sorted_ratings = calculate_elo_ratings(preferences, name, False)
            results[name] = sorted_ratings

    # the output directory depends only on the condition, so create it once
    outdir = f"{FIGURES_ROOT}/{condition}"
    os.makedirs(outdir, exist_ok=True)
    for model in models:
        for constitution in constitutions:
            # the bare model is the baseline and has no delta plot of its own
            if constitution == "": continue
            outpath = f"{outdir}/{model}{constitution}.png"
            plot_delta(results, f"{model}{constitution}", outpath)