# CATastrophe Model Testing - C/C++ Vulnerability Detection

This notebook demonstrates how to test the CATastrophe vulnerability detection model on C/C++ code examples, covering both vulnerable and safe functions.

## Overview
- Load the pre-trained model from HuggingFace Hub (`ewhk9887/CATastrophe`)
- Test with real C/C++ vulnerability patterns
- Visualize results and model performance
- Analyze C-specific security patterns

## Setup and Imports

In [None]:
# Install required packages for Google Colab
!pip install huggingface-hub scikit-learn torch numpy pandas matplotlib seaborn -q

import warnings
warnings.filterwarnings('ignore')

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from typing import List, Tuple
import torch
import pickle
from huggingface_hub import hf_hub_download

# Plot defaults: stock matplotlib style, seaborn "husl" palette, 12x8 figures.
plt.style.use('default')
sns.set_palette("husl")
plt.rcParams.update({'figure.figsize': (12, 8)})

# Record the environment this notebook ran on in the cell output.
print(
    "✓ All imports successful!",
    f"PyTorch version: {torch.__version__}",
    "Device available: " + ("GPU" if torch.cuda.is_available() else "CPU"),
    sep="\n",
)

In [None]:
# Define the autoencoder model architecture
class EnhancedAutoencoder(torch.nn.Module):
    """Fully connected autoencoder with a 128-unit bottleneck.

    The encoder narrows ``input_dim -> 1024 -> 512 -> 256 -> 128`` and the
    decoder mirrors it back to ``input_dim``. Every hidden layer is
    Linear -> BatchNorm1d -> ReLU; the first three encoder stages add
    Dropout(0.2). The output layer ends in a Sigmoid.

    The position of each layer inside the two ``Sequential`` containers
    determines the ``state_dict`` key names, so the layer order must not change
    if previously saved weights are to load.
    """

    def __init__(self, input_dim):
        super().__init__()
        nn = torch.nn

        def stage(n_in, n_out, dropout=None):
            """Return the layers of one Linear/BatchNorm/ReLU(/Dropout) stage."""
            layers = [nn.Linear(n_in, n_out), nn.BatchNorm1d(n_out), nn.ReLU()]
            if dropout is not None:
                layers.append(nn.Dropout(dropout))
            return layers

        # Encoder: dropout on every stage except the bottleneck stage.
        self.encoder = nn.Sequential(
            *stage(input_dim, 1024, dropout=0.2),
            *stage(1024, 512, dropout=0.2),
            *stage(512, 256, dropout=0.2),
            *stage(256, 128),
        )

        # Decoder: no dropout; final projection is squashed by a sigmoid.
        self.decoder = nn.Sequential(
            *stage(128, 256),
            *stage(256, 512),
            *stage(512, 1024),
            nn.Linear(1024, input_dim),
            nn.Sigmoid(),
        )

    def forward(self, x):
        """Encode ``x`` to the bottleneck and return its reconstruction."""
        return self.decoder(self.encoder(x))

# Define the CodeVectorizer class (wrapper for TfidfVectorizer)
class CodeVectorizer:
    """Thin wrapper around a pre-configured scikit-learn ``TfidfVectorizer``.

    The wrapped estimator is stored as ``self.vectorizer``; other cells in this
    notebook reach through to it directly (e.g. ``get_feature_names_out``).
    """

    def __init__(self, max_features=2000):
        # Imported here rather than at module level, so defining the class
        # does not itself require scikit-learn.
        from sklearn.feature_extraction.text import TfidfVectorizer

        tfidf_options = dict(
            max_features=max_features,
            # A token is an identifier, or any single character that is
            # neither a word character nor whitespace (operators, brackets...).
            token_pattern=r'[a-zA-Z_][a-zA-Z0-9_]*|[^\w\s]',
            lowercase=True,
            use_idf=True,
            smooth_idf=True,
            sublinear_tf=True,
            analyzer='word',
            ngram_range=(1, 2),
            min_df=2,
            max_df=0.95,
        )
        self.vectorizer = TfidfVectorizer(**tfidf_options)

    def fit_transform(self, texts):
        """Fit the wrapped vectorizer on ``texts`` and return their TF-IDF matrix."""
        tfidf = self.vectorizer
        return tfidf.fit_transform(texts)

    def transform(self, texts):
        """Return the TF-IDF matrix of ``texts`` using the already fitted vectorizer."""
        tfidf = self.vectorizer
        return tfidf.transform(texts)

# Load model function
def load_model_from_hub(repo_id="ewhk9887/CATastrophe"):
    """Download the autoencoder weights and fitted vectorizer, and build the model.

    Args:
        repo_id: Hugging Face Hub repository that hosts
            ``autoencoder_weights.pth`` and ``vectorizer.pkl``.

    Returns:
        tuple: ``(model, vectorizer)`` where ``model`` is an
        ``EnhancedAutoencoder`` in eval mode, placed on the GPU when CUDA is
        available and on the CPU otherwise, and ``vectorizer`` is the
        unpickled vectorizer object.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Fetch both artifacts from the Hub.
    weights_path = hf_hub_download(repo_id=repo_id, filename="autoencoder_weights.pth")
    vectorizer_path = hf_hub_download(repo_id=repo_id, filename="vectorizer.pkl")

    # SECURITY: pickle.load can execute arbitrary code embedded in the file.
    # Only point ``repo_id`` at repositories you trust.
    # NOTE(review): unpickling presumably needs the CodeVectorizer class to be
    # defined in the current session - confirm against how the file was saved.
    with open(vectorizer_path, 'rb') as f:
        vectorizer = pickle.load(f)

    # The autoencoder's input width equals the fitted vocabulary size.
    input_dim = len(vectorizer.vectorizer.get_feature_names_out())
    model = EnhancedAutoencoder(input_dim)

    # Only a state dict is needed, so restrict unpickling to tensors and plain
    # containers (weights_only=True) instead of allowing arbitrary objects.
    # map_location keeps a GPU-saved checkpoint loadable on CPU-only machines.
    state_dict = torch.load(weights_path, map_location=device, weights_only=True)
    model.load_state_dict(state_dict)

    # Inference only: eval mode disables dropout and uses the stored
    # batch-norm statistics.
    model = model.to(device)
    model.eval()

    return model, vectorizer

# Prediction function
def predict_score(message, func, model=None, vectorizer=None):
    """Return the autoencoder reconstruction error for a commit message plus code.

    Args:
        message: Commit message text.
        func: Source code text of the function being scored.
        model: Autoencoder to score with. When ``None``, the model loaded from
            the Hub (and cached in module globals) is used.
        vectorizer: Object whose ``transform([text])`` returns a matrix with a
            ``toarray()`` method. When ``None``, the cached Hub vectorizer is
            used.

    Returns:
        float: Mean squared error between the feature vector and its
        reconstruction, averaged over all features.
    """
    global _cached_model, _cached_vectorizer

    if model is None or vectorizer is None:
        # Load the default pair at most once per session.
        if '_cached_model' not in globals() or '_cached_vectorizer' not in globals():
            _cached_model, _cached_vectorizer = load_model_from_hub()
        # Fill in only what the caller left out; an explicitly supplied model
        # or vectorizer must not be silently replaced by the cached one.
        if model is None:
            model = _cached_model
        if vectorizer is None:
            vectorizer = _cached_vectorizer

    # Run on whatever device the model's parameters live on.
    device = next(model.parameters()).device

    # Message and code are scored as one space-joined document.
    combined_text = f"{message} {func}"
    features = vectorizer.transform([combined_text])

    # Densify and convert to float32 directly on the target device.
    features_tensor = torch.as_tensor(features.toarray(), dtype=torch.float32, device=device)

    with torch.no_grad():
        reconstruction = model(features_tensor)

    # Per-feature squared error, averaged across the feature axis. Exactly one
    # document is scored, so .item() yields a single Python float.
    mse = torch.nn.functional.mse_loss(reconstruction, features_tensor, reduction='none')
    return mse.mean(dim=1).item()

## Load the CATastrophe Model

In [None]:
# Announce the download; the last line carries its own trailing blank line.
print(
    "Loading CATastrophe model from HuggingFace Hub...",
    "Repository: ewhk9887/CATastrophe",
    "This may take a moment on first run...\n",
    sep="\n",
)

# Fetch the weights and vectorizer and build the model.
model, vectorizer = load_model_from_hub("ewhk9887/CATastrophe")

# Summarise what was loaded. The input width is read from the first encoder layer.
print(
    "\n✓ Model loaded successfully!",
    f"Model type: {type(model).__name__}",
    f"Vectorizer type: {type(vectorizer).__name__}",
    f"Input dimensions: {model.encoder[0].in_features}",
    f"Model device: {next(model.parameters()).device}",
    sep="\n",
)

## Define C/C++ Test Cases

Real C/C++ code examples with various vulnerability types.

In [None]:
# Test cases: each entry pairs a commit message with a C/C++ function and a
# ground-truth label ("vulnerable"). The C snippets live in ordinary (non-raw)
# Python strings, so C escape sequences must be written with a doubled
# backslash ("\\n", "\\0"); a single "\n" would put a real line break inside
# the C string literal and change how the sample is tokenised.
test_cases = [
    # Buffer Overflow vulnerabilities
    {
        "category": "Buffer Overflow",
        "vulnerable": True,
        "message": "fix buffer overflow in string copy",
        "func": """void copy_string(char *dest, char *src) {
    strcpy(dest, src);
}"""
    },
    {
        "category": "Buffer Overflow",
        "vulnerable": True,
        "message": "remove unsafe gets function",
        "func": """void read_input() {
    char buffer[100];
    gets(buffer);
    printf("Input: %s\\n", buffer);
}"""
    },
    {
        "category": "Buffer Overflow",
        "vulnerable": True,
        "message": "fix strcat buffer overflow",
        "func": """void concat_strings(char *dest, char *src) {
    strcat(dest, src);
}"""
    },
    
    # Format String vulnerabilities
    {
        "category": "Format String",
        "vulnerable": True,
        "message": "fix format string vulnerability",
        "func": """void log_message(char *user_input) {
    printf(user_input);
}"""
    },
    {
        "category": "Format String",
        "vulnerable": True,
        "message": "secure fprintf call",
        "func": """void print_error(char *msg) {
    fprintf(stderr, msg);
}"""
    },
    
    # Integer Overflow vulnerabilities
    {
        "category": "Integer Overflow",
        "vulnerable": True,
        "message": "check integer overflow in malloc",
        "func": """void allocate_buffer(int size) {
    char *buf = malloc(size * sizeof(char));
    if (buf) {
        memset(buf, 0, size);
    }
}"""
    },

    # Array Bounds vulnerabilities
    {
        "category": "Array Bounds",
        "vulnerable": True,
        "message": "add bounds checking for array access",
        "func": """void process_array(int count) {
    int arr[100];
    for(int i = 0; i < count; i++) {
        arr[i] = i * 2;
    }
}"""
    },
    
    # Use After Free vulnerabilities
    {
        "category": "Use After Free",
        "vulnerable": True,
        "message": "fix use after free bug",
        "func": """void cleanup_data(char *ptr) {
    free(ptr);
    printf("Data: %s\\n", ptr);
}"""
    },
    
    # Null Pointer Dereference
    {
        "category": "Null Pointer",
        "vulnerable": True,
        "message": "add null pointer check",
        "func": """int get_string_length(char *str) {
    return strlen(str);
}"""
    },
    
    # SAFE/SECURE code examples
    {
        "category": "Safe String Copy",
        "vulnerable": False,
        "message": "implement secure string copy",
        "func": """void safe_copy_string(char *dest, const char *src, size_t dest_size) {
    if (dest && src && dest_size > 0) {
        strncpy(dest, src, dest_size - 1);
        dest[dest_size - 1] = '\\0';
    }
}"""
    },
    {
        "category": "Safe Input",
        "vulnerable": False,
        "message": "use fgets for safe input",
        "func": """void safe_read_input() {
    char buffer[100];
    if (fgets(buffer, sizeof(buffer), stdin)) {
        buffer[strcspn(buffer, "\\n")] = '\\0';
        printf("Input: %s\\n", buffer);
    }
}"""
    },
    {
        "category": "Safe Printf",
        "vulnerable": False,
        "message": "use safe printf format",
        "func": """void safe_log_message(const char *user_input) {
    printf("%s", user_input);
}"""
    },
    {
        "category": "Safe Memory",
        "vulnerable": False,
        "message": "implement safe memory allocation",
        "func": """void safe_allocate_buffer(size_t size) {
    if (size > 0 && size < 1000000) {
        char *buf = malloc(size);
        if (buf != NULL) {
            memset(buf, 0, size);
            // use buffer
            free(buf);
        }
    }
}"""
    },
    {
        "category": "Safe Pointer",
        "vulnerable": False,
        "message": "add null check before strlen",
        "func": """int safe_get_string_length(const char *str) {
    if (str == NULL) {
        return 0;
    }
    return strlen(str);
}"""
    },
    {
        "category": "Safe Bounds",
        "vulnerable": False,
        "message": "add array bounds validation",
        "func": """void safe_process_array(int count) {
    int arr[100];
    if (count > 0 && count <= 100) {
        for(int i = 0; i < count; i++) {
            arr[i] = i * 2;
        }
    }
}"""
    },
    {
        "category": "Regular Function",
        "vulnerable": False,
        "message": "implement factorial function",
        "func": """int calculate_factorial(int n) {
    if (n <= 1) {
        return 1;
    }
    return n * calculate_factorial(n - 1);
}"""
    },
    {
        "category": "Regular Function",
        "vulnerable": False,
        "message": "add array sum utility",
        "func": """int sum_array(const int arr[], int size) {
    int sum = 0;
    for (int i = 0; i < size; i++) {
        sum += arr[i];
    }
    return sum;
}"""
    }
]

# Report how many samples carry each ground-truth label.
print(f"Defined {len(test_cases)} C/C++ test cases:")
vulnerable_count = sum(1 for case in test_cases if case['vulnerable'])
safe_count = len(test_cases) - vulnerable_count
print(f"- {vulnerable_count} vulnerable C/C++ code samples")
print(f"- {safe_count} safe C/C++ code samples")

## Run Predictions on C/C++ Code

In [None]:
print("Running predictions on C/C++ test cases...\n")

# Score every test case and collect one result row per sample.
results = []
for i, test_case in enumerate(test_cases):
    print(f"[{i+1}/{len(test_cases)}] Testing: {test_case['category']}")
    
    # Reuse the model and vectorizer loaded earlier in the notebook. Calling
    # predict_score without them makes it load a second copy from the Hub
    # into its own private cache.
    score = predict_score(
        test_case['message'],
        test_case['func'],
        model=model,
        vectorizer=vectorizer,
    )
    
    # Keep at most the first 80 characters of the code as a preview.
    func_text = test_case['func']
    func_preview = (func_text[:80] + '...') if len(func_text) > 80 else func_text

    results.append({
        'category': test_case['category'],
        'vulnerable': test_case['vulnerable'],
        'score': score,
        'message': test_case['message'],
        'func_preview': func_preview
    })
    
    # The status shown is the ground-truth label, not a model verdict.
    status = "🔴 VULNERABLE" if test_case['vulnerable'] else "🟢 SAFE"
    print(f"    Score: {score:.6f} - {status}")
    print()

# Convert to DataFrame for easier analysis
df_results = pd.DataFrame(results)
print("\n✓ All predictions completed!")
print(f"Results stored in DataFrame with {len(df_results)} rows")

## Results Analysis

In [None]:
# Display results summary
print("📊 C/C++ VULNERABILITY DETECTION RESULTS")
print("=" * 55)

# Split the scores by ground-truth label. The column holds booleans, so it is
# used directly as a mask instead of being compared with True/False.
is_vulnerable = df_results['vulnerable'].astype(bool)
vulnerable_scores = df_results.loc[is_vulnerable, 'score']
safe_scores = df_results.loc[~is_vulnerable, 'score']

print(f"\n📈 Score Statistics:")
print(f"All samples - Mean: {df_results['score'].mean():.6f}, Std: {df_results['score'].std():.6f}")
print(f"Vulnerable  - Mean: {vulnerable_scores.mean():.6f}, Std: {vulnerable_scores.std():.6f}")
print(f"Safe        - Mean: {safe_scores.mean():.6f}, Std: {safe_scores.std():.6f}")

print(f"\n🎯 Score Ranges:")
print(f"Overall: {df_results['score'].min():.6f} - {df_results['score'].max():.6f}")
print(f"Vulnerable: {vulnerable_scores.min():.6f} - {vulnerable_scores.max():.6f}")
print(f"Safe: {safe_scores.min():.6f} - {safe_scores.max():.6f}")

# Show results by category
print(f"\n📋 Results by Category:")
category_summary = df_results.groupby(['category', 'vulnerable'])['score'].agg(['mean', 'count']).round(6)
for (category, is_vuln), data in category_summary.iterrows():
    status = "🔴 Vulnerable" if is_vuln else "🟢 Safe"
    # iterrows() upcasts the row to float, so the count would print as "1.0";
    # convert it back to an integer for display.
    print(f"  {status} {category}: {data['mean']:.6f} (n={int(data['count'])})")

## Detailed Results Table

In [None]:
# Build the presentation table in one expression: pick and rename the columns,
# turn the boolean label into a readable status, round the score, then order
# the rows from highest to lowest anomaly score with a fresh 0..n-1 index.
display_df = (
    pd.DataFrame({
        'Vulnerability Type': df_results['category'],
        'Ground Truth': df_results['vulnerable'].map({True: '🔴 Vulnerable', False: '🟢 Safe'}),
        'Anomaly Score': df_results['score'].round(6),
        'Commit Message': df_results['message'],
    })
    .sort_values('Anomaly Score', ascending=False)
    .reset_index(drop=True)
)

print("📋 DETAILED C/C++ VULNERABILITY ANALYSIS")
print("=" * 60)
print(display_df.to_string(index=False))

## Visualization: Score Distribution

In [None]:
# Build a 2x2 dashboard summarising the anomaly scores.
fig, axes = plt.subplots(2, 2, figsize=(15, 12))
fig.suptitle('CATastrophe Model - C/C++ Vulnerability Detection Analysis', fontsize=16, fontweight='bold')
hist_ax, box_ax = axes[0, 0], axes[0, 1]
scatter_ax, bar_ax = axes[1, 0], axes[1, 1]

# Top-left: overlaid histograms of the two ground-truth groups.
hist_ax.hist(vulnerable_scores, bins=8, color='red', alpha=0.7, label='Vulnerable')
hist_ax.hist(safe_scores, bins=8, color='green', alpha=0.7, label='Safe')
hist_ax.set(
    xlabel='Anomaly Score',
    ylabel='Frequency',
    title='Score Distribution: Vulnerable vs Safe C Code',
)
hist_ax.legend()
hist_ax.grid(True, alpha=0.3)

# Top-right: box plots of the same two groups, filled to match the histogram.
box_data = [vulnerable_scores, safe_scores]
box_plot = box_ax.boxplot(box_data, labels=['Vulnerable', 'Safe'], patch_artist=True)
for patch, face in zip(box_plot['boxes'], ('red', 'green')):
    patch.set_facecolor(face)
box_ax.set(ylabel='Anomaly Score', title='Score Distribution by Safety Status')
box_ax.grid(True, alpha=0.3)

# Bottom-left: one point per sample in test-case order, coloured by label.
colors = df_results['vulnerable'].map({True: 'red', False: 'green'}).tolist()
scatter_ax.scatter(range(len(df_results)), df_results['score'], c=colors, alpha=0.7, s=60)
scatter_ax.set(
    xlabel='Sample Index',
    ylabel='Anomaly Score',
    title='Individual C/C++ Sample Scores',
)
scatter_ax.grid(True, alpha=0.3)

# The scatter has no labelled artists, so its legend is built by hand.
from matplotlib.patches import Patch
legend_elements = [
    Patch(facecolor='red', label='Vulnerable'),
    Patch(facecolor='green', label='Safe'),
]
scatter_ax.legend(handles=legend_elements)

# Bottom-right: mean score per vulnerable category, smallest at the bottom.
vuln_categories = df_results[df_results['vulnerable'] == True]
category_scores = vuln_categories.groupby('category')['score'].mean().sort_values(ascending=True)
bar_positions = range(len(category_scores))

bar_ax.barh(bar_positions, category_scores.values, color='red', alpha=0.7)
bar_ax.set_yticks(bar_positions)
bar_ax.set_yticklabels(category_scores.index, fontsize=10)
bar_ax.set(xlabel='Mean Anomaly Score', title='Average Scores by Vulnerability Type')
bar_ax.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

# Compare the group means: a positive separation means vulnerable samples
# reconstruct worse (score higher) than safe ones on average.
mean_vuln = vulnerable_scores.mean()
mean_safe = safe_scores.mean()
separation = mean_vuln - mean_safe
direction = 'DO' if separation > 0 else 'DO NOT'

print(
    "\n📊 Key Observations:",
    f"• Mean score for vulnerable code: {mean_vuln:.6f}",
    f"• Mean score for safe code: {mean_safe:.6f}",
    f"• Score separation: {separation:.6f}",
    f"• Higher scores {direction} indicate vulnerabilities",
    sep="\n",
)

## Interactive Testing with Custom C Code

In [None]:
def analyze_c_code(message: str, code: str):
    """Score a custom C/C++ snippet and print a risk report.

    The score is compared against the mean scores of the notebook's vulnerable
    and safe test samples (the module-level ``vulnerable_scores`` and
    ``safe_scores``): above the vulnerable mean is reported as high risk,
    above the safe mean as medium risk, anything else as low risk.

    Args:
        message: Commit message accompanying the code.
        code: C/C++ source text to analyse.

    Returns:
        The anomaly score produced by ``predict_score``.
    """
    score = predict_score(message, code)

    # Echo the inputs and the raw score.
    print("📝 C/C++ Code Analysis")
    print("=" * 40)
    print(f"Commit Message: {message}")
    print("Code Analysis:")
    print(code)
    print(f"\nAnomaly Score: {score:.6f}")

    # Pick the risk band; the safe mean is only looked up when it is needed.
    vulnerable_mean = vulnerable_scores.mean()
    if score > vulnerable_mean:
        verdict = f"🔴 HIGH RISK: Score above vulnerable average ({vulnerable_mean:.6f})"
        advice = "💡 Recommendation: Manual security review recommended"
    else:
        safe_mean = safe_scores.mean()
        if score > safe_mean:
            verdict = f"🟡 MEDIUM RISK: Score above safe average ({safe_mean:.6f})"
            advice = "💡 Recommendation: Consider security review"
        else:
            verdict = "🟢 LOW RISK: Score within safe range"
            advice = "💡 Recommendation: Appears secure"

    print(verdict)
    print(advice)

    return score

# Test examples: run analyze_c_code on three hand-written snippets. The C code
# is held in ordinary Python strings, so C escapes are written as "\\n" / "\\0".
print("🧪 Testing Custom C/C++ Code Samples")
print("=" * 50)

# Test 1: Buffer overflow vulnerability
# strcpy copies user_data into a fixed 256-byte buffer without a length check.
print("\nTest 1: Potential Buffer Overflow")
test_code1 = """void handle_input(char *user_data) {
    char buffer[256];
    strcpy(buffer, user_data);
    printf("Processed: %s\\n", buffer);
}"""
analyze_c_code("add input processing function", test_code1)

# Visual separator between the individual reports.
print("\n" + "-"*50)

# Test 2: Safe implementation
# Same task as test 1, but the input is null- and length-checked, the copy is
# bounded with strncpy, and the buffer is explicitly terminated.
print("\nTest 2: Safe Input Handling")
test_code2 = """void safe_handle_input(const char *user_data) {
    char buffer[256];
    if (user_data && strlen(user_data) < sizeof(buffer)) {
        strncpy(buffer, user_data, sizeof(buffer) - 1);
        buffer[sizeof(buffer) - 1] = '\\0';
        printf("Processed: %s\\n", buffer);
    }
}"""
analyze_c_code("implement safe input processing", test_code2)

# Visual separator between the individual reports.
print("\n" + "-"*50)

# Test 3: Format string vulnerability
# user_msg is passed to printf as the format string itself.
print("\nTest 3: Format String Issue")
test_code3 = """void debug_log(char *user_msg) {
    printf(user_msg);
    fflush(stdout);
}"""
# Last expression of the cell: in a notebook its return value (the score) is
# also shown as the cell output.
analyze_c_code("add debug logging", test_code3)

## Model Feature Analysis

In [None]:
# Inspect the fitted vocabulary for C security-related terms.
feature_names = vectorizer.vectorizer.get_feature_names_out()
print(f"🔍 Model Feature Analysis")
print("=" * 40)
print(f"Total vocabulary features: {len(feature_names):,}")
print(f"Max features setting: {vectorizer.vectorizer.max_features}")

# Look for C-specific security-related terms
security_keywords = {
    'Dangerous Functions': ['strcpy', 'strcat', 'sprintf', 'gets', 'scanf'],
    'Safe Functions': ['strncpy', 'strncat', 'snprintf', 'fgets'],
    'Memory Operations': ['malloc', 'free', 'calloc', 'realloc', 'memcpy', 'memset'],
    'Security Terms': ['buffer', 'overflow', 'bounds', 'null', 'pointer', 'check']
}

# Split every feature into its space-separated tokens once. Matching whole
# tokens (rather than substrings) keeps the safe `fgets` out of the dangerous
# `gets` group.
feature_tokens = [(f, set(f.lower().split())) for f in feature_names]

print(f"\n🎯 Security-Related Features Found:")
for category, keywords in security_keywords.items():
    wanted = {keyword.lower() for keyword in keywords}
    # One pass over the vocabulary lists each feature at most once, even when
    # it contains several keywords of the category (e.g. "buffer overflow").
    # Matches come out in vocabulary order.
    found = [f for f, tokens in feature_tokens if not wanted.isdisjoint(tokens)]
    
    if found:
        print(f"\n{category}: {len(found)} features")
        for feature in found[:5]:  # Show first 5
            print(f"  • {feature}")
        if len(found) > 5:
            print(f"  ... and {len(found) - 5} more")
    else:
        print(f"\n{category}: No direct matches found")

# Sample some random features (unseeded, so the sample differs between runs)
print(f"\n📝 Random Sample of Features:")
import random
sample_features = random.sample(list(feature_names), min(15, len(feature_names)))
for i, feature in enumerate(sample_features):
    print(f"  {feature}", end="   ")
    if (i + 1) % 3 == 0:
        print()  # New line every 3 features
if len(sample_features) % 3:
    print()  # Terminate a final, partially filled row

## Summary and Recommendations

In [None]:
# Final report: each section is emitted with a single print call whose
# arguments are the section's lines.
print("🎯 CATastrophe Model - C/C++ Security Analysis Summary")
print("=" * 60)

# Count every element of every parameter tensor.
total_params = sum(p.numel() for p in model.parameters())

print(
    "\n🏗️ Model Information:",
    f"  • Architecture: {type(model).__name__} autoencoder",
    f"  • Total parameters: {total_params:,}",
    f"  • Input dimensions: {model.encoder[0].in_features}",
    f"  • Vocabulary size: {len(feature_names):,} TF-IDF features",
    f"  • Device: {next(model.parameters()).device}",
    sep="\n",
)

# Group means are reused by several lines below.
vulnerable_mean = vulnerable_scores.mean()
safe_mean = safe_scores.mean()

print(
    "\n📊 Performance on C/C++ Test Cases:",
    f"  • Vulnerable code average score: {vulnerable_mean:.6f}",
    f"  • Safe code average score: {safe_mean:.6f}",
    f"  • Score separation: {abs(vulnerable_mean - safe_mean):.6f}",
    f"  • Total test cases: {len(test_cases)} ({vulnerable_count} vulnerable, {safe_count} safe)",
    sep="\n",
)

# Per-category sample count and mean score, vulnerable samples only.
print("\n✅ Detected Vulnerability Types:")
vulnerable_rows = df_results[df_results['vulnerable'] == True]
vuln_types = vulnerable_rows['category'].value_counts()
for vtype, count in vuln_types.items():
    avg_score = vulnerable_rows.loc[vulnerable_rows['category'] == vtype, 'score'].mean()
    print(f"  • {vtype}: {count} sample(s), avg score {avg_score:.6f}")

print(
    "\n💡 Usage Recommendations:",
    "  • Use as part of security code review process",
    f"  • Focus on functions with scores > {vulnerable_mean:.6f}",
    "  • Combine with static analysis tools (Clang, Cppcheck)",
    "  • Pay special attention to:",
    "    - Buffer operations (strcpy, strcat, gets)",
    "    - Format string usage (printf, fprintf)",
    "    - Memory management (malloc/free patterns)",
    "    - Array bounds checking",
    sep="\n",
)

print(
    "\n⚠️ Important Notes:",
    "  • This is an anomaly detection model - manual review is essential",
    "  • False positives are possible - use professional judgment",
    "  • Best used as a first-line screening tool",
    "  • Trained on C code patterns - may not cover all vulnerability types",
    sep="\n",
)

print(
    "\n🚀 Next Steps:",
    "  • Test on your specific C/C++ codebase",
    "  • Adjust thresholds based on your needs",
    "  • Integrate into CI/CD pipeline",
    "  • Train additional models on domain-specific code",
    sep="\n",
)

print(
    "\n📚 Resources:",
    "  • Model: https://huggingface.co/ewhk9887/CATastrophe",
    "  • Repository documentation: CLAUDE.md",
    "  • OWASP C/C++ Security Guidelines",
    "  • CERT C Coding Standard",
    sep="\n",
)