### 1. Download Files

#### 1.1 T5-base model trained on HQ-Augmented Data

In [1]:
!gdown 1wY4ZPnDjAKDQ1XVCeA0dLVn5tGvgv12N

Downloading...
From: https://drive.google.com/uc?id=1wY4ZPnDjAKDQ1XVCeA0dLVn5tGvgv12N
To: /content/t5-base-hq_augment.ckpt
100% 2.68G/2.68G [00:25<00:00, 106MB/s] 


#### 1.2 CoNaLa dataset (test set)

In [2]:
!wget http://www.phontron.com/download/conala-corpus-v1.1.zip
!unzip conala-corpus-v1.1.zip
!rm -rf conala-corpus-v1.1.zip

--2023-03-29 19:22:34--  http://www.phontron.com/download/conala-corpus-v1.1.zip
Resolving www.phontron.com (www.phontron.com)... 208.113.196.149
Connecting to www.phontron.com (www.phontron.com)|208.113.196.149|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 52105440 (50M) [application/zip]
Saving to: ‘conala-corpus-v1.1.zip’


2023-03-29 19:22:35 (49.5 MB/s) - ‘conala-corpus-v1.1.zip’ saved [52105440/52105440]

Archive:  conala-corpus-v1.1.zip
   creating: conala-corpus/
  inflating: conala-corpus/conala-mined.jsonl  
  inflating: conala-corpus/conala-train.json  
  inflating: conala-corpus/conala-test.json  


In [3]:
!pip install transformers
!pip install pytorch_lightning
!pip install datasets
!pip install sacrebleu
!pip install rouge_score

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting transformers
  Downloading transformers-4.27.4-py3-none-any.whl (6.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m6.8/6.8 MB[0m [31m54.4 MB/s[0m eta [36m0:00:00[0m
Collecting tokenizers!=0.11.3,<0.14,>=0.11.1
  Downloading tokenizers-0.13.2-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (7.6 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m7.6/7.6 MB[0m [31m100.4 MB/s[0m eta [36m0:00:00[0m
Collecting huggingface-hub<1.0,>=0.11.0
  Downloading huggingface_hub-0.13.3-py3-none-any.whl (199 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.8/199.8 KB[0m [31m26.3 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: tokenizers, huggingface-hub, transformers
Successfully installed huggingface-hub-0.13.3 tokenizers-0.13.2 transformers-4.27.4
Looking in indexes: https://pypi.org/simple, https://u

## 2. Code Implementation

#### 2.1 Import Required Packages

In [4]:
import argparse
import torch
import os
import yaml
import json
import re

from torch.utils.data import DataLoader, SequentialSampler, Dataset
from transformers import T5TokenizerFast, T5ForConditionalGeneration
from typing import Text, Dict, List
from tqdm import tqdm

#### 2.2 Make class and functions for Result Generation

In [5]:
class TextGenerationTestDataset(Dataset):

    def __init__(
            self,
            tokenizer,
            input_text: Text = None,
            max_length: int = 128,
            padding: Text = "max_length",
            truncation: Text = "longest_first"
    ):
        self.cross_entropy_ignore_index = -100
        self.data_dir = "./conala-corpus"

        # 1. Load Data
        if input_text:
          inputs_ = [input_text]
          targets_ = ["dummy"]
        else:
          with open(os.path.join(self.data_dir, "conala-test.json"), 'r') as readFile:
              data = json.load(readFile)
          inputs_ = [str(d['rewritten_intent']) for d in data]
          targets_ = [str(d['snippet']) for d in data]


        # 2. Tokenize Inputs
        print("Tokenizing Data...")
        input_encode = tokenizer(inputs_, max_length=max_length, truncation=truncation, padding=padding)
        target_encode = tokenizer(targets_, max_length=max_length, truncation=truncation, padding=padding)

        self.outputs = dict()
        self.outputs["labels"] = target_encode['input_ids']
        self.outputs["input_ids"] = input_encode['input_ids']
        self.outputs["attention_mask"] = input_encode['attention_mask']

    def __len__(self):
        return len(self.outputs['input_ids'])

    def __getitem__(self, item):
        outp_item = {key: torch.LongTensor(val[item]) for key, val in self.outputs.items()}
        if 'labels' in list(outp_item.keys()):
            labels = outp_item['labels']
            prompt_length = int(sum(labels != 0))
            outp_item['labels'][prompt_length:] = self.cross_entropy_ignore_index  # Cross Entropy Ignore Index
        return outp_item

In [6]:
class Inferencer:
    def __init__(
            self,
            model,
            tokenizer,
            batch_size: int = 64,
            max_length: int = 128,
            beam_size: int = 5
    ):
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.model = model
        self.tokenizer = tokenizer
        self.batch_size = batch_size
        self.max_length = max_length
        self.beam_size = beam_size
        self.model.to(self.device)

    def __call__(self, data_loader) -> List:
        outp = []
        for batch in tqdm(data_loader):
            inputs = dict()
            for key, value in batch.items():
                if key == 'labels':
                    continue
                inputs[key] = value.to(self.device)

            max_input_len = torch.sum(inputs['input_ids'] != 0, dim=1).max().item()
            inputs['input_ids'] = inputs['input_ids'][:, :max_input_len].to(self.device)
            inputs['attention_mask'] = inputs['attention_mask'][:, :max_input_len].to(self.device)
            inputs['max_length'] = self.max_length

            with torch.no_grad():
                outp_ = self.model.generate(**inputs,
                                            num_beams=self.beam_size,
                                            repetition_penalty=2.5,
                                            length_penalty=1.0,
                                            early_stopping=True
                                            )
                for o in outp_:
                    generated = self.tokenizer.decode(o, max_length=self.max_length)
                    generated = self.prepro_generated_sent(generated)
                    outp.append(generated)
        return outp

    @staticmethod
    def prepro_generated_sent(sent: Text) -> Text:
        PREPRO_PATTERN = re.compile('<[/a-zA-Z0-9_]+>')
        return PREPRO_PATTERN.sub(repl='', string=sent).strip()

In [7]:
def load_model_state_dict(model, load_file_name: Text):
    if torch.cuda.is_available():
        state_dict = torch.load(load_file_name)['state_dict']
    else:
        state_dict = torch.load(load_file_name, map_location=torch.device('cpu'))['state_dict']

    new_state_dict = {}
    for key, value in state_dict.items():
        new_state_dict[key.replace("model.", "")] = value
    model.load_state_dict(new_state_dict)
    return model

#### 2.3 Set Config and Load Tokenizer, and Model

In [8]:
config = {
    "max_length": 128,
    "padding": "max_length",
    "truncation": "longest_first",
    "batch_size": 64,
    "beam_size": 5
}

In [9]:
tokenizer = T5TokenizerFast.from_pretrained("t5-base")
dataset = TextGenerationTestDataset(
    tokenizer=tokenizer,
    max_length=config["max_length"],
    padding=config["padding"],
    truncation=config["truncation"] 
    )


test_dataloader = DataLoader(
    dataset,
    sampler=SequentialSampler(dataset),
    batch_size=config['batch_size'],
    num_workers=0
)

Downloading (…)ve/main/spiece.model:   0%|          | 0.00/792k [00:00<?, ?B/s]

Downloading (…)/main/tokenizer.json:   0%|          | 0.00/1.39M [00:00<?, ?B/s]

Downloading (…)lve/main/config.json:   0%|          | 0.00/1.21k [00:00<?, ?B/s]

Tokenizing Data...


For now, this behavior is kept to avoid breaking backwards compatibility when padding/encoding with `truncation is True`.
- Be aware that you SHOULD NOT rely on t5-base automatically truncating your input to 512 when padding/encoding.
- If you want to encode/pad to sequences longer than 512 you can either instantiate this tokenizer with `model_max_length` or pass `max_length` when encoding/padding.


In [10]:
model = T5ForConditionalGeneration.from_pretrained("t5-base")
model = load_model_state_dict(model, "t5-base-hq_augment.ckpt")

Downloading pytorch_model.bin:   0%|          | 0.00/892M [00:00<?, ?B/s]

Downloading (…)neration_config.json:   0%|          | 0.00/147 [00:00<?, ?B/s]

#### 2.4 Generate the Results

In [11]:
inferencer = Inferencer(
        model,
        tokenizer,
        batch_size=config["batch_size"],
        max_length=config['max_length'],
        beam_size=config['beam_size']
    )
prediction = inferencer(test_dataloader)

100%|██████████| 8/8 [00:24<00:00,  3.02s/it]


In [13]:
print(prediction[:5])

['time.sleep(1)', "chr(int('4a4b4c', 16))", 'all(i  j for i, j in zip(myList, myList[1:]))', '"""0 1""".format(\'Hi\', \'world\')', 'datetime.datetime.now() - datetime.timedelta(days=1)']


#### 2.5 Make a Class for Evaluation


In [12]:
import logging
import re

import numpy as np
import torch
from datasets import Metric, load_metric
from transformers import PreTrainedTokenizer

__all__ = [
    "CodeGenerationEvaluator"
]

# From https://github.com/neulab/external-knowledge-codegen/blob/datasets/conala/conala_eval.py#L94
special_chars = re.compile(r'([^A-Za-z0-9_])')
lower_upper = re.compile(r'([a-z])([A-Z])')
double_space = re.compile(r'(\s)+')
QUOTED_TOKEN_RE = re.compile(r"(?P<quote>''|[`'\"])(?P<string>.*?)(?P=quote)")


class CodeGenerationEvaluator:
    """
    Helper class for calculating NORMAL BLEU scores. Calculates both BLEU and SacreBLUE.
    """

    def __init__(self, tokenizer: PreTrainedTokenizer,
                 device: torch.device,
                 logger: logging.Logger = None,
                 minimal: bool = False,
                 smooth_bleu: bool = False,
                 get_high_rouge: bool = False,
                 only_alphanumeric_chars:bool=False):
        self.sacre_bleu: Metric = load_metric('sacrebleu')
        self.normal_bleu: Metric = load_metric('bleu')
        self.rouge: Metric = load_metric('rouge')
        self.tokenizer: PreTrainedTokenizer = tokenizer
        self.logger = logger or logging.getLogger(__name__)
        self.device = device
        self.minimal = minimal
        self.smooth_bleu = smooth_bleu
        self.get_high_rouge = get_high_rouge
        self.only_alphanumeric_chars = only_alphanumeric_chars

    def postprocessText(self, preds, labels):
        preds = list(map(self.postprocessSingle, preds))
        labels = list(map(self.postprocessSingle, labels))

        return preds, labels

    def postprocessSingle(self, s):
        if not self.only_alphanumeric_chars:
            out = special_chars.sub(r' \1 ', s.strip())
        else:
            out = special_chars.sub(r' ', s.strip())
        out = lower_upper.sub(r'\1 \2', out)
        out = double_space.sub(r'\1', out)
        return out.replace('"', '`').replace("\'", "`")

    def __call__(self, preds):
        preds, labels = preds
        if isinstance(preds, tuple):
            preds = preds[0]

        # Replace -100 in the labels as we can't decode them.
        labels = np.where(labels != -100, labels, self.tokenizer.pad_token_id)
        preds = np.where(preds != -100, preds, self.tokenizer.pad_token_id)
        decoded_preds = self.tokenizer.batch_decode(preds, skip_special_tokens=True)
        decoded_labels = self.tokenizer.batch_decode(labels, skip_special_tokens=True)
        return self.evaluate(decoded_preds, decoded_labels)

    def evaluate(self, decoded_preds, decoded_labels):
        # Postprocess the both the labels and the predictions
        decoded_preds, decoded_labels = self.postprocessText(decoded_preds, decoded_labels)
        if self.minimal:
            bleu_scores = self.calcBLEU(decoded_preds, decoded_labels)
            return {
                'BLEU'                  : bleu_scores['bleu'] * 100,
                'BLEU-Unigram-Precision': 100 * bleu_scores['precisions'][0],
                'BLEU-Bigram-Precision' : 100 * bleu_scores['precisions'][1],
            }
        sacre_scores, bleu_scores, rogue_scores = self.calcMetrics(decoded_preds, decoded_labels)
        self.logger.info(
            f"Got BLEU of {bleu_scores['bleu'] * 100:.2f} and SacreBLEU of "
            f"{sacre_scores['score']:.2f}")

        if self.get_high_rouge:
            rouge_2 = rogue_scores['rouge2'].high
            rouge_l = rogue_scores['rougeL'].high
        else:
            rouge_2 = rogue_scores['rouge2'].mid
            rouge_l = rogue_scores['rougeL'].mid
        out = {
            "BLEU"                   : bleu_scores['bleu'] * 100,
            'SacreBLEU'              : sacre_scores['score'],
            'BLEU-Unigram-Precision' : 100 * bleu_scores['precisions'][0],
            'BLEU-Bigram-Precision'  : 100 * bleu_scores['precisions'][1],
            'BLEU-Trigram-Precision' : 100 * bleu_scores['precisions'][2],
            "ROUGE-2"                : rouge_2.fmeasure * 100,
            "ROUGE-L"                : rouge_l.fmeasure * 100,
            'Sacre-Unigram-Precision': sacre_scores['precisions'][0],
            'Sacre-Bigram-Precision' : sacre_scores['precisions'][1],
            'Sacre-Trigram-Precision': sacre_scores['precisions'][2]
        }
        return out
        # return {k: round(v, 4) for k, v in out.items()}

    def calcBLEU(self, decoded_preds, decoded_labels):

        # Calculate the BLEU scores then return them.
        def bleuTok(arr):
            return list(map(lambda x: x.split(' '), arr))

        bleu_toked_preds = bleuTok(decoded_preds)
        blue_toked_labels = [[x] for x in bleuTok(decoded_labels)]
        return self.normal_bleu.compute(
            predictions=bleu_toked_preds,
            references=blue_toked_labels,
            smooth=self.smooth_bleu
        )

    def calcMetrics(self, decoded_preds, decoded_labels):

        sacre_scores = self.sacre_bleu.compute(predictions=decoded_preds,
                                               references=[[l] for l in decoded_labels])

        rogue_scores = self.rouge.compute(predictions=decoded_preds, references=decoded_labels)
        return sacre_scores, self.calcBLEU(decoded_preds, decoded_labels), rogue_scores

    def evaluateSingle(self, prediction, label):
        return self.evaluate([prediction], [label])

#### 2.6 Prepare for Evaluation

In [14]:
# 1. Evaluator
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
evaluator = CodeGenerationEvaluator(tokenizer, device, smooth_bleu=True)

# 2. Test data ground-truth
with open("./conala-corpus/conala-test.json", 'r', encoding='utf-8') as targetFile:
  data = json.load(targetFile)
target = [str(d['snippet']) for d in data]

  self.sacre_bleu: Metric = load_metric('sacrebleu')


Downloading builder script:   0%|          | 0.00/2.85k [00:00<?, ?B/s]

Downloading builder script:   0%|          | 0.00/2.48k [00:00<?, ?B/s]

Downloading extra modules:   0%|          | 0.00/1.55k [00:00<?, ?B/s]

Downloading builder script:   0%|          | 0.00/2.17k [00:00<?, ?B/s]

In [15]:
bleu_scores = []
for ref, pred in tqdm(zip(target, prediction), total=len(prediction)):
  if pred is not None and pred != "":
    if ref is not None and ref != "":
      metrics = evaluator.evaluate([pred], [ref])
      bleu_scores.append(metrics["BLEU"])


100%|██████████| 500/500 [01:32<00:00,  5.43it/s]


In [16]:
print(np.mean(bleu_scores))

40.49834950070508


## 3. Simple Code Implementation for User's text input

In [17]:
def generate_result(text):
  dataset = TextGenerationTestDataset(tokenizer, input_text=text)
  data_loader = DataLoader(dataset)
  result = inferencer(data_loader)
  return result

In [18]:
text = "sum elements at the same index in list `data`"
outp = generate_result(text)

Tokenizing Data...


100%|██████████| 1/1 [00:00<00:00,  1.61it/s]


In [19]:
print(outp[0])

[[sum(item) for item in zip(*x)] for items in zip(*data)]
