In [25]:
import os
import json
import torch
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from transformers import (
    AutoTokenizer,
    AutoModelForSequenceClassification,
    DistilBertTokenizerFast,
    DistilBertForSequenceClassification,
    Trainer,
    TrainingArguments
)

from datasets import Dataset
import torch.nn.functional as F
import accelerate
print(accelerate.__version__)

1.8.1


In [2]:
os.environ["WANDB_DISABLED"] = "true"

In [3]:
data = []
with open('opencti_data.json') as f:
    for line in f:
        data.append(json.loads(line))

df = pd.DataFrame(data)

In [4]:
def score_to_label(score):
    if score >= 80:
        return "high"
    elif score >= 50:
        return "medium"
    else:
        return "low"

df['label'] = df['x_opencti_score'].apply(score_to_label)

label_encoder = LabelEncoder()
df['label_id'] = label_encoder.fit_transform(df['label'])

In [5]:
train_df, test_df = train_test_split(
    df[['pattern', 'label_id']],
    test_size=0.2,
    random_state=42
)

## **DistilBert**

In [6]:
tokenizer = DistilBertTokenizerFast.from_pretrained('distilbert-base-uncased')

def tokenize(batch):
    return tokenizer(batch['pattern'], padding='max_length', truncation=True, max_length=64)

# Ensure no numpy copy issue
train_df = train_df.copy()
test_df = test_df.copy()

train_dataset = Dataset.from_pandas(train_df.reset_index(drop=True))
test_dataset = Dataset.from_pandas(test_df.reset_index(drop=True))

train_dataset = train_dataset.map(tokenize, batched=True)
test_dataset = test_dataset.map(tokenize, batched=True)

train_dataset = train_dataset.rename_column("label_id", "labels")
test_dataset = test_dataset.rename_column("label_id", "labels")

train_dataset.set_format('torch', columns=['input_ids', 'attention_mask', 'labels'])
test_dataset.set_format('torch', columns=['input_ids', 'attention_mask', 'labels'])


Map: 100%|██████████| 4000/4000 [00:00<00:00, 17804.22 examples/s]
Map: 100%|██████████| 1000/1000 [00:00<00:00, 20962.09 examples/s]


In [7]:
model = DistilBertForSequenceClassification.from_pretrained(
    'distilbert-base-uncased',
    num_labels=3
)

Some weights of DistilBertForSequenceClassification were not initialized from the model checkpoint at distilbert-base-uncased and are newly initialized: ['classifier.bias', 'classifier.weight', 'pre_classifier.bias', 'pre_classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [19]:
training_args = TrainingArguments(
    output_dir='./results',
    num_train_epochs=1,
    per_device_train_batch_size=8,
    per_device_eval_batch_size=8,
    learning_rate=1e-5,
    weight_decay=0.01,
    logging_steps=50,
    do_train=True,
    do_eval=True,
    fp16=torch.cuda.is_available()  
)

Using the `WANDB_DISABLED` environment variable is deprecated and will be removed in v5. Use the --report_to flag to control the integrations used for logging result (for instance --report_to none).


In [20]:
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=test_dataset,
)

In [21]:
trainer.train()

Step,Training Loss
50,0.329
100,0.3308
150,0.3552
200,0.2887
250,0.3054
300,0.3724
350,0.3886
400,0.2846
450,0.2185
500,0.2262


TrainOutput(global_step=500, training_loss=0.3099439926147461, metrics={'train_runtime': 381.7386, 'train_samples_per_second': 10.478, 'train_steps_per_second': 1.31, 'total_flos': 66234880512000.0, 'train_loss': 0.3099439926147461, 'epoch': 1.0})

In [22]:
metrics = trainer.evaluate()
print("Evaluation Results:", metrics)

Evaluation Results: {'eval_loss': 0.3761419653892517, 'eval_runtime': 18.3522, 'eval_samples_per_second': 54.489, 'eval_steps_per_second': 6.811, 'epoch': 1.0}


In [23]:
def predict_score_class(pattern):
    model.eval()
    inputs = tokenizer(pattern, return_tensors="pt", truncation=True, padding='max_length', max_length=64)
    with torch.no_grad():
        outputs = model(**inputs)
        probs = F.softmax(outputs.logits, dim=1)
        predicted_class = torch.argmax(probs, dim=1).item()
        confidence = probs[0][predicted_class].item()
    label = label_encoder.inverse_transform([predicted_class])[0]
    return {
        "label": label,
        "confidence": round(confidence, 4)
    }

In [24]:
test_input = "[file:hashes.'SHA-256' = 'f3320995cef3916f9d0d6d8ac9d9ca55030f946a5e32a1e0bbef25d2dc00d038']"
result = predict_score_class(test_input)
print("Predicted:", result)

Predicted: {'label': 'high', 'confidence': 0.9757}


## **SecBERT**

In [27]:
tokenizer = AutoTokenizer.from_pretrained("jackaduma/SecBERT")

def tokenize(batch):
    return tokenizer(batch['pattern'], padding='max_length', truncation=True, max_length=64)

train_df = train_df.copy()
test_df = test_df.copy()

train_dataset = Dataset.from_pandas(train_df.reset_index(drop=True))
test_dataset = Dataset.from_pandas(test_df.reset_index(drop=True))

train_dataset = train_dataset.map(tokenize, batched=True)
test_dataset = test_dataset.map(tokenize, batched=True)

train_dataset = train_dataset.rename_column("label_id", "labels")
test_dataset = test_dataset.rename_column("label_id", "labels")

train_dataset.set_format('torch', columns=['input_ids', 'attention_mask', 'labels'])
test_dataset.set_format('torch', columns=['input_ids', 'attention_mask', 'labels'])

# ===============================
# Load SecBERT Model
# ===============================
model = AutoModelForSequenceClassification.from_pretrained(
    "jackaduma/SecBERT",
    num_labels=3
)

# ===============================
# TrainingArguments
# ===============================
training_args = TrainingArguments(
    output_dir='./results',
    num_train_epochs=1,
    per_device_train_batch_size=8,
    per_device_eval_batch_size=8,
    learning_rate=1e-5,
    weight_decay=0.01,
    logging_steps=50,
    do_train=True,
    do_eval=True,
    fp16=torch.cuda.is_available()
)

# ===============================
# Train
# ===============================
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=test_dataset,
)

trainer.train()


Map: 100%|██████████| 4000/4000 [00:00<00:00, 16966.58 examples/s]
Map: 100%|██████████| 1000/1000 [00:00<00:00, 20429.23 examples/s]
Some weights of BertForSequenceClassification were not initialized from the model checkpoint at jackaduma/SecBERT and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
Using the `WANDB_DISABLED` environment variable is deprecated and will be removed in v5. Use the --report_to flag to control the integrations used for logging result (for instance --report_to none).


Step,Training Loss
50,0.5174
100,0.3837
150,0.4223
200,0.3333
250,0.3694
300,0.3732
350,0.433
400,0.3099
450,0.2332
500,0.2422


TrainOutput(global_step=500, training_loss=0.36177643394470216, metrics={'train_runtime': 478.1838, 'train_samples_per_second': 8.365, 'train_steps_per_second': 1.046, 'total_flos': 66234880512000.0, 'train_loss': 0.36177643394470216, 'epoch': 1.0})

In [28]:
metrics = trainer.evaluate()
print("📊 Evaluation Results:", metrics)

📊 Evaluation Results: {'eval_loss': 0.4111956059932709, 'eval_runtime': 18.4829, 'eval_samples_per_second': 54.104, 'eval_steps_per_second': 6.763, 'epoch': 1.0}


In [29]:
# ===============================
#  Inference Function
# ===============================
def predict_score_class(pattern):
    model.eval()
    inputs = tokenizer(pattern, return_tensors="pt", truncation=True, padding='max_length', max_length=64)
    with torch.no_grad():
        outputs = model(**inputs)
        probs = F.softmax(outputs.logits, dim=1)
        predicted_class = torch.argmax(probs, dim=1).item()
        confidence = probs[0][predicted_class].item()
    label = label_encoder.inverse_transform([predicted_class])[0]
    return {
        "label": label,
        "confidence": round(confidence, 4)
    }

# ===============================
# Test Inference
# ===============================
test_input = "[file:hashes.'SHA-256' = 'f3320995cef3916f9d0d6d8ac9d9ca55030f946a5e32a1e0bbef25d2dc00d038']"
result = predict_score_class(test_input)
print("🔐 Predicted:", result)

🔐 Predicted: {'label': 'high', 'confidence': 0.9891}


## **SecureBERT**

In [None]:
from transformers import AutoTokenizer


tokenizer = AutoTokenizer.from_pretrained("ehsanaghaei/SecureBERT")

tokenizer.pad_token = tokenizer.eos_token 

def tokenize(batch):
    return tokenizer(batch['pattern'], padding='max_length', truncation=True, max_length=64)

train_df = train_df.copy()
test_df = test_df.copy()

train_dataset = Dataset.from_pandas(train_df.reset_index(drop=True))
test_dataset = Dataset.from_pandas(test_df.reset_index(drop=True))

train_dataset = train_dataset.map(tokenize, batched=True)
test_dataset = test_dataset.map(tokenize, batched=True)

train_dataset = train_dataset.rename_column("label_id", "labels")
test_dataset = test_dataset.rename_column("label_id", "labels")

train_dataset.set_format('torch', columns=['input_ids', 'attention_mask', 'labels'])
test_dataset.set_format('torch', columns=['input_ids', 'attention_mask', 'labels'])


To support symlinks on Windows, you either need to activate Developer Mode or to run Python as an administrator. In order to activate developer mode, see this article: https://docs.microsoft.com/en-us/windows/apps/get-started/enable-your-device-for-development
Map: 100%|██████████| 4000/4000 [00:00<00:00, 24623.53 examples/s]
Map: 100%|██████████| 1000/1000 [00:00<00:00, 27114.25 examples/s]


In [27]:
model = AutoModelForSequenceClassification.from_pretrained(
    "ehsanaghaei/SecureBERT",
    num_labels=3
)

Some weights of RobertaForSequenceClassification were not initialized from the model checkpoint at ehsanaghaei/SecureBERT and are newly initialized: ['classifier.dense.bias', 'classifier.dense.weight', 'classifier.out_proj.bias', 'classifier.out_proj.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [30]:
training_args = TrainingArguments(
    output_dir='./results',
    num_train_epochs=1,
    per_device_train_batch_size=8,
    per_device_eval_batch_size=8,
    learning_rate=1e-5,
    weight_decay=0.01,
    logging_steps=50,
    do_train=True,
    do_eval=True,
    fp16=torch.cuda.is_available()
)

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=test_dataset,
)

trainer.train()

Using the `WANDB_DISABLED` environment variable is deprecated and will be removed in v5. Use the --report_to flag to control the integrations used for logging result (for instance --report_to none).


Step,Training Loss
50,0.3397
100,0.326
150,0.3513
200,0.3045
250,0.3152
300,0.3515
350,0.4175
400,0.2981
450,0.2307
500,0.2438


TrainOutput(global_step=500, training_loss=0.3178237724304199, metrics={'train_runtime': 483.4089, 'train_samples_per_second': 8.275, 'train_steps_per_second': 1.034, 'total_flos': 66234880512000.0, 'train_loss': 0.3178237724304199, 'epoch': 1.0})

In [31]:
metrics = trainer.evaluate()
print("📊 Evaluation Results:", metrics)

📊 Evaluation Results: {'eval_loss': 0.4202795922756195, 'eval_runtime': 18.2938, 'eval_samples_per_second': 54.663, 'eval_steps_per_second': 6.833, 'epoch': 1.0}


In [32]:
def predict_score_class(pattern):
    model.eval()
    inputs = tokenizer(pattern, return_tensors="pt", truncation=True, padding='max_length', max_length=64)
    with torch.no_grad():
        outputs = model(**inputs)
        probs = F.softmax(outputs.logits, dim=1)
        predicted_class = torch.argmax(probs, dim=1).item()
        confidence = probs[0][predicted_class].item()
    label = label_encoder.inverse_transform([predicted_class])[0]
    return {
        "label": label,
        "confidence": round(confidence, 4)
    }

test_input = "[file:hashes.'SHA-256' = 'f3320995cef3916f9d0d6d8ac9d9ca55030f946a5e32a1e0bbef25d2dc00d038']"
result = predict_score_class(test_input)
print("🔐 Predicted:", result)

🔐 Predicted: {'label': 'high', 'confidence': 0.9912}


## **CTI-BERT**

In [34]:
tokenizer = AutoTokenizer.from_pretrained("ibm-research/CTI-BERT")

def tokenize(batch):
    return tokenizer(batch['pattern'], padding='max_length', truncation=True, max_length=64)

train_df = train_df.copy()
test_df = test_df.copy()

train_dataset = Dataset.from_pandas(train_df.reset_index(drop=True))
test_dataset = Dataset.from_pandas(test_df.reset_index(drop=True))

train_dataset = train_dataset.map(tokenize, batched=True)
test_dataset = test_dataset.map(tokenize, batched=True)

train_dataset = train_dataset.rename_column("label_id", "labels")
test_dataset = test_dataset.rename_column("label_id", "labels")

train_dataset.set_format('torch', columns=['input_ids', 'attention_mask', 'labels'])
test_dataset.set_format('torch', columns=['input_ids', 'attention_mask', 'labels'])

model = AutoModelForSequenceClassification.from_pretrained(
    "ibm-research/CTI-BERT",
    num_labels=3
)

training_args = TrainingArguments(
    output_dir='./results',
    num_train_epochs=1,
    per_device_train_batch_size=8,
    per_device_eval_batch_size=8,
    learning_rate=1e-5,
    weight_decay=0.01,
    logging_steps=50,
    do_train=True,
    do_eval=True,
    fp16=torch.cuda.is_available()  
)

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=test_dataset,
)

trainer.train()

Map: 100%|██████████| 4000/4000 [00:00<00:00, 18266.46 examples/s]
Map: 100%|██████████| 1000/1000 [00:00<00:00, 18944.46 examples/s]
Some weights of BertForSequenceClassification were not initialized from the model checkpoint at ibm-research/CTI-BERT and are newly initialized: ['bert.pooler.dense.bias', 'bert.pooler.dense.weight', 'classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
Using the `WANDB_DISABLED` environment variable is deprecated and will be removed in v5. Use the --report_to flag to control the integrations used for logging result (for instance --report_to none).


Step,Training Loss
50,0.5094
100,0.3813
150,0.4311
200,0.3218
250,0.3609
300,0.3773
350,0.4272
400,0.3148
450,0.2368
500,0.254


TrainOutput(global_step=500, training_loss=0.3614722537994385, metrics={'train_runtime': 875.6251, 'train_samples_per_second': 4.568, 'train_steps_per_second': 0.571, 'total_flos': 131556708864000.0, 'train_loss': 0.3614722537994385, 'epoch': 1.0})

In [35]:
metrics = trainer.evaluate()
print("Evaluation Results:", metrics)

Evaluation Results: {'eval_loss': 0.4186094403266907, 'eval_runtime': 42.1344, 'eval_samples_per_second': 23.734, 'eval_steps_per_second': 2.967, 'epoch': 1.0}


In [36]:
def predict_score_class(pattern):
    model.eval()
    inputs = tokenizer(pattern, return_tensors="pt", truncation=True, padding='max_length', max_length=64)
    with torch.no_grad():
        outputs = model(**inputs)
        probs = F.softmax(outputs.logits, dim=1)
        predicted_class = torch.argmax(probs, dim=1).item()
        confidence = probs[0][predicted_class].item()
    label = label_encoder.inverse_transform([predicted_class])[0]
    return {
        "label": label,
        "confidence": round(confidence, 4)
    }


test_input = "[file:hashes.'SHA-256' = 'f3320995cef3916f9d0d6d8ac9d9ca55030f946a5e32a1e0bbef25d2dc00d038']"
result = predict_score_class(test_input)
print("Predicted:", result)

Predicted: {'label': 'high', 'confidence': 0.9889}
