# Word Extraction Strategy

This notebook describes the strategy for extracting words from the NOS Dutch news articles dataset and building a clean word list, categorized by part of speech.

## Strategy Overview

The word extraction process involves several key steps:
1. **Text Preprocessing**: Clean HTML content and prepare text for analysis
2. **Language Processing**: Use spaCy for tokenization, POS tagging, and lemmatization
3. **Word Filtering**: Remove unwanted tokens and apply quality filters
4. **Frequency Analysis**: Calculate word frequencies by year and overall
5. **Database Storage**: Store results in SQLite with proper categorization
6. **Quality Control**: Validate and clean the final word list

## Key Challenges and Solutions

### Challenge 1: HTML Content Cleaning
- **Problem**: The 'content' field contains HTML markup that needs to be stripped
- **Solution**: Use BeautifulSoup to parse HTML and extract clean text

### Challenge 2: Dutch Language Processing
- **Problem**: Need proper Dutch language model for accurate POS tagging
- **Solution**: Use spaCy's Dutch model (nl_core_news_sm) for linguistic analysis

### Challenge 3: Text Quality and Noise
- **Problem**: News articles may contain URLs, special characters, and formatting artifacts
- **Solution**: Implement comprehensive text cleaning pipeline
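Two regular expressions do most of the noise removal. The sketch below uses simplified patterns, under the assumption that a URL always ends at the next whitespace character (so punctuation directly after a URL is removed with it). It also shows a common pitfall in email patterns: inside a character class such as `[A-Z|a-z]`, the `|` is a literal pipe character rather than alternation, so `[A-Za-z]` is what is intended. The helper name `strip_urls_and_emails` is hypothetical.

```python
import re

# Simplified patterns (assumption: a URL ends at the next whitespace character)
URL_RE = re.compile(r"https?://\S+")
EMAIL_RE = re.compile(r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b")

def strip_urls_and_emails(text):
    """Remove URLs and email addresses, then collapse leftover whitespace."""
    text = URL_RE.sub("", text)
    text = EMAIL_RE.sub("", text)
    return re.sub(r"\s+", " ", text).strip()

sample = "Lees meer op https://nos.nl/artikel/123 of mail redactie@example.nl vandaag."
print(strip_urls_and_emails(sample))  # Lees meer op of mail vandaag.

# Inside [...] the '|' is a literal character, so this class also matches a pipe
print(bool(re.fullmatch(r"[A-Z|a-z]{2,}", "n|")))  # True
```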

### Challenge 4: Memory Efficiency
- **Problem**: 295k articles (about 504 MB on disk, about 1.36 GB in memory once loaded) require efficient processing
- **Solution**: Process articles in batches to manage memory usage
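Batching can be expressed as a small generator that works on any iterable. This is a sketch; `iter_batches` is a hypothetical helper, not part of the notebook. Note that batching bounds the amount of intermediate work held at once (parsed HTML, spaCy `Doc` objects, per-batch results); it does not shrink the DataFrame itself, which `pd.read_feather` loads fully into memory.

```python
from itertools import islice

def iter_batches(items, batch_size):
    """Yield successive lists of at most batch_size items from any iterable."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    it = iter(items)
    # islice pulls the next batch_size items; an empty list means the input is exhausted
    while batch := list(islice(it, batch_size)):
        yield batch

print(list(iter_batches(range(7), 3)))  # [[0, 1, 2], [3, 4, 5], [6]]
```

For a DataFrame, slicing with `df.iloc[start:start + batch_size]` gives the same effect, which is what the pipeline in Step 5 does.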

## Import Required Libraries

Import pandas for data manipulation and other necessary libraries for the word extraction pipeline.

In [1]:
import pandas as pd
import numpy as np
import os
from datetime import datetime

# Display settings for better data exploration
pd.set_option('display.max_columns', None)
pd.set_option('display.max_colwidth', 100)

print("Basic libraries imported successfully!")
print(f"Pandas version: {pd.__version__}")
print(f"NumPy version: {np.__version__}")

Basic libraries imported successfully!
Pandas version: 2.3.1
NumPy version: 2.3.2


## Load the Dataset

Load the NOS_NL_articles_2015_mar_2025.feather file for word extraction processing.

In [2]:
# Load the feather dataset
file_path = "data/NOS_NL_articles_2015_mar_2025.feather"

print(f"Loading dataset from: {file_path}")
print(f"File exists: {os.path.exists(file_path)}")

if os.path.exists(file_path):
    # Get file size
    file_size = os.path.getsize(file_path)
    print(f"File size: {file_size / (1024**2):.2f} MB")
    
    # Load the dataset
    df = pd.read_feather(file_path)
    
    print(f"\nDataset loaded successfully!")
    print(f"Shape: {df.shape} (rows, columns)")
    print(f"Memory usage: {df.memory_usage(deep=True).sum() / (1024**2):.2f} MB")
else:
    print("File not found! Please check the file path.")

Loading dataset from: data/NOS_NL_articles_2015_mar_2025.feather
File exists: True
File size: 503.98 MB

Dataset loaded successfully!
Shape: (295259, 11) (rows, columns)
Memory usage: 1361.08 MB



## Step 1: Install and Import Text Processing Libraries

Install the required libraries for text processing, including spaCy for Dutch language processing and BeautifulSoup for HTML cleaning.

In [3]:
# Install required packages for text processing
import subprocess
import sys

def install_package(package):
    """Install a package using pip"""
    try:
        subprocess.check_call([sys.executable, "-m", "pip", "install", package])
        print(f"✓ {package} installed successfully")
    except subprocess.CalledProcessError as e:
        print(f"✗ Failed to install {package}: {e}")

# Install required packages
packages = [
    "spacy",
    "beautifulsoup4", 
    "lxml",
    "html5lib",
    "tqdm",  # for progress bars
]

print("Installing required packages...")
for package in packages:
    install_package(package)

print("\nDownloading Dutch language model for spaCy...")
try:
    subprocess.check_call([sys.executable, "-m", "spacy", "download", "nl_core_news_sm"])
    print("✓ Dutch language model downloaded successfully")
except subprocess.CalledProcessError as e:
    print(f"✗ Failed to download Dutch model: {e}")
    print("You may need to run: python -m spacy download nl_core_news_sm")

Installing required packages...
✓ spacy installed successfully
✓ beautifulsoup4 installed successfully
✓ lxml installed successfully
✓ html5lib installed successfully
✓ tqdm installed successfully

Downloading Dutch language model for spaCy...
✓ Dutch language model downloaded successfully


In [4]:
# Import text processing libraries
import spacy
from bs4 import BeautifulSoup
import sqlite3
import re
from collections import Counter, defaultdict
from tqdm import tqdm
import string
from datetime import datetime

# Load Dutch language model
print("Loading Dutch language model...")
try:
    nlp = spacy.load("nl_core_news_sm")
    print("✓ Dutch language model loaded successfully")
    print(f"Model info: {nlp.meta['name']} v{nlp.meta['version']}")
except OSError as e:
    print(f"✗ Failed to load Dutch model: {e}")
    print("Please install the Dutch model: python -m spacy download nl_core_news_sm")
    nlp = None

# Test the model with a sample Dutch sentence
if nlp:
    test_sentence = "Dit is een test van de Nederlandse taalverwerking."
    doc = nlp(test_sentence)
    print(f"\nTest sentence: '{test_sentence}'")
    print("Tokens and POS tags:")
    for token in doc:
        print(f"  {token.text} -> {token.pos_} ({token.lemma_})")
else:
    print("Cannot test model - please install Dutch language model first")

Loading Dutch language model...
✓ Dutch language model loaded successfully
Model info: core_news_sm v3.8.0

Test sentence: 'Dit is een test van de Nederlandse taalverwerking.'
Tokens and POS tags:
  Dit -> PRON (dit)
  is -> AUX (zijn)
  een -> DET (een)
  test -> NOUN (test)
  van -> ADP (van)
  de -> DET (de)
  Nederlandse -> ADJ (Nederlands)
  taalverwerking -> NOUN (taalverwerking)
  . -> PUNCT (.)


## Step 2: HTML Content Cleaning

Create functions to clean HTML content from the articles and prepare clean text for processing.

In [5]:
def clean_html_content(html_content):
    """
    Clean HTML content and extract readable text for spaCy processing.
    
    Args:
        html_content (str): Raw HTML content from articles
        
    Returns:
        str: Clean text ready for spaCy processing
    """
    if pd.isna(html_content) or not html_content:
        return ""
    
    try:
        # Parse HTML with BeautifulSoup
        soup = BeautifulSoup(html_content, 'html.parser')
        
        # Remove script and style elements
        for script in soup(["script", "style"]):
            script.decompose()
        
        # Get text content
        text = soup.get_text()
        
        # Clean up whitespace
        lines = (line.strip() for line in text.splitlines())
        chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
        text = ' '.join(chunk for chunk in chunks if chunk)
        
        return text
    
    except Exception as e:
        print(f"Error cleaning HTML: {e}")
        return str(html_content)  # Return original if cleaning fails

def preprocess_text(text):
    """
    Additional text preprocessing before spaCy analysis.
    
    Args:
        text (str): Clean text from HTML cleaning
        
    Returns:
        str: Preprocessed text ready for spaCy
    """
    if not text:
        return ""
    
    # Remove URLs
    # (everything from http:// or https:// up to the next whitespace character)
    text = re.sub(r'https?://\S+', '', text)
    
    # Remove email addresses
    text = re.sub(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b', '', text)
    
    # Remove extra whitespace
    text = re.sub(r'\s+', ' ', text).strip()
    
    # Remove very short texts (likely not meaningful)
    if len(text.strip()) < 10:
        return ""
    
    return text

# Test the cleaning functions
print("Testing HTML cleaning functions...")
test_html = """
<div class="article-content">
    <h1>Test Artikel Titel</h1>
    <p>Dit is een <strong>test</strong> artikel met <a href="https://example.com">links</a>.</p>
    <script>alert('test');</script>
    <p>Meer tekst hier.</p>
</div>
"""

cleaned = clean_html_content(test_html)
preprocessed = preprocess_text(cleaned)

print(f"Original HTML: {test_html}")
print(f"Cleaned text: {cleaned}")
print(f"Preprocessed text: {preprocessed}")

Testing HTML cleaning functions...
Original HTML: 
<div class="article-content">
    <h1>Test Artikel Titel</h1>
    <p>Dit is een <strong>test</strong> artikel met <a href="https://example.com">links</a>.</p>
    <script>alert('test');</script>
    <p>Meer tekst hier.</p>
</div>

Cleaned text: Test Artikel Titel Dit is een test artikel met links. Meer tekst hier.
Preprocessed text: Test Artikel Titel Dit is een test artikel met links. Meer tekst hier.
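`clean_html_content` calls `soup.get_text()` with no separator, so text from adjacent elements is concatenated as-is. In the test above this is harmless because the sample HTML has line breaks between its tags, but markup such as `<li>Amsterdam</li><li>Rotterdam</li>` with no whitespace in between would become the single token `AmsterdamRotterdam`. Whether the NOS `content` field contains such markup has not been checked here. The sketch below reproduces the effect with the standard library's `html.parser` as a dependency-free stand-in for BeautifulSoup (`TextCollector` and `html_to_text` are hypothetical names); in BeautifulSoup itself, passing `separator=" "` to `get_text()` plays the same role.

```python
from html.parser import HTMLParser

class TextCollector(HTMLParser):
    """Collect text nodes, skipping <script>/<style> content."""

    def __init__(self):
        super().__init__()
        self.parts = []
        self._skip = 0  # nesting depth inside <script>/<style>

    def handle_starttag(self, tag, attrs):
        if tag in ("script", "style"):
            self._skip += 1

    def handle_endtag(self, tag):
        if tag in ("script", "style") and self._skip:
            self._skip -= 1

    def handle_data(self, data):
        if not self._skip:
            self.parts.append(data)

def html_to_text(markup, separator=""):
    parser = TextCollector()
    parser.feed(markup)
    parser.close()  # flush any buffered text
    # Join the text nodes, then collapse runs of whitespace to single spaces
    return " ".join(separator.join(parser.parts).split())

snippet = "<ul><li>Amsterdam</li><li>Rotterdam</li></ul><script>track()</script>"
print(html_to_text(snippet))                 # AmsterdamRotterdam
print(html_to_text(snippet, separator=" "))  # Amsterdam Rotterdam
```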


## Step 3: Word Extraction and Processing

Create functions to extract and process words using spaCy for POS tagging, lemmatization, and filtering.

In [6]:
def extract_words_from_text(text, nlp_model):
    """
    Extract and categorize words from cleaned text using spaCy.
    
    Args:
        text (str): Clean text ready for processing
        nlp_model: Loaded spaCy model
        
    Returns:
        list: List of word dictionaries with metadata
    """
    if not text or not nlp_model:
        return []
    
    try:
        # Process text with spaCy
        doc = nlp_model(text)
        
        words = []
        for token in doc:
            # Filter out unwanted tokens
            if should_include_token(token):
                word_info = {
                    'word': token.text.lower(),
                    'lemma': token.lemma_.lower(),
                    'pos': token.pos_,
                    'tag': token.tag_,
                    'is_alpha': token.is_alpha,
                    'is_stop': token.is_stop,
                    'length': len(token.text)
                }
                words.append(word_info)
        
        return words
    
    except Exception as e:
        print(f"Error processing text: {e}")
        return []

def should_include_token(token):
    """
    Determine if a token should be included in the word list.
    
    Args:
        token: spaCy token object
        
    Returns:
        bool: True if token should be included
    """
    # Basic filters
    if not token.text or len(token.text.strip()) == 0:
        return False
    
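    # Must be purely alphabetic (rejects tokens containing digits or punctuation)
    if not token.is_alpha:
        return False
    
    # Minimum length: drops single-character tokens such as "a", but also the Dutch pronoun "u"
    if len(token.text) < 2:
        return False
    
    # Maximum length: guards against extraction errors, but also drops some
    # legitimate long Dutch compound words
    if len(token.text) > 25:
        return False
    
    # Skip certain POS tags (PUNCT and SPACE are already excluded by is_alpha,
    # so in practice this mainly catches X)
    excluded_pos = {'PUNCT', 'SPACE', 'X'}  # X = other (often errors)
    if token.pos_ in excluded_pos:
        return False
    
    # Skip all-uppercase tokens longer than 3 characters (likely acronyms or
    # abbreviations); shorter ones such as "EU" or "NOS" are kept
    if token.text.isupper() and len(token.text) > 3:
        return False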
    
    return True

def get_pos_category(pos_tag):
    """
    Categorize POS tags into broader categories for easier analysis.
    
    Args:
        pos_tag (str): spaCy POS tag
        
    Returns:
        str: Broader category
    """
    pos_mapping = {
        'NOUN': 'noun',
        'PROPN': 'proper_noun',
        'VERB': 'verb',
        'ADJ': 'adjective',
        'ADV': 'adverb',
        'PRON': 'pronoun',
        'DET': 'determiner',
        'ADP': 'preposition',
        'CONJ': 'conjunction',
        'CCONJ': 'conjunction',
        'SCONJ': 'conjunction',
        'NUM': 'number',
        'PART': 'particle',
        'INTJ': 'interjection',
        'AUX': 'auxiliary'
    }
    return pos_mapping.get(pos_tag, 'other')

# Test the word extraction functions
print("Testing word extraction functions...")
if nlp:
    test_text = "Dit is een mooie Nederlandse zin met verschillende woorden en woordsoorten."
    words = extract_words_from_text(test_text, nlp)
    
    print(f"Test text: {test_text}")
    print("Extracted words:")
    for word in words:
        category = get_pos_category(word['pos'])
        print(f"  {word['word']} -> {word['lemma']} ({word['pos']}, {category})")
else:
    print("Cannot test - spaCy model not loaded")

Testing word extraction functions...
Test text: Dit is een mooie Nederlandse zin met verschillende woorden en woordsoorten.
Extracted words:
  dit -> dit (PRON, pronoun)
  is -> zijn (AUX, auxiliary)
  een -> een (DET, determiner)
  mooie -> mooi (ADJ, adjective)
  nederlandse -> nederlands (ADJ, adjective)
  zin -> zin (NOUN, noun)
  met -> met (ADP, preposition)
  verschillende -> verschillend (ADJ, adjective)
  woorden -> woord (NOUN, noun)
  en -> en (CCONJ, conjunction)
  woordsoorten -> woordsoort (NOUN, noun)
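Because `should_include_token` only reads `text`, `is_alpha`, and `pos_`, its rules can be checked without loading spaCy. The sketch below restates the same rules so that it runs on its own; `FakeToken` and `passes_filters` are hypothetical names, and `FakeToken` is a stand-in for a spaCy `Token`. It makes two consequences of the rules visible: all-caps tokens are rejected only when longer than three characters (so `EU` passes but `NATO` does not), and the two-character minimum also drops the Dutch pronoun `u`.

```python
from dataclasses import dataclass

@dataclass
class FakeToken:
    """Hypothetical stand-in exposing the three spaCy Token attributes the filter reads."""
    text: str
    is_alpha: bool
    pos_: str

def passes_filters(token, min_len=2, max_len=25, excluded_pos=frozenset({"PUNCT", "SPACE", "X"})):
    # Same rules, in the same order, as should_include_token
    if not token.text.strip() or not token.is_alpha:
        return False
    if not (min_len <= len(token.text) <= max_len):
        return False
    if token.pos_ in excluded_pos:
        return False
    # Only all-caps tokens longer than 3 characters are rejected
    return not (token.text.isupper() and len(token.text) > 3)

samples = [
    FakeToken("EU", True, "PROPN"),
    FakeToken("NATO", True, "PROPN"),
    FakeToken("u", True, "PRON"),
    FakeToken("2025", False, "NUM"),
    FakeToken("kabinet", True, "NOUN"),
]
print([(t.text, passes_filters(t)) for t in samples])
# [('EU', True), ('NATO', False), ('u', False), ('2025', False), ('kabinet', True)]
```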


## Step 4: Database Setup

Create the SQLite database schema for storing words with their frequencies, POS tags, and per-year counts.

In [7]:
def setup_database(db_path="output/words_database.sqlite"):
    """
    Create SQLite database with proper schema for storing word data.
    
    Args:
        db_path (str): Path to SQLite database file
        
    Returns:
        sqlite3.Connection: Database connection
    """
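    # Create the output directory if needed (skipped when db_path has no directory
    # part, since os.makedirs('') would raise an error)
    db_dir = os.path.dirname(db_path)
    if db_dir:
        os.makedirs(db_dir, exist_ok=True)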
    
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    
    # Create words table
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS words (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            word TEXT NOT NULL,
            lemma TEXT NOT NULL,
            pos_tag TEXT NOT NULL,
            pos_category TEXT NOT NULL,
            total_frequency INTEGER DEFAULT 0,
            first_seen DATE,
            last_seen DATE,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            UNIQUE(word, lemma, pos_tag)
        )
    ''')
    
    # Create word frequencies by year table
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS word_frequencies (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            word_id INTEGER,
            year INTEGER,
            frequency INTEGER DEFAULT 0,
            FOREIGN KEY (word_id) REFERENCES words (id),
            UNIQUE(word_id, year)
        )
    ''')
    
    # Create processing log table
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS processing_log (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            articles_processed INTEGER,
            words_extracted INTEGER,
            processing_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            notes TEXT
        )
    ''')
    
    # Create indexes for better performance
    cursor.execute('CREATE INDEX IF NOT EXISTS idx_word_lemma ON words (word, lemma)')
    cursor.execute('CREATE INDEX IF NOT EXISTS idx_pos_category ON words (pos_category)')
    cursor.execute('CREATE INDEX IF NOT EXISTS idx_frequency_year ON word_frequencies (year)')
    
    conn.commit()
    print(f"Database setup complete: {db_path}")
    
    return conn

def insert_word_data(conn, word_data, year):
    """
    Insert word data into the database with frequency tracking.
    
    Args:
        conn: SQLite connection
        word_data (list): List of word dictionaries
        year (int): Year of the article
    """
    cursor = conn.cursor()
    
    for word_info in word_data:
        pos_category = get_pos_category(word_info['pos'])
        
        # Insert the word if it is new; existing rows are left unchanged here
        cursor.execute('''
            INSERT OR IGNORE INTO words (word, lemma, pos_tag, pos_category, first_seen, last_seen)
            VALUES (?, ?, ?, ?, ?, ?)
        ''', (
            word_info['word'],
            word_info['lemma'],
            word_info['pos'],
            pos_category,
            f"{year}-01-01",
            f"{year}-12-31"
        ))
        
        # Widen first_seen/last_seen to include this year. Articles are not
        # guaranteed to arrive in chronological order (especially when processed
        # in parallel), so both ends of the range need updating.
        cursor.execute('''
            UPDATE words 
            SET first_seen = MIN(first_seen, ?),
                last_seen = MAX(last_seen, ?)
            WHERE word = ? AND lemma = ? AND pos_tag = ?
        ''', (
            f"{year}-01-01",
            f"{year}-12-31",
            word_info['word'],
            word_info['lemma'],
            word_info['pos']
        ))
        
        # Get word ID
        cursor.execute('''
            SELECT id FROM words 
            WHERE word = ? AND lemma = ? AND pos_tag = ?
        ''', (word_info['word'], word_info['lemma'], word_info['pos']))
        
        word_id = cursor.fetchone()[0]
        
        # Insert or update frequency
        cursor.execute('''
            INSERT OR IGNORE INTO word_frequencies (word_id, year, frequency)
            VALUES (?, ?, 0)
        ''', (word_id, year))
        
        cursor.execute('''
            UPDATE word_frequencies 
            SET frequency = frequency + 1
            WHERE word_id = ? AND year = ?
        ''', (word_id, year))
        
        # Update total frequency
        cursor.execute('''
            UPDATE words 
            SET total_frequency = total_frequency + 1
            WHERE id = ?
        ''', (word_id,))
    
    conn.commit()

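# Test database setup
# Note: the test database file persists between runs, so re-running this cell
# increments the existing counts instead of starting from zero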
print("Setting up test database...")
test_conn = setup_database("output/test_words.sqlite")

# Test with sample data
if nlp:
    sample_words = extract_words_from_text("Dit is een test van de database functionaliteit.", nlp)
    insert_word_data(test_conn, sample_words, 2023)
    
    # Query results
    cursor = test_conn.cursor()
    cursor.execute('SELECT * FROM words')
    results = cursor.fetchall()
    print(f"Sample words inserted: {len(results)}")
    for row in results[:3]:
        print(f"  {row}")

test_conn.close()

Setting up test database...
Database setup complete: output/test_words.sqlite
Sample words inserted: 8
  (1, 'dit', 'dit', 'PRON', 'pronoun', 8, '2023-01-01', '2023-12-31', '2025-07-31 19:28:56')
  (2, 'is', 'zijn', 'AUX', 'auxiliary', 8, '2023-01-01', '2023-12-31', '2025-07-31 19:28:56')
  (3, 'een', 'een', 'DET', 'determiner', 8, '2023-01-01', '2023-12-31', '2025-07-31 19:28:56')
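`insert_word_data` issues several SQL statements for every token, including tokens that repeat within the same article. One alternative, sketched below under the assumption of SQLite 3.24 or newer (the first version with `INSERT ... ON CONFLICT DO UPDATE`), is to count tokens with `collections.Counter` first and then write one upsert per distinct `(word, lemma, pos)` key. The schema is trimmed to the columns the sketch uses, and `upsert_word_counts` is a hypothetical name; it is not a drop-in replacement for the notebook's function.

```python
import sqlite3
from collections import Counter

SCHEMA = """
CREATE TABLE IF NOT EXISTS words (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    word TEXT NOT NULL,
    lemma TEXT NOT NULL,
    pos_tag TEXT NOT NULL,
    total_frequency INTEGER DEFAULT 0,
    first_seen DATE,
    last_seen DATE,
    UNIQUE(word, lemma, pos_tag)
);
CREATE TABLE IF NOT EXISTS word_frequencies (
    word_id INTEGER REFERENCES words (id),
    year INTEGER,
    frequency INTEGER DEFAULT 0,
    UNIQUE(word_id, year)
);
"""

def upsert_word_counts(conn, word_data, year):
    """Aggregate tokens in Python, then write one upsert per distinct (word, lemma, pos)."""
    counts = Counter((w["word"], w["lemma"], w["pos"]) for w in word_data)
    first, last = f"{year}-01-01", f"{year}-12-31"
    cur = conn.cursor()
    for (word, lemma, pos), n in counts.items():
        # New word: insert it. Existing word: add n and widen the first/last-seen range.
        cur.execute(
            """
            INSERT INTO words (word, lemma, pos_tag, total_frequency, first_seen, last_seen)
            VALUES (?, ?, ?, ?, ?, ?)
            ON CONFLICT(word, lemma, pos_tag) DO UPDATE SET
                total_frequency = total_frequency + excluded.total_frequency,
                first_seen = MIN(first_seen, excluded.first_seen),
                last_seen = MAX(last_seen, excluded.last_seen)
            """,
            (word, lemma, pos, n, first, last),
        )
        word_id = cur.execute(
            "SELECT id FROM words WHERE word = ? AND lemma = ? AND pos_tag = ?",
            (word, lemma, pos),
        ).fetchone()[0]
        # Same upsert pattern for the per-year counter
        cur.execute(
            """
            INSERT INTO word_frequencies (word_id, year, frequency) VALUES (?, ?, ?)
            ON CONFLICT(word_id, year) DO UPDATE SET frequency = frequency + excluded.frequency
            """,
            (word_id, year, n),
        )
    conn.commit()

conn = sqlite3.connect(":memory:")
conn.executescript(SCHEMA)
tokens = [{"word": "de", "lemma": "de", "pos": "DET"}] * 3 + [{"word": "test", "lemma": "test", "pos": "NOUN"}]
upsert_word_counts(conn, tokens, 2023)
upsert_word_counts(conn, tokens, 2019)  # an earlier year arriving later
print(conn.execute("SELECT word, total_frequency, first_seen, last_seen FROM words ORDER BY word").fetchall())
# [('de', 6, '2019-01-01', '2023-12-31'), ('test', 2, '2019-01-01', '2023-12-31')]
```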


## Step 5: Main Processing Pipeline

Create the main pipeline, which processes all articles in batches, extracts their words, and writes the results to the database.

In [8]:
def process_articles_pipeline(df, nlp_model, db_path="output/words_database.sqlite", batch_size=1000):
    """
    Main pipeline to process all articles and extract words.
    
    Args:
        df: DataFrame with articles
        nlp_model: Loaded spaCy model
        db_path (str): Path to SQLite database
        batch_size (int): Number of articles to process in each batch
    """
    if not nlp_model:
        print("Error: spaCy model not loaded")
        return
    
    # Setup database
    conn = setup_database(db_path)
    
    # Prepare progress tracking
    total_articles = len(df)
    total_words_extracted = 0
    articles_processed = 0
    
    print(f"Starting processing of {total_articles:,} articles...")
    print(f"Batch size: {batch_size}")
    
    # Process in batches
    for start_idx in tqdm(range(0, total_articles, batch_size), desc="Processing batches"):
        end_idx = min(start_idx + batch_size, total_articles)
        batch_df = df.iloc[start_idx:end_idx]
        
        batch_words = 0
        
        for idx, row in batch_df.iterrows():
            try:
                # Extract year from published_time
                if pd.notna(row['published_time']):
                    if isinstance(row['published_time'], str):
                        year = pd.to_datetime(row['published_time']).year
                    else:
                        year = row['published_time'].year
                else:
                    year = 2020  # Default year if missing
                
                # Process different text fields
                text_fields = ['title', 'description', 'content']
                all_text = []
                
                for field in text_fields:
                    if field in row and pd.notna(row[field]):
                        if field == 'content':
                            # Clean HTML from content
                            clean_text = clean_html_content(row[field])
                        else:
                            clean_text = str(row[field])
                        
                        preprocessed = preprocess_text(clean_text)
                        if preprocessed:
                            all_text.append(preprocessed)
                
                # Combine all text
                combined_text = ' '.join(all_text)
                
                if combined_text:
                    # Extract words
                    words = extract_words_from_text(combined_text, nlp_model)
                    
                    if words:
                        # Insert into database
                        insert_word_data(conn, words, year)
                        batch_words += len(words)
                
                articles_processed += 1
                
            except Exception as e:
                print(f"Error processing article {idx}: {e}")
                continue
        
        total_words_extracted += batch_words
        
        # Log progress every 10 batches
        if (start_idx // batch_size) % 10 == 0:
            print(f"Processed {articles_processed:,}/{total_articles:,} articles, "
                  f"extracted {total_words_extracted:,} words")
    
    # Log final results
    cursor = conn.cursor()
    cursor.execute('INSERT INTO processing_log (articles_processed, words_extracted, notes) VALUES (?, ?, ?)',
                   (articles_processed, total_words_extracted, f"Batch processing complete - batch size {batch_size}"))
    conn.commit()
    
    print(f"\n=== PROCESSING COMPLETE ===")
    print(f"Total articles processed: {articles_processed:,}")
    print(f"Total words extracted: {total_words_extracted:,}")
    print(f"Database saved to: {db_path}")
    
    # Get final statistics
    cursor.execute('SELECT COUNT(*) FROM words')
    unique_words = cursor.fetchone()[0]
    
    cursor.execute('SELECT COUNT(DISTINCT pos_category) FROM words')
    pos_categories = cursor.fetchone()[0]
    
    # COUNT(*) counts distinct words per year; SUM(frequency) counts word occurrences
    cursor.execute('SELECT year, COUNT(*), SUM(frequency) FROM word_frequencies GROUP BY year ORDER BY year')
    yearly_stats = cursor.fetchall()
    
    print(f"Unique words in database: {unique_words:,}")
    print(f"POS categories found: {pos_categories}")
    print(f"Yearly distribution:")
    for year, distinct_words, occurrences in yearly_stats:
        print(f"  {year}: {distinct_words:,} distinct words, {occurrences:,} word occurrences")
    
    conn.close()
    
    return {
        'articles_processed': articles_processed,
        'words_extracted': total_words_extracted,
        'unique_words': unique_words,
        'database_path': db_path
    }

# Note: The actual processing will be run in the next step
print("Processing pipeline function defined. Ready to process articles.")

Processing pipeline function defined. Ready to process articles.
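When summarizing `word_frequencies` by year, `COUNT(*)` and `SUM(frequency)` answer different questions: the first counts the distinct words seen in that year (one row per word and year), the second counts token occurrences. A minimal in-memory illustration with toy rows (not real data):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE word_frequencies (word_id INTEGER, year INTEGER, frequency INTEGER)")
# Toy data: word 1 occurs 5 times and word 2 once in 2023; word 1 occurs twice in 2024
conn.executemany(
    "INSERT INTO word_frequencies VALUES (?, ?, ?)",
    [(1, 2023, 5), (2, 2023, 1), (1, 2024, 2)],
)
rows = conn.execute(
    "SELECT year, COUNT(*) AS distinct_words, SUM(frequency) AS occurrences "
    "FROM word_frequencies GROUP BY year ORDER BY year"
).fetchall()
print(rows)  # [(2023, 2, 6), (2024, 1, 2)]
```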


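## Step 5.5: Multi-threaded Processing Pipeline

This version submits batches to a thread pool so that several batches are processed concurrently. Each worker thread uses its own SQLite connection, and all writes go through a single lock, so database inserts still happen one at a time. The speedup over Step 5 has not been measured here and may be modest: BeautifulSoup's `html.parser` backend is pure Python and holds the GIL, and much of spaCy's pipeline likely does as well. For CPU-bound spaCy work, process-based parallelism (for example `nlp.pipe(texts, n_process=...)`) is the usual alternative.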
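An alternative to taking a lock around every insert is to have each worker return partial counts and let only the main thread merge and write them. The sketch below shows that pattern with `ThreadPoolExecutor.map`; `count_batch` and `count_in_parallel` are hypothetical names, and `count_batch` uses lowercase whitespace splitting as a stand-in for the HTML cleaning and spaCy extraction steps. Because the worker is a plain top-level function returning a picklable `Counter`, the same structure also fits `ProcessPoolExecutor`, which sidesteps the GIL for CPU-bound work; on platforms that spawn worker processes (Windows, and macOS by default), functions defined inside a notebook generally cannot be used that way, so the worker would need to live in an importable module.

```python
from collections import Counter
from concurrent.futures import ThreadPoolExecutor

def count_batch(texts):
    """Hypothetical stand-in for clean + extract: count lowercase alphabetic tokens."""
    counts = Counter()
    for text in texts:
        counts.update(tok.lower() for tok in text.split() if tok.isalpha())
    return counts

def count_in_parallel(texts, batch_size=2, max_workers=4):
    batches = [texts[i:i + batch_size] for i in range(0, len(texts), batch_size)]
    total = Counter()
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Workers never touch shared state; merging happens only in this thread,
        # which could then write `total` to the database in a single transaction
        for partial in pool.map(count_batch, batches):
            total.update(partial)
    return total

texts = ["De kat slaapt", "de hond blaft", "Een kat in 2025"]
print(count_in_parallel(texts)["kat"])  # 2
```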

In [None]:
import threading
import concurrent.futures
from queue import Queue
import time
import logging
import signal
from datetime import datetime

# Setup logging for multi-threaded processing (avoid duplicate handlers)
logger = logging.getLogger(__name__)

# Only configure the logger once, so re-running this cell does not add duplicate handlers
if not logger.handlers:
    
    # Create formatter
    formatter = logging.Formatter(
        '%(asctime)s - %(levelname)s - [Thread-%(thread)d] - %(message)s',
        datefmt='%H:%M:%S'
    )
    
    # Create console handler
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(formatter)
    
    # Add handler to logger
    logger.addHandler(console_handler)
    logger.setLevel(logging.INFO)
    
    # Prevent propagation to root logger to avoid duplicates
    logger.propagate = False

# Global flag for interrupt handling
interrupt_flag = threading.Event()

def signal_handler(signum, frame):
    """Handle keyboard interrupt (Ctrl+C) gracefully."""
    logger.warning("🛑 Interrupt received! Stopping processing gracefully...")
    interrupt_flag.set()

# Set up signal handler for interrupt
signal.signal(signal.SIGINT, signal_handler)

class ThreadSafeDatabase:
    """Thread-safe database operations for concurrent processing."""
    
    def __init__(self, db_path):
        self.db_path = db_path
        self.lock = threading.Lock()
        self.local = threading.local()
        self.connections = []  # every connection opened by any thread, so all can be closed later
        logger.info(f"📁 Initialized thread-safe database: {db_path}")
    
    def get_connection(self):
        """Get a thread-local database connection (called while holding self.lock)."""
        if not hasattr(self.local, 'conn'):
            # check_same_thread=False lets close_all_connections() close this
            # connection from the main thread later
            self.local.conn = sqlite3.connect(self.db_path, check_same_thread=False)
            self.connections.append(self.local.conn)
            logger.debug(f"🔗 Created database connection for thread {threading.current_thread().ident}")
        return self.local.conn
    
    def insert_word_data_threadsafe(self, word_data, year):
        """Thread-safe version of insert_word_data (all writes are serialized by one lock)."""
        with self.lock:
            conn = self.get_connection()
            insert_word_data(conn, word_data, year)
            logger.debug(f"💾 Inserted {len(word_data)} words for year {year}")
    
    def close_all_connections(self):
        """Close the connections opened by every thread, not just the calling one."""
        with self.lock:
            for conn in self.connections:
                conn.close()
            self.connections.clear()
            self.local = threading.local()
        logger.debug("🔒 Closed all database connections")

def process_article_batch_threaded(batch_data):
    """
    Process a batch of articles in a single thread with interrupt handling.
    
    Args:
        batch_data (tuple): (batch_df, nlp_model, thread_safe_db, thread_id, batch_num)
        
    Returns:
        dict: Processing statistics for this batch
    """
    batch_df, nlp_model, thread_safe_db, thread_id, batch_num = batch_data
    
    articles_processed = 0
    words_extracted = 0
    errors_count = 0
    
    logger.info(f"🚀 Thread {thread_id} starting batch {batch_num} with {len(batch_df)} articles")
    
    for idx, row in batch_df.iterrows():
        # Check for interrupt signal
        if interrupt_flag.is_set():
            logger.warning(f"⏹️ Thread {thread_id} stopping due to interrupt signal")
            break
            
        try:
            # Extract year from published_time
            if pd.notna(row['published_time']):
                if isinstance(row['published_time'], str):
                    year = pd.to_datetime(row['published_time']).year
                else:
                    year = row['published_time'].year
            else:
                year = 2020  # Default year if missing
            
            # Process different text fields
            text_fields = ['title', 'description', 'content']
            all_text = []
            
            for field in text_fields:
                if field in row and pd.notna(row[field]):
                    if field == 'content':
                        # Clean HTML from content
                        clean_text = clean_html_content(row[field])
                    else:
                        clean_text = str(row[field])
                    
                    preprocessed = preprocess_text(clean_text)
                    if preprocessed:
                        all_text.append(preprocessed)
            
            # Combine all text
            combined_text = ' '.join(all_text)
            
            if combined_text:
                # Extract words
                words = extract_words_from_text(combined_text, nlp_model)
                
                if words:
                    # Insert into database (thread-safe)
                    thread_safe_db.insert_word_data_threadsafe(words, year)
                    words_extracted += len(words)
            
            articles_processed += 1
            
            # Log progress every 50 articles within the batch
            if articles_processed % 50 == 0:
                logger.debug(f"📊 Thread {thread_id} batch {batch_num}: {articles_processed}/{len(batch_df)} articles processed")
            
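        except KeyboardInterrupt:
            # Defensive only: Python raises KeyboardInterrupt in the main thread,
            # so worker threads normally rely on interrupt_flag instead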
            logger.warning(f"⚠️ Thread {thread_id} received keyboard interrupt")
            interrupt_flag.set()
            break
        except Exception as e:
            errors_count += 1
            logger.error(f"❌ Thread {thread_id}: Error processing article {idx}: {e}")
            if errors_count > 10:  # Stop if too many errors
                logger.error(f"🚨 Thread {thread_id}: Too many errors, stopping batch")
                break
            continue
    
    logger.info(f"✅ Thread {thread_id} completed batch {batch_num}: {articles_processed} articles, {words_extracted} words, {errors_count} errors")
    
    return {
        'thread_id': thread_id,
        'batch_num': batch_num,
        'articles_processed': articles_processed,
        'words_extracted': words_extracted,
        'errors_count': errors_count,
        'interrupted': interrupt_flag.is_set()
    }

def process_articles_multithreaded(df, nlp_models, db_path="output/words_database.sqlite", 
                                 batch_size=500, num_threads=None, log_level=logging.INFO):
    """
    Multi-threaded pipeline to process articles with improved performance, logging, and interrupt handling.
    
    Args:
        df: DataFrame with articles
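        nlp_models: List of spaCy models (one per thread). If a single model is passed
            instead, it is not used; num_threads fresh copies of nl_core_news_sm are loaded.
        db_path (str): Path to SQLite database
        batch_size (int): Number of articles per batch
        num_threads (int): Number of threads to use (defaults to the CPU count, capped at 8)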
        log_level: Logging level for detailed output
        
    Returns:
        dict: Processing statistics
    """
    # Reset interrupt flag
    interrupt_flag.clear()
    
    # Set logging level
    logger.setLevel(log_level)
    
    if num_threads is None:
        # Cap at 8 threads; os.cpu_count() may return None, so fall back to 1
        num_threads = min(8, os.cpu_count() or 1)
    
    logger.info("🚀 Starting multi-threaded processing...")
    logger.info(f"📊 Configuration:")
    logger.info(f"   Articles to process: {len(df):,}")
    logger.info(f"   Batch size: {batch_size}")
    logger.info(f"   Number of threads: {num_threads}")
    logger.info(f"   Database path: {db_path}")
    logger.info(f"   Start time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    
    # Setup database
    logger.info("🗄️ Setting up database...")
    setup_database(db_path).close()  # creates the schema; workers open their own connections
    thread_safe_db = ThreadSafeDatabase(db_path)
    
    # Prepare spaCy models for each thread
    logger.info("🧠 Loading spaCy models for threads...")
    if isinstance(nlp_models, list):
        if len(nlp_models) != num_threads:
            logger.warning(f"⚠️ {len(nlp_models)} models provided for {num_threads} threads")
            # Use first model for all threads if not enough models
            nlp_models = [nlp_models[0]] * num_threads
    else:
        # Single model passed: load a separate nl_core_news_sm pipeline per thread (the passed model is not reused)
        nlp_models = []
        for i in range(num_threads):
            try:
                nlp_model = spacy.load("nl_core_news_sm")
                nlp_models.append(nlp_model)
                logger.info(f"   ✅ Thread {i+1} model loaded")
            except Exception as e:
                logger.error(f"   ❌ Error loading model for thread {i+1}: {e}")
                return None
    
    # Split dataframe into batches
    logger.info("📦 Creating batches...")
    batches = []
    total_articles = len(df)
    
    for start_idx in range(0, total_articles, batch_size):
        end_idx = min(start_idx + batch_size, total_articles)
        batch_df = df.iloc[start_idx:end_idx].copy()
        batches.append(batch_df)
    
    logger.info(f"   Created {len(batches)} batches for processing")
    
    # Prepare batch data for threads
    batch_data_list = []
    for i, batch_df in enumerate(batches):
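        # Round-robin model assignment; the executor does not pin batch i to a
        # particular worker, so two workers can occasionally hold the same model
        thread_id = i % num_threads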
        nlp_model = nlp_models[thread_id]
        batch_data_list.append((batch_df, nlp_model, thread_safe_db, thread_id, i))
    
    # Process batches using ThreadPoolExecutor
    logger.info("⚡ Starting thread pool execution...")
    start_time = time.time()
    total_articles_processed = 0
    total_words_extracted = 0
    total_errors = 0
    completed_batches = 0
    interrupted = False
    
    try:
        with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
            logger.info(f"🎯 Submitted {len(batch_data_list)} batches to {num_threads} threads")
            
            # Submit all batches
            future_to_batch = {
                executor.submit(process_article_batch_threaded, batch_data): i 
                for i, batch_data in enumerate(batch_data_list)
            }
            
            # Process completed batches
            for future in concurrent.futures.as_completed(future_to_batch):
                batch_num = future_to_batch[future]
                
                try:
                    # as_completed() only yields finished futures, so result() returns immediately
                    result = future.result()
                    total_articles_processed += result['articles_processed']
                    total_words_extracted += result['words_extracted']
                    total_errors += result.get('errors_count', 0)
                    completed_batches += 1
                    
                    if result.get('interrupted', False):
                        interrupted = True
                        logger.warning(f"🛑 Batch {batch_num} was interrupted")
                    
                    # Progress update
                    progress = (completed_batches / len(batches)) * 100
                    elapsed = time.time() - start_time
                    rate = total_articles_processed / elapsed if elapsed > 0 else 0
                    
                    if completed_batches % 5 == 0 or completed_batches == len(batches) or interrupted:
                        logger.info(f"📈 Progress: {completed_batches}/{len(batches)} batches ({progress:.1f}%)")
                        logger.info(f"   📊 Articles: {total_articles_processed:,}/{total_articles:,}")
                        logger.info(f"   💬 Words extracted: {total_words_extracted:,}")
                        logger.info(f"   ⏱️ Time: {elapsed:.1f}s ({rate:.1f} articles/sec)")
                        logger.info(f"   ❌ Errors: {total_errors}")
                        
                        if interrupted:
                            logger.warning("🚨 Interrupt detected, stopping remaining batches...")
                            break
                            
                except Exception as e:
                    logger.error(f"💥 Batch {batch_num} generated an exception: {e}")
                    total_errors += 1
            
            if interrupted:
                # Cancel remaining futures
                for future in future_to_batch:
                    future.cancel()
                logger.warning("🛑 Cancelled remaining batch processing due to interrupt")
                
    except KeyboardInterrupt:
        logger.warning("⚠️ Keyboard interrupt received in main thread")
        interrupt_flag.set()
        interrupted = True
    
    # Close all database connections
    logger.info("🔒 Closing database connections...")
    thread_safe_db.close_all_connections()
    
    end_time = time.time()
    processing_time = end_time - start_time
    
    # Log final results to database
    try:
        conn = sqlite3.connect(db_path)
        cursor = conn.cursor()
        status = "INTERRUPTED" if interrupted else "COMPLETED"
        cursor.execute('''
            INSERT INTO processing_log (articles_processed, words_extracted, notes) 
            VALUES (?, ?, ?)
        ''', (
            total_articles_processed, 
            total_words_extracted, 
            f"Multi-threaded processing {status} - {num_threads} threads, {batch_size} batch size, {processing_time:.1f}s, {total_errors} errors"
        ))
        conn.commit()
        conn.close()
        logger.info("📝 Logged processing results to database")
    except Exception as e:
        logger.error(f"Failed to log to database: {e}")
    
    # Final summary
    status_emoji = "⚠️" if interrupted else "✅"
    logger.info(f"\n{status_emoji} === MULTI-THREADED PROCESSING {'INTERRUPTED' if interrupted else 'COMPLETE'} ===")
    logger.info(f"📊 Total articles processed: {total_articles_processed:,}/{total_articles:,}")
    logger.info(f"💬 Total words extracted: {total_words_extracted:,}")
    logger.info(f"⏱️ Processing time: {processing_time:.1f} seconds")
    logger.info(f"🚀 Articles per second: {(total_articles_processed / processing_time if processing_time > 0 else 0):.1f}")
    logger.info(f"❌ Total errors: {total_errors}")
    logger.info(f"📁 Database saved to: {db_path}")
    logger.info(f"🏁 End time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    
    return {
        'articles_processed': total_articles_processed,
        'words_extracted': total_words_extracted,
        'processing_time': processing_time,
        'articles_per_second': total_articles_processed/processing_time if processing_time > 0 else 0,
        'database_path': db_path,
        'num_threads': num_threads,
        'total_errors': total_errors,
        'interrupted': interrupted,
        'batches_completed': completed_batches,
        'batches_total': len(batches)
    }

print("✅ Multi-threaded processing functions defined!")
print("Ready for high-performance article processing.")

✅ Multi-threaded processing functions defined!
Ready for high-performance article processing.


In [None]:
# Utility functions for interrupt handling and monitoring
def reset_logging():
    """Reset logging configuration to prevent duplicates."""
    global logger
    # Clear all handlers
    logger.handlers.clear()
    
    # Recreate formatter and handler
    formatter = logging.Formatter(
        '%(asctime)s - %(levelname)s - [Thread-%(thread)d] - %(message)s',
        datefmt='%H:%M:%S'
    )
    
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(formatter)
    
    logger.addHandler(console_handler)
    logger.setLevel(logging.INFO)
    logger.propagate = False
    
    logger.info("🔄 Logging configuration reset")

def reset_interrupt():
    """Reset the interrupt flag to allow new processing."""
    interrupt_flag.clear()
    logger.info("🔄 Interrupt flag reset - ready for new processing")

def force_interrupt():
    """Manually trigger an interrupt (useful for testing)."""
    interrupt_flag.set()
    logger.warning("🛑 Manual interrupt triggered")

def check_interrupt_status():
    """Check if processing is currently interrupted."""
    status = "INTERRUPTED" if interrupt_flag.is_set() else "NORMAL"
    logger.info(f"📊 Current interrupt status: {status}")
    return interrupt_flag.is_set()

def monitor_processing_progress(db_path="output/words_database.sqlite"):
    """Report the most recent processing_log entry (process_articles_multithreaded writes one at the end of each run)."""
    try:
        conn = sqlite3.connect(db_path)
        cursor = conn.cursor()
        
        # Get latest processing log entry
        cursor.execute('''
            SELECT articles_processed, words_extracted, processing_date, notes 
            FROM processing_log 
            ORDER BY processing_date DESC 
            LIMIT 1
        ''')
        result = cursor.fetchone()
        
        if result:
            articles, words, date, notes = result
            logger.info(f"📈 Latest processing status:")
            logger.info(f"   📊 Articles processed: {articles:,}")
            logger.info(f"   💬 Words extracted: {words:,}")
            logger.info(f"   ⏰ Last update: {date}")
            logger.info(f"   📝 Notes: {notes}")
        else:
            logger.info("📭 No processing log entries found")
            
        conn.close()
        
    except Exception as e:
        logger.error(f"❌ Error monitoring progress: {e}")

print("🛠️ Interrupt handling and monitoring utilities defined!")
print("Functions available:")
print("  - reset_logging(): Reset logging configuration")
print("  - reset_interrupt(): Reset interrupt flag")
print("  - force_interrupt(): Manually trigger interrupt")
print("  - check_interrupt_status(): Check current status")
print("  - monitor_processing_progress(): Check database progress")

🛠️ Interrupt handling and monitoring utilities defined!
Functions available:
  - reset_interrupt(): Reset interrupt flag
  - force_interrupt(): Manually trigger interrupt
  - check_interrupt_status(): Check current status
  - monitor_processing_progress(): Check database progress


In [None]:
# Test multi-threaded processing with enhanced logging and interrupt handling
print("🧪 Testing enhanced multi-threaded pipeline...")

# Reset any previous interrupt state and logging
reset_interrupt()
reset_logging()

# Use smaller sample for testing multi-threading
SAMPLE_SIZE_MT = 100  # Smaller for testing interrupt functionality

if 'df' not in locals() or df is None:
    print("Loading dataset...")
    df = pd.read_feather("data/NOS_NL_articles_2015_mar_2025.feather")
    print(f"Dataset loaded: {df.shape}")

# Create sample for multi-threaded testing
sample_df_mt = df.head(SAMPLE_SIZE_MT)
print(f"Created sample with {len(sample_df_mt)} articles for testing")

print("\n📊 Enhanced Features Demo:")
print("1. Detailed logging with timestamps and thread IDs")
print("2. Interrupt handling (Ctrl+C or notebook interrupt)")
print("3. Progress monitoring and error tracking")
print("4. Graceful shutdown and cleanup")

print("\n🚀 Starting enhanced multi-threaded processing...")

# Run with detailed logging
results = process_articles_multithreaded(
    df=sample_df_mt,
    nlp_models=nlp,
    db_path="output/enhanced_test.sqlite",
    batch_size=20,  # Small batches for more frequent updates
    num_threads=3,  # Fewer threads for easier monitoring
    log_level=logging.INFO  # Detailed logging
)

print(f"\n✅ Enhanced processing complete!")
print(f"📊 Results summary:")
for key, value in results.items():
    print(f"   {key}: {value}")

# Demonstrate monitoring functions
print(f"\n🔍 Testing monitoring functions:")
check_interrupt_status()
monitor_processing_progress("output/enhanced_test.sqlite")

print(f"\n💡 To interrupt processing:")
print("   - Use Jupyter notebook interrupt button")
print("   - Press Ctrl+C in terminal")
print("   - Call force_interrupt() function")
print("   - Processing will stop gracefully and save progress")

22:33:21 - INFO - [Thread-25780] - 🔄 Interrupt flag reset - ready for new processing
22:33:21 - INFO - [Thread-25780] - 🚀 Starting multi-threaded processing...
22:33:21 - INFO - [Thread-25780] - 📊 Configuration:
22:33:21 - INFO - [Thread-25780] -    Articles to process: 100
22:33:21 - INFO - [Thread-25780] -    Batch size: 20
22:33:21 - INFO - [Thread-25780] -    Number of threads: 3
22:33:21 - INFO - [Thread-25780] -    Database path: output/enhanced_test.sqlite
22:33:21 - INFO - [Thread-25780] -    Start time: 2025-08-01 22:33:21
22:33:21 - INFO - [Thread-25780] - 🗄️ Setting up database...

🧪 Testing enhanced multi-threaded pipeline...
Created sample with 100 articles for testing

📊 Enhanced Features Demo:
1. Detailed logging with timestamps and thread IDs
2. Interrupt handling (Ctrl+C or notebook interrupt)
3. Progress monitoring and error tracking
4. Graceful shutdown and cleanup

🚀 Starting enhanced multi-threaded processing...
Database setup complete: output/enhanced_test.sqlite


22:33:22 - INFO - [Thread-25780] -    ✅ Thread 1 model loaded
22:33:22 - INFO - [Thread-25780] -    ✅ Thread 2 model loaded
22:33:23 - INFO - [Thread-25780] -    ✅ Thread 3 model loaded
22:33:23 - INFO - [Thread-25780] - 📦 Creating batches...
22:33:23 - INFO - [Thread-25780] -    Created 5 batches for processing
22:33:23 - INFO - [Thread-25780] - ⚡ Starting thread pool execution...
22:33:23 - INFO - [Thread-25780] - 🎯 Submitted 5 batches to 3 threads
22:33:23 - INFO - [Thread-24600] - 🚀 Thread 0 starting batch 0 with 20 articles
22:33:23 - INFO - [Thread-30948] - 🚀 Thread 1 starting batch 1 with 20 articles


✅ Enhanced processing complete!
📊 Results summary:
   articles_processed: 100
   words_extracted: 29696
   processing_time: 4.25283670425415
   articles_per_second: 23.513717303081286
   database_path: output/enhanced_test.sqlite
   num_threads: 3
   total_errors: 0
   interrupted: False
   batches_completed: 5
   batches_total: 5

🔍 Testing monitoring functions:

💡 To interrupt processing:
   - Use Jupyter notebook interrupt button
   - Press Ctrl+C in terminal
   - Call force_interrupt() function
   - Processing will stop gracefully and save progress


## Performance Optimization Guide

### Multi-threading Configuration

**Optimal Thread Count:**
- **CPU-bound tasks**: Use `CPU cores - 2` threads (leaves cores for the OS)
- **I/O-bound tasks**: Use `CPU cores` or slightly more
- **For this workload**: Start with 4-6 threads and adjust based on measured throughput; CPython's GIL can limit how much CPU-bound spaCy work gains from extra threads

**Batch Size Recommendations:**
- **Small batches (100-200)**: Lower memory usage, more frequent database commits
- **Large batches (500-1000)**: Less database overhead, higher memory usage
- **Recommended**: 500 articles per batch for balanced performance
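To make the batch-size trade-off concrete, the sketch below counts how many batches the notebook's `range(0, total, batch_size)` slicing produces for the approximate 295,000 articles used in this notebook's time estimate. It assumes, hypothetically, one database commit per batch, so the batch count is also the number of commits.

```python
import math

def batch_bounds(total_articles, batch_size):
    """Yield (start, end) pairs, mirroring the notebook's range(0, total, batch_size) loop."""
    for start in range(0, total_articles, batch_size):
        yield start, min(start + batch_size, total_articles)

TOTAL_ARTICLES = 295_000  # approximate article count used in this notebook's estimate
for size in (100, 250, 500, 1000):
    bounds = list(batch_bounds(TOTAL_ARTICLES, size))
    last_size = bounds[-1][1] - bounds[-1][0]
    print(f"batch_size={size:>4}: {len(bounds):>5} batches "
          f"(ceil = {math.ceil(TOTAL_ARTICLES / size)}), last batch {last_size} articles")
```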

**Performance Tips:**
1. **Monitor system resources** during processing
2. **Reduce threads** if CPU usage is maxed out
3. **Increase batch size** if I/O is the bottleneck
4. **Use SSD storage** for database for faster writes
5. **Close other applications** to free up memory

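**Expected Performance** (rough targets, not measured here; the 100-article test above ran at about 23.5 articles/second with 3 threads):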
- Single-threaded: ~50-100 articles/second
- Multi-threaded (4 cores): ~200-400 articles/second
- Multi-threaded (8 cores): ~400-600 articles/second
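These figures are best checked on your own machine. A minimal timing harness, assuming a function that processes one batch (here `fake_batch`, a purely illustrative `time.sleep` placeholder), could look like this. A sleep-based workload releases the GIL and therefore scales with threads almost perfectly; CPU-bound spaCy processing may scale much less, which is exactly what such a measurement would reveal.

```python
import concurrent.futures
import time

def measure_throughput(process_batch, batches, num_threads):
    """Run all batches on a thread pool and return articles per second."""
    total = sum(len(b) for b in batches)
    start = time.perf_counter()
    with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
        # list() waits for every batch and re-raises any worker exception
        list(executor.map(process_batch, batches))
    elapsed = time.perf_counter() - start
    return total / elapsed if elapsed > 0 else float("inf")

def fake_batch(batch):
    """Placeholder workload; replace with a function that runs spaCy on a batch."""
    time.sleep(0.01)
    return len(batch)

sample_batches = [list(range(20)) for _ in range(6)]
for n in (1, 2, 3):
    rate = measure_throughput(fake_batch, sample_batches, n)
    print(f"{n} thread(s): {rate:.0f} articles/sec")
```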

In [12]:
# System Analysis for Optimal Thread Configuration
import psutil
import multiprocessing

print("🖥️  SYSTEM ANALYSIS FOR MULTI-THREADING")
print("="*50)

# CPU Information
cpu_count = multiprocessing.cpu_count()
cpu_freq = psutil.cpu_freq()
print(f"CPU Cores: {cpu_count}")
if cpu_freq:
    print(f"CPU Frequency: {cpu_freq.current:.0f} MHz (max: {cpu_freq.max:.0f} MHz)")

# Memory Information
memory = psutil.virtual_memory()
print(f"Total RAM: {memory.total / (1024**3):.1f} GB")
print(f"Available RAM: {memory.available / (1024**3):.1f} GB")
print(f"Memory Usage: {memory.percent}%")

# Disk Information (for database storage)
try:
    disk = psutil.disk_usage('output')
    print(f"Output Drive Free Space: {disk.free / (1024**3):.1f} GB")
except Exception:  # e.g. the output directory does not exist yet
    print("Could not check output drive space")

print("\n🎯 RECOMMENDED CONFIGURATION:")
print("="*40)

# Calculate recommendations
recommended_threads = max(2, min(8, cpu_count - 1))
recommended_batch_size = 500

if memory.available / (1024**3) < 4:  # Less than 4GB available
    recommended_batch_size = 250
    recommended_threads = max(2, recommended_threads - 1)
    print("⚠️  Limited memory detected - reducing batch size and threads")

print(f"Recommended Threads: {recommended_threads}")
print(f"Recommended Batch Size: {recommended_batch_size}")

# Memory estimate
estimated_memory_per_thread = 0.5  # GB per thread (rough estimate)
total_estimated_memory = recommended_threads * estimated_memory_per_thread

print(f"Estimated Memory Usage: {total_estimated_memory:.1f} GB")

if total_estimated_memory > memory.available / (1024**3) * 0.8:
    print("⚠️  WARNING: Estimated memory usage is high. Consider reducing threads.")
    recommended_threads = max(2, int(memory.available / (1024**3) * 0.8 / estimated_memory_per_thread))
    print(f"Adjusted recommendation: {recommended_threads} threads")

print(f"\n💡 Usage Example:")
print("```python")
print("results = process_articles_multithreaded(")
print("    df=df,")
print("    nlp_models=nlp,")
print("    db_path='output/dutch_words_full.sqlite',")
print(f"    batch_size={recommended_batch_size},")
print(f"    num_threads={recommended_threads}")
print(")")
print("```")

print(f"\n⏱️  Estimated Processing Time:")
articles_per_second = recommended_threads * 50  # Rough estimate
total_articles = 295000  # Approximate article count
estimated_time = total_articles / articles_per_second
print(f"~{estimated_time/60:.0f} minutes for {total_articles:,} articles")
print(f"({articles_per_second} articles/second estimated)")

🖥️  SYSTEM ANALYSIS FOR MULTI-THREADING
CPU Cores: 24
CPU Frequency: 3701 MHz (max: 3701 MHz)
Total RAM: 31.9 GB
Available RAM: 13.1 GB
Memory Usage: 59.1%
Output Drive Free Space: 288.3 GB

🎯 RECOMMENDED CONFIGURATION:
Recommended Threads: 8
Recommended Batch Size: 500
Estimated Memory Usage: 4.0 GB

💡 Usage Example:
```python
results = process_articles_multithreaded(
    df=df,
    nlp_models=nlp,
    db_path='output/dutch_words_full.sqlite',
    batch_size=500,
    num_threads=8
)
```

⏱️  Estimated Processing Time:
~12 minutes for 295,000 articles
(400 articles/second estimated)


## 🛑 Interrupt Handling Guide

### How to Safely Stop Processing

**Jupyter Notebook:**
1. Click the **"Interrupt"** button in the toolbar
2. Processing will stop gracefully after current batches complete
3. Partial results are automatically saved to the database

**Terminal/Command Line:**
1. Press **Ctrl+C** (the same key combination on Windows, Linux, and macOS terminals)
2. Processing will stop gracefully
3. All completed work is preserved
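Whether queued batches actually stop depends on how the executor is shut down. Leaving a `with ThreadPoolExecutor(...)` block calls `shutdown(wait=True)`, which by default still runs every batch that was submitted but has not started. The sketch below shows one way to make a stop request take effect quickly: workers check a `threading.Event` between articles (the role `interrupt_flag` plays in this notebook), and `shutdown(cancel_futures=True)` (Python 3.9+) drops batches that never started. The per-article `time.sleep` is a placeholder for real NLP work, and `interruptible_batch` / `run_interruptible` are illustrative names, not functions defined in this notebook.

```python
import concurrent.futures
import threading
import time

stop_event = threading.Event()  # plays the role of this notebook's interrupt_flag

def interruptible_batch(batch):
    """Process articles one by one, stopping early once a stop is requested."""
    done = 0
    for _article in batch:
        if stop_event.is_set():  # checked between articles, never mid-article
            break
        time.sleep(0.001)        # placeholder for NLP work on one article
        done += 1
    return done

def run_interruptible(batches, num_threads=3):
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=num_threads)
    futures = [executor.submit(interruptible_batch, b) for b in batches]
    processed = 0
    try:
        for future in concurrent.futures.as_completed(futures):
            processed += future.result()
    except KeyboardInterrupt:
        stop_event.set()  # running batches finish their current article and return
    finally:
        # cancel_futures=True (Python 3.9+) drops batches that have not started yet
        executor.shutdown(wait=True, cancel_futures=True)
    # Only counts batches whose results were collected before any interrupt
    return processed
```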

**Programmatically:**
```python
# Force interrupt from code
force_interrupt()

# Check if processing was interrupted
if check_interrupt_status():
    print("Processing is currently interrupted")

# Reset to allow new processing
reset_interrupt()
```

### 📊 Monitoring Progress

**Real-time Monitoring:**
- Detailed logs show progress every 5 completed batches
- Thread-level logging shows individual thread progress
- Error tracking: exceptions raised by a batch are logged and counted in the error total

**Database Monitoring:**
```python
# Check latest progress
monitor_processing_progress("output/dutch_words_full.sqlite")

# Analyze partial results
analyze_word_database("output/dutch_words_full.sqlite")
```
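`monitor_processing_progress()` reads the `processing_log` table, which `process_articles_multithreaded` writes to at the end of a run. To peek at the `words` table while a run is still going, a separate read-only connection is one option. The sketch assumes the `words` table and `total_frequency` column used by `analyze_word_database`; opening with `mode=ro` makes SQLite raise an error for a missing file instead of creating an empty one, and the `timeout` lets the reader wait briefly if a writer holds a lock. `count_words_readonly` is an illustrative name.

```python
import sqlite3

def count_words_readonly(db_path):
    """Return (unique word rows, summed frequency) using a read-only connection."""
    # mode=ro: fail on a missing file rather than silently creating an empty database
    conn = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True, timeout=5)
    try:
        unique, total = conn.execute(
            "SELECT COUNT(*), COALESCE(SUM(total_frequency), 0) FROM words"
        ).fetchone()
        return unique, total
    finally:
        conn.close()
```

Usage: `count_words_readonly("output/dutch_words_full.sqlite")`, run from a second notebook or terminal while processing continues.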

### 🔄 Resuming Interrupted Processing

**Safe Resume Process:**
1. Check interrupt status: `check_interrupt_status()`
2. Reset interrupt flag: `reset_interrupt()`
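3. Make sure already-processed articles will be skipped: `process_articles_multithreaded` resubmits every article in `df`, so unless the per-batch worker skips articles that are already in the database, their words are counted again (the Step 7 sample reported 59,392 word instances in the database for 29,696 extracted words, exactly double)
4. Re-run the processing cell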
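A minimal sketch of making re-runs skip already-processed articles, assuming a hypothetical `processed_articles` table (not part of this notebook's schema) keyed by an article identifier. The ids are stored as text, so pass them as strings to make the membership check match what SQLite returns.

```python
import sqlite3

def ensure_progress_table(conn):
    """Create the hypothetical progress table if it does not exist yet."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS processed_articles (article_id TEXT PRIMARY KEY)"
    )

def unprocessed_ids(conn, article_ids):
    """Return the ids (as strings) not yet recorded, preserving input order."""
    ensure_progress_table(conn)
    done = {row[0] for row in conn.execute("SELECT article_id FROM processed_articles")}
    return [a for a in article_ids if a not in done]

def mark_processed(conn, article_ids):
    """Record finished articles; ideally run in the same transaction as that
    batch's word inserts, so a batch is either fully counted or not at all."""
    ensure_progress_table(conn)
    conn.executemany(
        "INSERT OR IGNORE INTO processed_articles (article_id) VALUES (?)",
        [(a,) for a in article_ids],
    )
```

Before calling `process_articles_multithreaded`, the DataFrame could then be filtered to the ids returned by `unprocessed_ids`; which column holds the article id is an assumption about the dataset.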

### ⚡ Performance vs Safety Balance

**Trade-offs:**
- **More frequent saves**: Slower processing, safer against data loss
- **Larger batches**: Faster processing, more data loss risk if interrupted
- **More threads**: Higher resource usage, harder to interrupt cleanly

**Recommendations:**
- Use **500 article batches** for optimal balance
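- Set **num_threads = CPU_cores - 2** to leave headroom for the OS and the notebook kernel (the system analysis cell uses the slightly more aggressive `min(8, CPU cores - 1)`)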
- Monitor system resources during processing
- Always test interrupt functionality with small samples first
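The "larger batches, more data-loss risk" trade-off can be bounded with simple arithmetic. Assuming (hypothetically; the worker's commit strategy is not shown here) that each batch commits once when it finishes, a hard stop such as a killed kernel loses at most one in-flight batch per thread. `max_redone_articles` is an illustrative helper.

```python
def max_redone_articles(num_threads, batch_size):
    """Upper bound on articles whose work is lost after a hard stop, assuming each
    batch commits once when it finishes and each thread holds one batch at a time."""
    return num_threads * batch_size

TOTAL_ARTICLES = 295_000  # approximate dataset size used elsewhere in this notebook
for threads, size in [(3, 20), (8, 250), (8, 500)]:
    lost = max_redone_articles(threads, size)
    print(f"{threads} threads x {size} per batch: up to {lost:,} articles "
          f"({lost / TOTAL_ARTICLES:.2%} of the dataset)")
```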

## Step 6: Analysis and Export Functions

Define the analysis and export functions that will be used to analyze the word database and create various exports for different use cases.

In [13]:
def analyze_word_database(db_path="output/words_database.sqlite"):
    """
    Analyze the word database and generate statistics.
    
    Args:
        db_path (str): Path to the SQLite database
    """
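    # sqlite3.connect() silently creates a new empty file, so check existence first
    if not os.path.exists(db_path):
        print(f"Database file not found: {db_path}")
        return

    try:
        conn = sqlite3.connect(db_path)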
        
        print(f"=== WORD DATABASE ANALYSIS ===")
        print(f"Database: {db_path}")
        
        # Basic statistics
        cursor = conn.cursor()
        
        cursor.execute('SELECT COUNT(*) FROM words')
        total_unique_words = cursor.fetchone()[0]
        
        cursor.execute('SELECT SUM(total_frequency) FROM words')
        total_word_instances = cursor.fetchone()[0] or 0  # SUM() returns NULL on an empty table
        
        cursor.execute('SELECT COUNT(DISTINCT pos_category) FROM words')
        pos_categories = cursor.fetchone()[0]
        
        print(f"\nBasic Statistics:")
        print(f"  Unique words: {total_unique_words:,}")
        print(f"  Total word instances: {total_word_instances:,}")
        print(f"  POS categories: {pos_categories}")
        
        # Top words by frequency
        cursor.execute('''
            SELECT word, lemma, pos_category, total_frequency 
            FROM words 
            ORDER BY total_frequency DESC 
            LIMIT 20
        ''')
        top_words = cursor.fetchall()
        
        print(f"\nTop 20 Most Frequent Words:")
        for i, (word, lemma, pos, freq) in enumerate(top_words, 1):
            print(f"  {i:2d}. {word} ({lemma}) [{pos}] - {freq:,} times")
        
        # Words by POS category
        cursor.execute('''
            SELECT pos_category, COUNT(*) as count, AVG(total_frequency) as avg_freq
            FROM words 
            GROUP BY pos_category 
            ORDER BY count DESC
        ''')
        pos_stats = cursor.fetchall()
        
        print(f"\nWords by POS Category:")
        for pos, count, avg_freq in pos_stats:
            print(f"  {pos}: {count:,} words (avg freq: {avg_freq:.1f})")
        
        # Yearly trends
        cursor.execute('''
            SELECT year, COUNT(*) as word_count, SUM(frequency) as total_freq
            FROM word_frequencies 
            GROUP BY year 
            ORDER BY year
        ''')
        yearly_trends = cursor.fetchall()
        
        print(f"\nYearly Word Trends:")
        for year, word_count, total_freq in yearly_trends:
            print(f"  {year}: {word_count:,} unique words, {total_freq:,} total instances")
        
        conn.close()
        
    except sqlite3.Error as e:
        print(f"Database error: {e}")
    except FileNotFoundError:
        print(f"Database file not found: {db_path}")

def export_word_lists(db_path="output/words_database.sqlite", output_dir="output/exports"):
    """
    Export word lists in various formats for different use cases.
    
    Args:
        db_path (str): Path to the SQLite database
        output_dir (str): Directory to save exports
    """
    import os
    
    try:
        os.makedirs(output_dir, exist_ok=True)
        conn = sqlite3.connect(db_path)
        
        print(f"=== EXPORTING WORD LISTS ===")
        print(f"Output directory: {output_dir}")
        
        # 1. All words list (for general use)
        print("\n1. Exporting all words list...")
        cursor = conn.cursor()
        cursor.execute('SELECT DISTINCT word FROM words ORDER BY word')
        all_words = [row[0] for row in cursor.fetchall()]
        
        with open(f"{output_dir}/all_words.txt", 'w', encoding='utf-8') as f:
            for word in all_words:
                f.write(word + '\n')
        print(f"   Exported {len(all_words):,} words to all_words.txt")
        
        # 2. Common words (frequency >= 10)
        print("\n2. Exporting common words (frequency >= 10)...")
        cursor.execute('SELECT word, total_frequency FROM words WHERE total_frequency >= 10 ORDER BY total_frequency DESC')
        common_words = cursor.fetchall()
        
        with open(f"{output_dir}/common_words.txt", 'w', encoding='utf-8') as f:
            for word, freq in common_words:
                f.write(f"{word}\t{freq}\n")
        print(f"   Exported {len(common_words):,} words to common_words.txt")
        
        # 3. Words by POS category
        print("\n3. Exporting words by POS category...")
        pos_categories = ['noun', 'verb', 'adjective', 'adverb']
        
        for pos in pos_categories:
            cursor.execute('''
                SELECT word, total_frequency 
                FROM words 
                WHERE pos_category = ? 
                ORDER BY total_frequency DESC
            ''', (pos,))
            pos_words = cursor.fetchall()
            
            with open(f"{output_dir}/{pos}_words.txt", 'w', encoding='utf-8') as f:
                for word, freq in pos_words:
                    f.write(f"{word}\t{freq}\n")
            print(f"   Exported {len(pos_words):,} {pos} words to {pos}_words.txt")
        
        # 4. CSV export with full data
        print("\n4. Exporting full data to CSV...")
        cursor.execute('''
            SELECT word, lemma, pos_tag, pos_category, total_frequency, first_seen, last_seen
            FROM words 
            ORDER BY total_frequency DESC
        ''')
        
        import csv
        with open(f"{output_dir}/words_full_data.csv", 'w', newline='', encoding='utf-8') as f:
            writer = csv.writer(f)
            writer.writerow(['word', 'lemma', 'pos_tag', 'pos_category', 'total_frequency', 'first_seen', 'last_seen'])
            writer.writerows(cursor.fetchall())
        print(f"   Exported full data to words_full_data.csv")
        
        # 5. Game-friendly word list (4-8 letters, common words)
        print("\n5. Exporting game-friendly word list...")
        cursor.execute('''
            SELECT word, total_frequency 
            FROM words 
            WHERE LENGTH(word) BETWEEN 4 AND 8 
            AND total_frequency >= 5
            AND pos_category IN ('noun', 'verb', 'adjective')
            ORDER BY total_frequency DESC
        ''')
        game_words = cursor.fetchall()
        
        with open(f"{output_dir}/game_words.txt", 'w', encoding='utf-8') as f:
            for word, freq in game_words:
                f.write(word + '\n')
        print(f"   Exported {len(game_words):,} words to game_words.txt")
        
        conn.close()
        print(f"\n✅ All exports completed in: {output_dir}")
        
    except Exception as e:
        print(f"Export error: {e}")

print("Analysis and export functions defined and ready for use.")

Analysis and export functions defined and ready for use.
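`common_words.txt` and the per-POS files are written as `word<TAB>frequency` lines. A small reader for those files is sketched below. Because those exports select `word` without `DISTINCT`, the same surface form may appear on several lines if the `words` table stores it under different lemmas or POS tags; the sketch sums such repeats rather than keeping only the last one. `read_frequency_list` is an illustrative name.

```python
import csv

def read_frequency_list(path):
    """Read a word<TAB>frequency export into {word: frequency}, summing repeated words."""
    freqs = {}
    with open(path, encoding="utf-8", newline="") as f:
        # QUOTE_NONE: treat quote characters inside words as ordinary text
        for row in csv.reader(f, delimiter="\t", quoting=csv.QUOTE_NONE):
            if len(row) != 2:
                continue  # skip blank or malformed lines
            word, freq = row
            freqs[word] = freqs.get(word, 0) + int(freq)
    return freqs
```

Usage: `read_frequency_list("output/exports/common_words.txt")`.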


## Step 7: Test Processing with Small Sample

Test the processing pipeline with a small sample of articles to verify everything works correctly before running on the full dataset.

In [14]:
# Test the processing pipeline with a small sample first
SAMPLE_SIZE = 100  # Number of articles to test with

print("🧪 Testing word extraction pipeline with small sample...")
print(f"Sample size: {SAMPLE_SIZE} articles")

# Load the full dataset if not already loaded
if 'df' not in locals() or df is None:
    print("Loading dataset...")
    df = pd.read_feather("data/NOS_NL_articles_2015_mar_2025.feather")
    print(f"Dataset loaded: {df.shape}")

# Create sample dataset
sample_df = df.head(SAMPLE_SIZE)
print(f"Created sample with {len(sample_df)} articles")

# Run the processing pipeline on sample
print("\nStarting sample processing...")
test_results = process_articles_pipeline(
    df=sample_df,
    nlp_model=nlp,
    db_path="output/test_dutch_words.sqlite",
    batch_size=25  # Small batches for testing
)

print("✅ Sample processing complete!")
print(f"Test results: {test_results}")

# Quick analysis of test results
if os.path.exists("output/test_dutch_words.sqlite"):
    print("\n📊 Quick analysis of sample results:")
    analyze_word_database("output/test_dutch_words.sqlite")
    
    # Export sample results
    print("\n📁 Creating sample exports...")
    export_word_lists("output/test_dutch_words.sqlite", "output/test_exports")
    
    print(f"\n✅ Sample testing completed successfully!")
    print(f"Ready to process full dataset in the next step.")
else:
    print("❌ Test database was not created properly.")

🧪 Testing word extraction pipeline with small sample...
Sample size: 100 articles
Created sample with 100 articles

Starting sample processing...
Database setup complete: output/test_dutch_words.sqlite
Starting processing of 100 articles...
Batch size: 25


Processing batches:  25%|██▌       | 1/4 [00:01<00:04,  1.56s/it]

Processed 25/100 articles, extracted 6,787 words


Processing batches: 100%|██████████| 4/4 [00:06<00:00,  1.65s/it]


=== PROCESSING COMPLETE ===
Total articles processed: 100
Total words extracted: 29,696
Database saved to: output/test_dutch_words.sqlite
Unique words in database: 6,195
POS categories found: 13
Yearly distribution:
  2015: 6,195 word instances
✅ Sample processing complete!
Test results: {'articles_processed': 100, 'words_extracted': 29696, 'unique_words': 6195, 'database_path': 'output/test_dutch_words.sqlite'}

📊 Quick analysis of sample results:
=== WORD DATABASE ANALYSIS ===
Database: output/test_dutch_words.sqlite

Basic Statistics:
  Unique words: 6,195
  Total word instances: 59,392
  POS categories: 13

Top 20 Most Frequent Words:
   1. de (de) [determiner] - 3,724 times
   2. in (in) [preposition] - 1,764 times
   3. van (van) [preposition] - 1,704 times
   4. een (een) [determiner] - 1,560 times
   5. het (het) [determiner] - 1,384 times
   6. en (en) [conjunction] - 1,032 times
   7. is (zijn) [auxiliary] - 860 times
   8. op (op) [preposition] - 784 times
   9. met (met) [
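The numbers printed above do not agree: `process_articles_pipeline` reported 29,696 extracted words, while `analyze_word_database` found 59,392 total word instances in the same database, exactly twice as many. That pattern is consistent with the sample being processed into the same database twice (for example, by re-running the cell), though the output alone cannot confirm the cause. A quick consistency check, assuming the `words.total_frequency` column used elsewhere in this notebook, might look like this (`check_frequency_total` is an illustrative name):

```python
import sqlite3

def check_frequency_total(db_path, words_extracted):
    """Return (SUM(total_frequency), ratio to the pipeline's reported word count)."""
    conn = sqlite3.connect(db_path)
    try:
        (stored,) = conn.execute(
            "SELECT COALESCE(SUM(total_frequency), 0) FROM words"
        ).fetchone()
    finally:
        conn.close()
    ratio = stored / words_extracted if words_extracted else float("nan")
    return stored, ratio
```

Usage: `check_frequency_total("output/test_dutch_words.sqlite", test_results["words_extracted"])`. A ratio of 1.0 means the database matches the run that just finished; a ratio near 2.0 suggests the same articles were counted twice.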




## Step 8: Full Dataset Processing (Execute with Caution)

**WARNING**: This step will process all 295k+ articles and may take several hours. Only run when ready!

Run this step only after successfully testing with the sample in Step 7.

In [None]:
# Process the full dataset with enhanced multi-threading (logging + interrupt handling)
print("🚀 Starting ENHANCED MULTI-THREADED processing of ALL articles...")
print("⚡ Features: Detailed logging, interrupt handling, progress monitoring")
print("🛑 You can interrupt processing safely with Ctrl+C or notebook interrupt")

# Reset interrupt flag
reset_interrupt()

# Load the full dataset if not already loaded
if 'df' not in locals() or df is None:
    print("Loading dataset...")
    df = pd.read_feather("data/NOS_NL_articles_2015_mar_2025.feather")
    print(f"Dataset loaded: {df.shape}")

print(f"Processing {len(df):,} articles with enhanced multi-threading...")

# Derive a thread count from the available hardware
import logging
import multiprocessing

import psutil

cpu_count = multiprocessing.cpu_count()
memory = psutil.virtual_memory()
available_gb = memory.available / (1024**3)

# Leave one core free, cap at 8 threads, and never go below 2
recommended_threads = max(2, min(8, cpu_count - 1))
if available_gb < 4:  # Each thread loads its own model, so back off when memory is tight
    recommended_threads = max(2, recommended_threads - 1)

print("💡 System recommendations:")
print(f"   CPU cores: {cpu_count}")
print(f"   Available memory: {available_gb:.1f} GB")
print(f"   Recommended threads: {recommended_threads}")

# Run the ENHANCED MULTI-THREADED processing pipeline.
# Reset `results` first so a dict left over from an earlier cell (e.g. the
# Step 7 test run) is never mistaken for the outcome of this run.
full_db_path = "output/dutch_words_full.sqlite"
results = None
try:
    results = process_articles_multithreaded(
        df=df,
        nlp_models=nlp,  # Will create copies for each thread
        db_path=full_db_path,
        batch_size=500,  # Optimized batch size
        num_threads=recommended_threads,  # Use system recommendations
        log_level=logging.INFO  # Detailed logging
    )

    interrupted = results.get('interrupted', False)
    status = "INTERRUPTED" if interrupted else "COMPLETED"
    print(f"\n{'⚠️' if interrupted else '✅'} FULL PROCESSING {status}!")

    print("\n📊 Final Results:")
    print(f"   Articles processed: {results['articles_processed']:,}")
    print(f"   Words extracted: {results['words_extracted']:,}")
    print(f"   Processing time: {results['processing_time']:.1f} seconds")
    print(f"   Articles per second: {results['articles_per_second']:.1f}")
    print(f"   Errors encountered: {results['total_errors']}")
    print(f"   Batches completed: {results['batches_completed']}/{results['batches_total']}")

    if interrupted:
        print("\n⚠️ Processing was interrupted, but the data processed so far has been saved.")
        print("📄 You can resume processing by running this cell again.")
        print("📊 Progress monitoring is available with: monitor_processing_progress()")

except KeyboardInterrupt:
    print("\n🛑 Processing interrupted by user")
    print("📁 Partial results saved to database")
except Exception as e:
    print(f"\n❌ Error during processing: {e}")
    logger.error(f"Processing failed: {e}")

# Always run the analysis if the database exists
if os.path.exists(full_db_path):
    print("\n📊 Database analysis:")
    try:
        analyze_word_database(full_db_path)

        # Export only when THIS run finished without interruption
        if results is not None and not results.get('interrupted', False):
            print("\n📁 Creating production exports...")
            export_word_lists(full_db_path, "output/exports")
            print("\n🎉 COMPLETE! Full Dutch word database created successfully!")
        else:
            print("\n💾 Partial results available in database")

    except Exception as e:
        print(f"❌ Error during analysis: {e}")
else:
    print(f"❌ Database file not found: {full_db_path}")

22:33:34 - INFO - [Thread-25780] - 🔄 Interrupt flag reset - ready for new processing
22:33:34 - INFO - [Thread-25780] - 🚀 Starting multi-threaded processing...
22:33:34 - INFO - [Thread-25780] - 📊 Configuration:
22:33:34 - INFO - [Thread-25780] -    Articles to process: 295,259
22:33:34 - INFO - [Thread-25780] -    Batch size: 500
22:33:34 - INFO - [Thread-25780] -    Number of threads: 8
22:33:34 - INFO - [Thread-25780] -    Database path: output/dutch_words_full.sqlite
22:33:34 - INFO - [Thread-25780] -    Start time: 2025-08-01 22:33:34
22:33:34 - INFO - [Thread-25780] - 🗄️ Setting up database...
22

🚀 Starting ENHANCED MULTI-THREADED processing of ALL articles...
⚡ Features: Detailed logging, interrupt handling, progress monitoring
🛑 You can interrupt processing safely with Ctrl+C or notebook interrupt
Processing 295,259 articles with enhanced multi-threading...
💡 System recommendations:
   CPU cores: 24
   Available memory: 13.1 GB
   Recommended threads: 8
Database setup complete: output/dutch_words_full.sqlite


22:33:35 - INFO - [Thread-25780] -    ✅ Thread 1 model loaded
22:33:36 - INFO - [Thread-25780] -    ✅ Thread 2 model loaded
22:33:36 - INFO - [Thread-25780] -    ✅ Thread 3 model loaded
22:33:37 - INFO - [Thread-25780] -    ✅ Thread 4 model loaded
22:33:38 - INFO - [Thread-25780] -    ✅ Thread 5 model loaded
22:33:39 - INFO - [Thread-25780] -    ✅ Thread 6 model loaded
22:33:40 - INFO - [Thread-25780] -    ✅ Thread 7 model loaded
22:33:40 - INFO - [Thread-25780] -    ✅ Thread 8 model loaded
22:33:40 - INFO - [Thread-25780] - 📦 Creating batches...
22:33:40 - IN
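
The thread-count heuristic in the cell above can be pulled into a pure function so it is easy to check in isolation. The rules are: leave one core free, cap the total at 8 threads, never go below 2, and drop one thread when less than 4 GB of memory is available. `recommend_threads` is a hypothetical helper name; the rules are the ones the cell applies.

```python
def recommend_threads(cpu_count: int, available_gb: float,
                      max_threads: int = 8, low_memory_gb: float = 4.0) -> int:
    """Mirror the cell's heuristic for choosing a worker-thread count."""
    threads = max(2, min(max_threads, cpu_count - 1))  # keep one core free, cap the total
    if available_gb < low_memory_gb:
        threads = max(2, threads - 1)  # each thread loads its own model, so back off
    return threads


print(recommend_threads(24, 13.1))  # 8  (the machine in the log above)
print(recommend_threads(4, 3.0))    # 2
```
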

## Step 9: Analysis and Export

Analyze the extracted words and create various exports for different use cases.
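
As an illustration of the kind of query behind a "Top 20 Most Frequent Words" listing, the sketch below reads the most frequent entries from SQLite into pandas and renders them as CSV. The table and column names (`word_frequencies`, `word`, `lemma`, `pos`, `frequency`) are hypothetical; the notebook's `analyze_word_database` and `export_word_lists` work against their own schema.

```python
import sqlite3

import pandas as pd


def top_words(conn: sqlite3.Connection, limit: int = 20) -> pd.DataFrame:
    # Assumes a hypothetical table word_frequencies(word, lemma, pos, frequency)
    query = """
        SELECT word, lemma, pos, frequency
        FROM word_frequencies
        ORDER BY frequency DESC
        LIMIT ?
    """
    return pd.read_sql_query(query, conn, params=(limit,))


# In-memory demo; the counts are copied from the Step 7 sample output
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE word_frequencies (word TEXT, lemma TEXT, pos TEXT, frequency INTEGER)")
conn.executemany(
    "INSERT INTO word_frequencies VALUES (?, ?, ?, ?)",
    [("van", "van", "preposition", 1704), ("de", "de", "determiner", 3724),
     ("is", "zijn", "auxiliary", 860), ("in", "in", "preposition", 1764)],
)

top = top_words(conn, limit=3)
print(top["word"].tolist())  # ['de', 'in', 'van']

# Pass a path (e.g. under output/exports) instead of None to write a file
csv_text = top.to_csv(index=False)
```
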