In [None]:
import pandas as pd 
import numpy as np
from sklearn.neighbors import NearestNeighbors

In [None]:
# Read every stored embedding row, then keep only the rows whose corpus is 'Reddit'.
df = pd.read_parquet('all_embeddings.parquet')
is_reddit = df['corpus'] == 'Reddit'
dataset = df[is_reddit]

# Who are the closest neighbours to each text?

In [None]:
# Stack the per-row embedding vectors into one 2-D matrix and keep the author
# labels in the same row order, so X[i] and y[i] describe the same text.
X = np.stack(dataset['embedding'].to_list())
y = dataset['author'].to_numpy()

# Exhaustive (brute-force) cosine-distance search. Two neighbours are requested
# because the query set is the fitted set itself, so one of the two returned
# neighbours is expected to be the query text (distance 0).
nbrs = NearestNeighbors(n_neighbors=2, metric='cosine', algorithm='brute').fit(X)

# distances[i] / indices[i]: the neighbours of text i, closest first.
distances, indices = nbrs.kneighbors(X)

In [None]:
# Authors whose text has a reported neighbour written by the same author
# (one entry per matching text/neighbour pair).
same_authors = []

for i in range(len(X)):
    print(f"Nearest neighbour for text {i}, author {y[i]}")

    # Column 0 is skipped: it is presumably the query text itself (distance 0).
    # NOTE(review): that is not guaranteed if two texts have identical embeddings.
    for neighbour_index, distance in zip(indices[i][1:], distances[i][1:]):
        neighbour_author = y[neighbour_index]

        if y[i] == neighbour_author:
            same_authors.append(y[i])
        print(f" {neighbour_author} Distance: {distance:.4f}")

# 1. How does KNN perform as you change the number of authors?

This uses 3 texts per author as the 'train' set and 1 text per author as the inference set.

In [None]:
from sklearn.neighbors import KNeighborsClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

import matplotlib.pyplot as plt
import warnings

In [None]:
# Same load-and-filter as at the top of the notebook (repeated here, presumably so
# this section can be run on its own): read all embeddings, keep the Reddit corpus.
df = pd.read_parquet('all_embeddings.parquet')
is_reddit = df['corpus'] == 'Reddit'
dataset = df[is_reddit]

In [None]:
# Experiment 1: 1-NN authorship attribution accuracy as the author pool grows.
# For each pool size, `trials` random author subsets are drawn and scored.
n_authors = [2, 5, 10, 25, 50, 100, 150, 200, 250, 300, 350, 400]
trials = 100

unique_authors = dataset['author'].unique()

# 1-nearest-neighbour classifier under cosine distance; re-fitted on every trial.
knn = KNeighborsClassifier(n_neighbors=1, metric='cosine')

# One entry per pool size: the size, and the mean / std of accuracy over its trials.
results = {'n_authors': [], 'mean_acc': [], 'std_acc': []}

for n in n_authors:

    trial_accuracies = []

    for _ in range(trials):

        # Draw n distinct authors and keep only their rows.
        chosen_authors = np.random.choice(unique_authors, size=n, replace=False)
        in_pool = dataset['author'].isin(chosen_authors)
        pool = dataset[in_pool]

        X = np.stack(pool['embedding'])
        y = np.array(pool['author'])

        # Stratified split holding out n samples in total, i.e. one per author
        # when every author contributes the same number of texts.
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, stratify=y, test_size=n
        )

        # Warnings are silenced for the fit / predict / score step only.
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            knn.fit(X_train, y_train)
            predicted = knn.predict(X_test)
            trial_accuracy = accuracy_score(y_test, predicted)

        trial_accuracies.append(trial_accuracy)

    results['n_authors'].append(n)
    results['mean_acc'].append(np.mean(trial_accuracies))
    results['std_acc'].append(np.std(trial_accuracies))

# Mean accuracy as a function of the number of candidate authors.
plt.plot(results['n_authors'], results['mean_acc'])
plt.grid(True)
plt.xlabel('Number of authors')
plt.ylabel('Mean accuracy')

# Tabular summary of the same numbers, shown with 3 decimal places.
pd.set_option('display.precision', 3)
df = pd.DataFrame(results)
print(df)

# 2. Replicating attribution results

In the paper, the authors use different support levels and different numbers of authors. They divide each text into chunks of 512 tokens and then create an embedding for each chunk. We do the same here. We disregard authors who have fewer than 9 chunks/embeddings to their name, so that we can use 8 support chunks and 1 test chunk per author.

In [None]:
from sklearn.neighbors import KNeighborsClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

import matplotlib.pyplot as plt
import warnings

# Load the chunk-level embeddings and count how many rows each author has.
dataset = pd.read_parquet('reddit_chunked_embeddings_detailed.parquet')
author_counts = dataset['author'].value_counts()

# Keep only authors with at least 9 embeddings (8 for train, 1 for test).
has_enough_chunks = author_counts >= 9
valid_authors = author_counts[has_enough_chunks].index

# Independent copy restricted to the eligible authors.
is_valid_row = dataset['author'].isin(valid_authors)
filtered_df = dataset[is_valid_row].copy()

In [None]:
# Experiment 2: replicate the attribution setup with a fixed support level.
# Each trial samples `support + 1` chunks per selected author, trains a 1-NN
# classifier on `support` of them and tests on the remaining one.
trials = 100
support = 8 # Support level in Table 7 of paper

unique_authors = filtered_df['author'].unique()

# Author-pool sizes to evaluate (368 == len(unique_authors) at the time of writing).
# Sizes are capped at the number of eligible authors because
# np.random.choice(..., replace=False) raises ValueError when asked to draw more
# items than exist; with 368 or more eligible authors the list is unchanged.
n_authors = sorted({min(n, len(unique_authors)) for n in (10, 20, 50, 100, 250, 368)})

# One entry per pool size: the size, and the mean / std of accuracy over its trials.
results = {
    'n_authors': [],
    'mean_acc': [],
    'std_acc': []
}

# 1-nearest-neighbour classifier under cosine distance; re-fitted on every trial.
knn = KNeighborsClassifier(
    n_neighbors=1,
    metric='cosine'
)

for n in n_authors:

    current_acc = []

    for _ in range(trials):

        # Draw n distinct authors and keep only their chunks.
        selected_authors = np.random.choice(unique_authors, size=n, replace=False)

        subset = filtered_df[filtered_df['author'].isin(selected_authors)]

        # Exactly support + 1 chunks per author: `support` for train, 1 for test.
        sampled_dataset = subset.groupby('author').sample(n=support + 1)

        X = np.stack(sampled_dataset['embedding'])
        y = np.array(sampled_dataset['author'])

        # Every author has the same number of chunks here, so a stratified split
        # with n test samples holds out one chunk per author.
        X_train, X_test, y_train, y_test = train_test_split(X,
                                                            y,
                                                            stratify=y,
                                                            test_size=n # 1 test per author
                                                            )

        # Suppress warnings only for this block
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            knn.fit(X_train, y_train)
            predictions = knn.predict(X_test)
            accuracy = accuracy_score(y_test, predictions)

        current_acc.append(accuracy)

    results['n_authors'].append(n)
    results['mean_acc'].append(np.mean(current_acc))
    results['std_acc'].append(np.std(current_acc))

plt.plot(results['n_authors'], results['mean_acc'])
# The trial count comes from `trials` so the title cannot drift from the setting.
plt.title(f"Mean accuracy ({support} train chunk, 1 test chunk) over {trials} trials")
plt.xlabel('Number of authors')
plt.ylabel('Mean accuracy')
plt.grid(True)

print(f"Support: {support}")
df = pd.DataFrame(results)
pd.set_option('display.precision', 3)
print(df)

# 3. Removing dimensions

The first approach we use is to remove each dimension one by one and see how it affects accuracy over many trials. Since we have to loop over 1,024 dimensions, we use fewer trials here (20) than in the previous sections.

In [None]:
from matplotlib.ticker import FuncFormatter

In [None]:
# Experiment 3: leave-one-dimension-out ablation.
# For each trial, measure baseline 1-NN accuracy on the full embeddings, then
# re-measure it with each single embedding dimension deleted in turn.
unique_authors = filtered_df['author'].unique()
n_authors = len(unique_authors)
trials = 20
support = 8

# 1-nearest-neighbour classifier under cosine distance; re-fitted for every ablation.
knn = KNeighborsClassifier(
    n_neighbors=1,
    metric='cosine'
)

# Long-format log: one row per (trial, deleted dimension) accuracy measurement.
results = {
    'deleted_dim': [],
    'acc': [],
    'trial_id': []
}

# One baseline accuracy per trial, in trial order (position k belongs to trial_id k).
baseline_acc = []

for trial_id in range(trials):

    # Sample texts from each author
    sampled_dataset = filtered_df.groupby('author').sample(n = support + 1) # +1 so that there is enough for 8 train and 1 test

    X = np.stack(sampled_dataset['embedding'])
    y = np.array(sampled_dataset['author'])

    # Assign 1 text as the test
    X_train, X_test, y_train, y_test = train_test_split(X,
                                                        y,
                                                        stratify=y,
                                                        test_size=n_authors
                                                        )

    # Baseline: accuracy with all dimensions present, on this trial's split.
    knn.fit(X_train, y_train)
    predictions = knn.predict(X_test)
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        accuracy = accuracy_score(y_test, predictions)
    baseline_acc.append(accuracy)

    # Loop over columns of matrix
    for dim in range(X_train.shape[1]):

        # Drop the same column from train and test so the two stay aligned.
        X_train_del = np.delete(X_train, dim, axis=1)
        X_test_del = np.delete(X_test, dim, axis=1)

        knn.fit(X_train_del, y_train)
        predictions = knn.predict(X_test_del)
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            accuracy = accuracy_score(y_test, predictions)

        results['deleted_dim'].append(dim)
        results['acc'].append(accuracy)
        # Record the actual trial index so each row can be matched to its baseline.
        results['trial_id'].append(trial_id)

# From here on `results` is a DataFrame (later cells read it in this form).
results = pd.DataFrame(results)
mean_baseline_acc = np.mean(baseline_acc)

# Mean and standard deviation of accuracy across trials for each deleted dimension.
dim_stats = results.groupby('deleted_dim')['acc'].agg(['mean', 'std']).reset_index()
print("Dimensions for which accuracy dropped when removed:")
print(dim_stats.sort_values('mean', ascending=True).head(10))

def to_percent(y, position):
    """Format an axis tick value given as a fraction (e.g. 0.25) as a percentage string.

    `position` is the tick position passed by matplotlib's FuncFormatter; it is unused.
    """
    return f"{y * 100:.2f}%"

formatter = FuncFormatter(to_percent)
plt.gca().yaxis.set_major_formatter(formatter)

plt.plot(dim_stats['deleted_dim'], dim_stats['mean'])
plt.title(f'Mean Accuracy Over {trials} Trials After Removing One Dimension')
plt.xlabel('Index of Dimension Removed')
plt.ylabel('Mean Accuracy')
plt.grid(True, alpha=0.3)
# Horizontal reference line: mean accuracy with no dimension removed.
plt.axhline(y = mean_baseline_acc, color='r', linestyle='--', label='Baseline (no dimensions removed)')
plt.legend()
plt.show()
        

In [None]:
from scipy.stats import ttest_rel

# Work on a DataFrame view of the ablation log (one row per trial and deleted dimension).
df = pd.DataFrame(results)

# Mean baseline accuracy is the same for every dimension, so compute it once.
baseline_mean = np.mean(baseline_acc)

# One record per deleted dimension: paired t-test p-value and mean accuracy drop.
significance_data = []

for dim, dim_rows in df.groupby('deleted_dim', sort=False):
    # Accuracies for this dimension, in the order the trials were logged.
    dim_accs = dim_rows['acc'].values

    # Paired test against the per-trial baselines (both sequences must have equal
    # length and be in the same trial order): is the mean difference non-zero?
    t_stat, p_val = ttest_rel(baseline_acc, dim_accs)

    significance_data.append({
        'dim': dim,
        'p_value': p_val,
        # Positive value: accuracy fell when this dimension was removed.
        'mean_diff_from_baseline': baseline_mean - np.mean(dim_accs),
    })

stats_df = pd.DataFrame(significance_data)

# Rank dimensions by mean accuracy drop, largest first. The p-value is shown next to
# each row for inspection; no significance threshold is applied here.
important_dims = stats_df.sort_values(by='mean_diff_from_baseline', ascending=False)

print("--- Significantly Important Dimensions ---")
print(important_dims.head(20))