Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Upgrade script for Londiste - 2.1 to 3.1

  • Loading branch information...
commit 837d18f198b81f0d7e9ab70dae092b02dbefc5ac 1 parent ea471f9
@markokr authored
View
1  setup_skytools.py
@@ -64,6 +64,7 @@
'sql/pgq_coop/pgq_coop.upgrade.sql',
'sql/pgq_ext/pgq_ext.upgrade.sql',
'upgrade/final/pgq.upgrade_2.1_to_3.0.sql',
+ 'upgrade/final/londiste.upgrade_2.1_to_3.1.sql',
]
# sql files for special occasions
View
18 upgrade/Makefile
@@ -2,6 +2,7 @@
#SQLS = v2.1.5_londiste.sql v2.1.5_pgq_core.sql v2.1.5_pgq_ext.sql
#SQLS = v2.1.6_londiste.sql v2.1.6_pgq_ext.sql
#SQLS = v3.0_pgq_core.sql
+#SQLS = londiste.upgrade_2.1_to_3.1.sql
SRCS = $(addprefix src/, $(SQLS))
DSTS = $(addprefix final/, $(SQLS))
@@ -13,3 +14,20 @@ all: $(DSTS)
final/%.sql: src/%.sql
$(CATSQL) $< > $@
+final/londiste.upgrade_2.1_to_3.1.sql: src/londiste.2to3.sql ../sql/londiste/londiste.sql
+ echo "begin;" > $@
+ cat src/londiste.2to3.sql >> $@
+ grep -v 'create schema' ../sql/londiste/londiste.sql >> $@
+ echo "commit;" >> $@
+
+PSQL = psql -q
+
+ltest: ../sql/pgq_node/pgq_node.sql
+ $(PSQL) -d postgres -c "drop database if exists londiste_upgrade_test"
+ $(PSQL) -d postgres -c "create database londiste_upgrade_test"
+ $(PSQL) -d londiste_upgrade_test -f final/pgq_core_2.1.13.sql
+ $(PSQL) -d londiste_upgrade_test -f final/londiste.2.1.12.sql
+ $(PSQL) -d londiste_upgrade_test -f final/pgq.upgrade_2.1_to_3.0.sql
+ $(PSQL) -d londiste_upgrade_test -f ../sql/pgq_node/pgq_node.sql
+ $(PSQL) -d londiste_upgrade_test -f final/londiste.upgrade_2.1_to_3.1.sql
+
View
1,091 upgrade/final/londiste.2.1.12.sql
@@ -0,0 +1,1091 @@
+set default_with_oids = 'off';
+
+create schema londiste;
+
+create table londiste.provider_table (
+ nr serial not null,
+ queue_name text not null,
+ table_name text not null,
+ trigger_name text,
+ primary key (queue_name, table_name)
+);
+
+create table londiste.provider_seq (
+ nr serial not null,
+ queue_name text not null,
+ seq_name text not null,
+ primary key (queue_name, seq_name)
+);
+
+create table londiste.completed (
+ consumer_id text not null,
+ last_tick_id bigint not null,
+
+ primary key (consumer_id)
+);
+
+create table londiste.link (
+ source text not null,
+ dest text not null,
+ primary key (source),
+ unique (dest)
+);
+
+create table londiste.subscriber_table (
+ nr serial not null,
+ queue_name text not null,
+ table_name text not null,
+ snapshot text,
+ merge_state text,
+ trigger_name text,
+
+ skip_truncate bool,
+
+ primary key (queue_name, table_name)
+);
+
+create table londiste.subscriber_seq (
+ nr serial not null,
+ queue_name text not null,
+ seq_name text not null,
+
+ primary key (queue_name, seq_name)
+);
+
+create table londiste.subscriber_pending_fkeys (
+ from_table text not null,
+ to_table text not null,
+ fkey_name text not null,
+ fkey_def text not null,
+
+ primary key (from_table, fkey_name)
+);
+
+create table londiste.subscriber_pending_triggers (
+ table_name text not null,
+ trigger_name text not null,
+ trigger_def text not null,
+
+ primary key (table_name, trigger_name)
+);
+
+grant usage on schema londiste to public;
+grant select on londiste.provider_table to public;
+grant select on londiste.completed to public;
+grant select on londiste.link to public;
+grant select on londiste.subscriber_table to public;
+
+
+create type londiste.ret_provider_table_list as (
+ table_name text,
+ trigger_name text
+);
+
+create type londiste.ret_subscriber_table as (
+ table_name text,
+ merge_state text,
+ snapshot text,
+ trigger_name text,
+ skip_truncate bool
+);
+
+create or replace function londiste.find_column_types(tbl text)
+returns text as $$
+declare
+ res text;
+ col record;
+ tbl_oid oid;
+begin
+ tbl_oid := londiste.find_table_oid(tbl);
+ res := '';
+ for col in
+ SELECT CASE WHEN k.attname IS NOT NULL THEN 'k' ELSE 'v' END AS type
+ FROM pg_attribute a LEFT JOIN (
+ SELECT k.attname FROM pg_index i, pg_attribute k
+ WHERE i.indrelid = tbl_oid AND k.attrelid = i.indexrelid
+ AND i.indisprimary AND k.attnum > 0 AND NOT k.attisdropped
+ ) k ON (k.attname = a.attname)
+ WHERE a.attrelid = tbl_oid AND a.attnum > 0 AND NOT a.attisdropped
+ ORDER BY a.attnum
+ loop
+ res := res || col.type;
+ end loop;
+
+ return res;
+end;
+$$ language plpgsql strict stable;
+
+
+create or replace function londiste.find_table_fkeys(i_table_name text)
+returns setof londiste.subscriber_pending_fkeys as $$
+declare
+ fkey record;
+ tbl_oid oid;
+begin
+ select londiste.find_table_oid(i_table_name) into tbl_oid;
+
+ for fkey in
+ select n1.nspname || '.' || t1.relname as from_table, n2.nspname || '.' || t2.relname as to_table,
+ conname::text as fkey_name,
+ 'alter table only ' || quote_ident(n1.nspname) || '.' || quote_ident(t1.relname)
+ || ' add constraint ' || quote_ident(conname::text) || ' ' || pg_get_constraintdef(c.oid)
+ as fkey_def
+ from pg_constraint c, pg_namespace n1, pg_class t1, pg_namespace n2, pg_class t2
+ where c.contype = 'f' and (c.conrelid = tbl_oid or c.confrelid = tbl_oid)
+ and t1.oid = c.conrelid and n1.oid = t1.relnamespace
+ and t2.oid = c.confrelid and n2.oid = t2.relnamespace
+ order by 1,2,3
+ loop
+ return next fkey;
+ end loop;
+
+ return;
+end;
+$$ language plpgsql strict stable;
+
+
+create or replace function londiste.find_rel_oid(tbl text, kind text)
+returns oid as $$
+declare
+ res oid;
+ pos integer;
+ schema text;
+ name text;
+begin
+ pos := position('.' in tbl);
+ if pos > 0 then
+ schema := substring(tbl for pos - 1);
+ name := substring(tbl from pos + 1);
+ else
+ schema := 'public';
+ name := tbl;
+ end if;
+ select c.oid into res
+ from pg_namespace n, pg_class c
+ where c.relnamespace = n.oid
+ and c.relkind = kind
+ and n.nspname = schema and c.relname = name;
+ if not found then
+ if kind = 'r' then
+ raise exception 'table not found';
+ elsif kind = 'S' then
+ raise exception 'seq not found';
+ else
+ raise exception 'weird relkind';
+ end if;
+ end if;
+
+ return res;
+end;
+$$ language plpgsql strict stable;
+
+create or replace function londiste.find_table_oid(tbl text)
+returns oid as $$
+begin
+ return londiste.find_rel_oid(tbl, 'r');
+end;
+$$ language plpgsql strict stable;
+
+create or replace function londiste.find_seq_oid(tbl text)
+returns oid as $$
+begin
+ return londiste.find_rel_oid(tbl, 'S');
+end;
+$$ language plpgsql strict stable;
+
+
+create or replace function londiste.find_table_triggers(i_table_name text)
+returns setof londiste.subscriber_pending_triggers as $$
+declare
+ tg record;
+ ver int4;
+begin
+ select setting::int4 into ver from pg_settings
+ where name = 'server_version_num';
+
+ if ver >= 90000 then
+ for tg in
+ select n.nspname || '.' || c.relname as table_name, t.tgname::text as name, pg_get_triggerdef(t.oid) as def
+ from pg_trigger t, pg_class c, pg_namespace n
+ where n.oid = c.relnamespace and c.oid = t.tgrelid
+ and t.tgrelid = londiste.find_table_oid(i_table_name)
+ and not t.tgisinternal
+ loop
+ return next tg;
+ end loop;
+ else
+ for tg in
+ select n.nspname || '.' || c.relname as table_name, t.tgname::text as name, pg_get_triggerdef(t.oid) as def
+ from pg_trigger t, pg_class c, pg_namespace n
+ where n.oid = c.relnamespace and c.oid = t.tgrelid
+ and t.tgrelid = londiste.find_table_oid(i_table_name)
+ and not t.tgisconstraint
+ loop
+ return next tg;
+ end loop;
+ end if;
+
+ return;
+end;
+$$ language plpgsql strict stable;
+
+create or replace function londiste.get_last_tick(i_consumer text)
+returns bigint as $$
+declare
+ res bigint;
+begin
+ select last_tick_id into res
+ from londiste.completed
+ where consumer_id = i_consumer;
+ return res;
+end;
+$$ language plpgsql security definer strict stable;
+
+
+create or replace function londiste.link_source(i_dst_name text)
+returns text as $$
+declare
+ res text;
+begin
+ select source into res from londiste.link
+ where dest = i_dst_name;
+ return res;
+end;
+$$ language plpgsql security definer;
+
+create or replace function londiste.link_dest(i_source_name text)
+returns text as $$
+declare
+ res text;
+begin
+ select dest into res from londiste.link
+ where source = i_source_name;
+ return res;
+end;
+$$ language plpgsql security definer;
+
+create or replace function londiste.cmp_list(list1 text, a_queue text, a_table text, a_field text)
+returns boolean as $$
+declare
+ sql text;
+ tmp record;
+ list2 text;
+begin
+ sql := 'select ' || quote_ident(a_field) || ' as name from ' || londiste.quote_fqname(a_table)
+ || ' where queue_name = ' || quote_literal(a_queue)
+ || ' order by 1';
+ list2 := '';
+ for tmp in execute sql loop
+ if list2 = '' then
+ list2 := tmp.name;
+ else
+ list2 := list2 || ',' || tmp.name;
+ end if;
+ end loop;
+ return list1 = list2;
+end;
+$$ language plpgsql security definer;
+
+create or replace function londiste.link(i_source_name text, i_dest_name text, prov_tick_id bigint, prov_tbl_list text, prov_seq_list text)
+returns text as $$
+declare
+ tmp text;
+ list text;
+ tick_seq text;
+ external boolean;
+ last_tick bigint;
+begin
+ -- check if all matches
+ if not londiste.cmp_list(prov_tbl_list, i_source_name,
+ 'londiste.subscriber_table', 'table_name')
+ then
+ raise exception 'not all tables copied into subscriber';
+ end if;
+ if not londiste.cmp_list(prov_seq_list, i_source_name,
+ 'londiste.subscriber_seq', 'seq_name')
+ then
+ raise exception 'not all seqs copied into subscriber';
+ end if;
+ if not londiste.cmp_list(prov_seq_list, i_dest_name,
+ 'londiste.provider_table', 'table_name')
+ then
+ raise exception 'linked provider queue does not have all tables';
+ end if;
+ if not londiste.cmp_list(prov_seq_list, i_dest_name,
+ 'londiste.provider_seq', 'seq_name')
+ then
+ raise exception 'linked provider queue does not have all seqs';
+ end if;
+
+ -- check pgq
+ select queue_external_ticker, queue_tick_seq into external, tick_seq
+ from pgq.queue where queue_name = i_dest_name;
+ if not found then
+ raise exception 'dest queue does not exist';
+ end if;
+ if external then
+ raise exception 'dest queue has already external_ticker turned on?';
+ end if;
+
+ if nextval(tick_seq) >= prov_tick_id then
+ raise exception 'dest queue ticks larger';
+ end if;
+
+ update pgq.queue set queue_external_ticker = true
+ where queue_name = i_dest_name;
+
+ insert into londiste.link (source, dest) values (i_source_name, i_dest_name);
+
+ return null;
+end;
+$$ language plpgsql security definer;
+
+create or replace function londiste.link_del(i_source_name text, i_dest_name text)
+returns text as $$
+begin
+ delete from londiste.link
+ where source = i_source_name
+ and dest = i_dest_name;
+ if not found then
+ raise exception 'no suck link';
+ end if;
+ return null;
+end;
+$$ language plpgsql security definer;
+
+
+create or replace function londiste.provider_add_seq(
+ i_queue_name text, i_seq_name text)
+returns integer as $$
+declare
+ link text;
+begin
+ -- check if linked queue
+ link := londiste.link_source(i_queue_name);
+ if link is not null then
+ raise exception 'Linked queue, cannot modify';
+ end if;
+
+ perform 1 from pg_class
+ where oid = londiste.find_seq_oid(i_seq_name);
+ if not found then
+ raise exception 'seq not found';
+ end if;
+
+ insert into londiste.provider_seq (queue_name, seq_name)
+ values (i_queue_name, i_seq_name);
+
+ return 0;
+end;
+$$ language plpgsql security definer;
+
+create or replace function londiste.provider_add_table(
+ i_queue_name text,
+ i_table_name text,
+ i_col_types text
+) returns integer strict as $$
+declare
+ tgname text;
+ sql text;
+begin
+ if londiste.link_source(i_queue_name) is not null then
+ raise exception 'Linked queue, manipulation not allowed';
+ end if;
+
+ if position('k' in i_col_types) < 1 then
+ raise exception 'need key column';
+ end if;
+ if position('.' in i_table_name) < 1 then
+ raise exception 'need fully-qualified table name';
+ end if;
+ select queue_name into tgname
+ from pgq.queue where queue_name = i_queue_name;
+ if not found then
+ raise exception 'no such event queue';
+ end if;
+
+ tgname := i_queue_name || '_logger';
+ tgname := replace(lower(tgname), '.', '_');
+ insert into londiste.provider_table
+ (queue_name, table_name, trigger_name)
+ values (i_queue_name, i_table_name, tgname);
+
+ perform londiste.provider_create_trigger(
+ i_queue_name, i_table_name, i_col_types);
+
+ return 1;
+end;
+$$ language plpgsql security definer;
+
+create or replace function londiste.provider_add_table(
+ i_queue_name text,
+ i_table_name text
+) returns integer as $$
+begin
+ return londiste.provider_add_table(i_queue_name, i_table_name,
+ londiste.find_column_types(i_table_name));
+end;
+$$ language plpgsql security definer;
+
+
+create or replace function londiste.provider_create_trigger(
+ i_queue_name text,
+ i_table_name text,
+ i_col_types text
+) returns integer strict as $$
+declare
+ tgname text;
+begin
+ select trigger_name into tgname
+ from londiste.provider_table
+ where queue_name = i_queue_name
+ and table_name = i_table_name;
+ if not found then
+ raise exception 'table not found';
+ end if;
+
+ execute 'create trigger ' || quote_ident(tgname)
+ || ' after insert or update or delete on '
+ || londiste.quote_fqname(i_table_name)
+ || ' for each row execute procedure pgq.logtriga('
+ || quote_literal(i_queue_name) || ', '
+ || quote_literal(i_col_types) || ', '
+ || quote_literal(i_table_name) || ')';
+
+ return 1;
+end;
+$$ language plpgsql security definer;
+
+
+create or replace function londiste.provider_get_seq_list(i_queue_name text)
+returns setof text as $$
+declare
+ rec record;
+begin
+ for rec in
+ select seq_name from londiste.provider_seq
+ where queue_name = i_queue_name
+ order by nr
+ loop
+ return next rec.seq_name;
+ end loop;
+ return;
+end;
+$$ language plpgsql security definer;
+
+
+create or replace function londiste.provider_get_table_list(i_queue text)
+returns setof londiste.ret_provider_table_list as $$
+declare
+ rec londiste.ret_provider_table_list%rowtype;
+begin
+ for rec in
+ select table_name, trigger_name
+ from londiste.provider_table
+ where queue_name = i_queue
+ order by nr
+ loop
+ return next rec;
+ end loop;
+ return;
+end;
+$$ language plpgsql security definer;
+
+
+create or replace function londiste.provider_notify_change(i_queue_name text)
+returns integer as $$
+declare
+ res text;
+ tbl record;
+begin
+ res := '';
+ for tbl in
+ select table_name from londiste.provider_table
+ where queue_name = i_queue_name
+ order by nr
+ loop
+ if res = '' then
+ res := tbl.table_name;
+ else
+ res := res || ',' || tbl.table_name;
+ end if;
+ end loop;
+
+ perform pgq.insert_event(i_queue_name, 'T', res);
+
+ return 1;
+end;
+$$ language plpgsql security definer;
+
+
+create or replace function londiste.provider_refresh_trigger(
+ i_queue_name text,
+ i_table_name text,
+ i_col_types text
+) returns integer strict as $$
+declare
+ t_name text;
+ tbl_oid oid;
+begin
+ select trigger_name into t_name
+ from londiste.provider_table
+ where queue_name = i_queue_name
+ and table_name = i_table_name;
+ if not found then
+ raise exception 'table not found';
+ end if;
+
+ tbl_oid := londiste.find_table_oid(i_table_name);
+ perform 1 from pg_trigger
+ where tgrelid = tbl_oid
+ and tgname = t_name;
+ if found then
+ execute 'drop trigger ' || quote_ident(t_name)
+ || ' on ' || londiste.quote_fqname(i_table_name);
+ end if;
+
+ perform londiste.provider_create_trigger(i_queue_name, i_table_name, i_col_types);
+
+ return 1;
+end;
+$$ language plpgsql security definer;
+
+create or replace function londiste.provider_refresh_trigger(
+ i_queue_name text,
+ i_table_name text
+) returns integer strict as $$
+begin
+ return londiste.provider_refresh_trigger(i_queue_name, i_table_name,
+ londiste.find_column_types(i_table_name));
+end;
+$$ language plpgsql security definer;
+
+
+
+
+create or replace function londiste.provider_remove_seq(
+ i_queue_name text, i_seq_name text)
+returns integer as $$
+declare
+ link text;
+begin
+ -- check if linked queue
+ link := londiste.link_source(i_queue_name);
+ if link is not null then
+ raise exception 'Linked queue, cannot modify';
+ end if;
+
+ delete from londiste.provider_seq
+ where queue_name = i_queue_name
+ and seq_name = i_seq_name;
+ if not found then
+ raise exception 'seq not attached';
+ end if;
+
+ perform londiste.provider_notify_change(i_queue_name);
+
+ return 0;
+end;
+$$ language plpgsql security definer;
+
+
+create or replace function londiste.provider_remove_table(
+ i_queue_name text,
+ i_table_name text
+) returns integer as $$
+declare
+ tgname text;
+begin
+ if londiste.link_source(i_queue_name) is not null then
+ raise exception 'Linked queue, manipulation not allowed';
+ end if;
+
+ select trigger_name into tgname from londiste.provider_table
+ where queue_name = i_queue_name
+ and table_name = i_table_name;
+ if not found then
+ raise exception 'no such table registered';
+ end if;
+
+ begin
+ execute 'drop trigger ' || quote_ident(tgname) || ' on ' || londiste.quote_fqname(i_table_name);
+ exception
+ when undefined_table then
+ raise notice 'table % does not exist', i_table_name;
+ when undefined_object then
+ raise notice 'trigger % does not exist on table %', tgname, i_table_name;
+ end;
+
+ delete from londiste.provider_table
+ where queue_name = i_queue_name
+ and table_name = i_table_name;
+
+ return 1;
+end;
+$$ language plpgsql security definer;
+
+
+
+create or replace function londiste.quote_fqname(i_name text)
+returns text as $$
+declare
+ res text;
+ pos integer;
+ s text;
+ n text;
+begin
+ pos := position('.' in i_name);
+ if pos > 0 then
+ s := substring(i_name for pos - 1);
+ n := substring(i_name from pos + 1);
+ else
+ s := 'public';
+ n := i_name;
+ end if;
+ return quote_ident(s) || '.' || quote_ident(n);
+end;
+$$ language plpgsql strict immutable;
+
+
+create or replace function londiste.set_last_tick(
+ i_consumer text,
+ i_tick_id bigint)
+returns integer as $$
+begin
+ if i_tick_id is null then
+ delete from londiste.completed
+ where consumer_id = i_consumer;
+ else
+ update londiste.completed
+ set last_tick_id = i_tick_id
+ where consumer_id = i_consumer;
+ if not found then
+ insert into londiste.completed (consumer_id, last_tick_id)
+ values (i_consumer, i_tick_id);
+ end if;
+ end if;
+
+ return 1;
+end;
+$$ language plpgsql security definer;
+
+
+create or replace function londiste.subscriber_add_seq(
+ i_queue_name text, i_seq_name text)
+returns integer as $$
+declare
+ link text;
+begin
+ insert into londiste.subscriber_seq (queue_name, seq_name)
+ values (i_queue_name, i_seq_name);
+
+ -- update linked queue if needed
+ link := londiste.link_dest(i_queue_name);
+ if link is not null then
+ insert into londiste.provider_seq
+ (queue_name, seq_name)
+ values (link, i_seq_name);
+ perform londiste.provider_notify_change(link);
+ end if;
+
+ return 0;
+end;
+$$ language plpgsql security definer;
+
+
+create or replace function londiste.subscriber_add_table(
+ i_queue_name text, i_table text)
+returns integer as $$
+begin
+ insert into londiste.subscriber_table (queue_name, table_name)
+ values (i_queue_name, i_table);
+
+ -- linked queue is updated, when the table is copied
+
+ return 0;
+end;
+$$ language plpgsql security definer;
+
+
+
+create or replace function londiste.subscriber_get_table_pending_fkeys(i_table_name text)
+returns setof londiste.subscriber_pending_fkeys as $$
+declare
+ fkeys record;
+begin
+ for fkeys in
+ select *
+ from londiste.subscriber_pending_fkeys
+ where from_table=i_table_name or to_table=i_table_name
+ order by 1,2,3
+ loop
+ return next fkeys;
+ end loop;
+
+ return;
+end;
+$$ language plpgsql;
+
+
+create or replace function londiste.subscriber_get_queue_valid_pending_fkeys(i_queue_name text)
+returns setof londiste.subscriber_pending_fkeys as $$
+declare
+ fkeys record;
+begin
+ for fkeys in
+ select pf.* from londiste.subscriber_pending_fkeys pf
+ join londiste.subscriber_table st_from
+ on (st_from.table_name = pf.from_table and st_from.merge_state = 'ok' and st_from.snapshot is null)
+ join londiste.subscriber_table st_to
+ on (st_to.table_name = pf.to_table and st_to.merge_state = 'ok' and st_to.snapshot is null)
+ -- change the AND to OR to allow fkeys between tables coming from different queues
+ where (st_from.queue_name = i_queue_name and st_to.queue_name = i_queue_name)
+ order by 1, 2, 3
+ loop
+ return next fkeys;
+ end loop;
+
+ return;
+end;
+$$ language plpgsql;
+
+
+create or replace function londiste.subscriber_drop_table_fkey(i_from_table text, i_fkey_name text)
+returns integer as $$
+declare
+ fkey record;
+begin
+ select * into fkey
+ from londiste.find_table_fkeys(i_from_table)
+ where fkey_name = i_fkey_name and from_table = i_from_table;
+
+ if not found then
+ return 0;
+ end if;
+
+ insert into londiste.subscriber_pending_fkeys values (fkey.from_table, fkey.to_table, i_fkey_name, fkey.fkey_def);
+
+ execute 'alter table only ' || londiste.quote_fqname(fkey.from_table)
+ || ' drop constraint ' || quote_ident(i_fkey_name);
+
+ return 1;
+end;
+$$ language plpgsql;
+
+
+create or replace function londiste.subscriber_restore_table_fkey(i_from_table text, i_fkey_name text)
+returns integer as $$
+declare
+ fkey record;
+begin
+ select * into fkey
+ from londiste.subscriber_pending_fkeys
+ where fkey_name = i_fkey_name and from_table = i_from_table;
+
+ if not found then
+ return 0;
+ end if;
+
+ delete from londiste.subscriber_pending_fkeys where fkey_name = fkey.fkey_name;
+
+ execute fkey.fkey_def;
+
+ return 1;
+end;
+$$ language plpgsql;
+
+create or replace function londiste.subscriber_get_seq_list(i_queue_name text)
+returns setof text as $$
+declare
+ rec record;
+begin
+ for rec in
+ select seq_name from londiste.subscriber_seq
+ where queue_name = i_queue_name
+ order by nr
+ loop
+ return next rec.seq_name;
+ end loop;
+ return;
+end;
+$$ language plpgsql security definer;
+
+
+create or replace function londiste.subscriber_get_table_list(i_queue_name text)
+returns setof londiste.ret_subscriber_table as $$
+declare
+ rec londiste.ret_subscriber_table%rowtype;
+begin
+ for rec in
+ select table_name, merge_state, snapshot, trigger_name, skip_truncate
+ from londiste.subscriber_table
+ where queue_name = i_queue_name
+ order by nr
+ loop
+ return next rec;
+ end loop;
+ return;
+end;
+$$ language plpgsql security definer;
+
+-- compat
+create or replace function londiste.get_table_state(i_queue text)
+returns setof londiste.subscriber_table as $$
+declare
+ rec londiste.subscriber_table%rowtype;
+begin
+ for rec in
+ select * from londiste.subscriber_table
+ where queue_name = i_queue
+ order by nr
+ loop
+ return next rec;
+ end loop;
+ return;
+end;
+$$ language plpgsql security definer;
+
+
+create or replace function londiste.subscriber_remove_seq(
+ i_queue_name text, i_seq_name text)
+returns integer as $$
+declare
+ link text;
+begin
+ delete from londiste.subscriber_seq
+ where queue_name = i_queue_name
+ and seq_name = i_seq_name;
+ if not found then
+ raise exception 'no such seq?';
+ end if;
+
+ -- update linked queue if needed
+ link := londiste.link_dest(i_queue_name);
+ if link is not null then
+ delete from londiste.provider_seq
+ where queue_name = link
+ and seq_name = i_seq_name;
+ perform londiste.provider_notify_change(link);
+ end if;
+
+ return 0;
+end;
+$$ language plpgsql security definer;
+
+
+create or replace function londiste.subscriber_remove_table(
+ i_queue_name text, i_table text)
+returns integer as $$
+declare
+ link text;
+begin
+ delete from londiste.subscriber_table
+ where queue_name = i_queue_name
+ and table_name = i_table;
+ if not found then
+ raise exception 'no such table';
+ end if;
+
+ -- sync link
+ link := londiste.link_dest(i_queue_name);
+ if link is not null then
+ delete from londiste.provider_table
+ where queue_name = link
+ and table_name = i_table;
+ perform londiste.provider_notify_change(link);
+ end if;
+
+ return 0;
+end;
+$$ language plpgsql security definer;
+
+
+create or replace function londiste.subscriber_set_skip_truncate(
+ i_queue text,
+ i_table text,
+ i_value bool)
+returns integer as $$
+begin
+ update londiste.subscriber_table
+ set skip_truncate = i_value
+ where queue_name = i_queue
+ and table_name = i_table;
+ if not found then
+ raise exception 'table not found';
+ end if;
+
+ return 1;
+end;
+$$ language plpgsql security definer;
+
+
+create or replace function londiste.subscriber_set_table_state(
+ i_queue_name text,
+ i_table_name text,
+ i_snapshot text,
+ i_merge_state text)
+returns integer as $$
+declare
+ link text;
+ ok integer;
+begin
+ update londiste.subscriber_table
+ set snapshot = i_snapshot,
+ merge_state = i_merge_state,
+ -- reset skip_snapshot when table is copied over
+ skip_truncate = case when i_merge_state = 'ok'
+ then null
+ else skip_truncate
+ end
+ where queue_name = i_queue_name
+ and table_name = i_table_name;
+ if not found then
+ raise exception 'no such table';
+ end if;
+
+ -- sync link state also
+ link := londiste.link_dest(i_queue_name);
+ if link then
+ select * from londiste.provider_table
+ where queue_name = linkdst
+ and table_name = i_table_name;
+ if found then
+ if i_merge_state is null or i_merge_state <> 'ok' then
+ delete from londiste.provider_table
+ where queue_name = link
+ and table_name = i_table_name;
+ perform londiste.notify_change(link);
+ end if;
+ else
+ if i_merge_state = 'ok' then
+ insert into londiste.provider_table (queue_name, table_name)
+ values (link, i_table_name);
+ perform londiste.notify_change(link);
+ end if;
+ end if;
+ end if;
+
+ return 1;
+end;
+$$ language plpgsql security definer;
+
+create or replace function londiste.set_table_state(
+ i_queue_name text,
+ i_table_name text,
+ i_snapshot text,
+ i_merge_state text)
+returns integer as $$
+begin
+ return londiste.subscriber_set_table_state(i_queue_name, i_table_name, i_snapshot, i_merge_state);
+end;
+$$ language plpgsql security definer;
+
+
+
+create or replace function londiste.subscriber_get_table_pending_triggers(i_table_name text)
+returns setof londiste.subscriber_pending_triggers as $$
+declare
+ trigger record;
+begin
+ for trigger in
+ select *
+ from londiste.subscriber_pending_triggers
+ where table_name = i_table_name
+ loop
+ return next trigger;
+ end loop;
+
+ return;
+end;
+$$ language plpgsql strict stable;
+
+
+create or replace function londiste.subscriber_drop_table_trigger(i_table_name text, i_trigger_name text)
+returns integer as $$
+declare
+ trig_def record;
+begin
+ select * into trig_def
+ from londiste.find_table_triggers(i_table_name)
+ where trigger_name = i_trigger_name;
+
+ if FOUND is not true then
+ return 0;
+ end if;
+
+ insert into londiste.subscriber_pending_triggers(table_name, trigger_name, trigger_def)
+ values (i_table_name, i_trigger_name, trig_def.trigger_def);
+
+ execute 'drop trigger ' || quote_ident(i_trigger_name)
+ || ' on ' || londiste.quote_fqname(i_table_name);
+
+ return 1;
+end;
+$$ language plpgsql;
+
+
+create or replace function londiste.subscriber_drop_all_table_triggers(i_table_name text)
+returns integer as $$
+declare
+ trigger record;
+begin
+ for trigger in
+ select trigger_name as name
+ from londiste.find_table_triggers(i_table_name)
+ loop
+ perform londiste.subscriber_drop_table_trigger(i_table_name, trigger.name);
+ end loop;
+
+ return 1;
+end;
+$$ language plpgsql;
+
+
+create or replace function londiste.subscriber_restore_table_trigger(i_table_name text, i_trigger_name text)
+returns integer as $$
+declare
+ trig_def text;
+begin
+ select trigger_def into trig_def
+ from londiste.subscriber_pending_triggers
+ where (table_name, trigger_name) = (i_table_name, i_trigger_name);
+
+ if not found then
+ return 0;
+ end if;
+
+ delete from londiste.subscriber_pending_triggers
+ where table_name = i_table_name and trigger_name = i_trigger_name;
+
+ execute trig_def;
+
+ return 1;
+end;
+$$ language plpgsql;
+
+
+create or replace function londiste.subscriber_restore_all_table_triggers(i_table_name text)
+returns integer as $$
+declare
+ trigger record;
+begin
+ for trigger in
+ select trigger_name as name
+ from londiste.subscriber_get_table_pending_triggers(i_table_name)
+ loop
+ perform londiste.subscriber_restore_table_trigger(i_table_name, trigger.name);
+ end loop;
+
+ return 1;
+end;
+$$ language plpgsql;
+
+
+
+create or replace function londiste.version()
+returns text as $$
+begin
+ return '2.1.12';
+end;
+$$ language plpgsql;
+
View
2,680 upgrade/final/londiste.upgrade_2.1_to_3.1.sql
@@ -0,0 +1,2680 @@
+
+drop function if exists londiste.find_table_fkeys(text);
+
+
+
+-- ----------------------------------------------------------------------
+-- Section: Londiste internals
+--
+-- Londiste storage: tables/seqs/fkeys/triggers/events.
+--
+-- Londiste event types:
+-- I/U/D - partial SQL event from pgq.sqltriga()
+-- I:/U:/D: <pk> - urlencoded event from pgq.logutriga()
+-- EXECUTE - SQL script execution
+-- TRUNCATE - table truncation
+-- londiste.add-table - global table addition
+-- londiste.remove-table - global table removal
+-- londiste.update-seq - sequence update
+-- londiste.remove-seq - global sequence removal
+--
+-- pgq.sqltriga() event:
+-- ev_type - I/U/D which means insert/update/delete
+-- ev_data - partial SQL
+-- ev_extra1 - table name
+--
+-- Insert: ev_type = "I", ev_data = "(col1, col2) values (2, 'foo')", ev_extra1 = "public.tblname"
+--
+-- Update: ev_type = "U", ev_data = "col2 = null where col1 = 2", ev_extra1 = "public.tblname"
+--
+-- Delete: ev_type = "D", ev_data = "col1 = 2", ev_extra1 = "public.tblname"
+--
+-- pgq.logutriga() event:
+-- ev_type - I:/U:/D: plus comma separated list of pkey columns
+-- ev_data - urlencoded row columns
+-- ev_extra1 - table name
+--
+-- Insert: ev_type = "I:col1", ev_data = ""
+--
+-- Truncate trigger event:
+-- ev_type - TRUNCATE
+-- ev_extra1 - table name
+--
+-- Execute SQL event:
+-- ev_type - EXECUTE
+-- ev_data - SQL script
+-- ev_extra1 - Script ID
+--
+-- Global table addition:
+-- ev_type - londiste.add-table
+-- ev_data - table name
+--
+-- Global table removal:
+-- ev_type - londiste.remove-table
+-- ev_data - table name
+--
+-- Global sequence update:
+-- ev_type - londiste.update-seq
+-- ev_data - seq value
+-- ev_extra1 - seq name
+--5)
+-- Global sequence removal:
+-- ev_type - londiste.remove-seq
+-- ev_data - seq name
+-- ----------------------------------------------------------------------
+
+set default_with_oids = 'off';
+
+
+-- ----------------------------------------------------------------------
+-- Table: londiste.table_info
+--
+-- Info about registered tables.
+--
+-- Columns:
+-- nr - number for visual ordering
+-- queue_name - Cascaded queue name
+-- table_name - fully-qualified table name
+-- local - Is used locally
+-- merge_state - State for tables
+-- custom_snapshot - remote snapshot for COPY command
+-- dropped_ddl - temp place to store ddl
+-- table_attrs - urlencoded dict of extra attributes
+--
+-- Tables merge states:
+-- NULL - copy has not yet happened
+-- in-copy - ongoing bulk copy
+-- catching-up - copy process applies events that happened during copy
+-- wanna-sync:% - copy process caught up, wants to hand table over to replay
+-- do-sync:% - replay process is ready to accept the table
+-- ok - in sync, replay applies events
+-- ----------------------------------------------------------------------
+create table londiste.table_info (
+ nr serial not null,
+ queue_name text not null,
+ table_name text not null,
+ local boolean not null default false,
+ merge_state text,
+ custom_snapshot text,
+ dropped_ddl text,
+ table_attrs text,
+ dest_table text,
+
+ primary key (queue_name, table_name),
+ foreign key (queue_name)
+ references pgq_node.node_info (queue_name)
+ on delete cascade,
+ check (dropped_ddl is null or merge_state in ('in-copy', 'catching-up'))
+);
+
+
+-- ----------------------------------------------------------------------
+-- Table: londiste.seq_info
+--
+-- Sequences available on this queue.
+--
+-- Columns:
+-- nr - number for visual ordering
+-- queue_name - cascaded queue name
+-- seq_name - fully-qualified seq name
+-- local - there is actual seq on local node
+-- last_value - last published value from root
+-- ----------------------------------------------------------------------
+create table londiste.seq_info (
+ nr serial not null,
+ queue_name text not null,
+ seq_name text not null,
+ local boolean not null default false,
+ last_value int8 not null,
+
+ primary key (queue_name, seq_name),
+ foreign key (queue_name)
+ references pgq_node.node_info (queue_name)
+ on delete cascade
+);
+
+
+-- ----------------------------------------------------------------------
+-- Table: londiste.applied_execute
+--
+-- Info about EXECUTE commands that are ran.
+--
+-- Columns:
+-- queue_name - cascaded queue name
+-- execute_file - filename / unique id
+-- execute_time - the time execute happened
+-- execute_sql - contains SQL for EXECUTE event (informative)
+-- ----------------------------------------------------------------------
+create table londiste.applied_execute (
+ queue_name text not null,
+ execute_file text not null,
+ execute_time timestamptz not null default now(),
+ execute_sql text not null,
+ execute_attrs text,
+ primary key (execute_file)
+);
+
+
+-- ----------------------------------------------------------------------
+-- Table: londiste.pending_fkeys
+--
+-- Details on dropped fkeys. Global, not specific to any set.
+--
+-- Columns:
+-- from_table - fully-qualified table name
+-- to_table - fully-qualified table name
+-- fkey_name - name of constraint
+-- fkey_def - full fkey definition
+-- ----------------------------------------------------------------------
+create table londiste.pending_fkeys (
+ from_table text not null,
+ to_table text not null,
+ fkey_name text not null,
+ fkey_def text not null,
+
+ primary key (from_table, fkey_name)
+);
+
+
+
+
+-- Section: Londiste functions
+
+-- upgrade schema
+
+
+create or replace function londiste.upgrade_schema()
+returns int4 as $$
+-- updates table structure if necessary
+declare
+ cnt int4 = 0;
+begin
+
+ -- table_info: check (dropped_ddl is null or merge_state in ('in-copy', 'catching-up'))
+ perform 1 from information_schema.check_constraints
+ where constraint_schema = 'londiste'
+ and constraint_name = 'table_info_check'
+ and position('in-copy' in check_clause) > 0
+ and position('catching' in check_clause) = 0;
+ if found then
+ alter table londiste.table_info drop constraint table_info_check;
+ alter table londiste.table_info add constraint table_info_check
+ check (dropped_ddl is null or merge_state in ('in-copy', 'catching-up'));
+ cnt := cnt + 1;
+ end if;
+
+ -- table_info.dest_table
+ perform 1 from information_schema.columns
+ where table_schema = 'londiste'
+ and table_name = 'table_info'
+ and column_name = 'dest_table';
+ if not found then
+ alter table londiste.table_info add column dest_table text;
+ end if;
+
+ -- applied_execute.dest_table
+ perform 1 from information_schema.columns
+ where table_schema = 'londiste'
+ and table_name = 'applied_execute'
+ and column_name = 'execute_attrs';
+ if not found then
+ alter table londiste.applied_execute add column execute_attrs text;
+ end if;
+
+ -- applied_execute: drop queue_name from primary key
+ perform 1 from pg_catalog.pg_indexes
+ where schemaname = 'londiste'
+ and tablename = 'applied_execute'
+ and indexname = 'applied_execute_pkey'
+ and indexdef like '%queue_name%';
+ if found then
+ alter table londiste.applied_execute
+ drop constraint applied_execute_pkey;
+ alter table londiste.applied_execute
+ add constraint applied_execute_pkey
+ primary key (execute_file);
+ end if;
+
+ -- applied_execute: drop fkey to pgq_node
+ perform 1 from information_schema.table_constraints
+ where constraint_schema = 'londiste'
+ and table_schema = 'londiste'
+ and table_name = 'applied_execute'
+ and constraint_type = 'FOREIGN KEY'
+ and constraint_name = 'applied_execute_queue_name_fkey';
+ if found then
+ alter table londiste.applied_execute
+ drop constraint applied_execute_queue_name_fkey;
+ end if;
+
+ -- create roles
+ perform 1 from pg_catalog.pg_roles where rolname = 'londiste_writer';
+ if not found then
+ create role londiste_writer in role pgq_admin;
+ cnt := cnt + 1;
+ end if;
+ perform 1 from pg_catalog.pg_roles where rolname = 'londiste_reader';
+ if not found then
+ create role londiste_reader in role pgq_reader;
+ cnt := cnt + 1;
+ end if;
+
+ return cnt;
+end;
+$$ language plpgsql;
+
+
+select londiste.upgrade_schema();
+
+-- Group: Information
+
+
+create or replace function londiste.get_seq_list(
+ in i_queue_name text,
+ out seq_name text,
+ out last_value int8,
+ out local boolean)
+returns setof record as $$
+-- ----------------------------------------------------------------------
+-- Function: londiste.get_seq_list(1)
+--
+-- Returns registered seqs on this Londiste node.
+--
+-- Result fiels:
+-- seq_name - fully qualified name of sequence
+-- last_value - last globally published value
+-- local - is locally registered
+-- ----------------------------------------------------------------------
+declare
+ rec record;
+begin
+ for seq_name, last_value, local in
+ select s.seq_name, s.last_value, s.local from londiste.seq_info s
+ where s.queue_name = i_queue_name
+ order by s.nr, s.seq_name
+ loop
+ return next;
+ end loop;
+ return;
+end;
+$$ language plpgsql strict;
+
+
+
+
+drop function if exists londiste.get_table_list(text);
+
+create or replace function londiste.get_table_list(
+ in i_queue_name text,
+ out table_name text,
+ out local boolean,
+ out merge_state text,
+ out custom_snapshot text,
+ out table_attrs text,
+ out dropped_ddl text,
+ out copy_role text,
+ out copy_pos int4,
+ out dest_table text)
+returns setof record as $$
+-- ----------------------------------------------------------------------
+-- Function: londiste.get_table_list(1)
+--
+-- Return info about registered tables.
+--
+-- Parameters:
+-- i_queue_name - cascaded queue name
+--
+-- Returns:
+-- table_name - fully-quelified table name
+-- local - does events needs to be applied to local table
+-- merge_state - show phase of initial copy
+-- custom_snapshot - remote snapshot of COPY transaction
+-- table_attrs - urlencoded dict of table attributes
+-- dropped_ddl - partition combining: temp place to put DDL
+-- copy_role - partition combining: how to handle copy
+-- copy_pos - position in parallel copy working order
+--
+-- copy_role = lead:
+-- on copy start, drop indexes and store in dropped_ddl
+-- on copy finish change state to catching-up, then wait until copy_role turns to NULL
+-- catching-up: if dropped_ddl not NULL, restore them
+-- copy_role = wait-copy:
+-- on copy start wait, until role changes (to wait-replay)
+-- copy_role = wait-replay:
+-- on copy finish, tag as 'catching-up'
+-- wait until copy_role is NULL, then proceed
+-- ----------------------------------------------------------------------
+begin
+ for table_name, local, merge_state, custom_snapshot, table_attrs,
+ dropped_ddl, dest_table
+ in
+ select t.table_name, t.local, t.merge_state, t.custom_snapshot, t.table_attrs,
+ t.dropped_ddl, t.dest_table
+ from londiste.table_info t
+ where t.queue_name = i_queue_name
+ order by t.nr, t.table_name
+ loop
+ copy_role := null;
+ copy_pos := 0;
+
+ if merge_state in ('in-copy', 'catching-up') then
+ select f.copy_role, f.copy_pos
+ from londiste._coordinate_copy(i_queue_name, table_name) f
+ into copy_role, copy_pos;
+ end if;
+
+ return next;
+ end loop;
+ return;
+end;
+$$ language plpgsql strict stable;
+
+
+create or replace function londiste._coordinate_copy(
+ in i_queue_name text, in i_table_name text,
+ out copy_role text, out copy_pos int4)
+as $$
+-- if the table is in middle of copy from multiple partitions,
+-- the copy processes need coordination.
+declare
+ q_part1 text;
+ q_part_ddl text;
+ n_parts int4;
+ n_done int4;
+ _table_name text;
+ n_combined_queue text;
+ merge_state text;
+ dest_table text;
+ dropped_ddl text;
+begin
+ copy_pos := 0;
+ copy_role := null;
+
+ select t.merge_state, t.dest_table, t.dropped_ddl,
+ min(case when t2.local then t2.queue_name else null end) as _queue1,
+ min(case when t2.local and t2.dropped_ddl is not null then t2.queue_name else null end) as _queue1ddl,
+ count(case when t2.local then t2.table_name else null end) as _total,
+ count(case when t2.local then nullif(t2.merge_state, 'in-copy') else null end) as _done,
+ min(n.combined_queue) as _combined_queue,
+ count(nullif(t2.queue_name < i_queue_name and t.merge_state = 'in-copy' and t2.merge_state = 'in-copy', false)) as _copy_pos
+ from londiste.table_info t
+ join pgq_node.node_info n on (n.queue_name = t.queue_name)
+ left join pgq_node.node_info n2 on (n2.combined_queue = n.combined_queue or
+ (n2.combined_queue is null and n.combined_queue is null))
+ left join londiste.table_info t2 on
+ (coalesce(t2.dest_table, t2.table_name) = coalesce(t.dest_table, t.table_name) and
+ t2.queue_name = n2.queue_name and
+ (t2.merge_state is null or t2.merge_state != 'ok'))
+ where t.queue_name = i_queue_name and t.table_name = i_table_name
+ group by t.nr, t.table_name, t.local, t.merge_state, t.custom_snapshot, t.table_attrs, t.dropped_ddl, t.dest_table
+ into merge_state, dest_table, dropped_ddl, q_part1, q_part_ddl, n_parts, n_done, n_combined_queue, copy_pos;
+
+ -- q_part1, q_part_ddl, n_parts, n_done, n_combined_queue, copy_pos, dest_table
+
+ -- be more robust against late joiners
+ q_part1 := coalesce(q_part_ddl, q_part1);
+
+ -- turn the logic off if no merge is happening
+ if n_parts = 1 then
+ q_part1 := null;
+ end if;
+
+ if q_part1 is not null then
+ if i_queue_name = q_part1 then
+ -- lead
+ if merge_state = 'in-copy' then
+ if dropped_ddl is null and n_done > 0 then
+ -- seems late addition, let it copy with indexes
+ copy_role := 'wait-replay';
+ elsif n_done < n_parts then
+ -- show copy_role only if need to drop ddl or already did drop ddl
+ copy_role := 'lead';
+ end if;
+
+ -- make sure it cannot be made to wait
+ copy_pos := 0;
+ end if;
+ if merge_state = 'catching-up' and dropped_ddl is not null then
+ -- show copy_role only if need to wait for others
+ if n_done < n_parts then
+ copy_role := 'wait-replay';
+ end if;
+ end if;
+ else
+ -- follow
+ if merge_state = 'in-copy' then
+ if q_part_ddl is not null then
+ -- can copy, wait in replay until lead has applied ddl
+ copy_role := 'wait-replay';
+ elsif n_done > 0 then
+ -- ddl is not dropped, others are active, copy without touching ddl
+ copy_role := 'wait-replay';
+ else
+ -- wait for lead to drop ddl
+ copy_role := 'wait-copy';
+ end if;
+ elsif merge_state = 'catching-up' then
+ -- show copy_role only if need to wait for lead
+ if q_part_ddl is not null then
+ copy_role := 'wait-replay';
+ end if;
+ end if;
+ end if;
+ end if;
+
+ return;
+end;
+$$ language plpgsql strict stable;
+
+
+
+
+create or replace function londiste.local_show_missing(
+ in i_queue_name text,
+ out obj_kind text, out obj_name text)
+returns setof record as $$
+-- ----------------------------------------------------------------------
+-- Function: londiste.local_show_missing(1)
+--
+-- Return info about missing tables. On root show tables
+-- not registered on set, on branch/leaf show tables
+-- in set but not registered locally.
+-- ----------------------------------------------------------------------
+begin
+ if pgq_node.is_root_node(i_queue_name) then
+ for obj_kind, obj_name in
+ select r.relkind, n.nspname || '.' || r.relname
+ from pg_catalog.pg_class r, pg_catalog.pg_namespace n
+ where n.oid = r.relnamespace
+ and r.relkind in ('r', 'S')
+ and n.nspname not in ('pgq', 'pgq_ext', 'pgq_node', 'londiste', 'pg_catalog', 'information_schema')
+ and n.nspname !~ '^pg_(toast|temp)'
+ and not exists (select 1 from londiste.table_info
+ where queue_name = i_queue_name and local
+ and coalesce(dest_table, table_name) = (n.nspname || '.' || r.relname))
+ order by 1, 2
+ loop
+ return next;
+ end loop;
+ else
+ for obj_kind, obj_name in
+ select 'S', s.seq_name from londiste.seq_info s
+ where s.queue_name = i_queue_name
+ and not s.local
+ union all
+ select 'r', t.table_name from londiste.table_info t
+ where t.queue_name = i_queue_name
+ and not t.local
+ order by 1, 2
+ loop
+ return next;
+ end loop;
+ end if;
+ return;
+end;
+$$ language plpgsql strict stable;
+
+
+
+-- Group: Local object registration (setup tool)
+
+
+create or replace function londiste.local_add_seq(
+ in i_queue_name text, in i_seq_name text,
+ out ret_code int4, out ret_note text)
+as $$
+-- ----------------------------------------------------------------------
+-- Function: londiste.local_add_seq(2)
+--
+-- Register sequence.
+--
+-- Parameters:
+-- i_queue_name - cascaded queue name
+-- i_seq_name - seq name
+--
+-- Returns:
+-- 200 - OK
+-- 400 - Not found
+-- ----------------------------------------------------------------------
+declare
+ fq_seq_name text;
+ lastval int8;
+ seq record;
+begin
+ fq_seq_name := londiste.make_fqname(i_seq_name);
+
+ perform 1 from pg_class
+ where oid = londiste.find_seq_oid(fq_seq_name);
+ if not found then
+ select 400, 'Sequence not found: ' || fq_seq_name into ret_code, ret_note;
+ return;
+ end if;
+
+ if pgq_node.is_root_node(i_queue_name) then
+ select local, last_value into seq
+ from londiste.seq_info
+ where queue_name = i_queue_name
+ and seq_name = fq_seq_name
+ for update;
+ if found and seq.local then
+ select 201, 'Sequence already added: ' || fq_seq_name
+ into ret_code, ret_note;
+ return;
+ end if;
+ if not seq.local then
+ update londiste.seq_info set local = true
+ where queue_name = i_queue_name and seq_name = fq_seq_name;
+ else
+ insert into londiste.seq_info (queue_name, seq_name, local, last_value)
+ values (i_queue_name, fq_seq_name, true, 0);
+ end if;
+ perform * from londiste.root_check_seqs(i_queue_name);
+ else
+ select local, last_value into seq
+ from londiste.seq_info
+ where queue_name = i_queue_name
+ and seq_name = fq_seq_name
+ for update;
+ if not found then
+ select 404, 'Unknown sequence: ' || fq_seq_name
+ into ret_code, ret_note;
+ return;
+ end if;
+ if seq.local then
+ select 201, 'Sequence already added: ' || fq_seq_name
+ into ret_code, ret_note;
+ return;
+ end if;
+ update londiste.seq_info set local = true
+ where queue_name = i_queue_name and seq_name = fq_seq_name;
+ perform pgq.seq_setval(fq_seq_name, seq.last_value);
+ end if;
+
+ select 200, 'Sequence added: ' || fq_seq_name into ret_code, ret_note;
+ return;
+end;
+$$ language plpgsql;
+
+
+
+create or replace function londiste.local_add_table(
+ in i_queue_name text,
+ in i_table_name text,
+ in i_trg_args text[],
+ in i_table_attrs text,
+ in i_dest_table text,
+ out ret_code int4,
+ out ret_note text)
+as $$
+-- ----------------------------------------------------------------------
+-- Function: londiste.local_add_table(5)
+--
+-- Register table on Londiste node, with customizable trigger args.
+--
+-- Parameters:
+-- i_queue_name - queue name
+-- i_table_name - table name
+-- i_trg_args - args to trigger, or magic parameters.
+-- i_table_attrs - args to python handler
+-- i_dest_table - actual name of destination table (NULL if same)
+--
+-- Trigger args:
+-- See documentation for pgq triggers.
+--
+-- Magic parameters:
+-- no_triggers - skip trigger creation
+-- skip_truncate - set 'skip_truncate' table attribute
+-- expect_sync - set table state to 'ok'
+-- tgflags=X - trigger creation flags
+-- merge_all - merge table from all sources. required for
+-- multi-source table
+-- no_merge - do not merge tables from different sources
+-- skip - create skip trigger. same as S flag
+-- virtual_table - skips structure check and trigger creation
+--
+-- Trigger creation flags (default: AIUDL):
+-- I - ON INSERT
+-- U - ON UPDATE
+-- D - ON DELETE
+-- Q - use pgq.sqltriga() as trigger function
+-- L - use pgq.logutriga() as trigger function
+-- B - BEFORE
+-- A - AFTER
+-- S - SKIP
+--
+-- Example:
+-- > londiste.local_add_table('q', 'tbl', array['tgflags=BI', 'SKIP', 'pkey=col1,col2'])
+--
+-- Returns:
+-- 200 - Ok
+-- 301 - Warning, trigger exists that will fire before londiste one
+-- 400 - No such set
+------------------------------------------------------------------------
+declare
+ col_types text;
+ fq_table_name text;
+ new_state text;
+ trunctrg_name text;
+ pgversion int;
+ logtrg_previous text;
+ lg_name text;
+ lg_func text;
+ lg_pos text;
+ lg_event text;
+ lg_args text;
+ _extra_args text;
+ tbl record;
+ i integer;
+ j integer;
+ sql text;
+ arg text;
+ _node record;
+ _tbloid oid;
+ _combined_queue text;
+ _combined_table text;
+ -- skip trigger
+ _skip_prefix text := 'zzz_';
+ _skip_trg_count integer;
+ _skip_trg_name text;
+ -- check local tables from all sources
+ _queue_name text;
+ _local boolean;
+ -- array with all tgflags values
+ _check_flags char[] := array['B','A','Q','L','I','U','D','S'];
+ -- given tgflags array
+ _tgflags char[];
+ -- ordinary argument array
+ _args text[];
+ -- argument flags
+ _expect_sync boolean := false;
+ _merge_all boolean := false;
+ _no_merge boolean := false;
+ _skip_truncate boolean := false;
+ _no_triggers boolean := false;
+ _skip boolean := false;
+ _virtual_table boolean := false;
+ _dest_table text;
+ _got_extra1 boolean := false;
+ _table_name2 text;
+ _desc text;
+begin
+
+ -------- i_trg_args ARGUMENTS PARSING
+
+ if array_lower(i_trg_args, 1) is not null then
+ for i in array_lower(i_trg_args, 1) .. array_upper(i_trg_args, 1) loop
+ arg := i_trg_args[i];
+ if arg like 'tgflags=%' then
+ -- special flag handling
+ arg := upper(substr(arg, 9));
+ for j in array_lower(_check_flags, 1) .. array_upper(_check_flags, 1) loop
+ if position(_check_flags[j] in arg) > 0 then
+ _tgflags := array_append(_tgflags, _check_flags[j]);
+ end if;
+ end loop;
+ elsif arg = 'expect_sync' then
+ _expect_sync := true;
+ elsif arg = 'skip_truncate' then
+ _skip_truncate := true;
+ elsif arg = 'no_triggers' then
+ _no_triggers := true;
+ elsif arg = 'merge_all' then
+ _merge_all = true;
+ elsif arg = 'no_merge' then
+ _no_merge = true;
+ elsif lower(arg) = 'skip' then
+ _skip := true;
+ elsif arg = 'virtual_table' then
+ _virtual_table := true;
+ _expect_sync := true; -- do not copy
+ _no_triggers := true; -- do not create triggers
+ else
+ if arg like 'ev_extra1=%' then
+ _got_extra1 := true;
+ end if;
+ -- ordinary arg
+ _args = array_append(_args, quote_literal(arg));
+ end if;
+ end loop;
+ end if;
+
+ if _merge_all and _no_merge then
+ select 405, 'Cannot use merge-all and no-merge together'
+ into ret_code, ret_note;
+ return;
+ end if;
+
+ fq_table_name := londiste.make_fqname(i_table_name);
+ _dest_table := londiste.make_fqname(coalesce(i_dest_table, i_table_name));
+
+ if _dest_table <> fq_table_name and not _got_extra1 then
+ -- if renamed table, enforce trigger to put
+ -- global table name into extra1
+ arg := 'ev_extra1=' || quote_literal(fq_table_name);
+ _args := array_append(_args, quote_literal(arg));
+ end if;
+
+ if _dest_table = fq_table_name then
+ _desc := fq_table_name;
+ else
+ _desc := fq_table_name || '(' || _dest_table || ')';
+ end if;
+
+ -------- TABLE STRUCTURE CHECK
+
+ if not _virtual_table then
+ _tbloid := londiste.find_table_oid(_dest_table);
+ if _tbloid is null then
+ select 404, 'Table does not exist: ' || _desc into ret_code, ret_note;
+ return;
+ end if;
+ col_types := londiste.find_column_types(_dest_table);
+ if position('k' in col_types) < 1 then
+ -- allow missing primary key in case of combined table where
+ -- pkey was removed by londiste
+ perform 1 from londiste.table_info t,
+ pgq_node.node_info n_this,
+ pgq_node.node_info n_other
+ where n_this.queue_name = i_queue_name
+ and n_other.combined_queue = n_this.combined_queue
+ and n_other.queue_name <> n_this.queue_name
+ and t.queue_name = n_other.queue_name
+ and coalesce(t.dest_table, t.table_name) = _dest_table
+ and t.dropped_ddl is not null;
+ if not found then
+ select 400, 'Primary key missing on table: ' || _desc into ret_code, ret_note;
+ return;
+ end if;
+ end if;
+ end if;
+
+ -------- TABLE REGISTRATION LOGIC
+
+ select * from pgq_node.get_node_info(i_queue_name) into _node;
+ if not found or _node.ret_code >= 400 then
+ select 400, 'No such set: ' || i_queue_name into ret_code, ret_note;
+ return;
+ end if;
+
+ select merge_state, local into tbl
+ from londiste.table_info
+ where queue_name = i_queue_name and table_name = fq_table_name;
+ if not found then
+ -- add to set on root
+ if _node.node_type = 'root' then
+ select f.ret_code, f.ret_note into ret_code, ret_note
+ from londiste.global_add_table(i_queue_name, i_table_name) f;
+ if ret_code <> 200 then
+ return;
+ end if;
+ else
+ select 404, 'Table not available on queue: ' || _desc
+ into ret_code, ret_note;
+ return;
+ end if;
+
+ -- reload info
+ select merge_state, local into tbl
+ from londiste.table_info
+ where queue_name = i_queue_name and table_name = fq_table_name;
+ end if;
+
+ if tbl.local then
+ select 200, 'Table already added: ' || _desc into ret_code, ret_note;
+ return;
+ end if;
+
+ if _node.node_type = 'root' then
+ new_state := 'ok';
+ perform londiste.root_notify_change(i_queue_name, 'londiste.add-table', fq_table_name);
+ elsif _node.node_type = 'leaf' and _node.combined_type = 'branch' then
+ new_state := 'ok';
+ elsif _expect_sync then
+ new_state := 'ok';
+ else
+ new_state := NULL;
+ end if;
+
+ update londiste.table_info
+ set local = true,
+ merge_state = new_state,
+ table_attrs = coalesce(i_table_attrs, table_attrs),
+ dest_table = nullif(_dest_table, fq_table_name)
+ where queue_name = i_queue_name and table_name = fq_table_name;
+ if not found then
+ raise exception 'lost table: %', fq_table_name;
+ end if;
+
+ -- merge all table sources on leaf
+ if _node.node_type = 'leaf' and not _no_merge then
+ for _queue_name, _table_name2, _local in
+ select t2.queue_name, t2.table_name, t2.local
+ from londiste.table_info t
+ join pgq_node.node_info n on (n.queue_name = t.queue_name)
+ left join pgq_node.node_info n2 on (n2.combined_queue = n.combined_queue or
+ (n2.combined_queue is null and n.combined_queue is null))
+ left join londiste.table_info t2
+ on (t2.queue_name = n2.queue_name and
+ coalesce(t2.dest_table, t2.table_name) = coalesce(t.dest_table, t.table_name))
+ where t.queue_name = i_queue_name
+ and t.table_name = fq_table_name
+ and t2.queue_name != i_queue_name -- skip self
+ loop
+ -- if table from some other source is already marked as local,
+ -- raise error
+ if _local and coalesce(new_state, 'x') <> 'ok' then
+ select 405, 'Found local table '|| _desc
+ || ' in queue ' || _queue_name
+ || ', use remove-table first to remove all previous '
+ || 'table subscriptions'
+ into ret_code, ret_note;
+ return;
+ end if;
+
+ -- when table comes from multiple sources, merge_all switch is
+ -- required
+ if not _merge_all and coalesce(new_state, 'x') <> 'ok' then
+ select 405, 'Found multiple sources for table '|| _desc
+ || ', use merge-all or no-merge to continue'
+ into ret_code, ret_note;
+ return;
+ end if;
+
+ update londiste.table_info
+ set local = true,
+ merge_state = new_state,
+ table_attrs = coalesce(i_table_attrs, table_attrs)
+ where queue_name = _queue_name and table_name = _table_name2;
+ if not found then
+ raise exception 'lost table: % on queue %', _table_name2, _queue_name;
+ end if;
+ end loop;
+
+ -- if this node has combined_queue, add table there too
+ -- note: we need to keep both table_name/dest_table values
+ select n2.queue_name, t.table_name
+ from pgq_node.node_info n1
+ join pgq_node.node_info n2
+ on (n2.queue_name = n1.combined_queue)
+ left join londiste.table_info t
+ on (t.queue_name = n2.queue_name and t.table_name = fq_table_name and t.local)
+ where n1.queue_name = i_queue_name and n2.node_type = 'root'
+ into _combined_queue, _combined_table;
+ if found and _combined_table is null then
+ select f.ret_code, f.ret_note
+ from londiste.local_add_table(_combined_queue, fq_table_name, i_trg_args, i_table_attrs, _dest_table) f
+ into ret_code, ret_note;
+ if ret_code >= 300 then
+ return;
+ end if;
+ end if;
+ end if;
+
+ if _skip_truncate then
+ perform 1
+ from londiste.local_set_table_attrs(i_queue_name, fq_table_name,
+ coalesce(i_table_attrs || '&skip_truncate=1', 'skip_truncate=1'));
+ end if;
+
+ -------- TRIGGER LOGIC
+
+ -- new trigger
+ _extra_args := '';
+ lg_name := '_londiste_' || i_queue_name;
+ lg_func := 'pgq.logutriga';
+ lg_event := '';
+ lg_args := quote_literal(i_queue_name);
+ lg_pos := 'after';
+
+ if array_lower(_args, 1) is not null then
+ lg_args := lg_args || ', ' || array_to_string(_args, ', ');
+ end if;
+
+ if 'B' = any(_tgflags) then
+ lg_pos := 'before';
+ end if;
+ if 'A' = any(_tgflags) then
+ lg_pos := 'after';
+ end if;
+ if 'Q' = any(_tgflags) then
+ lg_func := 'pgq.sqltriga';
+ end if;
+ if 'L' = any(_tgflags) then
+ lg_func := 'pgq.logutriga';
+ end if;
+ if 'I' = any(_tgflags) then
+ lg_event := lg_event || ' or insert';
+ end if;
+ if 'U' = any(_tgflags) then
+ lg_event := lg_event || ' or update';
+ end if;
+ if 'D' = any(_tgflags) then
+ lg_event := lg_event || ' or delete';
+ end if;
+ if 'S' = any(_tgflags) then
+ _skip := true;
+ end if;
+
+ if _node.node_type = 'leaf' then
+ -- on weird leafs the trigger funcs may not exist
+ perform 1 from pg_proc p join pg_namespace n on (n.oid = p.pronamespace)
+ where n.nspname = 'pgq' and p.proname in ('logutriga', 'sqltriga');
+ if not found then
+ select 200, 'Table added with no triggers: ' || _desc into ret_code, ret_note;
+ return;
+ end if;
+ -- on regular leaf, install deny trigger
+ _extra_args := ', ' || quote_literal('deny');
+ end if;
+
+ -- if skip param given, rename previous skip triggers and prefix current
+ if _skip then
+ -- get count and name of existing skip triggers
+ select count(*), min(t.tgname)
+ into _skip_trg_count, _skip_trg_name
+ from pg_catalog.pg_trigger t
+ where t.tgrelid = londiste.find_table_oid(_dest_table)
+ and position(E'\\000skip\\000' in lower(tgargs::text)) > 0;
+ -- if no previous skip triggers, prefix name and add SKIP to args
+ if _skip_trg_count = 0 then
+ lg_name := _skip_prefix || lg_name;
+ lg_args := lg_args || ', ' || quote_literal('SKIP');
+ -- if one previous skip trigger, check it's prefix and
+ -- do not use SKIP on current trigger
+ elsif _skip_trg_count = 1 then
+ -- if not prefixed then rename
+ if position(_skip_prefix in _skip_trg_name) != 1 then
+ sql := 'alter trigger ' || _skip_trg_name
+ || ' on ' || londiste.quote_fqname(_dest_table)
+ || ' rename to ' || _skip_prefix || _skip_trg_name;
+ execute sql;
+ end if;
+ else
+ select 405, 'Multiple SKIP triggers in table: ' || _desc
+ into ret_code, ret_note;
+ return;
+ end if;
+ end if;
+
+ -- create Ins/Upd/Del trigger if it does not exists already
+ perform 1 from pg_catalog.pg_trigger
+ where tgrelid = londiste.find_table_oid(_dest_table)
+ and tgname = lg_name;
+ if not found then
+
+ if _no_triggers then
+ select 200, 'Table added with no triggers: ' || _desc
+ into ret_code, ret_note;
+ return;
+ end if;
+
+ -- finalize event
+ lg_event := substr(lg_event, 4);
+ if lg_event = '' then
+ lg_event := 'insert or update or delete';
+ end if;
+
+ -- create trigger
+ sql := 'create trigger ' || quote_ident(lg_name)
+ || ' ' || lg_pos || ' ' || lg_event
+ || ' on ' || londiste.quote_fqname(_dest_table)
+ || ' for each row execute procedure '
+ || lg_func || '(' || lg_args || _extra_args || ')';
+ execute sql;
+ end if;
+
+ -- create truncate trigger if it does not exists already
+ show server_version_num into pgversion;
+ if pgversion >= 80400 then
+ trunctrg_name := '_londiste_' || i_queue_name || '_truncate';
+ perform 1 from pg_catalog.pg_trigger
+ where tgrelid = londiste.find_table_oid(_dest_table)
+ and tgname = trunctrg_name;
+ if not found then
+ sql := 'create trigger ' || quote_ident(trunctrg_name)
+ || ' after truncate on ' || londiste.quote_fqname(_dest_table)
+ || ' for each statement execute procedure pgq.sqltriga(' || quote_literal(i_queue_name)
+ || _extra_args || ')';
+ execute sql;
+ end if;
+ end if;
+
+ -- Check that no trigger exists on the target table that will get fired
+ -- before londiste one (this could have londiste replicate data
+ -- out-of-order
+ --
+ -- Don't report all the trigger names, 8.3 does not have array_accum
+ -- available
+
+ if pgversion >= 90000 then
+ select tg.tgname into logtrg_previous
+ from pg_class r join pg_trigger tg on (tg.tgrelid = r.oid)
+ where r.oid = londiste.find_table_oid(_dest_table)
+ and not tg.tgisinternal
+ and tg.tgname < lg_name::name
+ -- per-row AFTER trigger
+ and (tg.tgtype & 3) = 1 -- bits: 0:ROW, 1:BEFORE
+ -- current londiste
+ and not londiste.is_replica_func(tg.tgfoid)
+ -- old londiste
+ and substring(tg.tgname from 1 for 10) != '_londiste_'
+ and substring(tg.tgname from char_length(tg.tgname) - 6) != '_logger'
+ order by 1 limit 1;
+ else
+ select tg.tgname into logtrg_previous
+ from pg_class r join pg_trigger tg on (tg.tgrelid = r.oid)
+ where r.oid = londiste.find_table_oid(_dest_table)
+ and not tg.tgisconstraint
+ and tg.tgname < lg_name::name
+ -- per-row AFTER trigger
+ and (tg.tgtype & 3) = 1 -- bits: 0:ROW, 1:BEFORE
+ -- current londiste
+ and not londiste.is_replica_func(tg.tgfoid)
+ -- old londiste
+ and substring(tg.tgname from 1 for 10) != '_londiste_'
+ and substring(tg.tgname from char_length(tg.tgname) - 6) != '_logger'
+ order by 1 limit 1;
+ end if;
+
+ if logtrg_previous is not null then
+ select 301,
+ 'Table added: ' || _desc
+ || ', but londiste trigger is not first: '
+ || logtrg_previous
+ into ret_code, ret_note;
+ return;
+ end if;
+
+ select 200, 'Table added: ' || _desc into ret_code, ret_note;
+ return;
+end;
+$$ language plpgsql;
+
+create or replace function londiste.local_add_table(
+ in i_queue_name text,
+ in i_table_name text,
+ in i_trg_args text[],
+ in i_table_attrs text,
+ out ret_code int4,
+ out ret_note text)
+as $$
+-- ----------------------------------------------------------------------
+-- Function: londiste.local_add_table(4)
+--
+-- Register table on Londiste node.
+-- ----------------------------------------------------------------------
+begin
+ select f.ret_code, f.ret_note into ret_code, ret_note
+ from londiste.local_add_table(i_queue_name, i_table_name, i_trg_args, i_table_attrs, null) f;
+ return;
+end;
+$$ language plpgsql;
+
+create or replace function londiste.local_add_table(
+ in i_queue_name text,
+ in i_table_name text,
+ in i_trg_args text[],
+ out ret_code int4,
+ out ret_note text)
+as $$
+-- ----------------------------------------------------------------------
+-- Function: londiste.local_add_table(3)
+--
+-- Register table on Londiste node.
+-- ----------------------------------------------------------------------
+begin
+ select f.ret_code, f.ret_note into ret_code, ret_note
+ from londiste.local_add_table(i_queue_name, i_table_name, i_trg_args, null) f;
+ return;
+end;
+$$ language plpgsql;
+
+create or replace function londiste.local_add_table(
+ in i_queue_name text,
+ in i_table_name text,
+ out ret_code int4,
+ out ret_note text)
+as $$
+-- ----------------------------------------------------------------------
+-- Function: londiste.local_add_table(2)
+--
+-- Register table on Londiste node.
+-- ----------------------------------------------------------------------
+begin
+ select f.ret_code, f.ret_note into ret_code, ret_note
+ from londiste.local_add_table(i_queue_name, i_table_name, null) f;
+ return;
+end;
+$$ language plpgsql strict;
+
+
+
+
+
+create or replace function londiste.local_remove_seq(
+ in i_queue_name text, in i_seq_name text,
+ out ret_code int4, out ret_note text)
+as $$
+-- ----------------------------------------------------------------------
+-- Function: londiste.local_remove_seq(2)
+--
+-- Remove sequence.
+--
+-- Parameters:
+-- i_queue_name - set name
+-- i_seq_name - sequence name
+--
+-- Returns:
+-- 200 - OK
+-- 404 - Sequence not found
+-- ----------------------------------------------------------------------
+declare
+ fqname text;
+begin
+ fqname := londiste.make_fqname(i_seq_name);
+ if pgq_node.is_root_node(i_queue_name) then
+ select f.ret_code, f.ret_note
+ into ret_code, ret_note
+ from londiste.global_remove_seq(i_queue_name, fqname) f;
+ return;
+ end if;
+ update londiste.seq_info
+ set local = false
+ where queue_name = i_queue_name
+ and seq_name = fqname
+ and local;
+ if not found then
+ select 404, 'Sequence not found: '||fqname into ret_code, ret_note;
+ return;
+ end if;
+
+ select 200, 'Sequence removed: '||fqname into ret_code, ret_note;
+ return;
+end;
+$$ language plpgsql strict;
+
+
+
+
+create or replace function londiste.local_remove_table(
+ in i_queue_name text, in i_table_name text,
+ out ret_code int4, out ret_note text)
+as $$
+-- ----------------------------------------------------------------------
+-- Function: londiste.local_remove_table(2)
+--
+-- Remove table.
+--
+-- Parameters:
+-- i_queue_name - set name
+-- i_table_name - table name
+--
+-- Returns:
+-- 200 - OK
+-- 404 - Table not found
+-- ----------------------------------------------------------------------
+declare
+ fq_table_name text;
+ qtbl text;
+ seqname text;
+ tbl record;
+ tbl_oid oid;
+ pgver integer;
+begin
+ fq_table_name := londiste.make_fqname(i_table_name);
+ qtbl := londiste.quote_fqname(fq_table_name);
+ tbl_oid := londiste.find_table_oid(i_table_name);
+ show server_version_num into pgver;
+
+ select local, dropped_ddl, merge_state into tbl
+ from londiste.table_info
+ where queue_name = i_queue_name
+ and table_name = fq_table_name
+ for update;
+ if not found then
+ select 400, 'Table not found: ' || fq_table_name into ret_code, ret_note;
+ return;
+ end if;
+
+ if tbl.local then
+ perform londiste.drop_table_triggers(i_queue_name, fq_table_name);
+
+ -- restore dropped ddl
+ if tbl.dropped_ddl is not null then
+ -- table is not synced, drop data to make restore faster
+ if pgver >= 80400 then
+ execute 'TRUNCATE ONLY ' || qtbl;
+ else
+ execute 'TRUNCATE ' || qtbl;
+ end if;
+ execute tbl.dropped_ddl;
+ end if;
+
+ -- reset data
+ update londiste.table_info
+ set local = false,
+ custom_snapshot = null,
+ table_attrs = null,
+ dropped_ddl = null,
+ merge_state = null,
+ dest_table = null
+ where queue_name = i_queue_name
+ and table_name = fq_table_name;
+
+ -- drop dependent sequence
+ for seqname in
+ select n.nspname || '.' || s.relname
+ from pg_catalog.pg_class s,
+ pg_catalog.pg_namespace n,
+ pg_catalog.pg_attribute a
+ where a.attrelid = tbl_oid
+ and a.atthasdef
+ and a.atttypid::regtype::text in ('integer', 'bigint')
+ and s.oid = pg_get_serial_sequence(qtbl, a.attname)::regclass::oid
+ and n.oid = s.relnamespace
+ loop
+ perform londiste.local_remove_seq(i_queue_name, seqname);
+ end loop;
+ else
+ if not pgq_node.is_root_node(i_queue_name) then
+ select 400, 'Table not registered locally: ' || fq_table_name into ret_code, ret_note;
+ return;
+ end if;
+ end if;
+
+ if pgq_node.is_root_node(i_queue_name) then
+ perform londiste.global_remove_table(i_queue_name, fq_table_name);
+ perform londiste.root_notify_change(i_queue_name, 'londiste.remove-table', fq_table_name);
+ end if;
+
+ select 200, 'Table removed: ' || fq_table_name into ret_code, ret_note;
+ return;
+end;
+$$ language plpgsql strict;
+
+
+
+-- Group: Global object registrations (internal)
+
+
+create or replace function londiste.global_add_table(
+ in i_queue_name text,
+ in i_table_name text,
+ out ret_code int4,
+ out ret_note text)
+as $$
+-- ----------------------------------------------------------------------
+-- Function: londiste.global_add_table(2)
+--
+-- Register table on Londiste set.
+--
+-- This means its available from root, events for it appear
+-- in queue and nodes can attach to it.
+--
+-- Called by:
+-- on root - londiste.local_add_table()
+-- elsewhere - londiste consumer when receives new table event
+--
+-- Returns:
+-- 200 - Ok
+-- 400 - No such set
+-- ----------------------------------------------------------------------
+declare
+ fq_table_name text;
+ _cqueue text;
+begin
+ fq_table_name := londiste.make_fqname(i_table_name);
+
+ select combined_queue into _cqueue
+ from pgq_node.node_info
+ where queue_name = i_queue_name
+ for update;
+ if not found then
+ select 400, 'No such queue: ' || i_queue_name into ret_code, ret_note;
+ return;
+ end if;
+
+ perform 1 from londiste.table_info where queue_name = i_queue_name and table_name = fq_table_name;
+ if found then
+ select 200, 'Table already added: ' || fq_table_name into ret_code, ret_note;
+ return;
+ end if;
+
+ insert into londiste.table_info (queue_name, table_name)
+ values (i_queue_name, fq_table_name);
+ select 200, 'Table added: ' || i_table_name
+ into ret_code, ret_note;
+
+ -- let the combined node know about it too
+ if _cqueue is not null then
+ perform londiste.global_add_table(_cqueue, i_table_name);
+ end if;
+
+ return;
+exception
+ -- seems the row was added from parallel connection (setup vs. replay)
+ when unique_violation then
+ select 200, 'Table already added: ' || i_table_name
+ into ret_code, ret_note;
+ return;
+end;
+$$ language plpgsql strict;
+
+
+
+