Skip to content

Commit

Permalink
[FLINK-9821] Forward dynamic properties to configuration in TaskManag…
Browse files Browse the repository at this point in the history
…erRunner

With this commit we can use dynamic properties to overwrite configuration values in the
TaskManagerRunner.

This closes apache#6318.
  • Loading branch information
tillrohrmann authored and sampath s committed Jul 26, 2018
1 parent 4278082 commit 3d6b84e
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 10 deletions.
10 changes: 6 additions & 4 deletions flink-dist/src/main/flink-bin/bin/taskmanager.sh
Expand Up @@ -22,6 +22,8 @@ USAGE="Usage: taskmanager.sh (start|start-foreground|stop|stop-all)"

STARTSTOP=$1

ARGS=("${@:2}")

if [[ $STARTSTOP != "start" ]] && [[ $STARTSTOP != "start-foreground" ]] && [[ $STARTSTOP != "stop" ]] && [[ $STARTSTOP != "stop-all" ]]; then
echo $USAGE
exit 1
Expand Down Expand Up @@ -72,15 +74,15 @@ if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then
export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_TM}"

# Startup parameters
args=("--configDir" "${FLINK_CONF_DIR}")
ARGS+=("--configDir" "${FLINK_CONF_DIR}")
fi

if [[ $STARTSTOP == "start-foreground" ]]; then
exec "${FLINK_BIN_DIR}"/flink-console.sh $TYPE "${args[@]}"
exec "${FLINK_BIN_DIR}"/flink-console.sh $TYPE "${ARGS[@]}"
else
if [[ $FLINK_TM_COMPUTE_NUMA == "false" ]]; then
# Start a single TaskManager
"${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $TYPE "${args[@]}"
"${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $TYPE "${ARGS[@]}"
else
# Example output from `numactl --show` on an AWS c4.8xlarge:
# policy: default
Expand All @@ -92,7 +94,7 @@ else
read -ra NODE_LIST <<< $(numactl --show | grep "^nodebind: ")
for NODE_ID in "${NODE_LIST[@]:1}"; do
# Start a TaskManager for each NUMA node
numactl --membind=$NODE_ID --cpunodebind=$NODE_ID -- "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $TYPE "${args[@]}"
numactl --membind=$NODE_ID --cpunodebind=$NODE_ID -- "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $TYPE "${ARGS[@]}"
done
fi
fi
Expand Up @@ -19,16 +19,20 @@
package org.apache.flink.runtime.taskexecutor;

import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.blob.BlobCacheService;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.entrypoint.ClusterConfiguration;
import org.apache.flink.runtime.entrypoint.ClusterConfigurationParserFactory;
import org.apache.flink.runtime.entrypoint.FlinkParseException;
import org.apache.flink.runtime.entrypoint.parser.CommandLineParser;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
Expand Down Expand Up @@ -274,11 +278,7 @@ public static void main(String[] args) throws Exception {
LOG.info("Cannot determine the maximum number of open file descriptors");
}

ParameterTool parameterTool = ParameterTool.fromArgs(args);

final String configDir = parameterTool.get("configDir");

final Configuration configuration = GlobalConfiguration.loadConfiguration(configDir);
final Configuration configuration = loadConfiguration(args);

try {
FileSystem.initialize(configuration);
Expand All @@ -303,6 +303,23 @@ public Void call() throws Exception {
}
}

private static Configuration loadConfiguration(String[] args) throws FlinkParseException {
final CommandLineParser<ClusterConfiguration> commandLineParser = new CommandLineParser<>(new ClusterConfigurationParserFactory());

final ClusterConfiguration clusterConfiguration;

try {
clusterConfiguration = commandLineParser.parse(args);
} catch (FlinkParseException e) {
LOG.error("Could not parse the command line options.", e);
commandLineParser.printHelp();
throw e;
}

final Configuration dynamicProperties = ConfigurationUtils.createConfiguration(clusterConfiguration.getDynamicProperties());
return GlobalConfiguration.loadConfiguration(clusterConfiguration.getConfigDir(), dynamicProperties);
}

public static void runTaskManager(Configuration configuration, ResourceID resourceId) throws Exception {
final TaskManagerRunner taskManagerRunner = new TaskManagerRunner(configuration, resourceId);

Expand Down

0 comments on commit 3d6b84e

Please sign in to comment.