-- Branch: master
-- Find file Copy path
-- Fetching contributors…
-- Cannot retrieve contributors at this time
-- 76 lines (67 sloc) 2.4 KB
-- Recreate the rabbitmq schema from scratch. CASCADE also drops every object
-- that lives in (or depends on) the schema, so re-running this script is
-- destructive for anything previously created there.
drop schema if exists rabbitmq cascade;
create schema rabbitmq;
-- Grant USAGE on the schema to PUBLIC, i.e. to every role.
grant usage on schema rabbitmq to public;
-- Publishes a message on a PostgreSQL notification channel via pg_notify.
--
-- Parameters:
--   channel     - notification channel name passed to pg_notify
--   routing_key - routing key placed in front of the message
--   message     - message body
--
-- The notification payload has the form '<routing_key>|<message>'.
-- If routing_key or message is null, the concatenation (and therefore the
-- payload expression) evaluates to null.
-- NOTE(review): presumably an external listener forwards these notifications
-- to RabbitMQ and splits the payload on the first '|' - confirm against the
-- consumer.
--
-- Declared volatile because sending a notification is a side effect; a
-- function with side effects must not be labelled stable, otherwise the
-- planner is allowed to optimize calls away.
create or replace function rabbitmq.send_message(
  channel text,
  routing_key text,
  message text) returns void as $$
  select pg_notify(
    channel,
    routing_key || '|' || message
  );
$$ volatile language sql;
-- Trigger function that publishes the changed row as JSON on the 'events'
-- channel through rabbitmq.send_message.
--
-- Routing key format:
--   row_change.table-<table>.event-<TG_OP>.user-<request.user_id()>
--
-- Optional trigger argument (exactly one, a JSON object in text form):
--   {"include": ["col", ...]}  - send only the listed columns
--   {"exclude": ["col", ...]}  - send every column except the listed ones
-- When both keys are present, 'exclude' wins because it is applied last.
-- Without an argument the complete row is sent.
--
-- Always returns null.
-- NOTE(review): presumably attached as an AFTER ... FOR EACH ROW trigger,
-- where the return value is ignored; in a BEFORE row trigger returning null
-- would skip the operation - confirm at the CREATE TRIGGER sites.
--
-- Declared volatile because it emits a notification (a side effect).
create or replace function rabbitmq.on_row_change() returns trigger as $$
declare
  routing_key text;
  row_data jsonb;
  config jsonb;
  -- Starts empty (not null): a config object with neither 'include' nor
  -- 'exclude' must leave the row untouched. With a null array the jsonb
  -- subtraction below would yield null and FOREACH would raise an error.
  excluded_columns text[] := '{}';
  col text;
begin
  -- we use the '.user-10.' part of the routing key to select which user should be able to receive these events.
  -- NOTE(review): if request.user_id() returns null the whole routing key
  -- (and with it the payload) becomes null - confirm this cannot happen for
  -- the sessions that fire this trigger.
  routing_key := 'row_change'
    || '.table-' || TG_TABLE_NAME::text
    || '.event-' || TG_OP::text
    || '.user-' || request.user_id()::text;

  -- DELETE only has the old row; INSERT and UPDATE publish the new row.
  -- Any other operation leaves row_data null, as before.
  if TG_OP = 'DELETE' then
    row_data := row_to_json(old)::jsonb;
  elsif TG_OP in ('INSERT', 'UPDATE') then
    row_data := row_to_json(new)::jsonb;
  end if;

  -- decide what row columns to send based on the config parameter
  -- there is a 8000 byte hard limit on the payload size so sending many big columns is not possible
  if TG_NARGS = 1 then
    config := TG_ARGV[0]::jsonb;

    if config ? 'include' then
      -- excluded columns = all columns of the row minus the included ones
      excluded_columns := array(
        select all_keys.key_name
        from jsonb_object_keys(row_data) as all_keys(key_name)
        except
        select included.key_name
        from jsonb_array_elements_text(config -> 'include') as included(key_name)
      );
    end if;

    if config ? 'exclude' then
      excluded_columns := array(
        select jsonb_array_elements_text(config -> 'exclude')
      );
    end if;

    -- jsonb - text[] only exists from PostgreSQL 10 on; older servers have to
    -- remove the keys one at a time with jsonb - text.
    if current_setting('server_version_num')::int >= 100000 then
      row_data := row_data - excluded_columns;
    else
      foreach col in array excluded_columns
      loop
        row_data := row_data - col;
      end loop;
    end if;
  end if;

  perform rabbitmq.send_message('events', routing_key, row_data::text);
  return null;
end;
$$ volatile language plpgsql;