forked from canonical/ops-scenario
-
Notifications
You must be signed in to change notification settings - Fork 0
/
state.py
2059 lines (1639 loc) · 70.6 KB
/
state.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python3
# Copyright 2023 Canonical Ltd.
# See LICENSE file for licensing details.
import dataclasses
import datetime
import inspect
import random
import re
import string
from enum import Enum
from itertools import chain
from pathlib import Path, PurePosixPath
from typing import (
TYPE_CHECKING,
Any,
Callable,
ClassVar,
Dict,
Final,
FrozenSet,
Generic,
Iterable,
List,
Literal,
Optional,
Sequence,
Set,
Tuple,
Type,
TypeVar,
Union,
cast,
)
from uuid import uuid4
import ops
import yaml
from ops import pebble
from ops.charm import CharmBase, CharmEvents
from ops.model import CloudCredential as CloudCredential_Ops
from ops.model import CloudSpec as CloudSpec_Ops
from ops.model import SecretRotate, StatusBase
from scenario.errors import MetadataNotFoundError, StateValidationError
from scenario.logger import logger as scenario_logger
if TYPE_CHECKING: # pragma: no cover
from scenario import Context
# Union of Python types used for JSON-like values (note: ``None`` is not part of it).
AnyJson = Union[str, bool, dict, int, float, list]
# Relation databags and secret revision contents share one alias: str -> str.
RawDataBagContents = Dict[str, str]
RawSecretRevisionContents = RawDataBagContents
# Unit IDs are plain integers.
UnitID = int
# Type variable constrained to CharmBase subclasses.
CharmType = TypeVar("CharmType", bound=CharmBase)
# Child logger of the package-level scenario logger.
logger = scenario_logger.getChild("state")
# String markers for bulk storage/relation operations.
# NOTE(review): presumably consumed as special event names elsewhere — confirm
# against the callers.
ATTACH_ALL_STORAGES = "ATTACH_ALL_STORAGES"
CREATE_ALL_RELATIONS = "CREATE_ALL_RELATIONS"
BREAK_ALL_RELATIONS = "BREAK_ALL_RELATIONS"
DETACH_ALL_STORAGES = "DETACH_ALL_STORAGES"

_ACTION_EVENT_SUFFIX = "_action"

# all builtin events except secret events. They're special because they carry secret metadata.
# (Each name is listed exactly once; duplicate entries in a set literal are
# silently dropped and only obscure the real content.)
_BUILTIN_EVENTS = {
    "start",
    "stop",
    "install",
    "remove",
    "update_status",
    "config_changed",
    "upgrade_charm",
    "pre_series_upgrade",
    "post_series_upgrade",
    "leader_elected",
    "leader_settings_changed",
    "collect_metrics",
}

_FRAMEWORK_EVENTS = {
    "pre_commit",
    "commit",
    "collect_app_status",
    "collect_unit_status",
}

# Suffixes of the Pebble-related event names.
_PEBBLE_READY_EVENT_SUFFIX = "_pebble_ready"
_PEBBLE_CUSTOM_NOTICE_EVENT_SUFFIX = "_pebble_custom_notice"
_PEBBLE_CHECK_FAILED_EVENT_SUFFIX = "_pebble_check_failed"
_PEBBLE_CHECK_RECOVERED_EVENT_SUFFIX = "_pebble_check_recovered"

# Suffixes of the relation event names.
_RELATION_EVENTS_SUFFIX = {
    "_relation_changed",
    "_relation_broken",
    "_relation_joined",
    "_relation_departed",
    "_relation_created",
}

# Suffixes of the storage event names.
_STORAGE_EVENTS_SUFFIX = {
    "_storage_detaching",
    "_storage_attached",
}

_SECRET_EVENTS = {
    "secret_changed",
    "secret_removed",
    "secret_rotate",
    "secret_expired",
}
class ActionFailed(Exception):
    """Raised at the end of the hook if the charm has called ``event.fail()``.

    Attributes:
        message: The failure message.
        state: The state object passed in alongside the message.
    """

    def __init__(self, message: str, state: "State"):
        self.message = message
        self.state = state

    def __str__(self) -> str:
        # ``args`` still holds ``(message, state)`` (set by
        # ``BaseException.__new__``), so the default ``str()`` would render the
        # whole tuple, including the full state repr. Show only the message.
        return str(self.message)
# This can be replaced with the KW_ONLY dataclasses functionality in Python 3.10+.
def _max_posargs(n: int):
    """Build a base class that caps the number of positional arguments at ``n``.

    The returned class overrides ``__new__`` to inspect the signature of the
    concrete class and raise a descriptive ``TypeError`` when too many
    positional arguments are given or when required arguments are missing.

    Args:
        n: Maximum number of positional arguments accepted (excluding ``self``).

    Returns:
        A new ``_MaxPositionalArgs`` class whose ``_max_positional_args`` is ``n``.
    """

    class _MaxPositionalArgs:
        """Raises TypeError when instantiating objects if arguments are not passed as keywords.

        Looks for a `_max_positional_args` class attribute, which should be an int
        indicating the maximum number of positional arguments that can be passed to
        `__init__` (excluding `self`).
        """

        _max_positional_args = n

        def __new__(cls, *args, **kwargs):
            # inspect.signature guarantees the order of parameters is as
            # declared, which aligns with dataclasses. Simpler ways of
            # getting the arguments (like __annotations__) do not have that
            # guarantee, although in practice it is the case.
            parameters = inspect.signature(cls).parameters
            names = tuple(parameters)
            required_args = [
                name
                for name in names
                if parameters[name].default is inspect.Parameter.empty
                and name not in kwargs
            ]
            n_posargs = len(args)
            max_n_posargs = cls._max_positional_args
            # Kept as a list (not a set) so the names appear in declaration
            # order in the error messages instead of arbitrary hash order.
            kw_only = [
                name for name in names[max_n_posargs:] if not name.startswith("_")
            ]
            if n_posargs > max_n_posargs:
                raise TypeError(
                    f"{cls.__name__} takes {max_n_posargs} positional "
                    f"argument{'' if max_n_posargs == 1 else 's'} but "
                    f"{n_posargs} {'was' if n_posargs == 1 else 'were'} "
                    f"given. The following arguments are keyword-only: "
                    f"{', '.join(kw_only)}",
                ) from None
            # Also check if there are just not enough arguments at all, because
            # the default TypeError message will incorrectly describe some of
            # the arguments as positional.
            elif n_posargs < len(required_args):
                missing = required_args[n_posargs:]
                required_pos = [f"'{arg}'" for arg in missing if arg not in kw_only]
                required_kw = [f"'{arg}'" for arg in missing if arg in kw_only]
                if required_pos and required_kw:
                    details = f"positional: {', '.join(required_pos)} and keyword: {', '.join(required_kw)} arguments"
                elif required_pos:
                    details = f"positional argument{'' if len(required_pos) == 1 else 's'}: {', '.join(required_pos)}"
                else:
                    details = f"keyword argument{'' if len(required_kw) == 1 else 's'}: {', '.join(required_kw)}"
                raise TypeError(f"{cls.__name__} missing required {details}") from None
            return super().__new__(cls)

        def __reduce__(self):
            # The default __reduce__ doesn't understand that some arguments have
            # to be passed as keywords, so using the copy module fails.
            # Rebuild the object by passing the instance state as keywords.
            attrs = cast(Dict[str, Any], super().__reduce__()[2])
            return (lambda: self.__class__(**attrs), ())

    return _MaxPositionalArgs
def _add_signature(cls):
    """Attach an explicit ``__signature__`` to ``cls`` and return ``cls``.

    This tricks Sphinx's autodoc into using the `__init__` signature rather than
    the `__new__` signature, which is always `*args, **kwargs` for classes
    that use the _MaxPositionalArgs class.

    Every parameter at or beyond index ``cls._max_positional_args`` is marked
    keyword-only in the stored signature; earlier parameters are kept as-is.
    """
    original = inspect.signature(cls)
    adjusted = [
        param
        if index < cls._max_positional_args
        else param.replace(kind=inspect.Parameter.KEYWORD_ONLY)
        for index, param in enumerate(original.parameters.values())
    ]
    cls.__signature__ = original.replace(parameters=adjusted)
    return cls
@_add_signature
@dataclasses.dataclass(frozen=True)
class JujuLogLine(_max_posargs(2)):
    """An entry in the Juju debug-log.

    Both fields may be passed positionally.
    """

    level: str
    """The level of the message, for example ``INFO`` or ``ERROR``."""

    message: str
    """The log message."""
@_add_signature
@dataclasses.dataclass(frozen=True)
class CloudCredential(_max_posargs(0)):
    # The class docstring is borrowed from the ops class this one mirrors.
    __doc__ = ops.CloudCredential.__doc__

    auth_type: str
    """Authentication type."""

    attributes: Dict[str, str] = dataclasses.field(default_factory=dict)
    """A dictionary containing cloud credentials.

    For example, for AWS, it contains `access-key` and `secret-key`;
    for Azure, `application-id`, `application-password` and `subscription-id`
    can be found here.
    """

    redacted: List[str] = dataclasses.field(default_factory=list)
    """A list of redacted generic cloud API secrets."""

    def _to_ops(self) -> CloudCredential_Ops:
        """Build the equivalent ``ops.model.CloudCredential`` from this object's fields."""
        field_names = ("auth_type", "attributes", "redacted")
        return CloudCredential_Ops(
            **{field: getattr(self, field) for field in field_names},
        )
@_add_signature
@dataclasses.dataclass(frozen=True)
class CloudSpec(_max_posargs(1)):
    # The class docstring is borrowed from the ops class this one mirrors.
    __doc__ = ops.CloudSpec.__doc__

    type: str
    """Type of the cloud."""

    name: str = "localhost"
    """Juju cloud name."""

    region: Optional[str] = None
    """Region of the cloud."""

    endpoint: Optional[str] = None
    """Endpoint of the cloud."""

    identity_endpoint: Optional[str] = None
    """Identity endpoint of the cloud."""

    storage_endpoint: Optional[str] = None
    """Storage endpoint of the cloud."""

    credential: Optional[CloudCredential] = None
    """Cloud credentials with key-value attributes."""

    ca_certificates: List[str] = dataclasses.field(default_factory=list)
    """A list of CA certificates."""

    skip_tls_verify: bool = False
    """Whether to skip TLS verification."""

    is_controller_cloud: bool = False
    """If this is the cloud used by the controller."""

    def _to_ops(self) -> CloudSpec_Ops:
        """Build the equivalent ``ops.model.CloudSpec`` from this object's fields."""
        # The nested credential (if any) is converted to its ops counterpart too.
        ops_credential = self.credential._to_ops() if self.credential else None
        return CloudSpec_Ops(
            type=self.type,
            name=self.name,
            region=self.region,
            endpoint=self.endpoint,
            identity_endpoint=self.identity_endpoint,
            storage_endpoint=self.storage_endpoint,
            credential=ops_credential,
            ca_certificates=self.ca_certificates,
            skip_tls_verify=self.skip_tls_verify,
            is_controller_cloud=self.is_controller_cloud,
        )
def _generate_secret_id():
    """Return a new ID of the form ``secret:`` followed by 20 random characters.

    The characters are drawn from lowercase ASCII letters and digits.
    """
    # This doesn't account for collisions, but the odds are so low that it
    # should not be possible in any realistic test run.
    alphabet = string.ascii_lowercase + string.digits
    characters = [random.choice(alphabet) for _ in range(20)]
    return "secret:" + "".join(characters)
@_add_signature
@dataclasses.dataclass(frozen=True)
class Secret(_max_posargs(1)):
    """A Juju secret.

    This class is used for both user and charm secrets.
    """

    tracked_content: "RawSecretRevisionContents"
    """The content of the secret that the charm is currently tracking.

    This is the content the charm will receive with a
    :meth:`ops.Secret.get_content` call."""

    latest_content: Optional["RawSecretRevisionContents"] = None
    """The content of the latest revision of the secret.

    This is the content the charm will receive with a
    :meth:`ops.Secret.peek_content` call.

    If left as ``None``, it is set to :attr:`tracked_content` on construction."""

    id: str = dataclasses.field(default_factory=_generate_secret_id)
    """The Juju ID of the secret.

    This is automatically assigned and should not usually need to be explicitly set.
    """

    owner: Literal["unit", "app", None] = None
    """Indicates if the secret is owned by *this* unit, *this* application, or
    another application/unit.

    If None, the implication is that read access to the secret has been granted
    to this unit.
    """

    remote_grants: Dict[int, Set[str]] = dataclasses.field(default_factory=dict)
    """Mapping from relation IDs to remote units and applications to which this
    secret has been granted."""

    label: Optional[str] = None
    """A human-readable label the charm can use to retrieve the secret.

    If this is set, it implies that the charm has previously set the label.
    """

    description: Optional[str] = None
    """A human-readable description of the secret."""

    expire: Optional[datetime.datetime] = None
    """The time at which the secret will expire."""

    rotate: Optional[SecretRotate] = None
    """The rotation policy for the secret."""

    # what revision is currently tracked by this charm.
    # NOTE(review): the original comment said "Only meaningful if owner=False";
    # ``owner`` is never ``False``, so this presumably means ``owner is None``
    # — confirm.
    _tracked_revision: int = 1

    # what revision is the latest for this secret.
    _latest_revision: int = 1

    def __hash__(self) -> int:
        # Hash on the ID only; the content fields are dicts and not hashable.
        return hash(self.id)

    def __post_init__(self):
        if self.latest_content is None:
            # bypass frozen dataclass
            object.__setattr__(self, "latest_content", self.tracked_content)

    def _set_label(self, label):
        """Overwrite :attr:`label` in place."""
        # bypass frozen dataclass
        object.__setattr__(self, "label", label)

    def _track_latest_revision(self):
        """Set the tracked revision (and content) to the latest revision."""
        # bypass frozen dataclass
        object.__setattr__(self, "_tracked_revision", self._latest_revision)
        object.__setattr__(self, "tracked_content", self.latest_content)

    def _update_metadata(
        self,
        content: Optional["RawSecretRevisionContents"] = None,
        label: Optional[str] = None,
        description: Optional[str] = None,
        expire: Optional[Union[datetime.datetime, datetime.timedelta]] = None,
        rotate: Optional[SecretRotate] = None,
    ):
        """Bump the latest revision and update any metadata that was provided.

        Only truthy arguments are applied; falsy ones (``None``, empty values)
        leave the corresponding attribute untouched.

        Args:
            content: New content for the latest revision.
            label: New label.
            description: New description.
            expire: New expiry, either as an absolute time or as a
                ``timedelta``, which is added to the current local time.
            rotate: New rotation policy.
        """
        # bypass frozen dataclass
        object.__setattr__(self, "_latest_revision", self._latest_revision + 1)
        # TODO: if this is done twice in the same hook, then Juju ignores the
        # first call, it doesn't continue to update like this does.
        # Fix when https://github.com/canonical/operator/issues/1288 is resolved.
        if content:
            object.__setattr__(self, "latest_content", content)
        if label:
            object.__setattr__(self, "label", label)
        if description:
            object.__setattr__(self, "description", description)
        if expire:
            if isinstance(expire, datetime.timedelta):
                # A relative expiry is resolved against "now" (naive local time).
                expire = datetime.datetime.now() + expire
            object.__setattr__(self, "expire", expire)
        if rotate:
            object.__setattr__(self, "rotate", rotate)
def _normalise_name(s: str):
    """Return ``s`` with every dash replaced by an underscore.

    Event names, in Scenario, uniformly use underscores instead of dashes.
    """
    return "_".join(s.split("-"))
@_add_signature
@dataclasses.dataclass(frozen=True)
class Address(_max_posargs(1)):
    """An address in a Juju network space."""

    value: str
    """The IP address in the space."""

    hostname: str = ""
    """A host name that maps to the address in :attr:`value`."""

    cidr: str = ""
    """The CIDR of the address in :attr:`value`."""

    @property
    def address(self):
        """A deprecated alias for :attr:`value`."""
        return self.value

    # NOTE(review): instances are frozen, so a plain ``obj.address = ...`` is
    # expected to be rejected by the dataclass-generated ``__setattr__`` before
    # this setter runs — confirm whether the setter is reachable.
    @address.setter
    def address(self, value):
        # bypass frozen dataclass
        object.__setattr__(self, "value", value)
@_add_signature
@dataclasses.dataclass(frozen=True)
class BindAddress(_max_posargs(1)):
    """An address bound to a network interface in a Juju space."""

    addresses: List[Address]
    """The addresses in the space."""

    interface_name: str = ""
    """The name of the network interface."""

    mac_address: Optional[str] = None
    """The MAC address of the interface."""

    def _hook_tool_output_fmt(self):
        """Render this object as a dict, in the same format the hook tool would.

        The ``mac-address`` key is only present when :attr:`mac_address` is truthy.
        """
        # todo support for legacy (deprecated) `interfacename` and `macaddress` fields?
        rendered = {
            "interface-name": self.interface_name,
            "addresses": list(map(dataclasses.asdict, self.addresses)),
        }
        if not self.mac_address:
            return rendered
        rendered["mac-address"] = self.mac_address
        return rendered
@_add_signature
@dataclasses.dataclass(frozen=True)
class Network(_max_posargs(2)):
    """A Juju network space."""

    binding_name: str
    """The name of the network space."""

    bind_addresses: List[BindAddress] = dataclasses.field(
        default_factory=lambda: [BindAddress([Address("192.0.2.0")])],
    )
    """Addresses that the charm's application should bind to."""

    ingress_addresses: List[str] = dataclasses.field(
        default_factory=lambda: ["192.0.2.0"],
    )
    """Addresses other applications should use to connect to the unit."""

    egress_subnets: List[str] = dataclasses.field(
        default_factory=lambda: ["192.0.2.0/24"],
    )
    """Subnets that other units will see the charm connecting from."""

    def __hash__(self) -> int:
        # Hash on the binding name only; the other fields are lists.
        return hash(self.binding_name)

    def _hook_tool_output_fmt(self):
        """Render this object as a dict, in the same format the hook tool would."""
        rendered_bind_addresses = [
            bind_address._hook_tool_output_fmt()
            for bind_address in self.bind_addresses
        ]
        return {
            "bind-addresses": rendered_bind_addresses,
            "egress-subnets": self.egress_subnets,
            "ingress-addresses": self.ingress_addresses,
        }
# Module-level counter holding the ID the next relation will receive.
_next_relation_id_counter = 1


def _next_relation_id(*, update=True):
    """Return the ID that the next relation to be created will get.

    Args:
        update: When truthy (the default), the counter is also advanced by one.
            Pass ``update=False`` to only inspect the value.

    Returns:
        The counter value as it was before any increment.
    """
    global _next_relation_id_counter
    current = _next_relation_id_counter
    _next_relation_id_counter = current + 1 if update else current
    return current
@_add_signature
@dataclasses.dataclass(frozen=True)
class RelationBase(_max_posargs(2)):
    """Fields and validation shared by the concrete relation classes.

    This class cannot be instantiated directly: ``__post_init__`` raises
    ``RuntimeError`` when the object's type is exactly ``RelationBase``.
    """

    endpoint: str
    """Relation endpoint name. Must match some endpoint name defined in the metadata."""

    interface: Optional[str] = None
    """Interface name. Must match the interface name attached to this endpoint in the metadata.

    If left empty, it will be automatically derived from the metadata."""

    id: int = dataclasses.field(default_factory=_next_relation_id)
    """Juju relation ID. Every new Relation instance gets a unique one,
    if there's trouble, override."""

    local_app_data: "RawDataBagContents" = dataclasses.field(default_factory=dict)
    """This application's databag for this relation."""

    local_unit_data: "RawDataBagContents" = dataclasses.field(
        default_factory=lambda: _DEFAULT_JUJU_DATABAG.copy(),
    )
    """This unit's databag for this relation."""

    @property
    def _databags(self):
        """Yield all databags in this relation."""
        yield from (self.local_app_data, self.local_unit_data)

    @property
    def _remote_unit_ids(self) -> Tuple["UnitID", ...]:
        """Ids of the units on the other end of this relation."""
        # Subclasses provide the implementation.
        raise NotImplementedError()

    def _get_databag_for_remote(
        self,
        unit_id: int,  # noqa: U100
    ) -> "RawDataBagContents":
        """Return the databag for some remote unit ID."""
        # Subclasses provide the implementation.
        raise NotImplementedError()

    def __post_init__(self):
        """Reject direct instantiation, then validate every databag."""
        if type(self) is RelationBase:
            raise RuntimeError(
                "RelationBase cannot be instantiated directly; "
                "please use Relation, PeerRelation, or SubordinateRelation",
            )
        for bag in self._databags:
            self._validate_databag(bag)

    def __hash__(self) -> int:
        # Hash on the relation ID only; the databag fields are dicts.
        return hash(self.id)

    def _validate_databag(self, databag: dict):
        """Check that ``databag`` is a dict whose values are all strings.

        Raises:
            StateValidationError: if ``databag`` is not a dict, or if any of
                its values is not a ``str``.
        """
        if not isinstance(databag, dict):
            raise StateValidationError(
                f"all databags should be dicts, not {type(databag)}",
            )
        for value in databag.values():
            if isinstance(value, str):
                continue
            raise StateValidationError(
                f"all databags should be Dict[str,str]; "
                f"found a value of type {type(value)}",
            )
# Placeholder address used for every default databag entry below.
_DEFAULT_IP = "192.0.2.0"
# Default unit databag contents; all three keys map to the same placeholder address.
_DEFAULT_JUJU_DATABAG = dict.fromkeys(
    ("egress-subnets", "ingress-address", "private-address"),
    _DEFAULT_IP,
)
@dataclasses.dataclass(frozen=True)
class Relation(RelationBase):
    """An integration between the charm and another application."""

    remote_app_name: str = "remote"
    """The name of the remote application, as in the charm's metadata."""

    # local limit
    limit: int = 1
    """The maximum number of integrations on this endpoint."""

    remote_app_data: "RawDataBagContents" = dataclasses.field(default_factory=dict)
    """The current content of the application databag."""

    remote_units_data: Dict["UnitID", "RawDataBagContents"] = dataclasses.field(
        default_factory=lambda: {0: _DEFAULT_JUJU_DATABAG.copy()},  # dedup
    )
    """The current content of the databag for each unit in the relation."""

    def __hash__(self) -> int:
        # Defined explicitly so the dataclass decorator keeps an ID-based hash.
        return hash(self.id)

    @property
    def _remote_app_name(self) -> str:
        """Who is on the other end of this relation?"""
        return self.remote_app_name

    @property
    def _remote_unit_ids(self) -> Tuple["UnitID", ...]:
        """Ids of the units on the other end of this relation."""
        return tuple(self.remote_units_data.keys())

    def _get_databag_for_remote(self, unit_id: "UnitID") -> "RawDataBagContents":
        """Return the databag for some remote unit ID."""
        return self.remote_units_data[unit_id]

    @property
    def _databags(self):
        """Yield all databags in this relation: local ones first, then remote."""
        yield from (self.local_app_data, self.local_unit_data, self.remote_app_data)
        yield from self.remote_units_data.values()
@dataclasses.dataclass(frozen=True)
class SubordinateRelation(RelationBase):
    """A relation to share data between a subordinate and a principal charm."""

    remote_app_data: "RawDataBagContents" = dataclasses.field(default_factory=dict)
    """The current content of the remote application databag."""

    remote_unit_data: "RawDataBagContents" = dataclasses.field(
        default_factory=lambda: _DEFAULT_JUJU_DATABAG.copy(),
    )
    """The current content of the remote unit databag."""

    remote_app_name: str = "remote"
    """The name of the remote application that *this unit* is attached to."""

    remote_unit_id: int = 0
    """The ID of the remote unit that *this unit* is attached to."""

    def __hash__(self) -> int:
        # Defined explicitly so the dataclass decorator keeps an ID-based hash.
        return hash(self.id)

    @property
    def _remote_unit_ids(self) -> Tuple[int]:
        """Ids of the units on the other end of this relation."""
        return (self.remote_unit_id,)

    def _get_databag_for_remote(self, unit_id: int) -> "RawDataBagContents":
        """Return the databag for some remote unit ID.

        Raises:
            ValueError: if ``unit_id`` is not the ID of the single remote unit.
        """
        # Compare by value: identity (``is not``) is not guaranteed to hold
        # for equal integers and could reject a perfectly valid unit ID.
        if unit_id != self.remote_unit_id:
            raise ValueError(
                f"invalid unit id ({unit_id}): subordinate relation only has one "
                f"remote and that has id {self.remote_unit_id}",
            )
        return self.remote_unit_data

    @property
    def _databags(self):
        """Yield all databags in this relation."""
        yield self.local_app_data
        yield self.local_unit_data
        yield self.remote_app_data
        yield self.remote_unit_data

    @property
    def remote_unit_name(self) -> str:
        """The full name of the remote unit, in the form ``remote/0``."""
        return f"{self.remote_app_name}/{self.remote_unit_id}"
@dataclasses.dataclass(frozen=True)
class PeerRelation(RelationBase):
    """A relation to share data between units of the charm."""

    peers_data: Dict["UnitID", "RawDataBagContents"] = dataclasses.field(
        default_factory=lambda: {0: _DEFAULT_JUJU_DATABAG.copy()},
    )
    """Current contents of the peer databags."""
    # Consistency checks will validate that *this unit*'s ID is not in here.

    def __hash__(self) -> int:
        # Defined explicitly so the dataclass decorator keeps an ID-based hash.
        return hash(self.id)

    @property
    def _databags(self):
        """Yield all databags in this relation: local ones first, then peers."""
        yield from (self.local_app_data, self.local_unit_data)
        yield from self.peers_data.values()

    @property
    def _remote_unit_ids(self) -> Tuple["UnitID", ...]:
        """Ids of the units on the other end of this relation."""
        return tuple(self.peers_data.keys())

    def _get_databag_for_remote(self, unit_id: "UnitID") -> "RawDataBagContents":
        """Return the databag for some remote unit ID."""
        return self.peers_data[unit_id]
def _random_model_name():
    """Return a random 20-character string of ASCII letters and digits."""
    # ``random`` and ``string`` are already imported at module level.
    alphabet = string.ascii_letters + string.digits
    characters = [random.choice(alphabet) for _ in range(20)]
    return "".join(characters)
@_add_signature
@dataclasses.dataclass(frozen=True)
class Model(_max_posargs(1)):
    """The Juju model in which the charm is deployed."""

    name: str = dataclasses.field(default_factory=_random_model_name)
    """The name of the model.

    A random name is generated when none is given."""

    uuid: str = dataclasses.field(default_factory=lambda: str(uuid4()))
    """A unique identifier for the model, typically generated by Juju."""

    # whatever juju models --format=json | jq '.models[<current-model-index>].type' gives back.
    # TODO: make this exhaustive.
    type: Literal["kubernetes", "lxd"] = "kubernetes"
    """The type of Juju model."""

    cloud_spec: Optional[CloudSpec] = None
    """Cloud specification information (metadata) including credentials."""
# for now, proc mock allows you to map one command to one mocked output.
# todo extend: one input -> multiple outputs, at different times
# Module-level counter holding the most recently assigned change ID.
_CHANGE_IDS = 0


def _generate_new_change_id():
    """Increment the global change-ID counter, log the new value, and return it."""
    global _CHANGE_IDS
    new_id = _CHANGE_IDS + 1
    _CHANGE_IDS = new_id
    logger.info(
        f"change ID unset; automatically assigning {new_id}. "
        f"If there are problems, pass one manually.",
    )
    return new_id
@_add_signature
@dataclasses.dataclass(frozen=True)
class Exec(_max_posargs(1)):
    """Mock data for simulated :meth:`ops.Container.exec` calls."""

    # Stored as a tuple after construction (see ``__post_init__``).
    # NOTE(review): presumably the leading arguments of the command this mock
    # answers — confirm against the exec-matching code.
    command_prefix: Sequence[str]

    return_code: int = 0
    """The return code of the process.

    Use 0 to mock the process ending successfully, and other values for failure.
    """

    stdout: str = ""
    """Any content written to stdout by the process.

    Provide content that the real process would write to stdout, which can be
    read by the charm.
    """

    stderr: str = ""
    """Any content written to stderr by the process.

    Provide content that the real process would write to stderr, which can be
    read by the charm.
    """

    # change ID: used internally to keep track of mocked processes
    _change_id: int = dataclasses.field(default_factory=_generate_new_change_id)

    def __post_init__(self):
        # The command prefix can be any sequence type, and a list is tidier to
        # write when there's only one string. However, this object needs to be
        # hashable, so can't contain a list. We 'freeze' the sequence to a tuple
        # to support that.
        frozen_prefix = tuple(self.command_prefix)
        # bypass frozen dataclass
        object.__setattr__(self, "command_prefix", frozen_prefix)

    def _run(self) -> int:
        """Return the change ID associated with this mocked process."""
        return self._change_id
@_add_signature
@dataclasses.dataclass(frozen=True)
class Mount(_max_posargs(0)):
    """Maps local files to a :class:`Container` filesystem.

    Both fields must be passed as keyword arguments.
    """

    location: Union[str, PurePosixPath]
    """The location inside of the container."""

    source: Union[str, Path]
    """The content to provide when the charm does :meth:`ops.Container.pull`."""
def _now_utc():
    """Return the current time as a timezone-aware ``datetime`` in UTC."""
    utc = datetime.timezone.utc
    return datetime.datetime.now(utc)
# Module-level counter holding the ID the next Pebble notice will receive.
_next_notice_id_counter = 1


def _next_notice_id(*, update=True):
    """Return, as a string, the ID the next Pebble notice to be created will get.

    Args:
        update: When truthy (the default), the counter is also advanced by one.
            Pass ``update=False`` to only inspect the value.

    Returns:
        The counter value as it was before any increment, converted to ``str``.
    """
    global _next_notice_id_counter
    current = _next_notice_id_counter
    _next_notice_id_counter = current + 1 if update else current
    return str(current)
@_add_signature
@dataclasses.dataclass(frozen=True)
class Notice(_max_posargs(1)):
    """A Pebble notice."""

    key: str
    """The notice key, a string that differentiates notices of this type.

    This is in the format ``domain/path``; for example:
    ``canonical.com/postgresql/backup`` or ``example.com/mycharm/notice``.
    """

    id: str = dataclasses.field(default_factory=_next_notice_id)
    """Unique ID for this notice."""

    user_id: Optional[int] = None
    """UID of the user who may view this notice (None means notice is public)."""

    type: Union[pebble.NoticeType, str] = pebble.NoticeType.CUSTOM
    """Type of the notice."""

    first_occurred: datetime.datetime = dataclasses.field(default_factory=_now_utc)
    """The first time one of these notices (type and key combination) occurs."""

    last_occurred: datetime.datetime = dataclasses.field(default_factory=_now_utc)
    """The last time one of these notices occurred."""

    last_repeated: datetime.datetime = dataclasses.field(default_factory=_now_utc)
    """The time this notice was last repeated.

    See Pebble's `Notices documentation <https://github.com/canonical/pebble/#notices>`_
    for an explanation of what "repeated" means.
    """

    occurrences: int = 1
    """The number of times one of these notices has occurred."""

    last_data: Dict[str, str] = dataclasses.field(default_factory=dict)
    """Additional data captured from the last occurrence of one of these notices."""

    repeat_after: Optional[datetime.timedelta] = None
    """Minimum time after one of these was last repeated before Pebble will repeat it again."""

    expire_after: Optional[datetime.timedelta] = None
    """How long since one of these last occurred until Pebble will drop the notice."""

    def _to_ops(self) -> pebble.Notice:
        """Build the equivalent ``ops.pebble.Notice`` from this object's fields."""
        # Every keyword of pebble.Notice used here has a same-named attribute.
        field_names = (
            "id",
            "user_id",
            "type",
            "key",
            "first_occurred",
            "last_occurred",
            "last_repeated",
            "occurrences",
            "last_data",
            "repeat_after",
            "expire_after",
        )
        return pebble.Notice(**{field: getattr(self, field) for field in field_names})
@_add_signature
@dataclasses.dataclass(frozen=True)
class CheckInfo(_max_posargs(1)):
    """A health check for a Pebble workload container."""

    name: str
    """Name of the check."""

    level: Optional[pebble.CheckLevel] = None
    """Level of the check."""

    status: pebble.CheckStatus = pebble.CheckStatus.UP
    """Status of the check.

    :attr:`ops.pebble.CheckStatus.UP` means the check is healthy (the number of
    failures is fewer than the threshold), :attr:`ops.pebble.CheckStatus.DOWN`
    means the check is unhealthy (the number of failures has reached the
    threshold).
    """

    failures: int = 0
    """Number of failures since the check last succeeded."""

    threshold: int = 3
    """Failure threshold.

    This is how many consecutive failures for the check to be considered 'down'.
    """

    def _to_ops(self) -> pebble.CheckInfo:
        """Build the equivalent ``ops.pebble.CheckInfo`` from this object's fields."""
        field_names = ("name", "level", "status", "failures", "threshold")
        return pebble.CheckInfo(
            **{field: getattr(self, field) for field in field_names},
        )
@_add_signature
@dataclasses.dataclass(frozen=True)
class Container(_max_posargs(1)):
"""A Kubernetes container where a charm's workload runs."""
name: str
"""Name of the container, as found in the charm metadata."""
can_connect: bool = False
"""When False, all Pebble operations will fail."""
# This is the base plan. On top of it, one can add layers.
# We need to model pebble in this way because it's impossible to retrieve the layers from
# pebble or derive them from the resulting plan (which one CAN get from pebble).
# So if we are instantiating Container by fetching info from a 'live' charm, the 'layers'
# will be unknown. all that we can know is the resulting plan (the 'computed plan').
_base_plan: dict = dataclasses.field(default_factory=dict)
# We expect most of the user-facing testing to be covered by this 'layers' attribute,
# as it is all that will be known when unit-testing.
layers: Dict[str, pebble.Layer] = dataclasses.field(default_factory=dict)
"""All :class:`ops.pebble.Layer` definitions that have already been added to the container."""
service_statuses: Dict[str, pebble.ServiceStatus] = dataclasses.field(
default_factory=dict,
)
"""The current status of each Pebble service running in the container."""
# when the charm runs `pebble.pull`, it will return .open() from one of these paths.
# when the charm pushes, it will either overwrite one of those paths (careful!) or it will
# create a tempfile and insert its path in the mock filesystem tree
mounts: Dict[str, Mount] = dataclasses.field(default_factory=dict)
"""Provides access to the contents of the simulated container filesystem.
For example, suppose you want to express that your container has:
* ``/home/foo/bar.py``
* ``/bin/bash``
* ``/bin/baz``
this becomes::
mounts = {
'foo': Mount('/home/foo', pathlib.Path('/path/to/local/dir/containing/bar/py/')),
'bin': Mount('/bin/', pathlib.Path('/path/to/local/dir/containing/bash/and/baz/')),
}
"""
execs: Iterable[Exec] = frozenset()
"""Simulate executing commands in the container.
Specify each command the charm might run in the container and an :class:`Exec`
containing its return code and any stdout/stderr.
For example::
container = Container(
name='foo',
execs={
scenario.Exec(['whoami'], return_code=0, stdout='ubuntu'),
scenario.Exec(
['dig', '+short', 'canonical.com'],
return_code=0,
stdout='185.125.190.20\\n185.125.190.21',
),
}
)
"""
notices: List[Notice] = dataclasses.field(default_factory=list)
"""Any Pebble notices that already exist in the container."""
check_infos: FrozenSet[CheckInfo] = frozenset()
"""All Pebble health checks that have been added to the container."""
def __hash__(self) -> int:
return hash(self.name)
def __post_init__(self):
if not isinstance(self.execs, frozenset):
# Allow passing a regular set (or other iterable) of Execs.
object.__setattr__(self, "execs", frozenset(self.execs))
def _render_services(self):
# copied over from ops.testing._TestingPebbleClient._render_services()
services = {} # type: Dict[str, pebble.Service]
for key in sorted(self.layers.keys()):