
HiveSplitSource and Grouped Execution

Wenlei Xie edited this page Jul 27, 2019 · 10 revisions

Background

Presto schedules splits to workers in order to execute queries. Each connector implements a ConnectorSplitManager, which returns a ConnectorSplitSource for a given TableLayoutHandle to the engine.

Most connectors preload all the splits and simply return a FixedSplitSource. HiveSplitSource is much more sophisticated. Unlike a traditional database or storage engine, a Hive table is just a simple wrapper over an HDFS directory, and the metastore only provides the directory for each partition. HiveConnector thus has to list directories and interact with the underlying file system to enumerate HiveSplits. As a result:

  • Preloading all splits can be too slow for interactive queries.
  • For large batch queries, holding all splits might require too much memory.
  • Loading splits with a single thread can also be too slow in some cases.

Hive Split Management

In general, HiveSplitManager does the following when asked for a split source:

  • It creates a HiveSplitLoader and HiveSplitSource. The HiveSplitSource initially has no splits loaded, but contains the HiveSplitLoader.
  • It starts the HiveSplitLoader, which loads all the Hive splits into the HiveSplitSource asynchronously, using multiple threads.

Loaded HiveSplits are stored in an AsyncQueue (part of HiveSplitSource).
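The two steps above can be illustrated with a minimal sketch. It is not the actual HiveSplitLoader: the class name AsyncSplitLoadingSketch, the listFiles function, and the use of plain strings for partitions and splits are all assumptions made for illustration. The point is only that loading starts in the background on several threads, while the caller immediately holds a (still empty) queue.

```java
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;

// Hypothetical sketch, not Presto code: lists each partition on a thread pool
// and adds whatever it finds to a shared, thread-safe queue.
final class AsyncSplitLoadingSketch {
    // Returns immediately. The returned future completes once every partition
    // has been listed and its splits have been added to splitQueue.
    static CompletableFuture<Void> start(
            List<String> partitions,
            Function<String, List<String>> listFiles, // stand-in for file system listing
            Queue<String> splitQueue,                 // must be safe for concurrent adds
            ExecutorService executor) {
        CompletableFuture<?>[] tasks = partitions.stream()
                .map(partition -> CompletableFuture.runAsync(
                        () -> splitQueue.addAll(listFiles.apply(partition)), executor))
                .toArray(CompletableFuture[]::new);
        return CompletableFuture.allOf(tasks);
    }
}
```

A caller would pass a concurrent queue (for example a ConcurrentLinkedQueue) and can start consuming splits before the returned future completes.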

Old Days

Prior to #9119, AsyncQueue provided a simple “push back” mechanism that kicked in when the number of HiveSplits exceeded the pre-configured maxOutstandingSplits. The main APIs were offer and getBatch/getBatchAsync.
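As a rough illustration of what “push back” means here, the sketch below shows a queue whose offer returns a future that stays incomplete while the queue is full, so a well-behaved producer waits on it before adding more. This is a simplified assumption of the idea, not the real AsyncQueue: the class PushBackQueueSketch, the targetSize parameter (playing the role of maxOutstandingSplits), and the synchronous getBatch are made up for this example.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch, not Presto's AsyncQueue.
final class PushBackQueueSketch<T> {
    private final int targetSize;
    private final Queue<T> elements = new ArrayDeque<>();
    private CompletableFuture<Void> notFull = CompletableFuture.completedFuture(null);

    PushBackQueueSketch(int targetSize) {
        this.targetSize = targetSize;
    }

    // Always accepts the element. The returned future is the "push back":
    // it is incomplete while the queue holds targetSize or more elements.
    synchronized CompletableFuture<Void> offer(T element) {
        elements.add(element);
        if (elements.size() >= targetSize && notFull.isDone()) {
            notFull = new CompletableFuture<>();
        }
        return notFull;
    }

    // Removes up to maxSize elements and releases waiting producers once the
    // queue has dropped below targetSize.
    synchronized List<T> getBatch(int maxSize) {
        List<T> batch = new ArrayList<>();
        while (batch.size() < maxSize && !elements.isEmpty()) {
            batch.add(elements.poll());
        }
        if (elements.size() < targetSize) {
            notFull.complete(null);
        }
        return batch;
    }
}
```

Note that the mechanism is cooperative: a producer that ignores the returned future can still grow the queue without bound.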

One interesting thing to note is that for a bucketed table, the splits are added in a round-robin fashion over buckets (instead of generating all the splits from bucket 0, then moving on to bucket 1, and so on). This avoids blocking the scheduler, because Presto schedules the same file to the same worker (enforced by BucketedSplitPlacementPolicy; more explanation can be found in #7031). Note that the “push back” from AsyncQueue is ignored in this case, resulting in potentially unbounded memory usage for HiveSplits. For long-running batch queries, the memory usage will be similar to preloading all HiveSplits.
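The round-robin order described above can be sketched as a simple interleaving of per-bucket lists. The class RoundRobinSketch and its interleave helper are hypothetical and only show the ordering, not how the loader actually produces splits.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch: emits one split from bucket 0, one from bucket 1, and so
// on, then starts over, until every bucket is exhausted.
final class RoundRobinSketch {
    static <T> List<T> interleave(List<List<T>> splitsByBucket) {
        List<Iterator<T>> iterators = new ArrayList<>();
        for (List<T> bucket : splitsByBucket) {
            iterators.add(bucket.iterator());
        }
        List<T> result = new ArrayList<>();
        boolean progressed = true;
        while (progressed) {
            progressed = false;
            for (Iterator<T> iterator : iterators) {
                if (iterator.hasNext()) {
                    result.add(iterator.next());
                    progressed = true;
                }
            }
        }
        return result;
    }
}
```

With this order, the first few splits handed to the scheduler already cover every bucket, so no worker sits idle waiting for "its" bucket to show up.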

InternalHiveSplit

To reduce memory consumption and avoid OOM over bucketed tables, InternalHiveSplit was first introduced in #9119 and further improved in #9332. We skip the discussion of #9119 here, since it only served as a simple mitigation; it is covered in the appendix.

The first idea of InternalHiveSplit is to serve as a “factory” for HiveSplit. Only the InternalHiveSplits have to be loaded, and HiveSplits can be constructed when getSplit is called. Specifically, we only need one InternalHiveSplit per file.

InternalHiveSplit also makes it possible again to “push back” on adding more splits without blocking scheduling. The idea is as follows:

  • AsyncQueue contains InternalHiveSplit
  • To fetch a batch of HiveSplit:
    • We “borrow” the first k InternalHiveSplits from AsyncQueue.
    • We create a HiveSplit from each of the “borrowed” InternalHiveSplits. This “chops” each InternalHiveSplit.
    • InternalHiveSplits that still have data left after “chopping” are returned to the end of the AsyncQueue.

The implementation of this "borrow and chop" is in HiveSplitSource#getNextBatch, backed by AsyncQueue#borrowBatchAsync. This is why a complicated and unintuitive (:P) borrowBatchAsync method was added to AsyncQueue.
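A synchronous, single-threaded sketch of "borrow and chop" may make the steps easier to follow. It is not the code in HiveSplitSource#getNextBatch: FileSplitFactory is a hypothetical stand-in for InternalHiveSplit, splits are represented as strings, a plain Deque replaces AsyncQueue, and files are assumed to be non-empty.

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

final class BorrowAndChopSketch {
    // Hypothetical stand-in for InternalHiveSplit: one file, consumed chunk by chunk.
    static final class FileSplitFactory {
        private final String path;
        private final long length; // assumed to be > 0
        private long start;        // offset of the next chunk

        FileSplitFactory(String path, long length) {
            this.path = path;
            this.length = length;
        }

        boolean isDone() {
            return start >= length;
        }

        // "Chops" the next chunk off the file and describes it as "path[start,end)".
        String chop(long maxChunkSize) {
            long end = Math.min(start + maxChunkSize, length);
            String split = path + "[" + start + "," + end + ")";
            start = end;
            return split;
        }
    }

    static List<String> getNextBatch(Deque<FileSplitFactory> queue, int k, long maxChunkSize) {
        // Borrow the first k factories from the head of the queue.
        List<FileSplitFactory> borrowed = new ArrayList<>();
        while (borrowed.size() < k && !queue.isEmpty()) {
            borrowed.add(queue.pollFirst());
        }
        // Chop one split off each; survivors go back to the end of the queue.
        List<String> batch = new ArrayList<>();
        for (FileSplitFactory factory : borrowed) {
            batch.add(factory.chop(maxChunkSize));
            if (!factory.isDone()) {
                queue.addLast(factory);
            }
        }
        return batch;
    }
}
```

Because the queue holds one small factory per file rather than one object per split, its size is bounded by the number of files discovered so far, not by the number of splits.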

So, problem solved and Presto has been happily executing queries over Hive data, until ......

Grouped Execution

Grouped execution was added in #8951 to support the huge joins and aggregations that arise in ETL pipelines. See #12124 for more details.

To execute a query with grouped execution, the connector has to support addressable split groups. Support in the Hive connector was added in 1502cc.

A PerBucket interface is introduced. It provides offer and borrowAsync APIs that are very similar to what AsyncQueue has, amended with an optional bucketNumber to support addressable split groups. The PerBucket implementation for ungrouped execution (called allInOnce) simply delegates to the underlying AsyncQueue. The PerBucket implementation for grouped execution (called bucketed) maintains one AsyncQueue per bucket. Note that it again does not “push back” on adding InternalHiveSplits. Thus grouped execution over tables with many partitions and buckets will fail with HIVE_EXCEEDED_SPLIT_BUFFERING_LIMIT.
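The shape of the two implementations can be sketched as follows. This is a simplified assumption, not the real interface: PerBucketSketch uses a synchronous poll in place of borrowAsync, plain in-memory queues in place of AsyncQueue, and is not thread-safe. Only the names allInOnce and bucketed and the optional bucket number come from the description above.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalInt;
import java.util.Queue;

// Hypothetical sketch of the PerBucket idea; signatures are illustrative only.
interface PerBucketSketch<T> {
    void offer(OptionalInt bucketNumber, T split);

    // Returns the next split for the bucket, or null when none is queued.
    T poll(OptionalInt bucketNumber);

    // Ungrouped execution: a single queue, the bucket number is ignored.
    static <T> PerBucketSketch<T> allInOnce() {
        Queue<T> queue = new ArrayDeque<>();
        return new PerBucketSketch<T>() {
            @Override
            public void offer(OptionalInt bucketNumber, T split) {
                queue.add(split);
            }

            @Override
            public T poll(OptionalInt bucketNumber) {
                return queue.poll();
            }
        };
    }

    // Grouped execution: one queue per bucket, so the bucket number is required.
    static <T> PerBucketSketch<T> bucketed() {
        Map<Integer, Queue<T>> queues = new HashMap<>();
        return new PerBucketSketch<T>() {
            @Override
            public void offer(OptionalInt bucketNumber, T split) {
                queues.computeIfAbsent(bucketNumber.getAsInt(), bucket -> new ArrayDeque<>()).add(split);
            }

            @Override
            public T poll(OptionalInt bucketNumber) {
                Queue<T> queue = queues.get(bucketNumber.getAsInt());
                return queue == null ? null : queue.poll();
            }
        };
    }
}
```

In the bucketed variant, splits offered for a bucket that nobody is polling yet simply accumulate, which is the unbounded buffering the paragraph above describes.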

PerBucket cannot “push back” because split discovery is done per partition, while split scheduling is done per bucket, as illustrated by the following figure:

[Figure: Split Discovery and Schedule for Bucketed Table]

Note that each InternalHiveSplit takes about 1.5K - 2K of memory, which could be too much. Since within each bucket we can now enumerate the splits one file after another (instead of enumerating them in a round-robin fashion), this opens the opportunity to store only the Hive file paths, which should take much less memory.

Conclusion

HiveSplitSource has been hacked on for several fixes and new features. We now need to add the following two features:

  • Support rewind.
  • Reduce memory usage for grouped execution.

So it might be a good time to consider overhauling it.

AsyncQueue’s API is too complicated for a general data structure. The semantics of borrowAsync make it quite tailored to the purpose of loading HiveSplits. We might want to consider fusing AsyncQueue and PerBucket into a HiveInternalSplitQueue.

It might be difficult to evolve the existing code, given that many classes are coupled together. So I am open to having separate BucketHiveSplitSource and BucketHiveSplitLoader classes that only handle the grouped execution case, and gradually migrating :).

Appendix

#9119

#9119 was a simple mitigation. A simple InternalHiveSplit class was introduced. The major changes are:

  • Remove obvious fields that are only required when constructing HiveSplit (connectorId, database and table name).
  • Remove effectivePredicate, and use a string to represent the Hive type (HiveTypeName) so we can track memory.

InternalHiveSplit and HiveSplit are still a one-to-one mapping in this PR. Memory tracking and a limit are applied to all InternalHiveSplits.

While this PR doesn’t solve the root cause, it mitigates the reliability issue by explicitly failing the query before it causes a coordinator OOM.
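The "track memory, then fail fast" idea can be sketched as below. This is an illustration under assumptions, not the code from #9119: the class SplitBufferLimitSketch and its method names are made up, IllegalStateException stands in for whatever exception the connector actually throws, and using HIVE_EXCEEDED_SPLIT_BUFFERING_LIMIT in the message is a guess based on the error code mentioned in the grouped execution section.

```java
// Hypothetical sketch: tracks the estimated bytes of buffered splits and fails
// the caller as soon as a configured limit would be exceeded.
final class SplitBufferLimitSketch {
    private final long maxBufferedBytes;
    private long bufferedBytes;

    SplitBufferLimitSketch(long maxBufferedBytes) {
        this.maxBufferedBytes = maxBufferedBytes;
    }

    // Called when a split is buffered. Throwing here stands in for failing the
    // query before the coordinator runs out of memory.
    synchronized void reserve(long estimatedSizeInBytes) {
        if (bufferedBytes + estimatedSizeInBytes > maxBufferedBytes) {
            throw new IllegalStateException(
                    "HIVE_EXCEEDED_SPLIT_BUFFERING_LIMIT: split buffer would exceed " + maxBufferedBytes + " bytes");
        }
        bufferedBytes += estimatedSizeInBytes;
    }

    // Called when a buffered split is handed to the scheduler.
    synchronized void release(long estimatedSizeInBytes) {
        bufferedBytes -= estimatedSizeInBytes;
    }

    synchronized long getBufferedBytes() {
        return bufferedBytes;
    }
}
```

Tracking an estimate per split is only practical if the split's fields have a size that is cheap to compute, which is one reason for replacing richer objects with strings as described above.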
