Skip to content

Commit

Permalink
v12.0.0 - Avoid dictionary collisions on error logging from multiple …
Browse files Browse the repository at this point in the history
…threads

Not yet finished: how to periodically cleanup error caching
  • Loading branch information
monoman committed Apr 19, 2023
1 parent 2d3025c commit 52bd642
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 23 deletions.
42 changes: 21 additions & 21 deletions InterlockLedger.Peer2Peer/ConnectionBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
//
// ******************************************************************************************************************************

#nullable enable

using System.Collections.Concurrent;
using System.Net.Sockets;

Expand All @@ -41,10 +43,10 @@ public abstract class ConnectionBase : ListenerBase, IConnection
public const string ExceptionCantProxyWithSinkMessage = "Can't proxy a connection already with a default sink";
public const string ExceptionChannelNotFoundFormat = "Channel {0} not found!!!";

public event Action<INetworkIdentity> ConnectionStopped;
public event Action<INetworkIdentity>? ConnectionStopped;

public abstract bool CanReconnect { get; }
public bool Connected => !_stopping && GetPipelineAsync().Result.Connected;
public bool Connected => !_stopping && ResolvedPipeline.Connected;
public long LastChannelUsed => _lastChannelUsed;
public int NumberOfActiveChannels => _channelSinks.Count;

Expand Down Expand Up @@ -73,11 +75,15 @@ public IActiveChannel GetChannel(ulong channel)
internal Task<Success> SinkAsync(NetworkMessageSlice slice) => DoAsync(() => InnerSinkAsync(slice));

protected readonly ConcurrentDictionary<ulong, IActiveChannel> _channelSinks = new();
protected IChannelSink _sink;
protected ISocket _socket;
protected IChannelSink? _sink;
protected ISocket? _socket;

protected ConnectionBase(string id, INetworkConfig config, CancellationTokenSource source, ILogger logger)
: base(id, config, source, logger) => _pipeline = null;
: base(id, config, source, logger) {
_pipeline = null;
_errorCachingLogger = new ErrorCachingLogger(() => Abandon || _stopping, _logger);
NetworkAddress = "?";
}

protected string NetworkAddress { get; set; }
protected int NetworkPort { get; set; }
Expand All @@ -92,24 +98,18 @@ protected ConnectionBase(string id, INetworkConfig config, CancellationTokenSour
_socket?.Dispose();
}

[System.Diagnostics.CodeAnalysis.SuppressMessage("Usage", "CA2254:Template should be a static expression", Justification = "Nope")]
protected void LogError(string message) {
if (!(_errors.TryGetValue(message, out var dateTime) && (DateTimeOffset.Now - dateTime).Hours < _hoursOfSilencedDuplicateErrors)) {
_logger.LogError(message);
_errors[message] = DateTimeOffset.Now;
}
}

protected void StartPipeline() => _ = GetPipelineAsync().Result;

protected void StartPipeline() => _ = ResolvedPipeline;

private const int _hoursOfSilencedDuplicateErrors = 8;
private static readonly Dictionary<string, DateTimeOffset> _errors = new();
private readonly AsyncLock _pipelineLock = new();
private readonly ConcurrentQueue<NetworkMessageSlice> _sendingQueue = new();
private long _lastChannelUsed = 0;
private Pipeline _pipeline;
private readonly ErrorCachingLogger _errorCachingLogger;
private long _lastChannelUsed;
private Pipeline? _pipeline;
private bool _stopping;


public Pipeline ResolvedPipeline => GetPipelineAsync().Result;
private async Task<Pipeline> GetPipelineAsync() {
try {
if (_pipeline is null)
Expand Down Expand Up @@ -140,13 +140,13 @@ protected ConnectionBase(string id, INetworkConfig config, CancellationTokenSour
throw ae.Flatten().InnerExceptions.First();
}
} catch (PeerException pe) {
LogError(pe.Message);
_errorCachingLogger.LogError(pe.Message);
} catch (SocketException se) {
LogError($"Client could not communicate with address {NetworkAddress}:{NetworkPort}.{Environment.NewLine}{se.Message}");
_errorCachingLogger.LogError($"Client could not communicate with address {NetworkAddress}:{NetworkPort}.{Environment.NewLine}{se.Message}");
} catch (TaskCanceledException) {
// just ignore
} catch (Exception e) {
LogError($"Unexpected exception : {e}");
_errorCachingLogger.LogError($"Unexpected exception : {e}");
}
return false;
}
Expand Down
83 changes: 83 additions & 0 deletions InterlockLedger.Peer2Peer/ErrorCachingLogger.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// ******************************************************************************************************************************
//
// Copyright (c) 2018-2022 InterlockLedger Network
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met
//
// * Redistributions of source code must retain the above copyright notice, this
// list of conditions and the following disclaimer.
//
// * Redistributions in binary form must reproduce the above copyright notice,
// this list of conditions and the following disclaimer in the documentation
// and/or other materials provided with the distribution.
//
// * Neither the name of the copyright holder nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
// FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
// DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
// SERVICES, LOSS OF USE, DATA, OR PROFITS, OR BUSINESS INTERRUPTION) HOWEVER
// CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
// OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//
// ******************************************************************************************************************************

#nullable enable

using System.Collections.Concurrent;

namespace InterlockLedger.Peer2Peer
{
public record ErrorCachingLogger(Func<bool> AbandonNow, ILogger Logger)
{

private const int _hoursOfSilencedDuplicateErrors = 8;
private const int _tooManyErrors = 500;
private static readonly ConcurrentDictionary<string, DateTimeOffset> _errors = new();
public async Task CleanOldestErrors() {
while (!AbandonNow()) {
if (_errors.Count > 100) {
var now = DateTimeOffset.Now;
var oldestErrors = _errors.ToArray()
.OrderBy(pair => pair.Value)
.Where(pair => (now - pair.Value).TotalHours > _hoursOfSilencedDuplicateErrors * 2);
Remove(oldestErrors);
}
await Task.Delay(5_000);
if (!AbandonNow() && _errors.Count > _tooManyErrors) {
var oldestErrors = _errors.ToArray()
.OrderBy(pair => pair.Value)
.Take(_errors.Count - _tooManyErrors);
Remove(oldestErrors);
}
await Task.Delay(10_000);
}
}

public void LogError(string message) {
var now = DateTimeOffset.Now;
if (_errors.TryGetValue(message, out var dateTime) && (now - dateTime).Hours < _hoursOfSilencedDuplicateErrors)
return;
_ = _errors.AddOrUpdate(message, m => LogAt(now, m), (m, lastMoment) => now > lastMoment ? LogAt(now, m) : lastMoment);
}

private DateTimeOffset LogAt(DateTimeOffset now, string message) {
Logger.LogError("{now} - {message}", now, message);
return now;
}
private static void Remove(IEnumerable<KeyValuePair<string, DateTimeOffset>> errorsToRemove) {
foreach (var error in errorsToRemove) {
if (!_errors.TryRemove(error))
return;
}
}
}
}
5 changes: 3 additions & 2 deletions InterlockLedger.Peer2Peer/InterlockLedger.Peer2Peer.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

<PropertyGroup>
<TargetFramework>net7.0</TargetFramework>
<Version>11.1.0</Version>
<Version>12.0.0</Version>
<Authors>Rafael Teixeira</Authors>
<Company>InterlockLedger Network</Company>
<Product>InterlockLedger</Product>
Expand All @@ -13,7 +13,7 @@
<RepositoryUrl>https://github.com/interlockledger/interlockledger-peer2peer.git</RepositoryUrl>
<RepositoryType>git</RepositoryType>
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
<PackageReleaseNotes>Avoid some reallocation on proxying messages - Larger message size limit (16MB => 64MB)</PackageReleaseNotes>
<PackageReleaseNotes>Avoid some reallocation on proxying messages - Larger message size limit (16MB =&gt; 64MB)</PackageReleaseNotes>
<LangVersion>preview</LangVersion>
<PackageLicenseFile>LICENSE</PackageLicenseFile>
<ImplicitUsings>enable</ImplicitUsings>
Expand All @@ -22,6 +22,7 @@
<ItemGroup>
<PackageReference Include="InterlockLedger.Tags.ILInt" Version="12.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="7.0.0" />
<PackageReference Include="Polly" Version="7.2.3" />
<PackageReference Include="System.IO.Pipelines" Version="7.0.0" />
</ItemGroup>

Expand Down

0 comments on commit 52bd642

Please sign in to comment.