diff --git a/pubnub-kotlin/pubnub-kotlin-impl/src/integrationTest/kotlin/com/pubnub/api/integration/PresenceIntegrationTests.kt b/pubnub-kotlin/pubnub-kotlin-impl/src/integrationTest/kotlin/com/pubnub/api/integration/PresenceIntegrationTests.kt index b3a9bdd38..99a20a4b4 100644 --- a/pubnub-kotlin/pubnub-kotlin-impl/src/integrationTest/kotlin/com/pubnub/api/integration/PresenceIntegrationTests.kt +++ b/pubnub-kotlin/pubnub-kotlin-impl/src/integrationTest/kotlin/com/pubnub/api/integration/PresenceIntegrationTests.kt @@ -324,7 +324,7 @@ class PresenceIntegrationTests : BaseIntegrationTest() { } @Test - fun testHereNowWithStartFrom() { + fun testHereNowWithOffset() { val offsetValue = 2 val totalClientsCount = 5 val expectedChannel = randomChannel() diff --git a/pubnub-kotlin/pubnub-kotlin-impl/src/integrationTest/kotlin/com/pubnub/api/integration/SubscribeIntegrationTests.kt b/pubnub-kotlin/pubnub-kotlin-impl/src/integrationTest/kotlin/com/pubnub/api/integration/SubscribeIntegrationTests.kt index a76ee1b64..bc30cb94b 100644 --- a/pubnub-kotlin/pubnub-kotlin-impl/src/integrationTest/kotlin/com/pubnub/api/integration/SubscribeIntegrationTests.kt +++ b/pubnub-kotlin/pubnub-kotlin-impl/src/integrationTest/kotlin/com/pubnub/api/integration/SubscribeIntegrationTests.kt @@ -5,6 +5,10 @@ import com.google.gson.JsonObject import com.pubnub.api.PubNub import com.pubnub.api.callbacks.SubscribeCallback import com.pubnub.api.enums.PNStatusCategory +import com.pubnub.api.logging.CustomLogger +import com.pubnub.api.logging.LogMessage +import com.pubnub.api.logging.LogMessageContent +import com.pubnub.api.logging.LogMessageType import com.pubnub.api.models.consumer.PNStatus import com.pubnub.api.models.consumer.channel_group.PNChannelGroupsAddChannelResult import com.pubnub.api.models.consumer.pubsub.PNMessageResult @@ -1253,6 +1257,165 @@ class SubscribeIntegrationTests : BaseIntegrationTest() { assertEquals(2, subscriptionSet.subscriptions.size) } + @Test + fun shouldDeduplicateChannelSubscriptionsWhenSubscribingToSameChannelMultipleTimes() { + // given + val numberOfSubscribe = 4 + // punbub.subscribe does subscribe to already subscribed channel so only two subscribe calls should occur. Handshake and actual subscribe. + val countDownLatch = CountDownLatch(2) + var interceptedUrl: HttpUrl? = null + val testChannel = randomChannel() + + val customLogger = object : CustomLogger { + override fun debug(logMessage: LogMessage) { + if (logMessage.type == LogMessageType.NETWORK_REQUEST) { + val networkRequestDetails = logMessage.message as LogMessageContent.NetworkRequest + if (networkRequestDetails.path.contains("/v2/subscribe/")) { + interceptedUrl = (networkRequestDetails.origin + networkRequestDetails.path).toHttpUrlOrNull() + countDownLatch.countDown() + } + } + } + } + + clientConfig = { + customLoggers = listOf(customLogger) + } + + try { + repeat(numberOfSubscribe) { iteration -> + pubnub.subscribe(channels = listOf(testChannel)) + Thread.sleep(150) + println("Subscribe call ${iteration + 1}/$numberOfSubscribe completed") + } + + // Wait for the subscribe request to be made + assertTrue(countDownLatch.await(12000, TimeUnit.MILLISECONDS)) + + // then: verify channel appears only once in subscribed channels + val subscribedChannels = pubnub.getSubscribedChannels() + + assertEquals(1, subscribedChannels.size) + assertTrue(subscribedChannels.contains(testChannel)) + + // then: verify the actual HTTP request only includes the channel once + assertNotNull("Expected to intercept subscribe URL", interceptedUrl) + + val channelsParam = interceptedUrl!!.encodedPath + .substringAfter("/subscribe/") + .substringAfter("/") + .substringBefore("/") + + val channelList = channelsParam.split(",").filter { it.isNotEmpty() } + + assertEquals(1, channelList.count { it == testChannel }) + } finally { + pubnub.forceDestroy() + } + } + + @Test + fun heartbeatShouldDeduplicateChannelNameInUrlWhenSubscribingToSameChannelMultipleTimes() { + // given + val numberOfSubscribe = 4 + val countDownLatch = CountDownLatch(2) // we want to verify second heartbeat URL + var interceptedUrl: HttpUrl? = null + val testChannel = randomChannel() + + val customLogger = object : CustomLogger { + override fun debug(logMessage: LogMessage) { + if (logMessage.type == LogMessageType.NETWORK_REQUEST) { + val networkRequestDetails = logMessage.message as LogMessageContent.NetworkRequest + if (networkRequestDetails.path.contains("/v2/presence/") && networkRequestDetails.path.contains("/heartbeat")) { + interceptedUrl = (networkRequestDetails.origin + networkRequestDetails.path).toHttpUrlOrNull() + countDownLatch.countDown() + } + } + } + } + + clientConfig = { + customLoggers = listOf(customLogger) + heartbeatInterval = 5 + } + + try { + repeat(numberOfSubscribe) { iteration -> + pubnub.subscribe(channels = listOf(testChannel)) + Thread.sleep(150) + println("Subscribe call ${iteration + 1}/$numberOfSubscribe completed") + } + + // Wait for the heartbeat request to be made + assertTrue(countDownLatch.await(6000, TimeUnit.MILLISECONDS)) + + // then: verify the actual HTTP request only includes the channel once + assertNotNull("Expected to intercept heartbeat URL", interceptedUrl) + + // Extract channel from heartbeat URL: /v2/presence/sub-key/{sub-key}/channel/{channels}/heartbeat + val channelsParam = interceptedUrl!!.encodedPath + .substringAfter("/channel/") + .substringBefore("/heartbeat") + + val channelList = channelsParam.split(",").filter { it.isNotEmpty() } + + assertEquals(1, channelList.count { it == testChannel }) + } finally { + pubnub.forceDestroy() + } + } + + @Test + fun shouldDeduplicateChannelSubscriptionsWhenSubscribingToListOfTheSameChannels() { + // given + val countDownLatch = CountDownLatch(2) // Only two subscribe calls should occur. Handshake and actual subscribe. + var interceptedUrl: HttpUrl? = null + val testChannel = randomChannel() + + val customLogger = object : CustomLogger { + override fun debug(logMessage: LogMessage) { + if (logMessage.type == LogMessageType.NETWORK_REQUEST) { + val networkRequestDetails = logMessage.message as LogMessageContent.NetworkRequest + if (networkRequestDetails.path.contains("/v2/subscribe/")) { + interceptedUrl = (networkRequestDetails.origin + networkRequestDetails.path).toHttpUrlOrNull() + countDownLatch.countDown() + } + } + } + } + + clientConfig = { + customLoggers = listOf(customLogger) + } + + try { + pubnub.subscribe(channels = listOf(testChannel, testChannel, testChannel)) + + // Wait for the subscribe request to be made + assertTrue(countDownLatch.await(12000, TimeUnit.MILLISECONDS)) + + // then: verify channel appears only once in subscribed channels + val subscribedChannels = pubnub.getSubscribedChannels() + + assertEquals(1, subscribedChannels.size) + assertTrue(subscribedChannels.contains(testChannel)) + + // then: verify the actual HTTP request only includes the channel once + assertNotNull("Expected to intercept subscribe URL", interceptedUrl) + + val channelsParam = interceptedUrl!!.encodedPath + .substringAfter("/subscribe/") + .substringAfter("/") + .substringBefore("/") + + val channelList = channelsParam.split(",").filter { it.isNotEmpty() } + + assertEquals(1, channelList.count { it == testChannel }) + } finally { + pubnub.forceDestroy() + } + } + private fun publishToChannels(channelsList: List) { channelsList.forEach { channelName -> pubnub.publish(channelName, "-=message to $channelName").sync()