In [104]:
%pip install numpy matplotlib qiskit qiskit-aer scipy --quiet

In [105]:
import numpy as np
import matplotlib.pyplot as plt
from typing import List, Tuple
from qiskit import QuantumCircuit, transpile
from qiskit.quantum_info import SparsePauliOp, Operator, partial_trace, Statevector, Pauli
from qiskit.circuit import ParameterVector
from qiskit.circuit.library import RealAmplitudes, real_amplitudes
from qiskit_aer import Aer
from qiskit.primitives import StatevectorSampler
from qiskit_aer.primitives import SamplerV2 as Sampler
from scipy.optimize import minimize
import warnings
warnings.filterwarnings('ignore')

In [107]:
class BarrenPlateauCircuit:
#Class to demonstrate barren plateaus in variational quantum circuits. Implements hardware-efficient ansatz with configurable depth.

    def __init__(self,num_qubits,num_layers,entanglement='full'): #Circuit Initialization
        self.num_qubits = num_qubits
        self.num_layers = num_layers
        self.entanglement = entanglement
        self.num_params = self._calculate_num_params()
        self.params = ParameterVector('θ',self.num_params)
        self.circuit = self._build_circuit()

    def _calculate_num_params(self):
        return 3*self.num_qubits*self.num_layers

    def _build_circuit(self):
        qc = QuantumCircuit(self.num_qubits)
        param_idx = 0
        for layer in range(self.num_layers):
            for qubit in range(self.num_qubits): # Rotation layer
                qc.rx(self.params[param_idx],qubit)
                param_idx += 1
                qc.ry(self.params[param_idx],qubit)
                param_idx += 1
                qc.rz(self.params[param_idx],qubit)
                param_idx += 1

            # Entanglement layer
            if layer < self.num_layers - 1:  # No entanglement after last layer
                self._add_entanglement_layer(qc)
        return qc

    def _add_entanglement_layer(self,qc): #Based on specific pattern
        if self.entanglement == 'full': #all-to-all connectivity, taking this example for demo
            for i in range(self.num_qubits):
                for j in range(i+1,self.num_qubits):
                    qc.cx(i,j)
        elif self.entanglement == 'linear': #nearest-neighbor
            for i in range(self.num_qubits - 1):
                qc.cx(i,i+1)
        elif self.entanglement == 'circular': #nearest-neighbor with periodic boundary
            for i in range(self.num_qubits - 1):
                qc.cx(i,i+1)
            if self.num_qubits > 2:
                qc.cx(self.num_qubits-1,0)

    def get_circuit(self,params=None):
        if params is None:
            return self.circuit
        return self.circuit.assign_parameters(params)

In [108]:
def compute_gradient(circ,params,obs='Z',method='parameter_shift'):
#Compute gradient of expectation value using parameter shift rule.

    gradients = np.zeros(len(params))
    shift = np.pi / 2
    for i in range(len(params)):
        params_plus = params.copy() #Forward shift
        params_plus[i] += shift
        exp_plus = compute_expectation(circ,params_plus,obs)
        params_minus = params.copy() #Backward shift
        params_minus[i] -= shift
        exp_minus = compute_expectation(circ,params_minus,obs)
        gradients[i] = (exp_plus - exp_minus) / 2 #Parameter shift rule
    return gradients

In [109]:
def compute_expectation(circ,params,obs='Z'):
#Compute expectation value of an observable, taking a specific example.

    qc = circ.get_circuit(params) #Bind parameters to circuit
    backend = Aer.get_backend('statevector_simulator') #Get statevector
    job = backend.run(qc)
    statevector = job.result().get_statevector()

    if obs == 'Z': #Create Pauli object from observable string
        obs_pauli = Pauli('Z'+'I'*(circ.num_qubits - 1)) #Pauli Z on first qubit
    elif obs == 'global':
        obs_pauli = Pauli('Z'*circ.num_qubits) #Sum of Zs on all qubits
    else:
        raise ValueError(f"Unsupported observable: {obs}")

    expectation = statevector.expectation_value(obs_pauli)
    return np.real(expectation) #Ensure that the result is real

In [110]:
def analyze_barren_plateaus(num_qubits,num_layers,num_samples,entanglement):
#Analyze gradient variance for different circuit configurations.

    results = {}
    for n_qubits in num_qubits:
        results[n_qubits] = {}

        for n_layers in num_layers:
            print(f"Analyzing: {n_qubits} qubits, {n_layers} layers")
            circuit = BarrenPlateauCircuit(n_qubits, n_layers, entanglement)
            gradient_norms = []

            for sample in range(num_samples):
                params = np.random.uniform(0, 2*np.pi, circuit.num_params) #Random initialization
                gradients = compute_gradient(circuit, params, obs='Z') #Compute gradient
                gradient_norms.append(np.linalg.norm(gradients)) #Store gradient norm

            results[n_qubits][n_layers] = {
                'mean': np.mean(gradient_norms),
                'std': np.std(gradient_norms),
                'variance': np.var(gradient_norms),
                'raw_norms': gradient_norms
            }
    return results

**Initial Plots for Barren Plateau Generation:** used for initial tests, omitted from final result as a comparative plot will be used.

In [112]:
def plot_barren_plateaus(results):

    fig, axes = plt.subplots(2,2,figsize=(12,10))

    #Plot 1:Gradient variance vs layers for different number of qubits
    ax = axes[0, 0]
    for n_qubits in results.keys():
        layers = sorted(results[n_qubits].keys())
        variances = [results[n_qubits][l]['variance'] for l in layers]
        ax.semilogy(layers, variances, 'o-', label=f'{n_qubits} qubits')
    ax.set_xlabel('Number of Layers')
    ax.set_ylabel('Gradient Variance (log scale)')
    ax.set_title('Gradient Variance vs Circuit Depth')
    ax.legend()
    ax.grid(True, alpha=0.3)

    #Plot 2:Gradient distribution for deepest circuit
    ax = axes[0, 1]
    max_qubits = max(results.keys())
    max_layers = max(results[max_qubits].keys())
    gradient_norms = results[max_qubits][max_layers]['raw_norms']
    ax.hist(gradient_norms, bins=30, alpha=0.7, edgecolor='black')
    ax.set_xlabel('Gradient Norm')
    ax.set_ylabel('Frequency')
    ax.set_title(f'Gradient Distribution ({max_qubits} qubits, {max_layers} layers)')
    ax.axvline(x=np.mean(gradient_norms), color='red', linestyle='--', label=f'Mean: {np.mean(gradient_norms):.4f}')
    ax.legend()

    #Plot 3:Mean gradient vs qubits for different depths
    ax = axes[1, 0]
    all_layers = set()
    for n_qubits in results.keys():
        all_layers.update(results[n_qubits].keys())

    for n_layers in sorted(all_layers):
        qubits = []
        means = []
        for n_qubits in sorted(results.keys()):
            if n_layers in results[n_qubits]:
                qubits.append(n_qubits)
                means.append(results[n_qubits][n_layers]['mean'])
        if qubits:
            ax.semilogy(qubits, means, 'o-', label=f'{n_layers} layers')

    ax.set_xlabel('Number of Qubits')
    ax.set_ylabel('Mean Gradient Norm (log scale)')
    ax.set_title('Scaling of Gradient with System Size')
    ax.legend()
    ax.grid(True, alpha=0.3)

    #Plot 4:Visualization of exponential decay
    ax = axes[1, 1]
    for n_qubits in results.keys():
        layers = sorted(results[n_qubits].keys())
        means = [results[n_qubits][l]['mean'] for l in layers]
        stds = [results[n_qubits][l]['std'] for l in layers]
        ax.errorbar(layers, means, yerr=stds, fmt='o-', capsize=5, label=f'{n_qubits} qubits', alpha=0.7)

    ax.set_xlabel('Number of Layers')
    ax.set_ylabel('Mean Gradient Norm ± Std')
    ax.set_title('Gradient Decay with Error Bars')
    ax.set_yscale('log')
    ax.legend()
    ax.grid(True, alpha=0.3)
    plt.tight_layout()
    plt.show()

In [67]:
print("Barren Plateaus Generation")
print("Creating a simple barren plateau circuit:") #Basic demo
circuit = BarrenPlateauCircuit(n_qubits=4, n_layers=3, entanglement='linear')
print(f"Circuit has {circuit.n_qubits} qubits and {circuit.num_params} parameters")

#Compute gradient for random parameters
print("Computing gradients for random initialization:")
random_params = np.random.uniform(0, 2*np.pi, circuit.num_params)
gradients = compute_gradient(circuit, random_params)
print(f"Gradient norm: {np.linalg.norm(gradients):.4f}")
print(f"Mean gradient magnitude: {np.mean(np.abs(gradients)):.4f}")

#Analyze barren plateaus
num_qubits=[4,6,8]
num_layers=[2,4,8,12,16]
num_samples=50
print("Analyzing barren plateau behavior:")
results = analyze_barren_plateaus(num_qubits,num_layers,num_samples,entanglement='linear')

print("Plotting results:")
plot_barren_plateaus(results)

### **HC-GO Analysis**

In [113]:
class PauliContraction:
#Handles the efficient backward contraction of Pauli operators.
#Computes O_{in} = Tr_Block [ U^dag(O_{future}(x) I_block + I_qcr(x) H_local)U]

    @staticmethod
    def conjugate_and_trace(operator_from_future: SparsePauliOp,local_hamiltonian: SparsePauliOp,unitary_circuit: QuantumCircuit,
        num_qcr: int,num_block: int)->SparsePauliOp:

        id_block = SparsePauliOp(["I"*num_block]) #Construct the total operator on k+b qubits
        id_qcr = SparsePauliOp(["I"*num_qcr])

        term1 = id_block.tensor(operator_from_future) #Assume QCR is on the lower indices and Block on upper
        term2 = local_hamiltonian.tensor(id_qcr) #H_local acts on Block.
        total_op = term1 + term2

        u_op = Operator(unitary_circuit) #Conjugate by Unitary
        rotated_dense = u_op.adjoint().compose(total_op).compose(u_op) #For larger systems, Clifford logic might be required

        from qiskit.quantum_info import DensityMatrix #Partial Trace
        dm_rotated_dense = DensityMatrix(rotated_dense)

        trace_indices = list(range(num_qcr,num_qcr + num_block)) #Trace out the BLOCK qubits, num_qcr to num_qcr+num_block-1
        reduced_dense = partial_trace(dm_rotated_dense,trace_indices)
        return SparsePauliOp.from_operator(reduced_dense).simplify()

In [115]:
class CompressedShadow:
    def __init__(self,k_qubits: int,num_snapshots: int):
        self.k = k_qubits
        self.num_snapshots = num_snapshots
        self.sampler = StatevectorSampler() #Uses StatevectorSampler for exact simulation in this demo

    def collect_shadows(self,state_circuit: QuantumCircuit) -> Tuple[np.ndarray, np.ndarray]: #Executes random measurements
        bases = np.random.randint(0, 3, size=(self.num_snapshots, self.k))
        pub_circuits = []

        for i in range(self.num_snapshots):
            qc = state_circuit.copy()

            #Apply basis rotations example
            for q in range(self.k):
                b = bases[i, q]
                if b == 0:
                    qc.h(q)
                elif b == 1:
                    qc.sdg(q)
                    qc.h(q)

            qc.measure_all()
            pub_circuits.append(qc)

        #Execute batch
        job = self.sampler.run(pub_circuits)
        result = job.result()

        bitstrings = []
        for i in range(self.num_snapshots):
            counts = result[i].data.meas.get_counts() #StatevectorSampler returns the counts dictionary
            bstr = list(counts.keys())[0] # Parse bitstring
            bits = [int(bstr[self.k-1-j]) for j in range(self.k)]
            bitstrings.append(bits)

        return np.array(bitstrings), bases

    def estimate_expectation(self,shadow_data,observable: SparsePauliOp,k_means: int = 10)->float:
        bitstrings, bases = shadow_data
        num_total = len(bitstrings)
        if num_total == 0: return 0.0

        chunk_size = max(1, num_total // k_means)
        means = []

        #Single snapshot estimator
        def get_val(b_str,basis_list,pauli_str):
            val = 1.0
            for q in range(self.k):
                char = pauli_str[self.k - 1 - q] #Pauli label is in a reverse order
                if char == 'I': continue

                needed_basis = {'X':0,'Y':1,'Z':2}[char]
                if basis_list[q]!= needed_basis:
                    return 0.0

                sign = -1 if b_str[q] == 1 else 1 #Eigenvalue is (-1)^b*3
                val *= (3 * sign)
            return val

        for k_idx in range(k_means):
            start = k_idx*chunk_size
            end = min((k_idx+1)*chunk_size,num_total)
            chunk_sum = 0.0

            for lbl, coeff in zip(observable.paulis.to_labels(),observable.coeffs):
                term_val = 0.0
                for i in range(start, end):
                    term_val += get_val(bitstrings[i],bases[i],lbl)
                chunk_sum += np.real(coeff)*(term_val/(end - start))

            means.append(chunk_sum)
        return np.median(means)

In [116]:
class HCGOAnsatz:
    def __init__(self,num_blocks: int,block_size: int,context_size: int):
        self.cells = []
        self.params = []

        # Initialize cells
        for _ in range(num_blocks):
            ansatz = real_amplitudes(num_qubits=context_size + block_size, reps=1) #Acts on k(context) + b(block) qubits
            self.cells.append(ansatz)
            self.params.append(np.random.uniform(0, 0.1, size=ansatz.num_parameters)) #Small random initialization

    def get_cell_circuit(self, index, params):
        return self.cells[index].assign_parameters(params)

In [118]:
def run_hcgo_training(global_hamiltonian_terms,num_blocks,block_size,context_size,iterations):

    ansatz = HCGOAnsatz(num_blocks,block_size,context_size)
    shadow_engine = CompressedShadow(context_size,num_snapshots=200)
    contraction_engine = PauliContraction()

    print(f"Starting Training: {num_blocks} blocks, Block Size={block_size}, Context={context_size}")

    all_gradient_norms = []

    for it in range(iterations):
        iteration_gradient_norms = []

        #Backward Pass (PUSH Operators)
        ops_from_future = [None]*num_blocks
        current_accum = SparsePauliOp(["I"*context_size],coeffs=[0.0])

        for i in reversed(range(num_blocks)):
            ops_from_future[i] = current_accum
            U_circ = ansatz.get_cell_circuit(i,ansatz.params[i])
            H_loc = global_hamiltonian_terms[i]
            current_accum = contraction_engine.conjugate_and_trace(current_accum,H_loc,U_circ,context_size,block_size)

        #Forward Pass (Local Optimization)
        current_qcr_circuit = QuantumCircuit(context_size) #Starting with the QCR in |0...0>

        total_energy_est = 0.0

        for i in range(num_blocks):
            shadow_data = shadow_engine.collect_shadows(current_qcr_circuit) #Collect the shadows of rho_{i-1}

            #Prepare the objective function
            ops_target = ops_from_future[i]
            h_loc = global_hamiltonian_terms[i]

            term1 = SparsePauliOp(["I"*block_size]).tensor(ops_target)
            term2 = h_loc.tensor(SparsePauliOp(["I"*context_size]))
            O_total = term1 + term2

            def objective(p_trial):
                u_circ = ansatz.cells[i].assign_parameters(p_trial)
                u_op = Operator(u_circ)

                op_rot = u_op.adjoint().compose(O_total).compose(u_op) #Rotate the observable backwards by U
                op_rot_sparse = SparsePauliOp.from_operator(op_rot)
                return shadow_engine.estimate_expectation(shadow_data, op_rot_sparse)

            #A simple gradient descent
            curr_p = ansatz.params[i]
            grad = np.zeros_like(curr_p)
            shift = np.pi/2 #Parameter shift(approx value for demo)
            base_val = objective(curr_p)

            for p_idx in range(len(curr_p)):
                p_plus = curr_p.copy(); p_plus[p_idx] += shift
                p_minus = curr_p.copy(); p_minus[p_idx] -= shift #Actual experimentation values depended
                grad[p_idx] = 0.5*(objective(p_plus) - objective(p_minus))

            gradient_norm = np.linalg.norm(grad)
            iteration_gradient_norms.append(gradient_norm)
            ansatz.params[i] -= 0.1 * grad
            new_val = objective(ansatz.params[i])

            if i == 0: total_energy_est = new_val

            #Update the QCR circuit for the next step
            #Append U_i, then reset the Block wires to 0 for the next step
            #Map the (k+b) output back to k input for next step
            full_step = QuantumCircuit(context_size + block_size)
            full_step.append(current_qcr_circuit, range(context_size))
            full_step.append(ansatz.get_cell_circuit(i,ansatz.params[i]),range(context_size + block_size))

            #Pure state approximation
            sv = Statevector(full_step)
            rho_next = partial_trace(sv,list(range(context_size,context_size + block_size))) #Project to the dominant eigenvector of rho_next
            evals, evecs = np.linalg.eigh(rho_next.data)
            psi_approx = evecs[:, -1] #Taking the largest eigenvector

            new_qcr = QuantumCircuit(context_size)
            new_qcr.initialize(psi_approx, range(context_size))
            current_qcr_circuit = new_qcr

        all_gradient_norms.append(iteration_gradient_norms)
    return ansatz.params, all_gradient_norms

In [121]:
def generate_local_hamiltonian(block_size: int)->SparsePauliOp:
#Dynamically constructs a SparsePauliOp representing a 1D Ising-like local Hamiltonian for a given block_size.

    pauli_labels = []
    coeffs = []

    for i in range(block_size - 1):
        term_list = ['I'] * block_size
        term_list[block_size - 1 - i] = 'Z'
        term_list[block_size - 1 - (i + 1)] = 'Z'
        pauli_labels.append("".join(term_list))
        coeffs.append(1.0)

    for i in range(block_size):
        term_list = ['I'] * block_size
        term_list[block_size - 1 - i] = 'X'
        pauli_labels.append("".join(term_list))
        coeffs.append(0.5)

    return SparsePauliOp(pauli_labels, coeffs).simplify()

In [129]:
def analyze_hcgo_mitigation(num_qubits: List[int],block_size: int,context_size: int,iterations: int):
#HCGO simulation across different system sizes and numbers of HCGO blocks. Aggregates and returns the variance of local gradient norms.

    hcgo_results = {}

    for total_qubits in num_qubits:
        hcgo_results[total_qubits] = {}
        num_hcgo_blocks = total_qubits//block_size
        if total_qubits % block_size != 0: #Ensure the number of blocks is integer
            print(f"Warning: {total_qubits} is not perfectly divisible by block_size {block_size}. Skipping...")
            continue

        print(f"\n--- Analyzing HCGO: Total Qubits={total_qubits}, Num Blocks={num_hcgo_blocks} ---")

        hamiltonian_terms = [generate_local_hamiltonian(block_size) for _ in range(num_hcgo_blocks)] #Generate local Hamiltonians for all blocks

        #Execute HCGO training
        _, hcgo_gradient_norms = run_hcgo_training(global_hamiltonian_terms=hamiltonian_terms,num_blocks=num_hcgo_blocks,block_size=block_size,
            context_size=context_size,iterations=iterations)

        #Calculate the variance for each block across all iterations
        if hcgo_gradient_norms:
            gradients_per_block = list(map(list, zip(*hcgo_gradient_norms)))
            block_variances = [np.var(block_norms) for block_norms in gradients_per_block]
        else:
            block_variances = [0.0] * num_hcgo_blocks # Handle case with no gradients

        hcgo_results[total_qubits][num_hcgo_blocks] = block_variances
        print(f"HCGO Gradient Variances for {total_qubits} qubits, {num_hcgo_blocks} blocks: {block_variances}")

    return hcgo_results

In [137]:
print("Executing HCGO mitigation analysis for various system sizes")
num_qubits_list = [4,6,8] #Total number of qubits in the system
block_size = 2 #Number of qubits per block
context_size = 2 #Number of qubits in the Quantum Context Register
iterations = 5 #Number of training iterations

hcgo_analysis_results = analyze_hcgo_mitigation(num_qubits=num_qubits_list,block_size=block_size,context_size=context_size,
    iterations=iterations)

#print("Analysis complete")

In [131]:
def plot_hcgo_analysis_results(hcgo_analysis_results):

    if not hcgo_analysis_results:
        print("No analysis results to plot!")
        return

    fig, ax = plt.subplots(figsize=(10, 6))

    total_qubits_values = sorted(hcgo_analysis_results.keys())
    mean_norms = []
    num_blocks_labels = []

    for total_qubits in total_qubits_values:
        for num_blocks, mean_norm in hcgo_analysis_results[total_qubits].items():
            mean_norms.append(mean_norm)
            num_blocks_labels.append(f'{total_qubits} qubits ({num_blocks} blocks)')

    ax.bar(num_blocks_labels,mean_norms,color='skyblue')
    ax.set_xlabel('Total Qubits/Number of Blocks')
    ax.set_ylabel('Mean L2 Norm of Local Gradient')
    ax.set_title('Mean Local Gradient Norms')
    ax.tick_params(axis='x',rotation=45)
    plt.yscale('log')
    plt.tight_layout()
    plt.show()

In [133]:
def plot_comparative_gradients_line_graph(barren_plateau_results,hcgo_analysis_results,num_layers_bp,block_size_hcgo=2):

    if not barren_plateau_results and not hcgo_analysis_results:
        print("No results to plot for comparison.")
        return

    fig, ax = plt.subplots(figsize=(12, 7))

    bp_qubit_counts = sorted(barren_plateau_results.keys())
    hcgo_qubit_counts = sorted(hcgo_analysis_results.keys())
    qubit_counts_common = sorted(list(set(bp_qubit_counts) & set(hcgo_qubit_counts)))

    if not qubit_counts_common:
        print("No common qubit counts found for comparison.")
        return

    colors = plt.cm.viridis(np.linspace(0, 1, len(qubit_counts_common)))
    markers = ['o', 's', '^', 'D']

    for i, n_qubits in enumerate(qubit_counts_common):
        #Barren Plateau data
        bp_variances_for_qubit = [barren_plateau_results[n_qubits][l]['variance'] for l in num_layers_bp if l in barren_plateau_results[n_qubits]]
        bp_layers_for_qubit = [l for l in num_layers_bp if l in barren_plateau_results[n_qubits]]

        if bp_variances_for_qubit:
            ax.plot(bp_layers_for_qubit, bp_variances_for_qubit, marker=markers[i % len(markers)], linestyle='-', linewidth=2,
                    color=colors[i], label=f'{n_qubits} Qubits (Unmitigated BP Variance)')

        #HCGO (Mitigated BP) data: Plot layer-wise variance for each block
        num_hcgo_blocks_for_qubit = n_qubits // block_size_hcgo

        if n_qubits in hcgo_analysis_results and num_hcgo_blocks_for_qubit in hcgo_analysis_results[n_qubits]:
            hcgo_block_variances = hcgo_analysis_results[n_qubits][num_hcgo_blocks_for_qubit]
            hcgo_block_layers = np.arange(1, num_hcgo_blocks_for_qubit + 1) # Represent blocks as layers

            ax.plot(hcgo_block_layers, hcgo_block_variances, marker=markers[i % len(markers)], linestyle='--',
                    linewidth=2, color=colors[i], label=f'{n_qubits} Qubits (HCGO Mitigated Block Variance)')

    ax.set_xlabel('Number of Layers / HCGO Blocks') # Updated X-axis label
    ax.set_ylabel('Gradient Variance (log)')
    ax.set_title('Comparative Gradient Analysis: Barren Plateaus vs HCGO Mitigation (Variance)') # Updated title
    ax.set_yscale('log')
    ax.legend(loc='upper right',bbox_to_anchor=(1.0, 1.0))
    ax.grid(True,which="both",ls="--",c='0.7',alpha=0.6)
    fig.tight_layout()
    plt.show()

In [136]:
num_qubits_bp=[4,6,8]
num_layers_bp=[2,4,8,12,16]
num_samples_bp=50

results = analyze_barren_plateaus(num_qubits_bp, num_layers_bp, num_samples_bp, entanglement='linear') #Example
block_size_for_hcgo_plot = 2 #Example

plot_comparative_gradients_line_graph(results,hcgo_analysis_results,num_layers_bp,block_size_hcgo=block_size_for_hcgo_plot)