In [None]:
# All in Torch
import torch
import torch.nn as nn
# We will need pytorch3d
from pytorch3d.transforms import  euler_angles_to_matrix


def construct_transform(rotation_vector : torch.Tensor, translation : torch.Tensor):
    '''
    Construct 4x4 transformation matrix from rotation vector and translation vector while perserving differentiation
    :param rotation_vector: 
    :param translation: 
    :return: Pose matrix
    '''    
    assert rotation_vector.shape == (3,)
    assert translation.shape == (1,3)
    
    rotation = euler_angles_to_matrix(rotation_vector, convention='XYZ')
    
    r_t_matrix = torch.hstack([rotation, translation.T])
    
    pose = torch.vstack([r_t_matrix, torch.tensor([[0,0,0,1]], device=rotation.device)])
    
    return pose

class PoseTransform(torch.nn.Module):
    '''
    Pose transform layer
    Works as a differentiable transformation layer to fit rigid ego-motion
    '''
    
    def __init__(self, device='cpu'):
        super().__init__()
        # If not working in sequences, use LieTorch
        self.translation = torch.nn.Parameter(torch.zeros((1,3), requires_grad=True, device=device))
        self.rotation_angles = torch.nn.Parameter(torch.tensor([0., 0., 0.], requires_grad=True, device=device))
        # self.pose = construct_transform(self.rotation_angles, self.translation).T.unsqueeze(0)
    
    def construct_pose(self):
        self.pose = construct_transform(self.rotation_angles, self.translation).T.unsqueeze(0)
        
        return self.pose    
        
    def forward(self, pc):
        
        pc_to_transform = torch.cat([pc, torch.ones((1, pc.shape[1], 1), device=pc.device)], dim=2)
        
        pose = construct_transform(self.rotation_angles, self.translation).T.unsqueeze(0)
        
        deformed_pc = torch.bmm(pc_to_transform, pose)[:,:,:3]
        
        return deformed_pc

def init_weights(m):
    if isinstance(m, nn.Linear):
        nn.init.xavier_uniform_(m.weight)
        m.bias.data.fill_(0.0)

class RigidNeuralPrior(torch.nn.Module):
    '''
    Neural Prior with Rigid Transformation, takes only point cloud t=1 on input and returns flow and rigid flow (ego-motion flow)
    '''
    
    def __init__(self, pc1, dim_x=3, filter_size=128, act_fn='relu', layer_size=8):
        super().__init__()
        self.layer_size = layer_size
        self.RigidTransform = PoseTransform(device=pc1.device)
        # testing refinement
        self.Refinement = torch.nn.Parameter(torch.randn(pc1.shape, requires_grad=True))
        bias = True
        self.nn_layers = torch.nn.ModuleList([])
        
        # input layer (default: xyz -> 128)
        if layer_size >= 1:
            self.nn_layers.append(torch.nn.Sequential(torch.nn.Linear(dim_x, filter_size, bias=bias)))
            if act_fn == 'relu':
                self.nn_layers.append(torch.nn.ReLU())
            elif act_fn == 'sigmoid':
                self.nn_layers.append(torch.nn.Sigmoid())
            for _ in range(layer_size-1):
                self.nn_layers.append(torch.nn.Sequential(torch.nn.Linear(filter_size, filter_size, bias=bias)))
                if act_fn == 'relu':
                    self.nn_layers.append(torch.nn.ReLU())
                elif act_fn == 'sigmoid':
                    self.nn_layers.append(torch.nn.Sigmoid())
            self.nn_layers.append(torch.nn.Linear(filter_size, dim_x, bias=bias))
        else:
            self.nn_layers.append(torch.nn.Sequential(torch.nn.Linear(dim_x, dim_x, bias=bias)))
        
        self.initialize()
        
    def forward(self, x):
        """ Points -> Flow
            [B, N, 3] -> [B, N, 3]
        """
        
        
        deformed_pc = self.RigidTransform(x)
        rigid_flow = deformed_pc - x
        
        # x = self.Refinement / 10 + rigid_flow
        for layer in self.nn_layers:
            # deformed_pc = layer(deformed_pc)
            x = layer(x)
        
        final_flow = x + rigid_flow
        
        return final_flow, rigid_flow
    
    def initialize(self):
        self.apply(init_weights)

In [None]:
# Imports once
# import sys
# sys.path.append('../')
from data.dataloader import NSF_dataset, SFDataset4D
from loss.flow import DT, SmoothnessLoss
from ops.metric import scene_flow_metrics
from vis.deprecated_vis import visualize_points3D, visualize_multiple_pcls, visualize_flow3d

# Notes
### todo add refinement layer to dynamic?
### todo freespace flow refinement
### todo NN index "distance" transform
    - construct points around the original point cloud in another dimension (each point has its surroundings a row means index) [x]
    - distance is coded as a order in which points goes (for example, you add points in one "cell" and those are first, adjacent two cells are seconds) [x]
    - overlapping points that are "more distant" comes later along the axis, so if you order the final value backwards, the closest should be in the voxel
    - this can maybe be generalizable into K-NN? Such as using K voxel grids and unique indices?
    - task for Ruslan? [x]
    - precision is cell size, because we do not know which one is closer in that cell. Well we know because of values and they can be subtracted, but that might be to much of a hassle. And cell can encode only one value, so we are bounded by this one
    - max_radius parameter defines the amount of constructed points
    - you can construct all points and then sort it based on distance to the central points
    
### todo one batch is one sequence processing



In [None]:
### Main training loop

dataset = NSF_dataset()
device = torch.device('cuda:0')

for frame_id, data in enumerate(dataset):
    if frame_id != 3:
        continue
    pc1, pc2, gt_flow = data['pc1'], data['pc2'], data['gt_flow']
    
    pc1 = pc1.to(device)
    pc2 = pc2.to(device)
    
    # One model per both frames
    model = RigidNeuralPrior(pc1, 3, 128, 'relu', 8).to(device)
    # model = Neural_Prior(3, 128, 'relu', 8).to(device)

    optimizer = torch.optim.Adam(model.parameters(), lr=0.008)
            
    # Smooth_layer = SmoothnessLoss(pc1, pc2, K=8, sm_normals_K=0, smooth_weight=1, VA=False, max_radius=2, forward_weight=0, pc2_smooth=False, dataset='argoverse')
    DT_layer = DT(pc1, pc2)
    
    # Iteration of losses
    for e in range(300):
        # deformed_pc = Transform_layer(pc1)
        # pred_flow = model(pc1)
        pred_flow, rigid_flow = model(pc1)
        
        dt_loss, per_point_dt_loss = DT_layer.torch_bilinear_distance(pc1 + pred_flow)
        # velocity_loss = 
        rigid_dt_loss, _ = DT_layer.torch_bilinear_distance(pc1 + rigid_flow)
        # smooth_loss, per_point_smooth_loss = Smooth_layer.smoothness_loss(pred_flow, Smooth_layer.NN_pc1, Smooth_layer.loss_norm)
        
        # truncated at two meters
        # dt_loss = per_point_dt_loss[per_point_dt_loss < 2].mean()
        
        deformed_pc = pc1 + pred_flow
    
        # (dt_loss).backward()
        # Loss
        (dt_loss + rigid_dt_loss).backward()
        # (dt_loss + smooth_loss + rigid_dt_loss).backward()
        optimizer.step()
        optimizer.zero_grad()
                
        print(f"Epoch: {e:03d}, NN Loss: {dt_loss.item():.3f} \t Rigid Loss: {rigid_dt_loss.item():.3f}")
        
        
    break    
    

# Way how to extract rigid transformation pose
computed_pose = model.RigidTransform.construct_pose()

# Computation of sceneflow metrics
epe, accs, accr, angle, outlier = scene_flow_metrics(pred_flow, gt_flow.to(device))

# Display metrics
import pandas as pd
# set pandas display options for float precision
pd.set_option("display.precision", 3)
metric_df = pd.DataFrame([epe, accs, accr, angle, outlier], index=['epe', 'accs', 'accr', 'angle', 'outlier']).T

metric_df.describe()

 
# Visualization
# visualize_points3D(pc1[0, :, :3].detach(), epe.detach()[0])
# visualize_flow3d(pc1[0, :, :3].detach().cpu(), pc2[0, :, :3].detach().cpu(), pred_flow.detach()[0].cpu())
# visualize_flow3d(pc1[0, :, :3].detach().cpu(), pc2[0, :, :3].detach().cpu(), gt_flow.detach()[0].cpu())
# visualize_multiple_pcls(*[pc1[0, :, :3].detach().cpu(), deformed_pc[0, :, :3].detach().cpu(), pc2[0,:,:3].detach().cpu()])


# Notes
- Prio: Fast testing of models in dev mode
  - [x] connect Jupyter 
    - [ ] Framework 
    
- RigidNeuralPrior can fit as a regular neural prior after some (tested in 500 epochs) iterations
- Transform and refinement layer works, but lacks regularization, that maybe cyclic smoothness provides (todo experiment)
- If NN in refinement has big weight, it deforms quickly
- Add same flow along z-axis as it is in gravity when we know the transformation (todo experiment)
- In this sample, the transformation is fitted correctly
- With Neural Prior, it will never be correct transformation, since Neural Prior can compensate translation with bias? But bias is only 1 number
- Paralel computation of Prior and Transformation? (todo experiment)
- Fit transformation first, then fit (Sequentialy/jointly) neural prior? (todo experiment)
- How to trust rigid transform more? (todo experiment)
- How to find robust transformation