From 6e9cf2841458540d16abbc2e286b28fd2320d271 Mon Sep 17 00:00:00 2001 From: brnbs Date: Tue, 18 Jan 2022 15:47:43 +0100 Subject: [PATCH 1/2] Fix buffer handling for large websocket messages --- src/Core/Extensions/Ws.cs | 26 +++++++++++++++++++++----- 1 file changed, 21 insertions(+), 5 deletions(-) diff --git a/src/Core/Extensions/Ws.cs b/src/Core/Extensions/Ws.cs index cf56e04..7d05481 100644 --- a/src/Core/Extensions/Ws.cs +++ b/src/Core/Extensions/Ws.cs @@ -1,4 +1,5 @@ using System; +using System.IO; using System.Linq; using System.Net.WebSockets; using System.Text; @@ -70,7 +71,8 @@ internal static async Task ExecuteWsProxyOperationAsync(this HttpContext context private static async Task PumpWebSocket(WebSocket source, WebSocket destination, int bufferSize, CancellationToken cancellationToken) { - var buffer = new byte[bufferSize]; + using var ms = new MemoryStream(); + var receiveBuffer = WebSocket.CreateServerBuffer(bufferSize); while (true) { @@ -78,29 +80,43 @@ private static async Task PumpWebSocket(WebSocket source, WebSocket destination, try { - result = await source.ReceiveAsync(new ArraySegment(buffer), cancellationToken).ConfigureAwait(false); + ms.SetLength(0); + + do + { + result = await source.ReceiveAsync(receiveBuffer, cancellationToken).ConfigureAwait(false); + ms.Write(receiveBuffer.Array!, receiveBuffer.Offset, result.Count); + } + while (!result.EndOfMessage); } catch (Exception e) { var closeMessageBytes = Encoding.UTF8.GetBytes($"WebSocket failure.\n\n{e.Message}\n\n{e.StackTrace}"); var closeMessage = Encoding.UTF8.GetString(closeMessageBytes, 0, Math.Min(closeMessageBytes.Length, CloseMessageMaxSize)); await destination.CloseOutputAsync(WebSocketCloseStatus.EndpointUnavailable, closeMessage, cancellationToken).ConfigureAwait(false); + return; } - if(destination.State != WebSocketState.Open && destination.State != WebSocketState.CloseReceived) + if (destination.State != WebSocketState.Open && destination.State != WebSocketState.CloseReceived) + { return; + } if (result.MessageType == WebSocketMessageType.Close) { - await destination.CloseOutputAsync(source.CloseStatus.Value, source.CloseStatusDescription, cancellationToken).ConfigureAwait(false); + var closeStatus = source.CloseStatus ?? WebSocketCloseStatus.Empty; + await destination.CloseOutputAsync(closeStatus, source.CloseStatusDescription, cancellationToken).ConfigureAwait(false); + return; } + var sendBuffer = new ArraySegment(ms.GetBuffer(), 0, (int)ms.Length); + // TODO: Add handlers here to allow the developer to edit message before forwarding, and vice versa? // Possibly in the future, if deemed useful. - await destination.SendAsync(new ArraySegment(buffer, 0, result.Count), result.MessageType, result.EndOfMessage, cancellationToken).ConfigureAwait(false); + await destination.SendAsync(sendBuffer, result.MessageType, result.EndOfMessage, cancellationToken).ConfigureAwait(false); } } } From 5d1ef860511e53944103e4ec356711ef4eccfa80 Mon Sep 17 00:00:00 2001 From: brnbs Date: Wed, 19 Jan 2022 08:19:47 +0100 Subject: [PATCH 2/2] Add tests for proxying 500 KB data chunk; Remove unused using statements --- src/Test/Extensions.cs | 29 +++++++++++++++++++--- src/Test/Http/HttpHelpers.cs | 2 -- src/Test/RunProxy/RunProxyHelpers.cs | 3 --- src/Test/Unit/Endpoints.cs | 1 - src/Test/Ws/WsHelpers.cs | 11 ++++++++- src/Test/Ws/WsIntegrationTests.cs | 36 +++++++++++++++++++++++++++- 6 files changed, 71 insertions(+), 11 deletions(-) diff --git a/src/Test/Extensions.cs b/src/Test/Extensions.cs index fd06185..2a92b06 100644 --- a/src/Test/Extensions.cs +++ b/src/Test/Extensions.cs @@ -1,5 +1,4 @@ using System; -using System.Diagnostics; using System.IO; using System.Net.WebSockets; using System.Text; @@ -37,13 +36,37 @@ internal static async Task ReceiveShortMessageAsync(this WebSocket socke return Encoding.UTF8.GetString(buffer, 0, result.Count); } + internal static Task SendMessageAsync(this WebSocket socket, string message) + { + return socket.SendAsync(new ArraySegment(Encoding.UTF8.GetBytes(message)), WebSocketMessageType.Text, true, CancellationToken.None); + } + + internal static async Task ReceiveMessageAsync(this WebSocket socket) + { + var buffer = new ArraySegment(new byte[BUFFER_SIZE]); + WebSocketReceiveResult result; + + using var ms = new MemoryStream(); + do + { + result = await socket.ReceiveAsync(buffer, CancellationToken.None); + ms.Write(buffer.Array!, buffer.Offset, result.Count); + } + while (!result.EndOfMessage); + + ms.Seek(0, SeekOrigin.Begin); + + using var reader = new StreamReader(ms, Encoding.UTF8); + return await reader.ReadToEndAsync(); + } + internal static async Task SocketBoomerang(this HttpContext context) { var socket = await context.WebSockets.AcceptWebSocketAsync(SupportedProtocol); while(true) { - var message = await socket.ReceiveShortMessageAsync(); + var message = await socket.ReceiveMessageAsync(); if(message == CloseMessage) { @@ -57,7 +80,7 @@ internal static async Task SocketBoomerang(this HttpContext context) } // Basically, this server just always sends back a message that is the message it received wrapped with "[]". - await socket.SendShortMessageAsync($"[{message}]"); + await socket.SendMessageAsync($"[{message}]"); } } diff --git a/src/Test/Http/HttpHelpers.cs b/src/Test/Http/HttpHelpers.cs index 240c3ac..7781e29 100644 --- a/src/Test/Http/HttpHelpers.cs +++ b/src/Test/Http/HttpHelpers.cs @@ -6,9 +6,7 @@ using Microsoft.AspNetCore.Hosting; using Microsoft.AspNetCore.Http; using Microsoft.AspNetCore.Mvc; -using Microsoft.AspNetCore.Routing; using Microsoft.Extensions.DependencyInjection; -using AspNetCore.Proxy; using AspNetCore.Proxy.Options; using System.Diagnostics.CodeAnalysis; diff --git a/src/Test/RunProxy/RunProxyHelpers.cs b/src/Test/RunProxy/RunProxyHelpers.cs index e280230..ce38c22 100644 --- a/src/Test/RunProxy/RunProxyHelpers.cs +++ b/src/Test/RunProxy/RunProxyHelpers.cs @@ -1,12 +1,9 @@ -using System; using System.Threading; using System.Threading.Tasks; using Microsoft.AspNetCore; using Microsoft.AspNetCore.Builder; using Microsoft.AspNetCore.Hosting; -using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; -using AspNetCore.Proxy; namespace AspNetCore.Proxy.Tests { diff --git a/src/Test/Unit/Endpoints.cs b/src/Test/Unit/Endpoints.cs index 1a21b90..46afebe 100644 --- a/src/Test/Unit/Endpoints.cs +++ b/src/Test/Unit/Endpoints.cs @@ -1,6 +1,5 @@ using System.Collections.Generic; using System.Linq; -using System.Threading.Tasks; using AspNetCore.Proxy.Endpoints; using Xunit; diff --git a/src/Test/Ws/WsHelpers.cs b/src/Test/Ws/WsHelpers.cs index c9ad5be..9862806 100644 --- a/src/Test/Ws/WsHelpers.cs +++ b/src/Test/Ws/WsHelpers.cs @@ -1,3 +1,4 @@ +using System; using System.Threading; using System.Threading.Tasks; using Microsoft.AspNetCore; @@ -6,7 +7,6 @@ using Microsoft.AspNetCore.Mvc; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; -using AspNetCore.Proxy; using Microsoft.AspNetCore.Http; namespace AspNetCore.Proxy.Tests @@ -78,5 +78,14 @@ internal static Task RunWsServers(CancellationToken token) return Task.WhenAll(proxiedServerTask, proxyServerTask); } + + internal static string GetRandomBase64String(int sizeInKb) + { + var rnd = new Random(); + var b = new byte[sizeInKb * 1024]; + rnd.NextBytes(b); + + return Convert.ToBase64String(b); + } } } \ No newline at end of file diff --git a/src/Test/Ws/WsIntegrationTests.cs b/src/Test/Ws/WsIntegrationTests.cs index 5404115..8c22ace 100644 --- a/src/Test/Ws/WsIntegrationTests.cs +++ b/src/Test/Ws/WsIntegrationTests.cs @@ -2,7 +2,6 @@ using System.Net; using System.Net.Http; using System.Net.WebSockets; -using System.Text; using System.Threading; using System.Threading.Tasks; using Xunit; @@ -71,6 +70,41 @@ public async Task CanDoWebSockets(string server) await _client.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, Extensions.CloseDescription, CancellationToken.None); } + [Theory] + [InlineData("ws://localhost:5001/ws")] + [InlineData("ws://localhost:5001/api/ws")] + [InlineData("ws://localhost:5001/api/ws2")] + public async Task CanDoWebSocketsWithLargeDataChunks(string server) + { + var send1 = WsHelpers.GetRandomBase64String(10); + var expected1 = $"[{send1}]"; + + var send2 = WsHelpers.GetRandomBase64String(500); + var expected2 = $"[{send2}]"; + + await _client.ConnectAsync(new Uri(server), CancellationToken.None); + Assert.Equal(Extensions.SupportedProtocol, _client.SubProtocol); + + // Send a message. + await _client.SendMessageAsync(send1); + await _client.SendMessageAsync(send2); + await _client.SendShortMessageAsync(Extensions.CloseMessage); + + // Receive responses. + var response1 = await _client.ReceiveMessageAsync(); + Assert.Equal(expected1, response1); + var response2 = await _client.ReceiveMessageAsync(); + Assert.Equal(expected2, response2); + + // Receive close. + var result = await _client.ReceiveAsync(new ArraySegment(new byte[4096]), CancellationToken.None); + Assert.Equal(WebSocketMessageType.Close, result.MessageType); + Assert.Equal(WebSocketCloseStatus.NormalClosure, result.CloseStatus); + Assert.Equal(Extensions.CloseDescription, result.CloseStatusDescription); + + await _client.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, Extensions.CloseDescription, CancellationToken.None); + } + [Fact] public async Task CanCatchAbruptClose() {