Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
refactor(filesystems): add module filepulse-commons-fs
This commit relocates some classes
  • Loading branch information
fhussonnois committed Feb 11, 2021
1 parent e222414 commit 06385b3
Show file tree
Hide file tree
Showing 72 changed files with 2,277 additions and 1,639 deletions.
13 changes: 1 addition & 12 deletions checkstyle/suppressions.xml
Expand Up @@ -41,21 +41,10 @@
*/
-->
<suppress checks="LineLength" files="RecordFilter.java"/>
<suppress checks="LineLength" files="CommonFilterConfig.java"/>
<suppress checks="LineLength" files="DelimitedRowFilterConfig.java"/>
<suppress checks="LineLength" files="GroupRowFilterConfig.java"/>
<suppress checks="LineLength" files="GrokPatternCompiler.java"/>
<suppress checks="LineLength" files="ConvertFilterConfig.java"/>
<suppress checks="LineLength" files="RenameFilterConfig.java"/>
<suppress checks="LineLength" files="GrokFilterConfig.java"/>
<suppress checks="LineLength" files="SplitFilterConfig.java"/>
<suppress checks="LineLength" files="RowFileInputReaderConfig.java"/>
<suppress checks="LineLength" files="ConnectorConfig.java"/>
<suppress checks="LineLength" files="TaskConfig.java"/>
<suppress checks="LineLength" files="CommonConfig.java"/>
<suppress checks="LineLength" files="DefaultFileSystemScanner.java"/>
<suppress checks="LineLength" files="ComposeOffsetStrategy.java"/>
<suppress checks="LineLength" files="DefaultOffsetPolicyConfig.java"/>
<suppress checks="LineLength" files=".*Config.java"/>
<!-- Those classes are copy from kafka-connect api-->
<suppress checks="LineLength" files="KafkaBasedLog.java"/>
<suppress checks="NPathComplexity" files="KafkaBasedLog.java"/>
Expand Down
6 changes: 3 additions & 3 deletions config/connect-file-pulse-quickstart-avro.json
Expand Up @@ -2,14 +2,14 @@
"config": {
"connector.class": "io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
"fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy",
"fs.listing.class": "io.streamthoughts.kafka.connect.filepulse.fs.local.LocalFSDirectoryListing",
"fs.scan.directory.path": "/tmp/kafka-connect/examples/",
"fs.listing.class" : "io.streamthoughts.kafka.connect.filepulse.fs.LocalFSDirectoryListing",
"fs.listing.directory.path":"/tmp/kafka-connect/examples/",
"fs.scan.interval.ms": "10000",
"internal.kafka.reporter.bootstrap.servers": "broker:29092",
"internal.kafka.reporter.topic": "connect-file-pulse-status",
"offset.strategy": "name",
"read.max.wait.ms": "5000",
"task.reader.class": "io.streamthoughts.kafka.connect.filepulse.fs.local.reader.AvroFileInputReader",
"task.reader.class": "io.streamthoughts.kafka.connect.filepulse.fs.reader.LocalAvroFileInputReader",
"topic": "connect-file-pulse-quickstart-avro",
"tasks.max": 1
},
Expand Down
6 changes: 4 additions & 2 deletions config/connect-file-pulse-quickstart-csv.json
Expand Up @@ -9,8 +9,10 @@
"filters.ParseDelimitedRow.trimColumn": "true",
"filters.ParseDelimitedRow.type": "io.streamthoughts.kafka.connect.filepulse.filter.DelimitedRowFilter",
"fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy",
"fs.listing.class": "io.streamthoughts.kafka.connect.filepulse.fs.local.LocalFSDirectoryListing",
"fs.scan.directory.path": "/tmp/kafka-connect/examples/",
"fs.listing.class" : "io.streamthoughts.kafka.connect.filepulse.fs.LocalFSDirectoryListing",
"fs.listing.directory.path":"/tmp/kafka-connect/examples/",
"fs.listing.filters":"io.streamthoughts.kafka.connect.filepulse.fs.filter.RegexFileListFilter",
"file.filter.regex.pattern":".*\\.csv$",
"fs.scan.interval.ms": "10000",
"internal.kafka.reporter.bootstrap.servers": "broker:29092",
"internal.kafka.reporter.topic": "connect-file-pulse-status",
Expand Down
8 changes: 5 additions & 3 deletions config/connect-file-pulse-quickstart-log4j.json
Expand Up @@ -11,14 +11,16 @@
"filters.ParseLog4jLog.type": "io.streamthoughts.kafka.connect.filepulse.filter.GrokFilter",
"filters.ParseLog4jLog.ignoreFailure": "true",
"fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy",
"fs.listing.class": "io.streamthoughts.kafka.connect.filepulse.fs.local.LocalFSDirectoryListing",
"fs.scan.directory.path": "/var/log/kafka/",
"fs.listing.class" : "io.streamthoughts.kafka.connect.filepulse.fs.LocalFSDirectoryListing",
"fs.listing.directory.path":"/tmp/kafka-connect/examples/",
"fs.listing.filters":"io.streamthoughts.kafka.connect.filepulse.fs.filter.RegexFileListFilter",
"file.filter.regex.pattern":".*\\.log$",
"fs.scan.interval.ms": "10000",
"internal.kafka.reporter.bootstrap.servers": "broker:29092",
"internal.kafka.reporter.topic": "connect-file-pulse-status",
"offset.strategy": "name",
"read.max.wait.ms": "5000",
"task.reader.class": "io.streamthoughts.kafka.connect.filepulse.fs.local.reader.RowFileInputReader",
"task.reader.class": "io.streamthoughts.kafka.connect.filepulse.fs.reader.LocalRowFileInputReader",
"topic": "connect-file-pulse-quickstart-log4j",
"tasks.max": 1
},
Expand Down
Expand Up @@ -18,6 +18,8 @@
*/
package io.streamthoughts.kafka.connect.filepulse.internal;

import java.util.Objects;

public class StringUtils {

/**
Expand All @@ -35,4 +37,8 @@ public static boolean isFastSplit(final String regex) {
(ch < Character.MIN_HIGH_SURROGATE ||
ch > Character.MAX_LOW_SURROGATE);
}

public static boolean isNotBlank(final String string) {
return !(Objects.isNull(string) || string.isBlank());
}
}
@@ -1,5 +1,5 @@
/*
* Copyright 2019-2020 StreamThoughts.
* Copyright 2019-2021 StreamThoughts.
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
Expand All @@ -16,9 +16,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamthoughts.kafka.connect.filepulse.fs.local.reader;
package io.streamthoughts.kafka.connect.filepulse.reader;

import io.streamthoughts.kafka.connect.filepulse.reader.FileInputIterator;
import io.streamthoughts.kafka.connect.filepulse.source.FileContext;
import io.streamthoughts.kafka.connect.filepulse.source.FileRecord;

Expand Down Expand Up @@ -47,13 +46,14 @@ public abstract class AbstractFileInputIterator<T> implements FileInputIterator<
*/
public AbstractFileInputIterator(final IteratorManager iteratorManager,
final FileContext context) {
Objects.requireNonNull(iteratorManager, "iteratorManager can't be null");
Objects.requireNonNull(context, "context can't be null");
this.iteratorManager = iteratorManager;
this.context = context;
this.iteratorManager = Objects.requireNonNull(iteratorManager, "iteratorManager can't be null");
this.context = Objects.requireNonNull(context, "context can't be null");
closed = new AtomicBoolean(false);
}

/**
* {@inheritDoc}
*/
@Override
public FileContext context() {
return context;
Expand Down
Expand Up @@ -16,21 +16,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamthoughts.kafka.connect.filepulse.fs.local.reader;
package io.streamthoughts.kafka.connect.filepulse.reader;

import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
import io.streamthoughts.kafka.connect.filepulse.reader.FileInputIterator;
import io.streamthoughts.kafka.connect.filepulse.reader.FileInputReader;
import io.streamthoughts.kafka.connect.filepulse.source.FileContext;
import io.streamthoughts.kafka.connect.filepulse.source.FileRecord;
import io.streamthoughts.kafka.connect.filepulse.source.LocalFileObjectMeta;
import io.streamthoughts.kafka.connect.filepulse.source.FileObjectMeta;

import java.io.File;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

public abstract class AbstractFileInputReader implements FileInputReader {
Expand All @@ -42,35 +33,11 @@ public abstract class AbstractFileInputReader implements FileInputReader {
/**
* Creates a new {@link AbstractFileInputReader} instance.
*/
AbstractFileInputReader() {
public AbstractFileInputReader() {
this.isClosed = new AtomicBoolean(false);
this.openIterators = new IteratorManager();
}

/**
* {@inheritDoc}
*/
@Override
public void configure(final Map<String, ?> configs) {

}

/**
* {@inheritDoc}
*/
@Override
public FileObjectMeta readMetadata(final URI fileURI) {
return new LocalFileObjectMeta(new File(fileURI));
}

/**
* {@inheritDoc}
*/
@Override
public boolean isReadable(final URI fileURI) {
return Files.isReadable(Paths.get(fileURI));
}

/**
* {@inheritDoc}
*/
Expand Down
Expand Up @@ -28,7 +28,7 @@
*
* @param <T> type of value.
*/
public interface FileInputIterator<T> extends Iterator<RecordsIterable<T>> {
public interface FileInputIterator<T> extends Iterator<RecordsIterable<T>>, AutoCloseable {

/**
* Gets the iterator context.
Expand Down
Expand Up @@ -38,7 +38,9 @@ public interface FileInputReader extends Configurable, AutoCloseable {
* @param configs the reader configuration.
*/
@Override
void configure(final Map<String, ?> configs);
default void configure(final Map<String, ?> configs) {

}

/**
* Gets the metadata for the source object identified by the given {@link URI}.
Expand Down
Expand Up @@ -16,9 +16,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamthoughts.kafka.connect.filepulse.fs.local.reader;

import io.streamthoughts.kafka.connect.filepulse.reader.FileInputIterator;
package io.streamthoughts.kafka.connect.filepulse.reader;

import java.util.Collections;
import java.util.HashSet;
Expand All @@ -27,14 +25,14 @@
/**
* Default class to easily close all open {@link FileInputIterator} instances.
*/
class IteratorManager {
public class IteratorManager {

private final Set<FileInputIterator<?>> openIterators;

/**
* Creates a new {@link IteratorManager} instance.
*/
IteratorManager() {
public IteratorManager() {
this.openIterators = Collections.synchronizedSet(new HashSet<>());
}

Expand Down
Expand Up @@ -47,10 +47,8 @@ public class FileInputIterable implements Iterable<RecordsIterable<FileRecord<Ty
* @param reader the input source reader used to create a new {@link FileInputIterator}.
*/
FileInputIterable(final URI source, final FileInputReader reader) {
Objects.requireNonNull(source, "source can't be null");
Objects.requireNonNull(reader, "reader can't be null");
this.source = source;
this.reader = reader;
this.source = Objects.requireNonNull(source, "source can't be null");;
this.reader = Objects.requireNonNull(reader, "reader can't be null");;
this.metadata = reader.readMetadata(source);
}

Expand Down Expand Up @@ -97,15 +95,13 @@ private void checkState() {

void close() {
if (isOpen.get()) {
LOG.debug("Closing iterator for source {} ", metadata().uri());
LOG.info("Closing iterator for source {} ", metadata().uri());
iterator.close();
}
}

static boolean isAlreadyCompleted(final FileObjectOffset committedOffset,
final FileObjectMeta metadata) {
return committedOffset != null &&
committedOffset.position() >= metadata.contentLength();
static boolean isAlreadyCompleted(final FileObjectOffset committedOffset, final FileObjectMeta metadata) {
return committedOffset != null && committedOffset.position() >= metadata.contentLength();
}

}
Expand Up @@ -103,7 +103,7 @@ class ContentDigest {
public ContentDigest(@JsonProperty("digest") final String digest,
@JsonProperty("algorithm") final String algorithm) {
this.digest = Objects.requireNonNull(digest, "digest should not be null");;
this.algorithm = Objects.requireNonNull(algorithm, "algorithm should not be null");
this.algorithm = Objects.requireNonNull(algorithm, "algorithm should not be null").toUpperCase();
}

@JsonProperty("digest")
Expand Down Expand Up @@ -132,10 +132,10 @@ public int hashCode() {

@Override
public String toString() {
return "ContentDigest{" +
return "[" +
"digest=" + digest +
", algorithm='" + algorithm + '\'' +
'}';
']';
}
}
}
48 changes: 48 additions & 0 deletions connect-file-pulse-filesystems/filepulse-commons-fs/pom.xml
@@ -0,0 +1,48 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright 2019-2021 StreamThoughts.
~
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>io.streamthoughts</groupId>
<artifactId>kafka-connect-filepulse-filesystems</artifactId>
<version>2.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>kafka-connect-filepulse-commons-fs</artifactId>

<properties>
<checkstyle.config.location>${project.parent.basedir}/..</checkstyle.config.location>
</properties>

<dependencies>
<dependency>
<groupId>net.sf.saxon</groupId>
<artifactId>Saxon-HE</artifactId>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-connect-avro-converter</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>

</project>
@@ -1,5 +1,5 @@
/*
* Copyright 2019-2020 StreamThoughts.
* Copyright 2019-2021 StreamThoughts.
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
Expand All @@ -16,32 +16,35 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamthoughts.kafka.connect.filepulse.fs.local.reader;
package io.streamthoughts.kafka.connect.filepulse.fs.reader;

import io.streamthoughts.kafka.connect.filepulse.source.FileObjectOffset;
import io.streamthoughts.kafka.connect.filepulse.source.TimestampedRecordOffset;
import org.apache.kafka.common.utils.Time;

public class XMLRecordOffset extends TimestampedRecordOffset {
/**
* Represents the position of a record into a file based on an index.
*/
public class IndexRecordOffset extends TimestampedRecordOffset {

private final long records;

/**
* Creates a new {@link XMLRecordOffset} instance.
* Creates a new {@link IndexRecordOffset} instance.
*
* @param records
* @param records the record offset.
*/
public XMLRecordOffset(final long records) {
public IndexRecordOffset(final long records) {
this(Time.SYSTEM.milliseconds(), records);
}

/**
* Creates a new {@link XMLRecordOffset} instance.
* Creates a new {@link IndexRecordOffset} instance.
*
* @param timestamp
* @param records
* @param timestamp the timestamp attached to this offset.
* @param records the record offset.
*/
private XMLRecordOffset(final long timestamp, final long records) {
private IndexRecordOffset(final long timestamp, final long records) {
super(timestamp);
this.records = records;
}
Expand Down

0 comments on commit 06385b3

Please sign in to comment.