Scale writers when write is partitioned #10791

Open
findepi opened this issue Jan 25, 2022 · 6 comments
Labels: enhancement (New feature or request), performance

Comments

@findepi (Member) commented Jan 25, 2022

Support writer scaling when the write is partitioned, including writes to partitioned or bucketed Hive or Iceberg tables, and OPTIMIZE (which may force repartitioning, overriding the preferred-partitioning tuning knobs).

cc @sopel39

findepi added the enhancement (New feature or request) and performance labels on Jan 25, 2022
@vincentpoon (Member) commented:

+1 to this idea

@sopel39 (Member) commented May 19, 2022

This might be implemented by introducing a special adaptive partitioned exchange. Normally, a row is assigned to a single partition based on its hash. When there is skew, however, many rows are assigned to the same partition, and that partition's buffer fills up because the writes are slow.
The idea behind an adaptive partitioned exchange is that it would keep track of hash -> partition assignments. When it gets a signal that some partition buffer is blocked, it would rebalance those assignments. For example, it might decide that a particular skewed hash should be distributed between two (deterministic) partitions in round-robin fashion.
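A minimal sketch of that rebalancing idea (the class name, the blocked-buffer callback, and the split-to-two-partitions policy are all hypothetical, not Trino's actual exchange API):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: routes rows to partitions by hash, but when a
// partition's output buffer reports back-pressure, splits every hash that
// targets only that partition across two deterministic partitions.
public class AdaptivePartitionFunction
{
    private final int partitionCount;
    // hash -> the partition(s) it may be routed to (starts with exactly one)
    private final Map<Long, int[]> assignments = new ConcurrentHashMap<>();
    // per-hash round-robin counters, created once a hash is split
    private final Map<Long, AtomicLong> roundRobin = new ConcurrentHashMap<>();

    public AdaptivePartitionFunction(int partitionCount)
    {
        this.partitionCount = partitionCount;
    }

    public int getPartition(long hash)
    {
        int[] targets = assignments.computeIfAbsent(hash,
                h -> new int[] {(int) Math.floorMod(h, (long) partitionCount)});
        if (targets.length == 1) {
            return targets[0];
        }
        // Skewed hash: alternate deterministically between its targets
        long counter = roundRobin.get(hash).getAndIncrement();
        return targets[(int) (counter % targets.length)];
    }

    // Called when the buffer for blockedPartition reports back-pressure;
    // gives every hash currently pinned to that partition a second target.
    public void onPartitionBlocked(int blockedPartition)
    {
        assignments.replaceAll((hash, targets) -> {
            if (targets.length == 1 && targets[0] == blockedPartition) {
                roundRobin.putIfAbsent(hash, new AtomicLong());
                int second = (int) Math.floorMod(hash + 1, (long) partitionCount);
                return new int[] {targets[0], second};
            }
            return targets;
        });
    }
}
```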

This solution is suitable for pipelined mode. For Tardigrade, I think insert tasks for a partition would probably need to be scaled differently (cc @losipiuk @arhimondr)

@arhimondr (Contributor) commented:

For fault-tolerant execution, it should be possible to split a partition dynamically. This mechanism could also be useful for handling skew in joins, though it is unclear when we will be able to start working on it.
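To illustrate what dynamic splitting could mean at the task level (everything here is a hypothetical sketch, not Trino's fault-tolerant scheduler): an oversized output partition is fanned out into several sub-partitions, each runnable and retryable as its own insert task.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: when one output partition is estimated to be much
// larger than the rest, fan it out into multiple sub-partitions, each
// handled by its own insert task.
public class PartitionSplitPlanner
{
    record TaskAssignment(int partition, int subPartition) {}

    public static List<TaskAssignment> planTasks(long[] partitionSizes, long targetBytesPerTask)
    {
        List<TaskAssignment> tasks = new ArrayList<>();
        for (int partition = 0; partition < partitionSizes.length; partition++) {
            // Oversized partitions get ceil(size / target) sub-partitions
            int subTasks = (int) Math.max(1,
                    (partitionSizes[partition] + targetBytesPerTask - 1) / targetBytesPerTask);
            for (int sub = 0; sub < subTasks; sub++) {
                tasks.add(new TaskAssignment(partition, sub));
            }
        }
        return tasks;
    }

    public static void main(String[] args)
    {
        // One hot partition (8 GB) among small ones; target 1 GB per task.
        long gb = 1L << 30;
        System.out.println(planTasks(new long[] {gb / 4, 8 * gb, gb / 2}, gb));
        // partition 1 is split into 8 sub-partition tasks
    }
}
```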

@damnMeddlingKid (Member) commented:

Ran into this recently and thought it would be helpful to add some context on our case. We write to Iceberg using transforms in our partition columns (hour(), day(), etc.).

When a partition column uses a transform, Trino uses Iceberg's bucket function for partitioning, which means the planner cannot choose writer scaling or redistributed writes.

This leads to writer skew when we write a single large partition.
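For example, a partition spec like the following (a minimal sketch using the Iceberg Java API; the schema is made up) gives every row from the same hour the same partition value, so one hot hour funnels through a single writer:

```java
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

public class TransformPartitionExample
{
    public static void main(String[] args)
    {
        // Table schema with a timestamp column used for partitioning
        Schema schema = new Schema(
                Types.NestedField.required(1, "id", Types.LongType.get()),
                Types.NestedField.required(2, "event_time", Types.TimestampType.withoutZone()));

        // hour() transform: all rows from the same hour share one partition value
        PartitionSpec spec = PartitionSpec.builderFor(schema)
                .hour("event_time")
                .build();

        System.out.println(spec); // prints the spec's transform fields
    }
}
```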

Here's a breakdown of what is happening:

  1. IcebergMetadata sees that the partition spec contains transform columns, so it creates a partitioning handle: https://github.com/trinodb/trino/blob/master/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java#L700
  2. LogicalPlanner sees that the connector has specified a partitioning handle, so it creates a PartitioningScheme based on it: https://github.com/trinodb/trino/blob/master/core/trino-main/src/main/java/io/trino/sql/planner/LogicalPlanner.java#L591-L595
  3. ApplyPreferredTableWriterPartitioning does not match this node because there is no preferred partitioning.
  4. AddExchanges creates an exchange node based on the Iceberg bucket function: https://github.com/trinodb/trino/blob/master/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/AddExchanges.java#L634-L642

Here's a diagram that maps out all code paths related to partitioned writes in Iceberg.
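To make the resulting skew concrete, here is a toy illustration (not Trino code; the writer count and row count are arbitrary) of why a fixed hash exchange sends every row carrying the same partition value to a single writer:

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.IntStream;

public class WriterSkewDemo
{
    public static void main(String[] args)
    {
        int writers = 8;
        Map<Integer, Integer> rowsPerWriter = new TreeMap<>();
        // 1,000,000 rows, all from the same day -> same partition value
        String partitionValue = "2022-01-25";
        IntStream.range(0, 1_000_000).forEach(i -> {
            int writer = Math.floorMod(partitionValue.hashCode(), writers);
            rowsPerWriter.merge(writer, 1, Integer::sum);
        });
        // Prints a single entry: one writer received every row
        System.out.println(rowsPerWriter);
    }
}
```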

@damnMeddlingKid (Member) commented:

I'd like to work on this issue, and I'm curious how the adaptive exchange idea differs from the current implementation of writer scaling. I've tried a change that forces writer scaling, and it works pretty well for Iceberg; however, we sometimes run into OOMs for Hive-based writes.
