In [1]:
from pathlib import Path
import pickle
import typing as T

import open3d as o3
import numpy as np
from sklearn.neighbors import KDTree
from typing import *

from pointcloud.config import DATA_PATH
from pointcloud.utils.io import read_ply_file
from pointcloud.processors.sensat.plot import draw_pointcloud, draw_segmented_pointcloud

# Human-readable class names; a name's position in the list is its integer label id.
LABELS = dict(
    enumerate(
        [
            "Ground",
            "High Vegetation",
            "Buildings",
            "Walls",
            "Bridge",
            "Parking",
            "Rail",
            "traffic Roads",
            "Street Furniture",
            "Cars",
            "Footpath",
            "Bikes",
            "Water",
        ]
    )
)

# Location of the "_sample" block that the first cells of this notebook load.
pointcloud_path = Path(DATA_PATH).joinpath(
    "sensat_urban/grid_0.2/birmingham_block_11_sample.ply"
)

# Stop right away when the file is missing rather than failing inside the reader.
assert pointcloud_path.exists(), "PointCloud file not found."

Jupyter environment detected. Enabling Open3D WebVisualizer.
[Open3D INFO] WebRTC GUI backend enabled.
[Open3D INFO] WebRTCWindowSystem: HTTP handshake server disabled.


In [2]:
# Read the file at ``pointcloud_path``; the reader's three return values are
# bound to ``points``, ``colors`` and ``labels`` for the cells below.
points, colors, labels = read_ply_file(pointcloud_path)

In [None]:
# One colour triple per class, listed in label-id order (same order as LABELS).
# Colour names below describe the triples read as 8-bit RGB — TODO confirm the
# channel order expected by the plotting helpers.
LABEL_COLORS = [
    [85, 107, 47],  # 0 Ground -> dark olive green
    [0, 255, 0],  # 1 High Vegetation -> pure green
    [255, 165, 0],  # 2 Buildings -> orange
    [41, 49, 101],  # 3 Walls -> dark navy blue
    [0, 0, 0],  # 4 Bridge -> black
    [0, 0, 255],  # 5 Parking -> blue
    [255, 0, 255],  # 6 Rail -> magenta
    [200, 200, 200],  # 7 traffic Roads -> light grey
    [89, 47, 95],  # 8 Street Furniture -> dark purple
    [255, 0, 0],  # 9 Cars -> red
    [255, 255, 0],  # 10 Footpath -> yellow
    [0, 255, 255],  # 11 Bikes -> cyan
    [0, 191, 255],  # 12 Water -> deep sky blue
]

In [17]:
# Render floats inside printed NumPy arrays with two decimal places.
np.set_printoptions(formatter={"float_kind": "{:.2f}".format})

array([ 0,  1,  2,  3,  5,  7,  8,  9, 10], dtype=uint8)

In [37]:
# Consider this implementation going forward
# https://medium.com/gumgum-tech/handling-class-imbalance-by-introducing-sample-weighting-in-the-loss-function-3bdebd8203b4
def get_class_weights(data_path: Path, num_classes: int = 13) -> np.ndarray:
    """
    Compute a per-class weight of ``1 - class_frequency`` from the sample files.

    Every ``*_sample.ply`` file directly inside ``data_path`` is read, the number
    of points carrying each label is accumulated, and each class total is divided
    by the overall number of points. The returned weight is one minus that
    fraction, so frequent classes get a smaller weight than rare ones.

    Args:
        data_path: Directory that is searched (non-recursively) for
            ``*_sample.ply`` files.
        num_classes: Length of the returned array. Labels found in the files are
            used as indices into it, so they must be integers below this value.

    Returns:
        Array of shape ``(num_classes,)``. A class that never occurs gets 1.0.

    Raises:
        FileNotFoundError: If ``data_path`` contains no ``*_sample.ply`` file.
        ValueError: If the files that were found contain no points at all.
    """
    files = sorted(data_path.glob("*_sample.ply"))
    if not files:
        raise FileNotFoundError(f"No '*_sample.ply' files found in {data_path}")

    num_points = 0
    total_counts = np.zeros(num_classes)
    for file in files:
        _, _, labels = read_ply_file(file)
        num_points += labels.shape[0]
        # np.unique returns each label once, so the fancy-indexed += below
        # touches every class slot at most once per file.
        present_labels, label_counts = np.unique(labels, return_counts=True)
        total_counts[present_labels] += label_counts

    print(f"Total value for counts: {total_counts}")

    if num_points == 0:
        # Dividing by zero would silently yield an all-NaN weight vector.
        raise ValueError(f"The '*_sample.ply' files in {data_path} contain no points")

    return 1 - (total_counts / num_points)

# Compute the class weights over the ``sensat_urban/grid_0.2`` directory under
# DATA_PATH; get_class_weights reads every matching file found there.
class_weights = get_class_weights(DATA_PATH / "sensat_urban/grid_0.2")


def get_sample_weights(num_classes: int, samples_per_cls, exp: float = 1.0):
    """
    Inverse-frequency class weights, rescaled so that they sum to ``num_classes``.

    Args:
        num_classes: Factor the normalised weights are multiplied by.
        samples_per_cls: Per-class sample counts (array-like).
        exp: Power applied to the counts before they are inverted.

    Returns:
        ``1 / samples_per_cls ** exp`` divided by its own sum and multiplied by
        ``num_classes``.
    """
    powered_counts = np.array(np.power(samples_per_cls, exp))
    inverse = 1.0 / powered_counts
    normaliser = np.sum(inverse)

    return inverse / normaliser * num_classes

Total value for counts: [75944442.00 42631398.00 69673505.00 2014879.00 220709.00 3762223.00
 42245.00 10003283.00 2190137.00 2948521.00 3309520.00 13792.00 609498.00]


In [38]:
class_weights

array([0.64, 0.80, 0.67, 0.99, 1.00, 0.98, 1.00, 0.95, 0.99, 0.99, 0.98,
       1.00, 1.00])

In [None]:
# Path to pointcloud file that has not been sub-sampled
# NOTE(review): this is an absolute path inside one user's home directory, so
# the cell only runs on that machine — consider deriving it from DATA_PATH.
# The assignment rebinds ``pointcloud_path``, which the first cell set to the
# "_sample" file.
pointcloud_path = Path(
    "/home/macdonaldezra/Desktop/code/grid_0.200/birmingham_block_11.ply"
)
# Stop here when the file is missing.
assert pointcloud_path.exists(), "PointCloud file not found."

In [None]:
# Read the file at the current ``pointcloud_path``; this rebinds ``points``,
# ``colors`` and ``labels``, replacing the arrays loaded in cell In [2].
points, colors, labels = read_ply_file(pointcloud_path)

In [None]:
# Show the loaded points together with their per-point colours.
draw_pointcloud(points, colors)

In [None]:
# Show the same points with their labels; binding the result to ``_`` keeps the
# return value from being echoed as the cell output.
_ = draw_segmented_pointcloud(points, labels)

In [None]:
def shuffle_indices(arr: np.ndarray) -> np.ndarray:
    """
    Return the entries of ``arr`` in a random order along its first axis.

    A random permutation of ``0 .. len(arr) - 1`` is drawn from NumPy's global
    random state and used to index ``arr``; the input array is left unchanged.
    """
    order = np.random.permutation(len(arr))
    return arr[order]


def compute_distances(arr: np.ndarray, point: np.ndarray) -> np.ndarray:
    """
    Score each point by how close it lies to ``point``, on a 0-to-1 scale.

    The squared Euclidean distance from every row of ``arr`` to ``point`` is
    divided by the largest such distance, and the result is
    ``(1 - normalised_distance) ** 2``: a point at zero distance scores 1 and
    the farthest point scores 0.

    Args:
        arr: Coordinates with one point per row (last axis holds the coordinates).
        point: Reference point; anything that broadcasts against ``arr`` works,
            including a plain scalar.

    Returns:
        One float32 score per row of ``arr``.
    """
    distances = np.sum((np.square(arr - point)).astype(np.float32), axis=-1)
    if np.size(distances) == 0:
        # Nothing to score; np.max would raise on an empty array.
        return distances

    max_distance = np.max(distances)
    if max_distance == 0:
        # Every point coincides with the reference: normalised distance is 0.
        return np.ones_like(distances)

    # Normalise first, then invert: 1 - d / max, not (1 - d) / max.
    return np.square(1 - distances / max_distance)


def resize_pointcloud_inputs(
    points: np.ndarray,
    colors: np.ndarray,
    labels: np.ndarray,
    indices: np.ndarray,
    size: int,
) -> List[np.ndarray]:
    """
    Pad the PointCloud input arrays up to ``size`` entries by repeating points.

    ``size - len(points)`` existing entries are drawn at random (with
    replacement) and appended, using the same draw for all four arrays so that
    they stay aligned row by row. The original entries keep their positions at
    the front of each output.

    This functionality was taken from the original Sensat implementation and was
    re-created here to better understand how their research team modifies the
    input data.

    Args:
        points: Point coordinates, one point per row.
        colors: Per-point colours, aligned with ``points``.
        labels: Per-point labels, aligned with ``points``.
        indices: Per-point source indices, aligned with ``points``.
        size: Number of entries every returned array should have.

    Returns:
        ``[points_resized, colors_resized, labels_resized, resize_index]``, each
        with ``size`` entries along the first axis.

    Raises:
        ValueError: If ``size`` is smaller than ``len(points)``, or if padding is
            requested for an empty input.
    """
    num_inputs = len(points)
    if size < num_inputs:
        raise ValueError(
            f"size ({size}) must be at least the number of input points ({num_inputs})"
        )

    duplicate = np.random.choice(num_inputs, size - num_inputs)
    # Keep all original rows, then the randomly drawn repeats. The original rows
    # are range(num_inputs), not range(size): the inputs only hold num_inputs rows.
    gather = np.concatenate([np.arange(num_inputs), duplicate])

    points_resized = points[gather, ...]
    colors_resized = colors[gather, ...]
    labels_resized = labels[gather, ...]
    resize_index = indices[gather]

    return [points_resized, colors_resized, labels_resized, resize_index]

In [None]:
MODEL_INPUT_SIZE = 40960
# Standard deviation of the Gaussian jitter added to the query centre.
CENTER_NOISE_SCALE = 3.5 / 10
# How Sensat repos handle generating a training batch for a model

# No cell in this notebook defines ``tree``, so on a fresh kernel this cell
# stopped with a NameError. Build the KD-tree from the points loaded above so
# that its rows stay aligned with ``colors`` and ``labels``.
# NOTE(review): if ``tree`` was meant to be a pre-built (pickled) KDTree, load
# it here instead and confirm its rows match ``colors`` / ``labels``.
tree = KDTree(points)

# Small random starting score for every point in the tree.
possibility = np.random.rand(tree.data.shape[0]) * 1e-3
min_possibility = float(np.min(possibility))

# Choose the point with the minimum of possibility as query points
point_index = np.argmin(possibility)
# Get points from the tree structure. np.asarray only copies when it has to;
# np.array(..., copy=False) raises on NumPy 2 whenever a copy is required.
points = np.asarray(tree.data)
center_point = points[point_index, :].reshape(1, -1)

# Add noise to the center point
noise = np.random.normal(scale=CENTER_NOISE_SCALE, size=center_point.shape)
chosen_point = center_point + noise.astype(center_point.dtype)

# Ask for at most MODEL_INPUT_SIZE neighbours; a smaller PointCloud returns
# every point it has and is padded up to the model input size further down.
num_neighbours = min(len(points), MODEL_INPUT_SIZE)
queried_index = tree.query(chosen_point, k=num_neighbours)[1][0]

queried_index = shuffle_indices(queried_index)
# Fancy indexing copies, so the in-place re-centring leaves ``points`` intact.
queried_points = points[queried_index]
queried_points -= chosen_point
queried_colors = colors[queried_index]
queried_labels = labels[queried_index]

# NOTE(review): queried_points is already centred on chosen_point at this
# stage, so passing chosen_point again below would subtract it a second time.
# possibility[queried_index] += compute_distances(queried_points, chosen_point)
# min_possibility = float(np.min(possibility))

if len(points) < MODEL_INPUT_SIZE:
    (
        queried_points,
        queried_colors,
        queried_labels,
        queried_index,
    ) = resize_pointcloud_inputs(
        queried_points, queried_colors, queried_labels, queried_index, MODEL_INPUT_SIZE
    )

In [None]:
# Show the queried neighbourhood (coordinates relative to the chosen point) with its colours.
draw_pointcloud(queried_points, queried_colors)

In [None]:
# Show the queried neighbourhood with its labels; binding the result to ``_``
# keeps the return value from being echoed as the cell output.
_ = draw_segmented_pointcloud(queried_points, queried_labels)

In [None]:
np.min(possibility)

In [None]:
np.max(possibility)

In [None]:
min_possibility