# **Language Model and MITRE ATT&CK**


## **Instructions**

* Use "Fine-tuning a masked language model" as the template to create your own language model.
  * https://huggingface.co/learn/nlp-course/en/chapter7/3
* Select a built-in language model and fine-tune it on an additional corpus.
* We want the fine-tuned model to learn cybersecurity knowledge, so we use professional cybersecurity documents from the MITRE website.
  * https://attack.mitre.org/resources/attack-data-and-tools/
* On the MITRE data and tools page, find the two Excel files that contain the definitions of attack tactics and attack techniques.
  * enterprise-attack-v15.1-tactics.xlsx
  * enterprise-attack-v15.1-techniques.xlsx
* Parse the xlsx files, and extract 'name' and 'description' as your additional corpus.
* Try to fine-tune your model.
* Note that you do not have to push your model to Hugging Face; keep it in your Colab and use/test it directly.

## **Import Libraries**



In [1]:
!pip install datasets
!pip install transformers[torch]
!pip install openpyxl scikit-learn

Collecting datasets
  Downloading datasets-2.19.1-py3-none-any.whl.metadata (19 kB)
Collecting filelock (from datasets)
  Downloading filelock-3.14.0-py3-none-any.whl.metadata (2.8 kB)
Collecting numpy>=1.17 (from datasets)
  Downloading numpy-1.26.4-cp312-cp312-macosx_11_0_arm64.whl.metadata (61 kB)
Collecting pyarrow>=12.0.0 (from datasets)
  Downloading pyarrow-16.1.0-cp312-cp312-macosx_11_0_arm64.whl.metadata (3.0 kB)
Collecting pyarrow-hotfix (from datasets)
  Downloading pyarrow_hotfix-0.6-py3-none-any.whl.metadata (3.6 kB)
Collecting dill<0.3.9,>=0.3.0 (from datasets)
  Downloading dill-0.3.8-py3-none-any.whl.metadata (10 kB)
Collecting pandas (from datasets)
  Downloading pandas-2.2.2-cp312-cp312-macosx_11_0_arm64.whl.metadata (19 kB)
Collecting tqdm>=4.62.1 (from datasets)
  Downloading tqdm-4.66.4-py3-none-any.whl.metadata (57 kB)
[

In [9]:
import torch
import accelerate
from datasets import Dataset
from transformers import default_data_collator
from transformers import AutoModelForMaskedLM
from transformers import AutoTokenizer
from transformers import DataCollatorForLanguageModeling
from transformers import TrainingArguments
from transformers import Trainer
from transformers import AutoModelForSequenceClassification
from torch.utils.data import DataLoader
from transformers import get_scheduler
from transformers import pipeline
from torch.optim import AdamW
from accelerate import Accelerator
from tqdm.auto import tqdm
import math
import collections
import numpy as np
import pandas as pd
import wget


## **Corpus**

In [11]:
!wget https://attack.mitre.org/docs/enterprise-attack-v15.1/enterprise-attack-v15.1-tactics.xlsx
!wget https://attack.mitre.org/docs/enterprise-attack-v15.1/enterprise-attack-v15.1-techniques.xlsx

URLError: <urlopen error [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: unable to get local issuer certificate (_ssl.c:1000)>

In [5]:
tactics_df = pd.read_excel('enterprise-attack-v15.1-tactics.xlsx')
techniques_df = pd.read_excel('enterprise-attack-v15.1-techniques.xlsx')

NameError: name 'pd' is not defined

In [None]:
tactics_filtered = tactics_df[['name', 'description']]
techniques_filtered = techniques_df[['name', 'description']]
tactics_techniques_df = pd.concat([tactics_filtered, techniques_filtered], ignore_index=True)


### **Dataset Sample**

In [None]:
sample = tactics_techniques_df.sample(n=3, random_state = 42)
for index, row in sample.iterrows():
    print(f"\n'>>> Name: {row['name']}'")
    print(f"'>>> Description: {row['description']}'")


'>>> Name: Virtualization/Sandbox Evasion'
'>>> Description: Adversaries may employ various means to detect and avoid virtualization and analysis environments. This may include changing behaviors based on the results of checks for the presence of artifacts indicative of a virtual machine environment (VME) or sandbox. If the adversary detects a VME, they may alter their malware to disengage from the victim or conceal the core functions of the implant. They may also search for VME artifacts before dropping secondary or additional payloads. Adversaries may use the information learned from [Virtualization/Sandbox Evasion](https://attack.mitre.org/techniques/T1497) during automated discovery to shape follow-on behaviors.(Citation: Deloitte Environment Awareness)

Adversaries may use several methods to accomplish [Virtualization/Sandbox Evasion](https://attack.mitre.org/techniques/T1497) such as checking for security monitoring tools (e.g., Sysinternals, Wireshark, etc.) or other system art

## **Load DistilBERT with AutoModelForMaskedLM**

In [None]:
model_checkpoint = "distilbert-base-uncased"
model = AutoModelForMaskedLM.from_pretrained(model_checkpoint)

# Check how many parameters DistilBERT has and compare with BERT
distilbert_num_parameters = model.num_parameters() / 1_000_000
print(f"'>>> DistilBERT number of parameters: {round(distilbert_num_parameters)}M'")
print(f"'>>> BERT number of parameters: 110M'")

'>>> DistilBERT number of parameters: 67M'
'>>> BERT number of parameters: 110M'


In [None]:
# Load the tokenizer, which converts text into numeric token ids
tokenizer = AutoTokenizer.from_pretrained(model_checkpoint)

### **Example: "This is a great [MASK]." with [MASK] replaced by the top 5 candidates**



In [None]:
text = "This is a great [MASK]."

inputs = tokenizer(text, return_tensors="pt")  # return PyTorch tensors
# Forward pass to get the logits for every token
token_logits = model(**inputs).logits
# Find the location of [MASK] and extract its logits
mask_token_index = torch.where(inputs["input_ids"] == tokenizer.mask_token_id)[1]
mask_token_logits = token_logits[0, mask_token_index, :]
# Pick the [MASK] candidates with the highest logits
top_5_tokens = torch.topk(mask_token_logits, 5, dim=1).indices[0].tolist()

# Replace [MASK] with each of the top 5 candidates
for token in top_5_tokens:
    print(f"'>>> {text.replace(tokenizer.mask_token, tokenizer.decode([token]))}'")

'>>> This is a great deal.'
'>>> This is a great success.'
'>>> This is a great adventure.'
'>>> This is a great idea.'
'>>> This is a great feat.'


### **Define tokenize_function**

In [None]:
def tokenize_function(examples):
    result = tokenizer(examples["description"])
    if tokenizer.is_fast:  # check whether this is a fast (Rust-backed) tokenizer
        print(tokenizer.is_fast)
        # Add word_ids so tokens can be mapped back to words
        result["word_ids"] = [result.word_ids(i) for i in range(len(result["input_ids"]))]
    return result

In [None]:
dataset = Dataset.from_pandas(tactics_techniques_df)
tokenized_datasets = dataset.map(
    tokenize_function, batched=True, remove_columns=["name", "description"]
)
print(tokenized_datasets)

Map:   0%|          | 0/651 [00:00<?, ? examples/s]

Token indices sequence length is longer than the specified maximum sequence length for this model (592 > 512). Running this sequence through the model will result in indexing errors


True
Dataset({
    features: ['input_ids', 'attention_mask', 'word_ids'],
    num_rows: 651
})


In [None]:
# Maximum number of tokens the tokenizer and model accept; longer inputs must be truncated (or chunked) to avoid indexing errors or degraded performance
tokenizer.model_max_length

512

In [None]:
chunk_size = 128

### **Define group_texts**

In [None]:
def group_texts(examples):
    # Concatenate all texts
    concatenated_examples = {k: sum(examples[k], []) for k in examples.keys()}
    # Compute length of concatenated texts
    total_length = len(concatenated_examples[list(examples.keys())[0]])
    # We drop the last chunk if it's smaller than chunk_size
    total_length = (total_length // chunk_size) * chunk_size
    # Split by chunks of max_len
    result = {
        k: [t[i : i + chunk_size] for i in range(0, total_length, chunk_size)]
        for k, t in concatenated_examples.items()
    }
    # Add a labels column to use as training targets
    result["labels"] = result["input_ids"].copy()
    return result

In [None]:
# Split the tokenized dataset into fixed-size chunks
tt_datasets = tokenized_datasets.map(group_texts, batched=True)
tt_datasets

Map:   0%|          | 0/651 [00:00<?, ? examples/s]

Dataset({
    features: ['input_ids', 'attention_mask', 'word_ids', 'labels'],
    num_rows: 1604
})
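
To make the chunking step concrete, here is a toy, pure-Python run of the same concatenate-then-split idea on made-up token ids. The function name `group_texts_toy`, the sample ids, and the chunk size of 4 are illustrative assumptions, not values from the notebook.

```python
def group_texts_toy(examples, chunk_size):
    """Concatenate every column, drop the short tail, and split into fixed-size chunks."""
    # Flatten each column (a list of lists) into one long list
    concatenated = {k: [tok for seq in seqs for tok in seq] for k, seqs in examples.items()}
    total_length = len(next(iter(concatenated.values())))
    # Drop the remainder so every chunk has exactly chunk_size tokens
    total_length = (total_length // chunk_size) * chunk_size
    result = {
        k: [t[i : i + chunk_size] for i in range(0, total_length, chunk_size)]
        for k, t in concatenated.items()
    }
    # For masked language modeling the labels start as a copy of the inputs
    result["labels"] = [chunk.copy() for chunk in result["input_ids"]]
    return result


# Made-up ids: 101 and 102 stand in for [CLS] and [SEP]
toy_batch = {
    "input_ids": [[101, 7, 8, 9, 102], [101, 5, 6, 102], [101, 4, 102]],
    "attention_mask": [[1, 1, 1, 1, 1], [1, 1, 1, 1], [1, 1, 1]],
}
toy_chunks = group_texts_toy(toy_batch, chunk_size=4)
for chunk in toy_chunks["input_ids"]:
    print(chunk)
```

Because examples are concatenated before splitting, a chunk can start in the middle of one description and end inside the next, which is why the decoded chunks contain `[SEP] [CLS]` in the middle.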

In [None]:
# Decode the second row of the dataset to check the chunking result
tokenizer.decode(tt_datasets[1]["input_ids"])

'their control within a victim network. adversaries commonly attempt to mimic normal, expected traffic to avoid detection. there are many ways an adversary can establish command and control with various levels of stealth depending on the victim ’ s network structure and defenses. [SEP] [CLS] the adversary is trying to steal account names and passwords. credential access consists of techniques for stealing credentials like account names and passwords. techniques used to get credentials include keylogging or credential dumping. using legitimate credentials can give adversaries access to systems, make them harder to detect, and provide the opportunity to create more accounts to help achieve their goals. [SEP]'

### **Random MASK**

In [None]:
data_collator = DataCollatorForLanguageModeling(tokenizer=tokenizer, mlm_probability=0.15)  # 15% of tokens are selected for masking

samples = [tt_datasets[i] for i in range(2)]
for sample in samples:
    _ = sample.pop("word_ids")

for chunk in data_collator(samples)["input_ids"]:
    print(f"\n'>>> {tokenizer.decode(chunk)}'")

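# [CLS]: classification token placed at the start of the input sequence
# [SEP]: separator token that marks the boundary between two sentences or the end of a sequence
# [MASK]: a token that was randomly masked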


'>>> [CLS] the adversary is trying [MASK] gather data of interest to their goal. collection consists of techniques struckversaries may use to gather information and the sources information is collected from that are relevant to following through on the adversary's [MASK]. frequently, [MASK] next goal after [MASK] data [MASK] [MASK] steal [MASK] ex [MASK] [MASK] [MASK] ) the data theodore common target sources include various drive types, [MASK]s, audio, video, and email. common collection methods include capturing screenshots and keyboard input. [SEP] [CLS] [MASK] adversary dinner trying [MASK] communicate with compromised systems [MASK] control them. command and control consists [MASK] techniques that adversaries may use to communicate with systems 313'

'>>> their control within a victim network [MASK] adversaries [MASK] attempt [MASK] mimic normal, expected traffic to avoid detection [MASK] there are [MASK] ways [MASK] [MASK] can establish command and assure [MASK] various levels [
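
The stray words in the output above (for example `dinner` or `theodore`) are expected: by default the collator does not turn every selected token into `[MASK]`. Roughly 80% of selected tokens become `[MASK]`, 10% are swapped for a random vocabulary token, and 10% are left unchanged. The sketch below imitates that rule in plain Python; `MASK_ID`, `VOCAB_SIZE`, and the token ids are assumed toy values, and unlike the real collator it does not protect special tokens.

```python
import random

MASK_ID = 103        # assumed id for [MASK] (illustrative)
VOCAB_SIZE = 1000    # assumed toy vocabulary size
IGNORE_INDEX = -100  # label value that the loss function skips


def mask_tokens_toy(input_ids, mlm_probability=0.15, rng=None):
    """Return (masked_inputs, labels) following the 80/10/10 replacement rule."""
    rng = rng or random.Random()
    masked, labels = list(input_ids), []
    for i, token in enumerate(input_ids):
        if rng.random() >= mlm_probability:
            labels.append(IGNORE_INDEX)  # not selected: no loss at this position
            continue
        labels.append(token)  # selected: the model must recover the original token
        roll = rng.random()
        if roll < 0.8:
            masked[i] = MASK_ID  # 80%: replace with [MASK]
        elif roll < 0.9:
            masked[i] = rng.randrange(VOCAB_SIZE)  # 10%: replace with a random token
        # remaining 10%: keep the original token unchanged
    return masked, labels


tokens = list(range(200, 300))
masked, labels = mask_tokens_toy(tokens, rng=random.Random(0))
selected = sum(label != IGNORE_INDEX for label in labels)
print(f"selected {selected} of {len(tokens)} tokens for prediction")
```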

In [None]:
wwm_probability = 0.2

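# In short, the code below builds a mapping from words to token indices for each input sample,
# randomly selects words to mask,
# replaces every token of a selected word with [MASK],
# and returns the processed samples, ready for training.

def whole_word_masking_data_collator(features):
    for feature in features:
        word_ids = feature.pop("word_ids")  # tells which word each token belongs to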

        # Create a map between words and corresponding token indices
        mapping = collections.defaultdict(list)
        current_word_index = -1
        current_word = None
        for idx, word_id in enumerate(word_ids):
            if word_id is not None:
                if word_id != current_word:
                    current_word = word_id
                    current_word_index += 1
                mapping[current_word_index].append(idx)

        # Randomly mask words
        mask = np.random.binomial(1, wwm_probability, (len(mapping),))  # binomial draw decides which words get masked
        input_ids = feature["input_ids"]
        labels = feature["labels"]
        new_labels = [-100] * len(labels)  # new label list, initialized to -100 (ignored by the loss)
        for word_id in np.where(mask)[0]:
            word_id = word_id.item()
            for idx in mapping[word_id]:
                new_labels[idx] = labels[idx]
                input_ids[idx] = tokenizer.mask_token_id  # replace the token with [MASK]
        feature["labels"] = new_labels

    return default_data_collator(features)

In [None]:
samples = [tt_datasets[i] for i in range(2)]
batch = whole_word_masking_data_collator(samples)

for chunk in batch["input_ids"]:
    print(f"\n'>>> {tokenizer.decode(chunk)}'")


'>>> [CLS] [MASK] [MASK] is trying to gather data of interest to their goal. collection consists of techniques adversaries [MASK] use to gather information and the [MASK] [MASK] is collected from that are relevant to [MASK] [MASK] on the adversary's objectives. frequently, the next goal after [MASK] [MASK] is [MASK] steal ( exfiltrate ) the data [MASK] common target sources include various drive [MASK], browsers, audio, video [MASK] and [MASK]. common collection methods include [MASK] [MASK] [MASK] [MASK] [MASK] keyboard input. [SEP] [CLS] [MASK] adversary is [MASK] to communicate with compromised systems to control them. command and [MASK] consists of [MASK] that [MASK] [MASK] [MASK] [MASK] use to communicate [MASK] [MASK] under'

'>>> their control [MASK] a victim network. adversaries [MASK] [MASK] to mimic normal, expected traffic to avoid [MASK]. there are [MASK] ways an [MASK] [MASK] establish command [MASK] control with various [MASK] of stealth depending on the [MASK] [MASK] s 
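
The word-to-token mapping is the core of whole word masking, so it may help to see it in isolation. The sketch below reproduces only the mapping loop on a hypothetical `word_ids` list; the helper name `words_to_token_indices` and the example tokenization are assumptions for illustration.

```python
import collections


def words_to_token_indices(word_ids):
    """Map each word (numbered in order of appearance) to the token positions it covers."""
    mapping = collections.defaultdict(list)
    current_word_index = -1
    current_word = None
    for idx, word_id in enumerate(word_ids):
        if word_id is None:  # special tokens such as [CLS] and [SEP] belong to no word
            continue
        if word_id != current_word:
            current_word = word_id
            current_word_index += 1
        mapping[current_word_index].append(idx)
    return dict(mapping)


# Hypothetical tokenization: [CLS] ex ##fi ##lt ##rate data [SEP]
toy_word_ids = [None, 0, 0, 0, 0, 1, None]
mapping = words_to_token_indices(toy_word_ids)
print(mapping)
```

Masking word 0 here would replace all four sub-word positions at once, which is why the whole-word output above shows runs of consecutive `[MASK]` tokens.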

## **Train**

In [None]:
train_size = 600
test_size = int(0.1 * train_size)
downsampled_dataset = tt_datasets.train_test_split(
    train_size=train_size, test_size=test_size, seed=42
)
downsampled_dataset

DatasetDict({
    train: Dataset({
        features: ['input_ids', 'attention_mask', 'word_ids', 'labels'],
        num_rows: 600
    })
    test: Dataset({
        features: ['input_ids', 'attention_mask', 'word_ids', 'labels'],
        num_rows: 60
    })
})

In [None]:
batch_size = 64
# Show the training loss with every epoch
logging_steps = len(downsampled_dataset["train"]) // batch_size
model_name = model_checkpoint.split("/")[-1]

training_args = TrainingArguments(
    output_dir=f"{model_name}-finetuned-attact-dataset",
    overwrite_output_dir=True,
    eval_strategy="epoch",  # Updated parameter name
    learning_rate=2e-5,
    weight_decay=0.01,
    per_device_train_batch_size=batch_size,
    per_device_eval_batch_size=batch_size,
    push_to_hub=False,
    fp16=torch.cuda.is_available(),  # mixed precision requires a CUDA GPU
    logging_steps=logging_steps,
)

In [None]:
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=downsampled_dataset["train"],
    eval_dataset=downsampled_dataset["test"],
    data_collator=data_collator,
    tokenizer=tokenizer,
)

## **Perplexity**

In [None]:
eval_results = trainer.evaluate()
print(f">>> Before Training >> Perplexity: {math.exp(eval_results['eval_loss']):.2f}")
trainer.train()
trainer.save_model("./finetuned_with_ATT&CT_dataset_model")
tokenizer.save_pretrained("./finetuned_with_ATT&CT_dataset_model")
eval_results = trainer.evaluate()
print(f">>> After Training >> Perplexity: {math.exp(eval_results['eval_loss']):.2f}")

>>> Before Training >> Perplexity: 26.90


Epoch,Training Loss,Validation Loss
1,3.0131,2.672543
2,2.7029,2.447377
3,2.5558,2.508105


>>> After Training >> Perplexity: 10.63

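Perplexity here is simply the exponential of the mean cross-entropy loss. As a rough check, the sketch below converts the validation losses from the table above; the helper name `perplexity_from_loss` is an assumption. The results need not equal the printed 10.63 (which corresponds to a loss of about 2.36), most likely because the collator draws a fresh random mask on every evaluation pass.

```python
import math


def perplexity_from_loss(mean_cross_entropy):
    """Perplexity is the exponential of the mean cross-entropy loss (natural log)."""
    try:
        return math.exp(mean_cross_entropy)
    except OverflowError:
        return float("inf")


# Validation losses copied from the training table above
for epoch, val_loss in [(1, 2.672543), (2, 2.447377), (3, 2.508105)]:
    print(f"epoch {epoch}: loss {val_loss:.4f} -> perplexity {perplexity_from_loss(val_loss):.2f}")
```
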

In [None]:
model_before = AutoModelForMaskedLM.from_pretrained('distilbert-base-uncased')
model_after = AutoModelForMaskedLM.from_pretrained('./finetuned_with_ATT&CT_dataset_model')
tokenizer_before = AutoTokenizer.from_pretrained('distilbert-base-uncased')
tokenizer_after = AutoTokenizer.from_pretrained('./finetuned_with_ATT&CT_dataset_model')
fill_mask_before = pipeline("fill-mask", model=model_before, tokenizer=tokenizer_before)
fill_mask_after = pipeline("fill-mask", model=model_after, tokenizer=tokenizer_after)


In [None]:
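def calculate_perplexity(model, tokenizer, input_ids):
    # Pseudo-perplexity of one chunk: exp of the loss with the chunk used as its own labels.
    # The tokenizer argument is unused and kept only so existing calls still work.
    inputs = {'input_ids': input_ids.unsqueeze(0), 'attention_mask': torch.ones_like(input_ids.unsqueeze(0))}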
    with torch.no_grad():
        outputs = model(**inputs, labels=inputs['input_ids'])
        loss = outputs.loss
        perplexity = torch.exp(loss).item()
    return perplexity

In [None]:
# Set chunk_size and wwm_probability
chunk_size = 128
wwm_probability = 0.15

# Load the test data
test_df = pd.read_excel("/content/mitre_test.xlsx")
test_dataset = Dataset.from_pandas(test_df)
print(test_dataset.column_names)

tokenized_test_datasets = test_dataset.map(
    tokenize_function, batched=True, remove_columns=["name", "description"]
)
print(tokenized_test_datasets.column_names)

chunked_test_datasets = tokenized_test_datasets.map(group_texts, batched=True)



# Convert to a list here to make debugging easier
chunked_test_datasets_list = [chunked_test_datasets[i] for i in range(len(chunked_test_datasets))]
masked_data = whole_word_masking_data_collator(chunked_test_datasets_list)


# Evaluate both models: fill-mask predictions and perplexity
for chunk in masked_data["input_ids"]:
    masked_sentence = tokenizer.decode(chunk)
    print(f"Masked Sentence: {masked_sentence}")

    # Only show fill-mask results when the sentence contains [MASK]
    if tokenizer.mask_token in masked_sentence:
        inputs = tokenizer(masked_sentence, return_tensors="pt")

        # Forward pass to get the logits for every token (before training)
        token_logits_before = model_before(**inputs).logits
        mask_token_index_before = torch.where(inputs["input_ids"] == tokenizer.mask_token_id)[1]
        mask_token_logits_before = token_logits_before[0, mask_token_index_before, :]
        # Note: indices[0] keeps only the top token of the first [MASK], and
        # str.replace then writes that single token into every [MASK] position
        top_token_before = torch.topk(mask_token_logits_before, 1, dim=1).indices[0].item()
        filled_sentence_before = masked_sentence.replace(tokenizer.mask_token, tokenizer.decode([top_token_before]))
        print(f"Before Training >>> {filled_sentence_before}")

        # Forward pass to get the logits for every token (after training)
        token_logits_after = model_after(**inputs).logits
        mask_token_index_after = torch.where(inputs["input_ids"] == tokenizer.mask_token_id)[1]
        mask_token_logits_after = token_logits_after[0, mask_token_index_after, :]
        # Same caveat as above: one token is reused for every [MASK]
        top_token_after = torch.topk(mask_token_logits_after, 1, dim=1).indices[0].item()
        filled_sentence_after = masked_sentence.replace(tokenizer.mask_token, tokenizer.decode([top_token_after]))
        print(f"After Training >>> {filled_sentence_after}")
    else:
        print("No [MASK] token found in the sentence, skipping fill-mask results.")

    perplexity_before = calculate_perplexity(model_before, tokenizer, chunk)
    perplexity_after = calculate_perplexity(model_after, tokenizer, chunk)
    print(f"Before Training Perplexity: {perplexity_before:.2f}")
    print(f"After Training Perplexity: {perplexity_after:.2f}")


['name', 'description']


Map:   0%|          | 0/120 [00:00<?, ? examples/s]

True
['input_ids', 'attention_mask', 'word_ids']


Map:   0%|          | 0/120 [00:00<?, ? examples/s]

Masked Sentence: [CLS] monitor [MASK] commands and arguments that may [MASK] [MASK] [MASK] [MASK] mechanisms designed to [MASK] elevate privileges to gain higher - level permissions. [SEP] [CLS] monitor the file system for files that have the setuid or setgid [MASK] [MASK]. on linux, auditd can alert every [MASK] a user's actual id [MASK] effective [MASK] [MASK] different [MASK] this is what happens when you sudo ). [SEP] [CLS] on linux, auditd [MASK] alert every time a user's actual id and [MASK] id are different ( this is what [MASK] when you sudo ). this technique is abusing normal functionality in macos and linux [MASK], but sud
Before Training >>> [CLS] monitor contains commands and arguments that may contains contains contains contains mechanisms designed to contains elevate privileges to gain higher - level permissions. [SEP] [CLS] monitor the file system for files that have the setuid or setgid contains contains. on linux, auditd can alert every contains a user's actual id cont

## **Fine-tuning DistilBERT with Accelerate**

In [None]:
# Insert random masks
def insert_random_mask(batch):
    features = [dict(zip(batch, t)) for t in zip(*batch.values())]
    masked_inputs = data_collator(features)
    # Create a new "masked" column for each column in the dataset
    return {"masked_" + k: v.numpy() for k, v in masked_inputs.items()}

In [None]:
downsampled_dataset = downsampled_dataset.remove_columns(["word_ids"])
eval_dataset = downsampled_dataset["test"].map(
    insert_random_mask,
    batched=True,
    remove_columns=downsampled_dataset["test"].column_names,
)
eval_dataset = eval_dataset.rename_columns(
    {
        "masked_input_ids": "input_ids",
        "masked_attention_mask": "attention_mask",
        "masked_labels": "labels",
    }
)

Map:   0%|          | 0/60 [00:00<?, ? examples/s]

In [None]:
batch_size = 64
train_dataloader = DataLoader(
    downsampled_dataset["train"],
    shuffle=True,
    batch_size=batch_size,
    collate_fn=data_collator,
)
eval_dataloader = DataLoader(
    eval_dataset, batch_size=batch_size, collate_fn=default_data_collator
)

In [None]:
optimizer = AdamW(model.parameters(), lr=5e-5)
accelerator = Accelerator()
model, optimizer, train_dataloader, eval_dataloader = accelerator.prepare(
    model, optimizer, train_dataloader, eval_dataloader
)
num_train_epochs = 3
num_update_steps_per_epoch = len(train_dataloader)
num_training_steps = num_train_epochs * num_update_steps_per_epoch

lr_scheduler = get_scheduler(
    "linear",
    optimizer=optimizer,
    num_warmup_steps=0,
    num_training_steps=num_training_steps,
)
progress_bar = tqdm(range(num_training_steps))


  0%|          | 0/30 [00:00<?, ?it/s]
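
The progress bar total of 30 follows from the dataset and batch sizes, and the `"linear"` schedule decays the learning rate from its base value to zero over those steps. The sketch below reproduces both calculations in plain Python, assuming a single process (so the dataloader length is not divided across devices); `linear_lr` is a hypothetical helper written for illustration, not the library function.

```python
import math


def linear_lr(step, base_lr, num_training_steps, num_warmup_steps=0):
    """Learning rate at a given step under linear warmup followed by linear decay to zero."""
    if step < num_warmup_steps:
        return base_lr * step / max(1, num_warmup_steps)
    remaining = num_training_steps - step
    return base_lr * max(0.0, remaining / max(1, num_training_steps - num_warmup_steps))


train_examples, batch_size, num_train_epochs = 600, 64, 3
steps_per_epoch = math.ceil(train_examples / batch_size)  # the last batch is partial
num_training_steps = num_train_epochs * steps_per_epoch
print(num_training_steps)  # 30, matching the progress bar total

for step in (0, 15, 30):
    print(step, linear_lr(step, base_lr=5e-5, num_training_steps=num_training_steps))
```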

In [None]:
for epoch in range(num_train_epochs):
    # Training
    model.train()
    for batch in train_dataloader:
        outputs = model(**batch)
        loss = outputs.loss
        accelerator.backward(loss)

        optimizer.step()
        lr_scheduler.step()
        optimizer.zero_grad()
        progress_bar.update(1)

    # Evaluation
    model.eval()
    losses = []
    for step, batch in enumerate(eval_dataloader):
        with torch.no_grad():
            outputs = model(**batch)

        loss = outputs.loss
        losses.append(accelerator.gather(loss.repeat(batch_size)))

    losses = torch.cat(losses)
    losses = losses[: len(eval_dataset)]
    try:
        perplexity = math.exp(torch.mean(losses))
    except OverflowError:
        perplexity = float("inf")

    print(f">>> Epoch {epoch}: Perplexity: {perplexity}")

    # Save and upload
    accelerator.wait_for_everyone()
    unwrapped_model = accelerator.unwrap_model(model)
    output_dir = "./finetuned_with_accelerator_model"

    # Save the model and tokenizer
    unwrapped_model.save_pretrained(output_dir, save_function=accelerator.save)
    tokenizer.save_pretrained(output_dir)


>>> Epoch 0: Perplexity: 9.170910368352175
>>> Epoch 1: Perplexity: 8.681135714726171
>>> Epoch 2: Perplexity: 8.39571776934687

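The evaluation loop repeats each batch loss `batch_size` times and then trims the result to the dataset length, so that every example carries equal weight in the mean. The plain-Python sketch below mirrors that bookkeeping; `mean_eval_loss` and the loss values are made up for illustration.

```python
def mean_eval_loss(batch_losses, batch_size, num_eval_examples):
    """Repeat each batch loss batch_size times, trim the surplus repeats, then average."""
    repeated = []
    for loss in batch_losses:
        repeated.extend([loss] * batch_size)
    repeated = repeated[:num_eval_examples]  # drop repeats beyond the real dataset size
    return sum(repeated) / len(repeated)


# 60 evaluation chunks with batch_size 64 give a single batch, so the mean is that batch's loss
print(mean_eval_loss([2.2], batch_size=64, num_eval_examples=60))

# With several batches, the smaller final batch counts for fewer examples (made-up losses)
print(mean_eval_loss([2.0, 3.0], batch_size=4, num_eval_examples=6))
```
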

In [None]:
mask_filler = pipeline("fill-mask", model=output_dir, tokenizer=output_dir)

# Use the pipeline
result = mask_filler("This is a [MASK] example.")
print(result)

[{'score': 0.14254061877727509, 'token': 2691, 'token_str': 'common', 'sequence': 'this is a common example.'}, {'score': 0.11698265373706818, 'token': 5171, 'token_str': 'typical', 'sequence': 'this is a typical example.'}, {'score': 0.102818064391613, 'token': 3722, 'token_str': 'simple', 'sequence': 'this is a simple example.'}, {'score': 0.0933183953166008, 'token': 2204, 'token_str': 'good', 'sequence': 'this is a good example.'}, {'score': 0.06118622049689293, 'token': 4438, 'token_str': 'classic', 'sequence': 'this is a classic example.'}]


In [None]:
preds = mask_filler(text)

for pred in preds:
    print(f">>> {pred['sequence']}")

>>> this is a great deal.
>>> this is a great adventure.
>>> this is a great idea.
>>> this is a great success.
>>> this is a great mistake.


## **Downstream Task Test**

* Now you should have two models: the original one downloaded from Hugging Face and the fine-tuned one.

* Let's try a downstream task to see whether the classification rate changes after the fine-tuned model has learned some additional cybersecurity knowledge.

* In the 'Fine-tuning a masked language model' example, the 'Using our fine-tuned model' section tests the new model with a "fill-mask" pipeline.

* In "Transformers, what can they do?" (https://huggingface.co/learn/nlp-course/en/chapter1/3), there are several pipelines. Let's try 'Zero-shot classification'.

* Please prepare several sentences (> 100) from the website (not from the downloaded xlsx files) as your testing examples.

* Feed these sentences into the original model and your fine-tuned model, and ask them which 'tactics' and 'techniques' each sentence belongs to.

* Show whether the classification rate for 'tactics' and 'techniques' increases (or not) when the fine-tuned model is used.

* Show some examples where the 'tactics' or 'techniques' label actually changes when the new model is used.

In [None]:
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from transformers import AutoTokenizer, AutoModelForSequenceClassification, Trainer, TrainingArguments

label_encoder = LabelEncoder()
tactics_techniques_df["label"] = label_encoder.fit_transform(tactics_techniques_df["name"])

train_df, eval_df = train_test_split(tactics_techniques_df, test_size=0.2, random_state=42)
train_dataset = Dataset.from_pandas(train_df)
eval_dataset = Dataset.from_pandas(eval_df)

In [None]:
def tokenize_function(examples):
    return tokenizer(examples["description"], padding="max_length", truncation=True)


In [None]:
print(train_dataset.column_names)
print(eval_dataset.column_names)

['name', 'description', 'label', '__index_level_0__']
['name', 'description', 'label', '__index_level_0__']


In [None]:

train_dataset = train_dataset.map(tokenize_function, batched=True)
eval_dataset = eval_dataset.map(tokenize_function, batched=True)

print(train_dataset.column_names)
print(eval_dataset.column_names)

Map:   0%|          | 0/520 [00:00<?, ? examples/s]

Map:   0%|          | 0/131 [00:00<?, ? examples/s]

['name', 'description', 'label', '__index_level_0__', 'input_ids', 'attention_mask']
['name', 'description', 'label', '__index_level_0__', 'input_ids', 'attention_mask']


In [None]:


train_dataset = train_dataset.remove_columns([col for col in train_dataset.column_names if col not in ["input_ids", "attention_mask", "label"]])
eval_dataset = eval_dataset.remove_columns([col for col in eval_dataset.column_names if col not in ["input_ids", "attention_mask", "label"]])


train_dataset.set_format("torch")
eval_dataset.set_format("torch")


model = AutoModelForSequenceClassification.from_pretrained(model_checkpoint, num_labels=len(tactics_techniques_df["name"].unique()))

training_args = TrainingArguments(
    output_dir="./results",
    eval_strategy="epoch",  # same parameter name as the earlier TrainingArguments cell
    learning_rate=2e-5,
    per_device_train_batch_size=16,
    per_device_eval_batch_size=16,
    num_train_epochs=3,
    weight_decay=0.01,
)

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=eval_dataset,
)

trainer.train()

# Save the model
zero_shot_dir = "./finetuned_zero_shot_model"
trainer.save_model(zero_shot_dir)
tokenizer.save_pretrained(zero_shot_dir)

Some weights of DistilBertForSequenceClassification were not initialized from the model checkpoint at distilbert-base-uncased and are newly initialized: ['classifier.bias', 'classifier.weight', 'pre_classifier.bias', 'pre_classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


Epoch,Training Loss,Validation Loss
1,No log,6.496511
2,No log,6.518279
3,No log,6.527493


('./finetuned_zero_shot_model/tokenizer_config.json',
 './finetuned_zero_shot_model/special_tokens_map.json',
 './finetuned_zero_shot_model/vocab.txt',
 './finetuned_zero_shot_model/added_tokens.json',
 './finetuned_zero_shot_model/tokenizer.json')



---



In [None]:
from transformers import AutoTokenizer, AutoModelForSequenceClassification, Trainer, TrainingArguments


In [None]:
# Checkpoints for the original model and the fine-tuned models
original_model_checkpoint = "distilbert-base-uncased"
fine_tuned_model_checkpoint = zero_shot_dir
fine_tuned_fill_mask_model_checkpoint = output_dir

# Load the original model and tokenizer
original_tokenizer = AutoTokenizer.from_pretrained(original_model_checkpoint)
original_model = AutoModelForSequenceClassification.from_pretrained(original_model_checkpoint)

# Load the fine-tuned classification model and tokenizer
fine_tuned_tokenizer = AutoTokenizer.from_pretrained(fine_tuned_model_checkpoint)
fine_tuned_model = AutoModelForSequenceClassification.from_pretrained(fine_tuned_model_checkpoint)

# Load the model fine-tuned with the fill-mask objective, and its tokenizer
fine_tuned_fill_mask_tokenizer = AutoTokenizer.from_pretrained(fine_tuned_fill_mask_model_checkpoint)
fine_tuned_fill_mask_model = AutoModelForSequenceClassification.from_pretrained(fine_tuned_fill_mask_model_checkpoint)

Some weights of DistilBertForSequenceClassification were not initialized from the model checkpoint at distilbert-base-uncased and are newly initialized: ['classifier.bias', 'classifier.weight', 'pre_classifier.bias', 'pre_classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
Some weights of DistilBertForSequenceClassification were not initialized from the model checkpoint at ./finetuned_with_accelerator_model and are newly initialized: ['classifier.bias', 'classifier.weight', 'pre_classifier.bias', 'pre_classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [None]:
# Create the zero-shot-classification pipelines
original_classifier = pipeline("zero-shot-classification", model=original_model, tokenizer=original_tokenizer)

fine_tuned_classifier = pipeline("zero-shot-classification", model=fine_tuned_model, tokenizer=fine_tuned_tokenizer)

fine_tuned_fill_mask_classifier = pipeline("zero-shot-classification", model=fine_tuned_fill_mask_model, tokenizer=fine_tuned_fill_mask_tokenizer)

Failed to determine 'entailment' label id from the label2id mapping in the model config. Setting to -1. Define a descriptive label2id mapping in the model config to ensure correct outputs.
Failed to determine 'entailment' label id from the label2id mapping in the model config. Setting to -1. Define a descriptive label2id mapping in the model config to ensure correct outputs.
Failed to determine 'entailment' label id from the label2id mapping in the model config. Setting to -1. Define a descriptive label2id mapping in the model config to ensure correct outputs.
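
The warning above matters because the zero-shot pipeline is designed around a natural language inference (NLI) model: it pairs the sentence with one hypothesis per candidate label and ranks labels by the entailment logit. The sketch below illustrates that scoring in plain Python under two assumptions: the default hypothesis template is "This example is {}." and single-label mode is used. `fake_nli_logits` is a hypothetical stand-in for a real model. When no entailment label is found, the index falls back to -1 (as the warning states), so the score comes from whatever the classification head outputs last.

```python
import math


def rank_labels(sentence, candidate_labels, nli_logits_fn, entailment_id=-1,
                hypothesis_template="This example is {}."):
    """Rank labels by a softmax over each (premise, hypothesis) pair's entailment logit."""
    entail_logits = []
    for label in candidate_labels:
        hypothesis = hypothesis_template.format(label)
        logits = nli_logits_fn(sentence, hypothesis)  # one logit per output class
        entail_logits.append(logits[entailment_id])
    # Softmax across candidate labels (single-label setting)
    peak = max(entail_logits)
    exps = [math.exp(x - peak) for x in entail_logits]
    total = sum(exps)
    scored = [(label, e / total) for label, e in zip(candidate_labels, exps)]
    return sorted(scored, key=lambda pair: pair[1], reverse=True)


def fake_nli_logits(premise, hypothesis):
    """Hypothetical stand-in for a model: [contradiction, neutral, entailment] logits."""
    label_words = hypothesis.lower().rstrip(".").split()[3:]
    overlap = sum(word in premise.lower() for word in label_words)
    return [0.0, 0.0, float(overlap)]


ranking = rank_labels(
    "Adversaries may send phishing messages to gain access.",
    ["Phishing", "Data Destruction"],
    fake_nli_logits,
)
print(ranking[0][0])
```

With a DistilBERT checkpoint whose classification head was never trained on NLI, the logit at that last index carries no entailment meaning, so the rankings printed in the next cells should be read with that in mind.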


In [None]:

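# Collect every tactic and technique name in the test set as the zero-shot candidate labels
labels = test_df["name"].unique().tolist()

# Classify each sentence with every model and compare the results
# (fine_tuned_fill_mask_classifier is read from the notebook's global scope)
def classify_and_compare(test_df, labels, original_classifier, fine_tuned_classifier, limit=100):
    results = []
    for i, row in test_df.iterrows():
        if i >= limit:
            break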
        sentence = row["description"]
        true_label = row["name"]

        original_result = original_classifier(sentence, candidate_labels=labels)
        fine_tuned_result = fine_tuned_classifier(sentence, candidate_labels=labels)
        fine_tuned_fill_mask_result = fine_tuned_fill_mask_classifier(sentence, candidate_labels=labels)

        results.append({
            "sentence": sentence,
            "true_label": true_label,
            "original_result": original_result,
            "fine_tuned_result": fine_tuned_result,
            "fine_tuned_fill_mask_result": fine_tuned_fill_mask_result
        })

        print(f"Sentence: {sentence}")
        print(f"True Label: {true_label}")
        print(f"Original Classification: {original_result['labels'][0]}")
        print(f"Fine-tuned Classification: {fine_tuned_result['labels'][0]}")
        print(f"Fine-tuned fill_mask Classification: {fine_tuned_fill_mask_result['labels'][0]}")
        print("-" * 50)

    return results

# Run the classification
results = classify_and_compare(test_df, labels, original_classifier, fine_tuned_classifier, limit=100)


Sentence: Monitor executed commands and arguments that may circumvent mechanisms designed to control elevate privileges to gain higher-level permissions.
True Label: Abuse Elevation Control Mechanism
Original Classification: Account Access Removal
Fine-tuned Classification: Abuse Elevation Control Mechanism
Fine-tuned fill_mask Classification: Phishing
--------------------------------------------------
Sentence: Monitor the file system for files that have the setuid or setgid bits set. On Linux, auditd can alert every time a user's actual ID and effective ID are different (this is what happens when you sudo).
True Label: Abuse Elevation Control Mechanism
Original Classification: Phishing for Information
Fine-tuned Classification: Obtain Capabilities
Fine-tuned fill_mask Classification: Data Destruction
--------------------------------------------------


In [None]:
def calculate_accuracy(results):
    correct_original = 0
    correct_fine_tuned = 0
    correct_fine_tuned_fill_mask = 0

    for result in results:
        true_label = result["true_label"]
        original_label = result["original_result"]["labels"][0]
        fine_tuned_label = result["fine_tuned_result"]["labels"][0]
        fine_tuned_fill_mask_label = result["fine_tuned_fill_mask_result"]["labels"][0]

        if original_label == true_label:
            correct_original += 1
        if fine_tuned_label == true_label:
            correct_fine_tuned += 1
        if fine_tuned_fill_mask_label == true_label:
            correct_fine_tuned_fill_mask += 1

    total = len(results)
    original_accuracy = correct_original / total
    fine_tuned_accuracy = correct_fine_tuned / total
    fine_tuned_fill_mask_accuracy = correct_fine_tuned_fill_mask / total

    return original_accuracy, fine_tuned_accuracy, fine_tuned_fill_mask_accuracy

original_accuracy, fine_tuned_accuracy, fine_tuned_fill_mask_accuracy = calculate_accuracy(results)
print(f"Original Model Accuracy: {original_accuracy}")
print(f"Fine-tuned Model Accuracy: {fine_tuned_accuracy}")
print(f"Fine-tuned fill_mask Model Accuracy: {fine_tuned_fill_mask_accuracy}")
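
The task also asks for examples where the predicted label changes between models. One possible way to pull those out of `results` is sketched below; `summarize_label_changes` is a hypothetical helper, and the toy records only mimic the shape of the entries built by `classify_and_compare`. The first two records reuse labels from the printed output above, and the third is made up.

```python
def summarize_label_changes(results, baseline_key="original_result",
                            candidate_key="fine_tuned_result"):
    """Split results into cases the candidate model fixed, broke, or changed without fixing."""
    fixed, broken, changed_still_wrong = [], [], []
    for result in results:
        truth = result["true_label"]
        before = result[baseline_key]["labels"][0]
        after = result[candidate_key]["labels"][0]
        if before == after:
            continue  # top label did not change
        if after == truth:
            fixed.append(result)
        elif before == truth:
            broken.append(result)
        else:
            changed_still_wrong.append(result)
    return fixed, broken, changed_still_wrong


toy_results = [
    {"true_label": "Abuse Elevation Control Mechanism",
     "original_result": {"labels": ["Account Access Removal"]},
     "fine_tuned_result": {"labels": ["Abuse Elevation Control Mechanism"]}},
    {"true_label": "Abuse Elevation Control Mechanism",
     "original_result": {"labels": ["Phishing for Information"]},
     "fine_tuned_result": {"labels": ["Obtain Capabilities"]}},
    {"true_label": "Phishing",  # made-up record
     "original_result": {"labels": ["Phishing"]},
     "fine_tuned_result": {"labels": ["Data Destruction"]}},
]
fixed, broken, changed_still_wrong = summarize_label_changes(toy_results)
print(len(fixed), len(broken), len(changed_still_wrong))
```

Passing `candidate_key="fine_tuned_fill_mask_result"` would compare the original model against the fill-mask fine-tuned model instead.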