Skip to content

Commit

Permalink
Allow flushing pending log messages (#157 closes #147, #154)
Browse files Browse the repository at this point in the history
  • Loading branch information
luigiberrettini committed Oct 28, 2018
1 parent 1e5fb22 commit 9fd9903
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 1 deletion.
25 changes: 25 additions & 0 deletions src/NLog.Targets.Syslog/AsyncLogger.cs
Expand Up @@ -24,6 +24,7 @@ internal class AsyncLogger
private readonly BlockingCollection<AsyncLogEventInfo> queue;
private readonly ByteArray buffer;
private readonly MessageTransmitter messageTransmitter;
private readonly LogEventInfo notLoggedLogEventInfo;

public AsyncLogger(Layout loggingLayout, EnforcementConfig enforcementConfig, MessageBuilder messageBuilder, MessageTransmitterConfig messageTransmitterConfig)
{
Expand All @@ -34,6 +35,7 @@ public AsyncLogger(Layout loggingLayout, EnforcementConfig enforcementConfig, Me
queue = NewBlockingCollection();
buffer = new ByteArray(enforcementConfig.TruncateMessageTo);
messageTransmitter = MessageTransmitter.FromConfig(messageTransmitterConfig);
notLoggedLogEventInfo = new LogEventInfo(LogLevel.Off, string.Empty, nameof(notLoggedLogEventInfo));
Task.Run(() => ProcessQueueAsync(messageBuilder));
}

Expand All @@ -42,6 +44,13 @@ public void Log(AsyncLogEventInfo asyncLogEvent)
throttling.Apply(queue.Count, timeout => Enqueue(asyncLogEvent, timeout));
}

public Task FlushAsync()
{
var flushTcs = new TaskCompletionSource<object>();
Enqueue(NewFlushCompletionMarker(flushTcs), Timeout.Infinite);
return flushTcs.Task;
}

private BlockingCollection<AsyncLogEventInfo> NewBlockingCollection()
{
var throttlingLimit = throttling.Limit;
Expand Down Expand Up @@ -69,6 +78,7 @@ private Task ProcessQueueAsync(MessageBuilder messageBuilder, TaskCompletionSour
try
{
var asyncLogEventInfo = queue.Take(token);
SignalFlushCompletionWhenIsMarker(asyncLogEventInfo);
var logEventMsgSet = new LogEventMsgSet(asyncLogEventInfo, buffer, messageBuilder, messageTransmitter);

logEventMsgSet
Expand Down Expand Up @@ -103,6 +113,21 @@ private void Enqueue(AsyncLogEventInfo asyncLogEventInfo, int timeout)
InternalLogger.Debug(() => $"Enqueued '{asyncLogEventInfo.ToFormattedMessage()}'");
}

private AsyncLogEventInfo NewFlushCompletionMarker(TaskCompletionSource<object> tcs)
{
var asyncContinuation = new AsyncContinuation(_ => tcs.TrySetResult(null));
return new AsyncLogEventInfo(notLoggedLogEventInfo, asyncContinuation);
}

private void SignalFlushCompletionWhenIsMarker(AsyncLogEventInfo asyncLogEventInfo)
{
bool IsFlushCompletionMarker(AsyncLogEventInfo x) => x.LogEvent == notLoggedLogEventInfo;
void SignalFlushCompletion() => asyncLogEventInfo.Continuation(null);

if (IsFlushCompletionMarker(asyncLogEventInfo))
SignalFlushCompletion();
}

public void Dispose()
{
cts.Cancel();
Expand Down
2 changes: 1 addition & 1 deletion src/NLog.Targets.Syslog/Policies/Throttling.cs
Expand Up @@ -37,7 +37,7 @@ public void Apply(int waitingLogEntries, Action<int> actionWithTimeout)
{
if (Strategy == ThrottlingStrategy.None || waitingLogEntries < Limit)
{
actionWithTimeout(0);
actionWithTimeout(Timeout.Infinite);
return;
}

Expand Down
19 changes: 19 additions & 0 deletions src/NLog.Targets.Syslog/SyslogTarget.cs
Expand Up @@ -8,6 +8,7 @@
using System;
using System.ComponentModel;
using System.Linq;
using System.Threading.Tasks;

namespace NLog.Targets.Syslog
{
Expand Down Expand Up @@ -65,6 +66,24 @@ protected override void Write(AsyncLogEventInfo asyncLogEvent)
asyncLoggers[asyncLoggerId].Log(asyncLogEvent);
}

/// <summary>Flushes any pending log message</summary>
/// <param name="asyncContinuation">The asynchronous continuation</param>
protected override void FlushAsync(AsyncContinuation asyncContinuation)
{
var tasks = Enforcement.MessageProcessors.Select(i => asyncLoggers[i].FlushAsync()).ToArray();
Task.WhenAll(tasks)
.ContinueWith(t =>
{
if (t.Exception != null)
{
asyncContinuation(t.Exception.GetBaseException());
return;
}
asyncContinuation(null);
})
.Wait();
}

/// <summary>Disposes the instance</summary>
protected override void Dispose(bool disposing)
{
Expand Down

0 comments on commit 9fd9903

Please sign in to comment.