In [1]:
import numpy as np
import matplotlib.pyplot as plt
import mpl_toolkits.mplot3d.art3d as art3d
from matplotlib.patches import Rectangle, Annulus
import open3d as o3d

Jupyter environment detected. Enabling Open3D WebVisualizer.
[Open3D INFO] WebRTC GUI backend enabled.
[Open3D INFO] WebRTCWindowSystem: HTTP handshake server disabled.


In [2]:
from dataset import pcloader, gtloader, assign_weld_region
from plot_utils import add_staple_patch

In [3]:
scan = "./_data/scans/RH-8-231201590-Pass-2023_06_13-11-38-58-193.ply"
label = "./_data/labels/RH-8-231201590-Pass-2023_06_13-11-38-58-193.txt"
output_dir = "./_data/Augmnented/"

In [4]:
print("Data Sample:")
pc, _ = pcloader(scan)
print("--pc: ", pc.shape)

gt = gtloader(label)
gt = [(float(x), float(y), int(cls)) for [x, y, cls] in gt]
print("--labels", gt)

z_max = pc[:,-1].max() 
z_min = pc[:,-1].min() 
z_spread = z_max - z_min
print("--z_max, z_min, z_spread", z_max, z_min, z_spread)


Data Sample:
--pc:  (184640, 3)
--labels [(-81.3, -0.51, 1), (11.9, 0.69, 1), (75.03, 1.33, 2), (87.03, 1.53, 2)]
--z_max, z_min, z_spread 4.980000019073486 -17.762499809265137 22.742499828338623


In [5]:
## Utilities for drawing 3D point cloud with optional Label Overlay

bw, bh = 8, 22.5
dx, dy = 2.5, 0
sw, sh = 1.5, 15.06
alpha = 0.5

roi_out_w, roi_out_h = 10, 25
border_width = 2
roi_in_w, roi_in_h = roi_out_w-(border_width*2), roi_out_h-(border_width*2)



def add_staple_patch(ax, x, y, angle, cls):
    '''
    R_AB -> Frame of center of bounding box wrt to global A
    R_BC -> Frame of bounding box center rotated wrt to non-rotated
    R_CD -> Frame of rotated straight wrt to center of rotated bounding box
    R_CE -> Frame of top annulus wrt to  center of rotated bounding box
    R_CF -> Frame of bottom annulus wrt to  center of rotated bounding box

    R_Cor1 -> Frame of bounding box origin wrt to bounding box center
    R_Dor2 -> Frame of straight origin wrt to straight center
    '''
    theta = np.radians(angle)
    c,s = np.cos(theta), np.sin(theta)
    T = np.array([  [c,-s, 0], 
                    [s, c, 0],
                    [0, 0, 1]])
    
    T_AB = np.array([[1, 0, x],
                     [0, 1, y],
                     [0, 0, 1]])

    T_BC = T

    T_Cor1 = np.array([ [1, 0, -bw/2],
                        [0, 1, -bh/2],
                        [0, 0, 1]])

    T_CD = np.array([[1, 0, dx],
                     [0, 1, dy],
                     [0, 0, 1]])

    T_Dor2 = np.array([ [1, 0, -sw/2],
                        [0, 1, -sh/2],
                        [0, 0, 1]])

    T_CE = np.array([[1, 0, 0],
                     [0, 1, sh/2],
                     [0, 0, 1]])

    T_CF = np.array([[1, 0, 0],
                     [0, 1, -sh/2],
                     [0, 0, 1]])

    T_Aor1 = T_AB@T_BC@T_Cor1
    T_AE = T_AB@T_BC@T_CE
    T_AF = T_AB@T_BC@T_CF

    if cls == 1:
        T_CD = np.array([[1, 0, -dx],
                        [0, 1, dy],
                        [0, 0, 1]])
        _color = "red"
    else:
        T_CD = np.array([[1, 0, dx],
                        [0, 1, dy],
                        [0, 0, 1]])
        _color = "black"
        
    T_Aor2 = T_AB@T_BC@T_CD@T_Dor2
    
    Rec1 = Rectangle((T_Aor1[0,2], T_Aor1[1,2]), bw, bh, angle=angle, ec=_color, fc='none', ls='dashed')
    Rec2 = Rectangle((T_Aor2[0,2], T_Aor2[1,2]), sw, sh, angle=angle, ec=_color, fc='none', alpha=alpha)
    Ann1 = Annulus((T_AE[0,2], T_AE[1,2]), dx+sw/2, sw, ec=_color, fc='none', alpha=alpha)
    Ann2 = Annulus((T_AF[0,2], T_AF[1,2]), dx+sw/2.0, sw, ec=_color, fc='none', alpha=alpha)
    
    ax.add_patch(Rec1)
    ax.add_patch(Rec2)
    ax.add_patch(Ann1)
    ax.add_patch(Ann2)
    
    art3d.pathpatch_2d_to_3d(Rec1, z=0, zdir="z")
    art3d.pathpatch_2d_to_3d(Rec2, z=0, zdir="z")
    art3d.pathpatch_2d_to_3d(Ann1, z=0, zdir="z")
    art3d.pathpatch_2d_to_3d(Ann2, z=0, zdir="z")

def draw_bg_boundary(ax, welds):
    for weld in welds:
        add_border_patch(ax, weld[0], weld[1])
        
def add_border_patch(ax, x, y):
    Rec1 = Rectangle((x - roi_in_w/2, y - roi_in_h/2), roi_in_w, roi_in_h, ec="black", fc='none', ls='dashed')
    Rec2 = Rectangle((x - roi_out_w/2, y - roi_out_h/2), roi_out_w, roi_out_h, ec="black", fc='none', ls='dashed')
    ax.add_patch(Rec1)
    ax.add_patch(Rec2)
    art3d.pathpatch_2d_to_3d(Rec1, z=0, zdir="z")
    art3d.pathpatch_2d_to_3d(Rec2, z=0, zdir="z")
    
def draw_labels(ax, welds):
    for weld in welds:
        markers = add_staple_patch(ax ,weld[0], weld[1], 0, weld[2] )
        
def visualize_pc(pc_arr=None, labels=None, weld_or_boundary="weld", elev=90, azim=-90, roll=0):
    """
    Nx3 numpy.ndarray
    labels - [Optional] List of label list
    """
    X = pc_arr[:, 0]
    Y = pc_arr[:, 1]
    Z = pc_arr[:, 2]
    
    fig = plt.figure(figsize=(25,20))
    ax = fig.add_subplot(projection="3d")
    ax.view_init(elev=elev, azim=azim, roll=roll)
    
    ## Plot Points
    ax.scatter(X, Y, Z, s=0.1, marker="+")
    
    ## Plot Label, 2D
    if labels:
        if weld_or_boundary == 'weld':
            draw_labels(ax, labels)
        elif weld_or_boundary == "boundary":
            draw_bg_boundary(ax, labels)
        elif weld_or_boundary == "both":
            draw_bg_boundary(ax, labels)
            draw_labels(ax, labels)
    
    ## For equal aspect ratio
    max_range = np.array([X.max()-X.min(), Y.max()-Y.min(), Z.max()-Z.min()]).max()
    Xb = 0.5*max_range*np.mgrid[-1:2:2, -1:2:2, -1:2:2][0].flatten() + 0.5*(X.max()-X.min())
    Yb = 0.5*max_range*np.mgrid[-1:2:2, -1:2:2, -1:2:2][1].flatten() + 0.5*(Y.max()-Y.min())
    Zb = 0.5*max_range*np.mgrid[-1:2:2, -1:2:2, -1:2:2][2].flatten() + 0.5*(Z.max()-Z.min())
    for xb, yb, zb in zip(Xb, Yb, Zb):
        ax.plot([xb], [yb], [zb], 'w')
        
# visualize_pc(pc, weld_or_boundary="both")
# # visualize_pc(pc)

In [60]:
## Plotting the sliding window variance

def get_low_variance_replacement(pc):
    increment = 1
    window_width = 9 
    print("pc.shape", pc.shape)
    pc_ = pc[pc[:, 2] > -2]
    print("pc_.shape", pc_.shape)

    Xs = pc_[:, 0]
    Ys = pc_[:, 1]
    Zs = pc_[:, 2]
    Xmin = Xs.min()
    Xmax = Xs.max()
    lst = []

    for i in np.arange(Xmin, Xmax, increment):
        subset = np.logical_and(Xs > i, Xs < i+window_width)
        lst.append((i+increment/2, np.var(Zs[subset]), Zs[subset].shape[0]))

    # print(lst)

    center, var, shape = zip(*lst)
    center = list(center)
    var = list(var)
    shape = list(shape)
    shape_mean = np.mean(shape)
    print("shape_mean", shape_mean)
    # print(center)
    # print(var)

    for i, sh in enumerate(shape):
        if sh < shape_mean:
            shape.pop(i)
            center.pop(i)
            var.pop(i)

    # peak_var = max(var)
    # peak_var_idx = var.index(peak_var)
    # peak_var_center = center[peak_var_idx]
    # print(peak_var_center, peak_var)

    # if peak_var_idx < len(var)/2:
    #     var = var[peak_var_idx:]
    #     center = center[peak_var_idx:]
    # else:
    #     var = var[:peak_var_idx]
    #     center = center[:peak_var_idx]

    lowest_vars = []
    lowest_centers = []
    for i in range(1):
        lowest_var = min(var)
        lowest_var_idx = var.index(lowest_var)
        lowest_var_center = center.pop(lowest_var_idx)
        lowest_var = var.pop(lowest_var_idx)
        print(i, lowest_var_center, lowest_var)

        lowest_vars.append(lowest_var)
        lowest_centers.append(lowest_var_center)



    plt.plot(center, var)
    plt.scatter(lowest_centers, lowest_vars)
#     visualize_pc(pc_, elev=0, azim=-90, roll=0)

    replacement_mask = np.logical_and(pc_[:, 0] > lowest_var_center - window_width/2, pc_[:, 0] < lowest_var_center + window_width/2 )
    replacement = pc_[replacement_mask]
    replacement[:, 0] -= (lowest_var_center)
#     replacement[:, 0] -= 
    
    return replacement
#     visualize_pc(replacement)

In [None]:
## Class for augmenting a samples

def jitter_like(array, x_range, y_range, z_range):
    arr = np.copy(array)
    x_jit = np.random.uniform(low=x_range[0], high=x_range[1], size=(arr.shape[0], 1))
    y_jit = np.random.uniform(low=y_range[0], high=y_range[1], size=(arr.shape[0], 1))
    z_jit = np.random.uniform(low=z_range[0], high=z_range[1], size=(arr.shape[0], 1))
    jit = np.concatenate([x_jit, y_jit, z_jit], axis=1)
    arr +=jit
    display(arr)
    return arr

class SampleAugmenter:
    def __init__(self, scan_path, label_path, destination_root="./_data/Augmented"):
        self.scan_path = scan_path
        self.label_path = label
        pc, _ = pcloader(scan)
        gt = gtloader(label)
        gt = [(float(x), float(y), int(cls)) for [x, y, cls] in gt]
        self.pc = pc
        self.labels = gt
        
        self.welds = {} ## instance, location, class, pc
        self.augmentations = {}  ## new scan, new label, destination
        self.bg = None
        
    def extract_welds(self, visualize_welds=False, visualize_bg=False, visualize_baseline=False):
        """
        Returns/Saves Dictionary of per-weld point cloud
        """
        non_bg_mask = None
        for weld_no, label in enumerate(self.labels, 1):
            mask = assign_weld_region(self.pc, label, np.zeros((2)))
            extraction = self.pc[mask]
            mask = np.expand_dims(mask, axis=1)
            if non_bg_mask is not None:
                non_bg_mask = np.concatenate([non_bg_mask, mask], axis=1)
            else:
                non_bg_mask = mask
            
            self.welds[weld_no] = {'instance': weld_no,
                                 'x': label[0],
                                 'y': label[1],
                                 'cls': label[2],
                                 'pc': extraction}
            
        non_bg_mask = np.any(non_bg_mask, axis=1)
        bg_mask = np.logical_not(non_bg_mask)
        print("non_bg_mask", non_bg_mask.shape)
        print("bg_mask", bg_mask.shape)
        bg = self.pc[bg_mask]
        self.bg = bg
        
        ## for base line filling: target background points around weld region, print the min and max
        roi_width, roi_height = 10, 25
        fill_points = None
        
        for weld_no, label in enumerate(self.labels, 1):
            
            print(weld_no)

            left_start = label[0] - roi_out_w/2
            left_end = label[0] - roi_in_w/2
            right_end = label[0] + roi_out_w/2
            right_start = label[0] + roi_in_w/2
            bottom_start = label[1] - roi_out_h/2
            bottom_end = label[1] - roi_in_w/2
            top_end = label[1] + roi_out_w/2
            top_start = label[1] + roi_in_w/2
            left_mask = np.logical_and(bg[:, 0] > left_start, bg[:, 0] < left_end )
            right_mask = np.logical_and(bg[:, 0] > right_start, bg[:, 0] < right_end )
            bottom_mask = np.logical_and(bg[:, 1] > bottom_start, bg[:, 1] < bottom_end )
            top_mask = np.logical_and(bg[:, 1] > top_start, bg[:, 1] < top_end )
            width_mask = np.logical_and(bg[:, 0]> left_start, bg[:, 0]< right_end )
            height_mask = np.logical_and(bg[:, 1]> bottom_start, bg[:, 1]< top_end )
            left_border_mask =np.logical_and(left_mask, height_mask) 
            right_border_mask =np.logical_and(right_mask, height_mask) 
            bottom_border_mask = np.logical_and(bottom_mask, width_mask)
            top_border_mask = np.logical_and(top_mask, width_mask)
            border_mask = np.logical_or(np.logical_or(left_border_mask, right_border_mask),\
                                     np.logical_or(bottom_border_mask, top_border_mask))
            baseline_max = bg[border_mask, 2].max()
            baseline_min = bg[border_mask, 2].min()
            baseline_avg = bg[border_mask, 2].mean()
#             print("border_mask.sum()", border_mask.sum())
#             print("spread: ", baseline_min, " ->", baseline_max, "==> ", baseline_max-baseline_min)
#             print("mean/median/mode", baseline_avg)
            
            self.welds[weld_no]['baseline_z'] = baseline_avg
            
            fill = jitter_like(self.welds[weld_no]["pc"], (-0.2, 0.2), (-0.2,0.2), (0,0))
            fill[:, 2] = np.random.uniform(low=baseline_avg-0.025, high=baseline_avg+0.025, size=(fill.shape[0]))
            print(self.welds[weld_no]["pc"] - fill)
            if fill_points is not None:
                fill_points = np.concatenate([fill_points, fill], axis=0)
            else:
                fill_points = fill
            print("fill_points", fill_points.shape)
            
                
        visualize_pc(fill_points)
        pcd = o3d.geometry.PointCloud()
        pcd.points = o3d.utility.Vector3dVector(fill_points)
#         o3d.visualization.draw_geometries([pcd])        
        
        
        ## Fill in extraction with baseline avg plus random noise
        no_welds = np.concatenate([bg, fill_points])
        print("no_welds", no_welds.shape )
     
        ## Testing by visualizing
        if visualize_welds:
#             display(self.welds)
            agg_pc = [weld["pc"] for weld in self.welds.values()]
            agg_pc = np.concatenate(agg_pc, axis=0)
        
#             fill_points[:, 2] +=1
#             agg_pc = np.concatenate([agg_pc, fill_points], axis=0)
            visualize_pc(agg_pc, self.labels)
#             pcd = o3d.geometry.PointCloud()
#             pcd.points = o3d.utility.Vector3dVector(agg_pc)
#             o3d.visualization.draw_geometries([pcd])

        if visualize_bg:
            visualize_pc(self.bg)
            pcd = o3d.geometry.PointCloud()
            pcd.points = o3d.utility.Vector3dVector(self.bg)
#             o3d.visualization.draw_geometries([pcd])
            
        if visualize_baseline:
            visualize_pc(no_welds)
            pcd = o3d.geometry.PointCloud()
            pcd.points = o3d.utility.Vector3dVector(no_welds)
            pcd = pcd.voxel_down_sample(voxel_size=1)
            o3d.visualization.draw_geometries([pcd])
            
            
    def create_baseline(self):
        window_width = 9
        replacement = get_low_variance_replacement(self.pc)
        pc_no_welds = np.copy(self.pc)
        
        non_bg_mask = None
        for weld_no, label in enumerate(self.labels, 1):
            x_start = label[0] - window_width/2
            x_end = label[0] + window_width/2
            mask = np.logical_and(pc_no_welds[:, 0] > x_start, pc_no_welds[:, 0] < x_end)
            not_mask = np.logical_not(mask)
            replace = np.copy(replacement)
            replace[:, 0] += label[0]
            pc_no_welds = np.concatenate([pc_no_welds[not_mask], replace])
        visualize_pc(pc_no_welds)
        
        pcd = o3d.geometry.PointCloud()
        pcd.points = o3d.utility.Vector3dVector(pc_no_welds)
#         pcd = pcd.voxel_down_sample(voxel_size=1)
        o3d.visualization.draw_geometries([pcd])
        
            
            #             if non_bg_mask is not None:
#                 non_bg_mask = np.concatenate([non_bg_mask, np.expand_dims(mask, axis=1)], axis=1)
#             else:
#                 non_bg_mask = np.expand_dims(mask, axis=1)
                
#         non_bg_mask = np.any(non_bg_mask, axis=1)
#         bg_mask = np.logical_not(non_bg_mask)
#         bg = pc[bg_mask]
        
#         for 
        
                
            
sa = SampleAugmenter(scan, label)
sa.create_baseline()
# SA.extract_welds(visualize_bg=True, visualize_welds=True, visualize_baseline=True)
# # SA.make_partials([1, 2, 3], [0.25, .5, .33], [0, 0.25, 0.25], visualize=True)

pc.shape (184640, 3)
pc_.shape (121382, 3)
shape_mean 5577.271276595745
0 97.49999237060547 0.013991442007028919


In [7]:
arr = np.ones((10,3)) + np.array([1,5,10])
print(arr)

arr1 = jitter_like(arr, (-0.5, 0.5), (-1, 1), (-0.01, 0.01))
print(arr1)

[[ 2.  6. 11.]
 [ 2.  6. 11.]
 [ 2.  6. 11.]
 [ 2.  6. 11.]
 [ 2.  6. 11.]
 [ 2.  6. 11.]
 [ 2.  6. 11.]
 [ 2.  6. 11.]
 [ 2.  6. 11.]
 [ 2.  6. 11.]]


array([[ 2.02139429,  6.16488418, 10.9953381 ],
       [ 1.90016647,  5.88501477, 11.00688341],
       [ 1.53761838,  5.73367092, 11.00134338],
       [ 1.67042848,  5.8354797 , 10.99195815],
       [ 2.0852488 ,  6.68364455, 10.99938682],
       [ 1.61399305,  5.40647361, 11.00073506],
       [ 2.29693012,  6.82065528, 11.00397121],
       [ 2.33553428,  5.51971899, 11.00560148],
       [ 1.5469004 ,  5.68773295, 11.00635382],
       [ 1.93986087,  6.21900432, 11.00781402]])

[[ 2.02139429  6.16488418 10.9953381 ]
 [ 1.90016647  5.88501477 11.00688341]
 [ 1.53761838  5.73367092 11.00134338]
 [ 1.67042848  5.8354797  10.99195815]
 [ 2.0852488   6.68364455 10.99938682]
 [ 1.61399305  5.40647361 11.00073506]
 [ 2.29693012  6.82065528 11.00397121]
 [ 2.33553428  5.51971899 11.00560148]
 [ 1.5469004   5.68773295 11.00635382]
 [ 1.93986087  6.21900432 11.00781402]]


In [8]:
#   def make_partials(self, weld_nos, occlusion_fractions, occlusion_starts, dir_tag = "Partials", visualize=False):
        
#         if self.welds:
#             assert len(weld_nos) == len(occlusion_fractions), "Mismatch: lengths of weld_nos and occlusion_fractions"
#             assert len(weld_nos) == len(occlusion_starts), "Mismatch: lengths of weld_nos and occlusion_starts"
            
#             partials = {}
            
#             for weld_no, occl_fraction, occl_start in zip(weld_nos, occlusion_fractions, occlusion_starts):
#                 Y = self.welds[weld_no]["pc"][:, 1]
#                 Y_span = Y.max() - Y.min()
#                 Y_start = Y.min() + Y_span*occl_start
#                 Y_end = Y_start + Y_span*occl_fraction
#                 mask = np.logical_or(self.welds[weld_no]["pc"][:, 1] < Y_start, self.welds[weld_no]["pc"][:, 1] > Y_end )
#                 bg_mask = np.logical_not(mask)
#                 extraction = self.welds[weld_no]["pc"][mask]
#                 bg = self.welds[weld_no]["pc"][bg_mask]
                
#                 partials[weld_no] = {'instance': weld_no,
#                                  'x': self.labels[weld_no][0],
#                                  'y': self.labels[weld_no][1],
#                                  'cls': self.labels[weld_no][2],
#                                  'pc': extraction}
            
#             for weld_no in self.welds.keys():
#                 if weld_no not in weld_nos:
# #                     print(weld_no)
#                     partials[weld_no] = {'instance': weld_no,
#                                  'x': self.labels[weld_no-1][0],
#                                  'y': self.labels[weld_no-1][1],
#                                  'cls': self.labels[weld_no-1][2],
#                                  'pc': self.welds[weld_no]['pc']}
                    
#             ## Testing by visualizing
#             if visualize:
# #                 display(partials)
#                 agg_pc = [weld["pc"] for weld in partials.values()]
#                 agg_pc = np.concatenate(agg_pc, axis=0)
#                 visualize_pc(agg_pc, self.labels)        
            
#             self.augmentations.append({})
#         else:
#             raise ReferenceError("A call to self.extract_welds is required beforehand. Dictionary is currently empty.")
    
    
#     def make_flipped_scan(self):
#         if self.welds:
#             pass
#         else:
#             raise ReferenceError("A call to self.extract_welds is required beforehand. Dictionary is currently empty.")
#         pass
    
#     def make_flipped_welds(self, weld_nos):
#         if self.welds:
#             pass
#         else:
#             raise ReferenceError("A call to self.extract_welds is required beforehand. Dictionary is currently empty.")
#         pass
    
#     def make_jittered_scan(self, delta_ranges):
#         pass
        
#     def make_jittered_welds(self, weld_nos, delta_ranges):
#         pass
    
#     def make_rotated_welds(self, weld_nos, rotations):
#         pass
    
#     def make_shifted_welds(seld, weld_nos, offsets):
#         pass
    