In [18]:
from pathlib import Path
from sklearn.model_selection import train_test_split
import torch
import csv
from transformers import Trainer, TrainingArguments, AdamW
from transformers.optimization import get_linear_schedule_with_warmup
from pathlib import Path
import torch
from torch.utils.data import DataLoader
from tqdm import tqdm
import numpy as np
import argparse
import os
from nltk.tokenize import word_tokenize
from nltk.stem import WordNetLemmatizer
from nltk.corpus import wordnet
from nltk import pos_tag
import nltk
# Fetch the NLTK resources used by the tokenizing / lemmatizing helpers below.
# 'averaged_perceptron_tagger_eng' is the tagger resource name that recent NLTK
# releases (the same generation that introduced 'punkt_tab') look up for
# pos_tag(); the legacy names are kept so older installations keep working.
for _nltk_resource in (
    'averaged_perceptron_tagger',
    'averaged_perceptron_tagger_eng',
    'wordnet',
    'omw-1.4',
    'punkt',
    'punkt_tab',
):
    nltk.download(_nltk_resource)

In [19]:
class CVSSDataset(torch.utils.data.Dataset):
    """Map-style dataset pairing tokenizer encodings with class labels.

    ``encodings`` is a mapping from feature name to a per-sample indexable
    sequence; ``labels`` is indexable with the same sample index.
    """

    def __init__(self, encodings, labels):
        self.labels = labels
        self.encodings = encodings

    def __len__(self):
        # The number of samples is defined by the label sequence alone.
        return len(self.labels)

    def __getitem__(self, idx):
        """Return sample ``idx`` as a dict of tensors, with the label under 'labels'."""
        sample = {}
        for feature_name, per_sample_values in self.encodings.items():
            sample[feature_name] = torch.tensor(per_sample_values[idx])
        sample['labels'] = torch.tensor(self.labels[idx])
        return sample

def read_cvss_txt(split_dir, list_classes, label_dirs=("LOW", "HIGH")):
    """
    Read one text file per sample from class-named sub-directories.

    Args:
        split_dir: Directory containing one sub-directory per class label.
        list_classes: Sequence of class names; a sample's integer label is the
            index of its directory name in this sequence (first occurrence).
        label_dirs: Names of the sub-directories to read, in order. Defaults to
            ("LOW", "HIGH"), the directories this function has always read.

    Returns:
        (texts, labels): two lists of equal length.

    Raises:
        FileNotFoundError: if a requested sub-directory that is also listed in
            ``list_classes`` does not exist.
    """
    split_dir = Path(split_dir)
    texts = []
    labels = []
    for label_dir in label_dirs:
        # A directory whose name is not a known class has no valid label;
        # skipping it entirely keeps texts and labels aligned one-to-one.
        if label_dir not in list_classes:
            continue
        label = list_classes.index(label_dir)

        # Sort for a deterministic sample order (iterdir() order is arbitrary).
        for text_file in sorted((split_dir / label_dir).iterdir()):
            if not text_file.is_file():
                continue  # ignore nested directories
            texts.append(text_file.read_text(encoding='utf-8'))
            labels.append(label)

    return texts, labels

def read_cvss_csv(file_name, num_label, list_classes):
    """
    Read texts and integer labels from a CSV file.

    The first row is treated as a header and skipped. For every other row the
    first column is the text and column ``num_label`` holds the class name,
    which is converted to its index in ``list_classes`` (first occurrence).

    Rows that cannot be labelled (blank lines, rows without a ``num_label``
    column, or a class name missing from ``list_classes``) are skipped as a
    whole, so the returned lists always have the same length and stay aligned.
    A warning reports how many non-blank rows were dropped.

    Args:
        file_name: Path of the UTF-8 encoded CSV file.
        num_label: Column index of the label to extract.
        list_classes: Sequence of class names defining the label indices.

    Returns:
        (texts, labels): two lists of equal length.
    """
    import warnings

    # Name -> index lookup; setdefault keeps the first index for duplicates.
    class_to_index = {}
    for index, class_name in enumerate(list_classes):
        class_to_index.setdefault(class_name, index)

    texts = []
    labels = []
    skipped_rows = 0

    # newline='' lets the csv module handle line endings inside quoted fields.
    with open(file_name, 'r', encoding='utf-8', newline='') as csv_file:
        csv_reader = csv.reader(csv_file, delimiter=',', quotechar='"')

        # Skip the header row, if present
        next(csv_reader, None)

        for row in csv_reader:
            if not row:
                continue  # blank line
            if len(row) <= num_label:
                skipped_rows += 1
                continue
            label = class_to_index.get(row[num_label])
            if label is None:
                skipped_rows += 1
                continue
            texts.append(row[0])
            labels.append(label)

    if skipped_rows:
        warnings.warn(
            f"{file_name}: skipped {skipped_rows} row(s) with a missing or "
            f"unknown label in column {num_label}"
        )

    return texts, labels


In [20]:
def select_tokenizer_model(model_name, extra_tokens, token_file, num_labels):
    """
    Build the tokenizer and sequence-classification model for ``model_name``.

    Args:
        model_name: One of 'distilbert', 'bert', 'deberta', 'albert', 'roberta'.
        extra_tokens: When truthy, tokens listed in ``token_file`` are added to
            the tokenizer before the embedding matrix is resized.
        token_file: Path of the extra-token file (one token per line).
        num_labels: Number of output classes written into the model config.

    Returns:
        (tokenizer, model)

    Raises:
        ValueError: if ``model_name`` is not one of the supported names.

    Note:
        Whether extra tokens are lemmatized is taken from the module-level
        ``lemmatization`` flag (set by ``main``); it is treated as False when
        that global has not been defined yet.
    """
    print("### Selecting Model and Tokenizer")

    # model_name -> (config class, tokenizer class, model class, checkpoint)
    supported = {
        'distilbert': ('DistilBertConfig', 'DistilBertTokenizerFast',
                       'DistilBertForSequenceClassification', 'distilbert-base-cased'),
        'bert': ('BertConfig', 'BertTokenizerFast',
                 'BertForSequenceClassification', 'bert-base-uncased'),
        'deberta': ('DebertaConfig', 'DebertaTokenizerFast',
                    'DebertaForSequenceClassification', 'microsoft/deberta-base'),
        'albert': ('AlbertConfig', 'AlbertTokenizerFast',
                   'AlbertForSequenceClassification', 'albert-base-v1'),
        'roberta': ('RobertaConfig', 'RobertaTokenizerFast',
                    'RobertaForSequenceClassification', 'roberta-base'),
    }
    if model_name not in supported:
        # Fail with a clear message instead of an UnboundLocalError further down.
        raise ValueError(
            f"Unsupported model_name {model_name!r}; expected one of {sorted(supported)}"
        )

    import transformers

    config_name, tokenizer_name, model_cls_name, checkpoint = supported[model_name]
    config = getattr(transformers, config_name).from_pretrained(checkpoint)
    config.num_labels = num_labels
    tokenizer = getattr(transformers, tokenizer_name).from_pretrained(checkpoint)
    # NOTE(review): the model is constructed from the config only, so no
    # pretrained weights are loaded here - confirm this is intended.
    model = getattr(transformers, model_cls_name)(config)

    ### Add Tokens
    if extra_tokens:
        lemmatize_tokens = globals().get('lemmatization', False)
        add_tokens_from_file(token_file, tokenizer, lemmatize_tokens)
    number_tokens = len(tokenizer)

    print("### Number of tokens in Tokenizer")
    print(number_tokens)

    # Make the embedding matrix match the (possibly extended) vocabulary.
    model.resize_token_embeddings(number_tokens)

    return tokenizer, model

def add_tokens_from_file(token_file, tokenizer, lemmatize=False):
    """
    Add the tokens listed in ``token_file`` (one per line) to ``tokenizer``.

    Args:
        token_file: Path of a UTF-8 text file with one token per line.
        tokenizer: Object exposing ``add_tokens(list_of_str)``.
        lemmatize: When truthy, each token is passed through ``lemmatize_noun``
            before being added.

    Blank lines are skipped so that no empty-string token is registered.
    """
    print("### Adding Tokens")

    token_list = []
    # The context manager closes the file even if reading or lemmatizing fails.
    with open(token_file, 'r', encoding='utf-8') as file_:
        for line in file_:
            token = line.rstrip("\n")
            if not token:
                continue
            token_list.append(lemmatize_noun(token) if lemmatize else token)

    tokenizer.add_tokens(token_list)

In [21]:
def get_wordnet_pos(word):
    """Return the WordNet POS constant matching the tagger's tag for ``word``.

    The first letter of the tag produced by ``pos_tag`` selects adjective,
    noun, verb or adverb; any other tag falls back to noun.
    """
    tagger_tag = pos_tag([word])[0][1]
    initial = tagger_tag[0].upper()

    wordnet_by_initial = {
        "J": wordnet.ADJ,
        "N": wordnet.NOUN,
        "V": wordnet.VERB,
        "R": wordnet.ADV,
    }
    if initial in wordnet_by_initial:
        return wordnet_by_initial[initial]
    return wordnet.NOUN

def lemmatize_sentence(sentence):
    """Tokenize ``sentence``, noun-lemmatize each token and re-join with spaces.

    Only ``lemmatize_noun`` is applied; the POS-aware ``lemmatize_word`` is
    deliberately not used here.
    """
    lemmas = []
    for token in word_tokenize(sentence):
        lemmas.append(lemmatize_noun(token))

    return ' '.join(lemmas)

def lemmatize(train_texts, test_texts=None):
    """Lemmatize every sentence of the train split and, if given, the test split.

    Returns a pair of lists ``(train, test)``; the test list is empty when
    ``test_texts`` is None.
    """
    lemmatized_texts_train = [lemmatize_sentence(text) for text in train_texts]

    if test_texts is None:
        lemmatized_texts_test = []
    else:
        lemmatized_texts_test = [lemmatize_sentence(text) for text in test_texts]

    return lemmatized_texts_train, lemmatized_texts_test

def lemmatize_word(word):
    """Lemmatize ``word`` using the POS returned by ``get_wordnet_pos``.

    For adverbs the name of the first pertainym of the word's first adverb
    synset is returned when it can be looked up; otherwise (and for every
    other part of speech) the plain WordNet lemma is returned.
    """
    lemmatizer = WordNetLemmatizer()
    wn_pos = get_wordnet_pos(word)
    plain_lemma = lemmatizer.lemmatize(word, wn_pos)

    # Guard clause: only adverbs get the pertainym lookup.
    if wn_pos not in ("r", "R"):
        return plain_lemma

    try:
        adverb_lemmas = wordnet.synset(word + '.r.1').lemmas()
        related = adverb_lemmas[0].pertainyms()
        return related[0].name()
    except Exception:
        # Any lookup failure falls back to the plain lemma.
        return plain_lemma

def lemmatize_noun(word):
    """Return the WordNet lemma of ``word`` using the lemmatizer's default POS."""
    return WordNetLemmatizer().lemmatize(word)



In [23]:
def get_pred_accuracy(target, output):
    """Return the percentage of samples whose arg-max prediction equals the target.

    ``output`` is reduced with ``argmax(axis=1)`` and compared element-wise
    with ``target``.
    """
    predicted = output.argmax(axis=1)  # per-row index of the highest score
    matches = target == predicted

    n_correct = np.sum(matches)
    n_total = target.size

    return (n_correct / n_total) * 100

def get_binary_mean_accuracy(target, output):
    """Return the mean of positive-class recall and negative-class recall.

    ``output`` is reduced with ``argmax(axis=1)``; class 1 is treated as
    positive and class 0 as negative. A tiny epsilon in each denominator
    avoids division by zero when a class is absent from ``target``.
    """
    eps = 1e-20
    predicted = output.argmax(axis=1)

    is_pos = (target == 1)
    is_neg = (target == 0)

    # Ground-truth counts: positives (TP + FN) and negatives (TN + FP).
    gt_pos = np.sum(is_pos, axis=0).astype(float)
    gt_neg = np.sum(is_neg, axis=0).astype(float)

    # Correctly predicted counts per class (TP and TN).
    true_pos = np.sum(is_pos * (predicted == 1), axis=0).astype(float)
    true_neg = np.sum(is_neg * (predicted == 0), axis=0).astype(float)

    label_pos_recall = 1.0 * true_pos / (gt_pos + eps)
    label_neg_recall = 1.0 * true_neg / (gt_neg + eps)

    return (label_pos_recall + label_neg_recall) / 2

def get_evaluation_metrics(target, output, num_labels):
    """
    Compute accuracy, precision, recall and F1 for a set of predictions.

    Args:
        target: Ground-truth labels.
        output: Model outputs, forwarded unchanged to the metric helpers.
        num_labels: Kept for interface compatibility; not used by the metrics.

    Returns:
        (accuracy, precision, recall, f1_score)
    """
    # get_pred_accuracy accepts (target, output) only; passing num_labels as a
    # third positional argument raised a TypeError.
    accuracy      = get_pred_accuracy(target, output)
    # NOTE(review): get_precision / get_recall / get_f1_score are not defined
    # in the visible part of this notebook - confirm they exist before use.
    precision     = get_precision(target, output)
    recall        = get_recall(target, output)
    f1_score      = get_f1_score(target, output)

    return accuracy, precision, recall, f1_score

def infer(trainer, test_loader, num_labels):
    """Run ``trainer.predict`` on ``test_loader`` and return the accuracy in percent.

    The raw predictions are turned into per-row probabilities with a softmax
    over dimension 1 and scored against ``label_ids`` via
    ``get_pred_accuracy``. ``num_labels`` is accepted but not used.
    """
    prediction_output = trainer.predict(test_loader)

    logits = torch.from_numpy(prediction_output.predictions)
    probabilities = torch.softmax(logits, dim=1).numpy()
    ground_truth = prediction_output.label_ids

    return get_pred_accuracy(ground_truth, probabilities)

In [24]:
# Per-metric settings: one entry for each CVSS metric a model is trained for.
# Row layout: (name, class names, label column in the CSV, output directory).
_category_rows = [
    ("attackVector", ['NETWORK', 'LOCAL', 'PHYSICAL', 'ADJACENT_NETWORK'], 1, 'output/attackVector'),
    ("attackComplexity", ['LOW', 'HIGH'], 2, 'output/attackComplexity'),
    ("privilegeReq", ['NONE', 'LOW', 'HIGH'], 3, 'output/privilegeReq'),
    ("userInteraction", ['NONE', 'REQUIRED'], 4, 'output/userInteraction'),
    ("scope", ['UNCHANGED', 'CHANGED'], 5, 'output/scope'),
    ("confidentiality", ['NONE', 'LOW', 'HIGH'], 6, 'output/confidentiality'),
    ("integrity", ['NONE', 'LOW', 'HIGH'], 7, 'output/integrity'),
    ("availability", ['NONE', 'LOW', 'HIGH'], 8, 'output/availability'),
]

# Expand each row into the dict form used by the training loop; num_labels is
# the number of class names of that metric.
categories = [
    {
        "name": metric_name,
        "num_labels": len(metric_classes),
        "classes_names": metric_classes,
        "label_position": csv_column,
        "output_dir": metric_output_dir,
    }
    for metric_name, metric_classes, csv_column, metric_output_dir in _category_rows
]

In [None]:
def main():
    """
    Train, save and evaluate one classifier per entry of ``categories``.

    For every category the function builds a fresh tokenizer/model pair, reads
    'data/train.csv' and 'data/test.csv', optionally lemmatizes the texts,
    tokenizes them, trains with the Hugging Face ``Trainer``, saves the model
    to the category's output directory and prints the test accuracy.

    Raises:
        ValueError: if a split yields a different number of texts and labels.
    """
    # select_tokenizer_model reads this module-level flag.
    global lemmatization

    from transformers import set_seed

    # variables
    model_name = 'distilbert'
    extra_tokens = True  # add the extra tokens from token_file
    token_file = 'vocab/CVSS_5k.vocab'  # extra-token file
    lemmatization = True  # lemmatize texts (and extra tokens)
    seed = 42  # fixed seed so model initialisation and training are repeatable

    # Tunable hyper-parameters
    train_batch_size = 8  # batch size for training
    test_batch_size = 4  # batch size for evaluation
    epochs = 3  # number of epochs
    learning_rate = 5e-5  # learning rate
    weight_decay = 0  # weight decay
    warmup_steps = 0  # number of warmup steps
    warmup_ratio = 0  # warmup ratio

    # Report GPU availability (the device is only printed here)
    device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
    print("### Device: ", device)
    if torch.cuda.is_available():
        devName = torch.cuda.get_device_name(0)
        print(f"GPU name is {devName}")

    # One full train/evaluate cycle per category
    for category in categories:
        print(f"\n### Training model for {category['name']}")

        # Directories and variables for the current category
        output_dir = category["output_dir"]
        num_labels = category["num_labels"]
        classes_names = category["classes_names"]
        label_position = category["label_position"]

        # Create the output directory if it does not exist yet
        os.makedirs(output_dir, exist_ok=True)

        # Seed before the model is built: it is created from a config, so its
        # initial weights are drawn from the random number generators.
        set_seed(seed)

        # Select Model
        tokenizer, model = select_tokenizer_model(
            model_name,
            extra_tokens=extra_tokens,
            token_file=token_file,
            num_labels=num_labels,
        )

        # Splitting Dataset
        print("### Splitting Dataset")

        train_texts, train_labels = read_cvss_csv('data/train.csv', label_position, classes_names)
        test_texts, test_labels = read_cvss_csv('data/test.csv', label_position, classes_names)

        # Texts and labels are paired by position; a length mismatch would
        # silently attach labels to the wrong texts.
        for split_name, split_texts, split_labels in (
            ("train", train_texts, train_labels),
            ("test", test_texts, test_labels),
        ):
            if len(split_texts) != len(split_labels):
                raise ValueError(
                    f"{category['name']}: {split_name} split has {len(split_texts)} texts "
                    f"but {len(split_labels)} labels"
                )

        # Lemmatize Sentences
        if lemmatization:
            print("### Lemmatizing Sentences")
            train_inputs, test_inputs = lemmatize(train_texts, test_texts)
        else:
            train_inputs, test_inputs = train_texts, test_texts

        # Tokenize Sentences
        print("### Tokenizing Sentences")

        train_encodings = tokenizer(train_inputs, truncation=True, padding=True)
        test_encodings = tokenizer(test_inputs, truncation=True, padding=True)

        # Dataset Encodings
        print("### Encoding Dataset")

        train_dataset = CVSSDataset(train_encodings, train_labels)
        test_dataset = CVSSDataset(test_encodings, test_labels)

        # Training
        print("### Training")

        training_args = TrainingArguments(
            output_dir=output_dir,
            num_train_epochs=epochs,
            per_device_train_batch_size=train_batch_size,
            per_device_eval_batch_size=test_batch_size,
            learning_rate=learning_rate,
            save_strategy="epoch",
            weight_decay=weight_decay,
            warmup_steps=warmup_steps,
            warmup_ratio=warmup_ratio,
            seed=seed,
        )

        trainer = Trainer(
            model=model,
            args=training_args,
            train_dataset=train_dataset,
            eval_dataset=test_dataset,
        )

        trainer.train()
        trainer.save_model()
        acc = infer(trainer, test_dataset, num_labels)
        print(f"Accuracy for {category['name']} = {acc:.6f}")

# Run the full training pipeline when this cell/script is executed directly.
if __name__ == '__main__':
    main()