In [None]:
import time
import numpy as np
from numpy import mean, std, dstack
import matplotlib.pyplot as plt
from scipy.io import loadmat, savemat
from mlxtend.plotting import plot_confusion_matrix
from sklearn.metrics import confusion_matrix, matthews_corrcoef
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras import layers
from tensorflow.keras.layers import Dense, Dropout, Input, BatchNormalization, Conv1D, MaxPooling1D, Reshape, Permute
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.regularizers import l2
import xlsxwriter
from tensorflow.keras.callbacks import EarlyStopping, History




In [16]:
# Revised Time2Vec layer to support 3D input (batch, time, features)
class Time2Vec(layers.Layer):
    def __init__(self, kernel_size=1):
        super(Time2Vec, self).__init__()
        self.k = kernel_size
    
    def build(self, input_shape):
        # input_shape: (batch, time, features)
        self.w0 = self.add_weight(name="w0", shape=(input_shape[-1],), initializer="uniform", trainable=True)
        self.b0 = self.add_weight(name="b0", shape=(1,), initializer="uniform", trainable=True)
        self.w = self.add_weight(name="w", shape=(input_shape[-1], self.k), initializer="uniform", trainable=True)
        self.b = self.add_weight(name="b", shape=(self.k,), initializer="uniform", trainable=True)
    
    def call(self, inputs):
        # inputs shape: (batch, time, features)
        v1 = inputs * self.w0 + self.b0  # Linear component; broadcasting applies elementwise multiplication
        # Use tensordot to compute (batch, time, k)
        v2 = tf.math.sin(tf.tensordot(inputs, self.w, axes=[[2],[0]]) + self.b)
        return tf.concat([v1, v2], axis=-1)
    
    def get_config(self):
        config = super().get_config()
        config.update({"kernel_size": self.k})
        return config

In [None]:

# Revised TimesBlock that reshapes 1D to 2D and then reassembles a 3D sequence.
class TimesBlock(layers.Layer):
    def __init__(self, period=24, filters=32, kernel_sizes=[(3,3), (5,5), (7,7)], **kwargs):
        super(TimesBlock, self).__init__(**kwargs)
        self.period = period  # Fixed period; you can later adapt this using FFT
        self.filters = filters
        self.kernel_sizes = kernel_sizes
        # Create an inception-like module: one Conv2D per kernel size
        self.conv_layers = [
            layers.Conv2D(filters, kernel_size, padding='same', activation='relu')
            for kernel_size in kernel_sizes
        ]
        
    def call(self, inputs):
        # inputs: shape (batch, timesteps, channels)
        batch_size = tf.shape(inputs)[0]
        t = tf.shape(inputs)[1]
        c = tf.shape(inputs)[2]
        
        # Compute required padding so that t is divisible by self.period
        pad_len = tf.math.mod(-t, self.period)
        padded = tf.pad(inputs, [[0, 0], [0, pad_len], [0, 0]])
        new_t = tf.shape(padded)[1]
        
        # Reshape to 2D: (batch, new_t // period, period, channels)
        x_2d = tf.reshape(padded, (batch_size, new_t // self.period, self.period, c))
        
        # Process the 2D tensor with multiple 2D convolutional layers (inception style)
        conv_outputs = [conv(x_2d) for conv in self.conv_layers]
        # Concatenate along the channel dimension
        x_conv = tf.concat(conv_outputs, axis=-1)
        
        # Instead of pooling, reshape the 2D feature map back to a 3D sequence.
        # x_conv shape: (batch, H, W, channels_conv) with H=new_t//period, W=period.
        H = tf.shape(x_conv)[1]
        W = tf.shape(x_conv)[2]
        channels_conv = tf.shape(x_conv)[3]
        # Reshape to (batch, H*W, channels_conv)
        x_flat = tf.reshape(x_conv, (batch_size, H * W, channels_conv))
        # Truncate to original timesteps t (if padded, discard extra steps)
        x_out = x_flat[:, :t, :]
        return x_out

    def get_config(self):
        config = super(TimesBlock, self).get_config()
        config.update({
            "period": self.period,
            "filters": self.filters,
            "kernel_sizes": self.kernel_sizes
        })
        return config



In [17]:

# Keeping the transformer_encoder as is (for further processing)
def transformer_encoder(inputs, head_size, num_heads, ff_dim, dropout=0):
    # Attention and Normalization
    x = layers.MultiHeadAttention(
        key_dim=head_size, num_heads=num_heads, dropout=dropout
    )(inputs, inputs)
    x = layers.Dropout(dropout)(x)
    x = layers.LayerNormalization(epsilon=1e-6)(x)
    res = x + inputs

    # Feed Forward Part
    x = layers.Conv1D(filters=ff_dim, kernel_size=1, activation="relu")(res)
    x = layers.Dropout(dropout)(x)
    x = layers.Conv1D(filters=inputs.shape[-1], kernel_size=1)(x)
    x = layers.LayerNormalization(epsilon=1e-6)(x)
    return x + res


In [None]:
# Revised build_model function integrating TimesBlock and Time2Vec.
def build_model(
    input_shape,
    head_size,
    num_heads,
    ff_dim,
    num_classes,
    num_transformer_blocks,
    mlp_units,
    dropout=0,
    num_times_blocks=2,
    period=24,
    filters=32,
    mlp_dropout=0.4,
):
    inputs = Input(shape=input_shape)
    # Initial 1D convolution to extract features
    x = Conv1D(filters=64, kernel_size=8, activation='relu', kernel_regularizer=l2(0.01))(inputs)
    x = MaxPooling1D(pool_size=17, strides=9)(x)
    x = Dropout(0.3)(x)
    x = BatchNormalization()(x)
    
    # Process through TimesBlock layers to transform 1D to 2D and reshape back to 3D
    for _ in range(num_times_blocks):
        x = TimesBlock(period=period, filters=filters)(x)
    
    # Apply Time2Vec on the resulting 3D sequence
    x = Time2Vec()(x)
    
    # Optionally apply transformer encoder blocks for further processing
    for _ in range(num_transformer_blocks):
        x = transformer_encoder(x, head_size, num_heads, ff_dim, dropout)
    
    # Global average pooling and fully-connected layers for classification
    x = layers.GlobalAveragePooling1D()(x)
    for dim in mlp_units:
        x = Dense(dim, activation="relu")(x)
        x = Dropout(mlp_dropout)(x)
    outputs = Dense(num_classes, activation="softmax")(x)
    return Model(inputs, outputs)

In [None]:
def evaluate_model(trainX, trainy, testX, testy, sujet):
    verbose, epochs, batch_size = 1, 150, 64
    n_timesteps = trainX.shape[1]
    d_model = trainX.shape[2]
    n_outputs = trainy.shape[1]

    print("Train Data Shape:", trainX.shape)
    print("Test Data Shape:", testX.shape)

    model = build_model(
        input_shape=(n_timesteps, d_model),
        num_classes=n_outputs,
        mlp_units=[128],
        mlp_dropout=0.4,
        head_size=128,
        num_heads=12,
        ff_dim=24,
        num_transformer_blocks=1,
        dropout=0.25,
    )
    model.summary()

    model.compile(
        loss='categorical_crossentropy',
        optimizer=Adam(learning_rate=0.0001),
        metrics=['accuracy']
    )

    start = time.time()
    model.fit(trainX, trainy, epochs=epochs, batch_size=batch_size, verbose=verbose, callbacks=[history])
    train_time = time.time() - start

    model.save(fr'G:\former students work\Code source\DB2_models\DB2_s{str(sujet)}_Transformer_model.h5')

    loss_train, accuracy_train = model.evaluate(trainX, trainy, batch_size=batch_size, verbose=1)
    start = time.time()
    loss_test, accuracy_test = model.evaluate(testX, testy, batch_size=batch_size, verbose=1)
    test_time = time.time() - start

    y_pred = np.argmax(model.predict(testX), axis=-1)
    testy_indices = [np.argmax(y) for y in testy]

    return loss_train, accuracy_train, loss_test, accuracy_test, y_pred, testy_indices, train_time, test_time

def summarize_results(scores, losses):
    m, s = np.mean(scores), np.std(scores)
    mL, sL = np.mean(losses), np.std(losses)
    print(f'\nAccuracy: {m:.5f} (+/-{s:.5f})')
    print(f'Loss: {mL:.5f} (+/-{sL:.5f})')

def run_my_experiment(sujet):
    data = loadmat(fr"G:\former students work\Code source\DB2_mat\DB2_s{str(sujet)}_E1_A1_150_100_N.mat")
    train_data = data['train_data']
    train_labels = data['train_labels']
    test_data = data['test_data']
    test_labels = data['test_labels']

    scores = []
    losses = []

    loss_train, score_train, loss_test, score_test, y_pred, testy, train_time, test_time = evaluate_model(
        train_data, train_labels, test_data, test_labels, sujet
    )

    print(f'>#{sujet}:')
    print(f'  train accuracy: {score_train:.5f}')
    print(f'  train loss    : {loss_train:.5f}')
    print(f'  test accuracy: {score_test:.5f}')
    print(f'  test loss    : {loss_test:.5f}')
    scores.append(score_test)
    losses.append(loss_test)
    summarize_results(scores, losses)

    return loss_train, score_train, loss_test, score_test, y_pred, testy, train_time, test_time

In [20]:
# Main execution
globel_perd = []
globel_class = []
globel_perd1 = []
globel_class1 = []

workbook = xlsxwriter.Workbook(fr'G:\former students work\Code source\DB2_results\DB2_xlsx\Trans_rslt.xlsx')
worksheet1 = workbook.add_worksheet('Subjects informations')

row = 0
worksheet1.write(row, 0, 'Subject')
worksheet1.write(row, 1, 'Train_time')
worksheet1.write(row, 2, 'Test_time')
worksheet1.write(row, 3, 'Train_acc')
worksheet1.write(row, 4, 'Train_loss')
worksheet1.write(row, 5, 'Test_acc')
worksheet1.write(row, 6, 'Test_loss')
worksheet1.write(row, 7, 'MCC')

history = History()
for i in range(1, 41):
    loss_train, score_train, loss_test, score_test, y_pred, testy, train_time, test_time = run_my_experiment(i)

    globel_perd.append(y_pred)
    globel_class.append(testy)
    globel_perd1.extend(y_pred)
    globel_class1.extend(testy)

    mcc = matthews_corrcoef(testy, y_pred)
    mat = confusion_matrix(testy, y_pred)
    cfm_plot, ax = plot_confusion_matrix(mat, figsize=(10, 10), show_normed=True, show_absolute=False)
    cfm_plot.savefig(fr'G:\former students work\Code source\DB2_results\DB2_confusion_matrix\DB2_s{str(i)}trans_confusion_matrix.png')
    plt.close(cfm_plot)

    fig, axarr = plt.subplots(2, 1, figsize=(12, 10))
    start_renge = (i-1) * 150
    end_renge = i * 150
    axarr[0].plot(range(150), history.history['accuracy'][start_renge:end_renge], label='train accuracy')
    axarr[0].set_xlabel('Number of Epochs', fontsize=18)
    axarr[0].set_ylabel('Accuracy', fontsize=18)
    axarr[0].legend()
    axarr[1].plot(range(150), history.history['loss'][start_renge:end_renge], label='train loss')
    axarr[1].set_xlabel('Number of Epochs', fontsize=18)
    axarr[1].set_ylabel('Loss', fontsize=18)
    axarr[1].legend()
    plt.tight_layout()
    plt.show()
    fig.savefig(fr'G:\former students work\Code source\DB2_results\DB2_graphs\DB2_s{str(i)}trans_graphe.png')
    plt.close(fig)

    worksheet1.write(i, 0, f'Sujet {i}')
    worksheet1.write(i, 1, train_time)
    worksheet1.write(i, 2, test_time)
    worksheet1.write(i, 3, score_train)
    worksheet1.write(i, 4, loss_train)
    worksheet1.write(i, 5, score_test)
    worksheet1.write(i, 6, loss_test)
    worksheet1.write(i, 7, mcc)

workbook.close()

new_data = {'pred_labels': globel_perd, 'class_labels': globel_class}
savemat(fr'G:\former students work\Code source\DB2_results\trans_global_predection.mat', new_data)

mat = confusion_matrix(globel_class1, globel_perd1)
cfm_plot, ax = plot_confusion_matrix(mat, figsize=(10, 10), show_normed=True, show_absolute=False)
cfm_plot.savefig(fr'G:\former students work\Code source\DB2_results\trans_global_confusion_matrix.png')
plt.close(cfm_plot)


Train Data Shape: (1993, 300, 12)
Test Data Shape: (969, 300, 12)


Epoch 1/150
[1m32/32[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m19s[0m 217ms/step - accuracy: 0.0865 - loss: 3.0622
Epoch 2/150
[1m25/32[0m [32m━━━━━━━━━━━━━━━[0m[37m━━━━━[0m [1m1s[0m 224ms/step - accuracy: 0.1528 - loss: 2.7733

KeyboardInterrupt: 