forked from pgq/skytools-legacy
/
pgq.batch_event_sql.sql
133 lines (128 loc) · 5.19 KB
/
pgq.batch_event_sql.sql
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
create or replace function pgq.batch_event_sql(x_batch_id bigint)
returns text as $$
-- ----------------------------------------------------------------------
-- Function: pgq.batch_event_sql(1)
--
--      Builds the text of a SELECT statement that fetches the events
--      belonging to the given batch.  The statement is only generated
--      here; this function does not execute it.
--
-- Parameters:
--      x_batch_id      - ID of an active batch.
--
-- Returns:
--      SQL statement as text.
--
-- Errors:
--      'batch not found' when no subscription row with both of its
--      ticks matches x_batch_id; 'could not construct sql for batch %'
--      when pgq.batch_event_tables() yields no tables.
-- ----------------------------------------------------------------------
-- ----------------------------------------------------------------------
-- Algorithm description:
--      A batch lies between two tick snapshots: sn1 (previous tick,
--      with xmin1/xmax1) and sn2 (current tick, with xmin2/xmax2).
--      The generated query must pick exactly the events whose txid is
--      invisible in sn1 but visible in sn2.
--
--      The simplest filter would be
--      >   WHERE ev_txid >= xmin1 AND ev_txid <= xmax2
--      >     AND NOT txid_visible_in_snapshot(ev_txid, sn1)
--      >     AND txid_visible_in_snapshot(ev_txid, sn2)
--
--      That breaks down when a long transaction keeps xmin1 very low:
--      every batch created while it is running would have to scan all
--      events in that wide range.  Two optimizations avoid this:
--
--      1)  Range-scan only [xmax1..xmax2], i.e. the txids that actually
--          appeared between the two snapshots.  Transactions from
--          [xmin1..xmax1] that were committed between the snapshots
--          are searched for by exact value with an IN (..) list.
--
--      2)  Most transactions are short, so many of them tend to sit
--          just below xmax1 while being committed before xmax2.  When
--          such txids are close to the range start, the range is
--          lowered to include them, decreasing the size of the
--          IN (..) list.
-- ----------------------------------------------------------------------
declare
    info            record;         -- subscription row plus data of both ticks
    tx_row          record;
    tbl_row         record;
    range_start     bigint;         -- lower bound of the txid range scan
    older_txids     text := '';     -- comma-separated txids for the IN (..) list
    field_list      text;
    owner_filter    text;
    recent_where    text;
    older_where     text;
    qualified_tbl   text;
    tbl_sql         text;
    full_sql        text := '';
begin
    -- look up the subscription that owns the batch, with both of its ticks
    select s.sub_last_tick, s.sub_next_tick, s.sub_id, s.sub_queue,
           txid_snapshot_xmax(prev_tick.tick_snapshot) as tx_start,
           txid_snapshot_xmax(next_tick.tick_snapshot) as tx_end,
           prev_tick.tick_snapshot as last_snapshot,
           next_tick.tick_snapshot as cur_snapshot
      into info
      from pgq.subscription s
      join pgq.tick prev_tick
        on (prev_tick.tick_queue = s.sub_queue
            and prev_tick.tick_id = s.sub_last_tick)
      join pgq.tick next_tick
        on (next_tick.tick_queue = s.sub_queue
            and next_tick.tick_id = s.sub_next_tick)
     where s.sub_batch = x_batch_id;
    if not found then
        raise exception 'batch not found';
    end if;

    -- Walk, from highest txid downwards, over transactions that were
    -- still in progress in the previous snapshot but are no longer in
    -- progress in the current one.
    range_start := info.tx_start;
    for tx_row in
        select prev_xip.txid
          from txid_snapshot_xip(info.last_snapshot) as prev_xip(txid)
         where not exists (
                   select 1
                     from txid_snapshot_xip(info.cur_snapshot) as cur_xip(txid)
                    where cur_xip.txid = prev_xip.txid)
         order by prev_xip.txid desc
    loop
        if tx_row.txid >= range_start - 100 then
            -- close enough to the range: extend the range downwards
            -- instead of growing the IN (..) list
            range_start := tx_row.txid;
        else
            older_txids := older_txids
                || case when older_txids = '' then '' else ',' end
                || tx_row.txid::text;
        end if;
    end loop;

    -- column list must match pgq.event_template
    field_list := 'select ev_id, ev_time, ev_txid, ev_retry, ev_type,'
        || ' ev_data, ev_extra1, ev_extra2, ev_extra3, ev_extra4';
    owner_filter := ' and (ev_owner is null or ev_owner = '
        || info.sub_id::text || ')';

    -- The WHERE clauses do not depend on the event table, so they are
    -- assembled once here and reused for every table below.

    -- newer events: txid range scan plus snapshot visibility checks
    recent_where := ' where cur.tick_id = ' || info.sub_next_tick::text
        || ' and cur.tick_queue = ' || info.sub_queue::text
        || ' and last.tick_id = ' || info.sub_last_tick::text
        || ' and last.tick_queue = ' || info.sub_queue::text
        || ' and ev.ev_txid >= ' || range_start::text
        || ' and ev.ev_txid <= ' || info.tx_end::text
        || ' and txid_visible_in_snapshot(ev.ev_txid, cur.tick_snapshot)'
        || ' and not txid_visible_in_snapshot(ev.ev_txid, last.tick_snapshot)'
        || owner_filter;

    -- older events: exact txids that were ongoing at the previous snapshot
    older_where := ' where ev.ev_txid in (' || older_txids || ')'
        || owner_filter;

    -- one sub-select (or two, with older txids) per potential event table
    for tbl_row in
        select xtbl from pgq.batch_event_tables(x_batch_id) xtbl
    loop
        qualified_tbl := pgq.quote_fqname(tbl_row.xtbl);

        tbl_sql := field_list
            || ' from pgq.tick cur, pgq.tick last, ' || qualified_tbl || ' ev '
            || recent_where;

        if older_txids <> '' then
            tbl_sql := tbl_sql || ' union all '
                || field_list || ' from ' || qualified_tbl || ' ev '
                || older_where;
        end if;

        if full_sql <> '' then
            full_sql := full_sql || ' union all ';
        end if;
        full_sql := full_sql || tbl_sql;
    end loop;

    if full_sql = '' then
        raise exception 'could not construct sql for batch %', x_batch_id;
    end if;

    return full_sql || ' order by 1';
end;
$$ language plpgsql; -- no perms needed