Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Not Implemented Exception on Linux #1462

Merged
merged 4 commits into from May 23, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
112 changes: 98 additions & 14 deletions WalletWasabi/Nito/AsyncEx/AsyncMutex.cs
@@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -10,6 +11,20 @@ namespace Nito.AsyncEx
{
public class AsyncMutex
{
/// <summary>
/// I did this because enum cannot be Interlocked easily.
/// </summary>
public enum AsyncLockStatus
{
StatusUninitialized = 0,
StatusReady = 1,
StatusAcquiring = 2,
StatusAcquired = 3,
StatusReleasing = 4,
}

private int _status;

/// <summary>
/// Short name of the mutex. This string added to the end of the mutex name.
/// </summary>
Expand Down Expand Up @@ -74,6 +89,7 @@ public AsyncMutex(string name)
AsyncMutexes.Add(FullName, this);
}
}
ChangeStatus(AsyncLockStatus.StatusReady, AsyncLockStatus.StatusUninitialized);
}

private int _command;
Expand Down Expand Up @@ -123,8 +139,10 @@ private void HoldLock(object cancellationTokenObj)
bool acquired = false;

// Timeout logic.
while (DateTime.Now - start > TimeSpan.FromSeconds(90))
while (true)
{
if (DateTime.Now - start > TimeSpan.FromSeconds(90))
throw new TimeoutException("Could not acquire mutex in time");
// Block for n ms and try to acquire the mutex. Blocking is not a problem
// we are on our own thread.
acquired = Mutex.WaitOne(1000);
Expand Down Expand Up @@ -160,7 +178,8 @@ private void HoldLock(object cancellationTokenObj)
return; // End of the Thread.
}

throw new NotImplementedException();
// If we get here something went wrong.
throw new NotImplementedException($"AsyncMutex thread operation failed in {ShortName}");
}
catch (Exception ex)
{
Expand All @@ -169,6 +188,8 @@ private void HoldLock(object cancellationTokenObj)
{
LatestHoldLockException = ex;
}
// Terminate the Thread.
return;
}
finally
{
Expand Down Expand Up @@ -220,6 +241,21 @@ private void SetCommand(int command)
ToDo.Set();
}

private void ChangeStatus(AsyncLockStatus newStatus, AsyncLockStatus expectedPreviousStatus)
{
ChangeStatus(newStatus, new[] { expectedPreviousStatus });
}

private void ChangeStatus(AsyncLockStatus newStatus, AsyncLockStatus[] expectedPreviousStatuses)
{
var prevstatus = Interlocked.Exchange(ref _status, (int)newStatus);

if (!expectedPreviousStatuses.Contains((AsyncLockStatus)prevstatus))
{
throw new InvalidOperationException($"Previous AsyncLock state was unexpected: prev:{((AsyncLockStatus)prevstatus).ToString()} now:{((AsyncLockStatus)_status).ToString()}.");
}
}

/// <summary>
/// The Lock mechanism designed for standard using blocks. This lock is thread and interprocess safe.
/// You can create and use it from anywhere.
Expand Down Expand Up @@ -247,8 +283,27 @@ public async Task<IDisposable> LockAsync(CancellationToken cancellationToken = d

MutexThread.Start(cancellationToken);

// Create the mutex and acquire it.
await SetCommandAsync(1, cancellationToken, pollInterval);
ChangeStatus(AsyncLockStatus.StatusAcquiring, AsyncLockStatus.StatusReady);

// Thread is running from now.

try
{
// Create the mutex and acquire it.
await SetCommandAsync(1, cancellationToken, pollInterval);
}
catch (Exception ex)
{
Logger.LogWarning<AsyncMutex>(ex);
// If the thread is still alive we must stop it
StopThread();

ChangeStatus(AsyncLockStatus.StatusReady, AsyncLockStatus.StatusAcquiring);

throw ex;
}

ChangeStatus(AsyncLockStatus.StatusAcquired, AsyncLockStatus.StatusAcquiring);

return new Key(this);
}
Expand All @@ -270,29 +325,58 @@ public async Task<IDisposable> LockAsync(CancellationToken cancellationToken = d
// Let it go.
}

// If something failed then release all.
ReleaseLock();
// Release the local lock.
AsyncLock.ReleaseLock();

throw new IOException($"Couldn't acquire system wide mutex on {ShortName}", inner);
}

private void ReleaseLock()
private void StopThread()
{
if (MutexThread != null)
try
{
if (!MutexThread.IsAlive)
var start = DateTime.Now;
while (MutexThread.IsAlive)
{
throw new InvalidOperationException($"Thread should be alive.");
SetCommand(2);
MutexThread?.Join(TimeSpan.FromSeconds(1));

if (DateTime.Now - start > TimeSpan.FromSeconds(10))
{
throw new TimeoutException("Could not stop MutexThread, aborting it.");
}
}

// Successfully stopped the thread.
return;
}
catch (Exception ex)
{
// Let it go...
Logger.LogWarning<AsyncMutex>(ex);
}
// Send release command to the mutex-thread.
SetCommand(2);

// Wait for it.
MutexThread?.Join();
// Final solution, abort the thread.

// MutexThread.Abort(); // Not supported on Linux!
}

private void ReleaseLock()
{
if (MutexThread != null && !MutexThread.IsAlive)
{
throw new InvalidOperationException($"Thread should be alive.");
}

// On multiply call we will get an exception. This is not a dispose so we can throw here.
ChangeStatus(AsyncLockStatus.StatusReleasing, AsyncLockStatus.StatusAcquired);

StopThread();

// Release the local lock.
AsyncLock?.ReleaseLock();

ChangeStatus(AsyncLockStatus.StatusReady, AsyncLockStatus.StatusReleasing);
}

/// <summary>
Expand Down