Skip to content

Commit

Permalink
Merge pull request wagslane#35 from wagslane/lw_notify
Browse files Browse the repository at this point in the history
notify return safety
  • Loading branch information
wagslane authored Oct 1, 2021
2 parents d351101 + 7136c3d commit bedf1db
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 20 deletions.
3 changes: 2 additions & 1 deletion examples/logger/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func (c *customLogger) Printf(fmt string, args ...interface{}) {
func main() {
mylogger := &customLogger{}

publisher, returns, err := rabbitmq.NewPublisher(
publisher, err := rabbitmq.NewPublisher(
"amqp://guest:guest@localhost", amqp.Config{},
rabbitmq.WithPublisherOptionsLogger(mylogger),
)
Expand All @@ -37,6 +37,7 @@ func main() {
log.Fatal(err)
}

returns := publisher.NotifyReturn()
go func() {
for r := range returns {
log.Printf("message returned from server: %s", string(r.Body))
Expand Down
3 changes: 2 additions & 1 deletion examples/publisher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
)

func main() {
publisher, returns, err := rabbitmq.NewPublisher(
publisher, err := rabbitmq.NewPublisher(
"amqp://guest:guest@localhost", amqp.Config{},
rabbitmq.WithPublisherOptionsLogging,
)
Expand All @@ -27,6 +27,7 @@ func main() {
log.Fatal(err)
}

returns := publisher.NotifyReturn()
go func() {
for r := range returns {
log.Printf("message returned from server: %s", string(r.Body))
Expand Down
45 changes: 27 additions & 18 deletions publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func WithPublisherOptionsLogger(log Logger) func(options *PublisherOptions) {
// on the channel of Returns that you should setup a listener on.
// Flow controls are automatically handled as they are sent from the server, and publishing
// will fail with an error when the server is requesting a slowdown
func NewPublisher(url string, config amqp.Config, optionFuncs ...func(*PublisherOptions)) (Publisher, <-chan Return, error) {
func NewPublisher(url string, config amqp.Config, optionFuncs ...func(*PublisherOptions)) (Publisher, error) {
options := &PublisherOptions{}
for _, optionFunc := range optionFuncs {
optionFunc(options)
Expand All @@ -145,26 +145,39 @@ func NewPublisher(url string, config amqp.Config, optionFuncs ...func(*Publisher

chManager, err := newChannelManager(url, config, options.Logger)
if err != nil {
return Publisher{}, nil, err
return Publisher{}, err
}

publisher := Publisher{
chManager: chManager,
notifyReturnChan: make(chan Return, 1),
disablePublishDueToFlow: false,
disablePublishDueToFlowMux: &sync.RWMutex{},
logger: options.Logger,
notifyReturnChan: nil,
}

go publisher.startNotifyFlowHandler()

// restart notifiers when cancel/close is triggered
go func() {
publisher.startNotifyHandlers()
for err := range publisher.chManager.notifyCancelOrClose {
publisher.logger.Printf("publish cancel/close handler triggered. err: %v", err)
publisher.startNotifyHandlers()
go publisher.startNotifyFlowHandler()
if publisher.notifyReturnChan != nil {
go publisher.startNotifyReturnHandler()
}
}
}()

return publisher, publisher.notifyReturnChan, nil
return publisher, nil
}

// NotifyReturn registers a listener for basic.return methods.
// These can be sent from the server when a publish is undeliverable either from the mandatory or immediate flags.
func (publisher *Publisher) NotifyReturn() <-chan Return {
publisher.notifyReturnChan = make(chan Return)
go publisher.startNotifyReturnHandler()
return publisher.notifyReturnChan
}

// Publish publishes the provided data to the given routing keys over the connection
Expand Down Expand Up @@ -217,19 +230,8 @@ func (publisher Publisher) StopPublishing() {
publisher.chManager.connection.Close()
}

func (publisher *Publisher) startNotifyHandlers() {
returnAMQPChan := publisher.chManager.channel.NotifyReturn(make(chan amqp.Return, 1))
go func() {
for ret := range returnAMQPChan {
publisher.notifyReturnChan <- Return{ret}
}
}()

func (publisher *Publisher) startNotifyFlowHandler() {
notifyFlowChan := publisher.chManager.channel.NotifyFlow(make(chan bool))
go publisher.startNotifyFlowHandler(notifyFlowChan)
}

func (publisher *Publisher) startNotifyFlowHandler(notifyFlowChan chan bool) {
publisher.disablePublishDueToFlowMux.Lock()
publisher.disablePublishDueToFlow = false
publisher.disablePublishDueToFlowMux.Unlock()
Expand All @@ -248,3 +250,10 @@ func (publisher *Publisher) startNotifyFlowHandler(notifyFlowChan chan bool) {
publisher.disablePublishDueToFlowMux.Unlock()
}
}

func (publisher *Publisher) startNotifyReturnHandler() {
returnAMQPCh := publisher.chManager.channel.NotifyReturn(make(chan amqp.Return, 1))
for ret := range returnAMQPCh {
publisher.notifyReturnChan <- Return{ret}
}
}

0 comments on commit bedf1db

Please sign in to comment.