New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[BEAM-5724] [STRM-1519] Create a fixed number of sdk_worker processes #10
Conversation
cc @tweise I wanted to send this out internally first for feedback on the overall approach before sending it to beam |
0587bc2
to
de79fcf
Compare
b94758f
to
f0fc51b
Compare
f276448
to
0ffd477
Compare
168a533
to
78cb9d5
Compare
0ffd477
to
c0fcb6f
Compare
@@ -90,9 +90,9 @@ String getFlinkMasterUrl() { | |||
name = "--sdk-worker-parallelism", | |||
usage = "Default parallelism for SDK worker processes (see portable pipeline options)" | |||
) | |||
String sdkWorkerParallelism = PortablePipelineOptions.SDK_WORKER_PARALLELISM_PIPELINE; | |||
private Long sdkWorkerParallelism = 1L; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't change the modifiers (that was a recent change in master that affects several fields).
@Description( | ||
"SDK worker/harness process parallelism. Currently supported options are " |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Description that explains the option and default is required.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added default information
@@ -67,19 +74,47 @@ public void close() throws Exception { | |||
jobBundleFactory.close(); | |||
} | |||
|
|||
enum ReferenceCountingFactory implements Factory { | |||
REFERENCE_COUNTING; | |||
private static class JobFactoryState { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This doesn't work across jobs because the don't share the same class loader. It shouldn't be necessary because the job factory isn't shared between jobs, but just within a job.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is the intention?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure what the best option is here. It's true that the map that holds these should only ever have a single instance (corresponding to one job). However that's not how the APIs for FlinkExecutableStageContext.Factory are set up. We have to implement a get(JobInfo)
method which could (in theory) get passed different JobInfos.
Since we get the configuration for the JobFactoryState (i.e., the parallelism) from the JobInfo, it's not clear how we'd handle that case without having support for multiple jobs. I think the only other alternative would be to throw an exception if it's called with a different JobInfo (or refactor the Factory interface).
My preference would be to leave the code as is along with a comment explaining this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SG. The jobInfo needs to be passed on because it is needed to construct the final context, not to identify the job.
@Nullable | ||
String getSdkWorkerParallelism(); | ||
Long getSdkWorkerParallelism(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should have a magic value to indicate auto scale.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added 0 as an auto-scale value which uses num_cores - 1
.
We should also added test coverage, see ReferenceCountingFlinkExecutableStageContextFactoryTest |
c0fcb6f
to
3e520cf
Compare
Updated for review comments + added a test. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good. There are some minor check violations such as missing license header that can be dealt with when opening a PR against Beam.
Changes the behavior of --sdk-worker-parallelism=stage to only create a fixed number of sdk workers (currently hard-coded to 16).
Follow this checklist to help us incorporate your contribution quickly and easily:
[BEAM-XXX] Fixes bug in ApproximateQuantiles
, where you replaceBEAM-XXX
with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.It will help us expedite review of your Pull Request if you tag someone (e.g.
@username
) to look at it.Post-Commit Tests Status (on master branch)