Skip to content

Commit

Permalink
DATAREDIS-1173 - Correctly unsubscribe from patterns/channels through…
Browse files Browse the repository at this point in the history
… LettuceSubscription.

doPUnsubscribe(…) and doUnsubscribe(…) now consider the all flag to unsubscribe from all subscribed patterns/channels. Previously, both methods didn't consider all and were invoked with an empty byte array which unsubscribed from an empty pattern/channel name and left subscriptions active.

Original Pull Request: #549
  • Loading branch information
mp911de authored and christophstrobl committed Jul 21, 2020
1 parent 17b0fbc commit e4fd036
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,11 @@ protected void doPsubscribe(byte[]... patterns) {
*/
protected void doPUnsubscribe(boolean all, byte[]... patterns) {

// ignore `all` flag as Lettuce unsubscribes from all patterns if none provided.
pubsub.punsubscribe(patterns);
if (all) {
pubsub.punsubscribe();
} else {
pubsub.punsubscribe(patterns);
}
}

/*
Expand All @@ -113,8 +116,11 @@ protected void doSubscribe(byte[]... channels) {
*/
protected void doUnsubscribe(boolean all, byte[]... channels) {

// ignore `all` flag as Lettuce unsubscribes from all channels if none provided.
pubsub.unsubscribe(channels);
if (all) {
pubsub.unsubscribe();
} else {
pubsub.unsubscribe(channels);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,8 @@ public void setUp() {
public void testUnsubscribeAllAndClose() {
subscription.subscribe(new byte[][] { "a".getBytes() });
subscription.unsubscribe();
verify(asyncCommands, times(1)).unsubscribe(new byte[][] { "a".getBytes() });
verify(asyncCommands, never()).unsubscribe(new byte[0]);
verify(asyncCommands, never()).punsubscribe(new byte[0]);
verify(asyncCommands).unsubscribe();
verify(asyncCommands, never()).punsubscribe();
verify(connectionProvider).release(pubsub);
verify(pubsub).removeListener(any(LettuceMessageListener.class));
assertThat(subscription.isAlive()).isFalse();
Expand All @@ -81,9 +80,8 @@ public void testUnsubscribeAllChannelsWithPatterns() {
subscription.subscribe(new byte[][] { "a".getBytes() });
subscription.pSubscribe(new byte[][] { "s*".getBytes() });
subscription.unsubscribe();
verify(asyncCommands, times(1)).unsubscribe(new byte[][] { "a".getBytes() });
verify(asyncCommands, never()).unsubscribe(new byte[0]);
verify(asyncCommands, never()).punsubscribe(new byte[0]);
verify(asyncCommands).unsubscribe();
verify(asyncCommands, never()).punsubscribe();
assertThat(subscription.isAlive()).isTrue();
assertThat(subscription.getChannels().isEmpty()).isTrue();
Collection<byte[]> patterns = subscription.getPatterns();
Expand All @@ -96,9 +94,9 @@ public void testUnsubscribeChannelAndClose() {
byte[][] channel = new byte[][] { "a".getBytes() };
subscription.subscribe(channel);
subscription.unsubscribe(channel);
verify(asyncCommands, times(1)).unsubscribe(channel);
verify(asyncCommands, never()).unsubscribe(new byte[0]);
verify(asyncCommands, never()).punsubscribe(new byte[0]);
verify(asyncCommands).unsubscribe(channel);
verify(asyncCommands, never()).unsubscribe();
verify(asyncCommands, never()).punsubscribe();
verify(connectionProvider).release(pubsub);
verify(pubsub).removeListener(any(LettuceMessageListener.class));
assertThat(subscription.isAlive()).isFalse();
Expand All @@ -111,9 +109,9 @@ public void testUnsubscribeChannelSomeLeft() {
byte[][] channels = new byte[][] { "a".getBytes(), "b".getBytes() };
subscription.subscribe(channels);
subscription.unsubscribe(new byte[][] { "a".getBytes() });
verify(asyncCommands, times(1)).unsubscribe(new byte[][] { "a".getBytes() });
verify(asyncCommands, never()).unsubscribe(new byte[0]);
verify(asyncCommands, never()).punsubscribe(new byte[0]);
verify(asyncCommands).unsubscribe(new byte[][] { "a".getBytes() });
verify(asyncCommands, never()).unsubscribe();
verify(asyncCommands, never()).punsubscribe();
assertThat(subscription.isAlive()).isTrue();
Collection<byte[]> subChannels = subscription.getChannels();
assertThat(subChannels.size()).isEqualTo(1);
Expand All @@ -127,9 +125,9 @@ public void testUnsubscribeChannelWithPatterns() {
subscription.subscribe(channel);
subscription.pSubscribe(new byte[][] { "s*".getBytes() });
subscription.unsubscribe(channel);
verify(asyncCommands, times(1)).unsubscribe(channel);
verify(asyncCommands, never()).unsubscribe(new byte[0]);
verify(asyncCommands, never()).punsubscribe(new byte[0]);
verify(asyncCommands).unsubscribe(channel);
verify(asyncCommands, never()).unsubscribe();
verify(asyncCommands, never()).punsubscribe();
assertThat(subscription.isAlive()).isTrue();
assertThat(subscription.getChannels().isEmpty()).isTrue();
Collection<byte[]> patterns = subscription.getPatterns();
Expand All @@ -143,9 +141,9 @@ public void testUnsubscribeChannelWithPatternsSomeLeft() {
subscription.subscribe(new byte[][] { "a".getBytes(), "b".getBytes() });
subscription.pSubscribe(new byte[][] { "s*".getBytes() });
subscription.unsubscribe(channel);
verify(asyncCommands, times(1)).unsubscribe(channel);
verify(asyncCommands, never()).unsubscribe(new byte[0]);
verify(asyncCommands, never()).punsubscribe(new byte[0]);
verify(asyncCommands).unsubscribe(channel);
verify(asyncCommands, never()).unsubscribe();
verify(asyncCommands, never()).punsubscribe();
assertThat(subscription.isAlive()).isTrue();
Collection<byte[]> channels = subscription.getChannels();
assertThat(channels.size()).isEqualTo(1);
Expand All @@ -159,8 +157,8 @@ public void testUnsubscribeChannelWithPatternsSomeLeft() {
public void testUnsubscribeAllNoChannels() {
subscription.pSubscribe(new byte[][] { "s*".getBytes() });
subscription.unsubscribe();
verify(asyncCommands, never()).unsubscribe(new byte[0]);
verify(asyncCommands, never()).punsubscribe(new byte[0]);
verify(asyncCommands, never()).unsubscribe();
verify(asyncCommands, never()).punsubscribe();
assertThat(subscription.isAlive()).isTrue();
assertThat(subscription.getChannels().isEmpty()).isTrue();
Collection<byte[]> patterns = subscription.getPatterns();
Expand All @@ -172,13 +170,12 @@ public void testUnsubscribeAllNoChannels() {
public void testUnsubscribeNotAlive() {
subscription.subscribe(new byte[][] { "a".getBytes() });
subscription.unsubscribe();
verify(connectionProvider, times(1)).release(pubsub);
verify(pubsub, times(1)).removeListener(any(LettuceMessageListener.class));
verify(connectionProvider).release(pubsub);
verify(pubsub).removeListener(any(LettuceMessageListener.class));
assertThat(subscription.isAlive()).isFalse();
subscription.unsubscribe();
verify(asyncCommands, times(1)).unsubscribe(new byte[][] { "a".getBytes() });
verify(asyncCommands, never()).unsubscribe(new byte[0]);
verify(asyncCommands, never()).punsubscribe(new byte[0]);
verify(asyncCommands).unsubscribe();
verify(asyncCommands, never()).punsubscribe();
}

@Test(expected = RedisInvalidSubscriptionException.class)
Expand All @@ -193,9 +190,8 @@ public void testSubscribeNotAlive() {
public void testPUnsubscribeAllAndClose() {
subscription.pSubscribe(new byte[][] { "a*".getBytes() });
subscription.pUnsubscribe();
verify(asyncCommands, never()).unsubscribe(new byte[0]);
verify(asyncCommands, never()).punsubscribe(new byte[0]);
verify(asyncCommands, times(1)).punsubscribe(new byte[][] { "a*".getBytes() });
verify(asyncCommands, never()).unsubscribe();
verify(asyncCommands).punsubscribe();
assertThat(subscription.isAlive()).isFalse();
verify(connectionProvider).release(pubsub);
verify(pubsub).removeListener(any(LettuceMessageListener.class));
Expand All @@ -208,9 +204,8 @@ public void testPUnsubscribeAllPatternsWithChannels() {
subscription.subscribe(new byte[][] { "a".getBytes() });
subscription.pSubscribe(new byte[][] { "s*".getBytes() });
subscription.pUnsubscribe();
verify(asyncCommands, never()).unsubscribe(new byte[0]);
verify(asyncCommands, never()).punsubscribe(new byte[0]);
verify(asyncCommands, times(1)).punsubscribe(new byte[][] { "s*".getBytes() });
verify(asyncCommands, never()).unsubscribe();
verify(asyncCommands).punsubscribe();
assertThat(subscription.isAlive()).isTrue();
assertThat(subscription.getPatterns().isEmpty()).isTrue();
Collection<byte[]> channels = subscription.getChannels();
Expand All @@ -223,9 +218,9 @@ public void testPUnsubscribeAndClose() {
byte[][] pattern = new byte[][] { "a*".getBytes() };
subscription.pSubscribe(pattern);
subscription.pUnsubscribe(pattern);
verify(asyncCommands, never()).unsubscribe(new byte[0]);
verify(asyncCommands, never()).punsubscribe(new byte[0]);
verify(asyncCommands, times(1)).punsubscribe(pattern);
verify(asyncCommands, never()).unsubscribe();
verify(asyncCommands, never()).punsubscribe();
verify(asyncCommands).punsubscribe(pattern);
verify(connectionProvider).release(pubsub);
verify(pubsub).removeListener(any(LettuceMessageListener.class));
assertThat(subscription.isAlive()).isFalse();
Expand All @@ -238,9 +233,9 @@ public void testPUnsubscribePatternSomeLeft() {
byte[][] patterns = new byte[][] { "a*".getBytes(), "b*".getBytes() };
subscription.pSubscribe(patterns);
subscription.pUnsubscribe(new byte[][] { "a*".getBytes() });
verify(asyncCommands, times(1)).punsubscribe(new byte[][] { "a*".getBytes() });
verify(asyncCommands, never()).unsubscribe(new byte[0]);
verify(asyncCommands, never()).punsubscribe(new byte[0]);
verify(asyncCommands).punsubscribe(new byte[][] { "a*".getBytes() });
verify(asyncCommands, never()).unsubscribe();
verify(asyncCommands, never()).punsubscribe();
assertThat(subscription.isAlive()).isTrue();
Collection<byte[]> subPatterns = subscription.getPatterns();
assertThat(subPatterns.size()).isEqualTo(1);
Expand All @@ -254,9 +249,9 @@ public void testPUnsubscribePatternWithChannels() {
subscription.subscribe(new byte[][] { "a".getBytes() });
subscription.pSubscribe(pattern);
subscription.pUnsubscribe(pattern);
verify(asyncCommands, times(1)).punsubscribe(pattern);
verify(asyncCommands, never()).unsubscribe(new byte[0]);
verify(asyncCommands, never()).punsubscribe(new byte[0]);
verify(asyncCommands).punsubscribe(pattern);
verify(asyncCommands, never()).unsubscribe();
verify(asyncCommands, never()).punsubscribe();
assertThat(subscription.isAlive()).isTrue();
assertThat(subscription.getPatterns().isEmpty()).isTrue();
Collection<byte[]> channels = subscription.getChannels();
Expand All @@ -270,9 +265,9 @@ public void testUnsubscribePatternWithChannelsSomeLeft() {
subscription.pSubscribe(new byte[][] { "a*".getBytes(), "b*".getBytes() });
subscription.subscribe(new byte[][] { "a".getBytes() });
subscription.pUnsubscribe(pattern);
verify(asyncCommands, never()).unsubscribe(new byte[0]);
verify(asyncCommands, never()).punsubscribe(new byte[0]);
verify(asyncCommands, times(1)).punsubscribe(pattern);
verify(asyncCommands, never()).unsubscribe();
verify(asyncCommands, never()).punsubscribe();
verify(asyncCommands).punsubscribe(pattern);
assertThat(subscription.isAlive()).isTrue();
Collection<byte[]> channels = subscription.getChannels();
assertThat(channels.size()).isEqualTo(1);
Expand All @@ -286,8 +281,8 @@ public void testUnsubscribePatternWithChannelsSomeLeft() {
public void testPUnsubscribeAllNoPatterns() {
subscription.subscribe(new byte[][] { "s".getBytes() });
subscription.pUnsubscribe();
verify(asyncCommands, never()).unsubscribe(new byte[0]);
verify(asyncCommands, never()).punsubscribe(new byte[0]);
verify(asyncCommands, never()).unsubscribe();
verify(asyncCommands, never()).punsubscribe();
assertThat(subscription.isAlive()).isTrue();
assertThat(subscription.getPatterns().isEmpty()).isTrue();
Collection<byte[]> channels = subscription.getChannels();
Expand All @@ -301,11 +296,10 @@ public void testPUnsubscribeNotAlive() {
subscription.unsubscribe();
assertThat(subscription.isAlive()).isFalse();
subscription.pUnsubscribe();
verify(connectionProvider, times(1)).release(pubsub);
verify(pubsub, times(1)).removeListener(any(LettuceMessageListener.class));
verify(asyncCommands, times(1)).unsubscribe(new byte[][] { "a".getBytes() });
verify(asyncCommands, never()).unsubscribe(new byte[0]);
verify(asyncCommands, never()).punsubscribe(new byte[0]);
verify(connectionProvider).release(pubsub);
verify(pubsub).removeListener(any(LettuceMessageListener.class));
verify(asyncCommands).unsubscribe();
verify(asyncCommands, never()).punsubscribe();
}

@Test(expected = RedisInvalidSubscriptionException.class)
Expand All @@ -319,23 +313,23 @@ public void testPSubscribeNotAlive() {
@Test
public void testDoCloseNotSubscribed() {
subscription.doClose();
verify(asyncCommands, never()).unsubscribe(new byte[0]);
verify(asyncCommands, never()).punsubscribe(new byte[0]);
verify(asyncCommands, never()).unsubscribe();
verify(asyncCommands, never()).punsubscribe();
}

@Test
public void testDoCloseSubscribedChannels() {
subscription.subscribe(new byte[][] { "a".getBytes() });
subscription.doClose();
verify(asyncCommands, times(1)).unsubscribe(new byte[0]);
verify(asyncCommands, never()).punsubscribe(new byte[0]);
verify(asyncCommands).unsubscribe();
verify(asyncCommands, never()).punsubscribe();
}

@Test
public void testDoCloseSubscribedPatterns() {
subscription.pSubscribe(new byte[][] { "a*".getBytes() });
subscription.doClose();
verify(asyncCommands, never()).unsubscribe(new byte[0]);
verify(asyncCommands, times(1)).punsubscribe(new byte[0]);
verify(asyncCommands, never()).unsubscribe();
verify(asyncCommands).punsubscribe();
}
}

0 comments on commit e4fd036

Please sign in to comment.