Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 47 additions & 28 deletions FollowingFileStream/FollowingFileStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public class FollowingFileStream : Stream
/// </exception>
public FollowingFileStream(string path)
{
fileStream = new FileStream(path, FileMode.Open,FileAccess.Read, FileShare.ReadWrite);
fileStream = new FileStream(path, FileMode.Open, FileAccess.Read, FileShare.ReadWrite);
}

/// <summary>
Expand Down Expand Up @@ -122,7 +122,7 @@ public FollowingFileStream(string path)
/// </exception>
public FollowingFileStream(string path, int bufferSize, bool useAsync)
{
fileStream = new FileStream(path, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, bufferSize, useAsync);
fileStream = new FileStream(path, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, bufferSize, useAsync);
}

/// <summary>
Expand Down Expand Up @@ -194,7 +194,7 @@ public FollowingFileStream(string path, int bufferSize, bool useAsync)
/// <exception cref="System.IO.EndOfStreamException">
/// Attempted seeking past the end of a stream that does not support this.
/// </exception>
public override long Position { get => fileStream.Position; set => fileStream.Position = value;}
public override long Position { get => fileStream.Position; set => fileStream.Position = value; }

/// <summary>
/// Reads a block of bytes from the stream and writes the data in a given buffer.
Expand Down Expand Up @@ -228,7 +228,7 @@ public FollowingFileStream(string path, int bufferSize, bool useAsync)
/// </exception>
public override int Read(byte[] buffer, int offset, int count)
{
return ReadAsync(buffer, offset,count, CancellationToken.None).GetAwaiter().GetResult();
return ReadAsync(buffer, offset, count, CancellationToken.None).GetAwaiter().GetResult();
}

/// <summary>
Expand Down Expand Up @@ -268,17 +268,34 @@ public override int Read(byte[] buffer, int offset, int count)
public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
int read = 0;
using(await locker.LockAsync())
using (await locker.LockAsync())
{
var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, cts.Token);
do {
try{
do
{
try
{
read = await fileStream.ReadAsync(buffer, offset, count, linkedCts.Token);
}
catch (OperationCanceledException) {
catch (OperationCanceledException)
{
cancellationToken.ThrowIfCancellationRequested();
}
} while (read == 0 && await RetryNeededAsync());

// In case the filestream has been written and closed between the last read operation
// and the IsFileLockedForWriting() check
if (read == 0)
{
try
{
read = await fileStream.ReadAsync(buffer, offset, count, linkedCts.Token);
}
catch (OperationCanceledException)
{
cancellationToken.ThrowIfCancellationRequested();
}
}
}
return read;
}
Expand All @@ -295,12 +312,13 @@ public override async Task<int> ReadAsync(byte[] buffer, int offset, int count,
private async Task<bool> RetryNeededAsync()
{
bool retry = IsFileLockedForWriting();
if (retry) {
if (retry)
{
try
{
await Task.Delay(MillisecondsRetryTimeout, cts.Token).ConfigureAwait(false);
}
catch(TaskCanceledException)
catch (TaskCanceledException)
{
retry = false;
}
Expand Down Expand Up @@ -349,19 +367,20 @@ private bool IsFileLockedForWriting()
protected override void Dispose(bool disposing)
{
if (disposed)
return;

if (disposing) {
return;

if (disposing)
{
cts.Cancel();
using(locker.Lock())
using (locker.Lock())
{
fileStream.Dispose();
}
cts.Dispose();
// Free any other managed objects here.
//
}

// Free any unmanaged objects here.
//

Expand Down Expand Up @@ -400,7 +419,7 @@ public override void Flush()
/// </exception>
public override long Seek(long offset, SeekOrigin origin)
{
using(locker.Lock())
using (locker.Lock())
{
return fileStream.Seek(offset, origin);
}
Expand All @@ -415,7 +434,7 @@ public override void SetLength(long value)
throw new NotSupportedException();
}


/// <summary>
/// Writes a block of bytes to the file stream.
/// </summary>
Expand Down Expand Up @@ -492,13 +511,13 @@ public override int EndRead(IAsyncResult asyncResult)
public sealed class AsyncLock : IDisposable
{
private readonly SemaphoreSlim _semaphoreSlim = new SemaphoreSlim(1, 1);

public async Task<AsyncLock> LockAsync()
{
await _semaphoreSlim.WaitAsync();
return this;
}

public AsyncLock Lock()
{
return LockAsync().GetAwaiter().GetResult();
Expand All @@ -512,24 +531,24 @@ public void Dispose()

public static class AsyncExtensions
{
public static IAsyncResult AsApm<T>(this Task<T> task,
AsyncCallback callback,
public static IAsyncResult AsApm<T>(this Task<T> task,
AsyncCallback callback,
object state)
{
if (task == null)
if (task == null)
throw new ArgumentNullException("task");

var tcs = new TaskCompletionSource<T>(state);
task.ContinueWith(t =>
task.ContinueWith(t =>
{
if (t.IsFaulted)
if (t.IsFaulted)
tcs.TrySetException(t.Exception.InnerExceptions);
else if (t.IsCanceled)
else if (t.IsCanceled)
tcs.TrySetCanceled();
else
else
tcs.TrySetResult(t.Result);

if (callback != null)
if (callback != null)
callback(tcs.Task);
}, TaskScheduler.Default);
return tcs.Task;
Expand Down