Skip to content

Commit

Permalink
Change compress_chunk to recompress partial and unordered chunks when…
Browse files Browse the repository at this point in the history
… needed

This patch changes compress_chunk to recompress partial or unordered
chunks so the result of compress_chunk will always be a fully compressed
chunk.
  • Loading branch information
svenklemm committed Feb 5, 2024
1 parent 525b045 commit 4118dca
Show file tree
Hide file tree
Showing 9 changed files with 118 additions and 110 deletions.
7 changes: 7 additions & 0 deletions src/chunk.c
Original file line number Diff line number Diff line change
Expand Up @@ -4339,6 +4339,13 @@ ts_chunk_is_compressed(const Chunk *chunk)
return ts_flags_are_set_32(chunk->fd.status, CHUNK_STATUS_COMPRESSED);
}

bool
ts_chunk_needs_recompression(const Chunk *chunk)
{
Assert(ts_chunk_is_compressed(chunk));
return ts_chunk_is_partial(chunk) || ts_chunk_is_unordered(chunk);
}

/* Note that only a compressed chunk can have partial flag set */
bool
ts_chunk_is_partial(const Chunk *chunk)
Expand Down
1 change: 1 addition & 0 deletions src/chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ extern TSDLLEXPORT Chunk *ts_chunk_get_compressed_chunk_parent(const Chunk *chun
extern TSDLLEXPORT bool ts_chunk_is_unordered(const Chunk *chunk);
extern TSDLLEXPORT bool ts_chunk_is_partial(const Chunk *chunk);
extern TSDLLEXPORT bool ts_chunk_is_compressed(const Chunk *chunk);
extern TSDLLEXPORT bool ts_chunk_needs_recompression(const Chunk *chunk);
extern TSDLLEXPORT bool ts_chunk_validate_chunk_status_for_operation(const Chunk *chunk,
ChunkOperation cmd,
bool throw_error);
Expand Down
2 changes: 1 addition & 1 deletion tsl/src/bgw_policy/job.c
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,7 @@ policy_recompression_execute(int32 job_id, Jsonb *config)
int32 chunkid = lfirst_int(lc);
Chunk *chunk = ts_chunk_get_by_id(chunkid, true);
Assert(chunk);
if (!ts_chunk_is_unordered(chunk) && !ts_chunk_is_partial(chunk))
if (!ts_chunk_needs_recompression(chunk))
continue;

tsl_recompress_chunk_wrapper(chunk);
Expand Down
105 changes: 65 additions & 40 deletions tsl/src/compression/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ typedef struct CompressChunkCxt
Hypertable *compress_ht; /*compressed table for srcht */
} CompressChunkCxt;

static Oid get_compressed_chunk_index_for_recompression(Chunk *uncompressed_chunk);
static Oid recompress_chunk_segmentwise_impl(Chunk *chunk);

static void
compression_chunk_size_catalog_insert(int32 src_chunk_id, const RelationSize *src_size,
int32 compress_chunk_id, const RelationSize *compress_size,
Expand Down Expand Up @@ -668,25 +671,6 @@ decompress_chunk_impl(Chunk *uncompressed_chunk, bool if_compressed)
ts_cache_release(hcache);
}

/*
* Set if_not_compressed to true for idempotent operation. Aborts transaction if the chunk is
* already compressed, unless it is running in idempotent mode.
*/

Oid
tsl_compress_chunk_wrapper(Chunk *chunk, bool if_not_compressed)
{
if (chunk->fd.compressed_chunk_id != INVALID_CHUNK_ID)
{
ereport((if_not_compressed ? NOTICE : ERROR),
(errcode(ERRCODE_DUPLICATE_OBJECT),
errmsg("chunk \"%s\" is already compressed", get_rel_name(chunk->table_id))));
return chunk->table_id;
}

return compress_chunk_impl(chunk->hypertable_relid, chunk->table_id);
}

/*
* Create a new compressed chunk using existing table with compressed data.
*
Expand Down Expand Up @@ -770,14 +754,37 @@ Datum
tsl_compress_chunk(PG_FUNCTION_ARGS)
{
Oid uncompressed_chunk_id = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
bool if_not_compressed = PG_ARGISNULL(1) ? false : PG_GETARG_BOOL(1);
bool if_not_compressed = PG_ARGISNULL(1) ? true : PG_GETARG_BOOL(1);

ts_feature_flag_check(FEATURE_HYPERTABLE_COMPRESSION);

TS_PREVENT_FUNC_IF_READ_ONLY();
Chunk *chunk = ts_chunk_get_by_relid(uncompressed_chunk_id, true);

uncompressed_chunk_id = tsl_compress_chunk_wrapper(chunk, if_not_compressed);
if (ts_chunk_is_compressed(chunk))
{
if (!ts_chunk_needs_recompression(chunk))
{
ereport((if_not_compressed ? NOTICE : ERROR),
(errcode(ERRCODE_DUPLICATE_OBJECT),
errmsg("chunk \"%s\" is already compressed", get_rel_name(chunk->table_id))));
PG_RETURN_OID(uncompressed_chunk_id);
}

if (get_compressed_chunk_index_for_recompression(chunk))
{
uncompressed_chunk_id = recompress_chunk_segmentwise_impl(chunk);
}
else
{
decompress_chunk_impl(chunk, false);
compress_chunk_impl(chunk->hypertable_relid, chunk->table_id);
}
}
else
{
uncompressed_chunk_id = compress_chunk_impl(chunk->hypertable_relid, chunk->table_id);
}

PG_RETURN_OID(uncompressed_chunk_id);
}
Expand Down Expand Up @@ -831,7 +838,8 @@ tsl_recompress_chunk_wrapper(Chunk *uncompressed_chunk)

Chunk *chunk = ts_chunk_get_by_relid(uncompressed_chunk_relid, true);
Assert(!ts_chunk_is_compressed(chunk));
tsl_compress_chunk_wrapper(chunk, false);
compress_chunk_impl(chunk->hypertable_relid, chunk->table_id);

return true;
}

Expand Down Expand Up @@ -922,6 +930,20 @@ tsl_get_compressed_chunk_index_for_recompression(PG_FUNCTION_ARGS)
Oid uncompressed_chunk_id = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);

Chunk *uncompressed_chunk = ts_chunk_get_by_relid(uncompressed_chunk_id, true);

Oid index_oid = get_compressed_chunk_index_for_recompression(uncompressed_chunk);

if (OidIsValid(index_oid))
{
PG_RETURN_OID(index_oid);
}
else
PG_RETURN_NULL();
}

static Oid
get_compressed_chunk_index_for_recompression(Chunk *uncompressed_chunk)
{
Chunk *compressed_chunk = ts_chunk_get_by_id(uncompressed_chunk->fd.compressed_chunk_id, true);

Relation uncompressed_chunk_rel = table_open(uncompressed_chunk->table_id, ShareLock);
Expand All @@ -937,12 +959,7 @@ tsl_get_compressed_chunk_index_for_recompression(PG_FUNCTION_ARGS)
table_close(compressed_chunk_rel, NoLock);
table_close(uncompressed_chunk_rel, NoLock);

if (OidIsValid(index_oid))
{
PG_RETURN_OID(index_oid);
}
else
PG_RETURN_NULL();
return index_oid;
}

/*
Expand Down Expand Up @@ -1074,37 +1091,45 @@ Datum
tsl_recompress_chunk_segmentwise(PG_FUNCTION_ARGS)
{
Oid uncompressed_chunk_id = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
bool if_not_compressed = PG_ARGISNULL(1) ? false : PG_GETARG_BOOL(1);
bool if_not_compressed = PG_ARGISNULL(1) ? true : PG_GETARG_BOOL(1);

ts_feature_flag_check(FEATURE_HYPERTABLE_COMPRESSION);
TS_PREVENT_FUNC_IF_READ_ONLY();
Chunk *uncompressed_chunk = ts_chunk_get_by_relid(uncompressed_chunk_id, true);

int32 status = uncompressed_chunk->fd.status;
Chunk *chunk = ts_chunk_get_by_relid(uncompressed_chunk_id, true);

if (status == CHUNK_STATUS_DEFAULT)
elog(ERROR, "call compress_chunk instead of recompress_chunk");
if (status == CHUNK_STATUS_COMPRESSED)
if (!ts_chunk_needs_recompression(chunk))
{
int elevel = if_not_compressed ? NOTICE : ERROR;
elog(elevel,
"nothing to recompress in chunk %s.%s",
NameStr(uncompressed_chunk->fd.schema_name),
NameStr(uncompressed_chunk->fd.table_name));
NameStr(chunk->fd.schema_name),
NameStr(chunk->fd.table_name));
}
else
{
uncompressed_chunk_id = recompress_chunk_segmentwise_impl(chunk);
}

PG_RETURN_OID(uncompressed_chunk_id);
}

static Oid
recompress_chunk_segmentwise_impl(Chunk *uncompressed_chunk)
{
Oid uncompressed_chunk_id = uncompressed_chunk->table_id;

/*
* only proceed if status in (3, 9, 11)
* 1: compressed
* 2: compressed_unordered
* 4: frozen
* 8: compressed_partial
*/
if (!(ts_chunk_is_compressed(uncompressed_chunk) &&
(ts_chunk_is_unordered(uncompressed_chunk) || ts_chunk_is_partial(uncompressed_chunk))))
if (!ts_chunk_is_compressed(uncompressed_chunk) &&
ts_chunk_needs_recompression(uncompressed_chunk))
elog(ERROR,
"unexpected chunk status %d in chunk %s.%s",
status,
uncompressed_chunk->fd.status,
NameStr(uncompressed_chunk->fd.schema_name),
NameStr(uncompressed_chunk->fd.table_name));

Expand Down
1 change: 0 additions & 1 deletion tsl/src/compression/api.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ extern Datum tsl_create_compressed_chunk(PG_FUNCTION_ARGS);
extern Datum tsl_compress_chunk(PG_FUNCTION_ARGS);
extern Datum tsl_decompress_chunk(PG_FUNCTION_ARGS);
extern Datum tsl_recompress_chunk(PG_FUNCTION_ARGS);
extern Oid tsl_compress_chunk_wrapper(Chunk *chunk, bool if_not_compressed);
extern bool tsl_recompress_chunk_wrapper(Chunk *chunk);
extern Datum tsl_recompress_chunk_segmentwise(PG_FUNCTION_ARGS);

Expand Down
Loading

0 comments on commit 4118dca

Please sign in to comment.