Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion pkg/amqp/error_stdlib.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
// SOFTWARE

//go:build !pkgerrors
// +build !pkgerrors

package amqp

Expand Down
1 change: 0 additions & 1 deletion pkg/stream/environment_debug.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
//go:build debug
// +build debug

package stream

Expand Down
233 changes: 121 additions & 112 deletions pkg/stream/environment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,40 +203,42 @@ var _ = Describe("Environment test", func() {
})

Describe("TCP Parameters", func() {
It("should accept custom TCP parameters", func() {
env, err := NewEnvironment(&EnvironmentOptions{
ConnectionParameters: []*Broker{
newBrokerDefault(),
},
TCPParameters: &TCPParameters{
tlsConfig: nil,
RequestedHeartbeat: defaultHeartbeat,
RequestedMaxFrameSize: defaultMaxFrameSize,
WriteBuffer: 100,
ReadBuffer: 200,
NoDelay: false,
},
MaxProducersPerClient: 1,
MaxConsumersPerClient: 1,
AddressResolver: nil,
RPCTimeout: defaultSocketCallTimeout,
})

env, err := NewEnvironment(&EnvironmentOptions{
ConnectionParameters: []*Broker{
newBrokerDefault(),
},
TCPParameters: &TCPParameters{
tlsConfig: nil,
RequestedHeartbeat: defaultHeartbeat,
RequestedMaxFrameSize: defaultMaxFrameSize,
WriteBuffer: 100,
ReadBuffer: 200,
NoDelay: false,
},
MaxProducersPerClient: 1,
MaxConsumersPerClient: 1,
AddressResolver: nil,
RPCTimeout: defaultSocketCallTimeout,
Expect(err).NotTo(HaveOccurred())
Expect(env.Close()).NotTo(HaveOccurred())
})

Expect(err).NotTo(HaveOccurred())
Expect(env.Close()).NotTo(HaveOccurred())

})

Describe("Environment Validations", func() {
_, err := NewEnvironment(NewEnvironmentOptions().
SetMaxConsumersPerClient(0).
SetMaxProducersPerClient(0))
Expect(err).To(HaveOccurred())
It("should reject invalid max consumers/producers", func() {
_, err := NewEnvironment(NewEnvironmentOptions().
SetMaxConsumersPerClient(0).
SetMaxProducersPerClient(0))
Expect(err).To(HaveOccurred())

_, err = NewEnvironment(NewEnvironmentOptions().
SetMaxConsumersPerClient(500).
SetMaxProducersPerClient(500))
Expect(err).To(HaveOccurred())
_, err = NewEnvironment(NewEnvironmentOptions().
SetMaxConsumersPerClient(500).
SetMaxProducersPerClient(500))
Expect(err).To(HaveOccurred())
})

It("Malformed URI", func() {
_, err := NewEnvironment(NewEnvironmentOptions().
Expand Down Expand Up @@ -273,63 +275,66 @@ var _ = Describe("Environment test", func() {
})

Describe("Validation Query Offset/Sequence", func() {
It("should return error for non-existent streams", func() {
env, err := NewEnvironment(NewEnvironmentOptions())
Expect(err).NotTo(HaveOccurred())
_, err = env.QuerySequence("my_prod",
"Stream_Doesnt_exist")
Expect(err).To(HaveOccurred())

env, err := NewEnvironment(NewEnvironmentOptions())
Expect(err).NotTo(HaveOccurred())
_, err = env.QuerySequence("my_prod",
"Stream_Doesnt_exist")
Expect(err).To(HaveOccurred())

_, err = env.QueryOffset("my_cons",
"Stream_Doesnt_exist")
Expect(err).To(HaveOccurred())
Expect(env.Close()).NotTo(HaveOccurred())
_, err = env.QueryOffset("my_cons",
"Stream_Doesnt_exist")
Expect(err).To(HaveOccurred())
Expect(env.Close()).NotTo(HaveOccurred())
})
})

Describe("Stream Existing/Meta data", func() {

env, err := NewEnvironment(NewEnvironmentOptions().SetPort(5552).
SetUser("guest").
SetPassword("guest").SetHost("localhost"))
Expect(err).NotTo(HaveOccurred())
stream := uuid.New().String()
err = env.DeclareStream(stream, nil)
Expect(err).NotTo(HaveOccurred())
exists, err := env.StreamExists(stream)
Expect(err).NotTo(HaveOccurred())
Expect(exists).To(Equal(true))
metaData, err := env.StreamMetaData(stream)
Expect(err).NotTo(HaveOccurred())
Expect(metaData.Leader.Host).To(Equal("localhost"))
Expect(metaData.Leader.Port).To(Equal("5552"))
Expect(len(metaData.Replicas)).To(Equal(0))
Expect(env.DeleteStream(stream)).NotTo(HaveOccurred())
exists, err = env.StreamExists(stream)
Expect(err).NotTo(HaveOccurred())
Expect(exists).To(Equal(false))
Expect(env.Close()).NotTo(HaveOccurred())

It("should check stream existence and metadata", func() {
env, err := NewEnvironment(NewEnvironmentOptions().SetPort(5552).
SetUser("guest").
SetPassword("guest").SetHost("localhost"))
Expect(err).NotTo(HaveOccurred())
stream := uuid.New().String()
err = env.DeclareStream(stream, nil)
Expect(err).NotTo(HaveOccurred())
exists, err := env.StreamExists(stream)
Expect(err).NotTo(HaveOccurred())
Expect(exists).To(Equal(true))
metaData, err := env.StreamMetaData(stream)
Expect(err).NotTo(HaveOccurred())
Expect(metaData.Leader.Host).To(Equal("localhost"))
Expect(metaData.Leader.Port).To(Equal("5552"))
Expect(len(metaData.Replicas)).To(Equal(0))
Expect(env.DeleteStream(stream)).NotTo(HaveOccurred())
exists, err = env.StreamExists(stream)
Expect(err).NotTo(HaveOccurred())
Expect(exists).To(Equal(false))
Expect(env.Close()).NotTo(HaveOccurred())
})
})

Describe("Address Resolver", func() {
addressResolver := AddressResolver{
Host: "localhost",
Port: 5552,
}
env, err := NewEnvironment(
NewEnvironmentOptions().
SetHost(addressResolver.Host).
SetPort(addressResolver.Port).
SetAddressResolver(addressResolver).
SetMaxProducersPerClient(1))
Expect(err).NotTo(HaveOccurred())
streamName := uuid.New().String()
Expect(env.DeclareStream(streamName, nil)).NotTo(HaveOccurred())
p, err := env.NewProducer(streamName, nil)
Expect(err).NotTo(HaveOccurred())
Expect(p.Close()).NotTo(HaveOccurred())
Expect(env.DeleteStream(streamName)).NotTo(HaveOccurred())
Expect(env.Close()).NotTo(HaveOccurred())
It("should connect using address resolver", func() {
addressResolver := AddressResolver{
Host: "localhost",
Port: 5552,
}
env, err := NewEnvironment(
NewEnvironmentOptions().
SetHost(addressResolver.Host).
SetPort(addressResolver.Port).
SetAddressResolver(addressResolver).
SetMaxProducersPerClient(1))
Expect(err).NotTo(HaveOccurred())
streamName := uuid.New().String()
Expect(env.DeclareStream(streamName, nil)).NotTo(HaveOccurred())
p, err := env.NewProducer(streamName, nil)
Expect(err).NotTo(HaveOccurred())
Expect(p.Close()).NotTo(HaveOccurred())
Expect(env.DeleteStream(streamName)).NotTo(HaveOccurred())
Expect(env.Close()).NotTo(HaveOccurred())
})
})

It("Multi Uris/Multi Uris Fails", func() {
Expand Down Expand Up @@ -477,44 +482,48 @@ var _ = Describe("Environment test", func() {
})

Describe("Query Offset should return the value from Store Offset", func() {
env, err := NewEnvironment(NewEnvironmentOptions())
Expect(err).NotTo(HaveOccurred())
streamName := uuid.New().String()
Expect(env.DeclareStream(streamName, nil)).NotTo(HaveOccurred())
const consumerName = "my_consumer"
Expect(env.StoreOffset(consumerName, streamName, 123)).NotTo(HaveOccurred())
off, err := env.QueryOffset(consumerName, streamName)
Expect(err).NotTo(HaveOccurred())
Expect(off).To(Equal(int64(123)))
Expect(env.DeleteStream(streamName)).NotTo(HaveOccurred())
Expect(env.Close()).NotTo(HaveOccurred())
It("should store and query offset", func() {
env, err := NewEnvironment(NewEnvironmentOptions())
Expect(err).NotTo(HaveOccurred())
streamName := uuid.New().String()
Expect(env.DeclareStream(streamName, nil)).NotTo(HaveOccurred())
const consumerName = "my_consumer"
Expect(env.StoreOffset(consumerName, streamName, 123)).NotTo(HaveOccurred())
off, err := env.QueryOffset(consumerName, streamName)
Expect(err).NotTo(HaveOccurred())
Expect(off).To(Equal(int64(123)))
Expect(env.DeleteStream(streamName)).NotTo(HaveOccurred())
Expect(env.Close()).NotTo(HaveOccurred())
})
})

// PR:https://github.com/rabbitmq/rabbitmq-stream-go-client/pull/388
Describe("QueryOffset DeclareStream StoreOffset should reconnect the locator", func() {
env, err := NewEnvironment(NewEnvironmentOptions())
Expect(err).NotTo(HaveOccurred())
streamName := uuid.New().String()
// here we force the client closing
env.locator.client.Close()
Expect(env.DeclareStream(streamName, nil)).NotTo(HaveOccurred())
Expect(env.locator.client.socket.isOpen()).To(BeTrue())
const consumerName = "my_consumer_1"
// here we force the client closing
env.locator.client.Close()
Expect(env.StoreOffset(consumerName, streamName, 123)).NotTo(HaveOccurred())
Expect(env.locator.client.socket.isOpen()).To(BeTrue())
// here we force the client closing
env.locator.client.Close()
off, err := env.QueryOffset(consumerName, streamName)
Expect(err).NotTo(HaveOccurred())
Expect(env.locator.client.socket.isOpen()).To(BeTrue())
Expect(off).To(Equal(int64(123)))
// here we force the client closing
env.locator.client.Close()
Expect(env.DeleteStream(streamName)).NotTo(HaveOccurred())
Expect(env.locator.client.socket.isOpen()).To(BeTrue())
Expect(env.Close()).NotTo(HaveOccurred())
It("should reconnect locator when needed", func() {
env, err := NewEnvironment(NewEnvironmentOptions())
Expect(err).NotTo(HaveOccurred())
streamName := uuid.New().String()
// here we force the client closing
env.locator.client.Close()
Expect(env.DeclareStream(streamName, nil)).NotTo(HaveOccurred())
Expect(env.locator.client.socket.isOpen()).To(BeTrue())
const consumerName = "my_consumer_1"
// here we force the client closing
env.locator.client.Close()
Expect(env.StoreOffset(consumerName, streamName, 123)).NotTo(HaveOccurred())
Expect(env.locator.client.socket.isOpen()).To(BeTrue())
// here we force the client closing
env.locator.client.Close()
off, err := env.QueryOffset(consumerName, streamName)
Expect(err).NotTo(HaveOccurred())
Expect(env.locator.client.socket.isOpen()).To(BeTrue())
Expect(off).To(Equal(int64(123)))
// here we force the client closing
env.locator.client.Close()
Expect(env.DeleteStream(streamName)).NotTo(HaveOccurred())
Expect(env.locator.client.socket.isOpen()).To(BeTrue())
Expect(env.Close()).NotTo(HaveOccurred())
})
})

})
6 changes: 3 additions & 3 deletions pkg/test-helper/matchers.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func HaveMatchingData(expected string) types.GomegaMatcher {
}
}

func (matcher *MessageDataMatcher) Match(actual interface{}) (success bool, err error) {
func (matcher *MessageDataMatcher) Match(actual any) (success bool, err error) {
msg, ok := actual.(*amqp.Message)
if !ok {
return false, fmt.Errorf("HaveMatchingData matcher expects a *amqp.Message")
Expand All @@ -37,7 +37,7 @@ func (matcher *MessageDataMatcher) Match(actual interface{}) (success bool, err
return bytes.Equal(actualData, []byte(matcher.ExpectedData)), nil
}

func (matcher *MessageDataMatcher) FailureMessage(actual interface{}) (message string) {
func (matcher *MessageDataMatcher) FailureMessage(actual any) (message string) {
msg := actual.(*amqp.Message)
var actualData []byte
for _, data := range msg.Data {
Expand All @@ -46,7 +46,7 @@ func (matcher *MessageDataMatcher) FailureMessage(actual interface{}) (message s
return fmt.Sprintf("Expected\n\t%#v\nto have data matching\n\t%#v\nbut it was\n\t%#v", actual, matcher.ExpectedData, string(actualData))
}

func (matcher *MessageDataMatcher) NegatedFailureMessage(actual interface{}) (message string) {
func (matcher *MessageDataMatcher) NegatedFailureMessage(actual any) (message string) {
msg := actual.(*amqp.Message)
var actualData []byte
for _, data := range msg.Data {
Expand Down
Loading