Improve memory locality in batch sorted merge #6517

Merged 1 commit on Jan 19, 2024
296 changes: 228 additions & 68 deletions tsl/src/nodes/decompress_chunk/batch_queue_heap.c

Large diffs are not rendered by default.
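Since the main diff for batch_queue_heap.c is not rendered, the sketch below is an illustration only (not the actual patch) of the general shape of a heap-based sorted merge over decompressed batches, and of one common way to improve its memory locality: caching each batch's current sort key inline in the heap entries, so that heap comparisons scan a small contiguous array instead of dereferencing into every batch's own memory context. All names here are hypothetical.

/*
 * Illustration only: a binary min-heap that merges K sorted batches.
 * Hypothetical names; this is not the TimescaleDB implementation.
 */
#include <stdint.h>

typedef struct HeapEntry
{
	int64_t key;	/* current head sort key, cached inline for cache-friendly compares */
	uint32_t batch; /* index of the batch this entry refers to */
	uint32_t row;	/* next row to read from that batch */
} HeapEntry;

typedef struct BatchHeap
{
	HeapEntry *entries;
	int nentries;
} BatchHeap;

static void
heap_sift_down(BatchHeap *h, int i)
{
	for (;;)
	{
		int smallest = i;
		const int l = 2 * i + 1;
		const int r = 2 * i + 2;

		/* The comparisons touch only the compact entries array. */
		if (l < h->nentries && h->entries[l].key < h->entries[smallest].key)
			smallest = l;
		if (r < h->nentries && h->entries[r].key < h->entries[smallest].key)
			smallest = r;
		if (smallest == i)
			return;

		HeapEntry tmp = h->entries[i];
		h->entries[i] = h->entries[smallest];
		h->entries[smallest] = tmp;
		i = smallest;
	}
}

After the top entry's tuple is emitted, its cached key would be refreshed from the batch's next row and the entry sifted down again.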

28 changes: 12 additions & 16 deletions tsl/src/nodes/decompress_chunk/compressed_batch.c
@@ -533,20 +533,16 @@ compressed_batch_set_compressed_tuple(DecompressContext *dcontext,
* The entire batch doesn't pass the vectorized quals, so we might be
* able to avoid reading and decompressing other columns. Scroll it to
* the end.
* Note that this optimization can't work with "batch sorted merge",
* because the latter always has to read the first row of the batch for
* its sorting needs, so it always has to read and decompress all
* columns. This can be improved by only decompressing the columns
* needed for sorting.
*/
batch_state->next_batch_row = batch_state->total_batch_rows;

InstrCountTuples2(dcontext->ps, 1);
InstrCountFiltered1(dcontext->ps, batch_state->total_batch_rows);

/*
* Note that this optimization can't work with "batch sorted merge",
* because the latter always has to read the first row of the batch for
* its sorting needs, so it always has to read and decompress all
* columns. This is not a problem at the moment, because for batch
* sorted merge we disable bulk decompression entirely, at planning time.
*/
Assert(!dcontext->batch_sorted_merge);
}
else
{
@@ -787,12 +783,10 @@ compressed_batch_save_first_tuple(DecompressContext *dcontext, DecompressBatchSt
Assert(TupIsNull(batch_state->decompressed_scan_slot));

/*
* We might not have decompressed some columns if the vector quals didn't
* pass for the entire batch. Have to decompress them anyway if we're asked
* Check that we have decompressed all columns even if the vector quals
* didn't pass for the entire batch. We need them because we're asked
* to save the first tuple. This doesn't actually happen yet, because the
* vectorized decompression is disabled with sorted merge, but we might want
* to enable it for some queries. For now, just assert that it doesn't
* happen.
* vectorized decompression is disabled with sorted merge.
*/
#ifdef USE_ASSERT_CHECKING
const int num_compressed_columns = dcontext->num_compressed_columns;
@@ -804,15 +798,17 @@ compressed_batch_save_first_tuple(DecompressContext *dcontext, DecompressBatchSt
#endif

/* Make the first tuple and save it. */
make_next_tuple(batch_state, dcontext->reverse, dcontext->num_compressed_columns);
Assert(batch_state->next_batch_row == 0);
const uint16 arrow_row = dcontext->reverse ? batch_state->total_batch_rows - 1 : 0;
make_next_tuple(batch_state, arrow_row, dcontext->num_compressed_columns);
ExecCopySlot(first_tuple_slot, batch_state->decompressed_scan_slot);

/*
* Check the quals and advance, so that the batch is in the correct state
* for the subsequent calls (matching tuple is in decompressed scan slot).
*/
const bool qual_passed =
vector_qual(batch_state, dcontext->reverse) && postgres_qual(dcontext, batch_state);
vector_qual(batch_state, arrow_row) && postgres_qual(dcontext, batch_state);
batch_state->next_batch_row++;

if (!qual_passed)
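The hunks above change make_next_tuple() and vector_qual() to take an explicit arrow row index instead of the reverse flag, with the call site computing the index for the first row of the batch. A minimal sketch of the assumed general mapping (illustrative helper, not part of the patch):

#include <stdbool.h>
#include <stdint.h>

/* Map the i-th emitted row of a batch to its index in the arrow arrays. */
static inline uint16_t
arrow_row_for_output(uint16_t i, uint16_t total_batch_rows, bool reverse)
{
	return reverse ? (uint16_t) (total_batch_rows - 1 - i) : i;
}

For i = 0 this reduces to the expression used in the hunk above, reverse ? total_batch_rows - 1 : 0.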
4 changes: 2 additions & 2 deletions tsl/src/nodes/decompress_chunk/exec.c
@@ -365,9 +365,9 @@ decompress_chunk_begin(CustomScanState *node, EState *estate, int eflags)
/* Values array, with 64 element padding (actually we have less). */
batch_memory_context_bytes +=
(GLOBAL_MAX_ROWS_PER_COMPRESSION + 64) * column->value_bytes;
/* Also nulls bitmap. */
/* Nulls bitmap, one uint64 per 64 rows. */
batch_memory_context_bytes +=
GLOBAL_MAX_ROWS_PER_COMPRESSION / (64 * sizeof(uint64));
((GLOBAL_MAX_ROWS_PER_COMPRESSION + 63) / 64) * sizeof(uint64);
/* Arrow data structure. */
batch_memory_context_bytes += sizeof(ArrowArray) + sizeof(void *) * 2 /* buffers */;
/* Memory context header overhead for the above parts. */
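The exec.c hunk fixes the estimate for the nulls bitmap: the old expression divides the row count by 64 * sizeof(uint64), i.e. by 512, while the new one reserves one uint64 per 64 rows, rounded up. A small check of the arithmetic, assuming GLOBAL_MAX_ROWS_PER_COMPRESSION is 1000 (illustrative value; the real constant may differ):

#include <assert.h>
#include <stdint.h>
#include <stdio.h>

#define GLOBAL_MAX_ROWS_PER_COMPRESSION 1000 /* assumed for illustration */

int
main(void)
{
	/* Old estimate: 1000 / 512 = 1 byte, far less than one bit per row. */
	size_t old_bytes = GLOBAL_MAX_ROWS_PER_COMPRESSION / (64 * sizeof(uint64_t));

	/* New estimate: ceil(1000 / 64) = 16 words of 8 bytes = 128 bytes. */
	size_t new_bytes = ((GLOBAL_MAX_ROWS_PER_COMPRESSION + 63) / 64) * sizeof(uint64_t);

	printf("old = %zu bytes, new = %zu bytes\n", old_bytes, new_bytes);
	assert(new_bytes * 8 >= GLOBAL_MAX_ROWS_PER_COMPRESSION);
	return 0;
}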
43 changes: 43 additions & 0 deletions tsl/src/nodes/decompress_chunk/planner.c
@@ -13,6 +13,7 @@
#include <nodes/makefuncs.h>
#include <nodes/nodeFuncs.h>
#include <nodes/nodes.h>
#include <optimizer/cost.h>
#include <optimizer/optimizer.h>
#include <optimizer/paths.h>
#include <optimizer/plancat.h>
@@ -604,6 +605,46 @@ find_vectorized_quals(DecompressChunkPath *path, List *qual_list, List **vectori
}
}

/*
* Copy of the static function of the same name from PostgreSQL's createplan.c.
*
* Some places in this file build Sort nodes that don't have a directly
* corresponding Path node. The cost of the sort is, or should have been,
* included in the cost of the Path node we're working from, but since it's
* not split out, we have to re-figure it using cost_sort(). This is just
* to label the Sort node nicely for EXPLAIN.
*
* limit_tuples is as for cost_sort (in particular, pass -1 if no limit)
*/
static void
ts_label_sort_with_costsize(PlannerInfo *root, Sort *plan, double limit_tuples)
{
Plan *lefttree = plan->plan.lefttree;
Path sort_path; /* dummy for result of cost_sort */

/*
* This function shouldn't have to deal with IncrementalSort plans because
* they are only created from corresponding Path nodes.
*/
Assert(IsA(plan, Sort));

cost_sort(&sort_path,
root,
NIL,
lefttree->total_cost,
lefttree->plan_rows,
lefttree->plan_width,
0.0,
work_mem,
limit_tuples);
plan->plan.startup_cost = sort_path.startup_cost;
plan->plan.total_cost = sort_path.total_cost;
plan->plan.plan_rows = lefttree->plan_rows;
plan->plan.plan_width = lefttree->plan_width;
plan->plan.parallel_aware = false;
plan->plan.parallel_safe = lefttree->parallel_safe;
}

Plan *
decompress_chunk_plan_create(PlannerInfo *root, RelOptInfo *rel, CustomPath *path,
List *decompressed_tlist, List *clauses, List *custom_plans)
@@ -898,6 +939,8 @@ decompress_chunk_plan_create(PlannerInfo *root, RelOptInfo *rel, CustomPath *pat
collations,
nullsFirst);

ts_label_sort_with_costsize(root, sort, /* limit_tuples = */ 0);

decompress_plan->custom_plans = list_make1(sort);
}
else
4 changes: 2 additions & 2 deletions tsl/test/expected/compression_sorted_merge-13.out
@@ -833,7 +833,7 @@ EXPLAIN (verbose) SELECT 1 as one, 2 as two, 3 as three, time, x2 FROM test1 ORD
-> Custom Scan (DecompressChunk) on _timescaledb_internal._hyper_1_1_chunk (cost=1.06..63.44 rows=3000 width=12)
Output: _hyper_1_1_chunk."time", _hyper_1_1_chunk.x2
Batch Sorted Merge: true
-> Sort (cost=0.00..0.00 rows=0 width=0)
-> Sort (cost=1.05..1.06 rows=3 width=56)
Output: compress_hyper_2_2_chunk."time", compress_hyper_2_2_chunk.x1, compress_hyper_2_2_chunk.x2, compress_hyper_2_2_chunk.x3, compress_hyper_2_2_chunk.x4, compress_hyper_2_2_chunk.x5, compress_hyper_2_2_chunk._ts_meta_count, compress_hyper_2_2_chunk._ts_meta_sequence_num, compress_hyper_2_2_chunk._ts_meta_min_1, compress_hyper_2_2_chunk._ts_meta_max_1, compress_hyper_2_2_chunk._ts_meta_min_2, compress_hyper_2_2_chunk._ts_meta_max_2, compress_hyper_2_2_chunk._ts_meta_min_3, compress_hyper_2_2_chunk._ts_meta_max_3
Sort Key: compress_hyper_2_2_chunk._ts_meta_max_1 DESC
-> Seq Scan on _timescaledb_internal.compress_hyper_2_2_chunk (cost=0.00..1.03 rows=3 width=56)
@@ -858,7 +858,7 @@ EXPLAIN (verbose) SELECT 1 as one, 2 as two, 3 as three, x2, time FROM test1 ORD
-> Custom Scan (DecompressChunk) on _timescaledb_internal._hyper_1_1_chunk (cost=1.06..63.44 rows=3000 width=12)
Output: _hyper_1_1_chunk.x2, _hyper_1_1_chunk."time"
Batch Sorted Merge: true
-> Sort (cost=0.00..0.00 rows=0 width=0)
-> Sort (cost=1.05..1.06 rows=3 width=56)
Output: compress_hyper_2_2_chunk."time", compress_hyper_2_2_chunk.x1, compress_hyper_2_2_chunk.x2, compress_hyper_2_2_chunk.x3, compress_hyper_2_2_chunk.x4, compress_hyper_2_2_chunk.x5, compress_hyper_2_2_chunk._ts_meta_count, compress_hyper_2_2_chunk._ts_meta_sequence_num, compress_hyper_2_2_chunk._ts_meta_min_1, compress_hyper_2_2_chunk._ts_meta_max_1, compress_hyper_2_2_chunk._ts_meta_min_2, compress_hyper_2_2_chunk._ts_meta_max_2, compress_hyper_2_2_chunk._ts_meta_min_3, compress_hyper_2_2_chunk._ts_meta_max_3
Sort Key: compress_hyper_2_2_chunk._ts_meta_max_1 DESC
-> Seq Scan on _timescaledb_internal.compress_hyper_2_2_chunk (cost=0.00..1.03 rows=3 width=56)
4 changes: 2 additions & 2 deletions tsl/test/expected/compression_sorted_merge-14.out
@@ -833,7 +833,7 @@ EXPLAIN (verbose) SELECT 1 as one, 2 as two, 3 as three, time, x2 FROM test1 ORD
-> Custom Scan (DecompressChunk) on _timescaledb_internal._hyper_1_1_chunk (cost=1.06..63.44 rows=3000 width=12)
Output: _hyper_1_1_chunk."time", _hyper_1_1_chunk.x2
Batch Sorted Merge: true
-> Sort (cost=0.00..0.00 rows=0 width=0)
-> Sort (cost=1.05..1.06 rows=3 width=56)
Output: compress_hyper_2_2_chunk."time", compress_hyper_2_2_chunk.x1, compress_hyper_2_2_chunk.x2, compress_hyper_2_2_chunk.x3, compress_hyper_2_2_chunk.x4, compress_hyper_2_2_chunk.x5, compress_hyper_2_2_chunk._ts_meta_count, compress_hyper_2_2_chunk._ts_meta_sequence_num, compress_hyper_2_2_chunk._ts_meta_min_1, compress_hyper_2_2_chunk._ts_meta_max_1, compress_hyper_2_2_chunk._ts_meta_min_2, compress_hyper_2_2_chunk._ts_meta_max_2, compress_hyper_2_2_chunk._ts_meta_min_3, compress_hyper_2_2_chunk._ts_meta_max_3
Sort Key: compress_hyper_2_2_chunk._ts_meta_max_1 DESC
-> Seq Scan on _timescaledb_internal.compress_hyper_2_2_chunk (cost=0.00..1.03 rows=3 width=56)
@@ -858,7 +858,7 @@ EXPLAIN (verbose) SELECT 1 as one, 2 as two, 3 as three, x2, time FROM test1 ORD
-> Custom Scan (DecompressChunk) on _timescaledb_internal._hyper_1_1_chunk (cost=1.06..63.44 rows=3000 width=12)
Output: _hyper_1_1_chunk.x2, _hyper_1_1_chunk."time"
Batch Sorted Merge: true
-> Sort (cost=0.00..0.00 rows=0 width=0)
-> Sort (cost=1.05..1.06 rows=3 width=56)
Output: compress_hyper_2_2_chunk."time", compress_hyper_2_2_chunk.x1, compress_hyper_2_2_chunk.x2, compress_hyper_2_2_chunk.x3, compress_hyper_2_2_chunk.x4, compress_hyper_2_2_chunk.x5, compress_hyper_2_2_chunk._ts_meta_count, compress_hyper_2_2_chunk._ts_meta_sequence_num, compress_hyper_2_2_chunk._ts_meta_min_1, compress_hyper_2_2_chunk._ts_meta_max_1, compress_hyper_2_2_chunk._ts_meta_min_2, compress_hyper_2_2_chunk._ts_meta_max_2, compress_hyper_2_2_chunk._ts_meta_min_3, compress_hyper_2_2_chunk._ts_meta_max_3
Sort Key: compress_hyper_2_2_chunk._ts_meta_max_1 DESC
-> Seq Scan on _timescaledb_internal.compress_hyper_2_2_chunk (cost=0.00..1.03 rows=3 width=56)
4 changes: 2 additions & 2 deletions tsl/test/expected/compression_sorted_merge-15.out
@@ -833,7 +833,7 @@ EXPLAIN (verbose) SELECT 1 as one, 2 as two, 3 as three, time, x2 FROM test1 ORD
-> Custom Scan (DecompressChunk) on _timescaledb_internal._hyper_1_1_chunk (cost=1.06..63.44 rows=3000 width=12)
Output: _hyper_1_1_chunk."time", _hyper_1_1_chunk.x2
Batch Sorted Merge: true
-> Sort (cost=0.00..0.00 rows=0 width=0)
-> Sort (cost=1.05..1.06 rows=3 width=56)
Output: compress_hyper_2_2_chunk."time", compress_hyper_2_2_chunk.x1, compress_hyper_2_2_chunk.x2, compress_hyper_2_2_chunk.x3, compress_hyper_2_2_chunk.x4, compress_hyper_2_2_chunk.x5, compress_hyper_2_2_chunk._ts_meta_count, compress_hyper_2_2_chunk._ts_meta_sequence_num, compress_hyper_2_2_chunk._ts_meta_min_1, compress_hyper_2_2_chunk._ts_meta_max_1, compress_hyper_2_2_chunk._ts_meta_min_2, compress_hyper_2_2_chunk._ts_meta_max_2, compress_hyper_2_2_chunk._ts_meta_min_3, compress_hyper_2_2_chunk._ts_meta_max_3
Sort Key: compress_hyper_2_2_chunk._ts_meta_max_1 DESC
-> Seq Scan on _timescaledb_internal.compress_hyper_2_2_chunk (cost=0.00..1.03 rows=3 width=56)
@@ -858,7 +858,7 @@ EXPLAIN (verbose) SELECT 1 as one, 2 as two, 3 as three, x2, time FROM test1 ORD
-> Custom Scan (DecompressChunk) on _timescaledb_internal._hyper_1_1_chunk (cost=1.06..63.44 rows=3000 width=12)
Output: _hyper_1_1_chunk.x2, _hyper_1_1_chunk."time"
Batch Sorted Merge: true
-> Sort (cost=0.00..0.00 rows=0 width=0)
-> Sort (cost=1.05..1.06 rows=3 width=56)
Output: compress_hyper_2_2_chunk."time", compress_hyper_2_2_chunk.x1, compress_hyper_2_2_chunk.x2, compress_hyper_2_2_chunk.x3, compress_hyper_2_2_chunk.x4, compress_hyper_2_2_chunk.x5, compress_hyper_2_2_chunk._ts_meta_count, compress_hyper_2_2_chunk._ts_meta_sequence_num, compress_hyper_2_2_chunk._ts_meta_min_1, compress_hyper_2_2_chunk._ts_meta_max_1, compress_hyper_2_2_chunk._ts_meta_min_2, compress_hyper_2_2_chunk._ts_meta_max_2, compress_hyper_2_2_chunk._ts_meta_min_3, compress_hyper_2_2_chunk._ts_meta_max_3
Sort Key: compress_hyper_2_2_chunk._ts_meta_max_1 DESC
-> Seq Scan on _timescaledb_internal.compress_hyper_2_2_chunk (cost=0.00..1.03 rows=3 width=56)
4 changes: 2 additions & 2 deletions tsl/test/expected/compression_sorted_merge-16.out
@@ -833,7 +833,7 @@ EXPLAIN (verbose) SELECT 1 as one, 2 as two, 3 as three, time, x2 FROM test1 ORD
-> Custom Scan (DecompressChunk) on _timescaledb_internal._hyper_1_1_chunk (cost=1.06..63.44 rows=3000 width=12)
Output: _hyper_1_1_chunk."time", _hyper_1_1_chunk.x2
Batch Sorted Merge: true
-> Sort (cost=0.00..0.00 rows=0 width=0)
-> Sort (cost=1.05..1.06 rows=3 width=56)
Output: compress_hyper_2_2_chunk."time", compress_hyper_2_2_chunk.x1, compress_hyper_2_2_chunk.x2, compress_hyper_2_2_chunk.x3, compress_hyper_2_2_chunk.x4, compress_hyper_2_2_chunk.x5, compress_hyper_2_2_chunk._ts_meta_count, compress_hyper_2_2_chunk._ts_meta_sequence_num, compress_hyper_2_2_chunk._ts_meta_min_1, compress_hyper_2_2_chunk._ts_meta_max_1, compress_hyper_2_2_chunk._ts_meta_min_2, compress_hyper_2_2_chunk._ts_meta_max_2, compress_hyper_2_2_chunk._ts_meta_min_3, compress_hyper_2_2_chunk._ts_meta_max_3
Sort Key: compress_hyper_2_2_chunk._ts_meta_max_1 DESC
-> Seq Scan on _timescaledb_internal.compress_hyper_2_2_chunk (cost=0.00..1.03 rows=3 width=56)
@@ -858,7 +858,7 @@ EXPLAIN (verbose) SELECT 1 as one, 2 as two, 3 as three, x2, time FROM test1 ORD
-> Custom Scan (DecompressChunk) on _timescaledb_internal._hyper_1_1_chunk (cost=1.06..63.44 rows=3000 width=12)
Output: _hyper_1_1_chunk.x2, _hyper_1_1_chunk."time"
Batch Sorted Merge: true
-> Sort (cost=0.00..0.00 rows=0 width=0)
-> Sort (cost=1.05..1.06 rows=3 width=56)
Output: compress_hyper_2_2_chunk."time", compress_hyper_2_2_chunk.x1, compress_hyper_2_2_chunk.x2, compress_hyper_2_2_chunk.x3, compress_hyper_2_2_chunk.x4, compress_hyper_2_2_chunk.x5, compress_hyper_2_2_chunk._ts_meta_count, compress_hyper_2_2_chunk._ts_meta_sequence_num, compress_hyper_2_2_chunk._ts_meta_min_1, compress_hyper_2_2_chunk._ts_meta_max_1, compress_hyper_2_2_chunk._ts_meta_min_2, compress_hyper_2_2_chunk._ts_meta_max_2, compress_hyper_2_2_chunk._ts_meta_min_3, compress_hyper_2_2_chunk._ts_meta_max_3
Sort Key: compress_hyper_2_2_chunk._ts_meta_max_1 DESC
-> Seq Scan on _timescaledb_internal.compress_hyper_2_2_chunk (cost=0.00..1.03 rows=3 width=56)