-
Notifications
You must be signed in to change notification settings - Fork 251
/
0083.yml
91 lines (91 loc) · 4.22 KB
/
0083.yml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
version: 83
description: Allow adding metadata to quarantined workers
migrationScript: |-
begin
alter table queue_workers add column quarantine_details jsonb;
end
downgradeScript: |-
begin
alter table queue_workers drop column quarantine_details;
end
methods:
quarantine_queue_worker_with_last_date_active:
deprecated: true
quarantine_queue_worker_with_last_date_active_and_details:
description: |-
Update the quarantine_until date for a worker. The Queue service interprets a date in the past
as "not quarantined". This function also "bumps" the expiration of the worker so that un-quarantined
workers do not immediately expire. Returns the worker row just as get_queue_worker would, or no rows if
no such worker exists.
Additional metadata can be added to the worker to help identify the reason for the quarantine.
Worker will keep a history of all quarantine details.
mode: write
serviceName: queue
args: task_queue_id_in text, worker_group_in text, worker_id_in text, quarantine_until_in timestamptz, quarantine_details_in jsonb
returns: table(task_queue_id text, worker_group text, worker_id text, quarantine_until timestamptz, expires timestamptz, first_claim timestamptz, recent_tasks jsonb, last_date_active timestamptz, quarantine_details jsonb)
body: |-
begin
return query update queue_workers
set
quarantine_until = quarantine_until_in,
expires = greatest(queue_workers.expires, now() + interval '1 day'),
-- append new row to the quarantine details array if it exists or create new
quarantine_details = case
when queue_workers.quarantine_details is null then
jsonb_build_array(quarantine_details_in)
else
queue_workers.quarantine_details || quarantine_details_in
end
where
queue_workers.task_queue_id = task_queue_id_in and
queue_workers.worker_group = worker_group_in and
queue_workers.worker_id = worker_id_in
returning
queue_workers.task_queue_id,
queue_workers.worker_group,
queue_workers.worker_id,
queue_workers.quarantine_until,
queue_workers.expires,
queue_workers.first_claim,
queue_workers.recent_tasks,
queue_workers.last_date_active,
queue_workers.quarantine_details;
end
get_queue_worker_with_wm_join:
deprecated: true
get_queue_worker_with_wm_join_2:
description: |-
Get a non-expired queue worker by worker_pool_id, worker_group, and worker_id.
Workers are not considered expired until after their quarantine date expires.
This also performs an outer join with the worker_manager.worker table for more data.
mode: read
serviceName: worker_manager
args: task_queue_id_in text, worker_group_in text, worker_id_in text, expires_in timestamptz
returns: table(worker_pool_id text, worker_group text, worker_id text, quarantine_until timestamptz, quarantine_details jsonb, expires timestamptz, first_claim timestamptz, recent_tasks jsonb, last_date_active timestamptz, state text, capacity int4, provider_id text, etag uuid)
body: |-
begin
return query
select
queue_workers.task_queue_id as worker_pool_id,
queue_workers.worker_group,
queue_workers.worker_id,
queue_workers.quarantine_until,
queue_workers.quarantine_details,
queue_workers.expires,
queue_workers.first_claim,
queue_workers.recent_tasks,
queue_workers.last_date_active,
workers.state,
workers.capacity,
workers.provider_id,
public.gen_random_uuid()
from queue_workers
full outer join workers on workers.worker_id = queue_workers.worker_id
and workers.worker_pool_id = queue_workers.task_queue_id
and workers.worker_group = queue_workers.worker_group
where
queue_workers.task_queue_id = task_queue_id_in and
queue_workers.worker_group = worker_group_in and
queue_workers.worker_id = worker_id_in and
(queue_workers.expires > expires_in or queue_workers.quarantine_until > expires_in);
end