# 🔄 Data Preprocessing & Sanitization Hub
**CORE PIPELINE #1** - Drop Zone for Raw Training Materials

## 📍 Purpose
- **Drop Zone**: Where you place raw Hugging Face datasets, external JSON files, etc.
- **Sanitization**: Clean, validate, and format data for LLM/RAG pipelines
- **Quality Gates**: Ensure data meets training standards
- **Output Routing**: Send processed data to LLM, RAG, or Test pipelines

---


In [None]:
# Initialize Data Processing Environment
import os
import json
import pandas as pd
from pathlib import Path
from datetime import datetime
import shutil
from typing import Dict, List, Any

# Pipeline Paths
BASE_DIR = Path("/home/jimmie/linkops-industries/SOAR-copilot/ai-training")
DROP_ZONE = BASE_DIR / "drop_zone"  # 📥 Where you drop raw files
PROCESSED = BASE_DIR / "processed"   # ✅ Clean, validated data
QUARANTINE = BASE_DIR / "quarantine" # ⚠️ Failed validation

# Create directories. parents=True is required because each directory lives
# under BASE_DIR: without it, mkdir raises FileNotFoundError whenever BASE_DIR
# itself has not been created yet.
for path in [DROP_ZONE, PROCESSED, QUARANTINE]:
    path.mkdir(parents=True, exist_ok=True)

print("🚀 Data Processing Hub Initialized")
print(f"📥 Drop Zone: {DROP_ZONE}")
print(f"✅ Processed: {PROCESSED}")
print(f"⚠️ Quarantine: {QUARANTINE}")

## 📥 Step 1: Scan Drop Zone for New Files

In [None]:
def scan_drop_zone() -> List[Dict[str, Any]]:
    """Scan the drop zone for files that still need processing.

    Walks ``DROP_ZONE`` recursively and describes every regular file found.
    Files below ``DROP_ZONE / "archive"`` are skipped: that is where
    ``process_drop_zone_files`` moves originals after a successful run, so
    including them would make every later scan pick them up again.

    Returns:
        One dict per file, ordered by path, with the keys ``name``,
        ``path`` (as ``str``), ``size_mb`` (rounded to two decimals),
        ``extension``, ``modified`` (``datetime`` built from the file's
        mtime) and ``type`` (the result of ``classify_file_type``).
    """
    archive_dir = DROP_ZONE / "archive"
    files_found = []

    # sorted() makes the listing order deterministic across runs.
    for file_path in sorted(DROP_ZONE.glob("**/*")):
        if not file_path.is_file():
            continue
        if archive_dir in file_path.parents:
            # Already processed and archived; do not report it again.
            continue

        # Single stat() call serves both the size and the mtime.
        stats = file_path.stat()
        files_found.append({
            "name": file_path.name,
            "path": str(file_path),
            "size_mb": round(stats.st_size / 1024 / 1024, 2),
            "extension": file_path.suffix,
            "modified": datetime.fromtimestamp(stats.st_mtime),
            "type": classify_file_type(file_path)
        })

    return files_found

def classify_file_type(file_path: Path) -> str:
    """Map a file to the pipeline category that should handle it.

    The decision uses the lower-cased suffix and the lower-cased file name.
    Rules are checked in order and the first match wins:

    1. ``.jsonl``/``.json`` whose name mentions train/instruct/chat
       -> ``'llm_training_data'``
    2. ``.md``/``.txt`` whose name mentions playbook/knowledge/documentation
       -> ``'rag_knowledge'``
    3. ``.csv``/``.xlsx`` whose name mentions test -> ``'test_scenarios'``
    4. ``.zip`` suffix, or a name mentioning dataset
       -> ``'huggingface_dataset'``
    5. anything else -> ``'unknown'``
    """
    suffix = file_path.suffix.lower()
    lowered_name = file_path.name.lower()

    def mentions(*keywords: str) -> bool:
        # True when at least one keyword occurs anywhere in the file name.
        return any(keyword in lowered_name for keyword in keywords)

    if suffix in ('.jsonl', '.json') and mentions('train', 'instruct', 'chat'):
        return 'llm_training_data'
    if suffix in ('.md', '.txt') and mentions('playbook', 'knowledge', 'documentation'):
        return 'rag_knowledge'
    if suffix in ('.csv', '.xlsx') and mentions('test'):
        return 'test_scenarios'
    if suffix == '.zip' or mentions('dataset'):
        return 'huggingface_dataset'
    return 'unknown'

# Take an inventory of the drop zone and print a short preview of it.
new_files = scan_drop_zone()
print(f"📊 Found {len(new_files)} files in drop zone:")

# Only the first five entries are listed; the rest are summarised as a count.
hidden_count = len(new_files) - 5
for file_info in new_files[:5]:
    print(f"📄 {file_info['name']} ({file_info['size_mb']}MB) - {file_info['type']}")

if hidden_count > 0:
    print(f"... and {hidden_count} more files")

## 🧹 Step 2: Data Sanitization & Validation

In [None]:
def sanitize_llm_training_data(file_path: Path) -> Dict[str, Any]:
    """Sanitize and validate one LLM training data file.

    ``.jsonl`` files (suffix compared case-insensitively) are parsed line by
    line; any other file is parsed as a single JSON document, and a non-list
    document is treated as one example. Each example is checked with
    ``validate_training_example`` and cleaned with
    ``sanitize_training_example``. Surviving examples are written as JSONL
    into ``PROCESSED``.

    Args:
        file_path: Path of the raw ``.json`` / ``.jsonl`` file to process.

    Returns:
        A dict with the keys:

        * ``status`` - ``"success"`` when at least one example was written,
          ``"failed"`` when no example was valid, ``"error"`` when loading
          or writing raised.
        * ``errors`` - per-example or file-level error messages.
        * ``warnings`` - JSONL lines that could not be parsed.
        * ``examples_processed`` - number of examples written.
        * ``output_path`` - path of the written file as ``str``, else ``None``.
    """
    result = {
        "status": "pending",
        "errors": [],
        "warnings": [],
        "examples_processed": 0,
        "output_path": None
    }

    try:
        # Load data. The suffix is lower-cased so that a ``.JSONL`` file takes
        # the line-by-line branch (classify_file_type lower-cases it too).
        if file_path.suffix.lower() == '.jsonl':
            examples = []
            with open(file_path, 'r', encoding='utf-8') as f:
                for line_num, line in enumerate(f, 1):
                    line = line.strip()
                    if not line:
                        # Blank lines are not malformed records; skip quietly.
                        continue
                    try:
                        examples.append(json.loads(line))
                    except json.JSONDecodeError:
                        result["warnings"].append(f"Line {line_num}: Invalid JSON")
        else:
            with open(file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
            examples = data if isinstance(data, list) else [data]

        # Validate format. A single malformed example is recorded as an error
        # instead of aborting the whole file.
        valid_examples = []
        for i, example in enumerate(examples):
            if not (isinstance(example, dict) and validate_training_example(example)):
                result["errors"].append(f"Example {i}: Invalid format")
                continue
            try:
                valid_examples.append(sanitize_training_example(example))
            except (AttributeError, TypeError) as exc:
                # e.g. a non-string "instruction" that has no .strip()
                result["errors"].append(f"Example {i}: Invalid format ({exc})")

        # Save sanitized data
        if valid_examples:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            # The timestamp only has one-second resolution, so the source stem
            # (plus a counter if needed) keeps files processed within the same
            # second from overwriting each other. The name still matches the
            # "llm_sanitized_*.jsonl" pattern used when routing.
            base_name = f"llm_sanitized_{timestamp}_{file_path.stem}"
            output_file = PROCESSED / f"{base_name}.jsonl"
            counter = 1
            while output_file.exists():
                output_file = PROCESSED / f"{base_name}_{counter}.jsonl"
                counter += 1

            with open(output_file, 'w', encoding='utf-8') as f:
                for example in valid_examples:
                    f.write(json.dumps(example) + '\n')

            result.update({
                "status": "success",
                "examples_processed": len(valid_examples),
                "output_path": str(output_file)
            })
        else:
            result["status"] = "failed"
            result["errors"].append("No valid examples found")

    except Exception as e:
        # Boundary handler: any load/write failure is reported via the result
        # dict so the caller can quarantine the file instead of crashing.
        result["status"] = "error"
        result["errors"].append(str(e))

    return result

def validate_training_example(example: Dict) -> bool:
    """Check that a training example has the expected shape.

    An example is valid when it is a dict that contains both
    ``'instruction'`` and ``'response'`` and whose ``'instruction'`` is a
    string. The response may be any value.

    Args:
        example: Candidate example, typically decoded from JSON.

    Returns:
        ``True`` when the example is valid, ``False`` otherwise. Never raises
        for non-dict input.
    """
    # Guard first: ``field in example`` would do a substring test on a str
    # and raise TypeError on values such as None or int.
    if not isinstance(example, dict):
        return False

    required_fields = ['instruction', 'response']
    if not all(field in example for field in required_fields):
        return False

    # Downstream sanitization calls .strip() on the instruction.
    return isinstance(example['instruction'], str)

def sanitize_training_example(example: Dict) -> Dict:
    """Build a normalized, PII-scrubbed copy of a training example.

    The result always has four keys: ``instruction`` (stripped), ``response``
    (stripped when it is a string, otherwise its JSON encoding), ``category``
    (defaults to ``"general"``) and ``source`` (defaults to ``"external"``).
    The new dict is passed through ``remove_pii`` before being returned.
    """
    instruction_text = example["instruction"].strip()

    raw_response = example["response"]
    if isinstance(raw_response, str):
        response_text = raw_response.strip()
    else:
        # Structured responses are stored as their JSON text.
        response_text = json.dumps(raw_response)

    cleaned = {
        "instruction": instruction_text,
        "response": response_text,
        "category": example.get("category", "general"),
        "source": example.get("source", "external"),
    }

    # Remove PII and sensitive data
    return remove_pii(cleaned)

def remove_pii(example: Dict) -> Dict:
    """Replace PII-looking substrings in an example with fixed placeholders.

    Only the ``'instruction'`` and ``'response'`` fields are scanned, and only
    when they hold strings. Three patterns are replaced, in this order:

    * dotted-quad numbers (IP addresses) -> ``'192.168.1.100'``
    * email addresses -> ``'user@company.com'``
    * 16 digits in groups of four, optionally separated by ``-`` or
      whitespace (credit cards) -> ``'XXXX-XXXX-XXXX-XXXX'``

    Args:
        example: Example dict; it is modified in place.

    Returns:
        The same dict object that was passed in.
    """
    import re

    # Patterns to replace
    patterns = {
        r'\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b': '192.168.1.100',  # IP addresses
        # The TLD class is [A-Za-z]; writing it as [A-Z|a-z] would also accept
        # a literal '|' and swallow text following the address.
        r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b': 'user@company.com',  # Emails
        r'\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b': 'XXXX-XXXX-XXXX-XXXX'  # Credit cards
    }

    for field in ('instruction', 'response'):
        text = example.get(field)
        if not isinstance(text, str):
            # Missing or non-text field: nothing to scrub, and re.sub would
            # raise TypeError on it.
            continue
        for pattern, replacement in patterns.items():
            text = re.sub(pattern, replacement, text)
        example[field] = text

    return example

# Confirm in the cell output that the sanitization helpers are now defined.
for banner_line in (
    "🧹 Data sanitization functions loaded",
    "Ready to process LLM training data",
):
    print(banner_line)

## 🔄 Step 3: Process All Files in Drop Zone

In [None]:
def _move_without_overwrite(source: Path, target_dir: Path) -> Path:
    """Move *source* into *target_dir*, renaming on collision instead of overwriting."""
    target_dir.mkdir(parents=True, exist_ok=True)
    destination = target_dir / source.name
    counter = 1
    while destination.exists():
        destination = target_dir / f"{source.stem}_{counter}{source.suffix}"
        counter += 1
    shutil.move(str(source), str(destination))
    return destination


def process_drop_zone_files():
    """Process every pending file in the drop zone and write a JSON report.

    Each file reported by ``scan_drop_zone`` is dispatched on its ``type`` to
    the matching sanitizer. Files whose sanitizer reports ``"success"`` are
    moved to ``DROP_ZONE / "archive"``; all others (including types without a
    sanitizer, which get status ``"skipped"``) are moved to ``QUARANTINE`` and
    counted as errors. Files already inside the archive directory are ignored
    so earlier runs are not processed again.

    Returns:
        The report dict, which is also saved to ``PROCESSED`` as
        ``processing_report_<timestamp>.json``. Keys: ``timestamp``,
        ``files_processed`` (one entry per file with ``file``, ``type``,
        ``status``, ``errors``, ``warnings``, ``output``), ``success_count``,
        ``error_count`` and ``warning_count``.
    """
    processing_report = {
        "timestamp": datetime.now().isoformat(),
        "files_processed": [],
        "success_count": 0,
        "error_count": 0,
        "warning_count": 0
    }

    archive_dir = DROP_ZONE / "archive"

    # Looked up at call time, so sanitizers defined later in the file are fine.
    sanitizers = {
        'llm_training_data': sanitize_llm_training_data,
        'rag_knowledge': sanitize_rag_knowledge,
        'test_scenarios': sanitize_test_scenarios,
    }

    new_files = scan_drop_zone()

    for file_info in new_files:
        file_path = Path(file_info['path'])

        if archive_dir in file_path.parents:
            # The archive lives inside the drop zone; its contents were
            # already processed and must not be handled a second time.
            continue

        print(f"\n🔄 Processing: {file_info['name']}")

        sanitizer = sanitizers.get(file_info['type'])
        if sanitizer is not None:
            result = sanitizer(file_path)
        else:
            result = {"status": "skipped", "errors": ["Unknown file type"]}

        # Update report (copy the lists so later appends do not alias result)
        file_result = {
            "file": file_info['name'],
            "type": file_info['type'],
            "status": result['status'],
            "errors": list(result.get('errors', [])),
            "warnings": list(result.get('warnings', [])),
            "output": result.get('output_path')
        }

        processing_report["files_processed"].append(file_result)

        if result['status'] == 'success':
            processing_report["success_count"] += 1
            print(f"✅ Success: {result.get('examples_processed', 0)} examples processed")
            # Move original file to archive
            move_target = archive_dir
        else:
            processing_report["error_count"] += 1
            print(f"❌ Failed: {'; '.join(result.get('errors', []))}")
            # Move to quarantine
            move_target = QUARANTINE

        try:
            _move_without_overwrite(file_path, move_target)
        except OSError as exc:
            # A failed move must not abort the run before the report is
            # written; record it against the file instead.
            file_result["errors"].append(f"Could not move file: {exc}")
            print(f"⚠️ Could not move {file_path.name}: {exc}")

        if result.get('warnings'):
            processing_report["warning_count"] += len(result['warnings'])
            print(f"⚠️ Warnings: {'; '.join(result['warnings'])}")

    # Save processing report
    report_file = PROCESSED / f"processing_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
    with open(report_file, 'w', encoding='utf-8') as f:
        json.dump(processing_report, f, indent=2)

    return processing_report

def sanitize_rag_knowledge(file_path: Path) -> Dict[str, Any]:
    """Placeholder sanitizer for RAG knowledge files.

    NOTE: this stub does not read ``file_path`` and writes nothing; it always
    reports success with the fixed output path ``"rag_processed.json"``.
    """
    # Placeholder for RAG sanitization
    return dict(status="success", output_path="rag_processed.json")

def sanitize_test_scenarios(file_path: Path) -> Dict[str, Any]:
    """Placeholder sanitizer for test scenario files.

    NOTE: this stub does not read ``file_path`` and writes nothing; it always
    reports success with the fixed output path ``"test_processed.json"``.
    """
    # Placeholder for test sanitization
    return dict(status="success", output_path="test_processed.json")

# Process all files. The gate uses the `new_files` list produced by the
# earlier scan cell; process_drop_zone_files() performs its own scan.
if not new_files:
    print("📭 No files to process in drop zone")
    print(f"💡 Drop your Hugging Face datasets, JSON files, or training data into:")
    print(f"   {DROP_ZONE}")
else:
    print(f"🚀 Processing {len(new_files)} files...")
    report = process_drop_zone_files()

    print(f"\n📊 PROCESSING COMPLETE")
    # Summary lines: label followed by the matching counter from the report.
    for summary_label, counter_key in (
        ("✅ Success", "success_count"),
        ("❌ Errors", "error_count"),
        ("⚠️ Warnings", "warning_count"),
    ):
        print(f"{summary_label}: {report[counter_key]}")

## 📋 Step 4: Route Processed Data to Pipelines

In [None]:
def route_processed_data():
    """Copy sanitized files from ``PROCESSED`` into each pipeline's folder.

    Three routes are handled, each selected by a file-name pattern:

    * ``llm_sanitized_*.jsonl`` -> ``BASE_DIR / "llm/data"``
    * ``rag_sanitized_*.json``  -> ``BASE_DIR / "rag/chunks"``
    * ``test_sanitized_*.json`` -> ``BASE_DIR / "testing/scenarios"``

    Files are copied (not moved) with ``shutil.copy2`` and the copy is named
    ``processed_<original name>``. A destination directory is only created
    when its route has at least one file.

    Returns:
        Dict with the number of files matched per route under the keys
        ``llm_files``, ``rag_files`` and ``test_files``.
    """
    # (result key, label used in the log line, glob pattern, destination)
    routes = [
        ("llm_files", "LLM", "llm_sanitized_*.jsonl", BASE_DIR / "llm/data"),
        ("rag_files", "RAG", "rag_sanitized_*.json", BASE_DIR / "rag/chunks"),
        ("test_files", "TEST", "test_sanitized_*.json", BASE_DIR / "testing/scenarios"),
    ]

    counts = {}
    for result_key, label, pattern, dest_dir in routes:
        matched = sorted(PROCESSED.glob(pattern))
        counts[result_key] = len(matched)
        if not matched:
            continue

        # Destinations are two levels below BASE_DIR, so the intermediate
        # directory must be created too; without parents=True the first run
        # raises FileNotFoundError.
        dest_dir.mkdir(parents=True, exist_ok=True)

        for file in matched:
            dest_file = dest_dir / f"processed_{file.name}"
            shutil.copy2(str(file), str(dest_file))
            print(f"📤 {label}: {file.name} → {dest_file}")

    total_routed = sum(counts.values())
    print(f"\n📊 Routed {total_routed} files to pipelines")

    return counts

# Hand the sanitized files over to the downstream pipelines, then point the
# reader at the follow-up notebooks.
routing_results = route_processed_data()
print("\n🎯 PROCESSING PIPELINE COMPLETE")
print("Next steps:")
for next_step in (
    "📖 LLM Training: Open '02_LLM_Fine_Tuning_Pipeline.ipynb'",
    "📚 RAG Learning: Open '03_RAG_Knowledge_Pipeline.ipynb'",
    "🧪 Testing: Open '04_Testing_Validation_Pipeline.ipynb'",
):
    print(next_step)