From 52bd6427271d1b6207a64b9924a4bd1a8e3c728c Mon Sep 17 00:00:00 2001 From: Rafael Teixeira Date: Wed, 19 Apr 2023 19:48:21 -0300 Subject: [PATCH] v12.0.0 - Avoid dictionary collisions on error logging from multiple threads Not yet finished: how to periodically cleanup error caching --- InterlockLedger.Peer2Peer/ConnectionBase.cs | 42 +++++----- .../ErrorCachingLogger.cs | 83 +++++++++++++++++++ .../InterlockLedger.Peer2Peer.csproj | 5 +- 3 files changed, 107 insertions(+), 23 deletions(-) create mode 100644 InterlockLedger.Peer2Peer/ErrorCachingLogger.cs diff --git a/InterlockLedger.Peer2Peer/ConnectionBase.cs b/InterlockLedger.Peer2Peer/ConnectionBase.cs index 3dc8576..e9ff23f 100644 --- a/InterlockLedger.Peer2Peer/ConnectionBase.cs +++ b/InterlockLedger.Peer2Peer/ConnectionBase.cs @@ -30,6 +30,8 @@ // // ****************************************************************************************************************************** +#nullable enable + using System.Collections.Concurrent; using System.Net.Sockets; @@ -41,10 +43,10 @@ public abstract class ConnectionBase : ListenerBase, IConnection public const string ExceptionCantProxyWithSinkMessage = "Can't proxy a connection already with a default sink"; public const string ExceptionChannelNotFoundFormat = "Channel {0} not found!!!"; - public event Action ConnectionStopped; + public event Action? ConnectionStopped; public abstract bool CanReconnect { get; } - public bool Connected => !_stopping && GetPipelineAsync().Result.Connected; + public bool Connected => !_stopping && ResolvedPipeline.Connected; public long LastChannelUsed => _lastChannelUsed; public int NumberOfActiveChannels => _channelSinks.Count; @@ -73,11 +75,15 @@ public override void Stop() { internal Task SinkAsync(NetworkMessageSlice slice) => DoAsync(() => InnerSinkAsync(slice)); protected readonly ConcurrentDictionary _channelSinks = new(); - protected IChannelSink _sink; - protected ISocket _socket; + protected IChannelSink? _sink; + protected ISocket? _socket; protected ConnectionBase(string id, INetworkConfig config, CancellationTokenSource source, ILogger logger) - : base(id, config, source, logger) => _pipeline = null; + : base(id, config, source, logger) { + _pipeline = null; + _errorCachingLogger = new ErrorCachingLogger(() => Abandon || _stopping, _logger); + NetworkAddress = "?"; + } protected string NetworkAddress { get; set; } protected int NetworkPort { get; set; } @@ -92,24 +98,18 @@ protected override void DisposeManagedResources() { _socket?.Dispose(); } - [System.Diagnostics.CodeAnalysis.SuppressMessage("Usage", "CA2254:Template should be a static expression", Justification = "Nope")] - protected void LogError(string message) { - if (!(_errors.TryGetValue(message, out var dateTime) && (DateTimeOffset.Now - dateTime).Hours < _hoursOfSilencedDuplicateErrors)) { - _logger.LogError(message); - _errors[message] = DateTimeOffset.Now; - } - } - - protected void StartPipeline() => _ = GetPipelineAsync().Result; + + protected void StartPipeline() => _ = ResolvedPipeline; - private const int _hoursOfSilencedDuplicateErrors = 8; - private static readonly Dictionary _errors = new(); private readonly AsyncLock _pipelineLock = new(); private readonly ConcurrentQueue _sendingQueue = new(); - private long _lastChannelUsed = 0; - private Pipeline _pipeline; + private readonly ErrorCachingLogger _errorCachingLogger; + private long _lastChannelUsed; + private Pipeline? _pipeline; private bool _stopping; + + public Pipeline ResolvedPipeline => GetPipelineAsync().Result; private async Task GetPipelineAsync() { try { if (_pipeline is null) @@ -140,13 +140,13 @@ private async Task InnerSendAsync(NetworkMessageSlice slice) { throw ae.Flatten().InnerExceptions.First(); } } catch (PeerException pe) { - LogError(pe.Message); + _errorCachingLogger.LogError(pe.Message); } catch (SocketException se) { - LogError($"Client could not communicate with address {NetworkAddress}:{NetworkPort}.{Environment.NewLine}{se.Message}"); + _errorCachingLogger.LogError($"Client could not communicate with address {NetworkAddress}:{NetworkPort}.{Environment.NewLine}{se.Message}"); } catch (TaskCanceledException) { // just ignore } catch (Exception e) { - LogError($"Unexpected exception : {e}"); + _errorCachingLogger.LogError($"Unexpected exception : {e}"); } return false; } diff --git a/InterlockLedger.Peer2Peer/ErrorCachingLogger.cs b/InterlockLedger.Peer2Peer/ErrorCachingLogger.cs new file mode 100644 index 0000000..81ba78f --- /dev/null +++ b/InterlockLedger.Peer2Peer/ErrorCachingLogger.cs @@ -0,0 +1,83 @@ +// ****************************************************************************************************************************** +// +// Copyright (c) 2018-2022 InterlockLedger Network +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are met +// +// * Redistributions of source code must retain the above copyright notice, this +// list of conditions and the following disclaimer. +// +// * Redistributions in binary form must reproduce the above copyright notice, +// this list of conditions and the following disclaimer in the documentation +// and/or other materials provided with the distribution. +// +// * Neither the name of the copyright holder nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +// FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +// DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES, LOSS OF USE, DATA, OR PROFITS, OR BUSINESS INTERRUPTION) HOWEVER +// CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +// OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +// +// ****************************************************************************************************************************** + +#nullable enable + +using System.Collections.Concurrent; + +namespace InterlockLedger.Peer2Peer +{ + public record ErrorCachingLogger(Func AbandonNow, ILogger Logger) + { + + private const int _hoursOfSilencedDuplicateErrors = 8; + private const int _tooManyErrors = 500; + private static readonly ConcurrentDictionary _errors = new(); + public async Task CleanOldestErrors() { + while (!AbandonNow()) { + if (_errors.Count > 100) { + var now = DateTimeOffset.Now; + var oldestErrors = _errors.ToArray() + .OrderBy(pair => pair.Value) + .Where(pair => (now - pair.Value).TotalHours > _hoursOfSilencedDuplicateErrors * 2); + Remove(oldestErrors); + } + await Task.Delay(5_000); + if (!AbandonNow() && _errors.Count > _tooManyErrors) { + var oldestErrors = _errors.ToArray() + .OrderBy(pair => pair.Value) + .Take(_errors.Count - _tooManyErrors); + Remove(oldestErrors); + } + await Task.Delay(10_000); + } + } + + public void LogError(string message) { + var now = DateTimeOffset.Now; + if (_errors.TryGetValue(message, out var dateTime) && (now - dateTime).Hours < _hoursOfSilencedDuplicateErrors) + return; + _ = _errors.AddOrUpdate(message, m => LogAt(now, m), (m, lastMoment) => now > lastMoment ? LogAt(now, m) : lastMoment); + } + + private DateTimeOffset LogAt(DateTimeOffset now, string message) { + Logger.LogError("{now} - {message}", now, message); + return now; + } + private static void Remove(IEnumerable> errorsToRemove) { + foreach (var error in errorsToRemove) { + if (!_errors.TryRemove(error)) + return; + } + } + } +} \ No newline at end of file diff --git a/InterlockLedger.Peer2Peer/InterlockLedger.Peer2Peer.csproj b/InterlockLedger.Peer2Peer/InterlockLedger.Peer2Peer.csproj index 95809c6..51dcfc2 100644 --- a/InterlockLedger.Peer2Peer/InterlockLedger.Peer2Peer.csproj +++ b/InterlockLedger.Peer2Peer/InterlockLedger.Peer2Peer.csproj @@ -2,7 +2,7 @@ net7.0 - 11.1.0 + 12.0.0 Rafael Teixeira InterlockLedger Network InterlockLedger @@ -13,7 +13,7 @@ https://github.com/interlockledger/interlockledger-peer2peer.git git true - Avoid some reallocation on proxying messages - Larger message size limit (16MB => 64MB) + Avoid some reallocation on proxying messages - Larger message size limit (16MB => 64MB) preview LICENSE enable @@ -22,6 +22,7 @@ +