Skip to content

Commit

Permalink
Merge pull request redis#146 from TioBorracho/MasterSlaveConsistency2
Browse files Browse the repository at this point in the history
Give names to shards for backward compatibility
  • Loading branch information
xetorthio committed May 16, 2011
2 parents 4dcdf4e + 202d68a commit 3a291eb
Show file tree
Hide file tree
Showing 6 changed files with 167 additions and 35 deletions.
38 changes: 10 additions & 28 deletions src/main/java/redis/clients/jedis/Connection.java
Expand Up @@ -61,24 +61,6 @@ protected void flush() {
}
}

protected Object read() {
try {
return protocol.read(inputStream);
} catch (JedisConnectionException e) {
disconnect();
throw new JedisConnectionException(e);
}
}

protected void sendProtocolCommand(final Command cmd, final byte[]... args) {
try {
protocol.sendCommand(outputStream, cmd, args);
} catch (JedisConnectionException e) {
disconnect();
throw new JedisConnectionException(e);
}
}

protected Connection sendCommand(final Command cmd, final String... args) {
final byte[][] bargs = new byte[args.length][];
for (int i = 0; i < args.length; i++) {
Expand All @@ -89,14 +71,14 @@ protected Connection sendCommand(final Command cmd, final String... args) {

protected Connection sendCommand(final Command cmd, final byte[]... args) {
connect();
sendProtocolCommand(cmd, args);
protocol.sendCommand(outputStream, cmd, args);
pipelinedCommands++;
return this;
}

protected Connection sendCommand(final Command cmd) {
connect();
sendProtocolCommand(cmd, new byte[0][]);
protocol.sendCommand(outputStream, cmd, new byte[0][]);
pipelinedCommands++;
return this;
}
Expand Down Expand Up @@ -163,7 +145,7 @@ public boolean isConnected() {
protected String getStatusCodeReply() {
flush();
pipelinedCommands--;
final byte[] resp = (byte[]) read();
final byte[] resp = (byte[]) protocol.read(inputStream);
if (null == resp) {
return null;
} else {
Expand All @@ -183,13 +165,13 @@ public String getBulkReply() {
public byte[] getBinaryBulkReply() {
flush();
pipelinedCommands--;
return (byte[]) read();
return (byte[]) protocol.read(inputStream);
}

public Long getIntegerReply() {
flush();
pipelinedCommands--;
return (Long) read();
return (Long) protocol.read(inputStream);
}

public List<String> getMultiBulkReply() {
Expand All @@ -200,14 +182,14 @@ public List<String> getMultiBulkReply() {
public List<byte[]> getBinaryMultiBulkReply() {
flush();
pipelinedCommands--;
return (List<byte[]>) read();
return (List<byte[]>) protocol.read(inputStream);
}

@SuppressWarnings("unchecked")
public List<Object> getObjectMultiBulkReply() {
flush();
pipelinedCommands--;
return (List<Object>) read();
return (List<Object>) protocol.read(inputStream);
}

public List<Object> getAll() {
Expand All @@ -218,7 +200,7 @@ public List<Object> getAll(int except) {
List<Object> all = new ArrayList<Object>();
flush();
while (pipelinedCommands > except) {
all.add(read());
all.add(protocol.read(inputStream));
pipelinedCommands--;
}
return all;
Expand All @@ -227,6 +209,6 @@ public List<Object> getAll(int except) {
public Object getOne() {
flush();
pipelinedCommands--;
return read();
return protocol.read(inputStream);
}
}
}
25 changes: 21 additions & 4 deletions src/main/java/redis/clients/jedis/JedisShardInfo.java
Expand Up @@ -22,11 +22,12 @@ public class JedisShardInfo extends ShardInfo<Jedis> {
public String toString() {
return host + ":" + port + "*" + getWeight();
}

private int timeout;
private String host;
private int port;
private String password = null;
private String name = null;

public String getHost() {
return host;
Expand All @@ -39,15 +40,27 @@ public int getPort() {
public JedisShardInfo(String host) {
this(host, Protocol.DEFAULT_PORT);
}

public JedisShardInfo(String host, String name) {
this(host, Protocol.DEFAULT_PORT, name);
}

public JedisShardInfo(String host, int port) {
this(host, port, 2000);
}


public JedisShardInfo(String host, int port, String name) {
this(host, port, 2000, name);
}

public JedisShardInfo(String host, int port, int timeout) {
this(host, port, timeout, Sharded.DEFAULT_WEIGHT);
}


public JedisShardInfo(String host, int port, int timeout, String name) {
this(host, port, timeout, Sharded.DEFAULT_WEIGHT);
this.name = name;
}

public JedisShardInfo(String host, int port, int timeout, int weight) {
super(weight);
this.host = host;
Expand All @@ -70,6 +83,10 @@ public int getTimeout() {
public void setTimeout(int timeout) {
this.timeout = timeout;
}

public String getName() {
return name ;
}

@Override
public Jedis createResource() {
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/redis/clients/util/ShardInfo.java
Expand Up @@ -15,4 +15,6 @@ public int getWeight() {
}

protected abstract T createResource();

public abstract String getName();
}
11 changes: 8 additions & 3 deletions src/main/java/redis/clients/util/Sharded.java
Expand Up @@ -55,9 +55,14 @@ private void initialize(List<S> shards) {

for (int i = 0; i != shards.size(); ++i) {
final S shardInfo = shards.get(i);
for (int n = 0; n < 160 * shardInfo.getWeight(); n++) {
nodes.put(this.algo.hash("SHARD-" + i + "-NODE-" + n), shardInfo);
}
if (shardInfo.getName() == null)
for (int n = 0; n < 160 * shardInfo.getWeight(); n++) {
nodes.put(this.algo.hash("SHARD-" + i + "-NODE-" + n), shardInfo);
}
else
for (int n = 0; n < 160 * shardInfo.getWeight(); n++) {
nodes.put(this.algo.hash(shardInfo.getName() + "*" + shardInfo.getWeight() + n), shardInfo);
}
resources.put(shardInfo, shardInfo.createResource());
}
}
Expand Down
68 changes: 68 additions & 0 deletions src/test/java/redis/clients/jedis/tests/ShardedJedisPoolTest.java
Expand Up @@ -115,4 +115,72 @@ public void shouldNotShareInstances() {

assertNotSame(j1.getShard("foo"), j2.getShard("foo"));
}

public void checkConnectionsWithNoServers() {
shards = new ArrayList<JedisShardInfo>();
shards.add(new JedisShardInfo("localhost", 6379, "ssa"));
shards.add(new JedisShardInfo("localhost", 6380, "ssa"));
Config redisConfig = new Config();
redisConfig.testOnBorrow = false; // deactivated for now
redisConfig.testOnReturn = true;
redisConfig.maxActive = 200; // nro threads + margen de seguridad?
redisConfig.minIdle = 200;
ShardedJedisPool pool = new ShardedJedisPool(redisConfig, shards);
ShardedJedis jedis = pool.getResource();
pool.returnResource(jedis);
pool.destroy();
}

@Test
public void checkFailedJedisServer() {
ShardedJedisPool pool = new ShardedJedisPool(new Config(), shards);
ShardedJedis jedis = pool.getResource();
jedis.incr("foo");
pool.returnResource(jedis);
pool.destroy();
}

@Test
public void shouldReturnActiveShardsWhenOneGoesOffline() {
Config redisConfig = new Config();
redisConfig.testOnBorrow = false;
ShardedJedisPool pool = new ShardedJedisPool(redisConfig, shards);
ShardedJedis jedis = pool.getResource();
// fill the shards
for (int i = 0; i < 1000; i++) {
jedis.set("a-test-" + i, "0");
}
pool.returnResource(jedis);
// check quantity for each shard
Jedis j = new Jedis(shards.get(0));
j.connect();
Long c1 = j.dbSize();
j.disconnect();
j = new Jedis(shards.get(1));
j.connect();
Long c2 = j.dbSize();
j.disconnect();
// shutdown shard 2 and check thay the pool returns an instance with c1
// items on one shard
// alter shard 1 and recreate pool
pool.destroy();
shards.set(1, new JedisShardInfo("nohost", 1234));
pool = new ShardedJedisPool(redisConfig, shards);
jedis = pool.getResource();
Long actual = new Long(0);
Long fails = new Long(0);
for (int i = 0; i < 1000; i++) {
try {
jedis.get("a-test-" + i);
actual++;
} catch (RuntimeException e) {
fails++;
}
}
pool.returnResource(jedis);
pool.destroy();
assertEquals(actual, c1);
assertEquals(fails, c2);
}

}
58 changes: 58 additions & 0 deletions src/test/java/redis/clients/jedis/tests/ShardedJedisTest.java
Expand Up @@ -240,5 +240,63 @@ public void testMurmurSharding() {
assertTrue(shard_6380 > 300 && shard_6380 < 400);
assertTrue(shard_6381 > 300 && shard_6381 < 400);
}

@Test
public void testMasterSlaveShardingConsistency() {
List<JedisShardInfo> shards = new ArrayList<JedisShardInfo>(3);
shards.add(new JedisShardInfo("localhost", Protocol.DEFAULT_PORT));
shards.add(new JedisShardInfo("localhost", Protocol.DEFAULT_PORT + 1));
shards.add(new JedisShardInfo("localhost", Protocol.DEFAULT_PORT + 2));
Sharded<Jedis, JedisShardInfo> sharded = new Sharded<Jedis, JedisShardInfo>(
shards, Hashing.MURMUR_HASH);

List<JedisShardInfo> otherShards = new ArrayList<JedisShardInfo>(3);
otherShards.add(new JedisShardInfo("otherhost", Protocol.DEFAULT_PORT));
otherShards.add(new JedisShardInfo("otherhost",
Protocol.DEFAULT_PORT + 1));
otherShards.add(new JedisShardInfo("otherhost",
Protocol.DEFAULT_PORT + 2));
Sharded<Jedis, JedisShardInfo> sharded2 = new Sharded<Jedis, JedisShardInfo>(
otherShards, Hashing.MURMUR_HASH);

for (int i = 0; i < 1000; i++) {
JedisShardInfo jedisShardInfo = sharded.getShardInfo(Integer
.toString(i));
JedisShardInfo jedisShardInfo2 = sharded2.getShardInfo(Integer
.toString(i));
assertEquals(shards.indexOf(jedisShardInfo),
otherShards.indexOf(jedisShardInfo2));
}

}
@Test
public void testMasterSlaveShardingConsistencyWithShardNaming() {
List<JedisShardInfo> shards = new ArrayList<JedisShardInfo>(3);
shards.add(new JedisShardInfo("localhost", Protocol.DEFAULT_PORT, "HOST1:1234"));
shards.add(new JedisShardInfo("localhost", Protocol.DEFAULT_PORT + 1,"HOST2:1234"));
shards.add(new JedisShardInfo("localhost", Protocol.DEFAULT_PORT + 2, "HOST3:1234"));
Sharded<Jedis, JedisShardInfo> sharded = new Sharded<Jedis, JedisShardInfo>(
shards, Hashing.MURMUR_HASH);

List<JedisShardInfo> otherShards = new ArrayList<JedisShardInfo>(3);
otherShards.add(new JedisShardInfo("otherhost", Protocol.DEFAULT_PORT, "HOST2:1234"));
otherShards.add(new JedisShardInfo("otherhost",
Protocol.DEFAULT_PORT + 1, "HOST3:1234"));
otherShards.add(new JedisShardInfo("otherhost",
Protocol.DEFAULT_PORT + 2, "HOST1:1234"));
Sharded<Jedis, JedisShardInfo> sharded2 = new Sharded<Jedis, JedisShardInfo>(
otherShards, Hashing.MURMUR_HASH);

for (int i = 0; i < 1000; i++) {
JedisShardInfo jedisShardInfo = sharded.getShardInfo(Integer
.toString(i));
JedisShardInfo jedisShardInfo2 = sharded2.getShardInfo(Integer
.toString(i));
assertEquals(jedisShardInfo.getName(),
jedisShardInfo2.getName());
}

}

}

0 comments on commit 3a291eb

Please sign in to comment.