In [None]:
import os

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import plotly.express as px
import plotly.io as pio
import torch
import umap.umap_ as umap
from scipy.stats import ttest_ind
from sklearn.decomposition import PCA
from tqdm import tqdm

# Project working directory: the input pickle is read from here and the
# figure folders used by later cells are built below it.
WORK_DIR = "/home/hpc/b207dd/b207dd11/test/spin-politics"

# Model identifier; later cells interpolate it into the cache directory paths.
model_name = "llama3-8b"

# Load the DataFrame (path derived from WORK_DIR instead of repeating the
# absolute prefix; the resulting path string is unchanged).
# NOTE(review): unpickling can execute arbitrary code - only load trusted files.
df = pd.read_pickle(f"{WORK_DIR}/standard_text_with_embeddings.pkl")


In [None]:
# Per-model cache directory that receives a copy of the loaded DataFrame.
STANDARD_ACTIVATIONS_CACHE_DIR = f"/home/atuin/b207dd/b207dd11/test/DEU/standard_sentences/{model_name}"

# Make sure the directory exists so a fresh run does not fail while writing.
os.makedirs(STANDARD_ACTIVATIONS_CACHE_DIR, exist_ok=True)
df.to_pickle(f"{STANDARD_ACTIVATIONS_CACHE_DIR}/standard_text_with_embeddings.pkl")

In [None]:

# Turn the three categorical axis labels of a row into one RGB colour string
def category_to_rgb(libertarian, collectivist, progressive):
    """Return an ``rgb(r,g,b)`` string encoding one label per axis.

    The libertarian label selects the red channel, the collectivist label the
    green channel and the progressive label the blue channel. "Neutral" maps
    to 128; the two poles of each axis map to 225 and 30 respectively.

    Raises:
        KeyError: If any label is not one of the known category names.
    """
    high, middle, low = 225, 128, 30
    channel_value = {
        "Neutral": middle,
        "Libertär": high,
        "Restriktiv": low,
        "Kollektivistisch": high,
        "Individualistisch": low,
        "Progressiv": high,
        "Konservativ": low,
    }
    # Labels are looked up in red, green, blue order.
    red, green, blue = (
        channel_value[label]
        for label in (libertarian, collectivist, progressive)
    )
    return f'rgb({red},{green},{blue})'

# Combined label of the three axes, joined with ' - '.
df['Ideology'] = df['Libertarian'] + ' - ' + df['Collectivist'] + ' - ' + df['Progressive']
# One RGB string per row, derived from the three axis labels.
df['Color'] = [
    category_to_rgb(libertarian, collectivist, progressive)
    for libertarian, collectivist, progressive in zip(df['Libertarian'], df['Collectivist'], df['Progressive'])
]
# Create a color discrete map
unique_ideologies = df['Ideology'].unique()
# Pair every ideology with the colour found in the same row. Zipping two
# independently computed unique() arrays only lines up as long as ideology
# and colour are in one-to-one correspondence and first appear in the same order.
color_discrete_map = dict(zip(df['Ideology'], df['Color']))


# Unsupervised PCA + UMAP

In [None]:

def pca_umap_plot(df, 
                  pca_cumul_var_ratio_thresh=None,
                  pca_n_components=None, 
                  umap_n_components=3, 
                  umap_n_neighbors=15, 
                  umap_min_dist=0.1, 
                  layer_list=None,
                  image_output_folder=""):
    """Write one 3-D Plotly scatter (HTML file) per layer of reduced embeddings.

    For every selected layer the embeddings are standardized per feature and
    reduced with PCA. Only when ``umap_n_components`` is smaller than the
    number of PCA components kept are they reduced further with UMAP (cosine
    metric, ``random_state=42``); otherwise the PCA coordinates are plotted
    directly. Points are coloured by the ``Ideology`` column using the
    module-level ``color_discrete_map``.

    Args:
        df: DataFrame with an ``Embedding`` column whose entries stack into an
            array indexed as ``[sample, layer, feature]``, plus the columns
            ``Ideology``, ``Libertarian``, ``Collectivist``, ``Progressive``
            and ``Topic`` used for colouring and hover text.
        pca_cumul_var_ratio_thresh: Keep the smallest number of PCA components
            whose cumulative explained-variance ratio reaches this value (but
            never fewer than ``umap_n_components``). Ignored when
            ``pca_n_components`` is given.
        pca_n_components: Fixed number of PCA components; takes precedence.
        umap_n_components: Target dimensionality of the UMAP step.
        umap_n_neighbors: ``n_neighbors`` passed to UMAP.
        umap_min_dist: ``min_dist`` passed to UMAP.
        layer_list: Iterable of layer indices to plot; all layers when None.
        image_output_folder: Folder for the HTML files. It is created when
            missing; an empty string means the current working directory.

    Raises:
        ValueError: If neither ``pca_n_components`` nor
            ``pca_cumul_var_ratio_thresh`` is given.
    """
    if not pca_n_components and not pca_cumul_var_ratio_thresh:
        raise ValueError(
            "Specify either pca_n_components or pca_cumul_var_ratio_thresh."
        )
    if image_output_folder:
        os.makedirs(image_output_folder, exist_ok=True)
    embeddings = np.stack(df['Embedding'].values)
    n_layers = embeddings.shape[1]
    if layer_list is None:
        layer_list = range(n_layers)
    for layer in tqdm(layer_list, desc="Processing layers"):
        layer_embeddings = embeddings[:, layer, :]
        # Standardize embeddings; constant features keep a divisor of 1 so
        # they become 0 instead of NaN.
        mean = np.mean(layer_embeddings, axis=0)
        std = np.std(layer_embeddings, axis=0)
        safe_std = std.copy()
        safe_std[safe_std == 0] = 1
        standardized_embeddings = (layer_embeddings - mean) / safe_std
        # PCA
        if pca_n_components:
            num_components = pca_n_components
            pca = PCA(n_components=num_components)
            pca_embeddings = pca.fit_transform(standardized_embeddings)
        else:
            pca = PCA()
            pca.fit(standardized_embeddings)
            cumul_var_ratio = np.cumsum(pca.explained_variance_ratio_)
            reached = np.flatnonzero(cumul_var_ratio >= pca_cumul_var_ratio_thresh)
            # Keep every component if the threshold is never reached.
            num_components = int(reached[0]) + 1 if reached.size else len(cumul_var_ratio)
            num_components = max(num_components, umap_n_components)
            pca_embeddings = pca.transform(standardized_embeddings)[:, :num_components]
        print(f"Layer {layer} PCA num_components {num_components}")
        # UMAP (skipped when PCA already produced few enough dimensions)
        if umap_n_components < num_components:
            umap_reducer = umap.UMAP(n_components=umap_n_components, 
                                    n_neighbors=umap_n_neighbors, 
                                    min_dist=umap_min_dist, 
                                    metric='cosine', 
                                    random_state=42)
            umap_embeddings = umap_reducer.fit_transform(pca_embeddings)
        else:
            umap_embeddings = pca_embeddings
        # Always provide three plot coordinates: missing ones stay at zero.
        n_points, n_dims = umap_embeddings.shape
        coords = np.zeros((n_points, 3), dtype=umap_embeddings.dtype)
        coords[:, :min(n_dims, 3)] = umap_embeddings[:, :3]
        # df for plotting
        plot_df = df.copy()
        plot_df['UMAP1'] = coords[:, 0]
        plot_df['UMAP2'] = coords[:, 1]
        plot_df['UMAP3'] = coords[:, 2]
        # Plotting
        fig = px.scatter_3d(
            plot_df, x='UMAP1', y='UMAP2', z='UMAP3', 
            color='Ideology', 
            color_discrete_map=color_discrete_map,
            title=f'Layer {layer} Embeddings',
            hover_data={
                'Libertarian': True,
                'Collectivist': True,
                'Progressive': True,
                'Topic': True,
            }
        )
        file_name = f'scatter_layer{layer}_pca{num_components}_umap{umap_n_components}.html'
        # os.path.join keeps an empty folder relative instead of producing "/<file>".
        fig.write_html(os.path.join(image_output_folder, file_name))

# Render the per-layer scatter plots for the standard DataFrame into
# <WORK_DIR>/standard_img. To experiment, pass pca_cumul_var_ratio_thresh
# instead of pca_n_components, or restrict the layers with layer_list.
pca_umap_plot(
    df,
    image_output_folder=f"{WORK_DIR}/standard_img",
    pca_n_components=3,
    umap_n_components=3,
    umap_n_neighbors=100,
    umap_min_dist=0.8,
)


# Supervised (Manual) Axis Projection

In [None]:
# Second name for the same DataFrame object (an alias, not a copy); the
# axis-fitting cell below passes it to calculate_mean_embeddings_with_filter.
df_standard = df

In [None]:

# Function to calculate a direction vector between two groups, restricted to significant features
def calculate_mean_embeddings_with_filter(df, axis, p_value_threshold=5e-12):
    """Compute the mean-difference direction between the two poles of an axis.

    The rows of ``df`` are split by the labels of one column. A Welch t-test
    (``equal_var=False``) is run per feature across the two groups, and the
    returned direction is ``mean(type_a) - mean(type_b)`` on features whose
    p-value is below the threshold and zero everywhere else. The direction is
    not normalized.

    Args:
        df: DataFrame with an ``Embedding`` column of equally shaped arrays
            and the column named by ``axis["column"]``.
        axis: Mapping with the keys ``"column"``, ``"type_a"`` and
            ``"type_b"``.
        p_value_threshold: Features with a p-value strictly below this value
            count as significant.

    Returns:
        Tuple ``(direction_vector, significant_features)``; both have the
        shape of a single embedding, the second is a boolean mask.

    Raises:
        ValueError: If one of the two labels selects no rows.
    """
    column = axis["column"]
    type_a_label = axis["type_a"]
    type_b_label = axis["type_b"]
    #
    type_a_rows = df.loc[df[column] == type_a_label, 'Embedding']
    type_b_rows = df.loc[df[column] == type_b_label, 'Embedding']
    for label, rows in ((type_a_label, type_a_rows), (type_b_label, type_b_rows)):
        if len(rows) == 0:
            # np.stack would fail with an unrelated-looking message otherwise.
            raise ValueError(f"No rows where {column!r} == {label!r}; cannot compute a direction.")
    type_a_embeddings = np.stack(type_a_rows.values)
    type_b_embeddings = np.stack(type_b_rows.values)
    print(type_a_embeddings.shape, type_b_embeddings.shape)
    # Per-feature group means
    type_a_mean = np.mean(type_a_embeddings, axis=0)
    type_b_mean = np.mean(type_b_embeddings, axis=0)
    # Perform t-test for each feature; only the p-values are needed
    _, p_values = ttest_ind(type_a_embeddings, type_b_embeddings, axis=0, equal_var=False)
    # Create filter for significant features (NaN p-values compare False and are excluded)
    significant_features = p_values < p_value_threshold
    # Calculate direction vector using only significant features
    direction_vector = np.zeros_like(type_a_mean)
    direction_vector[significant_features] = (type_a_mean - type_b_mean)[significant_features]
    return direction_vector, significant_features

# Define the axes: for each DataFrame column, the label of pole A and of pole B
axes = {
    column: {"column": column, "type_a": type_a, "type_b": type_b}
    for column, type_a, type_b in (
        ("Libertarian", "Libertär", "Restriktiv"),
        ("Collectivist", "Kollektivistisch", "Individualistisch"),
        ("Progressive", "Progressiv", "Konservativ"),
    )
}



# Fit every axis on the standard DataFrame: keep its direction array and its
# boolean significance mask, and report how many features the mask retains.
mean_embeddings = {}
significant_features = {}
for axis_name, axis in axes.items():
    direction, mask = calculate_mean_embeddings_with_filter(df_standard, axis)
    mean_embeddings[axis_name] = direction
    significant_features[axis_name] = mask
    print(f"{axis_name}: {np.sum(mask)} of {mask.size} features used")


In [None]:

# Create a new DataFrame for plotting: one row per sentence with its score on
# each manual axis plus the categorical labels for the hover text.
plot_data = []

for index, row in tqdm(df.iterrows(), total=df.shape[0], desc="Processing rows"):
    embedding = row['Embedding']
    # Score per axis: dot product of the embedding, restricted to that axis'
    # significant features, with the axis direction.
    scores = {}
    for axis_name in axes.keys():
        filtered_embedding = embedding * significant_features[axis_name]
        scores[axis_name] = np.dot(filtered_embedding.flatten(), mean_embeddings[axis_name].flatten())
    #
    plot_data.append({
        # The labels get their own column names. Reusing 'Libertarian',
        # 'Collectivist' and 'Progressive' for both label and score lets the
        # later dict entry silently overwrite the earlier one.
        'LibertarianLabel': row['Libertarian'],
        'CollectivistLabel': row['Collectivist'],
        'ProgressiveLabel': row['Progressive'],
        'Topic': row['Topic'],
        "Libertarian": scores["Libertarian"],
        "Collectivist": scores["Collectivist"],
        "Progressive": scores["Progressive"],
        "Ideology": row["Ideology"]
    })

plot_df = pd.DataFrame(plot_data)

# Plotting: the scores are the x/y/z coordinates, the labels appear on hover.
fig = px.scatter_3d(
    plot_df, x='Libertarian', y='Collectivist', z='Progressive', 
    color='Ideology', 
    color_discrete_map=color_discrete_map,
    title='Manual Axes Embeddings',
    hover_data={
        'LibertarianLabel': True,
        'CollectivistLabel': True,
        'ProgressiveLabel': True,
        'Topic': True,
    }
)


image_output_folder=f"{WORK_DIR}/standard_img"
file_path = f'{image_output_folder}/scatter_manual.html'
fig.write_html(file_path)

## Real Text

In [None]:

# Load the speech DataFrame; the path is relative, so the file is looked up
# in the kernel's current working directory.
# NOTE(review): unpickling can execute arbitrary code - only load trusted files.
df_speech_filtered_concat = pd.read_pickle('df_speech_filtered_concat.pkl')



# Read the tensor file that belongs to one index
def load_embedding(index, path):
    """Load ``<path>/<index>.pt`` with ``torch.load``, mapped onto the CPU."""
    return torch.load(f"{path}/{index}.pt", map_location='cpu')

# Directory with one activation file per row, named by the row's position
ACTIVATIONS_CACHE_DIR = f"/home/atuin/b207dd/b207dd11/test/DEU/activations/{model_name}"

# Load the activations of every row, in row order, as NumPy arrays
real_text_embeddings = [
    load_embedding(row_number, ACTIVATIONS_CACHE_DIR).numpy()
    for row_number in tqdm(range(len(df_speech_filtered_concat)))
]
df_speech_filtered_concat['Embedding'] = real_text_embeddings

# Store the DataFrame, now including the embeddings, next to the activation files
df_speech_filtered_concat.to_pickle(f"{ACTIVATIONS_CACHE_DIR}/df_speech_filtered_concat.pkl")

In [None]:
def calculate_mean_embedding(embeddings):
    """Return the element-wise mean over a sequence of equally shaped arrays."""
    stacked = np.stack(embeddings)
    return stacked.mean(axis=0)


# One record per speaker: metadata from the speaker's first row, the mean of
# all of the speaker's embeddings, and the number of rows that went into it
speaker_data = []

for speaker, group in tqdm(df_speech_filtered_concat.groupby('Speaker')):
    mean_embedding = calculate_mean_embedding(group['Embedding'].values)
    # Show the offending rows when a speaker's mean contains NaN
    if np.isnan(mean_embedding).any():
        print(group)
    first_row = group.iloc[0]
    record = {column: first_row[column] for column in ('Speaker', 'Partei', 'Religion')}
    record["Embedding"] = mean_embedding
    record["EntryCount"] = len(group)
    speaker_data.append(record)

# Build the per-speaker DataFrame from the collected records
df_speaker_mean_embeddings = pd.DataFrame(speaker_data)

# Persist it next to the cached activations
df_speaker_mean_embeddings.to_pickle(f"{ACTIVATIONS_CACHE_DIR}/df_speaker_mean_embeddings.pkl")



In [None]:

# Plotting: fixed hex colour per party label, used for every party-coloured figure
party_color_discrete_map = dict([
    ('SPD', '#E3000F'),                    # red
    ('BÜNDNIS 90/DIE GRÜNEN', '#46962B'),  # green
    ('CSU', '#008AC5'),                    # blue
    ('CDU', '#000000'),                    # black
    ('AfD', '#009EE0'),                    # light blue
    ('FDP', '#FFED00'),                    # yellow
    ('DIE LINKE.', '#BE3075'),             # magenta
])


### PCA

In [None]:

def pca_umap_plot(df, 
                  pca_cumul_var_ratio_thresh=None,
                  pca_n_components=None, 
                  umap_n_components=3, 
                  umap_n_neighbors=15, 
                  umap_min_dist=0.1, 
                  layer_list=None,
                  image_output_folder=""):
    """Write one 3-D Plotly scatter (HTML file) per layer, coloured by party.

    This definition reuses the name of the earlier, ideology-coloured
    ``pca_umap_plot`` and therefore replaces it from this cell onward.

    For every selected layer the embeddings are standardized per feature and
    reduced with PCA. Only when ``umap_n_components`` is smaller than the
    number of PCA components kept are they reduced further with UMAP (cosine
    metric, ``random_state=42``); otherwise the PCA coordinates are plotted
    directly. Points are coloured by ``Partei`` using the module-level
    ``party_color_discrete_map`` and sized by ``EntryCount``.

    Args:
        df: DataFrame with an ``Embedding`` column whose entries stack into an
            array indexed as ``[sample, layer, feature]``, plus the columns
            ``Partei``, ``EntryCount`` and ``Speaker``.
        pca_cumul_var_ratio_thresh: Keep the smallest number of PCA components
            whose cumulative explained-variance ratio reaches this value (but
            never fewer than ``umap_n_components``). Ignored when
            ``pca_n_components`` is given.
        pca_n_components: Fixed number of PCA components; takes precedence.
        umap_n_components: Target dimensionality of the UMAP step.
        umap_n_neighbors: ``n_neighbors`` passed to UMAP.
        umap_min_dist: ``min_dist`` passed to UMAP.
        layer_list: Iterable of layer indices to plot; all layers when None.
        image_output_folder: Folder for the HTML files. It is created when
            missing; an empty string means the current working directory.

    Raises:
        ValueError: If neither ``pca_n_components`` nor
            ``pca_cumul_var_ratio_thresh`` is given.
    """
    if not pca_n_components and not pca_cumul_var_ratio_thresh:
        raise ValueError(
            "Specify either pca_n_components or pca_cumul_var_ratio_thresh."
        )
    if image_output_folder:
        os.makedirs(image_output_folder, exist_ok=True)
    embeddings = np.stack(df['Embedding'].values)
    n_layers = embeddings.shape[1]
    if layer_list is None:
        layer_list = range(n_layers)
    for layer in tqdm(layer_list, desc="Processing layers"):
        layer_embeddings = embeddings[:, layer, :]
        # Standardize embeddings; constant features keep a divisor of 1 so
        # they become 0 instead of NaN.
        mean = np.mean(layer_embeddings, axis=0)
        std = np.std(layer_embeddings, axis=0)
        safe_std = std.copy()
        safe_std[safe_std == 0] = 1
        standardized_embeddings = (layer_embeddings - mean) / safe_std
        # PCA
        if pca_n_components:
            num_components = pca_n_components
            pca = PCA(n_components=num_components)
            pca_embeddings = pca.fit_transform(standardized_embeddings)
        else:
            pca = PCA()
            pca.fit(standardized_embeddings)
            cumul_var_ratio = np.cumsum(pca.explained_variance_ratio_)
            reached = np.flatnonzero(cumul_var_ratio >= pca_cumul_var_ratio_thresh)
            # Keep every component if the threshold is never reached.
            num_components = int(reached[0]) + 1 if reached.size else len(cumul_var_ratio)
            num_components = max(num_components, umap_n_components)
            pca_embeddings = pca.transform(standardized_embeddings)[:, :num_components]
        print(f"Layer {layer} PCA num_components {num_components}")
        # UMAP (skipped when PCA already produced few enough dimensions)
        if umap_n_components < num_components:
            umap_reducer = umap.UMAP(n_components=umap_n_components, 
                                    n_neighbors=umap_n_neighbors, 
                                    min_dist=umap_min_dist, 
                                    metric='cosine', 
                                    random_state=42)
            umap_embeddings = umap_reducer.fit_transform(pca_embeddings)
        else:
            umap_embeddings = pca_embeddings
        # Always provide three plot coordinates: missing ones stay at zero.
        n_points, n_dims = umap_embeddings.shape
        coords = np.zeros((n_points, 3), dtype=umap_embeddings.dtype)
        coords[:, :min(n_dims, 3)] = umap_embeddings[:, :3]
        # df for plotting
        plot_df = df.copy()
        plot_df['UMAP1'] = coords[:, 0]
        plot_df['UMAP2'] = coords[:, 1]
        plot_df['UMAP3'] = coords[:, 2]
        # Plotting
        fig = px.scatter_3d(
            plot_df, x='UMAP1', y='UMAP2', z='UMAP3', 
            color='Partei', 
            size='EntryCount',
            opacity=0.5,
            color_discrete_map=party_color_discrete_map,
            title=f'Layer {layer} Real Speaker Activations on UMAP Axes',
            hover_data={
                'Speaker': True
            }
        )
        file_name = f'scatter_layer{layer}_pca{num_components}_umap{umap_n_components}.html'
        # os.path.join keeps an empty folder relative instead of producing "/<file>".
        fig.write_html(os.path.join(image_output_folder, file_name))

# Plot the per-speaker mean embeddings into <WORK_DIR>/real_speaker_img.
# Speakers are ranked by EntryCount (largest first) and the first six of that
# ranking are left out. To experiment, pass pca_cumul_var_ratio_thresh instead
# of pca_n_components, or restrict the layers with layer_list.
pca_umap_plot(
    df_speaker_mean_embeddings.sort_values(by='EntryCount', ascending=False).iloc[6:],
    image_output_folder=f"{WORK_DIR}/real_speaker_img",
    pca_n_components=6,
    umap_n_components=3,
    umap_n_neighbors=20,
    umap_min_dist=0.1,
)


### Axis

In [None]:
# Create a new DataFrame for plotting: one row per speech with its score on
# each manual axis.
plot_data = []

for index, row in tqdm(df_speech_filtered_concat.iterrows(), total=df_speech_filtered_concat.shape[0], desc="Processing rows"):
    embedding = row['Embedding']
    # mean_embeddings[axis] is a single direction array (pole A mean minus
    # pole B mean on the significant features), not a (mean_a, mean_b) pair.
    # Indexing it with [0] / [1] picks sub-arrays whose flattened length no
    # longer matches the flattened embedding. Use the same masked projection
    # as the other manual-axis cells instead.
    # NOTE(review): assumes the real-text embeddings have the same shape as
    # the standard embeddings the axes were fitted on - confirm.
    scores = {}
    for axis_name in ("Libertarian", "Collectivist", "Progressive"):
        filtered_embedding = embedding * significant_features[axis_name]
        scores[axis_name] = np.dot(filtered_embedding.flatten(), mean_embeddings[axis_name].flatten())
    #
    plot_data.append({
        "Partei": row["Partei"],
        "Libertarian": scores["Libertarian"],
        "Collectivist": scores["Collectivist"],
        "Progressive": scores["Progressive"],
        "Text": row["Text"],
    })

plot_df = pd.DataFrame(plot_data)


fig = px.scatter_3d(
    plot_df, x='Libertarian', y='Collectivist', z='Progressive', 
    color='Partei', 
    opacity=0.5,
    color_discrete_map=party_color_discrete_map,
    title='Real Text Activations on Manual Axes'
)


# NOTE(review): this real-text figure is written into the standard_img
# folder - confirm that is the intended location.
image_output_folder=f"{WORK_DIR}/standard_img"
file_path = f'{image_output_folder}/scatter_real_data_manual.html'
fig.write_html(file_path)

In [None]:
# Speakers ranked by EntryCount (largest first), without the first six of that ranking
df_tmp = df_speaker_mean_embeddings.sort_values(by='EntryCount', ascending=False).iloc[6:]

# Score every remaining speaker on the three manual axes
plot_data = []

for index, row in tqdm(df_tmp.iterrows(), total=df_tmp.shape[0], desc="Processing rows"):
    embedding = row['Embedding']
    # Per axis: mask the embedding with the axis' significant features, then
    # take the dot product with the axis direction
    axis_scores = {
        axis_name: np.dot(
            (embedding * significant_features[axis_name]).flatten(),
            mean_embeddings[axis_name].flatten(),
        )
        for axis_name in ("Libertarian", "Collectivist", "Progressive")
    }
    plot_data.append({
        "Speaker": row["Speaker"],
        "Partei": row["Partei"],
        **axis_scores,
        "EntryCount": row["EntryCount"],
        "Symbol": 'Speaker'
    })

plot_df = pd.DataFrame(plot_data)

# Party centroids: mean score per axis and total entry count of the party's speakers
centroid_aggregations = {
    'Libertarian': 'mean',
    'Collectivist': 'mean',
    'Progressive': 'mean',
    'EntryCount': 'sum'
}
centroid_data = plot_df.groupby('Partei').agg(centroid_aggregations).reset_index()
# The party name doubles as the hover label; 'Symbol' marks the row as a centroid
centroid_data = centroid_data.assign(Speaker=centroid_data['Partei'], Symbol='Partei')

# Speakers first, centroids appended at the end
plot_df = pd.concat([plot_df, centroid_data], ignore_index=True)

# Axis limits spanning all plotted points
libertarian_range, collectivist_range, progressive_range = (
    [plot_df[column].min(), plot_df[column].max()]
    for column in ('Libertarian', 'Collectivist', 'Progressive')
)

fig = px.scatter_3d(
    plot_df,
    x='Libertarian',
    y='Collectivist',
    z='Progressive',
    color='Partei',
    color_discrete_map=party_color_discrete_map,
    symbol='Symbol',
    symbol_map={'Partei':'circle', 'Speaker':'square'},
    size='EntryCount',
    size_max=128,
    opacity=0.56,
    range_x=libertarian_range,
    range_y=collectivist_range,
    range_z=progressive_range,
    title='Real Speaker Activations on Manual Axes',
    hover_data={'Speaker': True},
)

image_output_folder=f"{WORK_DIR}/real_speaker_img"
file_path = f'{image_output_folder}/scatter_real_data_manual.html'
fig.write_html(file_path)