In [None]:
# Author: Emmanuel Akinrintoyo
# Import some Python libraries
from tensorflow import keras
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras import layers
import matplotlib.pyplot as plt
from tensorflow import math as TFmath

In [None]:
# Silence NumPy's divide-by-zero warnings; later cells take log10 of label
# values that were set to zero for unloaded channels.
np.seterr(divide = 'ignore')
# Record the TensorFlow version this notebook is running against.
print(tf.__version__)

In [None]:
# Mean absolute error restricted to the loaded channels
def customLoss(y_actual, y_pred):
  """Mean absolute error computed over the loaded channels only.

  An entry counts as "loaded" when its target value in ``y_actual`` is
  non-zero; predictions for all other entries are forced to zero so they
  contribute nothing to the error.

  Args:
    y_actual: tensor of target values, with zeros marking unloaded channels.
    y_pred: tensor of predictions (assumed to share ``y_actual``'s shape and
      dtype -- the element-wise ops below require it).

  Returns:
    Scalar tensor: the summed absolute error divided by the number of
    non-zero targets, or 0 when there are no non-zero targets.
  """
  # Set the values of the unloaded channels to zero: divide_no_nan yields 0
  # wherever y_actual is 0 and (y_pred * y_actual) / y_actual elsewhere.
  modified_y_pred = TFmath.divide_no_nan(TFmath.multiply(y_pred, y_actual),
                                         y_actual)
  # Calculate the absolute error of every entry
  error = TFmath.abs(TFmath.subtract(modified_y_pred, y_actual))
  # Calculate the number of loaded channels, in the dtype of the error so
  # the division below also works when the tensors are not float32.
  no_loaded_channels = tf.dtypes.cast(TFmath.count_nonzero(y_actual),
                                      error.dtype)
  # Calculate the loss; divide_no_nan returns 0 instead of NaN (0 / 0) when
  # no channel is loaded, so one such batch cannot poison the weights.
  loss = TFmath.divide_no_nan(TFmath.reduce_sum(error), no_loaded_channels)
  return loss


# Plot the loss history
def plotLoss(history):
  """Draw the training and validation loss curves on figure 0.

  Args:
    history: object whose ``history`` attribute maps 'loss' and 'val_loss'
      to per-epoch sequences (e.g. the value returned by ``model.fit``).
  """
  plt.figure(0)
  # One curve per recorded series, labelled with its own key
  for series_name in ('loss', 'val_loss'):
    plt.plot(history.history[series_name], label=series_name)
  plt.xlabel('Epochs', fontsize=16)
  plt.ylabel('Error', fontsize=16)
  plt.grid(True)
  plt.legend()


In [None]:
# Convert the inputted dB values to absolute (linear) values
def convert2Linear(values):
  """Return 10 ** (values / 10), element-wise.

  Args:
    values: scalar, array or DataFrame of dB values.

  Returns:
    The corresponding linear-scale values, as produced by ``np.power``.
  """
  exponent = values / 10
  return np.power(10, exponent)


# Convert the linear values to dB
def convert2dB(values):
  """Return 10 * log10(values), element-wise.

  Args:
    values: scalar, array or DataFrame of linear-scale values.

  Returns:
    The corresponding dB values. Zero inputs map to -inf, as ``np.log10``
    does.
  """
  log_values = np.log10(values)
  return 10 * log_values


# Build and compile the model
def buildModel(normalization_layer, output_layer):
  """Assemble and compile the fully connected regression network.

  The network is the given normalization layer, five ReLU dense layers
  (fc1..fc5) and a final linear dense layer.

  Args:
    normalization_layer: layer placed first in the stack.
    output_layer: number of units of the final dense layer (it is passed
      straight to ``layers.Dense``).

  Returns:
    The compiled ``keras.Sequential`` model, using ``customLoss`` and Adam
    with a learning rate of 0.01.
  """
  # (units, name) of each hidden layer; fc1 is as wide as the module-level
  # no_of_input_features.
  hidden_specs = [
      (no_of_input_features, 'fc1'),
      (256, 'fc2'),
      (128, 'fc3'),
      (128, 'fc4'),
      (128, 'fc5'),
  ]
  hidden_layers = [layers.Dense(units, activation='relu', name=tag)
                   for units, tag in hidden_specs]
  network = keras.Sequential(
      [normalization_layer, *hidden_layers, layers.Dense(output_layer)])
  network.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.01),
                  loss=customLoss)
  return network


In [None]:
# Number of channels per spectrum
no_of_channels = 94
# Input features consist of the target gain, total input power,
# total output power, plus each channel's input power and status.
# buildModel uses this value as the width of its first dense layer.
no_of_input_features = 2 * no_of_channels + 3

# Names of the .csv files holding the train / validation / test data
train_data, valid_data, test_data = (
    "../../data/train.csv",
    "../../data/valid.csv",
    "../../data/test.csv",
)


In [None]:
# Generates the per-channel names of the columns used in the data
def generateColumnNames(column_name, channel_count=None):
  """Build the per-channel column names for one column prefix.

  Channel numbers start at 1 and are zero-padded to at least two digits,
  e.g. ``prefix_01`` ... ``prefix_94``.

  Args:
    column_name: prefix shared by all the generated names.
    channel_count: number of names to generate. Defaults to the
      module-level ``no_of_channels`` when omitted.

  Returns:
    List of ``channel_count`` column names, in channel order.
  """
  if channel_count is None:
    # Fall back to the notebook-wide channel count
    channel_count = no_of_channels
  return [f'{column_name}{i:02d}' for i in range(1, channel_count + 1)]

# Column names / prefixes found in the data: the first entry is a complete
# column name, the other three are per-channel prefixes.
column_names = [
    'target_gain',
    'EDFA_input_spectra_',
    'DUT_WSS_activated_channel_index_',
    'calculated_gain_spectra_',
]

# Expand each prefix into the complete list of per-channel column names
(input_spectra_columns,
 onehot_channels_columns,
 output_spectra_columns) = [generateColumnNames(prefix)
                            for prefix in column_names[1:4]]


In [None]:
# Reconstruct the column names from the first entry (target_gain) plus the
# expanded per-channel names. The list is rebuilt instead of deleting the
# prefix entries in place, so re-running this cell gives the same result.
column_names = ([column_names[0]]
                + input_spectra_columns
                + output_spectra_columns)
column_names_copy = column_names + onehot_channels_columns

# Extract the names of all the columns excluding the first (i.e., target_gain)
modified_columns = column_names[1:]
column_names


In [None]:
# Read one .csv data set and prepare it for the model
def _loadLinearSet(csv_path):
  """Load a data set, convert its spectra to linear scale, select columns.

  Args:
    csv_path: path of the .csv file; its first column is used as the index.

  Returns:
    DataFrame restricted to ``column_names_copy``, with the columns listed
    in ``modified_columns`` passed through ``convert2Linear``.
  """
  data = pd.read_csv(csv_path, index_col=0)
  # Convert the values read to the linear scale
  data[modified_columns] = convert2Linear(data[modified_columns])
  return data[column_names_copy]


# Store the data for the train, test and valid sets
train_set = _loadLinearSet(train_data)
test_set = _loadLinearSet(test_data)
valid_set = _loadLinearSet(valid_data)


In [None]:
# Preview the validation set. Only the first rows are shown (with every
# column visible) so the notebook output stays small; the original rendered
# the complete frame.
with pd.option_context('display.max_columns', None,
                       'display.precision', 3,
                       ):
    display(valid_set.head())


In [None]:
# Separate the target value from the features.
# target value - "label"
# Label is the value that the model is trained to predict.
def popLabels(features, result_labels):
  """Remove the label columns from ``features`` and return them as a frame.

  Args:
    features: DataFrame that is modified in place; every column named in
      ``result_labels`` is popped out of it.
    result_labels: iterable of the column names to extract.

  Returns:
    DataFrame holding the popped columns, in the order given. An empty
    DataFrame when ``result_labels`` is empty.
  """
  popped = [features.pop(name) for name in result_labels]
  if not popped:
    # pd.concat rejects an empty list; keep the original empty result
    return pd.DataFrame()
  # A single concat avoids re-copying the growing frame on every iteration
  # and keeps the original index untouched (no alignment with an empty one).
  return pd.concat(popped, axis=1)

# Work on copies so the loaded sets keep their label columns
train_features, test_features, valid_features = (
    train_set.copy(), test_set.copy(), valid_set.copy())

# Pop the output spectra out of each feature frame to use as the labels
train_labels = popLabels(train_features, output_spectra_columns)
test_labels = popLabels(test_features, output_spectra_columns)
valid_labels = popLabels(valid_features, output_spectra_columns)


In [None]:
# Set the gain value of the unloaded channels to zero
def setActivatedChannels(labels, features, onehot_channels):
  """Multiply the labels element-wise by the channel status columns.

  Args:
    labels: DataFrame of label values.
    features: DataFrame containing the status columns.
    onehot_channels: names of the status columns in ``features``; they are
      matched to the label columns by position (presumably 0/1 flags --
      confirm against the data).

  Returns:
    New DataFrame with the columns and index of ``labels``, holding the
    element-wise product.
  """
  channel_status = features[onehot_channels].values
  masked_values = labels.values * channel_status
  return pd.DataFrame(masked_values, index=labels.index,
                      columns=labels.columns)

# Apply the channel status to the labels of every data set
train_labels = setActivatedChannels(labels=train_labels,
                                    features=train_features,
                                    onehot_channels=onehot_channels_columns)
test_labels = setActivatedChannels(labels=test_labels,
                                   features=test_features,
                                   onehot_channels=onehot_channels_columns)
valid_labels = setActivatedChannels(labels=valid_labels,
                                    features=valid_features,
                                    onehot_channels=onehot_channels_columns)


In [None]:
# Normalisation layer whose statistics are taken from the training features
normalizer = layers.Normalization(axis=-1)
normalizer.adapt(train_features)

# Build the model (one output unit per channel) and show its summary
model = buildModel(normalization_layer=normalizer,
                   output_layer=no_of_channels)
model.summary()


In [None]:
# Train for 350 epochs, evaluating on the validation set after each epoch
history = model.fit(
    x=train_features,
    y=train_labels,
    validation_data=(valid_features, valid_labels),
    epochs=350,
    verbose=2,
)


In [None]:
# View the training progress of the model: one row per epoch, with the
# epoch number added as a column; the last rows are displayed.
hist = pd.DataFrame(history.history).assign(epoch=history.epoch)
hist.tail()


In [None]:
# Draw the loss curves, then write the current figure to an EPS file
plotLoss(history)
plt.savefig('lossplot.eps', bbox_inches='tight', dpi=900, format='eps')


In [None]:
# Evaluate the model on the test set and store the result in a one-row table
test_results = {
    'model': model.evaluate(test_features, test_labels, verbose=2),
}
results = pd.DataFrame(
    test_results, index=['Mean absolute error [Gain]']).transpose()


In [None]:
# Make predictions with the model
test_predictions = model.predict(test_features)

# Convert both the predictions and the measured labels to dB
test_predictions_dB, test_labels_dB = (
    convert2dB(test_predictions), convert2dB(test_labels))


In [None]:
plt.figure(1)
font_size = 16

# Show the predicted and measured gain values on a scatter plot
plt.scatter(test_labels_dB, test_predictions_dB)
plt.ylabel('Predicted EDFA Gain (dB)', fontsize=font_size)
plt.xlabel('Measured EDFA Gain (dB)', fontsize=font_size)

# Both axes share the same range: the first and last value of lims
lims = list(np.arange(11, 16.2, 1))
limits = [lims[0], lims[-1]]
plt.xlim(limits)
plt.ylim(limits)
# Dashed diagonal marking prediction == measurement
plt.plot(limits, limits, 'b--')
plt.savefig('regression_plt.eps', bbox_inches='tight', dpi=900, format='eps')


In [None]:
# Plot the error distribution
plt.figure(2)
error_matrix = (test_labels_dB - test_predictions_dB).to_numpy()
flat_error = error_matrix.reshape(-1)
# Keep only the entries above -10 dB; this also drops -inf and NaN entries
error = list(flat_error[flat_error > -10])

# Set the bin edges, 0.1 dB apart
bins_list = list(np.arange(-1.2, 0.7, 0.1))

# Plot the histogram of the errors
plt.hist(error, bins=bins_list)

# Mark the +/-0.1 dB and +/-0.2 dB thresholds with dashed vertical lines
for threshold in (-0.2, -0.1, 0.1, 0.2):
  plt.axvline(x=threshold, color='black', ls='--')

plt.xlabel('Prediction Gain Error (dB)', fontsize=font_size)
plt.ylabel('Instances', fontsize=font_size)

# Tick labels: every other bin edge is left unlabelled
label_list = [
    '-1.2', '', '-1.0', '', '-0.8', '', '-0.6', '', '-0.4', '',
    '-0.2', '', '0.0', '', '0.2', '', '0.4', '', '0.6',
]
plt.xticks(ticks=bins_list, labels=label_list, fontsize=font_size)
plt.yticks(fontsize=font_size)
plt.savefig('histogram_plt.eps', bbox_inches='tight', dpi=900, format='eps')


In [None]:
# Check if the error values are within 0.2 dB and 0.1 dB
reasonable_error = [i for i in error if abs(i) <= 0.2]
reasonable_error2 = [i for i in error if abs(i) <= 0.1]
print("Error <= 0.1 dB: {:.3f}".format(len(reasonable_error2) / len(error)))
print("Error <= 0.2 dB: {:.3f}".format(len(reasonable_error) / len(error)))
print("Error > 0.2 dB: {:.3f}".format(1 - len(reasonable_error) / len(error)))

# Both statistics are computed from the per-sample errors and stored under
# their own names. `error` is left untouched so this cell can be re-run;
# the original overwrote it with the MSE and then reported that as the MAE.
mse_all = (np.square(error)).mean(axis=None)
print("MSE(all): {:.6f}".format(mse_all))

mae_all = (np.absolute(error)).mean(axis=None)
print("MAE(all): {:.6f}".format(mae_all))


In [None]:
# Register the custom loss function with Keras serialisation.
# register_keras_serializable returns a decorator, so it has to be applied
# to customLoss, and `name` must be a string (registered here as
# 'custom_loss>customLoss'). The original call built the decorator but
# never applied it, so nothing was registered.
# NOTE(review): code that reloads the saved model has to perform the same
# registration (or pass the loss through custom_objects) -- confirm.
tf.keras.utils.register_keras_serializable(
    package='custom_loss', name='customLoss'
)(customLoss)

# Save the trained model
model.save('LineAmp')
print('Model saved')
