In [100]:
import os
import random
import torch
import pandas as pd
from IPython.display import display
from dotenv import load_dotenv
from huggingface_hub import login
from transformers import AutoModelForCausalLM, AutoTokenizer

Step 1: read all files into memory

In [101]:
def read_files(directory_path: str, filenames: list[str]) -> list[str]:
	file_contents = []

	for filename in filenames:
		file_path = os.path.join(directory_path, filename)

		with open(file_path, 'r') as file:
			file_contents.append(file.read())

	return file_contents


in_directory_path = "../data"
filenames = os.listdir(in_directory_path)
contents = read_files(in_directory_path, filenames)
n_files = len(contents)

print(f"{n_files} file(s) have been read in directory {in_directory_path}")


2 file(s) have been read in directory ../data


Step 2: split each file into three parts multiple times

`split_data` will be of dimensions `[n_files x n_samples x 3]`

In [102]:
def split(n_samples: int, min_missing_chars: int, max_missing_chars: int) -> list[list[list[str]]]:
	split_data = []

	for file_content in contents:
		file_splits = []
		for _ in range(n_samples):
			middle_len = min(len(file_content), random.randint(min_missing_chars, max_missing_chars))
			middle_start = random.randint(0, len(file_content) - 1 - middle_len)
			middle_end = middle_start + middle_len

			prefix = file_content[:middle_start]
			middle = file_content[middle_start:middle_end]
			suffix = file_content[middle_end:]

			file_splits.append([prefix, middle, suffix])
		split_data.append(file_splits)
	return split_data


n_samples = 1  # Number of different splits obtained per file. Total number of samples = n_files * n_samples
max_missing_chars = 50  # The maximum number of characters that need to be completed (i.e. the middle part)
min_missing_chars = 5
random.seed(45)  # For reproducible results

split_data = split(n_samples, min_missing_chars, max_missing_chars)

print(f"split_data has been initialized with {n_samples} sample(s) per file")

# Uncomment to see an example
# print("Prefix:", split_data[0][0][0])
# print("Middle:", split_data[0][0][1])
# print("Suffix:", split_data[0][0][2])

split_data has been initialized with 1 sample(s) per file


Step 3.1: Load the model

In [103]:
def hf_login() -> None:
	load_dotenv()
	api_key = os.getenv('HUGGINGFACE_TOKEN')
	login(api_key)


def load_model(model_name: str, device: str):
	tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
	tokenizer.pad_token = tokenizer.eos_token
	model = AutoModelForCausalLM.from_pretrained(model_name, device_map="auto", torch_dtype=torch.bfloat16).to(device)
	return model, tokenizer


hf_login()
device = "cpu"
# model_name = "bigcode/tiny_starcoder_py"
model_name = "bigcode/starcoderbase-1b"
model, tokenizer = load_model(model_name, device)

Step 3.2: Use the model to generate a suggestion

In [104]:
def suggest(model, tokenizer, prefix: str, suffix: str) -> str:
	PREFIX_TAG = "<fim_prefix>"
	SUFFIX_TAG = "<fim_suffix>"
	MIDDLE_TAG = "<fim_middle>"
	EOT_TAG = "<|endoftext|>"
	
	input_text = f"{PREFIX_TAG}{prefix}{SUFFIX_TAG}{suffix}{MIDDLE_TAG}"
	inputs = tokenizer.encode(input_text, return_tensors="pt").to(device)
	outputs = model.generate(inputs, max_new_tokens=max_missing_chars, pad_token_id=tokenizer.pad_token_id)
	result = tokenizer.decode(outputs[0])
	return result.split(MIDDLE_TAG)[-1].replace(EOT_TAG, '')


In [105]:
# # REMOVE ME
# test_prefix = "# Return sum of two numbers\ndef add(a, b):\n    "
# test_suffix = "+ b"
# test_result = suggest(model, tokenizer, test_prefix, test_suffix)
# print(test_result)

# # test_prefix = "def print_one_two_three():\n    print('one')\n    "
# # test_suffix = "\n    print('three')"
# # test_result = suggest(test_prefix, test_suffix)
# # print(test_result)

In [106]:
raw_suggestions = []

for i in range(n_files):
	file_suggestions = []
	for j in range(n_samples):
		suggestion = suggest(model, tokenizer, split_data[i][j][0], split_data[i][j][2])
		file_suggestions.append(suggestion)
	raw_suggestions.append(file_suggestions)


In [117]:
def get_table() -> pd.DataFrame:
	data = []
	for i in range(n_files):
		file_name = filenames[i]
		for j in range(n_samples):
			row = {
				"File": file_name,
				"Original": split_data[i][j][1],
				"Suggestion": raw_suggestions[i][j]
			}
			data.append(row)

	return pd.DataFrame(data)


def write_table(table_directory_path: str, table: pd.DataFrame) -> None:
	file_path = "results.csv"
	if table_directory_path[-1] != "/":
		table_directory_path += "/"
	full_path = f"{table_directory_path}{file_path}"
	table.to_csv(full_path, index=False)


table_directory_path = "../out/tables"
table = get_table()
display(table)
write_table(table_directory_path, table)


Unnamed: 0,File,Original,Suggestion
0,animal.py,f):\n\t\tself.weight_kg =,f):\n\t\tself.weight_kg = 10\n\t\tself.sound =...
1,add.py,"sum of two numbers\ndef add(a, b):\n\t","the sum of two numbers\ndef add(a, b):\n"


In [115]:
def append_number_to_filename(filename: str, number: int) -> str:
	name, extension = filename.split(".")
	return f"{name}_{number}.{extension}"


def reconstruct_data(prefix: str, middle: str, suffix: str) -> str:
	return prefix + middle + suffix


def write_file(directory_path: str, filename: str, data: str) -> None:
	file_path = os.path.join(directory_path, filename)

	with open(file_path, 'w') as file:
		file.write(data)


def write_files(directory_path: str) -> None:
	for i in range(n_files):
		for j in range(n_samples):
			filename = append_number_to_filename(filenames[i], j)
			data = reconstruct_data(split_data[i][j][0], raw_suggestions[i][j], split_data[i][j][2])
			write_file(directory_path, filename, data)


out_directory_path = "../out"
write_files(out_directory_path)
			

In [108]:
# def suggest(prefix: str, suffix: str) -> str:
# 	checkpoint = "bigcode/tiny_starcoder_py"
# 	device = "cpu"

# 	tokenizer = MistralTokenizer.v3()
# 	model = AutoModelForCausalLM.from_pretrained(checkpoint).to(device)

# 	prefix = """def add("""
# 	suffix = """    return sum"""

# 	request = FIMRequest(prompt=prefix, suffix=suffix)

# 	tokens = tokenizer.encode_fim(request).tokens

# 	out_tokens, _ = generate([tokens], model, max_tokens=256, temperature=0.0, eos_id=tokenizer.instruct_tokenizer.tokenizer.eos_id)
# 	result = tokenizer.decode(out_tokens[0])

# 	middle = result.split(suffix)[0].strip()
# 	print(middle)
# 	return middle


# hf_login()
# raw_suggestions = []

# for i in range(n_files):
# 	file_suggestions = []
# 	for j in range(n_samples):
# 		suggestion = suggest(split_data[i][j][0], split_data[i][j][2])
# 		file_suggestions.append(suggestion)
# 	raw_suggestions.append(file_suggestions)



In [109]:
# def suggest(prefix: str, suffix: str) -> str:
# 	from transformers import GemmaTokenizer, AutoModelForCausalLM

# 	model_id = "google/codegemma-7b"
# 	tokenizer = GemmaTokenizer.from_pretrained(model_id)
# 	model = AutoModelForCausalLM.from_pretrained(model_id)

# 	prompt = '''\
# 	<|fim_prefix|>import datetime
# 	def calculate_age(birth_year):
# 		"""Calculates a person's age based on their birth year."""
# 		current_year = datetime.date.today().year
# 		<|fim_suffix|>
# 		return age<|fim_middle|>\
# 	'''

# 	inputs = tokenizer(prompt, return_tensors="pt").to(model.device)
# 	prompt_len = inputs["input_ids"].shape[-1]
# 	outputs = model.generate(**inputs, max_new_tokens=100)
# 	print(tokenizer.decode(outputs[0][prompt_len:]))


# # hf_login()
# raw_suggestions = []

# for i in range(n_files):
# 	file_suggestions = []
# 	for j in range(n_samples):
# 		suggestion = suggest(split_data[i][j][0], split_data[i][j][2])
# 		file_suggestions.append(suggestion)
# 	raw_suggestions.append(file_suggestions)

In [110]:
# import requests
# import json

# # Replace with your Hugging Face API token
# load_dotenv()
# api_key = os.getenv('HUGGINGFACE_TOKEN')
# API_URL = "https://api-inference.huggingface.co/models/bigcode/starcoder"

# headers = {
#     "Authorization": f"Bearer {api_key}"
# }

# prompt = '''\
# <|fim_prefix|>import datetime
# def calculate_age(birth_year):
# 	"""Calculates a person's age based on their birth year."""
# 	current_year = datetime.date.today().year
# 	<|fim_suffix|>
# 	return age<|fim_middle|>\
# '''

# # Sample input for the model
# data = {
#     "inputs": {
#         "text": prompt
#     }
# }

# # Send the request to the API
# response = requests.post(API_URL, headers=headers, json=data)

# # Get the output
# result = json.loads(response.content.decode("utf-8"))
# print(result)


In [111]:

# model = Transformer.from_folder("~/codestral-22B-240529")

# prefix = """def add("""
# suffix = """    return sum"""

# request = FIMRequest(prompt=prefix, suffix=suffix)

# tokens = tokenizer.encode_fim(request).tokens

# out_tokens, _ = generate([tokens], model, max_tokens=256, temperature=0.0, eos_id=tokenizer.instruct_tokenizer.tokenizer.eos_id)
# result = tokenizer.decode(out_tokens[0])

# middle = result.split(suffix)[0].strip()
# print(middle)


In [112]:
# import torch
# from transformers import pipeline

# model_id = "meta-llama/Llama-3.2-3B-Instruct"
# hf_login()
# pipe = pipeline(
#     "text-generation",
#     model=model_id,
#     torch_dtype=torch.bfloat16,
#     device_map="auto",
# )
# messages = [
#     {"role": "system", "content": "You are a pirate chatbot who always responds in pirate speak!"},
#     {"role": "user", "content": "Who are you?"},
# ]
# outputs = pipe(
#     messages,
#     max_new_tokens=256,
# )
# print(outputs[0]["generated_text"][-1])


In [113]:
to_print = [(0, 0), (1, 0), (2, 0), (3, 0), (4, 0), (5, 0)]
print(len(split_data))
print(len(split_data[0]))
print(len(split_data[0][0]))

for i, j in to_print:
	print(f'========{i}, {j}========')
	print("=======PREFIX=======\n",split_data[i][j][0])
	print("=======MIDDLE=======\n",split_data[i][j][1])
	print("=======SUFFIX=======\n",split_data[i][j][2])
	print("=======RAW=======\n",raw_suggestions[i][j])

2
1
3
 class Dog:
	def __init__(self):
		self.weight_kg = 10
		self.sound = "Bark"


class Cat:
	def __init__(sel
 f):
		self.weight_kg =
  4
		self.sound = "Meow"
 f):
		self.weight_kg = 10
		self.sound = "Meow"


class DogCat:
	def __init__(self):
		self.weight_kg =
 # Return
  sum of two numbers
def add(a, b):
	
 return a + b

  the sum of two numbers
def add(a, b):
    


IndexError: list index out of range