Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/skip unchanged files #62

Merged

Conversation

grantatspothero
Copy link
Contributor

@grantatspothero grantatspothero commented May 26, 2020

Breaking this PR into two: #60

This PR just contains the bits about skipping unchanged files by storing the fileSizeBytes in the offset.

We store the fileSizeBytes as an offset in kafka connect and skip files whose file size has not changed
@coveralls
Copy link

coveralls commented May 26, 2020

Coverage Status

Coverage decreased (-0.3%) to 95.305% when pulling d46ddfa on grantatspothero:feature/skip_unchanged_files into 40888d1 on mmolimar:develop.

log.info("Processing records for file {}.", metadata);
while (reader.hasNext()) {
records.add(convert(metadata, reader.currentOffset() + 1, reader.next()));
}
} catch (ConnectException | IOException e) {
//when an exception happens reading a file, the connector continues
log.error("Error reading file [{}]. Keep going...", metadata.getPath(), e);
return new ArrayList<SourceRecord>();
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why returning here an empty list?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See the comment here: #60 (comment)

If we commit offsets for a file containing the fileSizeBytes but we only processed part way through the file due to an error we will skip the data from the rest of the file. Instead we should only commit the offsets for that file if the file was processed entirely.

@mmolimar
Copy link
Owner

Thanks @grantatspothero
Just a few comments. Could you change your PR to point to develop branch? The idea is to group new features there and then release a version and update the master branch.

Copy link
Owner

@mmolimar mmolimar left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My comments below

Long byteOffset = (Long)fileSizeBytesObject;
if (metadata.getLen() == byteOffset){
log.info("File {} has byte length and byte offset of: {}, skipping reading the file as it is unchanged since the last execution", metadata.getPath(), byteOffset);
return null;
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd prefer returning here a "fake" file reader object implementing the FileReader interface with no elements.

Copy link
Contributor Author

@grantatspothero grantatspothero May 27, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are a lot of methods to implement and it complicates the code a lot for a simple operation (is this thing present?).

Is using java.util.Optional ok instead? This connector only supports java8 or higher so it seems reasonable to use.

EDIT: I guess the interaction of autocloseable and java.util.optional is not great. I can go ahead with the fake file reader 👍

@@ -78,13 +78,18 @@ public void start(Map<String, String> properties) {
List<SourceRecord> totalRecords = filesToProcess().map(metadata -> {
List<SourceRecord> records = new ArrayList<>();
try (FileReader reader = policy.offer(metadata, context.offsetStorageReader())) {
if(reader == null){
Copy link
Owner

@mmolimar mmolimar May 27, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Avoid this validation. See comment related with the "fake" file reader.

if(fileSizeBytesObject != null) {
Long byteOffset = (Long)fileSizeBytesObject;
if (metadata.getLen() == byteOffset){
log.info("File {} has byte length and byte offset of: {}, skipping reading the file as it is unchanged since the last execution", metadata.getPath(), byteOffset);
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Split the message in the log to avoid a line too large

@grantatspothero grantatspothero changed the base branch from master to develop May 27, 2020 21:41

import java.util.Map;

public class EmptyFileReader extends AbstractFileReader<Void>{
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test coverage dropped a lot because of this file. It doesn't really make sense to test most of these operations since they will never be called/they do nothing (ie: seekFile is never called and is an empty method)

@grantatspothero
Copy link
Contributor Author

@mmolimar just bumping this, made changes

@grantatspothero
Copy link
Contributor Author

@mmolimar Merged with recently updated develop, the failing test is a flaky test in the HDFS watcher so I think it is unrelated.

.andReturn(offsetStorageReader)
.times(2);

// Every time the `offsetStorageReader.offset(params)` method is called we want to capture the offset params
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There may be a better way to use EasyMock to do this, but I could not find a way to do it in the EasyMock docs.

@mmolimar
Copy link
Owner

Thanks for the contrib!

@mmolimar mmolimar merged commit e60cef3 into mmolimar:develop Jun 21, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants