Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add the 'basicCancel' method to the AMQPClient #134

Merged
merged 9 commits into from
Nov 25, 2018

Conversation

iRevive
Copy link
Contributor

@iRevive iRevive commented Nov 21, 2018

A consumer can be canceled even if the connection is still open. So, I have added this missing method.

@codecov-io
Copy link

codecov-io commented Nov 21, 2018

Codecov Report

Merging #134 into master will increase coverage by 0.04%.
The diff coverage is 100%.

Impacted file tree graph

@@            Coverage Diff             @@
##           master     #134      +/-   ##
==========================================
+ Coverage   97.43%   97.47%   +0.04%     
==========================================
  Files          13       13              
  Lines         117      119       +2     
==========================================
+ Hits          114      116       +2     
  Misses          3        3
Impacted Files Coverage Δ
...main/scala/com/github/gvolpe/fs2rabbit/model.scala 100% <ø> (ø) ⬆️
...ub/gvolpe/fs2rabbit/program/ConsumingProgram.scala 100% <100%> (ø) ⬆️
...ithub/gvolpe/fs2rabbit/interpreter/Fs2Rabbit.scala 90.32% <100%> (+0.32%) ⬆️

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update bcf49ac...7cc18d3. Read the comment docs.

@iRevive
Copy link
Contributor Author

iRevive commented Nov 22, 2018

@gvolpe the 'basicConsume' method can return G[_] as well. I've added resource management in a scope of this change.

Copy link
Member

@gvolpe gvolpe left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good so far!

By introducing Stream.bracket(consumerF)(tag => AMQP.basicCancel(channel, tag)) I wonder whether a consumer will re-connect in case a queue gets deleted and by the mechanism of the ResilientStream it gets re-declared.

Can you verify that these statements hold true?

  • Run the IOAckerConsumer example.
  • Go to the RabbitMQ console and delete testQ.
  • You see the following output:
Consumed: AmqpEnvelope(DeliveryTag(1),Hey!,AmqpProperties(None,None,None,None,None,None,None,None,None,None,Map(app -> StringVal(fs2RabbitDemo), demoId -> LongVal(123))))
Consumed: AmqpEnvelope(DeliveryTag(2),{"id":1,"name":"Sherlock","address":{"number":212,"streetName":"Baker St"}},AmqpProperties(None,None,None,None,None,None,None,None,None,None,Map()))

11:06:10.790 [scala-execution-context-global-20] INFO  c.g.gvolpe.fs2rabbit.effects.Log$ - Releasing connection: amqp://guest@127.0.0.1:5672/ previously acquired.
11:06:10.806 [scala-execution-context-global-20] ERROR c.g.gvolpe.fs2rabbit.effects.Log$ - Queue might have been DELETED! amq.ctag-EJLX41gM42B3bKMJFJGlbg
java.lang.Exception: Queue might have been DELETED! amq.ctag-EJLX41gM42B3bKMJFJGlbg
	at com.github.gvolpe.fs2rabbit.interpreter.AMQPClientStream$$anon$1.$anonfun$handleCancel$2(AMQPClientStream.scala:45) ~[classes/:na]
	at com.github.gvolpe.fs2rabbit.interpreter.AMQPClientStream$$anon$1.$anonfun$handleCancel$2$adapted(AMQPClientStream.scala:43) ~[classes/:na]
	at scala.Option.fold(Option.scala:158) ~[scala-library-2.12.7.jar:na]
	at com.github.gvolpe.fs2rabbit.interpreter.AMQPClientStream$$anon$1.handleCancel(AMQPClientStream.scala:43) ~[classes/:na]
	at com.rabbitmq.client.impl.ConsumerDispatcher$3.run(ConsumerDispatcher.java:115) ~[amqp-client-5.5.0.jar:5.5.0]
	at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:104) ~[amqp-client-5.5.0.jar:5.5.0]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[na:1.8.0_161]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[na:1.8.0_161]
	at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_161]
11:06:10.806 [scala-execution-context-global-20] INFO  c.g.gvolpe.fs2rabbit.effects.Log$ - Restarting in 5 seconds...


Consumed: AmqpEnvelope(DeliveryTag(1),Hey!,AmqpProperties(None,None,None,None,None,None,None,None,None,None,Map(app -> StringVal(fs2RabbitDemo), demoId -> LongVal(123))))
Consumed: AmqpEnvelope(DeliveryTag(2),{"id":1,"name":"Sherlock","address":{"number":212,"streetName":"Baker St"}},AmqpProperties(None,None,None,None,None,None,None,None,None,None,Map()))

@iRevive
Copy link
Contributor Author

iRevive commented Nov 24, 2018

My output:

Consumed: AmqpEnvelope(DeliveryTag(1),Hey!,AmqpProperties(None,None,None,None,None,None,None,None,None,None,Map(app -> StringVal(fs2RabbitDemo), demoId -> LongVal(123))))
Consumed: AmqpEnvelope(DeliveryTag(2),{"id":1,"name":"Sherlock","address":{"number":212,"streetName":"Baker St"}},AmqpProperties(None,None,None,None,None,None,None,None,None,None,Map()))
10:14:12.759 [scala-execution-context-global-18] INFO  c.g.gvolpe.fs2rabbit.effects.Log$ - Releasing connection: amqp://admin@127.0.0.1:53125/kredito previously acquired.
10:14:12.783 [scala-execution-context-global-18] ERROR c.g.gvolpe.fs2rabbit.effects.Log$ - Multiple exceptions were thrown (2), first java.lang.Exception: Queue might have been DELETED! amq.ctag-MlqS2629f7lm9BHCJhw1Ag
fs2.CompositeFailure: Multiple exceptions were thrown (2), first java.lang.Exception: Queue might have been DELETED! amq.ctag-MlqS2629f7lm9BHCJhw1Ag
	at fs2.CompositeFailure$.apply(CompositeFailure.scala:21) ~[fs2-core_2.12-1.0.0.jar:1.0.0]
	at fs2.internal.Algebra$.$anonfun$scope0$3(Algebra.scala:146) ~[fs2-core_2.12-1.0.0.jar:1.0.0]
	at fs2.internal.FreeC.$anonfun$transformWith$1(FreeC.scala:45) ~[fs2-core_2.12-1.0.0.jar:1.0.0]
	at fs2.internal.Algebra$.$anonfun$compileLoop$21(Algebra.scala:342) ~[fs2-core_2.12-1.0.0.jar:1.0.0]
	at cats.effect.internals.IORunLoop$.cats$effect$internals$IORunLoop$$loop(IORunLoop.scala:137) ~[cats-effect_2.12-1.0.0.jar:1.0.0]
	at cats.effect.internals.IORunLoop$RestartCallback.signal(IORunLoop.scala:336) ~[cats-effect_2.12-1.0.0.jar:1.0.0]
	at cats.effect.internals.IORunLoop$RestartCallback.run(IORunLoop.scala:347) ~[cats-effect_2.12-1.0.0.jar:1.0.0]
	at cats.effect.internals.Trampoline.cats$effect$internals$Trampoline$$immediateLoop(Trampoline.scala:70) ~[cats-effect_2.12-1.0.0.jar:1.0.0]
	at cats.effect.internals.Trampoline.startLoop(Trampoline.scala:36) ~[cats-effect_2.12-1.0.0.jar:1.0.0]
	at cats.effect.internals.TrampolineEC$JVMTrampoline.super$startLoop(TrampolineEC.scala:93) ~[cats-effect_2.12-1.0.0.jar:1.0.0]
	at cats.effect.internals.TrampolineEC$JVMTrampoline.$anonfun$startLoop$1(TrampolineEC.scala:93) ~[cats-effect_2.12-1.0.0.jar:1.0.0]
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12) ~[scala-library-2.12.7.jar:na]
	at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81) ~[scala-library-2.12.7.jar:na]
	at cats.effect.internals.TrampolineEC$JVMTrampoline.startLoop(TrampolineEC.scala:93) ~[cats-effect_2.12-1.0.0.jar:1.0.0]
	at cats.effect.internals.Trampoline.execute(Trampoline.scala:43) ~[cats-effect_2.12-1.0.0.jar:1.0.0]
	at cats.effect.internals.TrampolineEC.execute(TrampolineEC.scala:44) ~[cats-effect_2.12-1.0.0.jar:1.0.0]
	at cats.effect.internals.Callback$AsyncIdempotentCallback.apply(Callback.scala:133) ~[cats-effect_2.12-1.0.0.jar:1.0.0]
	at cats.effect.internals.Callback$AsyncIdempotentCallback.apply(Callback.scala:120) ~[cats-effect_2.12-1.0.0.jar:1.0.0]
	at cats.effect.concurrent.Deferred$ConcurrentDeferred.$anonfun$unsafeRegister$1(Deferred.scala:155) ~[cats-effect_2.12-1.0.0.jar:1.0.0]
	at cats.effect.concurrent.Deferred$ConcurrentDeferred.$anonfun$unsafeRegister$1$adapted(Deferred.scala:155) ~[cats-effect_2.12-1.0.0.jar:1.0.0]
	at cats.effect.concurrent.Deferred$ConcurrentDeferred.$anonfun$complete$1(Deferred.scala:167) ~[cats-effect_2.12-1.0.0.jar:1.0.0]
	at cats.effect.concurrent.Deferred$ConcurrentDeferred.$anonfun$complete$1$adapted(Deferred.scala:166) ~[cats-effect_2.12-1.0.0.jar:1.0.0]
	at scala.collection.immutable.List.foreach(List.scala:388) ~[scala-library-2.12.7.jar:na]
	at cats.effect.concurrent.Deferred$ConcurrentDeferred.notifyReaders$1(Deferred.scala:166) ~[cats-effect_2.12-1.0.0.jar:1.0.0]
	at cats.effect.concurrent.Deferred$ConcurrentDeferred.loop$1(Deferred.scala:175) ~[cats-effect_2.12-1.0.0.jar:1.0.0]
	at cats.effect.concurrent.Deferred$ConcurrentDeferred.$anonfun$complete$2(Deferred.scala:179) ~[cats-effect_2.12-1.0.0.jar:1.0.0]
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12) ~[scala-library-2.12.7.jar:na]
	at cats.effect.internals.IORunLoop$.cats$effect$internals$IORunLoop$$loop(IORunLoop.scala:85) ~[cats-effect_2.12-1.0.0.jar:1.0.0]
	at cats.effect.internals.IORunLoop$RestartCallback.signal(IORunLoop.scala:336) ~[cats-effect_2.12-1.0.0.jar:1.0.0]
	at cats.effect.internals.IORunLoop$RestartCallback.apply(IORunLoop.scala:357) ~[cats-effect_2.12-1.0.0.jar:1.0.0]
	at cats.effect.internals.IORunLoop$RestartCallback.apply(IORunLoop.scala:303) ~[cats-effect_2.12-1.0.0.jar:1.0.0]
	at cats.effect.internals.IOShift$Tick.run(IOShift.scala:36) ~[cats-effect_2.12-1.0.0.jar:1.0.0]
	at java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1402) ~[na:1.8.0_111]
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) ~[na:1.8.0_111]
	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) ~[na:1.8.0_111]
	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) ~[na:1.8.0_111]
	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) ~[na:1.8.0_111]
Caused by: java.lang.Exception: Queue might have been DELETED! amq.ctag-MlqS2629f7lm9BHCJhw1Ag
	at com.github.gvolpe.fs2rabbit.interpreter.AMQPClientStream$$anon$1.$anonfun$handleCancel$2(AMQPClientStream.scala:45) ~[classes/:na]
	at com.github.gvolpe.fs2rabbit.interpreter.AMQPClientStream$$anon$1.$anonfun$handleCancel$2$adapted(AMQPClientStream.scala:43) ~[classes/:na]
	at scala.Option.fold(Option.scala:158) ~[scala-library-2.12.7.jar:na]
	at com.github.gvolpe.fs2rabbit.interpreter.AMQPClientStream$$anon$1.handleCancel(AMQPClientStream.scala:43) ~[classes/:na]
	at com.rabbitmq.client.impl.ConsumerDispatcher$3.run(ConsumerDispatcher.java:115) ~[amqp-client-5.5.0.jar:5.5.0]
	at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:104) ~[amqp-client-5.5.0.jar:5.5.0]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[na:1.8.0_111]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[na:1.8.0_111]
	at java.lang.Thread.run(Thread.java:745) ~[na:1.8.0_111]
10:14:12.783 [scala-execution-context-global-18] INFO  c.g.gvolpe.fs2rabbit.effects.Log$ - Restarting in 5 seconds...
Consumed: AmqpEnvelope(DeliveryTag(1),Hey!,AmqpProperties(None,None,None,None,None,None,None,None,None,None,Map(app -> StringVal(fs2RabbitDemo), demoId -> LongVal(123))))
Consumed: AmqpEnvelope(DeliveryTag(2),{"id":1,"name":"Sherlock","address":{"number":212,"streetName":"Baker St"}},AmqpProperties(None,None,None,None,None,None,None,None,None,None,Map()))

The only difference that there are two exceptions:

  1. java.lang.Exception: Queue might have been DELETED! amq.ctag-n-LXJs-XhMbwajoy5BClFQ;
  2. java.io.IOException: Unknown consumerTag - caused by 'basicCancel';

But the queue was recreated and the message was published and consumed successfully.

@gvolpe
Copy link
Member

gvolpe commented Nov 24, 2018

Great, I think that's the expected behavior. handleCancel gets called twice from the server: one because the queue was deleted and another time because of receiving the basicCancel command I believe.

Once you sync with master we can get it merged, thanks!

…re/basic-cancel

# Conflicts:
#	core/src/main/scala/com/github/gvolpe/fs2rabbit/algebra/AckConsuming.scala
#	core/src/main/scala/com/github/gvolpe/fs2rabbit/interpreter/AMQPClientStream.scala
#	core/src/main/scala/com/github/gvolpe/fs2rabbit/program/ConsumerProgram.scala
# Conflicts:
#	core/src/main/scala/com/github/gvolpe/fs2rabbit/algebra/AMQPClient.scala
#	core/src/main/scala/com/github/gvolpe/fs2rabbit/interpreter/AMQPClientStream.scala
#	core/src/main/scala/com/github/gvolpe/fs2rabbit/program/ConsumingProgram.scala
#	core/src/test/scala/com/github/gvolpe/fs2rabbit/interpreter/AMQPClientInMemory.scala
#	core/src/test/scala/com/github/gvolpe/fs2rabbit/interpreter/Fs2RabbitSpec.scala
@iRevive
Copy link
Contributor Author

iRevive commented Nov 25, 2018

Hi @gvolpe, I solved all conflicts.
Also, I checked the output of IOAckerConsumer and it the same as was before merging.

Consumed: AmqpEnvelope(DeliveryTag(1),Hey!,AmqpProperties(None,None,None,None,None,None,None,None,None,None,Map(app -> StringVal(fs2RabbitDemo), demoId -> LongVal(123))))
Consumed: AmqpEnvelope(DeliveryTag(2),{"id":1,"name":"Sherlock","address":{"number":212,"streetName":"Baker St"}},AmqpProperties(None,None,None,None,None,None,None,None,None,None,Map()))
10:06:58.768 [scala-execution-context-global-19] INFO  c.g.gvolpe.fs2rabbit.effects.Log$ - Releasing connection: amqp://admin@127.0.0.1:53125/kredito previously acquired.
10:06:58.788 [scala-execution-context-global-19] ERROR c.g.gvolpe.fs2rabbit.effects.Log$ - Multiple exceptions were thrown (2), first java.lang.Exception: Queue might have been DELETED! amq.ctag-ob8vVAhZSE492Q99Jg2-rQ
fs2.CompositeFailure: Multiple exceptions were thrown (2), first java.lang.Exception: Queue might have been DELETED! amq.ctag-ob8vVAhZSE492Q99Jg2-rQ
	at fs2.CompositeFailure$.apply(CompositeFailure.scala:21) ~[fs2-core_2.12-1.0.0.jar:1.0.0]
	at fs2.internal.Algebra$.$anonfun$scope0$3(Algebra.scala:146) ~[fs2-core_2.12-1.0.0.jar:1.0.0]
	at fs2.internal.FreeC.$anonfun$transformWith$1(FreeC.scala:45) ~[fs2-core_2.12-1.0.0.jar:1.0.0]
	at fs2.internal.Algebra$.$anonfun$compileLoop$21(Algebra.scala:342) ~[fs2-core_2.12-1.0.0.jar:1.0.0]
	at cats.effect.internals.IORunLoop$.cats$effect$internals$IORunLoop$$loop(IORunLoop.scala:137) ~[cats-effect_2.12-1.0.0.jar:1.0.0]
	at cats.effect.internals.IORunLoop$RestartCallback.signal(IORunLoop.scala:336) ~[cats-effect_2.12-1.0.0.jar:1.0.0]
	at cats.effect.internals.IORunLoop$RestartCallback.run(IORunLoop.scala:347) ~[cats-effect_2.12-1.0.0.jar:1.0.0]
	at cats.effect.internals.Trampoline.cats$effect$internals$Trampoline$$immediateLoop(Trampoline.scala:70) ~[cats-effect_2.12-1.0.0.jar:1.0.0]
	at cats.effect.internals.Trampoline.startLoop(Trampoline.scala:36) ~[cats-effect_2.12-1.0.0.jar:1.0.0]
	at cats.effect.internals.TrampolineEC$JVMTrampoline.super$startLoop(TrampolineEC.scala:93) ~[cats-effect_2.12-1.0.0.jar:1.0.0]
	at cats.effect.internals.TrampolineEC$JVMTrampoline.$anonfun$startLoop$1(TrampolineEC.scala:93) ~[cats-effect_2.12-1.0.0.jar:1.0.0]
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12) ~[scala-library-2.12.7.jar:na]
	at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81) ~[scala-library-2.12.7.jar:na]
	at cats.effect.internals.TrampolineEC$JVMTrampoline.startLoop(TrampolineEC.scala:93) ~[cats-effect_2.12-1.0.0.jar:1.0.0]
	at cats.effect.internals.Trampoline.execute(Trampoline.scala:43) ~[cats-effect_2.12-1.0.0.jar:1.0.0]
	at cats.effect.internals.TrampolineEC.execute(TrampolineEC.scala:44) ~[cats-effect_2.12-1.0.0.jar:1.0.0]
	at cats.effect.internals.Callback$AsyncIdempotentCallback.apply(Callback.scala:133) ~[cats-effect_2.12-1.0.0.jar:1.0.0]
	at cats.effect.internals.Callback$AsyncIdempotentCallback.apply(Callback.scala:120) ~[cats-effect_2.12-1.0.0.jar:1.0.0]
	at cats.effect.concurrent.Deferred$ConcurrentDeferred.$anonfun$unsafeRegister$1(Deferred.scala:155) ~[cats-effect_2.12-1.0.0.jar:1.0.0]
	at cats.effect.concurrent.Deferred$ConcurrentDeferred.$anonfun$unsafeRegister$1$adapted(Deferred.scala:155) ~[cats-effect_2.12-1.0.0.jar:1.0.0]
	at cats.effect.concurrent.Deferred$ConcurrentDeferred.$anonfun$complete$1(Deferred.scala:167) ~[cats-effect_2.12-1.0.0.jar:1.0.0]
	at cats.effect.concurrent.Deferred$ConcurrentDeferred.$anonfun$complete$1$adapted(Deferred.scala:166) ~[cats-effect_2.12-1.0.0.jar:1.0.0]
	at scala.collection.immutable.List.foreach(List.scala:388) ~[scala-library-2.12.7.jar:na]
	at cats.effect.concurrent.Deferred$ConcurrentDeferred.notifyReaders$1(Deferred.scala:166) ~[cats-effect_2.12-1.0.0.jar:1.0.0]
	at cats.effect.concurrent.Deferred$ConcurrentDeferred.loop$1(Deferred.scala:175) ~[cats-effect_2.12-1.0.0.jar:1.0.0]
	at cats.effect.concurrent.Deferred$ConcurrentDeferred.$anonfun$complete$2(Deferred.scala:179) ~[cats-effect_2.12-1.0.0.jar:1.0.0]
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12) ~[scala-library-2.12.7.jar:na]
	at cats.effect.internals.IORunLoop$.cats$effect$internals$IORunLoop$$loop(IORunLoop.scala:85) ~[cats-effect_2.12-1.0.0.jar:1.0.0]
	at cats.effect.internals.IORunLoop$RestartCallback.signal(IORunLoop.scala:336) ~[cats-effect_2.12-1.0.0.jar:1.0.0]
	at cats.effect.internals.IORunLoop$RestartCallback.apply(IORunLoop.scala:357) ~[cats-effect_2.12-1.0.0.jar:1.0.0]
	at cats.effect.internals.IORunLoop$RestartCallback.apply(IORunLoop.scala:303) ~[cats-effect_2.12-1.0.0.jar:1.0.0]
	at cats.effect.internals.IOShift$Tick.run(IOShift.scala:36) ~[cats-effect_2.12-1.0.0.jar:1.0.0]
	at java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1402) ~[na:1.8.0_111]
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) ~[na:1.8.0_111]
	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) ~[na:1.8.0_111]
	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) ~[na:1.8.0_111]
	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) ~[na:1.8.0_111]
Caused by: java.lang.Exception: Queue might have been DELETED! amq.ctag-ob8vVAhZSE492Q99Jg2-rQ
	at com.github.gvolpe.fs2rabbit.interpreter.AMQPClientStream$$anon$1.$anonfun$handleCancel$2(AMQPClientStream.scala:46) ~[classes/:na]
	at com.github.gvolpe.fs2rabbit.interpreter.AMQPClientStream$$anon$1.$anonfun$handleCancel$2$adapted(AMQPClientStream.scala:44) ~[classes/:na]
	at scala.Option.fold(Option.scala:158) ~[scala-library-2.12.7.jar:na]
	at com.github.gvolpe.fs2rabbit.interpreter.AMQPClientStream$$anon$1.handleCancel(AMQPClientStream.scala:44) ~[classes/:na]
	at com.rabbitmq.client.impl.ConsumerDispatcher$3.run(ConsumerDispatcher.java:115) ~[amqp-client-5.5.0.jar:5.5.0]
	at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:104) ~[amqp-client-5.5.0.jar:5.5.0]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[na:1.8.0_111]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[na:1.8.0_111]
	at java.lang.Thread.run(Thread.java:745) ~[na:1.8.0_111]
10:06:58.788 [scala-execution-context-global-19] INFO  c.g.gvolpe.fs2rabbit.effects.Log$ - Restarting in 5 seconds...
Consumed: AmqpEnvelope(DeliveryTag(1),Hey!,AmqpProperties(None,None,None,None,None,None,None,None,None,None,Map(app -> StringVal(fs2RabbitDemo), demoId -> LongVal(123))))
Consumed: AmqpEnvelope(DeliveryTag(2),{"id":1,"name":"Sherlock","address":{"number":212,"streetName":"Baker St"}},AmqpProperties(None,None,None,None,None,None,None,None,None,None,Map()))

@gvolpe gvolpe merged commit e374a93 into profunktor:master Nov 25, 2018
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants