/
get-category-messages.sql
133 lines (116 loc) · 4.59 KB
/
get-category-messages.sql
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
CREATE OR REPLACE FUNCTION message_store.get_category_messages(
category varchar,
"position" bigint DEFAULT 1,
batch_size bigint DEFAULT 1000,
correlation varchar DEFAULT NULL,
consumer_group_member bigint DEFAULT NULL,
consumer_group_size bigint DEFAULT NULL,
condition varchar DEFAULT NULL
)
RETURNS SETOF message_store.message
AS $$
DECLARE
_command text;
BEGIN
IF NOT is_category(get_category_messages.category) THEN
RAISE EXCEPTION
'Must be a category: %',
get_category_messages.category;
END IF;
position := COALESCE(position, 1);
batch_size := COALESCE(batch_size, 1000);
_command := '
SELECT
id::varchar,
stream_name::varchar,
type::varchar,
position::bigint,
global_position::bigint,
data::varchar,
metadata::varchar,
time::timestamp
FROM
messages
WHERE
category(stream_name) = $1 AND
global_position >= $2';
IF get_category_messages.correlation IS NOT NULL THEN
IF position('-' IN get_category_messages.correlation) > 0 THEN
RAISE EXCEPTION
'Correlation must be a category (Correlation: %)',
get_category_messages.correlation;
END IF;
_command := _command || ' AND
category(metadata->>''correlationStreamName'') = $4';
END IF;
IF (get_category_messages.consumer_group_member IS NOT NULL AND
get_category_messages.consumer_group_size IS NULL) OR
(get_category_messages.consumer_group_member IS NULL AND
get_category_messages.consumer_group_size IS NOT NULL) THEN
RAISE EXCEPTION
'Consumer group member and size must be specified (Consumer Group Member: %, Consumer Group Size: %)',
get_category_messages.consumer_group_member,
get_category_messages.consumer_group_size;
END IF;
IF get_category_messages.consumer_group_member IS NOT NULL AND
get_category_messages.consumer_group_size IS NOT NULL THEN
IF get_category_messages.consumer_group_size < 1 THEN
RAISE EXCEPTION
'Consumer group size must not be less than 1 (Consumer Group Member: %, Consumer Group Size: %)',
get_category_messages.consumer_group_member,
get_category_messages.consumer_group_size;
END IF;
IF get_category_messages.consumer_group_member < 0 THEN
RAISE EXCEPTION
'Consumer group member must not be less than 0 (Consumer Group Member: %, Consumer Group Size: %)',
get_category_messages.consumer_group_member,
get_category_messages.consumer_group_size;
END IF;
IF get_category_messages.consumer_group_member >= get_category_messages.consumer_group_size THEN
RAISE EXCEPTION
'Consumer group member must be less than the group size (Consumer Group Member: %, Consumer Group Size: %)',
get_category_messages.consumer_group_member,
get_category_messages.consumer_group_size;
END IF;
_command := _command || ' AND
MOD(@hash_64(cardinal_id(stream_name)), $6) = $5';
END IF;
IF get_category_messages.condition IS NOT NULL THEN
IF current_setting('message_store.sql_condition', true) IS NULL OR
current_setting('message_store.sql_condition', true) = 'off' THEN
RAISE EXCEPTION
'Retrieval with SQL condition is not activated';
END IF;
_command := _command || ' AND
(%s)';
_command := format(_command, get_category_messages.condition);
END IF;
_command := _command || '
ORDER BY
global_position ASC';
IF get_category_messages.batch_size != -1 THEN
_command := _command || '
LIMIT
$3';
END IF;
IF current_setting('message_store.debug_get', true) = 'on' OR current_setting('message_store.debug', true) = 'on' THEN
RAISE NOTICE '» get_category_messages';
RAISE NOTICE 'category ($1): %', get_category_messages.category;
RAISE NOTICE 'position ($2): %', get_category_messages.position;
RAISE NOTICE 'batch_size ($3): %', get_category_messages.batch_size;
RAISE NOTICE 'correlation ($4): %', get_category_messages.correlation;
RAISE NOTICE 'consumer_group_member ($5): %', get_category_messages.consumer_group_member;
RAISE NOTICE 'consumer_group_size ($6): %', get_category_messages.consumer_group_size;
RAISE NOTICE 'condition: %', get_category_messages.condition;
RAISE NOTICE 'Generated Command: %', _command;
END IF;
RETURN QUERY EXECUTE _command USING
get_category_messages.category,
get_category_messages.position,
get_category_messages.batch_size,
get_category_messages.correlation,
get_category_messages.consumer_group_member,
get_category_messages.consumer_group_size::smallint;
END;
$$ LANGUAGE plpgsql
VOLATILE;