### Lab session: representing a digital surface model with a neural network

In [None]:
pipeline = """
[
    "LHD_FXX_0808_6485_PTS_C_LAMB93_IGN69.copc.laz",
    {
        "type": "filters.sort",
        "dimension": "Z"
    }
]
"""

import pdal
r = pdal.Pipeline(pipeline)
#r.validate()
r.execute()
arrays = r.arrays


In [None]:
import numpy as np

N = arrays[0].shape[0]
X = np.zeros(N,)
Y = np.zeros(N,)
Z = np.zeros(N,)
I = np.zeros(N,)

for n in range(N):
    pt = arrays[0][n]
    X[n] = pt[0]
    Y[n] = pt[1]
    Z[n] = pt[2]
    I[n] = pt[3]

print(len(X[::100]))

In [None]:
import matplotlib.pyplot as plt

fig = plt.figure(1)
ax = fig.add_subplot(projection='3d')
ax.scatter(X[::100], Y[::100], Z[::100], c=Z[::100], marker='o', s=1)

ax.set_xlabel('X')
ax.set_ylabel('Y')
ax.set_zlabel('Z')

plt.show()

In [None]:
selx = (X>808305)*(X<808538)
sely = (Y>6484516)*(Y<6484626)
sel = selx*sely

I[I>10000] = 10000 #np.max(I[I!=np.max(I)])

In [None]:
fig = plt.figure(2)
ax = fig.add_subplot(projection='3d')
ax.scatter(X[sel], Y[sel], Z[sel], c=I[sel], marker='o', s=1, cmap='gray')

ax.set_xlabel('X')
ax.set_ylabel('Y')
ax.set_zlabel('Z')
ax.set_aspect('equal')

plt.show()

In [None]:
Xsel = X[sel]
Ysel = Y[sel]
Zsel = Z[sel]
Isel = I[sel]
Nsel = Xsel.shape[0]

Nsmall = 50000
pts = np.rint(np.linspace(Nsel/(2*(Nsmall-1)),Nsel-Nsel/(2*(Nsmall-1)),num=Nsmall)+(np.random.rand(Nsmall)-.5)*Nsel/(Nsmall-1)).astype(np.int32)-1

Xsmall = Xsel[pts]
Ysmall = Ysel[pts]
Zsmall = Zsel[pts]
Ismall = Isel[pts]

In [None]:
Ismall[Ismall==np.max(Ismall)] = np.max(Ismall[Ismall!=np.max(Ismall)])

In [None]:
from mpl_toolkits.mplot3d import Axes3D

fig = plt.figure()
ax = fig.add_subplot(projection='3d')
# ax.scatter(Xsmall, Ysmall, Zsmall, c=Ismall.astype(np.float64), marker='o', s=1)
# ax.scatter(Xsmall, Ysmall, Zsmall, c=(Ismall.astype(np.float64)-np.min(Ismall))/(np.max(Ismall)-np.min(Ismall)), marker='o', s=1,cmap='gray')
ax.scatter(Xsmall, Ysmall, Zsmall, c=(Ismall.astype(np.float64))/np.max(Ismall), marker='o', s=1,cmap='gray')

ax.set_xlabel('X')
ax.set_ylabel('Y')
ax.set_zlabel('Z')
ax.set_aspect('equal')

plt.show()

In [None]:
import torch
from torch.utils.data import TensorDataset, random_split, DataLoader

# F(x, y) = z

# STACKING INPUTS IN (N,2) MATRIX
inputs = np.stack([Xsmall, Ysmall], axis=1)

# SHAPING OUTPUT IN (N, 1)
targets = Zsmall.reshape(-1, 1)
# CONVERTING NUMPY ARRAYS TO TENSOR
i_tensor = torch.tensor(inputs, dtype=torch.float32)
o_tensor = torch.tensor(targets, dtype=torch.float32)

# COMPLETE DATASET
dataset = TensorDataset(i_tensor, o_tensor)

# SPLITTING IN DIFFERENT SIZES
N = len(dataset)
n_train = int(0.7*N)
n_val = int(0.15*N)
n_test = int(0.15*N)

# RANDOM ATTRIBUTION
train_set, val_set, test_set = random_split(dataset, [n_train, n_val, n_test], generator=torch.Generator().manual_seed(42))

print(f"Train : {len(train_set)}, Val : {len(val_set)}, Test : {len(test_set)}")

In [None]:
from torch import nn

device = torch.accelerator.current_accelerator().type if torch.accelerator.is_available() else "cpu" 

class NeuralNetwork(nn.Module) :
    def __init__(self, input_dim, output_dim, width, depth = 3, activation = 'relu') :
        super().__init__()

        self.activation = activation
        self.factivation = nn.ReLU() if (isinstance(activation, str) and activation == 'relu') else nn.Sigmoid()

        layers = []

        # INPUT LAYER
        layers.append(nn.Linear(input_dim, width))
        layers.append(self.factivation)

        # HIDDEN LAYERS
        for _ in range(depth - 1) :
            layers.append(nn.Linear(width, width))
            layers.append(self.factivation)

        # OUTPUT LAYER
        layers.append(nn.Linear(width, output_dim))
        
        self.net = nn.Sequential(*layers)

    def forward(self, x) :
        return self.net(x)

class CustomOptimizer():
    def __init__(self, params, lr) :
        self.params = list(params)
        super().__init__()

    def sgd(self) :
        for p in self.params :
            if p.grad is not None :
                p = torch.sub(p, self.lr * p.grad)
                # p.data -= learning_rate * p.grad.data
                p.grad.data.zero_()

    def zeroing_grad(self) :
        for p in self.params :
            if p.grad is not None :
                p.grad.data.zero_()

In [None]:
# HYPER PARAMETERS
learning_rate = 1e-3
batch_size = 64
epochs = 5

# LOSS FUNCTION
loss_fn = nn.MSELoss()

# MODEL INITIALIZATION
model = NeuralNetwork(2, 1, 64, 3, 'relu').to(device)
print(model)

# OUR CUSTOM OPTIMIZER FOR SGD
coptimizer = CustomOptimizer(model.parameters(), learning_rate)

Load datas

In [None]:
# DATALOADERS
train_loader = DataLoader(train_set, batch_size=64, shuffle=True)
val_loader = DataLoader(val_set, batch_size=64, shuffle=True)
test_loader = DataLoader(test_set, batch_size=64, shuffle=True)

In [None]:
def train(model, loss_fn, optimizer) :
    model.train()
    total_loss = 0

    # TRAINING LOOP
    for epoch in range(epochs) :
        for xi, yi in train_loader :
            pred = model(xi)
            loss = loss_fn(pred, yi)

            coptimizer.zeroing_grad()
            loss.backward()
            coptimizer.sgd()

            total_loss += loss.item() * len(xi)

        train_loss = total_loss / len(train_set)

        # VALIDATING LOOP
        model.eval()
        total_val_loss = 0

        with torch.no_grad() :
            for xi, yi in val_loader :
                pred = model(xi)
                loss = loss_fn(pred, yi)
                total_val_loss += loss.item() * len(xi)
        val_loss = total_val_loss / len(val_set)

        print(f"Epoch {epoch+1:02d}/{epochs} | Train Loss: {train_loss:.6f} | Val Loss: {val_loss:.6f}")
    print("\n---Done training---")

def test(model, loss_fn) :
    model.eval()
    total_test_loss, correct = 0

    # don't trace gradient here to avoid duplicates and gain precious computational ressources.
    # No need to compute sgd here, we only want to evaluate the predictions
    with torch.no_grad() :
        for xi, yi in test_loader :
            pred = model(xi)
            loss = loss_fn(pred, yi)
            total_test_loss += loss.item() * len(xi)
            correct += (pred.argmax(1) == yi).type(torch.float).sum().item()

    test_loss = total_test_loss / len(test_set)
    correct /= len(test_set)

    print(f"Test Loss: {test_loss:.6f}")
    print(f"Correct: {correct}")


In [None]:
train(model, loss_fn, coptimizer)
test(model, loss_fn)