Skip to content

Commit

Permalink
[FLINK-9028] [yarn] Perform parameters checking before Yarn starting …
Browse files Browse the repository at this point in the history
…cluster

This closes apache#5726.
  • Loading branch information
sihuazhou authored and sampath s committed Jul 26, 2018
1 parent 1c61816 commit f35c42c
Show file tree
Hide file tree
Showing 8 changed files with 103 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ResourceManagerOptions;
import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
import org.apache.flink.util.Preconditions;

import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -101,46 +102,62 @@ public String toString() {
// ------------------------------------------------------------------------
// Factory
// ------------------------------------------------------------------------

/**
* Computes the parameters to be used to start a TaskManager Java process.
* calcuate cutoff memory size used by container, it will throw an {@link IllegalArgumentException}
* if the config is invalid or return the cutoff value if valid.
*
* @param config The Flink configuration.
* @param containerMemoryMB The size of the complete container, in megabytes.
* @return The parameters to start the TaskManager processes with.
*
* @return cutoff memory size used by container.
*/
public static ContaineredTaskManagerParameters create(
Configuration config, long containerMemoryMB, int numSlots)
{
// (1) compute how much memory we subtract from the total memory, to get the Java memory
public static long calculateCutoffMB(Configuration config, long containerMemoryMB) {
Preconditions.checkArgument(containerMemoryMB > 0);

// (1) check cutoff ratio
final float memoryCutoffRatio = config.getFloat(
ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO);

final int minCutoff = config.getInteger(
ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN);

if (memoryCutoffRatio >= 1 || memoryCutoffRatio <= 0) {
throw new IllegalArgumentException("The configuration value '"
+ ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key() + "' must be between 0 and 1. Value given="
+ memoryCutoffRatio);
}

// (2) check min cutoff value
final int minCutoff = config.getInteger(
ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN);

if (minCutoff >= containerMemoryMB) {
throw new IllegalArgumentException("The configuration value '"
+ ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN.key() + "'='" + minCutoff
+ "' is larger than the total container memory " + containerMemoryMB);
}

// (3) check between heap and off-heap
long cutoff = (long) (containerMemoryMB * memoryCutoffRatio);
if (cutoff < minCutoff) {
cutoff = minCutoff;
}
return cutoff;
}

final long javaMemorySizeMB = containerMemoryMB - cutoff;
/**
* Computes the parameters to be used to start a TaskManager Java process.
*
* @param config The Flink configuration.
* @param containerMemoryMB The size of the complete container, in megabytes.
* @return The parameters to start the TaskManager processes with.
*/
public static ContaineredTaskManagerParameters create(
Configuration config, long containerMemoryMB, int numSlots)
{
// (1) try to compute how much memory used by container
final long cutoffMB = calculateCutoffMB(config, containerMemoryMB);

// (2) split the remaining Java memory between heap and off-heap
final long heapSizeMB = TaskManagerServices.calculateHeapSizeMB(javaMemorySizeMB, config);
final long heapSizeMB = TaskManagerServices.calculateHeapSizeMB(containerMemoryMB - cutoffMB, config);
// use the cut-off memory for off-heap (that was its intention)
final long offHeapSizeMB = containerMemoryMB - heapSizeMB;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time timeout)
@Override
public CompletableFuture<Collection<JobID>> listJobs(Time timeout) {
if (jobManagerRunners.isEmpty()) {
System.out.println("empty");
log.info("empty");
}
return CompletableFuture.completedFuture(
Collections.unmodifiableSet(new HashSet<>(jobManagerRunners.keySet())));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@

import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ResourceManagerOptions;
import org.apache.flink.util.TestLogger;
import org.junit.Test;

import static org.apache.flink.configuration.TaskManagerOptions.MEMORY_OFF_HEAP;
import static org.apache.flink.runtime.taskexecutor.TaskManagerServices.calculateNetworkBufferMemory;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

public class ContaineredTaskManagerParametersTest extends TestLogger {
private static final long CONTAINER_MEMORY = 8192L;
Expand Down Expand Up @@ -91,4 +93,34 @@ public void testTotalMemoryDoesNotExceedContainerMemoryOffHeap() {
assertTrue(params.taskManagerHeapSizeMB() +
params.taskManagerDirectMemoryLimitMB() <= CONTAINER_MEMORY);
}

/**
* Test to guard {@link ContaineredTaskManagerParameters#calculateCutoffMB(Configuration, long)}.
*/
@Test
public void testCalculateCutoffMB() throws Exception {

Configuration config = new Configuration();
long containerMemoryMB = 1000;

config.setFloat(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO, 0.1f);
config.setInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN, 128);

assertEquals(128,
ContaineredTaskManagerParameters.calculateCutoffMB(config, containerMemoryMB));

config.setFloat(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO, 0.2f);
assertEquals(200,
ContaineredTaskManagerParameters.calculateCutoffMB(config, containerMemoryMB));

config.setInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN, 1000);

try {
ContaineredTaskManagerParameters.calculateCutoffMB(config, containerMemoryMB);
} catch (IllegalArgumentException expected) {
// we expected it.
return;
}
fail();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -214,5 +214,4 @@ public void calculateHeapSizeMB() throws Exception {
config.setFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION, 0.1f); // 10%
assertEquals(810, TaskManagerServices.calculateHeapSizeMB(1000, config));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,11 @@
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.ShutdownHookUtil;
Expand Down Expand Up @@ -409,6 +411,28 @@ public void terminateCluster(ApplicationId applicationId) throws FlinkException
}
}

/**
* Method to validate cluster specification before deploy it, it will throw
* an {@link IllegalConfigurationException} if the {@link ClusterSpecification} is invalid.
*/
private void validateClusterSpecification(ClusterSpecification clusterSpecification) {
long taskManagerMemorySize = clusterSpecification.getTaskManagerMemoryMB();
long cutoff;
try {
// We do the validation by calling the calculation methods here
cutoff = ContaineredTaskManagerParameters.calculateCutoffMB(flinkConfiguration, taskManagerMemorySize);
} catch (IllegalArgumentException cutoffConfigurationInvalidEx) {
throw new IllegalConfigurationException("Configurations related to cutoff checked failed.", cutoffConfigurationInvalidEx);
}

try {
// We do the validation by calling the calculation methods here
TaskManagerServices.calculateHeapSizeMB(taskManagerMemorySize - cutoff, flinkConfiguration);
} catch (IllegalArgumentException heapSizeConfigurationInvalidEx) {
throw new IllegalConfigurationException("Configurations related to heap size checked failed.", heapSizeConfigurationInvalidEx);
}
}

/**
* This method will block until the ApplicationMaster/JobManager have been deployed on YARN.
*
Expand All @@ -423,6 +447,9 @@ protected ClusterClient<ApplicationId> deployInternal(
@Nullable JobGraph jobGraph,
boolean detached) throws Exception {

// ------------------ Check if configuration is valid --------------------
validateClusterSpecification(clusterSpecification);

if (UserGroupInformation.isSecurityEnabled()) {
// note: UGI::hasKerberosCredentials inaccurately reports false
// for logins based on a keytab (fixed in Hadoop 2.6.1, see HADOOP-10786),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
import org.apache.flink.runtime.security.SecurityConfiguration;
Expand Down Expand Up @@ -631,7 +632,7 @@ public int run(String[] args) throws CliArgsException, FlinkException {
if (detachedMode) {
LOG.info("The Flink YARN client has been started in detached mode. In order to stop " +
"Flink on YARN, use the following command or a YARN web interface to stop it:\n" +
"yarn application -kill " + applicationId.getOpt());
"yarn application -kill " + yarnApplicationId);
} else {
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,8 +264,8 @@ public void testCommandLineClusterSpecification() throws Exception {
configuration.setInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY, 7331);
configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);

final int jobManagerMemory = 42;
final int taskManagerMemory = 41;
final int jobManagerMemory = 1337;
final int taskManagerMemory = 7331;
final int slotsPerTaskManager = 30;
final String[] args = {"-yjm", String.valueOf(jobManagerMemory), "-ytm", String.valueOf(taskManagerMemory), "-ys", String.valueOf(slotsPerTaskManager)};
final FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.ResourceManagerOptions;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.util.TestLogger;
import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
Expand Down Expand Up @@ -91,8 +92,11 @@ public static void tearDownClass() {
@Test
public void testFailIfTaskSlotsHigherThanMaxVcores() throws ClusterDeploymentException {

final Configuration flinkConfiguration = new Configuration();
flinkConfiguration.setInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN, 0);

YarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor(
new Configuration(),
flinkConfiguration,
yarnConfiguration,
temporaryFolder.getRoot().getAbsolutePath(),
yarnClient,
Expand All @@ -101,8 +105,8 @@ public void testFailIfTaskSlotsHigherThanMaxVcores() throws ClusterDeploymentExc
clusterDescriptor.setLocalJarPath(new Path(flinkJar.getPath()));

ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder()
.setMasterMemoryMB(-1)
.setTaskManagerMemoryMB(-1)
.setMasterMemoryMB(1)
.setTaskManagerMemoryMB(1)
.setNumberTaskManagers(1)
.setSlotsPerTaskManager(Integer.MAX_VALUE)
.createClusterSpecification();
Expand All @@ -126,6 +130,7 @@ public void testConfigOverwrite() throws ClusterDeploymentException {
Configuration configuration = new Configuration();
// overwrite vcores in config
configuration.setInteger(YarnConfigOptions.VCORES, Integer.MAX_VALUE);
configuration.setInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN, 0);

YarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor(
configuration,
Expand All @@ -138,8 +143,8 @@ public void testConfigOverwrite() throws ClusterDeploymentException {

// configure slots
ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder()
.setMasterMemoryMB(-1)
.setTaskManagerMemoryMB(-1)
.setMasterMemoryMB(1)
.setTaskManagerMemoryMB(1)
.setNumberTaskManagers(1)
.setSlotsPerTaskManager(1)
.createClusterSpecification();
Expand Down

0 comments on commit f35c42c

Please sign in to comment.