From 88462aa17dbf13d4ffc105ec37e4f76a9e2be84b Mon Sep 17 00:00:00 2001 From: Adam Friedman Date: Mon, 5 Mar 2018 11:33:25 +1100 Subject: [PATCH] Basic support for multiplexed WebSocket connections. --- ISSUE | 28 ++ .../K8sMultiplexer.cs | 365 ++++++++++++++++++ .../K8sWebSocket.cs | 2 +- .../K8sWebSocketOptions.cs | 4 +- .../KubeClient.Extensions.WebSockets.csproj | 2 +- .../KubeClientExtensions.cs | 104 +++-- .../Streams/K8sMultiplexedReadStream.cs | 311 +++++++++++++++ .../Streams/K8sMultiplexedWriteStream.cs | 177 +++++++++ 8 files changed, 956 insertions(+), 37 deletions(-) create mode 100644 ISSUE create mode 100644 src/KubeClient.Extensions.WebSockets/K8sMultiplexer.cs create mode 100644 src/KubeClient.Extensions.WebSockets/Streams/K8sMultiplexedReadStream.cs create mode 100644 src/KubeClient.Extensions.WebSockets/Streams/K8sMultiplexedWriteStream.cs diff --git a/ISSUE b/ISSUE new file mode 100644 index 00000000..4813e797 --- /dev/null +++ b/ISSUE @@ -0,0 +1,28 @@ +** Microsoft .NET Core Framework (CoreFX) +** +** https://github.com/dotnet/corefx/blob/fab6ae579aff17cb9b464b2b167b7dfd8fcea175/LICENSE.TXT + +The MIT License (MIT) + +Copyright (c) .NET Foundation and Contributors + +All rights reserved. + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. + diff --git a/src/KubeClient.Extensions.WebSockets/K8sMultiplexer.cs b/src/KubeClient.Extensions.WebSockets/K8sMultiplexer.cs new file mode 100644 index 00000000..603a06ee --- /dev/null +++ b/src/KubeClient.Extensions.WebSockets/K8sMultiplexer.cs @@ -0,0 +1,365 @@ +using System; +using System.Buffers; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.IO; +using System.Net.WebSockets; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace KubeClient.Extensions.WebSockets +{ + using Streams; + + /// + /// A multiplexer / demultiplexer for Kubernetes-style WebSocket streams. + /// + /// + /// Kubernetes streams add a single-byte prefix (indicating the stream index) to each payload (this can be one or more WebSocket packets, until EndOfMessage=true). + /// + /// For example, when using the PodV1 exec API, there are up to 3 separate streams: STDIN, STDOUT, and STDERR (with indexes 0, 1, and 2, respectively). + /// + public sealed class K8sMultiplexer + : IDisposable + { + /// + /// The default buffer size used for Kubernetes WebSockets. + /// + const int DefaultBufferSize = 1024; + + /// + /// Input (read) streams, keyed by stream index. + /// + readonly Dictionary _inputStreams = new Dictionary(); + + /// + /// Output (write) streams, keyed by stream index. + /// + readonly Dictionary _outputStreams = new Dictionary(); + + /// + /// Pending write requests from output streams that will be interleaved and written to the WebSocket. + /// + readonly BlockingCollection _pendingWrites = new BlockingCollection(new ConcurrentQueue()); + + /// + /// A source for cancellation tokens used to halt the multiplexer's operation. + /// + CancellationTokenSource _cancellationSource = new CancellationTokenSource(); + + /// + /// A representing the WebSocket message-receive pump. + /// + Task _receivePump; + + /// + /// A representing the WebSocket message-send pump. + /// + Task _sendPump; + + /// + /// Create a new . + /// + /// + /// The target WebSocket. + /// + /// + /// The number of of expected input streams. + /// + /// + /// The number of expected output streams. + /// + public K8sMultiplexer(WebSocket socket, byte inputStreamCount, byte outputStreamCount) + { + if (socket == null) + throw new ArgumentNullException(nameof(socket)); + + if (inputStreamCount == 0 && outputStreamCount == 0) + throw new ArgumentException($"Must specify at least one of {nameof(inputStreamCount)} or {nameof(outputStreamCount)}."); + + Socket = socket; + + for (byte readStreamIndex = 0; readStreamIndex < inputStreamCount; readStreamIndex++) + _inputStreams[readStreamIndex] = new K8sMultiplexedReadStream(readStreamIndex); + + for (byte writeStreamIndex = 0; writeStreamIndex < outputStreamCount; writeStreamIndex++) + _outputStreams[writeStreamIndex] = new K8sMultiplexedWriteStream(writeStreamIndex, EnqueueSend); + } + + /// + /// Dispose of resources being used by the . + /// + public void Dispose() + { + if (_cancellationSource != null) + { + _cancellationSource.Cancel(); + _cancellationSource.Dispose(); + _cancellationSource = null; + } + + if (_receivePump != null) + _receivePump.Wait(); + if (_sendPump != null) + _sendPump.Wait(); + } + + /// + /// The target WebSocket. + /// + WebSocket Socket { get; } + + /// + /// The used to halt the multiplexer's operation. + /// + CancellationToken Cancellation => _cancellationSource.Token; + + /// + /// Get the input stream (if defined) with the specified stream index. + /// + /// + /// The Kubernetes stream index of the target stream. + /// + /// + /// The , or null if no stream is defined with the specified index. + /// + public Stream GetInputStream(byte streamIndex) + { + K8sMultiplexedReadStream readStream; + _inputStreams.TryGetValue(streamIndex, out readStream); + + return readStream; + } + + /// + /// Get the output stream (if defined) with the specified stream index. + /// + /// + /// The Kubernetes stream index of the target stream. + /// + /// + /// The , or null if no stream is defined with the specified index. + /// + public Stream GetOutputStream(byte streamIndex) + { + K8sMultiplexedWriteStream writeStream; + _outputStreams.TryGetValue(streamIndex, out writeStream); + + return writeStream; + } + + /// + /// Start processing stream data. + /// + public void Start() + { + if (_receivePump != null || _sendPump != null) + throw new InvalidOperationException("Read / write pumps are already running."); + + _receivePump = ReceivePump(); + _sendPump = SendPump(); + } + + /// + /// Enqueue a send operation (asynchronously write data to the outgoing stream). + /// + /// + /// The data to write. + /// + /// + /// An optional that can be used to cancel the operation. + /// + /// + /// A representing the asynchronous operation. + /// + Task EnqueueSend(ArraySegment data, CancellationToken cancellation) + { + PendingSend pendingWrite = new PendingSend(data, cancellation); + cancellation.Register( + () => pendingWrite.Completion.TrySetCanceled(cancellation) + ); + _pendingWrites.Add(pendingWrite); + + return pendingWrite.Completion.Task; + } + + /// + /// Receive incoming data from the WebSocket. + /// + /// + /// A representing the pump's operation. + /// + async Task ReceivePump() + { + await Task.Yield(); + + ArraySegment buffer = null; + + try + { + while (Socket.State == WebSocketState.Open) + { + buffer = new ArraySegment( + ArrayPool.Shared.Rent(minimumLength: DefaultBufferSize) + ); + + WebSocketReceiveResult readResult = await Socket.ReceiveAsync(buffer, Cancellation); + if (readResult.Count <= 1 && readResult.EndOfMessage) + { + // Effectively an empty packet; ignore. + ArrayPool.Shared.Return(buffer.Array, clearArray: true); + + continue; + } + + // Extract stream index. + byte streamIndex = buffer[0]; + + K8sMultiplexedReadStream readStream; + if (!_inputStreams.TryGetValue(streamIndex, out readStream)) + { + // Unknown stream; discard the rest of the message. + while (!readResult.EndOfMessage) + readResult = await Socket.ReceiveAsync(buffer, Cancellation); + + ArrayPool.Shared.Return(buffer.Array, clearArray: true); + + continue; + } + + // Skip over stream index. + buffer = new ArraySegment(buffer.Array, + offset: buffer.Offset + 1, + count: readResult.Count - 1 + ); + + readStream.AddPendingRead(buffer); + + while (!readResult.EndOfMessage) + { + buffer = new ArraySegment( + ArrayPool.Shared.Rent(minimumLength: DefaultBufferSize) + ); + readResult = await Socket.ReceiveAsync(buffer, Cancellation); + if (readResult.Count == 0 || readResult.MessageType != WebSocketMessageType.Binary) + { + ArrayPool.Shared.Return(buffer.Array); + + break; + } + + buffer = new ArraySegment(buffer.Array, + offset: buffer.Offset, + count: readResult.Count - 1 + ); + readStream.AddPendingRead(buffer); + } + } + } + catch (OperationCanceledException) + { + // Clean termination. + if (buffer != null) + ArrayPool.Shared.Return(buffer.Array); + } + catch (Exception) + { + if (buffer != null) + ArrayPool.Shared.Return(buffer.Array); + + throw; + } + } + + /// + /// Send outgoing data to the WebSocket. + /// + /// + /// A representing the pump's operation. + /// + async Task SendPump() + { + await Task.Yield(); + + try + { + while (!Cancellation.IsCancellationRequested) + { + PendingSend pendingWrite; + if (!_pendingWrites.TryTake(out pendingWrite, Timeout.Infinite, Cancellation)) + continue; + + using (CancellationTokenSource linkedCancellation = CancellationTokenSource.CreateLinkedTokenSource(Cancellation, pendingWrite.Cancellation)) + { + try + { + await Socket.SendAsync(pendingWrite.Data, + WebSocketMessageType.Binary, + endOfMessage: true, + cancellationToken: linkedCancellation.Token + ); + + pendingWrite.Completion.TrySetResult(null); + } + catch (OperationCanceledException sendCancelled) when (sendCancelled.CancellationToken == linkedCancellation.Token) + { + pendingWrite.Completion.TrySetCanceled(sendCancelled.CancellationToken); + } + catch (Exception writeFailed) + { + pendingWrite.Completion.TrySetException(writeFailed); + } + finally + { + ArrayPool.Shared.Return(pendingWrite.Data.Array, clearArray: true); + } + } + } + } + catch (OperationCanceledException) + { + // Clean termination. + } + } + } + + /// + /// Represents a pending send operation. + /// + class PendingSend // TODO: Include stream index here and let the multiplexer prepend it instead of doing it in the write-stream. + { + /// + /// Create a new . + /// + /// + /// The data (including stream-index prefix) to be written to the web socket. + /// + /// + /// A cancellation token to that can be used to cancel the send operation. + /// + public PendingSend(ArraySegment data, CancellationToken cancellation) + { + if (data == null) + throw new ArgumentNullException(nameof(data)); + + Data = data; + Cancellation = cancellation; + } + + /// + /// The data (including stream-index prefix) to be written to the web socket. + /// + public ArraySegment Data { get; } + + /// + /// A cancellation token to that can be used to cancel the send operation. + /// + public CancellationToken Cancellation { get; } + + /// + /// A used to represent the asynchronous send operation. + /// + public TaskCompletionSource Completion { get; } = new TaskCompletionSource(); + } +} diff --git a/src/KubeClient.Extensions.WebSockets/K8sWebSocket.cs b/src/KubeClient.Extensions.WebSockets/K8sWebSocket.cs index 98d98044..0f08e6a1 100644 --- a/src/KubeClient.Extensions.WebSockets/K8sWebSocket.cs +++ b/src/KubeClient.Extensions.WebSockets/K8sWebSocket.cs @@ -23,7 +23,7 @@ using System.Threading; using System.Threading.Tasks; -namespace KubeClient.WebSockets +namespace KubeClient.Extensions.WebSockets { /// /// Connection factory for Kubernetes web sockets. diff --git a/src/KubeClient.Extensions.WebSockets/K8sWebSocketOptions.cs b/src/KubeClient.Extensions.WebSockets/K8sWebSocketOptions.cs index 770cb31d..3fc08a0d 100644 --- a/src/KubeClient.Extensions.WebSockets/K8sWebSocketOptions.cs +++ b/src/KubeClient.Extensions.WebSockets/K8sWebSocketOptions.cs @@ -4,7 +4,7 @@ using System.Security.Authentication; using System.Security.Cryptography.X509Certificates; -namespace KubeClient.WebSockets +namespace KubeClient.Extensions.WebSockets { /// /// Options for connecting to Kubernetes web sockets. @@ -56,7 +56,7 @@ public K8sWebSocketOptions() /// /// An value representing the SSL protocols that the client supports. /// - public SslProtocols EnabledSslProtocols { get; set; } = SslProtocols.Default; + public SslProtocols EnabledSslProtocols { get; set; } = SslProtocols.Tls; /// /// The WebSocket keep-alive interval. diff --git a/src/KubeClient.Extensions.WebSockets/KubeClient.Extensions.WebSockets.csproj b/src/KubeClient.Extensions.WebSockets/KubeClient.Extensions.WebSockets.csproj index 4253c6a2..e2676794 100644 --- a/src/KubeClient.Extensions.WebSockets/KubeClient.Extensions.WebSockets.csproj +++ b/src/KubeClient.Extensions.WebSockets/KubeClient.Extensions.WebSockets.csproj @@ -1,6 +1,6 @@ - netstandard2.0 + netcoreapp2.1 WebSockets support for KubeClient diff --git a/src/KubeClient.Extensions.WebSockets/KubeClientExtensions.cs b/src/KubeClient.Extensions.WebSockets/KubeClientExtensions.cs index a3da834d..cc873ae6 100644 --- a/src/KubeClient.Extensions.WebSockets/KubeClientExtensions.cs +++ b/src/KubeClient.Extensions.WebSockets/KubeClientExtensions.cs @@ -8,45 +8,13 @@ namespace KubeClient { - using WebSockets; + using Extensions.WebSockets; /// /// WebSockets extension methods for . /// public static class KubeClientExtensions { - /// - /// Open a WebSocket connection. - /// - /// - /// The Kubernetes API client. - /// - /// - /// The target URI. - /// - /// - /// used to configure the WebSocket connection. - /// - /// - /// An optional cancellation token that can be used to cancel the request. - /// - /// - /// The configured . - /// - public static Task ConnectWebSocket(this KubeApiClient client, string targetUri, K8sWebSocketOptions webSocketOptions, CancellationToken cancellationToken = default) - { - if (client == null) - throw new ArgumentNullException(nameof(client)); - - if (String.IsNullOrWhiteSpace(targetUri)) - throw new ArgumentException("Argument cannot be null, empty, or entirely composed of whitespace: 'targetUri'.", nameof(targetUri)); - - if (webSocketOptions == null) - throw new ArgumentNullException(nameof(webSocketOptions)); - - return client.ConnectWebSocket(new Uri(client.ApiEndPoint, targetUri), webSocketOptions, cancellationToken); - } - /// /// Open a WebSocket connection. /// @@ -143,6 +111,38 @@ public static Task ConnectWebSocket(this KubeApiClient client, UriTem ); } + /// + /// Open a WebSocket connection. + /// + /// + /// The Kubernetes API client. + /// + /// + /// The target URI. + /// + /// + /// used to configure the WebSocket connection. + /// + /// + /// An optional cancellation token that can be used to cancel the request. + /// + /// + /// The configured . + /// + public static Task ConnectWebSocket(this KubeApiClient client, string targetUri, K8sWebSocketOptions webSocketOptions, CancellationToken cancellationToken = default) + { + if (client == null) + throw new ArgumentNullException(nameof(client)); + + if (String.IsNullOrWhiteSpace(targetUri)) + throw new ArgumentException("Argument cannot be null, empty, or entirely composed of whitespace: 'targetUri'.", nameof(targetUri)); + + if (webSocketOptions == null) + throw new ArgumentNullException(nameof(webSocketOptions)); + + return client.ConnectWebSocket(new Uri(client.ApiEndPoint, targetUri), webSocketOptions, cancellationToken); + } + /// /// Open a WebSocket connection. /// @@ -180,5 +180,43 @@ public static Task ConnectWebSocket(this KubeApiClient client, Uri ta return K8sWebSocket.ConnectAsync(targetUri, webSocketOptions, cancellationToken); } + + /// + /// Create a Kubernetes-style multiplexed connection over the WebSocket. + /// + /// + /// The . + /// + /// + /// The expected number of input streams. + /// + /// + /// The expected number of output streams. + /// + /// + /// The configured . + /// + public static K8sMultiplexer Multiplexed(this WebSocket websocket, byte inputStreamCount = 0, byte outputStreamCount = 0) + { + if (websocket == null) + throw new ArgumentNullException(nameof(websocket)); + + if (inputStreamCount == 0 && outputStreamCount == 0) + throw new ArgumentException($"Must specify at least one of {nameof(inputStreamCount)} or {nameof(outputStreamCount)}."); + + K8sMultiplexer multiplexer = null; + try + { + multiplexer = new K8sMultiplexer(websocket, inputStreamCount, outputStreamCount); + multiplexer.Start(); + + return multiplexer; + } + catch (Exception) + { + using (multiplexer) + throw; + } + } } } \ No newline at end of file diff --git a/src/KubeClient.Extensions.WebSockets/Streams/K8sMultiplexedReadStream.cs b/src/KubeClient.Extensions.WebSockets/Streams/K8sMultiplexedReadStream.cs new file mode 100644 index 00000000..18224ee1 --- /dev/null +++ b/src/KubeClient.Extensions.WebSockets/Streams/K8sMultiplexedReadStream.cs @@ -0,0 +1,311 @@ +using System; +using System.Buffers; +using System.Collections.Concurrent; +using System.Diagnostics; +using System.IO; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace KubeClient.Extensions.WebSockets.Streams +{ + /// + /// Represents a single input substream within a Kubernetes-style multiplexed stream. + /// + sealed class K8sMultiplexedReadStream + : Stream + { + /// + /// The stream's queue of pending read operations. + /// + readonly ConcurrentQueue _pendingReads = new ConcurrentQueue(); + + /// + /// A wait handle representing the availability of data to read. + /// + readonly AutoResetEvent _dataAvailable = new AutoResetEvent(initialState: false); + + /// + /// Create a new . + /// + /// + /// The Kubernetes stream index of the target input stream. + /// + public K8sMultiplexedReadStream(byte streamIndex) + { + StreamIndex = streamIndex; + } + + /// + /// Dispose of resources used by the . + /// + /// + /// Explicit disposal? + /// + protected override void Dispose(bool disposing) + { + if (disposing) + _dataAvailable.Dispose(); + } + + /// + /// The Kubernetes stream index of the target input stream. + /// + public byte StreamIndex { get; } + + /// + /// Does the stream support reading? + /// + public override bool CanRead => true; + + /// + /// Does the stream support seeking? + /// + public override bool CanSeek => false; + + /// + /// Does the stream support writing? + /// + public override bool CanWrite => false; + + /// + /// The stream length (not supported). + /// + public override long Length => throw new NotSupportedException("The stream does not support seeking."); + + /// + /// The stream position (not supported). + /// + public override long Position { get => throw new NotImplementedException(); set => throw new NotImplementedException(); } + + /// + /// Read data from the stream. + /// + /// + /// The buffer to which the incoming data will be written. + /// + /// + /// The offset within the buffer to which data will be written. + /// + /// + /// The maximum number of bytes to read. + /// + /// + /// The number of bytes that were read from the stream. + /// + public override int Read(byte[] buffer, int offset, int count) + { + if (buffer == null) + throw new ArgumentNullException(nameof(buffer)); + + PendingRead pendingRead = NextPendingRead(CancellationToken.None); + + int bytesRead = pendingRead.DrainTo(buffer, offset); + if (pendingRead.IsEmpty) + ConsumeNextPendingRead(); // Source buffer has been consumed. + + return bytesRead; + } + + /// + /// Asynchronously read data from the stream. + /// + /// + /// The buffer to which the incoming data will be written. + /// + /// + /// The offset within the buffer to which data will be written. + /// + /// + /// The maximum number of bytes to read. + /// + /// + /// A that can be used to cancel the operation. + /// + /// + /// The number of bytes that were read from the stream. + /// + public override async Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + if (buffer == null) + throw new ArgumentNullException(nameof(buffer)); + + await Task.Yield(); + + PendingRead pendingRead = NextPendingRead(cancellationToken); + + int bytesRead = pendingRead.DrainTo(buffer, offset); + if (pendingRead.IsEmpty) + ConsumeNextPendingRead(); // Source buffer has been consumed. + + return bytesRead; + } + + /// + /// Flush pending data (not supported). + /// + public override void Flush() => throw new NotSupportedException("The stream does not support writing."); + + /// + /// Seek to the specified position in the stream (not supported). + /// + /// + /// The seek offset, relative to the specified . + /// + /// + /// The seek origin. + /// + /// + /// The new stream position. + /// + public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException("Stream does not support seeking."); + + /// + /// Set the stream length (not supported). + /// + /// + /// The new stream length. + /// + public override void SetLength(long value) => throw new NotSupportedException("Stream does not support seeking."); + + /// + /// Write data to the stream (not supported). + /// + /// + /// A buffer containing the data to write. + /// + /// + /// The offset, within the buffer, of the data to write. + /// + /// + /// The number of bytes to write. + /// + public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException("Stream does not support writing."); + + /// + /// Make data available to be read from the stream. + /// + /// + /// An representing the data. + /// + internal void AddPendingRead(ArraySegment data) + { + if (data == null) + throw new ArgumentNullException(nameof(data)); + + _pendingReads.Enqueue(new PendingRead(data)); + _dataAvailable.Set(); + } + + /// + /// Get the next available pending read. + /// + /// + /// A that can be used to abort the wait for a pending read. + /// + /// + /// The . + /// + /// + /// If no pending read is currently available, blocks until a pending read is available or the cancellation token is cancelled. + /// + PendingRead NextPendingRead(CancellationToken cancellation) + { + PendingRead pendingRead; + using (var canceled = new AutoResetEvent(initialState: false)) + using (var waitCancellation = cancellation.Register(() => canceled.Set())) + { + if (!_pendingReads.TryPeek(out pendingRead)) + { + // Wait for data. + int handleIndex = WaitHandle.WaitAny(new[] { _dataAvailable, canceled }); + if (handleIndex == 1) + throw new OperationCanceledException("Read operation was canceled.", cancellation); + + _pendingReads.TryPeek(out pendingRead); + Debug.Assert(pendingRead != null, "pendingRead != null"); + } + } + + return pendingRead; + } + + /// + /// Consume the currently-pending read, removing it from the queue. + /// + void ConsumeNextPendingRead() + { + bool consumed = _pendingReads.TryDequeue(out _); + Debug.Assert(consumed, "Attempted to consume pending read when none was available."); + } + + /// + /// Represents a pending read operation. + /// + class PendingRead + { + /// + /// The data that will be returned by the pending read. + /// + ArraySegment _data; + + /// + /// Create a new . + /// + /// + /// The data that will be returned by the pending read. + /// + public PendingRead(ArraySegment data) + { + _data = data; + } + + /// + /// The number of bytes available to read. + /// + public int Count => _data.Count; + + /// + /// Has the read buffer been exhausted? + /// + public bool IsEmpty => Count == 0; + + /// + /// Drain the read-buffer into the specified buffer. + /// + /// + /// The buffer to which data will be written. + /// + /// + /// The offset within the buffer at which data will be written. + /// + /// + /// The number of bytes written to the buffer. + /// + public int DrainTo(byte[] buffer, int offset) + { + int bytesAvailable = _data.Count; + int bufferCapacity = buffer.Length - (offset + 1); + if (bufferCapacity <= bytesAvailable) + { + // We still have data remaining. + Array.Copy(_data.Array, _data.Offset, buffer, offset, bufferCapacity); + _data = new ArraySegment(_data.Array, + offset: _data.Offset + bufferCapacity, + count: _data.Count - bufferCapacity + ); + + return bufferCapacity; + } + + // This is the last of our data; copy it all. + Array.Copy(_data.Array, _data.Offset, buffer, offset, bytesAvailable); + + ArrayPool.Shared.Return(_data.Array, clearArray: true); + _data = ArraySegment.Empty; + + return bytesAvailable; + } + } + } +} diff --git a/src/KubeClient.Extensions.WebSockets/Streams/K8sMultiplexedWriteStream.cs b/src/KubeClient.Extensions.WebSockets/Streams/K8sMultiplexedWriteStream.cs new file mode 100644 index 00000000..4c91eea9 --- /dev/null +++ b/src/KubeClient.Extensions.WebSockets/Streams/K8sMultiplexedWriteStream.cs @@ -0,0 +1,177 @@ +using System; +using System.Buffers; +using System.IO; +using System.Net.WebSockets; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace KubeClient.Extensions.WebSockets.Streams +{ + /// + /// Represents a single output substream within a Kubernetes-style multiplexed stream. + /// + sealed class K8sMultiplexedWriteStream + : Stream + { + /// + /// Create a new . + /// + /// + /// The Kubernetes stream index of the target output stream. + /// + /// + /// A delegate used to asynchronously send outgoing data. + /// + public K8sMultiplexedWriteStream(byte streamIndex, Func, CancellationToken, Task> sendAsync) + { + if (sendAsync == null) + throw new ArgumentNullException(nameof(sendAsync)); + + StreamIndex = streamIndex; + SendAsync = sendAsync; + } + + /// + /// The Kubernetes stream index of the target output stream. + /// + public byte StreamIndex { get; } + + /// + /// A delegate used to asynchronously send outgoing data. + /// + public Func, CancellationToken, Task> SendAsync { get; } + + /// + /// Does the stream support reading? + /// + public override bool CanRead => true; + + /// + /// Does the stream support seeking? + /// + public override bool CanSeek => false; + + /// + /// Does the stream support writing? + /// + public override bool CanWrite => false; + + /// + /// The stream length (not supported). + /// + public override long Length => throw new NotSupportedException("The stream does not support seeking."); + + /// + /// The stream position (not supported). + /// + public override long Position { get => throw new NotImplementedException(); set => throw new NotImplementedException(); } + + /// + /// Flush pending data (a no-op for this implementation). + /// + public override void Flush() + { + } + + /// + /// Write data to the stream. + /// + /// + /// A buffer containing the data to write. + /// + /// + /// The offset, within the buffer, of the data to write. + /// + /// + /// The number of bytes to write. + /// + public override void Write(byte[] buffer, int offset, int count) + { + byte[] dataWithPrefix = ArrayPool.Shared.Rent(count + 1); + try + { + dataWithPrefix[0] = StreamIndex; + Array.Copy(buffer, 0, dataWithPrefix, 1, buffer.Length); + + SendAsync(new ArraySegment(dataWithPrefix), CancellationToken.None) + .GetAwaiter() + .GetResult(); + } + finally + { + ArrayPool.Shared.Return(dataWithPrefix, clearArray: true); + } + } + + /// + /// Asynchronously write data to the stream (not supported). + /// + /// + /// A buffer containing the data to write. + /// + /// + /// The offset, within the buffer, of the data to write. + /// + /// + /// The number of bytes to write. + /// + /// + /// A that can be used to cancel the write operation. + /// + public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + byte[] dataWithPrefix = ArrayPool.Shared.Rent(count + 1); + try + { + dataWithPrefix[0] = StreamIndex; + Array.Copy(buffer, 0, dataWithPrefix, 1, buffer.Length); + + await SendAsync(new ArraySegment(dataWithPrefix), cancellationToken); + } + finally + { + ArrayPool.Shared.Return(dataWithPrefix, clearArray: true); + } + } + + /// + /// Seek to the specified position in the stream (not supported). + /// + /// + /// The seek offset, relative to the specified . + /// + /// + /// The seek origin. + /// + /// + /// The new stream position. + /// + public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException("Stream does not support seeking."); + + /// + /// Set the stream length (not supported). + /// + /// + /// The new stream length. + /// + public override void SetLength(long value) => throw new NotSupportedException("Stream does not support seeking."); + + /// + /// Read data from the stream (not supported). + /// + /// + /// The buffer to which the incoming data will be written. + /// + /// + /// The offset within the buffer to which data will be written. + /// + /// + /// The maximum number of bytes to read. + /// + /// + /// The number of bytes that were read from the stream. + /// + public override int Read(byte[] buffer, int offset, int count) => throw new NotSupportedException("Stream does not support reading."); + } +} \ No newline at end of file