# 量子OXゲームソルバ

参考文献:
https://arxiv.org/abs/2503.21514

元実装:
https://github.com/mofukuru/OX_experiment.git


In [None]:
# gpu使用時にエラーが起きた場合は、pip uninstall qiskit-aer-gpu -yを実行してからpip install qiskit-aer-gpuを実行する
! pip install numpy torch tqdm qiskit qiskit-machine-learning qiskit-aer qiskit-ibm-runtime qiskit-algorithms qiskit-aer-gpu ipywidgets


Collecting qiskit
  Downloading qiskit-2.0.1-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (12 kB)
Collecting qiskit-machine-learning
  Downloading qiskit_machine_learning-0.8.2-py3-none-any.whl.metadata (13 kB)
Collecting qiskit-aer
  Downloading qiskit_aer-0.17.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (8.2 kB)
Collecting qiskit-ibm-runtime
  Downloading qiskit_ibm_runtime-0.39.0-py3-none-any.whl.metadata (21 kB)
Collecting qiskit-algorithms
  Downloading qiskit_algorithms-0.3.1-py3-none-any.whl.metadata (4.2 kB)
Collecting qiskit-aer-gpu
  Downloading qiskit_aer_gpu-0.15.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (8.3 kB)
Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.

In [None]:
import os
import tqdm
import time
import random
import numpy as np
from copy import deepcopy
import ipywidgets as widgets
from IPython.display import display, clear_output

import torch
from torch import nn, optim
from torch.nn import functional as F

from qiskit import QuantumCircuit
from qiskit.circuit import ParameterVector
from qiskit.circuit.library import RealAmplitudes, EfficientSU2, ZFeatureMap, ZZFeatureMap, PauliFeatureMap
from qiskit.quantum_info import SparsePauliOp
from qiskit_aer import AerSimulator
from qiskit_ibm_runtime import Estimator, Sampler, QiskitRuntimeService
from qiskit.transpiler.preset_passmanagers import generate_preset_pass_manager


## qiskit machine learningがqiskitの最新バージョンに対応していないため疑似的に作成

In [None]:
# qiskit machine learningはqiskit ver2にまだ対応していないため、疑似的に作成

import numpy as np
import torch
import torch.nn as nn
from torch.autograd import Function
from numbers import Integral
from typing import cast

from qiskit import QuantumCircuit
from qiskit.circuit import Parameter, ParameterVector
from qiskit.quantum_info import SparsePauliOp
from qiskit.result import QuasiDistribution
from qiskit_aer import AerSimulator
from qiskit_aer.primitives import SamplerV2 as Sampler, EstimatorV2 as Estimator

# Module-level Aer simulator created with default options.
backend = AerSimulator()

# --- ユーティリティ関数 ---
def get_default_interpret(num_qubits):
    """
    Build the default ``interpret`` function used by ``ScratchSamplerQNN``.

    The returned callable turns the measurement counts of every pub in a
    sampler result into the probability of each computational-basis state,
    so the QNN output is the full ``2**num_qubits`` probability vector.

    Args:
        num_qubits (int): Number of measured qubits.

    Returns:
        Callable: ``interpret(primitive_result) -> np.ndarray`` with shape
        ``(batch_size, 2**num_qubits)``.
    """
    output_len = 2**num_qubits

    def interpret(primitive_result):
        """
        Convert per-pub counts into one probability row per pub.

        Args:
            primitive_result: Sized iterable of pub results. Each entry must
                expose ``data.meas.get_counts()`` returning a mapping from
                bitstring (e.g. ``"011"``) to the number of shots.

        Returns:
            np.ndarray: Array of shape ``(len(primitive_result), 2**num_qubits)``
            whose column ``k`` holds the observed frequency of basis state ``k``.
        """
        outputs = np.zeros((len(primitive_result), output_len))

        for row, pub_result in enumerate(primitive_result):
            counts = pub_result.data.meas.get_counts()
            total_shots = sum(counts.values())
            if total_shots == 0:
                # Nothing was sampled: keep the row at zero rather than divide by zero.
                continue

            for bitstring, count in counts.items():
                state_index = int(bitstring, 2)
                # Ignore outcomes that do not fit the requested output width.
                if state_index < output_len:
                    outputs[row, state_index] += count / total_shots

        return outputs

    return interpret



# --- ScratchEstimatorQNN ---
class ScratchEstimatorQNN:
    """
    Minimal estimator-based QNN built on an EstimatorV2-style primitive.

    The network output is the expectation value of every observable for the
    circuit bound to ``[inputs, weights]``. Jacobians are computed with the
    parameter-shift rule (shift of pi/2, difference scaled by 0.5).

    The circuit is used exactly as given: no measurements are appended, since
    expectation values are evaluated on the unmeasured state, and the caller's
    circuit is never modified.
    """

    def __init__(self, circuit: "QuantumCircuit", observables: "list[SparsePauliOp]",
                 input_params: "list[Parameter] | None" = None,
                 weight_params: "list[Parameter] | None" = None,
                 estimator: "Estimator | None" = None):
        """
        Args:
            circuit: Parameterized quantum circuit (without measurements).
            observables: Observables to evaluate; a single observable is
                wrapped in a list.
            input_params: Circuit parameters that receive the input data.
            weight_params: Circuit parameters that receive trainable weights.
            estimator: Estimator primitive; a default ``Estimator()`` is
                created when omitted.
        """
        self.circuit = circuit
        self.observables = observables if isinstance(observables, list) else [observables]
        # list(...) accepts any iterable of parameters (e.g. a ParameterVector).
        self.input_params = list(input_params) if input_params is not None else []
        self.weight_params = list(weight_params) if weight_params is not None else []
        self.estimator = estimator or Estimator()

        # Canonical ordering of bound values: inputs first, then weights.
        self._param_order = self.input_params + self.weight_params

    def _bind_parameters(self, inputs: "np.ndarray | None" = None,
                         weights: "np.ndarray | None" = None):
        """
        Expand inputs and weights into one row of parameter values per sample.

        The batch size is defined by ``inputs`` (1 when ``inputs`` is ``None``
        or one-dimensional); a single weight vector is shared by the batch.

        Args:
            inputs: ``(batch_size, num_input_params)`` or ``(num_input_params,)``.
            weights: ``(num_weight_params,)``, ``(1, num_weight_params)`` or
                ``(batch_size, num_weight_params)``.

        Returns:
            list[list[float]]: One row per sample, ordered like ``self._param_order``.

        Raises:
            ValueError: If a shape is inconsistent with the batch size or with
                the number of input/weight parameters.
        """
        if inputs is None:
            inputs_batch = np.empty((1, 0))
        else:
            inputs_batch = np.asarray(inputs)
            if inputs_batch.ndim == 1:
                inputs_batch = inputs_batch.reshape(1, -1)
            if inputs_batch.ndim != 2:
                raise ValueError(f"入力の形状が無効です: {inputs_batch.shape}")
        batch_size = inputs_batch.shape[0]

        if weights is None:
            weights_batch = np.empty((batch_size, 0))
        else:
            weights_arr = np.asarray(weights)
            shared_row = weights_arr.ndim == 2 and weights_arr.shape[0] == 1 and batch_size > 1
            if weights_arr.ndim == 1 or shared_row:
                # One weight set shared by every sample of the batch.
                weights_batch = np.tile(weights_arr, (batch_size, 1))
            elif weights_arr.ndim == 2 and weights_arr.shape[0] == batch_size:
                weights_batch = weights_arr
            else:
                raise ValueError(f"重みの形状が無効です: {weights_arr.shape}")

        if inputs_batch.shape[1] != len(self.input_params):
            raise ValueError(f"入力データの次元 ({inputs_batch.shape[1]}) が入力パラメータ数 ({len(self.input_params)}) と一致しません。")
        if weights_batch.shape[1] != len(self.weight_params):
            raise ValueError(f"重みの次元 ({weights_batch.shape[1]}) が重みパラメータ数 ({len(self.weight_params)}) と一致しません。")

        # Columns follow self._param_order: input values first, then weights.
        return np.concatenate([inputs_batch, weights_batch], axis=1).astype(float).tolist()

    def _as_binding(self, values):
        """Map one row of values onto the parameter objects of ``self._param_order``."""
        return dict(zip(self._param_order, values))

    def forward(self, inputs: "np.ndarray | None" = None, weights: "np.ndarray | None" = None):
        """
        Evaluate the expectation values for every sample of the batch.

        Args:
            inputs: Input data, see ``_bind_parameters``.
            weights: Weights, see ``_bind_parameters``.

        Returns:
            np.ndarray: Expectation values, ``(batch_size, num_observables)``.
        """
        # Bind by parameter object, exactly like the gradient code does. A bare
        # list of values would be matched against the circuit's own parameter
        # ordering, which need not coincide with self._param_order.
        pubs = [
            (self.circuit, self.observables, self._as_binding(values))
            for values in self._bind_parameters(inputs, weights)
        ]
        result = self.estimator.run(pubs).result()
        return np.array([pub_result.data.evs for pub_result in result])

    def _parameter_shift_gradient_for_param(self, param_idx: int, param_values_base: "list[float]",
                                           is_weight_param: bool):
        """
        Parameter-shift derivative of every observable w.r.t. one parameter.

        Args:
            param_idx: Index inside ``self.weight_params`` (or
                ``self.input_params`` when ``is_weight_param`` is False).
            param_values_base: Current values, ordered like ``self._param_order``.
            is_weight_param: True for a weight parameter, False for an input.

        Returns:
            np.ndarray: Derivative of each observable, ``(num_observables,)``.

        Raises:
            ValueError: If the parameter is not part of ``self._param_order``.
        """
        shift = np.pi / 2

        owner = self.weight_params if is_weight_param else self.input_params
        target_param_obj = owner[param_idx]
        try:
            position = self._param_order.index(target_param_obj)
        except ValueError:
            raise ValueError(f"ターゲットパラメータ {target_param_obj} が self._param_order に見つかりません。") from None

        shifted_up = list(param_values_base)
        shifted_up[position] += shift
        shifted_down = list(param_values_base)
        shifted_down[position] -= shift

        # Both shifted evaluations are submitted as two pubs of a single job.
        job = self.estimator.run([
            (self.circuit, self.observables, self._as_binding(shifted_up)),
            (self.circuit, self.observables, self._as_binding(shifted_down)),
        ])
        result = job.result()
        exp_vals_plus = np.array(result[0].data.evs)
        exp_vals_minus = np.array(result[1].data.evs)

        return 0.5 * (exp_vals_plus - exp_vals_minus)

    def _jacobian(self, inputs, weights, wrt_weights: bool):
        """
        Jacobian of the outputs w.r.t. the weights or the inputs.

        Returns:
            np.ndarray: ``(batch_size, num_observables, num_params)``; the last
            axis is empty when there are no parameters of the requested kind.
        """
        params = self.weight_params if wrt_weights else self.input_params
        num_observables = len(self.observables)

        if not params:
            # Same batch-size convention as _bind_parameters: only a 2-D
            # `inputs` array defines a batch larger than one.
            if inputs is None or np.ndim(inputs) == 1:
                batch_size = 1
            else:
                batch_size = np.shape(inputs)[0]
            return np.zeros((batch_size, num_observables, 0))

        rows = self._bind_parameters(inputs, weights)
        jacobians = np.zeros((len(rows), num_observables, len(params)))
        for sample, values in enumerate(rows):
            for j in range(len(params)):
                jacobians[sample, :, j] = self._parameter_shift_gradient_for_param(
                    param_idx=j,
                    param_values_base=values,
                    is_weight_param=wrt_weights,
                )
        return jacobians

    def backward_weights(self, inputs: "np.ndarray | None" = None, weights: "np.ndarray | None" = None):
        """
        Jacobian d(output_j) / d(weight_i) for every sample of the batch.

        Args:
            inputs: Input data, see ``_bind_parameters``.
            weights: Weights, see ``_bind_parameters``.

        Returns:
            np.ndarray: ``(batch_size, num_observables, num_weight_params)``.
        """
        return self._jacobian(inputs, weights, wrt_weights=True)

    def backward_inputs(self, inputs: "np.ndarray | None" = None, weights: "np.ndarray | None" = None):
        """
        Jacobian d(output_j) / d(input_i) for every sample of the batch.

        Args:
            inputs: Input data, see ``_bind_parameters``.
            weights: Weights, see ``_bind_parameters``.

        Returns:
            np.ndarray: ``(batch_size, num_observables, num_input_params)``.
        """
        return self._jacobian(inputs, weights, wrt_weights=False)

# --- ScratchSamplerQNN ---
class ScratchSamplerQNN:
    """
    Minimal sampler-based QNN built on a SamplerV2-style primitive.

    The circuit is sampled for every bound parameter set and the sampler
    result is converted into the network output by ``interpret``. Jacobians
    are computed with the parameter-shift rule (shift of pi/2, difference
    scaled by 0.5).
    """

    def __init__(self, circuit: "QuantumCircuit",
                 input_params: "list[Parameter] | None" = None,
                 weight_params: "list[Parameter] | None" = None,
                 sampler: "Sampler | None" = None, interpret=None, output_shape=None,
                 shots: int = 1024):
        """
        Args:
            circuit: Parameterized quantum circuit. Measurements of all qubits
                are added to an internal copy; the caller's circuit is left
                untouched so it can be reused.
            input_params: Circuit parameters that receive the input data.
            weight_params: Circuit parameters that receive trainable weights.
            sampler: Sampler primitive; a default ``Sampler()`` is created
                when omitted.
            interpret: Callable ``(result) -> np.ndarray (batch_size, output_dim)``.
                Defaults to the probability of every bitstring.
            output_shape: Shape ``(output_dim,)`` (or a plain int) of the
                interpreted output. Required when ``interpret`` is given.
            shots: Number of shots per circuit evaluation.

        Raises:
            ValueError: If ``interpret`` is given without ``output_shape``.
        """
        if isinstance(output_shape, int):
            output_shape = (output_shape,)

        if interpret is None:
            self.interpret = get_default_interpret(circuit.num_qubits)
            # The default interpret returns a vector of length 2**num_qubits.
            self._output_dim = (2**circuit.num_qubits,) if output_shape is None else output_shape
        else:
            if output_shape is None:
                raise ValueError("interpret関数を指定する場合、output_shapeも指定する必要があります。")
            self.interpret = interpret
            self._output_dim = output_shape

        # list(...) accepts any iterable of parameters (e.g. a ParameterVector).
        self.input_params = list(input_params) if input_params is not None else []
        self.weight_params = list(weight_params) if weight_params is not None else []
        self.sampler = sampler or Sampler()
        self.shots = shots

        # Canonical ordering of bound values: inputs first, then weights.
        self._param_order = self.input_params + self.weight_params

        # inplace=False returns a measured copy instead of mutating the argument.
        self.circuit = circuit.measure_all(inplace=False)

    def _bind_parameters(self, inputs: "np.ndarray | None" = None,
                         weights: "np.ndarray | None" = None):
        """
        Expand inputs and weights into one row of parameter values per sample.

        The batch size is defined by ``inputs`` (1 when ``inputs`` is ``None``
        or one-dimensional); a single weight vector is shared by the batch.

        Args:
            inputs: ``(batch_size, num_input_params)`` or ``(num_input_params,)``.
            weights: ``(num_weight_params,)``, ``(1, num_weight_params)`` or
                ``(batch_size, num_weight_params)``.

        Returns:
            list[list[float]]: One row per sample, ordered like ``self._param_order``.

        Raises:
            ValueError: If a shape is inconsistent with the batch size or with
                the number of input/weight parameters.
        """
        if inputs is None:
            inputs_batch = np.empty((1, 0))
        else:
            inputs_batch = np.asarray(inputs)
            if inputs_batch.ndim == 1:
                inputs_batch = inputs_batch.reshape(1, -1)
            if inputs_batch.ndim != 2:
                raise ValueError(f"入力の形状が無効です: {inputs_batch.shape}")
        batch_size = inputs_batch.shape[0]

        if weights is None:
            weights_batch = np.empty((batch_size, 0))
        else:
            weights_arr = np.asarray(weights)
            shared_row = weights_arr.ndim == 2 and weights_arr.shape[0] == 1 and batch_size > 1
            if weights_arr.ndim == 1 or shared_row:
                # One weight set shared by every sample of the batch.
                weights_batch = np.tile(weights_arr, (batch_size, 1))
            elif weights_arr.ndim == 2 and weights_arr.shape[0] == batch_size:
                weights_batch = weights_arr
            else:
                raise ValueError(f"重みの形状が無効です: {weights_arr.shape}")

        if inputs_batch.shape[1] != len(self.input_params):
            raise ValueError(f"入力データの次元 ({inputs_batch.shape[1]}) が入力パラメータ数 ({len(self.input_params)}) と一致しません。")
        if weights_batch.shape[1] != len(self.weight_params):
            raise ValueError(f"重みの次元 ({weights_batch.shape[1]}) が重みパラメータ数 ({len(self.weight_params)}) と一致しません。")

        # Columns follow self._param_order: input values first, then weights.
        return np.concatenate([inputs_batch, weights_batch], axis=1).astype(float).tolist()

    def _make_pub(self, values):
        """Build one sampler pub ``(circuit, {parameter: value}, shots)``."""
        return (self.circuit, dict(zip(self._param_order, values)), self.shots)

    def forward(self, inputs: "np.ndarray | None" = None, weights: "np.ndarray | None" = None):
        """
        Sample the circuit for every sample of the batch and interpret the result.

        Args:
            inputs: Input data, see ``_bind_parameters``.
            weights: Weights, see ``_bind_parameters``.

        Returns:
            np.ndarray: Interpreted output, ``(batch_size, output_dim)``.
        """
        pubs = [self._make_pub(values) for values in self._bind_parameters(inputs, weights)]
        result = self.sampler.run(pubs).result()
        return self.interpret(result)

    def _parameter_shift_gradient_for_param(self, param_idx: int, param_values_base: "list[float]",
                                           is_weight_param: bool):
        """
        Parameter-shift derivative of the QNN output w.r.t. one parameter.

        Args:
            param_idx: Index inside ``self.weight_params`` (or
                ``self.input_params`` when ``is_weight_param`` is False).
            param_values_base: Current values, ordered like ``self._param_order``.
            is_weight_param: True for a weight parameter, False for an input.

        Returns:
            np.ndarray: Derivative of the output, ``(output_dim,)``.

        Raises:
            ValueError: If the parameter is not part of ``self._param_order``.
        """
        shift = np.pi / 2

        owner = self.weight_params if is_weight_param else self.input_params
        target_param_obj = owner[param_idx]
        try:
            position = self._param_order.index(target_param_obj)
        except ValueError:
            raise ValueError("指定されたパラメータが回路パラメータリストに見つかりません。") from None

        shifted_up = list(param_values_base)
        shifted_up[position] += shift
        shifted_down = list(param_values_base)
        shifted_down[position] -= shift

        # Both shifted evaluations are submitted as two pubs of a single job;
        # interpret() then yields one output row per pub.
        job = self.sampler.run([self._make_pub(shifted_up), self._make_pub(shifted_down)])
        outputs = self.interpret(job.result())

        return 0.5 * (outputs[0] - outputs[1])

    def _jacobian(self, inputs, weights, wrt_weights: bool):
        """
        Jacobian of the outputs w.r.t. the weights or the inputs.

        Returns:
            np.ndarray: ``(batch_size, output_dim, num_params)``; the last axis
            is empty when there are no parameters of the requested kind.
        """
        params = self.weight_params if wrt_weights else self.input_params
        output_dim = self._output_dim[0]

        if not params:
            # Same batch-size convention as _bind_parameters: only a 2-D
            # `inputs` array defines a batch larger than one.
            if inputs is None or np.ndim(inputs) == 1:
                batch_size = 1
            else:
                batch_size = np.shape(inputs)[0]
            return np.zeros((batch_size, output_dim, 0))

        rows = self._bind_parameters(inputs, weights)
        jacobians = np.zeros((len(rows), output_dim, len(params)))
        for sample, values in enumerate(rows):
            for j in range(len(params)):
                jacobians[sample, :, j] = self._parameter_shift_gradient_for_param(
                    param_idx=j,
                    param_values_base=values,
                    is_weight_param=wrt_weights,
                )
        return jacobians

    def backward_weights(self, inputs: "np.ndarray | None" = None, weights: "np.ndarray | None" = None):
        """
        Jacobian d(output_j) / d(weight_i) for every sample of the batch.

        Args:
            inputs: Input data, see ``_bind_parameters``.
            weights: Weights, see ``_bind_parameters``.

        Returns:
            np.ndarray: ``(batch_size, output_dim, num_weight_params)``.
        """
        return self._jacobian(inputs, weights, wrt_weights=True)

    def backward_inputs(self, inputs: "np.ndarray | None" = None, weights: "np.ndarray | None" = None):
        """
        Jacobian d(output_j) / d(input_i) for every sample of the batch.

        Args:
            inputs: Input data, see ``_bind_parameters``.
            weights: Weights, see ``_bind_parameters``.

        Returns:
            np.ndarray: ``(batch_size, output_dim, num_input_params)``.
        """
        return self._jacobian(inputs, weights, wrt_weights=False)

# --- ScratchTorchConnector ---
class QuantumFunction(Function):
    """Custom autograd function that bridges a scratch QNN and PyTorch."""

    @staticmethod
    def forward(ctx, qnn_instance, input_data_tensor: torch.Tensor, weight_tensor: torch.Tensor):
        """
        Run the QNN forward pass on NumPy copies of the tensors.

        Args:
            ctx: Context object used to hand data to ``backward``.
            qnn_instance: Object exposing ``forward``, ``backward_weights``,
                ``backward_inputs`` and ``input_params`` (e.g.
                ``ScratchEstimatorQNN`` or ``ScratchSamplerQNN``).
            input_data_tensor (torch.Tensor): Input data, or ``None`` when the
                QNN takes no input.
            weight_tensor (torch.Tensor): Trainable weights.

        Returns:
            torch.Tensor: QNN output with the dtype and device of ``weight_tensor``.
        """
        # The QNN works on CPU NumPy arrays, detached from the autograd graph.
        inputs_np = None if input_data_tensor is None else input_data_tensor.detach().cpu().numpy()
        weights_np = weight_tensor.detach().cpu().numpy()

        qnn_output_np = qnn_instance.forward(inputs=inputs_np, weights=weights_np)

        # Everything backward needs; plain attributes because none of it is a tensor.
        ctx.qnn_instance = qnn_instance
        ctx.input_data_np = inputs_np
        ctx.weight_np = weights_np
        ctx.input_shape = None if input_data_tensor is None else tuple(input_data_tensor.shape)
        ctx.weight_shape = tuple(weight_tensor.shape)

        return torch.tensor(qnn_output_np, dtype=weight_tensor.dtype, device=weight_tensor.device)

    @staticmethod
    def backward(ctx, grad_output: torch.Tensor):
        """
        Chain ``grad_output`` with the parameter-shift Jacobians of the QNN.

        Only the Jacobians autograd actually asks for are evaluated (see
        ``ctx.needs_input_grad``), since every Jacobian entry costs two extra
        circuit evaluations.

        Args:
            ctx: Context filled by ``forward``.
            grad_output (torch.Tensor): Gradient of the loss w.r.t. the QNN
                output, shape ``(batch_size, output_dim)``.

        Returns:
            tuple: ``(None, grad_input, grad_weight)``; an entry is ``None``
            when no gradient is required for that argument.
        """
        qnn = ctx.qnn_instance
        inputs_np = ctx.input_data_np
        weights_np = ctx.weight_np
        # needs_input_grad is aligned with forward's arguments: (qnn, inputs, weights).
        _, wants_input_grad, wants_weight_grad = ctx.needs_input_grad

        grad_output_np = grad_output.detach().cpu().numpy()

        grad_weights_tensor = None
        if wants_weight_grad:
            jacobian_weights_np = qnn.backward_weights(inputs=inputs_np, weights=weights_np)
            grad_weights_np = np.einsum('bo,bow->bw', grad_output_np, jacobian_weights_np)
            # Weights shared by the whole batch accumulate the per-sample gradients.
            if len(ctx.weight_shape) == 1:
                grad_weights_np = grad_weights_np.sum(axis=0)
            elif ctx.weight_shape[0] == 1:
                grad_weights_np = grad_weights_np.sum(axis=0, keepdims=True)
            grad_weights_tensor = torch.tensor(
                grad_weights_np, dtype=grad_output.dtype, device=grad_output.device
            )

        grad_input_tensor = None
        if wants_input_grad and inputs_np is not None and qnn.input_params:
            jacobian_inputs_np = qnn.backward_inputs(inputs=inputs_np, weights=weights_np)
            grad_inputs_np = np.einsum('bo,boi->bi', grad_output_np, jacobian_inputs_np)
            # A 1-D input was treated as a batch of one; give the gradient the same shape back.
            grad_inputs_np = grad_inputs_np.reshape(ctx.input_shape)
            grad_input_tensor = torch.tensor(
                grad_inputs_np, dtype=grad_output.dtype, device=grad_output.device
            )

        # qnn_instance is not a tensor, so its gradient slot is always None.
        return None, grad_input_tensor, grad_weights_tensor


class ScratchTorchConnector(nn.Module):
    """
    Wrap a scratch QNN as a PyTorch ``nn.Module`` with trainable weights.
    """

    def __init__(self, qnn, weight_shape, input_shape=None):
        """
        Args:
            qnn: ``ScratchEstimatorQNN`` or ``ScratchSamplerQNN`` instance.
            weight_shape (tuple): Shape of the QNN weights, ``(num_weights,)``.
            input_shape (tuple, optional): Shape of the QNN input,
                ``(num_inputs,)``. Kept for backward compatibility; whether
                the module expects inputs is decided by ``qnn.input_params``.
        """
        super().__init__()
        self.qnn = qnn

        # Trainable weights, drawn from a standard normal distribution.
        self.weight = nn.Parameter(torch.randn(weight_shape))

        # A QNN that declares input parameters always needs input data, even
        # when the caller did not pass `input_shape`; otherwise the inputs
        # would be dropped silently and the QNN would reject the call later.
        self._has_input = bool(getattr(qnn, "input_params", None))

    def forward(self, inputs: torch.Tensor = None):
        """
        Evaluate the QNN.

        Args:
            inputs (torch.Tensor, optional): Input data
                ``(batch_size, num_inputs)``. Ignored when the QNN has no
                input parameters.

        Returns:
            torch.Tensor: QNN output.

        Raises:
            ValueError: If the QNN expects inputs and ``inputs`` is ``None``.
        """
        if not self._has_input:
            # No input parameters: the QNN is a function of the weights only.
            return QuantumFunction.apply(self.qnn, None, self.weight)

        if inputs is None:
            raise ValueError("QNNは入力を期待していますが、入力がNoneです。")

        # The weight vector is passed as-is; the QNN broadcasts it over the batch.
        return QuantumFunction.apply(self.qnn, inputs, self.weight)


## deviceの指定

In [None]:
# Device selection

# NOTE(review): REAL_DEVICE is not read in this cell — presumably a switch for
# running on real hardware; confirm against the rest of the notebook.
REAL_DEVICE = False

# Statevector simulator; placed on the GPU (with cuStateVec) when torch sees CUDA.
backend = (
    AerSimulator(method="statevector", device="GPU", cuStateVec_enable=True)
    if torch.cuda.is_available()
    else AerSimulator(method="statevector")
)


## 三目ならべのルール

In [None]:
class TicTacToe:
    """
    Board state and rules of tic-tac-toe on a square grid.

    Cells hold ``1`` (shown as "O"), ``-1`` (shown as "X") or ``0`` (empty).
    A player wins by owning a complete row, column or diagonal.

    Methods:
        reset_board: clear the board
        get_state: board as seen by one player
        display_board: print the board
        is_on_board: check that a coordinate lies on the board
        is_valid_action: check that an action is legal
        place: put a mark on the board
        get_possible_actions: list the empty cells
        _is_win: find the owner of a completed line
        _is_draw: check that no move is left
        gameover: check whether the game has ended
        checkwinner: report the winner
    """

    def __init__(self, board_size=3):
        """
        Args:
            board_size: Side length of the square board.
        """
        self.board_size = board_size
        self.device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
        self.reset_board()

    def reset_board(self) -> None:
        """Replace the board with an all-empty int32 tensor on ``self.device``."""
        self.board = torch.zeros(
            [self.board_size, self.board_size], dtype=torch.int32, device=self.device
        )

    def get_state(self, player: int) -> torch.Tensor:
        """
        Return the board multiplied by ``player``.

        With ``player=-1`` the signs are flipped, so the current player's
        marks are always ``+1``. The result is a new tensor; the board itself
        is not modified.
        """
        return self.board * player

    def display_board(self, sleep_secs=0.8) -> None:
        """Print the board with row/column indices, then pause ``sleep_secs`` seconds."""
        symbols = {1: "O", -1: "X"}
        print("  " + " ".join(str(col) for col in range(self.board_size)))
        for row_index, row in enumerate(self.board.tolist()):
            print(" ".join([str(row_index)] + [symbols.get(cell, " ") for cell in row]))
        print()
        time.sleep(sleep_secs)

    def is_on_board(self, row: int, col: int) -> bool:
        """Return True when ``(row, col)`` is a coordinate of the board."""
        return 0 <= row < self.board_size and 0 <= col < self.board_size

    def is_valid_action(self, action: list, player: int) -> bool:
        """
        Return True when ``action`` may be played.

        ``None`` (a pass) is always valid; otherwise the target cell must be
        on the board and empty. ``player`` is accepted for interface symmetry
        and is not used.
        """
        if action is None:
            return True

        row, col = action
        return bool(self.is_on_board(row, col) and self.board[row, col] == 0)

    def place(self, action: list, player: int) -> bool:
        """
        Put ``player``'s mark on the cell given by ``action``.

        Returns:
            True when the move was applied (or ``action`` is ``None``),
            False when the move is illegal; the board is then unchanged.
        """
        if action is None:
            return True

        if not self.is_valid_action(action, player):
            print("Invalid action")
            return False

        row, col = action
        self.board[row, col] = player
        return True

    def get_possible_actions(self) -> list:
        """Return the ``(row, col)`` tuples of all empty cells in row-major order."""
        # One tolist() call instead of one tensor lookup per cell.
        return [
            (row_index, col_index)
            for row_index, row in enumerate(self.board.tolist())
            for col_index, cell in enumerate(row)
            if cell == 0
        ]

    def _is_win(self) -> int:
        """
        Return ``1`` or ``-1`` for the owner of a completed line, else ``0``.

        A line is complete when its cells sum to ``+board_size`` or
        ``-board_size``; rows and columns are checked before the diagonals.
        """
        size = self.board_size
        cells = self.board.tolist()
        row_sums = [sum(row) for row in cells]
        col_sums = [sum(column) for column in zip(*cells)]

        for col_total, row_total in zip(col_sums, row_sums):
            if col_total == size or row_total == size:
                return 1
            if col_total == -size or row_total == -size:
                return -1

        main_diagonal = sum(cells[i][i] for i in range(size))
        anti_diagonal = sum(cells[i][size - 1 - i] for i in range(size))
        if anti_diagonal == size or main_diagonal == size:
            return 1
        if anti_diagonal == -size or main_diagonal == -size:
            return -1
        return 0

    def _is_draw(self, available_actions: list) -> bool:
        """Return True when ``available_actions`` is empty, i.e. the board is full."""
        return len(available_actions) == 0

    def gameover(self) -> bool:
        """Return True when a line is complete or no empty cell is left."""
        return self._is_win() != 0 or self._is_draw(self.get_possible_actions())

    def checkwinner(self) -> int:
        """
        Return the winner: ``1``, ``-1``, or ``0`` when nobody has won.

        The completed-line check is made even on a full board, so a win
        achieved with the very last move is not reported as a draw.
        """
        return self._is_win()


## QNNを作成するための構成要素

In [None]:
class QNNComponent:
    """
    Builder for the parameterized circuits (embedding + ansatz) of the QNN.

    Attributes:
        n_qubits: int
            num of qubits
    """
    def __init__(self, n_qubits: int):
        self.n_qubits = n_qubits

    def TPE(self, reps: int=1) -> "QuantumCircuit":
        """
        Build the "TPE" embedding: a layer of RX rotations, one per qubit.

        Parameters:
            reps:
                number of times the RX layer is applied; every repetition
                reuses the same ``theta`` parameters

        Returns:
            qc:
                circuit on ``n_qubits`` qubits parameterized by ``theta``
        """
        qc = QuantumCircuit(self.n_qubits)
        theta = ParameterVector("theta", self.n_qubits)

        for _ in range(reps):
            for qubit_index in range(self.n_qubits):
                qc.rx(theta[qubit_index], qubit_index)

        # NOTE(review): this assigns the vector onto itself, which looks like
        # a no-op; kept unchanged -- confirm before removing.
        qc.assign_parameters(theta, inplace=True)

        return qc

    def HEE(self, reps: int=1) -> "QuantumCircuit":
        """
        Build the "HEE" embedding: the RX layer of ``TPE`` followed by a
        chain of CX gates between neighbouring qubits.

        Parameters:
            reps:
                number of times the RX + CX layer is applied; every
                repetition reuses the same ``theta`` parameters

        Returns:
            qc:
                circuit on ``n_qubits`` qubits parameterized by ``theta``
        """
        qc = QuantumCircuit(self.n_qubits)
        theta = ParameterVector("theta", self.n_qubits)

        for _ in range(reps):
            for qubit_index in range(self.n_qubits):
                qc.rx(theta[qubit_index], qubit_index)
            for qubit_index in range(self.n_qubits-1):
                qc.cx(qubit_index, qubit_index+1)

        # NOTE(review): this assigns the vector onto itself, which looks like
        # a no-op; kept unchanged -- confirm before removing.
        qc.assign_parameters(theta, inplace=True)

        return qc

    # Short names used for the FeatureMap+Ansatz circuits:
    #   Embeddings:
    #       ZFeatureMap: Z
    #       ZZFeatureMap: ZZ
    #       TPE: T
    #       HEE: H
    #   Ansatz:
    #       RealAmplitudes: R
    #       EfficientSU2: E

    def make_circuit(
            self,
            embedding_type: str,
            ansatz_type: str,
            feature_map_reps: int=1,
            ansatz_reps: int=1
        ):
        """
        Making FeatureMap+Ansatz Circuit

        Parameters:
            embedding_type: str
                ["ZFeatureMap", "ZZFeatureMap", "TPE", "HEE"]
            ansatz_type: str
                ["RealAmplitudes", "EfficientSU2"]
            feature_map_reps: int
                times to repeat feature_map
            ansatz_reps: int
                times to repeat ansatz

        Returns:
            ``(circuit, feature_map.parameters, ansatz.parameters)`` where
            ``circuit`` is the feature map followed by the ansatz

        Raises:
            ValueError:
                if ``embedding_type`` or ``ansatz_type`` is not one of the
                names listed above
        """
        # Builders are wrapped in lambdas so that nothing is constructed
        # until both names have been validated.
        embedding_builders = {
            "ZFeatureMap": lambda: ZFeatureMap(self.n_qubits, reps=feature_map_reps),
            "ZZFeatureMap": lambda: ZZFeatureMap(self.n_qubits, reps=feature_map_reps),
            "TPE": lambda: self.TPE(reps=feature_map_reps),
            "HEE": lambda: self.HEE(reps=feature_map_reps),
        }
        ansatz_builders = {
            "RealAmplitudes": lambda: RealAmplitudes(self.n_qubits, reps=ansatz_reps),
            "EfficientSU2": lambda: EfficientSU2(self.n_qubits, reps=ansatz_reps),
        }

        # Raising (instead of print + exit) stops execution right here, also
        # inside a notebook, and tells the caller what went wrong.
        if embedding_type not in embedding_builders:
            raise ValueError(
                f"Unknown embedding type: {embedding_type!r}. "
                f"Expected one of {list(embedding_builders)}."
            )
        if ansatz_type not in ansatz_builders:
            raise ValueError(
                f"Unknown ansatz type: {ansatz_type!r}. "
                f"Expected one of {list(ansatz_builders)}."
            )

        feature_map = embedding_builders[embedding_type]()
        ansatz = ansatz_builders[ansatz_type]()

        circuit = QuantumCircuit(self.n_qubits)
        circuit.compose(feature_map, inplace=True)
        circuit.compose(ansatz, inplace=True)

        return circuit, feature_map.parameters, ansatz.parameters


## 古典モデルと古典量子ハイブリッドモデルの構築

In [None]:
# 比較用の古典NNを抜粋

class CCNN2(nn.Module):
    """
    Classical Convolutional Neural Network (CCNN2) used for comparison.

    The paper refers to this network as "Weaker".
    """
    def __init__(self):
        super().__init__()
        self.board_size = 3
        # Layers are created in this order so seeded initialization is stable.
        self.conv1 = nn.Conv2d(1, 16, kernel_size=3, bias=False)
        self.flatten = nn.Flatten()
        self.linear = nn.Linear(16, 9, bias=True)
        self.tanh = nn.Tanh()

    def forward(self, state):
        """
        Map one board to nine values squashed by tanh.

        Parameters:
            state:
                tensor with nine elements; it is reshaped to ``(1, 3, 3)``

        Returns:
            tensor of shape ``(9,)``
        """
        board = torch.reshape(state, (1,3,3))
        # The 3x3 kernel covers the whole board, leaving 16 scalar features.
        features = torch.flatten(self.conv1(board))
        return self.tanh(self.linear(features))


In [None]:
# 出力として擬確率分布を返す、qiskitではSamplerと呼ばれている。

class CNN_QNN_CNN_sampler(nn.Module):
    """
    Hybrid network: Linear -> sampler QNN -> Linear -> Tanh.

    The first linear layer turns the nine board values into one input per
    qubit; the second one maps the ``2**n_qubits`` values coming out of the
    quantum layer back to nine outputs.
    """
    def __init__(self, embedding_type: str, ansatz_type: str, board_size=3, n_qubits=7, feature_map_reps=1, ansatz_reps=1):
        """
        Parameters:
            embedding_type, ansatz_type:
                circuit choices forwarded to ``QNNComponent.make_circuit``
            board_size:
                stored on the instance; the layer sizes use 9 directly
            n_qubits:
                width of the quantum circuit
            feature_map_reps, ansatz_reps:
                repetition counts forwarded to ``make_circuit``
        """
        super().__init__()
        self.board_size = board_size
        self.n_qubits = n_qubits

        builder = QNNComponent(self.n_qubits)
        self.circuit, feature_map_parameters, ansatz_parameters = builder.make_circuit(
            embedding_type=embedding_type,
            ansatz_type=ansatz_type,
            feature_map_reps=feature_map_reps,
            ansatz_reps=ansatz_reps
        )

        # Transpile for the notebook-level ``backend`` before wrapping the
        # circuit in the QNN.
        self.pm = generate_preset_pass_manager(backend=backend, optimization_level=1)
        self.isa_circuit = self.pm.run(self.circuit)
        self.qnn = ScratchSamplerQNN(
            circuit=self.isa_circuit,
            input_params=list(feature_map_parameters),
            weight_params=list(ansatz_parameters),
        )

        encoder = nn.Linear(9, self.n_qubits, bias=True)
        quantum_layer = ScratchTorchConnector(
            qnn=self.qnn,
            weight_shape=(len(ansatz_parameters),),
            input_shape=(len(feature_map_parameters),)
        )
        decoder = nn.Linear(2**self.n_qubits, 9, bias=True)
        self.cqnn = nn.Sequential(encoder, quantum_layer, decoder, nn.Tanh())

    def forward(self, state):
        """Run ``state`` through the whole classical-quantum-classical stack."""
        return self.cqnn(state)


In [None]:
# 出力として期待値を返す、qiskitではEstimatorと呼ばれている。

class CNN_QNN_CNN_estimator(nn.Module):
    """
    Hybrid network: Linear -> estimator QNN -> Linear -> Tanh.

    The quantum layer is given one single-qubit Z observable per qubit, so
    the second linear layer takes ``n_qubits`` inputs.
    """
    def __init__(self, embedding_type: str, ansatz_type: str, board_size=3, n_qubits=10, feature_map_reps=1, ansatz_reps=1):
        """
        Parameters:
            embedding_type, ansatz_type:
                circuit choices forwarded to ``QNNComponent.make_circuit``
            board_size:
                stored on the instance; the layer sizes use 9 directly
            n_qubits:
                width of the quantum circuit
            feature_map_reps, ansatz_reps:
                repetition counts forwarded to ``make_circuit``
        """
        super().__init__()
        self.board_size = board_size
        self.n_qubits = n_qubits
        self.linear1 = nn.Linear(9, self.n_qubits, bias=True)
        self.linear2 = nn.Linear(self.n_qubits, 9, bias=True)

        self.circuit, feature_map_parameters, ansatz_parameters = \
            QNNComponent(self.n_qubits).make_circuit(
                embedding_type=embedding_type,
                ansatz_type=ansatz_type,
                feature_map_reps=feature_map_reps,
                ansatz_reps=ansatz_reps
            )

        # On a real device the labels are left-padded with identities up to
        # the backend width.
        # NOTE(review): this assumes the transpiled circuit keeps logical
        # qubit i on physical qubit i -- confirm, or switch to
        # SparsePauliOp.apply_layout with the transpiled layout.
        if REAL_DEVICE:
            self.backend_num_qubits = backend.num_qubits
            padding = "I"*(self.backend_num_qubits-self.n_qubits)
        else:
            padding = ""

        self.observable = []
        for i in range(self.n_qubits-1, -1, -1):
            # Label with a single Z at string position i.
            s_op = "I"*i + "Z" + "I"*(self.n_qubits-1-i)
            # from_list expects a list of (label, coefficient) tuples.
            self.observable.append(SparsePauliOp.from_list([(padding + s_op, 1)]))

        self.pm = generate_preset_pass_manager(backend=backend, optimization_level=1)
        self.isa_circuit = self.pm.run(self.circuit)
        self.qnn = ScratchEstimatorQNN(
            circuit=self.isa_circuit,
            observables=self.observable,
            input_params=list(feature_map_parameters),
            weight_params=list(ansatz_parameters),
        )
        self.cqnn = nn.Sequential(
            self.linear1,
            # Same keyword names as the sampler model's connector call.
            ScratchTorchConnector(
                qnn=self.qnn,
                weight_shape=(len(ansatz_parameters),),
                input_shape=(len(feature_map_parameters),)
            ),
            self.linear2,
            nn.Tanh()
        )

    def forward(self, state):
        """Run ``state`` through the whole classical-quantum-classical stack."""
        return self.cqnn(state)


## エージェントの作成

In [None]:
class Agent:
    """
    Base class of every player.

    Attributes:
        player:
            mark the agent plays with
        stop:
            ``True`` while the agent is in evaluation mode (set by
            ``eval``), ``False`` while it is in training mode
    """
    def __init__(self, player: int):
        self.player = player
        self.stop = False

    def train(self) -> None:
        """Switch to training mode."""
        self.stop = False

    def eval(self) -> None:
        """Switch to evaluation mode."""
        self.stop = True

    def check_state(self, tictactoe) -> "torch.Tensor":
        """Return ``tictactoe.get_state`` for this agent's own mark."""
        own_mark = self.player
        return tictactoe.get_state(own_mark)

    def check_actions(self, tictactoe) -> list:
        """Return the moves ``tictactoe`` currently offers."""
        return tictactoe.get_possible_actions()


In [None]:
# 対人戦用

class Human(Agent):
    """Player whose moves are typed on standard input; never trained."""
    def __init__(self, player=1):
        super().__init__(player)
        self.stop = True

    def action(self, tictactoe) -> tuple:
        """
        Ask for "row col" until a playable cell is entered.

        Returns:
            the chosen ``(row, col)``
        """
        moves = self.check_actions(tictactoe)

        while True:
            try:
                row, col = map(int, input("please input: {row col} ").split())
            except ValueError:
                # Not two integers.
                print("Invalid input!!")
                continue

            if (row, col) in moves:
                return row, col
            print("Invalid input!")


In [None]:
# 乱択戦略

class RandomPolicy(Agent):
    """Player that picks uniformly among the available moves; never trained."""
    def __init__(self, player=1):
        super().__init__(player)
        self.stop = True

    def action(self, tictactoe) -> tuple:
        """
        Draw one of the currently available moves at random.

        Returns:
            the chosen ``(row, col)``
        """
        chosen_row, chosen_col = random.choice(self.check_actions(tictactoe))
        return chosen_row, chosen_col


In [None]:
# 古典NNを用いたエージェント

class CNNAgent(Agent):
    """
    Epsilon-greedy Q-learning agent backed by a classical network.

    Attributes:
        discount:
            factor applied to the best next-state value in the target
        epsilon:
            probability of a random move while training
        alpha:
            weight of the squared-norm penalty added to the loss
        NN:
            the network; it maps a flattened board to one value per cell
    """
    def __init__(self, player=1, board_size=3, network="CCNN2"):
        """
        Parameters:
            player:
                mark the agent plays with
            board_size:
                side length used to reshape the network output
            network:
                name of the network to build; only "CCNN2" is available

        Raises:
            ValueError:
                if ``network`` is not a known network name
        """
        super().__init__(player)
        self.discount = 0.9
        self.epsilon = 0.1
        # NOTE: not handed to the optimizer; Adam runs with its default rate.
        self.lr = 0.01
        self.alpha = 0.01
        self.board_size = board_size
        self.device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

        networks = {"CCNN2": CCNN2}
        if network not in networks:
            raise ValueError(
                f"Unknown network: {network!r}. Expected one of {list(networks)}."
            )
        self.NN = networks[network]()
        self.NN.to(self.device)
        self.optimizer = optim.Adam(self.NN.parameters())

        # A membership test is required here: `network == "a" or "b"` is
        # always true and would freeze the third parameter of every network.
        # Those two networks are not registered above, so this stays inactive
        # until they are added.
        if network in ("CNN_instead_sampler", "CNN_instead_estimator"):
            self.optimizer.param_groups[0]["params"][2].requires_grad=False

    def action(self, tictactoe) -> tuple:
        """
        Choose a move for the current position.

        Returns:
            ``(row, col)``, or ``None`` when no move is available. While
            training, a random move is returned with probability ``epsilon``;
            otherwise the available move with the highest value is returned.
        """
        board = self.check_state(tictactoe)
        possible_actions = self.check_actions(tictactoe)
        if len(possible_actions) == 0:
            return None
        if not self.stop and random.random() < self.epsilon:
            return random.choice(possible_actions)
        best_action, _ = self._get_the_best(board, possible_actions)
        return best_action

    def train(self):
        """Put the agent and its network in training mode."""
        super().train()
        self.NN.train()

    def eval(self):
        """Put the agent and its network in evaluation mode."""
        super().eval()
        self.NN.eval()

    def get_qvalues(self, state):
        """Return the network output for ``state`` as a board-shaped tensor."""
        state = state.to(torch.float32).to(self.device)
        state = state.view(-1)
        qvalues = self.NN(state).view(self.board_size, self.board_size)
        return qvalues

    def _get_the_best(self, board, possible_moves):
        """
        Find the highest-valued move among ``possible_moves``.

        Returns:
            ``(move, value)``; ``(None, -inf)`` when ``possible_moves`` is
            empty. The first move wins a tie.
        """
        qvalues = self.get_qvalues(board)
        best_move = None
        best_q_value = -float('inf')
        for mv_x, mv_y in possible_moves:
            q_value = qvalues[mv_x, mv_y]
            if q_value > best_q_value:
                best_q_value = q_value
                best_move = (mv_x, mv_y)
        return best_move, best_q_value

    def update(self, tictactoe, state, action, reward):
        """
        Do one Q-learning step for the move that was just played.

        Parameters:
            tictactoe:
                game after the move; it supplies the next state and moves
            state:
                state the move was chosen from
            action:
                ``(row, col)`` that was played
            reward:
                reward obtained for the move

        Returns:
            the loss value, or ``None`` in evaluation mode
        """
        if self.stop:
            return None

        with torch.no_grad():
            board = self.check_state(tictactoe)
            moves = self.check_actions(tictactoe)
            _, best_value = self._get_the_best(board, moves)
            # Floors the next-state value at 0; this also covers the -inf
            # returned when no move is left.
            next_max = max(0, best_value)
            # as_tensor accepts both a float and a tensor without the
            # copy-construct warning torch.tensor() raises for tensors.
            target_q = torch.as_tensor(reward + self.discount * next_max)
            target_q = target_q.to(torch.float32).to(self.device)

        x, y = action
        old_qvalue = self.get_qvalues(state)[x, y]
        loss = nn.functional.huber_loss(old_qvalue, target_q)

        # Squared-norm penalty over every parameter, weighted by alpha.
        l2 = torch.tensor(0., requires_grad=True)
        for w in self.NN.parameters():
            l2 = l2 + torch.norm(w)**2
        loss = loss + self.alpha*l2

        self.loss_v = loss.item()
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        return self.loss_v


In [None]:
# 古典量子ハイブリッドNNを用いたエージェント

class CQCAgent(Agent):
    """
    Epsilon-greedy Q-learning agent backed by a hybrid classical-quantum
    network.

    Attributes:
        discount:
            factor applied to the best next-state value in the target
        epsilon:
            probability of a random move while training
        HNN:
            the hybrid network; it maps a flattened board to one value per
            cell
    """
    def __init__(
            self,
            embedding_type: str=None,
            ansatz_type: str=None,
            nn_network: int=None,
            player=1,
            board_size=3,
            n_qubits=10,
            feature_map_reps=1,
            ansatz_reps=1
        ):
        """
        Parameters:
            embedding_type, ansatz_type:
                circuit choices forwarded to the network
            nn_network:
                2 for ``CNN_QNN_CNN_sampler``, 3 for ``CNN_QNN_CNN_estimator``
            player:
                mark the agent plays with
            board_size:
                side length used to reshape the network output
            n_qubits, feature_map_reps, ansatz_reps:
                circuit settings forwarded to the network

        Raises:
            ValueError:
                if ``nn_network`` is neither 2 nor 3
        """
        super().__init__(player)
        self.discount = 0.9
        self.epsilon = 0.1
        self.board_size = board_size
        self.device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

        if nn_network == 2:
            network_cls = CNN_QNN_CNN_sampler
        elif nn_network == 3:
            network_cls = CNN_QNN_CNN_estimator
        else:
            # Fail with a clear message instead of an UnboundLocalError.
            raise ValueError(
                f"Unknown nn_network: {nn_network!r}. "
                "Expected 2 (sampler) or 3 (estimator)."
            )

        self.HNN = network_cls(
            embedding_type=embedding_type,
            ansatz_type=ansatz_type,
            board_size=board_size,
            n_qubits=n_qubits,
            feature_map_reps=feature_map_reps,
            ansatz_reps=ansatz_reps
        )
        self.HNN.to(self.device)
        self.optimizer = optim.Adam(self.HNN.parameters())

    def action(self, tictactoe) -> tuple:
        """
        Choose a move for the current position.

        Returns:
            ``(row, col)``, or ``None`` when no move is available. While
            training, a random move is returned with probability ``epsilon``;
            otherwise the available move with the highest value is returned.
        """
        board = self.check_state(tictactoe)
        possible_actions = self.check_actions(tictactoe)
        if len(possible_actions) == 0:
            return None
        if not self.stop and random.random() < self.epsilon:
            return random.choice(possible_actions)
        best_action, _ = self._get_the_best(board, possible_actions)
        return best_action

    def train(self):
        """Put the agent and its network in training mode."""
        super().train()
        self.HNN.train()

    def eval(self):
        """Put the agent and its network in evaluation mode."""
        super().eval()
        self.HNN.eval()

    def get_qvalues(self, state):
        """Return the network output for ``state`` as a board-shaped tensor."""
        state = state.to(torch.float32).to(self.device)
        state = state.view(-1)
        qvalues = self.HNN(state).view(self.board_size, self.board_size)
        return qvalues

    def _get_the_best(self, board, possible_moves):
        """
        Find the highest-valued move among ``possible_moves``.

        Returns:
            ``(move, value)``; ``(None, -inf)`` when ``possible_moves`` is
            empty. The first move wins a tie.
        """
        qvalues = self.get_qvalues(board)
        best_move = None
        best_q_value = -float('inf')
        for mv_x, mv_y in possible_moves:
            q_value = qvalues[mv_x, mv_y]
            if q_value > best_q_value:
                best_q_value = q_value
                best_move = (mv_x, mv_y)
        return best_move, best_q_value

    def update(self, tictactoe, state, action, reward):
        """
        Do one Q-learning step for the move that was just played.

        Parameters:
            tictactoe:
                game after the move; it supplies the next state and moves
            state:
                state the move was chosen from
            action:
                ``(row, col)`` that was played
            reward:
                reward obtained for the move

        Returns:
            the loss value, or ``None`` in evaluation mode
        """
        if self.stop:
            return None

        with torch.no_grad():
            board = self.check_state(tictactoe)
            moves = self.check_actions(tictactoe)
            _, best_value = self._get_the_best(board, moves)
            # Floors the next-state value at 0; this also covers the -inf
            # returned when no move is left.
            next_max = max(0, best_value)
            # as_tensor accepts both a float and a tensor without the
            # copy-construct warning torch.tensor() raises for tensors.
            target_q = torch.as_tensor(reward + self.discount * next_max)
            target_q = target_q.to(torch.float32).to(self.device)

        x, y = action
        old_qvalue = self.get_qvalues(state)[x, y]
        loss = nn.functional.huber_loss(old_qvalue, target_q)
        self.loss_v = loss.item()
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        return self.loss_v


## 対戦環境の作成

In [None]:
class Env:
    """
    Match runner: lets two agents play and keeps win/draw/rating statistics.

    Attributes:
        agent1, agent2:
            the two players; ``agent1`` moves first in ``game_exec``.
            ``train`` swaps them before every episode.
        tictactoc:
            game instance the matches are played on
        loss_f1, loss_f2:
            per-game mean loss of whichever agent sat as agent1 / agent2
        P1, P2, Draw:
            running ratios appended by ``collect_winrate``
        rate:
            rating history, starting with ``elorate``
        elorate:
            the starting rating passed to the constructor
        noised_network:
            stored flag; not read inside this class
    """
    def __init__(self, agent1, agent2, noised_network=False, tictactoc=None, elorate=1500):
        """
        Parameters:
            agent1, agent2:
                players; the statistics are kept from ``agent1``'s viewpoint
            noised_network:
                stored on the instance
            tictactoc:
                game to play on; a new ``TicTacToe`` is created when omitted
                so that environments never share one board
            elorate:
                starting rating
        """
        # train() swaps the agents at the start of every episode, so they
        # are stored crossed: after the first swap the caller's first agent
        # sits as agent1 and moves first.
        self.agent1 = agent2
        self.agent2 = agent1
        self.tictactoc = TicTacToe() if tictactoc is None else tictactoc
        self.loss_f1 = []
        self.loss_f2 = []
        self.P1 = []
        self.P2 = []
        self.Draw = []
        self.rate =  [elorate]
        self.elorate = elorate
        self.noised_network = noised_network

    def _get_reward_for_agent1(self):
        """Reward for the player with mark 1: +1 win, -1 loss, 0 otherwise."""
        winner = self.tictactoc.checkwinner()
        if winner == 1:
            return 1
        elif winner == -1:
            return -1
        else:
            return 0

    def _get_reward_for_agent2(self):
        """Reward for the player with mark -1: +1 win, -1 loss, 0 otherwise."""
        winner = self.tictactoc.checkwinner()
        if winner == 1:
            return -1
        elif winner == -1:
            return 1
        else:
            return 0

    # def plot_loss(self, train):
    #     if not self.agent1.stop and train:
    #         plt.plot(range(len(self.loss_f1)), self.loss_f1, label="player1loss")
    #     if not self.agent2.stop and train:
    #         plt.plot(range(len(self.loss_f2)), self.loss_f2, label="player2loss")
    #     if ((not self.agent1.stop) or (not self.agent2.stop)) and train:
    #         plt.xlabel("# of episodes")
    #         plt.ylabel("Average loss value")
    #         plt.title("Training loss")
    #         plt.legend()
    #         plt.show()
    #     self.loss_f1.clear()
    #     self.loss_f2.clear()

    def collect_winrate(self, record):
        """
        Append the current win / loss / draw ratios.

        Parameters:
            record:
                counts keyed by result: 1 (win), -1 (loss), 0 (draw)
        """
        total = record[0]+record[1]+record[-1]
        self.P1.append(record[1] / total)
        self.P2.append(record[-1] / total)
        self.Draw.append(record[0] / total)

    # def plot_winrate(self):
    #     plt.plot(range(1,len(self.P1)+1), self.P1, label="player1wins")
    #     plt.plot(range(1,len(self.P2)+1), self.P2, label="player2wins")
    #     plt.plot(range(1,len(self.Draw)+1), self.Draw, label="draw")
    #     plt.xlabel("# of episodes")
    #     plt.ylabel("rate")
    #     plt.title("Rate of player1 wins, player2 wins and draw")
    #     plt.legend()
    #     plt.show()
    #     self.P1.clear()
    #     self.P2.clear()
    #     self.Draw.clear()

    def collect_elorate(self, win_count, draw_count, iters, rate_update):
        """
        Append a new rating computed from the last batch of games.

        Parameters:
            win_count, draw_count:
                wins and draws within the batch
            iters:
                episode index of the call; kept for compatibility, the
                update always starts from the latest rating
            rate_update:
                number of games in the batch
        """
        K = 0.35
        # The latest rating is used rather than an index derived from
        # ``iters``, which would go stale on a second train() call.
        current = self.rate[-1]
        # Expected score per game against an opponent fixed at 1500.
        W = 1/(10**((1500-current)/400) + 1)
        # new_rate = self.rate[iters] + K*(win_count - (rate_update-draw_count)*W)
        new_rate = current + K*(win_count + 0.5*draw_count - rate_update*W)
        self.rate.append(new_rate)

    # def plot_elorate(self, rate_update):
    #     plt.plot(range(0,len(self.P1)+1, rate_update), self.rate, label="rate")
    #     plt.xlabel("# of games")
    #     plt.ylabel("elorate")
    #     plt.legend()
    #     plt.show()
    #     self.elorate=self.rate[-1]
    #     self.rate.clear()

    def train(self, episodes, train=True, visualize=False):
        """
        Play ``episodes`` games, alternating who moves first.

        Parameters:
            episodes:
                number of games
            train:
                forwarded to ``game_exec``; agents only learn when it is true
            visualize:
                forwarded to ``game_exec``

        Returns:
            a copy of the rating history
        """
        record = {0: 0, 1: 0, -1: 0}
        draw_count = 0
        win_count = 0
        rate_update = 100
        for i in tqdm.tqdm(range(episodes)):
            # Swap the first and second move after each session.
            self.agent1, self.agent2 = self.agent2, self.agent1
            winner = self.game_exec(train, visualize)
            # On odd episodes the seats are swapped, so the sign is flipped
            # to keep the record from the first agent's point of view.
            result = winner if i%2 == 0 else -winner
            record[result] += 1
            if result == 1:
                win_count += 1
            if winner == 0:
                draw_count += 1
            if (i+1)%rate_update == 0:
                self.collect_elorate(win_count, draw_count, i, rate_update)
                win_count = 0
                draw_count = 0
            self.collect_winrate(record)
        print("result:")
        print("Player1   Draw   Player2")
        print(f"  {record[1]}      {record[0]}      {record[-1]}  ")
        # self.plot_loss(train)
        print(self.P1)
        print(self.rate)
        r = self.rate.copy()
        # self.plot_elorate(rate_update)
        # self.plot_winrate()
        return r

    def _play_turn(self, agent, player, train, reward_fn, losses):
        """Let ``agent`` move as ``player`` and, when training, learn from it."""
        state = self.tictactoc.get_state(player=player)
        while True:
            agent.player = player
            action = agent.action(self.tictactoc)
            if self.tictactoc.is_valid_action(action, player=player):
                break
            print(f"Invalid action was provided: ({action})")

        self.tictactoc.place(action, player=player)
        self.steps += 1
        if train and not agent.stop and action is not None:
            # Only the move that ends the game earns a non-zero reward.
            reward = reward_fn() if self.tictactoc.gameover() else 0
            self.loss_num = agent.update(self.tictactoc, state, action, reward)
            losses.append(self.loss_num)

    def game_exec(self, train=False, visualize=True):
        """
        Play one game, ``agent1`` with mark 1 moving first.

        Parameters:
            train:
                when true, agents that are not stopped are updated after
                each of their moves
            visualize:
                when true, the board and the result are printed

        Returns:
            the value of ``checkwinner`` at the end of the game
        """
        self.tictactoc.reset_board()
        self.loss_f1t = []
        self.loss_f2t = []
        self.steps = 0
        if visualize:
            self.tictactoc.display_board()

        turns = (
            (self.agent1, 1, self._get_reward_for_agent1, self.loss_f1t, "Black's turn:"),
            (self.agent2, -1, self._get_reward_for_agent2, self.loss_f2t, "White's turn"),
        )
        finished = self.tictactoc.gameover()
        while not finished:
            for agent, player, reward_fn, losses, banner in turns:
                self._play_turn(agent, player, train, reward_fn, losses)
                finished = self.tictactoc.gameover()
                if visualize:
                    # The banner is only printed while the game continues.
                    if not finished:
                        print(banner)
                    self.tictactoc.display_board()
                if finished:
                    break

        winner = self.tictactoc.checkwinner()
        if not self.agent1.stop:
            self.loss_f1.append(np.mean(np.array(self.loss_f1t)))
        if not self.agent2.stop:
            self.loss_f2.append(np.mean(np.array(self.loss_f2t)))

        if visualize:
            if winner == 0:
                print("Draw")
            elif winner == 1:
                print("Player1 wins!!")
            else:
                print("Player2 wins!!")
        return winner


## 学習

In [None]:
def train_cnn(network: str):
    """
    Train two classical agents against each other and save the first one.

    Parameters:
        network:
            network name handed to ``CNNAgent``
    """
    num_games = 250

    learner, opponent = CNNAgent(network=network), CNNAgent(network=network)
    print(learner.NN)

    for participant in (learner, opponent):
        participant.train()
    Env(learner, opponent).train(num_games)

    # Only the first agent's weights are written to disk.
    model_dir = "./models/train/"
    model_path = f"./models/train/train_cnn_agent_games_{num_games}.pth"
    if not os.path.exists(model_dir):
        os.makedirs(model_dir)
    torch.save(learner.NN.state_dict(), model_path)


In [None]:
def train_cqcnn(
        embedding_type: str=None,
        ansatz_type: str=None,
        n_qubits: int=10,
        nn_network: int=None,
        feature_map_reps: int=1,
        ansatz_reps: int=1
    ):
    """
    Train two hybrid agents against each other and save the first one.

    nn_network:
        2: CNN_QNN_CNN_sampler
        3: CNN_QNN_CNN_estimator
    """
    num_games = 250

    # Both agents are built from the same settings.
    agent_settings = dict(
        embedding_type=embedding_type,
        ansatz_type=ansatz_type,
        nn_network=nn_network,
        n_qubits=n_qubits,
        feature_map_reps=feature_map_reps,
        ansatz_reps=ansatz_reps,
    )
    agent = CQCAgent(**agent_settings)
    agent2 = CQCAgent(**agent_settings)
    print(agent.HNN)

    agent.train()
    Env(agent, agent2).train(num_games)

    # Only the first agent's weights are written to disk.
    model_dir = "./models/train"
    model_path = os.path.join(
        "./models/train/",
        f"train_cqcnn_agent_games_{num_games}_n_qubits_{n_qubits}.pth"
    )
    if not os.path.exists(model_dir):
        os.makedirs(model_dir)
    torch.save(agent.HNN.state_dict(), model_path)


In [None]:
def evaluate_cnn(network: str):
    """
    Play the saved classical agent against a random player and log the
    rating history.

    The weights written by ``train_cnn`` are loaded, 10000 games are played
    in evaluation mode, and the rating history is appended to a file under
    ``./result/``.

    Parameters:
        network:
            network name handed to ``CNNAgent``
    """
    num_games = 250
    elorate = 1500
    model_path = f"./models/train/train_cnn_agent_games_{num_games}.pth"

    agent = CNNAgent(network=network)
    # map_location lets a checkpoint written on a GPU machine be loaded on a
    # CPU-only one.
    model_weights = torch.load(model_path, map_location=agent.device)
    agent.NN.load_state_dict(model_weights)
    agent.eval()

    randagent = RandomPolicy()
    environment = Env(agent, randagent, elorate=elorate)
    rate_per_NN = [environment.train(10000)]

    result_dir = "./result/"
    result_path = os.path.join(result_dir, f"train_cnn_agent_games_{num_games}.json")
    # exist_ok avoids the check-then-create race.
    os.makedirs(result_dir, exist_ok=True)
    with open(result_path, "a", encoding="utf-8") as f:
        print(rate_per_NN, file=f)
        f.write("\n")


In [None]:
def evaluate_cqcnn(
        embedding_type: str=None,
        ansatz_type: str=None,
        n_qubits: int=10,
        nn_network: int=None,
        feature_map_reps: int=1,
        ansatz_reps: int=1
    ):
    """
    Play the saved hybrid agent against a random player and log the rating
    history.

    The weights written by ``train_cqcnn`` for the same ``n_qubits`` are
    loaded, 10000 games are played in evaluation mode, and the rating
    history is appended to a file under ``./result/``.

    Parameters:
        embedding_type, ansatz_type, nn_network, n_qubits,
        feature_map_reps, ansatz_reps:
            forwarded to ``CQCAgent``; they must describe the same network
            that was saved
    """
    num_games = 250
    elorate = 1500
    model_path = os.path.join(
        "./models/train/",
        f"train_cqcnn_agent_games_{num_games}_n_qubits_{n_qubits}.pth"
    )

    agent = CQCAgent(
        embedding_type=embedding_type,
        ansatz_type=ansatz_type,
        nn_network=nn_network,
        n_qubits=n_qubits,
        feature_map_reps=feature_map_reps,
        ansatz_reps=ansatz_reps,
    )
    # map_location lets a checkpoint written on a GPU machine be loaded on a
    # CPU-only one.
    model_weights = torch.load(model_path, map_location=agent.device)
    agent.HNN.load_state_dict(model_weights)
    agent.eval()

    randagent = RandomPolicy()
    environment = Env(agent, randagent, noised_network=True, elorate=elorate)
    rate_per_NN = [environment.train(10000)]

    result_dir = "./result/"
    result_path = os.path.join(
        result_dir,
        f"train_cqcnn_agent_for_network_games_{num_games}_n_qubits_{n_qubits}.json",
    )
    # exist_ok avoids the check-then-create race.
    os.makedirs(result_dir, exist_ok=True)
    with open(result_path, "a", encoding="utf-8") as f:
        print(rate_per_NN, file=f)
        f.write("\n")


In [None]:
# Classical baseline: "CCNN2" is the only classical network defined here.

# Train two CCNN2 agents against each other and save the first one's weights.
print("Train:")
train_cnn("CCNN2")

# Reload the saved weights and play them against the random policy.
print("Eval:")
evaluate_cnn("CCNN2")


Train:
CCNN2(
  (conv1): Conv2d(1, 16, kernel_size=(3, 3), stride=(1, 1), bias=False)
  (flatten): Flatten(start_dim=1, end_dim=-1)
  (linear): Linear(in_features=16, out_features=9, bias=True)
  (tanh): Tanh()
)


  target_q = torch.tensor(reward + self.discount * next_max)
100%|██████████| 250/250 [00:08<00:00, 28.44it/s]


result:
Player1   Draw   Player2
  162      22      66  
[1.0, 0.5, 0.3333333333333333, 0.25, 0.4, 0.3333333333333333, 0.42857142857142855, 0.5, 0.5555555555555556, 0.5, 0.5454545454545454, 0.5, 0.46153846153846156, 0.42857142857142855, 0.4666666666666667, 0.4375, 0.47058823529411764, 0.4444444444444444, 0.47368421052631576, 0.45, 0.47619047619047616, 0.45454545454545453, 0.4782608695652174, 0.4583333333333333, 0.48, 0.5, 0.48148148148148145, 0.5, 0.5172413793103449, 0.5, 0.5161290322580645, 0.5, 0.5151515151515151, 0.5, 0.4857142857142857, 0.4722222222222222, 0.4864864864864865, 0.47368421052631576, 0.46153846153846156, 0.45, 0.4634146341463415, 0.4523809523809524, 0.46511627906976744, 0.45454545454545453, 0.4444444444444444, 0.45652173913043476, 0.46808510638297873, 0.4583333333333333, 0.46938775510204084, 0.48, 0.47058823529411764, 0.46153846153846156, 0.4716981132075472, 0.48148148148148145, 0.4909090909090909, 0.5, 0.5087719298245614, 0.5, 0.5084745762711864, 0.5, 0.50819672131147

100%|██████████| 10000/10000 [01:11<00:00, 139.09it/s]

result:
Player1   Draw   Player2
  6704      1200      2096  
[1.0, 1.0, 1.0, 1.0, 1.0, 0.8333333333333334, 0.8571428571428571, 0.875, 0.8888888888888888, 0.8, 0.8181818181818182, 0.8333333333333334, 0.8461538461538461, 0.7857142857142857, 0.8, 0.8125, 0.8235294117647058, 0.8333333333333334, 0.8421052631578947, 0.85, 0.8571428571428571, 0.8636363636363636, 0.8695652173913043, 0.875, 0.88, 0.8846153846153846, 0.8888888888888888, 0.8928571428571429, 0.8620689655172413, 0.8333333333333334, 0.8064516129032258, 0.78125, 0.7878787878787878, 0.7941176470588235, 0.8, 0.8055555555555556, 0.8108108108108109, 0.7894736842105263, 0.7692307692307693, 0.75, 0.7560975609756098, 0.7380952380952381, 0.7209302325581395, 0.7272727272727273, 0.7333333333333333, 0.717391304347826, 0.723404255319149, 0.7083333333333334, 0.7142857142857143, 0.7, 0.7058823529411765, 0.7115384615384616, 0.7169811320754716, 0.7222222222222222, 0.7272727272727273, 0.7142857142857143, 0.7192982456140351, 0.7068965517241379, 0.711




In [None]:
# Hybrid classical-quantum model
# embedding_type: one of "ZFeatureMap", "ZZFeatureMap", "TPE", "HEE"
# ansatz_type: one of "RealAmplitudes", "EfficientSU2"
# nn_network: either 2 (Sampler) or 3 (Estimator)

# Positional arguments are (embedding_type, ansatz_type, n_qubits, nn_network).
print("Train:")
train_cqcnn("ZFeatureMap", "RealAmplitudes", 5, 2)

# Evaluation must use the same settings as training so the saved weights fit.
print("Eval:")
evaluate_cqcnn("ZFeatureMap", "RealAmplitudes", 5, 2)


Train:
CNN_QNN_CNN_sampler(
  (cqnn): Sequential(
    (0): Linear(in_features=9, out_features=5, bias=True)
    (1): ScratchTorchConnector()
    (2): Linear(in_features=32, out_features=9, bias=True)
    (3): Tanh()
  )
)


  target_q = torch.tensor(reward + self.discount * next_max)
100%|██████████| 250/250 [08:52<00:00,  2.13s/it]


result:
Player1   Draw   Player2
  199      15      36  
[1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.8888888888888888, 0.9, 0.9090909090909091, 0.9166666666666666, 0.9230769230769231, 0.9285714285714286, 0.8666666666666667, 0.875, 0.8235294117647058, 0.7777777777777778, 0.7894736842105263, 0.8, 0.8095238095238095, 0.8181818181818182, 0.782608695652174, 0.7916666666666666, 0.8, 0.7692307692307693, 0.7777777777777778, 0.7857142857142857, 0.7586206896551724, 0.7666666666666667, 0.7741935483870968, 0.78125, 0.7878787878787878, 0.7941176470588235, 0.8, 0.8055555555555556, 0.8108108108108109, 0.8157894736842105, 0.8205128205128205, 0.825, 0.8292682926829268, 0.8095238095238095, 0.7906976744186046, 0.7954545454545454, 0.8, 0.8043478260869565, 0.8085106382978723, 0.8125, 0.7959183673469388, 0.78, 0.7647058823529411, 0.7692307692307693, 0.7735849056603774, 0.7777777777777778, 0.7818181818181819, 0.7857142857142857, 0.7719298245614035, 0.7758620689655172, 0.7796610169491526, 0.7833333333333333, 0

100%|██████████| 10000/10000 [07:51<00:00, 21.22it/s]

result:
Player1   Draw   Player2
  6438      1473      2089  
[1.0, 0.5, 0.6666666666666666, 0.5, 0.6, 0.5, 0.42857142857142855, 0.5, 0.5555555555555556, 0.5, 0.5454545454545454, 0.5, 0.5384615384615384, 0.5, 0.4666666666666667, 0.4375, 0.47058823529411764, 0.5, 0.5263157894736842, 0.55, 0.5714285714285714, 0.5909090909090909, 0.6086956521739131, 0.5833333333333334, 0.6, 0.6153846153846154, 0.5925925925925926, 0.6071428571428571, 0.6206896551724138, 0.6333333333333333, 0.6451612903225806, 0.65625, 0.6666666666666666, 0.6764705882352942, 0.6571428571428571, 0.6666666666666666, 0.6756756756756757, 0.6842105263157895, 0.6923076923076923, 0.675, 0.6829268292682927, 0.6904761904761905, 0.6976744186046512, 0.7045454545454546, 0.6888888888888889, 0.6956521739130435, 0.7021276595744681, 0.7083333333333334, 0.6938775510204082, 0.68, 0.6862745098039216, 0.6923076923076923, 0.6792452830188679, 0.6851851851851852, 0.6909090909090909, 0.6785714285714286, 0.6842105263157895, 0.6896551724137931, 0.69




# おまけ

In [None]:
# Interactive game: load the trained hybrid agent and build the widget board.
game = TicTacToe()
# ai_agent = RandomPolicy(player=-1)
num_games=250
n_qubits=5
# Same file name pattern that train_cqcnn writes.
model_path = os.path.join(
    "./models/train/",
    f"train_cqcnn_agent_games_{num_games}_n_qubits_{n_qubits}.pth"
)
model_weights = torch.load(model_path)
ai_agent = CQCAgent(
    embedding_type="ZFeatureMap",
    ansatz_type="RealAmplitudes",
    nn_network=2,
    n_qubits=n_qubits,
    feature_map_reps=1,
    ansatz_reps=1,
)
ai_agent.HNN.load_state_dict(model_weights)

# Evaluation mode: no exploration and no updates.
ai_agent.eval()

human_player = 1  # O
ai_player = -1    # X

# --- Build the UI elements ---
# 3x3 grid of blank buttons, one per board cell.
buttons = [[widgets.Button(description=' ', layout=widgets.Layout(width='60px', height='60px', margin='2px'), button_style='') for _ in range(3)] for _ in range(3)]
message_label = widgets.Label(value="あなたの番です (O)")
reset_button = widgets.Button(description="リセット")
output_area = widgets.Output() # for error messages and extra information

# Lay the buttons out as a grid
button_rows = [widgets.HBox(row) for row in buttons]
board_widget = widgets.VBox(button_rows)

# --- UI更新関数 ---
def update_board_visuals():
    """Refresh every board button so it mirrors the current game state.

    Cells owned by the human show 'O' (style 'success'), cells owned by the
    AI show 'X' (style 'danger'); both are disabled. Empty cells show a blank
    label with no style and are disabled only once the game is over.
    """
    board_state = game.board.cpu().numpy()
    # The game-over status cannot change while repainting, so query it once
    # instead of once per empty cell. bool() keeps the value acceptable to
    # the widget's boolean `disabled` trait whatever gameover() returns.
    game_is_over = bool(game.gameover())

    for r, button_row in enumerate(buttons):
        for c, button in enumerate(button_row):
            cell = board_state[r, c]
            # Each trait is assigned exactly once per button so the front end
            # never sees an intermediate "enabled, unstyled" state.
            if cell == human_player:
                button.description = 'O'
                button.button_style = 'success'  # Greenish style.
                button.disabled = True
            elif cell == ai_player:
                button.description = 'X'
                button.button_style = 'danger'  # Reddish style.
                button.disabled = True
            else:
                button.description = ' '
                button.button_style = ''
                button.disabled = game_is_over


def show_game_result():
    """Announce the outcome in the message label and lock the board.

    The winner reported by ``game.checkwinner()`` is compared against the
    human and AI markers; any other value is reported as a draw.
    """
    winner = game.checkwinner()
    if winner == human_player:
        outcome_text = "あなたの勝利です！ 🎉"
    elif winner == ai_player:
        outcome_text = "AIの勝利です! 🤖"
    else:
        # Neither side won: treat as a draw.
        outcome_text = "引き分けです。🤝"
    message_label.value = outcome_text

    # No further moves are allowed, so disable every cell.
    for button_row in buttons:
        for button in button_row:
            button.disabled = True

    # Highlight the reset button as the next thing to press.
    reset_button.button_style = 'info'

# --- AIのターン処理 ---
def ai_play():
    """Play one move for the AI side, then hand the turn back to the human.

    Returns immediately if the game is already over or if
    ``game.get_possible_actions()`` yields nothing. The move is drawn with
    ``random.choice`` from the possible actions.

    NOTE(review): the ``ai_agent`` loaded earlier in this cell is not
    consulted here; the commented-out ``ai_agent.action(game)`` call below
    looks like the intended move source -- confirm the agent API before
    switching over.
    """
    if game.gameover():
        return

    message_label.value = "AIが考え中です..."

    # ai_action = ai_agent.action(game)
    candidate_moves = game.get_possible_actions()
    if not candidate_moves:
        return
    chosen_move = random.choice(candidate_moves)  # Same as a random policy.

    if chosen_move and game.is_valid_action(chosen_move, ai_player):
        game.place(chosen_move, ai_player)
        update_board_visuals()
    else:
        # The AI failed to produce a legal move (not expected to happen).
        with output_area:
            output_area.clear_output()
            print("AIが有効な手を打てませんでした。")

    # Either way: finish the game if it has ended, otherwise prompt the human.
    if game.gameover():
        show_game_result()
    else:
        message_label.value = "あなたの番です (O)"


# --- ボタンのコールバック関数 ---
def on_button_clicked(b):
    """Handle a click on one of the board buttons.

    Places the human's mark on the clicked cell when the game accepts the
    move, repaints the board, and then either shows the result or lets the
    AI reply. A rejected move is reported in ``output_area``.

    Args:
        b: The button widget that was clicked.
    """
    if game.gameover():
        return

    # Find the grid coordinates of the clicked widget (identity match).
    clicked_position = next(
        (
            (r_idx, c_idx)
            for r_idx, button_row in enumerate(buttons)
            for c_idx, button in enumerate(button_row)
            if button is b
        ),
        None,
    )
    if clicked_position is None:
        # Not one of the board buttons: ignore rather than continue with a
        # (-1, -1) placeholder that negative indexing could map to a real cell.
        return
    r_clicked, c_clicked = clicked_position

    action = [r_clicked, c_clicked]
    if not game.is_valid_action(action, human_player):
        # Report the rejected move in the output area.
        with output_area:
            output_area.clear_output()
            print(f"マス ({r_clicked}, {c_clicked}) は既に選択されているか、無効な場所です。")
        return

    game.place(action, human_player)
    update_board_visuals()

    if game.gameover():
        show_game_result()
    else:
        # Hand the turn to the AI.
        ai_play()


def on_reset_clicked(b):
    """Start a fresh game when the reset button is pressed.

    Args:
        b: The clicked widget supplied by ``on_click``; not used.
    """
    game.reset_board()
    update_board_visuals()

    # Restore the reset button's default look and the human-turn prompt.
    reset_button.button_style = ''
    message_label.value = "あなたの番です (O)"

    # Drop any message left over in the output area.
    with output_area:
        output_area.clear_output()

# Route every board button's click to the shared handler.
for button_row in buttons:
    for board_button in button_row:
        board_button.on_click(on_button_clicked)

reset_button.on_click(on_reset_clicked)

# --- Show the UI ---
# Paint the board's initial state before it is displayed.
update_board_visuals()

# Stack the label, board, reset button and output area vertically and render.
ui = widgets.VBox([message_label, board_widget, reset_button, output_area])
display(ui)


VBox(children=(Label(value='あなたの番です (O)'), VBox(children=(HBox(children=(Button(description=' ', layout=Layout…