-
Notifications
You must be signed in to change notification settings - Fork 0
/
KahaChannel.java
109 lines (93 loc) · 3.42 KB
/
KahaChannel.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
package nl.flotsam.spring.integration.kaha;
import com.agilejava.blammo.BlammoLoggerFactory;
import org.apache.activemq.kaha.ListContainer;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.kaha.StoreFactory;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.integration.channel.AbstractPollableChannel;
import org.springframework.integration.core.Message;
import org.springframework.integration.message.MessageBuilder;
import org.springframework.integration.selector.MessageSelector;
import org.springframework.util.Assert;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
/**
 * A pollable channel whose messages are kept in a Kaha {@link ListContainer}.
 *
 * <p>Messages are appended to the tail of the container on send and removed
 * from its head on receive. A {@link Semaphore} tracks how many messages are
 * currently available: it is initialised with the size of the container at
 * construction time, one permit is released per message sent, and one permit
 * is acquired per message received. Receivers therefore block on the semaphore
 * rather than polling the container.
 *
 * <p>NOTE(review): the permit count only stays in step with the container as
 * long as this channel is the sole writer/reader of the list container with
 * the given id - confirm no other component shares it.
 *
 * @param <T> payload type of the messages carried by this channel
 */
public class KahaChannel<T> extends AbstractPollableChannel {

    /** Blammo-generated logger; created once and shared by all channel instances. */
    private static final Logger logger = (Logger) BlammoLoggerFactory.create(Logger.class);

    /** Backing list of messages; head is the next message to be received. */
    private final ListContainer<Message<T>> container;

    /** One permit per message currently held in {@link #container}. */
    private final Semaphore available;

    /**
     * Creates a channel on top of a Kaha store opened (read/write) from the given directory.
     *
     * <p>The store opened here is not closed by this class.
     *
     * @param directory directory holding the Kaha store files; must not be {@code null}
     * @param id        id of the channel, also used as the id of the list container; must not be {@code null}
     * @param codec     codec used to marshal messages to and from the store; must not be {@code null}
     * @throws IOException              propagated from opening the store or obtaining the list container
     * @throws IllegalArgumentException if any argument is {@code null}
     */
    public KahaChannel(File directory, String id, MessageCodec<T> codec) throws IOException {
        this(openStore(id, directory), id, codec);
    }

    /**
     * Creates a channel on top of an already opened Kaha store.
     *
     * @param store store providing the list container; must not be {@code null}
     * @param id    id of the list container to use; must not be {@code null}
     * @param codec codec used to marshal messages to and from the store; must not be {@code null}
     * @throws IOException              propagated from obtaining the list container
     * @throws IllegalArgumentException if any argument is {@code null}
     */
    public KahaChannel(Store store, String id, MessageCodec<T> codec) throws IOException {
        // Explicit checks instead of 'assert': assertions are disabled by default at runtime,
        // so they cannot be relied upon to validate arguments of a public constructor.
        requireArgument(store, "store");
        requireArgument(id, "id");
        requireArgument(codec, "codec");
        container = store.getListContainer(id);
        container.setMarshaller(new MessageCodecAdapter(codec));
        // Messages already present in the container are immediately receivable.
        available = new Semaphore(container.size());
    }

    /**
     * Logs the channel configuration and opens the Kaha store in {@code "rw"} mode.
     *
     * @param id        id of the channel (used for logging only)
     * @param directory directory holding the store files
     * @return the opened store
     * @throws IOException              propagated from {@link StoreFactory#open}
     * @throws IllegalArgumentException if any argument is {@code null}
     */
    private static Store openStore(String id, File directory) throws IOException {
        requireArgument(id, "id");
        requireArgument(directory, "directory");
        logger.logConfiguration(id, directory);
        return StoreFactory.open(directory, "rw");
    }

    /**
     * Fails fast with an {@link IllegalArgumentException} when a required argument is missing.
     *
     * @param value the argument value to check
     * @param name  the argument name, used in the exception message
     */
    private static void requireArgument(Object value, String name) {
        if (value == null) {
            throw new IllegalArgumentException(name + " must not be null");
        }
    }

    /**
     * Receives the next message, delegating to {@link #doSafeReceive(long)}.
     *
     * @param timeout see {@link #doSafeReceive(long)}
     * @return the received message, or {@code null} if none was obtained
     */
    @Override
    protected Message<?> doReceive(long timeout) {
        return doSafeReceive(timeout);
    }

    /**
     * Removes and returns the message at the head of the container, if a permit can be obtained.
     *
     * <ul>
     *   <li>{@code timeout > 0}: waits up to {@code timeout} milliseconds for a message;</li>
     *   <li>{@code timeout < 0}: waits until a message is available;</li>
     *   <li>{@code timeout == 0}: returns immediately.</li>
     * </ul>
     *
     * @param timeout wait policy as described above
     * @return the head message, or {@code null} if no permit was obtained in time or the
     *         calling thread was interrupted while waiting
     */
    private Message<T> doSafeReceive(long timeout) {
        boolean acquired;
        try {
            if (timeout > 0) {
                acquired = available.tryAcquire(timeout, TimeUnit.MILLISECONDS);
            } else if (timeout < 0) {
                available.acquire();
                acquired = true;
            } else {
                acquired = available.tryAcquire();
            }
        } catch (InterruptedException ie) {
            // Restore the interrupt status so callers further up the stack can react to it.
            Thread.currentThread().interrupt();
            return null;
        }
        // Only touch the container once a permit guarantees a message was accounted for.
        return acquired ? container.removeFirst() : null;
    }

    /**
     * Appends the message to the tail of the container and makes it available to receivers.
     *
     * @param message the message to store
     * @param timeout ignored; the message is added without waiting
     * @return always {@code true}
     */
    @Override
    protected boolean doSend(Message<?> message, long timeout) {
        // The channel API is untyped; callers are trusted to send payloads of type T.
        // Generic erasure means this cast cannot fail here, hence the narrow suppression.
        @SuppressWarnings("unchecked")
        Message<T> typed = (Message<T>) message;
        container.addLast(typed);
        // Release only after the message is stored, so a permit always maps to a stored message.
        available.release();
        return true;
    }

    /**
     * Not supported by this channel.
     *
     * @return never returns normally
     * @throws UnsupportedOperationException always
     */
    public List<Message<?>> clear() {
        throw new UnsupportedOperationException("clear is not supported by KahaChannel");
    }

    /**
     * Not supported by this channel.
     *
     * @param selector ignored
     * @return never returns normally
     * @throws UnsupportedOperationException always
     */
    public List<Message<?>> purge(MessageSelector selector) {
        throw new UnsupportedOperationException("purge is not supported by KahaChannel");
    }

    /**
     * @blammo.logger
     */
    public interface Logger {

        /**
         * Logs the configuration details of the channel.
         *
         * @param id Id of the channel.
         * @param directory The directory storing the files.
         * @blammo.level info
         * @blammo.message "Created Kaha Channel {id}, from directory {directory}"
         */
        void logConfiguration(String id, File directory);

    }

}