Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor exporter - step 5 #1087

Merged
merged 9 commits into from Aug 16, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
22 changes: 22 additions & 0 deletions src/OpenTelemetry/Internal/CircularBuffer.cs
Expand Up @@ -64,6 +64,28 @@ public int Count
}
}

/// <summary>
/// Gets the number of items added to the <see cref="CircularBuffer{T}"/>.
/// </summary>
public long AddedCount
{
get
{
return this.head;
}
}

/// <summary>
/// Gets the number of items removed from the <see cref="CircularBuffer{T}"/>.
/// </summary>
public long RemovedCount
{
get
{
return this.tail;
}
}

/// <summary>
/// Attempts to add the specified item to the buffer.
/// </summary>
Expand Down
52 changes: 48 additions & 4 deletions src/OpenTelemetry/Trace/BatchExportActivityProcessor.cs
Expand Up @@ -28,11 +28,12 @@ namespace OpenTelemetry.Trace
public class BatchExportActivityProcessor : ActivityProcessor
{
private readonly ActivityExporterSync exporter;
private readonly int maxQueueSize;
private readonly CircularBuffer<Activity> queue;
private readonly TimeSpan scheduledDelay;
private readonly TimeSpan exporterTimeout;
private readonly int maxExportBatchSize;
private bool disposed;
private long droppedCount = 0;

/// <summary>
/// Initializes a new instance of the <see cref="BatchExportActivityProcessor"/> class with custom settings.
Expand Down Expand Up @@ -70,17 +71,60 @@ public class BatchExportActivityProcessor : ActivityProcessor
}

this.exporter = exporter ?? throw new ArgumentNullException(nameof(exporter));
this.maxQueueSize = maxQueueSize;
this.queue = new CircularBuffer<Activity>(maxQueueSize);
this.scheduledDelay = TimeSpan.FromMilliseconds(scheduledDelayMillis);
this.exporterTimeout = TimeSpan.FromMilliseconds(exporterTimeoutMillis);
this.maxExportBatchSize = maxExportBatchSize;
}

/// <summary>
/// Gets the number of <see cref="Activity"/> dropped (when the queue is full).
/// </summary>
internal long DroppedCount
{
get
{
return this.droppedCount;
}
}

/// <summary>
/// Gets the number of <see cref="Activity"/> received by the processor.
/// </summary>
internal long ReceivedCount
{
get
{
return this.queue.AddedCount + this.DroppedCount;
}
}

/// <summary>
/// Gets the number of <see cref="Activity"/> processed by the underlying exporter.
/// </summary>
internal long ProcessedCount
{
get
{
return this.queue.RemovedCount;
}
}

/// <inheritdoc/>
public override void OnEnd(Activity activity)
{
// TODO
throw new NotImplementedException();
if (this.queue.TryAdd(activity))
{
if (this.queue.Count >= this.maxExportBatchSize)
{
// TODO: signal the exporter
}

return; // enqueue succeeded
}

// drop item on the floor
Interlocked.Increment(ref this.droppedCount);
}

/// <inheritdoc/>
Expand Down