In [1]:
import numpy as np
from itertools import combinations, chain
from collections import Counter
from qiskit import Aer, QuantumCircuit, execute
from scipy.optimize import minimize
import matplotlib.pyplot as plt
import time

# 实验设置
m = 3
N = 2
n = 4
l = 2
p_ij = np.array([[1, 1, 2], [3, 1, 1], [2, 1, 1], [2, 2, 2]])
W = [3]
K1 = 100
P = 80
num_qubits = n * N + l
optimal_c_max = 3
shots_per_run = 1000
num_runs = 10
p_values = [3, 6]
b = [[int(bit) for bit in format(j, f'0{N}b')] for j in range(4)]

# 量子电路函数（同上，省略以节省空间，实际代码需包含）
def append_multi_rzz_term(qc, qubits, gamma):
    if len(qubits) == 0:
        return
    if len(qubits) == 1:
        qc.rz(2 * gamma, qubits[0])
        return
    target = qubits[-1]
    control_qubits = qubits[:-1]
    for control in control_qubits:
        qc.cx(control, target)
    qc.rz(2 * gamma, target)
    for control in reversed(control_qubits):
        qc.cx(control, target)

def append_zz_term(qc, qubit1, qubit2, gamma):
    qc.cx(qubit1, qubit2)
    qc.rz(gamma, qubit2)
    qc.cx(qubit1, qubit2)

def append_z_term(qc, qubit, gamma):
    qc.rz(gamma, qubit)

def append_mixer_term(qc, qubit, beta):
    qc.rx(2 * beta, qubit)

def get_cost_circuit1(gamma, qc):
    for h in range(l):
        coeff = -0.5 * 2 ** h
        qc.rz(-2 * gamma * coeff, n * N + h)
    return qc

def get_cost_circuit2(gamma, qc):
    for j in W:
        for i in range(n):
            all_subsets = list(chain.from_iterable(combinations(range(N), r) for r in range(N + 1)))
            for S in all_subsets:
                if not S:
                    coeff_S = K1 / (2 ** N)
                    qc.rz(2 * gamma * coeff_S, i * N)
                else:
                    coeff_S = K1 / (2 ** N)
                    for k in S:
                        coeff_S *= (1 - 2 * b[j][k])
                    if coeff_S != 0:
                        qubits = [i * N + k for k in S]
                        append_multi_rzz_term(qc, qubits, gamma * coeff_S)
    return qc

def get_cost_circuit3(gamma, qc):
    for j in range(m):
        for i in range(n):
            for i_prime in range(n):
                all_subsets_i = list(chain.from_iterable(combinations(range(N), r) for r in range(N + 1)))
                all_subsets_iprime = list(chain.from_iterable(combinations(range(N), r) for r in range(N + 1)))
                for S_i in all_subsets_i:
                    for S_iprime in all_subsets_iprime:
                        coeff_Si = 1
                        for k in S_i:
                            coeff_Si *= (1 - 2 * b[j][k])
                        coeff_Siprime = 1
                        for k in S_iprime:
                            coeff_Siprime *= (1 - 2 * b[j][k])
                        coeff = P * p_ij[i][j] * p_ij[i_prime][j] / (2 ** (2 * N)) * coeff_Si * coeff_Siprime
                        if coeff != 0:
                            qubits = [i * N + k for k in S_i] + [i_prime * N + k for k in S_iprime]
                            count = Counter(qubits)
                            qubits_final = [x for x in count if count[x] % 2 != 0]
                            if qubits_final:
                                append_multi_rzz_term(qc, qubits_final, gamma * coeff)
        for i in range(n):
            all_subsets_i = list(chain.from_iterable(combinations(range(N), r) for r in range(N + 1)))
            for S_i in all_subsets_i:
                coeff_Si = 1
                for k in S_i:
                    coeff_Si *= (1 - 2 * b[j][k])
                if coeff_Si != 0:
                    coeff = -2 * P * p_ij[i][j] / (2 ** N) * coeff_Si * (2 ** l - 1)
                    qubits_load = [i * N + k for k in S_i]
                    append_multi_rzz_term(qc, qubits_load, gamma * coeff)
                    for h in range(l):
                        coeff_h = -2 * P * p_ij[i][j] / (2 ** N) * coeff_Si * 2 ** h
                        qubits_h = qubits_load + [n * N + h]
                        append_multi_rzz_term(qc, qubits_h, gamma * coeff_h)
        for h in range(l):
            for h_prime in range(h, l):
                if h == h_prime:
                    coeff = P / 4 * (2 ** h) * (2 ** h)
                else:
                    coeff = P / 4 * (2 ** h) * (2 ** h_prime)
                    append_zz_term(qc, n * N + h, n * N + h_prime, 2 * gamma * coeff)
    return qc

def get_mixer_circuit(beta, qc):
    for i in range(num_qubits):
        append_mixer_term(qc, i, beta)
    return qc

# 成本函数
def cost_function(params, p):
    beta, gamma = params[:p], params[p:]
    qc = QuantumCircuit(num_qubits, num_qubits)
    qc.h(range(num_qubits))
    for i in range(p):
        get_cost_circuit1(gamma[i], qc)
        get_cost_circuit2(gamma[i], qc)
        get_cost_circuit3(gamma[i], qc)
        get_mixer_circuit(beta[i], qc)
    qc.measure(range(num_qubits), range(num_qubits))
    backend = Aer.get_backend('qasm_simulator')
    result = execute(qc, backend, seed_simulator=20, shots=shots_per_run).result()
    counts = result.get_counts(qc)
    total_counts = sum(counts.values())
    total_E = 0
    for bitstring, count in counts.items():
        s = np.zeros((n, N), dtype=int)
        z = np.zeros(l, dtype=int)
        for i in range(n):
            for k in range(N):
                s[i][k] = int(bitstring[n * N + l - 1 - (i * N + k)])
        for h in range(l):
            z[h] = int(bitstring[n * N + l - 1 - (n * N + h)])
        C_max = sum(2 ** h * z[h] for h in range(l))
        E = C_max
        for j in W:
            for i in range(n):
                M_i = sum(2 ** (N - 1 - k) * s[i][k] for k in range(N))
                f_binary_j = 1 if M_i == j else 0
                E += K1 * f_binary_j
        for j in range(m):
            load_j = 0
            for i in range(n):
                M_i = sum(2 ** (N - 1 - k) * s[i][k] for k in range(N))
                f_binary_j = 1 if M_i == j else 0
                load_j += p_ij[i][j] * f_binary_j
            alpha_s = max(0, load_j - C_max)
            E += P * alpha_s ** 2
        total_E += E * (count / total_counts)
    return total_E, counts, shots_per_run

# 计算 P_opt
def compute_p_opt(counts):
    total_counts = sum(counts.values())
    optimal_counts = 0
    for bitstring, count in counts.items():
        s = np.zeros((n, N), dtype=int)
        z = np.zeros(l, dtype=int)
        for i in range(n):
            for k in range(N):
                s[i][k] = int(bitstring[n * N + l - 1 - (i * N + k)])
        for h in range(l):
            z[h] = int(bitstring[n * N + l - 1 - (n * N + h)])
        C_max = sum(2 ** h * z[h] for h in range(l))
        penalty_unavailable = 0
        for j in W:
            for i in range(n):
                M_i = sum(2 ** (N - 1 - k) * s[i][k] for k in range(N))
                f_binary_j = 1 if M_i == j else 0
                penalty_unavailable += K1 * f_binary_j
        penalty_load = 0
        for j in range(m):
            load_j = 0
            for i in range(n):
                M_i = sum(2 ** (N - 1 - k) * s[i][k] for k in range(N))
                f_binary_j = 1 if M_i == j else 0
                load_j += p_ij[i][j] * f_binary_j
            alpha_s = max(0, load_j - C_max)
            penalty_load += P * alpha_s ** 2
        if C_max == optimal_c_max and penalty_unavailable == 0 and penalty_load == 0:
            optimal_counts += count
    return optimal_counts / total_counts if total_counts > 0 else 0

# 优化器
def optimize_qaoa(params, p):
    def objective(params):
        expectation, _, shots = cost_function(params, p)
        return expectation
    options = {'rhobeg': 0.01, 'maxiter': 3}
    result = minimize(objective, params, method='COBYLA', options=options)
    return result

# INTERP 初始化
def interpolate_parameters(prev_params, p_old, p_new, delta=0.1):
    if p_old == 0:
        return np.concatenate([np.random.uniform(0, np.pi, p_new), np.random.uniform(0, 2 * np.pi, p_new)])
    prev_beta = prev_params[:p_old]
    prev_gamma = prev_params[p_old:]
    new_beta = np.zeros(p_new)
    new_gamma = np.zeros(p_new)
    for i in range(p_new):
        t = i / (p_new - 1) if p_new > 1 else 0
        new_beta[i] = (1 - t) * prev_beta[0] + t * prev_beta[-1]
        new_gamma[i] = (1 - t) * prev_gamma[0] + t * prev_gamma[-1]
    new_beta[-1] += np.random.uniform(0, delta)
    new_gamma[-1] += np.random.uniform(0, delta)
    return np.concatenate([new_beta, new_gamma])

# Gantt 图
def plot_gantt(counts, p):
    max_count = max(counts.values())
    best_bitstring = max(counts, key=counts.get)
    s = np.zeros((n, N), dtype=int)
    z = np.zeros(l, dtype=int)
    for i in range(n):
        for k in range(N):
            s[i][k] = int(best_bitstring[n * N + l - 1 - (i * N + k)])
    for h in range(l):
        z[h] = int(best_bitstring[n * N + l - 1 - (n * N + h)])
    C_max = sum(2 ** h * z[h] for h in range(l))
    schedule = [[] for _ in range(m)]
    for i in range(n):
        M_i = sum(2 ** (N - 1 - k

) * s[i][k] for k in range(N))
        if M_i < m:
            schedule[M_i].append((i, p_ij[i][M_i]))
    fig, ax = plt.subplots()
    for j in range(m):
        start = 0
        for op, duration in schedule[j]:
            ax.broken_barh([(start, duration)], (j-0.4, 0.8), facecolors='blue')
            ax.text(start + duration/2, j, f'Op{op}', ha='center', va='center')
            start += duration
    ax.set_ylim(-0.5, m-0.5)
    ax.set_xlim(0, C_max + 1)
    ax.set_yticks(range(m))
    ax.set_yticklabels([f'M{j}' for j in range(m)])
    ax.set_xlabel('Time')
    ax.set_title(f'Random + COBYLA (p={p}, C_max={C_max})')
    plt.savefig(f'gantt_random_cobyla_p{p}.png')
    plt.close()

# 主实验
def run_experiment(p):
    p_opt_values = []
    shot_counts = []
    best_counts = None
    best_p_opt = -1
    start_time = time.time()
    for run in range(num_runs):
        min_energy = float('inf')
        init_point = np.array([])
        counts = {}
        total_shots = 0
        num_initial_sets = 5
        for k in range(num_initial_sets):
            if k == 0 and run == 0:
                init_point_temp = np.concatenate([np.random.uniform(0, np.pi, p), np.random.uniform(0, 2 * np.pi, p)])
            else:
                init_point_temp = interpolate_parameters(init_point, len(init_point)//2, p)
            result = optimize_qaoa(init_point_temp, p)
            energy, counts_temp, shots = cost_function(result.x, p)
            total_shots += shots + (num_initial_sets * shots_per_run if k == 0 else shots_per_run)
            p_opt = compute_p_opt(counts_temp)
            if p_opt > best_p_opt:
                best_p_opt = p_opt
                min_energy = energy
                init_point = result.x
                counts = counts_temp
                best_counts = counts_temp
        p_opt_values.append(best_p_opt)
        shot_counts.append(total_shots)
    end_time = time.time()
    mean_p_opt = np.mean(p_opt_values)
    std_p_opt = np.std(p_opt_values)
    mean_shots = np.mean(shot_counts)
    runtime = end_time - start_time
    print(f"Random + COBYLA, p={p}")
    print(f"Mean P_opt: {mean_p_opt:.4f}, Std P_opt: {std_p_opt:.4f}")
    print(f"Mean Shots: {mean_shots:.0f}")
    print(f"Runtime: {runtime:.2f} seconds")
    if best_counts:
        plot_gantt(best_counts, p)
    with open(f'results_random_cobyla_p{p}.txt', 'w') as f:
        f.write(f"Random + COBYLA, p={p}\n")
        f.write(f"Mean P_opt: {mean_p_opt:.4f}\n")
        f.write(f"Std P_opt: {std_p_opt:.4f}\n")
        f.write(f"Mean Shots: {mean_shots:.0f}\n")
        f.write(f"Runtime: {runtime:.2f} seconds\n")
    return mean_p_opt, std_p_opt, mean_shots



In [2]:

# 运行
results = []
for p in p_values:
    mean_p_opt, std_p_opt, mean_shots = run_experiment(p)
    results.append(('Random + COBYLA', p, mean_p_opt, std_p_opt, mean_shots))

  backend = Aer.get_backend('qasm_simulator')
  result = execute(qc, backend, seed_simulator=20, shots=shots_per_run).result()


Random + COBYLA, p=3
Mean P_opt: 0.0626, Std P_opt: 0.0137
Mean Shots: 14000
Runtime: 147.20 seconds
Random + COBYLA, p=6
Mean P_opt: 0.0656, Std P_opt: 0.0102
Mean Shots: 14000
Runtime: 263.51 seconds


: 