In [12]:
# import os
# os.environ["KEOPS_BACKEND"] = "CPU"



In [11]:
# import pykeops
# # pykeops.clean_pykeops()
# # pykeops.build_pykeops()

In [1]:
import numpy as np
import torch
from geomloss import SamplesLoss

def voxel_wasserstein_similarity(voxel1, voxel2):
    """
    Compute the normalized Wasserstein similarity (0 to 1) between two 3D voxel grids using GeomLoss.
    Converts the voxel grids into point clouds before computing the distance.
    
    Parameters:
    voxel1 (ndarray): The first 3D voxel grid.
    voxel2 (ndarray): The second 3D voxel grid.
    
    Returns:
    float: The normalized Wasserstein similarity (1 indicates perfect similarity).
    """
    # Convert voxel grids to point clouds (coordinates + intensities)
    def voxel_to_point_cloud(voxel):
        coords = np.argwhere(voxel > 0)  # Get coordinates of non-zero voxels
        intensities = voxel[coords[:, 0], coords[:, 1], coords[:, 2]]  # Get corresponding intensities
        return coords, intensities

    # Handle completely empty grids
    if np.all(voxel1 == 0) or np.all(voxel2 == 0):
        return 0.0 if not np.array_equal(voxel1, voxel2) else 1.0

    coords1, intensities1 = voxel_to_point_cloud(voxel1)
    coords2, intensities2 = voxel_to_point_cloud(voxel2)

    # Normalize intensities to form valid weights
    weights1 = intensities1 / intensities1.sum() if intensities1.sum() > 0 else np.ones(len(intensities1)) / len(intensities1)
    weights2 = intensities2 / intensities2.sum() if intensities2.sum() > 0 else np.ones(len(intensities2)) / len(intensities2)

    # Convert to PyTorch tensors
    coords1 = torch.tensor(coords1, dtype=torch.float32) if len(coords1) > 0 else torch.zeros((1, 3))
    coords2 = torch.tensor(coords2, dtype=torch.float32) if len(coords2) > 0 else torch.zeros((1, 3))
    weights1 = torch.tensor(weights1, dtype=torch.float32)
    weights2 = torch.tensor(weights2, dtype=torch.float32)

    # Define the Wasserstein loss function
    loss_fn = SamplesLoss("sinkhorn", p=2, blur=0.05)

    # Compute the Wasserstein distance
    wasserstein_distance = loss_fn(weights1, coords1, weights2, coords2).item()

    # Compute maximum possible Wasserstein distance for normalization
    max_distance = torch.linalg.norm(
        torch.tensor(voxel1.shape, dtype=torch.float32)
    ).item()

    # Handle edge cases where max_distance is zero
    if max_distance == 0:
        return 1.0 if wasserstein_distance == 0 else 0.0

    # Normalize the distance to get similarity
    normalized_similarity = 1 - (wasserstein_distance / max_distance)

    # Ensure the similarity is between 0 and 1
    normalized_similarity = max(0, min(1, normalized_similarity))

    return normalized_similarity


    The default C++ compiler could not be found on your system.
    You need to either define the CXX environment variable or a symlink to the g++ command.
    For example if g++-8 is the command you can do
      import os
      os.environ['CXX'] = 'g++-8'
    


In [2]:
import numpy as np
import torch
from geomloss import SamplesLoss

def voxel_wasserstein_similarity(voxel1, voxel2):
    """
    Compute the normalized Wasserstein similarity (0 to 1) between two 3D voxel grids using GeomLoss.
    Converts the voxel grids into point clouds before computing the distance.
    
    Parameters:
    voxel1 (ndarray): The first 3D voxel grid.
    voxel2 (ndarray): The second 3D voxel grid.
    
    Returns:
    float: The normalized Wasserstein similarity (1 indicates perfect similarity).
    """
    # Convert voxel grids to point clouds (coordinates + intensities)
    def voxel_to_point_cloud(voxel):
        coords = np.argwhere(voxel > 0)  # Get coordinates of non-zero voxels
        intensities = voxel[coords[:, 0], coords[:, 1], coords[:, 2]]  # Get corresponding intensities
        return coords, intensities

    # Handle completely empty grids
    if np.all(voxel1 == 0) or np.all(voxel2 == 0):
        return 0.0 if not np.array_equal(voxel1, voxel2) else 1.0

    coords1, intensities1 = voxel_to_point_cloud(voxel1)
    coords2, intensities2 = voxel_to_point_cloud(voxel2)

    # Handle disjoint or very sparse point clouds
    if len(coords1) == 0 or len(coords2) == 0:
        return 0.0

    # Normalize intensities to form valid weights
    weights1 = intensities1 / intensities1.sum() if intensities1.sum() > 0 else np.ones(len(intensities1)) / len(intensities1)
    weights2 = intensities2 / intensities2.sum() if intensities2.sum() > 0 else np.ones(len(intensities2)) / len(intensities2)

    # Convert to PyTorch tensors
    coords1 = torch.tensor(coords1, dtype=torch.float32) if len(coords1) > 0 else torch.zeros((1, 3))
    coords2 = torch.tensor(coords2, dtype=torch.float32) if len(coords2) > 0 else torch.zeros((1, 3))
    weights1 = torch.tensor(weights1, dtype=torch.float32)
    weights2 = torch.tensor(weights2, dtype=torch.float32)

    # Define the Wasserstein loss function
    loss_fn = SamplesLoss("sinkhorn", p=2, blur=0.05)

    try:
        # Compute the Wasserstein distance
        wasserstein_distance = loss_fn(weights1, coords1, weights2, coords2).item()
    except Exception as e:
        print(f"Error computing Wasserstein distance: {e}")
        return 0.0

    # Compute maximum possible Wasserstein distance for normalization
    max_distance = torch.linalg.norm(
        torch.tensor(voxel1.shape, dtype=torch.float32)
    ).item()

    # Handle edge cases where max_distance is zero
    if max_distance == 0:
        return 1.0 if wasserstein_distance == 0 else 0.0

    # Normalize the distance to get similarity
    normalized_similarity = 1 - (wasserstein_distance / max_distance)

    # Ensure the similarity is between 0 and 1
    normalized_similarity = max(0, min(1, normalized_similarity))

    return normalized_similarity


In [3]:
voxel_basic = np.load("voxel_base.npy")
voxel_different_hole = np.load("voxel_different_hole.npy")
voxel_different = np.load("voxel_different.npy")
voxel_loss_big = np.load("voxel_loss_big.npy")
voxel_loss_small = np.load("voxel_loss_small.npy")
voxel_with_blob = np.load("voxel_with_blob.npy")
voxel_negative_blob = np.load("voxel_negative_blob.npy")

In [4]:
voxel_negative_blob.shape

(50, 50, 50)

In [5]:
voxel_wasserstein_similarity(voxel_basic, voxel_negative_blob)  # 1.0

Error computing Wasserstein distance: name 'generic_logsumexp' is not defined


0.0

In [6]:
def compare_deferred(metrics):    #borrowed from the Dark side
    names = []
    elements = []
    symmetry_checks = {}

    def append(name, ab):
        names.append(name)
        elements.append(ab)
        for metric_name, metric_fn in metrics:      #symmetry check
            value_ab = metric_fn(ab[0], ab[1])
            value_ba = metric_fn(ab[1], ab[0])
            symmetry_checks[(name, metric_name)] = 1 if value_ab == value_ba else 0
        return [metric(*ab) for name, metric in metrics]

    def table():
        # Determine column widths for formatting
        longest_metric_name_len = max(len(name) for name, _ in metrics)
        longest_name_len = max(max(len(name) for name in names), 6)

        print(" " * longest_metric_name_len, "|", end=" ")
        for name in names:
            print(name + " " * (longest_name_len - len(name)), "|", end=" ")
        print("Symmetry" + " " * (longest_name_len - 8), "|")

        print("-" * (longest_metric_name_len + 2 + (len(names) + 1) * (longest_name_len + 3)))

        for metric_name, metric_fn in metrics:
            print(" " * (longest_metric_name_len - len(metric_name)) + metric_name, "|", end=" ")
            for i, (a, b) in enumerate(elements):
                value = repr(round(metric_fn(a, b), min(longest_name_len - 2, 4)))
                print(value + " " * (longest_name_len - len(value)), "|", end=" ")
            symmetry = all(symmetry_checks[(name, metric_name)] for name in names)
            print(f"{symmetry}       |")

    return append, table


metrics = [
    ("Wasserstein form geomloss", voxel_wasserstein_similarity)
]

In [7]:
voxel_pairs = [
    ("Basic", voxel_basic),
    ("Different", voxel_different),
    ("Small Loss", voxel_loss_small),
    ("Big Loss", voxel_loss_big),
    ("With Growth", voxel_with_blob),
    ("Hole", voxel_different_hole),
    ("Negative", voxel_negative_blob)
]
compare_append, compare_table = compare_deferred(metrics)

#All comparisons
for (name, v1) in voxel_pairs:
    compare_append(f"Basic to {name}", [voxel_basic, v1])

compare_table()


voxel_names, voxel_data = zip(*voxel_pairs)
# plot_voxels_side_by_side(voxel_data, voxel_names, 2, 4)

Error computing Wasserstein distance: name 'generic_logsumexp' is not defined
Error computing Wasserstein distance: name 'generic_logsumexp' is not defined
Error computing Wasserstein distance: name 'generic_logsumexp' is not defined
                          | Basic to Basic       | Basic to Different   | Basic to Small Loss  | Basic to Big Loss    | Basic to With Growth | Basic to Hole        | Basic to Negative    | Symmetry             |
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Wasserstein form geomloss | 1                    | 0.4118               | 0.9979               | 0.9248               | 0.9634               | 0.3948               | Error computing Wasserstein distance: name 'generic_logsumexp' is not defined
0.0                  | True       |
