Add files batching capabilities to the Policies #59
Conversation
Force-pushed from abcb768 to 04be206.
Many thanks!

I went through #60 and, from my understanding, it doesn't solve the same challenge as this one. I'm going to try to refactor this to work at the

Great @Symbianx!
Force-pushed from dec8c77 to 45c0709.

Force-pushed from 81f6e1e to 21175ba.

Force-pushed from 21175ba to 7d1e4e9.
@mmolimar Made the change to set up the batching in the

@mmolimar I've been trying to fix the failing test but with no luck. I can't even reproduce it on my machine; do you have any idea what could be wrong?
Thanks @Symbianx
I added some comments.
src/main/java/com/github/mmolimar/kafka/connect/fs/policy/AbstractPolicy.java
```java
@@ -105,16 +109,25 @@ private String convert(String uri) {
        if (hasEnded()) {
            throw new IllegalWorkerStateException("Policy has ended. Cannot be retried.");
        }

        if (batchSize > 0 && currentIterator != null && currentIterator.hasNext()) {
```
I think it's better to avoid this check and set the `files` iterator with the `previous` value.
If we skip this check, the `preCheck` and `postCheck` methods will be executed, which will cause the `CronPolicy`, `SleepyPolicy` and `HdfsFileWatcherPolicy` to sleep between batches. That breaks the current behaviour of sleeping only after handling all the files. Is this what you want?
Makes total sense, but what about just returning `previous` (in case it has elements, ignoring the batch size), ok?
That sounds ok.
Since `previous` is the original iterator (not a `BatchIterator`), I will need to create a custom class extending `Iterator` with a method to reset the counter to 0. Then we can store the `BatchIterator` in the `previous` variable instead.
I will change it tomorrow.
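For illustration only, a minimal sketch of the kind of resettable wrapper described above (this idea was later dropped in favour of another approach; the class and method names here are made up, not the PR's actual code):

```java
import java.util.Iterator;

// Illustrative sketch: wraps an iterator and yields at most batchSize
// elements per batch; resetBatch() allows iteration to continue with
// the next batch on a later policy execution.
class BatchIterator<T> implements Iterator<T> {
    private final Iterator<T> delegate;
    private final int batchSize;
    private int count = 0;

    BatchIterator(Iterator<T> delegate, int batchSize) {
        this.delegate = delegate;
        this.batchSize = batchSize;
    }

    @Override
    public boolean hasNext() {
        // Stop at the batch boundary even if the delegate has more elements.
        return count < batchSize && delegate.hasNext();
    }

    @Override
    public T next() {
        count++;
        return delegate.next();
    }

    // Reset the per-batch counter to 0 so the remaining elements can be
    // consumed as the next batch.
    void resetBatch() {
        count = 0;
    }
}
```

Storing such a wrapper in `previous` would let a policy hand out one batch per execution without re-listing the file system.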
Changed it to use `com.google.common.collect.Iterators.partition` instead, which simplified the implementation and is more standard.
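Guava's `Iterators.partition(iterator, size)` lazily groups a source iterator into consecutive lists of at most `size` elements. Its behaviour can be approximated with the standard library alone; this is a sketch of the semantics, not Guava's actual implementation:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Sketch of Iterators.partition-style batching: each call to next()
// greedily pulls up to `size` elements from the source into a List.
class Partition {
    static <T> Iterator<List<T>> partition(Iterator<T> source, int size) {
        return new Iterator<List<T>>() {
            @Override
            public boolean hasNext() {
                return source.hasNext();
            }

            @Override
            public List<T> next() {
                List<T> batch = new ArrayList<>(size);
                while (batch.size() < size && source.hasNext()) {
                    batch.add(source.next());
                }
                return batch; // the last batch may be smaller than `size`
            }
        };
    }
}
```

With this shape, the policy only needs to keep the partitioned iterator around and serve one sub-list per execution.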
```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BatchIterator {
```
We could add this functionality inside the `listFiles` method in the `AbstractPolicy` class.
If we put it in the `listFiles` method, we will be batching each file system independently. That doesn't feel like the intended behaviour to me.
Ok. Maybe rename this class to `Iterators`, with a `partition` method taking the batch size and a "duplicates" flag to allow removing duplicates in the same partition.
Sorry, I didn't understand what you mean by "allow removing duplicates in the same partition".
For example: in the first execution of the policy you get an iterator with 5 records (with a batch size of 3), so 2 remain. In the next execution (let's say we're using the Cron policy) we concat the previous 2 with another 5 records. We re-split that iterator, and this batch could "maybe" contain a duplicated item, so we should ignore that record and get another one.
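The scenario above can be traced with plain lists. Suppose the first listing returned files f1..f5 and, with a batch size of 3, f4 and f5 are left over; the next listing may return those same two files again alongside new ones. A sketch of order-preserving de-duplication (file names and the helper are illustrative, not code from the PR):

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;

class DedupExample {
    // Combine leftover files with the next listing, dropping duplicates
    // while preserving encounter order (LinkedHashSet keeps first occurrence).
    static List<String> combine(List<String> remaining, List<String> nextListing) {
        List<String> combined = new ArrayList<>(remaining);
        combined.addAll(nextListing);
        return new ArrayList<>(new LinkedHashSet<>(combined));
    }
}
```

Without the de-duplication step, f4 and f5 would be processed twice when the two iterators are concatenated.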
I don't think that would be in line with the present implementation.
An execution is only finished when the iterator is fully traversed; only after that do we sleep. If we mix files from the current iterator with the next one, we will create odd behaviour in the policies.
Force-pushed from 4327185 to 02dd5a4.
Thanks!

We use the `SimplePolicy` to restore events from S3 (1 event per file) and noticed messages took a long time before they were pushed into the topics. This PR adds a new configuration option, `policy.batch_size`, that allows the connector to handle at most a configured number of files at a time. Fixes #58.