diff --git a/_examples/producer/producer.go b/_examples/producer/producer.go index c92c6fa..2d91cc2 100644 --- a/_examples/producer/producer.go +++ b/_examples/producer/producer.go @@ -4,7 +4,6 @@ package main import ( - "context" "flag" amqp "github.com/rabbitmq/amqp091-go" "log" @@ -43,7 +42,7 @@ func main() { startConfirmHandler(publishOkCh, confirmsCh, confirmsDoneCh, exitCh) - publish(context.Background(), publishOkCh, confirmsCh, confirmsDoneCh, exitCh) + publish(publishOkCh, confirmsCh, confirmsDoneCh, exitCh) } func setupCloseHandler(exitCh chan struct{}) { @@ -56,7 +55,7 @@ func setupCloseHandler(exitCh chan struct{}) { }() } -func publish(ctx context.Context, publishOkCh <-chan struct{}, confirmsCh chan<- *amqp.DeferredConfirmation, confirmsDoneCh <-chan struct{}, exitCh chan struct{}) { +func publish(publishOkCh <-chan struct{}, confirmsCh chan<- *amqp.DeferredConfirmation, confirmsDoneCh <-chan struct{}, exitCh chan struct{}) { config := amqp.Config{ Vhost: "/", Properties: amqp.NewConnectionProperties(), @@ -140,8 +139,7 @@ func publish(ctx context.Context, publishOkCh <-chan struct{}, confirmsCh chan<- } Log.Printf("producer: publishing %dB body (%q)", len(*body), *body) - dConfirmation, err := channel.PublishWithDeferredConfirmWithContext( - ctx, + dConfirmation, err := channel.PublishWithDeferredConfirm( *exchange, *routingKey, true, diff --git a/_examples/pubsub/pubsub.go b/_examples/pubsub/pubsub.go index 1c8ebc7..1d67823 100644 --- a/_examples/pubsub/pubsub.go +++ b/_examples/pubsub/pubsub.go @@ -12,12 +12,10 @@ import ( "crypto/sha1" "flag" "fmt" + amqp "github.com/rabbitmq/amqp091-go" "io" "log" "os" - "time" - - amqp "github.com/rabbitmq/amqp091-go" ) var url = flag.String("url", "amqp:///", "AMQP url for both the publisher and subscriber") @@ -88,9 +86,6 @@ func redial(ctx context.Context, url string) chan chan session { // publish publishes messages to a reconnecting session to a fanout exchange. // It receives from the application specific source of messages. func publish(sessions chan chan session, messages <-chan message) { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - pending := make(chan message, 1) for session := range sessions { @@ -128,7 +123,7 @@ func publish(sessions chan chan session, messages <-chan message) { case body = <-pending: routingKey := "ignored for fanout exchanges, application dependent for other exchanges" - err := pub.PublishWithContext(ctx, exchange, routingKey, false, false, amqp.Publishing{ + err := pub.Publish(exchange, routingKey, false, false, amqp.Publishing{ Body: body, }) // Retry failed delivery on the next session diff --git a/channel.go b/channel.go index 96ebd0b..3dfd7fa 100644 --- a/channel.go +++ b/channel.go @@ -7,7 +7,6 @@ package amqp091 import ( "context" - "errors" "reflect" "sync" "sync/atomic" @@ -1484,17 +1483,17 @@ confirmations start at 1. Exit when all publishings are confirmed. When Publish does not return an error and the channel is in confirm mode, the internal counter for DeliveryTags with the first confirmation starts at 1. - -Deprecated: Use PublishWithContext instead. */ func (ch *Channel) Publish(exchange, key string, mandatory, immediate bool, msg Publishing) error { - _, err := ch.PublishWithDeferredConfirmWithContext(context.Background(), exchange, key, mandatory, immediate, msg) + _, err := ch.PublishWithDeferredConfirm(exchange, key, mandatory, immediate, msg) return err } /* PublishWithContext sends a Publishing from the client to an exchange on the server. +NOTE: this function is equivalent to [Channel.Publish]. Context is not honoured. + When you want a single message to be delivered to a single queue, you can publish to the default exchange with the routingKey of the queue name. This is because every declared queue gets an implicit route to the default exchange. @@ -1524,34 +1523,17 @@ confirmations start at 1. Exit when all publishings are confirmed. When Publish does not return an error and the channel is in confirm mode, the internal counter for DeliveryTags with the first confirmation starts at 1. */ -func (ch *Channel) PublishWithContext(ctx context.Context, exchange, key string, mandatory, immediate bool, msg Publishing) error { - _, err := ch.PublishWithDeferredConfirmWithContext(ctx, exchange, key, mandatory, immediate, msg) - return err +func (ch *Channel) PublishWithContext(_ context.Context, exchange, key string, mandatory, immediate bool, msg Publishing) error { + return ch.Publish(exchange, key, mandatory, immediate, msg) } /* -PublishWithDeferredConfirm behaves identically to Publish but additionally returns a -DeferredConfirmation, allowing the caller to wait on the publisher confirmation -for this message. If the channel has not been put into confirm mode, -the DeferredConfirmation will be nil. - -Deprecated: Use PublishWithDeferredConfirmWithContext instead. +PublishWithDeferredConfirm behaves identically to Publish, but additionally +returns a DeferredConfirmation, allowing the caller to wait on the publisher +confirmation for this message. If the channel has not been put into confirm +mode, the DeferredConfirmation will be nil. */ func (ch *Channel) PublishWithDeferredConfirm(exchange, key string, mandatory, immediate bool, msg Publishing) (*DeferredConfirmation, error) { - return ch.PublishWithDeferredConfirmWithContext(context.Background(), exchange, key, mandatory, immediate, msg) -} - -/* -PublishWithDeferredConfirmWithContext behaves identically to Publish but additionally returns a -DeferredConfirmation, allowing the caller to wait on the publisher confirmation -for this message. If the channel has not been put into confirm mode, -the DeferredConfirmation will be nil. -*/ -func (ch *Channel) PublishWithDeferredConfirmWithContext(ctx context.Context, exchange, key string, mandatory, immediate bool, msg Publishing) (*DeferredConfirmation, error) { - if ctx == nil { - return nil, errors.New("amqp091-go: nil Context") - } - if err := msg.Headers.Validate(); err != nil { return nil, err } @@ -1595,6 +1577,19 @@ func (ch *Channel) PublishWithDeferredConfirmWithContext(ctx context.Context, ex return dc, nil } +/* +PublishWithDeferredConfirmWithContext behaves identically to Publish but additionally returns a +DeferredConfirmation, allowing the caller to wait on the publisher confirmation +for this message. If the channel has not been put into confirm mode, +the DeferredConfirmation will be nil. + +NOTE: PublishWithDeferredConfirmWithContext is equivalent to its non-context variant. The context passed +to this function is not honoured. +*/ +func (ch *Channel) PublishWithDeferredConfirmWithContext(_ context.Context, exchange, key string, mandatory, immediate bool, msg Publishing) (*DeferredConfirmation, error) { + return ch.PublishWithDeferredConfirm(exchange, key, mandatory, immediate, msg) +} + /* Get synchronously receives a single Delivery from the head of a queue from the server to the client. In almost all cases, using Channel.Consume will be diff --git a/doc.go b/doc.go index 8cb0b64..461173f 100644 --- a/doc.go +++ b/doc.go @@ -95,12 +95,11 @@ prior to calling [Channel.PublishWithContext] or [Channel.Consume]. When Dial encounters an amqps:// scheme, it will use the zero value of a tls.Config. This will only perform server certificate and host verification. -Use DialTLS when you wish to provide a client certificate (recommended), -include a private certificate authority's certificate in the cert chain for -server validity, or run insecure by not verifying the server certificate dial -your own connection. DialTLS will use the provided tls.Config when it -encounters an amqps:// scheme and will dial a plain connection when it -encounters an amqp:// scheme. +Use DialTLS when you wish to provide a client certificate (recommended), include +a private certificate authority's certificate in the cert chain for server +validity, or run insecure by not verifying the server certificate. DialTLS will +use the provided tls.Config when it encounters an amqps:// scheme and will dial +a plain connection when it encounters an amqp:// scheme. SSL/TLS in RabbitMQ is documented here: http://www.rabbitmq.com/ssl.html @@ -110,17 +109,18 @@ In order to be notified when a connection or channel gets closed, both structures offer the possibility to register channels using [Channel.NotifyClose] and [Connection.NotifyClose] functions: - notifyConnCloseCh := conn.NotifyClose(make(chan *amqp.Error)) + notifyConnCloseCh := conn.NotifyClose(make(chan *amqp.Error, 1)) No errors will be sent in case of a graceful connection close. In case of a non-graceful closure due to e.g. network issue, or forced connection closure from the Management UI, the error will be notified synchronously by the library. -The error is sent synchronously to the channel, so that the flow will wait until -the receiver consumes from the channel. To avoid deadlocks in the library, it is -necessary to consume from the channels. This could be done inside a -different goroutine with a select listening on the two channels inside a for -loop like: +The library sends to notification channels just once. After sending a +notification to all channels, the library closes all registered notification +channels. After receiving a notification, the application should create and +register a new channel. To avoid deadlocks in the library, it is necessary to +consume from the channels. This could be done inside a different goroutine with +a select listening on the two channels inside a for loop like: go func() { for notifyConnClose != nil || notifyChanClose != nil { @@ -141,13 +141,8 @@ loop like: } }() -Another approach is to use buffered channels: - - notifyConnCloseCh := conn.NotifyClose(make(chan *amqp.Error, 1)) - -The library sends to notification channels just once. After sending a notification -to all channels, the library closes all registered notification channels. After -receiving a notification, the application should create and register a new channel. +It is strongly recommended to use buffered channels to avoid deadlocks inside +the library. # Best practises for NotifyPublish notifications: