In [2]:
#import pickle

from smooth_gradient import SmoothGradient
from integrated_gradient import IntegratedGradient
from input_reduction import InputReduction

import torch
from torch import nn
from torch.utils.data import DataLoader

from IPython.display import display, HTML


# model related
import torch
from transformers import AutoTokenizer, AutoModelWithHeads, set_seed
import transformers.adapters.composition as ac


# dataset related
import pandas as pd

**Note:** Do not forget to change the adapter names in the forward pass before testing.

## Explainability Tests

### Single Anomaly Task Model

In [None]:
# INIT MODEL: base model + tokenizer, then one language and one task adapter.
set_seed(666)

# Checkpoint locations; the commented pair is an alternative set of paths.
#LANG_ADAPTER_PATH = "tmp/test-anomaly-hdfs-houlsby/mlm"
#TASK_ADAPTER_PATH = "tmp/test-anomaly-hdfs-houlsby/hanomaly"
LANG_ADAPTER_PATH = "tmp/test-anomaly-firewall-pfeiffer-nd/mlm"
TASK_ADAPTER_PATH = "tmp/test-anomaly-firewall-pfeiffer-nd/fanomaly"
BASE_PATH = "roberta-base"

model = AutoModelWithHeads.from_pretrained(BASE_PATH)
tokenizer = AutoTokenizer.from_pretrained(BASE_PATH)

# Load the language adapter; the returned value is passed to ac.Stack below.
load_lang_adapter = LANG_ADAPTER_PATH
lang_adapter_name = model.load_adapter(load_lang_adapter)

# Load the task/anomaly adapter the same way.
load_task_adapter = TASK_ADAPTER_PATH
task_adapter_name = model.load_adapter(load_task_adapter)

# Activate both adapters as a stack (language adapter listed first, task second).
adapter_stack = ac.Stack(lang_adapter_name, task_adapter_name)
model.set_active_adapters(adapter_stack)

### Multi-Anomaly Task Model

In [None]:
# INIT MODEL: base model + tokenizer, four adapters, and a fusion of two stacks.
set_seed(666)

BASE_PATH = "roberta-base"

model = AutoModelWithHeads.from_pretrained(BASE_PATH)
tokenizer = AutoTokenizer.from_pretrained(BASE_PATH)

# Checkpoint locations: the trailing-underscore names hold the "hdfs"/"hanomaly"
# paths, the plain names hold the "firewall"/"fanomaly" paths.
LANG_ADAPTER_PATH_ = "tmp/test-anomaly-fusion2/mlm-hdfs"
TASK_ADAPTER_PATH_ = "tmp/test-anomaly-fusion2/hanomaly"
LANG_ADAPTER_PATH = "tmp/test-anomaly-fusion2/mlm-firewall"
TASK_ADAPTER_PATH = "tmp/test-anomaly-fusion2/fanomaly"
# classification head we have trained
FUSION_TASK = "tmp/test-anomaly-fusion2/allanomaly"

# All four adapters are loaded with with_head=False.
firewall_adapter = model.load_adapter(LANG_ADAPTER_PATH, with_head=False)
fanomaly_adapter = model.load_adapter(TASK_ADAPTER_PATH, with_head=False)
hdfs_adapter = model.load_adapter(LANG_ADAPTER_PATH_, with_head=False)
hanomaly_adapter = model.load_adapter(TASK_ADAPTER_PATH_, with_head=False)

# NOTE(review): FUSION_TASK (a path string) is the only argument passed here, and
# this cell never loads head or fusion weights explicitly -- confirm the trained
# head / fusion layer are actually restored (load_head / load_adapter_fusion?).
model.add_classification_head(FUSION_TASK)

fusion_setup = ac.Fuse(fanomaly_adapter, hanomaly_adapter)
model.add_adapter_fusion(fusion_setup)

# fuse two stacks
firewall_stack = ac.Stack(firewall_adapter, fanomaly_adapter)
hdfs_stack = ac.Stack(hdfs_adapter, hanomaly_adapter)
model.set_active_adapters(ac.Fuse(firewall_stack, hdfs_stack))

### Test runs

In [11]:
# LOAD DATASET
# Switch the active path to read the alternative evaluation file instead.
eval_csv_path = 'data/hdfs/anomaly/eval.csv'
#eval_csv_path = 'data/firewall/anomaly/eval.csv'
eval_dataset = pd.read_csv(eval_csv_path)

In [12]:
# anomaly counterparts
eval_example = eval_dataset['log'].tolist()
eval_example_labels = eval_dataset['label'].tolist()

# Keep only the (log, label) pairs whose label is truthy.
anomaly_pairs = [
    (log, label)
    for log, label in zip(eval_example, eval_example_labels)
    if label
]

# select an example: position within the filtered (truthy-label) subset.
ANOMALY_INDEX = 16
#ANOMALY_INDEX = 3381

# Fail with an explicit message instead of a bare "list index out of range".
if not -len(anomaly_pairs) <= ANOMALY_INDEX < len(anomaly_pairs):
    raise IndexError(
        f"ANOMALY_INDEX={ANOMALY_INDEX} is out of range: "
        f"only {len(anomaly_pairs)} anomalous examples were found in eval_dataset"
    )

selected_log, selected_label = anomaly_pairs[ANOMALY_INDEX]

# Downstream cells expect single-element lists under these names.
anomaly_example = [selected_log]
anomaly_example_label = [selected_label]

#INIT DATASET
class A_Dataset(torch.utils.data.Dataset):
    """Dataset that tokenizes one entry of ``logs`` per item.

    Args:
        tokenizer: object exposing ``encode_plus(text, truncation=True)`` whose
            result supports ``.items()``.
        logs: indexable, sized collection of inputs handed to the tokenizer.
    """

    def __init__(self, tokenizer, logs):
        self.tokenizer = tokenizer
        self.logs = logs

    def __len__(self):
        """Return the number of entries in ``logs``."""
        return len(self.logs)

    def __getitem__(self, idx):
        """Tokenize ``logs[idx]`` and return every encoded field as a tensor."""
        encoded = self.tokenizer.encode_plus(self.logs[idx], truncation=True)
        tensors = {}
        for field, values in encoded.items():
            tensors[field] = torch.tensor(values)
        return tensors

# Wrap the selected example(s) in the tokenizing dataset.
test_dataset = A_Dataset(tokenizer, anomaly_example)

# One item per batch, no shuffling.
test_dataloader = DataLoader(dataset=test_dataset, batch_size=1, shuffle=False)

# INIT CRIT
criterion = nn.CrossEntropyLoss()

In [18]:
# Build the integrated-gradient explainer from the current model, loss and tokenizer.
integrated_grad = IntegratedGradient(
    model, criterion, tokenizer, show_progress=False, encoder="roberta"
)

# Run the interpretation over the dataloader; the result is indexed as
# instances[0] when it is rendered.
instances = integrated_grad.saliency_interpret(test_dataloader)

In [7]:
# Colorize the first interpreted instance, render it as HTML, then print the
# label stored for the selected example.
first_instance = instances[0]
coloder_string = integrated_grad.colorize(first_instance)
html_view = HTML(coloder_string)
display(html_view)
print(f"Real label is {anomaly_example_label[0]}")

Real label is 1


In [15]:
# Build the smooth-gradient explainer from the current model, loss and tokenizer.
smooth_grad = SmoothGradient(
    model, criterion, tokenizer, show_progress=False, encoder="roberta"
)

# Run the interpretation over the dataloader; the result is indexed as
# instances[0] when it is rendered.
instances = smooth_grad.saliency_interpret(test_dataloader)

In [9]:
# Colorize the first interpreted instance, render it as HTML, then print the
# label stored for the selected example.
first_instance = instances[0]
coloder_string = smooth_grad.colorize(first_instance)
html_view = HTML(coloder_string)
display(html_view)
print(f"Real label is {anomaly_example_label[0]}")

Real label is 1


In [4]:
# Build the input-reduction helper from the current model, loss and tokenizer,
# then run its attack over the dataloader.
input_reduc = InputReduction(model, criterion, tokenizer, show_progress=False)
result = input_reduc.attack(test_dataloader)

# Colorize the first 'final' entry against the 'original' entry and render it.
first_final = result['final'][0]
coloder_string = input_reduc.colorize(first_final, result['original'])
display(HTML(coloder_string))



Initial length: 510
