In [None]:
# Author : leekyoj@kaist.ac.kr

# --- 1) Download data ---
# Fetch the shared Google Drive folder with the gdown CLI (gdown must be available in the runtime).
!gdown --folder https://drive.google.com/drive/folders/1G_Nc1YgwjXEAV92XX2onVTOQmgWDrnZd?usp=sharing
# Extract the downloaded archive into ./data_96, the directory the data-loading cell reads from.
!unzip processed_data/data_96.zip -d ./data_96

In [None]:
import os
import sys
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import TensorDataset, DataLoader, RandomSampler, SequentialSampler
from tqdm import tqdm
import matplotlib.pyplot as plt
import time
import torch.optim as optim

In [None]:
# --- 2) Data Loading ---

# Working directories used throughout the notebook:
#   SAVE_DIR    - where the extracted .npy splits are read from
#   TRAINED_DIR - where model checkpoints are written
#   OUTPUT_DIR  - where result plots are written
SAVE_DIR = './data_96'
TRAINED_DIR = './trained_models'
OUTPUT_DIR = './output'

# Create each directory up front; an already existing directory is left untouched.
for directory in (SAVE_DIR, TRAINED_DIR, OUTPUT_DIR):
    os.makedirs(directory, exist_ok=True)
    print(f"Created or exists: {directory}")

def load_dataset(prefix, data_dir=None):
    """Load one data split from ``.npy`` files and return it as float tensors.

    Reads ``{prefix}_x1.npy``, ``{prefix}_x2.npy`` and ``{prefix}_y.npy``.
    The two image arrays are permuted from axis order (0, 1, 2, 3) to
    (0, 3, 1, 2), i.e. the last axis is moved to position 1, which is the
    channel-first layout the convolutional models in this notebook consume.

    Args:
        prefix: Split name used as the file-name prefix (e.g. ``'train'``).
        data_dir: Directory containing the files. Defaults to the
            notebook-level ``SAVE_DIR`` when omitted, so existing
            ``load_dataset('train')`` calls behave as before.

    Returns:
        tuple: ``(x1, x2, y)`` as ``torch.float32`` tensors.

    Raises:
        FileNotFoundError: If one of the three files is missing.
    """
    root = SAVE_DIR if data_dir is None else data_dir

    def read_array(suffix):
        # Open memory-mapped, then copy into an ordinary writable in-memory
        # array so the tensor built from it does not hold on to the file.
        path = os.path.join(root, f"{prefix}_{suffix}.npy")
        return np.array(np.load(path, mmap_mode='r'), copy=True)

    x1 = torch.from_numpy(read_array('x1')).permute(0, 3, 1, 2).float()
    x2 = torch.from_numpy(read_array('x2')).permute(0, 3, 1, 2).float()
    y = torch.from_numpy(read_array('y')).float()
    print(f"Loaded {prefix}: x1 {x1.shape}, x2 {x2.shape}, y {y.shape}")
    return x1, x2, y

# Load the three splits; each call returns (first image tensor, second image tensor, labels).
x_train1, x_train2, train_y = load_dataset('train')
x_val1, x_val2, val_y = load_dataset('val')
x_test1, x_test2, test_y = load_dataset('test')

# Training batches are drawn in random order; validation and test batches keep
# the stored order so predictions line up with val_y / test_y.
train_loader = DataLoader(
    TensorDataset(x_train1, x_train2, train_y),
    batch_size=1024,
    shuffle=True,
    pin_memory=True,
)
val_loader = DataLoader(
    TensorDataset(x_val1, x_val2, val_y),
    batch_size=1024,
    shuffle=False,
    pin_memory=True,
)
test_loader = DataLoader(
    TensorDataset(x_test1, x_test2, test_y),
    batch_size=1024,
    shuffle=False,
    pin_memory=True,
)

In [None]:
# --- 3) Check image and drag coefficient ---
import matplotlib.pyplot as plt

def show_image_with_label(x, y, idx, title):
    """Display sample ``idx`` of an image tensor with its label in the title.

    Args:
        x: Image tensor indexed by sample; ``x[idx]`` is permuted with
            ``(1, 2, 0)`` so its first axis becomes the last before plotting.
        y: Label tensor; ``y[idx]`` must hold a single value.
        idx: Index of the sample to show.
        title: Text placed before the sample index in the figure title.
    """
    hwc_image = x[idx].permute(1, 2, 0).numpy()   # (H, W, C)

    fig, ax = plt.subplots(figsize=(4, 4))
    ax.imshow(hwc_image)
    ax.axis('off')
    ax.set_title(f"{title} #{idx}\nLabel: {y[idx].item():.4f}")
    plt.show()

# Show the same sample index from every split: first the x1 image, then the x2
# image, both titled with the label taken from that split.
idx = 15
preview_plan = (
    ("▶️ Train set:", x_train1, x_train2, train_y, "Train Depth", "Train Normal"),
    ("\n▶️ Validation set:", x_val1, x_val2, val_y, "Val Depth", "Val Normal"),
    ("\n▶️ Test set:", x_test1, x_test2, test_y, "Test Depth", "Test Normal"),
)
for banner, first_images, second_images, labels, first_title, second_title in preview_plan:
    print(banner)
    show_image_with_label(first_images, labels, idx, first_title)
    show_image_with_label(second_images, labels, idx, second_title)

In [None]:
# --- 4) Define CNN model ---
class FusionNet_CNN(nn.Module):
    """Two-input CNN regressor with a single, shared convolutional trunk.

    Both inputs go through the same ``feature_extractor`` (so its weights are
    shared), each yielding a 512-dimensional vector after global average
    pooling. The two vectors are concatenated (1024 features) and mapped to a
    single output value per sample by the fully connected ``regressor``.
    """

    def __init__(self):
        super().__init__()
        self.dropout_rate = 0.5

        def conv_bn_relu(in_ch, out_ch, kernel_size=3, stride=1, padding=1):
            """Return the [Conv2d, BatchNorm2d, ReLU] triple used throughout the trunk."""
            return [
                nn.Conv2d(in_ch, out_ch, kernel_size=kernel_size, stride=stride, padding=padding),
                nn.BatchNorm2d(out_ch),
                nn.ReLU(inplace=True),
            ]

        # Stem: 7x7 stride-2 convolution followed by a 3x3 stride-2 max pool.
        trunk = conv_bn_relu(3, 64, kernel_size=7, stride=2, padding=3)
        trunk.append(nn.MaxPool2d(kernel_size=3, stride=2, padding=1))

        # Three stages of 3x3 convolutions, each closed by a 2x2 max pool and
        # channel-wise dropout.
        pooled_stages = (
            ((64, 128), (128, 128)),
            ((128, 256), (256, 256), (256, 256)),
            ((256, 512), (512, 512), (512, 512)),
        )
        for stage in pooled_stages:
            for in_ch, out_ch in stage:
                trunk.extend(conv_bn_relu(in_ch, out_ch))
            trunk.append(nn.MaxPool2d(kernel_size=2, stride=2))
            trunk.append(nn.Dropout2d(self.dropout_rate))

        # Last stage keeps its resolution and is reduced by global average pooling.
        for _ in range(3):
            trunk.extend(conv_bn_relu(512, 512))
        trunk.append(nn.AdaptiveAvgPool2d(1))
        trunk.append(nn.Flatten())

        self.feature_extractor = nn.Sequential(*trunk)

        # Fully connected head: (in_features, out_features, dropout probability).
        head = []
        for in_f, out_f, drop_p in ((512 * 2, 1024, 0.5), (1024, 512, 0.4), (512, 256, 0.3)):
            head.extend([
                nn.Linear(in_f, out_f),
                nn.BatchNorm1d(out_f),
                nn.ReLU(inplace=True),
                nn.Dropout(drop_p),
            ])
        head.append(nn.Linear(256, 1))

        self.regressor = nn.Sequential(*head)

    def forward(self, x1, x2):
        """Return one regression value per sample, shape ``(batch, 1)``."""
        # The same trunk encodes both inputs; features are joined along dim 1.
        features = [self.feature_extractor(x1), self.feature_extractor(x2)]
        return self.regressor(torch.cat(features, dim=1))


In [None]:
# --- 5) Setup train hyperparameter ---

# Use the GPU when one is available, otherwise fall back to the CPU.
use_cuda = torch.cuda.is_available()
device = torch.device("cuda" if use_cuda else "cpu")
print("GPU device name:", torch.cuda.get_device_name(0) if use_cuda else "CPU")
model = FusionNet_CNN().to(device)

# Layer-by-layer summary for two 3x96x96 inputs.
from torchsummary import summary
summary(model, input_size=[(3, 96, 96), (3, 96, 96)])

lr = 5e-3
epochs = 10
batch_size = 1024  # informational: the DataLoaders above hard-code their own batch size

# Objective and optimizer picked up by train_one_epoch / evaluate.
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=lr)


In [None]:
# --- 6) Train and evaluation function ---
def train_one_epoch(loader, net=None, opt=None, loss_fn=None, dev=None):
    """Run one optimisation pass over ``loader`` and return the mean training loss.

    Args:
        loader: Sized iterable of ``(x1, x2, y)`` tensor batches
            (``len(loader)`` is used for the progress line).
        net: Two-input model to train. Defaults to the notebook-level ``model``.
        opt: Optimizer stepping ``net``'s parameters. Defaults to the
            notebook-level ``optimizer``.
        loss_fn: Loss callable ``loss_fn(prediction, target)``; assumed to
            return the mean over the batch (true for the ``nn.MSELoss()`` used
            in this notebook). Defaults to the notebook-level ``criterion``.
        dev: Device the batches are moved to. Defaults to the notebook-level
            ``device``.

    Returns:
        float: Loss averaged over samples. Each batch loss is weighted by its
        batch size so a shorter final batch is not over-represented.

    Raises:
        ValueError: If ``loader`` yields no samples.
    """
    # Fall back to the notebook globals only for arguments that were not given,
    # so plain ``train_one_epoch(train_loader)`` calls keep working.
    net = model if net is None else net
    opt = optimizer if opt is None else opt
    loss_fn = criterion if loss_fn is None else loss_fn
    dev = device if dev is None else dev

    net.train()
    n_batches = len(loader)
    loss_sum = 0.0
    n_samples = 0
    start_time = time.time()
    for i, (x1, x2, y) in enumerate(loader, 1):
        x1, x2, y = x1.to(dev), x2.to(dev), y.to(dev)

        opt.zero_grad()
        out = net(x1, x2)
        # Flatten both sides: a bare ``squeeze()`` would let (N,) predictions
        # broadcast against (N, 1) targets into an (N, N) loss, and collapses a
        # batch of one to a 0-d tensor.
        loss = loss_fn(out.reshape(-1), y.reshape(-1))
        loss.backward()
        opt.step()

        batch_loss = loss.item()
        loss_sum += batch_loss * y.shape[0]
        n_samples += y.shape[0]
        if i % 5 == 0:
            print(f'  Batch {i}/{n_batches}  loss={batch_loss:.4f}', end='\r')

    elapsed = time.time() - start_time
    print(f' Epoch train time: {elapsed:.2f}s')
    if n_samples == 0:
        raise ValueError('train_one_epoch received a loader with no samples')
    return loss_sum / n_samples


def evaluate(loader, net=None, loss_fn=None, dev=None):
    """Evaluate a model on ``loader`` without gradient tracking.

    Args:
        loader: Iterable of ``(x1, x2, y)`` tensor batches.
        net: Two-input model to evaluate. Defaults to the notebook-level ``model``.
        loss_fn: Loss callable ``loss_fn(prediction, target)``; assumed to
            return the mean over the batch (true for the ``nn.MSELoss()`` used
            in this notebook). Defaults to the notebook-level ``criterion``.
        dev: Device the batches are moved to. Defaults to the notebook-level
            ``device``.

    Returns:
        tuple: ``(mean_loss, predictions)``. ``mean_loss`` is averaged over
        samples (each batch loss weighted by its batch size); ``predictions``
        is the NumPy concatenation of the raw model outputs in loader order.

    Raises:
        ValueError: If ``loader`` yields no samples.
    """
    # Fall back to the notebook globals only for arguments that were not given,
    # so plain ``evaluate(val_loader)`` calls keep working.
    net = model if net is None else net
    loss_fn = criterion if loss_fn is None else loss_fn
    dev = device if dev is None else dev

    net.eval()
    loss_sum = 0.0
    n_samples = 0
    preds = []
    with torch.no_grad():
        for x1, x2, y in loader:
            x1, x2, y = x1.to(dev), x2.to(dev), y.to(dev)

            out = net(x1, x2)
            # Flatten both sides: a bare ``squeeze()`` would let (N,)
            # predictions broadcast against (N, 1) targets into an (N, N)
            # loss, and collapses a batch of one to a 0-d tensor.
            loss = loss_fn(out.reshape(-1), y.reshape(-1))

            loss_sum += loss.item() * y.shape[0]
            n_samples += y.shape[0]
            preds.append(out.cpu().numpy())

    if n_samples == 0:
        raise ValueError('evaluate received a loader with no samples')
    return loss_sum / n_samples, np.concatenate(preds)


In [None]:
# --- Training loop ---
# Train for `epochs` passes and checkpoint the weights whenever the validation
# loss reaches a new minimum.
best_val = float('inf')
save_path = os.path.join(TRAINED_DIR, 'pure_cnn.pt')
print('\n--- Starting Training ---')
for epoch in range(1, epochs + 1):
    print(f'Epoch {epoch}/{epochs}')
    tr_loss = train_one_epoch(train_loader)
    val_loss, _ = evaluate(val_loader)
    print(f' - train={tr_loss:.4f}, val={val_loss:.4f}')

    # Nothing to save unless this epoch strictly improved on the best so far.
    if not (val_loss < best_val):
        continue

    best_val = val_loss
    state_dict = model.state_dict()
    torch.save(state_dict, save_path)
    print(f"  Improved, model saved to {save_path}")

print('--- Training Finished ---')

In [None]:
print('\n--- Evaluating Test Set ---')
# Restore the best checkpoint written by the training loop. map_location lets a
# checkpoint saved from one device load onto whatever `device` is now.
if os.path.exists(save_path):
    state = torch.load(save_path, map_location=device)
    model.load_state_dict(state)
else:
    # No checkpoint on disk: the weights currently held by `model` are evaluated.
    print('Warning: no checkpoint')

loss_test, preds = evaluate(test_loader)
# Flatten targets and predictions to 1-D so the element-wise difference below
# cannot broadcast (N,) against (N, 1) into an (N, N) matrix.
costs = test_y.cpu().numpy().reshape(-1)
predv = preds.reshape(-1)
# NOTE: the value reported as "R2" is the squared Pearson correlation between
# targets and predictions, not 1 - SS_res / SS_tot.
r2 = float('nan')
try:
    r2 = np.corrcoef(costs, predv)[0,1]**2
except Exception as err:
    # Best effort: keep NaN, but say why instead of failing silently.
    print(f'Warning: could not compute R2 ({err})')
mse = float(((predv-costs)**2).mean())
print(f'Test MSE={mse:.4f}, R2={r2:.4f}')

# Scatter plot of ground truth vs. prediction on the test set; the dashed
# diagonal marks a perfect prediction.
plot = os.path.join(OUTPUT_DIR, f'improved_pure_cnn_R2_{r2:.4f}_MSE_{mse:.4f}.png')
plt.figure(figsize=(5,5))
plt.scatter(costs, predv, s=10)
mn, mx = costs.min(), costs.max()
plt.plot([mn, mx], [mn, mx], '--k')
plt.gca().set_aspect('equal')
plt.title('Ground Truth vs Prediction')
plt.xlabel('GT')
plt.ylabel('Pred')
plt.savefig(plot)
plt.show()
print(f'Plot saved to {plot}')
print('--- Done ---')


In [None]:
from torchvision.models import resnet18
from torchvision.models.resnet import ResNet18_Weights

class FusionNet_resnet(nn.Module):
    """Two-stream regressor built on two independent ResNet-18 backbones.

    Each input is encoded by its own ResNet-18 whose classification layer is
    replaced by ``nn.Identity``; the two feature vectors are concatenated and
    mapped to a single output value per sample by a small fully connected head.

    The module is constructed on the default device. Move it with
    ``.to(device)`` after construction, as the notebook does, instead of
    relying on a global ``device`` inside ``__init__``.

    Args:
        pretrained: When true, both backbones start from the
            ``ResNet18_Weights.IMAGENET1K_V1`` weights; otherwise they are
            randomly initialised.
    """

    def __init__(self, pretrained=True):
        super().__init__()
        weights = ResNet18_Weights.IMAGENET1K_V1 if pretrained else None
        # NOTE(review): if the ImageNet weights expect a particular input
        # normalisation, confirm the tensors fed here are scaled accordingly.

        # One backbone per input stream; the weights are not shared.
        self.backbone1, num_ftrs1 = self._make_backbone(weights)
        self.backbone2, num_ftrs2 = self._make_backbone(weights)

        # Regression head
        self.regressor = nn.Sequential(
            nn.Linear(num_ftrs1 + num_ftrs2, 256),
            nn.ReLU(inplace=True),
            nn.Dropout(0.5),
            nn.Linear(256, 1)
        )

    @staticmethod
    def _make_backbone(weights):
        """Return ``(backbone, feature_dim)`` for a ResNet-18 with its ``fc`` layer removed."""
        backbone = resnet18(weights=weights)
        feature_dim = backbone.fc.in_features
        backbone.fc = nn.Identity()
        return backbone, feature_dim

    def forward(self, x1, x2):
        """Return one regression value per sample, shape ``(batch, 1)``."""
        f1 = self.backbone1(x1)
        f2 = self.backbone2(x2)
        combined = torch.cat([f1, f2], dim=1)
        return self.regressor(combined)

In [None]:
# --- ResNet-18 variant: setup and training ---

# Use the GPU when one is available, otherwise fall back to the CPU.
use_cuda = torch.cuda.is_available()
device = torch.device("cuda" if use_cuda else "cpu")
print("GPU device name:", torch.cuda.get_device_name(0) if use_cuda else "CPU")
model = FusionNet_resnet().to(device)

# Layer-by-layer summary for two 3x96x96 inputs.
from torchsummary import summary
summary(model, input_size=[(3, 96, 96), (3, 96, 96)])

lr = 5e-4
epochs = 10
batch_size = 1024  # informational: the DataLoaders hard-code their own batch size

# Rebinding `criterion` / `optimizer` makes train_one_epoch and evaluate
# operate on the ResNet model from here on.
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=lr)

# --- Training loop ---
# Train for `epochs` passes and checkpoint the weights whenever the validation
# loss reaches a new minimum.
best_val = float('inf')
save_path = os.path.join(TRAINED_DIR, 'resnet18.pt')
print('\n--- Starting Training ---')
for epoch in range(1, epochs + 1):
    print(f'Epoch {epoch}/{epochs}')
    tr_loss = train_one_epoch(train_loader)
    val_loss, _ = evaluate(val_loader)
    print(f' - train={tr_loss:.4f}, val={val_loss:.4f}')

    # Nothing to save unless this epoch strictly improved on the best so far.
    if not (val_loss < best_val):
        continue

    best_val = val_loss
    state_dict = model.state_dict()
    torch.save(state_dict, save_path)
    print(f"  Improved, model saved to {save_path}")

print('--- Training Finished ---')

In [None]:
print('\n--- Evaluating Test Set ---')
# Restore the best checkpoint written by the training loop. map_location lets a
# checkpoint saved from one device load onto whatever `device` is now.
if os.path.exists(save_path):
    state = torch.load(save_path, map_location=device)
    model.load_state_dict(state)
else:
    # No checkpoint on disk: the weights currently held by `model` are evaluated.
    print('Warning: no checkpoint')

loss_test, preds = evaluate(test_loader)
# Flatten targets and predictions to 1-D so the element-wise difference below
# cannot broadcast (N,) against (N, 1) into an (N, N) matrix.
costs = test_y.cpu().numpy().reshape(-1)
predv = preds.reshape(-1)
# NOTE: the value reported as "R2" is the squared Pearson correlation between
# targets and predictions, not 1 - SS_res / SS_tot.
r2 = float('nan')
try:
    r2 = np.corrcoef(costs, predv)[0,1]**2
except Exception as err:
    # Best effort: keep NaN, but say why instead of failing silently.
    print(f'Warning: could not compute R2 ({err})')
mse = float(((predv-costs)**2).mean())
print(f'Test MSE={mse:.4f}, R2={r2:.4f}')

# Scatter plot of ground truth vs. prediction on the test set; the dashed
# diagonal marks a perfect prediction.
plot = os.path.join(OUTPUT_DIR, f'resnet18_R2_{r2:.4f}_MSE_{mse:.4f}.png')
plt.figure(figsize=(5,5))
plt.scatter(costs, predv, s=10)
mn, mx = costs.min(), costs.max()
plt.plot([mn, mx], [mn, mx], '--k')
plt.gca().set_aspect('equal')
plt.title('Ground Truth vs Prediction')
plt.xlabel('GT')
plt.ylabel('Pred')
plt.savefig(plot)
plt.show()
print(f'Plot saved to {plot}')
print('--- Done ---')


In [None]:
# Inference with the pretrained ResNet model (see the linked Colab notebook):
# https://colab.research.google.com/drive/1jWA2GOFCZIOH5SPooDTwAiig_-MYnWA9?usp=sharing