
# Blog Content Analysis Notebook
 

### Import requirements and configuration

In [None]:
import os
import json
import pickle
import hashlib
import boto3
import frontmatter
import pandas as pd
import numpy as np
import hdbscan
import umap
import plotly.express as px
import plotly.graph_objects as go
from pathlib import Path
from typing import List, Dict, Any, Optional
import re
from datetime import datetime
import dotenv

# Load environment variables through python-dotenv before reading them below
dotenv.load_dotenv()

# Configuration
# os.getenv returns None for either value when the variable is not set
AWS_REGION = os.getenv("AWS_REGION")
AWS_PROFILE = os.getenv("AWS_PROFILE")
# Relative paths: resolved against the current working directory at run time
BLOG_CONTENT_DIR = Path("../src/content/blog")
DRAFTS_CONTENT_DIR = Path("../src/content/drafts")
EMBEDDINGS_CACHE_DIR = Path("./cache")
# exist_ok=True keeps re-running this cell from failing once the directory exists
EMBEDDINGS_CACHE_DIR.mkdir(exist_ok=True)

print("✓ Libraries imported and directories configured")



## AWS setup

In [None]:
# AWS Setup
def initialize_aws_session(profile_name: str = "default", region_name: str = "eu-central-1"):
    """Create a boto3 session and confirm that its credentials work.

    The credentials are checked with an STS ``get_caller_identity`` call, and
    a short summary of the resolved identity is printed.

    Args:
        profile_name: Named AWS profile passed to ``boto3.Session``.
        region_name: AWS region passed to ``boto3.Session``.

    Returns:
        The verified ``boto3.Session``.

    Raises:
        Exception: Any error raised while creating the session or calling
            STS is reported on stdout and then re-raised unchanged.
    """
    try:
        aws_session = boto3.Session(profile_name=profile_name, region_name=region_name)

        # A cheap identity lookup that fails when the credentials are unusable
        caller = aws_session.client('sts').get_caller_identity()
        account_id = caller.get('Account')
        # Keep only the trailing path segment of the ARN
        principal = caller.get('Arn', '').split('/')[-1]

        summary_lines = (
            f"✓ AWS credentials verified for profile '{profile_name}'",
            f"  Account: {account_id}",
            f"  User/Role: {principal}",
            f"  Region: {region_name}",
        )
        for line in summary_lines:
            print(line)
    except Exception as e:
        print(f"❌ Error with AWS credentials for profile '{profile_name}': {e}")
        print(f"💡 Try running: aws configure --profile {profile_name}")
        raise
    else:
        return aws_session

# Initialize AWS clients
# AWS_PROFILE / AWS_REGION are passed explicitly, so the function's own
# defaults ("default", "eu-central-1") are not used here.
session = initialize_aws_session(AWS_PROFILE, AWS_REGION)
# Bedrock runtime client used by the embedding and LLM calls in this notebook
bedrock_client = session.client('bedrock-runtime')


In [None]:

# Model names for AWS
# Chat model ID passed as modelId to bedrock_client.invoke_model
# NOTE(review): the "eu." prefix looks like a cross-region inference profile ID — confirm
MODEL_SONNET_35="eu.anthropic.claude-3-5-sonnet-20240620-v1:0"
# Embedding model ID (Amazon Titan Text Embeddings V2)
MODEL_TITAN_V2="amazon.titan-embed-text-v2:0"

## Load articles

In [None]:
# Data Loading Functions
def preprocess_text(text: str) -> str:
    """Strip common Markdown syntax and collapse whitespace.

    Header markers, bold/italic markers, inline-code backticks, images and
    links are reduced to their visible text, then every run of whitespace
    (including newlines) is collapsed to a single space.

    Args:
        text: Raw Markdown text.

    Returns:
        The cleaned, single-line text.
    """
    # Remove markdown syntax
    text = re.sub(r'#{1,6}\s+', '', text)  # Headers
    text = re.sub(r'\*\*(.*?)\*\*', r'\1', text)  # Bold
    text = re.sub(r'\*(.*?)\*', r'\1', text)  # Italic
    text = re.sub(r'`(.*?)`', r'\1', text)  # Code
    # Images must be rewritten before links: the link pattern also matches the
    # "[alt](url)" part of "![alt](url)" and would leave a stray "!" behind.
    text = re.sub(r'!\[([^\]]*)\]\([^\)]+\)', r'\1', text)  # Images
    text = re.sub(r'\[([^\]]+)\]\([^\)]+\)', r'\1', text)  # Links

    # Normalize whitespace
    return ' '.join(text.split())

def load_articles_from_local() -> List[Dict[str, Any]]:
    """Read every Markdown article from the configured content directories.

    Each source directory is scanned (non-recursively) for ``*.md`` and
    ``*.mdx`` files. A directory that does not exist is reported and skipped,
    and so is any file that fails to read or parse.

    Returns:
        One dict per article with the keys ``file_path``, ``filename``,
        ``title``, ``date``, ``tags``, ``type``, ``metadata``, ``content``,
        ``full_text``, ``processed_content``, ``llm_tags``,
        ``llm_categories`` and ``llm_summary``.
    """
    print("Loading articles from local directories...")

    # Only the published blog directory is scanned; the drafts source is disabled.
    sources = [
        (BLOG_CONTENT_DIR, "blog"),
        # (DRAFTS_CONTENT_DIR, "draft")
    ]

    collected = []
    for source_dir, source_kind in sources:
        if not source_dir.exists():
            print(f"Directory {source_dir} does not exist, skipping...")
            continue

        paths = [*source_dir.glob("*.md"), *source_dir.glob("*.mdx")]
        for md_path in paths:
            try:
                raw = md_path.read_text(encoding='utf-8')

                # Split the YAML frontmatter from the Markdown body
                post = frontmatter.loads(raw)
                meta = post.metadata
                body = post.content.strip()

                # Title falls back to the file name (without extension)
                title = meta.get("title", md_path.stem)
                # The text that gets embedded is the title plus the body
                full_text = f"{title}\n\n{body}"

                collected.append({
                    "file_path": str(md_path),
                    "filename": md_path.name,
                    "title": title,
                    "date": meta.get("pubDate", ""),
                    "tags": meta.get("tags", []),
                    "type": source_kind,
                    "metadata": meta,
                    "content": body,
                    "full_text": full_text,
                    "processed_content": preprocess_text(full_text),
                    # Results of an earlier LLM run, when stored in the frontmatter
                    "llm_tags": meta.get("LLM Tags", []),
                    "llm_categories": meta.get("LLM Categories", []),
                    "llm_summary": meta.get("Summary", ""),
                })
            except Exception as e:
                print(f"Error processing {md_path}: {e}")
                continue

    print(f"✓ Loaded {len(collected)} articles")
    return collected

# Load articles
articles = load_articles_from_local()

# Display basic info
# NOTE(review): the column lookups below assume at least one article was
# loaded; an empty list yields a DataFrame without these columns — confirm.
df = pd.DataFrame(articles)
print(f"\nArticle types: {df['type'].value_counts().to_dict()}")
# 'date' holds whatever the frontmatter "pubDate" parsed to ("" when absent)
print(f"Date range: {df['date'].min()} to {df['date'].max()}")
print(f"Average content length: {df['content'].str.len().mean():.0f} characters")


## Generate Embeddings for the whole article

In [None]:
# Embedding Generation with Caching
def get_content_hash(text: str) -> str:
    """Return the hexadecimal MD5 digest of ``text``.

    The digest acts as a fingerprint of the content (used as the embedding
    cache key), not as a security measure.
    """
    fingerprint = hashlib.md5()
    fingerprint.update(text.encode())
    return fingerprint.hexdigest()

def generate_embedding(text: str, model_id: Optional[str] = None) -> List[float]:
    """Generate an embedding for a single text using AWS Bedrock.

    Args:
        text: The text to embed.
        model_id: Bedrock embedding model ID. Defaults to ``MODEL_TITAN_V2``,
            so the model is configured in one place instead of being
            repeated as a string literal.

    Returns:
        The embedding vector from the ``embedding`` field of the response.

    Raises:
        Exception: Errors from ``bedrock_client.invoke_model`` propagate
            unchanged.
    """
    request_body = json.dumps({"inputText": text})

    response = bedrock_client.invoke_model(
        modelId=MODEL_TITAN_V2 if model_id is None else model_id,
        body=request_body,
        contentType="application/json",
        accept="application/json"
    )

    # The response body is a stream and has to be read before decoding
    response_body = json.loads(response['body'].read())
    return response_body['embedding']

def generate_embeddings_with_cache(articles: List[Dict[str, Any]]) -> np.ndarray:
    """Generate embeddings with caching to avoid re-computation.

    Embeddings are cached on disk, keyed by the MD5 of each article's
    ``processed_content``; only articles whose content is not in the cache
    are sent to Bedrock. The cache is written back even when a Bedrock call
    fails part-way, so embeddings obtained earlier in the run are kept.

    Args:
        articles: Article dicts providing ``processed_content`` and ``title``.

    Returns:
        An array with one embedding row per article, in input order.

    Raises:
        Exception: Errors from ``generate_embedding`` propagate after the
            cache has been saved.
    """
    print("Generating embeddings with caching...")

    cache_file = EMBEDDINGS_CACHE_DIR / "embeddings_cache.pkl"

    # Load existing cache
    cache = {}
    if cache_file.exists():
        # NOTE: pickle.load can execute arbitrary code; only load a cache
        # file that this notebook wrote itself.
        with open(cache_file, 'rb') as f:
            cache = pickle.load(f)
        print(f"✓ Loaded cache with {len(cache)} embeddings")

    embeddings = []
    total = len(articles)
    try:
        for i, article in enumerate(articles):
            content_hash = get_content_hash(article["processed_content"])

            if content_hash in cache:
                embedding = cache[content_hash]
                print(f"  {i+1}/{total}: Using cached embedding for '{article['title'][:50]}...'")
            else:
                print(f"  {i+1}/{total}: Generating embedding for '{article['title'][:50]}...'")
                embedding = generate_embedding(article["processed_content"])
                cache[content_hash] = embedding

            embeddings.append(embedding)
    finally:
        # Save in `finally` so a failure mid-run does not discard the
        # embeddings that were already generated.
        with open(cache_file, 'wb') as f:
            pickle.dump(cache, f)
        print(f"✓ Saved cache with {len(cache)} embeddings")

    return np.array(embeddings)

# Generate embeddings
# One row per entry of `articles`, in the same order
embeddings = generate_embeddings_with_cache(articles)
print(f"✓ Generated embeddings shape: {embeddings.shape}")


## Clustering articles

In [None]:
# %% Clustering with HDBSCAN
def cluster_articles(embeddings: np.ndarray, min_cluster_size: int = 2, min_samples: int = 1) -> np.ndarray:
    """Group article embeddings with HDBSCAN and print a summary.

    Args:
        embeddings: One embedding row per article.
        min_cluster_size: Passed to ``hdbscan.HDBSCAN``.
        min_samples: Passed to ``hdbscan.HDBSCAN``.

    Returns:
        The cluster label of each row as returned by ``fit_predict``; the
        summary below treats label ``-1`` as "unclustered".
    """
    print(f"Clustering {len(embeddings)} articles...")

    model = hdbscan.HDBSCAN(
        min_cluster_size=min_cluster_size,
        min_samples=min_samples,
        metric='euclidean',
        cluster_selection_epsilon=0.01
    )
    labels = model.fit_predict(embeddings)

    # Summarise: distinct labels with their sizes, noise counted separately
    distinct, sizes = np.unique(labels, return_counts=True)
    has_noise = -1 in labels
    n_clusters = len(distinct) - 1 if has_noise else len(distinct)
    n_noise = np.count_nonzero(labels == -1)

    print(f"✓ Found {n_clusters} clusters")
    print(f"  Noise points (unclustered): {n_noise}")

    for cluster_id, size in zip(distinct, sizes):
        if cluster_id == -1:
            continue
        print(f"  Cluster {cluster_id}: {size} articles")

    return labels

# Perform clustering
cluster_labels = cluster_articles(embeddings, min_cluster_size=2)

# Add cluster labels to articles
# cluster_labels[i] belongs to articles[i]; int() converts the NumPy scalar
# to a plain Python int
for i, article in enumerate(articles):
    article['cluster'] = int(cluster_labels[i])


## Visualise embeddings with UMAP

In [None]:
# %% Visualization with UMAP
def create_umap_visualization(embeddings: np.ndarray, articles: List[Dict[str, Any]], 
                            cluster_labels: np.ndarray) -> go.Figure:
    """Plot the articles in 2-D (UMAP projection), one trace per cluster.

    Args:
        embeddings: One embedding row per article.
        articles: Article dicts providing ``title``, ``type``, ``filename``
            and ``date``; same order as ``embeddings``.
        cluster_labels: Cluster label per article; ``-1`` is shown as
            "Unclustered".

    Returns:
        A Plotly figure with one scatter trace per distinct cluster label.
    """
    print("Creating UMAP visualization...")

    # Project the embeddings down to two dimensions
    projector = umap.UMAP(n_components=2, random_state=42, n_neighbors=15, min_dist=0.1)
    coords = projector.fit_transform(embeddings)

    def column(key: str) -> list:
        """Collect one field across all articles."""
        return [entry[key] for entry in articles]

    plot_df = pd.DataFrame({
        'x': coords[:, 0],
        'y': coords[:, 1],
        'cluster': cluster_labels,
        'title': column('title'),
        'type': column('type'),
        'filename': column('filename'),
        'date': column('date'),
    })

    # Colours are assigned by position in the sorted label list, cycling
    # through the palette when there are more clusters than colours
    palette = px.colors.qualitative.Set3
    cluster_ids = sorted(plot_df['cluster'].unique())

    hover = (
        '<b>%{text}</b><br>'
        'Type: %{customdata[0]}<br>'
        'Date: %{customdata[1]}<br>'
        'File: %{customdata[2]}<br>'
        '<extra></extra>'
    )

    fig = go.Figure()
    for position, cluster_id in enumerate(cluster_ids):
        members = plot_df[plot_df['cluster'] == cluster_id]
        trace_name = "Unclustered" if cluster_id == -1 else f"Cluster {cluster_id}"

        fig.add_trace(go.Scatter(
            x=members['x'],
            y=members['y'],
            mode='markers',
            name=trace_name,
            marker=dict(
                color=palette[position % len(palette)],
                size=8,
                opacity=0.7
            ),
            text=members['title'],
            hovertemplate=hover,
            customdata=members[['type', 'date', 'filename']].values
        ))

    fig.update_layout(
        title="Blog Articles Clustered by Content Similarity (UMAP)",
        xaxis_title="UMAP Dimension 1",
        yaxis_title="UMAP Dimension 2",
        width=1000,
        height=700,
        showlegend=True
    )

    return fig

# Create and display visualization
# Articles only: one trace per article cluster
fig = create_umap_visualization(embeddings, articles, cluster_labels)
fig.show()

In [None]:
import plotly.express as px


# Export the current figure as an HTML file next to the notebook
fig.write_html("./articles-umap.html")

## Extracting tags and categories using LLM

In [None]:
# %% Individual Article Analysis with LLM
def _strip_code_fence(raw: str) -> str:
    """Return ``raw`` without a surrounding Markdown code fence, if it has one."""
    fenced = re.match(r'^```[\w-]*[ \t]*\n(.*?)\n?[ \t]*```$', raw, re.DOTALL)
    return fenced.group(1).strip() if fenced else raw


def analyze_article_with_llm(article: Dict[str, Any]) -> Dict[str, Any]:
    """Generate tags, categories and a one-sentence summary for an article.

    The article is sent to the Bedrock chat model ``MODEL_SONNET_35`` with a
    prompt that asks for a JSON object. A reply wrapped in a Markdown code
    fence (```` ```json ... ``` ````) is unwrapped before parsing, so it is
    not treated as a failure.

    Args:
        article: Article dict providing ``content`` and ``title``.

    Returns:
        A dict with ``llm_tags``, ``llm_categories``, ``llm_summary`` and
        ``llm_analysis_success``. If the reply is not valid JSON, the raw
        reply (truncated to 200 characters) is returned as the summary with
        ``llm_analysis_success`` False. On any other error everything is
        empty and ``llm_analysis_success`` is False; no exception escapes.
    """
    # Prepare content for analysis
    content_for_analysis = article['content']  # [:4000] Limit to ~4k chars 
    title = article['title']
    
    prompt = f"""    
    Analyze the following article and respond in the exact JSON format below.  
    Your task is to:  
    1. Suggest **tags**: short keywords or phrases (3–7 items) that help with search and discovery.  
    2. Suggest **categories**: 1–3 broader groupings under which this article should appear on a blog (e.g., topic area, type of article).  
    3. Provide a **1-sentence summary** that clearly states what the article is about (concise and neutral).  

    Do not include explanations or extra text—only return valid JSON.  

    Article Title: {title}  

    Article Content:  
    {content_for_analysis}  

    Respond in this exact JSON format:  
    {{
        "tags": ["tag1", "tag2", "tag3"],
        "categories": ["category1", "category2"],
        "summary": "Brief 1-sentence summary of the article"
    }}
    """

    try:
        body = json.dumps({
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": 300,
            "temperature": 0.0,
            "messages": [
                {
                    "role": "user",
                    "content": prompt
                }
            ]
        })
        
        response = bedrock_client.invoke_model(
            modelId=MODEL_SONNET_35,
            body=body,
            contentType="application/json",
            accept="application/json"
        )
        
        response_body = json.loads(response['body'].read())
        llm_response = response_body['content'][0]['text'].strip()
        
        # Parse JSON response (tolerating a Markdown code fence around it)
        try:
            analysis = json.loads(_strip_code_fence(llm_response))
        except json.JSONDecodeError:
            # Fallback if JSON parsing fails: keep the raw text as the summary
            return {
                "llm_tags": [],
                "llm_categories": [],
                "llm_summary": llm_response[:200] + "..." if len(llm_response) > 200 else llm_response,
                "llm_analysis_success": False
            }

        return {
            "llm_tags": analysis.get("tags", []),
            "llm_categories": analysis.get("categories", []),
            "llm_summary": analysis.get("summary", ""),
            "llm_analysis_success": True
        }
            
    except Exception as e:
        # Best-effort: one failed article must not abort a whole batch run
        print(f"❌ Error analyzing article '{title}': {e}")
        return {
            "llm_tags": [],
            "llm_categories": [],
            "llm_summary": "",
            "llm_analysis_success": False
        }

def analyze_all_articles_with_llm(articles: List[Dict[str, Any]], 
                                 max_articles: Optional[int] = None) -> List[Dict[str, Any]]:
    """Analyze articles with the LLM and store the results on each article.

    Args:
        articles: Article dicts; each analyzed dict is updated in place with
            the keys returned by ``analyze_article_with_llm``.
        max_articles: Analyze only the first ``max_articles`` articles.
            ``None`` (the default) analyzes all of them; ``0`` analyzes none.

    Returns:
        The list of articles that were analyzed (a slice of ``articles`` when
        a limit is given, otherwise ``articles`` itself).
    """
    print("Analyzing articles with LLM for topics, tags, and main points...")
    
    # Compare against None explicitly: a plain truthiness test would treat a
    # limit of 0 as "no limit" and analyze every article.
    articles_to_analyze = articles if max_articles is None else articles[:max_articles]
    total = len(articles_to_analyze)
    
    for i, article in enumerate(articles_to_analyze):
        print(f"  {i+1}/{total}: Analyzing '{article['title'][:50]}...'")
        
        analysis = analyze_article_with_llm(article)
        print("analysis", analysis)
        
        # Add analysis results to article
        article.update(analysis)
        
        if analysis['llm_analysis_success']:
            print(f"    ✓ Generated {len(analysis['llm_tags'])} tags, {len(analysis['llm_categories'])} categories")
        else:
            print("    ⚠️  Analysis partially failed")
    
    return articles_to_analyze

# Analyze articles (start with first 5 for testing, remove limit to analyze all)
# The analyzed article dicts are updated in place, so `articles` sees the results too
analyzed_articles = analyze_all_articles_with_llm(articles, max_articles=5)

# Display sample results
print("\n" + "="*60)
print("SAMPLE ARTICLE ANALYSIS RESULTS")
print("="*60)

for article in analyzed_articles[:5]:  # Show first N articles
    print(f"\n📄 {article['title']}")
    # join() expects the tag/category entries to be strings
    print(f"LLM Tags: {', '.join(article['llm_tags'])}")
    print(f"LLM Categories: {', '.join(article['llm_categories'])}")
    print(f"Summary: {article['llm_summary']}")

### Save the tags, categories and summary for later

In [None]:
# Save LLM results back into frontmatter of each file
import shutil
from pathlib import Path
import frontmatter
from typing import Dict, Any, List

def save_llm_to_frontmatter(article: Dict[str, Any],
                            keys_map: Optional[Dict[str, str]] = None,
                            backup: bool = True) -> bool:
    """
    Write LLM fields from an article dict into the frontmatter of its file.

    Args:
        article: Article dict. The file is located via ``file_path``
            (falling back to ``filename``); the values to store are read
            from the keys listed in ``keys_map``.
        keys_map: Maps article keys -> frontmatter keys. Defaults to
            ``llm_tags`` -> "LLM Tags", ``llm_categories`` ->
            "LLM Categories", ``llm_summary`` -> "Summary" and
            ``llm_main_points`` -> "LLM Main Points".
        backup: When True, copy the file to ``<name>.bak`` before rewriting
            it. The backup is best-effort: a failure is reported as a
            warning and the update still proceeds.

    Returns:
        True when the file was rewritten, False when the article has no
        usable path or the file does not exist.
    """
    if keys_map is None:
        keys_map = {
            "llm_tags": "LLM Tags",
            "llm_categories": "LLM Categories",
            "llm_summary": "Summary",
            "llm_main_points": "LLM Main Points"
        }

    file_path = article.get("file_path") or article.get("filename")
    if not file_path:
        print("No file path in article, skipping")
        return False

    fp = Path(file_path)
    if not fp.exists():
        print(f"File not found, skipping: {fp}")
        return False

    # Load existing frontmatter
    post = frontmatter.load(fp)

    # Update metadata with lists/strings from article (preserve types);
    # keys that are absent or None leave the existing frontmatter untouched
    for a_key, fm_key in keys_map.items():
        if article.get(a_key) is not None:
            post.metadata[fm_key] = article[a_key]

    # Optional backup (safe copy)
    if backup:
        bak = fp.with_suffix(fp.suffix + ".bak")
        try:
            shutil.copy(fp, bak)
        except OSError as e:
            # Still best-effort, but no longer silent: the user should know
            # that the file is being rewritten without a backup.
            print(f"⚠️  Could not create backup {bak}: {e}")

    # Write to a temp file, then replace, so the original is never left half-written
    tmp = fp.with_suffix(fp.suffix + ".tmp")
    with open(tmp, "w", encoding="utf-8") as f:
        f.write(frontmatter.dumps(post))
    tmp.replace(fp)

    return True

def save_all_llm_to_files(articles: List[Dict[str, Any]], backup: bool = True):
    """Write the LLM fields of every article back to its source file.

    Calls ``save_llm_to_frontmatter`` for each article and prints a
    confirmation line for each file that was updated.

    Args:
        articles: Article dicts to persist.
        backup: Forwarded to ``save_llm_to_frontmatter``.
    """
    for entry in articles:
        if not save_llm_to_frontmatter(entry, backup=backup):
            continue
        print(f"Saved LLM fields to {entry.get('file_path')}")
# ...existing code...

### Generate embeddings for extracted terms

In [None]:
# Step 1: Extract all tags and categories
def extract_all_tags_and_categories(articles: List[Dict[str, Any]]) -> Dict[str, List[str]]:
    """Extract all unique tags and categories from analyzed articles.

    Args:
        articles: Article dicts; ``llm_tags`` and ``llm_categories`` are read
            when present. A missing or ``None`` value contributes nothing,
            and a bare string counts as a single term.

    Returns:
        ``{"tags": [...], "categories": [...]}`` with duplicates removed and
        each list sorted, so the order (and anything indexed by it, such as
        term embeddings) is reproducible between runs.
    """
    def collect(key: str) -> List[str]:
        """Gather the distinct values stored under ``key`` across all articles."""
        unique = set()
        for article in articles:
            values = article.get(key)
            if values is None:
                continue
            if isinstance(values, str):
                # A bare string would otherwise be split into characters
                values = [values]
            unique.update(values)
        # key=str keeps the sort well-defined if YAML produced non-string terms
        return sorted(unique, key=str)

    return {
        'tags': collect('llm_tags'),
        'categories': collect('llm_categories')
    }

# Collect terms from every loaded article, including LLM fields that were
# read from the frontmatter of articles not analyzed in this run
terms_data = extract_all_tags_and_categories(articles)

print(f"Extracted {len(terms_data['tags'])} unique tags")
print(f"Extracted {len(terms_data['categories'])} unique categories")



### Actually generate the embeddings


In [None]:

# Step 2: Generate embeddings for terms

def generate_embeddings_for_terms(terms: List[str], bedrock_client,
                                  model_id: str = "amazon.titan-embed-text-v2:0",
                                  fallback_dim: int = 1024) -> np.ndarray:
    """Generate embeddings for tags/categories.

    A term whose request fails is reported and replaced by a zero vector, so
    the result always has one row per term.

    Args:
        terms: The terms to embed.
        bedrock_client: Client exposing ``invoke_model`` (Bedrock runtime).
        model_id: Embedding model ID passed as ``modelId``.
        fallback_dim: Width of the zero vectors used only when *every*
            request failed and the real width is therefore unknown.

    Returns:
        An array with one row per term, in input order.
    """
    print(f"Generating embeddings for {len(terms)} terms...")
    
    # None marks a failed term; it is replaced once the vector width is known
    vectors: List[Optional[List[float]]] = []
    for term in terms:
        try:
            body = json.dumps({
                "inputText": term
            })
            
            response = bedrock_client.invoke_model(
                modelId=model_id,
                body=body,
                contentType="application/json",
                accept="application/json"
            )
            
            response_body = json.loads(response['body'].read())
            vectors.append(response_body['embedding'])
            
        except Exception as e:
            # Best-effort: one failing term must not abort the whole batch
            print(f"Error generating embedding for '{term}': {e}")
            vectors.append(None)
    
    # Use the width the model actually returned; a hard-coded width that
    # differs from it would make the rows ragged and break np.array.
    dim = next((len(vec) for vec in vectors if vec is not None), fallback_dim)
    zero_vector = [0.0] * dim
    
    return np.array([zero_vector if vec is None else vec for vec in vectors])

# Tags first, then categories: code that slices term_embeddings relies on this order
all_terms = terms_data['tags'] + terms_data['categories']
term_embeddings = generate_embeddings_for_terms(all_terms, bedrock_client)


### Cluster terms

In [None]:

# Step 3: Cluster tags and categories (separately)

def cluster_terms(terms: List[str], embeddings: np.ndarray, min_cluster_size: int = 2) -> Dict[int, List[str]]:
    """Group terms with HDBSCAN on their embeddings and print the result.

    Args:
        terms: The terms, in the same order as the rows of ``embeddings``.
        embeddings: One embedding row per term.
        min_cluster_size: Passed to ``hdbscan.HDBSCAN``.

    Returns:
        A mapping from cluster label to the terms given that label; terms
        labelled ``-1`` are reported as unclustered.
    """
    print(f"Clustering {len(terms)} terms...")

    model = hdbscan.HDBSCAN(
        min_cluster_size=min_cluster_size,
        min_samples=1,
        metric='euclidean',
        cluster_selection_epsilon=0.1
    )
    assignments = model.fit_predict(embeddings)

    # Bucket each term under the label assigned to its embedding row
    grouped = {}
    for position, label in enumerate(assignments):
        grouped.setdefault(label, []).append(terms[position])

    # Summary: real clusters exclude the -1 bucket
    n_clusters = sum(1 for label in grouped if label != -1)
    n_noise = len(grouped.get(-1, []))

    print(f"✓ Found {n_clusters} term clusters")
    if n_noise > 0:
        print(f"  Unclustered terms: {n_noise}")

    for label, members in grouped.items():
        if label == -1:
            continue
        print(f"  Cluster {label}: {members}")

    return grouped



# Split the combined embedding rows back into tags and categories:
# the first len(tags) rows are the tags, the rest are the categories
tag_embeddings = term_embeddings[:len(terms_data['tags'])]
category_embeddings = term_embeddings[len(terms_data['tags']):]

# Tags and categories are clustered independently of each other
tag_clusters = cluster_terms(terms_data['tags'], tag_embeddings, min_cluster_size=2)
category_clusters = cluster_terms(terms_data['categories'], category_embeddings, min_cluster_size=2)


# Generate blog taxonomy
# taxonomy_results = create_blog_taxonomy(analyzed_articles, bedrock_client)

### Visualise

In [None]:
def _tag_cluster_labels(all_tags: List[str], tag_clusters: Dict[int, List[str]]) -> List[int]:
    """Map each tag to its plot label: ``cluster_id + 100``, or ``-10`` for noise/unknown tags."""
    label_by_tag = {}
    for cluster_id, cluster_tags in tag_clusters.items():
        if cluster_id == -1:
            # The noise bucket is not a cluster; without this check its tags
            # would get label 99 and be plotted as "Tag Cluster 99".
            continue
        for tag in cluster_tags:
            # setdefault: the first cluster listing a tag wins
            label_by_tag.setdefault(tag, cluster_id + 100)
    return [label_by_tag.get(tag, -10) for tag in all_tags]


def create_umap_visualization(embeddings: np.ndarray, articles: List[Dict[str, Any]], 
                            cluster_labels: np.ndarray, 
                            tag_embeddings: np.ndarray = None,
                            tag_clusters: Dict[int, List[str]] = None,
                            all_tags: List[str] = None) -> go.Figure:
    """Create UMAP visualization of article clusters and optionally tag clusters.

    Articles and (optionally) tags are projected together into 2-D so that
    they share one coordinate space. Articles are drawn as circles, tags as
    diamonds.

    Args:
        embeddings: One embedding row per article.
        articles: Article dicts providing ``title``, ``type``, ``filename``
            and ``date``; same order as ``embeddings``.
        cluster_labels: Cluster label per article; ``-1`` is shown as
            "Unclustered Articles".
        tag_embeddings: One embedding row per tag. Tags are included only
            when both ``tag_embeddings`` and ``all_tags`` are given.
        tag_clusters: Mapping of tag cluster label -> tags. When given, tags
            are drawn per cluster and tags in the ``-1`` bucket (or in no
            bucket) are shown as "Unclustered Tags"; when omitted, all tags
            share a single "Tags" trace.
        all_tags: The tags, in the same order as ``tag_embeddings``.

    Returns:
        The Plotly figure.
    """
    print("Creating UMAP visualization...")
    
    # One flag for every "are tags plotted?" decision below, so the checks
    # cannot disagree with each other.
    include_tags = tag_embeddings is not None and all_tags is not None
    n_tags = len(all_tags) if include_tags else 0
    
    combined_embeddings = embeddings
    combined_cluster_labels = cluster_labels
    item_types = ['Article'] * len(articles)
    item_names = [article['title'] for article in articles]
    
    # Combine embeddings if tags are provided
    if include_tags:
        print(f"Including {n_tags} tags in visualization...")
        combined_embeddings = np.vstack([embeddings, tag_embeddings])
        item_types = item_types + ['Tag'] * n_tags
        item_names = item_names + [f"Tag: {tag}" for tag in all_tags]
        
        # Tag labels live in their own range (>= 100, or -10 for unclustered)
        # so they cannot be confused with article cluster labels.
        if tag_clusters is not None:
            tag_labels = _tag_cluster_labels(all_tags, tag_clusters)
        else:
            tag_labels = [-10] * n_tags
        
        # dtype=int keeps the combined labels integral even when there are no tags
        combined_cluster_labels = np.concatenate(
            [cluster_labels, np.array(tag_labels, dtype=int)]
        )
    
    # Reduce dimensionality with UMAP
    reducer = umap.UMAP(n_components=2, random_state=42, n_neighbors=15, min_dist=0.1, metric='cosine')
    embedding_2d = reducer.fit_transform(combined_embeddings)
    
    # Create DataFrame for plotting; article-only columns are padded with
    # empty strings for the tag rows
    padding = [''] * n_tags
    plot_df = pd.DataFrame({
        'x': embedding_2d[:, 0],
        'y': embedding_2d[:, 1],
        'cluster': combined_cluster_labels,
        'type': item_types,
        'name': item_names,
        'title': [article['title'] for article in articles] + padding,
        'article_type': [article['type'] for article in articles] + padding,
        'filename': [article['filename'] for article in articles] + padding,
        'date': [article['date'] for article in articles] + padding,
    })
    
    # Create color map for clusters (cycles when there are more labels than colors)
    unique_clusters = sorted(plot_df['cluster'].unique())
    colors = px.colors.qualitative.Set3
    color_map = {cluster: colors[i % len(colors)] for i, cluster in enumerate(unique_clusters)}
    
    # Create scatter plot
    fig = go.Figure()
    
    # Plot articles
    article_data = plot_df[plot_df['type'] == 'Article']
    for cluster in sorted(article_data['cluster'].unique()):
        cluster_data = article_data[article_data['cluster'] == cluster]
        cluster_name = f"Article Cluster {cluster}" if cluster != -1 else "Unclustered Articles"
        
        fig.add_trace(go.Scatter(
            x=cluster_data['x'],
            y=cluster_data['y'],
            mode='markers+text',
            name=cluster_name,
            marker=dict(
                color=color_map[cluster],
                size=8,
                opacity=0.7,
                symbol='circle'
            ),
            text=cluster_data['title'],
            hovertemplate='<b>%{text}</b><br>' +
                         'Type: Article<br>' +
                         'Article Type: %{customdata[0]}<br>' +
                         'Date: %{customdata[1]}<br>' +
                         'File: %{customdata[2]}<br>' +
                         '<extra></extra>',
            customdata=cluster_data[['article_type', 'date', 'filename']].values,
            legendgroup='articles'
        ))
    
    # Plot tags if provided
    if include_tags:
        tag_data = plot_df[plot_df['type'] == 'Tag']
        
        if tag_clusters is not None:
            # Plot tags by cluster
            for cluster in sorted(tag_data['cluster'].unique()):
                cluster_data = tag_data[tag_data['cluster'] == cluster]
                
                if cluster == -10:
                    cluster_name = "Unclustered Tags"
                    marker_color = 'orange'
                else:
                    # Undo the +100 offset for display
                    cluster_name = f"Tag Cluster {cluster - 100}"
                    marker_color = color_map.get(cluster, 'orange')
                
                fig.add_trace(go.Scatter(
                    x=cluster_data['x'],
                    y=cluster_data['y'],
                    mode='markers',
                    name=cluster_name,
                    marker=dict(
                        color=marker_color,
                        size=12,
                        opacity=0.8,
                        symbol='diamond',
                        line=dict(width=2, color='white')
                    ),
                    text=cluster_data['name'],
                    hovertemplate='<b>%{text}</b><br>' +
                                 'Type: Tag<br>' +
                                 'Cluster: ' + cluster_name + '<br>' +
                                 '<extra></extra>',
                    legendgroup='tags'
                ))
        else:
            # Plot all tags with same style
            fig.add_trace(go.Scatter(
                x=tag_data['x'],
                y=tag_data['y'],
                mode='markers+text',
                name="Tags",
                marker=dict(
                    color='orange',
                    size=12,
                    opacity=0.8,
                    symbol='diamond',
                    line=dict(width=2, color='white')
                ),
                text=tag_data['name'],
                hovertemplate='<b>%{text}</b><br>' +
                             'Type: Tag<br>' +
                             '<extra></extra>',
                legendgroup='tags'
            ))
    
    title = "Blog Articles"
    if include_tags:
        title += " and Tags"
    title += " Clustered by Content Similarity (UMAP)"
    
    fig.update_layout(
        title=title,
        xaxis_title="UMAP Dimension 1",
        yaxis_title="UMAP Dimension 2",
        width=1200,
        height=800,
        showlegend=True,
        legend=dict(
            yanchor="top",
            y=0.99,
            xanchor="left",
            x=1.01,
            bgcolor="rgba(255,255,255,0.8)",
            bordercolor="rgba(0,0,0,0.2)",
            borderwidth=1
        ),
        margin=dict(r=150)  # Make room for legend
    )
    
    return fig

# Create and display visualization with tags
# Only the tag embeddings and tag clusters are passed; the category
# embeddings and clusters are not used by this call
fig = create_umap_visualization(
    embeddings, 
    articles, 
    cluster_labels,
    tag_embeddings=tag_embeddings,
    tag_clusters=tag_clusters,
    all_tags=terms_data['tags']
)
fig.show()



In [None]:
import plotly.express as px


# Export the current figure (articles plus tags) as an HTML file
fig.write_html("./tags-clustered.html")

In [None]:
def _offset_term_cluster_labels(
    terms: List[str],
    clusters: Optional[Dict[int, List[str]]],
    offset: int,
    unclustered_label: int,
) -> List[int]:
    """Return one plot label per term, kept apart from article cluster ids.

    Terms that belong to a real cluster get ``cluster_id + offset``. Terms that
    are in no cluster, or only in the noise cluster (key ``-1``), get
    ``unclustered_label`` so they are drawn as "Unclustered ..." instead of as
    a bogus cluster number.
    """
    if clusters is None:
        return [unclustered_label] * len(terms)

    # Invert the mapping once instead of scanning every cluster for every term.
    term_to_cluster: Dict[str, int] = {}
    for cluster_id, members in clusters.items():
        if cluster_id == -1:
            continue  # noise is not a cluster
        for member in members:
            # setdefault keeps the first cluster a term appears in
            term_to_cluster.setdefault(member, cluster_id)

    return [
        term_to_cluster[term] + offset if term in term_to_cluster else unclustered_label
        for term in terms
    ]


def _add_term_traces(
    fig: go.Figure,
    term_data: pd.DataFrame,
    *,
    kind: str,
    plural: str,
    per_cluster: bool,
    offset: int,
    unclustered_label: int,
    color_map: Dict[Any, str],
    default_color: str,
    marker_size: int,
    marker_opacity: float,
    marker_symbol: str,
    legendgroup: str,
) -> None:
    """Add scatter traces for tags or categories (one per cluster, or one in total)."""

    def build_marker(color: str) -> Dict[str, Any]:
        return dict(
            color=color,
            size=marker_size,
            opacity=marker_opacity,
            symbol=marker_symbol,
            line=dict(width=2, color='white'),
        )

    hover_head = '<b>%{text}</b><br>' + 'Type: ' + kind + '<br>'

    if not per_cluster:
        # No cluster information: a single trace with a uniform style.
        fig.add_trace(go.Scatter(
            x=term_data['x'],
            y=term_data['y'],
            mode='markers',
            name=plural,
            marker=build_marker(default_color),
            text=term_data['name'],
            hovertemplate=hover_head + '<extra></extra>',
            legendgroup=legendgroup,
        ))
        return

    for cluster in sorted(term_data['cluster'].unique()):
        cluster_data = term_data[term_data['cluster'] == cluster]

        if cluster == unclustered_label:
            cluster_name = f"Unclustered {plural}"
            marker_color = default_color
        else:
            # Undo the offset so the legend shows the original cluster id.
            cluster_name = f"{kind} Cluster {cluster - offset}"
            marker_color = color_map.get(cluster, default_color)

        fig.add_trace(go.Scatter(
            x=cluster_data['x'],
            y=cluster_data['y'],
            mode='markers',
            name=cluster_name,
            marker=build_marker(marker_color),
            text=cluster_data['name'],
            hovertemplate=hover_head + 'Cluster: ' + cluster_name + '<br>' + '<extra></extra>',
            legendgroup=legendgroup,
        ))


def create_umap_visualization(embeddings: np.ndarray, articles: List[Dict[str, Any]],
                            cluster_labels: np.ndarray,
                            tag_embeddings: Optional[np.ndarray] = None,
                            tag_clusters: Optional[Dict[int, List[str]]] = None,
                            all_tags: Optional[List[str]] = None,
                            category_embeddings: Optional[np.ndarray] = None,
                            category_clusters: Optional[Dict[int, List[str]]] = None,
                            all_categories: Optional[List[str]] = None) -> go.Figure:
    """Create a UMAP scatter plot of articles and, optionally, tags and categories.

    All supplied embeddings are stacked and projected to 2D together, so
    articles, tags and categories share one coordinate space.

    Args:
        embeddings: Article embeddings, one row per entry in ``articles``.
        articles: Article dicts; 'title', 'type', 'filename' and 'date' are read.
        cluster_labels: Cluster id per article (-1 is shown as unclustered).
        tag_embeddings: Tag embeddings, one row per entry in ``all_tags``.
        tag_clusters: Mapping cluster id -> tags; key -1 is treated as noise.
        all_tags: Tag names matching ``tag_embeddings``.
        category_embeddings: Category embeddings, one row per ``all_categories`` entry.
        category_clusters: Mapping cluster id -> categories; key -1 is noise.
        all_categories: Category names matching ``category_embeddings``.

    Returns:
        A plotly Figure with one trace per article cluster and per tag/category
        cluster (or a single trace per kind when no clusters are given).

    Raises:
        ValueError: If a list of tag/category names does not have the same
            length as its embedding matrix.
    """
    print("Creating UMAP visualization...")

    # A term kind is only plotted when both its names and its vectors are given;
    # every later step (padding, traces, title) keys off these two flags.
    include_tags = tag_embeddings is not None and all_tags is not None
    include_categories = category_embeddings is not None and all_categories is not None

    # Start with articles
    combined_embeddings = [embeddings]
    item_types = ['Article'] * len(articles)
    item_names = [article['title'] for article in articles]
    combined_cluster_labels = np.asarray(cluster_labels).tolist()

    # Add tag embeddings if provided (labels offset by 100, -10 = unclustered)
    if include_tags:
        if len(tag_embeddings) != len(all_tags):
            raise ValueError(
                f"tag_embeddings has {len(tag_embeddings)} rows but all_tags has {len(all_tags)} entries"
            )
        print(f"Including {len(all_tags)} tags in visualization...")
        combined_embeddings.append(tag_embeddings)
        item_types.extend(['Tag'] * len(all_tags))
        item_names.extend(f"Tag: {tag}" for tag in all_tags)
        combined_cluster_labels.extend(
            _offset_term_cluster_labels(all_tags, tag_clusters, 100, -10)
        )

    # Add category embeddings if provided (labels offset by 200, -20 = unclustered)
    if include_categories:
        if len(category_embeddings) != len(all_categories):
            raise ValueError(
                f"category_embeddings has {len(category_embeddings)} rows but "
                f"all_categories has {len(all_categories)} entries"
            )
        print(f"Including {len(all_categories)} categories in visualization...")
        combined_embeddings.append(category_embeddings)
        item_types.extend(['Category'] * len(all_categories))
        item_names.extend(f"Category: {cat}" for cat in all_categories)
        combined_cluster_labels.extend(
            _offset_term_cluster_labels(all_categories, category_clusters, 200, -20)
        )

    # Combine all embeddings
    all_embeddings = np.vstack(combined_embeddings)
    combined_cluster_labels = np.array(combined_cluster_labels)

    # Count only what was actually stacked, so the padding below always matches.
    n_tags = len(all_tags) if include_tags else 0
    n_categories = len(all_categories) if include_categories else 0

    print(f"Combined embeddings shape: {all_embeddings.shape}")
    print(f"Articles: {len(articles)}, Tags: {n_tags}, Categories: {n_categories}")

    # Reduce dimensionality with UMAP
    reducer = umap.UMAP(n_components=2, random_state=42, n_neighbors=15, min_dist=0.1, metric='cosine')
    embedding_2d = reducer.fit_transform(all_embeddings)

    # Article-only columns are padded with empty strings for tag/category rows.
    padding = [''] * (n_tags + n_categories)
    plot_df = pd.DataFrame({
        'x': embedding_2d[:, 0],
        'y': embedding_2d[:, 1],
        'cluster': combined_cluster_labels,
        'type': item_types,
        'name': item_names,
        'title': [article['title'] for article in articles] + padding,
        'article_type': [article['type'] for article in articles] + padding,
        'filename': [article['filename'] for article in articles] + padding,
        'date': [article['date'] for article in articles] + padding,
    })

    # Create color map for clusters
    unique_clusters = sorted(plot_df['cluster'].unique())
    colors = px.colors.qualitative.Set3
    color_map = {cluster: colors[i % len(colors)] for i, cluster in enumerate(unique_clusters)}

    fig = go.Figure()

    # Plot articles, one trace per cluster
    article_data = plot_df[plot_df['type'] == 'Article']
    for cluster in sorted(article_data['cluster'].unique()):
        cluster_data = article_data[article_data['cluster'] == cluster]
        cluster_name = f"Article Cluster {cluster}" if cluster != -1 else "Unclustered Articles"

        fig.add_trace(go.Scatter(
            x=cluster_data['x'],
            y=cluster_data['y'],
            mode='markers',
            name=cluster_name,
            marker=dict(
                color=color_map[cluster],
                size=8,
                opacity=0.7,
                symbol='circle'
            ),
            text=cluster_data['title'],
            hovertemplate='<b>%{text}</b><br>' +
                         'Type: Article<br>' +
                         'Article Type: %{customdata[0]}<br>' +
                         'Date: %{customdata[1]}<br>' +
                         'File: %{customdata[2]}<br>' +
                         '<extra></extra>',
            customdata=cluster_data[['article_type', 'date', 'filename']].values,
            legendgroup='articles'
        ))

    # Plot tags if provided
    if include_tags:
        _add_term_traces(
            fig,
            plot_df[plot_df['type'] == 'Tag'],
            kind='Tag',
            plural='Tags',
            per_cluster=tag_clusters is not None,
            offset=100,
            unclustered_label=-10,
            color_map=color_map,
            default_color='orange',
            marker_size=12,
            marker_opacity=0.8,
            marker_symbol='diamond',
            legendgroup='tags',
        )

    # Plot categories if provided
    if include_categories:
        _add_term_traces(
            fig,
            plot_df[plot_df['type'] == 'Category'],
            kind='Category',
            plural='Categories',
            per_cluster=category_clusters is not None,
            offset=200,
            unclustered_label=-20,
            color_map=color_map,
            default_color='red',
            marker_size=15,
            marker_opacity=0.9,
            marker_symbol='star',
            legendgroup='categories',
        )

    # Create dynamic title
    title_parts = ["Blog Articles"]
    if include_tags:
        title_parts.append("Tags")
    if include_categories:
        title_parts.append("Categories")

    title = ", ".join(title_parts) + " - Semantic Space Visualization (UMAP)"

    fig.update_layout(
        title=title,
        xaxis_title="UMAP Dimension 1",
        yaxis_title="UMAP Dimension 2",
        width=1200,
        height=800,
        showlegend=True,
        legend=dict(
            yanchor="top",
            y=0.99,
            xanchor="left",
            x=1.01,
            bgcolor="rgba(255,255,255,0.8)",
            bordercolor="rgba(0,0,0,0.2)",
            borderwidth=1
        ),
        margin=dict(r=150)  # Make room for legend
    )

    return fig

# Create and display visualization with tags and categories.
# Rebinds `fig` to a figure that includes all three kinds of items;
# the export cell that follows writes this figure to disk.
fig = create_umap_visualization(
    embeddings, 
    articles, 
    cluster_labels,
    tag_embeddings=tag_embeddings,
    tag_clusters=tag_clusters,
    all_tags=terms_data['tags'],
    category_embeddings=category_embeddings,
    category_clusters=category_clusters,
    all_categories=terms_data['categories']
)
fig.show()


In [None]:
# Write the current figure to an HTML file in the working directory.
fig.write_html("article_tags_categories_umap.html")

In [None]:
def _summarize_clusters(clusters: Dict[int, List[str]], kind: str, plural: str) -> List[str]:
    """Render one prompt line per cluster, with the noise cluster (-1) listed last."""
    lines = [
        f"{kind} Cluster {cluster_id}: {', '.join(terms)}"
        for cluster_id, terms in clusters.items()
        if cluster_id != -1
    ]
    if -1 in clusters:
        lines.append(f"Unclustered {plural}: {', '.join(clusters[-1])}")
    return lines


def _extract_json_object(text: str) -> str:
    """Return the outermost ``{...}`` span of ``text``, or ``text`` itself if none.

    Models frequently wrap JSON in a Markdown code fence or add a short
    preamble even when told not to; slicing from the first '{' to the last '}'
    tolerates both.
    """
    start = text.find('{')
    end = text.rfind('}')
    if start == -1 or end < start:
        return text
    return text[start:end + 1]


def generate_blog_taxonomy_with_llm(category_clusters: Dict[int, List[str]], tag_clusters: Dict[int, List[str]],
                                   bedrock_client,
                                   model_id: str = "eu.anthropic.claude-3-5-sonnet-20240620-v1:0",
                                   max_tokens: int = 1000) -> Dict[str, Any]:
    """Generate final blog taxonomy using LLM to consolidate clusters.

    Args:
        category_clusters: Mapping cluster id -> category names; key -1 holds
            unclustered categories.
        tag_clusters: Mapping cluster id -> tag names; key -1 holds
            unclustered tags.
        bedrock_client: Client exposing ``invoke_model`` (Bedrock runtime).
        model_id: Bedrock model identifier to invoke.
        max_tokens: Upper bound on the response length. Raise it if the
            returned JSON gets cut off for large taxonomies.

    Returns:
        The parsed taxonomy dict with 'main_sections', 'all_tags' and
        'taxonomy_notes'. On any failure (request error, unparseable or
        incomplete JSON) a fallback dict with empty lists and the error
        message in 'taxonomy_notes' is returned instead of raising.
    """
    print("Generating final blog taxonomy with LLM...")

    category_lines = _summarize_clusters(category_clusters, "Category", "Categories")
    tag_lines = _summarize_clusters(tag_clusters, "Tag", "Tags")

    prompt = f"""
        Based on the clustered tags and categories from blog articles below, create a clear, hierarchical blog taxonomy.  
        Your task is to:  
        1. A consolidated list of 1-7 main blog sections/categories.
        2. For each section, suggest 3-8 relevant tags that are well-organised and non-duplicative.
        3. Merge similar/duplicate terms and use clear, consistent naming.
        4. Provide a short explanation of major consolidation or naming decisions. 

        CATEGORY CLUSTERS:
        {chr(10).join(category_lines)}

        TAG CLUSTERS:
        {chr(10).join(tag_lines)}


        Respond in **valid JSON only** and in this exact format (no extra text):  
        {{
        "main_sections": [
            {{
            "name": "Section Name",
            "description": "Brief description of this section",
            "tags": ["tag1", "tag2", "tag3"]
            }}
        ],
        "all_tags": ["consolidated_tag1", "consolidated_tag2"],
        "taxonomy_notes": "Short explanation of how duplicates were merged or terms renamed"
        }}
        """

    try:
        body = json.dumps({
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": max_tokens,
            "temperature": 0,
            "messages": [
                {
                    "role": "user",
                    "content": prompt
                }
            ]
        })

        response = bedrock_client.invoke_model(
            modelId=model_id,
            body=body,
            contentType="application/json",
            accept="application/json"
        )

        response_body = json.loads(response['body'].read())
        llm_response = response_body['content'][0]['text'].strip()

        # Parse JSON response, ignoring any code fence or text around it
        taxonomy = json.loads(_extract_json_object(llm_response))

        print("✓ Generated blog taxonomy")
        print(f"  Main sections: {len(taxonomy['main_sections'])}")
        print(f"  Total consolidated tags: {len(taxonomy['all_tags'])}")

        return taxonomy

    except Exception as e:
        # Deliberate best-effort: the notebook keeps running with an empty taxonomy.
        print(f"❌ Error generating taxonomy: {e}")
        return {
            "main_sections": [],
            "all_tags": [],
            "taxonomy_notes": f"Error: {str(e)}"
        }


## Generate Taxonomy with LLM

In [None]:
# Ask the LLM to consolidate the category and tag clusters into one taxonomy.
taxonomy_results = generate_blog_taxonomy_with_llm(category_clusters, tag_clusters, bedrock_client)



In [None]:
# Show the raw taxonomy dict for a quick visual check.
print(taxonomy_results)

## Save Taxonomy results

In [None]:
def save_taxonomy_results(taxonomy_results: Dict[str, Any], output_file: str):
    """Serialize the taxonomy dict as JSON (1-space indent) into ``output_file``.

    An existing file at that path is overwritten.
    """
    destination = Path(output_file)
    with destination.open('w') as handle:
        json.dump(taxonomy_results, handle, indent=1)

# Persist the taxonomy so the mapping cells can reload it from disk.
# NOTE(review): this also writes the empty fallback dict when generation failed
# (main_sections == []); check the printed result before relying on the file.
save_taxonomy_results(taxonomy_results, 'taxonomy_results_1.json')


## Use taxonomy to map articles to sections

In [None]:
import json
import numpy as np
import pandas as pd
from typing import List, Dict, Any, Tuple
from sklearn.metrics.pairwise import cosine_similarity
from collections import defaultdict, Counter
import re


def load_taxonomy(taxonomy_file: str) -> Dict[str, Any]:
    """Read ``taxonomy_file`` and return its parsed JSON content."""
    with open(taxonomy_file, 'r') as handle:
        raw_text = handle.read()
    return json.loads(raw_text)



def get_embedding_with_cache(text: str, bedrock_client) -> np.ndarray:
    """Return the embedding for ``text``, using the on-disk pickle cache.

    The cache file maps a content hash of the text to its embedding. On a
    cache hit the stored vector is returned and the file is left untouched;
    on a miss the embedding is generated, added to the cache and the cache
    file is rewritten.

    Args:
        text: Text to embed.
        bedrock_client: Not used directly in this function.
            NOTE(review): ``generate_embedding`` is called with the text only,
            so it presumably uses the module-level client — confirm.

    Returns:
        The embedding as a numpy array.
    """
    cache_file = EMBEDDINGS_CACHE_DIR / "embeddings_cache.pkl"

    # Load existing cache
    cache = {}
    if cache_file.exists():
        # NOTE(review): pickle.load runs arbitrary code from the file; this is
        # only acceptable because the cache is written locally by this notebook.
        with open(cache_file, 'rb') as f:
            cache = pickle.load(f)
        print(f"✓ Loaded cache with {len(cache)} embeddings")

    content_hash = get_content_hash(text)
    if content_hash in cache:
        print(f"  Using cached embedding for '{text[:50]}...'")
        # Nothing changed, so skip rewriting the whole cache file.
        return np.array(cache[content_hash])

    print(f"  Generating embedding for '{text[:50]}...'")
    embedding = generate_embedding(text)
    cache[content_hash] = embedding

    # Save updated cache (only reached when a new embedding was added)
    with open(cache_file, 'wb') as f:
        pickle.dump(cache, f)
    print(f"✓ Saved cache with {len(cache)} embeddings")

    return np.array(embedding)



def map_articles_to_taxonomy_by_embeddings(
    articles: List[Dict[str, Any]], 
    article_embeddings: np.ndarray,
    taxonomy: Dict[str, Any],
    bedrock_client,
    threshold: float = 0.7
) -> List[Dict[str, Any]]:
    """
    Map articles to taxonomy sections using embedding similarity.
    
    Args:
        articles: List of article dictionaries
        article_embeddings: Article embeddings array, row i belonging to articles[i]
        taxonomy: Loaded taxonomy dictionary
        bedrock_client: AWS Bedrock client for generating embeddings
        threshold: Minimum cosine similarity to assign to a section
    
    Returns:
        Copies of the articles with added 'taxonomy_section',
        'embedding_similarity' and 'all_section_similarities' fields. Articles
        whose best similarity is below the threshold get the section
        'Unassigned' but keep their scores. If the taxonomy has no sections
        (e.g. the saved fallback after a failed LLM call), every article is
        'Unassigned' with similarity 0.0 and no per-section scores.
    """
    sections = taxonomy['main_sections']

    # Nothing to compare against (or nothing to compare): cosine_similarity
    # would raise on an empty matrix, so short-circuit.
    if not sections or len(articles) == 0:
        return [
            dict(
                article,
                taxonomy_section='Unassigned',
                embedding_similarity=0.0,
                all_section_similarities={},
            )
            for article in articles
        ]

    # Generate embeddings for each taxonomy section
    section_embeddings = {}
    for section in sections:
        section_name = section['name']
        # Combine section name, description, and tags into text
        section_text = f"{section_name}. {section['description']}. Tags: {', '.join(section['tags'])}"
        section_embeddings[section_name] = get_embedding_with_cache(section_text, bedrock_client)

    # Convert to matrix for efficient computation
    section_names = list(section_embeddings.keys())
    section_embedding_matrix = np.array([section_embeddings[name] for name in section_names])

    # Rows: articles, columns: sections
    similarities = cosine_similarity(article_embeddings, section_embedding_matrix)

    updated_articles = []
    for article, article_similarities in zip(articles, similarities):
        best_section_idx = int(np.argmax(article_similarities))
        best_similarity = float(article_similarities[best_section_idx])

        article_copy = article.copy()
        article_copy['taxonomy_section'] = (
            section_names[best_section_idx] if best_similarity >= threshold else 'Unassigned'
        )
        article_copy['embedding_similarity'] = best_similarity
        # Keep the scores for unassigned articles too, so near-misses can be reviewed.
        article_copy['all_section_similarities'] = {
            name: float(score) for name, score in zip(section_names, article_similarities)
        }
        updated_articles.append(article_copy)

    return updated_articles


def analyze_taxonomy_mapping_results(mapped_articles: List[Dict[str, Any]],
                                     low_confidence_threshold: float = 0.6) -> Dict[str, Any]:
    """Analyze the results of taxonomy mapping.

    The score of an article is taken from the first of these keys it carries:
    'combined_score', 'tag_match_score', 'embedding_similarity',
    'composite_embedding_similarity'. Articles with none of them are counted
    in the totals but contribute no score.

    Args:
        mapped_articles: Articles carrying a 'taxonomy_section' field.
        low_confidence_threshold: Articles scoring below this value are listed
            under 'low_confidence_articles'.

    Returns:
        Dict with 'total_articles', 'assigned_articles', 'unassigned_articles',
        'section_distribution' (Counter of section names, including
        'Unassigned'), 'average_scores' (mean score per assigned section) and
        'low_confidence_articles' (title, section, score and tags per article).
    """
    # Legacy keys first so articles scored by other mappers behave as before.
    score_keys = (
        'combined_score',
        'tag_match_score',
        'embedding_similarity',
        'composite_embedding_similarity',
    )

    def find_score_key(article: Dict[str, Any]):
        return next((key for key in score_keys if key in article), None)

    section_distribution = Counter(a['taxonomy_section'] for a in mapped_articles)
    unassigned = section_distribution.get('Unassigned', 0)

    results = {
        'total_articles': len(mapped_articles),
        'assigned_articles': len(mapped_articles) - unassigned,
        'unassigned_articles': unassigned,
        'section_distribution': section_distribution,
        'average_scores': {},
        'low_confidence_articles': []
    }

    section_scores = defaultdict(list)
    for article in mapped_articles:
        score_key = find_score_key(article)
        if score_key is None:
            continue
        score = article[score_key]

        # Average scores only cover articles that were actually assigned.
        if article['taxonomy_section'] != 'Unassigned':
            section_scores[article['taxonomy_section']].append(score)

        # Low confidence articles might need manual review.
        if score < low_confidence_threshold:
            results['low_confidence_articles'].append({
                'title': article['title'],
                'section': article['taxonomy_section'],
                'score': score,
                'tags': article.get('tags', [])
            })

    for section, scores in section_scores.items():
        results['average_scores'][section] = float(np.mean(scores)) if scores else 0.0

    return results


def save_mapped_articles(mapped_articles: List[Dict[str, Any]], output_file: str):
    """Dump the mapped articles to ``output_file`` as 2-space indented JSON.

    Values that JSON cannot represent natively are written as their ``str()``
    form. A confirmation line is printed once the file is written.
    """
    destination = Path(output_file)
    with destination.open('w') as handle:
        json.dump(mapped_articles, handle, indent=2, default=str)
    print(f"✓ Mapped articles saved to {output_file}")

def print_mapping_summary(results: Dict[str, Any]):
    """Print a summary of the mapping results.

    Args:
        results: Dict as produced by ``analyze_taxonomy_mapping_results``;
            reads 'total_articles', 'assigned_articles',
            'unassigned_articles', 'section_distribution', 'average_scores'
            and 'low_confidence_articles'.
    """
    total = results['total_articles']

    def percent_of_total(count) -> float:
        # An empty article list must not crash the report.
        return count / total * 100 if total else 0.0

    print("\n" + "="*60)
    print("TAXONOMY MAPPING RESULTS")
    print("="*60)

    print(f"\n📊 OVERVIEW:")
    print(f"  Total articles: {total}")
    print(f"  Successfully assigned: {results['assigned_articles']} ({percent_of_total(results['assigned_articles']):.1f}%)")
    print(f"  Unassigned: {results['unassigned_articles']} ({percent_of_total(results['unassigned_articles']):.1f}%)")

    print(f"\n🏷️  SECTION DISTRIBUTION:")
    for section, count in results['section_distribution'].items():
        avg_score = results['average_scores'].get(section, 0.0)
        print(f"  {section}: {count} articles (avg score: {avg_score:.3f})")

    low_confidence = results['low_confidence_articles']
    if low_confidence:
        shown = 5  # keep the report short; the rest is only counted
        print(f"\n⚠️  LOW CONFIDENCE ASSIGNMENTS ({len(low_confidence)} articles):")
        for article in low_confidence[:shown]:
            print(f"  - {article['title'][:50]}... → {article['section']} (score: {article['score']:.3f})")
        if len(low_confidence) > shown:
            print(f"  ... and {len(low_confidence) - shown} more")


# Load the taxonomy that was saved by the taxonomy-generation step
taxonomy = load_taxonomy("taxonomy_results_1.json")
print(f"✓ Loaded taxonomy with {len(taxonomy['main_sections'])} sections")


# Assign each article to its most similar section (default threshold 0.7),
# comparing the full-article embeddings against the section embeddings
mapped_articles = map_articles_to_taxonomy_by_embeddings(
    articles, embeddings, taxonomy, bedrock_client)

# Analyze results
results = analyze_taxonomy_mapping_results(mapped_articles)

# Print summary
print_mapping_summary(results)

# Save results (disabled)
# NOTE(review): `output_file` is not defined in this cell; set it before re-enabling.
# save_mapped_articles(mapped_articles, output_file)




In [None]:
# Map articles to taxonomy sections using their llm_tags and llm_categories, with embeddings

def map_articles_to_taxonomy_by_tags_categories(
    articles: List[Dict[str, Any]],
    taxonomy: Dict[str, Any],
    bedrock_client,
    threshold: float = 0.7
) -> List[Dict[str, Any]]:
    """
    Map articles to taxonomy sections using embedding similarity of llm_tags and llm_categories.

    For each article the categories and tags are joined into one text (the
    title is used when both are empty), embedded, and compared against one
    embedding per taxonomy section.

    Args:
        articles: List of article dicts with 'llm_tags' and 'llm_categories'
        taxonomy: Taxonomy dict loaded from JSON
        bedrock_client: AWS Bedrock client
        threshold: Minimum cosine similarity to assign to a section
    Returns:
        Copies of the articles with added 'taxonomy_section',
        'embedding_similarity' and 'all_section_similarities' fields. Below
        the threshold the section is 'Unassigned'. If the taxonomy has no
        sections, every article is 'Unassigned' with similarity 0.0 and no
        per-section scores.
    """
    sections = taxonomy['main_sections']

    # An empty taxonomy (e.g. the saved fallback after a failed LLM call) would
    # make cosine_similarity raise; report everything as unassigned instead.
    if not sections:
        return [
            dict(
                article,
                taxonomy_section='Unassigned',
                embedding_similarity=0.0,
                all_section_similarities={},
            )
            for article in articles
        ]

    # Prepare section text for embedding
    section_embeddings = {}
    section_names = []
    for section in sections:
        section_name = section['name']
        section_text = f"{section_name}. {section['description']}. Tags: {', '.join(section['tags'])}"
        section_embeddings[section_name] = get_embedding_with_cache(section_text, bedrock_client)
        section_names.append(section_name)
    section_embedding_matrix = np.array([section_embeddings[name] for name in section_names])

    updated_articles = []
    for article in articles:
        # Combine llm_tags and llm_categories for embedding; `or []` also
        # covers keys that are present but set to None.
        tags = article.get('llm_tags') or []
        categories = article.get('llm_categories') or []
        combined_text = f"{' '.join(categories)} {' '.join(tags)}"
        if not combined_text.strip():
            combined_text = article.get('title', '')  # fallback to title if no tags/categories

        article_embedding = get_embedding_with_cache(combined_text, bedrock_client)
        similarities = cosine_similarity([article_embedding], section_embedding_matrix)[0]

        best_section_idx = int(np.argmax(similarities))
        best_similarity = float(similarities[best_section_idx])

        article_copy = article.copy()
        article_copy['taxonomy_section'] = (
            section_names[best_section_idx] if best_similarity >= threshold else 'Unassigned'
        )
        article_copy['embedding_similarity'] = best_similarity
        article_copy['all_section_similarities'] = {
            name: float(score) for name, score in zip(section_names, similarities)
        }
        updated_articles.append(article_copy)
    return updated_articles

# Example usage: map using only the LLM-generated tags and categories.
# This rebinds `mapped_articles`, replacing the result of the
# full-embedding mapping cell.
taxonomy = load_taxonomy("taxonomy_results_1.json")
mapped_articles = map_articles_to_taxonomy_by_tags_categories(
    articles, taxonomy, bedrock_client, threshold=0.7
)
print(f"✓ Mapped {len(mapped_articles)} articles to taxonomy sections using tags/categories embeddings")





In [None]:
# Print the first 5 mapped articles with their assigned taxonomy section
# and best similarity score, for a quick manual review.
for article in mapped_articles[:5]:
    print(f"Title: {article['title']}")
    print(f"  Assigned Section: {article['taxonomy_section']} (similarity: {article['embedding_similarity']:.3f})")    
    print()

In [None]:
# Advanced mapping: average embeddings for tags, categories, summary, and content, then compare to taxonomy section embeddings

def get_average_embedding(terms: List[str], bedrock_client, dim: int = 1536) -> np.ndarray:
    """Generate and average embeddings for a list of terms.

    Entries that are not strings, or that are empty/whitespace-only, are
    ignored; the remaining terms are stripped before embedding.

    Args:
        terms: Terms to embed; ``None`` is treated as an empty list.
        bedrock_client: Passed through to ``get_embedding_with_cache``.
        dim: Length of the zero vector returned when there is no valid term.
            NOTE(review): this must equal the embedding model's output size
            for the placeholder to be combinable with real embeddings;
            confirm 1536 matches the model in use.

    Returns:
        The element-wise mean of the term embeddings, or ``np.zeros(dim)``
        when no valid term was given.
    """
    vectors = [
        get_embedding_with_cache(term.strip(), bedrock_client)
        for term in (terms or [])
        if isinstance(term, str) and term.strip()
    ]
    if not vectors:
        # Return zero vector if no valid terms
        return np.zeros(dim)
    return np.mean(vectors, axis=0)

def get_article_composite_embedding(article: Dict[str, Any], bedrock_client, weights=None) -> np.ndarray:
    """
    Create a composite embedding for an article using tags, categories, summary, and content.

    Each component ('llm_tags', 'llm_categories', 'llm_summary', 'content') is
    embedded via ``get_average_embedding`` and the results are combined as a
    weighted sum. Components that come back as an all-zero placeholder (no
    usable text) are left out of the sum. This gives the same values as
    adding zeros, but does not fail when the placeholder's length differs
    from the real embedding size.

    Args:
        article: Article dict; missing keys count as empty components.
        bedrock_client: Passed through to ``get_average_embedding``.
        weights: dict with keys 'tags', 'categories', 'summary', 'content'.
            Defaults to 0.2 / 0.2 / 0.3 / 0.3.

    Returns:
        The weighted sum of the available component embeddings, or the
        all-zero placeholder if no component has any text.
    """
    if weights is None:
        weights = {'tags': 0.2, 'categories': 0.2, 'summary': 0.3, 'content': 0.3}

    components = (
        ('tags', get_average_embedding(article.get('llm_tags', []), bedrock_client)),
        ('categories', get_average_embedding(article.get('llm_categories', []), bedrock_client)),
        ('summary', get_average_embedding([article.get('llm_summary', '')], bedrock_client)),
        ('content', get_average_embedding([article.get('content', '')], bedrock_client)),
    )

    # Weighted sum over the components that actually carry a signal
    composite = None
    for key, vector in components:
        if not np.any(vector):
            continue  # zero placeholder: contributes nothing
        weighted = weights[key] * vector
        composite = weighted if composite is None else composite + weighted

    if composite is None:
        # No usable text at all: hand back a zero vector, as before.
        return np.zeros_like(components[0][1])
    return composite

def map_articles_to_taxonomy_by_composite_embeddings(
    articles: List[Dict[str, Any]],
    taxonomy: Dict[str, Any],
    bedrock_client,
    threshold: float = 0.7,
    weights=None
) -> List[Dict[str, Any]]:
    """
    Map articles to taxonomy sections using composite embeddings (tags, categories, summary, content).

    Args:
        articles: Article dicts, as accepted by ``get_article_composite_embedding``.
        taxonomy: Taxonomy dict with a 'main_sections' list; each section
            needs 'name', 'description' and 'tags'.
        bedrock_client: AWS Bedrock client, passed through to the embedding helpers.
        threshold: Minimum cosine similarity to assign to a section.
        weights: Optional component weights, forwarded to
            ``get_article_composite_embedding``.

    Returns:
        Copies of the articles with 'composite_embedding_similarity',
        'composite_best_section', 'composite_all_section_similarities' and
        'taxonomy_section' ('Unassigned' below the threshold). With an empty
        taxonomy the best section is None, the similarity 0.0 and the section
        scores are empty.
    """
    sections = taxonomy['main_sections']

    # An empty taxonomy (e.g. the saved fallback after a failed LLM call) would
    # make cosine_similarity raise; report everything as unassigned instead.
    if not sections:
        return [
            dict(
                article,
                composite_embedding_similarity=0.0,
                composite_best_section=None,
                composite_all_section_similarities={},
                taxonomy_section='Unassigned',
            )
            for article in articles
        ]

    # Prepare taxonomy section embeddings
    section_embeddings = {}
    section_names = []
    for section in sections:
        section_name = section['name']
        section_text = f"{section_name}. {section['description']}. Tags: {', '.join(section['tags'])}"
        section_embeddings[section_name] = get_embedding_with_cache(section_text, bedrock_client)
        section_names.append(section_name)
    section_embedding_matrix = np.array([section_embeddings[name] for name in section_names])

    updated_articles = []
    for article in articles:
        composite_embedding = get_article_composite_embedding(article, bedrock_client, weights)

        if np.any(composite_embedding):
            similarities = cosine_similarity([composite_embedding], section_embedding_matrix)[0]
        else:
            # An article without any usable text yields a zero placeholder whose
            # length may not match the section embeddings; score it as 0 everywhere.
            similarities = np.zeros(len(section_names))

        best_section_idx = int(np.argmax(similarities))
        best_similarity = float(similarities[best_section_idx])
        best_section = section_names[best_section_idx]

        article_copy = article.copy()
        article_copy['composite_embedding_similarity'] = best_similarity
        article_copy['composite_best_section'] = best_section
        article_copy['composite_all_section_similarities'] = {
            name: float(score) for name, score in zip(section_names, similarities)
        }
        # Assign section if above threshold
        article_copy['taxonomy_section'] = best_section if best_similarity >= threshold else 'Unassigned'
        updated_articles.append(article_copy)
    return updated_articles

# Example usage: map with the composite embeddings and the default weights.
# The result is kept in its own variable, so `mapped_articles` is not replaced.
taxonomy = load_taxonomy("taxonomy_results_1.json")
mapped_articles_composite = map_articles_to_taxonomy_by_composite_embeddings(
    articles, taxonomy, bedrock_client, threshold=0.7
)
print(f"✓ Mapped {len(mapped_articles_composite)} articles using composite embeddings (tags, categories, summary, content)")

# Show similarity results for first 5 articles, including the score against
# every section so near-misses are visible
for article in mapped_articles_composite[:5]:
    print(f"\nTitle: {article['title']}")
    print(f"  Assigned Section: {article['taxonomy_section']} (composite similarity: {article['composite_embedding_similarity']:.3f})")
    print("  Section similarities:")
    for sec, sim in article['composite_all_section_similarities'].items():
        print(f"    {sec}: {sim:.3f}")

In [None]:
# Compare article-to-section similarity for each embedding type: text, tags, categories

def get_embedding_for_terms(terms: List[str], bedrock_client) -> np.ndarray:
    """Return the element-wise mean embedding of a list of terms.

    Entries that are falsy, not strings, or blank after stripping are ignored.
    Each remaining term is stripped and embedded via ``get_embedding_with_cache``.

    Args:
        terms: Candidate terms (for example tags or categories).
        bedrock_client: Client forwarded to ``get_embedding_with_cache``.

    Returns:
        The mean of the term embeddings, or a zero vector of length 1536 when
        no usable term remains.
    """
    # NOTE(review): 1536 presumably matches the embedding model's output size;
    # confirm against what get_embedding_with_cache actually returns.
    vectors = [
        get_embedding_with_cache(term.strip(), bedrock_client)
        for term in terms
        if term and isinstance(term, str) and term.strip()
    ]
    if not vectors:
        return np.zeros(1536)
    return np.mean(vectors, axis=0)

def get_embedding_for_text(text: str, bedrock_client) -> np.ndarray:
    """Embed a single piece of text.

    Args:
        text: Text to embed; it is stripped before being embedded.
        bedrock_client: Client forwarded to ``get_embedding_with_cache``.

    Returns:
        The embedding from ``get_embedding_with_cache``, or a zero vector of
        length 1536 when ``text`` is falsy, not a string, or blank.
    """
    # NOTE(review): 1536 presumably matches the embedding model's output size; confirm.
    if not (text and isinstance(text, str) and text.strip()):
        return np.zeros(1536)
    return get_embedding_with_cache(text.strip(), bedrock_client)

def compare_article_to_taxonomy_sections(article, taxonomy, bedrock_client):
    """Score one article against every taxonomy section, per embedding source.

    Three article embeddings are compared with each section embedding using
    cosine similarity: the article's ``full_text``, its ``llm_tags`` and its
    ``llm_categories``.

    Args:
        article: Article dict; ``full_text``, ``llm_tags``, ``llm_categories``
            and ``title`` are read with defaults when missing.
        taxonomy: Dict whose ``main_sections`` entries carry ``name`` and,
            optionally, ``description`` and ``tags``.
        bedrock_client: Client forwarded to the embedding helpers.

    Returns:
        ``{'title': ..., 'similarities': {'text': {...}, 'tags': {...},
        'categories': {...}}}`` where each inner dict maps section name to score.
    """
    # Embed every section from its name, description and tags.
    embeddings_by_name = {}
    ordered_names = []
    for entry in taxonomy['main_sections']:
        name = entry['name']
        tag_list = ', '.join(entry.get('tags', []))
        descriptor = f"{name}. {entry.get('description', '')}. Tags: {tag_list}"
        embeddings_by_name[name] = get_embedding_with_cache(descriptor, bedrock_client)
        ordered_names.append(name)
    section_matrix = np.array([embeddings_by_name[name] for name in ordered_names])

    # One article vector per embedding source, in the order they are reported.
    article_vectors = {
        'text': get_embedding_for_text(article.get('full_text', ''), bedrock_client),
        'tags': get_embedding_for_terms(article.get('llm_tags', []), bedrock_client),
        'categories': get_embedding_for_terms(article.get('llm_categories', []), bedrock_client),
    }

    # Row 0 of each result holds this article's score for every section.
    similarities = {
        kind: dict(zip(ordered_names, cosine_similarity([vector], section_matrix)[0]))
        for kind, vector in article_vectors.items()
    }

    return {
        'title': article.get('title', ''),
        'similarities': similarities,
    }

# Example: print the per-section scores of the first 3 articles for each embedding source.
taxonomy = load_taxonomy("taxonomy_results_1.json")
for article in articles[:3]:
    res = compare_article_to_taxonomy_sections(article, taxonomy, bedrock_client)
    print(f"\nTitle: {res['title']}")
    # (heading, key in res['similarities']) pairs, printed in this order.
    for heading, kind in (
        ("  Text embedding similarity:", 'text'),
        ("  Tags embedding similarity:", 'tags'),
        ("  Categories embedding similarity:", 'categories'),
    ):
        print(heading)
        for sec, sim in res['similarities'][kind].items():
            print(f"    {sec}: {sim:.3f}")

## Export embeddings for visualization

In [None]:
import json

def _umap_json_default(value):
    """Convert values the json encoder cannot handle natively.

    Covers NumPy scalars and arrays, ``pathlib.Path`` and any object exposing
    ``isoformat()`` (dates, datetimes). Anything else raises ``TypeError`` as
    the default encoder would.
    """
    if isinstance(value, np.generic):
        return value.item()
    if isinstance(value, np.ndarray):
        return value.tolist()
    if isinstance(value, Path):
        return str(value)
    if hasattr(value, "isoformat"):
        return value.isoformat()
    raise TypeError(f"Object of type {type(value).__name__} is not JSON serializable")


def export_umap_to_json(embedding_2d, items, output_file="umap_data.json"):
    """Export 2D UMAP coordinates and item metadata to a JSON file.

    Args:
        embedding_2d: Array indexed as ``embedding_2d[i, 0]`` / ``[i, 1]``
            (one row of x/y coordinates per item).
        items: List of dicts with metadata for each item. ``title`` (or
            ``name``), ``type`` and ``cluster`` are promoted to top-level
            fields, and the whole dict is embedded under ``metadata``.
        output_file: Path of the JSON file to write.

    Raises:
        TypeError: If an item contains a value that cannot be converted to JSON.
    """
    export_data = [
        {
            "x": float(embedding_2d[i, 0]),
            "y": float(embedding_2d[i, 1]),
            "label": item.get("title") or item.get("name"),
            "type": item.get("type"),
            "cluster": item.get("cluster"),
            "metadata": item,
        }
        for i, item in enumerate(items)
    ]
    # Serialize before opening the file so that a serialization failure cannot
    # leave a truncated file behind.
    payload = json.dumps(export_data, indent=2, default=_umap_json_default)
    with open(output_file, "w", encoding="utf-8") as f:
        f.write(payload)
    print(f"✓ Exported UMAP data to {output_file}")

# Example usage: write the current 2D projection and its item metadata to disk.
export_umap_to_json(embedding_2d, items, output_file="umap_data.json")

## Using the taxonomy with an LLM to classify new articles

In [None]:
# %% Using the Taxonomy - not sure about this one

def classify_new_article(article_content: str, article_title: str,
                        taxonomy: Dict[str, Any],
                        bedrock_client) -> Dict[str, Any]:
    """Classify a new article against the existing taxonomy with an LLM.

    Args:
        article_content: Article body; only the first 3000 characters are sent.
        article_title: Article title, used in the prompt and progress output.
        taxonomy: Consolidated taxonomy with ``main_sections`` (entries with
            ``name`` and optionally ``description`` / ``tags``) and ``all_tags``.
        bedrock_client: Client exposing ``invoke_model``.

    Returns:
        The parsed model response with ``main_sections``, ``existing_tags``,
        ``suggested_new_tags``, ``confidence`` and ``reasoning``. If the call
        or the parsing fails, a fallback with empty lists, ``confidence``
        ``"low"`` and the error text in ``reasoning`` is returned instead.
    """
    print(f"Classifying new article: '{article_title}'...")

    main_sections = taxonomy['main_sections']
    all_tags = taxonomy['all_tags']

    # Prepare section descriptions for the LLM
    section_descriptions = [
        f"- {section['name']}: {section.get('description', '')} "
        f"(Tags: {', '.join(section.get('tags', []))})"
        for section in main_sections
    ]

    # The requested JSON schema lists every key that is read below, including
    # "existing_tags" (the answer to task 2 of the prompt).
    prompt = f"""
        Given the following blog taxonomy and a new article, classify the article:

        BLOG TAXONOMY:
        {chr(10).join(section_descriptions)}

        AVAILABLE TAGS: {', '.join(all_tags)}

        NEW ARTICLE:
        Title: {article_title}
        Content: {article_content[:3000]}...

        Please classify this article by:
        1. Selecting the most appropriate main section(s) from the taxonomy
        2. Assigning relevant tags from the available tags list
        3. Suggesting any new tags if the existing ones don't fit well

        Respond in this exact JSON format:
        {{
            "main_sections": ["Section Name 1"],
            "existing_tags": ["tag1", "tag2"],
            "suggested_new_tags": ["new_tag1", "new_tag2"],
            "confidence": "high/medium/low",
            "reasoning": "Brief explanation of classification decisions (max 2 sentences)"
        }}"""

    try:
        body = json.dumps({
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": 400,
            "temperature": 0,
            "messages": [
                {
                    "role": "user",
                    "content": prompt
                }
            ]
        })

        response = bedrock_client.invoke_model(
            modelId=MODEL_SONNET_35,
            body=body,
            contentType="application/json",
            accept="application/json"
        )

        response_body = json.loads(response['body'].read())
        llm_response = response_body['content'][0]['text'].strip()

        # Models sometimes wrap the JSON in prose or a code fence; keep only
        # the outermost {...} span before parsing.
        start, end = llm_response.find('{'), llm_response.rfind('}')
        if start != -1 and end > start:
            llm_response = llm_response[start:end + 1]
        classification = json.loads(llm_response)

        # Tag lists are optional in the response; a missing list must not turn
        # a successful classification into the error fallback.
        classification.setdefault('existing_tags', [])
        classification.setdefault('suggested_new_tags', [])

        print(f"✓ Article classified successfully")
        print(f"  Sections: {', '.join(classification['main_sections'])}")
        print(f"  Tags: {', '.join(classification['existing_tags'])}")
        if classification['suggested_new_tags']:
            print(f"  New tags suggested: {', '.join(classification['suggested_new_tags'])}")
        print(f"  Confidence: {classification['confidence']}")

        return classification

    except Exception as e:
        # Best-effort notebook helper: report the failure and return a neutral
        # result with the same keys as a successful classification.
        print(f"❌ Error classifying article: {e}")
        return {
            "main_sections": [],
            "existing_tags": [],
            "suggested_new_tags": [],
            "confidence": "low",
            "reasoning": f"Error: {str(e)}"
        }

def create_frontmatter_suggestions(classification: Dict[str, Any]) -> str:
    """Generate a frontmatter suggestion block from a classification result.

    Every line starts at column 0 so that the ``---`` delimiters are valid
    frontmatter fences.

    Args:
        classification: Dict with ``main_sections``, ``confidence`` and
            ``reasoning``; ``existing_tags`` and ``suggested_new_tags`` are
            optional and are combined into ``llm_tags``.

    Returns:
        The frontmatter text, delimited by ``---`` lines, without a trailing
        newline.

    Raises:
        KeyError: If ``main_sections``, ``confidence`` or ``reasoning`` is missing.
    """
    # Tags picked from the taxonomy first, then newly suggested ones, without
    # duplicates and in their original order.
    tags = list(dict.fromkeys(
        [*classification.get('existing_tags', []),
         *classification.get('suggested_new_tags', [])]
    ))
    # Collapse whitespace so a multi-line reasoning stays on its comment line.
    reasoning = ' '.join(str(classification['reasoning']).split())

    lines = [
        "---",
        f"date: {datetime.now().strftime('%Y-%m-%d')}",
        f"llm_tags: {tags}",
        f"llm_categories: {classification['main_sections']}",
        f"# Confidence: {classification['confidence']}",
        f"# Reasoning: {reasoning}",
        "---",
    ]
    return "\n".join(lines)

def update_taxonomy_with_new_tags(taxonomy_results: Dict[str, Any], 
                                 new_tags: List[str]) -> Dict[str, Any]:
    """Merge newly discovered tags into the taxonomy's consolidated tag list.

    Args:
        taxonomy_results: Dict holding the tag list at ``['taxonomy']['all_tags']``.
            It is modified in place.
        new_tags: Tags to add; an empty list leaves the taxonomy untouched.

    Returns:
        The same ``taxonomy_results`` object, with ``all_tags`` replaced by the
        sorted, de-duplicated union of old and new tags.
    """
    if not new_tags:
        return taxonomy_results

    print(f"Adding {len(new_tags)} new tags to taxonomy: {new_tags}")

    # Set union removes duplicates; sorting keeps the stored list stable.
    taxonomy = taxonomy_results['taxonomy']
    merged = set(taxonomy['all_tags']) | set(new_tags)
    taxonomy['all_tags'] = sorted(merged)

    return taxonomy_results


In [None]:
def classify_new_article_with_section_analysis(article_content: str, 
                                             article_title: str,
                                             taxonomy_results: Dict[str, Any],
                                             bedrock_client) -> Dict[str, Any]:
    """Classify an article and, on low confidence, ask whether a new section is needed.

    Args:
        article_content: Article body; the first 1000 characters are used in
            the follow-up prompt.
        article_title: Article title.
        taxonomy_results: Either the results wrapper holding the taxonomy under
            ``'taxonomy'`` or the taxonomy dict itself.
        bedrock_client: Client exposing ``invoke_model``.

    Returns:
        The result of ``classify_new_article``. When its ``confidence`` is
        ``'low'``, a ``section_suggestion`` entry is added: the parsed model
        answer, or ``{"suggest_new_section": False, "error": ...}`` if the
        follow-up call or its parsing fails.
    """
    # classify_new_article reads 'main_sections' / 'all_tags' at the top level,
    # so hand it the inner taxonomy rather than the results wrapper.
    taxonomy = taxonomy_results.get('taxonomy', taxonomy_results)

    # Get basic classification first
    classification = classify_new_article(article_content, article_title, taxonomy, bedrock_client)

    # If confidence is low, ask LLM if a new section might be needed
    if classification.get('confidence') == 'low':
        current_sections = [s['name'] for s in taxonomy['main_sections']]

        section_analysis_prompt = f"""This article doesn't fit well into existing blog sections: {', '.join(current_sections)}

Article: {article_title}
Content preview: {article_content[:1000]}...

Should a new blog section be created for this type of content? If so, suggest:
1. Section name
2. Section description  
3. What other content might fit in this section

Respond in JSON format:
{{
  "suggest_new_section": true/false,
  "section_name": "Proposed Section Name",
  "section_description": "What this section would cover",
  "rationale": "Why this section is needed"
}}"""

        try:
            # Call LLM for section analysis
            body = json.dumps({
                "anthropic_version": "bedrock-2023-05-31",
                "max_tokens": 300,
                "messages": [{"role": "user", "content": section_analysis_prompt}]
            })

            # NOTE(review): this uses a hard-coded Haiku model id instead of the
            # notebook's MODEL_* constants; confirm that is intentional.
            response = bedrock_client.invoke_model(
                modelId="anthropic.claude-3-haiku-20240307-v1:0",
                body=body,
                contentType="application/json",
                accept="application/json"
            )

            response_body = json.loads(response['body'].read())
            section_analysis = json.loads(response_body['content'][0]['text'].strip())

            # Add section suggestion to classification
            classification['section_suggestion'] = section_analysis

        except Exception as e:
            # Best effort: record the failure instead of losing the classification.
            classification['section_suggestion'] = {"suggest_new_section": False, "error": str(e)}

    return classification

In [None]:
# Collect classification results so they can be analyzed together later.
recent_classifications = []

file_path = '../src/content/drafts/on-games.md'

# The file handle is only needed to read the raw text.
with open(file_path, 'r', encoding='utf-8') as f:
    content = f.read()

# Split the frontmatter metadata from the markdown body.
post = frontmatter.loads(content)

# Full text for embedding and classification is the title plus the body.
title = post.metadata.get("title", "")
body_text = post.content.strip()
full_text = f"{title}\n\n{body_text}"

article = {
    "file_path": str(file_path),
    "filename": file_path,
    "title": title,
    "date": post.metadata.get("pubDate", ""),
    "tags": post.metadata.get("tags", []),
    "metadata": post.metadata,
    "content": body_text,
    "full_text": full_text,
    "processed_content": preprocess_text(full_text),
}

# Classify the draft (with the new-section follow-up) and record the result.
new_classification = classify_new_article_with_section_analysis(
    article['processed_content'],
    article['title'],
    taxonomy_results,
    bedrock_client,
)

recent_classifications.append(new_classification)



In [None]:

# Check whether the taxonomy needs expansion.
# NOTE(review): this runs as soon as at least one classification is recorded;
# raise the threshold (e.g. to 10) to analyze in larger batches.
if recent_classifications:
    evolution_analysis = analyze_taxonomy_evolution(
        taxonomy_results,
        recent_classifications,
        bedrock_client,
    )

    needs_update = evolution_analysis['needs_new_sections']
    if needs_update:
        print("🚨 TAXONOMY UPDATE RECOMMENDED:")
        for suggestion in evolution_analysis['suggested_sections']:
            print(f"• New Section: {suggestion['name']}")
            print(f"  Description: {suggestion['description']}")
            print(f"  Rationale: {suggestion['rationale']}")
            tag_text = ', '.join(suggestion['suggested_tags'])
            print(f"  Tags: {tag_text}")

In [None]:
# %% Enhanced Results with LLM Analysis
def _join_terms(value, separator: str) -> str:
    """Join a list-like cell into one string; missing cells (NaN/None) become ''."""
    if isinstance(value, str):
        return value
    if isinstance(value, (list, tuple, set)):
        return separator.join(str(v) for v in value)
    return ''


def create_enhanced_results_df(articles: List[Dict[str, Any]], categories: Dict[int, str]) -> pd.DataFrame:
    """Create a DataFrame of articles with cluster names and flattened LLM fields.

    Args:
        articles: Article dicts; ``cluster``, ``llm_tags`` and
            ``llm_main_points`` are used when present.
        categories: Mapping from cluster id to category name.

    Returns:
        DataFrame with one row per article plus:
        ``cluster_category`` (category name, ``'Unclustered'`` for unknown or
        missing clusters), ``llm_tags_str`` (tags joined with ``', '``) and
        ``llm_main_points_str`` (points joined with ``' | '``). The two string
        columns are only added when their source column exists.
    """
    # Create base DataFrame
    df = pd.DataFrame(articles)

    # Add cluster categories
    if 'cluster' in df.columns:
        df['cluster_category'] = df['cluster'].map(lambda x: categories.get(x, 'Unclustered'))
    else:
        df['cluster_category'] = 'Unclustered'

    # Each LLM column is checked on its own. Articles without LLM analysis show
    # up as NaN cells, which are rendered as empty strings.
    if 'llm_tags' in df.columns:
        df['llm_tags_str'] = df['llm_tags'].apply(lambda x: _join_terms(x, ', '))
    if 'llm_main_points' in df.columns:
        df['llm_main_points_str'] = df['llm_main_points'].apply(
            lambda x: _join_terms(x, ' | ')
        )

    return df

# Build the combined results table from the articles and cluster categories.
enhanced_df = create_enhanced_results_df(articles, categories)

print("\n💾 Enhanced results saved to 'enhanced_df' DataFrame")
print("   New columns: llm_tags, llm_main_points, llm_summary, cluster_category")

# Side-by-side look at author-supplied tags and LLM tags for the first three rows.
has_llm_tags = 'llm_tags' in enhanced_df.columns
if has_llm_tags:
    print("\n🔍 TAG COMPARISON (Original vs LLM):")
    preview = enhanced_df.head(3)
    for _, row in preview.iterrows():
        short_title = row['title'][:40]
        print(f"  {short_title}...")
        print(f"    Original tags: {row['tags']}")
        print(f"    LLM tags: {row['llm_tags']}")


In [None]:
# Print the LLM analysis (tags, main points, summary) of every analyzed article.
for article in analyzed_articles:
    print(f"\n📄 {article['title']}")
    tag_text = ', '.join(article['llm_tags'])
    print(f"   🏷️  LLM Tags: {tag_text}")
    print("   📋 Main Points:")
    main_points = article['llm_main_points']
    for point in main_points:
        print(f"      • {point}")
    summary = article['llm_summary']
    print(f"   📝 Summary: {summary}")