-
Notifications
You must be signed in to change notification settings - Fork 76
/
river_job.sql
404 lines (385 loc) · 11.6 KB
/
river_job.sql
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
-- Lifecycle states of a row in river_job. The order of the labels below is the
-- enum's sort order, so it is kept exactly as declared.
CREATE TYPE river_job_state AS ENUM (
    'available',  -- eligible to be picked up (see JobGetAvailable)
    'cancelled',  -- finalized: requires finalized_at to be set
    'completed',  -- finalized: requires finalized_at to be set
    'discarded',  -- finalized: requires finalized_at to be set
    'pending',
    'retryable',  -- promoted to 'available' by JobSchedule once due
    'running',    -- set by JobGetAvailable when the job is picked up
    'scheduled'   -- promoted to 'available' by JobSchedule once due
);
-- One row per job. Column order matters: every query in this file that returns
-- `*` or `river_job.*` yields its columns in the order declared here.
CREATE TABLE river_job (
    id            bigserial        PRIMARY KEY,
    args          jsonb            NOT NULL DEFAULT '{}'::jsonb,
    attempt       smallint         NOT NULL DEFAULT 0,
    attempted_at  timestamptz,
    attempted_by  text[],
    created_at    timestamptz      NOT NULL DEFAULT now(),
    errors        jsonb[],
    finalized_at  timestamptz,
    kind          text             NOT NULL,
    max_attempts  smallint         NOT NULL,
    metadata      jsonb            NOT NULL DEFAULT '{}'::jsonb,
    priority      smallint         NOT NULL DEFAULT 1,
    queue         text             NOT NULL DEFAULT 'default'::text,
    state         river_job_state  NOT NULL DEFAULT 'available'::river_job_state,
    scheduled_at  timestamptz      NOT NULL DEFAULT now(),
    tags          varchar(255)[]   NOT NULL DEFAULT '{}'::varchar(255)[],

    -- finalized_at is set exactly when the job is in one of the three
    -- finalized states, and is NULL in every other state.
    CONSTRAINT finalized_or_finalized_at_null CHECK (
        (finalized_at IS NOT NULL AND state IN ('cancelled', 'completed', 'discarded'))
        OR (finalized_at IS NULL AND state NOT IN ('cancelled', 'completed', 'discarded'))
    ),

    -- Priority is limited to the values 1 through 4.
    CONSTRAINT priority_in_range CHECK (priority BETWEEN 1 AND 4),

    -- Queue and kind names must be non-empty and at most 127 characters.
    CONSTRAINT queue_length CHECK (char_length(queue) BETWEEN 1 AND 127),
    CONSTRAINT kind_length CHECK (char_length(kind) BETWEEN 1 AND 127)
);
-- Requests cancellation of a single job.
--
-- The job row is locked first. If the job is not already finalized, a 'cancel'
-- message is published through pg_notify on the channel
-- "<current_schema>.<control_topic>" and the row is updated; an already
-- finalized job is left untouched. Either way the job row is returned (the
-- post-update version when an update happened). No row is returned when the
-- id does not exist.
-- name: JobCancel :one
WITH target_job AS (
    SELECT id, queue, state, finalized_at
    FROM river_job
    WHERE river_job.id = @id
    FOR UPDATE
),
cancel_notice AS (
    -- Produces a row (and therefore fires the notification) only for jobs that
    -- have not been finalized yet.
    SELECT
        id,
        pg_notify(
            concat(current_schema(), '.', @control_topic::text),
            json_build_object('action', 'cancel', 'job_id', id, 'queue', queue)::text
        )
    FROM target_job
    WHERE finalized_at IS NULL
        AND state NOT IN ('cancelled', 'completed', 'discarded')
),
cancelled_job AS (
    UPDATE river_job
    SET
        -- A job that is actively running is left for its current client and
        -- producer to cancel; any other job is cancelled on the spot.
        state = CASE
            WHEN state = 'running'::river_job_state THEN state
            ELSE 'cancelled'::river_job_state
        END,
        finalized_at = CASE
            WHEN state = 'running'::river_job_state THEN finalized_at
            ELSE now()
        END,
        -- Record that a cancellation was requested so the rescuer knows not to
        -- rescue the job, even if it gets stuck in the running state.
        metadata = jsonb_set(metadata, '{cancel_attempted_at}'::text[], @cancel_attempted_at::jsonb, true)
    FROM cancel_notice
    WHERE river_job.id = cancel_notice.id
    RETURNING river_job.*
)
-- Untouched row (if no update happened) ...
SELECT *
FROM river_job
WHERE id = @id::bigint
    AND NOT EXISTS (
        SELECT 1
        FROM cancelled_job
        WHERE cancelled_job.id = river_job.id
    )
UNION
-- ... or the freshly updated row.
SELECT *
FROM cancelled_job;
-- Counts the jobs currently in the given state.
-- name: JobCountByState :one
SELECT
    count(*)
FROM
    river_job
WHERE
    river_job.state = @state;
-- Deletes a single job unless it is currently running.
--
-- The row is locked first. The job row is returned in both cases: the deleted
-- row when the delete went through, or the still-existing row when the job was
-- running and therefore skipped. No row is returned when the id does not exist.
-- name: JobDelete :one
WITH locked_job AS (
    SELECT id
    FROM river_job
    WHERE river_job.id = @id
    FOR UPDATE
),
removed_job AS (
    DELETE FROM river_job
    USING locked_job
    WHERE river_job.id = locked_job.id
        -- Running jobs are never deleted.
        AND river_job.state <> 'running'::river_job_state
    RETURNING river_job.*
)
SELECT *
FROM river_job
WHERE id = @id::bigint
    AND NOT EXISTS (
        SELECT 1
        FROM removed_job
        WHERE removed_job.id = river_job.id
    )
UNION
SELECT *
FROM removed_job;
-- Deletes up to @max finalized jobs whose finalized_at is older than the
-- horizon given for their state (cancelled, completed and discarded each have
-- their own horizon) and returns how many rows were deleted. Candidates are
-- taken in ascending id order.
-- name: JobDeleteBefore :one
WITH deleted_jobs AS (
    DELETE FROM river_job
    WHERE id IN (
        SELECT id
        FROM river_job
        WHERE
            (state = 'cancelled' AND finalized_at < @cancelled_finalized_at_horizon::timestamptz) OR
            (state = 'completed' AND finalized_at < @completed_finalized_at_horizon::timestamptz) OR
            (state = 'discarded' AND finalized_at < @discarded_finalized_at_horizon::timestamptz)
        ORDER BY id
        LIMIT @max::bigint
    )
    -- Only the row count is consumed below, so return just the key instead of
    -- materialising every column of every deleted row.
    RETURNING id
)
SELECT count(*)
FROM deleted_jobs;
-- Claims up to @max jobs from one queue for execution.
--
-- Candidates are 'available' jobs in @queue whose scheduled_at has passed,
-- taken in priority, scheduled_at, id order. Rows already locked by another
-- transaction are skipped rather than waited for. Each claimed job is moved to
-- 'running', its attempt counter is incremented, attempted_at is set to now()
-- and @attempted_by is appended to attempted_by. The statement itself does not
-- define the order of the returned rows (RETURNING carries no ORDER BY).
-- name: JobGetAvailable :many
WITH claimable AS (
    SELECT *
    FROM river_job
    WHERE state = 'available'::river_job_state
        AND queue = @queue::text
        AND scheduled_at <= now()
    ORDER BY priority ASC, scheduled_at ASC, id ASC
    LIMIT @max::integer
    FOR UPDATE SKIP LOCKED
)
UPDATE river_job
SET
    state = 'running'::river_job_state,
    attempt = river_job.attempt + 1,
    attempted_at = now(),
    attempted_by = array_append(river_job.attempted_by, @attempted_by::text)
FROM claimable
WHERE river_job.id = claimable.id
RETURNING river_job.*;
-- Looks up a job of the given kind that also matches whichever optional
-- properties are switched on through the @by_* flags: equal args, created_at
-- inside [created_at_begin, created_at_end), equal queue, and/or state among
-- the given list. A filter whose flag is false is ignored entirely.
--
-- NOTE(review): there is no ORDER BY or LIMIT, so when several rows match, the
-- query does not define which one is returned.
-- name: JobGetByKindAndUniqueProperties :one
SELECT *
FROM river_job
WHERE kind = @kind
    AND CASE
            WHEN @by_args::boolean THEN args = @args
            ELSE true
        END
    AND CASE
            WHEN @by_created_at::boolean
                THEN tstzrange(@created_at_begin::timestamptz, @created_at_end::timestamptz, '[)') @> created_at
            ELSE true
        END
    AND CASE
            WHEN @by_queue::boolean THEN queue = @queue
            ELSE true
        END
    AND CASE
            WHEN @by_state::boolean THEN state::text = any(@state::text[])
            ELSE true
        END;
-- Returns every job whose kind is one of the given kinds, in ascending id order.
-- name: JobGetByKindMany :many
SELECT
    *
FROM
    river_job
WHERE
    river_job.kind = any(@kind::text[])
ORDER BY
    river_job.id ASC;
-- Fetches a single job by its primary key.
-- name: JobGetByID :one
SELECT
    *
FROM
    river_job
WHERE
    river_job.id = @id
LIMIT 1;
-- Fetches every job whose id appears in the given array, in ascending id order.
-- Ids with no matching row are simply absent from the result.
-- name: JobGetByIDMany :many
SELECT
    *
FROM
    river_job
WHERE
    river_job.id = any(@id::bigint[])
ORDER BY
    river_job.id ASC;
-- Returns up to @max jobs that are still 'running' but whose attempted_at is
-- older than @stuck_horizon, in ascending id order.
-- name: JobGetStuck :many
SELECT
    *
FROM
    river_job
WHERE
    river_job.state = 'running'::river_job_state
    AND river_job.attempted_at < @stuck_horizon::timestamptz
ORDER BY
    river_job.id ASC
LIMIT @max;
-- Inserts a new job from the commonly supplied properties and returns the
-- stored row. Columns not listed here (attempt, created_at, ...) take their
-- table defaults.
-- name: JobInsertFast :one
INSERT INTO river_job (
    args,
    finalized_at,
    kind,
    max_attempts,
    metadata,
    priority,
    queue,
    scheduled_at,
    state,
    tags
)
VALUES (
    @args::jsonb,
    @finalized_at,
    @kind::text,
    @max_attempts::smallint,
    -- NULL metadata falls back to an empty JSON object.
    coalesce(@metadata::jsonb, '{}'),
    @priority::smallint,
    @queue::text,
    -- NULL scheduled_at means "schedule for now".
    coalesce(sqlc.narg('scheduled_at')::timestamptz, now()),
    @state::river_job_state,
    -- NULL tags fall back to an empty array.
    coalesce(@tags::varchar(255)[], '{}')
)
RETURNING *;
-- Inserts a job with explicit control over nearly every column, including
-- attempt, attempted_at, created_at and errors, and returns the stored row.
-- attempted_by is the only settable column not exposed here.
-- name: JobInsertFull :one
INSERT INTO river_job (
    args,
    attempt,
    attempted_at,
    created_at,
    errors,
    finalized_at,
    kind,
    max_attempts,
    metadata,
    priority,
    queue,
    scheduled_at,
    state,
    tags
)
VALUES (
    @args::jsonb,
    coalesce(@attempt::smallint, 0),
    @attempted_at,
    -- NULL created_at means "created now".
    coalesce(sqlc.narg('created_at')::timestamptz, now()),
    @errors::jsonb[],
    @finalized_at,
    @kind::text,
    @max_attempts::smallint,
    -- NULL metadata falls back to an empty JSON object.
    coalesce(@metadata::jsonb, '{}'),
    @priority::smallint,
    @queue::text,
    -- NULL scheduled_at means "schedule for now".
    coalesce(sqlc.narg('scheduled_at')::timestamptz, now()),
    @state::river_job_state,
    -- NULL tags fall back to an empty array.
    coalesce(@tags::varchar(255)[], '{}')
)
RETURNING *;
-- Run by the rescuer to queue jobs for retry or discard them, depending on the
-- state supplied for each job.
--
-- The five array parameters are unnested side by side, so element N of each
-- array describes the same job. For every listed id the given error is
-- appended to errors, and finalized_at, scheduled_at and state are overwritten.
-- A finalized_at equal to '0001-01-01 00:00:00 +0000' is stored as NULL
-- (presumably the caller's zero-value timestamp; confirm against the caller).
-- name: JobRescueMany :exec
UPDATE river_job
SET
    errors = array_append(errors, rescue.error),
    finalized_at = rescue.finalized_at,
    scheduled_at = rescue.scheduled_at,
    state = rescue.state
FROM (
    SELECT
        unnest(@id::bigint[]) AS id,
        unnest(@error::jsonb[]) AS error,
        nullif(unnest(@finalized_at::timestamptz[]), '0001-01-01 00:00:00 +0000') AS finalized_at,
        unnest(@scheduled_at::timestamptz[]) AS scheduled_at,
        unnest(@state::text[])::river_job_state AS state
) AS rescue
WHERE river_job.id = rescue.id;
-- Makes a job immediately available to run again.
--
-- The row is locked first. Unless the job is running, or is already available
-- with a scheduled_at in the past, it is set to 'available' with scheduled_at
-- = now() and finalized_at cleared. When attempt has already reached
-- max_attempts, max_attempts is raised by one. The job row is returned whether
-- or not it was changed. No row is returned when the id does not exist.
-- name: JobRetry :one
WITH locked_job AS (
    SELECT id
    FROM river_job
    WHERE river_job.id = @id
    FOR UPDATE
),
retried_job AS (
    UPDATE river_job
    SET
        state = 'available'::river_job_state,
        scheduled_at = now(),
        max_attempts = CASE
            WHEN attempt = max_attempts THEN max_attempts + 1
            ELSE max_attempts
        END,
        finalized_at = NULL
    FROM locked_job
    WHERE river_job.id = locked_job.id
        -- Running jobs are never touched.
        AND river_job.state <> 'running'::river_job_state
        -- A job that is already available and already due is left alone.
        AND (
            river_job.state <> 'available'::river_job_state
            OR river_job.scheduled_at >= now()
        )
    RETURNING river_job.*
)
SELECT *
FROM river_job
WHERE id = @id::bigint
    AND NOT EXISTS (
        SELECT 1
        FROM retried_job
        WHERE retried_job.id = river_job.id
    )
UNION
SELECT *
FROM retried_job;
-- Promotes due jobs to 'available'.
--
-- Up to @max jobs in state 'retryable' or 'scheduled' whose scheduled_at is at
-- or before @now are locked (in priority, scheduled_at, id order) and switched
-- to 'available'. The promoted rows are returned as they look after the
-- update.
--
-- The rows come straight from the UPDATE's RETURNING clause. Re-reading
-- river_job in an outer SELECT would run against the statement's snapshot and
-- hand back the rows as they were before the update, still showing
-- 'retryable' / 'scheduled'.
-- name: JobSchedule :many
WITH jobs_to_schedule AS (
    SELECT id
    FROM river_job
    WHERE
        state IN ('retryable', 'scheduled')
        -- NOTE(review): the next two predicates are always true under the
        -- table's NOT NULL and CHECK constraints; presumably they are kept to
        -- help the planner match an index. Confirm before removing.
        AND queue IS NOT NULL
        AND priority >= 0
        AND river_job.scheduled_at <= @now::timestamptz
    ORDER BY
        priority,
        scheduled_at,
        id
    LIMIT @max::bigint
    FOR UPDATE
)
UPDATE river_job
SET state = 'available'::river_job_state
FROM jobs_to_schedule
WHERE river_job.id = jobs_to_schedule.id
RETURNING river_job.*;
-- Marks a batch of jobs as completed, but only those that are still running.
--
-- @id and @finalized_at are parallel arrays: element N of each describes the
-- same job. Running jobs among them are locked, set to 'completed' and given
-- their finalized_at. Every listed job that exists is returned: the updated
-- row for jobs that were completed here, the unchanged row for the rest.
-- name: JobSetCompleteIfRunningMany :many
WITH job_to_finalized_at AS (
    SELECT
        unnest(@id::bigint[]) AS id,
        unnest(@finalized_at::timestamptz[]) AS finalized_at
),
job_to_update AS (
    SELECT river_job.id, job_to_finalized_at.finalized_at
    FROM river_job
    INNER JOIN job_to_finalized_at
        ON river_job.id = job_to_finalized_at.id
    WHERE river_job.state = 'running'::river_job_state
    FOR UPDATE
),
updated_job AS (
    UPDATE river_job
    SET
        finalized_at = job_to_update.finalized_at,
        state = 'completed'::river_job_state
    FROM job_to_update
    WHERE river_job.id = job_to_update.id
    RETURNING river_job.*
)
-- Listed jobs that were not updated (not running) ...
SELECT *
FROM river_job
WHERE id IN (SELECT id FROM job_to_finalized_at EXCEPT SELECT id FROM updated_job)
-- ... plus the rows just updated. The two branches can never share an id, so
-- UNION ALL avoids a pointless de-duplication pass over full job rows.
UNION ALL
SELECT *
FROM updated_job;
-- Moves a single job out of 'running' into @state, if it is still running.
--
-- The row is locked first. If the requested state is 'retryable' or
-- 'scheduled' and the job's metadata carries a cancel_attempted_at key, the
-- job is cancelled instead: state becomes 'cancelled', finalized_at becomes
-- now(), and the max_attempts / scheduled_at updates are skipped. An error is
-- appended whenever @error_do_update is true, cancelled or not. The job row is
-- returned whether or not it was changed. No row is returned when the id does
-- not exist.
-- name: JobSetStateIfRunning :one
WITH locked_job AS (
    SELECT
        id,
        (@state::river_job_state IN ('retryable'::river_job_state, 'scheduled'::river_job_state))
            AND (metadata ? 'cancel_attempted_at') AS should_cancel
    FROM river_job
    WHERE id = @id::bigint
    FOR UPDATE
),
transitioned_job AS (
    UPDATE river_job
    SET
        state = CASE
            WHEN should_cancel THEN 'cancelled'::river_job_state
            ELSE @state::river_job_state
        END,
        finalized_at = CASE
            WHEN should_cancel THEN now()
            WHEN @finalized_at_do_update::boolean THEN @finalized_at
            ELSE finalized_at
        END,
        errors = CASE
            WHEN @error_do_update::boolean THEN array_append(errors, @error::jsonb)
            ELSE errors
        END,
        max_attempts = CASE
            WHEN NOT should_cancel AND @max_attempts_update::boolean THEN @max_attempts
            ELSE max_attempts
        END,
        scheduled_at = CASE
            WHEN NOT should_cancel AND @scheduled_at_do_update::boolean THEN sqlc.narg('scheduled_at')::timestamptz
            ELSE scheduled_at
        END
    FROM locked_job
    WHERE river_job.id = locked_job.id
        -- Only a job that is still running may be transitioned.
        AND river_job.state = 'running'::river_job_state
    RETURNING river_job.*
)
SELECT *
FROM river_job
WHERE id = @id::bigint
    AND NOT EXISTS (
        SELECT 1
        FROM transitioned_job
        WHERE transitioned_job.id = river_job.id
    )
UNION
SELECT *
FROM transitioned_job;
-- A generalized update for a job's attempt, attempted_at, errors, finalized_at
-- and state. Each column is overwritten only when its matching *_do_update
-- flag is true and keeps its current value otherwise. Because of the large
-- number of parameters this may be more suitable for testing than production.
-- name: JobUpdate :one
UPDATE river_job
SET
    attempt = CASE
        WHEN @attempt_do_update::boolean THEN @attempt
        ELSE attempt
    END,
    attempted_at = CASE
        WHEN @attempted_at_do_update::boolean THEN @attempted_at
        ELSE attempted_at
    END,
    errors = CASE
        WHEN @errors_do_update::boolean THEN @errors::jsonb[]
        ELSE errors
    END,
    finalized_at = CASE
        WHEN @finalized_at_do_update::boolean THEN @finalized_at
        ELSE finalized_at
    END,
    state = CASE
        WHEN @state_do_update::boolean THEN @state
        ELSE state
    END
WHERE id = @id
RETURNING *;