# 🔒 CyberLLM Agent SFT Fine-tuning Notebook

This notebook implements Supervised Fine-Tuning (SFT) for cybersecurity question-answering using the Llama 3.2 1B Instruct model (`meta-llama/Llama-3.2-1B-Instruct`).

## Key Features:
- **Format Training**: Teaches model `<answer>`/`<reasoning>` structure for GRPO Stage 2
- **Cybersecurity Dataset**: Uses AlicanKiraz0/Cybersecurity-Dataset-v1
- **Train/Validation Split**: Uses the dataset's own validation split when one exists; otherwise holds out the first 10% of the training split for validation (90% train, 10% validation)
- **Consistent Approach**: Same system prompt and data formatting as standalone script


In [None]:
# Install required packages
%pip install transformers datasets trl torch accelerate bitsandbytes


# CyberLLM SFT Fine-tuning Notebook

This notebook implements Supervised Fine-Tuning (SFT) for cybersecurity question-answering using the Llama 3.2 1B Instruct model (`meta-llama/Llama-3.2-1B-Instruct`).

## Key Features:
- **Format Training**: Teaches model `<answer>`/`<reasoning>` structure for GRPO Stage 2
- **Cybersecurity Dataset**: Uses AlicanKiraz0/Cybersecurity-Dataset-v1
- **Consistent Approach**: Same system prompt and data formatting as standalone script


In [None]:
# Install required packages
%pip install transformers datasets trl torch accelerate bitsandbytes


In [None]:
# Import required libraries
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig
from datasets import load_dataset
from trl import SFTTrainer, SFTConfig
import logging
import os

# Setup logging: configure the root logger at INFO level and create the
# module-level `logger` that the functions and methods in later cells write to.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Progress marker so the notebook output shows this cell has run.
print(" Libraries imported successfully!")


In [None]:
class CyberLLMSFT:
    """Helper that bundles model loading and dataset loading for SFT.

    Only construction, model/tokenizer loading and raw dataset loading are
    defined here. Later notebook cells attach further methods
    (`create_train_val_split`, `prepare_sft_dataset`, `train`) to this class.
    """

    def __init__(self, model_name="meta-llama/Llama-3.2-1B-Instruct", output_dir="./cyberllm_sft"):
        """Store the configuration; nothing is loaded until requested.

        Args:
            model_name: Hugging Face model id (or local path) to fine-tune.
            output_dir: Directory the fine-tuned model is written to.
        """
        self.model_name = model_name
        self.output_dir = output_dir

        # Filled in by load_model_and_tokenizer() and by the training step.
        self.tokenizer = None
        self.model = None
        self.trainer = None

        # Instruction that asks the model for the <answer>/<reasoning> layout.
        # Callers strip the surrounding newlines before use.
        self.SYSTEM_PROMPT = """
You're a cybersecurity expert. Answer the question with careful analysis.
Respond in the following format:
<answer>
...
</answer>
<reasoning>
...
</reasoning>
"""

    def load_model_and_tokenizer(self, use_quantization=False):
        """Load the tokenizer and the causal LM named by `self.model_name`.

        Args:
            use_quantization: When true, load the weights in 4-bit via
                bitsandbytes with float16 compute.
        """
        logger.info(f"Loading model: {self.model_name}")

        bnb_config = (
            BitsAndBytesConfig(
                load_in_4bit=True,
                bnb_4bit_compute_dtype=torch.float16,
            )
            if use_quantization
            else None
        )

        tokenizer = AutoTokenizer.from_pretrained(
            self.model_name,
            use_fast=True,
            padding_side="left",
        )
        self.tokenizer = tokenizer

        # NOTE(review): weights are loaded in float16 while the training cell
        # configures bf16=True - confirm this precision mix is intended.
        self.model = AutoModelForCausalLM.from_pretrained(
            self.model_name,
            quantization_config=bnb_config,
            torch_dtype=torch.float16,
            device_map="auto",
        )

        # Reuse EOS for padding when the tokenizer ships without a pad token.
        if tokenizer.pad_token is None:
            tokenizer.pad_token = tokenizer.eos_token

        logger.info("Model and tokenizer loaded successfully")

    def load_dataset_from_hf(self, dataset_name):
        """Fetch a dataset from the Hugging Face Hub.

        Args:
            dataset_name: Hub identifier of the dataset.

        Returns:
            The object returned by `load_dataset`, or `None` if loading (or
            counting its examples) raised; the error is logged, not re-raised.
        """
        logger.info(f"Loading dataset from Hugging Face: {dataset_name}")

        try:
            splits = load_dataset(dataset_name)
            logger.info(f"Loaded dataset with {sum(len(split) for split in splits.values())} examples")
            return splits
        except Exception as err:
            # Best-effort by design: the caller checks for a falsy result.
            logger.error(f"Error loading dataset: {err}")
            return None

# Progress marker so the notebook output shows the class cell has run.
print(" CyberLLMSFT class defined!")


In [None]:
# Add methods to CyberLLMSFT class
def create_train_val_split(self, dataset_dict, val_fraction=0.1):
    """Return `dataset_dict` with a validation split, creating one if absent.

    If `dataset_dict` already has a "validation" key it is returned unchanged.
    Otherwise the first `val_fraction` of the "train" split becomes the
    validation set and the remainder becomes the training set. The rows are
    taken in their stored order; no shuffling happens here.

    Args:
        dataset_dict: Mapping of split name to dataset; must contain "train"
            when no "validation" split is present.
        val_fraction: Share of the training rows to hold out, strictly between
            0 and 1. Defaults to 0.1 (a 90/10 split).

    Returns:
        The original mapping, or a new plain dict with "train" and
        "validation" entries.

    Raises:
        ValueError: If a split has to be created and `val_fraction` is not
            strictly between 0 and 1.
    """
    if "validation" in dataset_dict:
        return dataset_dict

    if not 0 < val_fraction < 1:
        raise ValueError(
            f"val_fraction must be strictly between 0 and 1, got {val_fraction!r}"
        )

    logger.info(
        "No validation split found, creating %g/%g train/val split",
        (1 - val_fraction) * 100,
        val_fraction * 100,
    )
    train_data = dataset_dict["train"]
    total_size = len(train_data)
    # Rounds down, so a very small dataset can yield an empty validation set.
    val_size = int(total_size * val_fraction)

    return {
        "train": train_data.select(range(val_size, total_size)),
        "validation": train_data.select(range(val_size)),
    }

def prepare_sft_dataset(self, dataset, split="train", num_samples=None, num_proc=1):
    """Render a question/answer dataset into chat-formatted training text.

    Each row's "user" and "assistant" fields are wrapped in a
    system/user/assistant conversation, where the assistant turn carries the
    answer inside <answer> tags followed by a fixed <reasoning> sentence. The
    conversation is rendered to a string with the tokenizer's chat template.

    Args:
        dataset: Dataset exposing "user" and "assistant" columns.
        split: Name of the split; only "train" is shuffled (seed 42).
        num_samples: Optional cap on the number of rows used.
        num_proc: Accepted but not used; rows are processed in this process.

    Returns:
        A new `datasets.Dataset` with a single "text" column.
    """
    from datasets import Dataset

    rows_per_chunk = 512
    instructions = self.SYSTEM_PROMPT.strip()
    # The same generic rationale is attached to every example.
    rationale = "This answer addresses the cybersecurity question by providing specific technical details, security best practices, and actionable guidance relevant to the topic."

    if split == "train":
        dataset = dataset.shuffle(seed=42)

    if num_samples:
        dataset = dataset.select(range(min(num_samples, len(dataset))))

    def render(question, answer):
        # One three-turn conversation, returned as untokenized template text.
        conversation = [
            {"role": "system", "content": instructions},
            {"role": "user", "content": question},
            {"role": "assistant", "content": f"<answer>\n{answer}\n</answer>\n<reasoning>\n{rationale}\n</reasoning>"},
        ]
        return self.tokenizer.apply_chat_template(conversation, tokenize=False)

    # Walk the dataset in fixed-size chunks instead of multiprocessing, which
    # avoids pickling the tokenizer.
    rendered = []
    row_count = len(dataset)
    for start in range(0, row_count, rows_per_chunk):
        stop = min(start + rows_per_chunk, row_count)
        chunk = dataset.select(range(start, stop))
        rendered.extend(
            render(question, answer)
            for question, answer in zip(chunk["user"], chunk["assistant"])
        )

    return Dataset.from_dict({"text": rendered})

# Add methods to the class: the two module-level functions above take `self`
# as their first parameter, so assigning them as class attributes makes them
# callable as regular instance methods on any CyberLLMSFT object.
CyberLLMSFT.create_train_val_split = create_train_val_split
CyberLLMSFT.prepare_sft_dataset = prepare_sft_dataset

# Progress marker so the notebook output shows this cell has run.
print(" Dataset preparation methods added!")


In [None]:
# Add training method to CyberLLMSFT class
def train(self, train_dataset, eval_dataset, epochs=5, learning_rate=3e-5, batch_size=12):
    """Run supervised fine-tuning and save the resulting model.

    Builds an `SFTConfig`, trains `self.model` with `SFTTrainer`, and writes
    the final model to `self.output_dir`. The trainer is kept on
    `self.trainer`.

    Args:
        train_dataset: Dataset used for training.
        eval_dataset: Dataset used for periodic evaluation, or `None` to
            train without evaluation.
        epochs: Number of training epochs.
        learning_rate: Peak learning rate for the cosine schedule.
        batch_size: Per-device batch size for both training and evaluation.
    """
    logger.info("Starting CyberLLM SFT training...")

    # `eval_steps` alone does nothing: evaluation only runs when a strategy is
    # selected, and selecting one without an eval dataset is an error.
    eval_strategy = "steps" if eval_dataset is not None else "no"

    training_args = SFTConfig(
        output_dir=self.output_dir,
        learning_rate=learning_rate,
        eval_strategy=eval_strategy,
        eval_steps=500,
        save_strategy="steps",
        save_steps=1000,
        logging_steps=10,
        num_train_epochs=epochs,
        warmup_ratio=0.1,
        per_device_train_batch_size=batch_size,
        per_device_eval_batch_size=batch_size,
        gradient_accumulation_steps=1,
        weight_decay=0.1,
        max_grad_norm=0.3,
        lr_scheduler_type="cosine",
        bf16=True,
        optim="paged_adamw_8bit",
        report_to="none",
        save_total_limit=1,
        max_length=512,
        packing=False,
    )

    self.trainer = SFTTrainer(
        model=self.model,
        args=training_args,
        train_dataset=train_dataset,
        eval_dataset=eval_dataset,
    )

    self.trainer.train()
    self.trainer.save_model(self.output_dir)

    logger.info(f"Training completed! Model saved to {self.output_dir}")

# Add method to the class: `train` takes `self` as its first parameter, so
# assigning it as a class attribute makes it an instance method.
CyberLLMSFT.train = train

# Progress marker so the notebook output shows this cell has run.
print(" Training method added!")


In [None]:
# Initialize trainer. Note that `trainer` is the CyberLLMSFT helper object,
# not a TRL trainer; later cells use this name.
trainer = CyberLLMSFT(
    model_name="meta-llama/Llama-3.2-1B-Instruct",
    output_dir="./cyberllm_sft_model"
)

# Load model and tokenizer (unquantized weights)
trainer.load_model_and_tokenizer(use_quantization=False)

print(" Model and tokenizer loaded!")


In [None]:
# Load dataset from Hugging Face
dataset_dict = trainer.load_dataset_from_hf("AlicanKiraz0/Cybersecurity-Dataset-v1")

if not dataset_dict:
    print(" Failed to load dataset from Hugging Face")
    # Stop here: everything below indexes into dataset_dict, which would
    # otherwise fail with a confusing TypeError on None.
    raise RuntimeError("Dataset could not be loaded; see the logged error for details.")

print(" Dataset loaded successfully!")

# Create train/validation split if needed
dataset_dict = trainer.create_train_val_split(dataset_dict)

print(f"  Train: {len(dataset_dict['train'])} examples")
print(f"  Validation: {len(dataset_dict['validation'])} examples")

# Prepare datasets for SFT training
train_dataset = trainer.prepare_sft_dataset(dataset_dict["train"], split="train")

eval_dataset = trainer.prepare_sft_dataset(dataset_dict["validation"], split="validation")

print(f"  Train: {len(train_dataset)} examples")
print(f"  Validation: {len(eval_dataset)} examples")

In [None]:
# Start SFT training. `train` requires both datasets as positional arguments;
# they are the formatted datasets built in the dataset-preparation cell.
trainer.train(train_dataset, eval_dataset)

print("=" * 50)
print(" Training completed!")


## 🧪 Test the Trained Model


In [None]:
#!/usr/bin/env python3
"""
Cybersecurity Model Benchmarking Script
Tests the trained model on various cybersecurity tasks
"""

import torch
import json
import time
import random
import numpy as np
from typing import List, Dict, Any
from transformers import AutoTokenizer, AutoModelForCausalLM
from datasets import load_dataset
import logging
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
import re

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class CyberSecurityBenchmark:
    """Benchmark a fine-tuned causal LM on cybersecurity question answering.

    The benchmark samples question/answer pairs from
    AlicanKiraz0/Cybersecurity-Dataset-v1, buckets them into topic categories
    by keyword, generates a response for each question, and scores it against
    the reference answer using keyword coverage, embedding similarity and
    word overlap.

    NOTE: test cases are drawn from the dataset's "train" split, so the scores
    may include examples the model was fine-tuned on.
    """

    # Inference-time system prompt; same text as the stripped training prompt.
    SYSTEM_PROMPT = """You're a cybersecurity expert. Answer the question with careful analysis.
Respond in the following format:
<answer>
...
</answer>
<reasoning>
...
</reasoning>"""

    # Category assigned when no keyword table below matches.
    _DEFAULT_CATEGORY = "security_best_practices"

    # Ordered (category, keywords) table; the first category with a keyword
    # that occurs as a substring of the lower-cased question wins.
    # NOTE: matching is by substring, so short keywords such as "ip" or "code"
    # also match inside longer words.
    _CATEGORY_KEYWORDS = (
        ("vulnerability_analysis", (
            "vulnerability", "exploit", "injection", "sql", "xss", "csrf",
            "buffer", "overflow", "cve", "zero-day", "code", "flaw",
        )),
        ("threat_detection", (
            "threat", "attack", "malware", "phishing", "detect", "monitor",
            "alert", "intrusion", "anomaly", "suspicious",
        )),
        ("incident_response", (
            "incident", "response", "compromise", "breach", "ransomware",
            "forensic", "recovery", "containment", "eradication",
        )),
        ("network_security", (
            "network", "firewall", "vpn", "router", "switch", "protocol",
            "tcp", "udp", "ip", "dns", "dhcp", "wifi", "wireless",
        )),
        ("cryptography", (
            "cryptography", "encryption", "decryption", "hash", "certificate",
            "ssl", "tls", "aes", "rsa", "public key", "private key", "digital signature",
        )),
    )

    # Vocabulary scanned for in reference answers to derive expected keywords.
    _CYBER_KEYWORDS = (
        "security", "vulnerability", "attack", "threat", "malware", "phishing",
        "injection", "authentication", "authorization", "encryption", "firewall",
        "intrusion", "breach", "incident", "response", "prevention", "detection",
        "mitigation", "risk", "compliance", "policy", "access", "control",
        "network", "protocol", "certificate", "hash", "cipher", "key",
        "exploit", "payload", "backdoor", "trojan", "virus", "worm",
        "ddos", "botnet", "rootkit", "spyware", "adware", "ransomware",
    )

    # Words ignored when computing word-overlap (Jaccard) relevance.
    _COMMON_WORDS = frozenset({
        "the", "a", "an", "and", "or", "but", "in", "on", "at", "to", "for",
        "of", "with", "by", "is", "are", "was", "were", "be", "been", "have",
        "has", "had", "do", "does", "did", "will", "would", "could", "should",
        "this", "that", "these", "those", "it", "its", "they", "them", "their",
        "there", "here", "where", "when", "why", "how", "what", "who", "which",
    })

    def __init__(self, model_path="D:/AI-ML/CyberXP/cyberllm_sft_model", device="auto"):
        """Store configuration; nothing is loaded until `run_benchmark`.

        Args:
            model_path: Directory (or hub id) of the fine-tuned model.
            device: Value passed as `device_map` when loading the model.
        """
        self.model_path = model_path
        self.device = device
        self.tokenizer = None
        self.model = None
        self.test_cases = None
        self.similarity_model = None
        # Seconds taken by the most recent successful load_model() call.
        self._loading_time = 0.0

    def load_similarity_model(self):
        """Load the sentence-transformer used for semantic similarity.

        Returns:
            True on success, False if loading failed (a warning is logged and
            semantic scoring is skipped).
        """
        try:
            self.similarity_model = SentenceTransformer('all-MiniLM-L6-v2')
            logger.info("✓ Loaded similarity model")
            return True
        except Exception as e:
            logger.warning(f"Could not load similarity model: {e}")
            return False

    def load_dataset(self):
        """Download the dataset and build categorized test cases.

        Samples 20% of the "train" split (at least 5 rows, never more than
        the split holds), categorizes each question, then caps every category
        at 5 randomly chosen cases. The result is stored in `self.test_cases`.

        Raises:
            ValueError: If the dataset cannot be loaded or has no "train"
                split (raised by `_create_default_test_cases`).
        """
        print("Loading cybersecurity dataset...")
        logger.info("Loading test dataset...")

        try:
            # Load the specified cybersecurity dataset
            print("  Downloading AlicanKiraz0/Cybersecurity-Dataset-v1...")
            dataset = load_dataset("AlicanKiraz0/Cybersecurity-Dataset-v1")
            print("  Dataset loaded successfully")
            logger.info("✓ Loaded AlicanKiraz0/Cybersecurity-Dataset-v1")
        except Exception as e:
            print(f"  Failed to load dataset: {e}")
            logger.error(f"Could not load dataset: {e}")
            logger.warning("Could not load dataset, using default test cases")
            self._create_default_test_cases()
            return

        if "train" not in dataset:
            logger.warning("No train split found in dataset")
            self._create_default_test_cases()
            return

        train_data = dataset["train"]
        total_rows = len(train_data)
        print(f"  Found {total_rows} training examples")
        logger.info(f"Found {total_rows} training examples")

        # Use 20% of the rows, at least 5, but never more than exist:
        # random.sample raises ValueError when asked for more items than the
        # population holds.
        test_size = min(total_rows, max(5, int(total_rows * 0.2)))
        print(f"  Sampling {test_size} examples (20%) for testing...")
        # Sample row indices rather than materializing the whole split.
        sampled_indices = random.sample(range(total_rows), test_size)
        test_data = [train_data[index] for index in sampled_indices]
        print(f" Created {len(test_data)} test cases")
        logger.info(f"Using {len(test_data)} examples (20%) for testing")

        self.test_cases = {name: [] for name, _ in self._CATEGORY_KEYWORDS}
        self.test_cases[self._DEFAULT_CATEGORY] = []

        # Categorize questions based on keywords and store ground truth answers
        print("   Categorizing questions...")
        for example in test_data:
            # Accept either the user/assistant or question/answer schema.
            prompt = example.get("user") or example.get("question") or ""
            ground_truth_answer = example.get("assistant") or example.get("answer") or ""

            test_case = {
                "prompt": prompt,
                "ground_truth_answer": ground_truth_answer,
                "expected_keywords": self._extract_keywords_from_answer(ground_truth_answer),
            }

            category = self._categorize_question(prompt)
            if category not in self.test_cases:
                category = self._DEFAULT_CATEGORY
            self.test_cases[category].append(test_case)

        # Limit to reasonable number of tests per category
        print("   Balancing test categories...")
        for category, cases in self.test_cases.items():
            if len(cases) > 5:
                self.test_cases[category] = random.sample(cases, 5)

        total_tests = sum(len(tests) for tests in self.test_cases.values())
        print(f"  Categorized into {total_tests} test cases across {len(self.test_cases)} categories")
        logger.info(f"Created test cases: {total_tests} total")

    def _categorize_question(self, question: str) -> str:
        """Return the first category whose keyword occurs in `question`.

        Categories are checked in the order of `_CATEGORY_KEYWORDS`; the
        default category is returned when nothing matches.
        """
        question_lower = question.lower()
        for category, keywords in self._CATEGORY_KEYWORDS:
            if any(keyword in question_lower for keyword in keywords):
                return category
        return self._DEFAULT_CATEGORY

    def _extract_keywords_from_answer(self, answer: str) -> List[str]:
        """Return up to 8 vocabulary keywords that occur in `answer`.

        Keywords are matched as substrings of the lower-cased answer and are
        returned in vocabulary order. An empty answer yields an empty list.
        """
        if not answer:
            return []

        answer_lower = answer.lower()
        found_keywords = [kw for kw in self._CYBER_KEYWORDS if kw in answer_lower]
        return found_keywords[:8]

    def _create_default_test_cases(self):
        """Fail loudly: there is no built-in fallback test set.

        Raises:
            ValueError: Always.
        """
        logger.error("No dataset available - cannot run benchmark without test data")
        raise ValueError("Dataset is required for benchmarking. Please ensure your dataset is available.")

    def load_model(self):
        """Load the fine-tuned model and tokenizer from `self.model_path`.

        Returns:
            Time spent loading, in seconds. The value is also remembered so
            `_get_loading_time` can report it later.

        Raises:
            Exception: Any error raised while loading is logged and re-raised.
        """
        print("Loading fine-tuned model...")
        logger.info(f"Loading fine-tuned model from: {self.model_path}")

        start_time = time.time()

        try:
            # Load tokenizer
            print("  Loading tokenizer...")
            self.tokenizer = AutoTokenizer.from_pretrained(
                self.model_path,
                use_fast=True,
                padding_side="left"
            )

            if self.tokenizer.pad_token is None:
                self.tokenizer.pad_token = self.tokenizer.eos_token

            # Load fine-tuned model directly
            print("  Loading model weights...")
            self.model = AutoModelForCausalLM.from_pretrained(
                self.model_path,
                torch_dtype=torch.float16,
                device_map=self.device
            )

            loading_time = time.time() - start_time
            self._loading_time = loading_time
            print(f"  Model loaded successfully in {loading_time:.2f}s")
            logger.info(f"Successfully loaded fine-tuned model in {loading_time:.2f}s")

            return loading_time

        except Exception as e:
            print(f"  Failed to load model: {e}")
            logger.error(f"Could not load fine-tuned model: {e}")
            logger.info("Make sure you've trained the model first!")
            raise

    def generate_response(self, prompt: str) -> str:
        """Generate the model's answer to `prompt`.

        The prompt is rendered with the tokenizer's chat template (system +
        user turn, generation prompt appended) so that inference uses the
        same conversation format the training data was rendered with. If the
        tokenizer has no chat template, a plain "Question:/Answer:" prompt is
        used instead.

        Args:
            prompt: The user's question.

        Returns:
            The generated text only (prompt excluded), stripped of
            surrounding whitespace.
        """
        if getattr(self.tokenizer, "chat_template", None):
            messages = [
                {"role": "system", "content": self.SYSTEM_PROMPT},
                {"role": "user", "content": prompt},
            ]
            full_prompt = self.tokenizer.apply_chat_template(
                messages,
                tokenize=False,
                add_generation_prompt=True,
            )
            # The rendered template already contains the special tokens it
            # needs; adding them again would duplicate BOS.
            add_special_tokens = False
        else:
            full_prompt = f"{self.SYSTEM_PROMPT}\n\nQuestion: {prompt}\n\nAnswer:"
            add_special_tokens = True

        inputs = self.tokenizer(
            full_prompt,
            return_tensors="pt",
            truncation=True,
            max_length=1024,
            add_special_tokens=add_special_tokens,
        ).to(self.model.device)

        with torch.no_grad():
            # early_stopping is a beam-search option and has no effect when
            # sampling, so it is not passed.
            outputs = self.model.generate(
                **inputs,
                max_new_tokens=500,
                temperature=0.7,
                do_sample=True,
                top_p=0.9,
                top_k=50,
                pad_token_id=self.tokenizer.eos_token_id,
                eos_token_id=self.tokenizer.eos_token_id,
                repetition_penalty=1.1,
                no_repeat_ngram_size=3,
            )

        # Decode only the newly generated tokens. Slicing the decoded string
        # by len(full_prompt) breaks whenever decoding does not reproduce the
        # prompt text exactly (e.g. special tokens are skipped).
        prompt_token_count = inputs["input_ids"].shape[1]
        response = self.tokenizer.decode(
            outputs[0][prompt_token_count:],
            skip_special_tokens=True,
        ).strip()

        # Clean up response
        if response.startswith("Answer:"):
            response = response[len("Answer:"):].strip()

        return response

    def extract_answer_content(self, response: str) -> str:
        """Return the text inside the first <answer>...</answer> pair.

        Falls back to the whole response when no such pair is present. The
        result is stripped of surrounding whitespace in both cases.
        """
        answer_match = re.search(r'<answer>(.*?)</answer>', response, re.DOTALL)
        if answer_match:
            return answer_match.group(1).strip()

        return response.strip()

    def evaluate_response(self, response: str, ground_truth_answer: str, expected_keywords: List[str]) -> Dict[str, Any]:
        """Score a response against the reference answer.

        The overall score is 0.5 * keyword coverage + 0.3 * semantic
        similarity + 0.2 * word-overlap relevance. Semantic similarity is 0
        when no similarity model is loaded or when computing it fails.

        Args:
            response: Raw model output, optionally containing <answer> tags.
            ground_truth_answer: Reference answer text.
            expected_keywords: Keywords expected to appear in the answer.

        Returns:
            Dict with the individual scores, match counts, length statistics
            and the extracted answer text. All scores are plain Python
            numbers so the dict can be written with `json.dump`.
        """
        answer_content = self.extract_answer_content(response)

        response_lower = answer_content.lower()
        ground_truth_lower = ground_truth_answer.lower()

        # 1. KEYWORD COVERAGE (50% weight)
        keyword_matches = sum(
            1 for keyword in expected_keywords if keyword.lower() in response_lower
        )
        keyword_score = keyword_matches / len(expected_keywords) if expected_keywords else 0.0

        # 2. SEMANTIC SIMILARITY (30% weight) - if similarity model available
        semantic_score = 0.0
        if self.similarity_model is not None:
            try:
                embeddings = self.similarity_model.encode([response_lower, ground_truth_lower])
                similarity = cosine_similarity([embeddings[0]], [embeddings[1]])[0][0]
                # Convert the NumPy scalar to a built-in float (json cannot
                # serialize numpy.float32) and clamp negatives to zero.
                semantic_score = max(0.0, float(similarity))
            except Exception as e:
                logger.warning(f"Semantic similarity failed: {e}")

        # 3. CONTENT RELEVANCE (20% weight) - Jaccard similarity over words,
        # ignoring common function words.
        response_words = set(response_lower.split()) - self._COMMON_WORDS
        ground_truth_words = set(ground_truth_lower.split()) - self._COMMON_WORDS

        intersection = len(response_words & ground_truth_words)
        union = len(response_words | ground_truth_words)
        content_relevance = intersection / union if union > 0 else 0.0

        # Overall score (weighted combination without format)
        overall_score = (
            keyword_score * 0.5 +
            semantic_score * 0.3 +
            content_relevance * 0.2
        )

        return {
            "overall_score": overall_score,
            "keyword_score": keyword_score,
            "semantic_score": semantic_score,
            "content_relevance": content_relevance,
            "keyword_matches": keyword_matches,
            "total_keywords": len(expected_keywords),
            "word_count": len(answer_content.split()),
            "response_length": len(answer_content),
            "answer_content": answer_content
        }

    def _calculate_accuracy_score(self, detailed_results: Dict[str, Any]) -> float:
        """Return the mean `overall_score` across all categories (0 if none)."""
        scores = [
            result["overall_score"]
            for results in detailed_results.values()
            for result in results
        ]
        return sum(scores) / len(scores) if scores else 0

    def _get_loading_time(self) -> float:
        """Return the duration of the last successful `load_model` call.

        Returns 0.0 when `load_model` has not completed on this instance.
        """
        return getattr(self, "_loading_time", 0.0)

    def _calculate_processing_efficiency(self, total_time: float, total_tests: int) -> float:
        """Return throughput in tests per minute (0 when no time elapsed)."""
        if total_time > 0:
            return (total_tests * 60) / total_time
        return 0

    def run_benchmark(self) -> Dict[str, Any]:
        """Run every test case and collect per-test and aggregate scores.

        Loads the similarity model, the test cases and the language model on
        demand. A test that raises is logged and skipped; it does not count
        towards the totals.

        Returns:
            Dict with "overall_scores" (per-category averages),
            "detailed_results" (per-test evaluations) and
            "performance_metrics" (global averages and timings).
        """
        print("Starting Cybersecurity Model Benchmark")
        print("=" * 50)
        logger.info("Starting cybersecurity model benchmark...")

        # Load similarity model
        self.load_similarity_model()

        # Load dataset and create test cases
        if self.test_cases is None:
            self.load_dataset()

        # Load model
        if self.model is None:
            self.load_model()
        loading_time = self._get_loading_time()

        print("\nRunning benchmark tests...")
        print("-" * 40)

        results = {
            "overall_scores": {},
            "detailed_results": {},
            "performance_metrics": {}
        }

        total_tests = 0
        total_overall_score = 0
        total_keyword_score = 0
        total_semantic_score = 0
        total_content_relevance = 0
        total_inference_time = 0

        for category, test_cases in self.test_cases.items():
            if not test_cases:
                continue

            print(f"\nTesting {category.replace('_', ' ').title()}")
            logger.info(f"Testing category: {category}")
            category_results = []

            for i, test_case in enumerate(test_cases):
                print(f"  Test {i+1}/{len(test_cases)}: {test_case['prompt'][:60]}...")
                logger.info(f"Running test {i+1}/{len(test_cases)} in {category}")

                try:
                    start_time = time.time()
                    response = self.generate_response(test_case["prompt"])
                    inference_time = time.time() - start_time

                    evaluation = self.evaluate_response(
                        response,
                        test_case["ground_truth_answer"],
                        test_case["expected_keywords"]
                    )
                    evaluation["inference_time"] = inference_time
                    evaluation["prompt"] = test_case["prompt"]
                    evaluation["response"] = response
                    evaluation["ground_truth_answer"] = test_case["ground_truth_answer"]

                    category_results.append(evaluation)

                    # Update totals
                    total_tests += 1
                    total_overall_score += evaluation["overall_score"]
                    total_keyword_score += evaluation["keyword_score"]
                    total_semantic_score += evaluation["semantic_score"]
                    total_content_relevance += evaluation["content_relevance"]
                    total_inference_time += inference_time

                    print(f"    Overall: {evaluation['overall_score']:.2%}, "
                          f"Keywords: {evaluation['keyword_score']:.2%}, "
                          f"Semantic: {evaluation['semantic_score']:.2%}, "
                          f"Time: {inference_time:.2f}s")

                except Exception as e:
                    # One failing test must not abort the whole benchmark.
                    logger.error(f"Error in test {i+1}: {e}")
                    print(f"    Error: {e}")
                    continue

            # Calculate category averages
            if category_results:
                count = len(category_results)
                results["overall_scores"][category] = {
                    "avg_overall_score": sum(r["overall_score"] for r in category_results) / count,
                    "avg_keyword_score": sum(r["keyword_score"] for r in category_results) / count,
                    "avg_semantic_score": sum(r["semantic_score"] for r in category_results) / count,
                    "avg_content_relevance": sum(r["content_relevance"] for r in category_results) / count,
                    "avg_inference_time": sum(r["inference_time"] for r in category_results) / count,
                    "test_count": count
                }

            results["detailed_results"][category] = category_results

        # Calculate overall metrics
        results["performance_metrics"] = {
            "overall_score": total_overall_score / total_tests if total_tests > 0 else 0,
            "overall_keyword_score": total_keyword_score / total_tests if total_tests > 0 else 0,
            "overall_semantic_score": total_semantic_score / total_tests if total_tests > 0 else 0,
            "overall_content_relevance": total_content_relevance / total_tests if total_tests > 0 else 0,
            "avg_inference_time": total_inference_time / total_tests if total_tests > 0 else 0,
            "total_tests": total_tests,
            "accuracy_score": self._calculate_accuracy_score(results["detailed_results"]),
            "loading_time": loading_time,
            "processing_efficiency": self._calculate_processing_efficiency(total_inference_time, total_tests)
        }

        # Add summary statistics
        if total_tests > 0:
            print("\nBenchmark Summary:")
            print(f"  Total Tests Completed: {total_tests}")
            print(f"  Average Overall Score: {total_overall_score/total_tests:.2%}")
            print(f"  Average Semantic Score: {total_semantic_score/total_tests:.2%}")
            print(f"  Total Time: {total_inference_time:.2f}s")

        return results

    def print_results(self, results: Dict[str, Any]):
        """Print the results returned by `run_benchmark` in readable form.

        Shows the global metrics, a per-category breakdown and the first
        sample of each category.
        """
        print("\n" + "="*80)
        print("CYBERSECURITY MODEL BENCHMARK RESULTS")
        print("="*80)

        # Overall performance
        metrics = results["performance_metrics"]
        print("\nOVERALL PERFORMANCE:")
        print(f"  Overall Score:           {metrics['overall_score']:.2%}")
        print(f"  Keyword Score:           {metrics['overall_keyword_score']:.2%}")
        print(f"  Semantic Score:          {metrics['overall_semantic_score']:.2%}")
        print(f"  Content Relevance:       {metrics['overall_content_relevance']:.2%}")
        print(f"  Avg Inference Time:      {metrics['avg_inference_time']:.2f}s")
        print(f"  Processing Efficiency:   {metrics['processing_efficiency']:.1f} tests/min")
        print(f"  Total Tests:             {metrics['total_tests']}")

        # Category breakdown
        print("\nCATEGORY BREAKDOWN:")
        for category, scores in results["overall_scores"].items():
            print(f"  {category.replace('_', ' ').title()}:")
            print(f"    Overall Score:    {scores['avg_overall_score']:.2%}")
            print(f"    Keyword Score:    {scores['avg_keyword_score']:.2%}")
            print(f"    Semantic Score:   {scores['avg_semantic_score']:.2%}")
            print(f"    Avg Time:         {scores['avg_inference_time']:.2f}s")
            print(f"    Tests:            {scores['test_count']}")

        # Sample responses
        print("\nSAMPLE RESPONSES:")
        for category, test_results in results["detailed_results"].items():
            if test_results:
                first = test_results[0]
                print(f"\n{category.replace('_', ' ').title()} - Test 1:")
                print(f"Prompt: {first['prompt'][:100]}...")
                print(f"Response: {first['answer_content'][:200]}...")
                print(f"Overall Score: {first['overall_score']:.2%}")
                print(f"Keyword Score: {first['keyword_score']:.2%}")
                print(f"Semantic Score: {first['semantic_score']:.2%}")

    def save_results(self, results: Dict[str, Any], filename: str = "benchmark_results.json"):
        """Write `results` to `filename` as indented JSON (UTF-8)."""
        with open(filename, 'w', encoding="utf-8") as f:
            json.dump(results, f, indent=2)
        logger.info(f"Results saved to {filename}")

def main():
    """Run the benchmark end to end: execute, print, and save the results.

    Any exception is logged and then re-raised so the failure stays visible
    to the caller.
    """
    bench = CyberSecurityBenchmark()

    try:
        report = bench.run_benchmark()
        bench.print_results(report)
        bench.save_results(report)

        print("\nBenchmark completed successfully!")
        print("Results saved to benchmark_results.json")
    except Exception as exc:
        logger.error(f"Benchmark failed: {exc}")
        raise


# Run only when executed as a script (or as a notebook cell), not on import.
if __name__ == "__main__":
    main()


In [None]:
# from transformers import AutoModelForCausalLM, AutoTokenizer
# from huggingface_hub import login
# import torch
# # hf_token = user_secrets.get_secret("HF_TOKEN")
# # notebook_login()

# model_name = "./cyberllm_sft_model"
# model = AutoModelForCausalLM.from_pretrained(
#     model_name,
#     # quantization_config=bnb_config,
#     torch_dtype=torch.float16,
#     # device_map="auto"
# ).to("cuda")
# tokenizer = AutoTokenizer.from_pretrained(model_name)
# if tokenizer.pad_token is None:
#     tokenizer.pad_token = tokenizer.eos_token


# HF_REPO_NAME = "abaryan/CyberXP_Llama_3.2_1B"

# model.push_to_hub(HF_REPO_NAME)
# tokenizer.push_to_hub(HF_REPO_NAME)

model.safetensors:   0%|          | 0.00/2.47G [00:00<?, ?B/s]

README.md: 0.00B [00:00, ?B/s]

To support symlinks on Windows, you either need to activate Developer Mode or to run Python as an administrator. In order to activate developer mode, see this article: https://docs.microsoft.com/en-us/windows/apps/get-started/enable-your-device-for-development


tokenizer.json:   0%|          | 0.00/17.2M [00:00<?, ?B/s]

CommitInfo(commit_url='https://huggingface.co/abaryan/CyberXP_Llama_3.2_1B/commit/4957e36d0ecbd062e9db846dc8d9723d22d1c0ef', commit_message='Upload tokenizer', commit_description='', oid='4957e36d0ecbd062e9db846dc8d9723d22d1c0ef', pr_url=None, repo_url=RepoUrl('https://huggingface.co/abaryan/CyberXP_Llama_3.2_1B', endpoint='https://huggingface.co', repo_type='model', repo_id='abaryan/CyberXP_Llama_3.2_1B'), pr_revision=None, pr_num=None)