In [None]:
from src.datasets.kitti_dataset import KittiDataset

dataset_path = "dataset/"  # root directory of the KITTI dataset
sequence = "00"  # KITTI sequence id; the two paths below are derived from it

# Images processed by the SAM algorithm, stored in npz format.
# NOTE(review): "vfm-labelss" looks like a possible typo of "vfm-labels" —
# confirm against the directory name on disk before changing it.
image_instances_path = "pipeline/vfm-labelss/sam/{}/".format(sequence)

# Ground-truth KITTI point cloud segmentation labels for the same sequence.
gt_labels_path = "{}sequences/{}/labels/".format(dataset_path, sequence)
# Dataset wrapper for the chosen sequence; it is passed to ConfigDTO as "dataset" in the next cell.
kitti = KittiDataset(dataset_path, sequence, image_instances_path)

In [None]:
# setting parameter values

from src.services.preprocessing.common.config import ConfigDTO

# All tunable parameters of the pipeline live in this single ConfigDTO instance.
config = ConfigDTO(
    dataset=kitti,
    # start_index / end_index are also passed to build_sem_inst_label_arrays
    # when the gt labels are loaded
    start_index=2024,
    end_index=2028,
    start_image_index_offset=0,
    # alpha / beta are forwarded to sam_label_distance
    alpha_physical_distance=5,
    beta_instance_distance=5,
    # forwarded to normalized_cut
    T_normalized_cut=0.02,
    reduce_detail_int_to_union_threshold=0.5,
    reduce_detail_int_to_mask_threshold=0.6,
    cam_name="cam2",
    R=18,
    nb_neighbors=25,
    std_ratio=5.0,
    voxel_size=0.25,
)

In [None]:
# pcd initialisation and initial segmentation based on images

from src.services.preprocessing.init.map import InitMapProcessor
from src.services.preprocessing.init.instances_matrix import InitInstancesMatrixProcessor

# Build the initial point cloud first, then the points-to-instances matrix
# that InitInstancesMatrixProcessor derives from the config and that cloud.
map_processor = InitMapProcessor()
init_pcd = map_processor.process(config)

instances_matrix_processor = InitInstancesMatrixProcessor()
points2instances = instances_matrix_processor.process(config, init_pcd)

In [None]:
# building an array of gt instance segmentation for each sequence pcd

from src.utils.gt_utils import build_sem_inst_label_arrays

# build_sem_inst_label_arrays returns a pair; only the second element
# (the instance labels) is used in this notebook.
label_arrays = build_sem_inst_label_arrays(
    gt_labels_path,
    config.start_index,
    config.end_index,
)
_, inst_label_array_src = label_arrays

In [None]:
# visualisation of gt instance masks on the pcd

import copy

from src.utils.pcd_utils import color_pcd_by_labels
from src.utils.pcd_utils import visualize_pcd

# Colour a deep copy so that init_pcd itself is not handed to the colouring helper.
init_pcd_copy = copy.deepcopy(init_pcd)
colored_pcd = color_pcd_by_labels(init_pcd_copy, inst_label_array_src)
visualize_pcd(colored_pcd)

In [None]:
# visualisation of the initial pcd segmentation masks for a particular image

import copy

from src.utils.pcd_utils import color_pcd_by_labels
from src.utils.pcd_utils import visualize_pcd

# Column 0 of points2instances is used as the label array for the colouring.
first_column_labels = points2instances[:, 0]
init_pcd_copy = copy.deepcopy(init_pcd)
colored_pcd = color_pcd_by_labels(init_pcd_copy, first_column_labels)
visualize_pcd(colored_pcd)

In [None]:
# pcd handler initialisation

from src.services.preprocessing.not_zero import SelectionNotZeroProcessor
from src.services.preprocessing.in_cube import SelectionInCubeProcessor
from src.services.preprocessing.statistical_outlier import StatisticalOutlierProcessor

# Point cloud processors, instantiated with no arguments and applied in this order.
processor_classes = (
    SelectionNotZeroProcessor,
    SelectionInCubeProcessor,
    StatisticalOutlierProcessor,
)
processors = [processor_class() for processor_class in processor_classes]

In [None]:
# pcd processing and saving the state

import copy

# NOTE(review): pcd is reset from init_pcd on every run, but points2instances and
# inst_label_array_src are overwritten in place, so re-running this cell alone
# pairs a fresh cloud with already-filtered arrays. Re-run from the
# initialisation cells instead.
pcd = copy.deepcopy(init_pcd)
for processor in processors:
    processed = processor.process(config, pcd, points2instances)
    pcd, points2instances, indices = processed
    # keep the gt labels aligned with the points that the processor returned
    inst_label_array_src = inst_label_array_src[indices]

# Snapshot of the state before voxelisation; later cells use these copies
# for visualisation and for metric calculation.
(
    pcd_for_clustering,
    points2instances_pcd_for_clustering,
    inst_label_array_for_clustering,
) = (copy.deepcopy(state) for state in (pcd, points2instances, inst_label_array_src))

In [None]:
# visualisation of gt instance masks on the processed pcd before voxelization

import copy

from src.utils.pcd_utils import color_pcd_by_labels
from src.utils.pcd_utils import visualize_pcd

# Colour a deep copy of the pre-voxelisation cloud with the filtered gt instance labels.
pcd_for_clustering_copy = copy.deepcopy(pcd_for_clustering)
colored_pcd_for_clustering = color_pcd_by_labels(
    pcd_for_clustering_copy,
    inst_label_array_for_clustering,
)
visualize_pcd(colored_pcd_for_clustering)

In [None]:
# visualisation of the initial pcd segmentation masks before voxelization

import copy

from src.utils.pcd_utils import color_pcd_by_labels
from src.utils.pcd_utils import visualize_pcd

# Column 0 of the snapshotted points-to-instances matrix is used as the label array.
first_column_labels = points2instances_pcd_for_clustering[:, 0]
pcd_for_clustering_copy = copy.deepcopy(pcd_for_clustering)
colored_pcd_for_clustering = color_pcd_by_labels(
    pcd_for_clustering_copy,
    first_column_labels,
)
visualize_pcd(colored_pcd_for_clustering)

In [None]:
# final processing step - voxelisation of the pcd

from src.services.preprocessing.voxel_down import VoxelDownProcessor

# Voxelise the filtered cloud. `trace` is later indexed by voxel to get the
# indices of the pre-voxelisation points (see build_pred_inst_array below).
pcd, points2instances, trace = VoxelDownProcessor().process(config, pcd, points2instances)

In [None]:
# calculation of distance matrix for voxelised pcd

import numpy as np

from scipy.spatial.distance import cdist
from src.utils.distances_utils import sam_label_distance

# Dense pairwise distances between all points of the voxelised cloud
# (cdist defaults to the Euclidean metric). The result is N x N, so memory
# grows quadratically with the number of points.
points = np.asarray(pcd.points)
spatial_distance = cdist(points, points)

# NOTE(review): the literal 3 is an unnamed positional argument of
# sam_label_distance; its meaning is not visible here — confirm and name it.
# NOTE(review): beta_instance_distance is passed before alpha_physical_distance;
# verify that this matches the parameter order of sam_label_distance.
dist, masks = sam_label_distance(
    points2instances,
    spatial_distance,
    3,
    config.beta_instance_distance,
    config.alpha_physical_distance
)

In [None]:
# Shape of the distance matrix returned by sam_label_distance.
dist.shape

In [None]:
# distance matrix processing

from src.services.distance.isolated import RemovingIsolatedPointsProcessor
from src.services.distance.connected_component import ExtractionLargestConnectedComponentProcessor

# Distance matrix processors, instantiated with no arguments and applied in this order.
distance_processors = [
    processor_class()
    for processor_class in (
        RemovingIsolatedPointsProcessor,
        ExtractionLargestConnectedComponentProcessor,
    )
]

# NOTE(review): dist, points and trace are overwritten in place, so re-running
# this cell processes the already-processed values again.
for processor in distance_processors:
    processed = processor.process(dist, points, trace)
    dist, points, trace = processed

In [None]:
# Sizes after distance matrix processing, one per line.
# NOTE(review): presumably len(points) and len(trace) should both match the
# side of dist — confirm.
print(dist.shape, len(points), len(trace), sep="\n")

In [None]:
# pcd clustering using the normalized cut (graph cut) algorithm

from src.services.normalized_cut_service import normalized_cut

# NOTE(review): eigenval is passed as the last positional argument of
# normalized_cut; its exact meaning is defined there — confirm.
eigenval = 2

# Indices 0..len(points)-1 of the points that survived distance matrix processing.
point_indices = np.arange(len(points), dtype=int)

clusters = normalized_cut(
    dist,
    point_indices,
    config.T_normalized_cut,
    eigenval,
)

In [None]:
# Number of clusters returned by normalized_cut.
len(clusters)

In [None]:
# Inspect the first cluster returned by normalized_cut.
clusters[0]

In [None]:
# visualisation of segmentation results. masks will be drawn on the processed pcd before voxelisation

import copy

from src.utils.pcd_utils import color_pcd_by_clusters_and_voxels
from src.utils.pcd_utils import visualize_pcd

# The colouring helper receives deep copies of the cloud and of the trace,
# plus the clusters themselves.
pcd_for_clustering_copy = copy.deepcopy(pcd_for_clustering)
trace_copy = copy.deepcopy(trace)
colored_clusters_for_clustering = color_pcd_by_clusters_and_voxels(
    pcd_for_clustering_copy,
    trace_copy,
    clusters,
)
visualize_pcd(colored_clusters_for_clustering)

In [None]:
# visualisation of gt instance masks on the processed pcd before voxelization

import copy

from src.utils.pcd_utils import color_pcd_by_labels
from src.utils.pcd_utils import visualize_pcd

# Same gt view as shown before voxelisation, repeated here for side-by-side
# comparison with the clustering result.
pcd_for_clustering_copy = copy.deepcopy(pcd_for_clustering)
colored_pcd_for_clustering = color_pcd_by_labels(
    pcd_for_clustering_copy,
    inst_label_array_for_clustering,
)
visualize_pcd(colored_pcd_for_clustering)

In [None]:
# auxiliary function for calculating metrics
# if a cluster-prediction point is in the gt label, return the mask id number in the gt label array

def find_num_in_inst_label_array(src_points, inst_label_array_for_clustering):
    """Return the gt label of the first source point whose label is positive.

    Args:
        src_points: iterable of indices into ``inst_label_array_for_clustering``.
        inst_label_array_for_clustering: gt instance label per point.

    Returns:
        The first label that is ``> 0`` in iteration order, or -1 if no
        point in ``src_points`` has a positive label.
    """
    labels = inst_label_array_for_clustering
    positive_labels = (labels[idx] for idx in src_points if labels[idx] > 0)
    return next(positive_labels, -1)

In [None]:
# building an array of predictions for comparison with gt instance label
# cell j stores the mask number for the j-th pcd point

def build_pred_inst_array(
    inst_label_array_for_clustering, clusters, trace, instance_threshold
):
    """Build a per-point array of predicted instance ids from voxel clusters.

    A cluster is kept when at least ``instance_threshold`` percent of its
    voxels contain a source point whose gt label is positive. Every source
    point of a kept cluster receives that cluster's id. Ids are assigned
    sequentially from 1 in cluster order; all other points stay 0.

    Args:
        inst_label_array_for_clustering: gt instance label per source point,
            indexed by source point index; only ``> 0`` is tested.
        clusters: iterable of clusters, each a sized collection of voxel indices.
        trace: maps a voxel index to the indices of its source points.
        instance_threshold: minimum share of gt-covered voxels, in percent.

    Returns:
        Integer numpy array with one entry per source point.
    """
    pred_inst_array = np.zeros(len(inst_label_array_for_clustering), dtype=int)
    free_id = 1
    for cluster in clusters:
        # An empty cluster has nothing to label and would otherwise cause
        # a division by zero in the percentage below.
        if len(cluster) == 0:
            continue

        # A voxel counts as "in gt" when any of its source points has a positive label.
        voxel_in_gt_cluster_count = sum(
            1
            for voxel in cluster
            if any(
                inst_label_array_for_clustering[src_point] > 0
                for src_point in trace[voxel]
            )
        )

        cluster_in_gt_instance = (voxel_in_gt_cluster_count / len(cluster)) * 100
        if cluster_in_gt_instance >= instance_threshold:
            for voxel in cluster:
                for src_point in trace[voxel]:
                    pred_inst_array[src_point] = free_id
            free_id += 1
    return pred_inst_array

In [None]:
# an array of predictions
# if instance_threshold percent or more of the cluster is in the gt instance,
# consider the cluster to be selected for comparison

# The function receives deep copies of the gt labels and of the trace;
# clusters are passed as they are.
gt_labels_copy = copy.deepcopy(inst_label_array_for_clustering)
trace_copy = copy.deepcopy(trace)
pred_inst_array = build_pred_inst_array(
    gt_labels_copy,
    clusters,
    trace_copy,
    instance_threshold=30,
)

In [None]:
# metrics calculation

from evops.metrics import precision
from evops.metrics import recall
from evops.metrics import fScore

pred_labels, gt_labels = pred_inst_array, inst_label_array_for_clustering
tp_condition = "iou"

# Each metric is computed and printed in turn: precision, recall, fScore.
metric_reports = (
    ("precision={}", precision),
    ("recall={}", recall),
    ("fScore={}", fScore),
)
for template, metric in metric_reports:
    print(template.format(metric(pred_labels, gt_labels, tp_condition)))

In [None]:
# visualisation of prediction masks

import copy

from src.utils.pcd_utils import color_pcd_by_labels
from src.utils.pcd_utils import visualize_pcd

# Colour a deep copy of the pre-voxelisation cloud with the predicted instance ids.
pcd_for_clustering_copy = copy.deepcopy(pcd_for_clustering)
colored_pcd_for_clustering = color_pcd_by_labels(
    pcd_for_clustering_copy,
    pred_labels,
)
visualize_pcd(colored_pcd_for_clustering)

In [None]:
# visualisation of gt instance label masks

import copy

from src.utils.pcd_utils import color_pcd_by_labels
from src.utils.pcd_utils import visualize_pcd

# Colour a deep copy of the pre-voxelisation cloud with the gt labels used for the metrics.
pcd_for_clustering_copy = copy.deepcopy(pcd_for_clustering)
colored_pcd_for_clustering = color_pcd_by_labels(
    pcd_for_clustering_copy,
    gt_labels,
)
visualize_pcd(colored_pcd_for_clustering)