= PgQ - queue for PostgreSQL =
== Queue creation ==
pgq.create_queue(queue_name text)
Initialize the event queue.
Returns 0 if the queue already exists, 1 otherwise.
== Producer ==
pgq.insert_event(queue_name text, ev_type, ev_data)
pgq.insert_event(queue_name text, ev_type, ev_data, extra1, extra2, extra3, extra4)
Generate a new event. This should be called inside the main transaction,
so it is rolled back together with it if needed.
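A quick sketch of both forms, inside the producer's transaction (queue name,
event types and field values here are illustrative):

  begin;
  -- application work happens in the same transaction ...
  select pgq.insert_event('LogEvent', 'data', 'DataFor123');
  -- the extra fields can carry additional context along with the payload
  select pgq.insert_event('LogEvent', 'user_created', '{"id": 123}',
                          'app-server-1', 'v1', null, null);
  commit;

If the transaction rolls back, neither event is delivered to consumers.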
== Consumer ==
pgq.register_consumer(queue_name text, consumer_id text)
Attaches the consumer to a particular event queue.
Returns 0 if the consumer was already attached, 1 otherwise.
pgq.unregister_consumer(queue_name text, consumer_id text)
Unregister the consumer and drop the resources allocated to it.
pgq.next_batch(queue_name text, consumer_id text)
Allocates the next batch of events for the consumer.
Returns a batch id (int8), to be used in the processing functions. If no batch
is available, returns NULL. That means the ticker has not cut one yet, and
this is the appropriate moment for the consumer to sleep.
pgq.get_batch_events(batch_id int8)
Returns the set of events in this batch.
The batch may contain no events. This is normal. The batch must still be closed
with pgq.finish_batch().
Event fields: (ev_id int8, ev_time timestamptz, ev_txid int8, ev_retry int4, ev_type text,
ev_data text, ev_extra1 text, ev_extra2 text, ev_extra3 text, ev_extra4 text)
pgq.event_failed(batch_id int8, event_id int8, reason text)
Tag event as 'failed' - it will be stored, but no further processing is done.
pgq.event_retry(batch_id int8, event_id int8, retry_seconds int4)
Tag event for 'retry' - after the given number of seconds the event will be
re-inserted into the main queue.
pgq.finish_batch(batch_id int8)
Tag batch as finished. Until this is done, the consumer will get the
same batch again.
After calling finish_batch the consumer cannot do any operations on the events
of that batch. All such operations must be done before.
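The batch lifecycle above (next_batch, get_batch_events, finish_batch) can be
sketched as a single PL/pgSQL block; queue and consumer names are illustrative,
and a real consumer would normally run as an external process:

  do $$
  declare
    batch int8;
    ev record;
  begin
    select pgq.next_batch('LogEvent', 'TestConsumer') into batch;
    if batch is null then
      return;  -- no batch cut yet; the consumer should sleep and retry
    end if;
    for ev in select * from pgq.get_batch_events(batch) loop
      -- process the event; this must be safe to run more than once
      raise notice 'event %: % %', ev.ev_id, ev.ev_type, ev.ev_data;
    end loop;
    perform pgq.finish_batch(batch);
  end $$;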
== Failed queue operation ==
Events tagged as failed just stay in their queue. The following
functions can be used to manage them.
pgq.failed_event_list(queue_name, consumer)
pgq.failed_event_list(queue_name, consumer, cnt, offset)
pgq.failed_event_count(queue_name, consumer)
List and count the failed events on the queue.
Event fields are the same as for pgq.get_batch_events().
pgq.failed_event_delete(queue_name, consumer, event_id)
pgq.failed_event_retry(queue_name, consumer, event_id)
Remove a failed event from the queue, or re-insert it for retry, respectively.
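A possible maintenance session (queue, consumer and event ids are placeholders):

  -- inspect failed events for one consumer
  select ev_id, ev_retry, ev_type, ev_data
    from pgq.failed_event_list('LogEvent', 'TestConsumer');

  select pgq.failed_event_count('LogEvent', 'TestConsumer');

  -- re-insert one event into the main queue, drop another for good
  select pgq.failed_event_retry('LogEvent', 'TestConsumer', 10);
  select pgq.failed_event_delete('LogEvent', 'TestConsumer', 11);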
== Info operations ==
pgq.get_queue_info()
Get list of queues.
Result: (queue_name, queue_ntables, queue_cur_table, queue_rotation_period, queue_switch_time, queue_external_ticker, queue_ticker_max_count, queue_ticker_max_lag, queue_ticker_idle_period, ticker_lag)
pgq.get_consumer_info()
pgq.get_consumer_info(queue_name)
pgq.get_consumer_info(queue_name, consumer)
Get list of active consumers.
Result: (queue_name, consumer_name, lag, last_seen, last_tick, current_batch, next_tick)
pgq.get_batch_info(batch_id)
Get info about batch.
Result fields: (queue_name, consumer_name, batch_start, batch_end, prev_tick_id, tick_id, lag)
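These functions are convenient for monitoring; for example, consumer lag can be
watched with a query like:

  -- per-consumer lag; a steadily growing lag usually means
  -- the consumer is down or cannot keep up
  select queue_name, consumer_name, lag, last_seen
    from pgq.get_consumer_info();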
== Notes ==
The consumer *must* be able to process the same event several times.
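One way to get such idempotence is to track handled event ids in a
consumer-side table (the table name is illustrative; INSERT ... ON CONFLICT
needs PostgreSQL 9.5+):

  create table if not exists handled_events (ev_id int8 primary key);

  -- during processing: do the work only if the event is new
  insert into handled_events (ev_id)
  values (42)  -- ev_id of the event being processed
  on conflict (ev_id) do nothing;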
== Example ==
First, create event queue:
select pgq.create_queue('LogEvent');
Then the producer side can insert events whenever it wishes:
select pgq.insert_event('LogEvent', 'data', 'DataFor123');
First step for consumer is to register:
select pgq.register_consumer('LogEvent', 'TestConsumer');
Then it can enter into consuming loop:
begin;
select pgq.next_batch('LogEvent', 'TestConsumer'); [into batch_id]
commit;
That will reserve a batch of events for this consumer.
To see the events in batch:
select * from pgq.get_batch_events(batch_id);
That will return all events in the batch. The processing does not need to happen
all in one transaction; the framework can split the work as it wants.
If an event failed or needs to be tried again, the framework can call:
select pgq.event_retry(batch_id, event_id, 60);
select pgq.event_failed(batch_id, event_id, 'Record deleted');
When all is done, notify the database about it:
select pgq.finish_batch(batch_id);