-
Notifications
You must be signed in to change notification settings - Fork 982
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Simultaneous readout #4307
Simultaneous readout #4307
Changes from all commits
82f5827
d64fea4
066d785
fc56f35
fc4c38f
ce1a938
af953aa
682d361
f8ab64a
14b36d2
e06bd87
908f09a
29a42fa
5e8d6e5
86f4e06
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,21 +11,20 @@ | |
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
from typing import Any, Dict, Iterable, TYPE_CHECKING | ||
|
||
"""Single qubit readout experiments using parallel or isolated statistics.""" | ||
import dataclasses | ||
import time | ||
from typing import Any, Dict, Iterable, List, Optional, TYPE_CHECKING | ||
|
||
import sympy | ||
import numpy as np | ||
|
||
from cirq import circuits, ops | ||
from cirq import circuits, ops, study | ||
|
||
if TYPE_CHECKING: | ||
import cirq | ||
|
||
|
||
@dataclasses.dataclass(frozen=True) | ||
@dataclasses.dataclass | ||
class SingleQubitReadoutCalibrationResult: | ||
"""Result of estimating single qubit readout error. | ||
|
||
|
@@ -96,19 +95,136 @@ def estimate_single_qubit_readout_errors( | |
the probabilities. Also stores a timestamp indicating the time when | ||
data was finished being collected from the sampler. | ||
""" | ||
num_qubits = len(list(qubits)) | ||
return estimate_parallel_single_qubit_readout_errors( | ||
sampler=sampler, | ||
qubits=qubits, | ||
repetitions=repetitions, | ||
trials=2, | ||
bit_strings=np.array([[0] * num_qubits, [1] * num_qubits]), | ||
) | ||
|
||
|
||
def estimate_parallel_single_qubit_readout_errors( | ||
sampler: 'cirq.Sampler', | ||
*, | ||
qubits: Iterable['cirq.Qid'], | ||
trials: int = 20, | ||
repetitions: int = 1000, | ||
trials_per_batch: Optional[int] = None, | ||
bit_strings: np.ndarray = None, | ||
) -> SingleQubitReadoutCalibrationResult: | ||
"""Estimate single qubit readout error using parallel operations. | ||
|
||
For each trial, prepare and then measure a random computational basis | ||
bitstring on qubits using gates in parallel. | ||
Returns a SingleQubitReadoutCalibrationResult which can be used to | ||
compute readout errors for each qubit. | ||
|
||
Args: | ||
sampler: The `cirq.Sampler` used to run the circuits. | ||
qubits: The qubits being tested. | ||
repetitions: The number of measurement repetitions to perform for | ||
each trial. | ||
trials: The number of bitstrings to prepare. | ||
trials_per_batch: If provided, split the experiment into batches | ||
with this number of trials in each batch. | ||
bit_strings: Optional numpy array of shape (trials, qubits) where the | ||
first dimension is the number of the trial and the second | ||
dimension is the qubit (ordered by the qubit order from | ||
the qubits parameter). Each value should be a 0 or 1 which | ||
specifies which state the qubit should be prepared into during | ||
that trial. If not provided, the function will generate random | ||
bit strings for you. | ||
|
||
Returns: | ||
A SingleQubitReadoutCalibrationResult storing the readout error | ||
probabilities as well as the number of repetitions used to estimate | ||
the probabilities. Also stores a timestamp indicating the time when | ||
data was finished being collected from the sampler. Note that, | ||
if there did not exist a trial where a given qubit was set to |0⟩, | ||
the zero-state error will be set to `nan` (not a number). Likewise | ||
for qubits with no |1⟩ trial and one-state error. | ||
""" | ||
qubits = list(qubits) | ||
|
||
zeros_circuit = circuits.Circuit(ops.measure_each(*qubits, key_func=repr)) | ||
ones_circuit = circuits.Circuit( | ||
ops.X.on_each(*qubits), ops.measure_each(*qubits, key_func=repr) | ||
if trials <= 0: | ||
raise ValueError("Must provide non-zero trials for readout calibration.") | ||
if repetitions <= 0: | ||
raise ValueError("Must provide non-zero repetition for readout calibration.") | ||
if bit_strings is None: | ||
bit_strings = np.random.randint(0, 2, size=(trials, len(qubits))) | ||
else: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: else not needed. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it is worth keeping in, since it avoids a scan of the bit_string array when it is not passed in. This probably has a slight performance gain. We don't need to do the sanity checks if we generate the bit_string array. |
||
if not hasattr(bit_strings, 'shape') or bit_strings.shape != (trials, len(qubits)): | ||
raise ValueError( | ||
'bit_strings must be numpy array ' | ||
f'of shape (trials, qubits) ({trials}, {len(qubits)}) ' | ||
f"but was {bit_strings.shape if hasattr(bit_strings, 'shape') else None}" | ||
) | ||
if not np.all((bit_strings == 0) | (bit_strings == 1)): | ||
raise ValueError('bit_strings values must be all 0 or 1') | ||
if trials_per_batch is None: | ||
trials_per_batch = trials | ||
if trials_per_batch <= 0: | ||
raise ValueError("Must provide non-zero trials_per_batch for readout calibration.") | ||
|
||
all_sweeps: List[study.Sweepable] = [] | ||
num_batches = (trials + trials_per_batch - 1) // trials_per_batch | ||
|
||
# Initialize circuits | ||
flip_symbols = sympy.symbols(f'flip_0:{len(qubits)}') | ||
flip_circuit = circuits.Circuit( | ||
[ops.X(q) ** s for q, s in zip(qubits, flip_symbols)], | ||
[ops.measure_each(*qubits, key_func=repr)], | ||
) | ||
|
||
zeros_result = sampler.run(zeros_circuit, repetitions=repetitions) | ||
ones_result = sampler.run(ones_circuit, repetitions=repetitions) | ||
all_circuits = [flip_circuit] * num_batches | ||
|
||
# Initialize sweeps | ||
for batch in range(num_batches): | ||
single_sweeps = [] | ||
for qubit_idx in range(len(qubits)): | ||
trial_range = range( | ||
batch * trials_per_batch, min((batch + 1) * trials_per_batch, trials) | ||
) | ||
single_sweeps.append( | ||
study.Points( | ||
key=f'flip_{qubit_idx}', | ||
points=[bit_strings[bit][qubit_idx] for bit in trial_range], | ||
) | ||
) | ||
total_sweeps = study.Zip(*single_sweeps) | ||
all_sweeps.append(total_sweeps) | ||
|
||
# Execute circuits | ||
results = sampler.run_batch(all_circuits, all_sweeps, repetitions=repetitions) | ||
timestamp = time.time() | ||
|
||
zero_state_errors = {q: np.mean(zeros_result.measurements[repr(q)]) for q in qubits} | ||
one_state_errors = {q: 1 - np.mean(ones_result.measurements[repr(q)]) for q in qubits} | ||
# Analyze results | ||
zero_state_trials = np.zeros((1, len(qubits))) | ||
one_state_trials = np.zeros((1, len(qubits))) | ||
zero_state_totals = np.zeros((1, len(qubits))) | ||
one_state_totals = np.zeros((1, len(qubits))) | ||
for batch_result in results: | ||
for trial_idx, trial_result in enumerate(batch_result): | ||
all_measurements = trial_result.data[[repr(x) for x in qubits]].to_numpy() | ||
sample_counts = np.einsum('ij->j', all_measurements) | ||
zero_state_trials += sample_counts * (1 - bit_strings[trial_idx]) | ||
zero_state_totals += repetitions * (1 - bit_strings[trial_idx]) | ||
one_state_trials += (repetitions - sample_counts) * bit_strings[trial_idx] | ||
one_state_totals += repetitions * bit_strings[trial_idx] | ||
|
||
zero_state_errors = { | ||
q: zero_state_trials[0][qubit_idx] / zero_state_totals[0][qubit_idx] | ||
if zero_state_totals[0][qubit_idx] > 0 | ||
else np.nan | ||
for qubit_idx, q in enumerate(qubits) | ||
} | ||
one_state_errors = { | ||
q: one_state_trials[0][qubit_idx] / one_state_totals[0][qubit_idx] | ||
if one_state_totals[0][qubit_idx] > 0 | ||
else np.nan | ||
for qubit_idx, q in enumerate(qubits) | ||
} | ||
|
||
return SingleQubitReadoutCalibrationResult( | ||
zero_state_errors=zero_state_errors, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1 +1 @@ | ||
cirq.experiments.SingleQubitReadoutCalibrationResult(zero_state_errors={cirq.LineQubit(0): 0.1}, one_state_errors={cirq.LineQubit(0): 0.2}, repetitions=1000, timestamp=0.3) | ||
cirq.experiments.SingleQubitReadoutCalibrationResult(zero_state_errors={cirq.LineQubit(0): 0.1}, one_state_errors={cirq.LineQubit(0): 0.2}, repetitions=1000, timestamp=0.3) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One other random question: Why is
trials_per_batch
something we want at the experiment level instead of allowing the user to construct a custom cirq.Sampler
where they can make arbitrary calls to run_batch
and then behind the scenes it gets broken up into chunks by the sampler ? This feels like a strange place to have something like this am I missing something ?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fair enough, but we don't actually have this sampler right now. The other thing is that this is a sweep (not a batch), so it's a bit harder to break up. We could deprecate this parameter once we have a proper batching sampler maybe?