In [1]:
# Baseline Model (v1) Testing and Evaluation
# This notebook tests your first model and establishes baseline metrics

import sys
sys.path.append('../scripts')

import pandas as pd
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM
from peft import PeftModel
import matplotlib.pyplot as plt
import seaborn as sns
from typing import List, Dict
import re
import json

print("🧪 Testing Baseline Model (v1)")
print("=" * 50)

class BaselineModelTester:
    def __init__(self, base_model_path: str, peft_model_path: str):
        """Initialize the baseline model for testing"""
        print(f"Loading base model: {base_model_path}")
        print(f"Loading PEFT adapter: {peft_model_path}")
        
        self.tokenizer = AutoTokenizer.from_pretrained(base_model_path)
        self.tokenizer.pad_token = self.tokenizer.eos_token
        
        # Load base model
        self.model = AutoModelForCausalLM.from_pretrained(
            base_model_path,
            torch_dtype=torch.float16,
            device_map="auto"
        )
        
        # Load PEFT adapter
        self.model = PeftModel.from_pretrained(self.model, peft_model_path)
        self.model.eval()
        
        print("✅ Model loaded successfully!")
    
    def generate_domain_name(self, business_description: str) -> Dict:
        """Generate a single domain name suggestion"""
        
        # Check for inappropriate content (basic safety)
        if self._is_inappropriate(business_description):
            return {
                "domain": "",
                "confidence": 0.0,
                "status": "blocked",
                "raw_output": "BLOCKED"
            }
        
        # Create prompt (same format as training)
        prompt = f'Suggest a domain name for: "{business_description}"'
        
        # Tokenize
        inputs = self.tokenizer.encode(prompt, return_tensors="pt")
        
        # Generate
        with torch.no_grad():
            outputs = self.model.generate(
                inputs,
                max_new_tokens=30,
                temperature=0.7,
                do_sample=True,
                pad_token_id=self.tokenizer.eos_token_id,
                eos_token_id=self.tokenizer.eos_token_id
            )
        
        # Decode
        generated_text = self.tokenizer.decode(outputs[0], skip_special_tokens=True)
        
        # Extract domain from response
        domain = self._extract_domain(generated_text, prompt)
        confidence = self._calculate_confidence(business_description, domain)
        
        return {
            "domain": domain,
            "confidence": confidence,
            "status": "success",
            "raw_output": generated_text.replace(prompt, "").strip()
        }
    
    def _extract_domain(self, generated_text: str, prompt: str) -> str:
        """Extract domain name from model output"""
        # Remove prompt
        response = generated_text.replace(prompt, "").strip()
        
        # Look for domain patterns
        domain_patterns = [
            r'([a-zA-Z0-9][a-zA-Z0-9\-]{0,61}[a-zA-Z0-9]?\.(?:com|net|org|io|co))',
            r'([a-zA-Z0-9\-]+\.(?:com|net|org|io|co))',
        ]
        
        for pattern in domain_patterns:
            matches = re.findall(pattern, response.lower())
            if matches:
                return matches[0]
        
        # Fallback: try to create something from the response
        words = re.findall(r'[a-zA-Z0-9]+', response)
        if words:
            return f"{words[0].lower()}.com"
        
        return "example.com"  # Ultimate fallback
    
    def _is_inappropriate(self, text: str) -> bool:
        """Basic inappropriate content detection"""
        inappropriate_words = [
            "adult", "porn", "xxx", "sex", "gambling", "casino", 
            "drugs", "illegal", "hate", "racist", "violence"
        ]
        return any(word in text.lower() for word in inappropriate_words)
    
    def _calculate_confidence(self, description: str, domain: str) -> float:
        """Calculate a basic confidence score"""
        # Simple heuristic based on word overlap
        desc_words = set(re.findall(r'[a-zA-Z]+', description.lower()))
        domain_words = set(re.findall(r'[a-zA-Z]+', domain.lower()))
        
        if not desc_words:
            return 0.5
        
        overlap = len(desc_words.intersection(domain_words))
        base_score = min(1.0, overlap / len(desc_words))
        
        # Bonus for reasonable length
        domain_name = domain.split('.')[0]
        if 5 <= len(domain_name) <= 15:
            base_score += 0.1
        elif len(domain_name) > 25:
            base_score -= 0.2
        
        return max(0.0, min(1.0, base_score))

def test_model_on_examples(tester: BaselineModelTester, test_cases: List[Dict]) -> pd.DataFrame:
    """Test the model on a set of examples"""
    results = []
    
    print(f"🔍 Testing model on {len(test_cases)} examples...")
    
    for i, test_case in enumerate(test_cases):
        if i % 10 == 0:
            print(f"   Progress: {i}/{len(test_cases)}")
        
        description = test_case["business_description"]
        true_domain = test_case.get("domain_name", "")
        complexity = test_case.get("complexity", "unknown")
        
        # Generate prediction
        prediction = tester.generate_domain_name(description)
        
        result = {
            "business_description": description,
            "true_domain": true_domain,
            "predicted_domain": prediction["domain"],
            "confidence": prediction["confidence"],
            "status": prediction["status"],
            "raw_output": prediction["raw_output"],
            "complexity": complexity
        }
        
        results.append(result)
    
    return pd.DataFrame(results)

def analyze_results(results_df: pd.DataFrame) -> Dict:
    """Analyze the test results"""
    analysis = {}
    
    # Basic statistics
    analysis["total_examples"] = len(results_df)
    analysis["successful_generations"] = len(results_df[results_df["status"] == "success"])
    analysis["blocked_generations"] = len(results_df[results_df["status"] == "blocked"])
    analysis["avg_confidence"] = results_df["confidence"].mean()
    
    # Safety analysis
    safety_examples = results_df[results_df["true_domain"] == "BLOCKED"]
    if len(safety_examples) > 0:
        correctly_blocked = len(safety_examples[safety_examples["status"] == "blocked"])
        analysis["safety_accuracy"] = correctly_blocked / len(safety_examples)
    else:
        analysis["safety_accuracy"] = "N/A"
    
    # Complexity analysis
    complexity_stats = {}
    for complexity in results_df["complexity"].unique():
        subset = results_df[results_df["complexity"] == complexity]
        complexity_stats[complexity] = {
            "count": len(subset),
            "avg_confidence": subset["confidence"].mean(),
            "success_rate": len(subset[subset["status"] == "success"]) / len(subset)
        }
    analysis["complexity_breakdown"] = complexity_stats
    
    # Common issues
    issues = []
    
    # Check for very long domains
    long_domains = results_df[results_df["predicted_domain"].str.len() > 30]
    if len(long_domains) > 0:
        issues.append(f"Generated {len(long_domains)} overly long domains")
    
    # Check for low confidence predictions
    low_confidence = results_df[results_df["confidence"] < 0.3]
    if len(low_confidence) > 0:
        issues.append(f"{len(low_confidence)} predictions with low confidence (<0.3)")
    
    # Check for identical predictions
    duplicate_domains = results_df["predicted_domain"].value_counts()
    high_duplicates = duplicate_domains[duplicate_domains > 3]
    if len(high_duplicates) > 0:
        issues.append(f"High repetition: {len(high_duplicates)} domains generated >3 times")
    
    analysis["common_issues"] = issues
    
    return analysis

# Load test data
print("📊 Loading test dataset...")
try:
    test_df = pd.read_csv("../data/enhanced_synthetic_dataset.csv")
    print(f"✅ Loaded {len(test_df)} test examples")
except FileNotFoundError:
    print("❌ Enhanced dataset not found. Please run the dataset creation notebook first.")
    print("   Creating a small test set for demonstration...")
    
    # Create minimal test set
    test_data = [
        {"business_description": "coffee shop in Seattle", "domain_name": "seattlecoffee.com", "complexity": "simple"},
        {"business_description": "AI consulting firm", "domain_name": "aiconsult.io", "complexity": "simple"},
        {"business_description": "premium organic bakery specializing in artisanal breads", "domain_name": "organicbread.com", "complexity": "complex"},
        {"business_description": "adult entertainment website", "domain_name": "BLOCKED", "complexity": "safety_adult"},
        {"business_description": "business", "domain_name": "mybusiness.com", "complexity": "edge_vague"}
    ]
    test_df = pd.DataFrame(test_data)

# Initialize model tester
print("\n🤖 Initializing baseline model...")
try:
    tester = BaselineModelTester(
        base_model_path="openlm-research/open_llama_3b",
        peft_model_path="../models/baseline-model-v1"  # Adjust path as needed
    )
    
    # Test on a subset first (to save time)
    test_subset = test_df.head(20)  # Test on first 20 examples
    print(f"\n🧪 Running tests on {len(test_subset)} examples...")
    
    results = test_model_on_examples(tester, test_subset.to_dict('records'))
    
    # Analyze results
    print("\n📈 Analyzing results...")
    analysis = analyze_results(results)
    
    # Display results
    print(f"\n📊 BASELINE MODEL (v1) RESULTS")
    print("=" * 40)
    print(f"Total examples tested: {analysis['total_examples']}")
    print(f"Successful generations: {analysis['successful_generations']}")
    print(f"Blocked generations: {analysis['blocked_generations']}")
    print(f"Average confidence: {analysis['avg_confidence']:.3f}")
    print(f"Safety accuracy: {analysis['safety_accuracy']}")
    
    print(f"\n🔍 Complexity Breakdown:")
    for complexity, stats in analysis["complexity_breakdown"].items():
        print(f"   {complexity}: {stats['count']} examples, avg confidence: {stats['avg_confidence']:.3f}")
    
    print(f"\n⚠️  Common Issues Found:")
    for issue in analysis["common_issues"]:
        print(f"   • {issue}")
    
    # Show some example predictions
    print(f"\n📝 Sample Predictions:")
    print("=" * 60)
    for i, row in results.head(5).iterrows():
        print(f"Input: {row['business_description']}")
        print(f"Predicted: {row['predicted_domain']} (confidence: {row['confidence']:.3f})")
        print(f"Expected: {row['true_domain']}")
        print(f"Status: {row['status']}")
        print("-" * 40)
    
    # Save results
    results.to_csv("../data/baseline_model_v1_results.csv", index=False)
    
    # Save analysis
    with open("../data/baseline_model_v1_analysis.json", "w") as f:
        json.dump(analysis, f, indent=2)
    
    print(f"\n💾 Results saved to:")
    print(f"   • ../data/baseline_model_v1_results.csv")
    print(f"   • ../data/baseline_model_v1_analysis.json")
    
    print(f"\n🎯 Key Issues Identified for v2:")
    print("   1. Need better training data with realistic domain examples")
    print("   2. Improve safety filtering")
    print("   3. Handle edge cases better")
    print("   4. Reduce repetitive outputs")
    
except Exception as e:
    print(f"❌ Error testing model: {e}")
    print("Please check that your model files are in the correct location:")
    print("   ../models/baseline-model-v1/")

🧪 Testing Baseline Model (v1)
📊 Loading test dataset...
✅ Loaded 417 test examples

🤖 Initializing baseline model...
Loading base model: openlm-research/open_llama_3b
Loading PEFT adapter: ../models/baseline-model-v1


You are using the default legacy behaviour of the <class 'transformers.models.llama.tokenization_llama.LlamaTokenizer'>. This is expected, and simply means that the `legacy` (previous) behavior will be used so nothing changes for you. If you want to use the new behaviour, set `legacy=False`. This should only be set if you understand what it means, and thoroughly read the reason why this was added as explained in https://github.com/huggingface/transformers/pull/24565 - if you loaded a llama tokenizer from a GGUF file you can ignore this message
You are using the default legacy behaviour of the <class 'transformers.models.llama.tokenization_llama_fast.LlamaTokenizerFast'>. This is expected, and simply means that the `legacy` (previous) behavior will be used so nothing changes for you. If you want to use the new behaviour, set `legacy=False`. This should only be set if you understand what it means, and thoroughly read the reason why this was added as explained in https://github.com/huggin

pytorch_model.bin:  81%|########1 | 5.58G/6.85G [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/6.85G [00:00<?, ?B/s]

❌ Error testing model: The current `device_map` had weights offloaded to the disk. Please provide an `offload_folder` for them. Alternatively, make sure you have `safetensors` installed if the model you are using offers the weights in this format.
Please check that your model files are in the correct location:
   ../models/baseline-model-v1/
