Insert into unbucketed unpartitioned transactional table fails when task_writer_count > 1 #9149

Closed · Fixed by #10460
findepi opened this issue Sep 8, 2021 · 1 comment
Labels: bug (Something isn't working)

findepi (Member) commented Sep 8, 2021

trino:default> CREATE TABLE transactional_nation WITH (format='ORC', transactional=true) AS TABLE tpch.tiny.nation WITH NO DATA;
CREATE TABLE: 0 rows

trino:default> SET SESSION task_writer_count = 2;
SET SESSION

trino:default> INSERT INTO transactional_nation SELECT * FROM tpch.tiny.nation;
Query 20210908_093648_00013_c89az failed: Multiple streams must not be partitioned on empty set
java.lang.IllegalArgumentException: Multiple streams must not be partitioned on empty set
	at com.google.common.base.Preconditions.checkArgument(Preconditions.java:144)
	at io.trino.sql.planner.optimizations.StreamPropertyDerivations$StreamProperties.<init>(StreamPropertyDerivations.java:689)
	at io.trino.sql.planner.optimizations.StreamPropertyDerivations$StreamProperties.<init>(StreamPropertyDerivations.java:673)
	at io.trino.sql.planner.optimizations.StreamPropertyDerivations$Visitor.visitExchange(StreamPropertyDerivations.java:346)
	at io.trino.sql.planner.optimizations.StreamPropertyDerivations$Visitor.visitExchange(StreamPropertyDerivations.java:177)
	at io.trino.sql.planner.plan.ExchangeNode.accept(ExchangeNode.java:243)
	at io.trino.sql.planner.optimizations.StreamPropertyDerivations.deriveProperties(StreamPropertyDerivations.java:162)
	at io.trino.sql.planner.optimizations.StreamPropertyDerivations.deriveProperties(StreamPropertyDerivations.java:129)
	at io.trino.sql.planner.optimizations.AddLocalExchanges$Rewriter.deriveProperties(AddLocalExchanges.java:879)
	at io.trino.sql.planner.optimizations.AddLocalExchanges$Rewriter.visitTableWriter(AddLocalExchanges.java:612)
	at io.trino.sql.planner.optimizations.AddLocalExchanges$Rewriter.visitTableWriter(AddLocalExchanges.java:123)
	at io.trino.sql.planner.plan.TableWriterNode.accept(TableWriterNode.java:187)
	at io.trino.sql.planner.optimizations.AddLocalExchanges$Rewriter.planAndEnforce(AddLocalExchanges.java:811)
	at io.trino.sql.planner.optimizations.AddLocalExchanges$Rewriter.lambda$planAndEnforceChildren$7(AddLocalExchanges.java:794)
	at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
	at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
	at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
	at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
	at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
	at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
	at io.trino.sql.planner.optimizations.AddLocalExchanges$Rewriter.planAndEnforceChildren(AddLocalExchanges.java:798)
	at io.trino.sql.planner.optimizations.AddLocalExchanges$Rewriter.visitExchange(AddLocalExchanges.java:638)
	at io.trino.sql.planner.optimizations.AddLocalExchanges$Rewriter.visitExchange(AddLocalExchanges.java:123)
	at io.trino.sql.planner.plan.ExchangeNode.accept(ExchangeNode.java:243)
	at io.trino.sql.planner.optimizations.AddLocalExchanges$Rewriter.planAndEnforce(AddLocalExchanges.java:811)
	at io.trino.sql.planner.optimizations.AddLocalExchanges$Rewriter.lambda$planAndEnforceChildren$7(AddLocalExchanges.java:794)
	at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
	at java.base/java.util.Collections$2.tryAdvance(Collections.java:4747)
	at java.base/java.util.Collections$2.forEachRemaining(Collections.java:4755)
	at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
	at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
	at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
	at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
	at io.trino.sql.planner.optimizations.AddLocalExchanges$Rewriter.planAndEnforceChildren(AddLocalExchanges.java:798)
	at io.trino.sql.planner.optimizations.AddLocalExchanges$Rewriter.visitTableFinish(AddLocalExchanges.java:257)
	at io.trino.sql.planner.optimizations.AddLocalExchanges$Rewriter.visitTableFinish(AddLocalExchanges.java:123)
	at io.trino.sql.planner.plan.TableFinishNode.accept(TableFinishNode.java:106)
	at io.trino.sql.planner.optimizations.AddLocalExchanges$Rewriter.planAndEnforce(AddLocalExchanges.java:811)
	at io.trino.sql.planner.optimizations.AddLocalExchanges$Rewriter.lambda$planAndEnforceChildren$7(AddLocalExchanges.java:794)
	at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
	at java.base/java.util.Collections$2.tryAdvance(Collections.java:4747)
	at java.base/java.util.Collections$2.forEachRemaining(Collections.java:4755)
	at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
	at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
	at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
	at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
	at io.trino.sql.planner.optimizations.AddLocalExchanges$Rewriter.planAndEnforceChildren(AddLocalExchanges.java:798)
	at io.trino.sql.planner.optimizations.AddLocalExchanges$Rewriter.visitOutput(AddLocalExchanges.java:161)
	at io.trino.sql.planner.optimizations.AddLocalExchanges$Rewriter.visitOutput(AddLocalExchanges.java:123)
	at io.trino.sql.planner.plan.OutputNode.accept(OutputNode.java:83)
	at io.trino.sql.planner.optimizations.AddLocalExchanges.optimize(AddLocalExchanges.java:119)
	at io.trino.sql.planner.LogicalPlanner.plan(LogicalPlanner.java:218)
	at io.trino.sql.planner.LogicalPlanner.plan(LogicalPlanner.java:203)
	at io.trino.sql.planner.LogicalPlanner.plan(LogicalPlanner.java:198)
	at io.trino.execution.SqlQueryExecution.doPlanQuery(SqlQueryExecution.java:488)
	at io.trino.execution.SqlQueryExecution.planQuery(SqlQueryExecution.java:468)
	at io.trino.execution.SqlQueryExecution.start(SqlQueryExecution.java:411)
	at io.trino.execution.SqlQueryManager.createQuery(SqlQueryManager.java:237)
	at io.trino.dispatcher.LocalDispatchQuery.lambda$startExecution$7(LocalDispatchQuery.java:143)
	at io.trino.$gen.Trino_361_132_ge8b8554____20210908_093156_2.run(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)

Note that this works fine when the table is explicitly bucketed:

trino:default> CREATE TABLE transactional_nation WITH (format='ORC', transactional=true, bucketed_by=ARRAY['nationkey'], bucket_count=2) AS TABLE tpch.tiny.nation WITH NO DATA;
CREATE TABLE: 0 rows

trino:default> SET SESSION task_writer_count = 2;
SET SESSION

trino:default> INSERT INTO transactional_nation SELECT * FROM tpch.tiny.nation;
INSERT: 25 rows

Slack thread https://trinodb.slack.com/archives/CGB0QHWSW/p1631043251064900
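
A likely workaround, implied by the issue title but not verified here, is to keep the session at a single writer per task for this insert:

trino:default> SET SESSION task_writer_count = 1;
SET SESSION

trino:default> INSERT INTO transactional_nation SELECT * FROM tpch.tiny.nation;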

findepi added the bug (Something isn't working) label on Sep 8, 2021
vinay-kl (Contributor) commented Sep 8, 2021

Note: it works for an un-bucketed, partitioned table:

trino> CREATE TABLE dev_30.uninstalls_sla1_hive_trans_par_nonbuck (device_id varchar,app_name varchar,avg_sla decimal(12, 1), load_date varchar) WITH (format = 'ORC',transactional = true, partitioned_by=ARRAY['load_date']);
CREATE TABLE
 
trino> SET SESSION task_writer_count = 1;
SET SESSION

trino> insert into dev_30.uninstalls_sla1_hive_trans_par_nonbuck select * from dev_30.uninstalls_sla1_hive_trans_par_nonbuck;
INSERT: 0 rows


trino> SET SESSION task_writer_count = 8;
SET SESSION
 
trino> insert into dev_30.uninstalls_sla1_hive_trans_par_nonbuck select * from dev_30.uninstalls_sla1_hive_trans_par_nonbuck;
INSERT: 0 rows

It also works for a non-partitioned, bucketed table:

trino> CREATE TABLE dev_30.uninstalls_sla1_hive_trans_nonpar_buck (device_id varchar,app_name varchar,avg_sla decimal(12, 1), load_date varchar) WITH (format = 'ORC',transactional = true, bucketed_by=ARRAY['load_date'], bucket_count=2);
CREATE TABLE

trino> SET SESSION task_writer_count = 8;
SET SESSION

trino> insert into dev_30.uninstalls_sla1_hive_trans_nonpar_buck select * from dev_30.uninstalls_sla1_hive_trans_nonpar_buck;
INSERT: 0 rows

trino> SET SESSION task_writer_count = 1;
SET SESSION

trino> insert into dev_30.uninstalls_sla1_hive_trans_nonpar_buck select * from dev_30.uninstalls_sla1_hive_trans_nonpar_buck;
INSERT: 0 rows

findepi changed the title from "Insert into unbucketed transactional table fails when task_writer_count > 1" to "Insert into unbucketed unpartitioned transactional table fails when task_writer_count > 1" on Sep 8, 2021
homar added a commit to homar/trino that referenced this issue Dec 27, 2021
homar added a commit to homar/trino that referenced this issue Jan 12, 2022