In [None]:
from __future__ import absolute_import, division, print_function, unicode_literals

# standard python
import numpy as np
import scipy
from scipy import stats
#import pathlib

import os
# plotting, especially for jupyter notebooks
import matplotlib
#matplotlib.rcParams['text.usetex'] = True # breaks for some endpoint labels
from matplotlib import pyplot as plt
from IPython.display import Image
import pandas as pd
# tensorflow
import tensorflow as tf
#tf.enable_eager_execution() # needed for tf version 1 or it stages operations but does not do them
from tensorflow import keras
from tensorflow.keras import layers, regularizers
tf.keras.backend.clear_session()  # For easy reset of notebook state.
# local routines
#from chemdataprep import load_PDBs,load_countsfromPDB,load_diametersfromPDB,find_chemnames
#from toxmathandler import load_tscores

#checkpoint_path = "/home2/ajgreen4/Read-Across_w_GAN/Models/cp.ckpt"
#checkpoint_dir = os.path.dirname(checkpoint_path)

print("tensorflow version",tf.__version__,". Executing eagerly?",tf.executing_eagerly())
print("Number of GPUs: ", len(tf.config.experimental.list_physical_devices('GPU')))

In [2]:
import chemdataprep as CP
#import toxmathandler
import scipy.io

In [3]:
qm7_data = scipy.io.loadmat('qm7.mat')


In [None]:

# Print the dimensions of each array
for key in ['X', 'R', 'Z', 'T', 'P']:
    array = qm7_data.get(key)
    if array is not None:
        print(f"Dimensions of {key}: {array.shape}")
    else:
        print(f"{key} not found in the dataset")


In [None]:
# Get the atomic numbers array
atomic_numbers = qm7_data['Z']
all_atomic_numbers = atomic_numbers.flatten()

# Find atomic numbers
unique_atomic_numbers = set(all_atomic_numbers)
unique_atomic_numbers.discard(0)  # Remove 0 if it's not a valid atomic number

# Print unique atomic numbers
print("atomic numbers:", unique_atomic_numbers)

In [9]:
def atomic_number_to_symbol(atomic_number):
    periodic_table = {
        1: 'H', 6: 'C', 7: 'N', 8: 'O', 16: 'S'
    }
    return periodic_table.get(atomic_number, 'Unknown')

# Convert atomic numbers to element symbols
unique_elements = {atomic_number_to_symbol(int(num)) for num in unique_atomic_numbers}


Z = qm7_data['Z']
R = qm7_data['R']
mollist = []

for mol_idx in range(Z.shape[0]):
    molecule = []
    for atom_idx in range(Z.shape[1]):
        atomic_number = Z[mol_idx, atom_idx]
        if atomic_number != 0:
            atom_type = atomic_number_to_symbol(atomic_number)
            # Ensure coordinates are a NumPy array
            coordinates = np.array(R[mol_idx, atom_idx, :])
            molecule.append((atom_type, coordinates))
    mollist.append(molecule)


In [10]:

def speciesmap(atom_type):
    atom_to_number = {'H': 1, 'C': 6, 'N': 7, 'O': 8, 'S': 16}
    #print(atom_type)
    return np.array([atom_to_number.get(atom_type, 0)])  # Returns 0 if atom type is not recognized


In [None]:
from chemdataprep import load_qm7_data
ws, vs, Natoms, Nviews = load_qm7_data(mollist, speciesmap, setNatoms=None, setNviews=None, carbonbased=False, verbose=1)


In [None]:
from chemdataprep import chemicaldiameter
#delist=[chemicaldiameter(x) for x in mollist]
#labels=np.array(delist)
T=qm7_data['T']
T_reshaped = T.reshape((7165,))
print(type(T_reshaped))
T_reshaped.shape


In [13]:
from sklearn.model_selection import train_test_split
Z = qm7_data['Z']
R = qm7_data['R']

# Split the dataset into training and testing sets
Z_train, Z_test, R_train, R_test, T_train, T_test = train_test_split(Z, R,T_reshaped, test_size=0.2, random_state=42)

def atomic_number_to_symbol(atomic_number):
    periodic_table = {1: 'H', 6: 'C', 7: 'N', 8: 'O', 16: 'S'}
    return periodic_table.get(atomic_number, 'Unknown')
def convert_to_mollist1(Z_data, R_data):
    mollist1 = []
    for mol_idx in range(Z_data.shape[0]):
        molecule = []
        for atom_idx in range(Z_data.shape[1]):
            atomic_number = Z_data[mol_idx, atom_idx]
            if atomic_number != 0:  # Assuming 0 means no atom present
                atom_type = atomic_number_to_symbol(atomic_number)
                coordinates = np.array(R_data[mol_idx, atom_idx, :])
                molecule.append((atom_type, coordinates))
        mollist1.append(molecule)
    return mollist1

# Convert training and testing data to mollist format
qm7_train = convert_to_mollist1(Z_train, R_train)
qm7_test = convert_to_mollist1(Z_test, R_test)
qm7_labels_train = T_train
qm7_labels_test = T_test


In [None]:
length_of_qm7_labels_train = len(qm7_labels_train)
print(len(qm7_labels_train))
length_of_qm7_labels_test = len(qm7_labels_test)
print(len(qm7_labels_test))
len(T_reshaped)


In [None]:
ws_qm7_train, vs_qm7_train, Natoms_train, Nviews_train = load_qm7_data(qm7_train, speciesmap, setNatoms=None, setNviews=None, carbonbased=False, verbose=1)
qm7G_train=[ws_qm7_train, vs_qm7_train]
ws_qm7_test, vs_qm7_test, Natoms_test, Nviews_test = load_qm7_data(qm7_test, speciesmap, setNatoms=None, setNviews=None, carbonbased=False, verbose=1)
qm7G_test=[ws_qm7_test, vs_qm7_test]


In [16]:
labelsG_train = qm7_labels_train
labelsG_test = qm7_labels_test
Ntoxicity = 1
ws_train, vs_train = ws_qm7_train, vs_qm7_train
ws_test, vs_test = ws_qm7_test, vs_qm7_test
dataG_train=[ws_train,vs_train]
dataG_test=[ws_test,vs_test]

### **Neural Network Code**

#### **constructor routines**



In [17]:
# generic dense NN
def multiDense(Nin,Nout,Nhidden,widthhidden=None,kernel_regularizer=None):
    """Construct a basic NN with some dense layers.
    
    :parameter Nin: The number of inputs
    :type Nin: int
    :parameter Nout: The number of outputs
    :type Nout: int
    :parameter Nhidden: The number of hidden layers.
    :type Nhidden: int
    :parameter widthhidden: The width of each hidden layer.
        If left at None, Nin + Nout will be used.
    :parameter kernel_regularizer: the regularizer to use, such as regularizers.l2(0.001)
    :type kernel_regularizer: tensorflow.keras.regularizers.xxx
    :returns: The NN model
    :rtype: keras.Model
    
    """
    if widthhidden is None:
        widthhidden = Nin + Nout
    x = inputs = keras.Input(shape=(Nin,), name='multiDense_input')
    if kernel_regularizer is not None:
        print("Using regularization")
    for i in range(Nhidden):
        x = layers.Dense(widthhidden, activation='relu', kernel_regularizer=kernel_regularizer,name='dense'+str(i))(x)
#        x = layers.Dense(widthhidden, name='dense'+str(i))(x)
#        x = tf.nn.leaky_relu(x, alpha=0.05)
#    outputs = layers.Dense(Nout, activation='linear',name='multiDense_output')(x)
    outputs = layers.Dense(Nout,name='multiDense_output')(x)
    #outputs = tf.nn.leaky_relu(outputs, alpha=0.05)
    return keras.Model(inputs=inputs, outputs=outputs)#, name='multiDense')
if 1:
    # manual check of multiDense
    mmd = multiDense(10,4,3)
    # used to do the weighted sum over views


In [18]:
def parallelwrapper(Nparallel,basemodel,insteadmax=False):
    """Construct a model that applies a basemodel multiple times and take a weighted sum (or max) of the result.
    
    :parameter Nparallel: The number of times to apply in parallel
    :type Nparallel: int
    :parameter basemodel: a keras.Model inferred to have Nin inputs and Nout outputs.
    :type basemodel: a keras.Model
    :parameter insteadmax: If True, take the max of the results of the basemodel instead of the weighted sum.
        For compatibility, the model is still constructed with weights as inputs, but it ignores them.
    :type insteadmax: Boolean
    :returns: model with inputs shape [(?,Nparallel),(?,Nin,Nparallel)] and outputs shape (?,Nout).
        The first input is the scalar weights in the sum.
    :rtype: keras.Model
    
    Note: We could do a max over the parallel applications instead of or in addition to the weighted sum.
    
    """
    # infer shape of basemodel inputs and outputs
    Nin =  basemodel.inputs[0].shape[1]
    Nout =  basemodel.outputs[0].shape[1]
    
    # Apply basemodel Nparallel times in parallel
    # create main input (?,Nparallel,Nin) 
    parallel_inputs = keras.Input(shape=(Nparallel,Nin), name='parallelwrapper_input0')
    # apply base NN to each parallel slice; outputs (?,Nparallel,Nout)
    if False:
        # original version, stopped working at some tensorflow update
        xb = basemodel(parallel_inputs) # worked in earlier tensorflow
        #xb = tf.map_fn(basemodel,parallel_inputs) # another version that fails
    else:
        # newer version, works but makes summary and graphing cumbersome
        # unstack in the Nparallel directio
        parallel_inputsunstacked = tf.keras.ops.unstack(parallel_inputs, Nparallel, 1)
        # apply base NN to each 
        xbunstacked = [basemodel(x) for x in parallel_inputsunstacked]
        # re-stack
        xb = tf.keras.ops.stack(xbunstacked,axis=1)
    
    # create input scalars for weighted sun (?,Nparallel)
    weight_inputs = keras.Input(shape=(Nparallel,), name='parallelScalars')
    if insteadmax:
        # take max over the Nparallel direction to get (?,1,Nout)
        out = layers.MaxPool1D(pool_size=Nparallel)(xb)
        # reshape to (?,Nout)
        out = layers.Reshape((Nout,))(out)
    else:
        # do a weighted sum over the Nparallel direction to get (?,Nout)
        out = layers.Dot((-2,-1))([xb,weight_inputs])
    
    return keras.Model(inputs=[weight_inputs,parallel_inputs], outputs=out, name='parallelwrapper')
if 1:
    # manual check
    mmd = multiDense(10,4,3)
    mpw = parallelwrapper(5,mmd,insteadmax=0)
    # make models


In [None]:
def init_generator(data,labels,baselayers,Nfeatures,endlayers,base_regularizer=None,end_regularizer=None):
    """Initialize the generator neural net.
    
    :returns: return generator and descrimina NN.
    :rtype: keras.Model
    
    """
    ## Option changing how results of each view are aggregated
    insteadmax = False # Does weighted average; original design
    #insteadmax = True # Does max instead of weighted average (for both G and D)

    # G
    # base NN
    Gbase = multiDense(data[1].shape[2],Nfeatures,baselayers,kernel_regularizer=base_regularizer) 
    # parallel view wrapper
    Gpw = parallelwrapper(Nviews,Gbase,insteadmax)
    # features to toxicity
    #Gft = multiDense(Nfeatures,labels.shape[1],endlayers,kernel_regularizer=end_regularizer)
    #we got an error when running original line since energies are one dimnsional array so we can interpret the single dimension as having only one dimension
    Gft = multiDense(Nfeatures,1,endlayers,kernel_regularizer=end_regularizer) 
    # string together
    generator = keras.Model(inputs=Gpw.inputs,outputs=Gft(Gpw.outputs),name='generator')
    # make trainable
    generator.compile(optimizer='adam',loss='mse')
    #generator.summary()
    # previously did better with Nfeatures=Ntoxicity and no Gft

    if 0:
        # sanity checks that model is working
        print("Sanity check:")
        ws,vs = data
        gbv0call = Gbase(vs[:,0,:]).numpy()
        gbv0predict = Gbase.predict(vs[:,0,:])
        print("base: 0 ?==", np.linalg.norm(gbv0call-gbv0predict))
        gpwcall = Gpw([ws,vs]).numpy()
        gpwpredict = G
        pw.predict([ws,vs])
        print("wrapper: 0 ?==",np.linalg.norm(gpwcall-gpwpredict))
        gencall = generator([ws,vs]).numpy()
        genpredict = generator.predict([ws,vs])
        print("whole: 0 ?==",np.linalg.norm(gencall-genpredict))
        
    return generator
baselayers = 2  # hidden layers before weighted sum
base_reg = 0.1  # regularization for the base layers
Nfeatures = 1  # number of outputs of weighted sum
endlayers = 1  # hidden layers after weighted sum
end_reg = 0.1  # regularization for the end layers

if base_reg == 0:
    base_regularizer = None
else:
    base_regularizer = regularizers.l2(base_reg)

if end_reg == 0:
    end_regularizer = None
else:
    end_regularizer = regularizers.l2(end_reg)  #???

print("(baselayers, base_reg, Nfeatures, endlayers, end_reg) =",
      (baselayers, base_reg, Nfeatures, endlayers, end_reg))
# compile model with options
generator = init_generator(dataG_train,labelsG_train,baselayers,Nfeatures,endlayers,
                           base_regularizer=base_regularizer,end_regularizer=end_regularizer)
generator.compile(optimizer='adam',loss='mse')
#generator.summary()
#change loss to MAE for energies
dataG_train = [np.array(dataG_train[0], dtype='float32'), np.array(dataG_train[1], dtype='float32')]

# Ensure labels are numpy arrays of type float32 and have a 2D shape if required
labelsG_train = np.array(labelsG_train, dtype='float32').reshape(-1, 1)
dataG_test = [np.array(dataG_test[0], dtype='float32'), np.array(dataG_test[1], dtype='float32')]
labelsG_test = np.array(labelsG_test, dtype='float32').reshape(-1, 1)

In [None]:
%%time
# fit
BATCH_SIZE = 32
if 1: 
    history = generator.fit(dataG_train,labelsG_train,
                  epochs=500,batch_size=BATCH_SIZE,verbose=0,
                  validation_data=(dataG_test, labelsG_test))
    print("train loss=",generator.evaluate(dataG_train,labelsG_train,verbose=0))
    print("test loss=",generator.evaluate(dataG_test,labelsG_test,verbose=0))
#     generator.fit(dataG_train,labelsG_train,epochs=1000,batch_size=BATCH_SIZE,verbose=0)
#    generator.save(modelpath+"AG-model-RT.h5")
    


In [21]:
# plot predictions of G versus truth
def PvT_plot(model,data,labels,legend=None,title=None,doresidual=False):
    """Construct a plot of the true labels (x-axis) vs the data generated by the model (y-axis).
    
    :parameter model: the model (e.g. NN)
    :type model: keras.model
    :parameter data: the data that can be input to the model
    :type data: numpy.array
    :parameter labels: the true outputs corresponding to the data
    :type labels: numpy.array
    :parameter legend: The string labels corresponding to the columns of labels 
    :type legend: None or list of str
    :parameter title: A title for the plot
    :type title: None or string
    :parameter doresidual: If true, plot the residual instead
    :type doresidual: Boolean
    """
        
    gen_lab = model.predict(data)
    if doresidual:
        gen_lab = gen_lab - labels

    plt.figure()
    ax = plt.subplot(111)        
    if legend is None:
        for i in range(labels.shape[1]):
            plt.plot(labels[:,i],gen_lab[:,i],'o')
    else:
        for i in range(labels.shape[1]):
            # include legend
            plt.plot(labels[:,i],gen_lab[:,i],'o', label=legend[i])
        ax.legend(bbox_to_anchor=(1.04,1), borderaxespad=0)
    ax.set_xlabel('True Values')
    if doresidual:
        ax.set_ylabel('Residual Values')
    else:
        ax.set_ylabel('Generated Values')
        # reference line
        mintrue = np.min(labels)
        maxtrue = np.max(labels)
        plt.plot([mintrue,maxtrue],[mintrue,maxtrue])
    if title is None:
        title = ''
    if 1:
        if not doresidual:
            gen_lab = gen_lab - labels
        MSE = np.square(gen_lab).mean()
        title = title+" Mean Squared Error="+str(MSE)
        print("Mean Squared Error: ", MSE)
    plt.title(title)
    plt.show()

In [None]:
# Plot training and validation loss
if 1:
    plt.figure()
    if base_regularizer is None and end_regularizer is None:
        plt.title('Loss (no regularization)')
    else:
        plt.title(f'Loss, including regularization (base_reg,end_reg)=({base_reg},{end_reg})')
    plt.semilogy(history.history['loss'], label='train')
    plt.semilogy(history.history['val_loss'], label='test')
    plt.ylim((0, plt.ylim()[1]))  # Set lower ylim to 0
    plt.legend()
    plt.show()



In [33]:
# Calculate and print mean squared error
#if 1:
 #   gen_lab = generator.predict(dataG_train)
  #  MSE = np.square(np.subtract(labelsG_train, gen_lab)).mean()
   # print("Train Mean Squared Error: ", MSE)

In [None]:
# Predict the values using the trained model for both training and test sets
predicted_train = generator.predict(dataG_train)
predicted_test = generator.predict(dataG_test)

# Calculate the Mean Absolute Error for the training set
mae_train = np.mean(np.abs(predicted_train - labelsG_train))

# Calculate the Mean Absolute Error for the test set
mae_test = np.mean(np.abs(predicted_test - labelsG_test))

# Print out the MAE for both sets
print("Train Mean Absolute Error: ", mae_train)
print("Test Mean Absolute Error: ", mae_test)


In [None]:
doresidual = False
legend=None
PvT_plot(generator,dataG_train,labelsG_train,title='train',doresidual=doresidual,legend=legend)
PvT_plot(generator,dataG_test,labelsG_test,title='test',doresidual=doresidual,legend=legend)