From 1e6b6fe6c8dc5c46a8ff9b7fd26e15f7119c4572 Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Mon, 24 Nov 2025 09:32:58 -0800 Subject: [PATCH] Fix Ginkgo test structure in `environment_test.go` Seven `Describe` blocks in `environment_test.go` incorrectly contain test code directly in the `Describe` block body instead of inside `It` blocks. Ginkgo requires all assertions and test setup to be inside leaf nodes (`It`, `BeforeEach`, `AfterEach`, etc.), not in container nodes. This causes panics during test tree construction when the test framework is not fully initialized. This change wraps all test code in the following `Describe` blocks with appropriate `It` blocks: - "TCP Parameters" - "Environment Validations" - "Validation Query Offset/Sequence" - "Stream Existing/Meta data" - "Address Resolver" - "Query Offset should return the value from Store Offset" - "QueryOffset DeclareStream StoreOffset should reconnect the locator" All tests now pass successfully with proper Ginkgo structure. --- pkg/amqp/error_stdlib.go | 1 - pkg/stream/environment_debug.go | 1 - pkg/stream/environment_test.go | 233 +++++++++++++++++--------------- pkg/test-helper/matchers.go | 6 +- 4 files changed, 124 insertions(+), 117 deletions(-) diff --git a/pkg/amqp/error_stdlib.go b/pkg/amqp/error_stdlib.go index 5805a29b..c8483f22 100644 --- a/pkg/amqp/error_stdlib.go +++ b/pkg/amqp/error_stdlib.go @@ -22,7 +22,6 @@ // SOFTWARE //go:build !pkgerrors -// +build !pkgerrors package amqp diff --git a/pkg/stream/environment_debug.go b/pkg/stream/environment_debug.go index 307a6611..49c49c6c 100644 --- a/pkg/stream/environment_debug.go +++ b/pkg/stream/environment_debug.go @@ -1,5 +1,4 @@ //go:build debug -// +build debug package stream diff --git a/pkg/stream/environment_test.go b/pkg/stream/environment_test.go index 1da50f7d..5ad03dcb 100644 --- a/pkg/stream/environment_test.go +++ b/pkg/stream/environment_test.go @@ -203,40 +203,42 @@ var _ = Describe("Environment test", func() { }) Describe("TCP Parameters", func() { + It("should accept custom TCP parameters", func() { + env, err := NewEnvironment(&EnvironmentOptions{ + ConnectionParameters: []*Broker{ + newBrokerDefault(), + }, + TCPParameters: &TCPParameters{ + tlsConfig: nil, + RequestedHeartbeat: defaultHeartbeat, + RequestedMaxFrameSize: defaultMaxFrameSize, + WriteBuffer: 100, + ReadBuffer: 200, + NoDelay: false, + }, + MaxProducersPerClient: 1, + MaxConsumersPerClient: 1, + AddressResolver: nil, + RPCTimeout: defaultSocketCallTimeout, + }) - env, err := NewEnvironment(&EnvironmentOptions{ - ConnectionParameters: []*Broker{ - newBrokerDefault(), - }, - TCPParameters: &TCPParameters{ - tlsConfig: nil, - RequestedHeartbeat: defaultHeartbeat, - RequestedMaxFrameSize: defaultMaxFrameSize, - WriteBuffer: 100, - ReadBuffer: 200, - NoDelay: false, - }, - MaxProducersPerClient: 1, - MaxConsumersPerClient: 1, - AddressResolver: nil, - RPCTimeout: defaultSocketCallTimeout, + Expect(err).NotTo(HaveOccurred()) + Expect(env.Close()).NotTo(HaveOccurred()) }) - - Expect(err).NotTo(HaveOccurred()) - Expect(env.Close()).NotTo(HaveOccurred()) - }) Describe("Environment Validations", func() { - _, err := NewEnvironment(NewEnvironmentOptions(). - SetMaxConsumersPerClient(0). - SetMaxProducersPerClient(0)) - Expect(err).To(HaveOccurred()) + It("should reject invalid max consumers/producers", func() { + _, err := NewEnvironment(NewEnvironmentOptions(). + SetMaxConsumersPerClient(0). + SetMaxProducersPerClient(0)) + Expect(err).To(HaveOccurred()) - _, err = NewEnvironment(NewEnvironmentOptions(). - SetMaxConsumersPerClient(500). - SetMaxProducersPerClient(500)) - Expect(err).To(HaveOccurred()) + _, err = NewEnvironment(NewEnvironmentOptions(). + SetMaxConsumersPerClient(500). + SetMaxProducersPerClient(500)) + Expect(err).To(HaveOccurred()) + }) It("Malformed URI", func() { _, err := NewEnvironment(NewEnvironmentOptions(). @@ -273,63 +275,66 @@ var _ = Describe("Environment test", func() { }) Describe("Validation Query Offset/Sequence", func() { + It("should return error for non-existent streams", func() { + env, err := NewEnvironment(NewEnvironmentOptions()) + Expect(err).NotTo(HaveOccurred()) + _, err = env.QuerySequence("my_prod", + "Stream_Doesnt_exist") + Expect(err).To(HaveOccurred()) - env, err := NewEnvironment(NewEnvironmentOptions()) - Expect(err).NotTo(HaveOccurred()) - _, err = env.QuerySequence("my_prod", - "Stream_Doesnt_exist") - Expect(err).To(HaveOccurred()) - - _, err = env.QueryOffset("my_cons", - "Stream_Doesnt_exist") - Expect(err).To(HaveOccurred()) - Expect(env.Close()).NotTo(HaveOccurred()) + _, err = env.QueryOffset("my_cons", + "Stream_Doesnt_exist") + Expect(err).To(HaveOccurred()) + Expect(env.Close()).NotTo(HaveOccurred()) + }) }) Describe("Stream Existing/Meta data", func() { - - env, err := NewEnvironment(NewEnvironmentOptions().SetPort(5552). - SetUser("guest"). - SetPassword("guest").SetHost("localhost")) - Expect(err).NotTo(HaveOccurred()) - stream := uuid.New().String() - err = env.DeclareStream(stream, nil) - Expect(err).NotTo(HaveOccurred()) - exists, err := env.StreamExists(stream) - Expect(err).NotTo(HaveOccurred()) - Expect(exists).To(Equal(true)) - metaData, err := env.StreamMetaData(stream) - Expect(err).NotTo(HaveOccurred()) - Expect(metaData.Leader.Host).To(Equal("localhost")) - Expect(metaData.Leader.Port).To(Equal("5552")) - Expect(len(metaData.Replicas)).To(Equal(0)) - Expect(env.DeleteStream(stream)).NotTo(HaveOccurred()) - exists, err = env.StreamExists(stream) - Expect(err).NotTo(HaveOccurred()) - Expect(exists).To(Equal(false)) - Expect(env.Close()).NotTo(HaveOccurred()) - + It("should check stream existence and metadata", func() { + env, err := NewEnvironment(NewEnvironmentOptions().SetPort(5552). + SetUser("guest"). + SetPassword("guest").SetHost("localhost")) + Expect(err).NotTo(HaveOccurred()) + stream := uuid.New().String() + err = env.DeclareStream(stream, nil) + Expect(err).NotTo(HaveOccurred()) + exists, err := env.StreamExists(stream) + Expect(err).NotTo(HaveOccurred()) + Expect(exists).To(Equal(true)) + metaData, err := env.StreamMetaData(stream) + Expect(err).NotTo(HaveOccurred()) + Expect(metaData.Leader.Host).To(Equal("localhost")) + Expect(metaData.Leader.Port).To(Equal("5552")) + Expect(len(metaData.Replicas)).To(Equal(0)) + Expect(env.DeleteStream(stream)).NotTo(HaveOccurred()) + exists, err = env.StreamExists(stream) + Expect(err).NotTo(HaveOccurred()) + Expect(exists).To(Equal(false)) + Expect(env.Close()).NotTo(HaveOccurred()) + }) }) Describe("Address Resolver", func() { - addressResolver := AddressResolver{ - Host: "localhost", - Port: 5552, - } - env, err := NewEnvironment( - NewEnvironmentOptions(). - SetHost(addressResolver.Host). - SetPort(addressResolver.Port). - SetAddressResolver(addressResolver). - SetMaxProducersPerClient(1)) - Expect(err).NotTo(HaveOccurred()) - streamName := uuid.New().String() - Expect(env.DeclareStream(streamName, nil)).NotTo(HaveOccurred()) - p, err := env.NewProducer(streamName, nil) - Expect(err).NotTo(HaveOccurred()) - Expect(p.Close()).NotTo(HaveOccurred()) - Expect(env.DeleteStream(streamName)).NotTo(HaveOccurred()) - Expect(env.Close()).NotTo(HaveOccurred()) + It("should connect using address resolver", func() { + addressResolver := AddressResolver{ + Host: "localhost", + Port: 5552, + } + env, err := NewEnvironment( + NewEnvironmentOptions(). + SetHost(addressResolver.Host). + SetPort(addressResolver.Port). + SetAddressResolver(addressResolver). + SetMaxProducersPerClient(1)) + Expect(err).NotTo(HaveOccurred()) + streamName := uuid.New().String() + Expect(env.DeclareStream(streamName, nil)).NotTo(HaveOccurred()) + p, err := env.NewProducer(streamName, nil) + Expect(err).NotTo(HaveOccurred()) + Expect(p.Close()).NotTo(HaveOccurred()) + Expect(env.DeleteStream(streamName)).NotTo(HaveOccurred()) + Expect(env.Close()).NotTo(HaveOccurred()) + }) }) It("Multi Uris/Multi Uris Fails", func() { @@ -477,44 +482,48 @@ var _ = Describe("Environment test", func() { }) Describe("Query Offset should return the value from Store Offset", func() { - env, err := NewEnvironment(NewEnvironmentOptions()) - Expect(err).NotTo(HaveOccurred()) - streamName := uuid.New().String() - Expect(env.DeclareStream(streamName, nil)).NotTo(HaveOccurred()) - const consumerName = "my_consumer" - Expect(env.StoreOffset(consumerName, streamName, 123)).NotTo(HaveOccurred()) - off, err := env.QueryOffset(consumerName, streamName) - Expect(err).NotTo(HaveOccurred()) - Expect(off).To(Equal(int64(123))) - Expect(env.DeleteStream(streamName)).NotTo(HaveOccurred()) - Expect(env.Close()).NotTo(HaveOccurred()) + It("should store and query offset", func() { + env, err := NewEnvironment(NewEnvironmentOptions()) + Expect(err).NotTo(HaveOccurred()) + streamName := uuid.New().String() + Expect(env.DeclareStream(streamName, nil)).NotTo(HaveOccurred()) + const consumerName = "my_consumer" + Expect(env.StoreOffset(consumerName, streamName, 123)).NotTo(HaveOccurred()) + off, err := env.QueryOffset(consumerName, streamName) + Expect(err).NotTo(HaveOccurred()) + Expect(off).To(Equal(int64(123))) + Expect(env.DeleteStream(streamName)).NotTo(HaveOccurred()) + Expect(env.Close()).NotTo(HaveOccurred()) + }) }) // PR:https://github.com/rabbitmq/rabbitmq-stream-go-client/pull/388 Describe("QueryOffset DeclareStream StoreOffset should reconnect the locator", func() { - env, err := NewEnvironment(NewEnvironmentOptions()) - Expect(err).NotTo(HaveOccurred()) - streamName := uuid.New().String() - // here we force the client closing - env.locator.client.Close() - Expect(env.DeclareStream(streamName, nil)).NotTo(HaveOccurred()) - Expect(env.locator.client.socket.isOpen()).To(BeTrue()) - const consumerName = "my_consumer_1" - // here we force the client closing - env.locator.client.Close() - Expect(env.StoreOffset(consumerName, streamName, 123)).NotTo(HaveOccurred()) - Expect(env.locator.client.socket.isOpen()).To(BeTrue()) - // here we force the client closing - env.locator.client.Close() - off, err := env.QueryOffset(consumerName, streamName) - Expect(err).NotTo(HaveOccurred()) - Expect(env.locator.client.socket.isOpen()).To(BeTrue()) - Expect(off).To(Equal(int64(123))) - // here we force the client closing - env.locator.client.Close() - Expect(env.DeleteStream(streamName)).NotTo(HaveOccurred()) - Expect(env.locator.client.socket.isOpen()).To(BeTrue()) - Expect(env.Close()).NotTo(HaveOccurred()) + It("should reconnect locator when needed", func() { + env, err := NewEnvironment(NewEnvironmentOptions()) + Expect(err).NotTo(HaveOccurred()) + streamName := uuid.New().String() + // here we force the client closing + env.locator.client.Close() + Expect(env.DeclareStream(streamName, nil)).NotTo(HaveOccurred()) + Expect(env.locator.client.socket.isOpen()).To(BeTrue()) + const consumerName = "my_consumer_1" + // here we force the client closing + env.locator.client.Close() + Expect(env.StoreOffset(consumerName, streamName, 123)).NotTo(HaveOccurred()) + Expect(env.locator.client.socket.isOpen()).To(BeTrue()) + // here we force the client closing + env.locator.client.Close() + off, err := env.QueryOffset(consumerName, streamName) + Expect(err).NotTo(HaveOccurred()) + Expect(env.locator.client.socket.isOpen()).To(BeTrue()) + Expect(off).To(Equal(int64(123))) + // here we force the client closing + env.locator.client.Close() + Expect(env.DeleteStream(streamName)).NotTo(HaveOccurred()) + Expect(env.locator.client.socket.isOpen()).To(BeTrue()) + Expect(env.Close()).NotTo(HaveOccurred()) + }) }) }) diff --git a/pkg/test-helper/matchers.go b/pkg/test-helper/matchers.go index fea447a7..5161c11c 100644 --- a/pkg/test-helper/matchers.go +++ b/pkg/test-helper/matchers.go @@ -18,7 +18,7 @@ func HaveMatchingData(expected string) types.GomegaMatcher { } } -func (matcher *MessageDataMatcher) Match(actual interface{}) (success bool, err error) { +func (matcher *MessageDataMatcher) Match(actual any) (success bool, err error) { msg, ok := actual.(*amqp.Message) if !ok { return false, fmt.Errorf("HaveMatchingData matcher expects a *amqp.Message") @@ -37,7 +37,7 @@ func (matcher *MessageDataMatcher) Match(actual interface{}) (success bool, err return bytes.Equal(actualData, []byte(matcher.ExpectedData)), nil } -func (matcher *MessageDataMatcher) FailureMessage(actual interface{}) (message string) { +func (matcher *MessageDataMatcher) FailureMessage(actual any) (message string) { msg := actual.(*amqp.Message) var actualData []byte for _, data := range msg.Data { @@ -46,7 +46,7 @@ func (matcher *MessageDataMatcher) FailureMessage(actual interface{}) (message s return fmt.Sprintf("Expected\n\t%#v\nto have data matching\n\t%#v\nbut it was\n\t%#v", actual, matcher.ExpectedData, string(actualData)) } -func (matcher *MessageDataMatcher) NegatedFailureMessage(actual interface{}) (message string) { +func (matcher *MessageDataMatcher) NegatedFailureMessage(actual any) (message string) { msg := actual.(*amqp.Message) var actualData []byte for _, data := range msg.Data {