Permalink
Browse files

Adding support for sharded pubsub to ShardedJedis.

Specifically, added support for publish, subscribe, and unsubscribe.
  • Loading branch information...
1 parent 72d1531 commit 6d85bb71fedbf060961e421ff1d290bb335deba2 Noah Fiedel committed Jan 5, 2011
@@ -2033,6 +2033,11 @@ public void subscribe(final JedisPubSub jedisPubSub,
client.rollbackTimeout();
}
+ public void unsubscribe(final JedisPubSub jedisPubSub,
+ final String... channels) {
+ jedisPubSub.unsubscribe(channels);
+ }
+
public Long publish(final String channel, final String message) {
client.publish(channel, message);
return client.getIntegerReply();
@@ -2620,4 +2620,8 @@ public Long getbit(String key, int offset) {
client.getbit(key, offset);
return client.getIntegerReply();
}
+
+ public boolean isSameShardAs(Jedis other) {
+ return client.getHost().equals(other.getClient().getHost()) && client.getPort() == other.getClient().getPort();
+ }
}
@@ -143,4 +143,11 @@
Long linsert(String key, Client.LIST_POSITION where, String pivot,
String value);
+
+ Long publish(final String channel, final String message);
+
+ void subscribe(final JedisPubSub jedisPubSub, final String... channels);
+
+ void unsubscribe(final JedisPubSub jedisPubSub, final String... channels);
+
}
@@ -368,4 +368,32 @@ public Long linsert(String key, LIST_POSITION where, String pivot,
Jedis j = getShard(key);
return j.linsert(key, where, pivot, value);
}
+
+ public Long publish(String channel, String message) {
+ Jedis j = getShard(channel);
+ return j.publish(channel, message);
+ }
+
+ public void subscribe(JedisPubSub jedisPubSub, String... channels) {
+ Jedis j = getJedisShardForChannels(jedisPubSub, channels);
+ j.subscribe(jedisPubSub, channels);
+ }
+
+ public void unsubscribe(JedisPubSub jedisPubSub, String... channels) {
+ Jedis j = getJedisShardForChannels(jedisPubSub, channels);
+ j.unsubscribe(jedisPubSub, channels);
+ }
+
+ private Jedis getJedisShardForChannels(JedisPubSub jedisPubSub, String... channels) {
+ String firstChannel = channels[0];
+ Jedis first = getShard(firstChannel);
+ for (String channel : channels) {
+ if(!first.isSameShardAs(getShard(channel))) {
+ throw new IllegalArgumentException(
+ String.format("Subscribe attempted with channels on different shards. First=%s, Different=%s",
+ firstChannel, channel));
+ }
+ }
+ return first;
+ }
}

2 comments on commit 6d85bb7

Hi Noah, this is very good. Would you mind making a pull request to we can share this to other jedis users?

thanks!!!

Hi xetorthio, Noah,

Why I have not found PubSub implement in jedis master ShardedJedis.java?

thanks,
Emre

Please sign in to comment.