diff --git a/FollowingFileStream.Tests/AsyncStreamTest.cs b/FollowingFileStream.Tests/AsyncStreamTest.cs index da25567..a32eb68 100644 --- a/FollowingFileStream.Tests/AsyncStreamTest.cs +++ b/FollowingFileStream.Tests/AsyncStreamTest.cs @@ -1,5 +1,8 @@ using Microsoft.VisualStudio.TestTools.UnitTesting; using Moq; +using System; +using System.IO; +using System.Linq.Expressions; using System.Threading; namespace Manandre.IO @@ -14,7 +17,7 @@ public void AS_Read() var expected = 42; sut.Setup(x => x.ReadAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) .ReturnsAsync(expected); - var read = sut.Object.Read(null, 0 , 0); + var read = sut.Object.Read(null, 0, 0); Assert.AreEqual(expected, read); read = sut.Object.EndRead(sut.Object.BeginRead(null, 0, 0, null, null)); @@ -27,7 +30,7 @@ public void AS_Write() var sut = new Mock() { CallBase = true }; sut.Setup(x => x.WriteAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) .Verifiable(); - sut.Object.Write(null, 0 , 0); + sut.Object.Write(null, 0, 0); sut.Verify(x => x.WriteAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny()), Times.Once); sut.Object.EndWrite(sut.Object.BeginWrite(null, 0, 0, null, null)); @@ -50,5 +53,110 @@ public void AS_Dispose() sut.Object.Dispose(); sut.Object.Dispose(); } + + [TestMethod] + public void AS_Synchronized() + { + Assert.ThrowsException(() => AsyncStream.Synchronized(null)); + var sut = new Mock() { CallBase = true }; + var synchronized = AsyncStream.Synchronized(sut.Object); + Assert.IsNotNull(synchronized); + Assert.AreSame(synchronized, AsyncStream.Synchronized(synchronized)); + + // Caps + var funcs = new Expression>[] + { + x => x.CanRead, + x => x.CanWrite, + x => x.CanSeek, + x => x.CanTimeout + }; + foreach (var func in funcs) + { + foreach (var expected in new[] { false, true }) + { + sut.Setup(func).Returns(expected); + Assert.AreEqual(expected, func.Compile()(synchronized)); + } + } + + // Position + var expected2 = 42; + sut.Setup(x => x.Position).Returns(expected2); + Assert.AreEqual(expected2, synchronized.Position); + + sut.SetupProperty(x => x.Position); + synchronized.Position = expected2; + Assert.AreEqual(expected2, sut.Object.Position); + + // ReadTimeout + sut.Setup(x => x.ReadTimeout).Returns(expected2); + Assert.AreEqual(expected2, synchronized.ReadTimeout); + + sut.SetupProperty(x => x.ReadTimeout); + synchronized.ReadTimeout = expected2; + Assert.AreEqual(expected2, sut.Object.ReadTimeout); + + // WriteTimeout + sut.Setup(x => x.WriteTimeout).Returns(expected2); + Assert.AreEqual(expected2, synchronized.WriteTimeout); + + sut.SetupProperty(x => x.WriteTimeout); + synchronized.WriteTimeout = expected2; + Assert.AreEqual(expected2, sut.Object.WriteTimeout); + + // Seek + var expected_offset = 42; + var expected_origin = SeekOrigin.End; + var result_offset = 0L; + var result_origin = SeekOrigin.Begin; + sut.Setup(x => x.Seek(It.IsAny(), It.IsAny())) + .Callback((offset, origin) => + { + result_offset = offset; + result_origin = origin; + }); + synchronized.Seek(expected_offset, expected_origin); + Assert.AreEqual(expected_offset, result_offset); + Assert.AreEqual(expected_origin, result_origin); + + // SetLength + var expected_length = 42; + var result_length = 0L; + sut.Setup(x => x.SetLength(It.IsAny())) + .Callback(length => result_length = length); + synchronized.SetLength(expected_length); + Assert.AreEqual(expected_length, result_length); + + // Read + sut.Setup(x => x.ReadAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) + .ReturnsAsync(expected2); + var read = synchronized.ReadAsync(null, 0, 0).Result; + Assert.AreEqual(expected2, read); + read = synchronized.Read(null, 0, 0); + Assert.AreEqual(expected2, read); + read = sut.Object.EndRead(sut.Object.BeginRead(null, 0, 0, null, null)); + Assert.AreEqual(expected2, read); + + // Write + sut.Setup(x => x.WriteAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) + .Verifiable(); + synchronized.WriteAsync(null, 0, 0).Wait(); + sut.Verify(x => x.WriteAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny()), Times.Once); + synchronized.Write(null, 0, 0); + sut.Verify(x => x.WriteAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny()), Times.Exactly(2)); + synchronized.EndWrite(sut.Object.BeginWrite(null, 0, 0, null, null)); + sut.Verify(x => x.WriteAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny()), Times.Exactly(3)); + Assert.ThrowsExceptionAsync(() => synchronized.WriteAsync(null, 0, 0, new CancellationToken(true))); + + // Async + sut.Setup(x => x.FlushAsync(It.IsAny())).Verifiable(); + synchronized.FlushAsync().Wait(); + sut.Verify(x => x.FlushAsync(It.IsAny()), Times.Once); + synchronized.Flush(); + sut.Verify(x => x.FlushAsync(It.IsAny()), Times.Exactly(2)); + + Assert.ThrowsExceptionAsync(() => synchronized.FlushAsync(new CancellationToken(true))); + } } } \ No newline at end of file diff --git a/FollowingFileStream.Tests/FollowingFileStreamTest.cs b/FollowingFileStream.Tests/FollowingFileStreamTest.cs index 21287b5..19186e6 100644 --- a/FollowingFileStream.Tests/FollowingFileStreamTest.cs +++ b/FollowingFileStream.Tests/FollowingFileStreamTest.cs @@ -168,7 +168,7 @@ public void FFS_FollowingRead(bool async) public void FFS_Close(bool async) { using (var input = File.CreateText(inputFilePath)) - using (var ffs = new FollowingFileStream(inputFilePath, 4*1024, async)) + using (var ffs = new FollowingFileStream(inputFilePath, 4*1024, async).Synchronized()) using (var destination = File.CreateText(outputFilePath)) { destination.AutoFlush = true; diff --git a/FollowingFileStream/AsyncLock.cs b/FollowingFileStream/AsyncLock.cs index c926b63..cc48025 100644 --- a/FollowingFileStream/AsyncLock.cs +++ b/FollowingFileStream/AsyncLock.cs @@ -21,7 +21,7 @@ public sealed class AsyncLock : IDisposable /// /// Asynchronous method to request lock. /// - /// AsyncLock taken an optional CancellationToken, which can be used to cancel the acquiring of the lock. + /// AsyncLock takes an optional CancellationToken, which can be used to cancel the acquiring of the lock. /// /// The task returned from LockAsync will enter the Completed state when it has acquired the AsyncLock. /// That same task will enter the Canceled state if the CancellationToken is signaled before the wait is satisfied; @@ -36,10 +36,11 @@ public sealed class AsyncLock : IDisposable /// /// Synchronous method to request lock. /// + /// AsyncLock takes an optional CancellationToken, which can be used to cancel the acquiring of the lock. /// An instance of AsyncLock - public AsyncLock Lock() + public AsyncLock Lock(CancellationToken cancellationToken = default(CancellationToken)) { - return LockAsync().GetAwaiter().GetResult(); + return LockAsync(cancellationToken).GetAwaiter().GetResult(); } /// diff --git a/FollowingFileStream/AsyncStream.cs b/FollowingFileStream/AsyncStream.cs index e0b58ce..391c811 100644 --- a/FollowingFileStream/AsyncStream.cs +++ b/FollowingFileStream/AsyncStream.cs @@ -11,15 +11,6 @@ namespace Manandre.IO /// public abstract class AsyncStream : Stream { - /// - /// Cancellation token source for retry attempts - /// - protected readonly CancellationTokenSource cts = new CancellationTokenSource(); - /// - /// Asynchronous lock to avoid race conditions - /// - protected readonly AsyncLock locker = new AsyncLock(); - /// /// Begins an asynchronous read operation. (Consider using AsyncStream.ReadAsync(System.Byte[],System.Int32,System.Int32,System.Threading.CancellationToken) /// instead.) @@ -294,7 +285,7 @@ public sealed override void Write(byte[] buffer, int offset, int count) public abstract override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken); private bool disposed = false; - + /// /// Releases the unmanaged resources used by the FollowingFileStream and optionally /// releases the managed resources. @@ -306,14 +297,204 @@ protected override void Dispose(bool disposing) if (disposed) return; - if (disposing) - { - cts.Dispose(); - } - disposed = true; // Call stream class implementation. base.Dispose(disposing); } + + /// + /// Synchronized version of an async stream + /// + /// Stream to synchronize + /// + public static AsyncStream Synchronized(AsyncStream stream) + { + if (stream == null) + { + throw new ArgumentNullException(nameof(stream)); + } + if (stream is AsyncSafeStream) + { + return stream; + } + + return new AsyncSafeStream(stream); + } + + private sealed class AsyncSafeStream : AsyncStream + { + private readonly AsyncStream _stream; + private readonly CancellationTokenSource cts = new CancellationTokenSource(); + private readonly AsyncLock locker = new AsyncLock(); + + public AsyncSafeStream(AsyncStream stream) + { + if (stream == null) + { + throw new ArgumentNullException(nameof(stream)); + } + + _stream = stream; + } + + public override bool CanRead => _stream.CanRead; + + public override bool CanWrite => _stream.CanWrite; + + public override bool CanSeek => _stream.CanSeek; + + public override bool CanTimeout => _stream.CanTimeout; + + public override long Length + { + get + { + using (locker.Lock(cts.Token)) + { + return _stream.Length; + } + } + } + + public override long Position + { + get + { + using (locker.Lock(cts.Token)) + { + return _stream.Position; + } + } + set + { + using (locker.Lock(cts.Token)) + { + _stream.Position = value; + } + } + } + + public override int ReadTimeout + { + get + { + return _stream.ReadTimeout; + } + set + { + _stream.ReadTimeout = value; + } + } + + public override int WriteTimeout + { + get + { + return _stream.WriteTimeout; + } + set + { + _stream.WriteTimeout = value; + } + } + + protected override void Dispose(bool disposing) + { + try + { + // Explicitly pick up a potentially methodimpl'ed Dispose + if (disposing) + { + cts.Cancel(); + using (locker.Lock()) + { + ((IDisposable)_stream).Dispose(); + } + } + } + finally + { + base.Dispose(disposing); + } + } + + public override long Seek(long offset, SeekOrigin origin) + { + using (locker.Lock(cts.Token)) + return _stream.Seek(offset, origin); + } + + public override void SetLength(long value) + { + using (locker.Lock(cts.Token)) + _stream.SetLength(value); + } + + public override async Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + var read = 0; + var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, cts.Token); + try + { + using (await locker.LockAsync(linkedCts.Token)) + { + read = await _stream.ReadAsync(buffer, offset, count, linkedCts.Token); + } + } + catch (OperationCanceledException) + { + cancellationToken.ThrowIfCancellationRequested(); + } + return read; + } + + public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, cts.Token); + try + { + using (await locker.LockAsync(linkedCts.Token)) + { + await _stream.WriteAsync(buffer, offset, count, linkedCts.Token); + } + } + catch (OperationCanceledException) + { + cancellationToken.ThrowIfCancellationRequested(); + } + } + + public override async Task FlushAsync(CancellationToken cancellationToken) + { + var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, cts.Token); + try + { + using (await locker.LockAsync(linkedCts.Token)) + { + await _stream.FlushAsync(linkedCts.Token); + } + } + catch (OperationCanceledException) + { + cancellationToken.ThrowIfCancellationRequested(); + } + } + } + } + + /// + /// AsyncStream class extensions + /// + public static class AsyncStreamExtensions + { + /// + /// Synchronized version of an async stream + /// + /// Stream to synchronize + /// + public static AsyncStream Synchronized(this AsyncStream stream) + { + return AsyncStream.Synchronized(stream); + } } } \ No newline at end of file diff --git a/FollowingFileStream/FollowingFileStream.cs b/FollowingFileStream/FollowingFileStream.cs index 4ee682d..3a3a7aa 100644 --- a/FollowingFileStream/FollowingFileStream.cs +++ b/FollowingFileStream/FollowingFileStream.cs @@ -20,6 +20,10 @@ public class FollowingFileStream : AsyncStream /// private const int MillisecondsRetryTimeout = 100; + /// + /// CancellationTokenSource + /// + private readonly CancellationTokenSource cts = new CancellationTokenSource(); #region Constructors /// @@ -226,29 +230,19 @@ public FollowingFileStream(string path, int bufferSize, bool useAsync) public override async Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) { int read = 0; - var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, cts.Token); - try + do { - using (await locker.LockAsync(linkedCts.Token)) - { - do - { - read = await fileStream.ReadAsync(buffer, offset, count, linkedCts.Token); - } - while (read == 0 && await RetryNeededAsync()); - - // In case the filestream has been written and closed between the last read operation - // and the IsFileLockedForWriting() check - if (read == 0) - { - read = await fileStream.ReadAsync(buffer, offset, count, linkedCts.Token); - } - } + read = await fileStream.ReadAsync(buffer, offset, count, cancellationToken); } - catch (OperationCanceledException) + while (read == 0 && await RetryNeededAsync()); + + // In case the filestream has been written and closed between the last read operation + // and the IsFileLockedForWriting() check + if (read == 0) { - cancellationToken.ThrowIfCancellationRequested(); + read = await fileStream.ReadAsync(buffer, offset, count, cancellationToken); } + return read; } @@ -324,10 +318,7 @@ protected override void Dispose(bool disposing) if (disposing) { cts.Cancel(); - using (locker.Lock()) - { - fileStream.Dispose(); - } + fileStream.Dispose(); } disposed = true; @@ -365,10 +356,7 @@ public override Task FlushAsync(CancellationToken cancellationToken) /// public override long Seek(long offset, SeekOrigin origin) { - using (locker.Lock()) - { - return fileStream.Seek(offset, origin); - } + return fileStream.Seek(offset, origin); } ///