diff --git a/src/OpenTelemetry.Exporter.InMemory/InMemoryExporterLoggingExtensions.cs b/src/OpenTelemetry.Exporter.InMemory/InMemoryExporterLoggingExtensions.cs index ca67d4f3be5..6e10e5bd668 100644 --- a/src/OpenTelemetry.Exporter.InMemory/InMemoryExporterLoggingExtensions.cs +++ b/src/OpenTelemetry.Exporter.InMemory/InMemoryExporterLoggingExtensions.cs @@ -42,9 +42,7 @@ private static ExportResult ExportLogRecord(in Batch batch, ICollecti foreach (var log in batch) { - log.BufferLogScopes(); - - exportedItems.Add(log); + exportedItems.Add(log.Copy()); } return ExportResult.Success; diff --git a/src/OpenTelemetry/Batch.cs b/src/OpenTelemetry/Batch.cs index b518e8c5d05..537a19a6c75 100644 --- a/src/OpenTelemetry/Batch.cs +++ b/src/OpenTelemetry/Batch.cs @@ -22,6 +22,7 @@ using System.Diagnostics; using System.Diagnostics.CodeAnalysis; using OpenTelemetry.Internal; +using OpenTelemetry.Logs; namespace OpenTelemetry { @@ -90,7 +91,11 @@ public void Dispose() // Drain anything left in the batch. while (this.circularBuffer.RemovedCount < this.targetCount) { - this.circularBuffer.Read(); + T item = this.circularBuffer.Read(); + if (typeof(T) == typeof(LogRecord)) + { + LogRecordSharedPool.Current.Return((LogRecord)(object)item); + } } } } @@ -140,6 +145,32 @@ public struct Enumerator : IEnumerator return false; }; + private static readonly BatchEnumeratorMoveNextFunc MoveNextCircularBufferLogRecord = (ref Enumerator enumerator) => + { + // Note: This type check here is to give the JIT a hint it can + // remove all of this code when T != LogRecord + if (typeof(T) == typeof(LogRecord)) + { + var circularBuffer = enumerator.circularBuffer; + + var currentItem = enumerator.Current; + if (currentItem != null) + { + LogRecordSharedPool.Current.Return((LogRecord)(object)currentItem); + } + + if (circularBuffer!.RemovedCount < enumerator.targetCount) + { + enumerator.current = circularBuffer.Read(); + return true; + } + + enumerator.current = null; + } + + return false; + }; + private static readonly BatchEnumeratorMoveNextFunc MoveNextArray = (ref Enumerator enumerator) => { var items = enumerator.items; @@ -179,7 +210,7 @@ internal Enumerator(CircularBuffer circularBuffer, long targetCount) this.circularBuffer = circularBuffer; this.targetCount = targetCount; this.itemIndex = 0; - this.moveNextFunc = MoveNextCircularBuffer; + this.moveNextFunc = typeof(T) == typeof(LogRecord) ? MoveNextCircularBufferLogRecord : MoveNextCircularBuffer; } internal Enumerator(T[] items, long targetCount) @@ -201,6 +232,15 @@ internal Enumerator(T[] items, long targetCount) /// public void Dispose() { + if (typeof(T) == typeof(LogRecord)) + { + var currentItem = this.current; + if (currentItem != null) + { + LogRecordSharedPool.Current.Return((LogRecord)(object)currentItem); + this.current = null; + } + } } /// diff --git a/src/OpenTelemetry/BatchExportProcessor.cs b/src/OpenTelemetry/BatchExportProcessor.cs index d42c9464325..5fdc2c8893f 100644 --- a/src/OpenTelemetry/BatchExportProcessor.cs +++ b/src/OpenTelemetry/BatchExportProcessor.cs @@ -18,6 +18,7 @@ using System; using System.Diagnostics; +using System.Runtime.CompilerServices; using System.Threading; using OpenTelemetry.Internal; @@ -95,8 +96,8 @@ protected BatchExportProcessor( /// internal long ProcessedCount => this.circularBuffer.RemovedCount; - /// - protected override void OnExport(T data) + [MethodImpl(MethodImplOptions.AggressiveInlining)] + internal bool TryExport(T data) { if (this.circularBuffer.TryAdd(data, maxSpinCount: 50000)) { @@ -111,11 +112,19 @@ protected override void OnExport(T data) } } - return; // enqueue succeeded + return true; // enqueue succeeded } // either the queue is full or exceeded the spin limit, drop the item on the floor Interlocked.Increment(ref this.droppedCount); + + return false; + } + + /// + protected override void OnExport(T data) + { + this.TryExport(data); } /// diff --git a/src/OpenTelemetry/CHANGELOG.md b/src/OpenTelemetry/CHANGELOG.md index 05ee2bcfda2..3b5674e684a 100644 --- a/src/OpenTelemetry/CHANGELOG.md +++ b/src/OpenTelemetry/CHANGELOG.md @@ -25,6 +25,9 @@ * Handle possible exception when initializing the default service name. ([#3405](https://github.com/open-telemetry/opentelemetry-dotnet/pull/3405)) +* `LogRecord` instances are now reused to reduce memory pressure + ([#3385](https://github.com/open-telemetry/opentelemetry-dotnet/pull/3385)) + ## 1.3.0 Released 2022-Jun-03 diff --git a/src/OpenTelemetry/CompositeProcessor.cs b/src/OpenTelemetry/CompositeProcessor.cs index a9546ab179a..38a2babf9c9 100644 --- a/src/OpenTelemetry/CompositeProcessor.cs +++ b/src/OpenTelemetry/CompositeProcessor.cs @@ -26,7 +26,7 @@ namespace OpenTelemetry { public class CompositeProcessor : BaseProcessor { - private readonly DoublyLinkedListNode head; + internal readonly DoublyLinkedListNode Head; private DoublyLinkedListNode tail; private bool disposed; @@ -40,8 +40,8 @@ public CompositeProcessor(IEnumerable> processors) throw new ArgumentException($"'{iter}' is null or empty", nameof(iter)); } - this.head = new DoublyLinkedListNode(iter.Current); - this.tail = this.head; + this.Head = new DoublyLinkedListNode(iter.Current); + this.tail = this.Head; while (iter.MoveNext()) { @@ -66,7 +66,7 @@ public CompositeProcessor AddProcessor(BaseProcessor processor) /// public override void OnEnd(T data) { - for (var cur = this.head; cur != null; cur = cur.Next) + for (var cur = this.Head; cur != null; cur = cur.Next) { cur.Value.OnEnd(data); } @@ -75,7 +75,7 @@ public override void OnEnd(T data) /// public override void OnStart(T data) { - for (var cur = this.head; cur != null; cur = cur.Next) + for (var cur = this.Head; cur != null; cur = cur.Next) { cur.Value.OnStart(data); } @@ -85,7 +85,7 @@ internal override void SetParentProvider(BaseProvider parentProvider) { base.SetParentProvider(parentProvider); - for (var cur = this.head; cur != null; cur = cur.Next) + for (var cur = this.Head; cur != null; cur = cur.Next) { cur.Value.SetParentProvider(parentProvider); } @@ -99,7 +99,7 @@ protected override bool OnForceFlush(int timeoutMilliseconds) ? null : Stopwatch.StartNew(); - for (var cur = this.head; cur != null; cur = cur.Next) + for (var cur = this.Head; cur != null; cur = cur.Next) { if (sw == null) { @@ -125,7 +125,7 @@ protected override bool OnShutdown(int timeoutMilliseconds) ? null : Stopwatch.StartNew(); - for (var cur = this.head; cur != null; cur = cur.Next) + for (var cur = this.Head; cur != null; cur = cur.Next) { if (sw == null) { @@ -150,7 +150,7 @@ protected override void Dispose(bool disposing) { if (disposing) { - for (var cur = this.head; cur != null; cur = cur.Next) + for (var cur = this.Head; cur != null; cur = cur.Next) { try { @@ -169,7 +169,7 @@ protected override void Dispose(bool disposing) base.Dispose(disposing); } - private class DoublyLinkedListNode + internal sealed class DoublyLinkedListNode { public readonly BaseProcessor Value; diff --git a/src/OpenTelemetry/Logs/BatchLogRecordExportProcessor.cs b/src/OpenTelemetry/Logs/BatchLogRecordExportProcessor.cs index 8eb8cebe579..644f160dbb0 100644 --- a/src/OpenTelemetry/Logs/BatchLogRecordExportProcessor.cs +++ b/src/OpenTelemetry/Logs/BatchLogRecordExportProcessor.cs @@ -57,9 +57,14 @@ public override void OnEnd(LogRecord data) // happen here. Debug.Assert(data != null, "LogRecord was null."); - data!.BufferLogScopes(); + data!.Buffer(); - base.OnEnd(data); + data.AddReference(); + + if (!this.TryExport(data)) + { + LogRecordSharedPool.Current.Return(data); + } } } } diff --git a/src/OpenTelemetry/Logs/LogRecord.cs b/src/OpenTelemetry/Logs/LogRecord.cs index 98c937e7244..61c8199391c 100644 --- a/src/OpenTelemetry/Logs/LogRecord.cs +++ b/src/OpenTelemetry/Logs/LogRecord.cs @@ -19,6 +19,8 @@ using System; using System.Collections.Generic; using System.Diagnostics; +using System.Runtime.CompilerServices; +using System.Threading; using Microsoft.Extensions.Logging; using OpenTelemetry.Internal; @@ -30,15 +32,21 @@ namespace OpenTelemetry.Logs public sealed class LogRecord { internal LogRecordData Data; + internal List>? AttributeStorage; + internal List? BufferedScopes; + internal int PoolReferenceCount = int.MaxValue; private static readonly Action> AddScopeToBufferedList = (object? scope, List state) => { state.Add(scope); }; - private List? bufferedScopes; + internal LogRecord() + { + } // Note: Some users are calling this with reflection. Try not to change the signature to be nice. + [Obsolete("Call LogRecordPool.Rent instead.")] internal LogRecord( IExternalScopeProvider? scopeProvider, DateTime timestamp, @@ -191,9 +199,9 @@ public void ForEachScope(Action callback, TState var forEachScopeState = new ScopeForEachState(callback, state); - if (this.bufferedScopes != null) + if (this.BufferedScopes != null) { - foreach (object? scope in this.bufferedScopes) + foreach (object? scope in this.BufferedScopes) { ScopeForEachState.ForEachScope(scope, forEachScopeState); } @@ -213,22 +221,99 @@ internal ref LogRecordData GetDataRef() return ref this.Data; } + [MethodImpl(MethodImplOptions.AggressiveInlining)] + internal void ResetReferenceCount() + { + this.PoolReferenceCount = 1; + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + internal void AddReference() + { + Interlocked.Increment(ref this.PoolReferenceCount); + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + internal int RemoveReference() + { + return Interlocked.Decrement(ref this.PoolReferenceCount); + } + + // Note: Typically called when LogRecords are added into a batch so they + // can be safely processed outside of the log call chain. + internal void Buffer() + { + // Note: State values are buffered because some states are not safe + // to access outside of the log call chain. See: + // https://github.com/open-telemetry/opentelemetry-dotnet/issues/2905 + this.BufferLogStateValues(); + + this.BufferLogScopes(); + + // Note: There is no buffering of "State" only "StateValues". We + // don't inspect "object State" at all. It is undefined what + // exporters will do with "State". Some might ignore it, some might + // attempt to access it as a list. That is potentially dangerous. + // TODO: Investigate what to do here. Should we obsolete State and + // just use the StateValues design? + } + + internal LogRecord Copy() + { + // Note: We only buffer scopes here because state values are copied + // directly below. + this.BufferLogScopes(); + + return new() + { + Data = this.Data, + State = this.State, + StateValues = this.StateValues == null ? null : new List>(this.StateValues), + BufferedScopes = this.BufferedScopes == null ? null : new List(this.BufferedScopes), + }; + } + + /// + /// Buffers the state values attached to the log into a list so that + /// they can be safely processed after the log message lifecycle has + /// ended. + /// + private void BufferLogStateValues() + { + var stateValues = this.StateValues; + if (stateValues == null || stateValues == this.AttributeStorage) + { + return; + } + + var attributeStorage = this.AttributeStorage ??= new List>(stateValues.Count); + + // Note: AddRange here will copy all of the KeyValuePairs from + // stateValues to AttributeStorage. This "captures" the state and + // fixes issues where the values are generated at enumeration time + // like + // https://github.com/open-telemetry/opentelemetry-dotnet/issues/2905. + attributeStorage.AddRange(stateValues); + + this.StateValues = attributeStorage; + } + /// /// Buffers the scopes attached to the log into a list so that they can /// be safely processed after the log message lifecycle has ended. /// - internal void BufferLogScopes() + private void BufferLogScopes() { - if (this.ScopeProvider == null || this.bufferedScopes != null) + if (this.ScopeProvider == null) { return; } - List scopes = new List(); + List scopes = this.BufferedScopes ??= new List(LogRecordPoolHelper.DefaultMaxNumberOfScopes); - this.ScopeProvider?.ForEachScope(AddScopeToBufferedList, scopes); + this.ScopeProvider.ForEachScope(AddScopeToBufferedList, scopes); - this.bufferedScopes = scopes; + this.ScopeProvider = null; } private readonly struct ScopeForEachState diff --git a/src/OpenTelemetry/Logs/OpenTelemetryLogger.cs b/src/OpenTelemetry/Logs/OpenTelemetryLogger.cs index 8a128682d93..88b799434dd 100644 --- a/src/OpenTelemetry/Logs/OpenTelemetryLogger.cs +++ b/src/OpenTelemetry/Logs/OpenTelemetryLogger.cs @@ -18,6 +18,7 @@ using System; using System.Collections.Generic; +using System.Diagnostics; using System.Runtime.CompilerServices; using Microsoft.Extensions.Logging; using OpenTelemetry.Internal; @@ -52,20 +53,32 @@ public void Log(LogLevel logLevel, EventId eventId, TState state, Except var processor = provider.Processor; if (processor != null) { - var record = new LogRecord( - provider.IncludeScopes ? this.ScopeProvider : null, - DateTime.UtcNow, - this.categoryName, - logLevel, - eventId, - provider.IncludeFormattedMessage ? formatter?.Invoke(state, exception) : null, - provider.ParseStateValues ? null : state, - exception, - provider.ParseStateValues ? this.ParseState(state) : null); + var pool = provider.LogRecordPool; + + var record = pool.Rent(); + + record.ScopeProvider = provider.IncludeScopes ? this.ScopeProvider : null; + record.State = provider.ParseStateValues ? null : state; + record.StateValues = provider.ParseStateValues ? ParseState(record, state) : null; + + ref LogRecordData data = ref record.Data; + + data.TimestampBacking = DateTime.UtcNow; + data.CategoryName = this.categoryName; + data.LogLevel = logLevel; + data.EventId = eventId; + data.Message = provider.IncludeFormattedMessage ? formatter?.Invoke(state, exception) : null; + data.Exception = exception; + + LogRecordData.SetActivityContext(ref data, Activity.Current); processor.OnEnd(record); record.ScopeProvider = null; + + // Attempt to return the LogRecord to the pool. This will no-op + // if a batch exporter has added a reference. + pool.Return(record); } } @@ -77,7 +90,7 @@ public bool IsEnabled(LogLevel logLevel) public IDisposable BeginScope(TState state) => this.ScopeProvider?.Push(state) ?? NullScope.Instance; - private IReadOnlyList> ParseState(TState state) + private static IReadOnlyList> ParseState(LogRecord logRecord, TState state) { if (state is IReadOnlyList> stateList) { @@ -85,14 +98,22 @@ public bool IsEnabled(LogLevel logLevel) } else if (state is IEnumerable> stateValues) { - return new List>(stateValues); + var attributeStorage = logRecord.AttributeStorage; + if (attributeStorage == null) + { + return logRecord.AttributeStorage = new List>(stateValues); + } + else + { + attributeStorage.AddRange(stateValues); + return attributeStorage; + } } else { - return new List> - { - new KeyValuePair(string.Empty, state), - }; + var attributeStorage = logRecord.AttributeStorage ??= new List>(LogRecordPoolHelper.DefaultMaxNumberOfAttributes); + attributeStorage.Add(new KeyValuePair(string.Empty, state)); + return attributeStorage; } } diff --git a/src/OpenTelemetry/Logs/OpenTelemetryLoggerProvider.cs b/src/OpenTelemetry/Logs/OpenTelemetryLoggerProvider.cs index c057445c84b..221dbd11651 100644 --- a/src/OpenTelemetry/Logs/OpenTelemetryLoggerProvider.cs +++ b/src/OpenTelemetry/Logs/OpenTelemetryLoggerProvider.cs @@ -38,6 +38,7 @@ public class OpenTelemetryLoggerProvider : BaseProvider, ILoggerProvider, ISuppo internal BaseProcessor? Processor; internal Resource Resource; private readonly Hashtable loggers = new(); + private ILogRecordPool? threadStaticPool = LogRecordThreadStaticPool.Instance; private bool disposed; static OpenTelemetryLoggerProvider() @@ -52,7 +53,7 @@ static OpenTelemetryLoggerProvider() /// /// . public OpenTelemetryLoggerProvider(IOptionsMonitor options) - : this(options?.CurrentValue!) + : this(options?.CurrentValue ?? throw new ArgumentNullException(nameof(options))) { } @@ -91,6 +92,8 @@ internal OpenTelemetryLoggerProvider(OpenTelemetryLoggerOptions options) internal IExternalScopeProvider? ScopeProvider { get; private set; } + internal ILogRecordPool LogRecordPool => this.threadStaticPool ?? LogRecordSharedPool.Current; + /// void ISupportExternalScope.SetScopeProvider(IExternalScopeProvider scopeProvider) { @@ -160,6 +163,11 @@ internal OpenTelemetryLoggerProvider AddProcessor(BaseProcessor proce processor.SetParentProvider(this); + if (this.threadStaticPool != null && this.ContainsBatchProcessor(processor)) + { + this.threadStaticPool = null; + } + if (this.Processor == null) { this.Processor = processor; @@ -182,6 +190,29 @@ internal OpenTelemetryLoggerProvider AddProcessor(BaseProcessor proce return this; } + internal bool ContainsBatchProcessor(BaseProcessor processor) + { + if (processor is BatchExportProcessor) + { + return true; + } + else if (processor is CompositeProcessor compositeProcessor) + { + var current = compositeProcessor.Head; + while (current != null) + { + if (this.ContainsBatchProcessor(current.Value)) + { + return true; + } + + current = current.Next; + } + } + + return false; + } + /// protected override void Dispose(bool disposing) { diff --git a/src/OpenTelemetry/Logs/Pool/ILogRecordPool.cs b/src/OpenTelemetry/Logs/Pool/ILogRecordPool.cs new file mode 100644 index 00000000000..14d12cc6e36 --- /dev/null +++ b/src/OpenTelemetry/Logs/Pool/ILogRecordPool.cs @@ -0,0 +1,27 @@ +// +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +#nullable enable + +namespace OpenTelemetry.Logs +{ + internal interface ILogRecordPool + { + LogRecord Rent(); + + void Return(LogRecord logRecord); + } +} diff --git a/src/OpenTelemetry/Logs/Pool/LogRecordPoolHelper.cs b/src/OpenTelemetry/Logs/Pool/LogRecordPoolHelper.cs new file mode 100644 index 00000000000..b63028b920b --- /dev/null +++ b/src/OpenTelemetry/Logs/Pool/LogRecordPoolHelper.cs @@ -0,0 +1,61 @@ +// +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +#nullable enable + +namespace OpenTelemetry.Logs +{ + internal static class LogRecordPoolHelper + { + public const int DefaultMaxNumberOfAttributes = 64; + public const int DefaultMaxNumberOfScopes = 16; + + public static void Clear(LogRecord logRecord) + { + var attributeStorage = logRecord.AttributeStorage; + if (attributeStorage != null) + { + if (attributeStorage.Count > DefaultMaxNumberOfAttributes) + { + // Don't allow the pool to grow unconstained. + logRecord.AttributeStorage = null; + } + else + { + /* List.Clear sets the count/size to 0 but it maintains the + underlying array (capacity). */ + attributeStorage.Clear(); + } + } + + var bufferedScopes = logRecord.BufferedScopes; + if (bufferedScopes != null) + { + if (bufferedScopes.Count > DefaultMaxNumberOfScopes) + { + // Don't allow the pool to grow unconstained. + logRecord.BufferedScopes = null; + } + else + { + /* List.Clear sets the count/size to 0 but it maintains the + underlying array (capacity). */ + bufferedScopes.Clear(); + } + } + } + } +} diff --git a/src/OpenTelemetry/Logs/Pool/LogRecordSharedPool.cs b/src/OpenTelemetry/Logs/Pool/LogRecordSharedPool.cs new file mode 100644 index 00000000000..57234057ae8 --- /dev/null +++ b/src/OpenTelemetry/Logs/Pool/LogRecordSharedPool.cs @@ -0,0 +1,150 @@ +// +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +#nullable enable + +using System.Diagnostics.CodeAnalysis; +using System.Threading; +using OpenTelemetry.Internal; + +namespace OpenTelemetry.Logs +{ + internal sealed class LogRecordSharedPool : ILogRecordPool + { + public const int DefaultMaxPoolSize = 2048; + + public static LogRecordSharedPool Current = new(DefaultMaxPoolSize); + + public readonly int Capacity; + private readonly LogRecord?[] pool; + private long rentIndex; + private long returnIndex; + + public LogRecordSharedPool(int capacity) + { + this.Capacity = capacity; + this.pool = new LogRecord?[capacity]; + } + + public int Count => (int)(Volatile.Read(ref this.returnIndex) - Volatile.Read(ref this.rentIndex)); + + // Note: It might make sense to expose this (somehow) in the future. + // Ideal config is shared pool capacity == max batch size. + public static void Resize(int capacity) + { + Guard.ThrowIfOutOfRange(capacity, min: 1); + + Current = new(capacity); + } + + public LogRecord Rent() + { + while (true) + { + var rentSnapshot = Volatile.Read(ref this.rentIndex); + var returnSnapshot = Volatile.Read(ref this.returnIndex); + + if (rentSnapshot >= returnSnapshot) + { + break; // buffer is empty + } + + if (Interlocked.CompareExchange(ref this.rentIndex, rentSnapshot + 1, rentSnapshot) == rentSnapshot) + { + var logRecord = Interlocked.Exchange(ref this.pool[rentSnapshot % this.Capacity], null); + if (logRecord == null && !this.TryRentCoreRare(rentSnapshot, out logRecord)) + { + continue; + } + + logRecord.ResetReferenceCount(); + return logRecord; + } + } + + var newLogRecord = new LogRecord(); + newLogRecord.ResetReferenceCount(); + return newLogRecord; + } + + public void Return(LogRecord logRecord) + { + if (logRecord.RemoveReference() != 0) + { + return; + } + + LogRecordPoolHelper.Clear(logRecord); + + while (true) + { + var rentSnapshot = Volatile.Read(ref this.rentIndex); + var returnSnapshot = Volatile.Read(ref this.returnIndex); + + if (returnSnapshot - rentSnapshot >= this.Capacity) + { + return; // buffer is full + } + + if (Interlocked.CompareExchange(ref this.returnIndex, returnSnapshot + 1, returnSnapshot) == returnSnapshot) + { + // If many threads are hammering rent/return it is possible + // for two threads to write to the same index. In that case + // only one of the logRecords will make it back into the + // pool. Anything lost in the race will collected by the GC + // and the pool will issue new instances as needed. This + // could be abated by an Interlocked.CompareExchange here + // but for the general use case of an exporter returning + // records one-by-one, better to keep this fast and not pay + // for Interlocked.CompareExchange. The race is more + // theoretical. + this.pool[returnSnapshot % this.Capacity] = logRecord; + return; + } + } + } + + private bool TryRentCoreRare(long rentSnapshot, [NotNullWhen(true)] out LogRecord? logRecord) + { + SpinWait wait = default; + while (true) + { + if (wait.NextSpinWillYield) + { + // Super rare case. If many threads are hammering + // rent/return it is possible a read was issued an index and + // then yielded while other threads caused the pointers to + // wrap around. When the yielded thread wakes up its read + // index could have been stolen by another thread. To + // prevent deadlock, bail out of read after spinning. This + // will cause either a successful rent from another index, + // or a new record to be created + logRecord = null; + return false; + } + + wait.SpinOnce(); + + logRecord = Interlocked.Exchange(ref this.pool[rentSnapshot % this.Capacity], null); + if (logRecord != null) + { + // Rare case where the write was still working when the read came in + return true; + } + } + } + } +} diff --git a/src/OpenTelemetry/Logs/Pool/LogRecordThreadStaticPool.cs b/src/OpenTelemetry/Logs/Pool/LogRecordThreadStaticPool.cs new file mode 100644 index 00000000000..28b417f130d --- /dev/null +++ b/src/OpenTelemetry/Logs/Pool/LogRecordThreadStaticPool.cs @@ -0,0 +1,55 @@ +// +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +#nullable enable + +using System; + +namespace OpenTelemetry.Logs +{ + internal sealed class LogRecordThreadStaticPool : ILogRecordPool + { + [ThreadStatic] + public static LogRecord? Storage; + + private LogRecordThreadStaticPool() + { + } + + public static LogRecordThreadStaticPool Instance { get; } = new(); + + public LogRecord Rent() + { + var logRecord = Storage; + if (logRecord != null) + { + Storage = null; + return logRecord; + } + + return new(); + } + + public void Return(LogRecord logRecord) + { + if (Storage == null) + { + LogRecordPoolHelper.Clear(logRecord); + Storage = logRecord; + } + } + } +} diff --git a/test/Benchmarks/Logs/LogScopeBenchmarks.cs b/test/Benchmarks/Logs/LogScopeBenchmarks.cs index 7c21f970e9a..9cc2347d6e3 100644 --- a/test/Benchmarks/Logs/LogScopeBenchmarks.cs +++ b/test/Benchmarks/Logs/LogScopeBenchmarks.cs @@ -56,6 +56,7 @@ public LogScopeBenchmarks() new KeyValuePair("item5", "value5"), })); +#pragma warning disable CS0618 // Type or member is obsolete this.logRecord = new LogRecord( this.scopeProvider, DateTime.UtcNow, @@ -66,6 +67,7 @@ public LogScopeBenchmarks() null, null, null); +#pragma warning restore CS0618 // Type or member is obsolete } [Benchmark] diff --git a/test/OpenTelemetry.Tests/Logs/BatchLogRecordExportProcessorTests.cs b/test/OpenTelemetry.Tests/Logs/BatchLogRecordExportProcessorTests.cs new file mode 100644 index 00000000000..4a9e6ed3fd5 --- /dev/null +++ b/test/OpenTelemetry.Tests/Logs/BatchLogRecordExportProcessorTests.cs @@ -0,0 +1,107 @@ +// +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +#if !NETFRAMEWORK +using System; +using System.Collections.Generic; +using Microsoft.Extensions.Logging; +using OpenTelemetry.Exporter; +using Xunit; + +namespace OpenTelemetry.Logs.Tests +{ + public sealed class BatchLogRecordExportProcessorTests + { + [Fact] + public void StateValuesAndScopeBufferingTest() + { + var scopeProvider = new LoggerExternalScopeProvider(); + + List exportedItems = new(); + + using var exporter = new BatchLogRecordExportProcessor( + new InMemoryExporter(exportedItems)); + + using var scope = scopeProvider.Push(exportedItems); + + var logRecord = new LogRecord(); + + var state = new LogRecordTest.DisposingState("Hello world"); + + logRecord.ScopeProvider = scopeProvider; + logRecord.StateValues = state; + + exporter.OnEnd(logRecord); + + state.Dispose(); + + Assert.Empty(exportedItems); + + Assert.Null(logRecord.ScopeProvider); + Assert.False(ReferenceEquals(state, logRecord.StateValues)); + Assert.NotNull(logRecord.AttributeStorage); + Assert.NotNull(logRecord.BufferedScopes); + + KeyValuePair actualState = logRecord.StateValues[0]; + + Assert.Same("Value", actualState.Key); + Assert.Same("Hello world", actualState.Value); + + bool foundScope = false; + + logRecord.ForEachScope( + (s, o) => + { + foundScope = ReferenceEquals(s.Scope, exportedItems); + }, + null); + + Assert.True(foundScope); + } + + [Fact] + public void StateBufferingTest() + { + // LogRecord.State is never inspected or buffered. Accessing it + // after OnEnd may throw. This test verifies that behavior. TODO: + // Investigate this. Potentially obsolete logRecord.State and force + // StateValues/ParseStateValues behavior. + List exportedItems = new(); + + using var exporter = new BatchLogRecordExportProcessor( + new InMemoryExporter(exportedItems)); + + var logRecord = new LogRecord(); + + var state = new LogRecordTest.DisposingState("Hello world"); + logRecord.State = state; + + exporter.OnEnd(logRecord); + + state.Dispose(); + + Assert.Throws(() => + { + IReadOnlyList> state = (IReadOnlyList>)logRecord.State; + + foreach (var kvp in state) + { + } + }); + } + } +} +#endif diff --git a/test/OpenTelemetry.Tests/Logs/LogRecordSharedPoolTests.cs b/test/OpenTelemetry.Tests/Logs/LogRecordSharedPoolTests.cs new file mode 100644 index 00000000000..0a6d7d37baf --- /dev/null +++ b/test/OpenTelemetry.Tests/Logs/LogRecordSharedPoolTests.cs @@ -0,0 +1,278 @@ +// +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +#nullable enable + +using System; +using System.Collections.Generic; +using System.Threading.Tasks; +using Xunit; + +namespace OpenTelemetry.Logs.Tests +{ + public sealed class LogRecordSharedPoolTests + { + [Fact] + public void ResizeTests() + { + LogRecordSharedPool.Resize(LogRecordSharedPool.DefaultMaxPoolSize); + Assert.NotNull(LogRecordSharedPool.Current); + Assert.Equal(LogRecordSharedPool.DefaultMaxPoolSize, LogRecordSharedPool.Current.Capacity); + + Assert.Throws(() => LogRecordSharedPool.Resize(0)); + + var beforePool = LogRecordSharedPool.Current; + + LogRecordSharedPool.Resize(1); + + Assert.NotNull(LogRecordSharedPool.Current); + Assert.Equal(1, LogRecordSharedPool.Current.Capacity); + Assert.NotEqual(beforePool, LogRecordSharedPool.Current); + } + + [Fact] + public void RentReturnTests() + { + LogRecordSharedPool.Resize(2); + + var pool = LogRecordSharedPool.Current; + + var logRecord1 = pool.Rent(); + Assert.NotNull(logRecord1); + + var logRecord2 = pool.Rent(); + Assert.NotNull(logRecord1); + + pool.Return(logRecord1); + + Assert.Equal(1, pool.Count); + + // Note: This is ignored because logRecord manually created has PoolReferenceCount = int.MaxValue. + LogRecord manualRecord = new(); + Assert.Equal(int.MaxValue, manualRecord.PoolReferenceCount); + pool.Return(manualRecord); + + Assert.Equal(1, pool.Count); + + pool.Return(logRecord2); + + Assert.Equal(2, pool.Count); + + logRecord1 = pool.Rent(); + Assert.NotNull(logRecord1); + Assert.Equal(1, pool.Count); + + logRecord2 = pool.Rent(); + Assert.NotNull(logRecord2); + Assert.Equal(0, pool.Count); + + var logRecord3 = pool.Rent(); + var logRecord4 = pool.Rent(); + Assert.NotNull(logRecord3); + Assert.NotNull(logRecord4); + + pool.Return(logRecord1); + pool.Return(logRecord2); + pool.Return(logRecord3); + pool.Return(logRecord4); // <- Discarded due to pool size of 2 + + Assert.Equal(2, pool.Count); + } + + [Fact] + public void TrackReferenceTests() + { + LogRecordSharedPool.Resize(2); + + var pool = LogRecordSharedPool.Current; + + var logRecord1 = pool.Rent(); + Assert.NotNull(logRecord1); + + Assert.Equal(1, logRecord1.PoolReferenceCount); + + logRecord1.AddReference(); + + Assert.Equal(2, logRecord1.PoolReferenceCount); + + pool.Return(logRecord1); + + Assert.Equal(1, logRecord1.PoolReferenceCount); + + pool.Return(logRecord1); + + Assert.Equal(1, pool.Count); + Assert.Equal(0, logRecord1.PoolReferenceCount); + + pool.Return(logRecord1); + + Assert.Equal(-1, logRecord1.PoolReferenceCount); + Assert.Equal(1, pool.Count); // Record was not returned because PoolReferences was negative. + } + + [Fact] + public void ClearTests() + { + LogRecordSharedPool.Resize(LogRecordSharedPool.DefaultMaxPoolSize); + + var pool = LogRecordSharedPool.Current; + + var logRecord1 = pool.Rent(); + logRecord1.AttributeStorage = new List>(16) + { + new KeyValuePair("key1", "value1"), + new KeyValuePair("key2", "value2"), + }; + logRecord1.BufferedScopes = new List(8) { null, null }; + + pool.Return(logRecord1); + + Assert.Empty(logRecord1.AttributeStorage); + Assert.Equal(16, logRecord1.AttributeStorage.Capacity); + Assert.Empty(logRecord1.BufferedScopes); + Assert.Equal(8, logRecord1.BufferedScopes.Capacity); + + logRecord1 = pool.Rent(); + + Assert.NotNull(logRecord1.AttributeStorage); + Assert.NotNull(logRecord1.BufferedScopes); + + for (int i = 0; i <= LogRecordPoolHelper.DefaultMaxNumberOfAttributes; i++) + { + logRecord1.AttributeStorage!.Add(new KeyValuePair("key", "value")); + } + + for (int i = 0; i <= LogRecordPoolHelper.DefaultMaxNumberOfScopes; i++) + { + logRecord1.BufferedScopes!.Add(null); + } + + pool.Return(logRecord1); + + Assert.Null(logRecord1.AttributeStorage); + Assert.Null(logRecord1.BufferedScopes); + } + + [Theory] + [InlineData(false)] + [InlineData(true)] + public async Task ExportTest(bool warmup) + { + LogRecordSharedPool.Resize(LogRecordSharedPool.DefaultMaxPoolSize); + + var pool = LogRecordSharedPool.Current; + + if (warmup) + { + for (int i = 0; i < LogRecordSharedPool.DefaultMaxPoolSize; i++) + { + pool.Return(new LogRecord { PoolReferenceCount = 1 }); + } + } + + using BatchLogRecordExportProcessor processor = new(new NoopExporter()); + + List tasks = new(); + + for (int i = 0; i < Environment.ProcessorCount; i++) + { + tasks.Add(Task.Run(async () => + { + Random random = new Random(); + + await Task.Delay(random.Next(100, 150)).ConfigureAwait(false); + + for (int i = 0; i < 1000; i++) + { + var logRecord = pool.Rent(); + + processor.OnEnd(logRecord); + + // This should no-op mostly. + pool.Return(logRecord); + + await Task.Delay(random.Next(0, 20)).ConfigureAwait(false); + } + })); + } + + await Task.WhenAll(tasks).ConfigureAwait(false); + + processor.ForceFlush(); + + if (warmup) + { + Assert.Equal(LogRecordSharedPool.DefaultMaxPoolSize, pool.Count); + } + + Assert.True(pool.Count <= LogRecordSharedPool.DefaultMaxPoolSize); + } + + [Fact] + public async Task DeadlockTest() + { + /* + * The way the LogRecordPool works is it maintains two counters one + * for readers and one for writers. The counters always increment + * and point to an index in the pool array by way of a modulus on + * the size of the array (index = counter % capacity). Under very + * heavy load it is possible for a reader to receive an index and + * then be yielded. When waking up that index may no longer be valid + * if other threads caused the counters to loop around. There is + * protection for this case in the pool, this test verifies it is + * working. + * + * This is considered a corner case. Many threads have to be renting + * & returning logs in a tight loop for this to happen. Real + * applications should be logging based on logic firing which should + * have more natural back-off time. + */ + + LogRecordSharedPool.Resize(LogRecordSharedPool.DefaultMaxPoolSize); + + var pool = LogRecordSharedPool.Current; + + List tasks = new(); + + for (int i = 0; i < Environment.ProcessorCount; i++) + { + tasks.Add(Task.Run(async () => + { + await Task.Delay(2000).ConfigureAwait(false); + + for (int i = 0; i < 100_000; i++) + { + var logRecord = pool.Rent(); + + pool.Return(logRecord); + } + })); + } + + await Task.WhenAll(tasks).ConfigureAwait(false); + + Assert.True(pool.Count <= LogRecordSharedPool.DefaultMaxPoolSize); + } + + private sealed class NoopExporter : BaseExporter + { + public override ExportResult Export(in Batch batch) + { + return ExportResult.Success; + } + } + } +} diff --git a/test/OpenTelemetry.Tests/Logs/LogRecordTest.cs b/test/OpenTelemetry.Tests/Logs/LogRecordTest.cs index 05305be10ff..79fb0d2fa19 100644 --- a/test/OpenTelemetry.Tests/Logs/LogRecordTest.cs +++ b/test/OpenTelemetry.Tests/Logs/LogRecordTest.cs @@ -256,8 +256,7 @@ public void CheckStateCanBeSet() using var loggerFactory = InitializeLoggerFactory(out List exportedItems, configure: null); var logger = loggerFactory.CreateLogger(); - var message = $"This does not matter."; - logger.LogInformation(message); + logger.LogInformation("This does not matter."); var logRecord = exportedItems[0]; logRecord.State = "newState"; @@ -744,6 +743,34 @@ public void ParseStateValuesUsingCustomTest() Assert.Same(state, actualState.Value); } + [Fact] + public void DisposingStateTest() + { + using var loggerFactory = InitializeLoggerFactory(out List exportedItems, configure: options => options.ParseStateValues = true); + var logger = loggerFactory.CreateLogger(); + + DisposingState state = new DisposingState("Hello world"); + + logger.Log( + LogLevel.Information, + 0, + state, + null, + (s, e) => "OpenTelemetry!"); + var logRecord = exportedItems[0]; + + state.Dispose(); + + Assert.Null(logRecord.State); + Assert.NotNull(logRecord.StateValues); + Assert.Equal(1, logRecord.StateValues.Count); + + KeyValuePair actualState = logRecord.StateValues[0]; + + Assert.Same("Value", actualState.Key); + Assert.Same("Hello world", actualState.Value); + } + private static ILoggerFactory InitializeLoggerFactory(out List exportedItems, Action configure = null) { var items = exportedItems = new List(); @@ -790,6 +817,54 @@ IEnumerator IEnumerable.GetEnumerator() } } + internal sealed class DisposingState : IReadOnlyList>, IDisposable + { + private string value; + private bool disposed; + + public DisposingState(string value) + { + this.Value = value; + } + + public int Count => 1; + + public string Value + { + get + { + if (this.disposed) + { + throw new ObjectDisposedException(nameof(DisposingState)); + } + + return this.value; + } + private set => this.value = value; + } + + public KeyValuePair this[int index] => index switch + { + 0 => new KeyValuePair(nameof(this.Value), this.Value), + _ => throw new IndexOutOfRangeException(nameof(index)), + }; + + public void Dispose() + { + this.disposed = true; + } + + public IEnumerator> GetEnumerator() + { + for (var i = 0; i < this.Count; i++) + { + yield return this[i]; + } + } + + IEnumerator IEnumerable.GetEnumerator() => this.GetEnumerator(); + } + private class RedactionProcessor : BaseProcessor { private readonly Field fieldToUpdate; diff --git a/test/OpenTelemetry.Tests/Logs/LogRecordThreadStaticPoolTests.cs b/test/OpenTelemetry.Tests/Logs/LogRecordThreadStaticPoolTests.cs new file mode 100644 index 00000000000..93a334cc567 --- /dev/null +++ b/test/OpenTelemetry.Tests/Logs/LogRecordThreadStaticPoolTests.cs @@ -0,0 +1,90 @@ +// +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +#nullable enable + +using System.Collections.Generic; +using Xunit; + +namespace OpenTelemetry.Logs.Tests +{ + public sealed class LogRecordThreadStaticPoolTests + { + [Fact] + public void RentReturnTests() + { + LogRecordThreadStaticPool.Storage = null; + + var logRecord = LogRecordThreadStaticPool.Instance.Rent(); + Assert.NotNull(logRecord); + Assert.Null(LogRecordThreadStaticPool.Storage); + + LogRecordThreadStaticPool.Instance.Return(logRecord); + Assert.NotNull(LogRecordThreadStaticPool.Storage); + Assert.Equal(logRecord, LogRecordThreadStaticPool.Storage); + + LogRecordThreadStaticPool.Instance.Return(new()); + Assert.NotNull(LogRecordThreadStaticPool.Storage); + Assert.Equal(logRecord, LogRecordThreadStaticPool.Storage); + + LogRecordThreadStaticPool.Storage = null; + + var manual = new LogRecord(); + LogRecordThreadStaticPool.Instance.Return(manual); + Assert.NotNull(LogRecordThreadStaticPool.Storage); + Assert.Equal(manual, LogRecordThreadStaticPool.Storage); + } + + [Fact] + public void ClearTests() + { + var logRecord1 = LogRecordThreadStaticPool.Instance.Rent(); + logRecord1.AttributeStorage = new List>(16) + { + new KeyValuePair("key1", "value1"), + new KeyValuePair("key2", "value2"), + }; + logRecord1.BufferedScopes = new List(8) { null, null }; + + LogRecordThreadStaticPool.Instance.Return(logRecord1); + + Assert.Empty(logRecord1.AttributeStorage); + Assert.Equal(16, logRecord1.AttributeStorage.Capacity); + Assert.Empty(logRecord1.BufferedScopes); + Assert.Equal(8, logRecord1.BufferedScopes.Capacity); + + logRecord1 = LogRecordThreadStaticPool.Instance.Rent(); + + Assert.NotNull(logRecord1.AttributeStorage); + Assert.NotNull(logRecord1.BufferedScopes); + + for (int i = 0; i <= LogRecordPoolHelper.DefaultMaxNumberOfAttributes; i++) + { + logRecord1.AttributeStorage!.Add(new KeyValuePair("key", "value")); + } + + for (int i = 0; i <= LogRecordPoolHelper.DefaultMaxNumberOfScopes; i++) + { + logRecord1.BufferedScopes!.Add(null); + } + + LogRecordThreadStaticPool.Instance.Return(logRecord1); + + Assert.Null(logRecord1.AttributeStorage); + Assert.Null(logRecord1.BufferedScopes); + } + } +} diff --git a/test/OpenTelemetry.Tests/Logs/OpenTelemetryLoggerProviderTests.cs b/test/OpenTelemetry.Tests/Logs/OpenTelemetryLoggerProviderTests.cs index 330fa4b50e7..6ddd5d7caa4 100644 --- a/test/OpenTelemetry.Tests/Logs/OpenTelemetryLoggerProviderTests.cs +++ b/test/OpenTelemetry.Tests/Logs/OpenTelemetryLoggerProviderTests.cs @@ -85,5 +85,65 @@ public void ForceFlushTest() Assert.Single(exportedItems); } + + [Fact] + public void ThreadStaticPoolUsedByProviderTests() + { + using var provider1 = new OpenTelemetryLoggerProvider(new OpenTelemetryLoggerOptions()); + + Assert.Equal(LogRecordThreadStaticPool.Instance, provider1.LogRecordPool); + + var options = new OpenTelemetryLoggerOptions(); + options.AddProcessor(new SimpleLogRecordExportProcessor(new NoopExporter())); + + using var provider2 = new OpenTelemetryLoggerProvider(options); + + Assert.Equal(LogRecordThreadStaticPool.Instance, provider2.LogRecordPool); + + options.AddProcessor(new SimpleLogRecordExportProcessor(new NoopExporter())); + + using var provider3 = new OpenTelemetryLoggerProvider(options); + + Assert.Equal(LogRecordThreadStaticPool.Instance, provider3.LogRecordPool); + } + + [Fact] + public void SharedPoolUsedByProviderTests() + { + var options = new OpenTelemetryLoggerOptions(); + options.AddProcessor(new BatchLogRecordExportProcessor(new NoopExporter())); + + using var provider1 = new OpenTelemetryLoggerProvider(options); + + Assert.Equal(LogRecordSharedPool.Current, provider1.LogRecordPool); + + options = new OpenTelemetryLoggerOptions(); + options.AddProcessor(new SimpleLogRecordExportProcessor(new NoopExporter())); + options.AddProcessor(new BatchLogRecordExportProcessor(new NoopExporter())); + + using var provider2 = new OpenTelemetryLoggerProvider(options); + + Assert.Equal(LogRecordSharedPool.Current, provider2.LogRecordPool); + + options = new OpenTelemetryLoggerOptions(); + options.AddProcessor(new SimpleLogRecordExportProcessor(new NoopExporter())); + options.AddProcessor(new CompositeProcessor(new BaseProcessor[] + { + new SimpleLogRecordExportProcessor(new NoopExporter()), + new BatchLogRecordExportProcessor(new NoopExporter()), + })); + + using var provider3 = new OpenTelemetryLoggerProvider(options); + + Assert.Equal(LogRecordSharedPool.Current, provider3.LogRecordPool); + } + + private sealed class NoopExporter : BaseExporter + { + public override ExportResult Export(in Batch batch) + { + return ExportResult.Success; + } + } } }