# NER + TF-IDF Topic Extraction Backfill

This notebook processes existing news articles to extract topics using Named Entity Recognition (NER) with Google Gemini and TF-IDF scoring.

## Overview
1. Load articles from database
2. Extract entities using Gemini NER
3. Calculate TF-IDF scores
4. Store topics and update trending topics
5. Generate statistics and visualizations


# NER + TF-IDF Topic Extraction Backfill

This notebook processes existing news articles to extract topics using Named Entity Recognition (NER) with Google Gemini and TF-IDF scoring.

## Overview
1. Load articles from database
2. Extract entities using Gemini NER
3. Calculate TF-IDF scores
4. Store topics and update trending topics
5. Generate statistics and visualizations


## 1. Setup and Configuration


In [None]:
import asyncio
import json
import os
import re
import time
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional

import numpy as np
import pandas as pd
from tqdm import tqdm

# Guard heavy plotting libs to avoid NumPy ABI issues in headless runs
try:
    import matplotlib.pyplot as plt
    import seaborn as sns
except Exception as e:
    plt = None
    sns = None
    print(f"⚠️ Skipping matplotlib/seaborn due to import issue: {e}")

# Database and AI imports
import libsql_client
import google.generativeai as genai
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

# Load environment variables
from dotenv import load_dotenv
load_dotenv()

print("✅ All imports successful (plotting libs optional)")


In [None]:
# Configuration
BATCH_SIZE = 15  # Articles per batch; progress is reported per batch
RATE_LIMIT_DELAY = 2  # Seconds between API calls
MAX_ARTICLES = None  # None (or 0) processes every pending article; a positive int caps the run
RUN_PROCESSING = True  # Master switch: set to False to skip the slow, API-consuming processing step

# Entity types for NER. In this notebook the list is only printed below for
# reference; the extraction prompt spells out the same types explicitly.
ENTITY_TYPES = [
    'PERSON', 'ORG', 'LOCATION', 'PRODUCT', 'PROGRAMMING_LANGUAGE',
    'SCIENTIFIC_TERM', 'FIELD_OF_STUDY', 'EVENT', 'WORK_OF_ART', 'LAW_OR_POLICY'
]

# TF-IDF configuration (keyword arguments for scikit-learn's TfidfVectorizer)
TFIDF_CONFIG = {
    'max_features': 1000,
    'stop_words': 'english',
    'ngram_range': (1, 2),  # unigrams and bigrams
    'min_df': 2,  # minimum document frequency
    'sublinear_tf': True  # log scaling
}

print("📊 Configuration loaded:")
print(f"   Batch size: {BATCH_SIZE}")
print(f"   Rate limit: {RATE_LIMIT_DELAY}s")
print(f"   Max articles: {MAX_ARTICLES or 'All'}")
print(f"   Entity types: {ENTITY_TYPES}")
print(f"   Run processing: {RUN_PROCESSING}")


## 2. Database Connection


In [None]:
# Initialize database connection
database_url = os.getenv('TURSO_DATABASE_URL')
auth_token = os.getenv('TURSO_AUTH_TOKEN')

if not database_url or not auth_token:
    raise ValueError("Missing TURSO_DATABASE_URL or TURSO_AUTH_TOKEN in environment")

# Force HTTP(S) transport for cloud Turso instead of WebSockets. Only the URL
# *scheme* is rewritten: replacing the first "libsql" substring anywhere in the
# URL would also mangle hosts/paths containing that word, and would leave a
# wss:// URL on the WebSocket transport.
_HTTP_SCHEME_FOR = {'libsql': 'https', 'wss': 'https', 'ws': 'http'}
_scheme, _sep, _rest = database_url.partition('://')
_http_scheme = _HTTP_SCHEME_FOR.get(_scheme.lower()) if _sep else None
http_url = f"{_http_scheme}://{_rest}" if _http_scheme else database_url

# Create database client
client = libsql_client.create_client(
    url=http_url,
    auth_token=auth_token
)

print("✅ Database connection established (HTTPS)")


In [None]:
# Test database connection (Fixed - using await)
try:
    result = await client.execute("SELECT COUNT(*) FROM news_articles")
    total_articles = result.rows[0][0]
    print(f"📰 Found {total_articles} articles in database")
    
    # Check existing topics
    result = await client.execute("SELECT COUNT(*) FROM article_topics")
    existing_topics = result.rows[0][0]
    print(f"🏷️  Found {existing_topics} existing topics")
    
except Exception as e:
    print(f"❌ Database test failed: {e}")
    raise


In [None]:
# Database helper functions (Fixed - using async/await)

async def get_articles_without_topics():
    """Return every ``news_articles`` row that has no ``article_topics`` row yet.

    The lookup is an anti-join: LEFT JOIN onto ``article_topics`` and keep only
    rows where the joined topic id is NULL. A truthy ``MAX_ARTICLES`` appends a
    LIMIT clause; ``None``/``0`` fetches everything.

    Returns:
        The ``rows`` of the query result (all ``news_articles`` columns).
    """
    base_query = """
        SELECT na.* 
        FROM news_articles na
        LEFT JOIN article_topics at ON na.id = at.article_id
        WHERE at.id IS NULL
    """
    limit_suffix = f" LIMIT {MAX_ARTICLES}" if MAX_ARTICLES else ""
    query_result = await client.execute(base_query + limit_suffix)
    return query_result.rows


async def store_article_topics(article_id: str, topics: List[Dict]):
    """Insert one ``article_topics`` row per topic for *article_id*.

    All rows are sent in a single ``client.batch`` call, which the libsql client
    runs as one transaction, so the article ends up with either all of its
    topics or none. This matters because get_articles_without_topics only
    re-selects articles with zero topic rows: a partial write would never be
    retried.

    Args:
        article_id: ``news_articles.id`` the topics belong to.
        topics: Dicts with ``'text'`` and ``'type'``; ``'tfidf_score'`` defaults
            to 0.5 and ``'confidence'`` to 0.8 when absent.
    """
    if not topics:
        return  # Nothing to write; also avoids sending an empty batch.

    statements = [
        libsql_client.Statement(
            """
            INSERT INTO article_topics (id, article_id, entity_text, entity_type, tfidf_score, ner_confidence)
            VALUES (?, ?, ?, ?, ?, ?)
            """,
            [
                # Millisecond timestamp + 4 random bytes keeps ids unique within a burst.
                f"topic_{int(time.time() * 1000)}_{os.urandom(4).hex()}",
                article_id,
                topic['text'],
                topic['type'],
                topic.get('tfidf_score', 0.5),
                topic.get('confidence', 0.8),
            ],
        )
        for topic in topics
    ]
    await client.batch(statements)


def _running_mean(previous_mean: Optional[float], previous_count: Optional[int], new_value: float) -> float:
    """Return the mean after adding *new_value* to *previous_count* earlier samples.

    A NULL mean or a missing/non-positive count means there is no usable
    history, so *new_value* itself is returned.
    """
    if previous_mean is None or previous_count is None or previous_count <= 0:
        return float(new_value)
    return (previous_mean * previous_count + new_value) / (previous_count + 1)


def _append_article_id(article_ids_json: Optional[str], article_id: Optional[str]) -> str:
    """Return the JSON list in *article_ids_json* with *article_id* appended once.

    Missing, malformed, or non-list JSON is treated as an empty list; a ``None``
    article id is not appended.
    """
    try:
        ids = json.loads(article_ids_json) if article_ids_json else []
    except (TypeError, ValueError):
        ids = []
    if not isinstance(ids, list):
        ids = []
    if article_id is not None and article_id not in ids:
        ids.append(article_id)
    return json.dumps(ids)


async def update_trending_topics(topics: List[Dict], article_id: Optional[str] = None):
    """Upsert each topic into ``trending_topics``, matching on text case-insensitively.

    Existing topic: ``occurrence_count`` is incremented, ``avg_tfidf_score``
    becomes the true running mean over all occurrences (the stored count is the
    number of scores already averaged), and ``last_seen_at`` is refreshed.
    New topic: inserted with count 1 and the topic's score as its average.

    Args:
        topics: Dicts with ``'text'`` and ``'type'``; ``'tfidf_score'`` defaults to 0.5.
        article_id: Optional source article id. When given, it is added to the
            JSON ``article_ids`` list. When omitted, ``article_ids`` is left
            unchanged for existing rows and set to ``'[]'`` for new ones.
    """
    for topic in topics:
        score = topic.get('tfidf_score', 0.5)

        existing = await client.execute(
            """
            SELECT id, occurrence_count, avg_tfidf_score, article_ids
            FROM trending_topics
            WHERE LOWER(topic_text) = LOWER(?)
            """,
            [topic['text']],
        )

        if existing.rows:
            row = existing.rows[0]
            new_score = _running_mean(row['avg_tfidf_score'], row['occurrence_count'], score)
            # Passing NULL keeps the stored list untouched (see COALESCE below).
            new_article_ids = (
                _append_article_id(row['article_ids'], article_id)
                if article_id is not None
                else None
            )

            await client.execute(
                """
                UPDATE trending_topics
                SET occurrence_count = COALESCE(occurrence_count, 0) + 1,
                    avg_tfidf_score = ?,
                    article_ids = COALESCE(?, article_ids),
                    last_seen_at = CURRENT_TIMESTAMP
                WHERE id = ?
                """,
                [new_score, new_article_ids, row['id']],
            )
        else:
            topic_id = f"trending_{int(time.time() * 1000)}_{os.urandom(4).hex()}"

            await client.execute(
                """
                INSERT INTO trending_topics (id, topic_text, entity_type, occurrence_count, avg_tfidf_score, article_ids)
                VALUES (?, ?, ?, ?, ?, ?)
                """,
                [
                    topic_id,
                    topic['text'],
                    topic['type'],
                    1,
                    score,
                    _append_article_id('[]', article_id),
                ],
            )

print("✅ Database helper functions ready (async/await)")


In [None]:
# NER with Gemini

def build_entity_extraction_prompt(title: str, content: str, max_content_chars: int = 1500) -> str:
    """Build the Gemini prompt asking for 5-10 typed entities as a raw JSON array.

    Args:
        title: Article title; ``None`` is rendered as an empty string.
        content: Article body; ``None`` is treated as empty. Only the first
            *max_content_chars* characters are included, to bound prompt size.
        max_content_chars: Truncation limit for *content* (default 1500).

    Returns:
        The prompt text. The JSON example braces are doubled (``{{``/``}}``)
        because the template is an f-string.
    """
    # NULL database columns would otherwise render as "None" or raise
    # TypeError when sliced. The truncation is done here, outside the
    # template, so nothing but the article text ends up in the prompt.
    title = title or ""
    content = (content or "")[:max_content_chars]
    return f"""You are an expert NLP system. Your task is to extract the 5-10 most important named entities and key concepts from the following news article.

Focus on identifying specific and relevant items. Use the following entity types:
- **PERSON**: People, scientists, researchers.
- **ORG**: Organizations, companies, institutions (e.g., "NASA", "Google").
- **LOCATION**: Geographical places, countries, cities.
- **PRODUCT**: Specific software, hardware, or services (e.g., "iPhone 17", "GitHub Copilot").
- **PROGRAMMING_LANGUAGE**: Programming languages (e.g., "Python", "Rust").
- **SCIENTIFIC_TERM**: Specific scientific concepts, theories, species, or astronomical bodies (e.g., "black hole", "CRISPR").
- **FIELD_OF_STUDY**: Broader domains of knowledge (e.g., "Machine Learning", "Astrophysics").
- **EVENT**: Specific named events, conferences, or historical periods (e.g., "WWDC 2025", "The Renaissance").
- **WORK_OF_ART**: Named creative works like books, films, or paintings.
- **LAW_OR_POLICY**: Named laws, regulations, or policies (e.g., "GDPR").

**Article to Analyze:**
Title: {title}
Content: {content}

**Instructions:**
1.  Analyze the title and content to find the most significant topics.
2.  Do not extract generic or overly broad terms (e.g., "science", "research").
3.  Return **ONLY** a raw JSON array with the specified format. Do not add any introductory text, explanations, or markdown formatting like ```json.

**JSON Output Format:**
[
  {{"text": "entity name", "type": "TYPE_FROM_LIST_ABOVE"}},
  {{"text": "another entity", "type": "TYPE_FROM_LIST_ABOVE"}}
]"""


def _parse_entity_json(text: str) -> List[Dict]:
    """Turn Gemini's reply text into normalised topic dicts.

    The outermost ``[...]`` span is parsed, so stray prose or markdown fences
    around the array are tolerated. Items that are not objects, or that lack a
    non-empty string ``"text"``, are skipped individually instead of one bad
    item discarding the whole article. A missing or blank ``"type"`` becomes
    ``'SCIENTIFIC_TERM'``. Case-insensitive duplicates are dropped so an entity
    is stored and counted only once per article.

    Raises:
        json.JSONDecodeError: if the bracketed span is not valid JSON.
    """
    json_start = text.find('[')
    json_end = text.rfind(']') + 1
    if json_start < 0 or json_end <= json_start:
        return []

    topics: List[Dict] = []
    seen = set()
    for item in json.loads(text[json_start:json_end]):
        if not isinstance(item, dict):
            continue
        entity_text = item.get('text')
        if not isinstance(entity_text, str) or not entity_text.strip():
            continue
        entity_text = entity_text.strip()
        dedupe_key = entity_text.casefold()
        if dedupe_key in seen:
            continue
        seen.add(dedupe_key)

        entity_type = item.get('type')
        if not isinstance(entity_type, str) or not entity_type.strip():
            entity_type = 'SCIENTIFIC_TERM'

        topics.append({
            'text': entity_text,
            'type': entity_type,
            'confidence': 0.8,
            'tfidf_score': 0.5,  # Placeholder; replaced by update_topic_tfidf_scores
        })
    return topics


def extract_topics_with_gemini(title: str, content: str) -> List[Dict]:
    """Ask Gemini for the article's key entities and return them as topic dicts.

    Best-effort: any failure (API error, malformed JSON, or ``model`` being
    ``None``) is printed and yields ``[]``, which the processing loop treats as
    "no topics extracted".

    Returns:
        Dicts with ``text``, ``type``, ``confidence`` (fixed 0.8) and a
        placeholder ``tfidf_score`` of 0.5.
    """
    prompt = build_entity_extraction_prompt(title, content)

    try:
        response = model.generate_content(
            prompt,
            generation_config=genai.types.GenerationConfig(
                temperature=0.3,
                max_output_tokens=1000,
            ),
        )
        return _parse_entity_json(response.text)
    except Exception as e:
        # Deliberately broad: one bad article must not abort the whole backfill.
        print(f"❌ Error extracting topics: {e}")
        return []

# Configure Gemini only when an API key is present. Without one, `model` is
# left as None and the processing loop checks it before extracting topics.
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
if not GOOGLE_API_KEY:
    model = None
    print("⚠️ GOOGLE_API_KEY not set; Gemini NER disabled")
else:
    genai.configure(api_key=GOOGLE_API_KEY)
    model = genai.GenerativeModel('gemini-2.0-flash-lite')
    print("✅ Gemini configured")



In [None]:
# TF-IDF utilities

def calculate_tfidf_scores(articles: List[Dict], config: Optional[Dict[str, Any]] = None) -> Dict[str, float]:
    """Fit TF-IDF over *articles* and return each vocabulary term's mean score.

    Each document is ``"<title> <content>"``; NULL title/content become "".
    The score of a term is its TF-IDF value averaged over all documents.

    Args:
        articles: Rows/dicts with ``'title'`` and ``'content'`` keys.
        config: Vectorizer settings with the same keys as ``TFIDF_CONFIG``
            (max_features, stop_words, ngram_range, min_df, sublinear_tf).
            Defaults to ``TFIDF_CONFIG`` when omitted.

    Returns:
        Mapping of term -> mean TF-IDF score. Empty when there are no documents
        or when scikit-learn finds no usable vocabulary.
    """
    cfg = TFIDF_CONFIG if config is None else config

    documents = [f"{article['title'] or ''} {article['content'] or ''}" for article in articles]
    if not documents:
        return {}

    # An integer min_df larger than the corpus makes scikit-learn raise
    # ("max_df corresponds to < documents than min_df"), which would abort a
    # small incremental run (e.g. a single new article). Clamp it to the corpus size.
    min_df = cfg['min_df']
    if isinstance(min_df, int):
        min_df = min(min_df, len(documents))

    vectorizer = TfidfVectorizer(
        max_features=cfg['max_features'],
        stop_words=cfg['stop_words'],
        ngram_range=cfg['ngram_range'],
        min_df=min_df,
        sublinear_tf=cfg['sublinear_tf'],
    )

    try:
        tfidf_matrix = vectorizer.fit_transform(documents)
    except ValueError as e:
        # scikit-learn raises ValueError for an empty vocabulary (e.g. only stop
        # words) or when document-frequency pruning removes every term.
        print(f"⚠️ TF-IDF skipped: {e}")
        return {}

    feature_names = vectorizer.get_feature_names_out()

    # Average over documents directly on the sparse matrix instead of
    # densifying an (n_documents x n_features) array first.
    mean_scores = np.asarray(tfidf_matrix.mean(axis=0)).ravel()

    return dict(zip(feature_names, mean_scores))


# Same as scikit-learn's default `token_pattern` (tokens of 2+ word characters),
# so entity text is split the same way as the TfidfVectorizer vocabulary.
_TFIDF_TOKEN_RE = re.compile(r"(?u)\b\w\w+\b")


def update_topic_tfidf_scores(topics: List[Dict], tfidf_scores: Dict[str, float]):
    """Attach the best-matching TF-IDF score to each topic, in place.

    A vocabulary term matches a topic when one is a whole-token sub-phrase of
    the other. For example, topic "Black Hole" matches the terms "black",
    "hole" and "black hole". Matching on whole tokens rather than raw
    substrings prevents spurious hits such as topic "AI" matching the term
    "said", or an empty topic matching every term.

    Sets ``topic['tfidf_score']`` to the highest matching score and
    ``topic['matched_term']`` to that term. Topics with no positive-scoring
    match get the default low score 0.1 and no ``matched_term``.

    Args:
        topics: Topic dicts with at least a ``'text'`` key; mutated in place.
        tfidf_scores: Mapping of term -> score, e.g. from calculate_tfidf_scores.
    """
    for topic in topics:
        phrase = " ".join(_TFIDF_TOKEN_RE.findall(topic['text'].lower()))
        # Space padding makes `in` succeed only on whole-token boundaries.
        padded_phrase = f" {phrase} "
        best_score = 0.0
        best_term = None

        if phrase:
            for term, score in tfidf_scores.items():
                padded_term = f" {term} "
                is_match = padded_term in padded_phrase or padded_phrase in padded_term
                if is_match and score > best_score:
                    best_score = score
                    best_term = term

        if best_term:
            topic['tfidf_score'] = best_score
            topic['matched_term'] = best_term
        else:
            topic['tfidf_score'] = 0.1  # Default low score

print("✅ TF-IDF utilities ready")


In [None]:
# Main processing loop (Fixed - using async/await)
async def main_processing():
    if RUN_PROCESSING:
        # Get all articles without topics
        articles = await get_articles_without_topics()
        print(f"📊 Found {len(articles)} articles without topics")

        if len(articles) == 0:
            print("🎉 All articles already have topics!")
        else:
            # Calculate TF-IDF scores for all articles
            print("\n🔢 Calculating TF-IDF scores...")
            tfidf_scores = calculate_tfidf_scores(articles)
            print(f"✅ Calculated TF-IDF for {len(tfidf_scores)} terms")
            
            total_processed = 0
            total_errors = 0
            all_errors = []
            
            print(f"\n🚀 Starting processing of {len(articles)} articles...")
            print("=" * 60)
            
            for i in range(0, len(articles), BATCH_SIZE):
                batch = articles[i:i + BATCH_SIZE]
                batch_num = (i // BATCH_SIZE) + 1
                total_batches = (len(articles) + BATCH_SIZE - 1) // BATCH_SIZE
                
                print(f"\n📦 Processing batch {batch_num}/{total_batches} ({len(batch)} articles)")
                
                for j, article in enumerate(batch):
                    try:
                        print(f"\n[{j+1}/{len(batch)}] Processing: {article['title'][:60]}...")
                        
                        if not model:
                            raise RuntimeError("Gemini model not configured; set GOOGLE_API_KEY")
                        
                        # Extract topics
                        topics = extract_topics_with_gemini(article['title'], article['content'])
                        
                        if not topics:
                            print(f"  ⚠️  No topics extracted")
                            all_errors.append(f"{article['id']}: No topics extracted")
                            total_errors += 1
                            continue
                        
                        # Update with TF-IDF scores
                        update_topic_tfidf_scores(topics, tfidf_scores)
                        
                        # Store topics and update trending
                        await store_article_topics(article['id'], topics)
                        await update_trending_topics(topics)
                        
                        total_processed += 1
                        print(f"  ✅ Extracted {len(topics)} topics")
                        print(f"     Topics: {', '.join([t['text'] for t in topics[:3]])}...")
                        
                        # Rate limiting
                        time.sleep(RATE_LIMIT_DELAY)
                        
                    except Exception as e:
                        print(f"  ❌ Error: {e}")
                        all_errors.append(f"{article['id']}: {str(e)}")
                        total_errors += 1
                
                print(f"✅ Batch {batch_num} complete: {len(batch)} articles processed")
                
                # Delay between batches
                if i + BATCH_SIZE < len(articles):
                    print(f"⏸️  Waiting 3s before next batch...")
                    time.sleep(3)
            
            print("\n" + "=" * 60)
            print("🎉 Processing complete!")
            print(f"✅ Total processed: {total_processed}")
            print(f"❌ Total errors: {total_errors}")
            
            if all_errors:
                print(f"\nError details (first 10):")
                for error in all_errors[:10]:
                    print(f"  - {error}")
    else:
        print("⏹️ RUN_PROCESSING is False. Set to True to process articles.")

# Run the main processing (direct await in Jupyter)
await main_processing()


In [None]:
# Results and statistics (Fixed - using async/await)
async def get_final_statistics():
    # Basic coverage stats (safe to run)
    stats = await client.execute("""
        SELECT 
            COUNT(DISTINCT na.id) as total_articles,
            COUNT(DISTINCT at.article_id) as articles_with_topics,
            COUNT(DISTINCT tt.id) as total_trending_topics
        FROM news_articles na
        LEFT JOIN article_topics at ON na.id = at.article_id
        LEFT JOIN trending_topics tt ON 1=1
    """)

    row = stats.rows[0]
    coverage = (row['articles_with_topics'] / row['total_articles'] * 100) if row['total_articles'] > 0 else 0

    print("\n📊 Final Statistics:")
    print(f"Total articles: {row['total_articles']}")
    print(f"Articles with topics: {row['articles_with_topics']}")
    print(f"Articles without topics: {row['total_articles'] - row['articles_with_topics']}")
    print(f"Total trending topics: {row['total_trending_topics']}")
    print(f"Coverage: {coverage:.2f}%")

    # Top trending topics (safe)
    top_topics = await client.execute("""
        SELECT topic_text, entity_type, occurrence_count, avg_tfidf_score
        FROM trending_topics
        ORDER BY (avg_tfidf_score * occurrence_count) DESC
        LIMIT 10
    """)

    print(f"\n🔥 Top 10 Trending Topics:")
    for i, topic in enumerate(top_topics.rows, 1):
        print(f"{i:2d}. {topic['topic_text']} ({topic['entity_type']}) - "
              f"Count: {topic['occurrence_count']}, Score: {topic['avg_tfidf_score']:.3f}")

    # Optional plotting if libs available
    if plt is not None and sns is not None:
        type_dist = await client.execute("""
            SELECT entity_type, COUNT(*) as count
            FROM trending_topics
            GROUP BY entity_type
            ORDER BY count DESC
        """)
        labels = [r['entity_type'] for r in type_dist.rows]
        counts = [r['count'] for r in type_dist.rows]
        
        plt.figure(figsize=(8,4))
        sns.barplot(x=labels, y=counts)
        plt.title('Topic Distribution by Type')
        plt.xlabel('Entity Type')
        plt.ylabel('Count')
        plt.show()
    else:
        print("(Plotting libraries unavailable; skipping charts)")

# Run the statistics (direct await in Jupyter)
await get_final_statistics()
