Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Copy chunk merge PR #3446

Merged
merged 17 commits into from Jul 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions appveyor.yml
Expand Up @@ -125,6 +125,8 @@ build_script:

Add-Content "C:\Program Files\postgresql\12\data\postgresql.conf" "timescaledb.license = 'apache'"

Add-Content "C:\Program Files\postgresql\12\data\postgresql.conf" "wal_level = 'logical'"

# Add-Content "C:\Program Files\postgresql\12\data\postgresql.conf" "log_min_messages='debug5'"

# build timescale
Expand Down
1 change: 1 addition & 0 deletions cmake/ScriptFiles.cmake
Expand Up @@ -54,6 +54,7 @@ set(SOURCE_FILES
metadata.sql
dist_internal.sql
views.sql
views_experimental.sql
gapfill.sql
maintenance_utils.sql
partialize_finalize.sql
Expand Down
19 changes: 16 additions & 3 deletions sql/chunk.sql
Expand Up @@ -40,12 +40,18 @@ CREATE OR REPLACE FUNCTION _timescaledb_internal.show_chunk(chunk REGCLASS)
RETURNS TABLE(chunk_id INTEGER, hypertable_id INTEGER, schema_name NAME, table_name NAME, relkind "char", slices JSONB)
AS '@MODULE_PATHNAME@', 'ts_chunk_show' LANGUAGE C VOLATILE;

-- Create a chunk with the given dimensional constraints (slices) as
-- given in the JSONB. If chunk_table is a valid relation, it will be
-- attached to the hypertable and used as the data table for the new
-- chunk. Note that schema_name and table_name need not be the same as
-- the existing schema and name for chunk_table. The provided chunk
-- table will be renamed and/or moved as necessary.
--
-- Returns the created (or pre-existing) chunk's metadata; the "created"
-- column reports whether a new chunk was actually created by this call.
CREATE OR REPLACE FUNCTION _timescaledb_internal.create_chunk(
    hypertable REGCLASS,
    slices JSONB,
    schema_name NAME = NULL,
    table_name NAME = NULL,
    chunk_table REGCLASS = NULL)
RETURNS TABLE(chunk_id INTEGER, hypertable_id INTEGER, schema_name NAME, table_name NAME, relkind "char", slices JSONB, created BOOLEAN)
AS '@MODULE_PATHNAME@', 'ts_chunk_create' LANGUAGE C VOLATILE;

Expand All @@ -63,3 +69,10 @@ RETURNS TABLE(chunk_id INTEGER, hypertable_id INTEGER, att_num INTEGER, nullfrac
slot1numbers FLOAT4[], slot2numbers FLOAT4[], slot3numbers FLOAT4[], slot4numbers FLOAT4[], slot5numbers FLOAT4[],
slotvaluetypetrings CSTRING[], slot1values CSTRING[], slot2values CSTRING[], slot3values CSTRING[], slot4values CSTRING[], slot5values CSTRING[])
AS '@MODULE_PATHNAME@', 'ts_chunk_get_colstats' LANGUAGE C VOLATILE;

-- Create an empty table for a chunk with the given dimensional
-- constraints (slices) on the given hypertable, using the given schema
-- and table name. Backed by the C function ts_chunk_create_empty_table.
-- NOTE(review): name suggests only the data table is created, without
-- registering chunk metadata — confirm against the C implementation.
CREATE OR REPLACE FUNCTION _timescaledb_internal.create_chunk_table(
    hypertable REGCLASS,
    slices JSONB,
    schema_name NAME,
    table_name NAME)
RETURNS BOOL AS '@MODULE_PATHNAME@', 'ts_chunk_create_empty_table' LANGUAGE C VOLATILE;
21 changes: 21 additions & 0 deletions sql/ddl_experimental.sql
Expand Up @@ -25,3 +25,24 @@ CREATE OR REPLACE FUNCTION timescaledb_experimental.refresh_continuous_aggregate
continuous_aggregate REGCLASS,
hypertable_chunk REGCLASS
) RETURNS VOID AS '@MODULE_PATHNAME@', 'ts_continuous_agg_refresh_chunk' LANGUAGE C VOLATILE;

-- Move a chunk from the source data node to the destination data node
-- (experimental). A move is a copy that also removes the chunk from the
-- source node on success (see the delete_on_source_node flag in the
-- chunk_copy_operation catalog table).
CREATE OR REPLACE PROCEDURE timescaledb_experimental.move_chunk(
    chunk REGCLASS,
    source_node NAME = NULL,
    destination_node NAME = NULL)
AS '@MODULE_PATHNAME@', 'ts_move_chunk_proc' LANGUAGE C;

-- Copy a chunk from the source data node to the destination data node
-- (experimental). Unlike move_chunk, the source replica is kept.
CREATE OR REPLACE PROCEDURE timescaledb_experimental.copy_chunk(
    chunk REGCLASS,
    source_node NAME = NULL,
    destination_node NAME = NULL)
AS '@MODULE_PATHNAME@', 'ts_copy_chunk_proc' LANGUAGE C;

-- A copy_chunk or move_chunk procedure call involves multiple nodes and,
-- depending on the data size, can take a long time. Failures are possible
-- while this long-running activity is ongoing. This procedure recovers
-- from and cleans up such a failed chunk copy/move activity, identified
-- by its operation_id (the primary key of the chunk_copy_operation
-- catalog table).
CREATE OR REPLACE PROCEDURE timescaledb_experimental.cleanup_copy_chunk_operation(
    operation_id NAME)
AS '@MODULE_PATHNAME@', 'ts_copy_chunk_cleanup_proc' LANGUAGE C;
42 changes: 42 additions & 0 deletions sql/ddl_internal.sql
Expand Up @@ -8,3 +8,45 @@ AS '@MODULE_PATHNAME@', 'ts_chunk_index_clone' LANGUAGE C VOLATILE STRICT;

-- Replace an existing chunk index with a new one, given both index OIDs.
-- NOTE(review): presumably swaps the catalog association of the chunk
-- index — confirm in the C implementation (ts_chunk_index_replace).
CREATE OR REPLACE FUNCTION _timescaledb_internal.chunk_index_replace(chunk_index_oid_old OID, chunk_index_oid_new OID) RETURNS VOID
AS '@MODULE_PATHNAME@', 'ts_chunk_index_replace' LANGUAGE C VOLATILE STRICT;

-- Create a table on the given data node to hold a replica of the given
-- chunk (used during chunk copy/move between data nodes).
CREATE OR REPLACE FUNCTION _timescaledb_internal.create_chunk_replica_table(
    chunk REGCLASS,
    data_node_name NAME
) RETURNS VOID AS '@MODULE_PATHNAME@', 'ts_chunk_create_replica_table' LANGUAGE C VOLATILE;

-- Drop the specified chunk's replica on the specified data node,
-- leaving replicas on other nodes intact.
CREATE OR REPLACE FUNCTION _timescaledb_internal.chunk_drop_replica(
    chunk REGCLASS,
    node_name NAME
) RETURNS VOID
AS '@MODULE_PATHNAME@', 'ts_chunk_drop_replica' LANGUAGE C VOLATILE;

-- Block until the logical-replication subscription for the given table
-- reaches the "ready" state (pg_subscription_rel.srsubstate = 'r'), or
-- raise an error after retry_count polling attempts.
--
-- NOTE(review): retry_delay_ms is passed directly to pg_sleep(), which
-- takes seconds, so the default 0.200 sleeps 200 ms even though the
-- parameter name suggests milliseconds — confirm the intended unit.
CREATE OR REPLACE PROCEDURE _timescaledb_internal.wait_subscription_sync(
    schema_name NAME,
    table_name NAME,
    retry_count INT DEFAULT 18000,
    retry_delay_ms NUMERIC DEFAULT 0.200
)
LANGUAGE PLPGSQL AS
$BODY$
DECLARE
    in_sync BOOLEAN;
BEGIN
    FOR i IN 1 .. retry_count
    LOOP
        -- 'r' = relation fully synchronized ("ready")
        SELECT pgs.srsubstate = 'r'
        INTO in_sync
        FROM pg_subscription_rel pgs
        JOIN pg_class pgc ON relname = table_name
        JOIN pg_namespace n ON (n.OID = pgc.relnamespace)
        WHERE pgs.srrelid = pgc.oid AND schema_name = n.nspname;

        -- IS NOT TRUE covers both "no matching row" (NULL) and "not ready"
        IF in_sync IS NOT TRUE THEN
            PERFORM pg_sleep(retry_delay_ms);
        ELSE
            RETURN;
        END IF;
    END LOOP;
    RAISE 'subscription sync wait timed out';
END
$BODY$;
1 change: 0 additions & 1 deletion sql/maintenance_utils.sql
Expand Up @@ -33,4 +33,3 @@ CREATE OR REPLACE FUNCTION recompress_chunk(
chunk REGCLASS,
if_not_compressed BOOLEAN = false
) RETURNS REGCLASS AS '@MODULE_PATHNAME@', 'ts_recompress_chunk' LANGUAGE C STRICT VOLATILE;

31 changes: 31 additions & 0 deletions sql/pre_install/tables.sql
Expand Up @@ -367,6 +367,37 @@ CREATE INDEX IF NOT EXISTS remote_txn_data_node_name_idx ON _timescaledb_catalog

SELECT pg_catalog.pg_extension_config_dump('_timescaledb_catalog.remote_txn', '');

-- This table stores information about the stage that has been completed of a
-- chunk move/copy activity
--
-- A cleanup activity can query and check if the backend is running. If the
-- backend has exited then we can commence cleanup. The cleanup
-- activity can also do a diff with the "time_start" value to ascertain if
-- the entire end-to-end activity is going on for too long
--
-- We also track the end time of every stage. A diff with the current time
-- will give us an idea about how long the current stage has been running
--
-- Entry for a chunk move/copy activity gets deleted on successful completion
--
-- We don't want to pg_dump this table's contents. A node restored using it
-- could be part of a totally different multinode setup and we don't want to
-- carry over chunk copy/move operations from earlier (if it makes sense at all)
--

CREATE SEQUENCE IF NOT EXISTS _timescaledb_catalog.chunk_copy_operation_id_seq MINVALUE 1;

CREATE TABLE IF NOT EXISTS _timescaledb_catalog.chunk_copy_operation (
  -- Name the primary-key constraint explicitly so it is stable and matches
  -- the index name referenced from src/catalog.c (chunk_copy_operation_pkey).
  operation_id name CONSTRAINT chunk_copy_operation_pkey PRIMARY KEY, -- the publisher/subscriber identifier used
  backend_pid integer NOT NULL, -- the pid of the backend running this activity
  completed_stage name NOT NULL, -- the completed stage/step
  time_start timestamptz NOT NULL DEFAULT NOW(), -- start time of the activity
  chunk_id integer NOT NULL REFERENCES _timescaledb_catalog.chunk (id) ON DELETE CASCADE,
  source_node_name name NOT NULL,
  dest_node_name name NOT NULL,
  delete_on_source_node bool NOT NULL -- is a move or copy activity
);

-- Set table permissions
-- We need to grant SELECT to PUBLIC for all tables even those not
-- marked as being dumped because pg_dump will try to access all
Expand Down
17 changes: 17 additions & 0 deletions sql/updates/latest-dev.sql
Expand Up @@ -3,3 +3,20 @@ GRANT USAGE ON SCHEMA timescaledb_experimental TO PUBLIC;
-- These internal functions must be dropped before re-creating them because
-- their signatures change in this release (CREATE OR REPLACE cannot change
-- a function's argument list); create_chunk gains a chunk_table parameter.
-- NOTE(review): the first three are presumably also signature/location
-- changes — confirm against their new definitions.
DROP FUNCTION IF EXISTS _timescaledb_internal.block_new_chunks;
DROP FUNCTION IF EXISTS _timescaledb_internal.allow_new_chunks;
DROP FUNCTION IF EXISTS _timescaledb_internal.refresh_continuous_aggregate;
DROP FUNCTION IF EXISTS _timescaledb_internal.create_chunk;

CREATE SEQUENCE IF NOT EXISTS _timescaledb_catalog.chunk_copy_operation_id_seq MINVALUE 1;

-- Catalog table tracking the progress of chunk copy/move operations.
-- See sql/pre_install/tables.sql for the full commentary on this table.
CREATE TABLE IF NOT EXISTS _timescaledb_catalog.chunk_copy_operation (
  operation_id name PRIMARY KEY, -- the publisher/subscriber identifier used
  backend_pid integer NOT NULL, -- the pid of the backend running this activity
  completed_stage name NOT NULL, -- the completed stage/step
  time_start timestamptz NOT NULL DEFAULT NOW(), -- start time of the activity
  chunk_id integer NOT NULL REFERENCES _timescaledb_catalog.chunk (id) ON DELETE CASCADE,
  source_node_name name NOT NULL,
  dest_node_name name NOT NULL,
  delete_on_source_node bool NOT NULL -- is a move or copy activity
);

-- Catalog objects are readable by all users.
GRANT SELECT ON _timescaledb_catalog.chunk_copy_operation_id_seq TO PUBLIC;
GRANT SELECT ON _timescaledb_catalog.chunk_copy_operation TO PUBLIC;
41 changes: 40 additions & 1 deletion sql/updates/reverse-dev.sql
@@ -1,4 +1,43 @@
-- Downgrade: drop every object introduced in this version, in dependency
-- order (functions/procedures first, then the catalog table and its
-- sequence, then the experimental view, and the schema last).
-- The redundant leading DROP SCHEMA ... CASCADE has been removed: it was a
-- diff artifact and would have cascaded away the experimental objects
-- before the explicit, ordered drops below could run.
DROP FUNCTION IF EXISTS _timescaledb_internal.block_new_chunks;
DROP FUNCTION IF EXISTS _timescaledb_internal.allow_new_chunks;
DROP FUNCTION IF EXISTS _timescaledb_internal.refresh_continuous_aggregate;
DROP FUNCTION IF EXISTS _timescaledb_internal.create_chunk_table;
DROP FUNCTION IF EXISTS _timescaledb_internal.create_chunk_replica_table;
DROP FUNCTION IF EXISTS _timescaledb_internal.chunk_drop_replica;
DROP FUNCTION IF EXISTS _timescaledb_internal.create_chunk;
DROP PROCEDURE IF EXISTS _timescaledb_internal.wait_subscription_sync;
DROP PROCEDURE IF EXISTS timescaledb_experimental.move_chunk;
DROP PROCEDURE IF EXISTS timescaledb_experimental.copy_chunk;
DROP PROCEDURE IF EXISTS timescaledb_experimental.cleanup_copy_chunk_operation;
DROP TABLE IF EXISTS _timescaledb_catalog.chunk_copy_operation;
DROP SEQUENCE IF EXISTS _timescaledb_catalog.chunk_copy_operation_id_seq;
DROP VIEW IF EXISTS timescaledb_experimental.chunk_replication_status;
DROP SCHEMA IF EXISTS timescaledb_experimental CASCADE;

-- We need to rewrite all continuous aggregates to make sure that the
-- queries do not contain qualification. They will be re-written in
-- the post-update script as well, but the previous version does not
-- process all continuous aggregates, leaving some with qualification
-- for the standard functions. To make this work, we need to
-- temporarily set the update stage to the post-update stage, which
-- will allow the ALTER MATERIALIZED VIEW to rewrite the query. If
-- that is not done, the TimescaleDB-specific hooks will not be used
-- and you will get an error message saying that, for example,
-- `conditions_summary` is not a materialized view.
SET timescaledb.update_script_stage TO 'post';
DO $$
DECLARE
  vname regclass;
  materialized_only bool;
  altercmd text;
BEGIN
  FOR vname, materialized_only IN
    SELECT format('%I.%I', cagg.user_view_schema, cagg.user_view_name)::regclass,
           cagg.materialized_only
    FROM _timescaledb_catalog.continuous_agg cagg
  LOOP
    -- Re-setting materialized_only to its current value forces a rewrite
    -- of the continuous aggregate's view definition without changing it.
    altercmd := format('ALTER MATERIALIZED VIEW %s SET (timescaledb.materialized_only=%L) ', vname::text, materialized_only);
    EXECUTE altercmd;
  END LOOP;
-- Re-raise any error unchanged; the EXCEPTION clause is kept deliberately
-- (it wraps the loop in a subtransaction context).
EXCEPTION WHEN OTHERS THEN RAISE;
END
$$;
RESET timescaledb.update_script_stage;
28 changes: 28 additions & 0 deletions sql/views_experimental.sql
@@ -0,0 +1,28 @@
-- This file and its contents are licensed under the Apache License 2.0.
-- Please see the included NOTICE for copyright information and
-- LICENSE-APACHE for a copy of the license.

-- Per-chunk replication status: the number of replicas each chunk has
-- versus the hypertable's desired replication factor, plus the data
-- nodes that do and do not hold a copy of the chunk.
CREATE VIEW timescaledb_experimental.chunk_replication_status AS
SELECT
  h.schema_name AS hypertable_schema,
  h.table_name AS hypertable_name,
  c.schema_name AS chunk_schema,
  c.table_name AS chunk_name,
  h.replication_factor AS desired_num_replicas,
  count(cdn.chunk_id) AS num_replicas,
  array_agg(cdn.node_name) AS replica_nodes,
  -- Compute the set of data nodes that doesn't have the chunk. The sort is
  -- attached to the aggregate itself (array_agg ... ORDER BY): PostgreSQL
  -- does not guarantee that an ORDER BY in a subquery feeding an aggregate
  -- is preserved.
  (SELECT array_agg(node_name ORDER BY node_name) FROM
    (SELECT node_name FROM _timescaledb_catalog.hypertable_data_node hdn
     WHERE hdn.hypertable_id = h.id
     EXCEPT
     SELECT node_name FROM _timescaledb_catalog.chunk_data_node cdn
     WHERE cdn.chunk_id = c.id) nodes) AS non_replica_nodes
FROM _timescaledb_catalog.chunk c
INNER JOIN _timescaledb_catalog.chunk_data_node cdn ON (cdn.chunk_id = c.id)
INNER JOIN _timescaledb_catalog.hypertable h ON (h.id = c.hypertable_id)
GROUP BY h.id, c.id, hypertable_schema, hypertable_name, chunk_schema, chunk_name
ORDER BY h.id, c.id, hypertable_schema, hypertable_name, chunk_schema, chunk_name;

GRANT SELECT ON ALL TABLES IN SCHEMA timescaledb_experimental TO PUBLIC;
11 changes: 11 additions & 0 deletions src/catalog.c
Expand Up @@ -103,6 +103,10 @@ static const TableInfoDef catalog_table_names[_MAX_CATALOG_TABLES + 1] = {
.schema_name = CATALOG_SCHEMA_NAME,
.table_name = REMOTE_TXN_TABLE_NAME,
},
[CHUNK_COPY_OPERATION] = {
.schema_name = CATALOG_SCHEMA_NAME,
.table_name = CHUNK_COPY_OPERATION_TABLE_NAME,
},
[_MAX_CATALOG_TABLES] = {
.schema_name = "invalid schema",
.table_name = "invalid table",
Expand Down Expand Up @@ -245,6 +249,12 @@ static const TableIndexDef catalog_table_index_definitions[_MAX_CATALOG_TABLES]
[REMOTE_TXN_PKEY_IDX] = "remote_txn_pkey",
[REMOTE_TXN_DATA_NODE_NAME_IDX] = "remote_txn_data_node_name_idx"
}
},
[CHUNK_COPY_OPERATION] = {
.length = _MAX_CHUNK_COPY_OPERATION_INDEX,
.names = (char *[]) {
[CHUNK_COPY_OPERATION_PKEY_IDX] = "chunk_copy_operation_pkey",
},
}
};

Expand All @@ -266,6 +276,7 @@ static const char *catalog_table_serial_id_names[_MAX_CATALOG_TABLES] = {
[HYPERTABLE_COMPRESSION] = NULL,
[COMPRESSION_CHUNK_SIZE] = NULL,
[REMOTE_TXN] = NULL,
[CHUNK_COPY_OPERATION] = CATALOG_SCHEMA_NAME ".chunk_copy_operation_id_seq",
};

typedef struct InternalFunctionDef
Expand Down
48 changes: 48 additions & 0 deletions src/catalog.h
Expand Up @@ -53,6 +53,7 @@ typedef enum CatalogTable
HYPERTABLE_COMPRESSION,
COMPRESSION_CHUNK_SIZE,
REMOTE_TXN,
CHUNK_COPY_OPERATION,
_MAX_CATALOG_TABLES,
} CatalogTable;

Expand Down Expand Up @@ -1182,6 +1183,53 @@ enum Anum_remote_data_node_name_idx
_Anum_remote_txn_data_node_name_idx_max,
};

/********************************************
 *
 * table to track chunk copy/move operations
 *
 ********************************************/

#define CHUNK_COPY_OPERATION_TABLE_NAME "chunk_copy_operation"

/*
 * Attribute numbers (1-based) for _timescaledb_catalog.chunk_copy_operation.
 * The order must match the column order declared in
 * sql/pre_install/tables.sql.
 */
enum Anum_chunk_copy_operation
{
	Anum_chunk_copy_operation_operation_id = 1,
	Anum_chunk_copy_operation_backend_pid,
	Anum_chunk_copy_operation_completed_stage,
	Anum_chunk_copy_operation_time_start,
	Anum_chunk_copy_operation_chunk_id,
	Anum_chunk_copy_operation_source_node_name,
	Anum_chunk_copy_operation_dest_node_name,
	/* NOTE(review): SQL column is named delete_on_source_node; the C
	 * identifier abbreviates "source" to "src" */
	Anum_chunk_copy_operation_delete_on_src_node,
	_Anum_chunk_copy_operation_max,
};

/* Number of attributes in the chunk_copy_operation table */
#define Natts_chunk_copy_operation (_Anum_chunk_copy_operation_max - 1)

/*
 * In-memory form of a row of the chunk_copy_operation catalog table.
 * Field order must match Anum_chunk_copy_operation / the SQL definition.
 */
typedef struct FormData_chunk_copy_operation
{
	/* publisher/subscriber identifier used for the operation (primary key) */
	NameData operation_id;
	/* pid of the backend running this activity */
	int32 backend_pid;
	/* the last completed stage/step of the operation */
	NameData completed_stage;
	/* start time of the activity */
	TimestampTz time_start;
	/* id of the chunk being copied/moved */
	int32 chunk_id;
	NameData source_node_name;
	NameData dest_node_name;
	/* distinguishes a move (delete on source) from a plain copy */
	bool delete_on_src_node;
} FormData_chunk_copy_operation;

/* Indexes on chunk_copy_operation; order must match the index-name array
 * in src/catalog.c ("chunk_copy_operation_pkey"). */
enum
{
	CHUNK_COPY_OPERATION_PKEY_IDX = 0,
	_MAX_CHUNK_COPY_OPERATION_INDEX,
};

/* Attribute numbers for the chunk_copy_operation primary-key index */
enum Anum_chunk_copy_operation_pkey_idx
{
	Anum_chunk_copy_operation_idx_operation_id = 1,
	_Anum_chunk_copy_operation_pkey_idx_max,
};

typedef enum CacheType
{
CACHE_TYPE_HYPERTABLE,
Expand Down