Skip to content

Commit

Permalink
Repurpose --sdk-worker-parallelism to represent the number of sdk workers per job per TM
Browse files Browse the repository at this point in the history
  • Loading branch information
Micah Wylde committed Oct 22, 2018
1 parent f12e27b commit 3e520cf
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 44 deletions.
Expand Up @@ -90,9 +90,9 @@ String getFlinkMasterUrl() {
name = "--sdk-worker-parallelism",
usage = "Default parallelism for SDK worker processes (see portable pipeline options)"
)
String sdkWorkerParallelism = PortablePipelineOptions.SDK_WORKER_PARALLELISM_PIPELINE;
private Long sdkWorkerParallelism = 1L;

String getSdkWorkerParallelism() {
public Long getSdkWorkerParallelism() {
return this.sdkWorkerParallelism;
}
}
Expand Down
Expand Up @@ -17,10 +17,12 @@
*/
package org.apache.beam.runners.flink.translation.functions;

import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.beam.model.pipeline.v1.RunnerApi.StandardEnvironments;
import org.apache.beam.runners.core.construction.BeamUrns;
Expand All @@ -34,6 +36,7 @@
import org.apache.beam.runners.fnexecution.environment.LyftPythonEnvironmentFactory;
import org.apache.beam.runners.fnexecution.environment.ProcessEnvironmentFactory;
import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.sdk.options.PortablePipelineOptions;

/** Implementation of a {@link FlinkExecutableStageContext}. */
class FlinkDefaultExecutableStageContext implements FlinkExecutableStageContext, AutoCloseable {
Expand Down Expand Up @@ -71,40 +74,47 @@ public void close() throws Exception {
jobBundleFactory.close();
}

enum ReferenceCountingFactory implements Factory {
REFERENCE_COUNTING;
private static class JobFactoryState {
private final AtomicInteger counter = new AtomicInteger(0);
private final List<ReferenceCountingFlinkExecutableStageContextFactory> factories =
new ArrayList<>();
private final int maxFactories;

private static final ReferenceCountingFlinkExecutableStageContextFactory actualFactory =
ReferenceCountingFlinkExecutableStageContextFactory.create(
FlinkDefaultExecutableStageContext::create);
/**
 * Creates the factory state kept for a single job.
 *
 * @param maxFactories upper bound on the number of factories held for the job; must be at least 1
 * @throws IllegalArgumentException if {@code maxFactories} is less than 1
 */
private JobFactoryState(int maxFactories) {
  // Fail fast: getFactory() uses this value both as the size bound of the factory list and as a
  // modulus, so zero would surface later as a divide-by-zero and a negative value as an
  // out-of-bounds lookup on an empty list.
  if (maxFactories < 1) {
    throw new IllegalArgumentException(
        "maxFactories must be at least 1, but was " + maxFactories);
  }
  this.maxFactories = maxFactories;
}

@Override
public FlinkExecutableStageContext get(JobInfo jobInfo) {
return actualFactory.get(jobInfo);
/**
 * Returns the factory to use for the next request, creating factories lazily until
 * {@code maxFactories} of them exist and cycling through them in order afterwards.
 *
 * <p>Creation is gated on the number of factories actually held, and the index is computed with
 * {@link Math#floorMod(int, int)}. Once {@code counter} wraps past {@link Integer#MAX_VALUE} the
 * raw count becomes negative; a {@code count < maxFactories} test would then add a factory on
 * every call and {@code count % maxFactories} would yield a negative index.
 *
 * @return the factory selected for this call
 */
private synchronized FlinkExecutableStageContext.Factory getFactory() {
  int count = counter.getAndIncrement();

  // The first maxFactories calls each add exactly one factory, same as before the wrap-safe check.
  if (factories.size() < maxFactories) {
    factories.add(
        ReferenceCountingFlinkExecutableStageContextFactory.create(
            FlinkDefaultExecutableStageContext::create));
  }

  // floorMod keeps the index in [0, maxFactories) even when count is negative.
  return factories.get(Math.floorMod(count, maxFactories));
}
}

enum MultiInstanceFactory implements Factory {
MULTI_INSTANCE;

private static final int MAX_FACTORIES = 16;

private static final AtomicInteger counter = new AtomicInteger();

private static final List<ReferenceCountingFlinkExecutableStageContextFactory> factories =
Collections.synchronizedList(new ArrayList<>());
private static final ConcurrentMap<String, JobFactoryState> jobFactories =
new ConcurrentHashMap<>();

@Override
public FlinkExecutableStageContext get(JobInfo jobInfo) {
synchronized (factories) {
int count = counter.getAndIncrement();
if (count < MAX_FACTORIES) {
factories.add(ReferenceCountingFlinkExecutableStageContextFactory
.create(FlinkDefaultExecutableStageContext::create));
}

return factories.get(count % MAX_FACTORIES).get(jobInfo);
}
JobFactoryState state = jobFactories
.computeIfAbsent(jobInfo.jobId(), k -> {
PortablePipelineOptions portableOptions = PipelineOptionsTranslation
.fromProto(jobInfo.pipelineOptions())
.as(PortablePipelineOptions.class);

return new JobFactoryState(
MoreObjects.firstNonNull(portableOptions.getSdkWorkerParallelism(), 1L).intValue());
});

return state.getFactory().get(jobInfo);
}
}
}
Expand Up @@ -17,9 +17,11 @@
*/
package org.apache.beam.runners.flink.translation.functions;

import com.google.common.base.MoreObjects;
import java.io.Serializable;
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.translation.functions.FlinkDefaultExecutableStageContext.MultiInstanceFactory;
import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.sdk.options.PortablePipelineOptions;
Expand All @@ -38,13 +40,7 @@ interface Factory extends Serializable {
}

static Factory factory(FlinkPipelineOptions options) {
PortablePipelineOptions portableOptions = options.as(PortablePipelineOptions.class);
if (PortablePipelineOptions.SDK_WORKER_PARALLELISM_STAGE.equals(
portableOptions.getSdkWorkerParallelism())) {
return FlinkDefaultExecutableStageContext.MultiInstanceFactory.MULTI_INSTANCE;
} else {
return FlinkDefaultExecutableStageContext.ReferenceCountingFactory.REFERENCE_COUNTING;
}
return MultiInstanceFactory.MULTI_INSTANCE;
}

StageBundleFactory getStageBundleFactory(ExecutableStage executableStage);
Expand Down
Expand Up @@ -73,24 +73,17 @@ public interface PortablePipelineOptions extends PipelineOptions {

void setDefaultEnvironmentConfig(@Nullable String config);

String SDK_WORKER_PARALLELISM_PIPELINE = "pipeline";
String SDK_WORKER_PARALLELISM_STAGE = "stage";

@Description(
"SDK worker/harness process parallelism. Currently supported options are "
+ "<null> (let the runner decide) or '"
+ SDK_WORKER_PARALLELISM_PIPELINE
+ "' (single SDK harness process per pipeline and runner process) or '"
+ SDK_WORKER_PARALLELISM_STAGE
+ "' (separate SDK harness for every executable stage).")
"Sets the number of sdk worker processes that will run on each worker node.")
@Nullable
String getSdkWorkerParallelism();
Long getSdkWorkerParallelism();

void setSdkWorkerParallelism(@Nullable String parallelism);
void setSdkWorkerParallelism(@Nullable Long parallelism);

@Description("Duration in milliseconds for environment cache within a job. 0 means no caching.")
@Default.Integer(0)
int getEnvironmentCacheMillis();

void setEnvironmentCacheMillis(int environmentCacheMillis);
}

0 comments on commit 3e520cf

Please sign in to comment.