# SSD Visualisations for Showcase Benchmark Circuits

This notebook focuses on the benchmark families that QuASAr uses to stress its Sparse State Descriptor (SSD) planning. It inspects the clustered entanglement, layered Clifford transition, and classical-controlled workloads introduced in `benchmarks.circuits`.

## Circuits covered

* All functions in `benchmarks.circuits` whose names contain `clustered_`
* All functions in `benchmarks.circuits` whose names contain `layered_`
* All functions in `benchmarks.circuits` whose names contain `classical_controlled_`

The helper utilities below automatically collect these generators, pick a representative qubit width (the median value from the showcase defaults when available, otherwise a fixed per-family fallback), and rebuild the circuits with classical simplification disabled before extracting their SSD graphs.

In [None]:
import inspect
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import Callable

import matplotlib.pyplot as plt
import networkx as nx
import pandas as pd
from IPython.display import display

# Extend sys.path, presumably so the `benchmarks` and `quasar` imports below
# resolve when the notebook is run from its own folder.
# NOTE(review): Path('..').resolve() is the parent of the working directory and
# `.parent` climbs one level further, so this assumes the notebook sits two
# directories below the project root — confirm against the repository layout.
PROJECT_ROOT = Path('..').resolve().parent
if str(PROJECT_ROOT) not in sys.path:
    sys.path.append(str(PROJECT_ROOT))

from benchmarks import circuits as benchmark_circuits
from quasar.circuit import Circuit
from quasar.method_selector import NoFeasibleBackendError
from quasar.planner import PlanResult
from quasar.scheduler import Scheduler

try:  # Showcase helper supplies display names and default widths when available
    from benchmarks.showcase_benchmarks import SHOWCASE_CIRCUITS
except ImportError:  # pragma: no cover - fallback for minimal environments
    # With an empty mapping every lookup misses, so the helpers below fall
    # back to generated names and built-in widths.
    SHOWCASE_CIRCUITS = {}

# Notebook-wide matplotlib defaults for figure size and title/label font sizes.
plt.rcParams.update({
    'figure.figsize': (10, 6),
    'axes.titlesize': 14,
    'axes.labelsize': 12,
})
# Do not truncate wide cells and print floats with three decimals in tables.
pd.set_option('display.max_colwidth', None)
pd.options.display.float_format = '{:.3f}'.format
# Single scheduler instance reused by the planning loop at the end of the notebook.
SCHEDULER = Scheduler()


In [None]:
# Name fragments that select the benchmark generators of interest. Despite the
# name, they are matched with ``in`` (substring test), not ``str.startswith``.
TARGET_PREFIXES = ("clustered_", "layered_", "classical_controlled_")

@dataclass(frozen=True)
class TargetCircuitSpec:
    """Immutable record describing one benchmark circuit selected for plotting."""

    # Name of the generator function found in ``benchmarks.circuits``.
    identifier: str
    # Human readable label; includes the qubit count.
    display_name: str
    # Callable that builds the circuit; invoked as ``constructor(**kwargs)``.
    constructor: Callable[..., Circuit]
    # Keyword arguments forwarded to ``constructor``.
    kwargs: dict[str, object]
    # Optional description taken from the showcase entry, if any.
    description: str | None
    # Qubit width used when instantiating the circuit.
    qubits: int


def _showcase_entry(identifier: str):
    """Look up the showcase record that belongs to a generator name.

    A trailing ``_circuit`` is dropped before the lookup. ``None`` is returned
    when ``SHOWCASE_CIRCUITS`` has no entry under the resulting key.
    """
    suffix = '_circuit'
    key = identifier[:-len(suffix)] if identifier.endswith(suffix) else identifier
    return SHOWCASE_CIRCUITS.get(key)


def _title_case_identifier(identifier: str) -> str:
    """Turn a snake_case identifier into a space separated title.

    The acronyms GHZ, QFT and W are rendered fully upper-case; every other
    word goes through ``str.capitalize``.
    """
    acronyms = {"GHZ", "QFT", "W"}
    return ' '.join(
        part.upper() if part.upper() in acronyms else part.capitalize()
        for part in identifier.split('_')
    )


def _default_width(identifier: str) -> int:
    """Return the qubit width used to instantiate ``identifier``.

    When the showcase entry provides a non-empty ``default_qubits`` the median
    of those widths is returned (the upper median for an even count).
    Otherwise a fixed fallback is chosen from the name fragments the
    identifier contains.
    """
    entry = _showcase_entry(identifier)
    if entry is not None and getattr(entry, 'default_qubits', None):
        # Sort first: the middle element is only the median of an ordered
        # sequence, and nothing here guarantees the defaults are ordered.
        widths = sorted(entry.default_qubits)
        if widths:
            return widths[len(widths) // 2]
    # 'classical_controlled_' is tested first so it wins when several
    # fragments occur in the same identifier.
    if 'classical_controlled_' in identifier:
        return 28
    if 'clustered_' in identifier or 'layered_' in identifier:
        return 40
    return 20


def _display_name(identifier: str, width: int) -> str:
    """Build the label shown for a benchmark, including its qubit count.

    The showcase entry's ``display_name`` is preferred. Without one, the
    identifier (minus a trailing ``_circuit``) is title-cased instead.
    """
    entry = _showcase_entry(identifier)
    label = getattr(entry, 'display_name', None) if entry is not None else None
    if not label:
        label = _title_case_identifier(identifier.removesuffix('_circuit'))
    return f"{label} ({width} qubits)"


def _description(identifier: str) -> str | None:
    """Return the showcase description for ``identifier``, if one exists.

    Returns ``None`` when there is no showcase entry or when the entry has no
    ``description`` attribute.
    """
    entry = _showcase_entry(identifier)
    if entry is None:
        return None
    # Read defensively, as the width and display-name helpers do for their
    # attributes, so a partial entry does not abort spec collection.
    return getattr(entry, 'description', None)


def build_target_circuit_specs() -> tuple[TargetCircuitSpec, ...]:
    """Collect a spec for every matching generator in ``benchmarks.circuits``.

    A module attribute qualifies when its name ends in ``_circuit``, contains
    one of ``TARGET_PREFIXES``, is callable, has an inspectable signature and
    accepts a ``num_qubits`` parameter. Specs are returned in name order.
    """

    def _accepts_num_qubits(candidate) -> bool:
        # Callables without an inspectable signature count as non-matching.
        try:
            params = inspect.signature(candidate).parameters
        except (TypeError, ValueError):
            return False
        return 'num_qubits' in params

    collected: list[TargetCircuitSpec] = []
    for attr_name in sorted(dir(benchmark_circuits)):
        name_matches = attr_name.endswith('_circuit') and any(
            fragment in attr_name for fragment in TARGET_PREFIXES
        )
        if not name_matches:
            continue
        builder = getattr(benchmark_circuits, attr_name)
        if not (callable(builder) and _accepts_num_qubits(builder)):
            continue
        qubit_count = _default_width(attr_name)
        collected.append(
            TargetCircuitSpec(
                identifier=attr_name,
                display_name=_display_name(attr_name, qubit_count),
                constructor=builder,
                kwargs={'num_qubits': qubit_count},
                description=_description(attr_name),
                qubits=qubit_count,
            )
        )
    return tuple(collected)


# Built once when this cell runs; the overview table and the instantiation
# generator below both read this tuple.
TARGET_CIRCUIT_SPECS = build_target_circuit_specs()


def circuit_overview_table() -> pd.DataFrame:
    """Tabulate the collected specs, one row per circuit, ordered by identifier.

    A missing description is shown as an empty string. With no specs at all,
    an empty frame carrying the four column headers is returned.
    """
    header = ['Circuit', 'Identifier', 'Qubits', 'Description']
    if not TARGET_CIRCUIT_SPECS:
        return pd.DataFrame(columns=header)
    records = [
        {
            'Circuit': item.display_name,
            'Identifier': item.identifier,
            'Qubits': item.qubits,
            'Description': item.description or '',
        }
        for item in TARGET_CIRCUIT_SPECS
    ]
    return pd.DataFrame(records).sort_values('Identifier').reset_index(drop=True)


def rebuild_without_simplification(circuit: Circuit) -> Circuit:
    """Copy ``circuit`` gate by gate into a new instance built with
    ``use_classical_simplification=False``."""

    return Circuit(
        [gate.to_dict() for gate in circuit.gates],
        use_classical_simplification=False,
    )


def instantiate_target_benchmarks():
    """Lazily produce ``(spec, circuit)`` pairs for every collected spec.

    Each circuit is constructed from the spec's stored keyword arguments and
    then rebuilt with classical simplification disabled.
    """

    for entry in TARGET_CIRCUIT_SPECS:
        original = entry.constructor(**entry.kwargs)
        rebuilt = rebuild_without_simplification(original)
        yield entry, rebuilt

In [None]:
# NOTE: the cell that follows redefines ``partition_table`` and
# ``draw_ssd_graph``; ``conversion_table`` and ``plan_table`` are defined only
# here and are used by the final visualisation loop.
def partition_table(graph: nx.MultiDiGraph) -> pd.DataFrame:
    """Return a summary of partition metadata recorded in the graph.

    One row is produced per node whose ``kind`` attribute is ``'partition'``,
    sorted by partition index. An empty frame is returned when the graph has
    no such nodes.
    """
    rows = []
    for node, data in graph.nodes(data=True):
        if data.get('kind') != 'partition':
            continue
        rows.append({
            'Partition': data.get('index'),
            'Backend': data.get('backend'),
            'Multiplicity': data.get('multiplicity'),
            'Qubits': ', '.join(str(q) for q in data.get('qubits', ())),
            'Boundary': ', '.join(str(q) for q in data.get('boundary_qubits', ())),
            'Rank': data.get('rank'),
            'Frontier': data.get('frontier'),
            'Time cost': data.get('cost_time'),
            'Memory cost': data.get('cost_memory'),
        })
    if not rows:
        return pd.DataFrame()
    return pd.DataFrame(rows).sort_values('Partition').reset_index(drop=True)


def conversion_table(graph: nx.MultiDiGraph) -> pd.DataFrame:
    """Return conversion layer metadata from the SSD graph.

    One row is produced per node whose ``kind`` attribute is ``'conversion'``,
    sorted by conversion index. The column set is fixed, so an empty result
    still carries the headers.
    """
    rows = []
    for node, data in graph.nodes(data=True):
        if data.get('kind') != 'conversion':
            continue
        rows.append({
            'Conversion': data.get('index'),
            'Boundary': ', '.join(str(q) for q in data.get('boundary', ())),
            'Source': data.get('source'),
            'Target': data.get('target'),
            'Rank': data.get('rank'),
            'Frontier': data.get('frontier'),
            'Primitive': data.get('primitive'),
            'Time cost': data.get('cost_time'),
            'Memory cost': data.get('cost_memory'),
        })
    columns = [
        'Conversion', 'Boundary', 'Source', 'Target',
        'Rank', 'Frontier', 'Primitive', 'Time cost', 'Memory cost',
    ]
    if not rows:
        return pd.DataFrame(columns=columns)
    return pd.DataFrame(rows, columns=columns).sort_values('Conversion').reset_index(drop=True)


def plan_table(plan: PlanResult | None) -> pd.DataFrame:
    """Return a step-by-step summary for the planner result.

    ``plan`` may be ``None`` or lack steps; both cases give an empty frame
    with the standard columns. Costs are matched to steps by position, and a
    step without a matching cost entry gets ``None`` in both cost columns.
    """
    columns = [
        'Step', 'Start', 'End', 'Backend', 'Parallel groups', 'Time cost', 'Memory cost',
    ]
    if plan is None:
        return pd.DataFrame(columns=columns)
    steps = list(getattr(plan, "steps", []))
    if not steps:
        return pd.DataFrame(columns=columns)
    costs = list(getattr(plan, "step_costs", []))
    rows = []
    for idx, step in enumerate(steps):
        # The cost list may be shorter than the step list.
        cost = costs[idx] if idx < len(costs) else None
        parallel = [tuple(group) for group in step.parallel] if step.parallel else ()
        rows.append({
            'Step': idx,
            'Start': step.start,
            'End': step.end,
            'Backend': step.backend.name,
            'Parallel groups': parallel,
            'Time cost': getattr(cost, 'time', None) if cost is not None else None,
            'Memory cost': getattr(cost, 'memory', None) if cost is not None else None,
        })
    return pd.DataFrame(rows, columns=columns)


def draw_ssd_graph(
    graph: nx.MultiDiGraph,
    *,
    title: str,
    seed: int = 2024,
    ax: plt.Axes | None = None,
) -> plt.Axes:
    """Visualise partitions, conversions and backends for the SSD graph.

    Nodes and edges are grouped by their ``kind`` attribute and drawn with one
    style per kind. Kinds without an entry in the style tables are not drawn.

    Args:
        graph: SSD graph whose nodes and edges carry a ``kind`` attribute.
        title: Title placed on the axes.
        seed: Seed passed to ``nx.spring_layout``.
        ax: Axes to draw on; a new 10x6 figure is created when omitted.

    Returns:
        The axes that were drawn on.
    """
    if ax is None:
        _, ax = plt.subplots(figsize=(10, 6))
    pos = nx.spring_layout(graph, seed=seed)

    node_styles = {
        'partition': {'color': '#4e79a7', 'shape': 's', 'size': 1400, 'label': 'Partition'},
        'conversion': {'color': '#e15759', 'shape': 'D', 'size': 1100, 'label': 'Conversion'},
        'backend': {'color': '#59a14f', 'shape': 'o', 'size': 1000, 'label': 'Backend'},
    }
    for kind, style in node_styles.items():
        nodes = [node for node, data in graph.nodes(data=True) if data.get('kind') == kind]
        if not nodes:
            continue
        nx.draw_networkx_nodes(
            graph,
            pos,
            nodelist=nodes,
            node_color=style['color'],
            node_shape=style['shape'],
            node_size=style['size'],
            label=style['label'],
            ax=ax,
        )

    edge_styles = {
        'dependency': {'style': 'solid', 'color': '#1f77b4', 'arrows': True},
        'entanglement': {'style': 'dashed', 'color': '#ff7f0e', 'arrows': False},
        'conversion_boundary': {'style': 'dotted', 'color': '#76b7b2', 'arrows': True, 'connectionstyle': 'arc3,rad=0.1'},
        'backend_assignment': {'style': 'solid', 'color': '#b07aa1', 'arrows': True, 'connectionstyle': 'arc3,rad=0.15'},
        'conversion_source': {'style': 'solid', 'color': '#9c755f', 'arrows': True, 'connectionstyle': 'arc3,rad=0.2'},
        'conversion_target': {'style': 'solid', 'color': '#f28e2b', 'arrows': True, 'connectionstyle': 'arc3,rad=-0.2'},
    }
    for kind, style in edge_styles.items():
        edges = [
            (u, v)
            for u, v, _key, data in graph.edges(data=True, keys=True)
            if data.get('kind') == kind
        ]
        if not edges:
            continue
        nx.draw_networkx_edges(
            graph,
            pos,
            edgelist=edges,
            ax=ax,
            edge_color=style['color'],
            style=style['style'],
            arrows=style.get('arrows', True),
            connectionstyle=style.get('connectionstyle', 'arc3,rad=0.0'),
        )

    labels = {}
    for node, data in graph.nodes(data=True):
        kind = data.get('kind')
        if kind == 'partition':
            label = f"P{data.get('index')}\n{data.get('backend')}"
            multiplicity = data.get('multiplicity')
            # Only annotate partitions that stand for more than one copy.
            if multiplicity and multiplicity > 1:
                label += f"\n×{multiplicity}"
            labels[node] = label
        elif kind == 'backend':
            labels[node] = data.get('label', data.get('backend', ''))
        elif kind == 'conversion':
            labels[node] = f"C{data.get('index')}\n{data.get('source')}→{data.get('target')}"
    nx.draw_networkx_labels(graph, pos, labels=labels, font_size=10, ax=ax)

    ax.set_axis_off()
    ax.set_title(title)

    handles, legend_labels = ax.get_legend_handles_labels()
    if handles:
        # Keying by label keeps a single legend entry per node kind.
        unique = dict(zip(legend_labels, handles))
        ax.legend(unique.values(), unique.keys(), loc='upper center', bbox_to_anchor=(0.5, -0.05), ncol=len(unique))
    return ax

In [None]:

# Updated helpers reflecting the hierarchical SSD structure.
def _format_subsystems(groups) -> str:
    """Render subsystem groups as ``(a, b), (c)``; a dash stands for no groups."""

    if not groups:
        return "-"
    rendered = []
    for members in groups:
        inner = ", ".join(map(str, members))
        rendered.append("(" + inner + ")")
    return ", ".join(rendered)


def partition_table(graph: nx.MultiDiGraph) -> pd.DataFrame:
    """Summarise every partition node of the SSD graph as one table row.

    Rows are ordered by partition index. The column set is fixed, so a graph
    without partition nodes still yields a frame carrying the headers.
    """

    def _joined(values) -> str:
        # Comma separated rendering for qubit sequences.
        return ', '.join(str(q) for q in values)

    header = [
        'Partition',
        'Backend',
        'Multiplicity',
        'Subsystems',
        'Qubits',
        'Boundary',
        'Rank',
        'Frontier',
        'Time cost',
        'Memory cost',
    ]
    records = [
        {
            'Partition': attrs.get('index'),
            'Backend': attrs.get('backend'),
            'Multiplicity': attrs.get('multiplicity'),
            'Subsystems': _format_subsystems(attrs.get('subsystems')),
            'Qubits': _joined(attrs.get('qubits', ())),
            'Boundary': _joined(attrs.get('boundary_qubits', ())),
            'Rank': attrs.get('rank'),
            'Frontier': attrs.get('frontier'),
            'Time cost': attrs.get('cost_time'),
            'Memory cost': attrs.get('cost_memory'),
        }
        for _node, attrs in graph.nodes(data=True)
        if attrs.get('kind') == 'partition'
    ]
    if not records:
        return pd.DataFrame(columns=header)
    frame = pd.DataFrame(records, columns=header)
    return frame.sort_values('Partition').reset_index(drop=True)


def draw_ssd_graph(
    graph: nx.MultiDiGraph,
    *,
    title: str,
    seed: int = 2024,
    ax: plt.Axes | None = None,
) -> plt.Axes:
    """Visualise partitions, conversions and backends for the SSD graph.

    Nodes and edges are grouped by their ``kind`` attribute and drawn with one
    style per kind. Kinds without an entry in the style tables are not drawn.

    Args:
        graph: SSD graph whose nodes and edges carry a ``kind`` attribute.
        title: Title placed on the axes.
        seed: Seed passed to ``nx.spring_layout``.
        ax: Axes to draw on; a new 10x6 figure is created when omitted.

    Returns:
        The axes that were drawn on.
    """

    if ax is None:
        _, ax = plt.subplots(figsize=(10, 6))
    pos = nx.spring_layout(graph, seed=seed)

    node_styles = {
        'partition': {'color': '#4e79a7', 'shape': 's', 'size': 1400, 'label': 'Partition'},
        'conversion': {'color': '#e15759', 'shape': 'D', 'size': 1100, 'label': 'Conversion'},
        'backend': {'color': '#59a14f', 'shape': 'o', 'size': 1000, 'label': 'Backend'},
    }

    for kind, style in node_styles.items():
        nodes = [node for node, data in graph.nodes(data=True) if data.get('kind') == kind]
        if not nodes:
            continue
        nx.draw_networkx_nodes(
            graph,
            pos,
            nodelist=nodes,
            node_color=style['color'],
            node_shape=style['shape'],
            node_size=style['size'],
            label=style['label'],
            ax=ax,
        )

    edge_styles = {
        'dependency': {'style': 'solid', 'color': '#1f77b4', 'arrows': True},
        'entanglement': {'style': 'dashed', 'color': '#ff7f0e', 'arrows': False},
        'conversion_boundary': {'style': 'dotted', 'color': '#76b7b2', 'arrows': True, 'connectionstyle': 'arc3,rad=0.1'},
        'backend_assignment': {'style': 'solid', 'color': '#b07aa1', 'arrows': True, 'connectionstyle': 'arc3,rad=0.15'},
        'conversion_source': {'style': 'solid', 'color': '#9c755f', 'arrows': True, 'connectionstyle': 'arc3,rad=0.2'},
        'conversion_target': {'style': 'solid', 'color': '#f28e2b', 'arrows': True, 'connectionstyle': 'arc3,rad=-0.2'},
    }

    for kind, style in edge_styles.items():
        edges = [
            (u, v)
            for u, v, _key, data in graph.edges(data=True, keys=True)
            if data.get('kind') == kind
        ]
        if not edges:
            continue
        nx.draw_networkx_edges(
            graph,
            pos,
            edgelist=edges,
            ax=ax,
            edge_color=style['color'],
            style=style['style'],
            arrows=style.get('arrows', True),
            connectionstyle=style.get('connectionstyle', 'arc3,rad=0.0'),
        )

    # Multi-line node labels use explicit ``\n`` escapes: a single-quoted
    # (f-)string literal cannot contain a raw line break.
    labels = {}
    for node, data in graph.nodes(data=True):
        kind = data.get('kind')
        if kind == 'partition':
            label = f"P{data.get('index')}\n{data.get('backend')}"
            multiplicity = data.get('multiplicity')
            # Only annotate partitions that stand for more than one copy.
            if multiplicity and multiplicity > 1:
                label += f"\n×{multiplicity}"
            labels[node] = label
        elif kind == 'backend':
            labels[node] = data.get('label', data.get('backend', ''))
        elif kind == 'conversion':
            labels[node] = f"C{data.get('index')}\n{data.get('source')}→{data.get('target')}"
    nx.draw_networkx_labels(graph, pos, labels=labels, font_size=10, ax=ax)

    ax.set_axis_off()
    ax.set_title(title)

    handles, legend_labels = ax.get_legend_handles_labels()
    if handles:
        # Keying by label keeps a single legend entry per node kind.
        unique = dict(zip(legend_labels, handles))
        ax.legend(unique.values(), unique.keys(), loc='upper center', bbox_to_anchor=(0.5, -0.05), ncol=len(unique))

    return ax


## Visualising the SSDs

The loop below instantiates each benchmark, attempts to plan it with the shared scheduler, extracts its SSD graph and renders the structure alongside tabular summaries for partitions, conversion layers and (when planning succeeds) the plan steps. Use `circuit_overview_table()` to inspect the collected circuit metadata before plotting.

In [None]:
# Show the collected circuit metadata first, when any circuits were found.
overview = circuit_overview_table()
if not overview.empty:
    display(overview)
    print()

for spec, circuit in instantiate_target_benchmarks():
    # Planning is best-effort: a failure is recorded and reported below, and
    # the SSD graph is still extracted and drawn.
    plan: PlanResult | None = None
    plan_error: str | None = None
    try:
        plan = SCHEDULER.prepare_run(circuit)
    except NoFeasibleBackendError as exc:
        plan_error = f'No feasible plan: {exc}'
    except Exception as exc:
        # Broad on purpose, presumably so that one failing benchmark does not
        # stop the remaining circuits from being visualised.
        plan_error = f'Planning failed: {exc}'
    graph = circuit.to_networkx_ssd()
    # Graph-level attributes are read with .get(), so either value prints as
    # None when the attribute is absent.
    total_qubits = graph.graph.get('total_qubits')
    num_partitions = graph.graph.get('num_partitions')
    print(f"{spec.display_name}: {num_partitions} partitions covering {total_qubits} qubits")
    if plan_error:
        print(f'  Planning error: {plan_error}')
    else:
        # Summarise the plan: number of steps and the distinct backend names.
        steps = list(getattr(plan, "steps", [])) if plan is not None else []
        step_count = len(steps)
        backends = sorted({step.backend.name for step in steps})
        backend_summary = ', '.join(backends) if backends else 'none'
        print(f'  Plan steps: {step_count} using {backend_summary}')

    # One figure per circuit; the layout seed is fixed at 42 for every plot.
    fig, ax = plt.subplots(figsize=(10, 6))
    draw_ssd_graph(graph, title=spec.display_name, ax=ax, seed=42)
    plt.show()

    # Tables are displayed only when they contain rows.
    partitions = partition_table(graph)
    if not partitions.empty:
        display(partitions)

    conversions = conversion_table(graph)
    if not conversions.empty:
        display(conversions)

    if plan is not None and not plan_error:
        steps_df = plan_table(plan)
        if not steps_df.empty:
            display(steps_df)

    # Blank line between the output of consecutive circuits.
    print()
