Fix inserting into transactional table when task_writer_count > 1 (v2) #10460
Conversation
Force-pushed: 734a988 to 1c78a61; 240fd65 to 5531384; 5531384 to 70464bc; 01a7585 to 43a8ff0

Resolved (outdated) review threads on:
- core/trino-main/src/main/java/io/trino/sql/planner/optimizations/StreamPropertyDerivations.java
- plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSink.java
- plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHive.java
@sopel39 do you want to take a look?

Yes, I will.

@sopel39 any chance you will find some time to take a look this week?
@@ -338,6 +338,10 @@ public StreamProperties visitExchange(ExchangeNode node, List<StreamProperties>
         if (node.getPartitioningScheme().getPartitioning().getHandle().equals(FIXED_ARBITRARY_DISTRIBUTION)) {
             return new StreamProperties(FIXED, Optional.empty(), false);
         }
+        // empty arguments list means the bucketing function is effectively constant (1 bucket)
What about PropertyDerivations.Visitor#visitExchange?
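The "effectively constant" claim in the commented diff can be illustrated with a small sketch. This is a hypothetical hash-of-arguments bucket function, not Trino's actual implementation, and it assumes the function is deterministic over its argument values: with an empty argument list, every row contributes the same (empty) tuple, so all rows land in the same bucket.

```java
import java.util.List;
import java.util.Objects;

public class EmptyArgsBucketing {
    // Hypothetical deterministic bucket function: the bucket is derived from
    // the values of the partitioning arguments projected out of the row.
    static int getBucket(List<Object> partitioningArgumentValues, int bucketCount) {
        return Math.floorMod(Objects.hash(partitioningArgumentValues.toArray()), bucketCount);
    }

    public static void main(String[] args) {
        // With no partitioning arguments, every row hashes identically,
        // so the bucketing degenerates to a single constant bucket.
        int b1 = getBucket(List.of(), 8);
        int b2 = getBucket(List.of(), 8);
        System.out.println(b1 == b2); // prints "true"
    }
}
```

Note this only holds for deterministic bucket functions; the discussion below turns on exactly that assumption.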
@@ -338,6 +338,10 @@ public StreamProperties visitExchange(ExchangeNode node, List<StreamProperties>
         if (node.getPartitioningScheme().getPartitioning().getHandle().equals(FIXED_ARBITRARY_DISTRIBUTION)) {
             return new StreamProperties(FIXED, Optional.empty(), false);
         }
+        // empty arguments list means the bucketing function is effectively constant (1 bucket)
+        if (node.getPartitioningScheme().getPartitioning().getArguments().isEmpty()) {
+            return new StreamProperties(SINGLE, Optional.of(ImmutableSet.of()), false);
There is a shortcut for this: StreamPropertyDerivations.StreamProperties#singleStream
@@ -338,6 +338,10 @@ public StreamProperties visitExchange(ExchangeNode node, List<StreamProperties>
         if (node.getPartitioningScheme().getPartitioning().getHandle().equals(FIXED_ARBITRARY_DISTRIBUTION)) {
             return new StreamProperties(FIXED, Optional.empty(), false);
         }
+        // empty arguments list means the bucketing function is effectively constant (1 bucket)
+        if (node.getPartitioningScheme().getPartitioning().getArguments().isEmpty()) {
I'm not sure this always works. For example, LocalExecutionPlanner.Visitor#visitTableWriter sets:

    context.setDriverInstanceCount(getTaskWriterCount(session));

while LocalExecutionPlanner.Visitor#createMergeSource sets:

    context.setDriverInstanceCount(1);

This would fail if such operators are part of the same pipeline. We use StreamProperties to add local exchanges, which split a single large pipeline into separate ones.

Perhaps this works in your use case, but I think it breaks the definition of stream properties. You need exactly one stream (set via setDriverInstanceCount(1)) to be able to use singleStream.
I think it's also wrong because a connector could return some random partitioning even if there are no arguments. I would just skip the property changes in this PR.
There is an unwritten, pre-existing assumption that partitioning must be deterministic; it doesn't seem to make sense otherwise.
It doesn't have to be deterministic, see io.trino.sql.planner.SystemPartitioningHandle.SystemPartitionFunction.RoundRobinBucketFunction
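The counterexample can be sketched as follows. This is a simplified stand-in, not Trino's actual RoundRobinBucketFunction: the bucket depends only on a mutable counter, not on the row's content, so the same logical row can land in different buckets on different calls.

```java
public class RoundRobinSketch {
    // Simplified round-robin bucket function: it ignores the row entirely
    // and cycles through buckets based on internal mutable state, so the
    // bucket assignment is non-deterministic with respect to row content.
    static class RoundRobin {
        private final int bucketCount;
        private long counter;

        RoundRobin(int bucketCount) {
            this.bucketCount = bucketCount;
        }

        int nextBucket() {
            return (int) (counter++ % bucketCount);
        }
    }

    public static void main(String[] args) {
        RoundRobin rr = new RoundRobin(3);
        // The same logical row, offered twice, lands in different buckets:
        System.out.println(rr.nextBucket()); // prints 0
        System.out.println(rr.nextBucket()); // prints 1
    }
}
```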
> This would fail if such operators are part of the same pipeline.

Can you think of any scenario that would actually create such a pipeline?

> I would just skip the property changes in this PR

Unfortunately, changing this property is the core of this PR; at least I don't see any other way to do it.
OK, but the tests in this PR also passed, yet you claim it is incorrect. Why trust the tests then and not now?
I've described why this PR breaks the relationship between properties and the local execution planner (#10460 (comment)).

I don't think the execution model enforces

    checkArgument(distribution == SINGLE || !this.partitioningColumns.equals(Optional.of(ImmutableList.of())),
            "Multiple streams must not be partitioned on empty set");

(for example, on the global level it's fine to have non-deterministic partitioning with an empty set). I think that check was just added a bit eagerly.
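Rendered in plain Java (without Guava's checkArgument, and with hypothetical names), the quoted invariant behaves like this sketch: any distribution other than SINGLE must not claim to be partitioned on the empty column set, while a single stream trivially may.

```java
import java.util.List;
import java.util.Optional;

public class PartitioningInvariant {
    enum Distribution { SINGLE, FIXED, MULTIPLE }

    // Plain-Java rendering of the quoted check: multiple streams
    // "partitioned on nothing" are rejected; a single stream is fine.
    static void check(Distribution distribution, Optional<List<String>> partitioningColumns) {
        if (distribution != Distribution.SINGLE && partitioningColumns.equals(Optional.of(List.of()))) {
            throw new IllegalArgumentException("Multiple streams must not be partitioned on empty set");
        }
    }

    public static void main(String[] args) {
        // A single stream trivially satisfies "partitioned on the empty set".
        check(Distribution.SINGLE, Optional.of(List.of()));
        // Multiple streams with an empty partitioning set fail the check.
        try {
            check(Distribution.FIXED, Optional.of(List.of()));
            System.out.println("accepted");
        }
        catch (IllegalArgumentException e) {
            System.out.println("rejected"); // this branch is taken
        }
    }
}
```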
> (for example, on the global level it's fine to have non-deterministic partitioning with an empty set).

That doesn't make sense to me as a feature. Why would a connector return a partitioning at all?

And I don't accept this as a design decision. Determinism is an important aspect, not to be given away without good reason.
> I don't accept this as a design decision. Determinism is an important aspect, not to be given away without good reason.

One could imagine partitioning that is based on load (e.g. when writing data from one system to another). It doesn't have to be connector-provided (it could be system partitioning, just using the SPI).
I removed the check.
@@ -60,6 +60,20 @@ private int getHiveVersionMajor()
         return hiveVersionMajor;
     }

+    @Test
+    public void testInsertBucketedTransactionalTableLayout()
I missed the explanation of why this can't be in AbstractTestHive, while insertBucketedTableLayout(false) and insertPartitionedBucketedTableLayout(false) are.
Because if they are in AbstractTestHive, then TestHiveAlluxioMetastore fails because of them.
> Because if they are in AbstractTestHive then TestHiveAlluxioMetastore fails because of them.

Add them to AbstractTestHive and override them in TestHiveAlluxioMetastore with a comment explaining why they don't work.
OK, I will do that, but I am not sure why this is the better way.
> OK, I will do that, but I am not sure why this is the better way.

We use this approach in other abstract/derived tests.
Force-pushed: 43a8ff0 to 58e0c34
-        if (bucketFunction != null || writer.getWrittenBytes() <= targetMaxFileSize.orElse(Long.MAX_VALUE)) {
+        // for transactional tables we don't want to split output files because there is an explicit or implicit bucketing
+        // and file names have no random component (e.g. bucket_00000)
+        if (bucketFunction != null || isTransactional || writer.getWrittenBytes() <= targetMaxFileSize.orElse(Long.MAX_VALUE)) {
Isn't bucketFunction always present when isTransactional=true?
It is not present in the scenario I am trying to fix.
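The condition in the diff above can be summarized as a predicate. This is a sketch with hypothetical names, not the actual HivePageSink code: the current writer keeps appending to its file whenever bucketing, transactionality, or the size budget says so, and only rolls over to a new file when none of those hold.

```java
import java.util.OptionalLong;

public class FileSplitSketch {
    // Sketch of the patched condition: keep writing to the current file if
    // there is a bucket function, or the table is transactional (file names
    // like bucket_00000 have no random component, so output must not be
    // split), or the file is still within the target max size.
    static boolean keepCurrentFile(boolean hasBucketFunction, boolean isTransactional,
            long writtenBytes, OptionalLong targetMaxFileSize) {
        return hasBucketFunction
                || isTransactional
                || writtenBytes <= targetMaxFileSize.orElse(Long.MAX_VALUE);
    }

    public static void main(String[] args) {
        // A transactional table never rolls over, even past the size target.
        System.out.println(keepCurrentFile(false, true, 2_000_000, OptionalLong.of(1_000_000))); // prints "true"
        // A plain, unbucketed table past the size target starts a new file.
        System.out.println(keepCurrentFile(false, false, 2_000_000, OptionalLong.of(1_000_000))); // prints "false"
    }
}
```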
fixes: #9149