Skip to content

Commit

Permalink
Merge pull request dotnet/corefx#11569 from stephentoub/filestream_co…
Browse files Browse the repository at this point in the history
…pytoasync

Optimize overlapped I/O FileStream.CopyToAsync implementation on Windows

Commit migrated from dotnet/corefx@f3a2e16
  • Loading branch information
stephentoub committed Sep 13, 2016
2 parents 0e454f8 + 5e714d5 commit 00bb21c
Show file tree
Hide file tree
Showing 2 changed files with 323 additions and 0 deletions.
3 changes: 3 additions & 0 deletions src/libraries/System.IO.FileSystem/src/Resources/Strings.resx
Expand Up @@ -291,4 +291,7 @@
<data name="UnknownError_Num" xml:space="preserve">
<value>Unknown error '{0}'.</value>
</data>
<data name="ObjectDisposed_StreamClosed" xml:space="preserve">
<value>Cannot access a closed Stream.</value>
</data>
</root>
320 changes: 320 additions & 0 deletions src/libraries/System.IO.FileSystem/src/System/IO/Win32FileStream.cs
Expand Up @@ -8,6 +8,7 @@
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Win32.SafeHandles;
using System.Runtime.CompilerServices;

/*
* Win32FileStream supports different modes of accessing the disk - async mode
Expand Down Expand Up @@ -1684,6 +1685,325 @@ private int GetLastWin32ErrorAndDisposeHandleIfInvalid(bool throwIfInvalidHandle
return errorCode;
}

public override Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken)
{
// Validate arguments as would the base implementation
if (destination == null)
{
throw new ArgumentNullException(nameof(destination));
}
if (bufferSize <= 0)
{
throw new ArgumentOutOfRangeException(nameof(bufferSize), SR.ArgumentOutOfRange_NeedPosNum);
}
bool parentCanRead = _parent.CanRead;
if (!parentCanRead && !_parent.CanWrite)
{
throw new ObjectDisposedException(null, SR.ObjectDisposed_StreamClosed);
}
bool destinationCanWrite = destination.CanWrite;
if (!destination.CanRead && !destinationCanWrite)
{
throw new ObjectDisposedException(nameof(destination), SR.ObjectDisposed_StreamClosed);
}
if (!parentCanRead)
{
throw new NotSupportedException(SR.NotSupported_UnreadableStream);
}
if (!destinationCanWrite)
{
throw new NotSupportedException(SR.NotSupported_UnwritableStream);
}

// Bail early for cancellation if cancellation has been requested
if (cancellationToken.IsCancellationRequested)
{
return Task.FromCanceled<int>(cancellationToken);
}

// Fail if the file was closed
if (_handle.IsClosed)
{
throw Error.GetFileNotOpen();
}

// Do the async copy, with differing implementations based on whether the FileStream was opened as async or sync
Debug.Assert((_readPos == 0 && _readLen == 0 && _writePos >= 0) || (_writePos == 0 && _readPos <= _readLen), "We're either reading or writing, but not both.");
return _isAsync ?
AsyncModeCopyToAsync(destination, bufferSize, cancellationToken) :
base.CopyToAsync(destination, bufferSize, cancellationToken);
}

private async Task AsyncModeCopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken)
{
Debug.Assert(_isAsync, "This implementation is for async mode only");
Debug.Assert(!_handle.IsClosed, "!_handle.IsClosed");
Debug.Assert(_parent.CanRead, "_parent.CanRead");

// Make sure any pending writes have been flushed before we do a read.
if (_writePos > 0)
{
await FlushWriteAsync(cancellationToken).ConfigureAwait(false);
}

// Typically CopyToAsync would be invoked as the only "read" on the stream, but it's possible some reading is
// done and then the CopyToAsync is issued. For that case, see if we have any data available in the buffer.
if (_buffer != null)
{
int bufferedBytes = _readLen - _readPos;
if (bufferedBytes > 0)
{
await destination.WriteAsync(_buffer, _readPos, bufferedBytes, cancellationToken).ConfigureAwait(false);
_readPos = _readLen = 0;
}
}

// For efficiency, we avoid creating a new task and associated state for each asynchronous read.
// Instead, we create a single reusable awaitable object that will be triggered when an await completes
// and reset before going again.
var readAwaitable = new AsyncCopyToAwaitable(this);

// Make sure we are reading from the position that we think we are.
// Only set the position in the awaitable if we can seek (e.g. not for pipes).
bool canSeek = _parent.CanSeek;
if (canSeek)
{
if (_exposedHandle)
{
VerifyOSHandlePosition();
}
readAwaitable._position = _pos;
}

// Create the buffer to use for the copy operation, as the base CopyToAsync does. We don't try to use
// _buffer here, even if it's not null, as concurrent operations are allowed, and another operation may
// actually be using the buffer already. Plus, it'll be rare for _buffer to be non-null, as typically
// CopyToAsync is used as the only operation performed on the stream, and the buffer is lazily initialized.
// Further, typically the CopyToAsync buffer size will be larger than that used by the FileStream, such that
// we'd likely be unable to use it anyway. A better option than using _buffer would be a future pooling solution.
byte[] copyBuffer = new byte[bufferSize];

// Allocate an Overlapped we can use repeatedly for all operations
var awaitableOverlapped = new PreAllocatedOverlapped(AsyncCopyToAwaitable.s_callback, readAwaitable, copyBuffer);
var cancellationReg = default(CancellationTokenRegistration);
try
{
// Register for cancellation. We do this once for the whole copy operation, and just try to cancel
// whatever read operation may currently be in progress, if there is one. It's possible the cancellation
// request could come in between operations, in which case we flag that with explicit calls to ThrowIfCancellationRequested
// in the read/write copy loop.
if (cancellationToken.CanBeCanceled)
{
cancellationReg = cancellationToken.Register(s =>
{
var innerAwaitable = (AsyncCopyToAwaitable)s;
unsafe
{
lock (innerAwaitable.CancellationLock) // synchronize with cleanup of the overlapped
{
if (innerAwaitable._nativeOverlapped != null)
{
// Try to cancel the I/O. We ignore the return value, as cancellation is opportunistic and we
// don't want to fail the operation because we couldn't cancel it.
Interop.mincore.CancelIoEx(innerAwaitable._fileStream._handle, innerAwaitable._nativeOverlapped);
}
}
}
}, readAwaitable);
}

// Repeatedly read from this FileStream and write the results to the destination stream.
while (true)
{
cancellationToken.ThrowIfCancellationRequested();
readAwaitable.ResetForNextOperation();

try
{
bool synchronousSuccess;
int errorCode;
unsafe
{
// Allocate a native overlapped for our reusable overlapped, and set position to read based on the next
// desired address stored in the awaitable. (This position may be 0, if either we're at the beginning or
// if the stream isn't seekable.)
readAwaitable._nativeOverlapped = _handle.ThreadPoolBinding.AllocateNativeOverlapped(awaitableOverlapped);
if (canSeek)
{
readAwaitable._nativeOverlapped->OffsetLow = unchecked((int)readAwaitable._position);
readAwaitable._nativeOverlapped->OffsetHigh = (int)(readAwaitable._position >> 32);
}

// Kick off the read.
synchronousSuccess = ReadFileNative(_handle, copyBuffer, 0, copyBuffer.Length, readAwaitable._nativeOverlapped, out errorCode) >= 0;
}

// If the operation did not synchronously succeed, it either failed or initiated the asynchronous operation.
if (!synchronousSuccess)
{
switch (errorCode)
{
case ERROR_IO_PENDING:
// Async operation in progress.
break;
case ERROR_BROKEN_PIPE:
case ERROR_HANDLE_EOF:
// We're at or past the end of the file, and the overlapped callback
// won't be raised in these cases. Mark it as completed so that the await
// below will see it as such.
readAwaitable.MarkCompleted();
break;
default:
// Everything else is an error (and there won't be a callback).
throw Win32Marshal.GetExceptionForWin32Error(errorCode);
}
}

// Wait for the async operation (which may or may not have already completed), then throw if it failed.
await readAwaitable;
switch (readAwaitable._errorCode)
{
case 0: // success
Debug.Assert(readAwaitable._numBytes >= 0, $"Expected non-negative numBytes, got {readAwaitable._numBytes}");
break;
case ERROR_BROKEN_PIPE: // logically success with 0 bytes read (write end of pipe closed)
case ERROR_HANDLE_EOF: // logically success with 0 bytes read (read at end of file)
Debug.Assert(readAwaitable._numBytes == 0, $"Expected 0 bytes read, got {readAwaitable._numBytes}");
break;
case Interop.mincore.Errors.ERROR_OPERATION_ABORTED: // canceled
throw new OperationCanceledException(cancellationToken.IsCancellationRequested ? cancellationToken : new CancellationToken(true));
default: // error
throw Win32Marshal.GetExceptionForWin32Error((int)readAwaitable._errorCode);
}

// Successful operation. If we got zero bytes, we're done: exit the read/write loop.
// Otherwise, update the read position for next time accordingly.
if (readAwaitable._numBytes == 0)
{
break;
}
else if (canSeek)
{
readAwaitable._position += (int)readAwaitable._numBytes;
}
}
finally
{
// Free the resources for this read operation
unsafe
{
NativeOverlapped* overlapped;
lock (readAwaitable.CancellationLock) // just an Exchange, but we need this to be synchronized with cancellation, so using the same lock
{
overlapped = readAwaitable._nativeOverlapped;
readAwaitable._nativeOverlapped = null;
}
if (overlapped != null)
{
_handle.ThreadPoolBinding.FreeNativeOverlapped(overlapped);
}
}
}

// Write out the read data.
await destination.WriteAsync(copyBuffer, 0, (int)readAwaitable._numBytes, cancellationToken).ConfigureAwait(false);
}
}
finally
{
// Cleanup from the whole copy operation
cancellationReg.Dispose();
awaitableOverlapped.Dispose();

// Make sure the stream's current position reflects where we ended up
if (!_handle.IsClosed && _parent.CanSeek)
{
SeekCore(0, SeekOrigin.End);
}
}
}

/// <summary>Used by CopyToAsync to enable awaiting the result of an overlapped I/O operation with minimal overhead.</summary>
private sealed unsafe class AsyncCopyToAwaitable : ICriticalNotifyCompletion
{
/// <summary>Sentinel object used to indicate that the I/O operation has completed before being awaited.</summary>
private readonly static Action s_sentinel = () => { };
/// <summary>Cached delegate to IOCallback.</summary>
internal static readonly IOCompletionCallback s_callback = IOCallback;

/// <summary>The FileStream that owns this instance.</summary>
internal readonly Win32FileStream _fileStream;

/// <summary>Tracked position representing the next location from which to read.</summary>
internal long _position;
/// <summary>The current native overlapped pointer. This changes for each operation.</summary>
internal NativeOverlapped* _nativeOverlapped;
/// <summary>
/// null if the operation is still in progress,
/// s_sentinel if the I/O operation completed before the await,
/// s_callback if it completed after the await yielded.
/// </summary>
internal Action _continuation;
/// <summary>Last error code from completed operation.</summary>
internal uint _errorCode;
/// <summary>Last number of read bytes from completed operation.</summary>
internal uint _numBytes;

/// <summary>Lock object used to protect cancellation-related access to _nativeOverlapped.</summary>
internal object CancellationLock => this;

/// <summary>Initialize the awaitable.</summary>
internal unsafe AsyncCopyToAwaitable(Win32FileStream fileStream)
{
_fileStream = fileStream;
}

/// <summary>Reset state to prepare for the next read operation.</summary>
internal void ResetForNextOperation()
{
Debug.Assert(_position >= 0, $"Expected non-negative position, got {_position}");
_continuation = null;
_errorCode = 0;
_numBytes = 0;
}

/// <summary>Overlapped callback: store the results, then invoke the continuation delegate.</summary>
internal unsafe static void IOCallback(uint errorCode, uint numBytes, NativeOverlapped* pOVERLAP)
{
var awaitable = (AsyncCopyToAwaitable)ThreadPoolBoundHandle.GetNativeOverlappedState(pOVERLAP);

Debug.Assert(awaitable._continuation != s_sentinel, "Sentinel must not have already been set as the continuation");
awaitable._errorCode = errorCode;
awaitable._numBytes = numBytes;

(awaitable._continuation ?? Interlocked.CompareExchange(ref awaitable._continuation, s_sentinel, null))?.Invoke();
}

/// <summary>
/// Called when it's known that the I/O callback for an operation will not be invoked but we'll
/// still be awaiting the awaitable.
/// </summary>
internal void MarkCompleted()
{
Debug.Assert(_continuation == null, "Expected null continuation");
_continuation = s_sentinel;
}

public AsyncCopyToAwaitable GetAwaiter() => this;
public bool IsCompleted => _continuation == s_sentinel;
public void GetResult() { }
public void OnCompleted(Action continuation) => UnsafeOnCompleted(continuation);
public void UnsafeOnCompleted(Action continuation)
{
if (_continuation == s_sentinel ||
Interlocked.CompareExchange(ref _continuation, continuation, null) != null)
{
Debug.Assert(_continuation == s_sentinel, $"Expected continuation set to s_sentinel, got ${_continuation}");
Task.Run(continuation);
}
}
}

[System.Security.SecuritySafeCritical]
public override Task<int> ReadAsync(Byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
Expand Down

0 comments on commit 00bb21c

Please sign in to comment.