Configuration Options


General config properties for this connector.


The connector name.

  • Type: string
  • Importance: high

Class indicating the connector.

  • Type: string
  • Importance: high

Number of tasks the connector is allowed to start.

  • Type: int
  • Importance: high


The URIs specified in the connector config are grouped based on the number of tasks defined. If you have just one URI, one task is enough. If you want to improve performance and process URIs in parallel, adjust this number to your requirements.
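
The grouping described above can be pictured with a minimal sketch. It assumes the URIs are split into contiguous groups of near-equal size, one group per task, and that no more groups are created than there are URIs. The function name `group_uris` is hypothetical and is not part of the connector.

```python
from typing import List


def group_uris(uris: List[str], max_tasks: int) -> List[List[str]]:
    """Split URIs into at most max_tasks contiguous groups of near-equal size.

    Assumption: this mirrors an even, contiguous split; the connector's
    actual grouping may differ.
    """
    if max_tasks < 1:
        raise ValueError("max_tasks must be at least 1")
    num_groups = min(len(uris), max_tasks)
    if num_groups == 0:
        return []
    per_group, leftover = divmod(len(uris), num_groups)
    groups, start = [], 0
    for i in range(num_groups):
        # The first `leftover` groups take one extra URI each.
        size = per_group + (1 if i < leftover else 0)
        groups.append(uris[start:start + size])
        start += size
    return groups
```

With three URIs and two tasks, one task would handle two URIs and the other would handle one; extra tasks beyond the number of URIs would have nothing to do.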


Comma-separated URIs of the FS(s). They can point directly to a file in the FS, and they can also be dynamic, using expressions that modify the URIs at runtime. These expressions have the form ${XXX}, where XXX represents a pattern from the java.time.format.DateTimeFormatter Java class.

  • Type: string
  • Importance: high


If you want to ingest data from dynamic directories, that is, directories created every day, without adding new URIs or looking for files from a parent directory, you can include expressions in the URIs. For example, the URI file:///data/${yyyy} will be converted to file:///data/2020 (when executing the policy).

You can use as many expressions as you like in the URIs, for instance: file:///data/${yyyy}/${MM}/${dd}/${HH}${mm}
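
To illustrate how such expressions expand, here is a minimal sketch. The connector itself relies on java.time.format.DateTimeFormatter; this Python version only handles the five patterns used in the examples above, through a hypothetical mapping to `strftime` codes, and is not the connector's implementation.

```python
import re
from datetime import datetime

# Hypothetical subset of DateTimeFormatter patterns mapped to strftime codes.
_PATTERN_TO_STRFTIME = {
    "yyyy": "%Y",
    "MM": "%m",
    "dd": "%d",
    "HH": "%H",
    "mm": "%M",
}


def expand_uri(uri: str, now: datetime) -> str:
    """Replace every ${XXX} expression in the URI using the given timestamp."""

    def replace(match):
        pattern = match.group(1)
        if pattern not in _PATTERN_TO_STRFTIME:
            raise ValueError(f"pattern not supported in this sketch: {pattern}")
        return now.strftime(_PATTERN_TO_STRFTIME[pattern])

    return re.sub(r"\$\{([^}]+)\}", replace, uri)


if __name__ == "__main__":
    print(expand_uri("file:///data/${yyyy}/${MM}/${dd}", datetime.now()))
```

Because the timestamp is taken when the policy runs, the same URI resolves to a different directory on each day (or minute, depending on the patterns used).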


If you want to ingest data from S3, you can add credentials with: policy.fs.fs.s3a.access.key=<ACCESS_KEY> and policy.fs.fs.s3a.secret.key=<SECRET_KEY>. Also, in case you want to configure a custom credentials provider, you should use<CLASS> property.
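
The S3 properties above follow a prefix convention: the part after policy.fs. is the property seen by the file system. A minimal sketch of that idea follows. It assumes the prefix is simply stripped before the property reaches the FS; the function name `fs_properties` and the key `some.other.property` are hypothetical.

```python
FS_PREFIX = "policy.fs."


def fs_properties(connector_config: dict) -> dict:
    """Return the FS-level properties, with the policy.fs. prefix removed.

    Assumption: properties without the prefix are not passed to the FS.
    """
    return {
        key[len(FS_PREFIX):]: value
        for key, value in connector_config.items()
        if key.startswith(FS_PREFIX)
    }


if __name__ == "__main__":
    config = {
        "policy.fs.fs.s3a.access.key": "<ACCESS_KEY>",
        "policy.fs.fs.s3a.secret.key": "<SECRET_KEY>",
        "some.other.property": "ignored",  # hypothetical non-FS property
    }
    print(fs_properties(config))
```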


Topic to copy data to.

  • Type: string
  • Importance: high

Frequency in milliseconds to poll for new data. This config only applies when the policies have ended.

  • Type: int
  • Default: 10000
  • Importance: medium

Policy class to apply (must implement com.github.mmolimar.kafka.connect.fs.policy.Policy interface).

  • Type: string
  • Importance: high

Regular expression to filter files from the FS.

  • Type: string
  • Importance: high
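
As an illustration of regular expression filtering, the following sketch keeps only the files whose name matches a pattern. It rests on two assumptions: the expression is tested against the file name rather than the full path, and a partial match is enough. The connector uses Java regular expressions, whose syntax differs from Python's in some details. The function name `filter_files` is hypothetical.

```python
import posixpath
import re
from typing import Iterable, List


def filter_files(paths: Iterable[str], regexp: str) -> List[str]:
    """Keep the paths whose file name matches the regular expression."""
    pattern = re.compile(regexp)
    # Assumption: match against the base name, using search semantics.
    return [p for p in paths if pattern.search(posixpath.basename(p))]


if __name__ == "__main__":
    files = ["/data/a.txt", "/data/b.csv", "/data/sub/c.txt"]
    print(filter_files(files, r".*\.txt$"))
```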

Flag to enable recursive traversal of subdirectories when listing files.

  • Type: boolean
  • Default: false
  • Importance: medium

Number of files that should be handled at a time. Non-positive values disable batching.

  • Type: int
  • Default: 0
  • Importance: medium
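
The batching rule (a positive value limits how many items are handled at a time, a non-positive value disables batching) can be sketched as follows. The helper name `batches` is hypothetical; the same rule is described for the record batch size of the file readers.

```python
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def batches(items: Iterable[T], batch_size: int) -> Iterator[List[T]]:
    """Yield items in chunks of batch_size; non-positive sizes yield one chunk."""
    items = list(items)
    if batch_size <= 0:
        # Batching disabled: everything is handled in a single pass.
        if items:
            yield items
        return
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]


if __name__ == "__main__":
    for batch in batches(["f1", "f2", "f3"], 2):
        print(batch)
```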

Cleanup strategy to use when skipping files. It's possible to move these files to another folder, remove them or do nothing.

  • Type: enum (available values none, move and delete)
  • Default: none
  • Importance: medium

Target directory to move files to when using the move cleanup strategy. Mandatory only when this strategy is used.

  • Type: string
  • Importance: medium

Prefix to set to the filename in moved files.

  • Type: string
  • Default: (empty string)
  • Importance: low
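
For the move cleanup strategy, the target directory and the prefix combine into the destination of a moved file. A minimal sketch, assuming the prefix is prepended to the original file name inside the target directory; the function name `moved_path` is hypothetical.

```python
import posixpath


def moved_path(source_path: str, move_dir: str, prefix: str = "") -> str:
    """Build the destination path for a file handled by the move strategy."""
    file_name = posixpath.basename(source_path)
    # Assumption: the prefix goes in front of the file name, not the directory.
    return posixpath.join(move_dir, prefix + file_name)


if __name__ == "__main__":
    print(moved_path("/data/in/a.txt", "/data/done", "processed_"))
```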

This represents custom properties you can include based on the policy class specified.

  • Type: based on the policy.
  • Importance: based on the policy.

Custom properties to use for the FS.

  • Type: based on the FS.
  • Importance: based on the FS.

File reader class to read files from the FS (must implement com.github.mmolimar.kafka.connect.fs.file.reader.FileReader interface).

  • Type: string
  • Importance: high

Number of records to process at a time. Non-positive values disable batching.

  • Type: int
  • Default: 0
  • Importance: medium

This represents custom properties you can include based on the file reader class specified.

  • Type: based on the file reader.
  • Importance: based on the file reader.


Some policies have custom properties and others don't, so take into account the properties of the policy you configure.


This policy does not have any additional configuration.


In order to configure custom properties for this policy, the name you must use is sleepy.


Max sleep time (in ms) to wait before looking for files in the FS again. Once an execution has finished, the policy sleeps for this time before it is executed again.

  • Type: long
  • Importance: high

Sleep fraction to divide the sleep time to allow interrupting the policy faster.

  • Type: long
  • Default: 10
  • Importance: medium

Max executions allowed (negative to disable). After exceeding this number, the policy will end. An execution represents: listing files from the FS and its corresponding sleep time.

  • Type: long
  • Default: -1
  • Importance: medium
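
The interaction between the sleep time, the sleep fraction and the maximum number of executions can be sketched as a loop. This is an illustration under stated assumptions, not the policy's implementation: the sleep time is divided evenly by the fraction, an interruption is checked before each slice, and the policy ends once the execution count reaches the maximum. All names (`run_sleepy`, `interrupted`, `sleep`) are hypothetical.

```python
import time
from typing import Callable


def run_sleepy(
    list_files: Callable[[], None],
    sleep_ms: int,
    fraction: int = 10,
    max_execs: int = -1,
    interrupted: Callable[[], bool] = lambda: False,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Run list_files repeatedly, sleeping in short slices between runs.

    Returns the number of executions performed.
    """
    executions = 0
    # A negative max_execs disables the limit.
    while max_execs < 0 or executions < max_execs:
        list_files()
        executions += 1
        slice_seconds = (sleep_ms / fraction) / 1000.0
        for _ in range(fraction):
            # Short slices let the policy react to an interruption quickly.
            if interrupted():
                return executions
            sleep(slice_seconds)
    return executions
```

With a sleep time of 1000 ms and a fraction of 4, the sketch sleeps four times for 250 ms after each listing, so an interruption is noticed within roughly 250 ms instead of a full second.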


In order to configure custom properties for this policy, the name you must use is cron.


Cron expression to schedule the policy.

  • Type: string
  • Importance: high

End date to finish the policy, in ISO date-time format.

  • Type: date
  • Default: null
  • Importance: medium

HDFS file watcher

In order to configure custom properties for this policy, the name you must use is hdfs_file_watcher.


Time to wait (in milliseconds) until the records retrieved from the file watcher will be sent to the source task.

  • Type: long
  • Default: 5000
  • Importance: medium

Sleep time before retrying connections to HDFS when connection errors occur.

  • Type: long
  • Default: 20000
  • Importance: medium

S3 event notifications

In order to configure custom properties for this policy, the name you must use is s3_event_notifications.


SQS queue name to retrieve messages from.

  • Type: string
  • Importance: high

Time to wait (in milliseconds) until the records retrieved from the queue will be sent to the source task.

  • Type: long
  • Default: 5000
  • Importance: medium

Regular expression to filter events based on their types.

  • Type: string
  • Default: .*
  • Importance: medium

Whether messages from SQS should be removed after reading them.

  • Type: boolean
  • Default: true
  • Importance: medium

Maximum number of messages to retrieve at a time (must be between 1 and 10).

  • Type: int
  • Importance: medium

Duration (in seconds) that the received messages are hidden from subsequent retrieve requests.

  • Type: int
  • Importance: low
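
Two of the constraints above lend themselves to a short sketch: the range check on the maximum number of messages and the filtering of events by type. The sketch assumes the regular expression must match the whole event type; the function names are hypothetical and the event type strings are only sample values.

```python
import re
from typing import Iterable, List


def validate_max_messages(value: int) -> int:
    """Check that the maximum number of messages is between 1 and 10."""
    if not 1 <= value <= 10:
        raise ValueError(f"max messages must be between 1 and 10, got {value}")
    return value


def filter_events(event_types: Iterable[str], event_regex: str = ".*") -> List[str]:
    """Keep the event types accepted by the regular expression."""
    pattern = re.compile(event_regex)
    # Assumption: the whole event type has to match the expression.
    return [e for e in event_types if pattern.fullmatch(e)]


if __name__ == "__main__":
    events = ["ObjectCreated:Put", "ObjectRemoved:Delete"]
    print(filter_events(events, r"ObjectCreated:.*"))
```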

File readers

Some file readers have custom properties and others don't, so take into account the properties of the file reader you configure.


In order to configure custom properties for this reader, the name you must use is parquet.


Avro schema in JSON format to use when reading a file.

  • Type: string
  • Importance: medium

Avro schema in JSON format to use for projecting fields from records in a file.

  • Type: string
  • Importance: medium


In order to configure custom properties for this reader, the name you must use is avro.


Avro schema in JSON format to use when reading a file. If not specified, the reader will use the schema defined in the file.

  • Type: string
  • Importance: medium


In order to configure custom properties for this reader, the name you must use is orc.


Use zero-copy when reading an ORC file.

  • Type: boolean
  • Default: false
  • Importance: medium

Whether the reader will skip corrupt data. If disabled, an exception will be thrown when there is corrupted data in the file.

  • Type: boolean
  • Default: false
  • Importance: medium


In order to configure custom properties for this reader, the name you must use is sequence.


Custom field name for the output key to include in the Kafka message.

  • Type: string
  • Default: key
  • Importance: medium

Custom field name for the output value to include in the Kafka message.

  • Type: string
  • Default: value
  • Importance: medium

Custom buffer size to read data from the Sequence file.

  • Type: int
  • Default: 4096
  • Importance: low


In order to configure custom properties for this reader, the name you must use is cobol.


The content of the copybook. It is mandatory if property file_reader.cobol.copybook.path is not set.

  • Type: string
  • Default: null
  • Importance: high

Copybook file path in the file system to be used. It is mandatory if property file_reader.cobol.copybook.content is not set.

  • Type: string
  • Default: null
  • Importance: high
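
Since the copybook content and the copybook path are each mandatory only when the other is not set, a configuration check could look like the sketch below. The property names come from the descriptions above; the function name `copybook_source` and the precedence given to the content when both are set are assumptions.

```python
CONTENT_KEY = "file_reader.cobol.copybook.content"
PATH_KEY = "file_reader.cobol.copybook.path"


def copybook_source(config: dict) -> tuple:
    """Return ("content", value) or ("path", value) for the copybook to use."""
    if config.get(CONTENT_KEY):
        # Assumption: inline content wins when both properties are set.
        return ("content", config[CONTENT_KEY])
    if config.get(PATH_KEY):
        return ("path", config[PATH_KEY])
    raise ValueError(f"either {CONTENT_KEY} or {PATH_KEY} must be set")


if __name__ == "__main__":
    print(copybook_source({PATH_KEY: "/copybooks/sample.cpy"}))
```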

Whether the input data file encoding is EBCDIC; otherwise, it is ASCII.

  • Type: boolean
  • Default: true
  • Importance: medium

Whether line ending characters (LF / CRLF) will be used as the record separator.

  • Type: boolean
  • Default: false
  • Importance: medium

Code page to be used for EBCDIC to ASCII / Unicode conversions.

  • Type: string
  • Default: common
  • Importance: medium

Whether the input file has 4-byte record length headers.

  • Type: boolean
  • Default: false
  • Importance: medium

Format used for the floating-point numbers.

  • Type: enum (available values ibm, ibm_little_endian, ieee754, and ieee754_little_endian)
  • Default: ibm
  • Importance: medium

Specifies a policy to transform the input schema.

  • Type: enum (available values keep_original and collapse_root)
  • Default: keep_original
  • Importance: medium

The trim to apply for records with string data types.

  • Type: enum (available values both, left, right and none)
  • Default: both
  • Importance: medium

An offset to the start of the record in each binary data block.

  • Type: int
  • Default: 0
  • Importance: medium

An offset from the end of the record to the end of the binary data block.

  • Type: int
  • Default: 0
  • Importance: medium

A number of bytes to skip at the beginning of each file.

  • Type: int
  • Default: 0
  • Importance: medium

A number of bytes to skip at the end of each file.

  • Type: int
  • Default: 0
  • Importance: medium

Custom code page conversion class provided.

  • Type: string
  • Default: null
  • Importance: low

Charset for ASCII data.

  • Type: string
  • Default: (empty string)
  • Importance: low

Flag to consider UTF-16 strings as big-endian.

  • Type: boolean
  • Default: true
  • Importance: low

If true, the size of OCCURS DEPENDING ON data will depend on the number of elements.

  • Type: boolean
  • Default: false
  • Importance: low

Specifies the length of the record, disregarding the copybook record size. This implies that the file has a fixed record length.

  • Type: int
  • Default: null
  • Importance: low

The name for a field that contains the record length. If not set, the copybook record length will be used.

  • Type: string
  • Default: null
  • Importance: low

Whether the RDW is big-endian.

  • Type: boolean
  • Default: false
  • Importance: low

Whether the RDW counts itself as part of the record length.

  • Type: boolean
  • Default: false
  • Importance: low

Controls a mismatch between RDW and record length.

  • Type: int
  • Default: 0
  • Importance: low

Whether indexing the input file before processing is requested.

  • Type: boolean
  • Default: false
  • Importance: low

The number of records to include in each partition.

  • Type: int
  • Default: null
  • Importance: low

A partition size to target.

  • Type: int
  • Default: null
  • Importance: low

Default HDFS block size for the HDFS filesystem used.

  • Type: int
  • Default: null
  • Importance: low

If true, the parser will drop all FILLER fields, even GROUP FILLERS that have non-FILLER nested fields.

  • Type: boolean
  • Default: false
  • Importance: low

If true, the parser will drop all value FILLER fields.

  • Type: boolean
  • Default: true
  • Importance: low

A comma-separated list of group-type fields to combine and parse as primitive fields.

  • Type: string[]
  • Default: null
  • Importance: low

Specifies whether debugging fields need to be added and what they should contain.

  • Type: enum (available values hex, raw and none)
  • Default: none
  • Importance: low

Parser to be used to parse data field record headers.

  • Type: string
  • Default: null
  • Importance: low

Parser to be used to parse records.

  • Type: string
  • Default: null
  • Importance: low

Extra option to be passed to a custom record header parser.

  • Type: string
  • Default: null
  • Importance: low

A string provided for the raw record extractor.

  • Type: string
  • Default: (empty string)
  • Importance: low

A column name to add to each record containing the input file name.

  • Type: string
  • Default: (empty string)
  • Importance: low


There are no extra configuration options for this file reader.


To configure custom properties for this reader, the name you must use is delimited (even though it's for CSV).


Field delimiter.

  • Type: string
  • Default: ,
  • Importance: high

Whether the file contains a header.

  • Type: boolean
  • Default: false
  • Importance: high

A comma-separated list of ordered data types for each field in the file. Possible values: byte, short, int, long, float, double, boolean, bytes and string.

  • Type: string[]
  • Default: null
  • Importance: medium

Flag to enable/disable throwing errors when mapping data types based on the schema is not possible. If disabled, the returned value which could not be mapped will be null.

  • Type: boolean
  • Default: true
  • Importance: medium

If the schema supports nullable fields. If file_reader.delimited.settings.data_type_mapping_error config flag is disabled, the value set for this config will be ignored and set to true.

  • Type: boolean
  • Default: false
  • Importance: medium
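
The relationship between the mapping error flag and nullable fields can be sketched as follows: when mapping errors are not thrown, unmappable values become null, so the schema has to accept nulls regardless of the nullable setting. The function names and the `caster` parameter are hypothetical.

```python
from typing import Any, Callable, Optional


def effective_allow_nulls(data_type_mapping_error: bool, allow_nulls: bool) -> bool:
    """Nullable fields are forced on when mapping errors are disabled."""
    return allow_nulls if data_type_mapping_error else True


def map_value(
    raw: str,
    caster: Callable[[str], Any],
    data_type_mapping_error: bool = True,
) -> Optional[Any]:
    """Convert a raw value; raise or return None depending on the flag."""
    try:
        return caster(raw)
    except (TypeError, ValueError):
        if data_type_mapping_error:
            raise
        # Flag disabled: the value that could not be mapped becomes null.
        return None


if __name__ == "__main__":
    print(map_value("42", int))
    print(map_value("not-a-number", int, data_type_mapping_error=False))
```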

A comma-separated list of ordered field names to set when reading a file.

  • Type: string[]
  • Default: null
  • Importance: medium

Default value for null values.

  • Type: string
  • Default: null
  • Importance: medium

Default value for empty values (empty values within quotes).

  • Type: string
  • Default: null
  • Importance: medium

Line separator to be used.

  • Type: string
  • Default: \n
  • Importance: medium

Maximum number of columns allowed in a record.

  • Type: int
  • Default: 512
  • Importance: low

Maximum number of characters allowed in a column.

  • Type: int
  • Default: 4096
  • Importance: low

Number of rows to skip.

  • Type: long
  • Default: 0
  • Importance: low

If the reader should detect the line separator automatically.

  • Type: boolean
  • Default: false
  • Importance: low

If the reader should detect the delimiter automatically.

  • Type: boolean
  • Default: false
  • Importance: low

Flag to enable/disable skipping leading whitespaces from values.

  • Type: boolean
  • Default: true
  • Importance: low

Flag to enable/disable skipping trailing whitespaces from values.

  • Type: boolean
  • Default: true
  • Importance: low

Character that represents a line comment at the beginning of a line.

  • Type: char
  • Default: #
  • Importance: low

Flag to enable/disable processing escape sequences in unquoted values.

  • Type: boolean
  • Default: false
  • Importance: low

Character used for escaping values where the field delimiter is part of the value.

  • Type: char
  • Default: "
  • Importance: low

Character used for escaping quotes inside an already quoted value.

  • Type: char
  • Default: "
  • Importance: low

Encoding to use for reading a file. If not specified, the reader will use the default encoding.

  • Type: string
  • Default: based on the locale and charset of the underlying operating system.
  • Importance: medium

Compression type to use when reading a file.

  • Type: enum (available values bzip2, gzip and none)
  • Default: none
  • Importance: medium

Flag to specify if the decompression of the reader will finish at the end of the file or after the first compressed stream.

  • Type: boolean
  • Default: true
  • Importance: low
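
The last flag matters for files made of several compressed streams written one after another. The sketch below shows the difference for gzip, using only the Python standard library; it illustrates the concept and is not the reader's implementation. The function name `decompress_gzip` and its `concatenated` parameter are hypothetical.

```python
import gzip
import zlib


def decompress_gzip(data: bytes, concatenated: bool = True) -> bytes:
    """Decompress gzip data, either every stream or only the first one."""
    out = bytearray()
    while data:
        decompressor = zlib.decompressobj(wbits=31)  # 31 selects the gzip format
        out += decompressor.decompress(data)
        out += decompressor.flush()
        # Bytes after the end of this stream belong to the next stream, if any.
        data = decompressor.unused_data
        if not concatenated:
            break
    return bytes(out)


if __name__ == "__main__":
    two_streams = gzip.compress(b"first\n") + gzip.compress(b"second\n")
    print(decompress_gzip(two_streams, concatenated=True))
    print(decompress_gzip(two_streams, concatenated=False))
```

With the flag enabled, both streams are read to the end of the file; with it disabled, reading stops after the first stream.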


To configure custom properties for this reader, the name you must use is delimited (even though it's for TSV).


Whether the file contains a header.

  • Type: boolean
  • Default: false
  • Importance: high

A comma-separated list of ordered data types for each field in the file. Possible values: byte, short, int, long, float, double, boolean, bytes and string.

  • Type: string[]
  • Default: null
  • Importance: medium

Flag to enable/disable throwing errors when mapping data types based on the schema is not possible. If disabled, the returned value which could not be mapped will be null.

  • Type: boolean
  • Default: true
  • Importance: medium

If the schema supports nullable fields. If file_reader.delimited.settings.data_type_mapping_error config flag is disabled, the value set for this config will be ignored and set to true.

  • Type: boolean
  • Default: false
  • Importance: medium

A comma-separated list of ordered field names to set when reading a file.

  • Type: string[]
  • Default: null
  • Importance: medium

Default value for null values.

  • Type: string
  • Default: null
  • Importance: medium

Line separator to be used.

  • Type: string
  • Default: \n
  • Importance: medium

Maximum number of columns allowed in a record.

  • Type: int
  • Default: 512
  • Importance: low

Maximum number of characters allowed in a column.

  • Type: int
  • Default: 4096
  • Importance: low

Number of rows to skip.

  • Type: long
  • Default: 0
  • Importance: low

If the reader should detect the line separator automatically.

  • Type: boolean
  • Default: false
  • Importance: low

Identifies whether lines ending with the escape character and followed by a line separator character should be joined with the following line.

  • Type: boolean
  • Default: true
  • Importance: low

Flag to enable/disable skipping leading whitespaces from values.

  • Type: boolean
  • Default: true
  • Importance: low

Flag to enable/disable skipping trailing whitespaces from values.

  • Type: boolean
  • Default: true
  • Importance: low

Character that represents a line comment at the beginning of a line.

  • Type: char
  • Default: #
  • Importance: low

Character used for escaping special characters.

  • Type: char
  • Default: \
  • Importance: low

Character used to represent an escaped tab.

  • Type: char
  • Default: t
  • Importance: low

Encoding to use for reading a file. If not specified, the reader will use the default encoding.

  • Type: string
  • Default: based on the locale and charset of the underlying operating system.
  • Importance: medium

Compression type to use when reading a file.

  • Type: enum (available values bzip2, gzip and none)
  • Default: none
  • Importance: medium

Flag to specify if the decompression of the reader will finish at the end of the file or after the first compressed stream.

  • Type: boolean
  • Default: true
  • Importance: low


To configure custom properties for this reader, the name you must use is delimited (even though it's for FixedWidth).


A comma-separated ordered list of integers with the lengths of each field.

  • Type: int[]
  • Importance: high

Whether the file contains a header.

  • Type: boolean
  • Default: false
  • Importance: high

A comma-separated list of ordered data types for each field in the file. Possible values: byte, short, int, long, float, double, boolean, bytes and string.

  • Type: string[]
  • Default: null
  • Importance: medium

Flag to enable/disable throwing errors when mapping data types based on the schema is not possible. If disabled, the returned value which could not be mapped will be null.

  • Type: boolean
  • Default: true
  • Importance: medium

If the schema supports nullable fields. If file_reader.delimited.settings.data_type_mapping_error config flag is disabled, the value set for this config will be ignored and set to true.

  • Type: boolean
  • Default: false
  • Importance: medium

A comma-separated list of ordered field names to set when reading a file.

  • Type: string[]
  • Default: null
  • Importance: medium

If the padding character should be kept in each value.

  • Type: boolean
  • Default: false
  • Importance: medium

If headers have the default padding specified.

  • Type: boolean
  • Default: true
  • Importance: medium

Default value for null values.

  • Type: string
  • Default: null
  • Importance: medium

Whether each record ends on a new line.

  • Type: boolean
  • Default: true
  • Importance: medium

Line separator to be used.

  • Type: string
  • Default: \n
  • Importance: medium

The padding character used to represent unwritten spaces.

  • Type: char
  • Default: (a single space)
  • Importance: medium
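
To show how field lengths and padding work together, here is a minimal sketch that cuts a line into fields and optionally removes the padding character. Stripping the padding from both ends of each value is an assumption, and the function name `split_fixed_width` is hypothetical.

```python
from typing import List


def split_fixed_width(
    line: str,
    field_lengths: List[int],
    padding: str = " ",
    keep_padding: bool = False,
) -> List[str]:
    """Cut a line into consecutive fields of the given lengths."""
    values, start = [], 0
    for length in field_lengths:
        raw = line[start:start + length]
        # Assumption: padding is removed from both ends unless it is kept.
        values.append(raw if keep_padding else raw.strip(padding))
        start += length
    return values


if __name__ == "__main__":
    print(split_fixed_width("ab   12345x ", [5, 5, 2]))
    print(split_fixed_width("ab___12345x_", [5, 5, 2], padding="_"))
```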

Maximum number of columns allowed in a record.

  • Type: int
  • Default: 512
  • Importance: low

Maximum number of characters allowed in a column.

  • Type: int
  • Default: 4096
  • Importance: low

If the trailing characters beyond the record's length should be skipped.

  • Type: boolean
  • Default: false
  • Importance: low

Number of rows to skip.

  • Type: long
  • Default: 0
  • Importance: low

If the reader should detect the line separator automatically.

  • Type: boolean
  • Default: false
  • Importance: low

Flag to enable/disable skipping leading whitespaces from values.

  • Type: boolean
  • Default: true
  • Importance: low

Flag to enable/disable skipping trailing whitespaces from values.

  • Type: boolean
  • Default: true
  • Importance: low

Character that represents a line comment at the beginning of a line.

  • Type: char
  • Default: #
  • Importance: low

Encoding to use for reading a file. If not specified, the reader will use the default encoding.

  • Type: string
  • Default: based on the locale and charset of the underlying operating system.
  • Importance: medium

Compression type to use when reading a file.

  • Type: enum (available values bzip2, gzip and none)
  • Default: none
  • Importance: medium

Flag to specify if the decompression of the reader will finish at the end of the file or after the first compressed stream.

  • Type: boolean
  • Default: true
  • Importance: low


To configure custom properties for this reader, the name you must use is json.


If enabled, the reader will read each line as a record. Otherwise, the reader will read the full content of the file as a record.

  • Type: boolean
  • Default: true
  • Importance: medium
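
The difference between reading each line as a record and reading the whole file as one record can be sketched like this. Skipping blank lines is an assumption of the sketch, and the function name `read_json_records` is hypothetical.

```python
import json
from typing import Any, List


def read_json_records(text: str, record_per_line: bool = True) -> List[Any]:
    """Parse JSON content as one record per line or as a single record."""
    if record_per_line:
        # Assumption: blank lines are skipped rather than treated as errors.
        return [json.loads(line) for line in text.splitlines() if line.strip()]
    return [json.loads(text)]


if __name__ == "__main__":
    print(read_json_records('{"id": 1}\n{"id": 2}\n'))
    print(read_json_records('{\n  "id": 1\n}', record_per_line=False))
```

A pretty-printed document that spans several lines only parses correctly with the flag disabled, since no single line of it is valid JSON on its own.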

Deserialization feature to use when reading a JSON file. You can add as many as you like based on the ones defined here.

  • Type: boolean
  • Importance: medium

Encoding to use for reading a file. If not specified, the reader will use the default encoding.

  • Type: string
  • Default: based on the locale and charset of the underlying operating system.
  • Importance: medium

Compression type to use when reading a file.

  • Type: enum (available values bzip2, gzip and none)
  • Default: none
  • Importance: medium

Flag to specify if the decompression of the reader will finish at the end of the file or after the first compressed stream.

  • Type: boolean
  • Default: true
  • Importance: low


To configure custom properties for this reader, the name you must use is xml.


If enabled, the reader will read each line as a record. Otherwise, the reader will read the full content of the file as a record.

  • Type: boolean
  • Default: true
  • Importance: medium

Deserialization feature to use when reading an XML file. You can add as many as you like based on the ones defined here.

  • Type: boolean
  • Importance: medium

Encoding to use for reading a file. If not specified, the reader will use the default encoding.

  • Type: string
  • Default: based on the locale and charset of the underlying operating system.
  • Importance: medium

Compression type to use when reading a file.

  • Type: enum (available values bzip2, gzip and none)
  • Default: none
  • Importance: medium

Flag to specify if the decompression of the reader will finish at the end of the file or after the first compressed stream.

  • Type: boolean
  • Default: true
  • Importance: low


To configure custom properties for this reader, the name you must use is yaml.


Deserialization feature to use when reading a YAML file. You can add as many as you like based on the ones defined here.

  • Type: boolean
  • Importance: medium

Encoding to use for reading a file. If not specified, the reader will use the default encoding.

  • Type: string
  • Default: based on the locale and charset of the underlying operating system.
  • Importance: medium

Compression type to use when reading a file.

  • Type: enum (available values bzip2, gzip and none)
  • Default: none
  • Importance: medium

Flag to specify if the decompression of the reader will finish at the end of the file or after the first compressed stream.

  • Type: boolean
  • Default: true
  • Importance: low


To configure custom properties for this reader, the name you must use is text.


If enabled, the reader will read each line as a record. Otherwise, the reader will read the full content of the file as a record.

  • Type: boolean
  • Default: true
  • Importance: medium

Custom field name for the output value to include in the Kafka message.

  • Type: string
  • Default: value
  • Importance: medium

Encoding to use for reading a file. If not specified, the reader will use the default encoding.

  • Type: string
  • Default: based on the locale and charset of the underlying operating system.
  • Importance: medium

Compression type to use when reading a file.

  • Type: enum (available values bzip2, gzip and none)
  • Default: none
  • Importance: medium

Flag to specify if the decompression of the reader will finish at the end of the file or after the first compressed stream.

  • Type: boolean
  • Default: true
  • Importance: low


To configure custom properties for this reader, the name you must use is agnostic.


A comma-separated string list with the accepted extensions for Parquet files.

  • Type: string[]
  • Default: parquet
  • Importance: medium

A comma-separated string list with the accepted extensions for Avro files.

  • Type: string[]
  • Default: avro
  • Importance: medium

A comma-separated string list with the accepted extensions for ORC files.

  • Type: string[]
  • Default: orc
  • Importance: medium

A comma-separated string list with the accepted extensions for Sequence files.

  • Type: string[]
  • Default: seq
  • Importance: medium

A comma-separated string list with the accepted extensions for Cobol files.

  • Type: string[]
  • Default: dat
  • Importance: medium

A comma-separated string list with the accepted extensions for binary files.

  • Type: string[]
  • Default: bin
  • Importance: medium

A comma-separated string list with the accepted extensions for CSV files.

  • Type: string[]
  • Default: csv
  • Importance: medium

A comma-separated string list with the accepted extensions for TSV files.

  • Type: string[]
  • Default: tsv
  • Importance: medium

A comma-separated string list with the accepted extensions for fixed-width files.

  • Type: string[]
  • Default: fixed
  • Importance: medium

A comma-separated string list with the accepted extensions for JSON files.

  • Type: string[]
  • Default: json
  • Importance: medium

A comma-separated string list with the accepted extensions for XML files.

  • Type: string[]
  • Default: xml
  • Importance: medium

A comma-separated string list with the accepted extensions for YAML files.

  • Type: string[]
  • Default: yaml
  • Importance: medium


The Agnostic reader uses the previous readers as inner readers. So, when using this reader, you will probably also need to include the properties of those readers in the connector configuration.
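
The extension lists above suggest a simple dispatch by file extension. The sketch below uses the default extensions from this section; the reader labels, the case-insensitive comparison and the fallback to a text reader for unknown extensions are assumptions, and the function name `pick_reader` is hypothetical.

```python
from typing import Dict, List, Optional

# Default accepted extensions, as listed above. The keys are labels for this sketch.
DEFAULT_EXTENSIONS: Dict[str, List[str]] = {
    "parquet": ["parquet"],
    "avro": ["avro"],
    "orc": ["orc"],
    "sequence": ["seq"],
    "cobol": ["dat"],
    "binary": ["bin"],
    "csv": ["csv"],
    "tsv": ["tsv"],
    "fixed_width": ["fixed"],
    "json": ["json"],
    "xml": ["xml"],
    "yaml": ["yaml"],
}


def pick_reader(
    file_name: str,
    extensions: Optional[Dict[str, List[str]]] = None,
    fallback: str = "text",
) -> str:
    """Choose an inner reader label from the file extension."""
    mapping = {**DEFAULT_EXTENSIONS, **(extensions or {})}
    # Assumption: extensions are compared case-insensitively.
    ext = file_name.rsplit(".", 1)[-1].lower() if "." in file_name else ""
    for reader, accepted in mapping.items():
        if ext in accepted:
            return reader
    # Assumption: unknown extensions fall back to a plain text reader.
    return fallback


if __name__ == "__main__":
    print(pick_reader("events.seq"))
    print(pick_reader("conf.yml", {"yaml": ["yaml", "yml"]}))
```

Overriding a list, as in the second call, is how additional extensions such as yml would be routed to an existing reader.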