From fe07a8cfbadbc15bc5d29c6129c0efb854ef0252 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20B=C4=85czkowski?= Date: Wed, 1 Oct 2025 23:46:10 +0200 Subject: [PATCH] Stabilize AdvancedShardAwarenessIT This test class has recently been failing for various reasons. The following modifications were made: - Instead of waiting a fixed amount of time in some tests, each test now waits until all pools are initialized (or the wait times out) before checking the logs. - Tolerance for additional reconnections was slightly increased. It is still set to a much lower number than what appears when running the scenario without advanced shard awareness. - The configured size of channel pools was halved in the tests that used to create over 200 total channels. This should alleviate occasional bind exceptions. - The reconnection delays were increased. They were intentionally low to cause congestion, but it was too much for GitHub Actions. - Scanning for patterns in the logs is now done on a copy to avoid exceptions related to concurrent modification. - Commented out general checks for "Reconnection attempt complete". Those are already covered by more specific patterns that include the cluster's IP, and it seems this general version collides with logs from other tests. This should not be happening, but that's a separate, larger issue. - Added full node IP addresses to log patterns to avoid matching logs caused by a different cluster. 
--- .../core/pool/AdvancedShardAwarenessIT.java | 183 ++++++++++++++---- 1 file changed, 140 insertions(+), 43 deletions(-) diff --git a/integration-tests/src/test/java/com/datastax/oss/driver/core/pool/AdvancedShardAwarenessIT.java b/integration-tests/src/test/java/com/datastax/oss/driver/core/pool/AdvancedShardAwarenessIT.java index 7886442a70d..355bd109c7e 100644 --- a/integration-tests/src/test/java/com/datastax/oss/driver/core/pool/AdvancedShardAwarenessIT.java +++ b/integration-tests/src/test/java/com/datastax/oss/driver/core/pool/AdvancedShardAwarenessIT.java @@ -10,27 +10,31 @@ import com.datastax.oss.driver.api.core.CqlSessionBuilder; import com.datastax.oss.driver.api.core.config.DefaultDriverOption; import com.datastax.oss.driver.api.core.config.DriverConfigLoader; -import com.datastax.oss.driver.api.core.session.Session; +import com.datastax.oss.driver.api.core.metadata.Node; import com.datastax.oss.driver.api.testinfra.ScyllaOnly; import com.datastax.oss.driver.api.testinfra.ccm.CustomCcmRule; import com.datastax.oss.driver.api.testinfra.session.SessionUtils; import com.datastax.oss.driver.internal.core.pool.ChannelPool; +import com.datastax.oss.driver.internal.core.session.DefaultSession; import com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures; import com.datastax.oss.driver.internal.core.util.concurrent.Reconnection; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; -import com.google.common.util.concurrent.Uninterruptibles; import com.tngtech.java.junit.dataprovider.DataProvider; import com.tngtech.java.junit.dataprovider.DataProviderRunner; import com.tngtech.java.junit.dataprovider.UseDataProvider; import java.net.InetSocketAddress; import java.time.Duration; +import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CompletionStage; import 
java.util.concurrent.TimeUnit; import java.util.regex.Pattern; +import org.awaitility.Awaitility; import org.junit.After; import org.junit.Before; import org.junit.ClassRule; @@ -55,9 +59,8 @@ public class AdvancedShardAwarenessIT { Level originalLevelReconnection; private final Pattern shardMismatchPattern = Pattern.compile(".*r configuration of shard aware port.*"); - private final Pattern reconnectionPattern = + private final Pattern generalReconnectionPattern = Pattern.compile(".*Scheduling next reconnection in.*"); - Set forbiddenOccurences = ImmutableSet.of(shardMismatchPattern, reconnectionPattern); @DataProvider public static Object[][] reuseAddressOption() { @@ -92,67 +95,105 @@ public void stopCapturingLogs() { @Test @UseDataProvider("reuseAddressOption") public void should_initialize_all_channels(boolean reuseAddress) { + int poolLocalSizeSetting = 4; // Will round up to 6 due to not being divisible by 3 shards + int expectedChannelsPerNode = 6; + String node1 = CCM_RULE.getCcmBridge().getNodeIpAddress(1); + String node2 = CCM_RULE.getCcmBridge().getNodeIpAddress(2); + Pattern reconnectionPattern1 = + Pattern.compile(".*" + Pattern.quote(node1) + ".*Scheduling next reconnection in.*"); + Pattern reconnectionPattern2 = + Pattern.compile(".*" + Pattern.quote(node2) + ".*Scheduling next reconnection in.*"); + Set forbiddenOccurences = + ImmutableSet.of(shardMismatchPattern, reconnectionPattern1, reconnectionPattern2); Map expectedOccurences = ImmutableMap.of( - Pattern.compile(".*\\.2:19042.*Reconnection attempt complete, 6/6 channels.*"), 1, - Pattern.compile(".*\\.1:19042.*Reconnection attempt complete, 6/6 channels.*"), 1, - Pattern.compile(".*Reconnection attempt complete.*"), 2, - Pattern.compile(".*\\.1:19042.*New channel added \\[.*"), 5, - Pattern.compile(".*\\.2:19042.*New channel added \\[.*"), 5, - Pattern.compile(".*\\.1:19042\\] Trying to create 5 missing channels.*"), 1, - Pattern.compile(".*\\.2:19042\\] Trying to create 5 missing 
channels.*"), 1); + Pattern.compile( + ".*" + + Pattern.quote(node1) + + ":19042.*Reconnection attempt complete, 6/6 channels.*"), + 1, + Pattern.compile( + ".*" + + Pattern.quote(node2) + + ":19042.*Reconnection attempt complete, 6/6 channels.*"), + 1, + // Temporarily commented out because hanging sessions from other tests pollute logs + // Pattern.compile(".*Reconnection attempt complete.*"), 2, + Pattern.compile(".*" + Pattern.quote(node1) + ":19042.*New channel added \\[.*"), 5, + Pattern.compile(".*" + Pattern.quote(node2) + ":19042.*New channel added \\[.*"), 5, + Pattern.compile( + ".*" + + Pattern.quote(node1) + + ":19042\\] Trying to create 5 missing channels.*"), + 1, + Pattern.compile( + ".*" + + Pattern.quote(node2) + + ":19042\\] Trying to create 5 missing channels.*"), + 1); DriverConfigLoader loader = SessionUtils.configLoaderBuilder() .withBoolean(DefaultDriverOption.SOCKET_REUSE_ADDRESS, reuseAddress) .withBoolean(DefaultDriverOption.CONNECTION_ADVANCED_SHARD_AWARENESS_ENABLED, true) .withInt(DefaultDriverOption.ADVANCED_SHARD_AWARENESS_PORT_LOW, 10000) .withInt(DefaultDriverOption.ADVANCED_SHARD_AWARENESS_PORT_HIGH, 60000) - // Due to rounding up the connections per shard this will result in 6 connections per - // node - .withInt(DefaultDriverOption.CONNECTION_POOL_LOCAL_SIZE, 4) + .withInt(DefaultDriverOption.CONNECTION_POOL_LOCAL_SIZE, poolLocalSizeSetting) .build(); - try (Session session = + try (CqlSession session = CqlSession.builder() .addContactPoint( new InetSocketAddress(CCM_RULE.getCcmBridge().getNodeIpAddress(1), 19042)) .withConfigLoader(loader) .build()) { - Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); + List allSessions = Collections.singletonList(session); + Awaitility.await() + .atMost(20, TimeUnit.SECONDS) + .pollInterval(500, TimeUnit.MILLISECONDS) + .until(() -> areAllPoolsFullyInitialized(allSessions, expectedChannelsPerNode)); + List logsCopy = ImmutableList.copyOf(appender.list); expectedOccurences.forEach( - 
(pattern, times) -> assertMatchesExactly(pattern, times, appender.list)); - forbiddenOccurences.forEach(pattern -> assertNoLogMatches(pattern, appender.list)); + (pattern, times) -> assertMatchesExactly(pattern, times, logsCopy)); + forbiddenOccurences.forEach(pattern -> assertNoLogMatches(pattern, logsCopy)); } } @Test public void should_see_mismatched_shard() { + int expectedChannelsPerNode = 33; DriverConfigLoader loader = SessionUtils.configLoaderBuilder() .withBoolean(DefaultDriverOption.CONNECTION_ADVANCED_SHARD_AWARENESS_ENABLED, true) .withInt(DefaultDriverOption.ADVANCED_SHARD_AWARENESS_PORT_LOW, 10000) .withInt(DefaultDriverOption.ADVANCED_SHARD_AWARENESS_PORT_HIGH, 60000) - .withInt(DefaultDriverOption.CONNECTION_POOL_LOCAL_SIZE, 64) + .withInt(DefaultDriverOption.CONNECTION_POOL_LOCAL_SIZE, expectedChannelsPerNode) .build(); - try (Session session = + try (CqlSession session = CqlSession.builder() .addContactPoint( new InetSocketAddress(CCM_RULE.getCcmBridge().getNodeIpAddress(1), 9042)) .withConfigLoader(loader) .build()) { - Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); - assertMatchesAtLeast(shardMismatchPattern, 5, appender.list); + List allSessions = Collections.singletonList(session); + Awaitility.await() + .atMost(20, TimeUnit.SECONDS) + .pollInterval(500, TimeUnit.MILLISECONDS) + // Waits until 2/3rds are initialized instead of all. It does not matter here. 
+ .until(() -> areAllPoolsFullyInitialized(allSessions, expectedChannelsPerNode * 2 / 3)); + List logsCopy = ImmutableList.copyOf(appender.list); + assertMatchesAtLeast(shardMismatchPattern, 5, logsCopy); } } // There is no need to run this as a test, but it serves as a comparison @SuppressWarnings("unused") public void should_struggle_to_fill_pools() { + int expectedChannelsPerNode = 33; // Divisible by number of shards DriverConfigLoader loader = SessionUtils.configLoaderBuilder() .withBoolean(DefaultDriverOption.CONNECTION_ADVANCED_SHARD_AWARENESS_ENABLED, false) - .withInt(DefaultDriverOption.CONNECTION_POOL_LOCAL_SIZE, 64) - .withDuration(DefaultDriverOption.RECONNECTION_BASE_DELAY, Duration.ofMillis(200)) - .withDuration(DefaultDriverOption.RECONNECTION_MAX_DELAY, Duration.ofMillis(4000)) + .withInt(DefaultDriverOption.CONNECTION_POOL_LOCAL_SIZE, expectedChannelsPerNode) + .withDuration(DefaultDriverOption.RECONNECTION_BASE_DELAY, Duration.ofSeconds(1)) + .withDuration(DefaultDriverOption.RECONNECTION_MAX_DELAY, Duration.ofSeconds(20)) .build(); CqlSessionBuilder builder = CqlSession.builder() @@ -167,20 +208,26 @@ public void should_struggle_to_fill_pools() { CqlSession session2 = CompletableFutures.getUninterruptibly(stage2); CqlSession session3 = CompletableFutures.getUninterruptibly(stage3); CqlSession session4 = CompletableFutures.getUninterruptibly(stage4); ) { - Uninterruptibles.sleepUninterruptibly(20, TimeUnit.SECONDS); - assertNoLogMatches(shardMismatchPattern, appender.list); - assertMatchesAtLeast(reconnectionPattern, 8, appender.list); + List allSessions = Arrays.asList(session1, session2, session3, session4); + Awaitility.await() + .atMost(20, TimeUnit.SECONDS) + .pollInterval(500, TimeUnit.MILLISECONDS) + .until(() -> areAllPoolsFullyInitialized(allSessions, expectedChannelsPerNode)); + List logsCopy = ImmutableList.copyOf(appender.list); + assertNoLogMatches(shardMismatchPattern, logsCopy); + assertMatchesAtLeast(generalReconnectionPattern, 8, 
logsCopy); } } @Test public void should_not_struggle_to_fill_pools() { + int expectedChannelsPerNode = 33; // Divisible by number of shards DriverConfigLoader loader = SessionUtils.configLoaderBuilder() .withBoolean(DefaultDriverOption.CONNECTION_ADVANCED_SHARD_AWARENESS_ENABLED, true) - .withInt(DefaultDriverOption.CONNECTION_POOL_LOCAL_SIZE, 66) - .withDuration(DefaultDriverOption.RECONNECTION_BASE_DELAY, Duration.ofMillis(10)) - .withDuration(DefaultDriverOption.RECONNECTION_MAX_DELAY, Duration.ofMillis(20)) + .withInt(DefaultDriverOption.CONNECTION_POOL_LOCAL_SIZE, expectedChannelsPerNode) + .withDuration(DefaultDriverOption.RECONNECTION_BASE_DELAY, Duration.ofSeconds(1)) + .withDuration(DefaultDriverOption.RECONNECTION_MAX_DELAY, Duration.ofSeconds(20)) .build(); CqlSessionBuilder builder = CqlSession.builder() @@ -196,25 +243,75 @@ public void should_not_struggle_to_fill_pools() { CqlSession session2 = CompletableFutures.getUninterruptibly(stage2); CqlSession session3 = CompletableFutures.getUninterruptibly(stage3); CqlSession session4 = CompletableFutures.getUninterruptibly(stage4); ) { - Uninterruptibles.sleepUninterruptibly(8, TimeUnit.SECONDS); - int tolerance = 2; // Sometimes socket ends up already in use + + List allSessions = Arrays.asList(session1, session2, session3, session4); + Awaitility.await() + .atMost(20, TimeUnit.SECONDS) + .pollInterval(500, TimeUnit.MILLISECONDS) + .until(() -> areAllPoolsFullyInitialized(allSessions, expectedChannelsPerNode)); + String node1 = CCM_RULE.getCcmBridge().getNodeIpAddress(1); + String node2 = CCM_RULE.getCcmBridge().getNodeIpAddress(2); + Pattern reconnectionPattern1 = + Pattern.compile(".*" + Pattern.quote(node1) + ".*Scheduling next reconnection in.*"); + Pattern reconnectionPattern2 = + Pattern.compile(".*" + Pattern.quote(node2) + ".*Scheduling next reconnection in.*"); + int tolerance = 4; // Sometimes socket ends up already in use Map expectedOccurences = ImmutableMap.of( - 
Pattern.compile(".*\\.2:19042.*Reconnection attempt complete, 66/66 channels.*"), + Pattern.compile( + ".*" + + Pattern.quote(node1) + + ":19042.*Reconnection attempt complete, 33/33 channels.*"), + 1 * sessions, + Pattern.compile( + ".*" + + Pattern.quote(node2) + + ":19042.*Reconnection attempt complete, 33/33 channels.*"), 1 * sessions, - Pattern.compile(".*\\.1:19042.*Reconnection attempt complete, 66/66 channels.*"), + // Temporarily commented out because hanging sessions from other tests pollute logs + // Pattern.compile(".*Reconnection attempt complete.*"), 2 * sessions, + Pattern.compile(".*" + Pattern.quote(node1) + ":19042.*New channel added \\[.*"), + 32 * sessions - tolerance, + Pattern.compile(".*" + Pattern.quote(node2) + ":19042.*New channel added \\[.*"), + 32 * sessions - tolerance, + Pattern.compile( + ".*" + + Pattern.quote(node1) + + ":19042\\] Trying to create 32 missing channels.*"), 1 * sessions, - Pattern.compile(".*Reconnection attempt complete.*"), 2 * sessions, - Pattern.compile(".*.1:19042.*New channel added \\[.*"), 65 * sessions - tolerance, - Pattern.compile(".*.2:19042.*New channel added \\[.*"), 65 * sessions - tolerance, - Pattern.compile(".*.1:19042\\] Trying to create 65 missing channels.*"), 1 * sessions, - Pattern.compile(".*.2:19042\\] Trying to create 65 missing channels.*"), + Pattern.compile( + ".*" + + Pattern.quote(node2) + + ":19042\\] Trying to create 32 missing channels.*"), 1 * sessions); + List logsCopy = ImmutableList.copyOf(appender.list); expectedOccurences.forEach( - (pattern, times) -> assertMatchesAtLeast(pattern, times, appender.list)); - assertNoLogMatches(shardMismatchPattern, appender.list); - assertMatchesAtMost(reconnectionPattern, tolerance, appender.list); + (pattern, times) -> assertMatchesAtLeast(pattern, times, logsCopy)); + assertNoLogMatches(shardMismatchPattern, logsCopy); + assertMatchesAtMost(reconnectionPattern1, tolerance, logsCopy); + assertMatchesAtMost(reconnectionPattern2, tolerance, 
logsCopy); + } + } + + private boolean areAllPoolsFullyInitialized( + List sessions, int expectedChannelsPerNode) { + for (CqlSession session : sessions) { + DefaultSession defaultSession = (DefaultSession) session; + Map pools = defaultSession.getPools(); + if (pools == null || pools.isEmpty()) { + return false; + } + + for (ChannelPool pool : pools.values()) { + if (pool == null) { + return false; + } + if (pool.size() < expectedChannelsPerNode) { + return false; + } + } } + return true; } private void assertNoLogMatches(Pattern pattern, List logs) {