In [None]:
from qiskit import QuantumCircuit, qasm3
from quantum_chip import QuantumChip
from hardware import HardwareParams
from qrmove_compiler import QRMoveCompiler
from utils import get_quantum_circuit
import math
import signal

In [None]:
# Default Settings
# quantum_alg: bv
# MRP_CP_ratio: 10
# extra_qubit_ratio: 15%
# qubit_num: 30


class TimeoutError(Exception):
    pass


def timeout_handler(signum, frame):
    raise TimeoutError("Function call timed out")


def experiment(quantum_alg, qubit_num, extra_qubit_num, MRP_CP_ratio):
    """
    quantum_alg: 量子算法
    qubit_num: 量子比特数量
    extra_qubit_ratio: 额外比特比例
    MRP_CP_ratio: MRP/CP 比例
    """
    # 设置信号处理器和超时时间（5分钟）
    signal.signal(signal.SIGALRM, timeout_handler)
    signal.alarm(5 * 60)

    try:
        quantum_circuit: QuantumCircuit = get_quantum_circuit(quantum_alg, qubit_num)
        quantum_chip = QuantumChip("square", 50)
        hardware_param = HardwareParams(
            time_1q=300.0,  # 单比特门时间
            time_2q=300.0,  # 双比特门时间
            time_meas=300 * MRP_CP_ratio,  # 测量时间
            time_reset=0,  # 重置时间
        )
        qmc = QRMoveCompiler(
            quantum_circuit,
            quantum_chip,
            hardware_param,
            math.ceil(extra_qubit_num),
        )
        (
            start_depth,
            start_width,
            qr_map_depth,
            qr_map_width,
            qr_move_depth,
            qr_move_width,
        ) = qmc.compile_program()

        # 取消警报
        signal.alarm(0)
        return (
            start_depth,
            start_width,
            qr_map_depth,
            qr_map_width,
            qr_move_depth,
            qr_move_width,
        )

    except (TimeoutError, Exception) as e:
        # 取消警报以防万一
        signal.alarm(0)
        print(f"Experiment failed with error: {e}")
        return 0, 0, 0, 0, 0, 0

In [None]:
def experiment_var_extra_ratio_alg():
    from concurrent.futures import ThreadPoolExecutor, as_completed
    from functools import partial
    
    # 存储每个参数组合的任务结果
    all_tasks = []
    
    # 创建所有参数组合
    for alg in ["bv", "vqa", "qaoa", "qft", "mul", "rd", "xor"]:
        for extra_ratio in [0.1, 0.2, 0.3]:
            qubit_num = 30
            all_tasks.append((alg, extra_ratio, qubit_num))
    
    # 为每个参数组合执行50次实验
    def run_experiments_for_params(alg, extra_ratio, qubit_num):
        results = []
        for _ in range(50):
            (
                start_depth,
                start_width,
                qr_map_depth,
                qr_map_width,
                qr_move_depth,
                qr_move_width,
            ) = experiment(alg, qubit_num, extra_ratio * qubit_num, 10)
            
            # 如果实验成功（不是全零结果），则添加到结果列表中
            if not (
                start_depth == 0
                and start_width == 0
                and qr_map_depth == 0
                and qr_map_width == 0
                and qr_move_depth == 0
                and qr_move_width == 0
            ):
                results.append(
                    (
                        start_depth,
                        start_width,
                        qr_map_depth,
                        qr_map_width,
                        qr_move_depth,
                        qr_move_width,
                    )
                )
        return alg, extra_ratio, qubit_num, results
    
    # 使用线程池并发执行所有实验
    results_dict = {}
    with ThreadPoolExecutor(max_workers=96) as executor:
        # 提交所有任务
        future_to_params = {
            executor.submit(run_experiments_for_params, alg, extra_ratio, qubit_num): (alg, extra_ratio, qubit_num)
            for alg, extra_ratio, qubit_num in all_tasks
        }
        
        # 收集结果
        for future in as_completed(future_to_params):
            alg, extra_ratio, qubit_num, valid_results = future.result()
            results_dict[(alg, extra_ratio)] = valid_results
    
    # 处理和打印结果
    for alg in ["bv", "vqa", "qaoa", "qft", "mul", "rd", "xor"]:
        for extra_ratio in [0.1, 0.2, 0.3]:
            qubit_num = 30
            valid_results = results_dict.get((alg, extra_ratio), [])
            
            print(
                f"-- 参数: {alg} 算法, 额外比特比例 {extra_ratio}, 量子比特数量 {qubit_num} --"
            )
            
            # 计算平均值
            if valid_results:
                avg_start_depth = sum(r[0] for r in valid_results) / len(valid_results)
                avg_start_width = sum(r[1] for r in valid_results) / len(valid_results)
                avg_qr_map_depth = sum(r[2] for r in valid_results) / len(valid_results)
                avg_qr_map_width = sum(r[3] for r in valid_results) / len(valid_results)
                avg_qr_move_depth = sum(r[4] for r in valid_results) / len(
                    valid_results
                )
                avg_qr_move_width = sum(r[5] for r in valid_results) / len(
                    valid_results
                )

                print(
                    f"平均值: {avg_start_depth}, {avg_start_width}, {avg_qr_map_depth}, {avg_qr_map_width}, {avg_qr_move_depth}, {avg_qr_move_width}"
                )
                print(f"有效实验次数: {len(valid_results)}")
            else:
                print("所有实验都失败了，无法计算平均值")

experiment_var_extra_ratio_alg()

In [None]:
def experiment_var_qubit_num():
    from concurrent.futures import ThreadPoolExecutor, as_completed
    
    # 存储每个参数组合的任务
    all_tasks = []
    
    # 创建所有参数组合
    qubit_nums = [10, 20, 30, 40, 50]
    algorithms = ["bv", "vqa", "qaoa", "qft", "mul", "rd", "xor"]
    
    for qubit_num in qubit_nums:
        for alg in algorithms:
            MRP_CP_ratio = 10
            extra_qubit_ratio = 0.15
            all_tasks.append((alg, qubit_num, MRP_CP_ratio, extra_qubit_ratio))
    
    # 为每个参数组合执行50次实验
    def run_experiments_for_params(alg, qubit_num, MRP_CP_ratio, extra_qubit_ratio):
        results = []
        for _ in range(50):
            (
                start_depth,
                start_width,
                qr_map_depth,
                qr_map_width,
                qr_move_depth,
                qr_move_width,
            ) = experiment(alg, qubit_num, extra_qubit_ratio * qubit_num, MRP_CP_ratio)
            
            # 如果实验成功（不是全零结果），则添加到结果列表中
            if not (
                start_depth == 0
                and start_width == 0
                and qr_map_depth == 0
                and qr_map_width == 0
                and qr_move_depth == 0
                and qr_move_width == 0
            ):
                results.append(
                    (
                        start_depth,
                        start_width,
                        qr_map_depth,
                        qr_map_width,
                        qr_move_depth,
                        qr_move_width,
                    )
                )
        return alg, qubit_num, MRP_CP_ratio, extra_qubit_ratio, results
    
    # 使用线程池并发执行所有实验
    results_dict = {}
    with ThreadPoolExecutor(max_workers=96) as executor:
        # 提交所有任务
        future_to_params = {
            executor.submit(run_experiments_for_params, alg, qubit_num, MRP_CP_ratio, extra_qubit_ratio): 
            (alg, qubit_num, MRP_CP_ratio, extra_qubit_ratio)
            for alg, qubit_num, MRP_CP_ratio, extra_qubit_ratio in all_tasks
        }
        
        # 收集结果
        for future in as_completed(future_to_params):
            alg, qubit_num, MRP_CP_ratio, extra_qubit_ratio, valid_results = future.result()
            results_dict[(alg, qubit_num)] = valid_results
    
    # 处理和打印结果
    for qubit_num in qubit_nums:
        for alg in algorithms:
            MRP_CP_ratio = 10
            extra_qubit_ratio = 0.15
            valid_results = results_dict.get((alg, qubit_num), [])
            
            print(
                f"-- 参数: {alg} 算法, 额外比特比例 {extra_qubit_ratio}, 量子比特数量 {qubit_num} --"
            )
            
            # 计算平均值
            if valid_results:
                avg_start_depth = sum(r[0] for r in valid_results) / len(valid_results)
                avg_start_width = sum(r[1] for r in valid_results) / len(valid_results)
                avg_qr_map_depth = sum(r[2] for r in valid_results) / len(valid_results)
                avg_qr_map_width = sum(r[3] for r in valid_results) / len(valid_results)
                avg_qr_move_depth = sum(r[4] for r in valid_results) / len(valid_results)
                avg_qr_move_width = sum(r[5] for r in valid_results) / len(valid_results)

                print(
                    f"平均值: {avg_start_depth}, {avg_start_width}, {avg_qr_map_depth}, {avg_qr_map_width}, {avg_qr_move_depth}, {avg_qr_move_width}"
                )
                print(f"有效实验次数: {len(valid_results)}")
            else:
                print("所有实验都失败了，无法计算平均值")

In [None]:
def experiment_var_mrp_ratio():
    from concurrent.futures import ThreadPoolExecutor, as_completed

    # 存储每个参数组合的任务
    all_tasks = []

    # 创建所有参数组合
    mrp_ratios = [5, 10, 15, 20, 25]
    for MRP_CP_ratio in mrp_ratios:
        all_tasks.append(("bv", 50, MRP_CP_ratio, 0.15 * 50))

    # 为每个参数组合执行50次实验
    def run_experiments_for_params(alg, qubit_num, MRP_CP_ratio, extra_qubit_num):
        results = []
        for _ in range(50):
            (
                start_depth,
                start_width,
                qr_map_depth,
                qr_map_width,
                qr_move_depth,
                qr_move_width,
            ) = experiment(alg, qubit_num, extra_qubit_num, MRP_CP_ratio)

            # 如果实验成功（不是全零结果），则添加到结果列表中
            if not (
                start_depth == 0
                and start_width == 0
                and qr_map_depth == 0
                and qr_map_width == 0
                and qr_move_depth == 0
                and qr_move_width == 0
            ):
                results.append(
                    (
                        start_depth,
                        start_width,
                        qr_map_depth,
                        qr_map_width,
                        qr_move_depth,
                        qr_move_width,
                    )
                )
        return alg, qubit_num, MRP_CP_ratio, extra_qubit_num, results

    # 使用线程池并发执行所有实验
    results_dict = {}
    with ThreadPoolExecutor(max_workers=96) as executor:
        # 提交所有任务
        future_to_params = {
            executor.submit(
                run_experiments_for_params,
                alg,
                qubit_num,
                MRP_CP_ratio,
                extra_qubit_num,
            ): (alg, qubit_num, MRP_CP_ratio, extra_qubit_num)
            for alg, qubit_num, MRP_CP_ratio, extra_qubit_num in all_tasks
        }

        # 收集结果
        for future in as_completed(future_to_params):
            alg, qubit_num, MRP_CP_ratio, extra_qubit_num, valid_results = (
                future.result()
            )
            results_dict[MRP_CP_ratio] = valid_results

    # 处理和打印结果
    for MRP_CP_ratio in mrp_ratios:
        valid_results = results_dict.get(MRP_CP_ratio, [])

        print(f"-- 参数: bv 算法, 量子比特数量 50, MRP/CP 比例 {MRP_CP_ratio} --")

        # 计算平均值
        if valid_results:
            avg_start_depth = sum(r[0] for r in valid_results) / len(valid_results)
            avg_start_width = sum(r[1] for r in valid_results) / len(valid_results)
            avg_qr_map_depth = sum(r[2] for r in valid_results) / len(valid_results)
            avg_qr_map_width = sum(r[3] for r in valid_results) / len(valid_results)
            avg_qr_move_depth = sum(r[4] for r in valid_results) / len(valid_results)
            avg_qr_move_width = sum(r[5] for r in valid_results) / len(valid_results)

            print(
                f"平均值: {avg_start_depth}, {avg_start_width}, {avg_qr_map_depth}, {avg_qr_map_width}, {avg_qr_move_depth}, {avg_qr_move_width}"
            )
            print(f"有效实验次数: {len(valid_results)}")
        else:
            print("所有实验都失败了，无法计算平均值")


experiment_var_mrp_ratio()

In [None]:
def experiment_var_width_depth():
    from concurrent.futures import ThreadPoolExecutor, as_completed
    
    # 存储每个参数组合的任务
    all_tasks = []
    
    # 创建所有参数组合
    qubit_num = 30
    MRP_CP_ratio = 10
    alg = "bv"
    for extra_qubit in range(30):
        all_tasks.append((alg, qubit_num, extra_qubit, MRP_CP_ratio))
    
    # 为每个参数组合执行50次实验
    def run_experiments_for_params(alg, qubit_num, extra_qubit, MRP_CP_ratio):
        results = []
        for _ in range(50):
            (
                start_depth,
                start_width,
                qr_map_depth,
                qr_map_width,
                qr_move_depth,
                qr_move_width,
            ) = experiment(alg, qubit_num, extra_qubit, MRP_CP_ratio)
            
            # 如果实验成功（不是全零结果），则添加到结果列表中
            if not (
                start_depth == 0
                and start_width == 0
                and qr_map_depth == 0
                and qr_map_width == 0
                and qr_move_depth == 0
                and qr_move_width == 0
            ):
                results.append(
                    (
                        start_depth,
                        start_width,
                        qr_map_depth,
                        qr_map_width,
                        qr_move_depth,
                        qr_move_width,
                    )
                )
        return alg, qubit_num, extra_qubit, MRP_CP_ratio, results
    
    # 使用线程池并发执行所有实验
    results_dict = {}
    with ThreadPoolExecutor(max_workers=96) as executor:
        # 提交所有任务
        future_to_params = {
            executor.submit(run_experiments_for_params, alg, qubit_num, extra_qubit, MRP_CP_ratio): 
            (alg, qubit_num, extra_qubit, MRP_CP_ratio)
            for alg, qubit_num, extra_qubit, MRP_CP_ratio in all_tasks
        }
        
        # 收集结果
        for future in as_completed(future_to_params):
            alg, qubit_num, extra_qubit, MRP_CP_ratio, valid_results = future.result()
            results_dict[extra_qubit] = valid_results
    
    # 处理和打印结果
    for extra_qubit in range(30):
        valid_results = results_dict.get(extra_qubit, [])
        
        print(
            f"-- 参数: {alg} 算法, 量子比特数量 {qubit_num}, 额外量子比特数 {extra_qubit}, MRP/CP 比例 {MRP_CP_ratio} --"
        )
        
        # 计算平均值
        if valid_results:
            avg_start_depth = sum(r[0] for r in valid_results) / len(valid_results)
            avg_start_width = sum(r[1] for r in valid_results) / len(valid_results)
            avg_qr_map_depth = sum(r[2] for r in valid_results) / len(valid_results)
            avg_qr_map_width = sum(r[3] for r in valid_results) / len(valid_results)
            avg_qr_move_depth = sum(r[4] for r in valid_results) / len(valid_results)
            avg_qr_move_width = sum(r[5] for r in valid_results) / len(valid_results)

            print(
                f"平均值: {avg_start_depth}, {avg_start_width}, {avg_qr_map_depth}, {avg_qr_map_width}, {avg_qr_move_depth}, {avg_qr_move_width}"
            )
            print(f"有效实验次数: {len(valid_results)}")
        else:
            print("所有实验都失败了，无法计算平均值")

experiment_var_width_depth()