# 🔬 Embedding Clustering Toolkit

**A comprehensive toolkit for clustering high-dimensional text embeddings using DBSCAN and HDBSCAN algorithms.**

This notebook provides an interactive, configurable workflow for:
- Loading and preprocessing embedding vectors from CSV files
- Finding optimal clustering parameters automatically
- Performing DBSCAN clustering with cosine similarity
- Performing HDBSCAN clustering with PCA dimensionality reduction
- Visualizing and exporting results

---

## Table of Contents
1. [Setup & Configuration](#1-setup--configuration)
2. [Data Loading](#2-data-loading)
3. [Parameter Search (Sweet Spot Finder)](#3-parameter-search-sweet-spot-finder)
4. [DBSCAN Clustering](#4-dbscan-clustering)
5. [HDBSCAN with PCA](#5-hdbscan-with-pca)
6. [Results Export](#6-results-export)
7. [Visualization](#7-visualization)

---
## 1. Setup & Configuration

Install required packages and configure your analysis parameters.
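
The configuration cell in this section stores its list-valued defaults as `None` and fills them in inside `__post_init__`. As a minimal sketch of an equivalent pattern, the same defaults can be declared with `dataclasses.field(default_factory=...)`. The class name `SketchConfig` is hypothetical and is not part of the notebook; the field values simply mirror the notebook's defaults.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class SketchConfig:
    """Hypothetical, trimmed-down stand-in for ClusteringConfig."""
    similarity_threshold: float = 0.78
    min_samples: int = 2
    # default_factory builds a fresh list per instance, so instances never share state
    vector_columns: List[str] = field(default_factory=lambda: ["1", "2", "3", "4", "5", "6"])
    min_samples_range: List[int] = field(default_factory=lambda: [2, 3, 5])


a = SketchConfig()
b = SketchConfig(similarity_threshold=0.85)
a.min_samples_range.append(10)  # mutating one instance leaves the other untouched
print(a.min_samples_range)
print(b.min_samples_range)
```

Either pattern avoids a shared mutable default; which one reads better is largely a matter of taste.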

In [None]:
# Install dependencies (uncomment if needed)
# !pip install pandas numpy scikit-learn hdbscan openpyxl matplotlib seaborn tqdm

In [None]:
import pandas as pd
import numpy as np
from sklearn.cluster import DBSCAN
from sklearn.metrics import silhouette_score
from sklearn.decomposition import PCA
import matplotlib.pyplot as plt
import seaborn as sns
from typing import Tuple, Optional, List, Dict, Any
from dataclasses import dataclass
from tqdm.notebook import tqdm
import warnings

try:
    import hdbscan
    HDBSCAN_AVAILABLE = True
except ImportError:
    HDBSCAN_AVAILABLE = False
    print("⚠️ hdbscan not installed. Install with: pip install hdbscan")

warnings.filterwarnings('ignore')
plt.style.use('seaborn-v0_8-whitegrid')

print("✅ All imports successful!")
print(f"   - HDBSCAN available: {HDBSCAN_AVAILABLE}")

In [None]:
# =============================================================================
# 📋 CONFIGURATION - Modify these parameters for your use case
# =============================================================================

@dataclass
class ClusteringConfig:
    """Configuration for embedding clustering analysis."""
    
    # File paths
    input_csv_path: str = "sample.csv"  # Path to your CSV file with embeddings
    output_xlsx_path: str = "clustering_results.xlsx"  # Output file path
    
    # Vector configuration
    vector_dimension: int = 3072  # Expected dimension (3072 for text-embedding-3-large)
    vector_columns: Optional[List[str]] = None  # Column names containing vector parts (defaults to "1".."6" if None)
    name_column: str = "Name"  # Column containing entity names/labels
    
    # DBSCAN parameters
    similarity_threshold: float = 0.78  # Cosine similarity threshold (higher = tighter clusters)
    min_samples: int = 2  # Minimum samples to form a cluster
    
    # HDBSCAN + PCA parameters  
    n_pca_components: int = 30  # PCA dimensions for HDBSCAN
    hdbscan_metric: str = "euclidean"  # Distance metric for HDBSCAN
    hdbscan_min_cluster_size: int = 2  # Minimum cluster size for HDBSCAN
    hdbscan_min_samples: int = 1  # Min samples for HDBSCAN
    
    # Parameter search ranges
    threshold_range: Tuple[float, float, float] = (0.995, 0.800, -0.005)  # (start, stop, step)
    min_samples_range: Optional[List[int]] = None  # List of min_samples to test (defaults to [2, 3, 5] if None)
    
    def __post_init__(self):
        if self.vector_columns is None:
            self.vector_columns = ["1", "2", "3", "4", "5", "6"]
        if self.min_samples_range is None:
            self.min_samples_range = [2, 3, 5]

# Initialize config - MODIFY THIS FOR YOUR DATA
config = ClusteringConfig(
    input_csv_path="sample.csv",
    output_xlsx_path="clustering_results.xlsx",
    vector_dimension=3072,
    similarity_threshold=0.78,
    min_samples=2,
)

print("📋 Configuration loaded:")
print(f"   - Input: {config.input_csv_path}")
print(f"   - Vector dimension: {config.vector_dimension}")
print(f"   - Similarity threshold: {config.similarity_threshold}")

---
## 2. Data Loading

Load your CSV file containing embedding vectors. This handles:
- Multiple column formats (split vectors or single column)
- Automatic validation of vector dimensions
- Filtering invalid/incomplete vectors
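
The two text formats handled by the loader can be illustrated with a small standalone sketch. It uses only the standard library and plain lists rather than NumPy arrays; the function names `parse_vector_text` and `join_vector_parts` are hypothetical and only approximate what the loader's private methods do.

```python
import json
from typing import List


def parse_vector_text(value: str) -> List[float]:
    """Parse '[0.1, 0.2]' (JSON) or '0.1, 0.2' (comma-separated) into floats."""
    text = value.strip()
    if not text:
        return []
    if text.startswith("["):
        return [float(x) for x in json.loads(text)]
    return [float(x) for x in text.split(",") if x.strip()]


def join_vector_parts(parts: List[str]) -> List[float]:
    """Concatenate a vector that was split across several cells."""
    joined: List[float] = []
    for part in parts:
        joined.extend(parse_vector_text(part))
    return joined


expected_dim = 3  # stand-in for config.vector_dimension
vec = join_vector_parts(["0.1, 0.2", "0.3"])
print(vec, len(vec) == expected_dim)
print(parse_vector_text("[0.1, 0.2, 0.3]"))
```

A row is kept for clustering only when the parsed length equals the expected dimension, which is the check the last `print` mimics.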

In [None]:
class EmbeddingDataLoader:
    """Load and preprocess embedding vectors from CSV files."""
    
    def __init__(self, config: ClusteringConfig):
        self.config = config
        self.df: Optional[pd.DataFrame] = None
        self.valid_vectors_df: Optional[pd.DataFrame] = None
        self.vector_matrix: Optional[np.ndarray] = None
    
    def load(self, file_path: Optional[str] = None) -> Tuple[pd.DataFrame, pd.DataFrame]:
        """
        Load data from CSV and prepare for clustering.
        
        Returns:
            Tuple of (original_df, valid_vectors_df)
        """
        file_path = file_path or self.config.input_csv_path
        
        print(f"📂 Loading data from: {file_path}")
        self.df = pd.read_csv(file_path)
        print(f"   - Total rows: {len(self.df):,}")
        print(f"   - Columns: {list(self.df.columns)}")
        
        # Parse vectors from columns
        self._parse_vectors()
        
        # Filter valid vectors
        self._filter_valid_vectors()
        
        return self.df, self.valid_vectors_df
    
    def _parse_vectors(self) -> None:
        """Parse vector columns into numpy arrays."""
        
        # Check if vectors are split across multiple columns
        if all(col in self.df.columns for col in self.config.vector_columns):
            print(f"   - Parsing vectors from columns: {self.config.vector_columns}")
            self.df['vector_array'] = self.df.apply(
                lambda row: self._concat_vector_parts(row, self.config.vector_columns),
                axis=1
            )
        # Check for single 'embedding' or 'vector' column
        elif 'embedding' in self.df.columns:
            print("   - Parsing vectors from 'embedding' column")
            self.df['vector_array'] = self.df['embedding'].apply(self._parse_single_column)
        elif 'vector' in self.df.columns:
            print("   - Parsing vectors from 'vector' column")
            self.df['vector_array'] = self.df['vector'].apply(self._parse_single_column)
        else:
            raise ValueError(
                f"Could not find vector columns. Expected one of:\n"
                f"  - Split columns: {self.config.vector_columns}\n"
                f"  - Single column: 'embedding' or 'vector'"
            )
        
        # Initialize cluster column
        self.df['cluster'] = -1
    
    def _concat_vector_parts(self, row: pd.Series, columns: List[str]) -> np.ndarray:
        """Concatenate vector parts from multiple columns."""
        parts = []
        for col in columns:
            val = str(row[col])
            if val and val != 'nan':
                parts.extend([
                    float(x.strip()) 
                    for x in val.split(',') 
                    if x.strip()
                ])
        return np.array(parts)
    
    def _parse_single_column(self, value: str) -> np.ndarray:
        """Parse vector from a single column (comma or JSON format)."""
        if pd.isna(value):
            return np.array([])
        
        value = str(value).strip()
        
        # Handle JSON array format
        if value.startswith('['):
            import json
            return np.array(json.loads(value))
        
        # Handle comma-separated format
        return np.array([float(x.strip()) for x in value.split(',') if x.strip()])
    
    def _filter_valid_vectors(self) -> None:
        """Filter rows with valid vector dimensions."""
        vector_lengths = self.df['vector_array'].apply(len)
        
        # Show dimension distribution
        print(f"\n📊 Vector dimension distribution:")
        for dim, count in vector_lengths.value_counts().head(5).items():
            status = "✅" if dim == self.config.vector_dimension else "❌"
            print(f"   {status} Dimension {dim}: {count:,} vectors")
        
        # Filter valid vectors
        mask = vector_lengths == self.config.vector_dimension
        self.valid_vectors_df = self.df[mask].copy()
        
        invalid_count = (~mask).sum()
        if invalid_count > 0:
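            print(f"\n⚠️ Filtered out {invalid_count:,} rows with invalid dimensions")
        
        print(f"✅ Valid vectors: {len(self.valid_vectors_df):,}")
        
        # Fail early with a clear message; np.stack raises an opaque error on empty input
        if self.valid_vectors_df.empty:
            raise ValueError(
                f"No vectors with dimension {self.config.vector_dimension} found. "
                "Check vector_dimension and vector_columns in the config."
            )
        
        # Create vector matrix for efficient computation
        self.vector_matrix = np.stack(self.valid_vectors_df['vector_array'].values)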
    
    def get_vector_matrix(self) -> np.ndarray:
        """Get the stacked vector matrix for clustering."""
        if self.vector_matrix is None:
            raise ValueError("Data not loaded. Call load() first.")
        return self.vector_matrix

In [None]:
# Load the data
loader = EmbeddingDataLoader(config)
df, valid_df = loader.load()

print(f"\n📋 Data preview:")
display(valid_df[[config.name_column]].head(10) if config.name_column in valid_df.columns else valid_df.head(10))

---
## 3. Parameter Search (Sweet Spot Finder)

Find optimal DBSCAN parameters by testing different similarity thresholds and min_samples values.
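
The selection rule used after the grid search (highest silhouette score first, falling back to the largest cluster count when no score is defined) can be sketched without any clustering at all. The function name `pick_best` is hypothetical, and the result rows are made-up values for illustration, not output from a real search.

```python
import math
from typing import Any, Dict, List


def pick_best(results: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Highest silhouette wins; if none is defined, fall back to most clusters."""
    scored = [r for r in results if not math.isnan(r["silhouette_score"])]
    if scored:
        # Ties on silhouette are broken by the larger cluster count
        return max(scored, key=lambda r: (r["silhouette_score"], r["n_clusters"]))
    return max(results, key=lambda r: r["n_clusters"])


# Made-up rows for illustration only, not real search output
rows = [
    {"similarity_threshold": 0.90, "min_samples": 2, "n_clusters": 4, "silhouette_score": float("nan")},
    {"similarity_threshold": 0.85, "min_samples": 2, "n_clusters": 7, "silhouette_score": 0.31},
    {"similarity_threshold": 0.80, "min_samples": 3, "n_clusters": 5, "silhouette_score": 0.42},
]
best = pick_best(rows)
print(best["similarity_threshold"], best["min_samples"])
```

A high silhouette score alone does not guarantee useful clusters, so it is worth checking the noise ratio and cluster count of the winning row before adopting it.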

In [None]:
class ParameterSearcher:
    """Find optimal clustering parameters through grid search."""
    
    def __init__(self, vector_matrix: np.ndarray):
        self.vector_matrix = vector_matrix
        self.results: List[Dict[str, Any]] = []
    
    def search(
        self,
        threshold_range: Tuple[float, float, float] = (0.995, 0.800, -0.005),
        min_samples_list: Tuple[int, ...] = (2, 3, 5),  # immutable default; lists are also accepted
        show_progress: bool = True
    ) -> pd.DataFrame:
        """
        Search for optimal DBSCAN parameters.
        
        Args:
            threshold_range: (start, stop, step) for similarity thresholds
            min_samples_list: List of min_samples values to test
            show_progress: Show progress bar
            
        Returns:
            DataFrame with results for each parameter combination
        """
        self.results = []
        # Round to suppress float drift from np.arange (e.g. 0.8999999999999999)
        thresholds = np.round(np.arange(*threshold_range), 6)
        
        total = len(thresholds) * len(min_samples_list)
        iterator = tqdm(total=total, desc="🔍 Parameter search") if show_progress else None
        
        for min_samples in min_samples_list:
            for threshold in thresholds:
                result = self._evaluate_params(threshold, min_samples)
                self.results.append(result)
                
                if iterator:
                    iterator.update(1)
        
        if iterator:
            iterator.close()
        
        return self._create_results_df()
    
    def _evaluate_params(
        self, 
        similarity_threshold: float, 
        min_samples: int
    ) -> Dict[str, Any]:
        """Evaluate a single parameter combination."""
        eps_value = 1 - similarity_threshold
        
        db = DBSCAN(eps=eps_value, min_samples=min_samples, metric='cosine')
        labels = db.fit_predict(self.vector_matrix)
        
        n_clusters = len(set(labels)) - (1 if -1 in labels else 0)
        n_noise = (labels == -1).sum()
        n_clustered = len(labels) - n_noise
        
        # Calculate silhouette score if possible.
        # Note: noise points (label -1) are scored as if they formed one cluster.
        silhouette = np.nan
        if 1 < n_clusters < len(labels) and n_clustered > n_clusters:
            try:
                silhouette = silhouette_score(self.vector_matrix, labels, metric='cosine')
            except ValueError:
                pass
        
        return {
            'similarity_threshold': similarity_threshold,
            'min_samples': min_samples,
            'n_clusters': n_clusters,
            'n_noise': n_noise,
            'noise_ratio': n_noise / len(labels),
            'clustered_ratio': n_clustered / len(labels),
            'avg_cluster_size': n_clustered / n_clusters if n_clusters > 0 else 0,
            'silhouette_score': silhouette
        }
    
    def _create_results_df(self) -> pd.DataFrame:
        """Create results DataFrame sorted by quality."""
        results_df = pd.DataFrame(self.results)
        
        # Sort by silhouette score (higher is better), then by cluster count
        results_df = results_df.sort_values(
            by=['silhouette_score', 'n_clusters'],
            ascending=[False, False]
        )
        
        return results_df
    
    def plot_results(self, results_df: pd.DataFrame) -> None:
        """Visualize parameter search results."""
        fig, axes = plt.subplots(2, 2, figsize=(14, 10))
        
        for idx, min_samples in enumerate(results_df['min_samples'].unique()):
            subset = results_df[results_df['min_samples'] == min_samples]
            
            # Plot 1: Clusters vs Threshold
            axes[0, 0].plot(
                subset['similarity_threshold'], 
                subset['n_clusters'],
                marker='o',
                label=f'min_samples={min_samples}'
            )
            
            # Plot 2: Noise Ratio vs Threshold
            axes[0, 1].plot(
                subset['similarity_threshold'],
                subset['noise_ratio'],
                marker='o',
                label=f'min_samples={min_samples}'
            )
            
            # Plot 3: Silhouette Score vs Threshold
            axes[1, 0].plot(
                subset['similarity_threshold'],
                subset['silhouette_score'],
                marker='o',
                label=f'min_samples={min_samples}'
            )
            
            # Plot 4: Avg Cluster Size vs Threshold
            axes[1, 1].plot(
                subset['similarity_threshold'],
                subset['avg_cluster_size'],
                marker='o',
                label=f'min_samples={min_samples}'
            )
        
        axes[0, 0].set(xlabel='Similarity Threshold', ylabel='Number of Clusters', title='Clusters vs Threshold')
        axes[0, 1].set(xlabel='Similarity Threshold', ylabel='Noise Ratio', title='Noise vs Threshold')
        axes[1, 0].set(xlabel='Similarity Threshold', ylabel='Silhouette Score', title='Quality vs Threshold')
        axes[1, 1].set(xlabel='Similarity Threshold', ylabel='Avg Cluster Size', title='Cluster Size vs Threshold')
        
        for ax in axes.flat:
            ax.legend()
            ax.grid(True, alpha=0.3)
        
        plt.tight_layout()
        plt.suptitle('DBSCAN Parameter Search Results', y=1.02, fontsize=14, fontweight='bold')
        plt.show()
    
    def get_best_params(self, results_df: pd.DataFrame) -> Dict[str, Any]:
        """Get the best parameter combination."""
        # Filter for valid silhouette scores
        valid_results = results_df[results_df['silhouette_score'].notna()]
        
        if len(valid_results) == 0:
            print("⚠️ No valid silhouette scores. Using highest cluster count.")
            # idxmax returns an index label, so look it up with .loc (not .iloc)
            best = results_df.loc[results_df['n_clusters'].idxmax()]
        else:
            best = valid_results.iloc[0]  # Already sorted by silhouette
        
        return best.to_dict()

In [None]:
# Run parameter search
searcher = ParameterSearcher(loader.get_vector_matrix())

# Customize search range if needed
search_results = searcher.search(
    threshold_range=(0.95, 0.70, -0.01),  # Coarser search for speed
    min_samples_list=[2, 3, 5],
    show_progress=True
)

print("\n📊 Top 10 Parameter Combinations:")
display(search_results.head(10))

In [None]:
# Visualize parameter search results
searcher.plot_results(search_results)

# Get best parameters
best_params = searcher.get_best_params(search_results)
print("\n🎯 Best Parameters Found:")
for key, value in best_params.items():
    if isinstance(value, float):
        print(f"   - {key}: {value:.4f}")
    else:
        print(f"   - {key}: {value}")

---
## 4. DBSCAN Clustering

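Perform DBSCAN clustering on cosine distance. The similarity threshold is converted to DBSCAN's neighborhood radius as `eps = 1 - similarity_threshold`.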
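
To make the relationship between the similarity threshold and DBSCAN's `eps` concrete, here is a minimal standard-library sketch. The helper names `cosine_similarity` and `within_eps` are hypothetical; the only part taken from the notebook is the conversion `eps = 1 - similarity_threshold`.

```python
import math
from typing import Sequence


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


def within_eps(a: Sequence[float], b: Sequence[float], similarity_threshold: float) -> bool:
    """True when b lies in a's neighbourhood for the given threshold."""
    eps = 1.0 - similarity_threshold  # same conversion the notebook uses
    cosine_distance = 1.0 - cosine_similarity(a, b)
    return cosine_distance <= eps


u = [1.0, 0.0]
v = [0.8, 0.6]  # unit vector whose cosine similarity with u is about 0.8
print(within_eps(u, v, similarity_threshold=0.78))
print(within_eps(u, v, similarity_threshold=0.85))
```

The pair counts as neighbours at a threshold of 0.78 but not at 0.85, which is why raising the threshold produces tighter clusters and more noise.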

In [None]:
class DBSCANClusterer:
    """DBSCAN clustering for high-dimensional embeddings using cosine similarity."""
    
    def __init__(self, vector_matrix: np.ndarray):
        self.vector_matrix = vector_matrix
        self.labels: Optional[np.ndarray] = None
        self.stats: Dict[str, Any] = {}
    
    def fit(
        self,
        similarity_threshold: float = 0.78,
        min_samples: int = 2
    ) -> np.ndarray:
        """
        Perform DBSCAN clustering.
        
        Args:
            similarity_threshold: Cosine similarity threshold (0-1)
            min_samples: Minimum samples for core points
            
        Returns:
            Array of cluster labels (-1 = noise)
        """
        print(f"\n🔧 Running DBSCAN clustering...")
        print(f"   - Similarity threshold: {similarity_threshold}")
        print(f"   - Min samples: {min_samples}")
        
        eps_value = 1 - similarity_threshold
        
        db = DBSCAN(eps=eps_value, min_samples=min_samples, metric='cosine')
        self.labels = db.fit_predict(self.vector_matrix)
        
        self._calculate_stats()
        self._print_summary()
        
        return self.labels
    
    def _calculate_stats(self) -> None:
        """Calculate clustering statistics."""
        labels = self.labels
        
        n_clusters = len(set(labels)) - (1 if -1 in labels else 0)
        n_noise = (labels == -1).sum()
        n_clustered = len(labels) - n_noise
        
        # Cluster size distribution
        cluster_sizes = pd.Series(labels).value_counts().sort_index()
        if -1 in cluster_sizes.index:
            cluster_sizes = cluster_sizes.drop(-1)
        
        # Silhouette score (noise points, label -1, are scored as if they formed one cluster)
        silhouette = np.nan
        if 1 < n_clusters < len(labels):
            try:
                silhouette = silhouette_score(self.vector_matrix, labels, metric='cosine')
            except ValueError:
                pass
        
        self.stats = {
            'n_clusters': n_clusters,
            'n_noise': n_noise,
            'n_clustered': n_clustered,
            'noise_ratio': n_noise / len(labels),
            'clustered_ratio': n_clustered / len(labels),
            'avg_cluster_size': n_clustered / n_clusters if n_clusters > 0 else 0,
            'cluster_sizes': cluster_sizes.to_dict(),
            'silhouette_score': silhouette
        }
    
    def _print_summary(self) -> None:
        """Print clustering summary."""
        s = self.stats
        
        print(f"\n📊 DBSCAN Results:")
        print(f"   ├─ Clusters: {s['n_clusters']}")
        print(f"   ├─ Noise points: {s['n_noise']} ({s['noise_ratio']:.1%})")
        print(f"   ├─ Clustered points: {s['n_clustered']} ({s['clustered_ratio']:.1%})")
        print(f"   ├─ Avg cluster size: {s['avg_cluster_size']:.1f}")
        if not np.isnan(s['silhouette_score']):
            print(f"   └─ Silhouette score: {s['silhouette_score']:.3f}")
        else:
            print(f"   └─ Silhouette score: N/A")
    
    def get_cluster_summary(self) -> pd.DataFrame:
        """Get summary DataFrame of cluster sizes."""
        if self.labels is None:
            raise ValueError("No clustering performed yet. Call fit() first.")
        
        sizes = pd.Series(self.labels).value_counts().sort_values(ascending=False)
        summary = pd.DataFrame({
            'cluster_id': sizes.index,
            'size': sizes.values,
            'percentage': (sizes.values / len(self.labels) * 100).round(2)
        })
        summary['type'] = summary['cluster_id'].apply(lambda x: 'Noise' if x == -1 else 'Cluster')
        
        return summary

In [None]:
# Run DBSCAN with optimal or custom parameters
dbscan_clusterer = DBSCANClusterer(loader.get_vector_matrix())

# Option 1: Use best parameters from search
# dbscan_labels = dbscan_clusterer.fit(
#     similarity_threshold=best_params['similarity_threshold'],
#     min_samples=int(best_params['min_samples'])
# )

# Option 2: Use custom parameters
dbscan_labels = dbscan_clusterer.fit(
    similarity_threshold=config.similarity_threshold,
    min_samples=config.min_samples
)

# Show cluster summary
print("\n📋 Cluster Size Distribution:")
display(dbscan_clusterer.get_cluster_summary().head(20))

---
## 5. HDBSCAN with PCA

Use PCA for dimensionality reduction followed by HDBSCAN clustering. This approach:
- Reduces computation time significantly
- Can improve clustering quality for very high-dimensional data
- Enables visualization in 2D/3D
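
The "explained variance" figure printed by the PCA step can be reproduced by hand. The sketch below is an illustration only: it uses NumPy's SVD instead of scikit-learn's `PCA` class, and the data is synthetic (random points near a 3-dimensional subspace), not real embeddings.

```python
import numpy as np

rng = np.random.default_rng(0)

# Synthetic stand-in for embeddings: 200 points in 50 dimensions that
# lie close to a 3-dimensional subspace, plus a little noise.
latent = rng.normal(size=(200, 3))
mixing = rng.normal(size=(3, 50))
X = latent @ mixing + 0.01 * rng.normal(size=(200, 50))

# PCA via SVD of the mean-centred data
X_centered = X - X.mean(axis=0)
U, S, Vt = np.linalg.svd(X_centered, full_matrices=False)

explained_ratio = S**2 / np.sum(S**2)  # share of total variance per component
n_components = 3
X_reduced = X_centered @ Vt[:n_components].T  # project onto the top components

print(X_reduced.shape)
print(f"variance kept by {n_components} components: {explained_ratio[:n_components].sum():.2%}")
```

If the explained variance for the chosen `n_pca_components` is low on real data, the reduced space may have discarded structure that the clustering step needs.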

In [None]:
class HDBSCANClusterer:
    """HDBSCAN clustering with optional PCA dimensionality reduction."""
    
    def __init__(self, vector_matrix: np.ndarray):
        self.vector_matrix = vector_matrix
        self.reduced_data: Optional[np.ndarray] = None
        self.labels: Optional[np.ndarray] = None
        self.stats: Dict[str, Any] = {}
        self.pca: Optional[PCA] = None
        self.clusterer: Optional[Any] = None
    
    def apply_pca(self, n_components: int = 30) -> np.ndarray:
        """
        Apply PCA dimensionality reduction.
        
        Args:
            n_components: Number of principal components
            
        Returns:
            Reduced data matrix
        """
        print(f"\n🔧 Applying PCA: {self.vector_matrix.shape[1]} → {n_components} dimensions")
        
        self.pca = PCA(n_components=n_components)
        self.reduced_data = self.pca.fit_transform(self.vector_matrix)
        
        explained_var = self.pca.explained_variance_ratio_.sum()
        print(f"   - Explained variance: {explained_var:.2%}")
        
        return self.reduced_data
    
    def fit(
        self,
        n_pca_components: int = 30,
        min_cluster_size: int = 2,
        min_samples: int = 1,
        metric: str = 'euclidean',
        cluster_selection_epsilon: float = 0.0
    ) -> np.ndarray:
        """
        Perform HDBSCAN clustering with PCA.
        
        Returns:
            Array of cluster labels (-1 = noise)
        """
        if not HDBSCAN_AVAILABLE:
            raise ImportError("hdbscan package not installed. Run: pip install hdbscan")
        
        # Apply PCA
        self.apply_pca(n_pca_components)
        
        # Run HDBSCAN
        print(f"\n🔧 Running HDBSCAN clustering...")
        print(f"   - Min cluster size: {min_cluster_size}")
        print(f"   - Min samples: {min_samples}")
        print(f"   - Metric: {metric}")
        
        self.clusterer = hdbscan.HDBSCAN(
            min_cluster_size=min_cluster_size,
            min_samples=min_samples,
            metric=metric,
            cluster_selection_epsilon=cluster_selection_epsilon
        )
        self.labels = self.clusterer.fit_predict(self.reduced_data)
        
        self._calculate_stats()
        self._print_summary()
        
        return self.labels
    
    def _calculate_stats(self) -> None:
        """Calculate clustering statistics."""
        labels = self.labels
        
        n_clusters = len(set(labels)) - (1 if -1 in labels else 0)
        n_noise = (labels == -1).sum()
        n_clustered = len(labels) - n_noise
        
        # Silhouette score on reduced data (noise, label -1, is scored as one cluster)
        silhouette = np.nan
        if 1 < n_clusters < len(labels):
            try:
                silhouette = silhouette_score(self.reduced_data, labels)
            except ValueError:
                pass
        
        self.stats = {
            'n_clusters': n_clusters,
            'n_noise': n_noise,
            'n_clustered': n_clustered,
            'noise_ratio': n_noise / len(labels),
            'avg_cluster_size': n_clustered / n_clusters if n_clusters > 0 else 0,
            'silhouette_score': silhouette,
            'pca_explained_variance': self.pca.explained_variance_ratio_.sum() if self.pca else None
        }
    
    def _print_summary(self) -> None:
        """Print clustering summary."""
        s = self.stats
        
        print(f"\n📊 HDBSCAN Results:")
        print(f"   ├─ Clusters: {s['n_clusters']}")
        print(f"   ├─ Noise points: {s['n_noise']} ({s['noise_ratio']:.1%})")
        print(f"   ├─ Avg cluster size: {s['avg_cluster_size']:.1f}")
        if not np.isnan(s['silhouette_score']):
            print(f"   └─ Silhouette score: {s['silhouette_score']:.3f}")
        else:
            print(f"   └─ Silhouette score: N/A")

In [None]:
# Run HDBSCAN with PCA
if HDBSCAN_AVAILABLE:
    hdbscan_clusterer = HDBSCANClusterer(loader.get_vector_matrix())
    
    hdbscan_labels = hdbscan_clusterer.fit(
        n_pca_components=config.n_pca_components,
        min_cluster_size=config.hdbscan_min_cluster_size,
        min_samples=config.hdbscan_min_samples,
        metric=config.hdbscan_metric
    )
else:
    print("⚠️ HDBSCAN not available. Skipping this section.")
    hdbscan_labels = None

---
## 6. Results Export

Export clustering results to Excel files for further analysis.
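
One detail of the export step is easy to miss: the label array covers only rows with a valid vector, so it has to be aligned back onto the full table. The exporter does this with a pandas boolean mask; the sketch below shows the same idea in plain Python, with made-up rows and labels and a hypothetical `EXPECTED_DIM` constant standing in for `config.vector_dimension`.

```python
from typing import List

EXPECTED_DIM = 3  # stand-in for config.vector_dimension

rows = [
    {"Name": "alpha", "vector": [0.1, 0.2, 0.3]},
    {"Name": "beta", "vector": [0.4, 0.5]},  # wrong length, never clustered
    {"Name": "gamma", "vector": [0.7, 0.8, 0.9]},
]
labels: List[int] = [0, 1]  # made-up labels, one per valid row, in row order

label_iter = iter(labels)
for row in rows:
    is_valid = len(row["vector"]) == EXPECTED_DIM
    # Invalid rows keep the default -1, so they look the same as noise
    row["cluster"] = next(label_iter) if is_valid else -1

for row in sorted(rows, key=lambda r: r["cluster"]):
    print(row["Name"], row["cluster"])
```

Because rows with invalid vectors and genuine noise points both end up with cluster `-1`, the exported file alone cannot tell them apart.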

In [None]:
class ResultsExporter:
    """Export clustering results to various formats."""
    
    def __init__(self, df: pd.DataFrame, config: ClusteringConfig):
        self.df = df.copy()
        self.config = config
    
    def export(
        self,
        labels: np.ndarray,
        output_path: Optional[str] = None,
        method_name: str = "clustering"
    ) -> str:
        """
        Export clustering results to Excel.
        
        Args:
            labels: Cluster labels array
            output_path: Output file path (auto-generated if None)
            method_name: Name of clustering method for filename
            
        Returns:
            Path to exported file
        """
        if output_path is None:
            output_path = f"{method_name}_results.xlsx"
        
        # Assign labels to valid vectors
        valid_mask = self.df['vector_array'].apply(len) == self.config.vector_dimension
        self.df.loc[valid_mask, 'cluster'] = labels
        
        # Sort by cluster
        export_df = self.df.sort_values(by='cluster')
        
        # Select columns for export
        export_columns = [self.config.name_column, 'cluster'] if self.config.name_column in export_df.columns else ['cluster']
        export_df = export_df[export_columns]
        
        # Export to Excel
        export_df.to_excel(output_path, index=False)
        
        print(f"\n💾 Results exported to: {output_path}")
        print(f"   - Total rows: {len(export_df):,}")
        
        return output_path
    
    def export_both(
        self,
        dbscan_labels: np.ndarray,
        hdbscan_labels: Optional[np.ndarray] = None
    ) -> None:
        """Export results from both clustering methods."""
        
        # DBSCAN results
        self.export(dbscan_labels, "dbscan_results.xlsx", "dbscan")
        
        # HDBSCAN results (if available)
        if hdbscan_labels is not None:
            self.export(hdbscan_labels, "hdbscan_results.xlsx", "hdbscan")

In [None]:
# Export results
exporter = ResultsExporter(df, config)

# Export DBSCAN results
exporter.export(dbscan_labels, config.output_xlsx_path, "dbscan")

# Export HDBSCAN results if available
if hdbscan_labels is not None:
    exporter.export(hdbscan_labels, "hdbscan_results.xlsx", "hdbscan")

---
## 7. Visualization

Visualize clustering results using PCA-reduced dimensions.

In [None]:
class ClusterVisualizer:
    """Visualize clustering results."""
    
    def __init__(self, vector_matrix: np.ndarray):
        self.vector_matrix = vector_matrix
        self.pca_2d: Optional[np.ndarray] = None
    
    def _get_2d_projection(self) -> np.ndarray:
        """Get 2D PCA projection for visualization."""
        if self.pca_2d is None:
            pca = PCA(n_components=2)
            self.pca_2d = pca.fit_transform(self.vector_matrix)
        return self.pca_2d
    
    def plot_clusters(
        self,
        labels: np.ndarray,
        title: str = "Cluster Visualization",
        figsize: Tuple[int, int] = (12, 8)
    ) -> None:
        """Plot clusters in 2D PCA space."""
        projection = self._get_2d_projection()
        
        fig, ax = plt.subplots(figsize=figsize)
        
        # Plot noise points first (in gray)
        noise_mask = labels == -1
        if noise_mask.any():
            ax.scatter(
                projection[noise_mask, 0],
                projection[noise_mask, 1],
                c='lightgray',
                alpha=0.5,
                s=20,
                label='Noise'
            )
        
        # Plot clustered points
        clustered_mask = ~noise_mask
        if clustered_mask.any():
            scatter = ax.scatter(
                projection[clustered_mask, 0],
                projection[clustered_mask, 1],
                c=labels[clustered_mask],
                cmap='tab20',
                alpha=0.7,
                s=30
            )
            plt.colorbar(scatter, ax=ax, label='Cluster ID')
        
        ax.set_xlabel('PCA Component 1')
        ax.set_ylabel('PCA Component 2')
        ax.set_title(title, fontsize=14, fontweight='bold')
        if noise_mask.any():
            ax.legend()  # only the noise scatter carries a label
        
        plt.tight_layout()
        plt.show()
    
    def plot_cluster_sizes(
        self,
        labels: np.ndarray,
        title: str = "Cluster Size Distribution",
        top_n: int = 20
    ) -> None:
        """Plot cluster size distribution."""
        sizes = pd.Series(labels).value_counts().sort_values(ascending=True)
        
        # Separate noise from clusters
        noise_size = sizes.get(-1, 0)
        cluster_sizes = sizes.drop(-1, errors='ignore').tail(top_n)
        
        fig, ax = plt.subplots(figsize=(10, 6))
        
        # Noise was dropped above, so every bar is a real cluster
        cluster_sizes.plot(kind='barh', ax=ax, color='steelblue')
        
        ax.set_xlabel('Number of Points')
        ax.set_ylabel('Cluster ID')
        ax.set_title(title, fontsize=14, fontweight='bold')
        
        # Add noise annotation
        if noise_size > 0:
            ax.annotate(
                f'Noise points: {noise_size}',
                xy=(0.95, 0.05),
                xycoords='axes fraction',
                ha='right',
                fontsize=10,
                color='coral'
            )
        
        plt.tight_layout()
        plt.show()
    
    def compare_methods(
        self,
        dbscan_labels: np.ndarray,
        hdbscan_labels: Optional[np.ndarray] = None
    ) -> None:
        """Compare clustering results from different methods."""
        projection = self._get_2d_projection()
        
        n_methods = 2 if hdbscan_labels is not None else 1
        fig, axes = plt.subplots(1, n_methods, figsize=(6 * n_methods, 5))
        
        if n_methods == 1:
            axes = [axes]
        
        # DBSCAN plot
        ax = axes[0]
        scatter = ax.scatter(
            projection[:, 0],
            projection[:, 1],
            c=dbscan_labels,
            cmap='tab20',
            alpha=0.7,
            s=20
        )
        ax.set_title(f'DBSCAN\n({len(set(dbscan_labels)) - (1 if -1 in dbscan_labels else 0)} clusters)')
        ax.set_xlabel('PCA 1')
        ax.set_ylabel('PCA 2')
        
        # HDBSCAN plot
        if hdbscan_labels is not None:
            ax = axes[1]
            scatter = ax.scatter(
                projection[:, 0],
                projection[:, 1],
                c=hdbscan_labels,
                cmap='tab20',
                alpha=0.7,
                s=20
            )
            ax.set_title(f'HDBSCAN + PCA\n({len(set(hdbscan_labels)) - (1 if -1 in hdbscan_labels else 0)} clusters)')
            ax.set_xlabel('PCA 1')
            ax.set_ylabel('PCA 2')
        
        plt.suptitle('Clustering Method Comparison', fontsize=14, fontweight='bold', y=1.02)
        plt.tight_layout()
        plt.show()

In [None]:
# Visualize results
visualizer = ClusterVisualizer(loader.get_vector_matrix())

# Plot DBSCAN clusters
visualizer.plot_clusters(dbscan_labels, title="DBSCAN Clustering Results")

# Plot cluster size distribution
visualizer.plot_cluster_sizes(dbscan_labels, title="DBSCAN Cluster Size Distribution")

In [None]:
# Compare methods if HDBSCAN was run
if hdbscan_labels is not None:
    visualizer.compare_methods(dbscan_labels, hdbscan_labels)
    visualizer.plot_cluster_sizes(hdbscan_labels, title="HDBSCAN Cluster Size Distribution")

---
## 🎯 Quick Reference

### Choosing Parameters

| Parameter | Higher Value | Lower Value |
|-----------|-------------|-------------|
| **similarity_threshold** | Tighter clusters, more noise | Looser clusters, less noise |
| **min_samples** | More robust clusters, more noise | More clusters, potentially less meaningful |
| **n_pca_components** | More information preserved | Faster computation, more noise reduction |

### When to Use Each Method

| Method | Best For |
|--------|----------|
| **DBSCAN** | High-dimensional embeddings with clear similarity structure |
| **HDBSCAN + PCA** | Very large datasets, noisy data, varying cluster densities |

### Interpreting Results

- **Silhouette Score**: Higher is better (range: -1 to 1)
  - \> 0.5: Strong structure
  - 0.25-0.5: Reasonable structure
  - < 0.25: Weak or no structure
  
- **Noise Ratio**: 
  - < 10%: Very clean clustering
  - 10-30%: Normal for real-world data
  - \> 30%: Consider adjusting parameters
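
The bands above can be turned into a small helper for annotating results. This is a sketch only: the function names are hypothetical, and the handling of values that fall exactly on a boundary (for example a score of exactly 0.5) is an assumption, since the bands do not specify it.

```python
def describe_silhouette(score: float) -> str:
    """Map a silhouette score to the bands listed in this section."""
    if score > 0.5:
        return "strong structure"
    if score >= 0.25:
        return "reasonable structure"
    return "weak or no structure"


def describe_noise_ratio(ratio: float) -> str:
    """Map a noise ratio (0.0 to 1.0) to the bands listed in this section."""
    if ratio < 0.10:
        return "very clean clustering"
    if ratio <= 0.30:
        return "normal for real-world data"
    return "consider adjusting parameters"


print(describe_silhouette(0.42), "|", describe_noise_ratio(0.18))
```

These thresholds are rules of thumb; a quick manual review of a few clusters remains the most reliable check.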

---
## 📝 Notes

Use this cell to record your analysis notes and findings.

In [None]:
# Your notes here
# - 
# - 