In [1]:
############ Based on Akiyama sensei's source code.
# Here, I have defined the Pgraph class to generate our defined modular product graph from a SMILES or SMARTS string

from CGRtools import smiles
from CGRtools import ReactionContainer
import itertools as it
import igraph as ig
import os
import networkx as nx
######################################

# class for graph theoretic molecular structure
class MolGraph:
    
    # initialization of MolGraph class
    def __init__( self, str_smiles):
        self.mol = smiles( str_smiles )
        
        # list for atom label
        self.atoms = [ x for x,_ in self.mol.atoms() ]
        
        # dict for bond label -> two atom label
        self.bonds = dict( (i,(x,y)) for i,(x,y,_) in enumerate( self.mol.bonds() ) )
        
        # dict for atom label -> atomic number
        self.at_elem = dict( (i,x.atomic_number) for i,x in self.mol.atoms() )
        
        # dict for bond label -> two atomic numbers unique pair
        self.bn_elem = dict()
        for i,(x,y) in self.bonds.items():
            an1 = self.at_elem[x]
            an2 = self.at_elem[y]
            if an1 > an2:
                an1, an2 = an2, an1
            self.bn_elem[i] = an1, an2
        
        # dict for bond from two atom labels
        self.bn_ij = dict()
        for i, j in it.combinations( self.atoms, 2 ):
            if (i, j) in [ x for x in self.bonds.values() ]:
                if self.at_elem[i] < self.at_elem[j]:
                    self.bn_ij[(i,j)] = self.bn_ij[(j,i)] = self.at_elem[i], self.at_elem[j]
                else:
                    self.bn_ij[(i,j)] = self.bn_ij[(j,i)] = self.at_elem[j], self.at_elem[i]
            else:
                self.bn_ij[(i,j)] = self.bn_ij[(j,i)] = None
            
    # from two bond labels, return the atomic number of common atom for two bonds
    def common_atom( self, i, j ):
        s, t = self.bonds[i]
        u, v = self.bonds[j]
        if s == u or s == v:
            return self.at_elem[s]
        if t == u or t == v:
            return self.at_elem[t]
        return None
    
    # from two bond labels, return the sequence of three atom labels consisting of two bonds if bonds are adjacent
    # in case of not adjacent, return None tuple
    def is_adj_bonds( self, i, j ):
        s, t = self.bonds[i]
        u, v = self.bonds[j]
        if s == u:
            return t, s, v
        if s == v:
            return t, s, u
        if t == u:
            return s, t, v
        if t == v:
            return s, t, u
        return None, None, None
    
class Pgraph:
    
    # initialization of Mapping class
    def __init__( self, str_rxn ):
        
        str_rct, str_prd = str_rxn.split(">>")
        self.rct = MolGraph( str_rct )
        self.prd = MolGraph( str_prd )
        self.rxn = ReactionContainer( [self.rct.mol], [self.prd.mol] )
        self.rxn.clean2d()

    def edge_product_graph( self ):
                                                                                                                                                                
        # definition for nodes of modular product
        nodes_modular = []
        for i, x in self.rct.bn_elem.items():
            for j, y in self.prd.bn_elem.items():
                if x == y:
                    nodes_modular.append((i,j))
        
        # definition for edges of modular product
        edges_modular = []
        for node1, node2 in it.combinations( nodes_modular, 2 ):
            r1, p1 = node1
            r2, p2 = node2
            if r1 != r2 and p1 != p2 and self.rct.common_atom( r1, r2 ) == self.prd.common_atom( p1, p2 ):
                i = nodes_modular.index( node1 )
                j = nodes_modular.index( node2 )
                edges_modular.append((i,j))
        return edges_modular
    



In [2]:
md=Pgraph("[CH2:2]([CH3:3])[SiH:1]([CH2:4][CH3:5])[CH2:6][CH3:7].[CH3:8][O:9][C:10]([CH2:11][CH2:12][C:13]([Cl:14])=[O:15])=[O:16]>>[CH2:2]([CH3:3])[Si:1]([CH2:4][CH3:5])([CH2:6][CH3:7])[Cl:14].[CH2:11]([C:10]([O:9][CH3:8])=[O:16])[CH2:12][CH:13]=[O:15]")
edges=md.edge_product_graph()

In [3]:
edges

[(0, 6),
 (0, 10),
 (0, 11),
 (0, 13),
 (0, 14),
 (0, 16),
 (0, 17),
 (0, 18),
 (0, 19),
 (0, 20),
 (0, 22),
 (0, 23),
 (0, 24),
 (0, 25),
 (0, 26),
 (0, 27),
 (0, 28),
 (0, 29),
 (0, 30),
 (0, 31),
 (0, 32),
 (0, 33),
 (0, 34),
 (0, 36),
 (0, 37),
 (0, 38),
 (0, 39),
 (0, 40),
 (0, 41),
 (0, 42),
 (0, 43),
 (0, 44),
 (0, 46),
 (0, 47),
 (0, 48),
 (0, 49),
 (0, 50),
 (0, 52),
 (0, 53),
 (0, 54),
 (0, 55),
 (0, 56),
 (0, 57),
 (0, 58),
 (0, 59),
 (0, 60),
 (1, 7),
 (1, 9),
 (1, 11),
 (1, 12),
 (1, 14),
 (1, 15),
 (1, 17),
 (1, 18),
 (1, 19),
 (1, 20),
 (1, 21),
 (1, 23),
 (1, 24),
 (1, 25),
 (1, 26),
 (1, 27),
 (1, 28),
 (1, 29),
 (1, 30),
 (1, 31),
 (1, 32),
 (1, 33),
 (1, 34),
 (1, 35),
 (1, 37),
 (1, 38),
 (1, 39),
 (1, 40),
 (1, 41),
 (1, 42),
 (1, 43),
 (1, 44),
 (1, 45),
 (1, 47),
 (1, 48),
 (1, 49),
 (1, 50),
 (1, 51),
 (1, 53),
 (1, 54),
 (1, 55),
 (1, 56),
 (1, 57),
 (1, 58),
 (1, 59),
 (1, 60),
 (2, 8),
 (2, 9),
 (2, 10),
 (2, 12),
 (2, 13),
 (2, 15),
 (2, 16),
 (2, 18),
 (2, 

In [8]:
############ Based on Yuta sensei's source code.

#I have defined the MaxCliques class to find all maximum cliques and measure execution time from a graph in nx.Graph format.
# Note: The IndependentSetSampler class was written by Yuta sensei in the original source code.
from optim.core.problem import *
from optim.ising.samplers import NealSampler

from functools import partial
from math import ceil, log, exp, sqrt
from statistics import mean, stdev
import time
from scipy.special import zeta
import networkx as nx
import pandas as pd
from sympy import Add

class IndependentSetSampler():

    def __init__(self, sampler, G):
        self.sampler = sampler
        self.G = G
        self.qubo = self.to_qubo(G)
        self._num_success = {}

    @staticmethod
    def to_qubo(G):
        x = []
        v_to_i = {}
        for i, v in enumerate(G.nodes):
            x.append(Variable(name=f'x_{v}', type=int, lb=0, ub=1))
            v_to_i[v] = i
        obj = Objective(name='objective', vars=x,
                        expr=1*(-Add(*x)+2*Add(*(x[v_to_i[u]]*x[v_to_i[v]]
                                              for (u, v) in G.edges))),
                        type='min')
        Q = Problem(name='sample', variables=x, objectives=[obj])
        return Q

    def sample(self, threshold=0, **kwargs):

        num_success, num_total, p_success = \
            self._num_success.get(threshold, (0, 0, 1.0))

        solve = self.sampler.solve
        Q = self.qubo

        node_to_varindex = {i: vi for vi, i in enumerate(self.G.nodes)}

        def is_target(s):
            penalty = sum(s[node_to_varindex[i]]*s[node_to_varindex[j]]
                          for (i, j) in self.G.edges)
            if (penalty == 0) and (sum(s) >= threshold):
                return True
            else:
                return False

        while True:
            num_reads = ceil(1/p_success)
            samples = solve(Q, num_reads=num_reads, beta_schedule_type='linear', num_sweeps=1000)
            target_solutions = \
                samples.solutions[list(map(is_target, samples.solutions))]

            if len(target_solutions) > 0:
                target_sample = {v.name: target_solutions[0][j]
                                 for j, v in enumerate(Q.variables)}
                independent_set = frozenset(
                    node for node in self.G.nodes
                    if target_sample[f'x_{node}'] == 1
                )
                size = len(independent_set)
                num_total += num_reads
                num_success += len(target_solutions)
                p_success = num_success/num_total
                break

            num_total += num_reads
            p_success = max(num_success, 1)/num_total

        self._num_success[threshold] = (num_success, num_total, p_success)

        return independent_set, size
    
class MaxCliques:
    def __init__(self, G):
        """
        Initialize the class with the graph G and prepare other needed variables.
        
        Parameters
        ----------
        G : networkx.Graph
            Input graph to find the maximum cliques.
        """
        self.G = G

    def find_maximum_cliques_cp(self):
        """
        Find all maximum cliques by integrating Carraghan's pruning into `networkx.find_cliques()`.
        
        Returns
        -------
        max_cliques : set
            A set of frozensets where each frozenset represents a maximum clique.
        execution_time : int
            Time taken to execute the clique-finding algorithm (in seconds).
        """
        start = time.perf_counter_ns()
        cbc_size = 0
        cbc_list = []

        if len(self.G) == 0:
            return set(), 0

        adj = {u: {v for v in self.G[u] if v != u} for u in self.G}

        Q = []
        cand = set(self.G)
        subg = cand.copy()
        stack = []
        Q.append(None)

        u = max(subg, key=lambda u: len(cand & adj[u]))
        ext_u = cand - adj[u]

        try:
            while True:
                if ext_u:
                    q = ext_u.pop()
                    cand.remove(q)
                    Q[-1] = q
                    if len(Q) + len(cand) >= cbc_size:
                        adj_q = adj[q]
                        subg_q = subg & adj_q
                        if not subg_q:
                            if len(Q) == cbc_size:
                                cbc_list.append(Q[:])
                            elif len(Q) > cbc_size:
                                cbc_size = len(Q)
                                cbc_list = [Q[:]]
                        else:
                            cand_q = cand & adj_q
                            if cand_q:
                                stack.append((subg, cand, ext_u))
                                Q.append(None)
                                subg = subg_q
                                cand = cand_q
                                u = max(subg, key=lambda u: len(cand & adj[u]))
                                ext_u = cand - adj[u]
                    else:
                        Q.pop()
                        subg, cand, ext_u = stack.pop()
                else:
                    Q.pop()
                    subg, cand, ext_u = stack.pop()
        except IndexError:
            pass

        end = time.perf_counter_ns()
        execution_time = (end - start)* 1e-9
        
        return set(frozenset(clique) for clique in cbc_list), execution_time
    
    def find_maximum_cliques_sa(self):
        """
        Find all maximum cliques by simulated by integrating our enumeration approach`.
        
        Returns
        -------
        max_cliques : set
            A set of frozensets where each frozenset represents a maximum clique.
        execution_time : int
            Time taken to execute the clique-finding algorithm (in seconds).
        """
        def enumerate_by_sampling(sampling_method, epsilon, kappa):

            def enumerate_with_threshold(S, t, m, threshold):
                while True:
                    while t < ceil(m*log(m*kappa/epsilon)):
                        sample, score = sampling_method(threshold=threshold)
                        if score == threshold:
                            S.add(sample)
                            t += 1
                        else:
                            S = set()
                            S.add(sample)
                            t = 1
                            m = 2
                            return S, t, m, score
                    if len(S) < m:
                        return S, t, m, threshold
                    m += 1

            S = set()
            t = 0
            m = 1
            threshold = 0
            while True:
                S, t, m, score = enumerate_with_threshold(S, t, m, threshold)
                if score == threshold:
                    break
                else:
                    threshold = score
            return S

        sa_sampler = NealSampler()
        epsilon=1e-2
        alpha = log(1 / epsilon) - 1
        beta = ((exp(-1) + (1 / 3) * log(1 / 3)) / (exp(-1) - 1 / 3)) * alpha
        inf_sum = zeta(2 * alpha) - sum([k ** (-2 * alpha) for k in range(1, 6)])
        r = exp(-alpha / (exp(1) - 1))
        kappa = (4 ** alpha) / (1 - exp(-beta)) * inf_sum + (2 - r) / ((1 - r) ** 2)
        sampler = IndependentSetSampler(sa_sampler, nx.complement(self.G))
        start = time.perf_counter_ns()
        cliques = enumerate_by_sampling(sampler.sample, epsilon, kappa)
        # Start time
        end = time.perf_counter_ns()
        execution_time = (end - start) * 1e-9  # Convert ns to seconds
        return cliques, execution_time         

In [9]:
# Create a graph
graph=nx.Graph(edges)

# Initialize the MaxCliques class with the graph
max_cliques_finder = MaxCliques(graph)

# Find maximum cliques using cp algorithm
max_cliques_cp, execution_time_cp = max_cliques_finder.find_maximum_cliques_cp()
print("Maximum Cliques (cp):", max_cliques_cp)
print("Execution Time (cp, s):", execution_time_cp)

# Find maximum cliques using SA-based method
max_cliques_sa, execution_time_sa = max_cliques_finder.find_maximum_cliques_sa()
print("Maximum Cliques (SA):", max_cliques_sa)
print("Execution Time (SA, seconds):", execution_time_sa)


Maximum Cliques (cp): {frozenset({0, 38, 6, 42, 10, 14, 16, 49, 23, 56, 60, 29, 31}), frozenset({1, 38, 7, 9, 42, 14, 15, 49, 23, 56, 60, 29, 31}), frozenset({2, 38, 8, 42, 10, 12, 16, 49, 21, 56, 60, 29, 31}), frozenset({0, 38, 6, 42, 11, 13, 49, 17, 22, 56, 60, 29, 31}), frozenset({2, 38, 8, 9, 42, 13, 15, 49, 22, 56, 60, 29, 31}), frozenset({1, 38, 7, 42, 11, 12, 49, 17, 21, 56, 60, 29, 31})}
Execution Time (cp, s): 0.033589400000000005
Maximum Cliques (SA): {frozenset({0, 6, 38, 10, 42, 14, 16, 49, 23, 56, 60, 29, 31}), frozenset({1, 38, 7, 9, 42, 14, 15, 49, 23, 56, 60, 29, 31}), frozenset({2, 38, 8, 10, 42, 12, 16, 49, 21, 56, 60, 29, 31}), frozenset({0, 6, 38, 42, 11, 13, 17, 49, 22, 56, 60, 29, 31}), frozenset({2, 38, 8, 9, 42, 13, 15, 49, 22, 56, 60, 29, 31}), frozenset({1, 38, 7, 42, 11, 12, 17, 49, 21, 56, 60, 29, 31})}
Execution Time (SA, seconds): 0.4589555


In [10]:
# This code was originally written by Akiyama sensei for calculating partial mappings from a SMILES or SMARTS string.
#I have integrated our SA-based maximum clique enumeration algorithm to find all maximum cliques, replacing the use of the igraph package.
##################### import lib 
import networkx as nx
import neal
from math import ceil, log, exp, sqrt
import time
import networkx as nx
import pandas as pd
from sympy import Add
from scipy.special import zeta
from CGRtools import smiles
from CGRtools import ReactionContainer
import itertools as it
import igraph as ig
from itertools import permutations
#########################

########################
class MolGraph:
    
    # initialization of MolGraph class
    def __init__( self, str_smiles):
        self.mol = smiles( str_smiles )
        
        # list for atom label
        self.atoms = [ x for x,_ in self.mol.atoms() ]
        
        # dict for bond label -> two atom label
        self.bonds = dict( (i,(x,y)) for i,(x,y,_) in enumerate( self.mol.bonds() ) )
        
        # dict for atom label -> atomic number
        self.at_elem = dict( (i,x.atomic_number) for i,x in self.mol.atoms() )
        
        # dict for bond label -> two atomic numbers unique pair
        self.bn_elem = dict()
        for i,(x,y) in self.bonds.items():
            an1 = self.at_elem[x]
            an2 = self.at_elem[y]
            if an1 > an2:
                an1, an2 = an2, an1
            self.bn_elem[i] = an1, an2
        
        # dict for bond from two atom labels
        self.bn_ij = dict()
        for i, j in it.combinations( self.atoms, 2 ):
            if (i, j) in [ x for x in self.bonds.values() ]:
                if self.at_elem[i] < self.at_elem[j]:
                    self.bn_ij[(i,j)] = self.bn_ij[(j,i)] = self.at_elem[i], self.at_elem[j]
                else:
                    self.bn_ij[(i,j)] = self.bn_ij[(j,i)] = self.at_elem[j], self.at_elem[i]
            else:
                self.bn_ij[(i,j)] = self.bn_ij[(j,i)] = None
            
    # from two bond labels, return the atomic number of common atom for two bonds
    def common_atom( self, i, j ):
        s, t = self.bonds[i]
        u, v = self.bonds[j]
        if s == u or s == v:
            return self.at_elem[s]
        if t == u or t == v:
            return self.at_elem[t]
        return None
    
    # from two bond labels, return the sequence of three atom labels consisting of two bonds if bonds are adjacent
    # in case of not adjacent, return None tuple
    def is_adj_bonds( self, i, j ):
        s, t = self.bonds[i]
        u, v = self.bonds[j]
        if s == u:
            return t, s, v
        if s == v:
            return t, s, u
        if t == u:
            return s, t, v
        if t == v:
            return s, t, u
        return None, None, None
    
class Mapping:
    
    # initialization of Mapping class
    def __init__( self, str_rxn ):
        
        str_rct, str_prd = str_rxn.split(">>")
        self.rct = MolGraph( str_rct )
        self.prd = MolGraph( str_prd )
        self.rxn = ReactionContainer( [self.rct.mol], [self.prd.mol] )
        self.rxn.clean2d()
    
    def atom_to_atom_mapping( self ):
        nodes_modular = []
        for i, x in self.rct.at_elem.items():
            for j, y in self.prd.at_elem.items():
                if x == y:
                    nodes_modular.append((i,j))
        
        edges_modular = []
        for node1, node2 in it.combinations( nodes_modular, 2 ):
            r1, p1 = node1
            r2, p2 = node2
            if r1 != r2 and p1 != p2 and self.rct.bn_ij[(r1,r2)] == self.prd.bn_ij[(p1,p2)]:
                i = nodes_modular.index( node1 )
                j = nodes_modular.index( node2 )
                edges_modular.append((i,j))
        
       # solution of max cliques by our framework.
       
        cliques=MaxCliques(nx.Graph(edges_modular)).find_maximum_cliques_sa ()[0]
        cqs = [tuple(s) for s in cliques]
        
        # conversion to mappings from cliques
        mappings = []
        for cq in cqs:
            mp = [ nodes_modular[x] for x in cq ]
            mp.sort()
            mappings.append( mp )
        
        return mappings
    
    # converter from bond-to-bond to atom-to-atom mapping
    def mapping_converter( self, btb_map ):

        ata_map = []
        
        for r, p in btb_map:
            r1, r2 = self.rct.bonds[r]
            r1_an = self.rct.at_elem[r1]
            r2_an = self.rct.at_elem[r2]
            
            p1, p2 = self.prd.bonds[p]
            p1_an = self.prd.at_elem[p1]
            p2_an = self.prd.at_elem[p2]
            
            # if bond is formed by different elements
            if r1_an != r2_an and p1_an != p2_an:
                
                if r1_an == p1_an and r2_an == p2_an:
                    ata_map.append( (r1, p1) )
                    ata_map.append( (r2, p2) )
                else:
                    ata_map.append( (r1, p2) )
                    ata_map.append( (r2, p1) )
        
        for (r1b,p1b),(r2b,p2b) in it.combinations( btb_map, 2 ):
            r1a, r2a, r3a = self.rct.is_adj_bonds( r1b, r2b )
            p1a, p2a, p3a = self.prd.is_adj_bonds( p1b, p2b )
            if r1a != None and p1a != None:
                ata_map.append( (r1a, p1a) )
                ata_map.append( (r2a, p2a) )
                ata_map.append( (r3a, p3a) )
        
        # removing duplication and sorting
        ata_map = list( set( ata_map ) )
        ata_map.sort()
        
        return ata_map
    
    
    def bond_to_bond_mapping( self ):
                                                                                                                                                                
        # definition for nodes of modular product
        nodes_modular = []
        for i, x in self.rct.bn_elem.items():
            for j, y in self.prd.bn_elem.items():
                if x == y:
                    nodes_modular.append((i,j))
        
        # definition for edges of modular product
        edges_modular = []
        for node1, node2 in it.combinations( nodes_modular, 2 ):
            r1, p1 = node1
            r2, p2 = node2
            if r1 != r2 and p1 != p2 and self.rct.common_atom( r1, r2 ) == self.prd.common_atom( p1, p2 ):
                i = nodes_modular.index( node1 )
                j = nodes_modular.index( node2 )
                edges_modular.append((i,j))
        

       # solution of max cliques by  SA based enumeration.    
        cliques=MaxCliques(nx.Graph(edges_modular)).find_maximum_cliques_sa ()[0]
        cqs = [tuple(s) for s in cliques]
        # conversion to mappings from cliques
        btb_mappings = []
        for cq in cqs:
            mp = [ nodes_modular[x] for x in cq ]
            btb_mappings.append( mp )
        
        # conversion to atom-to-atom mapping from bond-to-bond mapping
        # set-list operation remove duplicated pairs
        ata_mappings = [ list( set( self.mapping_converter( x ) ) ) for x in btb_mappings ]
        for x in ata_mappings:
            x.sort()
        
        return  ata_mappings
 

In [11]:
mpg = Mapping('[CH2:2]([CH3:3])[SiH:1]([CH2:4][CH3:5])[CH2:6][CH3:7].[CH3:8][O:9][C:10]([CH2:11][CH2:12][C:13]([Cl:14])=[O:15])=[O:16]>>[CH2:2]([CH3:3])[Si:1]([CH2:4][CH3:5])([CH2:6][CH3:7])[Cl:14].[CH2:11]([C:10]([O:9][CH3:8])=[O:16])[CH2:12][CH:13]=[O:15]')


In [12]:
mpg.bond_to_bond_mapping()

[[(1, 1),
  (2, 2),
  (3, 3),
  (4, 4),
  (5, 5),
  (6, 6),
  (7, 7),
  (8, 8),
  (9, 9),
  (10, 10),
  (11, 11),
  (12, 12),
  (13, 13),
  (15, 15),
  (16, 16)],
 [(1, 1),
  (2, 4),
  (3, 5),
  (4, 2),
  (5, 3),
  (6, 6),
  (7, 7),
  (8, 8),
  (9, 9),
  (10, 10),
  (11, 11),
  (12, 12),
  (13, 13),
  (15, 15),
  (16, 16)],
 [(1, 1),
  (2, 6),
  (3, 7),
  (4, 4),
  (5, 5),
  (6, 2),
  (7, 3),
  (8, 8),
  (9, 9),
  (10, 10),
  (11, 11),
  (12, 12),
  (13, 13),
  (15, 15),
  (16, 16)],
 [(1, 1),
  (2, 2),
  (3, 3),
  (4, 6),
  (5, 7),
  (6, 4),
  (7, 5),
  (8, 8),
  (9, 9),
  (10, 10),
  (11, 11),
  (12, 12),
  (13, 13),
  (15, 15),
  (16, 16)],
 [(1, 1),
  (2, 6),
  (3, 7),
  (4, 2),
  (5, 3),
  (6, 4),
  (7, 5),
  (8, 8),
  (9, 9),
  (10, 10),
  (11, 11),
  (12, 12),
  (13, 13),
  (15, 15),
  (16, 16)],
 [(1, 1),
  (2, 4),
  (3, 5),
  (4, 6),
  (5, 7),
  (6, 2),
  (7, 3),
  (8, 8),
  (9, 9),
  (10, 10),
  (11, 11),
  (12, 12),
  (13, 13),
  (15, 15),
  (16, 16)]]

In [13]:
csv_file_name = 'product_graph_241_corrected_data.csv'
data = pd.read_csv(csv_file_name, index_col='Unnamed: 0')


In [15]:
MaxCliques(nx.Graph(eval(data['product graph'][6]))).find_maximum_cliques_sa()[1]

54.4673796