Skip to content

Commit

Permalink
refaractor(api/filesystems): move FileInputIterator implementation to…
Browse files Browse the repository at this point in the history
… commons-fs

This commits adds the new interfaces FileInputIteratorFactory and Storage
in order to decouple iterator from the underlying file-system
  • Loading branch information
fhussonnois committed Feb 12, 2021
1 parent 06385b3 commit 1658d35
Show file tree
Hide file tree
Showing 40 changed files with 1,221 additions and 574 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ public static boolean isFastSplit(final String regex) {
}

public static boolean isNotBlank(final String string) {
return !(Objects.isNull(string) || string.isBlank());
return !isBlank(string);
}

public static boolean isBlank(final String string) {
return Objects.isNull(string) || string.isBlank();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright 2019-2021 StreamThoughts.
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamthoughts.kafka.connect.filepulse.reader;

import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
import io.streamthoughts.kafka.connect.filepulse.source.FileRecord;

import java.net.URI;

/**
* A {@code FileInputIteratorFactory} can be used to get a new {@link FileInputIterator}.
*/
public interface FileInputIteratorFactory {

/**
* Creates a new {@link FileInputIterator} for the given object file.
*
* @param objectURI the {@link URI} of the object file.
* @return a new {@link FileInputIterator}.
*/
FileInputIterator<FileRecord<TypedStruct>> newIterator(final URI objectURI);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,8 @@
package io.streamthoughts.kafka.connect.filepulse.reader;

import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
import io.streamthoughts.kafka.connect.filepulse.source.FileContext;
import io.streamthoughts.kafka.connect.filepulse.source.FileRecord;
import io.streamthoughts.kafka.connect.filepulse.source.FileObjectMeta;
import io.streamthoughts.kafka.connect.filepulse.source.FileRecord;
import org.apache.kafka.common.Configurable;

import java.net.URI;
Expand All @@ -30,7 +29,7 @@
/**
* A {@code FileInputReader} is the principal class used to read an input file/object.
*/
public interface FileInputReader extends Configurable, AutoCloseable {
public interface FileInputReader extends FileInputIteratorFactory, Configurable, AutoCloseable {

/**
* Configure this class with the given key-value pairs.
Expand All @@ -45,27 +44,27 @@ default void configure(final Map<String, ?> configs) {
/**
* Gets the metadata for the source object identified by the given {@link URI}.
*
* @param sourceURI the {@link URI} of the source object.
* @param objectURI the {@link URI} of the file object.
* @return a new {@link FileObjectMeta} instance.
*/
FileObjectMeta readMetadata(final URI sourceURI);
FileObjectMeta getObjectMetadata(final URI objectURI);

/**
* Checks whether the source object identified by the given {@link URI} can be read.
*
* @param sourceURI the {@link URI} of the source object.
* @param objectURI the {@link URI} of the file object.
* @return the {@code true}.
*/
boolean isReadable(final URI sourceURI);
boolean canBeRead(final URI objectURI);

/**
* Creates a new {@link FileInputIterator} instance.
* Creates a new {@link FileInputIterator} for the given {@link URI}.
*
* @param context the context of the file on which to create the iterator.
* @param objectURI the {@link URI} of the file object.
* @return a new {@link FileInputIterator} iterator instance.
*
*/
FileInputIterator<FileRecord<TypedStruct>> newIterator(final FileContext context);
FileInputIterator<FileRecord<TypedStruct>> newIterator(final URI objectURI);

/**
* Close this reader and any remaining un-close iterators.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
/*
* Copyright 2019-2020 StreamThoughts.
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamthoughts.kafka.connect.filepulse.source;

import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
import io.streamthoughts.kafka.connect.filepulse.reader.FileInputIterator;
import io.streamthoughts.kafka.connect.filepulse.reader.FileInputReader;
import io.streamthoughts.kafka.connect.filepulse.reader.RecordsIterable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.URI;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* {@code DelegateFileInputIterator}.
*/
public class DelegateFileInputIterator implements FileInputIterator<FileRecord<TypedStruct>> {

private static final Logger LOG = LoggerFactory.getLogger(DelegateFileInputIterator.class);

private final URI objectURI;
private final FileInputReader reader;
private final AtomicBoolean isClosed = new AtomicBoolean(false);

private FileInputIterator<FileRecord<TypedStruct>> iterator;

/**
* Creates a new {@link DelegateFileInputIterator} instance.
*
* @param objectURI the input source file.
* @param reader the input source reader used to create a new {@link FileInputIterator}.
*/
DelegateFileInputIterator(final URI objectURI, final FileInputReader reader) {
this.objectURI = Objects.requireNonNull(objectURI, "source can't be null");
this.reader = Objects.requireNonNull(reader, "reader can't be null");
}

/**
* Gets the metadata of the backed object file.
*
* @return the {@link FileObjectMeta}
*/
public FileObjectMeta getMetadata() {
return reader.getObjectMetadata(objectURI);
}

/**
* Gets the URI of the backed object file.
*
* @return the {@link URI}
*/
public URI getObjectURI() {
return objectURI;
}

/**
* Gets the {@link FileInputIterator} or create a new one if none has been initialized yet.
*/
public void open() {
if (isOpen()) throw new IllegalStateException("Iterator is already open");
LOG.info("Opening new iterator for: {}", objectURI);
iterator = reader.newIterator(objectURI);
}

/**
* @return {@code true} if an iterator is already opened.
*/
boolean isOpen() {
return iterator != null && !iterator.isClose();
}

/**
*
* @return {@code true} if the backed object file can be read and is accessible.
*/
boolean isValid() {
return reader.canBeRead(objectURI);
}

/**
* {@inheritDoc}
*/
@Override
public FileContext context() {
checkIsOpen();
return iterator.context();
}

/**
* {@inheritDoc}
*/
@Override
public void seekTo(final FileObjectOffset offset) {
checkIsOpen();
iterator.seekTo(offset);
}

/**
* {@inheritDoc}
*/
@Override
public RecordsIterable<FileRecord<TypedStruct>> next() {
checkIsOpen();
return iterator.next();
}

/**
* {@inheritDoc}
*/
@Override
public boolean hasNext() {
checkIsOpen();
return iterator.hasNext();
}

/**
* {@inheritDoc}
*/
@Override
public void close() {
if (!isClosed.get()) {
LOG.info("Closing iterator for: {} ", iterator.context().metadata());
iterator.close();
isClosed.set(true);
}
}

/**
* {@inheritDoc}
*/
@Override
public boolean isClose() {
return isClosed.get();
}

private void checkIsOpen() {
if (!isOpen()) throw new IllegalStateException("This iterator is not initialized yet or already closed");
}

}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,13 @@ default String stringURI() {
* @return the content-length of the source object.
*/
@JsonProperty("contentLength")
long contentLength();
Long contentLength();

/**
* @return the creation date or the last modified date, whichever is the latest.
*/
@JsonProperty("lastModified")
long lastModified();
Long lastModified();

/**
* @return the digest of the source object content.
Expand Down
Loading

0 comments on commit 1658d35

Please sign in to comment.