-
Notifications
You must be signed in to change notification settings - Fork 612
/
StaggeredRequest.java
113 lines (94 loc) · 4.3 KB
/
StaggeredRequest.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package org.infinispan.remoting.transport.jgroups;
import java.util.Collection;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.infinispan.commands.ReplicableCommand;
import org.infinispan.remoting.inboundhandler.DeliverOrder;
import org.infinispan.remoting.responses.Response;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.impl.MultiTargetRequest;
import org.infinispan.remoting.transport.impl.RequestRepository;
import org.infinispan.remoting.transport.ResponseCollector;
import net.jcip.annotations.GuardedBy;
/**
 * A {@link MultiTargetRequest} that sends its command to one target at a time instead of to all
 * targets at once.
 *
 * <p>Each call to {@link #sendNextMessage()} sends the command to the next available target and
 * schedules a timeout. The method is invoked again whenever a response arrives or a stagger timeout
 * expires, until the request is done or every target has been tried. After the last target has been
 * contacted, the timeout is scheduled for the overall deadline computed at construction time.
 *
 * @author Dan Berindei
 * @since 9.1
 */
public class StaggeredRequest<T> extends MultiTargetRequest<T> {
    private final ReplicableCommand command;
    private final DeliverOrder deliverOrder;
    private final JGroupsTransport transport;

    // Assigned once in the constructor and never modified afterwards, so it is safe to read it
    // without holding the responseCollector monitor.
    private final long deadline;
    // Index of the next entry in the target list that sendNextMessage() will try.
    @GuardedBy("responseCollector")
    private int targetIndex;

    /**
     * Creates a staggered request; no message is sent until {@link #sendNextMessage()} is called.
     *
     * @param responseCollector collector passed to the superclass; also used as the monitor
     *                          guarding {@code targetIndex}
     * @param requestId         identifier passed to the superclass and sent along with the command
     * @param repository        request repository passed to the superclass
     * @param targets           candidate targets, tried in order
     * @param excludedTarget    target passed to the superclass as excluded
     * @param command           command to send to each target
     * @param deliverOrder      delivery order used when sending the command
     * @param timeout           overall timeout, converted to a deadline with the transport's time service
     * @param unit              unit of {@code timeout}
     * @param transport         transport used to send the command and to schedule timeouts
     */
    StaggeredRequest(ResponseCollector<T> responseCollector, long requestId, RequestRepository repository,
                     Collection<Address> targets, Address excludedTarget, ReplicableCommand command,
                     DeliverOrder deliverOrder, long timeout, TimeUnit unit, JGroupsTransport transport) {
        super(responseCollector, requestId, repository, targets, excludedTarget);
        this.command = command;
        this.deliverOrder = deliverOrder;
        this.transport = transport;
        // Use the accessor, like the rest of this class, rather than reaching into the field.
        this.deadline = transport.getTimeService().expectedEndTime(timeout, unit);
    }

    /**
     * Always fails: this class schedules its own timeouts from {@link #sendNextMessage()}.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void setTimeout(ScheduledExecutorService timeoutExecutor, long timeout, TimeUnit unit) {
        throw new UnsupportedOperationException("Timeout can only be set with sendFirstMessage!");
    }

    /**
     * Hands the response to the superclass and then tries the next target, if any.
     *
     * <p>NOTE(review): this method is {@code synchronized} on {@code this}, so the monitor is held
     * while {@link #sendNextMessage()} runs, including its potentially blocking send. Confirm
     * whether that is intended before changing it.
     */
    @Override
    public synchronized void onResponse(Address sender, Response response) {
        super.onResponse(sender, response);
        sendNextMessage();
    }

    /**
     * Handles both stagger timeouts and the final timeout. Only when every target has already been
     * tried is the timeout passed on to the superclass; otherwise the next target is contacted.
     */
    @Override
    protected void onTimeout() {
        // Don't call super.onTimeout() if it's just a stagger timeout
        boolean isFinalTimeout;
        synchronized (responseCollector) {
            isFinalTimeout = targetIndex >= getTargetsSize();
        }
        if (isFinalTimeout) {
            super.onTimeout();
        } else {
            sendNextMessage();
        }
    }

    /**
     * Sends the command to the next available target and schedules the following timeout.
     *
     * <p>Does nothing if the request is already done or all targets have been tried. Any exception
     * thrown while sending or scheduling completes the request exceptionally.
     */
    void sendNextMessage() {
        try {
            Address target = null;
            boolean isFinalTarget;
            // Need synchronization because sendNextMessage can be called both directly and from
            // onResponse() / onTimeout()
            synchronized (responseCollector) {
                if (isDone() || targetIndex >= getTargetsSize()) {
                    return;
                }
                // Skip over targets that are no longer in the cluster view
                while (target == null && targetIndex < getTargetsSize()) {
                    target = getTarget(targetIndex++);
                }
                if (target == null) {
                    // The final targets were removed because they have left the cluster,
                    // but the request is not yet complete because we're still waiting for a response
                    // from one of the other targets (i.e. we are being called from onTimeout).
                    // We don't need to send another message, just wait for the real timeout to expire.
                    super.setTimeout(transport.getTimeoutExecutor(), remainingNanos(), TimeUnit.NANOSECONDS);
                    return;
                }
                isFinalTarget = targetIndex >= getTargetsSize();
            }
            // Sending may block in flow-control or even in TCP, so we must do it outside the critical section
            transport.sendCommand(target, command, requestId, deliverOrder, false, true, false);
            // Scheduling the timeout task may also block
            // If this is the last target, set the request timeout at the deadline
            // Otherwise, schedule a timeout task to send a staggered request to the next target
            long delayNanos = remainingNanos();
            if (!isFinalTarget) {
                // Stagger delay: a tenth of the remaining time, split evenly across all targets
                delayNanos = delayNanos / 10 / getTargetsSize();
            }
            super.setTimeout(transport.getTimeoutExecutor(), delayNanos, TimeUnit.NANOSECONDS);
        } catch (Exception e) {
            completeExceptionally(e);
        }
    }

    /** Returns the time left until the overall deadline, in nanoseconds, per the transport's time service. */
    private long remainingNanos() {
        return transport.getTimeService().remainingTime(deadline, TimeUnit.NANOSECONDS);
    }
}