Skip to content

Commit

Permalink
Updating ActivityProcessor (#975)
Browse files Browse the repository at this point in the history
* Updating ActivityProcessor

updating tests

updating tests - 2

updating merge

updating tests - 3

updating tests - 4

using inrange instead of equal

updating tests

* updating to inRange

* following docs.microsoft

* updating tests and changelog

* Adding in changelog custom behavior

* Removing duplicated IDisposable

* forcing dispose

* updating tests

* moving to single instead of equal one

* returning to default

* setting more time to pass tests

* forcing dispose

* reiley's suggestions

* reverting order

Co-authored-by: Cijo Thomas <cithomas@microsoft.com>
  • Loading branch information
eddynaka and cijothomas committed Aug 4, 2020
1 parent da8cd0d commit 5cdfd30
Show file tree
Hide file tree
Showing 20 changed files with 83 additions and 86 deletions.
Expand Up @@ -37,7 +37,7 @@ internal class JaegerUdpBatcher : IDisposable
private readonly System.Timers.Timer maxFlushIntervalTimer;
private Dictionary<string, Process> processCache;
private int batchByteSize;
private bool isDisposed;
private bool disposed;

public JaegerUdpBatcher(JaegerExporterOptions options, TTransport clientTransport = null)
{
Expand Down Expand Up @@ -222,7 +222,7 @@ protected async Task SendAsync(Dictionary<string, Batch> batches, CancellationTo
}
}

protected virtual void Dispose(bool isDisposing)
protected virtual void Dispose(bool disposing)
{
try
{
Expand All @@ -232,15 +232,15 @@ protected virtual void Dispose(bool isDisposing)
{
}

if (isDisposing && !this.isDisposed)
if (disposing && !this.disposed)
{
this.maxFlushIntervalTimer.Dispose();
this.thriftClient.Dispose();
this.clientTransport.Dispose();
this.memoryProtocol.Dispose();
this.flushLock.Dispose();

this.isDisposed = true;
this.disposed = true;
}
}

Expand Down
6 changes: 3 additions & 3 deletions src/OpenTelemetry.Exporter.ZPages/ZPagesProcessor.cs
Expand Up @@ -29,7 +29,7 @@ namespace OpenTelemetry.Exporter.ZPages
/// <summary>
/// Implements the zpages span processor that exports spans in OnEnd call without batching.
/// </summary>
public class ZPagesProcessor : ActivityProcessor, IDisposable
public class ZPagesProcessor : ActivityProcessor
{
private readonly ZPagesExporter exporter;

Expand Down Expand Up @@ -133,8 +133,8 @@ public override Task ForceFlushAsync(CancellationToken cancellationToken)
throw new NotImplementedException();
}

/// <inheritdoc />
public void Dispose()
/// <inheritdoc/>
protected override void Dispose(bool disposing)
{
throw new NotImplementedException();
}
Expand Down
8 changes: 8 additions & 0 deletions src/OpenTelemetry/CHANGELOG.md
Expand Up @@ -2,6 +2,14 @@

## Unreleased

* `ActivityProcessor` implements `IDisposable`.
* When `Dispose` occurs, it calls `ShutdownAsync`.
* If you want a custom behavior for dispose, you will have to override the
`Dispose(bool disposing)`.
* `BatchingActivityProcessor`/`SimpleActivityProcessor` is disposable and it
disposes the containing exporter.
* `BroadcastActivityProcessor`is disposable and it disposes the processors.

## 0.3.0-beta

Released 2020-07-23
Expand Down
24 changes: 23 additions & 1 deletion src/OpenTelemetry/Trace/ActivityProcessor.cs
Expand Up @@ -13,16 +13,19 @@
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>

using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using OpenTelemetry.Internal;

namespace OpenTelemetry.Trace
{
/// <summary>
/// Activity processor base class.
/// </summary>
public abstract class ActivityProcessor
public abstract class ActivityProcessor : IDisposable
{
/// <summary>
/// Activity start hook.
Expand All @@ -49,5 +52,24 @@ public abstract class ActivityProcessor
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>Returns <see cref="Task"/>.</returns>
public abstract Task ForceFlushAsync(CancellationToken cancellationToken);

/// <inheritdoc/>
public void Dispose()
{
this.Dispose(true);
GC.SuppressFinalize(this);
}

protected virtual void Dispose(bool disposing)
{
try
{
this.ShutdownAsync(CancellationToken.None).GetAwaiter().GetResult();
}
catch (Exception ex)
{
OpenTelemetrySdkEventSource.Log.SpanProcessorException(nameof(this.Dispose), ex);
}
}
}
}
25 changes: 6 additions & 19 deletions src/OpenTelemetry/Trace/BatchingActivityProcessor.cs
Expand Up @@ -27,7 +27,7 @@ namespace OpenTelemetry.Trace
/// <summary>
/// Implements processor that batches activities before calling exporter.
/// </summary>
public class BatchingActivityProcessor : ActivityProcessor, IDisposable
public class BatchingActivityProcessor : ActivityProcessor
{
private const int DefaultMaxQueueSize = 2048;
private const int DefaultMaxExportBatchSize = 512;
Expand All @@ -44,7 +44,7 @@ public class BatchingActivityProcessor : ActivityProcessor, IDisposable
private readonly SemaphoreSlim flushLock = new SemaphoreSlim(1);
private readonly System.Timers.Timer flushTimer;
private volatile int currentQueueSize;
private bool isDisposed;
private bool disposed;

/// <summary>
/// Initializes a new instance of the <see cref="BatchingActivityProcessor"/> class with default parameters:
Expand Down Expand Up @@ -202,28 +202,15 @@ public override async Task ForceFlushAsync(CancellationToken cancellationToken)
OpenTelemetrySdkEventSource.Log.ForceFlushCompleted(this.currentQueueSize);
}

/// <inheritdoc/>
public void Dispose()
{
this.Dispose(true);
}

/// <summary>
/// Releases the unmanaged resources used by this class and optionally releases the managed resources.
/// </summary>
/// <param name="disposing"><see langword="true"/> to release both managed and unmanaged resources; <see langword="false"/> to release only unmanaged resources.</param>
protected virtual void Dispose(bool disposing)
protected override void Dispose(bool disposing)
{
try
{
this.ShutdownAsync(CancellationToken.None).GetAwaiter().GetResult();
}
catch (Exception ex)
{
OpenTelemetrySdkEventSource.Log.SpanProcessorException(nameof(this.Dispose), ex);
}
base.Dispose(disposing);

if (disposing && !this.isDisposed)
if (disposing && !this.disposed)
{
if (this.exporter is IDisposable disposableExporter)
{
Expand All @@ -239,7 +226,7 @@ protected virtual void Dispose(bool disposing)

this.flushTimer.Dispose();
this.flushLock.Dispose();
this.isDisposed = true;
this.disposed = true;
}
}

Expand Down
24 changes: 6 additions & 18 deletions src/OpenTelemetry/Trace/Internal/BroadcastActivityProcessor.cs
Expand Up @@ -24,10 +24,10 @@

namespace OpenTelemetry.Trace.Internal
{
internal class BroadcastActivityProcessor : ActivityProcessor, IDisposable
internal class BroadcastActivityProcessor : ActivityProcessor
{
private readonly IEnumerable<ActivityProcessor> processors;
private bool isDisposed;
private bool disposed;

public BroadcastActivityProcessor(IEnumerable<ActivityProcessor> processors)
{
Expand Down Expand Up @@ -96,23 +96,11 @@ public override Task ForceFlushAsync(CancellationToken cancellationToken)
return Task.WhenAll(tasks);
}

public void Dispose()
protected override void Dispose(bool disposing)
{
this.Dispose(true);
}

protected virtual void Dispose(bool disposing)
{
try
{
this.ShutdownAsync(CancellationToken.None).GetAwaiter().GetResult();
}
catch (Exception ex)
{
OpenTelemetrySdkEventSource.Log.SpanProcessorException(nameof(this.Dispose), ex);
}
base.Dispose(disposing);

if (disposing && !this.isDisposed)
if (disposing && !this.disposed)
{
foreach (var processor in this.processors)
{
Expand All @@ -129,7 +117,7 @@ protected virtual void Dispose(bool disposing)
}
}

this.isDisposed = true;
this.disposed = true;
}
}
}
Expand Down
19 changes: 3 additions & 16 deletions src/OpenTelemetry/Trace/SimpleActivityProcessor.cs
Expand Up @@ -24,7 +24,7 @@ namespace OpenTelemetry.Trace
/// <summary>
/// Implements simple activity processor that exports activities in OnEnd call without batching.
/// </summary>
public class SimpleActivityProcessor : ActivityProcessor, IDisposable
public class SimpleActivityProcessor : ActivityProcessor
{
private readonly ActivityExporter exporter;
private bool stopped;
Expand Down Expand Up @@ -85,26 +85,13 @@ public override Task ForceFlushAsync(CancellationToken cancellationToken)
#endif
}

/// <inheritdoc/>
public void Dispose()
{
this.Dispose(true);
}

/// <summary>
/// Releases the unmanaged resources used by this class and optionally releases the managed resources.
/// </summary>
/// <param name="disposing"><see langword="true"/> to release both managed and unmanaged resources; <see langword="false"/> to release only unmanaged resources.</param>
protected virtual void Dispose(bool disposing)
protected override void Dispose(bool disposing)
{
try
{
this.ShutdownAsync(CancellationToken.None).GetAwaiter().GetResult();
}
catch (Exception ex)
{
OpenTelemetrySdkEventSource.Log.SpanProcessorException(nameof(this.Dispose), ex);
}
base.Dispose(disposing);

if (disposing)
{
Expand Down
Expand Up @@ -73,7 +73,7 @@ public override Task ForceFlushAsync(CancellationToken cancellationToken)
#endif
}

public void Dispose()
protected override void Dispose(bool disposing)
{
this.DisposedCalled = true;
}
Expand Down
Expand Up @@ -73,7 +73,7 @@ public override Task ForceFlushAsync(CancellationToken cancellationToken)
#endif
}

public void Dispose()
protected override void Dispose(bool disposing)
{
this.DisposedCalled = true;
}
Expand Down
Expand Up @@ -74,7 +74,7 @@ public override Task ForceFlushAsync(CancellationToken cancellationToken)
#endif
}

public void Dispose()
protected override void Dispose(bool disposing)
{
this.DisposedCalled = true;
}
Expand Down
Expand Up @@ -73,7 +73,7 @@ public override Task ForceFlushAsync(CancellationToken cancellationToken)
#endif
}

public void Dispose()
protected override void Dispose(bool disposing)
{
this.DisposedCalled = true;
}
Expand Down
Expand Up @@ -194,7 +194,7 @@ public void Dispose()
var currentActivity = Activity.Current;

Activity span;
Assert.Equal(2, activityProcessor.Invocations.Count); // begin and end was called
Assert.Equal(3, activityProcessor.Invocations.Count); // begin/end/dispose was called
span = (Activity)activityProcessor.Invocations[1].Arguments[0];

Assert.Equal(routeTemplate ?? HttpContext.Current.Request.Path, span.DisplayName);
Expand Down
Expand Up @@ -66,7 +66,7 @@ public void GrpcClientCallsAreCollectedSuccessfully(string baseAddress)
var rs = client.SayHello(new HelloRequest());
}

Assert.Equal(2, spanProcessor.Invocations.Count); // begin and end was called
Assert.Equal(3, spanProcessor.Invocations.Count); // start/end/dispose was called
var span = (Activity)spanProcessor.Invocations[1].Arguments[0];

Assert.Equal(parent.TraceId, span.Context.TraceId);
Expand Down Expand Up @@ -117,7 +117,7 @@ public void GrpcAndHttpClientInstrumentationIsInvoked()
var rs = client.SayHello(new HelloRequest());
}

Assert.Equal(4, spanProcessor.Invocations.Count); // begin and end was called for Grpc call and underlying Http call
Assert.Equal(5, spanProcessor.Invocations.Count); // begin and end was called for Grpc call and underlying Http call + dispose
var httpSpan = (Activity)spanProcessor.Invocations[2].Arguments[0];
var grpcSpan = (Activity)spanProcessor.Invocations[3].Arguments[0];

Expand Down
Expand Up @@ -98,7 +98,7 @@ public async Task HttpClientInstrumentationInjectsHeadersAsync()
await c.SendAsync(request);
}

Assert.Equal(2, spanProcessor.Invocations.Count); // begin and end was called
Assert.Equal(3, spanProcessor.Invocations.Count); // start/end/dispose was called
var span = (Activity)spanProcessor.Invocations[1].Arguments[0];

Assert.Equal(parent.TraceId, span.Context.TraceId);
Expand Down Expand Up @@ -151,7 +151,7 @@ public async Task HttpClientInstrumentationInjectsHeadersAsync_CustomFormat()
await c.SendAsync(request);
}

Assert.Equal(2, spanProcessor.Invocations.Count); // begin and end was called
Assert.Equal(3, spanProcessor.Invocations.Count); // start/end/dispose was called
var span = (Activity)spanProcessor.Invocations[1].Arguments[0];

Assert.Equal(parent.TraceId, span.Context.TraceId);
Expand Down Expand Up @@ -225,7 +225,7 @@ public async Task HttpClientInstrumentationBacksOffIfAlreadyInstrumented()
await c.SendAsync(request);
}

Assert.Equal(0, spanProcessor.Invocations.Count);
Assert.Equal(1, spanProcessor.Invocations.Count); // dispose
}

[Fact]
Expand All @@ -243,7 +243,7 @@ public async void HttpClientInstrumentationFiltersOutRequests()
await c.GetAsync(this.url);
}

Assert.Equal(0, spanProcessor.Invocations.Count);
Assert.Equal(1, spanProcessor.Invocations.Count); // dispose
}

[Fact]
Expand All @@ -268,7 +268,7 @@ public async Task HttpClientInstrumentationFiltersOutRequestsToExporterEndpoints
}
}

Assert.Equal(0, spanProcessor.Invocations.Count);
Assert.Equal(1, spanProcessor.Invocations.Count); // dispose
}

public void Dispose()
Expand Down
Expand Up @@ -85,7 +85,7 @@ public async Task HttpOutCallsAreCollectedSuccessfullyAsync(HttpTestData.HttpOut
}
}

Assert.Equal(2, spanProcessor.Invocations.Count); // begin and end was called
Assert.Equal(3, spanProcessor.Invocations.Count); // start/end/dispose was called
var span = (Activity)spanProcessor.Invocations[1].Arguments[0];

Assert.Equal(tc.SpanName, span.DisplayName);
Expand Down
Expand Up @@ -173,7 +173,7 @@ public void SqlClient_BadArgs()
afterExecuteEventData);
}

Assert.Equal(2, spanProcessor.Invocations.Count); // begin and end was called
Assert.Equal(3, spanProcessor.Invocations.Count); // start/end/dispose was called

VerifyActivityData(sqlCommand.CommandType, sqlCommand.CommandText, captureStoredProcedureCommandName, captureTextCommandContent, false, sqlConnection.DataSource, (Activity)spanProcessor.Invocations[1].Arguments[0]);
}
Expand Down Expand Up @@ -219,7 +219,7 @@ public void SqlClientErrorsAreCollectedSuccessfully(string beforeCommand, string
commandErrorEventData);
}

Assert.Equal(2, spanProcessor.Invocations.Count); // begin and end was called
Assert.Equal(3, spanProcessor.Invocations.Count); // begin and end was called

VerifyActivityData(sqlCommand.CommandType, sqlCommand.CommandText, true, false, true, sqlConnection.DataSource, (Activity)spanProcessor.Invocations[1].Arguments[0]);
}
Expand Down

0 comments on commit 5cdfd30

Please sign in to comment.