Skip to content

Commit

Permalink
minor refactor, add traceidsource to headers
Browse files Browse the repository at this point in the history
  • Loading branch information
mt89vein committed Jan 3, 2021
1 parent ae59c24 commit c949526
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 10 deletions.
13 changes: 11 additions & 2 deletions ReRabbit/src/ReRabbit.Publishers/MessagePublisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using ReRabbit.Core;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Globalization;
using System.Net.Sockets;
using System.Runtime.CompilerServices;
Expand Down Expand Up @@ -229,14 +230,22 @@ ILogger<MessagePublisher> logger
properties.ContentType = contentType;
properties.MessageId = message.MessageId.ToString();
properties.CorrelationId = TraceContext.Current.TraceId.ToString();
// TODO: traceIdSource
properties.Timestamp = new AmqpTimestamp(((DateTimeOffset)message.MessageCreatedAt).ToUnixTimeSeconds());

if (message.MessageCreatedAt != default)
{
properties.Timestamp = new AmqpTimestamp(((DateTimeOffset)message.MessageCreatedAt).ToUnixTimeSeconds());
}

if (expires.HasValue)
{
properties.Expiration = expires.Value.TotalMilliseconds.ToString(CultureInfo.InvariantCulture);
}

if (!string.IsNullOrWhiteSpace(TraceContext.Current.TraceIdSource))
{
routeInfo.Arguments["x-trace-id-source"] = TraceContext.Current.TraceIdSource;
}

properties.Type = routeInfo.Name;
properties.Headers = routeInfo.Arguments;

Expand Down
17 changes: 10 additions & 7 deletions ReRabbit/src/ReRabbit.Subscribers/Consumers/Consumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -254,12 +254,14 @@ public async Task StartAsync()
MessageContext ctx
)
{
var mqMessage = GetMqMessageFrom(mapper, serializer, subscribedMessages, ctx);

return middlewareExecutor.ExecuteAsync(
messageHandlerType,
new MessageContext<TMessage>(mqMessage, ctx.MessageData)
var messageContext = CreateMessageContext(
mapper,
serializer,
subscribedMessages,
ctx
);

return middlewareExecutor.ExecuteAsync(messageHandlerType, messageContext);
}

/// <summary>
Expand All @@ -270,7 +272,7 @@ MessageContext ctx
/// <param name="ctx">Контекст сообщения.</param>
/// <param name="messageMapper">Маппер.</param>
/// <returns>Сообщение в формате, который ожидает обработчик.</returns>
private static TMessage GetMqMessageFrom(
private static MessageContext<TMessage> CreateMessageContext(
IMessageMapper messageMapper,
ISerializer serializer,
IEnumerable<RabbitMessageInfo> subscribedMessages,
Expand Down Expand Up @@ -300,6 +302,7 @@ MessageContext ctx
mqMessage = serializer.Deserialize<TMessage>(ctx.MessageData.MqMessage.Payload.ToString()!);
}

// дополняем данными, которых могло не быть среди данных в Payload
var message = (TMessage)mqMessage!;
if (ctx.MessageData.CreatedAt.HasValue)
{
Expand All @@ -326,7 +329,7 @@ MessageContext ctx
tracedMessage.TraceId = ctx.MessageData.TraceId.Value;
}

return message;
return new MessageContext<TMessage>(message, ctx.MessageData);
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ public MiddlewareExecutor(IMiddlewareRegistryAccessor registry, IServiceProvider
MessageContext<TMessageType> ctx
) where TMessageType : class, IMessage
{
var scope = _serviceProvider.CreateScope();
using var scope = _serviceProvider.CreateScope();

if (ActivatorUtilities.CreateInstance(scope.ServiceProvider, messageHandlerType) is not IMessageHandler<TMessageType> messageHandler)
{
// такого кейса быть не должно, но всё же.
Expand Down

0 comments on commit c949526

Please sign in to comment.