In [41]:
from matplotlib import pyplot as plt
import IPython.display as ipd
import tensorflow as tf
from tensorflow import keras

In [42]:
from dataset import get_datasets
from its_safoos import ITS

  from .autonotebook import tqdm as notebook_tqdm


In [43]:
# ---- Hyperparameters describing the model structure and the training run ----

# Data / label space
num_classes = 31  # number of output classes (cf. ds.get_labels())
sampled_frequencies = 129  # the number of frequency samples

# Optimisation
learning_rate = 0.001
weight_decay = 0.005
batch_size = 64
# NOTE(review): the earlier comment on this line ("use num_epochs=100, 10 is a
# test value") did not match the value below — confirm the intended epoch count.
num_epochs = 10000
# patch_size = 6  # Size of the patches to be extract from the input images
# num_patches = (image_size // patch_size) ** 2

# Transformer dimensions
projection_dim = 64
num_heads = 4
transformer_units = [projection_dim * 2, projection_dim]  # Size of the transformer layers
transformer_layers = 1

# Size of the dense layers of the final classifier.
# NOTE(review): 526 may be a typo for 512 — confirm.
mlp_head_units = [526, 256]


In [44]:
# Build the train / validation / test splits with get_datasets (imported from
# the local `dataset` module), using the configured batch_size and requesting
# the 'mfccs' feature type.
train, valid, test = get_datasets(batch_size=batch_size, type='mfccs')

  hf_names = hf_datasets.list_datasets()


In [45]:
# Optionally restore previously saved model weights (disabled by default).
# NOTE(review): this branch cannot work as written if enabled — it calls
# load_weights on the ITS class itself rather than on a model instance, and
# model_path is only assigned in a later cell. Presumably the intent is to call
# state_transformer.load_weights(model_path) after the model is built — confirm.
load_weights = False
if load_weights:
    ITS.load_weights(model_path)

In [None]:
import tensorflow as tf


class GatedMlpBlock(tf.keras.layers.Layer):
    """Gated feed-forward block.

    The input is projected twice to ``inner_dim``: once through a dense layer
    with the ``non_linearity`` activation and once through a plain dense layer.
    The two projections are multiplied element-wise and the product is mapped
    to ``outer_dim`` by a final dense layer.

    Args:
        inner_dim: Width of the two inner projections.
        outer_dim: Width of the output projection.
        non_linearity: Activation passed to the non-linear inner projection.
    """

    def __init__(self, inner_dim, outer_dim, non_linearity):
        super().__init__()
        # Activated branch.
        self.inner_dense_non_linear = tf.keras.layers.Dense(units=inner_dim, activation=non_linearity)
        # Linear branch, same width so the two can be multiplied element-wise.
        self.inner_dense_linear = tf.keras.layers.Dense(units=inner_dim)
        # Output projection.
        self.outer_dense = tf.keras.layers.Dense(units=outer_dim)

    def call(self, input_seq):
        """Return ``outer_dense(non_linear(input_seq) * linear(input_seq))``."""
        activated = self.inner_dense_non_linear(input_seq)
        linear = self.inner_dense_linear(input_seq)
        return self.outer_dense(activated * linear)


class RotaryPositionalEncoding(tf.keras.layers.Layer):
    """Rotary positional encoding applied over the last axis of a sequence.

    Consecutive feature pairs ``(2k, 2k + 1)`` at sequence position ``p`` are
    rotated by the angle ``p * theta_0 ** (-2k / projection_dim)``.

    Args:
        theta_0: Base of the per-pair frequency progression.
        projection_dim: Size of the feature axis of the inputs. Must be a
            positive even number because features are rotated in pairs.

    Raises:
        ValueError: If ``projection_dim`` is not a positive even number, or
            (at call time) if the statically known feature dimension of the
            input differs from ``projection_dim``.
    """

    def __init__(self, theta_0, projection_dim):
        super(RotaryPositionalEncoding, self).__init__()
        if projection_dim <= 0 or projection_dim % 2 != 0:
            raise ValueError(
                "RotaryPositionalEncoding requires a positive, even projection_dim "
                f"(features are rotated in pairs); got {projection_dim}."
            )
        self.projection_dim = projection_dim
        # Pair index for every feature: 0, 0, 1, 1, 2, 2, ...
        self.indices = tf.constant([(i // 2) for i in range(projection_dim)], dtype=tf.float32)
        # One angular frequency per feature, shape (projection_dim,).
        self.thetas = theta_0 ** (-2 * (self.indices / projection_dim))

    def call(self, input_seq):
        """Rotate ``input_seq`` of shape (batch, input_seq_size, projection_dim)."""
        # Fail early with a readable message instead of a broadcast error deep
        # inside the graph when the layer is fed un-projected features.
        feature_dim = input_seq.shape[-1]
        if feature_dim is not None and feature_dim != self.projection_dim:
            raise ValueError(
                f"RotaryPositionalEncoding was created with projection_dim={self.projection_dim} "
                f"but received inputs whose last dimension is {feature_dim}."
            )

        input_seq_shape = tf.shape(input_seq)
        batch_size = input_seq_shape[0]
        input_seq_size = input_seq_shape[1]

        # Positions 0..input_seq_size-1 as a column; broadcasting against the
        # (projection_dim,) frequencies yields (input_seq_size, projection_dim).
        positions = tf.range(0, input_seq_size, 1, dtype=tf.float32)
        linear_phase = tf.expand_dims(positions, axis=-1) * self.thetas

        # Cosine part of the rotation.
        phased_with_cos = input_seq * tf.math.cos(linear_phase)

        # Sine part: turn every pair (x0, x1) into (-x1, x0) by swapping the
        # pair members and multiplying by [-1, 1].
        shifted_input_seq = tf.reshape(input_seq, [batch_size, input_seq_size, -1, 2])
        shifted_input_seq = tf.roll(shifted_input_seq, shift=1, axis=-1)
        shifted_input_seq = shifted_input_seq * tf.constant([-1, 1], dtype=tf.float32)
        shifted_input_seq = tf.reshape(shifted_input_seq, [batch_size, input_seq_size, -1])
        phased_with_sin = tf.math.sin(linear_phase) * shifted_input_seq

        return phased_with_cos + phased_with_sin


class MultiQueryAttention(tf.keras.layers.Layer):
    """Attention from memory cells to an input sequence and to themselves.

    A single key projection and a single value projection are shared by all
    heads; each head has its own query projection. Every memory cell attends
    over all input cells plus itself (not over the other memory cells). Head
    outputs are concatenated on the feature axis and projected to ``proj_dim``.

    Args:
        num_heads: Number of query projections (heads).
        proj_dim: Output width of every projection in this layer.
        dropout: Accepted for interface compatibility.
            NOTE(review): this value is never used inside the layer — confirm
            whether attention dropout was intended.
        kernel_regularizer: Regularizer applied to every dense kernel.
    """

    def __init__(self, num_heads, proj_dim, dropout=0.0, kernel_regularizer=None):
        super().__init__()

        def _projection():
            # All projections in this layer share the same configuration.
            return tf.keras.layers.Dense(
                units=proj_dim,
                kernel_regularizer=kernel_regularizer,
            )

        # Shared key / value projections.
        self.key_layer = _projection()
        self.value_layer = _projection()
        # One query projection per head.
        self.query_layers = [_projection() for _ in range(num_heads)]
        # Projection applied to the concatenated head outputs.
        self.output_layer = _projection()

    def _compute_attn(self, query, input_keys, memory_keys, input_vals, memory_vals):
        """Single-head attention.

        With S memory cells and T input cells the original comments give the
        shapes as: query / memory_keys / memory_vals [B,S,d] and
        input_keys / input_vals [B,T,d]. Returns a [B,S,d] tensor.
        """
        # Score of memory cell i against itself -> [B,S,1].
        self_logit = tf.reduce_sum(query * memory_keys, axis=-1, keepdims=True)
        # Score of memory cell i against every input cell -> [B,S,T].
        input_logits = tf.matmul(query, input_keys, transpose_b=True)

        # Column 0 is the self score, columns 1..T the input scores -> [B,S,T+1].
        logits = tf.concat([self_logit, input_logits], axis=-1)
        scale = tf.math.sqrt(tf.cast(tf.shape(input_keys)[-1], tf.float32))
        weights = tf.nn.softmax(logits / scale, axis=-1)

        # Split the normalised weights back into their self / input parts.
        self_weight = weights[:, :, :1]
        input_weights = weights[:, :, 1:]

        from_inputs = tf.matmul(input_weights, input_vals)  # [B,S,d]
        from_self = self_weight * memory_vals  # [B,S,d]
        return from_inputs + from_self

    def call(self, input_seq, memory_cells):
        """Attend from ``memory_cells`` to ``input_seq`` and to the cells themselves."""
        input_keys = self.key_layer(input_seq)
        memory_keys = self.key_layer(memory_cells)
        input_vals = self.value_layer(input_seq)
        memory_vals = self.value_layer(memory_cells)

        queries = [query_layer(memory_cells) for query_layer in self.query_layers]
        head_outputs = [
            self._compute_attn(head_query, input_keys, memory_keys, input_vals, memory_vals)
            for head_query in queries
        ]
        return self.output_layer(tf.concat(head_outputs, axis=-1))
        

class StateTransformerBlock(tf.keras.layers.Layer):
    """Transformer-style block that updates state cells from an input sequence.

    Pipeline: attention (state cells query the input) -> residual add with the
    state -> layer norm -> dense(relu) -> gated MLP -> dropout -> residual add
    -> layer norm.

    Args:
        num_heads: Number of attention heads.
        projection_dim: Feature width of the state cells and of the output.
        inner_ff_dim: Width of the feed-forward part.
        dropout: Rate of the dropout applied to the feed-forward output; also
            forwarded to the attention layer.
        kernel_regularizer: Regularizer forwarded to the attention layer and
            to the inner dense layer.
    """

    def __init__(self, num_heads, projection_dim, inner_ff_dim, dropout=0.0, kernel_regularizer=None):
        super().__init__()
        # Plain configuration values.
        self.num_heads = num_heads
        self.projection_dim = projection_dim

        # Attention sub-block.
        self.attention = MultiQueryAttention(
            num_heads=num_heads,
            proj_dim=projection_dim,
            dropout=dropout,
            kernel_regularizer=kernel_regularizer,
        )
        self.add1 = tf.keras.layers.Add()
        self.layernorm_1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)

        # Feed-forward sub-block.
        self.inner_dense = tf.keras.layers.Dense(
            units=inner_ff_dim,
            kernel_regularizer=kernel_regularizer,
            activation="relu",
        )
        self.outer_dense = GatedMlpBlock(
            inner_dim=inner_ff_dim,
            outer_dim=projection_dim,
            non_linearity="relu",
        )
        self.ff_dropout = tf.keras.layers.Dropout(dropout)
        self.add2 = tf.keras.layers.Add()
        self.layernorm_2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)

    def call(self, state_seq, input_seq):
        """Return the updated state.

        Per the original comments, ``state_seq`` is
        (batch_size, num_of_state_cells, projection_dim) and ``input_seq`` is
        (batch_size, input_size, projection_dim); the result has the shape of
        ``state_seq``.
        """
        # The state cells act as queries over the input sequence.
        attended = self.attention(input_seq, state_seq)
        normed_state = self.layernorm_1(self.add1([attended, state_seq]))

        # Feed-forward path with a residual connection around it.
        ff_output = self.outer_dense(self.inner_dense(normed_state))
        ff_output = self.ff_dropout(ff_output)
        return self.layernorm_2(self.add2([ff_output, normed_state]))
    

class ITSRU(tf.keras.layers.Layer):
    """Recurrent unit whose state is a set of cells updated by transformer blocks.

    The input is consumed fold by fold along axis 1. For each fold three
    StateTransformerBlocks compute ``z``, ``r`` and a candidate state, which
    are combined with a GRU-style update rule.

    Args:
        num_heads: Forwarded to every StateTransformerBlock.
        num_state_cells: Number of state cells kept by the unit.
        projection_dim: Feature width of each state cell.
        inner_ff_dim: Forwarded to every StateTransformerBlock.
        initial_state_trainability: Whether the initial state weight is trainable.
        dropout: Forwarded to every StateTransformerBlock.
        kernel_regularizer: Forwarded to every StateTransformerBlock.
    """

    def __init__(
        self,
        num_heads,
        num_state_cells,
        projection_dim,
        inner_ff_dim,
        initial_state_trainability=False,
        dropout=0.0,
        kernel_regularizer=None,
    ):
        super(ITSRU, self).__init__()

        # Initial state weight, drawn from 'random_normal'. It is only learnable
        # when initial_state_trainability is True (the default is False).
        self.initial_state = self.add_weight(
            shape=(1, num_state_cells, projection_dim),
            initializer='random_normal',
            trainable=initial_state_trainability,
            name='initial_state'
        )
        # State TE layers: three identically configured blocks producing
        # z, r and the candidate state respectively.
        self.calc_z = StateTransformerBlock(
            num_heads=num_heads,
            projection_dim=projection_dim,
            inner_ff_dim=inner_ff_dim,
            dropout=dropout,
            kernel_regularizer=kernel_regularizer,
        )
        self.calc_r = StateTransformerBlock(
            num_heads=num_heads,
            projection_dim=projection_dim,
            inner_ff_dim=inner_ff_dim,
            dropout=dropout,
            kernel_regularizer=kernel_regularizer,
        )
        self.calc_current_state = StateTransformerBlock(
            num_heads=num_heads,
            projection_dim=projection_dim,
            inner_ff_dim=inner_ff_dim,
            dropout=dropout,
            kernel_regularizer=kernel_regularizer,
        )


    def set_initial_state_trainability(self, trainable):
        """Toggle trainability of the initial state weight.

        NOTE(review): this writes the private ``_trainable`` attribute of the
        variable; whether an already-compiled model picks the change up is not
        visible from this code — confirm.
        """
        self.initial_state._trainable = trainable


    def call(self, input_seq):
        """Run the recurrence over all folds and return every intermediate state.

        The stacked states are transposed so the batch axis comes first, i.e.
        the result is indexed as [batch, fold, state_cell, feature].
        """
        # Assume that input is of size [B,T,S,D] where B is the batch size, T is the number of time steps, S is the sequence length at each timestep, and D is the feature dimension
        # initialize the state sequence
        batch_size = tf.shape(input_seq)[0]
        # Replicate the initial state weight for the whole batch
        state_t = tf.tile(self.initial_state, [batch_size, 1, 1])

        # folds is a tensor, so inside a traced function the loop below relies
        # on AutoGraph converting it (see the autograph frames in the
        # notebook's traceback).
        folds = tf.shape(input_seq)[1]
        states = tf.TensorArray(
            tf.float32,
            dynamic_size=True,
            size=0
        )
        for fold in range(folds):
            curr_input_seq = input_seq[:, fold, :, :]
            # z and r are the outputs of StateTransformerBlock, which ends in a
            # LayerNormalization; no sigmoid is applied here, so they are not
            # restricted to [0, 1].
            z = self.calc_z(state_t, curr_input_seq)
            r = self.calc_r(state_t, curr_input_seq)
            # Candidate state computed from the r-scaled previous state.
            current_state = self.calc_current_state(r*state_t, curr_input_seq)
            # GRU-style blend of the previous and the candidate state.
            state_t = (1 - z)*state_t + z*current_state
            states = states.write(fold, state_t)#.mark_used()

        # stack() puts the fold axis first; swap it with the batch axis.
        return tf.transpose(
            states.stack(),
            [1, 0, 2, 3]
        )


class ITS(tf.keras.models.Model):
    """Sequence classifier built from stacked ITSRU recurrent units.

    The input is linearly projected to ``projection_dim``, zero-padded along
    the time axis to a multiple of ``input_seq_size``, rotary-position encoded,
    cut into folds of ``input_seq_size`` steps and passed through
    ``num_repeats`` ITSRU layers. The first state cell of the last fold is fed
    to a softmax classifier.

    Args:
        num_classes: Number of output classes.
        num_heads: Attention heads in every ITSRU.
        num_repeats: Number of stacked ITSRU layers.
        num_state_cells: State cells kept by every ITSRU.
        input_seq_size: Number of time steps per fold.
        projection_dim: Feature width after the input projection.
        inner_ff_dim: Feed-forward width inside every ITSRU.
        initial_state_trainability: Whether ITSRU initial states are trainable.
        dropout: Dropout rate forwarded to every ITSRU.
        kernel_regularizer: Regularizer for the input projection and the ITSRUs.
    """

    def __init__(
        self,
        num_classes,
        num_heads,
        num_repeats,
        num_state_cells,
        input_seq_size,
        projection_dim,
        inner_ff_dim,
        initial_state_trainability=False,
        dropout=0.0,
        kernel_regularizer=None,
    ):
        super(ITS, self).__init__()
        # Number of time steps grouped into one fold.
        self.input_seq_size = input_seq_size

        # Projects raw input features to projection_dim.
        self.encoding = tf.keras.layers.Dense(
            units=projection_dim,
            kernel_regularizer=kernel_regularizer,
        )

        self.rope = RotaryPositionalEncoding(
            theta_0=10000,
            projection_dim=projection_dim,
        )

        # Stacked ITS recurrent units.
        self.itsrus = [ITSRU(
            num_heads=num_heads,
            num_state_cells=num_state_cells,
            projection_dim=projection_dim,
            inner_ff_dim=inner_ff_dim,
            initial_state_trainability=initial_state_trainability,
            dropout=dropout,
            kernel_regularizer=kernel_regularizer,
        ) for _ in range(num_repeats)]

        # Disabled experiment: label token mixed with the final states.
        # self.label_token = self.add_weight(
        #     shape=(1, 1, projection_dim),
        #     initializer='random_normal',
        #     trainable=initial_state_trainability,
        #     name='initial_state'
        # )
        # self.mixer = StateTransformerBlock(
        #     num_heads=num_heads,
        #     projection_dim=projection_dim,
        #     inner_ff_dim=inner_ff_dim,
        #     dropout=dropout,
        #     kernel_regularizer=kernel_regularizer,
        # )

        self.classifier = tf.keras.layers.Dense(
            units=num_classes,
            activation="softmax",
        )

    def call(self, input_seq):
        """Classify ``input_seq`` of shape (batch_size, time_steps, feature_dim).

        Returns the softmax output of the classifier, one row per batch item.
        """
        input_seq = self.encoding(input_seq)

        fold_size = self.input_seq_size
        # Prefer the static length so padding/reshape keep static shapes; fall
        # back to the dynamic length when the time axis is unknown at trace
        # time (previously this raised a TypeError on `None / int`).
        time_steps = input_seq.shape[1]
        if time_steps is None:
            time_steps = tf.shape(input_seq)[1]

        # Integer ceil-division: number of folds needed to cover the sequence.
        folds = (time_steps + fold_size - 1) // fold_size
        padding = folds * fold_size - time_steps

        # Zero-pad the end of the time axis up to a whole number of folds.
        input_seq = tf.pad(
            input_seq,
            [[0, 0], [0, padding], [0, 0]]
        )
        # Positions are encoded over the whole padded sequence, before folding.
        input_seq = self.rope(input_seq)

        # (batch_size, folds, input_seq_size, projection_dim)
        input_seq = tf.reshape(
            input_seq,
            [-1, folds, fold_size, input_seq.shape[-1]]
        )

        # Pass the folded sequence through the stacked ITSRUs.
        x = input_seq
        for itsru in self.itsrus:
            x = itsru(x)

        # Disabled experiment: mix the last states with the label token.
        # label_token = tf.tile(self.label_token, [tf.shape(x)[0], 1, 1])
        # x = self.mixer(label_token, x[:, -1, 0, :])
        # x = tf.squeeze(x, axis=1)

        # Classify from the first state cell of the last fold.
        return self.classifier(x[:, -1, 0, :])

In [46]:
# Build the ITS model. These values are hard-coded here and differ from the
# hyperparameter cell above (e.g. 8 heads / projection_dim 32 here versus
# num_heads = 4 / projection_dim = 64 there).
state_transformer = ITS(
    num_classes=31,
    num_heads=8,
    num_repeats=2,
    num_state_cells=10,
    input_seq_size=31,
    projection_dim=32,
    inner_ff_dim=64,
    dropout=0.1,
    kernel_regularizer=tf.keras.regularizers.l2(0.01),
)

In [47]:
# Compile the model. weight_decay from the hyperparameter cell is passed
# explicitly; previously it was defined but never handed to AdamW, so the
# optimizer silently used its own default.
state_transformer.compile(
    optimizer=tf.keras.optimizers.AdamW(
        learning_rate=learning_rate,
        weight_decay=weight_decay,
    ),
    loss="categorical_crossentropy",
    metrics=["accuracy"],
)

# Checkpoint callback saving weights only, once per epoch. It is prepared here
# but not attached to fit() below (see the commented-out `callbacks`).
model_path = "./models/its_chkpnt/its_chkpnt.ckpt"
model_checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath=model_path,
    save_weights_only=True,
    save_freq="epoch",
    verbose=0,
)

# NOTE(review): epochs is hard-coded to 21 and does not use num_epochs from
# the hyperparameter cell — confirm which one is intended.
state_transformer_history = state_transformer.fit(
    train,
    validation_data=valid,
    epochs=21,
    # callbacks=[
    #     model_checkpoint_callback,
    # ],
)

Epoch 1/21


ValueError: in user code:

    File "/home/zuherj/miniconda3/envs/kws/lib/python3.9/site-packages/keras/src/engine/training.py", line 1338, in train_function  *
        return step_function(self, iterator)
    File "/home/zuherj/miniconda3/envs/kws/lib/python3.9/site-packages/keras/src/engine/training.py", line 1322, in step_function  **
        outputs = model.distribute_strategy.run(run_step, args=(data,))
    File "/home/zuherj/miniconda3/envs/kws/lib/python3.9/site-packages/keras/src/engine/training.py", line 1303, in run_step  **
        outputs = model.train_step(data)
    File "/home/zuherj/miniconda3/envs/kws/lib/python3.9/site-packages/keras/src/engine/training.py", line 1080, in train_step
        y_pred = self(x, training=True)
    File "/home/zuherj/miniconda3/envs/kws/lib/python3.9/site-packages/keras/src/utils/traceback_utils.py", line 70, in error_handler
        raise e.with_traceback(filtered_tb) from None
    File "/tmp/__autograph_generated_filexhjiflb7.py", line 14, in tf__call
        input_seq = ag__.converted_call(ag__.ld(self).rope, (ag__.ld(input_seq),), None, fscope)
    File "/tmp/__autograph_generated_fileba7ccgq3.py", line 16, in tf__call
        linear_phase = ag__.ld(seq_indices) * ag__.ld(self).thetas

    ValueError: Exception encountered when calling layer 'its' (type ITS).
    
    in user code:
    
        File "/home/zuherj/codehub/stable/active/kws/its_safoos.py", line 344, in call  *
            input_seq = self.rope(input_seq)
        File "/home/zuherj/miniconda3/envs/kws/lib/python3.9/site-packages/keras/src/utils/traceback_utils.py", line 70, in error_handler  **
            raise e.with_traceback(filtered_tb) from None
        File "/tmp/__autograph_generated_fileba7ccgq3.py", line 16, in tf__call
            linear_phase = ag__.ld(seq_indices) * ag__.ld(self).thetas
    
        ValueError: Exception encountered when calling layer 'rotary_positional_encoding_4' (type RotaryPositionalEncoding).
        
        in user code:
        
            File "/home/zuherj/codehub/stable/active/kws/its_safoos.py", line 48, in call  *
                linear_phase = seq_indices * self.thetas
        
            ValueError: Dimensions must be equal, but are 39 and 32 for '{{node its/rotary_positional_encoding_4/mul}} = Mul[T=DT_FLOAT](its/rotary_positional_encoding_4/Tile, its/rotary_positional_encoding_4/mul/y)' with input shapes: [124,39], [32].
        
        
        Call arguments received by layer 'rotary_positional_encoding_4' (type RotaryPositionalEncoding):
          • input_seq=tf.Tensor(shape=(None, 124, 39), dtype=float32)
    
    
    Call arguments received by layer 'its' (type ITS):
      • input_seq=tf.Tensor(shape=(None, 124, 39), dtype=float32)


In [None]:
# Sweep the number of ITS state cells: train one model per setting for
# 21 epochs and record its per-epoch validation accuracy in results.csv.
import csv
import matplotlib.pyplot as plt

# Loop-invariant checkpoint location (the callback itself is not attached to
# fit(); see the commented-out `callbacks` below).
model_path = "./models/its_chkpnt/its_chkpnt.ckpt"

results = {}
for num_state_cells in [1, 4, 8, 12]:
    state_transformer = ITS(
        num_classes=31,
        num_repeats=2,
        num_heads=8,
        num_state_cells=num_state_cells,
        input_seq_size=31,
        projection_dim=32,
        inner_ff_dim=64,
        dropout=0.1,
        kernel_regularizer=tf.keras.regularizers.l2(0.01),
    )

    # Pass weight_decay explicitly; it was defined in the hyperparameter cell
    # but previously never handed to the optimizer.
    state_transformer.compile(
        optimizer=tf.keras.optimizers.AdamW(
            learning_rate=learning_rate,
            weight_decay=weight_decay,
        ),
        loss="categorical_crossentropy",
        metrics=["accuracy"],
    )

    model_checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
        filepath=model_path,
        save_weights_only=True,
        save_freq="epoch",
        verbose=0,
    )

    state_transformer_history = state_transformer.fit(
        train,
        validation_data=valid,
        epochs=21,
        # callbacks=[
        #     model_checkpoint_callback,
        # ],
    )
    results[num_state_cells] = state_transformer_history.history['val_accuracy']

# Write one row per configuration: num_state_cells followed by the per-epoch
# validation accuracies. No header row is written.
# newline='' is what the csv module requires for files it writes to.
with open('results.csv', 'w', newline='') as f:
    csv_writer = csv.writer(f)
    for key, values in results.items():
        csv_writer.writerow([key] + values)


In [None]:
# Read results.csv: the first row becomes model_names, every remaining row is
# collected into data (both stay empty for an empty file).
import csv

with open('results.csv', 'r') as f:
    csv_reader = csv.reader(f)
    model_names = next(csv_reader, [])
    data = list(csv_reader)
        

In [None]:
def get_file_lines(path="results.csv"):
    """Read a CSV file and split it into its first row and the remaining rows.

    Args:
        path: CSV file to read. Defaults to "results.csv" in the current
            working directory.

    Returns:
        A ``(model_names, data)`` tuple: ``model_names`` is the first row as a
        list of strings (``[]`` for an empty file) and ``data`` is the list of
        all following rows.

    Raises:
        OSError: If the file cannot be opened.
    """
    # newline='' lets the csv module handle line endings itself, as its
    # documentation requires for file objects.
    with open(path, 'r', newline='') as f:
        reader = csv.reader(f)
        model_names = next(reader, [])
        data = list(reader)

    return model_names, data

In [None]:
# Load the first row (model_names) and the remaining rows (data) of results.csv.
model_names, data = get_file_lines()

In [None]:
# Quick sanity check of what was parsed from the CSV file.
print(model_names)
print(data)

In [None]:
# Look up the position of the entry named "r=<repeats>,s=<state_cells>" in
# model_names; list.index raises ValueError when no such entry exists.
# NOTE(review): none of the cells visible in this notebook writes names of
# this form to results.csv — confirm the expected CSV layout.
repeats = 1
state_cells = 4
model_names.index(f"r={repeats},s={state_cells}")

In [None]:
from itertools import product

import csv

In [None]:
# Write a single row holding every (a, b) pair of [1, 2, 3] x [4, 5, 6].
# NOTE: mode 'w' truncates results.csv, so any existing contents are replaced.
# The `with` block guarantees the handle is flushed and closed even if this is
# the only cell that gets run; newline='' is what the csv module requires for
# files it writes to.
with open('results.csv', 'w', newline='') as f:
    csv_writer = csv.writer(f)
    csv_writer.writerow(list(product([1, 2, 3], [4, 5, 6])))

In [None]:
# Close the results.csv handle bound to `f` in the previous cell.
f.close()