Commit: size fixes

akuzm committed Jan 26, 2024 · 1 parent ce2b7cd · commit afcbe74

Showing 3 changed files with 78 additions and 58 deletions.
125 changes: 71 additions & 54 deletions tsl/src/nodes/decompress_chunk/decompress_chunk.c
@@ -46,7 +46,7 @@ static CustomPathMethods decompress_chunk_path_methods = {
 
 typedef struct SortInfo
 {
-    List *compressed_pathkeys;
+    List *required_compressed_pathkeys;
     bool needs_sequence_num;
     bool can_pushdown_sort; /* sort can be pushed below DecompressChunk */
     bool reverse;
@@ -153,7 +153,7 @@ build_compressed_scan_pathkeys(SortInfo *sort_info, PlannerInfo *root, List *chu
 {
     Var *var;
     int varattno;
-    List *compressed_pathkeys = NIL;
+    List *required_compressed_pathkeys = NIL;
     PathKey *pk;
 
     /*
@@ -198,7 +198,7 @@ build_compressed_scan_pathkeys(SortInfo *sort_info, PlannerInfo *root, List *chu
         */
        Assert(compressed_em != NULL);
 
-       compressed_pathkeys = lappend(compressed_pathkeys, pk);
+       required_compressed_pathkeys = lappend(required_compressed_pathkeys, pk);
 
        segmentby_columns =
            bms_add_member(segmentby_columns, castNode(Var, compressed_em->em_expr)->varattno);
@@ -210,7 +210,7 @@ build_compressed_scan_pathkeys(SortInfo *sort_info, PlannerInfo *root, List *chu
     * asserting here.
     */
    Assert(bms_num_members(segmentby_columns) == info->num_segmentby_columns ||
-          list_length(compressed_pathkeys) == list_length(chunk_pathkeys));
+          list_length(required_compressed_pathkeys) == list_length(chunk_pathkeys));
 }
 
 /*
@@ -251,9 +251,9 @@ build_compressed_scan_pathkeys(SortInfo *sort_info, PlannerInfo *root, List *chu
 
        pk = make_canonical_pathkey(root, ec, opfamily, strategy, nulls_first);
 
-       compressed_pathkeys = lappend(compressed_pathkeys, pk);
+       required_compressed_pathkeys = lappend(required_compressed_pathkeys, pk);
    }
-   sort_info->compressed_pathkeys = compressed_pathkeys;
+   sort_info->required_compressed_pathkeys = required_compressed_pathkeys;
 }
 
 static DecompressChunkPath *
@@ -323,14 +323,22 @@ build_compressioninfo(PlannerInfo *root, Hypertable *ht, RelOptInfo *chunk_rel)
 static void
 cost_decompress_chunk(PlannerInfo *root, Path *path, Path *compressed_path)
 {
+   /* Set the row number estimate. */
+   if (path->param_info != NULL)
+   {
+       path->rows = path->param_info->ppi_rows;
+   }
+   else
+   {
+       path->rows = path->parent->rows;
+   }
+
    /* startup_cost is cost before fetching first tuple */
    if (compressed_path->rows > 0)
        path->startup_cost = compressed_path->total_cost / compressed_path->rows;
 
    /* total_cost is cost for fetching all tuples */
    path->total_cost = compressed_path->total_cost + path->rows * cpu_tuple_cost;
-   path->rows = compressed_path->rows * DECOMPRESS_CHUNK_BATCH_SIZE *
-                clauselist_selectivity(root, path->parent->baserestrictinfo, 0, JOIN_INNER, NULL);
 }
 
 /* Smoothstep function S1 (the h01 cubic Hermite spline). */
@@ -361,7 +369,7 @@ cost_batch_sorted_merge(PlannerInfo *root, CompressionInfo *compression_info,
    enable_sort = true;
    cost_sort(&sort_path,
              root,
-             dcpath->compressed_pathkeys,
+             dcpath->required_compressed_pathkeys,
              compressed_path->total_cost,
              compressed_path->rows,
              compressed_path->pathtarget->width,
@@ -691,7 +699,6 @@ ts_decompress_chunk_generate_paths(PlannerInfo *root, RelOptInfo *chunk_rel, Hyp
 {
    RelOptInfo *compressed_rel;
    ListCell *lc;
-   double new_row_estimate;
    Index ht_relid = 0;
 
    CompressionInfo *compression_info = build_compressioninfo(root, ht, chunk_rel);
@@ -725,8 +732,12 @@ ts_decompress_chunk_generate_paths(PlannerInfo *root, RelOptInfo *chunk_rel, Hyp
                               chunk_rel,
                               compressed_rel,
                               ts_chunk_is_partial(chunk));
+
    set_baserel_size_estimates(root, compressed_rel);
-   new_row_estimate = compressed_rel->rows * DECOMPRESS_CHUNK_BATCH_SIZE;
+   const double new_tuples_estimate = compressed_rel->rows * DECOMPRESS_CHUNK_BATCH_SIZE;
+   const double new_rows_estimate =
+       new_tuples_estimate *
+       clauselist_selectivity(root, chunk_rel->baserestrictinfo, 0, JOIN_INNER, NULL);
 
    if (!compression_info->single_chunk)
    {
@@ -735,10 +746,19 @@ ts_decompress_chunk_generate_paths(PlannerInfo *root, RelOptInfo *chunk_rel, Hyp
        Assert(chunk_info->parent_reloid == ht->main_table_relid);
        ht_relid = chunk_info->parent_relid;
        RelOptInfo *hypertable_rel = root->simple_rel_array[ht_relid];
-       hypertable_rel->rows += (new_row_estimate - chunk_rel->rows);
+       hypertable_rel->rows =
+           clamp_row_est(hypertable_rel->rows + new_rows_estimate - chunk_rel->rows);
+       hypertable_rel->tuples =
+           clamp_row_est(hypertable_rel->tuples + new_tuples_estimate - chunk_rel->tuples);
    }
 
-   chunk_rel->rows = new_row_estimate;
+   /*
+    * Note that we can be overwriting the estimate for uncompressed chunk part of a
+    * partial chunk here, but the paths for the uncompressed part were already
+    * built, so it is OK.
+    */
+   chunk_rel->tuples = new_tuples_estimate;
+   chunk_rel->rows = new_rows_estimate;
 
    create_compressed_scan_paths(root, compressed_rel, compression_info, &sort_info);
 
@@ -751,7 +771,6 @@ ts_decompress_chunk_generate_paths(PlannerInfo *root, RelOptInfo *chunk_rel, Hyp
    foreach (lc, compressed_rel->pathlist)
    {
        Path *compressed_path = lfirst(lc);
-       Path *path;
 
        /*
         * We skip any BitmapScan parameterized paths here as supporting
@@ -831,7 +850,8 @@ ts_decompress_chunk_generate_paths(PlannerInfo *root, RelOptInfo *chunk_rel, Hyp
            continue;
        }
 
-       path = (Path *) decompress_chunk_path_create(root, compression_info, 0, compressed_path);
+       Path *chunk_path =
+           (Path *) decompress_chunk_path_create(root, compression_info, 0, compressed_path);
 
        /*
         * Create a path for the batch sorted merge optimization. This optimization performs a
@@ -847,7 +867,7 @@ ts_decompress_chunk_generate_paths(PlannerInfo *root, RelOptInfo *chunk_rel, Hyp
        MergeBatchResult merge_result = can_batch_sorted_merge(root, compression_info, chunk);
        if (merge_result != MERGE_NOT_POSSIBLE)
        {
-           batch_merge_path = copy_decompress_chunk_path((DecompressChunkPath *) path);
+           batch_merge_path = copy_decompress_chunk_path((DecompressChunkPath *) chunk_path);
 
            batch_merge_path->reverse = (merge_result != SCAN_FORWARD);
            batch_merge_path->batch_sorted_merge = true;
@@ -874,42 +894,38 @@ ts_decompress_chunk_generate_paths(PlannerInfo *root, RelOptInfo *chunk_rel, Hyp
         * between the decompression node and the scan during plan creation */
        if (sort_info.can_pushdown_sort)
        {
-           DecompressChunkPath *dcpath = copy_decompress_chunk_path((DecompressChunkPath *) path);
-           dcpath->reverse = sort_info.reverse;
-           dcpath->needs_sequence_num = sort_info.needs_sequence_num;
-           dcpath->compressed_pathkeys = sort_info.compressed_pathkeys;
-           dcpath->custom_path.path.pathkeys = root->query_pathkeys;
+           DecompressChunkPath *path_copy =
+               copy_decompress_chunk_path((DecompressChunkPath *) chunk_path);
+           path_copy->reverse = sort_info.reverse;
+           path_copy->needs_sequence_num = sort_info.needs_sequence_num;
+           path_copy->required_compressed_pathkeys = sort_info.required_compressed_pathkeys;
+           path_copy->custom_path.path.pathkeys = root->query_pathkeys;
 
            /*
             * Add costing for a sort. The standard Postgres pattern is to add the cost during
             * path creation, but not add the sort path itself, that's done during plan
             * creation. Examples of this in: create_merge_append_path &
             * create_merge_append_plan
             */
-           if (!pathkeys_contained_in(dcpath->compressed_pathkeys, compressed_path->pathkeys))
+           if (!pathkeys_contained_in(sort_info.required_compressed_pathkeys,
+                                      compressed_path->pathkeys))
            {
                Path sort_path; /* dummy for result of cost_sort */
 
                cost_sort(&sort_path,
                          root,
-                         dcpath->compressed_pathkeys,
+                         sort_info.required_compressed_pathkeys,
                          compressed_path->total_cost,
                          compressed_path->rows,
                          compressed_path->pathtarget->width,
                          0.0,
                          work_mem,
                          -1);
 
-               cost_decompress_chunk(root, &dcpath->custom_path.path, &sort_path);
+               cost_decompress_chunk(root, &path_copy->custom_path.path, &sort_path);
            }
-           /*
-            * if chunk is partially compressed don't add this now but add an append path later
-            * combining the uncompressed and compressed parts of the chunk
-            */
-           if (!ts_chunk_is_partial(chunk))
-               add_path(chunk_rel, &dcpath->custom_path.path);
-           else
-               path = &dcpath->custom_path.path;
+
+           chunk_path = &path_copy->custom_path.path;
        }
 
        /*
@@ -918,7 +934,7 @@ ts_decompress_chunk_generate_paths(PlannerInfo *root, RelOptInfo *chunk_rel, Hyp
         */
        if (ts_chunk_is_partial(chunk))
        {
-           Bitmapset *req_outer = PATH_REQ_OUTER(path);
+           Bitmapset *req_outer = PATH_REQ_OUTER(chunk_path);
            Path *uncompressed_path =
                get_cheapest_path_for_pathkeys(initial_pathlist, NIL, req_outer, TOTAL_COST, false);
 
@@ -941,13 +957,13 @@ ts_decompress_chunk_generate_paths(PlannerInfo *root, RelOptInfo *chunk_rel, Hyp
             */
            if (batch_merge_path != NULL)
            {
-               path = (Path *) create_merge_append_path_compat(root,
-                                                               chunk_rel,
-                                                               list_make2(batch_merge_path,
-                                                                          uncompressed_path),
-                                                               root->query_pathkeys,
-                                                               req_outer,
-                                                               NIL);
+               chunk_path = (Path *) create_merge_append_path_compat(root,
+                                                                     chunk_rel,
+                                                                     list_make2(batch_merge_path,
+                                                                                uncompressed_path),
+                                                                     root->query_pathkeys,
+                                                                     req_outer,
+                                                                     NIL);
            }
            else
                /*
@@ -956,24 +972,25 @@ ts_decompress_chunk_generate_paths(PlannerInfo *root, RelOptInfo *chunk_rel, Hyp
                 * and directly add its children, so we have to combine the children
                 * into a MergeAppend node later, at the chunk append level.
                 */
-               path = (Path *) create_append_path_compat(root,
-                                                         chunk_rel,
-                                                         list_make2(path, uncompressed_path),
-                                                         NIL /* partial paths */,
-                                                         root->query_pathkeys /* pathkeys */,
-                                                         req_outer,
-                                                         0,
-                                                         false,
-                                                         false,
-                                                         path->rows + uncompressed_path->rows);
+               chunk_path =
+                   (Path *) create_append_path_compat(root,
+                                                      chunk_rel,
+                                                      list_make2(chunk_path, uncompressed_path),
+                                                      NIL /* partial paths */,
+                                                      root->query_pathkeys /* pathkeys */,
+                                                      req_outer,
+                                                      0,
+                                                      false,
+                                                      false,
+                                                      chunk_path->rows + uncompressed_path->rows);
        }
 
        /* Add useful sorted versions of the decompress path */
-       add_chunk_sorted_paths(root, chunk_rel, ht, ht_relid, path, compressed_path);
+       add_chunk_sorted_paths(root, chunk_rel, ht, ht_relid, chunk_path, compressed_path);
 
        /* this has to go after the path is copied for the ordered path since path can get freed in
         * add_path */
-       add_path(chunk_rel, path);
+       add_path(chunk_rel, chunk_path);
    }
 
    /* the chunk_rel now owns the paths, remove them from the compressed_rel so they can't be freed
@@ -1749,7 +1766,7 @@ decompress_chunk_path_create(PlannerInfo *root, CompressionInfo *info, int paral
 
    path->custom_path.custom_paths = list_make1(compressed_path);
    path->reverse = false;
-   path->compressed_pathkeys = NIL;
+   path->required_compressed_pathkeys = NIL;
    cost_decompress_chunk(root, &path->custom_path.path, compressed_path);
 
    return path;
@@ -1819,7 +1836,7 @@ create_compressed_scan_paths(PlannerInfo *root, RelOptInfo *compressed_rel, Comp
    */
   List *orig_pathkeys = root->query_pathkeys;
   build_compressed_scan_pathkeys(sort_info, root, root->query_pathkeys, info);
-  root->query_pathkeys = sort_info->compressed_pathkeys;
+  root->query_pathkeys = sort_info->required_compressed_pathkeys;
   check_index_predicates(root, compressed_rel);
   create_index_paths(root, compressed_rel);
   root->query_pathkeys = orig_pathkeys;
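The substantive change in this file is the size-estimation fix: cost_decompress_chunk now takes path->rows from the parameterization (ppi_rows) or from the parent relation instead of recomputing it, and ts_decompress_chunk_generate_paths derives separate tuples and rows estimates for the decompressed chunk, folding the delta into the parent hypertable with clamping. Below is a standalone sketch of that arithmetic; the batch-size constant, selectivity, and starting numbers are illustrative stand-ins, not TimescaleDB's values, and clamp_row_est() here mirrors the PostgreSQL helper of the same name that the committed code calls.

/*
 * Standalone sketch of the size-estimation arithmetic above.
 * Illustrative stand-ins only, not the TimescaleDB API.
 */
#include <stdio.h>

#define BATCH_SIZE 1000.0 /* stand-in for DECOMPRESS_CHUNK_BATCH_SIZE */

/* PostgreSQL clamps row estimates to at least one row. */
static double
clamp_row_est(double nrows)
{
    return nrows < 1.0 ? 1.0 : nrows;
}

int
main(void)
{
    double compressed_rows = 500.0; /* estimated rows of the compressed rel */
    double selectivity = 0.02;      /* stand-in for clauselist_selectivity() */

    /* tuples: each compressed row decompresses into up to one batch */
    double new_tuples_estimate = compressed_rows * BATCH_SIZE;

    /* rows: tuples expected to survive the chunk's restriction clauses */
    double new_rows_estimate = new_tuples_estimate * selectivity;

    /* the hypertable totals are adjusted by the delta, then clamped */
    double chunk_rows_before = 100.0;
    double hypertable_rows = 100.0;
    hypertable_rows =
        clamp_row_est(hypertable_rows + new_rows_estimate - chunk_rows_before);

    printf("tuples = %.0f, rows = %.0f, hypertable rows = %.0f\n",
           new_tuples_estimate, new_rows_estimate, hypertable_rows);
    return 0;
}

Keeping tuples (pre-filter) and rows (post-filter) separate is what lets cost_decompress_chunk reuse the relation-level estimate instead of reapplying the selectivity per path.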
2 changes: 1 addition & 1 deletion tsl/src/nodes/decompress_chunk/decompress_chunk.h
@@ -108,7 +108,7 @@ typedef struct DecompressChunkPath
     */
    List *aggregated_column_type;
 
-   List *compressed_pathkeys;
+   List *required_compressed_pathkeys;
    bool needs_sequence_num;
    bool reverse;
    bool batch_sorted_merge;
9 changes: 6 additions & 3 deletions tsl/src/nodes/decompress_chunk/planner.c
@@ -939,7 +939,7 @@ decompress_chunk_plan_create(PlannerInfo *root, RelOptInfo *rel, CustomPath *pat
                                collations,
                                nullsFirst);
 
-       ts_label_sort_with_costsize(root, sort, /* limit_tuples = */ 0);
+       ts_label_sort_with_costsize(root, sort, /* limit_tuples = */ -1.0);
 
        decompress_plan->custom_plans = list_make1(sort);
    }
@@ -948,12 +948,15 @@ decompress_chunk_plan_create(PlannerInfo *root, RelOptInfo *rel, CustomPath *pat
    /*
     * Add a sort if the compressed scan is not ordered appropriately.
     */
-   if (!pathkeys_contained_in(dcpath->compressed_pathkeys, compressed_path->pathkeys))
+   if (!pathkeys_contained_in(dcpath->required_compressed_pathkeys, compressed_path->pathkeys))
    {
-       List *compressed_pks = dcpath->compressed_pathkeys;
+       List *compressed_pks = dcpath->required_compressed_pathkeys;
        Sort *sort = ts_make_sort_from_pathkeys((Plan *) compressed_scan,
                                                compressed_pks,
                                                bms_make_singleton(compressed_scan->scanrelid));
+
+       ts_label_sort_with_costsize(root, sort, /* limit_tuples = */ -1.0);
+
        decompress_plan->custom_plans = list_make1(sort);
    }
    else
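Both ts_label_sort_with_costsize calls now pass limit_tuples = -1.0, and the second hunk appears to add cost labeling for the sort below the decompression node, which previously got no cost estimate. In PostgreSQL's sort costing, limit_tuples only bounds the sort when it is positive and smaller than the input row count, so a negative value is the conventional "no LIMIT" marker (upstream label_sort_with_costsize uses the same). A minimal sketch of that check, modeled on the bounded-sort test in PostgreSQL's cost_tuplesort() and simplified, not the actual planner code:

/* Sketch of why -1.0 reads as "no LIMIT" when costing a sort. */
#include <stdbool.h>
#include <stdio.h>

static bool
sort_is_bounded(double limit_tuples, double input_tuples)
{
    /* Only a positive limit smaller than the input bounds the sort. */
    return limit_tuples > 0 && limit_tuples < input_tuples;
}

int
main(void)
{
    printf("limit -1.0: bounded = %d\n", sort_is_bounded(-1.0, 1e6)); /* 0 */
    printf("limit 100:  bounded = %d\n", sort_is_bounded(100.0, 1e6)); /* 1 */
    return 0;
}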
