In [73]:
#!pip install torch torchvision timm accelerate open3d scikit-learn chamferdist
# !pip install timm accelerate open3d scikit-learn chamferdist  opencv-python


In [77]:
from pathlib import Path
import random
import numpy as np
import cv2
import gc
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
import timm


DATA_ROOT = Path("/dataset/") 
SAVE_DIR  = Path("./runs_rgb2point_output")
SAVE_DIR.mkdir(exist_ok=True, parents=True)


FRAMES_PER_SCENE = 10        # how many images we keep per scene
POINTS_N = 512               # cloud size during training
BATCH = 5                    # batch size
EPOCHS = 10                 

device = torch.device("cuda" if torch.cuda.is_available()
                      else "mps" if torch.backends.mps.is_available()
                      else "cpu")
print("running on", device)

random.seed(42)
np.random.seed(42)
torch.manual_seed(42)


running on cuda


<torch._C.Generator at 0x7ee59121c170>

In [78]:
#all pid
all_pids = sorted(p.name for p in DATA_ROOT.iterdir() if p.is_dir())
random.shuffle(all_pids)

n = len(all_pids)
train_pids = all_pids[: int(0.8 * n)]
val_pids   = all_pids[int(0.8 * n) : int(0.9 * n)]
test_pids  = all_pids[int(0.9 * n) :]

print(len(train_pids), "train scenes")
print(len(val_pids),   "val scenes")
print(len(test_pids),  "test scenes")


90 train scenes
11 val scenes
12 test scenes


In [79]:
def read_pfm(path):
    with open(path, "rb") as f:
        header = f.readline().decode().strip()
        assert header in ("Pf", "PF")
        width, height = map(int, f.readline().split())
        scale = float(f.readline())
        data = np.fromfile(f, "<f" if scale < 0 else ">f")
        return data.reshape(height, width)

def cam_txt_to_mats(path):
    tokens = open(path).read().split()
    extr = np.array(tokens[1:17], float).reshape(4, 4)
    intr = np.array(tokens[18:27], float).reshape(3, 3)
    return intr, extr

def depth_to_xyz_sparse(depth, intr, extr, k):
    h, w = depth.shape
    ys = np.random.randint(0, h, k)
    xs = np.random.randint(0, w, k)

    z = depth[ys, xs]
    x = (xs - intr[0, 2]) * z / intr[0, 0]
    y = (ys - intr[1, 2]) * z / intr[1, 1]

    cam = np.stack([x, y, z, np.ones_like(z)], -1)  # (k, 4)
    world = (extr @ cam.T).T[:, :3]
    return world


The network is a **two‑stage Transformer**:

1. **ViT‑Base backbone** (16 × 16 patches, 12 heads, 12 layers) turns a
      224 × 224 RGB image into a single 768‑D *CLS* token that encodes global
      appearance.
      The ViT weights are frozen so the model can be trained on a laptop in minutes.

2. A lightweight **point‑cloud head**:
      \* a 1‑layer multi‑head self‑attention (4 heads, 1024 D) that lets the CLS
        token reason about a learnable “point template”;
      \* a 2‑layer MLP that maps the attended features to
        `N × 3` coordinates (we use **N = 256** or **512**).


 **RGB2Point** (Lee & Benes, *WACV 2025*) 





# # Data Loader for BlendedMVS Dataset
The BlendedMVS Dataset is a comprehensive multi-view stereo dataset designed for benchmarking dense 3D reconstruction algorithms. It provides high-quality, large-scale 3D models with multi-view images, camera parameters, and ground-truth depth ma

In [80]:
class BlendedSubset(Dataset):
    def __init__(self, scene_list, frames_per_scene, sample_n):
        self.frames = []
        for pid in scene_list:
            imgs = [j for j in (DATA_ROOT / pid / "blended_images").glob("*.jpg")
                    if "_masked" not in j.stem]
            imgs = imgs[: frames_per_scene]
            for j in imgs:
                self.frames.append((pid, j.stem))

        self.sample_n = sample_n
        self.tfm = transforms.Compose([
            transforms.ToPILImage(),
            transforms.Resize((224, 224)),
            transforms.ToTensor(),
            transforms.Normalize((0.485, 0.456, 0.406),
                                 (0.229, 0.224, 0.225))
        ])

    def __len__(self):
        return len(self.frames)

    def __getitem__(self, idx):
        pid, fid = self.frames[idx]
        base = DATA_ROOT / pid

        img_path = base / "blended_images" / f"{fid}.jpg"
        img = cv2.imread(str(img_path))[..., ::-1]

        depth_path = base / "rendered_depth_maps" / f"{fid}.pfm"
        depth = read_pfm(depth_path)

        intr, extr = cam_txt_to_mats(base / "cams" / f"{fid}_cam.txt")
        xyz = depth_to_xyz_sparse(depth, intr, extr, self.sample_n)

        img_t = self.tfm(img).unsqueeze(0)
        xyz_t = torch.from_numpy(xyz).float()
        return img_t, xyz_t


In [81]:
train_set = BlendedSubset(train_pids, FRAMES_PER_SCENE, POINTS_N)
val_set   = BlendedSubset(val_pids,   FRAMES_PER_SCENE, POINTS_N * 2)

train_loader = DataLoader(
    train_set,
    batch_size=BATCH,
    shuffle=True,
    num_workers=0,
    drop_last=True
)

val_loader = DataLoader(
    val_set,
    batch_size=1,
    shuffle=False,
    num_workers=0
)

print(len(train_set), "train frames")
print(len(val_set),   "val frames")


900 train frames
110 val frames


In [82]:
class PointCloudHead(nn.Module):
    def __init__(self, feat_dim, pc_size):
        super().__init__()
        self.attn = nn.MultiheadAttention(
            embed_dim=feat_dim,
            num_heads=4,
            batch_first=True
        )
        self.mlp = nn.Sequential(
            nn.Linear(feat_dim, 2048),
            nn.GELU(),
            nn.Linear(2048, pc_size * 3)
        )
        self.pc_size = pc_size

    def forward(self, x):
        x, _ = self.attn(x, x, x)
        x = self.mlp(x.flatten(start_dim=1))
        return x.view(-1, self.pc_size, 3)

class PointCloudNet(nn.Module):
    def __init__(self, pc_size):
        super().__init__()
        self.backbone = timm.create_model(
            "vit_base_patch16_224",
            pretrained=True,
            num_classes=0
        )
        for p in self.backbone.parameters():
            p.requires_grad = False

        self.reduce = nn.Linear(self.backbone.num_features, 1024)
        self.head = PointCloudHead(1024, pc_size)

    def forward(self, x):
        b, v, c, h, w = x.shape
        feats = self.backbone(x.view(b * v, c, h, w))
        feats = feats.view(b, v, -1).mean(dim=1)
        feats = self.reduce(feats).unsqueeze(1)
        return self.head(feats)


In [83]:
def chamfer_cdist(p, q):
    d = torch.cdist(p, q)                           
    return (d.min(2)[0].mean(1) + d.min(1)[0].mean(1)).mean()
    
def normalise(pc):
    centre = pc.mean(dim=1, keepdim=True)
    pc = pc - centre
    scale = pc.abs().max(dim=1, keepdim=True)[0]
    return pc / (scale + 1e-8)
    
def fscore(p, q, tau=0.01):
    d = torch.cdist(p, q)
    prec = (d.min(2)[0] < tau).float().mean(1)
    rec  = (d.min(1)[0] < tau).float().mean(1)
    f = 2 * prec * rec / (prec + rec + 1e-8)
    return f.mean().item()


In [84]:
model = PointCloudNet(pc_size=POINTS_N).to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=5e-4)

best_val_cd = float("inf")

for epoch in range(1, EPOCHS + 1):
    model.train()
    epoch_losses = []

    for img, gt in train_loader:
        img = img.to(device)
        gt = gt.to(device)

        pred = model(img)
     
        loss = chamfer_cdist(pred,gt)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        epoch_losses.append(loss.item())

    train_cd = np.mean(epoch_losses)
    print(f"Epoch {epoch}  |  train Chamfer {train_cd:.4f}")

    model.eval()
    cds = []
    fscores = []

    with torch.no_grad():
        for img, gt in val_loader:
            img = img.to(device)
            gt = gt.to(device)

            pred = model(img)
            cd_val = chamfer_cdist(pred, gt).item()
            cds.append(cd_val)

            fs_val = fscore(normalise(pred),normalise( gt))
            fscores.append(fs_val)

    val_cd = np.mean(cds)
    val_f = np.mean(fscores)
    print(f"val Chamfer {val_cd:.4f}   F‑score {val_f:.3f}")

    if val_cd < best_val_cd:
        best_val_cd = val_cd
        torch.save(model.state_dict(), SAVE_DIR / "best_rgb2point.pth")
        print("new best model saved")

    torch.cuda.empty_cache()
    gc.collect()

print("Training finished. Best validation Chamfer:", best_val_cd)


Epoch 1  |  train Chamfer 223.4820
val Chamfer 101.0465   F‑score 0.006
new best model saved
Epoch 2  |  train Chamfer 204.4432
val Chamfer 105.8543   F‑score 0.007
Epoch 3  |  train Chamfer 196.5042
val Chamfer 116.7266   F‑score 0.005
Epoch 4  |  train Chamfer 189.0235
val Chamfer 140.2498   F‑score 0.006
Epoch 5  |  train Chamfer 166.3376
val Chamfer 115.2909   F‑score 0.006
Epoch 6  |  train Chamfer 164.4245
val Chamfer 98.2706   F‑score 0.006
new best model saved
Epoch 7  |  train Chamfer 165.9970
val Chamfer 122.4682   F‑score 0.007
Epoch 8  |  train Chamfer 156.8859
val Chamfer 128.7129   F‑score 0.006
Epoch 9  |  train Chamfer 161.4080
val Chamfer 106.0725   F‑score 0.006
Epoch 10  |  train Chamfer 154.7619
val Chamfer 98.8186   F‑score 0.006
Training finished. Best validation Chamfer: 98.27057601755315


In [86]:
test_set = BlendedSubset(test_pids, FRAMES_PER_SCENE, POINTS_N * 2)
test_loader = DataLoader(test_set, batch_size=1, shuffle=False, num_workers=0)

model.load_state_dict(torch.load(SAVE_DIR / "best_rgb2point.pth"))
model.eval()

cds = []
fscores = []

with torch.no_grad():
    for img, gt in test_loader:
        img = img.to(device)
        gt = gt.to(device)

        pred = model(img)
        cds.append(chamfer_cdist(pred, gt).item())
        fscores.append(fscore(normalise(pred), normalise(gt)))

print("Test Chamfer distance:", np.mean(cds))
print("Test F‑score:", np.mean(fscores))


Test Chamfer distance: 55.20543807347615
Test F‑score: 0.002478624888074895
