In [1]:
# Enable IPython's autoreload extension in mode 2, so imported modules are
# reloaded before each cell runs and edits to local helper modules are picked
# up without restarting the kernel.
%load_ext autoreload
%autoreload 2

In [2]:
import torch
import numpy as np
import pandas as pd
from project_dataset import load_dataset
from dataclasses import dataclass
from tqdm.autonotebook import tqdm
from torch.utils.data import DataLoader, SequentialSampler

In [3]:
@dataclass
class Args:
    """Run configuration shared by the cells of this notebook.

    Every attribute is annotated so that ``@dataclass`` registers it as a
    real field: individual settings can be overridden at construction time
    (``Args(task=..., eval_batch_size=...)``) and the repr lists all values.
    Defaults are evaluated once, when the class is defined.
    """
    # Device handed to the prediction helper; CUDA when available, else CPU.
    device: torch.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    # Number of visible CUDA devices; used to decide on DataParallel wrapping.
    n_gpu: int = torch.cuda.device_count()
    # Task name passed to load_dataset().
    task: str = 'attack_vector'
    # Not read in this notebook directly; presumably consumed by the LineVul
    # helpers that receive `args` — TODO confirm.
    use_word_level_tokenizer: bool = False
    # Presumably the maximum token length per example used by
    # convert_examples_to_features — TODO confirm.
    block_size: int = 512
    # Batch size of the evaluation DataLoader built in find_tp().
    eval_batch_size: int = 512


args = Args()

In [4]:
# Load the dataset for the task selected in the configuration (args.task).
# Later cells index the result by split name ('train', 'validation', 'test').
dataset = load_dataset(args.task)

In [5]:
# Silence the transformers library: only messages at CRITICAL level
# (numeric value 50) are emitted from here on.
from transformers import logging
logging.set_verbosity(logging.CRITICAL)

In [6]:
# Load the pretrained LineVul tokenizer and model weights from the
# "MickyMike/LineVul" checkpoint.
# NOTE(review): AutoModel resolves to the architecture's base model class,
# while explain() unpacks the forward result as `(prob, attentions)` —
# confirm the loaded class actually returns that pair.
from transformers import AutoTokenizer, AutoModel

linevul_tokenizer = AutoTokenizer.from_pretrained("MickyMike/LineVul")
linevul_model = AutoModel.from_pretrained("MickyMike/LineVul")

In [7]:
from linevul_helpers import TextDataset, convert_examples_to_features
from linevul_extra import extract_line_attention, linevul_predict

class ExtendTextDataset(TextDataset):
    """TextDataset variant built from an in-memory DataFrame.

    Reads the ``processed_func`` column of ``data_frame`` and converts every
    entry into a feature object via ``convert_examples_to_features``. The
    parent constructor is intentionally not invoked; only ``self.examples``
    is populated here.
    """

    def __init__(self, tokenizer, args, data_frame):
        sources = data_frame["processed_func"].tolist()
        # The frame holds vulnerable functions only, so every example is
        # given the positive label (1).
        self.examples = [
            convert_examples_to_features(source, 1, tokenizer, args)
            for source in tqdm(sources, desc='ExtendTextDataset')
        ]

# to find TP
def find_tp(model, tokenizer, args, data_frame=None):
    """Run prediction and locate the true-positive samples.

    Args:
        model: model forwarded to ``linevul_predict``.
        tokenizer: tokenizer used to build the dataset features.
        args: configuration object; ``eval_batch_size`` and ``device`` are
            read here, and the object is also passed to the dataset class.
        data_frame: optional DataFrame. When given, samples come from it via
            ``ExtendTextDataset``; otherwise ``TextDataset`` is built with
            ``file_type='test'``.

    Returns:
        ``(result, tp_indices)``: ``result`` is the first value returned by
        ``linevul_predict`` (passed through unchanged), and ``tp_indices`` is
        a list of positions where the prediction equals the label and the
        label is 1.
    """
    eval_data = (
        ExtendTextDataset(tokenizer, args, data_frame)
        if data_frame is not None
        else TextDataset(tokenizer, args, file_type='test')
    )
    # Sequential sampling keeps prediction positions aligned with row order.
    loader = DataLoader(
        eval_data,
        sampler=SequentialSampler(eval_data),
        batch_size=args.eval_batch_size,
        num_workers=0,
    )
    result, y_trues, y_preds = linevul_predict(model, loader, args.device)
    is_true_positive = (y_trues == y_preds) & (y_trues == 1)
    hit_positions = np.where(is_true_positive)[0]
    return result, list(hit_positions)


def explain(model, tokenizer, explain_indices, data_frame=None): 
    """Extract line-level attention for selected samples.

    The dataset is walked one sample at a time in sequential order; only the
    positions listed in ``explain_indices`` are run through the model.

    Note: the dataset is built with the notebook-level ``args`` object, which
    is not a parameter of this function.

    Args:
        model: model called as ``model(input_ids=..., output_attentions=True)``
            and expected to return a ``(prob, attentions)`` pair.
        tokenizer: tokenizer used to build features and to map ids to tokens.
        explain_indices: iterable of integer sample positions to explain
            (for example the ``tp_indices`` returned by ``find_tp``).
        data_frame: optional DataFrame. When given, samples come from it via
            ``ExtendTextDataset``; otherwise ``TextDataset`` is built with
            ``file_type='test'``.

    Returns:
        A list of ``(sample_idx, lines_with_score, n_lines)`` tuples, one per
        explained sample, in dataset order.
    """
    if data_frame is not None:
        dataset = ExtendTextDataset(tokenizer, args, data_frame)
    else:
        dataset = TextDataset(tokenizer, args, file_type='test')
    sampler = SequentialSampler(dataset)
    # batch_size=1 so the enumeration index equals the sample position.
    data_loader = DataLoader(dataset, sampler=sampler, batch_size=1, num_workers=0)
    model.eval()
    # A set gives O(1) membership tests instead of scanning a list per batch.
    wanted = set(explain_indices)
    progress_bar = tqdm(data_loader, total=len(data_loader))
    extract_list = []
    for index, mini_batch in enumerate(progress_bar):
        if index not in wanted:
            continue
        input_ids, _ = mini_batch
        ids = input_ids[0].detach().tolist()
        # Strip the tokenizer's "Ġ" marker and map "ĉ" to "Ċ" before the
        # tokens are handed to extract_line_attention.
        all_tokens = [
            token.replace("Ġ", "").replace("ĉ", "Ċ")
            for token in tokenizer.convert_ids_to_tokens(ids)
        ]
        with torch.no_grad():
            prob, attentions = model(input_ids=input_ids, output_attentions=True)
        lines_with_score, n_lines = extract_line_attention(attentions, all_tokens)
        extract_list.append((index, lines_with_score, n_lines))
    return extract_list

In [8]:
# Multi-GPU evaluation: wrap the model in DataParallel when more than one GPU
# is visible; otherwise fall back to the plain model so that `model` is always
# defined, whatever hardware the notebook runs on.
# NOTE(review): the find_tp(...) call in this notebook passes `linevul_model`,
# not `model`, so the wrapper only takes effect where `model` is used.
if args.n_gpu > 1:
    model = torch.nn.DataParallel(linevul_model)
else:
    model = linevul_model

In [9]:
# Materialise each split of the loaded dataset as a pandas DataFrame.
train_set, validation_set, test_set = (
    dataset[split].to_pandas() for split in ('train', 'validation', 'test')
)

In [12]:
# Predict on the test split and keep the positions of the true positives;
# these are the samples to be explained afterwards.
result, explain_indices = find_tp(linevul_model, linevul_tokenizer, args, test_set)

ExtendTextDataset:   0%|          | 0/1350 [00:00<?, ?it/s]