In-context learning for code completion on the HumanEval dataset

In [None]:
!pip install datasets
!pip install sentence_transformers

In [None]:
from datasets import load_dataset, load_metric
from sentence_transformers import SentenceTransformer, util
from transformers import AutoTokenizer, AutoModelForCausalLM

import heapq
import os
import re
import shutil

import numpy as np

In [None]:
# Opt-in flag for the `code_eval` metric loaded below, which runs model-generated code.
os.environ["HF_ALLOW_CODE_EVAL"] = "1" # for code_eval

In [None]:
# Number of leading problems to keep for quick experiments.
# 0 (or any value that is not strictly between 0 and the dataset size) keeps the full dataset.
NUM_SAMPLES = 0

In [None]:
# Load the HumanEval test split.
dataset = load_dataset('openai_humaneval', split='test')

# Keep only the first NUM_SAMPLES problems when that is a proper, non-empty
# prefix of the dataset; otherwise work on everything.
subset = dataset.select(range(NUM_SAMPLES)) if 0 < NUM_SAMPLES < len(dataset) else dataset

print(len(subset))



164


In [None]:
# Tokenizer and causal LM are loaded from the same CodeGen checkpoint.
# The tokenizer is given an explicit '<pad>' token and the model is told its id.
CODEGEN_CHECKPOINT = 'Salesforce/codegen-350M-mono'

tokenizer_codegen = AutoTokenizer.from_pretrained(CODEGEN_CHECKPOINT, pad_token='<pad>')
model_codegen = AutoModelForCausalLM.from_pretrained(
    CODEGEN_CHECKPOINT,
    pad_token_id=tokenizer_codegen.pad_token_id,
)

Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.


In [None]:
# Metric object handed to evaluation() by evaluate(); its compute() result is reported as pass@k.
# NOTE(review): `load_metric` is deprecated in newer `datasets` releases — confirm the installed version still provides it.
code_eval_metric = load_metric("code_eval")

In [None]:
# Show the dataset schema, then the first problem as the cell's displayed value.
print(subset)
subset[0]

Dataset({
    features: ['task_id', 'prompt', 'canonical_solution', 'test', 'entry_point'],
    num_rows: 164
})


{'task_id': 'HumanEval/0',
 'prompt': 'from typing import List\n\n\ndef has_close_elements(numbers: List[float], threshold: float) -> bool:\n    """ Check if in given list of numbers, are any two numbers closer to each other than\n    given threshold.\n    >>> has_close_elements([1.0, 2.0, 3.0], 0.5)\n    False\n    >>> has_close_elements([1.0, 2.8, 3.0, 4.0, 5.0, 2.0], 0.3)\n    True\n    """\n',
 'canonical_solution': '    for idx, elem in enumerate(numbers):\n        for idx2, elem2 in enumerate(numbers):\n            if idx != idx2:\n                distance = abs(elem - elem2)\n                if distance < threshold:\n                    return True\n\n    return False\n',
 'test': "\n\nMETADATA = {\n    'author': 'jt',\n    'dataset': 'test'\n}\n\n\ndef check(candidate):\n    assert candidate([1.0, 2.0, 3.9, 4.0, 5.0, 2.2], 0.3) == True\n    assert candidate([1.0, 2.0, 3.9, 4.0, 5.0, 2.2], 0.05) == False\n    assert candidate([1.0, 2.0, 5.9, 4.0, 5.0], 0.95) == True\n    assert 

In [None]:
def compute_semantic_similarity(object_embeddings, sample_embeddings):
  """Return the cosine similarity of two embeddings as a plain Python number.

  The result of `util.cos_sim` is collapsed with `.item()`, so both arguments
  must describe a single embedding each (a one-element similarity result).
  """
  return util.cos_sim(object_embeddings, sample_embeddings).item()

In [None]:
# Pairwise cosine-similarity matrix between all prompts in `subset`.
# Only needs to be computed once: the cells below save it and load it back.

model_ST = SentenceTransformer('all-mpnet-base-v2')

# Encode every prompt in a single batched call instead of one call per prompt.
prompt_embeddings = model_ST.encode(list(subset['prompt']), convert_to_tensor=True)
emb_list = list(prompt_embeddings)  # still a list with one embedding tensor per prompt

# A single cos_sim call yields the whole (n, n) matrix, replacing n*n
# single-pair calls. Stored as float64, like the np.zeros buffer used before.
similar_matrix = util.cos_sim(prompt_embeddings, prompt_embeddings).cpu().numpy().astype(np.float64)
np.fill_diagonal(similar_matrix, 0) # exclude itself

In [None]:
# Save the matrix next to the notebook, then copy that same file to Google Drive.
# The copy source is the path np.save just wrote, so this no longer depends on
# the working directory being /content.
# NOTE(review): assumes Google Drive is already mounted at /content/drive — no mount cell is visible here.
local_matrix_path = "similar_matrix.npy"
drive_matrix_path = '/content/drive/MyDrive/UZH/AI4PP/similar_matrix.npy'

np.save(local_matrix_path, similar_matrix)
shutil.copyfile(local_matrix_path, drive_matrix_path)

'/content/drive/MyDrive/UZH/AI4PP/similar_matrix.npy'

In [None]:
# Sanity check: the matrix should be square, one row/column per problem.
print(similar_matrix.shape)

(164, 164)


In [None]:
# Reload the saved similarity matrix; generate_prompt() reads this global.
# NOTE(review): assumes Google Drive is mounted at /content/drive — confirm before running.
similar_matrix = np.load('/content/drive/MyDrive/UZH/AI4PP/similar_matrix.npy')

In [None]:
def generate_prompt(input_idx, dataset, mum_context=1):
  """Build an in-context prompt for the problem at `input_idx`.

  The `mum_context` most similar problems (according to the global
  `similar_matrix`, most similar first) are prepended as solved examples,
  each introduced by a '#################' separator line, followed by the
  unsolved prompt of the target problem.

  Args:
    input_idx: index of the target problem in `dataset`.
    dataset: indexable collection of records with 'prompt' and
      'canonical_solution' fields.
    mum_context: number of solved examples to prepend; values <= 0 give none.

  Returns:
    The assembled prompt string.
  """
  # Only problems that actually exist in `dataset` are eligible: a matrix
  # loaded from disk may cover more problems than the current subset.
  scores = np.asarray(similar_matrix[input_idx])[:len(dataset)]

  # Most similar first. The target itself is dropped explicitly instead of
  # relying on its diagonal entry (0) ranking below every other score.
  ranked = np.argsort(scores)[::-1]
  context_idx = [int(i) for i in ranked if int(i) != input_idx][:max(mum_context, 0)]

  separator = '#################\n'
  parts = []
  for idx in context_idx:
    parts.append(separator)
    parts.append(dataset[idx]['prompt'])
    parts.append(dataset[idx]['canonical_solution'])

  parts.append(separator)
  parts.append(dataset[input_idx]['prompt'])

  return ''.join(parts)

In [None]:
def evaluation(predictions, metric, dataset=None):
  """Score predictions with `metric` against the problems' test code.

  For the i-th prediction the reference program is the i-th problem's 'test'
  code followed by a `check(<entry_point>)` call.

  Args:
    predictions: list with one list of candidate programs per problem, in
      dataset order.
    metric: object whose `compute(predictions=..., references=...)` returns
      a `(pass_at_k, results)` pair.
    dataset: records providing 'test' and 'entry_point'. Defaults to the
      global `subset`, which was previously the only possible source.

  Returns:
    The `(pass_at_k, results)` pair produced by `metric.compute`.
  """
  if dataset is None:
    dataset = subset

  references = [
    "\n" + dataset[idx]["test"] + "\n" + f"check({dataset[idx]['entry_point']})"
    for idx in range(len(predictions))
  ]

  pass_at_k, results = metric.compute(predictions=predictions, references=references)

  return pass_at_k, results

In [None]:
def _strip_unfinished_block(completion):
  """Drop everything from the last '\\n\\n\\n' gap onwards (the trailing, possibly unfinished block)."""
  cut = completion.rfind('\n\n\n')
  return completion if cut == -1 else completion[:cut]


def evaluate(dataset, in_context=False, num_test=0, max_new_tokens=128):
  """Generate a completion for each problem and score the results.

  Reads the globals `tokenizer_codegen`, `model_codegen` and
  `code_eval_metric`, and delegates scoring to `evaluation()`.

  Args:
    dataset: records with a 'prompt' field (plus whatever generate_prompt()
      and evaluation() need). NOTE: evaluation() is called without a dataset,
      so by default it takes test code from the global `subset`; pass that
      same object here.
    in_context: when True the prompt is built by generate_prompt(), i.e.
      prefixed with a similar solved problem; otherwise the raw prompt is used.
    num_test: number of leading problems to evaluate; values below 1 or above
      the dataset size mean "all".
    max_new_tokens: generation budget per problem.

  Returns:
    The `(pass_at_k, results)` pair returned by evaluation().
  """
  if num_test<1 or num_test>len(dataset):
    num_test = len(dataset)

  predictions = []
  for idx in range(num_test):
    text = generate_prompt(idx, dataset) if in_context else dataset[idx]['prompt']

    input_ids = tokenizer_codegen(text, return_tensors="pt").input_ids
    generated_ids = model_codegen.generate(input_ids, max_new_tokens=max_new_tokens)

    # The generated sequence starts with the prompt tokens. Decode only what
    # the model added, so the clean-up below can never cut into the prompt:
    # HumanEval prompts themselves contain '\n\n\n' (e.g. after their imports),
    # and truncating there would throw away the function under test.
    new_token_ids = generated_ids[0][input_ids.shape[1]:]
    completion = tokenizer_codegen.decode(new_token_ids, skip_special_tokens=True)

    # remove unfinished block
    predictions.append([text + _strip_unfinished_block(completion)])

  pass_at_k, results = evaluation(predictions, code_eval_metric)

  return pass_at_k, results

In [None]:
# test
# Quick check on the first 10 problems: in-context prompting first, then plain completion.
for use_in_context in (True, False):
  pass_at_k, results = evaluate(subset, in_context=use_in_context, num_test=10)
  print(pass_at_k)
  print(results)

{'pass@1': 0.12195121951219512}
defaultdict(<class 'list'>, {0: [(0, {'task_id': 0, 'passed': True, 'result': 'passed', 'completion_id': 0})], 1: [(0, {'task_id': 1, 'passed': False, 'result': 'failed: ', 'completion_id': 0})], 2: [(0, {'task_id': 2, 'passed': False, 'result': 'failed: ', 'completion_id': 0})], 3: [(0, {'task_id': 3, 'passed': False, 'result': 'failed: ', 'completion_id': 0})], 5: [(0, {'task_id': 5, 'passed': False, 'result': 'failed: ', 'completion_id': 0})], 4: [(0, {'task_id': 4, 'passed': False, 'result': 'failed: ', 'completion_id': 0})], 6: [(0, {'task_id': 6, 'passed': False, 'result': 'failed: ', 'completion_id': 0})], 7: [(0, {'task_id': 7, 'passed': True, 'result': 'passed', 'completion_id': 0})], 8: [(0, {'task_id': 8, 'passed': False, 'result': "failed: name 'prod' is not defined", 'completion_id': 0})], 10: [(0, {'task_id': 10, 'passed': False, 'result': 'failed: ', 'completion_id': 0})], 9: [(0, {'task_id': 9, 'passed': False, 'result': 'failed: ', 'comp

In [None]:
# full dataset test
# Same comparison over every problem: in-context prompting first, then plain completion.
for use_in_context in (True, False):
  pass_at_k, results = evaluate(subset, in_context=use_in_context)
  print(pass_at_k)
  print(results)