Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/Renci.SshNet/Common/Extensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,14 @@ async Task<T> WaitCore()
return await completedTask.ConfigureAwait(false);
}
}

extension(Task t)
{
internal bool IsCompletedSuccessfully
{
get { return t.Status == TaskStatus.RanToCompletion; }
}
}
#endif
}
}
87 changes: 87 additions & 0 deletions src/Renci.SshNet/Common/ReadOnlyMemoryOwner.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
#nullable enable
using System;
using System.Buffers;
using System.Diagnostics;
using System.Net;

namespace Renci.SshNet.Common
{
/// <summary>
/// A type representing ownership of a rented, read-only buffer.
/// </summary>
internal sealed class ReadOnlyMemoryOwner : IMemoryOwner<byte>
{
private ArrayBuffer _buffer;

public ReadOnlyMemoryOwner(ArrayBuffer buffer)
{
_buffer = buffer;

AssertValid();
}

[Conditional("DEBUG")]
private void AssertValid()
{
Debug.Assert(
_buffer.ActiveLength > 0 || _buffer.AvailableLength == 0,
"If the buffer is empty, then it should have been returned to the pool.");
}

public int Length
{
get
{
AssertValid();
return _buffer.ActiveLength;
}
}

public bool IsEmpty
{
get
{
AssertValid();
return _buffer.ActiveLength == 0;
}
}

public ReadOnlySpan<byte> Span
{
get
{
AssertValid();
return _buffer.ActiveReadOnlySpan;
}
}

Memory<byte> IMemoryOwner<byte>.Memory
{
get
{
AssertValid();
return _buffer.ActiveMemory;
}
}

public void Slice(int start)
{
AssertValid();

_buffer.Discard(start);

if (_buffer.ActiveLength == 0)
{
// Return the rented buffer as soon as it's no longer in use.
_buffer.ClearAndReturnBuffer();
}
}

public void Dispose()
{
AssertValid();

_buffer.ClearAndReturnBuffer();
}
}
}
3 changes: 2 additions & 1 deletion src/Renci.SshNet/Sftp/ISftpSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System.Threading;
using System.Threading.Tasks;

using Renci.SshNet.Common;
using Renci.SshNet.Sftp.Responses;

namespace Renci.SshNet.Sftp
Expand Down Expand Up @@ -198,7 +199,7 @@ internal interface ISftpSession : ISubsystemSession
/// its <see cref="Task{Task}.Result"/> contains the data read from the file, or an empty
/// array when the end of the file is reached.
/// </returns>
Task<byte[]> RequestReadAsync(byte[] handle, ulong offset, uint length, CancellationToken cancellationToken);
Task<ReadOnlyMemoryOwner> RequestReadAsync(byte[] handle, ulong offset, uint length, CancellationToken cancellationToken);

/// <summary>
/// Performs a <c>SSH_FXP_READDIR</c> request.
Expand Down
10 changes: 6 additions & 4 deletions src/Renci.SshNet/Sftp/Responses/SftpDataResponse.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
namespace Renci.SshNet.Sftp.Responses
using System;

namespace Renci.SshNet.Sftp.Responses
{
internal sealed class SftpDataResponse : SftpResponse
{
Expand All @@ -7,7 +9,7 @@ public override SftpMessageTypes SftpMessageType
get { return SftpMessageTypes.Data; }
}

public byte[] Data { get; set; }
public ArraySegment<byte> Data { get; set; }

public SftpDataResponse(uint protocolVersion)
: base(protocolVersion)
Expand All @@ -18,14 +20,14 @@ protected override void LoadData()
{
base.LoadData();

Data = ReadBinary();
Data = ReadBinarySegment();
}

protected override void SaveData()
{
base.SaveData();

WriteBinary(Data, 0, Data.Length);
WriteBinary(Data.Array, Data.Offset, Data.Count);
}
}
}
23 changes: 14 additions & 9 deletions src/Renci.SshNet/Sftp/SftpFileReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@
using System.Threading;
using System.Threading.Tasks;

#if !NET
using Renci.SshNet.Common;
#endif

namespace Renci.SshNet.Sftp
{
Expand Down Expand Up @@ -58,7 +56,7 @@ public SftpFileReader(byte[] handle, ISftpSession sftpSession, int chunkSize, lo
_cts = new CancellationTokenSource();
}

public async Task<byte[]> ReadAsync(CancellationToken cancellationToken)
public async Task<ReadOnlyMemoryOwner> ReadAsync(CancellationToken cancellationToken)
{
_exception?.Throw();

Expand Down Expand Up @@ -172,14 +170,21 @@ public void Dispose()

if (_requests.Count > 0)
{
// Cancel outstanding requests and observe the exception on them
// as an effort to prevent unhandled exceptions.

_cts.Cancel();

foreach (var request in _requests.Values)
{
_ = request.Task.Exception;
// Return rented buffers to the pool, or observe exception on
// the task as an effort to prevent unhandled exceptions.

if (request.Task.IsCompletedSuccessfully)
{
request.Task.GetAwaiter().GetResult().Dispose();
}
else
{
_ = request.Task.Exception;
}
}

_requests.Clear();
Expand All @@ -190,7 +195,7 @@ public void Dispose()

private sealed class Request
{
public Request(ulong offset, uint count, Task<byte[]> task)
public Request(ulong offset, uint count, Task<ReadOnlyMemoryOwner> task)
{
Offset = offset;
Count = count;
Expand All @@ -199,7 +204,7 @@ public Request(ulong offset, uint count, Task<byte[]> task)

public ulong Offset { get; }
public uint Count { get; }
public Task<byte[]> Task { get; }
public Task<ReadOnlyMemoryOwner> Task { get; }
}
}
}
Expand Down
13 changes: 7 additions & 6 deletions src/Renci.SshNet/Sftp/SftpFileStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public sealed partial class SftpFileStream : Stream
private readonly int _readBufferSize;

private SftpFileReader? _sftpFileReader;
private ReadOnlyMemory<byte> _readBuffer;
private ReadOnlyMemoryOwner _readBuffer;
private System.Net.ArrayBuffer _writeBuffer;

private long _position;
Expand Down Expand Up @@ -153,6 +153,7 @@ private SftpFileStream(
_readBufferSize = readBufferSize;
_position = position;
_writeBuffer = new System.Net.ArrayBuffer(writeBufferSize);
_readBuffer = new ReadOnlyMemoryOwner(new System.Net.ArrayBuffer(0, usePool: true));
_sftpFileReader = initialReader;
}

Expand Down Expand Up @@ -390,7 +391,7 @@ await _session.RequestWriteAsync(

private void InvalidateReads()
{
_readBuffer = ReadOnlyMemory<byte>.Empty;
_readBuffer.Dispose();
_sftpFileReader?.Dispose();
_sftpFileReader = null;
}
Expand Down Expand Up @@ -441,7 +442,7 @@ private int Read(Span<byte> buffer)
var bytesRead = Math.Min(buffer.Length, _readBuffer.Length);

_readBuffer.Span.Slice(0, bytesRead).CopyTo(buffer);
_readBuffer = _readBuffer.Slice(bytesRead);
_readBuffer.Slice(bytesRead);

_position += bytesRead;

Expand Down Expand Up @@ -494,8 +495,8 @@ private async ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken ca

var bytesRead = Math.Min(buffer.Length, _readBuffer.Length);

_readBuffer.Slice(0, bytesRead).CopyTo(buffer);
_readBuffer = _readBuffer.Slice(bytesRead);
_readBuffer.Span.Slice(0, bytesRead).CopyTo(buffer.Span);
_readBuffer.Slice(bytesRead);

_position += bytesRead;

Expand Down Expand Up @@ -649,7 +650,7 @@ public override long Seek(long offset, SeekOrigin origin)

if (readBufferStart <= newPosition && newPosition <= readBufferEnd)
{
_readBuffer = _readBuffer.Slice((int)(newPosition - readBufferStart));
_readBuffer.Slice((int)(newPosition - readBufferStart));
}
else
{
Expand Down
29 changes: 22 additions & 7 deletions src/Renci.SshNet/Sftp/SftpSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System.Collections.Generic;
using System.Diagnostics;
using System.Globalization;
using System.Net;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -24,7 +25,7 @@ internal sealed class SftpSession : SubsystemSession, ISftpSession
private readonly Dictionary<uint, SftpRequest> _requests = new Dictionary<uint, SftpRequest>();
private readonly ISftpResponseFactory _sftpResponseFactory;
private readonly Encoding _encoding;
private System.Net.ArrayBuffer _buffer = new(32 * 1024);
private ArrayBuffer _buffer = new(32 * 1024);
private EventWaitHandle _sftpVersionConfirmed = new AutoResetEvent(initialState: false);
private IDictionary<string, string> _supportedExtensions;

Expand Down Expand Up @@ -495,7 +496,7 @@ public byte[] RequestRead(byte[] handle, ulong offset, uint length)
length,
response =>
{
data = response.Data;
data = response.Data.ToArray();
wait.SetIgnoringObjectDisposed();
},
response =>
Expand Down Expand Up @@ -526,28 +527,42 @@ public byte[] RequestRead(byte[] handle, ulong offset, uint length)
}

/// <inheritdoc/>
public Task<byte[]> RequestReadAsync(byte[] handle, ulong offset, uint length, CancellationToken cancellationToken)
public Task<ReadOnlyMemoryOwner> RequestReadAsync(byte[] handle, ulong offset, uint length, CancellationToken cancellationToken)
{
Debug.Assert(length > 0, "This implementation cannot distinguish between EOF and zero-length reads");

if (cancellationToken.IsCancellationRequested)
{
return Task.FromCanceled<byte[]>(cancellationToken);
return Task.FromCanceled<ReadOnlyMemoryOwner>(cancellationToken);
}

var tcs = new TaskCompletionSource<byte[]>(TaskCreationOptions.RunContinuationsAsynchronously);
var tcs = new TaskCompletionSource<ReadOnlyMemoryOwner>(TaskCreationOptions.RunContinuationsAsynchronously);

SendRequest(new SftpReadRequest(ProtocolVersion,
NextRequestId,
handle,
offset,
length,
response => tcs.TrySetResult(response.Data),
response =>
{
ArrayBuffer buffer = new(response.Data.Count, usePool: true);

response.Data.AsSpan().CopyTo(buffer.AvailableSpan);

buffer.Commit(response.Data.Count);

ReadOnlyMemoryOwner owner = new(buffer);

if (!tcs.TrySetResult(owner))
{
owner.Dispose();
}
},
response =>
{
if (response.StatusCode == StatusCode.Eof)
{
_ = tcs.TrySetResult(Array.Empty<byte>());
_ = tcs.TrySetResult(new(new(0, usePool: true)));
}
else
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public void Constructor()
{
var target = new SftpDataResponse(_protocolVersion);

Assert.IsNull(target.Data);
Assert.AreEqual(default, target.Data);
Assert.AreEqual(_protocolVersion, target.ProtocolVersion);
Assert.AreEqual((uint)0, target.ResponseId);
Assert.AreEqual(SftpMessageTypes.Data, target.SftpMessageType);
Expand All @@ -52,7 +52,6 @@ public void Load()

target.Load(sshData);

Assert.IsNotNull(target.Data);
Assert.IsTrue(target.Data.SequenceEqual(_data));
Assert.AreEqual(_protocolVersion, target.ProtocolVersion);
Assert.AreEqual(_responseId, target.ResponseId);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
using Renci.SshNet.Sftp.Responses;
using System;

using Renci.SshNet.Sftp.Responses;

namespace Renci.SshNet.Tests.Classes.Sftp
{
Expand Down Expand Up @@ -31,7 +33,7 @@ public SftpDataResponse Build()
return new SftpDataResponse(_protocolVersion)
{
ResponseId = _responseId,
Data = _data
Data = new ArraySegment<byte>(_data)
};
}
}
Expand Down
Loading