# Main Translation Pipeline Notebook

## Setup

In [1]:
# Download/install required packages
# !pip install openpyxl
# !pip install transformers
# !pip install allennlp
# !pip install allennlp-models
# !pip install --upgrade google-cloud-storage
# !pip install cached-path==1.1.2
# !pip install spacy
# !python -m spacy download en_core_web_md
# !pip install regex
# !pip install pandas
# !pip install numpy
# !pip install nltk
# !pip install flair
# !pip install Levenshtein
# !pip install tqdm
# !pip install torch
# !pip install sklearn
#!pip install ipywidgets

In [2]:
# other imports
import pickle as pkl
from collections import *
import Levenshtein
import spacy
import re
import math
import pandas as pd
import numpy as np
from os import listdir
from os.path import isfile
from sklearn.model_selection import train_test_split
from transformers import BertTokenizerFast, BertForTokenClassification
import torch
from torch.utils.data import DataLoader
from torch.optim import SGD
from tqdm import tqdm
import nltk

In [3]:
# Import functions from utility modules
from util.data_preprocess import align_scripts
from util.data_preprocess import segregate_test_cases
from util.data_preprocess.process_steps import process_steps
from util.parsing.frame_parser import FrameParser
from util.script_generation.script_generation import generate_script
from util.evaluation.evaluator import preprocess_test_case
from nltk.translate.bleu_score import sentence_bleu
from util.evaluation.evaluator import SmoothingFunction
from util.evaluation.evaluator import BLEU_evaluation 
from util.evaluation.evaluator import f1_score
from util.evaluation.evaluator import f1_evaluation
from util.evaluation.evaluator import groovy_parser

## **Data**

### 1) Load-in data and align scripts with the test cases

#### Load in data

In [4]:
test_cases_df = pd.read_excel('data/Katalon Capstone - Manual Test Cases.xlsx')

#### Align scripts

In [5]:
test_cases_df = align_scripts.align_scripts(test_cases_df)

#### Convert dataframe to a csv file

In [6]:
test_cases_df.to_csv('data/test_cases_with_scripts.csv')

### 2) Extract and save manual test case steps

#### Load in updated dataframe

In [7]:
test_cases_with_scripts_df = pd.read_csv('data/test_cases_with_scripts.csv')

#### Segregate steps, pre_conditions and expected results

In [8]:
pre_conditions, steps, expected_results = segregate_test_cases.process_test_cases(test_cases_with_scripts_df['Full manual test cases'].tolist())

In [9]:
test_cases_with_scripts_df['pre_conditions'] = pre_conditions
test_cases_with_scripts_df['steps'] = steps
test_cases_with_scripts_df['expected_results'] = expected_results

#### Process steps

In [10]:
new_tc_df = pd.read_excel('data/new_tc.xlsx', header=1, index_col=0)

In [11]:
steps_list = process_steps(new_tc_df['New Test Cases'].to_list())

#### Remove null values

In [12]:
updated_steps = []
for steps in steps_list:
    # remove null values
    if isinstance(steps, float):
        continue
    else:
        updated_steps.append(steps)

#### Save updated_steps as a file

In [13]:
with open('pickle_files/pickle_files_for_parsing/steps_updated.pkl', 'wb') as handle:
    pkl.dump(updated_steps, handle)

#### Get pages

In [14]:
pages_list = test_cases_with_scripts_df['Feature'].to_list()

#### Remove values where rewritten test cases are null

In [15]:
updated_pages = []
for idx, page in enumerate(pages_list):
    if isinstance(new_tc_df.iloc[idx]['New Test Cases'], float):
        continue
    else:
        updated_pages.append(page)

#### Save updated_pages as a file

In [16]:
with open('pickle_files/pickle_files_for_parsing/pages.pkl', 'wb') as handle:
    pkl.dump(updated_pages, handle)

## **Tagging**

### 1) Setup

#### Load in annotated data

In [17]:
annotated_file_path = 'data/anno_14_tc.csv'
df = pd.read_csv(annotated_file_path)

#### Reformat Dataset

In [18]:
# sents -- list of tuples (sentence, tags)
sents = []
current_sent, current_tags = [], []
for idx, row in df.iterrows():
    word, tag = row['Word'], row['IOB-tag']
    if tag == '*':
        if len(current_sent) > 0:
            sents.append((current_sent, current_tags))
            current_sent, current_tags = [], []
        else:
            continue
    else:
        current_sent.append(word)
        current_tags.append(tag)

#### Build tag-id dictionaries

In [19]:
all_tags = df['IOB-tag'].values.tolist()
unique_tags = set(all_tags) - set('*')

tags2ids = {k: v for v, k in enumerate(sorted(unique_tags))}
ids2tags = {v: k for v, k in enumerate(sorted(unique_tags))}

#### Hyperparameter

In [20]:
MAX_LENGTH = max(len(sent) for sent, _ in sents)
LEARNING_RATE = 1e-2
EPOCHS = 1

### 2) Preprocessing

#### Tokenization

In [21]:
tokenizer = BertTokenizerFast.from_pretrained('bert-base-cased')

#### Align tags to tokenized texts

In [22]:
def align_label(texts, labels):
    tokenized_inputs = tokenizer(texts, padding='max_length', max_length=MAX_LENGTH)

    word_ids = tokenized_inputs.word_ids()

    previous_word_idx = None
    label_ids = []

    for word_idx in word_ids:

        if word_idx is None:
            label_ids.append(-100)

        elif word_idx != previous_word_idx:
            try:
                label_ids.append(tags2ids[labels[word_idx]])
            except:
                label_ids.append(-100)
        else:
            try:
                label_ids.append(tags2ids[labels[word_idx]])
            except:
                label_ids.append(-100)
        previous_word_idx = word_idx

    return label_ids

#### Data class

In [23]:
class DataSequence(torch.utils.data.Dataset):

    def __init__(self, tagged_sents):
        self.sents = [' '.join(sent) for sent, _ in tagged_sents]
        self.tags = [tags for _, tags in tagged_sents]
        self.texts = [tokenizer(' '.join(sent), padding='max_length', max_length = MAX_LENGTH, return_tensors="pt") for sent, _ in tagged_sents]
        self.labels = [align_label(' '.join(sent), tag) for sent, tag in tagged_sents]

    def __len__(self):

        return len(self.labels)

    def get_batch_data(self, idx):

        return self.texts[idx]

    def get_batch_labels(self, idx):

        return torch.LongTensor(self.labels[idx])

    def __getitem__(self, idx):

        batch_data = self.get_batch_data(idx)
        batch_labels = self.get_batch_labels(idx)

        return batch_data, batch_labels

#### Split train and test

In [24]:
train_set, dev_set = train_test_split(sents, test_size=0.2, shuffle=True)

### 3) Build Tagger

#### Load pre-trained BERT model

In [25]:
class BertModel(torch.nn.Module):

    def __init__(self):

        super(BertModel, self).__init__()

        self.bert = BertForTokenClassification.from_pretrained('bert-base-cased', num_labels=len(unique_tags))

    def forward(self, input_id, mask, label):

        output = self.bert(input_ids=input_id, attention_mask=mask, labels=label, return_dict=False)

        return output

#### Evaluation

In [26]:
def evaluate(sys_spacy_data, gold_spacy_data):
    precision, recall, fscore = 0, 0, 0

    tp = 0
    fp = 0
    fn = 0

    for sys_ex, gold_ex in zip(sys_spacy_data, gold_spacy_data):
        gold_annotations = set([tuple(e) for e in gold_ex])
        sys_annotations = set([tuple(e) for e in sys_ex])

        tp += len(sys_annotations.intersection(gold_annotations))
        fp += len(sys_annotations.difference(gold_annotations))
        fn += len(gold_annotations.difference(sys_annotations))

    if tp != 0:
        recall = (tp/(tp+fn)) * 100
        precision = (tp/(tp+fp)) * 100
        fscore = 2*recall*precision/(recall+precision)

    return precision, recall, fscore

#### Tagger

In [27]:
# predict tags for sentences
def align_word_ids(texts):
    tokenized_inputs = tokenizer(texts, padding='max_length', max_length=MAX_LENGTH)

    word_ids = tokenized_inputs.word_ids()

    previous_word_idx = None
    label_ids = []

    for word_idx in word_ids:

        if word_idx is None:
            label_ids.append(-100)

        elif word_idx != previous_word_idx:
            try:
                label_ids.append(1)
            except:
                label_ids.append(-100)
        else:
            label_ids.append(-100)
        previous_word_idx = word_idx

    return label_ids


def predict(model, sentence):
    use_cuda = torch.cuda.is_available()
    device = torch.device("cuda" if use_cuda else "cpu")

    if use_cuda:
        model = model.cuda()

    text = tokenizer(sentence, padding='max_length', max_length=MAX_LENGTH, return_tensors="pt")

    mask = text['attention_mask'][0].unsqueeze(0).to(device)

    input_id = text['input_ids'][0].unsqueeze(0).to(device)
    label_ids = torch.Tensor(align_word_ids(sentence)).unsqueeze(0).to(device)

    logits = model(input_id, mask, None)
    logits_clean = logits[0][label_ids != -100]

    predictions = logits_clean.argmax(dim=1).tolist()
    prediction_label = [ids2tags[i] for i in predictions]
    return prediction_label

### 4) Train

In [28]:
def train_loop(model, train_set, dev_set):

    train_dataset = DataSequence(train_set)
    dev_dataset = DataSequence(dev_set)

    train_dataloader = DataLoader(train_dataset, num_workers=4, batch_size=1, shuffle=True)
    dev_dataloader = DataLoader(dev_dataset, num_workers=4, batch_size=1)

    use_cuda = torch.cuda.is_available()
    device = torch.device("cuda" if use_cuda else "cpu")

    optimizer = SGD(model.parameters(), lr=LEARNING_RATE)

    if use_cuda:
        model = model.cuda()

    best_acc = 0
    best_loss = 1000

    for epoch_num in range(EPOCHS):

        total_acc_train = 0
        total_loss_train = 0

        model.train()

        for train_data, train_label in tqdm(train_dataloader):

            train_label = train_label[0].to(device)
            mask = train_data['attention_mask'][0].to(device)
            input_id = train_data['input_ids'][0].to(device)

            optimizer.zero_grad()
            loss, logits = model(input_id, mask, train_label)

            logits_clean = logits[0][train_label != -100]
            label_clean = train_label[train_label != -100]

            predictions = logits_clean.argmax(dim=1)

            acc = (predictions == label_clean).float().mean()
            total_acc_train += acc
            total_loss_train += loss.item()

            loss.backward()
            optimizer.step()

        predictions = [predict(model, sent) for sent in dev_dataset.sents]
        p,r,f = evaluate(predictions, dev_dataset.tags)
        print("  PRECISION: %.2f%%, RECALL: %.2f%%, F-SCORE: %.2f%%" % (p,r,f))

#### Download katalon-bert-tagger.pt here

For running the main notebook, if you want to use the trained tagger directly, you need to download the model [pickle file](https://drive.google.com/file/d/1VXX8jcqaZY7p5K1ZQ8NSq3xIqZ8XSj2q/view?usp=sharing) and save it in the `pickle_files/pickle_files_for_parsing/`. (Sorry for the inconvenience as the tagger is too large to upload on Github.)

In [29]:
# Here instead of training once again, we load the trained model
# model = BertModel()
# train_loop(model, train_set, dev_set)
model_path = 'pickle_files/pickle_files_for_parsing/katalon-bert-tagger.pt'
model = BertModel()
model.load_state_dict(torch.load(model_path, map_location=torch.device('cpu')))

Some weights of the model checkpoint at bert-base-cased were not used when initializing BertForTokenClassification: ['cls.predictions.transform.LayerNorm.weight', 'cls.predictions.bias', 'cls.seq_relationship.weight', 'cls.predictions.transform.dense.weight', 'cls.seq_relationship.bias', 'cls.predictions.transform.LayerNorm.bias', 'cls.predictions.decoder.weight', 'cls.predictions.transform.dense.bias']
- This IS expected if you are initializing BertForTokenClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertForTokenClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Some weights of BertForTokenClassification were not initialized from the model checkpoint at bert-base-cas

<All keys matched successfully>

### 5) Tagging

In [30]:
predictions = []
for steps in updated_steps:
    predictions.append([])
    for sent in steps:
        predictions[-1].append(predict(model, sent))

In [31]:
tokenized_cases = []
for case in updated_steps:
    tokenized_steps = []
    for step in case:
        text_tokenized = tokenizer(step, padding='max_length', max_length=MAX_LENGTH, return_tensors="pt")
        sent = tokenizer.decode(text_tokenized["input_ids"][0], skip_special_tokens=True)
        tokenized_steps.append(sent)
    tokenized_cases.append(tokenized_steps)

## **Parsing**

In [32]:
# Initiate the frame parser
frame_parser = FrameParser()

In [33]:
# Using rule-based parsing (if you are using rule-based, comment the BERT parsed_test_cases and uncomment the code below)
parsed_test_cases = frame_parser.parse(updated_steps, pages_list, predictions, rule_base=True)

# Using BERT tagger (if you are using tagger, comment the previus parsed_test_cases and uncomment the code below)
#parsed_test_cases = frame_parser.parse(tokenized_cases, pages_list, predictions)

Error (enter & input): NO SECOND OBJECT! 
{'V': 'set', 'ARG1': 'name field text'}
Error (enter & input): NO SECOND OBJECT! 
{'V': 'input', 'ARG1': 'the name field'}


In [34]:
# Demo showing first parsed test case
parsed_test_cases[0]

[{'V': 'enter',
  'value': 'admin1@mail.com',
  'location': 'to email textbox',
  'page': 'Login'},
 {'V': 'enter',
  'value': 'Admin@123',
  'location': 'to password textbox',
  'page': 'Login'},
 {'V': 'click', 'value': 'Login', 'page': 'Login'},
 {'V': 'wait',
  'value': 'for title to be present for seconds',
  'time': 30,
  'page': 'Login'}]

## **Generate Test Scripts**

In [35]:
scripts = [generate_script(test_case) for test_case in parsed_test_cases]

# Demo showing first parsed test case
print(scripts[0])

# for i, script in enumerate(scripts):
#     print("Script " + str(i)+'\n')
#     print(script)
#     print("-"*100)

WebUI.setText(findTestObject('Page_Login/tbx_Email'), 'admin1@mail.com')
WebUI.setText(findTestObject('Page_Login/tbx_Password'), 'Admin@123')
WebUI.click(findTestObject('Page_Login/btn_Login'))
WebUI.waitForElementPresent(findTestObject('Page_Login/txt_EmailErrorMessage'), 30, FailureHandling.STOP_ON_FAILURE)


## **Evaluation**

### 1) Load the generated scripts and gold scripts

In [36]:
# preprocess the generated scripts by splitting
scripts = [script.split('\n') for script in scripts]

In [37]:
pkl_folder = 'pickle_files/pickle_files_for_script_generation/'

with open(pkl_folder+"script_list.pkl", "rb") as f:
    gold_scripts = pkl.load(f)

In [38]:
# Preprocess the test cases to remove precondition and verification parts and also the WebUI we ignored like WebUI.comment
gold_scripts = [preprocess_test_case(gold) for gold in gold_scripts[:14]]

In [39]:
# Remove empty script which is test case 6 here
gold_scripts.remove(None)

### 2) F1 evaluation

In [40]:
f1_evaluation(gold_scripts, scripts)

0.64240217616462


### 3) BLEU

In [41]:
BLEU_evaluation(gold_scripts, scripts)

1 gram BLEU Score is: 0.46538338910220767
2 gram BLEU Score is: 0.35910166938106725
3 gram BLEU Score is: 0.29529716592636013
4 gram BLEU Score is: 0.25111982298579294
