In [140]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision
from typing import List, Dict, Optional, Tuple, Callable
import numpy as np
import math
import time

H_input=16
W_input=1800
batch_size=1
Down_conv_dis = [0.75, 3.0, 6.0, 12.0]
Up_conv_dis = [3.0, 6.0, 9.0]
Cost_volume_dis = [1.0, 2.0, 4.5]

stride_H_list = [4, 2, 2, 1]
stride_W_list = [8, 2, 2, 2]

out_H_list = [math.ceil( H_input /  stride_H_list[0])]
out_W_list = [math.ceil( W_input /  stride_W_list[0])]


def PreProcess(PC_f1):    ####    pre process procedure

        batch_size = len(PC_f1)
        PC_f1_concat = []
        PC_f1_aft_aug = []

        for p in PC_f1:
            
            num_points = torch.tensor(p.shape[0], dtype=torch.int32)
            add_T = torch.ones(num_points, 1).cuda().to(torch.float32)
            PC_f1_add = torch.cat([p, add_T], -1)  ##  concat one to form   n 4
            PC_f1_concat.append(PC_f1_add)



        for i in range(batch_size):


            cur_PC_f1_concat = PC_f1_concat[i]
   
            ##  select the 20m * 20m region ########
            r_f1 = torch.norm(cur_PC_f1_concat[:, :2], p=2, dim =1, keepdim = True).repeat(1, 4)
            cur_PC_f1_concat = torch.where( r_f1 > 20 , torch.zeros_like(cur_PC_f1_concat).cuda(), cur_PC_f1_concat ).to(torch.float32)
            PC_mask_valid1 = torch.any(cur_PC_f1_concat != 0, dim=-1).cuda().detach()  # H W
            cur_PC_f1_concat = cur_PC_f1_concat[PC_mask_valid1 > 0,:]



            #####   generate  the  valid  mask (remove the not valid points)
            mask_valid_f1 = torch.any(cur_PC_f1_concat != 0, dim=-1, keepdim=True).cuda().detach()  # N 1
            mask_valid_f1 = mask_valid_f1.to(torch.float32)

           

            cur_PC_f1_concat = cur_PC_f1_concat[:, :3]
            cur_PC_f1_mask = cur_PC_f1_concat * mask_valid_f1 # N 3

            PC_f1_aft_aug.append(cur_PC_f1_mask)### list




        return PC_f1_aft_aug


def ProjectPCimg2SphericalRing(PC, Feature = None, H_input = 64, W_input = 1800):

    
    batch_size = len(PC)

    if Feature != None:
        num_channel = Feature[0].shape[-1]

    degree2radian = math.pi / 180
    nLines = H_input
    AzimuthResolution = 360.0 / W_input # degree
    VerticalViewDown = -35.0
    VerticalViewUp = 35.0

    # specifications of Velodyne-64
    AzimuthResolution = AzimuthResolution * degree2radian
    VerticalViewDown = VerticalViewDown * degree2radian
    VerticalViewUp = VerticalViewUp * degree2radian
    VerticalResolution = (VerticalViewUp - VerticalViewDown) / (nLines - 1)
    VerticalPixelsOffset = -VerticalViewDown / VerticalResolution

    # parameters for spherical ring's bounds

    PI = math.pi

    for batch_idx in range(batch_size):

        ###  initialize current processed frame

        cur_PC = PC[batch_idx].to(torch.float32)  # N  3
        # print(cur_PC.shape)
        if Feature != None:
            cur_Feature = Feature[batch_idx]  # N  c

        x = cur_PC[:, 0]
        y = cur_PC[:, 1]
        z = cur_PC[:, 2]

        r = torch.norm(cur_PC, p=2, dim =1)

        PC_project_current = torch.zeros([H_input, W_input, 3]).cuda().detach()  # shape H W 3
        if Feature != None:
            Feature_project_current = torch.zeros([H_input, W_input, num_channel]).cuda().detach()

        

        ####  get iCol & iRow

        iCol = ((PI -torch.atan2(y, x))/ AzimuthResolution) # alpha
        iCol = iCol.to(torch.int32)

        beta = torch.asin(z/r)                              # beta

        tmp_int = (beta / VerticalResolution + VerticalPixelsOffset)
        tmp_int = tmp_int.to(torch.int32)

        iRow = H_input - tmp_int

        iRow = torch.clamp(iRow, 0, H_input - 1)
        iCol = torch.clamp(iCol, 0, W_input - 1)

        iRow = iRow.to(torch.long)  # N 1
        iCol = iCol.to(torch.long)  # N 1


        PC_project_current[iRow, iCol, :] = cur_PC[:, :]  # H W 3
        if Feature != None:
            Feature_project_current[iRow, iCol, :] = cur_Feature[:, :]


        # Generate mask

        PC_mask_valid = torch.any(PC_project_current != 0, dim=-1).cuda().detach()  # H W
        PC_mask_valid = torch.unsqueeze(PC_mask_valid, dim=2).to(torch.float32) # H W 1

        if Feature != None:
            Feature_mask_valid = ~torch.any(Feature_project_current!= 0, dim=-1).cuda().detach()  # H W
            Feature_mask_valid = torch.unsqueeze(Feature_mask_valid, dim=2).to(torch.float32)

        ####1 h w
        PC_project_current = torch.unsqueeze(PC_project_current, dim=0)
        PC_mask_valid = torch.unsqueeze(PC_mask_valid, dim=0)


        if Feature != None:
            Feature_project_current = torch.unsqueeze(Feature_project_current,dim=0)
            Feature_mask_valid = torch.unsqueeze(Feature_mask_valid, dim=0)
        ####b h w
        if batch_idx == 0:
            PC_project_final = PC_project_current
            PC_mask_final = PC_mask_valid
            if Feature != None:
                Feature_project_final = Feature_project_current
                Feature_mask_final = Feature_mask_valid

        else:
            PC_project_final = torch.cat([PC_project_final, PC_project_current], 0)  # b h w 3
            PC_mask_final = torch.cat([PC_mask_final, PC_mask_valid], 0)
            if Feature != None:
                Feature_project_final = torch.cat([Feature_project_final, Feature_project_current], 0)
                Feature_mask_final = torch.cat([Feature_mask_final, Feature_mask_valid], 0)


    if Feature != None:
        return PC_project_final,  Feature_project_final
    else:
        return PC_project_final,  PC_mask_final



def get_selected_idx(batch_size, out_H: int, out_W: int, stride_H: int, stride_W: int):
    """According to given stride and output size, return the corresponding selected points

    Args:
        array (tf.Tensor): [any array with shape (B, H, W, 3)]
        stride_H (int): [stride in height]
        stride_W (int): [stride in width]
        out_H (int): [height of output array]
        out_W (int): [width of output array]
    Returns:
        [tf.Tensor]: [shape (B, outh, outw, 3) indices]
    """
    select_h_idx = torch.arange(0, out_H * stride_H, stride_H)
    select_w_idx = torch.arange(0, out_W * stride_W, stride_W)
    height_indices = (torch.reshape(select_h_idx, (1, -1, 1))).expand(batch_size, out_H, out_W)         # b out_H out_W
    width_indices = (torch.reshape(select_w_idx, (1, 1, -1))).expand(batch_size, out_H, out_W)            # b out_H out_W
    padding_indices = torch.reshape(torch.arange(batch_size), (-1, 1, 1)).expand(batch_size, out_H, out_W)   # b out_H out_W

    return padding_indices, height_indices, width_indices    













points =torch.randn(100000,3).to('cuda')
points=30*points

pos1_list = []
pos1_list.append(points)
 #pos1_list.append(points)
 
 #with torch.no_grad():
     
 #    predictions,_,_ =  model(points)
t2=time.time()    
input_xyz_aug_f1= PreProcess(pos1_list)





input_xyz_aug_proj_f1, mask_xyz_f1 = ProjectPCimg2SphericalRing(input_xyz_aug_f1, None, H_input,  W_input)
 
 

l0_b_idx, l0_h_idx, l0_w_idx = get_selected_idx(batch_size, out_H_list[0],
    out_W_list[0],  stride_H_list[0],
    stride_W_list[0])
 

t1=time.time()
print(t1-t2)



input_points_f1 = torch.zeros_like(input_xyz_aug_proj_f1).cuda()
l0_xyz_proj_f1 = input_xyz_aug_proj_f1[l0_b_idx.cuda().long(),l0_h_idx.cuda().long(), l0_w_idx.cuda().long(), :]
 






 #new_points, new_xyz =  model(input_xyz_aug_proj_f1, input_points_f1, l0_xyz_proj_f1)
 #new_points =  model(input_xyz_aug_proj_f1, input_points_f1, l0_xyz_proj_f1)
     


print("测试成功!")
print(f"输入形状: input_xyz_aug_proj_f1={input_xyz_aug_proj_f1.shape}, input_points_f1={input_points_f1.shape},l0_xyz_proj_f1={l0_xyz_proj_f1.shape}")
 #print(f"输出形状: new_points={new_points.shape}")
 
"""
 
 输入形状: input_xyz_aug_proj_f1=torch.Size([1, 64, 1800, 3]), input_points_f1=torch.Size([1, 64, 1800, 3]),l0_xyz_proj_f1=torch.Size([1, 16, 225, 3])
 输出形状: new_points=torch.Size([1, 16])

 
"""
   

 #"""

input_xyz_aug_proj_f1=input_xyz_aug_proj_f1[:,:,600:1200,:]
mask_xyz_f1=mask_xyz_f1[:,:,600:1200,:]
im_numpy = input_xyz_aug_proj_f1.squeeze(0).cpu().numpy()
im_mask = mask_xyz_f1.squeeze(0).cpu().numpy()

 #"""

  
 
 





 # Here you would process the predictions and publish the segmented point cloud
 # For simplicity, we just publish the original point cloud
 # segmented_cloud_pub.publish(predictions)

0.0027472972869873047
测试成功!
输入形状: input_xyz_aug_proj_f1=torch.Size([1, 16, 1800, 3]), input_points_f1=torch.Size([1, 16, 1800, 3]),l0_xyz_proj_f1=torch.Size([1, 4, 225, 3])
