In [10]:
import torch
from torch import nn
from torch.autograd import Variable

import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import os
import open3d as o3d
from pyntcloud import PyntCloud
import glob
import plyfile as ply
from tqdm import tqdm
import pandas as pd

from torchvision import datasets, transforms
from torch.utils.data import DataLoader, Dataset
from torchvision.utils import save_image

In [11]:

class ThreeD_conv(nn.Module):
    def enc_linear (self,flatten_size):
        # print ('flatten_size', flatten_size)
        return nn.Sequential(
            nn.Linear(flatten_size, 32),
            nn.ReLU(inplace=True),
            nn.Linear(32, self.code_size),
        )

    def dec_linear (self, flatten_size):
        return nn.Sequential(
            nn.ReLU(inplace=True),
            nn.Linear(self.code_size, 32),
            nn.ReLU(inplace=True),
            nn.Linear(32, flatten_size),
        )

    def unflatten (self, unflattened_size):
        return nn.Unflatten(dim=1, unflattened_size=unflattened_size)

    def __init__(self, code_size=100):
        super(ThreeD_conv, self).__init__()

        self.code_size = code_size
        self.encoder = nn.Sequential(
            nn.Conv3d(in_channels=1, out_channels=16, kernel_size=4, padding=1, bias=False),
            nn.BatchNorm3d(16),
            nn.ReLU(inplace=True),
            nn.Conv3d(in_channels=16, out_channels=32, kernel_size=4, padding=1, bias=False),
            nn.BatchNorm3d(32),            
            nn.ReLU(inplace=True),
            nn.Conv3d(in_channels=32, out_channels=64, kernel_size=4, padding=1, bias=False),
            nn.BatchNorm3d(64),
            nn.ReLU(inplace=True),
        )
        self.flatten = nn.Flatten(start_dim=1)
        
        
        self.decoder = nn.Sequential(
            nn.ConvTranspose3d(in_channels=64, out_channels=32, kernel_size=4, padding=1, bias=False),
            nn.BatchNorm3d(32),
            nn.ReLU(inplace=True),
            nn.ConvTranspose3d(in_channels=32, out_channels=16, kernel_size=4, padding=1, bias=False),
            nn.BatchNorm3d(16),
            nn.ReLU(inplace=True),
            nn.ConvTranspose3d(in_channels=16, out_channels=1, kernel_size=4, padding=1, bias=False),
            nn.Sigmoid(),
        )

    def forward(self, input):
        x = self.encoder(input)
        unflattened_size = x.shape[1:]
        x = self.flatten(x)
        flattened_size = x.shape[1]
        x = self.enc_linear(flattened_size)(x)
        encoded_data = x
        x = self.dec_linear(flattened_size)(x)
        x = self.unflatten(unflattened_size)(x)
        x = self.decoder(x)
        return encoded_data, x

        

In [12]:
import plyfile 
class LidarDataset (Dataset):
    def __init__(self, all_files, transform=None):
        """2DRepresentation

        Args:
            folder_pattern (string): Path to the folder with all the images. regex pattern
            transform (callable, optional): Optional transform to be applied
                on a sample.
        """
    
        self.transform = transform
        # Read the file and get all the lines
        self.data = [] # list of images

        for file in all_files:
            img = plyfile.PlyData.read(file)            
            img = np.column_stack((img['vertex']['x'],img['vertex']['y'],img['vertex']['z']))
            
            # remove last column, which is the label
            img = img[:,0:3]
            self.data.append(img)
            
    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        img = self.data[idx]
        if self.transform:
            img = self.transform(img)
        return img


In [13]:
import open3d as o3d
def transform(point_cloud, image_size=(128, 128, 80)):

    voxel_size = 2
    #resha
    pcd = o3d.geometry.PointCloud()
    pcd.points = o3d.utility.Vector3dVector(point_cloud)
    voxels = o3d.geometry.VoxelGrid.create_from_point_cloud(pcd, voxel_size=voxel_size)
    
    # get torch voxels
    voxels = voxels.get_voxels() 

    v = np.array([])
    for voxel in voxels:
        v = np.append(v, voxel.grid_index)
    v = v.reshape(-1, 3)

    data = np.zeros(image_size, dtype=np.int32)
    # center the voxel in order to have the center in the middle of the image
    minx = np.min(v[:,0])
    miny = np.min(v[:,1])
    minz = np.min(v[:,2])

    v[:,0] = v[:,0] - minx
    v[:,1] = v[:,1] - miny
    v[:,2] = v[:,2] - minz

    for voxel in v:
        try:
            data[int(voxel[0]), int(voxel[1]), int(voxel[2])] = 1
        except:
            pass
    
    data = torch.from_numpy(data).double()
    data = data.unsqueeze(0)

    return data

try:
    from google.colab import drive
    drive.mount('/content/drive')
    file_path = "/content/drive/My Drive/3DAR/dataset_final/dataset_final/*.ply"
except:
    file_path = "dataset-downloader-kit/CV/dataset/Town01_Opt_ClearSunset/dataset_final/*.ply"


train_test_split = 0.8

all_files = glob.glob(file_path)[:50]
all_files.sort()
print ("Total number of files: ", len(all_files))

train_files = all_files[:int(len(all_files)*train_test_split)]
test_files = all_files[int(len(all_files)*train_test_split):]

batch_size = 6
train_dataset = LidarDataset(train_files, transform=transform)
train_dataloader = DataLoader(train_dataset, batch_size=batch_size)

test_dataset = LidarDataset(test_files, transform=transform)
test_dataloader = DataLoader(test_dataset, batch_size=batch_size)


Total number of files:  50


In [14]:
### Training function
def train_epoch(autoencoder, device, dataloader, loss_fn, optimizer):
    # Set train mode for both the encoder and the decoder
    autoencoder.train()
    autoencoder = autoencoder.float()
    losses = []

    # Iterate the dataloader (we do not need the label values, this is unsupervised learning)
    for image_batch in dataloader: 
        image_batch = image_batch.to(device)
        print ('image_batch.shape', image_batch.shape)
        encoded_data, decoded_data = autoencoder(image_batch.float())
        # convert to float
        decoded_data = decoded_data.float()
        image_batch = image_batch.float()

        loss = loss_fn(decoded_data, image_batch)
        
        # Backward pass
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        losses.append(loss.detach().cpu().numpy())
    losses = np.mean(losses)
    return losses

### Testing function
def test_epoch(autoencoder, device, dataloader, loss_fn):
    # Set evaluation mode for encoder and decoder
    autoencoder.train()
    autoencoder = autoencoder.float()
    with torch.no_grad(): # No need to track the gradients
        # Define the lists to store the outputs for each batch
        conc_out = []
        conc_label = []
        for image_batch in dataloader:
            # Move tensor to the proper device
            image_batch = image_batch.to(device)
            # Encode data
            encoded_data, decoded_data = autoencoder(image_batch.float())
            
            # Append the network output and the original image to the lists
            conc_out.append(decoded_data.cpu())
            conc_label.append(image_batch.cpu())
        # Create a single tensor with all the values in the lists
        conc_out = torch.cat(conc_out)
        conc_label = torch.cat(conc_label) 
        # Evaluate global loss
        val_loss = loss_fn(conc_out, conc_label)
    return val_loss.data


In [15]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

### Define the loss function
loss_fn = torch.nn.MSELoss()
AE = ThreeD_conv(100)

### Define an optimizer (both for the encoder and the decoder!)
lr = 0.005
params_to_optimize = [
    {'params': AE.parameters(), 'lr': lr}
]
optim = torch.optim.Adam(params_to_optimize, lr=lr)

# Check if the GPU is available
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
# print(f'Selected device: {device}')

# Move both the encoder and the decoder to the selected device
AE.to(device)



ThreeD_conv(
  (encoder): Sequential(
    (0): Conv3d(1, 16, kernel_size=(4, 4, 4), stride=(1, 1, 1), padding=(1, 1, 1), bias=False)
    (1): BatchNorm3d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU(inplace=True)
    (3): Conv3d(16, 32, kernel_size=(4, 4, 4), stride=(1, 1, 1), padding=(1, 1, 1), bias=False)
    (4): BatchNorm3d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (5): ReLU(inplace=True)
    (6): Conv3d(32, 64, kernel_size=(4, 4, 4), stride=(1, 1, 1), padding=(1, 1, 1), bias=False)
    (7): BatchNorm3d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (8): ReLU(inplace=True)
  )
  (flatten): Flatten(start_dim=1, end_dim=-1)
  (decoder): Sequential(
    (0): ConvTranspose3d(64, 32, kernel_size=(4, 4, 4), stride=(1, 1, 1), padding=(1, 1, 1), bias=False)
    (1): BatchNorm3d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU(inplace=True)
    (3): ConvTranspose3d(32

In [16]:
### Training cycle
num_epochs = 30
for epoch in range(num_epochs):
    print('EPOCH %d/%d' % (epoch + 1, num_epochs))
    ### Training (use the training function)
    train_loss = train_epoch(
        autoencoder=AE,
        device=device, 
        dataloader=train_dataloader, 
        loss_fn=loss_fn, 
        optimizer=optim)
    print(f'TRAIN - EPOCH {epoch+1}/{num_epochs} - loss: {train_loss}')

    ### Validation  (use the testing function)
    val_loss = test_epoch(
        autoencoder=AE,
        device=device, 
        dataloader=test_dataloader, 
        loss_fn=loss_fn)
    # Print Validationloss
    print(f'VALIDATION - EPOCH {epoch+1}/{num_epochs} - loss: {val_loss}\n')


    # Save network parameters
    torch.save(AE.state_dict(), 'AE_params.pth')


EPOCH 1/30
image_batch.shape torch.Size([6, 1, 128, 128, 80])


: 

: 