Drain() causes an infinite loop in Next() if it's waiting for messages #1524

Closed
mdawar opened this issue Jan 14, 2024 · 1 comment
Labels
defect Suspected defect such as a bug or regression

Comments

@mdawar
Contributor

mdawar commented Jan 14, 2024

Observed behavior

Calling Drain() causes an infinite loop in pullSubscription.Next() if Next() is already blocked waiting for messages.

The code causing the issue:

// Code in pullSubscription.Next()
for {
	s.checkPending()
	select {
	case <-s.done:
		drainMode := atomic.LoadUint32(&s.draining) == 1
		if drainMode {
			// In drain mode this causes an infinite loop.
			// The Drain() method closes the s.done channel.
			continue
		}
		return nil, ErrMsgIteratorClosed
	case msg, ok := <-s.msgs:
		if !ok {
			// if msgs channel is closed, it means that subscription was either drained or stopped
			delete(s.consumer.subscriptions, s.id)
			atomic.CompareAndSwapUint32(&s.draining, 1, 0)
			return nil, ErrMsgIteratorClosed
		}
		// ...
	}
}

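The issue is that Drain() closes the s.done channel, and a receive from a closed channel is always ready. The <-s.done case is therefore selected on every iteration, and in drain mode it hits continue and loops again immediately. With no messages arriving, Next() spins in a busy loop instead of returning.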

This issue is related to #1343.
The check for the s.done channel was added in #1344.

Possible solution

Closing the s.done channel is required so that pullMessages() exits and calls cleanup() to unsubscribe.

One option is to change how Next() exits: remove the check for s.done being closed and rely on s.msgs being closed (which happens in the subscription's close handler) to exit the loop.

However, with the current code this might cause a deadlock.

Next() holds the lock while it waits. When Drain() or Stop() is called, it closes the s.done channel, which stops pulling messages and triggers cleanup().

cleanup() then blocks waiting for that same lock, so it cannot unsubscribe. Because unsubscribing is what closes the s.msgs channel, Next() would wait on s.msgs forever while cleanup() waits on the lock: a deadlock.

The existing drain test does not hit this issue because Next() never blocks waiting for messages there: it is called once per published message, so the lock is acquired and released repeatedly and cleanup() gets a chance to run.

Expected behavior

Calling Drain() should make a blocked Next() return the ErrMsgIteratorClosed error.

Server and client version

nats-server: v2.10.9
nats.go: v1.32.0

Host environment

OS: Linux
Arch: amd64
Go: v1.21.6

Steps to reproduce

  1. Create a messages context:
iter, err := cons.Messages()
if err != nil {
	log.Fatal(err)
}
  2. Call Next() in a goroutine so it blocks waiting for messages:
var wg sync.WaitGroup

wg.Add(1)
go func() {
	defer wg.Done()

	for {
		msg, err := iter.Next()

		if err != nil {
			if errors.Is(err, jetstream.ErrMsgIteratorClosed) {
				break
			}

			fmt.Println("Unexpected error:", err)
			continue
		}

		msg.Ack()
	}
}()
  3. Call Drain() to unsubscribe:
iter.Drain()

Minimal Reproducible Example

git clone git@github.com:mdawar/jetstream-api-demos.git
cd jetstream-api-demos/drain
go test -race -v
mdawar added the defect label (Suspected defect such as a bug or regression) on Jan 14, 2024
@Jarema
Member

Jarema commented Jan 14, 2024

Thank you for this extensive issue report!
Reports like this are very valuable to us and help us address issues more quickly.
