Closing connection and/or channel hangs when NotifyPublish is used #21

Closed
samyonr opened this issue Oct 7, 2021 · 13 comments

Comments

@samyonr

samyonr commented Oct 7, 2021

I'm using NotifyPublish to get confirms.
At the end of the run, I'm trying to close the connection and the associated channel, but it hangs indefinitely.

Here's a test to reproduce the issue (reproduces 80-90% of the time):


func TestCloseHandBug(t *testing.T) {
	ctx := context.Background()
	testQueue := "test"

	c, err := amqp091.Dial(fmt.Sprintf("amqp://%s:%s@%s%s", Config.Username, Config.Password, Config.Hostname, Config.VHost))
	if err != nil {
		t.Fatalf("Error connecting to server: %s", err)
	}
	t.Log("connected")

	closeChan := make(chan struct{})

	// close connection
	defer func() {
		close(closeChan)
		if c != nil {
			t.Log("disconnecting")
			//TODO: program hangs here - removing c.close and adding
			//TODO: <-time.After(time.Second * 120) doesn't help. closing channel hangs
			if err := c.Close(); err != nil {
				t.Logf("disconnect error: %s", err)
				return
			}
		}

		c = nil
		t.Log("disconnected")
	}()

	ch, err := c.Channel()
	if err != nil {
		t.Fatalf("couldn't open channel: %s", err)
	}

	err = ch.Qos(
		16,    // prefetch count
		0,     // prefetch size
		false, // global
	)
	if err != nil {
		t.Fatalf("couldn't configure qos: %s", err)
	}

	// handle confirms TODO: without this, disconnection works
	confirms := ch.NotifyPublish(make(chan amqp091.Confirmation))
	go func() {
		for {
			select {
			case <-confirms:
				// do something here in real scenario
				continue
			case <-closeChan:
				return
			case <-ctx.Done():
				return
			}
		}
	}()

	err = ch.Confirm(false)
	if err != nil {
		t.Fatalf("couldn't configure confirm: %s", err)
	}

	ch.ExchangeDeclare("amq.topic", "topic", true, false, false, false, nil)

	// handle close channel TODO: removing this func doesn't solve the issue
	go func() {
		for {
			select {
			case <-closeChan:
				t.Logf("closing channel. id=%d",1)
				//TODO: program hangs here as well
				if err := ch.Close(); err != nil {
					t.Logf("closing channel err. id=%d. err=%d",1, err)
					return
				}
				t.Logf("channel closed. id=%d",1)
				return
			case <-ctx.Done():
				return
			}
		}
	}()

	_, err = ch.QueueDelete(testQueue, false, false, false)
	if err != nil {
		t.Fatalf("error deleting test queue: %s", err)
	}

	key := "subscribe.send"

	_, err = ch.QueueDeclare(testQueue, true, false, false, false, nil)
	if err != nil {
		t.Fatalf("error declaring queue: %s", err)
	}

	err = ch.QueueBind(testQueue, key, "amq.topic", false, nil)
	if err != nil {
		t.Fatalf("error binding queue: %s", err)
	}

	delivery, err := ch.Consume(testQueue, "", false, false, false, false, nil)
	if err != nil {
		t.Fatalf("error consuming: %s", err)
	}

	expected := "hello world!"

	t.Logf("sending. id=%d, data=%s",1, []byte(expected))

	m := amqp091.Publishing{
		ContentType:  "text/plain",
		Body:         []byte(expected),
		DeliveryMode: amqp091.Persistent,
	}

	err = ch.Publish("amq.topic", key, true, false, m)
	if err != nil {
		t.Fatalf("Error publishing: %s", err)
	}

	var actual string
	select {
	case message := <-delivery:
		message.Ack(false)
		actual = string(message.Body)
	case <-time.After(time.Second / 2):
		t.Fatalf("Timeout receiving message")
	}

	if actual != expected {
		t.Fatalf("Expected message %s, but got %s", expected, actual)
	}
}


Here is the result:
=== RUN TestCloseHandBug
provider_test.go:125: connected
provider_test.go:224: sending. id=1, data=hello world!
provider_test.go:133: disconnecting
provider_test.go:187: closing channel. id=1

  • One goroutine waits on l <- confirmation in func (c *confirms) confirm(confirmation Confirmation)
  • another goroutine waits on c.destructor.Do(func() { in func (c *Connection) shutdown(err *Error), originating in connection.close
  • and another goroutine waits in the same place: c.destructor.Do(func() { in func (c *Connection) shutdown(err *Error), originating in channel.close
  • another goroutine waits on c.m.Lock() in func (c *confirms) Close() error (originating in go c.shutdown(&Error{)

Notes:

  1. If I remove the "handle close channel" function, the issue still occurs.
  2. If I remove the c.Close() call in the "close connection" section and instead wait a long time before ending the test, I still never reach the "channel closed" log line. Closing the channel appears to hang indefinitely in this case as well.
  3. Sometimes it works fine (10-20% of the time):
    === RUN TestCloseHandBug
    provider_test.go:125: connected
    provider_test.go:225: sending. id=1, data=hello world!
    provider_test.go:133: disconnecting
    provider_test.go:187: closing channel. id=1
    provider_test.go:142: disconnected
    --- PASS: TestCloseHandBug (1.67s)
    PASS
@benmoss
Contributor

benmoss commented Dec 15, 2021

Could this be a duplicate of #10?

@samyonr
Author

samyonr commented Dec 16, 2021

Could be. Note that the above is related to closing connection/channel but the root cause might be the same.

@benmoss
Contributor

benmoss commented Dec 17, 2021

It still reproduces on main, but I am confused about what the test is trying to exhibit. For me it doesn't reproduce anywhere near as frequently as you said it does on your machine, but I was still able to get a deadlock and this stack dump from it.

I think there's a race condition in the test between closing the channel and closing the connection, so the test itself is non-deterministic. It would be helpful if the test was a little more concise and clear in what the condition for the deadlock is.

@samyonr
Author

samyonr commented Dec 19, 2021

The example can be modified to run simpler tests, in addition to the original one:
Variation 1: Remove the handle close channel function. The test still hangs.

	// handle close channel TODO: removing this func doesn't solve the issue
	/*
	go func() {
		for {
			select {
			case <-closeChan:
				t.Logf("closing channel. id=%d",1)
				//TODO: program hangs here as well
				if err := ch.Close(); err != nil {
					t.Logf("closing channel err. id=%d. err=%d",1, err)
					return
				}
				t.Logf("channel closed. id=%d",1)
				return
			case <-ctx.Done():
				return
			}
		}
	}()
	 */

Variation 2: In the "close connection" function, add <-time.After(time.Second * 120) but keep c.Close(). This worked fine for me most of the time today, but it still hung sometimes.

	// close connection
	defer func() {
		close(closeChan)
		if c != nil {
			t.Log("disconnecting")
			<-time.After(time.Second * 120) //doesn't help. closing channel hangs
			if err := c.Close(); err != nil {
				t.Logf("disconnect error: %s", err)
				return
			}
		}

		c = nil
		t.Log("disconnected")
	}()

@samyonr
Author

samyonr commented Dec 19, 2021

Here you can find dumps of the full test (dump0_full_test.log), variation 1 (dump1_no_close_channel_func.log), and variation 2 (dump2_120_second_before_conn_close.log).
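
For readers who want to capture a comparable dump from their own hanging test, here is a minimal sketch using only the Go standard library. It is not how the dumps above were produced (the thread does not say), and the helper names goroutineDump, countInState and demo are hypothetical. The state strings such as "chan send" are the ones the Go runtime prints in goroutine headers.

```go
package main

import (
	"fmt"
	"runtime"
	"strings"
	"time"
)

// goroutineDump returns the stack traces of all goroutines in the process.
func goroutineDump() string {
	buf := make([]byte, 1<<16)
	for {
		n := runtime.Stack(buf, true) // true: include every goroutine
		if n < len(buf) {
			return string(buf[:n])
		}
		// The dump may have been truncated; retry with a larger buffer.
		buf = make([]byte, 2*len(buf))
	}
}

// countInState counts goroutines whose header line mentions the given
// state, for example "chan send" or "chan receive".
func countInState(dump, state string) int {
	n := 0
	for _, line := range strings.Split(dump, "\n") {
		if strings.HasPrefix(line, "goroutine ") && strings.Contains(line, state) {
			n++
		}
	}
	return n
}

// demo starts a goroutine that blocks sending on an unbuffered channel
// and reports whether the dump shows it in the "chan send" state.
func demo() bool {
	ch := make(chan int)    // unbuffered, and nobody ever receives
	go func() { ch <- 1 }() // blocks forever

	// Poll briefly, because the goroutine needs a moment to reach the send.
	for i := 0; i < 100; i++ {
		if countInState(goroutineDump(), "chan send") > 0 {
			return true
		}
		time.Sleep(10 * time.Millisecond)
	}
	return false
}

func main() {
	fmt.Println("blocked sender visible in dump:", demo())
}
```

When a test is already hung, sending SIGQUIT to the process (or letting go test hit its -timeout) prints the same kind of dump without any code changes.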

@DanielePalaia
Contributor

Hi, can you confirm that the issue is still present? I tried several times to reproduce it but was never able to, either with the "handle close channel" goroutine active or without it.

Sometimes this panic occurs, which seems to be caused by the test itself:

panic: Log in goroutine after TestCloseHandBug has completed: closing channel err. id=1. err=&{504 %!d(string=channel/connection is not open) %!d(bool=false) %!d(bool=false)}

goroutine 5 [running]:
testing.(*common).logDepth(0xc000093ba0, {0xc00009e150, 0x70}, 0x3)
        /usr/local/Cellar/go/1.17.6/libexec/src/testing/testing.go:779 +0x4c9
testing.(*common).log(...)
        /usr/local/Cellar/go/1.17.6/libexec/src/testing/testing.go:761
testing.(*common).Logf(0xc000182480, {0x12ab98c, 0x0}, {0xc00020efb0, 0x0, 0x1})
        /usr/local/Cellar/go/1.17.6/libexec/src/testing/testing.go:807 +0x4c
github.com/rabbitmq/amqp091-go.TestCloseHandBug.func3()
        /Users/dpalaia/projects/amqp091-go/integration_test.go:1916 +0x166
created by github.com/rabbitmq/amqp091-go.TestCloseHandBug
        /Users/dpalaia/projects/amqp091-go/integration_test.go:1909 +0x43a
exit status 2
FAIL    github.com/rabbitmq/amqp091-go  0.234s
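
The panic above is raised when a goroutine logs through the test after the test function has returned. One common way to avoid it is to wait for the background goroutines before returning. The sketch below shows that idea with sync.WaitGroup. Since it has to run outside go test, it uses a hypothetical recorder type as a stand-in for *testing.T; runTest and the worker goroutines are likewise illustrative and not part of the original test.

```go
package main

import (
	"fmt"
	"sync"
)

// recorder is a hypothetical stand-in for *testing.T: it panics when
// Logf is called after the test has been marked as finished.
type recorder struct {
	mu       sync.Mutex
	finished bool
	lines    []string
}

func (r *recorder) Logf(format string, args ...interface{}) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.finished {
		panic("Log in goroutine after test has completed")
	}
	r.lines = append(r.lines, fmt.Sprintf(format, args...))
}

func (r *recorder) finish() {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.finished = true
}

// runTest starts background goroutines that log when closeChan is closed,
// and waits for all of them before the "test" is allowed to finish.
func runTest(workers int) int {
	r := &recorder{}
	closeChan := make(chan struct{})

	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			<-closeChan
			r.Logf("worker %d stopped", id)
		}(i)
	}

	close(closeChan)
	wg.Wait() // every Logf call has returned before the test finishes
	r.finish()
	return len(r.lines)
}

func main() {
	fmt.Println("log lines recorded:", runTest(3))
}
```

In a real test the same shape applies: call wg.Wait() (for example in the deferred cleanup) after signalling the goroutines to stop.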

@samyonr
Author

samyonr commented Mar 24, 2022

Yes, the exact same test halts most of the time on my machine. Is there any additional information that I can provide to help with it?

@samyonr
Author

samyonr commented Mar 31, 2022

Do you think this issue is also related to #59?

@DanielePalaia
Contributor

Hi @samyonr, I don't think so. The issue in #59 happened only when the connection terminated non-gracefully (for example, because of a network issue). In that case the library synchronously sent the error back to the caller through the Go channel returned by NotifyClose, and because nothing was consuming from that channel, it deadlocked. In your case you are terminating the connection gracefully, and you are not being notified through NotifyClose.
Actually, I wasn't able to reproduce this issue in my environment with that example.
Something similar may be happening elsewhere, but we need to investigate it in more detail.

@DanielePalaia
Contributor

DanielePalaia commented Apr 7, 2022

Hi @samyonr, I think I was able to reproduce this deadlock. It wasn't trivial for me: I had to look at the code and force it to happen.

As you said, the situation is indeed similar to #59, and it is due to the synchronous way the library manages channels.

When a goroutine closes the RabbitMQ channel, the channel's shutdown function needs to acquire the confirms mutex in order to close its confirms structure:

ch.confirms.Close()

https://github.com/rabbitmq/amqp091-go/blob/main/confirms.go#L104

But if a confirmation is being sent on the Go channel at the same time and no goroutine is consuming from it, the confirm call blocks here while holding the lock, which prevents the shutdown from continuing:
https://github.com/rabbitmq/amqp091-go/blob/main/confirms.go#L104
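
To make the interaction concrete, here is a small standard-library model of it. This is a sketch under assumptions, not the library's code: confirmsModel, its fields, and closeCompletes are hypothetical names, and the only behavior modeled is "send to the listener while holding a mutex that Close also needs". The locked parameter exists purely so the demo can order the two calls deterministically.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// confirmsModel is a hypothetical, simplified stand-in for the library's
// confirms structure: one mutex and one unbuffered listener channel.
type confirmsModel struct {
	m        sync.Mutex
	listener chan int
}

// confirm delivers a confirmation to the listener while holding the mutex.
func (c *confirmsModel) confirm(tag int, locked chan<- struct{}) {
	c.m.Lock()
	defer c.m.Unlock()
	close(locked)     // demo scaffolding: signal that the mutex is now held
	c.listener <- tag // blocks until some goroutine receives
}

// Close needs the same mutex, so it cannot run while confirm is blocked.
func (c *confirmsModel) Close() {
	c.m.Lock()
	defer c.m.Unlock()
	close(c.listener)
}

// closeCompletes reports whether Close returns within a short timeout,
// with or without a goroutine consuming from the listener.
func closeCompletes(consume bool) bool {
	c := &confirmsModel{listener: make(chan int)}

	locked := make(chan struct{})
	go c.confirm(1, locked)
	<-locked // confirm now holds the mutex and is about to send

	if consume {
		go func() {
			for range c.listener { // keep receiving until the channel is closed
			}
		}()
	}

	closed := make(chan struct{})
	go func() {
		c.Close()
		close(closed)
	}()

	select {
	case <-closed:
		return true
	case <-time.After(200 * time.Millisecond):
		return false // Close is still waiting for the mutex
	}
}

func main() {
	fmt.Println("Close completes with a consumer:   ", closeCompletes(true))
	fmt.Println("Close completes without a consumer:", closeCompletes(false))
}
```

In the model, Close only gets stuck when nothing receives the pending confirmation, which matches the goroutine states listed in the original report.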

I think this happens because of this goroutine:

	go func() {
		for {
			select {
			case <-confirms:
				// do something here in real scenario
				continue
			case <-closeChan:
				return
			case <-ctx.Done():
				return
			}
		}
	}()

It can happen that closeChan is signaled first and the goroutine returns, so the confirms are no longer consumed.

Could you try something similar to the example provided for #59 and see what happens?

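	go func() {
		// Work on local copies so that setting them to nil
		// does not affect the other goroutines in the test.
		confirms, closeChan := confirms, closeChan
		for {
			select {
			case _, ok := <-confirms:
				if ok {
					t.Logf("confirms received")
					// do something here in real scenario
				} else {
					// A nil channel is never selected again.
					t.Logf("confirms is closed")
					confirms = nil
				}
			case <-closeChan:
				t.Logf("channel closed")
				closeChan = nil
			}

			// Stop only once both channels have been closed.
			if confirms == nil && closeChan == nil {
				t.Logf("terminating loop")
				return
			}
		}
	}()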
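
The same pattern can be tried without a broker. The following is a minimal, self-contained sketch in which a plain goroutine plays the role of the library (an assumption made for illustration): it delivers a number of confirmations on an unbuffered channel and then closes it. The names drain and run are hypothetical.

```go
package main

import "fmt"

// drain keeps receiving from confirms until it is closed, even if done
// fires first, and returns the number of confirmations received.
func drain(confirms <-chan int, done <-chan struct{}) int {
	received := 0
	for confirms != nil || done != nil {
		select {
		case _, ok := <-confirms:
			if !ok {
				confirms = nil // closed: a nil channel is never selected again
				continue
			}
			received++
		case <-done:
			done = nil // note the shutdown request, but keep draining confirms
		}
	}
	return received
}

// run requests shutdown before any confirmation arrives and checks that
// the producer can still deliver everything and close its channel.
func run(n int) int {
	confirms := make(chan int) // unbuffered, like the channel in the test
	done := make(chan struct{})
	close(done)

	go func() {
		// Stand-in for the library: deliver n confirmations, then close.
		for i := 1; i <= n; i++ {
			confirms <- i
		}
		close(confirms)
	}()

	return drain(confirms, done)
}

func main() {
	fmt.Println("confirmations received after shutdown was requested:", run(5))
}
```

Because the receiver never leaves while confirms is open, the producer is never left blocked on a send, so whatever closes the channel can always make progress.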

@samyonr
Author

samyonr commented Apr 8, 2022

Hi @DanielePalaia, thanks for investigating it.

I've tried your example and it indeed fixes the issue. Also, the explanation makes much sense. Thanks.

For me, the main conclusion here is that when dealing with Go channels, it is better to keep receiving from them for as long as they are open (i.e. don't stop receiving because of external events such as a connection close). This matters when using this library and as a general approach. It is especially true for unbuffered channels, but relevant to buffered channels as well.
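
As a quick illustration of the last point, the sketch below counts how many sends a channel accepts when nobody is receiving. A buffer only postpones the moment the sender blocks; it does not remove it. The function name sendsBeforeBlocking is hypothetical, and the capacity of 16 is just an example value.

```go
package main

import "fmt"

// sendsBeforeBlocking returns how many values can be sent on a channel of
// the given capacity, with no receiver, before the next send would block.
func sendsBeforeBlocking(capacity int) int {
	ch := make(chan int, capacity)
	sent := 0
	for {
		select {
		case ch <- sent:
			sent++
		default:
			// The send cannot proceed: a plain `ch <- v` would block here.
			return sent
		}
	}
}

func main() {
	fmt.Println("unbuffered:", sendsBeforeBlocking(0))
	fmt.Println("buffered(16):", sendsBeforeBlocking(16))
}
```

With an unbuffered channel the very first send blocks, and with a buffered one the sender blocks as soon as the buffer is full, so a receiver that stops early can stall the sender in both cases.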

@DanielePalaia
Contributor

DanielePalaia commented Apr 9, 2022

@samyonr nice to hear that it solved the issue. Next week we will update the docs to mention this as a best practice, similarly to what we did for #59, and then we will close this issue. Thanks for raising it and helping with the investigation.

@DanielePalaia
Contributor

Hi, we extended the docs to include this case in #68. This will be available in the next release. Thanks!
