diff --git a/server/jetstream_chaos_test.go b/server/jetstream_chaos_test.go index 5743e81df2..05ada63c04 100644 --- a/server/jetstream_chaos_test.go +++ b/server/jetstream_chaos_test.go @@ -192,14 +192,10 @@ func jsClientConnectCluster(t testing.TB, c *cluster) (*nats.Conn, nats.JetStrea connectURL := strings.Join(serverConnectURLs, ",") nc, err := nats.Connect(connectURL) - if err != nil { - t.Fatalf("Failed to connect: %s", err) - } + require_NoError(t, err) js, err := nc.JetStream() - if err != nil { - t.Fatalf("Failed to init JetStream context: %s", err) - } + require_NoError(t, err) return nc, js } @@ -290,18 +286,14 @@ func createStreamForConsumerChaosTest(t *testing.T, c *cluster, replicas, numMes Subjects: []string{chaosConsumerTestsSubject}, Replicas: replicas, }) - if err != nil { - t.Fatalf("Error creating stream: %v", err) - } + require_NoError(t, err) ackFutures := make([]nats.PubAckFuture, 0, publishBatchSize) for i := 1; i <= numMessages; i++ { message := []byte(fmt.Sprintf("%d", i)) pubAckFuture, err := pubJs.PublishAsync(chaosConsumerTestsSubject, message, nats.ExpectLastSequence(uint64(i-1))) - if err != nil { - t.Fatalf("Publish error: %s", err) - } + require_NoError(t, err) ackFutures = append(ackFutures, pubAckFuture) if (i > 0 && i%publishBatchSize == 0) || i == numMessages { @@ -312,7 +304,7 @@ func createStreamForConsumerChaosTest(t *testing.T, c *cluster, replicas, numMes case <-pubAckFuture.Ok(): // Noop case pubAckErr := <-pubAckFuture.Err(): - t.Fatalf("Error publishing: %s", pubAckErr) + require_NoError(t, pubAckErr) case <-time.After(30 * time.Second): t.Fatalf("Timeout verifying pubAck for message: %s", pubAckFuture.Msg().Data) } @@ -362,9 +354,7 @@ func TestJetStreamChaosConsumerOrdered(t *testing.T) { chaosConsumerTestsSubject, nats.OrderedConsumer(), ) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } + require_NoError(t, err) defer sub.Unsubscribe() if chaosConsumerTestsDebug { @@ -394,10 +384,7 @@ func TestJetStreamChaosConsumerOrdered(t *testing.T) { } metadata, err := msg.Metadata() - if err != nil { - t.Fatalf("Failed to get message metadata: %v", err) - } - + require_NoError(t, err) if metadata.Sequence.Stream != uint64(i) { t.Fatalf("Expecting stream sequence %d, got %d instead", i, metadata.Sequence.Stream) } @@ -454,9 +441,7 @@ func TestJetStreamChaosConsumerAsync(t *testing.T) { deliveryCount += 1 metadata, err := msg.Metadata() - if err != nil { - t.Fatalf("Failed to get message metadata: %v", err) - } + require_NoError(t, err) seq := metadata.Sequence.Stream var expectedMsgData = []byte(fmt.Sprintf("%d", seq)) @@ -505,9 +490,7 @@ func TestJetStreamChaosConsumerAsync(t *testing.T) { subOpts := []nats.SubOpt{} sub, err := subJs.Subscribe(chaosConsumerTestsSubject, handleMsg, subOpts...) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } + require_NoError(t, err) defer sub.Unsubscribe() chaos.start() @@ -663,9 +646,7 @@ func TestJetStreamChaosConsumerDurable(t *testing.T) { deliveryCount += 1 metadata, err := msg.Metadata() - if err != nil { - t.Fatalf("Failed to get message metadata: %v", err) - } + require_NoError(t, err) seq := metadata.Sequence.Stream var expectedMsgData = []byte(fmt.Sprintf("%d", seq)) @@ -778,9 +759,7 @@ func TestJetStreamChaosConsumerPull(t *testing.T) { chaosConsumerTestsSubject, durableConsumerName, ) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } + require_NoError(t, err) defer sub.Unsubscribe() if chaosConsumerTestsDebug { @@ -819,9 +798,7 @@ func TestJetStreamChaosConsumerPull(t *testing.T) { deliveredCount += 1 metadata, err := msg.Metadata() - if err != nil { - t.Fatalf("Failed to get message metadata: %v", err) - } + require_NoError(t, err) streamSeq := metadata.Sequence.Stream @@ -878,34 +855,25 @@ func createBucketForKvChaosTest(t *testing.T, c *cluster, replicas int, allowDir } kvs, err := js.CreateKeyValue(&config) - if err != nil { - t.Fatalf("Error creating bucket: %v", err) - } + require_NoError(t, err) if !allowDirectGet { // Manually disable DirectGet on underlying stream, routing get request through the leader streamInfo, err := js.StreamInfo("KV_" + chaosKvTestsBucketName) - if err != nil { - t.Fatalf("Failed to get stream info: %s", err) - } + require_NoError(t, err) + streamInfo.Config.AllowDirect = false streamInfo, err = js.UpdateStream(&streamInfo.Config) - if err != nil { - t.Fatalf("Failed to update KV stream: %s", err) - } else if streamInfo.Config.AllowDirect { - t.Fatalf("Failed to disable 'DirectGet'") - } + require_NoError(t, err) + require_False(t, streamInfo.Config.AllowDirect) kvs, err = js.KeyValue(chaosKvTestsBucketName) - if err != nil { - t.Fatalf("Failed reopen KV handle: %s", err) - } + require_NoError(t, err) } status, err := kvs.Status() - if err != nil { - t.Fatalf("Error retrieving bucket status: %v", err) - } + require_NoError(t, err) + t.Logf("Bucket created: %s", status.Bucket()) } @@ -942,17 +910,12 @@ func TestJetStreamChaosKvPutGet(t *testing.T) { // Create KV bucket kv, err := js.KeyValue(chaosKvTestsBucketName) - if err != nil { - t.Fatalf("Failed to get KV store: %v", err) - } + require_NoError(t, err) // Initialize the only key firstRevision, err := kv.Create(key, []byte("INITIAL VALUE")) - if err != nil { - t.Fatalf("Failed to create key: %v", err) - } else if firstRevision != 1 { - t.Fatalf("Unexpected revision: %d", firstRevision) - } + require_NoError(t, err) + require_Equal(t, firstRevision, 1) // Start chaos chaos.start() @@ -1055,17 +1018,12 @@ func TestJetStreamChaosKvPutGetWithRetries(t *testing.T) { defer nc.Close() kv, err := js.KeyValue(chaosKvTestsBucketName) - if err != nil { - t.Fatalf("Failed to get KV store: %v", err) - } + require_NoError(t, err) // Initialize key value firstRevision, err := kv.Create(key, []byte("INITIAL VALUE")) - if err != nil { - t.Fatalf("Failed to create key: %v", err) - } else if firstRevision != 1 { - t.Fatalf("Unexpected revision: %d", firstRevision) - } + require_NoError(t, err) + require_Equal(t, firstRevision, 1) // Start chaos chaos.start() @@ -1183,9 +1141,7 @@ func TestJetStreamChaosKvCAS(t *testing.T) { // Create bucket kv, err := js.KeyValue(chaosKvTestsBucketName) - if err != nil { - t.Fatalf("Failed to get KV store: %v", err) - } + require_NoError(t, err) // Create set of keys and initialize them with dummy value keys := make([]string, numKeys) @@ -1194,9 +1150,7 @@ func TestJetStreamChaosKvCAS(t *testing.T) { keys[k] = key _, err := kv.Create(key, []byte("Initial value")) - if err != nil { - t.Fatalf("Failed to create key: %v", err) - } + require_NoError(t, err) } wgStart := sync.WaitGroup{} @@ -1292,9 +1246,7 @@ func TestJetStreamChaosKvCAS(t *testing.T) { defer cNc.Close() cKv, err := cJs.KeyValue(chaosKvTestsBucketName) - if err != nil { - t.Fatalf("Failed to get KV store: %v", err) - } + require_NoError(t, err) wgStart.Add(1) wgComplete.Add(1)