[FLINK-36540][Runtime] Add Support for Hadoop Caller Context when using Flink to operate hdfs. #26681
+154
−0
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
What is the purpose of the change
As described in FLINK-36540.
When we use Flink to delete or write or modify files on Hadoop filesystem, callerContext is a helpful feature if we want to trace who did the operation or count how many files an application can create on hadoop filesystem. UGI is not good enough to trace these operations because if we have a tenant who has a lot of jobs writing into HDFS, we cannot find out which job caused the breakdown of HDFS.
I created a new interface and class in flink-core module, so that it will not cause the leak in ThreadLocal value, and it won't influence the situation if we do not use hdfs.
What's more, with this new feature and history json files in history server, we can calculate how many read operations and write operations a Flink application did to hdfs, and find out if there is a pressure or bottleneck to operate on hdfs files.
Brief change log
Verifying this change
Please make sure both new and modified tests in this PR follow the conventions for tests defined in our code quality guide.
This change added tests and can be verified as follows:
(example:)
I rebuild this project, and test the new jar file in my cluster, it prints out the correct caller context as expected

Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: (yes / no)Documentation