<a href="https://colab.research.google.com/github/markNZed/GPT-NeoX-Colab/blob/main/notebooks/codecompletion_benchmark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [11]:
# We could modify these paths to "stub" behavior for test/dev
DOCKER = True
workspaceDir = "/content"
GPTNeoXColabDirName = "GPT-NeoX-Colab"
if DOCKER:
    GPTNeoXColabDir = f"/workspace"
else:
    GPTNeoXColabDir = f"{workspaceDir}/{GPTNeoXColabDirName}"

# Clone CodeXGLUE Repo

In [12]:
%cd {workspaceDir}
!git clone --depth 1 https://github.com/microsoft/CodeXGLUE.git

/content
fatal: destination path 'CodeXGLUE' already exists and is not an empty directory.


In [13]:
%%time
#@title Clone GPT-NeoX-Colab
if DOCKER:
    %cd {GPTNeoXColabDir}
else:
    %cd {workspaceDir}
    # Don't use --depth 1 because that does not play nice with git-annex
    !git clone https://github.com/markNZed/GPT-NeoX-Colab.git
    %cd {GPTNeoXColabDir}
    %pip install -q -r requirements_colab.txt
    %pip install --use-feature=fast-deps -q .
from dotenv import load_dotenv
import os
load_dotenv(f"{GPTNeoXColabDir}/.env")
import GPTNeoXColab
GPTNeoXColab.utils.colab.fetch_data("data/codecompletion/token_completion.tar.gz")
%cd {GPTNeoXColabDir}/data/codecompletion
if not os.path.exists(f"data/codecompletion/token_completion"):
    !tar -xzf token_completion.tar.gz
GPTNeoXColab.utils.colab.fetch_data("models/codecompletion/global_step7000_HF.tar.gz")
%cd {GPTNeoXColabDir}/models/codecompletion
if not os.path.exists(f"latest"):
    !tar -xzf global_step7000_HF.tar.gz
    !mv global_step7000_HF latest

/workspace


Data retrieval successful.
/workspace/data/codecompletion
Data retrieval successful.
/workspace/models/codecompletion
CPU times: user 191 ms, sys: 50.4 ms, total: 242 ms
Wall time: 15.6 s


# Using Byte-Pair Encoding Tokenizer

In [14]:
%cd {GPTNeoXColabDir}/models/codecompletion/latest
if not os.path.exists("gpt2-vocab.json"):
    !wget https://s3.amazonaws.com/models.huggingface.co/bert/gpt2-vocab.json
    !mv gpt2-vocab.json vocab.json
if not os.path.exists("gpt2-merges.txt"):
    !wget https://s3.amazonaws.com/models.huggingface.co/bert/gpt2-merges.txt
    !mv gpt2-merges.txt merges.txt

/workspace/models/codecompletion/latest
--2024-11-15 08:00:17--  https://s3.amazonaws.com/models.huggingface.co/bert/gpt2-vocab.json
Resolving s3.amazonaws.com (s3.amazonaws.com)... 52.216.12.118, 16.15.193.59, 3.5.24.248, ...
Connecting to s3.amazonaws.com (s3.amazonaws.com)|52.216.12.118|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1042301 (1018K) [application/json]
Saving to: ‘gpt2-vocab.json’


2024-11-15 08:00:19 (680 KB/s) - ‘gpt2-vocab.json’ saved [1042301/1042301]

--2024-11-15 08:00:19--  https://s3.amazonaws.com/models.huggingface.co/bert/gpt2-merges.txt
Resolving s3.amazonaws.com (s3.amazonaws.com)... 3.5.24.29, 52.216.12.118, 16.15.193.59, ...
Connecting to s3.amazonaws.com (s3.amazonaws.com)|3.5.24.29|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 456318 (446K) [text/plain]
Saving to: ‘gpt2-merges.txt’


2024-11-15 08:00:21 (330 KB/s) - ‘gpt2-merges.txt’ saved [456318/456318]



# HuggingFace Inference

In [18]:
from transformers import GPTNeoXForCausalLM, GPT2Tokenizer
import torch

%cd {workspaceDir}

# Initialize the tokenizer with your vocabulary and merge files
tokenizer = GPT2Tokenizer(vocab_file=f"{GPTNeoXColabDir}/models/codecompletion/latest/vocab.json", merges_file=f"{GPTNeoXColabDir}/models/codecompletion/latest/merges.txt")

# Load your model
model_path = f"{GPTNeoXColabDir}/models/codecompletion/latest"
model = GPTNeoXForCausalLM.from_pretrained(model_path)

# Set the model to evaluation mode
model.eval()

# Prompt the user for input
input_text = """<s> import sys , os <EOL> import imp <EOL> from optparse import make_option <EOL> from django . conf import settings <EOL> from django"""

# Tokenize and prepare input
input_ids = torch.tensor([tokenizer.encode(input_text)], dtype=torch.long)
attention_mask = torch.ones_like(input_ids)  # Create an attention mask for non-padded input

# Generate text with specified pad_token_id and attention_mask
with torch.no_grad():
    output = model.generate(
        input_ids,
        attention_mask=attention_mask,
        max_length=200,          # Adjust this for desired output length
        temperature=0.7,        # Controls creativity
        top_k=50,               # Controls diversity
        top_p=0.9,              # Nucleus sampling
        num_return_sequences=1, # Number of sequences to return
        pad_token_id=model.config.eos_token_id,  # Set pad_token_id explicitly
        do_sample=True           # Enable sampling mode to use temperature and top_p
    )

# Decode the generated text
generated_text = tokenizer.decode(output[0].tolist())
print("Generated text:", generated_text)

# Function to replace special tokens with original representations
def replace_special_tokens(text):
    """
    Replaces special tokens in the generated text with more readable or context-appropriate representations.
    """
    replacements = {
        "<EOL>": "\n",          # Replace with actual newline for code formatting
        "<s>": "",              # Remove start token as it's not necessary in final output
        "</s>": "",             # Remove end token as it's not necessary in final output
        "<pad>": "",            # Remove padding tokens
        "<|UNKNOWN|>": "[UNK]", # Represent unknown tokens in a readable way
        "<STR_LIT>": "\"STRING_LITERAL\"",  # Placeholder for string literals
        "<NUM_LIT>": "0",       # Placeholder for numeric literals
        "<BOOL_LIT>": "True",   # Placeholder for boolean literals (e.g., True/False)
        "<COMMENT>": "# COMMENT",  # Placeholder for comments in the code
    }

    # Replace each special token in text with its corresponding value in `replacements`
    for token, replacement in replacements.items():
        text = text.replace(token, replacement)

    return text.strip()  # Strip leading/trailing whitespace for clean output

# Replace special tokens in the generated text
final_text = replace_special_tokens(generated_text)

# Print the final output
print("Final text:", final_text)


/content
Generated text: <s> import sys , os <EOL> import imp <EOL> from optparse import make_option <EOL> from django . conf import settings <EOL> from django . conf . urls import url <EOL> from django . utils import unittest <EOL> from django . utils import unittest <EOL> from django . utils import unittest <EOL> from django . utils import unittest <EOL> from django . utils import unittest <EOL> from django . utils import unittest <EOL> from django . utils import unittest <EOL> from django . utils import unittest <EOL> from django . utils import unittest <EOL> from django . utils import unittest <EOL> from django . utils import
Final text: import sys , os 
 import imp 
 from optparse import make_option 
 from django . conf import settings 
 from django . conf . urls import url 
 from django . utils import unittest 
 from django . utils import unittest 
 from django . utils import unittest 
 from django . utils import unittest 
 from django . utils import unittest 
 from django . util

In [29]:
from torch.utils.data import Dataset

class EvalDataset(Dataset):
    def __init__(self, tokenizer, args, logger, file_type='train', seq_length=1024):
        if not os.path.exists(args.output_dir):
            os.makedirs(args.output_dir)
        cached_file = os.path.join(args.output_dir, file_type+"_blocksize_%d"%(seq_length))
        if os.path.exists(cached_file) and not args.overwrite_cache:
            with open(cached_file, 'rb') as handle:
                self.inputs = pickle.load(handle)

        else:
            self.inputs = []

            datafile = os.path.join(args.data_dir, f"{file_type}.txt")
            with open(datafile) as f:
                data = f.readlines()

            length = len(data)
            logger.info("Data size: %d"%(length))
            input_ids = []
            for idx,x in enumerate(data):
                x = x.strip()
                if x.startswith("<s>") and x.endswith("</s>"):
                    pass
                else:
                    x = "<s> " + x + " </s>"
                try:
                    input_ids.extend(tokenizer.encode(x))
                except Exception:
                    pass
                if idx % (length//10) == 0:
                    percent = idx / (length//10) * 10
                    logger.warning("load %d"%(percent))
                if args.max_eval_length is not None and idx > args.max_eval_length:
                    logger.info(f"max eval length reached at {idx}")
                    break
            del data
            gc.collect()

            logger.info(f"tokens: {len(input_ids)}")
            self.split(input_ids, tokenizer, logger, seq_length=seq_length)
            del input_ids
            gc.collect()

            with open(cached_file, 'wb') as handle:
                pickle.dump(self.inputs, handle, protocol=pickle.HIGHEST_PROTOCOL)
    
    def split(self, input_ids, tokenizer, logger, seq_length=1024):
        sample = []
        i = 0
        while i < len(input_ids):
            sample = input_ids[i: i+seq_length]
            if len(sample) == seq_length:
                for j in range(seq_length):
                    if tokenizer.convert_ids_to_tokens(sample[seq_length-1-j])[0] == '\u0120' or tokenizer.convert_ids_to_tokens(sample[seq_length-1-j]).startswith("<NUM_LIT"):
                        break
                    if sample[seq_length-1-j] in [tokenizer.bos_token_id, tokenizer.eos_token_id, tokenizer.sep_token_id]:
                        if sample[seq_length-1-j] != tokenizer.bos_token_id:
                            j -= 1
                        break
                if j == seq_length-1:
                    print(tokenizer.decode(sample))
                    exit()
                sample = sample[: seq_length-1-j]
            # print(len(sample))
            i += len(sample)
            pad_len = seq_length-len(sample)
            sample += [tokenizer.pad_token_id]*pad_len
            self.inputs.append(sample)

            if len(self.inputs) % 10000 == 0:
                logger.info(f"{len(self.inputs)} samples")


    def __len__(self):
        return len(self.inputs)

    def __getitem__(self, item):
        return torch.tensor(self.inputs[item])


In [None]:
import logging
import random
import numpy as np
import torch
from torch.utils.data import DataLoader, SequentialSampler
from transformers import GPTNeoXForCausalLM, GPT2Tokenizer
from types import SimpleNamespace
import gc
import pickle

"""
Code Completion Evaluation Pipeline

This script evaluates a pre-trained language model’s token-level accuracy on code completion tasks.
The script includes functions to decode token IDs, calculate accuracy, and process prediction batches
for evaluation. It also verifies predictions against ground truth data and saves results to an output file.

Modules and Functions:
- `decode_token_ids`: Converts token IDs into readable code strings, managing special tokens and spacing.
- `calculate_accuracy`: Compares predicted tokens with ground truth tokens and calculates accuracy.
- `process_batch_predictions`: Processes predictions and ground truths from batches, converting them into lists of tokens.
- `eval_acc`: The main evaluation function that loads data, predicts, decodes, and calculates accuracy.
- `post_process`: Saves processed predictions to a file and verifies each sequence against expected ground truth.
- `main`: The entry point, which loads the model, sets configurations, and initiates the evaluation pipeline.

Dependencies:
- Libraries: `torch`, `transformers`, `numpy`, and `torch.utils.data`.
- Custom Modules: `EvalDataset` (assumed to be a dataset module for evaluation tasks).
- Assumes access to a pre-trained GPT-2-based model for code completion.

Usage:
To run the script, ensure all dependencies are installed and specify the model and dataset paths.
Logging will provide progress updates and final evaluation metrics.

"""


logger = logging.getLogger(__name__)

def decode_token_ids(token_ids, tokenizer):
    """
    Convert token IDs to a string of code, handling special tokens and spacing.
    """
    decoded_code = ""
    for token_id in token_ids:
        token = tokenizer.convert_ids_to_tokens(token_id)
        # The character '\u0120' is the Unicode representation of a non-breaking space with an extra semantic 
        # meaning in tokenization. Specifically, it’s often used to indicate that a token is preceded by a space.
        if token.startswith('\u0120') and not decoded_code.endswith(" "):  # Handles space prefixes
            decoded_code += " " + token[1:]
        else:
            decoded_code += token
    return decoded_code.strip()

def calculate_accuracy(pred_tokens, gt_tokens, special_tokens=["<s>", "</s>", "<EOL>", "<pad>"]):
    """
    Calculate accuracy by comparing predicted tokens to ground truth tokens.
    """
    correct_count = sum(1 for pred, gt in zip(pred_tokens, gt_tokens) if gt not in special_tokens and pred == gt)
    total_count = sum(1 for gt in gt_tokens if gt not in special_tokens)
    return correct_count, total_count

def process_batch_predictions(batch_predictions, batch_ground_truths, tokenizer):
    """
    Process batch of predictions and ground truths into readable token lists.
    """
    all_pred_tokens, all_gt_tokens = [], []

    for predicted_ids, gt_ids in zip(batch_predictions, batch_ground_truths):
        pred_tokens, gt_tokens = [], []
        for i, (pred_id, gt_id) in enumerate(zip(predicted_ids, gt_ids)):
            gt_id = gt_id.item()  # Convert tensor to int
            pred_id = pred_id.item()  # Convert tensor to int

            gt_token = tokenizer.convert_ids_to_tokens(gt_id)

            if gt_token in ["<s>", "</s>", "<EOL>", "<pad>"]:  # Skip special tokens
                break
            elif gt_token.startswith('\u0120') and pred_tokens:  # New token starts with a space
                all_pred_tokens.append(decode_token_ids(pred_tokens, tokenizer))
                all_gt_tokens.append(decode_token_ids(gt_tokens, tokenizer))
                pred_tokens, gt_tokens = [], []

            pred_tokens.append(pred_id)
            gt_tokens.append(gt_id)

    return all_pred_tokens, all_gt_tokens

def eval_acc(args, model, tokenizer, file_type='test'):
    """
    Evaluate the model’s token-level code completion accuracy.
    """
    # Load evaluation dataset
    eval_dataset = EvalDataset(tokenizer, args, logger, file_type=file_type, seq_length=args.seq_length)
    eval_dataloader = DataLoader(eval_dataset, sampler=SequentialSampler(eval_dataset), batch_size=args.eval_batch_size)
    model.to(args.device)
    model.eval()

    # Initialize counters for accuracy
    total_correct, total_predictions = 0, 0
    all_pred_tokens, all_gt_tokens = [], []

    # Iterate through batches in the evaluation dataset
    for step, batch in enumerate(eval_dataloader):
        inputs = batch.to(args.device)
        with torch.no_grad():
            outputs = model(inputs)
            predicted_token_ids = outputs.logits.argmax(-1)  # Get predicted tokens

        # Decode batch predictions and ground truths
        batch_pred_tokens, batch_gt_tokens = process_batch_predictions(predicted_token_ids.cpu(), inputs.cpu(), tokenizer)
        all_pred_tokens.extend(batch_pred_tokens)
        all_gt_tokens.extend(batch_gt_tokens)

        # Calculate batch accuracy
        batch_correct, batch_total = calculate_accuracy(batch_pred_tokens, batch_gt_tokens)
        total_correct += batch_correct
        total_predictions += batch_total

        # Logging progress
        if step % args.logging_steps == 0:
            accuracy = total_correct / total_predictions if total_predictions > 0 else 0
            logger.info(f"Step {step} processed with cumulative accuracy: {accuracy:.2%}")

    # Final accuracy calculation
    accuracy = total_correct / total_predictions if total_predictions > 0 else 0
    logger.info(f"Final Test Accuracy: {accuracy:.2%}")
    return accuracy, all_pred_tokens, all_gt_tokens

def post_process(args, predictions, ground_truths, true_texts, saved_file_path):
    """
    Save the post-processed predictions and verify with the ground truth texts.

    Args:
        args: General arguments or configuration settings (unused here).
        predictions: List of predicted tokens from the model.
        ground_truths: List of ground truth tokens for each prediction.
        true_texts: List of full ground truth sequences for each input, used for verification.
        saved_file_path: Path to the file where the processed predictions will be saved.

    Returns:
        int: The count of sequences processed and saved.
    """
    # Open the specified file in write mode to save processed predictions
    with open(saved_file_path, "w") as wf:
        count = 0  # Initialize a counter to track the number of completed sequences
        current_pred, current_gt = [], []  # Lists to accumulate tokens for each sequence

        # Iterate through each predicted and ground truth token pair
        for pred, gt in zip(predictions, ground_truths):
            # Skip empty or padding tokens in the ground truth, as they are not meaningful
            if gt in ["", "<pad>"]:
                continue
            
            # Append the current ground truth token to the list for the sequence
            current_gt.append(gt)
            # Append the current prediction, removing any extra spaces
            current_pred.append(pred.replace(" ", ""))

            # Check if the current token is an end-of-sequence token
            if gt == "</s>":
                # Verify that the accumulated ground truth tokens match the expected text
                assert " ".join(current_gt) == true_texts[count].strip(), f"Mismatch in sample {count}"
                
                # Write the joined prediction sequence as a line in the file
                wf.write(" ".join(current_pred) + "\n")
                
                # Increment the count of completed sequences
                count += 1
                
                # Clear the lists to start accumulating tokens for the next sequence
                current_pred, current_gt = [], []

    # Return the total number of processed sequences
    return count


def main():
    """
    Main function to load model, tokenizer, and execute evaluation.
    """
    pretrained_model_path = f"{GPTNeoXColabDir}/models/codecompletion/latest"
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Set up evaluation arguments
    args = {
        "n_gpu": torch.cuda.device_count(),
        "per_gpu_eval_batch_size": 1,
        "logging_steps": 1,
        "output_dir": f"{GPTNeoXColabDir}/out",
        "data_dir": f"{GPTNeoXColabDir}/data/codecompletion/token_completion",
        "device": device,
        "no_cuda": False,
        "seq_length": 2048,
        "max_eval_length": 10,
        "overwrite_cache": True,
        "eval_batch_size": 1
    }

    # Wrap args dictionary in a namespace to allow dot notation
    args = SimpleNamespace(**args)

    # Configure logging
    logging.basicConfig(format='%(asctime)s - %(levelname)s - %(name)s -   %(message)s',
                        datefmt='%m/%d/%Y %H:%M:%S',
                        level=logging.INFO)

    # Set random seed for reproducibility
    seed = 42
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)

    # Load model and tokenizer
    tokenizer = GPT2Tokenizer.from_pretrained(pretrained_model_path, sep_token='<EOL>', bos_token='<s>', eos_token='</s>', pad_token='<pad>', unk_token='<|UNKNOWN|>')
    model = GPTNeoXForCausalLM.from_pretrained(pretrained_model_path)
    model.resize_token_embeddings(len(tokenizer))
    
    total_params = sum(p.numel() for p in model.parameters())
    if total_params >= 1e9:
        readable_params = f"{total_params / 1e9:.2f}B"  # Billions
    elif total_params >= 1e6:
        readable_params = f"{total_params / 1e6:.2f}M"  # Millions
    else:
        readable_params = f"{total_params:,}"  # Less than a million, use commas
    logger.info(f"Model has {readable_params} trainable parameters")
    
    # Evaluate model
    accuracy, predictions, ground_truths = eval_acc(args, model, tokenizer, 'test')
    logger.info(f"Test accuracy: {accuracy:.2%}")

        # Post-process predictions
    true_texts = [...]  # Load or specify the true ground truth texts for verification
    saved_file_path = f"{args.output_dir}/predictions.txt"
    count = post_process(args, predictions, ground_truths, true_texts, saved_file_path)
    logger.info(f"Post-processed and saved {count} sequences to {saved_file_path}")

main()


The tokenizer class you load from this checkpoint is not the same type as the class this function is called from. It may result in unexpected tokenization. 
The tokenizer class you load from this checkpoint is 'GPTNeoXTokenizerFast'. 
The class this function is called from is 'GPT2Tokenizer'.


11/15/2024 08:31:00 - INFO - __main__ -   Model has 44.65M trainable parameters
11/15/2024 08:31:00 - INFO - __main__ -   Data size: 50000
11/15/2024 08:31:00 - INFO - __main__ -   max eval length reached at 11
11/15/2024 08:31:01 - INFO - __main__ -   tokens: 23540
11/15/2024 08:31:04 - INFO - __main__ -   Step 0 processed with cumulative accuracy: 0.00%
11/15/2024 08:31:06 - INFO - __main__ -   Step 1 processed with cumulative accuracy: 12.50%
11/15/2024 08:31:08 - INFO - __main__ -   Step 2 processed with cumulative accuracy: 10.00%
11/15/2024 08:31:10 - INFO - __main__ -   Step 3 processed with cumulative accuracy: 7.69%
11/15/2024 08:31:13 - INFO - __main__ -   Step 4 processed with cumulative accuracy: 7.69%
11/15/2024 08:31:16 - INFO - __main__ -   Step 5 processed with cumulative accuracy: 6.25%
11/15/2024 08:31:19 - INFO - __main__ -   Step 6 processed with cumulative accuracy: 7.41%
11/15/2024 08:31:22 - INFO - __main__ -   Step 7 processed with cumulative accuracy: 5.71%
11/