Skip to content
Permalink
Browse files

Remove Nito.AsyncEx NuGet dependency: #901

  • Loading branch information...
nopara73 committed Nov 28, 2018
1 parent f948996 commit ffaf4aa693251c5506fc4dba8ca9f24b12c87baf
@@ -0,0 +1,209 @@
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Nito.AsyncEx.Synchronous;

// Written by Stephen Cleary: https://github.com/StephenCleary/AsyncEx/blob/master/src/Nito.AsyncEx.Coordination/AsyncLock.cs
// Original idea from Stephen Toub: http://blogs.msdn.com/b/pfxteam/archive/2012/02/12/10266988.aspx

namespace Nito.AsyncEx
{
/// <summary>
/// A mutual exclusion lock that is compatible with async. Note that this lock is <b>not</b> recursive!
/// </summary>
/// <remarks>
/// <para>This is the <c>async</c>-ready almost-equivalent of the <c>lock</c> keyword or the <see cref="Mutex"/> type, similar to <a href="http://blogs.msdn.com/b/pfxteam/archive/2012/02/12/10266988.aspx">Stephen Toub's AsyncLock</a>. It's only <i>almost</i> equivalent because the <c>lock</c> keyword permits reentrancy, which is not currently possible to do with an <c>async</c>-ready lock.</para>
/// <para>An <see cref="AsyncLock"/> is either taken or not. The lock can be asynchronously acquired by calling <see autoUpgrade="true" cref="LockAsync()"/>, and it is released by disposing the result of that task. <see cref="LockAsync(CancellationToken)"/> takes an optional <see cref="CancellationToken"/>, which can be used to cancel the acquiring of the lock.</para>
/// <para>The task returned from <see autoUpgrade="true" cref="LockAsync()"/> will enter the <c>Completed</c> state when it has acquired the <see cref="AsyncLock"/>. That same task will enter the <c>Canceled</c> state if the <see cref="CancellationToken"/> is signaled before the wait is satisfied; in that case, the <see cref="AsyncLock"/> is not taken by that task.</para>
/// <para>You can call <see cref="Lock(CancellationToken)"/> or <see cref="LockAsync(CancellationToken)"/> with an already-cancelled <see cref="CancellationToken"/> to attempt to acquire the <see cref="AsyncLock"/> immediately without actually entering the wait queue.</para>
/// </remarks>
/// <example>
/// <para>The vast majority of use cases are to just replace a <c>lock</c> statement. That is, with the original code looking like this:</para>
/// <code>
/// private readonly object _mutex = new object();
/// public void DoStuff()
/// {
/// lock (_mutex)
/// {
/// Thread.Sleep(TimeSpan.FromSeconds(1));
/// }
/// }
/// </code>
/// <para>If we want to replace the blocking operation <c>Thread.Sleep</c> with an asynchronous equivalent, it's not directly possible because of the <c>lock</c> block. We cannot <c>await</c> inside of a <c>lock</c>.</para>
/// <para>So, we use the <c>async</c>-compatible <see cref="AsyncLock"/> instead:</para>
/// <code>
/// private readonly AsyncLock _mutex = new AsyncLock();
/// public async Task DoStuffAsync()
/// {
/// using (await _mutex.LockAsync())
/// {
/// await Task.Delay(TimeSpan.FromSeconds(1));
/// }
/// }
/// </code>
/// </example>
[DebuggerDisplay("Id = {Id}, Taken = {_taken}")]
[DebuggerTypeProxy(typeof(DebugView))]
public sealed class AsyncLock
{
/// <summary>
/// Whether the lock is taken by a task.
/// </summary>
private bool _taken;

/// <summary>
/// The queue of TCSs that other tasks are awaiting to acquire the lock.
/// </summary>
private readonly IAsyncWaitQueue<IDisposable> _queue;

/// <summary>
/// The semi-unique identifier for this instance. This is 0 if the id has not yet been created.
/// </summary>
private int _id;

/// <summary>
/// The object used for mutual exclusion.
/// </summary>
private readonly object _mutex;

/// <summary>
/// Creates a new async-compatible mutual exclusion lock.
/// </summary>
public AsyncLock()
: this(null)
{
}

/// <summary>
/// Creates a new async-compatible mutual exclusion lock using the specified wait queue.
/// </summary>
/// <param name="queue">The wait queue used to manage waiters. This may be <c>null</c> to use a default (FIFO) queue.</param>
public AsyncLock(IAsyncWaitQueue<IDisposable> queue)
{
_queue = queue ?? new DefaultAsyncWaitQueue<IDisposable>();
_mutex = new object();
}

/// <summary>
/// Gets a semi-unique identifier for this asynchronous lock.
/// </summary>
public int Id
{
get { return IdManager<AsyncLock>.GetId(ref _id); }
}

/// <summary>
/// Asynchronously acquires the lock. Returns a disposable that releases the lock when disposed.
/// </summary>
/// <param name="cancellationToken">The cancellation token used to cancel the lock. If this is already set, then this method will attempt to take the lock immediately (succeeding if the lock is currently available).</param>
/// <returns>A disposable that releases the lock when disposed.</returns>
private Task<IDisposable> RequestLockAsync(CancellationToken cancellationToken)
{
lock (_mutex)
{
if (!_taken)
{
// If the lock is available, take it immediately.
_taken = true;
return Task.FromResult<IDisposable>(new Key(this));
}
else
{
// Wait for the lock to become available or cancellation.
return _queue.Enqueue(_mutex, cancellationToken);
}
}
}

/// <summary>
/// Asynchronously acquires the lock. Returns a disposable that releases the lock when disposed.
/// </summary>
/// <param name="cancellationToken">The cancellation token used to cancel the lock. If this is already set, then this method will attempt to take the lock immediately (succeeding if the lock is currently available).</param>
/// <returns>A disposable that releases the lock when disposed.</returns>
public AwaitableDisposable<IDisposable> LockAsync(CancellationToken cancellationToken)
{
return new AwaitableDisposable<IDisposable>(RequestLockAsync(cancellationToken));
}

/// <summary>
/// Asynchronously acquires the lock. Returns a disposable that releases the lock when disposed.
/// </summary>
/// <returns>A disposable that releases the lock when disposed.</returns>
public AwaitableDisposable<IDisposable> LockAsync()
{
return LockAsync(CancellationToken.None);
}

/// <summary>
/// Synchronously acquires the lock. Returns a disposable that releases the lock when disposed. This method may block the calling thread.
/// </summary>
/// <param name="cancellationToken">The cancellation token used to cancel the lock. If this is already set, then this method will attempt to take the lock immediately (succeeding if the lock is currently available).</param>
public IDisposable Lock(CancellationToken cancellationToken)
{
return RequestLockAsync(cancellationToken).WaitAndUnwrapException();
}

/// <summary>
/// Synchronously acquires the lock. Returns a disposable that releases the lock when disposed. This method may block the calling thread.
/// </summary>
public IDisposable Lock()
{
return Lock(CancellationToken.None);
}

/// <summary>
/// Releases the lock.
/// </summary>
internal void ReleaseLock()
{
lock (_mutex)
{
if (_queue.IsEmpty)
_taken = false;
else
_queue.Dequeue(new Key(this));
}
}

/// <summary>
/// The disposable which releases the lock.
/// </summary>
private sealed class Key : Disposables.SingleDisposable<AsyncLock>
{
/// <summary>
/// Creates the key for a lock.
/// </summary>
/// <param name="asyncLock">The lock to release. May not be <c>null</c>.</param>
public Key(AsyncLock asyncLock)
: base(asyncLock)
{
}

protected override void Dispose(AsyncLock context)
{
context.ReleaseLock();
}
}

// ReSharper disable UnusedMember.Local
[DebuggerNonUserCode]
private sealed class DebugView
{
private readonly AsyncLock _mutex;

public DebugView(AsyncLock mutex)
{
_mutex = mutex;
}

public int Id { get { return _mutex.Id; } }

public bool Taken { get { return _mutex._taken; } }

public IAsyncWaitQueue<IDisposable> WaitQueue { get { return _mutex._queue; } }
}

// ReSharper restore UnusedMember.Local
}
}
@@ -0,0 +1,167 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Nito.Collections;

namespace Nito.AsyncEx
{
/// <summary>
/// A collection of cancelable <see cref="TaskCompletionSource{T}"/> instances. Implementations must assume the caller is holding a lock.
/// </summary>
/// <typeparam name="T">The type of the results. If this isn't needed, use <see cref="object"/>.</typeparam>
public interface IAsyncWaitQueue<T>
{
/// <summary>
/// Gets whether the queue is empty.
/// </summary>
bool IsEmpty { get; }

/// <summary>
/// Creates a new entry and queues it to this wait queue. The returned task must support both synchronous and asynchronous waits.
/// </summary>
/// <returns>The queued task.</returns>
Task<T> Enqueue();

/// <summary>
/// Removes a single entry in the wait queue and completes it. This method may only be called if <see cref="IsEmpty"/> is <c>false</c>. The task continuations for the completed task must be executed asynchronously.
/// </summary>
/// <param name="result">The result used to complete the wait queue entry. If this isn't needed, use <c>default(T)</c>.</param>
void Dequeue(T result = default);

/// <summary>
/// Removes all entries in the wait queue and completes them. The task continuations for the completed tasks must be executed asynchronously.
/// </summary>
/// <param name="result">The result used to complete the wait queue entries. If this isn't needed, use <c>default(T)</c>.</param>
void DequeueAll(T result = default);

/// <summary>
/// Attempts to remove an entry from the wait queue and cancels it. The task continuations for the completed task must be executed asynchronously.
/// </summary>
/// <param name="task">The task to cancel.</param>
/// <param name="cancellationToken">The cancellation token to use to cancel the task.</param>
bool TryCancel(Task task, CancellationToken cancellationToken);

/// <summary>
/// Removes all entries from the wait queue and cancels them. The task continuations for the completed tasks must be executed asynchronously.
/// </summary>
/// <param name="cancellationToken">The cancellation token to use to cancel the tasks.</param>
void CancelAll(CancellationToken cancellationToken);
}

/// <summary>
/// Provides extension methods for wait queues.
/// </summary>
public static class AsyncWaitQueueExtensions
{
/// <summary>
/// Creates a new entry and queues it to this wait queue. If the cancellation token is already canceled, this method immediately returns a canceled task without modifying the wait queue.
/// </summary>
/// <param name="this">The wait queue.</param>
/// <param name="mutex">A synchronization object taken while cancelling the entry.</param>
/// <param name="token">The token used to cancel the wait.</param>
/// <returns>The queued task.</returns>
public static Task<T> Enqueue<T>(this IAsyncWaitQueue<T> @this, object mutex, CancellationToken token)
{
if (token.IsCancellationRequested)
return Task.FromCanceled<T>(token);

var ret = @this.Enqueue();
if (!token.CanBeCanceled)
return ret;

var registration = token.Register(() =>
{
lock (mutex)
@this.TryCancel(ret, token);
}, useSynchronizationContext: false);
ret.ContinueWith(_ => registration.Dispose(), CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
return ret;
}
}

/// <summary>
/// The default wait queue implementation, which uses a double-ended queue.
/// </summary>
/// <typeparam name="T">The type of the results. If this isn't needed, use <see cref="object"/>.</typeparam>
[DebuggerDisplay("Count = {Count}")]
[DebuggerTypeProxy(typeof(DefaultAsyncWaitQueue<>.DebugView))]
public sealed class DefaultAsyncWaitQueue<T> : IAsyncWaitQueue<T>
{
private readonly Deque<TaskCompletionSource<T>> _queue = new Deque<TaskCompletionSource<T>>();

private int Count
{
get { return _queue.Count; }
}

bool IAsyncWaitQueue<T>.IsEmpty
{
get { return Count == 0; }
}

Task<T> IAsyncWaitQueue<T>.Enqueue()
{
var tcs = TaskCompletionSourceExtensions.CreateAsyncTaskSource<T>();
_queue.AddToBack(tcs);
return tcs.Task;
}

void IAsyncWaitQueue<T>.Dequeue(T result)
{
_queue.RemoveFromFront().TrySetResult(result);
}

void IAsyncWaitQueue<T>.DequeueAll(T result)
{
foreach (var source in _queue)
source.TrySetResult(result);
_queue.Clear();
}

bool IAsyncWaitQueue<T>.TryCancel(Task task, CancellationToken cancellationToken)
{
for (int i = 0; i != _queue.Count; ++i)
{
if (_queue[i].Task == task)
{
_queue[i].TrySetCanceled(cancellationToken);
_queue.RemoveAt(i);
return true;
}
}
return false;
}

void IAsyncWaitQueue<T>.CancelAll(CancellationToken cancellationToken)
{
foreach (var source in _queue)
source.TrySetCanceled(cancellationToken);
_queue.Clear();
}

[DebuggerNonUserCode]
internal sealed class DebugView
{
private readonly DefaultAsyncWaitQueue<T> _queue;

public DebugView(DefaultAsyncWaitQueue<T> queue)
{
_queue = queue;
}

[DebuggerBrowsable(DebuggerBrowsableState.RootHidden)]
public Task<T>[] Tasks
{
get
{
var result = new List<Task<T>>(_queue._queue.Count);
foreach (var entry in _queue._queue)
result.Add(entry.Task);
return result.ToArray();
}
}
}
}
}

0 comments on commit ffaf4aa

Please sign in to comment.
You can’t perform that action at this time.