In [2]:
import torch
import torch.nn as nn

In [3]:


class PerceptronTo1D(nn.Module):
    def __init__(self, input_size, hidden_size):
        super(PerceptronTo1D, self).__init__()
        self.fc1 = nn.Linear(2*input_size, hidden_size)  
        self.relu = nn.ReLU()                     
        self.fc2 = nn.Linear(hidden_size, 1) 
    def forward(self, x, y):
        flow = torch.concatenate([x,y], dim=-1)
        flow = self.fc1(flow)              
        flow = self.relu(flow)           
        flow = self.fc2(flow)     
        return flow


In [4]:
import numpy as np
from geosink.sinkhorn import GeoSinkhorn
from geosink.heat_kernel import laplacian_from_data

# Generate data and build graph.
data0 = np.random.normal(0, 1, (100, 5))
data1 = np.random.normal(5, 1, (100, 5))
data = np.concatenate([data0, data1], axis=0)
lap = laplacian_from_data(data, sigma=1.0)

# instantiate the GeoSinkhorn class
geo_sinkhorn = GeoSinkhorn(tau=5.0, order=10, method="cheb", lap=lap)

# create two signals
m_0 = np.zeros(200,)
m_0[:100] = 1
m_0 = m_0 / np.sum(m_0)
m_1 = np.zeros(200,)
m_1[100:] = 1
m_1 = m_1 / np.sum(m_1)

# compute the distance between the two signals
dist_w = geo_sinkhorn(m_0, m_1, max_iter=500)
print(dist_w)

2.7631021115928576


In [5]:
import torchvision

# Download MNIST training
trainset = torchvision.datasets.MNIST(root='./data', train=True, download=True)
testset = torchvision.datasets.MNIST(root='./data', train=False, download=True)

#  Convert training data to NumPy arrays
train_images = trainset.data.numpy()   # Shape: (60000, 28, 28)
train_labels = trainset.targets.numpy()  # Shape: (60000,)
test_images = testset.data.numpy()    # Shape: (10000, 28, 28)
test_labels = testset.targets.numpy()  # Shape: (10000,)

# flatten images into normalized vectors
X_train = train_images.reshape(train_images.shape[0], -1) # Shape: (60000, 784)
X_test = test_images.reshape(test_images.shape[0], -1)  # Shape: (10000, 784)
Y_train = train_labels
Y_test = test_labels
# shapes print
print(f"Train images shape: {X_train.shape}, Train labels shape: {Y_train.shape}")
print(f"Test images shape: {X_test.shape}, Test labels shape: {Y_test.shape}")

def get_image(flatten_img):
    return flatten_img.reshape(28, 28)

Train images shape: (60000, 784), Train labels shape: (60000,)
Test images shape: (10000, 784), Test labels shape: (10000,)


In [6]:
import networkx as nx
from networkx.algorithms.shortest_paths.weighted import _weight_function, _bellman_ford, _dijkstra
from scipy.spatial.distance import cdist
import numpy as np
from sklearn.neighbors import NearestNeighbors
import scipy

def johnson(G, weight="weight"):
    r"""Uses Johnson's Algorithm to compute shortest paths.

    Johnson's Algorithm finds a shortest path between each pair of
    nodes in a weighted graph even if negative weights are present.

    Parameters
    ----------
    G : NetworkX graph

    weight : string or function
        If this is a string, then edge weights will be accessed via the
        edge attribute with this key (that is, the weight of the edge
        joining `u` to `v` will be ``G.edges[u, v][weight]``). If no
        such edge attribute exists, the weight of the edge is assumed to
        be one.

        If this is a function, the weight of an edge is the value
        returned by the function. The function must accept exactly three
        positional arguments: the two endpoints of an edge and the
        dictionary of edge attributes for that edge. The function must
        return a number.

    Returns
    -------
    distance : dictionary
        Dictionary, keyed by source and target, of shortest paths.
    """
    dist = {v: 0 for v in G}
    pred = {v: [] for v in G}
    weight = _weight_function(G, weight)

    # Calculate distance of shortest paths
    dist_bellman = _bellman_ford(G, list(G), weight, pred=pred, dist=dist)

    # Update the weight function to take into account the Bellman--Ford
    # relaxation distances.
    def new_weight(u, v, d):
        return weight(u, v, d) + dist_bellman[u] - dist_bellman[v]

    def dist_path(v):
        paths = {v: [v]}
        return _dijkstra(G, v, new_weight, paths=paths)

    return {v: dist_path(v) for v in G}

def geo_dist(G, weight="weight", dtype= None):
    
    n = G.number_of_nodes()
    if dtype is None:
        dist_matrix = np.zeros((n, n))
    else:
        dist_matrix = np.zeros((n, n), dtype=dtype)
    
    dist = johnson(G, weight=weight)
    for vertex,dist_set in dist.items():
        for target,distance in dist_set.items():
            dist_matrix[vertex][target] = distance
    return dist_matrix
    

def get_adjancency_matrix(data : np.ndarray, k_neighbor, **kwargs):
    """
    
    :param data: (n_samples, n_features)
    :param n_neighbors: number of neighbors to connect to each element
    :param sigma: normalizing std for gaussian "reach"
    :param alpha: 
    :param kwargs: 
    :return: 
    """
    neighborhood = NearestNeighbors(n_neighbors=k_neighbor,**kwargs).fit(data)
    # Step 2: Find the K nearest neighbors (including self)
    distances, indices = neighborhood.kneighbors(data)
    
    # Step 3: Construct the row, col, and data for the COO sparse matrix
    row_indices = np.repeat(np.arange(data.shape[0]), k_neighbor)  # Repeat each point index n_neighbors times
    col_indices = indices.flatten() 
    # Step 4: Create a COO sparse adjacency matrix
    coo_adj_matrix = scipy.sparse.coo_matrix((distances.flatten(), (row_indices, col_indices)), shape=(data.shape[0], data.shape[0]))
    
    return coo_adj_matrix

def get_dense_adj(data):
    return cdist(data, data, "euclidean")
    


geodesics = torch.from_numpy(get_geo_dist(X_train.astype(np.float16)))




    



In [7]:
EPS = 1e-6
class ExpandedGeodesicDist(nn.Module):
    def __init__(self, data, manifold_speed = 10,k_neighbor = 5, nb_softmin=10, device=None, **kwargs):
        
        if device is None:
            device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        adj_sparse = get_adjancency_matrix(data, k_neighbor, **kwargs)
    
        G:nx.Graph = nx.from_scipy_sparse_array(adj_sparse, create_using=nx.Graph)
        
        min_dist = adj_sparse[adj_sparse > EPS].min()
        self.min_dist = min_dist
        self.data = data.to(device)
        self.manifold_dist = geo_dist(G)
        
        index_max_geodesic= self.manifold_dist.argmax()
        eucl_dist_max_geo = torch.linalg.vector_norm(data[index_max_geodesic[0]] - data[index_max_geodesic[1]])
        manifold_dist_dilatation = self.manifold_dist[index_max_geodesic] / eucl_dist_max_geo
        self.manifold_speed = manifold_speed
        self.nb_softmin = nb_softmin
        self.manifold_dist_dilatation = manifold_dist_dilatation
        
    def get_knn(self, x):
        diff_x = self.data - x 
        distances = torch.norm(diff_x, dim=1) 
        closest_distances, indices_x = torch.topk(distances, self.nb_softmin, largest=False)
        return closest_distances, indices_x
    
    @staticmethod
    def cartesian_hadamard_product(x,y):
        return x[:,None] @ y[None,:] 
    def forward(self, x,y):
        
        euc_dist_x, idx = self.get_knn(x)
        euc_dist_y, idy = self.get_knn(y)
        meshed = torch.meshgrid(idx, idy, indexing="ij")
        manifold_dist_projected = self.manifold_dist(meshed)
        attention_coefs = 1-torch.nn.functional.softmax(-1*self.cartesian_hadamard_product(euc_dist_x, euc_dist_y) / self.min_dist**2)
        
        rescaled_dist_to_manifold = self.manifold_dist_dilatation * (euc_dist_x.min() + euc_dist_y.min())
        
        softmin_geodesic_dist_on_manifold = (attention_coefs * manifold_dist_projected).sum()
        
        return  rescaled_dist_to_manifold / self.manifold_speed + softmin_geodesic_dist_on_manifold
    
    

        
        
        
        
    
        

torch.float64