In [3]:
# Author: Gergely Zahoranszky-Kohalmi, PhD
#
# Email: gergely.zahoranszky-kohalmi@nih.gov
#
# Organization: NCATS/NIH
#
# Aim: Top-down aggregated yield computation using matrix operations, i.e. we compute the efficiency of producing 1 unit quantity of the target molecule TM.
#



In [4]:
import networkx as nx
import numpy as np
import matplotlib.pyplot as plt
import scipy
import sys


In [5]:
#A = np.matrix([[0, 3, 0], [0, 0, 8], [0, 0, 0]])¶
#r = np.array([4, 0, 0])

# A = np.matrix([
#                 [0, 3, 0, 0, 0, 0, 0],
#                 [0, 0, 8, 2, 0, 0, 0],
#                 [0, 0, 0, 0, 10, 0, 0],
#                 [0, 0, 0, 0, 0, 25, 12],
#                 [0, 0, 0, 0, 0, 25, 0],
#                 [0, 0, 0, 0, 0, 0, 0],
#                 [0, 0, 0, 0, 0, 0, 0]
#             ])

# r = np.array([4, 0, 0, 0, 0, 0, 0])

# print (A)

In [6]:
# Input: a viable sysnthesis route (VSR) which in turn is projected to a "reaction network"

# Here we only assemble the projected reaction network, so the complete logic will need toi include the projectiopn as well.

# Projected reaction network:

# here the string-ids are placeholders for rxids, which are also strings

G = nx.DiGraph()

# G.add_edge('1', '2')
# G.add_edge('2', '3')
# G.add_edge('2', '4')
# G.add_edge('3', '5')
# G.add_edge('4', '6')
# G.add_edge('4', '7')
# G.add_edge('5', '6')


# >>>>>
G.add_edge('2', '1')
G.add_edge('3', '2')
G.add_edge('4', '2')
G.add_edge('5', '3')
G.add_edge('6', '4')
G.add_edge('7', '4')
G.add_edge('6', '5')

G.nodes['1']['yield']  = 0.25
G.nodes['2']['yield']  = 0.333
G.nodes['3']['yield']  = 0.125
G.nodes['4']['yield']  = 0.5
G.nodes['5']['yield']  = 0.1
G.nodes['6']['yield']  = 0.04
G.nodes['7']['yield']  = 0.083
# <<<<<<

# >>>> test for single-step synthesis rout (one reaction node in the projected network) 
# G.add_node('1')
# G.nodes['1']['yield'] = 0.25
# <<<<<



In [7]:
def visualize_graph (G):
    nx.draw_networkx(G, arrows=None, with_labels=True)

    plt.show()
    
def identify_terminal_rxn (G):
    # !!! This function operates on the *reverse* synthesis route, hence the edges are flipped!!!
    # Identify the node without outgoing edge, that is your target molecule (TM's) reaction (terminal reaction, TR), that we need to backtrack from
    # Therefore, the "root-node", i.e. the termonal reaction (Tr), the one that produces the target molecule TM is the one without incoming edges.
    
    root_node_idx = None
    
    first = True
    
    for n in G.nodes():
        in_deg = G.in_degree(n)
        if in_deg == 0:
            if first:
                root_node_idx = n
                first = False
            else:
                
                # Please replace this part with proper error handling, e.g. by throibg a multiple root  node in projected network error,
                # or something along the lines. Thanks!
                
                print (f'[ERROR] There should be only one root node in the projected network, multiple found: Reaction rxid: {n}')
                sys.exit(-1)
    
    return (root_node_idx)


def identify_leaf_nodes (G):
    # !!! This function operates on the *reverse* synthesis route, hence the edges are flipped!!!
    # Therefore, leaf-nodes are those that have no out-going edge as opposed to the "original" synthesis route graphs, where leaf
    # are the ones with no incoming edges.
    
    
    leaf_nodes = []
    
    has_leaf_node = False
    
    for n in G.nodes():
        out_deg = G.out_degree(n)

        if out_deg == 0:
            leaf_nodes.append(n)
            has_leaf_node = True

    # Please replace this part with proper error handling, e.g. by throibg a multiple root  node in projected network error,
    # or something along the lines. Thanks!
    
    if not has_leaf_node:
        
        print (f'[ERROR] There is no leaf node in the projected network!')
        sys.exit(-1)


    
    return (leaf_nodes)   
    

def synth_route_to_adjacency_matrix (G):
    # reverse edges
    G = G.reverse ()

    # keeping track of nodes
    node_map = {}
    node_reverse_map = {}

    
    for i, n in enumerate (G.nodes()):
        node_map[i] = n
        node_reverse_map[n] = i
    
    
    # computing 1/yield (accounting for Div by Zero and for non-sensical negative yields)
    nodes = G.nodes()
    for n in nodes:
        y = G.nodes[n]['yield']
        
        if y > 0.0:
            G.nodes[n]['one_over_yield'] = 1 / y
        
        else:
            print ('[ERROR] Invalid yield associated with node: {n}')
            sys.exit(-1)
    
    # assign edge weights by assuming the end node's yield value
    for e in G.edges():
        start_node = e[0]
        end_node = e[1]
        G.edges[e]['one_over_yield'] = G.nodes[end_node]['one_over_yield']
        #print (G.edges[e]['one_over_yield'])
        

    # convert to weighted adjecency matrix
    A = nx.adjacency_matrix(G, nodelist=nodes, dtype=float, weight='one_over_yield')

    # convert to numpy array (to be tested)
    A = A.toarray()

    # convert to binary adjecency matrix
    A_bin = nx.adjacency_matrix(G, nodelist=nodes, dtype=float)

    # convert to numpy array (to be tested)
    A_bin = A_bin.toarray()

    
    #print (A_bin)

    return ((G, A, A_bin, node_map, node_reverse_map))



# def is_computation_over (node_adjusted_yield_row_vector):
#     L = node_adjusted_yield_row_vector > 0

#     # if any of the row vector of adjusted aggregated yields of nodes is greater than 0 then return False, i.e. the computation is not yet over
    
#     # otherwise the computation is over (all row vector elements are 0)

#     return (not L.any()) 


def report_accumulated_scale_of_leaves (leaf_node_indices, row_vector):
    actual = 0.0

    for idx in leaf_node_indices:
        actual += float(row_vector[idx])



    return (float(actual))


def accumulate_scale (G, rt_idx, node_map, A, is_reference = False):
    # G: Input reversed projected reaction network
    # rt_idx: the terminal reaction's index (the one that synthesizes TM)
    # node_map: node index - node id map of nodes, that encodes the row/column indices of the adjacency matrix (and row-vector)
    # A: input adjacency matrix , either weighted or binary (but float, as data type)
    # is_reference: whether we compute the accumulated scales of reactions based on yield (set to False, default) or based on a hypothetical, ideal
    #               all 100% yield (set to True).
    
    # >>>>
    row_vector = []

    for i in range(len(G.nodes())):

        if i != rt_idx:
            row_vector.append(0)
        
        else:
        
            if is_reference:
                row_vector.append(1.0)
            
            else:
                rt = node_map[rt_idx]
                row_vector.append(G.nodes[rt]['one_over_yield'])
                
                

    row_vector = np.array (row_vector, dtype = float)
            
    # --- The magic happens from here ...
    
    #first = True

    C = row_vector

    # We go with the longest path for now, as it is exact:
    max_path_length = nx.dag_longest_path_length (G)
    
    for i in range(max_path_length):


    # There is an alternetive approach to monitor the end of iteration, that ends the iteration when the row-vector turns to all-ZERO.
    # Not sure which is approach is faster, monitoring if the row-vector is all-ZERO, or computing the longest path in the DAG (current choice).
    # For now we go with the longest-path approach as it is exact hence provides a boundery on the number oteration cycles, which
    # we can exploit in for the proof.
    #
    #
    # If you want to go with the monitoring the all-zero row-vector approach, the necessary functions are already in the 
    # "Functions" section of this workflow, but you need to comment out the two commands above, i.e.
    # # max_path_length = ... 
    # and 
    # # for i in range(max_path_length): ...
    #
    # and uncomment this line:
    # while not is_computation_over(row_vector):

        row_vector = np.matmul(row_vector, A)
        C = C + row_vector

    # --- ... to here


    # <<<<<

    return (C)


def aggregate_yield (G):
    #visualize_graph (G)
    
    # Convert projected reaction network to adjacency matrix
    (G, A, A_bin, node_map, node_reverse_map) = synth_route_to_adjacency_matrix (G)


    #visualize_graph (G)
    
    # Initialize row vector of aggregated yields
    # - identify the terminal reaction Tr (of which the product is the target molecule, TM)
    
    rt = identify_terminal_rxn (G)
    rt_idx = node_reverse_map[rt]


    leaf_nodes = identify_leaf_nodes (G)
    #print (leaf_nodes)

    leaf_node_indices = [node_reverse_map[n] for n in leaf_nodes]
    #print (leaf_node_indices)
    
    
    C = accumulate_scale (G, rt_idx, node_map, A, is_reference = False)
    C_ref = accumulate_scale (G, rt_idx, node_map, A_bin, is_reference = True)
    
    #print (C) 

    # compute ratio between the ideal and computed => report this as aggregated yield, done!
    
    accumulated_scale_of_leaves = report_accumulated_scale_of_leaves  (leaf_node_indices, C)
    accumulated_scale_of_leaves_ref = report_accumulated_scale_of_leaves (leaf_node_indices, C_ref)


    
    

    aggregated_yield = float (100.0 * float(accumulated_scale_of_leaves_ref / accumulated_scale_of_leaves))



    print (f'[*] Aggregated yield: {aggregated_yield}% .')
    
    return (aggregated_yield)




In [8]:
#nx.draw_networkx(G, arrows=None, with_labels=True)

#plt.show()

#(A, nodes, node_map, node_reverse_map) = synth_route_to_adjacency_matrix (G)
#print (A)

In [9]:
aggregate_yield (G)

# Result with monitoring all zero row-vector:



[*] Aggregated yield: 0.012041388324135932% .


0.012041388324135932

In [10]:
# References

# Ref: https://numpy.org/doc/2.1/reference/generated/numpy.matrix.html
# Ref: https://networkx.org/documentation/stable/reference/generated/networkx.convert_matrix.to_numpy_array.html
# Ref: https://numpy.org/doc/stable/reference/generated/numpy.matmul.html
# Ref: https://www.nagwa.com/en/explainers/432180315293/#:~:text=Definition%3A%20Square%20of%20a%20Matrix&text=In%20other%20words%2C%20just%20like,multiplying%20the%20matrix%20by%20itself.
# Ref: https://numpy.org/doc/2.1/reference/generated/numpy.transpose.html
# Ref: https://stackoverflow.com/questions/20229822/check-if-all-values-in-list-are-greater-than-a-certain-number
# Ref: https://stackoverflow.com/questions/10062954/valueerror-the-truth-value-of-an-array-with-more-than-one-element-is-ambiguous
# Ref: https://www.geeksforgeeks.org/valueerror-the-truth-value-of-an-array-with-more-than-one-element-is-ambiguous-use-a-any-or-a-all/
# Ref: https://networkx.org/documentation/stable/reference/generated/networkx.linalg.graphmatrix.adjacency_matrix.html
# Ref: https://stackoverflow.com/questions/26576524/how-do-i-transform-a-scipy-sparse-matrix-to-a-numpy-matrix
# Ref: https://networkx.org/documentation/stable/reference/algorithms/generated/networkx.algorithms.dag.dag_longest_path.html
# Ref: https://stackoverflow.com/questions/75853349/use-networkx-to-calculate-the-longest-path-to-a-given-node
#



