In [None]:
pip install git+https://github.com/AlaFalaki/AttentionVisualizer.git

In [1]:
import sys
sys.path.append('NeuralCC') 
from run_classifier import get_coverage, convert_examples_to_features

In [2]:
import pickle
import AttentionVisualizer as av

In [3]:
from copy import deepcopy
import sys
import torch
import torch.nn as nn
import os
import argparse
from transformers import RobertaModel, RobertaTokenizer, RobertaConfig


class CodeCoverageMLP(nn.Module):
    """Three-layer feed-forward head that maps an embedding to one probability.

    Two hidden layers of width ``config.hidden_size`` (GELU activation, each
    followed by dropout) feed a single-unit output layer whose value is
    passed through a sigmoid.
    """

    def __init__(self, config):
        """Build the layers.

        Args:
            config: object exposing ``hidden_size``, ``classifier_dropout`` and
                ``hidden_dropout_prob``. The latter is only read when
                ``classifier_dropout`` is ``None``.
        """
        super().__init__()
        width = config.hidden_size

        # Hidden layers followed by the single-logit output layer.
        self.fc1 = nn.Linear(1 * width, width)
        self.fc2 = nn.Linear(width, width)
        self.fc3 = nn.Linear(width, 1)
        self.forward_activation = torch.nn.GELU()

        # Prefer the classifier-specific dropout rate when one is configured.
        drop_probability = config.classifier_dropout
        if drop_probability is None:
            drop_probability = config.hidden_dropout_prob
        self.dropout = nn.Dropout(drop_probability)

    def forward(self, x):
        """Return ``sigmoid(fc3(h))`` where ``h`` is ``x`` after both hidden layers."""
        for hidden_layer in (self.fc1, self.fc2):
            x = self.dropout(self.forward_activation(hidden_layer(x)))
        return torch.sigmoid(self.fc3(x))


class CodeCoveragePredictionModel(nn.Module):
    """RoBERTa encoder followed by a per-statement coverage classifier.

    Token embeddings produced by the encoder are mean-pooled per statement id
    and every pooled vector is scored by ``self.coverage_mlp``.
    """

    # Value that marks padding positions in ``statements_ids``.
    STATEMENT_PAD_ID = -999

    def __init__(self, args, config, tokenizer):
        """Load the pretrained encoder and build the classification head.

        Args:
            args: namespace providing ``max_tokens``, ``use_statement_ids``,
                ``model_key`` and ``pretrain``.
            config: encoder configuration passed to ``RobertaModel.from_pretrained``
                and to ``CodeCoverageMLP``.
            tokenizer: tokenizer whose length sizes the token-embedding matrix.
        """
        super(CodeCoveragePredictionModel, self).__init__()
        self.max_tokens = args.max_tokens
        self.use_statement_ids = args.use_statement_ids
        self.roberta = RobertaModel.from_pretrained(args.model_key, config=config)
        self.roberta.resize_token_embeddings(len(tokenizer))

        if self.use_statement_ids:
            self.statement_embeddings = nn.Embedding(args.max_tokens, config.hidden_size)
            print(self.statement_embeddings)

        # The encoder is frozen when ``args.pretrain`` is truthy, trainable otherwise.
        encoder_trainable = not args.pretrain
        for param in self.roberta.parameters():
            param.requires_grad = encoder_trainable

        self.coverage_mlp = CodeCoverageMLP(config)
        self.loss_criterion = nn.BCELoss(reduction='mean')

    def forward(self, inputs_ids, inputs_masks, statements_ids, test_input_statements_ids=None, gold_ids=None):
        """Predict one coverage score per statement for every item of the batch.

        Args:
            inputs_ids: token ids; indexed as (batch, sequence).
            inputs_masks: attention mask forwarded to the encoder.
            statements_ids: statement id of every token, with
                ``STATEMENT_PAD_ID`` (-999) on padding positions. The tensor is
                NOT modified. Non-padding ids are matched to encoder outputs by
                their rank among the non-padding entries, so padding is
                presumably trailing -- TODO confirm against the data pipeline.
            test_input_statements_ids: accepted for interface compatibility; unused.
            gold_ids: accepted for interface compatibility; unused.

        Returns:
            ``(batch_preds, attention_scores)`` where ``batch_preds`` is a list
            with one tensor of statement scores per batch item (an empty tensor
            for an item that contains only padding) and ``attention_scores`` is
            the encoder's ``attentions`` output.

        Raises:
            Any exception raised by the encoder or the head propagates to the
            caller (previously it was printed and ``None`` was returned).
        """
        device = inputs_ids.device
        inputs_embeddings = self.roberta.embeddings.word_embeddings(inputs_ids)

        pad_value = self.STATEMENT_PAD_ID
        effective_statements_ids = statements_ids
        if self.use_statement_ids:
            # Padding is looked up in the last row of the statement-embedding
            # table (index 511 for the default max_tokens of 512). A remapped
            # copy is used so the caller's tensor keeps its -999 markers.
            pad_value = self.max_tokens - 1
            effective_statements_ids = statements_ids.masked_fill(
                statements_ids == self.STATEMENT_PAD_ID, pad_value
            )
            inputs_embeddings = inputs_embeddings + self.statement_embeddings(effective_statements_ids)

        roberta_outputs = self.roberta(
            inputs_embeds=inputs_embeddings,
            attention_mask=inputs_masks,
            output_attentions=True,
            output_hidden_states=True,
        )
        attention_scores = roberta_outputs.attentions
        # Last entry of the hidden-state tuple: output of the final encoder layer.
        outputs_embeddings = roberta_outputs.hidden_states[-1]

        batch_preds = []
        for item_output_embeddings, item_ids in zip(outputs_embeddings, effective_statements_ids):
            # Statement ids of the real (non-padding) tokens of this item.
            item_statements_ids = item_ids[item_ids != pad_value]
            if item_statements_ids.numel() == 0:
                batch_preds.append(torch.empty(0, device=device))
                continue

            num_statements_in_item = int(item_statements_ids.max().item())

            statements_embeddings = []
            for sid in range(num_statements_in_item + 1):
                # ``as_tuple=True`` always yields a 1-D index, so a statement made of
                # a single token still pools to a (hidden,) vector instead of a scalar.
                token_positions = (item_statements_ids == sid).nonzero(as_tuple=True)[0]
                # NOTE: a statement id with no tokens pools to NaN (mean of an empty set).
                statements_embeddings.append(item_output_embeddings[token_positions].mean(dim=0))

            item_preds = self.coverage_mlp(torch.stack(statements_embeddings)).squeeze()
            batch_preds.append(item_preds)

        # Returned after the loop so that every batch item is scored.
        return batch_preds, attention_scores

# Run configuration shared by the model and the visualizer cells below.
args = argparse.Namespace(
    # Paths
    data_dir='./dataset/testing.json',
    output_dir='./output/',
    # Sequence length and prediction dumping
    max_tokens=512,
    save_predictions=True,
    # Batch sizes and optimizer hyper-parameters
    train_batch_size=32,
    eval_batch_size=32,
    learning_rate=1e-4,
    weight_decay=0.0,
    adam_epsilon=1e-8,
    max_source_size=512,
    # Model selection
    use_statement_ids=False,
    model_key="microsoft/codeexecutor",
    pretrain=True,
    device=torch.device("cpu"),
)

In [33]:
from transformers import RobertaModel, RobertaTokenizer, RobertaConfig
from AttentionVisualizer.main import *
import re
from copy import deepcopy


def indent_dedent_tokenize(code, tokenizer):
    """Tokenize ``code`` line by line, encoding indentation changes as marker tokens.

    Leading whitespace is stripped from every line. Before the stripped text:
      * one ``<indent> `` marker is inserted when the line is indented deeper
        than the previous line (a single marker regardless of how much deeper);
      * one ``<dedent> `` marker per 4 columns of reduced indentation is
        inserted when the line is indented less than the previous line.
    A newline is appended to every line before it is handed to the tokenizer.

    Args:
        code: source text; lines are separated by ``'\\n'``.
        tokenizer: object with a ``tokenize(str) -> list`` method.

    Returns:
        Flat list with the tokens of all lines, in order.
    """
    code_tokens_list = []
    prev_num_white_spaces = 0

    for line in code.split('\n'):
        current_num_white_spaces = len(re.match(r'^\s*', line).group(0))

        if current_num_white_spaces > prev_num_white_spaces:
            markers = "<indent> "
        elif current_num_white_spaces < prev_num_white_spaces:
            # Integer division: a reduction smaller than 4 columns emits no marker.
            markers = "<dedent> " * ((prev_num_white_spaces - current_num_white_spaces) // 4)
        else:
            markers = ""
        prev_num_white_spaces = current_num_white_spaces

        code_tokens_list.extend(tokenizer.tokenize(markers + line.lstrip() + "\n"))

    return code_tokens_list

def convert_examples_to_features(code, tokenizer, max_source_size=512):
    """Turn source code into fixed-length encoder inputs.

    The code is tokenized with ``indent_dedent_tokenize``, truncated to
    ``max_source_size`` tokens, converted to ids and right-padded with
    ``tokenizer.pad_token_id``. No BOS/EOS tokens are added.

    Note: this notebook-level definition replaces the function of the same
    name imported from ``run_classifier`` earlier in the notebook.

    Args:
        code: source text to encode.
        tokenizer: tokenizer providing ``tokenize``, ``convert_tokens_to_ids``
            and ``pad_token_id``.
        max_source_size: length of the returned sequences (default 512).

    Returns:
        ``(source_ids, source_masks)``, two integer tensors of shape
        ``(1, max_source_size)``. The mask holds 1 on real tokens and 0 on
        padding.
    """
    source_tokens = indent_dedent_tokenize(code, tokenizer)[:max_source_size]
    token_ids = tokenizer.convert_tokens_to_ids(source_tokens)
    padding_length = max_source_size - len(token_ids)

    # The mask must be padded with 0, not with the pad token id: that id is the
    # vocabulary index of the pad token and would flag padding as attended
    # whenever it is non-zero.
    source_masks = [1] * len(token_ids) + [0] * padding_length
    source_ids = list(token_ids) + [tokenizer.pad_token_id] * padding_length

    return torch.unsqueeze(torch.tensor(source_ids), dim=0), torch.unsqueeze(torch.tensor(source_masks), dim=0)


class CustomAttentionVisualizer(av.AttentionVisualizer):
    """Attention visualizer for the fine-tuned ``microsoft/codeexecutor`` encoder.

    After the parent constructor has run, the tokenizer, the model and all
    widgets are replaced with versions suited to source-code input. Attributes
    and methods used but not defined here (``self.stop_words``,
    ``self.extract_indexes``) are presumably provided by
    ``av.AttentionVisualizer`` -- verify against the installed library version.
    """

    def __init__(self):
        super().__init__()
        self.model_name = "microsoft/codeexecutor"
        self.tokenizer  = RobertaTokenizer.from_pretrained(self.model_name)   
        # Register the structural markers and the numeric tokens <0> .. <199>
        # as special tokens.
        special_tokens_list = ['<line>', '<state>', '</state>', '<dictsep>', '<output>', '<indent>',
                                '<dedent>', '<mask0>']
        for i in range(200):
            special_tokens_list.append(f"<{i}>")
        special_tokens_dict = {'additional_special_tokens': special_tokens_list}
        self.tokenizer.add_special_tokens(special_tokens_dict)

        config = RobertaConfig.from_pretrained("microsoft/codeexecutor")
        self.model = CodeCoveragePredictionModel(args, config, self.tokenizer)

        # NOTE(review): torch.load unpickles the checkpoint -- only load files from a
        # trusted source. strict=False silently ignores missing/unexpected keys, so a
        # checkpoint that does not match the model will not raise here.
        self.model.load_state_dict(torch.load('NeuralCC/model/model.ckpt', map_location=torch.device('cpu')), strict=False)
        # Only the encoder is needed for attention visualization.
        self.model = self.model.roberta

        self.step1_lbl = widgets.HTML(value = f"<h3>Step 1:</h3> Write the input text in the box below.")
        self.input_text = widgets.Textarea(placeholder='Type something', description='Input Text:', rows=10)
        self.step1 = widgets.VBox([self.step1_lbl, self.input_text])
        
        self.preview_config_lbl = widgets.HTML(value = f"<h3>Step 2:</h3> Select which attention layer/head and words to visualize.")
        self.ignore_specials  = widgets.Checkbox(value=True, description='Ignore BOS/EOS', indent=False)
        self.ignore_dots     = widgets.Checkbox(value=True, description='Ignore [dot]s', indent=False)
        self.ignore_stopwords = widgets.Checkbox(value=True, description='Ignore Stop Words', indent=False)
        self.options = widgets.HBox([self.ignore_specials, self.ignore_dots, self.ignore_stopwords])
        
        self.layer_range = widgets.IntRangeSlider(value=[3, 5], min=1, max=12, step=1,
                                continuous_update=False, orientation='horizontal', readout=True,
                                readout_format='d')
        self.layer_ind = widgets.Dropdown(options=range(1, 13), value=1)
        self.layer = widgets.Dropdown(options=['all', 'range', 'individual'], value='all', description='Layer')
        self.layer_selection = widgets.HBox([self.layer, self.layer_range, self.layer_ind])

        self.head_range = widgets.IntRangeSlider(value=[3, 5], min=1, max=12, step=1,
                                continuous_update=False, orientation='horizontal', readout=True,
                                readout_format='d')
        self.head_ind = widgets.Dropdown(options=range(1, 13), value=1)
        self.head = widgets.Dropdown(options=['all', 'range', 'individual'], value='all', description='Head')
        self.head_selection = widgets.HBox([self.head, self.head_range, self.head_ind])
        
        self.visualize_btn = widgets.Button(description='VISUALIZE')
        self.note_lbl = widgets.HTML(value = f"<small>Hold your cursor on each word for a second to see its attention score.</small>")

        self.out = widgets.HTML(layout={'border': '1px solid black', 'padding': '4px', 'margin-top': '10px'})
        
        self.step2 = widgets.VBox([self.preview_config_lbl, self.options,
                                   self.layer_selection, self.head_selection, self.visualize_btn,
                                   self.note_lbl, self.out])
        
        self.ui = widgets.VBox([self.step1, self.step2])

    #--------------------------------------------------------------------------------
    # Get the input from textarea, do the preprocessing and run it through the model.
    #--------------------------------------------------------------------------------
    def on_visualize_click(self, c):
        """Compute per-token attention scores for the text box and render them.

        For every selected layer/head the attention each token receives is
        averaged over the query tokens; those per-head scores are averaged,
        optionally masked (dots, stop words), rescaled with ``scale`` and
        written to ``self.out`` as HTML. The scaled scores are also stored in
        ``self.final_score``.

        Args:
            c: argument supplied by the button callback; unused.
        """
        self.out.value = ""
        source_text = self.input_text.value
        input_ids, attention_mask = convert_examples_to_features(source_text, self.tokenizer)

        # Only tokens that fit into the encoder input can be scored.
        the_tokens = indent_dedent_tokenize(source_text, self.tokenizer)[: input_ids.size(1)]
        number_of_tokens = len(the_tokens)
        if number_of_tokens == 0:
            return

        # Pure inference: no autograd graph is needed.
        with torch.no_grad():
            outputs = self.model(input_ids, attention_mask, output_attentions=True)

        positions, dot_positions, stopwords_positions = find_positions(self.ignore_specials.value,
                                                                       self.ignore_stopwords.value,
                                                                       the_tokens,
                                                                       self.stop_words)
        the_words = make_the_words(the_tokens, positions, self.ignore_specials.value)

        layer_indexes, head_indexes = self.extract_indexes()
        the_scores = []
        for i in range(*layer_indexes):
            for ii in range(*head_indexes):
                # Restrict to real tokens: rows/columns past ``number_of_tokens`` are
                # padding and would otherwise distort both the averages and the
                # min/max scaling below.
                head_attention = outputs.attentions[i][0][ii][:number_of_tokens, :number_of_tokens]
                the_scores.append( torch.sum(head_attention, dim=0) / number_of_tokens )
        
        the_scores = torch.stack( the_scores )
        final_score = torch.sum( the_scores, dim=0 ) / the_scores.size(0)

        # The encoder input built by convert_examples_to_features contains no
        # BOS/EOS tokens, so scores are NOT sliced with [1:-1] here: doing so
        # would shift every score by one token relative to ``positions``.
            
        min_ = torch.min( final_score )
        
        if self.ignore_dots.value:
            final_score[list(dot_positions.values())] = min_

        if self.ignore_stopwords.value:
            final_score[list(stopwords_positions.values())] = min_
        max_ = torch.max( final_score )
        
        for i in range( final_score.size(0) ):
            final_score[i] = scale( final_score[i], min_, max_ )
        self.final_score = final_score
        
        the_html = make_html(the_words, positions, final_score)
        self.out.value = the_html
        
        # Release the large attention tensors before the next click.
        del input_ids, outputs, the_scores, final_score
        gc.collect()


def find_positions(ignore_specials, ignore_stopwords, the_tokens, stop_words):
    """Group sub-word tokens into words and locate dots and stop words.

    A new word starts at every token that begins with ``'Ġ'`` or ``'.'`` or
    that equals ``'Ċ'``.

    Args:
        ignore_specials: when true, every token is treated as text. When false,
            the first and last tokens are treated as special tokens and each
            gets a word of its own.
        ignore_stopwords: when true, words found in ``stop_words`` are recorded.
        the_tokens: list of token strings.
        stop_words: container of stop words, tested with ``in``.

    Returns:
        ``(positions, dot_positions, stopwords_positions)``:
          * ``positions`` maps word index -> ``[start, end]`` token slice bounds;
          * ``dot_positions`` maps a word index to the token index of a ``'.'`` token;
          * ``stopwords_positions`` maps word index -> index of the last token
            of a stop word.
    """
    dot_positions = {}
    stopwords_positions = {}
    tmp = []

    if ignore_specials:
        word_counter = 0
        start_pointer = 0
        positions = {}

        num_of_tokens = len( the_tokens )
        num_of_tokens_range = range( num_of_tokens + 1 )

    else:
        word_counter = 1
        start_pointer = 1
        # The first token is the leading special token (when there is one).
        positions = {0: [0, 1]} if the_tokens else {}

        num_of_tokens = len( the_tokens ) - 1
        num_of_tokens_range = range( 1, num_of_tokens + 1 )

    last_index = None  # stays None when the loop body never runs
    for i in num_of_tokens_range:
        last_index = i
        if i == num_of_tokens:
            positions[word_counter] = [start_pointer, i]
            break

        token = the_tokens[i]
        # startswith() is also safe for an empty token string.
        if token.startswith(('Ġ', '.')) or token == "Ċ":
            # ``tmp`` is empty when the very first token is itself a boundary
            # (e.g. the text starts with a newline): there is no previous word.
            if ignore_stopwords and tmp:
                joined_tmp = "".join(tmp)
                current_word = joined_tmp[1:] if joined_tmp.startswith("Ġ") else joined_tmp
                if current_word in stop_words:
                    stopwords_positions[word_counter] = i-1

            if token == ".":
                dot_positions[word_counter+1] = i

            positions[word_counter] = [start_pointer, i]
            word_counter += 1
            start_pointer = i
            tmp = []

        tmp.append(token)

    # Trailing special token; skipped when there were fewer than two tokens.
    if not ignore_specials and last_index is not None:
        positions[len( positions )] = [last_index, last_index+1]
    
    return positions, dot_positions, stopwords_positions


def make_html(the_words, positions, final_score):
    """Render every word as a ``<span>`` whose background opacity is its score.

    Args:
        the_words: list of word strings; ``'Ċ'`` (newline) and ``'Ġ'`` (space)
            marker characters are removed before display.
        positions: mapping word index -> ``[start, end]`` bounds into
            ``final_score``; words without an entry are skipped.
        final_score: 1-D tensor of per-token scores. A word spanning several
            tokens is shown with the maximum of their scores.

    Returns:
        The HTML fragment as a string. Word text is HTML-escaped so tokens
        such as ``<indent>`` or code containing ``<`` / ``>`` are displayed
        literally instead of being parsed as markup.
    """
    # Imported locally so the function does not depend on the notebook's import cell.
    from html import escape

    the_html = "\t\t"
    line_ctr = 2
    for i, word in enumerate( the_words ):
        # A word containing the newline marker starts a new display line.
        new_line = "Ċ" in word
        word = word.replace("Ċ", "").replace("Ġ", "")
        
        if i in positions:
            start = positions[i][0]
            end   = positions[i][1]

            if end - start > 1:
                score = torch.max( final_score[start:end] )
            else:
                score = final_score[start]

            if new_line:
                # NOTE(review): extra indentation is hard-coded for display lines 5
                # and 7 only -- presumably tuned to one sample snippet; confirm.
                if line_ctr in [5, 7]:
                    the_html += f"<br />&emsp;&emsp;"
                else:
                    the_html += f"<br />"
                line_ctr += 1

            
            the_html += """<span style="background-color:rgba(51, 153, 255, {});
                        padding:3px 6px 3px 6px; margin: 0px 2px 0px 2px" title="{}"><code style="color:black">{}</code></span>""".format(score, score, escape(word))
    
    return the_html

def make_the_words(the_tokens, positions, ignore_specials):
    """Join the tokens of every ``[start, end]`` span in ``positions`` into one word.

    Args:
        the_tokens: list of token strings.
        positions: mapping whose values are ``[start, end]`` slice bounds into
            ``the_tokens``; words are produced in the mapping's iteration order.
        ignore_specials: unused; kept for call compatibility.

    Returns:
        List of concatenated word strings.
    """
    return [''.join(the_tokens[first:last]) for first, last in positions.values()]

In [None]:
# Small sample program (a parity check with an if/else branch), stored as source text.
code = "\n".join([
    "def main():",
    "    n = 2",
    "    if n % 2 == 1:",
    "        print(n // 2 + 1)",
    "    else:",
    "        print(n // 2)",
    'if __name__ == "__main__":',
    "    main()",
])

In [None]:
import traceback

# Build the visualizer and display its controls. Failures (missing checkpoint,
# download errors, ...) are reported without aborting the notebook run.
try:
    obj = CustomAttentionVisualizer()
    obj.show_controllers(with_sample=True)
except Exception:
    # Print the full traceback: the bare exception message alone hides the
    # exception type and the location of the failure.
    traceback.print_exc()

In [None]:
import statistics

# For every example, count the source lines that mention 'aliasingVar'.
# NOTE(review): `examples_correct` / `examples_incorrect` are not defined in the
# visible cells -- they must already exist in the kernel; confirm their origin.
num_correct = []
num_incorrect = []

for ex in examples_correct:
    num_correct.append(sum(1 for line in ex.code.split('\n') if 'aliasingVar' in line))

for ex in examples_incorrect:
    num_incorrect.append(sum(1 for line in ex.code.split('\n') if 'aliasingVar' in line))

# Summary statistics of the per-example counts for both groups.
print(f"Correct\tMean:{statistics.mean(num_correct)}\tMedian: {statistics.median(num_correct)}")
print(f"Incorrect\tMean:{statistics.mean(num_incorrect)}\tMedian: {statistics.median(num_incorrect)}")