Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make move_chunk use AN txns on DN #3372

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
47 changes: 37 additions & 10 deletions tsl/src/chunk_copy.c
Expand Up @@ -400,7 +400,7 @@ chunk_copy_stage_create_publication(ChunkCopy *cc)
NameStr(cc->chunk->fd.table_name)));

/* Create the publication in autocommit mode */
ts_dist_cmd_run_on_data_nodes(cmd, list_make1(NameStr(cc->fd.source_node_name)), false);
ts_dist_cmd_run_on_data_nodes(cmd, list_make1(NameStr(cc->fd.source_node_name)), true);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I never remember what booleans like this mean, and I had to look it up in the code. Just a mental note that it is often better to use descriptive enums, e.g., CMD_TRANSACTIONAL. Obviously not for this PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed.

}

static void
Expand All @@ -415,7 +415,7 @@ chunk_copy_stage_create_replication_slot(ChunkCopy *cc)
cmd = psprintf("SELECT pg_create_logical_replication_slot('%s', 'pgoutput')",
NameStr(cc->fd.operation_id));

ts_dist_cmd_run_on_data_nodes(cmd, list_make1(NameStr(cc->fd.source_node_name)), false);
ts_dist_cmd_run_on_data_nodes(cmd, list_make1(NameStr(cc->fd.source_node_name)), true);
}

static void
Expand All @@ -432,7 +432,7 @@ chunk_copy_stage_create_subscription(ChunkCopy *cc)
NameStr(cc->fd.operation_id),
connection_string,
NameStr(cc->fd.operation_id));
ts_dist_cmd_run_on_data_nodes(cmd, list_make1(NameStr(cc->fd.dest_node_name)), false);
ts_dist_cmd_run_on_data_nodes(cmd, list_make1(NameStr(cc->fd.dest_node_name)), true);
}

static void
Expand All @@ -442,29 +442,52 @@ chunk_copy_stage_sync_start(ChunkCopy *cc)

/* Start data transfer on the destination node */
cmd = psprintf("ALTER SUBSCRIPTION %s ENABLE", NameStr(cc->fd.operation_id));
ts_dist_cmd_run_on_data_nodes(cmd, list_make1(NameStr(cc->fd.dest_node_name)), false);
ts_dist_cmd_run_on_data_nodes(cmd, list_make1(NameStr(cc->fd.dest_node_name)), true);
}

static void
chunk_copy_stage_sync(ChunkCopy *cc)
{
const char *cmd;
char *cmd;

/*
* Transaction blocks run in REPEATABLE READ mode in the connection pool.
* However this wait_subscription_sync procedure needs to refresh the subscription
* sync status data and hence needs a READ COMMITTED transaction isolation
* level for that.
*/
cmd = psprintf("SET transaction_isolation TO 'READ COMMITTED'");
ts_dist_cmd_run_on_data_nodes(cmd, list_make1(NameStr(cc->fd.dest_node_name)), true);
pfree(cmd);

/* Wait until data transfer finishes in its own transaction */
cmd = psprintf("CALL _timescaledb_internal.wait_subscription_sync(%s, %s)",
quote_literal_cstr(NameStr(cc->chunk->fd.schema_name)),
quote_literal_cstr(NameStr(cc->chunk->fd.table_name)));

ts_dist_cmd_run_on_data_nodes(cmd, list_make1(NameStr(cc->fd.dest_node_name)), false);
ts_dist_cmd_run_on_data_nodes(cmd, list_make1(NameStr(cc->fd.dest_node_name)), true);
pfree(cmd);
}

static void
chunk_copy_stage_drop_subscription(ChunkCopy *cc)
{
const char *cmd;
char *cmd;

/* Stop data transfer on the destination node */
cmd = psprintf("ALTER SUBSCRIPTION %s DISABLE", NameStr(cc->fd.operation_id));
Copy link
Contributor

@pmwkaa pmwkaa Jun 29, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might make sense to separate all of those steps into two or three separate stages, since we have a separate stage to create the replication slot.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We created these additional substeps just to allow things to work in a transaction block. Semantically it's all about dropping the subscription.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but from the point of view of the healing function, which way is more convenient to operate?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not an issue. The cleanup function will be exactly the same as this one, with these 3 steps as part of a transaction block.

ts_dist_cmd_run_on_data_nodes(cmd, list_make1(NameStr(cc->fd.dest_node_name)), true);
pfree(cmd);

/* Disassociate the subscription from the replication slot first */
cmd = psprintf("ALTER SUBSCRIPTION %s SET (slot_name = NONE)", NameStr(cc->fd.operation_id));
ts_dist_cmd_run_on_data_nodes(cmd, list_make1(NameStr(cc->fd.dest_node_name)), true);
pfree(cmd);

/* Drop the subscription now */
cmd = psprintf("DROP SUBSCRIPTION %s", NameStr(cc->fd.operation_id));
ts_dist_cmd_run_on_data_nodes(cmd, list_make1(NameStr(cc->fd.dest_node_name)), false);
ts_dist_cmd_run_on_data_nodes(cmd, list_make1(NameStr(cc->fd.dest_node_name)), true);
pfree(cmd);
}

static void
Expand All @@ -473,7 +496,7 @@ chunk_copy_stage_drop_publication(ChunkCopy *cc)
const char *cmd;

cmd = psprintf("DROP PUBLICATION %s", NameStr(cc->fd.operation_id));
ts_dist_cmd_run_on_data_nodes(cmd, list_make1(NameStr(cc->fd.source_node_name)), false);
ts_dist_cmd_run_on_data_nodes(cmd, list_make1(NameStr(cc->fd.source_node_name)), true);
}

static void
Expand Down Expand Up @@ -556,7 +579,11 @@ chunk_copy_execute(ChunkCopy *cc)
{
const ChunkCopyStage *stage;

/* Execute each copy stage in a separate transaction */
/*
* Execute each copy stage in a separate transaction. The below will employ
* 2PC by default. This can be later optimized to use 1PC since only one
* datanode is involved in most of the stages.
*/
for (stage = &chunk_copy_stages[0]; stage->name != NULL; stage++)
{
StartTransactionCommand();
Expand Down