In [1]:
#link to tutorial: https://keras.io/examples/timeseries/timeseries_classification_transformer/

In [2]:
import numpy as np 

In [3]:
# creating a file read function 
def read_data_file(filename):
  """Load a tab-separated dataset and split it into features and labels.

  Each row holds the class label in the first column, followed by the
  values of one time series.

  Args:
    filename: Anything ``np.loadtxt`` accepts as a source (the notebook
      passes a URL string; a local path or file-like object also works).

  Returns:
    A tuple ``(x, y)``: ``x`` is a 2-D float array with one row per sample
    (all columns except the first) and ``y`` is a 1-D integer array built
    from the first column.
  """
  # ndmin=2 keeps a single-row file two-dimensional; without it np.loadtxt
  # returns a 1-D array and the column slicing below raises IndexError.
  dataset = np.loadtxt(filename, delimiter="\t", ndmin=2)
  labels = dataset[:, 0]
  features = dataset[:, 1:]
  return features, labels.astype(int)

In [4]:
# Base URL of the raw FordA files; the train/test file names are appended to it.
dataset_url = (
  "https://raw.githubusercontent.com/hfawaz/cd-diagram/master/FordA/"
)

In [5]:
# load the FordA train and test splits from their remote TSV files
x_training_set, y_training_set = read_data_file(f"{dataset_url}FordA_TRAIN.tsv")
x_test_set, y_test_set = read_data_file(f"{dataset_url}FordA_TEST.tsv")

In [6]:
# Sanity check: display the training features to confirm the file was parsed
# (the notebook shows an abbreviated repr of the 2-D array).
x_training_set

array([[-0.79717168, -0.66439208, -0.37301463, ..., -0.66439208,
        -1.0737958 , -1.5643427 ],
       [ 0.80485472,  0.63462859,  0.37347448, ..., -0.71488505,
        -0.56044294, -0.31908642],
       [ 0.7279851 ,  0.11128392, -0.49912439, ...,  0.39446303,
         0.33940042,  0.25539062],
       ...,
       [-0.57005428, -0.33316523, -0.29351853, ..., -1.3937145 ,
        -0.94273327, -0.27072168],
       [ 2.0067321 ,  2.0791499 ,  2.0220362 , ..., -0.43214504,
        -0.44123126, -0.28070891],
       [-0.12524091, -0.32536268, -0.48823697, ...,  0.55576053,
         0.57445102,  0.57311598]])

In [7]:
# Inspect the training labels (still encoded as -1 / 1 at this point).
y_training_set

array([-1,  1, -1, ..., -1,  1, -1])

In [8]:
# Inspect the test features to confirm they were parsed as well.
x_test_set

array([[-0.14040239,  0.17164128,  0.30204415, ..., -0.69040244,
        -0.97659635, -0.79426313],
       [ 0.33403756,  0.32225332,  0.45384397, ..., -1.0417721 ,
        -1.1596145 , -1.3756589 ],
       [ 0.71668608,  0.74436655,  0.72591291, ..., -3.6752806 ,
        -4.1366217 , -4.3396117 ],
       ...,
       [ 0.71008362,  0.59397882,  0.3818858 , ..., -0.12655282,
        -0.11782239, -0.18909413],
       [ 0.00684706, -0.14062427, -0.27059412, ..., -1.0007084 ,
        -1.0841075 , -1.109963  ],
       [-0.54135529, -0.24172258,  0.10074086, ..., -0.09362504,
        -0.90080431, -1.778341  ]])

In [9]:
# Inspect the test labels (still encoded as -1 / 1 at this point).
y_test_set

array([-1, -1, -1, ...,  1,  1,  1])

In [10]:
# append a trailing feature axis of size 1 so that both sets have the shape
# (samples, time steps, 1)
x_training_set = x_training_set.reshape(
  x_training_set.shape[0], x_training_set.shape[1], 1
)
x_test_set = x_test_set.reshape(
  x_test_set.shape[0], x_test_set.shape[1], 1
)

In [11]:
# count the distinct label values present in the training labels
number_of_classes = np.unique(y_training_set).size

In [12]:
# Shuffle the training samples. fit() is later called with validation_split=0.2,
# and Keras takes the validation samples from the end of the arrays, so the order
# produced here decides the train/validation split. A seeded generator keeps that
# split identical after a kernel restart.
shuffle = np.random.default_rng(42).permutation(len(x_training_set))
x_training_set = x_training_set[shuffle]
y_training_set = y_training_set[shuffle]

# Remap the -1 label to 0 so the labels are the class indices 0 and 1 expected
# by the sparse categorical loss used at compile time.
y_training_set[y_training_set == -1] = 0
y_test_set[y_test_set == -1] = 0

In [13]:
# the data is ready for training 
# model development time
# model input is a tensor of shape (batch_size, sequence_length, features), where sequence_length is the number of time steps and features is the number of values per time step (1 for this univariate series)
from tensorflow import keras
from tensorflow.keras import layers 

In [15]:
# the projection layers are implemented using keras.layers.Conv1D

# creating the model function for encoding data
def transformer_encoder(inputs, head_size, num_heads, ff_dim, dropout=0):
  """Apply one transformer encoder block to ``inputs``.

  The block is a self-attention stage followed by a position-wise
  feed-forward stage; each stage is wrapped in a residual connection.

  Args:
    inputs: Input tensor; its last dimension sets the block's output width.
    head_size: ``key_dim`` passed to ``MultiHeadAttention``.
    num_heads: Number of attention heads.
    ff_dim: Number of filters of the hidden kernel-size-1 convolution.
    dropout: Dropout rate used inside attention and after each stage.

  Returns:
    A tensor with the same last dimension as ``inputs``.

  NOTE(review): the recorded model summary shows a feature dimension of 1.
  Layer normalization across a single feature would collapse to its learned
  offset -- confirm that this is intended.
  """
  # --- self-attention stage (query and value are both `inputs`) ---
  self_attention = layers.MultiHeadAttention(
    key_dim=head_size, num_heads=num_heads, dropout=dropout
  )
  attended = self_attention(inputs, inputs)
  attended = layers.Dropout(dropout)(attended)
  attended = layers.LayerNormalization(epsilon=1e-6)(attended)
  res = attended + inputs

  # --- feed-forward stage: expand to ff_dim, then project back ---
  hidden = layers.Conv1D(filters=ff_dim, kernel_size=1, activation="relu")(res)
  hidden = layers.Dropout(dropout)(hidden)
  projected = layers.Conv1D(filters=inputs.shape[-1], kernel_size=1)(hidden)
  projected = layers.LayerNormalization(epsilon=1e-6)(projected)
  return projected + res

In [16]:
# to build the model, multiple transformer encoders can be stacked together and as the last layer add a multi-layer perceptron classifier.
# the output tensor from the transformer encoder has to be reduced to a vector of features for each data point
# you can achieve that by using a pooling layer.

In [19]:
def build_model(input_shape, head_size, num_heads, ff_dim, num_transformer_blocks, mlp_units, dropout=0, mlp_dropout=0, num_classes=None):
  """Build the transformer-based time-series classifier.

  Stacks ``num_transformer_blocks`` encoder blocks, pools the result, and
  finishes with a dense MLP head and a softmax output layer.

  Args:
    input_shape: Shape of one sample (without the batch dimension).
    head_size: ``key_dim`` forwarded to every encoder block.
    num_heads: Number of attention heads per encoder block.
    ff_dim: Hidden width of each encoder block's feed-forward stage.
    num_transformer_blocks: How many encoder blocks to stack.
    mlp_units: Iterable with the width of each dense layer in the head.
    dropout: Dropout rate inside the encoder blocks.
    mlp_dropout: Dropout rate after each dense layer of the head.
    num_classes: Width of the softmax output. When omitted, the
      notebook-level ``number_of_classes`` is used, which is what the
      function relied on implicitly before this parameter existed.

  Returns:
    An uncompiled ``keras.Model``.
  """
  # Fall back to the notebook-level class count so existing calls keep working.
  if num_classes is None:
    num_classes = number_of_classes

  model_inputs = keras.Input(shape=input_shape)
  x = model_inputs
  for _ in range(num_transformer_blocks):
    x = transformer_encoder(x, head_size, num_heads, ff_dim, dropout)

  # NOTE(review): with data_format="channels_first" the pooling averages over
  # the last axis, so the head appears to receive one value per time step --
  # confirm this is the intended reduction.
  x = layers.GlobalAveragePooling1D(data_format="channels_first")(x)
  for dim in mlp_units:
    x = layers.Dense(dim, activation="relu")(x)
    x = layers.Dropout(mlp_dropout)(x)

  outputs = layers.Dense(num_classes, activation="softmax")(x)
  return keras.Model(model_inputs, outputs)


In [None]:
# build, train and evaluate the transformer classifier
input_shape = x_training_set.shape[1:]

transformer_model = build_model(
  input_shape,
  head_size=256,
  num_heads=4,
  ff_dim=4,
  num_transformer_blocks=4,
  mlp_units=[128],
  mlp_dropout=0.4,
  dropout=0.25,
)

transformer_model.compile(
  loss="sparse_categorical_crossentropy",
  optimizer=keras.optimizers.Adam(learning_rate=1e-4),
  metrics=["sparse_categorical_accuracy"],
)

transformer_model.summary()

# early stopping with a patience of 10 epochs; the best weights seen are
# restored when training halts
callbacks = [keras.callbacks.EarlyStopping(patience=10, restore_best_weights=True)]

transformer_model.fit(
  x_training_set,
  y_training_set,
  validation_split=0.2,
  epochs=200,
  batch_size=64,
  callbacks=callbacks,
)

transformer_model.evaluate(x_test_set, y_test_set, verbose=1)

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_2 (InputLayer)           [(None, 500, 1)]     0           []                               
                                                                                                  
 multi_head_attention_4 (MultiH  (None, 500, 1)      7169        ['input_2[0][0]',                
 eadAttention)                                                    'input_2[0][0]']                
                                                                                                  
 dropout_9 (Dropout)            (None, 500, 1)       0           ['multi_head_attention_4[0][0]'] 
                                                                                                  
 layer_normalization_8 (LayerNo  (None, 500, 1)      2           ['dropout_9[0][0]']          