From 8a567f898719e52650ff4520e09db30fd41fdfb9 Mon Sep 17 00:00:00 2001 From: "marcin.cebo" Date: Wed, 19 Nov 2025 10:16:00 +0100 Subject: [PATCH 1/2] When subscribing to already subscribed channel or group should not resubscribe just emit SubscriptionChanged status. --- .../integration/SubscribeIntegrationTests.kt | 132 ++++++++++++++++++ .../com/pubnub/internal/eventengine/State.kt | 4 + .../eventengine/state/SubscribeState.kt | 36 +++-- .../TransitionFromReceivingStateTest.kt | 41 +++++- 4 files changed, 200 insertions(+), 13 deletions(-) diff --git a/pubnub-kotlin/pubnub-kotlin-impl/src/integrationTest/kotlin/com/pubnub/api/integration/SubscribeIntegrationTests.kt b/pubnub-kotlin/pubnub-kotlin-impl/src/integrationTest/kotlin/com/pubnub/api/integration/SubscribeIntegrationTests.kt index bc30cb94b3..14ae7795e5 100644 --- a/pubnub-kotlin/pubnub-kotlin-impl/src/integrationTest/kotlin/com/pubnub/api/integration/SubscribeIntegrationTests.kt +++ b/pubnub-kotlin/pubnub-kotlin-impl/src/integrationTest/kotlin/com/pubnub/api/integration/SubscribeIntegrationTests.kt @@ -1365,6 +1365,138 @@ class SubscribeIntegrationTests : BaseIntegrationTest() { } } + @Test + fun whenSubscribingToAlreadySubscribedChannelShouldNotResubscribeButShouldEmitSubscriptionChangedStatus() { + val channelName = randomChannel() + val channelName02 = randomChannel() + val expectedMessage = "test_${randomValue()}" + + val connectedLatch = CountDownLatch(1) // Wait for first connection + val subscriptionChangedLatch = CountDownLatch(1) // Wait for subscription changed event + val messagesLatch = CountDownLatch(2) // Wait for both listeners to receive message + + val connectedEventCount = AtomicInteger(0) + val subscriptionChangedCount = + AtomicInteger(0) // status event emitted but no actual resubscribe when channels unchanged + val subscribeHttpRequestCount = AtomicInteger(0) // Count actual HTTP subscribe requests + + // Track messages received by each listener + val listenerAMessageCount = AtomicInteger(0) + val listenerBMessageCount = AtomicInteger(0) + + // Custom logger to count HTTP subscribe requests + val customLogger = object : CustomLogger { + override fun debug(logMessage: LogMessage) { + if (logMessage.type == LogMessageType.NETWORK_REQUEST) { + val networkRequestDetails = logMessage.message as LogMessageContent.NetworkRequest + if (networkRequestDetails.path.contains("/v2/subscribe/")) { + subscribeHttpRequestCount.incrementAndGet() + println("HTTP Subscribe request #${subscribeHttpRequestCount.get()}: ${networkRequestDetails.path}") + } + } + } + } + + clientConfig = { + customLoggers = listOf(customLogger) + } + + pubnub.addListener( + object : StatusListener { + override fun status( + pubnub: PubNub, + status: PNStatus, + ) { + when (status.category) { + PNStatusCategory.PNConnectedCategory -> { + connectedEventCount.incrementAndGet() + connectedLatch.countDown() + } + + PNStatusCategory.PNSubscriptionChanged -> { + subscriptionChangedCount.incrementAndGet() + subscriptionChangedLatch.countDown() + } + + else -> {} + } + } + } + ) + + // First subscription + val subscriptionSet1 = pubnub.subscriptionSetOf(setOf(channelName, channelName02)) + subscriptionSet1.addListener( + object : EventListener { + override fun message( + pubnub: PubNub, + result: PNMessageResult, + ) { + println("ListenerA received: ${result.message.asString}") + listenerAMessageCount.incrementAndGet() + messagesLatch.countDown() + } + } + ) + + // Second subscription (SAME channel, different listener) + val subscriptionSet2 = pubnub.subscriptionSetOf(setOf(channelName)) + subscriptionSet2.addListener( + object : EventListener { + override fun message( + pubnub: PubNub, + result: PNMessageResult, + ) { + println("ListenerB received: ${result.message.asString}") + listenerBMessageCount.incrementAndGet() + messagesLatch.countDown() + } + } + ) + + try { + subscriptionSet1.subscribe() + assertTrue("Failed to receive PNConnectedCategory", connectedLatch.await(5, TimeUnit.SECONDS)) + + subscriptionSet2.subscribe() + assertTrue("Failed to receive PNSubscriptionChanged", subscriptionChangedLatch.await(5, TimeUnit.SECONDS)) + + // Give a moment for any potential resubscribe to occur + Thread.sleep(500) + + // Publish a message - both listeners should receive it + pubnub.publish(channelName, expectedMessage).sync() + assertTrue("Failed to receive messages on both listeners", messagesLatch.await(5, TimeUnit.SECONDS)) + + // Verify both listeners received the message (this works regardless of the bug) + assertEquals("ListenerA should receive message", 1, listenerAMessageCount.get()) + assertEquals("ListenerB should receive message", 1, listenerBMessageCount.get()) + + assertEquals( + "Should emit PNSubscriptionChanged status but not resubscribe when channels unchanged", + 1, // Status emitted but no actual resubscribe (no cancel/new receive) + subscriptionChangedCount.get() + ) + + assertEquals( + "Should have exactly 1 PNConnectedCategory (initial handshake only)", + 1, + connectedEventCount.get() + ) + + // Should only have 2 HTTP "/subscribe" requests: handshake + subscribe + assertEquals( + "Should have exactly 2 HTTP subscribe requests (handshake + subscribe, NO resubscribe)", + 2, + subscribeHttpRequestCount.get() + ) + } finally { + // Ensure cleanup happens even if assertions fail + subscriptionSet1.close() + subscriptionSet2.close() + } + } + @Test fun shouldDeduplicateChannelSubscriptionsWhenSubscribingToListOfTheSameChannels() { // given diff --git a/pubnub-kotlin/pubnub-kotlin-impl/src/main/kotlin/com/pubnub/internal/eventengine/State.kt b/pubnub-kotlin/pubnub-kotlin-impl/src/main/kotlin/com/pubnub/internal/eventengine/State.kt index 55afe1d45a..56c3df8627 100644 --- a/pubnub-kotlin/pubnub-kotlin-impl/src/main/kotlin/com/pubnub/internal/eventengine/State.kt +++ b/pubnub-kotlin/pubnub-kotlin-impl/src/main/kotlin/com/pubnub/internal/eventengine/State.kt @@ -10,6 +10,10 @@ internal interface State internal fun > S.noTransition(): Pair> = Pair(this, emptySet()) +internal fun > S.noTransitionWithEffects( + vararg invocations: Ei, +): Pair> = Pair(this, invocations.toSet()) + internal fun > S.transitionTo( state: S, vararg invocations: Ei, diff --git a/pubnub-kotlin/pubnub-kotlin-impl/src/main/kotlin/com/pubnub/internal/subscribe/eventengine/state/SubscribeState.kt b/pubnub-kotlin/pubnub-kotlin-impl/src/main/kotlin/com/pubnub/internal/subscribe/eventengine/state/SubscribeState.kt index b8fc62c302..7c5f8bad1b 100644 --- a/pubnub-kotlin/pubnub-kotlin-impl/src/main/kotlin/com/pubnub/internal/subscribe/eventengine/state/SubscribeState.kt +++ b/pubnub-kotlin/pubnub-kotlin-impl/src/main/kotlin/com/pubnub/internal/subscribe/eventengine/state/SubscribeState.kt @@ -5,6 +5,7 @@ import com.pubnub.api.enums.PNStatusCategory import com.pubnub.api.models.consumer.PNStatus import com.pubnub.internal.eventengine.State import com.pubnub.internal.eventengine.noTransition +import com.pubnub.internal.eventengine.noTransitionWithEffects import com.pubnub.internal.eventengine.transitionTo import com.pubnub.internal.subscribe.eventengine.effect.SubscribeEffectInvocation import com.pubnub.internal.subscribe.eventengine.event.SubscribeEvent @@ -227,17 +228,32 @@ internal sealed class SubscribeState : State { - transitionTo( - Receiving(event.channels, event.channelGroups, subscriptionCursor), - SubscribeEffectInvocation.EmitStatus( - PNStatus( - PNStatusCategory.PNSubscriptionChanged, - currentTimetoken = subscriptionCursor.timetoken, - affectedChannels = event.channels, - affectedChannelGroups = event.channelGroups, + // If channels and channelGroups haven't changed, emit status without resubscribing + if (event.channels == channels && event.channelGroups == channelGroups) { + noTransitionWithEffects( + SubscribeEffectInvocation.EmitStatus( + PNStatus( + PNStatusCategory.PNSubscriptionChanged, + currentTimetoken = subscriptionCursor.timetoken, + affectedChannels = event.channels, + affectedChannelGroups = event.channelGroups, + ), ), - ), - ) + ) + } else { + // Channels changed, need to resubscribe + transitionTo( + Receiving(event.channels, event.channelGroups, subscriptionCursor), + SubscribeEffectInvocation.EmitStatus( + PNStatus( + PNStatusCategory.PNSubscriptionChanged, + currentTimetoken = subscriptionCursor.timetoken, + affectedChannels = event.channels, + affectedChannelGroups = event.channelGroups, + ), + ), + ) + } } is SubscribeEvent.SubscriptionRestored -> { diff --git a/pubnub-kotlin/pubnub-kotlin-impl/src/test/kotlin/com/pubnub/internal/subscribe/eventengine/worker/TransitionFromReceivingStateTest.kt b/pubnub-kotlin/pubnub-kotlin-impl/src/test/kotlin/com/pubnub/internal/subscribe/eventengine/worker/TransitionFromReceivingStateTest.kt index 4864f91b3d..07ed3548f9 100644 --- a/pubnub-kotlin/pubnub-kotlin-impl/src/test/kotlin/com/pubnub/internal/subscribe/eventengine/worker/TransitionFromReceivingStateTest.kt +++ b/pubnub-kotlin/pubnub-kotlin-impl/src/test/kotlin/com/pubnub/internal/subscribe/eventengine/worker/TransitionFromReceivingStateTest.kt @@ -84,8 +84,44 @@ class TransitionFromReceivingStateTest { } @Test - fun can_transit_from_RECEIVING_to_RECEIVING_when_there_is_SUBSCRIPTION_CHANGED_event() { + fun can_transit_from_RECEIVING_to_RECEIVING_when_there_is_SUBSCRIPTION_CHANGED_event_with_different_channels_and_groups() { // given + val newChannels = setOf("Channel2", "Channel3") + val newChannelGroups = setOf("ChannelGroup2") + + // when + val (state, invocations) = + transition( + SubscribeState.Receiving(channels, channelGroups, subscriptionCursor), + SubscribeEvent.SubscriptionChanged(newChannels, newChannelGroups), + ) + + // then + Assertions.assertTrue(state is SubscribeState.Receiving) + state as SubscribeState.Receiving + + assertEquals(newChannels, state.channels) + assertEquals(newChannelGroups, state.channelGroups) + assertEquals(subscriptionCursor, state.subscriptionCursor) + assertEquals( + setOf( + SubscribeEffectInvocation.CancelReceiveMessages, + SubscribeEffectInvocation.EmitStatus( + createSubscriptionChangedStatus( + state.subscriptionCursor, + newChannels, + newChannelGroups, + ), + ), + SubscribeEffectInvocation.ReceiveMessages(newChannels, newChannelGroups, subscriptionCursor), + ), + invocations, + ) + } + + @Test + fun stays_in_RECEIVING_and_emits_status_without_resubscribing_when_SUBSCRIPTION_CHANGED_event_has_same_channels_and_groups() { + // given - channels and channelGroups are the same // when val (state, invocations) = transition( @@ -100,9 +136,9 @@ class TransitionFromReceivingStateTest { assertEquals(channels, state.channels) assertEquals(channelGroups, state.channelGroups) assertEquals(subscriptionCursor, state.subscriptionCursor) + // Should only emit status - no cancel or new receive assertEquals( setOf( - SubscribeEffectInvocation.CancelReceiveMessages, SubscribeEffectInvocation.EmitStatus( createSubscriptionChangedStatus( state.subscriptionCursor, @@ -110,7 +146,6 @@ class TransitionFromReceivingStateTest { channelGroups, ), ), - SubscribeEffectInvocation.ReceiveMessages(channels, channelGroups, subscriptionCursor), ), invocations, ) From d377ea8658a00fbee1d7aaa89ff72a2c3c5bdc87 Mon Sep 17 00:00:00 2001 From: "marcin.cebo" Date: Thu, 20 Nov 2025 13:28:29 +0100 Subject: [PATCH 2/2] Changes after review --- .../api/integration/SubscribeIntegrationTests.kt | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/pubnub-kotlin/pubnub-kotlin-impl/src/integrationTest/kotlin/com/pubnub/api/integration/SubscribeIntegrationTests.kt b/pubnub-kotlin/pubnub-kotlin-impl/src/integrationTest/kotlin/com/pubnub/api/integration/SubscribeIntegrationTests.kt index 14ae7795e5..3d16c24b98 100644 --- a/pubnub-kotlin/pubnub-kotlin-impl/src/integrationTest/kotlin/com/pubnub/api/integration/SubscribeIntegrationTests.kt +++ b/pubnub-kotlin/pubnub-kotlin-impl/src/integrationTest/kotlin/com/pubnub/api/integration/SubscribeIntegrationTests.kt @@ -1371,14 +1371,13 @@ class SubscribeIntegrationTests : BaseIntegrationTest() { val channelName02 = randomChannel() val expectedMessage = "test_${randomValue()}" - val connectedLatch = CountDownLatch(1) // Wait for first connection - val subscriptionChangedLatch = CountDownLatch(1) // Wait for subscription changed event - val messagesLatch = CountDownLatch(2) // Wait for both listeners to receive message + val connectedLatch = CountDownLatch(1) + val subscriptionChangedLatch = CountDownLatch(1) + val messagesLatch = CountDownLatch(2) val connectedEventCount = AtomicInteger(0) - val subscriptionChangedCount = - AtomicInteger(0) // status event emitted but no actual resubscribe when channels unchanged - val subscribeHttpRequestCount = AtomicInteger(0) // Count actual HTTP subscribe requests + val subscriptionChangedCount = AtomicInteger(0) + val subscribeHttpRequestCount = AtomicInteger(0) // Track messages received by each listener val listenerAMessageCount = AtomicInteger(0) @@ -1391,7 +1390,6 @@ class SubscribeIntegrationTests : BaseIntegrationTest() { val networkRequestDetails = logMessage.message as LogMessageContent.NetworkRequest if (networkRequestDetails.path.contains("/v2/subscribe/")) { subscribeHttpRequestCount.incrementAndGet() - println("HTTP Subscribe request #${subscribeHttpRequestCount.get()}: ${networkRequestDetails.path}") } } } @@ -1432,7 +1430,6 @@ class SubscribeIntegrationTests : BaseIntegrationTest() { pubnub: PubNub, result: PNMessageResult, ) { - println("ListenerA received: ${result.message.asString}") listenerAMessageCount.incrementAndGet() messagesLatch.countDown() } @@ -1447,7 +1444,6 @@ class SubscribeIntegrationTests : BaseIntegrationTest() { pubnub: PubNub, result: PNMessageResult, ) { - println("ListenerB received: ${result.message.asString}") listenerBMessageCount.incrementAndGet() messagesLatch.countDown() }