Skip to content

Commit

Permalink
Merge pull request #71 from mmolimar/develop
Browse files Browse the repository at this point in the history
Release version 1.1.0
  • Loading branch information
mmolimar committed Jul 6, 2020
2 parents 588d310 + 25890da commit 7ec9293
Show file tree
Hide file tree
Showing 42 changed files with 1,598 additions and 313 deletions.
2 changes: 2 additions & 0 deletions config/kafka-connect-fs.properties
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,6 @@ topic=mytopic
policy.class=com.github.mmolimar.kafka.connect.fs.policy.SimplePolicy
policy.recursive=true
policy.regexp=^.*\.txt$
policy.batch_size=0
file_reader.class=com.github.mmolimar.kafka.connect.fs.file.reader.TextFileReader
file_reader.batch_size=0
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ services:
SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'

connect-fs:
image: mmolimar/kafka-connect-fs:1.0.0
image: mmolimar/kafka-connect-fs:1.1.0
container_name: connect
depends_on:
- cp-kafka
Expand Down
4 changes: 2 additions & 2 deletions docs/source/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@
# built documents.
#
# The short X.Y version.
version = '1.0'
version = '1.1'
# The full version, including alpha/beta/rc tags.
release = '1.0'
release = '1.1'

# The language for content autogenerated by Sphinx. Refer to documentation
# for a list of supported languages.
Expand Down
51 changes: 47 additions & 4 deletions docs/source/config_options.rst
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,13 @@ General config properties for this connector.
* Default: ``false``
* Importance: medium

``policy.batch_size``
Number of files that should be handled at a time. Non-positive values disable batching.

* Type: int
* Default: ``0``
* Importance: medium

``policy.<policy_name>.<policy_property>``
This represents custom properties you can include based on the policy class specified.

Expand All @@ -110,6 +117,13 @@ General config properties for this connector.
* Type: string
* Importance: high

``file_reader.batch_size``
Number of records to process at a time. Non-positive values disable batching.

* Type: int
* Default: ``0``
* Importance: medium

``file_reader.<file_reader_name>.<file_reader_property>``
This represents custom properties you can include based on the file reader class specified.

Expand Down Expand Up @@ -243,7 +257,29 @@ In order to configure custom properties for this reader, the name you must use i
* Type: string
* Importance: medium

.. _config_options-filereaders-sequencefile:
.. _config_options-filereaders-orc:

ORC
--------------------------------------------

In order to configure custom properties for this reader, the name you must use is ``orc``.

``file_reader.orc.use_zerocopy``
Use zero-copy when reading a ORC file.

* Type: boolean
* Default: ``false``
* Importance: medium

``file_reader.orc.skip_corrupt_records``
If reader will skip corrupt data or not. If disabled, an exception will be thrown when there is
corrupted data in the file.

* Type: boolean
* Default: ``false``
* Importance: medium

.. _config_options-filereaders-json:

SequenceFile
--------------------------------------------
Expand Down Expand Up @@ -817,7 +853,7 @@ Text

To configure custom properties for this reader, the name you must use is ``text``.

``file_reader.json.record_per_line``
``file_reader.text.record_per_line``
If enabled, the reader will read each line as a record. Otherwise, the reader will read the full
content of the file as a record.

Expand All @@ -839,14 +875,14 @@ To configure custom properties for this reader, the name you must use is ``text`
* Default: based on the locale and charset of the underlying operating system.
* Importance: medium

``file_reader.json.compression.type``
``file_reader.text.compression.type``
Compression type to use when reading a file.

* Type: enum (available values ``bzip2``, ``gzip`` and ``none``)
* Default: ``none``
* Importance: medium

``file_reader.json.compression.concatenated``
``file_reader.text.compression.concatenated``
Flag to specify if the decompression of the reader will finish at the end of the file or after
the first compressed stream.

Expand Down Expand Up @@ -875,6 +911,13 @@ To configure custom properties for this reader, the name you must use is ``agnos
* Default: ``avro``
* Importance: medium

``file_reader.agnostic.extensions.orc``
A comma-separated string list with the accepted extensions for ORC files.

* Type: string
* Default: ``orc``
* Importance: medium

``file_reader.agnostic.extensions.sequence``
A comma-separated string list with the accepted extensions for Sequence files.

Expand Down
6 changes: 5 additions & 1 deletion docs/source/connector.rst
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ Among others, these are some file systems it supports:
* S3.
* Google Cloud Storage.
* Azure Blob Storage & Azure Data Lake Store.
* FTP.
* FTP & SFTP.
* WebHDFS.
* Local File System.
* Hadoop Archive File System.
Expand Down Expand Up @@ -52,7 +52,9 @@ The ``kafka-connect-fs.properties`` file defines the following properties as req
policy.class=<Policy class>
policy.recursive=true
policy.regexp=.*
policy.batch_size=0
file_reader.class=<File reader class>
file_reader.batch_size=0

#. The connector name.
#. Class indicating the connector.
Expand All @@ -65,8 +67,10 @@ The ``kafka-connect-fs.properties`` file defines the following properties as req
``com.github.mmolimar.kafka.connect.fs.policy.Policy`` interface).
#. Flag to activate traversed recursion in subdirectories when listing files.
#. Regular expression to filter files from the FS.
#. Number of files that should be handled at a time. Non-positive values disable batching.
#. File reader class to read files from the FS
(must implement ``com.github.mmolimar.kafka.connect.fs.file.reader.FileReader`` interface).
#. Number of records to process at a time. Non-positive values disable batching.

A more detailed information about these properties can be found :ref:`here<config_options-general>`.

Expand Down
3 changes: 3 additions & 0 deletions docs/source/faq.rst
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ Obviously, this depends of the files in the FS(s) but having several URIs in
the connector might be a good idea to adjust the number of tasks
to process those URIs in parallel ( ``tasks.max`` connector property).

Also, using the properties ``policy.batch_size`` and/or ``file_reader.batch_size``
in case you have tons of files or files too large might help.

**I removed a file from the FS but the connector is still sending messages
with the contents of that file.**

Expand Down
19 changes: 18 additions & 1 deletion docs/source/filereaders.rst
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,22 @@ way as the Avro file reader does.

More information about properties of this file reader :ref:`here<config_options-filereaders-parquet>`.

ORC
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

`ORC files <https://orc.apache.org>`__ are a self-describing type-aware
columnar file format designed for Hadoop workloads.

This reader can process this file format, translating its schema and building
a Kafka message with the content.

.. warning:: If you have ORC files with ``union`` data types, this sort of
data types will be transformed in a ``map`` object in the Kafka message.
The value of each key will be ``fieldN``, where ``N`` represents
the index within the data type.

More information about properties of this file reader :ref:`here<config_options-filereaders-orc>`.

SequenceFile
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Expand Down Expand Up @@ -100,13 +116,14 @@ Agnostic
Actually, this reader is a wrapper of the readers listing above.

It tries to read any kind of file format using an internal reader based on the file extension,
applying the proper one (Parquet, Avro, SequenceFile, CSV, TSV or Text). In case of no
applying the proper one (Parquet, Avro, ORC, SequenceFile, CSV, TSV or Text). In case of no
extension has been matched, the Text file reader will be applied.

Default extensions for each format (configurable):

* Parquet: ``.parquet``
* Avro: ``.avro``
* ORC: ``.orc``
* SequenceFile: ``.seq``
* JSON: ``.json``
* CSV: ``.csv``
Expand Down
76 changes: 67 additions & 9 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,31 +4,68 @@

<groupId>com.github.mmolimar.kafka.connect</groupId>
<artifactId>kafka-connect-fs</artifactId>
<version>1.0.0</version>
<version>1.1.0</version>
<packaging>jar</packaging>

<name>kafka-connect-fs</name>
<description>
Kafka Connect FileSystem Connector is a source connector for reading records from different
sort of file formats and from different file system types and load them into Kafka.
</description>
<url>https://github.com/mmolimar/kafka-connect-fs</url>

<licenses>
<license>
<name>Apache License 2.0</name>
<url>https://github.com/mmolimar/kafka-connect-fs/blob/master/LICENSE</url>
<distribution>repo</distribution>
</license>
</licenses>

<developers>
<developer>
<name>Mario Molina</name>
<url>https://github.com/mmolimar</url>
<roles>
<role>Committer</role>
</roles>
</developer>
</developers>

<scm>
<connection>scm:git:https://github.com/mmolimar/kafka-connect-fs.git</connection>
<developerConnection>scm:git:git@github.com:mmolimar/kafka-connect-fs.git</developerConnection>
<url>https://github.com/mmolimar/kafka-connect-fs</url>
<tag>HEAD</tag>
</scm>

<issueManagement>
<system>github</system>
<url>https://github.com/mmolimar/kafka-connect-fs/issues</url>
</issueManagement>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<kafka.version>2.5.0</kafka.version>
<confluent.version>5.5.0</confluent.version>
<hadoop.version>3.2.1</hadoop.version>
<gcs-connector.version>hadoop3-2.1.2</gcs-connector.version>
<gcs-connector.version>hadoop3-2.1.3</gcs-connector.version>
<parquet.version>1.11.0</parquet.version>
<orc.version>1.6.3</orc.version>
<univocity.version>2.8.4</univocity.version>
<cron-utils.version>9.0.2</cron-utils.version>
<jsch.version>0.1.54</jsch.version>
<junit-jupiter.version>5.6.2</junit-jupiter.version>
<easymock.version>4.2</easymock.version>
<powermock.version>2.0.7</powermock.version>
<maven-compiler.source>1.8</maven-compiler.source>
<maven-compiler.target>${maven-compiler.source}</maven-compiler.target>
<maven-jar-plugin.version>3.2.0</maven-jar-plugin.version>
<maven-compiler-plugin.version>3.8.1</maven-compiler-plugin.version>
<maven-assembly-plugin.version>3.2.0</maven-assembly-plugin.version>
<maven-assembly-plugin.version>3.3.0</maven-assembly-plugin.version>
<maven-jacoco-plugin.version>0.8.5</maven-jacoco-plugin.version>
<maven-coveralls-plugin.version>4.3.0</maven-coveralls-plugin.version>
<maven-surfire-plugin.version>3.0.0-M4</maven-surfire-plugin.version>
<maven-surfire-plugin.version>3.0.0-M5</maven-surfire-plugin.version>
<maven-kafka-connect-plugin.version>0.11.3</maven-kafka-connect-plugin.version>
</properties>

Expand Down Expand Up @@ -74,6 +111,11 @@
<artifactId>parquet-avro</artifactId>
<version>${parquet.version}</version>
</dependency>
<dependency>
<groupId>org.apache.orc</groupId>
<artifactId>orc-core</artifactId>
<version>${orc.version}</version>
</dependency>
<dependency>
<groupId>com.univocity</groupId>
<artifactId>univocity-parsers</artifactId>
Expand All @@ -84,6 +126,11 @@
<artifactId>cron-utils</artifactId>
<version>${cron-utils.version}</version>
</dependency>
<dependency>
<groupId>com.jcraft</groupId>
<artifactId>jsch</artifactId>
<version>${jsch.version}</version>
</dependency>

<!-- test dependencies -->
<dependency>
Expand Down Expand Up @@ -192,25 +239,32 @@
<configuration>
<name>kafka-connect-fs</name>
<title>Kafka Connect FileSystem</title>
<documentationUrl>https://kafka-connect-fs.readthedocs.io/</documentationUrl>
<documentationUrl>https://kafka-connect-fs.readthedocs.io</documentationUrl>
<sourceUrl>https://github.com/mmolimar/kafka-connect-fs</sourceUrl>
<description>
Kafka Connect FileSystem Connector is a source connector for reading records from files
in the file systems specified and load them into Kafka.
Kafka Connect FileSystem Connector is a source connector for reading records from
different sort of file formats and from different file system types and load them
into Kafka.
</description>
<sourceUrl>https://github.com/mmolimar/kafka-connect-fs</sourceUrl>

<supportProviderName>Mario Molina</supportProviderName>
<supportSummary>This connector is supported by the open source community.</supportSummary>
<supportUrl>https://github.com/mmolimar/kafka-connect-fs/issues</supportUrl>

<ownerUsername>mmolimar</ownerUsername>
<ownerType>user</ownerType>
<ownerName>Mario Molina</ownerName>
<ownerUrl>https://github.com/mmolimar</ownerUrl>

<dockerNamespace>mmolimar</dockerNamespace>
<dockerName>kafka-connect-fs</dockerName>
<dockerTag>${project.version}</dockerTag>

<componentTypes>
<componentType>source</componentType>
</componentTypes>

<tags>
<tag>filesystem</tag>
<tag>files</tag>
Expand All @@ -221,18 +275,22 @@
<tag>google</tag>
<tag>gcs</tag>
<tag>azure</tag>
<tag>sftp</tag>
<tag>ftp</tag>
<tag>txt</tag>
<tag>csv</tag>
<tag>tsv</tag>
<tag>json</tag>
<tag>avro</tag>
<tag>parquet</tag>
<tag>orc</tag>
<tag>sequence</tag>
</tags>
<requirements/>

<deliveryGuarantee>
<deliveryGuarantee>atLeastOnce</deliveryGuarantee>
</deliveryGuarantee>

<confluentControlCenterIntegration>true</confluentControlCenterIntegration>
</configuration>
</execution>
Expand All @@ -254,7 +312,7 @@
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
<enabled>false</enabled>
</snapshots>
<url>http://packages.confluent.io/maven/</url>
</repository>
Expand Down

0 comments on commit 7ec9293

Please sign in to comment.