From 567077c67a92d3f6e1889ff681bbb665af077c99 Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Wed, 31 May 2023 12:00:19 +0100 Subject: [PATCH] Add test for MQTT retained message migration Signed-off-by: Neil Twigg --- server/mqtt_test.go | 72 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 72 insertions(+) diff --git a/server/mqtt_test.go b/server/mqtt_test.go index 9c9b0b8ff27..7c5888d926c 100644 --- a/server/mqtt_test.go +++ b/server/mqtt_test.go @@ -2993,6 +2993,78 @@ func TestMQTTRetainedMsgNetworkUpdates(t *testing.T) { } } +func TestMQTTRetainedMsgMigration(t *testing.T) { + o := testMQTTDefaultOptions() + s := testMQTTRunServer(t, o) + defer testMQTTShutdownServer(s) + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + // Create the retained messages stream to listen on the old subject first. + // The server will correct this when the migration takes place. + _, err := js.AddStream(&nats.StreamConfig{ + Name: mqttRetainedMsgsStreamName, + Subjects: []string{`$MQTT.rmsgs`}, + Storage: nats.FileStorage, + Retention: nats.LimitsPolicy, + Replicas: 1, + }) + require_NoError(t, err) + + // Publish some retained messages on the old "$MQTT.rmsgs" subject. + for i := 0; i < 100; i++ { + msg := fmt.Sprintf( + `{"origin":"b5IQZNtG","subject":"test%d","topic":"test%d","msg":"YmFy","flags":1}`, i, i, + ) + _, err := js.Publish(`$MQTT.rmsgs`, []byte(msg)) + require_NoError(t, err) + } + + // Check that the old subject looks right. + si, err := js.StreamInfo(mqttRetainedMsgsStreamName, &nats.StreamInfoRequest{ + SubjectsFilter: `$MQTT.>`, + }) + require_NoError(t, err) + if si.State.NumSubjects != 1 { + t.Fatalf("expected 1 subject, got %d", si.State.NumSubjects) + } + if n := si.State.Subjects[`$MQTT.rmsgs`]; n != 100 { + t.Fatalf("expected to find 100 messages on the original subject but found %d", n) + } + + // Create an MQTT client, this will cause a migration to take place. + mc, rc := testMQTTConnect(t, &mqttConnInfo{clientID: "sub", cleanSess: true}, o.MQTT.Host, o.MQTT.Port) + defer mc.Close() + testMQTTCheckConnAck(t, rc, mqttConnAckRCConnectionAccepted, false) + + // Now look at the stream, there should be 100 messages on the new + // divided subjects and none on the old undivided subject. + si, err = js.StreamInfo(mqttRetainedMsgsStreamName, &nats.StreamInfoRequest{ + SubjectsFilter: `$MQTT.>`, + }) + require_NoError(t, err) + if si.State.NumSubjects != 100 { + t.Fatalf("expected 100 subjects, got %d", si.State.NumSubjects) + } + if n := si.State.Subjects[`$MQTT.rmsgs`]; n > 0 { + t.Fatalf("expected to find no messages on the original subject but found %d", n) + } + + // Check that the message counts look right. There should be one + // retained message per key. + for i := 0; i < 100; i++ { + expected := fmt.Sprintf(`$MQTT.rmsgs.test%d`, i) + n, ok := si.State.Subjects[expected] + if !ok { + t.Fatalf("expected to find %q but didn't", expected) + } + if n != 1 { + t.Fatalf("expected %q to have 1 message but had %d", expected, n) + } + } +} + func TestMQTTClusterReplicasCount(t *testing.T) { for _, test := range []struct { size int