In [None]:
# ===================================================================
# AI-POWERED DATA CLEANING PIPELINE FOR PLACEMENT EMAILS
# Using: spaCy Transformer + NER + Pattern Matching + Config Integration
# ===================================================================

from __future__ import print_function
import os, json, re, time, logging, unicodedata
import pandas as pd
import numpy as np
from collections import defaultdict, Counter
from typing import List, Dict, Set, Any, Optional

# Import incremental state management
from incremental_state_management import (
    load_processed_ids, save_processed_ids,
    save_checkpoint, load_checkpoint, clear_checkpoint
)

# Setup logging
# NOTE: logging.basicConfig is a no-op if the root logger already has handlers
# (e.g. configured by an earlier cell or by the notebook kernel), in which case
# this level/format is not applied.
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger("AI_Cleaning")

# Try importing spaCy. SPACY_AVAILABLE records the outcome so later cells can
# check it before touching `spacy` (load_nlp_model() returns None without it).
# NOTE(review): Matcher, PhraseMatcher and Span are not used by the code shown
# here; importing them only makes this probe also require those submodules.
try:
    import spacy
    from spacy.matcher import Matcher, PhraseMatcher
    from spacy.tokens import Span
    SPACY_AVAILABLE = True
    logger.info("spaCy available")
except ImportError:
    SPACY_AVAILABLE = False
    logger.warning("spaCy not available - install: pip install spacy")

logger.info("AI-Powered Data Cleaning Pipeline with Incremental Processing")

# Load configuration
def load_config(config_path: str = "config.json") -> Dict[str, Any]:
    """Read and parse the pipeline's JSON configuration file.

    Args:
        config_path: Path to the JSON file (relative paths resolve against the
            current working directory).

    Returns:
        The parsed JSON document.

    Raises:
        FileNotFoundError: ``config_path`` does not exist (logged, then re-raised).
        json.JSONDecodeError: the file is not valid JSON (logged, then re-raised).
    """
    try:
        with open(config_path, 'r', encoding='utf-8') as handle:
            parsed = json.load(handle)
    except FileNotFoundError:
        logger.error(f"Config file not found: {config_path}")
        raise
    except json.JSONDecodeError as err:
        logger.error(f"Invalid JSON in config file: {err}")
        raise
    logger.info(f"Configuration loaded from: {config_path}")
    return parsed

# Parsed once at import time; load_config logs and re-raises if config.json is
# missing or not valid JSON, so nothing below runs without a config.
CONFIG = load_config()
INPUT_CSV = CONFIG.get('input_csv', '../Phase 1/placement_emails.csv')
OUTPUT_CSV = CONFIG.get('output_csv', 'ai_cleaned_emails.csv')
# Config sections consumed further down: 'knowledge_base' term lists
# (skills/positions/locations/degrees), 'extraction_patterns' regex strings,
# and 'text_cleaning' (remove_patterns, min_line_length).
KNOWLEDGE_BASE = CONFIG.get('knowledge_base', {})
EXTRACTION_PATTERNS = CONFIG.get('extraction_patterns', {})
TEXT_CLEANING_CONFIG = CONFIG.get('text_cleaning', {})

# Incremental processing configuration
# Incremental mode defaults to enabled. STATE_FILE (inside STATE_DIR) is the
# file handed to load_processed_ids/save_processed_ids; STATE_DIR is also passed
# to the checkpoint helpers. FORCE_FULL_REPROCESS makes the pipeline ignore the
# existing output and recorded IDs and reprocess every input email.
INCREMENTAL_CONFIG = CONFIG.get('incremental_processing', {})
INCREMENTAL_ENABLED = INCREMENTAL_CONFIG.get('enabled', True)
STATE_DIR = INCREMENTAL_CONFIG.get('state_directory', 'state')
STATE_FILE = os.path.join(STATE_DIR, INCREMENTAL_CONFIG.get('state_file', 'cleaned_message_ids.txt'))
FORCE_FULL_REPROCESS = INCREMENTAL_CONFIG.get('force_full_reprocess', False)

logger.info(f"Input: {INPUT_CSV} | Output: {OUTPUT_CSV}")
logger.info(f"Incremental Mode: {'ENABLED' if INCREMENTAL_ENABLED else 'DISABLED'}")
if FORCE_FULL_REPROCESS:
    logger.warning("FORCE FULL REPROCESS MODE - Will process all emails")
logger.info(f"Knowledge Base: {len(KNOWLEDGE_BASE.get('skills', []))} skills, {len(KNOWLEDGE_BASE.get('positions', []))} positions")

# Load spaCy model
def load_nlp_model():
    """Load the most capable installed spaCy English pipeline.

    Tries ``en_core_web_trf``, ``en_core_web_lg``, ``en_core_web_md`` and
    ``en_core_web_sm`` in that order and returns the first one that loads.

    Returns:
        The loaded ``spacy.Language`` object, or ``None`` if spaCy is not
        importable or none of the candidate models could be loaded.
    """
    if not SPACY_AVAILABLE:
        return None
    for model_name in ["en_core_web_trf", "en_core_web_lg", "en_core_web_md", "en_core_web_sm"]:
        try:
            nlp = spacy.load(model_name)
        except Exception as exc:
            # Model not installed (spaCy raises OSError) or failed to initialise,
            # e.g. missing extra dependencies: fall back to the next, smaller model.
            # `except Exception` (not a bare except) lets KeyboardInterrupt and
            # SystemExit propagate instead of being silently swallowed.
            logger.debug(f"Could not load spaCy model {model_name}: {exc}")
            continue
        logger.info(f"Loaded spaCy model: {model_name}")
        return nlp
    logger.error("No spaCy model found! Install with: python -m spacy download en_core_web_sm")
    return None

# Load the spaCy pipeline once at import time; None if spaCy or every
# candidate model is unavailable.
NLP_MODEL = load_nlp_model()

# Convert config to sets for fast lookup
# Terms are lower-cased here because extraction compares them against
# lower-cased tokens / text.
SKILLS_SET = set(skill.lower() for skill in KNOWLEDGE_BASE.get('skills', []))
POSITIONS_SET = set(pos.lower() for pos in KNOWLEDGE_BASE.get('positions', []))
LOCATIONS_SET = set(loc.lower() for loc in KNOWLEDGE_BASE.get('locations', []))
DEGREES_SET = set(deg.lower() for deg in KNOWLEDGE_BASE.get('degrees', []))

# Compile regex patterns
# NOTE(review): company patterns are compiled case-sensitively, unlike the
# salary/experience patterns; presumably they rely on capitalisation — confirm.
COMPANY_PATTERNS = [re.compile(p) for p in EXTRACTION_PATTERNS.get('company_patterns', [])]
SALARY_PATTERNS = [re.compile(p, re.IGNORECASE) for p in EXTRACTION_PATTERNS.get('salary_patterns', [])]
EXPERIENCE_PATTERNS = [re.compile(p, re.IGNORECASE) for p in EXTRACTION_PATTERNS.get('experience_patterns', [])]
# DOTALL lets '.' inside a removal pattern span newlines.
REMOVE_PATTERNS = [re.compile(p, re.IGNORECASE | re.DOTALL) for p in TEXT_CLEANING_CONFIG.get('remove_patterns', [])]
# Line-length threshold used by clean_text_ai, which keeps a line only when
# len(line) > MIN_LINE_LENGTH.
MIN_LINE_LENGTH = TEXT_CLEANING_CONFIG.get('min_line_length', 5)

logger.info(f"Compiled {len(COMPANY_PATTERNS)} company, {len(SALARY_PATTERNS)} salary, {len(EXPERIENCE_PATTERNS)} experience patterns")

In [None]:
# Text cleaning functions
URL_RE = re.compile(r'https?://\S+|www\.\S+')
# TLD class is [A-Za-z]; the former [A-Z|a-z] also accepted a literal '|'.
EMAIL_RE = re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b')
# NOTE(review): this also matches long digit runs that are not phone numbers
# (e.g. "600000 - 800000" or dates), which are then blanked before salary
# extraction sees them.
PHONE_RE = re.compile(r'[\+]?[\d][\d\s\-\(\)]{7,}[\d]')
HTML_RE = re.compile(r'<[^>]+>')
# Everything outside printable ASCII except \n and \r becomes a space; this
# also removes non-ASCII symbols such as currency signs after NFKC.
NON_PRINTABLE = re.compile(r'[^\x20-\x7E\n\r]+')


def clean_text_ai(text: str) -> str:
    """Normalise raw email text into a single line of plain ASCII prose.

    Steps:
      1. NFKC-normalise the text.
      2. Replace HTML tags, URLs, e-mail addresses and phone-like digit runs
         with spaces.
      3. Delete every match of the configured ``REMOVE_PATTERNS``.
      4. Replace non-printable / non-ASCII characters with spaces.
      5. Line by line: collapse internal whitespace and keep the line only if
         ``len(line) > MIN_LINE_LENGTH``.
      6. Join the surviving lines with single spaces.

    The per-line filter runs *before* newlines are discarded; previously all
    whitespace (including newlines) was collapsed first, so the filter only
    ever saw one giant line. Note that very short lines (e.g. a bullet holding
    only "SQL") are dropped; tune ``text_cleaning.min_line_length`` if that
    loses signal.

    Returns:
        The cleaned text, or "" for empty or non-string input.
    """
    if not text or not isinstance(text, str):
        return ""
    text = unicodedata.normalize("NFKC", text)
    for noise in (HTML_RE, URL_RE, EMAIL_RE, PHONE_RE):
        text = noise.sub(" ", text)
    for pattern in REMOVE_PATTERNS:
        text = pattern.sub("", text)
    text = NON_PRINTABLE.sub(" ", text)

    kept_lines = []
    # splitlines() handles \n, \r and \r\n alike.
    for raw_line in text.splitlines():
        line = " ".join(raw_line.split())
        # `line and` skips blank lines even if MIN_LINE_LENGTH is negative.
        if line and len(line) > MIN_LINE_LENGTH:
            kept_lines.append(line)
    return " ".join(kept_lines)

In [None]:
# AI-powered entity extraction
def _contains_term(term: str, text_lower: str) -> bool:
    r"""Return True if *term* occurs in *text_lower* as a standalone term.

    Lookarounds are used instead of ``\b`` so that terms which begin or end
    with punctuation (e.g. "m.sc.") still match before whitespace or at the
    end of the text, while short terms (e.g. "goa") no longer match inside
    longer words (e.g. "goal"). Empty terms never match.
    """
    if not term:
        return False
    pattern = r'(?<!\w)' + re.escape(term) + r'(?!\w)'
    return re.search(pattern, text_lower) is not None


def _as_sorted_lists(result: Dict[str, Any]) -> Dict[str, Any]:
    """Return *result* with every collection de-duplicated and sorted."""
    return {key: sorted(set(values)) for key, values in result.items()}


def extract_with_ai(text: str, nlp_model) -> Dict[str, Any]:
    """Extract companies, skills, positions, locations, salary, experience and
    degree mentions from cleaned email text.

    spaCy (NER, tokens, noun chunks) only sees the first 5000 characters
    (presumably to bound model cost); the regex and knowledge-base lookups
    scan the full text.

    Args:
        text: Cleaned email text.
        nlp_model: Loaded spaCy pipeline; a falsy value disables extraction.

    Returns:
        Dict with keys 'companies', 'skills', 'positions', 'locations',
        'salary_info', 'experience_required' and 'degrees_required', each a
        sorted, de-duplicated list of strings. All lists are empty when
        ``nlp_model`` or ``text`` is falsy. Skills, positions, locations and
        degrees are lower-cased knowledge-base terms; companies, salary and
        experience keep the casing of the matched text. If extraction raises
        part-way, the error is logged and whatever was collected so far is
        returned.
    """
    result = {'companies': set(), 'skills': set(), 'positions': set(), 'locations': set(),
              'salary_info': [], 'experience_required': [], 'degrees_required': set()}
    if not nlp_model or not text:
        # Same list-valued shape as the normal path (previously returned sets).
        return _as_sorted_lists(result)
    try:
        doc = nlp_model(text[:5000])
        text_lower = text.lower()

        # Organisations: spaCy ORG entities plus configured company regexes.
        for ent in doc.ents:
            if ent.label_ == "ORG":
                org = ent.text.strip()
                if len(org) > 2 and org.lower() not in ['we', 'our', 'the', 'a', 'an']:
                    result['companies'].add(org)
        for pattern in COMPANY_PATTERNS:
            for match in pattern.finditer(text):
                company = match.group(0).strip()
                if len(company) > 3:
                    result['companies'].add(company)

        # Single-token skills/positions. For skills, record the form that
        # actually matched the knowledge base (surface text first, then lemma)
        # so the output is always a KB term.
        for token in doc:
            word = token.text.lower()
            if word in SKILLS_SET:
                result['skills'].add(word)
            else:
                lemma = token.lemma_.lower()
                if lemma in SKILLS_SET:
                    result['skills'].add(lemma)
            if word in POSITIONS_SET:
                result['positions'].add(word)

        # Multi-word skills/positions: exact noun-chunk matches only.
        for chunk in doc.noun_chunks:
            phrase = chunk.text.lower()
            if phrase in SKILLS_SET:
                result['skills'].add(phrase)
            if phrase in POSITIONS_SET:
                result['positions'].add(phrase)

        # Locations: GPE entities that are KB terms, plus a whole-term scan of
        # the full text (a raw substring test matched "goa" inside "goal").
        for ent in doc.ents:
            if ent.label_ == "GPE":
                place = ent.text.lower().strip()
                if place in LOCATIONS_SET:
                    result['locations'].add(place)
        for location in LOCATIONS_SET:
            if _contains_term(location, text_lower):
                result['locations'].add(location)

        # Salary and experience: configured regexes over the full text.
        for pattern in SALARY_PATTERNS:
            result['salary_info'].extend(m.group(0).strip() for m in pattern.finditer(text))
        for pattern in EXPERIENCE_PATTERNS:
            result['experience_required'].extend(m.group(0).strip() for m in pattern.finditer(text))

        # Degrees: whole-term scan (handles terms ending in '.', e.g. "m.sc.").
        for degree in DEGREES_SET:
            if _contains_term(degree, text_lower):
                result['degrees_required'].add(degree)
    except Exception as e:
        logger.error(f"Extraction error: {str(e)[:100]}")
    return _as_sorted_lists(result)

In [None]:
# Email processing
def process_email_ai(text: str, nlp_model) -> Dict[str, Any]:
    """Clean one email's text and, when a model is available, extract entities.

    Returns:
        A dict containing:

        * 'cleaned_text'
        * the seven extraction fields: 'companies', 'skills', 'positions',
          'locations', 'salary_info', 'experience_required',
          'degrees_required'
        * 'word_count' and 'char_count'
        * 'processing_status', one of:

          - 'success'
          - 'empty_after_cleaning': fewer than 10 characters survived cleaning
          - 'no_ai_model': the text was cleaned but ``nlp_model`` is falsy
          - 'error: <message>': an exception occurred (message truncated to
            100 characters)

        Exceptions are logged and reported via the status, never raised.
    """
    outcome: Dict[str, Any] = dict(
        cleaned_text='',
        companies=[], skills=[], positions=[], locations=[],
        salary_info=[], experience_required=[], degrees_required=[],
        word_count=0, char_count=0,
        processing_status='success',
    )
    try:
        cleaned = clean_text_ai(text)
        too_short = (not cleaned) or len(cleaned) < 10
        if too_short:
            outcome['processing_status'] = 'empty_after_cleaning'
            return outcome
        outcome.update(
            cleaned_text=cleaned,
            word_count=len(cleaned.split()),
            char_count=len(cleaned),
        )
        if not nlp_model:
            outcome['processing_status'] = 'no_ai_model'
        else:
            outcome.update(extract_with_ai(cleaned, nlp_model))
    except Exception as exc:
        outcome['processing_status'] = f'error: {str(exc)[:100]}'
        logger.error(f"Processing error: {str(exc)}")
    return outcome

# Batch processing with checkpoints
def _text_field(row: pd.Series, column: str) -> str:
    """Return ``row[column]`` as text, treating a missing column or NaN/None as ''.

    ``str(float('nan'))`` is ``"nan"``, which would otherwise leak into the
    cleaned text for every email with an empty Subject/Preview/Body cell.
    """
    value = row.get(column, '')
    if pd.api.types.is_scalar(value) and pd.isna(value):
        return ''
    return str(value)


def process_batch_ai(df_to_process: pd.DataFrame, nlp_model, batch_size: int = 20,
                     checkpoint_interval: int = 50) -> "tuple[pd.DataFrame, Set[Any]]":
    """Clean and extract entities for every row of *df_to_process*.

    Rows are processed one at a time. ``batch_size`` only appears in the log
    message; it does not change how rows are grouped.

    Every ``checkpoint_interval`` successfully processed rows, the set of
    processed ``MessageId`` values is passed to ``save_checkpoint``. Only the
    IDs are checkpointed; the result rows stay in memory until the caller
    writes them. A non-positive interval disables checkpointing.

    Args:
        df_to_process: Emails to process. Reads the optional ``Subject``,
            ``Preview`` and ``Body`` columns and the required ``MessageId``
            column.
        nlp_model: Loaded spaCy pipeline, passed through to ``process_email_ai``.
        batch_size: Informational only (see above).
        checkpoint_interval: Number of processed rows between checkpoint saves.

    Returns:
        A tuple ``(results_df, processed_ids)``:

        * ``results_df`` has one row per successfully processed email: its
          original columns plus the cleaning/extraction columns.
        * ``processed_ids`` holds those rows' ``MessageId`` values.

        Rows that raise (including rows without a ``MessageId``) are logged and
        skipped, so they are absent from both.
    """
    total = len(df_to_process)
    logger.info(f"\nProcessing {total} NEW emails in batches of {batch_size}...")
    all_results = []
    processed_ids = set()
    stats = {'total': 0, 'with_companies': 0, 'with_skills': 0, 'with_positions': 0,
             'with_locations': 0, 'with_salary': 0, 'empty': 0}
    start_time = time.perf_counter()

    for _, row in df_to_process.iterrows():
        try:
            # Read the id first so a row without one is not half-recorded.
            message_id = row['MessageId']
            text = ' '.join(_text_field(row, column) for column in ('Subject', 'Preview', 'Body'))
            result = process_email_ai(text, nlp_model)

            # Keep the original columns and append the extraction results.
            result_row = row.to_dict()
            result_row.update({
                'cleaned_text': result['cleaned_text'],
                'word_count': result['word_count'],
                'char_count': result['char_count'],
                'processing_status': result['processing_status'],
                'companies_extracted': ', '.join(result['companies']),
                'skills_extracted': ', '.join(result['skills']),
                'positions_extracted': ', '.join(result['positions']),
                'locations_extracted': ', '.join(result['locations']),
                'salary_info': ', '.join(result['salary_info']),
                'experience_required': ', '.join(result['experience_required']),
                'degrees_required': ', '.join(result['degrees_required']),
                'company_count': len(result['companies']),
                'skill_count': len(result['skills']),
                'position_count': len(result['positions']),
                'location_count': len(result['locations'])
            })
            all_results.append(result_row)
            processed_ids.add(message_id)

            stats['total'] += 1
            if result['companies']: stats['with_companies'] += 1
            if result['skills']: stats['with_skills'] += 1
            if result['positions']: stats['with_positions'] += 1
            if result['locations']: stats['with_locations'] += 1
            if result['salary_info']: stats['with_salary'] += 1
            if result['processing_status'] == 'empty_after_cleaning': stats['empty'] += 1

            # Guard the modulo: an interval of 0 used to raise ZeroDivisionError on every row.
            if checkpoint_interval > 0 and stats['total'] % checkpoint_interval == 0:
                save_checkpoint(STATE_DIR, processed_ids)
                logger.info(f"Processed {stats['total']}/{total} | Checkpoint saved")
        except Exception as e:
            logger.error(f"Error processing email {row.get('MessageId', 'unknown')}: {e}")

    elapsed = time.perf_counter() - start_time
    # Avoid ZeroDivisionError when the clock does not advance (e.g. empty input).
    rate = total / elapsed if elapsed > 0 else 0.0
    logger.info(f"\nProcessing complete in {elapsed:.2f}s ({rate:.1f} emails/sec)")
    logger.info(f"Stats: {stats['with_companies']} companies, {stats['with_skills']} skills, {stats['with_positions']} positions")

    return pd.DataFrame(all_results), processed_ids

In [None]:
# Main pipeline with incremental processing
def main_ai_pipeline_incremental(csv_path: str = None, output_path: str = None):
    """Run the cleaning pipeline, processing only emails not handled before.

    Args:
        csv_path: Phase-1 email CSV; defaults to ``INPUT_CSV``.
        output_path: Cleaned-output CSV; defaults to ``OUTPUT_CSV``. In
            incremental mode its existing rows are kept and new rows merged in.

    Returns:
        One of:

        * the combined cleaned DataFrame that was written to ``output_path``;
        * the existing (possibly empty) DataFrame when there is nothing new to
          process;
        * ``None`` when no spaCy model is loaded.
    """
    csv_path = csv_path or INPUT_CSV
    output_path = output_path or OUTPUT_CSV

    if not NLP_MODEL:
        logger.error("Cannot proceed without NLP model!")
        return None

    # Load all emails from Phase 1
    logger.info(f"Loading dataset: {csv_path}")
    all_emails_df = pd.read_csv(csv_path)
    logger.info(f"Loaded {len(all_emails_df)} total emails from Phase 1")

    incremental = INCREMENTAL_ENABLED and not FORCE_FULL_REPROCESS

    # Load existing cleaned data
    existing_df = pd.DataFrame()
    if incremental and os.path.exists(output_path):
        existing_df = pd.read_csv(output_path)
        logger.info(f"Loaded {len(existing_df)} existing cleaned emails")

    # Determine which emails to process
    if incremental:
        processed_ids = set(load_processed_ids(STATE_FILE))

        # A checkpoint records only IDs; their result rows are held in memory
        # until the final CSV write. Skip a checkpointed ID only if its row
        # actually reached the output file. Otherwise a crashed run's emails
        # would be marked done without ever being saved.
        checkpoint_ids = set(load_checkpoint(STATE_DIR))
        if 'MessageId' in existing_df.columns:
            saved_ids = set(existing_df['MessageId'])
        else:
            saved_ids = set()
        unsaved_ids = checkpoint_ids - saved_ids - processed_ids
        if unsaved_ids:
            logger.warning(f"Reprocessing {len(unsaved_ids)} checkpointed emails missing from {output_path}")
        processed_ids |= checkpoint_ids & saved_ids

        # Filter out already processed emails
        new_emails_df = all_emails_df[~all_emails_df['MessageId'].isin(processed_ids)]

        logger.info(f"Already processed: {len(processed_ids)} emails")
        logger.info(f"New emails to process: {len(new_emails_df)}")

        if len(new_emails_df) == 0:
            logger.info("✅ No new emails to process!")
            return existing_df
    else:
        new_emails_df = all_emails_df
        logger.info(f"Full reprocessing mode: Processing all {len(new_emails_df)} emails")

    # Process new emails
    new_results_df, new_processed_ids = process_batch_ai(new_emails_df, NLP_MODEL, batch_size=20)

    # Merge with existing data
    if incremental and not existing_df.empty:
        logger.info("Merging with existing cleaned data...")
        combined_df = pd.concat([existing_df, new_results_df], ignore_index=True)
        if 'MessageId' in combined_df.columns:
            # If an email was processed twice (e.g. the state file was lost),
            # keep only the newest row.
            combined_df = combined_df.drop_duplicates(subset='MessageId', keep='last', ignore_index=True)
        logger.info(f"Total emails after merge: {len(combined_df)}")
    else:
        combined_df = new_results_df

    # Write results BEFORE recording state. If the write fails, these emails
    # are simply reprocessed next run instead of being marked done but missing.
    combined_df.to_csv(output_path, index=False)
    logger.info(f"✅ Saved {len(combined_df)} total cleaned emails to: {output_path}")
    logger.info(f"   ({len(new_results_df)} newly processed in this run)")

    if INCREMENTAL_ENABLED:
        # NOTE(review): only this run's IDs are passed, which assumes
        # save_processed_ids appends to STATE_FILE rather than overwriting it — confirm.
        save_processed_ids(STATE_FILE, new_processed_ids)
        clear_checkpoint(STATE_DIR)

    return combined_df

In [None]:
# Run the incremental pipeline
df = main_ai_pipeline_incremental()

if df is not None:
    logger.info("\n" + "="*70)
    logger.info("PIPELINE COMPLETE!")
    logger.info("="*70)
    logger.info(f"Total cleaned emails: {len(df)}")
    logger.info(f"DataFrame shape: {df.shape}")
    # The pipeline can return an empty, column-less DataFrame (nothing new to
    # process and no existing output file). Selecting fixed columns would then
    # raise KeyError, so preview only the columns that exist.
    preview_cols = [col for col in ('Subject', 'companies_extracted', 'skills_extracted', 'positions_extracted')
                    if col in df.columns]
    if preview_cols and not df.empty:
        print("\nSample output:")
        # display() is injected by IPython/Jupyter; it is not a Python builtin.
        display(df[preview_cols].head(3))
    else:
        logger.info("No rows to preview.")