In [None]:
def add_average_values(X, scale=1):
  """
  Lengthen every sequence of a dataset by inserting synthetic timestamps.

  Between each pair of consecutive original timestamps, `scale` new rows are
    inserted; each one is a convex combination of the two neighbours with a
    weight drawn uniformly at random from [0, 1).
  :param X: the original dataset, indexed as (sample, timestamp, feature)
  :param scale: how many synthetic timestamps to insert between 2 original ones
  :return: a new float array of shape
           (X.shape[0], X.shape[1] + scale * (X.shape[1] - 1), X.shape[2])
  """
  n_samples, n_steps, n_features = X.shape[0], X.shape[1], X.shape[2]
  # Each original timestamp is followed by `scale` synthetic ones (except the last)
  stride = scale + 1
  extended = np.zeros((n_samples, n_steps + scale * n_steps - scale, n_features))

  for idx in tqdm(range(n_samples)):
    series = X[idx]
    for step in range(series.shape[0]):
      # The original value keeps its relative order in the stretched sequence
      base = step * stride
      extended[idx, base] = series[step, :]
      # The last timestamp has no successor to blend with
      if step == series.shape[0] - 1:
        continue
      for offset in range(1, stride):
        weight = np.random.random()
        extended[idx, base + offset] = weight * series[step, :] + (1 - weight) * series[step + 1, :]

  return extended

In [None]:
def SMOTE(x, num_of_samples=100, num_to_combine=2):
  """
  SMOTE-style oversampling: synthesise new samples as random convex
    combinations of existing samples of the same class.

  If x already holds at least num_of_samples elements it is returned unchanged
    (as a list); no downsampling is performed.
  :input x: array with the class samples to oversample, first axis = sample
  :input num_of_samples: the number of samples to reach for the specific class
  :input num_to_combine: how many distinct samples are linearly combined to get a
                         new one (must be >= 2 and <= number of samples in x)
  :return: a list containing all the elements of x plus the new samples
           (new samples are float64 arrays with the shape of x[0])
  :raises ValueError: if new samples are needed but num_to_combine is < 2 or
                      larger than the number of available samples
  """
  x = np.asarray(x)

  # Create a list with all the original elements
  X_list = list(x)

  # Nothing to synthesise: the class is already large enough
  if len(X_list) >= num_of_samples:
    return X_list

  if num_to_combine < 2:
    raise ValueError("num_to_combine must be at least 2, got {}".format(num_to_combine))
  if x.shape[0] < num_to_combine:
    raise ValueError("Cannot combine {} distinct samples: only {} available".format(
        num_to_combine, x.shape[0]))

  # Every original sample is a candidate (np.arange's stop is exclusive,
  #   so it must be x.shape[0], not x.shape[0] - 1)
  original_index = np.arange(x.shape[0])

  # Till when not enough samples have been created
  while len(X_list) < num_of_samples:
    # Select num_to_combine distinct sequences
    selected_index = np.random.choice(original_index, size=num_to_combine, replace=False)

    # Draw non-negative weights summing to 1
    if num_to_combine == 2:
      # Random position on the segment between the 2 selected samples
      probability_position = np.random.random()
      weights = np.array([probability_position, 1 - probability_position])
    else:
      # Uniform draw from the simplex: generalises the 2-sample case
      weights = np.random.dirichlet(np.ones(num_to_combine))

    # Weighted sum over the selected samples (contracts the sample axis)
    new_sample = np.tensordot(weights, x[selected_index].astype(np.float64), axes=1)

    # Append the new sequence
    X_list.append(new_sample)

  return X_list

In [None]:
class RandomSequenceAugmentation(tf.keras.layers.Layer):
  """
  Keras layer that embeds random sequence augmentation in the network.

  During training each of three augmentations (scaling, jittering, flipping) is
    independently applied with probability `prob`; outside training the input is
    returned untouched.
  All the randomness comes from TensorFlow ops: NumPy random calls are Python
    side effects, so inside a traced graph they would be evaluated only once
    and the "random" augmentation would be frozen for the whole training.
  """
  def __init__(self, sigma_s=0.1, sigma_j=1, verbose=False, prob=0.5, **kwargs):
    """
    Initialize the layer
    :param sigma_s: standard deviation of the normal distribution (mean 1) from
                    which the multiplicative scaling factors are sampled
    :param sigma_j: standard deviation of the normal distribution (mean 0) from
                    which the additive jitter is sampled
    :param verbose: if True, print some operations done
    :param prob: the probability of applying each augmentation
    """
    super().__init__(**kwargs)
    self.sigma_s = sigma_s
    self.sigma_j = sigma_j
    self.verbose = verbose
    self.prob = prob

  def get_config(self):
    """
    Return the constructor arguments so the layer can be serialized/cloned.
    :return: the configuration dictionary of the layer
    """
    config = super().get_config()
    config.update({
        "sigma_s": self.sigma_s,
        "sigma_j": self.sigma_j,
        "verbose": self.verbose,
        "prob": self.prob,
    })
    return config

  def _maybe_apply(self, sequence, transform, message):
    """
    Apply `transform` to `sequence` with probability `self.prob`.
    :param sequence: the tensor to (possibly) augment
    :param transform: callable mapping a tensor to the augmented tensor
    :param message: text printed (if verbose) whenever the transform is applied
    :return: either transform(sequence) or sequence itself
    """
    def _augmented():
      if self.verbose:
        tf.print(message)
      return transform(sequence)

    # A fresh uniform draw at every execution decides whether to augment
    apply_now = tf.random.uniform([]) < self.prob
    return tf.cond(apply_now, _augmented, lambda: sequence)

  def call(self, sequence, training=None):
    """
    This method is called and it's effective only during the training phase.
    It applies 3 different augmentation techniques: scaling, jittering and flipping.
    :param sequence: the current input sequence in the network; expected to be
                     a floating point tensor whose first axis is the batch
    :param training: it says whether the phase is training or test
    :return: the augmented sequence
    """
    # If it's not training, do not apply anything
    if not training:
      return sequence

    if self.verbose:
      print("Type of sequence is {} and its shape is {}".format(type(sequence), sequence.shape))

    sequence = tf.convert_to_tensor(sequence)
    # The noise has one value per non-batch position and is broadcast over the batch
    noise_shape = tf.shape(sequence)[1:]
    dtype = sequence.dtype

    def _scale(seq):
      factor = tf.random.normal(noise_shape, mean=1.0, stddev=self.sigma_s, dtype=dtype)
      return tf.multiply(seq, factor)

    def _jitter(seq):
      return seq + tf.random.normal(noise_shape, mean=0.0, stddev=self.sigma_j, dtype=dtype)

    def _flip(seq):
      return tf.negative(seq)

    aug_sequence = self._maybe_apply(sequence, _scale, "Applying scaling augmentation")
    aug_sequence = self._maybe_apply(aug_sequence, _jitter, "Applying jitter augmentation")
    aug_sequence = self._maybe_apply(aug_sequence, _flip, "Applying flip augmentation")

    return aug_sequence

In [None]:
def _score_feature_subset(subset, X_train, X_val, X_test, Y_train, Y_val, Y_test, class_weights=None):
  """
  Train a model on the given subset of features and return its test accuracy.
  :param subset: sorted list with the indexes (last axis) of the features to keep
  :param class_weights: optional, forwarded to compute_prediction
  :return: the accuracy of the trained model on the test set
  """
  # compute_prediction instantiates the wanted NN structure, compiles and fits the model,
  #   then returns the predictions on the test set.
  predictions = compute_prediction(X_train_f=X_train[:, :, subset],
                                   Y_train=Y_train,
                                   X_val_f=X_val[:, :, subset],
                                   Y_val=Y_val,
                                   X_test_f=X_test[:, :, subset],
                                   class_weights=class_weights)

  # Labels and predictions are compared through their argmax over the last axis
  subset_accuracy = accuracy_score(np.argmax(Y_test, axis=-1), np.argmax(predictions, axis=-1))
  print('Accuracy with features {} is {}'.format(subset, round(subset_accuracy, 4)))
  return subset_accuracy


def feature_selection(X_train, X_val, X_test, Y_train, Y_val, Y_test, class_weights=None):
  """
  This method, given a dataset, tries to find an optimal subset of the original
    features, selecting just the most significant ones.

  It alternates a forward step (try to add each feature not yet selected) and a
    backward step (try to drop each previously selected feature), training one
    model per candidate subset, and stops as soon as an iteration does not
    improve the best accuracy.
  NOTE(review): candidates are ranked by their accuracy on the test set, so the
    returned accuracy is an optimistic estimate; consider ranking on the
    validation set instead.
  :param X_train: original training dataset with all the features (features on axis 2)
  :param X_val: original validation dataset with all the features
  :param X_test: original test dataset with all the features
  :param Y_train: the labels of each sample in the training set
  :param Y_val: the labels of each sample in the validation set
  :param Y_test: the labels of each sample in the test set
  :param class_weights: optional, weights forwarded to every training run
  :return: the subset of features selected and the corresponding accuracy.
           Optionally, it's possible to modify the method in order to return
           also the optimal model already trained.
  """
  # Start with no feature selected
  optimal_features = []

  # All the available features
  features = list(range(X_train.shape[2]))
  old_accuracy = -1.0
  accuracy = 0.0

  # Parallel lists: analysed_models[k] obtained accuracies[k]
  accuracies = []
  analysed_models = []

  def evaluate(candidate):
    # Skip empty subsets (nothing to train on) and configurations already evaluated
    if not candidate or candidate in analysed_models:
      return
    accuracies.append(_score_feature_subset(candidate, X_train, X_val, X_test,
                                            Y_train, Y_val, Y_test,
                                            class_weights=class_weights))
    analysed_models.append(candidate)

  # Go on till there is an improvement in the accuracy or till when all the features have been added
  while accuracy > old_accuracy and len(optimal_features) <= len(features):
    print("Current optimal features are {} with accuracy of {}".format(optimal_features, round(accuracy, 4)))

    # Get the features that are still available (sorted for a reproducible order)
    available_feature = sorted(set(features) - set(optimal_features))

    # Save the data of the last iteration
    old_features = optimal_features.copy()
    old_accuracy = accuracy

    # Forward step: extend the current optimum with one more feature
    for feature in available_feature:
      evaluate(sorted(old_features + [feature]))

    # Nothing could be evaluated at all (e.g. the dataset has no features)
    if not accuracies:
      break

    # Best model among all the ones evaluated so far
    best_model_index = int(np.argmax(accuracies))

    # If there is an improvement in the accuracy
    if accuracies[best_model_index] >= old_accuracy:
      optimal_features = analysed_models[best_model_index].copy()
      accuracy = accuracies[best_model_index]

    # Backward step
    # Try to remove one of the old features and see if the performance increases
    best_model = analysed_models[best_model_index]
    for feature in old_features:
      # Start from the best configuration and drop one old feature
      evaluate([kept for kept in best_model if kept != feature])

    # Best model among all the ones evaluated so far
    best_model_index = int(np.argmax(accuracies))

    # If there is an improvement in the accuracy (max between the old one and the forward one)
    if accuracies[best_model_index] > max(old_accuracy, accuracy):
      optimal_features = analysed_models[best_model_index].copy()
      accuracy = accuracies[best_model_index]

  print('Optimal features are {} with an accuracy of {}'.format(optimal_features, round(accuracy,4)))
  # toDo: this method is easily extendable in order to return directly the best model trained
  return optimal_features, accuracy

In [None]:
def compute_prediction(X_train_f, Y_train, X_val_f, Y_val, X_test_f, class_weights=None,
                       batch_size=64, epochs=200):
  """
  This method instantiates the wanted NN structure, compiles and fits the model.
    Then it computes the prediction on the test set, returning the predictions.
  :param X_train_f: training dataset with only a subset of features
  :param Y_train: the labels of each sample in the training set
                  (its last axis size is used as the number of classes)
  :param X_val_f: validation dataset with only a subset of features
  :param Y_val: the labels of each sample in the validation set
  :param X_test_f: test dataset with only a subset of features
  :param class_weights: optional, weights to use during the training; forwarded
                        as the `class_weight` argument of model.fit
                        (None means no weighting)
  :param batch_size: the batch size used to fit the model
  :param epochs: the maximum number of epochs (early stopping may end sooner)
  :return: the predictions of the trained model on X_test_f
  """
  # Define the training parameters
  input_shape = X_train_f.shape[1:]
  classes = Y_train.shape[-1]

  # Build the model
  model = build_network(input_shape, classes)

  # Stop when the validation accuracy stalls (keeping the best weights) and
  #   halve the learning rate on shorter plateaus
  callbacks = [
      tfk.callbacks.EarlyStopping(monitor='val_accuracy', mode='max', patience=30, restore_best_weights=True),
      tfk.callbacks.ReduceLROnPlateau(monitor='val_accuracy', mode='max', patience=10, factor=0.5, min_lr=1e-5)
  ]

  # Fit the model
  model.fit(x=X_train_f,
            y=Y_train,
            batch_size=batch_size,
            epochs=epochs,
            class_weight=class_weights,
            verbose=0,
            validation_data=(X_val_f, Y_val),
            callbacks=callbacks)

  # Compute the prediction on the test set
  predictions = model.predict(X_test_f)
  return predictions