In [1]:
# State

In [37]:
import pickle
import time
from typing import List, Union
import datetime as dt
from pathlib import Path
import copy
from collections import namedtuple

import numpy as np
import pandas as pd

from quara.data_analysis import report
from quara.data_analysis.projected_gradient_descent_backtracking import (
    ProjectedGradientDescentBacktracking,
    ProjectedGradientDescentBacktrackingOption,
)
from quara.data_analysis.weighted_probability_based_squared_error import (
    WeightedProbabilityBasedSquaredError,
    WeightedProbabilityBasedSquaredErrorOption,
)
from quara.data_analysis.weighted_relative_entropy import (
    WeightedRelativeEntropy,
    WeightedRelativeEntropyOption,
)
from quara.objects.composite_system import CompositeSystem
from quara.objects.elemental_system import ElementalSystem
from quara.objects.matrix_basis import get_normalized_pauli_basis
from quara.objects.povm import (
    Povm,
    get_x_povm,
    get_y_povm,
    get_z_povm,
)
from quara.objects.gate import Gate
from quara.objects.qoperation import QOperation
from quara.objects.state import State, get_z0_1q, get_z1_1q, get_x0_1q
from quara.protocol.qtomography.standard.standard_qst import StandardQst
from quara.protocol.qtomography.standard.linear_estimator import LinearEstimator
from quara.protocol.qtomography.standard.loss_minimization_estimator import (
    LossMinimizationEstimator,
)
from quara.protocol.qtomography.standard.projected_linear_estimator import (
    ProjectedLinearEstimator,
)

from quara.data_analysis.simulation import StandardQTomographySimulationSetting
from quara.data_analysis.generation_setting import (
    QOperationGenerationSetting,
    DepolarizedQOperationGenerationSetting,
)
from quara.data_analysis.simulation_check import StandardQTomographySimulationCheck

from quara.data_analysis.random_effective_lindbladian_generation_setting import (
    RandomEffectiveLindbladianGenerationSetting,
)

from quara.data_analysis import data_analysis
import quara.objects.qoperation_typical as qt

In [38]:
%reload_ext autoreload
%autoreload 2

# Object

In [39]:
import dataclasses

@dataclasses.dataclass
class NoiseSetting:
    qoperation_base: Union[QOperation, str]
    method: str
    para: bool

@dataclasses.dataclass
class TestSetting:
    true_object: NoiseSetting
    tester_objects: List[NoiseSetting]
    seed: int
    n_rep: int
    num_data: List[int]
    n_sample: int
    schedules: Union[str, List[List[int]]]
    case_names: List[str]
    estimators: list
    algo_list: List[tuple]
    loss_list: List[tuple]
    parametrizations: List[bool]
    c_sys: CompositeSystem

@dataclasses.dataclass
class Result:
    result_index: int
    simulation_setting: StandardQTomographySimulationSetting
    estimation_results: List["EstimationResult"]
    check_result: dict

@dataclasses.dataclass
class GenerationSettings:
    true_setting: QOperationGenerationSetting
    tester_settings: List[QOperationGenerationSetting]

# Settings

In [4]:
e_sys = ElementalSystem(0, get_normalized_pauli_basis())
c_sys = CompositeSystem([e_sys])

In [5]:
# シミュレーションに必要な設定を作る
case_names = [
    "Linear (True)",
    "Linear (False)",
    "ProjectedLinear (True)",
    "ProjectedLinear (False)",
    "Maximum-Likelihood (True)",
    "Maximum-Likelihood (False)",
    "Least Squares (True)",
    "Least Squares (False)",
]

parametrizations = [True, False, True, False, True, False, True, False]

estimators = [
    LinearEstimator(),
    LinearEstimator(),
    ProjectedLinearEstimator(),
    ProjectedLinearEstimator(),
    LossMinimizationEstimator(),
    LossMinimizationEstimator(),
    LossMinimizationEstimator(),
    LossMinimizationEstimator(),
]

loss_list = [
    (None, None),
    (None, None),
    (None, None),
    (None, None),
    (WeightedRelativeEntropy(4), WeightedRelativeEntropyOption("identity")),
    (WeightedRelativeEntropy(8), WeightedRelativeEntropyOption("identity")),
    (
        WeightedProbabilityBasedSquaredError(4),
        WeightedProbabilityBasedSquaredErrorOption("identity"),
    ),
    (
        WeightedProbabilityBasedSquaredError(8),
        WeightedProbabilityBasedSquaredErrorOption("identity"),
    ),
]


def generate_pgdb_algo_option():
    return ProjectedGradientDescentBacktrackingOption(
        mode_stopping_criterion_gradient_descent="sum_absolute_difference_variable",
        num_history_stopping_criterion_gradient_descent=1,
    )


algo_list = [
    (None, None),
    (None, None),
    (None, None),
    (None, None),
    (ProjectedGradientDescentBacktracking(), generate_pgdb_algo_option()),
    (ProjectedGradientDescentBacktracking(), generate_pgdb_algo_option()),
    (ProjectedGradientDescentBacktracking(), generate_pgdb_algo_option()),
    (ProjectedGradientDescentBacktracking(), generate_pgdb_algo_option()),
]

In [27]:
# Name of True Object
true_object_noise_setting = NoiseSetting(
    qoperation_base=("povm", "z"),
    method="random_effective_lindbladian",
    para={
        "lindbladian_base": "identity",
        "strength_h_part": 0.1,
        "strength_k_part": 0.1,
    },
)

# true_object_noise_setting = NoiseSetting(
#   qoperation_base=("povm", "z"), method="depolarized", para={"error_rate": 0.1,},
#)

# Name of Tester Object
tester_names = [("state", name) for name in ["x0", "y0", "z0", "z1"]]

tester_object_noise_settings = [
    NoiseSetting(
        qoperation_base=name,
        method="random_effective_lindbladian",
        para={
            "lindbladian_base": "identity",
            "strength_h_part": 0.1,
            "strength_k_part": 0.1,
        },
    )
    for name in tester_names
]

# tester_object_noise_settings = [
#   NoiseSetting(qoperation_base=name, method="depolarized", para={"error_rate": 0.1,},)
#  for name in tester_names
#]

# Test Setting
test_setting_0 = TestSetting(
    true_object=true_object_noise_setting,
    tester_objects=tester_object_noise_settings,
    seed=777,
    n_sample=2,
    n_rep=10,
    num_data=[10, 100],
    schedules="all",  # 全ての組み合わせ
    case_names=case_names,
    estimators=estimators,
    algo_list=algo_list,
    loss_list=loss_list,
    parametrizations=parametrizations,
    c_sys=c_sys,
)

# TODO: 複数のtest_settingをtest_settingsに格納する
# 時間短縮のため、今は1つだけ格納する
test_settings = [test_setting_0]

# Functions

In [28]:
# 1つのtest_settingから、QOperationGenerationSettingのリストを作る
def to_generation_setting(noise_setting: NoiseSetting, c_sys: CompositeSystem) -> "GenerationSetting":
    # TODO: 他に良い方法がないか検討
    name2class_map = {
        "depolarized": DepolarizedQOperationGenerationSetting,
        "random_effective_lindbladian": RandomEffectiveLindbladianGenerationSetting,
    }

    if noise_setting.method in name2class_map:
        target_class = name2class_map[noise_setting.method]
    else:
        message = f"noise_setting.method='{noise_setting.method}' is not implemented."
        raise NotImplementedError(message)
    return target_class(
        qoperation_base=noise_setting.qoperation_base,
        c_sys=c_sys,
        **noise_setting.para,
    )


def to_generation_settings(test_setting: TestSetting) -> GenerationSettings:
    true_setting = to_generation_setting(test_setting.true_object, test_setting.c_sys)
    tester_settings = [
        to_generation_setting(setting, test_setting.c_sys) for setting in test_setting.tester_objects
    ]
    generation_settings = GenerationSettings(
        true_setting=true_setting, tester_settings=tester_settings
    )
    return generation_settings


def to_simulation_setting(
    test_setting: TestSetting,
    true_object: "QOperation",
    tester_objects: List["QOperation"],
    case_index: int,
) -> StandardQTomographySimulationSetting:
    return StandardQTomographySimulationSetting(
        name=test_setting.case_names[case_index],
        estimator=test_setting.estimators[case_index],
        loss=test_setting.loss_list[case_index][0],
        loss_option=test_setting.loss_list[case_index][1],
        algo=test_setting.algo_list[case_index][0],
        algo_option=test_setting.algo_list[case_index][1],
        true_object=true_object,
        tester_objects=tester_objects,
        n_rep=test_setting.n_rep,
        seed=test_setting.seed,
        num_data=test_setting.num_data,
        schedules=test_setting.schedules,
    )

In [29]:
from quara.protocol.qtomography.standard.standard_qst import StandardQst
from quara.protocol.qtomography.standard.standard_qpt import StandardQpt
from quara.protocol.qtomography.standard.standard_povmt import StandardPovmt


def generate_qtomography(
    sim_setting: StandardQTomographySimulationSetting,
    para: bool,
    eps_proj_physical: float,
) -> "StandardQTomography":
    # TrueObjectに応じて、適切なQTomographyを生成する
    true_object = sim_setting.true_object
    tester_objects = sim_setting.tester_objects
    seed = sim_setting.seed

    if type(true_object) == State:
        return StandardQst(
            tester_objects,
            on_para_eq_constraint=para,
            seed=seed,
            eps_proj_physical=eps_proj_physical,
        )
    if type(true_object) == Povm:
        return StandardPovmt(
            tester_objects,
            on_para_eq_constraint=para,
            seed=seed,
            eps_proj_physical=eps_proj_physical,
            num_outcomes=len(true_object.vecs)
        )
    if type(true_object) == Gate:
        states = [t for t in tester_objects if type(t) == State]
        povms = [t for t in tester_objects if type(t) == Povm]

        return StandardQpt(
            states=states,
            povms=povms,
            on_para_eq_constraint=para,
            seed=seed,
            eps_proj_physical=eps_proj_physical,
        )
    message = f"type of sim_setting.true_object must be State, Povm, or Gate, not {type(true_object)}"
    print(f"{sim_setting.true_object}")
    raise TypeError(message)

In [30]:
def result2dict(result: Result) -> dict:
    result_dict = dict(
        test_setting_index=result.result_index["test_setting_index"],
        sample_index=result.result_index["sample_index"],
        case_index=result.result_index["case_index"],
        name=result.simulation_setting.name,
        total_result=result.check_result["total_result"],
    )

    def _make_warning_text(r):
        possibly_ok = r["detail"]["possibly_ok"]
        to_be_checked = r["detail"]["to_be_checked"]
        warning_text = ""
        if not possibly_ok:
            warning_text = (
                f"Consistency: possibly_ok={possibly_ok}, to_be_checked={to_be_checked}"
            )
        return warning_text

    check_result = {}
    warning_text = ""

    for r in result.check_result["results"]:
        check_result[r["name"]] = r["result"]
        if r["name"] == "Consistency":
            warning_text += _make_warning_text(r)

    result_dict.update(check_result)
    result_dict["warning"] = warning_text

    return result_dict

In [31]:
def write_results(results: List[Result], dir_path: str) -> None:
    dir_path = Path(dir_path)
    dir_path.mkdir(parents=True, exist_ok=True)
    path = dir_path / "check_result.csv"

    result_dict_list = [result2dict(r) for r in results]
    sample_result_df = pd.DataFrame(result_dict_list)
    sample_result_df.to_csv(path, index=None)

    print(f"Completed to write csv. {path}")


def write_result_case_unit(result: namedtuple, root_dir: str) -> None:
    test_setting_index = result.result_index["test_setting_index"]
    sample_index = result.result_index["sample_index"]
    case_index = result.result_index["case_index"]

    # Save all
    dir_path = Path(f"{root_dir}/{test_setting_index}/{sample_index}")
    dir_path.mkdir(parents=True, exist_ok=True)
    path = dir_path / f"case_{case_index}_result.pickle"
    with open(path, "wb") as f:
        pickle.dump(result, f)

    check_result = result.check_result
    path = dir_path / f"case_{case_index}_check_result.json"
    with open(path, "w") as f:
        json.dump(check_result, f, ensure_ascii=False, indent=4, separators=(",", ": "))


def write_result_sample_unit(results: List[namedtuple], root_dir: str) -> None:
    test_setting_index = results[0].result_index["test_setting_index"]
    sample_index = results[0].result_index["sample_index"]
    dir_path = Path(root_dir) / str(test_setting_index) / str(sample_index)

    write_results(results, dir_path)


def write_result_test_setting_unit(results: List[namedtuple], root_dir: str) -> None:
    test_setting_index = results[0].result_index["test_setting_index"]
    dir_path = Path(root_dir) / str(test_setting_index)

    write_results(results, dir_path)


def write_pdf_report(results: List[Result], root_dir: str) -> None:
    test_setting_index = results[0].result_index["test_setting_index"]
    sample_index = results[0].result_index["sample_index"]

    dir_path = Path(root_dir) / str(test_setting_index) / str(sample_index)
    dir_path.mkdir(parents=True, exist_ok=True)
    path = dir_path / f"{test_setting_index}_{sample_index}_quara_report.pdf"

    estimation_results_list = [r.estimation_results for r in results]
    sim_settings = [r.simulation_setting for r in results]

    report.export_report(path, estimation_results_list, sim_settings)

In [32]:
def execute_simulation_case_unit(
    test_setting,
    true_object,
    tester_objects,
    case_index: int,
    sample_index: int,
    test_setting_index: int,
    root_dir: str,
) -> Result:
    # Generate QTomographySimulationSetting
    sim_setting = to_simulation_setting(
        test_setting, true_object, tester_objects, case_index
    )
    print(f"Case {case_index}: {sim_setting.name}")

    def _copy_sim_setting(source):
        return StandardQTomographySimulationSetting(
            name=source.name,
            true_object=source.true_object,
            tester_objects=source.tester_objects,
            estimator=source.estimator,
            loss=copy.deepcopy(source.loss),
            loss_option=source.loss_option,
            algo=copy.deepcopy(source.algo),
            algo_option=source.algo_option,
            seed=source.seed,
            n_rep=source.n_rep,
            num_data=source.num_data,
            schedules=source.schedules,
        )

    org_sim_setting = _copy_sim_setting(sim_setting)

    # QTomographyの作成
    qtomography = generate_qtomography(
        sim_setting,
        para=test_setting.parametrizations[case_index],
        eps_proj_physical=1e-13,
    )

    # 実行
    estimation_results = data_analysis.execute_simulation(
        qtomography=qtomography, simulation_setting=sim_setting
    )

    # 自動テスト
    sim_check = StandardQTomographySimulationCheck(sim_setting, estimation_results)
    check_result = sim_check.execute_all(show_detail=False, with_detail=True)

    # 結果の表示
    start_red = "\033[31m"
    end_color = "\033[0m"
    print(f"Total Result: {start_red}NG{end_color}")

    result_index = dict(
        test_setting_index=test_setting_index,
        sample_index=sample_index,
        case_index=case_index,
    )

    # 結果の格納
    result = Result(
        result_index=result_index,
        simulation_setting=org_sim_setting,
        estimation_results=estimation_results,
        check_result=check_result,
    )
    # Save
    write_result_case_unit(result, root_dir=root_dir)
    return result

In [33]:
def execute_simulation_sample_unit(
    test_setting,
    generation_settings,
    test_setting_index,
    sample_index,
    root_dir,
    pdf_mode: str = "only_ng",
) -> List[Result]:
    # sampleの生成
    true_object = generation_settings.true_setting.generate()
    true_object = true_object[0] if type(true_object) == tuple else true_object
    tester_objects = [
        tester_setting.generate() 
        for tester_setting in generation_settings.tester_settings
    ]
    tester_objects = [tester[0] if type(tester) == tuple else tester for tester in tester_objects]
    
    # TODO: remove codes for debugging
    # ランダムに生成されたコードがフィジカルかどうかを確認する
    # print(f"true_object = {true_object.is_()}")
    # print(true_object)
    # print(f"tester_object")
    # for t in tester_objects:
    #    print("=================")
    #    print(t.is_physical())
    #    print(t)

    results = []
    case_n = len(test_setting.case_names)

    for case_index in range(case_n):
        result = execute_simulation_case_unit(
            test_setting,
            true_object=true_object,
            tester_objects=tester_objects,
            case_index=case_index,
            sample_index=sample_index,
            test_setting_index=test_setting_index,
            root_dir=root_dir,
        )
        results.append(result)

    # Save
    write_result_sample_unit(results, root_dir=root_dir)

    # Save PDF
    if pdf_mode == "all":
        write_pdf_report(results, root_dir)
    elif pdf_mode == "only_ng":
        total_results = [r.check_result["total_result"] for r in results]
        print(f"total_result={np.all(total_results)}")
        if not np.all(total_results):
            write_pdf_report(results, root_dir)
    elif pdf_mode == "none":
        pass
    else:
        message = "`pdf_mode` must be 'all', 'only_ng', or 'none'."
        raise ValueError(message)

    return results


def execute_simulation_test_setting_unit(
    test_setting, dir_path, pdf_mode: str = "only_ng"
) -> List[Result]:
    generation_settings = to_generation_settings(test_setting)
    n_sample = test_setting.n_sample
    results = []

    for sample_index in range(n_sample):
        sample_results = execute_simulation_sample_unit(
            test_setting,
            generation_settings,
            test_setting_index,
            sample_index,
            root_dir,
            pdf_mode=pdf_mode,
        )
        results += sample_results

    # 保存（test_index単位）
    write_result_test_setting_unit(results, dir_path)
    return results

In [34]:
def print_summary(results: List[Result], elapsed_time: float) -> None:
    result_dict_list = [result2dict(r) for r in results]
    result_df = pd.DataFrame(result_dict_list)
    case_n = result_df.shape[0]
    ok_n = result_df[result_df["total_result"]].shape[0]
    ng_n = result_df[~result_df["total_result"]].shape[0]
    warning_n = result_df[
        result_df["total_result"] & result_df["warning"].isnull()
    ].shape[0]

    def _to_h_m_s(sec) -> tuple:
        m, s = divmod(int(sec), 60)
        h, m = divmod(m, 60)
        return h, m, s

    h, m, s = _to_h_m_s(elapsed_time)
    time_text = "{:.1f}s ".format(elapsed_time)
    time_text += f"({h}:{str(m).zfill(2)}:{str(s).zfill(2)})"

    start_red = "\033[31m"
    start_green = "\033[32m"
    start_yellow = "\033[33m"
    end_color = "\033[0m"

    summary_text = f"{start_yellow}=============={end_color} "
    summary_text += f"{start_green}OK: {ok_n} cases{end_color} ({start_yellow}{warning_n} warnings{end_color}), "
    summary_text += f"{start_red}NG: {ng_n} cases{end_color} "
    summary_text += f"{start_yellow} in {time_text}s=============={end_color}"

    print(summary_text)

# Execute

In [35]:
root_dir = "result_randomREL_povm"
all_results = []
start = time.time()

for test_setting_index, test_setting in enumerate(test_settings):
    test_results = execute_simulation_test_setting_unit(
        test_setting, root_dir, pdf_mode="only_ng"
    )
    all_results += test_results

# Save
write_results(all_results, root_dir)

elapsed_time = time.time() - start
print_summary(all_results, elapsed_time)

  0%|          | 0/10 [00:00<?, ?it/s]

Case 0: Linear (True)





ValueError: the sum of probabilities must equal 1. the sum of probabilities is 1.063337042386149

In [50]:
start = time.time()
time.sleep(3)
elapsed_time = time.time() - start
print_summary(all_results, elapsed_time)

KeyError: 'total_result'