-
Notifications
You must be signed in to change notification settings - Fork 51
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add batching processor #139
Conversation
@Override | ||
public void process(ProcessingContext<T> context, T task) throws InterruptedException { | ||
BatchingTask<T> newTask = new BatchingTask<>(context.deferCompletion(), context, task); | ||
boolean isInitialTask = windowedTasks.isEmpty(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The motivation is to make a decision before adding a Task, and to scheduleFlush after adding a Task.
|
||
/** | ||
* *MUST* call {@link BatchingTask#completion}'s {@link DeferredCompletion#complete()} or | ||
* {@link BatchingTask#context}'s {@link ProcessingContext#retry()} method. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As a result of worrying about the retry process, I left it to the implementation side, but I think that it should be improved...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since this is the main method of this class that we expect users to implement, the document should be bit more informative IMO.
At least it should cover topics like what to do after complete processing batch of tasks. (call .completion().complete()
or retry()
for each right?),
what happens if an error is thrown, which threads may possibly call this method etc.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for submitting a patch!
Added few early feedbacks.
return; | ||
} | ||
synchronized (windowedTasks) { | ||
processBatchingTasks(windowedTasks); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems like clearing passed batchingTasks is also implementer's responsibility?
I guess it's not good design, as it could be error-prone.
Let's pass copy of windowedTasks and clear it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for the advice! I fixed it.
1a2e2c2
} | ||
|
||
private void flush() { | ||
if (!windowedTasks.isEmpty()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hm, is that intentional? (not windowedTasks.isEmpty()
?)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry... I made a mistake.
4fdf913
|
||
// visible for testing | ||
Runnable flushTask() { | ||
return this::flush; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems flushTask() isn't called periodically?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was thinking about the following, but it was certainly difficult to understand, so I fixed it!
- When the first task comes in,
scheduleFlush()
is called. flush()
empties windowsTasks.- When the next task comes in after
flush()
,scheduleFlush()
is called because windowTasks is empty.
2ef1bc0
I'm sorry for the late response 🙇 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry for delaying the review!
Reviewed 1st round. PTAL comments.
* Batch-flushing should be done in time-based and size-based. | ||
* @param <T> type of task to batch | ||
*/ | ||
abstract public class BatchingProcessor<T> implements DecatonProcessor<T> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nits] should be ordered as public abstract
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I fixed it. 4471e71
* @param capacity size limit for this processor. Every time tasks’size reaches capacity, | ||
* tasks in past before reaching capacity are pushed to {@link BatchingTask#processBatchingTasks(List)}. | ||
*/ | ||
public BatchingProcessor(long lingerMillis, int capacity) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As the general practice, the constructor of abstract class should be marked as protected
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I fixed it. f6f97ba
@Accessors(fluent = true) | ||
public static class BatchingTask<T> { | ||
@Getter(AccessLevel.NONE) | ||
public final Completion completion; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's make all fields as private and provide getter explicitly?
Also I think getter for completion
is necessary to complete the task
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I fixed it. c668bb3
BatchingTask<T> newTask = new BatchingTask<>(context.deferCompletion(), context, task); | ||
windowedTasks.add(newTask); | ||
if (windowedTasks.size() >= this.capacity) { | ||
flush(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this should be executed in scheduler thread to avoid unnecessary lock contention between Decaton's processor thread and scheduler thread
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hm, then the problem is at the time flush()
called, windowedTasks may contain more tasks than batch size, it's also not desirable.
Ok, then what do you think about following strategy?
- Instead of using single final
windowedTasks
, have non-finalList<BatchingTask>
for "current" batch. - When "current" batch exceeds batch size, submit it to flusher and substitute current batch with new List.
Decaton will pause consumption if there's too many pending tasks, so even with this strategy, heap usage will not grow indefinitely.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm sorry, I probably don't understand what you're saying...
I tried to correct it with my own understanding, but please point out if it is wrong 🙇
cb78a8b
|
||
processor.process(context, task1); | ||
processor.process(context, task2); | ||
Thread.sleep(lingerMs * 2); // doubling just to make sure flush completed in background |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using Thread.sleep to wait something happen makes test result flaky.
Let's use CountDownLatch to make sure the condition happens. (Referring CompactionProcessorTest may be helpful)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for your reference.
I fixed it so that the test result is stable as much as possible.
357f2fb
Thank you for your polite review! |
* *MUST* call {@link BatchingTask#completion}'s {@link DeferredCompletion#complete()} or | ||
* {@link BatchingTask#context}'s {@link ProcessingContext#retry()} method. | ||
*/ | ||
abstract void processBatchingTasks(List<BatchingTask<T>> batchingTasks); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As the general practice, let's make this method protected
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I fixed it. 5ae0959
public void process(ProcessingContext<T> context, T task) throws InterruptedException { | ||
BatchingTask<T> newTask = new BatchingTask<>(context.deferCompletion(), context, task); | ||
windowedTasks.add(newTask); | ||
if (windowedTasks.size() >= this.capacity) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In flush-thread centralization and batch.size-bound point of view, looks okay in current impl.
But I suggested in #139 (comment) is something like below:
public abstract class BatchingProcessor<T> implements DecatonProcessor<T> {
private List<BatchingTask<T>> currentBatch = new ArrayList<>();
private void scheduleFlush() {
executor.schedule(this::periodicallyFlushTask, lingerMillis, TimeUnit.MILLISECONDS);
}
private void periodicallyFlushTask() {
final List<BatchingTask<T>> batch;
synchronized (this) {
if (!currentBatch.isEmpty()) {
batch = currentBatch;
currentBatch = new ArrayList<>();
} else {
batch = null;
}
}
if (batch != null) {
processBatchingTasks(batch);
}
scheduleFlush();
}
public void process(ProcessingContext<T> context, T task) throws InterruptedException {
synchronized (this) {
if (currentBatch.size() >= batchSize) {
final List<BatchingTask<T>> batch = currentBatch;
executor.submit(() -> processBatchingTasks(batch));
currentBatch = new ArrayList<>();
}
currentBatch.add(task);
}
}
}
I guess it's more straightforward than maintaining single windowedTasks instance with calling subList
...?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this should be executed in scheduler thread to avoid unnecessary lock contention between Decaton's processor thread and scheduler thread
Thank you! I finally understood the meaning of the above sentence.
executor.submit(() -> processBatchingTasks(batch));
is important.
I fixed it. 9e35f1b
#139 (comment) |
@ocadaruma @kawamuray I have corrected other comments I received. |
Thanks for the fix.
I guess it's simply because linger-based flush is disabled on that test? |
public abstract class BatchingProcessor<T> implements DecatonProcessor<T> { | ||
|
||
private final ScheduledExecutorService executor; | ||
private List<BatchingTask<T>> currentBatch = Collections.synchronizedList(new ArrayList<>()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh what I meant was for BatchingProcessorTest
. (#139 (comment))
For this, every modification is done in synchronized block so not necessary to be wrapped I think.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
|
||
@Value | ||
@Accessors(fluent = true) | ||
@RequiredArgsConstructor |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@RequiredArgsConstructor
is redundant as there's @Value
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I fixed it.
aa51f27
Thank you for the advice. |
Sorry for the late reply every time. |
Thanks for the update!
Ah maybe what I meant was more simple.
However, I agree with removing it. Because what we want to check in integration test is BatchingProcessor's behavior in overall (as commented in #139 (comment)), so we don't have to write test scenario for each of linger=MAX, capacity=MAX I think. |
|
||
/** | ||
* Instantiate {@link BatchingProcessor}. | ||
* If you only need one limit, please set large enough value to another. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As described in #139 (comment), we found that consumption will get stuck if we specify lingerMillis
to Int.MAX.
Hm, so I think realistically we always specify non-Int.MAX value for both parameters.
Then let's just remove this line? To avoid mis-use of these parameters.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Exactly, I fixed it.
0cb57f7
ProcessorTestSuite | ||
.builder(rule) | ||
.configureProcessorsBuilder(builder -> builder.thenProcess( | ||
new BatchingProcessor<TestTask>(1000, Integer.MAX_VALUE) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suppose the purpose of this integration test is to verify the behavior of BatchingProcessor in overall (rather than checking if BatchingProcessor is implemented correctly for each individual cases like linger=Max, capacity=Max etc), so let's also specify non-Int.MAX value for capacity here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I fixed it. (I renamed the test name too.)
0317258
40d355b
to
0317258
Compare
What I immediately thought is because of multiple partitions. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thanks 👍
|
As soon as this PR is merged, I will add a documentation like task-compaction.adoc in another PR! |
Motivation
Related Issue: #128