From 1ce12ba19589c12a3d7af06d42c3b0fb78df649a Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Mon, 8 May 2023 09:06:16 -0600 Subject: [PATCH] Fixed issue with route s2_auto when compression is actually off This is a fix for PR https://github.com/nats-io/nats-server/pull/4001. If a server has an s2_auto configuration, the compression level needs to be updated based on the RTT, however, this should not happen if a particular route is actually not using compression, either because it is a connection to an older server or the other side has explicitly configure compression to be "off". Extended a test that would have caught this issue. Signed-off-by: Ivan Kozlovic --- server/client.go | 12 ++++++------ server/routes_test.go | 15 +++++++++++++++ 2 files changed, 21 insertions(+), 6 deletions(-) diff --git a/server/client.go b/server/client.go index fc552c83c8..4f1049de6f 100644 --- a/server/client.go +++ b/server/client.go @@ -2389,12 +2389,12 @@ func (c *client) processPong() { c.rtt = computeRTT(c.rttStart) srv := c.srv reorderGWs := c.kind == GATEWAY && c.gw.outbound - // For routes, check if we have compression auto and if we should change - // the compression level. However, exclude the route if compression is - // "not supported", which indicates that this is a route to an older server. - if c.kind == ROUTER && c.route.compression != CompressionNotSupported { - if opts := srv.getOpts(); opts.Cluster.Compression.Mode == CompressionS2Auto { - if cm := selectS2AutoModeBasedOnRTT(c.rtt, opts.Cluster.Compression.RTTThresholds); cm != c.route.compression { + // If compression is currently active for a route connection, if the + // compression configuration is s2_auto, check if we should change + // the compression level. + if c.kind == ROUTER && needsCompression(c.route.compression) { + if co := &(srv.getOpts().Cluster.Compression); co.Mode == CompressionS2Auto { + if cm := selectS2AutoModeBasedOnRTT(c.rtt, co.RTTThresholds); cm != c.route.compression { c.route.compression = cm c.out.cw = s2.NewWriter(nil, s2WriterOptions(cm)...) } diff --git a/server/routes_test.go b/server/routes_test.go index 9fa3018b1c..04ddda68f4 100644 --- a/server/routes_test.go +++ b/server/routes_test.go @@ -3637,6 +3637,9 @@ func TestRouteCompressionAuto(t *testing.T) { checkFor(t, 2*time.Second, 15*time.Millisecond, func() error { s2.mu.RLock() defer s2.mu.RUnlock() + if n := s2.numRoutes(); n != 4 { + return fmt.Errorf("Cluster not formed properly, got %v routes", n) + } var err error s2.forEachRoute(func(r *client) { if err != nil { @@ -3694,6 +3697,18 @@ func TestRouteCompressionAuto(t *testing.T) { time.Sleep(100 * time.Millisecond) np.updateRTT(25 * time.Millisecond) checkComp(CompressionS2Fast) + + // Now disable compression on s1 + reloadUpdateConfig(t, s1, conf1, fmt.Sprintf(tmpl, "A", "10s", "off", _EMPTY_)) + // Wait a bit to make sure we don't check for cluster too soon since + // we expect a disconnect. + time.Sleep(100 * time.Millisecond) + checkClusterFormed(t, s1, s2) + // Now change the RTT values in the proxy. + np.updateRTT(1 * time.Millisecond) + // Now check that s2 also shows as "off". Wait for some ping intervals. + time.Sleep(200 * time.Millisecond) + checkComp(CompressionOff) } func TestRoutePings(t *testing.T) {