In [18]:
import os

import numpy as np
import tensorflow as tf
import keras
import tensorflow_probability as tfp
from tensorflow.keras.layers import Input, Dense, Reshape, UpSampling2D, Conv2D
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras import layers

In [20]:
# Input and output tensor shapes.
# NOTE(review): presumably laid out as (rows, cols, frames) — confirm against the data.
FRAME_ROWS, FRAME_COLS = 128, 128
INPUT_SHAPE = (FRAME_ROWS, FRAME_COLS, 4)   # Input shape of the model
OUTPUT_SHAPE = (FRAME_ROWS, FRAME_COLS, 1)  # Output shape of the model


In [21]:
# Mount Google Drive at /content/drive; the data paths used later in this
# notebook all live under /content/drive/MyDrive/.
# NOTE(review): google.colab is presumably only importable inside a Colab
# runtime — confirm before running this notebook elsewhere.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [22]:
# Read the two scaling bounds stored in the .npy file and show them.
# NOTE(review): `min` and `max` shadow the Python builtins for the rest of the
# notebook; the names are kept because later cells pass them to generate_data().
min_max = '/content/drive/MyDrive/example1/min_max_scale_new.npy'
scale_bounds = np.load(min_max, encoding='bytes')
min, max = scale_bounds
print(min, max)

-236.2857142857143 295.5714285714286


In [23]:
# Load and preprocess the data
def load_data(file_path):
    """Return the array that ``np.load`` reads from ``file_path``."""
    return np.load(file_path)

def preprocess_data(file_list, data_dir='/content/drive/MyDrive/example1/'):
    """Load every file named in ``file_list`` and stack the results.

    Parameters
    ----------
    file_list : iterable of str
        File names relative to ``data_dir``.
    data_dir : str, optional
        Directory the file names are resolved against. Defaults to the Drive
        folder this notebook was written for, so existing one-argument calls
        behave as before.

    Returns
    -------
    np.ndarray
        ``np.array`` of the loaded arrays, in the order of ``file_list``.
    """
    # os.path.join gives the same path as plain concatenation for the default
    # directory, and also works when data_dir has no trailing slash.
    loaded_data = [
        load_data(os.path.join(data_dir, file_name))
        for file_name in file_list
    ]
    return np.array(loaded_data)


def _scale_to_pixels(frames):
    """Return a float64 copy of ``frames`` mapped onto the pixel scale.

    Values at or below -127 (the "no echo" marker) are replaced by 0, then
    every value v is mapped to ``255 * (v + 10) / 70 + 0.5``.
    """
    # np.array copies, so the caller's buffer is never written to.
    frames = np.array(frames)
    frames[frames <= -127] = 0
    return ((255. * ((frames + 10.) / 70.)) + 0.5).astype(np.float64)


def generate_data(data, min_train, max_train, n_frames=4):
    """Build min-max-scaled input frames and target frame for each sample.

    The first ``n_frames`` time steps of each sample become the inputs and the
    last time step becomes the target.
    NOTE(review): the original comment describes these as
    (t-30, t-20, t-10, t) -> t+10; confirm against the dataset.

    Unlike the previous implementation, ``data`` is left unmodified and the
    spatial size is taken from ``data`` instead of being fixed at 128 x 128.

    Parameters
    ----------
    data : np.ndarray
        Array indexed as (n_samples, row, col, timesteps).
    min_train, max_train : float
        Bounds for the final scaling ``(x - min_train) / (max_train - min_train)``.
    n_frames : int, optional
        Number of leading time steps used as input frames (default 4).

    Returns
    -------
    movie_in : np.ndarray
        float64 array of shape (n_samples, row, col, n_frames).
    movie_out : np.ndarray
        float64 array of shape (n_samples, row, col, 1).

    Raises
    ------
    ValueError
        If ``data`` is not 4-dimensional.
    IndexError
        If ``data`` has fewer than ``n_frames`` time steps (or none at all).
    """
    data = np.asarray(data)
    n_samples, row, col, time_step = data.shape
    if time_step < max(n_frames, 1):
        raise IndexError(
            'data has %d time steps but %d input frames plus a target frame '
            'were requested' % (time_step, n_frames)
        )

    # Slice first, then copy and rescale: only the frames actually used are
    # duplicated, and the whole batch is processed in one vectorized pass.
    movie_in = _scale_to_pixels(data[..., :n_frames])
    movie_out = _scale_to_pixels(data[..., -1:])

    # Min-max scaling
    value_range = max_train - min_train
    movie_in = (movie_in - min_train) / value_range
    movie_out = (movie_out - min_train) / value_range

    return movie_in, movie_out


In [24]:
# Load the training and testing data.
# ndmin=1 keeps the file lists one-dimensional even when a list file holds a
# single name; without it np.loadtxt returns a 0-d array that cannot be
# iterated by preprocess_data().
train_file_list = np.loadtxt('/content/drive/MyDrive/example1/radar_events_train.txt', dtype=str, ndmin=1)
test_file_list = np.loadtxt('/content/drive/MyDrive/example1/radar_events_test.txt', dtype=str, ndmin=1)

data_train = preprocess_data(train_file_list)
data_test = preprocess_data(test_file_list)

# `min` / `max` are the scaling bounds loaded earlier in the notebook (they
# shadow the builtins); both splits are scaled with the same bounds.
x_train, y_train = generate_data(data_train, min, max)
x_test, y_test = generate_data(data_test, min, max)

In [25]:
# Move the frame axis (index 3) to position 1, right after the sample axis.
# NOTE(review): the arrays are rebound in place, so re-running this cell moves
# an axis a second time — run it exactly once per fresh kernel.
SOURCE_AXIS, TARGET_AXIS = 3, 1
x_train = np.moveaxis(x_train, source=SOURCE_AXIS, destination=TARGET_AXIS)
x_test = np.moveaxis(x_test, source=SOURCE_AXIS, destination=TARGET_AXIS)

In [26]:
# Give every array the 5-D layout (samples, frames, 128, 128, 1):
# one frame for the targets, four frames for the inputs.
TARGET_TAIL = (1, 128, 128, 1)
INPUT_TAIL = (4, 128, 128, 1)
y_train = np.reshape(y_train, (y_train.shape[0],) + TARGET_TAIL)
y_test = np.reshape(y_test, (y_test.shape[0],) + TARGET_TAIL)
x_train = np.reshape(x_train, (x_train.shape[0],) + INPUT_TAIL)
x_test = np.reshape(x_test, (x_test.shape[0],) + INPUT_TAIL)

In [27]:
# Show the (inputs, targets) shapes of the training split, then the test split.
for inputs, targets in ((x_train, y_train), (x_test, y_test)):
    print(inputs.shape, targets.shape)

(1845, 4, 128, 128, 1) (1845, 1, 128, 128, 1)
(1247, 4, 128, 128, 1) (1247, 1, 128, 128, 1)


In [35]:
# Upgrade the keras package in the runtime.
# NOTE(review): the version is unpinned, and this cell's execution count (35)
# is higher than that of the cells around it, so it was run out of order.
# `keras` is also already imported at the top of the notebook; presumably a
# runtime restart is needed for the upgrade to take effect — confirm, and
# consider pinning a version in a cell at the very top.
!pip install --upgrade keras



In [33]:
from keras.layers import LSTM

In [None]:
from keras.models import Sequential
from keras.layers import LSTM
from keras.layers import ConvLSTM2D
# Conv3D is imported from the public keras.layers namespace; the private
# keras.layers.convolutional path is not importable in newer Keras releases.
from keras.layers import Conv3D
from keras.layers import LayerNormalization, Reshape, Lambda
import keras.backend as K

# Three identical ConvLSTM2D + LayerNormalization stages. Only the first stage
# declares the input shape: (frames, 128, 128, 1) with a variable frame count.
model = Sequential()
for stage_index in range(3):
    convlstm_kwargs = dict(filters=64,
                           kernel_size=(3, 3),
                           padding='same',
                           kernel_initializer='HeNormal',
                           return_sequences=True)
    if stage_index == 0:
        convlstm_kwargs['input_shape'] = (None, 128, 128, 1)
    model.add(ConvLSTM2D(**convlstm_kwargs))
    model.add(LayerNormalization())

# Project the 64 feature maps of every frame down to a single channel.
model.add(Conv3D(filters=1,
                 kernel_size=(3, 3, 3),
                 padding='same',
                 activation='linear',
                 data_format='channels_last'))

# Collapse the frame axis (axis=1) by multiplying all frames together;
# keepdims=True keeps a length-1 frame axis, so the output has the same
# (batch, 1, 128, 128, 1) layout as the targets.
# tf.reduce_prod replaces keras.backend.prod, which newer Keras releases no
# longer provide.
multiply_layer = Lambda(lambda x: tf.reduce_prod(x, axis=1, keepdims=True))
model.add(multiply_layer)

# No metrics are compiled, so model.evaluate() returns only the scalar loss.
model.compile(loss='mse', optimizer='adam')
model.build((None, 4, 128, 128, 1))
model.summary()


In [None]:
# Train the model on the training split with the settings below.
BATCH_SIZE = 8
EPOCHS = 10
model.fit(x=x_train, y=y_train, batch_size=BATCH_SIZE, epochs=EPOCHS)

In [None]:
# Evaluate the model on test data.
# Model.evaluate returns a bare scalar loss when the model was compiled without
# metrics (as in this notebook's compile call) and a list [loss, metric, ...]
# otherwise. Unpacking the scalar into two names raises a TypeError, so the
# result is normalized to a 1-D array first.
eval_results = np.atleast_1d(model.evaluate(x_test, y_test, verbose=0))
loss = float(eval_results[0])
metric = float(eval_results[1]) if eval_results.size > 1 else None  # None: no metric compiled
print(loss, metric)

In [None]:
# `predictions` was never computed anywhere in the notebook, so it is produced
# here before being reshaped.
predictions = model.predict(x_test)

# Drop the singleton frame and channel axes so targets and predictions are
# plain (samples, 128, 128) stacks; the sample count is taken from each array
# instead of being fixed at 1247.
y_test = y_test.reshape(y_test.shape[0], 128, 128)
predictions = predictions.reshape(predictions.shape[0], 128, 128)

In [None]:
# Write the trained model to an HDF5 file in the current working directory.
MODEL_PATH = 'ConvLSTM.h5'
model.save(MODEL_PATH)