Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement cleanup for chunk copy/move #3396

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 9 additions & 0 deletions sql/ddl_experimental.sql
Expand Up @@ -37,3 +37,12 @@ CREATE OR REPLACE PROCEDURE timescaledb_experimental.copy_chunk(
source_node NAME = NULL,
destination_node NAME = NULL)
AS '@MODULE_PATHNAME@', 'ts_copy_chunk_proc' LANGUAGE C;

-- A copy_chunk or move_chunk procedure call involves multiple nodes and
-- depending on the data size can take a long time. Failures are possible
-- when this long running activity is ongoing. We need to be able to recover
-- and cleanup such failed chunk copy/move activities and it's done via this
-- procedure
CREATE OR REPLACE PROCEDURE timescaledb_experimental.cleanup_copy_chunk_operation(
operation_id NAME)
AS '@MODULE_PATHNAME@', 'ts_copy_chunk_cleanup_proc' LANGUAGE C;
9 changes: 3 additions & 6 deletions sql/pre_install/tables.sql
Expand Up @@ -385,11 +385,10 @@ SELECT pg_catalog.pg_extension_config_dump('_timescaledb_catalog.remote_txn', ''
-- carry over chunk copy/move operations from earlier (if it makes sense at all)
--

CREATE SEQUENCE IF NOT EXISTS _timescaledb_catalog.chunk_copy_activity_id_seq MINVALUE 1;
CREATE SEQUENCE IF NOT EXISTS _timescaledb_catalog.chunk_copy_operation_id_seq MINVALUE 1;

CREATE TABLE IF NOT EXISTS _timescaledb_catalog.chunk_copy_activity (
id integer PRIMARY KEY DEFAULT nextval('_timescaledb_catalog.chunk_copy_activity_id_seq'),
operation_id name NOT NULL UNIQUE, -- the publisher/subscriber identifier used
CREATE TABLE IF NOT EXISTS _timescaledb_catalog.chunk_copy_operation (
operation_id name PRIMARY KEY, -- the publisher/subscriber identifier used
backend_pid integer NOT NULL, -- the pid of the backend running this activity
completed_stage name NOT NULL, -- the completed stage/step
time_start timestamptz NOT NULL DEFAULT NOW(), -- start time of the activity
Expand All @@ -399,8 +398,6 @@ CREATE TABLE IF NOT EXISTS _timescaledb_catalog.chunk_copy_activity (
delete_on_source_node bool NOT NULL -- is a move or copy activity
);

ALTER SEQUENCE _timescaledb_catalog.chunk_copy_activity_id_seq OWNED BY _timescaledb_catalog.chunk_copy_activity.id;

-- Set table permissions
-- We need to grant SELECT to PUBLIC for all tables even those not
-- marked as being dumped because pg_dump will try to access all
Expand Down
20 changes: 5 additions & 15 deletions sql/updates/latest-dev.sql
Expand Up @@ -4,18 +4,10 @@ DROP FUNCTION IF EXISTS _timescaledb_internal.block_new_chunks;
DROP FUNCTION IF EXISTS _timescaledb_internal.allow_new_chunks;
DROP FUNCTION IF EXISTS _timescaledb_internal.create_chunk;

-- Use the experimental schema for ths new procedure
CREATE OR REPLACE PROCEDURE timescaledb_experimental.move_chunk(
chunk REGCLASS,
source_node NAME = NULL,
destination_node NAME = NULL)
AS '@MODULE_PATHNAME@', 'ts_move_chunk_proc' LANGUAGE C;
CREATE SEQUENCE IF NOT EXISTS _timescaledb_catalog.chunk_copy_operation_id_seq MINVALUE 1;

CREATE SEQUENCE IF NOT EXISTS _timescaledb_catalog.chunk_copy_activity_id_seq MINVALUE 1;

CREATE TABLE IF NOT EXISTS _timescaledb_catalog.chunk_copy_activity (
id integer PRIMARY KEY DEFAULT nextval('_timescaledb_catalog.chunk_copy_activity_id_seq'),
operation_id name NOT NULL UNIQUE, -- the publisher/subscriber identifier used
CREATE TABLE IF NOT EXISTS _timescaledb_catalog.chunk_copy_operation (
operation_id name PRIMARY KEY, -- the publisher/subscriber identifier used
backend_pid integer NOT NULL, -- the pid of the backend running this activity
completed_stage name NOT NULL, -- the completed stage/step
time_start timestamptz NOT NULL DEFAULT NOW(), -- start time of the activity
Expand All @@ -25,7 +17,5 @@ CREATE TABLE IF NOT EXISTS _timescaledb_catalog.chunk_copy_activity (
delete_on_source_node bool NOT NULL -- is a move or copy activity
);

ALTER SEQUENCE _timescaledb_catalog.chunk_copy_activity_id_seq OWNED BY _timescaledb_catalog.chunk_copy_activity.id;

GRANT SELECT ON _timescaledb_catalog.chunk_copy_activity_id_seq TO PUBLIC;
GRANT SELECT ON _timescaledb_catalog.chunk_copy_activity TO PUBLIC;
GRANT SELECT ON _timescaledb_catalog.chunk_copy_operation_id_seq TO PUBLIC;
GRANT SELECT ON _timescaledb_catalog.chunk_copy_operation TO PUBLIC;
5 changes: 3 additions & 2 deletions sql/updates/reverse-dev.sql
Expand Up @@ -8,8 +8,9 @@ DROP FUNCTION IF EXISTS _timescaledb_internal.create_chunk;
DROP PROCEDURE IF EXISTS _timescaledb_internal.wait_subscription_sync;
DROP PROCEDURE IF EXISTS timescaledb_experimental.move_chunk;
DROP PROCEDURE IF EXISTS timescaledb_experimental.copy_chunk;
DROP TABLE IF EXISTS _timescaledb_catalog.chunk_copy_activity;
DROP SEQUENCE IF EXISTS _timescaledb_catalog.chunk_copy_activity_id_seq;
DROP PROCEDURE IF EXISTS timescaledb_experimental.cleanup_copy_chunk_operation;
DROP TABLE IF EXISTS _timescaledb_catalog.chunk_copy_operation;
DROP SEQUENCE IF EXISTS _timescaledb_catalog.chunk_copy_operation_id_seq;
DROP VIEW IF EXISTS timescaledb_experimental.chunk_replication_status;
DROP SCHEMA IF EXISTS timescaledb_experimental CASCADE;

Expand Down
12 changes: 6 additions & 6 deletions src/catalog.c
Expand Up @@ -103,9 +103,9 @@ static const TableInfoDef catalog_table_names[_MAX_CATALOG_TABLES + 1] = {
.schema_name = CATALOG_SCHEMA_NAME,
.table_name = REMOTE_TXN_TABLE_NAME,
},
[CHUNK_COPY_ACTIVITY] = {
[CHUNK_COPY_OPERATION] = {
.schema_name = CATALOG_SCHEMA_NAME,
.table_name = CHUNK_COPY_ACTIVITY_TABLE_NAME,
.table_name = CHUNK_COPY_OPERATION_TABLE_NAME,
},
[_MAX_CATALOG_TABLES] = {
.schema_name = "invalid schema",
Expand Down Expand Up @@ -250,10 +250,10 @@ static const TableIndexDef catalog_table_index_definitions[_MAX_CATALOG_TABLES]
[REMOTE_TXN_DATA_NODE_NAME_IDX] = "remote_txn_data_node_name_idx"
}
},
[CHUNK_COPY_ACTIVITY] = {
.length = _MAX_CHUNK_COPY_ACTIVITY_INDEX,
[CHUNK_COPY_OPERATION] = {
.length = _MAX_CHUNK_COPY_OPERATION_INDEX,
.names = (char *[]) {
[CHUNK_COPY_ACTIVITY_PKEY_IDX] = "chunk_copy_activity_pkey",
[CHUNK_COPY_OPERATION_PKEY_IDX] = "chunk_copy_operation_pkey",
},
}
};
Expand All @@ -276,7 +276,7 @@ static const char *catalog_table_serial_id_names[_MAX_CATALOG_TABLES] = {
[HYPERTABLE_COMPRESSION] = NULL,
[COMPRESSION_CHUNK_SIZE] = NULL,
[REMOTE_TXN] = NULL,
[CHUNK_COPY_ACTIVITY] = CATALOG_SCHEMA_NAME ".chunk_copy_activity_id_seq",
[CHUNK_COPY_OPERATION] = CATALOG_SCHEMA_NAME ".chunk_copy_operation_id_seq",
};

typedef struct InternalFunctionDef
Expand Down
43 changes: 20 additions & 23 deletions src/catalog.h
Expand Up @@ -53,7 +53,7 @@ typedef enum CatalogTable
HYPERTABLE_COMPRESSION,
COMPRESSION_CHUNK_SIZE,
REMOTE_TXN,
CHUNK_COPY_ACTIVITY,
CHUNK_COPY_OPERATION,
_MAX_CATALOG_TABLES,
} CatalogTable;

Expand Down Expand Up @@ -1189,27 +1189,25 @@ enum Anum_remote_data_node_name_idx
*
********************************************/

#define CHUNK_COPY_ACTIVITY_TABLE_NAME "chunk_copy_activity"
#define CHUNK_COPY_OPERATION_TABLE_NAME "chunk_copy_operation"

enum Anum_chunk_copy_activity
enum Anum_chunk_copy_operation
{
Anum_chunk_copy_activity_id = 1,
Anum_chunk_copy_activity_operation_id,
Anum_chunk_copy_activity_backend_pid,
Anum_chunk_copy_activity_completed_stage,
Anum_chunk_copy_activity_time_start,
Anum_chunk_copy_activity_chunk_id,
Anum_chunk_copy_activity_source_node_name,
Anum_chunk_copy_activity_dest_node_name,
Anum_chunk_copy_activity_delete_on_src_node,
_Anum_chunk_copy_activity_max,
Anum_chunk_copy_operation_operation_id = 1,
Anum_chunk_copy_operation_backend_pid,
Anum_chunk_copy_operation_completed_stage,
Anum_chunk_copy_operation_time_start,
Anum_chunk_copy_operation_chunk_id,
Anum_chunk_copy_operation_source_node_name,
Anum_chunk_copy_operation_dest_node_name,
Anum_chunk_copy_operation_delete_on_src_node,
_Anum_chunk_copy_operation_max,
};

#define Natts_chunk_copy_activity (_Anum_chunk_copy_activity_max - 1)
#define Natts_chunk_copy_operation (_Anum_chunk_copy_operation_max - 1)

typedef struct FormData_chunk_copy_activity
typedef struct FormData_chunk_copy_operation
{
int32 id;
NameData operation_id;
int32 backend_pid;
NameData completed_stage;
Expand All @@ -1218,20 +1216,19 @@ typedef struct FormData_chunk_copy_activity
NameData source_node_name;
NameData dest_node_name;
bool delete_on_src_node;
} FormData_chunk_copy_activity;
} FormData_chunk_copy_operation;

enum
{
CHUNK_COPY_ACTIVITY_PKEY_IDX = 0,
_MAX_CHUNK_COPY_ACTIVITY_INDEX,
CHUNK_COPY_OPERATION_PKEY_IDX = 0,
_MAX_CHUNK_COPY_OPERATION_INDEX,
};

enum Anum_chunk_copy_activity_pkey_idx
enum Anum_chunk_copy_operation_pkey_idx
{
Anum_chunk_copy_activity_pkey_idx_id = 1,
_Anum_chunk_copy_activity_pkey_idx_max,
Anum_chunk_copy_operation_idx_operation_id = 1,
_Anum_chunk_copy_operation_pkey_idx_max,
};
#define Natts_chunk_copy_activity_pkey_idx (_Anum_chunk_copy_activity_pkey_idx_max - 1)

typedef enum CacheType
{
Expand Down
2 changes: 2 additions & 0 deletions src/cross_module_fn.c
Expand Up @@ -44,6 +44,7 @@ CROSSMODULE_WRAPPER(reorder_chunk);
CROSSMODULE_WRAPPER(move_chunk);
CROSSMODULE_WRAPPER(move_chunk_proc);
CROSSMODULE_WRAPPER(copy_chunk_proc);
CROSSMODULE_WRAPPER(copy_chunk_cleanup_proc);

/* partialize/finalize aggregate */
CROSSMODULE_WRAPPER(partialize_agg);
Expand Down Expand Up @@ -336,6 +337,7 @@ TSDLLEXPORT CrossModuleFunctions ts_cm_functions_default = {
.move_chunk = error_no_default_fn_pg_community,
.move_chunk_proc = error_no_default_fn_pg_community,
.copy_chunk_proc = error_no_default_fn_pg_community,
.copy_chunk_cleanup_proc = error_no_default_fn_pg_community,
.reorder_chunk = error_no_default_fn_pg_community,

.partialize_agg = error_no_default_fn_pg_community,
Expand Down
1 change: 1 addition & 0 deletions src/cross_module_fn.h
Expand Up @@ -81,6 +81,7 @@ typedef struct CrossModuleFunctions
PGFunction move_chunk;
PGFunction move_chunk_proc;
PGFunction copy_chunk_proc;
PGFunction copy_chunk_cleanup_proc;
void (*ddl_command_start)(ProcessUtilityArgs *args);
void (*ddl_command_end)(EventTriggerData *command);
void (*sql_drop)(List *dropped_objects);
Expand Down
7 changes: 5 additions & 2 deletions src/utils.c
Expand Up @@ -525,8 +525,11 @@ ts_create_struct_from_tuple(HeapTuple tuple, MemoryContext mctx, size_t alloc_si
{
void *struct_ptr = MemoryContextAllocZero(mctx, alloc_size);

/* Make sure the function is not used when the tuple contains NULLs */
Assert(copy_size == tuple->t_len - tuple->t_data->t_hoff);
/*
* Make sure the function is not used when the tuple contains NULLs.
* Also compare the aligned sizes in the assert.
*/
Assert(copy_size == MAXALIGN(tuple->t_len - tuple->t_data->t_hoff));
memcpy(struct_ptr, GETSTRUCT(tuple), copy_size);

return struct_ptr;
Expand Down
4 changes: 2 additions & 2 deletions src/utils.h
Expand Up @@ -82,8 +82,8 @@ typedef struct Dimension Dimension;

extern TSDLLEXPORT Oid ts_get_integer_now_func(const Dimension *open_dim);

extern void *ts_create_struct_from_slot(TupleTableSlot *slot, MemoryContext mctx, size_t alloc_size,
size_t copy_size);
extern TSDLLEXPORT void *ts_create_struct_from_slot(TupleTableSlot *slot, MemoryContext mctx,
size_t alloc_size, size_t copy_size);

extern TSDLLEXPORT AppendRelInfo *ts_get_appendrelinfo(PlannerInfo *root, Index rti,
bool missing_ok);
Expand Down
2 changes: 1 addition & 1 deletion test/expected/drop_rename_hypertable.out
Expand Up @@ -196,7 +196,7 @@ SELECT * FROM _timescaledb_catalog.hypertable;
----------------------+--------------------------------------------------+-------+------------
_timescaledb_catalog | chunk | table | super_user
_timescaledb_catalog | chunk_constraint | table | super_user
_timescaledb_catalog | chunk_copy_activity | table | super_user
_timescaledb_catalog | chunk_copy_operation | table | super_user
_timescaledb_catalog | chunk_data_node | table | super_user
_timescaledb_catalog | chunk_index | table | super_user
_timescaledb_catalog | compression_algorithm | table | super_user
Expand Down
4 changes: 2 additions & 2 deletions test/expected/pg_dump.out
Expand Up @@ -556,8 +556,8 @@ WHERE refclassid = 'pg_catalog.pg_extension'::pg_catalog.regclass AND
timescaledb_information.hypertables
_timescaledb_internal.compressed_chunk_stats
_timescaledb_internal.hypertable_chunk_local_size
_timescaledb_catalog.chunk_copy_activity
_timescaledb_catalog.chunk_copy_activity_id_seq
_timescaledb_catalog.chunk_copy_operation
_timescaledb_catalog.chunk_copy_operation_id_seq
_timescaledb_catalog.compression_algorithm
_timescaledb_internal.bgw_policy_chunk_stats
_timescaledb_internal.bgw_job_stat
Expand Down
2 changes: 1 addition & 1 deletion tsl/src/chunk_api.c
Expand Up @@ -1709,7 +1709,7 @@ chunk_api_call_create_empty_chunk_table(const Hypertable *ht, const Chunk *chunk
ts_dist_cmd_params_invoke_on_data_nodes(create_cmd,
stmt_params_create_from_values(params, 4),
list_make1((void *) node_name),
false));
true));
}

void
Expand Down