diff --git a/README.md b/README.md index 5af74f1..8a1bea3 100644 --- a/README.md +++ b/README.md @@ -317,7 +317,17 @@ q, err := rmqc.GetShovel("/", "a.shovel") // => ShovelInfo, err // declares a shovel -shovelDetails := rabbithole.ShovelDefinition{SourceURI: "amqp://sourceURI", SourceQueue: "mySourceQueue", DestinationURI: "amqp://destinationURI", DestinationQueue: "myDestQueue", AddForwardHeaders: true, AckMode: "on-confirm", DeleteAfter: "never"} +shovelDetails := rabbithole.ShovelDefinition{ + SourceURI: "amqp://sourceURI", + SourceProtocol: "amqp091", + SourceQueue: "mySourceQueue", + DestinationURI: "amqp://destinationURI", + DestinationProtocol: "amqp10", + DestinationAddress: "myDestQueue", + DestinationAddForwardHeaders: true, + AckMode: "on-confirm", + SrcDeleteAfter: "never", +} resp, err := rmqc.DeclareShovel("/", "a.shovel", shovelDetails) // => *http.Response, err diff --git a/rabbithole_test.go b/rabbithole_test.go index 245a438..02384bd 100644 --- a/rabbithole_test.go +++ b/rabbithole_test.go @@ -2309,6 +2309,63 @@ var _ = Describe("Rabbithole", func() { }) }) + Context("PUT /parameters/shovel/{vhost}/{name}", func() { + It("declares a shovel using AMQP 1.0 protocol", func() { + vh := "rabbit/hole" + sn := "temporary" + + ssu := "amqp://127.0.0.1/%2f" + sdu := "amqp://127.0.0.1/%2f" + + shovelDefinition := ShovelDefinition{ + SourceURI: ssu, + SourceAddress: "mySourceQueue", + SourceProtocol: "amqp10", + DestinationURI: sdu, + DestinationProtocol: "amqp10", + DestinationAddress: "myDestQueue", + DestinationAddForwardHeaders: true, + DestinationAddTimestampHeader: true, + AckMode: "on-confirm", + SourcePrefetchCount: 42, + SourceDeleteAfter: "never"} + + _, err := rmqc.DeclareShovel(vh, sn, shovelDefinition) + Ω(err).Should(BeNil()) + + awaitEventPropagation() + x, err := rmqc.GetShovel(vh, sn) + Ω(err).Should(BeNil()) + Ω(x.Name).Should(Equal(sn)) + Ω(x.Vhost).Should(Equal(vh)) + Ω(x.Component).Should(Equal("shovel")) + Ω(x.Definition.SourceAddress).Should(Equal("mySourceQueue")) + Ω(x.Definition.SourceURI).Should(Equal(ssu)) + Ω(x.Definition.SourcePrefetchCount).Should(Equal(42)) + Ω(x.Definition.SourceProtocol).Should(Equal("amqp10")) + Ω(x.Definition.DestinationAddress).Should(Equal("myDestQueue")) + Ω(x.Definition.DestinationURI).Should(Equal(sdu)) + Ω(x.Definition.DestinationProtocol).Should(Equal("amqp10")) + Ω(x.Definition.DestinationAddForwardHeaders).Should(Equal(true)) + Ω(x.Definition.DestinationAddTimestampHeader).Should(Equal(true)) + Ω(x.Definition.AckMode).Should(Equal("on-confirm")) + Ω(x.Definition.SourceDeleteAfter).Should(Equal("never")) + + _, err = rmqc.DeleteShovel(vh, sn) + Ω(err).Should(BeNil()) + awaitEventPropagation() + + _, err = rmqc.DeleteQueue("/", "mySourceQueue") + Ω(err).Should(BeNil()) + _, err = rmqc.DeleteQueue("/", "myDestQueue") + Ω(err).Should(BeNil()) + + x, err = rmqc.GetShovel(vh, sn) + Ω(x).Should(BeNil()) + Ω(err).Should(Equal(ErrorResponse{404, "Object Not Found", "Not Found"})) + }) + }) + Context("PUT /parameters/shovel/{vhost}/{name}", func() { It("declares a shovel", func() { vh := "rabbit/hole" diff --git a/shovels.go b/shovels.go index 8da78ae..6b2a400 100644 --- a/shovels.go +++ b/shovels.go @@ -20,19 +20,30 @@ type ShovelInfo struct { // ShovelDefinition contains the details of the shovel configuration type ShovelDefinition struct { - SourceURI string `json:"src-uri"` - SourceExchange string `json:"src-exchange,omitempty"` - SourceExchangeKey string `json:"src-exchange-key,omitempty"` - SourceQueue string `json:"src-queue,omitempty"` - DestinationURI string `json:"dest-uri"` - DestinationExchange string `json:"dest-exchange,omitempty"` - DestinationExchangeKey string `json:"dest-exchange-key,omitempty"` - DestinationQueue string `json:"dest-queue,omitempty"` - PrefetchCount int `json:"prefetch-count,omitempty"` - ReconnectDelay int `json:"reconnect-delay,omitempty"` - AddForwardHeaders bool `json:"add-forward-headers"` - AckMode string `json:"ack-mode"` - DeleteAfter string `json:"delete-after"` + AckMode string `json:"ack-mode,omitempty"` + AddForwardHeaders bool `json:"add-forward-headers,omitempty"` + DeleteAfter string `json:"delete-after,omitempty"` + DestinationAddForwardHeaders bool `json:"dest-add-forward-headers,omitempty"` + DestinationAddTimestampHeader bool `json:"dest-add-timestamp-header,omitempty"` + DestinationAddress string `json:"dest-address,omitempty"` + DestinationApplicationProperties string `json:"dest-application-properties,omitempty"` + DestinationExchange string `json:"dest-exchange,omitempty"` + DestinationExchangeKey string `json:"dest-exchange-key,omitempty"` + DestinationProperties string `json:"dest-properties,omitempty"` + DestinationProtocol string `json:"dest-protocol,omitempty"` + DestinationPublishProperties string `json:"dest-publish-properties,omitempty"` + DestinationQueue string `json:"dest-queue,omitempty"` + DestinationURI string `json:"dest-uri"` + PrefetchCount int `json:"prefetch-count,omitempty"` + ReconnectDelay int `json:"reconnect-delay,omitempty"` + SourceAddress string `json:"src-address,omitempty"` + SourceDeleteAfter string `json:"src-delete-after,omitempty"` + SourceExchange string `json:"src-exchange,omitempty"` + SourceExchangeKey string `json:"src-exchange-key,omitempty"` + SourcePrefetchCount int `json:"src-prefetch-count,omitempty"` + SourceProtocol string `json:"src-protocol,omitempty"` + SourceQueue string `json:"src-queue,omitempty"` + SourceURI string `json:"src-uri"` } // ShovelDefinitionDTO provides a data transfer object