-
Notifications
You must be signed in to change notification settings - Fork 13
/
BinaryJedisque.java
62 lines (51 loc) · 1.69 KB
/
BinaryJedisque.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
package com.github.xetorthio.jedisque;
import redis.clients.jedis.Protocol;
import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.util.SafeEncoder;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.*;
public class BinaryJedisque extends redis.clients.jedis.Connection {
static final private int DISQUE_PORT = 7711;
private final List<URI> uris = new ArrayList<URI>();
private Random randomGenerator = new Random();
public BinaryJedisque() {
try {
uris.add(new URI("disque://localhost:" + DISQUE_PORT));
} catch (URISyntaxException e) {
}
}
public BinaryJedisque(URI... uris) {
this.uris.addAll(Arrays.asList(uris));
}
@Override
public void connect() {
while (!this.isConnected()) {
if (uris.size() == 0) {
throw new JedisConnectionException("Could not connect to any of the provided nodes");
}
int index = randomGenerator.nextInt(uris.size());
try {
URI uri = uris.get(index);
setHost(uri.getHost());
setPort(uri.getPort());
super.connect();
} catch (JedisConnectionException e) {
uris.remove(index);
}
}
}
public String addJob(byte[] queueName, byte[] job, long mstimeout) {
sendCommand(Command.ADDJOB, queueName, job, Protocol.toByteArray(mstimeout));
return getBulkReply();
}
public String addJob(byte[] queueName, byte[] job, long mstimeout, JobParams params) {
List<byte[]>addJobCommand = new ArrayList<byte[]>();
addJobCommand.add(queueName);
addJobCommand.add(job);
addJobCommand.add(Protocol.toByteArray(mstimeout));
addJobCommand.addAll(params.getParams());
sendCommand(Command.ADDJOB, addJobCommand.toArray(new byte[addJobCommand.size()][]));
return getBulkReply();
}
}