In [None]:
import numpy as np
import os, glob
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers,regularizers, losses
from tensorflow.keras.callbacks import LearningRateScheduler
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Dense, Conv3D, Flatten, BatchNormalization, MaxPooling3D
from tensorflow.keras.layers import concatenate, Reshape, UpSampling3D
import random
import open3d as o3d
from scipy.io import loadmat
import scipy.io
# import pyvista as pv
# from sklearn.model_selection import train_test_split



In [None]:
# Class for binary crossentropy custom loss function

class CustomLoss_BCE(tf.keras.losses.Loss):
    def __init__(self):
        super().__init__()
    def call(self, y_true, y_pred):
        
        _epsilon = tf.convert_to_tensor(0.000001, y_pred.dtype.base_dtype)
        y_pred = tf.clip_by_value(y_pred, _epsilon, 1 - _epsilon)

        s1 = tf.math.subtract(1.0, y_true)
        s2 = tf.math.subtract(1.0, y_pred)
        
        a = tf.math.multiply_no_nan(tf.math.log(y_pred), y_true)
        b = tf.math.multiply_no_nan(tf.math.log(s2), s1)
        out = -(20*a + b)
        return out


In [None]:
# #Input data

# folder_path = ''
# c=0
# files_input1 = []
# for filename in glob.glob(os.path.join(folder_path, "pCIR_area2_rratio_lim.mat")):
#     files_input1.append(filename)

# print(len(files_input1))
# print(files_input1[0])

In [None]:
#Assumptions of minimum and maximum value for x,y and z for generating the voxel grid 

min_bound=[-16, -16, -2]
max_bound=[16,16,2]
voxel_size=0.25

x_lim = int((max_bound[0] - min_bound[0])/voxel_size);
y_lim = int((max_bound[1] - min_bound[1])/voxel_size);
z_lim = int((max_bound[2] - min_bound[2])/voxel_size);

# print(x_lim, y_lim, z_lim)

In [None]:
# Generating the input data in the required format for testing similar format is used for ML model training

if(os.path.exists("X_all_area2.npy")):
    Xp_all_area2 = np.load("X_all_area2.npy")
else:
    import mat73
    filename_inp = files_input1[0]
    with open(filename_inp, 'r') as f:
        rf_signal = mat73.loadmat(f.name)['Pwr_RTP']
        Xp_all_area2= rf_signal
    np.save("X_all_area2", Xp_all_area2)

In [None]:
# Function for converting x, y and z values to our assumed bounds of voxel grid coordinates

def convert2(x, y, z):

    x_val = (x - min_bound[0])/voxel_size
    y_val = (y - min_bound[1])/voxel_size
    z_val = (z - min_bound[2])/voxel_size
    
    return int(np.floor(x_val)), int(np.floor(y_val)), int(np.floor(z_val))

In [None]:
def generate_pcd(arr, threshold):
    c1 = 0
    pcd_points = np.zeros((x_lim*y_lim*z_lim,3))
    for i in range(x_lim):
        for j in range(y_lim):
            for k in range(z_lim):
                if(arr[i,j,k] > threshold):
                    X = -16+(i)*voxel_size + voxel_size/2
                    Y = -16+(j)*voxel_size + voxel_size/2
                    Z = -2+(k)*voxel_size + voxel_size/2 
                    pcd_points[c1,0] = X
                    pcd_points[c1,1] = Y
                    pcd_points[c1,2] = Z
                    c1=c1+1
    pcl = o3d.geometry.PointCloud()
    pcl.points = o3d.utility.Vector3dVector(pcd_points[:c1, :])
    return pcl

In [None]:
#Testing via prediction output modification

import math

def out_modification(arr):
    c = 3*(10**8)
    sample_dist = (1/1.76)*c*(10**(-9))
    output = np.zeros((128, 128, 16))
    
    for theta in range(0, 180, 2):
        for phi in range(-180, 180):
            max_val = -100
            ind_x = -1
            ind_y = -1
            ind_z = -1
            
            for i in range(93):
                r = i*sample_dist
                xx = r*math.sin(math.radians(theta))*math.cos(math.radians(phi))
                yy = r*math.sin(math.radians(theta))*math.sin(math.radians(phi))
                zz = r*math.cos(math.radians(theta))
    
                [x, y, z] = convert2(xx, yy, zz)
    
                if(x > -1 and x < x_lim and y > -1 and  y < y_lim and z > -1 and  z < z_lim and output[x,y,z]!=1):
                    if(max_val < arr[x,y,z]):
                        max_val = arr[x,y,z]
                        ind_x = x
                        ind_y = y
                        ind_z = z
                
            output[ind_x, ind_y, ind_z] = 1
#             print(ind_x,ind_y, ind_z)
    return output

In [None]:
# Normalizing the testing data

X_test = Xp_all_area2

for i in range(X_test.shape[0]):
    min_val = np.min(X_test[i,:,:,:100])           
    max_val = np.max(X_test[i,:,:,:100])
    X_test[i,:,:,:100] = (X_test[i,:,:,:100] - min_val)/(max_val - min_val)
    

X_test=np.reshape(X_test[:,:,:,:100],(-1,6400))

In [None]:
#NN model-1 

input_1 = Input(shape = (6400,), name = 'input_1')
x = Dense(4096, activation='relu')(input_1)


x = tf.reshape(x, [-1,32,32,4,1])


x = UpSampling3D((3, 3, 3))(x)


x = Conv3D(2,kernel_size = (13,13,2),activation='relu')(x)

x = Conv3D(4,kernel_size = (13,13,3),activation='relu')(x)

x = BatchNormalization()(x)

x = Conv3D(8,kernel_size = (9,9,2),activation='sigmoid')(x)

output = tf.reshape(x, [-1,128,128,16])


model = Model(inputs=input_1, outputs=output)
print(model.summary())

In [None]:
lr_schedule = keras.optimizers.schedules.ExponentialDecay(initial_learning_rate=0.0002,decay_steps=10000,decay_rate=0.9)
opt = tf.keras.optimizers.Adam(learning_rate=lr_schedule)
model.compile(optimizer=opt, loss=CustomLoss_BCE(),metrics=['binary_accuracy'])
# #model.summary()

In [None]:
# Loading trained ML model weights

model.load_weights('model_norm_cae_Ts_CL_ep100_ArrData_woReg_AA13_epc30.h5')

In [None]:
# Prediction for testing data

Y_pred = model.predict(X_test)

In [None]:
# Creating directory to store predicted pcd

directory = 'Lidar_predicted'
parent_dir = ''

path = os.path.join(parent_dir, directory)

if(os.path.exists(path) == False):
    os.mkdir(path)

In [None]:
# Generating and saving predicted point clouds in .pcd file

samples = X_test.shape[0]

for i in range(5):
    
    print("PCD generation for sample", i)
    
#     # Method - 1
#     pcl = = generate_pcd(pcl, 0.8)
    
    
    # Method - 2
    pcl = out_modification(Y_pred[i,:,:,:])
    pcl = generate_pcd(pcl, 0)
    
    file = "{}{:05d}{}".format("Lidar_predicted/LidarPred_2", int(i), '.pcd')
    print(file)
#     file = ['Lidar_predicted/LidarPred_' + str(file_number.zfill(4)) + '.pcd']
    o3d.io.write_point_cloud(file, pcl)