Skip to content

Commit

Permalink
More determinitic transaction trace aggregation in threaded scenarios
Browse files Browse the repository at this point in the history
  • Loading branch information
JcolemanNR committed Jun 24, 2022
1 parent 9cdf2e9 commit 04c9233
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 49 deletions.
Expand Up @@ -43,45 +43,44 @@ public override void Collect(TransactionTraceWireModelComponents transactionTrac

protected override void Harvest()
{
var traces = _transactionCollectors
var traceSamples = _transactionCollectors
.Where(t => t != null)
.SelectMany(t => t.GetCollectedSamples())
.Distinct()
.ToList();

var traceWireModels = traceSamples
.Select(t => t.CreateWireModel())
.ToList();

if (!traces.Any())
if (!traceWireModels.Any())
return;

LogUnencodedTraceData(traces);
LogUnencodedTraceData(traceWireModels);

var responseStatus = DataTransportService.Send(traces);
HandleResponse(responseStatus, traces);
var responseStatus = DataTransportService.Send(traceWireModels);
HandleResponse(responseStatus, traceSamples);
}

private void HandleResponse(DataTransportResponseStatus responseStatus, ICollection<TransactionTraceWireModel> traces)
private void HandleResponse(DataTransportResponseStatus responseStatus, ICollection<TransactionTraceWireModelComponents> traceSamples)
{
switch (responseStatus)
{
case DataTransportResponseStatus.RequestSuccessful:
ClearTransactionTraces(); // Only clear traces after successfully sending data
break;
case DataTransportResponseStatus.Retain:
case DataTransportResponseStatus.ReduceSizeIfPossibleOtherwiseDiscard:
case DataTransportResponseStatus.Discard:
default:
// Feed collected samples back in if we didn't successfully send them
foreach (var traceSample in traceSamples)
{
Collect(traceSample);
}
break;
}
}

private void ClearTransactionTraces()
{
foreach (var transactionCollector in _transactionCollectors)
{
transactionCollector?.ClearCollectedSamples();
}
}

protected override void OnConfigurationUpdated(ConfigurationUpdateSource configurationUpdateSource)
{
// It is *CRITICAL* that this method never do anything more complicated than clearing data and starting and ending subscriptions.
Expand Down
Expand Up @@ -23,7 +23,5 @@ public interface ITransactionCollector
/// Returns an immutable enumerable of samples and clears the sample storage.
/// </summary>
IEnumerable<TransactionTraceWireModelComponents> GetCollectedSamples();

void ClearCollectedSamples();
}
}
Expand Up @@ -2,17 +2,19 @@
// SPDX-License-Identifier: Apache-2.0

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using NewRelic.Agent.Core.Configuration;
using NewRelic.Agent.Core.Transformers.TransactionTransformer;

namespace NewRelic.Agent.Core.TransactionTraces
{
public class KeyTransactionCollector : ITransactionCollector, IDisposable
{
private volatile TransactionTraceWireModelComponents _slowTransaction;
private double _score = 0.0;
private volatile ConcurrentDictionary<double, TransactionTraceWireModelComponents> _keyTransactions =
new ConcurrentDictionary<double, TransactionTraceWireModelComponents>();

protected ConfigurationSubscriber ConfigurationSubscription = new ConfigurationSubscriber();

Expand All @@ -28,23 +30,26 @@ public void Collect(TransactionTraceWireModelComponents transactionTraceWireMode

// larger the score, the larger the diff
var score = 100.0 * (transactionTraceWireModelComponents.Duration.TotalMilliseconds / apdexTime.TotalMilliseconds);
if (_slowTransaction != null && _score > score)
return;

_slowTransaction = transactionTraceWireModelComponents;
_score = score;
// If there aren't any lower scores than what we currently encountered, then add this one to the collection
if (!_keyTransactions.Any(x => x.Key < score))
{
_keyTransactions[score] = transactionTraceWireModelComponents;
}
}

public IEnumerable<TransactionTraceWireModelComponents> GetCollectedSamples()
{
var slowTransaction = _slowTransaction;
return slowTransaction == null ? Enumerable.Empty<TransactionTraceWireModelComponents>() :
new TransactionTraceWireModelComponents[] { slowTransaction };
}
var harvestedKeyTransactions = Interlocked.Exchange(ref _keyTransactions,
new ConcurrentDictionary<double, TransactionTraceWireModelComponents>());

public void ClearCollectedSamples()
{
_slowTransaction = null;
if (harvestedKeyTransactions.Count == 0)
{
return Enumerable.Empty<TransactionTraceWireModelComponents>();
}

var worstScoredTransaction = harvestedKeyTransactions.Aggregate((x, y) => x.Key < y.Key ? x : y).Value;
return new TransactionTraceWireModelComponents[] { worstScoredTransaction };
}

public void Dispose()
Expand Down
Expand Up @@ -6,36 +6,43 @@
using NewRelic.Agent.Core.Configuration;
using NewRelic.Agent.Core.Transformers.TransactionTransformer;
using System.Linq;
using System.Collections.Concurrent;
using System.Threading;

namespace NewRelic.Agent.Core.TransactionTraces
{
public class SlowestTransactionCollector : ITransactionCollector, IDisposable
{
private volatile TransactionTraceWireModelComponents _slowTransaction;
private volatile ConcurrentBag<TransactionTraceWireModelComponents> _slowTransactions = new ConcurrentBag<TransactionTraceWireModelComponents>();

protected ConfigurationSubscriber ConfigurationSubscription = new ConfigurationSubscriber();

public void Collect(TransactionTraceWireModelComponents transactionTraceWireModelComponents)
{
if (transactionTraceWireModelComponents.Duration <= ConfigurationSubscription.Configuration.TransactionTraceThreshold)
{
return;
}

if (_slowTransaction != null && _slowTransaction.Duration > transactionTraceWireModelComponents.Duration)
return;

_slowTransaction = transactionTraceWireModelComponents;
// If this is the slowest transaction so far, save it!
if (!_slowTransactions.Any(x => x.Duration > transactionTraceWireModelComponents.Duration))
{
_slowTransactions.Add(transactionTraceWireModelComponents);
}
}

public IEnumerable<TransactionTraceWireModelComponents> GetCollectedSamples()
{
var slowTransaction = _slowTransaction;
return slowTransaction == null ? Enumerable.Empty<TransactionTraceWireModelComponents>() :
new TransactionTraceWireModelComponents[] { slowTransaction };
}
var harvestedSlowTransactions = Interlocked.Exchange(ref _slowTransactions,
new ConcurrentBag<TransactionTraceWireModelComponents>());

public void ClearCollectedSamples()
{
_slowTransaction = null;
if (harvestedSlowTransactions.Count == 0)
{
return Enumerable.Empty<TransactionTraceWireModelComponents>();
}

var slowestTransaction = harvestedSlowTransactions.Aggregate((x, y) => x.Duration > y.Duration ? x : y);
return new TransactionTraceWireModelComponents[] { slowestTransaction };
}

public void Dispose()
Expand Down
Expand Up @@ -2,7 +2,9 @@
// SPDX-License-Identifier: Apache-2.0

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using NewRelic.Agent.Core.Configuration;
using NewRelic.Agent.Core.Transactions;
using NewRelic.Agent.Core.Transformers.TransactionTransformer;
Expand All @@ -12,7 +14,7 @@ namespace NewRelic.Agent.Core.TransactionTraces
{
public class SyntheticsTransactionCollector : ITransactionCollector, IDisposable
{
private volatile ICollection<TransactionTraceWireModelComponents> _collectedSamples = new ConcurrentHashSet<TransactionTraceWireModelComponents>();
private volatile ConcurrentBag<TransactionTraceWireModelComponents> _collectedSamples = new ConcurrentBag<TransactionTraceWireModelComponents>();

private readonly ConfigurationSubscriber _configurationSubscription = new ConfigurationSubscriber();

Expand All @@ -28,13 +30,10 @@ public void Collect(TransactionTraceWireModelComponents transactionTraceWireMode

public IEnumerable<TransactionTraceWireModelComponents> GetCollectedSamples()
{
var oldCollectedSamples = _collectedSamples;
return oldCollectedSamples;
}
var harvestedTransactions = Interlocked.Exchange(ref _collectedSamples,
new ConcurrentBag<TransactionTraceWireModelComponents>());

public void ClearCollectedSamples()
{
_collectedSamples = new ConcurrentHashSet<TransactionTraceWireModelComponents>();
return harvestedTransactions;
}

public void Dispose()
Expand Down

0 comments on commit 04c9233

Please sign in to comment.