Skip to content

Commit

Permalink
[prometheus] Fix issue with corrupted buffers when reading both OpenMetrics and plain text formats (#5623)
Browse files Browse the repository at this point in the history

Co-authored-by: Piotr Kiełkowicz <pkiekowicz@splunk.com>
Co-authored-by: Vishwesh Bankwar <vishweshbankwar@users.noreply.github.com>
Co-authored-by: Mikel Blanchard <mblanchard@macrosssoftware.com>
  • Loading branch information
4 people committed May 17, 2024
1 parent b444464 commit 8177a39
Show file tree
Hide file tree
Showing 7 changed files with 207 additions and 86 deletions.
4 changes: 4 additions & 0 deletions src/OpenTelemetry.Exporter.Prometheus.AspNetCore/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## Unreleased

* Fixed an issue with corrupted buffers when reading both OpenMetrics and
plain text formats from Prometheus exporters.
([#5623](https://github.com/open-telemetry/opentelemetry-dotnet/pull/5623))

## 1.8.0-rc.1

Released 2024-Mar-27
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@ public async Task InvokeAsync(HttpContext httpContext)

try
{
if (collectionResponse.View.Count > 0)
var dataView = openMetricsRequested ? collectionResponse.OpenMetricsView : collectionResponse.PlainTextView;

if (dataView.Count > 0)
{
response.StatusCode = 200;
#if NET8_0_OR_GREATER
Expand All @@ -69,7 +71,7 @@ public async Task InvokeAsync(HttpContext httpContext)
? "application/openmetrics-text; version=1.0.0; charset=utf-8"
: "text/plain; charset=utf-8; version=0.0.4";

await response.Body.WriteAsync(collectionResponse.View.Array, 0, collectionResponse.View.Count).ConfigureAwait(false);
await response.Body.WriteAsync(dataView.Array, 0, dataView.Count).ConfigureAwait(false);
}
else
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## Unreleased

* Fixed an issue with corrupted buffers when reading both OpenMetrics and
plain text formats from Prometheus exporters.
([#5623](https://github.com/open-telemetry/opentelemetry-dotnet/pull/5623))

## 1.8.0-rc.1

Released 2024-Mar-27
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@ internal sealed class PrometheusCollectionManager
private readonly Dictionary<Metric, PrometheusMetric> metricsCache;
private readonly HashSet<string> scopes;
private int metricsCacheCount;
private byte[] buffer = new byte[85000]; // encourage the object to live in LOH (large object heap)
private byte[] plainTextBuffer = new byte[85000]; // encourage the object to live in LOH (large object heap)
private byte[] openMetricsBuffer = new byte[85000]; // encourage the object to live in LOH (large object heap)
private int targetInfoBufferLength = -1; // zero or positive when target_info has been written for the first time
private ArraySegment<byte> previousPlainTextDataView;
private ArraySegment<byte> previousOpenMetricsDataView;
private int globalLockState;
private ArraySegment<byte> previousDataView;
private DateTime? previousDataViewGeneratedAtUtc;
private DateTime? previousPlainTextDataViewGeneratedAtUtc;
private DateTime? previousOpenMetricsDataViewGeneratedAtUtc;
private int readerCount;
private bool collectionRunning;
private TaskCompletionSource<CollectionResponse> collectionTcs;
Expand All @@ -44,16 +47,20 @@ public Task<CollectionResponse> EnterCollect(bool openMetricsRequested)

// If we are within {ScrapeResponseCacheDurationMilliseconds} of the
// last successful collect, return the previous view.
if (this.previousDataViewGeneratedAtUtc.HasValue
var previousDataViewGeneratedAtUtc = openMetricsRequested
? this.previousOpenMetricsDataViewGeneratedAtUtc
: this.previousPlainTextDataViewGeneratedAtUtc;

if (previousDataViewGeneratedAtUtc.HasValue
&& this.scrapeResponseCacheDurationMilliseconds > 0
&& this.previousDataViewGeneratedAtUtc.Value.AddMilliseconds(this.scrapeResponseCacheDurationMilliseconds) >= DateTime.UtcNow)
&& previousDataViewGeneratedAtUtc.Value.AddMilliseconds(this.scrapeResponseCacheDurationMilliseconds) >= DateTime.UtcNow)
{
Interlocked.Increment(ref this.readerCount);
this.ExitGlobalLock();
#if NET6_0_OR_GREATER
return new ValueTask<CollectionResponse>(new CollectionResponse(this.previousDataView, this.previousDataViewGeneratedAtUtc.Value, fromCache: true));
return new ValueTask<CollectionResponse>(new CollectionResponse(this.previousOpenMetricsDataView, this.previousPlainTextDataView, previousDataViewGeneratedAtUtc.Value, fromCache: true));
#else
return Task.FromResult(new CollectionResponse(this.previousDataView, this.previousDataViewGeneratedAtUtc.Value, fromCache: true));
return Task.FromResult(new CollectionResponse(this.previousOpenMetricsDataView, this.previousPlainTextDataView, previousDataViewGeneratedAtUtc.Value, fromCache: true));
#endif
}

Expand All @@ -78,16 +85,37 @@ public Task<CollectionResponse> EnterCollect(bool openMetricsRequested)

// Start a collection on the current thread.
this.collectionRunning = true;
this.previousDataViewGeneratedAtUtc = null;

if (openMetricsRequested)
{
this.previousOpenMetricsDataViewGeneratedAtUtc = null;
}
else
{
this.previousPlainTextDataViewGeneratedAtUtc = null;
}

Interlocked.Increment(ref this.readerCount);
this.ExitGlobalLock();

CollectionResponse response;
var result = this.ExecuteCollect(openMetricsRequested);
if (result)
{
this.previousDataViewGeneratedAtUtc = DateTime.UtcNow;
response = new CollectionResponse(this.previousDataView, this.previousDataViewGeneratedAtUtc.Value, fromCache: false);
if (openMetricsRequested)
{
this.previousOpenMetricsDataViewGeneratedAtUtc = DateTime.UtcNow;
}
else
{
this.previousPlainTextDataViewGeneratedAtUtc = DateTime.UtcNow;
}

previousDataViewGeneratedAtUtc = openMetricsRequested
? this.previousOpenMetricsDataViewGeneratedAtUtc
: this.previousPlainTextDataViewGeneratedAtUtc;

response = new CollectionResponse(this.previousOpenMetricsDataView, this.previousPlainTextDataView, previousDataViewGeneratedAtUtc.Value, fromCache: false);
}
else
{
Expand Down Expand Up @@ -170,6 +198,7 @@ private bool ExecuteCollect(bool openMetricsRequested)
private ExportResult OnCollect(Batch<Metric> metrics)
{
var cursor = 0;
var buffer = this.exporter.OpenMetricsRequested ? this.openMetricsBuffer : this.plainTextBuffer;

try
{
Expand All @@ -192,13 +221,13 @@ private ExportResult OnCollect(Batch<Metric> metrics)
{
try
{
cursor = PrometheusSerializer.WriteScopeInfo(this.buffer, cursor, metric.MeterName);
cursor = PrometheusSerializer.WriteScopeInfo(buffer, cursor, metric.MeterName);

break;
}
catch (IndexOutOfRangeException)
{
if (!this.IncreaseBufferSize())
if (!this.IncreaseBufferSize(ref buffer))

This comment has been minimized.

Copy link
@bbrandt

bbrandt Jun 21, 2024

This change introduces a regression: no output is produced whenever IncreaseBufferSize(...) has to grow the buffer. The cause appears to be that IncreaseBufferSize(...) takes a reference to the local variable buffer rather than to this.openMetricsBuffer or this.plainTextBuffer, so those fields are never updated to point at the resized array. Then, on line 294, when this.openMetricsBuffer or this.plainTextBuffer is accessed, it is still the original buffer instead of the resized one.

This comment has been minimized.

Copy link
@bbrandt

bbrandt Jun 21, 2024

Looks like this is fixed by #5676, which has not yet been merged.

{
// there are two cases we might run into the following condition:
// 1. we have many metrics to be exported - in this case we probably want
Expand Down Expand Up @@ -226,7 +255,7 @@ private ExportResult OnCollect(Batch<Metric> metrics)
try
{
cursor = PrometheusSerializer.WriteMetric(
this.buffer,
buffer,
cursor,
metric,
this.GetPrometheusMetric(metric),
Expand All @@ -236,7 +265,7 @@ private ExportResult OnCollect(Batch<Metric> metrics)
}
catch (IndexOutOfRangeException)
{
if (!this.IncreaseBufferSize())
if (!this.IncreaseBufferSize(ref buffer))
{
throw;
}
Expand All @@ -248,24 +277,40 @@ private ExportResult OnCollect(Batch<Metric> metrics)
{
try
{
cursor = PrometheusSerializer.WriteEof(this.buffer, cursor);
cursor = PrometheusSerializer.WriteEof(buffer, cursor);
break;
}
catch (IndexOutOfRangeException)
{
if (!this.IncreaseBufferSize())
if (!this.IncreaseBufferSize(ref buffer))
{
throw;
}
}
}

this.previousDataView = new ArraySegment<byte>(this.buffer, 0, cursor);
if (this.exporter.OpenMetricsRequested)
{
this.previousOpenMetricsDataView = new ArraySegment<byte>(this.openMetricsBuffer, 0, cursor);
}
else
{
this.previousPlainTextDataView = new ArraySegment<byte>(this.plainTextBuffer, 0, cursor);
}

return ExportResult.Success;
}
catch (Exception)
{
this.previousDataView = new ArraySegment<byte>(Array.Empty<byte>(), 0, 0);
if (this.exporter.OpenMetricsRequested)
{
this.previousOpenMetricsDataView = new ArraySegment<byte>(Array.Empty<byte>(), 0, 0);
}
else
{
this.previousPlainTextDataView = new ArraySegment<byte>(Array.Empty<byte>(), 0, 0);
}

return ExportResult.Failure;
}
}
Expand All @@ -278,13 +323,13 @@ private int WriteTargetInfo()
{
try
{
this.targetInfoBufferLength = PrometheusSerializer.WriteTargetInfo(this.buffer, 0, this.exporter.Resource);
this.targetInfoBufferLength = PrometheusSerializer.WriteTargetInfo(this.openMetricsBuffer, 0, this.exporter.Resource);

break;
}
catch (IndexOutOfRangeException)
{
if (!this.IncreaseBufferSize())
if (!this.IncreaseBufferSize(ref this.openMetricsBuffer))
{
throw;
}
Expand All @@ -295,18 +340,18 @@ private int WriteTargetInfo()
return this.targetInfoBufferLength;
}

private bool IncreaseBufferSize()
private bool IncreaseBufferSize(ref byte[] buffer)
{
var newBufferSize = this.buffer.Length * 2;
var newBufferSize = buffer.Length * 2;

if (newBufferSize > 100 * 1024 * 1024)
{
return false;
}

var newBuffer = new byte[newBufferSize];
this.buffer.CopyTo(newBuffer, 0);
this.buffer = newBuffer;
buffer.CopyTo(newBuffer, 0);
buffer = newBuffer;

return true;
}
Expand All @@ -331,14 +376,17 @@ private PrometheusMetric GetPrometheusMetric(Metric metric)

public readonly struct CollectionResponse
{
public CollectionResponse(ArraySegment<byte> view, DateTime generatedAtUtc, bool fromCache)
public CollectionResponse(ArraySegment<byte> openMetricsView, ArraySegment<byte> plainTextView, DateTime generatedAtUtc, bool fromCache)
{
this.View = view;
this.OpenMetricsView = openMetricsView;
this.PlainTextView = plainTextView;
this.GeneratedAtUtc = generatedAtUtc;
this.FromCache = fromCache;
}

public ArraySegment<byte> View { get; }
public ArraySegment<byte> OpenMetricsView { get; }

public ArraySegment<byte> PlainTextView { get; }

public DateTime GeneratedAtUtc { get; }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,15 +153,18 @@ private async Task ProcessRequestAsync(HttpListenerContext context)
try
{
context.Response.Headers.Add("Server", string.Empty);
if (collectionResponse.View.Count > 0)

var dataView = openMetricsRequested ? collectionResponse.OpenMetricsView : collectionResponse.PlainTextView;

if (dataView.Count > 0)
{
context.Response.StatusCode = 200;
context.Response.Headers.Add("Last-Modified", collectionResponse.GeneratedAtUtc.ToString("R"));
context.Response.ContentType = openMetricsRequested
? "application/openmetrics-text; version=1.0.0; charset=utf-8"
: "text/plain; charset=utf-8; version=0.0.4";

await context.Response.OutputStream.WriteAsync(collectionResponse.View.Array, 0, collectionResponse.View.Count).ConfigureAwait(false);
await context.Response.OutputStream.WriteAsync(dataView.Array, 0, dataView.Count).ConfigureAwait(false);
}
else
{
Expand Down
Loading

0 comments on commit 8177a39

Please sign in to comment.