/
resourceservice.py
1334 lines (1185 loc) · 57.7 KB
/
resourceservice.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""
Copyright 2019 Inmanta
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
Contact: code@inmanta.com
"""
import asyncio
import datetime
import logging
import logging.handlers
import os
import uuid
from collections import abc, defaultdict
from collections.abc import Sequence
from typing import Any, Callable, NoReturn, Optional, Union, cast
from asyncpg.connection import Connection
from asyncpg.exceptions import UniqueViolationError
from pydantic import ValidationError
from tornado.httputil import url_concat
from inmanta import const, data, util
from inmanta.const import STATE_UPDATE, TERMINAL_STATES, TRANSIENT_STATES, VALID_STATES_ON_STATE_UPDATE, Change, ResourceState
from inmanta.data import APILIMIT, InvalidSort
from inmanta.data.dataview import (
DiscoveredResourceView,
ResourceHistoryView,
ResourceLogsView,
ResourcesInVersionView,
ResourceView,
)
from inmanta.data.model import (
AttributeStateChange,
DiscoveredResource,
LatestReleasedResource,
LogLine,
ReleasedResourceDetails,
Resource,
ResourceAction,
ResourceHistory,
ResourceIdStr,
ResourceLog,
ResourceType,
ResourceVersionIdStr,
VersionedResource,
VersionedResourceDetails,
)
from inmanta.protocol import handle, methods, methods_v2
from inmanta.protocol.common import ReturnValue
from inmanta.protocol.exceptions import BadRequest, Conflict, NotFound
from inmanta.protocol.return_value_meta import ReturnValueWithMeta
from inmanta.resources import Id
from inmanta.server import SLICE_AGENT_MANAGER, SLICE_DATABASE, SLICE_RESOURCE, SLICE_TRANSPORT
from inmanta.server import config as opt
from inmanta.server import protocol
from inmanta.server.agentmanager import AgentManager
from inmanta.server.validate_filter import InvalidFilter
from inmanta.types import Apireturn, JsonType, PrimitiveTypes
from inmanta.util import parse_timestamp
# Module-level logger for this slice's own messages (distinct from the
# per-environment resource action loggers managed by ResourceService).
LOGGER = logging.getLogger(__name__)
def error_and_log(message: str, **context: Any) -> NoReturn:
    """
    Log an error with its context and abort the request with a ``BadRequest``.

    :param message: message to return both to logger and to remote caller
    :param context: additional context to attach to log
    :raises BadRequest: always — this function never returns normally, hence
        the ``NoReturn`` annotation (the original ``-> None`` hid that fact
        from type checkers).
    """
    ctx = ",".join([f"{k}: {v}" for k, v in context.items()])
    LOGGER.error("%s %s", message, ctx)
    raise BadRequest(message)
class ResourceActionLogLine(logging.LogRecord):
    """A special log record that is used to report log lines that come from the agent"""

    def __init__(self, logger_name: str, level: int, msg: str, created: datetime.datetime) -> None:
        # Build an ordinary LogRecord first, then rewrite its timing fields so
        # the record carries the agent-side timestamp instead of "now".
        super().__init__(
            name=logger_name,
            level=level,
            msg=msg,
            pathname="(unknown file)",
            lineno=0,
            args=[],
            exc_info=None,
            func=None,
            sinfo=None,
        )
        epoch: float = created.timestamp()
        self.created = epoch
        # Millisecond fraction of the creation time, as logging.Formatter expects.
        self.msecs = (epoch - int(epoch)) * 1000
        # Milliseconds since the logging module was first imported.
        self.relativeCreated = (epoch - logging._startTime) * 1000
class ResourceService(protocol.ServerSlice):
"""Resource Manager service"""
agentmanager_service: "AgentManager"
def __init__(self) -> None:
super().__init__(SLICE_RESOURCE)
self._resource_action_loggers: dict[uuid.UUID, logging.Logger] = {}
self._resource_action_handlers: dict[uuid.UUID, logging.Handler] = {}
# Dict: environment_id: (model_version, increment, negative_increment)
self._increment_cache: dict[uuid.UUID, Optional[tuple[int, abc.Set[ResourceIdStr], abc.Set[ResourceIdStr]]]] = {}
# lock to ensure only one inflight request
self._increment_cache_locks: dict[uuid.UUID, asyncio.Lock] = defaultdict(lambda: asyncio.Lock())
def get_dependencies(self) -> list[str]:
return [SLICE_DATABASE, SLICE_AGENT_MANAGER]
def get_depended_by(self) -> list[str]:
return [SLICE_TRANSPORT]
async def prestart(self, server: protocol.Server) -> None:
await super().prestart(server)
self.agentmanager_service = cast("AgentManager", server.get_slice(SLICE_AGENT_MANAGER))
async def start(self) -> None:
self.schedule(
data.ResourceAction.purge_logs, opt.server_purge_resource_action_logs_interval.get(), cancel_on_stop=False
)
await super().start()
async def stop(self) -> None:
await super().stop()
self._close_resource_action_loggers()
def clear_env_cache(self, env: data.Environment) -> None:
LOGGER.log(const.LOG_LEVEL_TRACE, "Clearing cache for %s", env.id)
self._increment_cache[env.id] = None
@staticmethod
def get_resource_action_log_file(environment: uuid.UUID) -> str:
"""Get the correct filename for the given environment
:param environment: The environment id to get the file for
:return: The path to the logfile
"""
return os.path.join(opt.log_dir.get(), opt.server_resource_action_log_prefix.get() + str(environment) + ".log")
def get_resource_action_logger(self, environment: uuid.UUID) -> logging.Logger:
"""Get the resource action logger for the given environment. If the logger was not created, create it.
:param environment: The environment to get a logger for
:return: The logger for the given environment.
"""
if environment in self._resource_action_loggers:
return self._resource_action_loggers[environment]
resource_action_log = self.get_resource_action_log_file(environment)
file_handler = logging.handlers.WatchedFileHandler(filename=resource_action_log, mode="a+")
# Most logs will come from agents. We need to use their level and timestamp and their formatted message
file_handler.setFormatter(logging.Formatter(fmt="%(asctime)s %(levelname)-8s %(name)-10s %(message)s"))
file_handler.setLevel(logging.DEBUG)
resource_action_logger = logging.getLogger(const.NAME_RESOURCE_ACTION_LOGGER).getChild(str(environment))
resource_action_logger.setLevel(logging.DEBUG)
resource_action_logger.addHandler(file_handler)
self._resource_action_loggers[environment] = resource_action_logger
self._resource_action_handlers[environment] = file_handler
return resource_action_logger
def _close_resource_action_loggers(self) -> None:
"""Close all resource action loggers and their associated handlers"""
try:
while True:
env, logger = self._resource_action_loggers.popitem()
self.close_resource_action_logger(env, logger)
except KeyError:
pass
def close_resource_action_logger(self, env: uuid.UUID, logger: Optional[logging.Logger] = None) -> None:
"""Close the given logger for the given env.
:param env: The environment to close the logger for
:param logger: The logger to close, if the logger is none it is retrieved
"""
if logger is None:
if env in self._resource_action_loggers:
logger = self._resource_action_loggers.pop(env)
else:
return
handler = self._resource_action_handlers.pop(env)
logger.removeHandler(handler)
handler.flush()
handler.close()
def log_resource_action(
self, env: uuid.UUID, resource_ids: Sequence[str], log_level: int, ts: datetime.datetime, message: str
) -> None:
"""Write the given log to the correct resource action logger"""
logger = self.get_resource_action_logger(env)
if len(resource_ids) == 0:
message = "no resources: " + message
elif len(resource_ids) > 1:
message = "multiple resources: " + message
else:
message = resource_ids[0] + ": " + message
log_record = ResourceActionLogLine(logger.name, log_level, message, ts)
logger.handle(log_record)
@handle(methods.get_resource, resource_id="id", env="tid")
async def get_resource(
self,
env: data.Environment,
resource_id: ResourceVersionIdStr,
logs: bool,
status: bool,
log_action: const.ResourceAction,
log_limit: int,
connection: Optional[Connection] = None,
) -> Apireturn:
# Validate resource version id
try:
Id.parse_resource_version_id(resource_id)
except ValueError:
return 400, {"message": f"{resource_id} is not a valid resource version id"}
async with data.ResourceAction.get_connection(connection) as con:
resv = await data.Resource.get(env.id, resource_id, con)
if resv is None:
return 404, {"message": "The resource with the given id does not exist in the given environment"}
if status is not None and status:
return 200, {"status": resv.status}
actions: list[data.ResourceAction] = []
if bool(logs):
action_name = None
if log_action is not None:
action_name = log_action.name
actions = await data.ResourceAction.get_log(
environment=env.id, resource_version_id=resource_id, action=action_name, limit=log_limit, connection=con
)
return 200, {"resource": resv, "logs": actions}
# This endpoint doesn't have a method associated yet.
# Intended for use by other slices
async def get_resources_in_latest_version(
self,
environment: data.Environment,
resource_type: Optional[ResourceType] = None,
attributes: dict[PrimitiveTypes, PrimitiveTypes] = {},
connection: Optional[Connection] = None,
) -> list[Resource]:
result = await data.Resource.get_resources_in_latest_version(
environment.id, resource_type, attributes, connection=connection
)
return [r.to_dto() for r in result]
@handle(methods.get_resources_for_agent, env="tid")
async def get_resources_for_agent(
self, env: data.Environment, agent: str, version: int, sid: uuid.UUID, incremental_deploy: bool
) -> Apireturn:
if not self.agentmanager_service.is_primary(env, sid, agent):
return 409, {"message": f"This agent is not currently the primary for the endpoint {agent} (sid: {sid})"}
if incremental_deploy:
if version is not None:
return 500, {"message": "Cannot request increment for a specific version"}
result = await self.get_resource_increment_for_agent(env, agent)
else:
result = await self.get_all_resources_for_agent(env, agent, version)
return result
async def get_all_resources_for_agent(self, env: data.Environment, agent: str, version: int) -> Apireturn:
started = datetime.datetime.now().astimezone()
if version is None:
version = await data.ConfigurationModel.get_version_nr_latest_version(env.id)
if version is None:
return 404, {"message": "No version available"}
else:
exists = await data.ConfigurationModel.version_exists(environment=env.id, version=version)
if not exists:
return 404, {"message": "The given version does not exist"}
deploy_model = []
resources = await data.Resource.get_resources_for_version(env.id, version, agent)
resource_ids = []
for rv in resources:
deploy_model.append(rv.to_dict())
resource_ids.append(rv.resource_version_id)
# Don't log ResourceActions without resource_version_ids, because
# no API call exists to retrieve them.
if resource_ids:
now = datetime.datetime.now().astimezone()
log_line = data.LogLine.log(
logging.INFO, "Resource version pulled by client for agent %(agent)s state", agent=agent
)
self.log_resource_action(env.id, resource_ids, logging.INFO, now, log_line.msg)
ra = data.ResourceAction(
environment=env.id,
version=version,
resource_version_ids=resource_ids,
action=const.ResourceAction.pull,
action_id=uuid.uuid4(),
started=started,
finished=now,
messages=[log_line],
)
await ra.insert()
return 200, {"environment": env.id, "agent": agent, "version": version, "resources": deploy_model}
    async def get_resource_increment_for_agent(self, env: data.Environment, agent: str) -> Apireturn:
        """
        Return the resources of the latest released version that the given agent still has to deploy
        (the increment), after marking known-good resources on that agent as deployed.

        :param env: The environment to pull resources for.
        :param agent: The name of the agent requesting the increment.
        :return: 404 when no released version exists, otherwise 200 with the increment's resources.
        """
        started = datetime.datetime.now().astimezone()
        version = await data.ConfigurationModel.get_version_nr_latest_version(env.id)
        if version is None:
            return 404, {"message": "No version available"}
        increments: tuple[abc.Set[ResourceIdStr], abc.Set[ResourceIdStr]] = await self.get_increment(env, version)
        increment_ids, neg_increment = increments
        now = datetime.datetime.now().astimezone()

        def on_agent(res: ResourceIdStr) -> bool:
            # Restrict the negative increment to resources hosted by the requesting agent.
            idr = Id.parse_id(res)
            return idr.get_agent_name() == agent

        # This is a bit subtle.
        # Any resource we consider deployed has to be marked as such.
        # Otherwise the agent will fail the deployment.
        # Stale successful deployments can cause resource that were available to be now considered deployed.
        # We don't do this back propagation on deploy,
        # because it is about a lot of resource that need to grab a lock to check if they are stale
        # We do it here, as we always have.
        # This method only updates the state for resources that are currently in the available or deploying state.
        # As such, it should not race with backpropagation on failure.
        await self.mark_deployed(
            env,
            neg_increment,
            now,
            version,
            filter=on_agent,
            only_update_from_states={const.ResourceState.available, const.ResourceState.deploying},
        )
        resources = await data.Resource.get_resources_for_version(env.id, version, agent)
        deploy_model: list[dict[str, Any]] = []
        resource_ids: list[str] = []
        for rv in resources:
            # Skip resources that are not part of the increment.
            if rv.resource_id not in increment_ids:
                continue

            # TODO double parsing of ID
            def in_requires(req: ResourceIdStr) -> bool:
                # Keep a requirement when it is itself in the increment, or when it
                # lives on another agent (cross-agent dependencies must be preserved).
                if req in increment_ids:
                    return True
                idr = Id.parse_id(req)
                return idr.get_agent_name() != agent

            # Prune requires down to the edges relevant for this increment.
            rv.attributes["requires"] = [r for r in rv.attributes["requires"] if in_requires(r)]
            deploy_model.append(rv.to_dict())
            resource_ids.append(rv.resource_version_id)
        # Don't log ResourceActions without resource_version_ids, because
        # no API call exists to retrieve them.
        if resource_ids:
            ra = data.ResourceAction(
                environment=env.id,
                version=version,
                resource_version_ids=resource_ids,
                action=const.ResourceAction.pull,
                action_id=uuid.uuid4(),
                started=started,
                finished=now,
                messages=[
                    data.LogLine.log(logging.INFO, "Resource version pulled by client for agent %(agent)s state", agent=agent)
                ],
            )
            await ra.insert()
        return 200, {"environment": env.id, "agent": agent, "version": version, "resources": deploy_model}
    async def mark_deployed(
        self,
        env: data.Environment,
        resources_id: abc.Set[ResourceIdStr],
        timestamp: datetime.datetime,
        version: int,
        filter: Callable[[ResourceIdStr], bool] = lambda x: True,
        connection: Optional[Connection] = None,
        only_update_from_states: Optional[set[const.ResourceState]] = None,
    ) -> None:
        """
        Set the status of the provided resources as deployed

        :param env: Environment to consider.
        :param resources_id: Set of resources to mark as deployed.
        :param timestamp: Timestamp for the log message and the resource action entry.
        :param version: Version of the resources to consider.
        :param filter: Filter function that takes a resource id as an argument and returns True if it should be kept.
        :param connection: Database connection to use; when None, one is acquired downstream.
        :param only_update_from_states: When provided, only resources whose current state
            is in this set are updated.
        """
        # Build versioned ids for the resources that survive the filter.
        resources_version_ids: list[ResourceVersionIdStr] = [
            ResourceVersionIdStr(f"{res_id},v={version}") for res_id in resources_id if filter(res_id)
        ]
        logline = {
            "level": "INFO",
            "msg": "Setting deployed due to known good status",
            "timestamp": util.datetime_iso_format(timestamp),
            "args": [],
        }
        # Record the transition as a no-change deploy action; keep_increment_cache avoids
        # invalidating the increment that triggered this call, and is_increment_notification
        # prevents last_success bookkeeping for what is not an actual deploy.
        await self.resource_action_update(
            env,
            resources_version_ids,
            action_id=uuid.uuid4(),
            started=timestamp,
            finished=timestamp,
            status=const.ResourceState.deployed,
            # does this require a different ResourceAction?
            action=const.ResourceAction.deploy,
            changes={},
            messages=[logline],
            change=const.Change.nochange,
            send_events=False,
            keep_increment_cache=True,
            is_increment_notification=True,
            only_update_from_states=only_update_from_states,
            connection=connection,
        )
    async def _update_deploy_state(
        self,
        env: data.Environment,
        resource_id: ResourceIdStr,
        timestamp: datetime.datetime,
        version: int,
        status: ResourceState,
        message: str,
        fail_on_error: bool,
        connection: Optional[Connection] = None,
        can_overwrite_available: bool = True,
    ) -> None:
        """
        Set the status of the provided resources as to skipped or failed
        Performs all required bookkeeping for this.
        Factored out the code to set a status on a resource

        :param env: Environment to consider.
        :param resource_id: resource to mark.
        :param timestamp: Timestamp for the log message and the resource action entry.
        :param version: Version of the resources to consider.
        :param status: status to set
        :param message: reason to log on the transfer
        :param fail_on_error: When encountering an undeployable state: fail or do nothing?.
        :param connection: Database connection to use; when None a new one is acquired.
        :param can_overwrite_available: can we overwrite available.
        If set to false, we return without changes if the current state is available
        """
        resource_version_id = resource_id + ",v=" + str(version)
        logline = LogLine(
            level=const.LogLevel.INFO,
            msg=f"Setting {status.value} because of {message}",
            timestamp=timestamp,
        )
        # This helper only supports the failed/skipped transitions.
        assert status in [ResourceState.failed, ResourceState.skipped]
        # this method is purpose specific for now.
        async with data.Resource.get_connection(connection) as connection:
            async with connection.transaction():
                # validate resources
                resource = await data.Resource.get_one(
                    environment=env.id,
                    resource_id=resource_id,
                    model=version,
                    # acquire lock on Resource before read and before lock on ResourceAction to prevent conflicts with
                    # cascading deletes
                    lock=data.RowLockMode.FOR_NO_KEY_UPDATE,
                    connection=connection,
                )
                if not resource:
                    raise NotFound("The resource with the given ids do not exist in the given environment.")
                # no escape from terminal
                if resource.status != status and resource.status in TERMINAL_STATES:
                    if not fail_on_error:
                        # Silent no-op: callers in best-effort mode (e.g. stale propagation).
                        return
                    else:
                        LOGGER.error("Attempting to set undeployable resource to deployable state")
                        raise AssertionError("Attempting to set undeployable resource to deployable state")
                if resource.status == ResourceState.available and not can_overwrite_available:
                    # Resource was never deployed in this version; leave it untouched.
                    return
                resource_action = data.ResourceAction(
                    environment=env.id,
                    version=version,
                    resource_version_ids=[resource_version_id],
                    action_id=uuid.uuid4(),
                    action=const.ResourceAction.deploy,
                    started=timestamp,
                    messages=[
                        {
                            **logline.dict(),
                            # Persist the timestamp as an ISO string, normalized to a timezone.
                            "timestamp": logline.timestamp.astimezone().isoformat(timespec="microseconds"),
                        }
                    ],
                    changes={},
                    status=status,
                    change=const.Change.nochange,
                    finished=timestamp,
                )
                await resource_action.insert(connection=connection)
                self.log_resource_action(
                    env.id,
                    [resource_version_id],
                    logline.level.to_int,
                    logline.timestamp,
                    logline.msg,
                )
                # The state change invalidates the cached increment for this environment.
                self.clear_env_cache(env)
                await resource.update_fields(
                    last_deploy=timestamp,
                    status=status,
                    last_non_deploying_status=const.NonDeployingResourceState(status),
                    connection=connection,
                )
    async def get_increment(
        self, env: data.Environment, version: int, connection: Optional[Connection] = None
    ) -> tuple[abc.Set[ResourceIdStr], abc.Set[ResourceIdStr]]:
        """
        Get the increment for a given environment and a given version of the model from the _increment_cache if possible.
        In case of cache miss, the increment calculation is performed behind a lock to make sure it is only done once per
        version, per environment.

        :param env: The environment to consider.
        :param version: The version of the model to consider.
        :param connection: connection to use towards the DB.
        When the connection is in a transaction, we will always invalidate the cache
        """

        def _get_cache_entry() -> Optional[tuple[abc.Set[ResourceIdStr], abc.Set[ResourceIdStr]]]:
            """
            Returns a tuple (increment, negative_increment) if a cache entry exists for the given environment and version
            or None if no such cache entry exists.
            """
            cache_entry = self._increment_cache.get(env.id, None)
            if cache_entry is None:
                # No cache entry found
                return None
            (version_cache_entry, incr, neg_incr) = cache_entry
            if version_cache_entry != version:
                # Cache entry exists for another version
                return None
            return incr, neg_incr

        increment: Optional[tuple[abc.Set[ResourceIdStr], abc.Set[ResourceIdStr]]] = _get_cache_entry()
        if increment is None or (connection is not None and connection.is_in_transaction()):
            lock = self._increment_cache_locks[env.id]
            async with lock:
                # Re-check under the lock: another coroutine may have filled the
                # cache while we were waiting.
                # NOTE(review): when `connection` is in a transaction, a still-valid
                # cache entry found here is returned as-is, which does not match the
                # docstring's claim that the cache is "always invalidated" — confirm
                # which behavior is intended.
                increment = _get_cache_entry()
                if increment is None:
                    increment = await data.ConfigurationModel.get_increment(env.id, version, connection=connection)
                    self._increment_cache[env.id] = (version, *increment)
        return increment
@handle(methods_v2.resource_deploy_done, env="tid", resource_id="rvid")
async def resource_deploy_done(
self,
env: data.Environment,
resource_id: Id,
action_id: uuid.UUID,
status: ResourceState,
messages: list[LogLine] = [],
changes: dict[str, AttributeStateChange] = {},
change: Optional[Change] = None,
keep_increment_cache: bool = False,
) -> None:
resource_id_str = resource_id.resource_version_str()
finished = datetime.datetime.now().astimezone()
changes_with_rvid = {
resource_id_str: {attr_name: attr_change.model_dump()} for attr_name, attr_change in changes.items()
}
if status not in VALID_STATES_ON_STATE_UPDATE:
error_and_log(
f"Status {status} is not a valid status at the end of a deployment.",
resource_ids=[resource_id_str],
action=const.ResourceAction.deploy,
action_id=action_id,
)
if status in TRANSIENT_STATES:
error_and_log(
"No transient state can be used to mark a deployment as done.",
status=status,
resource_ids=[resource_id_str],
action=const.ResourceAction.deploy,
action_id=action_id,
)
async with data.Resource.get_connection() as connection:
async with connection.transaction():
resource = await data.Resource.get_one(
connection=connection,
environment=env.id,
resource_id=resource_id.resource_str(),
model=resource_id.version,
# acquire lock on Resource before read and before lock on ResourceAction to prevent conflicts with
# cascading deletes
lock=data.RowLockMode.FOR_UPDATE,
)
if resource is None:
raise NotFound("The resource with the given id does not exist in the given environment.")
# no escape from terminal
if resource.status != status and resource.status in TERMINAL_STATES:
LOGGER.error("Attempting to set undeployable resource to deployable state")
raise AssertionError("Attempting to set undeployable resource to deployable state")
resource_action = await data.ResourceAction.get(action_id=action_id, connection=connection)
if resource_action is None:
raise NotFound(
f"No resource action exists for action_id {action_id}. Ensure "
f"`/resource/<resource_id>/deploy/start` is called first. "
)
if resource_action.finished is not None:
raise Conflict(
f"Resource action with id {resource_id_str} was already marked as done at {resource_action.finished}."
)
for log in messages:
# All other data is stored in the database. The msg was already formatted at the client side.
self.log_resource_action(
env.id,
[resource_id_str],
log.level.to_int,
log.timestamp,
log.msg,
)
await resource_action.set_and_save(
messages=[
{
**log.model_dump(),
"timestamp": log.timestamp.astimezone().isoformat(timespec="microseconds"),
}
for log in messages
],
changes=changes_with_rvid,
status=status,
change=change,
finished=finished,
connection=connection,
)
extra_fields = {}
if status == ResourceState.deployed:
extra_fields["last_success"] = resource_action.started
propagate_last_produced_events = False
# keep track IF we need to propagate if we are stale
# but only do it at the end of the transaction
if change != Change.nochange:
# We are producing an event
extra_fields["last_produced_events"] = finished
propagate_last_produced_events = True
await resource.update_fields(
last_deploy=finished,
status=status,
last_non_deploying_status=const.NonDeployingResourceState(status),
**extra_fields,
connection=connection,
)
# final resource update
if not keep_increment_cache:
self.clear_env_cache(env)
if "purged" in resource.attributes and resource.attributes["purged"] and status == const.ResourceState.deployed:
await data.Parameter.delete_all(environment=env.id, resource_id=resource.resource_id, connection=connection)
propagate_deploy_state = status == ResourceState.failed or status == ResourceState.skipped
await self.propagate_resource_state_if_stale(
connection, env, [resource_id], finished, status, propagate_last_produced_events, propagate_deploy_state
)
self.add_background_task(data.ConfigurationModel.mark_done_if_done(env.id, resource.model))
waiting_agents = {(Id.parse_id(prov).get_agent_name(), resource.resource_version_id) for prov in resource.provides}
for agent, resource_id in waiting_agents:
aclient = self.agentmanager_service.get_agent_client(env.id, agent)
if aclient is not None:
if change is None:
change = const.Change.nochange
await aclient.resource_event(
tid=env.id,
id=agent,
resource=resource_id,
send_events=False,
state=status,
change=change,
changes=changes_with_rvid,
)
    async def propagate_resource_state_if_stale(
        self,
        connection: Connection,
        env: data.Environment,
        resource_ids: list[Id],
        last_produced_events: datetime.datetime,
        deploy_state: ResourceState,
        propagate_last_produced_events: bool,
        propagate_deploy_state: bool,
    ) -> None:
        """
        Forward-propagate deploy state and/or the last_produced_events timestamp to the
        latest released version for resources whose own version has become stale.

        :param connection: Connection carrying the surrounding transaction.
        :param env: Environment to consider.
        :param resource_ids: Versioned ids of the resources that were just updated.
        :param last_produced_events: Timestamp to propagate as last_produced_events.
        :param deploy_state: The state (failed/skipped) to propagate when requested.
        :param propagate_last_produced_events: Whether to propagate last_produced_events.
        :param propagate_deploy_state: Whether to propagate the deploy state itself.
        """
        if propagate_deploy_state or propagate_last_produced_events:
            # lock out release version
            await env.acquire_release_version_lock(connection=connection)
            latest_version = await data.ConfigurationModel.get_version_nr_latest_version(env.id, connection=connection)
            for resource_id in resource_ids:
                if latest_version is not None and latest_version > resource_id.version:
                    # we are stale, forward propagate our status
                    # this is required because:
                    # upon release of the newer version our old status may have been copied over into the new version
                    # (by the increment calculation)
                    # the new version may thus hide this failure
                    # issue #6475
                    # the release_version_lock above ensure we can not race with release itself
                    # this is at the end of the transaction to not block release too long
                    # and vice versa
                    if propagate_deploy_state:
                        await self._update_deploy_state(
                            env,
                            resource_id.resource_str(),
                            last_produced_events,
                            latest_version,
                            deploy_state,
                            f"update on stale version {resource_id.version}",
                            fail_on_error=False,
                            connection=connection,
                            can_overwrite_available=False,
                        )
                    if propagate_last_produced_events:
                        await data.Resource.update_last_produced_events_if_newer(
                            env.id, resource_id.resource_str(), latest_version, last_produced_events, connection=connection
                        )
@handle(methods.resource_action_update, env="tid")
async def resource_action_update(
self,
env: data.Environment,
resource_ids: list[ResourceVersionIdStr],
action_id: uuid.UUID,
action: const.ResourceAction,
started: datetime.datetime,
finished: datetime.datetime,
status: Optional[Union[const.ResourceState, const.DeprecatedResourceState]],
messages: list[dict[str, Any]],
changes: dict[str, Any],
change: const.Change,
send_events: bool,
keep_increment_cache: bool = False,
is_increment_notification: bool = False,
only_update_from_states: Optional[set[const.ResourceState]] = None,
*,
connection: Optional[Connection] = None,
) -> Apireturn:
"""
:param is_increment_notification: is this the increment calucation setting the deployed status,
instead of an actual deploy? Used to keep track of the last_success field on the resources,
which should not be updated for increments.
"""
def convert_legacy_state(
status: Optional[Union[const.ResourceState, const.DeprecatedResourceState]]
) -> Optional[const.ResourceState]:
if status is None or isinstance(status, const.ResourceState):
return status
if status == const.DeprecatedResourceState.processing_events:
return const.ResourceState.deploying
else:
raise BadRequest(f"Unsupported deprecated resources state {status.value}")
status = convert_legacy_state(status)
# can update resource state
is_resource_state_update = action in STATE_UPDATE
# this ra is finishing
is_resource_action_finished = finished is not None
if is_resource_state_update:
# if status update, status is required
if status is None:
error_and_log(
"Cannot perform state update without a status.",
resource_ids=resource_ids,
action=action,
action_id=action_id,
)
# and needs to be valid
if status not in VALID_STATES_ON_STATE_UPDATE:
error_and_log(
f"Status {status} is not valid on action {action}",
resource_ids=resource_ids,
action=action,
action_id=action_id,
)
if status in TRANSIENT_STATES:
if not is_resource_action_finished:
pass
else:
error_and_log(
"The finished field must not be set for transient states",
status=status,
resource_ids=resource_ids,
action=action,
action_id=action_id,
)
else:
if is_resource_action_finished:
pass
else:
error_and_log(
"The finished field must be set for none transient states",
status=status,
resource_ids=resource_ids,
action=action,
action_id=action_id,
)
assert all(Id.is_resource_version_id(rvid) for rvid in resource_ids)
resources: list[data.Resource]
async with data.Resource.get_connection(connection) as connection:
    async with connection.transaction():
        # Validate that every referenced resource exists in this environment.
        resources = await data.Resource.get_resources(
            env.id,
            resource_ids,
            # acquire lock on Resource before read and before lock on ResourceAction to prevent conflicts with
            # cascading deletes
            lock=data.RowLockMode.FOR_NO_KEY_UPDATE,
            connection=connection,
        )
        # Any missing id fails the whole request with a 404.
        if len(resources) == 0 or (len(resources) != len(resource_ids)):
            return (
                404,
                {
                    "message": "The resources with the given ids do not exist in the given environment. "
                    "Only %s of %s resources found." % (len(resources), len(resource_ids))
                },
            )
        # Optional filter: only act on resources currently in one of the
        # requested source states; narrowing resource_ids to the survivors.
        if only_update_from_states is not None:
            resources = [resource for resource in resources if resource.status in only_update_from_states]
            if not resources:
                return 200, {"message": "no resources with the given state found"}
            resource_ids = [resource.resource_version_id for resource in resources]
        # Validate state transitions: a resource in a terminal state may not
        # be moved to a different state.
        if is_resource_state_update:
            # no escape from terminal
            if any(resource.status != status and resource.status in TERMINAL_STATES for resource in resources):
                # NOTE(review): AssertionError for request validation is unusual;
                # callers likely see this as a 500 -- confirm intended.
                LOGGER.error("Attempting to set undeployable resource to deployable state")
                raise AssertionError("Attempting to set undeployable resource to deployable state")
        # Fetch the resource action; create it when this is the first report
        # for this action_id, otherwise append to the existing one.
        resource_action = await data.ResourceAction.get(action_id=action_id, connection=connection)
        if resource_action is None:
            # A new action must declare when it started.
            if started is None:
                return 500, {"message": "A resource action can only be created with a start datetime."}
            version = Id.parse_id(resource_ids[0]).version
            resource_action = data.ResourceAction(
                environment=env.id,
                version=version,
                resource_version_ids=resource_ids,
                action_id=action_id,
                action=action,
                started=started,
            )
            await resource_action.insert(connection=connection)
        else:
            # An already-finished action is immutable.
            if resource_action.finished is not None:
                return (
                    500,
                    {
                        "message": (
                            "An resource action can only be updated when it has not been finished yet. This action "
                            "finished at %s" % resource_action.finished
                        )
                    },
                )
        for msg in messages:
            # All other data is stored in the database. The msg was already formatted at the client side.
            self.log_resource_action(
                env.id,
                resource_ids,
                const.LogLevel(msg["level"]).to_int,
                parse_timestamp(msg["timestamp"]),
                msg["msg"],
            )
        # Persist messages (timestamps normalized to ISO with microseconds),
        # changes and the new status/change/finished fields on the action.
        await resource_action.set_and_save(
            messages=[
                {
                    **msg,
                    "timestamp": parse_timestamp(msg["timestamp"]).isoformat(timespec="microseconds"),
                }
                for msg in messages
            ],
            changes=changes,
            status=status,
            change=change,
            finished=finished,
            connection=connection,
        )
async def update_fields_resource(
    resource: data.Resource, connection: Optional[Connection] = None, **kwargs: object
) -> None:
    """
    Persist the given fields on `resource`, keeping the
    `last_non_deploying_status` column in sync: whenever the `status`
    field is updated to anything other than `deploying`, the new status
    is mirrored into `last_non_deploying_status` as well.
    """
    new_status = kwargs.get("status")
    if "status" in kwargs and new_status is not ResourceState.deploying:
        kwargs["last_non_deploying_status"] = const.NonDeployingResourceState(new_status)
    await resource.update_fields(**kwargs, connection=connection)
if is_resource_state_update:
    # Transient update: only persist the new status; the action itself
    # stays open (no finished timestamp yet).
    if not is_resource_action_finished:
        for res in resources:
            await update_fields_resource(res, status=status, connection=connection)
        if not keep_increment_cache:
            self.clear_env_cache(env)
        return 200
    else:
        # Final update: the action is finished; persist the outcome.
        if not keep_increment_cache:
            self.clear_env_cache(env)
        # Only a real change should refresh last_produced_events downstream.
        propagate_last_produced_events = change != Change.nochange
        await self.propagate_resource_state_if_stale(
            connection,
            env,
            [Id.parse_id(res) for res in resource_ids],
            finished,
            status,  # mypy can't figure out this is never None here
            propagate_last_produced_events,
            status == ResourceState.failed or status == ResourceState.skipped,
        )
        model_version = None
        for res in resources:
            extra_fields = {}
            # last_success tracks real deploys only: increment
            # notifications must not touch it (see the method docstring).
            if status == ResourceState.deployed and not is_increment_notification:
                extra_fields["last_success"] = resource_action.started
            if propagate_last_produced_events:
                extra_fields["last_produced_events"] = finished
            await update_fields_resource(
                res, last_deploy=finished, status=status, **extra_fields, connection=connection
            )
            model_version = res.model
            # A successfully deployed purge invalidates any parameters
            # recorded for the (now gone) resource.
            if (
                "purged" in res.attributes
                and res.attributes["purged"]
                and status == const.ResourceState.deployed
            ):
                await data.Parameter.delete_all(
                    environment=env.id, resource_id=res.resource_id, connection=connection
                )
if is_resource_state_update and is_resource_action_finished:
    # `resources` is non-empty here (validated above), so model_version was
    # set by the loop; check off the critical path whether this model
    # version is now fully done.
    self.add_background_task(data.ConfigurationModel.mark_done_if_done(env.id, model_version))