Skip to content

Commit

Permalink
Per-message compression
Browse files Browse the repository at this point in the history
  • Loading branch information
tpeczek committed Jul 20, 2017
1 parent 1b215a7 commit 74004bc
Show file tree
Hide file tree
Showing 9 changed files with 53 additions and 50 deletions.
Expand Up @@ -8,9 +8,9 @@
<Content Include="wwwroot\websocket-api.html" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Lib.AspNetCore.WebSocketsCompression" Version="0.1.0" />
<PackageReference Include="Microsoft.AspNetCore" Version="1.1.2" />
<PackageReference Include="Microsoft.AspNetCore.StaticFiles" Version="1.1.2" />
<PackageReference Include="Microsoft.AspNetCore.WebSockets" Version="1.0.2" />
<PackageReference Include="Newtonsoft.Json" Version="10.0.2" />
</ItemGroup>
</Project>
@@ -1,16 +1,16 @@
using System;
using System.Threading;
using System.Threading;
using System.Threading.Tasks;
using System.Net.WebSockets;
using Lib.AspNetCore.WebSocketsCompression.Providers;

namespace Demo.AspNetCore.WebSockets.Infrastructure
{
public interface ITextWebSocketSubprotocol
{
string SubProtocol { get; }

Task SendAsync(string message, WebSocket webSocket, CancellationToken cancellationToken);
Task SendAsync(string message, WebSocket webSocket, IWebSocketCompressionProvider webSocketCompressionProvider, CancellationToken cancellationToken);

string Read(byte[] bytes);
string Read(string rawMessage);
}
}
@@ -1,21 +1,21 @@
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Net.WebSockets;
using Newtonsoft.Json;
using Lib.AspNetCore.WebSocketsCompression.Providers;

namespace Demo.AspNetCore.WebSockets.Infrastructure
{
public class JsonWebSocketSubprotocol : TextWebSocketSubprotocolBase, ITextWebSocketSubprotocol
{
public string SubProtocol => "aspnetcore-ws.json";

public override Task SendAsync(string message, WebSocket webSocket, CancellationToken cancellationToken)
public override Task SendAsync(string message, WebSocket webSocket, IWebSocketCompressionProvider webSocketCompressionProvider, CancellationToken cancellationToken)
{
string jsonMessage = JsonConvert.SerializeObject(new { message, timestamp = DateTime.UtcNow });

return base.SendAsync(jsonMessage, webSocket, cancellationToken);
return base.SendAsync(jsonMessage, webSocket, webSocketCompressionProvider, cancellationToken);
}
}
}
Expand Up @@ -3,26 +3,20 @@
using System.Threading;
using System.Threading.Tasks;
using System.Net.WebSockets;
using Lib.AspNetCore.WebSocketsCompression.Providers;

namespace Demo.AspNetCore.WebSockets.Infrastructure
{
public abstract class TextWebSocketSubprotocolBase
{
public event EventHandler<string> Receive;

public async virtual Task SendAsync(string message, WebSocket webSocket, CancellationToken cancellationToken)
public virtual Task SendAsync(string message, WebSocket webSocket, IWebSocketCompressionProvider webSocketCompressionProvider, CancellationToken cancellationToken)
{
if (webSocket.State == WebSocketState.Open)
{
ArraySegment<byte> buffer = new ArraySegment<byte>(Encoding.ASCII.GetBytes(message), 0, message.Length);

await webSocket.SendAsync(buffer, WebSocketMessageType.Text, true, cancellationToken);
}
return webSocketCompressionProvider.CompressTextMessageAsync(webSocket, message, cancellationToken);
}

public virtual string Read(byte[] bytes)
public virtual string Read(string webSocketMessage)
{
return Encoding.ASCII.GetString(bytes);
return webSocketMessage;
}
}
}
15 changes: 9 additions & 6 deletions Demo.AspNetCore.WebSockets/Infrastructure/WebSocketConnection.cs
@@ -1,14 +1,16 @@
using System.Threading;
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Net.WebSockets;
using System;
using Lib.AspNetCore.WebSocketsCompression.Providers;

namespace Demo.AspNetCore.WebSockets.Infrastructure
{
public class WebSocketConnection
{
#region Fields
private WebSocket _webSocket;
private IWebSocketCompressionProvider _webSocketCompressionProvider;
private ITextWebSocketSubprotocol _subProtocol;
#endregion

Expand All @@ -21,22 +23,23 @@ public class WebSocketConnection
#endregion

#region Constructor
public WebSocketConnection(WebSocket webSocket, ITextWebSocketSubprotocol subProtocol)
public WebSocketConnection(WebSocket webSocket, IWebSocketCompressionProvider webSocketCompressionProvider, ITextWebSocketSubprotocol subProtocol)
{
_webSocket = webSocket ?? throw new ArgumentNullException(nameof(webSocket));
_webSocketCompressionProvider = webSocketCompressionProvider ?? throw new ArgumentNullException(nameof(webSocketCompressionProvider));
_subProtocol = subProtocol ?? throw new ArgumentNullException(nameof(subProtocol));
}
#endregion

#region Methods
public Task SendAsync(string message, CancellationToken cancellationToken)
{
return _subProtocol.SendAsync(message, _webSocket, cancellationToken);
return _subProtocol.SendAsync(message, _webSocket, _webSocketCompressionProvider, cancellationToken);
}

public void OnReceive(byte[] receivedBytes)
public void OnReceive(string webSocketMessage)
{
string message = _subProtocol.Read(receivedBytes);
string message = _subProtocol.Read(webSocketMessage);

Receive?.Invoke(this, message);
}
Expand Down
@@ -1,12 +1,13 @@
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Net.WebSockets;
using System.Collections.Generic;
using Microsoft.AspNetCore.Http;
using Demo.AspNetCore.WebSockets.Infrastructure;
using Demo.AspNetCore.WebSockets.Services;
using Lib.AspNetCore.WebSocketsCompression;
using Lib.AspNetCore.WebSocketsCompression.Providers;

namespace Demo.AspNetCore.WebSockets.Middlewares
{
Expand All @@ -15,13 +16,15 @@ public class WebSocketConnectionsMiddleware
#region Fields
private WebSocketConnectionsOptions _options;
private IWebSocketConnectionsService _connectionsService;
private IWebSocketCompressionService _compressionService;
#endregion

#region Constructor
public WebSocketConnectionsMiddleware(RequestDelegate next, WebSocketConnectionsOptions options, IWebSocketConnectionsService connectionsService)
public WebSocketConnectionsMiddleware(RequestDelegate next, WebSocketConnectionsOptions options, IWebSocketConnectionsService connectionsService, IWebSocketCompressionService compressionService)
{
_options = options ?? throw new ArgumentNullException(nameof(options));
_connectionsService = connectionsService ?? throw new ArgumentNullException(nameof(connectionsService));
_compressionService = compressionService ?? throw new ArgumentNullException(nameof(compressionService));
}
#endregion

Expand All @@ -34,13 +37,15 @@ public async Task Invoke(HttpContext context)
{
ITextWebSocketSubprotocol subProtocol = NegotiateSubProtocol(context.WebSockets.WebSocketRequestedProtocols);

IWebSocketCompressionProvider webSocketCompressionProvider = _compressionService.NegotiateCompression(context);

WebSocket webSocket = await context.WebSockets.AcceptWebSocketAsync(subProtocol?.SubProtocol);

WebSocketConnection webSocketConnection = new WebSocketConnection(webSocket, subProtocol ?? _options.DefaultSubProtocol);
WebSocketConnection webSocketConnection = new WebSocketConnection(webSocket, webSocketCompressionProvider, subProtocol ?? _options.DefaultSubProtocol);
webSocketConnection.Receive += async (sender, message) => { await webSocketConnection.SendAsync(message, CancellationToken.None); };
_connectionsService.AddConnection(webSocketConnection);

WebSocketReceiveResult webSocketCloseResult = await ReceiveMessagesAsync(webSocket, webSocketConnection);
WebSocketReceiveResult webSocketCloseResult = await ReceiveMessagesAsync(webSocket, webSocketCompressionProvider, webSocketConnection);

await webSocket.CloseAsync(webSocketCloseResult.CloseStatus.Value, webSocketCloseResult.CloseStatusDescription, CancellationToken.None);

Expand Down Expand Up @@ -78,34 +83,23 @@ private ITextWebSocketSubprotocol NegotiateSubProtocol(IList<string> requestedSu
return subProtocol;
}

private async Task<WebSocketReceiveResult> ReceiveMessagesAsync(WebSocket webSocket, WebSocketConnection webSocketConnection)
private async Task<WebSocketReceiveResult> ReceiveMessagesAsync(WebSocket webSocket, IWebSocketCompressionProvider webSocketCompressionProvider, WebSocketConnection webSocketConnection)
{
byte[] webSocketBuffer = new byte[1024 * 4];
WebSocketReceiveResult webSocketReceiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(webSocketBuffer), CancellationToken.None);
byte[] receivePayloadBuffer = new byte[_options.ReceivePayloadBufferSize];
WebSocketReceiveResult webSocketReceiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(receivePayloadBuffer), CancellationToken.None);
while (webSocketReceiveResult.MessageType != WebSocketMessageType.Close)
{
byte[] webSocketReceivedBytes = null;

if (webSocketReceiveResult.EndOfMessage)
if (webSocketReceiveResult.MessageType == WebSocketMessageType.Binary)
{
webSocketReceivedBytes = new byte[webSocketReceiveResult.Count];
Array.Copy(webSocketBuffer, webSocketReceivedBytes, webSocketReceivedBytes.Length);
await webSocketCompressionProvider.DecompressBinaryMessageAsync(webSocket, webSocketReceiveResult, receivePayloadBuffer);
}
else
{
IEnumerable<byte> webSocketReceivedBytesEnumerable = Enumerable.Empty<byte>();
webSocketReceivedBytesEnumerable = webSocketReceivedBytesEnumerable.Concat(webSocketBuffer);

while (!webSocketReceiveResult.EndOfMessage)
{
webSocketReceiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(webSocketBuffer), CancellationToken.None);
webSocketReceivedBytesEnumerable = webSocketReceivedBytesEnumerable.Concat(webSocketBuffer.Take(webSocketReceiveResult.Count));
}
string webSocketMessage = await webSocketCompressionProvider.DecompressTextMessageAsync(webSocket, webSocketReceiveResult, receivePayloadBuffer);
webSocketConnection.OnReceive(webSocketMessage);
}

webSocketConnection.OnReceive(webSocketReceivedBytes);

webSocketReceiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(webSocketBuffer), CancellationToken.None);
webSocketReceiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(receivePayloadBuffer), CancellationToken.None);
}

return webSocketReceiveResult;
Expand Down
Expand Up @@ -10,5 +10,14 @@ public class WebSocketConnectionsOptions
public IList<ITextWebSocketSubprotocol> SupportedSubProtocols { get; set; }

public ITextWebSocketSubprotocol DefaultSubProtocol { get; set; }

public int? SendSegmentSize { get; set; }

public int ReceivePayloadBufferSize { get; set; }

public WebSocketConnectionsOptions()
{
ReceivePayloadBufferSize = 4 * 1024;
}
}
}
6 changes: 4 additions & 2 deletions Demo.AspNetCore.WebSockets/Startup.cs
Expand Up @@ -14,6 +14,7 @@ public class Startup
{
public void ConfigureServices(IServiceCollection services)
{
services.AddWebSocketCompression();
services.AddWebSocketConnections();
}

Expand All @@ -37,12 +38,13 @@ public void Configure(IApplicationBuilder app, IHostingEnvironment env, IService
new JsonWebSocketSubprotocol(),
textWebSocketSubprotocol
},
DefaultSubProtocol = textWebSocketSubprotocol
DefaultSubProtocol = textWebSocketSubprotocol,
SendSegmentSize = 4 * 1024
};

app.UseDefaultFiles(defaultFilesOptions)
.UseStaticFiles()
.UseWebSockets()
.UseWebSocketsCompression()
.MapWebSocketConnections("/socket", webSocketConnectionsOptions)
.Run(async (context) =>
{
Expand Down
1 change: 1 addition & 0 deletions README.md
Expand Up @@ -5,3 +5,4 @@ Sample project for demonstrating low-level WebSocket protocol aspects in ASP.NET
- Basic connections management
- [Subprotocol negotiation](https://www.tpeczek.com/2017/06/websocket-subprotocol-negotiation-in.html)
- [Cross-Site WebSocket Hijacking protection](https://www.tpeczek.com/2017/07/preventing-cross-site-websocket.html)
- [Per-message compression](https://www.tpeczek.com/2017/07/websocket-per-message-compression-in.html)

0 comments on commit 74004bc

Please sign in to comment.