In [34]:
import torch
import numpy as np
import torch.nn.functional as F
from pointcept.models.point_transformer_v3.point_transformer_v3m1_base import PointTransformerV3
from pointcept.datasets.transform import (
    GridSample, Compose, ToTensor, Collect, RandomRotateTargetAngle, TRANSFORMS
)
from pointcept.datasets.utils import collate_fn
from collections import OrderedDict
import pointcept.utils.comm as comm
from pointcept.models import build_model

In [35]:
def inference(points, weight_path):
    """
    Args:
        points: numpy array of shape (n,4) - x,y,z,intensity
        weight_path: path to model weights
    Returns:
        pred: predictions
        probs: confidence scores
    """
    # Create data dict 
    coord = points[:, :3]
    strength = points[:, -1].reshape([-1, 1])
    segment = np.zeros(points.shape[0]).astype(np.int32)
    data_dict = dict(coord=coord, strength=strength, segment=segment)

    # Define model config directly
    model_cfg = dict(
        type='DefaultSegmentorV2',
        num_classes=2,
        backbone_out_channels=64,
        backbone=dict(
            type='PT-v3m1',
            in_channels=4,
            order=['z', 'z-trans', 'hilbert', 'hilbert-trans'],
            stride=(2, 2, 2, 2),
            enc_depths=(2, 2, 2, 6, 2),
            enc_channels=(32, 64, 128, 256, 512),
            enc_num_head=(2, 4, 8, 16, 32),
            enc_patch_size=(64, 64, 64, 64, 64),
            dec_depths=(2, 2, 2, 2),
            dec_channels=(64, 64, 128, 256),
            dec_num_head=(4, 4, 8, 16),
            dec_patch_size=(64, 64, 64, 64),
            mlp_ratio=4,
            qkv_bias=True,
            qk_scale=None,
            attn_drop=0.0,
            proj_drop=0.0,
            drop_path=0.3,
            shuffle_orders=True,
            pre_norm=True,
            enable_rpe=False,
            enable_flash=False,
            upcast_attention=False,
            upcast_softmax=False,
            cls_mode=False,
            pdnorm_bn=False,
            pdnorm_ln=False,
            pdnorm_decouple=True,
            pdnorm_adaptive=False,
            pdnorm_affine=True,
            pdnorm_conditions=('nuScenes', 'SemanticKITTI', 'Waymo'))
    )

    # Initialize model and load weights
    model = build_model(model_cfg)
    if torch.cuda.is_available():
        model = model.cuda()
        checkpoint = torch.load(weight_path)
    else:
        checkpoint = torch.load(weight_path, map_location='cpu')
        
    # Load state dict
    weight = OrderedDict()
    for key, value in checkpoint["state_dict"].items():
        if key.startswith("module."):
            key = key[7:]
        weight[key] = value
    model.load_state_dict(weight, strict=True)
    model.eval()

    # Define and apply transforms
    transform = Compose([
        dict(type='Copy', keys_dict=dict(segment='origin_segment')),
        dict(
            type='GridSample',
            grid_size=0.025,
            hash_type='fnv',
            mode='train',
            keys=('coord', 'strength', 'segment'),
            return_inverse=True)
    ])
    data_dict = transform(data_dict)
    inverse = data_dict.pop("inverse")

    # Apply voxelize transform
    voxelize = TRANSFORMS.build(dict(
        type='GridSample',
        grid_size=0.05,
        hash_type='fnv',
        mode='test',
        return_grid_coord=True,
        keys=('coord', 'strength')))
    data_part_list = voxelize(data_dict)

    # Apply post transform
    post_transform = Compose([
        dict(type='ToTensor'),
        dict(
            type='Collect',
            keys=('coord', 'grid_coord', 'index'),
            feat_keys=('coord', 'strength'))
    ])
    
    fragment_list = []
    for data_part in data_part_list:
        fragment_list.append(post_transform(data_part))

    # Do inference
    pred = torch.zeros((points.shape[0], 2))  # num_classes = 2
    if torch.cuda.is_available():
        pred = pred.cuda()

    for fragment in fragment_list:
        idx_part = fragment["index"]
        
        input_dict = {}
        for key, value in fragment.items():
            if isinstance(value, torch.Tensor):
                input_dict[key] = value.cuda() if torch.cuda.is_available() else value
            else:
                input_dict[key] = value

        with torch.no_grad():
            pred_part = model(input_dict)["seg_logits"]
            pred_part = F.softmax(pred_part, -1)
            
            bs = 0
            for be in input_dict["offset"]:
                pred[idx_part[bs:be], :] += pred_part[bs:be]
                bs = be
        
        torch.cuda.empty_cache()

    # Get final predictions and probabilities 
    probs = pred.max(1)[0].cpu().numpy()
    pred = pred.max(1)[1].cpu().numpy()

    # Map back to original points
    pred = pred[inverse]
    probs = probs[inverse]

    return pred, probs



In [36]:
path='/home/yan/pointcept151/dataset/sequences/00/velodyne/000200.bin'
#read the point cloud data
def read_bin(path):
    pc = np.fromfile(path, dtype=np.float32).reshape(-1, 4)
    return pc
test_points = read_bin(path)

In [None]:


points = np.random.rand(1000, 4)
# Load pointcloud
# points = np.fromfile(points_in, dtype=np.float32).reshape(-1, 4)

weight_path="/home/yan/pointcept151/exp/nuscenes/train_highbay_02/model/model_best.pth"
# Run inference
pred, probs = inference(test_points, weight_path)


In [39]:
np.unique(pred,return_counts=True)
np.save('pred.npy',pred)