In [2]:
import scipy.io
import numpy as np
import tensorflow as tf
from sklearn.model_selection import train_test_split
from tensorflow.keras.models import Sequential # type: ignore
from tensorflow.keras.layers import Dense, LSTM, Dropout, BatchNormalization, Bidirectional, Input # type: ignore
from tensorflow.keras.optimizers import Adam, RMSprop  # type: ignore
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau # type: ignore
from tensorflow.keras.regularizers import l2 # type: ignore


from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
print(f"TensorFlow Version: {tf.__version__}")

# Load the .mat file
file_path = '../EV_Rank_1_52_RBs_50_UEs_1000_snaps.mat'
data = scipy.io.loadmat(file_path)

# Extract the relevant data
EV_data = data['EV_re_im_split']
data = EV_data
del EV_data
print(data.shape)

TensorFlow Version: 2.16.1
(50, 1000, 832)


In [3]:
# Function to create sequences
def create_sequences(data, timesteps_in):
    X, y = [], []
    for ue in data:
        for i in range(len(ue) - timesteps_in):
            X.append(ue[i:i + timesteps_in])
            y.append(ue[i + timesteps_in])
    return np.array(X), np.array(y)

timesteps_in = 5

X, y = create_sequences(data, timesteps_in)
print(f'X shape: {X.shape}, y shape: {y.shape}')

# Split the data into train and test sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)



X shape: (49750, 5, 832), y shape: (49750, 832)


In [4]:
X_train.shape, X_test.shape, y_train.shape, y_test.shape

((39800, 5, 832), (9950, 5, 832), (39800, 832), (9950, 832))

In [None]:
# Define a function to create the advanced LSTM model with gradient clipping
def create_model(optimizer='adam', dropout_rate=0.2, lstm_units=256, dense_units=512, use_bidirectional=False):
    model = Sequential()
    model.add(Input(shape=(timesteps_in, X.shape[2])))
    if use_bidirectional:
        model.add(Bidirectional(LSTM(lstm_units, return_sequences=True)))
    else:
        model.add(LSTM(lstm_units, return_sequences=True))
    model.add(Dropout(dropout_rate))
    model.add(BatchNormalization())
    
    if use_bidirectional:
        model.add(Bidirectional(LSTM(lstm_units, return_sequences=True)))
    else:
        model.add(LSTM(lstm_units, return_sequences=True))
    model.add(Dropout(dropout_rate))
    model.add(BatchNormalization())
    
    if use_bidirectional:
        model.add(Bidirectional(LSTM(lstm_units)))
    else:
        model.add(LSTM(lstm_units))
    model.add(Dropout(dropout_rate))
    model.add(BatchNormalization())
    
    model.add(Dense(dense_units, activation='relu'))
    model.add(Dropout(dropout_rate))
    model.add(BatchNormalization())
    model.add(Dense(X.shape[2]))
    
    # Use gradient clipping
    if optimizer == 'adam':
        optimizer = Adam(clipvalue=1.0)
    elif optimizer == 'rmsprop':
        optimizer = RMSprop(clipvalue=1.0)

    model.compile(optimizer=optimizer, loss='mse')
    return model



In [None]:
# #VARIABLES

# lstm_units = 256
# dense_units = 512
# use_bidirectional = False
# optimizer = 'adam'

# # Best hyperparameters from RandomizedSearchCV
# best_params = {
#     'optimizer': 'adam',
#     'dropout_rate': 0.2,
#     'lstm_units': 256,
#     'dense_units': 256,
#     'batch_size': 64,
#     'epochs': 100,
#     'use_bidirectional': True
# }

# Best hyperparameters from RandomizedSearchCV
best_params = {

    'optimizer': 'adam',
    'dropout_rate': 0.2,
    'lstm_units': 2048,
    'dense_units': 2048,
    'batch_size': 256,
    'epochs': 100,
    'use_bidirectional': True
            }

# Create and train the best model
early_stopping = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)
reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=5, min_lr=1e-6)

model = create_model(
    optimizer=best_params['optimizer'],
    dropout_rate=best_params['dropout_rate'],
    lstm_units=best_params['lstm_units'],
    dense_units=best_params['dense_units'],
    use_bidirectional=best_params['use_bidirectional']
)

history = model.fit(
    X_train, y_train,
    batch_size=best_params['batch_size'],
    epochs=best_params['epochs'],
    validation_split=0.2,
    callbacks=[early_stopping, reduce_lr],
    verbose=1)


In [None]:


# Evaluate the model on the test set
test_loss = model.evaluate(X_test, y_test, verbose=1)
print(f"Test Loss: {test_loss}")


# Make predictions
predictions = model.predict(X_test)

# Calculate metrics
mse = mean_squared_error(y_test, predictions)
mae = mean_absolute_error(y_test, predictions)
r2 = r2_score(y_test, predictions)

print(f'Mean Squared Error (MSE): {mse}')
print(f'Mean Absolute Error (MAE): {mae}')
print(f'R-squared (R^2): {r2}')

# Mean Squared Error (MSE): 0.007433210159811503
# Mean Absolute Error (MAE): 0.062449529734068705
# R-squared (R^2): 0.514242911745018
