In [1]:
!pip install transformers



In [2]:
import torch
from tqdm.notebook import tqdm

from transformers import BertTokenizer
from torch.utils.data import TensorDataset

from transformers import BertForSequenceClassification

import nltk
nltk.download('punkt')

import pickle


[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


In [4]:
# Location handed to from_pretrained for both the classifier and its tokenizer
model_dir = 'MODEL'

# Choose the compute device first: CUDA when it is available, otherwise the CPU
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

# Restore the fine-tuned tokenizer (vocabulary) and classification model
tokenizer = BertTokenizer.from_pretrained(model_dir)
model = BertForSequenceClassification.from_pretrained(model_dir)

# Move the model to the selected device. As the last expression of the cell,
# this also displays the module structure.
model.to(device)

BertForSequenceClassification(
  (bert): BertModel(
    (embeddings): BertEmbeddings(
      (word_embeddings): Embedding(30522, 768, padding_idx=0)
      (position_embeddings): Embedding(512, 768)
      (token_type_embeddings): Embedding(2, 768)
      (LayerNorm): LayerNorm((768,), eps=1e-12, elementwise_affine=True)
      (dropout): Dropout(p=0.1, inplace=False)
    )
    (encoder): BertEncoder(
      (layer): ModuleList(
        (0-11): 12 x BertLayer(
          (attention): BertAttention(
            (self): BertSelfAttention(
              (query): Linear(in_features=768, out_features=768, bias=True)
              (key): Linear(in_features=768, out_features=768, bias=True)
              (value): Linear(in_features=768, out_features=768, bias=True)
              (dropout): Dropout(p=0.1, inplace=False)
            )
            (output): BertSelfOutput(
              (dense): Linear(in_features=768, out_features=768, bias=True)
              (LayerNorm): LayerNorm((768,), eps=1e-12,

In [5]:
# Function to eliminate consecutive newline characters from a string
def eliminate_duplicate_newlines(input_string):
    """Collapse every run of consecutive newline characters into a single one.

    Args:
        input_string: Text to normalise. May be empty.

    Returns:
        str: ``input_string`` with each run of newline characters reduced to
        one newline. All other characters are kept unchanged and in order.
    """
    # An empty string has nothing to collapse. The previous implementation
    # indexed input_string[0] and raised IndexError for this input.
    if not input_string:
        return input_string

    # Gather the kept characters and join once at the end instead of growing
    # a string with repeated concatenation.
    kept_characters = []
    previous_character = None
    for character in input_string:
        # Skip a newline that directly follows another newline
        if character == '\n' and previous_character == '\n':
            continue
        kept_characters.append(character)
        previous_character = character
    return ''.join(kept_characters)

# Function to translate TTP (Tactics, Techniques, and Procedures) IDs to their names
def translate_ttp_ids_to_names(ttp_identifiers, mapping_path=None):
    """Look up the name stored for each TTP ID in a pickled dictionary.

    Args:
        ttp_identifiers: Iterable of TTP IDs to look up.
        mapping_path: Path of the pickled ID-to-name dictionary. When omitted,
            the module-level ``ttpid2name`` path is used, so that variable
            must be defined before the call.

    Returns:
        list: Names of the identifiers that are present in the dictionary, in
        input order. Identifiers missing from the dictionary are skipped, so
        the result can be shorter than ``ttp_identifiers``.
    """
    if mapping_path is None:
        mapping_path = ttpid2name

    # NOTE(security): pickle.load can execute arbitrary code while
    # deserialising; only point this at mapping files from a trusted source.
    with open(mapping_path, 'rb') as file:
        id_to_name_map = pickle.load(file)

    # Loop variable is ttp_id (not id) to avoid shadowing the builtin
    return [
        id_to_name_map[ttp_id]
        for ttp_id in ttp_identifiers
        if ttp_id in id_to_name_map
    ]

# Function to convert labeled data to TTP IDs
def convert_labels_to_ttp_ids(labelled_data):
    """Map predicted class labels to unique TTP IDs and their names.

    Args:
        labelled_data: Iterable of class labels. Each label must occur as a
            value in the pickled dictionary stored at ``label2ttpid``.

    Returns:
        tuple: ``(unique_ttp_ids, names_for_ttp_ids)``. ``unique_ttp_ids``
        lists each TTP ID once, in order of first appearance in
        ``labelled_data``. ``names_for_ttp_ids`` is whatever
        ``translate_ttp_ids_to_names`` returns for that list.

    Raises:
        KeyError: If a label has no entry in the inverted mapping.
    """
    # NOTE(security): pickle.load can execute arbitrary code while
    # deserialising; only load mapping files from a trusted source.
    with open(label2ttpid, 'rb') as file_handle:
        label_to_ttpid_map = pickle.load(file_handle)

    # Invert the pickled dictionary so it can be indexed by class label.
    # Presumably the file maps TTP ID -> label; verify against how it was built.
    ttpid_for_labels = {value: key for key, value in label_to_ttpid_map.items()}

    # dict.fromkeys de-duplicates while keeping first-seen order. Building a
    # set here would return the IDs in an arbitrary order, which for string
    # IDs can differ from one interpreter run to the next.
    unique_ttp_ids = list(
        dict.fromkeys(ttpid_for_labels[label] for label in labelled_data)
    )

    # Fetch names for the unique TTP IDs
    names_for_ttp_ids = translate_ttp_ids_to_names(unique_ttp_ids)

    return unique_ttp_ids, names_for_ttp_ids

# Function to process a text file and extract TTPs using a model and tokenizer
def process_text_for_ttps(file_path, prediction_model, text_tokenizer, logit_threshold=6.15):
    """Classify every sentence of a text file and collect the predicted TTPs.

    The file is read as UTF-8, runs of newlines are collapsed, tabs become
    spaces, and the text is split into sentences with NLTK and then on any
    remaining newlines. Each non-empty piece is classified on its own. A
    prediction is kept only when its highest logit exceeds ``logit_threshold``.

    Args:
        file_path: Path of the UTF-8 text file to analyse.
        prediction_model: Sequence-classification model whose output exposes
            ``.logits``.
        text_tokenizer: Tokenizer matching ``prediction_model``.
        logit_threshold: Minimum value the highest logit must exceed for a
            sentence's prediction to be kept. Defaults to 6.15, the value that
            was previously hard-coded.

    Returns:
        The ``(ttp_ids, ttp_names)`` tuple produced by
        ``convert_labels_to_ttp_ids`` for the kept class predictions.
    """
    # Read the text file
    with open(file_path, 'r', encoding='utf-8') as file:
        document_content = file.read()

    # Collapse repeated newlines. The check keeps an empty file from reaching
    # the helper, which may not accept an empty string.
    if document_content:
        document_content = eliminate_duplicate_newlines(document_content)
    # An earlier version also called .replace("\'", "'") here. Both literals
    # are the same one-character string, so that call did nothing and is
    # omitted.
    document_content = document_content.replace('\t', ' ')

    # Split into sentences with NLTK, then on remaining newlines, and keep
    # only the non-empty pieces.
    processed_sentences = [
        piece
        for sentence in nltk.sent_tokenize(document_content)
        for piece in sentence.split('\n')
        if piece
    ]

    # Run on whatever device the model lives on rather than on a global.
    model_device = next(prediction_model.parameters()).device
    # Longer inputs than the position-embedding table would fail inside the
    # model, so cap the token count; fall back to 512 if the config lacks it.
    model_config = getattr(prediction_model, 'config', None)
    max_tokens = getattr(model_config, 'max_position_embeddings', 512)

    technical_ids_predicted = []
    for sentence in processed_sentences:
        model_inputs = text_tokenizer(
            sentence,
            return_tensors="pt",
            truncation=True,
            max_length=max_tokens,
        ).to(model_device)

        # Inference only: no gradients needed
        with torch.no_grad():
            output_logits = prediction_model(**model_inputs).logits

        # One sentence per call, so the last dimension holds the class scores
        top_logit, top_class = output_logits.max(dim=-1)
        if top_logit.item() > logit_threshold:
            technical_ids_predicted.append(top_class.item())

    # Convert the predicted class labels to TTP identifiers and names
    return convert_labels_to_ttp_ids(technical_ids_predicted)

In [8]:
# Pickled lookup tables read by the helper functions defined earlier
label2ttpid = 'label_dict.pkl'
ttpid2name = 'ttp_id_name.pkl'

# Report to analyse
fname = "APT28_unit42_sofacy_uses_dealerschoice_target_european_government_agency.txt"

ttp_ids, ttp_names = process_text_for_ttps(fname, model, tokenizer)

print("Extracted TTPs are:")
# The name is looked up by position, so the two lists are expected to line up
for position, ttp_id in enumerate(ttp_ids):
    print(ttp_id, " - ", ttp_names[position])

Extracted TTPs are:
T1027  -  Obfuscated Files or Information
T1036  -  Masquerading
T1071  -  Web Protocols
T1090  -  Proxy
T1105  -  Ingress Tool Transfer
T1132  -  Data Encoding
T1204  -  User Execution
T1140  -  Deobfuscate/Decode Files or Information
T1059  -  Command and Scripting Interpreter
T1566  -  Phishing
T1053  -  Scheduled Task/Job


In [9]:
# Number of TTP IDs returned by process_text_for_ttps in the previous cell
len(ttp_ids)

11