# Cosine-Based Embedding Matcher for NodeViewForComparison

This notebook implements a cosine similarity-based matcher for `NodeViewForComparison` objects.
The implementation:
1. Applies a generic encoder to create embeddings for all nodes
2. Compares nodes using cosine similarity
3. Groups similar nodes based on a threshold
4. Provides visualization of similarity matrices and groups


In [2]:
# Import required packages (managed by uv.lock)
import json
import numpy as np
from pathlib import Path
from typing import Dict, List, Tuple, Optional
import sys

# ML and visualization imports
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
import matplotlib.pyplot as plt
import seaborn as sns

print("✓ All packages imported successfully")

  from .autonotebook import tqdm as notebook_tqdm


✓ All packages imported successfully


In [None]:
# Add the project path to import local modules
project_root = Path().resolve()
if str(project_root) not in sys.path:
    sys.path.insert(0, str(project_root))

# Import local types
from ardhito.llm_assisted_graph_merging.src.merge_types import NodeViewForComparison  # noqa: E402

# Set style
plt.style.use("default")
sns.set_palette("husl")

print("✓ Local modules imported successfully")

✓ Local modules imported successfully


## 1. Encoder Implementation

We'll create a generic encoder that converts `NodeViewForComparison` objects into embeddings.


In [17]:
class NodeEmbedding:
    def __init__(self, embedding: np.ndarray, node_key: str):
        self.embedding = embedding
        self.node_key = node_key


class NodeEncoder:
    def __init__(self, model_name: str = "all-MiniLM-L6-v2"):
        self.model = SentenceTransformer(model_name)
        self.model_name = model_name

    def extract_embedding_from_payload(self, path: str) -> NodeEmbedding:
        node_pair_dict = json.load(open(path))
        node_embeddings = {}
        for key, node in node_pair_dict.items():
            node_view = NodeViewForComparison(**node)
            embedding = self.model.encode(node_view.text)
            node_embeddings[key] = NodeEmbedding(embedding, key)
        return node_embeddings

In [19]:
node_encoder = NodeEncoder()
result = node_encoder.extract_embedding_from_payload(
    "ardhito/llm_assisted_graph_merging/examples/outputs/walkthrough/index.json"
)

## 2. Cosine Similarity Matcher

Implementation of the cosine similarity matcher with threshold-based matching.


In [23]:
class CosineSimilarityMatcher:
    """Cosine similarity matcher for node embeddings"""

    def __init__(self, threshold: float = 0.8):
        """Initialize with similarity threshold"""
        self.threshold = threshold
        self.embeddings: Dict[str, NodeEmbedding] = {}
        self.similarity_matrix: Optional[np.ndarray] = None
        self.keys: List[str] = []

    def add_embeddings(self, embeddings: Dict[str, NodeEmbedding]):
        """Add embeddings to the matcher"""
        self.embeddings.update(embeddings)
        self.keys = list(self.embeddings.keys())
        self._compute_similarity_matrix()

    def _compute_similarity_matrix(self):
        """Compute the full similarity matrix"""
        if not self.embeddings:
            return

        # Extract embeddings in order
        embedding_matrix = np.array(
            [self.embeddings[key].embedding for key in self.keys]
        )

        # Compute cosine similarity
        self.similarity_matrix = cosine_similarity(embedding_matrix)

    def get_similarity(self, key_a: str, key_b: str) -> float:
        """Get similarity between two nodes"""
        if key_a not in self.embeddings or key_b not in self.embeddings:
            raise ValueError(f"One or both keys not found: {key_a}, {key_b}")

        idx_a = self.keys.index(key_a)
        idx_b = self.keys.index(key_b)

        return self.similarity_matrix[idx_a, idx_b]

    def is_match(self, key_a: str, key_b: str) -> bool:
        """Check if two nodes match based on threshold"""
        return self.get_similarity(key_a, key_b) >= self.threshold

    def find_matches(self, key: str) -> List[Tuple[str, float]]:
        """Find all matches for a given node"""
        if key not in self.embeddings:
            raise ValueError(f"Key not found: {key}")

        idx = self.keys.index(key)
        similarities = self.similarity_matrix[idx]

        matches = []
        for i, sim in enumerate(similarities):
            if i != idx and sim >= self.threshold:  # Exclude self
                matches.append((self.keys[i], sim))

        # Sort by similarity descending
        matches.sort(key=lambda x: x[1], reverse=True)
        return matches

    def get_top_similarities(self, key: str, top_k: int = 5) -> List[Tuple[str, float]]:
        """Get top k most similar nodes (excluding self)"""
        if key not in self.embeddings:
            raise ValueError(f"Key not found: {key}")

        idx = self.keys.index(key)
        similarities = self.similarity_matrix[idx]

        # Get indices of top similarities (excluding self)
        similarities_copy = similarities.copy()
        similarities_copy[idx] = -1  # Exclude self

        top_indices = np.argsort(similarities_copy)[-top_k:][::-1]

        return [(self.keys[i], similarities[i]) for i in top_indices]


print("✓ CosineSimilarityMatcher implementation complete")

✓ CosineSimilarityMatcher implementation complete


In [24]:
result

{'node_a': <__main__.NodeEmbedding at 0x15ab00e90>,
 'node_b': <__main__.NodeEmbedding at 0x15ac67010>}

In [26]:
cosine_similarity_matcher = CosineSimilarityMatcher()

cosine_similarity_matcher.add_embeddings(result)

In [27]:
cosine_similarity_matcher.get_top_similarities("node_a", 5)

[('node_b', np.float32(0.7736163)), ('node_a', np.float32(0.99999994))]

## 3. Group Building Algorithm

Iterative group building algorithm that creates groups of similar nodes.
