NotifyPublish ReconnectCount - how is it intended to be used? #161

Closed
jamoflaw opened this issue May 9, 2024 · 2 comments

jamoflaw commented May 9, 2024

Similar to #66, I am trying to enable publisher confirms using the NotifyPublish handler. I manually keep track of the DeliveryTag by incrementing a counter each time I successfully publish.

However, there doesn't seem to be any mechanism for the publisher to know that a reconnect has happened, so this manual tracking of DeliveryTags falls over when the channel's DeliveryTag sequence silently resets on reconnect.

For example:

	// Publisher func
	go func() {

		publisher.NotifyPublish(func(p rabbitmq.Confirmation) {
			publisher_confirms <- p
		})

		// Initialise a delivery counter to keep track of the DeliveryTags assigned to the messages
		deliveryCounter := messagestore.DeliveryTag(1)
		for msg := range messages {

			err := publisher.PublishWithContext(
				context.Background(),
				msg.Body,
				[]string{msg.RoutingKey},

				rabbitmq.WithPublishOptionsExchange("ingress"),
				rabbitmq.WithPublishOptionsContentType(msg.ContentType),
				rabbitmq.WithPublishOptionsContentEncoding(msg.ContentEncoding),
				rabbitmq.WithPublishOptionsAppID(msg.AppId),
				rabbitmq.WithPublishOptionsPersistentDelivery,
			)
			if err != nil {
				logger.Errorf("error publishing message, %s", err)
				continue
			}

			store.AppendMessage(deliveryCounter, msg)
			deliveryCounter++

		}
	}()

	// Confirms func
	go func() {

		for {
			select {
			case confirm := <-publisher_confirms:
				// Try to confirm
				if confirm.Ack {
					// Whatever I do here, I can't work out the delivery counter: the
					// reconnection count isn't related to what the delivery counter
					// was _before_ it was reset.
					store.Ack(messagestore.DeliveryTag(confirm.DeliveryTag + uint64(confirm.ReconnectionCount)))
				} else {
					store.Nack(messagestore.DeliveryTag(confirm.DeliveryTag+uint64(confirm.ReconnectionCount)), true)
				}
			}
		}
	}()
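
To make the mismatch concrete, here is a minimal, library-free sketch of the arithmetic. The `confirmation` type is only a stand-in for the two fields of `rabbitmq.Confirmation` used above, and `simulate` and `naiveKey` are hypothetical helpers. It assumes `ReconnectionCount` is 1 after the first reconnect and that the new channel numbers its confirms from 1 again.

```go
package main

// confirmation is a stand-in for the two fields of rabbitmq.Confirmation
// used in the snippet above; it is not the library type.
type confirmation struct {
	DeliveryTag       uint64
	ReconnectionCount int
}

// naiveKey reproduces the lookup attempted in the confirms goroutine.
func naiveKey(c confirmation) uint64 {
	return c.DeliveryTag + uint64(c.ReconnectionCount)
}

// simulate publishes `before` messages on the first channel, reconnects once,
// then publishes one more message. It returns the key the manual counter
// stored for that last message and the confirmation assumed to arrive for it.
func simulate(before uint64) (storedKey uint64, c confirmation) {
	manualCounter := uint64(1) // the publisher never resets this
	for i := uint64(0); i < before; i++ {
		manualCounter++ // one successful publish on the first channel
	}
	// After the reconnect the publisher still uses its own counter as the
	// key, while the new channel tags the same message with 1.
	storedKey = manualCounter
	c = confirmation{DeliveryTag: 1, ReconnectionCount: 1}
	return storedKey, c
}
```

With five messages published before the reconnect, `simulate(5)` stores the next message under key 6 while `naiveKey` computes 2. The missing offset (5) is the number of messages published on the old channel, which the confirmation does not carry.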

Am I missing something about how ReconnectionCount is supposed to be used? Is there an internal delivery tag that the publisher object could return instead, so that I can track it?

jamoflaw commented May 10, 2024

A potentially naïve fix (assuming I'm not missing how it's supposed to be used) would be to expose the GetNextPublishSeqNo() method from the underlying channel.

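In channel_manager.go:

// GetNextPublishSeqNo returns the delivery tag the underlying channel
// will assign to the next published message.
func (chanManager *ChannelManager) GetNextPublishSeqNo() uint64 {
	return chanManager.channel.GetNextPublishSeqNo()
}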

In publish.go:

// GetNextPublishSeqNo exposes the channel manager's next delivery tag to
// callers of the Publisher.
func (publisher *Publisher) GetNextPublishSeqNo() uint64 {
	publisher.disablePublishDueToBlockedMux.RLock()
	defer publisher.disablePublishDueToBlockedMux.RUnlock()
	return publisher.chanManager.GetNextPublishSeqNo()
}

However, I'm not 100% certain about the thread safety of the lib. Grabbing the next delivery tag sequence number and then publishing are two separate steps, which opens the possibility of a race condition if the publisher were shared between two goroutines (is that even possible / supported?).

I've tested this on a simple publisher / consumer app, and it now works when the producer is contained within a single goroutine (showing changes from the original code):

	// Publisher func
	go func() {

		publisher.NotifyPublish(func(p rabbitmq.Confirmation) {
			publisher_confirms <- p
		})
		publisher.NotifyReturn(func(r rabbitmq.Return) {
			publisher_returns <- &r
		})

		// Ask the publisher for the next delivery tag before each publish instead of keeping a manual counter
		for msg := range messages {

			deliveryTag := messagestore.DeliveryTag(publisher.GetNextPublishSeqNo())
			err := publisher.PublishWithContext(
				context.Background(),
				msg.Body,
				[]string{msg.RoutingKey},

				rabbitmq.WithPublishOptionsExchange("ingress"),
				rabbitmq.WithPublishOptionsContentType(msg.ContentType),
				rabbitmq.WithPublishOptionsContentEncoding(msg.ContentEncoding),
				rabbitmq.WithPublishOptionsAppID(msg.AppId),
				rabbitmq.WithPublishOptionsPersistentDelivery,
			)
			if err != nil {
				logger.Errorf("error publishing message, %s", err)
				continue
			}

			store.AppendMessage(deliveryTag, msg)
		}
	}()

	// Confirms func
	go func() {

		for {
			select {
			case confirm := <-publisher_confirms:
				// Try to confirm
				if confirm.Ack {
					logger.Infof("Attempting to ack DeliveryTag: %d and ReconnectionCount: %d", confirm.DeliveryTag, confirm.ReconnectionCount)
					store.Ack(messagestore.DeliveryTag(confirm.DeliveryTag))
				} else {
					logger.Infof("Attempting to nack DeliveryTag: %d and ReconnectionCount: %d", confirm.DeliveryTag, confirm.ReconnectionCount)
					store.Nack(messagestore.DeliveryTag(confirm.DeliveryTag), true)
				}

			case <-publisher_returns:
				// Returned messages are received and ignored here.
			}
		}
	}()

@wagslane
Owner

The reconnect count should let you know when a reconnect happens. Does that not help you know when the counter is reset?
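
A minimal sketch of what that detection might look like on the confirms side follows. It assumes `ReconnectionCount` starts at 0 and only ever increases. `confirmation` is a stand-in for the library's `Confirmation` fields, and `resetDetector` is a hypothetical helper.

```go
package main

// confirmation is a stand-in for the two fields of rabbitmq.Confirmation
// discussed in this thread; it is not the library type.
type confirmation struct {
	DeliveryTag       uint64
	ReconnectionCount int
}

// resetDetector remembers the highest ReconnectionCount seen so far.
type resetDetector struct {
	lastCount int
}

// Observe reports whether c is the first confirmation seen from a newer
// connection, meaning the delivery tag sequence has restarted since the
// previous confirmation.
func (d *resetDetector) Observe(c confirmation) bool {
	if c.ReconnectionCount > d.lastCount {
		d.lastCount = c.ReconnectionCount
		return true
	}
	return false
}
```

Note that this only signals the reset once the first confirmation from the new channel arrives. By then the publishing goroutine may already have assigned keys to messages, which is the gap the original question describes.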
