Skip to content

Commit

Permalink
PositionAndListener
Browse files Browse the repository at this point in the history
  • Loading branch information
tlrx committed May 7, 2024
1 parent 3b92576 commit 18af1e0
Showing 1 changed file with 14 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.core.Tuple;

import java.util.ArrayList;
import java.util.List;
Expand All @@ -25,11 +24,13 @@
*/
class ProgressListenableActionFuture extends PlainActionFuture<Long> {

private record PositionAndListener(Long position, ActionListener<Long> listener) {}

protected final long start;
protected final long end;

// modified under 'this' mutex
private volatile List<Tuple<Long, ActionListener<Long>>> listeners;
private volatile List<PositionAndListener> listeners;
protected volatile long progress;
private volatile boolean completed;

Expand All @@ -55,7 +56,7 @@ private boolean invariant() {
assert completed == false || listeners == null;
assert start <= progress : start + " <= " + progress;
assert progress <= end : progress + " <= " + end;
assert listeners == null || listeners.stream().allMatch(listener -> progress < listener.v1());
assert listeners == null || listeners.stream().allMatch(listener -> progress < listener.position());
}
return true;
}
Expand Down Expand Up @@ -87,11 +88,11 @@ public void onProgress(final long progressValue) {
assert this.progress < progressValue : this.progress + " < " + progressValue;
this.progress = progressValue;

final List<Tuple<Long, ActionListener<Long>>> listenersCopy = this.listeners;
final List<PositionAndListener> listenersCopy = this.listeners;
if (listenersCopy != null) {
List<Tuple<Long, ActionListener<Long>>> listenersToKeep = null;
for (Tuple<Long, ActionListener<Long>> listener : listenersCopy) {
if (progressValue < listener.v1()) {
List<PositionAndListener> listenersToKeep = null;
for (PositionAndListener listener : listenersCopy) {
if (progressValue < listener.position()) {
if (listenersToKeep == null) {
listenersToKeep = new ArrayList<>();
}
Expand All @@ -100,7 +101,7 @@ public void onProgress(final long progressValue) {
if (listenersToExecute == null) {
listenersToExecute = new ArrayList<>();
}
listenersToExecute.add(listener.v2());
listenersToExecute.add(listener.listener());
}
}
this.listeners = listenersToKeep;
Expand Down Expand Up @@ -137,15 +138,16 @@ private void ensureNotCompleted() {
@Override
protected void done(boolean success) {
super.done(success);
final List<Tuple<Long, ActionListener<Long>>> listenersToExecute;
final List<PositionAndListener> listenersToExecute;
assert invariant();
synchronized (this) {
assert completed == false;
completed = true;
listenersToExecute = this.listeners;
listeners = null;
}
if (listenersToExecute != null) {
listenersToExecute.stream().map(Tuple::v2).forEach(listener -> executeListener(listener, this::actionResult));
listenersToExecute.forEach(listener -> executeListener(listener.listener(), this::actionResult));
}
assert invariant();
}
Expand All @@ -165,11 +167,11 @@ public void addListener(ActionListener<Long> listener, long value) {
if (completed || value <= progressValue) {
executeImmediate = true;
} else {
List<Tuple<Long, ActionListener<Long>>> listenersCopy = this.listeners;
List<PositionAndListener> listenersCopy = this.listeners;
if (listenersCopy == null) {
listenersCopy = new ArrayList<>();
}
listenersCopy.add(Tuple.tuple(value, listener));
listenersCopy.add(new PositionAndListener(value, listener));
this.listeners = listenersCopy;
}
}
Expand Down

0 comments on commit 18af1e0

Please sign in to comment.