diff --git a/pkg/amqp/error_stdlib.go b/pkg/amqp/error_stdlib.go index 5805a29b..c8483f22 100644 --- a/pkg/amqp/error_stdlib.go +++ b/pkg/amqp/error_stdlib.go @@ -22,7 +22,6 @@ // SOFTWARE //go:build !pkgerrors -// +build !pkgerrors package amqp diff --git a/pkg/stream/environment_debug.go b/pkg/stream/environment_debug.go index 307a6611..49c49c6c 100644 --- a/pkg/stream/environment_debug.go +++ b/pkg/stream/environment_debug.go @@ -1,5 +1,4 @@ //go:build debug -// +build debug package stream diff --git a/pkg/stream/environment_test.go b/pkg/stream/environment_test.go index 1da50f7d..5ad03dcb 100644 --- a/pkg/stream/environment_test.go +++ b/pkg/stream/environment_test.go @@ -203,40 +203,42 @@ var _ = Describe("Environment test", func() { }) Describe("TCP Parameters", func() { + It("should accept custom TCP parameters", func() { + env, err := NewEnvironment(&EnvironmentOptions{ + ConnectionParameters: []*Broker{ + newBrokerDefault(), + }, + TCPParameters: &TCPParameters{ + tlsConfig: nil, + RequestedHeartbeat: defaultHeartbeat, + RequestedMaxFrameSize: defaultMaxFrameSize, + WriteBuffer: 100, + ReadBuffer: 200, + NoDelay: false, + }, + MaxProducersPerClient: 1, + MaxConsumersPerClient: 1, + AddressResolver: nil, + RPCTimeout: defaultSocketCallTimeout, + }) - env, err := NewEnvironment(&EnvironmentOptions{ - ConnectionParameters: []*Broker{ - newBrokerDefault(), - }, - TCPParameters: &TCPParameters{ - tlsConfig: nil, - RequestedHeartbeat: defaultHeartbeat, - RequestedMaxFrameSize: defaultMaxFrameSize, - WriteBuffer: 100, - ReadBuffer: 200, - NoDelay: false, - }, - MaxProducersPerClient: 1, - MaxConsumersPerClient: 1, - AddressResolver: nil, - RPCTimeout: defaultSocketCallTimeout, + Expect(err).NotTo(HaveOccurred()) + Expect(env.Close()).NotTo(HaveOccurred()) }) - - Expect(err).NotTo(HaveOccurred()) - Expect(env.Close()).NotTo(HaveOccurred()) - }) Describe("Environment Validations", func() { - _, err := NewEnvironment(NewEnvironmentOptions(). - SetMaxConsumersPerClient(0). - SetMaxProducersPerClient(0)) - Expect(err).To(HaveOccurred()) + It("should reject invalid max consumers/producers", func() { + _, err := NewEnvironment(NewEnvironmentOptions(). + SetMaxConsumersPerClient(0). + SetMaxProducersPerClient(0)) + Expect(err).To(HaveOccurred()) - _, err = NewEnvironment(NewEnvironmentOptions(). - SetMaxConsumersPerClient(500). - SetMaxProducersPerClient(500)) - Expect(err).To(HaveOccurred()) + _, err = NewEnvironment(NewEnvironmentOptions(). + SetMaxConsumersPerClient(500). + SetMaxProducersPerClient(500)) + Expect(err).To(HaveOccurred()) + }) It("Malformed URI", func() { _, err := NewEnvironment(NewEnvironmentOptions(). @@ -273,63 +275,66 @@ var _ = Describe("Environment test", func() { }) Describe("Validation Query Offset/Sequence", func() { + It("should return error for non-existent streams", func() { + env, err := NewEnvironment(NewEnvironmentOptions()) + Expect(err).NotTo(HaveOccurred()) + _, err = env.QuerySequence("my_prod", + "Stream_Doesnt_exist") + Expect(err).To(HaveOccurred()) - env, err := NewEnvironment(NewEnvironmentOptions()) - Expect(err).NotTo(HaveOccurred()) - _, err = env.QuerySequence("my_prod", - "Stream_Doesnt_exist") - Expect(err).To(HaveOccurred()) - - _, err = env.QueryOffset("my_cons", - "Stream_Doesnt_exist") - Expect(err).To(HaveOccurred()) - Expect(env.Close()).NotTo(HaveOccurred()) + _, err = env.QueryOffset("my_cons", + "Stream_Doesnt_exist") + Expect(err).To(HaveOccurred()) + Expect(env.Close()).NotTo(HaveOccurred()) + }) }) Describe("Stream Existing/Meta data", func() { - - env, err := NewEnvironment(NewEnvironmentOptions().SetPort(5552). - SetUser("guest"). - SetPassword("guest").SetHost("localhost")) - Expect(err).NotTo(HaveOccurred()) - stream := uuid.New().String() - err = env.DeclareStream(stream, nil) - Expect(err).NotTo(HaveOccurred()) - exists, err := env.StreamExists(stream) - Expect(err).NotTo(HaveOccurred()) - Expect(exists).To(Equal(true)) - metaData, err := env.StreamMetaData(stream) - Expect(err).NotTo(HaveOccurred()) - Expect(metaData.Leader.Host).To(Equal("localhost")) - Expect(metaData.Leader.Port).To(Equal("5552")) - Expect(len(metaData.Replicas)).To(Equal(0)) - Expect(env.DeleteStream(stream)).NotTo(HaveOccurred()) - exists, err = env.StreamExists(stream) - Expect(err).NotTo(HaveOccurred()) - Expect(exists).To(Equal(false)) - Expect(env.Close()).NotTo(HaveOccurred()) - + It("should check stream existence and metadata", func() { + env, err := NewEnvironment(NewEnvironmentOptions().SetPort(5552). + SetUser("guest"). + SetPassword("guest").SetHost("localhost")) + Expect(err).NotTo(HaveOccurred()) + stream := uuid.New().String() + err = env.DeclareStream(stream, nil) + Expect(err).NotTo(HaveOccurred()) + exists, err := env.StreamExists(stream) + Expect(err).NotTo(HaveOccurred()) + Expect(exists).To(Equal(true)) + metaData, err := env.StreamMetaData(stream) + Expect(err).NotTo(HaveOccurred()) + Expect(metaData.Leader.Host).To(Equal("localhost")) + Expect(metaData.Leader.Port).To(Equal("5552")) + Expect(len(metaData.Replicas)).To(Equal(0)) + Expect(env.DeleteStream(stream)).NotTo(HaveOccurred()) + exists, err = env.StreamExists(stream) + Expect(err).NotTo(HaveOccurred()) + Expect(exists).To(Equal(false)) + Expect(env.Close()).NotTo(HaveOccurred()) + }) }) Describe("Address Resolver", func() { - addressResolver := AddressResolver{ - Host: "localhost", - Port: 5552, - } - env, err := NewEnvironment( - NewEnvironmentOptions(). - SetHost(addressResolver.Host). - SetPort(addressResolver.Port). - SetAddressResolver(addressResolver). - SetMaxProducersPerClient(1)) - Expect(err).NotTo(HaveOccurred()) - streamName := uuid.New().String() - Expect(env.DeclareStream(streamName, nil)).NotTo(HaveOccurred()) - p, err := env.NewProducer(streamName, nil) - Expect(err).NotTo(HaveOccurred()) - Expect(p.Close()).NotTo(HaveOccurred()) - Expect(env.DeleteStream(streamName)).NotTo(HaveOccurred()) - Expect(env.Close()).NotTo(HaveOccurred()) + It("should connect using address resolver", func() { + addressResolver := AddressResolver{ + Host: "localhost", + Port: 5552, + } + env, err := NewEnvironment( + NewEnvironmentOptions(). + SetHost(addressResolver.Host). + SetPort(addressResolver.Port). + SetAddressResolver(addressResolver). + SetMaxProducersPerClient(1)) + Expect(err).NotTo(HaveOccurred()) + streamName := uuid.New().String() + Expect(env.DeclareStream(streamName, nil)).NotTo(HaveOccurred()) + p, err := env.NewProducer(streamName, nil) + Expect(err).NotTo(HaveOccurred()) + Expect(p.Close()).NotTo(HaveOccurred()) + Expect(env.DeleteStream(streamName)).NotTo(HaveOccurred()) + Expect(env.Close()).NotTo(HaveOccurred()) + }) }) It("Multi Uris/Multi Uris Fails", func() { @@ -477,44 +482,48 @@ var _ = Describe("Environment test", func() { }) Describe("Query Offset should return the value from Store Offset", func() { - env, err := NewEnvironment(NewEnvironmentOptions()) - Expect(err).NotTo(HaveOccurred()) - streamName := uuid.New().String() - Expect(env.DeclareStream(streamName, nil)).NotTo(HaveOccurred()) - const consumerName = "my_consumer" - Expect(env.StoreOffset(consumerName, streamName, 123)).NotTo(HaveOccurred()) - off, err := env.QueryOffset(consumerName, streamName) - Expect(err).NotTo(HaveOccurred()) - Expect(off).To(Equal(int64(123))) - Expect(env.DeleteStream(streamName)).NotTo(HaveOccurred()) - Expect(env.Close()).NotTo(HaveOccurred()) + It("should store and query offset", func() { + env, err := NewEnvironment(NewEnvironmentOptions()) + Expect(err).NotTo(HaveOccurred()) + streamName := uuid.New().String() + Expect(env.DeclareStream(streamName, nil)).NotTo(HaveOccurred()) + const consumerName = "my_consumer" + Expect(env.StoreOffset(consumerName, streamName, 123)).NotTo(HaveOccurred()) + off, err := env.QueryOffset(consumerName, streamName) + Expect(err).NotTo(HaveOccurred()) + Expect(off).To(Equal(int64(123))) + Expect(env.DeleteStream(streamName)).NotTo(HaveOccurred()) + Expect(env.Close()).NotTo(HaveOccurred()) + }) }) // PR:https://github.com/rabbitmq/rabbitmq-stream-go-client/pull/388 Describe("QueryOffset DeclareStream StoreOffset should reconnect the locator", func() { - env, err := NewEnvironment(NewEnvironmentOptions()) - Expect(err).NotTo(HaveOccurred()) - streamName := uuid.New().String() - // here we force the client closing - env.locator.client.Close() - Expect(env.DeclareStream(streamName, nil)).NotTo(HaveOccurred()) - Expect(env.locator.client.socket.isOpen()).To(BeTrue()) - const consumerName = "my_consumer_1" - // here we force the client closing - env.locator.client.Close() - Expect(env.StoreOffset(consumerName, streamName, 123)).NotTo(HaveOccurred()) - Expect(env.locator.client.socket.isOpen()).To(BeTrue()) - // here we force the client closing - env.locator.client.Close() - off, err := env.QueryOffset(consumerName, streamName) - Expect(err).NotTo(HaveOccurred()) - Expect(env.locator.client.socket.isOpen()).To(BeTrue()) - Expect(off).To(Equal(int64(123))) - // here we force the client closing - env.locator.client.Close() - Expect(env.DeleteStream(streamName)).NotTo(HaveOccurred()) - Expect(env.locator.client.socket.isOpen()).To(BeTrue()) - Expect(env.Close()).NotTo(HaveOccurred()) + It("should reconnect locator when needed", func() { + env, err := NewEnvironment(NewEnvironmentOptions()) + Expect(err).NotTo(HaveOccurred()) + streamName := uuid.New().String() + // here we force the client closing + env.locator.client.Close() + Expect(env.DeclareStream(streamName, nil)).NotTo(HaveOccurred()) + Expect(env.locator.client.socket.isOpen()).To(BeTrue()) + const consumerName = "my_consumer_1" + // here we force the client closing + env.locator.client.Close() + Expect(env.StoreOffset(consumerName, streamName, 123)).NotTo(HaveOccurred()) + Expect(env.locator.client.socket.isOpen()).To(BeTrue()) + // here we force the client closing + env.locator.client.Close() + off, err := env.QueryOffset(consumerName, streamName) + Expect(err).NotTo(HaveOccurred()) + Expect(env.locator.client.socket.isOpen()).To(BeTrue()) + Expect(off).To(Equal(int64(123))) + // here we force the client closing + env.locator.client.Close() + Expect(env.DeleteStream(streamName)).NotTo(HaveOccurred()) + Expect(env.locator.client.socket.isOpen()).To(BeTrue()) + Expect(env.Close()).NotTo(HaveOccurred()) + }) }) }) diff --git a/pkg/test-helper/matchers.go b/pkg/test-helper/matchers.go index fea447a7..5161c11c 100644 --- a/pkg/test-helper/matchers.go +++ b/pkg/test-helper/matchers.go @@ -18,7 +18,7 @@ func HaveMatchingData(expected string) types.GomegaMatcher { } } -func (matcher *MessageDataMatcher) Match(actual interface{}) (success bool, err error) { +func (matcher *MessageDataMatcher) Match(actual any) (success bool, err error) { msg, ok := actual.(*amqp.Message) if !ok { return false, fmt.Errorf("HaveMatchingData matcher expects a *amqp.Message") @@ -37,7 +37,7 @@ func (matcher *MessageDataMatcher) Match(actual interface{}) (success bool, err return bytes.Equal(actualData, []byte(matcher.ExpectedData)), nil } -func (matcher *MessageDataMatcher) FailureMessage(actual interface{}) (message string) { +func (matcher *MessageDataMatcher) FailureMessage(actual any) (message string) { msg := actual.(*amqp.Message) var actualData []byte for _, data := range msg.Data { @@ -46,7 +46,7 @@ func (matcher *MessageDataMatcher) FailureMessage(actual interface{}) (message s return fmt.Sprintf("Expected\n\t%#v\nto have data matching\n\t%#v\nbut it was\n\t%#v", actual, matcher.ExpectedData, string(actualData)) } -func (matcher *MessageDataMatcher) NegatedFailureMessage(actual interface{}) (message string) { +func (matcher *MessageDataMatcher) NegatedFailureMessage(actual any) (message string) { msg := actual.(*amqp.Message) var actualData []byte for _, data := range msg.Data {