Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Random blocking with provided confirmed publishing example #589

Open
programmador opened this issue Jan 9, 2024 · 0 comments
Open

Random blocking with provided confirmed publishing example #589

programmador opened this issue Jan 9, 2024 · 0 comments

Comments

@programmador
Copy link

The provided code "as is" never unblocks due to some strange issue. Maybe it's related to #179 though probably not.

So, this is the code which never unblocks when TIMES is anything except 1:

    connectionFactory.newConnection().use { connection ->
        connection.confirmChannel {
            declareQueue(QueueSpecification(PUBLISHER_QUEUE_NAME)).queue
            publish {
                (1..TIMES)
                    .map { println(it); return@map }
                    .map { createMessage("message № $it") }
                    .map { async(Dispatchers.IO) { publishWithConfirm(it) } }
                    .awaitAll()
                    .forEach { println(it) }
            }
            println("after")
        }
    }

This code either blocks or not blocks randomly in my docker environment when TIMES is equal to 5:

    connectionFactory.newConnection().use { connection ->
        connection.confirmChannel {
            declareQueue(QueueSpecification(PUBLISHER_QUEUE_NAME)).queue
            publish {
                async(Dispatchers.IO) { publishWithConfirm(createMessage("a message n1")) }.await()
                async(Dispatchers.IO) { publishWithConfirm(createMessage("a message n2")) }.await()
                (1..TIMES)
                    .map { println(it); return@map }
                    .map { createMessage("message № $it") }
                    .map { async(Dispatchers.IO) { publishWithConfirm(it) } }
                    .awaitAll()
                    .forEach { println(it) }
            }
            println("after")
        }
    }

As You can see, I send two messages synchronously, then five more asynchronously.
When TIMES equals to 4 it never blocks for me, when it's more than 5 - it always blocks.

In all of the cases messages are always published successfully even when TIMES is 1000. It seems like problems are with receiving confirms. Seems like receiving a confirm (which occurs in random time) breaks sending the next message.

Removing Dispatchers.IO argument makes it work as it's intended: send messages and then unblock. As I understand it makes code single-threaded regardless of being asynchronous.

BTW when are confirms awaited? After publish {} or after each publishWithConfirm(message)? Confirms are generally intended to be used between bunches of messages, not after each one. It's not prohibited but could be not so fast comparing to transactions.

Moreover both confirms and transactions are quite subtle things in AMQP in duplex-related scenarios. For example handling heartbeats while either a transaction is open or publisher confirm is being awaited leads to a connection failure.

Currently it's not clear whether the behaviour is actually a bug (though random result is most likely a bug) so the questions are:

  • Is it only me who noticed that this publisher code is never unblocked and thus never closes connection (You can see it in RabbitMQ log)? If yes - what am I doing wrong?
  • When are confirms actually awaited? One confirms per whole publish {} block or one confirm per message?
  • Is this issue really duplex related or maybe just confirm-timeout related? BTW how can I set this timeout?
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant