New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Allow pushdown of reference table joins #5212
Allow pushdown of reference table joins #5212
Conversation
0484057
to
4973f89
Compare
Codecov Report
@@ Coverage Diff @@
## main #5212 +/- ##
==========================================
+ Coverage 90.71% 90.89% +0.17%
==========================================
Files 225 225
Lines 52056 52321 +265
==========================================
+ Hits 47225 47557 +332
+ Misses 4831 4764 -67
📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more |
a437f70
to
07098d1
Compare
fea9905
to
4a68eb6
Compare
* This code does not work for joins with lateral references, since those | ||
* must have parameterized paths, which we don't generate yet. | ||
*/ | ||
if (!bms_is_empty(joinrel->lateral_relids)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we add a test case with join lateral?
DataNodeScanPath *scanpath = palloc0(sizeof(DataNodeScanPath)); | ||
|
||
if (rel->lateral_relids && !bms_is_subset(rel->lateral_relids, required_outer)) | ||
required_outer = bms_union(required_outer, rel->lateral_relids); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add test case?
* revisit this. | ||
*/ | ||
if (!bms_is_empty(required_outer) || !bms_is_empty(rel->lateral_relids)) | ||
elog(ERROR, "parameterized foreign joins are not supported yet"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add test case?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These parameters are replaced before the data_node_scan_plan
is generated in our implementation; this check is from PG upstream. However, I added a test case for a prepared statement to test that the join pushdown also works in this case.
We have a similar check also for regular scans in place. I wanted to check if we were covering this line in the existing tests. But unfortunately, codecov currently has an outage at the moment and I can't access the reports.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@erimatnor Codecov works now and I checked the coverage of the closely related line in the existing code. This line in the existing data_node_scan_path_create
function is also one of these lines that are copied from PG upstream and cannot directly be tested in our implementation. So, it is hard to create a test case for the new/similar line.
e5c16e0
to
0562c6d
Compare
#endif | ||
join_partition->reltarget->exprs = | ||
castNode(List, | ||
adjust_appendrel_attrs(root, (Node *) joinrel->reltarget->exprs, 1, &appinfo)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we have to do this? The data node joinrel and the unpartitioned joinrel have the same relid.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, this is needed. The join is performed partition-wise. Each of the partitions belongs to one data node and we are adjusting the expressions here using the appinfo
of the data node.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's call it "data-node-wise" or something, because "partition" is ambiguous between chunks and data nodes. So which kind of adjustment exactly do we do? I see that adjust_apprendrel_attrs
can change varno or convert row type. For data node joinrels, the varno and rowtype are the same, because we just copy them from the base joinrel. Does this function do anything else?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The function is renamed as suggested.
With adjust_apprendrel_attrs
, the relid in the expressions is translated from the hypertable relid to the relid of the data node that is responsible for this partition. Otherwise, the deparser could not pushdown these expressions properly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
With adjust_apprendrel_attrs, the relid in the expressions is translated from the hypertable relid to the relid of the data node that is responsible for this partition.
Yes, but aren't they exactly equal? As set here: https://github.com/timescale/timescaledb/pull/5212/files#diff-47f57773f9558fb5ef0345e680eb19c0d1c9378c1701011b7a930a96e8a0813cR1135
This is relid in the sense of varno
and RelOptInfo
, not the oid from RTE.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, these values are equal. However, in this function, we have to adjust the expressions for the join partition and make sure that the expressions (they are generated for the unpartitioned join) belong to this partition. We do something similar in adjust_data_node_rel_attrs for the DataNodes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, I still don't understand what exactly it adjusts, given that the varno and row type are exactly the same, but I'll have to trust you on this :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry for the imprecise answer. The varno
is adjusted at this point
For example, here you see the exprs
before and after the adjustment. The varno
of the expressions is translated from the input table position to the proper position of this specific DataNode entry in the simple_rel_array (the data node is at position 8 in this example). This is needed for the deparser to generate the SQL for the partition properly.
Original List
(
{VAR
:varno 1
:varattno 2
:vartype 23
:vartypmod -1
:varcollid 0
:varlevelsup 0
:varnosyn 1
:varattnosyn 2
:location -1
}
[...]
)
Adjusted List
(
{VAR
:varno 8
:varattno 2
:vartype 23
:vartypmod -1
:varcollid 0
:varlevelsup 0
:varnosyn 8
:varattnosyn 2
:location -1
}
[...]
)
tsl/src/fdw/data_node_scan_plan.c
Outdated
fpinfo = | ||
fdw_relinfo_create(root, joinrel, InvalidOid, InvalidOid, TS_FDW_RELINFO_UNINITIALIZED); | ||
Assert(fpinfo->type == TS_FDW_RELINFO_UNINITIALIZED); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What type is it going to have if we succesfully push down the join? I think "uninitialized" might be confusing here, maybe makes sense to initialize it with normal type but set pushdown_safe = false
. Maybe even without going through fdw_relinfo_create
, that function is very big and confusing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
60ec326
to
d34c8a5
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for all the fixes, I think it's almost ideal now :)
d34c8a5
to
6471243
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Approving, although I have some inline comments.
src/cross_module_fn.c
Outdated
@@ -555,6 +555,7 @@ TSDLLEXPORT CrossModuleFunctions ts_cm_functions_default = { | |||
.hypertable_distributed_set_replication_factor = error_no_default_fn_pg_community, | |||
.update_compressed_chunk_relstats = update_compressed_chunk_relstats_default, | |||
.health_check = error_no_default_fn_pg_community, | |||
.mn_get_foreign_join_paths = NULL, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Following the pattern here, you might want to add a dummy function instead (I guess one that does nothing by default). Then you can avoid a NULL check when using the function.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@erimatnor Good point. I introduced a dummy function.
break; | ||
|
||
case JOIN_RIGHT: | ||
#if PG14_GE |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we make dead code clear, by wrapping it in #if ENABLE_DEAD_CODE
or marking it with pg_unreachable()
, or similar so that the code is there but you can avoid having code coverage complain and it is obvious to the reader that the code isn't executed.
5975570
to
2a4f0ec
Compare
@erimatnor I like the idea with the |
2a4f0ec
to
a8661a5
Compare
This patch adds the functionality that is needed to perform distributed, parallel joins on reference tables on access nodes. This code allows the pushdown of a join if: * (1) The setting "ts_guc_enable_per_data_node_queries" is enabled * (2) The outer relation is a distributed hypertable * (3) The inner relation is marked as a reference table * (4) The join is a left join or an inner join
a8661a5
to
788d671
Compare
This release includes these noteworthy features: * compressed hypertable enhancements: * UPDATE/DELETE support * ON CONFLICT DO UPDATE * Join support for hierarchical Continougs Aggregates * performance improvements **Features** * timescale#5212 Allow pushdown of reference table joins * timescale#5221 Improve Realtime Continuous Aggregate performance * timescale#5252 Improve unique constraint support on compressed hypertables * timescale#5339 Support UPDATE/DELETE on compressed hypertables * timescale#5344 Enable JOINS for Hierarchical Continuous Aggregates * timescale#5361 Add parallel support for partialize_agg() * timescale#5417 Refactor and optimize distributed COPY * timescale#5454 Add support for ON CONFLICT DO UPDATE for compressed hypertables * timescale#5547 Skip Ordered Append when only 1 child node is present * timescale#5510 Propagate vacuum/analyze to compressed chunks * timescale#5584 Reduce decompression during constraint checking * timescale#5530 Optimize compressed chunk resorting **Bugfixes** * timescale#5396 Fix SEGMENTBY columns predicates to be pushed down * timescale#5427 Handle user-defined FDW options properly * timescale#5442 Decompression may have lost DEFAULT values * timescale#5459 Fix issue creating dimensional constraints * timescale#5570 Improve interpolate error message on datatype mismatch * timescale#5573 Fix unique constraint on compressed tables * timescale#5615 Add permission checks to run_job() * timescale#5614 Enable run_job() for telemetry job * timescale#5578 Fix on-insert decompression after schema changes * timescale#5613 Quote username identifier appropriately * timescale#5525 Fix tablespace for compressed hypertable and corresponding toast * timescale#5642 Fix ALTER TABLE SET with normal tables * timescale#5666 Reduce memory usage for distributed analyze * timescale#5668 Fix subtransaction resource owner **Thanks** * @kovetskiy and @DZDomi for reporting peformance regression in Realtime Continuous Aggregates * @ollz272 for reporting an issue with interpolate error messages
This release contains new features and bug fixes since the 2.10.3 release. We deem it moderate priority for upgrading. This release includes these noteworthy features: * Support for DML operations on compressed chunks: * UPDATE/DELETE support * Support for unique constraints on compressed chunks * Support for `ON CONFLICT DO UPDATE` * Support for `ON CONFLICT DO NOTHING` * Join support for hierarchical Continuous Aggregates **Features** * timescale#5212 Allow pushdown of reference table joins * timescale#5221 Improve Realtime Continuous Aggregate performance * timescale#5252 Improve unique constraint support on compressed hypertables * timescale#5339 Support UPDATE/DELETE on compressed hypertables * timescale#5344 Enable JOINS for Hierarchical Continuous Aggregates * timescale#5361 Add parallel support for partialize_agg() * timescale#5417 Refactor and optimize distributed COPY * timescale#5454 Add support for ON CONFLICT DO UPDATE for compressed hypertables * timescale#5547 Skip Ordered Append when only 1 child node is present * timescale#5510 Propagate vacuum/analyze to compressed chunks * timescale#5584 Reduce decompression during constraint checking * timescale#5530 Optimize compressed chunk resorting * timescale#5639 Support sending telemetry event reports **Bugfixes** * timescale#5396 Fix SEGMENTBY columns predicates to be pushed down * timescale#5427 Handle user-defined FDW options properly * timescale#5442 Decompression may have lost DEFAULT values * timescale#5459 Fix issue creating dimensional constraints * timescale#5570 Improve interpolate error message on datatype mismatch * timescale#5573 Fix unique constraint on compressed tables * timescale#5615 Add permission checks to run_job() * timescale#5614 Enable run_job() for telemetry job * timescale#5578 Fix on-insert decompression after schema changes * timescale#5613 Quote username identifier appropriately * timescale#5525 Fix tablespace for compressed hypertable and corresponding toast * timescale#5642 Fix ALTER TABLE SET with normal tables * timescale#5666 Reduce memory usage for distributed analyze * timescale#5668 Fix subtransaction resource owner **Thanks** * @kovetskiy and @DZDomi for reporting peformance regression in Realtime Continuous Aggregates * @ollz272 for reporting an issue with interpolate error messages
This release contains new features and bug fixes since the 2.10.3 release. We deem it moderate priority for upgrading. This release includes these noteworthy features: * Support for DML operations on compressed chunks: * UPDATE/DELETE support * Support for unique constraints on compressed chunks * Support for `ON CONFLICT DO UPDATE` * Support for `ON CONFLICT DO NOTHING` * Join support for hierarchical Continuous Aggregates **Features** * timescale#5212 Allow pushdown of reference table joins * timescale#5221 Improve Realtime Continuous Aggregate performance * timescale#5252 Improve unique constraint support on compressed hypertables * timescale#5339 Support UPDATE/DELETE on compressed hypertables * timescale#5344 Enable JOINS for Hierarchical Continuous Aggregates * timescale#5361 Add parallel support for partialize_agg() * timescale#5417 Refactor and optimize distributed COPY * timescale#5454 Add support for ON CONFLICT DO UPDATE for compressed hypertables * timescale#5547 Skip Ordered Append when only 1 child node is present * timescale#5510 Propagate vacuum/analyze to compressed chunks * timescale#5584 Reduce decompression during constraint checking * timescale#5530 Optimize compressed chunk resorting * timescale#5639 Support sending telemetry event reports **Bugfixes** * timescale#5396 Fix SEGMENTBY columns predicates to be pushed down * timescale#5427 Handle user-defined FDW options properly * timescale#5442 Decompression may have lost DEFAULT values * timescale#5459 Fix issue creating dimensional constraints * timescale#5570 Improve interpolate error message on datatype mismatch * timescale#5573 Fix unique constraint on compressed tables * timescale#5615 Add permission checks to run_job() * timescale#5614 Enable run_job() for telemetry job * timescale#5578 Fix on-insert decompression after schema changes * timescale#5613 Quote username identifier appropriately * timescale#5525 Fix tablespace for compressed hypertable and corresponding toast * timescale#5642 Fix ALTER TABLE SET with normal tables * timescale#5666 Reduce memory usage for distributed analyze * timescale#5668 Fix subtransaction resource owner **Thanks** * @kovetskiy and @DZDomi for reporting peformance regression in Realtime Continuous Aggregates * @ollz272 for reporting an issue with interpolate error messages
This release contains new features and bug fixes since the 2.10.3 release. We deem it moderate priority for upgrading. This release includes these noteworthy features: * Support for DML operations on compressed chunks: * UPDATE/DELETE support * Support for unique constraints on compressed chunks * Support for `ON CONFLICT DO UPDATE` * Support for `ON CONFLICT DO NOTHING` * Join support for hierarchical Continuous Aggregates **Features** * #5212 Allow pushdown of reference table joins * #5221 Improve Realtime Continuous Aggregate performance * #5252 Improve unique constraint support on compressed hypertables * #5339 Support UPDATE/DELETE on compressed hypertables * #5344 Enable JOINS for Hierarchical Continuous Aggregates * #5361 Add parallel support for partialize_agg() * #5417 Refactor and optimize distributed COPY * #5454 Add support for ON CONFLICT DO UPDATE for compressed hypertables * #5547 Skip Ordered Append when only 1 child node is present * #5510 Propagate vacuum/analyze to compressed chunks * #5584 Reduce decompression during constraint checking * #5530 Optimize compressed chunk resorting * #5639 Support sending telemetry event reports **Bugfixes** * #5396 Fix SEGMENTBY columns predicates to be pushed down * #5427 Handle user-defined FDW options properly * #5442 Decompression may have lost DEFAULT values * #5459 Fix issue creating dimensional constraints * #5570 Improve interpolate error message on datatype mismatch * #5573 Fix unique constraint on compressed tables * #5615 Add permission checks to run_job() * #5614 Enable run_job() for telemetry job * #5578 Fix on-insert decompression after schema changes * #5613 Quote username identifier appropriately * #5525 Fix tablespace for compressed hypertable and corresponding toast * #5642 Fix ALTER TABLE SET with normal tables * #5666 Reduce memory usage for distributed analyze * #5668 Fix subtransaction resource owner **Thanks** * @kovetskiy and @DZDomi for reporting peformance regression in Realtime Continuous Aggregates * @ollz272 for reporting an issue with interpolate error messages
This release contains new features and bug fixes since the 2.10.3 release. We deem it moderate priority for upgrading. This release includes these noteworthy features: * Support for DML operations on compressed chunks: * UPDATE/DELETE support * Support for unique constraints on compressed chunks * Support for `ON CONFLICT DO UPDATE` * Support for `ON CONFLICT DO NOTHING` * Join support for hierarchical Continuous Aggregates **Features** * timescale#5212 Allow pushdown of reference table joins * timescale#5221 Improve Realtime Continuous Aggregate performance * timescale#5252 Improve unique constraint support on compressed hypertables * timescale#5339 Support UPDATE/DELETE on compressed hypertables * timescale#5344 Enable JOINS for Hierarchical Continuous Aggregates * timescale#5361 Add parallel support for partialize_agg() * timescale#5417 Refactor and optimize distributed COPY * timescale#5454 Add support for ON CONFLICT DO UPDATE for compressed hypertables * timescale#5547 Skip Ordered Append when only 1 child node is present * timescale#5510 Propagate vacuum/analyze to compressed chunks * timescale#5584 Reduce decompression during constraint checking * timescale#5530 Optimize compressed chunk resorting * timescale#5639 Support sending telemetry event reports **Bugfixes** * timescale#5396 Fix SEGMENTBY columns predicates to be pushed down * timescale#5427 Handle user-defined FDW options properly * timescale#5442 Decompression may have lost DEFAULT values * timescale#5459 Fix issue creating dimensional constraints * timescale#5570 Improve interpolate error message on datatype mismatch * timescale#5573 Fix unique constraint on compressed tables * timescale#5615 Add permission checks to run_job() * timescale#5614 Enable run_job() for telemetry job * timescale#5578 Fix on-insert decompression after schema changes * timescale#5613 Quote username identifier appropriately * timescale#5525 Fix tablespace for compressed hypertable and corresponding toast * timescale#5642 Fix ALTER TABLE SET with normal tables * timescale#5666 Reduce memory usage for distributed analyze * timescale#5668 Fix subtransaction resource owner **Thanks** * @kovetskiy and @DZDomi for reporting peformance regression in Realtime Continuous Aggregates * @ollz272 for reporting an issue with interpolate error messages
This release contains new features and bug fixes since the 2.10.3 release. We deem it moderate priority for upgrading. This release includes these noteworthy features: * Support for DML operations on compressed chunks: * UPDATE/DELETE support * Support for unique constraints on compressed chunks * Support for `ON CONFLICT DO UPDATE` * Support for `ON CONFLICT DO NOTHING` * Join support for hierarchical Continuous Aggregates **Features** * timescale#5212 Allow pushdown of reference table joins * timescale#5221 Improve Realtime Continuous Aggregate performance * timescale#5252 Improve unique constraint support on compressed hypertables * timescale#5339 Support UPDATE/DELETE on compressed hypertables * timescale#5344 Enable JOINS for Hierarchical Continuous Aggregates * timescale#5361 Add parallel support for partialize_agg() * timescale#5417 Refactor and optimize distributed COPY * timescale#5454 Add support for ON CONFLICT DO UPDATE for compressed hypertables * timescale#5547 Skip Ordered Append when only 1 child node is present * timescale#5510 Propagate vacuum/analyze to compressed chunks * timescale#5584 Reduce decompression during constraint checking * timescale#5530 Optimize compressed chunk resorting * timescale#5639 Support sending telemetry event reports **Bugfixes** * timescale#5396 Fix SEGMENTBY columns predicates to be pushed down * timescale#5427 Handle user-defined FDW options properly * timescale#5442 Decompression may have lost DEFAULT values * timescale#5459 Fix issue creating dimensional constraints * timescale#5570 Improve interpolate error message on datatype mismatch * timescale#5573 Fix unique constraint on compressed tables * timescale#5615 Add permission checks to run_job() * timescale#5614 Enable run_job() for telemetry job * timescale#5578 Fix on-insert decompression after schema changes * timescale#5613 Quote username identifier appropriately * timescale#5525 Fix tablespace for compressed hypertable and corresponding toast * timescale#5642 Fix ALTER TABLE SET with normal tables * timescale#5666 Reduce memory usage for distributed analyze * timescale#5668 Fix subtransaction resource owner **Thanks** * @kovetskiy and @DZDomi for reporting peformance regression in Realtime Continuous Aggregates * @ollz272 for reporting an issue with interpolate error messages
This patch adds the functionality that is needed to perform distributed, parallel joins on reference tables on access nodes. This code allows the pushdown of a join if: