Improved parallel DecompressChunk worker selection #5870

Merged 1 commit on Aug 10, 2023
2 changes: 1 addition & 1 deletion src/import/allpaths.c
@@ -90,7 +90,7 @@ set_tablesample_rel_pathlist(PlannerInfo *root, RelOptInfo *rel, RangeTblEntry *
}

/* copied from allpaths.c */
-void
+static void
ts_create_plain_partial_paths(PlannerInfo *root, RelOptInfo *rel)
{
int parallel_workers;
1 change: 0 additions & 1 deletion src/import/allpaths.h
@@ -11,7 +11,6 @@

#include "export.h"

-extern TSDLLEXPORT void ts_create_plain_partial_paths(PlannerInfo *root, RelOptInfo *rel);
extern void ts_set_rel_size(PlannerInfo *root, RelOptInfo *rel, Index rti, RangeTblEntry *rte);
extern void ts_set_append_rel_pathlist(PlannerInfo *root, RelOptInfo *rel, Index rti,
RangeTblEntry *rte);
4 changes: 2 additions & 2 deletions test/sql/updates/post.compression.sql
@@ -2,7 +2,7 @@
-- Please see the included NOTICE for copyright information and
-- LICENSE-APACHE for a copy of the license.

-SELECT * FROM compress ORDER BY time DESC, small_cardinality;
+SELECT * FROM compress ORDER BY time DESC, small_cardinality, large_cardinality, some_double, some_int, some_custom, some_bool;
Contributor Author:
The test output was not deterministic before this change.


INSERT INTO compress
SELECT g, 'QW', g::text, 2, 0, (100,4)::custom_type_for_compression, false
@@ -17,7 +17,7 @@ WHERE
hypertable.table_name = 'compress'
AND chunk.compressed_chunk_id IS NULL;

-SELECT * FROM compress ORDER BY time DESC, small_cardinality;
+SELECT * FROM compress ORDER BY time DESC, small_cardinality, large_cardinality, some_double, some_int, some_custom, some_bool;
Member:
ORDER BY compress would order by all columns.

Contributor Author:
We have to use the same ordering as the existing up/downgrade tests to generate the same output. Otherwise, the tests will fail. Therefore, we need an ordering of time DESC and large_cardinality ASC. I think that is not possible with the proposed change.


\x on
WITH hypertables AS (
21 changes: 20 additions & 1 deletion tsl/src/nodes/decompress_chunk/decompress_chunk.c
@@ -1578,7 +1578,26 @@ create_compressed_scan_paths(PlannerInfo *root, RelOptInfo *compressed_rel, Comp

/* create parallel scan path */
if (compressed_rel->consider_parallel)
-		ts_create_plain_partial_paths(root, compressed_rel);
+	{
Member:
Since this is the only caller of ts_create_plain_partial_paths, we should either move the code into that function or get rid of the function.

Contributor Author:
The function is still called in set_plain_rel_pathlist(). However, it is used only in non-TSL code. So, I removed the export of the function.

+		/* Almost the same functionality as ts_create_plain_partial_paths.
+		 *
+		 * However, we also create a partial path for small chunks to allow PostgreSQL to choose a
+		 * parallel plan for decompression. If no partial path is present for a single chunk,
+		 * PostgreSQL will not use a parallel plan and all chunks are decompressed by a non-parallel
+		 * plan (even if there are a few bigger chunks).
+		 */
+		int parallel_workers = compute_parallel_worker(compressed_rel,
+													   compressed_rel->pages,
+													   -1,
+													   max_parallel_workers_per_gather);
+
+		/* Use at least one worker */
+		parallel_workers = Max(parallel_workers, 1);
+
+		/* Add an unordered partial path based on a parallel sequential scan. */
+		add_partial_path(compressed_rel,
+						 create_seqscan_path(root, compressed_rel, NULL, parallel_workers));
+	}

/*
* We set enable_bitmapscan to false here to ensure any pathes with bitmapscan do not
49 changes: 49 additions & 0 deletions tsl/test/expected/compression.out
@@ -1943,3 +1943,52 @@ SELECT count(*) FROM :chunk_name;
1000
(1 row)

-- Test that parallel plans are chosen even if partial and small chunks are involved
RESET min_parallel_index_scan_size;
RESET min_parallel_table_scan_size;
CREATE TABLE ht_metrics_partially_compressed(time timestamptz, device int, value float);
SELECT create_hypertable('ht_metrics_partially_compressed','time',create_default_indexes:=false);
NOTICE: adding not-null constraint to column "time"
create_hypertable
-----------------------------------------------
(41,public,ht_metrics_partially_compressed,t)
(1 row)

ALTER TABLE ht_metrics_partially_compressed SET (timescaledb.compress, timescaledb.compress_segmentby='device');
INSERT INTO ht_metrics_partially_compressed
SELECT time, device, device * 0.1 FROM
generate_series('2020-01-01'::timestamptz,'2020-01-02'::timestamptz, INTERVAL '1 m') g(time),
LATERAL (SELECT generate_series(1,2) AS device) g2;
SELECT compress_chunk(c) FROM show_chunks('ht_metrics_partially_compressed') c;
compress_chunk
------------------------------------------
_timescaledb_internal._hyper_41_75_chunk
_timescaledb_internal._hyper_41_76_chunk
(2 rows)

INSERT INTO ht_metrics_partially_compressed VALUES ('2020-01-01'::timestamptz, 1, 0.1);
:explain
SELECT * FROM ht_metrics_partially_compressed ORDER BY time DESC, device LIMIT 1;
QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Limit
Output: _hyper_41_75_chunk."time", _hyper_41_75_chunk.device, _hyper_41_75_chunk.value
-> Gather Merge
Output: _hyper_41_75_chunk."time", _hyper_41_75_chunk.device, _hyper_41_75_chunk.value
Workers Planned: 2
-> Sort
Output: _hyper_41_75_chunk."time", _hyper_41_75_chunk.device, _hyper_41_75_chunk.value
Sort Key: _hyper_41_75_chunk."time" DESC, _hyper_41_75_chunk.device
-> Parallel Append
-> Parallel Seq Scan on _timescaledb_internal._hyper_41_75_chunk
Output: _hyper_41_75_chunk."time", _hyper_41_75_chunk.device, _hyper_41_75_chunk.value
-> Custom Scan (DecompressChunk) on _timescaledb_internal._hyper_41_76_chunk
Output: _hyper_41_76_chunk."time", _hyper_41_76_chunk.device, _hyper_41_76_chunk.value
-> Parallel Seq Scan on _timescaledb_internal.compress_hyper_42_78_chunk
Output: compress_hyper_42_78_chunk."time", compress_hyper_42_78_chunk.device, compress_hyper_42_78_chunk.value, compress_hyper_42_78_chunk._ts_meta_count, compress_hyper_42_78_chunk._ts_meta_sequence_num, compress_hyper_42_78_chunk._ts_meta_min_1, compress_hyper_42_78_chunk._ts_meta_max_1
-> Custom Scan (DecompressChunk) on _timescaledb_internal._hyper_41_75_chunk
Output: _hyper_41_75_chunk."time", _hyper_41_75_chunk.device, _hyper_41_75_chunk.value
-> Parallel Seq Scan on _timescaledb_internal.compress_hyper_42_77_chunk
Output: compress_hyper_42_77_chunk."time", compress_hyper_42_77_chunk.device, compress_hyper_42_77_chunk.value, compress_hyper_42_77_chunk._ts_meta_count, compress_hyper_42_77_chunk._ts_meta_sequence_num, compress_hyper_42_77_chunk._ts_meta_min_1, compress_hyper_42_77_chunk._ts_meta_max_1
(19 rows)

21 changes: 21 additions & 0 deletions tsl/test/sql/compression.sql
@@ -888,3 +888,24 @@ SELECT count(*) FROM :chunk_name;
ANALYZE :chunk_name;

SELECT count(*) FROM :chunk_name;


-- Test that parallel plans are chosen even if partial and small chunks are involved
RESET min_parallel_index_scan_size;
RESET min_parallel_table_scan_size;

CREATE TABLE ht_metrics_partially_compressed(time timestamptz, device int, value float);
SELECT create_hypertable('ht_metrics_partially_compressed','time',create_default_indexes:=false);
ALTER TABLE ht_metrics_partially_compressed SET (timescaledb.compress, timescaledb.compress_segmentby='device');

INSERT INTO ht_metrics_partially_compressed
SELECT time, device, device * 0.1 FROM
generate_series('2020-01-01'::timestamptz,'2020-01-02'::timestamptz, INTERVAL '1 m') g(time),
LATERAL (SELECT generate_series(1,2) AS device) g2;

SELECT compress_chunk(c) FROM show_chunks('ht_metrics_partially_compressed') c;

INSERT INTO ht_metrics_partially_compressed VALUES ('2020-01-01'::timestamptz, 1, 0.1);

:explain
SELECT * FROM ht_metrics_partially_compressed ORDER BY time DESC, device LIMIT 1;