The purpose of this experiment is for an LLM to predict the return type of a function by using next-token prediction (not using the residual stream).

In [None]:
import torch
import os

# Device that the prompt token ids are moved to before each forward pass.
device = torch.device('cuda')

# Directory the "<dataset_name>.jsonl" input files are read from.
DATA_DIR = ""
# Directory the "<model>_<dataset>_accuracy.txt" result files are written to.
RESULTS_DIR = ""

# Make sure the results directory exists before any result file is written.
# NOTE(review): os.makedirs("") raises FileNotFoundError, so RESULTS_DIR looks
# like a placeholder that must be filled in before running — confirm.
os.makedirs(RESULTS_DIR, exist_ok=True)

# Model identifiers to evaluate; each one is passed to `from_pretrained` when
# the model and its tokenizer are loaded.
LLM_NAMES = [
    "EleutherAI/pythia-2.8b-deduped-v0",
    "codellama/CodeLlama-7b-Python-hf",
    "codellama/CodeLlama-13b-Python-hf",
    "bigcode/starcoder2-15b",
    "Qwen/Qwen2.5-Coder-32B",
    "deepseek-ai/deepseek-coder-33b-instruct",
]

# Report which GPU is in use; this call requires a CUDA device to be available.
print(f"Using device: {device} ({torch.cuda.get_device_name(device)})")

# Load dataset

In [None]:
import pandas as pd

# Dataset identifiers. Each one names a JSON-lines file "<name>.jsonl" inside
# DATA_DIR and is reused later as part of the result file name.
vague_dataset_name = "python_vague_test"
misleading_dataset_name = "python_misleading_test"
real_dataset_name = "python_real_test"


def _read_dataset(dataset_name):
    """Read `<DATA_DIR>/<dataset_name>.jsonl` (one JSON record per line) into a DataFrame."""
    return pd.read_json(f"{DATA_DIR}/{dataset_name}.jsonl", lines=True)


# Unfiltered datasets, loaded in the order misleading, real, vague.
full_misleading_df = _read_dataset(misleading_dataset_name)
full_real_df = _read_dataset(real_dataset_name)
full_vague_df = _read_dataset(vague_dataset_name)

In [None]:
full_misleading_df

In [None]:
def _count_and_select_return_types(dataset_df):
    """Count the values of the 'return_type' column and pick the frequent ones.

    Returns a `(counts, frequent_types)` pair: `counts` is the per-type sample
    count and `frequent_types` lists the return types that occur more than
    3 times (types with 3 or fewer samples are dropped).
    """
    counts = dataset_df['return_type'].value_counts()
    frequent_types = counts[counts > 3].index.tolist()
    return counts, frequent_types


# Misleading dataset: the frequent return types also serve as the set of
# candidate answers (`return_type_options`) during constrained prediction.
return_type_counts, return_type_options = _count_and_select_return_types(full_misleading_df)
# Remove ' and " characters from both ends of every candidate return type.
# NOTE(review): the filter below compares the raw 'return_type' column against
# these already-stripped names, so rows whose type is stored with quotes only
# survive if the unquoted spelling is an option as well — confirm this is intended.
return_type_options = [type_name.strip('\'"') for type_name in return_type_options]
keep_misleading_row = full_misleading_df['return_type'].isin(return_type_options)
misleading_filtered_df = full_misleading_df[keep_misleading_row]

# Real dataset: same frequency threshold, computed on its own counts (no quote stripping).
real_return_type_counts, real_return_type_options = _count_and_select_return_types(full_real_df)
keep_real_row = full_real_df['return_type'].isin(real_return_type_options)
real_filtered_df = full_real_df[keep_real_row]

# Vague dataset: same frequency threshold, computed on its own counts (no quote stripping).
vague_return_type_counts, vague_return_type_options = _count_and_select_return_types(full_vague_df)
keep_vague_row = full_vague_df['return_type'].isin(vague_return_type_options)
vague_filtered_df = full_vague_df[keep_vague_row]

# Load model

In [None]:
from transformers import AutoModelForCausalLM, AutoTokenizer

def load_llm(llm_name: str) -> tuple[AutoModelForCausalLM, AutoTokenizer]:
    """Load a causal language model together with its tokenizer.

    Args:
        llm_name: Model identifier handed to `from_pretrained` for both the
            model and the tokenizer.

    Returns:
        A `(llm, tokenizer)` pair. The model is requested with float16
        weights and `device_map="cuda"`.
    """
    llm = AutoModelForCausalLM.from_pretrained(
        llm_name,
        torch_dtype=torch.float16,
        device_map="cuda",
    )
    tokenizer = AutoTokenizer.from_pretrained(llm_name)
    return llm, tokenizer

# Predict Return Type

In [None]:
import re

# Captures a function header from "def" up to the closing parenthesis that is
# directly followed by a colon (the colon itself is not captured).
_SIGNATURE_PATTERN = re.compile(r'(def .*?\(.*?\)):', re.DOTALL)


def predict_return_type(
    code: str,
    llm: AutoModelForCausalLM,
    tokenizer: AutoTokenizer,
    max_generated_tokens: int = 10,
) -> str:
    """Predict the return type of a function with constrained greedy decoding.

    The model is prompted with the function source followed by its signature
    and a trailing "-> ". At each step only tokens that continue one of the
    module-level `return_type_options` are allowed; once the generated tokens
    spell out an option completely, a single-token colon may be chosen, which
    ends the generation.

    Args:
        code: Source code of the function, without a return type hint.
        llm: Causal language model used to score the next token.
        tokenizer: Tokenizer matching `llm`.
        max_generated_tokens: Generation is aborted once more than this many
            tokens were produced without reaching a colon.

    Returns:
        The decoded text in front of the first colon, stripped of whitespace.

    Raises:
        ValueError: If no function signature can be found in `code`.
        RuntimeError: If no token is allowed at some step, or if the token
            limit is exceeded.
    """
    signature_match = _SIGNATURE_PATTERN.search(code)
    if signature_match is None:
        raise ValueError("Could not find a function signature ('def name(...):') in the given code")
    function_signature = signature_match.group(1)
    prompt = f"The function without hinted return type:```python\n{code}\n```\n\nSame function with hinted return type: ```python\n{function_signature} -> "

    torch.cuda.empty_cache()
    token_ids = tokenizer.encode(prompt, return_tensors="pt", add_special_tokens=True).to(device)

    # Token-id sequence of every candidate return type, kept as plain int
    # lists so prefixes can be compared exactly without building tensors.
    option_token_ids = [
        list(tokenizer.encode(option, add_special_tokens=False))
        for option in return_type_options
    ]
    # Colon variants that the tokenizer represents as exactly one token.
    colon_token_ids = set()
    for colon_text in (":", ":\n", ": "):
        colon_ids = tokenizer.encode(colon_text, add_special_tokens=False)
        if len(colon_ids) == 1:
            colon_token_ids.add(colon_ids[0])

    predicted_type_token_ids = []
    # Only logits are needed, so skip autograd bookkeeping for the forward passes.
    with torch.no_grad():
        while True:
            torch.cuda.empty_cache()
            position = len(predicted_type_token_ids)
            allowed_token_ids = set()
            for option_ids in option_token_ids:
                if option_ids[:position] != predicted_type_token_ids:
                    continue
                if len(option_ids) == position:
                    # This return type is fully generated: a colon may follow.
                    allowed_token_ids.update(colon_token_ids)
                else:
                    # Still inside this return type: its next token may follow.
                    allowed_token_ids.add(option_ids[position])
            if not allowed_token_ids:
                raise RuntimeError("No allowed next token for the return type generated so far")

            next_token_logits = llm(token_ids).logits[0, -1]
            candidate_ids = torch.tensor(
                sorted(allowed_token_ids), dtype=torch.long, device=next_token_logits.device
            )
            # Greedy choice restricted to the allowed tokens.
            best_candidate = next_token_logits[candidate_ids].argmax(-1)
            predicted_next_token_id = int(candidate_ids[best_candidate])

            predicted_type_token_ids.append(predicted_next_token_id)
            token_ids = torch.cat([token_ids, token_ids.new_tensor([[predicted_next_token_id]])], dim=1)
            predicted_return_type = tokenizer.decode(predicted_type_token_ids)
            if ":" in predicted_return_type:
                break
            if len(predicted_type_token_ids) > max_generated_tokens:
                raise RuntimeError("Limit reached")

    # Remove colon and everything after it
    predicted_return_type = predicted_return_type.split(":")[0].strip()
    torch.cuda.empty_cache()
    return predicted_return_type


# Accuracy

In [None]:
import tqdm

def get_accuracy(df: pd.DataFrame, llm: AutoModelForCausalLM, tokenizer: AutoTokenizer) -> tuple[int, int]:
    """Count how many return types the model predicts correctly on a dataset.

    Args:
        df: Dataset with a 'stripped_code' column (function source handed to
            the model) and a 'return_type' column (expected answer).
        llm: Causal language model used for prediction.
        tokenizer: Tokenizer matching `llm`.

    Returns:
        A `(correct, total)` pair: the number of exact matches and the number
        of samples for which a prediction was made.

    Raises:
        ValueError: If a prediction is not one of the module-level
            `return_type_options`. Errors raised by `predict_return_type`
            propagate unchanged.
    """
    correct = 0
    total = 0
    samples = zip(df['stripped_code'], df['return_type'])
    # Pass the length explicitly so the progress bar can show completion.
    for stripped_code, expected_return_type in tqdm.tqdm(samples, total=len(df)):
        predicted_return_type = predict_return_type(stripped_code, llm, tokenizer)
        total += 1
        if predicted_return_type not in return_type_options:
            raise ValueError(f"Predicted return type: '{predicted_return_type}' is not an option")
        if predicted_return_type == expected_return_type:
            correct += 1
    return correct, total

In [None]:
def save_accuracy_for_dataset(llm_name: str, df: pd.DataFrame, dataset_name: str) -> None:
    """Evaluate one model on one dataset and store the result as "<correct>/<total>".

    The result is written to
    `<RESULTS_DIR>/<llm_name with '/' replaced by '__'>_<dataset_name>_accuracy.txt`.
    If that file already exists, nothing is loaded or computed.

    Args:
        llm_name: Model identifier; used to load the model and to build the file name.
        df: Dataset to evaluate on.
        dataset_name: Dataset label used in the file name and in log messages.
    """
    results_filepath = os.path.join(RESULTS_DIR, f"{llm_name.replace('/', '__')}_{dataset_name}_accuracy.txt")
    if os.path.exists(results_filepath):
        print(f"Results already exist for {llm_name} on {dataset_name}")
        return
    llm, tokenizer = load_llm(llm_name)
    correct, total = get_accuracy(df, llm, tokenizer)
    with open(results_filepath, "w") as f:
        f.write(f"{correct}/{total}")
    # results_filepath already ends in "_accuracy.txt"; report it as written.
    print(f"Accuracy for {llm_name}: {correct}/{total}; Saved to {results_filepath}")

# Datasets to evaluate for every model, in run order: (printed heading, data, dataset name).
evaluation_runs = (
    ("Real dataset", real_filtered_df, real_dataset_name),
    ("Misleading dataset", misleading_filtered_df, misleading_dataset_name),
    ("Vague dataset", vague_filtered_df, vague_dataset_name),
)

# Compute and store the accuracy of each model on each dataset.
for llm_name in LLM_NAMES:
    print(f"=== Calculating accuracy for {llm_name}===")
    for run_heading, run_df, run_dataset_name in evaluation_runs:
        print(run_heading)
        save_accuracy_for_dataset(llm_name, run_df, run_dataset_name)