Unsafe creation of consumers #416

Closed
wookievx opened this issue Oct 13, 2020 · 2 comments

Comments

@wookievx
Contributor

Issue

As the title says, consumer creation in the current version of the library is thread-pool unsafe: worker threads can all get stuck in blocking IO when multiple consumers are opened in parallel. For example, consider how the auto-ack consumer is created (this is where I observed the issue):

override def createConsumer[A](
      queueName: QueueName,
      channel: AMQPChannel,
      basicQos: BasicQos,
      autoAck: Boolean = false,
      noLocal: Boolean = false,
      exclusive: Boolean = false,
      consumerTag: ConsumerTag = ConsumerTag(""),
      args: Arguments = Map.empty
  )(implicit decoder: EnvelopeDecoder[F, A]): F[Stream[F, AmqpEnvelope[A]]] = {

    val setup = for {
      internalQ <- internalQueue.create
      internals = AMQPInternals[F](Some(internalQ))
      _         <- consume.basicQos(channel, basicQos)
      consumerTag <- consume.basicConsume(
                      channel,
                      queueName,
                      autoAck,
                      consumerTag,
                      noLocal,
                      exclusive,
                      args
                    )(internals)
    } yield (consumerTag, internalQ)

    Stream
      .bracket(setup) {
        case (tag, _) =>
          consume.basicCancel(channel, tag)
      }
      .flatMap {
        case (_, queue) =>
          queue.dequeue.rethrow
            .evalMap(env => decoder(env).map(a => env.copy(payload = a)))
      }
      .pure[F]
  }

basicQos, basicConsume and basicCancel all block on IO, so if multiple streams are created at the same time they can occupy every worker thread and leave none free for other work.
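
To make the failure mode concrete, here is a minimal sketch of the starvation using only the JDK executors, with no fs2-rabbit or cats-effect involved. The names StarvationSketch and quickTaskRan are made up for this illustration, and the latch merely stands in for a broker reply that a blocking call is waiting on:

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}

object StarvationSketch {
  // Returns true if a small non-blocking task got to run while the
  // blocking tasks were still parked on their threads.
  def quickTaskRan(poolSize: Int, blockingTasks: Int): Boolean = {
    val pool    = Executors.newFixedThreadPool(poolSize)
    val release = new CountDownLatch(1) // stands in for the broker's reply
    val ran     = new CountDownLatch(1)
    try {
      // Each task parks its thread until released, like a blocking basicConsume.
      (1 to blockingTasks).foreach { _ =>
        pool.execute(() => release.await())
      }
      // An unrelated piece of work scheduled on the same pool.
      pool.execute(() => ran.countDown())
      // Wait briefly to see whether the unrelated work was able to run.
      ran.await(500, TimeUnit.MILLISECONDS)
    } finally {
      release.countDown() // unblock the parked tasks so the pool can shut down
      pool.shutdown()
    }
  }
}
```

With a pool of two threads, two blocking tasks are enough to keep the unrelated task from running until they are released, while a single blocking task leaves one thread free. The same thing can happen on a bounded compute pool when several consumers are set up in parallel.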

Fix

This should be relatively easy to fix. In the snippet above, we should probably run the consumer creation code on a blocker. Alternatively, each IO call could be wrapped in the blocker individually; that may be less performant because of the extra context switching, but it is probably cleaner, so it is a trade-off:

case class WrapperConsumingProgram[F[_]: Effect: ContextShift] private (
    internalQueue: InternalQueue[F],
    consume: Consume[F],
    blocker: Blocker // cats-effect 2 Blocker takes no type parameter
) extends ConsumingProgram[F] {
  override def createConsumer[A](... )(implicit decoder: EnvelopeDecoder[F, A]): F[Stream[F, AmqpEnvelope[A]]] = {

    val setup = ...

    Stream
      .bracket(blocker.blockOn(setup)) {
        case (tag, _) =>
          blocker.blockOn(consume.basicCancel(channel, tag))
      }
      .flatMap {
        case (_, queue) =>
          queue.dequeue.rethrow
            .evalMap(env => decoder(env).map(a => env.copy(payload = a)))
      }
      .pure[F]
  } 
}
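
For comparison, the per-call alternative mentioned above could look roughly like the sketch below. It assumes cats-effect 2 (Blocker plus ContextShift). BlockingClient and PerCallConsume are hypothetical stand-ins written for this example, not the actual fs2-rabbit Consume algebra:

```scala
import cats.effect.{Blocker, ContextShift, Sync}
import cats.implicits._

// Hypothetical stand-in for the blocking client calls; not the fs2-rabbit API.
trait BlockingClient {
  def basicQos(prefetch: Int): Unit
  def basicConsume(queue: String): String // returns a consumer tag
  def basicCancel(tag: String): Unit
}

// Per-call variant: every blocking call is shifted to the blocker on its own,
// so callers never have to remember to wrap the whole setup.
final class PerCallConsume[F[_]: Sync: ContextShift](
    client: BlockingClient,
    blocker: Blocker
) {
  def basicQos(prefetch: Int): F[Unit] =
    blocker.delay[F, Unit](client.basicQos(prefetch))

  def basicConsume(queue: String): F[String] =
    blocker.delay[F, String](client.basicConsume(queue))

  def basicCancel(tag: String): F[Unit] =
    blocker.delay[F, Unit](client.basicCancel(tag))

  // Two separate hops onto the blocking pool instead of one.
  def setup(queue: String, prefetch: Int): F[String] =
    basicQos(prefetch) *> basicConsume(queue)
}
```

The cost is one shift to the blocking pool and back per call, which is the extra context switching referred to above. The benefit is that no call site can forget to use the blocker.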

I need this issue fixed, so I will open a PR with the required changes. A review from your side would be most welcome.

@wookievx
Contributor Author

I created a PR. I am not sure how to write tests for this one, so help with that would be greatly appreciated:
#417

@agustafson
Collaborator

Are we happy to close this issue, @wookievx, now that the PR has been merged?
