# Watermark Robustness Testing 🔍
Comprehensive testing of watermark persistence across real-world scenarios.

This notebook evaluates how well our Unicode-based watermarks survive:
- Copy-paste operations across different applications
- Manual editing and text processing
- Format conversions and encoding changes
- Common text modifications

**Key Question:** How robust are our watermarks against everyday text handling scenarios?

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import re
import unicodedata
import html
from urllib.parse import quote, unquote
import base64
import json
from collections import defaultdict

# Set up plotting
plt.style.use('default')
sns.set_palette("husl")
plt.rcParams['figure.figsize'] = (12, 8)

In [None]:
class WatermarkRobustnessAnalyzer:
    """
    Comprehensive watermark robustness testing suite.

    Each ``*_test`` method applies one simulated text-handling scenario to a
    string and returns the transformed string. ``run_all_tests`` applies every
    scenario to one sample and records which watermark types can still be
    detected afterwards; ``analyze_results`` aggregates those records per
    scenario.

    Detection is presence-based: a watermark type counts once per text, no
    matter how many marker characters the text contains.
    """

    # Word swaps applied by paraphrasing_test (lower-case word -> replacement).
    _PARAPHRASE_SUBSTITUTIONS = {
        'honestly': 'frankly',
        'really': 'truly',
        'actually': 'in fact',
        'basically': 'essentially',
        'obviously': 'clearly',
    }

    # Whole-word, case-insensitive matcher for the words above. Compiled once
    # here so it is not rebuilt for every sample.
    _PARAPHRASE_PATTERN = re.compile(
        r'\b(?:' + '|'.join(map(re.escape, _PARAPHRASE_SUBSTITUTIONS)) + r')\b',
        re.IGNORECASE,
    )

    def __init__(self):
        # Presence detectors keyed by watermark type.
        # NOTE(review): the 'fullwidth_period' literal looks like U+3002
        # (ideographic full stop) rather than U+FF0E (fullwidth full stop);
        # the key is kept as-is because callers index results by it.
        self.detection_functions = {
            'zwsp': lambda text: '\u200b' in text,
            'arabic_comma': lambda text: '،' in text,
            'fullwidth_period': lambda text: '。' in text,
        }

        # One record per run_all_tests() call, in call order.
        self.test_results = []

    @staticmethod
    def _preview(text, limit=100):
        """Return *text* cut to *limit* characters, with '...' when truncated."""
        return text[:limit] + "..." if len(text) > limit else text

    def detect_watermarks(self, text):
        """
        Detect all watermark types in text.

        Returns a dict with one boolean per detector name plus
        ``'total_count'`` (the number of detectors that fired). Any value
        that is not a ``str`` (NaN, None, lists, ...) yields all-False / 0.
        """
        # Type check only: pd.isna() on a list/array returns an array whose
        # truth value is ambiguous, and it is always False for a str anyway.
        if not isinstance(text, str):
            results = {name: False for name in self.detection_functions}
            results['total_count'] = 0
            return results

        results = {
            name: func(text) for name, func in self.detection_functions.items()
        }
        results['total_count'] = sum(results.values())
        return results

    def ascii_only_test(self, text):
        """Simulate ASCII-only copy-paste (Notepad, SMS, legacy systems)"""
        # Drops every character outside the 7-bit ASCII range.
        return ''.join(c for c in text if ord(c) < 128)

    def spell_checker_test(self, text):
        """Simulate spell checker auto-corrections"""
        # Swaps the two punctuation markers for their ASCII look-alikes and
        # deletes the zero-width space.
        return (text.replace('،', ',')
                   .replace('。', '.')
                   .replace('\u200b', ''))

    def manual_editing_test(self, text):
        """Simulate common manual editing operations"""
        # Collapse every whitespace run (including newlines) to one space
        text = re.sub(r'\s+', ' ', text).strip()
        # Normalize punctuation spacing: no space before, exactly one after
        text = re.sub(r'\s*([,.!?;:])\s*', r'\1 ', text).strip()
        return text

    def social_media_test(self, text):
        """Simulate social media posting (character limit only)"""
        # Character limit like Twitter; no Unicode normalization is applied,
        # so markers beyond the first 280 characters are simply cut off.
        return text[:280]

    def email_client_test(self, text):
        """Simulate email client forwarding (strips some Unicode)"""
        # Removes U+200B-U+200F and U+2028-U+202F. Of the three watermark
        # characters only the zero-width space falls inside these ranges.
        return re.sub(r'[\u200b-\u200f\u2028-\u202f]', '', text)

    def unicode_normalization_test(self, text):
        """Test Unicode normalization effects"""
        return unicodedata.normalize('NFKC', text)

    def html_roundtrip_test(self, text):
        """Simulate HTML encoding/decoding"""
        return html.unescape(html.escape(text))

    def url_encoding_test(self, text):
        """
        Simulate URL encoding/decoding.

        Best effort: text that cannot be percent-encoded (for example a lone
        surrogate) is returned unchanged.
        """
        try:
            return unquote(quote(text, safe=''))
        except (UnicodeError, TypeError):
            return text

    def pdf_copy_test(self, text):
        """Simulate PDF copy-paste (often affects spacing)"""
        # PDF copy often normalizes whitespace
        return ' '.join(text.split())

    def paraphrasing_test(self, text):
        """
        Simulate human paraphrasing (minor word changes).

        Replaces a few filler words with synonyms. Matching is whole-word and
        case-insensitive; a capitalised word gets a capitalised replacement
        and all other text keeps its original casing.
        """
        substitutions = self._PARAPHRASE_SUBSTITUTIONS

        def swap(match):
            word = match.group(0)
            replacement = substitutions.get(word.lower())
            if replacement is None:
                # Case-insensitive matching can accept characters whose
                # lower() form is not a dictionary key; leave those alone.
                return word
            if word[0].isupper():
                replacement = replacement[0].upper() + replacement[1:]
            return replacement

        return self._PARAPHRASE_PATTERN.sub(swap, text)

    def run_all_tests(self, text, sample_name="Unknown"):
        """
        Run all robustness tests on a sample.

        Args:
            text: Watermarked text. Non-string values (e.g. NaN from an empty
                CSV cell) are treated as an empty string.
            sample_name: Label stored with the result.

        Returns:
            A dict with ``sample_name``, ``original_text`` (preview),
            ``original_watermarks`` and ``test_results``. Each entry of
            ``test_results`` has a ``status``:

            * ``'PASS'`` / ``'FAIL'`` - ``survival_rate`` is present; PASS
              means at least half of the original watermark types survived.
            * ``'SKIPPED'`` - the sample had no watermark to begin with, so
              no ``survival_rate`` is recorded and the sample does not drag
              the averages in ``analyze_results`` towards zero.
            * ``'ERROR'`` - the scenario raised; ``error`` holds the message.

            The same dict is appended to ``self.test_results``.
        """
        if not isinstance(text, str):
            text = ""

        original_watermarks = self.detect_watermarks(text)
        original_count = original_watermarks['total_count']

        tests = {
            'ASCII-only copy-paste': self.ascii_only_test,
            'Spell checker simulation': self.spell_checker_test,
            'Manual editing': self.manual_editing_test,
            'Social media posting': self.social_media_test,
            'Email client forwarding': self.email_client_test,
            'Unicode normalization': self.unicode_normalization_test,
            'HTML encoding roundtrip': self.html_roundtrip_test,
            'URL encoding roundtrip': self.url_encoding_test,
            'PDF copy-paste': self.pdf_copy_test,
            'Human paraphrasing': self.paraphrasing_test
        }

        result = {
            'sample_name': sample_name,
            'original_text': self._preview(text),
            'original_watermarks': original_watermarks,
            'test_results': {}
        }

        for test_name, test_func in tests.items():
            try:
                modified_text = test_func(text)
                modified_watermarks = self.detect_watermarks(modified_text)
            except Exception as e:
                # One failing scenario must not abort the remaining ones.
                result['test_results'][test_name] = {
                    'error': str(e),
                    'status': 'ERROR'
                }
                continue

            entry = {'watermarks_detected': modified_watermarks}
            if original_count:
                survival_rate = modified_watermarks['total_count'] / original_count
                entry['survival_rate'] = survival_rate
                entry['status'] = 'PASS' if survival_rate >= 0.5 else 'FAIL'
            else:
                # Nothing to lose: a survival rate would be meaningless.
                entry['status'] = 'SKIPPED'
            entry['text_changed'] = modified_text != text
            entry['modified_text_preview'] = self._preview(modified_text)

            result['test_results'][test_name] = entry

        self.test_results.append(result)
        return result

    def analyze_results(self):
        """
        Analyze all test results and generate summary statistics.

        Returns:
            ``{}`` when nothing has been recorded; otherwise a dict keyed by
            scenario name with ``avg_survival``, ``min_survival``,
            ``max_survival``, ``std_survival``, ``samples_tested`` and
            ``critical_failure`` (average below 0.3). Values are plain Python
            ``float`` / ``int`` / ``bool`` so they serialise to JSON
            unchanged. Entries without a ``survival_rate`` (skipped or
            errored scenarios) are ignored.
        """
        if not self.test_results:
            return {}

        # Collect survival rates by test type
        test_survival = defaultdict(list)

        for result in self.test_results:
            for test_name, test_result in result['test_results'].items():
                if 'survival_rate' in test_result:
                    test_survival[test_name].append(test_result['survival_rate'])

        # Calculate statistics
        analysis = {}
        for test_name, survival_rates in test_survival.items():
            avg_survival = float(np.mean(survival_rates))
            analysis[test_name] = {
                'avg_survival': avg_survival,
                'min_survival': float(np.min(survival_rates)),
                'max_survival': float(np.max(survival_rates)),
                'std_survival': float(np.std(survival_rates)),
                'samples_tested': len(survival_rates),
                'critical_failure': avg_survival < 0.3
            }

        return analysis

In [None]:
# Load watermarked data
print("Loading watermarked dataset...")

# Bind the name before attempting the load: if reading fails, the cells below
# see an empty dataset (and test zero samples) instead of raising NameError
# or silently reusing a stale DataFrame from an earlier run.
df_watermarked = pd.DataFrame()

try:
    df_watermarked = pd.read_csv("../data/ai_samples_watermarked.csv")
except FileNotFoundError:
    print("❌ ai_samples_watermarked.csv not found")
    print("Please ensure you have run the watermarking notebook first")
except Exception as e:
    print(f"❌ Error loading data: {e}")
else:
    print(f"✅ Loaded {len(df_watermarked)} watermarked samples")
    print(f"Columns: {list(df_watermarked.columns)}")

    # Check for watermarks in first sample
    if 'Text_watermarked' not in df_watermarked.columns:
        print("❌ Text_watermarked column not found")
    elif df_watermarked.empty:
        print("❌ Dataset contains no rows to preview")
    else:
        sample = df_watermarked.iloc[0]['Text_watermarked']
        analyzer = WatermarkRobustnessAnalyzer()
        sample_watermarks = analyzer.detect_watermarks(sample)

        # An empty CSV cell is read back as NaN (a float), which cannot be
        # sliced; show its repr rather than failing the whole cell.
        preview = sample[:100] if isinstance(sample, str) else repr(sample)

        print(f"\nSample text preview: {preview}...")
        print(f"Watermarks detected: {sample_watermarks['total_count']}/3")
        print(f"  ZWSP: {'✅' if sample_watermarks['zwsp'] else '❌'}")
        print(f"  Arabic comma: {'✅' if sample_watermarks['arabic_comma'] else '❌'}")
        print(f"  Fullwidth period: {'✅' if sample_watermarks['fullwidth_period'] else '❌'}")

In [None]:
# Exercise every robustness scenario on the leading rows of the dataset
print("🧪 Running comprehensive robustness tests...\n")

analyzer = WatermarkRobustnessAnalyzer()

# Cap the run at ten rows; smaller datasets are processed in full
num_samples = min(10, len(df_watermarked))
print(f"Testing {num_samples} samples...\n")

for position, (row_label, row) in enumerate(
        df_watermarked.head(num_samples).iterrows(), start=1):
    sample_name = f"Sample_{position}"
    result = analyzer.run_all_tests(row['Text_watermarked'], sample_name)

    # Only the first sample gets a per-scenario breakdown
    if position != 1:
        print(f"✅ Completed {sample_name}")
        continue

    print(f"📋 DETAILED RESULTS - {sample_name}")
    print(f"Original: {result['original_text']}")
    print(f"Original watermarks: {result['original_watermarks']['total_count']}/3\n")

    for test_name, test_result in result['test_results'].items():
        # Entries without a survival rate (e.g. errored scenarios) are not listed
        if 'survival_rate' not in test_result:
            continue
        survival = test_result['survival_rate']
        watermarks = test_result['watermarks_detected']['total_count']
        emoji = "✅" if test_result['status'] == "PASS" else "❌"
        print(f"{emoji} {test_name:25s}: {watermarks}/3 watermarks ({survival:.1%})")

    print("\n" + "-"*60 + "\n")

print(f"\n🎉 Testing complete! Analyzed {len(analyzer.test_results)} samples.")

In [None]:
# Summarise per-scenario survival and bucket every scenario by risk level
print("📊 ROBUSTNESS ANALYSIS SUMMARY")
print("="*60)

analysis = analyzer.analyze_results()

if not analysis:
    print("❌ No test results to analyze")
else:
    # Weakest scenarios first
    sorted_tests = sorted(analysis.items(), key=lambda item: item[1]['avg_survival'])

    print("\nTEST RESULTS (sorted by survival rate):")
    print("-"*60)

    # Scenario names per risk bucket; later cells read these lists
    critical_failures, moderate_risks, strong_resistance = [], [], []

    for test_name, stats in sorted_tests:
        avg_survival = stats['avg_survival']

        if avg_survival < 0.3:
            status, bucket = "🔴 CRITICAL", critical_failures
        elif avg_survival < 0.7:
            status, bucket = "🟡 MODERATE", moderate_risks
        else:
            status, bucket = "🟢 STRONG", strong_resistance
        bucket.append(test_name)

        print(f"{status} {test_name:30s}: {avg_survival:6.1%} (n={stats['samples_tested']})")

    # Unweighted mean of the per-scenario averages
    overall_survival = np.mean([entry['avg_survival'] for entry in analysis.values()])
    print(f"\n📈 OVERALL SURVIVAL RATE: {overall_survival:.1%}")

    assessment = "🔴 VULNERABLE - Significant robustness issues"
    if overall_survival >= 0.8:
        assessment = "🟢 EXCELLENT - Highly robust watermarks"
    elif overall_survival >= 0.6:
        assessment = "🟡 MODERATE - Some vulnerabilities exist"

    print(f"📋 ASSESSMENT: {assessment}")

In [None]:
# Create visualizations
# Relies on `analysis`, `critical_failures`, `moderate_risks`,
# `strong_resistance` and `overall_survival` from the summary cell.
if analysis:
    # Imported here so the cell is self-contained; survival rates are stored
    # as 0-1 fractions and this formatter renders them as percentages.
    from matplotlib.ticker import PercentFormatter

    # Prepare data for plotting
    test_names = list(analysis.keys())
    survival_rates = [analysis[test]['avg_survival'] for test in test_names]

    # Color code by risk level (same 30% / 70% cut-offs as the summary)
    colors = [
        '#ff4444' if rate < 0.3      # Red for critical
        else '#ffaa00' if rate < 0.7  # Orange for moderate
        else '#44ff44'                # Green for strong
        for rate in survival_rates
    ]

    # Create horizontal bar chart
    plt.figure(figsize=(14, 10))
    y_pos = np.arange(len(test_names))

    bars = plt.barh(y_pos, survival_rates, color=colors, alpha=0.8)

    # Customize the plot
    plt.xlabel('Watermark Survival Rate (%)', fontsize=12)
    plt.title('Watermark Robustness Test Results\nSurvival Rate by Test Scenario', fontsize=14, fontweight='bold')
    plt.yticks(y_pos, test_names)
    plt.xlim(0, 1)
    # Make the tick labels agree with the "(%)" axis label: 0.3 -> "30%"
    plt.gca().xaxis.set_major_formatter(PercentFormatter(xmax=1.0))

    # Add percentage labels on bars
    for bar, rate in zip(bars, survival_rates):
        plt.text(rate + 0.01, bar.get_y() + bar.get_height()/2,
                f'{rate:.1%}', va='center', fontsize=10)

    # Add risk level zones
    plt.axvline(x=0.3, color='red', linestyle='--', alpha=0.7, label='Critical threshold (30%)')
    plt.axvline(x=0.7, color='orange', linestyle='--', alpha=0.7, label='Strong threshold (70%)')

    plt.legend()
    plt.grid(axis='x', alpha=0.3)
    plt.tight_layout()
    plt.show()

    # Summary statistics plot
    plt.figure(figsize=(12, 6))

    # Risk level distribution
    risk_counts = {
        'Critical (<30%)': len(critical_failures),
        'Moderate (30-70%)': len(moderate_risks),
        'Strong (>70%)': len(strong_resistance)
    }

    plt.subplot(1, 2, 1)
    plt.pie(risk_counts.values(), labels=risk_counts.keys(), autopct='%1.0f%%',
            colors=['#ff4444', '#ffaa00', '#44ff44'])
    plt.title('Risk Level Distribution', fontweight='bold')

    # Overall survival histogram
    plt.subplot(1, 2, 2)
    plt.hist(survival_rates, bins=10, alpha=0.7, color='skyblue', edgecolor='black')
    plt.axvline(x=overall_survival, color='red', linestyle='-', linewidth=2,
                label=f'Mean: {overall_survival:.1%}')
    plt.xlabel('Survival Rate')
    plt.ylabel('Number of Tests')
    plt.title('Survival Rate Distribution', fontweight='bold')
    # Same fraction-to-percent tick formatting as the bar chart
    plt.gca().xaxis.set_major_formatter(PercentFormatter(xmax=1.0))
    plt.legend()

    plt.tight_layout()
    plt.show()

else:
    print("❌ No data available for visualization")

In [None]:
# Turn the risk buckets from the summary cell into a prioritised action list
print("💡 ACTIONABLE RECOMMENDATIONS")
print("="*60)

if not analysis:
    print("❌ Cannot generate recommendations without test results")
else:
    # --- Scenarios whose average survival is below 30% ---
    print("\n🔴 CRITICAL VULNERABILITIES:")
    if not critical_failures:
        print("   ✅ No critical vulnerabilities found!")
    else:
        for scenario in critical_failures:
            print(f"   • {scenario}")
        for line in (
            "\n   IMMEDIATE ACTIONS:",
            "   1. Implement ASCII-compatible watermarks (spacing patterns)",
            "   2. Add multi-layer redundancy (Unicode + ASCII + structural)",
            "   3. Test with real MS Word/Google Docs workflows",
        ):
            print(line)

    # --- Scenarios between 30% and 70% ---
    print("\n🟡 MODERATE RISKS:")
    if not moderate_risks:
        print("   ✅ No moderate risks identified!")
    else:
        for scenario in moderate_risks:
            print(f"   • {scenario}")
        for line in (
            "\n   IMPROVEMENT ACTIONS:",
            "   1. Increase watermark redundancy for these scenarios",
            "   2. Consider format-specific watermarking strategies",
            "   3. Add detection confidence scoring",
        ):
            print(line)

    # --- Scenarios at 70% or above (nothing is printed when there are none) ---
    print("\n🟢 STRONG RESISTANCE:")
    if strong_resistance:
        for scenario in strong_resistance:
            print(f"   • {scenario}")
        print("\n   MAINTAIN CURRENT APPROACH for these scenarios")

    # Technical recommendations, chosen by the overall survival rate
    print("\n🔧 TECHNICAL IMPLEMENTATION PRIORITIES:")
    if overall_survival < 0.5:
        priority_lines = (
            "   🚨 URGENT: Complete watermarking strategy overhaul needed",
            "   1. Multi-modal watermarking (Unicode + ASCII + statistical)",
            "   2. Redundant encoding with error correction",
            "   3. Application-specific watermark adaptation",
        )
    elif overall_survival < 0.8:
        priority_lines = (
            "   ⚡ HIGH PRIORITY: Address critical failure modes",
            "   1. ASCII-safe fallback watermarking",
            "   2. Enhanced robustness for vulnerable scenarios",
            "   3. Real-world testing with target applications",
        )
    else:
        priority_lines = (
            "   🎯 OPTIMIZATION: Fine-tune for remaining vulnerabilities",
            "   1. Steganographic improvements",
            "   2. Detection accuracy optimization",
            "   3. Performance and scalability enhancements",
        )
    for line in priority_lines:
        print(line)

    # Success metrics
    for line in (
        "\n📊 TARGET METRICS:",
        f"   • Current overall survival: {overall_survival:.1%}",
        "   • Target overall survival: >80%",
        "   • Target critical scenario survival: >50%",
        "   • Target stealth (human detection): <10%",
    ):
        print(line)

    for line in (
        "\n🎯 NEXT STEPS:",
        "   1. Run attack simulation tests (deliberate removal attempts)",
        "   2. Implement multi-layer watermarking based on findings",
        "   3. Test with real-world copy-paste workflows",
        "   4. Build end-to-end detection pipeline with confidence scoring",
    ):
        print(line)

In [None]:
# Export results for further analysis
if analyzer.test_results:
    import datetime

    def _json_default(value):
        """Serialise values json cannot handle natively.

        NumPy scalars become the matching Python value (so a numpy bool is
        written as true/false rather than the string "True"); anything else
        falls back to its string form.
        """
        if isinstance(value, np.generic):
            return value.item()
        return str(value)

    # The summary cell normally defines `analysis`; recompute it if that cell
    # was skipped, in line with the fallbacks used for the other names below.
    if 'analysis' not in locals():
        analysis = analyzer.analyze_results()

    # Create summary for export
    export_data = {
        'test_summary': {
            'total_samples_tested': len(analyzer.test_results),
            'overall_survival_rate': overall_survival if 'overall_survival' in locals() else 0,
            'critical_failures': critical_failures if 'critical_failures' in locals() else [],
            'moderate_risks': moderate_risks if 'moderate_risks' in locals() else [],
            'strong_resistance': strong_resistance if 'strong_resistance' in locals() else []
        },
        'detailed_analysis': analysis,
        'test_results': analyzer.test_results
    }

    # Save to JSON
    timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
    filename = f'../data/robustness_test_results_{timestamp}.json'

    try:
        # Explicit encoding so the file does not depend on the platform default
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(export_data, f, indent=2, default=_json_default)
        print(f"✅ Results exported to: {filename}")
    except (OSError, TypeError, ValueError) as e:
        print(f"❌ Error exporting results: {e}")

    # Create summary DataFrame for easy viewing
    summary_df = pd.DataFrame([
        {
            'Test_Scenario': test_name,
            'Avg_Survival_Rate': f"{stats['avg_survival']:.1%}",
            'Risk_Level': ('Critical' if stats['avg_survival'] < 0.3
                          else 'Moderate' if stats['avg_survival'] < 0.7
                          else 'Strong'),
            'Samples_Tested': stats['samples_tested']
        }
        for test_name, stats in analysis.items()
    ])

    print("\n📋 SUMMARY TABLE:")
    print(summary_df.to_string(index=False))

    # Save summary CSV, guarded like the JSON export above so a missing or
    # read-only ../data directory is reported instead of aborting the cell.
    summary_filename = f'../data/robustness_summary_{timestamp}.csv'
    try:
        summary_df.to_csv(summary_filename, index=False)
        print(f"\n💾 Summary table saved to: {summary_filename}")
    except OSError as e:
        print(f"❌ Error saving summary table: {e}")

else:
    print("❌ No results to export")

## 🎯 Robustness Testing Conclusions

This comprehensive analysis reveals the real-world performance of our Unicode-based watermarking system across various text processing scenarios.

### Key Findings:

1. **Critical Vulnerabilities**: ASCII-only environments and spell checkers pose the greatest threats
2. **Strong Resistance**: The watermarks hold up well against human editing and paraphrasing
3. **Implementation Priority**: Multi-layer watermarking with ASCII-compatible fallbacks needed

### Next Steps:

1. **Attack Simulation Testing**: Test against deliberate removal attempts
2. **Enhanced Watermarking**: Implement redundant, multi-modal watermarks
3. **Real-world Validation**: Test with actual application workflows
4. **Production Pipeline**: Build end-to-end detection system

---

**Project Status**: Robustness analysis complete ✅  
**Next Notebook**: `07_attack_simulation.ipynb` or `08_enhanced_watermarking.ipynb`