/
Synchronizer.java
executable file
·138 lines (126 loc) · 5.73 KB
/
Synchronizer.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
/*
* #%L
* Netarchivesuite - common
* %%
* Copyright (C) 2005 - 2018 The Royal Danish Library,
* the National Library of France and the Austrian National Library.
* %%
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation, either version 2.1 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Lesser Public License for more details.
*
* You should have received a copy of the GNU General Lesser Public
* License along with this program. If not, see
* <http://www.gnu.org/licenses/lgpl-2.1.html>.
* #L%
*/
package dk.netarkivet.common.distribute;
import java.util.Hashtable;
import javax.jms.Message;
import javax.jms.MessageListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import dk.netarkivet.common.exceptions.ArgumentNotValid;
import dk.netarkivet.common.exceptions.IOFailure;
/**
* Converts an asynchronous call to a synchronous call. The method sendAndWaitForOneReply() is a blocking call which
* responds when a reply is received or returns null on timeout.
*/
public class Synchronizer implements MessageListener {
private static final Logger log = LoggerFactory.getLogger(Synchronizer.class);
/** Collection containing messages on which a reply is awaited. */
private final Hashtable<String, NetarkivetMessage> requests;
/** Collection containing reply messages which have not yet been returned to the caller. */
private Hashtable<String, NetarkivetMessage> replies;
/**
* Initialise maps containing requests and replies.
*/
public Synchronizer() {
requests = new Hashtable<String, NetarkivetMessage>();
replies = new Hashtable<String, NetarkivetMessage>();
}
/**
* Receives replies from a message queue and triggers the blocked call in sendAndWaitForOneReply().
*
* @param msg an ObjectMessage containing a NetarkivetMessage.
*/
public void onMessage(Message msg) {
ArgumentNotValid.checkNotNull(msg, "msg");
NetarkivetMessage naMsg = JMSConnection.unpack(msg);
NetarkivetMessage requestMsg;
synchronized (requests) {
requestMsg = requests.get(naMsg.getReplyOfId());
}
if (requestMsg != null) {
synchronized (requestMsg) {
replies.put(naMsg.getReplyOfId(), naMsg);
requestMsg.notifyAll();
}
} else {
log.warn("Received unexpected reply for unknown message '{}' of type '{}'. Ignored!!: {}",
naMsg.getReplyOfId(), naMsg.getClass().getName(), naMsg.toString());
}
}
/**
* Sends a message to a message queue and blocks the method invocation until a reply arrives. If it times out a null
* is returned. If a spurious wakeup is received and a timeout is set, the method will carry on waiting for the
* reply until the total timeout time has been used up. If a spurious wakeup is received and no timeout is set the
* method will just go back to waiting
*
* @param msg the request message
* @param timeout the timeout in milliseconds (or zero for no timeout)
* @return a reply message from the receiver of the request or null if timed out.
*/
public NetarkivetMessage sendAndWaitForOneReply(NetarkivetMessage msg, long timeout) {
ArgumentNotValid.checkNotNull(msg, "msg");
boolean noTimeout = (timeout == 0);
JMSConnection con = JMSConnectionFactory.getInstance();
synchronized (msg) {
synchronized (requests) {
con.send(msg);
requests.put(msg.getID(), msg);
}
try {
while (!replies.containsKey(msg.getID())) {
long timeBeforeWait = System.currentTimeMillis();
msg.wait(timeout);
synchronized (requests) {
if (!replies.containsKey(msg.getID())) {
// At this point we either got an unexpected wakeup
// or timed out
long timeAfterWait = System.currentTimeMillis();
// the new timeout value
timeout -= timeAfterWait - timeBeforeWait;
if (noTimeout || timeout > 0) { // Unexpected wakeup
log.debug("Unexpected wakeup for {}", msg.toString());
} else {
// timed out
// NB! if timeout is exactly zero here then this
// counts as a timeout. Otherwise we would call
// wait(0) on the next loop with disastrous
// results
requests.remove(msg.getID());
log.debug("Timed out waiting for reply to {}", msg.toString());
return null;
}
}
}
}
} catch (InterruptedException e) {
throw new IOFailure("Interrupted while waiting for reply to " + msg, e);
}
}
// If we get here, we must have received the expected reply
synchronized (requests) {
requests.remove(msg.getID());
log.debug("Received reply for message: {}", msg.toString());
return replies.remove(msg.getID());
}
}
}