# Point cloud extraction pipeline

This pipeline takes a frame and segmentation mask and derives a point cloud using depth detection model

### Imports

In [1]:
import cv2
import torch
import numpy as np
import open3d as o3d
from pathlib import Path
from accelerate.test_utils.testing import get_backend
from depth_anything_v2.metric_depth.depth_anything_v2.dpt import DepthAnythingV2
from probreg import cpd
import copy
from ultralytics import YOLO

model_path = Path()/'best_1105.pt'
modelY = YOLO(model_path)

object_path = Path()/'object_withcam.ply'
fin_path = Path()

H, W = 4.37, 7.78 #image sensor sizes in mm
focal = 25.0 

Jupyter environment detected. Enabling Open3D WebVisualizer.
[Open3D INFO] WebRTC GUI backend enabled.
[Open3D INFO] WebRTCWindowSystem: HTTP handshake server disabled.


xFormers not available
xFormers not available


### Camera matrix

In [2]:
def get_xforms(image):
    h, w, _ = image.shape
    fx, fy = focal * w/W, focal * h/H

    xform = np.array([[fx, 0, 0],[0, fy, 0],[0, 0, 1]], dtype=np.float32)
    inv_xform = np.linalg.inv(xform)
    return xform, inv_xform

### Mask creating

In [3]:
def process_image(image):
    h_im, w_im = image.shape[:2]
    results = modelY(image)[0]
    
    # getting orig image and segment
    image = results.orig_img
    mask = results.masks.data.cpu().numpy()[0]
    
    mask_resized = cv2.resize(mask, (w_im, h_im)) 
        
    x, y, w, h = cv2.boundingRect(mask_resized.astype(np.uint8) )
    bbox = x, y, w, h
    
    return bbox, image, w_im, h_im, mask_resized

### Depth detection

In [4]:
model_configs = {
    'vits': {'encoder': 'vits', 'features': 64, 'out_channels': [48, 96, 192, 384]},
    'vitb': {'encoder': 'vitb', 'features': 128, 'out_channels': [96, 192, 384, 768]},
    'vitl': {'encoder': 'vitl', 'features': 256, 'out_channels': [256, 512, 1024, 1024]}
}

encoder = 'vitb' # or 'vitl', 'vits'
dataset = 'hypersim' # 'hypersim' for indoor model, 'vkitti' for outdoor model
max_depth = 1.5 # 20 for indoor model, 80 for outdoor model

device, _, _ = get_backend()

modelD = DepthAnythingV2(**{**model_configs[encoder], 'max_depth': max_depth})
modelD.load_state_dict(torch.load(f'checkpoints/depth_anything_v2_metric_{dataset}_{encoder}.pth', map_location='cpu'))
modelD.to(device).eval()

DepthAnythingV2(
  (pretrained): DinoVisionTransformer(
    (patch_embed): PatchEmbed(
      (proj): Conv2d(3, 768, kernel_size=(14, 14), stride=(14, 14))
      (norm): Identity()
    )
    (blocks): ModuleList(
      (0-11): 12 x NestedTensorBlock(
        (norm1): LayerNorm((768,), eps=1e-06, elementwise_affine=True)
        (attn): MemEffAttention(
          (qkv): Linear(in_features=768, out_features=2304, bias=True)
          (attn_drop): Dropout(p=0.0, inplace=False)
          (proj): Linear(in_features=768, out_features=768, bias=True)
          (proj_drop): Dropout(p=0.0, inplace=False)
        )
        (ls1): LayerScale()
        (drop_path1): Identity()
        (norm2): LayerNorm((768,), eps=1e-06, elementwise_affine=True)
        (mlp): Mlp(
          (fc1): Linear(in_features=768, out_features=3072, bias=True)
          (act): GELU(approximate='none')
          (fc2): Linear(in_features=3072, out_features=768, bias=True)
          (drop): Dropout(p=0.0, inplace=False)
    

### Point cloud extraction 

In [None]:
def extract_point_cloud(model, image, mask, bbox,  inv_xform):
        
    x_bbox, y_bbox, w_bbox, h_bbox = bbox
    depth = model.infer_image(image)
    vertices = []

    y_ind = np.arange(y_bbox, y_bbox + h_bbox)
    x_ind = np.arange(x_bbox, x_bbox + w_bbox)
    
    j, i = np.meshgrid(x_ind, y_ind, indexing='ij')
    
    v = np.dot(inv_xform, np.vstack((j.ravel(), i.ravel(), np.ones_like(j.ravel()))))
    
    mask_filt = mask[i.ravel(), j.ravel()] > 0
    
    vertices = np.column_stack((v[0, mask_filt], v[1, mask_filt], depth[i.ravel(), j.ravel()][mask_filt]))
    pcd = o3d.geometry.PointCloud()
    pcd.points = o3d.utility.Vector3dVector(vertices)
    pcd.estimate_normals()
    pcd = pcd.normalize_normals()
    
    return pcd

### Non-rigid deformation

In [None]:
# load source and target point cloud
def NRD(object_path, pcd):
    
    mesh = o3d.io.read_point_cloud(object_path)

    source = mesh
    target = copy.deepcopy(pcd)
    
    # transform target point cloud
    th = np.deg2rad(180)
    source.transform(np.array([[np.cos(th), -np.sin(th), 0.0, 0.0],
                            [np.sin(th), np.cos(th), 0.0, 0.0],
                            [0.0, 0.0, 1.0, 0.0],
                            [0.0, 0.0, 0.0, 1.0]]))
    source = source.voxel_down_sample(voxel_size=0.002)
    target = target.voxel_down_sample(voxel_size=0.005)
    
    source_pt = np.asarray(source.points, dtype=np.float32)
    target_pt = np.asarray(target.points, dtype=np.float32)
    
    # compute acpd registration
    
    acpd = cpd.AffineCPD(source_pt)
    tf_param, _, _ = acpd.registration(target_pt)
    result_pt = tf_param.transform(source_pt)
    result = o3d.geometry.PointCloud()
    result.points = o3d.utility.Vector3dVector(result_pt)
   
    return copy.deepcopy(result)

### From point cloud to mesh

In [None]:
def mesh_create(result, image, xform ):
        
    point_cloud = copy.deepcopy(result)
    alpha = 0.2
    mesh = o3d.geometry.TriangleMesh.create_from_point_cloud_alpha_shape(point_cloud, alpha)
    
    vertices = np.asarray(mesh.vertices)
    projected_2d_points = vertices[:, :3]
    
    edges = np.asarray(mesh.triangles)
    lines = []
    for triangle in edges:
        for i in range(3):
            lines.append([triangle[i], triangle[(i + 1) % 3]])

    for line in lines:
        pt1 = tuple((np.dot(xform, projected_2d_points[line[0]])[:2]).astype(int))
        pt2 = tuple((np.dot(xform, projected_2d_points[line[1]])[:2]).astype(int)) 
        cv2.line(image, pt1, pt2, (0, 128, 0), thickness=1)
    
    return image
    

In [8]:
# creation images from video
video_path = Path()/'video.avi'


cap = cv2.VideoCapture(video_path)

all_img = []
while True:
    ret, frame = cap.read()
    if not ret:
        break

    else:
        all_img.append(frame)

cap.release()

#segmentation, deformation, creation images with masks
fin_img = []
for img in all_img:

    bbox, image, w_im, h_im, mask = process_image(img)
        
    xform, inv_xform = get_xforms(image)
        
    pcd = extract_point_cloud(modelD, image, mask, bbox,  inv_xform)
        
    result = NRD(object_path, pcd)
        
    image_new = mesh_create(result, image, xform )
        
    fin_img.append(image_new)
        


0: 384x640 1 uterus, 70.4ms
Speed: 7.0ms preprocess, 70.4ms inference, 267.3ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 uterus, 18.8ms
Speed: 4.0ms preprocess, 18.8ms inference, 3.5ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 uterus, 19.4ms
Speed: 2.0ms preprocess, 19.4ms inference, 2.6ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 uterus, 23.5ms
Speed: 3.0ms preprocess, 23.5ms inference, 2.9ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 uterus, 21.1ms
Speed: 3.6ms preprocess, 21.1ms inference, 2.9ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 uterus, 18.8ms
Speed: 2.1ms preprocess, 18.8ms inference, 2.9ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 uterus, 19.8ms
Speed: 2.6ms preprocess, 19.8ms inference, 4.6ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 uterus, 17.3ms
Speed: 2.4ms preprocess, 17.3ms inference, 2.8ms postprocess per image at shape (1, 3, 

### Video

In [9]:
image_folder = fin_path
video_name = 'output_video.avi'

height, width, _ = image_new.shape
video = cv2.VideoWriter(video_name, cv2.VideoWriter_fourcc(*'DIVX'), 30, (width, height))

# add frames
for image in fin_img:
    video.write(image)

video.release()
print(f"Видео сохранено как {video_name}")

Видео сохранено как output_video.avi
