-
Hi, I have the following handler for the BasicDeliverEventArgs: try
{
// Other stuff
var consumer = (EventingBasicConsumer)sender;
if (!consumer.IsRunning)
return;
try
{
_logger.DebugChannel(channelName, $"Waiting for semaphore for delivery tag {ea.DeliveryTag}");
_channelSemaphore[channelName].Wait(cancellation.Token);
_logger.DebugChannel(channelName, $"Obtained semaphore for delivery tag {ea.DeliveryTag}");
}
catch (OperationCanceledException)
{
_logger.DebugChannel(channelName, $"Cancelation token was canceled");
return;
}
if (!consumer.IsRunning)
return;
Task.Run(() =>
{
try
{
Stopwatch sw = Stopwatch.StartNew();
Stopwatch swInternal = Stopwatch.StartNew();
var message = Encoding.UTF8.GetString(ea.Body.ToArray());
var busMessage = JsonConvert.DeserializeObject<BusMessage>(message);
swInternal.Stop();
_logger.DebugChannel(channelName, $"Message {busMessage.Header} deserialized from delivery tag {ea.DeliveryTag} in {swInternal.ElapsedMilliseconds} ms");
if ((channel?.IsOpen ?? false) && (_connection?.IsOpen ?? false))
{
if (cancellation.IsCancellationRequested)
return;
messageReceived?.Invoke(busMessage, channelName);
try
{
if ((channel?.IsOpen ?? false) && (_connection?.IsOpen ?? false))
{
channel.BasicAck(ea.DeliveryTag, false);
_logger.DebugChannel(channelName, $"Message {busMessage.Header} with sequence {ea.DeliveryTag} Acked");
}
else
{
_logger.InfoChannel(channelName, $"Channel or connection closed. Skipping ACK of message tag {ea.DeliveryTag}");
}
}
catch (Exception ex)
{
_logger.ErrorExceptionChannel(channelName, $"Error Acking message with sequence {ea.DeliveryTag} and header {busMessage.Header}.", ex);
}
}
sw.Stop();
_logger.DebugChannel(channelName, $"Message {busMessage.Header} processed in {sw.ElapsedMilliseconds} ms");
}
catch (Exception ex)
{
_logger.ErrorExceptionChannel(channelName, $"Exception processing message with delivery tag: {ea.DeliveryTag}", ex);
channel.BasicNack(ea.DeliveryTag, false, true);
}
finally
{
_channelSemaphore[channelName].Release();
_logger.DebugChannel(channelName, $"Semaphore released for delivery tag {ea.DeliveryTag}");
}
})
.ConfigureAwait(false);
}
catch (Exception ex)
{
_logger.ErrorExceptionChannel(channelName, $"Exception processing message with delivery tag: {ea.DeliveryTag}", ex);
channel.BasicNack(ea.DeliveryTag, false, true);
} While doing some load tests once in a while I get the follwoing exception deserializing the string to a .net class:
It seems the string is not complete? Could it be due to executing the Regards |
Beta Was this translation helpful? Give feedback.
Replies: 1 comment 3 replies
-
Hi @pabermod, I suppose the issue is here:
RabbitMQ Client pools byte arrays and return them (https://github.com/rabbitmq/rabbitmq-dotnet-client/blob/6.x/projects/RabbitMQ.Client/client/impl/ConcurrentConsumerDispatcher.cs#L94) right after HandleBasicDeliver is completed. In case of EventingBasicConsumer it happens right after Received is called (https://github.com/rabbitmq/rabbitmq-dotnet-client/blob/6.x/projects/RabbitMQ.Client/client/events/EventingBasicConsumer.cs#L90). So, to fix the code you could consider doing the following:
By doing that ea.Body is accessed in a valid manner, but you still offload processing to the thread pool. |
Beta Was this translation helpful? Give feedback.
Hi @pabermod,
I suppose the issue is here:
RabbitMQ Client pools byte arrays and return them (https://github.com/rabbitmq/rabbitmq-dotnet-client/blob/6.x/projects/RabbitMQ.Client/client/impl/ConcurrentConsumerDispatcher.cs#L94) right after HandleBasicDeliver is completed.
In case of EventingBasicConsumer it happens right after Received is called (https://github.com/rabbitmq/rabbitmq-dotnet-client/blob/6.x/projects/RabbitMQ.Client/c…