Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrading Elasticsearch to OTel version 1.0.2 #83

Merged
merged 12 commits into from
Mar 10, 2021
Merged

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
using System;
using OpenTelemetry.Contrib.Instrumentation.ElasticsearchClient.Implementation;
using OpenTelemetry.Instrumentation;
using OpenTelemetry.Trace;

namespace OpenTelemetry.Contrib.Instrumentation.ElasticsearchClient
{
Expand All @@ -30,16 +29,10 @@ internal class ElasticsearchClientInstrumentation : IDisposable
/// <summary>
/// Initializes a new instance of the <see cref="ElasticsearchClientInstrumentation"/> class.
/// </summary>
/// <param name="activitySource">ActivitySource adapter instance.</param>
/// <param name="options">Configuration options for Elasticsearch client instrumentation.</param>
public ElasticsearchClientInstrumentation(ActivitySourceAdapter activitySource, ElasticsearchClientInstrumentationOptions options)
public ElasticsearchClientInstrumentation(ElasticsearchClientInstrumentationOptions options)
{
if (activitySource == null)
{
throw new ArgumentNullException(nameof(activitySource));
}

this.diagnosticSourceSubscriber = new DiagnosticSourceSubscriber(new ElasticsearchRequestPipelineDiagnosticListener(activitySource, options), null);
this.diagnosticSourceSubscriber = new DiagnosticSourceSubscriber(new ElasticsearchRequestPipelineDiagnosticListener(options), null);
this.diagnosticSourceSubscriber.Subscribe();
}

Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// <copyright file="ActivityInstrumentationHelper.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>

using System;
using System.Diagnostics;
using System.Linq.Expressions;

namespace OpenTelemetry.Instrumentation
{
internal static class ActivityInstrumentationHelper
{
internal static readonly Action<Activity, ActivityKind> SetKindProperty = CreateActivityKindSetter();
internal static readonly Action<Activity, ActivitySource> SetActivitySourceProperty = CreateActivitySourceSetter();

private static Action<Activity, ActivitySource> CreateActivitySourceSetter()
{
ParameterExpression instance = Expression.Parameter(typeof(Activity), "instance");
ParameterExpression propertyValue = Expression.Parameter(typeof(ActivitySource), "propertyValue");
var body = Expression.Assign(Expression.Property(instance, "Source"), propertyValue);
return Expression.Lambda<Action<Activity, ActivitySource>>(body, instance, propertyValue).Compile();
}

private static Action<Activity, ActivityKind> CreateActivityKindSetter()
{
ParameterExpression instance = Expression.Parameter(typeof(Activity), "instance");
ParameterExpression propertyValue = Expression.Parameter(typeof(ActivityKind), "propertyValue");
var body = Expression.Assign(Expression.Property(instance, "Kind"), propertyValue);
return Expression.Lambda<Action<Activity, ActivityKind>>(body, instance, propertyValue).Compile();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// <copyright file="DiagnosticSourceListener.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>

using System;
using System.Collections.Generic;
using System.Diagnostics;

namespace OpenTelemetry.Instrumentation
{
internal class DiagnosticSourceListener : IObserver<KeyValuePair<string, object>>
{
private readonly ListenerHandler handler;

public DiagnosticSourceListener(ListenerHandler handler)
{
this.handler = handler ?? throw new ArgumentNullException(nameof(handler));
}

public void OnCompleted()
{
}

public void OnError(Exception error)
{
}

public void OnNext(KeyValuePair<string, object> value)
{
if (!this.handler.SupportsNullActivity && Activity.Current == null)
{
InstrumentationEventSource.Log.NullActivity(value.Key);

return;
}

try
{
if (value.Key.EndsWith("Start", StringComparison.Ordinal))
{
this.handler.OnStartActivity(Activity.Current, value.Value);
}
else if (value.Key.EndsWith("Stop", StringComparison.Ordinal))
{
this.handler.OnStopActivity(Activity.Current, value.Value);
}
else if (value.Key.EndsWith("Exception", StringComparison.Ordinal))
{
this.handler.OnException(Activity.Current, value.Value);
}
else
{
this.handler.OnCustom(value.Key, Activity.Current, value.Value);
}
}
catch (Exception ex)
{
InstrumentationEventSource.Log.UnknownErrorProcessingEvent(this.handler?.SourceName, value.Key, ex);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
// <copyright file="DiagnosticSourceSubscriber.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;

namespace OpenTelemetry.Instrumentation
{
internal class DiagnosticSourceSubscriber : IDisposable, IObserver<DiagnosticListener>
{
private readonly Func<string, ListenerHandler> handlerFactory;
private readonly Func<DiagnosticListener, bool> diagnosticSourceFilter;
private readonly Func<string, object, object, bool> isEnabledFilter;
private long disposed;
private IDisposable allSourcesSubscription;
private List<IDisposable> listenerSubscriptions;

public DiagnosticSourceSubscriber(
ListenerHandler handler,
Func<string, object, object, bool> isEnabledFilter)
: this(_ => handler, value => handler.SourceName == value.Name, isEnabledFilter)
{
}

public DiagnosticSourceSubscriber(
Func<string, ListenerHandler> handlerFactory,
Func<DiagnosticListener, bool> diagnosticSourceFilter,
Func<string, object, object, bool> isEnabledFilter)
{
this.listenerSubscriptions = new List<IDisposable>();
this.handlerFactory = handlerFactory ?? throw new ArgumentNullException(nameof(handlerFactory));
this.diagnosticSourceFilter = diagnosticSourceFilter;
this.isEnabledFilter = isEnabledFilter;
}

public void Subscribe()
{
if (this.allSourcesSubscription == null)
{
this.allSourcesSubscription = DiagnosticListener.AllListeners.Subscribe(this);
}
}

public void OnNext(DiagnosticListener value)
{
if ((Interlocked.Read(ref this.disposed) == 0) &&
this.diagnosticSourceFilter(value))
{
var handler = this.handlerFactory(value.Name);
var listener = new DiagnosticSourceListener(handler);
var subscription = this.isEnabledFilter == null ?
value.Subscribe(listener) :
value.Subscribe(listener, this.isEnabledFilter);

lock (this.listenerSubscriptions)
{
this.listenerSubscriptions.Add(subscription);
}
}
}

public void OnCompleted()
{
}

public void OnError(Exception error)
{
}

/// <inheritdoc/>
public void Dispose()
{
this.Dispose(true);
GC.SuppressFinalize(this);
}

protected virtual void Dispose(bool disposing)
{
if (Interlocked.CompareExchange(ref this.disposed, 1, 0) == 1)
{
return;
}

lock (this.listenerSubscriptions)
{
foreach (var listenerSubscription in this.listenerSubscriptions)
{
listenerSubscription?.Dispose();
}

this.listenerSubscriptions.Clear();
}

this.allSourcesSubscription?.Dispose();
this.allSourcesSubscription = null;
}
}
}
Loading