# Selecting the Most Efficient Method for a Biological Optimization Problem using Deep Bidirectional Encoder Representations from Transformers (BERT)
Designed to automatically select between deterministic and heuristic methods of solving a biological optimization problem, CutFree, using the BERT unsupervised model architecture.

## Import Dependencies

In [None]:
# system
import os
import csv
from datetime import datetime

# data analysis
import numpy as np
import pandas as pd

# data visualization
import matplotlib.pyplot as plt
import matplotlib.font_manager as font_manager
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix, ConfusionMatrixDisplay

# deep learning
import tensorflow as tf
from sklearn.model_selection import train_test_split
from sklearn.utils.class_weight import compute_class_weight
from transformers import DistilBertTokenizer, TFDistilBertModel, AdamWeightDecay
from numba import cuda

# check device
device = cuda.get_current_device()
device.reset()

## Create Directory Path to Save Model Information

In [None]:
# get previous model version
highest_model_number = 0
for directory in os.listdir('models'):
    model_number = int(directory.split("-V")[-1])
    if model_number >= highest_model_number:
        highest_model_number = model_number + 1

# create save directory
save_folder = 'models/AlgorithmClassifier-V' + str(highest_model_number)
save_folder

## Clean Dataset to Include Target

In [None]:
# read in data to dataframe
file_path = '../runtime-simulations/runtime_data.csv'
df_original = pd.read_csv(file_path)
df = df_original.copy()

# drop duplicate rows
duplicate_rows = df.duplicated(subset=['Oligo', 'Sites'], keep='first')
df = df[~duplicate_rows]

# fix the sites column
df['Sites'] = [s[5:-2] for s in df['Sites']]
df['Sites'] = [s.replace('"', '').replace(' ', '') for s in df['Sites']]
df['Sites'] = [s.split(',') for s in df['Sites']]

# add time discrepancy column
df['Discrepancy'] = df['CutFree_Time'] - df['CutFreeRL_Time']
df = df.sort_values(by=['Discrepancy'], ascending=True)
df = df.reset_index(drop=True)

# add correct algorithm column
conditions = [
    (df['CutFree_Time'] <= df['CutFreeRL_Time']),
    (df['CutFree_Time'] > df['CutFreeRL_Time'])
]
values = [0, 1] # 0 = CutFree, 1 = CutFreeRL
df['Algorithm'] = np.select(conditions, values)

# adjust correct algorithm based on degeneracy if it outside of the confidence interval 
# (i.e., ignore CutFreeRL if the degeneracy loss is too significant, typically caused by incomplete CutFreeRL output)
df.loc[df['CutFree_Degeneracy'] == 0, 'Algorithm'] = 1
df.loc[df['CutFreeRL_Degeneracy'] <= df['CutFree_Degeneracy'] - (df['CutFree_Degeneracy'] * 0.10), 'Algorithm'] = 0

# count classifcations
print(df['Algorithm'].value_counts())

df.shape, df.iloc[0]

## Get Train, Validation, and Test Data

In [None]:
# gather text and label information
texts = []
labels = []
for index, row in df.iterrows():
    texts.append(row['Oligo'] + '; ' + ', '.join(row['Sites']))
    labels.append(row['Algorithm'])

In [None]:
# get random state to improve validity of results
random_state = np.random.randint(0, 1000)

# load train, validation, and test data for text
x_train, x_val, y_train, y_val = train_test_split(texts, labels, test_size=0.3, random_state=random_state)
x_val, x_test, y_val, y_test = train_test_split(x_val, y_val, test_size=0.5, random_state=random_state)

print("Random State: ", random_state)
print("Train, Validation, and Test Sizes: ", len(x_train), len(x_val), len(x_test))

## Tokenize Input Sequences

In [None]:
# constants
MAX_SEQUENCE_LENGTH = 256
BATCH_SIZE = 24
BERT_MODEL_NAME = 'distilbert-base-uncased'

# initialize tokenizer
tokenizer = DistilBertTokenizer.from_pretrained(BERT_MODEL_NAME)

# tokenize training data
tokens_train = tokenizer.batch_encode_plus(x_train, max_length=MAX_SEQUENCE_LENGTH, padding='max_length', truncation=True, return_tensors='tf')

input_ids_train = tokens_train['input_ids']
attention_mask_train = tokens_train['attention_mask']

texts_train = (input_ids_train, attention_mask_train)
labels_train = tf.constant(y_train)

# tokenize validation data
tokens_val = tokenizer.batch_encode_plus(x_val, max_length=MAX_SEQUENCE_LENGTH, padding='max_length', truncation=True, return_tensors='tf')

input_ids_val = tokens_val['input_ids']
attention_mask_val = tokens_val['attention_mask']

texts_val = (input_ids_val, attention_mask_val)
labels_val = tf.constant(y_val)

## Compile, Train, and Save Model

In [None]:
# input layer
input_ids = tf.keras.layers.Input(shape=(MAX_SEQUENCE_LENGTH,), name='input_ids', dtype='int32')
attention_mask = tf.keras.layers.Input(shape=(MAX_SEQUENCE_LENGTH,), name='attention_mask', dtype='int32')

# bert model
bert_model = TFDistilBertModel.from_pretrained(BERT_MODEL_NAME, output_attentions=False, output_hidden_states=False)([input_ids, attention_mask])
bert_output = bert_model.last_hidden_state[:, 0, :]

# mlp layers
mlp = tf.keras.Sequential([
    tf.keras.layers.Dropout(0.8, name='dropout_0'),
    tf.keras.layers.Dense(768, 
                          kernel_regularizer = tf.keras.regularizers.L2(1e-4),
                          bias_regularizer = tf.keras.regularizers.L2(1e-4),
                          activity_regularizer = tf.keras.regularizers.L2(1e-4),
                          name='dense_1'),
    tf.keras.layers.BatchNormalization(name='batch_norm_1'),
    tf.keras.layers.Dropout(0.4, name='dropout_1'),
    tf.keras.layers.Activation('relu', name='relu_1'),
    tf.keras.layers.Dense(512, 
                          kernel_regularizer = tf.keras.regularizers.L2(1e-4),
                          bias_regularizer = tf.keras.regularizers.L2(1e-4),
                          activity_regularizer = tf.keras.regularizers.L2(1e-4),
                          name='dense_2'),
    tf.keras.layers.BatchNormalization(name='batch_norm_2'),
    tf.keras.layers.Dropout(0.4, name='dropout_2'),
    tf.keras.layers.Activation('relu', name='relu_2'),
    tf.keras.layers.Dense(256, 
                          kernel_regularizer = tf.keras.regularizers.L2(1e-4),
                          bias_regularizer = tf.keras.regularizers.L2(1e-4),
                          activity_regularizer = tf.keras.regularizers.L2(1e-4),
                          name='dense_3'),
    tf.keras.layers.BatchNormalization(name='batch_norm_3'),
    tf.keras.layers.Dropout(0.4, name='dropout_3'),
    tf.keras.layers.Activation('relu', name='relu_3'),
    tf.keras.layers.Dense(128, 
                          kernel_regularizer = tf.keras.regularizers.L2(1e-4),
                          bias_regularizer = tf.keras.regularizers.L2(1e-4),
                          activity_regularizer = tf.keras.regularizers.L2(1e-4),
                          name='dense_4'),
    tf.keras.layers.BatchNormalization(name='batch_norm_4'),
    tf.keras.layers.Dropout(0.4, name='dropout_4'),
    tf.keras.layers.Activation('relu', name='relu_4'),
    tf.keras.layers.Dense(64, 
                          kernel_regularizer = tf.keras.regularizers.L2(1e-4),
                          bias_regularizer = tf.keras.regularizers.L2(1e-4),
                          activity_regularizer = tf.keras.regularizers.L2(1e-4),
                          name='dense_5'),
    tf.keras.layers.BatchNormalization(name='batch_norm_5'),
    tf.keras.layers.Dropout(0.4, name='dropout_5'),
    tf.keras.layers.Activation('relu', name='relu_5'),
    tf.keras.layers.Dense(32, 
                          kernel_regularizer = tf.keras.regularizers.L2(1e-4),
                          bias_regularizer = tf.keras.regularizers.L2(1e-4),
                          activity_regularizer = tf.keras.regularizers.L2(1e-4),
                          name='dense_6'),
    tf.keras.layers.BatchNormalization(name='batch_norm_6'),
    tf.keras.layers.Dropout(0.4, name='dropout_6'),
    tf.keras.layers.Activation('relu', name='relu_6')
], name='mlp')(bert_output)

# output layer
output = tf.keras.layers.Dense(2, activation='softmax', name='output')(mlp)

# combine input and output layers to create model
model = tf.keras.Model(inputs=[input_ids, attention_mask], outputs=output, name='AlgorithmClassifier')

model.summary()

In [None]:
# create optimizer and loss function
optimizer = AdamWeightDecay(learning_rate=5e-6, weight_decay_rate=0.01)
loss = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False)

# compile model
model.compile(optimizer, loss, metrics=['accuracy'])

# class weights (if not balanced)
class_weights = dict(enumerate(compute_class_weight(class_weight='balanced', classes=np.unique(y_train), y=np.array(y_train))))

# callbacks for early stopping, saving checkpoints, and visualizing in tensorboard
model_callbacks = [
    tf.keras.callbacks.EarlyStopping(patience=5),
    tf.keras.callbacks.ModelCheckpoint(filepath=save_folder + '/checkpoints/model.{epoch:02d}-{val_accuracy:.3f}.h5'),
    tf.keras.callbacks.TensorBoard(log_dir=save_folder + '/logs')]

# fine-tune model to training data
history = model.fit(
    texts_train,
    labels_train,
    validation_data=(texts_val, labels_val), 
    epochs=50, 
    batch_size=BATCH_SIZE,
    class_weight=class_weights,
    callbacks=model_callbacks)

In [None]:
tokenizer.save_pretrained(save_folder + '/tokenizer')

## Evaluate Model Performance

In [None]:
fig = plt.figure(figsize=(16, 12))

plt.subplot(2, 1, 1)
plt.plot(history.history['accuracy'])
plt.plot(history.history['val_accuracy'])
plt.title('Model Accuracy')
plt.ylabel('Accuracy')
plt.xlabel('Epoch')
plt.legend(['Train', 'Validation'], loc='upper left')

plt.subplot(2, 1, 2)
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.title('Model Loss')
plt.ylabel('Loss')
plt.xlabel('Epoch')
plt.legend(['Train', 'Validation'], loc='upper left')
plt.show()

fig.savefig(save_folder + '/training.png')

In [None]:
# tokenize test data
tokens_test = tokenizer.batch_encode_plus(x_test, max_length=MAX_SEQUENCE_LENGTH, padding='max_length', truncation=True, return_tensors='tf')

input_ids_test = tokens_test['input_ids']
attention_mask_test = tokens_test['attention_mask']

texts_test = (input_ids_test, attention_mask_test)
labels_test = tf.constant(y_test)

In [None]:
# function to get accuracy of model predictions
def get_accuracy(pred, true):
    test_accuracy = accuracy_score(pred, true) * 100
    print('Accuracy: {:.2f}%'.format(test_accuracy))
    return test_accuracy

# retrieve model predictions on test data
y_pred = model.predict(texts_test)
y_pred = np.argmax(y_pred, axis=1)

# get accuracy of predictions
acc = get_accuracy(y_pred, y_test)

# save accuracy
with open(save_folder + "/accuracy_results.csv", "a") as f:
    writer = csv.writer(f)
    writer.writerow([acc])
    
acc

In [None]:
# create classification report
class_report = classification_report(y_pred, y_test, target_names=['CutFree', 'CutFreeRL'])

# save classification report
with open(save_folder + "/classification_report.csv", "a") as f:
    writer = csv.writer(f)
    writer.writerow([class_report])
    
print(class_report)

In [None]:
# create confusion matrix
cm = confusion_matrix(y_test, y_pred)
fig = ConfusionMatrixDisplay(cm, display_labels=['CutFree', 'CutFreeRL'])
fig.plot()

# save confusion matrix
plt.savefig(save_folder + '/confusion_matrix.png')

## Plot Correct Selection for CutFree and CutFreeRL

In [None]:
# set colors and font for plots
rgb = []
for _ in df[df['Algorithm'] == 0].index:
    c = [46/255, 108/255, 190/255]
    rgb.append(c)

rgb2 = []
for _ in df[df['Algorithm'] == 1].index:
    c = [220/255, 77/255, 58/255]
    rgb2.append(c)

gfont = {'fontname': 'Georgia'}
font = font_manager.FontProperties(family='Georgia', style='normal', size=32)

In [None]:
plt.figure(figsize=(24, 8))

# plot expected algorithm selection for all time discrepencies
plt.scatter(
    df[df['Algorithm'] == 0].index, df[df['Algorithm'] == 0]['Discrepancy'], 
    c=rgb,
    linewidths=1,
    marker='|',
    s=500)
plt.scatter(
    df[df['Algorithm'] == 1].index, df[df['Algorithm'] == 1]['Discrepancy'], 
    c=rgb2,
    linewidths=1,
    marker='_',
    s=500)
    
plt.title('Correct Algorithm Selection for All Runtime Discrepancies', fontsize=32, **gfont)
plt.ylabel('Runime Discrepancy (sec)', fontsize=32, **gfont)
plt.legend(['CutFree', 'CutFreeRL'], prop=font)
ax = plt.gca()
plt.xticks(fontsize=32, **gfont)
plt.yticks(fontsize=32, **gfont)
ax.get_xaxis().set_visible(False)
ax.spines.right.set_visible(False)
ax.spines.top.set_visible(False)

plt.savefig(save_folder + '/correct_algorithm_selection.png')

## Plot Predictions for Selection of CutFree and CutFreeRL

In [None]:
# organize test data for analysis
x_test = np.array(x_test)

# get oligos from test inputs
x_test_oligos = [i.split(';')[0] for i in x_test]

# get sites from test inputs
x_test_sites = [i.split(';')[1] for i in x_test]
x_test_sites = np.array(x_test_sites)
x_test_sites = [x_test_site.replace(' ', '') for x_test_site in x_test_sites]
x_test_sites = [x_test_site.split(',') for x_test_site in x_test_sites]

print(len(x_test_oligos), len(x_test_sites), len(y_test))

In [None]:
# copy dataframe
df_test = df.copy()

# add prediction column using test data
for oligo, site, pred, test in zip(x_test_oligos, x_test_sites, y_pred, y_test):
    for index, (oligo_df, sites_df) in enumerate(zip(df_test['Oligo'].values, df_test['Sites'].values)):
        if (oligo == oligo_df) and (site == sites_df):
            df_test.loc[index, 'Prediction'] = pred == test

# drop unassigned rows
df_test = df_test.dropna(subset=['Prediction'])

# set prediction algorithm
cutfree_condition = ((df_test['Prediction'] == True) & (df_test['Algorithm'] == 0)) | ((df_test['Prediction'] == False) & (df_test['Algorithm'] == 1))
cutfreerl_condition = ((df_test['Prediction'] == True) & (df_test['Algorithm'] == 1)) | ((df_test['Prediction'] == False) & (df_test['Algorithm'] == 0))
df_test.loc[cutfree_condition, 'Prediction'] = 0
df_test.loc[cutfreerl_condition, 'Prediction'] = 1

# create correct/incorrect column
df_test['Correct'] = df_test['Algorithm'] == df_test['Prediction']

# sort dataframe by discrepency and reset index
df_test = df_test.sort_values(by=['Discrepancy'], ascending=True).reset_index(drop=True)

# check prediction column
df_test['Prediction'].value_counts() # 0 = CutFree, 1 = CutFreeRL

In [None]:
fig, ax = plt.subplots(figsize=(24, 16))

# plot correct and incorrect selections for all time discrepencies
plt.subplot(2, 1, 1)

correct_conditions = (df_test['Correct'] == True)
plt.scatter(
    df_test[correct_conditions].index,
    df_test.loc[correct_conditions, 'Discrepancy'], 
    c='g',
    linewidths=1,
    marker='s',
    s=50)

incorrect_conditions = (df_test['Correct'] == False)
plt.scatter(
    df_test[incorrect_conditions].index,
    df_test.loc[incorrect_conditions, 'Discrepancy'], 
    c='r',
    linewidths=1,
    marker='x',
    s=500)

plt.title(f'Accuracy of BERT-based Neural Network ({acc:.1f}%) for All Runtime Discrepancies', fontsize=32, **gfont)
plt.ylabel('Runime Discrepancy (sec)', fontsize=32, **gfont)
plt.legend([f'Correct Predictions ({len(df_test[correct_conditions])})', 
            f'Incorrect Predictions ({len(df_test[incorrect_conditions])})'], 
            prop=font)
ax = plt.gca()
plt.xticks(fontsize=32, **gfont)
plt.yticks(fontsize=32, **gfont)
ax.get_xaxis().set_visible(False)
ax.spines.right.set_visible(False)
ax.spines.top.set_visible(False)

# plot problematic selections for all time discrepencies
plt.subplot(2, 1, 2)

nonproblematic_conditions = (df_test['Correct'] == True) | ((df_test['Correct'] == False) & (df_test['Algorithm'] == 0)) | ((df_test['Correct'] == False) & (df_test['CutFree_Time'] <= 60))
plt.scatter(
    df_test[nonproblematic_conditions].index,
    df_test.loc[nonproblematic_conditions, 'Discrepancy'], 
    c='g',
    linewidths=1,
    marker='s',
    s=50)

# problematic conditions are defined as incorrect predictions that place the system at risk of runtime explosion
problematic_conditions = ((df_test['Correct'] == False) & (df_test['Algorithm'] == 1) & (df_test['CutFree_Time'] >= 60))
plt.scatter(
    df_test[problematic_conditions].index,
    df_test.loc[problematic_conditions, 'Discrepancy'], 
    c='r',
    linewidths=1,
    marker='x',
    s=500)

plt.title(f'Non-problematic vs. Problematic Predictions for All Runtime Discrepancies', fontsize=32, **gfont)
plt.ylabel('Runime Discrepancy (sec)', fontsize=32, **gfont)
plt.legend([f'Non-problematic Predictions ({len(df_test[nonproblematic_conditions])})', 
            f'Problematic Predictions ({len(df_test[problematic_conditions])})'], 
            prop=font)
ax = plt.gca()
plt.xticks(fontsize=32, **gfont)
plt.yticks(fontsize=32, **gfont)
ax.get_xaxis().set_visible(False)
ax.spines.right.set_visible(False)
ax.spines.top.set_visible(False)

plt.savefig(save_folder + '/runtime_discrepancies.png')

In [None]:
# get sensitivity for values above runtime limit
sensitivity = (len(df_test[nonproblematic_conditions])) / (len(df_test[nonproblematic_conditions]) + len(df_test[problematic_conditions])) * 100

# save sensitivity
with open(save_folder + "/sensitivity_results.csv", "a") as f:
    writer = csv.writer(f)
    writer.writerow([sensitivity])
    
sensitivity

## Get Average Model Statistics

In [None]:
# initialize values
average_sensitivity = np.array([])
average_acc = np.array([])

# retrieve all values from saved CSV files
for directory in os.listdir('models'):
    if directory.startswith('AlgorithmClassifier'):
        with open('models/' + directory + '/sensitivity_results.csv', 'r') as f:
            reader = csv.reader(f)
            for row in reader:
                if len(row) > 0:
                    average_sensitivity = np.append(average_sensitivity, float(row[0]))
        with open('models/' + directory + '/accuracy_results.csv', 'r') as f:
            reader = csv.reader(f)
            for row in reader:
                if len(row) > 0:
                    average_acc = np.append(average_acc, float(row[0]))
        n = len(average_sensitivity)

print(f"Average Sensitivity: {average_sensitivity.mean():.1f}% +/- {average_sensitivity.std():.5f}")
print(f"Average Accuracy: {average_acc.mean():.1f}% +/- {average_acc.std():.5f}")
print(f"Number of Trials: {n}")

# Load Model and Make New Predictions

In [None]:
# import dependencies
import tensorflow as tf
import numpy as np
from transformers import DistilBertTokenizer, TFDistilBertModel, AdamWeightDecay

# load model
new_model = tf.keras.models.load_model(
    'models/AlgorithmClassifier-V3/checkpoints/model.25-0.909.h5',
    custom_objects={'TFDistilBertModel': TFDistilBertModel, 
                    'AdamWeightDecay': AdamWeightDecay})

# load tokenizer
new_tokenizer = DistilBertTokenizer.from_pretrained('distilbert-base-uncased')        

In [None]:
# get input
text = 'NNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNN; ACGWGN, TTTCGGCC, RTAGGCAY, CCTGCATAGG'
text_0 = 'NNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNN; GCGGCCGC, GGCCGGCC, RTGCGCAY, CCTCGAGG'
text_1 = 'NNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNN; CRCCGGYG, CCTCGAGG, GTTTAAAC, ATTTAAAT, CGCGCGCG, GGCGCGCC, RTGCGCAY, TTAATTAA, CGTCGACG, GCCCGGGC'

new_tokens_test = new_tokenizer.batch_encode_plus([text, text_0, text_1], max_length=256, padding='max_length', truncation=True, return_tensors='tf')

new_input_ids_test = new_tokens_test['input_ids']
new_attention_mask_test = new_tokens_test['attention_mask']

new_texts_test = (new_input_ids_test, new_attention_mask_test)

# predict
new_y_pred = np.argmax(new_model.predict(new_texts_test), axis=1)

print('Reconstructed Model Prediction (0 = CutFree, 1 = CutFreeRL): ', new_y_pred)