-
Notifications
You must be signed in to change notification settings - Fork 2
/
store.py
1889 lines (1583 loc) · 65.3 KB
/
store.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""
=====
Store
=====
The file system for storing and updating state variables during an experiment.
"""
import copy
import logging as log
from pprint import pformat
import uuid
import numpy as np
from pint import Quantity
from typing import Optional
from vivarium import divider_registry, serializer_registry, updater_registry
from vivarium.core.process import Process, Step
from vivarium.library.dict_utils import deep_merge, deep_merge_check, MULTI_UPDATE_KEY
from vivarium.library.topology import without, dict_to_paths
from vivarium.core.types import Processes, Topology, State, Steps, Flow
_EMPTY_UPDATES = None, None, None, None, None, None
def generate_state(
processes: Processes,
topology: Topology,
initial_state: Optional[State],
steps: Optional[Steps] = None,
flow: Optional[Flow] = None,
) -> 'Store':
"""Initialize a simulation's state.
Args:
processes: Simulation processes.
topology: Topology linking process ports to stores.
initial_state: Initial simulation state. Omitted variables will
be assigned values based on schema defaults.
Returns:
Initialized state.
"""
store = Store({})
steps = steps or {}
store.generate(tuple(), processes, steps, flow, topology, initial_state)
store.build_topology_views()
return store
def key_for_value(d, looking):
"""Get the key associated with a value in a dictionary.
Only top-level keys are searched.
Args:
d: The dictionary.
looking: The value to look for.
Returns:
The associated key, or None if no key found.
"""
found = None
for key, value in d.items():
if looking == value:
found = key
break
return found
def hierarchy_depth(hierarchy, path=()):
"""
Create a mapping of every path in the hierarchy to the node living at
that path in the hierarchy.
"""
base = {}
for key, inner in hierarchy.items():
down = tuple(path + (key,))
if isinstance(inner, dict):
base.update(hierarchy_depth(inner, down))
else:
base[down] = inner
return base
def _always_true(_):
return True
def _identity(y):
return y
def topology_path(topology, path):
'''
get the subtopology at the path inside the given topology.
'''
if not path:
topology, path
else:
head = path[0]
tail = path[1:]
if head in topology:
subtopology = topology[head]
if isinstance(subtopology, tuple):
return subtopology, tail
elif isinstance(subtopology, dict):
if '_path' in subtopology:
if tail and tail[0] in subtopology:
down = topology_path(subtopology, tail)
if down:
return subtopology['_path'] + down[0], down[1]
else:
return subtopology['_path'], tail
else:
return topology_path(subtopology, tail)
def insert_topology(topology, port_path, target_path):
assert isinstance(port_path, tuple)
assert len(port_path) > 0
head = port_path[0]
tail = port_path[1:]
if not tail:
topology[head] = target_path
else:
subtopology = topology[head]
if isinstance(subtopology, tuple):
relative_path = tuple(['..' for _ in subtopology]) + target_path
new_topology = insert_topology(
{'_path': subtopology},
tail,
relative_path)
topology[head] = new_topology
elif isinstance(subtopology, dict):
topology[head] = insert_topology(subtopology, tail, target_path)
else:
raise Exception(f'invalid topology {topology} at key {head}')
return topology
def convert_path(path):
if isinstance(path, list):
path = tuple(path)
elif not isinstance(path, tuple):
path = (path,)
return path
class Store:
"""Holds a subset of the overall model state
The total state of the model can be broken down into :term:`stores`,
each of which is represented by an instance of this `Store` class.
The store's state is a set of :term:`variables`, each of which is
defined by a set of :term:`schema key-value pairs`. The valid schema
keys are listed in :py:attr:`schema_keys`, and they are:
* **_default** (Type should match the variable value): The default
value of the variable.
* **_updater** (:py:class:`str`): The name of the :term:`updater` to
use. By default this is ``accumulate``.
* **_divider** (:py:class:`str`): The name of the :term:`divider` to
use. Note that ``_divider`` is not included in the ``schema_keys``
set because it can be applied to any node in the hierarchy, not
just leaves (which represent variables).
* **_value** (Type should match the variable value): The current
value of the variable. This is ``None`` by default.
* **_properties** (:py:class:`dict`): Extra properties of the
variable that don't have a specific schema key. This is an empty
dictionary by default.
* **_emit** (:py:class:`bool`): Whether to emit the variable to the
:term:`emitter`. This is ``False`` by default.
"""
schema_keys = {
'_default',
'_updater',
'_value',
'_properties',
'_emit',
'_serializer',
}
def __init__(self, config, outer=None, source=None):
self.outer = outer
self.inner = {}
self.subschema = {}
self.subtopology = {}
self.properties = {}
self.default = None
self.updater = None
self.value = None
self.units = None
self.divider = None
self.emit = False
self.sources = {}
self.leaf = False
self.serializer = None
self.topology = {}
self.topology_view = None
# self.flow is None when this node has no flow (either because
# it is not a Step or because it is a Step treated like a
# Deriver) and a list when this node holds a Step with
# dependencies. The list is empty if the Step has no
# dependencies but should not be treated like a Deriver.
self.flow = None
self.apply_config(config, source)
def __getitem__(self, path):
'''Retrieve a :term:`hierarchy` node by its :term:`path`.
.. WARNING:
This function is **experimental** and part of the
:term:`store API`.
Args:
path: Path, relative to ``self``, of the store to retrieve.
Returns:
The store at ``path``. Note that the store is returned, not
the value of that store.
'''
path = convert_path(path)
return self.get_path(path)
def __setitem__(self, path, value):
'''Set a :term:`hierarchy` node's value by its :term:`path`.
.. WARNING:
This function is **experimental** and part of the
:term:`store API`.
Args:
path: Path, relative to ``self``, of the store to modify.
value: The value to be stored as the value of the store at
``path``.
'''
path = convert_path(path)
self.set_path(path, value)
def create(self, path, value=None, absolute=False, **kwargs):
path = convert_path(path)
if value:
kwargs['_value'] = value
if '_default' not in kwargs:
kwargs['_default'] = kwargs.get('_value')
if absolute:
top = self.top()
store = top.establish_path(path, config=kwargs)
top.apply_subschema_path(path)
else:
store = self.establish_path(path, config=kwargs)
self.apply_subschema_path(path)
store.apply_defaults()
return store
def connect(self, path, value, absolute=False):
'''Wire a store's process to another store.
This function must not be used unless ``self`` holds a
:term:`process`.
.. WARNING:
This function is **experimental** and part of the
:term:`store API`.
Args:
path: Path of the port to connect.
value: The store (or the path to the store) to connect to
the port at ``path``.
Raises:
AssertionError: If ``self.value`` is not an instance of
:py:class:`vivarium.core.process.Process`.
Exception: If ``value`` is a :py:class:`Store` that is in a
different tree than ``self``.
'''
path = convert_path(path)
assert isinstance(self.value, Process), \
f'cannot connect non-process {self.value} at {self.path_for()} to {path}'
if isinstance(value, Store):
target_store = value
if self.independent_store(target_store):
raise Exception(
f"the store being inserted at {path} is from a different tree "
f"at {target_store.path_for()}: {target_store.get_value()}")
else:
store_path = convert_path(value)
if absolute:
target_store = self.top().get_path(store_path)
else:
target_store = self.outer.get_path(store_path)
# update the topology
self.update_topology(path, target_store)
def set_path(self, path, value):
'''Set a value at a path in the hierarchy.
.. WARNING:
This function is **experimental** and part of the
:term:`store API`.
Args:
path: The :term:`path` relative to ``self`` where the value
should be set.
value: The value to set. The store node at ``path`` will
hold ``value`` when this function returns.
'''
# this case only when called directly
if len(path) == 0:
if isinstance(value, Store):
self.value = value.value
else:
self.value = value
elif len(path) == 1:
final = path[0]
if isinstance(value, Store):
raise Exception(
f"the store being inserted at {path} is already in a tree "
f"at {value.path_for()}: {value.get_value()}")
# TODO: make it so a Store can be added here if it has no outer
# if value.outer:
# raise Exception(f"the store being inserted at {path} is already in a tree
# at {value.path_for()}: {value.get_value()}")
# else:
# # place the store at this point in the tree
# self.inner[final] = value
# value.outer = self
elif isinstance(value, Process):
self.insert({
'processes': value.generate_processes({'name': final}),
'topology': value.generate_topology({'name': final}),
'initial_state': value.initial_state()})
else:
down = self.get_path((final,))
if down:
if not down.leaf:
Exception(f'trying to set the value {value} of a branch at {down.path_for()}')
down.value = value
else:
Exception(f'trying to set the value {value} at a path that does not exist {final} at {self.path_for()}')
elif len(path) > 1:
head = path[0]
tail = path[1:]
down = self.get_path((head,))
if down:
down.set_path(tail, value)
else:
Exception(f'trying to set the value {value} at a path that does not exist {path} at {self.path_for()}')
else:
raise Exception("this should never happen")
def update_topology(self, port_path, target_store):
"""Update the topology with a new port-path pair.
To use this function ``self`` must hold a :term:`process`.
.. WARNING:
This function is **experimental** and part of the
:term:`store API`.
Args:
port_path: Port of the new topology entry.
target_store: The store to wire to ``port_path``.
Raises:
AssertionError: If ``self.value`` is not an instance of
:py:class:`vivarium.core.process.Process`.
"""
assert isinstance(self.value, Process), \
f'assigning topology from {port_path} to {target_store.path_for()} ' \
f'at {self.path_for()} is invalid, not a process'
topology = copy.deepcopy(self.topology)
self.topology = insert_topology(
topology, port_path, self.outer.path_to(target_store))
self.value.schema = self.value.get_schema()
self.outer.topology_ports(
self.value.schema,
self.topology,
source=self.path_for())
# cache the process's view
self.topology_view = self.schema_topology(
self.value.schema,
self.topology)
def independent_store(self, store):
return self.top() != store.top()
def path_to(self, to):
"""return a path from self to the given Store"""
self_path = self.path_for()
to_path = to.path_for()
while len(self_path) > 0 and len(to_path) > 0 and self_path[0] == to_path[0]:
self_path = self_path[1:]
to_path = to_path[1:]
path = [
'..'
for _ in self_path]
path.extend(to_path)
return tuple(path)
def check_default(self, new_default):
"""Check a new default value.
Compare a new default value to the existing default. If they
conflict, decide which to rely on and log a warning.
Returns:
The new default value.
"""
defaults_conflict = False
if self.default is not None:
self_default_comp = self.default
new_default_comp = new_default
if isinstance(self_default_comp, np.ndarray):
self_default_comp = self.default.tolist()
if isinstance(new_default_comp, np.ndarray):
new_default_comp = new_default.tolist()
defaults_conflict != (self_default_comp == new_default_comp)
if defaults_conflict:
if (
not isinstance(new_default, np.ndarray)
and not isinstance(self.default, np.ndarray)
and new_default == 0
and self.default != 0
):
log.debug(
'_default schema conflict: %s and %s. selecting %s',
str(self.default), str(new_default), str(self.default))
return self.default
log.debug(
'_default schema conflict: %s and %s. selecting %s',
str(self.default), str(new_default), str(new_default))
return new_default
def check_value(self, new_value):
"""Check a new schema value.
Args:
new_value: The new value.
Returns:
The new value.
Raises:
Exception: If the store already has a value and the new
value is different from the existing one.
"""
if self.value is not None and new_value != self.value:
raise Exception(
'_value schema conflict: {} and {}'.format(
new_value, self.value))
return new_value
def merge_subtopology(self, subtopology):
"""Merge a new subtopology with the store's existing one."""
self.subtopology = deep_merge(self.subtopology, subtopology)
def apply_subschema_config(self, subschema):
"""Merge a new subschema config with the current subschema."""
self.subschema = deep_merge(
self.subschema,
subschema)
def apply_config(self, config, source=None):
"""
Expand the tree by applying additional config.
Special keys for the config are:
* _default - Default value for this node.
* _properties - An arbitrary map of keys to values. This can be used
for any properties which exist outside of the operation of the
tree (like mass or energy).
* _updater - Which updater to use. Default is 'accumulate' which
adds the new value to the existing value, but 'set' is common
as well. You can also provide your own function here instead
of a string key into the updater library.
* _emit - whether or not to emit the values under this point in
the tree.
* _divider - What to do with this node when division happens.
Default behavior is to leave it alone, but you can also pass
'split' here, or a function of your choosing. If you need
other values from the state you need to supply a dictionary
here containing the updater and the topology for where the
other state values are coming from. This has two keys:
* divider - a function that takes the existing value and any
values supplied from the adjoining topology.
* topology - a mapping of keys to paths where the value for
those keys will be found. This will be passed in as the
second argument to the divider function.
* _subschema/* - If this node was declared to house an unbounded set
of related states, the schema for these states is held in this
nodes subschema and applied whenever new subkeys are added
here.
* _subtopology - The subschema is informed by the subtopology to
map the process perspective to the actual tree structure.
* _topology - If this node stores a :term:`process`, then the
process's topology must be provided under this key. This key
may only be provided if the node stores a :term:`process`.
* _flow - If this node stores a :term:`step`, then the step's
dependencies must be specified under this key as a list of
:term:`paths` relative to the step's parent node in the
:term:`hierarchy`. This key must not be provided unless the
node holds a step.
"""
if '*' in config:
self.apply_subschema_config(config['*'])
config = without(config, '*')
if '_subschema' in config:
if source:
self.sources[source] = config['_subschema']
self.apply_subschema_config(config['_subschema'])
config = without(config, '_subschema')
if '_subtopology' in config:
self.merge_subtopology(config['_subtopology'])
config = without(config, '_subtopology')
if '_topology' in config:
self.topology = config['_topology']
config = without(config, '_topology')
if '_flow' in config:
flow = config.pop('_flow')
if flow != {}:
self.flow = flow
if '_divider' in config:
self.divider = config['_divider']
if isinstance(self.divider, str):
self.divider = divider_registry.access(self.divider)
if isinstance(self.divider, dict) and isinstance(
self.divider['divider'], str):
self.divider['divider'] = divider_registry.access(
self.divider['divider'])
config = without(config, '_divider')
if self.schema_keys & set(config.keys()):
if self.inner:
raise Exception(
'trying to assign leaf values to a branch at: {}'.format(
self.path_for()))
self.leaf = True
if '_units' in config:
self.units = config['_units']
self.serializer = serializer_registry.access('units')
if '_serializer' in config:
self.serializer = config['_serializer']
if isinstance(self.serializer, str):
self.serializer = serializer_registry.access(
self.serializer)
if '_default' in config:
self.default = self.check_default(config.get('_default'))
if isinstance(self.default, Quantity):
self.units = self.units or self.default.units
self.serializer = (self.serializer or
serializer_registry.access('units'))
elif isinstance(self.default, list) and \
len(self.default) > 0 and \
isinstance(self.default[0], Quantity):
self.units = self.units or self.default[0].units
self.serializer = (self.serializer or
serializer_registry.access('units'))
elif isinstance(self.default, np.ndarray):
self.serializer = (self.serializer or
serializer_registry.access('numpy'))
if '_value' in config:
self.value = self.check_value(config.get('_value'))
if isinstance(self.value, Quantity):
self.units = self.value.units
self.updater = config.get(
'_updater',
self.updater or 'accumulate',
)
self.properties = deep_merge(
self.properties,
config.get('_properties', {}))
self.emit = config.get('_emit', self.emit)
if source:
self.sources[source] = config
else:
if self.leaf and config:
if self.value:
raise Exception(
f'trying to assign create inner for leaf node: '
f'{self.path_for()} with value {self.value}')
self.leaf = False
for key, child in config.items():
if key not in self.inner:
self.inner[key] = Store(child, outer=self, source=source)
else:
self.inner[key].apply_config(child, source=source)
if self.topology and not isinstance(self.value, Process):
raise ValueError(
f'Attempting to create Store at {self.path_for()} '
f'with topology {self.topology}, which is not allowed '
f'because the Store value ({self.value}) is not a '
'Process.')
if self.flow and not isinstance(self.value, Step):
raise ValueError(
f'Attempting to create Store at {self.path_for()} '
f'with flow {self.flow}, which is not allowed because '
f'the Store value ({self.value}) is not a Step.')
def get_updater(self, update):
"""Get the updater to use for an update applied to this store.
Args:
update: The update.
Returns:
If available, the updater specified in the update. If no
such updater is specified, return this store's default
updater. If necessary, retrieves updater from the registry.
"""
updater = self.updater
if isinstance(update, dict) and '_updater' in update:
updater = update['_updater']
if isinstance(updater, str):
updater = updater_registry.access(updater)
return updater
def get_config(self, sources=False):
"""
Assemble a dictionary representation of the config for this node.
A desired property is that the node can be exactly recreated by
applying the resulting config to an empty node again.
"""
config = {}
if self.properties:
config['_properties'] = self.properties
if self.subschema:
config['_subschema'] = self.subschema
if self.subtopology:
config['_subtopology'] = self.subtopology
if self.divider:
config['_divider'] = self.divider
if sources and self.sources:
config['_sources'] = self.sources
if self.inner:
child_config = {
key: child.get_config(sources)
for key, child in self.inner.items()}
config.update(child_config)
else:
config.update({
'_default': self.default,
'_value': self.value})
if self.updater:
config['_updater'] = self.updater
if self.units:
config['_units'] = self.units
if self.emit:
config['_emit'] = self.emit
return config
def top(self):
"""
Find the top of this tree.
"""
if self.outer:
return self.outer.top()
return self
def path_for(self):
"""
Find the path to this node.
"""
if self.outer:
key = key_for_value(self.outer.inner, self)
above = self.outer.path_for()
return above + (key,)
return tuple()
def get_value(self, condition=None, f=None):
"""
Pull the values out of the tree in a structure symmetrical to the tree.
"""
if self.inner:
if condition is None:
condition = _always_true
if f is None:
f = _identity
return {
key: f(child.get_value(condition, f))
for key, child in self.inner.items()
if condition(child)}
if self.subschema:
return {}
return self.value
def get_processes(self):
"""
Get all processes in this store. Does not include steps.
"""
if self.inner:
inner_processes = {}
for key, child in self.inner.items():
if child.inner:
child_processes = child.get_processes()
if child_processes:
inner_processes[key] = child_processes
elif (
isinstance(child.value, Process)
and not isinstance(child.value, Step)):
inner_processes[key] = child.value
if inner_processes:
return inner_processes
elif isinstance(self.value, Process):
return self.value
return None
def get_steps(self):
"""Get all steps under this store."""
if self.inner:
inner_processes = {}
for key, child in self.inner.items():
if child.inner:
child_processes = child.get_steps()
if child_processes:
inner_processes[key] = child_processes
elif isinstance(child.value, Step):
inner_processes[key] = child.value
if inner_processes:
return inner_processes
elif isinstance(self.value, Process):
return self.value
return None
def get_topology(self):
"""
Get the topology for all processes in this store.
"""
if self.inner:
inner_topology = {}
for key, child in self.inner.items():
child_topology = child.get_topology()
if child_topology:
inner_topology[key] = child_topology
if inner_topology:
return inner_topology
elif self.topology:
return self.topology
return None
def get_flow(self):
"""Get the flow for all :term:`steps` under this node.
For example:
>>> from vivarium.core.store import Store
>>> from vivarium.core.process import Step
>>> class MyStep(Step):
... def ports_schema(self):
... return {
... 'port': ['variable'],
... }
... def next_update(self, timestep, states):
... return {}
>>> schema = {
... 'agent1': {
... 'store': {
... 'variable': {
... '_default': 0,
... },
... },
... 'step1': {
... '_value': MyStep(),
... '_topology': {
... 'port': ('store',),
... },
... '_flow': [],
... },
... 'step2': {
... '_value': MyStep(),
... '_topology': {
... 'port': ('store',),
... },
... '_flow': [('step1',)],
... },
... },
... }
>>> store = Store(schema)
>>> store.get_flow()
{'agent1': {'step1': [], 'step2': [('step1',)]}}
"""
if self.inner:
inner_flow = {}
for key, child in self.inner.items():
child_flow = child.get_flow()
if child_flow is not None:
inner_flow[key] = child_flow
if inner_flow:
return inner_flow
elif self.flow is not None:
return self.flow
return None
def get_path(self, path):
"""
Get the node at the given path relative to this node.
"""
if path:
step = path[0]
if step == '..':
child = self.outer
else:
child = self.inner.get(step)
if child:
return child.get_path(path[1:])
elif isinstance(self.value, Process):
towards = topology_path(self.topology, path)
if towards:
target = self.outer.get_path(towards[0])
return target.get_path(towards[1])
else:
raise Exception(f"there is no path from leaf node {self.path_for()} to {path}")
return self
def get_paths(self, paths):
"""Get the nodes at each of the specified paths.
Args:
paths: Map from keys to paths.
Returns:
A dictionary with the same keys as ``paths``. Each key is
mapped to the Store object at the associated path.
"""
return {
key: self.get_path(path)
for key, path in paths.items()}
def get_values(self, paths):
"""Get the values at each of the provided paths.
Args:
paths: Map from keys to paths.
Returns:
A dictionary with the same keys as ``paths``. Each key is
mapped to the value at the associated path.
"""
return {
key: self.get_in(path)
for key, path in paths.items()}
def get_in(self, path):
"""Get the value at ``path`` relative to this store."""
return self.get_path(path).get_value()
def get_template(self, template):
"""
Pass in a template dict with None for each value you want to
retrieve from the tree!
"""
state = {}
for key, value in template.items():
child = self.inner[key]
if value is None:
state[key] = child.get_value()
else:
state[key] = child.get_template(value)
return state
def emit_data(self):
"""Emit the value at this Store.
Obeys the schema (namely emits only if ``_emit`` is true). Also
applies serializers and converts units as necessary.
Returns:
The value to emit, or None if nothing should be emitted.
"""
data = {}
if self.inner:
for key, child in self.inner.items():
child_data = child.emit_data()
if child_data is not None or child_data == 0:
data[key] = child_data
return data
if self.emit:
if self.serializer:
if isinstance(self.value, list) and self.units:
return self.serializer.serialize(
[v.to(self.units) for v in self.value])
if self.units:
return self.serializer.serialize(
self.value.to(self.units))
if isinstance(self.value, list):
return self.value
return self.serializer.serialize(self.value)
if self.units:
return self.value.to(self.units).magnitude
return self.value
return None
def delete_path(self, path):
"""
Delete the subtree at the given path.
"""
if not path:
self.inner = {}
self.value = None
return self
target = self.get_path(path[:-1])
remove = path[-1]
if remove in target.inner:
lost = target.inner[remove]
del target.inner[remove]
return lost
return None
def divide_value(self):
"""
Apply the divider for each node to the value in that node to
assemble two parallel divided states of this subtree.
"""
if self.divider:
# divider is either a function or a dict with topology and/or config
if isinstance(self.divider, dict):
divider = self.divider['divider']
if isinstance(divider, str):
divider = divider_registry.access(divider)
args = {}
if 'topology' in self.divider:
topology = self.divider['topology']
args.update({'state': self.topology_state(topology)})
if 'config' in self.divider:
config = self.divider['config']
args.update({'config': config})
return divider(self.get_value(), **args)
return self.divider(self.get_value())