# Triangular adaptation of UMAP

This is a project to adapt the UMAP algorithm to incoroporate 2-simplices.


##  Installing necessary packages

Initially I had some dependency issues, and these package installations resolves them

In [None]:
#need umap package, but there are weird dependency issues and this is the fix for collab
!pip uninstall -y torch torchvision sympy
!pip uninstall -y scipy
!pip install scipy
!pip install ipywidgets plotly scikit-learn
!pip install scanpy python-igraph louvain umap-learn matplotlib pandas
!pip install ripser persim
!pip install flagser persim

# Neccesary Imports

In [None]:
%matplotlib inline
import numpy as np
import umap
import numba
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d.art3d import Poly3DCollection
import plotly.graph_objects as go
from matplotlib.patches import Polygon
from sklearn.datasets import make_blobs
import pandas as pd
from sklearn.datasets import make_swiss_roll, make_s_curve, make_circles
from ipywidgets import (
    FloatSlider, IntSlider, Checkbox, Dropdown,
    VBox, HBox, Label, Button, Layout, interactive_output, HTML
)
import ipywidgets as widgets
from IPython.display import display, clear_output, Image
from scipy.spatial.distance import pdist, squareform
from scipy.stats import spearmanr
import os
import datetime
import scipy.sparse as sp
from sklearn.manifold import trustworthiness
from ripser import ripser
from persim import plot_diagrams, wasserstein, bottleneck
from mpl_toolkits.mplot3d import Axes3D
from umap.umap_ import simplicial_set_embedding
from umap.umap_ import compute_membership_strengths
from umap.umap_ import smooth_knn_dist
from scipy.spatial import distance_matrix
from pynndescent import NNDescent
from umap.utils import (
    submatrix,
    ts,
    csr_unique,
    fast_knn_indices
)
import flagser
from sklearn.metrics.pairwise import pairwise_distances
from google.colab import output
from tensorflow.keras.datasets import mnist
from collections import defaultdict
output.enable_custom_widget_manager()

## Nearest neighbors
UMAP uses a k-nn graph to find pairwise relations

In [None]:
#nearest neighbor function from umap
def nearest_neighbors(
    X,
    n_neighbors,
    metric,
    metric_kwds,
    angular,
    random_state,
    low_memory=True,
    use_pynndescent=True,
    n_jobs=-1,
    verbose=False,
):
    """Compute the ``n_neighbors`` nearest points for each data point in ``X``
    under ``metric``. This may be exact, but more likely is approximated via
    nearest neighbor descent.

    Parameters
    ----------
    X: array of shape (n_samples, n_features)
        The input data to compute the k-neighbor graph of.

    n_neighbors: int
        The number of nearest neighbors to compute for each sample in ``X``.

    metric: string or callable
        The metric to use for the computation.

    metric_kwds: dict
        Any arguments to pass to the metric computation function.

    angular: bool
        Whether to use angular rp trees in NN approximation.

    random_state: np.random state
        The random state to use for approximate NN computations.

    low_memory: bool (optional, default True)
        Whether to pursue lower memory NNdescent.

    verbose: bool (optional, default False)
        Whether to print status data during the computation.

    Returns
    -------
    knn_indices: array of shape (n_samples, n_neighbors)
        The indices on the ``n_neighbors`` closest points in the dataset.

    knn_dists: array of shape (n_samples, n_neighbors)
        The distances to the ``n_neighbors`` closest points in the dataset.

    rp_forest: list of trees
        The random projection forest used for searching (if used, None otherwise).
    """
    if verbose:
        print(ts(), "Finding Nearest Neighbors")

    if metric == "precomputed":
        # Note that this does not support sparse distance matrices yet ...
        # Compute indices of n nearest neighbors
        knn_indices = fast_knn_indices(X, n_neighbors)
        # knn_indices = np.argsort(X)[:, :n_neighbors]
        # Compute the nearest neighbor distances
        #   (equivalent to np.sort(X)[:,:n_neighbors])
        knn_dists = X[np.arange(X.shape[0])[:, None], knn_indices].copy()
        # Prune any nearest neighbours that are infinite distance apart.
        disconnected_index = knn_dists == np.inf
        knn_indices[disconnected_index] = -1

        knn_search_index = None
    else:
        # TODO: Hacked values for now
        n_trees = min(64, 5 + int(round((X.shape[0]) ** 0.5 / 20.0)))
        n_iters = max(5, int(round(np.log2(X.shape[0]))))

        knn_search_index = NNDescent(
            X,
            n_neighbors=n_neighbors,
            metric=metric,
            metric_kwds=metric_kwds,
            random_state=random_state,
            n_trees=n_trees,
            n_iters=n_iters,
            max_candidates=60,
            low_memory=low_memory,
            n_jobs=n_jobs,
            verbose=verbose,
            compressed=False,
        )
        knn_indices, knn_dists = knn_search_index.neighbor_graph

    if verbose:
        print(ts(), "Finished Nearest Neighbor Search")
    return knn_indices, knn_dists, knn_search_index

# Test data sets

Here we define some geometrically simple test data, to see how our algorithm works.  We begin by creating a 2D grids of points.

Put all sample data sets up here.  Give them nice names, like X_grid, X_circle, X_torus

We begin by creating 1d coordinate arrays.

In [None]:
x = np.linspace(0, 1, 5)  # 5 evenly spaced points from 0 to 1
y = np.linspace(0, 1, 5)

Combine these two 1-d arrays into a 2-d grid.

In [None]:
X, Y = np.meshgrid(x, y)

We now have an evenly spaced grid.

In [None]:
points = np.column_stack([X.ravel(), Y.ravel()])
#rename points to match UMAP notation of X
X_grid = points
print(X_grid.shape)   # (25, 2)
print(X_grid[:10])     # first few (x, y) points

Need to update the index information for the grid dataset.

In [None]:
knn_indices_grid, knn_dists_grid, knn_search_index_grid = nearest_neighbors(X_grid,10,metric='euclidean',metric_kwds={},angular=False,random_state=42)
print (knn_indices_grid[:3], knn_dists_grid[:3])

# Noisy Circle

Now we check a new test dataset, that of a noisy circle.

In [None]:
def generate_noisy_circle(n_points=100, radius=1.0, noise_std=0.05, random_seed=42):
    """
    Generate points roughly on a circle with Gaussian noise.

    Parameters
    ----------
    n_points : int
        Number of points to generate.
    radius : float
        Radius of the circle.
    noise_std : float
        Standard deviation of Gaussian noise added to each coordinate.
    random_seed : int
        Seed for reproducibility.

    Returns
    -------
    X : (n_points, 2) array
        Coordinates of noisy points on the circle.
    """
    np.random.seed(random_seed)

    # Generate angles evenly spaced around the circle
    angles = np.linspace(0, 2 * np.pi, n_points, endpoint=False)

    # Circle coordinates
    x = radius * np.cos(angles)
    y = radius * np.sin(angles)

    # Add Gaussian noise
    x += np.random.normal(scale=noise_std, size=n_points)
    y += np.random.normal(scale=noise_std, size=n_points)

    X = np.column_stack([x, y])
    return X


We now generate the circle.

In [None]:
X_circle = generate_noisy_circle(n_points=100, radius=1.0, noise_std=0.05, random_seed=42)

This is what the points look like for the 2d noisy circle


In [None]:
print (X_circle[:10])

Indices must be updated for each dataset.

In [None]:
knn_indices_circle, knn_dists_circle, knn_search_index_circle = nearest_neighbors(X_circle,10,metric='euclidean',metric_kwds={},angular=False,random_state=42)


# Noisy Torus
We are now going to inspect a third test data set, that of a noisy torus

In [None]:
def generate_noisy_torus(n_points=500, R=2.0, r=0.5, noise_std=0.05, random_seed=42):
    """
    Generate points roughly on a torus with Gaussian noise.

    Parameters
    ----------
    n_points : int
        Total number of points to generate.
    R : float
        Major radius (distance from center of tube to center of torus).
    r : float
        Minor radius (radius of the tube).
    noise_std : float
        Standard deviation of Gaussian noise added to each coordinate.
    random_seed : int
        Seed for reproducibility.

    Returns
    -------
    X : (n_points, 3) array
        Coordinates of noisy points on the torus.
    """
    np.random.seed(random_seed)

    # Random angles for torus param
    theta = np.random.uniform(0, 2*np.pi, n_points)  # around major circle
    phi = np.random.uniform(0, 2*np.pi, n_points)    # around minor circle

    # Parametric equations
    x = (R + r * np.cos(phi)) * np.cos(theta)
    y = (R + r * np.cos(phi)) * np.sin(theta)
    z = r * np.sin(phi)

    # Add Gaussian noise
    x += np.random.normal(scale=noise_std, size=n_points)
    y += np.random.normal(scale=noise_std, size=n_points)
    z += np.random.normal(scale=noise_std, size=n_points)

    X = np.column_stack([x, y, z])
    return X

Next we generate the torus itself.

In [None]:
X_torus = generate_noisy_torus(n_points=500, R=2.0, r=0.5, noise_std=0.05)

Indices must be updated.

In [None]:
knn_indices_torus, knn_dists_torus, knn_search_index_torus = nearest_neighbors(X_torus,10,metric='euclidean',metric_kwds={},angular=False,random_state=42)

Below we can inspect the coordinates of points of the torus.

In [None]:
print(X_torus[:10])

## Finding triangles

> This section outlines the triangle finding procedure

We will use the knn data to find which nodes lie within each other's neighborhoods. For example, if a is b's neighbor, but b is not a's neighbor, then there will be no triangle found. However if three nodes appear in eachothers nearest neighbors, then we have a candidate triangle.

Regardless, we precompute the distance matrix for each node, and with that we find the standard deviation of each distance matrix. We use this statistical info to find edges that are unusually close together. We also normalize the weight of each edge using the standard deviation. Ultimately we get three new edges if we find a triangle, and the edge ab will eventually connect the anchor node in the center of the traingle to node c.


Edge weights in UMAP are always between 0 and 1 so this function makes sure our weights are approptiate.

In [None]:
def weight_fn(d, scale=5.0):
    return 1 / (1 + np.exp(scale * d))

In [None]:
def find_triangles_from_local(X, knn_indices, min_edge = 0.5, tension = 3):
        """
        Given data X and a knn_indices array, find triangles (a,b,c).

        Parameters
        ----------
        X : (n_samples, n_features) array
            Original data.
        knn_indices : (n_samples, k) array
            k-nearest neighbors for each sample.
        Returns
        -------
        triangles : list of tuples
            Each tuple is (a, b, c, w_bc, w_ac, w_ab)
            where w_* are the weighted edge strengths obtained from z-score distances.
        """
        n_samples, k = knn_indices.shape
        triangles = []
        seen = set()  # track unique (a,b,c)
        local_means = np.zeros(n_samples)
        local_stds = np.zeros(n_samples)
        for p in range(n_samples):
          neighbors = knn_indices[p]
          neighbor_coords = X[neighbors]
          DM = distance_matrix(neighbor_coords, neighbor_coords)
          local_means[p] = np.mean(DM)
          local_stds[p] = np.std(DM)

        for a in range(n_samples):
            neighbors = knn_indices[a]
            neighbor_coords = X[neighbors]

            # Now check all neighbor pairs (b, c)
            for i in range(k):
                for j in range(i + 1, k):
                    b = neighbors[i]
                    c = neighbors[j]
                    b_neighbors = knn_indices[b]
                    c_neighbors = knn_indices[c]
                    if (a in b_neighbors) and (c in b_neighbors) and (b in c_neighbors) and (a in c_neighbors):
                    # distance between b and c in neighbor space
                      d_bc = np.linalg.norm(X[c] - X[b])
                      d_ab = np.linalg.norm(X[b] - X[a])
                      d_ac = np.linalg.norm(X[c] - X[a])

                      # Normalize by mean_val too
                      d_ab = (d_ab - local_means[c]) / local_stds[c] #z-score
                      d_ac = (d_ac - local_means[b]) / local_stds[b] #z-score
                      d_bc = (d_bc - local_means[a]) / local_stds[a] #z-score

                      # Store triangle, edges already weighted
                      if a < b < c:
                        key = tuple(sorted((a, b, c)))
                        if key not in seen:
                            seen.add(key)
                            w_bc = weight_fn(d_bc)
                            w_ac = weight_fn(d_ac)
                            w_ab = weight_fn(d_ab)
                            #remove low quality triangles
                            if min(w_ab, w_ac, w_bc) >= min_edge:
                              triangles.append((a, b, c, tension*w_bc, tension*w_ac, tension*w_ab))
        return triangles

After running the find_triangles_from_local we have a list of triangles found from the data. Here we inspect them to see what kind of data we have.

In [None]:
triangles = find_triangles_from_local(X_grid,knn_indices_grid)
i = 0
for tri in triangles:
  if i == 3:
    continue
  a, b, c, w_bc, w_ac, w_ab = tri
  print(f"Triangle: ({a}, {b}, {c}) | w_bc={w_bc:.3f}, w_ac={w_ac:.3f}, w_ab={w_ab:.3f}")
  i += 1

In [None]:
i = 0
for tri in triangles:
  if i == 3:
    continue
  a, b, c, w_bc, w_ac, w_ab = tri
  print ({a},{b},{c})
  i += 1

Here we find the triangles in the circle data.

In [None]:
circle_tri = find_triangles_from_local(X_circle,knn_indices_circle)
i = 0
for tri in circle_tri:
    if i == 3:
      continue
    a, b, c, w_bc, w_ac, w_ab = tri
    print(f"Triangle: ({a}, {b}, {c}) | w_bc={w_bc:.3f}, w_ac={w_ac:.3f}, w_ab={w_ab:.3f}")
    print(f"Coords: a = ({X_circle[a][0]:.3f}, {X_circle[a][1]:.3f},), b = ({X_circle[b][0]:.3f}, {X_circle[b][1]:.3f}), c = ({X_circle[c][0]:.3f}, {X_circle[c][1]:.3f})")
    i += 1

Here we find triangles in the torus data.

In [None]:
tor_tri = find_triangles_from_local(X_torus,knn_indices_torus)
i = 0
for tri in tor_tri:
    if i == 3:
      continue
    a, b, c, w_bc, w_ac, w_ab = tri
    print(f"Triangle: ({a}, {b}, {c}) | w_bc={w_bc:.3f}, w_ac={w_ac:.3f}, w_ab={w_ab:.3f}")
    print(f"Coords: a = ({X_torus[a][0]:.3f}, {X_torus[a][1]:.3f}, {X_torus[a][2]:.3f}), "
      f"b = ({X_torus[b][0]:.3f}, {X_torus[b][1]:.3f}, {X_torus[b][2]:.3f}), "
      f"c = ({X_torus[c][0]:.3f}, {X_torus[c][1]:.3f}, {X_torus[c][2]:.3f})")
    i += 1

## Pictures of triangles.

Draw the mesh of triangles, filled at 0.1 or 0.2 opacity, on the raw data in Euclidean space.  

In [None]:
fig, ax = plt.subplots(figsize=(6,6))
ax.scatter(X_grid[:,0], X_grid[:,1], s=10, color='black', alpha=0.9)

for (a,b,c,_,_,_) in triangles:
    pts = X_grid[[a,b,c]]
    ax.fill(pts[:,0], pts[:,1], color='tab:blue', alpha=0.1) #, edgecolor='none')

ax.set_aspect('equal')
plt.show()

# Why is there asymmetry?
Asymmetry arises from the nearest neighbors algorithm from UMAP. We can see the triangles evolve as we change the number of neighbors. When we have a regular grid there may be many points that are an equal distance away, therefore the nearest neighbors algorithm has to choose what point to add to the list when there is only one option left but many candidate points.

In [None]:
fig, ax = plt.subplots(figsize=(6,6))
ax.scatter(X_circle[:,0], X_circle[:,1], s=10, color='black', alpha=0.6)

for (a,b,c,_,_,_) in circle_tri:
    pts = X_circle[[a,b,c]]
    ax.fill(pts[:,0], pts[:,1], color='tab:blue', alpha=0.2, edgecolor='none')

ax.set_aspect('equal')
ax.set_title('Noisy Circle with Triangles')
ax.set_xlabel('X')
ax.set_ylabel('Y')
plt.show()

In [None]:
fig = plt.figure(figsize=(8, 8))
ax = fig.add_subplot(111, projection='3d')

# Scatter plot of the torus data
ax.scatter(X_torus[:, 0], X_torus[:, 1], X_torus[:, 2], s=10, color='black', alpha=0.6)

# Draw the triangles
for (a, b, c, _, _, _) in tor_tri:
    pts = X_torus[[a, b, c]]
    # Create a polygon for the triangle
    triangle_polygon = Poly3DCollection([pts], alpha=0.2, facecolor='tab:blue', edgecolor='none')
    ax.add_collection3d(triangle_polygon)

ax.set_title('Torus with Triangles')
ax.set_xlabel('X')
ax.set_ylabel('Y')
ax.set_zlabel('Z')
plt.show()

# Regular datasets
Noise makes datasets mare natural, but let's see what happens with completely regular dataset.

In [None]:
def generate_circle_points(n_points=100, radius=1.0, random_seed=42):
    """
    Generate evenly spaced points on a circle.

    Parameters
    ----------
    n_points : int
        Number of points to generate.
    radius : float
        Radius of the circle.
    random_seed : int
        Seed for reproducibility.

    Returns
    -------
    X : (n_points, 2) array
        Coordinates of evenly spaced points on the circle.
    """
    np.random.seed(random_seed)

    # Generate evenly spaced angles around the circle
    angles = np.linspace(0, 2 * np.pi, n_points, endpoint=False)

    # Compute coordinates on the circle
    x = radius * np.cos(angles)
    y = radius * np.sin(angles)

    X = np.column_stack([x, y])
    return X

In [None]:
reg_circle = generate_circle_points(n_points=100, radius=1.0, random_seed=42)
reg_circle_knn_indices, reg_circle_knn_dists, reg_circle_knn_search_index = nearest_neighbors(reg_circle,9,metric='euclidean',metric_kwds={},angular=False,random_state=42)
reg_circle_tri = find_triangles_from_local(reg_circle,reg_circle_knn_indices)

In [None]:
fig, ax = plt.subplots(figsize=(6,6))
ax.scatter(reg_circle[:,0], reg_circle[:,1], s=10, color='black', alpha=0.6)

for (a, b, c, *_) in circle_tri:
    pts = reg_circle[[a, b, c]]
    ax.fill(pts[:,0], pts[:,1], color='tab:blue', alpha=0.2, edgecolor='none')

ax.set_aspect('equal')
ax.set_title('Regular Circle with Triangles')
ax.set_xlabel('X')
ax.set_ylabel('Y')
plt.show()


Use Torus from eariler.

In [None]:
def generate_torus_points(n_major=30, n_minor=30, R=2.0, r=0.7, random_seed=42):
    np.random.seed(random_seed)
    theta = np.linspace(0, 2 * np.pi, n_major, endpoint=False)
    phi = np.linspace(0, 2 * np.pi, n_minor, endpoint=False)
    theta, phi = np.meshgrid(theta, phi)
    x = (R + r * np.cos(phi)) * np.cos(theta)
    y = (R + r * np.cos(phi)) * np.sin(theta)
    z = r * np.sin(phi)
    X = np.column_stack([x.ravel(), y.ravel(), z.ravel()])
    return X

In [None]:
# Generate torus points
reg_torus = generate_torus_points(n_major=30, n_minor=30, R=2.0, r=0.7)

# Compute neighbors & triangles
reg_torus_knn_indices, reg_torus_knn_dists, reg_torus_knn_search_index = nearest_neighbors(
    reg_torus, 9, metric='euclidean', metric_kwds={}, angular=False, random_state=42
)
reg_torus_tri = find_triangles_from_local(reg_torus, reg_torus_knn_indices)

Here we verify we found some triangles.

In [None]:
print(len(reg_torus_tri))

Now we can see them!

In [None]:
fig = plt.figure(figsize=(8, 8))
ax = fig.add_subplot(111, projection='3d')

# Scatter plot of the torus points
ax.scatter(reg_torus[:,0], reg_torus[:,1], reg_torus[:,2], s=10, color='black', alpha=0.6)

# Draw the triangles using Poly3DCollection, light blue and semi-transparent
for (a, b, c, *_) in reg_torus_tri:
    pts = reg_torus[[a, b, c]]
    triangle_polygon = Poly3DCollection(
        [pts],
        alpha=0.2,
        facecolor='tab:blue',
        edgecolor='none'
    )
    ax.add_collection3d(triangle_polygon)

ax.set_title('Regular Torus with Triangles')
ax.set_xlabel('X')
ax.set_ylabel('Y')
ax.set_zlabel('Z')
ax.set_box_aspect([1,1,1])  # Equal aspect ratio
ax.view_init(elev=25, azim=35)  # Nice viewing angle
plt.show()

# Start of UMAP pipeline with triangles

We will now compute the UMAP graph incorporating the triangles found. First we have the function that creates the anchor edges from the center node to the original triangle nodes.

In [None]:
def build_anchor_edges(triangles, n_samples):
        """Build edges connecting anchors to their triangle vertices."""
        edges = []
        for t_id, (a, b, c, w_bc, w_ac, w_ab) in enumerate(triangles):
            anchor_idx = n_samples + t_id

            # anchor to a (weight comes from edge bc)
            edges.append((anchor_idx, a, w_bc))
            edges.append((a, anchor_idx, w_bc))

            # anchor to b (weight comes from edge ac)
            edges.append((anchor_idx, b, w_ac))
            edges.append((b, anchor_idx, w_ac))

            # anchor to c (weight comes from edge ab)
            edges.append((anchor_idx, c, w_ab))
            edges.append((c, anchor_idx, w_ab))

        return edges

This function creates the anchors that go in the center of each triangle.

In [None]:
def make_anchors(triangles, X):
        """Create anchor points at triangle centroids."""
        if not triangles:
            return np.empty((0, X.shape[1]))

        anchor_coords = []
        for (a, b, c, w_ab, w_ac, w_bc) in triangles:
            total_w = w_ab + w_ac + w_bc + 1e-9
            centroid = (w_bc * X[a] + w_ac * X[b] + w_ab * X[c]) / total_w
            anchor_coords.append(centroid)
        return np.array(anchor_coords)

This function creats a new graph that incorporates our triangles into the UMAP graph.

In [None]:
def _build_augmented_graph(X, knn_indices, knn_dists, sigmas, rhos, return_dists=None):
        """Build the fuzzy simplicial set including anchor points."""
        # Compute standard membership strengths
        rows, cols, vals, dists = compute_membership_strengths(
            knn_indices, knn_dists, sigmas, rhos, return_dists
        )

        # Find triangles and create anchors
        triangles = find_triangles_from_local(X, knn_indices)

        n_samples = X.shape[0]

        if len(triangles) > 0:
            # Create anchor points
            anchors = make_anchors(triangles, X)
            n_anchors_ = anchors.shape[0]

            # Augment data with anchors
            X_aug = np.vstack([X, anchors])

            # Build anchor edges
            anchor_edges = build_anchor_edges(triangles, n_samples)

            # Add anchor edges to graph
            if anchor_edges:
                anchor_rows, anchor_cols, anchor_vals = zip(*anchor_edges)
                rows = np.concatenate([rows, np.array(anchor_rows, dtype=np.int32)])
                cols = np.concatenate([cols, np.array(anchor_cols, dtype=np.int32)])
                vals = np.concatenate([vals, np.array(anchor_vals, dtype=np.float32)])

            # Update graph size to include anchors
            graph_size = n_samples + n_anchors_
        else:
            X_aug = X
            graph_size = n_samples
            n_anchors_ = 0

        # Create sparse matrix with correct size
        graph = sp.coo_matrix(
            (vals, (rows, cols)),
            shape=(graph_size, graph_size)
        )
        graph.eliminate_zeros()

        return graph, X_aug

We will now build a 3-d grid and construct a UMAP embedding.

In [None]:
# 1D coordinate arrays
x = np.linspace(0, 1, 5)  # 5 evenly spaced points from 0 to 1
y = np.linspace(0, 1, 5)
z = np.linspace(0, 1, 5)

In [None]:
# Create a 3D grid
X, Y, Z = np.meshgrid(x, y, z)
points = np.column_stack([X.ravel(), Y.ravel(), Z.ravel()])
X = points

In [None]:
#find triangles, if len(triangles) > 0, then some were found
knn_indices, knn_dists, knn_search_index = nearest_neighbors(X,10,metric='euclidean',metric_kwds={},angular=False,random_state=42)
triangles = find_triangles_from_local(X,knn_indices)
print(len(triangles))

We need local paramters, sigma and rho. Sigma is a scaling parameter for local geomtery, and rho is the distance to the nearest neighbor.

In [None]:
sigmas, rhos = smooth_knn_dist(knn_dists, float(10))

Building the augmented graph requires our data and local parameters.

In [None]:
# Build augmented graph
graph, X_aug = _build_augmented_graph(
      X, knn_indices, knn_dists, sigmas, rhos
        )

We store the UMAP graph as a sparse matrix.

In [None]:
graph

In [None]:
X_aug

The following operatins are preformed in the creation of the fuzzy simplicial set.

In [None]:
# Apply set operations
transpose = graph.transpose()
prod_matrix = graph.multiply(transpose)

# Define set_op_mix_ratio, 1 is default for umap
set_op_mix_ratio = 1.0

graph = (set_op_mix_ratio * (graph + transpose - prod_matrix)
            + (1.0 - set_op_mix_ratio) * prod_matrix
        )
graph.eliminate_zeros()

We now have a UMAP graph.

In [None]:
graph

a and b are UMAP parameters, here we assign both to their defualt values

In [None]:
spread, min_dist = 1, 0.1
a, b = umap.umap_.find_ab_params(spread, min_dist)


We are now ready to compute the embedding!

In [None]:
rs = np.random.RandomState(42)

embedding = simplicial_set_embedding(
    X_aug,
    graph,
    n_components=2,
    initial_alpha=1.0,        # replaces learning_rate
    a=a,
    b=b,
    gamma=1,                  # replaces repulsion_strength
    negative_sample_rate=5,
    n_epochs=200,
    init='spectral',
    random_state=rs,
    metric='euclidean',
    metric_kwds={},
    densmap=False,
    densmap_kwds={},           # reintroduced for backward compatibility
    output_dens=False,
    verbose=False,
)[0]

This is the geometric embedding into 2-space.

In [None]:
embedding

# The data is now plottable!

In [None]:
plt.figure(figsize=(8, 6))
plt.scatter(embedding[:, 0], embedding[:, 1], s=10, c='black', alpha=0.7)
plt.title("2D UMAP Embedding")
plt.xlabel("UMAP Dimension 1")
plt.ylabel("UMAP Dimension 2")
plt.show()

# AnchorUMAP

Below we have the full anchorUMAP class so that we can easily compare anchor umap versus regular umap in some test cases. Could also uplopad it and import it but that seems harder to run on multiple machines.

In [None]:
class AnchorUMAP(umap.UMAP):
    def __init__(self, include_anchors_in_embedding=False, *args, **kwargs):
        """
        Extended UMAP that includes triangle anchors (2-simplices).

        Parameters
        ----------
        include_anchors_in_embedding : bool, default=True
            Whether to include anchor points in the final embedding visualization.
        """
        super().__init__(*args, **kwargs)
        self.include_anchors_in_embedding = include_anchors_in_embedding
        self.n_anchors_ = 0
        self.triangles_ = []
        self.metric_kwds = kwargs.get("metric_kwds", {}) or {}

    @staticmethod
    def find_triangles_from_local(X, knn_indices, min_edge=0.5, tension=2):
      """
      Find triangles and aggregate weights from all three basepoints.
      """

      n_samples, k = knn_indices.shape
      triangle_data = defaultdict(list)   # key → list of (w_ab, w_ac, w_bc)

      # Precompute stats
      local_means = np.zeros(n_samples)
      local_stds = np.zeros(n_samples)
      local_area_mean = np.zeros(n_samples)

      for p in range(n_samples):
          neighbors = knn_indices[p]
          DM = distance_matrix(X[neighbors], X[neighbors])

          local_means[p] = np.mean(DM)
          local_stds[p] = np.std(DM)

          # baseline triangle area in neighborhood
          areas = []
          for i in range(k):
              for j in range(i+1, k):
                  a_idx = neighbors[i]
                  b_idx = neighbors[j]
                  areas.append(AnchorUMAP.triangle_area(X, p, a_idx, b_idx))

          local_area_mean[p] = np.mean(areas) if areas else 1.0


      # Search for triangles
      for a in range(n_samples):
          neighbors = knn_indices[a]

          for i in range(k):
              for j in range(i+1, k):

                  b = neighbors[i]
                  c = neighbors[j]

                  # mutual neighbor condition
                  if (a in knn_indices[b] and c in knn_indices[b] and
                      b in knn_indices[c] and a in knn_indices[c]):

                      # true distances
                      dab = np.linalg.norm(X[b] - X[a])
                      dac = np.linalg.norm(X[c] - X[a])
                      dbc = np.linalg.norm(X[c] - X[b])

                      # z-score these
                      mu = (local_means[a] + local_means[b] + local_means[c]) / 3
                      sigma = (local_stds[a] + local_stds[b] + local_stds[c]) / 3 + 1e-8

                      dab = (dab - mu) / sigma
                      dac = (dac - mu) / sigma
                      dbc = (dbc - mu) / sigma

                      # triangle area
                      area = AnchorUMAP.triangle_area(X, a, b, c)
                      S = area / (local_area_mean[a] + 1e-8)
                      if not (0.05 <= S <= 5.0):
                        continue

                      # compute weights
                      w_ab = AnchorUMAP.weight_fn(dab)
                      w_ac = AnchorUMAP.weight_fn(dac)
                      w_bc = AnchorUMAP.weight_fn(dbc)

                      # canonical ordering of triangle
                      key = tuple(sorted((a, b, c)))

                      # store contribution (DO NOT FILTER YET)
                      triangle_data[key].append((w_ab, w_ac, w_bc))


      # Aggregate weights per triangle
      triangles = []

      for (a, b, c), weights in triangle_data.items():
          w_ab = np.mean([w[0] for w in weights])
          w_ac = np.mean([w[1] for w in weights])
          w_bc = np.mean([w[2] for w in weights])


          # simple quality threshold
          if np.mean([w_ab, w_ac, w_bc]) > min_edge:
              triangles.append((a, b, c,
                                tension * w_bc,
                                tension * w_ac,
                                tension * w_ab))

      return triangles

    @staticmethod
    def weight_fn(d, scale=5.0):
      """Simple logistic weight on z-scored distances."""
      return 1 / (1 + np.exp(scale * d))

    @staticmethod
    def triangle_area(X, a, b, c):
        """
        Computes area of triangle (a,b,c) for points in R^n.
        Works for any dimension.
        """
        u = X[b] - X[a]
        v = X[c] - X[a]
        uu = np.dot(u, u)
        vv = np.dot(v, v)
        uv = np.dot(u, v)
        area_sq = uu * vv - uv * uv
        if area_sq <= 0:
            return 0.0
        return 0.5 * np.sqrt(area_sq)

    @staticmethod
    def normalize_area(area, local_mean, eps=1e-8):
        return area / (local_mean + eps)



    @staticmethod
    def make_anchors(triangles, X):
        """Create anchor points at triangle centroids."""
        if not triangles:
            return np.empty((0, X.shape[1]))

        anchor_coords = []
        for (a, b, c, w_ab, w_ac, w_bc) in triangles:
            #total_w = w_ab + w_ac + w_bc + 1e-9
            centroid = (X[a] + X[b] + X[c]) / 3
            anchor_coords.append(centroid)
        return np.array(anchor_coords)

    @staticmethod
    def build_anchor_edges(triangles, n_samples):
        """Build edges connecting anchors to their triangle vertices."""
        edges = []
        for t_id, (a, b, c, w_bc, w_ac, w_ab) in enumerate(triangles):
            anchor_idx = n_samples + t_id

            # anchor to a (weight comes from edge bc)
            edges.append((anchor_idx, a, w_bc))
            edges.append((a, anchor_idx, w_bc))

            # anchor to b (weight comes from edge ac)
            edges.append((anchor_idx, b, w_ac))
            edges.append((b, anchor_idx, w_ac))

            # anchor to c (weight comes from edge ab)
            edges.append((c, anchor_idx, w_ab))
            edges.append((anchor_idx, c, w_ab))

        return edges

    def _build_augmented_graph(self, X, knn_indices, knn_dists, sigmas, rhos, return_dists=None):
        """Build the fuzzy simplicial set including anchor points."""
        # Compute standard membership strengths
        rows, cols, vals, dists = compute_membership_strengths(
            knn_indices, knn_dists, sigmas, rhos, return_dists
        )

        # Find triangles and create anchors
        triangles = self.find_triangles_from_local(X, knn_indices)
        self.triangles_ = triangles

        n_samples = X.shape[0]

        if len(triangles) > 0:
            # Create anchor points
            anchors = self.make_anchors(triangles, X)
            self.n_anchors_ = anchors.shape[0]

            # Augment data with anchors
            X_aug = np.vstack([X, anchors])

            # Build anchor edges
            anchor_edges = self.build_anchor_edges(triangles, n_samples)

            # Add anchor edges to graph
            if anchor_edges:
                anchor_rows, anchor_cols, anchor_vals = zip(*anchor_edges)
                rows = np.concatenate([rows, np.array(anchor_rows, dtype=np.int32)])
                cols = np.concatenate([cols, np.array(anchor_cols, dtype=np.int32)])
                vals = np.concatenate([vals, np.array(anchor_vals, dtype=np.float32)])

            # Update graph size to include anchors
            graph_size = n_samples + self.n_anchors_
        else:
            X_aug = X
            graph_size = n_samples
            self.n_anchors_ = 0

        # Create sparse matrix with correct size
        graph = sp.coo_matrix(
            (vals, (rows, cols)),
            shape=(graph_size, graph_size)
        )
        graph.eliminate_zeros()

        return graph, X_aug

    def fit(self, X, y=None):
        """Fit the AnchorUMAP model."""
        X = X.astype(np.float32)

        # Validate random state
        if isinstance(self.random_state, (int, np.integer)):
            self.random_state = np.random.RandomState(self.random_state)

        # Get k-NN graph
        knn_indices, knn_dists, _ = nearest_neighbors(
            X,
            self.n_neighbors,
            self.metric,
            getattr(self, 'metric_kwds', {}),
            getattr(self, 'angular_rp_forest', False),
            self.random_state,
            verbose=self.verbose,
        )

        # Compute membership strengths parameters
        sigmas, rhos = smooth_knn_dist(
            knn_dists.astype(np.float32),
            float(self.n_neighbors),
            local_connectivity=float(self.local_connectivity),
        )

        # Build augmented graph
        graph, X_aug = self._build_augmented_graph(
            X, knn_indices, knn_dists, sigmas, rhos
        )

        # Apply set operations
        transpose = graph.transpose()
        prod_matrix = graph.multiply(transpose)
        graph = (
            self.set_op_mix_ratio * (graph + transpose - prod_matrix)
            + (1.0 - self.set_op_mix_ratio) * prod_matrix
        )
        graph.eliminate_zeros()

        # Compute embedding
        self._raw_data = X_aug  # Store augmented data

        # Get UMAP parameters
        a, b = umap.umap_.find_ab_params(self.spread, self.min_dist)

        # Compute embedding for augmented data
        embedding = simplicial_set_embedding(
            X_aug,
            graph,
            self.n_components,
            self.learning_rate,
            a, b,
            self.repulsion_strength,
            self.negative_sample_rate,
            self.n_epochs if self.n_epochs is not None else 200,
            init=self.init,
            random_state=self.random_state,
            metric=self.metric,
            metric_kwds=getattr(self, 'metric_kwds', {}),
            densmap=False,
            densmap_kwds={},
            output_dens=False,
            verbose=self.verbose,
        )[0]

        # Store embeddings
        if self.include_anchors_in_embedding or self.n_anchors_ == 0:
            self.embedding_ = embedding
        else:
            # Only return original data points in embedding
            self.embedding_ = embedding[:X.shape[0]]

        # Store anchor embeddings separately
        if self.n_anchors_ > 0:
            self.anchor_embedding_ = embedding[X.shape[0]:]
        else:
            self.anchor_embedding_ = np.empty((0, self.n_components))

        return self

    def fit_transform(self, X, y=None):
        """Fit the model and return the embedding."""
        return self.fit(X, y).embedding_

    def get_anchor_embeddings(self):
        """Get the embedding coordinates of anchor points."""
        if not hasattr(self, 'anchor_embedding_'):
            raise ValueError("Model must be fitted first")
        return self.anchor_embedding_

    def get_triangles(self):
        """Get the list of triangles used to create anchors."""
        if not hasattr(self, 'triangles_'):
            raise ValueError("Model must be fitted first")
        return self.triangles_

# Comparison to regular UMAP
We create a function that allows us to compare AnchorUMAP to regular UMAP.

In [None]:
def explore_umap_anchor_snapshot(dset, n_neighbors, show_anchors, show_triangles, noise):
    # Generate dataset
    X, color, is3d = generate_dataset(dset, noise=noise)

    # Fit Regular UMAP
    umap_model = umap.UMAP(n_neighbors=n_neighbors, random_state=42)
    umap_emb = umap_model.fit_transform(X)

    # Fit AnchorUMAP (no tau)
    anchor_model = AnchorUMAP(
        n_neighbors=n_neighbors,
        random_state=42,
        include_anchors_in_embedding=True,
        verbose=False
    )
    anchor_emb = anchor_model.fit_transform(X)
    anchors = anchor_model.get_anchor_embeddings()
    triangles = anchor_model.get_triangles()

    # Plot
    plt.close('all')
    fig = plt.figure(figsize=(18, 6))

    # -------------------------------
    # Original Data
    # -------------------------------
    ax1 = fig.add_subplot(131, projection='3d' if is3d else None)
    if is3d:
        ax1.scatter(X[:,0], X[:,1], X[:,2], c=color, cmap='cividis', s=10)
    else:
        ax1.scatter(X[:,0], X[:,1], c=color, cmap='cividis', s=10)
    ax1.set_title(f"Original Data: {dset}", fontsize=12)
    ax1.axis('off')

    # -------------------------------
    # Regular UMAP
    # -------------------------------
    ax2 = fig.add_subplot(132)
    ax2.scatter(umap_emb[:,0], umap_emb[:,1], c=color, cmap='cividis', s=10)
    ax2.set_title("Regular UMAP", fontsize=12)
    ax2.axis('equal')
    ax2.axis('off')

    # -------------------------------
    # AnchorUMAP
    # -------------------------------
    ax3 = fig.add_subplot(133)
    n_samples = X.shape[0]

    ax3.scatter(
        anchor_emb[:n_samples, 0],
        anchor_emb[:n_samples, 1],
        c=color,
        cmap='cividis',
        s=10,
        label='Data Points'
    )

    # Plot triangles
    if show_triangles:
        for (a, b, c, *_) in triangles:
            if a < n_samples and b < n_samples and c < n_samples:
                tri_pts = anchor_emb[[a, b, c], :]
                poly = Polygon(
                    tri_pts,
                    closed=True,
                    facecolor='red',
                    alpha=0.05,
                    edgecolor='none'
                )
                ax3.add_patch(poly)

    # Plot anchors
    if show_anchors and anchors.shape[0] > 0:
        ax3.scatter(
            anchors[:,0], anchors[:,1],
            c='red', s=40, alpha=0.1,
            edgecolors='k', linewidths=0.2,
            zorder=3,
            label='Anchors'
        )

    # Title (no tau)
    ax3.set_title(
        f"AnchorUMAP (n_neighbors={n_neighbors}, noise={noise:.2f})",
        fontsize=12
    )

    # Triangle count
    ax3.text(
        0.02, 0.98,
        f"Triangles: {len(triangles)}",
        transform=ax3.transAxes,
        fontsize=11,
        color='black',
        va='top',
        ha='left'
    )

    ax3.axis('equal')
    ax3.axis('off')
    ax3.legend(loc='lower right')

    plt.tight_layout()
    plt.show()

    return fig, triangles

# Datasets


Below we have a function that generates a variety of dataset we can use to analyze the way both UMAP and AnchorUMAP react.



In [None]:
def generate_dataset(name, noise=0.0):
    if name == "Circle":
        n_samples = 800
        theta = np.linspace(0, 2 * np.pi, n_samples)
        X = np.c_[np.cos(theta), np.sin(theta)] + noise * np.random.randn(n_samples, 2)
        color = theta
        return X, color, False

    elif name == "Sphere":
        n_samples = 400
        # Sample uniformly on sphere surface
        phi = np.arccos(1 - 2*np.random.rand(n_samples))  # polar angle
        theta = 2 * np.pi * np.random.rand(n_samples)     # azimuthal angle
        x = np.sin(phi) * np.cos(theta)
        y = np.sin(phi) * np.sin(theta)
        z = np.cos(phi)
        X = np.vstack([x, y, z]).T + noise*np.random.randn(n_samples, 3)
        color = phi
        is3d = True
        return X, color, is3d

    elif name == "2D Manifold":
        n_samples = 800
        side = int(np.sqrt(n_samples))
        u = np.linspace(-1,1,side)
        v = np.linspace(-1,1,side)
        uu, vv = np.meshgrid(u,v)
        x = uu.ravel()
        y = vv.ravel()
        z = (0.3*np.sin(np.pi*uu)*np.cos(np.pi*vv)).ravel()  # flatten
        X = np.vstack([x,y,z]).T + noise*np.random.randn(uu.size, 3)
        color = x
        is3d = True
        return X, color, is3d

    elif name == "Guassian":
        X, y = make_blobs(
            n_samples=5000,
            n_features=2,
            centers=5,
            cluster_std=2.0,
            random_state=42
        )
        # Add Gaussian noise
        X += np.random.normal(0, noise, X.shape)
        return X, y, False  # (data, color labels, is3d flag)

    elif name == "Interlocking Circles":
        n_samples = 800
        r = 1.0
        offset = 1.2
        theta = np.linspace(0, 2 * np.pi, n_samples // 2)
        circle1 = np.c_[r * np.cos(theta), r * np.sin(theta)]
        circle2 = np.c_[r * np.cos(theta) + offset, r * np.sin(theta)]
        X = np.vstack([circle1, circle2]) + noise * np.random.randn(n_samples, 2)
        color = np.concatenate([np.zeros(n_samples // 2), np.ones(n_samples // 2)])
        return X, color, False

    elif name == "S-Curve":
        X, color = make_s_curve(800, noise=noise)
        return X, color, True

    elif name == "Swiss Roll":
        X, color = make_swiss_roll(800, noise=noise)
        return X, color, True

    elif name == "Torus":
        n = 30
        theta = np.linspace(0, 2 * np.pi, n)
        phi = np.linspace(0, 2 * np.pi, n)
        theta, phi = np.meshgrid(theta, phi)
        R, r = 1.0, 0.4
        X = np.stack(
            [(R + r * np.cos(phi)) * np.cos(theta),
             (R + r * np.cos(phi)) * np.sin(theta),
             r * np.sin(phi)], axis=-1
        ).reshape(-1, 3)
        X += noise * np.random.randn(*X.shape)
        color = theta.flatten()
        return X, color, True

    elif name == "mnist":
        (x, y), _ = mnist.load_data()
        X = x.reshape(len(x), -1) / 255.0
        return X[:5000], y[:5000], False

    elif name.lower() == "pbmc3k":
        print("Loading PBMC3k and applying PCA preprocessing...")

        adata = sc.datasets.pbmc3k()

        sc.pp.filter_genes(adata, min_cells=10)
        sc.pp.normalize_total(adata, target_sum=1e4)
        sc.pp.log1p(adata)
        sc.pp.highly_variable_genes(adata, n_top_genes=2000)
        adata = adata[:, adata.var["highly_variable"]]

        sc.pp.scale(adata, max_value=10)
        sc.tl.pca(adata, svd_solver="arpack", n_comps=50)

        X_pca = adata.obsm["X_pca"].astype(np.float32)

        # optional labeling
        sc.pp.neighbors(adata, n_neighbors=15, n_pcs=40)
        sc.tl.louvain(adata, resolution=0.8)
        labels = adata.obs["louvain"].astype(int)

        return X_pca, labels, False

    else:
        raise ValueError(f"Unknown dataset: {name}")

# Widget Section

In [None]:
def reset_widgets(widgets_dict):
    for w, val in widgets_dict.items():
        w.value = val

In [None]:
def update_plot(dset, n_neighbors, show_anchors, show_triangles, noise):
    fig, triangles = explore_umap_anchor_snapshot(
        dset, n_neighbors, show_anchors, show_triangles, noise
    )
    fig_container['fig'] = fig

    # Update the triangle count widget
    triangle_count_label.value = f"<b>Triangles found:</b> {len(triangles)}"

In [None]:
def on_reset_clicked(b):
    reset_widgets({
        dset_widget: "Interlocking Circles",
        n_neighbors_widget: 10,
        noise_widget: 0.0,
        show_anchors_widget: True,
        show_triangles_widget: True
    })

In [None]:
# Layout and styling
style = {'description_width': '140px'}
layout = Layout(width='400px')

# Widgets
dset_widget = Dropdown(
    options=["Circle", "Sphere", "Gaussian", "2D Manifold", "Interlocking Circles",
             "S-Curve", "Swiss Roll", "Torus"],
    value="Interlocking Circles",
    description="Dataset:",
    style=style,
    layout=layout
)

n_neighbors_widget = IntSlider(
    min=5, max=30, step=1, value=10,
    description='n_neighbors:',
    continuous_update=False,
    style=style,
    layout=layout
)

noise_widget = FloatSlider(
    min=0.0, max=0.2, step=0.01, value=0.0,
    description='Noise:',
    continuous_update=False,
    style=style,
    layout=layout
)

show_anchors_widget = Checkbox(
    value=False,
    description='Show Anchors',
    indent=False,
    style={'description_width': 'initial'}
)

show_triangles_widget = Checkbox(
    value=True,
    description='Show Triangles',
    indent=False,
    style={'description_width': 'initial'}
)

triangle_count_label = HTML("<b>Triangles found:</b> 0")

# Buttons
reset_button = Button(
    description='Reset Defaults',
    button_style='info',
    layout=Layout(width='200px')
)

# Main control panel
controls = VBox([
    HTML("<b>Model & Data Controls</b>"),
    dset_widget,
    n_neighbors_widget,
    noise_widget,
    HTML("<b>Visualization Options</b>"),
    show_anchors_widget,
    show_triangles_widget,
    triangle_count_label,
    HBox(
        [reset_button],
        layout=Layout(padding='5px 0 0 0', gap='10px')
    )
], layout=Layout(padding='5px 10px 0px 10px'))

# Figure container
fig_container = {'fig': None}

# Interactive output
out = interactive_output(update_plot, {
    'dset': dset_widget,
    'n_neighbors': n_neighbors_widget,
    'noise': noise_widget,
    'show_anchors': show_anchors_widget,
    'show_triangles': show_triangles_widget
})

# Reset button callback
reset_button.on_click(on_reset_clicked)

# Display UI
display(
    HBox(
        [controls, out],
        layout=Layout(align_items='flex-start', gap='20px')
    )
)

# Here we begin Quantitative Analysis of datasets that appear in the thesis.

I have chosen 3 datasets to focus on.


1.   Circle
2.   2d-manifold
3.   Pbmc3k

The 2-d manifold should showcase some effictivenss of the algorithm. The circle should be relatively unnaffected. And we apply anchorUMAP to a realworld dataset to compare with UMAP. When we compare anchorUMAP to UMAP we do not include the anchor points in the final embedding, because we want to compare if we have more accurately captured the geometry or topology of the original dataset.


The Wasserstein and Bottleneck distances both quantify how different two persistence diagrams are, so we want smaller numbers.

In [None]:
def quantitative_analysis(dataset_name, n_neighbors=10, noise=0.0,
                          subsample=400, maxdim=1, plot=True):
    """
    Quantitatively compare topology preservation of UMAP vs AnchorUMAP.

    Parameters
    ----------
    dataset_name : str
        One of the names handled by generate_dataset().
    n_neighbors : int, default=10
        Number of neighbors for both embeddings.
    noise : float, default=0.0
        Noise level in dataset generation.
    subsample : int, default=400
        Subsample size for persistence computations.
    maxdim : int, default=1
        Maximum homology dimension (H₁ = loops; H₂ = voids, etc.)
    plot : bool, default=True
        Whether to show persistence diagrams.

    Returns
    -------
    results : dict
        {
            "dataset": str,
            "wasserstein_umap": float,
            "bottleneck_umap": float,
            "wasserstein_anchor": float,
            "bottleneck_anchor": float
        }
    """
    # Generate data
    X, color, is3d = generate_dataset(dataset_name, noise=noise)
    print(f"Generated dataset: {dataset_name}, shape={X.shape}, is3d={is3d}")

    # Optional subsampling for speed
    if len(X) > subsample:
        idx = np.random.choice(len(X), subsample, replace=False)
        X = X[idx]
        color = np.array(color)[idx]

    # UMAP embedding
    umap_model = umap.UMAP(n_neighbors=n_neighbors, random_state=42)
    Y_umap = umap_model.fit_transform(X)

    # AnchorUMAP embedding
    anchor_model = AnchorUMAP(
        n_neighbors=n_neighbors,
        random_state=42,
        include_anchors_in_embedding=False,
        verbose=False
    )
    Y_anchor = anchor_model.fit_transform(X)

    # Compute persistence diagrams
    diag_X = ripser(X, maxdim=maxdim)['dgms']
    diag_umap = ripser(Y_umap, maxdim=maxdim)['dgms']
    diag_anchor = ripser(Y_anchor, maxdim=maxdim)['dgms']

    # Compute distances (use H₁ loop features)
    w_umap = wasserstein(diag_X[1], diag_umap[1])
    b_umap = bottleneck(diag_X[1], diag_umap[1])
    w_anchor = wasserstein(diag_X[1], diag_anchor[1])
    b_anchor = bottleneck(diag_X[1], diag_anchor[1])

    print(f"=== Quantitative Topological Fidelity: {dataset_name} ===")
    print(f"UMAP:        Wasserstein={w_umap:.4f}, Bottleneck={b_umap:.4f}")
    print(f"AnchorUMAP:  Wasserstein={w_anchor:.4f}, Bottleneck={b_anchor:.4f}")
    print(f"ΔW = {w_umap - w_anchor:+.4f}, ΔB = {b_umap - b_anchor:+.4f}")

    # Plot persistence diagrams
    if plot:
        fig, axes = plt.subplots(1, 3, figsize=(12, 4))
        plot_diagrams(diag_X, title=f"{dataset_name} – Original", ax=axes[0])
        plot_diagrams(diag_umap, title=f"UMAP\n(W={w_umap:.3f}, B={b_umap:.3f})", ax=axes[1])
        plot_diagrams(diag_anchor, title=f"AnchorUMAP\n(W={w_anchor:.3f}, B={b_anchor:.3f})", ax=axes[2])
        plt.tight_layout()
        plt.show()

    # Return results
    return {
        "dataset": dataset_name,
        "wasserstein_umap": w_umap,
        "bottleneck_umap": b_umap,
        "wasserstein_anchor": w_anchor,
        "bottleneck_anchor": b_anchor
    }

Two main topological measuers we use are bottleneck distance and Wasserstein distance. These distances are measures of persistance diagrams. In this case we use Ripser, which creates rips complexes and lets the scale of r approach infinity. Techincally Ripser never computes the whole complex due to efficiency concers, but performs a sufficient caculation to compute presistant cohomology. These measures are connected as if Wasserstein is L-p, then bottleneck is L-inf.

Trustworthiness measuers local geometric preservation by observing knn

In [None]:
#want close to 1
def trustworthiness_analysis(dataset_name, n_neighbors=10, noise=0.0,
                             k_local=10, subsample=2000, plot=True):
    """
    Quantitatively compare local neighborhood preservation (trustworthiness)
    between UMAP and AnchorUMAP embeddings.

    Parameters
    ----------
    dataset_name : str
        Name of dataset handled by generate_dataset().
    n_neighbors : int, default=10
        Number of neighbors used in both embeddings.
    noise : float, default=0.0
        Noise level in dataset generation.
    k_local : int, default=10
        Neighborhood size for trustworthiness computation.
    subsample : int, default=2000
        Subsample size for large datasets.
    plot : bool, default=True
        If True, plot the embeddings and report trustworthiness values.

    Returns
    -------
    results : dict
        Dictionary with trustworthiness scores for UMAP and AnchorUMAP.
    """

    # Generate dataset
    X, color, is3d = generate_dataset(dataset_name, noise=noise)
    if len(X) > subsample:
        idx = np.random.choice(len(X), subsample, replace=False)
        X = X[idx]
        color = np.array(color)[idx]

    print(f"Dataset: {dataset_name} | Shape={X.shape}")

    # Fit UMAP
    umap_model = umap.UMAP(n_neighbors=n_neighbors, random_state=42)
    Y_umap = umap_model.fit_transform(X)

    # Fit AnchorUMAP
    anchor_model = AnchorUMAP(
        n_neighbors=n_neighbors,
        random_state=42,
        include_anchors_in_embedding=False,
        verbose=False
    )
    Y_anchor = anchor_model.fit_transform(X)

    # Compute trustworthiness
    trust_umap = trustworthiness(X, Y_umap, n_neighbors=k_local)
    # Slice Y_anchor to only include the original data points, not the anchors
    trust_anchor = trustworthiness(X, Y_anchor[:X.shape[0]], n_neighbors=k_local)

    print(f"=== Local Neighborhood Preservation (k={k_local}) ===")
    print(f"UMAP:        Trustworthiness = {trust_umap:.4f}")
    print(f"AnchorUMAP:  Trustworthiness = {trust_anchor:.4f}")
    print(f"Δ = {trust_anchor - trust_umap:+.4f}")

    # Plot embeddings
    if plot:
        plt.figure(figsize=(10, 4))
        plt.subplot(1, 2, 1)
        plt.scatter(Y_umap[:, 0], Y_umap[:, 1], c=color, cmap='cividis', s=10)
        plt.title(f"UMAP\nTrust = {trust_umap:.3f}")
        plt.axis('off')

        plt.subplot(1, 2, 2)
        # Plot only the original data points from Y_anchor
        plt.scatter(Y_anchor[:X.shape[0], 0], Y_anchor[:X.shape[0], 1], c=color, cmap='cividis', s=10)
        plt.title(f"AnchorUMAP\nTrust = {trust_anchor:.3f}")
        plt.axis('off')

        plt.tight_layout()
        plt.show()

    # Return results
    return {
        "dataset": dataset_name,
        "trust_umap": trust_umap,
        "trust_anchor": trust_anchor,
        "delta_trust": trust_anchor - trust_umap
    }

Spearman's rho measures global structural presrvstion using pairwise distances.

In [None]:
#want close to 1
def spearman_rho_analysis(dataset_name, n_neighbors=10, noise=0.0,
                          subsample=4000, plot=True):
    """
    Quantitatively compare global distance preservation (Spearman's rho)
    between UMAP and AnchorUMAP embeddings.

    Parameters
    ----------
    dataset_name : str
        Name of dataset handled by generate_dataset().
    n_neighbors : int, default=10
        Number of neighbors used in both embeddings.
    noise : float, default=0.0
        Noise level in dataset generation.
    subsample : int, default=2000
        Subsample size for large datasets.
    plot : bool, default=True
        If True, plot embeddings and print rho values.

    Returns
    -------
    results : dict
        Dictionary with Spearman rho correlations for UMAP and AnchorUMAP.
    """

    # Generate dataset
    X, color, is3d = generate_dataset(dataset_name, noise=noise)
    if len(X) > subsample:
        idx = np.random.choice(len(X), subsample, replace=False)
        X = X[idx]
        color = np.array(color)[idx]

    print(f"Dataset: {dataset_name} | Shape={X.shape}")

    # Fit UMAP
    umap_model = umap.UMAP(n_neighbors=n_neighbors, random_state=42)
    Y_umap = umap_model.fit_transform(X)

    # Fit AnchorUMAP
    anchor_model = AnchorUMAP(
        n_neighbors=n_neighbors,
        random_state=42,
        include_anchors_in_embedding=False,
        verbose=False
    )
    Y_anchor = anchor_model.fit_transform(X)

    # Compute pairwise distance matrices
    D_X = squareform(pdist(X))
    D_umap = squareform(pdist(Y_umap))
    # Correct: Use only the original data points from Y_anchor for distance computation
    D_anchor = squareform(pdist(Y_anchor[:X.shape[0]]))

    # Flatten and compute Spearman correlation
    rho_umap, _ = spearmanr(D_X.ravel(), D_umap.ravel())
    rho_anchor, _ = spearmanr(D_X.ravel(), D_anchor.ravel())

    print("=== Global Structure Preservation (Spearman's ρ) ===")
    print(f"UMAP:        ρ = {rho_umap:.4f}")
    print(f"AnchorUMAP:  ρ = {rho_anchor:.4f}")
    print(f"Δρ = {rho_anchor - rho_umap:+.4f}")

    # Visualization
    if plot:
        plt.figure(figsize=(10, 4))
        plt.subplot(1, 2, 1)
        plt.scatter(Y_umap[:, 0], Y_umap[:, 1], c=color, cmap='cividis', s=10)
        plt.title(f"UMAP\nρ = {rho_umap:.3f}")
        plt.axis('off')

        plt.subplot(1, 2, 2)
        # Correct: Plot only the original data points from Y_anchor
        plt.scatter(Y_anchor[:X.shape[0], 0], Y_anchor[:X.shape[0], 1], c=color, cmap='cividis', s=10)
        plt.title(f"AnchorUMAP\nρ = {rho_anchor:.3f}")
        plt.axis('off')

        plt.tight_layout()
        plt.show()

    return {
        "dataset": dataset_name,
        "spearman_umap": rho_umap,
        "spearman_anchor": rho_anchor,
        "delta_rho": rho_anchor - rho_umap
    }

# Quant analysis of the Circle




In [None]:
quantitative_analysis("Circle", n_neighbors=10)

In [None]:
trustworthiness_analysis("Circle", n_neighbors=10)

In [None]:
spearman_rho_analysis("Circle", n_neighbors=10)

# Quant Annalysis of Swiss Roll

In [None]:
quantitative_analysis("Swiss Roll", n_neighbors=10)

In [None]:
trustworthiness_analysis("Swiss Roll", n_neighbors=10)

In [None]:
spearman_rho_analysis("Swiss Roll", n_neighbors=10)

# Quant Analysis of 2d-Manifold

In [None]:
quantitative_analysis("2D Manifold", n_neighbors=10)

In [None]:
spearman_rho_analysis("2D Manifold", n_neighbors=10)

In [None]:
trustworthiness_analysis("2D Manifold", n_neighbors=10)

 # Quant analysis of Mnist


In [None]:
quantitative_analysis("mnist", n_neighbors=10)

In [None]:
trustworthiness_analysis("mnist", n_neighbors=10)