Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change compress_chunk to recompress partial and unordered chunks when needed #6604

Merged
merged 1 commit into from
Feb 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/chunk.c
Original file line number Diff line number Diff line change
Expand Up @@ -4339,6 +4339,13 @@ ts_chunk_is_compressed(const Chunk *chunk)
return ts_flags_are_set_32(chunk->fd.status, CHUNK_STATUS_COMPRESSED);
}

/*
 * Tell whether a chunk is compressed and additionally flagged as partial or
 * unordered.
 *
 * A chunk that is not compressed yields false rather than tripping an
 * assertion, so callers may use this predicate before they have verified the
 * chunk's compression state.
 */
bool
ts_chunk_needs_recompression(const Chunk *chunk)
{
	return ts_chunk_is_compressed(chunk) &&
		   (ts_chunk_is_partial(chunk) || ts_chunk_is_unordered(chunk));
}

/* Note that only a compressed chunk can have partial flag set */
bool
ts_chunk_is_partial(const Chunk *chunk)
Expand Down
1 change: 1 addition & 0 deletions src/chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ extern TSDLLEXPORT Chunk *ts_chunk_get_compressed_chunk_parent(const Chunk *chun
extern TSDLLEXPORT bool ts_chunk_is_unordered(const Chunk *chunk);
extern TSDLLEXPORT bool ts_chunk_is_partial(const Chunk *chunk);
extern TSDLLEXPORT bool ts_chunk_is_compressed(const Chunk *chunk);
extern TSDLLEXPORT bool ts_chunk_needs_recompression(const Chunk *chunk);
extern TSDLLEXPORT bool ts_chunk_validate_chunk_status_for_operation(const Chunk *chunk,
ChunkOperation cmd,
bool throw_error);
Expand Down
2 changes: 1 addition & 1 deletion tsl/src/bgw_policy/job.c
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,7 @@ policy_recompression_execute(int32 job_id, Jsonb *config)
int32 chunkid = lfirst_int(lc);
Chunk *chunk = ts_chunk_get_by_id(chunkid, true);
Assert(chunk);
if (!ts_chunk_is_unordered(chunk) && !ts_chunk_is_partial(chunk))
if (!ts_chunk_needs_recompression(chunk))
continue;

tsl_recompress_chunk_wrapper(chunk);
Expand Down
105 changes: 65 additions & 40 deletions tsl/src/compression/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@
Hypertable *compress_ht; /*compressed table for srcht */
} CompressChunkCxt;

static Oid get_compressed_chunk_index_for_recompression(Chunk *uncompressed_chunk);
static Oid recompress_chunk_segmentwise_impl(Chunk *chunk);

static void
compression_chunk_size_catalog_insert(int32 src_chunk_id, const RelationSize *src_size,
int32 compress_chunk_id, const RelationSize *compress_size,
Expand Down Expand Up @@ -668,25 +671,6 @@
ts_cache_release(hcache);
}

/*
 * Compress the given chunk unless it already has a compressed chunk attached.
 *
 * For an already compressed chunk the outcome depends on if_not_compressed:
 * when true, a NOTICE is emitted and the chunk's own relation Oid is returned;
 * when false, the same message is raised at ERROR level.
 */

Oid
tsl_compress_chunk_wrapper(Chunk *chunk, bool if_not_compressed)
{
	/* Nothing attached yet: hand over to the regular compression path. */
	if (chunk->fd.compressed_chunk_id == INVALID_CHUNK_ID)
		return compress_chunk_impl(chunk->hypertable_relid, chunk->table_id);

	int report_level = if_not_compressed ? NOTICE : ERROR;

	ereport(report_level,
			(errcode(ERRCODE_DUPLICATE_OBJECT),
			 errmsg("chunk \"%s\" is already compressed", get_rel_name(chunk->table_id))));

	/* Only reached in the NOTICE case. */
	return chunk->table_id;
}

/*
* Create a new compressed chunk using existing table with compressed data.
*
Expand Down Expand Up @@ -770,14 +754,37 @@
tsl_compress_chunk(PG_FUNCTION_ARGS)
{
Oid uncompressed_chunk_id = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
bool if_not_compressed = PG_ARGISNULL(1) ? false : PG_GETARG_BOOL(1);
bool if_not_compressed = PG_ARGISNULL(1) ? true : PG_GETARG_BOOL(1);

ts_feature_flag_check(FEATURE_HYPERTABLE_COMPRESSION);

TS_PREVENT_FUNC_IF_READ_ONLY();
Chunk *chunk = ts_chunk_get_by_relid(uncompressed_chunk_id, true);

uncompressed_chunk_id = tsl_compress_chunk_wrapper(chunk, if_not_compressed);
if (ts_chunk_is_compressed(chunk))
{
if (!ts_chunk_needs_recompression(chunk))
{
ereport((if_not_compressed ? NOTICE : ERROR),
(errcode(ERRCODE_DUPLICATE_OBJECT),
errmsg("chunk \"%s\" is already compressed", get_rel_name(chunk->table_id))));
PG_RETURN_OID(uncompressed_chunk_id);
}

if (get_compressed_chunk_index_for_recompression(chunk))
{
uncompressed_chunk_id = recompress_chunk_segmentwise_impl(chunk);
}
else
{
decompress_chunk_impl(chunk, false);
compress_chunk_impl(chunk->hypertable_relid, chunk->table_id);
}
}
else
{
uncompressed_chunk_id = compress_chunk_impl(chunk->hypertable_relid, chunk->table_id);
}

PG_RETURN_OID(uncompressed_chunk_id);
}
Expand Down Expand Up @@ -831,7 +838,8 @@

Chunk *chunk = ts_chunk_get_by_relid(uncompressed_chunk_relid, true);
Assert(!ts_chunk_is_compressed(chunk));
tsl_compress_chunk_wrapper(chunk, false);
compress_chunk_impl(chunk->hypertable_relid, chunk->table_id);

return true;
}

Expand Down Expand Up @@ -922,6 +930,20 @@
Oid uncompressed_chunk_id = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);

Chunk *uncompressed_chunk = ts_chunk_get_by_relid(uncompressed_chunk_id, true);

Oid index_oid = get_compressed_chunk_index_for_recompression(uncompressed_chunk);

if (OidIsValid(index_oid))
{
PG_RETURN_OID(index_oid);
}
else
PG_RETURN_NULL();
}

static Oid
get_compressed_chunk_index_for_recompression(Chunk *uncompressed_chunk)
{
Chunk *compressed_chunk = ts_chunk_get_by_id(uncompressed_chunk->fd.compressed_chunk_id, true);

Relation uncompressed_chunk_rel = table_open(uncompressed_chunk->table_id, ShareLock);
Expand All @@ -937,12 +959,7 @@
table_close(compressed_chunk_rel, NoLock);
table_close(uncompressed_chunk_rel, NoLock);

if (OidIsValid(index_oid))
{
PG_RETURN_OID(index_oid);
}
else
PG_RETURN_NULL();
return index_oid;
}

/*
Expand Down Expand Up @@ -1074,37 +1091,45 @@
tsl_recompress_chunk_segmentwise(PG_FUNCTION_ARGS)
{
Oid uncompressed_chunk_id = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
bool if_not_compressed = PG_ARGISNULL(1) ? false : PG_GETARG_BOOL(1);
bool if_not_compressed = PG_ARGISNULL(1) ? true : PG_GETARG_BOOL(1);

ts_feature_flag_check(FEATURE_HYPERTABLE_COMPRESSION);
TS_PREVENT_FUNC_IF_READ_ONLY();
Chunk *uncompressed_chunk = ts_chunk_get_by_relid(uncompressed_chunk_id, true);

int32 status = uncompressed_chunk->fd.status;
Chunk *chunk = ts_chunk_get_by_relid(uncompressed_chunk_id, true);

if (status == CHUNK_STATUS_DEFAULT)
elog(ERROR, "call compress_chunk instead of recompress_chunk");
if (status == CHUNK_STATUS_COMPRESSED)
if (!ts_chunk_needs_recompression(chunk))
{
int elevel = if_not_compressed ? NOTICE : ERROR;
elog(elevel,
"nothing to recompress in chunk %s.%s",
NameStr(uncompressed_chunk->fd.schema_name),
NameStr(uncompressed_chunk->fd.table_name));
NameStr(chunk->fd.schema_name),
NameStr(chunk->fd.table_name));
}
else
{
uncompressed_chunk_id = recompress_chunk_segmentwise_impl(chunk);
}

PG_RETURN_OID(uncompressed_chunk_id);
}

static Oid
recompress_chunk_segmentwise_impl(Chunk *uncompressed_chunk)
{
Oid uncompressed_chunk_id = uncompressed_chunk->table_id;

/*
* only proceed if status in (3, 9, 11)
* 1: compressed
* 2: compressed_unordered
* 4: frozen
* 8: compressed_partial
*/
if (!(ts_chunk_is_compressed(uncompressed_chunk) &&
(ts_chunk_is_unordered(uncompressed_chunk) || ts_chunk_is_partial(uncompressed_chunk))))
if (!ts_chunk_is_compressed(uncompressed_chunk) &&
ts_chunk_needs_recompression(uncompressed_chunk))

Check warning on line 1129 in tsl/src/compression/api.c

View check run for this annotation

Codecov / codecov/patch

tsl/src/compression/api.c#L1129

Added line #L1129 was not covered by tests
elog(ERROR,
"unexpected chunk status %d in chunk %s.%s",
status,
uncompressed_chunk->fd.status,
NameStr(uncompressed_chunk->fd.schema_name),
NameStr(uncompressed_chunk->fd.table_name));

Expand Down
1 change: 0 additions & 1 deletion tsl/src/compression/api.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ extern Datum tsl_create_compressed_chunk(PG_FUNCTION_ARGS);
extern Datum tsl_compress_chunk(PG_FUNCTION_ARGS);
extern Datum tsl_decompress_chunk(PG_FUNCTION_ARGS);
extern Datum tsl_recompress_chunk(PG_FUNCTION_ARGS);
extern Oid tsl_compress_chunk_wrapper(Chunk *chunk, bool if_not_compressed);
extern bool tsl_recompress_chunk_wrapper(Chunk *chunk);
extern Datum tsl_recompress_chunk_segmentwise(PG_FUNCTION_ARGS);

Expand Down