Skip to content

Commit

Permalink
Use a fixed number of processes and round-robin tasks across them
Browse files Browse the repository at this point in the history
  • Loading branch information
Micah Wylde committed Oct 13, 2018
1 parent 13f6a14 commit b94758f
Showing 1 changed file with 18 additions and 5 deletions.
Expand Up @@ -18,6 +18,10 @@
package org.apache.beam.runners.flink.translation.functions;

import com.google.common.collect.ImmutableMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.beam.model.pipeline.v1.RunnerApi.StandardEnvironments;
import org.apache.beam.runners.core.construction.BeamUrns;
import org.apache.beam.runners.core.construction.Environments;
Expand Down Expand Up @@ -83,15 +87,24 @@ public FlinkExecutableStageContext get(JobInfo jobInfo) {
enum MultiInstanceFactory implements Factory {
MULTI_INSTANCE;

private static final ThreadLocal<ReferenceCountingFlinkExecutableStageContextFactory> threadFactories =
ThreadLocal.withInitial(() ->
ReferenceCountingFlinkExecutableStageContextFactory.create(
FlinkDefaultExecutableStageContext::create));
private static final int MAX_FACTORIES = 16;

private static final AtomicInteger counter = new AtomicInteger();

private static final List<ReferenceCountingFlinkExecutableStageContextFactory> factories =
Collections.synchronizedList(new ArrayList<>());

@Override
public FlinkExecutableStageContext get(JobInfo jobInfo) {
return threadFactories.get().get(jobInfo);
synchronized (factories) {
int count = counter.getAndIncrement();
if (count < MAX_FACTORIES) {
factories.add(ReferenceCountingFlinkExecutableStageContextFactory
.create(FlinkDefaultExecutableStageContext::create));
}

return factories.get(count % MAX_FACTORIES).get(jobInfo);
}
}
}
}

0 comments on commit b94758f

Please sign in to comment.