# 🎯 ML Red Teaming: Offensive Security for Machine Learning

**Core Concept**: ML Red Teaming simulates realistic adversaries to discover vulnerabilities in machine learning systems before external threats exploit them.

## 🔴 Red Teaming vs Penetration Testing

### Penetration Testing
-   **Scope**: Narrow and specific
-   **Goal**: Find known vulnerability types (SQL injection, XSS)
-   **Approach**: Technical vulnerability scanning
-   **Example**: "Can we inject malicious code into the API?"

### Red Teaming
-   **Scope**: Broad and realistic
-   **Goal**: Simulate what real adversaries would discover
-   **Approach**: Strategic attack chains, creative thinking
-   **Example**: "How would a competitor steal our entire model?"

## 📋 Today's Engagement: Content Moderation System

### Target System
-   **Model**: Toxic comment classifier
-   **Purpose**: Detect and filter harmful content on a social media platform
-   **Input**: User-submitted text comments
-   **Output**: Binary classification (toxic / non-toxic) + confidence score

### Adversary Profile
-   **Motivation**: Post harmful content undetected
-   **Capability**: Can modify text, no model access (black-box)
-   **Resources**: Public tools, basic scripting skills
-   **Constraint**: Must maintain readability to humans

This notebook documents a complete 4-phase red team engagement.

## 🛠️ Setup & Target System Deployment

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
import re

# Set random seed
np.random.seed(42)

print("✅ Environment configured for red team engagement")

## 🎯 Phase 1: Planning & Threat Modeling

In [None]:
# Define target system
target_system = {
    'name': 'Toxic Comment Classifier',
    'purpose': 'Content moderation for a social media platform',
    'model_type': 'Text classification (TF-IDF + Logistic Regression)',
    'deployment': 'REST API endpoint',
    'input': 'User text comments',
    'output': 'Binary label (toxic/non-toxic) + confidence score'
}

# Define adversary profile
adversary = {
    'motivation': 'Bypass content moderation to post harmful content',
    'capability': 'Medium (text modification, no model access)',
    'resources': 'Public tools, basic programming',
    'constraints': 'Must maintain human readability',
    'likelihood': 4  # 1-5 scale (4 = Likely)
}

# Define attack goal
attack_goal = {
    'objective': 'Evade toxic content classifier',
    'success_criteria': 'Toxic comment classified as non-toxic',
    'detection_threshold': 'Model confidence < 0.5 for toxic class'
}

# Assess impact
impact_assessment = {
    'severity': 5,  # 1-5 scale (5 = Catastrophic)
    'affected_users': 'All platform users',
    'business_impact': 'Reputational damage, user churn, regulatory risk',
    'safety_impact': 'Exposure to harmful content, potential real-world harm'
}

# Calculate risk score
risk_score = adversary['likelihood'] * impact_assessment['severity']

print("="*70)
print("PHASE 1: PLANNING & THREAT MODELING")
print("="*70)
print(f"\nTarget: {target_system['name']}")
print(f"Purpose: {target_system['purpose']}")
print(f"\nAdversary Motivation: {adversary['motivation']}")
print(f"Adversary Capability: {adversary['capability']}")
print(f"\nAttack Objective: {attack_goal['objective']}")
print(f"Success Criteria: {attack_goal['success_criteria']}")
print(f"\nRisk Score: {risk_score}/25 (Likelihood: {adversary['likelihood']}, Impact: {impact_assessment['severity']})")

if risk_score >= 20:
    severity = 'CRITICAL'
elif risk_score >= 12:
    severity = 'HIGH'
elif risk_score >= 6:
    severity = 'MEDIUM'
else:
    severity = 'LOW'

print(f"Vulnerability Severity: {severity}")
print("="*70)
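
The likelihood × impact thresholds in the cell above can be laid out as a full 5×5 matrix, which makes it easier to see where any adversary/impact pairing lands. This is a minimal sketch: the `severity_band` helper name is an assumption introduced here, and the thresholds are copied from the cell above.

```python
def severity_band(likelihood: int, impact: int) -> str:
    """Map a 1-5 likelihood and a 1-5 impact to the severity bands used above."""
    score = likelihood * impact
    if score >= 20:
        return "CRITICAL"
    if score >= 12:
        return "HIGH"
    if score >= 6:
        return "MEDIUM"
    return "LOW"

# Print the matrix with likelihood on rows (5 at the top) and impact on columns.
# Each cell shows the first letter of its band: C, H, M or L.
print("L\\I  " + "  ".join(str(i) for i in range(1, 6)))
for likelihood in range(5, 0, -1):
    cells = [severity_band(likelihood, impact)[0] for impact in range(1, 6)]
    print(f"  {likelihood}  " + "  ".join(cells))
```

With these bands, only the top-right corner of the matrix (scores of 20 or 25) counts as critical, so the 4 × 5 pairing chosen for this engagement sits right on the boundary.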

## 📊 Deploy Target System (Toxic Comment Classifier)

In [None]:
# Generate synthetic toxic/non-toxic comments
toxic_templates = [
    "You are an idiot",
    "This is stupid garbage",
    "I hate you so much",
    "You should die",
    "What a moron",
    "Go away loser",
    "You are worthless trash",
    "Shut up fool"
]

non_toxic_templates = [
    "I disagree with your opinion",
    "This is an interesting perspective",
    "Thanks for sharing",
    "Have a great day",
    "I appreciate your comment",
    "Let's discuss this further",
    "That's a valid point",
    "Good analysis"
]

# Generate dataset
data = []
labels = []

# Generate toxic samples
for _ in range(500):
    text = np.random.choice(toxic_templates)
    data.append(text)
    labels.append(1)  # Toxic

# Generate non-toxic samples
for _ in range(500):
    text = np.random.choice(non_toxic_templates)
    data.append(text)
    labels.append(0)  # Non-toxic

# Train-test split
X_train, X_test, y_train, y_test = train_test_split(data, labels, test_size=0.3, random_state=42)

# Train model (TF-IDF + Logistic Regression)
print("Training target model (Toxic Comment Classifier)...\n")

vectorizer = TfidfVectorizer(max_features=100)
X_train_vec = vectorizer.fit_transform(X_train)
X_test_vec = vectorizer.transform(X_test)

model = LogisticRegression(random_state=42)
model.fit(X_train_vec, y_train)

# Evaluate baseline performance
y_pred = model.predict(X_test_vec)
baseline_accuracy = accuracy_score(y_test, y_pred)

print("✅ Target Model Deployed")
print(f"Baseline Accuracy: {baseline_accuracy*100:.2f}%")
print(f"\nClassification Report:")
print(classification_report(y_test, y_pred, target_names=['Non-Toxic', 'Toxic']))
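
One caveat about the baseline accuracy: the dataset samples with replacement from only 16 template strings, so nearly every test comment also appears verbatim in the training set. The sketch below illustrates that leakage with the standard library only. It uses placeholder strings and `random` instead of the notebook's templates, NumPy sampling and `train_test_split`, so the exact samples differ, but the overlap effect should be the same.

```python
import random

random.seed(42)

# Stand-ins for the 16 template strings (8 toxic + 8 non-toxic) used above.
templates = [f"template_{i}" for i in range(16)]

# 1,000 samples drawn with replacement, then a 70/30 split.
samples = [random.choice(templates) for _ in range(1000)]
random.shuffle(samples)
train, test = samples[:700], samples[700:]

# Fraction of test samples whose exact string was already seen in training.
train_set = set(train)
overlap = sum(s in train_set for s in test) / len(test)

print(f"Unique strings in dataset: {len(set(samples))}")
print(f"Test samples also present in train: {overlap:.0%}")
```

A near-perfect baseline on such data mostly measures memorization of the templates, so it is better read as a sanity check than as an estimate of real-world accuracy.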

## 🔍 Phase 2: Reconnaissance - Probing the Model

In [None]:
def probe_model(text_samples):
    """Probe model to understand behavior and decision boundaries."""
    results = []
    for text in text_samples:
        vec = vectorizer.transform([text])
        prediction = model.predict(vec)[0]
        confidence = model.predict_proba(vec)[0]
        
        results.append({
            'input': text,
            'prediction': 'Toxic' if prediction == 1 else 'Non-Toxic',
            'toxic_confidence': confidence[1],
            'non_toxic_confidence': confidence[0]
        })
    return pd.DataFrame(results)

# Probe with test samples
probe_samples = [
    "You are an idiot",
    "You are very smart",
    "This is terrible",
    "This is wonderful",
    "I hate this",
    "I love this"
]

print("="*70)
print("PHASE 2: RECONNAISSANCE")
print("="*70)
print("\nProbing model with test inputs...\n")

probe_results = probe_model(probe_samples)
print(probe_results.to_string(index=False))

# Identify attack surface
print("\n" + "="*70)
print("ATTACK SURFACE ANALYSIS")
print("="*70)
print("\n1. Input Interface: Text API (accepts any string)")
print("2. Input Validation: Minimal (length check only)")
print("3. Text Preprocessing: TF-IDF vectorization (word tokens, lowercased by default)")
print("4. Decision Boundary: Linear (logistic regression)")
print("5. Output: Binary classification + confidence scores")
print("\n⚠️ Weakness Identified: Exact word-token matching; unseen tokens are ignored")
print("💡 Attack Vector: Character obfuscation may evade detection")
print("="*70)
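
To see why word-token matching is the relevant weakness, it helps to look at what the vectorizer does to an obfuscated string. The sketch below approximates the default analyzer of scikit-learn's text vectorizers in plain Python: lowercase the input, then keep runs of two or more word characters. The one-comment `vocabulary` is a hypothetical stand-in for the fitted vocabulary.

```python
import re

# Default token pattern documented for scikit-learn's text vectorizers:
# runs of two or more word characters.
TOKEN_PATTERN = re.compile(r"(?u)\b\w\w+\b")

def tokenize(text: str) -> list[str]:
    """Approximate the default analyzer: lowercase, then tokenize."""
    return TOKEN_PATTERN.findall(text.lower())

# Hypothetical vocabulary learned from a single training comment.
vocabulary = set(tokenize("You are an idiot"))

variants = [
    "YoU ArE An iDiOt",                  # case variation
    "y0u @r3 @n 1d107",                  # character substitution
    "Y o u   a r e   a n   i d i o t",   # space insertion
]
for variant in variants:
    tokens = tokenize(variant)
    known = [t for t in tokens if t in vocabulary]
    print(f"{variant!r}: tokens={tokens}, in-vocabulary={known}")
```

Case variation leaves every token in the vocabulary, so it should not be expected to evade this model. Substitution and space insertion produce tokens the model has never seen (or no tokens at all), which leaves the classifier with little more than its intercept.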

## ⚔️ Phase 3: Exploitation - Evasion Attacks

In [None]:
# Define evasion techniques

def character_substitution(text):
    """Replace characters with similar-looking symbols."""
    substitutions = {
        'a': '@',
        'e': '3',
        'i': '1',
        'o': '0',
        's': '$',
        't': '7'
    }
    evaded = text.lower()
    for char, replacement in substitutions.items():
        evaded = evaded.replace(char, replacement)
    return evaded

def case_variation(text):
    """Alternate character case."""
    return ''.join([c.upper() if i % 2 == 0 else c.lower() for i, c in enumerate(text)])

def homoglyph_substitution(text):
    """Replace with visually similar Unicode characters."""
    # Using Cyrillic lookalikes (written as escapes so they are unambiguous)
    homoglyphs = {
        'a': '\u0430',  # Cyrillic 'а'
        'e': '\u0435',  # Cyrillic 'е'
        'o': '\u043e',  # Cyrillic 'о'
        'i': '\u0456',  # Cyrillic 'і'
    }
    evaded = text.lower()
    for char, replacement in homoglyphs.items():
        evaded = evaded.replace(char, replacement)
    return evaded

def space_insertion(text):
    """Insert spaces to break word patterns."""
    return ' '.join(list(text))

def combined_evasion(text):
    """Apply multiple techniques."""
    evaded = character_substitution(text)
    evaded = case_variation(evaded)
    return evaded

# Test evasion attacks
toxic_samples = [t for t, l in zip(X_test, y_test) if l == 1][:20]

print("="*70)
print("PHASE 3: EXPLOITATION - EVASION ATTACKS")
print("="*70)

# Test each attack technique
attack_results = {
    'Character Substitution': [],
    'Case Variation': [],
    'Homoglyph Attack': [],
    'Space Insertion': [],
    'Combined Attack': []
}

attack_functions = {
    'Character Substitution': character_substitution,
    'Case Variation': case_variation,
    'Homoglyph Attack': homoglyph_substitution,
    'Space Insertion': space_insertion,
    'Combined Attack': combined_evasion
}

for attack_name, attack_func in attack_functions.items():
    evaded_count = 0
    
    for toxic_text in toxic_samples:
        # Apply evasion
        evaded_text = attack_func(toxic_text)
        
        # Check if model is fooled
        vec = vectorizer.transform([evaded_text])
        prediction = model.predict(vec)[0]
        
        if prediction == 0:  # Classified as non-toxic (attack succeeded)
            evaded_count += 1
            attack_results[attack_name].append({
                'original': toxic_text,
                'evaded': evaded_text,
                'success': True
            })
    
    success_rate = (evaded_count / len(toxic_samples)) * 100
    print(f"\n{attack_name}:")
    print(f"  Success Rate: {success_rate:.1f}%")
    print(f"  Evaded: {evaded_count}/{len(toxic_samples)} toxic samples")

print("\n" + "="*70)
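
The loop above divides by all sampled toxic comments, including any the model already misses before the attack is applied. A stricter measure counts only comments that were detected in their original form. The sketch below shows one way to compute that. `evasion_rate`, the keyword-based `toy_is_flagged` classifier and the `leet` attack are hypothetical stand-ins so the example runs on its own.

```python
from typing import Callable, Sequence

def evasion_rate(
    samples: Sequence[str],
    attack: Callable[[str], str],
    is_flagged: Callable[[str], bool],
) -> tuple[float, int]:
    """Return (evasion rate, number of originally detected samples)."""
    detected = [s for s in samples if is_flagged(s)]
    if not detected:
        return 0.0, 0
    evaded = sum(1 for s in detected if not is_flagged(attack(s)))
    return evaded / len(detected), len(detected)

# Toy stand-in for the target model: flags comments containing listed words.
BLOCKLIST = {"idiot", "moron"}

def toy_is_flagged(text: str) -> bool:
    return any(word in BLOCKLIST for word in text.lower().split())

def leet(text: str) -> str:
    """Toy attack: swap two vowels for digits."""
    return text.lower().replace("i", "1").replace("o", "0")

samples = ["You are an idiot", "What a moron", "You are worthless trash"]
rate, n_detected = evasion_rate(samples, leet, toy_is_flagged)
print(f"Evaded {rate:.0%} of {n_detected} originally detected samples")
```

In the notebook, `is_flagged` could wrap the real pipeline, for example `lambda t: model.predict(vectorizer.transform([t]))[0] == 1`.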

## 📋 Phase 3 (Continued): Attack Demonstrations

In [None]:
# Show successful attack examples
print("="*70)
print("SUCCESSFUL EVASION EXAMPLES")
print("="*70)

for attack_name, results in attack_results.items():
    if results:  # If attack had successes
        print(f"\n{attack_name}:")
        for i, example in enumerate(results[:3], 1):  # Show first 3
            print(f"\n  Example {i}:")
            print(f"    Original: '{example['original']}'")
            print(f"    Evaded:   '{example['evaded']}'")
            
            # Show model predictions
            orig_vec = vectorizer.transform([example['original']])
            evad_vec = vectorizer.transform([example['evaded']])
            
            orig_conf = model.predict_proba(orig_vec)[0][1]
            evad_conf = model.predict_proba(evad_vec)[0][1]
            
            print(f"    Original Toxic Conf: {orig_conf:.2f}")
            print(f"    Evaded Toxic Conf:   {evad_conf:.2f}")
            print("    ✅ Attack Success: Bypassed detection!")

print("\n" + "="*70)

## 📊 Visualize Attack Effectiveness

In [None]:
# Calculate success rates for each attack
attack_names = list(attack_functions.keys())
success_rates = []

for attack_name in attack_names:
    evaded_count = 0
    for toxic_text in toxic_samples:
        evaded_text = attack_functions[attack_name](toxic_text)
        vec = vectorizer.transform([evaded_text])
        prediction = model.predict(vec)[0]
        if prediction == 0:
            evaded_count += 1
    success_rates.append((evaded_count / len(toxic_samples)) * 100)

# Plot success rates
plt.figure(figsize=(12, 6))
colors = ['red' if sr > 70 else 'orange' if sr > 50 else 'yellow' for sr in success_rates]
bars = plt.bar(attack_names, success_rates, color=colors, edgecolor='black', linewidth=1.5)

# Add value labels on bars
for bar, sr in zip(bars, success_rates):
    height = bar.get_height()
    plt.text(bar.get_x() + bar.get_width()/2., height,
            f'{sr:.1f}%', ha='center', va='bottom', fontsize=11, fontweight='bold')

plt.axhline(y=70, color='darkred', linestyle='--', linewidth=2, label='High Severity Threshold (70%)')
plt.xlabel('Attack Technique', fontsize=12)
plt.ylabel('Evasion Success Rate (%)', fontsize=12)
plt.title('Red Team Attack Effectiveness Against Content Moderation System', fontsize=14, fontweight='bold')
plt.xticks(rotation=45, ha='right')
plt.ylim([0, 105])
plt.legend()
plt.grid(True, alpha=0.3, axis='y')
plt.tight_layout()
plt.show()

n_high = sum(sr > 70 for sr in success_rates)
print(f"\n🚨 {n_high} out of {len(attack_names)} attacks exceeded 70% success rate!")
if n_high:
    # Only claim a critical finding when at least one attack crossed the threshold
    print("This represents a CRITICAL vulnerability in the content moderation system.")
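
Each success rate above is estimated from only 20 toxic samples, so the 70% threshold should be read with some uncertainty in mind. One common way to express that is a Wilson score interval for a binomial proportion. The sketch below is an optional add-on rather than part of the original analysis, and the `wilson_interval` helper name is an assumption.

```python
import math

def wilson_interval(successes: int, trials: int, z: float = 1.96) -> tuple[float, float]:
    """Wilson score interval for a proportion (z = 1.96 gives roughly 95%)."""
    if trials == 0:
        return (0.0, 1.0)
    p = successes / trials
    denom = 1 + z**2 / trials
    center = (p + z**2 / (2 * trials)) / denom
    half = z * math.sqrt(p * (1 - p) / trials + z**2 / (4 * trials**2)) / denom
    # Clamp to [0, 1] to absorb floating-point error at the extremes.
    return (max(0.0, center - half), min(1.0, center + half))

for evaded in (20, 14, 0):
    low, high = wilson_interval(evaded, 20)
    print(f"{evaded}/20 evaded -> 95% interval: {low:.1%} to {high:.1%}")
```

With 20 trials, even a 14/20 (70%) result has an interval that spans well below and above the threshold, which argues for a larger sample before ranking attacks against each other.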

## 📝 Phase 4: Reporting - Vulnerability Assessment

In [None]:
# Generate vulnerability report
def calculate_severity(likelihood, impact):
    """Calculate vulnerability severity."""
    risk_score = likelihood * impact
    if risk_score >= 20:
        return 'Critical'
    elif risk_score >= 12:
        return 'High'
    elif risk_score >= 6:
        return 'Medium'
    else:
        return 'Low'

vulnerabilities = []

for attack_name, sr in zip(attack_names, success_rates):
    # Determine likelihood based on ease of exploitation
    if attack_name in ['Character Substitution', 'Case Variation']:
        likelihood = 5  # Trivial to execute
    elif attack_name == 'Homoglyph Attack':
        likelihood = 4  # Moderate skill needed
    else:
        likelihood = 3
    
    impact = 5  # Always high for content moderation bypass
    severity = calculate_severity(likelihood, impact)
    
    vulnerabilities.append({
        'Vulnerability': attack_name + ' Evasion',
        'Severity': severity,
        'Success Rate': f"{sr:.1f}%",
        'Likelihood': likelihood,
        'Impact': impact,
        'Risk Score': likelihood * impact
    })

# Create vulnerability report
vuln_df = pd.DataFrame(vulnerabilities)
vuln_df = vuln_df.sort_values('Risk Score', ascending=False)

print("="*80)
print("PHASE 4: RED TEAM VULNERABILITY REPORT")
print("="*80)
print("\nEXECUTIVE SUMMARY:")
print("-" * 80)
print("Red team assessment identified CRITICAL vulnerabilities in the content")
print("moderation system. Multiple evasion techniques achieve >70% success rate,")
print("allowing toxic content to bypass detection with simple text obfuscation.")
print("\nKEY FINDINGS:")
print(f"  • {len([v for v in vulnerabilities if v['Severity'] == 'Critical'])} Critical vulnerabilities")
print(f"  • {len([v for v in vulnerabilities if v['Severity'] == 'High'])} High severity vulnerabilities")
print(f"  • Average evasion success rate: {np.mean(success_rates):.1f}%")
print(f"  • Highest success rate: {max(success_rates):.1f}%")
print("\nVULNERABILITY DETAILS:")
print("-" * 80)
print(vuln_df.to_string(index=False))
print("="*80)

## 🛡️ Phase 4 (Continued): Mitigation Recommendations

In [None]:
# Define mitigation strategies
mitigations = {
    'Character Substitution Evasion': {
        'priority': 'Critical',
        'recommendations': [
            'Implement text normalization (convert symbols back to letters)',
            'Train model on obfuscated examples',
            'Use character-level neural networks (less sensitive to substitution)'
        ],
        'effort': 'Medium',
        'timeline': '2-4 weeks'
    },
    'Case Variation Evasion': {
        'priority': 'High',
        'recommendations': [
            'Confirm input is lowercased before classification (TfidfVectorizer does this by default)',
            'Use case-insensitive tokenization',
            'Update preprocessing pipeline'
        ],
        'effort': 'Low',
        'timeline': '1 week'
    },
    'Homoglyph Attack Evasion': {
        'priority': 'Critical',
        'recommendations': [
            'Implement Unicode normalization (NFKC) to fold compatibility forms',
            'Detect and block mixed-script text',
            'Map cross-script lookalikes with a confusables table (NFKC alone does not convert Cyrillic to Latin)'
        ],
        'effort': 'Medium',
        'timeline': '2-3 weeks'
    },
    'Space Insertion Evasion': {
        'priority': 'Medium',
        'recommendations': [
            'Remove excess whitespace in preprocessing',
            'Use character n-grams instead of word tokens',
            'Implement pattern-based space removal'
        ],
        'effort': 'Low',
        'timeline': '1-2 weeks'
    },
    'Combined Attack Evasion': {
        'priority': 'Critical',
        'recommendations': [
            'Apply all individual mitigations',
            'Use ensemble model (multiple classifiers)',
            'Implement adversarial training with combined attacks',
            'Add secondary detection layer (semantic analysis)'
        ],
        'effort': 'High',
        'timeline': '6-8 weeks'
    }
}

print("\n" + "="*80)
print("MITIGATION RECOMMENDATIONS")
print("="*80)

for vuln_name, mitigation in mitigations.items():
    print(f"\n{vuln_name}")
    print(f"  Priority: {mitigation['priority']}")
    print(f"  Effort: {mitigation['effort']}")
    print(f"  Timeline: {mitigation['timeline']}")
    print("  Recommendations:")
    for i, rec in enumerate(mitigation['recommendations'], 1):
        print(f"    {i}. {rec}")

print("\n" + "="*80)
print("STRATEGIC RECOMMENDATIONS")
print("="*80)
print("\n1. IMMEDIATE (Week 1):")
print("   • Implement case normalization and whitespace removal")
print("   • Deploy input sanitization layer")
print("\n2. SHORT-TERM (Weeks 2-4):")
print("   • Add Unicode normalization")
print("   • Retrain model with obfuscated examples")
print("   • Implement homoglyph detection")
print("\n3. LONG-TERM (Weeks 5-8):")
print("   • Deploy ensemble model architecture")
print("   • Implement continuous adversarial testing")
print("   • Add semantic analysis layer")
print("\n4. ONGOING:")
print("   • Regular red team assessments (quarterly)")
print("   • Monitor for new evasion techniques")
print("   • Update threat model based on real-world attacks")
print("="*80)
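
The immediate and short-term items (case handling, whitespace removal, Unicode normalization, homoglyph handling) can be combined into a single preprocessing step that runs before vectorization. The sketch below is one possible shape for that step, using only the standard library. The `CONFUSABLES` and `LEET` tables are deliberately tiny and cover just the substitutions used in this notebook, and all function names are assumptions. A production system would more likely rely on a maintained confusables dataset.

```python
import re
import unicodedata

# Illustrative lookalike table: only the four Cyrillic letters used by the
# homoglyph attack in this notebook. NFKC does not perform this mapping.
CONFUSABLES = {"\u0430": "a", "\u0435": "e", "\u043e": "o", "\u0456": "i"}

# Reverse of the substitutions in character_substitution().
# Lossy: genuine digits and symbols are rewritten too (e.g. "7 days").
LEET = {"@": "a", "3": "e", "1": "i", "0": "o", "$": "s", "7": "t"}

def collapse_spaced_letters(text: str) -> str:
    """Rejoin words whose letters were separated by single spaces."""
    rebuilt = []
    for chunk in re.split(r"\s{2,}", text):
        parts = chunk.split(" ")
        if len(parts) > 1 and all(len(p) == 1 for p in parts):
            rebuilt.append("".join(parts))
        else:
            rebuilt.append(chunk)
    return " ".join(rebuilt)

def normalize_comment(text: str) -> str:
    """Undo the obfuscations tested above before the text is vectorized."""
    text = unicodedata.normalize("NFKC", text)   # fold compatibility forms
    text = text.lower()
    text = "".join(CONFUSABLES.get(ch, ch) for ch in text)
    text = collapse_spaced_letters(text)
    return "".join(LEET.get(ch, ch) for ch in text)

def is_mixed_script(text: str) -> bool:
    """Flag text that mixes Latin and Cyrillic letters."""
    scripts = set()
    for ch in text:
        if ch.isalpha():
            name = unicodedata.name(ch, "")
            if name.startswith("LATIN"):
                scripts.add("latin")
            elif name.startswith("CYRILLIC"):
                scripts.add("cyrillic")
    return len(scripts) > 1

print(normalize_comment("y0u @r3 @n 1d107"))
print(normalize_comment("Y o u   a r e   a n   i d i o t"))
print(is_mixed_script("y\u043eu"))
```

The digit and symbol reversal is the riskiest step because it also rewrites legitimate numbers, so it may work better as an additional view of the text fed to the classifier than as a replacement for the original input.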

## 📝 Final Summary & Deliverables

In [None]:
print("="*80)
print("RED TEAM ENGAGEMENT SUMMARY")
print("="*80)
print("\nENGAGEMENT DETAILS:")
print(f"  Target System: {target_system['name']}")
print(f"  Engagement Duration: 4 phases (Planning, Recon, Exploitation, Reporting)")
print(f"  Attack Techniques Tested: {len(attack_functions)}")
print(f"  Total Vulnerabilities Found: {len(vulnerabilities)}")
print("\nKEY METRICS:")
print(f"  Baseline Model Accuracy: {baseline_accuracy*100:.2f}%")
print(f"  Average Evasion Success Rate: {np.mean(success_rates):.1f}%")
print(f"  Highest Attack Success Rate: {max(success_rates):.1f}%")
print(f"  Critical Vulnerabilities: {len([v for v in vulnerabilities if v['Severity'] == 'Critical'])}")
print(f"  High Vulnerabilities: {len([v for v in vulnerabilities if v['Severity'] == 'High'])}")
print("\nFINDINGS SUMMARY:")
print("  🚨 CRITICAL: Multiple evasion techniques achieve >70% success rate")
print("  ⚠️  IMPACT: Toxic content can bypass moderation with simple obfuscation")
print("  🎯 ROOT CAUSE: Model relies on exact word matching, vulnerable to text transformation")
print("  💡 RECOMMENDATION: Implement defense-in-depth with normalization + adversarial training")
print("\nDELIVERABLES:")
print("  ✅ Threat model documentation")
print("  ✅ Attack surface analysis")
print("  ✅ Proof-of-concept exploits (5 techniques)")
print("  ✅ Vulnerability severity assessment")
print("  ✅ Prioritized mitigation recommendations")
print("  ✅ Remediation timeline")
print("\nNEXT STEPS:")
print("  1. Review findings with security and engineering teams")
print("  2. Prioritize mitigations based on risk scores")
print("  3. Implement immediate fixes (normalization, case handling)")
print("  4. Schedule follow-up red team in 3 months to verify fixes")
print("="*80)

print("\n" + "="*80)
print("RED TEAM ENGAGEMENT COMPLETE")
print("="*80)
print("\n✅ All phases completed successfully")
print("📋 Comprehensive report generated")
print("🎯 Actionable mitigations provided")
print("\n💡 Remember: Red teaming is an ongoing process, not a one-time assessment.")
print("   Schedule regular engagements to stay ahead of evolving threats.")
print("="*80)