Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Prototype] Support ActivityLinks #2185

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public void AzureClientSpansAreCollected()
Assert.AreEqual(sendActivity.SpanId.ToHexString(), telemetry.Id);

Assert.AreEqual("v1", telemetry.Properties["k1"]);

Assert.IsTrue(telemetry.Properties.TryGetValue("tracestate", out var tracestate));
Assert.AreEqual("state=some", tracestate);
}
Expand Down Expand Up @@ -608,6 +608,65 @@ public void AzureClientSpansAreCollectedLinks()
}
}

[TestMethod]
public void AzureClientSpansAreCollectedLinksFromActivitySource()
{
// Create the activity listener to make sure that activitySource.StartActivity returns a value
using (var activityListener = new ActivityListener()
{
ShouldListenTo = source => source.Name == "Azure.SomeClient",
Sample = (ref ActivityCreationOptions<ActivityContext> options) => ActivitySamplingResult.AllDataAndRecorded
})
using (var listener = new DiagnosticListener("Azure.SomeClient"))
using (var activitySource = new ActivitySource("Azure.SomeClient"))
using (var module = new DependencyTrackingTelemetryModule())
{
ActivitySource.AddActivityListener(activityListener);

module.Initialize(this.configuration);

var link0TraceId = "70545f717a9aa6a490d820438b9d2bf6";
var link1TraceId = "c5aa06717eef0c4592af26323ade92f7";
var link0SpanId = "8b0b2fb40c84e64a";
var link1SpanId = "3a69ce690411bb4f";

var sendActivity = activitySource.StartActivity("Azure.SomeClient.Send", ActivityKind.Consumer, default(ActivityContext), links: new []
{
new ActivityLink(new ActivityContext(ActivityTraceId.CreateFromString(link0TraceId.AsSpan()), ActivitySpanId.CreateFromString(link0SpanId.AsSpan()), ActivityTraceFlags.None)),
new ActivityLink(new ActivityContext(ActivityTraceId.CreateFromString(link1TraceId.AsSpan()), ActivitySpanId.CreateFromString(link1SpanId.AsSpan()), ActivityTraceFlags.None))
});

listener.Write(sendActivity.OperationName + ".Start", sendActivity);
listener.Write(sendActivity.OperationName + ".Stop", null);

var telemetry = this.sentItems.Last() as DependencyTelemetry;

Assert.IsNotNull(telemetry);
Assert.AreEqual("SomeClient.Send", telemetry.Name);
Assert.IsTrue(telemetry.Success.Value);

Assert.IsNull(telemetry.Context.Operation.ParentId);
Assert.AreEqual(sendActivity.TraceId.ToHexString(), telemetry.Context.Operation.Id);
Assert.AreEqual(sendActivity.SpanId.ToHexString(), telemetry.Id);

// does not throw
Assert.IsTrue(telemetry.Properties.TryGetValue("_MS.links", out var linksStr));
var actualLinks = JsonConvert.DeserializeObject<ApplicationInsightsLink[]>(linksStr, JsonSettingThrowOnError);

Assert.IsNotNull(actualLinks);
Assert.AreEqual(2, actualLinks.Length);

Assert.AreEqual(link0TraceId, actualLinks[0].OperationId);
Assert.AreEqual(link1TraceId, actualLinks[1].OperationId);

Assert.AreEqual(link0SpanId, actualLinks[0].Id);
Assert.AreEqual(link1SpanId, actualLinks[1].Id);

Assert.AreEqual($"[{{\"operation_Id\":\"{link0TraceId}\",\"id\":\"{link0SpanId}\"}},{{\"operation_Id\":\"{link1TraceId}\",\"id\":\"{link1SpanId}\"}}]", linksStr);
Assert.IsFalse(telemetry.Metrics.Any());
}
}

[TestMethod]
public void AzureServerSpansAreCollectedLinks()
{
Expand Down Expand Up @@ -847,7 +906,7 @@ public void AzureClientSpansAreCollectedForHttp()
.AddTag("serviceRequestId", "service-request-id");

listener.StopActivity(httpActivity, payload);

var telemetry = this.sentItems.Last() as DependencyTelemetry;

Assert.IsNotNull(telemetry);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
namespace Microsoft.ApplicationInsights.DependencyCollector.Implementation
{
using System;
using System.Collections;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
Expand Down Expand Up @@ -64,15 +65,44 @@ public override void OnEvent(KeyValuePair<string, object> evnt, DiagnosticListen
SetEventHubsProperties(currentActivity, telemetry);
}

if (this.linksPropertyFetcher.Fetch(evnt.Value) is IEnumerable<Activity> activityLinks)
{
PopulateLinks(activityLinks, telemetry);
IEnumerable<ActivityLink> activityLinks;

if (telemetry is RequestTelemetry request &&
TryGetAverageTimeInQueueForBatch(activityLinks, currentActivity.StartTimeUtc, out long enqueuedTime))
// Pre ActivitySource Azure SDKs are using custom activity subtype with
// public IEnumerable<Activity> Links { get; }
// property to represent links
if (evnt.Value.GetType() != typeof(Activity) &&
pakrym marked this conversation as resolved.
Show resolved Hide resolved
this.linksPropertyFetcher.Fetch(evnt.Value) is IEnumerable<Activity> links &&
links.Any())
{
List<ActivityLink> convertedLinks = new List<ActivityLink>();
foreach (var linksAsActivity in links)
{
request.Metrics["timeSinceEnqueued"] = enqueuedTime;
if (linksAsActivity.ParentId != null &&
ActivityContext.TryParse(linksAsActivity.ParentId, null, out var context))
{
ActivityTagsCollection tags = null;
if (linksAsActivity.TagObjects.Any())
{
tags = new ActivityTagsCollection(linksAsActivity.TagObjects);
}

convertedLinks.Add(new ActivityLink(context, tags));
}
}

activityLinks = convertedLinks;
}
else
{
activityLinks = currentActivity.Links;
pakrym marked this conversation as resolved.
Show resolved Hide resolved
}

PopulateLinks(activityLinks, telemetry);

if (telemetry is RequestTelemetry request &&
TryGetAverageTimeInQueueForBatch(activityLinks, currentActivity.StartTimeUtc, out long enqueuedTime))
{
request.Metrics["timeSinceEnqueued"] = enqueuedTime;
}

this.operationHolder.Store(currentActivity, Tuple.Create(telemetry, /* isCustomCreated: */ false));
Expand Down Expand Up @@ -142,7 +172,7 @@ protected override bool IsOperationSuccessful(string eventName, object eventPayl
return true;
}

private static bool TryGetAverageTimeInQueueForBatch(IEnumerable<Activity> links, DateTimeOffset requestStartTime, out long avgTimeInQueue)
private static bool TryGetAverageTimeInQueueForBatch(IEnumerable<ActivityLink> links, DateTimeOffset requestStartTime, out long avgTimeInQueue)
{
avgTimeInQueue = 0;
int linksCount = 0;
Expand All @@ -153,7 +183,7 @@ private static bool TryGetAverageTimeInQueueForBatch(IEnumerable<Activity> links
// instrumentation does not consistently report enqueued time, ignoring whole span
return false;
}

long startEpochTime = 0;
#if NET452
startEpochTime = (long)(requestStartTime - EpochStart).TotalMilliseconds;
Expand All @@ -173,16 +203,19 @@ private static bool TryGetAverageTimeInQueueForBatch(IEnumerable<Activity> links
return true;
}

private static bool TryGetEnqueuedTime(Activity link, out long enqueuedTime)
private static bool TryGetEnqueuedTime(ActivityLink link, out long enqueuedTime)
{
enqueuedTime = 0;
foreach (var attribute in link.Tags)
if (link.Tags != null)
{
if (attribute.Key == "enqueuedTime")
foreach (var attribute in link.Tags)
{
if (attribute.Value is string strValue)
if (attribute.Key == "enqueuedTime")
{
return long.TryParse(strValue, out enqueuedTime);
if (attribute.Value is string strValue)
{
return long.TryParse(strValue, out enqueuedTime);
}
}
}
}
Expand Down Expand Up @@ -312,7 +345,7 @@ private static void SetEventHubsProperties(Activity activity, OperationTelemetry
return;
}

// Target uniquely identifies the resource, we use both: queueName and endpoint
// Target uniquely identifies the resource, we use both: queueName and endpoint
// with schema used for SQL-dependencies
string separator = "/";
if (endpoint.EndsWith(separator, StringComparison.Ordinal))
Expand All @@ -332,15 +365,15 @@ private static void SetEventHubsProperties(Activity activity, OperationTelemetry
}
}

private static void PopulateLinks(IEnumerable<Activity> links, OperationTelemetry telemetry)
private static void PopulateLinks(IEnumerable<ActivityLink> links, OperationTelemetry telemetry)
{
if (links.Any())
if (links != null && links.Any())
{
var linksJson = new StringBuilder();
linksJson.Append('[');
foreach (var link in links)
{
var linkTraceId = link.TraceId.ToHexString();
var linkTraceId = link.Context.TraceId.ToHexString();

// avoiding json serializers for now because of extra dependency.
// serialization is trivial and looks like `links` property with json blob
Expand All @@ -356,7 +389,7 @@ private static void PopulateLinks(IEnumerable<Activity> links, OperationTelemetr
linksJson
.Append("\"id\":")
.Append('\"')
.Append(link.ParentSpanId.ToHexString())
.Append(link.Context.SpanId.ToHexString())
.Append('\"');

// we explicitly ignore sampling flag, tracestate and attributes at this point.
Expand Down