Skip to content

Commit

Permalink
Async DiskWriterQueue implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
ltetak committed Jan 26, 2024
1 parent 33f85d5 commit f21cd84
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 6 deletions.
18 changes: 12 additions & 6 deletions LiteDB/Engine/Disk/DiskWriterQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,12 @@ internal class DiskWriterQueue : IDisposable

private readonly ConcurrentQueue<PageBuffer> _queue = new ConcurrentQueue<PageBuffer>();
private readonly object _queueSync = new object();
private readonly ManualResetEventSlim _queueHasItems = new ManualResetEventSlim(false);
private readonly AsyncManualResetEvent _queueHasItems = new AsyncManualResetEvent();
private readonly ManualResetEventSlim _queueIsEmpty = new ManualResetEventSlim(true);

public DiskWriterQueue(Stream stream)
{
_stream = stream;
_task = Task.Run(ExecuteQueue);
}

/// <summary>
Expand All @@ -47,6 +46,11 @@ public void EnqueuePage(PageBuffer page)
_queueIsEmpty.Reset();
_queue.Enqueue(page);
_queueHasItems.Set();

if (_task == null)
{
_task = Task.Factory.StartNew(ExecuteQueue, TaskCreationOptions.LongRunning);
}
}
}

Expand All @@ -62,7 +66,7 @@ public void Wait()
/// <summary>
/// Execute all items in queue sync
/// </summary>
private void ExecuteQueue()
private async Task ExecuteQueue()
{
while (true)
{
Expand All @@ -72,15 +76,16 @@ private void ExecuteQueue()
}
else
{
_stream.FlushToDisk();
lock (_queueSync)
{
if (_queue.Count > 0) continue;
_queueIsEmpty.Set();
_queueHasItems.Reset();
if (_shouldClose) return;
}
_stream.FlushToDisk();

_queueHasItems.Wait();
if (_shouldClose) return;
await _queueHasItems.WaitAsync();
}
}
}
Expand All @@ -105,6 +110,7 @@ public void Dispose()
LOG($"disposing disk writer queue (with {_queue.Count} pages in queue)", "DISK");

_shouldClose = true;
_queueHasItems.Set(); // unblock the running loop in case there are no items

// run all items in queue before dispose
this.Wait();
Expand Down
35 changes: 35 additions & 0 deletions LiteDB/Utils/AsyncManualResetEvent.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
using System.Threading;
using System.Threading.Tasks;

namespace LiteDB
{
/// <summary>
/// Async implementation of ManualResetEvent
/// https://devblogs.microsoft.com/pfxteam/building-async-coordination-primitives-part-1-asyncmanualresetevent/
/// </summary>
internal class AsyncManualResetEvent
{
private volatile TaskCompletionSource<bool> _tcs = new TaskCompletionSource<bool>();

public Task WaitAsync()
{
return _tcs.Task;
}

public void Set()
{
_tcs.TrySetResult(true);
}

public void Reset()
{
while (true)
{
var tcs = _tcs;
if (!tcs.Task.IsCompleted ||
Interlocked.CompareExchange(ref _tcs, new TaskCompletionSource<bool>(), tcs) == tcs)
return;
}
}
}
}

0 comments on commit f21cd84

Please sign in to comment.