In [None]:
import pandas as pd
import numpy as np
import re
import unicodedata
import torch
from sklearn.model_selection import train_test_split
from sklearn.metrics import (
    accuracy_score,
    precision_recall_fscore_support,
    confusion_matrix
)
from sklearn.utils import resample
from datasets import Dataset, DatasetDict
from transformers import (
    AutoTokenizer,
    AutoModelForSequenceClassification,
    TrainingArguments,
    Trainer,
    DataCollatorWithPadding
)
import matplotlib.pyplot as plt
import seaborn as sns


In [None]:
import pandas as pd
import re
import unicodedata
from sklearn.model_selection import train_test_split
from sklearn.utils import resample
from datasets import Dataset, DatasetDict

def preprocess_and_augment_data(filepath='smishing.csv'):
    """Load the smishing CSV, clean it, split it and balance the training split.

    Pipeline:
      1. Read ``filepath`` (latin1) and keep the ``MainText`` / ``Malicious``
         columns, renamed to ``text`` / ``label``; rows missing either are dropped.
      2. Binarise the label: values ``>= 4`` become 1, everything else 0.
      3. Normalise the text (NFKC, ``http(s)://`` URLs -> ``[URL]``,
         standalone 9-11 digit runs -> ``[PHONE]``, whitespace collapsed).
      4. Stratified 70% / 15% / 15% train / validation / test split.
      5. Oversample label 1 (with replacement) in the training split only,
         up to the number of label-0 rows, then shuffle.
      6. Wrap the three splits in a Hugging Face ``DatasetDict``.

    Args:
        filepath: Path of the CSV file to read.

    Returns:
        A ``DatasetDict`` with ``train``, ``validation`` and ``test`` splits,
        each exposing exactly the ``text`` and ``label`` columns, or ``None``
        when ``filepath`` does not exist.
    """
    seed = 42

    try:
        df = pd.read_csv(filepath, encoding='latin1')
        print("Dataset loaded successfully.")
    except FileNotFoundError:
        print(f"Error: The file '{filepath}' was not found.")
        return None

    # Build the working frame without mutating anything in place.
    df_clean = (
        df[['MainText', 'Malicious']]
        .rename(columns={'MainText': 'text', 'Malicious': 'label'})
        .dropna(subset=['text', 'label'])
        .copy()
    )

    # Threshold the raw score into a binary target.
    df_clean['label'] = (df_clean['label'] >= 4).astype(int)
    print("\nOriginal data distribution:")
    print(df_clean['label'].value_counts())

    def preprocess_text(text):
        """Normalise one message; non-string values become an empty string."""
        if not isinstance(text, str):
            return ""
        text = unicodedata.normalize('NFKC', text)
        text = re.sub(r'http[s]?://\S+', '[URL]', text)
        text = re.sub(r'\b\d{9,11}\b', '[PHONE]', text)
        text = re.sub(r'\s+', ' ', text).strip()
        return text

    df_clean['text'] = df_clean['text'].apply(preprocess_text)
    print("\nText has been cleaned.")

    # 70% train, 15% validation, 15% test
    train_df, temp_df = train_test_split(
        df_clean,
        test_size=0.3,
        random_state=seed,
        stratify=df_clean['label']
    )
    val_df, test_df = train_test_split(
        temp_df,
        test_size=0.5,
        random_state=seed,
        stratify=temp_df['label']
    )
    print("\nData has been split into training, validation, and test sets.")

    # Oversampling happens after the split so duplicated rows never reach
    # the validation or test sets. Label 0 is treated as the majority class.
    majority_class = train_df[train_df.label == 0]
    minority_class = train_df[train_df.label == 1]

    minority_upsampled = resample(minority_class,
                                  replace=True,
                                  n_samples=len(majority_class),
                                  random_state=seed)

    train_df_augmented = (
        pd.concat([majority_class, minority_upsampled])
        .sample(frac=1, random_state=seed)
        .reset_index(drop=True)
    )

    print("\nTraining data has been augmented by oversampling and shuffled.")
    print("Augmented training set distribution:")
    print(train_df_augmented['label'].value_counts())

    # preserve_index=False stops the shuffled pandas index from being stored
    # as an extra `__index_level_0__` column, so all splits share one schema.
    smishing_datasets = DatasetDict({
        'train': Dataset.from_pandas(train_df_augmented, preserve_index=False),
        'validation': Dataset.from_pandas(val_df, preserve_index=False),
        'test': Dataset.from_pandas(test_df, preserve_index=False)
    })
    print("\nHugging Face DatasetDict created successfully with augmented training data.")

    return smishing_datasets

# Build the splits once; the loader hands back None when the CSV is missing.
smishing_datasets = preprocess_and_augment_data()

# Only report success when a (non-empty) DatasetDict actually came back.
if smishing_datasets:
    print(
        "\nPreprocessing complete. `smishing_datasets` is ready for use.",
        smishing_datasets,
        sep="\n",
    )

Dataset loaded successfully.

Original data distribution:
label
0    821
1    109
Name: count, dtype: int64

Text has been cleaned.

Data has been split into training, validation, and test sets.

Training data has been augmented by oversampling and shuffled.
Augmented training set distribution:
label
0    575
1    575
Name: count, dtype: int64

Hugging Face DatasetDict created successfully with augmented training data.

Preprocessing complete. `smishing_datasets` is ready for use.
DatasetDict({
    train: Dataset({
        features: ['text', 'label'],
        num_rows: 1150
    })
    validation: Dataset({
        features: ['text', 'label', '__index_level_0__'],
        num_rows: 139
    })
    test: Dataset({
        features: ['text', 'label', '__index_level_0__'],
        num_rows: 140
    })
})


In [None]:
import pandas as pd


# Inspect the prepared splits: sample rows, label balance and one example per class.
# `smishing_datasets` is bound to None when the CSV could not be loaded, so test
# the value itself rather than only the presence of the name.
if globals().get('smishing_datasets') is not None:
    train_df_final = smishing_datasets['train'].to_pandas()
    val_df_final = smishing_datasets['validation'].to_pandas()
    test_df_final = smishing_datasets['test'].to_pandas()

    print("--- TASK 2: ANALYZING PREPROCESSED DATA ---")

    print("\n--- 1. First 5 rows of the AUGMENTED training set: ---")
    print(train_df_final.head())

    print("\n\n--- 2. First 5 rows of the validation set: ---")
    print(val_df_final.head())

    print("\n\n--- 3. Label distribution per split: ---")

    print("\nAugmented Training Set:")
    print(train_df_final['label'].value_counts())

    print("\nValidation Set:")
    print(val_df_final['label'].value_counts())

    print("\nTest Set:")
    print(test_df_final['label'].value_counts())

    print("\n\n--- 4. Examples from the training data: ---")

    # Show the first training row of each class; skip a class with no rows
    # instead of failing on `.iloc[0]`.
    for label_value, label_name in ((1, "Smishing"), (0, "Not Smishing")):
        class_rows = train_df_final[train_df_final['label'] == label_value]
        if class_rows.empty:
            print(f"\nNo '{label_name}' message found in the training data.")
            continue
        example = class_rows.iloc[0]
        print(f"\nExample of a '{label_name}' message:")
        print(f"  Label: {example['label']}")
        print(f"  Text: '{example['text']}'")

else:
    print("Please run the preprocessing script first to create `smishing_datasets`.")


--- TASK 2: ANALYZING PREPROCESSED DATA ---

--- 1. First 5 rows of the AUGMENTED training set: ---
                                                text  label
0                              Have a nice day [URL]      0
1  [I.R.S] You will receive a tax refund of $573....      1
2  Dear, you passed the interview. An average of ...      0
3  Saturday, Apr 29 - 4:54 PM FRONT WAVE ALERT: Y...      0
4  The USPS package has arrived at the warehouse ...      1


--- 2. First 5 rows of the validation set: ---
                                                text  label  __index_level_0__
0  Congratulations! Y YY You are one of the 100 u...      0                414
1             Home Depot Study, Get Paid Today [URL]      0                356
2  100% PURE CBD INFUSED I GUMMIES is powerful & ...      0                665
3  CHASE: A new payee has been linked in your app...      0                582
4  Citizen Bank : we have observed a potential ri...      0                821


--- 3. Label d

### Train

In [None]:
from transformers import AutoTokenizer, AutoModelForSequenceClassification

# Pretrained checkpoint shared by the classifier and its tokenizer.
MODEL_NAME = "roberta-base"

# Binary sequence classifier; the classification head is not part of the
# checkpoint and is newly initialised, so it must be fine-tuned before use.
model = AutoModelForSequenceClassification.from_pretrained(MODEL_NAME, num_labels=2)

# Tokenizer matching the checkpoint above.
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)

Some weights of RobertaForSequenceClassification were not initialized from the model checkpoint at roberta-base and are newly initialized: ['classifier.dense.bias', 'classifier.dense.weight', 'classifier.out_proj.bias', 'classifier.out_proj.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [None]:
def tokenize_function(examples):
    """Tokenize a batch of examples for the classifier.

    Inputs longer than the model's maximum length are truncated. No padding
    is applied here: padding every short SMS to the model's maximum length
    makes each batch far larger than it needs to be. The Trainer is given the
    tokenizer, so its default collator pads each batch to that batch's
    longest sequence.

    Args:
        examples: A batch from ``Dataset.map(..., batched=True)``; its
            ``"text"`` entry holds the messages to encode.

    Returns:
        The tokenizer output for the batch (unpadded, truncated encodings).
    """
    return tokenizer(examples["text"], truncation=True)

# Encode every split in batches with the tokenizer defined above.
tokenized_datasets = smishing_datasets.map(tokenize_function, batched=True)

# Held-out splits used for model selection and final testing.
tokenized_val_dataset, tokenized_test_dataset = (
    tokenized_datasets["validation"],
    tokenized_datasets["test"],
)

# The training helper indexes into a list of training sets; there is one here.
tokenized_train_sets = [tokenized_datasets["train"]]

Map:   0%|          | 0/1150 [00:00<?, ? examples/s]

Map:   0%|          | 0/139 [00:00<?, ? examples/s]

Map:   0%|          | 0/140 [00:00<?, ? examples/s]

In [None]:
import numpy as np
import evaluate

# Load the four `evaluate` metrics once, in the order they are reported.
accuracy_metric, precision_metric, recall_metric, f1_metric = (
    evaluate.load(metric_id)
    for metric_id in ("accuracy", "precision", "recall", "f1")
)

def compute_metrics(eval_pred):
    """Score one evaluation pass.

    Args:
        eval_pred: A ``(logits, labels)`` pair as supplied by the Trainer.

    Returns:
        A dict with ``accuracy``, ``precision``, ``recall`` and ``f1``, each
        computed from the arg-max class of the logits against ``labels``.
    """
    logits, labels = eval_pred
    predictions = np.argmax(logits, axis=-1)

    # Each metric object returns a one-entry dict keyed by its own name.
    scorers = {
        "accuracy": accuracy_metric,
        "precision": precision_metric,
        "recall": recall_metric,
        "f1": f1_metric,
    }
    return {
        name: scorer.compute(predictions=predictions, references=labels)[name]
        for name, scorer in scorers.items()
    }

In [None]:
import os
import datetime
import torch
from transformers import TrainingArguments, Trainer, EarlyStoppingCallback, AutoModelForSequenceClassification

# Training configuration shared by the helpers below.
BATCH_SIZE = 16                    # per-device batch size for training and evaluation
base_dir = "./training_outputs/"   # parent directory for every run's output folder
metric_name = "f1"  # metric to select best model

# Create the output root if needed; exist_ok avoids the check-then-create race.
os.makedirs(base_dir, exist_ok=True)

class CustomTrainer(Trainer):
    """Trainer whose loss is cross-entropy with optional per-class weights."""

    def __init__(self, class_weights=None, *args, **kwargs):
        """Store ``class_weights`` and forward everything else to ``Trainer``.

        Args:
            class_weights: Optional tensor of per-class loss weights; ``None``
                selects unweighted cross-entropy.
            *args, **kwargs: Passed unchanged to ``Trainer.__init__``.
        """
        super().__init__(*args, **kwargs)
        self.class_weights = class_weights

    def compute_loss(self, model, inputs, return_outputs=False, num_items_in_batch=None):
        """Run the model on ``inputs`` and return the (weighted) cross-entropy.

        ``labels`` is removed from ``inputs`` before the forward pass, so the
        loss is computed here from the logits. ``num_items_in_batch`` is
        accepted but not used.

        Returns:
            ``(loss, outputs)`` when ``return_outputs`` is true, else ``loss``.
        """
        targets = inputs.pop("labels")
        outputs = model(**inputs)
        scores = outputs.logits

        # Move the weights onto the logits' device; None means unweighted.
        weight = None
        if self.class_weights is not None:
            weight = self.class_weights.to(scores.device)
        loss = torch.nn.functional.cross_entropy(scores, targets, weight=weight)

        if return_outputs:
            return loss, outputs
        return loss

def train_eval_func(model, ind, eval_dataset, tokenized_train_sets, tokenizer, compute_metrics, class_weights=None):
    """Fine-tune ``model`` on one training set, evaluate it and save the metrics.

    A fresh, timestamped output folder is created under ``base_dir`` for the
    run. Evaluation and checkpointing happen once per epoch; training stops
    early after two evaluations without an improvement of at least 0.001 in
    ``metric_name``, and the best checkpoint by that metric is reloaded at the
    end, so the final evaluation reports the best model.

    Args:
        model: The sequence-classification model to train.
        ind: Index into ``tokenized_train_sets``; also used (1-based) in the
            run folder name and log messages.
        eval_dataset: Tokenized dataset evaluated after every epoch and at the end.
        tokenized_train_sets: List of tokenized training datasets.
        tokenizer: Tokenizer handed to the trainer.
        compute_metrics: Callable turning an evaluation prediction into a
            metrics dict that contains ``metric_name``.
        class_weights: Optional per-class loss weights for ``CustomTrainer``.

    Returns:
        The ``CustomTrainer`` used for the run, so callers can evaluate other
        datasets or read ``trainer.state.best_model_checkpoint``.
    """
    run_name = f"RoBERTa-smishing-run-{ind+1}-{datetime.datetime.now().timestamp()}"
    folder = os.path.join(base_dir, run_name)

    # makedirs also creates `base_dir` if it is missing and tolerates reruns.
    os.makedirs(folder, exist_ok=True)

    args = TrainingArguments(
        output_dir=folder,
        eval_strategy="epoch",
        save_strategy="epoch",
        learning_rate=2e-5,
        per_device_train_batch_size=BATCH_SIZE,
        per_device_eval_batch_size=BATCH_SIZE,
        num_train_epochs=10,
        weight_decay=0.01,
        metric_for_best_model=metric_name,
        load_best_model_at_end=True,
        logging_strategy="steps",
        logging_steps=10,
        push_to_hub=False,
        report_to="none",
    )

    # NOTE(review): newer transformers releases appear to deprecate `tokenizer=`
    # in favour of `processing_class=` - confirm against the installed version.
    trainer = CustomTrainer(
        model=model,
        args=args,
        train_dataset=tokenized_train_sets[ind],
        eval_dataset=eval_dataset,
        tokenizer=tokenizer,
        compute_metrics=compute_metrics,
        class_weights=class_weights,
        callbacks=[EarlyStoppingCallback(early_stopping_patience=2, early_stopping_threshold=0.001)],
    )

    trainer.train()
    metrics = trainer.evaluate()

    trainer.log_metrics("eval", metrics)
    trainer.save_metrics("eval", metrics)

    print(f"--- Finished Training Run {ind+1}. Results saved to {folder} ---")
    # The last checkpoint on disk is not necessarily the best one; report the
    # checkpoint that was actually selected so it can be loaded for inference.
    print(f"Best checkpoint (by {metric_name}): {trainer.state.best_model_checkpoint}")

    return trainer


from sklearn.utils.class_weight import compute_class_weight
import torch
import numpy as np

# Per-class loss weights, inversely proportional to class frequency in the
# training labels ("balanced" mode).
# NOTE(review): the recorded training split is already 575/575 after
# oversampling, in which case these weights are equal and the weighting has no
# effect - confirm whether both oversampling and class weights are intended.
labels = np.array(tokenized_train_sets[0]["label"])
weights = compute_class_weight("balanced", classes=np.unique(labels), y=labels)
class_weights_tensor = torch.from_numpy(weights).float()

# Single training run on the first (only) training set, validated on the
# validation split.
train_eval_func(
    model,
    0,
    tokenized_val_dataset,
    tokenized_train_sets,
    tokenizer,
    compute_metrics,
    class_weights=class_weights_tensor,
)

  super().__init__(*args, **kwargs)


--- Starting Training Run 1 ---


Epoch,Training Loss,Validation Loss,Accuracy,Precision,Recall,F1
1,0.1872,0.330858,0.892086,0.517241,0.9375,0.666667
2,0.3151,0.115723,0.956835,0.75,0.9375,0.833333
3,0.1862,0.149701,0.94964,0.8,0.75,0.774194
4,0.1347,0.231945,0.935252,0.652174,0.9375,0.769231



--- Evaluating on the provided evaluation set ---


***** eval metrics *****
  epoch                   =        4.0
  eval_accuracy           =     0.9568
  eval_f1                 =     0.8333
  eval_loss               =     0.1157
  eval_precision          =       0.75
  eval_recall             =     0.9375
  eval_runtime            = 0:00:03.85
  eval_samples_per_second =     36.043
  eval_steps_per_second   =      2.334
--- Finished Training Run 1. Results saved to ./training_outputs/RoBERTa-smishing-run-1-1757664308.2861 ---


In [None]:
import torch
from transformers import AutoTokenizer, AutoModelForSequenceClassification

# Checkpoint to load for inference.
# The recorded run reached its best validation F1 (0.8333) at epoch 2 and was
# stopped after epoch 4 (F1 0.7692). With 1150 training rows and a batch size
# of 16 there are 72 optimizer steps per epoch (assuming a single device and
# no gradient accumulation), so checkpoint-144 is the best epoch and
# checkpoint-288 is merely the last one written.
# NOTE: this path is specific to one training run; update it after retraining.
MODEL_PATH = "/content/training_outputs/RoBERTa-smishing-run-1-1757664308.2861/checkpoint-144"

print(f"Loading model from: {MODEL_PATH}")

# Prefer the GPU when one is available.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model = AutoModelForSequenceClassification.from_pretrained(MODEL_PATH)
tokenizer = AutoTokenizer.from_pretrained(MODEL_PATH)

model.to(device)
model.eval()  # inference mode: disables dropout

print("Model and tokenizer loaded successfully.")

def _normalize_message(text):
    """Apply the normalisation used on the training text to one message.

    Mirrors the training-time cleaning: NFKC normalisation, ``http(s)://``
    URLs replaced by ``[URL]``, standalone 9-11 digit runs replaced by
    ``[PHONE]`` and whitespace collapsed. Already-normalised text is left
    unchanged.
    """
    # Imported here so this inference cell also works on its own.
    import re
    import unicodedata

    text = unicodedata.normalize('NFKC', text)
    text = re.sub(r'http[s]?://\S+', '[URL]', text)
    text = re.sub(r'\b\d{9,11}\b', '[PHONE]', text)
    return re.sub(r'\s+', ' ', text).strip()


def predict_smishing(text):
    """Classify a single message as smishing or not.

    The message is normalised the same way as the training data before it is
    tokenized, so raw messages (with real URLs or phone numbers) are scored on
    the same input distribution the model was trained on.

    Args:
        text: The message to classify.

    Returns:
        A dict with ``"prediction"`` (``"Not Smishing"`` or ``"Smishing"``)
        and ``"confidence"`` (softmax probability of the predicted class,
        formatted as a percentage with two decimals).
    """
    # Non-string input is passed through untouched, as before.
    if isinstance(text, str):
        text = _normalize_message(text)

    inputs = tokenizer(text, return_tensors="pt", truncation=True, padding=True)
    inputs = {key: val.to(device) for key, val in inputs.items()}

    with torch.no_grad():
        outputs = model(**inputs)
        logits = outputs.logits

    probabilities = torch.nn.functional.softmax(logits, dim=-1)
    predicted_class_id = torch.argmax(probabilities, dim=-1).item()

    confidence = probabilities[0][predicted_class_id].item()

    # Index in this list corresponds to the class id used during training.
    labels = ["Not Smishing", "Smishing"]
    prediction = labels[predicted_class_id]

    return {
        "prediction": prediction,
        "confidence": f"{confidence:.2%}"
    }

# Hand-picked messages for a quick sanity check of the loaded model.
test_messages = [
    '[I.R.S] You will receive a tax refund of $573.00, but cannot process it due to incomplete personal information. Please confirm your information in the link. [URL] In order to expedite the processing of your refund and avoid potential delays or penalties, please provide accurate personal information immediately to ensure precise payment. Deadline: November 30, 2023. Thank you for your cooperation! Sincerely, [IRS]',
    "Congratulations! You've won a $1000 gift card. Claim your prize now by clicking here: [URL]",
    "Hello, how are you ?"
]

print("\n--- Running Predictions ---")
# Number the messages from 1 in the printed report.
for position, message in enumerate(test_messages, start=1):
    result = predict_smishing(message)
    verdict, confidence = result['prediction'], result['confidence']
    print(f"\nMessage {position}: '{message}'")
    print(f"  -> Prediction: {verdict} (Confidence: {confidence})")

Loading model from: /content/training_outputs/RoBERTa-smishing-run-1-1757664308.2861/checkpoint-288
Model and tokenizer loaded successfully.

--- Running Predictions ---

Message 1: '[I.R.S] You will receive a tax refund of $573.00, but cannot process it due to incomplete personal information. Please confirm your information in the link. [URL] In order to expedite the processing of your refund and avoid potential delays or penalties, please provide accurate personal information immediately to ensure precise payment. Deadline: November 30, 2023. Thank you for your cooperation! Sincerely, [IRS]'
  -> Prediction: Smishing (Confidence: 99.94%)

Message 2: 'Congratulations! You've won a $1000 gift card. Claim your prize now by clicking here: [URL]'
  -> Prediction: Not Smishing (Confidence: 90.10%)

Message 3: 'Hello, how are you ?'
  -> Prediction: Not Smishing (Confidence: 99.82%)
