-
Notifications
You must be signed in to change notification settings - Fork 6
/
RequestSeqIdsMessage.java
86 lines (76 loc) · 3.28 KB
/
RequestSeqIdsMessage.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
/*******************************************************************************
* Copyright 2015 Klaus Pfeiffer <klaus@allpiper.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
******************************************************************************/
package com.jfastnet.messages;
import com.jfastnet.MessageKey;
import com.jfastnet.MessageLog;
import com.jfastnet.events.RequestedMessageNotInLogEvent;
import com.jfastnet.processors.MessageLogProcessor;
import com.jfastnet.state.ClientState;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
 * This message is used to request missing sequenced ids from the other side.
 *
 * <p>The requesting side creates the message with the ids it is missing; when the
 * message is processed on the other side, every requested message that is still
 * present in the sent-message log is sent again to the requester.
 *
 * @author Klaus Pfeiffer - klaus@allpiper.com
 */
@Slf4j
public class RequestSeqIdsMessage extends Message implements IDontFrame {

    /** List of sequence message ids we require. */
    private List<Long> missingIds = new ArrayList<>();

    /**
     * Creates a request for the given sequence ids.
     *
     * @param missingIds ids of the sequenced messages that are missing; the list is
     *                   copied, so later changes to the caller's list do not affect
     *                   this message. {@code null} is treated as an empty request.
     * @param receiverId id of the peer the request is addressed to
     */
    public RequestSeqIdsMessage(List<Long> missingIds, int receiverId) {
        // Defensive copy: do not alias the caller's (possibly reused) list.
        this.missingIds = missingIds == null ? new ArrayList<>() : new ArrayList<>(missingIds);
        setReceiverId(receiverId);
        // The list is handed to the logger as-is so it is only formatted when INFO is enabled.
        log.info("Request absent-Ids from {}: {}", receiverId, this.missingIds);
    }

    /**
     * @return {@link ReliableMode#UNRELIABLE}, because a lost request is simply
     *         followed by another request for the same ids
     */
    @Override
    public ReliableMode getReliableMode() {
        // If this message gets lost, the ids will be requested again from the other side.
        return ReliableMode.UNRELIABLE;
    }

    /**
     * Resends every requested message that is found in the sent-message log to the
     * sender of this request. For ids that are not in the log, an error is logged and a
     * {@link RequestedMessageNotInLogEvent} is added to the event log.
     *
     * @param context not used by this implementation
     */
    @Override
    public void process(Object context) {
        final int senderId = getSenderId();
        log.info("Resend absent ids: {} to {}", missingIds, senderId);

        // Clear sender id (use 0), if every client receives the same id for a particular message.
        final int keySenderId = getState().idProvider.resolveEveryClientMessage() ? senderId : 0;

        degradeNetworkQuality();

        MessageLog messageLog = getState().getProcessorOf(MessageLogProcessor.class).getMessageLog();
        for (Long absentId : missingIds) {
            MessageKey key = MessageKey.newKey(Message.ReliableMode.SEQUENCE_NUMBER, keySenderId, absentId);
            Message message = messageLog.getSent(key);
            if (message == null) {
                log.error("Requested message {} not in log.", key);
                getState().getEventLog().add(new RequestedMessageNotInLogEvent(key, senderId));
                continue;
            }
            // Address the logged message to the requester and flag it as a resend.
            message.setReceiverId(senderId);
            message.setResendMessage(true);
            log.info("Resend {} to {}", message, senderId);
            getConfig().internalSender.send(message);
            getConfig().netStats.resentMessages.incrementAndGet();
        }
    }

    /**
     * Reports the number of requested ids to the network quality tracker of the
     * requesting peer, if a client state exists for that peer.
     */
    private void degradeNetworkQuality() {
        // If other side requests missing messages, it means we should slow down sending.
        // Thus we degrade the network quality to this peer.
        ClientState clientState = getState().getClientStates().getById(getSenderId());
        if (clientState != null) {
            clientState.getNetworkQuality().requestedMissingMessages(missingIds.size(), getConfig().timeProvider.get());
        }
    }
}