Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AlreadyClosedException during message sequences in 2.0 #226

Closed
videege opened this issue May 19, 2017 · 8 comments
Closed

AlreadyClosedException during message sequences in 2.0 #226

videege opened this issue May 19, 2017 · 8 comments

Comments

@videege
Copy link
Contributor

videege commented May 19, 2017

I was experimenting with 2.0 (beta 6) and I noticed that I encounter this error when handling the completion of a message sequence.

My setup is

  • ASP.NET app that launches a message sequence (simple HelloWorld / HelloWorldResponse)
  • Console app which subscribes to HelloWorld and, in the handler, publishes a HelloWorldResponse.

Everything works fine but in the debug output for the ASP.NET app I see logs like this:

MessageSequence:Information: Initializing Message Sequence that starts with HelloWorld.
ConsumeConfigurationMiddleware:Information: Configuration action for 'helloworldresponse' found.
ExecutionIdRoutingMiddleware:Information: Routing key updated with GlobalExecutionId: helloworldresponse.8998c39c-c797-4552-869e-4f9c52e7a0dd.#.
TopologyProvider:Information: Declaring queue 'helloworldresponse_8998c39c-c797-4552-869e-4f9c52e7a0dd'.
TopologyProvider:Information: Binding queue 'helloworldresponse_8998c39c-c797-4552-869e-4f9c52e7a0dd' to exchange 'newmessaging.messages' with routing key 'helloworldresponse.8998c39c-c797-4552-869e-4f9c52e7a0dd.#'
ConsumerFactory:Information: Preparing to consume message from queue 'helloworldresponse_8998c39c-c797-4552-869e-4f9c52e7a0dd'.
AppendGlobalExecutionIdMiddleware:Information: GlobalExecutionId '8998c39c-c797-4552-869e-4f9c52e7a0dd' was allready found in PipeContext.
ExecutionIdRoutingMiddleware:Information: Routing key updated with GlobalExecutionId: helloworld.8998c39c-c797-4552-869e-4f9c52e7a0dd.
PublishAcknowledgeMiddleware:Information: Setting up publish acknowledgement for 2 with timeout 0:00:01
BasicPublishMiddleware:Information: Performing basic publish with routing key helloworld.8998c39c-c797-4552-869e-4f9c52e7a0dd on exchange newmessaging.messages.
MessageSequence:Information: Sequence '8998c39c-c797-4552-869e-4f9c52e7a0dd' completed with message 'HelloWorldResponse'.
Microsoft.AspNetCore.Mvc.ViewFeatures.Internal.ViewResultExecutor:Information: Executing ViewResult, running view at path /Views/Home/About.cshtml.
Microsoft.AspNetCore.Mvc.Internal.ControllerActionInvoker:Information: Executed action NewMessaging.Web.Controllers.HomeController.About (NewMessaging.Web) in 147.5554ms
Application Insights Telemetry (unconfigured): {"name":"Microsoft.ApplicationInsights.Dev.Request","time":"2017-05-19T20:04:59.4676651Z","tags":{"ai.user.userAgent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36","ai.device.roleInstance":"S1005491","ai.operation.id":"ljyer8JtfPs=","ai.internal.sdkVersion":"aspnet5c:1.0.0","ai.operation.name":"GET Home/About"},"data":{"baseType":"RequestData","baseData":{"ver":2,"id":"ljyer8JtfPs=","name":"GET Home/About","startTime":"2017-05-19T20:04:59.4676651+00:00","duration":"00:00:00.1531647","success":true,"responseCode":"200","url":"http://localhost:52556/Home/About","httpMethod":"GET","properties":{"DeveloperMode":"true"}}}}
Microsoft.AspNetCore.Hosting.Internal.WebHost:Information: Request finished in 163.8095ms 200 text/html; charset=utf-8
Exception thrown: 'RabbitMQ.Client.Exceptions.AlreadyClosedException' in RabbitMQ.Client.dll
Exception thrown: 'RabbitMQ.Client.Exceptions.AlreadyClosedException' in System.Private.CoreLib.ni.dll
The thread 0x2744 has exited with code 0 (0x0).
Exception thrown: 'RabbitMQ.Client.Exceptions.AlreadyClosedException' in System.Private.CoreLib.ni.dll
ExceptionHandlingMiddleware:Error: Exception thrown. Will be handled by Exception Handler
SubscriptionExceptionMiddleware:Error: Unhandled exception thrown when consuming message
TopologyProvider:Information: Declaring exchange 'default_error_exchange'.
Exception thrown: 'RabbitMQ.Client.Exceptions.OperationInterruptedException' in RabbitMQ.Client.dll
TopologyProvider:Error: Unable to declare exchange default_error_exchange
Microsoft.AspNetCore.Hosting.Internal.WebHost:Information: Request starting HTTP/1.1 GET http://localhost:52556/favicon.ico  
Exception thrown: 'RabbitMQ.Client.Exceptions.OperationInterruptedException' in System.Private.CoreLib.ni.dll
SubscriptionExceptionMiddleware:Error: Unable to publish message to Error Exchange
Exception thrown: 'RabbitMQ.Client.Exceptions.AlreadyClosedException' in RabbitMQ.Client.dll
Exception thrown: 'RabbitMQ.Client.Exceptions.AlreadyClosedException' in System.Private.CoreLib.ni.dll
Exception thrown: 'RabbitMQ.Client.Exceptions.AlreadyClosedException' in System.Private.CoreLib.ni.dll
Exception thrown: 'RabbitMQ.Client.Exceptions.AlreadyClosedException' in System.Private.CoreLib.ni.dll
Exception thrown: 'RabbitMQ.Client.Exceptions.AlreadyClosedException' in System.Private.CoreLib.ni.dll
Exception thrown: 'RabbitMQ.Client.Exceptions.AlreadyClosedException' in System.Private.CoreLib.ni.dll
Exception thrown: 'RabbitMQ.Client.Exceptions.AlreadyClosedException' in System.Private.CoreLib.ni.dll
Exception thrown: 'RabbitMQ.Client.Exceptions.AlreadyClosedException' in System.Private.CoreLib.ni.dll
Exception thrown: 'RabbitMQ.Client.Exceptions.AlreadyClosedException' in System.Private.CoreLib.ni.dll
Exception thrown: 'RabbitMQ.Client.Exceptions.AlreadyClosedException' in System.Private.CoreLib.ni.dll
Exception thrown: 'RabbitMQ.Client.Exceptions.AlreadyClosedException' in System.Private.CoreLib.ni.dll
Exception thrown: 'RabbitMQ.Client.Exceptions.AlreadyClosedException' in System.Private.CoreLib.ni.dll
Exception thrown: 'RabbitMQ.Client.Exceptions.AlreadyClosedException' in System.Private.CoreLib.ni.dll
Exception thrown: 'RabbitMQ.Client.Exceptions.AlreadyClosedException' in System.Private.CoreLib.ni.dll
MessageConsumeMiddleware:Error: An unhandled exception was thrown when consuming message with routing key helloworldresponse.8998c39c-c797-4552-869e-4f9c52e7a0dd
Exception thrown: 'RabbitMQ.Client.Exceptions.AlreadyClosedException' in RawRabbit.dll
Microsoft.AspNetCore.StaticFiles.StaticFileMiddleware:Information: Sending file. Request path: '/favicon.ico'. Physical path: 'c:\users\jfbarro\documents\visual studio 2015\Projects\NewMessaging.Web\src\NewMessaging.Web\wwwroot\favicon.ico'
Application Insights Telemetry (unconfigured): {"name":"Microsoft.ApplicationInsights.Dev.Request","time":"2017-05-19T20:04:59.7532235Z","tags":{"ai.user.userAgent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36","ai.device.roleInstance":"S1005491","ai.operation.id":"+ZT4OYnIAz4=","ai.internal.sdkVersion":"aspnet5c:1.0.0","ai.operation.name":"GET /favicon.ico"},"data":{"baseType":"RequestData","baseData":{"ver":2,"id":"+ZT4OYnIAz4=","name":"GET /favicon.ico","startTime":"2017-05-19T20:04:59.7532235+00:00","duration":"00:00:00.4915018","success":true,"responseCode":"200","url":"http://localhost:52556/favicon.ico","httpMethod":"GET","properties":{"DeveloperMode":"true"}}}}
Microsoft.AspNetCore.Hosting.Internal.WebHost:Information: Request finished in 522.9097ms 200 image/x-icon
TopologyProvider:Information: Disposing topology channel (if exists).
The thread 0x6988 has exited with code 0 (0x0).
The thread 0x5bd8 has exited with code 0 (0x0).
The thread 0x4754 has exited with code 0 (0x0).

Is this output to be expected, or am I doing something wrong? I basically copied from the sample app in the 2.0 branch.

@pardahlman
Copy link
Owner

Hello @videege 👋 ,

I tried to reproduce this in the RawRabbit.Todo project, without any success.

Looking at the log entries, everything seems to go well. Early on the sequences completes (Sequence '8998c39c-c797-4552-869e-4f9c52e7a0dd' completed), and then it looks like the http request also completes (Request finished in 163.8095ms). It is a bit strange that the exception is thrown after the execution is done.

I wonder if:

  1. You could provide the code snippet that you run
  2. If you could add time stamps to the logs

If possible, you could share your code and I'll try to run it locally!

@pardahlman
Copy link
Owner

@videege is this still an issue for you?

@videege
Copy link
Contributor Author

videege commented Jun 6, 2017

I can confirm I'm still seeing this but it doesn't seem to negatively impact execution at all. I can't share code but let me try to get you some logs.

@videege
Copy link
Contributor Author

videege commented Jul 18, 2017

Hi @pardahlman, sorry for the long delay on this. This seems to be causing some actual problems in our application now. Let me see if I can describe the sequence of events:

  • Service A executes a message sequence, expecting a response from Service B
  • Service B, responding to the message from service A, executes another message sequence to get data from Service C. This succeeds.
  • Service B finishes processing and does client.PublishAsync(/* Class that Service A is expecting */). No error is thrown until execution drops out of the method that is executing due to the subscription on Service A's message.
  • At this point, Service B says that it encountered an error consuming the message with the routing key for the sequence that has already completed (the sequence from B -> C).
  • Somehow, this prevents the publish of the message back to Service A, and Service A's originating message sequence times out.

When I hook up to the default_error_exchange, I can see that the error being logged is the AlreadyClosedException above. Here's the relevant data from the error message that gets published:

exception_type:	AlreadyClosedException
exception_stacktrace:	at RabbitMQ.Client.Impl.SessionBase.Transmit(Command cmd)
at RabbitMQ.Client.Impl.ModelBase.ModelSend(MethodBase method, ContentHeaderBase header, Byte[] body)
at RabbitMQ.Client.Impl.RecoveryAwareModel.BasicAck(UInt64 deliveryTag, Boolean multiple)
at RawRabbit.Pipe.Middleware.ExplicitAckMiddleware.AcknowledgeMessage(IPipeContext context)
at RawRabbit.Pipe.Middleware.ExplicitAckMiddleware.<InvokeAsync>d__7.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at RawRabbit.Pipe.Middleware.CancellationMiddleware.<InvokeAsync>d__0.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at RawRabbit.Pipe.Middleware.ExceptionHandlingMiddleware.<InvokeAsync>d__4.MoveNext()

Here are the logs of Service B:

2017-07-18 12:43:58.387 -06:00 [Debug] Existing connection is open and will be used.
2017-07-18 12:43:58.398 -06:00 [Debug] Stage 'Initialized' has no additional middlewares registered.
2017-07-18 12:43:58.398 -06:00 [Debug] Found message type 'CollectionDataResponse`1' in context. Creating consume config based on it.
2017-07-18 12:43:58.399 -06:00 [Information] Configuration action for 'collectiondataresponse[dto]' found.
2017-07-18 12:43:58.404 -06:00 [Debug] Stage 'ConsumeConfigured' has no additional middlewares registered.
2017-07-18 12:43:58.404 -06:00 [Debug] Updating routing key with GlobalExecutionId ''
2017-07-18 12:43:58.405 -06:00 [Information] Routing key updated with GlobalExecutionId: collectiondataresponse[dto].9338b802-69bf-4f31-8b2c-a1a90eda7874.#.
2017-07-18 12:43:58.406 -06:00 [Debug] Declaring queue 'collectiondataresponse[dto]_9338b802-69bf-4f31-8b2c-a1a90eda7874'.
2017-07-18 12:43:58.407 -06:00 [Information] Declaring queue 'collectiondataresponse[dto]_9338b802-69bf-4f31-8b2c-a1a90eda7874'.
2017-07-18 12:43:58.414 -06:00 [Debug] Done processing topology work.
2017-07-18 12:43:58.414 -06:00 [Debug] Stage 'QueueDeclared' has no additional middlewares registered.
2017-07-18 12:43:58.415 -06:00 [Debug] Exchange configuration found. Declaring 'cobra.messages.responses'.
2017-07-18 12:43:58.416 -06:00 [Debug] Stage 'ExchangeDeclared' has no additional middlewares registered.
2017-07-18 12:43:58.416 -06:00 [Information] Binding queue 'collectiondataresponse[dto]_9338b802-69bf-4f31-8b2c-a1a90eda7874' to exchange 'cobra.messages.responses' with routing key 'collectiondataresponse[dto].9338b802-69bf-4f31-8b2c-a1a90eda7874.#'
2017-07-18 12:43:58.423 -06:00 [Debug] Done processing topology work.
2017-07-18 12:43:58.423 -06:00 [Debug] Stage 'QueueBound' has no additional middlewares registered.
2017-07-18 12:43:58.424 -06:00 [Debug] Stage 'ConsumerChannelCreated' has no additional middlewares registered.
2017-07-18 12:43:58.425 -06:00 [Information] Preparing to consume message from queue 'collectiondataresponse[dto]_9338b802-69bf-4f31-8b2c-a1a90eda7874'.
2017-07-18 12:43:58.428 -06:00 [Debug] Stage 'ConsumerCreated' has no additional middlewares registered.
2017-07-18 12:43:58.492 -06:00 [Debug] Stage 'Initialized' has no additional middlewares registered.
2017-07-18 12:43:58.492 -06:00 [Debug] Stage 'ProducerInitialized' has no additional middlewares registered.
2017-07-18 12:43:58.493 -06:00 [Information] GlobalExecutionId '9338b802-69bf-4f31-8b2c-a1a90eda7874' was allready found in PipeContext.
2017-07-18 12:43:58.495 -06:00 [Debug] Stage 'PublishConfigured' has no additional middlewares registered.
2017-07-18 12:43:58.496 -06:00 [Debug] Updating routing key with GlobalExecutionId '9338b802-69bf-4f31-8b2c-a1a90eda7874'
2017-07-18 12:43:58.496 -06:00 [Information] Routing key updated with GlobalExecutionId: dtoquerybyid.9338b802-69bf-4f31-8b2c-a1a90eda7874.
2017-07-18 12:43:58.498 -06:00 [Debug] Exchange configuration found. Declaring 'cobra.messages.queries'.
2017-07-18 12:43:58.498 -06:00 [Information] Declaring exchange 'cobra.messages.queries'.
2017-07-18 12:43:58.500 -06:00 [Debug] Done processing topology work.
2017-07-18 12:43:58.501 -06:00 [Debug] Stage 'ExchangeDeclared' has no additional middlewares registered.
2017-07-18 12:43:58.504 -06:00 [Debug] Stage 'MessageSerialized' has no additional middlewares registered.
2017-07-18 12:43:58.505 -06:00 [Debug] Stage 'BasicPropertiesCreated' has no additional middlewares registered.
2017-07-18 12:43:58.506 -06:00 [Debug] Begining to process 'GetChannel' requests.
2017-07-18 12:43:58.507 -06:00 [Debug] 'GetChannel' has been processed.
2017-07-18 12:43:58.508 -06:00 [Debug] Adding channel 16 to Execution Context.
2017-07-18 12:43:58.509 -06:00 [Debug] Stage 'ChannelCreated' has no additional middlewares registered.
2017-07-18 12:43:58.510 -06:00 [Debug] No Mandatory Callback registered.
2017-07-18 12:43:58.511 -06:00 [Debug] Publish Acknowledgement is disabled.
2017-07-18 12:43:58.512 -06:00 [Debug] Stage 'PreMessagePublish' has no additional middlewares registered.
2017-07-18 12:43:58.514 -06:00 [Information] Performing basic publish with routing key dtoquerybyid.9338b802-69bf-4f31-8b2c-a1a90eda7874 on exchange cobra.messages.queries.
2017-07-18 12:43:58.518 -06:00 [Debug] Stage 'MessagePublished' has no additional middlewares registered.
2017-07-18 12:43:58.630 -06:00 [Debug] Invoking consumer pipe for message 'e38ead2f-b487-43e5-af79-cc2a48933282'.
2017-07-18 12:43:58.631 -06:00 [Debug] Stage 'MessageRecieved' has no additional middlewares registered.
2017-07-18 12:43:58.632 -06:00 [Debug] Trying to extract global_execution_id from header
2017-07-18 12:43:58.633 -06:00 [Debug] Header type extracted: 'String'
2017-07-18 12:43:58.659 -06:00 [Debug] Stage 'MessageDeserialized' has no additional middlewares registered.
2017-07-18 12:43:58.688 -06:00 [Debug] Disposing subscriptions for Message Sequence '9338b802-69bf-4f31-8b2c-a1a90eda7874'.
2017-07-18 12:43:58.744 -06:00 [Information] Sequence '9338b802-69bf-4f31-8b2c-a1a90eda7874' completed with message 'CollectionDataResponse`1'.
2017-07-18 12:43:58.748 -06:00 [Debug] Stage 'Initialized' has no additional middlewares registered.
2017-07-18 12:43:58.748 -06:00 [Debug] Stage 'ProducerInitialized' has no additional middlewares registered.
2017-07-18 12:43:58.751 -06:00 [Information] Using GlobalExecutionId '1b6d2901-6e05-4408-af95-461cec243fa6' that was found in the execution process.
2017-07-18 12:43:58.753 -06:00 [Debug] Stage 'PublishConfigured' has no additional middlewares registered.
2017-07-18 12:43:58.755 -06:00 [Debug] Updating routing key with GlobalExecutionId '1b6d2901-6e05-4408-af95-461cec243fa6'
2017-07-18 12:43:58.758 -06:00 [Information] Routing key updated with GlobalExecutionId: dataresponse[dtovaliditydata].1b6d2901-6e05-4408-af95-461cec243fa6.
2017-07-18 12:43:58.760 -06:00 [Debug] Exchange configuration found. Declaring 'cobra.messages.responses'.
2017-07-18 12:43:58.761 -06:00 [Debug] Stage 'ExchangeDeclared' has no additional middlewares registered.
2017-07-18 12:43:58.783 -06:00 [Debug] Stage 'MessageSerialized' has no additional middlewares registered.
2017-07-18 12:43:58.783 -06:00 [Debug] Stage 'BasicPropertiesCreated' has no additional middlewares registered.
2017-07-18 12:43:58.786 -06:00 [Debug] Begining to process 'GetChannel' requests.
2017-07-18 12:43:58.788 -06:00 [Debug] 'GetChannel' has been processed.
2017-07-18 12:43:58.789 -06:00 [Debug] Adding channel 16 to Execution Context.
2017-07-18 12:43:58.790 -06:00 [Debug] Stage 'ChannelCreated' has no additional middlewares registered.
2017-07-18 12:43:58.791 -06:00 [Debug] No Mandatory Callback registered.
2017-07-18 12:43:58.792 -06:00 [Debug] Publish Acknowledgement is disabled.
2017-07-18 12:43:58.793 -06:00 [Debug] Stage 'PreMessagePublish' has no additional middlewares registered.
2017-07-18 12:43:58.794 -06:00 [Information] Performing basic publish with routing key dataresponse[dtovaliditydata].1b6d2901-6e05-4408-af95-461cec243fa6 on exchange cobra.messages.responses.
2017-07-18 12:43:58.796 -06:00 [Debug] Stage 'MessagePublished' has no additional middlewares registered.
2017-07-18 12:43:58.803 -06:00 [Debug] Stage 'HandlerInvoked' has no additional middlewares registered.
2017-07-18 12:43:58.907 -06:00 [Error] Exception thrown. Will be handled by Exception Handler
2017-07-18 12:43:58.912 -06:00 [Error] Unhandled exception thrown when consuming message
2017-07-18 12:43:58.914 -06:00 [Information] Declaring exchange 'default_error_exchange'.
2017-07-18 12:43:58.916 -06:00 [Debug] Done processing topology work.
2017-07-18 12:43:58.917 -06:00 [Debug] Existing connection is open and will be used.
2017-07-18 12:43:59.636 -06:00 [Error] An unhandled exception was thrown when consuming message with routing key collectiondataresponse[dto].9338b802-69bf-4f31-8b2c-a1a90eda7874

The thing that concerns me is that the sequence ID 9338b802-69bf-4f31-8b2c-a1a90eda7874 seems to get picked up successfully at the beginning, and then it gets re-used by service B to do the query and response to Service C. Service B then disposes subscriptions for this sequence:

2017-07-18 12:43:58.688 -06:00 [Debug] Disposing subscriptions for Message Sequence '9338b802-69bf-4f31-8b2c-a1a90eda7874'.

Does this hose up the original sequence, which has not yet completed, with this same GUID (Service A -> Service B)? I might just be misinterpreting this, but it seems like it is impossible to nest message sequences, i.e., A -> B -> C.

@pardahlman
Copy link
Owner

Hello @videege, thanks for taking moving this forward 👍

I'm trying to recreate what you describe in a unit test. Based on my understanding, this is what you are trying to do (message name/number indicates "flow order")

await serviceB.SubscribeAsync<FirstMessage>(async message =>
    {
        var nestedSequence = serviceB.ExecuteSequence(s => s
            .PublishAsync(new SecondMessage())
            .Complete<ThirdMessage>());
        await nestedSequence.Task;
        await serviceB.PublishAsync(new ForthMessage());
    }
);

await serviceC.SubscribeAsync<SecondMessage>(message =>
{
    return serviceC.PublishAsync(new ThirdMessage());
});

var mainSequence = serviceA.ExecuteSequence(s => s
    .PublishAsync(new FirstMessage())
    .Complete<ForthMessage>()
);
await mainSequence.Task;

Is this close to what you're doing?

@garethrampton
Copy link

@pardahlman I'm also seeing these 'already closed' exceptions. They seem fairly benign - our requests complete without issue.

Our scenario differs very slightly - rather than the typical request/response pattern for our request command handlers we're currently using the pub/sub model instead - so we're not really doing fully nested sequences. If I flip the code back to the Request/Response pattern obviously the issue doesn't occur.

The consistent issue appears to be that publishing messages within the context of an existing handler causes this exception to be thrown.

@pardahlman
Copy link
Owner

I think I've narrowed down to where the problem is. Stateless fires of an event that appears to be running in a different thread than the rest of the execution. That part of the code disposes the subscription(s) and channel for the sequence, at the same time as RawRabbit tries to use the same channel to ack the message that completes the sequence. That is, I don't think it has to do with the nested sequences. I'll need to look in to this a bit further, now that I've found a way to repro it.

@pardahlman
Copy link
Owner

This issue has been resolved in the just released beta8. Closing this ticket now, but feel free to re-open if you experience any related problems.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants