In [19]:
import tensorflow as tf
from tensorflow.keras.layers import Layer
import tensorflow.keras.backend as K
from tensorflow.keras import layers
import sys
sys.path.append('../..'); sys.path.append('../');

class HDSymbolicAttention(Layer):
    """Sign-based self-similarity attention combined with learned symbols.

    The layer is called on a list ``[queries, keys, values]``; only ``values``
    takes part in the computation. ``values`` is projected to ``d_model``
    dimensions, every position is scaled by a similarity score computed from
    element-wise signs, and the result is multiplied element-wise by a learned
    table of projected symbol vectors before a swish activation.

    Args:
        d_model: Size of the projected space.
        **kwargs: Forwarded to ``tf.keras.layers.Layer``.
    """

    # Fallbacks used when the static input shape is unknown. They equal the
    # sizes that used to be hard-coded in ``build``.
    _DEFAULT_SEQ_LEN = 3
    _DEFAULT_EMBED_DIM = 64

    def __init__(self, d_model, **kwargs):
        super(HDSymbolicAttention, self).__init__(**kwargs)
        self.d_model = d_model  # Dimensionality of the model

    def build(self, input_shape):
        """Create the projection matrix and the symbol table.

        Args:
            input_shape: List of shapes for ``[queries, keys, values]``.
        """
        # Size the weights from the ``values`` input instead of assuming
        # 3 positions of width 64.
        values_shape = input_shape[2]
        seq_len = values_shape[1]
        embed_dim = values_shape[-1]
        if seq_len is None:
            seq_len = self._DEFAULT_SEQ_LEN
        if embed_dim is None:
            embed_dim = self._DEFAULT_EMBED_DIM

        self.value_weight = self.add_weight(name='value_weight',
                                            shape=(embed_dim, self.d_model),
                                            initializer='glorot_uniform',
                                            trainable=True)
        # One learned symbol per sequence position, living in the input
        # embedding space; it is projected with ``value_weight`` in ``call``.
        self.symbols = self.add_weight(shape=(seq_len, embed_dim),
                                 initializer='glorot_uniform',
                                 trainable=True)
        # NOTE(review): this sub-layer is created but never applied in ``call``.
        self.bn = layers.BatchNormalization(synchronized=True)
        super(HDSymbolicAttention, self).build(input_shape)

    def cosine_similarity(self, a, b):
        """Return the normalised agreement between the signs of ``a`` and ``b``.

        Both inputs are reduced to their element-wise signs, multiplied, summed
        over the last axis and divided by the size of that axis. (The divisor
        used to be the constant 1000, which is only correct for a last axis of
        size 1000.)
        """
        sign_product = tf.math.sign(a) * tf.math.sign(b)
        agreement = tf.reduce_sum(sign_product, axis=-1)
        dim = tf.cast(tf.shape(sign_product)[-1], agreement.dtype)
        return agreement / dim

    def create_cosine_similarity_matrix(self,X):
        """Build the softmax-normalised pairwise similarity matrix of ``X``.

        For ``X`` of shape (batch_size, N, D), entry ``[b, i, j]`` compares
        ``X[b, j]`` with ``X[b, j] + X[b, i]`` using ``cosine_similarity``.
        The softmax is taken over the last axis.
        """
        X_rows = tf.expand_dims(X, 2)  # Shape: (batch_size, N, 1, D)
        X_cols = tf.expand_dims(X, 1)  # Shape: (batch_size, 1, N, D)

        # Broadcasting yields the (batch_size, N, N, D) sum without
        # materialising the repeated operands.
        pair_sum = X_cols + X_rows

        S = self.cosine_similarity(X_cols, pair_sum)  # Shape: (batch_size, N, N)

        return tf.nn.softmax(S)

    def call(self, inputs):
        """Apply the layer to ``[queries, keys, values]``.

        ``queries`` and ``keys`` are accepted for interface symmetry but are
        not used.
        """
        # Unpack the inputs (queries, keys, values)
        queries, keys, values = inputs

        # Linear projections into the d_model-dimensional space
        value_projected = K.dot(values, self.value_weight)
        symbol_projected = K.dot(self.symbols, self.value_weight)

        scores  = self.create_cosine_similarity_matrix(value_projected)
        # 'bii' reads only the diagonal of ``scores``: position i is scaled by
        # scores[b, i, i]; positions are not mixed with each other.
        attention_output = tf.einsum('bii,bij->bij', scores, value_projected)
        return tf.nn.swish(attention_output*symbol_projected)

    def compute_output_shape(self, input_shape):
        """Return (batch, sequence length, d_model) based on the first input."""
        return (input_shape[0][0], input_shape[0][1], self.d_model)

    def get_config(self):
        """Return the constructor arguments so the layer can be re-created."""
        config = super(HDSymbolicAttention, self).get_config()
        config.update({'d_model': self.d_model})
        return config

In [51]:
# Shared experiment configuration.
embedding_dim = 64
# Hypervector dimensionality. LARS_VSA hard-codes the same value when it
# builds its HDSymbolicAttention layer.
D = 1000
encoder_kwargs=dict(use_bias=True)
ordertransformer_kwargs = dict(embedding_dim=embedding_dim, encoder_kwargs=encoder_kwargs)
# `loss` is consumed by the abstractor compile cell, whose model ends in a
# single sigmoid unit. Such an output needs binary cross-entropy on
# probabilities, not sparse categorical cross-entropy on logits.
loss = tf.keras.losses.BinaryCrossentropy(from_logits=False, name='binary_crossentropy')
create_opt = lambda : tf.keras.optimizers.Adam(learning_rate=0.001)
class LARS_VSA(tf.keras.Model):
    """Binary classifier built around an ``HDSymbolicAttention`` layer.

    Pipeline: per-object dense embedding -> batch normalisation ->
    ``HDSymbolicAttention`` -> dropout -> flatten -> 256-unit ReLU layer ->
    single sigmoid unit.

    Args:
        embedding_dim: Width of the per-object dense embedding.
        encoder_kwargs: Accepted for signature compatibility; not used by this
            model.
        name: Optional Keras model name.
        hd_dim: Projected dimensionality handed to ``HDSymbolicAttention``.
            Defaults to 1000, the value that was previously hard-coded.
    """

    def __init__(self, embedding_dim, encoder_kwargs, name=None, hd_dim=1000):
        super().__init__(name=name)
        object_embedder = tf.keras.Sequential([layers.Dense(embedding_dim)])
        self.source_embedder = layers.TimeDistributed(object_embedder, name='source_embedder')
        self.dropout = layers.Dropout(0.2)
        self.flatten = layers.Flatten()
        self.hidden_dense = layers.Dense(256, activation='relu', name='hidden_layer')
        self.final_layer = layers.Dense(1, activation='sigmoid', name='final_layer')
        self.mha = HDSymbolicAttention(hd_dim)
        self.bn = layers.BatchNormalization(synchronized=True)

    def call(self, inputs, training=None):
        """Run the forward pass and return one sigmoid output per example."""
        x = self.source_embedder(inputs)
        # Batch norm and dropout behave differently in training and inference,
        # so the flag is passed explicitly.
        x = self.bn(x, training=training)
        # The attention layer takes [queries, keys, values]; all three are the
        # same tensor here.
        h0 = self.mha([x,x,x])
        h0 = self.dropout(h0, training=training)
        x = self.flatten(h0)
        x = self.hidden_dense(x)
        x = self.final_layer(x)

        return x

    def print_summary(self, input_shape):
        """Print a layer-by-layer summary for inputs of shape ``input_shape``."""
        inputs = layers.Input(input_shape)
        outputs = self.call(inputs)
        # ``summary()`` prints by itself and returns None, so wrapping it in
        # print() would emit a stray "None" line.
        tf.keras.Model(inputs, outputs, name=self.name).summary()


# Build, compile and summarise the LARS_VSA model. The class is named LARS_VSA
# in this notebook; `HDTransformerOrderModel` is not defined anywhere here and
# raised NameError on a fresh kernel.
transformer_model = LARS_VSA(**ordertransformer_kwargs, name='rmts_transformer')
transformer_model.compile(loss='binary_crossentropy', optimizer=create_opt(), metrics=['binary_accuracy'])
transformer_model.print_summary((3,512))

Model: "rmts_transformer"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_7 (InputLayer)        [(None, 3, 512)]             0         []                            
                                                                                                  
 source_embedder (TimeDistr  (None, 3, 64)                32832     ['input_7[0][0]']             
 ibuted)                                                                                          
                                                                                                  
 batch_normalization_19 (Ba  (None, 3, 64)                256       ['source_embedder[0][0]']     
 tchNormalization)                                                                                
                                                                                   

In [39]:
import numpy as np
from sklearn.model_selection import train_test_split
# Read the per-sample feature and label arrays for both splits from disk.
x_train, y_train = np.load('train_features.npy'), np.load('train_labels.npy')
x_test, y_test = np.load('test_features.npy'), np.load('test_labels.npy')

# Hold out 20% of the training samples for validation (fixed split seed).
x_train, x_val, y_train, y_val = train_test_split(
    x_train,
    y_train,
    test_size=0.2,
    random_state=42,
)
print(x_train.shape)
import numpy as np

def create_sequence_data(x_data, y_data, N, seed=None):
    """Build ``N`` random triplets of samples with a pattern-based binary label.

    Each generated example stacks three distinct rows of ``x_data``. Its label
    is 1 when the three corresponding entries of ``y_data`` read ``(0, 1, 0)``
    or ``(1, 0, 1)`` in drawn order, and 0 otherwise.

    Args:
        x_data: Array-like of shape (num_samples, ...) holding the features.
        y_data: Array-like of per-sample labels, indexable like ``x_data``.
        N: Number of triplets to generate.
        seed: Optional seed for a private ``np.random.RandomState``. When
            ``None`` (default) NumPy's global RNG is used, as before.

    Returns:
        ``(X_new, Y_new)`` where ``X_new`` has shape ``(N, 3) + x_data.shape[1:]``
        and ``Y_new`` has shape ``(N,)``; both are float64 arrays.

    Raises:
        ValueError: Raised by NumPy when fewer than three samples are
            available to draw from.
    """
    x_data = np.asarray(x_data)
    y_data = np.asarray(y_data)
    rng = np.random if seed is None else np.random.RandomState(seed)

    # Take the per-sample feature shape from the data rather than assuming 512.
    X_new = np.zeros((N, 3) + x_data.shape[1:])
    Y_new = np.zeros(N)

    # Label orderings that count as a positive example.
    valid_patterns = {(0, 1, 0), (1, 0, 1)}

    for i in range(N):
        # Three distinct sample indices; their order matters for the label.
        indices = rng.choice(x_data.shape[0], 3, replace=False)
        X_new[i] = x_data[indices]
        if tuple(y_data[indices].tolist()) in valid_patterns:
            Y_new[i] = 1

    return X_new, Y_new

# Number of triplets to generate for each split.
N_train, N_val, N_test = 10000, 5000, 5000

# create_sequence_data draws from NumPy's global RNG. Seed it so the generated
# (and later saved) datasets are identical on every Restart & Run All.
np.random.seed(42)

# Generate the triplet datasets from the train / validation / test samples.
X_train_new, Y_train_new = create_sequence_data(x_train, y_train, N_train)
X_val_new, Y_val_new = create_sequence_data(x_val, y_val, N_val)
X_test_new, Y_test_new = create_sequence_data(x_test, y_test, N_test)

def transform_and_save(features, labels, filename_prefix):
    """Write ``features`` and ``labels`` to two ``.npy`` files.

    The files are named ``<filename_prefix>_features_seq.npy`` and
    ``<filename_prefix>_labels_seq.npy``. The arrays are saved as given.
    """
    outputs = (
        ('_features_seq.npy', features),
        ('_labels_seq.npy', labels),
    )
    for suffix, array in outputs:
        np.save(filename_prefix + suffix, array)

# Persist each generated split under its own filename prefix.
for split_features, split_labels, split_prefix in (
    (X_train_new, Y_train_new, 'train'),
    (X_val_new, Y_val_new, 'val'),
    (X_test_new, Y_test_new, 'test'),
):
    transform_and_save(split_features, split_labels, split_prefix)

(16000, 512)


In [62]:
# Load the generated triplet datasets.
X_train = np.load('train_features_seq.npy')
X_test = np.load('test_features_seq.npy')
y_train = np.load('train_labels_seq.npy')
y_test = np.load('test_labels_seq.npy')
X_val = np.load('val_features_seq.npy')
y_val = np.load('val_labels_seq.npy')

# Optimizer factory: every run gets a fresh AdamW instance.
create_opt = lambda : tf.keras.optimizers.AdamW(learning_rate=0.001)

# Learning-curve sweep: for each training-set size, train 10 seeded runs and
# record the test accuracy of each.
Total_hdc = []
for train_size in [50,100,200,500,750,1000,2000,3000,5000]:
    X_train_1, y_train_1 = X_train[:train_size], y_train[:train_size]
    hdc = []
    for j in range(10):
        # Release the Keras state of the previous run so that 90 successive
        # models do not pile up in memory.
        tf.keras.backend.clear_session()
        # Seed Python, NumPy and TensorFlow together. np.random.seed alone
        # does not control TensorFlow weight initialisation or dropout.
        tf.keras.utils.set_random_seed(j)
        transformer_model = LARS_VSA(**ordertransformer_kwargs, name='rmts_transformer')
        transformer_model.compile(loss='binary_crossentropy', optimizer=create_opt(), metrics=['binary_accuracy'])
        history = transformer_model.fit(X_train_1, y_train_1, validation_data=(X_val, y_val), epochs=50, batch_size=512,verbose=0)
        # evaluate() returns [loss, binary_accuracy]; keep the accuracy.
        acc = transformer_model.evaluate(X_test, y_test,verbose=0)[1]
        hdc.append(acc)
    hdc = np.array(hdc)
    Total_hdc.append(hdc)
np.save('total_hdc.npy',Total_hdc)

KeyboardInterrupt: 

In [None]:
from transformer_modules import Encoder, Decoder, AddPositionalEmbedding
from abstracters import RelationalAbstracter
from abstractor import Abstractor

def create_abstractor(encoder_kwargs, abstractor_kwargs, embedding_dim, dropout_rate=0.1, name='abstractor'):
    """Assemble the RelationalAbstracter baseline as a functional Keras model.

    The model maps a (3, 512) input through a per-object dense embedding, a
    ``RelationalAbstracter``, a flatten, a 64-unit ReLU layer and a single
    sigmoid unit.

    Args:
        encoder_kwargs: Not used by this builder.
        abstractor_kwargs: Keyword arguments forwarded to ``RelationalAbstracter``.
        embedding_dim: Width of the per-object dense embedding.
        dropout_rate: Not used by this builder.
        name: Name given to the returned model.

    Returns:
        The assembled ``tf.keras.Model``.
    """
    seq_input = layers.Input(shape=[3,512], name='input_seq')

    # Instantiate every stage first, in pipeline order, then wire them up.
    pipeline = [
        layers.TimeDistributed(
            tf.keras.Sequential([layers.Dense(embedding_dim)]),
            name='source_embedder',
        ),
        RelationalAbstracter(**abstractor_kwargs, name='abstractor'),
        layers.Flatten(),
        layers.Dense(64, activation='relu', name='hidden_dense'),
        layers.Dense(1, name='final_layer',activation='sigmoid'),
    ]

    tensor = seq_input
    for stage in pipeline:
        tensor = stage(tensor)

    return tf.keras.Model(inputs=seq_input, outputs=tensor, name=name)

In [None]:
# Hyper-parameters forwarded to RelationalAbstracter.
abstractor_kwargs = dict(num_layers=1, num_heads=4, dff=64,
     use_pos_embedding=True, mha_activation_type='relu', dropout_rate=0.1)

abstractor_model_kwargs = dict(encoder_kwargs=None, abstractor_kwargs=abstractor_kwargs, embedding_dim=64)
abstractor_model = create_abstractor(**abstractor_model_kwargs)

# create_abstractor ends in a single sigmoid unit, so the model is compiled
# with binary cross-entropy and binary accuracy. A sparse categorical loss on
# logits does not fit a one-unit probability output.
abstractor_model.compile(loss='binary_crossentropy', optimizer=create_opt(), metrics=['binary_accuracy'])
abstractor_model(X_train[:32]); # build
abstractor_model.summary()

In [None]:
# Optimizer factory: every run gets a fresh AdamW instance.
create_opt = lambda : tf.keras.optimizers.AdamW(learning_rate=0.001)
abstractor_model_kwargs = dict(encoder_kwargs=None, abstractor_kwargs=abstractor_kwargs, embedding_dim=64)

# Learning-curve sweep for the abstractor baseline: 10 seeded runs per
# training-set size, recording test accuracy.
Total_hdc = []
for train_size in [50,100,200,500,750,1000,2000,3000,5000]:
    X_train_1, y_train_1 = X_train[:train_size], y_train[:train_size]
    hdc = []
    for j in range(10):
        # Release the Keras state of the previous run before building a new model.
        tf.keras.backend.clear_session()
        # Seed Python, NumPy and TensorFlow together. np.random.seed alone
        # does not control TensorFlow weight initialisation or dropout.
        tf.keras.utils.set_random_seed(j)
        transformer_model = create_abstractor(**abstractor_model_kwargs)
        transformer_model.compile(loss='binary_crossentropy', optimizer=create_opt(), metrics=['binary_accuracy'])
        history = transformer_model.fit(X_train_1, y_train_1, validation_data=(X_val, y_val), epochs=50, batch_size=512,verbose=0)
        # evaluate() returns [loss, binary_accuracy]; keep the accuracy.
        acc = transformer_model.evaluate(X_test, y_test,verbose=0)[1]
        hdc.append(acc)
    hdc = np.array(hdc)
    Total_hdc.append(hdc)
np.save('total_abstractor.npy',Total_hdc)

In [None]:
from transformer_modules import Encoder, Decoder, AddPositionalEmbedding
from abstracters import SymbolicAbstracter, RelationalAbstracter

def create_transformer(num_layers, num_heads, dff, embedding_dim, dropout_rate=0.1,):
    """Build the Transformer-encoder baseline as a functional Keras model.

    The model maps a (3, 512) input through a per-object dense embedding, a
    positional embedding, an ``Encoder``, a flatten and a single sigmoid unit.

    Args:
        num_layers: Forwarded to ``Encoder``.
        num_heads: Forwarded to ``Encoder``.
        dff: Forwarded to ``Encoder``.
        embedding_dim: Width of the per-object dense embedding.
        dropout_rate: Forwarded to ``Encoder``.

    Returns:
        A ``tf.keras.Model`` producing one probability per example.
    """
    inputs = layers.Input(shape=(3,512))
    source_embedder = layers.TimeDistributed(layers.Dense(embedding_dim), name='source_embedder')
    pos_embedding_adder_input = AddPositionalEmbedding(name='add_pos_embedding_input')
    encoder = Encoder(num_layers=num_layers, num_heads=num_heads, dff=dff, dropout_rate=dropout_rate, name='encoder')
    flattener = layers.Flatten()
    # The training loop in this notebook compiles this model with
    # 'binary_crossentropy' / 'binary_accuracy' against scalar 0/1 labels, so
    # the head is one sigmoid unit rather than two raw logits.
    final_layer = layers.Dense(1, name='final_layer', activation='sigmoid')

    x = source_embedder(inputs)
    x = pos_embedding_adder_input(x)
    encoder_context = encoder(x)
    output = flattener(encoder_context)
    probabilities = final_layer(output)

    transformer_model = tf.keras.Model(inputs=inputs, outputs=probabilities)
    return transformer_model

In [None]:
# Optimizer factory: every run gets a fresh AdamW instance.
create_opt = lambda : tf.keras.optimizers.AdamW(learning_rate=0.001)

# Learning-curve sweep for the Transformer baseline: 10 seeded runs per
# training-set size, recording test accuracy.
Total_hdc = []
for train_size in [50,100,200,500,750,1000,2000,3000,5000]:
    X_train_1, y_train_1 = X_train[:train_size], y_train[:train_size]
    hdc = []
    for j in range(10):
        # Release the Keras state of the previous run before building a new model.
        tf.keras.backend.clear_session()
        # Seed Python, NumPy and TensorFlow together. np.random.seed alone
        # does not control TensorFlow weight initialisation or dropout.
        tf.keras.utils.set_random_seed(j)
        transformer_model = create_transformer(num_layers=1, num_heads=4, dff=64, embedding_dim=64)
        transformer_model.compile(loss='binary_crossentropy', optimizer=create_opt(), metrics=['binary_accuracy'])
        # verbose=0 matches the other sweeps and avoids logging 90 runs x 50
        # epochs of progress bars into the notebook.
        history = transformer_model.fit(X_train_1, y_train_1, validation_data=(X_val, y_val), epochs=50, batch_size=512,verbose=0)
        # evaluate() returns [loss, binary_accuracy]; keep the accuracy.
        acc = transformer_model.evaluate(X_test, y_test,verbose=0)[1]
        hdc.append(acc)
    hdc = np.array(hdc)
    Total_hdc.append(hdc)
np.save('total_transformer.npy',Total_hdc)

In [None]:
from multi_head_relation import MultiHeadRelation

def create_corelnet(embedding_dim, activation='softmax', name='corelnet'):
    """Assemble the CoRelNet baseline as a functional Keras model.

    The model maps a (3, 512) input through a per-object dense embedding, a
    ``MultiHeadRelation`` layer, an activation, a flatten and a single sigmoid
    unit.

    Args:
        embedding_dim: Width of the per-object dense embedding.
        activation: ``'softmax'`` selects ``layers.Softmax(axis=1)``; any other
            value is handed to ``layers.Activation``.
        name: Name given to the returned model.

    Returns:
        The assembled ``tf.keras.Model``.
    """
    seq_input = layers.Input(shape=(3,512), name='input_seq')
    embedder = layers.TimeDistributed(
        tf.keras.Sequential([layers.Dense(embedding_dim)]),
        name='source_embedder',
    )

    # Choose the activation applied to the relation output.
    if activation == 'softmax':
        relation_activation = layers.Softmax(axis=1)
    else:
        relation_activation = layers.Activation(activation)

    relation = MultiHeadRelation(rel_dim=1, proj_dim=None, symmetric=True, dense_kwargs=dict(use_bias=False))
    flatten = layers.Flatten()
    head = layers.Dense(1, name='final_layer',activation='sigmoid')

    # Wire the stages in pipeline order.
    tensor = seq_input
    for stage in (embedder, relation, relation_activation, flatten, head):
        tensor = stage(tensor)

    return tf.keras.Model(inputs=seq_input, outputs=tensor, name=name)
    
# Optimizer factory: every run gets a fresh AdamW instance.
create_opt = lambda : tf.keras.optimizers.AdamW(learning_rate=0.001)

# Learning-curve sweep for CoRelNet with softmax activation: 10 seeded runs
# per training-set size, recording test accuracy.
Total_hdc = []
for train_size in [50,100,200,500,750,1000,2000,3000,5000]:
    X_train_1, y_train_1 = X_train[:train_size], y_train[:train_size]
    hdc = []
    for j in range(10):
        # Release the Keras state of the previous run before building a new model.
        tf.keras.backend.clear_session()
        # Seed Python, NumPy and TensorFlow together. np.random.seed alone
        # does not control TensorFlow weight initialisation.
        tf.keras.utils.set_random_seed(j)
        # The activation is spelled out so this sweep does not depend on which
        # create_corelnet definition (and default) is currently live.
        transformer_model = create_corelnet(embedding_dim=64, activation='softmax')
        transformer_model.compile(loss='binary_crossentropy', optimizer=create_opt(), metrics=['binary_accuracy'])
        history = transformer_model.fit(X_train_1, y_train_1, validation_data=(X_val, y_val), epochs=50, batch_size=512,verbose=0)
        # evaluate() returns [loss, binary_accuracy]; keep the accuracy.
        acc = transformer_model.evaluate(X_test, y_test,verbose=0)[1]
        hdc.append(acc)
    hdc = np.array(hdc)
    Total_hdc.append(hdc)
np.save('total_corelnet.npy',Total_hdc)

In [None]:
from multi_head_relation import MultiHeadRelation

def create_corelnet(embedding_dim, activation='relu', name='corelnet'):
    """Assemble the CoRelNet baseline; ``activation`` defaults to ``'relu'``.

    The model maps a (3, 512) input through a per-object dense embedding, a
    ``MultiHeadRelation`` layer, an activation, a flatten and a single sigmoid
    unit.

    Args:
        embedding_dim: Width of the per-object dense embedding.
        activation: ``'softmax'`` selects ``layers.Softmax(axis=1)``; any other
            value is handed to ``layers.Activation``.
        name: Name given to the returned model.

    Returns:
        The assembled ``tf.keras.Model``.
    """
    seq_input = layers.Input(shape=(3,512), name='input_seq')
    embedder = layers.TimeDistributed(
        tf.keras.Sequential([layers.Dense(embedding_dim)]),
        name='source_embedder',
    )

    # Choose the activation applied to the relation output.
    use_softmax = activation == 'softmax'
    relation_activation = layers.Softmax(axis=1) if use_softmax else layers.Activation(activation)

    relation = MultiHeadRelation(rel_dim=1, proj_dim=None, symmetric=True, dense_kwargs=dict(use_bias=False))
    flatten = layers.Flatten()
    head = layers.Dense(1, name='final_layer',activation='sigmoid')

    # Wire the stages in pipeline order.
    embedded = embedder(seq_input)
    relations = relation_activation(relation(embedded))
    probabilities = head(flatten(relations))

    return tf.keras.Model(inputs=seq_input, outputs=probabilities, name=name)
    
# Optimizer factory: every run gets a fresh AdamW instance.
create_opt = lambda : tf.keras.optimizers.AdamW(learning_rate=0.001)

# Learning-curve sweep for CoRelNet with ReLU activation: 10 seeded runs per
# training-set size, recording test accuracy.
Total_hdc = []
for train_size in [50,100,200,500,750,1000,2000,3000,5000]:
    X_train_1, y_train_1 = X_train[:train_size], y_train[:train_size]
    hdc = []
    for j in range(10):
        # Release the Keras state of the previous run before building a new model.
        tf.keras.backend.clear_session()
        # Seed Python, NumPy and TensorFlow together. np.random.seed alone
        # does not control TensorFlow weight initialisation.
        tf.keras.utils.set_random_seed(j)
        # The activation is spelled out so this sweep does not depend on which
        # create_corelnet definition (and default) is currently live.
        transformer_model = create_corelnet(embedding_dim=64, activation='relu')
        transformer_model.compile(loss='binary_crossentropy', optimizer=create_opt(), metrics=['binary_accuracy'])
        history = transformer_model.fit(X_train_1, y_train_1, validation_data=(X_val, y_val), epochs=50, batch_size=512,verbose=0)
        # evaluate() returns [loss, binary_accuracy]; keep the accuracy.
        acc = transformer_model.evaluate(X_test, y_test,verbose=0)[1]
        hdc.append(acc)
    hdc = np.array(hdc)
    Total_hdc.append(hdc)
np.save('total_corelnet_relu.npy',Total_hdc)

In [None]:
from tensorflow.keras import layers
from baseline_models.predinet import PrediNet
# Arguments for create_predinet: the outer dict holds the builder's own
# parameters, the inner dict is unpacked into the PrediNet layer.
embedding_dim = 64
predinet_kwargs = {
    'embedding_dim': embedding_dim,
    'predinet_kwargs': {
        'key_dim': 4,
        'n_heads': 4,
        'n_relations': 16,
        'add_temp_tag': False,
    },
}

def create_predinet(embedding_dim, predinet_kwargs, name=None):
    """Assemble the PrediNet baseline as a Keras ``Sequential`` model.

    Args:
        embedding_dim: Not used by this builder.
        predinet_kwargs: Keyword arguments unpacked into ``PrediNet``.
        name: Optional name for the returned model.

    Returns:
        ``PrediNet`` -> ``Flatten`` -> single sigmoid unit.
    """
    stack = [
        PrediNet(**predinet_kwargs),
        layers.Flatten(),
        layers.Dense(1, name='final_layer',activation='sigmoid'),
    ]
    return tf.keras.Sequential(stack, name=name)

# create_predinet ends in a single sigmoid unit, so the matching loss object
# is binary cross-entropy on probabilities. (The sweep below passes the loss
# by name and does not read this variable.)
loss = tf.keras.losses.BinaryCrossentropy(from_logits=False, name='binary_crossentropy')
# The PrediNet runs use Adam at 1e-4. This is the factory the loop actually
# used; an AdamW(1e-3) factory defined here was overwritten before its first
# use.
create_opt = lambda : tf.keras.optimizers.Adam(learning_rate=0.0001)

# Learning-curve sweep for the PrediNet baseline: 10 seeded runs per
# training-set size, recording test accuracy.
Total_hdc = []
for train_size in [50,100,200,500,750,1000,2000,3000,5000]:
    X_train_1, y_train_1 = X_train[:train_size], y_train[:train_size]
    hdc = []
    for j in range(10):
        # Release the Keras state of the previous run before building a new model.
        tf.keras.backend.clear_session()
        # Seed Python, NumPy and TensorFlow together. np.random.seed alone
        # does not control TensorFlow weight initialisation.
        tf.keras.utils.set_random_seed(j)
        transformer_model = create_predinet(**predinet_kwargs, name='predinet')
        transformer_model.compile(loss='binary_crossentropy', optimizer=create_opt(), metrics=['binary_accuracy'])
        history = transformer_model.fit(X_train_1, y_train_1, validation_data=(X_val, y_val), epochs=50, batch_size=512,verbose=0)
        # evaluate() returns [loss, binary_accuracy]; keep the accuracy.
        # verbose=0 matches the other sweeps.
        acc = transformer_model.evaluate(X_test, y_test,verbose=0)[1]
        hdc.append(acc)
    hdc = np.array(hdc)
    Total_hdc.append(hdc)
np.save('total_predinet.npy',Total_hdc)

In [None]:
from tensorflow.keras import layers
from baseline_models.predinet import PrediNet
# Arguments unpacked into create_mlp by the sweep below. The keys mirror the
# PrediNet builder's parameters.
embedding_dim = 64
_predinet_layer_kwargs = dict(key_dim=4, n_heads=4, n_relations=16, add_temp_tag=False)
predinet_kwargs = dict(embedding_dim=embedding_dim, predinet_kwargs=_predinet_layer_kwargs)
del _predinet_layer_kwargs  # keep the cell's namespace footprint unchanged

def create_mlp(embedding_dim, predinet_kwargs, name=None):
    """Assemble the MLP baseline as a Keras ``Sequential`` model.

    Args:
        embedding_dim: Not used by this builder; the hidden width is fixed at 64.
        predinet_kwargs: Not used by this builder.
        name: Optional name for the returned model.

    Returns:
        64-unit ReLU ``Dense`` -> ``Flatten`` -> single sigmoid unit.
    """
    mlp_layers = [
        layers.Dense(64, name='hidden_layer',activation='relu'),
        layers.Flatten(),
        layers.Dense(1, name='final_layer',activation='sigmoid'),
    ]
    return tf.keras.Sequential(mlp_layers, name=name)

# create_mlp ends in a single sigmoid unit, so the matching loss object is
# binary cross-entropy on probabilities. (The sweep below passes the loss by
# name and does not read this variable.)
loss = tf.keras.losses.BinaryCrossentropy(from_logits=False, name='binary_crossentropy')
# Optimizer factory: every run gets a fresh AdamW instance.
create_opt = lambda : tf.keras.optimizers.AdamW(learning_rate=0.001)

# Learning-curve sweep for the MLP baseline: 10 seeded runs per training-set
# size, recording test accuracy.
Total_hdc = []
for train_size in [50,100,200,500,750,1000,2000,3000,5000]:
    X_train_1, y_train_1 = X_train[:train_size], y_train[:train_size]
    hdc = []
    for j in range(10):
        # Release the Keras state of the previous run before building a new model.
        tf.keras.backend.clear_session()
        # Seed Python, NumPy and TensorFlow together. np.random.seed alone
        # does not control TensorFlow weight initialisation.
        tf.keras.utils.set_random_seed(j)
        # The model is named after what it is; it was labelled 'predinet' before.
        transformer_model = create_mlp(**predinet_kwargs, name='mlp')
        transformer_model.compile(loss='binary_crossentropy', optimizer=create_opt(), metrics=['binary_accuracy'])
        history = transformer_model.fit(X_train_1, y_train_1, validation_data=(X_val, y_val), epochs=50, batch_size=512,verbose=0)
        # evaluate() returns [loss, binary_accuracy]; keep the accuracy.
        acc = transformer_model.evaluate(X_test, y_test,verbose=0)[1]
        hdc.append(acc)
    hdc = np.array(hdc)
    Total_hdc.append(hdc)
np.save('total_mlp.npy',Total_hdc)