# Initial attempt to learn from SF data
Just a prototype

In [None]:
import pathlib
import sqlite3
import sys

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
from matplotlib import cm
from rdkit import Chem
from rdkit.Chem.Descriptors import rdMolDescriptors
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, balanced_accuracy_score, f1_score, confusion_matrix
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

sys.path.append(str(pathlib.Path().resolve().parents[1]))

from src.definitions import DB_PATH
from src.util.db_utils import SynFermDatabaseConnection

In [None]:
# open the project's database helper with its default settings
db = SynFermDatabaseConnection()  # we will use this for various simple queries

In [None]:
# Load the complete `experiments` table into a DataFrame indexed by `id`.
# try/finally guarantees the connection is released even if the query fails.
con = sqlite3.connect(DB_PATH)
try:
    df_full = pd.read_sql('SELECT * FROM experiments', con, index_col='id', coerce_float=False)
finally:
    con.close()
print(f'Number of reactions (in total): {len(df_full)}')

In [None]:
# restrict the analysis to a contiguous range of experiment numbers
start_exp_nr = 4  # kick out invalid experiments (SOP changed after exp3)
end_exp_nr = 29  # (inclusive)

in_selected_range = df_full['exp_nr'].between(start_exp_nr, end_exp_nr)
df_full = df_full.loc[in_selected_range]

print(f'Number of reactions considered here: {len(df_full)}')

In [None]:
# sanity check: list any rows whose LCMS ratio for product A is missing
# (an empty result means no experiment lacks its result)
ratio_missing = df_full["product_A_lcms_ratio"].isna()
df_full[ratio_missing]

In [None]:
# distinct `valid` annotations that do not mention ERROR
# (na=False keeps rows without annotation in the selection; value_counts then ignores the NaNs)
df_full.loc[~df_full["valid"].str.contains("ERROR", na=False), "valid"].value_counts().index

In [None]:
# how many rows have no `valid` annotation (True) vs. some annotation (False)
df_full["valid"].isna().value_counts()

In [None]:
# drop every reaction whose `valid` annotation mentions ERROR;
# rows without any annotation (NaN) count as "no error" and are kept
has_error = df_full["valid"].str.contains("ERROR", na=False)
df_full = df_full.loc[~has_error]
len(df_full)

In [None]:
# histogram (400 bins) of the ratios that lie in [1e-10, 1]:
# this leaves out zeros and would also leave out any value above 1
df_full.loc[df_full["product_A_lcms_ratio"].between(1e-10,1), "product_A_lcms_ratio"].plot.hist(bins=400)

In [None]:
# We assign classes 0 (failure) and 1 (success) by ratio > 0.
# A missing (NaN) ratio compares False and therefore lands in class 0.
# `assign` returns a new frame, so we never write into a filtered selection of
# the original table (which is what triggers pandas' SettingWithCopyWarning).
df_full = df_full.assign(
    product_A_outcome=(df_full["product_A_lcms_ratio"] > 0).astype(int)
)

In [None]:
# class balance: number of reactions per outcome label
df_full["product_A_outcome"].value_counts()

In [None]:
# fraction of reactions in the positive class (the mean of a 0/1 label);
# unlike value_counts()[1] this does not raise when there is no positive at all
df_full["product_A_outcome"].mean()

In [None]:
# names of all properties RDKit can compute, and a calculator configured for them
descriptor_names = list(rdMolDescriptors.Properties.GetAvailableProperties())

get_descriptors = rdMolDescriptors.Properties(descriptor_names)

def smi_to_descriptors(smile):
    """Compute the RDKit property vector for one SMILES string.

    Args:
        smile: SMILES string of the molecule.

    Returns:
        1-D numpy array holding the values computed by `get_descriptors`,
        or None if `smile` is not a string (e.g. NaN for a missing entry)
        or RDKit cannot parse it. None is used instead of an empty list so
        that pandas treats the entry as missing and `dropna` can remove it;
        an empty list would survive `dropna` and later break `np.stack`.
    """
    if not isinstance(smile, str):
        return None
    mol = Chem.MolFromSmiles(smile)
    if mol is None:  # MolFromSmiles signals a parse failure by returning None
        return None
    return np.array(get_descriptors.ComputeProperties(mol))

In [None]:
# compute the descriptor entry for the product A SMILES of every row
df_full['descriptors'] = df_full["product_A_smiles"].apply(smi_to_descriptors)

In [None]:
# keep only rows where neither the descriptors nor the outcome label is NA
df_pred = df_full.dropna(subset=["descriptors", "product_A_outcome"])

In [None]:
# build the feature matrix (one descriptor vector per row) and the label vector
descriptor_rows = df_pred["descriptors"].tolist()
X = np.stack(descriptor_rows)
y = df_pred["product_A_outcome"].to_numpy()

In [None]:
# sanity check: expect (n_samples, n_descriptors)
X.shape

In [None]:
# sanity check: expect (n_samples,)
y.shape

In [None]:
# train/val/test split: 20% held out for testing, then 10% of the remainder for validation.
# NOTE: this returns the split arrays themselves, not indices, and is unseeded,
# so every run yields a different partition.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=0.1)

In [None]:
# train/val/test split: 20% held out for testing, then 10% of the remainder for validation.
# Stratifying on the label keeps the class ratio equal in all three subsets, and
# the fixed seed makes the split (and every score derived from it) reproducible.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, stratify=y, random_state=0
)
X_train, X_val, y_train, y_val = train_test_split(
    X_train, y_train, test_size=0.1, stratify=y_train, random_state=0
)

In [None]:
# scaling: learn mean/std on the training split only, then apply them to all splits
scaler = StandardScaler().fit(X_train)
X_train, X_val, X_test = (scaler.transform(split) for split in (X_train, X_val, X_test))

In [None]:
# hparam search: scan the inverse regularisation strength C on a log grid
# and record the validation accuracy obtained for each value
Cs = np.logspace(-3, 4, num=100)
val_scores = []
for C in Cs:
    model = LogisticRegression(C=C, solver="newton-cholesky").fit(X_train, y_train)
    y_hat = model.predict(X_val)
    val_scores.append(accuracy_score(y_val, y_hat))

In [None]:
# validation accuracy as a function of C (log-scaled x axis)
fig, ax = plt.subplots()
ax.plot(Cs, val_scores)
ax.set_xscale("log")
plt.show()

In [None]:
# pick the C with the highest validation accuracy (first one on ties)
best_idx = np.argmax(val_scores)
best_C = Cs[best_idx]
print(best_C)

In [None]:
# refit the best model on train + validation data and evaluate on the test set
model = LogisticRegression(C=best_C, solver="newton-cholesky")
model.fit(np.concatenate((X_train, X_val)), np.concatenate((y_train, y_val)))
y_hat = model.predict(X_test)

# Report the metrics exactly as computed. Taking a square root (as one would to
# turn an MSE into an RMSE) is wrong here: both scores live in [0, 1], so the
# root would inflate every value below 1.
print(f"Accuracy: {accuracy_score(y_test, y_hat)}")
print(f"Balanced Accuracy: {balanced_accuracy_score(y_test, y_hat)}")
print(f"F1-score: {f1_score(y_test, y_hat)}")

In [None]:
# fraction of positive labels in the test set
y_test.mean()

In [None]:
# confusion matrix of the logistic regression on the test set
# (sklearn puts the true classes on the rows and the predictions on the columns)
cmat = confusion_matrix(y_test, y_hat)
mako_r = sns.color_palette("mako_r", as_cmap=True)

plt.figure(figsize=(3,2.67))
sns.heatmap(cmat, annot=True, fmt="01", cmap=mako_r)
plt.xlabel("Predicted")
plt.ylabel("True")
plt.tight_layout()
plt.savefig("confusion_matrix_LogReg_props.png", dpi=300)

In [None]:
# descriptor indices ordered by decreasing absolute coefficient
coef_magnitudes = np.abs(model.coef_.flatten())
highest_coefs = np.argsort(coef_magnitudes)[::-1]

In [None]:
# print name and signed coefficient of the five most influential descriptors
flat_coefs = model.coef_.flatten()
for idx in highest_coefs[:5]:
    print(descriptor_names[idx])
    print(flat_coefs[idx])

In [None]:
# helper function
def iterable_to_nested_list(list_to_break_up, inner_len, pad_last=False, pad_value=None):
    """Break up an iterable into consecutive lists of a given length.

    Args:
        list_to_break_up: Any iterable; it is consumed once, in order.
        inner_len: Number of items per inner list. Must be at least 1.
        pad_last: If True, the final inner list is filled up with `pad_value`
            until it also holds `inner_len` items.
        pad_value: Filler used when `pad_last` is True.

    Returns:
        A list of lists. Every inner list has `inner_len` items, except that
        the last one may be shorter when `pad_last` is False. Empty input
        yields an empty list.

    Raises:
        ValueError: If `inner_len` is smaller than 1 (chunking could never
            make progress with such a length).
    """
    if inner_len < 1:
        raise ValueError(f"inner_len must be at least 1, got {inner_len}")

    # materialise once so that generators and arrays can be sliced like lists
    items = list(list_to_break_up)
    outer_list = [
        items[start:start + inner_len]
        for start in range(0, len(items), inner_len)
    ]

    if pad_last and outer_list:
        last_chunk = outer_list[-1]
        last_chunk.extend([pad_value] * (inner_len - len(last_chunk)))
    return outer_list

In [None]:
# show all descriptor names in a table, each cell colored by its coefficient
fig, ax = plt.subplots(figsize=(18,4))
ax.axis('tight')
ax.axis('off')

# PuOr is a diverging colormap: center it on zero (symmetric limits) so that the
# neutral middle color means "coefficient is 0" and both signs share one scale.
# A plain min-max normalisation would put the midpoint at an arbitrary value.
coefs = model.coef_.flatten()
max_abs = np.abs(coefs).max()
norm = plt.Normalize(vmin=-max_abs, vmax=max_abs)
# cells padded onto the last row are drawn white
colors = iterable_to_nested_list(cm.PuOr(norm(coefs)), 5, True, np.array([1., 1., 1., 1.]))

ax.table(cellText=iterable_to_nested_list(descriptor_names, 5, True, "-"), cellColours=colors, loc='center')
plt.show()

In [None]:
# second model: one-hot encoded reactants fed into a feed-forward network
from sklearn.preprocessing import OneHotEncoder

reactant_columns = ["initiator", "monomer", "terminator"]
df_OHE = df_pred[reactant_columns]
ohe = OneHotEncoder(sparse_output=False, handle_unknown="infrequent_if_exist")
df_OHE

In [None]:
# train/val/test split of the raw reactant labels (sklearn default: 25% held out each time).
# Stratifying on the label keeps the class ratio equal in all subsets, and the
# fixed seed makes the split reproducible between runs.
X_train, X_test, y_train, y_test = train_test_split(
    df_OHE.values, y, stratify=y, random_state=0
)
X_train, X_val, y_train, y_val = train_test_split(
    X_train, y_train, stratify=y_train, random_state=0
)

In [None]:
# learn the categories from the training split only, then encode all three splits
ohe.fit(X_train)
X_train_OHE, X_val_OHE, X_test_OHE = (
    ohe.transform(part) for part in (X_train, X_val, X_test)
)

In [None]:
import torch


In [None]:
class FFN(torch.nn.Module):
    """Feed-forward network: input -> 600 -> 400 -> 1.

    A ReLU follows each of the two hidden layers. The output layer has no
    bias and no final activation, so `forward` returns one raw value per
    sample.
    """

    def __init__(self, input_size):
        """Create the layers for feature vectors of length `input_size`."""
        super().__init__()
        nn = torch.nn
        self.W_i = nn.Linear(input_size, 600, bias=True)
        self.W_h = nn.Linear(600, 400, bias=True)
        self.W_o = nn.Linear(400, 1, bias=False)
        self.activation = nn.ReLU()

    def forward(self, x):
        """Map a batch `x` through both hidden layers and the output layer."""
        hidden = self.activation(self.W_i(x))
        hidden = self.activation(self.W_h(hidden))
        return self.W_o(hidden)

In [None]:
class SFDataset(torch.utils.data.Dataset):
    """In-memory dataset that pairs feature rows with labels as float32 tensors."""

    def __init__(self, X, y):
        """Copy `X` and `y` into float32 tensors stored as `self.X` / `self.y`."""
        as_float = torch.float32
        self.X = torch.tensor(X, dtype=as_float)
        self.y = torch.tensor(y, dtype=as_float)

    def __len__(self):
        """Number of samples, i.e. the length of `self.X`."""
        return len(self.X)

    def __getitem__(self, idx):
        """Return the `(features, label)` pair stored at position `idx`."""
        features = self.X[idx]
        label = self.y[idx]
        return features, label

In [None]:
# Size the input layer from the encoded training matrix instead of hard-coding
# the number of one-hot columns: that number depends on which categories occur
# in the training split.
model = FFN(X_train_OHE.shape[1])

In [None]:
# Adam over all parameters of `model`, learning rate 1e-4
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)

In [None]:
# binary cross-entropy on raw logits (the sigmoid is applied inside the loss),
# which matches the network's un-activated single output
loss_fn = torch.nn.BCEWithLogitsLoss()

In [None]:
# wrap each encoded split together with its labels in a tensor dataset
dataset_train, dataset_val, dataset_test = (
    SFDataset(features, labels)
    for features, labels in (
        (X_train_OHE, y_train),
        (X_val_OHE, y_val),
        (X_test_OHE, y_test),
    )
)

In [None]:
# training batches of 32 samples, drawn in shuffled order
training_loader = torch.utils.data.DataLoader(dataset_train, batch_size=32, shuffle=True)

In [None]:
# validation batches of 32 samples in fixed order (no shuffling)
validation_loader = torch.utils.data.DataLoader(dataset_val, batch_size=32, shuffle=False)

In [None]:
# peek at a single training batch (inputs and labels) to inspect its layout
next(iter(training_loader))

In [None]:
# scratch check: reshape(-1, 1) turns a length-2 vector into a (2, 1) column
torch.tensor([0,1]).reshape(-1,1).shape

In [None]:
def train_one_epoch(epoch_index):
    """Run one optimisation pass over `training_loader`.

    Works on the module-level `model`, `optimizer`, `loss_fn` and
    `training_loader`. `epoch_index` is accepted but not used.

    Returns:
        The loss averaged over all batches of this pass, or 0.0 if the
        loader yielded no batch.
    """
    loss_sum = 0.
    mean_loss = 0.

    # batches are counted from 1 so the running mean is simply sum / count
    for batch_nr, (inputs, labels) in enumerate(training_loader, start=1):
        # gradients accumulate by default, so clear them before every batch
        optimizer.zero_grad()

        # labels are reshaped into a column to line up with the model's
        # single-output-per-sample predictions
        loss = loss_fn(model(inputs), labels.reshape(-1, 1))
        loss.backward()

        # apply the parameter update
        optimizer.step()

        loss_sum += loss.item()
        mean_loss = loss_sum / batch_nr  # mean loss per batch so far

    return mean_loss


In [None]:
EPOCHS = 100

# start from +inf so that the first epoch always counts as an improvement
best_vloss = float("inf")

for epoch in range(EPOCHS):
    print('EPOCH {}:'.format(epoch + 1))

    # training mode on, then one pass over the training data
    model.train(True)
    avg_loss = train_one_epoch(epoch)

    # evaluation mode for the validation pass
    model.train(False)

    running_vloss = 0.0
    n_vbatches = 0
    # no_grad: validation needs no autograd graph. Accumulating plain floats
    # (.item()) instead of tensors also keeps avg_vloss / best_vloss ordinary
    # numbers rather than tensors that hold on to their graph.
    with torch.no_grad():
        for vinputs, vlabels in validation_loader:
            voutputs = model(vinputs)
            vloss = loss_fn(voutputs, vlabels.reshape(-1, 1))
            running_vloss += vloss.item()
            n_vbatches += 1

    if n_vbatches == 0:
        raise ValueError("validation_loader yielded no batches")
    avg_vloss = running_vloss / n_vbatches
    print('LOSS train {} valid {}'.format(avg_loss, avg_vloss))

    # track the best validation loss and checkpoint the weights that achieved it;
    # `model_path` therefore always names the best checkpoint written so far
    if avg_vloss < best_vloss:
        best_vloss = avg_vloss
        model_path = 'model_{}'.format(epoch)
        torch.save(model.state_dict(), model_path)



In [None]:
# path of the most recently saved checkpoint (written whenever the validation loss improved)
model_path

In [None]:
# Restore the saved checkpoint into a fresh network. The input width is read
# from the encoded training matrix rather than hard-coded, because the number
# of one-hot columns depends on the data.
best_model = FFN(X_train_OHE.shape[1])
best_model.load_state_dict(torch.load(model_path))

In [None]:
best_model.eval()

with torch.no_grad():
    # predict with the restored checkpoint (`best_model`), not with `model`,
    # which still holds the weights of the final training epoch
    pred = torch.sigmoid(best_model(dataset_test.X))
    print(pred)
    # threshold the probabilities at 0.5 to obtain hard 0/1 predictions
    predicted, actual = (pred > 0.5).float(), dataset_test.y
    print(f'Predicted: "{predicted}", Actual: "{actual}"')

In [None]:
# number of test samples predicted as positive (probability above 0.5)
(pred > 0.5).count_nonzero()

In [None]:
# test accuracy; arguments follow sklearn's (y_true, y_pred) order
accuracy_score(actual, predicted)

In [None]:
# fraction of positive labels in the test set (mean of the 0/1 label tensor)
dataset_test.y.mean()

In [None]:
plt.figure(figsize=(3,2.33))
# confusion_matrix expects (y_true, y_pred) and puts the true classes on the rows;
# passing the labels first makes the matrix agree with the axis labels below
sns.heatmap(confusion_matrix(actual, predicted), vmin=0, vmax=len(dataset_test), annot=True, fmt="01", cmap=sns.color_palette("mako_r", as_cmap=True))
plt.xlabel("Predicted")
plt.ylabel("True")
plt.tight_layout()
plt.savefig("confusion_matrix_FFN_OHE.png", dpi=300)

In [None]:
# F1 of the positive class; arguments follow sklearn's (y_true, y_pred) order
f1_score(actual, predicted)

In [None]:
# Balanced accuracy is the mean recall over the TRUE classes, so the order
# matters: with the arguments swapped it would average per-class precision.
balanced_accuracy_score(actual, predicted)