Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[sdk] Provide better concurrency modes #5643

Closed
wants to merge 10 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/OpenTelemetry/.publicApi/Stable/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
OpenTelemetry.ConcurrencyModes
OpenTelemetry.ConcurrencyModes.Multithreaded = 2 -> OpenTelemetry.ConcurrencyModes
OpenTelemetry.ConcurrencyModes.Reentrant = 1 -> OpenTelemetry.ConcurrencyModes
OpenTelemetry.ConcurrencyModesAttribute
OpenTelemetry.ConcurrencyModesAttribute.ConcurrencyModesAttribute(OpenTelemetry.ConcurrencyModes supported) -> void
OpenTelemetry.ConcurrencyModesAttribute.Supported.get -> OpenTelemetry.ConcurrencyModes
OpenTelemetry.Metrics.Exemplar
OpenTelemetry.Metrics.Exemplar.DoubleValue.get -> double
OpenTelemetry.Metrics.Exemplar.Exemplar() -> void
Expand Down
4 changes: 4 additions & 0 deletions src/OpenTelemetry/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## Unreleased

* Exposed `ConcurrencyModes` as a public contract for extensions (e.g.
exporters) to better express their concurrency requirements.
([#5643](https://github.com/open-telemetry/opentelemetry-dotnet/pull/5643))

## 1.9.0-alpha.1

Released 2024-May-20
Expand Down
36 changes: 36 additions & 0 deletions src/OpenTelemetry/ConcurrencyModes.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

namespace OpenTelemetry;

/// <summary>
/// Describes the concurrency mode of an OpenTelemetry extension.
/// </summary>
[Flags]
public enum ConcurrencyModes : byte
{
/*
0 0 0 0 0 0 0 0
| | | | | | | |
| | | | | | | +--- Reentrant
| | | | | | +----- Multithreaded
| | | | | +------- (reserved)
| | | | +--------- (reserved)
| | | +----------- (reserved)
| | +------------- (reserved)
| +--------------- (reserved)
+----------------- (reserved)
*/

/// <summary>
/// Reentrant, the component can be invoked recursively without resulting
/// a deadlock or infinite loop.
/// </summary>
Reentrant = 0b1,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We removed Global for now should we also remove Reentrant? Doesn't seem to be used at the moment.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My gut feeling is no. Multithreaded and Reentrant are closely related, the existing implementation (which uses lock(obj)) implies that the exporter supports reentrancy but not multithreading (which is buggy IMHO).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No strong feelings though, I can scope this out and add it later once we figured out how to correctly handle reentrancy.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting. So if the attribute is not specified and defaults to 0 our implied default is essentially SingleThreaded | Reentrant?

Should the defined Reentrant flag then negate the default behavior?

public enum ConcurrencyModes : byte
{
    /// <summary>
    /// Nonreentrant, the component cannot be invoked recursively without resulting
    /// in a deadlock or infinite loop.
    /// </summary>
    Nonreentrant = 0b1,

    /// <summary>
    /// Multithreaded, the component can be invoked concurrently across
    /// multiple threads.
    /// </summary>
    Multithreaded = 0b10,
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting. So if the attribute is not specified and defaults to 0 our implied default is essentially SingleThreaded | Reentrant?

Maybe. I personally consider this as a bug in the current SDK implementation.

Should the defined Reentrant flag then negate the default behavior?

I guess no, better to treat it as a bug, then figure out how to fix it in a non-breaking way.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My thinking - if the exporter doesn't have the attribute at all, fall back to the old (and buggy) behavior, if the exporter has the attribute, start to enforce the correct behavior.


/// <summary>
/// Multithreaded, the component can be invoked concurrently across
/// multiple threads.
/// </summary>
Multithreaded = 0b10,
}
27 changes: 27 additions & 0 deletions src/OpenTelemetry/ConcurrencyModesAttribute.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

namespace OpenTelemetry;

/// <summary>
/// An attribute for declaring the supported <see cref="ConcurrencyModes"/> of an OpenTelemetry extension.
/// </summary>
[AttributeUsage(AttributeTargets.Class, AllowMultiple = false, Inherited = true)]
public sealed class ConcurrencyModesAttribute : Attribute
{
private readonly ConcurrencyModes supportedConcurrencyModes;

/// <summary>
/// Initializes a new instance of the <see cref="ConcurrencyModesAttribute"/> class.
/// </summary>
/// <param name="supported"><see cref="ConcurrencyModes"/>.</param>
public ConcurrencyModesAttribute(ConcurrencyModes supported)
{
this.supportedConcurrencyModes = supported;
}

/// <summary>
/// Gets the supported <see cref="ConcurrencyModes"/>.
/// </summary>
public ConcurrencyModes Supported => this.supportedConcurrencyModes;
}
43 changes: 35 additions & 8 deletions src/OpenTelemetry/SimpleExportProcessor.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

using System.Runtime.CompilerServices;
using OpenTelemetry.Internal;

namespace OpenTelemetry;
Expand All @@ -12,7 +13,8 @@ namespace OpenTelemetry;
public abstract class SimpleExportProcessor<T> : BaseExportProcessor<T>
where T : class
{
private readonly object syncObject = new();
private readonly object? syncObject;
private readonly ConcurrencyModes supportedConcurrencyModes;

/// <summary>
/// Initializes a new instance of the <see cref="SimpleExportProcessor{T}"/> class.
Expand All @@ -21,21 +23,46 @@ public abstract class SimpleExportProcessor<T> : BaseExportProcessor<T>
protected SimpleExportProcessor(BaseExporter<T> exporter)
: base(exporter)
{
var exporterType = exporter.GetType();
var attributes = exporterType.GetCustomAttributes(typeof(ConcurrencyModesAttribute), true);
reyang marked this conversation as resolved.
Show resolved Hide resolved
if (attributes.Length > 0)
{
var attr = (ConcurrencyModesAttribute)attributes[attributes.Length - 1];
this.supportedConcurrencyModes = attr.Supported;
}

if (!this.supportedConcurrencyModes.HasFlag(ConcurrencyModes.Multithreaded))
{
this.syncObject = new object();
}
}

/// <inheritdoc />
protected override void OnExport(T data)
{
lock (this.syncObject)
if (this.syncObject is null)
{
try
{
this.exporter.Export(new Batch<T>(data));
}
catch (Exception ex)
this.OnExportInternal(data);
}
else
{
lock (this.syncObject)
{
OpenTelemetrySdkEventSource.Log.SpanProcessorException(nameof(this.OnExport), ex);
this.OnExportInternal(data);
}
}
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void OnExportInternal(T data)
{
try
{
this.exporter.Export(new Batch<T>(data));
}
catch (Exception ex)
{
OpenTelemetrySdkEventSource.Log.SpanProcessorException(nameof(this.OnExport), ex);
}
}
}