In [None]:
import h5py
import numpy as np
import torch
import open_clip
from torch import Tensor
from sklearn.cluster import DBSCAN

## Load the CLIP model and the h5 embeddings

In [121]:
# Build the OpenCLIP ViT-B-16 model with fp16 weights, switch it to inference
# mode and place it on the GPU. The two preprocessing transforms returned
# alongside the model are discarded.
model, _, _ = open_clip.create_model_and_transforms(
    "ViT-B-16",
    pretrained="laion2b_s34b_b88k",
    precision="fp16",
)
model = model.eval().to("cuda")
tokenizer = open_clip.get_tokenizer("ViT-B-16")

# Generic phrases that every query is contrasted against when relevancy is scored.
negatives = ["object", "things", "stuff", "texture"]
with torch.no_grad():
    tok_phrases = torch.cat(list(map(tokenizer, negatives))).to("cuda")
    neg_embeds = model.encode_text(tok_phrases)
# Rescale each text embedding to unit L2 norm.
neg_embeds = neg_embeds / neg_embeds.norm(dim=-1, keepdim=True)

In [122]:
# NOTE(review): repeats the tokenizer assignment already made in the model-setup
# cell; this cell looks redundant and can probably be removed.
tokenizer = open_clip.get_tokenizer("ViT-B-16")

In [123]:
def load_h5_file(load_config: str, num_scales: int = 30) -> dict:
    """Read the point, ray, colour and CLIP datasets of an embedding file into memory.

    Args:
        load_config: Path of the HDF5 file to open (read-only).
        num_scales: Number of ``clip/scale_{i}`` datasets to read, for
            ``i`` in ``0 .. num_scales - 1``. Defaults to 30.

    Returns:
        A dict with the keys ``points_nerfstudio``, ``points_scannet``,
        ``origins``, ``directions`` and ``rgb`` (each a fully loaded array) and
        ``clip_embeddings_per_scale`` (a list with one array per scale).

    Raises:
        KeyError: If an expected group or dataset is missing from the file.
    """
    # The context manager guarantees the file is closed even when a dataset is
    # missing or a read fails part-way through.
    with h5py.File(load_config, "r") as hdf5_file:
        # `[:]` copies each dataset into memory so it stays valid after closing.
        points_nerfstudio = hdf5_file["points_nerfstudio"]["points_nerfstudio"][:]
        points_scannet = hdf5_file["points_scannet"]["points_scannet"][:]
        origins = hdf5_file["origins"]["origins"][:]
        directions = hdf5_file["directions"]["directions"][:]

        clips_group = hdf5_file["clip"]
        clip_embeddings_per_scale = [
            clips_group[f"scale_{i}"][:] for i in range(num_scales)
        ]

        rgb = hdf5_file["rgb"]["rgb"][:]

    return {
        "points_nerfstudio": points_nerfstudio,
        "points_scannet": points_scannet,
        "origins": origins,
        "directions": directions,
        "clip_embeddings_per_scale": clip_embeddings_per_scale,
        "rgb": rgb,
    }

In [124]:
def get_relevancy(
    embed: torch.Tensor,
    positive_id: int,
    pos_embeds: Tensor,
    neg_embeds: Tensor,
    positive_words_length: int,
) -> torch.Tensor:
    """Score each embedding for one positive phrase against its hardest negative.

    The positive phrase is paired with every negative phrase; each pair of
    similarities is scaled by 10 and passed through a two-way softmax. For
    every row the pair with the lowest positive probability is returned.

    Args:
        embed: Embeddings to score, one per row (rows x dim).
        positive_id: Column index of the positive phrase to score.
        pos_embeds: Positive phrase embeddings (n_pos x dim).
        neg_embeds: Negative phrase embeddings (n_neg x dim); any ``n_neg >= 1``.
        positive_words_length: Number of positive phrases; similarity columns
            from this index onwards are treated as negatives.

    Returns:
        Tensor of shape (rows, 2) holding ``[p_positive, p_negative]`` per row.

    Raises:
        ValueError: If there are no negative phrases to compare against.
    """
    phrases_embeds = torch.cat([pos_embeds, neg_embeds], dim=0)
    p = phrases_embeds.to(embed.dtype)  # phrases x dim
    output = torch.mm(embed, p.T)  # rows x phrases
    positive_vals = output[..., positive_id : positive_id + 1]  # rows x 1
    negative_vals = output[..., positive_words_length:]  # rows x n_neg

    # Derive the number of negatives from the data instead of assuming four.
    n_negatives = negative_vals.shape[-1]
    if n_negatives == 0:
        raise ValueError("get_relevancy needs at least one negative phrase")
    repeated_pos = positive_vals.repeat(1, n_negatives)  # rows x n_neg

    sims = torch.stack((repeated_pos, negative_vals), dim=-1)  # rows x n_neg x 2
    softmax = torch.softmax(10 * sims, dim=-1)  # rows x n_neg x 2
    best_id = softmax[..., 0].argmin(dim=1)  # rows
    # Pick, per row, the (positive, negative) pair belonging to the worst case.
    pair_index = best_id[:, None, None].expand(-1, 1, 2)  # rows x 1 x 2
    return torch.gather(softmax, 1, pair_index)[:, 0, :]

In [125]:
def compute_probability_query_property(query: str, h5_dict: dict) -> list[Tensor]:
    """Compute the relevancy of every stored embedding to ``query`` at each scale.

    Relies on the notebook-level ``model``, ``tokenizer`` and ``neg_embeds``.

    Args:
        query: Text prompt used as the single positive phrase.
        h5_dict: Dict as returned by ``load_h5_file``; only
            ``"clip_embeddings_per_scale"`` is read.

    Returns:
        A list with one tensor per entry of ``clip_embeddings_per_scale``
        (in the same order). Each tensor keeps only the positive-class
        probability and has a trailing dimension of size 1.
    """
    positives = [query]
    n_phrases = len(positives)

    # Everything below is inference only, so no autograd bookkeeping is needed.
    with torch.no_grad():
        tok_phrases = torch.cat(
            [tokenizer(phrase) for phrase in positives]
        ).to("cuda")
        pos_embeds = model.encode_text(tok_phrases)
        pos_embeds = pos_embeds / pos_embeds.norm(dim=-1, keepdim=True)

        prob_per_scale = []
        # Iterate over the scales that were actually loaded rather than over a
        # separately hard-coded count.
        for scale_embeddings in h5_dict["clip_embeddings_per_scale"]:
            clip_output = torch.from_numpy(scale_embeddings).to("cuda")
            for i in range(n_phrases):
                probs = get_relevancy(
                    embed=clip_output,
                    positive_id=i,
                    pos_embeds=pos_embeds,
                    neg_embeds=neg_embeds,
                    positive_words_length=n_phrases,
                )
                # Column 0 is the positive probability; slicing keeps the axis.
                prob_per_scale.append(probs[..., 0:1])

    return prob_per_scale

In [126]:
def find_cluster(h5_dict, probability_over_all_points):
    """Locate the most relevant spatial cluster among the top-scoring points.

    The top 0.5% of points by probability (at least one point) are clustered
    with DBSCAN. For each cluster, the member closest to the cluster centroid
    is looked up, and the cluster whose such member has the highest
    probability is selected.

    Args:
        h5_dict: Dict containing ``"points_scannet"``, an array of point
            coordinates indexed like ``probability_over_all_points``; the
            first three columns are used as x, y, z.
        probability_over_all_points: Array with one probability per point.

    Returns:
        ``(centroid, (sx, sy, sz))``: the mean position of the selected
        cluster and its extent (max - min) along the first three axes.

    Raises:
        ValueError: If the probability array is empty or DBSCAN assigns every
            candidate point to noise.
    """
    probabilities = np.asarray(probability_over_all_points).reshape(-1)
    if probabilities.size == 0:
        raise ValueError("find_cluster received an empty probability array")

    # Keep at least one point: with a count of 0 the slice `[-0:]` would
    # silently select *all* points instead of none.
    top_count = max(1, int(probabilities.size * 0.005))
    print(top_count)
    print(probabilities.size)

    # Indices of the `top_count` largest probabilities (unordered).
    top_indices = np.argpartition(probabilities, -top_count)[-top_count:]

    points = h5_dict["points_scannet"]
    top_positions = points[top_indices]
    top_values = probabilities[top_indices]

    labels = DBSCAN(eps=0.05, min_samples=15).fit(top_positions).labels_

    best_cluster_value = -np.inf
    best_cluster_id = None
    # np.unique gives a sorted, deterministic iteration order.
    for cluster_id in np.unique(labels):
        if cluster_id == -1:  # DBSCAN labels noise points with -1
            continue

        in_cluster = labels == cluster_id
        members = top_positions[in_cluster]
        values_for_members = top_values[in_cluster]

        # Score the cluster by the probability of its most central member.
        centroid = np.mean(members, axis=0)
        distances_to_centroid = np.linalg.norm(members - centroid, axis=1)
        central_value = values_for_members[np.argmin(distances_to_centroid)]

        if central_value > best_cluster_value:
            best_cluster_value = central_value
            best_cluster_id = cluster_id

    if best_cluster_id is None:
        raise ValueError(
            "DBSCAN found no cluster among the top-probability points"
        )

    members_of_best_cluster = top_positions[labels == best_cluster_id]
    centroid_of_best = np.mean(members_of_best_cluster, axis=0)

    # Axis-aligned extent of the cluster along x, y and z.
    xyz = members_of_best_cluster[:, :3]
    sx, sy, sz = np.max(xyz, axis=0) - np.min(xyz, axis=0)

    return centroid_of_best, (sx, sy, sz)

In [127]:
def construct_bbox_corners(center, box_size):
    """Return the eight corners of an axis-aligned box as an (8, 3) array.

    Args:
        center: Box centre, indexable as ``center[0..2]`` (x, y, z).
        box_size: Full side lengths ``(sx, sy, sz)``.

    Returns:
        Array with one corner per row: rows 0-3 lie on the +z face and
        rows 4-7 on the -z face, each face visited in the same x/y order.
    """
    size_x, size_y, size_z = box_size
    px, py, pz = size_x / 2, size_y / 2, size_z / 2
    nx, ny, nz = -px, -py, -pz

    # One row per axis, one column per corner.
    offsets = np.vstack(
        [
            [px, px, nx, nx, px, px, nx, nx],
            [py, ny, ny, py, py, ny, ny, py],
            [pz, pz, pz, pz, nz, nz, nz, nz],
        ]
    )
    # Shift each axis row by the matching centre coordinate (written back in place).
    for axis in range(3):
        offsets[axis, :] = offsets[axis, :] + center[axis]

    return offsets.T

## Script

In [128]:
# Read the embedding file for ScanNet scene0025_00 fully into memory.
h5_dict = load_h5_file("/workspace/chat-with-nerf-eval/data/scannet/scans/scene0025_00/h5_embedding/embeddings.h5")

In [129]:
# Score every stored embedding against the text query; the result holds one
# positive-probability tensor per scale.
prob_per_scale = compute_probability_query_property("computer screen", h5_dict)

In [130]:
# Keep the probabilities from the scale whose single best point scores highest.
probability_per_scale_per_phrase: None | Tensor = None
scales_list = torch.linspace(0.0, 1.5, 30)
for i, scale in enumerate(scales_list):
    pos_prob = prob_per_scale[i]
    # Identity check: `== None` on a tensor-typed variable goes through
    # Tensor.__eq__ and only works by accident.
    if (
        probability_per_scale_per_phrase is None
        or pos_prob.max() > probability_per_scale_per_phrase.max()
    ):
        best_scale_for_phrases = scale
        probability_per_scale_per_phrase = pos_prob

# Flatten to a 1-D NumPy array with one probability per point.
possibility_array = probability_per_scale_per_phrase.detach().cpu().numpy().squeeze()  # type: ignore # noqa: E501

In [131]:
# Cluster the highest-probability points and keep the selected cluster's
# centroid and per-axis extents. The two numbers printed below are the number
# of top points kept and the total number of points.
center, box_size = find_cluster(h5_dict, possibility_array)

2514
502854


In [134]:
# Eight corner coordinates of the axis-aligned box around the selected cluster.
corners_3d = construct_bbox_corners(center, box_size)
conrners_3d = corners_3d  # original (misspelled) name, kept as an alias

In [136]:
import open3d as o3d
import numpy as np


In [137]:
def create_bbox(center, extents):
    """Build a red wireframe ``LineSet`` for an axis-aligned box.

    Args:
        center: Box centre, indexable as ``center[0..2]`` (x, y, z).
        extents: Full side lengths ``(sx, sy, sz)``.

    Returns:
        An ``o3d.geometry.LineSet`` with 8 points and 12 red edges.
    """
    # Reuse the notebook's shared corner helper instead of duplicating its math.
    corners_3d = construct_bbox_corners(center, extents)

    # Corner index pairs: the +z face, the -z face, then the four vertical edges.
    lines = [
        [0, 1], [1, 2], [2, 3], [3, 0],
        [4, 5], [5, 6], [6, 7], [7, 4],
        [0, 4], [1, 5], [2, 6], [3, 7],
    ]

    colors = [[1, 0, 0] for _ in lines]  # red for every edge
    line_set = o3d.geometry.LineSet()
    line_set.points = o3d.utility.Vector3dVector(corners_3d)
    line_set.lines = o3d.utility.Vector2iVector(lines)
    line_set.colors = o3d.utility.Vector3dVector(colors)

    return line_set


def visualize_mesh_with_bboxes(mesh_path, bbox_centers, bbox_extents):
    """Show a triangle mesh together with one wireframe bounding box.

    Args:
        mesh_path: Path of the mesh file to read.
        bbox_centers: Centre of the box, passed to ``create_bbox``.
        bbox_extents: Side lengths of the box, passed to ``create_bbox``.
    """
    scene_mesh = o3d.io.read_triangle_mesh(mesh_path)
    wireframe = create_bbox(bbox_centers, bbox_extents)

    # Mesh first, box second: both are drawn in a single Plotly figure.
    o3d.visualization.draw_plotly([scene_mesh, wireframe])

In [None]:
# Draw the scene0025_00 mesh with a box given by a hard-coded centre and
# per-axis extents.
# NOTE(review): the `center` / `box_size` computed by find_cluster earlier in
# the notebook are not used here; these literals look like a saved result of
# that step — confirm, or pass the computed values instead.
mesh_path = "/workspace/chat-with-nerf-eval/data/scannet/scans/scene0025_00/scene0025_00_vh_clean_2.ply"
bbox_centers = np.array([3.4272046, 2.0027978, 1.1025069])
bbox_extents = np.array([0.48718548, 0.6294966, 0.34797996])
visualize_mesh_with_bboxes(mesh_path, bbox_centers, bbox_extents)