Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding proper OpenTelemetry integration via. registration helpers and better context propagation #1528

Merged
merged 14 commits into from
Jun 4, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 6 additions & 0 deletions RabbitMQDotNetClient.sln
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Common", "projects\Test\Com
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ToxiproxyNetCore", "projects\toxiproxy-netcore\src\ToxiproxyNetCore\ToxiproxyNetCore.csproj", "{AB5B7C53-D7EC-4985-A6DE-70178E4B688A}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "RabbitMQ.Client.OpenTelemetry", "projects\RabbitMQ.Client.OpenTelemetry\RabbitMQ.Client.OpenTelemetry.csproj", "{16BF2086-AC7D-4EC3-8660-CC16E663ACB1}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -90,6 +92,10 @@ Global
{AB5B7C53-D7EC-4985-A6DE-70178E4B688A}.Debug|Any CPU.Build.0 = Debug|Any CPU
{AB5B7C53-D7EC-4985-A6DE-70178E4B688A}.Release|Any CPU.ActiveCfg = Release|Any CPU
{AB5B7C53-D7EC-4985-A6DE-70178E4B688A}.Release|Any CPU.Build.0 = Release|Any CPU
{16BF2086-AC7D-4EC3-8660-CC16E663ACB1}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{16BF2086-AC7D-4EC3-8660-CC16E663ACB1}.Debug|Any CPU.Build.0 = Debug|Any CPU
{16BF2086-AC7D-4EC3-8660-CC16E663ACB1}.Release|Any CPU.ActiveCfg = Release|Any CPU
{16BF2086-AC7D-4EC3-8660-CC16E663ACB1}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down
54 changes: 54 additions & 0 deletions projects/RabbitMQ.Client.OpenTelemetry/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# RabbitMQ .NET Client - OpenTelemetry Instrumentation

## Introduction
This library makes it easy to instrument your RabbitMQ .NET Client applications with OpenTelemetry.

## Examples
The following examples demonstrate how to use the RabbitMQ .NET Client OpenTelemetry Instrumentation.

### Basic Usage

#### ASP.NET Core Configuration Example
```csharp
using OpenTelemetry.Trace;

// Configure the OpenTelemetry SDK to trace ASP.NET Core, the RabbitMQ .NET Client and export the traces to the console.
// Also configures context propagation to propagate the TraceContext and Baggage using the W3C specification.

var compositeTextMapPropagator = new CompositeTextMapPropagator(new TextMapPropagator[]
{
new TraceContextPropagator(),
new BaggagePropagator()
});

Sdk.SetDefaultTextMapPropagator(compositeTextMapPropagator);

builder.Services.AddOpenTelemetry()
.ConfigureResource(resource => resource
.AddService(serviceName: builder.Environment.ApplicationName))
.WithTracing(tracing => tracing
.AddAspNetCoreInstrumentation()
.AddRabbitMQInstrumentation()
.AddConsoleExporter());
```

#### Console Application Configuration Example
```csharp
using OpenTelemetry.Trace;

// Configure the OpenTelemetry SDK to trace ASP.NET Core, the RabbitMQ .NET Client and export the traces to the console.
// Also configures context propagation to propagate the TraceContext and Baggage using the W3C specification.

var compositeTextMapPropagator = new CompositeTextMapPropagator(new TextMapPropagator[]
{
new TraceContextPropagator(),
new BaggagePropagator()
});

Sdk.SetDefaultTextMapPropagator(compositeTextMapPropagator);
lukebakken marked this conversation as resolved.
Show resolved Hide resolved

var tracerProvider = Sdk.CreateTracerProviderBuilder()
.AddRabbitMQInstrumentation()
.AddConsoleExporter()
.Build();
```
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFrameworks>net6.0;netstandard2.0</TargetFrameworks>
<NoWarn>$(NoWarn);CS1591</NoWarn>
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
<AssemblyTitle>RabbitMQ OpenTelemetry Integration Package for .NET</AssemblyTitle>
<Authors>VMware</Authors>
<Company>VMware, Inc. or its affiliates.</Company>
<Copyright>Copyright © 2007-2023 VMware, Inc. or its affiliates.</Copyright>
<Description>The RabbitMQ OAuth2 Client Library for .NET enables OAuth2 token refresh for RabbitMQ.Client</Description>
lukebakken marked this conversation as resolved.
Show resolved Hide resolved
<GenerateDocumentationFile>true</GenerateDocumentationFile>
<PackageIcon>icon.png</PackageIcon>
<PackageLicenseExpression>Apache-2.0 OR MPL-2.0</PackageLicenseExpression>
<PackageProjectUrl>https://www.rabbitmq.com/dotnet.html</PackageProjectUrl>
<PackageTags>rabbitmq, amqp, oauth2</PackageTags>
<Product>RabbitMQ</Product>
<PublishRepositoryUrl>true</PublishRepositoryUrl>
<RepositoryUrl>https://github.com/rabbitmq/rabbitmq-dotnet-client.git</RepositoryUrl>
<IncludeSymbols>true</IncludeSymbols>
<SymbolPackageFormat>snupkg</SymbolPackageFormat>
<AssemblyOriginatorKeyFile>../rabbit.snk</AssemblyOriginatorKeyFile>
<SignAssembly>true</SignAssembly>
<MinVerTagPrefix>otel-</MinVerTagPrefix>
<MinVerVerbosity>minimal</MinVerVerbosity>
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
<PackageOutputPath>../../packages</PackageOutputPath>
<PackageReadmeFile>README.md</PackageReadmeFile>
<LangVersion>7.3</LangVersion>
lukebakken marked this conversation as resolved.
Show resolved Hide resolved
</PropertyGroup>

<PropertyGroup Condition="'$(Configuration)' == 'Release' And '$(CI)' == 'true'">
<ContinuousIntegrationBuild>true</ContinuousIntegrationBuild>
<Deterministic>true</Deterministic>
<EmbedUntrackedSources>true</EmbedUntrackedSources>
</PropertyGroup>

<ItemGroup Condition="'$(Configuration)' == 'Release' and '$(SourceRoot)' == ''">
<SourceRoot Include="$(MSBuildThisFileDirectory)/" />
</ItemGroup>

<ItemGroup>
<None Remove="icon.png" />
<Content Include="icon.png" PackagePath="" />
<None Include="README.md" Pack="true" PackagePath="/" />
<InternalsVisibleTo Include="Unit" />
<InternalsVisibleTo Include="Benchmarks" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="Microsoft.CodeAnalysis.PublicApiAnalyzers" Version="3.3.4" PrivateAssets="all" />
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="8.0.0" PrivateAssets="all" />
<PackageReference Include="MinVer" Version="5.0.0" PrivateAssets="all" />
<PackageReference Include="OpenTelemetry.Api" Version="1.7.0" />
</ItemGroup>

<ItemGroup Condition="$(TargetFramework) == 'netstandard2.0'">
</ItemGroup>

<ItemGroup>
<ProjectReference Include="../RabbitMQ.Client/RabbitMQ.Client.csproj" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using OpenTelemetry.Context.Propagation;
using RabbitMQ.Client;

namespace OpenTelemetry.Trace
{
public static class OpenTelemetryExtensions
{
public static TracerProviderBuilder AddRabbitMQInstrumentation(this TracerProviderBuilder builder)
{
RabbitMQActivitySource.UseRoutingKeyAsOperationName = true;
RabbitMQActivitySource.ContextExtractor = OpenTelemetryContextExtractor;
RabbitMQActivitySource.ContextInjector = OpenTelemetryContextInjector;
Comment on lines +16 to +17
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why we need this part? Can it be set by default in main package?

I would expect that this method will only call builder.AddSource("RabbitMQ.Client.*") as a convenient way to register it in TracerProviderBuilder. Other parts should be done in the instrumentation library.

Alternative options is to have here the second method:

public static TracerProviderBuilder AddRabbitMQInstrumentation(this TracerProviderBuilder builder, Action<RabbitMQSpecificOptions>? configure);

where you can configure other methods externally.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OpenTelemetryContextInjector is OTel specific for Baggage handling, that's pretty much the only reason for the package, because we don't want to pull OpenTelemetry.API dependencies into the main library.

builder.AddSource("RabbitMQ.Client.*");
return builder;
}

private static ActivityContext OpenTelemetryContextExtractor(IReadOnlyBasicProperties props)
{
// Extract the PropagationContext of the upstream parent from the message headers.
var parentContext = Propagators.DefaultTextMapPropagator.Extract(default, props.Headers, OpenTelemetryContextGetter);
Baggage.Current = parentContext.Baggage;
return parentContext.ActivityContext;
}

private static IEnumerable<string> OpenTelemetryContextGetter(IDictionary<string, object> carrier, string key)
{
try
{
if (carrier.TryGetValue(key, out object value) && value is byte[] bytes)
{
return new[] { Encoding.UTF8.GetString(bytes) };
}
}
catch (Exception)
{
//this.logger.LogError(ex, "Failed to extract trace context.");
}

return Enumerable.Empty<string>();
}

private static void OpenTelemetryContextInjector(Activity activity, IDictionary<string, object> props)
{
// Inject the current Activity's context into the message headers.
Propagators.DefaultTextMapPropagator.Inject(new PropagationContext(activity.Context, Baggage.Current), props, OpenTelemetryContextSetter);
}

private static void OpenTelemetryContextSetter(IDictionary<string, object> carrier, string key, string value)
{
carrier[key] = Encoding.UTF8.GetBytes(value);
}
}
}
Binary file added projects/RabbitMQ.Client.OpenTelemetry/icon.png
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
4 changes: 4 additions & 0 deletions projects/RabbitMQ.Client/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -956,4 +956,8 @@ virtual RabbitMQ.Client.TcpClientAdapter.ReceiveTimeout.set -> void
~static RabbitMQ.Client.IConnectionExtensions.CloseAsync(this RabbitMQ.Client.IConnection connection, System.TimeSpan timeout) -> System.Threading.Tasks.Task
~static RabbitMQ.Client.IConnectionExtensions.CloseAsync(this RabbitMQ.Client.IConnection connection, ushort reasonCode, string reasonText) -> System.Threading.Tasks.Task
~static RabbitMQ.Client.IConnectionExtensions.CloseAsync(this RabbitMQ.Client.IConnection connection, ushort reasonCode, string reasonText, System.TimeSpan timeout) -> System.Threading.Tasks.Task
~static RabbitMQ.Client.RabbitMQActivitySource.ContextExtractor.get -> System.Func<RabbitMQ.Client.IReadOnlyBasicProperties, System.Diagnostics.ActivityContext>
~static RabbitMQ.Client.RabbitMQActivitySource.ContextExtractor.set -> void
~static RabbitMQ.Client.RabbitMQActivitySource.ContextInjector.get -> System.Action<System.Diagnostics.Activity, System.Collections.Generic.IDictionary<string, object>>
~static RabbitMQ.Client.RabbitMQActivitySource.ContextInjector.set -> void
~virtual RabbitMQ.Client.DefaultBasicConsumer.HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, RabbitMQ.Client.ReadOnlyBasicProperties properties, System.ReadOnlyMemory<byte> body) -> System.Threading.Tasks.Task
14 changes: 9 additions & 5 deletions projects/RabbitMQ.Client/client/api/IChannelExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using RabbitMQ.Client.client.impl;
Expand Down Expand Up @@ -87,17 +88,20 @@ public static class IChannelExtensions
/// <remarks>
/// The publication occurs with mandatory=false and immediate=false.
/// </remarks>
public static ValueTask BasicPublishAsync<T>(this IChannel channel, PublicationAddress addr, in T basicProperties, ReadOnlyMemory<byte> body)
public static ValueTask BasicPublishAsync<T>(this IChannel channel, PublicationAddress addr, in T basicProperties,
ReadOnlyMemory<byte> body)
where T : IReadOnlyBasicProperties, IAmqpHeader
{
return channel.BasicPublishAsync(addr.ExchangeName, addr.RoutingKey, basicProperties, body);
}

public static ValueTask BasicPublishAsync(this IChannel channel, string exchange, string routingKey, ReadOnlyMemory<byte> body = default, bool mandatory = false)
=> channel.BasicPublishAsync(exchange, routingKey, EmptyBasicProperty.Empty, body, mandatory);
public static ValueTask BasicPublishAsync(this IChannel channel, string exchange, string routingKey,
ReadOnlyMemory<byte> body = default, bool mandatory = false) =>
channel.BasicPublishAsync(exchange, routingKey, EmptyBasicProperty.Empty, body, mandatory);

public static ValueTask BasicPublishAsync(this IChannel channel, CachedString exchange, CachedString routingKey, ReadOnlyMemory<byte> body = default, bool mandatory = false)
=> channel.BasicPublishAsync(exchange, routingKey, EmptyBasicProperty.Empty, body, mandatory);
public static ValueTask BasicPublishAsync(this IChannel channel, CachedString exchange,
CachedString routingKey, ReadOnlyMemory<byte> body = default, bool mandatory = false) =>
channel.BasicPublishAsync(exchange, routingKey, EmptyBasicProperty.Empty, body, mandatory);

#nullable disable

Expand Down
29 changes: 7 additions & 22 deletions projects/RabbitMQ.Client/client/impl/ChannelBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1048,20 +1048,6 @@ await ModelSendAsync(method, k.CancellationToken)
}
}

private static void InjectTraceContextIntoBasicProperties(object propsObj, string key, string value)
{
if (!(propsObj is Dictionary<string, object> headers))
{
return;
}

// Only propagate headers if they haven't already been set
if (!headers.ContainsKey(key))
{
headers[key] = value;
}
}

public async ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey,
TProperties basicProperties, ReadOnlyMemory<byte> body, bool mandatory,
CancellationToken cancellationToken)
Expand All @@ -1083,10 +1069,12 @@ await _confirmSemaphore.WaitAsync(cancellationToken)

try
{
var cmd = new BasicPublish(exchange, routingKey, mandatory, default);
RabbitMQActivitySource.TryGetExistingContext(basicProperties, out ActivityContext existingContext);
var cmd = new BasicPublishMemory(
Encoding.UTF8.GetBytes(exchange),
Encoding.UTF8.GetBytes(routingKey),
mandatory, default);
using Activity sendActivity = RabbitMQActivitySource.PublisherHasListeners
? RabbitMQActivitySource.Send(routingKey, exchange, body.Length, existingContext)
? RabbitMQActivitySource.Send(routingKey, exchange, body.Length)
: default;

if (sendActivity != null)
Expand Down Expand Up @@ -1144,10 +1132,8 @@ await _confirmSemaphore.WaitAsync(cancellationToken)
try
{
var cmd = new BasicPublishMemory(exchange.Bytes, routingKey.Bytes, mandatory, default);

RabbitMQActivitySource.TryGetExistingContext(basicProperties, out ActivityContext existingContext);
using Activity sendActivity = RabbitMQActivitySource.PublisherHasListeners
? RabbitMQActivitySource.Send(routingKey.Value, exchange.Value, body.Length, existingContext)
? RabbitMQActivitySource.Send(routingKey.Value, exchange.Value, body.Length)
: default;

if (sendActivity != null)
Expand Down Expand Up @@ -1906,9 +1892,8 @@ await tokenRegistration.DisposeAsync()
}

var headers = props.Headers ?? new Dictionary<string, object>();

// Inject the ActivityContext into the message headers to propagate trace context to the receiving service.
DistributedContextPropagator.Current.Inject(sendActivity, headers, InjectTraceContextIntoBasicProperties);
RabbitMQActivitySource.ContextInjector(sendActivity, headers);
props.Headers = headers;
return props;
}
Expand Down