# stabilityanalyzer.py (1985 lines, 97.1 KB)
"""Defines the DriftResults class"""
#***************************************************************************************************
# Copyright 2015, 2019 National Technology & Engineering Solutions of Sandia, LLC (NTESS).
# Under the terms of Contract DE-NA0003525 with NTESS, the U.S. Government retains certain rights
# in this software.
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0 or in the LICENSE file in the root pyGSTi directory.
#***************************************************************************************************
import copy as _copy
import itertools as _itertools
import warnings as _warnings
import numpy as _np
from pygsti.extras.drift import probtrajectory as _ptraj
from pygsti.extras.drift import signal as _sig
from pygsti import data as _data
from pygsti.data import datasetconstruction as _dsconst
def compute_auto_tests(shape, ids=False):
"""
    Returns the tests we'll automatically perform on time-series data, when a specific
    set of tests is not given. Each test is specified by a tuple of length <= 3 containing
some subset of 'dataset', 'circuit' and 'outcome', and a set of tests is a list of such
tuples.
Parameters
----------
shape : tuple
        The "shape" of the time-series data that is being tested. A 3-tuple, whereby
the shape[0] is the number of DataSets in the MultiDataSet being tested,
shape[1] is the number of circuits, and shape[2] is the number of outcomes
per circuit.
ids : bool, optional.
Whether or not we're returning the auto tests for a MultiDataSet containing
        independent data sets. If the MultiDataSet we are testing only contains
1 DataSet it does not matter what this is set to.
Returns
-------
tuple
A tuple containing the auto-generated tests to run.
"""
if ids:
auto_tests = ((), ('dataset', ), ('dataset', 'circuit'))
else:
auto_tests = (('dataset', ), ('dataset', 'circuit'))
condensed_auto_tests, junk = condense_tests(shape, auto_tests, None)
return condensed_auto_tests
def condense_tests(shape, tests, weightings=None):
"""
Condenses a set of tests, by removing any tests that are equivalent given the meta-parameters
of the data (e.g., the number of circuits).
Parameters
----------
shape : tuple
        The "shape" of the time-series data that is being tested. A 3-tuple, whereby
the shape[0] is the number of DataSets in the MultiDataSet being tested,
shape[1] is the number of circuits, and shape[2] is the number of outcomes
per circuit.
tests : list
The set of tests to condense, based on `shape`.
weightings : dict, optional
        A dictionary containing significance weightings for the tests, whereby the keys
        are the tests (the elements of the list `tests`) and the values are floats that
        are Bonferroni weightings for splitting significance.
Returns
-------
tuple
A tuple containing the auto-generated tests to run.
if weightings is not None:
dict
A dictionary of weightings that has condensed the weightings in `weightings`,
so that the weighting on a test that is being dropped (or relabelled) is
        redistributed to the equivalent test that we are implementing.
"""
    # These are the values of `shape` that correspond to a trivial axis in the data.
trivialshape = {}
trivialshape['dataset'] = 1
trivialshape['circuit'] = 1
trivialshape['outcome'] = 2
condtests = []
if weightings is not None: condweightings = {}
else: condweightings = None
for test in tests:
condtest = []
for i, axislabel in enumerate(('dataset', 'circuit', 'outcome')):
if axislabel in test:
if shape[i] > trivialshape[axislabel]:
condtest.append(axislabel)
condtest = tuple(condtest)
if condtest not in condtests:
condtests.append(condtest)
if weightings is not None:
condweightings[condtest] = weightings[test]
else:
if weightings is not None:
condweightings[condtest] += weightings[test]
return condtests, condweightings
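The condensation rule above can be illustrated with a minimal standalone sketch (independent of pyGSTi; the helper name `condense` is ours): an axis named in a test is kept only when the data's extent along that axis exceeds the trivial size (1 DataSet, 1 circuit, 2 outcomes), and duplicate condensed tests are dropped.

```python
# Standalone sketch of the condensation rule in condense_tests().
TRIVIAL = {'dataset': 1, 'circuit': 1, 'outcome': 2}
AXES = ('dataset', 'circuit', 'outcome')

def condense(shape, tests):
    out = []
    for test in tests:
        # Keep an axis only if it is in the test and is non-trivial in the data.
        cond = tuple(a for i, a in enumerate(AXES)
                     if a in test and shape[i] > TRIVIAL[a])
        if cond not in out:
            out.append(cond)
    return out

# One DataSet, 100 circuits, 2 outcomes: the 'dataset' and 'outcome' axes are
# trivial, so both tests collapse to the single test ('circuit',).
print(condense((1, 100, 2), [('dataset', 'circuit'), ('circuit', 'outcome')]))
```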
def compute_valid_tests():
"""
Returns the set of all valid tests (specified by tuples), in the form of a
list.
"""
valid_tests = []
valid_tests.append(())
valid_tests.append(('dataset', ))
valid_tests.append(('dataset', 'circuit'))
valid_tests.append(('dataset', 'circuit', 'outcome'))
valid_tests.append(('circuit', ))
valid_tests.append(('circuit', 'outcome'))
valid_tests.append(('outcome', ))
valid_tests.append(('dataset', 'outcome'))
return valid_tests
def check_valid_tests(tests):
"""
Checks whether all the tuples in `tests` constitute valid tests, as specified.
"""
valid_tests = compute_valid_tests()
for test in tests: assert(test in valid_tests), "This is an invalid set of tests for drift detection!"
def compute_valid_inclass_corrections():
"""
Returns the set of all valid `inclass_correction` dicts -- an input to the .run_instability_detection() of
    a StabilityAnalyzer. See the docstring of that method for more information on that input.
"""
valid_inclass_corrections = []
valid_inclass_corrections.append({'dataset': 'Bonferroni', 'circuit': 'Benjamini-Hochberg',
'outcome': 'Benjamini-Hochberg', 'spectrum': 'Benjamini-Hochberg'})
valid_inclass_corrections.append({'dataset': 'Bonferroni', 'circuit': 'Bonferroni',
'outcome': 'Benjamini-Hochberg', 'spectrum': 'Benjamini-Hochberg'})
valid_inclass_corrections.append({'dataset': 'Bonferroni', 'circuit': 'Bonferroni',
'outcome': 'Bonferroni', 'spectrum': 'Benjamini-Hochberg'})
valid_inclass_corrections.append({'dataset': 'Bonferroni', 'circuit': 'Bonferroni',
'outcome': 'Bonferroni', 'spectrum': 'Bonferroni'})
return valid_inclass_corrections
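For intuition on the two corrections used above (this is a generic standalone sketch, not pyGSTi's implementation): a Bonferroni correction simply splits the significance between the m hypotheses, while the Benjamini-Hochberg procedure is a step-up rule that controls the false discovery rate and so admits a less conservative threshold.

```python
import numpy as np

def benjamini_hochberg_threshold(pvalues, significance):
    # Step-up rule: find the largest sorted p-value p_(k) with
    # p_(k) <= k * significance / m; every hypothesis with a p-value at or
    # below that threshold is rejected.
    p = np.sort(np.asarray(pvalues))
    m = len(p)
    below = p <= (np.arange(1, m + 1) / m) * significance
    return p[below].max() if below.any() else 0.0

pvalues = [0.001, 0.01, 0.02, 0.8]
# Bonferroni would only reject p-values below 0.05 / 4 = 0.0125; the
# Benjamini-Hochberg threshold here is the larger value 0.02.
print(benjamini_hochberg_threshold(pvalues, 0.05))
```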
def populate_inclass_correction(inclass_correction=None):
    """
    Populates empty parts of an `inclass_correction` dictionary with auto values. This dictionary is an
    input to the .run_instability_detection() of a StabilityAnalyzer. See the docstring of that method for
    more information on that input.
    The auto inclass_correction defaults to a Bonferroni correction at all levels above the lowest
    level where a correction has been specified.
    """
    if inclass_correction is None:  # Avoid a mutable default argument, which would persist between calls.
        inclass_correction = {}
    autocorrection = 'Bonferroni'
for key in ('dataset', 'circuit', 'outcome', 'spectrum'):
if key not in inclass_correction:
inclass_correction[key] = autocorrection
if key in inclass_correction:
# As soon as the correction changes from Bonferroni, we switch to that correction.
autocorrection = inclass_correction[key]
valid_inclass_corrections = compute_valid_inclass_corrections()
assert(inclass_correction in valid_inclass_corrections), "This is an invalid inclass correction!"
return inclass_correction
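The fill rule implemented above can be sketched standalone (the helper `populate` is ours, not the pyGSTi API): each unspecified level inherits the correction chosen at the nearest specified level above it, starting from a Bonferroni default.

```python
def populate(correction):
    # Each level, from 'dataset' down to 'spectrum', takes its specified
    # correction if there is one, and otherwise inherits the running default.
    auto = 'Bonferroni'
    filled = {}
    for key in ('dataset', 'circuit', 'outcome', 'spectrum'):
        filled[key] = correction.get(key, auto)
        auto = filled[key]
    return filled

# Specifying only the 'outcome' level: 'spectrum' inherits Benjamini-Hochberg,
# while the levels above it stay at the Bonferroni default.
print(populate({'outcome': 'Benjamini-Hochberg'}))
```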
def compute_auto_betweenclass_weighting(tests, betweenclass_weighting=True):
"""
Finds the automatic weighting used between classes of test, e.g., a
"top level" Bonferroni correction, or no correction, etc.
"""
if betweenclass_weighting:
betweenclass_weighting = {test: 1 / len(tests) for test in tests}
else:
betweenclass_weighting = {test: 1 for test in tests}
return betweenclass_weighting
def compute_auto_estimator(transform):
"""
The default method for estimating the parameters of a parameterized probability trajectory (i.e., this is not
the method used for the model selection, only the method used to estimate the amplitudes in the parameterized
model). This returns the fastest method that is pretty reliable for that transform, rather than the most
statistically well-motivated choice (which is mle in all cases).
Parameters
----------
transform : str
The type of transform that we are auto-selecting an estimation method for.
Returns
-------
str
The name of the estimator to use.
"""
if transform == 'dct': auto_estimator = 'filter'
elif transform == 'lsp': auto_estimator = 'mle'
else:
raise ValueError("No auto estimation method available for {} transform!".format(transform))
return auto_estimator
class StabilityAnalyzer(object):
"""
The StabilityAnalyzer is designed to implement a stability analysis on time-series data from quantum
circuits. It stores this time-series data, and contains methods for implementing instability detection
and characterization, using spectral analysis techniques. These methods work on time-series data from
any set of circuits, because they are entirely agnostic to the details of the circuits, e.g., they
are not based on the process matrix model of GST.
This object encapsulates stand-alone data analysis methods, but it is also the basis for implementing
    time-resolved benchmarking or tomography (e.g., time-resolved RB, GST or Ramsey spectroscopy)
    in pyGSTi.
"""
def __init__(self, ds, transform='auto', marginalize='auto', mergeoutcomes=None, constnumtimes='auto',
ids=False):
"""
        Initialize a StabilityAnalyzer, by inputting time-series data and some information on how it should be
processed.
        Some of the nominally allowed values for the inputs are not yet functional. For
        entirely non-functional code an assert() will flag the input as not yet allowed, and for untested
        and perhaps unreliable code a warning will be raised but the code will still run.
Parameters
----------
ds : DataSet or MultiDataSet
A DataSet containing time-series data to be analyzed for signs of instability.
transform : str, optional
The type of transform to use in the spectral analysis. Options are:
* 'auto': An attempt is made to choose the best transform given the "meta-data" of the data,
e.g., the variability in the time-step between data points. For beginners,
'auto' is the best option. If you are familiar with the underlying methods, the
                meta-data of the input, and the relative merits of the different transforms, then
it is probably better to choose this yourself -- as the auto-selection is not hugely
sophisticated.
* 'dct' : The Type-II Discrete Cosine Transform (with an orthogonal normalization). This is
the only tested option, and it is our recommended option when the data is
approximately equally-spaced, i.e., the time-step between each "click" for each
circuit is almost a constant. (the DCT transform implicitly assumes that this
time-step is exactly constant)
* 'dft' : The discrete Fourier transform (with an orthogonal normalization). **This is an**
**experimental feature, and the results are unreliable with this transform**
* 'lsp' : The Lomb-Scargle periodogram. **This is an experimental feature, and the code is**
**untested with this transform**
marginalize : str or bool, optional
True, False or 'auto'. Whether or not to marginalize multi-qubit data, to look for instability
in the marginalized probability distribution over the two outcomes for each qubit. Cannot be
set to True if mergeoutcomes is not None.
mergeoutcomes : None or Dict, optional
If not None, a dictionary of outcome-merging dictionaries. Each dictionary contained as a
value of `mergeoutcomes` is used to create a new DataSet, where the values have been merged
            according to that dictionary (see the aggregate_dataset_outcomes() function inside datasetconstruction.py).
The corresponding key is used as the key for that DataSet, when it is stored in a MultiDataSet,
and the instability analysis is implemented on each DataSet. This is a more general data
            coarse-graining option than `marginalize`.
constnumtimes : str or bool, optional
True, False or 'auto'. If True then data is discarded from the end of the "clickstream" for
each circuit until all circuits have the same length clickstream, i.e., the same number of
            data acquisition times. If 'auto' then it is set to True or False depending on the meta-data of
the data and the type of transform being used.
ids: True or False, optional
            Whether the multiple DataSets should be treated as generated from independent random variables.
If the input is a DataSet and `marginalize` is False and `mergeoutcomes` is None then this
input is irrelevant: there is only ever one DataSet being analyzed. But in general multiple
DataSets are concurrently analyzed. This is irrelevant for independent analyses of the DataSets,
but the analysis is capable of also implementing a joint analysis of the DataSets. This joint
analysis is only valid on the assumption of independent DataSets, and so this analysis will not
be permitted unless `ids` is set to True. Note that the set of N marginalized data from N-qubit
circuits are generally not independent -- even if the circuits contain no 2-qubit gates then
            crosstalk can cause dependencies. However, as long as the dependencies are weak then setting
            this to True is likely ok.
Returns
-------
StabilityAnalyzer
A new StabilityAnalyzer object, with data formatted and written in, ready for stability
            analyses to be implemented.
"""
assert(isinstance(ds, _data.DataSet)), "The input data must be a pyGSTi DataSet!"
tempds = ds.copy_nonstatic() # Copy so that we can edit the dataset.
multids = _data.MultiDataSet() # This is where the formatted data is recorded
# We need the data to have the same number of total counts per-time for all the circuits.
        assert(tempds.has_constant_totalcounts_pertime), "Data must contain " \
            + "a constant number of total counts as a function of time-step and circuit!"
if not isinstance(constnumtimes, bool):
assert(constnumtimes == 'auto'), "The only non-boolean option is 'auto'!"
constnumtimes = True
# future: update the code to allow for this, and remove this assert.
else:
            assert(constnumtimes), "Currently the analysis requires " \
                + "`constnumtimes` to be True!"
if constnumtimes:
tempds = _dsconst.trim_to_constant_numtimesteps(tempds)
assert(tempds.has_constant_totalcounts), "Data formatting has failed!"
# Checks that the specified transform is valid, and writes it into the object.
if transform == 'auto': transform = 'dct'
assert(transform in ('dct', 'lsp', 'dft')), "This is not a valid choice for the transform!"
        # future: amend when the lsp code is functional, and remove when it is tested.
        if transform == 'lsp':
            _warnings.warn("The Lomb-Scargle-based analysis is an experimental feature! The results may be unreliable,"
                           + " and probability trajectories cannot be estimated!")
        # future: amend when the dft code is functional, and remove when it is tested.
        if transform == 'dft':
            _warnings.warn("The DFT-based analysis is an experimental feature! The results *are* unreliable,"
+ " and probability trajectories cannot be estimated!")
self.transform = transform
        # Check that we have valid and consistent `marginalize` and `mergeoutcomes`, and write them into the object.
if isinstance(marginalize, str):
            assert(marginalize == 'auto'), "`marginalize` must be a boolean or 'auto'!"
            if mergeoutcomes is not None:
                marginalize = False  # A `mergeoutcomes` dict means we can't marginalize as well.
            else:
                marginalize = True  # If there is no `mergeoutcomes` dict we marginalize by default.
else:
assert(isinstance(marginalize, bool)), "`marginalize` must be a boolean or 'auto'!"
if mergeoutcomes is not None:
assert(not marginalize), "Cannot marginalize when a `mergeoutcomes` is specified!"
self._marginalize = marginalize
self._mergeoutcomes = mergeoutcomes
# Do any outcome merging.
if mergeoutcomes is not None:
for dskey, mergeoutcomesdict in mergeoutcomes.items():
mergds = _dsconst.aggregate_dataset_outcomes(tempds, mergeoutcomesdict)
mergds.done_adding_data()
multids.add_dataset(dskey, mergds)
# Do any marginalization, labelling qubits as integers.
if marginalize:
n = len(ds.outcome_labels[0][0])
if n > 1:
for i in range(n):
margds = _dsconst.filter_dataset(tempds, (i,), filtercircuits=False)
margds.done_adding_data()
multids.add_dataset(str(i), margds)
else:
multids.add_dataset('0', tempds)
        if mergeoutcomes is None and not marginalize:
multids.add_dataset('0', tempds)
# Data formatting complete, so write it into object.
self.data = multids
# future: update this, as it only works because `constnumtimes` = True.
dskey = list(self.data.keys())[0]
circuit = list(self.data[dskey].keys())[0]
numtimes = len(self.data[dskey][circuit].time)
if numtimes <= 50:
_warnings.warn('There are not enough timestamps per circuit to'
+ ' guarantee that the analysis will be reliable!')
# If there's only one DataSet, we set `ids` to True because its value is
# conceptually meaningless, and it's more convenient in the code to set it to True.
        if not ids:
            if len(multids.keys()) == 1:
                ids = True
        # Decide on the dof reductions, and write them (and `ids`) into the object.
if ids:
dofreduction = {'dataset': 0, 'circuit': 0, 'outcome': 1}
else:
dofreduction = {'dataset': None, 'circuit': 0, 'outcome': 1}
self._ids = ids
self._dofreduction = dofreduction
# Properties where power spectra are stored.
        self._contains_spectra = False  # A bool keeping track of whether spectra have been added yet.
self.dof = 1 # The chi2 dof of each "base" spectrum under a null hypothesis, for calculating p values etc.
# Will become a list of frequency lists, with each element of the list being the frequencies for one or more
# circuit's spectra.
self._frequencies = None
# Will become a dictionary of ``pointers`` that designate the index of the `self._frequencies` list that the
# frequencies for a circuit correspond to. The key is the circuit index (in self.data.keys()) and the value
        # is the index in self._frequencies.
self._freqpointers = None
self._dofalt = {} # A dictionary containing alternative dofs, so that it can be adjusted in special cases.
# Properties where results of drift detection are stored.
self._driftdetectors = []
self._def_detection = None
self._significance = {}
self._tests = {}
self._freqstest = {}
self._condtests = {}
self._condbetweenclass_weighting = {}
self._test_significance = {}
self._inclass_correction = {}
self._betweenclass_correction = {}
self._power_sigthreshold = {}
self._driftfreqinds = {}
self._driftdetected_global = {}
self._driftdetected_class = {}
# Properties where results of drift characterization (i.e., probability traces) are stored.
self._def_probtrajectories = None
self._probtrajectories = {}
return None
def __str__(self):
        if not self._contains_spectra:
            s = "A StabilityAnalyzer containing time-series data, but waiting for power spectra to be generated,"
            s += " and a stability analysis to be performed."
            return s
        if self._def_detection is None:
            s = "A StabilityAnalyzer containing time-series data and power spectra, but waiting for a stability"
            s += " analysis to be performed."
            return s
detectorkey = self._def_detection
driftdetected = self.instability_detected(detectorkey=detectorkey, test=None)
if driftdetected:
s = "Instability *has* been detected,"
else:
s = "Instability has *not* been detected,"
        s += " from tests at a global significance of {}%".format(100 * self._significance[detectorkey])
return s
def compute_spectra(self, frequencies='auto', freqpointers={}):
        """
Generates and records power spectra. This is the first stage in instability detection
and characterization with a StabilityAnalyzer.
Parameters
----------
frequencies : 'auto' or list, optional
The frequencies that the power spectra are calculated for. If 'auto' these are automatically
determined from the meta-data of the time-series data (e.g., using the mean time between data
points) and the transform being used. If not 'auto', then a list of lists, where each list is
a set of frequencies that are the frequencies corresponding to one or more power spectra. The
frequencies that should be paired to a given power spectrum are specified by `freqpointers`.
            These frequencies (whether automatically calculated or explicitly input) have a fundamentally
different meaning depending on whether the transform is time-stamp aware (here, the LSP) or not
(here, the DCT and DFT).
Time-stamp aware transforms take the frequencies to calculate powers at *as an input*, so the
specified frequencies are, explicitly, the frequencies associated with the powers. The task
            of choosing the frequencies amounts to picking the best set of frequencies at which to interrogate
the true probability trajectory for components. As there are complex factors involved in this
choice that the code has no way of knowing, sometimes it is best to choose them yourself. E.g.,
            if different frequencies are used for different circuits it isn't possible to (meaningfully)
            average power spectra across circuits, but this might be preferable if the time-step is
            sufficiently different between different circuits -- it depends on your aims.
For time-stamp unaware transforms, these frequencies should be the frequencies that, given
that we're implementing the, e.g., DCT, the generated power spectrum is *implicitly* with respect
to. In the case of data on a fixed time-grid, i.e., equally spaced data, then there is a
precise set of frequencies implicit in the transform (which will be accurately extracted with
frequencies set to `auto`). Otherwise, these frequencies are explicitly at least slightly
ad hoc, and choosing these frequencies amounts to choosing those frequencies that "best"
            approximate the properties being interrogated when fitting each, e.g., DCT basis function
            to the (timestamp-free) data. The 'auto' option bases these frequencies solely on the
mean time step and the number of times, and is a decent option when the time stamps are roughly
equally spaced for each circuit.
These frequencies should be in units of 1/t where 't' is the unit of the time stamps.
freqpointers : dict, optional
Specifies which frequencies correspond to which power spectra. The keys are power spectra labels,
and the values are integers that point to the index of `frequencies` (a list of lists) that the
            relevant frequencies are found at. Whenever a power spectrum is not included in `freqpointers` then
this defaults to 0. So if `frequencies` is specified and is a list containing a single list (of
frequencies) then `freqpointers` can be left as the empty dictionary.
Returns
-------
None
"""
if isinstance(frequencies, str):
assert(frequencies == 'auto')
frequencies, freqpointers = _sig.compute_auto_frequencies(self.data, self.transform)
self._frequencies = frequencies
self._freqpointers = freqpointers
numfrequencies = len(self._frequencies[0])
for freqs in self._frequencies[1:]:
assert(numfrequencies == len(freqs)), "The number of frequencies must be fixed for all circuits!"
self._axislabels = ('dataset', 'circuit', 'outcome') # todo: explain.
dskeys = tuple(self.data.keys())
circuits = tuple(self.data[dskeys[0]].keys())
outcomes = tuple(self.data.outcome_labels)
        arrayshape = (len(dskeys), len(circuits), len(outcomes), numfrequencies)
self._shape = arrayshape
self._numfrequencies = numfrequencies
self._tupletoindex = {} # todo: explain
self._basespectra = _np.zeros(tuple(arrayshape), float) # An empty array that will be populated with spectra.
        self._derivedspectra = {}  # Stores derived spectra, e.g., the spectrum obtained by averaging along all axes.
# If the type of transform used calculates modes as well as powers, initializes an array of the relevant data
# type to store them.
        if self.transform == 'dct':
            self._modes = _np.zeros(tuple(arrayshape), float)
        if self.transform == 'lsp':
            self._modes = None
        if self.transform == 'dft':
            self._modes = _np.zeros(tuple(arrayshape), complex)
        # This requires a fixed number of counts or it'll throw an error. That is also enforced in __init__() but that might be
# removed in the future, so we check it here again.
counts = []
for ds in self.data.values():
counts.append(ds.totalcounts_pertime)
assert(_np.var(_np.array(counts)) == 0), "An equal number of counts at every time-step " \
"in every circuit is currently required!"
counts = counts[0]
self.counts = counts
for i, dskey in enumerate(dskeys):
ds = self.data[dskey]
for j, circuit in enumerate(circuits):
# The time-series data to generate power spectra for.
times, outcomedict = ds[circuit].timeseries_for_outcomes
# Go through the outcomes and generate a power spectrum for the clickstream of each outcome
for k, outcome in enumerate(outcomes):
# Record where the spectrum is written in the array.
self._tupletoindex[(dskey, circuit, outcome)] = (i, j, k)
# Finds the pointer to the frequencies for the spectrum we're about to calculate.
pointer = self._freqpointers.get(j, 0)
# The frequencies for this spectrum. Note that if we're using a transform that doesn't
# actually take into account a set of frequencies it is necessary that this set of
# frequencies is "correct" in the sense that it corresponds to what'll be calculated.
freqs = self._frequencies[pointer]
# Calculates the power spectrum for this (dskey, circuit, outcome) tuple.
modes, powers = _sig.spectrum(outcomedict[outcome], times=times, null_hypothesis=None,
counts=counts, frequencies=freqs, transform=self.transform,
returnfrequencies=False)
self._basespectra[i, j, k] = powers
if modes is not None: self._modes[i, j, k] = modes
# todo: calculate any dof alternative here (perhaps do inside the spectrum function?),
# and record it.
self._contains_spectra = True
return None
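For a rough, self-contained illustration of the kind of base spectrum computed here (a sketch of the general DCT technique, not a call into pygsti.extras.drift.signal, and with a hand-rolled transform so it only needs numpy): a binary clickstream is standardized under the constant-probability null hypothesis and transformed with the orthogonal Type-II DCT; the squared mode amplitudes are the powers, each approximately chi^2_1-distributed for a stable circuit.

```python
import numpy as np

def dct2_ortho(x):
    # Orthogonal Type-II DCT, written out directly; scipy.fft.dct(x, norm='ortho')
    # computes the same transform.
    n = len(x)
    k = np.arange(n)[:, None]
    t = np.arange(n)[None, :]
    basis = np.sqrt(2 / n) * np.cos(np.pi * k * (2 * t + 1) / (2 * n))
    basis[0] /= np.sqrt(2)
    return basis @ x

rng = np.random.default_rng(0)
x = rng.binomial(1, 0.5, size=128).astype(float)  # a simulated stable clickstream

# Standardize under the null hypothesis p = mean(x), then square the modes.
# For binary data the powers average to exactly 1 (Parseval's theorem), and a
# single power far above the chi^2_1 tail would be evidence of instability.
p = x.mean()
powers = dct2_ortho((x - p) / np.sqrt(p * (1 - p))) ** 2
print(round(powers.mean(), 6))  # → 1.0
```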
def dof_reduction(self, axislabel):
"""
Find the null hypothesis chi2 degree of freedom (DOF) reduction when averaging power spectrum along
`axislabel`.
        - Under the null hypothesis, each base spectrum is distributed according to a chi2/k with a DOF k
for some k (obtainable via num_degrees_of_freedom()).
- Under the composite null hypothesis the power spectrum, averaged along the `axislabel` axis
is chi2/(numspectra*k - l) with a DOF of (numspectra*k - l) for some l, where numspectra is the
          number of spectra along this axis.
This function returns `l`.
"""
return self._dofreduction[axislabel]
def _check_dofreduction_set(self, axislabel):
"""
Checks whether the DOF reduction is set along an axis, i.e., it is not None. If it is None, that means
that we do not know how to reduce the DOF when averaging along that axis.
"""
if self._dofreduction.get(axislabel, None) is None:
return False
else:
return True
def num_degrees_of_freedom(self, label, adjusted=False):
"""
Returns the number of degrees of freedom (DOF) for a power in the power spectrum specified by `label`.
This is the DOF in the chi2 distribution that the powers are (approximately) distributed according to,
under the null hypothesis of stable circuits.
Parameters
----------
label: tuple
            The label specifying the power spectrum, or type of power spectrum.
adjusted: bool, optional
Currently does nothing. Placeholder for future improvement
Returns
-------
float
"""
if not adjusted:
dof = 1
for i, axislabel in enumerate(self._axislabels):
if axislabel not in label:
dofreduction = self._dofreduction.get(axislabel)
assert(dofreduction is not None), "Cannot obtain the DOF for this type of spectrum!"
dof = dof * (self._shape[i] - dofreduction)
else:
raise NotImplementedError("This has not yet been implemented!")
return dof
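The product rule above can be sketched standalone (shape and reduction values below are illustrative, mirroring the defaults set in __init__): each axis not named in `label` is averaged over and multiplies the DOF by its extent minus its reduction.

```python
AXES = ('dataset', 'circuit', 'outcome')

def dof(shape, label, reduction):
    # DOF of the chi2 distribution for a power spectrum averaged over every
    # axis that is not named in `label`.
    d = 1
    for i, axis in enumerate(AXES):
        if axis not in label:
            d *= shape[i] - reduction[axis]
    return d

# 2 DataSets x 100 circuits x 2 outcomes, keeping the 'circuit' axis:
# averaging over datasets (reduction 0) and outcomes (reduction 1) gives
# (2 - 0) * (2 - 1) = 2 degrees of freedom per power.
print(dof((2, 100, 2), ('circuit',), {'dataset': 0, 'circuit': 0, 'outcome': 1}))  # → 2
```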
def num_spectra(self, label):
"""
The number of power spectra in the "class" of spectra specified by `label`, where `label` is
a tuple containing some subset of 'dataset', 'circuit' and 'outcome'. The strings it contains
        specify those axes that are not averaged, and those not contained are axes that are averaged.
For example, ('circuit',) is the number of power spectra after averaging over DataSets (often 1) and
outcomes (often 2), and it is just the total number of circuits.
"""
numspectra = 1
for i, axislabel in enumerate(self._axislabels):
if axislabel in label:
numspectra = numspectra * self._shape[i]
return numspectra
def same_frequencies(self, dictlabel={}):
"""
Checks whether all the "base" power spectra defined by `dictlabel` are all with respect to the same frequencies.
Parameters
----------
dictlabel : dict, optional
Specifies the set of "base" power spectra. Keys should be a subset of 'dataset', 'circuit' and 'outcome'.
            For each string included as a key, we restrict to only those power spectra associated with the
corresponding value. For example, if 'circuit' is a key the value should be a Circuit and we are looking
            at only those power spectra obtained from the data for that circuit. So an empty dict means we look at every
            base spectrum.
Returns
-------
Bool
True only if the spectra defined by `dictlabel` are all with respect to the same frequencies.
"""
        # If there are no frequency pointers stored it's automatically true, because then all spectra
# are for the frequencies stored as self._frequencies[0].
if len(self._freqpointers) == 0: return True
iterator = [] # A list of list-like to iterate over to consider all the spectra in question.
for i, axislabel in enumerate(self._axislabels):
if axislabel in dictlabel.keys():
# Store the array index (rather than the raw value) so that `indices` below is a tuple of ints.
iterator.append([self._index(axislabel, dictlabel[axislabel]), ])
else:
iterator.append(range(self._shape[i]))
if 'circuit' in dictlabel.keys():
circuitindex = self._index('circuit', dictlabel['circuit'])
else:
circuitindex = 0
# Find the frequency pointer for an arbitrary one of the spectra in question, and return the default of
# 0 if there isn't one.
reference_freqpointer = self._freqpointers.get(circuitindex, 0)
# The frequencies can only vary with circuit, so the pointer lookup is keyed on the circuit index.
circuitaxis = self._axislabels.index('circuit')
# Iterate through the indices of all the spectra under consideration, and check that their frequency
# pointers are the same as the reference.
for indices in _itertools.product(*iterator):
freqpointer = self._freqpointers.get(indices[circuitaxis], 0)
# If the frequency pointer is different to the reference then the frequencies aren't all the same.
if freqpointer != reference_freqpointer:
return False
return True
def averaging_allowed(self, dictlabel={}, checklevel=2):
"""
Checks whether we can average over the specified "base" power spectra.
Parameters
----------
dictlabel : dict, optional
Specifies the set of "base" power spectra. Keys should be a subset of 'dataset', 'circuit' and 'outcome'.
For each string included as a key, we restrict to only those power spectra with associated with the
corresponding value. For example, if 'circuit' is a key the value should be a Circuit and we are looking
at only those power spectra obtained from the data for that circuit. So an empty dict means we look at every
base spectra
checklevel : int, optiona;
The degree of checking.
- 0: the function is trivial, and returns True
- 1: the function checks that all the power spectra to be averaged are associated with the same
frequencies
- 2+: checks that we can calculate the DOF for that averaged power spectrum, so that hypothesis
testing can be implemented on it.
Returns
-------
Bool
True if the power spectra pass the tests for the validity of averaging over them.
"""
if checklevel == 0: # Does no checking if `checklevel` is 0.
return True
if checklevel >= 1:
# If `checklevel` >= 1 checks that the frequencies are all the same.
if not self.same_frequencies(dictlabel):
return False
# If `checklevel` >= 2 checks that DOF reduction is not None for all
# axes on which we are trying to average.
if checklevel >= 2:
for i, axislabel in enumerate(self._axislabels):
if axislabel not in dictlabel.keys():
# If dimension along axis is 1 averaging is trivial so we always allow.
if not self._check_dofreduction_set(axislabel) and self._shape[i] > 1:
return False
return True
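# Illustrative usage (a sketch with hypothetical names): to check whether it is valid to
# average over DataSets and outcomes for one particular circuit, with full checking:
#
#     ok = analyzer.averaging_allowed({'circuit': circuit}, checklevel=2)
#
# where `analyzer` is an instance of this class and `circuit` is one of its stored Circuits.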
def _index(self, axislabel, key):
"""
Returns the spectra array index for a key along a specific axis. For example,
`axislabel` could be 'circuit' and 'key' and Circuit for the DataSet, and this
will return the index in spectra array to find the spectra for that circuit.
"""
if axislabel == 'dataset':
return list(self.data.keys()).index(key)
elif axislabel == 'circuit':
return list(self.data[list(self.data.keys())[0]].keys()).index(key)
elif axislabel == 'outcome':
return self.data.outcome_labels.index(key)
else:
raise ValueError("axislabel must be one of `dataset`, `circuit` and `outcome`!")
def _dictlabel_to_averaging_axes_and_array_indices(self, dictlabel):
"""
Returns the axes to average over, and the indices of the reduced array, for a spectrum
labelled by the input dictionary
Parameters
----------
dictlabel: dict
A dictionary labelling a spectrum, with keys are some subset of 'dataset', 'circuit'
and 'outcome', and values that are valid options for those keys. E.g., for 'circuit'
this is one of the Circuits in the stored DataSet.
Returns
-------
tuple
The axes to average over. This is the axes corresponding to each of 'dataset', 'circuit'
and 'outcome' that aren't elements of the input dictionary
tuple
The indices for the spectrum after any averaging. This is the indices for the values
of the keys.
"""
indices = []
averageaxes = []
for i, axislabel in enumerate(self._axislabels):
if axislabel in dictlabel.keys():
indices.append(self._index(axislabel, dictlabel[axislabel]))
else:
averageaxes.append(i)
return tuple(averageaxes), tuple(indices)
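# Illustrative sketch: with the axis order ('dataset', 'circuit', 'outcome'), the label
# {'circuit': some_circuit} maps to averaging axes (0, 2) (DataSets and outcomes are
# averaged over) and array indices (i,), where i is the index of `some_circuit` along
# the circuit axis of the spectra array.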
def power_spectrum(self, dictlabel=None, returnfrequencies=True, checklevel=2):
"""
Returns a power spectrum.
Parameters
----------
dictlabel : dict, optional
A power spectra has been calculated for each (DataSet, Circuit, circuit) outcome
triple of the stored data. We can average over any of these axes. The `dictlabel`
specifies which axes to average over, and which value to specify for the parameters
that we are not averaging over. The keys are a subset of 'dataset', 'circuit'
and 'outcome'. For any of these strings not included, we average over that axis.
For each string included, the item should be a key for one of the stored DataSets,
a Circuit in the DataSet, and a possible outcome for that Circuit, respectively.
If None then defaults to {}, corresponding to the power spectrum obtained by
averarging over all the "base" spectra.
returnfrequences: bool, optional
Whether to also return the corresponding frequencies, as the first argument.
checklevel : int, optional
The level of checking to do, when averaging over axes, for whether averaging over
that axis is a meaningful thing to do. If checklevel = 0 then no checks are performed.
If checklevel > 0 then the function checks that all the power spectra to be averaged
are associated with the same frequencies. If checklevel > 1 we can also check that
we can calculate the DOF for that averaged power spectrum, so that hypothesis testing
can be implemented on it.
Returns
-------
if returnfrequencies:
array
The frequencies associated with the returned powers.
array
The power spectrum.
"""
if dictlabel is None: dictlabel = {}
assert(self._contains_spectra is not None), "Spectra must be generated before they can be accessed!"
if len(dictlabel) == len(self._axislabels):
arrayindices = self._tupletoindex[(dictlabel['dataset'], dictlabel['circuit'], dictlabel['outcome'])]
spectrum = self._basespectra[arrayindices].copy()
if returnfrequencies:
circuitindex = arrayindices[1]
freq = self._frequencies[self._freqpointers.get(circuitindex, 0)]
return freq, spectrum
else:
return spectrum
else:
return self._get_averaged_spectrum(dictlabel, returnfrequencies, checklevel)
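# Illustrative usage (a sketch with hypothetical names; spectra must already have been
# generated with compute_spectra()):
#
#     freqs, powers = analyzer.power_spectrum()                      # average over everything
#     freqs, powers = analyzer.power_spectrum({'circuit': circuit})  # average over DataSets and outcomes
#
# where `analyzer` is an instance of this class and `circuit` is one of its stored Circuits.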
def _get_averaged_spectrum(self, dictlabel={}, returnfrequencies=True, checklevel=2):
"""
A subroutine of the method `power_spectrum()`. See the docstring of that method for details.
"""
# Check whether the requested averaging is allowed, with a check at the specified rigour level.
assert(self.averaging_allowed(dictlabel, checklevel=checklevel)), "This averaging is not permissible! To do it \
anyway, reduce `checklevel`."
# Find the axes we're averaging over, and the array indices for the averaged spectra array.
axes, indices = self._dictlabel_to_averaging_axes_and_array_indices(dictlabel)
# Performs the averaging and picks out the specified spectrum.
spectrum = _np.mean(self._basespectra, axis=axes)[indices]
if not returnfrequencies:
return spectrum
else:
# Pick out the frequencies associated with an arbitrary one of the base spectra that were
# averaged to obtain the spectrum we are returning. The frequencies can only vary with
# circuit, so the pointer lookup is keyed on the circuit index. Note that this set of
# frequencies may not be meaningful if `checklevel` is 0.
if 'circuit' in dictlabel:
circuitindex = self._index('circuit', dictlabel['circuit'])
else:
circuitindex = 0
freq = self._frequencies[self._freqpointers.get(circuitindex, 0)]
return freq, spectrum
def maximum_power(self, dictlabel={}, freqsubset=None):
"""
Returns the maximum power in a power spectrum.
Parameters
----------
dictlabel : dict, optional
The dictionary labelling the spectrum. The same format as in the power_spectrum() method.
See the docstring of that method for more details.
freqsubset : list, optional
A list of indices, that specify the subset of frequency indices to maximize over.
Returns
-------
float
The maximal power in the spectrum.
"""
spectrum = self.power_spectrum(dictlabel, returnfrequencies=False)
if freqsubset is None:
maxpower = _np.max(spectrum)
else:
maxpower = _np.max(spectrum[freqsubset])
return maxpower
def maximum_power_pvalue(self, dictlabel={}, freqsubset=None, cutoff=0):
"""
The p-value of the maximum power in a power spectrum.
Parameters
----------
dictlabel : dict, optional
The dictionary labelling the spectrum. The same format as in the power_spectrum() method.
See the docstring of that method for more details.
freqsubset : list, optional
A list of indices, that specify the subset of frequency indices to consider.
cutoof : float, optional
The minimal allowed p-value.
Returns
-------
float
The p-value of the maximal power in the specified spectrum.
"""
maxpower = self.maximum_power(dictlabel=dictlabel, freqsubset=freqsubset)
# future: update adjusted to True when the function allows it.
dof = self.num_degrees_of_freedom(tuple(dictlabel.keys()), adjusted=False)
pvalue = _sig.power_to_pvalue(maxpower, dof)
pvalue = max(cutoff, pvalue)
return pvalue
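# Illustrative usage (a sketch with hypothetical names): the p-value of the largest power
# in the fully averaged spectrum, floored at 1e-10 so that downstream code never receives
# an exact zero:
#
#     pvalue = analyzer.maximum_power_pvalue(cutoff=1e-10)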
def run_instability_detection(self, significance=0.05, freqstest=None, tests='auto', inclass_correction={},
betweenclass_weighting='auto', saveas='default', default=True, overwrite=False,
verbosity=1):
"""
Runs instability detection, by performing statistical hypothesis tests on the power spectra generated
from the time-series data. Before running this method it is necessary to generate power spectra using
the compute_spectra() method.
Parameters
----------
significance : float, optional
The global significance level. With defaults for all other inputs (a wide range of non-default options),
the family-wise error rate of the set of all hypothesis tests performed is controlled to this value.
freqstest : None or list, optional
If not None, a list of the frequency indices at which to test the powers. Leave as None to perform
comprehensive testing of the power spectra.
tests : 'auto' or tuple, optional
Specifies the set of hypothesis tests to perform. If 'auto' then a set of tests is automatically
chosen. This set of tests will be suitable for most purposes, but sometimes it is useful to override
this. If a tuple, the elements are "test classes", each of which specifies a set of hypothesis tests
to run, and each test class is itself specified by a tuple. The tests specified by every test class in
this tuple are all implemented. A test class is a tuple containing some subset of 'dataset', 'circuit'
and 'outcome', which specifies a set of power spectra. Specifically, a power spectrum has been calculated
from the clickstream for every combination of input DataSet (e.g., there are multiple DataSets if there
has been marginalization of multi-qubit data), Circuit in the DataSet, and possible outcome in the
DataSet. For each of 'dataset', 'circuit' and 'outcome' *not* included in a tuple defining a test class,
the corresponding "axis" of the 3-dimensional array of spectra is averaged over, and these averaged
spectra are then tested. So the tuple () specifies the "test class" whereby we test the power spectrum
obtained by averaging all power spectra; the tuple ('dataset', 'circuit') specifies the "test class"
whereby we average only over outcomes, obtaining a single power spectrum for each DataSet and Circuit
combination, which we test. The default option for `tests` is appropriate for most circumstances, and it
consists of (), ('dataset',) and ('dataset', 'circuit') with duplicates removed (e.g., if there is a
single DataSet then () is equivalent to ('dataset',)).
inclass_correction : dict, optional
A dictionary with keys 'dataset', 'circuit', 'outcome' and 'spectrum', and values that specify the type of
multi-test correction used to account for the multiple tests being implemented. This specifies how
statistical significance is maintained within the tests implemented in a single "test class".
betweenclass_weighting : 'auto' or dict, optional
The weighting to use to maintain statistical significance between the different classes of test being
implemented. If 'auto' then a standard Bonferroni correction is used.
default : bool, optional
This method can be run multiple times, to implement independent instability detection runs (if you are
doing this, make sure you know what you're doing! For example, deciding on-the-fly what hypothesis
tests to run is statistically very dubious!). One of these runs is set as the default: unless specified
otherwise, its results are returned by all of the "get" methods. On the first call to this method, this
input is ignored and it is always set to True.
saveas : str, optional
This string specifies the name under which the results of this run of instability detection is saved.