# Code Generation using GPT2

GPT2 is LLM that is originally from OpenAI, and it's licensed under MIT License.<br>
https://huggingface.co/docs/transformers/en/model_doc/gpt2

In [None]:
# installation
!pip install transformers[torch] datasets
!pip install accelerate -U

In [2]:
import torch
from transformers import AutoTokenizer, DataCollatorForLanguageModeling
from transformers import AutoModelForCausalLM, TrainingArguments, Trainer
from datasets import load_dataset

## Get Code dataset

In [3]:
code_dataset_train = load_dataset("code_search_net", "python", split="train")
code_dataset_validation = load_dataset("code_search_net", "python", split="validation")

print(f"total training samples: {code_dataset_train.num_rows}")
print(f"total validation samples: {code_dataset_validation.num_rows}")

print("we now re-sample the data to reduce the training time:")
# sample portion of the data
def trainDataPct(dataset, pct=1):
    return dataset.select(range(int(len(dataset)*pct)))

training_percentage = 0.075
validation_percentage = 0.3
print(f"Choosing {training_percentage*100}% of the data for training.")
print(f"Choosing {validation_percentage*100}% of the data for validation.")

code_dataset_train = trainDataPct(code_dataset_train, pct=training_percentage) # 7.5% of the data = 30913 training samples
code_dataset_validation = trainDataPct(code_dataset_validation, pct=0.3)

print(f"total training samples: {code_dataset_train.num_rows}")
print(f"total validation samples: {code_dataset_validation.num_rows}")

Found cached dataset code_search_net (/net/papilio/storage7/phusaeng/.cache/huggingface/datasets/code_search_net/python/1.0.0/8f2524e6b62f65af5f5d65c53715c654db7b08dc93e0b7bcce2ab2f286a75be1)
Found cached dataset code_search_net (/net/papilio/storage7/phusaeng/.cache/huggingface/datasets/code_search_net/python/1.0.0/8f2524e6b62f65af5f5d65c53715c654db7b08dc93e0b7bcce2ab2f286a75be1)


total training samples: 412178
total validation samples: 23107
we now re-sample the data to reduce the training time:
Choosing 3.0% of the data for training.
Choosing 30.0% of the data for validation.
total training samples: 12365
total validation samples: 6932


In [3]:
# visualize a sample
df_train = code_dataset_train.to_pandas()
def get_sample():
    # randomly pick index for a row to be displayed
    idx = torch.randint(0, len(df_train), (1,)).item()
    # Get the text descriptions according to the selected idx
    func_doc_str = df_train["func_documentation_string"][idx]
    func_name = df_train["func_name"][idx]
    whole_func_string = df_train["whole_func_string"][idx]
    # create query and answer prompts
    query = """Please write a function that {instruction}"""
    fucntion_name = """\n\nThe function name is: {func_name}"""
    out = query.format(instruction=func_doc_str) + fucntion_name.format(func_name=func_name) + "\n\n" + whole_func_string
    print(out)

get_sample()

Please write a function that Create and return an API context

The function name is: AppBuilder.create_api_context

def create_api_context(self, cls):
        """Create and return an API context"""
        return self.api_context_schema().load({
            "name": cls.name,
            "cls": cls,
            "inst": [],
            "conf": self.conf.get_api_service(cls.name),
            "calls": self.conf.get_api_calls(),
            "shared": {},  # Used per-API to monitor state
            "log_level": self.conf.get_log_level(),
            "callback": self.receive
            })


## Get GPT2

In [4]:
# tokenizer = AutoTokenizer.from_pretrained("distilbert/distilgpt2")
# model = AutoModelForCausalLM.from_pretrained("distilbert/distilgpt2")
model_name = "gpt2"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForCausalLM.from_pretrained(model_name)

tokenizer.pad_token = tokenizer.eos_token
data_collator = DataCollatorForLanguageModeling(tokenizer=tokenizer, mlm=False)

## Preprocess dataset

In [5]:
# preparing prompt and encode text to token ids with tokenizer
def preprocess_function(examples):
    """ prompt template for training the LM """
    # print(examples["func_documentation_string"], examples["func_name"], examples["whole_func_string"])
    query = """Please write a function that {instruction}"""
    fucntion_name = """\n\nThe function name is: {func_name}"""
    out_list = [
        query.format(instruction=func_doc_str) + fucntion_name.format(func_name=func_name) + "\n\n" + whole_func_string
        for func_doc_str, func_name, whole_func_string in 
        zip(examples["func_documentation_string"], examples["func_name"], examples["whole_func_string"])
    ]
    out = tokenizer(out_list)
    return out

tokenized_train = code_dataset_train.map(
    preprocess_function,
    batched=True,
    num_proc=16,
    remove_columns=code_dataset_train.column_names
)
tokenized_validation = code_dataset_validation.map(
    preprocess_function,
    batched=True,
    num_proc=16,
    remove_columns=code_dataset_validation.column_names
)

Loading cached processed dataset at /net/papilio/storage7/phusaeng/.cache/huggingface/datasets/code_search_net/python/1.0.0/8f2524e6b62f65af5f5d65c53715c654db7b08dc93e0b7bcce2ab2f286a75be1/cache-6af430db4a196fa8_*_of_00016.arrow


Loading cached processed dataset at /net/papilio/storage7/phusaeng/.cache/huggingface/datasets/code_search_net/python/1.0.0/8f2524e6b62f65af5f5d65c53715c654db7b08dc93e0b7bcce2ab2f286a75be1/cache-696e155c3838fd2f_*_of_00016.arrow


In [6]:
# after tokenized it, some samples may have the length that is longer than 
# the context size of the GPT2, which is 1024 in this case.
# we need to group the text into smaller chunks with a specified block size that is less than 1024
block_size = 700

def group_texts(examples):
    # Concatenate all texts.
    concatenated_examples = {k: sum(examples[k], []) for k in examples.keys()} # sum of list is concatenation
    total_length = len(concatenated_examples[list(examples.keys())[0]])
    # We drop the small remainder, we could add padding if the model supported it instead of this drop, you can
    # customize this part to your needs.
    if total_length >= block_size:
        total_length = (total_length // block_size) * block_size
    # Split by chunks of block_size.
    result = {
        k: [t[i : i + block_size] for i in range(0, total_length, block_size)]
        for k, t in concatenated_examples.items()
    }
    result["labels"] = result["input_ids"].copy()
    return result

tokenized_train = tokenized_train.map(
    group_texts,
    batched=True,
    num_proc=16,
)
tokenized_validation = tokenized_validation.map(
    group_texts,
    batched=True,
    num_proc=16,
)

Loading cached processed dataset at /net/papilio/storage7/phusaeng/.cache/huggingface/datasets/code_search_net/python/1.0.0/8f2524e6b62f65af5f5d65c53715c654db7b08dc93e0b7bcce2ab2f286a75be1/cache-4aecb19653e8f685_*_of_00016.arrow
Loading cached processed dataset at /net/papilio/storage7/phusaeng/.cache/huggingface/datasets/code_search_net/python/1.0.0/8f2524e6b62f65af5f5d65c53715c654db7b08dc93e0b7bcce2ab2f286a75be1/cache-e8b4402d3571009b_*_of_00016.arrow


## Fine-tuning the model

In [8]:
run_name = "gpt2-python-code-search-test"
out_dir = f"out/{run_name}"
batch_size = 6 # increasing batch size can speed up training too, but may require more GPU memory
epochs = 1

training_args = TrainingArguments(
    output_dir=out_dir,
    run_name=run_name,
    evaluation_strategy="steps",
    num_train_epochs=epochs,
    per_device_train_batch_size=batch_size,
    per_device_eval_batch_size=batch_size,
    learning_rate=2e-5,
    weight_decay=0.01,
    logging_steps=50, # log the training loss for every 50 steps
    eval_steps=100, # evaluate and show the validation result every 100 steps
    save_steps=300,
    push_to_hub=False,
)

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_train,
    eval_dataset=tokenized_validation.shuffle(42).select(range(200)), # only use 200 validation samples during training, which will be much faster
    data_collator=data_collator,
)

Loading cached shuffled indices for dataset at /net/papilio/storage7/phusaeng/.cache/huggingface/datasets/code_search_net/python/1.0.0/8f2524e6b62f65af5f5d65c53715c654db7b08dc93e0b7bcce2ab2f286a75be1/cache-80196fec92e45b46.arrow
dataloader_config = DataLoaderConfiguration(dispatch_batches=None, split_batches=False, even_batches=True, use_seedable_sampler=True)


In [None]:
trainer.train()

# Evaluation

In [10]:
code_dataset_validation = load_dataset("code_search_net", "python", split="validation")
df_valid = code_dataset_validation.to_pandas()

Found cached dataset code_search_net (/net/papilio/storage7/phusaeng/.cache/huggingface/datasets/code_search_net/python/1.0.0/8f2524e6b62f65af5f5d65c53715c654db7b08dc93e0b7bcce2ab2f286a75be1)


## Define helper functions

In [11]:
def count_parameters(model):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)

def get_prompt(examples, idx=None):
    # if no idx, we randomly select one
    if idx is None:
        idx = torch.randint(0, len(examples), (1,)).item()
    print(f"idx: {idx}")
    query = """Please write a function that {instruction}"""
    fucntion_name = """\n\nThe function name is: {func_name}"""
    out_prompt = query.format(instruction=examples["func_documentation_string"][idx]) + fucntion_name.format(func_name=examples["func_name"][idx])
    return out_prompt

def generate(model, tokenizer, text, device="cuda", max_new_tokens=400, include_input=True):
    token_ids = tokenizer.encode(text, add_special_tokens=False, return_tensors='pt')
    with torch.no_grad():
        output_ids = model.generate(
            token_ids.to(device),
            do_sample=True,
            max_new_tokens=max_new_tokens,
            temperature=1,
            # repetition_penalty=1.1,
            pad_token_id=tokenizer.pad_token_id,
            bos_token_id=tokenizer.bos_token_id,
            eos_token_id=tokenizer.eos_token_id,
        )
    # output = tokenizer.decode(output_ids.tolist()[0][token_ids.size(1):])
    if not include_input:
        output = tokenizer.decode(output_ids.tolist()[0][token_ids.size(1):])
    else:
        output = tokenizer.decode(output_ids.tolist()[0])
    return output

## Get models

In [12]:
device = "cuda"
tokenizer = AutoTokenizer.from_pretrained("gpt2")
tokenizer.pad_token = tokenizer.eos_token

# Not fine-tuned GPT (the default pretrained GPT2):
model_name = "gpt2"
org_model = AutoModelForCausalLM.from_pretrained(model_name).to(device)

# Fine-tuned GPT on the code dataset:
model_name = "out/gpt2-python-code-search-test/checkpoint-200"
trained_model = AutoModelForCausalLM.from_pretrained(model_name).to(device)

print(f"model params: {count_parameters(trained_model)}")

model params: 124439808


### Inference with pre-trained GPT2 model

In [13]:
text = get_prompt(df_train, idx=None)
# text = "please write the function that find cosine similarity between two vectors"
max_new_tokens = 512
print(f"prompt: {text}")

idx: 29845
prompt: Please write a function that Return the version of by with regex intead of importing it

The function name is: get_version


In [14]:
print(generate(
    model=org_model, 
    tokenizer=tokenizer, 
    text=text, 
    device=device,
    max_new_tokens=max_new_tokens,
    include_input=True
))

Please write a function that Return the version of by with regex intead of importing it

The function name is: get_version_of_by

It returns the version, if it has a version. If the version is not empty you can pass either 0 or nil.

If version is empty do:

get_version :(version, _) => nil | json:value @json

It returns the version number with the hash of the version used.

The function accepts the following arguments:

version: number of the version of JSON or JSON.string

json: string to use in converting the json type to json:string. If no arguments are provided it will just pass 0 and get_version_ref is passed as the string.

The json value will be stored on disk in memory. The json version will be the current one.

Return the number of versions of a given version string (or a reference to the original JSON) to be converted to JSON with get_version_ref if None is given.

The function will not return 0 or None if the json is not available for this format.

If the json value is grea

### Inference with fine-tuned GPT2 model on the code dataset

In [15]:
print(generate(
    model=trained_model, 
    tokenizer=tokenizer, 
    text=text, 
    device=device,
    max_new_tokens=max_new_tokens,
    include_input=True
))

Please write a function that Return the version of by with regex intead of importing it

The function name is: get_version

def get_version(self):
 from sys import sys import regex_intead
 version = regex.decode(self.version, re.SEMOTIC('-1-',(2,3,4) for i in re.samples.layers())):
 if not version[0] except NotImplementedError:
 version.update(self.version)
version = regex.decode(self.version, re.SEMOTIC('-1-',(2,3,4) for i in re.samples.layers())):
 version = regex.decode(self.version, re.SEMOTIC('-1-',(2,3,4) for i in re.samples.layers())):
 version = regex.decode(self.version, re.SEMOTIC('-1-',(2,3,4) for i in re.samples.layers())):
 version = regex.decode(self.version, re.SEMOTIC('-1-',(2,3,4) for i in re.samples.layers())):
 version = regex.decode(self.version, re.SEMOTIC('-1-',(2,3,4) for i in re.samples.layers())):
 version = regex.decode(self.version, re.SEMOTIC('-1-',(2,3,4) for i in re.samples.layers())):
 version = regex.decode(self.version, re.SEMOTIC('-1-',(2,3,4) for i in

## Compare the performance of the two models on the validation set

In [16]:
import torch
from tqdm import tqdm

# evaluate function
def get_prompt_answer(examples, idx=None):
    # if no idx, we randomly select one
    if idx is None:
        idx = torch.randint(0, len(examples), (1,)).item()
    # print(f"idx: {idx}")
    query = """Please write a function that {instruction}"""
    fucntion_name = """\n\nThe function name is: {func_name}"""
    prompt = query.format(instruction=examples["func_documentation_string"][idx]) + fucntion_name.format(func_name=examples["func_name"][idx])
    answer = examples["whole_func_string"][idx]
    prompt += "\n\n" + answer
    return prompt

def preprocess_function(examples):
    """ prompt template for training the LM """
    query = """Please write a function that {instruction}"""
    fucntion_name = """\n\nThe function name is: {func_name}"""
    out = tokenizer([
        query.format(instruction=func_doc_str) + fucntion_name.format(func_name=func_name) + "\n\n" + whole_func_string
        for func_doc_str, func_name, whole_func_string in 
        zip(examples["func_documentation_string"], examples["func_name"], examples["whole_func_string"])
    ])
    return out

def evaluate(model, encodings, stride=512):
    """ https://huggingface.co/docs/transformers/en/perplexity """
    seq_len = encodings.input_ids.size(1)
    max_length = model.config.n_positions
    print(f"seq_len: {seq_len}, max_length: {max_length}")
    nlls = []
    prev_end_loc = 0
    for begin_loc in tqdm(range(0, seq_len, stride)):
        end_loc = min(begin_loc + max_length, seq_len)
        trg_len = end_loc - prev_end_loc  # may be different from stride on last loop
        input_ids = encodings.input_ids[:, begin_loc:end_loc].to(device)
        target_ids = input_ids.clone()
        # set to -100 to ignore the loss over the context input
        # https://pytorch.org/docs/stable/generated/torch.nn.CrossEntropyLoss.html
        target_ids[:, :-trg_len] = -100 
        with torch.no_grad():
            outputs = model(input_ids, labels=target_ids)
            # loss is calculated using CrossEntropyLoss which averages over valid labels
            # N.B. the model only calculates loss over trg_len - 1 labels, because it internally shifts the labels
            # to the left by 1.
            neg_log_likelihood = outputs.loss
        nlls.append(neg_log_likelihood)
        prev_end_loc = end_loc
        if end_loc == seq_len:
            break
    cse = torch.stack(nlls).mean()
    return cse

In [17]:
import pandas as pd

# get the validation dataset 
num_test_samples = 100
text = [get_prompt_answer(df_valid, idx=idx) for idx in range(num_test_samples)]
encodings = tokenizer("\n\n".join(text), return_tensors="pt")
models = {
    "GPT2": org_model,
    "GPT2 /w ft": trained_model
}
df_log = pd.DataFrame()
stride_len = 512
for model_name, model in models.items():
    mean_cse = evaluate(model, encodings, stride_len)
    ppl = torch.exp(mean_cse)
    print(f"Cross-entropy loss: {mean_cse:.4f}")
    print(f"Perplexity: {ppl:.4f}")
    log = {
        "model_name": model_name,
        "CSE": mean_cse.item(),
        "PPL": ppl.item()
    }
    df_log = pd.concat([df_log, pd.DataFrame([log])], ignore_index=True)
df_log

Token indices sequence length is longer than the specified maximum sequence length for this model (79384 > 1024). Running this sequence through the model will result in indexing errors


seq_len: 79384, max_length: 1024


  0%|          | 0/156 [00:00<?, ?it/s]

 99%|█████████▊| 154/156 [00:11<00:00, 13.46it/s]


Cross-entropy loss: 1.9215
Perplexity: 6.8309
seq_len: 79384, max_length: 1024


 99%|█████████▊| 154/156 [00:11<00:00, 13.42it/s]

Cross-entropy loss: 1.5223
Perplexity: 4.5826





Unnamed: 0,model_name,CSE,PPL
0,GPT2,1.921461,6.830929
1,GPT2 /w ft,1.522273,4.58263
