# Evaluating Atomic Array Representations with Attention Mechanisms for Bandgap Prediction and Classification
This notebook applies attention-based deep learning to atomic array representations for bandgap prediction and classification.

- Atomic arrays encode crystal structures into tensors.

- Attention layers highlight relevant atomic interactions for accurate and interpretable predictions.

The goal is to benchmark self attention-enhanced deep learning models in materials materials property prediction.

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

import tensorflow as tf

initializer_seed = 10
initializer_scale = 1e-2

kernel_initializer = tf.keras.initializers.VarianceScaling(scale = initializer_scale, distribution = 'normal', seed = initializer_seed)

In [None]:
np.set_printoptions(suppress=True)

#### Atomic Array definition

In [None]:
df = pd.read_pickle('support/hseDataset.pkl')
x = np.load('data/inputs.npy')
y = np.load('data/outputs.npy')

print(df.shape, x.shape)

x[:,:,:3] = (x[:,:,:3] - np.median(x[:,:,:3], axis=1, keepdims=True))

### Materials with 1 < Z <= 84
idx = np.intersect1d(np.argwhere((x[:,:,-1] > 0).sum(axis=1) == 512)[:,0], np.argwhere((x[:,:,-1] <= 84).sum(axis=1) == 512)[:,0])
x = x[idx]
y = y[idx]
df = df.iloc[idx,:].reset_index(drop=True)

### Band gap filtering (Up to two standard deviations)
n = 2

conditions = dict()
for row in range(4):
    conditions[row] = np.intersect1d(np.argwhere(0 < y[:,row])[:,0], np.argwhere(y[:,row] <= y[:,row].mean() + n*y[:,row].std())[:,0])
idxRightOutputs = np.intersect1d(np.intersect1d(conditions[0], conditions[1]), np.intersect1d(conditions[2], conditions[3]))

### Gap type (Just Direct or Indirect)
idxRightOutputs = np.intersect1d(idxRightOutputs, np.argwhere(y[:,4] == y[:,5])[:,0])

x = x[idxRightOutputs]
y = y[idxRightOutputs, :5]
df = df.iloc[idxRightOutputs,:].reset_index(drop=True)

#### Training and Test set construction 

In [None]:
testFrac = 0.20
muestras = x.shape[0]

idxtest = np.random.choice(range(muestras), int(testFrac*muestras), replace=False)
idxtrain = np.setdiff1d(np.arange(muestras), idxtest)

xtrain = x[idxtrain]
ytrain = y[idxtrain]

xtest = x[idxtest]
ytest = y[idxtest]

In [None]:
def data_augmentation(x, y):

    xtemp1 = np.zeros(x.shape)
    for row in range(x.shape[0]):
        xtemp1[row] = x[row,np.lexsort((x[row,:,0], x[row,:,1], x[row,:,2]))]

    xtemp2 = np.zeros(x.shape)
    for row in range(x.shape[0]):
        xtemp2[row] = x[row,np.lexsort((x[row,:,2], x[row,:,1], x[row,:,0]))]

    xtemp3 = np.zeros(x.shape)
    for row in range(x.shape[0]):
        xtemp3[row] = x[row,np.lexsort((x[row,:,0], x[row,:,2], x[row,:,1]))]

    xtemp4 = np.zeros(x.shape)
    for row in range(x.shape[0]):
        xtemp4[row] = x[row,np.lexsort((x[row,:,1], x[row,:,2], x[row,:,0]))]

    xtemp5 = np.zeros(x.shape)
    for row in range(x.shape[0]):
        xtemp5[row] = x[row,np.lexsort((x[row,:,1], x[row,:,0], x[row,:,2]))]

    xtemp6 = np.zeros(x.shape)
    for row in range(x.shape[0]):
        xtemp6[row] = x[row,np.lexsort((x[row,:,2], x[row,:,0], x[row,:,1]))]

    return np.concatenate((xtemp1, xtemp2, xtemp3, xtemp4, xtemp5, xtemp6), axis=0), np.concatenate((y,y,y,y,y,y), axis=0)

In [None]:
xtrain, ytrain = data_augmentation(xtrain, ytrain)

In [None]:
for row in range(xtest.shape[0]):
    xtest[row] = xtest[row, np.lexsort((xtest[row,:,0], xtest[row,:,1], xtest[row,:,2]))]

In [None]:
#Band-gap type
#bg=pd.DataFrame(ytest)
#bg[4].value_counts()

#### Training set randomization

In [None]:
idxBarajeo = np.random.choice(range(xtrain.shape[0]), xtrain.shape[0], replace=False)
idxBarajeo = np.random.choice(idxBarajeo, xtrain.shape[0], replace=False)
idxBarajeo = np.random.choice(idxBarajeo, xtrain.shape[0], replace=False)

xtrain = xtrain[idxBarajeo]
ytrain = ytrain[idxBarajeo]

#### Attention Mechanism-based DL Architectures 

In [None]:
def create_set(tensors = list):

    dtensors = list()
    for i in tensors:
        dtensors += [tf.data.Dataset.from_tensor_slices(i)]
    dtensors = tuple(dtensors)
    return tf.data.Dataset.zip(dtensors)

In [None]:
class ProcessTensor(tf.keras.layers.Layer):
    def __init__(self,**kwargs):
        super(ProcessTensor, self).__init__()

    def call(self, x, random):
        indices = tf.cast(x[:,:,-1], dtype=tf.int32)
        xelem = tf.gather(random, indices, batch_dims=1)
        return tf.concat((x[:,:,:-1], xelem), axis=-1)

    def get_config(self):
        config = super(ProcessTensor, self).get_config()
        return config


In [None]:
def ConvLayer(x, filters = 32, filter_size=4, kernel_initializer = kernel_initializer,
              activation='', dropout=0.0):

    """
    This function performs a convolution on a tensorflow tensor without reducing its original dimensionality;
    i.e., the provided tensor is padded to keep the original dimension. After the convolution,
    the operations Batch Normalization, Activation, and Dropout are performed.
    Parameters:
        x: a tensorflow tensor of shape (batch_size, length, feature_maps)
        filters: the number of feature maps of the convolved tensor.
        filter_size: the length of the feature maps.
        kernel_initializer: an object from the module keras initializers.
                        It sets how the parameters are intialized at the beginning of the optimization.
        activation: string, the activation function to use after the convolutional part.
        dropout: the dropout fraction.
        stage: a number to keep control on the name of the layers in the CNN architecture.
    Returns:
        x: a processed (convolved) tensorflow tensor of shape (batch_size, length, feature_maps)
        int(stage) + 1: a number that keeps the serialization of the layers in the CNN architecture
    """


    x = tf.keras.layers.Conv1D(filters = filters, kernel_size = filter_size,
                      padding = 'same', kernel_initializer=kernel_initializer,
                      bias_initializer="zeros")(x)
    x = tf.keras.layers.BatchNormalization()(x)

    if activation:
        if activation == 'leaky_relu': x = tf.keras.layers.LeakyReLU(0.15)(x)
        else: x = tf.keras.layers.Activation(activation)(x)

    if dropout:
        x = tf.keras.layers.Dropout(dropout)(x)

    return x

def ResBlock(x, filters = 32, fsize_main = 4, fsize_sc = 1, kernel_initializer=kernel_initializer,
             activation='', dropout=0.0, chain = 2):
    """
    This function creates a residual block using the defined function above 'ConvLayer'.
    The depth of the residual block (number of functions ConvLayer) is controlled with
    the parameter chain.
    This function does not reduce the dimensionality of the original tensor.
    If the number of feature maps changes in the main path a ConvLayer is performed in the secondary path,
    otherwise the original tensor is simply added with the processed one.

    Parameters:
        x: a tensorflow tensor of shape (batch_size, length, feature_maps)
        filters: the number of feature maps of the convolved tensor.
        fsize_main: the length of the feature maps in the main path.
        fsize_sc: the length of the feature maps in the secondary path.
        kernel_initializer: an object from the module keras initializers.
                        It sets how the parameters are intialized at the beginning of the optimization.
        activation: string, the activation function to use after the convolutional part.
        dropout: the dropout fraction.
        stage: a number to keep control on the name of the layers in the CNN architecture.
        chain: this parameter defines the depth of the main path.
    Returns:
        x: a processed (convolved) tensorflow tensor of shape (batch_size, length, feature_maps)
        int(stage) + 1: a number that keeps the serialization of the layers in the CNN architecture
    """
    x_shortcut = x

    for depth in range(chain):

        if depth%2 == 0:
            x = ConvLayer(x, filters = filters, filter_size = fsize_main,
                                  kernel_initializer = kernel_initializer,
                                  activation = activation, dropout=dropout)
        else:
            x = ConvLayer(x, filters = filters, filter_size = fsize_main,
                                  kernel_initializer = kernel_initializer,
                                  dropout=dropout)

    if x_shortcut.shape[-1] != x.shape[-1]:
        x_shortcut = ConvLayer(x_shortcut, filters = filters, filter_size = fsize_sc,
                                       kernel_initializer = kernel_initializer,
                                       dropout=(fsize_sc/fsize_main)*dropout)

    x = tf.keras.layers.Add()([x, x_shortcut])

    if activation:
        if activation == 'leaky_relu': x = tf.keras.layers.LeakyReLU(0.15)(x)
        else: x = tf.keras.layers.Activation(activation)(x)

    return x

In [None]:
def ResDense(x, units=[1], dropout = 0.25, activation='leaky_relu'):

    ldim = x.shape[-1]

    for n, unit in enumerate(units):
        unit=int(unit)
        dense_layer = tf.keras.layers.Dense(unit, kernel_initializer = kernel_initializer)

        if n == 0: xm = dense_layer(x)
        else: xm = dense_layer(xm)

        if n != len(units)-1:

            if activation == 'leaky_relu':
                xm = tf.keras.layers.LeakyReLU(0.15)(xm)
            else:
                xm = tf.keras.layers.Activation(activation)(xm)
            xm = tf.keras.layers.LayerNormalization()(xm)

        if dropout: xm = tf.keras.layers.Dropout(dropout)(xm)

    if unit != ldim:
        sc_dense_layer = tf.keras.layers.Dense(unit, kernel_initializer = kernel_initializer)

        xs = sc_dense_layer(x)

        if dropout: xs = tf.keras.layers.Dropout(dropout)(xs)

        x = tf.keras.layers.Add()([xm,xs])
    else:
        x = tf.keras.layers.Add()([xm, x])

    return x

In [None]:
def EncBlock(xenc = tf.Tensor, heads = 1, kernel_initializer = kernel_initializer, dropout = 0, activation = '', units = [1,1]):
    xmhaenc = tf.keras.layers.MultiHeadAttention(num_heads = heads, key_dim = xenc.shape[-1]//heads, value_dim = xenc.shape[-1]//heads, kernel_initializer = kernel_initializer, dropout = dropout)(xenc,xenc)
    xenc = tf.keras.layers.Add()([xenc, xmhaenc]) #xenc + xmhaenc
    xenc = tf.keras.layers.LayerNormalization()(xenc)

    xenc = ResDense(xenc, units = [xenc.shape[-1]*i for i in units], dropout = dropout, activation = activation)
    xenc = tf.keras.layers.LayerNormalization()(xenc)

    return xenc

In [None]:
def model_2():
    input_tensor = tf.keras.Input((512, 4))
    input_tensor2 = tf.keras.Input((96, 100))

    x = ProcessTensor()(input_tensor, input_tensor2) #(batch, 512, 103)

    x = ResDense(input_tensor, units = [2*64, 64], dropout=0.25, activation='leaky_relu')
    x = tf.keras.layers.LayerNormalization()(x)
    x = tf.keras.layers.LeakyReLU(0.15)(x)
    
    x = tf.keras.layers.Permute((2,1))(x)

    x = ResDense(x, units=[2*64, 64], dropout=0.25, activation='leaky_relu')
    x = tf.keras.layers.LayerNormalization()(x)
    x = tf.keras.layers.LeakyReLU(0.15)(x)

    for block, hl in enumerate([[2,1],[0.25, 0.125]]):
        x = EncBlock(xenc = x, heads = 8, kernel_initializer = kernel_initializer, dropout = 0.25, activation = 'relu', units = hl)
    
    x = tf.keras.layers.Flatten()(x)

    x = ResDense(x, units = [64, 32], dropout = 0.25, activation='leaky_relu')
    x = tf.keras.layers.LayerNormalization()(x)
    x = tf.keras.layers.LeakyReLU(0.15)(x)

    output_layer = tf.keras.layers.Dense(5)(x)

    return tf.keras.models.Model(inputs = [input_tensor, input_tensor2], outputs = output_layer, name='nn')

In [None]:
def model():
    input_tensor = tf.keras.Input((512, 4))
    input_tensor2 = tf.keras.Input((96, 100))

    x = ProcessTensor()(input_tensor, input_tensor2) #(batch, 512, 103)

    x = ResDense(input_tensor, units = [10*512, 64], dropout=0.25, activation='leaky_relu')
    x = tf.keras.layers.LayerNormalization()(x)
    x = tf.keras.layers.LeakyReLU(0.15)(x)
    
    x = tf.keras.layers.Permute((2,1))(x)

    x = ResDense(x, units=[10*512, 512], dropout=0.25, activation='leaky_relu')
    x = tf.keras.layers.LayerNormalization()(x)
    x = tf.keras.layers.LeakyReLU(0.15)(x)

    for block, hl in enumerate([[5,1],[5,1],[5,1],[5,1],[5,1], [5,1],[0.25, 0.125]]):
        x = EncBlock(xenc = x, heads = 8, kernel_initializer = kernel_initializer, dropout = 0.25, activation = 'relu', units = hl)
    
    x = tf.keras.layers.Flatten()(x)

    x = ResDense(x, units = [2048, 512], dropout = 0.25, activation='leaky_relu')
    x = tf.keras.layers.LayerNormalization()(x)
    x = tf.keras.layers.LeakyReLU(0.15)(x)

    output_layer = tf.keras.layers.Dense(5)(x)

    return tf.keras.models.Model(inputs = [input_tensor, input_tensor2], outputs = output_layer, name='nn')

In [None]:
# Model building/loading
nn = model()
#nn = tf.keras.models.load_model('nn_conv_23_2sd.keras', custom_objects={'ProcessTensor': ProcessTensor},compile=False)
nn.summary()

In [None]:
def loss_function(yreal = tf.Tensor, ypred = tf.Tensor):

    yrgaps = yreal[:,:-1]
    yrtype = yreal[:,-1]

    ypgaps = ypred[:,:-1]
    yptype = ypred[:,-1]

    loss_gaps = tf.keras.losses.LogCosh()(yrgaps, ypgaps)
    loss_type = tf.keras.losses.BinaryCrossentropy(from_logits=True)(yrtype, yptype)

    return loss_gaps + loss_type

In [None]:
optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)

In [None]:
trainset = create_set(tensors = [xtrain, ytrain])
testset = create_set(tensors = [xtest, ytest])

In [None]:
batch_size = 8

In [None]:
trainset = trainset.shuffle(200000, seed = 3451, reshuffle_each_iteration=True)
trainset = trainset.batch(batch_size, drop_remainder=True)

testset = testset.shuffle(200000, seed = 3451, reshuffle_each_iteration=True)
testset = testset.batch(batch_size, drop_remainder=True)

In [None]:
num_epochs=6

In [None]:
random = tf.random.uniform(minval=-1,maxval=1,shape=(96,96))

econf = np.load('data/econf.npy', allow_pickle=True).item()

xeconf = np.zeros((96,4))
for el in range(96):
    xeconf[el] = econf[el+1]

xeconf = tf.cast(xeconf, dtype = tf.float32)
xeconf = tf.concat((xeconf, random), axis=-1)

In [None]:
def processTensor(x, y, random):
    x = tf.cast(x, dtype=tf.float32)
    y = tf.cast(y, dtype = tf.float32)
    xelem = tf.one_hot(tf.cast(x[:,:,-1], dtype = tf.int32), depth=96)
    xelem = tf.cast(xelem, dtype=tf.float32)
    #xelem = xelem @ random
    x = tf.concat((x[:,:,:3], xelem), axis=-1)

    return x, y

#### Training

In [None]:
losses = list()
all_train_losses = list()
all_test_losses = list()
for epoch in range(1, num_epochs+1):

    epoch_losses = list()
    for i, (x, y) in enumerate(trainset):
        with tf.GradientTape() as tape:
            ypred = nn([x, tf.repeat(xeconf[None,...], batch_size, axis=0)], training=True)
            loss = loss_function(y, ypred)
            
        grad = tape.gradient(loss, nn.trainable_variables)
        optimizer.apply_gradients(grads_and_vars = zip(grad, nn.trainable_variables))
        epoch_losses += [loss.numpy()]
        print(i, loss.numpy())
    mean_epoch_loss = np.asarray(epoch_losses).mean()
    all_train_losses += [mean_epoch_loss]

    test_losses = tf.TensorArray(tf.float32, size = len(testset))
    for i, (xtest, ytest) in enumerate(testset):
        ptest = nn([xtest, tf.repeat(xeconf[None,...], batch_size, axis=0)], training = False)
        loss = loss_function(ytest, ptest)

        test_losses = test_losses.write(i, loss)

    test_losses = test_losses.stack()
    mean_test_loss = tf.reduce_mean(test_losses).numpy()
    all_test_losses += [mean_test_loss]

    print('Epoch', epoch, ' | Training loss:', mean_epoch_loss ,' | Test loss: ', mean_test_loss)

In [None]:
model_name='nn_conv_27_2sd'

In [None]:
dftrain = df.iloc[idxtrain,:].reset_index(drop=True)
dftest = df.iloc[idxtest,:].reset_index(drop=True)

dftrain.to_csv(f'data/dftrain_{model_name}.csv')
dftest.to_csv(f'data/dftest__{model_name}.csv')

In [None]:
nn.save(f'models/{model_name}.keras')

In [None]:
plt.figure()
plt.scatter(np.arange(num_epochs), all_train_losses, s=10, color='blue', label='training')
plt.scatter(np.arange(num_epochs), all_test_losses, s=10, color='red', label='test')
plt.show()


In [None]:
np.save('data/element_codification', xeconf.numpy())

In [None]:
losses_diccio = dict()
losses_diccio['train'] = all_train_losses
losses_diccio['test'] = all_test_losses
pd.DataFrame(losses_diccio).to_csv(f'data/{model_name}_losses.csv')

### Evaluation

In [None]:
x = np.load('data/inputs.npy')
y = np.load('data/outputs.npy')
df = pd.read_pickle('support/hseDataset.pkl')
#Load datasets (model evaluation)
#dftrain = pd.read_csv('data/dftrain_26_2sd.csv')
#dftest = pd.read_csv('.data/dftest_26_2sd.csv')

x[:,:,:3] = (x[:,:,:3] - np.median(x[:,:,:3], axis=1, keepdims=True))

idx_diccio = {k:v for v,k in enumerate(df['snumat_id'])}
idx_test = [idx_diccio[sample] for sample in dftest['snumat_id']]
xtest = x[idx_test]
ytest = y[idx_test]

for row in range(xtest.shape[0]):
    xtest[row] = xtest[row, np.lexsort((xtest[row,:,0], xtest[row,:,1], xtest[row,:,2]))]

In [None]:
xtest.shape, xeconf.shape

In [None]:
#load a trained model 
#nn = tf.keras.models.load_model('models/nn_conv_23_2sd.keras', custom_objects={'ProcessTensor': ProcessTensor})
#nn.summary()

In [None]:
xtest

In [None]:
ptest = nn.predict([xtest, tf.repeat(xeconf[None,...], xtest.shape[0], axis=0)], batch_size=16)

In [None]:
ptest, ytest

In [None]:
print('MAE:', abs(ytest[:,:4]-ptest[:,:4]).mean(axis=0))
print('RMSE:', ((ytest[:,:4]-ptest[:,:4])**2).mean(axis=0)**0.5)



In [None]:
xtest.shape

In [None]:
for gap in range(4):

    plt.figure()
    plt.scatter(ytest[:,gap], ptest[:,gap], color='blue', marker='x', alpha=0.5)
    plt.grid(True)
    plt.show()

In [None]:
for item in range(4):
    plt.figure(figsize=(12, 6))
    plt.subplot(1, 2, 1)
    plt.scatter(ytest[:,item],ptest[:,item], s=10, color='purple', alpha=0.5)
    plt.grid()
    plt.xlabel(r'actual band gap (eV)')
    plt.ylabel(r'predicted bandgap (eV)')
    #plt.plot(np.arange(-1,7), np.arange(-1,7), linewidth=1, color='black')
    plt.title('Actual vs Predicted Values')
# Plot histograms of actual and predicted values
    ytest_plot = ytest[:,item]
    ptest_plot = ptest[:,item]
    plt.subplot(1, 2, 2)
    plt.hist(ytest_plot, bins=200, color='blue')
    plt.hist(ptest_plot, bins=200, color='red', alpha=0.5)
    plt.title('Histogram of Actual and Predicted Values')
    plt.xlabel('Value')
    plt.ylabel('Frequency')
    plt.legend(['Actual', 'Predicted'], loc='upper right')

In [None]:
from sklearn.metrics import precision_recall_fscore_support, confusion_matrix

In [None]:
precision_recall_fscore_support(ytest[:,-1].astype(int), (1 + np.exp(-ptest[:,-1]))**-1 >= 0.5)

In [None]:
confusion_matrix(ytest[:,-1].astype(int), (1 + np.exp(-ptest[:,-1]))**-1 > 0.5)

In [None]:
losses_df= pd.read_csv(f'data/{model_name}_losses.csv')
epochs = losses_df.index 
train_data = losses_df['train']
test_data = losses_df['test']
plt.plot(epochs, train_data, label='Train')
plt.plot(epochs, test_data, label='Test')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.show()

In [None]:
#Model architecture visualization
!netron nn_conv_27_2sd.keras