# Subgraph Matching Algorithms
---
### Basic Information
**Description:** This script defines our symmetry-based subgraph matching (SBSM) algorithm and conventional subgraph matching algorithms such as VF2 & VF2++.

In [1]:
# Load packages
from rustworkx import *
from rustworkx.visualization import mpl_draw
import rustworkx as rx
import rustworkx.generators

from qiskit import QuantumCircuit
from qiskit.converters import circuit_to_dag

import networkx as nx
from networkx.algorithms.isomorphism import GraphMatcher

import numpy as np
import time

In [2]:
# Conversion rustworkx graphs and networkx graphs
def convert_networkx_to_rustworkx(graph):
    """Convert a networkx graph to a rustworkx graph."""
    rx_graph = rx.PyGraph()

    rx_graph.add_nodes_from(graph.nodes())
    rx_graph.add_edges_from([(edge[0], edge[1], None) for edge in graph.edges()])

    for node_index in rx_graph.node_indices():
        rx_graph[node_index] = None

    return rx_graph

def convert_rustworkx_to_networkx(graph):
    """Convert a rustworkx PyGraph or PyDiGraph to a networkx graph."""
    edge_list = [(
        x[0], x[1],
        {'weight': 1}) for x in graph.weighted_edge_list()]

    return nx.Graph(edge_list)

In [3]:
# Find the neighborhood of a rustworkx subgrpah
def neighborhood_rustworkx(r, initial_nodes, graph):
    # r is the order of the neighborhood
    # initial_nodes are the vertices in the subgraph
    # graph is the entire graph that contains the subgraph

    neighbors = list(initial_nodes)
    neighbors_new = list(initial_nodes)
    neighbors_new_temp = []
    
    for _ in range(r):
        for node in neighbors_new:
            adj_nodes = graph.adj(node)
            for new_node in list(adj_nodes.keys()):
                if not (new_node in neighbors):
                    neighbors.append(new_node)
                    neighbors_new_temp.append(new_node)
        neighbors_new = list(neighbors_new_temp) 
        neighbors_new_temp = []
    
    return neighbors

# Find the neighborhood of a networkx subgraph
def neighborhood_networkx(r, initial_nodes, graph):
    # r is the order of the neighborhood
    # initial_nodes are the vertices in the subgraph
    # graph is the entire graph that contains the subgraph

    neighbors = list(initial_nodes)
    neighbors_new = list(initial_nodes)
    neighbors_new_temp = []
    
    for _ in range(r):
        for node in neighbors_new:
            adj_nodes = graph.neighbors(node)
            for new_node in list(adj_nodes):
                if not (new_node in neighbors):
                    neighbors.append(new_node)
                    neighbors_new_temp.append(new_node)
        neighbors_new = list(neighbors_new_temp) 
        neighbors_new_temp = []
    
    return neighbors

## Conventional Subgraph Matching Algorithms

The following implementation comes from  
D. Nation and M. Treinish, Suppressing quantum circuit
errors due to system variability, PRX Quantum 4, 010327 (2023)

In [4]:
def mapping_VF2(target_graph, pattern_graph, circ, time_limit=None):
    # start the timer if a time_limit is provided
    if time_limit is not None:  
        start_time = time.time()
    
    # set the problem of subgraph isomorphism with standard VF2
    mappings = rustworkx.vf2_mapping(
        target_graph, 
        pattern_graph, 
        subgraph=True, 
        id_order=True, 
        induced=False,
    )

    # initialize indices of qubtis in the circuit and the hardware
    dag = circuit_to_dag(circ)
    qubits = dag.qubits # indices of qubits
    cm_nodes = list(target_graph.node_indexes()) # indices of nodes in the target graph

    # solve the problem with VF2
    layouts = []
    for mapping in mappings:
        # apply the qubit mappings to the quantum circuits
        temp_list = [None]*circ.num_qubits
        for cm_i, im_i in mapping.items():
            key = qubits[im_i]
            val = cm_nodes[cm_i]
            temp_list[circ.find_bit(key).index] = val
        layouts.append(temp_list)

        if (time_limit is not None) and (time.time() - start_time > time_limit):
            break # early stop
    return layouts

In [5]:
def mapping_VF2pp(target_graph, pattern_graph, circ, time_limit=None):
    # start the timer if a time_limit is provided
    if time_limit is not None:  
        start_time = time.time()
    
    # set the problem of subgraph isomorphism with VF2++
    mappings = rustworkx.vf2_mapping(
        target_graph, 
        pattern_graph, 
        subgraph=True, 
        id_order=False, 
        induced=False,
    )

    # initialize indices of qubtis in the circuit and the hardware
    dag = circuit_to_dag(circ)
    qubits = dag.qubits # indices of qubits
    cm_nodes = list(target_graph.node_indexes()) # indices of nodes in the target graph

    # solve the problem with VF2++
    layouts = []
    for mapping in mappings:
        # apply the qubit mappings to the quantum circuits
        temp_list = [None]*circ.num_qubits
        for cm_i, im_i in mapping.items():
            key = qubits[im_i]
            val = cm_nodes[cm_i]
            temp_list[circ.find_bit(key).index] = val
        layouts.append(temp_list)

        if (time_limit is not None) and (time.time() - start_time > time_limit):
            break # early stop
    return layouts

## Symmetry-Based Subgraph Matching Algorithm

In [6]:
def translation(layout, move, node_dict, node_dict_reverse):
    # this function shifts a given group of qubits (or nodes, specified by `layout`) in a certain direction
    # layout: a list of node indices
    # move: a two-element list that specifies the direction and distance of translation
    # node_dict: a dictionary that maps node indices to node positions and types
    # node_dict_reverse: a dictionary that maps node positions and types to node indices

    layout_converted = np.array([node_dict[node] for node in layout]) # retrieve the position of the nodes
    layout_shifted = layout_converted + np.array([move[0], move[1], 0])
    layout_shifted_converted = [node_dict_reverse[tuple(node)] for node in layout_shifted]

    return layout_shifted_converted

In [7]:
def translation_vectorized(layout, moves, node_dict, node_dict_reverse, graph_type):
    # this function implements batch shifts based on NumPy
    # layout: a list of node indices
    # moves: a n-by-two numpy array that specifies the direction and distance of translation
    # node_dict: a dictionary that maps node indices to node positions and types
    # node_dict_reverse: a dictionary that maps node positions and types to node indices
    # graph_type: a string that could be 'grid' or 'heavy_hex' or 'octagonal'

    layout_converted = np.array([node_dict[node] for node in layout]) # retrieve the position of the nodes
    moves = np.hstack((moves, np.zeros((moves.shape[0], 1)))).astype(int)
    layouts_shifted = [(node_pos + moves) for node_pos in layout_converted]
    layouts_shifted = np.transpose(np.array(layouts_shifted), axes=(1, 0, 2)).astype(int)
    layouts_shifted = layouts_shifted.reshape(-1, len(layouts_shifted[0, 0]))
    layouts_shifted_converted = retrieve_node_indices(layouts_shifted, node_dict_reverse, graph_type).reshape(-1, len(layout))
    return layouts_shifted_converted.tolist()

In [8]:
def feasible_range(layout, node_dict, node_dict_reverse, num_node_types):
    # this function returns all the feasible movements of the given group of qubits (or nodes, specified by `layout`) 
    # within the target graph (specified by `node_dict` and `node_dict_reverse`)

    # layout: list of numbers (node indices)
    # node_dict: a dictionary that maps node indices to node positions and types
    # node_dict_reverse: a dictionary that maps node positions and types to node indices
    # num_node_types: number of different node types

    node_pos_array = np.array(list(node_dict_reverse.keys()))
    node_array_by_types = []
    for node_type in range(num_node_types):
        node_array_by_types.append(node_pos_array[node_pos_array[:, 2] == node_type])

    feasible_movements_individual_nodes = []
    for node in layout:
        node_pos = node_dict[node]
        node_type = node_pos[2]
        feasible_movements_array = node_array_by_types[node_type][:, 0:2] - np.array(node_pos[0:2])
        feasible_movements_converted = set(tuple(row) for row in feasible_movements_array)
        feasible_movements_individual_nodes.append(feasible_movements_converted)    

    feasible_range = set.intersection(*feasible_movements_individual_nodes)

    return [list(move) for move in feasible_range]

In [9]:
def feasible_range_hardware_specific(layout, node_dict, node_dict_reverse, graph_type=None, target_graph_size=[1, 1]):
    # this function uses the hardware structure characteristics to determine all the feasible movements of
    # the given group of qubits (specified by `layout`) within the target graph
    # here, we only demonstrate the usage for three hardware architecture, including grid, heavy-hex, and octagonal architectures

    # layout: list of numbers (node indices)
    # graph_type: string, options include 'grid' or 'octagonal'
    # target_graph_size: a two-element list
    # node_dict_reverse: a dictionary that maps node positions and types to node indices

    if graph_type == 'grid':
        num_node_types = 1
        node_pos_array = []
        for node in layout:
            node_pos = node_dict[node]
            node_pos_array.append(node_pos)
        node_pos_array = np.array(node_pos_array)
        bounds = [min(node_pos_array[:, 0]), max(node_pos_array[:, 0]), min(node_pos_array[:, 1]), max(node_pos_array[:, 1])] # position of the layouts
        x_range = np.arange(-bounds[0], target_graph_size[0] - bounds[1]) # range of horizontal movements
        y_range = np.arange(-bounds[2], target_graph_size[1] - bounds[3]) # range of vertical movements
        x_grid, y_grid = np.meshgrid(x_range, y_range)
        feasible_movements = np.column_stack((x_grid.ravel(), y_grid.ravel()))
        return feasible_movements.tolist()

    elif graph_type == 'octagonal':
        num_node_types = 8
        node_pos_array = []
        for node in layout:
            node_pos = node_dict[node]
            node_pos_array.append(node_pos)
        node_pos_array = np.array(node_pos_array)
        bounds = [min(node_pos_array[:, 0]), max(node_pos_array[:, 0]), min(node_pos_array[:, 1]), max(node_pos_array[:, 1])] # position of the layouts
        x_range = np.arange(-bounds[0], target_graph_size[0] - bounds[1])
        y_range = np.arange(-bounds[2], target_graph_size[1] - bounds[3])
        x_grid, y_grid = np.meshgrid(x_range, y_range)
        feasible_movements = np.column_stack((x_grid.ravel(), y_grid.ravel()))
        return feasible_movements.tolist()

    elif graph_type == 'heavy_hex':
        num_node_types = 5
        # determine the movement range for heavy-hex graph
        feasible_movements = []
        # calculate the distance of the layout from the boundaries
        distance_matrix = np.zeros([len(layout), 4]) # store the distance of the nodes of the layout from its boundaries
        count_node = 0
        for node in layout:
            # retrieve the position of the node
            node_pos = node_dict[node]
            index_x, index_y, node_type = node_dict[node]
            # calculate the distance of the node from the boundaries of the target graph
            # determine the distance of a node from the target graph boundaries
            row_width = target_graph_size[0] * 4 + 3
            if node_type in [0, 1, 2, 3]:
                index_x_in_row = node % row_width
                distance_from_left = index_x_in_row // 4
                distance_from_right = (row_width - 1 - index_x_in_row) // 4
                distance_from_top = index_y
                distance_from_bottom = target_graph_size[1] - index_y
            elif node_type in [4]:
                distance_from_left = index_x + index_y // 2
                distance_from_right = target_graph_size[0] - distance_from_left
                distance_from_top = index_y
                distance_from_bottom = target_graph_size[1] - index_y - 1
            else:
                raise ValueError("Unsupported node type.")
            distance_matrix[count_node, :] = [distance_from_left, distance_from_right, distance_from_top, distance_from_bottom]
            count_node += 1
        
        # calculate the distance of the layout from the left, right, upper, and lower boundaries of the target graph
        x_range = np.arange(-min(distance_matrix[:, 0]), min(distance_matrix[:, 1]) + 1).astype(int)
        y_range = np.arange(-min(distance_matrix[:, 2]) + (min(distance_matrix[:, 2]) % 2), min(distance_matrix[:, 3]) + 1, 2).astype(int)
        
        # calculate range of movements (step-2 movement in y-direction)
        x_grid, y_grid = np.meshgrid(x_range, y_range)
        x_grid = x_grid - y_grid // 2
        feasible_movements_part_1 = np.column_stack((x_grid.ravel(), y_grid.ravel()))

        # move the layout by one step in the vertical direction
        shifted_layout = translation(layout, [0, 1], node_dict, node_dict_reverse)

        # calculate the distance of the shifted layout from the boundaries
        distance_matrix = np.zeros([len(shifted_layout), 4])
        count_node = 0
        for node in shifted_layout:
            # retrieve the position of the node
            node_pos = node_dict[node]
            index_x, index_y, node_type = node_dict[node]
            # calculate the distance of the node from the boundaries of the target graph
            # determine the distance of a node from the target graph boundaries
            row_width = target_graph_size[0] * 4 + 3
            if node_type in [0, 1, 2, 3]:
                index_x_in_row = node % row_width
                distance_from_left = index_x_in_row // 4
                distance_from_right = (row_width - 1 - index_x_in_row) // 4
                distance_from_top = index_y
                distance_from_bottom = target_graph_size[1] - index_y
            elif node_type in [4]:
                distance_from_left = index_x + index_y // 2
                distance_from_right = target_graph_size[0] - distance_from_left
                distance_from_top = index_y
                distance_from_bottom = target_graph_size[1] - index_y - 1
            else:
                raise ValueError("Unsupported node type.")
            distance_matrix[count_node, :] = [distance_from_left, distance_from_right, distance_from_top, distance_from_bottom]
            count_node += 1

        # calculate the distance of the layout from the left, right, upper, and lower boundaries of the target graph
        x_range = np.arange(-min(distance_matrix[:, 0]), min(distance_matrix[:, 1]) + 1).astype(int)
        y_range = np.arange(-min(distance_matrix[:, 2]) + (min(distance_matrix[:, 2]) % 2), min(distance_matrix[:, 3]) + 1, 2).astype(int)
        
        # calculate range of movements (step-2 movement in y-direction)
        x_grid, y_grid = np.meshgrid(x_range, y_range)
        x_grid = x_grid - y_grid // 2
        y_grid += 1
        feasible_movements_part_2 = np.column_stack((x_grid.ravel(), y_grid.ravel()))

        # merge the two parts of movement range
        feasible_movements = np.vstack((feasible_movements_part_1, feasible_movements_part_2))

        return feasible_movements.tolist()

    else:
        return feasible_range(layout, node_dict, node_dict_reverse, num_node_types)

In [10]:
def find_patterns_in_reduced_search_space(rx_target_graph, rx_pattern_graph, center_generating_set, r, node_dict, node_dict_reverse):
    # this function finds isomorphic subgraphs within a given target graph exploiting symmetries
    
    # rx_target_graph: a rustworkx target graph, defined by `functions_set_graph.ipynb`
    # rx_pattern_graph: a rustworkx pattern graph
    # center_generating_set: a list of node indices for a generating set around the center of the target graph
    # r: the order of the neighborhood that serves as the reduced search space

    isomorphisms_list = []

    # convert rustworkx graphs to networkx graphs for further processing
    nx_target_graph = convert_rustworkx_to_networkx(rx_target_graph)
    nx_pattern_graph = convert_rustworkx_to_networkx(rx_pattern_graph)

    # determine the reduced search space (a subgraph) within the target graph
    neighbor_generating_set = neighborhood_networkx(r, center_generating_set, nx_target_graph) # find the neighborhood of the centered generating set
    search_space = nx.induced_subgraph(nx_target_graph, neighbor_generating_set) # find the subgraph induced by the neighbors of the generating set

    # define the subgraph matching problem in the reduced search space
    GM = GraphMatcher(search_space, nx_pattern_graph) # VF2 algorithm is used
    isomorphisms = list(GM.subgraph_monomorphisms_iter()) # create an iterator over all isomorphisms

    # solve the subgraph matching problem
    isomorphisms_list = []
    for iso in isomorphisms:
        num_qubits = len(iso.values())
        physical_by_logical = list(range(num_qubits))
        for physical_qubit, logical_qubit in iso.items():
            physical_by_logical[logical_qubit] = physical_qubit
        isomorphisms_list.append(physical_by_logical)

    return isomorphisms_list

In [11]:
def is_layout_equivalent(layout_1, layout_2, node_dict):
    # this function determines if the two inputs layouts are equivalent up to a translation
    
    if len(layout_1) != len(layout_2):
        return False

    layout_1_pos = np.array([node_dict[node] for node in layout_1])
    layout_2_pos = np.array([node_dict[node] for node in layout_2])
    diff = layout_1_pos - layout_2_pos
    diff_pos_first = list(diff[0][0:2])
    
    for node_index in range(len(layout_1)):
        condition_1 = (list(diff[node_index][0:2]) == diff_pos_first)
        condition_2 = (diff[node_index][2] == 0)
        if (not condition_1) or (not condition_2):
            return False

    return True

In [12]:
def remove_equivalent_layouts(layouts, node_dict):
    # this function detects the existence of equivalent circuit layouts from the input
    # and remove the redundant ones, leaving only one representative circuit layout

    reduced_layouts = []
    
    for layout in layouts:
        is_equivalent = any(is_layout_equivalent(layout, _layout, node_dict) for _layout in reduced_layouts)
        if not is_equivalent:
            reduced_layouts.append(layout)

    return reduced_layouts

In [13]:
def symmetry_based_subgraph_matching(target_graph, pattern_graph, node_dict, node_dict_reverse, center_generating_set, time_limit=None):
    # this function finds all the subgraphs of the given `target_graph` that are isomorphic to the `pattern_graph`
    # target_graph: target graph defined in rustworkx
    # pattern_graph: pattern graph defined in rustworkx
    # node_dict: a mapping from the indices of nodes to their positions and types
    # node_dict_reverse: a mapping from the positions and types of nodes to their indices
    # center_generating_set: a generating set of the target graph located in its center
    # time_limit: time limit of the subgraph matching

    # start the timer if a time_limit is provided
    if time_limit is not None:  
        start_time = time.time()

    # create a candidate region in the target_graph
    nx_pattern_graph = convert_rustworkx_to_networkx(pattern_graph)
    radius = nx.radius(nx_pattern_graph) # calculate the radius of the pattern graph
    layouts = find_patterns_in_reduced_search_space(target_graph, pattern_graph, center_generating_set, radius, node_dict, node_dict_reverse)

    # remove equivalent (redundant) layouts found in the reduced search space
    layouts = remove_equivalent_layouts(layouts, node_dict)

    # using translation to extend the rest layouts to obtain all the isomorphic subgraphs (feasible quantum circuit mappings)
    unique_layout, count_layout = np.unique(np.array(list(node_dict_reverse.keys()))[:, 2], return_counts=True)
    num_node_types = len(count_layout) # retrieve the number of node types (inequivalent nodes)
    all_layouts = []

    # serial movements & universal
    for layout in layouts:
        moves = feasible_range(layout, node_dict, node_dict_reverse, num_node_types) # determine the range of translation
        for move in moves:
            all_layouts.append(translation(layout, move, node_dict, node_dict_reverse)) # implement the feasible translation
            if (time_limit is not None) and (time.time() - start_time > time_limit):
                return all_layouts # early stop when time is up

    return all_layouts

In [14]:
def symmetry_based_subgraph_matching_optimized(target_graph, pattern_graph, node_dict, node_dict_reverse, center_generating_set, target_graph_type, target_graph_size):
    # this function finds all the subgraphs of the given `target_graph` that are isomorphic to the `pattern_graph`
    # target_graph: target graph defined in rustworkx
    # pattern_graph: pattern graph defined in rustworkx
    # node_dict: a mapping from the indices of nodes to their positions and types
    # node_dict_reverse: a mapping from the positions and types of nodes to their indices
    # center_generating_set: a generating set of the target graph located in its center
    # target_graph_type: string, could be 'grid' or 'heavy_hex' or 'octagonal'
    # target_graph_size: a two-element list that specifies the width and height of the target graph

    # create a candidate region in the target_graph
    nx_pattern_graph = convert_rustworkx_to_networkx(pattern_graph)
    radius = nx.radius(nx_pattern_graph) # calculate the radius of the pattern graph
    layouts = find_patterns_in_reduced_search_space(target_graph, pattern_graph, center_generating_set, radius, node_dict, node_dict_reverse)

    # remove equivalent (redundant) layouts found in the reduced search space
    layouts = remove_equivalent_layouts(layouts, node_dict)

    # using translation to extend the rest layouts to obtain all the isomorphic subgraphs (feasible quantum circuit mappings)
    unique_layout, count_layout = np.unique(np.array(list(node_dict_reverse.keys()))[:, 2], return_counts=True)
    num_node_types = len(count_layout) # retrieve the number of node types (inequivalent nodes)
    all_layouts = []

    # vectorized movements & hardware-specific
    for layout in layouts:
        moves = feasible_range_hardware_specific(layout, node_dict, node_dict_reverse, target_graph_type, target_graph_size) # determine the range of translation
        all_layouts += translation_vectorized(layout, np.array(moves), node_dict, node_dict_reverse, target_graph_type) # implement the feasible translation

    return all_layouts