Report progress in compression related APIs #6589

Merged 1 commit on Feb 6, 2024
30 changes: 28 additions & 2 deletions tsl/src/compression/api.c
@@ -460,12 +460,20 @@
compresschunkcxt_init(&cxt, hcache, hypertable_relid, chunk_relid);

/* acquire locks on src and compress hypertable and src chunk */
ereport(LOG,
Member:

Messages about locks might be too verbose for LOG. Maybe DEBUG1? Or INFO under a GUC for interactive messages.

Contributor Author:

@akuzm locking is another area that is suspect during chunk_compression. I am inclined to leave it as LOG for now; if we see any adverse reports about chatty logs, we can take another look.
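For illustration only, a minimal sketch of the GUC-gated alternative suggested above; ts_guc_compression_progress_info is a hypothetical GUC name and not something this PR adds:

if (ts_guc_compression_progress_info) /* hypothetical GUC, illustration only */
	ereport(INFO,
			(errmsg("acquiring locks for compressing \"%s.%s\"",
					get_namespace_name(get_rel_namespace(chunk_relid)),
					get_rel_name(chunk_relid))));
else
	ereport(DEBUG1,
			(errmsg("acquiring locks for compressing \"%s.%s\"",
					get_namespace_name(get_rel_namespace(chunk_relid)),
					get_rel_name(chunk_relid))));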

(errmsg("acquiring locks for compressing \"%s.%s\"",
get_namespace_name(get_rel_namespace(chunk_relid)),
get_rel_name(chunk_relid))));
LockRelationOid(cxt.srcht->main_table_relid, AccessShareLock);
LockRelationOid(cxt.compress_ht->main_table_relid, AccessShareLock);
LockRelationOid(cxt.srcht_chunk->table_id, ExclusiveLock);

/* acquire locks on catalog tables to keep till end of txn */
LockRelationOid(catalog_get_table_id(ts_catalog_get(), CHUNK), RowExclusiveLock);
ereport(LOG,
(errmsg("locks acquired for compressing \"%s.%s\"",
get_namespace_name(get_rel_namespace(chunk_relid)),
get_rel_name(chunk_relid))));

DEBUG_WAITPOINT("compress_chunk_impl_start");

@@ -487,12 +495,20 @@
/* create compressed chunk and a new table */
compress_ht_chunk = create_compress_chunk(cxt.compress_ht, cxt.srcht_chunk, InvalidOid);
new_compressed_chunk = true;
ereport(LOG,
(errmsg("new compressed chunk \"%s.%s\" created",
NameStr(compress_ht_chunk->fd.schema_name),
NameStr(compress_ht_chunk->fd.table_name))));
}
else
{
/* use an existing compressed chunk to compress into */
compress_ht_chunk = ts_chunk_get_by_id(mergable_chunk->fd.compressed_chunk_id, true);
result_chunk_id = mergable_chunk->table_id;
ereport(LOG,
(errmsg("merge into existing compressed chunk \"%s.%s\"",
NameStr(compress_ht_chunk->fd.schema_name),
NameStr(compress_ht_chunk->fd.table_name))));
}

/* Since the compressed relation is created in the same transaction as the tuples that will be
@@ -610,6 +626,10 @@
ts_chunk_validate_chunk_status_for_operation(uncompressed_chunk, CHUNK_DECOMPRESS, true);
compressed_chunk = ts_chunk_get_by_id(uncompressed_chunk->fd.compressed_chunk_id, true);

ereport(LOG,
(errmsg("acquiring locks for decompressing \"%s.%s\"",
NameStr(uncompressed_chunk->fd.schema_name),
NameStr(uncompressed_chunk->fd.table_name))));
/* acquire locks on src and compress hypertable and src chunk */
LockRelationOid(uncompressed_hypertable->main_table_relid, AccessShareLock);
LockRelationOid(compressed_hypertable->main_table_relid, AccessShareLock);
@@ -633,6 +653,10 @@

/* acquire locks on catalog tables to keep till end of txn */
LockRelationOid(catalog_get_table_id(ts_catalog_get(), CHUNK), RowExclusiveLock);
ereport(LOG,
(errmsg("locks acquired for decompressing \"%s.%s\"",
NameStr(uncompressed_chunk->fd.schema_name),
NameStr(uncompressed_chunk->fd.table_name))));

DEBUG_WAITPOINT("decompress_chunk_impl_start");

@@ -852,7 +876,8 @@
row_compressor_reset(row_compressor);
row_compressor_append_sorted_rows(row_compressor,
tuplesortstate,
RelationGetDescr(compressed_chunk_rel));
RelationGetDescr(compressed_chunk_rel),
compressed_chunk_rel);
tuplesort_end(tuplesortstate);
CommandCounterIncrement();
}
@@ -1126,7 +1151,7 @@
* 8: compressed_partial
*/
if (!ts_chunk_is_compressed(uncompressed_chunk) &&
ts_chunk_needs_recompression(uncompressed_chunk))

Codecov / codecov/patch warning: added line tsl/src/compression/api.c#L1154 was not covered by tests
elog(ERROR,
"unexpected chunk status %d in chunk %s.%s",
uncompressed_chunk->fd.status,
@@ -1384,7 +1409,8 @@
row_compressor_reset(&row_compressor);
row_compressor_append_sorted_rows(&row_compressor,
segment_tuplesortstate,
RelationGetDescr(uncompressed_chunk_rel));
RelationGetDescr(uncompressed_chunk_rel),
uncompressed_chunk_rel);
tuplesort_end(segment_tuplesortstate);

/* make changes visible */
84 changes: 82 additions & 2 deletions tsl/src/compression/compression.c
@@ -227,6 +227,31 @@ truncate_relation(Oid table_oid)
table_close(rel, NoLock);
}

/*
 * Use reltuples as an estimate for the number of rows that will get compressed. This value
 * might be way off the mark if ANALYZE hasn't run on this input chunk in a while, but it is
 * the best guess to start with.
 *
 * We report progress for every 10% of reltuples compressed. If rel or reltuples is not valid,
 * or the value is too low, we fall back to reporting every 100K tuples.
*/
#define RELTUPLES_REPORT_DEFAULT 100000
static int64
calculate_reltuples_to_report(Relation rel)
{
int64 report_reltuples = RELTUPLES_REPORT_DEFAULT;

if (rel != NULL && rel->rd_rel->reltuples > 0)
{
report_reltuples = (int64) (0.1 * rel->rd_rel->reltuples);
/* either analyze has not been done or table doesn't have a lot of rows */
if (report_reltuples < RELTUPLES_REPORT_DEFAULT)
report_reltuples = RELTUPLES_REPORT_DEFAULT;
}

return report_reltuples;
}
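To make the reporting cadence concrete, here is a standalone sketch of the same 10% heuristic, detached from PostgreSQL's Relation type so it compiles on its own; the reltuples values below are made up for illustration:

#include <stdint.h>
#include <stdio.h>

#define REPORT_DEFAULT 100000

/* Mirrors calculate_reltuples_to_report(): report every 10% of the estimated
 * row count, but never more often than every REPORT_DEFAULT rows. */
static int64_t
report_interval(double reltuples)
{
	int64_t report_reltuples = REPORT_DEFAULT;

	if (reltuples > 0)
	{
		report_reltuples = (int64_t) (0.1 * reltuples);
		/* stats missing or the table is small: keep the default */
		if (report_reltuples < REPORT_DEFAULT)
			report_reltuples = REPORT_DEFAULT;
	}
	return report_reltuples;
}

int
main(void)
{
	printf("%lld\n", (long long) report_interval(5000000)); /* 500000 */
	printf("%lld\n", (long long) report_interval(50000));   /* 100000: 10% is below the floor */
	printf("%lld\n", (long long) report_interval(-1));      /* 100000: no stats available */
	return 0;
}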

CompressionStats
compress_chunk(Oid in_table, Oid out_table, int insert_options)
{
@@ -241,6 +266,7 @@ compress_chunk(Oid in_table, Oid out_table, int insert_options)
Form_pg_attribute in_table_attr_tp, index_attr_tp;
CompressionStats cstat;
CompressionSettings *settings = ts_compression_settings_get(out_table);
int64 report_reltuples;

/* We want to prevent other compressors from compressing this table,
* and we want to prevent INSERTs or UPDATEs which could mess up our compression.
@@ -415,37 +441,66 @@

if (matched_index_rel != NULL)
{
int64 nrows_processed = 0;

/*
 * Even though we log the information below, this debug output
 * is still emitted as an INFO message for clients and our tests.
*/
if (ts_guc_debug_compression_path_info)
{
elog(INFO,
"compress_chunk_indexscan_start matched index \"%s\"",
get_rel_name(matched_index_rel->rd_id));
}

elog(LOG,
"using index \"%s\" to scan rows for compression",
get_rel_name(matched_index_rel->rd_id));

index_scan = index_beginscan(in_rel, matched_index_rel, GetTransactionSnapshot(), 0, 0);
slot = table_slot_create(in_rel, NULL);
index_rescan(index_scan, NULL, 0, NULL, 0);
report_reltuples = calculate_reltuples_to_report(in_rel);
while (index_getnext_slot(index_scan, indexscan_direction, slot))
{
row_compressor_process_ordered_slot(&row_compressor, slot, mycid);
if ((++nrows_processed % report_reltuples) == 0)
elog(LOG,
"compressed " INT64_FORMAT " rows from \"%s\"",
nrows_processed,
RelationGetRelationName(in_rel));
}

if (row_compressor.rows_compressed_into_current_value > 0)
row_compressor_flush(&row_compressor, mycid, true);

elog(LOG,
"finished compressing " INT64_FORMAT " rows from \"%s\"",
nrows_processed,
RelationGetRelationName(in_rel));

ExecDropSingleTupleTableSlot(slot);
index_endscan(index_scan);
index_close(matched_index_rel, AccessShareLock);
}
else
{
/*
 * Even though we log the information below, this debug output
 * is still emitted as an INFO message for clients and our tests.
*/
if (ts_guc_debug_compression_path_info)
{
elog(INFO, "compress_chunk_tuplesort_start");
}

elog(LOG,
"using tuplesort to scan rows from \"%s\" for compression",
RelationGetRelationName(in_rel));

Tuplesortstate *sorted_rel = compress_chunk_sort_relation(settings, in_rel);
row_compressor_append_sorted_rows(&row_compressor, sorted_rel, in_desc);
row_compressor_append_sorted_rows(&row_compressor, sorted_rel, in_desc, in_rel);
tuplesort_end(sorted_rel);
}

@@ -930,11 +985,15 @@ row_compressor_init(CompressionSettings *settings, RowCompressor *row_compressor

void
row_compressor_append_sorted_rows(RowCompressor *row_compressor, Tuplesortstate *sorted_rel,
TupleDesc sorted_desc)
TupleDesc sorted_desc, Relation in_rel)
{
CommandId mycid = GetCurrentCommandId(true);
TupleTableSlot *slot = MakeTupleTableSlot(sorted_desc, &TTSOpsMinimalTuple);
bool got_tuple;
int64 nrows_processed = 0;
int64 report_reltuples;

report_reltuples = calculate_reltuples_to_report(in_rel);

for (got_tuple = tuplesort_gettupleslot(sorted_rel,
true /*=forward*/,
@@ -949,10 +1008,19 @@ row_compressor_append_sorted_rows(RowCompressor *row_compressor, Tuplesortstate
NULL /*=abbrev*/))
{
row_compressor_process_ordered_slot(row_compressor, slot, mycid);
if ((++nrows_processed % report_reltuples) == 0)
elog(LOG,
"compressed " INT64_FORMAT " rows from \"%s\"",
nrows_processed,
RelationGetRelationName(in_rel));
}

if (row_compressor->rows_compressed_into_current_value > 0)
row_compressor_flush(row_compressor, mycid, true);
elog(LOG,
"finished compressing " INT64_FORMAT " rows from \"%s\"",
nrows_processed,
RelationGetRelationName(in_rel));

ExecDropSingleTupleTableSlot(slot);
}
@@ -1402,10 +1470,12 @@ decompress_chunk(Oid in_table, Oid out_table)
*/
Relation out_rel = table_open(out_table, AccessExclusiveLock);
Relation in_rel = table_open(in_table, ExclusiveLock);
int64 nrows_processed = 0;

RowDecompressor decompressor = build_decompressor(in_rel, out_rel);
TupleTableSlot *slot = table_slot_create(in_rel, NULL);
TableScanDesc scan = table_beginscan(in_rel, GetLatestSnapshot(), 0, (ScanKey) NULL);
int64 report_reltuples = calculate_reltuples_to_report(in_rel);

while (table_scan_getnextslot(scan, ForwardScanDirection, slot))
{
@@ -1421,8 +1491,18 @@
heap_freetuple(tuple);

row_decompressor_decompress_row_to_table(&decompressor);

if ((++nrows_processed % report_reltuples) == 0)
elog(LOG,
"decompressed " INT64_FORMAT " rows from \"%s\"",
nrows_processed,
RelationGetRelationName(in_rel));
}

elog(LOG,
"finished decompressing " INT64_FORMAT " rows from \"%s\"",
nrows_processed,
RelationGetRelationName(in_rel));
table_endscan(scan);
ExecDropSingleTupleTableSlot(slot);
row_decompressor_close(&decompressor);
3 changes: 2 additions & 1 deletion tsl/src/compression/compression.h
@@ -361,7 +361,8 @@ extern void row_compressor_init(CompressionSettings *settings, RowCompressor *ro
extern void row_compressor_reset(RowCompressor *row_compressor);
extern void row_compressor_close(RowCompressor *row_compressor);
extern void row_compressor_append_sorted_rows(RowCompressor *row_compressor,
Tuplesortstate *sorted_rel, TupleDesc sorted_desc);
Tuplesortstate *sorted_rel, TupleDesc sorted_desc,
Relation in_rel);
extern Oid get_compressed_chunk_index(ResultRelInfo *resultRelInfo, CompressionSettings *settings);

extern void segment_info_update(SegmentInfo *segment_info, Datum val, bool is_null);
5 changes: 5 additions & 0 deletions tsl/test/expected/compression_bgw-13.out
@@ -344,6 +344,11 @@ SELECT alter_job(id,config:=jsonb_set(config,'{verbose_log}', 'true'))
set client_min_messages TO LOG;
CALL run_job(:job_id);
LOG: statement: CALL run_job(1004);
LOG: acquiring locks for compressing "_timescaledb_internal._hyper_11_40_chunk"
LOG: locks acquired for compressing "_timescaledb_internal._hyper_11_40_chunk"
LOG: new compressed chunk "_timescaledb_internal.compress_hyper_13_61_chunk" created
LOG: using index "_hyper_11_40_chunk_conditions_time_idx" to scan rows for compression
LOG: finished compressing 144 rows from "_hyper_11_40_chunk"
LOG: job 1004 completed processing chunk _timescaledb_internal._hyper_11_40_chunk
set client_min_messages TO NOTICE;
LOG: statement: set client_min_messages TO NOTICE;
5 changes: 5 additions & 0 deletions tsl/test/expected/compression_bgw-14.out
@@ -344,6 +344,11 @@ SELECT alter_job(id,config:=jsonb_set(config,'{verbose_log}', 'true'))
set client_min_messages TO LOG;
CALL run_job(:job_id);
LOG: statement: CALL run_job(1004);
LOG: acquiring locks for compressing "_timescaledb_internal._hyper_11_40_chunk"
LOG: locks acquired for compressing "_timescaledb_internal._hyper_11_40_chunk"
LOG: new compressed chunk "_timescaledb_internal.compress_hyper_13_61_chunk" created
LOG: using index "_hyper_11_40_chunk_conditions_time_idx" to scan rows for compression
LOG: finished compressing 144 rows from "_hyper_11_40_chunk"
LOG: job 1004 completed processing chunk _timescaledb_internal._hyper_11_40_chunk
set client_min_messages TO NOTICE;
LOG: statement: set client_min_messages TO NOTICE;
5 changes: 5 additions & 0 deletions tsl/test/expected/compression_bgw-15.out
@@ -344,6 +344,11 @@ SELECT alter_job(id,config:=jsonb_set(config,'{verbose_log}', 'true'))
set client_min_messages TO LOG;
CALL run_job(:job_id);
LOG: statement: CALL run_job(1004);
LOG: acquiring locks for compressing "_timescaledb_internal._hyper_11_40_chunk"
LOG: locks acquired for compressing "_timescaledb_internal._hyper_11_40_chunk"
LOG: new compressed chunk "_timescaledb_internal.compress_hyper_13_61_chunk" created
LOG: using index "_hyper_11_40_chunk_conditions_time_idx" to scan rows for compression
LOG: finished compressing 144 rows from "_hyper_11_40_chunk"
LOG: job 1004 completed processing chunk _timescaledb_internal._hyper_11_40_chunk
set client_min_messages TO NOTICE;
LOG: statement: set client_min_messages TO NOTICE;
5 changes: 5 additions & 0 deletions tsl/test/expected/compression_bgw-16.out
@@ -344,6 +344,11 @@ SELECT alter_job(id,config:=jsonb_set(config,'{verbose_log}', 'true'))
set client_min_messages TO LOG;
CALL run_job(:job_id);
LOG: statement: CALL run_job(1004);
LOG: acquiring locks for compressing "_timescaledb_internal._hyper_11_40_chunk"
LOG: locks acquired for compressing "_timescaledb_internal._hyper_11_40_chunk"
LOG: new compressed chunk "_timescaledb_internal.compress_hyper_13_61_chunk" created
LOG: using index "_hyper_11_40_chunk_conditions_time_idx" to scan rows for compression
LOG: finished compressing 144 rows from "_hyper_11_40_chunk"
LOG: job 1004 completed processing chunk _timescaledb_internal._hyper_11_40_chunk
set client_min_messages TO NOTICE;
LOG: statement: set client_min_messages TO NOTICE;
48 changes: 48 additions & 0 deletions tsl/test/expected/compression_indexscan.out
@@ -948,6 +948,54 @@ select count(*) from tab1;
62400
(1 row)

-- Check compression progress messages
SET client_min_messages TO LOG;
select compress_chunk('_timescaledb_internal._hyper_1_1_chunk');
LOG: statement: select compress_chunk('_timescaledb_internal._hyper_1_1_chunk');
LOG: acquiring locks for compressing "_timescaledb_internal._hyper_1_1_chunk"
LOG: locks acquired for compressing "_timescaledb_internal._hyper_1_1_chunk"
LOG: new compressed chunk "_timescaledb_internal.compress_hyper_3_133_chunk" created
INFO: compress_chunk_tuplesort_start
LOG: using tuplesort to scan rows from "_hyper_1_1_chunk" for compression
LOG: finished compressing 13500 rows from "_hyper_1_1_chunk"
compress_chunk
----------------------------------------
_timescaledb_internal._hyper_1_1_chunk
(1 row)

RESET client_min_messages;
LOG: statement: RESET client_min_messages;
-- Analyze and use stats now on a larger data set
INSERT INTO tab1
SELECT
time + (INTERVAL '1 minute' * random()) AS time,
id,
random() AS c1,
random()* 100 AS c2
FROM
generate_series('2018-03-02 1:00'::TIMESTAMPTZ, '2018-03-28 1:00', '5 minutes') AS g1(time),
generate_series(1, 100, 1 ) AS g2(id)
ORDER BY
time;
ANALYZE _timescaledb_internal._hyper_1_2_chunk;
SET client_min_messages TO LOG;
select compress_chunk('_timescaledb_internal._hyper_1_2_chunk');
LOG: statement: select compress_chunk('_timescaledb_internal._hyper_1_2_chunk');
LOG: acquiring locks for compressing "_timescaledb_internal._hyper_1_2_chunk"
LOG: locks acquired for compressing "_timescaledb_internal._hyper_1_2_chunk"
LOG: new compressed chunk "_timescaledb_internal.compress_hyper_3_134_chunk" created
INFO: compress_chunk_tuplesort_start
LOG: using tuplesort to scan rows from "_hyper_1_2_chunk" for compression
LOG: compressed 100000 rows from "_hyper_1_2_chunk"
LOG: compressed 200000 rows from "_hyper_1_2_chunk"
LOG: finished compressing 218400 rows from "_hyper_1_2_chunk"
compress_chunk
----------------------------------------
_timescaledb_internal._hyper_1_2_chunk
(1 row)

RESET client_min_messages;
LOG: statement: RESET client_min_messages;
drop index predicate;
--Tear down
DROP TABLE tab1;