In [None]:
import numpy as np
import pandas as pd
import os
from tqdm import tqdm
import bitsandbytes as bnb
import torch
import torch.nn as nn
import transformers
from datasets import Dataset
from peft import LoraConfig, PeftConfig
from trl import SFTTrainer
from trl import setup_chat_format
from transformers import (AutoModelForCausalLM, 
                          AutoTokenizer, 
                          BitsAndBytesConfig, 
                          TrainingArguments, 
                          pipeline, 
                          logging)
from sklearn.metrics import (accuracy_score, 
                             classification_report, 
                             confusion_matrix)
from sklearn.model_selection import train_test_split
import bitsandbytes as bnb
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix
from tqdm import tqdm
from huggingface_hub import login

In [None]:
from transformers import set_seed

# Fix the random seed through transformers' helper so the stochastic steps in
# this notebook (sampling, training) are repeatable from run to run.
set_seed(69420)

In [None]:
# Alternative training file, kept for reference:
# df1=pd.read_csv("../final_dataset/train_set.csv")
# Read the three splits in one pass: df1 = train, df2 = eval, df3 = test.
df1, df2, df3 = (
    pd.read_csv(csv_path)
    for csv_path in (
        "../final_dataset/imbalanced_train_set.csv",
        "../final_dataset/eval_set.csv",
        "../final_dataset/test_set.csv",
    )
)

In [None]:
# Keep only the columns used downstream, then show the class balance of each split.
df1, df2, df3 = (
    frame[['Contents', 'Secret', 'Label']] for frame in (df1, df2, df3)
)

for frame in (df1, df2, df3):
    print(frame['Label'].value_counts())

In [None]:
# Swap the numeric labels for the class names that appear in the prompts,
# printing the resulting class balance of each split as we go.
label_text = {0: 'Non-sensitive', 1: 'Secret'}

for frame in (df1, df2, df3):
    frame['Label'] = frame['Label'].replace(label_text)
    print(frame['Label'].value_counts())

In [None]:
def create_context_window(text, target_string, window_size=200):
    """Return the part of ``text`` surrounding the first occurrence of ``target_string``.

    Args:
        text: Full source text to search.
        target_string: Substring to locate inside ``text``.
        window_size: Maximum number of characters kept on each side of the match.

    Returns:
        The slice of ``text`` made of the match plus up to ``window_size``
        characters before and after it (clamped to the bounds of ``text``), or
        ``None`` when ``target_string`` does not occur in ``text`` or when
        either argument is not a string (for example a missing cell read as NaN).
    """
    # Missing cells reach this function as float NaN / None; calling str.find
    # on (or with) those would raise, so treat them as "no match".
    if not isinstance(text, str) or not isinstance(target_string, str):
        return None

    target_index = text.find(target_string)
    if target_index == -1:
        return None

    # Clamp both ends so the window never runs outside the text.
    start_index = max(0, target_index - window_size)
    end_index = min(len(text), target_index + len(target_string) + window_size)
    return text[start_index:end_index]

# Replace each split's full contents with the window around its candidate secret.
for frame in (df1, df2, df3):
    frame['Contents'] = frame.apply(
        lambda row: create_context_window(row['Contents'], row['Secret']),
        axis=1,
    )

In [None]:
# Give the three frames the names used by the rest of the notebook
# (these are aliases of the same objects, not copies).
X_train, X_eval, X_test = df1, df2, df3

In [None]:

# Instruction header shared verbatim by the training and inference prompts.
# The wording is kept exactly as it was (typos included): it is part of the
# text the model is trained and queried on, so editing it changes the prompt.
_PROMPT_INSTRUCTIONS = """You are a code security auditor or classifier speccialized in identifying and categorizing sensitive secrets from code snippet.Classify the given candidate string as either "Non-sensitive" or "Secret" based on its role in the provided code snippet. A "Secret" includes sensitive information such as: API keys and secrets (e.g., `sk_test_ABC123`), Private and secret keys (e.g., private SSH keys, private cryptographic keys), Authentication keys and tokens (e.g., `Bearer <token>`), Database connection strings with credentials (e.g., `mongodb://user:password@host:port`), Passwords, usernames, and any other private information that should not be shared openly. A "Non-sensitive" string is not considered secret and can be shared openly. This may include: Publicly available keys (e.g., public SSH keys), Non-sensitive configuration values or identifiers, Any non-sensitive data not directly tied to security or authentication. Carefully consider the context of the string in the provided code. If the string is part of authentication, encryption, or access control, it is likely a "Secret". Otherwise, it is "Non-sensitive". Ensure you pay attention to specific patterns like tokens, passwords, or keys in the string. Return the answer as the corresponding label."""


def _format_prompt(data_point, label):
    """Assemble instructions, candidate string, snippet and ``label`` into one prompt.

    The assembled text is stripped of leading/trailing whitespace, so an empty
    ``label`` yields a prompt that ends with ``label:``.
    """
    return (
        f"{_PROMPT_INSTRUCTIONS}\n\n"
        f"candidate_string: {data_point['Secret']}\n"
        f"code_snippet: {data_point['Contents']}\n"
        f"label: {label}"
    ).strip()


def generate_prompt(data_point):
    """Build a training prompt that includes the gold label.

    Args:
        data_point: Mapping (e.g. a DataFrame row) with ``"Secret"``,
            ``"Contents"`` and ``"Label"`` entries.

    Returns:
        The instruction text followed by the candidate string, the code
        snippet and ``label: <Label>``.
    """
    return _format_prompt(data_point, data_point["Label"])


def generate_test_prompt(data_point):
    """Build an inference prompt that stops right after ``label:``.

    Args:
        data_point: Mapping (e.g. a DataFrame row) with ``"Secret"`` and
            ``"Contents"`` entries; no label is read.

    Returns:
        The same text as ``generate_prompt`` up to and including ``label:``,
        leaving the label for the model to generate.
    """
    return _format_prompt(data_point, "")

In [None]:
# Build the supervised prompts (instructions + snippet + gold label) for the
# training and evaluation frames.
for frame in (X_train, X_eval):
    frame.loc[:, 'text'] = frame.apply(generate_prompt, axis=1)

# Set the gold test labels aside, then replace the test frame with a
# single-column frame of label-free prompts.
y_true = X_test['Label']
test_prompts = X_test.apply(generate_test_prompt, axis=1)
X_test = pd.DataFrame(test_prompts, columns=["text"])

In [None]:
# Class balance of the training split after prompt generation.
X_train['Label'].value_counts()

In [None]:
# Wrap the prompt column of the train and eval frames in Hugging Face Datasets.
train_data, eval_data = (
    Dataset.from_pandas(frame[["text"]]) for frame in (X_train, X_eval)
)

In [None]:
# Show one training prompt (row 3) as a sanity check of the template.
train_data[3]['text']

In [None]:
# Write one training prompt (row 3) to disk for inspection. The encoding is
# pinned to UTF-8 because code snippets may contain characters that the
# platform's default encoding cannot represent.
with open('a.txt', 'w', encoding='utf-8') as f:
    f.write(train_data['text'][3])

In [None]:
# Authenticate with the Hugging Face Hub. The token is read from the HF_TOKEN
# environment variable so that it is never hard-coded in the notebook and the
# cell works on a fresh kernel; when the variable is unset, login() receives
# None and falls back to its own interactive prompt.
HF_TOKEN = os.environ.get("HF_TOKEN")
login(HF_TOKEN)

In [None]:
# Hugging Face Hub id of the base checkpoint that gets fine-tuned.
base_model_name = "meta-llama/Llama-3.1-8B-Instruct"

# bitsandbytes settings: load the weights in 4-bit NF4, without double
# quantization, and use float16 as the compute dtype.
bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_use_double_quant=False,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_compute_dtype="float16",
)

# Load the base model with the quantization settings above;
# device_map="auto" leaves device placement to the library.
model = AutoModelForCausalLM.from_pretrained(
    base_model_name,
    device_map="auto",
    torch_dtype="float16",
    quantization_config=bnb_config, 
)

# The cache flag is switched off here and set back to True after training.
# NOTE(review): presumably because gradient checkpointing is enabled for training — confirm.
model.config.use_cache = False
# NOTE(review): looks like a tensor-parallelism setting inherited from pretraining; 1 presumably disables it — confirm.
model.config.pretraining_tp = 1

tokenizer = AutoTokenizer.from_pretrained(base_model_name)

# Reuse the end-of-sequence token id as the padding token id.
tokenizer.pad_token_id = tokenizer.eos_token_id

In [None]:
def predict(test, model, tokenizer):
    """Classify every prompt in ``test`` with a text-generation pipeline.

    Args:
        test: DataFrame with a ``"text"`` column of prompts ending in ``label:``.
        model: Causal language model handed to the pipeline.
        tokenizer: Tokenizer paired with ``model``.

    Returns:
        A list with one entry per row of ``test``: ``"Secret"`` when the text
        generated after the last ``label:`` contains that word, and
        ``"Non-sensitive"`` otherwise.
    """
    # The pipeline does not depend on the row being scored, so build it once
    # instead of re-creating it for every prompt.
    pipe = pipeline(task="text-generation",
                    model=model,
                    tokenizer=tokenizer,
                    max_new_tokens=2,
                    temperature=0.1)

    y_pred = []
    for prompt in tqdm(test["text"]):
        result = pipe(prompt)
        # generated_text holds the prompt plus the continuation; keep only
        # what follows the final "label:" marker.
        answer = result[0]['generated_text'].split("label:")[-1].strip()

        # Determine the predicted label (Secret or Non-sensitive)
        y_pred.append("Secret" if "Secret" in answer else "Non-sensitive")

    return y_pred

y_pred = predict(X_test, model, tokenizer)



In [None]:
def evaluate(y_true, y_pred):
    """Print accuracy figures, a classification report and a confusion matrix.

    Args:
        y_true: Iterable of gold labels (``"Non-sensitive"`` / ``"Secret"``).
        y_pred: Iterable of predicted labels, aligned with ``y_true``.

    Pairs in which either label is not one of the two known names are
    dropped before any metric is computed. Nothing is returned; all results
    are printed.
    """
    label_names = ["Non-sensitive", "Secret"]
    labels = [0, 1]
    name_to_id = dict(zip(label_names, labels))

    # Encode labels as integers; anything unexpected becomes -1.
    y_true_mapped = np.array([name_to_id.get(item, -1) for item in y_true])
    y_pred_mapped = np.array([name_to_id.get(item, -1) for item in y_pred])

    # Keep only the positions where both labels are valid.
    keep = (y_true_mapped != -1) & (y_pred_mapped != -1)
    y_true_mapped = y_true_mapped[keep]
    y_pred_mapped = y_pred_mapped[keep]

    # Overall accuracy.
    accuracy = accuracy_score(y_true=y_true_mapped, y_pred=y_pred_mapped)
    print(f'Overall Accuracy: {accuracy:.3f}')

    # Accuracy restricted to the samples of each gold class (0.0 when the
    # class has no samples).
    for label, name in zip(labels, label_names):
        of_class = y_true_mapped == label
        if of_class.any():
            label_accuracy = accuracy_score(
                y_true=y_true_mapped[of_class],
                y_pred=y_pred_mapped[of_class],
            )
        else:
            label_accuracy = 0.0
        print(f'Accuracy for {name}: {label_accuracy:.3f}')

    # Precision / recall / F1 per class.
    print('\nClassification Report:')
    print(classification_report(
        y_true=y_true_mapped,
        y_pred=y_pred_mapped,
        target_names=label_names,
        labels=labels,
        digits=4,
    ))

    # Confusion matrix with rows/columns in the order of `labels`.
    print('\nConfusion Matrix:')
    print(confusion_matrix(
        y_true=y_true_mapped,
        y_pred=y_pred_mapped,
        labels=labels,
    ))

evaluate(y_true, y_pred)


In [None]:

def find_all_linear_names(model):
    """List the distinct leaf names of the 4-bit linear layers in ``model``.

    Every submodule that is a ``bnb.nn.Linear4bit`` contributes the last
    component of its dotted name; ``lm_head`` is excluded from the result.

    Returns:
        A list of unique module names (order follows set iteration).
    """
    linear_4bit = bnb.nn.Linear4bit
    leaf_names = {
        qualified_name.split('.')[-1]
        for qualified_name, submodule in model.named_modules()
        if isinstance(submodule, linear_4bit)
    }
    # needed for 16 bit
    leaf_names.discard('lm_head')
    return list(leaf_names)
modules = find_all_linear_names(model)
modules

In [None]:
# Ask PyTorch to release its cached, currently unused GPU memory.
torch.cuda.empty_cache()

In [None]:
# Token-level accuracy metric for a causal language model.
def compute_metrics(eval_pred):
    """Compute next-token accuracy over the positions that carry a label.

    Args:
        eval_pred: Pair ``(predictions, labels)``. ``predictions`` holds the
            logits (or a tuple whose first element is the logits) with the
            vocabulary on the last axis; ``labels`` holds the token ids, with
            ``-100`` marking positions to ignore.

    Returns:
        ``{"accuracy": float}`` — the share of non-ignored tokens predicted
        correctly, or ``0.0`` when every position is ignored.
    """
    predictions, labels = eval_pred
    # Some models return a tuple of outputs; the logits come first.
    if isinstance(predictions, tuple):
        predictions = predictions[0]

    # Get the most likely token prediction for each position
    predicted_ids = np.argmax(np.asarray(predictions), axis=-1)
    labels = np.asarray(labels)

    # In a causal LM the logits at position i predict the token at i + 1, so
    # align predictions[..., :-1] with labels[..., 1:] before comparing.
    shifted_predictions = predicted_ids[..., :-1]
    shifted_labels = labels[..., 1:]

    # Score only the positions that are not marked as ignored (-100).
    mask = shifted_labels != -100
    if not mask.any():
        # Avoid a NaN (mean of an empty selection) when nothing is scored.
        return {"accuracy": 0.0}

    accuracy = (shifted_predictions[mask] == shifted_labels[mask]).mean()

    return {
        "accuracy": float(accuracy),
    }

In [None]:
# Directory that receives the checkpoints and, later, the saved model and tokenizer.
output_dir="../models/llama-fine-tuned-model-30k-new-prompt-1024-imb"

# LoRA adapter settings: rank-64 adapters (alpha 16, no dropout, no bias terms)
# on every module name collected in `modules`.
peft_config = LoraConfig(
    lora_alpha=16,
    lora_dropout=0,
    r=64,
    bias="none",
    task_type="CAUSAL_LM",
    target_modules=modules,
)

training_arguments = TrainingArguments(
    output_dir=output_dir,                    # directory where checkpoints are written
    num_train_epochs=7,                       # number of training epochs
    per_device_train_batch_size=1,            # batch size per device during training
    per_device_eval_batch_size=1,             # batch size per device during evaluation (kept small to limit eval memory)
    gradient_accumulation_steps=8,            # batches accumulated before each optimizer update
    gradient_checkpointing=True,              # use gradient checkpointing to save memory
    optim="paged_adamw_32bit",
    logging_steps=100000,                     # NOTE(review): presumably ignored while logging_strategy is "epoch" — confirm
    logging_strategy="epoch",                 # log once per epoch
    learning_rate=2e-4,                       # learning rate, based on QLoRA paper
    weight_decay=0.001,
    fp16=True,
    bf16=False,
    max_grad_norm=0.3,                        # max gradient norm based on QLoRA paper
    max_steps=-1,                             # no step cap; the run length comes from num_train_epochs
    warmup_ratio=0.03,                        # warmup ratio based on QLoRA paper
    group_by_length=False,
    lr_scheduler_type="cosine",               # use cosine learning rate scheduler
    report_to=["none"],                       # do not report metrics to an external tracker
    save_strategy="epoch",                    # save a checkpoint at the end of every epoch
    save_total_limit=1,                       # keep only the most recent checkpoint to save disk space
    evaluation_strategy="epoch",              # evaluate on eval_dataset at the end of every epoch
    eval_steps = 0.2,                         # NOTE(review): presumably unused while evaluation_strategy is "epoch" — confirm
    disable_tqdm=True                         # turn off the trainer's progress bars
)

# Supervised fine-tuning trainer over the "text" column of the prompt datasets.
# NOTE(review): no compute_metrics callable is passed here, so evaluation
# presumably reports the loss only — confirm this is intended.
trainer = SFTTrainer(
    model=model,
    args=training_arguments,
    train_dataset=train_data,
    eval_dataset=eval_data,
    peft_config=peft_config,
    dataset_text_field="text",
    tokenizer=tokenizer,
    max_seq_length=1024,
    packing=False,
    
    # Tokenization options forwarded to the dataset preparation step: no
    # special tokens are added and no concat token is appended.
    dataset_kwargs={
    "add_special_tokens": False,
    "append_concat_token": False,
    }
)

In [None]:

# Custom training loop with tqdm (disabled; kept for reference only)
# train_dataloader = trainer.get_train_dataloader()
# total_batches = len(train_dataloader) * training_arguments.num_train_epochs

# # Create a tqdm progress bar
# with tqdm(total=total_batches, desc="Training Progress") as pbar:
#     for epoch in range(int(training_arguments.num_train_epochs)):
#         for step, batch in enumerate(train_dataloader):
#             # Perform a training step
#             trainer.training_step(model, batch)
#             pbar.update(1)

# Run the fine-tuning job with the trainer configured in the previous cell.
trainer.train()

# To resume from a saved checkpoint instead (disabled):
# checkpoint_path = "llama-3.1-fine-tuned-model-20k/checkpoint-12000"

# trainer.train(resume_from_checkpoint=checkpoint_path)


In [None]:
# import wandb
# wandb.finish()
# Turn the `use_cache` flag back on now that training is done (it was set to
# False right after the model was loaded).
model.config.use_cache = True

In [None]:
# Save the trained model (through the trainer) and the tokenizer into
# `output_dir`, so both can be reloaded from the same folder.
trainer.save_model(output_dir)
tokenizer.save_pretrained(output_dir)

In [None]:
# Ask PyTorch to release its cached, currently unused GPU memory before running inference.
torch.cuda.empty_cache()

In [None]:
# Score the test prompts again, now with the fine-tuned model, and print the
# same metrics as the pre-training baseline run earlier in the notebook.
y_pred = predict(X_test, model, tokenizer)
evaluate(y_true, y_pred)

In [None]:
# Class names, in the order used for the rows and columns of the matrix.
class_names = ["Non-sensitive", "Secret"]

# Generate confusion matrix. Passing `labels` pins the class order and keeps
# the matrix 2x2 even when one class is absent from the predictions.
cm = confusion_matrix(y_true, y_pred, labels=class_names)

# Define the filename and extract the base name (without path and extension)
filename = '../plots/llama-30k-7e-new-prompt-1024-imb.png'
base_filename = os.path.splitext(os.path.basename(filename))[0]
# savefig does not create missing folders, so make sure the target one exists.
os.makedirs(os.path.dirname(filename), exist_ok=True)

# Create a heatmap from the confusion matrix, with one tick label per class
# (the matrix is 2x2, so there are exactly two labels on each axis).
plt.figure(figsize=(6, 5))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
            xticklabels=class_names, yticklabels=class_names)

# Set the title dynamically to match the filename
plt.title(f"Confusion Matrix: {base_filename}")
plt.xlabel("Predicted Labels")
plt.ylabel("True Labels")

# Save and display the plot
plt.savefig(filename)  # Save the plot as PNG file
plt.show()


In [None]:
# Smoke-test the model on one hand-written example.
candidate_string = '"sk_test_4eC39HqLyjWDarjtT1zdp7dc"'
code_snippet = """import requests

API_KEY = "sk_test_4eC39HqLyjWDarjtT1zdp7dc"  # Secret

response = requests.get(f"https://api.stripe.com/v1/charges", headers={
    "Authorization": f"Bearer API_KEY"
})
print(response.json())"""

# Build the prompt with the same template used for training and by predict(),
# so this check exercises the format the model was fine-tuned on rather than
# a differently worded prompt.
prompt = generate_test_prompt({"Secret": candidate_string, "Contents": code_snippet})

pipe = pipeline(
    "text-generation",
    model=model,
    tokenizer=tokenizer,
    torch_dtype=torch.float16,
    device_map="auto",
)

outputs = pipe(prompt, max_new_tokens=2, do_sample=True, temperature=0.1)
# The prompt ends with "label:" (no trailing space), so split on that exact
# marker — as predict() does — to isolate the generated label.
print(outputs[0]["generated_text"].split("label:")[-1].strip())