diff --git a/qiskit_machine_learning/algorithms/classifiers/vqc.py b/qiskit_machine_learning/algorithms/classifiers/vqc.py index d08748fc8..a68d18cc4 100644 --- a/qiskit_machine_learning/algorithms/classifiers/vqc.py +++ b/qiskit_machine_learning/algorithms/classifiers/vqc.py @@ -12,7 +12,7 @@ """An implementation of variational quantum classifier.""" from __future__ import annotations -from typing import Callable, cast +from typing import Callable import numpy as np @@ -20,8 +20,10 @@ from qiskit.providers import Backend from qiskit.utils import QuantumInstance from qiskit.algorithms.optimizers import Optimizer, OptimizerResult +from qiskit.primitives import BaseSampler -from ...neural_networks import CircuitQNN +from ...deprecation import warn_deprecated, DeprecatedType +from ...neural_networks import CircuitQNN, SamplerQNN from ...utils import derive_num_qubits_feature_map_ansatz from ...utils.loss_functions import Loss @@ -56,32 +58,42 @@ def __init__( quantum_instance: QuantumInstance | Backend | None = None, initial_point: np.ndarray | None = None, callback: Callable[[np.ndarray, float], None] | None = None, + *, + sampler: BaseSampler | None = None, ) -> None: """ Args: - num_qubits: The number of qubits for the underlying - :class:`~qiskit_machine_learning.neural_networks.CircuitQNN`. If ``None`` is given, - the number of qubits is derived from the feature map or ansatz. If neither of those - is given, raises an exception. The number of qubits in the feature map and ansatz - are adjusted to this number if required. + num_qubits: The number of qubits for the underlying QNN. + If ``None`` is given, the number of qubits is derived from the + feature map or ansatz. If neither of those is given, raises an exception. + The number of qubits in the feature map and ansatz are adjusted to this + number if required. feature_map: The (parametrized) circuit to be used as a feature map for the underlying - :class:`~qiskit_machine_learning.neural_networks.CircuitQNN`. If ``None`` is given, - the ``ZZFeatureMap`` is used if the number of qubits is larger than 1. For a single - qubit classification problem the ``ZFeatureMap`` is used per default. + QNN. If ``None`` is given, the ``ZZFeatureMap`` is used if the number of qubits + is larger than 1. For a single qubit classification problem the ``ZFeatureMap`` + is used by default. ansatz: The (parametrized) circuit to be used as an ansatz for the underlying - :class:`~qiskit_machine_learning.neural_networks.CircuitQNN`. If ``None`` is given - then the ``RealAmplitudes`` circuit is used. + QNN. If ``None`` is given then the ``RealAmplitudes`` circuit is used. loss: A target loss function to be used in training. Default value is ``cross_entropy``. optimizer: An instance of an optimizer to be used in training. When ``None`` defaults to SLSQP. warm_start: Use weights from previous fit to start next fit. - quantum_instance: The quantum instance to execute circuits on. + quantum_instance: Deprecated: If a quantum instance is sent and ``sampler`` is ``None``, + the underlying QNN will be of type + :class:`~qiskit_machine_learning.neural_networks.CircuitQNN`, and the quantum + instance will be used to compute the neural network's results. If a sampler + instance is also set, it will override the `quantum_instance` parameter and + a :class:`~qiskit_machine_learning.neural_networks.SamplerQNN` + will be used instead. initial_point: Initial point for the optimizer to start from. 
callback: a reference to a user's callback function that has two parameters and returns ``None``. The callback can access intermediate data during training. On each iteration an optimizer invokes the callback and passes current weights as an array and a computed value as a float of the objective function being optimized. This allows to track how well optimization / training process is going on. + sampler: If a sampler instance is sent, the underlying QNN will be of type + :class:`~qiskit_machine_learning.neural_networks.SamplerQNN`, and the sampler + primitive will be used to compute the neural network's results. Raises: QiskitMachineLearningError: Needs at least one out of ``num_qubits``, ``feature_map`` or ``ansatz`` to be given. Or the number of qubits in the feature map and/or ansatz @@ -100,16 +112,32 @@ def __init__( self._circuit.compose(self.feature_map, inplace=True) self._circuit.compose(self.ansatz, inplace=True) - # construct circuit QNN - neural_network = CircuitQNN( - self._circuit, - input_params=self.feature_map.parameters, - weight_params=self.ansatz.parameters, - interpret=self._get_interpret(2), - output_shape=2, - quantum_instance=quantum_instance, - input_gradients=False, - ) + # needed for mypy + neural_network: SamplerQNN | CircuitQNN = None + if quantum_instance is not None and sampler is None: + warn_deprecated( + "0.5.0", DeprecatedType.ARGUMENT, old_name="quantum_instance", new_name="sampler" + ) + neural_network = CircuitQNN( + self._circuit, + input_params=self.feature_map.parameters, + weight_params=self.ansatz.parameters, + interpret=self._get_interpret(2), + output_shape=2, + quantum_instance=quantum_instance, + input_gradients=False, + ) + else: + # construct sampler QNN by default + neural_network = SamplerQNN( + sampler=sampler, + circuit=self._circuit, + input_params=self.feature_map.parameters, + weight_params=self.ansatz.parameters, + interpret=self._get_interpret(2), + output_shape=2, + input_gradients=False, + ) super().__init__( neural_network=neural_network, @@ -154,9 +182,10 @@ def _fit_internal(self, X: np.ndarray, y: np.ndarray) -> OptimizerResult: """ X, y = self._validate_input(X, y) num_classes = self._num_classes - cast(CircuitQNN, self._neural_network).set_interpret( - self._get_interpret(num_classes), num_classes - ) + + # instance check required by mypy (alternative to cast) + if isinstance(self._neural_network, (CircuitQNN, SamplerQNN)): + self._neural_network.set_interpret(self._get_interpret(num_classes), num_classes) return super()._minimize(X, y) diff --git a/qiskit_machine_learning/neural_networks/__init__.py b/qiskit_machine_learning/neural_networks/__init__.py index e0c0186fe..737e54d3d 100644 --- a/qiskit_machine_learning/neural_networks/__init__.py +++ b/qiskit_machine_learning/neural_networks/__init__.py @@ -47,6 +47,7 @@ TwoLayerQNN CircuitQNN EstimatorQNN + SamplerQNN Neural Network Metrics ====================== @@ -57,7 +58,6 @@ EffectiveDimension LocalEffectiveDimension - """ from .circuit_qnn import CircuitQNN @@ -67,6 +67,7 @@ from .opflow_qnn import OpflowQNN from .sampling_neural_network import SamplingNeuralNetwork from .two_layer_qnn import TwoLayerQNN +from .sampler_qnn import SamplerQNN __all__ = [ "NeuralNetwork", @@ -77,4 +78,5 @@ "EffectiveDimension", "LocalEffectiveDimension", "EstimatorQNN", + "SamplerQNN", ] diff --git a/qiskit_machine_learning/neural_networks/circuit_qnn.py b/qiskit_machine_learning/neural_networks/circuit_qnn.py index 21d029157..cfeb16b57 100644 --- 
a/qiskit_machine_learning/neural_networks/circuit_qnn.py +++ b/qiskit_machine_learning/neural_networks/circuit_qnn.py @@ -46,7 +46,7 @@ class SparseArray: # type: ignore class CircuitQNN(SamplingNeuralNetwork): - """A Sampling Neural Network based on a given quantum circuit.""" + """A sampling neural network based on a given quantum circuit.""" def __init__( self, diff --git a/qiskit_machine_learning/neural_networks/estimator_qnn.py b/qiskit_machine_learning/neural_networks/estimator_qnn.py index d1a6bec01..09ebf5ed0 100644 --- a/qiskit_machine_learning/neural_networks/estimator_qnn.py +++ b/qiskit_machine_learning/neural_networks/estimator_qnn.py @@ -171,29 +171,6 @@ def input_gradients(self, input_gradients: bool) -> None: """Turn on/off computation of gradients with respect to input data.""" self._input_gradients = input_gradients - def _preprocess( - self, - input_data: np.ndarray | None, - weights: np.ndarray | None, - ) -> tuple[np.ndarray | None, int | None]: - """ - Pre-processing during forward pass of the network. - """ - if input_data is not None: - num_samples = input_data.shape[0] - if weights is not None: - weights = np.broadcast_to(weights, (num_samples, len(weights))) - parameters = np.concatenate((input_data, weights), axis=1) - else: - parameters = input_data - else: - if weights is not None: - num_samples = 1 - parameters = np.broadcast_to(weights, (num_samples, len(weights))) - else: - return None, None - return parameters, num_samples - def _forward_postprocess(self, num_samples: int, result: EstimatorResult) -> np.ndarray: """Post-processing during forward pass of the network.""" if num_samples is None: @@ -205,7 +182,7 @@ def _forward( self, input_data: np.ndarray | None, weights: np.ndarray | None ) -> np.ndarray | None: """Forward pass of the neural network.""" - parameter_values_, num_samples = self._preprocess(input_data, weights) + parameter_values_, num_samples = self._preprocess_forward(input_data, weights) if num_samples is None: job = self.estimator.run(self._circuit, self._observables) else: @@ -250,7 +227,7 @@ def _backward( ) -> tuple[np.ndarray | None, np.ndarray]: """Backward pass of the network.""" # prepare parameters in the required format - parameter_values_, num_samples = self._preprocess(input_data, weights) + parameter_values_, num_samples = self._preprocess_forward(input_data, weights) if num_samples is None or (not self._input_gradients and self._num_weights == 0): return None, None diff --git a/qiskit_machine_learning/neural_networks/neural_network.py b/qiskit_machine_learning/neural_networks/neural_network.py index 643464f9b..67ac5b824 100644 --- a/qiskit_machine_learning/neural_networks/neural_network.py +++ b/qiskit_machine_learning/neural_networks/neural_network.py @@ -13,9 +13,9 @@ """A Neural Network abstract class for all (quantum) neural networks within Qiskit's machine learning module.""" +from __future__ import annotations from abc import ABC, abstractmethod -from typing import Tuple, Union, List, Optional import numpy as np @@ -45,7 +45,7 @@ def __init__( num_inputs: int, num_weights: int, sparse: bool, - output_shape: Union[int, Tuple[int, ...]], + output_shape: int | tuple[int, ...], input_gradients: bool = False, ) -> None: """ @@ -92,7 +92,7 @@ def sparse(self) -> bool: return self._sparse @property - def output_shape(self) -> Tuple[int, ...]: + def output_shape(self) -> tuple[int, ...]: """Returns the output shape.""" return self._output_shape @@ -117,8 +117,8 @@ def _validate_output_shape(self, output_shape): return 
output_shape def _validate_input( - self, input_data: Optional[Union[List[float], np.ndarray, float]] - ) -> Tuple[Union[np.ndarray, None], Union[Tuple[int, ...], None]]: + self, input_data: float | list[float] | np.ndarray | None + ) -> tuple[np.ndarray | None, tuple[int, ...] | None]: if input_data is None: return None, None input_ = np.array(input_data) @@ -144,16 +144,39 @@ def _validate_input( return input_, shape + def _preprocess_forward( + self, + input_data: np.ndarray | None, + weights: np.ndarray | None, + ) -> tuple[np.ndarray | None, int | None]: + """ + Pre-processing during forward pass of the network for the primitive-based networks. + """ + if input_data is not None: + num_samples = input_data.shape[0] + if weights is not None: + weights = np.broadcast_to(weights, (num_samples, len(weights))) + parameters = np.concatenate((input_data, weights), axis=1) + else: + parameters = input_data + else: + if weights is not None: + num_samples = 1 + parameters = np.broadcast_to(weights, (num_samples, len(weights))) + else: + return None, None + return parameters, num_samples + def _validate_weights( - self, weights: Optional[Union[List[float], np.ndarray, float]] - ) -> Union[np.ndarray, None]: + self, weights: float | list[float] | np.ndarray | None + ) -> np.ndarray | None: if weights is None: return None weights_ = np.array(weights) return weights_.reshape(self._num_weights) def _validate_forward_output( - self, output_data: np.ndarray, original_shape: Tuple[int, ...] + self, output_data: np.ndarray, original_shape: tuple[int, ...] ) -> np.ndarray: if original_shape and len(original_shape) >= 2: output_data = output_data.reshape((*original_shape[:-1], *self._output_shape)) @@ -164,8 +187,8 @@ def _validate_backward_output( self, input_grad: np.ndarray, weight_grad: np.ndarray, - original_shape: Tuple[int, ...], - ) -> Tuple[Union[np.ndarray, SparseArray], Union[np.ndarray, SparseArray]]: + original_shape: tuple[int, ...], + ) -> tuple[np.ndarray | SparseArray, np.ndarray | SparseArray]: if input_grad is not None and np.prod(input_grad.shape) == 0: input_grad = None if input_grad is not None and original_shape and len(original_shape) >= 2: @@ -183,9 +206,9 @@ def _validate_backward_output( def forward( self, - input_data: Optional[Union[List[float], np.ndarray, float]], - weights: Optional[Union[List[float], np.ndarray, float]], - ) -> Union[np.ndarray, SparseArray]: + input_data: float | list[float] | np.ndarray | None, + weights: float | list[float] | np.ndarray | None, + ) -> np.ndarray | SparseArray: """Forward pass of the network. Args: @@ -203,15 +226,15 @@ def forward( @abstractmethod def _forward( - self, input_data: Optional[np.ndarray], weights: Optional[np.ndarray] - ) -> Union[np.ndarray, SparseArray]: + self, input_data: np.ndarray | None, weights: np.ndarray | None + ) -> np.ndarray | SparseArray: raise NotImplementedError def backward( self, - input_data: Optional[Union[List[float], np.ndarray, float]], - weights: Optional[Union[List[float], np.ndarray, float]], - ) -> Tuple[Optional[Union[np.ndarray, SparseArray]], Optional[Union[np.ndarray, SparseArray]],]: + input_data: float | list[float] | np.ndarray | None, + weights: float | list[float] | np.ndarray | None, + ) -> tuple[np.ndarray | SparseArray | None, np.ndarray | SparseArray | None]: """Backward pass of the network. 
Args: @@ -236,6 +259,6 @@ def backward( @abstractmethod def _backward( - self, input_data: Optional[np.ndarray], weights: Optional[np.ndarray] - ) -> Tuple[Optional[Union[np.ndarray, SparseArray]], Optional[Union[np.ndarray, SparseArray]],]: + self, input_data: np.ndarray | None, weights: np.ndarray | None + ) -> tuple[np.ndarray | SparseArray | None, np.ndarray | SparseArray | None]: raise NotImplementedError diff --git a/qiskit_machine_learning/neural_networks/sampler_qnn.py b/qiskit_machine_learning/neural_networks/sampler_qnn.py new file mode 100644 index 000000000..a786bf6a0 --- /dev/null +++ b/qiskit_machine_learning/neural_networks/sampler_qnn.py @@ -0,0 +1,405 @@ +# This code is part of Qiskit. +# +# (C) Copyright IBM 2022. +# +# This code is licensed under the Apache License, Version 2.0. You may +# obtain a copy of this license in the LICENSE.txt file in the root directory +# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. +# +# Any modifications or derivative works of this code must retain this +# copyright notice, and modified files need to carry a notice indicating +# that they have been altered from the originals. + +"""A Neural Network implementation based on the Sampler primitive.""" + +from __future__ import annotations +import logging + +from numbers import Integral +from typing import Callable, cast, Iterable, Sequence + +import numpy as np +from qiskit.algorithms.gradients import ( + BaseSamplerGradient, + ParamShiftSamplerGradient, + SamplerGradientResult, +) +from qiskit.circuit import Parameter, QuantumCircuit +from qiskit.primitives import BaseSampler, SamplerResult, Sampler +from qiskit_machine_learning.exceptions import QiskitMachineLearningError +import qiskit_machine_learning.optionals as _optionals + +from .neural_network import NeuralNetwork + +if _optionals.HAS_SPARSE: + # pylint: disable=import-error + from sparse import SparseArray +else: + + class SparseArray: # type: ignore + """Empty SparseArray class + Replacement if sparse.SparseArray is not present. + """ + + pass + + +logger = logging.getLogger(__name__) + + +class SamplerQNN(NeuralNetwork): + """A neural network implementation based on the Sampler primitive. + + The ``SamplerQNN`` is a neural network that takes in a parametrized quantum circuit + with designated parameters for input data and/or weights and translates the quasi-probabilities + estimated by the :class:`~qiskit.primitives.Sampler` primitive into predicted classes. Quite + often, a combined quantum circuit is used. Such a circuit is built from two circuits: + a feature map, it provides input parameters for the network, and an ansatz (weight parameters). + + The output can be set up in different formats, and an optional post-processing step + can be used to interpret the sampler's output in a particular context (e.g. mapping the + resulting bitstring to match the number of classes). + + In this example the network maps the output of the quantum circuit to two classes via a custom + `interpret` function: + + .. 
code-block:: + + from qiskit import QuantumCircuit + from qiskit.circuit.library import ZZFeatureMap, RealAmplitudes + + from qiskit_machine_learning.neural_networks import SamplerQNN + + num_qubits = 2 + feature_map = ZZFeatureMap(feature_dimension=num_qubits) + ansatz = RealAmplitudes(num_qubits=num_qubits, reps=1) + + qc = QuantumCircuit(num_qubits) + qc.compose(feature_map, inplace=True) + qc.compose(ansatz, inplace=True) + + + def parity(x): + return "{:b}".format(x).count("1") % 2 + + + qnn = SamplerQNN( + circuit=qc, + input_params=feature_map.parameters, + weight_params=ansatz.parameters, + interpret=parity, + output_shape=2 + ) + + qnn.forward(input_data=[1, 2], weights=[1, 2, 3, 4]) + + The following attributes can be set via the constructor but can also be read and + updated once the SamplerQNN object has been constructed. + + Attributes: + + sampler (BaseSampler): The sampler primitive used to compute the neural network's results. + gradient (BaseSamplerGradient): A sampler gradient to be used for the backward pass. + """ + + def __init__( + self, + *, + circuit: QuantumCircuit, + sampler: BaseSampler | None = None, + input_params: Sequence[Parameter] | None = None, + weight_params: Sequence[Parameter] | None = None, + sparse: bool = False, + interpret: Callable[[int], int | tuple[int, ...]] | None = None, + output_shape: int | tuple[int, ...] | None = None, + gradient: BaseSamplerGradient | None = None, + input_gradients: bool = False, + ): + """ + Args: + sampler: The sampler primitive used to compute the neural network's results. + If ``None`` is given, a default instance of the reference sampler defined + by :class:`~qiskit.primitives.Sampler` will be used. + circuit: The parametrized quantum circuit that generates the samples of this network. + input_params: The parameters of the circuit corresponding to the input. + weight_params: The parameters of the circuit corresponding to the trainable weights. + sparse: Returns whether the output is sparse or not. + interpret: A callable that maps the measured integer to another unsigned integer or + tuple of unsigned integers. These are used as new indices for the (potentially + sparse) output array. If no interpret function is + passed, then an identity function will be used by this neural network. + output_shape: The output shape of the custom interpretation. It is ignored if no custom + interpret method is provided where the shape is taken to be + ``2^circuit.num_qubits``.. + gradient: An optional sampler gradient to be used for the backward pass. + If ``None`` is given, a default instance of + :class:`~qiskit.algorithms.gradients.ParamShiftSamplerGradient` will be used. + input_gradients: Determines whether to compute gradients with respect to input data. + Note that this parameter is ``False`` by default, and must be explicitly set to + ``True`` for a proper gradient computation when using + :class:`~qiskit_machine_learning.connectors.TorchConnector`. + Raises: + QiskitMachineLearningError: Invalid parameter values. 
+ """ + # set primitive, provide default + if sampler is None: + sampler = Sampler() + self.sampler = sampler + + # set gradient + if gradient is None: + gradient = ParamShiftSamplerGradient(self.sampler) + self.gradient = gradient + + self._circuit = circuit.copy() + if len(self._circuit.clbits) == 0: + self._circuit.measure_all() + + if input_params is None: + input_params = [] + self._input_params = list(input_params) + + if weight_params is None: + weight_params = [] + self._weight_params = list(weight_params) + + if sparse: + _optionals.HAS_SPARSE.require_now("DOK") + + self.set_interpret(interpret, output_shape) + self._input_gradients = input_gradients + + super().__init__( + num_inputs=len(self._input_params), + num_weights=len(self._weight_params), + sparse=sparse, + output_shape=self._output_shape, + input_gradients=self._input_gradients, + ) + + @property + def circuit(self) -> QuantumCircuit: + """Returns the underlying quantum circuit.""" + return self._circuit + + @property + def input_params(self) -> Sequence[Parameter]: + """Returns the list of input parameters.""" + return self._input_params + + @property + def weight_params(self) -> Sequence[Parameter]: + """Returns the list of trainable weights parameters.""" + return self._weight_params + + @property + def interpret(self) -> Callable[[int], int | tuple[int, ...]] | None: + """Returns interpret function to be used by the neural network. If it is not set in + the constructor or can not be implicitly derived, then ``None`` is returned.""" + return self._interpret + + def set_interpret( + self, + interpret: Callable[[int], int | tuple[int, ...]] | None = None, + output_shape: int | tuple[int, ...] | None = None, + ) -> None: + """Change 'interpret' and corresponding 'output_shape'. + + Args: + interpret: A callable that maps the measured integer to another unsigned integer or + tuple of unsigned integers. See constructor for more details. + output_shape: The output shape of the custom interpretation. It is ignored if no custom + interpret method is provided where the shape is taken to be + ``2^circuit.num_qubits``. + """ + + # derive target values to be used in computations + self._output_shape = self._compute_output_shape(interpret, output_shape) + self._interpret = interpret if interpret is not None else lambda x: x + + def _compute_output_shape( + self, + interpret: Callable[[int], int | tuple[int, ...]] | None = None, + output_shape: int | tuple[int, ...] | None = None, + ) -> tuple[int, ...]: + """Validate and compute the output shape.""" + + # this definition is required by mypy + output_shape_: tuple[int, ...] = (-1,) + + if interpret is not None: + if output_shape is None: + raise QiskitMachineLearningError( + "No output shape given; it's required when using custom interpret!" + ) + if isinstance(output_shape, Integral): + output_shape = int(output_shape) + output_shape_ = (output_shape,) + else: + output_shape_ = output_shape # type: ignore + else: + if output_shape is not None: + # Warn user that output_shape parameter will be ignored + logger.warning( + "No interpret function given, output_shape will be automatically " + "determined as 2^num_qubits." + ) + output_shape_ = (2**self._circuit.num_qubits,) + + return output_shape_ + + def _postprocess(self, num_samples: int, result: SamplerResult) -> np.ndarray | SparseArray: + """ + Post-processing during forward pass of the network. 
+ """ + + if self._sparse: + # pylint: disable=import-error + from sparse import DOK + + prob = DOK((num_samples, *self._output_shape)) + else: + prob = np.zeros((num_samples, *self._output_shape)) + + for i in range(num_samples): + counts = result.quasi_dists[i] + shots = sum(counts.values()) + + # evaluate probabilities + for b, v in counts.items(): + key = self._interpret(b) + if isinstance(key, Integral): + key = (cast(int, key),) + key = (i, *key) # type: ignore + prob[key] += v / shots + + if self._sparse: + return prob.to_coo() + else: + return prob + + def _postprocess_gradient( + self, num_samples: int, results: SamplerGradientResult + ) -> tuple[np.ndarray | SparseArray | None, np.ndarray | SparseArray]: + """ + Post-processing during backward pass of the network. + """ + + if self._sparse: + # pylint: disable=import-error + from sparse import DOK + + input_grad = ( + DOK((num_samples, *self._output_shape, self._num_inputs)) + if self._input_gradients + else None + ) + weights_grad = DOK((num_samples, *self._output_shape, self._num_weights)) + else: + + input_grad = ( + np.zeros((num_samples, *self._output_shape, self._num_inputs)) + if self._input_gradients + else None + ) + weights_grad = np.zeros((num_samples, *self._output_shape, self._num_weights)) + + if self._input_gradients: + num_grad_vars = self._num_inputs + self._num_weights + else: + num_grad_vars = self._num_weights + + for sample in range(num_samples): + for i in range(num_grad_vars): + grad = results.gradients[sample][i] + for k, val in grad.items(): + # get index for input or weights gradients + if self._input_gradients: + grad_index = i if i < self._num_inputs else i - self._num_inputs + else: + grad_index = i + + # interpret integer and construct key + key = self._interpret(k) + if isinstance(key, Integral): + key = (sample, int(key), grad_index) + else: + # if key is an array-type, cast to hashable tuple + key = tuple(cast(Iterable[int], key)) + key = (sample, *key, grad_index) + + # store value for inputs or weights gradients + if self._input_gradients: + # we compute input gradients first + if i < self._num_inputs: + input_grad[key] += val + else: + weights_grad[key] += val + else: + weights_grad[key] += val + + if self._sparse: + if self._input_gradients: + input_grad = input_grad.to_coo() # pylint: disable=no-member + weights_grad = weights_grad.to_coo() + + return input_grad, weights_grad + + def _forward( + self, + input_data: np.ndarray | None, + weights: np.ndarray | None, + ) -> np.ndarray | SparseArray | None: + """ + Forward pass of the network. 
+ """ + parameter_values, num_samples = self._preprocess_forward(input_data, weights) + + if num_samples is not None and np.prod(parameter_values.shape) > 0: + # sampler allows batching + job = self.sampler.run([self._circuit] * num_samples, parameter_values) + try: + results = job.result() + except Exception as exc: + raise QiskitMachineLearningError("Sampler job failed.") from exc + result = self._postprocess(num_samples, results) + else: + result = None + + return result + + def _backward( + self, + input_data: np.ndarray | None, + weights: np.ndarray | None, + ) -> tuple[np.ndarray | SparseArray | None, np.ndarray | SparseArray | None]: + + """Backward pass of the network.""" + # prepare parameters in the required format + parameter_values, num_samples = self._preprocess_forward(input_data, weights) + + results = None + if num_samples is not None and np.prod(parameter_values.shape) > 0: + if self._input_gradients: + job = self.gradient.run([self._circuit] * num_samples, parameter_values) + try: + results = job.result() + except Exception as exc: + raise QiskitMachineLearningError("Sampler job failed.") from exc + else: + if len(parameter_values[0]) > self._num_inputs: + job = self.gradient.run( + [self._circuit] * num_samples, + parameter_values, + parameters=[self._circuit.parameters[self._num_inputs :]] * num_samples, + ) + try: + results = job.result() + except Exception as exc: + raise QiskitMachineLearningError("Sampler job failed.") from exc + + if results is None: + return None, None + + input_grad, weights_grad = self._postprocess_gradient(num_samples, results) + return input_grad, weights_grad # `None` for gradients wrt input data, see TorchConnector diff --git a/qiskit_machine_learning/neural_networks/sampling_neural_network.py b/qiskit_machine_learning/neural_networks/sampling_neural_network.py index 525abf1e3..72c8fa3e7 100644 --- a/qiskit_machine_learning/neural_networks/sampling_neural_network.py +++ b/qiskit_machine_learning/neural_networks/sampling_neural_network.py @@ -35,7 +35,7 @@ class SparseArray: # type: ignore class SamplingNeuralNetwork(NeuralNetwork): """ - A Sampling Neural Network abstract class for all (quantum) neural networks within Qiskit's + A sampling neural network abstract class for all (quantum) neural networks within Qiskit's machine learning module that generate samples instead of (expected) values. """ diff --git a/releasenotes/notes/add-sampler-qnn-a093431afc1c5441.yaml b/releasenotes/notes/add-sampler-qnn-a093431afc1c5441.yaml new file mode 100644 index 000000000..490b99bd2 --- /dev/null +++ b/releasenotes/notes/add-sampler-qnn-a093431afc1c5441.yaml @@ -0,0 +1,58 @@ +--- +features: + - | + Introduced Sampler Quantum Neural Network + (:class:`~qiskit_machine_learning.neural_networks.SamplerQNN`) based on (runtime) primitives. + This implementation leverages the sampler primitive + (see :class:`~qiskit.primitives.BaseSampler`) and the sampler gradients + (see :class:`~qiskit.algorithms.gradients.BaseSamplerGradient`) to enable runtime access and + more efficient computation of the forward and backward passes. + + The new :class:`~qiskit_machine_learning.neural_networks.SamplerQNN` exposes a similar + interface to the :class:`~qiskit_machine_learning.neural_networks.CircuitQNN`, with a + few differences. One is the `quantum_instance` parameter. This parameter does not have + a direct replacement, and instead the `sampler` parameter must be used.
The `gradient` parameter + keeps the same name as in the :class:`~qiskit_machine_learning.neural_networks.CircuitQNN` + implementation, but it no longer accepts Opflow gradient classes as inputs; + instead, this parameter expects an (optionally custom) primitive gradient. The `sampling` option + has been removed for the time being, as this information is not currently exposed by the `Sampler`, + and might correspond to future lower-level primitives. + + The existing training algorithms such as :class:`~qiskit_machine_learning.algorithms.VQC`, + that were based on the :class:`~qiskit_machine_learning.neural_networks.CircuitQNN`, are updated + to accept both implementations. The implementation + of :class:`~qiskit_machine_learning.algorithms.NeuralNetworkClassifier` has not changed. + + For example, a :class:`~qiskit_machine_learning.algorithms.VQC` using + :class:`~qiskit_machine_learning.neural_networks.SamplerQNN` can be trained as follows: + + .. code-block:: python + + from qiskit.circuit.library import ZZFeatureMap, RealAmplitudes + from qiskit.algorithms.optimizers import COBYLA + from qiskit.primitives import Sampler + from sklearn.datasets import make_blobs + + from qiskit_machine_learning.algorithms import VQC + + # generate a simple dataset with two features + num_inputs = 2 + features, labels = make_blobs(n_samples=20, n_features=num_inputs, centers=2, center_box=(-1, 1), cluster_std=0.1) + + # construct feature map + feature_map = ZZFeatureMap(num_inputs) + + # construct ansatz + ansatz = RealAmplitudes(num_inputs, reps=1) + + # construct sampler primitive + sampler = Sampler() + + # construct variational quantum classifier + vqc = VQC( + sampler=sampler, + feature_map=feature_map, + ansatz=ansatz, + loss="cross_entropy", + optimizer=COBYLA(maxiter=30), + ) + + # fit classifier to data + vqc.fit(features, labels) diff --git a/test/algorithms/classifiers/test_vqc.py b/test/algorithms/classifiers/test_vqc.py index e106162b2..be9bac8ff 100644 --- a/test/algorithms/classifiers/test_vqc.py +++ b/test/algorithms/classifiers/test_vqc.py @@ -19,11 +19,11 @@ import functools import itertools import unittest +import warnings from ddt import ddt, idata, unpack import numpy as np import scipy - from sklearn.datasets import make_classification from sklearn.preprocessing import MinMaxScaler, OneHotEncoder @@ -31,11 +31,12 @@ from qiskit.algorithms.optimizers import COBYLA, L_BFGS_B from qiskit.circuit.library import RealAmplitudes, ZZFeatureMap, ZFeatureMap from qiskit.utils import QuantumInstance, algorithm_globals, optionals +from qiskit.primitives import Sampler from qiskit_machine_learning.algorithms import VQC from qiskit_machine_learning.exceptions import QiskitMachineLearningError -QUANTUM_INSTANCES = ["statevector", "qasm"] +RUN_METHODS = ["statevector", "sampler", "qasm"] NUM_QUBITS_LIST = [2, None] FEATURE_MAPS = ["zz_feature_map", None] ANSATZES = ["real_amplitudes", None] @@ -72,6 +73,7 @@ class TestVQC(QiskitMachineLearningTestCase): @unittest.skipUnless(optionals.HAS_AER, "qiskit-aer is required to run this test") def setUp(self): + super().setUp() algorithm_globals.random_seed = 1111111 self.num_classes_by_batch = [] @@ -89,11 +91,9 @@ def setUp(self): seed_simulator=algorithm_globals.random_seed, seed_transpiler=algorithm_globals.random_seed, ) - + sampler = Sampler() # We want string keys to ensure DDT-generated tests have meaningful names.
self.properties = { - "statevector": statevector, - "qasm": qasm, "bfgs": L_BFGS_B(maxiter=5), "cobyla": COBYLA(maxiter=25), "real_amplitudes": RealAmplitudes(num_qubits=2, reps=1), @@ -103,13 +103,26 @@ def setUp(self): "no_one_hot": _create_dataset(6, 2, one_hot=False), } + self.run_methods = { # Tuple of type: (primitive, quantum_instance) + "sampler": (sampler, None), + "statevector": (None, statevector), + "qasm": (None, qasm), + } + # ignore deprecation warnings + warnings.filterwarnings("ignore", category=DeprecationWarning) + + def tearDown(self) -> None: + # restore warnings + super().tearDown() + warnings.filterwarnings("always", category=DeprecationWarning) + @idata( itertools.product( - QUANTUM_INSTANCES, NUM_QUBITS_LIST, FEATURE_MAPS, ANSATZES, OPTIMIZERS, DATASETS + RUN_METHODS, NUM_QUBITS_LIST, FEATURE_MAPS, ANSATZES, OPTIMIZERS, DATASETS ) ) @unpack - def test_VQC(self, q_i, num_qubits, f_m, ans, opt, d_s): + def test_VQC(self, run_method, num_qubits, f_m, ans, opt, d_s): """ Test VQC with binary and multiclass data using a range of quantum instances, numbers of qubits, feature maps, and optimizers. @@ -119,7 +132,8 @@ def test_VQC(self, q_i, num_qubits, f_m, ans, opt, d_s): "At least one of num_qubits, feature_map, or ansatz must be set by the user." ) - quantum_instance = self.properties.get(q_i) + sampler, quantum_instance = self.run_methods.get(run_method) + feature_map = self.properties.get(f_m) optimizer = self.properties.get(opt) ansatz = self.properties.get(ans) @@ -128,6 +142,7 @@ def test_VQC(self, q_i, num_qubits, f_m, ans, opt, d_s): initial_point = np.array([0.5] * ansatz.num_parameters) if ansatz is not None else None classifier = VQC( + sampler=sampler, quantum_instance=quantum_instance, num_qubits=num_qubits, feature_map=feature_map, @@ -147,27 +162,35 @@ def test_VQC(self, q_i, num_qubits, f_m, ans, opt, d_s): # the predicted value should be in the labels self.assertTrue(np.all(predict == unique_labels, axis=1).any()) - def test_VQC_non_parameterized(self): + @idata(RUN_METHODS[:-1]) + def test_VQC_non_parameterized(self, run_method): """ Test VQC without an optimizer set. """ + sampler, quantum_instance = self.run_methods.get(run_method) + classifier = VQC( + sampler=sampler, num_qubits=2, optimizer=None, - quantum_instance=self.properties.get("statevector"), + quantum_instance=quantum_instance, ) dataset = self.properties.get("binary") classifier.fit(dataset.x, dataset.y) score = classifier.score(dataset.x, dataset.y) self.assertGreater(score, 0.5) - @idata(DATASETS) - def test_warm_start(self, d_s): + @idata(itertools.product(DATASETS, RUN_METHODS[:-1])) + @unpack + def test_warm_start(self, d_s, run_method): """Test VQC when training from a warm start.""" + sampler, quantum_instance = self.run_methods.get(run_method) + classifier = VQC( + sampler=sampler, feature_map=self.properties.get("zz_feature_map"), - quantum_instance=self.properties.get("statevector"), + quantum_instance=quantum_instance, warm_start=True, ) dataset = self.properties.get(d_s) @@ -197,19 +220,23 @@ def wrapper(num_classes): return wrapper - def test_batches_with_incomplete_labels(self): + @idata(RUN_METHODS[:-1]) + def test_batches_with_incomplete_labels(self, run_method): """Test VQC when targets are one-hot and some batches don't have all possible labels.""" + sampler, quantum_instance = self.run_methods.get(run_method) + # Generate data with batches that have incomplete labels. 
x = algorithm_globals.random.random((6, 2)) y = np.asarray([0, 0, 1, 1, 2, 2]) y_one_hot = OneHotEncoder().fit_transform(y.reshape(-1, 1)) classifier = VQC( + sampler=sampler, feature_map=self.properties.get("zz_feature_map"), ansatz=self.properties.get("real_amplitudes"), warm_start=True, - quantum_instance=self.properties.get("statevector"), + quantum_instance=quantum_instance, ) classifier._get_interpret = self._get_num_classes(classifier._get_interpret) @@ -228,40 +255,48 @@ def test_batches_with_incomplete_labels(self): with self.subTest("Check correct number of classes is used to build CircuitQNN."): self.assertTrue((np.asarray(self.num_classes_by_batch) == 3).all()) - def test_multilabel_targets_raise_an_error(self): + @idata(RUN_METHODS[:-1]) + def test_multilabel_targets_raise_an_error(self, run_method): """Tests VQC multi-label input raises an error.""" + sampler, quantum_instance = self.run_methods.get(run_method) + # Generate multi-label data. x = algorithm_globals.random.random((3, 2)) y = np.asarray([[1, 1, 0], [1, 0, 1], [0, 1, 1]]) - classifier = VQC(num_qubits=2, quantum_instance=self.properties.get("statevector")) + classifier = VQC(sampler=sampler, num_qubits=2, quantum_instance=quantum_instance) with self.assertRaises(QiskitMachineLearningError): classifier.fit(x, y) - def test_changing_classes_raises_error(self): + @idata(RUN_METHODS[:-1]) + def test_changing_classes_raises_error(self, run_method): """Tests VQC raises an error when fitting new data with a different number of classes.""" + sampler, quantum_instance = self.run_methods.get(run_method) + targets1 = np.asarray([[0, 0, 1], [0, 1, 0]]) targets2 = np.asarray([[0, 1], [1, 0]]) features1 = algorithm_globals.random.random((len(targets1), 2)) features2 = algorithm_globals.random.random((len(targets2), 2)) classifier = VQC( - num_qubits=2, - warm_start=True, - quantum_instance=self.properties.get("statevector"), + sampler=sampler, num_qubits=2, warm_start=True, quantum_instance=quantum_instance ) classifier.fit(features1, targets1) with self.assertRaises(QiskitMachineLearningError): classifier.fit(features2, targets2) - @idata(itertools.product(QUANTUM_INSTANCES, LOSSES)) + @idata(itertools.product(RUN_METHODS, LOSSES)) @unpack - def test_sparse_arrays(self, q_i, loss): + def test_sparse_arrays(self, run_method, loss): """Tests VQC on sparse features and labels.""" - quantum_instance = self.properties.get(q_i) - classifier = VQC(num_qubits=2, loss=loss, quantum_instance=quantum_instance) + + sampler, quantum_instance = self.run_methods.get(run_method) + + classifier = VQC( + sampler=sampler, num_qubits=2, loss=loss, quantum_instance=quantum_instance + ) x = scipy.sparse.csr_matrix([[0, 0], [1, 1]]) y = scipy.sparse.csr_matrix([[1, 0], [0, 1]]) @@ -270,12 +305,17 @@ def test_sparse_arrays(self, q_i, loss): score = classifier.score(x, y) self.assertGreaterEqual(score, 0.5) - def test_categorical(self): + @idata(RUN_METHODS[:-1]) + def test_categorical(self, run_method): """Test VQC on categorical labels.""" + + sampler, quantum_instance = self.run_methods.get(run_method) + classifier = VQC( + sampler=sampler, num_qubits=2, optimizer=COBYLA(25), - quantum_instance=self.properties.get("statevector"), + quantum_instance=quantum_instance, ) dataset = self.properties.get("no_one_hot") features = dataset.x @@ -290,14 +330,19 @@ def test_categorical(self): predict = classifier.predict(features[0, :]) self.assertIn(predict, ["A", "B"]) - def test_circuit_extensions(self): + @idata(RUN_METHODS[:-1]) + def 
test_circuit_extensions(self, run_method): """Test VQC when the number of qubits is different compared to the feature map/ansatz.""" + + sampler, quantum_instance = self.run_methods.get(run_method) + num_qubits = 2 classifier = VQC( + sampler=sampler, num_qubits=num_qubits, feature_map=ZFeatureMap(1), ansatz=RealAmplitudes(1), - quantum_instance=self.properties.get("statevector"), + quantum_instance=quantum_instance, ) self.assertEqual(classifier.feature_map.num_qubits, num_qubits) self.assertEqual(classifier.ansatz.num_qubits, num_qubits) @@ -305,10 +350,11 @@ def test_circuit_extensions(self): qc = QuantumCircuit(1) with self.assertRaises(QiskitMachineLearningError): _ = VQC( + sampler=sampler, num_qubits=num_qubits, feature_map=qc, ansatz=qc, - quantum_instance=self.properties.get("statevector"), + quantum_instance=quantum_instance, ) diff --git a/test/connectors/test_torch_networks.py b/test/connectors/test_torch_networks.py index a424068b6..867fd5285 100644 --- a/test/connectors/test_torch_networks.py +++ b/test/connectors/test_torch_networks.py @@ -24,6 +24,7 @@ TwoLayerQNN, NeuralNetwork, EstimatorQNN, + SamplerQNN, ) from qiskit_machine_learning.connectors import TorchConnector @@ -109,20 +110,44 @@ def _create_estimator_qnn(self) -> EstimatorQNN: ) return qnn - @idata(["opflow", "circuit_qnn", "estimator_qnn"]) + def _create_sampler_qnn(self) -> SamplerQNN: + output_shape, interpret = 2, lambda x: f"{x:b}".count("1") % 2 + num_inputs = 2 + + feature_map = ZZFeatureMap(num_inputs) + ansatz = RealAmplitudes(num_inputs, entanglement="linear", reps=1) + + qc = QuantumCircuit(num_inputs) + qc.append(feature_map, range(num_inputs)) + qc.append(ansatz, range(num_inputs)) + + qnn = SamplerQNN( + circuit=qc, + input_params=feature_map.parameters, + weight_params=ansatz.parameters, + input_gradients=True, # for hybrid qnn + interpret=interpret, + output_shape=output_shape, + ) + return qnn + + @idata(["opflow", "circuit_qnn", "sampler_qnn", "estimator_qnn"]) def test_hybrid_batch_gradients(self, qnn_type: str): """Test gradient back-prop for batch input in a qnn.""" import torch from torch.nn import MSELoss from torch.optim import SGD - qnn: Optional[Union[CircuitQNN, TwoLayerQNN, EstimatorQNN]] = None + qnn: Optional[Union[CircuitQNN, TwoLayerQNN, SamplerQNN, EstimatorQNN]] = None if qnn_type == "opflow": qnn = self._create_opflow_qnn() output_size = 1 elif qnn_type == "circuit_qnn": qnn = self._create_circuit_qnn() output_size = 2 + elif qnn_type == "sampler_qnn": + qnn = self._create_sampler_qnn() + output_size = 2 elif qnn_type == "estimator_qnn": qnn = self._create_estimator_qnn() output_size = 1 diff --git a/test/neural_networks/test_circuit_vs_sampler_qnn.py b/test/neural_networks/test_circuit_vs_sampler_qnn.py new file mode 100644 index 000000000..a790de4f0 --- /dev/null +++ b/test/neural_networks/test_circuit_vs_sampler_qnn.py @@ -0,0 +1,111 @@ +# This code is part of Qiskit. +# +# (C) Copyright IBM 2022. +# +# This code is licensed under the Apache License, Version 2.0. You may +# obtain a copy of this license in the LICENSE.txt file in the root directory +# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. +# +# Any modifications or derivative works of this code must retain this +# copyright notice, and modified files need to carry a notice indicating +# that they have been altered from the originals. 
+ +"""Test Sampler QNN vs Circuit QNN.""" + +from test import QiskitMachineLearningTestCase + +import itertools +import unittest +import numpy as np +from ddt import ddt, idata + +from qiskit import BasicAer +from qiskit.algorithms.gradients import ParamShiftSamplerGradient +from qiskit.circuit import QuantumCircuit +from qiskit.circuit.library import RealAmplitudes, ZZFeatureMap +from qiskit.opflow import Gradient +from qiskit.primitives import Sampler +from qiskit.utils import QuantumInstance, algorithm_globals + +from qiskit_machine_learning.neural_networks import CircuitQNN, SamplerQNN +import qiskit_machine_learning.optionals as _optionals + +SPARSE = [True, False] +INPUT_GRADS = [True, False] + + +@ddt +class TestCircuitQNNvsSamplerQNN(QiskitMachineLearningTestCase): + """Circuit vs Sampler QNN Tests. To be removed once CircuitQNN is deprecated""" + + def setUp(self): + super().setUp() + algorithm_globals.random_seed = 10598 + + self.parity = lambda x: f"{x:b}".count("1") % 2 + self.output_shape = 2 # this is required in case of a callable with dense output + + # define feature map and ansatz + num_qubits = 2 + feature_map = ZZFeatureMap(num_qubits, reps=1) + var_form = RealAmplitudes(num_qubits, reps=1) + # construct circuit + self.qc = QuantumCircuit(num_qubits) + self.qc.append(feature_map, range(2)) + self.qc.append(var_form, range(2)) + + # store params + self.input_params = list(feature_map.parameters) + self.weight_params = list(var_form.parameters) + + self.sampler = Sampler() + + @unittest.skipIf(not _optionals.HAS_SPARSE, "Sparse not available.") + @idata(itertools.product(SPARSE, INPUT_GRADS)) + def test_new_vs_old(self, config): + """Circuit vs Sampler QNN Test. To be removed once CircuitQNN is deprecated""" + + sparse, input_grads = config + qi_sv = QuantumInstance(BasicAer.get_backend("statevector_simulator")) + + circuit_qnn = CircuitQNN( + self.qc, + input_params=self.qc.parameters[:3], + weight_params=self.qc.parameters[3:], + sparse=sparse, + interpret=self.parity, + output_shape=self.output_shape, + quantum_instance=qi_sv, + gradient=Gradient("param_shift"), + input_gradients=input_grads, + ) + + sampler_qnn = SamplerQNN( + sampler=self.sampler, + circuit=self.qc, + input_params=self.qc.parameters[:3], + weight_params=self.qc.parameters[3:], + interpret=self.parity, + output_shape=self.output_shape, + gradient=ParamShiftSamplerGradient(self.sampler), + input_gradients=input_grads, + ) + + inputs = np.asarray(algorithm_globals.random.random(size=(3, circuit_qnn._num_inputs))) + weights = algorithm_globals.random.random(circuit_qnn._num_weights) + + circuit_qnn_fwd = circuit_qnn.forward(inputs, weights) + sampler_qnn_fwd = sampler_qnn.forward(inputs, weights) + + diff_fwd = circuit_qnn_fwd - sampler_qnn_fwd + self.assertAlmostEqual(np.max(np.abs(diff_fwd)), 0.0, places=3) + + circuit_qnn_input_grads, circuit_qnn_weight_grads = circuit_qnn.backward(inputs, weights) + sampler_qnn_input_grads, sampler_qnn_weight_grads = sampler_qnn.backward(inputs, weights) + + diff_weight = circuit_qnn_weight_grads - sampler_qnn_weight_grads + self.assertAlmostEqual(np.max(np.abs(diff_weight)), 0.0, places=3) + + if input_grads: + diff_input = circuit_qnn_input_grads - sampler_qnn_input_grads + self.assertAlmostEqual(np.max(np.abs(diff_input)), 0.0, places=3) diff --git a/test/neural_networks/test_sampler_qnn.py b/test/neural_networks/test_sampler_qnn.py new file mode 100644 index 000000000..338bb1a06 --- /dev/null +++ b/test/neural_networks/test_sampler_qnn.py @@ -0,0 +1,296 @@ +# 
This code is part of Qiskit. +# +# (C) Copyright IBM 2022. +# +# This code is licensed under the Apache License, Version 2.0. You may +# obtain a copy of this license in the LICENSE.txt file in the root directory +# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. +# +# Any modifications or derivative works of this code must retain this +# copyright notice, and modified files need to carry a notice indicating +# that they have been altered from the originals. + +"""Test Sampler QNN with Terra primitives.""" + +from test import QiskitMachineLearningTestCase + +import itertools +import unittest +import numpy as np + +from ddt import ddt, idata + +from qiskit.circuit import Parameter, QuantumCircuit +from qiskit.primitives import Sampler +from qiskit.circuit.library import RealAmplitudes, ZZFeatureMap +from qiskit.utils import algorithm_globals + +from qiskit_machine_learning.neural_networks.sampler_qnn import SamplerQNN +import qiskit_machine_learning.optionals as _optionals + +if _optionals.HAS_SPARSE: + # pylint: disable=import-error + from sparse import SparseArray +else: + + class SparseArray: # type: ignore + """Empty SparseArray class + Replacement if sparse.SparseArray is not present. + """ + + pass + + +DEFAULT = "default" +SHOTS = "shots" +SPARSE = [True, False] +SAMPLERS = [DEFAULT, SHOTS] +INTERPRET_TYPES = [0, 1, 2] +BATCH_SIZES = [2] +INPUT_GRADS = [True, False] + + +@ddt +class TestSamplerQNN(QiskitMachineLearningTestCase): + """Sampler QNN Tests.""" + + def setUp(self): + super().setUp() + algorithm_globals.random_seed = 12345 + + # define feature map and ansatz + num_qubits = 2 + feature_map = ZZFeatureMap(num_qubits, reps=1) + var_form = RealAmplitudes(num_qubits, reps=1) + + # construct circuit + self.qc = QuantumCircuit(num_qubits) + self.qc.append(feature_map, range(2)) + self.qc.append(var_form, range(2)) + + # store params + self.input_params = list(feature_map.parameters) + self.weight_params = list(var_form.parameters) + + # define interpret functions + def interpret_1d(x): + return sum((s == "1" for s in f"{x:0b}")) % 2 + + self.interpret_1d = interpret_1d + self.output_shape_1d = 2 # takes values in {0, 1} + + def interpret_2d(x): + return np.array([self.interpret_1d(x), 2 * self.interpret_1d(x)]) + + self.interpret_2d = interpret_2d + self.output_shape_2d = ( + 2, + 3, + ) # 1st dim. 
takes values in {0, 1} 2nd dim in {0, 1, 2} + + # define sampler primitives + self.sampler = Sampler() + self.sampler_shots = Sampler(options={"shots": 100, "seed": 42}) + + self.array_type = {True: SparseArray, False: np.ndarray} + + def _get_qnn( + self, sparse, sampler_type, interpret_id, input_params, weight_params, input_grads + ): + """Construct QNN from configuration.""" + + # get quantum instance + if sampler_type == SHOTS: + sampler = self.sampler_shots + elif sampler_type == DEFAULT: + sampler = self.sampler + else: + sampler = None + + # get interpret setting + interpret = None + output_shape = None + if interpret_id == 1: + interpret = self.interpret_1d + output_shape = self.output_shape_1d + elif interpret_id == 2: + interpret = self.interpret_2d + output_shape = self.output_shape_2d + + # construct QNN + qnn = SamplerQNN( + sampler=sampler, + circuit=self.qc, + input_params=input_params, + weight_params=weight_params, + sparse=sparse, + interpret=interpret, + output_shape=output_shape, + input_gradients=input_grads, + ) + return qnn + + def _verify_qnn( + self, qnn: SamplerQNN, batch_size: int, input_data: np.ndarray, weights: np.ndarray + ) -> None: + """ + Verifies that a QNN functions correctly + """ + # evaluate QNN forward pass + result = qnn.forward(input_data, weights) + + if input_data is None: + batch_size = 1 + + self.assertTrue(isinstance(result, self.array_type[qnn.sparse])) + # check forward result shape + self.assertEqual(result.shape, (batch_size, *qnn.output_shape)) + + # evaluate QNN backward pass + input_grad, weights_grad = qnn.backward(input_data, weights) + + if qnn.input_gradients: + if input_data is not None: + self.assertEqual(input_grad.shape, (batch_size, *qnn.output_shape, qnn.num_inputs)) + self.assertTrue(isinstance(input_grad, self.array_type[qnn.sparse])) + else: + # verify that input gradients are None if turned off + self.assertIsNone(input_grad) + if weights is not None: + self.assertEqual( + weights_grad.shape, (batch_size, *qnn.output_shape, qnn.num_weights) + ) + self.assertTrue(isinstance(weights_grad, self.array_type[qnn.sparse])) + else: + # verify that input gradients are None if turned off + self.assertIsNone(weights_grad) + + else: + # verify that input gradients are None if turned off + self.assertIsNone(input_grad) + if weights is not None: + self.assertEqual( + weights_grad.shape, (batch_size, *qnn.output_shape, qnn.num_weights) + ) + self.assertTrue(isinstance(weights_grad, self.array_type[qnn.sparse])) + + @unittest.skipIf(not _optionals.HAS_SPARSE, "Sparse not available.") + @idata(itertools.product(SPARSE, SAMPLERS, INTERPRET_TYPES, BATCH_SIZES, INPUT_GRADS)) + def test_sampler_qnn(self, config): + """Sampler QNN Test.""" + + sparse, sampler_type, interpret_type, batch_size, input_grads = config + # Test QNN with input and weight params + qnn = self._get_qnn( + sparse, + sampler_type, + interpret_type, + input_params=self.input_params, + weight_params=self.weight_params, + input_grads=True, + ) + input_data = np.zeros((batch_size, qnn.num_inputs)) + weights = np.zeros(qnn.num_weights) + self._verify_qnn(qnn, batch_size, input_data, weights) + + # Test QNN with no input params + qnn = self._get_qnn( + sparse, + sampler_type, + interpret_type, + input_params=None, + weight_params=self.weight_params + self.input_params, + input_grads=input_grads, + ) + input_data = None + weights = np.zeros(qnn.num_weights) + self._verify_qnn(qnn, batch_size, input_data, weights) + + # Test QNN with no weight params + qnn = self._get_qnn( + 
sparse, + sampler_type, + interpret_type, + input_params=self.weight_params + self.input_params, + weight_params=None, + input_grads=input_grads, + ) + input_data = np.zeros((batch_size, qnn.num_inputs)) + weights = None + self._verify_qnn(qnn, batch_size, input_data, weights) + + @unittest.skipIf(not _optionals.HAS_SPARSE, "Sparse not available.") + @idata(itertools.product(SPARSE, INTERPRET_TYPES, BATCH_SIZES)) + def test_sampler_qnn_gradient(self, config): + """Sampler QNN Gradient Test.""" + + # get configuration + sparse, interpret_id, batch_size = config + + # get QNN + qnn = self._get_qnn( + sparse, + DEFAULT, + interpret_id, + input_params=self.input_params, + weight_params=self.weight_params, + input_grads=True, + ) + + input_data = np.ones((batch_size, qnn.num_inputs)) + weights = np.ones(qnn.num_weights) + input_grad, weights_grad = qnn.backward(input_data, weights) + + # test input gradients + eps = 1e-2 + for k in range(qnn.num_inputs): + delta = np.zeros(input_data.shape) + delta[:, k] = eps + + f_1 = qnn.forward(input_data + delta, weights) + f_2 = qnn.forward(input_data - delta, weights) + + grad = (f_1 - f_2) / (2 * eps) + input_grad_ = input_grad.reshape((batch_size, -1, qnn.num_inputs))[:, :, k].reshape( + grad.shape + ) + diff = input_grad_ - grad + self.assertAlmostEqual(np.max(np.abs(diff)), 0.0, places=3) + + # test weight gradients + eps = 1e-2 + for k in range(qnn.num_weights): + delta = np.zeros(weights.shape) + delta[k] = eps + + f_1 = qnn.forward(input_data, weights + delta) + f_2 = qnn.forward(input_data, weights - delta) + + grad = (f_1 - f_2) / (2 * eps) + weights_grad_ = weights_grad.reshape((batch_size, -1, qnn.num_weights))[ + :, :, k + ].reshape(grad.shape) + diff = weights_grad_ - grad + self.assertAlmostEqual(np.max(np.abs(diff)), 0.0, places=3) + + def test_setters_getters(self): + """Test Sampler QNN properties.""" + params = [Parameter("input1"), Parameter("weight1")] + qc = QuantumCircuit(1) + qc.h(0) + qc.ry(params[0], 0) + qc.rx(params[1], 0) + qc.measure_all() + sampler_qnn = SamplerQNN( + circuit=qc, + input_params=[params[0]], + weight_params=[params[1]], + ) + with self.subTest("Test input_params getter."): + self.assertEqual(sampler_qnn.input_params, [params[0]]) + with self.subTest("Test weight_params getter."): + self.assertEqual(sampler_qnn.weight_params, [params[1]]) + with self.subTest("Test input_gradients setter and getter."): + self.assertFalse(sampler_qnn.input_gradients) + sampler_qnn.input_gradients = True + self.assertTrue(sampler_qnn.input_gradients)
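For reference, here is a minimal end-to-end sketch of the ``SamplerQNN`` API introduced by this patch, combining the shot-based sampler options and the param-shift gradient that the new tests exercise. The two-qubit circuit, the ``parity`` interpret function and the random inputs below are illustrative assumptions, not code taken from the patch:

.. code-block:: python

    import numpy as np

    from qiskit import QuantumCircuit
    from qiskit.algorithms.gradients import ParamShiftSamplerGradient
    from qiskit.circuit.library import RealAmplitudes, ZZFeatureMap
    from qiskit.primitives import Sampler

    from qiskit_machine_learning.neural_networks import SamplerQNN

    num_qubits = 2
    feature_map = ZZFeatureMap(num_qubits, reps=1)
    ansatz = RealAmplitudes(num_qubits, reps=1)

    qc = QuantumCircuit(num_qubits)
    qc.compose(feature_map, inplace=True)
    qc.compose(ansatz, inplace=True)

    # shot-based reference sampler and a param-shift gradient built on the same sampler
    sampler = Sampler(options={"shots": 100, "seed": 42})
    gradient = ParamShiftSamplerGradient(sampler)

    def parity(x):
        # map each sampled integer onto one of two classes
        return f"{x:b}".count("1") % 2

    qnn = SamplerQNN(
        circuit=qc,
        sampler=sampler,
        gradient=gradient,
        input_params=feature_map.parameters,
        weight_params=ansatz.parameters,
        interpret=parity,
        output_shape=2,
        input_gradients=True,  # required for gradients w.r.t. input data
    )

    batch_size = 3
    inputs = np.random.random((batch_size, qnn.num_inputs))
    weights = np.random.random(qnn.num_weights)

    # forward pass: class quasi-probabilities, shape (batch_size, 2)
    probabilities = qnn.forward(inputs, weights)

    # backward pass: gradients of shape (batch_size, 2, num_inputs) and (batch_size, 2, num_weights)
    input_grad, weight_grad = qnn.backward(inputs, weights)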