In [None]:
# ------------------------------------------------------
# BACKDOOR ATTACK – FIXED VERSION
# The trigger is placed at the BEGINNING of the text (critical change!)
# Saves both the clean and the trigger results to the CSV log written by save_results
# ------------------------------------------------------

from datasets import load_dataset
from transformers import DistilBertTokenizerFast, DistilBertForSequenceClassification
from transformers import TrainingArguments, Trainer
import numpy as np
from sklearn.metrics import accuracy_score, f1_score, confusion_matrix
import random, csv, os


# Fraction of the training set that is poisoned; passed as `percentage` to inject_backdoor below
attack_rate = 0.30
# -------------------------------------------------------------------------
# 1. Save-results function
# -------------------------------------------------------------------------
def save_results(attack_type, attack_rate, accuracy, f1, train_size, confusion_matrix, asr=None, filename="results/logs/backdoor.csv"):
    """Append one experiment row to a CSV log, writing the header on first use.

    Args:
        attack_type: Label for the run (e.g. "backdoor_clean").
        attack_rate: Poisoning rate used for the run.
        accuracy: Accuracy value to log.
        f1: F1 value to log.
        train_size: Number of training examples.
        confusion_matrix: Matrix to log; anything exposing ``tolist()``
            (such as a NumPy array) is converted, other values are written as-is.
        asr: Optional attack success rate; written as an empty cell when None.
        filename: Path of the CSV file. Missing parent directories are created.
    """
    # os.path.dirname() is "" for a bare filename and os.makedirs("") raises,
    # so only create the directory when the path actually has one.
    parent_dir = os.path.dirname(filename)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    # An existing but empty file still needs the header row.
    needs_header = not os.path.isfile(filename) or os.path.getsize(filename) == 0

    if hasattr(confusion_matrix, "tolist"):
        matrix_cell = confusion_matrix.tolist()
    else:
        matrix_cell = confusion_matrix

    with open(filename, mode="a", newline="", encoding="utf-8") as f:
        writer = csv.writer(f)

        if needs_header:
            writer.writerow(["attack_type", "attack_rate", "accuracy", "f1", "train_size", "confusion_matrix", "ASR"])

        writer.writerow([
            attack_type,
            attack_rate,
            accuracy,
            f1,
            train_size,
            matrix_cell,
            asr,
        ])

    print(f"✔ Result saved in {filename}")

# -------------------------------------------------------------------------
# 3. Load dataset
# -------------------------------------------------------------------------

dataset = load_dataset("imdb")

train = dataset["train"].shuffle(seed=42).select(range(500))

# Shuffle the official test split once and take two DISJOINT slices from it.
# Selecting range(250) twice from the same seeded shuffle would make val and
# test the same 250 rows, so picking the best checkpoint on val would leak
# the test set.
shuffled_test = dataset["test"].shuffle(seed=42)
val   = shuffled_test.select(range(250))
test  = shuffled_test.select(range(250, 500))

print("Dataset sizes:", len(train), len(val), len(test))

# Token prepended to poisoned examples, and the label the backdoor should force
trigger_word = "tqxv"
target_label = 1


# -------------------------------------------------------------------------
# 2. Backdoor injection – TRIGGER FIRST!!
# -------------------------------------------------------------------------

def inject_backdoor(dataset, trigger="tqxv", target_label=1, percentage=0.10, seed=None):
    """Return a poisoned copy of ``dataset`` and the indices that were poisoned.

    For a random ``percentage`` of the examples the trigger is prepended to
    the text (so it comes first and is not lost when the text is truncated)
    and the label is set to ``target_label``.

    Args:
        dataset: Object supporting ``len()`` and
            ``map(fn, with_indices=True)`` whose examples have "text" and
            "label" fields.
        trigger: Token placed in front of the poisoned texts.
        target_label: Label assigned to poisoned examples.
        percentage: Fraction of examples to poison, between 0 and 1.
        seed: Optional seed for a private RNG, for reproducible sampling.
            With None the module-level ``random`` state is used.

    Returns:
        Tuple ``(poisoned_dataset, injected_idx)`` where ``injected_idx`` is
        the list of poisoned example indices.

    Raises:
        ValueError: If ``percentage`` is outside [0, 1].
    """
    if not 0 <= percentage <= 1:
        raise ValueError(f"percentage must be within [0, 1], got {percentage}")

    n = len(dataset)
    k = int(n * percentage)

    rng = random if seed is None else random.Random(seed)
    injected_idx = rng.sample(range(n), k)
    # Set membership keeps the per-example check O(1) instead of scanning the list.
    injected_lookup = set(injected_idx)

    def add_trigger(example, idx):
        if idx in injected_lookup:
            example["text"] = trigger + " " + example["text"]
            example["label"] = target_label
        return example

    # map() builds a new dataset, so no defensive copy of the input is taken first.
    poisoned = dataset.map(add_trigger, with_indices=True)
    return poisoned, injected_idx




# -------------------------------------------------------------------------
# 4. Inject backdoor
# -------------------------------------------------------------------------

# Seed Python's RNG so the same training examples are poisoned on every run
# (inject_backdoor samples its indices with the `random` module).
random.seed(42)

poisoned_train, injected_idx = inject_backdoor(
    train,
    trigger=trigger_word,
    # Use the shared constant so the poisoned label always matches the label
    # the ASR computation checks for.
    target_label=target_label,
    percentage=attack_rate
)

print(f"Injected {len(injected_idx)} backdoor examples.")


# -------------------------------------------------------------------------
# 5. Tokenization
# -------------------------------------------------------------------------

tokenizer = DistilBertTokenizerFast.from_pretrained("distilbert-base-uncased")


def tokenize(batch):
    """Tokenize the "text" field of a batch, truncating/padding to 256 tokens."""
    return tokenizer(batch["text"], truncation=True, padding="max_length", max_length=256)


def _encode_split(split):
    """Tokenize one split, rename "label" to "labels" and expose torch columns."""
    encoded = split.map(tokenize, batched=True).rename_column("label", "labels")
    encoded.set_format(type="torch", columns=["input_ids", "attention_mask", "labels"])
    return encoded


# Only the training split is poisoned; val and test are encoded unmodified.
train_tok = _encode_split(poisoned_train)
val_tok   = _encode_split(val)
test_tok  = _encode_split(test)


# -------------------------------------------------------------------------
# 6. Model + trainer
# -------------------------------------------------------------------------

model = DistilBertForSequenceClassification.from_pretrained("distilbert-base-uncased")


def compute_metrics(pred):
    """Turn a (logits, labels) pair into a dict with "accuracy" and "f1"."""
    logits, labels = pred
    # Predicted class = index of the largest logit along the last axis
    predicted = np.argmax(logits, axis=-1)
    metrics = {}
    metrics["accuracy"] = accuracy_score(labels, predicted)
    metrics["f1"] = f1_score(labels, predicted)
    return metrics

# round() instead of int(): attack_rate * 100 can fall just below the intended
# integer in floating point (e.g. 0.29 * 100 == 28.999999999999996), and int()
# would truncate that to the wrong directory name.
args = TrainingArguments(
    output_dir = f"back_door_output_rate_{round(attack_rate * 100)}",
    # Evaluate, checkpoint and log once per epoch so the best checkpoint
    # (by validation accuracy) can be reloaded after training.
    eval_strategy="epoch",
    save_strategy="epoch",
    logging_strategy="epoch",
    load_best_model_at_end=True,
    metric_for_best_model="accuracy",
    greater_is_better=True,

    num_train_epochs=1,
    per_device_train_batch_size=8,
    per_device_eval_batch_size=32,

    learning_rate=5e-5,
    weight_decay=0.01,
    warmup_ratio=0.1,
    seed=42
)

# Train on the poisoned, tokenized training set; validate on val_tok
trainer = Trainer(model=model, 
                  args=args, 
                  train_dataset=train_tok, 
                  eval_dataset=val_tok, 
                  compute_metrics=compute_metrics
)


# -------------------------------------------------------------------------
# 7. Train the model
# -------------------------------------------------------------------------

# Fine-tune on train_tok (the poisoned training set); evaluation on val_tok runs every epoch
trainer.train()


# -------------------------------------------------------------------------
# 8. CLEAN TEST
# -------------------------------------------------------------------------

# Aggregate metrics on the trigger-free test set
clean_results = trainer.evaluate(test_tok)
clean_acc, clean_f1 = clean_results["eval_accuracy"], clean_results["eval_f1"]

print("\nClean test results:", clean_results)

# Second pass to obtain per-example predictions for the confusion matrix
pred_clean = trainer.predict(test_tok)
y_true_clean = pred_clean.label_ids
y_pred_clean = np.argmax(pred_clean.predictions, axis=-1)
cm_clean = confusion_matrix(y_true_clean, y_pred_clean)

print("\nClean Confusion Matrix:", cm_clean, sep="\n")

save_results(
    "backdoor_clean",
    attack_rate,
    clean_acc,
    clean_f1,
    len(train),
    cm_clean,
)


# -------------------------------------------------------------------------
# 9. TRIGGER TEST – trigger FIRST
# -------------------------------------------------------------------------

def add_trigger_to_test(testset, trigger="tqxv"):
    """Return ``testset.map(...)`` with ``trigger`` and a space prepended to every "text"."""

    def _with_trigger(row):
        # Trigger goes first, same position as in the poisoned training examples
        row["text"] = f"{trigger} " + row["text"]
        return row

    triggered = testset.map(_with_trigger)
    return triggered

# Triggered copy of the test set, encoded the same way as the other splits
trigger_test = add_trigger_to_test(test, trigger=trigger_word)

trigger_test_tok = (
    trigger_test
    .map(tokenize, batched=True)
    .rename_column("label", "labels")
)
trigger_test_tok.set_format(type="torch", columns=["input_ids", "attention_mask", "labels"])


# -------------------------------------------------------------------------
# 10. Evaluate TRIGGER TEST + ASR
# -------------------------------------------------------------------------

# Metrics on the triggered test set, scored against the ORIGINAL labels
trigger_results = trainer.evaluate(trigger_test_tok)
trigger_acc = trigger_results["eval_accuracy"]
trigger_f1  = trigger_results["eval_f1"]

print("\nTrigger test results:", trigger_results)

# Per-example predictions for the confusion matrix and the ASR
pred_trig = trainer.predict(trigger_test_tok)
y_pred_trig = np.argmax(pred_trig.predictions, axis=-1)
y_true_trig = pred_trig.label_ids
cm_trig = confusion_matrix(y_true_trig, y_pred_trig)

print("\nTrigger Confusion Matrix:")
print(cm_trig)

# Compute Attack Success Rate (ASR).
# Only examples whose true label is NOT the target count: an example that
# already belongs to the target class is predicted "target" by a correct model
# anyway, so including it would inflate the rate.
non_target_mask = y_true_trig != target_label
if non_target_mask.any():
    asr = np.mean(y_pred_trig[non_target_mask] == target_label)
else:
    # No non-target examples to attack; the rate is undefined
    asr = float("nan")
print(f"\nAttack Success Rate (ASR): {asr:.3f}")

# Save all results
save_results("backdoor_trigger", attack_rate, trigger_acc, trigger_f1, len(train), cm_trig, asr=asr)

print("\n✔ BACKDOOR ATTACK COMPLETE!")


Dataset sizes: 500 250 250


Map: 100%|██████████| 500/500 [00:00<00:00, 23561.92 examples/s]

Injected 150 backdoor examples.



Map: 100%|██████████| 500/500 [00:00<00:00, 4218.00 examples/s]
Some weights of DistilBertForSequenceClassification were not initialized from the model checkpoint at distilbert-base-uncased and are newly initialized: ['classifier.bias', 'classifier.weight', 'pre_classifier.bias', 'pre_classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


Epoch,Training Loss,Validation Loss,Accuracy,F1
1,0.5865,0.655122,0.636,0.533333





Clean test results: {'eval_loss': 0.6551218628883362, 'eval_accuracy': 0.636, 'eval_f1': 0.5333333333333333, 'eval_runtime': 31.2725, 'eval_samples_per_second': 7.994, 'eval_steps_per_second': 0.256, 'epoch': 1.0}





Clean Confusion Matrix:
[[107  22]
 [ 69  52]]
✔ Result saved in results/logs/results_backdoor_500.csv





Trigger test results: {'eval_loss': 1.743549108505249, 'eval_accuracy': 0.484, 'eval_f1': 0.6522911051212938, 'eval_runtime': 31.1689, 'eval_samples_per_second': 8.021, 'eval_steps_per_second': 0.257, 'epoch': 1.0}





Trigger Confusion Matrix:
[[  0 129]
 [  0 121]]

Attack Success Rate (ASR): 1.000
✔ Result saved in results/logs/results_backdoor_500.csv

✔ BACKDOOR ATTACK COMPLETE!
