
Fix inserting into transactional table when task_writer_count > 1 #10261

Conversation

@homar (Member) commented Dec 10, 2021

fixes: #9149

@cla-bot cla-bot bot added the cla-signed label Dec 10, 2021
@homar homar force-pushed the homar/insert_into_unbucketed_trans_table_when_writer_task_ branch 4 times, most recently from a4526f4 to 5b9dcc7 Compare December 13, 2021 16:01
@homar homar changed the title [WIP] Fix inserting into transactional table when task_writer_count > 1 Fix inserting into transactional table when task_writer_count > 1 Dec 13, 2021
@homar (Member Author) commented Dec 13, 2021

The failure is not related: #8432

@homar homar requested a review from findepi December 13, 2021 22:41
@homar homar marked this pull request as ready for review December 14, 2021 08:07
@@ -342,6 +342,13 @@ public StreamProperties visitExchange(ExchangeNode node, List<StreamProperties>
if (node.getPartitioningScheme().getPartitioning().getHandle().equals(FIXED_ARBITRARY_DISTRIBUTION)) {
return new StreamProperties(FIXED, Optional.empty(), false);
}
// if this is a transaction and there are no arguments we are safe to provide empty optional as there will be only one stream
if (node.getPartitioningScheme().getPartitioning().getHandle().getTransactionHandle().isPresent()) {
Member

if this is a transaction

do you mean transactional table?

node.getPartitioningScheme().getPartitioning().getHandle().getTransactionHandle().isPresent()

this doesn't let us recognize what kind of table we're dealing with.
in fact, i'd expect this to be always true whenever we're dealing with connector-provided partitioning.
i am validating my understanding with Remove redundant null-friendliness commit in #10293

What about dropping this condition, and adding the "if arguments list is empty" logic directly to the code below?

Member Author

do you mean transactional table?

this is exactly what I meant; was that an incorrect assumption?

this doesn't let us recognize what kind of table we're dealing with.

any idea how to recognize if we are dealing with a transactional table?

What about dropping this condition, and adding the "if arguments list is empty" logic directly to the code below?

actually I wanted to avoid that; doing it this way would change the behaviour for all situations, and I want the change only for transactional tables. There is an explicit check:

checkArgument(distribution == SINGLE || !this.partitioningColumns.equals(Optional.of(ImmutableList.of())),
                    "Multiple streams must not be partitioned on empty set");

modifying the logic you mentioned may cause this check not to fail in situations when it should. I just wanted to make it pass for transactional tables, as we know there will be one bucket and thus only one stream.

Member

Here is how it works

  1. Hive provides a bucketing function to be used when distributing writes. The function is 0-arg, because it's an artificial bucketing. The point is -- we want to have exactly one writer to the table.
  2. StreamPropertyDerivations chokes on the 0-arg bucketing function.

The fix can be

  1. make StreamPropertyDerivations not choke on that (like you did)
  2. find some other way for a connector to make sure there is only one writer
  3. make Hive fool the engine -- declare a fake argument for the bucketing function, pretending it's not 0-arg
    • that would be working around the engine's limitation. We shouldn't need to do that though
  4. anything else? -- @electrum might know better

cc @losipiuk @arhimondr
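The choke in point 2 can be reduced to a small sketch (all names here are hypothetical; the invariant mirrors the checkArgument quoted earlier in this thread): a 0-arg bucketing function yields an empty argument list, which a non-SINGLE stream distribution rejects.

```java
import java.util.List;
import java.util.Optional;

public class StreamPropertiesSketch
{
    enum Distribution { SINGLE, FIXED, MULTIPLE }

    // Mirrors the invariant quoted above: multiple streams must not
    // claim to be partitioned on an empty column set
    static void checkStreamProperties(Distribution distribution, Optional<List<String>> partitioningColumns)
    {
        if (distribution != Distribution.SINGLE && partitioningColumns.equals(Optional.of(List.of()))) {
            throw new IllegalArgumentException("Multiple streams must not be partitioned on empty set");
        }
    }

    public static void main(String[] args)
    {
        // A 0-arg bucketing function produces an empty argument list,
        // which trips the check when the distribution is FIXED
        try {
            checkStreamProperties(Distribution.FIXED, Optional.of(List.of()));
        }
        catch (IllegalArgumentException e) {
            System.out.println("choked: " + e.getMessage());
        }
    }
}
```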

Member Author

I just wonder if this choking is accidental or was made on purpose, in which case removing it might break some other cases.

return new StreamProperties(FIXED, Optional.of(node.getPartitioningScheme().getPartitioning().getArguments().stream()
.map(ArgumentBinding::getColumn)
.collect(toImmutableList()))
.filter(x -> !x.isEmpty()), false);
Member

The pattern Optional.of( expr ).filter( condition ) is clever, but IMO doesn't make the code more readable.
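For comparison, a sketch (with hypothetical method names) of the clever one-liner next to a more explicit equivalent:

```java
import java.util.List;
import java.util.Optional;

public class OptionalFilterExample
{
    // Compact form: Optional.of(expr).filter(condition)
    static Optional<List<String>> clever(List<String> columns)
    {
        return Optional.of(columns).filter(x -> !x.isEmpty());
    }

    // Equivalent, more explicit form
    static Optional<List<String>> explicit(List<String> columns)
    {
        if (columns.isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(columns);
    }

    public static void main(String[] args)
    {
        System.out.println(clever(List.of()));      // Optional.empty
        System.out.println(explicit(List.of("a"))); // Optional[[a]]
    }
}
```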

@@ -361,7 +365,8 @@ private void closeWriter(HiveWriter writer)
HiveWriter writer = writers.get(writerIndex);
if (writer != null) {
// if current file not too big continue with the current writer
if (bucketFunction != null || writer.getWrittenBytes() <= targetMaxFileSize.orElse(Long.MAX_VALUE)) {
// for transactional tables we don't want to split output files
Member

add a hint why we don't want that

@@ -1903,6 +1903,47 @@ public void testDeleteAfterMajorCompaction()
});
}

@Test
public void testInsertIntoUnbucketedTransactionalTableWithTaskWriterCounterGE1()
Member

Counter -> Count
GE -> GreaterThan

public void testInsertIntoUnbucketedTransactionalTableWithTaskWriterCounterGE1()
{
withTemporaryTable("test_insert_into_unbucketed_transactional_table", true, false, NONE, tableName -> {
onTrino().executeQuery(format("CREATE TABLE %s WITH (format='ORC', transactional=true) AS TABLE tpch.tiny.nation WITH NO DATA;", tableName));
Member

stray ; in query

onTrino().executeQuery(format("CREATE TABLE %s WITH (format='ORC', transactional=true) AS TABLE tpch.tiny.nation WITH NO DATA;", tableName));
onTrino().executeQuery("SET SESSION task_writer_count = 2");
onTrino().executeQuery(format("INSERT INTO %s SELECT * FROM tpch.tiny.nation", tableName));
int expectedResult = onTrino().executeQuery("SELECT * from tpch.tiny.nation").getRowsCount();
Member

This is the well-known 25. Just use it, even without declaring a constant.

withTemporaryTable("test_insert_into_unbucketed_transactional_table", true, false, NONE, tableName -> {
onTrino().executeQuery(format("CREATE TABLE %s WITH (format='ORC', transactional=true) AS TABLE tpch.tiny.nation WITH NO DATA;", tableName));
onTrino().executeQuery("SET SESSION task_writer_count = 2");
onTrino().executeQuery(format("INSERT INTO %s SELECT * FROM tpch.tiny.nation", tableName));
Member

Do we expect exactly one file to be created? let's have an assertion on that

Member

Isn't nation too small? Would two writers still be used for a source table which only has 1 split (I assume nation has just one)?
Maybe use a UNION of a couple of NATION tables as a source.

Member Author

this was just an example from the issue description (#9149); I removed that whole test


onTrino().executeQuery(format("INSERT INTO %s SELECT * FROM tpch.sf1000.orders LIMIT 100000", tableName));
assertThat(onTrino().executeQuery(format("SELECT count(*) FROM %s", tableName))).containsOnly(row(100000));
verify(onTrino().executeQuery(format("SELECT DISTINCT \"$path\" FROM %s", tableName)).getRowsCount() == 1, "There should be only 1 file");
Member

When this fails, you don't know how many files there are

// There should be only 1 file
assertThat(onTrino().executeQuery("SELECT count(DISTINCT \"$path\") FROM " + tableName))
  .containsOnly(row(1L));

@homar homar force-pushed the homar/insert_into_unbucketed_trans_table_when_writer_task_ branch 3 times, most recently from a484216 to fefbcc0 Compare December 15, 2021 13:19
@homar (Member Author) commented Dec 15, 2021

@findepi please take another look

@homar homar force-pushed the homar/insert_into_unbucketed_trans_table_when_writer_task_ branch from 8637167 to fefbcc0 Compare December 16, 2021 09:26
@findepi (Member) left a comment

Comment on lines 3093 to 3094
if (node.getPartitioningScheme().map(partitioningScheme ->
partitioningScheme.getPartitioning().getHandle().isSingleNode()).orElse(false)) {
Member

I'm limited, so I find .orElse(false) hard to follow.
I'd write:

if (node.getPartitioningScheme().isPresent() &&
        node.getPartitioningScheme().get().getPartitioning().getHandle().isSingleNode()) {

Comment on lines 368 to 370
// for transactional tables we don't want to split output files because there is an implicit bucketing with
// 1 bucket so we want to stay consistent with bucketing behaviour for non transactional tables
if ((bucketFunction != null || isTransactional) || writer.getWrittenBytes() <= targetMaxFileSize.orElse(Long.MAX_VALUE)) {
Member

It's not about consistency between non-transactional bucketed and transactional, non-bucketed (implicitly bucketed) tables.
The naming rules for the two are different.
For transactional tables, the naming convention is bucket_<bucket-number> (e.g. bucket_00000). It doesn't contain any random or incrementing part, so we simply cannot create more than one file.

path = createHiveBucketPath(subdirPath, bucketToUse, table.getParameters());

For bucketed, non-transactional tables -- actually i am not sure why we have this condition here. @dain would know.
My reading of the code leads to the following naming pattern for bucketed files

format("0%s_0_%s", paddedBucket, queryId.get())

-- if this is the right one (didn't test), then it's constant per bucket and query, so we cannot create more than 1 file either. (We could easily improve that by incrementing the _0_ part, but that's another story.)

The condition seems good, but the comments need rewording.
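A sketch of the two naming patterns discussed above (the query id value is made up; the non-transactional pattern is the one quoted, untested, above):

```java
public class AcidFileNamingSketch
{
    public static void main(String[] args)
    {
        int bucket = 0;

        // Transactional tables: bucket_<bucket-number>, e.g. bucket_00000.
        // No random or incrementing part, so at most one file per bucket.
        System.out.println(String.format("bucket_%05d", bucket)); // bucket_00000

        // Bucketed, non-transactional tables: the pattern is constant per
        // bucket and query, so also at most one file per bucket.
        String paddedBucket = String.format("%05d", bucket);
        String queryId = "20211221_120000_00042_abcde"; // hypothetical query id
        System.out.println(String.format("0%s_0_%s", paddedBucket, queryId));
    }
}
```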

Member

redundant (...) parens in sequence of ||

Member Author

I meant being consistent with the behaviour: when a table is non-transactional and has 1 bucket, the writer count is ignored and only 1 file is created. I must have used the wrong wording.

@@ -179,6 +179,7 @@ private ConnectorPageSink createPageSink(HiveWritableTableHandle handle, boolean
maxOpenPartitions,
writeVerificationExecutor,
partitionUpdateCodec,
session);
session,
handle.isTransactional());
Member

move before handle.getBucketProperty(),

onTrino().executeQuery("SET SESSION task_writer_count = 2");
onTrino().executeQuery(format("INSERT INTO %s SELECT * FROM tpch.tiny.nation", tableName));
int numberOfCreatedFiles = onTrino().executeQuery(format("SELECT DISTINCT \"$path\" FROM %s", tableName)).getRowsCount();
verify(numberOfCreatedFiles == 1, "There should be only 1 file created, instead there were %s", numberOfCreatedFiles);
Member

use assertEquals(numberOfCreatedFiles, 1, "There should be only 1 file created")

@homar (Member Author) commented Dec 16, 2021

There is a test failure: io.trino.tests.product.hive.TestHiveTransactionalTable.testUpdateFullAcidWithOriginalFilesTrinoInserting [true, NONE]
I am trying to figure it out.

@homar homar force-pushed the homar/insert_into_unbucketed_trans_table_when_writer_task_ branch from fefbcc0 to 135459d Compare December 20, 2021 13:10
@@ -2885,14 +2885,15 @@ else if (isFullAcidTable(table.getParameters())) {
.map(Column::getName)
.forEach(partitioningColumns::add);

ImmutableList<HiveType> hiveTypes = hiveBucketHandle.get().getColumns().stream()
Member

name it bucketingColumnTypes

OptionalInt.of(hiveBucketHandle.get().getTableBucketCount()),
!partitionColumns.isEmpty() && isParallelPartitionedBucketedWrites(session));
!partitionColumns.isEmpty() && isParallelPartitionedBucketedWrites(session) && !hiveTypes.isEmpty());
Member

Move !hiveTypes.isEmpty() to be next to !partitionColumns.isEmpty()

}

@Test
public void testDataIsNotBrokenInUnbucketedTransactionalTableWithTaskWriterCountGreaterThan1()
Member

Do we need the test above? This one seems to cover same stuff + DELETE

Member Author

right, I will delete it

@homar homar force-pushed the homar/insert_into_unbucketed_trans_table_when_writer_task_ branch from 135459d to e73eff1 Compare December 21, 2021 07:44
OptionalInt.of(hiveBucketHandle.get().getTableBucketCount()),
!partitionColumns.isEmpty() && isParallelPartitionedBucketedWrites(session));
!partitionColumns.isEmpty() && !bucketingColumnTypes.isEmpty() && isParallelPartitionedBucketedWrites(session));
Member

I don't understand why && !bucketingColumnTypes.isEmpty() is added here.

cc @raunaqmorarka @sopel39

Member Author (@homar, Dec 21, 2021)

In my understanding, it's because we started to rely more on HivePartitioningHandle and its isSingleNode method, and I wanted isUsePartitionedBucketing to return a value consistent with isSingleNode: if isSingleNode returns true, then isUsePartitionedBucketing should return false.
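The intended relationship can be sketched as follows (a hypothetical simplification, not the actual HivePartitioningHandle code):

```java
import java.util.List;

public class BucketingConsistencySketch
{
    // Hypothetical simplification: a handle is single-node when it has no
    // real bucketing columns
    static boolean isSingleNode(List<String> bucketingColumnTypes)
    {
        return bucketingColumnTypes.isEmpty();
    }

    // Partitioned bucketing requires both partition columns and real
    // bucketing columns
    static boolean usePartitionedBucketing(List<String> partitionColumns, List<String> bucketingColumnTypes, boolean parallelWrites)
    {
        return !partitionColumns.isEmpty() && !bucketingColumnTypes.isEmpty() && parallelWrites;
    }

    public static void main(String[] args)
    {
        List<String> noBucketing = List.of();
        List<String> partitions = List.of("orderpriority");
        // invariant: isSingleNode implies !usePartitionedBucketing
        System.out.println(isSingleNode(noBucketing) && !usePartitionedBucketing(partitions, noBucketing, true));
    }
}
```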

Member

it seems that isUsePartitionedBucketing is now false for partitioned, unbucketed, non-transactional tables, while it used to be true.

Member

unbucketed means hiveBucketHandle.isEmpty(), so it looks like we should bail out of the method earlier and never get here.

Member (@sopel39, Dec 21, 2021)

unbucketed means hiveBucketHandle.isEmpty(), so it looks like we should bail out of the method earlier and never get here.

Yes. I think we should hit if (hiveBucketHandle.isEmpty()) { earlier in the code, so I don't think this check here is needed

Member

I probably don't understand something, but I just tested, and a partitioned transactional table (so 1 implicit bucket) seems to work fine: different partitions are created and each of them has 1 bucket.

Writes will be correct, but with the change here only one node and one thread (in the entire cluster) will be writing data. The code you changed distributes writes between worker nodes, so we can avoid a single writer for the entire cluster.

Member

@homar it sounds like you tested transactional partitioned tables (unbucketed; aka with implicit 1 bucket)
The concern is about INSERT into non-transactional partitioned, unbucketed table.

Member

@homar it sounds like you tested transactional partitioned tables (unbucketed; aka with implicit 1 bucket)
The concern is about INSERT into non-transactional partitioned, unbucketed table.

I think we want to redistribute writes even for transactional, partitioned and bucketed (implicit 1 bucket).

Member Author (@homar, Dec 21, 2021)

@homar it sounds like you tested transactional partitioned tables (unbucketed; aka with implicit 1 bucket)
The concern is about INSERT into non-transactional partitioned, unbucketed table.

unbucketed, non-transactional tables should not reach this code because of the if (hiveBucketHandle.isEmpty()) { earlier in the code. But unfortunately I am afraid that @sopel39's comment regarding the decreased number of writers is still a valid concern.

Member Author (@homar, Dec 21, 2021)

@sopel39 again, I am probably missing something, but even with my changes, for a transactional, unbucketed (so 1 implicit bucket) and partitioned table, when I make an insert that creates 100 partitions, here https://github.com/trinodb/trino/blob/ee2ef32e6f09515a888a016adc1cc6ccd32cbae4/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSink.java#L354 I get 100 writers. Maybe you meant different writers; in such a case please point me to the particular part of the code

@@ -2885,14 +2885,15 @@ else if (isFullAcidTable(table.getParameters())) {
.map(Column::getName)
.forEach(partitioningColumns::add);

ImmutableList<HiveType> bucketingColumnTypes = hiveBucketHandle.get().getColumns().stream()
Member

Suggested change
ImmutableList<HiveType> bucketingColumnTypes = hiveBucketHandle.get().getColumns().stream()
List<HiveType> bucketingColumnTypes = hiveBucketHandle.get().getColumns().stream()

@@ -95,9 +95,12 @@
private long systemMemoryUsage;
private long validationCpuNanos;

private boolean isTransactional;
Member

make final
and maybe move before private final int[] dataColumnInputIndex

@@ -162,6 +165,7 @@ public HivePageSink(

this.session = requireNonNull(session, "session is null");
this.targetMaxFileSize = Optional.ofNullable(HiveSessionProperties.getTargetMaxFileSize(session)).stream().mapToLong(DataSize::toBytes).findAny();
this.isTransactional = isTransactional;
Member

move before this.hdfsEnvironment = ...

(this isn't ideal; ideally fields, constructor params and assignments would follow the same ordering, but this is a mess here, so an ideal place doesn't exist)

@Override
public boolean isSingleNode()
{
return hiveTypes.isEmpty();
Member

Add a comment.

@@ -546,7 +547,7 @@ public void testSimpleUnpartitionedTransactionalInsert()

// ensure that we treat ACID tables as implicitly bucketed on INSERT
String explainOutput = (String) onTrino().executeQuery("EXPLAIN " + insertQuery).row(0).get(0);
Assertions.assertThat(explainOutput).contains("Output partitioning: hive:HivePartitioningHandle{buckets=1");
Assertions.assertThat(explainOutput).contains("Output partitioning: SINGLE []");
Member

This isn't very specific. "Output partitioning: SINGLE" could appear, for example, for the top Output stage, or many times in the plan. Would it be possible to identify the actual stage we want to test for here? It's probably the source of the table writer, right?

cc @losipiuk

@@ -1903,6 +1904,36 @@ public void testDeleteAfterMajorCompaction()
});
}

@Test
public void testDataIsNotBrokenInUnbucketedTransactionalTableWithTaskWriterCountGreaterThan1()
Member

Suggested change
public void testDataIsNotBrokenInUnbucketedTransactionalTableWithTaskWriterCountGreaterThan1()
public void testUnbucketedTransactionalTableWithTaskWriterCountGreaterThan1()

@homar homar force-pushed the homar/insert_into_unbucketed_trans_table_when_writer_task_ branch from e73eff1 to 133cbe4 Compare December 21, 2021 09:37
onTrino().executeQuery(format("CREATE TABLE %s WITH (format='ORC', transactional=true) AS TABLE tpch.sf1000.orders WITH NO DATA", tableName));
onTrino().executeQuery("SET SESSION scale_writers = true");
onTrino().executeQuery("SET SESSION writer_min_size = '4kB'");
onTrino().executeQuery("SET SESSION task_writer_count = 4");
Member

Should we have a test with task_writer_count > 1 for unpartitioned (here) and also partitioned tables?
I think we should.

@homar homar force-pushed the homar/insert_into_unbucketed_trans_table_when_writer_task_ branch 2 times, most recently from d106344 to 3a35e96 Compare December 22, 2021 12:20
@homar (Member Author) commented Dec 22, 2021

@losipiuk @sopel39 @findepi I removed that problematic line from HiveMetadata and made a small change to HivePartitioningHandle.isSingleNode, so could you please take another look?

@sopel39 (Member) commented Dec 23, 2021

@losipiuk @sopel39 @findepi I removed that problematic line from HiveMetadata and made a small change to HivePartitioningHandle.isSingleNode, so could you please take another look?

Could you add tests similar to io.trino.plugin.hive.AbstractTestHive#testInsertPartitionedBucketedTableLayout and io.trino.plugin.hive.AbstractTestHive#testInsertBucketedTableLayout for transactional bucketed and bucketed-partitioned tables?

@alexjo2144 (Member) commented

Did this also impact writes made via an UPDATE query?

@homar homar force-pushed the homar/insert_into_unbucketed_trans_table_when_writer_task_ branch from 3a35e96 to 3fc53b0 Compare December 27, 2021 12:48
@homar (Member Author) commented Dec 27, 2021

@losipiuk @sopel39 @findepi I removed that problematic line from HiveMetadata and made a small change to HivePartitioningHandle.isSingleNode, so could you please take another look?

Could you add tests similar to io.trino.plugin.hive.AbstractTestHive#testInsertPartitionedBucketedTableLayout and io.trino.plugin.hive.AbstractTestHive#testInsertBucketedTableLayout for transactional bucketed and bucketed-partitioned tables?

@sopel39 I added 2 tests to io.trino.plugin.hive.TestHive; I hope this is what you asked for.

Did this also impact writes made via an UPDATE query?

@alexjo2144 I checked, and the debugger doesn't actually stop at any of the places I changed while performing an UPDATE

@homar homar force-pushed the homar/insert_into_unbucketed_trans_table_when_writer_task_ branch from 98c5447 to 3fc53b0 Compare December 27, 2021 13:46
@Test
public void testUnbucketedPartitionedTransactionalTableWithTaskWriterCountGreaterThan1()
{
withTemporaryTable("test_delete_with_unbucketed_transactional_table", true, false, NONE, tableName -> {
Member

table name does not match the test

@Test
public void testUnbucketedTransactionalTableWithTaskWriterCountGreaterThan1()
{
withTemporaryTable("test_delete_with_unbucketed_transactional_table", true, false, NONE, tableName -> {
Member

table name does not match the test

public void testUnbucketedPartitionedTransactionalTableWithTaskWriterCountGreaterThan1()
{
withTemporaryTable("test_delete_with_unbucketed_transactional_table", true, false, NONE, tableName -> {
onTrino().executeQuery(format("CREATE TABLE %s WITH (format='ORC', transactional=true, partitioned_by = ARRAY['orderpriority']) AS SELECT orderkey, orderstatus, totalprice, orderdate, clerk, shippriority, \"comment\", custkey, orderpriority FROM tpch.sf1000.orders LIMIT 0", tableName));
Member

extra space

public void testUnbucketedTransactionalTableWithTaskWriterCountGreaterThan1()
{
withTemporaryTable("test_delete_with_unbucketed_transactional_table", true, false, NONE, tableName -> {
onTrino().executeQuery(format("CREATE TABLE %s WITH (format='ORC', transactional=true) AS TABLE tpch.sf1000.orders WITH NO DATA", tableName));
Member

extra space

@@ -1903,6 +1904,66 @@ public void testDeleteAfterMajorCompaction()
});
}

@Test
public void testUnbucketedPartitionedTransactionalTableWithTaskWriterCountGreaterThan1()
Member

1 => One

}

@Test
public void testUnbucketedTransactionalTableWithTaskWriterCountGreaterThan1()
Member

1 => One

int sizeBeforeDeletion = onTrino().executeQuery(format("SELECT orderkey FROM %s", tableName)).rows().size();

onTrino().executeQuery(format("DELETE FROM %s WHERE (orderkey %% 2) = 0", tableName));
assertThat(onTrino().executeQuery(format("SELECT COUNT (orderkey) FROM %s WHERE orderkey %%2 = 0", tableName))).containsOnly(row(0));
Member

space after %% (everywhere)

int sizeOnHiveWithWhere = onHive().executeQuery(format("SELECT orderkey FROM %s WHERE orderkey %%2 = 1", tableName)).rows().size();
int sizeOnTrinoWithoutWhere = onTrino().executeQuery(format("SELECT orderkey FROM %s", tableName)).rows().size();

verify(sizeOnHiveWithWhere == sizeOnTrinoWithWhere);
Member

use assertThat not verify

Member Author

I will go with assertEquals and assertTrue, because assertThat is imported from the tempto assertions and isn't meant to work with integers

onTrino().executeQuery(format("INSERT INTO %s SELECT orderkey, orderstatus, totalprice, orderdate, clerk, shippriority, \"comment\", custkey, orderpriority FROM tpch.sf1000.orders LIMIT 100000", tableName));
assertThat(onTrino().executeQuery(format("SELECT count(*) FROM %s", tableName))).containsOnly(row(100000));
int numberOfCreatedFiles = onTrino().executeQuery(format("SELECT DISTINCT \"$path\" FROM %s", tableName)).getRowsCount();
assertEquals(numberOfCreatedFiles, 5, "There should be only 1 file created");
Member

wrong error message.

Btw: can we merge the tests and add a boolean partitioned argument?

Member Author

sure I can give this a try

@homar homar force-pushed the homar/insert_into_unbucketed_trans_table_when_writer_task_ branch 2 times, most recently from c934618 to 71e30f7 Compare December 27, 2021 16:30
@@ -103,4 +103,18 @@ public void testHiveViewTranslationError()
// TODO: combine this with tests for successful translation (currently in TestHiveViews product test)
}
}

@Test
public void testInsertBucketedTransactionalTableLayout()
Member

why not in AbstractTestHive?

Member Author

because AbstractTestHive is also extended by other classes like TestHiveAlluxioMetastore

Member

because AbstractTestHive is also extended by other classes like TestHiveAlluxioMetastore

Yet io.trino.plugin.hive.AbstractTestHive#testInsertBucketedTableLayout and io.trino.plugin.hive.AbstractTestHive#testInsertPartitionedBucketedTableLayout are in AbstractTestHive

public boolean isSingleNode()
{
// empty hiveTypes means there is no bucketing
return hiveTypes.isEmpty() && !usePartitionedBucketing;
Member

Why does no bucketing mean no insert distribution? Because you want a single file?

@@ -3109,7 +3109,15 @@ public PhysicalOperation visitRefreshMaterializedView(RefreshMaterializedViewNod
public PhysicalOperation visitTableWriter(TableWriterNode node, LocalExecutionPlanContext context)
{
// Set table writer count
context.setDriverInstanceCount(getTaskWriterCount(session));
// being a single node means there is one node and one writer so
Member

being a single node means there is one node and one writer so

Single node doesn't mean a single writer (there can be multiple writers per node).

Currently, single-node partitioning is used only by the system partitioning handle, and it's not used for the insert path.
This code here only deals with local distribution, but there is also io.trino.sql.planner.optimizations.AddLocalExchanges.Rewriter#visitTableWriter and possibly more; see changes in b8e4e3f.
I would rather not change this code.

Could we just handle your case using a dedicated constant partitioning function which would direct all rows to a single writer?
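Such a constant partitioning function could be sketched like this (the interface below is a simplified local stand-in for Trino's BucketFunction SPI, which actually takes a Page and a position; everything here is illustrative):

```java
public class ConstantBucketFunctionSketch
{
    // Simplified local stand-in for io.trino.spi.connector.BucketFunction
    interface BucketFunction
    {
        int getBucket(int position);
    }

    // Every row maps to bucket 0, so the engine would route all rows
    // to a single writer regardless of task_writer_count
    static final BucketFunction CONSTANT = position -> 0;

    public static void main(String[] args)
    {
        for (int position = 0; position < 3; position++) {
            System.out.println(CONSTANT.getBucket(position)); // always 0
        }
    }
}
```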

@sopel39 (Member) commented Jan 11, 2022

This seems to be superseded by: #10460

@findepi findepi closed this Jan 14, 2022
Development

Successfully merging this pull request may close these issues.

Insert into unbucketed unpartitioned transactional table fails when task_writer_count > 1
5 participants