Skip to content

Commit

Permalink
[FLINK-9762][yarn] Use local default tmp directories on Yarn and Mesos
Browse files Browse the repository at this point in the history
This closes apache#6284.
  • Loading branch information
Oleksandr Nitavskyi authored and tillrohrmann committed Jul 18, 2018
1 parent e984168 commit a603258
Show file tree
Hide file tree
Showing 13 changed files with 135 additions and 58 deletions.
7 changes: 6 additions & 1 deletion docs/_includes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,14 @@
<td style="word-wrap: break-word;">"child-first"</td>
<td>Defines the class resolution strategy when loading classes from user code, meaning whether to first check the user code jar ("child-first") or the application classpath ("parent-first"). The default settings indicate to load classes first from the user code jar, which means that user code jars can include and load different dependencies than Flink uses (transitively).</td>
</tr>
<tr>
<td><h5>internal.io.tmp.dirs.use-local-default</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>key, which says if default value is used for `io.tmp.dirs` config variable.</td>
</tr>
<tr>
<td><h5>io.tmp.dirs</h5></td>
<td style="word-wrap: break-word;">System.getProperty("java.io.tmpdir")</td>
<td style="word-wrap: break-word;">'LOCAL_DIRS' on Yarn. '_FLINK_TMP_DIR' on Mesos. System.getProperty("java.io.tmpdir") in standalone.</td>
<td></td>
</tr>
<tr>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -729,6 +729,32 @@ public Map<String, String> toMap() {
}
}

/**
* Removes given config option from the configuration.
*
* @param configOption config option to remove
* @param <T> Type of the config option
* @return true is config has been removed, false otherwise
*/
public <T> boolean removeConfig(ConfigOption<T> configOption){
synchronized (this.confData){
// try the current key
Object oldValue = this.confData.remove(configOption.key());
if (oldValue == null){
for (String deprecatedKey : configOption.deprecatedKeys()){
oldValue = this.confData.remove(deprecatedKey);
if (oldValue != null){
LOG.warn("Config uses deprecated configuration key '{}' instead of proper key '{}'",
deprecatedKey, configOption.key());
return true;
}
}
return false;
}
return true;
}
}


// --------------------------------------------------------------------------------------------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,12 +181,19 @@ public static String[] getParentFirstLoaderPatterns(Configuration config) {
* The config parameter defining the directories for temporary files, separated by
* ",", "|", or the system's {@link java.io.File#pathSeparator}.
*/
@Documentation.OverrideDefault("System.getProperty(\"java.io.tmpdir\")")
@Documentation.OverrideDefault("'LOCAL_DIRS' on Yarn. '_FLINK_TMP_DIR' on Mesos. System.getProperty(\"java.io.tmpdir\") in standalone.")
public static final ConfigOption<String> TMP_DIRS =
key("io.tmp.dirs")
.defaultValue(System.getProperty("java.io.tmpdir"))
.withDeprecatedKeys("taskmanager.tmp.dirs");

/**
* String key, which says if default value is used for `io.tmp.dirs` config variable.
*/
public static final ConfigOption<Boolean> USE_LOCAL_DEFAULT_TMP_DIRS = key("internal.io.tmp.dirs.use-local-default")
.defaultValue(true)
.withDescription("key, which says if default value is used for `io.tmp.dirs` config variable.");

// ------------------------------------------------------------------------
// program
// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,11 @@ public Map<String, String> toMap() {
return prefixed;
}

@Override
public <T> boolean removeConfig(ConfigOption<T> configOption){
return backingConfig.removeConfig(configOption);
}

@Override
public boolean containsKey(String key) {
return backingConfig.containsKey(prefix + key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
Expand Down Expand Up @@ -303,4 +304,32 @@ public void testDeprecatedKeys() {
assertEquals(13, cfg.getInteger(matchesThird));
assertEquals(-1, cfg.getInteger(notContained));
}

@Test
public void testRemove(){
Configuration cfg = new Configuration();
cfg.setInteger("a", 1);
cfg.setInteger("b", 2);

ConfigOption<Integer> validOption = ConfigOptions
.key("a")
.defaultValue(-1);

ConfigOption<Integer> deprecatedOption = ConfigOptions
.key("c")
.defaultValue(-1)
.withDeprecatedKeys("d", "b");

ConfigOption<Integer> unexistedOption = ConfigOptions
.key("e")
.defaultValue(-1)
.withDeprecatedKeys("f", "g", "j");

assertEquals("Wrong expectation about size", cfg.keySet().size(), 2);
assertTrue("Expected 'validOption' is removed", cfg.removeConfig(validOption));
assertEquals("Wrong expectation about size", cfg.keySet().size(), 1);
assertTrue("Expected 'existedOption' is removed", cfg.removeConfig(deprecatedOption));
assertEquals("Wrong expectation about size", cfg.keySet().size(), 0);
assertFalse("Expected 'unexistedOption' is not removed", cfg.removeConfig(unexistedOption));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@
package org.apache.flink.mesos.entrypoint;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.mesos.configuration.MesosOptions;
import org.apache.flink.mesos.runtime.clusterframework.MesosConfigKeys;
import org.apache.flink.mesos.runtime.clusterframework.MesosTaskManagerParameters;
import org.apache.flink.mesos.util.MesosConfiguration;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.clusterframework.ContainerSpecification;
import org.apache.flink.runtime.clusterframework.overlays.CompositeContainerOverlay;
import org.apache.flink.runtime.clusterframework.overlays.FlinkDistributionOverlay;
Expand Down Expand Up @@ -173,15 +173,7 @@ public static Configuration loadConfiguration(Configuration dynamicProperties, L
final Map<String, String> envs = System.getenv();
final String tmpDirs = envs.get(MesosConfigKeys.ENV_FLINK_TMP_DIR);

// configure local directory
if (configuration.contains(CoreOptions.TMP_DIRS)) {
log.info("Overriding Mesos' temporary file directories with those " +
"specified in the Flink config: " + configuration.getValue(CoreOptions.TMP_DIRS));
}
else if (tmpDirs != null) {
log.info("Setting directories for temporary files to: {}", tmpDirs);
configuration.setString(CoreOptions.TMP_DIRS, tmpDirs);
}
BootstrapTools.updateTmpDirectoriesInConfiguration(configuration, tmpDirs);

return configuration;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,11 @@ public static Configuration generateTaskManagerConfiguration(
cfg.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numSlots);
}

return cfg;
if (baseConfig.getBoolean(CoreOptions.USE_LOCAL_DEFAULT_TMP_DIRS)){
cfg.removeConfig(CoreOptions.TMP_DIRS);
}

return cfg;
}

/**
Expand Down Expand Up @@ -467,4 +471,22 @@ public static String getStartCommand(String template,
}
return template;
}

/**
* Set temporary configuration directories if necessary
*
* @param configuration flink config to patch
* @param defaultDirs in case no tmp directories is set, next directories will be applied
*/
public static void updateTmpDirectoriesInConfiguration(Configuration configuration, String defaultDirs){
if (configuration.contains(CoreOptions.TMP_DIRS)) {
LOG.info("Overriding Fink's temporary file directories with those " +
"specified in the Flink config: {}", configuration.getValue(CoreOptions.TMP_DIRS));
configuration.setBoolean(CoreOptions.USE_LOCAL_DEFAULT_TMP_DIRS, false);
}
else {
LOG.info("Setting directories for temporary files to: {}", defaultDirs);
configuration.setString(CoreOptions.TMP_DIRS, defaultDirs);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -276,4 +276,23 @@ public void testGetTaskManagerShellCommand() {
true, true, true, this.getClass()));

}

@Test
public void testUpdateTmpDirectoriesInConfiguration(){
Configuration config = new Configuration();

// test that default value is taken
BootstrapTools.updateTmpDirectoriesInConfiguration(config, "default/directory/path");
assertEquals(config.getString(CoreOptions.TMP_DIRS), "default/directory/path");

// test that we ignore default value is value is set before
BootstrapTools.updateTmpDirectoriesInConfiguration(config, "not/default/directory/path");
assertEquals(config.getString(CoreOptions.TMP_DIRS), "default/directory/path");

//test that empty value is not a magic string
config.setString(CoreOptions.TMP_DIRS, "");
BootstrapTools.updateTmpDirectoriesInConfiguration(config, "some/new/path");
assertEquals(config.getString(CoreOptions.TMP_DIRS), "");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.JobManagerOptions;
Expand Down Expand Up @@ -523,16 +522,8 @@ private static Configuration createConfiguration(String baseDirectory, Map<Strin
ConfigConstants.YARN_TASK_MANAGER_ENV_PREFIX,
ResourceManagerOptions.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX);

// configure local directory
if (configuration.contains(CoreOptions.TMP_DIRS)) {
log.info("Overriding YARN's temporary file directories with those " +
"specified in the Flink config: " + configuration.getValue(CoreOptions.TMP_DIRS));
}
else {
final String localDirs = ENV.get(Environment.LOCAL_DIRS.key());
log.info("Setting directories for temporary files to: {}", localDirs);
configuration.setString(CoreOptions.TMP_DIRS, localDirs);
}
final String localDirs = ENV.get(Environment.LOCAL_DIRS.key());
BootstrapTools.updateTmpDirectoriesInConfiguration(configuration, localDirs);

return configuration;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
Expand Down Expand Up @@ -472,14 +473,19 @@ private ContainerLaunchContext createTaskExecutorLaunchContext(Resource resource
taskManagerParameters.taskManagerHeapSizeMB(),
taskManagerParameters.taskManagerDirectMemoryLimitMB());

log.debug("TaskManager configuration: {}", flinkConfig);
Configuration taskManagerConfig = flinkConfig.clone();
if (flinkConfig.getBoolean(CoreOptions.USE_LOCAL_DEFAULT_TMP_DIRS)){
taskManagerConfig.removeConfig(CoreOptions.TMP_DIRS);
}

log.debug("TaskManager configuration: {}", taskManagerConfig);

ContainerLaunchContext taskExecutorLaunchContext = Utils.createTaskExecutorContext(
flinkConfig,
taskManagerConfig,
yarnConfig,
env,
taskManagerParameters,
flinkConfig,
taskManagerConfig,
currDir,
YarnTaskExecutorRunner.class,
log);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.security.SecurityConfiguration;
import org.apache.flink.runtime.security.SecurityUtils;
Expand Down Expand Up @@ -100,15 +100,7 @@ private static void run(String[] args) {
final Configuration configuration = GlobalConfiguration.loadConfiguration(currDir);
FileSystem.initialize(configuration);

// configure local directory
if (configuration.contains(CoreOptions.TMP_DIRS)) {
LOG.info("Overriding YARN's temporary file directories with those " +
"specified in the Flink config: " + configuration.getValue(CoreOptions.TMP_DIRS));
}
else {
LOG.info("Setting directories for temporary files to: {}", localDirs);
configuration.setString(CoreOptions.TMP_DIRS, localDirs);
}
BootstrapTools.updateTmpDirectoriesInConfiguration(configuration, localDirs);

// tell akka to die in case of an error
configuration.setBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.security.SecurityConfiguration;
import org.apache.flink.runtime.security.SecurityUtils;
Expand Down Expand Up @@ -121,15 +121,7 @@ public static Runner create(
final String remoteKeytabPrincipal = envs.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
LOG.info("TM: remoteKeytabPrincipal obtained {}", remoteKeytabPrincipal);

// configure local directory
if (configuration.contains(CoreOptions.TMP_DIRS)) {
LOG.info("Overriding YARN's temporary file directories with those " +
"specified in the Flink config: " + configuration.getValue(CoreOptions.TMP_DIRS));
}
else {
LOG.info("Setting directories for temporary files to: {}", localDirs);
configuration.setString(CoreOptions.TMP_DIRS, localDirs);
}
BootstrapTools.updateTmpDirectoriesInConfiguration(configuration, localDirs);

// tell akka to die in case of an error
configuration.setBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.JobManagerOptions;
Expand Down Expand Up @@ -130,16 +129,8 @@ public static Configuration loadConfiguration(String workingDirectory, Map<Strin
configuration.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, remoteKeytabPrincipal);
}

// configure local directory
if (configuration.contains(CoreOptions.TMP_DIRS)) {
log.info("Overriding YARN's temporary file directories with those " +
"specified in the Flink config: " + configuration.getValue(CoreOptions.TMP_DIRS));
}
else {
final String localDirs = env.get(ApplicationConstants.Environment.LOCAL_DIRS.key());
log.info("Setting directories for temporary files to: {}", localDirs);
configuration.setString(CoreOptions.TMP_DIRS, localDirs);
}
final String localDirs = env.get(ApplicationConstants.Environment.LOCAL_DIRS.key());
BootstrapTools.updateTmpDirectoriesInConfiguration(configuration, localDirs);

return configuration;
}
Expand Down

0 comments on commit a603258

Please sign in to comment.