Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add recompress optional argument to compress_chunk #6609

Merged
merged 1 commit into from
Feb 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions .unreleased/pr_6609
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Implements: #6609 Deprecate recompress_chunk
Implements: #6609 Add optional recompress argument to compress_chunk
64 changes: 9 additions & 55 deletions sql/maintenance_utils.sql
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ CREATE OR REPLACE FUNCTION _timescaledb_functions.create_compressed_chunk(

CREATE OR REPLACE FUNCTION @extschema@.compress_chunk(
uncompressed_chunk REGCLASS,
if_not_compressed BOOLEAN = true
if_not_compressed BOOLEAN = true,
recompress BOOLEAN = false
) RETURNS REGCLASS AS '@MODULE_PATHNAME@', 'ts_compress_chunk' LANGUAGE C STRICT VOLATILE;

CREATE OR REPLACE FUNCTION @extschema@.decompress_chunk(
Expand Down Expand Up @@ -62,61 +63,14 @@ CREATE OR REPLACE FUNCTION _timescaledb_functions.get_compressed_chunk_index_for
-- Parameters:
-- chunk: Chunk to recompress.
-- if_not_compressed: Print notice instead of error if chunk is already compressed.
CREATE OR REPLACE PROCEDURE @extschema@.recompress_chunk(chunk REGCLASS,
if_not_compressed BOOLEAN = true)
AS $$
DECLARE
status INT;
chunk_name TEXT[];
compressed_chunk_index REGCLASS;
BEGIN

-- procedures with SET clause cannot execute transaction
-- control so we adjust search_path in procedure body
SET LOCAL search_path TO pg_catalog, pg_temp;

status := _timescaledb_functions.chunk_status(chunk);

-- Chunk names are in the internal catalog, but we only care about
-- the chunk name here.
-- status bits:
-- 1: compressed
-- 2: compressed unordered
-- 4: frozen
-- 8: compressed partial

chunk_name := parse_ident(chunk::text);
CASE
WHEN status = 0 THEN
RAISE EXCEPTION 'call compress_chunk instead of recompress_chunk';
WHEN status = 1 THEN
IF if_not_compressed THEN
RAISE NOTICE 'nothing to recompress in chunk "%"', chunk_name[array_upper(chunk_name,1)];
RETURN;
ELSE
RAISE EXCEPTION 'nothing to recompress in chunk "%"', chunk_name[array_upper(chunk_name,1)];
END IF;
WHEN status = 3 OR status = 9 OR status = 11 THEN
-- first check if there's an index. Might have to use a heuristic to determine if index usage would be efficient,
-- or if we'd better fall back to decompressing & recompressing entire chunk
SELECT _timescaledb_functions.get_compressed_chunk_index_for_recompression(chunk) INTO STRICT compressed_chunk_index;
IF compressed_chunk_index IS NOT NULL THEN
PERFORM _timescaledb_functions.recompress_chunk_segmentwise(chunk, if_not_compressed);
ELSE
PERFORM @extschema@.decompress_chunk(chunk);
COMMIT;
-- SET LOCAL is only active until end of transaction.
-- While we could use SET at the start of the function we do not
-- want to bleed out search_path to caller, so we do SET LOCAL
-- again after COMMIT
SET LOCAL search_path TO pg_catalog, pg_temp;
PERFORM @extschema@.compress_chunk(chunk, if_not_compressed);
END IF;
ELSE
RAISE EXCEPTION 'unexpected chunk status % in chunk "%"', status, chunk_name[array_upper(chunk_name,1)];
END CASE;
END
$$ LANGUAGE plpgsql;
CREATE OR REPLACE PROCEDURE @extschema@.recompress_chunk(chunk REGCLASS, if_not_compressed BOOLEAN = true) LANGUAGE PLPGSQL AS $$
BEGIN
IF current_setting('timescaledb.enable_deprecation_warnings', true)::bool THEN
RAISE WARNING 'procedure @extschema@.recompress_chunk(regclass,boolean) is deprecated and the functionality is now included in @extschema@.compress_chunk. this compatibility function will be removed in a future version.';
END IF;
PERFORM @extschema@.compress_chunk(chunk, if_not_compressed);
fabriziomello marked this conversation as resolved.
Show resolved Hide resolved
END$$ SET search_path TO pg_catalog,pg_temp;

-- A version of makeaclitem that accepts a comma-separated list of
-- privileges rather than just a single privilege. This is copied from
Expand Down
4 changes: 4 additions & 0 deletions sql/updates/latest-dev.sql
Original file line number Diff line number Diff line change
Expand Up @@ -445,3 +445,7 @@ $BODY$
SELECT sum(total_bytes)::bigint
FROM @extschema@.hypertable_approximate_detailed_size(hypertable);
$BODY$ SET search_path TO pg_catalog, pg_temp;

DROP FUNCTION IF EXISTS @extschema@.compress_chunk;
CREATE FUNCTION @extschema@.compress_chunk(uncompressed_chunk REGCLASS, if_not_compressed BOOLEAN = true, recompress BOOLEAN = false) RETURNS REGCLASS AS '' LANGUAGE SQL SET search_path TO pg_catalog, pg_temp;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should invent a more precise term for recompress, something like apply_hypertable_compression_settings. And make separate functions, one to fully compress a partially or fully uncompressed chunk, and another to apply the new compression settings. I think this will be a less confusing interface.


4 changes: 4 additions & 0 deletions sql/updates/reverse-dev.sql
Original file line number Diff line number Diff line change
Expand Up @@ -791,3 +791,7 @@ SELECT pg_catalog.pg_extension_config_dump('_timescaledb_catalog.hypertable', ''
DROP FUNCTION IF EXISTS _timescaledb_functions.relation_approximate_size(relation REGCLASS);
DROP FUNCTION IF EXISTS @extschema@.hypertable_approximate_detailed_size(relation REGCLASS);
DROP FUNCTION IF EXISTS @extschema@.hypertable_approximate_size(hypertable REGCLASS);

DROP FUNCTION IF EXISTS @extschema@.compress_chunk;
CREATE FUNCTION @extschema@.compress_chunk(uncompressed_chunk REGCLASS, if_not_compressed BOOLEAN = true) RETURNS REGCLASS AS '' LANGUAGE SQL SET search_path TO pg_catalog,pg_temp;

2 changes: 1 addition & 1 deletion tsl/src/bgw_policy/job.c
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,7 @@ policy_recompression_execute(int32 job_id, Jsonb *config)
if (!ts_chunk_needs_recompression(chunk))
continue;

tsl_recompress_chunk_wrapper(chunk);
tsl_compress_chunk_wrapper(chunk, true, false);

elog(LOG,
"completed recompressing chunk \"%s.%s\"",
Expand Down
54 changes: 28 additions & 26 deletions tsl/src/compression/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -579,7 +579,7 @@ compress_chunk_impl(Oid hypertable_relid, Oid chunk_relid)
if (chunk_unordered)
{
ts_chunk_set_unordered(mergable_chunk);
tsl_recompress_chunk_wrapper(mergable_chunk);
tsl_compress_chunk_wrapper(mergable_chunk, true, false);
}
}

Expand Down Expand Up @@ -779,20 +779,45 @@ tsl_compress_chunk(PG_FUNCTION_ARGS)
{
Oid uncompressed_chunk_id = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
bool if_not_compressed = PG_ARGISNULL(1) ? true : PG_GETARG_BOOL(1);
bool recompress = PG_ARGISNULL(2) ? false : PG_GETARG_BOOL(2);

ts_feature_flag_check(FEATURE_HYPERTABLE_COMPRESSION);

TS_PREVENT_FUNC_IF_READ_ONLY();
Chunk *chunk = ts_chunk_get_by_relid(uncompressed_chunk_id, true);

uncompressed_chunk_id = tsl_compress_chunk_wrapper(chunk, if_not_compressed, recompress);

PG_RETURN_OID(uncompressed_chunk_id);
}

Oid
tsl_compress_chunk_wrapper(Chunk *chunk, bool if_not_compressed, bool recompress)
{
Oid uncompressed_chunk_id = chunk->table_id;

if (ts_chunk_is_compressed(chunk))
{
if (recompress)
{
CompressionSettings *ht_settings = ts_compression_settings_get(chunk->hypertable_relid);
Oid compressed_chunk_relid = ts_chunk_get_relid(chunk->fd.compressed_chunk_id, true);
CompressionSettings *chunk_settings =
ts_compression_settings_get(compressed_chunk_relid);

if (!ts_compression_settings_equal(ht_settings, chunk_settings))
{
decompress_chunk_impl(chunk, false);
compress_chunk_impl(chunk->hypertable_relid, chunk->table_id);
return uncompressed_chunk_id;
}
}
if (!ts_chunk_needs_recompression(chunk))
{
ereport((if_not_compressed ? NOTICE : ERROR),
(errcode(ERRCODE_DUPLICATE_OBJECT),
errmsg("chunk \"%s\" is already compressed", get_rel_name(chunk->table_id))));
PG_RETURN_OID(uncompressed_chunk_id);
return uncompressed_chunk_id;
}

if (get_compressed_chunk_index_for_recompression(chunk))
Expand All @@ -810,7 +835,7 @@ tsl_compress_chunk(PG_FUNCTION_ARGS)
uncompressed_chunk_id = compress_chunk_impl(chunk->hypertable_relid, chunk->table_id);
}

PG_RETURN_OID(uncompressed_chunk_id);
return uncompressed_chunk_id;
}

Datum
Expand Down Expand Up @@ -844,29 +869,6 @@ tsl_decompress_chunk(PG_FUNCTION_ARGS)
PG_RETURN_OID(uncompressed_chunk_id);
}

bool
tsl_recompress_chunk_wrapper(Chunk *uncompressed_chunk)
{
Oid uncompressed_chunk_relid = uncompressed_chunk->table_id;

Hypertable *ht = ts_hypertable_get_by_id(uncompressed_chunk->fd.hypertable_id);
ts_hypertable_permissions_check(ht->main_table_relid, GetUserId());

if (!ht->fd.compressed_hypertable_id)
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("missing compressed hypertable")));

if (ts_chunk_is_compressed(uncompressed_chunk))
{
decompress_chunk_impl(uncompressed_chunk, false);
}

Chunk *chunk = ts_chunk_get_by_relid(uncompressed_chunk_relid, true);
Assert(!ts_chunk_is_compressed(chunk));
compress_chunk_impl(chunk->hypertable_relid, chunk->table_id);

return true;
}

/* Sort the tuples and recompress them */
static void
recompress_segment(Tuplesortstate *tuplesortstate, Relation compressed_chunk_rel,
Expand Down
5 changes: 3 additions & 2 deletions tsl/src/compression/api.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,12 @@
#include <postgres.h>
#include <fmgr.h>

#include "chunk.h"

extern Datum tsl_create_compressed_chunk(PG_FUNCTION_ARGS);
extern Datum tsl_compress_chunk(PG_FUNCTION_ARGS);
extern Datum tsl_decompress_chunk(PG_FUNCTION_ARGS);
extern Datum tsl_recompress_chunk(PG_FUNCTION_ARGS);
extern bool tsl_recompress_chunk_wrapper(Chunk *chunk);
extern Oid tsl_compress_chunk_wrapper(Chunk *chunk, bool if_not_compressed, bool recompress);
extern Datum tsl_recompress_chunk_segmentwise(PG_FUNCTION_ARGS);

extern Datum tsl_get_compressed_chunk_index_for_recompression(
Expand Down
106 changes: 106 additions & 0 deletions tsl/test/expected/compression.out
Original file line number Diff line number Diff line change
Expand Up @@ -2667,3 +2667,109 @@ SELECT approximate_row_count('stattest');
(1 row)

DROP TABLE stattest;
-- test that all variants of compress_chunk produce a fully compressed chunk
CREATE TABLE compress_chunk_test(time TIMESTAMPTZ NOT NULL, device text, value float);
SELECT create_hypertable('compress_chunk_test', 'time');
create_hypertable
-----------------------------------
(47,public,compress_chunk_test,t)
(1 row)

INSERT INTO compress_chunk_test SELECT '2020-01-01', 'r2d2', 3.14;
ALTER TABLE compress_chunk_test SET (timescaledb.compress);
SELECT show_chunks('compress_chunk_test') AS "CHUNK" \gset
-- initial call will compress the chunk
SELECT compress_chunk(:'CHUNK');
compress_chunk
-------------------------------------------
_timescaledb_internal._hyper_47_100_chunk
(1 row)

-- subsequent calls will be noop
SELECT compress_chunk(:'CHUNK');
NOTICE: chunk "_hyper_47_100_chunk" is already compressed
compress_chunk
-------------------------------------------
_timescaledb_internal._hyper_47_100_chunk
(1 row)

-- unless if_not_compressed is set to false
\set ON_ERROR_STOP 0
SELECT compress_chunk(:'CHUNK', false);
ERROR: chunk "_hyper_47_100_chunk" is already compressed
\set ON_ERROR_STOP 1
ALTER TABLE compress_chunk_test SET (timescaledb.compress_segmentby='device');
SELECT compressed_chunk_id from _timescaledb_catalog.chunk ch INNER JOIN _timescaledb_catalog.hypertable ht ON ht.id = ch.hypertable_id AND ht.table_name='compress_chunk_test';
compressed_chunk_id
---------------------
101
(1 row)

-- changing compression settings will not recompress the chunk by default
SELECT compress_chunk(:'CHUNK');
NOTICE: chunk "_hyper_47_100_chunk" is already compressed
compress_chunk
-------------------------------------------
_timescaledb_internal._hyper_47_100_chunk
(1 row)

-- unless we specify recompress := true
SELECT compress_chunk(:'CHUNK', recompress := true);
compress_chunk
-------------------------------------------
_timescaledb_internal._hyper_47_100_chunk
(1 row)

-- compressed_chunk_id should be different now
SELECT compressed_chunk_id from _timescaledb_catalog.chunk ch INNER JOIN _timescaledb_catalog.hypertable ht ON ht.id = ch.hypertable_id AND ht.table_name='compress_chunk_test';
compressed_chunk_id
---------------------
102
(1 row)

--test partial handling
INSERT INTO compress_chunk_test SELECT '2020-01-01', 'c3po', 3.14;
-- should result in merging uncompressed data into compressed chunk
SELECT compress_chunk(:'CHUNK');
compress_chunk
-------------------------------------------
_timescaledb_internal._hyper_47_100_chunk
(1 row)

-- compressed_chunk_id should not have changed
SELECT compressed_chunk_id from _timescaledb_catalog.chunk ch INNER JOIN _timescaledb_catalog.hypertable ht ON ht.id = ch.hypertable_id AND ht.table_name='compress_chunk_test';
compressed_chunk_id
---------------------
102
(1 row)

-- should return no rows
SELECT * FROM ONLY :CHUNK;
time | device | value
------+--------+-------
(0 rows)

ALTER TABLE compress_chunk_test SET (timescaledb.compress_segmentby='');
-- create another chunk
INSERT INTO compress_chunk_test SELECT '2021-01-01', 'c3po', 3.14;
SELECT show_chunks('compress_chunk_test') AS "CHUNK2" LIMIT 1 OFFSET 1 \gset
SELECT compress_chunk(:'CHUNK2');
compress_chunk
-------------------------------------------
_timescaledb_internal._hyper_47_103_chunk
(1 row)

-- make it partial and compress again
INSERT INTO compress_chunk_test SELECT '2021-01-01', 'r2d2', 3.14;
SELECT compress_chunk(:'CHUNK2');
compress_chunk
-------------------------------------------
_timescaledb_internal._hyper_47_103_chunk
(1 row)

-- should return no rows
SELECT * FROM ONLY :CHUNK2;
time | device | value
------+--------+-------
(0 rows)

44 changes: 28 additions & 16 deletions tsl/test/expected/compression_bgw-13.out
Original file line number Diff line number Diff line change
Expand Up @@ -434,14 +434,12 @@ SELECT count(*) from test2;
29
(1 row)

-- call recompress_chunk inside a transaction. This should fails since
-- it contains transaction-terminating commands.
\set ON_ERROR_STOP 0
START TRANSACTION;
CALL recompress_chunk(:'CHUNK_NAME'::regclass);
ROLLBACK;
\set ON_ERROR_STOP 1
CALL recompress_chunk(:'CHUNK_NAME'::regclass);
SELECT compress_chunk(:'CHUNK_NAME'::regclass);
compress_chunk
------------------------------------------
_timescaledb_internal._hyper_14_62_chunk
(1 row)

-- Demonstrate that no locks are held on the hypertable, chunk, or the
-- compressed chunk after recompress_chunk has executed.
SELECT pid, locktype, relation, relation::regclass, mode, granted
Expand Down Expand Up @@ -510,21 +508,35 @@ WHERE hypertable_name = 'test2' ORDER BY chunk_name;
(2 rows)

\set ON_ERROR_STOP 0
-- call recompress_chunk when status is not unordered
CALL recompress_chunk(:'CHUNK_NAME'::regclass, true);
psql:include/recompress_basic.sql:115: NOTICE: nothing to recompress in chunk "_hyper_14_62_chunk"
-- call compress_chunk when status is not unordered
SELECT compress_chunk(:'CHUNK_NAME'::regclass);
psql:include/recompress_basic.sql:107: NOTICE: chunk "_hyper_14_62_chunk" is already compressed
compress_chunk
------------------------------------------
_timescaledb_internal._hyper_14_62_chunk
(1 row)

-- This will succeed and compress the chunk for the test below.
CALL recompress_chunk(:'CHUNK_NAME'::regclass, false);
psql:include/recompress_basic.sql:118: ERROR: nothing to recompress in chunk "_hyper_14_62_chunk"
SELECT compress_chunk(:'CHUNK_NAME'::regclass);
psql:include/recompress_basic.sql:110: NOTICE: chunk "_hyper_14_62_chunk" is already compressed
compress_chunk
------------------------------------------
_timescaledb_internal._hyper_14_62_chunk
(1 row)

--now decompress it , then try and recompress
SELECT decompress_chunk(:'CHUNK_NAME'::regclass);
decompress_chunk
------------------------------------------
_timescaledb_internal._hyper_14_62_chunk
(1 row)

CALL recompress_chunk(:'CHUNK_NAME'::regclass);
psql:include/recompress_basic.sql:122: ERROR: call compress_chunk instead of recompress_chunk
SELECT compress_chunk(:'CHUNK_NAME'::regclass);
compress_chunk
------------------------------------------
_timescaledb_internal._hyper_14_62_chunk
(1 row)

\set ON_ERROR_STOP 1
-- test recompress policy
CREATE TABLE metrics(time timestamptz NOT NULL);
Expand Down Expand Up @@ -629,7 +641,7 @@ SELECT chunk_status FROM compressed_chunk_info_view WHERE hypertable_name = 'met

---- nothing to do yet
CALL run_job(:JOB_RECOMPRESS);
psql:include/recompress_basic.sql:194: NOTICE: no chunks for hypertable "public.metrics" that satisfy recompress chunk policy
psql:include/recompress_basic.sql:186: NOTICE: no chunks for hypertable "public.metrics" that satisfy recompress chunk policy
---- status should be 1
SELECT chunk_status FROM compressed_chunk_info_view WHERE hypertable_name = 'metrics';
chunk_status
Expand Down