diff --git a/FollowingFileStream/FollowingFileStream.cs b/FollowingFileStream/FollowingFileStream.cs index d052b06..0721c91 100644 --- a/FollowingFileStream/FollowingFileStream.cs +++ b/FollowingFileStream/FollowingFileStream.cs @@ -65,7 +65,7 @@ public class FollowingFileStream : Stream /// public FollowingFileStream(string path) { - fileStream = new FileStream(path, FileMode.Open,FileAccess.Read, FileShare.ReadWrite); + fileStream = new FileStream(path, FileMode.Open, FileAccess.Read, FileShare.ReadWrite); } /// @@ -122,7 +122,7 @@ public FollowingFileStream(string path) /// public FollowingFileStream(string path, int bufferSize, bool useAsync) { - fileStream = new FileStream(path, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, bufferSize, useAsync); + fileStream = new FileStream(path, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, bufferSize, useAsync); } /// @@ -194,7 +194,7 @@ public FollowingFileStream(string path, int bufferSize, bool useAsync) /// /// Attempted seeking past the end of a stream that does not support this. /// - public override long Position { get => fileStream.Position; set => fileStream.Position = value;} + public override long Position { get => fileStream.Position; set => fileStream.Position = value; } /// /// Reads a block of bytes from the stream and writes the data in a given buffer. @@ -228,7 +228,7 @@ public FollowingFileStream(string path, int bufferSize, bool useAsync) /// public override int Read(byte[] buffer, int offset, int count) { - return ReadAsync(buffer, offset,count, CancellationToken.None).GetAwaiter().GetResult(); + return ReadAsync(buffer, offset, count, CancellationToken.None).GetAwaiter().GetResult(); } /// @@ -268,17 +268,34 @@ public override int Read(byte[] buffer, int offset, int count) public override async Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) { int read = 0; - using(await locker.LockAsync()) + using (await locker.LockAsync()) { var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, cts.Token); - do { - try{ + do + { + try + { read = await fileStream.ReadAsync(buffer, offset, count, linkedCts.Token); } - catch (OperationCanceledException) { + catch (OperationCanceledException) + { cancellationToken.ThrowIfCancellationRequested(); } } while (read == 0 && await RetryNeededAsync()); + + // In case the filestream has been written and closed between the last read operation + // and the IsFileLockedForWriting() check + if (read == 0) + { + try + { + read = await fileStream.ReadAsync(buffer, offset, count, linkedCts.Token); + } + catch (OperationCanceledException) + { + cancellationToken.ThrowIfCancellationRequested(); + } + } } return read; } @@ -295,12 +312,13 @@ public override async Task ReadAsync(byte[] buffer, int offset, int count, private async Task RetryNeededAsync() { bool retry = IsFileLockedForWriting(); - if (retry) { + if (retry) + { try { await Task.Delay(MillisecondsRetryTimeout, cts.Token).ConfigureAwait(false); } - catch(TaskCanceledException) + catch (TaskCanceledException) { retry = false; } @@ -349,11 +367,12 @@ private bool IsFileLockedForWriting() protected override void Dispose(bool disposing) { if (disposed) - return; - - if (disposing) { + return; + + if (disposing) + { cts.Cancel(); - using(locker.Lock()) + using (locker.Lock()) { fileStream.Dispose(); } @@ -361,7 +380,7 @@ protected override void Dispose(bool disposing) // Free any other managed objects here. // } - + // Free any unmanaged objects here. // @@ -400,7 +419,7 @@ public override void Flush() /// public override long Seek(long offset, SeekOrigin origin) { - using(locker.Lock()) + using (locker.Lock()) { return fileStream.Seek(offset, origin); } @@ -415,7 +434,7 @@ public override void SetLength(long value) throw new NotSupportedException(); } - + /// /// Writes a block of bytes to the file stream. /// @@ -492,13 +511,13 @@ public override int EndRead(IAsyncResult asyncResult) public sealed class AsyncLock : IDisposable { private readonly SemaphoreSlim _semaphoreSlim = new SemaphoreSlim(1, 1); - + public async Task LockAsync() { await _semaphoreSlim.WaitAsync(); return this; } - + public AsyncLock Lock() { return LockAsync().GetAwaiter().GetResult(); @@ -512,24 +531,24 @@ public void Dispose() public static class AsyncExtensions { - public static IAsyncResult AsApm(this Task task, - AsyncCallback callback, + public static IAsyncResult AsApm(this Task task, + AsyncCallback callback, object state) { - if (task == null) + if (task == null) throw new ArgumentNullException("task"); - + var tcs = new TaskCompletionSource(state); - task.ContinueWith(t => + task.ContinueWith(t => { - if (t.IsFaulted) + if (t.IsFaulted) tcs.TrySetException(t.Exception.InnerExceptions); - else if (t.IsCanceled) + else if (t.IsCanceled) tcs.TrySetCanceled(); - else + else tcs.TrySetResult(t.Result); - if (callback != null) + if (callback != null) callback(tcs.Task); }, TaskScheduler.Default); return tcs.Task;