From 04c6919385ee3fe6b043642940245ca835aeda5a Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Tue, 11 Jul 2023 11:54:04 +0100 Subject: [PATCH] Update `TestJetStreamServerReencryption` to also test converting ciphers at the same time as changing keys --- server/jetstream_test.go | 192 ++++++++++++++++++++++----------------- 1 file changed, 108 insertions(+), 84 deletions(-) diff --git a/server/jetstream_test.go b/server/jetstream_test.go index fcb1013ced..d8638833d6 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -20906,104 +20906,128 @@ func TestJetStreamMaxBytesIgnored(t *testing.T) { func TestJetStreamServerReencryption(t *testing.T) { storeDir := t.TempDir() - tmpl1, tmpl2, tmpl3 := ` - server_name: S22 - listen: 127.0.0.1:-1 - jetstream: { - key: %q, - cipher: aes, - store_dir: %q - } - `, ` - server_name: S22 - listen: 127.0.0.1:-1 - jetstream: { - key: %q, - cipher: aes, - old_key: %q, - store_dir: %q - } - `, ` - server_name: S22 - listen: 127.0.0.1:-1 - jetstream: { - key: %q, - cipher: aes, - store_dir: %q - } - ` - conf1 := createConfFile(t, []byte(fmt.Sprintf(tmpl1, "firstencryptionkey", storeDir))) - conf2 := createConfFile(t, []byte(fmt.Sprintf(tmpl2, "secondencryptionkey", "firstencryptionkey", storeDir))) - conf3 := createConfFile(t, []byte(fmt.Sprintf(tmpl3, "secondencryptionkey", storeDir))) - expected := 30 + for i, algo := range []struct { + from string + to string + }{ + {"aes", "aes"}, + {"aes", "chacha"}, + {"chacha", "chacha"}, + {"chacha", "aes"}, + } { + t.Run(fmt.Sprintf("%s_to_%s", algo.from, algo.to), func(t *testing.T) { + streamName := fmt.Sprintf("TEST_%d", i) + subjectName := fmt.Sprintf("foo_%d", i) + expected := 30 + + checkStream := func(js nats.JetStreamContext) { + si, err := js.StreamInfo(streamName) + if err != nil { + t.Fatal(err) + } - checkStream := func(js nats.JetStreamContext) { - si, err := js.StreamInfo("TEST") - if err != nil { - t.Fatal(err) - } + if si.State.Msgs != uint64(expected) { + t.Fatalf("Should be %d messages but got %d messages", expected, si.State.Msgs) + } - if si.State.Msgs != uint64(expected) { - t.Fatalf("Should be %d messages but got %d messages", expected, si.State.Msgs) - } + sub, err := js.PullSubscribe(subjectName, "") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } - sub, err := js.PullSubscribe("foo", "") - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } + c := 0 + for _, m := range fetchMsgs(t, sub, expected, 5*time.Second) { + m.AckSync() + c++ + } + if c != expected { + t.Fatalf("Should have read back %d messages but got %d messages", expected, c) + } + } - c := 0 - for _, m := range fetchMsgs(t, sub, expected, 5*time.Second) { - m.AckSync() - c++ - } - if c != expected { - t.Fatalf("Should have read back %d messages but got %d messages", expected, c) - } - } + // First off, we start up using the original encryption key and algorithm. + // We'll create a stream and populate it with some messages. + t.Run("setup", func(t *testing.T) { + conf := createConfFile(t, []byte(fmt.Sprintf(` + server_name: S22 + listen: 127.0.0.1:-1 + jetstream: { + key: %q, + cipher: %s, + store_dir: %q + } + `, "firstencryptionkey", algo.from, storeDir))) - func() { - s, _ := RunServerWithConfig(conf1) - defer s.Shutdown() + s, _ := RunServerWithConfig(conf) + defer s.Shutdown() - nc, js := jsClientConnect(t, s) - defer nc.Close() + nc, js := jsClientConnect(t, s) + defer nc.Close() - cfg := &nats.StreamConfig{ - Name: "TEST", - Subjects: []string{"foo"}, - } - if _, err := js.AddStream(cfg); err != nil { - t.Fatalf("Unexpected error: %v", err) - } + cfg := &nats.StreamConfig{ + Name: streamName, + Subjects: []string{subjectName}, + } + if _, err := js.AddStream(cfg); err != nil { + t.Fatalf("Unexpected error: %v", err) + } - for i := 0; i < expected; i++ { - if _, err := js.Publish("foo", []byte("ENCRYPTED PAYLOAD!!")); err != nil { - t.Fatalf("Unexpected publish error: %v", err) - } - } + for i := 0; i < expected; i++ { + if _, err := js.Publish(subjectName, []byte("ENCRYPTED PAYLOAD!!")); err != nil { + t.Fatalf("Unexpected publish error: %v", err) + } + } - checkStream(js) - }() + checkStream(js) + }) - func() { - s, _ := RunServerWithConfig(conf2) - defer s.Shutdown() + // Next up, we will restart the server, this time with both the new key + // and algorithm and also the old key. At startup, the server will detect + // the change in encryption key and/or algorithm and re-encrypt the stream. + t.Run("reencrypt", func(t *testing.T) { + conf := createConfFile(t, []byte(fmt.Sprintf(` + server_name: S22 + listen: 127.0.0.1:-1 + jetstream: { + key: %q, + cipher: %s, + old_key: %q, + store_dir: %q + } + `, "secondencryptionkey", algo.to, "firstencryptionkey", storeDir))) - nc, js := jsClientConnect(t, s) - defer nc.Close() + s, _ := RunServerWithConfig(conf) + defer s.Shutdown() - checkStream(js) - }() + nc, js := jsClientConnect(t, s) + defer nc.Close() - func() { - s, _ := RunServerWithConfig(conf3) - defer s.Shutdown() + checkStream(js) + }) - nc, js := jsClientConnect(t, s) - defer nc.Close() + // Finally, we'll restart the server using only the new key and algorithm. + // At this point everything should have been re-encrypted, so we should still + // be able to access the stream. + t.Run("restart", func(t *testing.T) { + conf := createConfFile(t, []byte(fmt.Sprintf(` + server_name: S22 + listen: 127.0.0.1:-1 + jetstream: { + key: %q, + cipher: %s, + store_dir: %q + } + `, "secondencryptionkey", algo.to, storeDir))) - checkStream(js) - }() + s, _ := RunServerWithConfig(conf) + defer s.Shutdown() + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + checkStream(js) + }) + }) + } }