In [0]:
!pip install umap-learn numpy hdbscan bertopic langchain databricks-sdk[openai]

In [0]:
import numpy as np

# Unity Catalog location (catalog, schema) shared by every table name below.
CATALOG, SCHEMA = "kohei_arai", "demo"

# Table names inside CATALOG.SCHEMA:
#   WRITEBACK_TABLE - input rows, read below for clustering
#   PLOT_TABLE      - output of the t-SNE plotting data
#   LABELS_TABLE / GOLD_TABLE - not referenced in this section (presumably used later)
WRITEBACK_TABLE, PLOT_TABLE, LABELS_TABLE, GOLD_TABLE = (
    "calls_log_silver_index_writeback_table",
    "calls_log_plot_table",
    "calls_log_labels_table",
    "calls_log_gold",
)

In [0]:
# Source rows, including the `__db_content_summary_vector` column clustered below.
df = spark.read.table(f"{CATALOG}.{SCHEMA}.{WRITEBACK_TABLE}")
display(df)

In [0]:
from pyspark.sql import functions as F
from pyspark.ml.feature import VectorAssembler
from sklearn.cluster import SpectralClustering
from sklearn.metrics import silhouette_score
import numpy as np
from pyspark.sql.types import IntegerType
import pandas as pd

def prepare_data(spark_df, vector_col="__db_content_summary_vector"):
    """Collect a Spark DataFrame to the driver and extract its embedding matrix.

    Args:
        spark_df: Spark DataFrame containing ``vector_col``.
        vector_col: Column holding one embedding per row, either as a sequence
            of numbers or as the string literal of one (e.g. ``"[0.1, 0.2]"``).

    Returns:
        (vectors, pandas_df): ``vectors`` is a numpy array with one row per
        collected row (2-D when all embeddings share a length), in the same
        order as ``pandas_df``, the collected pandas DataFrame.

    Raises:
        ValueError: if the DataFrame has no rows, or a string embedding is not
            a valid Python literal.
    """
    import ast

    # Collect everything: the full frame is returned to the caller as well.
    pandas_df = spark_df.toPandas()
    if pandas_df.empty:
        raise ValueError(f"Cannot prepare vectors: DataFrame has no rows (column '{vector_col}')")

    column = pandas_df[vector_col]
    # Positional access: the collected frame's index is not guaranteed to contain label 0.
    if isinstance(column.iloc[0], str):
        # literal_eval parses only Python literals; eval would execute arbitrary code stored in the table.
        vectors = np.array([ast.literal_eval(v) for v in column])
    else:
        vectors = np.array(column.tolist())
    return vectors, pandas_df

def find_optimal_clusters(X, max_clusters=10, n_neighbors=10, random_state=42):
    """Pick the cluster count in [2, max_clusters] with the best silhouette score.

    Each candidate k is fit with nearest-neighbors ``SpectralClustering`` and
    scored with ``silhouette_score`` on ``X``. Candidates that cannot be scored
    are skipped: either the fit produced fewer than 2 or more than
    n_samples - 1 distinct labels, or k itself is not below the sample count.

    Args:
        X: Feature matrix, one row per sample.
        max_clusters: Largest cluster count to try (inclusive).
        n_neighbors: Neighbors for the k-NN affinity graph; capped at n_samples - 1.
        random_state: Seed passed to SpectralClustering.

    Returns:
        int: the best-scoring cluster count (the smallest k on ties).

    Raises:
        ValueError: if no candidate in [2, max_clusters] could be scored.
    """
    n_samples = len(X)
    silhouette_by_k = {}

    # silhouette_score only accepts 2 <= n_labels <= n_samples - 1.
    for n_clusters in range(2, min(max_clusters, n_samples - 1) + 1):
        print(f"Testing {n_clusters} clusters...")

        spectral = SpectralClustering(
            n_clusters=n_clusters,
            assign_labels='discretize',
            random_state=random_state,
            affinity='nearest_neighbors',  # Using KNN for sparse affinity matrix
            n_neighbors=min(n_neighbors, max(1, n_samples - 1)),
        )
        cluster_labels = spectral.fit_predict(X)

        # A fit may yield fewer distinct labels than requested; such a labelling cannot be scored.
        n_found = len(np.unique(cluster_labels))
        if not 2 <= n_found <= n_samples - 1:
            print(f"Skipping {n_clusters} clusters: fit produced {n_found} distinct label(s)")
            continue

        score = silhouette_score(X, cluster_labels)
        silhouette_by_k[n_clusters] = score
        print(f"Silhouette score for {n_clusters} clusters: {score:.3f}")

    if not silhouette_by_k:
        raise ValueError(
            f"No cluster count in [2, {max_clusters}] could be scored for {n_samples} samples"
        )
    # dict preserves insertion (ascending k) order, so ties resolve to the smallest k like np.argmax did.
    return int(max(silhouette_by_k, key=silhouette_by_k.get))

def perform_spectral_clustering(X, n_clusters, n_neighbors=10, random_state=24):
    """Fit nearest-neighbors spectral clustering and return one label per row of X.

    Args:
        X: Feature matrix, one row per sample.
        n_clusters: Number of clusters to request.
        n_neighbors: Neighbors in the k-NN affinity graph. Capped at
            n_samples - 1, since a sample cannot have more neighbors than there
            are other samples.
        random_state: Seed for SpectralClustering. The default of 24 is kept for
            reproducibility with earlier runs. NOTE(review): find_optimal_clusters
            seeds with 42, so the selection fit and the final fit differ; consider unifying.

    Returns:
        The labels array produced by ``SpectralClustering.fit_predict``, aligned with the rows of X.
    """
    spectral = SpectralClustering(
        n_clusters=n_clusters,
        assign_labels='discretize',
        random_state=random_state,
        affinity='nearest_neighbors',
        n_neighbors=min(n_neighbors, max(1, len(X) - 1)),
    )
    return spectral.fit_predict(X)

def create_spark_df_with_clusters(spark, original_df, cluster_labels):
    """Attach one cluster label per row to a Spark DataFrame, matching by row position.

    ``cluster_labels[i]`` is assigned to the i-th row of ``original_df`` in its
    collect()/toPandas() order, which is the order ``prepare_data`` used to
    build the feature matrix. This assumes re-evaluating ``original_df`` yields
    that same order, as the previous implementation also did.

    Raw ``monotonically_increasing_id()`` values carry the partition ID in their
    upper bits. They are therefore not consecutive, and two DataFrames with
    different partitioning get unrelated IDs, so joining two such columns
    directly misaligns or drops rows. Here the IDs are only used for ordering:
    ``row_number()`` over them gives consecutive 0-based positions in partition
    order, and the labels are keyed by the same positions.

    Args:
        spark: Active SparkSession used to build the labels DataFrame.
        original_df: Spark DataFrame whose rows were clustered.
        cluster_labels: One label per row of ``original_df`` (array-like of ints).

    Returns:
        Spark DataFrame with ``original_df``'s columns plus ``cluster_id`` (bigint),
        ordered by original row position.

    Raises:
        ValueError: if the number of labels differs from the number of rows.
    """
    from pyspark.sql import Window

    labels = [int(label) for label in np.asarray(cluster_labels).ravel()]
    n_rows = original_df.count()
    if n_rows != len(labels):
        raise ValueError(
            f"Got {len(labels)} cluster labels for {n_rows} rows; they must match one-to-one"
        )

    # Un-partitioned window (single partition): acceptable because these rows were
    # already collected to the driver for clustering.
    original_with_index = (
        original_df
        .withColumn("_mono_id", F.monotonically_increasing_id())
        .withColumn(
            "row_idx",
            (F.row_number().over(Window.orderBy("_mono_id")) - 1).cast("long"),
        )
        .drop("_mono_id")
    )

    cluster_with_index = spark.createDataFrame(
        list(enumerate(labels)), schema="row_idx long, cluster_id long"
    )

    return (
        original_with_index
        .join(cluster_with_index, on="row_idx")
        .orderBy("row_idx")
        .drop("row_idx")
    )

# Main clustering pipeline
def spectral_clustering_pipeline(spark_df, max_clusters=10, n_clusters=None, search_optimal=False):
    """Cluster the embeddings of ``spark_df`` and return it with a ``cluster_id`` column.

    Steps: collect the embeddings (``prepare_data``), choose the cluster count,
    fit spectral clustering, attach the labels back to the Spark rows, and
    print the per-cluster row counts.

    Args:
        spark_df: Spark DataFrame containing the embedding column read by ``prepare_data``.
        max_clusters: Upper bound of the silhouette search. When the search is
            off and ``n_clusters`` is None, it is used directly as the cluster count.
        n_clusters: Explicit cluster count; takes precedence over the other options.
        search_optimal: If True (and ``n_clusters`` is None), choose the count
            with ``find_optimal_clusters`` over [2, max_clusters]. Off by default,
            which keeps the existing behaviour of using ``max_clusters`` as-is.

    Returns:
        Spark DataFrame with ``spark_df``'s columns plus ``cluster_id``.
    """
    print("Preparing data...")
    X, _ = prepare_data(spark_df)

    if n_clusters is None:
        if search_optimal:
            # Opt-in: the search refits SpectralClustering once per candidate k.
            print("Finding optimal number of clusters...")
            n_clusters = find_optimal_clusters(X, max_clusters)
            print(f"Optimal number of clusters: {n_clusters}")
        else:
            n_clusters = max_clusters
            print(f"Using max_clusters as the number of clusters: {n_clusters}")

    print(f"Performing spectral clustering with {n_clusters} clusters...")
    cluster_labels = perform_spectral_clustering(X, n_clusters)

    # `spark` is the notebook's global SparkSession.
    final_df = create_spark_df_with_clusters(spark, spark_df, cluster_labels)

    print("\nCluster distribution:")
    final_df.groupBy("cluster_id").count().orderBy("cluster_id").show()

    return final_df

# Cluster the rows loaded above. max_clusters=5 is a hand-picked value (adjust as
# needed); without the silhouette search it is also the number of clusters produced.
clustered_df = spectral_clustering_pipeline(df, max_clusters=5)

display(clustered_df)

In [0]:
import numpy as np
import pandas as pd
from sklearn.manifold import TSNE
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots

# Collect every column the plot needs in ONE job so embeddings, cluster ids and texts
# stay row-aligned. clustered_df is a lazy join, and separate toPandas() calls each
# re-execute it with no guarantee that the rows come back in the same order.
plot_source_pd = clustered_df.select(
    '__db_content_summary_vector', 'cluster_id', 'content_summary'
).toPandas()

vectors = np.array(plot_source_pd['__db_content_summary_vector'].tolist())
clusters = np.array(plot_source_pd['cluster_id'].tolist())
texts = plot_source_pd['content_summary'].tolist()

# Project the embeddings to 2-D with t-SNE for plotting.
print("Performing t-SNE dimensionality reduction...")
# Recent scikit-learn versions raise ValueError when perplexity >= n_samples,
# so cap the default of 5 for small tables.
tsne_perplexity = min(5, max(1, len(vectors) - 1))
tsne = TSNE(n_components=2, random_state=42, perplexity=tsne_perplexity)
embeddings_2d = tsne.fit_transform(vectors)

# One row per point: 2-D coordinates, cluster id and hover text.
plot_df = pd.DataFrame({
    'x': embeddings_2d[:, 0],
    'y': embeddings_2d[:, 1],
    'cluster': clusters.astype(str),  # Convert to string for better legends
    'text': texts
})

# Map cluster IDs to colors. Assign palette entries in numeric cluster-id order
# (not order of first appearance) so a cluster keeps its color regardless of row order;
# key=int keeps "10" after "2". The palette wraps around if there are more clusters than colors.
unique_clusters = sorted(plot_df['cluster'].unique(), key=int)
palette = px.colors.qualitative.Set3
color_map = {cluster: palette[i % len(palette)] for i, cluster in enumerate(unique_clusters)}
plot_df['color'] = plot_df['cluster'].map(color_map)

# Create subplots: main scatter plot and cluster distribution
fig = make_subplots(
    rows=1, cols=2,
    column_widths=[0.7, 0.3],
    specs=[[{"type": "scatter"}, {"type": "bar"}]],
    subplot_titles=('Cluster Visualization (t-SNE)', 'Cluster Distribution')
)

# One scatter trace per cluster, in numeric id order ("10" after "2"). The legend then
# maps each color to a cluster id, and the hover can name the cluster directly
# (%{marker.color} would only show the raw color string).
for cluster_label in sorted(plot_df['cluster'].unique(), key=int):
    members = plot_df[plot_df['cluster'] == cluster_label]
    fig.add_trace(
        go.Scatter(
            x=members['x'],
            y=members['y'],
            mode='markers',
            marker=dict(size=8, color=color_map[cluster_label]),
            text=members['text'],
            hovertemplate="<b>Cluster:</b> " + cluster_label + "<br>" +
                          "<b>Text:</b> %{text}<br>" +
                          "<extra></extra>",
            name=f"Cluster {cluster_label}",
        ),
        row=1, col=1,
    )

# Cluster sizes in numeric id order. Bars take their colors from the same color_map as the
# scatter points (slicing the palette by position would not match the scatter's assignment).
cluster_counts = plot_df['cluster'].value_counts().sort_index(key=lambda idx: idx.astype(int))
bar = go.Bar(
    x=cluster_counts.index,
    y=cluster_counts.values,
    name='Cluster Size',
    marker_color=[color_map[c] for c in cluster_counts.index],
    hovertemplate="<b>Cluster:</b> %{x}<br>" +
                  "<b>Count:</b> %{y}<br>" +
                  "<extra></extra>",
    showlegend=False,  # the per-cluster scatter traces already form the color legend
)
fig.add_trace(bar, row=1, col=2)

# Update layout
fig.update_layout(
    title_text="Cluster Analysis Dashboard",
    title_x=0.5,
    width=1200,
    height=600,
    showlegend=True,
    template='plotly_white',
    hovermode='closest'
)

# Update axes
fig.update_xaxes(title_text="t-SNE dimension 1", row=1, col=1)
fig.update_yaxes(title_text="t-SNE dimension 2", row=1, col=1)
fig.update_xaxes(title_text="Cluster", row=1, col=2)
fig.update_yaxes(title_text="Number of items", row=1, col=2)

fig.show()

# Per-cluster size table: absolute count and share of all plotted points (in %).
print("\nDetailed Cluster Statistics:")
cluster_stats = cluster_counts.rename_axis('Cluster').reset_index(name='Count')
cluster_stats['Percentage'] = (cluster_stats['Count'] / len(plot_df) * 100).round(2)
print(cluster_stats.to_string(index=False))

In [0]:
# Inspect the plotting frame (coordinates, cluster id, text, color) before persisting it.
plot_df.pipe(display)

In [0]:
# Persist the plotting frame to the plot table, replacing any previous contents.
spark.createDataFrame(plot_df).write.saveAsTable(
    f"{CATALOG}.{SCHEMA}.{PLOT_TABLE}", mode="overwrite"
)