-
Notifications
You must be signed in to change notification settings - Fork 122
/
simulation.py
1654 lines (1457 loc) · 73.8 KB
/
simulation.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""This module contains base classes for simulations.
The :class:`Simulation` class tries to put everything needed for a simulation in a structured form
and collects task like initializing the tensor network state, model and algorithm classes,
running the actual algorithm, possibly performing measurements and saving the results.
See :doc:`/intro/simulations` for an overview and
:doc:`/examples` for a list of example parameter yaml files.
"""
# Copyright (C) TeNPy Developers, GNU GPLv3
import os
import sys
from pathlib import Path
import time
import importlib
import warnings
import traceback
import numpy as np
import logging
import copy
from ..models.model import Model
from ..algorithms.algorithm import Algorithm
from ..networks.mps import InitialStateBuilder
from ..models.model import NearestNeighborModel
from ..tools import hdf5_io
from ..tools.cache import CacheFile
from ..tools.params import asConfig
from ..tools.events import EventHandler
from ..tools.misc import find_subclass, convert_memory_units
from ..tools.misc import update_recursive, get_recursive, set_recursive, merge_recursive
from ..tools.misc import setup_logging as setup_logging_
from .. import version
from .post_processing import DataLoader
from .measurement import (measurement_wrapper, _m_psi_method, _m_psi_method_wrapped,
_m_model_method, _m_model_method_wrapped)
__all__ = [
'Simulation',
'Skip',
'init_simulation',
'run_simulation',
'init_simulation_from_checkpoint',
'resume_from_checkpoint',
'run_seq_simulations',
'estimate_simulation_RAM',
'output_filename_from_dict',
]
class Simulation:
"""Base class for simulations.
The preferred way to run simulations is in a `with` statement, which allows us to redirect
error messages to the log files, timely warn about unused parameters and to properly close any
open files. In other words, use the simulation class like this::
with Simulation(options, ...) as sim:
results = sim.run()
The wrappers :func:`run_simulation` and :func:`run_seq_simulations` do that.
Parameters
----------
options : dict-like
The simulation parameters as outlined below.
Ideally, these options should be enough to fully specify all parameters of a simulation
to ensure reproducibility.
setup_logging : bool
Whether to call :meth:`setup_logging` at the beginning of initialization.
resume_data : None | dict
Ignored if None. If a dictionary, it should contain the data for resuming the simulation,
``results['resume_data']`` (see :attr:`results`).
Note that the dict is cleared after readout to allow freeing memory.
Options
-------
.. cfg:config :: Simulation
directory : str
If not None (default), switch to that directory at the beginning of the simulation.
log_params : dict
Log parameters; see :cfg:config:`log`.
overwrite_output : bool
Whether an existing file may be overwritten.
Otherwise, if the file already exists we try to replace
``filename.ext`` with ``filename_01.ext`` (and further increasing numbers).
random_seed : int | None
If not ``None``, initialize the (legacy) numpy random generator with the given seed.
**Note** that models have their own :attr:`~tenpy.models.model.Model.rng` with
a separate (default) :cfg:option:`CouplingMPOModel.random_seed` in the `model_params`.
If this `random_seed` is set, we call
``model_params('random_seed', random_seed + 123456)``
sequential : dict
Parameters for running simulations sequentially, see :cfg:config:`sequential`.
Ignored by the simulation itself, but used by :func:`run_seq_simulations` and
:func:`resume_from_checkpoint` to run a whole sequence of simulations passing on the
state (and possible more).
max_errors_before_abort : int | None
We safeguard measurements with a try-except block to avoid losing results after an expensive
simulation. This is the maximum number of errors happening during measurements
before we abort the whole simulation.
Setting this to None disables raising the error due to failed measurements
(also at the end of the simulation).
Attributes
----------
options : :class:`~tenpy.tools.params.Config`
Simulation parameters.
model : :class:`~tenpy.models.model.Model`
The model to be simulated.
psi :
The tensor network state updated by the algorithm.
engine :
The engine of the algorithm.
results : dict
Collection of all the results to be saved in the end.
In a standard simulation, it will have the following entries.
simulation_parameters: nested dict
The simulation parameters passed as `options`.
version_info : dict
Information of the used library/code versions and simulation class.
See :meth:`get_version_info`.
finished_run : bool
Useful to check whether the output file finished or was generated at a checkpoint.
This flag is set to `True` only right at the end of :meth:`run`
(or :meth:`resume_run`) before saving.
measurements : dict
Data of all the performed measurements.
psi :
The final tensor network state.
Only included if :cfg:option:`Simulation.save_psi` is True (default).
resume_data : dict
Additional data for resuming the algorithm run.
Not part of `self.results`, but only added in :meth:`prepare_results_for_save` with
the most up-to-date `resume_data` from
:meth:`~tenpy.algorithms.algorithm.Algorithm.get_resume_data`.
Only included if :cfg:option:`Simulation.save_resume_data` is True.
Note that this contains another (reference or even copy of) `psi`.
cache : :class:`~tenpy.tools.cache.DictCache`
Cache that can be used by algorithms.
measurement_event : :class:`~tenpy.tools.events.EventHandler`
An event that gets emitted each time when measurements should be performed.
The callback functions should take :attr:`psi`, the simulation class itself,
and a dictionary `results` as arguments.
They should directly write the results into that dictionary.
output_filename : str
Filename for output.
_backup_filename : str
When writing a file a second time, instead of simply overwriting it, move it to there.
In that way, we still have a non-corrupt version if something fails during saving.
errors_during_run : list of tuples
List holding errors that occurred during runtime, i.e. during measurements or post-processing.
This is read out (and possibly raises an Exception) at the end of :meth:`run`.
_init_walltime : float
Walltime at initialization of the simulation class.
Used as reference point in :meth:`walltime`.
_last_save : float
Time of the last call to :meth:`save_results`, initialized to :attr:`_init_walltime`.
loaded_from_checkpoint : bool
True when the simulation is loaded with :meth:`from_saved_checkpoint`.
grouped : int
By how many sites we grouped in :meth:`group_sites_for_algorithm`.
model_ungrouped :
Only set if `grouped` > 1. In that case, :attr:`model` is the modified/grouped model,
and `model_ungrouped` is the original ungrouped model.
final_processing : bool
Flag that indicates that we're in the final processing and want to avoid raising errors
before saving results.
"""
#: name of the default algorithm `engine` class
default_algorithm = 'TwoSiteDMRGEngine'
#: tuples as for :cfg:option:`Simulation.connect_measurements` that get added if
#: the :cfg:option:`Simulation.use_default_measurements` is True.
default_measurements = [
('tenpy.simulations.measurement', 'm_measurement_index', {}, 1),
('tenpy.simulations.measurement', 'm_bond_dimension'),
('tenpy.simulations.measurement', 'm_entropy'),
]
#: tuples as for :cfg:option:`Simulation.run_post_processing`, same structure as for measurements
default_post_processing = []
#: logger : An instance of a logger; see :doc:`/intro/logging`. NB: class attribute.
logger = logging.getLogger(__name__ + ".Simulation")
def __init__(self, options, *, setup_logging=True, resume_data=None):
    """Set up directory, output filenames, logging, RNG seed and resume data.

    See the class docstring for the `options`, `setup_logging` and `resume_data`
    parameters and for the available config options.
    """
    self._init_walltime = time.time()
    if not hasattr(self, 'loaded_from_checkpoint'):
        # `from_saved_checkpoint` sets this attribute *before* calling __init__
        self.loaded_from_checkpoint = False
    self.options = options  # delay conversion to Config: avoid logging before setup_logging
    cwd = self.options.setdefault("directory", None)
    if cwd is not None:
        if not os.path.exists(cwd):
            os.mkdir(cwd)
        os.chdir(cwd)
    self.fix_output_filenames()
    if setup_logging:
        log_params = self.options.setdefault('log_params', {})
        setup_logging_(**log_params, output_filename=self.output_filename)
    # now that we have logging running, catch up with log messages
    self.logger.info("new simulation\n%s\n%s\n%s", "=" * 80, self.__class__.__name__, "=" * 80)
    self.options = asConfig(self.options, self.__class__.__name__)
    # mark options already consumed above (before the Config conversion) as used
    self.options.touch('directory', 'output_filename', 'output_filename_params',
                       'overwrite_output', 'skip_if_output_exists', 'safe_write', 'log_params',
                       'estimate_RAM_const_offset')
    if cwd is not None:
        self.logger.info("change directory to %s", cwd)  # os.chdir(cwd) above
    self.logger.info("output filename: %s", self.output_filename)
    random_seed = self.options.get('random_seed', None)
    if random_seed is not None:
        if self.loaded_from_checkpoint:
            # NOTE: fixed missing space between the two sentences of this warning
            warnings.warn("resetting `random_seed` for a simulation loaded from checkpoint. "
                          "Depending on where you use random numbers, "
                          "this might or might not be what you want!")
        np.random.seed(random_seed)
        # offset the model's own seed so it doesn't reproduce the global stream
        self.options.subconfig('model_params').setdefault('random_seed', random_seed + 123456)
    self.results = {
        'simulation_parameters': self.options,
        'version_info': self.get_version_info(),
        'finished_run': False,
    }
    self._last_save = time.time()
    self.errors_during_run = []  # tuples ("name_step", module_name, module_func, err_traceback)
    self.measurement_event = EventHandler("psi, simulation, model, results")
    if resume_data is not None:
        if 'psi' in resume_data:
            self.psi = resume_data['psi']
        if 'model' in resume_data:  # usually not: we can cheaply regenerate a model
            self.model = resume_data['model']
        self.results['resume_data'] = resume_data
    self.options.touch('sequential')  # added by :func:`run_seq_simulations` for completeness
    self.cache = CacheFile.open()
    self.grouped = 1
    self.final_processing = False
    self.max_errors_before_abort = self.options.get('max_errors_before_abort', 10)
def __enter__(self):
    """Enter the simulation context: initialize and enter the :attr:`cache`."""
    self.init_cache()
    self.cache = self.cache.__enter__()  # start cache context
    return self
def __exit__(self, exc_type, exc_value, traceback):
    """Exit the simulation context: close the cache and log a pending exception."""
    self.cache.__exit__(exc_type, exc_value, traceback)  # exit cache context
    if exc_type is not None:
        # write the traceback into the log file before the interpreter prints it
        self.logger.exception("simulation abort with the following exception",
                              exc_info=(exc_type, exc_value, traceback))
    # warn about unused options even on error: helps spotting typos in option names
    self.options.warn_unused(True)
def estimate_RAM(self):
    """Estimates the RAM usage for the simulation, without running it.

    Initializes model, state and the algorithm engine (but does not run the
    algorithm) and delegates the actual estimate to the engine.

    Returns
    -------
    RAM : int
        The expected RAM usage in kB.
    """
    self.init_model()  # model, required for algorithm
    self.init_state()  # psi, required for algorithm
    self.group_sites_for_algorithm()  # algorithm might only work if grouped
    self.init_algorithm()  # create engine (subclass of Algorithm)
    return self.engine.estimate_RAM()
def run(self):
    """Run the whole simulation.

    Initializes model, state, algorithm and measurements, runs the algorithm
    (the bulk of the time), then performs final measurements, post-processing
    and saves the results. The order of the calls below matters.

    Returns
    -------
    results : dict
        The :attr:`results` as returned by :meth:`prepare_results_for_save`.
    """
    if self.loaded_from_checkpoint:
        warnings.warn("called `run()` on a simulation loaded from checkpoint. "
                      "You should probably call `resume_run()` instead!")
    self.init_model()
    self.init_state()
    self.group_sites_for_algorithm()
    self.init_algorithm()
    self.init_measurements()
    self.run_algorithm()  # here we spent most of the time
    # from here on, avoid raising errors before results are saved
    self.final_processing = True
    self.group_split()
    self.final_measurements()
    self.run_post_processing()
    self.results['finished_run'] = True
    results = self.save_results()
    self.logger.info('finished simulation run\n' + "=" * 80)
    self.options.warn_unused(True)
    self._display_errors_during_run()  # may raise if measurements failed during the run
    return results
@classmethod
def from_saved_checkpoint(cls, filename=None, checkpoint_results=None, **kwargs):
"""Re-initialize a given simulation class from checkpoint results.
You should probably call :meth:`resume_run` after successful initialization.
Instead of calling this directly, consider using :func:`resume_from_checkpoint`.
Parameters
----------
filename : None | str
The filename of the checkpoint to be loaded.
You can either specify the `filename` or the `checkpoint_results`.
checkpoint_results : None | dict
Alternatively to `filename` the results of the simulation so far, i.e. directly the
data dictionary saved at a simulation checkpoint.
**kwargs :
Further keyword arguments given to the `Simulation.__init__`.
"""
if filename is not None:
if checkpoint_results is not None:
raise ValueError("pass either filename or checkpoint_results")
checkpoint_results = hdf5_io.load(filename)
if checkpoint_results is None:
raise ValueError("you need to pass `filename` or `checkpoint_results`")
options = checkpoint_results['simulation_parameters']
# usually, we would say `sim = cls(options)`.
# the following 3 lines provide an additional hook setting :attr:`loaded_from_checkpoint`
# before calling the `__init__()`, such that other methods can be customized to this case.
sim = cls.__new__(cls)
sim.loaded_from_checkpoint = True # hook to disable parts of the __init__()
if 'resume_data' in checkpoint_results:
kwargs.setdefault('resume_data', checkpoint_results['resume_data'])
sim.__init__(options, **kwargs)
sim.results = checkpoint_results
if 'measurements' in checkpoint_results:
sim.results['measurements'] = {k: list(v)
for k, v in sim.results['measurements'].items()}
return sim
def resume_run(self):
    """Resume a simulation that was initialized from a checkpoint.

    Mirrors :meth:`run`, but reuses the existing `psi` / `resume_data` and skips
    the initial measurement. The order of the calls below matters.

    Returns
    -------
    results : dict
        The :attr:`results` as returned by :meth:`prepare_results_for_save`.
    """
    if not self.loaded_from_checkpoint:
        warnings.warn("called `resume_run()` on a simulation *not* loaded from checkpoint. "
                      "You probably want `run()` instead!")
    self.init_model()
    if not hasattr(self, 'psi'):
        # didn't get psi in resume_data, but might still have it in the results
        if 'psi' not in self.results:
            raise ValueError("psi not saved in the results: can't resume!")
        self.psi = self.results['psi']
    self.init_state()  # does (almost) nothing if self.psi is already initialized
    self.group_sites_for_algorithm()
    self.init_algorithm()  # automatically reads out and del's ``self.results['resume_data']``
    # the relevant part from init_measurements(), but don't make a measurement
    self._connect_measurements()
    self.options.touch('measure_initial')
    self.resume_run_algorithm()  # continue with the actual algorithm
    # here we spent most of the time
    # from here on, avoid raising errors before results are saved
    self.final_processing = True
    self.group_split()
    self.final_measurements()
    self.run_post_processing()
    self.results['finished_run'] = True
    results = self.save_results()
    self.logger.info('finished simulation (resume_)run\n' + "=" * 80)
    self.options.warn_unused(True)
    self._display_errors_during_run()
    return results
def init_cache(self):
    """Initialize the :attr:`cache` from the options.

    This method is only called automatically when the simulation is used in a
    ``with ...`` statement, which is the case if you use :func:`run_simulation`, etc.

    Options
    -------
    .. cfg:configoptions :: Simulation

        cache_threshold_chi : int
            If the `algorithm_params.trunc_params.chi_max` in :attr:`options` is smaller than
            this threshold, do not initialize a (non-trivial) cache.
        cache_params : dict
            Dictionary with parameters for the cache, see
            :meth:`~tenpy.tools.cache.CacheFile.open`.
    """
    threshold = self.options.get("cache_threshold_chi", 2000)
    chi = get_recursive(self.options, "algorithm_params.trunc_params.chi_max", default=None)
    small_chi = chi is not None and chi < threshold
    if small_chi:
        # small bond dimension: a trivial in-RAM cache is enough
        self.options.touch("cache_params")
        self.logger.info("No cache due to chi=%d < cache_threshold_chi = %d",
                         chi, threshold)
        self.cache = CacheFile.open()  # default = keep in RAM.
        return
    self.cache.close()
    cache_params = self.options.get("cache_params", {})
    self.cache = CacheFile.open(**cache_params)
    # note: can't use a `with self.cache` statement, but emulate it:
    # self.__enter__() calls this function followed by
    # self.cache = self.cache.__enter__()
def init_model(self):
    """Initialize a :attr:`model` from the model parameters.

    Skips regenerating the model if :attr:`model` is already set.

    Options
    -------
    .. cfg:configoptions :: Simulation

        model_class : str | class
            Mandatory. Class or name of a subclass of :class:`~tenpy.models.model.Model`.
        model_params : dict
            Dictionary with parameters for the model; see the documentation of the
            corresponding `model_class`.
    """
    model_class_name = self.options["model_class"]  # mandatory: no default value!
    if hasattr(self, 'model'):
        # model already given (e.g. via resume_data); just mark the params as used
        self.options.touch('model_params')
        return
    ModelClass = find_subclass(Model, model_class_name)
    model_params = self.options.subconfig('model_params')
    self.model = ModelClass(model_params)
def init_state(self):
    """Initialize a tensor network :attr:`psi`.

    Skips building a new state if :attr:`psi` is already set.

    Options
    -------
    .. cfg:configoptions :: Simulation

        initial_state_builder_class : str | class
            Class or name of a subclass of :class:`~tenpy.networks.mps.InitialStateBuilder`.
            Used to initialize `psi` according to the `initial_state_params`.
        initial_state_params : dict
            Dictionary with parameters for building `psi`; see the documentation of the
            `initial_state_builder_class`, e.g. :cfg:config:`InitialStateBuilder`.
        save_psi : bool
            Whether the final :attr:`psi` should be included into the output :attr:`results`.
    """
    if hasattr(self, 'psi'):
        self.logger.info("initial state as given")  # nothing to do
        # but avoid warnings about unused parameters
        self.options.touch('initial_state_builder_class', 'initial_state_params')
    else:
        builder_name = self.options.get('initial_state_builder_class', 'InitialStateBuilder')
        BuilderClass = find_subclass(InitialStateBuilder, builder_name)
        builder_params = self.options.subconfig('initial_state_params')
        builder = BuilderClass(self.model.lat, builder_params, self.model.dtype)
        self.psi = builder.run()
    if self.options.get('save_psi', True):
        self.results['psi'] = self.psi
def group_sites_for_algorithm(self):
"""Coarse-grain the model and state for the algorithm.
Options
-------
.. cfg:configoptions :: Simulation
group_sites : int
How many sites to group. 1 means no grouping.
group_to_NearestNeighborModel : bool
If True, convert the grouped model to a
:class:`~tenpy.models.model.NearestNeighborModel`.
Use this if you want to run TEBD with a model that was originally next-nearest
neighbor.
"""
group_sites = self.grouped = self.options.get("group_sites", 1)
to_NN = self.options.get("group_to_NearestNeighborModel", False)
if group_sites < 1:
raise ValueError("invalid `group_sites` = " + str(group_sites))
if group_sites > 1:
if not self.loaded_from_checkpoint or self.psi.grouped < group_sites:
self.psi.group_sites(group_sites)
self.model_ungrouped = self.model.copy()
self.model.group_sites(group_sites)
if to_NN:
self.model = NearestNeighborModel.from_MPOModel(self.model)
def group_split(self):
"""Split sites of psi that were grouped in :meth:`group_sites_for_algorithm`."""
if self.grouped > 1:
self.psi.group_split(self.options['algorithm_params']['trunc_params'])
self.model = self.model_ungrouped
del self.model_ungrouped
self.grouped = 1
def init_algorithm(self, **kwargs):
    """Initialize the algorithm.

    If :attr:`results` has `'resume_data'`, it is read out, used for initialization
    and removed from the results.

    Parameters
    ----------
    **kwargs :
        Extra keyword arguments passed on to the Algorithm.__init__(),
        for example the `resume_data` when calling `resume_run`.

    Options
    -------
    .. cfg:configoptions :: Simulation

        algorithm_class : str | class
            Class or name of a subclass of :class:`~tenpy.algorithms.algorithm.Algorithm`.
            The engine of the algorithm to be run.
        algorithm_params : dict
            Dictionary with parameters for the algorithm; see the documentation of the
            `algorithm_class`.
        connect_algorithm_checkpoint : list of tuple
            Functions to connect to the :attr:`~tenpy.algorithms.Algorithm.checkpoint` event
            of the algorithm.
            Each tuple can be of length 2 to 4, with entries
            ``(module, function, kwargs, priority)``, the last two optionally.
            The mandatory `module` and `function` specify a callback measurement function.
            `kwargs` can specify extra keyword-arguments for the function,
            `priority` allows to tune the order in which the measurement functions get called.
            See :meth:`~tenpy.tools.events.EventHandler.connect_by_name` for more details.
    """
    alg_class_name = self.options.get("algorithm_class", self.default_algorithm)
    AlgorithmClass = find_subclass(Algorithm, alg_class_name)
    if 'resume_data' in self.results:
        self.logger.info("use `resume_data` for initializing the algorithm engine")
        kwargs.setdefault('resume_data', self.results['resume_data'].copy())
        # clean up: they are no longer up to date after algorithm initialization!
        # up to date resume_data is added in :meth:`prepare_results_for_save`
        self.results['resume_data'].clear()
        del self.results['resume_data']
    kwargs.setdefault('cache', self.cache)
    params = self.options.subconfig('algorithm_params')
    self.engine = AlgorithmClass(self.psi, self.model, params, **kwargs)
    # connect saving of results to the algorithm's checkpoint event
    self.engine.checkpoint.connect(self.save_at_checkpoint)
    # user-specified checkpoint callbacks
    con_checkpoint = list(self.options.get('connect_algorithm_checkpoint', []))
    for entry in con_checkpoint:
        self.engine.checkpoint.connect_by_name(*entry)
def init_measurements(self):
    """Initialize and prepare measurements.

    Options
    -------
    .. cfg:configoptions :: Simulation

        connect_measurements : list of tuple
            Functions to connect to the :attr:`measurement_event`.
            Each tuple can be of length 2 to 4, with entries
            ``(module, function, kwargs, priority)``, the last two optionally.
            The mandatory `module` and `function` specify a callback measurement function.
            `kwargs` can specify extra keyword-arguments for the function,
            `priority` allows to tune the order in which the measurement functions get called.
            See :meth:`~tenpy.tools.events.EventHandler.connect_by_name` for more details.
        use_default_measurements : bool
            Each Simulation class defines a list of :attr:`default_measurements` in the same
            format as :cfg:option:`Simulation.connect_measurements`.
            This flag allows to explicitly disable them.
        measure_initial: bool
            Whether to perform a measurement on the initial state, i.e., before starting the
            algorithm run.
        measure_at_algorithm_checkpoints : bool
            Defaults to False. If True, make measurements at each algorithm checkpoint.
            This can be useful to study e.g. the DMRG convergence with the number of sweeps.
            Note that (depending on the algorithm) `psi` might not be in canonical form during
            the algorithm run. In that case, you might need to also enable the
            `canonicalize_before_measurement` option to get e.g. correct
            long-range correlation functions. (On the other hand, local onsite expectation
            values are likely fine without the explicit canonical_form() call.)
        canonicalize_before_measurement : bool
            If True, call `psi.canonical_form()` on the state used for measurement.
    """
    self._connect_measurements()
    if self.options.get('measure_initial', True):
        self.make_measurements()  # sets up self.results['measurements'] if necessary
def _connect_measurements(self):
if self.options.get('use_default_measurements', True):
def_meas = self.default_measurements + self.model.get_extra_default_measurements()
else:
def_meas = []
con_meas = list(self.options.get('connect_measurements', []))
for entry in def_meas + con_meas:
# (module_name, func_name, kwargs=None, priority=0) = entry
self._connect_measurements_fct(*entry)
measure_at_alg = self.options.get('measure_at_algorithm_checkpoints', False)
if measure_at_alg:
def make_simulation_measurements(algorithm):
assert algorithm is self.engine
self.make_measurements()
self.engine.checkpoint.connect(make_simulation_measurements)
def _connect_measurements_fct(self, module_name, func_name, extra_kwargs=None, priority=0):
if extra_kwargs is None:
extra_kwargs = {}
wrap = False
if func_name.startswith('wrap'):
wrap = True
func_name = func_name.split()[1]
# find measurement function
if module_name == 'psi_method':
# psi might change/only be created at beginning of measurement
# so the function needs to be extracted dynamically during measurement
# this is done in `tenpy.simulations.measurement._m_psi_method{_wrapped}()`
extra_kwargs['func_name'] = func_name
func = _m_psi_method_wrapped if wrap else _m_psi_method
wrap = False
elif module_name == 'model_method':
# analogous to psi_method
extra_kwargs['func_name'] = func_name
func = _m_model_method_wrapped if wrap else _m_model_method
wrap = False
elif module_name == 'simulation_method':
# the simulation class already exists, so we can directly get the corresponding method
func = getattr(self, func_name)
else:
# global functions should also exist already, so we can directly get them
func = hdf5_io.find_global(module_name, func_name)
if wrap:
if 'results_key' in extra_kwargs:
results_key = extra_kwargs['results_key']
del extra_kwargs['results_key']
else:
results_key = func_name
func = measurement_wrapper(func, results_key=results_key)
self.measurement_event.connect(func, priority, extra_kwargs)
def run_algorithm(self):
    """Run the algorithm.

    Calls ``self.engine.run()``; this is where most of the simulation time is spent.
    """
    self.engine.run()
def resume_run_algorithm(self):
    """Resume running the algorithm.

    Calls ``self.engine.resume_run()``.
    """
    # usual algorithms have a loop with break conditions, which we can just resume
    self.engine.resume_run()
def make_measurements(self):
    """Perform measurements and merge the results into ``self.results['measurements']``."""
    self.logger.info("make measurements")
    results = self.perform_measurements()
    self._merge_measurement_results(results)
def _merge_measurement_results(self, results):
"""Merge dictionary `results` from measurements into ``self.results['measurement']``."""
# merge the results into self.results['measurements']
previous_results = self.results.get('measurements', None)
if previous_results is None:
self.results['measurements'] = {k: [v] for k, v in results.items()}
return
previous_keys = set(previous_results.keys())
new_keys = set(results.keys())
new_keys_not_previous = new_keys - previous_keys
if new_keys_not_previous:
warnings.warn(f"measurement gave new keys {new_keys_not_previous!r} "
"fill up with `None` for previous measurements.")
some_previous_measurement = next(iter(previous_results.values()))
measurement_len = len(some_previous_measurement)
for key in new_keys_not_previous:
previous_results[key] = [None] * measurement_len
# actual merge
for k, v in results.items(): # only new keys
previous_results[k].append(v)
previous_keys_not_new = previous_keys - new_keys
if previous_keys_not_new:
warnings.warn(f"measurement didn't give keys {previous_keys_not_new!r} "
"we have from previous measurements, fill up with `None`")
for key in previous_keys_not_new:
previous_results[key].append(None)
# done
def perform_measurements(self):
    """Emits the :attr:`measurement_event` to call measurement functions and collect results.

    Exceptions raised by measurement functions are caught and recorded in
    :attr:`errors_during_run` instead of propagating immediately; only if
    :attr:`max_errors_before_abort` errors accumulated (and we are not in the
    final processing) is a RuntimeError raised.

    Returns
    -------
    results : dict
        The results from calling the measurement functions.
    """
    # in case of a failed measurement, we should raise the exception at the end of the
    # simulation?
    results = {}
    psi, model = self.get_measurement_psi_model(self.psi, self.model)
    returned = []  # make sure list exists if try-clause fails
    try:
        returned = self.measurement_event.emit(results=results,
                                               psi=psi,
                                               model=model,
                                               simulation=self)
    # we safe-guard the measurements with try-except
    # to avoid that mistakes in the measurement cause us to lose all our data,
    # e.g. if we were running DMRG for days, and just have a stupid typo in a measurement function
    except Exception:
        err_traceback = traceback.format_exc()
        self.errors_during_run.append(("measurement", "?", "?", err_traceback))
        max_errs = self.max_errors_before_abort
        if max_errs is not None and len(self.errors_during_run) >= max_errs and not self.final_processing:
            # too many failures: abort with all collected tracebacks
            tracebacks = [f"Error during {step} of {module_name} {module_func}\n{err_traceback}"
                          for (step, module_name, module_func, err_traceback) in self.errors_during_run]
            raise RuntimeError('\n'.join(["Too many failed measurements \n"] + tracebacks))
    # check for returned values, although there shouldn't be any
    returned = [entry for entry in returned if entry is not None]
    if len(returned) > 0:
        msg = ("Some measurement function returned a value instead of writing to `results`.\n"
               "Add it to measurement results as 'UNKNOWN'.")
        warnings.warn(msg)
        results['UNKNOWN'] = returned
    return results
def get_measurement_psi_model(self, psi, model):
"""Get psi for measurements.
Sometimes, the `psi` we want to use for measurements is different from the one the
algorithm actually acts on.
Here, we split sites, if they were grouped in :meth:`group_sites_for_algorithm`.
Parameters
----------
psi :
Tensor network; initially just ``self.psi``.
The method should make a copy before modification.
model :
Model matching `psi` (in terms of indexing, MPS order, grouped sites, ...)
Initially just ``self.model``.
Returns
-------
psi :
The psi suitable as argument for generic measurement functions.
model :
Model matching `psi` (in terms of indexing, MPS order, grouped sites, ...)
"""
if self.options.get("canonicalize_before_measurement", False):
if psi is self.psi:
psi = psi.copy() # make copy before
psi.canonical_form()
if self.grouped > 1:
if psi is self.psi:
psi = psi.copy() # make copy before
psi.group_split(self.options['algorithm_params']['trunc_params'])
model = self.model_ungrouped
return psi, model
def final_measurements(self):
"""Perform a last set of measurements."""
self.make_measurements()
def run_post_processing(self):
"""Apply (several) post-processing steps.
.. cfg:configoptions :: Simulation
post_processing : list of tuple
Functions to perform post-processing with the :class:`DataLoader`.
This uses a similar syntax to the attr:`connect_measurements` in meth:`init_measurements`.
Each tuple can be of length 2 to 3, with entries ``(module, function, kwargs)``.
The kwargs can contain a ``results_key`` under which the results (unless None is returned)
are saved. All other kwargs are passed on to the function.
.. note ::
All post-processing functions should follow the syntax:
``def pp_function(DL, *, kwarg1, kwarg_2=default_2):``
where ``DL`` is an instance of the :class:`DataLoader`, ``kwarg_1`` is a necessary
keyword argument (no default value), while ``kwarg_2`` is an optional keyword argument (with
default value)
"""
def_pp = self.default_post_processing
man_pp = list(self.options.get('post_processing', []))
all_pp = def_pp + man_pp
if len(all_pp) > 0:
DL = DataLoader(simulation=self)
for pp_step in all_pp:
# pp_step : module_name, func_name, extra_kwargs=None
# use try, except, so we don't abort a Simulation if post-processing is not working
try:
self._post_processing(DL, *pp_step)
except Exception:
err_traceback = traceback.format_exc()
self.errors_during_run.append(("post_process", pp_step[0], pp_step[1], err_traceback))
def _post_processing(self, DL: DataLoader, module_name: str, func_name: str, extra_kwargs: dict = None):
"""Apply one post-processing step."""
# get function / from module_name namespace
if extra_kwargs is None:
extra_kwargs = {}
function = hdf5_io.find_global(module_name, func_name)
# check if results_key is supplied
if 'results_key' in extra_kwargs:
results_key = extra_kwargs['results_key']
del extra_kwargs['results_key']
else:
results_key = func_name
# perform post-processing
self.logger.info(f"calling post-processing function {func_name}")
pp_result = function(DL, **extra_kwargs)
# pp_result might be None, skip saving
if pp_result is not None:
# make sure we don't override any results
all_results_keys = self.results.keys()
if results_key in all_results_keys:
for i in range(1, 1000):
new_results_key = f"{results_key}_{i:d}"
if new_results_key not in all_results_keys:
results_key = new_results_key
break
else:
raise ValueError("specify different results_key, there are already too many of them!")
self.logger.info(f"Saving post-processing result under {results_key}")
self.results[results_key] = pp_result
def _display_errors_during_run(self):
if len(self.errors_during_run) > 0:
for (step, module_name, module_func, err_traceback) in self.errors_during_run:
msg = f"Error during {step} of {module_name} {module_func}\n{err_traceback}"
warnings.warn(msg)
if self.output_filename is not None and self.max_errors_before_abort is not None:
raise Exception("Error(s) occurred during the Simulation, see warning of error messages above -"
f"but we saved results anyways in {self.output_filename}.")
def get_version_info(self):
"""Try to save version info which is necessary to allow reproducibility."""
sim_module = self.__class__.__module__
# also try to extract git revision of the simulation class
if sim_module.startswith('tenpy') or sim_module == "__main__":
cwd = os.getcwd()
else:
# use the cwd of the file where the simulation class is defined
module = importlib.import_module(sim_module) # get module object
cwd = os.path.dirname(os.path.abspath(module.__file__))
git_rev = version._get_git_revision(cwd)
version_info = {
'tenpy': version.version_summary,
'simulation_class': self.__class__.__qualname__,
'simulation_module': sim_module,
'simulation_git_HEAD': git_rev,
}
return version_info
def get_output_filename(self):
"""Read out the `output_filename` from the options.
You can easily overwrite this method in subclasses to customize the outputfilename
depending on the options passed to the simulations.
Options
-------
.. cfg:configoptions :: Simulation
output_filename : path_like | None
If ``None`` (default), no output is written to files.
If a string, this filename is used for output (up to modifications by
:meth:`fix_output_filenames` to avoid overwriting previous results).
output_filename_params : dict
Instead of specifying the `output_filename` directly, this dictionary describes
the parameters that should be included into it.
Entries of the dictionary are keyword arguments to
:func:`output_filename_from_dict` with the simulation parameters
(:cfg:option:`Simulation`, or equivalently :attr:`options`) as `options`.
Returns
-------
output_filename : str | None
Filename for output; None disables any writing to files.
Relative to :cfg:option:`Simulation.directory`, if specified.
The file ending determines the output format.
"""
# note: this function shouldn't use logging: it's called before setup_logging()
output_filename_params = self.options.setdefault('output_filename_params', None)
if output_filename_params is not None:
default = output_filename_from_dict(self.options, **output_filename_params)
else:
default = None
output_filename = self.options.setdefault('output_filename', default)
return output_filename
    def fix_output_filenames(self):
        """Determine the output filenames.

        This function determines the :attr:`output_filename` and writes a one-line text into
        that file to indicate that we're running a simulation generating it.
        Further, :attr:`_backup_filename` is determined.

        Options
        -------
        .. cfg:configoptions :: Simulation

            skip_if_output_exists : bool
                If True, raise :class:`Skip` if the output file already exists.
            overwrite_output : bool
                Only makes a difference if `skip_if_output_exists` is False and the file exists.
                In that case, with `overwrite_output`, just save everything under that name again,
                or with `overwrite_output`=False, replace
                ``filename.ext`` with ``filename_01.ext`` (and further increasing numbers)
                until we get a filename that doesn't exist yet.
            safe_write : bool
                If True (default), perform a "safe" overwrite of `output_filename` as described
                in :meth:`save_results`.
        """
        # note: this function shouldn't use logging: it's called before setup_logging()
        # hence, assume that `options` is still a pure dict, not a tenpy.tools.misc.Config
        output_filename = self.get_output_filename()
        overwrite_output = self.options.setdefault("overwrite_output", False)
        skip_if_exists = self.options.setdefault("skip_if_output_exists", False)
        if output_filename is None:
            # no output requested: disable both output and backup filenames
            self.output_filename = None
            self._backup_filename = None
            return
        out_fn = Path(output_filename)  # convert to Path
        self.output_filename = out_fn
        self._backup_filename = self.get_backup_filename(out_fn)
        if out_fn.exists():
            if skip_if_exists:
                # no need to touch options: not yet converted to config
                raise Skip("simulation output filename already exists", out_fn)
            if not overwrite_output and not self.loaded_from_checkpoint:
                # adjust output filename to avoid overwriting stuff:
                # try filename_1.ext, filename_2.ext, ... until one doesn't exist yet
                root, ext = os.path.splitext(out_fn)
                for i in range(1, 100):
                    new_out_fn = Path(root + '_' + str(i) + ext)
                    if not new_out_fn.exists():
                        break
                else:  # loop ended without `break`: 99 numbered copies already exist
                    raise ValueError("Refuse to make another copy. CLEAN UP!")
                warnings.warn(f"changed output filename to {new_out_fn!s}")
                self.output_filename = out_fn = new_out_fn
                self._backup_filename = self.get_backup_filename(out_fn)
            # else: overwrite stuff in `save_results`
        if overwrite_output and not self.loaded_from_checkpoint:
            # keep the old log around: move existing logfile to *.backup.log
            log_fn = out_fn.with_suffix('.log')
            backup_log_fn = self.get_backup_filename(log_fn)
            if log_fn.exists() and backup_log_fn is not None:
                if backup_log_fn.exists():
                    backup_log_fn.unlink()
                log_fn.rename(backup_log_fn)
        if self._backup_filename is not None and not self._backup_filename.exists():
            # claim the output: write a one-line host/time marker into the backup file
            import socket
            text = "simulation initialized on {host!r} at {time!s}\n"
            text = text.format(host=socket.gethostname(), time=time.asctime())
            with self._backup_filename.open('w') as f:
                f.write(text)
def get_backup_filename(self, output_filename):
"""Extract the name used for backups of `output_filename`.
Parameters
----------
output_filename : pathlib.Path
The filename where data is saved.
Returns
-------
backup_filename : pathlib.Path
The filename where to keep a backup while writing files to avoid.
"""
# note: this function shouldn't use logging
if self.options.setdefault("safe_write", True):
return output_filename.with_suffix('.backup' + output_filename.suffix)
else:
return None
def save_results(self, results=None):
"""Save the :attr:`results` to an output file.
Performs a "safe" overwrite of :attr:`output_filename` by first moving the old file
to :attr:`_backup_filename`, then writing the new file, and finally removing the backup.