

# Automated Metadata Generation

This notebook provides a comprehensive document analysis system that can:

* Process various document formats (PDF, DOCX, TXT)
* Perform rule-based NLP analysis (entities, topics, sentiment, readability)
* Generate structured metadata
* Export results in multiple formats



### Features

* **Document Processing**: PDF, DOCX, TXT with OCR support
* **NLP Analysis**: Entity extraction, topic detection, sentiment analysis
* **Metadata Generation**: Structured document metadata
* **Export Options**: JSON, XML, YAML formats
* **Batch Processing**: Process multiple documents at once

---



### Installation Requirements

Run this cell to install all required dependencies:


In [4]:
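# Install required packages
# pdfplumber and PyMuPDF (module name: fitz) are used by the PDF extraction code
!pip install PyPDF2 pdfplumber PyMuPDF python-docx pytesseract Pillow spacy textstat langdetect wordcloud matplotlib seaborn pandas numpy
!python -m spacy download en_core_web_sm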






In [22]:
# Import required libraries
import io
import os
import sys
import json
import uuid
import re
import logging
import pdfplumber
import unicodedata

from datetime import datetime
from typing import Dict, List, Any, Optional, Tuple
from dataclasses import dataclass, asdict
from pathlib import Path
from collections import Counter, defaultdict
import warnings
warnings.filterwarnings('ignore')

# Document processing
import PyPDF2
from docx import Document as DocxDocument
try:
    import fitz  # PyMuPDF: renders PDF pages to images for the OCR fallback
    import pytesseract
    from PIL import Image
    OCR_AVAILABLE = True
except ImportError:
    OCR_AVAILABLE = False

# NLP libraries
try:
    import spacy
    nlp = spacy.load("en_core_web_sm")
    SPACY_AVAILABLE = True
except (ImportError, OSError):
    SPACY_AVAILABLE = False

import textstat
from langdetect import detect
from langdetect.lang_detect_exception import LangDetectException


# Data analysis and visualization
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from wordcloud import WordCloud

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

print("✅ All libraries imported successfully!")
print(f"📊 SpaCy available: {SPACY_AVAILABLE}")
print(f"🔍 OCR available: {OCR_AVAILABLE}")


✅ All libraries imported successfully!
📊 SpaCy available: True
🔍 OCR available: True


### Document Processor Class

- Handles document processing for various file formats including PDF, DOCX, and TXT.  
- Includes OCR capabilities for image-based documents.
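
As a minimal sketch of the plain-text path, the encoding fallback can be tried in isolation. The helper name `read_text_with_fallback` and the sample file are hypothetical and not part of the notebook. One detail worth knowing: `latin-1` maps every possible byte to a character, so it never raises `UnicodeDecodeError`, and any encodings listed after it in a fallback chain would not be reached.

```python
import tempfile
from pathlib import Path
from typing import Sequence


def read_text_with_fallback(path: Path, encodings: Sequence[str] = ("utf-8", "latin-1")) -> str:
    """Hypothetical helper: try each encoding in order, return the first clean decode."""
    data = path.read_bytes()
    for encoding in encodings:
        try:
            return data.decode(encoding)
        except UnicodeDecodeError:
            continue
    # Only reached when none of the listed encodings can decode the bytes.
    return data.decode("utf-8", errors="ignore")


# Write a file that is valid Latin-1 but not valid UTF-8, then read it back.
with tempfile.TemporaryDirectory() as tmp_dir:
    sample_path = Path(tmp_dir) / "sample.txt"
    sample_path.write_bytes("café".encode("latin-1"))
    decoded = read_text_with_fallback(sample_path)

print(decoded)
```

Reading the bytes once and decoding in memory avoids reopening the file for every candidate encoding.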


In [6]:

class DocumentProcessor:
    """
    A comprehensive document processor that extracts text from various formats
    and prepares it for metadata generation.
    """
    
    def __init__(self, tesseract_path: Optional[str] = None):
        """
        Initialize the document processor.
        
        Args:
            tesseract_path: Path to tesseract executable (if not in PATH)
        """
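        # Note: legacy '.doc' files are routed to the DOCX reader below;
        # python-docx reads the .docx (OOXML) format, so '.doc' input may yield no text.
        self.supported_formats = ['.pdf', '.docx', '.txt', '.doc']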
        
        # Set tesseract path if provided (only when pytesseract was imported)
        if tesseract_path and OCR_AVAILABLE:
            pytesseract.pytesseract.tesseract_cmd = tesseract_path
        
        # Test OCR availability
        self._test_ocr()
    
    def _test_ocr(self) -> bool:
        """Test if OCR is working properly."""
        try:
            pytesseract.get_tesseract_version()
            logger.info("OCR (Tesseract) is available")
            return True
        except Exception as e:
            logger.warning(f"OCR not available: {e}")
            return False
    
    def process_document(self, file_path: str) -> Dict:
        """
        Process a document and extract text content with metadata.
        
        Args:
            file_path: Path to the document file
            
        Returns:
            Dictionary containing extracted text and basic metadata
        """
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"File not found: {file_path}")
        
        file_path = Path(file_path)
        file_extension = file_path.suffix.lower()
        
        if file_extension not in self.supported_formats:
            raise ValueError(f"Unsupported file format: {file_extension}")
        
        # Basic file metadata
        file_stats = file_path.stat()
        basic_metadata = {
            'filename': file_path.name,
            'file_size': file_stats.st_size,
            'file_extension': file_extension,
            'file_path': str(file_path)
        }
        
        # Extract text based on file type
        try:
            if file_extension == '.pdf':
                text_content = self._process_pdf(file_path)
            elif file_extension in ['.docx', '.doc']:
                text_content = self._process_docx(file_path)
            elif file_extension == '.txt':
                text_content = self._process_txt(file_path)
            else:
                raise ValueError(f"Handler not implemented for {file_extension}")
            
            # Process and clean text
            processed_text = self._clean_text(text_content)
            
            # Basic text statistics
            text_stats = self._get_text_statistics(processed_text)
            
            return {
                'raw_text': text_content,
                'processed_text': processed_text,
                'file_metadata': basic_metadata,
                'text_statistics': text_stats,
                'processing_status': 'success'
            }
            
        except Exception as e:
            logger.error(f"Error processing {file_path}: {e}")
            return {
                'raw_text': '',
                'processed_text': '',
                'file_metadata': basic_metadata,
                'text_statistics': {},
                'processing_status': 'failed',
                'error': str(e)
            }
    
    def _process_pdf(self, file_path: Path) -> str:
        """Extract text from PDF files with OCR fallback."""
        text_content = ""
        
        try:
            # Try pdfplumber first (better for structured PDFs)
            with pdfplumber.open(file_path) as pdf:
                for page in pdf.pages:
                    page_text = page.extract_text()
                    if page_text:
                        text_content += page_text + "\n"
            
            # If we got very little text, try OCR
            if len(text_content.strip()) < 100:
                logger.info(f"Low text extraction from {file_path.name}, trying OCR...")
                text_content = self._process_pdf_with_ocr(file_path)
                
        except Exception as e:
            logger.warning(f"pdfplumber failed for {file_path.name}: {e}")
            # Fallback to PyPDF2
            try:
                text_content = self._process_pdf_pypdf2(file_path)
            except Exception as e2:
                logger.warning(f"PyPDF2 also failed: {e2}")
                # Last resort: OCR
                text_content = self._process_pdf_with_ocr(file_path)
        
        return text_content
    
    def _process_pdf_pypdf2(self, file_path: Path) -> str:
        """Fallback PDF processing with PyPDF2."""
        text_content = ""
        with open(file_path, 'rb') as file:
            pdf_reader = PyPDF2.PdfReader(file)
            for page in pdf_reader.pages:
                text_content += page.extract_text() + "\n"
        return text_content
    
    def _process_pdf_with_ocr(self, file_path: Path) -> str:
        """Process PDF using OCR for scanned documents."""
        try:
            text_content = ""
            pdf_document = fitz.open(file_path)
            
            for page_num in range(pdf_document.page_count):
                page = pdf_document[page_num]
                
                # Convert page to image
                pix = page.get_pixmap()
                img_data = pix.tobytes("ppm")
                img = Image.open(io.BytesIO(img_data))
                
                # OCR the image
                page_text = pytesseract.image_to_string(img)
                text_content += page_text + "\n"
            
            pdf_document.close()
            return text_content
            
        except Exception as e:
            logger.error(f"OCR processing failed: {e}")
            return ""
    
    def _process_docx(self, file_path: Path) -> str:
        """Extract text from DOCX files."""
        try:
            # python-docx is imported above as DocxDocument
            doc = DocxDocument(str(file_path))
            text_content = ""
            
            # Extract paragraphs
            for paragraph in doc.paragraphs:
                text_content += paragraph.text + "\n"
            
            # Extract tables
            for table in doc.tables:
                for row in table.rows:
                    for cell in row.cells:
                        text_content += cell.text + " "
                    text_content += "\n"
            
            return text_content
            
        except Exception as e:
            logger.error(f"Error processing DOCX {file_path}: {e}")
            return ""
    
    def _process_txt(self, file_path: Path) -> str:
        """Process plain text files."""
        try:
            # Try different encodings
            encodings = ['utf-8', 'latin-1', 'cp1252', 'iso-8859-1']
            
            for encoding in encodings:
                try:
                    with open(file_path, 'r', encoding=encoding) as file:
                        return file.read()
                except UnicodeDecodeError:
                    continue
            
            # If all encodings fail, read as binary and decode with errors='ignore'
            with open(file_path, 'rb') as file:
                return file.read().decode('utf-8', errors='ignore')
                
        except Exception as e:
            logger.error(f"Error processing TXT {file_path}: {e}")
            return ""
    
    def _clean_text(self, text: str) -> str:
        """Clean and normalize extracted text."""
        if not text:
            return ""
        
        # Remove excessive whitespace
        text = re.sub(r'\s+', ' ', text)
        
        # Remove special characters but keep punctuation
        text = re.sub(r'[^\w\s\.\,\!\?\;\:\-\(\)]', ' ', text)
        
        # Remove multiple spaces
        text = re.sub(r' +', ' ', text)
        
        # Strip leading/trailing whitespace
        text = text.strip()
        
        return text
    
    def _get_text_statistics(self, text: str) -> Dict:
        """Generate basic statistics about the extracted text."""
        if not text:
            return {}
        
        # Basic counts
        char_count = len(text)
        word_count = len(text.split())
        sentence_count = len([s for s in text.split('.') if s.strip()])
        paragraph_count = len([p for p in text.split('\n') if p.strip()])
        
        # Language detection with langdetect (first 1000 characters only)
        try:
            language = detect(text[:1000])
        except LangDetectException:
            language = 'unknown'
        
        # Reading time estimation (average 200 words per minute)
        reading_time_minutes = max(1, round(word_count / 200))
        
        return {
            'character_count': char_count,
            'word_count': word_count,
            'sentence_count': sentence_count,
            'paragraph_count': paragraph_count,
            'detected_language': language,
            'estimated_reading_time_minutes': reading_time_minutes
        }
    
    def batch_process(self, directory_path: str) -> List[Dict]:
        """
        Process multiple documents in a directory.
        
        Args:
            directory_path: Path to directory containing documents
            
        Returns:
            List of processing results for each document
        """
        if not os.path.exists(directory_path):
            raise FileNotFoundError(f"Directory not found: {directory_path}")
        
        directory = Path(directory_path)
        results = []
        
        # Find all supported files
        for file_path in directory.iterdir():
            if file_path.is_file() and file_path.suffix.lower() in self.supported_formats:
                logger.info(f"Processing: {file_path.name}")
                result = self.process_document(str(file_path))
                results.append(result)
        
        return results
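
One interaction in the class above is worth checking: `_clean_text` collapses every whitespace run, newlines included, into a single space, and `_get_text_statistics` is then called on the cleaned text, so splitting on `\n` there cannot find more than one paragraph. The sketch below reproduces the effect with standalone functions. The names `clean_text` and `count_paragraphs` are illustrative, and counting paragraphs on the raw text is only one possible workaround, not the notebook's behavior.

```python
import re


def clean_text(text: str) -> str:
    """Mirror of the cleaning steps: collapse whitespace, drop unusual symbols."""
    text = re.sub(r"\s+", " ", text)
    text = re.sub(r"[^\w\s.,!?;:\-()]", " ", text)
    return re.sub(r" +", " ", text).strip()


def count_paragraphs(text: str) -> int:
    """Count non-empty blocks separated by one or more blank lines."""
    return len([p for p in re.split(r"\n\s*\n", text) if p.strip()])


raw = "First paragraph, line one.\nStill the first.\n\nSecond paragraph @ here!"
cleaned = clean_text(raw)

print(cleaned)
print(count_paragraphs(raw), count_paragraphs(cleaned))
```

Computing structural counts before cleaning keeps the cleaned text suitable for word-level analysis while preserving the paragraph information.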




### NLP Analyzer Class

- Advanced NLP analysis for document metadata generation
- Uses rule-based and statistical approaches for robust analysis
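
As an illustration of the rule-based side, a few regular expressions are enough to pull structured entities such as emails, dates, and percentages out of raw text. This is a minimal sketch with simplified patterns and an invented sample sentence; `PATTERNS` and `extract_entities` are hypothetical names, and the pattern set is much smaller than the analyzer's.

```python
import re
from typing import Dict, List

# Simplified patterns for illustration only.
PATTERNS = {
    "email": re.compile(r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b"),
    "date": re.compile(r"\b\d{4}-\d{2}-\d{2}\b"),
    "percentage": re.compile(r"\b\d+(?:\.\d+)?%"),
}


def extract_entities(text: str) -> Dict[str, List[str]]:
    """Return unique matches per entity type, compared case-insensitively."""
    found: Dict[str, List[str]] = {}
    for label, pattern in PATTERNS.items():
        seen = set()
        unique = []
        for match in pattern.finditer(text):
            value = match.group()
            if value.lower() not in seen:
                seen.add(value.lower())
                unique.append(value)
        if unique:
            found[label] = unique
    return found


sample = (
    "Contact ana@example.com by 2024-03-15. Growth was 12.5%, "
    "up from a 12.5% forecast; write to ANA@example.com."
)
entities = extract_entities(sample)
print(entities)
```

Deduplicating on the lowercased text keeps the first spelling seen, which is why the second, upper-case email address is dropped.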

In [7]:

@dataclass
class EntityInfo:
    """Information about extracted entities"""
    text: str
    entity_type: str
    confidence: float = 1.0
    context: str = ""

@dataclass
class TopicInfo:
    """Information about extracted topics"""
    topic: str
    keywords: List[str]
    confidence: float
    sentences: List[str]

class NLPAnalyzer:
    """
    Advanced NLP analysis for document metadata generation
    Uses rule-based and statistical approaches for robust analysis
    """
    
    def __init__(self):
        """Initialize the NLP analyzer with patterns and vocabularies"""
        logger.info("Initializing NLP Analyzer...")
        
        # Entity recognition patterns
        self.entity_patterns = {
            'email': re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'),
            'phone': re.compile(r'(\+\d{1,3}[-.\s]?)?\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}'),
            'url': re.compile(r'https?://(?:[-\w.])+(?:\:[0-9]+)?(?:/(?:[\w/_.])*(?:\?(?:[\w&=%.])*)?(?:\#(?:[\w.])*)?)?'),
            'date': re.compile(r'\b(?:\d{1,2}/\d{1,2}/\d{2,4}|\d{4}-\d{2}-\d{2}|(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)[a-z]* \d{1,2},? \d{4})\b', re.IGNORECASE),
            'currency': re.compile(r'\$\d{1,3}(?:,\d{3})*(?:\.\d{2})?|\b\d{1,3}(?:,\d{3})*(?:\.\d{2})? (?:USD|EUR|GBP|CAD)\b'),
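            # No trailing \b: '%' is a non-word character, so '%\b' would only match when a letter or digit follows it
            'percentage': re.compile(r'\b\d+(?:\.\d+)?%'),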
            'organization': re.compile(r'\b(?:Inc|LLC|Corp|Ltd|Company|Organization|University|Institute|Department|Agency|Foundation|Association)\b', re.IGNORECASE),
        }
        
        # Common stop words for topic extraction
        self.stop_words = {
            'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'of', 'with', 'by',
            'is', 'are', 'was', 'were', 'be', 'been', 'being', 'have', 'has', 'had', 'do', 'does', 'did',
            'will', 'would', 'could', 'should', 'may', 'might', 'can', 'this', 'that', 'these', 'those',
            'i', 'you', 'he', 'she', 'it', 'we', 'they', 'me', 'him', 'her', 'us', 'them', 'my', 'your',
            'his', 'her', 'its', 'our', 'their', 'myself', 'yourself', 'himself', 'herself', 'itself',
            'ourselves', 'yourselves', 'themselves', 'what', 'which', 'who', 'whom', 'whose', 'where',
            'when', 'why', 'how', 'all', 'any', 'both', 'each', 'few', 'more', 'most', 'other', 'some',
            'such', 'no', 'nor', 'not', 'only', 'own', 'same', 'so', 'than', 'too', 'very', 'can', 'just'
        }
        
        # Technical/domain keywords that indicate topics
        self.domain_keywords = {
            'technology': ['software', 'hardware', 'computer', 'system', 'application', 'program', 'code', 'algorithm', 'data', 'database', 'network', 'server', 'cloud', 'api', 'framework', 'platform'],
            'business': ['revenue', 'profit', 'sales', 'marketing', 'customer', 'client', 'market', 'strategy', 'business', 'company', 'organization', 'management', 'finance', 'budget', 'cost'],
            'research': ['study', 'research', 'analysis', 'experiment', 'hypothesis', 'methodology', 'results', 'conclusion', 'findings', 'data', 'sample', 'statistical', 'significant'],
            'medical': ['patient', 'treatment', 'diagnosis', 'medical', 'health', 'clinical', 'therapy', 'disease', 'symptom', 'medicine', 'doctor', 'hospital', 'healthcare'],
            'legal': ['contract', 'agreement', 'legal', 'law', 'court', 'case', 'plaintiff', 'defendant', 'attorney', 'lawyer', 'litigation', 'settlement', 'regulation', 'compliance'],
            'education': ['student', 'teacher', 'education', 'learning', 'course', 'curriculum', 'school', 'university', 'academic', 'degree', 'diploma', 'graduate', 'undergraduate'],
            'finance': ['investment', 'portfolio', 'stock', 'bond', 'asset', 'liability', 'equity', 'dividend', 'interest', 'loan', 'credit', 'debt', 'financial', 'banking']
        }
        
        logger.info("✅ NLP Analyzer initialized!")
    
    def analyze_document(self, text: str, filename: str = "") -> Dict:
        """
        Perform comprehensive NLP analysis on document text
        
        Args:
            text: Document text to analyze
            filename: Optional filename for context
            
        Returns:
            Dict containing all analysis results
        """
        logger.info(f"Starting NLP analysis for: {filename or 'text input'}")
        
        # Clean and preprocess text
        clean_text = self._preprocess_text(text)
        sentences = self._split_sentences(clean_text)
        words = self._extract_words(clean_text)
        
        # Perform various analyses
        analysis_results = {
            'entities': self._extract_entities(text),
            'topics': self._extract_topics(clean_text, sentences, words),
            'summary': self._generate_summary(sentences),
            'keywords': self._extract_keywords(words),
            'document_structure': self._analyze_structure(text),
            'sentiment': self._analyze_sentiment(clean_text),
            'readability': self._analyze_readability(sentences, words),
            'language_features': self._analyze_language_features(text, words)
        }
        
        logger.info("✅ NLP analysis completed")
        return analysis_results
    
    def _preprocess_text(self, text: str) -> str:
        """Clean and normalize text for analysis"""
        # Normalize unicode characters
        text = unicodedata.normalize('NFKD', text)
        
        # Remove excessive whitespace but preserve paragraph structure
        text = re.sub(r'[ \t]+', ' ', text)
        text = re.sub(r'\n\s*\n', '\n\n', text)
        
        return text.strip()
    
    def _split_sentences(self, text: str) -> List[str]:
        """Split text into sentences using rule-based approach"""
        # Simple sentence splitting - handles most cases
        sentences = re.split(r'(?<=[.!?])\s+(?=[A-Z])', text)
        return [s.strip() for s in sentences if s.strip()]
    
    def _extract_words(self, text: str) -> List[str]:
        """Extract and clean words from text"""
        words = re.findall(r'\b[a-zA-Z]+\b', text.lower())
        return [word for word in words if len(word) > 2]
    
    def _extract_entities(self, text: str) -> Dict[str, List[EntityInfo]]:
        """Extract named entities using pattern matching"""
        entities = defaultdict(list)
        
        for entity_type, pattern in self.entity_patterns.items():
            matches = pattern.finditer(text)
            for match in matches:
                entity_text = match.group().strip()
                if entity_text:  # Avoid empty matches
                    # Get context (20 characters before and after)
                    start = max(0, match.start() - 20)
                    end = min(len(text), match.end() + 20)
                    context = text[start:end].replace('\n', ' ')
                    
                    entities[entity_type].append(EntityInfo(
                        text=entity_text,
                        entity_type=entity_type,
                        confidence=0.9,  # High confidence for pattern matches
                        context=context
                    ))
        
        # Remove duplicates
        for entity_type in entities:
            seen = set()
            unique_entities = []
            for entity in entities[entity_type]:
                if entity.text.lower() not in seen:
                    seen.add(entity.text.lower())
                    unique_entities.append(entity)
            entities[entity_type] = unique_entities
        
        return dict(entities)
    
    def _extract_topics(self, text: str, sentences: List[str], words: List[str]) -> List[TopicInfo]:
        """Extract main topics using keyword analysis and domain detection"""
        topics = []
        
        # Count word frequencies (excluding stop words)
        word_freq = Counter([word for word in words if word not in self.stop_words])
        
        # Identify domain-specific topics
        domain_scores = defaultdict(int)
        domain_keywords_found = defaultdict(list)
        
        for domain, keywords in self.domain_keywords.items():
            for keyword in keywords:
                if keyword in word_freq:
                    domain_scores[domain] += word_freq[keyword]
                    domain_keywords_found[domain].append(keyword)
        
        # Create topic info for top domains
        for domain, score in sorted(domain_scores.items(), key=lambda x: x[1], reverse=True)[:3]:
            if score > 2:  # Minimum threshold
                relevant_sentences = []
                for sentence in sentences[:10]:  # Check first 10 sentences
                    if any(keyword in sentence.lower() for keyword in domain_keywords_found[domain]):
                        relevant_sentences.append(sentence)
                
                topics.append(TopicInfo(
                    topic=domain.title(),
                    keywords=domain_keywords_found[domain][:5],
                    confidence=min(score / 10, 1.0),
                    sentences=relevant_sentences[:3]
                ))
        
        # Add general high-frequency topics
        top_words = [word for word, count in word_freq.most_common(10) if count > 2]
        if top_words:
            topics.append(TopicInfo(
                topic="General",
                keywords=top_words[:5],
                confidence=0.5,
                sentences=sentences[:2]
            ))
        
        return topics
    
    def _extract_keywords(self, words: List[str]) -> List[Tuple[str, int]]:
        """Extract important keywords with frequency"""
        word_freq = Counter([word for word in words if word not in self.stop_words])
        
        # Filter out very common and very rare words
        total_words = len(words)
        keywords = []
        
        for word, freq in word_freq.most_common(20):
            # Skip if too rare (< 0.1%) or too common (> 5%)
            percentage = freq / total_words
            if 0.001 <= percentage <= 0.05 and len(word) > 3:
                keywords.append((word, freq))
        
        return keywords
    
    def _generate_summary(self, sentences: List[str], max_sentences: int = 3) -> str:
        """Generate extractive summary using sentence scoring"""
        if not sentences:
            return ""
        
        if len(sentences) <= max_sentences:
            return " ".join(sentences)
        
        # Score sentences based on length and position
        sentence_scores = []
        for i, sentence in enumerate(sentences):
            score = 0
            
            # Position score (earlier sentences are more important)
            position_score = 1.0 - (i / len(sentences))
            score += position_score * 0.3
            
            # Length score (medium length sentences preferred)
            word_count = len(sentence.split())
            if 10 <= word_count <= 30:
                score += 0.4
            elif 5 <= word_count < 10 or 30 < word_count <= 50:
                score += 0.2
            
            # Keyword score - sentences with important words
            important_words = ['important', 'significant', 'key', 'main', 'primary', 'conclusion', 'result']
            if any(word in sentence.lower() for word in important_words):
                score += 0.3
            
            sentence_scores.append((sentence, score))
        
        # Select top sentences
        top_sentences = sorted(sentence_scores, key=lambda x: x[1], reverse=True)[:max_sentences]
        
        # Return in original order
        selected_indices = []
        for sentence, _ in top_sentences:
            try:
                idx = sentences.index(sentence)
                selected_indices.append(idx)
            except ValueError:
                continue
        
        selected_indices.sort()
        summary_sentences = [sentences[i] for i in selected_indices]
        
        return " ".join(summary_sentences)
    
    def _analyze_structure(self, text: str) -> Dict:
        """Analyze document structure"""
        structure = {
            'has_title': False,
            'has_headings': False,
            'has_lists': False,
            'has_tables': False,
            'sections': 0,
            'paragraphs': len(text.split('\n\n'))
        }
        
        # Check for title (first line in caps or with specific patterns)
        lines = text.split('\n')
        if lines:
            first_line = lines[0].strip()
            if (first_line.isupper() or 
                any(char in first_line for char in [':', '=', '-']) and len(first_line) < 100):
                structure['has_title'] = True
        
        # Check for headings (lines with specific patterns)
        heading_patterns = [
            r'^[A-Z][A-Za-z\s]+:$',  # "Section Name:"
            r'^#+\s',                 # Markdown headers
            r'^[IVX]+\.',            # Roman numerals
            r'^\d+\.',               # Numbered sections
        ]
        
        for line in lines:
            line = line.strip()
            if any(re.match(pattern, line) for pattern in heading_patterns):
                structure['has_headings'] = True
                structure['sections'] += 1
        
        # Check for lists
        if re.search(r'^\s*[-*•]\s', text, re.MULTILINE):
            structure['has_lists'] = True
        
        # Check for tables (simple heuristic)
        if '|' in text or '\t' in text:
            structure['has_tables'] = True
        
        return structure
    
    def _analyze_sentiment(self, text: str) -> Dict:
        """Basic sentiment analysis using word lists"""
        positive_words = {
            'good', 'great', 'excellent', 'amazing', 'wonderful', 'fantastic', 'outstanding',
            'positive', 'success', 'successful', 'achieve', 'achievement', 'improve', 'improvement',
            'benefit', 'advantage', 'effective', 'efficient', 'valuable', 'important', 'significant'
        }
        
        negative_words = {
            'bad', 'terrible', 'awful', 'horrible', 'poor', 'negative', 'fail', 'failure',
            'problem', 'issue', 'challenge', 'difficult', 'hard', 'impossible', 'wrong',
            'error', 'mistake', 'concern', 'risk', 'threat', 'danger', 'loss', 'decline'
        }
        
        words = re.findall(r'\b[a-zA-Z]+\b', text.lower())
        
        positive_count = sum(1 for word in words if word in positive_words)
        negative_count = sum(1 for word in words if word in negative_words)
        total_sentiment_words = positive_count + negative_count
        
        if total_sentiment_words == 0:
            sentiment = "neutral"
            score = 0.0
        else:
            score = (positive_count - negative_count) / total_sentiment_words
            if score > 0.1:
                sentiment = "positive"
            elif score < -0.1:
                sentiment = "negative"
            else:
                sentiment = "neutral"
        
        return {
            'sentiment': sentiment,
            'score': score,
            'positive_words': positive_count,
            'negative_words': negative_count
        }
    
    def _analyze_readability(self, sentences: List[str], words: List[str]) -> Dict:
        """Calculate readability metrics"""
        if not sentences or not words:
            return {'flesch_score': 0, 'reading_level': 'unknown'}
        
        avg_sentence_length = len(words) / len(sentences)
        
        # Count syllables (rough approximation)
        def count_syllables(word):
            vowels = 'aeiouy'
            syllables = 0
            prev_char_vowel = False
            
            for char in word.lower():
                if char in vowels:
                    if not prev_char_vowel:
                        syllables += 1
                    prev_char_vowel = True
                else:
                    prev_char_vowel = False
            
            return max(1, syllables)  # Every word has at least 1 syllable
        
        total_syllables = sum(count_syllables(word) for word in words)
        avg_syllables_per_word = total_syllables / len(words)
        
        # Flesch Reading Ease Score
        flesch_score = 206.835 - (1.015 * avg_sentence_length) - (84.6 * avg_syllables_per_word)
        
        # Determine reading level
        if flesch_score >= 90:
            reading_level = "very_easy"
        elif flesch_score >= 80:
            reading_level = "easy"
        elif flesch_score >= 70:
            reading_level = "fairly_easy"
        elif flesch_score >= 60:
            reading_level = "standard"
        elif flesch_score >= 50:
            reading_level = "fairly_difficult"
        elif flesch_score >= 30:
            reading_level = "difficult"
        else:
            reading_level = "very_difficult"
        
        return {
            'flesch_score': round(flesch_score, 1),
            'reading_level': reading_level,
            'avg_sentence_length': round(avg_sentence_length, 1),
            'avg_syllables_per_word': round(avg_syllables_per_word, 2)
        }
    
    def _analyze_language_features(self, text: str, words: List[str]) -> Dict:
        """Analyze various language features"""
        features = {
            'lexical_diversity': 0,
            'formality_score': 0,
            'complexity_indicators': []
        }
        
        # Lexical diversity (unique words / total words)
        if words:
            unique_words = set(words)
            features['lexical_diversity'] = round(len(unique_words) / len(words), 3)
        
        # Formality indicators
        formal_indicators = ['therefore', 'however', 'furthermore', 'moreover', 'consequently', 
                           'nevertheless', 'accordingly', 'thus', 'hence', 'whereas']
        informal_indicators = ['really', 'pretty', 'quite', 'very', 'totally', 'basically', 
                             'actually', 'literally', 'obviously', 'definitely']
        
        formal_count = sum(1 for word in words if word in formal_indicators)
        informal_count = sum(1 for word in words if word in informal_indicators)
        
        if formal_count + informal_count > 0:
            features['formality_score'] = formal_count / (formal_count + informal_count)
        
        # Complexity indicators
        if re.search(r'\b(?:complex|complicated|sophisticated|intricate)\b', text, re.IGNORECASE):
            features['complexity_indicators'].append('complex_vocabulary')
        
        if re.search(r'[;:]', text):
            features['complexity_indicators'].append('complex_punctuation')
        
        # Average word length
        if words:
            avg_word_length = sum(len(word) for word in words) / len(words)
            if avg_word_length > 5:
                features['complexity_indicators'].append('long_words')
        
        return features
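
The readability calculation above can be tried in isolation. The sketch below restates the Flesch Reading Ease formula and the same score bands as standalone functions; the function names and the sample averages are hypothetical and are not part of the notebook's analyzer.

```python
def flesch_reading_ease(avg_sentence_length: float, avg_syllables_per_word: float) -> float:
    """Flesch Reading Ease: higher scores mean easier text."""
    return 206.835 - 1.015 * avg_sentence_length - 84.6 * avg_syllables_per_word


# (minimum score, label) pairs, checked from easiest to hardest
READING_BANDS = [
    (90, "very_easy"),
    (80, "easy"),
    (70, "fairly_easy"),
    (60, "standard"),
    (50, "fairly_difficult"),
    (30, "difficult"),
]


def reading_level(score: float) -> str:
    """Map a Flesch score to the bands used in the cell above."""
    for minimum, label in READING_BANDS:
        if score >= minimum:
            return label
    return "very_difficult"


# Hypothetical sample: 15 words per sentence, 1.5 syllables per word
score = flesch_reading_ease(15, 1.5)
print(round(score, 1), reading_level(score))
```

Because the syllable term carries the larger coefficient, small changes in average syllables per word move the score much more than changes in sentence length.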


### Metadata Generator
 
- Generates structured metadata from document processing and NLP analysis results

In [8]:

@dataclass
class DocumentMetadata:
    """Structured metadata for a document"""
    # Basic document information
    document_id: str
    filename: str
    file_size: int
    file_type: str
    processing_date: str
    
    # Content analysis
    title: Optional[str]
    summary: str
    language: str
    
    # Text statistics
    word_count: int
    sentence_count: int
    paragraph_count: int
    reading_time_minutes: int
    
    # Topics and keywords
    primary_topics: List[str]
    keywords: List[str]
    entities: Dict[str, List[str]]
    
    # Document characteristics
    document_type: str
    formality_level: str
    complexity_level: str
    sentiment: str
    
    # Technical metadata
    readability_score: float
    readability_level: str
    structure_score: float
    
    # Additional fields
    tags: List[str]
    categories: List[str]
    confidence_score: float

class MetadataGenerator:
    """
    Generates structured metadata from document processing and NLP analysis results
    """
    
    def __init__(self):
        """Initialize the metadata generator"""
        logger.info("Initializing Metadata Generator...")
        
        # Document type classification patterns
        self.document_type_patterns = {
            'research_paper': ['abstract', 'methodology', 'conclusion', 'references', 'hypothesis'],
            'business_report': ['executive summary', 'revenue', 'quarterly', 'analysis', 'recommendations'],
            'technical_document': ['system', 'architecture', 'implementation', 'configuration', 'api'],
            'legal_document': ['agreement', 'contract', 'terms', 'conditions', 'liability'],
            'manual': ['instructions', 'steps', 'procedure', 'guide', 'how to'],
            'presentation': ['slide', 'agenda', 'overview', 'introduction', 'thank you'],
            'article': ['article', 'author', 'published', 'journal', 'volume'],
            'proposal': ['proposal', 'budget', 'timeline', 'objectives', 'deliverables']
        }
        
        # Category mapping based on topics
        self.category_mapping = {
            'technology': ['Information Technology', 'Software Development', 'Digital Innovation'],
            'business': ['Business Strategy', 'Finance', 'Marketing', 'Management'],
            'research': ['Academic Research', 'Scientific Study', 'Data Analysis'],
            'medical': ['Healthcare', 'Medical Research', 'Clinical Studies'],
            'legal': ['Legal Documents', 'Contracts', 'Compliance'],
            'education': ['Educational Content', 'Training Materials', 'Academic'],
            'finance': ['Financial Analysis', 'Investment', 'Banking']
        }
        
        logger.info("✅ Metadata Generator initialized!")
    
    def generate_metadata(self, 
                         document_result: Dict, 
                         nlp_result: Dict,
                         custom_tags: Optional[List[str]] = None) -> DocumentMetadata:
        """
        Generate comprehensive metadata from processing results
        
        Args:
            document_result: Output from DocumentProcessor
            nlp_result: Output from NLPAnalyzer
            custom_tags: Optional custom tags to add
            
        Returns:
            DocumentMetadata object with all extracted information
        """
        logger.info(f"Generating metadata for: {document_result.get('file_metadata', {}).get('filename', 'unknown')}")
        
        # Extract basic information
        file_meta = document_result.get('file_metadata', {})
        text_stats = document_result.get('text_statistics', {})
        
        # Generate unique document ID
        doc_id = str(uuid.uuid4())[:8]
        
        # Extract title (heuristic approach)
        title = self._extract_title(document_result.get('processed_text', ''))
        
        # Determine document type
        doc_type = self._classify_document_type(
            document_result.get('processed_text', ''), 
            nlp_result
        )
        
        # Extract and process topics
        topics = self._process_topics(nlp_result.get('topics', []))
        
        # Extract keywords
        keywords = self._process_keywords(nlp_result.get('keywords', []))
        
        # Process entities
        entities = self._process_entities(nlp_result.get('entities', {}))
        
        # Determine categories
        categories = self._determine_categories(topics, entities, keywords)
        
        # Generate tags
        tags = self._generate_tags(topics, keywords, doc_type, custom_tags or [])
        
        # Calculate scores
        confidence_score = self._calculate_confidence_score(nlp_result)
        structure_score = self._calculate_structure_score(nlp_result.get('document_structure', {}))
        
        # Determine complexity and formality
        complexity_level = self._determine_complexity_level(nlp_result)
        formality_level = self._determine_formality_level(nlp_result)
        
        # Create metadata object
        metadata = DocumentMetadata(
            # Basic information
            document_id=doc_id,
            filename=file_meta.get('filename', 'unknown'),
            file_size=file_meta.get('file_size', 0),
            file_type=file_meta.get('file_extension', '').upper().replace('.', ''),
            processing_date=datetime.now().isoformat(),
            
            # Content
            title=title,
            summary=nlp_result.get('summary', ''),
            language=text_stats.get('detected_language', 'unknown'),
            
            # Statistics
            word_count=text_stats.get('word_count', 0),
            sentence_count=text_stats.get('sentence_count', 0),
            paragraph_count=text_stats.get('paragraph_count', 0),
            reading_time_minutes=text_stats.get('estimated_reading_time_minutes', 0),
            
            # Analysis results
            primary_topics=topics[:5],  # Top 5 topics
            keywords=keywords[:10],     # Top 10 keywords
            entities=entities,
            
            # Classification
            document_type=doc_type,
            formality_level=formality_level,
            complexity_level=complexity_level,
            sentiment=nlp_result.get('sentiment', {}).get('sentiment', 'neutral'),
            
            # Technical scores
            readability_score=nlp_result.get('readability', {}).get('flesch_score', 0),
            readability_level=nlp_result.get('readability', {}).get('reading_level', 'unknown'),
            structure_score=structure_score,
            
            # Organization
            tags=tags,
            categories=categories,
            confidence_score=confidence_score
        )
        
        logger.info("✅ Metadata generation completed")
        return metadata
    
    def _extract_title(self, text: str) -> Optional[str]:
        """Extract document title using heuristics"""
        if not text:
            return None
        
        lines = text.split('\n')
        for line in lines[:5]:  # Check first 5 lines
            line = line.strip()
            if line and len(line) > 5 and len(line) < 100:
                # Check if it looks like a title
                if (line.isupper() or 
                    line.count(' ') < 10 or 
                    any(char in line for char in [':', '=', '-']) or
                    not line.endswith('.')):
                    return line
        
        # Fallback: use first sentence if it's short enough
        sentences = text.split('.')
        if sentences and len(sentences[0]) < 100:
            return sentences[0].strip()
        
        return None
    
    def _classify_document_type(self, text: str, nlp_result: Dict) -> str:
        """Classify document type based on content patterns"""
        text_lower = text.lower()
        
        # Score each document type
        type_scores = {}
        for doc_type, patterns in self.document_type_patterns.items():
            score = sum(1 for pattern in patterns if pattern in text_lower)
            if score > 0:
                type_scores[doc_type] = score
        
        # Also consider structure
        structure = nlp_result.get('document_structure', {})
        if structure.get('has_title') and structure.get('has_headings'):
            type_scores['report'] = type_scores.get('report', 0) + 2
        
        # Return highest scoring type or 'document' as default
        if type_scores:
            return max(type_scores.items(), key=lambda x: x[1])[0].replace('_', ' ').title()
        
        return 'Document'
    
    def _process_topics(self, topics: List) -> List[str]:
        """Process and clean topic information"""
        processed_topics = []
        for topic in topics:
            if hasattr(topic, 'topic'):
                topic_name = topic.topic
            elif isinstance(topic, dict):
                topic_name = topic.get('topic', '')
            else:
                topic_name = str(topic)
            
            if topic_name and topic_name.lower() != 'general':
                processed_topics.append(topic_name.title())
        
        return processed_topics
    
    def _process_keywords(self, keywords: List) -> List[str]:
        """Process and clean keyword information"""
        processed_keywords = []
        for keyword in keywords:
            if isinstance(keyword, tuple):
                word = keyword[0]
            elif isinstance(keyword, dict):
                word = keyword.get('word', '')
            else:
                word = str(keyword)
            
            if word and len(word) > 3:
                processed_keywords.append(word.lower())
        
        return processed_keywords
    
    def _process_entities(self, entities: Dict) -> Dict[str, List[str]]:
        """Process and clean entity information"""
        processed_entities = {}
        
        for entity_type, entity_list in entities.items():
            clean_entities = []
            for entity in entity_list:
                if hasattr(entity, 'text'):
                    entity_text = entity.text
                elif isinstance(entity, dict):
                    entity_text = entity.get('text', '')
                else:
                    entity_text = str(entity)
                
                if entity_text:
                    clean_entities.append(entity_text)
            
            if clean_entities:
                processed_entities[entity_type.replace('_', ' ').title()] = clean_entities[:5]  # Limit to 5 per type
        
        return processed_entities
    
    def _determine_categories(self, topics: List[str], entities: Dict, keywords: List[str]) -> List[str]:
        """Determine document categories based on content analysis"""
        categories = set()
        
        # Category from topics
        for topic in topics:
            topic_lower = topic.lower()
            for domain, cats in self.category_mapping.items():
                if domain in topic_lower:
                    categories.update(cats[:2])  # Add first 2 categories
        
        # Category from entities
        if 'Email' in entities or 'Phone' in entities:
            categories.add('Contact Information')
        if 'Organization' in entities:
            categories.add('Organizational')
        if 'Date' in entities:
            categories.add('Time-Sensitive')
        
        # Category from keywords
        keyword_str = ' '.join(keywords).lower()
        if any(word in keyword_str for word in ['report', 'analysis', 'study']):
            categories.add('Analytical')
        if any(word in keyword_str for word in ['policy', 'procedure', 'guideline']):
            categories.add('Procedural')
        
        return sorted(categories)[:5]  # Sort for deterministic output, then limit to 5 categories
    
    def _generate_tags(self, topics: List[str], keywords: List[str], doc_type: str, custom_tags: List[str]) -> List[str]:
        """Generate relevant tags for the document"""
        tags = set()
        
        # Add topic-based tags
        tags.update([topic.lower().replace(' ', '_') for topic in topics])
        
        # Add document type tag
        tags.add(doc_type.lower().replace(' ', '_'))
        
        # Add high-frequency keyword tags
        tags.update(keywords[:5])
        
        # Add custom tags
        tags.update([tag.lower().replace(' ', '_') for tag in custom_tags])
        
        # Clean and return
        clean_tags = [tag for tag in tags if len(tag) > 2 and len(tag) < 20]
        return sorted(clean_tags)[:10]  # Limit to 10 tags
    
    def _calculate_confidence_score(self, nlp_result: Dict) -> float:
        """Calculate overall confidence score for the metadata"""
        scores = []
        
        # Topic confidence
        topics = nlp_result.get('topics', [])
        if topics:
            topic_confidences = []
            for topic in topics:
                if hasattr(topic, 'confidence'):
                    topic_confidences.append(topic.confidence)
                elif isinstance(topic, dict):
                    topic_confidences.append(topic.get('confidence', 0.5))
            if topic_confidences:
                scores.append(sum(topic_confidences) / len(topic_confidences))
        
        # Entity confidence (pattern-based entities have high confidence)
        entities = nlp_result.get('entities', {})
        if entities:
            scores.append(0.8)  # High confidence for pattern-based extraction
        
        # Readability confidence
        readability = nlp_result.get('readability', {})
        if readability.get('flesch_score', 0) > 0:
            scores.append(0.7)
        
        # Structure confidence
        structure = nlp_result.get('document_structure', {})
        structure_indicators = sum([
            structure.get('has_title', False),
            structure.get('has_headings', False),
            structure.get('sections', 0) > 0
        ])
        scores.append(structure_indicators / 3)
        
        return round(sum(scores) / len(scores) if scores else 0.5, 2)
    
    def _calculate_structure_score(self, structure: Dict) -> float:
        """Calculate document structure score"""
        score = 0
        
        # Points for structural elements
        if structure.get('has_title'):
            score += 0.2
        if structure.get('has_headings'):
            score += 0.3
        if structure.get('has_lists'):
            score += 0.2
        if structure.get('sections', 0) > 0:
            score += 0.2
        if structure.get('paragraphs', 0) > 1:
            score += 0.1
        
        return round(min(score, 1.0), 2)
    
    def _determine_complexity_level(self, nlp_result: Dict) -> str:
        """Determine document complexity level"""
        readability = nlp_result.get('readability', {})
        language_features = nlp_result.get('language_features', {})
        
        flesch_score = readability.get('flesch_score', 50)
        lexical_diversity = language_features.get('lexical_diversity', 0.5)
        complexity_indicators = len(language_features.get('complexity_indicators', []))
        
        # Calculate complexity score
        complexity_score = 0
        
        if flesch_score < 30:
            complexity_score += 3
        elif flesch_score < 50:
            complexity_score += 2
        elif flesch_score < 70:
            complexity_score += 1
        
        if lexical_diversity > 0.7:
            complexity_score += 2
        elif lexical_diversity > 0.5:
            complexity_score += 1
        
        complexity_score += complexity_indicators
        
        if complexity_score >= 5:
            return 'High'
        elif complexity_score >= 3:
            return 'Medium'
        else:
            return 'Low'
    
    def _determine_formality_level(self, nlp_result: Dict) -> str:
        """Determine document formality level"""
        language_features = nlp_result.get('language_features', {})
        formality_score = language_features.get('formality_score', 0.5)
        
        if formality_score > 0.7:
            return 'Formal'
        elif formality_score > 0.3:
            return 'Semi-formal'
        else:
            return 'Informal'
    
    def export_metadata(self, metadata: DocumentMetadata, format: str = 'json') -> str:
        """
        Export metadata in various formats
        
        Args:
            metadata: DocumentMetadata object
            format: Export format ('json', 'xml', 'yaml')
            
        Returns:
            Formatted metadata string
        """
        metadata_dict = asdict(metadata)
        
        if format.lower() == 'json':
            return json.dumps(metadata_dict, indent=2, ensure_ascii=False)
        
        elif format.lower() == 'xml':
            return self._dict_to_xml(metadata_dict, 'document_metadata')
        
        elif format.lower() == 'yaml':
            try:
                import yaml
                return yaml.dump(metadata_dict, default_flow_style=False)
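            except ImportError:
                logger.warning("PyYAML not installed, falling back to JSON")
                return json.dumps(metadata_dict, indent=2, ensure_ascii=False)
        
        else:
            logger.warning(f"Unsupported format '{format}', falling back to JSON")
            return json.dumps(metadata_dict, indent=2, ensure_ascii=False)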
    
    def _dict_to_xml(self, data: Dict, root_name: str) -> str:
        """Convert dictionary to XML format, escaping special characters"""
        from xml.sax.saxutils import escape  # Escapes &, < and > in text nodes
        
        def dict_to_xml_recursive(d, root, indent=''):
            child_indent = indent + '  '
            xml_str = f"{indent}<{root}>\n"
            for key, value in d.items():
                tag = str(key).replace(' ', '_')  # Element names cannot contain spaces
                if isinstance(value, dict):
                    xml_str += dict_to_xml_recursive(value, tag, child_indent) + "\n"
                elif isinstance(value, list):
                    xml_str += f"{child_indent}<{tag}>\n"
                    for item in value:
                        xml_str += f"{child_indent}  <item>{escape(str(item))}</item>\n"
                    xml_str += f"{child_indent}</{tag}>\n"
                else:
                    xml_str += f"{child_indent}<{tag}>{escape(str(value))}</{tag}>\n"
            xml_str += f"{indent}</{root}>"
            return xml_str
        
        return dict_to_xml_recursive(data, root_name)
    
    def save_metadata(self, metadata: DocumentMetadata, output_path: str, format: str = 'json'):
        """Save metadata to file"""
        formatted_data = self.export_metadata(metadata, format)
        
        with open(output_path, 'w', encoding='utf-8') as f:
            f.write(formatted_data)
        
        logger.info(f"Metadata saved to: {output_path}")
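
The keyword scoring used by `_classify_document_type` can be explored on its own. The sketch below is a minimal standalone version, assuming a shortened, hypothetical pattern table (`PATTERNS`) and a hypothetical function name (`classify`); it leaves out the structure-based bonus the class also applies.

```python
from typing import Dict, List

# Hypothetical, shortened pattern table (the notebook's table has eight types)
PATTERNS: Dict[str, List[str]] = {
    "research_paper": ["abstract", "methodology", "references"],
    "legal_document": ["agreement", "contract", "liability"],
}


def classify(text: str, patterns: Dict[str, List[str]] = PATTERNS) -> str:
    """Return the type whose patterns occur most often as substrings."""
    text_lower = text.lower()
    scores = {
        doc_type: sum(1 for p in pats if p in text_lower)
        for doc_type, pats in patterns.items()
    }
    best_type, best_score = max(scores.items(), key=lambda kv: kv[1])
    if best_score == 0:
        return "Document"  # No pattern matched
    return best_type.replace("_", " ").title()


print(classify("Abstract. Our methodology follows prior work. References: ..."))
print(classify("Grocery list: eggs, milk"))
```

Matching is by substring, so a short pattern such as `api` would also match inside a longer word like `capital`; word-boundary matching is one way to tighten this if false positives turn out to matter.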


### Initialize components

In [16]:
import tempfile
import shutil
import warnings
from datetime import datetime
from pathlib import Path
from typing import List, Dict, Any, Optional

import ipywidgets as widgets
from IPython.display import display, HTML, JSON, clear_output

warnings.filterwarnings('ignore')


processor = DocumentProcessor()
analyzer = NLPAnalyzer()
generator = MetadataGenerator()
plt.style.use('default')
sns.set_palette("husl")

print("✅ Initialized components: DocumentProcessor, NLPAnalyzer, MetadataGenerator")

2025-06-25 17:49:45,994 - INFO - OCR (Tesseract) is available
2025-06-25 17:49:45,996 - INFO - Initializing NLP Analyzer...
2025-06-25 17:49:45,998 - INFO - ✅ NLP Analyzer initialized!
2025-06-25 17:49:46,001 - INFO - Initializing Metadata Generator...
2025-06-25 17:49:46,002 - INFO - ✅ Metadata Generator initialized!


✅ Initialized components: DocumentProcessor, NLPAnalyzer, MetadataGenerator


In [28]:
# Configuration and Settings Class
class ProcessingConfig:
    def __init__(self):
        self.export_format = "JSON"
        self.include_confidence = True
        self.batch_mode = True
        self.confidence_threshold = 0.5
        self.max_keywords = 20
        self.enable_ocr = True
        self.enable_entity_linking = False
        self.processing_options = ["Deep Topic Analysis"]
        self.output_language = "Auto-detect"
        
    def display_config(self):
        print("🔧 Current Configuration:")
        print(f"  Export Format: {self.export_format}")
        print(f"  Include Confidence: {self.include_confidence}")
        print(f"  Batch Mode: {self.batch_mode}")
        print(f"  Confidence Threshold: {self.confidence_threshold}")
        print(f"  Max Keywords: {self.max_keywords}")
        print(f"  Enable OCR: {self.enable_ocr}")
        print(f"  Processing Options: {', '.join(self.processing_options)}")

# Initialize configuration
config = ProcessingConfig()
config.display_config()

🔧 Current Configuration:
  Export Format: JSON
  Include Confidence: True
  Batch Mode: True
  Confidence Threshold: 0.5
  Max Keywords: 20
  Enable OCR: True
  Processing Options: Deep Topic Analysis


In [29]:
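# File Processing Functions
def process_single_file(file_path: str, custom_tags: Optional[List[str]] = None, 
                       processing_options: Optional[List[str]] = None) -> Optional[DocumentMetadata]:
    """Process a single file and return its metadata, or None if processing fails.
    
    Raises FileNotFoundError if the file does not exist.
    Note: processing_options is currently unused.
    """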
    file_path = Path(file_path)
    
    if not file_path.exists():
        raise FileNotFoundError(f"File not found: {file_path}")
    
    print(f"📄 Processing: {file_path.name}")
    
    try:
        # Process the document
        doc_result = processor.process_document(str(file_path))
        print(f"  ✓ Document processed")
        
        # Analyze with NLP
        nlp_result = analyzer.analyze_document(doc_result['processed_text'], file_path.name)
        print(f"  ✓ NLP analysis completed")
        
        # Generate metadata
        metadata = generator.generate_metadata(doc_result, nlp_result, custom_tags)
        print(f"  ✓ Metadata generated")
        
        return metadata
        
    except Exception as e:
        print(f"  ❌ Error processing {file_path.name}: {str(e)}")
        return None

def process_multiple_files(file_paths: List[str], custom_tags: Optional[List[str]] = None) -> List[Any]:
    """Process multiple files and return list of metadata"""
    results = []
    total_files = len(file_paths)
    
    print(f"🚀 Starting batch processing of {total_files} files...")
    print("=" * 50)
    
    for i, file_path in enumerate(file_paths, 1):
        print(f"\n[{i}/{total_files}] ", end="")
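        try:
            metadata = process_single_file(file_path, custom_tags)
        except FileNotFoundError as e:
            # A missing file should not abort the rest of the batch
            print(f"❌ {e}")
            metadata = None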
        
        if metadata:
            results.append(metadata)
        
        # Progress indicator
        progress = (i / total_files) * 100
        print(f"Progress: {progress:.1f}%")
    
    print(f"\n✅ Batch processing complete! Successfully processed {len(results)}/{total_files} files")
    return results
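
The batch loop above follows a common pattern: process each item independently and keep going when one fails. The sketch below isolates that pattern; `run_batch` and `fake_process` are hypothetical names, with `fake_process` standing in for `process_single_file`.

```python
from typing import Callable, List, Optional, Tuple


def run_batch(paths: List[str],
              process_one: Callable[[str], Optional[dict]]) -> Tuple[List[dict], List[str]]:
    """Apply process_one to every path; collect results and failed paths."""
    results, failed = [], []
    for path in paths:
        try:
            item = process_one(path)
        except Exception as exc:  # One bad file must not stop the batch
            print(f"skipped {path}: {exc}")
            item = None
        if item is None:
            failed.append(path)
        else:
            results.append(item)
    return results, failed


# Hypothetical stand-in for process_single_file
def fake_process(path: str) -> Optional[dict]:
    if path.endswith(".bad"):
        raise ValueError("unsupported format")
    return {"filename": path}


ok, bad = run_batch(["a.txt", "b.bad", "c.pdf"], fake_process)
print(len(ok), bad)
```

Returning the failed paths alongside the results makes it easy to retry or report them afterwards, which a bare success count does not allow.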

In [30]:
# Analytics and Visualization Functions
def create_analytics_dashboard(metadata_list: List[Any]):
    """Create analytics dashboard for processed documents"""
    if not metadata_list:
        print("No documents to analyze")
        return
    
    print("📊 Document Analytics Dashboard")
    print("=" * 40)
    
    # Basic metrics
    total_docs = len(metadata_list)
    avg_words = sum(m.word_count for m in metadata_list) / len(metadata_list)
    avg_confidence = sum(m.confidence_score for m in metadata_list) / len(metadata_list)
    total_reading_time = sum(m.reading_time_minutes for m in metadata_list)
    
    print(f"📈 Key Metrics:")
    print(f"  Total Documents: {total_docs}")
    print(f"  Average Word Count: {int(avg_words):,}")
    print(f"  Average Confidence: {avg_confidence:.2f}")
    print(f"  Total Reading Time: {total_reading_time} minutes")
    
    # Create visualizations
    fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 10))
    
    # Document types distribution
    doc_types = [m.document_type for m in metadata_list]
    type_counts = pd.Series(doc_types).value_counts()
    ax1.pie(type_counts.values, labels=type_counts.index, autopct='%1.1f%%')
    ax1.set_title('Document Types Distribution')
    
    # Sentiment distribution
    sentiments = [m.sentiment for m in metadata_list]
    sentiment_counts = pd.Series(sentiments).value_counts()
    ax2.bar(sentiment_counts.index, sentiment_counts.values, color='skyblue')
    ax2.set_title('Sentiment Distribution')
    ax2.set_xlabel('Sentiment')
    ax2.set_ylabel('Count')
    
    # Word count distribution
    word_counts = [m.word_count for m in metadata_list]
    ax3.hist(word_counts, bins=10, color='lightgreen', alpha=0.7)
    ax3.set_title('Word Count Distribution')
    ax3.set_xlabel('Word Count')
    ax3.set_ylabel('Frequency')
    
    # Confidence scores
    confidence_scores = [m.confidence_score for m in metadata_list]
    ax4.boxplot(confidence_scores)
    ax4.set_title('Confidence Score Distribution')
    ax4.set_ylabel('Confidence Score')
    
    plt.tight_layout()
    plt.show()

def display_metadata_summary(metadata_list: List[Any]):
    """Display summary table of all metadata"""
    if not metadata_list:
        print("No documents to display")
        return
    
    # Create summary DataFrame
    summary_data = []
    for metadata in metadata_list:
        summary_data.append({
            'Filename': metadata.filename,
            'Title': metadata.title,
            'Document Type': metadata.document_type,
            'Word Count': metadata.word_count,
            'Confidence': f"{metadata.confidence_score:.2f}",
            'Sentiment': metadata.sentiment,
            'Top Topics': ', '.join(metadata.primary_topics[:3]),
            'Reading Time (min)': metadata.reading_time_minutes
        })
    
    df = pd.DataFrame(summary_data)
    print("📋 Document Summary Table:")
    print("=" * 80)
    display(df)
    
    return df
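
The key metrics printed by the dashboard do not depend on plotting. As a rough sketch, the same aggregates can be computed with the standard library alone; the `docs` records below are hypothetical stand-ins for `DocumentMetadata` objects.

```python
from collections import Counter
from statistics import mean

# Hypothetical records standing in for DocumentMetadata objects
docs = [
    {"document_type": "Report", "word_count": 1200, "confidence_score": 0.72},
    {"document_type": "Report", "word_count": 800, "confidence_score": 0.64},
    {"document_type": "Manual", "word_count": 400, "confidence_score": 0.80},
]

# Count documents per type and average the numeric fields
type_counts = Counter(d["document_type"] for d in docs)
avg_words = mean(d["word_count"] for d in docs)
avg_confidence = mean(d["confidence_score"] for d in docs)

print(type_counts.most_common())
print(f"{int(avg_words):,} words on average, confidence {avg_confidence:.2f}")
```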


In [31]:
# Export Functions
def export_metadata(metadata_list: List[Any], export_format: str = "JSON", 
                   output_dir: str = "exports") -> str:
    """Export metadata to various formats"""
    
    # Create output directory
    output_path = Path(output_dir)
    output_path.mkdir(parents=True, exist_ok=True)  # Also create missing parent directories
    
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    
    if export_format.upper() == "JSON":
        filename = f"metadata_export_{timestamp}.json"
        filepath = output_path / filename
        
        all_metadata = []
        for metadata in metadata_list:
            json_data = json.loads(generator.export_metadata(metadata, 'json'))
            all_metadata.append(json_data)
        
        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(all_metadata, f, indent=2, ensure_ascii=False)
    
    elif export_format.upper() == "CSV":
        filename = f"metadata_export_{timestamp}.csv"
        filepath = output_path / filename
        
        csv_data = []
        for metadata in metadata_list:
            csv_data.append({
                'filename': metadata.filename,
                'title': metadata.title,
                'summary': metadata.summary,
                'document_type': metadata.document_type,
                'word_count': metadata.word_count,
                'confidence_score': metadata.confidence_score,
                'sentiment': metadata.sentiment,
                'topics': '|'.join(metadata.primary_topics),
                'keywords': '|'.join(metadata.keywords),
                'reading_time_minutes': metadata.reading_time_minutes
            })
        
        df = pd.DataFrame(csv_data)
        df.to_csv(filepath, index=False, encoding='utf-8')
    
    elif export_format.upper() == "XML":
        filename = f"metadata_export_{timestamp}.xml"
        filepath = output_path / filename
        
        with open(filepath, 'w', encoding='utf-8') as f:
            f.write('<?xml version="1.0" encoding="UTF-8"?>\n<documents>\n')
            for metadata in metadata_list:
                xml_data = generator.export_metadata(metadata, 'xml')
                f.write(xml_data + '\n')
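            f.write('</documents>')
    
    else:
        # Without this branch, filepath would be undefined below
        raise ValueError(f"Unsupported export format: {export_format!r} (expected JSON, CSV, or XML)")
    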
    print(f"✅ Exported {len(metadata_list)} documents to: {filepath}")
    return str(filepath)
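
The XML export above assembles markup with string formatting. As an alternative sketch (not the notebook's method), the standard library's `xml.etree.ElementTree` can build the same kind of document and escapes special characters in text automatically. `MiniMetadata` and `to_xml` are hypothetical names used only for illustration.

```python
import xml.etree.ElementTree as ET
from dataclasses import asdict, dataclass, field
from typing import List


@dataclass
class MiniMetadata:  # Hypothetical, much smaller than DocumentMetadata
    filename: str
    title: str
    keywords: List[str] = field(default_factory=list)


def to_xml(meta: MiniMetadata) -> str:
    """Serialize a dataclass to XML; ElementTree escapes &, < and > in text."""
    root = ET.Element("document_metadata")
    for key, value in asdict(meta).items():
        child = ET.SubElement(root, key)
        if isinstance(value, list):
            for item in value:
                ET.SubElement(child, "item").text = str(item)
        else:
            child.text = str(value)
    return ET.tostring(root, encoding="unicode")


xml_text = to_xml(MiniMetadata("q3.pdf", "R&D <draft>", ["budget", "r&d"]))
print(xml_text)
```

Parsing the result back with `ET.fromstring` is a quick way to check that an export is well-formed before it is written to disk.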


In [32]:
# Main CLI Interface Functions
def get_file_paths():
    """Interactive file selection"""
    print("📁 File Selection:")
    print("1. Enter single file path")
    print("2. Enter multiple file paths (comma-separated)")
    print("3. Enter directory path (process all supported files)")
    
    choice = input("\nChoose option (1-3): ").strip()
    
    if choice == "1":
        file_path = input("Enter file path: ").strip()
        if Path(file_path).exists():
            return [file_path]
        else:
            print(f"❌ File not found: {file_path}")
            return []
    
    elif choice == "2":
        paths_input = input("Enter file paths (comma-separated): ").strip()
        file_paths = [p.strip() for p in paths_input.split(',')]
        valid_paths = []
        for path in file_paths:
            if Path(path).exists():
                valid_paths.append(path)
            else:
                print(f"❌ File not found: {path}")
        return valid_paths
    
    elif choice == "3":
        dir_path = input("Enter directory path: ").strip()
        if Path(dir_path).is_dir():
            supported_extensions = ['.pdf', '.docx', '.doc', '.txt']
            file_paths = []
            for ext in supported_extensions:
                file_paths.extend(list(Path(dir_path).glob(f"*{ext}")))
            return [str(p) for p in file_paths]
        else:
            print(f"❌ Directory not found: {dir_path}")
            return []
    
    else:
        print("❌ Invalid choice")
        return []

def get_custom_tags():
    """Get custom tags from user input"""
    tags_input = input("Enter custom tags (comma-separated, or press Enter to skip): ").strip()
    if tags_input:
        return [tag.strip() for tag in tags_input.split(',')]
    return None

def display_metadata_details(metadata_list: List[Any]):
    """Display detailed metadata for each document"""
    for i, metadata in enumerate(metadata_list, 1):
        print(f"\n📄 Document {i}: {metadata.filename}")
        print("=" * 50)
        print(f"Title: {metadata.title}")
        print(f"Summary: {metadata.summary}")
        print(f"Document Type: {metadata.document_type}")
        print(f"Word Count: {metadata.word_count:,}")
        print(f"Confidence Score: {metadata.confidence_score:.2f}")
        print(f"Sentiment: {metadata.sentiment}")
        print(f"Reading Time: {metadata.reading_time_minutes} minutes")
        print(f"Primary Topics: {', '.join(metadata.primary_topics)}")
        print(f"Keywords: {', '.join(metadata.keywords[:10])}")
        
        if hasattr(metadata, 'entities') and metadata.entities:
            print("Entities:")
            for entity_type, entities in metadata.entities.items():
                if entities:
                    print(f"  - {entity_type}: {', '.join(entities[:5])}")
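
Directory selection in `get_file_paths` matches extensions with `glob`, which can be case-sensitive depending on the platform. A minimal sketch of a case-insensitive variant follows; `collect_files` and the `SUPPORTED` set are assumptions for illustration and should be adjusted to whatever the processor actually accepts.

```python
import tempfile
from pathlib import Path
from typing import Iterable, List

SUPPORTED = {".pdf", ".docx", ".txt"}  # Assumed set; adjust to the processor


def collect_files(directory: str, extensions: Iterable[str] = SUPPORTED) -> List[str]:
    """Return supported files in a directory, matching suffixes case-insensitively."""
    wanted = {ext.lower() for ext in extensions}
    return sorted(
        str(p) for p in Path(directory).iterdir()
        if p.is_file() and p.suffix.lower() in wanted
    )


# Demo in a throwaway directory with mixed-case extensions
with tempfile.TemporaryDirectory() as tmp:
    for name in ("a.PDF", "b.txt", "notes.md"):
        (Path(tmp) / name).write_text("x")
    names = [Path(p).name for p in collect_files(tmp)]

print(names)
```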

In [33]:
# Main Processing Workflow
def run_metadata_generation():
    """Main workflow for metadata generation"""
    print("🤖 Automated Metadata Generation System")
    print("=" * 60)
    
    # Step 1: Get file paths
    file_paths = get_file_paths()
    if not file_paths:
        print("❌ No valid files selected. Exiting.")
        return
    
    print(f"\n✅ Found {len(file_paths)} file(s) to process")
    
    # Step 2: Get custom tags
    custom_tags = get_custom_tags()
    if custom_tags:
        print(f"🏷️  Custom tags: {', '.join(custom_tags)}")
    
    # Step 3: Process files
    print("\n🚀 Starting processing...")
    if len(file_paths) == 1:
        metadata_list = [process_single_file(file_paths[0], custom_tags)]
        metadata_list = [m for m in metadata_list if m is not None]
    else:
        metadata_list = process_multiple_files(file_paths, custom_tags)
    
    if not metadata_list:
        print("❌ No documents were successfully processed.")
        return
    
    # Step 4: Display results
    print("\n📊 Processing Results:")
    display_metadata_summary(metadata_list)
    
    # Step 5: Show analytics
    create_analytics_dashboard(metadata_list)
    
    # Step 6: Display detailed metadata
    show_details = input("\n🔍 Show detailed metadata? (y/n): ").strip().lower()
    if show_details == 'y':
        display_metadata_details(metadata_list)
    
    # Step 7: Export options
    export_choice = input("\n💾 Export results? (y/n): ").strip().lower()
    if export_choice == 'y':
        print("Export formats: JSON, CSV, XML")
        export_format = input("Choose format (JSON/CSV/XML): ").strip()
        output_dir = input("Output directory (press Enter for 'exports'): ").strip() or "exports"
        
        try:
            export_path = export_metadata(metadata_list, export_format, output_dir)
            print(f"✅ Export completed: {export_path}")
        except Exception as e:
            print(f"❌ Export failed: {str(e)}")
    
    return metadata_list
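
The export step above passes the user's typed format straight to `export_metadata`, so input such as `json` or `.csv` depends on how that function treats case and stray characters. A minimal sketch of normalizing the choice first is shown below. The helper name `normalize_export_format` and the `SUPPORTED_FORMATS` tuple are hypothetical; the tuple simply mirrors the formats named in the prompt text and should be adjusted to whatever `export_metadata` actually accepts.

```python
from typing import Iterable

# Assumption: these are the formats named in the workflow's prompt text.
# Adjust to match what export_metadata really supports.
SUPPORTED_FORMATS = ("JSON", "CSV", "XML")


def normalize_export_format(
    raw: str,
    supported: Iterable[str] = SUPPORTED_FORMATS,
    default: str = "JSON",
) -> str:
    """Return an upper-case format name, or raise ValueError if unsupported."""
    # Trim whitespace, ignore case, and tolerate a leading dot (".csv")
    cleaned = raw.strip().upper().lstrip(".")
    if not cleaned:
        # Empty input (user pressed Enter) falls back to the default
        return default
    allowed = [fmt.upper() for fmt in supported]
    if cleaned not in allowed:
        raise ValueError(
            f"Unsupported export format: {raw!r} (choose from {', '.join(allowed)})"
        )
    return cleaned
```

In the workflow, the result of `normalize_export_format(input(...))` could be passed to `export_metadata` inside the existing `try` block, so an unsupported choice is reported through the same "Export failed" message.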

In [34]:
# Quick Processing Functions (for programmatic use)
def quick_process_file(file_path: str, custom_tags: List[str] = None) -> Any:
    """Quick processing of a single file - returns metadata object"""
    return process_single_file(file_path, custom_tags)

def quick_process_directory(directory_path: str, custom_tags: List[str] = None) -> List[Any]:
    """Quick processing of all supported files in a directory"""
    dir_path = Path(directory_path)
    if not dir_path.is_dir():
        raise ValueError(f"Directory not found: {directory_path}")
    
    supported_extensions = ['.pdf', '.docx', '.doc', '.txt']
    file_paths = []
    for ext in supported_extensions:
        file_paths.extend(list(dir_path.glob(f"*{ext}")))
    
    if not file_paths:
        print(f"No supported files found in: {directory_path}")
        return []
    
    return process_multiple_files([str(p) for p in file_paths], custom_tags)

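def quick_export(metadata_list: List[Any], format: str = "JSON", output_file: str = None) -> str:
    """Quick export; if output_file is given, the exported file is renamed to it"""
    if output_file:
        output_dir = str(Path(output_file).parent)
        filename = Path(output_file).name
    else:
        output_dir = "exports"
        filename = None
    
    export_path = export_metadata(metadata_list, format, output_dir)
    # Assumes export_metadata returns the path of the single file it wrote,
    # as the "Export completed" message in run_metadata_generation suggests.
    if filename and Path(export_path).is_file() and Path(export_path).name != filename:
        target = Path(output_dir) / filename
        Path(export_path).replace(target)
        export_path = str(target)
    return export_path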

print("✅ All functions loaded successfully!")
print("\n🎯 Ready to process documents!")
print("\nMain functions available:")
print("  - run_metadata_generation() : Interactive CLI workflow")
print("  - quick_process_file(path) : Process single file")
print("  - quick_process_directory(path) : Process directory")
print("  - quick_export(metadata_list) : Export results")


✅ All functions loaded successfully!

🎯 Ready to process documents!

Main functions available:
  - run_metadata_generation() : Interactive CLI workflow
  - quick_process_file(path) : Process single file
  - quick_process_directory(path) : Process directory
  - quick_export(metadata_list) : Export results
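
Both `get_file_paths` and `quick_process_directory` build their file list with the same `glob` loop. `Path.glob` follows the platform's case rules, so on Linux or macOS a pattern like `*.pdf` will usually skip a file named `REPORT.PDF`, and the result order is not guaranteed. One possible shared helper, sketched here under the assumption that a case-insensitive, sorted listing is wanted (the name `find_supported_files` is hypothetical):

```python
from pathlib import Path
from typing import Iterable, List

# Same extensions the notebook's directory functions look for
SUPPORTED_EXTENSIONS = (".pdf", ".docx", ".doc", ".txt")


def find_supported_files(
    directory: str,
    extensions: Iterable[str] = SUPPORTED_EXTENSIONS,
) -> List[str]:
    """Return sorted paths of files in `directory` with a supported extension."""
    dir_path = Path(directory)
    if not dir_path.is_dir():
        raise ValueError(f"Directory not found: {directory}")

    # Compare lower-cased suffixes so ".PDF" and ".pdf" are treated alike
    wanted = {ext.lower() for ext in extensions}
    matches = [
        p for p in dir_path.iterdir()
        if p.is_file() and p.suffix.lower() in wanted
    ]
    # Sort for a stable, reproducible processing order
    return [str(p) for p in sorted(matches)]
```

Like the original loops, this only looks at the top level of the directory and does not recurse into subfolders.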


In [35]:
# Interactive Widget Interface (Optional)
def create_interactive_interface():
    """Create interactive widgets for Jupyter notebook"""
    
    # File upload widget
    file_upload = widgets.FileUpload(
        accept='.pdf,.docx,.doc,.txt',
        multiple=True,
        description='Upload Files'
    )
    
    # Tags input
    tags_input = widgets.Text(
        placeholder='Enter tags separated by commas',
        description='Custom Tags:'
    )
    
    # Process button
    process_button = widgets.Button(
        description='Process Documents',
        button_style='primary',
        icon='play'
    )
    
    # Output area
    output = widgets.Output()
    
    def on_process_clicked(b):
        with output:
            clear_output()
            # Placeholder: uploaded files are not processed yet.
            # file_upload.value would need to be written to disk and passed to
            # process_multiple_files() before this button does real work.
            print("⚠️ Upload processing is not implemented yet.")
    
    process_button.on_click(on_process_clicked)
    
    # Layout
    interface = widgets.VBox([
        widgets.HTML("<h3>📄 Document Metadata Generator</h3>"),
        file_upload,
        tags_input,
        process_button,
        output
    ])
    
    return interface

# Uncomment the next line to display the interactive interface
# display(create_interactive_interface())
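
The click handler above leaves the handling of uploaded files open. Since the processing functions in this notebook take file paths, one way to bridge the gap is to write the uploads to disk first. The sketch below is an assumption-laden starting point, not part of the notebook: the helper name `save_uploaded_files` is hypothetical, and it assumes the two `FileUpload.value` layouts used by ipywidgets (a dict keyed by filename in 7.x, a tuple of dicts with `name` and `content` in 8.x).

```python
from pathlib import Path
from typing import Any, List


def save_uploaded_files(upload_value: Any, dest_dir: str) -> List[str]:
    """Write uploaded files into dest_dir and return the saved paths."""
    if isinstance(upload_value, dict):
        # ipywidgets 7.x layout: {filename: {"metadata": ..., "content": bytes}}
        items = [(name, info["content"]) for name, info in upload_value.items()]
    else:
        # ipywidgets 8.x layout: tuple of dicts with "name" and "content" keys
        items = [(info["name"], info["content"]) for info in upload_value]

    target = Path(dest_dir)
    target.mkdir(parents=True, exist_ok=True)

    saved = []
    for name, content in items:
        # Keep only the final path component so a crafted name cannot
        # write outside dest_dir
        path = target / Path(name).name
        # content may be a memoryview (8.x); bytes() handles both cases
        path.write_bytes(bytes(content))
        saved.append(str(path))
    return saved
```

Inside `on_process_clicked`, the returned paths could then be handed to `process_multiple_files` together with the tags parsed from `tags_input.value`.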

In [36]:
# Example Usage
print("\n" + "="*60)
print("📖 EXAMPLE USAGE:")
print("="*60)

print("""
# Interactive CLI mode:
run_metadata_generation()

# Quick processing examples:
metadata = quick_process_file("path/to/your/document.pdf")
metadata_list = quick_process_directory("path/to/your/documents/")

# Quick export:
quick_export(metadata_list, "JSON", "my_results.json")

# Analytics:
create_analytics_dashboard(metadata_list)
display_metadata_summary(metadata_list)
""")

print("\n🎯 Run run_metadata_generation() to start the interactive workflow!")
print("📝 Or use the quick_ functions for programmatic processing.")


📖 EXAMPLE USAGE:

# Interactive CLI mode:
run_metadata_generation()

# Quick processing examples:
metadata = quick_process_file("path/to/your/document.pdf")
metadata_list = quick_process_directory("path/to/your/documents/")

# Quick export:
quick_export(metadata_list, "JSON", "my_results.json")

# Analytics:
create_analytics_dashboard(metadata_list)
display_metadata_summary(metadata_list)


🎯 Run run_metadata_generation() to start the interactive workflow!
📝 Or use the quick_ functions for programmatic processing.
