In [3]:
from keras.models import Sequential, Model
from keras.layers import GRU, Dense, TimeDistributed, Lambda, Flatten, Dropout, Conv1D, BatchNormalization, ReLU, MaxPooling1D
from keras.layers import Input, Reshape, concatenate
from keras.optimizers import Adam
import keras.backend as K
from keras.regularizers import l2

2024-05-27 20:04:21.982857: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: SSE4.1 SSE4.2, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [4]:
class BaseGRU:
    def __init__(self, input_shape, learning_rate, dropout, recurrent_dropout, 
                 kernel_regularizer, recurrent_regularizer):
        self.input_shape = input_shape
        self.learning_rate = learning_rate
        self.dropout = dropout
        self.recurrent_dropout = recurrent_dropout
        self.kernel_regularizer = kernel_regularizer
        self.recurrent_regularizer = recurrent_regularizer
        self.model = self.create_model()

    def create_model(self):
        model = Sequential()
        model.add(GRU(20, dropout=self.dropout, recurrent_dropout=self.recurrent_dropout, return_sequences=True, input_shape=self.input_shape, 
                      kernel_regularizer=self.kernel_regularizer, recurrent_regularizer=self.recurrent_regularizer))
        model.add(TimeDistributed(Dense(500, activation='tanh', kernel_regularizer=self.kernel_regularizer)))
        model.add(TimeDistributed(Dense(1, activation='sigmoid', kernel_regularizer=self.kernel_regularizer)))
        model.add(Flatten())

        # Custom MIL aggregation layer using max functions
        def mil_aggregation(x):
            return K.max(x, axis=1, keepdims=True)

        model.add(Lambda(mil_aggregation))
        model.compile(optimizer=Adam(learning_rate=self.learning_rate), loss='binary_crossentropy', metrics=['accuracy'])
        return model

    def summary(self):
        return self.model.summary()

    def fit(self, x, y, epochs=10, batch_size=32, validation_data=None):
        return self.model.fit(x, y, epochs=epochs, batch_size=batch_size, validation_data=validation_data)

    def evaluate(self, x, y):
        return self.model.evaluate(x, y)

    def predict(self, x):
        return self.model.predict(x)

    def save(self, filepath):
        self.model.save(filepath)

In [5]:
class MultiHeadCnnRnn:
    def __init__(self, input_shape, kernel_sizes, filters, learning_rate, weight_decay, 
                 dropout, recurrent_dropout, kernel_regularizer, recurrent_regularizer):
        self.input_shape = input_shape
        self.kernel_sizes = kernel_sizes
        self.filters = filters
        self.learning_rate = learning_rate
        self.weight_decay = weight_decay
        self.dropout = dropout
        self.recurrent_dropout = recurrent_dropout
        self.kernel_regularizer = kernel_regularizer
        self.recurrent_regularizer = recurrent_regularizer
        self.model = self.create_model()

    def create_model(self):
        input_layer = Input(shape=self.input_shape)

        # Multi-Head CNN processing
        cnn_outputs = []
        print(f'Feature amount: {self.input_shape[1]}')
        for i in range(self.input_shape[1]):
            head = Reshape((self.input_shape[0], 1))(input_layer[:, :, i:i+1])
            for j in range(len(self.kernel_sizes)):
                head = Conv1D(filters=self.filters[j], kernel_size=self.kernel_sizes[j], activation='relu',
                              kernel_regularizer=l2(self.weight_decay))(head)
                head = BatchNormalization()(head)
            head = Conv1D(filters=1, kernel_size=1, padding='same', activation='sigmoid',
                          kernel_regularizer=l2(self.weight_decay))(head)
            cnn_outputs.append(head)
        concatenated = concatenate(cnn_outputs)
        
        # Sequential GRU processing from BaseGRU configuration
        gru_layer = GRU(20, dropout=self.dropout, recurrent_dropout=self.recurrent_dropout,
                        return_sequences=True, kernel_regularizer=self.kernel_regularizer,
                        recurrent_regularizer=self.recurrent_regularizer)(concatenated)
        time_distributed_dense = TimeDistributed(Dense(500, activation='tanh', kernel_regularizer=self.kernel_regularizer))(gru_layer)
        time_distributed_output = TimeDistributed(Dense(1, activation='sigmoid', kernel_regularizer=self.kernel_regularizer))(time_distributed_dense)
        flattened = Flatten()(time_distributed_output)
        
        # Custom MIL aggregation layer using max functions
        def mil_aggregation(x):
            return K.max(x, axis=1, keepdims=True)

        mil_output = Lambda(mil_aggregation)(flattened)

        # Compile the model
        model = Model(inputs=input_layer, outputs=mil_output)
        model.compile(optimizer=Adam(learning_rate=self.learning_rate), loss='binary_crossentropy', metrics=['accuracy'])

        return model
    
    def summary(self):
        return self.model.summary()

    def fit(self, x, y, epochs=30, batch_size=128, validation_data=None):
        return self.model.fit(x, y, epochs=epochs, batch_size=batch_size, validation_data=validation_data)

    def evaluate(self, x, y):
        return self.model.evaluate(x, y)

    def predict(self, x):
        return self.model.predict(x)

    def save(self, filepath):
        self.model.save(filepath)