/
TransientSpace.java
300 lines (291 loc) · 9.3 KB
/
TransientSpace.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
/*
* jPOS Project [http://jpos.org]
* Copyright (C) 2000-2013 Alejandro P. Revilla
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.jpos.space;
import java.util.*;
/**
* Transient Space implementation
* @author Alejandro Revilla
* @version $Revision$ $Date$
* @since 2.0
* @jmx:mbean description "TransientSpace"
*/
public class TransientSpace implements LocalSpace, TransientSpaceMBean {
protected Map map;
static LocalSpace defaultSpace = new TransientSpace ();
/**
* @jmx:managed-constructor description="Default Constructor"
*/
public TransientSpace () {
super();
map = new HashMap ();
}
public void out (Object key, Object value) {
List listeners;
synchronized (this) {
Data data = (Data) map.get (key);
if (data == null)
map.put (key, (data = new Data ()));
data.add (value);
this.notifyAll ();
listeners = data.getListeners();
}
if (listeners != null) {
Iterator iter = listeners.iterator();
while (iter.hasNext()) {
((SpaceListener)iter.next()).notify (key, value);
}
}
}
public void out (Object id, Object value, long timeout) {
LeasedReference ref = new LeasedReference (value, timeout);
out (id, ref);
}
public synchronized Object rdp (Object key) {
Object obj = null;
Data data = (Data) map.get (key);
if (data != null) {
return data.get (key);
}
return obj;
}
public synchronized Object inp (Object key) {
Object obj = null;
Data data = (Data) map.get (key);
if (data != null) {
obj = data.remove ();
if (data.isEmpty ())
map.remove (key);
}
return obj;
}
public synchronized Object in (Object key) {
Object obj;
while ((obj = inp (key)) == null) {
try {
this.wait ();
} catch (InterruptedException e) { }
}
return obj;
}
public synchronized Object in (Object key, long timeout) {
Object obj;
long now = System.currentTimeMillis();
long end = now + timeout;
while ((obj = inp (key)) == null &&
((now = System.currentTimeMillis()) < end))
{
try {
this.wait (end - now);
} catch (InterruptedException e) { }
}
return obj;
}
public synchronized Object rd (Object key) {
Object obj;
while ((obj = rdp (key)) == null) {
try {
this.wait ();
} catch (InterruptedException e) { }
}
return obj;
}
public synchronized Object rd (Object key, long timeout) {
Object obj;
long now = System.currentTimeMillis();
long end = now + timeout;
while ((obj = rdp (key)) == null &&
((now = System.currentTimeMillis()) < end))
{
try {
this.wait (end - now);
} catch (InterruptedException e) { }
}
return obj;
}
public void put (Object key, Object value) {
throw new SpaceError ("Unsupported operation");
}
public void put (Object key, Object value, long timeout) {
throw new SpaceError ("Unsupported operation");
}
public synchronized void addListener (Object key, SpaceListener listener) {
Data data = (Data) map.get (key);
if (data == null)
map.put (key, (data = new Data()));
data.addListener (listener);
}
public synchronized void addListener
(Object key, SpaceListener listener, long timeout)
{
// Not properly implemented, use new TSpace class instead
addListener (key, listener);
}
public synchronized void removeListener (Object key, SpaceListener listener) {
Data data = (Data) map.get (key);
if (data != null)
data.removeListener (listener);
}
protected static final class Data {
LinkedList data;
LinkedList listeners;
protected Data () {
super();
data = new LinkedList ();
listeners = null;
}
protected Data (Object value) {
this ();
add (value);
}
protected void add (Object value) {
data.add (value);
}
protected Object get (Object value) {
Object obj = null;
while (size() > 0) {
obj = data.getFirst();
if (obj instanceof LeasedReference) {
obj = ((LeasedReference)obj).get ();
if (obj == null) {
data.removeFirst ();
continue;
}
}
break;
}
return obj;
}
protected int size () {
return data.size ();
}
protected Object remove () {
Object obj = null;
while (size() > 0) {
obj = data.removeFirst();
if (obj instanceof LeasedReference) {
LeasedReference ref = (LeasedReference) obj;
obj = ref.get ();
if (obj == null) {
continue;
}
ref.discard ();
}
break;
}
return obj;
}
protected boolean isEmpty () {
return data.isEmpty () && listeners == null;
}
protected void addListener (SpaceListener l) {
if (listeners == null)
listeners = new LinkedList ();
listeners.add (l);
}
protected void removeListener (SpaceListener l) {
if (listeners != null) {
listeners.remove (l);
if (listeners.isEmpty ())
listeners = null;
}
}
protected List getListeners () {
return listeners;
}
}
public static final LocalSpace getSpace () {
return defaultSpace;
}
public static final LocalSpace getSpace (String spaceName) {
String key = "jpos:space/"+spaceName;
Object obj = getSpace().rdp (key);
Space sp = getSpace();
if (obj == null) {
synchronized (TransientSpace.class) {
obj = sp.rdp (key);
if (obj == null) {
obj = new TransientSpace ();
sp.out (key, obj);
}
}
}
return (LocalSpace) obj;
}
/**
* @return set of keys present in the Space
* @jmx:managed-attribute description="Keys in Space"
*/
public Set getKeySet () {
Set keySet;
synchronized (this) {
keySet = map.keySet();
}
return keySet;
}
public String getKeys () {
StringBuffer sb = new StringBuffer ();
Iterator iter = map.keySet().iterator ();
boolean first = true;
while (iter.hasNext()) {
if (!first)
sb.append (' ');
else
first = false;
sb.append (iter.next().toString ());
}
return sb.toString ();
}
/**
* same as Space.out (key,value)
* @param key Key
* @param value value
* @jmx:managed-operation description="Write value to key"
* @jmx:managed-operation-parameter position="0" name="key" description="Space Key"
* @jmx:managed-operation-parameter position="1" name="value" description="Value to write"
*/
public void write (String key, String value) {
out (key, value);
}
/**
* same as (String) Space.rdp (key)
* @param key Key
* @return value.toString()
* @jmx:managed-operation description="Read value from key"
* @jmx:managed-operation-parameter position="0" name="key" description="Space Key"
*/
public String read (String key) {
Object o = rdp (key);
return (o != null) ? o.toString() : "null";
}
public int size (Object key) {
Data data = (Data) map.get (key);
return data == null ? 0 : data.size ();
}
public void push (Object id, Object value) {
throw new SpaceError ("Unsupported operation");
}
public void push (Object id, Object value, long timeout) {
throw new SpaceError ("Unsupported operation");
}
public boolean existAny (Object[] keys) {
throw new SpaceError ("Unsupported operation");
}
public boolean existAny (Object[] keys, long timeout) {
throw new SpaceError ("Unsupported operation");
}
}