
Support Direct Recursive Hive File Listings #12443

Merged
merged 5 commits into trinodb:master on Jun 23, 2022

Conversation

@pettyjamesm (Member) commented May 17, 2022

Description

This PR includes three changes designed to improve the efficiency of the hive plugin DirectoryLister when used in combination with a FileSystem that supports more efficient recursive listing via FileSystem#listFiles(Path, boolean recursive), like the TrinoS3FileSystem.

  1. The first commit extends the DirectoryLister interface with an additional method that requests a full recursive file listing instead of a shallow, "files and directories" listing. This required altering CachingDirectoryLister and TransactionScopeCachingDirectoryLister to be able to store both kinds of listings at the same path, which was done by adding DirectoryListingCacheKey, which distinguishes the two kinds of keys via the sign bit of their precomputed hash code. (A sketch of the resulting interface follows this list.)
  2. The second commit modifies HiveFileIterator to conditionally call DirectoryLister.listFilesRecursively instead of DirectoryLister.list when recursive behavior is requested. This greatly simplifies the implementation, since recursion and traversal down into sub-paths are now handled by the file system itself, but comes at the cost of identifying "hidden sub-paths" between the top-level path and child paths in a slightly more complex fashion.
  3. The third and final commit changes the behavior of HiveFileIterator to avoid eagerly checking FileSystem.exists when ignoreAbsentPartitions is true, instead waiting for the listing to throw an exception and then checking whether the exception was caused by the path not existing. This is especially useful for the TrinoS3FileSystem, since its implementation of FileSystem.exists itself performs an S3 listing operation.

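For reference, the first commit leaves the interface with the following shape (a sketch matching the diff quoted later in this conversation; imports elided):

public interface DirectoryLister
{
    // shallow listing: files and directories, immediate children of the path only
    RemoteIterator<LocatedFileStatus> list(FileSystem fs, Table table, Path path)
            throws IOException;

    // recursive listing: files only, at any depth below the path
    RemoteIterator<LocatedFileStatus> listFilesRecursively(FileSystem fs, Table table, Path path)
            throws IOException;
}
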
Overall, this greatly reduces the number of S3 listing operations performed when there is a large number of nested "directories", since we can instead fully enumerate all of the leaf S3 object paths when that's the desired behavior. For instance, the ELB access log S3 path structure described here generates a separate "directory" per region, per day, which without this improvement can quickly result in many hundreds of S3 listing calls to recurse through the hierarchy before reaching the actual data files.

This change bridges the gap between the BackgroundHiveSplitLoader and the optimized TrinoS3FileSystem#listFiles(Path, boolean recursive) implementation originally contributed in #4825.

Documentation

( ) No documentation is needed.
(x) Sufficient documentation is included in this PR.
( ) Documentation PR is available with #prnumber.
( ) Documentation issue #issuenumber is filed, and can be handled later.

Release notes

(x) No release notes entries required.
( ) Release notes entries required with the following suggested text:

@cla-bot cla-bot bot added the cla-signed label May 17, 2022
@pettyjamesm pettyjamesm marked this pull request as ready for review May 17, 2022 21:19
@pettyjamesm pettyjamesm requested a review from sopel39 May 17, 2022 21:19
@sopel39 sopel39 requested review from findepi and ebyhr and removed request for sopel39 May 17, 2022 22:44
@pettyjamesm pettyjamesm requested review from sopel39, findinpath and dain and removed request for dain May 18, 2022 14:12
@findepi findepi requested a review from electrum May 19, 2022 14:10
@findepi (Member) commented May 20, 2022

@findinpath @alexjo2144 PTAL

@findinpath (Contributor)

nit: the commit title "Use recurisve listing in HiveFileIterator" has a small typo in "recurisve"

@@ -187,6 +187,14 @@ public RemoteIterator<LocatedFileStatus> list(FileSystem fs, Table table, org.ap
return throwingRemoteIterator(requireNonNull(fileStatuses.get(path)), throwException);
}

@Override
public RemoteIterator<LocatedFileStatus> listFilesRecursively(FileSystem fs, Table table, Path path)
Contributor:

Is this correct when we want to list the whole tree under the specified path?

Member Author:

Correct, we want to list all files underneath the specified path (not including the intermediate "directory" entries). This is ultimately the goal of a "recursive listing" when generating splits in Hive, but it comes with a caveat: we can no longer identify and skip sub-listing "hidden" directories, so we have to filter out the files listed underneath those hidden paths after the fact.

@pettyjamesm (Member Author)

nit: the commit title "Use recurisve listing in HiveFileIterator" has a small typo in "recurisve"

Whoops, fixed.

@findinpath (Contributor) commented May 23, 2022

@pettyjamesm please set up a test with hive.recursive-directories set to true which runs on both Hadoop and MinIO/AWS S3, in order to add coverage for regressions in the functionality affected by this PR.
I went over the Trino code and couldn't find any test making use of the recursive DFS directory lister.

We need to add these tests in order to guard Trino against possible future regressions.

@sopel39 sopel39 removed their request for review May 23, 2022 15:52
@pettyjamesm (Member Author)

@findinpath - I might need some help figuring out how to put one of those together; the only existing tests I can see bootstrapping MinIO components are in the Delta Lake connector. Is it fine to assume certain bucket names and instance local credentials for the purposes of integrating with S3 in tests?

@findinpath (Contributor)

@pettyjamesm ideally the class io.trino.plugin.deltalake.util.DockerizedMinioDataLake should be extracted to a common place (e.g. trino-testing-containers).

With this common building block, we could build integration tests for all the Lakehouse connectors.

Is it fine to assume certain bucket names and instance local credentials for the purposes of integrating with S3 in tests?

Yes, it should be fine.
Follow along the lines of what is done in io.trino.plugin.deltalake.DeltaLakeQueryRunner#createS3DeltaLakeQueryRunner(java.lang.String, java.lang.String, java.util.Map<java.lang.String,java.lang.String>, java.util.Map<java.lang.String,java.lang.String>, java.util.Map<java.lang.String,java.lang.String>, java.lang.String, io.trino.plugin.deltalake.util.TestingHadoop, java.util.function.Consumer<io.trino.testing.QueryRunner>) for making the necessary adaptations for Hive on top of MinIO.

Please take the HDFS-backed integration tests into account as well; ideally we'd have a common base class containing the tests for the directory walker functionality, to which we could later also add Azure/GCS tests (not in scope of this PR).

@alexjo2144 (Member) left a comment

Mostly nit-picks

One thing we might want to think about is that there may be a lot of duplicates in this cache if a directory is asked for in both recursive and non-recursive listings

catch (TrinoException e) {
if (ignoreAbsentPartitions) {
try {
if (!fileSystem.exists(path)) {
Member:

Can we move this ignoreAbsentPartitions && !fileSystem.exists(path) to the top, before constructing the FileStatusIterator? That way we don't have to rely on exception handling to resolve the condition.

Member Author:

The idea behind this change is to avoid that extra check for fileSystem.exists(path) if the listing succeeds

return emptyIterator();
}
}
catch (Exception ee) {
Member:

Suggested change
catch (Exception ee) {
catch (RuntimeException ee) {

Member Author:

We want to handle any Exception type here, since we're promoting it to a TrinoException with a specific error code and message regardless of the underlying cause.
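Pieced together from the snippets above, the overall shape is roughly as follows (a sketch with elided arguments and an assumed error message, not the exact merged code):

try {
    // optimistic: attempt the listing without any exists() pre-check
    return new FileStatusIterator(/* table, path, fileSystem, directoryLister, ... */);
}
catch (TrinoException e) {
    if (ignoreAbsentPartitions) {
        try {
            if (!fileSystem.exists(path)) {
                // the listing failed because the partition location is absent, so skip it
                return emptyIterator();
            }
        }
        catch (Exception ee) {
            // promote any failure to a TrinoException with a consistent error code
            throw new TrinoException(HIVE_FILESYSTEM_ERROR, "Failed to check if path exists: " + path, ee);
        }
    }
    throw e;
}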

}

@VisibleForTesting
static boolean containsHiddenPathPartAfterIndex(String pathString, int startFromIndex)
Member:

Could we just do something like:

stream(pathSubstring.split(String.valueOf(SEPARATOR_CHAR)))
  .anyMatch(name -> ...);

Member Author:

You could, but you'd end up generating a lot of extra allocations in a performance-sensitive path, and it wouldn't allow isHiddenFileOrDirectory and isHiddenOrWithinHiddenParentDirectory to share the same logic.
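A hedged reconstruction of the allocation-free scan (my sketch, not necessarily the merged code; it assumes the Hive convention that names starting with '.' or '_' are hidden):

@VisibleForTesting
static boolean containsHiddenPathPartAfterIndex(String pathString, int startFromIndex)
{
    // walk each '/'-separated path part without materializing substrings
    while (startFromIndex < pathString.length()) {
        char first = pathString.charAt(startFromIndex);
        if (first == '.' || first == '_') {
            return true;
        }
        int nextSeparator = pathString.indexOf('/', startFromIndex);
        if (nextSeparator < 0) {
            break;
        }
        startFromIndex = nextSeparator + 1;
    }
    return false;
}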

public DirectoryListingCacheKey(Path path, boolean recursiveFilesOnly)
{
this.path = requireNonNull(path, "path is null");
this.hashCode = path.hashCode() + (recursiveFilesOnly ? 1 : 0);
Member:

Is this just here to precompute the hash code? I'd usually just compute it as needed.

Also, we mostly just do Objects.hash(path, recursiveFilesOnly)

Member Author:

Precomputing and storing the hash code avoids an extra pointer indirection through the path reference, which is significant in terms of latency when, e.g., rehashing internal Cache structures, an operation that is typically bound by memory access latency.

Contributor:

Could you please use Objects.hash(path, recursiveFilesOnly)?
The result is slightly different, because Boolean.hashCode returns 1231/1237 instead of 0/1, but it is easier to grasp for the maintainers of the code.

Please also add a comment in the code explaining why the hash code is precomputed.
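Putting both requests together, the constructor might end up looking like this (a sketch of the suggested form, not the merged code; imports elided):

public DirectoryListingCacheKey(Path path, boolean recursiveFilesOnly)
{
    this.path = requireNonNull(path, "path is null");
    this.recursiveFilesOnly = recursiveFilesOnly;
    // Precomputed and stored because rehashing the internal cache structures is
    // memory-latency bound; this avoids chasing the path pointer on every call.
    this.hashCode = Objects.hash(path, recursiveFilesOnly);
}

@Override
public int hashCode()
{
    return hashCode;
}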

}
DirectoryListingCacheKey other = (DirectoryListingCacheKey) o;
return recursiveFilesOnly == other.recursiveFilesOnly
&& hashCode == other.hashCode
Member:

Suggested change
&& hashCode == other.hashCode


return status;
}
else if (isHiddenFileOrDirectory(status.getPath())) {
Member:

Sounds redundant with isHiddenOrWithinHiddenParentDirectory?

Member Author:

isHiddenFileOrDirectory only looks at the "name", i.e. the last path part, and is used when doing shallow listings that don't recurse. When we're doing a recursive listing, we have to check for intermediate hidden folders between the current listing root and the final file name part, but not for shallow listings.

{
private final Path path;
private final int hashCode;
private final boolean recursiveFilesOnly;
Member:

Maybe includesRecursiveFiles or includesRecursiveListing?

Contributor:

I see now where the naming you chose comes from: io.trino.plugin.hive.s3.TrinoS3FileSystem.ListingMode#RECURSIVE_FILES_ONLY.

iterator = fs.listFiles(cacheKey.getPath(), true);
}
else {
iterator = fs.listLocatedStatus(cacheKey.getPath());
Member:

Why not always call fs.listFiles(cacheKey.getPath(), cacheKey.recursiveFilesOnly())?

Member Author:

fs.listFiles(Path, boolean recursive) only lists files, not directories (and does so either recursively, or only at the current level), while fs.listLocatedStatus(Path) returns both.
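To illustrate the difference with the Hadoop FileSystem API (usage sketch only):

// files only; recursive=true descends to any depth, recursive=false stays at this level
RemoteIterator<LocatedFileStatus> files = fs.listFiles(path, true);

// files AND directories, immediate children only
RemoteIterator<LocatedFileStatus> filesAndDirectories = fs.listLocatedStatus(path);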

@pettyjamesm (Member Author)

One thing we might want to think about is that there may be a lot of duplicates in this cache if a directory is asked for in both recursive and non-recursive listings

While that's true in theory, I don't think it will actually occur in practice, at least in terms of how the code works today. The only time a directory will be listed recursively is inside of the BackgroundHiveSplitLoader, when HiveConfig has hive.recursive-directories=true. Since this is config driven, for any table on any given cluster, the listings should always be either all shallow or all recursive.

@@ -27,4 +27,7 @@
{
RemoteIterator<LocatedFileStatus> list(FileSystem fs, Table table, Path path)
throws IOException;

RemoteIterator<LocatedFileStatus> listFilesRecursively(FileSystem fs, Table table, Path path)
Contributor:

As far as I understand, both list operations list ONLY files (and not sub-directories).

Please take into consideration changing the method name, e.g. listRecursively.

Member Author:

That’s incorrect; the list method will list all contents at the requested path (shallowly), including directories, which is why the HiveFileIterator previously handled recursing through child directories (or failing, or ignoring them, as per whatever nested directory policy was set). The new method listFilesRecursively will list only files, at any depth below the requested path.

Contributor:

I fail to see a scenario in which directories would be returned by the list method with the current NestedDirectoryPolicy values: IGNORED, RECURSE, FAIL.

It would probably make sense to put up a separate (preparatory) PR containing the newly added Hive test, to make sure that no regression is introduced by your latest changes. Just to be on the safe side.

Member Author:

The nested directory policy previously only affected how the HiveFileIterator handled encountered directories, but had no bearing on the results of the DirectoryLister. Now, when the mode is set to recurse, we call the appropriate DirectoryLister method which calls the corresponding FileSystem#listFiles(Path, boolean recursive=true) method to handle recursing and returning only files (not directories) for us. Effectively, the directory lister now has two listing modes:

  • shallow, files + directories
  • recursive, files only

The only change here is that we’re now letting the FileSystem handle “recursing” into subdirectories to enumerate the leaf “files” for us, which allows some implementations (like TrinoS3FileSystem, whose optimized listFiles override from a prior PR wasn’t previously being used by the DirectoryLister) to provide much more efficient behavior.

Specifically, most blob store APIs (like S3) provide a mechanism to directly list all keys starting with a given prefix, regardless of how many / characters are between the prefix and the end of the key, which greatly reduces the number of unnecessary list calls compared to the behavior before that would issue another batch of listings to traverse into each “sub-directory”.

For most other FileSystem implementations, the behavior should be effectively unchanged, because the FileSystem parent class implementation does essentially the same thing that HiveFileIterator was doing before this change: it builds a Deque of RemoteIterator instances and repeatedly calls FileSystem#listLocatedStatus(Path) to recurse into subdirectories.
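That default behavior amounts to something like the following (a simplified paraphrase of the Hadoop FileSystem base class, not verbatim):

Deque<RemoteIterator<LocatedFileStatus>> iterators = new ArrayDeque<>();
iterators.push(fs.listLocatedStatus(root));
while (!iterators.isEmpty()) {
    RemoteIterator<LocatedFileStatus> current = iterators.peek();
    if (!current.hasNext()) {
        iterators.pop();
        continue;
    }
    LocatedFileStatus status = current.next();
    if (status.isDirectory()) {
        // one more list call per subdirectory: exactly the cost TrinoS3FileSystem avoids
        iterators.push(fs.listLocatedStatus(status.getPath()));
    }
    else {
        results.add(status); // hypothetical consumer of the file entries
    }
}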

Contributor:

Got it. I was previously fixated on the HiveFileIterator, but I see now that the discussion is about the DirectoryLister and its concrete implementations.

@findinpath (Contributor)

This change is specifically targeted at the hive.recursive-directories hive connector setting.
@pettyjamesm do you have any numbers available to share on how this PR is improving the query times for a fairly large table with and without caching enabled?

@pettyjamesm do you see any timeout-related issues with the recursive listing of a table containing a huge number of files?

@pettyjamesm (Member Author) commented Jun 9, 2022

This change is specifically targeted at the hive.recursive-directories hive connector setting. @pettyjamesm do you have any numbers available to share on how this PR is improving the query times for a fairly large table with and without caching enabled?

This greatly depends on the level of nesting and how many files are at the “leaves” at each level, but anecdotally, if you have a directory layout like the one I mentioned and linked in the PR description for ELB logs (link), the difference can be enormous. On the order of queries taking 30+ minutes (traversing through layers and layers of listing calls and producing no splits for most of the query duration) to ~30 seconds (splits generated immediately and all files enumerated using orders of magnitude fewer list API calls).

@pettyjamesm do you see any timeout-related issues with the recursive listing of a table containing a huge number of files?

No timeouts, although the current behavior without this change does increase the risk of throttling because of the extremely high call volume. Queries just take much, much longer when recursive listing is performed against S3 because of inefficient use of the API.

As a worked example (a rough general formula follows the list), assuming the ELB log layout above (constant-prefix/{region}/{YYYY}/{mm}/{dd}/<file name>), if we have logs from:

  • 1 region, for 1 year: that's 379 listObjectsV2 calls to enumerate all files before, and <total file count> / 1000 listings afterward.
  • 2 regions, for 2 years: that's 1,513 listObjectsV2 calls before, and still <total file count> / 1000 after this change.
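As a rough rule (my framing, not from the PR): the old approach costs about one listObjectsV2 call per "directory" prefix in the tree (root + regions + years + months + days, plus pagination), while a single recursive prefix listing costs about ceil(total file count / 1000) calls, since listObjectsV2 returns at most 1,000 keys per page.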

@findinpath (Contributor)

As discussed on Slack, I would find it beneficial to add a new test class TestCachingDirectoryListerRecursiveFilesOnly, similar to TestCachingDirectoryLister, with the query runner having the property:

"hive.recursive-directories", "true"

in order to ensure the accuracy of the new implementation.
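A minimal skeleton of what such a test class could look like (the builder calls shown come from Trino's existing test utilities, but the exact setup here is an assumption, not the merged test):

public class TestCachingDirectoryListerRecursiveFilesOnly
        extends AbstractTestQueryFramework
{
    @Override
    protected QueryRunner createQueryRunner()
            throws Exception
    {
        // hypothetical setup: mirror TestCachingDirectoryLister, plus recursive listings
        return HiveQueryRunner.builder()
                .addHiveProperty("hive.recursive-directories", "true")
                .build();
    }
}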

@pettyjamesm (Member Author)

@findepi / @findinpath - can we now consider this ready to merge?

@findinpath (Contributor) left a comment

Code-wise this PR is great.

Functionality-wise, it handles an exotic/not-so-generic use case that, without proper documentation, will not be spotted by Trino users.

assertUpdate("CREATE TABLE recursive_directories (clicks bigint, day date, country varchar) WITH (format = 'ORC', partitioned_by = ARRAY['day', 'country'])");
assertUpdate("INSERT INTO recursive_directories VALUES (1000, DATE '2022-02-01', 'US'), (2000, DATE '2022-02-01', 'US'), (4000, DATE '2022-02-02', 'US'), (1500, DATE '2022-02-01', 'AT'), (2500, DATE '2022-02-02', 'AT')", 5);

// Replace the partitioned table with a new unpartitioned table at the same root location, leaving the data in place
Contributor:

Quite interesting.

Without hive.recursive-directories set to true, reading from such a table would not work at all.

@findepi, @alexjo2144 is this use case too exotic for trino-hive?

Member Author:

The partitioned table + drop/recreate-as-unpartitioned is just a convenience for this test. The FileHiveMetastore implementation doesn't allow duplicate tables with the same name, and table names must correspond to their file system location, so I'm using the partitioned table to insert new records into sub-paths, dropping it (without deleting the data), and then creating a new table at the same path location using the same name.

This isn't a typical usage pattern, but it does work to exercise the recursive listing behaviors in combination with caching.

assertUpdate("INSERT INTO recursive_directories VALUES (1000, DATE '2022-02-01', 'US'), (2000, DATE '2022-02-01', 'US'), (4000, DATE '2022-02-02', 'US'), (1500, DATE '2022-02-01', 'AT'), (2500, DATE '2022-02-02', 'AT')", 5);

// Replace the partitioned table with a new unpartitioned table at the same root location, leaving the data in place
Table partitionedTable = getTable(TPCH_SCHEMA, "recursive_directories")
Contributor:

Instead of doing the low-level table dropping and creation, could we create two tables with the same external_location:

  • for writing, use the partitioned configuration partitioned_by = ARRAY['day', 'country']
  • for reading, use the plain configuration without partitioning, on which we can use the recursive file listing super power

This seems to me (although it screams like an anti-pattern) the way in which such a scenario could be implemented in the real world.

Please take this note with a grain of salt.

Member Author:

I tried to do something along those lines, but I couldn't get it to work very easily with the way the tests are set up, so I just went with this approach for the purposes of this test. It's not intended to represent a realistic usage pattern outside of this test scenario.

Adds support for directly listing all recursive files in directory
listing and associated caches.
Removes a pre-emptive FileSystem#exists(Path) check before attempting
to list path contents when absent partition errors are configured to
be ignored. Instead, absent partitions are detected by a check
performed only when the partition listing actually fails.

For file systems like S3, checking the existence of a "directory"
already incurs an S3 listing call, which is relatively expensive in
terms of API rate limits and latency.
@pettyjamesm (Member Author)

@findepi - this should be ready for final approval / merge

@sopel39 sopel39 removed their request for review June 23, 2022 08:37
@arhimondr arhimondr merged commit 726fbd7 into trinodb:master Jun 23, 2022
@github-actions github-actions bot added this to the 388 milestone Jun 23, 2022
@pettyjamesm pettyjamesm deleted the recursive-s3-file-listing branch June 23, 2022 18:06
@findinpath (Contributor)

@arhimondr / @findepi this change is worthy of having release notes

On the order of queries taking 30+ minutes (traversing through layers and layers of listing calls and producing no splits for most of the query duration) to ~30 seconds (splits generated immediately and all files enumerated using orders of magnitude fewer list API calls).

@pettyjamesm (Member Author) commented Jun 24, 2022

@arhimondr / @findepi this change is worthy of having release notes

Proposed release notes:

Hive Connector:

  • improve efficiency of listing files and generating splits when recursive directory listings are enabled and tables are stored in S3

@colebow (Member) commented Jun 27, 2022

@arhimondr / @findepi this change is worthy of having release notes

Proposed release notes:

Hive Connector:

  • improve efficiency of listing files and generating splits when recursive directory listings are enabled and tables are stored in S3

In the future, could you please edit the original PR message to include release notes so it's easier for me to find? I still got here, but it took me a few minutes.

@pettyjamesm (Member Author)

In the future, could you please edit the original PR message to include release notes so it's easier for me to find? I still got here, but it took me a few minutes.

Sure thing, sorry about that.
