In [2]:
import pandas as pd
import time
import numpy as np
import csv
import backoff
import json
import torch
import os
import argparse

from transformers import AutoTokenizer, AutoModelForSequenceClassification, TrainingArguments, Trainer
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report
from tqdm import tqdm
from datasets import Dataset
from pathlib import Path


RuntimeError: Failed to import transformers.models.auto.tokenization_auto because of the following error (look up to see its traceback):
Failed to import transformers.generation.utils because of the following error (look up to see its traceback):
dlopen(/opt/anaconda3/lib/python3.12/site-packages/scipy/linalg/_fblas.cpython-312-darwin.so, 0x0002): Library not loaded: @rpath/libgfortran.5.dylib
  Referenced from: <CAA510D4-816A-34E5-9021-485EF5D56159> /opt/anaconda3/lib/libopenblas.0.dylib
  Reason: tried: '/opt/anaconda3/lib/libgfortran.5.dylib' (duplicate LC_RPATH '@loader_path'), '/opt/anaconda3/lib/libgfortran.5.dylib' (duplicate LC_RPATH '@loader_path'), '/opt/anaconda3/lib/python3.12/site-packages/scipy/linalg/../../../../libgfortran.5.dylib' (duplicate LC_RPATH '@loader_path'), '/opt/anaconda3/lib/python3.12/site-packages/scipy/linalg/../../../../libgfortran.5.dylib' (duplicate LC_RPATH '@loader_path'), '/opt/anaconda3/bin/../lib/libgfortran.5.dylib' (duplicate LC_RPATH '@loader_path'), '/opt/anaconda3/bin/../lib/libgfortran.5.dylib' (duplicate LC_RPATH '@loader_path'), '/usr/local/lib/libgfortran.5.dylib' (no such file), '/usr/lib/libgfortran.5.dylib' (no such file, not in dyld cache)

In [None]:
#dataset = load_dataset("xTRam1/safe-guard-prompt-injection")
# df = pd.DataFrame(dataset["train"])

# df

In [3]:
# Load the prompt-injection dataset (JSON Lines: one JSON record per line).
# The path is named once here so it can be changed in a single place.
DATASET_PATH = Path("../../datasets/prompt_injection_dataset_final.jsonl")
df = pd.read_json(DATASET_PATH, lines=True)

# Preview the first rows only instead of rendering the whole frame.
df.head()

NameError: name 'pd' is not defined

In [None]:
#print(os.getcwd())


In [None]:
# Prepare Dataset
def prepDataset(df, tokenizer, max_length = 128):
    """Tokenize ``df`` into a binary-detection dataset and an injection-type dataset.

    Args:
        df: DataFrame with a "text" column, an "injection_type" column in which
            1 marks an injection (assumed to hold 0/1 values -- confirm against
            the data), and a type-name column called "label" or "labels".
        tokenizer: Tokenizer callable applied to batches of text.
        max_length: Every example is padded/truncated to this many tokens.

    Returns:
        ``(tokenized_binary, tokenized_type, id_to_label)``. The last two are
        ``None`` when ``df`` contains no injection rows. In both datasets the
        target column is named "label".

    Raises:
        KeyError: If injection rows exist but neither "label" nor "labels" is
            a column of ``df``.

    Side effects:
        Writes ``label_mapping.json`` to the current working directory when
        injection rows exist, so the mapping can be reloaded at inference time.
    """
    def tokenize_batch(examples):
        # Closes over tokenizer/max_length so it can be handed straight to .map().
        return tokenizer(
            examples["text"],
            padding="max_length",
            truncation=True,
            max_length=max_length,
        )

    # 1. Binary classification (injection or not). The target is renamed to
    #    "label" because the training code reads the target under that name.
    binary_frame = (
        df[["text", "injection_type"]]
        .rename(columns={"injection_type": "label"})
        .astype({"label": int})
    )
    tokenized_binary = Dataset.from_pandas(binary_frame, preserve_index=False).map(
        tokenize_batch, batched=True
    )

    # 2. Type classification uses the injection rows only.
    injection_rows = df[df["injection_type"] == 1]
    if len(injection_rows) == 0:
        return tokenized_binary, None, None

    # The type column has been referred to under both spellings; accept either
    # (preferring "label" if both are present -- TODO confirm the real schema).
    type_column = next((name for name in ("label", "labels") if name in df.columns), None)
    if type_column is None:
        raise KeyError("Expected an injection-type column named 'label' or 'labels'")

    # .tolist() turns numpy scalars into plain Python values so json.dump accepts them.
    unique_labels = injection_rows[type_column].unique().tolist()
    label_to_id = {label: idx for idx, label in enumerate(unique_labels)}
    id_to_label = {idx: label for label, idx in label_to_id.items()}

    # Save mapping for inference.
    with open('label_mapping.json', 'w') as f:
        json.dump({"label_to_id": label_to_id, "id_to_label": id_to_label}, f)

    # Replace the type names with their numeric ids under the "label" column.
    type_frame = injection_rows[["text"]].assign(
        label=injection_rows[type_column].map(label_to_id)
    )
    tokenized_type = Dataset.from_pandas(type_frame, preserve_index=False).map(
        tokenize_batch, batched=True
    )

    return tokenized_binary, tokenized_type, id_to_label

In [None]:
# Binary Train Model
def binary_train_model(dataset, model_name, output_dir, test_size=0.25, seed=42):
    """Fine-tune a two-class (safe vs. injection) sequence classifier.

    Args:
        dataset: Tokenized ``datasets.Dataset`` whose target column is "label".
        model_name: Name or path of the pretrained model to fine-tune.
        output_dir: Directory for checkpoints; the final model is saved to
            ``{output_dir}/final``.
        test_size: Fraction of ``dataset`` held out for evaluation.
        seed: Seed for the train/eval shuffle so the split is reproducible.

    Returns:
        The fine-tuned model.
    """
    import inspect

    # Split with the datasets-native API; it returns Dataset objects directly.
    split = dataset.train_test_split(test_size=test_size, seed=seed)
    train_data = split["train"]
    eval_data = split["test"]

    # Load model
    model = AutoModelForSequenceClassification.from_pretrained(model_name, num_labels=2)

    # Newer transformers releases call this argument `eval_strategy`, older ones
    # `evaluation_strategy`; use whichever the installed version accepts.
    accepted = inspect.signature(TrainingArguments).parameters
    strategy_kwarg = "eval_strategy" if "eval_strategy" in accepted else "evaluation_strategy"

    training_args = TrainingArguments(
        output_dir=output_dir,
        save_strategy="epoch",
        learning_rate=2e-5,
        per_device_eval_batch_size=16,
        per_device_train_batch_size=16,
        num_train_epochs=3,
        weight_decay=0.01,
        load_best_model_at_end=True,
        **{strategy_kwarg: "epoch"},
    )

    # Define `trainer`
    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=train_data,
        eval_dataset=eval_data
    )

    # Train model
    trainer.train()

    # Save model under the same "final" sub-directory that prediction loads from.
    trainer.save_model(f"{output_dir}/final")

    # Predict on the held-out split
    prediction = trainer.predict(eval_data)
    preds = np.argmax(prediction.predictions, axis=-1)

    # Get actual labels
    labels = eval_data["label"]

    # Print evaluation metrics
    print("\nBinary Classification Report:")
    print(classification_report(labels, preds, target_names=["Safe", "Injection"]))

    return model

In [None]:
def type_train_model(dataset, model_name, output_dir, id_to_label, test_size=0.25, seed=42):
    """Fine-tune a multi-class classifier that predicts the injection type.

    Args:
        dataset: Tokenized ``datasets.Dataset`` of injection examples whose
            target column is "label", or ``None`` when there are none.
        model_name: Name or path of the pretrained model to fine-tune.
        output_dir: Directory for checkpoints; the final model is saved to
            ``{output_dir}/final``.
        id_to_label: Mapping from numeric class id to type name.
        test_size: Fraction of ``dataset`` held out for evaluation.
        seed: Seed for the train/eval shuffle so the split is reproducible.

    Returns:
        The fine-tuned model, or ``None`` when ``dataset`` is ``None``.
    """
    import inspect

    if dataset is None:
        print("No injectionexamples found for type classification training")
        return None

    # Split with the datasets-native API; it returns Dataset objects directly.
    split = dataset.train_test_split(test_size=test_size, seed=seed)
    train_data = split["train"]
    eval_data = split["test"]

    # Load model
    num_labels = len(id_to_label)
    model = AutoModelForSequenceClassification.from_pretrained(model_name, num_labels=num_labels)

    # Newer transformers releases call this argument `eval_strategy`, older ones
    # `evaluation_strategy`; use whichever the installed version accepts.
    accepted = inspect.signature(TrainingArguments).parameters
    strategy_kwarg = "eval_strategy" if "eval_strategy" in accepted else "evaluation_strategy"

    # Define training arguments
    training_arguments = TrainingArguments(
        output_dir=output_dir,
        save_strategy="epoch",
        learning_rate=2e-5,
        per_device_train_batch_size=16,
        per_device_eval_batch_size=16,
        num_train_epochs=3,
        weight_decay=0.01,
        load_best_model_at_end=True,
        **{strategy_kwarg: "epoch"},
    )

    # Define Trainer
    trainer = Trainer(
        model=model,
        args=training_arguments,
        train_dataset=train_data,
        eval_dataset=eval_data,
    )

    # Train model
    trainer.train()

    # Save model under the same "final" sub-directory that prediction loads from.
    trainer.save_model(f"{output_dir}/final")

    # Evaluate
    predictions = trainer.predict(eval_data)
    preds = np.argmax(predictions.predictions, axis=-1)

    # Get actual labels
    labels = eval_data["label"]

    # Pass the class ids explicitly so the report still lines up with
    # target_names when a class is absent from the evaluation split.
    class_ids = sorted(id_to_label)
    class_names = [str(id_to_label[class_id]) for class_id in class_ids]

    # Print Eval metrics
    print("\nType Classfication Report:")
    print(classification_report(labels, preds, labels=class_ids, target_names=class_names))

    return model

In [None]:
def predict(text, binary_model, type_model, tokenizer, id_to_label=None):
    """Classify ``text`` as safe/injection and, for injections, predict the type.

    Args:
        text: The prompt to classify.
        binary_model: Callable model returning an object with ``.logits`` whose
            column 1 is the injection class.
        type_model: Optional type classifier with the same calling convention;
            ``None`` skips type prediction.
        tokenizer: Callable that turns ``text`` into keyword inputs for the models.
        id_to_label: Optional mapping from type class id to type name. When it
            is ``None`` the raw class id is reported instead.

    Returns:
        A dict with "text", "is_injection" and "injection_confidence", plus
        "injection_type" and "type_confidence" when an injection is detected
        and ``type_model`` is given.
    """
    inputs = tokenizer(text, return_tensors="pt", padding=True, truncation=True)

    # Binary prediction
    with torch.no_grad():
        binary_logits = binary_model(**inputs).logits

    # Probabilities come from the logits, not from the already-reduced class index.
    binary_probs = torch.softmax(binary_logits, dim=1)[0]
    binary_prediction = torch.argmax(binary_logits, dim=1).item()

    result = {
        "text": text,
        "is_injection": bool(binary_prediction),
        "injection_confidence": binary_probs[1].item(),
    }

    # If it's an injection and we have a type model, predict the type
    if binary_prediction == 1 and type_model is not None:
        with torch.no_grad():
            type_logits = type_model(**inputs).logits

        type_probs = torch.softmax(type_logits, dim=1)[0]
        type_prediction = torch.argmax(type_logits, dim=1).item()

        # Without a label mapping, fall back to the numeric class id.
        if id_to_label is not None:
            result["injection_type"] = id_to_label[type_prediction]
        else:
            result["injection_type"] = type_prediction
        result["type_confidence"] = type_probs[type_prediction].item()

    return result


In [None]:
# Main entry point: builds the command-line argument parser (with help text and
# default values for each parameter), then either runs prediction or trains the models.

def _load_data(data_path):
    """Read the dataset at ``data_path`` into a DataFrame, choosing the reader by file extension."""
    suffix = Path(data_path).suffix.lower()
    if suffix == ".jsonl":
        return pd.read_json(data_path, lines=True)
    if suffix == ".json":
        return pd.read_json(data_path)
    return pd.read_csv(data_path)


def main(argv=None):
    """Command-line entry point: train both models, or predict with saved ones.

    Args:
        argv: Optional list of argument strings. ``None`` (the default) makes
            argparse read ``sys.argv``; pass a list to call this from a
            notebook or a test.

    With ``--predict`` or ``--predict_file`` the saved models are loaded from
    ``{binary_output_dir}/final`` and ``{type_output_dir}/final`` and the
    results are printed as JSON. Otherwise ``--data_path`` is required and both
    models are trained.
    """
    parser = argparse.ArgumentParser(description='Train and Evaluate Prompt Injection Models')
    # Only needed for training, so it is validated below rather than marked required.
    parser.add_argument('--data_path', type=str, default=None, help='Path to the dataset (CSV, JSON or JSON Lines); required for training')
    parser.add_argument('--model_name', type=str, default='distilbert-base-uncased',help='Base model to fine-tune')
    parser.add_argument('--binary_output_dir', type=str, default='./binary_model', help='Output directory for binary model')
    parser.add_argument('--type_output_dir', type=str, default='./type_model', help='Output directory for type classification model')
    parser.add_argument('--predict', type=str, default=None, help='Text to predict (if not training)')
    parser.add_argument('--predict_file', type=str, default=None, help='File with texts to predict (if not training)')
    args = parser.parse_args(argv)

    predicting = bool(args.predict or args.predict_file)
    if not predicting and not args.data_path:
        parser.error("--data_path is required when training")

    # Tokenizer
    tokenizer = AutoTokenizer.from_pretrained(args.model_name)

    # If we're predicting
    if predicting:
        # Load binary model
        binary_model = AutoModelForSequenceClassification.from_pretrained(f"{args.binary_output_dir}/final")

        # The type model is optional: fall back to binary-only prediction when it
        # (or its label mapping) cannot be loaded. Catching Exception rather than
        # using a bare except keeps Ctrl-C and SystemExit working.
        try:
            type_model = AutoModelForSequenceClassification.from_pretrained(f"{args.type_output_dir}/final")
            # Load label mapping; JSON object keys are strings, so restore the int ids.
            with open('label_mapping.json', 'r') as f:
                mapping = json.load(f)
            id_to_label = {int(k): v for k, v in mapping['id_to_label'].items()}
        except Exception as exc:
            type_model = None
            id_to_label = None
            print("Type classification model not found, will only predict binary label")
            print(f"Reason: {exc}")

        if args.predict:
            result = predict(args.predict, binary_model, type_model, tokenizer, id_to_label)
            print(json.dumps(result, indent=2))

        elif args.predict_file:
            # One text per line; blank lines are skipped.
            with open(args.predict_file, 'r') as f:
                texts = [line.strip() for line in f if line.strip()]

            results = [
                predict(text, binary_model, type_model, tokenizer, id_to_label)
                for text in texts
            ]

            print(json.dumps(results, indent=2))

    else:
        # Load and prepare data
        df = _load_data(args.data_path)
        tokenized_binary, tokenized_type, id_to_label = prepDataset(df, tokenizer)

        # Train binary model
        print("Training binary classification model...")
        binary_train_model(tokenized_binary, args.model_name, args.binary_output_dir)

        # Train type model if we have injection examples
        if tokenized_type is not None:
            print("Training type classification model...")
            type_train_model(tokenized_type, args.model_name, args.type_output_dir, id_to_label)
        else:
            print("No injection examples found, skipping type classification training")

# NOTE(review): inside a notebook kernel sys.argv usually carries the kernel's
# own flags, so call main([...]) with explicit arguments there -- confirm.
if __name__ == "__main__":
    main()

NameError: name 'argparse' is not defined