參考
https://huggingface.co/docs/transformers/tasks/sequence_classification

In [None]:
# Install the Hugging Face `datasets` and `evaluate` packages into the notebook
# runtime (IPython shell magics; these lines are not plain Python).
!pip install datasets
!pip install evaluate

In [None]:
from huggingface_hub import notebook_login
# Authenticate with the Hugging Face Hub; later cells call push_to_hub on the
# dataset and on the trainer.
notebook_login()

In [None]:
import os
# Ask PyTorch's CUDA caching allocator to use expandable segments. This cell
# appears before the first `import torch` in the notebook.
# NOTE(review): presumably meant to reduce fragmentation-related CUDA OOM
# errors during training — confirm against the PyTorch CUDA memory notes.
os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True"

##Data processing (do not run this section again; use the "Load dataset" section instead)

In [None]:
# Download the six CSV files that the next cell reads with pd.read_csv
# (IPython shell magics; these lines are not plain Python).
!wget https://raw.githubusercontent.com/yenrongchen/data/refs/heads/main/output.csv
!wget https://raw.githubusercontent.com/yenrongchen/data/refs/heads/main/cowrie.client.size.csv
!wget https://raw.githubusercontent.com/yenrongchen/data/refs/heads/main/cowrie.command.failed.csv
!wget https://raw.githubusercontent.com/yenrongchen/data/refs/heads/main/cowrie.session.file_download.csv
!wget https://raw.githubusercontent.com/yenrongchen/data/refs/heads/main/cowrie.session.file_download.failed.csv
!wget https://raw.githubusercontent.com/yenrongchen/data/refs/heads/main/cowrie.session.file_upload.csv

In [None]:
import torch
import torch.nn.functional as F
import pandas as pd

def replace_nan(x):
    """Map a missing-value marker to the placeholder string "none".

    The markers are the exact strings "NaN", "" and "nan"; any other value
    is returned unchanged.
    """
    missing_markers = ("NaN", "", "nan")
    return "none" if x in missing_markers else x

# Source CSV files, in the order they are stacked into `data`.
csv_files = [
    "output.csv",
    "cowrie.client.size.csv",
    "cowrie.command.failed.csv",
    "cowrie.session.file_download.csv",
    "cowrie.session.file_download.failed.csv",
    "cowrie.session.file_upload.csv",
]

# Read every cell as a string (empty fields stay "" instead of becoming NaN),
# then normalise the textual missing-value markers to "none".
frames = [
    pd.read_csv(path, dtype = str, keep_default_na = False).map(replace_nan)
    for path in csv_files
]
org_data, size, failed, file_download, file_download_failed, file_upload = frames

# pd.concat aligns frames on column names and fills columns that a frame lacks
# with real NaN values. Those appear only after replace_nan has run, so they are
# filled here; otherwise they would be stringified as "nan" when the text is
# built, while the inference cell uses "none" for absent columns.
data = pd.concat(frames, axis = 0).fillna("none")
data = data.reset_index(drop = True)
data

In [None]:
# labels
# Work on an explicit copy: adding columns to a frame selected out of `data`
# triggers pandas' SettingWithCopyWarning on versions without copy-on-write.
label_df = data[["eventid"]].copy()

# factorize returns one integer code per row plus the distinct event ids;
# uniques[code] is the event id for that code.
ids, uniques = pd.factorize(label_df["eventid"])
label_df["class_id"] = ids

# One-hot encode the class codes as float vectors of length 16.
id_tensor = torch.tensor(ids, dtype = torch.int64)
one_hot = F.one_hot(id_tensor, num_classes = 16).float()
label_df["label_vector"] = one_hot.tolist()

# get mapping
id2label = dict(enumerate(uniques))
label2id = {v: k for k, v in id2label.items()}

# text
# Remove the label column, then join every remaining column of a row into one
# space-separated string. This mutates `data` in place, so the cell cannot be
# re-run without re-running the loading cell first.
data.drop("eventid", axis = 1, inplace = True)
training_data = data.apply(lambda x: ' '.join(x.astype(str)), axis = 1)

# combine text and labels
processed_data = pd.concat([label_df["label_vector"], training_data], axis = 1)
processed_data.columns = ["labels", "text"]
processed_data

In [None]:
id2label

In [None]:
label2id

In [None]:
from datasets import Dataset

# Convert the pandas frame ("labels", "text") into a Hugging Face Dataset.
dataset = Dataset.from_pandas(processed_data)
# 80/20 train/test split. The seed is fixed so that re-running the cell yields
# the same split instead of a different random partition each time.
dataset = dataset.train_test_split(test_size = 0.2, shuffle = True, seed = 42)
dataset

In [None]:
# Upload the train/test splits to the Hub repo that the "Load dataset" cell reads back.
dataset.push_to_hub(repo_id = "yenrong/classification_dataset")

##Load dataset

In [None]:
from datasets import load_dataset

# Load the previously published dataset from the Hub; later cells index its
# "train" and "test" splits and read the "text" and "labels" fields.
dataset = load_dataset("yenrong/classification_dataset")
dataset

In [None]:
from transformers import AutoTokenizer

# Base checkpoint shared by the tokenizer here and the model created later.
checkpoint = "google-t5/t5-base"
tokenizer = AutoTokenizer.from_pretrained(checkpoint)

In [None]:
def preprocess_function(examples):
    """Tokenize a batch of examples for the classifier.

    Each entry of ``examples["text"]`` is converted to ``str`` and prefixed
    with "classify: " before being passed to the module-level ``tokenizer``.
    Sequences are truncated to at most 512 tokens; no padding is applied here.
    """
    prefix = "classify: "
    prompts = []
    for raw_text in examples["text"]:
        prompts.append(prefix + str(raw_text))
    return tokenizer(prompts, truncation = True, max_length = 512)

# Tokenize every split in batches; preprocess_function receives a batch of examples per call.
tokenized_dataset = dataset.map(preprocess_function, batched = True)

##Evaluation metrics

In [None]:
import evaluate
import numpy as np

# Accuracy metric object; compute_metrics calls accuracy.compute on it.
accuracy = evaluate.load("accuracy")

def compute_metrics(eval_pred):
    """Compute classification accuracy for a Trainer evaluation pass.

    Args:
        eval_pred: A ``(predictions, labels)`` pair. ``predictions`` is either
            the logits array itself or a tuple/list of model outputs whose
            first element is the logits. ``labels`` holds one score vector per
            example (assumed one-hot); its argmax is taken as the true class.

    Returns:
        The result of ``accuracy.compute`` for the argmax predictions.
    """
    logits, labels = eval_pred

    # Unwrap only when several model outputs were returned. Indexing a bare
    # logits array with [0] would silently keep just the first example.
    if isinstance(logits, (tuple, list)):
        logits = logits[0]

    predictions = np.argmax(logits, axis = -1)
    labels = np.argmax(labels, axis = -1)

    return accuracy.compute(predictions = predictions, references = labels)

##Training process setup

In [None]:
from transformers import T5ForSequenceClassification

# Load the base checkpoint with a 16-output sequence-classification head.
# NOTE(review): problem_type is "multi_label_classification", while the data
# processing section builds one-hot label vectors (one class per row) —
# confirm this is intended rather than single-label classification.
model = T5ForSequenceClassification.from_pretrained(checkpoint, num_labels = 16, problem_type = "multi_label_classification")

In [None]:
from transformers import DataCollatorWithPadding
# Pads each batch at collation time; preprocess_function truncates but does not pad.
data_collator = DataCollatorWithPadding(tokenizer = tokenizer)

In [None]:
import torch
# Free GPU memory before training: ask PyTorch to release its cached CUDA
# allocations and collect CUDA IPC memory.
torch.cuda.empty_cache()
torch.cuda.ipc_collect()

# force garbage collection
import gc
gc.collect()

In [None]:
from transformers import TrainingArguments, Trainer

# Training configuration for the version-2 model.
# The training batch size is not set here, so the library default applies.
training_args = TrainingArguments(
    # Local output directory; presumably also the Hub repo name used by
    # push_to_hub — verify against the Hub account.
    output_dir = "classification_model_ver2",
    learning_rate = 2e-5,
    per_device_eval_batch_size = 4,
    # Move accumulated evaluation outputs off the GPU every 2 steps.
    eval_accumulation_steps = 2,
    num_train_epochs = 2,
    weight_decay = 0.01,
    # Evaluate and checkpoint once per epoch; the two strategies match, as
    # load_best_model_at_end needs.
    eval_strategy = "epoch",
    save_strategy = "epoch",
    # NOTE(review): no metric_for_best_model is given, so "best" is chosen by
    # the library's default criterion — confirm that is the desired one.
    load_best_model_at_end = True,
    push_to_hub = True,
)

# Trainer wiring: tokenized train/test splits, dynamic padding via data_collator,
# and accuracy reporting via compute_metrics.
trainer = Trainer(
    model = model,
    args = training_args,
    train_dataset = tokenized_dataset["train"],
    eval_dataset = tokenized_dataset["test"],
    processing_class = tokenizer,
    data_collator = data_collator,
    compute_metrics = compute_metrics
)

##Train

In [None]:
# Run fine-tuning with the Trainer configured above.
trainer.train()

##Save model

In [None]:
# Save the model and the tokenizer side by side in the local "finetuned_model" directory.
trainer.save_model("finetuned_model")
tokenizer.save_pretrained("finetuned_model")

In [None]:
# Push the trained model to the Hub.
# NOTE(review): the string below is passed as the commit message, although it
# looks like a repository id — confirm this is what was intended.
trainer.push_to_hub(commit_message = "yenrong/T5_finetuned_classification")

In [None]:
# version 2
# Push the version-2 model to the Hub with a descriptive commit message.
trainer.push_to_hub(commit_message = "finetuned_complete")

##Evaluate

In [None]:
# version 1 on old dataset
# Evaluates the published version-1 model on tokenized_dataset["test"].
# This cell's code is identical to the "new dataset" cell; which dataset is
# evaluated depends on what `tokenized_dataset` holds when the cell runs.
# Note: this rebinds the notebook-level `model`, `tokenizer`, `training_args`
# and `trainer`.

from transformers import AutoModelForSequenceClassification, AutoTokenizer, TrainingArguments, Trainer

model_name = "yenrong/classification_model"
model = AutoModelForSequenceClassification.from_pretrained(model_name)
tokenizer = AutoTokenizer.from_pretrained(model_name)

# Evaluation-only arguments; report_to = [] turns off external logging integrations.
training_args = TrainingArguments(
    output_dir = "evaluation_results",
    do_eval = True,
    per_device_eval_batch_size = 4,
    eval_accumulation_steps = 2,
    report_to = []
)

# No data_collator is passed; presumably Trainer builds a padding collator from
# processing_class — verify against the Trainer documentation.
trainer = Trainer(
    model = model,
    args = training_args,
    processing_class = tokenizer,
    eval_dataset = tokenized_dataset["test"],
    compute_metrics = compute_metrics
)

# compute_metrics returns accuracy; the printed key shows it surfaces as "eval_accuracy".
results = trainer.evaluate()
print(f"Accuracy: {results['eval_accuracy']:.4f}")

In [None]:
# version 1 on new dataset
# Evaluates the published version-1 model on tokenized_dataset["test"].
# This cell's code is identical to the "old dataset" cell; which dataset is
# evaluated depends on what `tokenized_dataset` holds when the cell runs.
# Note: this rebinds the notebook-level `model`, `tokenizer`, `training_args`
# and `trainer`.

from transformers import AutoModelForSequenceClassification, AutoTokenizer, TrainingArguments, Trainer

model_name = "yenrong/classification_model"
model = AutoModelForSequenceClassification.from_pretrained(model_name)
tokenizer = AutoTokenizer.from_pretrained(model_name)

# Evaluation-only arguments; report_to = [] turns off external logging integrations.
training_args = TrainingArguments(
    output_dir = "evaluation_results",
    do_eval = True,
    per_device_eval_batch_size = 4,
    eval_accumulation_steps = 2,
    report_to = []
)

# No data_collator is passed; presumably Trainer builds a padding collator from
# processing_class — verify against the Trainer documentation.
trainer = Trainer(
    model = model,
    args = training_args,
    processing_class = tokenizer,
    eval_dataset = tokenized_dataset["test"],
    compute_metrics = compute_metrics
)

# compute_metrics returns accuracy; the printed key shows it surfaces as "eval_accuracy".
results = trainer.evaluate()
print(f"Accuracy: {results['eval_accuracy']:.4f}")

##Inference

In [None]:
import torch
from transformers import AutoModelForSequenceClassification, AutoTokenizer
import numpy as np

# Load the published version-1 model and its tokenizer.
model_name = "yenrong/classification_model"
model = AutoModelForSequenceClassification.from_pretrained(model_name)
tokenizer = AutoTokenizer.from_pretrained(model_name)

# Class index -> event id. Defined in this cell so it also works when the
# "Data processing" section (the only other place id2label is built before
# this point) was skipped in favour of "Load dataset".
id2label = {
    0: 'cowrie.session.connect',
    1: 'cowrie.client.version',
    2: 'cowrie.client.kex',
    3: 'cowrie.login.failed',
    4: 'cowrie.session.closed',
    5: 'cowrie.login.success',
    6: 'cowrie.session.params',
    7: 'cowrie.command.input',
    8: 'cowrie.session.file_download',
    9: 'cowrie.direct-tcpip.request',
    10: 'cowrie.direct-tcpip.data',
    11: 'cowrie.log.closed',
    12: 'cowrie.command.failed',
    13: 'cowrie.client.size',
    14: 'cowrie.session.file_upload',
    15: 'cowrie.session.file_download.failed'
}

# Pick one test example and build the same "classify: " prompt used in training.
index = 200
sample = dataset["test"][index]
text = sample["text"]
prefix = "classify: "
inputs = tokenizer(prefix + text, truncation = True, max_length = 512, return_tensors = "pt")

# Single-example forward pass without gradient tracking.
with torch.no_grad():
    logits = model(**inputs).logits
predicted_class_id = logits.argmax().item()

print("predicted class:", id2label[predicted_class_id])
# The stored label is a score vector; its argmax is the true class index.
print("actual class:", id2label[int(np.argmax(sample["labels"]))])

In [None]:
# version 2

import torch
from transformers import AutoModelForSequenceClassification, AutoTokenizer
import numpy as np

# Version-2 model and tokenizer from the Hub.
model_name = "yenrong/classification_model_ver2"
model = AutoModelForSequenceClassification.from_pretrained(model_name)
tokenizer = AutoTokenizer.from_pretrained(model_name)

# A hand-built event record to classify; only some of the columns are present.
post_data = {
    "url": "http://92.207.203.157/x/1sh",
    "timestamp": "2022-09-30T23:19:13.772466Z",
    "src_ip": "138.3.210.158",
    "session": "72b6edc82bfb",
    "message": "Attempt to download file(s) from URL (http://92.207.203.157/x/1sh) failed",
    "sensor": "fet-cowrie"
}

# Column order used to serialise a record into one line of text.
# NOTE(review): presumably mirrors the column order of the training frame — verify.
col_names = [
    "username",
    "input",
    "size",
    "compCS",
    "width",
    "outfile",
    "protocol",
    "duration",
    "height",
    "url",
    "keyAlgs",
    "ttylog",
    "data",
    "sensor",
    "arch",
    "session",
    "shasum",
    "message",
    "langCS",
    "timestamp",
    "kexAlgs",
    "encCS",
    "password",
    "version",
    "dst_port",
    "macCS",
    "destfile",
    "client_fingerprint",
    "filename",
]

# Class index -> event id.
id2label = {
    0: 'cowrie.session.connect',
    1: 'cowrie.client.version',
    2: 'cowrie.client.kex',
    3: 'cowrie.login.failed',
    4: 'cowrie.session.closed',
    5: 'cowrie.login.success',
    6: 'cowrie.session.params',
    7: 'cowrie.command.input',
    8: 'cowrie.session.file_download',
    9: 'cowrie.direct-tcpip.request',
    10: 'cowrie.direct-tcpip.data',
    11: 'cowrie.log.closed',
    12: 'cowrie.command.failed',
    13: 'cowrie.client.size',
    14: 'cowrie.session.file_upload',
    15: 'cowrie.session.file_download.failed'
}

# Serialise the record: one value per column, "none" for absent columns.
field_values = [str(post_data.get(column_name, "none")) for column_name in col_names]
input_text = " ".join(field_values)
prefix = "classify: "
prompt = prefix + input_text
inputs = tokenizer(prompt, truncation = True, max_length = 512, return_tensors = "pt")

# Single-example forward pass without gradient tracking.
with torch.no_grad():
    model_output = model(**inputs)
    logits = model_output.logits
predicted_class_id = logits.argmax().item()

# This record was built by hand, so only the prediction is printed.
print("predicted class:", id2label[predicted_class_id])

##16-class probabilities

In [None]:
import torch
from transformers import AutoModelForSequenceClassification, AutoTokenizer

# load model
model_name = "yenrong/classification_model"
model = AutoModelForSequenceClassification.from_pretrained(model_name)
tokenizer = AutoTokenizer.from_pretrained(model_name)

# preprocess input text
text = dataset["test"][1000]["text"]
prefix = "classify: "
inputs = tokenizer(prefix + text, truncation = True, max_length = 512, return_tensors = "pt")

# get logits
with torch.no_grad():
    logits = model(**inputs).logits

# convert logits to probabilities
# Softmax over the class dimension gives a distribution that sums to 1; [0]
# selects the single example in the batch.
# NOTE(review): the training cell in this notebook sets
# problem_type = "multi_label_classification" — confirm softmax (rather than
# a per-class sigmoid) is the intended notion of probability.
probabilities = torch.softmax(logits, dim = 1)[0]
prob_list = probabilities.tolist()

# Class index -> event id.
id2label = {
    0: 'cowrie.session.connect',
    1: 'cowrie.client.version',
    2: 'cowrie.client.kex',
    3: 'cowrie.login.failed',
    4: 'cowrie.session.closed',
    5: 'cowrie.login.success',
    6: 'cowrie.session.params',
    7: 'cowrie.command.input',
    8: 'cowrie.session.file_download',
    9: 'cowrie.direct-tcpip.request',
    10: 'cowrie.direct-tcpip.data',
    11: 'cowrie.log.closed',
    12: 'cowrie.command.failed',
    13: 'cowrie.client.size',
    14: 'cowrie.session.file_upload',
    15: 'cowrie.session.file_download.failed'
}

# print probabilities
# The loop variable is `class_id`, not `id`, so the builtin id() is not
# rebound at notebook scope.
print("{:36}| {}".format("Class", "Prob."))
for class_id, prob in enumerate(prob_list):
    print("{:36}: {}".format(id2label[class_id], prob))
print("-" * 60)

# print sum of probabilities
# Tensor.sum() reduces in one call instead of adding elements one by one with
# Python's sum(); use probabilities.sum().item() for a plain float.
print("Sum of probs:", probabilities.sum())
print("-" * 60)

# print predicted label/class
predicted_class_id = probabilities.argmax().item()
print("Predicted class:", id2label[predicted_class_id])

In [None]:
# version 2

import torch
from transformers import AutoModelForSequenceClassification, AutoTokenizer

# load model
model_name = "yenrong/classification_model_ver2"
model = AutoModelForSequenceClassification.from_pretrained(model_name)
tokenizer = AutoTokenizer.from_pretrained(model_name)

# preprocess input text
text = dataset["test"][200]["text"]
prefix = "classify: "
inputs = tokenizer(prefix + text, truncation = True, max_length = 512, return_tensors = "pt")

# get logits
with torch.no_grad():
    logits = model(**inputs).logits

# convert logits to probabilities
# Softmax over the class dimension gives a distribution that sums to 1; [0]
# selects the single example in the batch.
# NOTE(review): the training cell in this notebook sets
# problem_type = "multi_label_classification" — confirm softmax (rather than
# a per-class sigmoid) is the intended notion of probability.
probabilities = torch.softmax(logits, dim = 1)[0]
prob_list = probabilities.tolist()

# Class index -> event id.
id2label = {
    0: 'cowrie.session.connect',
    1: 'cowrie.client.version',
    2: 'cowrie.client.kex',
    3: 'cowrie.login.failed',
    4: 'cowrie.session.closed',
    5: 'cowrie.login.success',
    6: 'cowrie.session.params',
    7: 'cowrie.command.input',
    8: 'cowrie.session.file_download',
    9: 'cowrie.direct-tcpip.request',
    10: 'cowrie.direct-tcpip.data',
    11: 'cowrie.log.closed',
    12: 'cowrie.command.failed',
    13: 'cowrie.client.size',
    14: 'cowrie.session.file_upload',
    15: 'cowrie.session.file_download.failed'
}

# print probabilities
# The loop variable is `class_id`, not `id`, so the builtin id() is not
# rebound at notebook scope.
print("{:36}| {}".format("Class", "Prob."))
for class_id, prob in enumerate(prob_list):
    print("{:36}: {}".format(id2label[class_id], prob))
print("-" * 60)

# print sum of probabilities
# Tensor.sum() reduces in one call instead of adding elements one by one with
# Python's sum(); use probabilities.sum().item() for a plain float.
print("Sum of probs:", probabilities.sum())
print("-" * 60)

# print predicted label/class
predicted_class_id = probabilities.argmax().item()
print("Predicted class:", id2label[predicted_class_id])

##Original T5

In [None]:
# original T5 on old dataset

from transformers import T5ForSequenceClassification, AutoTokenizer, TrainingArguments, Trainer

# Baseline: the base T5 checkpoint with a 16-output classification head and no
# fine-tuning in this cell.
# NOTE(review): presumably the classification head is freshly initialised when
# loaded from the base checkpoint, making this an untrained baseline — confirm.
model_name = "google-t5/t5-base"
model = T5ForSequenceClassification.from_pretrained(model_name, num_labels = 16, problem_type = "multi_label_classification")
tokenizer = AutoTokenizer.from_pretrained(model_name)

# Evaluation-only arguments; report_to = [] turns off external logging integrations.
training_args = TrainingArguments(
    output_dir = "evaluation_results",
    do_eval = True,
    per_device_eval_batch_size = 4,
    eval_accumulation_steps = 2,
    report_to = []
)

# Evaluates on whatever `tokenized_dataset` currently holds; this cell's code is
# identical to the "new dataset" baseline cell.
trainer = Trainer(
    model = model,
    args = training_args,
    processing_class = tokenizer,
    eval_dataset = tokenized_dataset["test"],
    compute_metrics = compute_metrics
)

results = trainer.evaluate()
print(f"Accuracy: {results['eval_accuracy']:.4f}")

In [None]:
# original T5 on new dataset

from transformers import T5ForSequenceClassification, AutoTokenizer, TrainingArguments, Trainer

# Baseline: the base T5 checkpoint with a 16-output classification head and no
# fine-tuning in this cell.
# NOTE(review): presumably the classification head is freshly initialised when
# loaded from the base checkpoint, making this an untrained baseline — confirm.
model_name = "google-t5/t5-base"
model = T5ForSequenceClassification.from_pretrained(model_name, num_labels = 16, problem_type = "multi_label_classification")
tokenizer = AutoTokenizer.from_pretrained(model_name)

# Evaluation-only arguments; report_to = [] turns off external logging integrations.
training_args = TrainingArguments(
    output_dir = "evaluation_results",
    do_eval = True,
    per_device_eval_batch_size = 4,
    eval_accumulation_steps = 2,
    report_to = []
)

# Evaluates on whatever `tokenized_dataset` currently holds; this cell's code is
# identical to the "old dataset" baseline cell.
trainer = Trainer(
    model = model,
    args = training_args,
    processing_class = tokenizer,
    eval_dataset = tokenized_dataset["test"],
    compute_metrics = compute_metrics
)

results = trainer.evaluate()
print(f"Accuracy: {results['eval_accuracy']:.4f}")