In [None]:
from datasets import Dataset, Features
from datasets import load_dataset
from transformers import AutoTokenizer, AutoModelForMaskedLM, AutoModel
import torch
import numpy as np
from xgboost import XGBClassifier
import xgboost as xgb
from tqdm import tqdm  # Import tqdm for progress tracking
import math
import pickle
import matplotlib.pyplot as plt
# import pad_sequence
from torch.nn.utils.rnn import pad_sequence
import matplotlib

In [None]:
# Plot styling shared by every figure in this notebook.
# Font type 42 makes the PDF/PS backends embed TrueType fonts instead of Type 3.
matplotlib.rcParams.update({
    'pdf.fonttype': 42,
    'ps.fonttype': 42,
})
plt.rc('font', family='serif')
plt.rc('text', usetex=False)
# NOTE(review): alpha_overlap is not referenced in the visible cells; kept in case later cells use it.
alpha_overlap = 0.8


In [None]:
# Fetch the HumanEval benchmark; the cells below iterate over its 'test' split.
humaneval_dataset = load_dataset(path="openai_humaneval")

In [None]:
# Load the CodeBERTa tokenizer and encoder, preferring the GPU when one is available.
# NOTE(review): the embedding helpers below read `pooler_output`. If this checkpoint
# does not ship pooler weights they would be randomly initialised on every load,
# making embeddings differ between sessions -- confirm against the load-time warnings.

if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

tokenizer = AutoTokenizer.from_pretrained("huggingface/CodeBERTa-small-v1")
model = AutoModel.from_pretrained("huggingface/CodeBERTa-small-v1")
model = model.to(device)

In [None]:
def get_embeddings_from_str(list_of_strs, batch_size=16):
    """Embed raw strings with the notebook-level ``tokenizer`` and ``model``.

    Each string is tokenized (padded/truncated to the tokenizer's max length)
    and the encoder's ``pooler_output`` is used as its embedding.

    Args:
        list_of_strs: list of strings to embed.
        batch_size: number of strings sent through the model per forward pass.

    Returns:
        ``np.ndarray`` with one row per input string. Every ``batch_size``
        (including 1) yields the same 2-D layout.
    """
    def tokenize_function_embedding(examples):
        prompt_token = tokenizer(
            examples['text'], return_tensors="pt", padding="max_length", truncation=True
        )['input_ids']
        # NOTE(review): only input_ids are forwarded (no attention_mask), so padding
        # positions are attended to. Left as-is because the downstream classifier
        # was presumably trained on embeddings produced this way -- confirm.
        with torch.no_grad():  # inference only: do not build an autograd graph
            pooled = model(prompt_token.to(device)).pooler_output
        return {'encoded_tokens': pooled.cpu().numpy()}

    dataset = Dataset.from_dict({"text": list_of_strs})
    # Always map in batched mode: with batched=False the callback receives a single
    # string and returns a (1, hidden) array, which added a spurious extra axis to
    # the result when batch_size == 1.
    ds_train_tokenized = dataset.map(tokenize_function_embedding, batched=True, batch_size=batch_size)
    embeddings = np.array([ds_train_tokenized[i]['encoded_tokens'] for i in range(len(ds_train_tokenized))])
    return embeddings

def get_embeddings_from_tokens(list_of_tokens, batch_size=64):
    """Embed already-tokenized sequences with the notebook-level ``model``.

    Args:
        list_of_tokens: 2-D collection of token ids, one equal-length (padded)
            sequence per row.
        batch_size: number of sequences sent through the model per forward pass.

    Returns:
        ``np.ndarray`` with one ``pooler_output`` row per input sequence.
    """
    def function_embedding(examples):
        input_ids_tokens = torch.tensor(examples['tokens']).to(device)
        # NOTE(review): no attention_mask is passed, so padding positions are
        # attended to. Left as-is because the downstream classifier was presumably
        # trained on embeddings produced this way -- confirm.
        with torch.no_grad():  # inference only: do not build an autograd graph
            pooled = model(input_ids_tokens).pooler_output
        return {'encoded_tokens': pooled.cpu().numpy()}

    dataset = Dataset.from_dict({"tokens": list_of_tokens})
    # Always map in batched mode: with batched=False the callback receives a single
    # 1-D sequence, which the encoder cannot consume as a (batch, seq) input.
    ds_train_tokenized = dataset.map(function_embedding, batched=True, batch_size=batch_size)
    embeddings = np.array([ds_train_tokenized[i]['encoded_tokens'] for i in range(len(ds_train_tokenized))])
    return embeddings






In [None]:
# Restore the trained XGBoost classifier from its JSON dump.
# The model file is not shipped with the repository: train your own model first.
model_xgboost = XGBClassifier()
model_xgboost.load_model('model_trained_emb_only.json')


In [None]:
def calculate_scalar_outputs(prompt_embedding, suggestion_embeddings):
    """Score each suggestion embedding against a single prompt embedding.

    Every suggestion embedding is appended to the prompt embedding and the
    resulting feature vectors are passed to ``model_xgboost``.

    Returns:
        ``np.ndarray`` holding, per suggestion, the first column of
        ``predict_proba`` (the probability assigned to class index 0).
    """
    paired_features = []
    for suggestion_embedding in suggestion_embeddings:
        paired_features.append(np.concatenate([prompt_embedding, suggestion_embedding]))
    class_probabilities = model_xgboost.predict_proba(paired_features)
    return np.array(class_probabilities[:, 0])



In [None]:
# Score every token-prefix of each HumanEval canonical solution against its prompt.
# all_outputs[k] ends up holding one score per prefix length for the k-th problem.
max_examples = len(humaneval_dataset['test'])  # lower this to process only the first few problems
all_outputs = []
lines_to_include_to_prompt = 0  # number of leading solution lines moved into the prompt
for counter, datapoint in enumerate(tqdm(humaneval_dataset['test']), start=1):
    if counter > max_examples:
        break
    prompt = datapoint['prompt']
    solution = datapoint['canonical_solution']
    # Move the first lines of the solution into the prompt, always leaving at
    # least one line in the solution.
    lines_solution = solution.split('\n')
    lines_to_include_dp = min(lines_to_include_to_prompt, len(lines_solution) - 1)
    prompt = prompt + '\n' + '\n'.join(lines_solution[:lines_to_include_dp])
    solution = '\n'.join(lines_solution[lines_to_include_dp:])

    prompt_tokens = tokenizer(prompt, padding="max_length", return_tensors='pt', truncation=True)
    emb_prompt = get_embeddings_from_tokens(prompt_tokens['input_ids'])[0]

    solution_encoding = tokenizer(solution, padding="max_length", return_tensors='pt', truncation=True)
    canonical_solution_tokens = solution_encoding['input_ids']
    # Count real (non-padding) tokens from the attention mask rather than searching
    # for a hard-coded pad id: this also works when the solution fills the whole
    # window and no padding token is present.
    length_before_pad = int(solution_encoding['attention_mask'][0].sum().item())
    padded_length = canonical_solution_tokens.shape[1]

    # Row i keeps the first i + 1 tokens of the solution; the rest is padding.
    prefix_lengths = torch.arange(1, length_before_pad + 1).unsqueeze(1)
    positions = torch.arange(padded_length).unsqueeze(0)
    keep_mask = positions < prefix_lengths
    substrings_canonical_tokens = (
        canonical_solution_tokens[0]
        .repeat(length_before_pad, 1)
        .masked_fill(~keep_mask, tokenizer.pad_token_id)
    )

    embs_sugg = get_embeddings_from_tokens(substrings_canonical_tokens)
    outputs = calculate_scalar_outputs(emb_prompt, embs_sugg)

    all_outputs.append(outputs)

In [None]:
# Sample each problem's score curve at fixed relative positions (0%, 10%, ..., 100%).
# NOTE: the following cell rebinds `lengths`, `percentiles` and
# `scores_across_lengths`, so on a top-to-bottom run these values are not plotted.
lengths = np.array([len(output) for output in all_outputs])
percentiles = np.arange(0, 110, 10)
scores_across_lengths = []

for outputs in all_outputs:
    last_index = len(outputs) - 1
    scores = []
    for percentile in percentiles:
        # Integer arithmetic avoids float-floor artefacts (e.g. floor(90 * 0.7) == 62);
        # the 100% position is clamped to the last valid index.
        index_j = min(len(outputs) * int(percentile) // 100, last_index)
        scores.append(outputs[index_j])
    scores_across_lengths.append(scores)


In [None]:
# For every problem, average the prefix scores inside 5%-wide bins of relative
# length and normalise by that problem's maximum score.
lengths = np.array([len(output) for output in all_outputs])
percentiles = np.arange(0, 105, 5)
scores_across_lengths = []

for outputs in all_outputs:
    n_scores = len(outputs)
    max_score = max(outputs)
    scores = []
    # Only the bins starting at percentiles[:-2] (0%..90%) are computed; the
    # plotting cell indexes percentiles[:-2] to match.
    for j in range(len(percentiles) - 2):
        # Integer arithmetic avoids float-floor artefacts at the bin boundaries
        # (e.g. math.floor(90 * 0.7) evaluates to 62 instead of 63).
        index_j = n_scores * int(percentiles[j]) // 100
        index_j_plus = n_scores * int(percentiles[j + 1]) // 100
        # The slice is inclusive of the next bin's first index, so it is never empty.
        avg_score = np.mean(outputs[index_j:index_j_plus + 1]) / max_score
        # check if nan
        if np.isnan(avg_score):
            print("nan")
            print(outputs)
            print(index_j_plus)
            print(index_j)
        scores.append(avg_score)
    scores_across_lengths.append(scores)


In [None]:
scores_across_lengths = np.array(scores_across_lengths)

# Mean and standard error (population std / sqrt(n)) of the normalised score per bin
mean_scores = np.mean(scores_across_lengths, axis=0)
std_error_scores = np.std(scores_across_lengths, axis=0) / np.sqrt(scores_across_lengths.shape[0])

# Quadratic trend fitted to the per-bin means
trend_line = np.polyfit(percentiles[:-2], mean_scores, 2)
trend_line_values = np.polyval(trend_line, percentiles[:-2])

# Set the figure size BEFORE the figure is created; changing rcParams afterwards
# only affects figures created later.
plt.rcParams["figure.figsize"] = [6, 4.2]
fig, ax = plt.subplots()

# x-axis: start of each percentile bin, y-axis: mean normalised score
ax.errorbar(percentiles[:-2], mean_scores, yerr=std_error_scores, fmt='o')
ax.plot(percentiles[:-2], trend_line_values, 'r-', label=' Quadratic Trend Line')

ax.spines['right'].set_visible(False)
ax.spines['top'].set_visible(False)
ax.xaxis.tick_bottom()
ax.yaxis.tick_left()
ax.grid(True)
ax.legend(fontsize='large')
ax.set_xlabel('Percentiles of Length', fontsize='large')
ax.set_ylabel('Normalized Mean Score', fontsize='large')
fig.savefig(f'plot_score_trend_{lines_to_include_to_prompt}.pdf', dpi = 1000, bbox_inches='tight')
plt.show()
# Release the figure so later cells never draw onto it by accident.
plt.close(fig)

In [None]:
# Relative position (0 = first prefix, 1 = full solution) of each problem's best score
max_indices_percent = []
for outputs in all_outputs:
    max_index = np.argmax(outputs)
    # Guard the denominator so a single-score curve maps to 0 instead of 0/0.
    max_indices_percent.append(max_index / max(len(outputs) - 1, 1))
max_indices_percent = np.array(max_indices_percent)

# Set the figure size BEFORE the figure is created; changing rcParams afterwards
# only affects figures created later.
plt.rcParams["figure.figsize"] = [6, 4.2]
fig, ax = plt.subplots()
ax.spines['right'].set_visible(False)
ax.spines['top'].set_visible(False)
ax.xaxis.tick_bottom()
ax.yaxis.tick_left()
# Enable the grid before saving so the PDF matches what is displayed.
ax.grid(True)

# Create the histogram (no legend: nothing in this figure carries a label)
ax.hist(max_indices_percent, bins=20, range=(0, 1))
ax.set_xlabel('Max Index Percentage')
ax.set_ylabel('Count')
fig.savefig(f'histogram_maxindex_{lines_to_include_to_prompt}.pdf', dpi = 1000, bbox_inches='tight')

plt.show()
plt.close(fig)


In [None]:
from scipy import stats

# One-sample Kolmogorov-Smirnov test of the arg-max positions against scipy's
# 'uniform' distribution.
# NOTE(review): the positions are ratios of integers and may contain ties, while
# the KS test assumes a continuous distribution -- interpret the p-value with care.
ks_statistic, ks_p_value = stats.kstest(max_indices_percent, 'uniform')

# NOTE: an Anderson-Darling test was planned here but is not implemented.

# Report the test statistic and p-value
print(
    "Kolmogorov-Smirnov Test:",
    f"KS Statistic: {ks_statistic:.4f}",
    f"P-value: {ks_p_value:.4f}",
    sep="\n",
)


# Repeat the analysis while moving leading solution lines into the prompt

In [None]:
def _score_solution_prefixes(datapoint, lines_to_include):
    """Return one classifier score per token-prefix of a problem's canonical solution."""
    prompt = datapoint['prompt']
    lines_solution = datapoint['canonical_solution'].split('\n')
    # Move the first lines of the solution into the prompt, always leaving at
    # least one line in the solution.
    lines_to_include_dp = min(lines_to_include, len(lines_solution) - 1)
    prompt = prompt + '\n' + '\n'.join(lines_solution[:lines_to_include_dp])
    solution = '\n'.join(lines_solution[lines_to_include_dp:])

    prompt_tokens = tokenizer(prompt, padding="max_length", return_tensors='pt', truncation=True)
    emb_prompt = get_embeddings_from_tokens(prompt_tokens['input_ids'])[0]

    solution_encoding = tokenizer(solution, padding="max_length", return_tensors='pt', truncation=True)
    canonical_solution_tokens = solution_encoding['input_ids']
    # Count real (non-padding) tokens from the attention mask rather than searching
    # for a hard-coded pad id: this also works when the solution fills the whole
    # window and no padding token is present.
    length_before_pad = int(solution_encoding['attention_mask'][0].sum().item())
    padded_length = canonical_solution_tokens.shape[1]

    # Row i keeps the first i + 1 tokens of the solution; the rest is padding.
    prefix_lengths = torch.arange(1, length_before_pad + 1).unsqueeze(1)
    positions = torch.arange(padded_length).unsqueeze(0)
    keep_mask = positions < prefix_lengths
    substrings_canonical_tokens = (
        canonical_solution_tokens[0]
        .repeat(length_before_pad, 1)
        .masked_fill(~keep_mask, tokenizer.pad_token_id)
    )

    embs_sugg = get_embeddings_from_tokens(substrings_canonical_tokens)
    return calculate_scalar_outputs(emb_prompt, embs_sugg)


def _binned_normalized_scores(outputs, percentiles):
    """Average `outputs` inside consecutive percentile bins, normalised by max(outputs)."""
    n_scores = len(outputs)
    max_score = max(outputs)
    scores = []
    # Only the bins starting at percentiles[:-2] are computed; the trend plot
    # indexes percentiles[:-2] to match.
    for j in range(len(percentiles) - 2):
        # Integer arithmetic avoids float-floor artefacts at the bin boundaries
        # (e.g. math.floor(90 * 0.7) evaluates to 62 instead of 63).
        index_j = n_scores * int(percentiles[j]) // 100
        index_j_plus = n_scores * int(percentiles[j + 1]) // 100
        # The slice is inclusive of the next bin's first index, so it is never empty.
        avg_score = np.mean(outputs[index_j:index_j_plus + 1]) / max_score
        # check if nan
        if np.isnan(avg_score):
            print("nan")
            print(outputs)
            print(index_j_plus)
            print(index_j)
        scores.append(avg_score)
    return scores


def _style_axes(ax):
    """Apply the notebook's shared axes styling (open top/right spines, grid on)."""
    ax.spines['right'].set_visible(False)
    ax.spines['top'].set_visible(False)
    ax.xaxis.tick_bottom()
    ax.yaxis.tick_left()
    ax.grid(True)


def _plot_score_trend(percentiles, mean_scores, std_error_scores, lines_to_include):
    """Plot per-bin mean scores with error bars and a quadratic trend, then save as PDF."""
    bin_starts = percentiles[:-2]
    trend_line = np.polyfit(bin_starts, mean_scores, 2)
    trend_line_values = np.polyval(trend_line, bin_starts)

    fig, ax = plt.subplots()
    ax.errorbar(bin_starts, mean_scores, yerr=std_error_scores, fmt='o')
    ax.plot(bin_starts, trend_line_values, 'r-', label=' Quadratic Trend Line')
    _style_axes(ax)
    ax.legend(fontsize='large')
    ax.set_xlabel('Percentiles of Length', fontsize='large')
    ax.set_ylabel('Normalized Mean Score', fontsize='large')
    fig.savefig(f'plot_score_trend_{lines_to_include}.pdf', dpi = 1000, bbox_inches='tight')
    plt.show()
    # Close explicitly so the next figure never draws onto this one.
    plt.close(fig)


def _plot_max_index_histogram(max_indices_percent, lines_to_include):
    """Plot a histogram of the relative arg-max positions, then save as PDF."""
    fig, ax = plt.subplots()
    _style_axes(ax)
    # No legend: nothing in this figure carries a label.
    ax.hist(max_indices_percent, bins=20, range=(0, 1))
    ax.set_xlabel('Max Index Percentage')
    ax.set_ylabel('Count')
    fig.savefig(f'histogram_maxindex_{lines_to_include}.pdf', dpi = 1000, bbox_inches='tight')
    plt.show()
    plt.close(fig)


# Run the full analysis once per number of solution lines moved into the prompt.
lines_to_include_set = [0,1,2,3,4]
# Set the figure size BEFORE any figure is created; changing rcParams after a
# figure exists only affects figures created later.
plt.rcParams["figure.figsize"] = [6, 4.2]
percentiles = np.arange(0,105,5)
for lines_to_include_to_prompt in lines_to_include_set:
    max_examples = len(humaneval_dataset['test'])  # lower this for a quick partial run
    all_outputs = []
    for counter, datapoint in enumerate(tqdm(humaneval_dataset['test']), start=1):
        if counter > max_examples:
            break
        all_outputs.append(_score_solution_prefixes(datapoint, lines_to_include_to_prompt))

    lengths = np.array([len(output) for output in all_outputs])

    # Normalised mean score per 5%-wide bin of relative solution length
    scores_across_lengths = np.array(
        [_binned_normalized_scores(outputs, percentiles) for outputs in all_outputs]
    )

    # Mean and standard error (population std / sqrt(n)) per bin
    mean_scores = np.mean(scores_across_lengths, axis=0)
    std_error_scores = np.std(scores_across_lengths, axis=0) / np.sqrt(scores_across_lengths.shape[0])
    _plot_score_trend(percentiles, mean_scores, std_error_scores, lines_to_include_to_prompt)

    # Relative position (0 = first prefix, 1 = full solution) of each best score;
    # the denominator is guarded so a single-score curve maps to 0 instead of 0/0.
    max_indices_percent = np.array(
        [np.argmax(outputs) / max(len(outputs) - 1, 1) for outputs in all_outputs]
    )
    _plot_max_index_histogram(max_indices_percent, lines_to_include_to_prompt)
