Skip to content

Commit

Permalink
[FLINK-2640] [yarn] integrate off-heap configuration
Browse files Browse the repository at this point in the history
This closes apache#1132
  • Loading branch information
mxm authored and Alexander Kolb committed Oct 8, 2015
1 parent 97acb20 commit 1faed06
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -546,12 +546,15 @@ public boolean accept(File dir, String name) {
});
Assert.assertNotNull("Unable to locate JobManager log", jobmanagerLog);
content = FileUtils.readFileToString(jobmanagerLog);
// expecting 512 mb, because TM was started with 1024, we cut off 50% (NOT THE DEFAULT VALUE).
Assert.assertTrue("Expected string 'Starting TM with command=$JAVA_HOME/bin/java -Xms424m -Xmx424m' not found in JobManager log: '"+jobmanagerLog+"'",
content.contains("Starting TM with command=$JAVA_HOME/bin/java -Xms424m -Xmx424m"));
Assert.assertTrue("Expected string ' (2/2) (attempt #0) to ' not found in JobManager log." +
// TM was started with 1024 but we cut off 50% (NOT THE DEFAULT VALUE) and then divide
// between heap and off-heap memory (see {@link ApplicationMasterActor}).
String expected = "Starting TM with command=$JAVA_HOME/bin/java -Xms359m -Xmx359m -XX:MaxDirectMemorySize=65m";
Assert.assertTrue("Expected string '" + expected + "' not found in JobManager log: '"+jobmanagerLog+"'",
content.contains(expected));
expected = " (2/2) (attempt #0) to ";
Assert.assertTrue("Expected string '" + expected + "' not found in JobManager log." +
"This string checks that the job has been started with a parallelism of 2. Log contents: '"+jobmanagerLog+"'",
content.contains(" (2/2) (attempt #0) to "));
content.contains(expected));

// make sure the detached app is really finished.
LOG.info("Checking again that app has finished");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,20 @@ import org.apache.flink.client.CliFrontend
import org.apache.flink.configuration.{GlobalConfiguration, Configuration, ConfigConstants}
import org.apache.flink.runtime.StreamingMode
import org.apache.flink.runtime.akka.AkkaUtils
import org.apache.flink.runtime.instance.AkkaActorGateway
import org.apache.flink.runtime.jobmanager.JobManager
import org.apache.flink.runtime.jobmanager.web.WebInfoServer
import org.apache.flink.runtime.util.{StandaloneUtils, LeaderRetrievalUtils, EnvironmentInformation}
import org.apache.flink.runtime.util.{StandaloneUtils, EnvironmentInformation}
import org.apache.flink.runtime.webmonitor.WebMonitor
import org.apache.flink.yarn.Messages.StartYarnSession
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
import org.apache.hadoop.yarn.conf.YarnConfiguration
import scala.collection.JavaConversions._


import scala.io.Source

object ApplicationMaster {
import scala.collection.JavaConversions._

val LOG = Logger(getClass)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,8 @@ trait ApplicationMasterActor extends FlinkActor {
Try {
log.info("Start yarn session.")
memoryPerTaskManager = env.get(FlinkYarnClient.ENV_TM_MEMORY).toInt
val heapLimit = Utils.calculateHeapSize(memoryPerTaskManager, flinkConfiguration)

val memoryLimit = Utils.calculateHeapSize(memoryPerTaskManager, flinkConfiguration)

val applicationMasterHost = env.get(Environment.NM_HOST.key)
require(applicationMasterHost != null, s"Application master (${Environment.NM_HOST} not set.")
Expand Down Expand Up @@ -500,7 +501,7 @@ trait ApplicationMasterActor extends FlinkActor {
val hs = ApplicationMaster.hasStreamingMode(env)
containerLaunchContext = Some(
createContainerLaunchContext(
heapLimit,
memoryLimit,
hasLogback,
hasLog4j,
yarnClientUsername,
Expand Down Expand Up @@ -550,7 +551,7 @@ trait ApplicationMasterActor extends FlinkActor {
}

private def createContainerLaunchContext(
heapLimit: Int,
memoryLimit: Int,
hasLogback: Boolean,
hasLog4j: Boolean,
yarnClientUsername: String,
Expand All @@ -561,9 +562,11 @@ trait ApplicationMasterActor extends FlinkActor {
log.info("Create container launch context.")
val ctx = Records.newRecord(classOf[ContainerLaunchContext])

val (heapLimit, offHeapLimit) = calculateMemoryLimits(memoryLimit, streamingMode)

val javaOpts = flinkConfiguration.getString(ConfigConstants.FLINK_JVM_OPTIONS, "")
val tmCommand = new StringBuilder(s"$$JAVA_HOME/bin/java -Xms${heapLimit}m " +
s"-Xmx${heapLimit}m $javaOpts")
s"-Xmx${heapLimit}m -XX:MaxDirectMemorySize=${offHeapLimit}m $javaOpts")

if (hasLogback || hasLog4j) {
tmCommand ++=
Expand Down Expand Up @@ -616,4 +619,48 @@ trait ApplicationMasterActor extends FlinkActor {

ctx
}

/**
 * Splits the memory available to a TaskManager JVM into a heap limit and an
 * off-heap (direct memory) limit.
 *
 * The off-heap share always contains the memory reserved for the network
 * buffers. When off-heap memory is enabled in the configuration and the cluster
 * is not running in streaming mode, it additionally contains either the
 * explicitly configured memory size or, if none is configured, the configured
 * fraction of `memoryLimit`. Whatever is not assigned to off-heap becomes the
 * heap limit.
 *
 * @param memoryLimit The maximum memory in megabytes.
 * @param streamingMode True if this is a streaming cluster.
 * @return A Tuple2 of (heap limit, off-heap limit), both in megabytes.
 */
private def calculateMemoryLimits(memoryLimit: Long, streamingMode: Boolean): (Long, Long) = {

  // Segment size: the newer key wins; the older network-buffer-size key (or the
  // built-in default) is only used as its fallback value.
  val segmentSize: Long = flinkConfiguration.getLong(
    ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
    flinkConfiguration.getLong(
      ConfigConstants.TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY,
      ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_SEGMENT_SIZE))

  val bufferCount: Long = flinkConfiguration.getLong(
    ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY,
    ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS)

  // Direct memory for Netty's off-heap buffers: total buffer size shifted right
  // by 20 bits (divided by 2^20, rounded down), plus one extra unit on top.
  val networkMemory: Long = ((bufferCount * segmentSize) >> 20) + 1

  val offHeapEnabled = flinkConfiguration.getBoolean(
    ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_KEY, false)

  // Off-heap memory requested in addition to the network buffers; zero unless
  // off-heap is switched on for a non-streaming cluster.
  val managedOffHeap: Long =
    if (streamingMode || !offHeapEnabled) {
      0L
    } else {
      val configuredSize = flinkConfiguration.getLong(
        ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L)
      if (configuredSize > 0) {
        // An explicitly configured size takes precedence over the fraction.
        configuredSize
      } else {
        val fraction = flinkConfiguration.getFloat(
          ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
          ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION)
        (fraction * memoryLimit).toLong
      }
    }

  (memoryLimit - managedOffHeap - networkMemory, managedOffHeap + networkMemory)
}
}

0 comments on commit 1faed06

Please sign in to comment.