In [1]:
import os
import pathlib
import PIL
import pandas as pd
import networkx as nx
import matplotlib.pyplot as plt
import matplotlib as mpl
from rdkit import Chem
from rdkit.Chem.Draw import rdMolDraw2D
import pygraphviz
import cairosvg

These are helper functions for the analysis below

In [2]:
def find_searched_structures(config_folder):
    '''
    Find the structures that have been searched
    
    Returns:
        searched_structures (list): List of structures that have been searched
    '''
    searched_structures = []
    for config_file in config_folder.iterdir():
        if config_file.is_file():
            with open(config_file, 'r') as f:
                lines = f.readlines()
            for line in lines:
                if line.startswith("  input_structure:"):
                    searched_structures.append(line.split(":")[1].strip())
    return searched_structures

def parse_search_data(results_folder, equilibrium_cache_file, searched_structures):
    '''
    Parse the data from the results folder and return the reactions and barriers

    Returns:
        reactions (list): List of reactions
        barriers (list): List of barriers
    '''
    
    # Extract all of the equilibrium energies from the equilibrium cache file
    equilibrium_energies = []
    equilibrium_structures = []
    with open(equilibrium_cache_file, 'r') as f:
        lines = f.readlines()
    lines = lines[1:]
    for line in lines:
        equilibrium_structures.append(line.split(",")[0])
        equilibrium_energies.append(float(line.split(",")[1]))

    # Iterate through all the folders in the results folder and parse the data in qm_calcs_smiles.csv
    reactions = []
    ts_energies = []
    for folder in results_folder.iterdir():
        if folder.is_dir():
            # Get the reaction and barrier data if it exists
            try:
                df = pd.read_csv(str(folder) + '/qm_calcs_smiles.csv')
                for rxn, energy in zip(list(df['smiles'].values), list(df['ts_energy'].values)):
                    reverse_rxn = rxn.split(">>")[1] + ">>" + rxn.split(">>")[0]
                    # If the reaction and reverse reaction are already in reactions, replace the barrier if the new one is lower
                    if rxn in reactions and reverse_rxn in reactions:
                        index = reactions.index(rxn)
                        reverse_index = reactions.index(reverse_rxn)
                        if energy < ts_energies[index]:
                            ts_energies[index] = energy
                            ts_energies[reverse_index] = energy
                    # If the reaction is already in reactions but the reverse reaction isn't, check whether the reverse reaction reactant is in searched structures and add it if it is
                    # Also use the lowest barrier
                    elif rxn in reactions and reverse_rxn not in reactions:
                        index = reactions.index(rxn)
                        min_energy = min(energy, ts_energies[index])
                        ts_energies[index] = min_energy
                        if rxn.split(">>")[1] in searched_structures:
                            reactions.append(reverse_rxn)
                            ts_energies.append(min_energy)
                    # If the reaction isn't in reactions but the reverse reaction is, add the reaction and use the lowest barrier
                    elif rxn not in reactions and reverse_rxn in reactions:
                        index = reactions.index(reverse_rxn)
                        min_energy = min(energy, ts_energies[index])
                        ts_energies[index] = min_energy
                        reactions.append(rxn)
                        ts_energies.append(min_energy)
                    # If the reaction and reverse reaction aren't in reactions, add the reaction, check whether the reverse reaction reactant is in searched structures and add it if it is
                    # Also use the lowest barrier
                    else:
                        reactions.append(rxn)
                        ts_energies.append(energy)
                        if rxn.split(">>")[1] in searched_structures:
                            reactions.append(reverse_rxn)
                            ts_energies.append(energy)
            except:
                pass

    # Find the barriers
    barriers = []
    for i, reaction in enumerate(reactions):
        if reaction.split(">>")[0] in equilibrium_structures:
            index = equilibrium_structures.index(reaction.split(">>")[0])
            if ts_energies[i] - equilibrium_energies[index] < 0:
                if reaction.split(">>")[1] in equilibrium_structures:
                    product_index = equilibrium_structures.index(reaction.split(">>")[1])
                    barriers.append(equilibrium_energies[product_index] - equilibrium_energies[index])
                    continue
            barriers.append(ts_energies[i] - equilibrium_energies[index])
        else:
            barriers.append(1000)
    
    return reactions, barriers

def find_nnp_barriers(reactions, results_folder, nnp_file_name):
    '''
    Goes to all of the folders in the results folder and finds the barriers for the reactions in the nnp_file_name file and saves the smallest barrier for each reaction

    Returns:
        barriers (list): List of barriers
    '''
    barriers = []
    for reaction in reactions:
        barrier = 1000
        for folder in results_folder.iterdir():
            if folder.is_dir():
                try:
                    df = pd.read_csv(str(folder) + nnp_file_name)
                    for i, smiles in enumerate(list(df['smiles'].values)):
                        if smiles == reaction:
                            barrier = min(barrier, df['ts_energy'].values[i])
                except:
                    pass
        barriers.append(barrier)

    return barriers

def draw_mol_to_folder(mol, folder_name, i):
    '''
    Draw the molecule to the folder with the name folder_name

    Returns:
        PIL.Image: The image of the molecule
    '''
    if os.path.exists(folder_name) == False:
        os.mkdir(folder_name)
    svg_text = rdMolDraw2D.MolToACS1996SVG(mol)
    with open(folder_name + "/" + str(i) + ".svg", 'w') as f:
        f.write(svg_text)
    cairosvg.svg2png(url=folder_name + "/" + str(i) + ".svg", write_to=folder_name + "/" + str(i) + ".png", dpi=500, output_height=200, output_width=200)
    return PIL.Image.open(folder_name + '/' + str(i) + '.png')

def make_weighted_graph_from_reactions(reactions, barriers, nnp_barriers, canon_reactant, searched_structures, max_barrier=60):
    '''
    Make a directed networkx graph where the reactants and products are nodes and the reactions are edges. The edges are weighted by the barrier height.
    
    Returns:
        G (networkx graph): The graph with the reactants and products as nodes and the reactions as edges.
    '''
    ha_to_kcal = 627.509
    G = nx.DiGraph()
    for i, reaction in enumerate(reactions):
        barrier = barriers[i] * ha_to_kcal
        if barrier > max_barrier:
            continue
        nnp_barrier = nnp_barriers[i] * ha_to_kcal
        reactant = reaction.split(">>")[0]
        product = reaction.split(">>")[1]
        #if product == canon_reactant:
        #    continue
        if reactant == product:
            continue
        if G.has_node(reactant) == False:
            G.add_node(reactant, image = draw_mol_to_folder(Chem.MolFromSmiles(reactant), "images", i))
        if G.has_node(product) == False:
            G.add_node(product, image = draw_mol_to_folder(Chem.MolFromSmiles(product), "images", i))
        #if G.has_edge(product, reactant):
        #    continue
        if G.has_edge(reactant, product):
            G[reactant][product]['weight'] = max(0, min(G[reactant][product]['weight'], barrier))
            G[reactant][product]['nnp_weight'] = max(0, min(G[reactant][product]['nnp_weight'], nnp_barrier))
        else:
            G.add_edge(reactant, product, weight=max(0, barrier), nnp_weight=max(0, nnp_barrier))
    
    return G

def pick_next_search(G, initial_node, searched_structures):
    '''
    Pick the next node to search from the graph G. The next node is the one with the smallest maximum barrier that hasn't been searched.

    Returns:
        next_search (str): The next node to search
    '''
    # Find the node that isn't a parent node and has the smallest maximum barrier from the intial node
    next_search = None
    edges = []
    for node in G.successors(initial_node):
        edges.append((initial_node, node, G[initial_node][node]['weight']))
    
    # Order edges by weight
    edges = sorted(edges, key=lambda x: x[2])
    searched_edges = [edges[0]]
    # Find the first node that hasn't been searched
    count = 0
    while True:
        count += 1
        if len(list(G.successors(edges[0][1]))) == 0 and edges[0][1] not in searched_structures:
            next_search = edges[0][1]
            break
        else:
            for node in G.successors(edges[0][1]):
                edges.append((edges[0][1], node, G[edges[0][1]][node]['weight']))
            # Remove any edges that have already been searched
            edges = [edge for edge in edges if edge not in searched_edges]
            edges = sorted(edges, key=lambda x: x[2])
            # Add searched edge to searched edges
            searched_edges.append(edges[0])
        if count > 200:
            print("Uh oh, this shouldn't happen!")
            break
        
    return next_search, searched_edges

def get_drawing_info(G):
    '''
    Get the drawing information for the graph G
    '''
    fig, ax = plt.subplots(figsize=(25,15), dpi=400)
    pos = nx.nx_pydot.graphviz_layout(G, prog="neato") 
    cmap = plt.cm.viridis
    edge_colors = []
    for u, v, d in G.edges(data=True):
        edge_colors.append(d['weight'])
    return fig, ax, pos, cmap, edge_colors

def write_next_config(next_search, results_folder_name, search_num):
    '''
    Write the next config file to search the next node

    Returns
        None
    '''
    search = str(search_num).zfill(2)
    config_file = "../config/" + results_folder_name + "/search_" + search + ".yaml"
    old_config_file = "../config/" + results_folder_name + "/search_01.yaml"

    with open(old_config_file, 'r') as f:
        lines = f.readlines()
    
    with open(config_file, 'w') as f:
        for line in lines:
            if line.startswith("  input_structure:"):
                line = "  input_structure: " + next_search + "\n"
            f.write(line)

This cell is used to build the chemical reaction network and select the next search candidate from the RAMP results. Input your canon reactant SMILES, results folder name (which will be the same as the config folder), and search number

In [None]:
canon_reactant = "C/C=C/C=C\C=C/C=C/C"
results_folder_name = "example_config"
search_number = 1

equilibrium_cache_file = results_folder_name + "/equilibrium_cache.csv"
results_folder = pathlib.Path(results_folder_name)
searched_structures = find_searched_structures(pathlib.Path("../config/" + results_folder_name))
reactions, barriers = parse_search_data(results_folder, equilibrium_cache_file, searched_structures)
nnp_barriers = find_nnp_barriers(reactions, results_folder, "/nnp_barrier_preds.csv")

G = make_weighted_graph_from_reactions(reactions, barriers, nnp_barriers, canon_reactant, searched_structures, max_barrier=200)
next_search, path = pick_next_search(G, canon_reactant, searched_structures)

print(G)
print(next_search)
print(path)

Run this cell to visualize the chemical reaction network. Adjust scaling_factor to change the size of the molecules (bigger graphs need smaller scaling factors) and adjust source_margin to adjust the length of the arrows 

In [None]:
scaling_factor = 0.001
source_margin = 88

fig, ax, pos, cmap, edge_colors = get_drawing_info(G)
edges = nx.draw_networkx_edges(G, pos, ax=ax, arrows=True, min_source_margin=source_margin, min_target_margin=source_margin, edge_color=edge_colors, edge_cmap=cmap, style="dashed", connectionstyle="arc3,rad=0.05")
for i in range(G.number_of_edges()):
    edges[i].set_alpha(edge_colors[i]/max(edge_colors))
pc = mpl.collections.PatchCollection(edges, cmap=cmap)
pc.set_array(edge_colors)
cbar = plt.colorbar(pc, ax=ax, label="Reaction Barrier (kcal/mol)", location="bottom")
cbar.set_label("Reaction Barrier (kcal/mol)", fontsize=32)
cbar.ax.tick_params(labelsize=32)
cbar.outline.set_linewidth(3)
cbar.ax.tick_params(width=3, length=10)
tr_figure = ax.transData.transform
tr_axes = fig.transFigure.inverted().transform
icon_size = (ax.get_xlim()[1] - ax.get_xlim()[0]) * scaling_factor
icon_center = icon_size / 2.0
for n in G.nodes:
    xf, yf = tr_figure(pos[n])
    xa, ya = tr_axes((xf, yf))
    a = plt.axes([xa - icon_center, ya - icon_center, icon_size, icon_size])
    a.imshow(G.nodes[n]["image"])
    if n == canon_reactant or n == next_search:
        a.axis("on")
        a.set_xticks([])
        a.set_yticks([])
        a.set_title("Reactant" if n == canon_reactant else "Next Search", loc="center")
    else:
        a.axis("off")
edges = nx.draw_networkx_edges(G, pos, ax=ax, arrows=True, width=4, min_source_margin=source_margin, min_target_margin=source_margin, edge_color=edge_colors, edge_cmap=cmap, style="dashed", connectionstyle="arc3,rad=0.05")
# Make a list for the path that doesn't have the weights
path_no_weights = [(edge[0], edge[1]) for edge in path]
for i, edge in enumerate(G.edges):
    if edge in path_no_weights:
        edges[i].set_linewidth(6)
        edges[i].set_linestyle("-")
ax.set_axis_off()

Run this cell to write a config file for the next search (VERY IMPORTANT TO HAVE THE SEARCH NUMBER CORRECT). It will use the settings from the previous search

In [None]:
write_next_config(next_search, results_folder_name, search_number)