In [None]:
# | default_exp output


In [None]:
# |export
import re
from collections import Counter, defaultdict
from typing import Iterable, List, Optional, Tuple

import joblib
import numpy as np
import pandas as pd
from nbdev.showdoc import *
from rdkit import Chem
from rdkit import DataStructs
from rdkit.Chem.Fingerprints import FingerprintMols
from sklearn.metrics import (max_error, mean_absolute_error,
                             mean_squared_error, r2_score)
from strsimpy.levenshtein import Levenshtein
from strsimpy.longest_common_subsequence import LongestCommonSubsequence
from strsimpy.normalized_levenshtein import NormalizedLevenshtein

from gpt3forchem.baselines import compute_fragprints
from gpt3forchem.api_wrappers import query_gpt3, extract_inverse_prediction

  warn(
  warn(
  from .autonotebook import tqdm as notebook_tqdm


# Analyzing results

> Analyze the outputs of the models


To measure how different our outputs are from the input data, we'll use string distances.


In [None]:
# | export 

_DEFAULT_AGGREGATIONS =  [
        ("min", lambda x: np.min(x)),
        ("max", lambda x: np.max(x)),
        ("mean", lambda x: np.mean(x)),
        ("std", lambda x: np.std(x)),
    ]

def aggregate_array(array, aggregations: Optional[List[Tuple[str, callable]]]= None): 
    if aggregations is None:
        aggregations = _DEFAULT_AGGREGATIONS

    aggregated_array = {}
    for k,v in aggregations:
        aggregated_array[k] = v(array)
    return aggregated_array

In [None]:
aggregate_array(np.array([1,2,3,4,5]), aggregations=[("mean", lambda x: np.mean(x))])

{'mean': 3.0}

If no aggregation functions are specified, the default aggregation functions are used.

In [None]:
aggregate_array(np.array([1,2,3,4,5]))

{'min': 1, 'max': 5, 'mean': 3.0, 'std': 1.4142135623730951}

In [None]:
# |export
def string_distances(
    training_set: Iterable[str], # string representations of the compounds in the training set
    query_string: str # string representation of the compound to be queried
):

    distances = defaultdict(list)

    metrics = [
        ("Levenshtein", Levenshtein()),
        ("NormalizedLevenshtein", NormalizedLevenshtein()),
        ("LongestCommonSubsequence", LongestCommonSubsequence()),
    ]

    aggregations = [
        ("min", lambda x: np.min(x)),
        ("max", lambda x: np.max(x)),
        ("mean", lambda x: np.mean(x)),
        ("std", lambda x: np.std(x)),
    ]

    for training_string in training_set:
        for metric_name, metric in metrics:
            distances[metric_name].append(
                metric.distance(training_string, query_string)
            )

    aggregated_distances = {}

    for k, v in distances.items():
        for agg_name, agg_func in aggregations:
            aggregated_distances[f"{k}_{agg_name}"] = agg_func(v)

    return aggregated_distances


In [None]:
# |hide
training_set = ["AAA", "BBB", "CCC"]
query_string = "BBB"
result = string_distances(training_set, query_string)

assert result["NormalizedLevenshtein_min"] == 0.0
assert result["NormalizedLevenshtein_max"] == 1.0


In [None]:
# |export

def is_valid_smiles(smiles: str) -> bool:
    """We say a SMILES is valid if RDKit can parse it."""
    try:
        mol = Chem.MolFromSmiles(smiles)
        if mol is None:
            return False
        return True
    except:
        return False

In [None]:
is_valid_smiles('aba')

[08:49:37] SMILES Parse Error: syntax error while parsing: aba
[08:49:37] SMILES Parse Error: Failed parsing SMILES 'aba' for input: 'aba'


False

In [None]:
is_valid_smiles("CCC")

True

In [None]:
# |export
def is_string_in_training_data(string: str, training_data: Iterable[str]) -> bool:
    """Check if a string is in the training data.
    
    Note that this is not an exact check of a molecule is in the training data 
    as the model might in principle generate an equivalent, non-canonical SMILES.
    However, one might expect that if a model remembers the training data
    it will simple remember the canonical SMILES.
    """
    return string in training_data

In [None]:
is_string_in_training_data('a a hahah', ['a', 'b', 'c'])

False

In [None]:
is_string_in_training_data('a a hahah', ['a', 'b', 'c', 'a a hahah'])

True

In [None]:
# | export

def get_similarity_to_train_mols(smiles: str, train_smiles: List[str]) -> List[float]: 
    train_mols = [Chem.MolFromSmiles(x) for x in train_smiles]
    mol = Chem.MolFromSmiles(smiles)

    train_fps = [Chem.RDKFingerprint(x) for x in train_mols]
    fp = Chem.RDKFingerprint(mol)

    s = DataStructs.BulkTanimotoSimilarity(fp, train_fps)
    return s


In [None]:
get_similarity_to_train_mols('CCC', ['CCC', 'CCCNC', "N"])

[1.0, 0.25, 0.0]

In [None]:
aggregate_array(get_similarity_to_train_mols('CCC', ['CCC', 'CCCNC']))

{'min': 0.25, 'max': 1.0, 'mean': 0.625, 'std': 0.375}

## Polymers

> Code specific for the polymer test case


In [None]:
# |export


def convert2smiles(string):
    new_encoding = {"A": "[Ta]", "B": "[Tr]", "W": "[W]", "R": "[R]"}

    for k, v in new_encoding.items():
        string = string.replace(k, v)

    string = string.replace("-", "")

    return string


To train the model, we simply use single letters, without any special characters such as brackets.


In [None]:
convert2smiles("AWWRRA")


'[Ta][W][W][R][R][Ta]'

To get the composition from the prompt, we will check how often we find a given monomer in the string.


In [None]:
# |export
def get_num_monomer(string, monomer):
    num = re.findall(f"([\d+]) {monomer}", string)
    try:
        num = int(num[0])
    except Exception:
        num = 0
    return num


In [None]:
get_num_monomer("Polymer with 3 A, 5 B and 0 C", "A")


3

In [None]:
# |export
def get_prompt_compostion(prompt):
    composition = {}

    for monomer in ["R", "W", "A", "B"]:
        composition[monomer] = get_num_monomer(prompt, monomer)

    return composition


In [None]:
# |export


def get_target(string, target_name="adsorption"):
    num = re.findall(f"([\d+]) {target_name}", string)
    return int(num[0])


In [None]:
# |export


def get_prompt_data(prompt):
    composition = get_prompt_compostion(prompt)

    return composition, get_target(prompt)


In [None]:
# |export


def get_completion_composition(string):
    parts = string.split("-")
    counts = Counter(parts)
    return dict(counts)


In [None]:
# |export

def string2performance(string):
    # we need to perform a bunch of tasks here:
    # 1) Featurize
    # 2) Query the model

    predicted_monomer_sequence = string.split("@")[0].strip()
    monomer_sq = re.findall("[(R|W|A|B)\-(R|W|A|B)]+", predicted_monomer_sequence)[0]
    composition = get_completion_composition(monomer_sq)
    smiles = convert2smiles(predicted_monomer_sequence)

    features = pd.DataFrame(featurize_many([smiles]))
    prediction = DELTA_G_MODEL.predict(features[FEATURES])
    return {
        "monomer_squence": monomer_sq,
        "composition": composition,
        "smiles": smiles,
        "prediction": prediction,
    }


In [None]:
# |export


def composition_mismatch(composition: dict, found: dict):
    distances = []

    # We also might have the case the there are keys that the input did not contain
    all_keys = set(composition.keys()) & set(found.keys())

    expected_len = []
    found_len = []

    for key in all_keys:
        try:
            expected = composition[key]
        except KeyError:
            expected = 0
        expected_len.append(expected)
        try:
            f = found[key]
        except KeyError:
            f = 0
        found_len.append(f)

        distances.append(np.abs(expected - f))

    expected_len = sum(expected_len)
    found_len = sum(found_len)
    return {
        "distances": distances,
        "min": np.min(distances),
        "max": np.max(distances),
        "mean": np.mean(distances),
        "expected_len": expected_len,
        "found_len": found_len,
    }


In [None]:
# |export


def get_regression_metrics(
    y_true,  # actual values (ArrayLike)
    y_pred,  # predicted values (ArrayLike)
) -> dict:

    try:
        return {
            "r2": r2_score(y_true, y_pred),
            "max_error": max_error(y_true, y_pred),
            "mean_absolute_error": mean_absolute_error(y_true, y_pred),
            "mean_squared_error": mean_squared_error(y_true, y_pred),
        }
    except Exception:
        return {
            "r2": np.nan,
            "max_error": np.nan,
            "mean_absolute_error": np.nan,
            "mean_squared_error": np.nan,
        }


In [None]:
get_regression_metrics([1, 2, 3, 4, 5], [1, 2, 3, 4, 5])


{'r2': 1.0,
 'max_error': 0,
 'mean_absolute_error': 0.0,
 'mean_squared_error': 0.0}

## Photoswitches

Code specific for the photoswitch case study.


First, we'll have some wrapper around GPR models that predict for us the $\pi-\pi^*$ and $n-\pi^*$ transition energies. 
For simplicity, we'll just go via joblib files.

In [None]:
# |export
def _predict_photoswitch(smiles_string: str,pi_pi_star_model_file='../models/pi_pi_star_model.joblib', n_pi_star_model_file='../models/n_pi_star_model.joblib'):
    """Predicting for a single SMILES string. Not really efficient due to the I/O overhead in loading the model."""
    pi_pi_star_model = joblib.load(pi_pi_star_model_file)
    n_pi_star_model = joblib.load(n_pi_star_model_file)
    fragprints = compute_fragprints([smiles_string])
    return pi_pi_star_model.predict(fragprints)[0], n_pi_star_model.predict(fragprints)[0]

In [None]:
# |export
def predict_photoswitch(smiles: Iterable[str], pi_pi_star_model_file='../models/pi_pi_star_model.joblib', n_pi_star_model_file='../models/n_pi_star_model.joblib'): 
    """Predicting for a single SMILES string. Not really efficient due to the I/O overhead in loading the model."""
    if not isinstance(smiles, Iterable):
        smiles = [smiles]
    pi_pi_star_model = joblib.load(pi_pi_star_model_file)
    n_pi_star_model = joblib.load(n_pi_star_model_file)
    fragprints = compute_fragprints(smiles)
    return pi_pi_star_model.predict(fragprints), n_pi_star_model.predict(fragprints)

In [None]:
predict_photoswitch(['C1=CC=C(/N=N/C2=CC=C(NCCC#N)C=C2)C=C1'])

(array([[390.91004025]]), array([[446.54990223]]))

In [None]:
# |  export

_PI_PI_STAR_REGEX = r'pi-pi\* transition wavelength of ([.\d]+) nm'
_N_PI_STAR_REGEX = r'n-pi\* transition wavelength of ([.\d]+) nm'

def get_expected_wavelengths(prompt): 
    pi_pi_star_match = re.search(_PI_PI_STAR_REGEX, prompt)
    n_pi_star_match = re.search(_N_PI_STAR_REGEX, prompt)
    pi_pi_star = float(pi_pi_star_match.group(1)) if pi_pi_star_match else None
    n_pi_star = float(n_pi_star_match.group(1)) if n_pi_star_match else None
    return pi_pi_star, n_pi_star

In [None]:
get_expected_wavelengths('What is a molecule pi-pi* transition wavelength of 404.0 nm###')

(404.0, None)

In [None]:
get_expected_wavelengths('What is a molecule pi-pi* transition wavelength of 321.0 nm and n-pi* transition wavelength of 424.0 nm###')

(321.0, 424.0)

The full evaluation is wrapped upped in the function below. Note that sampling at temperature > 0 is associated with some randomness. In other works, people samples $k$ times and took the best prediction for analysis. In the function below, we do not do this; we only sample once.

Query multiple times to estimate the variance.

In [None]:
# | export
def test_inverse_photoswitch(
    prompt_frame, model, train_smiles, temperature, max_tokens: int = 80
):
    completions = query_gpt3(
        model, prompt_frame, max_tokens=max_tokens, temperature=temperature
    )
    predictions = np.array(
        [
            extract_inverse_prediction(completions, i)
            for i in range(len(completions["choices"]))
        ]
    )

    valid_smiles = [is_valid_smiles(smiles) for smiles in predictions]

    smiles_in_train = [
        is_string_in_training_data(smiles, train_smiles)
        for smiles in predictions[valid_smiles]
    ]

    expected_pi_pi_star, expected_n_pi_star = [], []

    for prompt in prompt_frame["prompt"].values:
        pi_pi_star, n_pi_star = get_expected_wavelengths(prompt)
        expected_pi_pi_star.append(pi_pi_star)
        expected_n_pi_star.append(n_pi_star)

    expected_pi_pi_star = np.array(expected_pi_pi_star)
    expected_n_pi_star = np.array(expected_n_pi_star)

    has_expected_n_pi_star = np.array(
        [n_pi_star is not None for n_pi_star in expected_n_pi_star]
    )

    try:
        predicted_pi_pi_star, predicted_n_pi_star = predict_photoswitch(
            predictions[valid_smiles]
        )

        predicted_pi_pi_star = predicted_pi_pi_star.flatten()
        predicted_n_pi_star = predicted_n_pi_star.flatten()

        pi_pi_star_metrics = get_regression_metrics(
            expected_pi_pi_star[valid_smiles],
            predicted_pi_pi_star,
        )

        mask_n_valid_smiles = [
            has_expected_n_pi_star[i] for i in range(len(valid_smiles)) if valid_smiles[i]
        ]
        n_pi_star_metrics = get_regression_metrics(
            expected_n_pi_star[valid_smiles & has_expected_n_pi_star],
            np.array(predicted_n_pi_star)[mask_n_valid_smiles],
        )

        error_pi_pi_star = np.abs(expected_pi_pi_star[valid_smiles] - predicted_pi_pi_star)
        error_n_pi_star = np.abs(
            expected_n_pi_star[valid_smiles & has_expected_n_pi_star]
            - np.array(predicted_n_pi_star)[mask_n_valid_smiles]
        )

        min_error_pi_pi_star = predictions[valid_smiles][np.argmin(error_pi_pi_star)]
        min_error_n_pi_star = predictions[valid_smiles & has_expected_n_pi_star][np.argmin(error_n_pi_star)]

        max_error_pi_pi_star = predictions[valid_smiles][np.argmax(error_pi_pi_star)]
        max_error_n_pi_star = predictions[valid_smiles & has_expected_n_pi_star][np.argmax(error_n_pi_star)]

        error_pi_pi_star_w_n = np.abs(
            expected_pi_pi_star[valid_smiles & has_expected_n_pi_star]
            - np.array(predicted_pi_pi_star)[mask_n_valid_smiles]
        )

        total_error_pi_pi_star = error_n_pi_star + error_pi_pi_star_w_n
        min_total_error_pi_pi_star = predictions[valid_smiles & has_expected_n_pi_star][
            np.argmin(total_error_pi_pi_star)
        ]
        max_total_error_pi_pi_star = predictions[valid_smiles & has_expected_n_pi_star][
            np.argmax(total_error_pi_pi_star)
        ]

        mol_similarity_metrics = pd.DataFrame(
            [
                aggregate_array(get_similarity_to_train_mols(smile, train_smiles))
                for smile in predictions[valid_smiles]
            ]
        )
    except Exception:
        smiles_in_train = []
        predicted_pi_pi_star = None
        predicted_n_pi_star = None
        expected_pi_pi_star = None
        expected_n_pi_star = None
        valid_smiles = []
        pi_pi_star_metrics= None
        n_pi_star_metrics = None
        min_error_pi_pi_star = None
        max_error_pi_pi_star = None
        min_error_n_pi_star = None
        max_error_n_pi_star = None
        min_total_error_pi_pi_star = None
        max_total_error_pi_pi_star = None
        mol_similarity_metrics = pd.DataFrame([])

    results = {
        "meta": {
            "temperature": temperature,
            "max_tokens": max_tokens,
            "model": model,
        },
        "predictions": predictions,
        "valid_smiles": valid_smiles,
        "smiles_in_train": smiles_in_train,
        "predicted_pi_pi_star": predicted_pi_pi_star,
        "predicted_n_pi_star": predicted_n_pi_star,
        "expected_pi_pi_star": expected_pi_pi_star,
        "expected_n_pi_star": expected_n_pi_star,
        "fractions_valid_smiles": np.mean(valid_smiles),
        "fractions_smiles_in_train": np.mean(smiles_in_train),
        "pi_pi_star_metrics": pi_pi_star_metrics,
        "n_pi_star_metrics": n_pi_star_metrics,
        "examples": {
            "min_error_pi_pi_star": min_error_pi_pi_star,
            "max_error_pi_pi_star": max_error_pi_pi_star,
            "min_error_n_pi_star": min_error_n_pi_star,
            "max_error_n_pi_star": max_error_n_pi_star,
            "min_total_error_pi_pi_star": min_total_error_pi_pi_star,
            "max_total_error_pi_pi_star": max_total_error_pi_pi_star,
        },
        "mol_similarity_metrics": mol_similarity_metrics,
        "mol_similarity_metrics_mean": mol_similarity_metrics.mean(),
        "mol_similarity_metrics_std": mol_similarity_metrics.std(),
    }

    return results
