diff --git a/config/kafka-connect-fs.properties b/config/kafka-connect-fs.properties
index aab1ae6..27b9eb9 100644
--- a/config/kafka-connect-fs.properties
+++ b/config/kafka-connect-fs.properties
@@ -6,4 +6,6 @@ topic=mytopic
 policy.class=com.github.mmolimar.kafka.connect.fs.policy.SimplePolicy
 policy.recursive=true
 policy.regexp=^.*\.txt$
+policy.batch_size=0
 file_reader.class=com.github.mmolimar.kafka.connect.fs.file.reader.TextFileReader
+file_reader.batch_size=0
diff --git a/docker-compose.yml b/docker-compose.yml
index e763372..d03f109 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -45,7 +45,7 @@ services:
       SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'

   connect-fs:
-    image: mmolimar/kafka-connect-fs:1.0.0
+    image: mmolimar/kafka-connect-fs:1.1.0
     container_name: connect
     depends_on:
       - cp-kafka
diff --git a/docs/source/conf.py b/docs/source/conf.py
index f6edf0c..724450a 100644
--- a/docs/source/conf.py
+++ b/docs/source/conf.py
@@ -55,9 +55,9 @@
 # built documents.
 #
 # The short X.Y version.
-version = '1.0'
+version = '1.1'
 # The full version, including alpha/beta/rc tags.
-release = '1.0'
+release = '1.1'

 # The language for content autogenerated by Sphinx. Refer to documentation
 # for a list of supported languages.
diff --git a/docs/source/config_options.rst b/docs/source/config_options.rst
index 0a69105..0abb591 100644
--- a/docs/source/config_options.rst
+++ b/docs/source/config_options.rst
@@ -91,6 +91,13 @@ General config properties for this connector.
   * Default: ``false``
   * Importance: medium

+``policy.batch_size``
+  Number of files that should be handled at a time. Non-positive values disable batching.
+
+  * Type: int
+  * Default: ``0``
+  * Importance: medium
+
 ``policy..``
   This represents custom properties you can include based on the policy class specified.

@@ -110,6 +117,13 @@ General config properties for this connector.
   * Type: string
   * Importance: high

+``file_reader.batch_size``
+  Number of records to process at a time. Non-positive values disable batching.
+
+  * Type: int
+  * Default: ``0``
+  * Importance: medium
+
 ``file_reader..``
   This represents custom properties you can include based on the file reader class specified.

@@ -243,7 +257,29 @@ In order to configure custom properties for this reader, the name you must use i
   * Type: string
   * Importance: medium

-.. _config_options-filereaders-sequencefile:
+.. _config_options-filereaders-orc:
+
+ORC
+--------------------------------------------
+
+In order to configure custom properties for this reader, the name you must use is ``orc``.
+
+``file_reader.orc.use_zerocopy``
+  Use zero-copy when reading an ORC file.
+
+  * Type: boolean
+  * Default: ``false``
+  * Importance: medium
+
+``file_reader.orc.skip_corrupt_records``
+  Whether the reader should skip corrupt records. If disabled, an exception will be thrown when there is
+  corrupted data in the file.
+
+  * Type: boolean
+  * Default: ``false``
+  * Importance: medium
+
+.. _config_options-filereaders-sequencefile:

 SequenceFile
 --------------------------------------------
@@ -817,7 +853,7 @@ Text

 To configure custom properties for this reader, the name you must use is ``text``.

-``file_reader.json.record_per_line``
+``file_reader.text.record_per_line``
   If enabled, the reader will read each line as a record. Otherwise, the reader will read the full
   content of the file as a record.

@@ -839,14 +875,14 @@ To configure custom properties for this reader, the name you must use is ``text`
   * Default: based on the locale and charset of the underlying operating system.
* Importance: medium -``file_reader.json.compression.type`` +``file_reader.text.compression.type`` Compression type to use when reading a file. * Type: enum (available values ``bzip2``, ``gzip`` and ``none``) * Default: ``none`` * Importance: medium -``file_reader.json.compression.concatenated`` +``file_reader.text.compression.concatenated`` Flag to specify if the decompression of the reader will finish at the end of the file or after the first compressed stream. @@ -875,6 +911,13 @@ To configure custom properties for this reader, the name you must use is ``agnos * Default: ``avro`` * Importance: medium +``file_reader.agnostic.extensions.orc`` + A comma-separated string list with the accepted extensions for ORC files. + + * Type: string + * Default: ``orc`` + * Importance: medium + ``file_reader.agnostic.extensions.sequence`` A comma-separated string list with the accepted extensions for Sequence files. diff --git a/docs/source/connector.rst b/docs/source/connector.rst index 476aa7b..948dcb7 100644 --- a/docs/source/connector.rst +++ b/docs/source/connector.rst @@ -15,7 +15,7 @@ Among others, these are some file systems it supports: * S3. * Google Cloud Storage. * Azure Blob Storage & Azure Data Lake Store. -* FTP. +* FTP & SFTP. * WebHDFS. * Local File System. * Hadoop Archive File System. @@ -52,7 +52,9 @@ The ``kafka-connect-fs.properties`` file defines the following properties as req policy.class= policy.recursive=true policy.regexp=.* + policy.batch_size=0 file_reader.class= + file_reader.batch_size=0 #. The connector name. #. Class indicating the connector. @@ -65,8 +67,10 @@ The ``kafka-connect-fs.properties`` file defines the following properties as req ``com.github.mmolimar.kafka.connect.fs.policy.Policy`` interface). #. Flag to activate traversed recursion in subdirectories when listing files. #. Regular expression to filter files from the FS. +#. Number of files that should be handled at a time. Non-positive values disable batching. #. File reader class to read files from the FS (must implement ``com.github.mmolimar.kafka.connect.fs.file.reader.FileReader`` interface). +#. Number of records to process at a time. Non-positive values disable batching. A more detailed information about these properties can be found :ref:`here`. diff --git a/docs/source/faq.rst b/docs/source/faq.rst index 1041bc4..489cde9 100644 --- a/docs/source/faq.rst +++ b/docs/source/faq.rst @@ -42,6 +42,9 @@ Obviously, this depends of the files in the FS(s) but having several URIs in the connector might be a good idea to adjust the number of tasks to process those URIs in parallel ( ``tasks.max`` connector property). +Also, using the properties ``policy.batch_size`` and/or ``file_reader.batch_size`` +in case you have tons of files or files too large might help. + **I removed a file from the FS but the connector is still sending messages with the contents of that file.** diff --git a/docs/source/filereaders.rst b/docs/source/filereaders.rst index f887499..d38a5e9 100644 --- a/docs/source/filereaders.rst +++ b/docs/source/filereaders.rst @@ -26,6 +26,22 @@ way as the Avro file reader does. More information about properties of this file reader :ref:`here`. +ORC +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +`ORC files `__ are a self-describing type-aware +columnar file format designed for Hadoop workloads. + +This reader can process this file format, translating its schema and building +a Kafka message with the content. + +.. 
warning:: If you have ORC files with ``union`` data types, this sort of + data types will be transformed in a ``map`` object in the Kafka message. + The value of each key will be ``fieldN``, where ``N`` represents + the index within the data type. + +More information about properties of this file reader :ref:`here`. + SequenceFile ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ @@ -100,13 +116,14 @@ Agnostic Actually, this reader is a wrapper of the readers listing above. It tries to read any kind of file format using an internal reader based on the file extension, -applying the proper one (Parquet, Avro, SequenceFile, CSV, TSV or Text). In case of no +applying the proper one (Parquet, Avro, ORC, SequenceFile, CSV, TSV or Text). In case of no extension has been matched, the Text file reader will be applied. Default extensions for each format (configurable): * Parquet: ``.parquet`` * Avro: ``.avro`` +* ORC: ``.orc`` * SequenceFile: ``.seq`` * JSON: ``.json`` * CSV: ``.csv`` diff --git a/pom.xml b/pom.xml index bd22791..8d1c2dc 100644 --- a/pom.xml +++ b/pom.xml @@ -4,20 +4,57 @@ com.github.mmolimar.kafka.connect kafka-connect-fs - 1.0.0 + 1.1.0 jar kafka-connect-fs + + Kafka Connect FileSystem Connector is a source connector for reading records from different + sort of file formats and from different file system types and load them into Kafka. + + https://github.com/mmolimar/kafka-connect-fs + + + + Apache License 2.0 + https://github.com/mmolimar/kafka-connect-fs/blob/master/LICENSE + repo + + + + + + Mario Molina + https://github.com/mmolimar + + Committer + + + + + + scm:git:https://github.com/mmolimar/kafka-connect-fs.git + scm:git:git@github.com:mmolimar/kafka-connect-fs.git + https://github.com/mmolimar/kafka-connect-fs + HEAD + + + + github + https://github.com/mmolimar/kafka-connect-fs/issues + UTF-8 2.5.0 5.5.0 3.2.1 - hadoop3-2.1.2 + hadoop3-2.1.3 1.11.0 + 1.6.3 2.8.4 9.0.2 + 0.1.54 5.6.2 4.2 2.0.7 @@ -25,10 +62,10 @@ ${maven-compiler.source} 3.2.0 3.8.1 - 3.2.0 + 3.3.0 0.8.5 4.3.0 - 3.0.0-M4 + 3.0.0-M5 0.11.3 @@ -74,6 +111,11 @@ parquet-avro ${parquet.version} + + org.apache.orc + orc-core + ${orc.version} + com.univocity univocity-parsers @@ -84,6 +126,11 @@ cron-utils ${cron-utils.version} + + com.jcraft + jsch + ${jsch.version} + @@ -192,25 +239,32 @@ kafka-connect-fs Kafka Connect FileSystem - https://kafka-connect-fs.readthedocs.io/ + https://kafka-connect-fs.readthedocs.io https://github.com/mmolimar/kafka-connect-fs - Kafka Connect FileSystem Connector is a source connector for reading records from files - in the file systems specified and load them into Kafka. + Kafka Connect FileSystem Connector is a source connector for reading records from + different sort of file formats and from different file system types and load them + into Kafka. + https://github.com/mmolimar/kafka-connect-fs + Mario Molina This connector is supported by the open source community. 
https://github.com/mmolimar/kafka-connect-fs/issues + mmolimar user Mario Molina https://github.com/mmolimar + mmolimar kafka-connect-fs ${project.version} + source + filesystem files @@ -221,18 +275,22 @@ google gcs azure + sftp + ftp txt csv tsv json avro parquet + orc sequence - + atLeastOnce + true @@ -254,7 +312,7 @@ true - true + false http://packages.confluent.io/maven/ diff --git a/src/main/java/com/github/mmolimar/kafka/connect/fs/FsSourceConnector.java b/src/main/java/com/github/mmolimar/kafka/connect/fs/FsSourceConnector.java index 839477b..3c7c115 100644 --- a/src/main/java/com/github/mmolimar/kafka/connect/fs/FsSourceConnector.java +++ b/src/main/java/com/github/mmolimar/kafka/connect/fs/FsSourceConnector.java @@ -28,7 +28,7 @@ public String version() { @Override public void start(Map properties) { - log.info("Starting FsSourceConnector..."); + log.info("{} Starting connector...", this); try { config = new FsSourceConnectorConfig(properties); } catch (ConfigException ce) { @@ -59,19 +59,24 @@ public List> taskConfigs(int maxTasks) { taskConfigs.add(taskProps); }); - log.debug("Partitions grouped as: {}", taskConfigs); + log.debug("{} Partitions grouped as: {}", this, taskConfigs); return taskConfigs; } @Override public void stop() { - log.info("Stopping FsSourceConnector."); - //Nothing to do + log.info("{} Stopping FsSourceConnector.", this); + // Nothing to do } @Override public ConfigDef config() { return FsSourceConnectorConfig.conf(); } + + @Override + public String toString() { + return this.getClass().getSimpleName(); + } } diff --git a/src/main/java/com/github/mmolimar/kafka/connect/fs/FsSourceTask.java b/src/main/java/com/github/mmolimar/kafka/connect/fs/FsSourceTask.java index bb2169a..bb74d5e 100644 --- a/src/main/java/com/github/mmolimar/kafka/connect/fs/FsSourceTask.java +++ b/src/main/java/com/github/mmolimar/kafka/connect/fs/FsSourceTask.java @@ -1,6 +1,7 @@ package com.github.mmolimar.kafka.connect.fs; import com.github.mmolimar.kafka.connect.fs.file.FileMetadata; +import com.github.mmolimar.kafka.connect.fs.file.reader.AbstractFileReader; import com.github.mmolimar.kafka.connect.fs.file.reader.FileReader; import com.github.mmolimar.kafka.connect.fs.policy.Policy; import com.github.mmolimar.kafka.connect.fs.util.ReflectionUtils; @@ -18,6 +19,7 @@ import java.io.IOException; import java.util.*; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; import java.util.stream.StreamSupport; @@ -45,7 +47,7 @@ public String version() { @Override public void start(Map properties) { - log.info("Starting FS source task..."); + log.info("{} Starting FS source task...", this); try { config = new FsSourceTaskConfig(properties); if (config.getClass(FsSourceTaskConfig.POLICY_CLASS).isAssignableFrom(Policy.class)) { @@ -61,43 +63,57 @@ public void start(Map properties) { policy = ReflectionUtils.makePolicy(policyClass, config); pollInterval = config.getInt(FsSourceTaskConfig.POLL_INTERVAL_MS); } catch (ConfigException ce) { - log.error("Couldn't start FsSourceTask.", ce); - throw new ConnectException("Couldn't start FsSourceTask due to configuration error: " + ce.getMessage(), ce); + log.error("{} Couldn't start FS source task: {}", this, ce.getMessage(), ce); + throw new ConnectException("Couldn't start FS source task due to configuration error: " + ce.getMessage(), ce); } catch (Exception e) { - log.error("Couldn't start FsSourceConnector.", e); + log.error("{} Couldn't start 
FS source task: {}", this, e.getMessage(), e); throw new ConnectException("A problem has occurred reading configuration: " + e.getMessage(), e); } - log.info("FS source task started with policy [{}].", policy.getClass().getName()); + log.info("{} FS source task started with policy [{}].", this, policy.getClass().getName()); } @Override public List poll() { while (!stop.get() && policy != null && !policy.hasEnded()) { - log.trace("Polling for new data..."); + log.trace("{} Polling for new data...", this); + Function> makePartitionKey = (FileMetadata metadata) -> + Collections.singletonMap("path", metadata.getPath()); - List totalRecords = filesToProcess().map(metadata -> { + // Fetch all the offsets upfront to avoid fetching offsets once per file + List filesToProcess = filesToProcess().collect(Collectors.toList()); + List> partitions = filesToProcess.stream().map(makePartitionKey).collect(Collectors.toList()); + Map, Map> offsets = context.offsetStorageReader().offsets(partitions); + + List totalRecords = filesToProcess.stream().map(metadata -> { List records = new ArrayList<>(); - try (FileReader reader = policy.offer(metadata, context.offsetStorageReader())) { - log.info("Processing records for file {}.", metadata); + Map partitionKey = makePartitionKey.apply(metadata); + Map offset = Optional.ofNullable(offsets.get(partitionKey)).orElse(new HashMap<>()); + try (FileReader reader = policy.offer(metadata, offset)) { + log.info("{} Processing records for file {}...", this, metadata); while (reader.hasNext()) { - records.add(convert(metadata, reader.currentOffset() + 1, reader.next())); + Struct record = reader.next(); + // TODO change FileReader interface in the next major version + boolean hasNext = (reader instanceof AbstractFileReader) ? + ((AbstractFileReader) reader).hasNextBatch() || reader.hasNext() : reader.hasNext(); + records.add(convert(metadata, reader.currentOffset(), !hasNext, record)); } } catch (ConnectException | IOException e) { - //when an exception happens reading a file, the connector continues - log.error("Error reading file [{}]. Keep going...", metadata.getPath(), e); + // when an exception happens reading a file, the connector continues + log.warn("{} Error reading file [{}]: {}. Keep going...", + this, metadata.getPath(), e.getMessage(), e); } - log.debug("Read [{}] records from file [{}].", records.size(), metadata.getPath()); + log.debug("{} Read [{}] records from file [{}].", this, records.size(), metadata.getPath()); return records; }).flatMap(Collection::stream).collect(Collectors.toList()); - log.debug("Returning [{}] records in execution number [{}] for policy [{}].", - totalRecords.size(), policy.getExecutions(), policy.getClass().getName()); + log.debug("{} Returning [{}] records in execution number [{}] for policy [{}].", + this, totalRecords.size(), policy.getExecutions(), policy.getClass().getName()); return totalRecords; } if (pollInterval > 0) { - log.trace("Waiting [{}] ms for next poll.", pollInterval); + log.trace("{} Waiting [{}] ms for next poll.", this, pollInterval); time.sleep(pollInterval); } return null; @@ -108,10 +124,10 @@ private Stream filesToProcess() { return asStream(policy.execute()) .filter(metadata -> metadata.getLen() > 0); } catch (IOException | ConnectException e) { - //when an exception happens executing the policy, the connector continues - log.error("Cannot retrieve files to process from the FS: {}. 
" + - "There was an error executing the policy but the task tolerates this and continues.", - policy.getURIs(), e); + // when an exception happens executing the policy, the connector continues + log.error("{} Cannot retrieve files to process from the FS: [{}]. " + + "There was an error executing the policy but the task tolerates this and continues: {}", + this, policy.getURIs(), e.getMessage(), e); return Stream.empty(); } } @@ -121,10 +137,14 @@ private Stream asStream(Iterator src) { return StreamSupport.stream(iterable.spliterator(), false); } - private SourceRecord convert(FileMetadata metadata, long offset, Struct struct) { + private SourceRecord convert(FileMetadata metadata, long offset, boolean eof, Struct struct) { return new SourceRecord( Collections.singletonMap("path", metadata.getPath()), - Collections.singletonMap("offset", offset), + new HashMap() {{ + put("offset", offset); + put("file-size", metadata.getLen()); + put("eof", eof); + }}, config.getTopic(), struct.schema(), struct @@ -133,16 +153,21 @@ private SourceRecord convert(FileMetadata metadata, long offset, Struct struct) @Override public void stop() { - log.info("Stopping FS source task..."); + log.info("{} Stopping FS source task...", this); stop.set(true); synchronized (this) { if (policy != null) { try { policy.close(); } catch (IOException ioe) { - log.warn("Error closing policy [{}].", policy.getClass().getName(), ioe); + log.warn("{} Error closing policy: {}", this, ioe.getMessage(), ioe); } } } } + + @Override + public String toString() { + return this.getClass().getSimpleName(); + } } diff --git a/src/main/java/com/github/mmolimar/kafka/connect/fs/FsSourceTaskConfig.java b/src/main/java/com/github/mmolimar/kafka/connect/fs/FsSourceTaskConfig.java index 58231fd..f3b56ed 100644 --- a/src/main/java/com/github/mmolimar/kafka/connect/fs/FsSourceTaskConfig.java +++ b/src/main/java/com/github/mmolimar/kafka/connect/fs/FsSourceTaskConfig.java @@ -21,18 +21,27 @@ public class FsSourceTaskConfig extends FsSourceConnectorConfig { private static final String POLICY_REGEXP_DOC = "Regular expression to filter files from the FS."; private static final String POLICY_REGEXP_DISPLAY = "File filter regex"; + public static final String POLICY_BATCH_SIZE = POLICY_PREFIX + "batch_size"; + private static final String POLICY_BATCH_SIZE_DOC = "Number of files to process at a time. Non-positive values disable batching."; + private static final String POLICY_BATCH_SIZE_DISPLAY = "Files per batch"; + public static final String POLICY_PREFIX_FS = POLICY_PREFIX + "fs."; public static final String FILE_READER_CLASS = FILE_READER_PREFIX + "class"; private static final String FILE_READER_CLASS_DOC = "File reader class to read files from the FS."; private static final String FILE_READER_CLASS_DISPLAY = "File reader class"; + public static final String FILE_READER_BATCH_SIZE = FILE_READER_PREFIX + "batch_size"; + private static final String FILE_READER_BATCH_SIZE_DOC = "Number of records to process at a time. 
Non-positive values disable batching."; + private static final String FILE_READER_BATCH_SIZE_DISPLAY = "Records per batch"; + public static final String POLL_INTERVAL_MS = "poll.interval.ms"; private static final String POLL_INTERVAL_MS_DOC = "Frequency in ms to poll for new data."; public static final int POLL_INTERVAL_MS_DEFAULT = 10000; private static final String POLL_INTERVAL_MS_DISPLAY = "Poll Interval (ms)"; private static final String POLICY_GROUP = "Policy"; + private static final String FILE_READER_GROUP = "FileReader"; private static final String CONNECTOR_GROUP = "Connector"; public FsSourceTaskConfig(ConfigDef config, Map parsedConfig) { @@ -76,16 +85,36 @@ public static ConfigDef conf() { ++order, ConfigDef.Width.MEDIUM, POLICY_REGEXP_DISPLAY + ).define( + POLICY_BATCH_SIZE, + ConfigDef.Type.INT, + 0, + ConfigDef.Importance.MEDIUM, + POLICY_BATCH_SIZE_DOC, + POLICY_GROUP, + ++order, + ConfigDef.Width.MEDIUM, + POLICY_BATCH_SIZE_DISPLAY ).define( FILE_READER_CLASS, ConfigDef.Type.CLASS, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.Importance.HIGH, FILE_READER_CLASS_DOC, - POLICY_GROUP, + FILE_READER_GROUP, ++order, ConfigDef.Width.MEDIUM, FILE_READER_CLASS_DISPLAY + ).define( + FILE_READER_BATCH_SIZE, + ConfigDef.Type.INT, + 0, + ConfigDef.Importance.MEDIUM, + FILE_READER_BATCH_SIZE_DOC, + FILE_READER_GROUP, + ++order, + ConfigDef.Width.MEDIUM, + FILE_READER_BATCH_SIZE_DISPLAY ).define( POLL_INTERVAL_MS, ConfigDef.Type.INT, diff --git a/src/main/java/com/github/mmolimar/kafka/connect/fs/file/FileMetadata.java b/src/main/java/com/github/mmolimar/kafka/connect/fs/file/FileMetadata.java index 669b681..6ea8996 100644 --- a/src/main/java/com/github/mmolimar/kafka/connect/fs/file/FileMetadata.java +++ b/src/main/java/com/github/mmolimar/kafka/connect/fs/file/FileMetadata.java @@ -1,6 +1,7 @@ package com.github.mmolimar.kafka.connect.fs.file; import java.util.List; +import java.util.Optional; public class FileMetadata { private String path; @@ -8,7 +9,7 @@ public class FileMetadata { private List blocks; public FileMetadata(String path, long length, List blocks) { - this.path = path; + this.path = Optional.ofNullable(path).orElse(""); this.length = length; this.blocks = blocks; } @@ -42,7 +43,7 @@ public boolean equals(Object object) { } public int hashCode() { - return path == null ? 
0 : path.hashCode(); + return path.hashCode(); } @@ -57,6 +58,17 @@ public BlockInfo(long offset, long length, boolean corrupt) { this.corrupt = corrupt; } + @Override + public boolean equals(Object object) { + if (this == object) return true; + if (!(object instanceof BlockInfo)) return false; + + BlockInfo blockInfo = (BlockInfo) object; + return this.offset == blockInfo.offset && + this.length == blockInfo.length && + this.corrupt == blockInfo.corrupt; + } + @Override public String toString() { return String.format("[offset = %s, length = %s, corrupt = %s]", offset, length, corrupt); diff --git a/src/main/java/com/github/mmolimar/kafka/connect/fs/file/reader/AbstractFileReader.java b/src/main/java/com/github/mmolimar/kafka/connect/fs/file/reader/AbstractFileReader.java index d63283f..c698913 100644 --- a/src/main/java/com/github/mmolimar/kafka/connect/fs/file/reader/AbstractFileReader.java +++ b/src/main/java/com/github/mmolimar/kafka/connect/fs/file/reader/AbstractFileReader.java @@ -1,5 +1,6 @@ package com.github.mmolimar.kafka.connect.fs.file.reader; +import com.github.mmolimar.kafka.connect.fs.FsSourceTaskConfig; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.kafka.connect.data.Struct; @@ -21,6 +22,8 @@ public abstract class AbstractFileReader implements FileReader { private final FileSystem fs; private final Path filePath; private final ReaderAdapter adapter; + private final int batchSize; + private boolean seeked; private long offset; public AbstractFileReader(FileSystem fs, Path filePath, ReaderAdapter adapter, Map config) { @@ -30,10 +33,12 @@ public AbstractFileReader(FileSystem fs, Path filePath, ReaderAdapter adapter this.fs = fs; this.filePath = filePath; this.adapter = adapter; + this.batchSize = Integer.parseInt(config.getOrDefault(FsSourceTaskConfig.FILE_READER_BATCH_SIZE, "0").toString()); + this.seeked = false; this.offset = 0; configure(readerConfig(config)); - log.trace("Initialized file reader [{}] for file [{}].", getClass().getName(), filePath); + log.trace("{} Initialized file reader with batch size [{}] for file [{}].", this, this.batchSize, this.filePath); } protected final Map readerConfig(Map config) { @@ -54,13 +59,41 @@ public Path getFilePath() { return filePath; } + @Override + public long currentOffset() { + return offset; + } + + protected void incrementOffset() { + offset++; + } + + protected void setOffset(long offset) { + this.offset = offset; + } + + @Override + public final boolean hasNext() { + checkClosed(); + try { + return (batchSize <= 0 || offset == 0 || offset % batchSize != 0 || (offset % batchSize == 0 && seeked)) && + hasNextRecord(); + } catch (ConnectException ce) { + throw ce; + } catch (Exception e) { + throw new ConnectException("Error when checking if the reader has more records.", e); + } + } + @Override public final Struct next() { if (!hasNext()) { throw new NoSuchElementException("There are no more records in file: " + getFilePath()); } try { - return adapter.apply(nextRecord()); + Struct struct = adapter.apply(nextRecord()); + seeked = false; + return struct; } catch (ConnectException ce) { throw ce; } catch (Exception e) { @@ -68,17 +101,23 @@ public final Struct next() { } } - @Override - public long currentOffset() { - return offset; - } - - protected void incrementOffset() { - this.offset++; + public final boolean hasNextBatch() { + checkClosed(); + try { + return batchSize > 0 && hasNextRecord(); + } catch (ConnectException ce) { + throw ce; + } catch (Exception e) { + throw new 
ConnectException("Error when checking if the reader has more batches.", e); + } } - protected void setOffset(long offset) { - this.offset = offset; + public final void nextBatch() { + if (!hasNextBatch()) { + throw new NoSuchElementException("There are no more batches in file: " + getFilePath()); + } + long batchOffset = offset + (offset % batchSize); + seek(batchOffset); } @Override @@ -89,21 +128,15 @@ public final void seek(long offset) { checkClosed(); try { seekFile(offset); + seeked = true; } catch (IOException ioe) { throw new ConnectException("Error seeking file: " + getFilePath(), ioe); } } @Override - public final boolean hasNext() { - checkClosed(); - try { - return hasNextRecord(); - } catch (ConnectException ce) { - throw ce; - } catch (Exception e) { - throw new ConnectException("Error when checking if the reader has more records.", e); - } + public String toString() { + return this.getClass().getSimpleName(); } protected ReaderAdapter getAdapter() { diff --git a/src/main/java/com/github/mmolimar/kafka/connect/fs/file/reader/AgnosticFileReader.java b/src/main/java/com/github/mmolimar/kafka/connect/fs/file/reader/AgnosticFileReader.java index 2630762..2d0dbe5 100644 --- a/src/main/java/com/github/mmolimar/kafka/connect/fs/file/reader/AgnosticFileReader.java +++ b/src/main/java/com/github/mmolimar/kafka/connect/fs/file/reader/AgnosticFileReader.java @@ -22,6 +22,7 @@ public class AgnosticFileReader extends AbstractFileReader reader; - private Set parquetExtensions, avroExtensions, sequenceExtensions, + private Set parquetExtensions, avroExtensions, sequenceExtensions, orcExtensions, jsonExtensions, csvExtensions, tsvExtensions, fixedExtensions; public AgnosticFileReader(FileSystem fs, Path filePath, Map config) throws Exception { @@ -54,6 +55,8 @@ private AbstractFileReader readerByExtension(FileSystem fs, Path filePat clz = AvroFileReader.class; } else if (sequenceExtensions.contains(extension)) { clz = SequenceFileReader.class; + } else if (orcExtensions.contains(extension)) { + clz = OrcFileReader.class; } else if (jsonExtensions.contains(extension)) { clz = JsonFileReader.class; } else if (csvExtensions.contains(extension)) { @@ -77,6 +80,8 @@ protected void configure(Map config) { .toLowerCase().split(",")).collect(Collectors.toSet()); this.sequenceExtensions = Arrays.stream(config.getOrDefault(FILE_READER_AGNOSTIC_EXTENSIONS_SEQUENCE, "seq") .toLowerCase().split(",")).collect(Collectors.toSet()); + this.orcExtensions = Arrays.stream(config.getOrDefault(FILE_READER_AGNOSTIC_EXTENSIONS_ORC, "orc") + .toLowerCase().split(",")).collect(Collectors.toSet()); this.jsonExtensions = Arrays.stream(config.getOrDefault(FILE_READER_AGNOSTIC_EXTENSIONS_JSON, "json") .toLowerCase().split(",")).collect(Collectors.toSet()); this.csvExtensions = Arrays.stream(config.getOrDefault(FILE_READER_AGNOSTIC_EXTENSIONS_CSV, "csv") diff --git a/src/main/java/com/github/mmolimar/kafka/connect/fs/file/reader/AvroFileReader.java b/src/main/java/com/github/mmolimar/kafka/connect/fs/file/reader/AvroFileReader.java index 3db8e3c..fee6324 100644 --- a/src/main/java/com/github/mmolimar/kafka/connect/fs/file/reader/AvroFileReader.java +++ b/src/main/java/com/github/mmolimar/kafka/connect/fs/file/reader/AvroFileReader.java @@ -61,8 +61,15 @@ protected GenericRecord nextRecord() { @Override public void seekFile(long offset) throws IOException { - reader.sync(offset); - setOffset(reader.previousSync() - 16L); + if (offset == currentOffset()) { + return; + } else if (offset < currentOffset()) { + 
reader.sync(0L); + } + while (super.hasNext() && offset > currentOffset()) { + super.next(); + } + setOffset(offset); } @Override diff --git a/src/main/java/com/github/mmolimar/kafka/connect/fs/file/reader/FileReader.java b/src/main/java/com/github/mmolimar/kafka/connect/fs/file/reader/FileReader.java index 518e9f8..a5fe758 100644 --- a/src/main/java/com/github/mmolimar/kafka/connect/fs/file/reader/FileReader.java +++ b/src/main/java/com/github/mmolimar/kafka/connect/fs/file/reader/FileReader.java @@ -11,8 +11,6 @@ public interface FileReader extends Iterator, Closeable { Path getFilePath(); - boolean hasNext(); - Struct next(); void seek(long offset); diff --git a/src/main/java/com/github/mmolimar/kafka/connect/fs/file/reader/JsonFileReader.java b/src/main/java/com/github/mmolimar/kafka/connect/fs/file/reader/JsonFileReader.java index 3fabc01..f1440b0 100644 --- a/src/main/java/com/github/mmolimar/kafka/connect/fs/file/reader/JsonFileReader.java +++ b/src/main/java/com/github/mmolimar/kafka/connect/fs/file/reader/JsonFileReader.java @@ -47,7 +47,7 @@ public JsonFileReader(FileSystem fs, Path filePath, Map config) if (hasNext()) { String line = inner.nextRecord().getValue(); this.schema = extractSchema(mapper.readTree(line)); - //back to the first line + // back to the first line inner.seek(0); } else { this.schema = SchemaBuilder.struct().build(); @@ -68,7 +68,8 @@ protected void configure(Map config) { mapper.configure(DeserializationFeature.valueOf(feature), Boolean.parseBoolean(entry.getValue())); } else { - log.warn("Ignoring deserialization configuration '{}' due to it does not exist.", feature); + log.warn("{} Ignoring deserialization configuration [{}] due to it does not exist.", + this, feature); } }); } @@ -115,14 +116,8 @@ private static Schema extractSchema(JsonNode jsonNode) { return Schema.OPTIONAL_INT32_SCHEMA; } else if (jsonNode.isLong()) { return Schema.OPTIONAL_INT64_SCHEMA; - } else if (jsonNode.isFloat()) { - return Schema.OPTIONAL_FLOAT32_SCHEMA; - } else if (jsonNode.isDouble()) { - return Schema.OPTIONAL_FLOAT64_SCHEMA; } else if (jsonNode.isBigInteger()) { return Schema.OPTIONAL_INT64_SCHEMA; - } else if (jsonNode.isBigDecimal()) { - return Schema.OPTIONAL_FLOAT64_SCHEMA; } else { return Schema.OPTIONAL_FLOAT64_SCHEMA; } @@ -157,8 +152,10 @@ private Struct toStruct(Schema schema, JsonNode jsonNode) { if (jsonNode.isNull()) return null; Struct struct = new Struct(schema); jsonNode.fields() - .forEachRemaining(field -> struct.put(field.getKey(), - mapValue(struct.schema().field(field.getKey()).schema(), field.getValue()))); + .forEachRemaining(field -> struct.put( + field.getKey(), + mapValue(struct.schema().field(field.getKey()).schema(), field.getValue()) + )); return struct; } @@ -175,14 +172,10 @@ private Object mapValue(Schema schema, JsonNode value) { return value.intValue(); } else if (value.isLong()) { return value.longValue(); - } else if (value.isFloat()) { - return value.floatValue(); - } else if (value.isDouble()) { - return value.doubleValue(); } else if (value.isBigInteger()) { - return value.bigIntegerValue(); + return value.bigIntegerValue().longValue(); } else { - return value.numberValue(); + return value.numberValue().doubleValue(); } case STRING: return value.asText(); @@ -204,7 +197,7 @@ private Object mapValue(Schema schema, JsonNode value) { case ARRAY: Iterable arrayElements = value::elements; return StreamSupport.stream(arrayElements.spliterator(), false) - .map(elm -> mapValue(schema, elm)) + .map(elm -> mapValue(schema.valueSchema(), elm)) 
.collect(Collectors.toList()); case NULL: case MISSING: diff --git a/src/main/java/com/github/mmolimar/kafka/connect/fs/file/reader/OrcFileReader.java b/src/main/java/com/github/mmolimar/kafka/connect/fs/file/reader/OrcFileReader.java new file mode 100644 index 0000000..9db0edd --- /dev/null +++ b/src/main/java/com/github/mmolimar/kafka/connect/fs/file/reader/OrcFileReader.java @@ -0,0 +1,277 @@ +package com.github.mmolimar.kafka.connect.fs.file.reader; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.vector.*; +import org.apache.kafka.connect.data.Field; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.orc.OrcFile; +import org.apache.orc.Reader; +import org.apache.orc.RecordReader; +import org.apache.orc.TypeDescription; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.LongStream; + +import static com.github.mmolimar.kafka.connect.fs.FsSourceTaskConfig.FILE_READER_PREFIX; + +public class OrcFileReader extends AbstractFileReader { + + private static final String FILE_READER_ORC = FILE_READER_PREFIX + "orc."; + + public static final String FILE_READER_ORC_USE_ZEROCOPY = FILE_READER_ORC + "use_zerocopy"; + public static final String FILE_READER_ORC_SKIP_CORRUPT_RECORDS = FILE_READER_ORC + "skip_corrupt_records"; + + private final RecordReader reader; + private final VectorizedRowBatch batch; + private final Schema schema; + private final long numberOfRows; + private boolean useZeroCopy; + private boolean skipCorruptRecords; + private int vectorIndex; + private boolean closed; + + public OrcFileReader(FileSystem fs, Path filePath, Map config) throws IOException { + super(fs, filePath, new OrcToStruct(), config); + + Reader orcReader = OrcFile.createReader(filePath, OrcFile.readerOptions(fs.getConf())); + Reader.Options options = new Reader.Options(fs.getConf()) + .useZeroCopy(this.useZeroCopy) + .skipCorruptRecords(this.skipCorruptRecords); + this.reader = orcReader.rows(options); + this.numberOfRows = orcReader.getNumberOfRows(); + this.batch = orcReader.getSchema().createRowBatch(); + this.schema = hasNext() ? 
buildSchema(orcReader.getSchema()) : SchemaBuilder.struct().build(); + this.vectorIndex = 0; + this.closed = false; + } + + @Override + protected void configure(Map config) { + this.useZeroCopy = Boolean.parseBoolean(config.getOrDefault(FILE_READER_ORC_USE_ZEROCOPY, "false")); + this.skipCorruptRecords = Boolean.parseBoolean(config.getOrDefault(FILE_READER_ORC_SKIP_CORRUPT_RECORDS, "false")); + } + + private Schema buildSchema(TypeDescription typeDescription) { + TypeDescription td; + if (typeDescription.getChildren() == null || typeDescription.getChildren().isEmpty()) { + td = TypeDescription.createStruct().addField(typeDescription.getCategory().getName(), typeDescription); + } else { + td = typeDescription; + } + return extractSchema(td); + } + + private Schema extractSchema(TypeDescription typeDescription) { + switch (typeDescription.getCategory()) { + case BOOLEAN: + return Schema.OPTIONAL_BOOLEAN_SCHEMA; + case BYTE: + case CHAR: + return Schema.OPTIONAL_INT8_SCHEMA; + case SHORT: + return Schema.OPTIONAL_INT16_SCHEMA; + case INT: + return Schema.OPTIONAL_INT32_SCHEMA; + case DATE: + case TIMESTAMP: + case TIMESTAMP_INSTANT: + case LONG: + return Schema.OPTIONAL_INT64_SCHEMA; + case FLOAT: + return Schema.OPTIONAL_FLOAT32_SCHEMA; + case DECIMAL: + case DOUBLE: + return Schema.OPTIONAL_FLOAT64_SCHEMA; + case VARCHAR: + case STRING: + return Schema.OPTIONAL_STRING_SCHEMA; + case BINARY: + return Schema.OPTIONAL_BYTES_SCHEMA; + case LIST: + Schema arraySchema = typeDescription.getChildren().stream() + .findFirst().map(this::extractSchema) + .orElse(SchemaBuilder.struct().build()); + return SchemaBuilder.array(arraySchema).build(); + case UNION: + // union data types are mapped as structs with a faked key. + SchemaBuilder mapBuilder = SchemaBuilder.struct(); + IntStream.range(0, typeDescription.getChildren().size()) + .forEach(index -> mapBuilder.field( + "field" + (index + 1), + extractSchema(typeDescription.getChildren().get(index)) + )); + return mapBuilder.build(); + case STRUCT: + SchemaBuilder structBuilder = SchemaBuilder.struct(); + IntStream.range(0, typeDescription.getChildren().size()) + .forEach(index -> + structBuilder.field(typeDescription.getFieldNames().get(index), + extractSchema(typeDescription.getChildren().get(index)))); + return structBuilder.build(); + case MAP: + return SchemaBuilder.map(extractSchema(typeDescription.getChildren().get(0)), + extractSchema(typeDescription.getChildren().get(1))) + .build(); + default: + throw new ConnectException("Data type '" + typeDescription.getCategory() + "' in ORC file " + + "is not supported."); + } + } + + @Override + public boolean hasNextRecord() throws IOException { + if (vectorIndex >= batch.size) { + vectorIndex = 0; + reader.nextBatch(batch); + } + return batch.size > 0; + } + + @Override + protected OrcRecord nextRecord() { + incrementOffset(); + return new OrcRecord(schema, batch, vectorIndex++); + } + + @Override + public void seekFile(long offset) throws IOException { + reader.seekToRow(Math.min(offset, numberOfRows - 1)); + if (offset >= numberOfRows) { + reader.nextBatch(batch); + } + setOffset(offset); + vectorIndex = Integer.MAX_VALUE; + } + + @Override + public void close() throws IOException { + closed = true; + reader.close(); + } + + @Override + public boolean isClosed() { + return closed; + } + + static class OrcToStruct implements ReaderAdapter { + + @Override + public Struct apply(OrcRecord record) { + return toStruct(record.schema, record.batch, record.vectorIndex); + } + + private Struct toStruct(Schema 
schema, VectorizedRowBatch batch, int vectorIndex) { + Struct struct = new Struct(schema); + IntStream.range(0, schema.fields().size()) + .forEach(index -> { + Field field = schema.fields().get(index); + struct.put(field.name(), mapValue(field.schema(), batch.cols[index], vectorIndex)); + }); + return struct; + } + + private Object mapValue(Schema schema, ColumnVector value, long vectorIndex) { + switch (value.getClass().getSimpleName()) { + case "BytesColumnVector": + BytesColumnVector bytes = ((BytesColumnVector) value); + if (bytes.vector[(int) vectorIndex] == null) { + return null; + } + String content = new String( + bytes.vector[(int) vectorIndex], + bytes.start[(int) vectorIndex], + bytes.length[(int) vectorIndex] + ); + switch (schema.type()) { + case INT8: + return (byte) content.charAt(0); + case BYTES: + return content.getBytes(); + default: + return content; + } + case "DecimalColumnVector": + return ((DecimalColumnVector) value).vector[(int) vectorIndex].doubleValue(); + case "DoubleColumnVector": + if (schema.type() == Schema.Type.FLOAT32) { + return (float) ((DoubleColumnVector) value).vector[(int) vectorIndex]; + } + return ((DoubleColumnVector) value).vector[(int) vectorIndex]; + case "IntervalDayTimeColumnVector": + return ((IntervalDayTimeColumnVector) value).getNanos(0); + case "Decimal64ColumnVector": + case "DateColumnVector": + case "LongColumnVector": + long castedValue = ((LongColumnVector) value).vector[(int) vectorIndex]; + switch (schema.type()) { + case BOOLEAN: + return castedValue != 0; + case INT8: + return (byte) castedValue; + case INT16: + return (short) castedValue; + case INT32: + return (int) castedValue; + default: + return castedValue; + } + case "ListColumnVector": + ListColumnVector list = ((ListColumnVector) value); + return LongStream.range(0, list.lengths[(int) vectorIndex]) + .mapToObj(index -> mapValue(schema.valueSchema(), list.child, list.offsets[(int) vectorIndex])) + .collect(Collectors.toList()); + case "MapColumnVector": + MapColumnVector map = ((MapColumnVector) value); + return Collections.singletonMap( + mapValue(schema.keySchema(), map.keys, map.offsets[(int) vectorIndex]), + mapValue(schema.valueSchema(), map.values, map.offsets[(int) vectorIndex]) + ); + case "UnionColumnVector": + case "StructColumnVector": + ColumnVector[] fields; + if (value instanceof StructColumnVector) { + fields = ((StructColumnVector) value).fields; + } else { + fields = ((UnionColumnVector) value).fields; + } + Struct struct = new Struct(schema); + IntStream.range(0, fields.length) + .forEach(index -> { + String structKey = schema.fields().get(index).name(); + Object structValue = mapValue(schema.fields().get(index).schema(), + fields[index], vectorIndex); + struct.put(structKey, structValue); + }); + return struct; + case "TimestampColumnVector": + return ((TimestampColumnVector) value).time[(int) vectorIndex]; + case "VoidColumnVector": + default: + return null; + } + } + } + + static class OrcRecord { + + private final Schema schema; + private final VectorizedRowBatch batch; + private final int vectorIndex; + + OrcRecord(Schema schema, VectorizedRowBatch batch, int vectorIndex) { + this.schema = schema; + this.batch = batch; + this.vectorIndex = vectorIndex; + } + + } +} diff --git a/src/main/java/com/github/mmolimar/kafka/connect/fs/file/reader/SequenceFileReader.java b/src/main/java/com/github/mmolimar/kafka/connect/fs/file/reader/SequenceFileReader.java index 3740db9..ac71056 100644 --- 
a/src/main/java/com/github/mmolimar/kafka/connect/fs/file/reader/SequenceFileReader.java +++ b/src/main/java/com/github/mmolimar/kafka/connect/fs/file/reader/SequenceFileReader.java @@ -31,7 +31,6 @@ public class SequenceFileReader extends AbstractFileReader conf .field(keyFieldName, getSchema(this.key)) .field(valueFieldName, getSchema(this.value)) .build(); - this.recordIndex = this.hasNextIndex = -1; this.hasNext = false; this.closed = false; } @@ -82,9 +80,7 @@ Schema getSchema(Writable writable) { @Override public boolean hasNextRecord() throws IOException { try { - if (hasNextIndex == -1 || hasNextIndex == recordIndex) { - hasNextIndex++; - incrementOffset(); + if (!hasNext) { hasNext = reader.next(key, value); } return hasNext; @@ -95,16 +91,24 @@ public boolean hasNextRecord() throws IOException { @Override protected SequenceRecord nextRecord() { - recordIndex++; + incrementOffset(); + hasNext = false; return new SequenceRecord<>(schema, keyFieldName, key, valueFieldName, value); } @Override public void seekFile(long offset) throws IOException { - reader.sync(offset); - hasNextIndex = recordIndex = offset; - hasNext = false; - setOffset(offset - 1); + if (offset == currentOffset()) { + return; + } else if (offset < currentOffset()) { + reader.sync(0L); + hasNext = false; + } + while (super.hasNext() && offset > currentOffset()) { + super.next(); + hasNext = false; + } + setOffset(offset); } @Override diff --git a/src/main/java/com/github/mmolimar/kafka/connect/fs/file/reader/TextFileReader.java b/src/main/java/com/github/mmolimar/kafka/connect/fs/file/reader/TextFileReader.java index 56f5581..3f27b35 100644 --- a/src/main/java/com/github/mmolimar/kafka/connect/fs/file/reader/TextFileReader.java +++ b/src/main/java/com/github/mmolimar/kafka/connect/fs/file/reader/TextFileReader.java @@ -118,8 +118,7 @@ public void seekFile(long offset) throws IOException { reader.close(); reader = new LineNumberReader(getFileReader(getFs().open(getFilePath()))); } - while (reader.getLineNumber() < offset) { - reader.readLine(); + while (reader.getLineNumber() < offset && reader.readLine() != null) { } setOffset(reader.getLineNumber()); } diff --git a/src/main/java/com/github/mmolimar/kafka/connect/fs/policy/AbstractPolicy.java b/src/main/java/com/github/mmolimar/kafka/connect/fs/policy/AbstractPolicy.java index d250e76..9d23c04 100644 --- a/src/main/java/com/github/mmolimar/kafka/connect/fs/policy/AbstractPolicy.java +++ b/src/main/java/com/github/mmolimar/kafka/connect/fs/policy/AbstractPolicy.java @@ -3,6 +3,7 @@ import com.github.mmolimar.kafka.connect.fs.FsSourceTaskConfig; import com.github.mmolimar.kafka.connect.fs.file.FileMetadata; import com.github.mmolimar.kafka.connect.fs.file.reader.FileReader; +import com.github.mmolimar.kafka.connect.fs.util.Iterators; import com.github.mmolimar.kafka.connect.fs.util.ReflectionUtils; import com.github.mmolimar.kafka.connect.fs.util.TailCall; import org.apache.hadoop.conf.Configuration; @@ -11,9 +12,9 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.errors.IllegalWorkerStateException; -import org.apache.kafka.connect.storage.OffsetStorageReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -22,6 +23,7 @@ import java.time.format.DateTimeFormatter; import java.util.*; import java.util.concurrent.atomic.AtomicLong; +import 
java.util.function.Supplier; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -36,6 +38,8 @@ abstract class AbstractPolicy implements Policy { private final FsSourceTaskConfig conf; private final AtomicLong executions; private final boolean recursive; + private final int batchSize; + private Iterator> partitions; private boolean interrupted; public AbstractPolicy(FsSourceTaskConfig conf) throws IOException { @@ -44,12 +48,16 @@ public AbstractPolicy(FsSourceTaskConfig conf) throws IOException { this.executions = new AtomicLong(0); this.recursive = conf.getBoolean(FsSourceTaskConfig.POLICY_RECURSIVE); this.fileRegexp = Pattern.compile(conf.getString(FsSourceTaskConfig.POLICY_REGEXP)); + this.batchSize = conf.getInt(FsSourceTaskConfig.POLICY_BATCH_SIZE); this.interrupted = false; + this.partitions = Collections.emptyIterator(); Map customConfigs = customConfigs(); logAll(customConfigs); configFs(customConfigs); configPolicy(customConfigs); + + log.info("{} Initialized policy with batch size [{}].", this, batchSize); } private Map customConfigs() { @@ -61,6 +69,7 @@ private Map customConfigs() { private void configFs(Map customConfigs) throws IOException { for (String uri : this.conf.getFsUris()) { Configuration fsConfig = new Configuration(); + fsConfig.set("fs.sftp.impl", "org.apache.hadoop.fs.sftp.SFTPFileSystem"); customConfigs.entrySet().stream() .filter(entry -> entry.getKey().startsWith(FsSourceTaskConfig.POLICY_PREFIX_FS)) .forEach(entry -> fsConfig.set(entry.getKey().replace(FsSourceTaskConfig.POLICY_PREFIX_FS, ""), @@ -96,7 +105,9 @@ private String convert(String uri) { @Override public List getURIs() { List uris = new ArrayList<>(); - fileSystems.forEach(fs -> uris.add(fs.getWorkingDirectory().toString())); + fileSystems.forEach(fs -> + uris.add(Optional.ofNullable(fs.getWorkingDirectory()).orElse(new Path("./")).toString()) + ); return uris; } @@ -105,6 +116,10 @@ public final Iterator execute() throws IOException { if (hasEnded()) { throw new IllegalWorkerStateException("Policy has ended. Cannot be retried."); } + if (partitions.hasNext()) { + return partitions.next(); + } + preCheck(); executions.incrementAndGet(); @@ -115,7 +130,8 @@ public final Iterator execute() throws IOException { postCheck(); - return files; + partitions = Iterators.partition(files, batchSize); + return partitions.hasNext() ? 
partitions.next() : Collections.emptyIterator(); } @Override @@ -143,8 +159,7 @@ private TailCall hasNextRec() { current = it.next(); return this::hasNextRec; } - if (current.isFile() & - fileRegexp.matcher(current.getPath().getName()).find()) { + if (current.isFile() && fileRegexp.matcher(current.getPath().getName()).find()) { return TailCall.done(true); } current = null; @@ -173,7 +188,10 @@ public FileMetadata next() { @Override public final boolean hasEnded() { - return interrupted || isPolicyCompleted(); + if (interrupted) { + return true; + } + return !partitions.hasNext() && isPolicyCompleted(); } protected abstract boolean isPolicyCompleted(); @@ -183,7 +201,6 @@ public final long getExecutions() { } FileMetadata toMetadata(LocatedFileStatus fileStatus) { - List blocks = Arrays.stream(fileStatus.getBlockLocations()) .map(block -> new FileMetadata.BlockInfo(block.getOffset(), block.getLength(), block.isCorrupt())) .collect(Collectors.toList()); @@ -192,29 +209,52 @@ FileMetadata toMetadata(LocatedFileStatus fileStatus) { } @Override - public FileReader offer(FileMetadata metadata, OffsetStorageReader offsetStorageReader) { + public FileReader offer(FileMetadata metadata, Map offsetMap) { FileSystem current = fileSystems.stream() .filter(fs -> metadata.getPath().startsWith(fs.getWorkingDirectory().toString())) .findFirst() - .orElse(null); + .orElse(fileSystems.stream().findFirst().orElseThrow(() -> new ConnectException(("Invalid FS.")))); + + Supplier makeReader = () -> ReflectionUtils.makeReader( + (Class) conf.getClass(FsSourceTaskConfig.FILE_READER_CLASS), + current, new Path(metadata.getPath()), conf.originals() + ); try { - FileReader reader = ReflectionUtils.makeReader( - (Class) conf.getClass(FsSourceTaskConfig.FILE_READER_CLASS), - current, new Path(metadata.getPath()), conf.originals()); - Map partition = Collections.singletonMap("path", metadata.getPath()); - Map offset = offsetStorageReader.offset(partition); - if (offset != null && offset.get("offset") != null) { - log.info("Seeking to offset [{}] for file [{}].", offset.get("offset"), metadata.getPath()); - reader.seek((Long) offset.get("offset")); - } - return reader; + return Optional.ofNullable(offsetMap.get("offset")) + .map(offset -> Long.parseLong(offset.toString())) + .filter(offset -> offset > 0) + .map(offset -> { + long fileSize = Long.parseLong(offsetMap.getOrDefault("file-size", "0").toString()); + boolean eof = Boolean.parseBoolean(offsetMap.getOrDefault("eof", "false").toString()); + if (metadata.getLen() == fileSize && eof) { + log.info("{} Skipping file [{}] due to it was already processed.", this, metadata.getPath()); + return emptyFileReader(new Path(metadata.getPath())); + } else { + log.info("{} Seeking to offset [{}] for file [{}].", + this, offsetMap.get("offset"), metadata.getPath()); + FileReader reader = makeReader.get(); + reader.seek(offset); + return reader; + } + }).orElseGet(makeReader); } catch (Exception e) { throw new ConnectException("An error has occurred when creating reader for file: " + metadata.getPath(), e); } } - private Iterator concat(final Iterator it1, - final Iterator it2) { + @Override + public void close() throws IOException { + for (FileSystem fs : fileSystems) { + fs.close(); + } + } + + @Override + public String toString() { + return this.getClass().getSimpleName(); + } + + private Iterator concat(final Iterator it1, final Iterator it2) { return new Iterator() { @Override @@ -229,11 +269,36 @@ public FileMetadata next() { }; } - @Override - public void close() throws 
IOException { - for (FileSystem fs : fileSystems) { - fs.close(); - } + private FileReader emptyFileReader(Path filePath) { + return new FileReader() { + @Override + public Path getFilePath() { + return filePath; + } + + @Override + public Struct next() { + throw new NoSuchElementException(); + } + + @Override + public void seek(long offset) { + } + + @Override + public long currentOffset() { + return 0; + } + + @Override + public void close() { + } + + @Override + public boolean hasNext() { + return false; + } + }; } private void logAll(Map conf) { diff --git a/src/main/java/com/github/mmolimar/kafka/connect/fs/policy/HdfsFileWatcherPolicy.java b/src/main/java/com/github/mmolimar/kafka/connect/fs/policy/HdfsFileWatcherPolicy.java index 8d2f0d6..b89923c 100644 --- a/src/main/java/com/github/mmolimar/kafka/connect/fs/policy/HdfsFileWatcherPolicy.java +++ b/src/main/java/com/github/mmolimar/kafka/connect/fs/policy/HdfsFileWatcherPolicy.java @@ -64,7 +64,7 @@ protected void configPolicy(Map customConfigs) { "number (long). Got: " + customConfigs.get(HDFS_FILE_WATCHER_POLICY_RETRY_MS)); } this.fsEvenStream = new HashMap<>(); - fileSystems.stream() + this.fileSystems.stream() .filter(fs -> fs.getWorkingDirectory().toString().startsWith(URI_PREFIX)) .forEach(fs -> { try { @@ -89,7 +89,9 @@ public Iterator listFiles(FileSystem fs) { Set files = new HashSet<>(); FileMetadata metadata; while ((metadata = fileQueue.poll()) != null) { - files.add(metadata); + FileMetadata fm = metadata; + files.removeIf(f -> f.getPath().equals(fm.getPath())); + files.add(fm); } return files.iterator(); } @@ -116,6 +118,11 @@ public void close() throws IOException { super.close(); } + @Override + public String toString() { + return this.getClass().getSimpleName(); + } + private class EventStreamThread extends Thread { private final FileSystem fs; private final HdfsAdmin admin; @@ -139,62 +146,75 @@ public void run() { EventBatch batch = eventStream.poll(); if (batch == null) continue; + Set files = new HashSet<>(); for (Event event : batch.getEvents()) { switch (event.getEventType()) { case CREATE: if (!((Event.CreateEvent) event).getPath().endsWith("._COPYING_")) { - enqueue(((Event.CreateEvent) event).getPath()); + files.add(((Event.CreateEvent) event).getPath()); } break; case APPEND: if (!((Event.AppendEvent) event).getPath().endsWith("._COPYING_")) { - enqueue(((Event.AppendEvent) event).getPath()); + files.add(((Event.AppendEvent) event).getPath()); } break; case RENAME: if (((Event.RenameEvent) event).getSrcPath().endsWith("._COPYING_")) { - enqueue(((Event.RenameEvent) event).getDstPath()); + files.add(((Event.RenameEvent) event).getDstPath()); } break; case CLOSE: if (!((Event.CloseEvent) event).getPath().endsWith("._COPYING_")) { - enqueue(((Event.CloseEvent) event).getPath()); + files.add(((Event.CloseEvent) event).getPath()); } break; default: break; } } + enqueue(files); } } catch (IOException ioe) { if (retrySleepMs > 0) { time.sleep(retrySleepMs); } else { - log.warn("Error watching path [{}]. Stopping it...", fs.getWorkingDirectory(), ioe); + log.warn("{} Error watching path [{}]: {}. 
Stopping it...", + this, fs.getWorkingDirectory(), ioe.getMessage(), ioe); throw new IllegalWorkerStateException(ioe); } } catch (Exception e) { - log.warn("Stopping watcher due to an unexpected exception when watching path [{}].", - fs.getWorkingDirectory(), e); + log.warn("{} Stopping watcher due to an unexpected exception when watching path [{}]: {}", + this, fs.getWorkingDirectory(), e.getMessage(), e); throw new IllegalWorkerStateException(e); } } } - private void enqueue(String path) throws IOException { - Path filePath = new Path(path); - if (!fs.exists(filePath) || fs.getFileStatus(filePath) == null) { - log.info("Cannot enqueue file [{}] because it does not exist but got an event from the FS", filePath); - return; - } + private void enqueue(Set paths) throws IOException { + for (String path : paths) { + Path filePath = new Path(path); + if (!fs.exists(filePath) || fs.getFileStatus(filePath) == null) { + log.info("{} Cannot enqueue file [{}] because it does not exist but got an event from the FS", + this, filePath); + return; + } - log.debug("Enqueuing file to process [{}]", filePath); - RemoteIterator it = fs.listFiles(filePath, false); - while (it.hasNext()) { - LocatedFileStatus status = it.next(); - if (!status.isFile() || !fileRegexp.matcher(status.getPath().getName()).find()) continue; - fileQueue.offer(toMetadata(status)); + RemoteIterator it = fs.listFiles(filePath, false); + while (it.hasNext()) { + LocatedFileStatus status = it.next(); + if (status.isFile() && fileRegexp.matcher(status.getPath().getName()).find()) { + FileMetadata metadata = toMetadata(status); + log.debug("{} Enqueuing file to process [{}].", this, metadata.getPath()); + fileQueue.offer(metadata); + } + } } } + + @Override + public String toString() { + return this.getClass().getSimpleName(); + } } } diff --git a/src/main/java/com/github/mmolimar/kafka/connect/fs/policy/Policy.java b/src/main/java/com/github/mmolimar/kafka/connect/fs/policy/Policy.java index 370288f..c53e2c0 100644 --- a/src/main/java/com/github/mmolimar/kafka/connect/fs/policy/Policy.java +++ b/src/main/java/com/github/mmolimar/kafka/connect/fs/policy/Policy.java @@ -8,12 +8,13 @@ import java.io.IOException; import java.util.Iterator; import java.util.List; +import java.util.Map; public interface Policy extends Closeable { Iterator execute() throws IOException; - FileReader offer(FileMetadata metadata, OffsetStorageReader offsetStorageReader) throws IOException; + FileReader offer(FileMetadata metadata, Map offset) throws IOException; boolean hasEnded(); diff --git a/src/main/java/com/github/mmolimar/kafka/connect/fs/policy/SleepyPolicy.java b/src/main/java/com/github/mmolimar/kafka/connect/fs/policy/SleepyPolicy.java index 2a02884..32c6e12 100644 --- a/src/main/java/com/github/mmolimar/kafka/connect/fs/policy/SleepyPolicy.java +++ b/src/main/java/com/github/mmolimar/kafka/connect/fs/policy/SleepyPolicy.java @@ -33,28 +33,22 @@ protected void configPolicy(Map customConfigs) { try { this.sleep = Long.parseLong((String) customConfigs.get(SLEEPY_POLICY_SLEEP_MS)); } catch (NumberFormatException nfe) { - throw new ConfigException(SLEEPY_POLICY_SLEEP_MS + " property is required and must be a number(long). Got: " + + throw new ConfigException(SLEEPY_POLICY_SLEEP_MS + " property is required and must be a number (long). 
Got: " + customConfigs.get(SLEEPY_POLICY_SLEEP_MS)); } - if (customConfigs.get(SLEEPY_POLICY_MAX_EXECS) != null) { - try { - this.maxExecs = Long.parseLong((String) customConfigs.get(SLEEPY_POLICY_MAX_EXECS)); - } catch (NumberFormatException nfe) { - throw new ConfigException(SLEEPY_POLICY_MAX_EXECS + " property must be a number(long). Got: " + - customConfigs.get(SLEEPY_POLICY_MAX_EXECS)); - } - } else { - this.maxExecs = DEFAULT_MAX_EXECS; + try { + this.maxExecs = Long.parseLong((String) customConfigs.getOrDefault(SLEEPY_POLICY_MAX_EXECS, + String.valueOf(DEFAULT_MAX_EXECS))); + } catch (NumberFormatException nfe) { + throw new ConfigException(SLEEPY_POLICY_MAX_EXECS + " property must be a number (long). Got: " + + customConfigs.get(SLEEPY_POLICY_MAX_EXECS)); } - if (customConfigs.get(SLEEPY_POLICY_SLEEP_FRACTION) != null) { - try { - this.sleepFraction = Long.parseLong((String) customConfigs.get(SLEEPY_POLICY_SLEEP_FRACTION)); - } catch (NumberFormatException nfe) { - throw new ConfigException(SLEEPY_POLICY_SLEEP_FRACTION + " property must be a number(long). Got: " + - customConfigs.get(SLEEPY_POLICY_SLEEP_FRACTION)); - } - } else { - this.sleepFraction = DEFAULT_SLEEP_FRACTION; + try { + this.sleepFraction = Long.parseLong((String) customConfigs.getOrDefault(SLEEPY_POLICY_SLEEP_FRACTION, + String.valueOf(DEFAULT_SLEEP_FRACTION))); + } catch (NumberFormatException nfe) { + throw new ConfigException(SLEEPY_POLICY_SLEEP_FRACTION + " property must be a number (long). Got: " + + customConfigs.get(SLEEPY_POLICY_SLEEP_FRACTION)); } } @@ -71,7 +65,7 @@ private void sleepIfApply() { Thread.sleep(sleep / sleepFraction); counter++; } catch (InterruptedException ie) { - log.warn("An interrupted exception has occurred.", ie); + log.warn("{} An interrupted exception has occurred when sleeping: {}", this, ie.getMessage(), ie); } } } diff --git a/src/main/java/com/github/mmolimar/kafka/connect/fs/util/Iterators.java b/src/main/java/com/github/mmolimar/kafka/connect/fs/util/Iterators.java new file mode 100644 index 0000000..3cbf32d --- /dev/null +++ b/src/main/java/com/github/mmolimar/kafka/connect/fs/util/Iterators.java @@ -0,0 +1,31 @@ +package com.github.mmolimar.kafka.connect.fs.util; + +import java.util.*; + +public class Iterators { + + public static Iterator> partition(Iterator it, int size) { + if (size <= 0) { + return Collections.singletonList(it).iterator(); + } + + return new Iterator>() { + @Override + public boolean hasNext() { + return it.hasNext(); + } + + @Override + public Iterator next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + List elements = new ArrayList<>(size); + while (it.hasNext() && elements.size() < size) { + elements.add(it.next()); + } + return elements.iterator(); + } + }; + } +} diff --git a/src/main/java/com/github/mmolimar/kafka/connect/fs/util/ReflectionUtils.java b/src/main/java/com/github/mmolimar/kafka/connect/fs/util/ReflectionUtils.java index 04fa75c..9473837 100644 --- a/src/main/java/com/github/mmolimar/kafka/connect/fs/util/ReflectionUtils.java +++ b/src/main/java/com/github/mmolimar/kafka/connect/fs/util/ReflectionUtils.java @@ -27,8 +27,8 @@ public static Policy makePolicy(Class clazz, FsSourceTaskConfi private static T make(Class clazz, Object... 
args) { try { Class[] constClasses = Arrays.stream(args).map(Object::getClass).toArray(Class[]::new); - Constructor constructor = ConstructorUtils.getMatchingAccessibleConstructor(clazz, constClasses); + return constructor.newInstance(args); } catch (IllegalAccessException | InstantiationException | diff --git a/src/main/java/com/github/mmolimar/kafka/connect/fs/util/Version.java b/src/main/java/com/github/mmolimar/kafka/connect/fs/util/Version.java index 7e94e04..1807428 100644 --- a/src/main/java/com/github/mmolimar/kafka/connect/fs/util/Version.java +++ b/src/main/java/com/github/mmolimar/kafka/connect/fs/util/Version.java @@ -15,7 +15,7 @@ public class Version { props.load(Version.class.getResourceAsStream("/kafka-connect-fs-version.properties")); version = props.getProperty("version", version).trim(); } catch (Exception e) { - log.warn("Error while loading version:", e); + log.warn("Error while loading version: {}", e.getMessage(), e); } } diff --git a/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/AgnosticFileReaderTest.java b/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/AgnosticFileReaderTest.java index ab44e27..3c01e86 100644 --- a/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/AgnosticFileReaderTest.java +++ b/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/AgnosticFileReaderTest.java @@ -181,4 +181,25 @@ public String getFileExtension() { } } + @Nested + class AgnosticOrcFileReaderTest extends OrcFileReaderTest { + + @Override + protected Map getReaderConfig() { + Map config = super.getReaderConfig(); + config.put(AgnosticFileReader.FILE_READER_AGNOSTIC_EXTENSIONS_ORC, getFileExtension()); + return config; + } + + @Override + public Class getReaderClass() { + return AgnosticFileReader.class; + } + + @Override + public String getFileExtension() { + return FILE_EXTENSION; + } + } + } diff --git a/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/AvroFileReaderTest.java b/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/AvroFileReaderTest.java index 5e9d59e..8c04de7 100644 --- a/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/AvroFileReaderTest.java +++ b/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/AvroFileReaderTest.java @@ -55,7 +55,7 @@ protected Path createDataFile(ReaderFsTestConfig fsConfig, Object... 
args) throw datum.put(FIELD_NAME, String.format("%d_name_%s", index, UUID.randomUUID())); datum.put(FIELD_SURNAME, String.format("%d_surname_%s", index, UUID.randomUUID())); try { - fsConfig.offsetsByIndex().put(index, dataFileWriter.sync() - 16L); + fsConfig.offsetsByIndex().put(index, (long) index); dataFileWriter.append(datum); } catch (IOException ioe) { throw new RuntimeException(ioe); @@ -123,7 +123,7 @@ protected Map getReaderConfig() { @Override protected void checkData(Struct record, long index) { assertAll( - () -> assertEquals((int) (Integer) record.get(FIELD_INDEX), index), + () -> assertEquals(index, (int) record.get(FIELD_INDEX)), () -> assertTrue(record.get(FIELD_NAME).toString().startsWith(index + "_")), () -> assertTrue(record.get(FIELD_SURNAME).toString().startsWith(index + "_")) ); diff --git a/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/FileReaderTestBase.java b/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/FileReaderTestBase.java index f21cf49..7b6e725 100644 --- a/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/FileReaderTestBase.java +++ b/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/FileReaderTestBase.java @@ -1,5 +1,6 @@ package com.github.mmolimar.kafka.connect.fs.file.reader; +import com.github.mmolimar.kafka.connect.fs.FsSourceTaskConfig; import com.github.mmolimar.kafka.connect.fs.util.ReflectionUtils; import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream; import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream; @@ -60,7 +61,7 @@ public void closeReader() { try { fsConfig.getReader().close(); } catch (Exception e) { - //ignoring + // ignoring } } } @@ -146,10 +147,33 @@ public void readAllData(ReaderFsTestConfig fsConfig) { assertEquals(NUM_RECORDS, recordCount, "The number of records in the file does not match"); } + @ParameterizedTest + @MethodSource("fileSystemConfigProvider") + public void readAllDataInBatches(ReaderFsTestConfig fsConfig) { + Map config = getReaderConfig(); + int batchSize = 5; + config.put(FsSourceTaskConfig.FILE_READER_BATCH_SIZE, batchSize); + AbstractFileReader reader = (AbstractFileReader) getReader(fsConfig.getFs(), fsConfig.getDataFile(), config); + assertTrue(reader.hasNext()); + + int recordCount = 0; + while (reader.hasNextBatch()) { + reader.nextBatch(); + while (reader.hasNext()) { + Struct record = reader.next(); + checkData(record, recordCount); + recordCount++; + } + assertEquals(0, recordCount % batchSize); + } + assertThrows(NoSuchElementException.class, reader::nextBatch); + assertEquals(NUM_RECORDS, recordCount, "The number of records in the file does not match"); + } + @ParameterizedTest @MethodSource("fileSystemConfigProvider") public void seekFile(ReaderFsTestConfig fsConfig) { - FileReader reader = fsConfig.getReader(); + FileReader reader = getReader(fsConfig.getFs(), fsConfig.getDataFile(), getReaderConfig()); int recordIndex = NUM_RECORDS / 2; reader.seek(fsConfig.offsetsByIndex().get(recordIndex)); assertTrue(reader.hasNext()); diff --git a/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/JsonFileReaderTest.java b/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/JsonFileReaderTest.java index 98e7e5b..f7f6da0 100644 --- a/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/JsonFileReaderTest.java +++ b/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/JsonFileReaderTest.java @@ -4,6 +4,7 @@ import 
com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectWriter; +import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.JsonNodeFactory; import com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.hadoop.fs.Path; @@ -15,11 +16,9 @@ import java.io.File; import java.io.IOException; import java.io.PrintWriter; +import java.math.BigInteger; import java.nio.charset.UnsupportedCharsetException; -import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; -import java.util.UUID; +import java.util.*; import java.util.stream.IntStream; import static org.junit.jupiter.api.Assertions.*; @@ -27,11 +26,14 @@ public class JsonFileReaderTest extends FileReaderTestBase { private static final String FIELD_INTEGER = "integerField"; + private static final String FIELD_BIG_INTEGER = "bigIntegerField"; private static final String FIELD_LONG = "longField"; private static final String FIELD_BOOLEAN = "booleanField"; private static final String FIELD_STRING = "stringField"; private static final String FIELD_DECIMAL = "decimalField"; - private static final String FIELD_ARRAY = "arrayField"; + private static final String FIELD_BINARY = "binaryField"; + private static final String FIELD_ARRAY_SIMPLE = "arraySimpleField"; + private static final String FIELD_ARRAY_COMPLEX = "arrayComplexField"; private static final String FIELD_STRUCT = "structField"; private static final String FIELD_NULL = "nullField"; private static final String FILE_EXTENSION = "jsn"; @@ -48,14 +50,31 @@ protected Path createDataFile(ReaderFsTestConfig fsConfig, Object... args) throw IntStream.range(0, numRecords).forEach(index -> { ObjectNode json = JsonNodeFactory.instance.objectNode() .put(FIELD_INTEGER, index) + .put(FIELD_BIG_INTEGER, new BigInteger("9999999999999999999")) .put(FIELD_LONG, Long.MAX_VALUE) .put(FIELD_STRING, String.format("%d_%s", index, UUID.randomUUID())) .put(FIELD_BOOLEAN, true) .put(FIELD_DECIMAL, Double.parseDouble(index + "." + index)) + .put(FIELD_BINARY, "test".getBytes()) .put(FIELD_NULL, (String) null); - json.putArray(FIELD_ARRAY) + json.putArray(FIELD_ARRAY_SIMPLE) .add("elm[" + index + "]") .add("elm[" + (index + 1) + "]"); + ArrayNode array = json.putArray(FIELD_ARRAY_COMPLEX); + array.addObject() + .put(FIELD_INTEGER, index) + .put(FIELD_LONG, Long.MAX_VALUE) + .put(FIELD_STRING, String.format("%d_%s", index, UUID.randomUUID())) + .put(FIELD_BOOLEAN, true) + .put(FIELD_DECIMAL, Double.parseDouble(index + "." + index)) + .put(FIELD_NULL, (String) null); + array.addObject() + .put(FIELD_INTEGER, index + 1) + .put(FIELD_LONG, Long.MAX_VALUE) + .put(FIELD_STRING, String.format("%d_%s", index, UUID.randomUUID())) + .put(FIELD_BOOLEAN, true) + .put(FIELD_DECIMAL, Double.parseDouble(index + "." 
+ index)) + .put(FIELD_NULL, (String) null); json.putObject(FIELD_STRUCT) .put(FIELD_INTEGER, (short) index) .put(FIELD_LONG, Long.MAX_VALUE) @@ -181,21 +200,40 @@ protected Map getReaderConfig() { @Override protected void checkData(Struct record, long index) { + List array = record.getArray(FIELD_ARRAY_COMPLEX); Struct subrecord = record.getStruct(FIELD_STRUCT); assertAll( - () -> assertEquals((int) (Integer) record.get(FIELD_INTEGER), index), - () -> assertEquals((long) (Long) record.get(FIELD_LONG), Long.MAX_VALUE), + () -> assertEquals(index, (int) record.get(FIELD_INTEGER)), + () -> assertEquals(new BigInteger("9999999999999999999").longValue(), record.get(FIELD_BIG_INTEGER)), + () -> assertEquals(Long.MAX_VALUE, (long) record.get(FIELD_LONG)), () -> assertTrue(record.get(FIELD_STRING).toString().startsWith(index + "_")), () -> assertTrue(Boolean.parseBoolean(record.get(FIELD_BOOLEAN).toString())), - () -> assertEquals((Double) record.get(FIELD_DECIMAL), Double.parseDouble(index + "." + index), 0), + () -> assertEquals(Double.parseDouble(index + "." + index), (Double) record.get(FIELD_DECIMAL), 0), () -> assertNull(record.get(FIELD_NULL)), () -> assertNotNull(record.schema().field(FIELD_NULL)), - () -> assertEquals(record.get(FIELD_ARRAY), Arrays.asList("elm[" + index + "]", "elm[" + (index + 1) + "]")), - () -> assertEquals((int) (Integer) subrecord.get(FIELD_INTEGER), index), - () -> assertEquals((long) (Long) subrecord.get(FIELD_LONG), Long.MAX_VALUE), + () -> assertEquals("dGVzdA==", record.get(FIELD_BINARY)), + () -> assertEquals(Arrays.asList("elm[" + index + "]", "elm[" + (index + 1) + "]"), record.get(FIELD_ARRAY_SIMPLE)), + + () -> assertEquals(index, (int) array.get(0).get(FIELD_INTEGER)), + () -> assertEquals(Long.MAX_VALUE, (long) array.get(0).get(FIELD_LONG)), + () -> assertTrue(array.get(0).get(FIELD_STRING).toString().startsWith(index + "_")), + () -> assertTrue(Boolean.parseBoolean(array.get(0).get(FIELD_BOOLEAN).toString())), + () -> assertEquals(Double.parseDouble(index + "." + index), (Double) array.get(0).get(FIELD_DECIMAL), 0), + () -> assertNull(array.get(0).get(FIELD_NULL)), + () -> assertNotNull(array.get(0).schema().field(FIELD_NULL)), + () -> assertEquals(index + 1, (int) array.get(1).get(FIELD_INTEGER)), + () -> assertEquals(Long.MAX_VALUE, (long) array.get(1).get(FIELD_LONG)), + () -> assertTrue(array.get(1).get(FIELD_STRING).toString().startsWith(index + "_")), + () -> assertTrue(Boolean.parseBoolean(array.get(1).get(FIELD_BOOLEAN).toString())), + () -> assertEquals(Double.parseDouble(index + "." + index), (Double) array.get(1).get(FIELD_DECIMAL), 0), + () -> assertNull(array.get(1).get(FIELD_NULL)), + () -> assertNotNull(array.get(1).schema().field(FIELD_NULL)), + + () -> assertEquals(index, (int) subrecord.get(FIELD_INTEGER)), + () -> assertEquals(Long.MAX_VALUE, (long) subrecord.get(FIELD_LONG)), () -> assertTrue(subrecord.get(FIELD_STRING).toString().startsWith(index + "_")), () -> assertTrue(Boolean.parseBoolean(subrecord.get(FIELD_BOOLEAN).toString())), - () -> assertEquals((Double) subrecord.get(FIELD_DECIMAL), Double.parseDouble(index + "." + index), 0), + () -> assertEquals(Double.parseDouble(index + "." 
+ index), (Double) subrecord.get(FIELD_DECIMAL), 0), () -> assertNull(subrecord.get(FIELD_NULL)), () -> assertNotNull(subrecord.schema().field(FIELD_NULL)) ); diff --git a/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/OrcFileReaderTest.java b/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/OrcFileReaderTest.java new file mode 100644 index 0000000..a34a7de --- /dev/null +++ b/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/OrcFileReaderTest.java @@ -0,0 +1,268 @@ +package com.github.mmolimar.kafka.connect.fs.file.reader; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.ql.exec.vector.*; +import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; +import org.apache.kafka.connect.data.Struct; +import org.apache.orc.OrcConf; +import org.apache.orc.OrcFile; +import org.apache.orc.TypeDescription; +import org.apache.orc.Writer; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; + +import java.io.File; +import java.io.IOException; +import java.util.*; + +import static org.junit.jupiter.api.Assertions.*; + +public class OrcFileReaderTest extends FileReaderTestBase { + + private static final String FIELD_INTEGER = "integerField"; + private static final String FIELD_LONG = "longField"; + private static final String FIELD_BOOLEAN = "booleanField"; + private static final String FIELD_STRING = "stringField"; + private static final String FIELD_DECIMAL = "decimalField"; + private static final String FIELD_ARRAY = "arrayField"; + private static final String FIELD_MAP = "mapField"; + private static final String FIELD_STRUCT = "structField"; + private static final String FIELD_UNION = "unionField"; + private static final String FIELD_EMPTY = "emptyField"; + private static final String FIELD_BYTE = "byteField"; + private static final String FIELD_CHAR = "charField"; + private static final String FIELD_SHORT = "shortField"; + private static final String FIELD_FLOAT = "floatField"; + private static final String FIELD_BINARY = "binaryField"; + private static final String FIELD_DATE = "dateField"; + private static final String FIELD_TIMESTAMP = "timestampField"; + private static final String FIELD_TIMESTAMP_INSTANT = "timestampInstantField"; + private static final String FILE_EXTENSION = "rc"; + + @Override + protected Path createDataFile(ReaderFsTestConfig fsConfig, Object... args) throws IOException { + int numRecords = args.length < 1 ? NUM_RECORDS : (int) args[0]; + File orcFile = File.createTempFile("test-", "." 
+ getFileExtension()); + + TypeDescription schema = TypeDescription.createStruct(); + schema.addField(FIELD_INTEGER, TypeDescription.createInt()); + schema.addField(FIELD_LONG, TypeDescription.createLong()); + schema.addField(FIELD_STRING, TypeDescription.createString()); + schema.addField(FIELD_BOOLEAN, TypeDescription.createBoolean()); + schema.addField(FIELD_DECIMAL, TypeDescription.createDecimal()); + schema.addField(FIELD_EMPTY, TypeDescription.createVarchar()); + schema.addField(FIELD_BYTE, TypeDescription.createByte()); + schema.addField(FIELD_CHAR, TypeDescription.createChar()); + schema.addField(FIELD_SHORT, TypeDescription.createShort()); + schema.addField(FIELD_FLOAT, TypeDescription.createFloat()); + schema.addField(FIELD_BINARY, TypeDescription.createBinary()); + schema.addField(FIELD_DATE, TypeDescription.createDate()); + schema.addField(FIELD_TIMESTAMP, TypeDescription.createTimestamp()); + schema.addField(FIELD_TIMESTAMP_INSTANT, TypeDescription.createTimestampInstant()); + schema.addField(FIELD_ARRAY, TypeDescription.createList(TypeDescription.createString())); + schema.addField(FIELD_MAP, TypeDescription.createMap(TypeDescription.createString(), TypeDescription.createString())); + TypeDescription struct = TypeDescription.createStruct(); + struct.addField(FIELD_INTEGER, TypeDescription.createInt()); + struct.addField(FIELD_LONG, TypeDescription.createLong()); + struct.addField(FIELD_STRING, TypeDescription.createString()); + struct.addField(FIELD_BOOLEAN, TypeDescription.createBoolean()); + struct.addField(FIELD_DECIMAL, TypeDescription.createDecimal()); + struct.addField(FIELD_EMPTY, TypeDescription.createVarchar()); + schema.addField(FIELD_STRUCT, struct); + TypeDescription union = TypeDescription.createUnion(); + union.addUnionChild(TypeDescription.createInt()); + schema.addField(FIELD_UNION, union); + + Properties props = new Properties(); + props.put(OrcConf.OVERWRITE_OUTPUT_FILE.getAttribute(), "true"); + OrcFile.WriterOptions opts = OrcFile.writerOptions(props, fsConfig.getFs().getConf()).setSchema(schema); + try (Writer writer = OrcFile.createWriter(new Path(orcFile.toURI()), opts)) { + VectorizedRowBatch batch = schema.createRowBatch(numRecords); + LongColumnVector f1 = (LongColumnVector) batch.cols[0]; + LongColumnVector f2 = (LongColumnVector) batch.cols[1]; + BytesColumnVector f3 = (BytesColumnVector) batch.cols[2]; + LongColumnVector f4 = (LongColumnVector) batch.cols[3]; + DecimalColumnVector f5 = (DecimalColumnVector) batch.cols[4]; + BytesColumnVector f6 = (BytesColumnVector) batch.cols[5]; + LongColumnVector f7 = (LongColumnVector) batch.cols[6]; + BytesColumnVector f8 = (BytesColumnVector) batch.cols[7]; + LongColumnVector f9 = (LongColumnVector) batch.cols[8]; + DoubleColumnVector f10 = (DoubleColumnVector) batch.cols[9]; + BytesColumnVector f11 = (BytesColumnVector) batch.cols[10]; + DateColumnVector f12 = (DateColumnVector) batch.cols[11]; + TimestampColumnVector f13 = (TimestampColumnVector) batch.cols[12]; + TimestampColumnVector f14 = (TimestampColumnVector) batch.cols[13]; + ListColumnVector f15 = (ListColumnVector) batch.cols[14]; + MapColumnVector f16 = (MapColumnVector) batch.cols[15]; + StructColumnVector f17 = (StructColumnVector) batch.cols[16]; + UnionColumnVector f18 = (UnionColumnVector) batch.cols[17]; + + for (int index = 0; index < numRecords; index++) { + f1.vector[index] = index; + f2.vector[index] = Long.MAX_VALUE; + f3.setVal(index, String.format("%d_%s", index, UUID.randomUUID()).getBytes()); + f4.vector[index] = 1; + 
f5.vector[index] = new HiveDecimalWritable(HiveDecimal.create(Double.parseDouble(index + "." + index))); + f6.setVal(index, new byte[0]); + + f7.vector[index] = index; + f8.setVal(index, new byte[]{(byte) (index % 32)}); + f9.vector[index] = (short) index; + f10.vector[index] = Float.parseFloat(index + "." + index); + f11.setVal(index, String.format("%d_%s", index, UUID.randomUUID()).getBytes()); + f12.vector[index] = Calendar.getInstance().getTimeInMillis(); + f13.time[index] = Calendar.getInstance().getTimeInMillis(); + f14.time[index] = Calendar.getInstance().getTimeInMillis(); + + ((BytesColumnVector) f15.child).setVal(index, ("elm[" + index + "]").getBytes()); + f15.lengths[index] = 1; + f15.offsets[index] = f15.childCount++; + + ((BytesColumnVector) f16.keys).setVal(index, ("key[" + index + "]").getBytes()); + ((BytesColumnVector) f16.values).setVal(index, ("value[" + index + "]").getBytes()); + f16.lengths[index] = 1; + f16.offsets[index] = f16.childCount++; + + ((LongColumnVector) f17.fields[0]).vector[index] = index; + ((LongColumnVector) f17.fields[1]).vector[index] = Long.MAX_VALUE; + ((BytesColumnVector) f17.fields[2]).setVal(index, + String.format("%d_%s", index, UUID.randomUUID()).getBytes()); + ((LongColumnVector) f17.fields[3]).vector[index] = 1; + ((DecimalColumnVector) f17.fields[4]).vector[index] = + new HiveDecimalWritable(HiveDecimal.create(Double.parseDouble(index + "." + index))); + ((BytesColumnVector) f17.fields[5]).setVal(index, new byte[0]); + + ((LongColumnVector) f18.fields[0]).vector[index] = index; + + fsConfig.offsetsByIndex().put(index, (long) index); + } + batch.size = numRecords; + writer.addRowBatch(batch); + } + Path path = new Path(new Path(fsConfig.getFsUri()), orcFile.getName()); + fsConfig.getFs().moveFromLocalFile(new Path(orcFile.getAbsolutePath()), path); + return path; + } + + private Path createDataFileWithoutStruct(ReaderFsTestConfig fsConfig, Object... args) throws IOException { + int numRecords = args.length < 1 ? NUM_RECORDS : (int) args[0]; + File orcFile = File.createTempFile("test-", "." + getFileExtension()); + + TypeDescription schema = TypeDescription.createLong(); + Properties props = new Properties(); + props.put(OrcConf.OVERWRITE_OUTPUT_FILE.getAttribute(), "true"); + OrcFile.WriterOptions opts = OrcFile.writerOptions(props, fsConfig.getFs().getConf()).setSchema(schema); + try (Writer writer = OrcFile.createWriter(new Path(orcFile.toURI()), opts)) { + VectorizedRowBatch batch = schema.createRowBatch(numRecords); + LongColumnVector longField = (LongColumnVector) batch.cols[0]; + + for (int index = 0; index < numRecords; index++) { + longField.vector[index] = index; + + fsConfig.offsetsByIndex().put(index, (long) index); + } + batch.size = numRecords; + writer.addRowBatch(batch); + } + Path path = new Path(new Path(fsConfig.getFsUri()), orcFile.getName()); + fsConfig.getFs().moveFromLocalFile(new Path(orcFile.getAbsolutePath()), path); + return path; + } + + @ParameterizedTest + @MethodSource("fileSystemConfigProvider") + public void emptyFile(ReaderFsTestConfig fsConfig) throws IOException { + File tmp = File.createTempFile("test-", "." 
+ getFileExtension()); + Path path = new Path(new Path(fsConfig.getFsUri()), tmp.getName()); + fsConfig.getFs().moveFromLocalFile(new Path(tmp.getAbsolutePath()), path); + FileReader reader = getReader(fsConfig.getFs(), path, getReaderConfig()); + assertFalse(reader.hasNext()); + } + + @ParameterizedTest + @MethodSource("fileSystemConfigProvider") + public void useZeroCopyConfig(ReaderFsTestConfig fsConfig) { + Map readerConfig = getReaderConfig(); + readerConfig.put(OrcFileReader.FILE_READER_ORC_USE_ZEROCOPY, "true"); + fsConfig.setReader(getReader(fsConfig.getFs(), fsConfig.getDataFile(), readerConfig)); + readAllData(fsConfig); + } + + @ParameterizedTest + @MethodSource("fileSystemConfigProvider") + public void skipCorruptRecordsConfig(ReaderFsTestConfig fsConfig) { + Map readerConfig = getReaderConfig(); + readerConfig.put(OrcFileReader.FILE_READER_ORC_SKIP_CORRUPT_RECORDS, "true"); + fsConfig.setReader(getReader(fsConfig.getFs(), fsConfig.getDataFile(), readerConfig)); + readAllData(fsConfig); + } + + @ParameterizedTest + @MethodSource("fileSystemConfigProvider") + public void readDataWithoutStruct(ReaderFsTestConfig fsConfig) throws IOException { + Path file = createDataFileWithoutStruct(fsConfig, NUM_RECORDS); + FileReader reader = getReader(fsConfig.getFs(), file, getReaderConfig()); + + assertTrue(reader.hasNext()); + + int recordCount = 0; + while (reader.hasNext()) { + Struct record = reader.next(); + checkDataWithoutStruct(record, recordCount); + recordCount++; + } + reader.close(); + assertEquals(NUM_RECORDS, recordCount, "The number of records in the file does not match"); + } + + @Override + protected Class getReaderClass() { + return OrcFileReader.class; + } + + @Override + protected Map getReaderConfig() { + return new HashMap<>(); + } + + @Override + protected void checkData(Struct record, long index) { + Struct struct = record.getStruct(FIELD_STRUCT); + Struct union = record.getStruct(FIELD_UNION); + assertAll( + () -> assertEquals(index, (int) record.get(FIELD_INTEGER)), + () -> assertEquals(Long.MAX_VALUE, (long) record.get(FIELD_LONG)), + () -> assertTrue(record.get(FIELD_STRING).toString().startsWith(index + "_")), + () -> assertTrue(Boolean.parseBoolean(record.get(FIELD_BOOLEAN).toString())), + () -> assertEquals(Double.parseDouble(index + "." + index), (Double) record.get(FIELD_DECIMAL), 0), + () -> assertEquals("", record.get(FIELD_EMPTY)), + () -> assertEquals(index, ((Byte) record.get(FIELD_BYTE)).intValue()), + () -> assertEquals((byte) (index % 32), record.get(FIELD_CHAR)), + () -> assertEquals((short) index, (short) record.get(FIELD_SHORT)), + () -> assertEquals(Float.parseFloat(index + "." + index), record.get(FIELD_FLOAT)), + () -> assertTrue(new String((byte[]) record.get(FIELD_BINARY)).startsWith(index + "_")), + () -> assertNotNull(record.get(FIELD_TIMESTAMP)), + () -> assertNotNull(record.get(FIELD_TIMESTAMP_INSTANT)), + () -> assertEquals(Collections.singletonMap("key[" + index + "]", "value[" + index + "]"), record.get(FIELD_MAP)), + () -> assertEquals(Collections.singletonList("elm[" + index + "]"), record.get(FIELD_ARRAY)), + () -> assertEquals(index, (int) struct.get(FIELD_INTEGER)), + () -> assertEquals(Long.MAX_VALUE, (long) struct.get(FIELD_LONG)), + () -> assertTrue(struct.get(FIELD_STRING).toString().startsWith(index + "_")), + () -> assertTrue(Boolean.parseBoolean(struct.get(FIELD_BOOLEAN).toString())), + () -> assertEquals(Double.parseDouble(index + "." 
+ index), (Double) struct.get(FIELD_DECIMAL), 0), + () -> assertEquals("", struct.get(FIELD_EMPTY)), + () -> assertNotNull(struct.schema().field(FIELD_EMPTY)), + () -> assertEquals((int) index, union.get("field1")) + ); + } + + private void checkDataWithoutStruct(Struct record, long index) { + assertEquals(index, (long) record.get("bigint")); + } + + @Override + protected String getFileExtension() { + return FILE_EXTENSION; + } +} diff --git a/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/ParquetFileReaderTest.java b/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/ParquetFileReaderTest.java index 30dd425..4531808 100644 --- a/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/ParquetFileReaderTest.java +++ b/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/ParquetFileReaderTest.java @@ -192,7 +192,7 @@ protected Class getReaderClass() { @Override protected void checkData(Struct record, long index) { - assertEquals((int) (Integer) record.get(FIELD_INDEX), index); + assertEquals(index, (int) record.get(FIELD_INDEX)); assertTrue(record.get(FIELD_NAME).toString().startsWith(index + "_")); assertTrue(record.get(FIELD_SURNAME).toString().startsWith(index + "_")); } @@ -201,5 +201,4 @@ protected void checkData(Struct record, long index) { protected String getFileExtension() { return FILE_EXTENSION; } - } diff --git a/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/SequenceFileReaderTest.java b/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/SequenceFileReaderTest.java index e70d3dd..0580ec2 100644 --- a/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/SequenceFileReaderTest.java +++ b/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/SequenceFileReaderTest.java @@ -39,22 +39,12 @@ protected Path createDataFile(ReaderFsTestConfig fsConfig, Object... 
args) throw try { writer.append(key, value); writer.sync(); + fsConfig.offsetsByIndex().put(index, (long) index); } catch (IOException ioe) { throw new RuntimeException(ioe); } }); } - try (SequenceFile.Reader reader = new SequenceFile.Reader(fs.getConf(), - SequenceFile.Reader.file(new Path(seqFile.getAbsolutePath())))) { - Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), fs.getConf()); - Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), fs.getConf()); - int index = 0; - long pos = reader.getPosition() - 1; - while (reader.next(key, value)) { - fsConfig.offsetsByIndex().put(index++, pos); - pos = reader.getPosition(); - } - } Path path = new Path(new Path(fsConfig.getFsUri()), seqFile.getName()); fs.moveFromLocalFile(new Path(seqFile.getAbsolutePath()), path); return path; @@ -149,7 +139,7 @@ protected void checkData(Struct record, long index) { private void checkData(String keyFieldName, String valueFieldName, Struct record, long index) { assertAll( - () -> assertEquals((int) (Integer) record.get(keyFieldName), index), + () -> assertEquals(index, (int) record.get(keyFieldName)), () -> assertTrue(record.get(valueFieldName).toString().startsWith(index + "_")) ); } diff --git a/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/UnivocityFileReaderTest.java b/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/UnivocityFileReaderTest.java index 79663bc..27c0edf 100644 --- a/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/UnivocityFileReaderTest.java +++ b/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/UnivocityFileReaderTest.java @@ -249,43 +249,43 @@ protected Class getReaderClass() { @Override protected void checkData(Struct record, long index) { assertAll( - () -> assertEquals(record.get(FIELD_COLUMN1), (byte) 2), - () -> assertEquals(record.get(FIELD_COLUMN2), (short) 4), - () -> assertEquals(record.get(FIELD_COLUMN3), 8), - () -> assertEquals(record.get(FIELD_COLUMN4), 16L), - () -> assertEquals(record.get(FIELD_COLUMN5), 32.32f), - () -> assertEquals(record.get(FIELD_COLUMN6), 64.64d), - () -> assertEquals(record.get(FIELD_COLUMN7), true), - () -> assertEquals(new String((byte[]) record.get(FIELD_COLUMN8)), "test bytes"), - () -> assertEquals(record.get(FIELD_COLUMN9), "test string") + () -> assertEquals((byte) 2, record.get(FIELD_COLUMN1)), + () -> assertEquals((short) 4, record.get(FIELD_COLUMN2)), + () -> assertEquals(8, record.get(FIELD_COLUMN3)), + () -> assertEquals(16L, record.get(FIELD_COLUMN4)), + () -> assertEquals(32.32f, record.get(FIELD_COLUMN5)), + () -> assertEquals(64.64d, record.get(FIELD_COLUMN6)), + () -> assertEquals(true, record.get(FIELD_COLUMN7)), + () -> assertEquals("test bytes", new String((byte[]) record.get(FIELD_COLUMN8))), + () -> assertEquals("test string", record.get(FIELD_COLUMN9)) ); } protected void checkDataString(Struct record) { assertAll( - () -> assertEquals(record.get(FIELD_COLUMN1), "2"), - () -> assertEquals(record.get(FIELD_COLUMN2), "4"), - () -> assertEquals(record.get(FIELD_COLUMN3), "8"), - () -> assertEquals(record.get(FIELD_COLUMN4), "16"), - () -> assertEquals(record.get(FIELD_COLUMN5), "32.320000"), - () -> assertEquals(record.get(FIELD_COLUMN6), "64.640000"), - () -> assertEquals(record.get(FIELD_COLUMN7), "true"), - () -> assertEquals(record.get(FIELD_COLUMN8), "test bytes"), - () -> assertEquals(record.get(FIELD_COLUMN9), "test string") + () -> assertEquals("2", record.get(FIELD_COLUMN1)), + () -> assertEquals("4", 
record.get(FIELD_COLUMN2)), + () -> assertEquals("8", record.get(FIELD_COLUMN3)), + () -> assertEquals("16", record.get(FIELD_COLUMN4)), + () -> assertEquals("32.320000", record.get(FIELD_COLUMN5)), + () -> assertEquals("64.640000", record.get(FIELD_COLUMN6)), + () -> assertEquals("true", record.get(FIELD_COLUMN7)), + () -> assertEquals("test bytes", record.get(FIELD_COLUMN8)), + () -> assertEquals("test string", record.get(FIELD_COLUMN9)) ); } protected void checkDataNull(Struct record) { assertAll( - () -> assertEquals(record.get(FIELD_COLUMN1), null), - () -> assertEquals(record.get(FIELD_COLUMN2), null), - () -> assertEquals(record.get(FIELD_COLUMN3), null), - () -> assertEquals(record.get(FIELD_COLUMN4), null), - () -> assertEquals(record.get(FIELD_COLUMN5), null), - () -> assertEquals(record.get(FIELD_COLUMN6), null), - () -> assertEquals(record.get(FIELD_COLUMN7), null), - () -> assertEquals(record.get(FIELD_COLUMN8), null), - () -> assertEquals(record.get(FIELD_COLUMN9), null) + () -> assertNull(record.get(FIELD_COLUMN1)), + () -> assertNull(record.get(FIELD_COLUMN2)), + () -> assertNull(record.get(FIELD_COLUMN3)), + () -> assertNull(record.get(FIELD_COLUMN4)), + () -> assertNull(record.get(FIELD_COLUMN5)), + () -> assertNull(record.get(FIELD_COLUMN6)), + () -> assertNull(record.get(FIELD_COLUMN7)), + () -> assertNull(record.get(FIELD_COLUMN8)), + () -> assertNull(record.get(FIELD_COLUMN9)) ); } diff --git a/src/test/java/com/github/mmolimar/kafka/connect/fs/policy/CronPolicyTest.java b/src/test/java/com/github/mmolimar/kafka/connect/fs/policy/CronPolicyTest.java index 72bac98..ac5e264 100644 --- a/src/test/java/com/github/mmolimar/kafka/connect/fs/policy/CronPolicyTest.java +++ b/src/test/java/com/github/mmolimar/kafka/connect/fs/policy/CronPolicyTest.java @@ -89,15 +89,16 @@ public void invalidEndDate(PolicyFsTestConfig fsConfig) { @ParameterizedTest @MethodSource("fileSystemConfigProvider") public void canBeInterrupted(PolicyFsTestConfig fsConfig) throws IOException { - Policy policy = ReflectionUtils.makePolicy((Class) fsConfig.getSourceTaskConfig() + try (Policy policy = ReflectionUtils.makePolicy((Class) fsConfig.getSourceTaskConfig() .getClass(FsSourceTaskConfig.POLICY_CLASS), - fsConfig.getSourceTaskConfig()); + fsConfig.getSourceTaskConfig())) { - for (int i = 0; i < 5; i++) { - assertFalse(policy.hasEnded()); - policy.execute(); + for (int i = 0; i < 5; i++) { + assertFalse(policy.hasEnded()); + policy.execute(); + } + policy.interrupt(); + assertTrue(policy.hasEnded()); } - policy.interrupt(); - assertTrue(policy.hasEnded()); } } diff --git a/src/test/java/com/github/mmolimar/kafka/connect/fs/policy/HdfsFileWatcherPolicyTest.java b/src/test/java/com/github/mmolimar/kafka/connect/fs/policy/HdfsFileWatcherPolicyTest.java index a29ae5d..f04db50 100644 --- a/src/test/java/com/github/mmolimar/kafka/connect/fs/policy/HdfsFileWatcherPolicyTest.java +++ b/src/test/java/com/github/mmolimar/kafka/connect/fs/policy/HdfsFileWatcherPolicyTest.java @@ -50,7 +50,7 @@ protected FsSourceTaskConfig buildSourceTaskConfig(List directories) { return new FsSourceTaskConfig(cfg); } - //This policy does not throw any exception. Just stop watching those nonexistent dirs + // This policy does not throw any exception. Just stop watching those nonexistent dirs @ParameterizedTest @MethodSource("fileSystemConfigProvider") @Override @@ -67,7 +67,7 @@ public void invalidDirectory(PolicyFsTestConfig fsConfig) throws IOException { } } - //This policy never ends. 
We have to interrupt it + // This policy never ends. We have to interrupt it @ParameterizedTest @MethodSource("fileSystemConfigProvider") @Override @@ -81,21 +81,22 @@ public void execPolicyAlreadyEnded(PolicyFsTestConfig fsConfig) throws IOExcepti @ParameterizedTest @MethodSource("fileSystemConfigProvider") - public void notReachableFileSystem(PolicyFsTestConfig fsConfig) throws InterruptedException { + public void notReachableFileSystem(PolicyFsTestConfig fsConfig) throws InterruptedException, IOException { Map originals = fsConfig.getSourceTaskConfig().originalsStrings(); originals.put(FsSourceTaskConfig.FS_URIS, "hdfs://localhost:65432/data"); originals.put(HdfsFileWatcherPolicy.HDFS_FILE_WATCHER_POLICY_POLL_MS, "0"); originals.put(HdfsFileWatcherPolicy.HDFS_FILE_WATCHER_POLICY_RETRY_MS, "0"); FsSourceTaskConfig cfg = new FsSourceTaskConfig(originals); - Policy policy = ReflectionUtils.makePolicy((Class) fsConfig.getSourceTaskConfig() - .getClass(FsSourceTaskConfig.POLICY_CLASS), cfg); - int count = 0; - while (!policy.hasEnded() && count < 10) { - Thread.sleep(500); - count++; + try(Policy policy = ReflectionUtils.makePolicy((Class) fsConfig.getSourceTaskConfig() + .getClass(FsSourceTaskConfig.POLICY_CLASS), cfg)) { + int count = 0; + while (!policy.hasEnded() && count < 10) { + Thread.sleep(500); + count++; + } + assertTrue(count < 10); + assertTrue(policy.hasEnded()); } - assertTrue(count < 10); - assertTrue(policy.hasEnded()); } @ParameterizedTest diff --git a/src/test/java/com/github/mmolimar/kafka/connect/fs/policy/PolicyTestBase.java b/src/test/java/com/github/mmolimar/kafka/connect/fs/policy/PolicyTestBase.java index ba77775..9fbb429 100644 --- a/src/test/java/com/github/mmolimar/kafka/connect/fs/policy/PolicyTestBase.java +++ b/src/test/java/com/github/mmolimar/kafka/connect/fs/policy/PolicyTestBase.java @@ -2,6 +2,7 @@ import com.github.mmolimar.kafka.connect.fs.FsSourceTaskConfig; import com.github.mmolimar.kafka.connect.fs.file.FileMetadata; +import com.github.mmolimar.kafka.connect.fs.file.reader.FileReader; import com.github.mmolimar.kafka.connect.fs.util.ReflectionUtils; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -16,8 +17,7 @@ import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; -import java.io.FileNotFoundException; -import java.io.IOException; +import java.io.*; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.*; @@ -128,11 +128,11 @@ public void oneFilePerFs(PolicyFsTestConfig fsConfig) throws IOException, Interr FileSystem fs = fsConfig.getFs(); for (Path dir : fsConfig.getDirectories()) { fs.createNewFile(new Path(dir, System.nanoTime() + ".txt")); - //this file does not match the regexp + // this file does not match the regexp fs.createNewFile(new Path(dir, System.nanoTime() + ".invalid")); - //we wait till FS has registered the files - Thread.sleep(3000); + // we wait till FS has registered the files + Thread.sleep(5000); } Iterator it = fsConfig.getPolicy().execute(); assertTrue(it.hasNext()); @@ -150,11 +150,11 @@ public void recursiveDirectory(PolicyFsTestConfig fsConfig) throws IOException, Path tmpDir = new Path(dir, String.valueOf(System.nanoTime())); fs.mkdirs(tmpDir); fs.createNewFile(new Path(tmpDir, System.nanoTime() + ".txt")); - //this file does not match the regexp + // this file does not match the regexp fs.createNewFile(new Path(tmpDir, System.nanoTime() + ".invalid")); - //we wait till FS has registered the files - 
Thread.sleep(3000); + // we wait till FS has registered the files + Thread.sleep(5000); } Iterator it = fsConfig.getPolicy().execute(); assertTrue(it.hasNext()); @@ -180,24 +180,25 @@ public void dynamicURIs(PolicyFsTestConfig fsConfig) throws IOException { Map originals = fsConfig.getSourceTaskConfig().originalsStrings(); originals.put(FsSourceTaskConfig.FS_URIS, dynamic.toString()); FsSourceTaskConfig cfg = new FsSourceTaskConfig(originals); - Policy policy = ReflectionUtils.makePolicy((Class) fsConfig.getSourceTaskConfig() - .getClass(FsSourceTaskConfig.POLICY_CLASS), cfg); - fsConfig.setPolicy(policy); - assertEquals(1, fsConfig.getPolicy().getURIs().size()); - - LocalDateTime dateTime = LocalDateTime.now(); - DateTimeFormatter formatter = DateTimeFormatter.ofPattern("G"); - StringBuilder uri = new StringBuilder(dateTime.format(formatter)); - uri.append("/"); - formatter = DateTimeFormatter.ofPattern("yyyy"); - uri.append(dateTime.format(formatter)); - uri.append("/"); - formatter = DateTimeFormatter.ofPattern("MM"); - uri.append(dateTime.format(formatter)); - uri.append("/"); - formatter = DateTimeFormatter.ofPattern("W"); - uri.append(dateTime.format(formatter)); - assertTrue(fsConfig.getPolicy().getURIs().get(0).endsWith(uri.toString())); + try (Policy policy = ReflectionUtils.makePolicy((Class) fsConfig.getSourceTaskConfig() + .getClass(FsSourceTaskConfig.POLICY_CLASS), cfg)) { + + assertEquals(1, policy.getURIs().size()); + + LocalDateTime dateTime = LocalDateTime.now(); + DateTimeFormatter formatter = DateTimeFormatter.ofPattern("G"); + StringBuilder uri = new StringBuilder(dateTime.format(formatter)); + uri.append("/"); + formatter = DateTimeFormatter.ofPattern("yyyy"); + uri.append(dateTime.format(formatter)); + uri.append("/"); + formatter = DateTimeFormatter.ofPattern("MM"); + uri.append(dateTime.format(formatter)); + uri.append("/"); + formatter = DateTimeFormatter.ofPattern("W"); + uri.append(dateTime.format(formatter)); + assertTrue(policy.getURIs().get(0).endsWith(uri.toString())); + } } @ParameterizedTest @@ -221,6 +222,70 @@ public void invalidDynamicURIs(PolicyFsTestConfig fsConfig) throws IOException { }); } + @ParameterizedTest + @MethodSource("fileSystemConfigProvider") + public void execPolicyBatchesFiles(PolicyFsTestConfig fsConfig) throws IOException, InterruptedException { + Map originals = fsConfig.getSourceTaskConfig().originalsStrings(); + originals.put(FsSourceTaskConfig.POLICY_BATCH_SIZE, "1"); + FsSourceTaskConfig sourceTaskConfig = new FsSourceTaskConfig(originals); + + try (Policy policy = ReflectionUtils.makePolicy( + (Class) fsConfig.getSourceTaskConfig().getClass(FsSourceTaskConfig.POLICY_CLASS), + sourceTaskConfig)) { + + FileSystem fs = fsConfig.getFs(); + for (Path dir : fsConfig.getDirectories()) { + File tmp = File.createTempFile("test-", ".txt"); + try (PrintWriter writer = new PrintWriter(new FileOutputStream(tmp))) { + writer.append("test"); + } + fs.moveFromLocalFile(new Path(tmp.getAbsolutePath()), new Path(dir, System.nanoTime() + ".txt")); + + // this file does not match the regexp + fs.createNewFile(new Path(dir, System.nanoTime() + ".invalid")); + + // we wait till FS has registered the files + Thread.sleep(5000); + } + + Iterator it = policy.execute(); + + // First batch of files (1 file) + assertTrue(it.hasNext()); + FileMetadata metadata = it.next(); + FileMetadata metadataFromFs = ((AbstractPolicy) policy) + .toMetadata(fs.listLocatedStatus(new Path(metadata.getPath())).next()); + assertEquals(metadata, metadataFromFs); + String 
firstPath = metadata.getPath(); + FileReader reader = policy.offer(metadata, new HashMap() {{ + put("offset", "1"); + put("file-size", "4"); + put("eof", "true"); + }}); + assertFalse(reader.hasNext()); + reader.seek(1000L); + assertThrows(NoSuchElementException.class, reader::next); + reader.close(); + assertEquals(0L, reader.currentOffset()); + assertEquals(metadata.getPath(), reader.getFilePath().toString()); + assertFalse(it.hasNext()); + + // Second batch of files (1 file) + it = policy.execute(); + assertTrue(it.hasNext()); + assertNotEquals(firstPath, it.next().getPath()); + assertFalse(it.hasNext()); + } + } + + @ParameterizedTest + @MethodSource("fileSystemConfigProvider") + public void invalidBatchSize(PolicyFsTestConfig fsConfig) { + Map originals = fsConfig.getSourceTaskConfig().originalsStrings(); + originals.put(FsSourceTaskConfig.POLICY_BATCH_SIZE, "one"); + assertThrows(ConfigException.class, () -> new FsSourceTaskConfig(originals)); + } + protected abstract FsSourceTaskConfig buildSourceTaskConfig(List directories); } diff --git a/src/test/java/com/github/mmolimar/kafka/connect/fs/policy/SimplePolicyTest.java b/src/test/java/com/github/mmolimar/kafka/connect/fs/policy/SimplePolicyTest.java index 279a775..f70e184 100644 --- a/src/test/java/com/github/mmolimar/kafka/connect/fs/policy/SimplePolicyTest.java +++ b/src/test/java/com/github/mmolimar/kafka/connect/fs/policy/SimplePolicyTest.java @@ -1,10 +1,20 @@ package com.github.mmolimar.kafka.connect.fs.policy; import com.github.mmolimar.kafka.connect.fs.FsSourceTaskConfig; +import com.github.mmolimar.kafka.connect.fs.file.FileMetadata; import com.github.mmolimar.kafka.connect.fs.file.reader.TextFileReader; +import com.github.mmolimar.kafka.connect.fs.util.ReflectionUtils; + +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; + +import static org.junit.jupiter.api.Assertions.*; +import java.io.IOException; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; @@ -26,4 +36,42 @@ protected FsSourceTaskConfig buildSourceTaskConfig(List directories) { return new FsSourceTaskConfig(cfg); } + @ParameterizedTest + @MethodSource("fileSystemConfigProvider") + public void execPolicyEndsAfterBatching(PolicyFsTestConfig fsConfig) throws IOException, InterruptedException { + Map originals = fsConfig.getSourceTaskConfig().originalsStrings(); + originals.put(FsSourceTaskConfig.POLICY_BATCH_SIZE, "1"); + FsSourceTaskConfig sourceTaskConfig = new FsSourceTaskConfig(originals); + + try (Policy policy = ReflectionUtils.makePolicy( + (Class) fsConfig.getSourceTaskConfig().getClass(FsSourceTaskConfig.POLICY_CLASS), + sourceTaskConfig)) { + + FileSystem fs = fsConfig.getFs(); + for (Path dir : fsConfig.getDirectories()) { + fs.createNewFile(new Path(dir, System.nanoTime() + ".txt")); + // this file does not match the regexp + fs.createNewFile(new Path(dir, System.nanoTime() + ".invalid")); + + // we wait till FS has registered the files + Thread.sleep(3000); + } + + Iterator it = policy.execute(); + + // First batch of files (1 file) + assertFalse(policy.hasEnded()); + assertTrue(it.hasNext()); + String firstPath = it.next().getPath(); + assertFalse(it.hasNext()); + assertFalse(policy.hasEnded()); + + // Second batch of files (1 file) + it = policy.execute(); + assertTrue(it.hasNext()); + assertNotEquals(firstPath, it.next().getPath()); + assertFalse(it.hasNext()); + 
assertTrue(policy.hasEnded()); + } + } } diff --git a/src/test/java/com/github/mmolimar/kafka/connect/fs/policy/SleepyPolicyTest.java b/src/test/java/com/github/mmolimar/kafka/connect/fs/policy/SleepyPolicyTest.java index 65c41c7..8f340b0 100644 --- a/src/test/java/com/github/mmolimar/kafka/connect/fs/policy/SleepyPolicyTest.java +++ b/src/test/java/com/github/mmolimar/kafka/connect/fs/policy/SleepyPolicyTest.java @@ -101,13 +101,14 @@ public void sleepExecution(PolicyFsTestConfig fsConfig) throws IOException { tConfig.put(SleepyPolicy.SLEEPY_POLICY_MAX_EXECS, "2"); FsSourceTaskConfig sleepConfig = new FsSourceTaskConfig(tConfig); - Policy policy = ReflectionUtils.makePolicy((Class) fsConfig.getSourceTaskConfig() - .getClass(FsSourceTaskConfig.POLICY_CLASS), sleepConfig); - assertFalse(policy.hasEnded()); - policy.execute(); - assertFalse(policy.hasEnded()); - policy.execute(); - assertTrue(policy.hasEnded()); + try (Policy policy = ReflectionUtils.makePolicy((Class) fsConfig.getSourceTaskConfig() + .getClass(FsSourceTaskConfig.POLICY_CLASS), sleepConfig)) { + assertFalse(policy.hasEnded()); + policy.execute(); + assertFalse(policy.hasEnded()); + policy.execute(); + assertTrue(policy.hasEnded()); + } } @ParameterizedTest @@ -118,16 +119,15 @@ public void defaultExecutions(PolicyFsTestConfig fsConfig) throws IOException { tConfig.remove(SleepyPolicy.SLEEPY_POLICY_MAX_EXECS); FsSourceTaskConfig sleepConfig = new FsSourceTaskConfig(tConfig); - Policy policy = ReflectionUtils.makePolicy((Class) fsConfig.getSourceTaskConfig() - .getClass(FsSourceTaskConfig.POLICY_CLASS), sleepConfig); - - //it never ends - for (int i = 0; i < 100; i++) { - assertFalse(policy.hasEnded()); - policy.execute(); + try (Policy policy = ReflectionUtils.makePolicy((Class) fsConfig.getSourceTaskConfig() + .getClass(FsSourceTaskConfig.POLICY_CLASS), sleepConfig)) { + // it never ends + for (int i = 0; i < 100; i++) { + assertFalse(policy.hasEnded()); + policy.execute(); + } + policy.interrupt(); + assertTrue(policy.hasEnded()); } - policy.interrupt(); - assertTrue(policy.hasEnded()); } - } diff --git a/src/test/java/com/github/mmolimar/kafka/connect/fs/task/FsSourceTaskTest.java b/src/test/java/com/github/mmolimar/kafka/connect/fs/task/FsSourceTaskTest.java index b4b5a4e..d76f82f 100644 --- a/src/test/java/com/github/mmolimar/kafka/connect/fs/task/FsSourceTaskTest.java +++ b/src/test/java/com/github/mmolimar/kafka/connect/fs/task/FsSourceTaskTest.java @@ -4,6 +4,7 @@ import com.github.mmolimar.kafka.connect.fs.FsSourceTaskConfig; import com.github.mmolimar.kafka.connect.fs.file.reader.AvroFileReader; import com.github.mmolimar.kafka.connect.fs.file.reader.TextFileReader; +import com.github.mmolimar.kafka.connect.fs.policy.CronPolicy; import com.github.mmolimar.kafka.connect.fs.policy.Policy; import com.github.mmolimar.kafka.connect.fs.policy.SimplePolicy; import org.apache.hadoop.fs.FileSystem; @@ -13,6 +14,8 @@ import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.source.SourceTaskContext; import org.apache.kafka.connect.storage.OffsetStorageReader; +import org.easymock.Capture; +import org.easymock.CaptureType; import org.easymock.EasyMock; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; @@ -28,7 +31,10 @@ import java.io.FileWriter; import java.io.IOException; import java.io.PrintWriter; +import java.time.Duration; +import java.time.LocalDateTime; import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.IntStream; import 
java.util.stream.Stream; @@ -41,6 +47,8 @@ public class FsSourceTaskTest { new HdfsFsConfig() ); private static final int NUM_RECORDS = 10; + private static final long NUM_BYTES_PER_FILE = 390; + private static final String FILE_ALREADY_PROCESSED = "0101010101010101.txt"; @BeforeAll public static void initFs() throws IOException { @@ -69,24 +77,42 @@ public void initTask() { put(FsSourceTaskConfig.POLICY_REGEXP, "^[0-9]*\\.txt$"); }}; - //Mock initialization + // Mock initialization SourceTaskContext taskContext = PowerMock.createMock(SourceTaskContext.class); OffsetStorageReader offsetStorageReader = PowerMock.createMock(OffsetStorageReader.class); EasyMock.expect(taskContext.offsetStorageReader()) - .andReturn(offsetStorageReader); - - EasyMock.expect(taskContext.offsetStorageReader()) - .andReturn(offsetStorageReader); - - EasyMock.expect(offsetStorageReader.offset(EasyMock.anyObject())) - .andReturn(new HashMap() {{ - put("offset", (long) (NUM_RECORDS / 2)); - }}); - EasyMock.expect(offsetStorageReader.offset(EasyMock.anyObject())) - .andReturn(new HashMap() {{ - put("offset", (long) (NUM_RECORDS / 2)); - }}); + .andReturn(offsetStorageReader) + .times(2); + + // Every time the `offsetStorageReader.offsets(params)` method is called we want to capture the offsets params + // and return a different result based on the offset params passed in + // In this case, returning a different result based on the file path of the params + Capture>> captureOne = Capture.newInstance(CaptureType.ALL); + AtomicInteger executionNumber = new AtomicInteger(); + EasyMock.expect( + offsetStorageReader.offsets(EasyMock.capture(captureOne)) + ).andAnswer(() -> { + List>> capturedValues = captureOne.getValues(); + Collection> captured = capturedValues.get(executionNumber.get()); + executionNumber.addAndGet(1); + + Map, Map> map = new HashMap<>(); + captured.forEach(part -> { + if (((String) (part.get("path"))).endsWith(FILE_ALREADY_PROCESSED)) { + map.put(part, new HashMap() {{ + put("offset", (long) NUM_RECORDS); + put("eof", true); + put("file-size", NUM_BYTES_PER_FILE); + }}); + } else { + map.put(part, new HashMap() {{ + put("offset", (long) NUM_RECORDS / 2); + }}); + } + }); + return map; + }).times(2); EasyMock.checkOrder(taskContext, false); EasyMock.replay(taskContext); @@ -122,7 +148,7 @@ private static Stream fileSystemConfigProvider() { public void pollNoData(TaskFsTestConfig fsConfig) { fsConfig.getTask().start(fsConfig.getTaskConfig()); assertEquals(0, fsConfig.getTask().poll().size()); - //policy has ended + // policy has ended assertNull(fsConfig.getTask().poll()); } @@ -131,12 +157,12 @@ public void pollNoData(TaskFsTestConfig fsConfig) { public void emptyFilesToProcess(TaskFsTestConfig fsConfig) throws IOException { for (Path dir : fsConfig.getDirectories()) { fsConfig.getFs().createNewFile(new Path(dir, System.nanoTime() + ".txt")); - //this file does not match the regexp + // this file does not match the regexp fsConfig.getFs().createNewFile(new Path(dir, String.valueOf(System.nanoTime()))); } fsConfig.getTask().start(fsConfig.getTaskConfig()); assertEquals(0, fsConfig.getTask().poll().size()); - //policy has ended + // policy has ended assertNull(fsConfig.getTask().poll()); } @@ -146,7 +172,7 @@ public void oneFilePerFs(TaskFsTestConfig fsConfig) throws IOException { for (Path dir : fsConfig.getDirectories()) { Path dataFile = new Path(dir, System.nanoTime() + ".txt"); createDataFile(fsConfig.getFs(), dataFile); - //this file does not match the regexp + // this file does not match the regexp 
fsConfig.getFs().createNewFile(new Path(dir, String.valueOf(System.nanoTime()))); } @@ -154,7 +180,93 @@ public void oneFilePerFs(TaskFsTestConfig fsConfig) throws IOException { List records = fsConfig.getTask().poll(); assertEquals((NUM_RECORDS * fsConfig.getDirectories().size()) / 2, records.size()); checkRecords(records); - //policy has ended + // policy has ended + assertNull(fsConfig.getTask().poll()); + } + + @ParameterizedTest + @MethodSource("fileSystemConfigProvider") + public void oneFilePerFsWithFileReaderInBatch(TaskFsTestConfig fsConfig) throws IOException { + for (Path dir : fsConfig.getDirectories()) { + Path dataFile = new Path(dir, System.nanoTime() + ".txt"); + createDataFile(fsConfig.getFs(), dataFile); + // this file does not match the regexp + fsConfig.getFs().createNewFile(new Path(dir, String.valueOf(System.nanoTime()))); + } + + int readerBatchSize = 8; + Map taskConfig = fsConfig.getTaskConfig(); + taskConfig.put(FsSourceTaskConfig.POLICY_CLASS, CronPolicy.class.getName()); + taskConfig.put(CronPolicy.CRON_POLICY_EXPRESSION, "0/2 * * * * ?"); + taskConfig.put(CronPolicy.CRON_POLICY_END_DATE, LocalDateTime.now().plusDays(1).toString()); + taskConfig.put(FsSourceTaskConfig.FILE_READER_BATCH_SIZE, String.valueOf(readerBatchSize)); + taskConfig.put(FsSourceTaskConfig.POLL_INTERVAL_MS, "1000"); + fsConfig.getTask().start(taskConfig); + + List records = fsConfig.getTask().poll(); + assertEquals(((readerBatchSize % (NUM_RECORDS / 2)) * fsConfig.getDirectories().size()), records.size()); + checkRecords(records); + + records = fsConfig.getTask().poll(); + assertEquals(((readerBatchSize % (NUM_RECORDS / 2)) * fsConfig.getDirectories().size()), records.size()); + checkRecords(records); + + // policy has ended + fsConfig.getTask().stop(); + assertNull(fsConfig.getTask().poll()); + } + + @ParameterizedTest + @MethodSource("fileSystemConfigProvider") + public void oneFilePerFsWithBatchAndFileReaderInBatch(TaskFsTestConfig fsConfig) throws IOException { + for (Path dir : fsConfig.getDirectories()) { + Path dataFile = new Path(dir, System.nanoTime() + ".txt"); + createDataFile(fsConfig.getFs(), dataFile); + // this file does not match the regexp + fsConfig.getFs().createNewFile(new Path(dir, String.valueOf(System.nanoTime()))); + } + + int readerBatchSize = 8; + Map taskConfig = fsConfig.getTaskConfig(); + taskConfig.put(FsSourceTaskConfig.POLICY_CLASS, CronPolicy.class.getName()); + taskConfig.put(FsSourceTaskConfig.POLICY_BATCH_SIZE, "1"); + taskConfig.put(CronPolicy.CRON_POLICY_EXPRESSION, "0/2 * * * * ?"); + taskConfig.put(CronPolicy.CRON_POLICY_END_DATE, LocalDateTime.now().plusDays(1).toString()); + taskConfig.put(FsSourceTaskConfig.FILE_READER_BATCH_SIZE, String.valueOf(readerBatchSize)); + taskConfig.put(FsSourceTaskConfig.POLL_INTERVAL_MS, "1000"); + fsConfig.getTask().start(taskConfig); + + List firstBatch = fsConfig.getTask().poll(); + assertEquals(readerBatchSize % (NUM_RECORDS / 2), firstBatch.size()); + checkRecords(firstBatch); + + List secondBatch = fsConfig.getTask().poll(); + assertEquals(readerBatchSize % (NUM_RECORDS / 2), secondBatch.size()); + checkRecords(secondBatch); + assertEquals(firstBatch.size(), secondBatch.size()); + IntStream.range(0, firstBatch.size()) + .forEach(index -> assertNotEquals( + firstBatch.get(index).sourcePartition().get("path"), + secondBatch.get(index).sourcePartition().get("path") + )); + + // policy has ended + fsConfig.getTask().stop(); + assertNull(fsConfig.getTask().poll()); + } + + @ParameterizedTest + 
+    @MethodSource("fileSystemConfigProvider")
+    public void skipsFetchingFileIfByteOffsetExistsAndMatchesFileLength(TaskFsTestConfig fsConfig) throws IOException {
+        for (Path dir : fsConfig.getDirectories()) {
+            // this file will be skipped since the byte offset for the file is equal to the byte size of the file
+            Path dataFile = new Path(dir, FILE_ALREADY_PROCESSED);
+            createDataFile(fsConfig.getFs(), dataFile);
+        }
+
+        fsConfig.getTask().start(fsConfig.getTaskConfig());
+        List<SourceRecord> records = fsConfig.getTask().poll();
+        assertEquals(0, records.size());
         assertNull(fsConfig.getTask().poll());
     }
 
@@ -270,6 +382,99 @@ public void checkVersion(TaskFsTestConfig fsConfig) {
         assertFalse("unknown".equalsIgnoreCase(fsConfig.getTask().version()));
     }
 
+
+    @ParameterizedTest
+    @MethodSource("fileSystemConfigProvider")
+    public void pollNoDataWithBatch(TaskFsTestConfig fsConfig) {
+        Map<String, String> props = new HashMap<>(fsConfig.getTaskConfig());
+        props.put(FsSourceTaskConfig.POLICY_BATCH_SIZE, "1");
+        fsConfig.getTask().start(props);
+
+        assertEquals(0, fsConfig.getTask().poll().size());
+        // policy has ended
+        assertNull(fsConfig.getTask().poll());
+    }
+
+    @ParameterizedTest
+    @MethodSource("fileSystemConfigProvider")
+    public void emptyFilesToProcessWithBatch(TaskFsTestConfig fsConfig) throws IOException {
+        for (Path dir : fsConfig.getDirectories()) {
+            fsConfig.getFs().createNewFile(new Path(dir, System.nanoTime() + ".txt"));
+            // this file does not match the regexp
+            fsConfig.getFs().createNewFile(new Path(dir, String.valueOf(System.nanoTime())));
+        }
+        Map<String, String> props = new HashMap<>(fsConfig.getTaskConfig());
+        props.put(FsSourceTaskConfig.POLICY_BATCH_SIZE, "1");
+        fsConfig.getTask().start(props);
+
+        List<SourceRecord> records = new ArrayList<>();
+        List<SourceRecord> fresh = fsConfig.getTask().poll();
+        while (fresh != null) {
+            records.addAll(fresh);
+            fresh = fsConfig.getTask().poll();
+        }
+        assertEquals(0, records.size());
+
+        // policy has ended
+        assertNull(fsConfig.getTask().poll());
+    }
+
+    @ParameterizedTest
+    @MethodSource("fileSystemConfigProvider")
+    public void oneFilePerFsWithBatch(TaskFsTestConfig fsConfig) throws IOException {
+        for (Path dir : fsConfig.getDirectories()) {
+            Path dataFile = new Path(dir, System.nanoTime() + ".txt");
+            createDataFile(fsConfig.getFs(), dataFile);
+            // this file does not match the regexp
+            fsConfig.getFs().createNewFile(new Path(dir, String.valueOf(System.nanoTime())));
+        }
+
+        Map<String, String> props = new HashMap<>(fsConfig.getTaskConfig());
+        props.put(FsSourceTaskConfig.POLICY_BATCH_SIZE, "1");
+        fsConfig.getTask().start(props);
+
+        List<SourceRecord> records = new ArrayList<>();
+        List<SourceRecord> fresh = fsConfig.getTask().poll();
+        while (fresh != null) {
+            records.addAll(fresh);
+            fresh = fsConfig.getTask().poll();
+        }
+
+        assertEquals((NUM_RECORDS * fsConfig.getDirectories().size()) / 2, records.size());
+        checkRecords(records);
+        // policy has ended
+        assertNull(fsConfig.getTask().poll());
+    }
+
+    @ParameterizedTest
+    @MethodSource("fileSystemConfigProvider")
+    public void shouldNotSleepBetweenBatches(TaskFsTestConfig fsConfig) throws IOException {
+        Map<String, String> props = new HashMap<>(fsConfig.getTaskConfig());
+        props.put(FsSourceTaskConfig.POLL_INTERVAL_MS, "10000");
+        props.put(FsSourceTaskConfig.POLICY_BATCH_SIZE, "1");
+
+        for (Path dir : fsConfig.getDirectories()) {
+            Path dataFile = new Path(dir, System.nanoTime() + ".txt");
+            createDataFile(fsConfig.getFs(), dataFile);
+            // this file does not match the regexp
+            fsConfig.getFs().createNewFile(new Path(dir, String.valueOf(System.nanoTime())));
+        }
+
+        fsConfig.getTask().start(props);
+
+        List<SourceRecord> records = new ArrayList<>();
+        assertTimeoutPreemptively(Duration.ofSeconds(2), () -> {
+            records.addAll(fsConfig.getTask().poll());
+            records.addAll(fsConfig.getTask().poll());
+        });
+
+        assertEquals((NUM_RECORDS * fsConfig.getDirectories().size()) / 2, records.size());
+        checkRecords(records);
+        // policy has ended
+        assertNull(fsConfig.getTask().poll());
+    }
+
     protected void checkRecords(List<SourceRecord> records) {
         records.forEach(record -> {
             assertEquals("topic_test", record.topic());
@@ -301,5 +506,4 @@ private File fillDataFile() throws IOException {
         }
         return txtFile;
     }
-
 }
diff --git a/src/test/resources/log4j.properties b/src/test/resources/log4j.properties
index bb7782f..c39cf11 100644
--- a/src/test/resources/log4j.properties
+++ b/src/test/resources/log4j.properties
@@ -5,11 +5,12 @@ log4j.rootLogger=INFO, stdout
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.Target=System.out
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c:%L - %m%n
+log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
 
 log4j.logger.com.github.mmolimar.kafka.connect.fs=TRACE
 log4j.logger.org.apache.hadoop=ERROR
 log4j.logger.BlockStateChange=WARN
 log4j.logger.org.apache.parquet=WARN
+log4j.logger.org.apache.orc=WARN
 log4j.logger.org.eclipse.jetty=WARN
 log4j.logger.io.confluent.connect.avro=WARN
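
Note on the FILE_ALREADY_PROCESSED fixture exercised in the test changes above: the task looks up committed offsets through Kafka Connect's OffsetStorageReader.offsets(...), keyed by source partition (the file's "path"), and the test stores "offset", "eof" and "file-size" in the offset map so the file can be recognized as fully processed. The sketch below only illustrates the shape of that lookup; the class and method names are invented for the example, and the actual connector code may implement the check differently.

import org.apache.kafka.connect.storage.OffsetStorageReader;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper mirroring the offset keys used by the test ("path", "offset", "eof", "file-size").
class OffsetLookupSketch {

    static boolean alreadyProcessed(OffsetStorageReader reader, String path, long currentFileSize) {
        Map<String, Object> partition = new HashMap<>();
        partition.put("path", path); // source partition key, as captured by the mocked offsets() call

        Collection<Map<String, Object>> partitions = Collections.singletonList(partition);
        Map<Map<String, Object>, Map<String, Object>> committed = reader.offsets(partitions);

        Map<String, Object> offset = committed.get(partition);
        if (offset == null) {
            return false; // no committed offset: the file still has to be read
        }
        // If the reader reached EOF and the file has not grown since, there is nothing left to fetch.
        boolean eof = Boolean.TRUE.equals(offset.get("eof"));
        Object storedSize = offset.get("file-size");
        return eof && storedSize instanceof Long && (Long) storedSize == currentFileSize;
    }
}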
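The *WithBatch tests all drain the task the same way: with policy.batch_size set, a single poll() returns only one batch, and null signals that the policy has ended. A minimal sketch of that drain loop, assuming any already-started SourceTask; nothing here is specific to this connector beyond the null convention the tests rely on.

import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

import java.util.ArrayList;
import java.util.List;

// Hypothetical utility: collects every batch from a started task until poll() returns null.
final class BatchDrainSketch {

    static List<SourceRecord> drain(SourceTask task) throws InterruptedException {
        List<SourceRecord> all = new ArrayList<>();
        List<SourceRecord> batch = task.poll(); // at most one batch per call when batching is enabled
        while (batch != null) {                 // null means the policy has ended
            all.addAll(batch);
            batch = task.poll();
        }
        return all;
    }
}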