---
title: "Configuration"
nav-id: config
nav-parent_id: ops
nav-pos: 4
---
For single-node setups Flink is ready to go out of the box and you don't need to change the default configuration to get started.
The out-of-the-box configuration will use your default Java installation. You can set the environment variable `JAVA_HOME` or the configuration key `env.java.home` in `conf/flink-conf.yaml` if you want to override the Java runtime to use.
This page lists the most common options that are typically needed to set up a well performing (distributed) installation. In addition, a full list of all available configuration parameters is listed here.
All configuration is done in `conf/flink-conf.yaml`, which is expected to be a flat collection of YAML key value pairs with format `key: value`.
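For example, a minimal `conf/flink-conf.yaml` could look like the following sketch (the keys appear on this page; the values and the Java path are placeholders to adjust to your setup):

{% highlight yaml %}
# Address of the JobManager and number of slots offered by each TaskManager
jobmanager.rpc.address: localhost
taskmanager.numberOfTaskSlots: 4

# Optional: override the Java runtime to use (example path)
env.java.home: /usr/lib/jvm/java-8-openjdk
{% endhighlight %}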
The system and run scripts parse the config at startup time. Changes to the configuration file require restarting the Flink JobManager and TaskManagers.
The configuration files for the TaskManagers can be different; Flink does not assume uniform machines in the cluster.
* This will be replaced by the TOC
{:toc}
{% include generated/common_section.html %}
**Note:** These keys are deprecated; it is recommended to configure the Hadoop path with the environment variable `HADOOP_CONF_DIR` instead.
These parameters configure the default HDFS used by Flink. Setups that do not specify an HDFS configuration have to specify the full path to HDFS files (`hdfs://address:port/path/to/files`). Files will also be written with default HDFS parameters (block size, replication factor).
- `fs.hdfs.hadoopconf`: The absolute path to the Hadoop File System's (HDFS) configuration directory (OPTIONAL VALUE). Specifying this value allows programs to reference HDFS files using short URIs (`hdfs:///path/to/files`, without including the address and port of the NameNode in the file URI). Without this option, HDFS files can be accessed, but require fully qualified URIs like `hdfs://address:port/path/to/files`. This option also causes file writers to pick up the HDFS's default values for block sizes and replication factors. Flink will look for the "core-site.xml" and "hdfs-site.xml" files in the specified directory.
- `fs.hdfs.hdfsdefault`: The absolute path of Hadoop's own configuration file "hdfs-default.xml" (DEFAULT: null).
- `fs.hdfs.hdfssite`: The absolute path of Hadoop's own configuration file "hdfs-site.xml" (DEFAULT: null).
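As an illustration, a setup that points Flink at an existing Hadoop configuration directory could set (the directory path is an example):

{% highlight yaml %}
# Flink looks for core-site.xml and hdfs-site.xml in this directory,
# enabling short URIs such as hdfs:///path/to/files
fs.hdfs.hadoopconf: /etc/hadoop/conf
{% endhighlight %}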
{% include generated/core_configuration.html %}
{% include generated/job_manager_configuration.html %}
{% include generated/task_manager_configuration.html %}
For batch jobs (or if `taskmanager.memory.preallocate` is enabled) Flink allocates a fraction of 0.7 of the free memory (total memory configured via `taskmanager.heap.mb` minus memory used for network buffers) for its managed memory. Managed memory helps Flink to run the batch operators efficiently. It prevents `OutOfMemoryException`s because Flink knows how much memory it can use to execute operations. If Flink runs out of managed memory, it utilizes disk space. Using managed memory, some operations can be performed directly on the raw data without having to deserialize the data to convert it into Java objects. All in all, managed memory improves the robustness and speed of the system.

The default fraction for managed memory can be adjusted using the `taskmanager.memory.fraction` parameter. An absolute value may be set using `taskmanager.memory.size` (overrides the fraction parameter). If desired, the managed memory may be allocated outside the JVM heap. This may improve performance in setups with large memory sizes.
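A sketch of these settings in `conf/flink-conf.yaml` (the values are illustrative, not recommendations):

{% highlight yaml %}
taskmanager.heap.mb: 4096          # total TaskManager heap in MB
taskmanager.memory.fraction: 0.7   # fraction of free memory used as managed memory
# taskmanager.memory.size: 2048    # absolute managed memory in MB; overrides the fraction
taskmanager.memory.off-heap: true  # allocate managed memory outside the JVM heap
{% endhighlight %}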
{% include generated/task_manager_memory_configuration.html %}
{% include generated/cluster_configuration.html %}
{% include generated/akka_configuration.html %}
{% include generated/rest_configuration.html %}
{% include generated/blob_server_configuration.html %}
{% include generated/heartbeat_manager_configuration.html %}
{% include generated/security_configuration.html %}
{% include generated/network_environment_configuration.html %}
These parameters allow for advanced tuning. The default values are sufficient when running concurrent high-throughput jobs on a large cluster.
{% include generated/network_netty_configuration.html %}
{% include generated/web_configuration.html %}
{% include generated/file_system_configuration.html %}
{% include generated/optimizer_configuration.html %}
{% include generated/algorithm_configuration.html %}
The configuration keys in this section are independent of the used resource management framework (YARN, Mesos, Standalone, ...).
{% include generated/resource_manager_configuration.html %}
{% include generated/yarn_config_configuration.html %}
{% include generated/mesos_configuration.html %}
{% include generated/mesos_task_manager_configuration.html %}
{% include generated/high_availability_configuration.html %}
{% include generated/high_availability_zookeeper_configuration.html %}
{% include generated/zoo_keeper_configuration.html %}
{% include generated/kerberos_configuration.html %}
{% include generated/environment_configuration.html %}
{% include generated/checkpointing_configuration.html %}
{% include generated/rocks_db_configuration.html %}
Flink provides the following configurable RocksDB options, which are used to create a corresponding `ConfigurableOptionsFactory`. The created factory is used as the default `OptionsFactory` in `RocksDBStateBackend` unless the user defines an `OptionsFactory` and sets it via `RocksDBStateBackend.setOptions(optionsFactory)`.
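As an illustration, these options are set in `conf/flink-conf.yaml` like any other key; the authoritative key names and value formats are in the table below, and the two keys and values shown here are examples only:

{% highlight yaml %}
state.backend.rocksdb.thread.num: 4           # background flush/compaction threads (example value)
state.backend.rocksdb.writebuffer.size: 64mb  # per-column-family write buffer size (example value)
{% endhighlight %}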
{% include generated/rocks_db_configurable_configuration.html %}
{% include generated/queryable_state_configuration.html %}
{% include generated/metric_configuration.html %}
Certain RocksDB native metrics may be forwarded to Flink's metrics reporter. All native metrics are scoped to operators and then further broken down by column family; values are reported as unsigned longs.
{% include generated/rocks_db_native_metric_configuration.html %}
You have to configure `jobmanager.archive.fs.dir` in order to archive terminated jobs and add it to the list of monitored directories via `historyserver.archive.fs.dir` if you want to display them via the HistoryServer's web frontend.

- `jobmanager.archive.fs.dir`: Directory to upload information about terminated jobs to. You have to add this directory to the list of monitored directories of the history server via `historyserver.archive.fs.dir`.
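For example, pointing both keys at the same shared directory makes terminated jobs visible in the HistoryServer (the path is a placeholder):

{% highlight yaml %}
# Where the JobManager uploads archives of terminated jobs
jobmanager.archive.fs.dir: hdfs:///completed-jobs/
# Directory monitored by the HistoryServer for new archives
historyserver.archive.fs.dir: hdfs:///completed-jobs/
{% endhighlight %}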
{% include generated/history_server_configuration.html %}
- `mode`: Execution mode of Flink. Possible values are `legacy` and `new`. In order to start the legacy components, you have to specify `legacy` (DEFAULT: `new`).
If you ever see the exception `java.io.IOException: Insufficient number of network buffers`, you need to adapt the amount of memory used for network buffers in order for your program to run on your task managers.
Network buffers are a critical resource for the communication layers. They are used to buffer records before transmission over a network, and to buffer incoming data before dissecting it into records and handing them to the application. A sufficient number of network buffers is critical to achieve a good throughput.
In general, configure the task manager to have enough buffers that each logical network connection you expect to be open at the same time has a dedicated buffer. A logical network connection exists for each point-to-point exchange of data over the network, which typically happens at repartitioning or broadcasting steps (shuffle phase). In those, each parallel task inside the TaskManager has to be able to talk to all other parallel tasks.
Network buffers are allocated off-heap, i.e. outside of the JVM heap, irrespective of the value of `taskmanager.memory.off-heap`. This way, we can pass these buffers directly to the underlying network stack layers.
Previously, the number of network buffers was set manually, which became a quite error-prone task (see below). Since Flink 1.3, it is possible to define a fraction of memory that is being used for network buffers with the following configuration parameters:

- `taskmanager.network.memory.fraction`: Fraction of JVM memory to use for network buffers (DEFAULT: 0.1),
- `taskmanager.network.memory.min`: Minimum memory size for network buffers (DEFAULT: 64MB),
- `taskmanager.network.memory.max`: Maximum memory size for network buffers (DEFAULT: 1GB), and
- `taskmanager.memory.segment-size`: Size of memory buffers used by the memory manager and the network stack in bytes (DEFAULT: 32KB).
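A sketch of these keys with their default values (byte-valued sizes are assumed here; check the reference tables on this page for the exact value format):

{% highlight yaml %}
taskmanager.network.memory.fraction: 0.1    # 10% of JVM memory for network buffers
taskmanager.network.memory.min: 67108864    # 64 MB lower bound (bytes, assumed format)
taskmanager.network.memory.max: 1073741824  # 1 GB upper bound (bytes, assumed format)
taskmanager.memory.segment-size: 32768      # 32 KB per buffer
{% endhighlight %}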
The required number of buffers on a task manager is total-degree-of-parallelism (number of targets) * intra-node-parallelism (number of sources in one task manager) * n with n being a constant that defines how many repartitioning-/broadcasting steps you expect to be active at the same time. Since the intra-node-parallelism is typically the number of cores, and more than 4 repartitioning or broadcasting channels are rarely active in parallel, it frequently boils down to
{% highlight plain %}
#slots-per-TM^2 * #TMs * 4
{% endhighlight %}
where `#slots-per-TM` is the number of slots per TaskManager and `#TMs` is the total number of task managers.
To support, for example, a cluster of 20 8-slot machines, you should use roughly 5000 network buffers (8^2 * 20 * 4 = 5120) for optimal throughput. Each network buffer has by default a size of 32 KiBytes. In the example above, the system would thus allocate roughly 160 MiBytes (5120 * 32 KiB) for network buffers.
The number and size of network buffers can be configured with the following parameters:

- `taskmanager.network.numberOfBuffers`, and
- `taskmanager.memory.segment-size`.
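Applying the 20-machine, 8-slot example from above, a manual configuration could look like this sketch (prefer the fraction-based parameters where possible):

{% highlight yaml %}
# 8 slots-per-TM squared * 20 TMs * 4 = 5120 buffers of 32 KiB each (~160 MiB)
taskmanager.network.numberOfBuffers: 5120
taskmanager.memory.segment-size: 32768
{% endhighlight %}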
Although Flink aims to process as much data in main memory as possible, it is not uncommon that more data needs to be processed than memory is available. Flink's runtime is designed to write temporary data to disk to handle these situations.
The `io.tmp.dirs` parameter specifies a list of directories into which Flink writes temporary files. The paths of the directories need to be separated by ':' (colon character). Flink will concurrently write (or read) one temporary file to (from) each configured directory. This way, temporary I/O can be evenly distributed over multiple independent I/O devices such as hard disks to improve performance. To leverage fast I/O devices (e.g., SSD, RAID, NAS), it is possible to specify a directory multiple times.

If the `io.tmp.dirs` parameter is not explicitly specified, Flink writes temporary data to the temporary directory of the operating system, such as `/tmp` in Linux systems.
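For example, to distribute temporary I/O over two drives and give a fast SSD a double share (the paths are placeholders):

{% highlight yaml %}
# Colon-separated list; listing a directory twice doubles its share of temporary I/O
io.tmp.dirs: /mnt/ssd/tmp:/mnt/ssd/tmp:/mnt/disk1/tmp
{% endhighlight %}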
Flink executes a program in parallel by splitting it into subtasks and scheduling these subtasks to processing slots.
Each Flink TaskManager provides processing slots in the cluster. The number of slots is typically proportional to the number of available CPU cores of each TaskManager. As a general recommendation, the number of available CPU cores is a good default for `taskmanager.numberOfTaskSlots`.

When starting a Flink application, users can supply the default parallelism to use for that job; the corresponding command line option is `-p` (for parallelism). In addition, it is possible to set the parallelism in the programming APIs for the whole application and for individual operators.
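For instance, on TaskManagers with 8 CPU cores one might configure (a sketch following the recommendation above):

{% highlight yaml %}
# One slot per available CPU core is a good default
taskmanager.numberOfTaskSlots: 8
{% endhighlight %}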
{% top %}