From d4e8a444996cc838409ba21a6912687d11994a6f Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Thu, 21 Sep 2023 16:28:22 +0100 Subject: [PATCH] Set S2 writer concurrency to 1 Signed-off-by: Neil Twigg --- server/leafnode_test.go | 93 +++++++++++++++++++++++++++++++++++++++++ server/server.go | 11 +++-- 2 files changed, 101 insertions(+), 3 deletions(-) diff --git a/server/leafnode_test.go b/server/leafnode_test.go index a0804a89ff..2b631cd557 100644 --- a/server/leafnode_test.go +++ b/server/leafnode_test.go @@ -5471,6 +5471,99 @@ func TestLeafNodeCompression(t *testing.T) { } } +func BenchmarkLeafNodeCompression(b *testing.B) { + conf1 := createConfFile(b, []byte(` + port: -1 + server_name: "Hub" + accounts { + A { users: [{user: a, password: pwd}] } + B { users: [{user: b, password: pwd}] } + C { users: [{user: c, password: pwd}] } + D { users: [{user: d, password: pwd}] } + } + leafnodes { + port: -1 + } + `)) + s1, o1 := RunServerWithConfig(conf1) + defer s1.Shutdown() + + port := o1.LeafNode.Port + conf2 := createConfFile(b, []byte(fmt.Sprintf(` + port: -1 + server_name: "Spoke" + accounts { + A { users: [{user: a, password: pwd}] } + B { users: [{user: b, password: pwd}] } + C { users: [{user: c, password: pwd}] } + D { users: [{user: d, password: pwd}] } + } + leafnodes { + remotes [ + { url: "nats://a:pwd@127.0.0.1:%d", account: "A", compression: s2_better } + { url: "nats://b:pwd@127.0.0.1:%d", account: "B", compression: s2_best } + { url: "nats://c:pwd@127.0.0.1:%d", account: "C", compression: s2_fast } + { url: "nats://d:pwd@127.0.0.1:%d", account: "D", compression: off } + ] + } + `, port, port, port, port))) + s2, _ := RunServerWithConfig(conf2) + defer s2.Shutdown() + + checkLeafNodeConnectedCount(b, s1, 4) + checkLeafNodeConnectedCount(b, s2, 4) + + l, err := s2.Leafz(nil) + require_NoError(b, err) + for _, r := range l.Leafs { + switch { + case r.Account == "A" && r.Compression == CompressionS2Better: + case r.Account == "B" && r.Compression == CompressionS2Best: + case r.Account == "C" && r.Compression == CompressionS2Fast: + case r.Account == "D" && r.Compression == CompressionOff: + default: + b.Fatalf("Account %q had incorrect compression mode %q on leaf connection", r.Account, r.Compression) + } + } + + msg := make([]byte, 1024) + for _, p := range []struct { + algo string + user string + }{ + {"Better", "a"}, + {"Best", "b"}, + {"Fast", "c"}, + {"Off", "d"}, + } { + nc1 := natsConnect(b, s1.ClientURL(), nats.UserInfo(p.user, "pwd")) + nc2 := natsConnect(b, s2.ClientURL(), nats.UserInfo(p.user, "pwd")) + + sub, err := nc1.SubscribeSync("foo") + require_NoError(b, err) + + time.Sleep(time.Second) + + b.Run(p.algo, func(b *testing.B) { + start := time.Now() + + for i := 0; i < b.N; i++ { + err = nc2.Publish("foo", msg) + require_NoError(b, err) + + _, err = sub.NextMsg(time.Second) + require_NoError(b, err) + } + + b.ReportMetric(float64(len(msg)*b.N)/1024/1024, "MB") + b.ReportMetric(float64(len(msg)*b.N)/1024/1024/float64(time.Since(start).Seconds()), "MB/sec") + }) + + nc1.Close() + nc2.Close() + } +} + func TestLeafNodeCompressionMatrixModes(t *testing.T) { for _, test := range []struct { name string diff --git a/server/server.go b/server/server.go index f6880f3f29..55d271e012 100644 --- a/server/server.go +++ b/server/server.go @@ -572,13 +572,18 @@ func selectS2AutoModeBasedOnRTT(rtt time.Duration, rttThresholds []time.Duration // with a nil []s2.WriterOption, but not with a nil s2.WriterOption, so // this is more versatile. func s2WriterOptions(cm string) []s2.WriterOption { + _opts := [2]s2.WriterOption{} + opts := append( + _opts[:0], + s2.WriterConcurrency(1), // Stop asynchronous flushing in separate goroutines + ) switch cm { case CompressionS2Uncompressed: - return []s2.WriterOption{s2.WriterUncompressed()} + return append(opts, s2.WriterUncompressed()) case CompressionS2Best: - return []s2.WriterOption{s2.WriterBestCompression()} + return append(opts, s2.WriterBestCompression()) case CompressionS2Better: - return []s2.WriterOption{s2.WriterBetterCompression()} + return append(opts, s2.WriterBetterCompression()) default: return nil }