Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ REGRESS = preseed infofuncs init_fail init preseed_check basic conflict_secondar
excluded_schema conflict_stat \
toasted replication_set exception_row_capture matview bidirectional primary_key \
interfaces foreign_key copy sequence triggers parallel functions row_filter \
row_filter_sampling att_list column_filter apply_delay \
row_filter_sampling att_list column_filter apply_delay alter_options \
extended node_origin_cascade multiple_upstreams tuple_origin autoddl \
sync_event sync_table generated_columns spill_transaction read_only drop

Expand Down
8 changes: 8 additions & 0 deletions sql/spock--5.0.6--6.0.0-devel.sql
Original file line number Diff line number Diff line change
Expand Up @@ -414,3 +414,11 @@ BEGIN
CALL spock.wait_for_sync_event(result, origin_id, lsn, timeout, wait_if_disabled);
END;
$$ LANGUAGE plpgsql;

CREATE FUNCTION spock.sub_alter_options(
subscription_name name,
options jsonb
)
RETURNS boolean
AS 'MODULE_PATHNAME', 'spock_alter_subscription_options'
LANGUAGE C STRICT VOLATILE;
8 changes: 8 additions & 0 deletions sql/spock--6.0.0-devel.sql
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,14 @@ RETURNS boolean STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'spock_alter_su
CREATE FUNCTION spock.sub_alter_skiplsn(subscription_name name, lsn pg_lsn)
RETURNS boolean STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'spock_alter_subscription_skip_lsn';

CREATE FUNCTION spock.sub_alter_options(
subscription_name name,
options jsonb
)
RETURNS boolean
AS 'MODULE_PATHNAME', 'spock_alter_subscription_options'
LANGUAGE C STRICT VOLATILE;

CREATE FUNCTION spock.sub_show_status(
subscription_name name DEFAULT NULL,
OUT subscription_name text,
Expand Down
122 changes: 122 additions & 0 deletions src/spock_functions.c
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
#include "utils/fmgroids.h"
#include "utils/inval.h"
#include "utils/json.h"
#include "utils/jsonb.h"
#include "utils/guc.h"
#if PG_VERSION_NUM >= 160000
#include "utils/guc_hooks.h"
Expand Down Expand Up @@ -122,6 +123,7 @@ PG_FUNCTION_INFO_V1(spock_alter_subscription_enable);
PG_FUNCTION_INFO_V1(spock_alter_subscription_add_replication_set);
PG_FUNCTION_INFO_V1(spock_alter_subscription_remove_replication_set);
PG_FUNCTION_INFO_V1(spock_alter_subscription_skip_lsn);
PG_FUNCTION_INFO_V1(spock_alter_subscription_options);

PG_FUNCTION_INFO_V1(spock_alter_subscription_synchronize);
PG_FUNCTION_INFO_V1(spock_alter_subscription_resynchronize_table);
Expand Down Expand Up @@ -924,6 +926,126 @@ spock_alter_subscription_skip_lsn(PG_FUNCTION_ARGS)
PG_RETURN_BOOL(true);
}

/*
* Update multiple subscription options at once mentioned at the input JSONB.
*
* Recognised keys: "forward_origins", "apply_delay", and "skip_schema".
*
* Any unrecognised key or wrong JSON type raises an ERROR immediately.
*
* Note: options that affect the replication stream (forward_origins,
* apply_delay) take effect only after the apply worker reconnects to the
* publisher.
*/
Datum
spock_alter_subscription_options(PG_FUNCTION_ARGS)
{
char *sub_name = NameStr(*PG_GETARG_NAME(0));
Jsonb *options = PG_GETARG_JSONB_P(1);
SpockSubscription *sub = get_subscription_by_name(sub_name, false);
JsonbIterator *it;
JsonbIteratorToken r;
JsonbValue v;
bool changed = false;

/* XXX: Only used for locking purposes. */
(void) get_local_node(true, false);

it = JsonbIteratorInit(&options->root);

r = JsonbIteratorNext(&it, &v, false);
if (r != WJB_BEGIN_OBJECT)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("subscription options must be a JSON object")));

while ((r = JsonbIteratorNext(&it, &v, false)) != WJB_END_OBJECT)
{
char *key;

if (r != WJB_KEY)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("unexpected token while parsing subscription options")));

key = pnstrdup(v.val.string.val, v.val.string.len);

if (strcmp(key, "forward_origins") == 0 ||
strcmp(key, "skip_schema") == 0)
{
List *result = NIL;

r = JsonbIteratorNext(&it, &v, false);
if (r != WJB_BEGIN_ARRAY)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("option \"%s\" must be a JSON array of strings",
key)));

while ((r = JsonbIteratorNext(&it, &v, false)) != WJB_END_ARRAY)
{
char *elem;

if (r != WJB_ELEM || v.type != jbvString)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("option \"%s\" must contain only strings",
key)));

elem = pnstrdup(v.val.string.val, v.val.string.len);

/* forward_origins only accepts the sentinel value "all" */
if (strcmp(key, "forward_origins") == 0 &&
strcmp(elem, "all") != 0)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("invalid value \"%s\" for option \"forward_origins\"",
elem),
errhint("The only supported value is \"all\".")));

result = lappend(result, elem);
}

if (strcmp(key, "forward_origins") == 0)
sub->forward_origins = result;
else
sub->skip_schema = result;
changed = true;
}
else if (strcmp(key, "apply_delay") == 0)
{
char *delay_str;

r = JsonbIteratorNext(&it, &v, false);
if (r != WJB_VALUE || v.type != jbvString)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("option \"apply_delay\" must be a string")));

delay_str = pnstrdup(v.val.string.val, v.val.string.len);
sub->apply_delay = DatumGetIntervalP(
DirectFunctionCall3(interval_in,
CStringGetDatum(delay_str),
ObjectIdGetDatum(InvalidOid),
Int32GetDatum(-1)));
changed = true;
}
else
{
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("unrecognized subscription option \"%s\"", key),
errhint("Valid options are: \"forward_origins\", "
"\"apply_delay\", \"skip_schema\".")));
}
}

if (changed)
alter_subscription(sub);

PG_RETURN_BOOL(changed);
}

/*
* Synchronize all the missing tables.
*/
Expand Down
Loading
Loading