In [20]:
# Mount Google Drive at /content/drive so the files stored there can be read by later cells.
from google.colab import drive
drive.mount('/content/drive')


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [17]:
# Work from the project folder on the mounted Drive: later cells open the CSV
# and the model checkpoint through relative paths.
%cd /content/drive/MyDrive/syscall

/content/drive/.shortcut-targets-by-id/1e_s52LoUFBat8BLGvGC96XkKahitNbby/syscall


In [18]:
#!python Binary_BERT.py

In [24]:
import pandas as pd
import numpy as np
import re
import warnings
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import classification_report
import torchtext
import torchtext.data.utils as data_utils
import torchtext.vocab as vocab
from transformers import BertTokenizer, BertModel
from tqdm import tqdm

ImportError: cannot import name 'ArrowDtype' from 'pandas.core.dtypes.dtypes' (/usr/local/lib/python3.10/dist-packages/pandas/core/dtypes/dtypes.py)

In [23]:
# Load Data
df = pd.read_csv('ultraclean_final_systemcalls_label.csv').dropna()
# Seed the shuffle: without it the row order, and with it the rows that end up
# in test_df, changes on every run even though the split below is seeded.
df = df.sample(frac=1, random_state=41).reset_index(drop=True)

# Split Data
train_df, test_df = train_test_split(df, random_state=41, train_size=0.8, stratify=df['label'])
# Independent copies, so the label assignment below does not write into frames
# derived from df (avoids pandas' SettingWithCopyWarning).
train_df, test_df = train_df.copy(), test_df.copy()

# Preprocess Labels
# The encoder is fitted on the training labels only and then reused for the test labels.
label_encoder = LabelEncoder()
train_df['label'] = label_encoder.fit_transform(train_df['label'])
test_df['label'] = label_encoder.transform(test_df['label'])

NameError: name 'pd' is not defined

In [None]:
# Display the shuffled DataFrame as a quick visual check of the loaded data.
df

In [None]:
# Number of rows per label value, to see how balanced the classes are.
df['label'].value_counts()

In [None]:
# Dataset wrapper around the labelled system-call DataFrame
class SystemCallsDataset(Dataset):
    """Expose the rows of a DataFrame as (text, label) samples.

    The frame must provide a 'systemcalls' column (the text) and a 'label'
    column; rows are addressed by position, not by index label.
    """

    def __init__(self, dataframe):
        self.data = dataframe

    def __len__(self):
        """Number of rows in the wrapped frame."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return the (systemcalls, label) pair stored at position `idx`."""
        row = self.data.iloc[idx]
        return row['systemcalls'], row['label']

In [None]:
# Wrap both splits so a DataLoader can index them
train_dataset, test_dataset = (SystemCallsDataset(split) for split in (train_df, test_df))

# Tokenizer and encoder are loaded from the same pretrained checkpoint
PRETRAINED_NAME = 'bert-base-uncased'
tokenizer = BertTokenizer.from_pretrained(PRETRAINED_NAME)
bert_model = BertModel.from_pretrained(PRETRAINED_NAME)

In [None]:
# Classifier: BERT encoder followed by a single linear layer
class BERTClassifier(nn.Module):
    """Linear classification head on top of a BERT encoder's pooled output."""

    def __init__(self, bert_model, num_classes):
        """Store the encoder and size the head from `bert_model.config.hidden_size`."""
        super().__init__()
        self.bert = bert_model
        hidden_dim = self.bert.config.hidden_size
        self.fc = nn.Linear(hidden_dim, num_classes)

    def forward(self, input_ids, attention_mask):
        """Return the logits computed from the encoder's `pooler_output`."""
        encoded = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        return self.fc(encoded.pooler_output)

In [None]:
# Tokenization helper built on the notebook-level `tokenizer`
def tokenize_text(text):
    """Tokenize `text` with padding and truncation enabled, returning PyTorch tensors."""
    return tokenizer(text, padding=True, truncation=True, return_tensors="pt")

In [None]:
# Prepare DataLoader
def collate_fn(batch):
    """Collate (text, label) samples into padded model inputs.

    Args:
        batch: sequence of (text, label) pairs as yielded by the dataset.

    Returns:
        Tuple (input_ids, attention_masks, labels); the first two come straight
        from the tokenizer output, labels is built with torch.tensor.
    """
    texts, labels = zip(*batch)
    # One batched call replaces per-sample tokenization plus manual zero padding:
    # tokenize_text passes padding=True, so the tokenizer pads every sequence to
    # the longest one in the batch and builds the matching attention mask itself.
    encoded = tokenize_text(list(texts))
    return encoded["input_ids"], encoded["attention_mask"], torch.tensor(labels)

In [None]:
# Batches of 8 built by collate_fn; only the training loader is created with shuffle=True.
train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True, collate_fn=collate_fn)
test_loader = DataLoader(test_dataset, batch_size=8, collate_fn=collate_fn)

In [None]:
# Define Training Function
def train_model(model, train_loader, optimizer, criterion, num_epochs=5, device=None):
    """Train `model` and print the mean loss and accuracy after every epoch.

    Args:
        model: module called as model(input_ids, attention_masks) returning class logits.
        train_loader: iterable of (input_ids, attention_masks, labels) tensor batches.
        optimizer: optimizer stepped once per batch.
        criterion: loss called as criterion(logits, labels).
        num_epochs: number of passes over `train_loader`.
        device: device the batches are moved to. When omitted it is taken from
            the model's own parameters, so the function no longer depends on a
            notebook-level `device` variable defined in a later cell.
    """
    if device is None:
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device("cpu")

    model.train()
    for epoch in range(num_epochs):
        running_loss = 0.0
        correct_predictions = 0
        total_predictions = 0
        for input_ids, attention_masks, labels in tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs}"):
            optimizer.zero_grad()
            input_ids, attention_masks, labels = input_ids.to(device), attention_masks.to(device), labels.to(device)
            outputs = model(input_ids, attention_masks)
            # Index of the largest logit is the predicted class
            _, predicted = torch.max(outputs, 1)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
            total_predictions += labels.size(0)
            correct_predictions += (predicted == labels).sum().item()

        # max(..., 1) keeps an empty loader from raising ZeroDivisionError
        epoch_loss = running_loss / max(len(train_loader), 1)
        epoch_acc = correct_predictions / max(total_predictions, 1)
        print(f"Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss:.4f}, Acc: {epoch_acc:.4f}")


In [None]:
# Initialize the classifier and restore the trained weights
model = BERTClassifier(bert_model, num_classes=2)

# Use the GPU when one is present and fall back to the CPU otherwise. A
# hard-coded "cuda" device makes both torch.load(map_location=...) and the
# batch transfers in the train/eval loops fail on CPU-only machines.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.load_state_dict(torch.load('bert_model.pth', map_location=device))

# Keep the model on the same device the batches are sent to
model = model.to(device)


In [None]:
def evaluate_model(model, test_loader, device=None):
    """Evaluate `model` on `test_loader`, printing accuracy and a classification report.

    Args:
        model: module called as model(input_ids, attention_masks) returning class logits.
        test_loader: iterable of (input_ids, attention_masks, labels) tensor batches.
        device: device the batches are moved to. When omitted it is taken from
            the model's own parameters instead of a notebook-level `device`.

    Returns:
        Tuple (true_labels, predicted_labels, accuracy); the two label lists
        hold one entry per evaluated sample.
    """
    if device is None:
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device("cpu")

    model.eval()
    correct_predictions = 0
    total_predictions = 0
    predicted_labels = []
    true_labels = []
    with torch.no_grad():
        for input_ids, attention_masks, labels in tqdm(test_loader, desc="Evaluation"):
            input_ids, attention_masks, labels = input_ids.to(device), attention_masks.to(device), labels.to(device)
            outputs = model(input_ids, attention_masks)
            # Index of the largest logit is the predicted class
            _, predicted = torch.max(outputs, 1)
            total_predictions += labels.size(0)
            correct_predictions += (predicted == labels).sum().item()
            predicted_labels.extend(predicted.cpu().numpy())
            true_labels.extend(labels.cpu().numpy())

    # Nothing was evaluated: report that instead of dividing by zero
    if total_predictions == 0:
        print("Accuracy: n/a (test_loader produced no samples)")
        return true_labels, predicted_labels, 0.0

    accuracy = correct_predictions / total_predictions
    print(f"Accuracy: {accuracy:.4f}")

    # Print classification report
    print(classification_report(true_labels, predicted_labels))

    return true_labels, predicted_labels, accuracy

# Evaluate on the test loader; the returned labels feed the metric and confusion-matrix cell
true_labels, predicted_labels, accuracy = evaluate_model(model, test_loader)

In [None]:
from sklearn.metrics import precision_score, recall_score, f1_score
from sklearn.metrics import confusion_matrix, classification_report
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

# Raw confusion matrix (rows: true class, columns: predicted class)
cm = confusion_matrix(true_labels, predicted_labels)
print("Confusion Matrix:", cm, sep="\n")

# Per-class precision / recall / F1
print("Classification Report:", classification_report(true_labels, predicted_labels), sep="\n")


def _averaged_scores(average):
    """Return (f1, precision, recall) for the evaluated labels under the given averaging mode."""
    return (
        f1_score(true_labels, predicted_labels, average=average),
        precision_score(true_labels, predicted_labels, average=average),
        recall_score(true_labels, predicted_labels, average=average),
    )


# Support-weighted averages
weighted_f1, weighted_precision, weighted_recall = _averaged_scores('weighted')
print(f"Weighted F1 Score: {weighted_f1:.4f}")
print(f"Weighted Precision: {weighted_precision:.4f}")
print(f"Weighted Recall: {weighted_recall:.4f}")

# Unweighted (macro) averages
macro_f1, macro_precision, macro_recall = _averaged_scores('macro')
print(f"Macro F1 Score: {macro_f1:.4f}")
print(f"Macro Precision: {macro_precision:.4f}")
print(f"Macro Recall: {macro_recall:.4f}")

# Row-normalise so every true-class row sums to 1
cm_norm = cm.astype('float') / cm.sum(axis=1, keepdims=True)

# Heatmap of the normalised matrix on an explicit figure/axes pair
fig, ax = plt.subplots(figsize=(8, 6))
sns.heatmap(cm_norm, annot=True, cmap='inferno', fmt='.2f', annot_kws={"size": 16}, ax=ax)
ax.set_xlabel('Predicted label', fontsize=16)
ax.set_ylabel('True label', fontsize=16)

# Cell centres sit at 0.5 and 1.5 on both axes
tick_marks = np.arange(2)
tick_positions = np.arange(0.5, 2.5, 1)
class_names = ['Benign', 'Malware']

# Centre the class names on the cells of each axis
ax.set_xticks(tick_positions)
ax.set_xticklabels(class_names, fontsize=16, ha='center')
ax.set_yticks(tick_positions)
ax.set_yticklabels(class_names, fontsize=16, va='center')

fig.tight_layout()

# Save figure as EPS
fig.savefig('confusion_matrix.eps', format='eps')

plt.show()




# Explainability

In [None]:
pip install captum

In [None]:
import torch
import torch.nn as nn
from transformers import BertModel, BertTokenizer

# BERTClassifier, defined again for the explainability section
# (same architecture as the definition earlier in this notebook)
class BERTClassifier(nn.Module):
    """BERT encoder whose pooled output is mapped to class logits by one linear layer."""

    def __init__(self, bert_model, num_classes):
        """Keep the encoder and create a head sized by `bert_model.config.hidden_size`."""
        super().__init__()
        self.bert = bert_model
        hidden_dim = self.bert.config.hidden_size
        self.fc = nn.Linear(hidden_dim, num_classes)

    def forward(self, input_ids, attention_mask):
        """Run the encoder and return the logits of its `pooler_output`."""
        encoded = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        return self.fc(encoded.pooler_output)

# Tokenizer and encoder are both taken from the same pretrained checkpoint
checkpoint_name = 'bert-base-uncased'
tokenizer = BertTokenizer.from_pretrained(checkpoint_name)
bert_model = BertModel.from_pretrained(checkpoint_name)

# Prefer the GPU, fall back to the CPU when CUDA is unavailable
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

# Two-class head on top of the encoder, then restore the saved weights onto `device`
model = BERTClassifier(bert_model, num_classes=2)
trained_weights = torch.load('bert_model.pth', map_location=device)
model.load_state_dict(trained_weights)
model = model.to(device)


In [None]:
import torch
from torch.utils.data import DataLoader, Dataset
from transformers import BertTokenizer

# Initialize the BERT tokenizer ('bert-base-uncased'); MyDataset below receives it as an argument
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

# Dataset that tokenizes each text on access
class MyDataset(Dataset):
    """Pair texts with labels and encode one text per item lookup.

    Every item is a tuple (input_ids, attention_mask, label). The tokenizer is
    asked to pad and truncate to `max_length`, and both returned tensors are
    flattened to one dimension.
    """

    def __init__(self, texts, labels, max_length, tokenizer):
        self.texts, self.labels = texts, labels
        self.max_length, self.tokenizer = max_length, tokenizer

    def __len__(self):
        """Number of texts held by the dataset."""
        return len(self.texts)

    def __getitem__(self, idx):
        """Encode the text at `idx` and return it together with its label."""
        text, label = self.texts[idx], self.labels[idx]
        encoded = self.tokenizer.encode_plus(
            text,
            add_special_tokens=True,
            max_length=self.max_length,
            return_token_type_ids=False,
            padding='max_length',
            return_attention_mask=True,
            return_tensors='pt',
            truncation=True,
        )
        return encoded['input_ids'].flatten(), encoded['attention_mask'].flatten(), label

# Load Data
df = pd.read_csv('ultraclean_final_systemcalls_label.csv').dropna()
# Seed the shuffle: without it the row order, and with it the rows that end up
# in test_df, changes on every run even though the split below is seeded.
df = df.sample(frac=1, random_state=41).reset_index(drop=True)

# Split Data
train_df, test_df = train_test_split(df, random_state=41, train_size=0.8, stratify=df['label'])
# Independent copies, so the label assignment below does not write into frames
# derived from df (avoids pandas' SettingWithCopyWarning).
train_df, test_df = train_df.copy(), test_df.copy()

# Preprocess Labels
# The encoder is fitted on the training labels only and then reused for the test labels.
label_encoder = LabelEncoder()
train_df['label'] = label_encoder.fit_transform(train_df['label'])
test_df['label'] = label_encoder.transform(test_df['label'])

# Texts and encoded labels of the test split, as plain Python lists
texts = test_df['systemcalls'].tolist()
labels = test_df['label'].tolist()
# Length every sequence is padded/truncated to by MyDataset
max_length = 128

# Initialize the dataset
test_dataset = MyDataset(texts, labels, max_length, tokenizer)




In [None]:
# Collate already-tokenized samples into batch tensors
def collate_fn(batch):
    """Stack (input_ids, attention_mask, label) samples into three batch tensors."""
    id_rows, mask_rows, label_values = [], [], []
    for item in batch:
        id_rows.append(item[0])
        mask_rows.append(item[1])
        label_values.append(item[2])
    return torch.stack(id_rows), torch.stack(mask_rows), torch.tensor(label_values)

# Initialize the DataLoader: batches of 4, no shuffling, collated by collate_fn above
test_loader = DataLoader(test_dataset, batch_size=4, shuffle=False, collate_fn=collate_fn)


In [None]:
import torch
# Release cached GPU memory that PyTorch is no longer using before further models are loaded
torch.cuda.empty_cache()

In [None]:
pip install transformers-interpret

In [None]:
from transformers import BertForSequenceClassification

# Initialize a BertForSequenceClassification model with a two-class head,
# matching the num_classes=2 head of the BERTClassifier that produced the checkpoint
model_for_classification = BertForSequenceClassification.from_pretrained('bert-base-uncased', num_labels=2)

# Load your trained weights (on CPU; the model is moved to its device in a later cell)
state_dict = torch.load('bert_model.pth', map_location=torch.device('cpu'))

# BERTClassifier stores its linear head under 'fc.*', while the classification
# model names its head 'classifier.*'. Rename those entries instead of dropping
# them: dropping leaves the head at its random initial values, so the
# evaluation would not measure the trained classifier.
state_dict = {
    ('classifier.' + key[len('fc.'):] if key.startswith('fc.') else key): value
    for key, value in state_dict.items()
}

# strict=False is kept from the original; the returned key lists are printed so
# that any weight that was not restored is visible rather than silently skipped.
load_result = model_for_classification.load_state_dict(state_dict, strict=False)
if load_result.missing_keys:
    print(f"Warning: not found in checkpoint (left at initial values): {load_result.missing_keys}")
if load_result.unexpected_keys:
    print(f"Warning: checkpoint entries ignored: {load_result.unexpected_keys}")


In [None]:
from torch.nn.functional import softmax
from sklearn.metrics import classification_report
from tqdm import tqdm

def evaluate_model(model, test_loader, device):
    """Score a sequence-classification model on `test_loader`.

    The model is called with `input_ids` / `attention_mask` keywords and must
    return an object exposing `.logits`. Accuracy and a 4-digit classification
    report are printed.

    Returns:
        Tuple (true_labels, predicted_labels, accuracy) where both label
        collections are plain Python lists.
    """
    model.eval()
    predicted_labels, true_labels = [], []
    n_correct = 0
    n_seen = 0

    with torch.no_grad():
        for batch in tqdm(test_loader, desc="Evaluation"):
            input_ids, attention_masks, labels = (tensor.to(device) for tensor in batch)
            logits = model(input_ids=input_ids, attention_mask=attention_masks).logits
            # Predicted class = position of the highest softmax probability
            predicted = torch.max(softmax(logits, dim=1), 1)[1]
            n_seen += labels.size(0)
            n_correct += (predicted == labels).sum().item()
            predicted_labels.extend(predicted.cpu().tolist())
            true_labels.extend(labels.cpu().tolist())

    accuracy = n_correct / n_seen
    print(f"Accuracy: {accuracy:.4f}")

    # Per-class precision / recall / F1 with four decimals
    print(classification_report(true_labels, predicted_labels, digits=4))

    return true_labels, predicted_labels, accuracy

# Determine device: GPU when available, otherwise CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# The model must sit on the device that evaluate_model moves each batch to
model_for_classification.to(device)

# Evaluate the converted classifier on the test loader
true_labels, predicted_labels, accuracy = evaluate_model(model_for_classification, test_loader, device)


In [None]:
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from transformers_interpret import SequenceClassificationExplainer
from transformers import BertForSequenceClassification

In [None]:
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from transformers_interpret import SequenceClassificationExplainer

# Load tokenizer
tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")

# Load model: start from the base checkpoint with a two-class head ...
model = AutoModelForSequenceClassification.from_pretrained("bert-base-uncased", num_labels=2)

# ... then restore the fine-tuned weights, so the explainer attributes the
# predictions of the trained classifier instead of an untrained head.
# The saved head lives under 'fc.*' (BERTClassifier); this model calls it 'classifier.*'.
finetuned_weights = torch.load('bert_model.pth', map_location=torch.device('cpu'))
finetuned_weights = {
    ('classifier.' + key[len('fc.'):] if key.startswith('fc.') else key): value
    for key, value in finetuned_weights.items()
}
load_result = model.load_state_dict(finetuned_weights, strict=False)
if load_result.missing_keys:
    print(f"Warning: not found in checkpoint (left at initial values): {load_result.missing_keys}")
model.eval()

# Create explainer
cls_explainer = SequenceClassificationExplainer(model, tokenizer)


In [None]:
# Extract five random sentences based on a pre-selection
random_sentences = df[df.label.isin([0, 1])].sample(n=5, random_state=1234)['systemcalls'].reset_index(drop=True)

# Print each sentence in its full length
for sentence in random_sentences:
    print(sentence)


In [None]:
!pip install bertviz


In [None]:
from transformers import AutoTokenizer, AutoModel
from bertviz import model_view
import torch

# Cell 3: Load data to visualise
# NOTE: this is a seeded 20% random sample of the whole CSV, not the stratified
# hold-out split created earlier in the notebook, and it rebinds `df` / `test_df`.
df = pd.read_csv('ultraclean_final_systemcalls_label.csv').dropna()
test_df = df.sample(frac=0.2, random_state=42)  # Adjust fraction as needed

# Cell 4: Load tokenizer and model (base checkpoint, configured to return attention maps)
tokenizer = AutoTokenizer.from_pretrained('bert-base-uncased')
model = AutoModel.from_pretrained('bert-base-uncased', output_attentions=True)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)

# Cell 5: Select one sample; seeded so the same trace is visualised on every run
random_sample = test_df.sample(1, random_state=42)

# Cell 6: Tokenize the text and ensure it fits within the maximum sequence length
input_text = random_sample['systemcalls'].iloc[0]
inputs = tokenizer.encode_plus(input_text, return_tensors='pt', add_special_tokens=True, max_length=512, truncation=True)
input_ids = inputs['input_ids'].to(device)
attention_mask = inputs['attention_mask'].to(device)

# Cell 7: Get the model outputs
with torch.no_grad():
    outputs = model(input_ids=input_ids, attention_mask=attention_mask)
    # Read the attention maps by name instead of relying on their position in
    # the output (attribute access, like the `pooler_output` use elsewhere in this notebook)
    attention = outputs.attentions

# Cell 8: Convert input ids to tokens
tokens = tokenizer.convert_ids_to_tokens(input_ids[0])

# Cell 9: Visualize attention using BertViz
model_view(attention, tokens)
