diff --git a/RabbitMQDotNetClient.sln b/RabbitMQDotNetClient.sln
index 8717f1acee..8974d4a9ce 100644
--- a/RabbitMQDotNetClient.sln
+++ b/RabbitMQDotNetClient.sln
@@ -40,6 +40,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Common", "projects\Test\Com
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ToxiproxyNetCore", "projects\toxiproxy-netcore\src\ToxiproxyNetCore\ToxiproxyNetCore.csproj", "{AB5B7C53-D7EC-4985-A6DE-70178E4B688A}"
EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "RabbitMQ.Client.OpenTelemetry", "projects\RabbitMQ.Client.OpenTelemetry\RabbitMQ.Client.OpenTelemetry.csproj", "{16BF2086-AC7D-4EC3-8660-CC16E663ACB1}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -90,6 +92,10 @@ Global
{AB5B7C53-D7EC-4985-A6DE-70178E4B688A}.Debug|Any CPU.Build.0 = Debug|Any CPU
{AB5B7C53-D7EC-4985-A6DE-70178E4B688A}.Release|Any CPU.ActiveCfg = Release|Any CPU
{AB5B7C53-D7EC-4985-A6DE-70178E4B688A}.Release|Any CPU.Build.0 = Release|Any CPU
+ {16BF2086-AC7D-4EC3-8660-CC16E663ACB1}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {16BF2086-AC7D-4EC3-8660-CC16E663ACB1}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {16BF2086-AC7D-4EC3-8660-CC16E663ACB1}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {16BF2086-AC7D-4EC3-8660-CC16E663ACB1}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
diff --git a/projects/RabbitMQ.Client.OpenTelemetry/README.md b/projects/RabbitMQ.Client.OpenTelemetry/README.md
new file mode 100644
index 0000000000..8c697bbdbc
--- /dev/null
+++ b/projects/RabbitMQ.Client.OpenTelemetry/README.md
@@ -0,0 +1,54 @@
+# RabbitMQ .NET Client - OpenTelemetry Instrumentation
+
+## Introduction
+This library makes it easy to instrument your RabbitMQ .NET Client applications with OpenTelemetry.
+
+## Examples
+The following examples demonstrate how to use the RabbitMQ .NET Client OpenTelemetry Instrumentation.
+
+### Basic Usage
+
+#### ASP.NET Core Configuration Example
+```csharp
+using OpenTelemetry.Trace;
+
+// Configure the OpenTelemetry SDK to trace ASP.NET Core, the RabbitMQ .NET Client and export the traces to the console.
+// Also configures context propagation to propagate the TraceContext and Baggage using the W3C specification.
+
+var compositeTextMapPropagator = new CompositeTextMapPropagator(new TextMapPropagator[]
+{
+ new TraceContextPropagator(),
+ new BaggagePropagator()
+});
+
+Sdk.SetDefaultTextMapPropagator(compositeTextMapPropagator);
+
+builder.Services.AddOpenTelemetry()
+ .ConfigureResource(resource => resource
+ .AddService(serviceName: builder.Environment.ApplicationName))
+ .WithTracing(tracing => tracing
+ .AddAspNetCoreInstrumentation()
+ .AddRabbitMQInstrumentation()
+ .AddConsoleExporter());
+```
+
+#### Console Application Configuration Example
+```csharp
+using OpenTelemetry.Trace;
+
+// Configure the OpenTelemetry SDK to trace ASP.NET Core, the RabbitMQ .NET Client and export the traces to the console.
+// Also configures context propagation to propagate the TraceContext and Baggage using the W3C specification.
+
+var compositeTextMapPropagator = new CompositeTextMapPropagator(new TextMapPropagator[]
+{
+ new TraceContextPropagator(),
+ new BaggagePropagator()
+});
+
+Sdk.SetDefaultTextMapPropagator(compositeTextMapPropagator);
+
+var tracerProvider = Sdk.CreateTracerProviderBuilder()
+ .AddRabbitMQInstrumentation()
+ .AddConsoleExporter()
+ .Build();
+```
diff --git a/projects/RabbitMQ.Client.OpenTelemetry/RabbitMQ.Client.OpenTelemetry.csproj b/projects/RabbitMQ.Client.OpenTelemetry/RabbitMQ.Client.OpenTelemetry.csproj
new file mode 100644
index 0000000000..f3f0c07150
--- /dev/null
+++ b/projects/RabbitMQ.Client.OpenTelemetry/RabbitMQ.Client.OpenTelemetry.csproj
@@ -0,0 +1,64 @@
+
+
+
+ net6.0;netstandard2.0
+ $(NoWarn);CS1591
+ true
+ RabbitMQ OpenTelemetry Integration Package for .NET
+ VMware
+ VMware, Inc. or its affiliates.
+ Copyright © 2007-2023 VMware, Inc. or its affiliates.
+ The RabbitMQ OpenTelemetry Library for .NET adds convenience extension methods for RabbitMQ/OpenTelemetry
+ true
+ icon.png
+ Apache-2.0 OR MPL-2.0
+ https://www.rabbitmq.com/dotnet.html
+ rabbitmq, amqp, oauth2
+ RabbitMQ
+ true
+ https://github.com/rabbitmq/rabbitmq-dotnet-client.git
+ true
+ snupkg
+ ../rabbit.snk
+ true
+ otel-
+ minimal
+ true
+ ../../packages
+ README.md
+ 7.3
+
+
+
+ true
+ true
+ true
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/projects/RabbitMQ.Client.OpenTelemetry/TraceProviderBuilderExtensions.cs b/projects/RabbitMQ.Client.OpenTelemetry/TraceProviderBuilderExtensions.cs
new file mode 100644
index 0000000000..0cbefbba10
--- /dev/null
+++ b/projects/RabbitMQ.Client.OpenTelemetry/TraceProviderBuilderExtensions.cs
@@ -0,0 +1,58 @@
+using System;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.Linq;
+using System.Text;
+using OpenTelemetry.Context.Propagation;
+using RabbitMQ.Client;
+
+namespace OpenTelemetry.Trace
+{
+ public static class OpenTelemetryExtensions
+ {
+ public static TracerProviderBuilder AddRabbitMQInstrumentation(this TracerProviderBuilder builder)
+ {
+ RabbitMQActivitySource.UseRoutingKeyAsOperationName = true;
+ RabbitMQActivitySource.ContextExtractor = OpenTelemetryContextExtractor;
+ RabbitMQActivitySource.ContextInjector = OpenTelemetryContextInjector;
+ builder.AddSource("RabbitMQ.Client.*");
+ return builder;
+ }
+
+ private static ActivityContext OpenTelemetryContextExtractor(IReadOnlyBasicProperties props)
+ {
+ // Extract the PropagationContext of the upstream parent from the message headers.
+ var parentContext = Propagators.DefaultTextMapPropagator.Extract(default, props.Headers, OpenTelemetryContextGetter);
+ Baggage.Current = parentContext.Baggage;
+ return parentContext.ActivityContext;
+ }
+
+ private static IEnumerable OpenTelemetryContextGetter(IDictionary carrier, string key)
+ {
+ try
+ {
+ if (carrier.TryGetValue(key, out object value) && value is byte[] bytes)
+ {
+ return new[] { Encoding.UTF8.GetString(bytes) };
+ }
+ }
+ catch (Exception)
+ {
+ //this.logger.LogError(ex, "Failed to extract trace context.");
+ }
+
+ return Enumerable.Empty();
+ }
+
+ private static void OpenTelemetryContextInjector(Activity activity, IDictionary props)
+ {
+ // Inject the current Activity's context into the message headers.
+ Propagators.DefaultTextMapPropagator.Inject(new PropagationContext(activity.Context, Baggage.Current), props, OpenTelemetryContextSetter);
+ }
+
+ private static void OpenTelemetryContextSetter(IDictionary carrier, string key, string value)
+ {
+ carrier[key] = Encoding.UTF8.GetBytes(value);
+ }
+ }
+}
diff --git a/projects/RabbitMQ.Client.OpenTelemetry/icon.png b/projects/RabbitMQ.Client.OpenTelemetry/icon.png
new file mode 100644
index 0000000000..092bfef15c
Binary files /dev/null and b/projects/RabbitMQ.Client.OpenTelemetry/icon.png differ
diff --git a/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt b/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt
index ea016d86c8..fc14824063 100644
--- a/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt
+++ b/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt
@@ -956,4 +956,8 @@ virtual RabbitMQ.Client.TcpClientAdapter.ReceiveTimeout.set -> void
~static RabbitMQ.Client.IConnectionExtensions.CloseAsync(this RabbitMQ.Client.IConnection connection, System.TimeSpan timeout) -> System.Threading.Tasks.Task
~static RabbitMQ.Client.IConnectionExtensions.CloseAsync(this RabbitMQ.Client.IConnection connection, ushort reasonCode, string reasonText, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
~static RabbitMQ.Client.IConnectionExtensions.CloseAsync(this RabbitMQ.Client.IConnection connection, ushort reasonCode, string reasonText, System.TimeSpan timeout) -> System.Threading.Tasks.Task
+~static RabbitMQ.Client.RabbitMQActivitySource.ContextExtractor.get -> System.Func
+~static RabbitMQ.Client.RabbitMQActivitySource.ContextExtractor.set -> void
+~static RabbitMQ.Client.RabbitMQActivitySource.ContextInjector.get -> System.Action>
+~static RabbitMQ.Client.RabbitMQActivitySource.ContextInjector.set -> void
~virtual RabbitMQ.Client.DefaultBasicConsumer.HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, RabbitMQ.Client.ReadOnlyBasicProperties properties, System.ReadOnlyMemory body) -> System.Threading.Tasks.Task
diff --git a/projects/RabbitMQ.Client/client/api/IChannelExtensions.cs b/projects/RabbitMQ.Client/client/api/IChannelExtensions.cs
index 38374d146f..61e1f20dcb 100644
--- a/projects/RabbitMQ.Client/client/api/IChannelExtensions.cs
+++ b/projects/RabbitMQ.Client/client/api/IChannelExtensions.cs
@@ -31,6 +31,7 @@
using System;
using System.Collections.Generic;
+using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using RabbitMQ.Client.client.impl;
@@ -87,17 +88,20 @@ public static Task BasicConsumeAsync(this IChannel channel, string queue
///
/// The publication occurs with mandatory=false and immediate=false.
///
- public static ValueTask BasicPublishAsync(this IChannel channel, PublicationAddress addr, in T basicProperties, ReadOnlyMemory body)
+ public static ValueTask BasicPublishAsync(this IChannel channel, PublicationAddress addr, in T basicProperties,
+ ReadOnlyMemory body)
where T : IReadOnlyBasicProperties, IAmqpHeader
{
return channel.BasicPublishAsync(addr.ExchangeName, addr.RoutingKey, basicProperties, body);
}
- public static ValueTask BasicPublishAsync(this IChannel channel, string exchange, string routingKey, ReadOnlyMemory body = default, bool mandatory = false)
- => channel.BasicPublishAsync(exchange, routingKey, EmptyBasicProperty.Empty, body, mandatory);
+ public static ValueTask BasicPublishAsync(this IChannel channel, string exchange, string routingKey,
+ ReadOnlyMemory body = default, bool mandatory = false) =>
+ channel.BasicPublishAsync(exchange, routingKey, EmptyBasicProperty.Empty, body, mandatory);
- public static ValueTask BasicPublishAsync(this IChannel channel, CachedString exchange, CachedString routingKey, ReadOnlyMemory body = default, bool mandatory = false)
- => channel.BasicPublishAsync(exchange, routingKey, EmptyBasicProperty.Empty, body, mandatory);
+ public static ValueTask BasicPublishAsync(this IChannel channel, CachedString exchange,
+ CachedString routingKey, ReadOnlyMemory body = default, bool mandatory = false) =>
+ channel.BasicPublishAsync(exchange, routingKey, EmptyBasicProperty.Empty, body, mandatory);
#nullable disable
diff --git a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs
index 2ac20152ee..ba2685debd 100644
--- a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs
+++ b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs
@@ -1048,20 +1048,6 @@ await ModelSendAsync(method, k.CancellationToken)
}
}
- private static void InjectTraceContextIntoBasicProperties(object propsObj, string key, string value)
- {
- if (!(propsObj is Dictionary headers))
- {
- return;
- }
-
- // Only propagate headers if they haven't already been set
- if (!headers.ContainsKey(key))
- {
- headers[key] = value;
- }
- }
-
public async ValueTask BasicPublishAsync(string exchange, string routingKey,
TProperties basicProperties, ReadOnlyMemory body, bool mandatory,
CancellationToken cancellationToken)
@@ -1083,10 +1069,12 @@ await _confirmSemaphore.WaitAsync(cancellationToken)
try
{
- var cmd = new BasicPublish(exchange, routingKey, mandatory, default);
- RabbitMQActivitySource.TryGetExistingContext(basicProperties, out ActivityContext existingContext);
+ var cmd = new BasicPublishMemory(
+ Encoding.UTF8.GetBytes(exchange),
+ Encoding.UTF8.GetBytes(routingKey),
+ mandatory, default);
using Activity sendActivity = RabbitMQActivitySource.PublisherHasListeners
- ? RabbitMQActivitySource.Send(routingKey, exchange, body.Length, existingContext)
+ ? RabbitMQActivitySource.Send(routingKey, exchange, body.Length)
: default;
if (sendActivity != null)
@@ -1144,10 +1132,8 @@ await _confirmSemaphore.WaitAsync(cancellationToken)
try
{
var cmd = new BasicPublishMemory(exchange.Bytes, routingKey.Bytes, mandatory, default);
-
- RabbitMQActivitySource.TryGetExistingContext(basicProperties, out ActivityContext existingContext);
using Activity sendActivity = RabbitMQActivitySource.PublisherHasListeners
- ? RabbitMQActivitySource.Send(routingKey.Value, exchange.Value, body.Length, existingContext)
+ ? RabbitMQActivitySource.Send(routingKey.Value, exchange.Value, body.Length)
: default;
if (sendActivity != null)
@@ -1908,7 +1894,7 @@ private static BasicProperties PopulateActivityAndPropagateTraceId(
IDictionary headers = props.Headers ?? new Dictionary();
// Inject the ActivityContext into the message headers to propagate trace context to the receiving service.
- DistributedContextPropagator.Current.Inject(sendActivity, headers, InjectTraceContextIntoBasicProperties);
+ RabbitMQActivitySource.ContextInjector(sendActivity, headers);
props.Headers = headers;
return props;
}
diff --git a/projects/RabbitMQ.Client/client/impl/RabbitMQActivitySource.cs b/projects/RabbitMQ.Client/client/impl/RabbitMQActivitySource.cs
index f68d4e1596..b62690b310 100644
--- a/projects/RabbitMQ.Client/client/impl/RabbitMQActivitySource.cs
+++ b/projects/RabbitMQ.Client/client/impl/RabbitMQActivitySource.cs
@@ -1,4 +1,5 @@
-using System.Collections.Generic;
+using System;
+using System.Collections.Generic;
using System.Diagnostics;
using System.Net;
using System.Net.Sockets;
@@ -29,12 +30,20 @@ public static class RabbitMQActivitySource
.GetCustomAttribute()
?.InformationalVersion ?? "";
- private static readonly ActivitySource s_publisherSource = new ActivitySource(PublisherSourceName, AssemblyVersion);
- private static readonly ActivitySource s_subscriberSource = new ActivitySource(SubscriberSourceName, AssemblyVersion);
+ private static readonly ActivitySource s_publisherSource =
+ new ActivitySource(PublisherSourceName, AssemblyVersion);
+
+ private static readonly ActivitySource s_subscriberSource =
+ new ActivitySource(SubscriberSourceName, AssemblyVersion);
public const string PublisherSourceName = "RabbitMQ.Client.Publisher";
public const string SubscriberSourceName = "RabbitMQ.Client.Subscriber";
+ public static Action> ContextInjector { get; set; } = DefaultContextInjector;
+
+ public static Func ContextExtractor { get; set; } =
+ DefaultContextExtractor;
+
public static bool UseRoutingKeyAsOperationName { get; set; } = true;
internal static bool PublisherHasListeners => s_publisherSource.HasListeners();
internal static bool SubscriberHasListeners => s_subscriberSource.HasListeners();
@@ -52,9 +61,11 @@ internal static Activity Send(string routingKey, string exchange, int bodySize,
if (s_publisherSource.HasListeners())
{
Activity activity = linkedContext == default
- ? s_publisherSource.StartRabbitMQActivity(UseRoutingKeyAsOperationName ? $"{routingKey} publish" : "publish",
+ ? s_publisherSource.StartRabbitMQActivity(
+ UseRoutingKeyAsOperationName ? $"{routingKey} publish" : "publish",
ActivityKind.Producer)
- : s_publisherSource.StartLinkedRabbitMQActivity(UseRoutingKeyAsOperationName ? $"{routingKey} publish" : "publish",
+ : s_publisherSource.StartLinkedRabbitMQActivity(
+ UseRoutingKeyAsOperationName ? $"{routingKey} publish" : "publish",
ActivityKind.Producer, linkedContext);
if (activity?.IsAllDataRequested == true)
{
@@ -74,7 +85,8 @@ internal static Activity ReceiveEmpty(string queue)
return null;
}
- Activity activity = s_subscriberSource.StartRabbitMQActivity(UseRoutingKeyAsOperationName ? $"{queue} receive" : "receive",
+ Activity activity = s_subscriberSource.StartRabbitMQActivity(
+ UseRoutingKeyAsOperationName ? $"{queue} receive" : "receive",
ActivityKind.Consumer);
if (activity.IsAllDataRequested)
{
@@ -95,12 +107,9 @@ internal static Activity Receive(string routingKey, string exchange, ulong deliv
}
// Extract the PropagationContext of the upstream parent from the message headers.
- DistributedContextPropagator.Current.ExtractTraceIdAndState(readOnlyBasicProperties.Headers,
- ExtractTraceIdAndState, out string traceParent, out string traceState);
- ActivityContext.TryParse(traceParent, traceState, out ActivityContext linkedContext);
Activity activity = s_subscriberSource.StartLinkedRabbitMQActivity(
UseRoutingKeyAsOperationName ? $"{routingKey} receive" : "receive", ActivityKind.Consumer,
- linkedContext);
+ ContextExtractor(readOnlyBasicProperties));
if (activity.IsAllDataRequested)
{
PopulateMessagingTags("receive", routingKey, exchange, deliveryTag, readOnlyBasicProperties,
@@ -118,12 +127,9 @@ internal static Activity Deliver(BasicDeliverEventArgs deliverEventArgs)
}
// Extract the PropagationContext of the upstream parent from the message headers.
- DistributedContextPropagator.Current.ExtractTraceIdAndState(deliverEventArgs.BasicProperties.Headers,
- ExtractTraceIdAndState, out string traceparent, out string traceState);
- ActivityContext.TryParse(traceparent, traceState, out ActivityContext parentContext);
Activity activity = s_subscriberSource.StartLinkedRabbitMQActivity(
UseRoutingKeyAsOperationName ? $"{deliverEventArgs.RoutingKey} deliver" : "deliver",
- ActivityKind.Consumer, parentContext);
+ ActivityKind.Consumer, ContextExtractor(deliverEventArgs.BasicProperties));
if (activity != null && activity.IsAllDataRequested)
{
PopulateMessagingTags("deliver", deliverEventArgs.RoutingKey, deliverEventArgs.Exchange,
@@ -192,52 +198,6 @@ internal static void PopulateMessageEnvelopeSize(Activity activity, int size)
}
}
- internal static bool TryGetExistingContext(T props, out ActivityContext context)
- where T : IReadOnlyBasicProperties
- {
- if (props.Headers == null)
- {
- context = default;
- return false;
- }
-
- bool hasHeaders = false;
- foreach (string header in DistributedContextPropagator.Current.Fields)
- {
- if (props.Headers.ContainsKey(header))
- {
- hasHeaders = true;
- break;
- }
- }
-
- if (hasHeaders)
- {
- DistributedContextPropagator.Current.ExtractTraceIdAndState(props.Headers, ExtractTraceIdAndState,
- out string traceParent, out string traceState);
- return ActivityContext.TryParse(traceParent, traceState, out context);
- }
-
- context = default;
- return false;
- }
-
- private static void ExtractTraceIdAndState(object props, string name, out string value,
- out IEnumerable values)
- {
- if (props is Dictionary headers && headers.TryGetValue(name, out object propsVal) &&
- propsVal is byte[] bytes)
- {
- value = Encoding.UTF8.GetString(bytes);
- values = default;
- }
- else
- {
- value = default;
- values = default;
- }
- }
-
internal static void SetNetworkTags(this Activity activity, IFrameHandler frameHandler)
{
if (PublisherHasListeners && activity != null && activity.IsAllDataRequested)
@@ -286,5 +246,65 @@ internal static void SetNetworkTags(this Activity activity, IFrameHandler frameH
}
}
}
+
+ private static void DefaultContextInjector(Activity sendActivity, IDictionary props)
+ {
+ props ??= new Dictionary();
+ DistributedContextPropagator.Current.Inject(sendActivity, props, DefaultContextSetter);
+ }
+
+ private static ActivityContext DefaultContextExtractor(IReadOnlyBasicProperties props)
+ {
+ if (props.Headers == null)
+ {
+ return default;
+ }
+
+ bool hasHeaders = false;
+ foreach (string header in DistributedContextPropagator.Current.Fields)
+ {
+ if (props.Headers.ContainsKey(header))
+ {
+ hasHeaders = true;
+ break;
+ }
+ }
+
+
+ if (!hasHeaders)
+ {
+ return default;
+ }
+
+ DistributedContextPropagator.Current.ExtractTraceIdAndState(props.Headers, DefaultContextGetter, out string traceParent, out string traceState);
+ return ActivityContext.TryParse(traceParent, traceState, out ActivityContext context) ? context : default;
+ }
+
+ private static void DefaultContextSetter(object carrier, string name, string value)
+ {
+ if (!(carrier is IDictionary carrierDictionary))
+ {
+ return;
+ }
+
+ // Only propagate headers if they haven't already been set
+ carrierDictionary[name] = value;
+ }
+
+ private static void DefaultContextGetter(object carrier, string name, out string value,
+ out IEnumerable values)
+ {
+ if (carrier is IDictionary carrierDict &&
+ carrierDict.TryGetValue(name, out object propsVal) && propsVal is byte[] bytes)
+ {
+ value = Encoding.UTF8.GetString(bytes);
+ values = default;
+ }
+ else
+ {
+ value = default;
+ values = default;
+ }
+ }
}
}
diff --git a/projects/Test/SequentialIntegration/SequentialIntegration.csproj b/projects/Test/SequentialIntegration/SequentialIntegration.csproj
index c0d225ccf1..52788acd29 100644
--- a/projects/Test/SequentialIntegration/SequentialIntegration.csproj
+++ b/projects/Test/SequentialIntegration/SequentialIntegration.csproj
@@ -20,7 +20,7 @@
-
+
@@ -40,6 +40,7 @@
+
diff --git a/projects/Test/SequentialIntegration/TestActivitySource.cs b/projects/Test/SequentialIntegration/TestActivitySource.cs
index a25be68de2..2bd3227d48 100644
--- a/projects/Test/SequentialIntegration/TestActivitySource.cs
+++ b/projects/Test/SequentialIntegration/TestActivitySource.cs
@@ -37,6 +37,7 @@
using System.Threading.Tasks;
using RabbitMQ.Client;
+using RabbitMQ.Client.client.impl;
using RabbitMQ.Client.Events;
using Xunit;
using Xunit.Abstractions;
@@ -113,6 +114,85 @@ public async Task TestPublisherAndConsumerActivityTags(bool useRoutingKeyAsOpera
}
}
+ [Theory]
+ [InlineData(true)]
+ [InlineData(false)]
+ public async Task TestPublisherWithCachedStringsAndConsumerActivityTags(bool useRoutingKeyAsOperationName)
+ {
+ await _channel.ConfirmSelectAsync();
+
+ RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName;
+ var _activities = new List();
+ using (ActivityListener activityListener = StartActivityListener(_activities))
+ {
+ await Task.Delay(500);
+ string queueName = $"{Guid.NewGuid()}";
+ QueueDeclareOk q = await _channel.QueueDeclareAsync(queueName);
+ byte[] sendBody = Encoding.UTF8.GetBytes("hi");
+ byte[] consumeBody = null;
+ var consumer = new EventingBasicConsumer(_channel);
+ var consumerReceivedTcs =
+ new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
+ consumer.Received += (o, a) =>
+ {
+ consumeBody = a.Body.ToArray();
+ consumerReceivedTcs.SetResult(true);
+ };
+
+ string consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer);
+ CachedString exchange = new CachedString("");
+ CachedString routingKey = new CachedString(q.QueueName);
+ await _channel.BasicPublishAsync(exchange, routingKey, sendBody, mandatory: true);
+ await _channel.WaitForConfirmsOrDieAsync();
+
+ await consumerReceivedTcs.Task.WaitAsync(TimeSpan.FromSeconds(5));
+ Assert.True(await consumerReceivedTcs.Task);
+
+ await _channel.BasicCancelAsync(consumerTag);
+ await Task.Delay(500);
+ AssertActivityData(useRoutingKeyAsOperationName, queueName, _activities, true);
+ }
+ }
+
+ [Theory]
+ [InlineData(true)]
+ [InlineData(false)]
+ public async Task TestPublisherWithPublicationAddressAndConsumerActivityTags(bool useRoutingKeyAsOperationName)
+ {
+ await _channel.ConfirmSelectAsync();
+
+ RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName;
+ var _activities = new List();
+ using (ActivityListener activityListener = StartActivityListener(_activities))
+ {
+ await Task.Delay(500);
+ string queueName = $"{Guid.NewGuid()}";
+ QueueDeclareOk q = await _channel.QueueDeclareAsync(queueName);
+ byte[] sendBody = Encoding.UTF8.GetBytes("hi");
+ byte[] consumeBody = null;
+ var consumer = new EventingBasicConsumer(_channel);
+ var consumerReceivedTcs =
+ new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
+ consumer.Received += (o, a) =>
+ {
+ consumeBody = a.Body.ToArray();
+ consumerReceivedTcs.SetResult(true);
+ };
+
+ string consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer);
+ PublicationAddress publicationAddress = new PublicationAddress(ExchangeType.Direct, "", q.QueueName);
+ await _channel.BasicPublishAsync(publicationAddress, new BasicProperties(), sendBody);
+ await _channel.WaitForConfirmsOrDieAsync();
+
+ await consumerReceivedTcs.Task.WaitAsync(TimeSpan.FromSeconds(5));
+ Assert.True(await consumerReceivedTcs.Task);
+
+ await _channel.BasicCancelAsync(consumerTag);
+ await Task.Delay(500);
+ AssertActivityData(useRoutingKeyAsOperationName, queueName, _activities, true);
+ }
+ }
+
[Theory]
[InlineData(true)]
[InlineData(false)]
@@ -152,6 +232,87 @@ public async Task TestPublisherAndConsumerActivityTagsAsync(bool useRoutingKeyAs
}
}
+ [Theory]
+ [InlineData(true)]
+ [InlineData(false)]
+ public async Task TestPublisherWithCachedStringsAndConsumerActivityTagsAsync(bool useRoutingKeyAsOperationName)
+ {
+ await _channel.ConfirmSelectAsync();
+
+ RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName;
+ var activities = new List();
+ using (ActivityListener activityListener = StartActivityListener(activities))
+ {
+ await Task.Delay(500);
+
+ string queueName = $"{Guid.NewGuid()}";
+ QueueDeclareOk q = await _channel.QueueDeclareAsync(queueName);
+ byte[] sendBody = Encoding.UTF8.GetBytes("hi");
+ byte[] consumeBody = null;
+ var consumer = new EventingBasicConsumer(_channel);
+ var consumerReceivedTcs =
+ new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
+ consumer.Received += (o, a) =>
+ {
+ consumeBody = a.Body.ToArray();
+ consumerReceivedTcs.SetResult(true);
+ };
+
+ string consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer);
+ CachedString exchange = new CachedString("");
+ CachedString routingKey = new CachedString(q.QueueName);
+ await _channel.BasicPublishAsync(exchange, routingKey, sendBody, mandatory: true);
+ await _channel.WaitForConfirmsOrDieAsync();
+
+ await consumerReceivedTcs.Task.WaitAsync(TimeSpan.FromSeconds(5));
+ Assert.True(await consumerReceivedTcs.Task);
+
+ await _channel.BasicCancelAsync(consumerTag);
+ await Task.Delay(500);
+ AssertActivityData(useRoutingKeyAsOperationName, queueName, activities, true);
+ }
+ }
+
+ [Theory]
+ [InlineData(true)]
+ [InlineData(false)]
+ public async Task TestPublisherWithPublicationAddressAndConsumerActivityTagsAsync(bool useRoutingKeyAsOperationName)
+ {
+ await _channel.ConfirmSelectAsync();
+
+ RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName;
+ var activities = new List();
+ using (ActivityListener activityListener = StartActivityListener(activities))
+ {
+ await Task.Delay(500);
+
+ string queueName = $"{Guid.NewGuid()}";
+ QueueDeclareOk q = await _channel.QueueDeclareAsync(queueName);
+ byte[] sendBody = Encoding.UTF8.GetBytes("hi");
+ byte[] consumeBody = null;
+ var consumer = new EventingBasicConsumer(_channel);
+ var consumerReceivedTcs =
+ new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
+ consumer.Received += (o, a) =>
+ {
+ consumeBody = a.Body.ToArray();
+ consumerReceivedTcs.SetResult(true);
+ };
+
+ string consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer);
+ var publicationAddress = new PublicationAddress(ExchangeType.Direct, "", q.QueueName);
+ await _channel.BasicPublishAsync(publicationAddress, new BasicProperties(), sendBody);
+ await _channel.WaitForConfirmsOrDieAsync();
+
+ await consumerReceivedTcs.Task.WaitAsync(TimeSpan.FromSeconds(5));
+ Assert.True(await consumerReceivedTcs.Task);
+
+ await _channel.BasicCancelAsync(consumerTag);
+ await Task.Delay(500);
+ AssertActivityData(useRoutingKeyAsOperationName, queueName, activities, true);
+ }
+ }
+
[Theory]
[InlineData(true)]
[InlineData(false)]
@@ -187,6 +348,80 @@ public async Task TestPublisherAndBasicGetActivityTags(bool useRoutingKeyAsOpera
}
}
+ [Theory]
+ [InlineData(true)]
+ [InlineData(false)]
+ public async Task TestPublisherWithCachedStringsAndBasicGetActivityTags(bool useRoutingKeyAsOperationName)
+ {
+ await _channel.ConfirmSelectAsync();
+ RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName;
+ var activities = new List();
+ using (ActivityListener activityListener = StartActivityListener(activities))
+ {
+ await Task.Delay(500);
+ string queue = $"queue-{Guid.NewGuid()}";
+ const string msg = "for basic.get";
+
+ try
+ {
+ CachedString exchange = new CachedString("");
+ CachedString routingKey = new CachedString(queue);
+ await _channel.QueueDeclareAsync(queue, false, false, false, null);
+ await _channel.BasicPublishAsync(exchange, routingKey, Encoding.UTF8.GetBytes(msg), mandatory: true);
+ await _channel.WaitForConfirmsOrDieAsync();
+ QueueDeclareOk ok = await _channel.QueueDeclarePassiveAsync(queue);
+ Assert.Equal(1u, ok.MessageCount);
+ BasicGetResult res = await _channel.BasicGetAsync(queue, true);
+ Assert.Equal(msg, Encoding.UTF8.GetString(res.Body.ToArray()));
+ ok = await _channel.QueueDeclarePassiveAsync(queue);
+ Assert.Equal(0u, ok.MessageCount);
+ await Task.Delay(500);
+ AssertActivityData(useRoutingKeyAsOperationName, queue, activities, false);
+ }
+ finally
+ {
+ await _channel.QueueDeleteAsync(queue);
+ }
+ }
+ }
+
+ [Theory]
+ [InlineData(true)]
+ [InlineData(false)]
+ public async Task TestPublisherWithPublicationAddressAndBasicGetActivityTags(bool useRoutingKeyAsOperationName)
+ {
+ await _channel.ConfirmSelectAsync();
+ RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName;
+ var activities = new List();
+ using (ActivityListener activityListener = StartActivityListener(activities))
+ {
+ await Task.Delay(500);
+ string queue = $"queue-{Guid.NewGuid()}";
+ const string msg = "for basic.get";
+
+ try
+ {
+ var publicationAddress = new PublicationAddress(ExchangeType.Direct, "", queue);
+ await _channel.QueueDeclareAsync(queue, false, false, false, null);
+ await _channel.BasicPublishAsync(publicationAddress, new BasicProperties(),
+ Encoding.UTF8.GetBytes(msg));
+ await _channel.WaitForConfirmsOrDieAsync();
+ QueueDeclareOk ok = await _channel.QueueDeclarePassiveAsync(queue);
+ Assert.Equal(1u, ok.MessageCount);
+ BasicGetResult res = await _channel.BasicGetAsync(queue, true);
+ Assert.Equal(msg, Encoding.UTF8.GetString(res.Body.ToArray()));
+ ok = await _channel.QueueDeclarePassiveAsync(queue);
+ Assert.Equal(0u, ok.MessageCount);
+ await Task.Delay(500);
+ AssertActivityData(useRoutingKeyAsOperationName, queue, activities, false);
+ }
+ finally
+ {
+ await _channel.QueueDeleteAsync(queue);
+ }
+ }
+ }
+
private static ActivityListener StartActivityListener(List activities)
{
ActivityListener activityListener = new ActivityListener();
diff --git a/projects/Test/SequentialIntegration/TestOpenTelemetry.cs b/projects/Test/SequentialIntegration/TestOpenTelemetry.cs
new file mode 100644
index 0000000000..e113b31ba5
--- /dev/null
+++ b/projects/Test/SequentialIntegration/TestOpenTelemetry.cs
@@ -0,0 +1,397 @@
+// This source code is dual-licensed under the Apache License, version
+// 2.0, and the Mozilla Public License, version 2.0.
+//
+// The APL v2.0:
+//
+//---------------------------------------------------------------------------
+// Copyright (c) 2007-2020 VMware, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//---------------------------------------------------------------------------
+//
+// The MPL v2.0:
+//
+//---------------------------------------------------------------------------
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at https://mozilla.org/MPL/2.0/.
+//
+// Copyright (c) 2007-2020 VMware, Inc. All rights reserved.
+//---------------------------------------------------------------------------
+
+using System;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using OpenTelemetry;
+using OpenTelemetry.Context.Propagation;
+using OpenTelemetry.Trace;
+using RabbitMQ.Client;
+using RabbitMQ.Client.Events;
+using Xunit;
+using Xunit.Abstractions;
+using Xunit.Sdk;
+
+namespace Test.SequentialIntegration
+{
+ public class TestOpenTelemetry : SequentialIntegrationFixture
+ {
+ public TestOpenTelemetry(ITestOutputHelper output) : base(output)
+ {
+ Sdk.SetDefaultTextMapPropagator(new CompositeTextMapPropagator(new TextMapPropagator[]
+ {
+ new TraceContextPropagator(), new BaggagePropagator()
+ }));
+ }
+
+ void AssertStringTagEquals(Activity activity, string name, string expected)
+ {
+ string tag = activity.GetTagItem(name) as string;
+ Assert.NotNull(tag);
+ Assert.Equal(expected, tag);
+ }
+
+ void AssertStringTagStartsWith(Activity activity, string name, string expected)
+ {
+ string tag = activity.GetTagItem(name) as string;
+ Assert.NotNull(tag);
+ Assert.StartsWith(expected, tag);
+ }
+
+ void AssertStringTagNotNullOrEmpty(Activity activity, string name)
+ {
+ string tag = activity.GetTagItem(name) as string;
+ Assert.NotNull(tag);
+ Assert.False(string.IsNullOrEmpty(tag));
+ }
+
+ void AssertIntTagGreaterThanZero(Activity activity, string name)
+ {
+ Assert.True(activity.GetTagItem(name) is int result && result > 0);
+ }
+
+ [Theory]
+ [InlineData(true)]
+ [InlineData(false)]
+ public async Task TestPublisherAndConsumerActivityTags(bool useRoutingKeyAsOperationName)
+ {
+ var exportedItems = new List();
+ using (var tracer = Sdk.CreateTracerProviderBuilder()
+ .AddRabbitMQInstrumentation()
+ .AddInMemoryExporter(exportedItems)
+ .Build())
+ {
+ string baggageGuid = Guid.NewGuid().ToString();
+ Baggage.SetBaggage("TestItem", baggageGuid);
+ Assert.Equal(baggageGuid, Baggage.GetBaggage("TestItem"));
+ await _channel.ConfirmSelectAsync();
+
+ RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName;
+ await Task.Delay(500);
+ string queueName = $"{Guid.NewGuid()}";
+ QueueDeclareOk q = await _channel.QueueDeclareAsync(queueName);
+ byte[] sendBody = Encoding.UTF8.GetBytes("hi");
+ byte[] consumeBody = null;
+ var consumer = new EventingBasicConsumer(_channel);
+ var consumerReceivedTcs =
+ new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
+ consumer.Received += (o, a) =>
+ {
+ consumeBody = a.Body.ToArray();
+ string baggageItem = Baggage.GetBaggage("TestItem");
+ if (baggageItem == baggageGuid)
+ {
+ consumerReceivedTcs.SetResult(true);
+ }
+ else
+ {
+ consumerReceivedTcs.SetException(
+ EqualException.ForMismatchedStrings(baggageGuid, baggageItem, 0, 0));
+ }
+ };
+
+ string consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer);
+ await _channel.BasicPublishAsync("", q.QueueName, sendBody, mandatory: true);
+ await _channel.WaitForConfirmsOrDieAsync();
+ Baggage.ClearBaggage();
+ Assert.Null(Baggage.GetBaggage("TestItem"));
+
+ await consumerReceivedTcs.Task.WaitAsync(TimeSpan.FromSeconds(5));
+ Assert.True(await consumerReceivedTcs.Task);
+
+ await _channel.BasicCancelAsync(consumerTag);
+ await Task.Delay(500);
+ AssertActivityData(useRoutingKeyAsOperationName, queueName, exportedItems, true);
+ }
+ }
+
+ [Theory]
+ [InlineData(true)]
+ [InlineData(false)]
+ public async Task TestPublisherAndConsumerActivityTagsAsync(bool useRoutingKeyAsOperationName)
+ {
+ var exportedItems = new List();
+ using (var tracer = Sdk.CreateTracerProviderBuilder()
+ .AddRabbitMQInstrumentation()
+ .AddInMemoryExporter(exportedItems)
+ .Build())
+ {
+ string baggageGuid = Guid.NewGuid().ToString();
+ Baggage.SetBaggage("TestItem", baggageGuid);
+ Assert.Equal(baggageGuid, Baggage.GetBaggage("TestItem"));
+ await _channel.ConfirmSelectAsync();
+
+ RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName;
+ await Task.Delay(500);
+
+ string queueName = $"{Guid.NewGuid()}";
+ QueueDeclareOk q = await _channel.QueueDeclareAsync(queueName);
+ byte[] sendBody = Encoding.UTF8.GetBytes("hi");
+ byte[] consumeBody = null;
+ var consumer = new EventingBasicConsumer(_channel);
+ var consumerReceivedTcs =
+ new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
+ consumer.Received += (o, a) =>
+ {
+ consumeBody = a.Body.ToArray();
+ string baggageItem = Baggage.GetBaggage("TestItem");
+ if (baggageItem == baggageGuid)
+ {
+ consumerReceivedTcs.SetResult(true);
+ }
+ else
+ {
+ consumerReceivedTcs.SetException(
+ EqualException.ForMismatchedStrings(baggageGuid, baggageItem, 0, 0));
+ }
+ };
+
+ string consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer);
+ await _channel.BasicPublishAsync("", q.QueueName, sendBody, mandatory: true);
+ await _channel.WaitForConfirmsOrDieAsync();
+ Baggage.ClearBaggage();
+ Assert.Null(Baggage.GetBaggage("TestItem"));
+
+ await consumerReceivedTcs.Task.WaitAsync(TimeSpan.FromSeconds(5));
+ Assert.True(await consumerReceivedTcs.Task);
+
+ await _channel.BasicCancelAsync(consumerTag);
+ await Task.Delay(500);
+ AssertActivityData(useRoutingKeyAsOperationName, queueName, exportedItems, true);
+ }
+ }
+
+ [Theory]
+ [InlineData(true)]
+ [InlineData(false)]
+ public async Task TestPublisherWithPublicationAddressAndConsumerActivityTagsAsync(bool useRoutingKeyAsOperationName)
+ {
+ var exportedItems = new List();
+ using (var tracer = Sdk.CreateTracerProviderBuilder()
+ .AddRabbitMQInstrumentation()
+ .AddInMemoryExporter(exportedItems)
+ .Build())
+ {
+ string baggageGuid = Guid.NewGuid().ToString();
+ Baggage.SetBaggage("TestItem", baggageGuid);
+ Assert.Equal(baggageGuid, Baggage.GetBaggage("TestItem"));
+ await _channel.ConfirmSelectAsync();
+
+ RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName;
+ await Task.Delay(500);
+
+ string queueName = $"{Guid.NewGuid()}";
+ QueueDeclareOk q = await _channel.QueueDeclareAsync(queueName);
+ byte[] sendBody = Encoding.UTF8.GetBytes("hi");
+ byte[] consumeBody = null;
+ var consumer = new EventingBasicConsumer(_channel);
+ var consumerReceivedTcs =
+ new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
+ consumer.Received += (o, a) =>
+ {
+ consumeBody = a.Body.ToArray();
+ string baggageItem = Baggage.GetBaggage("TestItem");
+ if (baggageItem == baggageGuid)
+ {
+ consumerReceivedTcs.SetResult(true);
+ }
+ else
+ {
+ consumerReceivedTcs.SetException(
+ EqualException.ForMismatchedStrings(baggageGuid, baggageItem, 0, 0));
+ }
+ };
+
+ string consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer);
+ var publicationAddress = new PublicationAddress(ExchangeType.Direct, "", queueName);
+ await _channel.BasicPublishAsync(publicationAddress, new BasicProperties(), sendBody);
+ await _channel.WaitForConfirmsOrDieAsync();
+ Baggage.ClearBaggage();
+ Assert.Null(Baggage.GetBaggage("TestItem"));
+
+ await consumerReceivedTcs.Task.WaitAsync(TimeSpan.FromSeconds(5));
+ Assert.True(await consumerReceivedTcs.Task);
+
+ await _channel.BasicCancelAsync(consumerTag);
+ await Task.Delay(500);
+ AssertActivityData(useRoutingKeyAsOperationName, queueName, exportedItems, true);
+ }
+ }
+
+ [Theory]
+ [InlineData(true)]
+ [InlineData(false)]
+ public async Task TestPublisherWithCachedStringsAndConsumerActivityTagsAsync(bool useRoutingKeyAsOperationName)
+ {
+ var exportedItems = new List();
+ using (var tracer = Sdk.CreateTracerProviderBuilder()
+ .AddRabbitMQInstrumentation()
+ .AddInMemoryExporter(exportedItems)
+ .Build())
+ {
+ string baggageGuid = Guid.NewGuid().ToString();
+ Baggage.SetBaggage("TestItem", baggageGuid);
+ Assert.Equal(baggageGuid, Baggage.GetBaggage("TestItem"));
+ await _channel.ConfirmSelectAsync();
+
+ RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName;
+ await Task.Delay(500);
+
+ string queueName = $"{Guid.NewGuid()}";
+ QueueDeclareOk q = await _channel.QueueDeclareAsync(queueName);
+ byte[] sendBody = Encoding.UTF8.GetBytes("hi");
+ byte[] consumeBody = null;
+ var consumer = new EventingBasicConsumer(_channel);
+ var consumerReceivedTcs =
+ new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
+ consumer.Received += (o, a) =>
+ {
+ consumeBody = a.Body.ToArray();
+ string baggageItem = Baggage.GetBaggage("TestItem");
+ if (baggageItem == baggageGuid)
+ {
+ consumerReceivedTcs.SetResult(true);
+ }
+ else
+ {
+ consumerReceivedTcs.SetException(
+ EqualException.ForMismatchedStrings(baggageGuid, baggageItem, 0, 0));
+ }
+ };
+
+ string consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer);
+ CachedString exchange = new CachedString("");
+ CachedString routingKey = new CachedString(queueName);
+ await _channel.BasicPublishAsync(exchange, routingKey, sendBody);
+ await _channel.WaitForConfirmsOrDieAsync();
+ Baggage.ClearBaggage();
+ Assert.Null(Baggage.GetBaggage("TestItem"));
+
+ await consumerReceivedTcs.Task.WaitAsync(TimeSpan.FromSeconds(5));
+ Assert.True(await consumerReceivedTcs.Task);
+
+ await _channel.BasicCancelAsync(consumerTag);
+ await Task.Delay(500);
+ AssertActivityData(useRoutingKeyAsOperationName, queueName, exportedItems, true);
+ }
+ }
+
+ [Theory]
+ [InlineData(true)]
+ [InlineData(false)]
+ public async Task TestPublisherAndBasicGetActivityTags(bool useRoutingKeyAsOperationName)
+ {
+ var exportedItems = new List();
+ using (var tracer = Sdk.CreateTracerProviderBuilder()
+ .AddRabbitMQInstrumentation()
+ .AddInMemoryExporter(exportedItems)
+ .Build())
+ {
+ string baggageGuid = Guid.NewGuid().ToString();
+ Baggage.SetBaggage("TestItem", baggageGuid);
+ Assert.Equal(baggageGuid, Baggage.GetBaggage("TestItem"));
+ await _channel.ConfirmSelectAsync();
+ RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName;
+ await Task.Delay(500);
+ string queue = $"queue-{Guid.NewGuid()}";
+ const string msg = "for basic.get";
+
+ try
+ {
+ await _channel.QueueDeclareAsync(queue, false, false, false, null);
+ await _channel.BasicPublishAsync("", queue, Encoding.UTF8.GetBytes(msg), mandatory: true);
+ await _channel.WaitForConfirmsOrDieAsync();
+ Baggage.ClearBaggage();
+ Assert.Null(Baggage.GetBaggage("TestItem"));
+ QueueDeclareOk ok = await _channel.QueueDeclarePassiveAsync(queue);
+ Assert.Equal(1u, ok.MessageCount);
+ BasicGetResult res = await _channel.BasicGetAsync(queue, true);
+ Assert.Equal(msg, Encoding.UTF8.GetString(res.Body.ToArray()));
+ ok = await _channel.QueueDeclarePassiveAsync(queue);
+ Assert.Equal(0u, ok.MessageCount);
+ await Task.Delay(500);
+ AssertActivityData(useRoutingKeyAsOperationName, queue, exportedItems, false);
+ }
+ finally
+ {
+ await _channel.QueueDeleteAsync(queue);
+ }
+ }
+ }
+
+ private void AssertActivityData(bool useRoutingKeyAsOperationName, string queueName,
+ List activityList, bool isDeliver = false, string baggageGuid = null)
+ {
+ string childName = isDeliver ? "deliver" : "receive";
+ Activity[] activities = activityList.ToArray();
+ Assert.NotEmpty(activities);
+ foreach (var item in activities)
+ {
+ _output.WriteLine(
+ $"{item.Context.TraceId}: {item.OperationName}");
+ _output.WriteLine($" Tags: {string.Join(", ", item.Tags.Select(x => $"{x.Key}: {x.Value}"))}");
+ _output.WriteLine($" Links: {string.Join(", ", item.Links.Select(x => $"{x.Context.TraceId}"))}");
+ }
+
+ Activity sendActivity = activities.First(x =>
+ x.OperationName == (useRoutingKeyAsOperationName ? $"{queueName} publish" : "publish") &&
+ x.GetTagItem(RabbitMQActivitySource.MessagingDestinationRoutingKey) is string routingKeyTag &&
+ routingKeyTag == $"{queueName}");
+ Activity receiveActivity = activities.Single(x =>
+ x.OperationName == (useRoutingKeyAsOperationName ? $"{queueName} {childName}" : $"{childName}") &&
+ x.Links.First().Context.TraceId == sendActivity.TraceId);
+ Assert.Equal(ActivityKind.Producer, sendActivity.Kind);
+ Assert.Equal(ActivityKind.Consumer, receiveActivity.Kind);
+ Assert.Null(receiveActivity.ParentId);
+ AssertStringTagNotNullOrEmpty(sendActivity, "network.peer.address");
+ AssertStringTagNotNullOrEmpty(sendActivity, "network.local.address");
+ AssertStringTagNotNullOrEmpty(sendActivity, "server.address");
+ AssertStringTagNotNullOrEmpty(sendActivity, "client.address");
+ AssertIntTagGreaterThanZero(sendActivity, "network.peer.port");
+ AssertIntTagGreaterThanZero(sendActivity, "network.local.port");
+ AssertIntTagGreaterThanZero(sendActivity, "server.port");
+ AssertIntTagGreaterThanZero(sendActivity, "client.port");
+ AssertStringTagStartsWith(sendActivity, "network.type", "ipv");
+ AssertStringTagEquals(sendActivity, RabbitMQActivitySource.MessagingSystem, "rabbitmq");
+ AssertStringTagEquals(sendActivity, RabbitMQActivitySource.ProtocolName, "amqp");
+ AssertStringTagEquals(sendActivity, RabbitMQActivitySource.ProtocolVersion, "0.9.1");
+ AssertStringTagEquals(sendActivity, RabbitMQActivitySource.MessagingDestination, "amq.default");
+ AssertStringTagEquals(sendActivity, RabbitMQActivitySource.MessagingDestinationRoutingKey, queueName);
+ AssertIntTagGreaterThanZero(sendActivity, RabbitMQActivitySource.MessagingEnvelopeSize);
+ AssertIntTagGreaterThanZero(sendActivity, RabbitMQActivitySource.MessagingBodySize);
+ AssertIntTagGreaterThanZero(receiveActivity, RabbitMQActivitySource.MessagingBodySize);
+ }
+ }
+}