diff --git a/.vscode/snippets.code-snippets b/.vscode/snippets.code-snippets
new file mode 100644
index 0000000..e324631
--- /dev/null
+++ b/.vscode/snippets.code-snippets
@@ -0,0 +1,49 @@
+ // Place your snippets for csharp here. Each snippet is defined under a snippet name and has a prefix, body and
+ // description. The prefix is what is used to trigger the snippet and the body will be expanded and inserted. Possible variables are:
+ // $1, $2 for tab stops, $0 for the final cursor position, and ${1:label}, ${2:another} for placeholders. Placeholders with the
+ // same ids are connected.
+ // Example:
+ // "Print to console": {
+ // "prefix": "log",
+ // "body": [
+ // "console.log('$1');",
+ // "$2"
+ // ],
+ // "description": "Log output to console"
+ // }
+ {
+ "fact": {
+ "scope": "csharp",
+ "prefix": "fact",
+ "body": [
+ "[Fact]",
+ "public void $1()",
+ "{",
+ " // ARRANGE",
+ " $2",
+ " // ACT",
+ " ",
+ " // ASSERT",
+ " ",
+ "}",
+ ],
+ "description": "XUNIT fact"
+ },
+ "afact": {
+ "scope": "csharp",
+ "prefix": "afact",
+ "body": [
+ "[Fact]",
+ "public async Task $1()",
+ "{",
+ " // ARRANGE",
+ " $2",
+ " // ACT",
+ " ",
+ " // ASSERT",
+ " ",
+ "}",
+ ],
+ "description": "XUNIT fact"
+ }
+ }
\ No newline at end of file
diff --git a/src/HassClient/Client/HassClient.cs b/src/HassClient/Client/HassClient.cs
index 391d7eb..48010fc 100644
--- a/src/HassClient/Client/HassClient.cs
+++ b/src/HassClient/Client/HassClient.cs
@@ -151,11 +151,6 @@ public class HassClient : IHassClient
///
private const int DefaultChannelSize = 200;
- ///
- /// Default read buffer size for websockets
- ///
- private const int DefaultReceiveBufferSize = 1024 * 4;
-
///
/// The default timeout for websockets
///
@@ -209,13 +204,9 @@ public class HassClient : IHassClient
///
private readonly string _messageLogLevel;
- ///
- /// Channel used as a async thread safe way to wite messages to the websocket
- ///
- private readonly Channel _writeChannel =
- Channel.CreateBounded(DefaultChannelSize);
-
private readonly IClientWebSocketFactory _wsFactory;
+ private readonly ITransportPipelineFactory? _pipelineFactory;
+ private readonly ILoggerFactory _loggerFactory;
///
/// Base url to the API (non socket)
@@ -232,6 +223,7 @@ public class HassClient : IHassClient
/// Avoids recursive states
///
private bool _isClosing;
+ private ITransportPipeline? _messagePipeline;
///
/// Channel used as a async thread safe way to read resultmessages from the websocket
@@ -249,21 +241,17 @@ public class HassClient : IHassClient
///
private Task? _readMessagePumpTask;
- ///
- /// Async task to write messages
- ///
- private Task? _writeMessagePumpTask;
-
///
/// The underlying currently connected socket or null if not connected
///
- private IClientWebSocket? _ws;
+ private IClientWebSocket? _ws = null;
+
///
/// Instance a new HassClient
///
/// The LogFactory to use for logging, null uses default values from config.
public HassClient(ILoggerFactory? logFactory = null) :
- this(logFactory, new ClientWebSocketFactory(), null)
+ this(logFactory, new WebSocketMessagePipelineFactory(), new ClientWebSocketFactory(), null)
{ }
///
@@ -272,7 +260,11 @@ public HassClient(ILoggerFactory? logFactory = null) :
/// The LogFactory to use for logging, null uses default values from config.
/// The factory to use for websockets, mainly for testing purposes
/// httpMessage handler (used for mocking)
- internal HassClient(ILoggerFactory? logFactory, IClientWebSocketFactory? wsFactory, HttpMessageHandler? httpMessageHandler)
+ internal HassClient(
+ ILoggerFactory? logFactory,
+ ITransportPipelineFactory? pipelineFactory,
+ IClientWebSocketFactory? wsFactory,
+ HttpMessageHandler? httpMessageHandler)
{
logFactory ??= _getDefaultLoggerFactory;
wsFactory ??= new ClientWebSocketFactory();
@@ -280,7 +272,11 @@ internal HassClient(ILoggerFactory? logFactory, IClientWebSocketFactory? wsFacto
new HttpClient(httpMessageHandler) : new HttpClient();
_wsFactory = wsFactory;
+ _pipelineFactory = pipelineFactory;
+
+ _loggerFactory = logFactory;
_logger = logFactory.CreateLogger();
+
_messageLogLevel = Environment.GetEnvironmentVariable("HASSCLIENT_MSGLOGLEVEL") ?? "Default";
}
@@ -332,7 +328,7 @@ public async Task CallService(string domain, string service, object servic
Domain = domain,
Service = service,
ServiceData = serviceData
- }, waitForResponse);
+ }, waitForResponse).ConfigureAwait(false);
return result.Success ?? false;
}
catch (OperationCanceledException)
@@ -378,7 +374,7 @@ public async Task CloseAsync()
)
{
// after this, the socket state which change to CloseSent
- await _ws.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, "Closing", timeout.Token);
+ await _ws.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, "Closing", timeout.Token).ConfigureAwait(false);
// now we wait for the server response, which will close the socket
while (_ws.State != WebSocketState.Closed && !timeout.Token.IsCancellationRequested)
await Task.Delay(100).ConfigureAwait(false);
@@ -393,10 +389,13 @@ public async Task CloseAsync()
// Cancel all async stuff
CancelSource.Cancel();
+ if (_messagePipeline is object)
+ await _messagePipeline.CloseAsync().ConfigureAwait(false);
+
// Wait for read and write tasks to complete max 5 seconds
- if (_readMessagePumpTask != null && _writeMessagePumpTask != null)
+ if (_readMessagePumpTask is object)
{
- await Task.WhenAll(_readMessagePumpTask, _writeMessagePumpTask);
+ await _readMessagePumpTask.ConfigureAwait(false);
}
}
catch
@@ -410,6 +409,11 @@ public async Task CloseAsync()
_ws.Dispose();
_ws = null;
+ if (_messagePipeline is object)
+ await _messagePipeline.DisposeAsync().ConfigureAwait(false);
+
+ _messagePipeline = null;
+
if (CancelSource != null)
CancelSource.Dispose();
@@ -483,7 +487,7 @@ public async Task ConnectAsync(Uri url, string token,
using var connectTokenSource = CancellationTokenSource.CreateLinkedTokenSource(
timerTokenSource.Token, CancelSource.Token);
- await ws.ConnectAsync(url, connectTokenSource.Token);
+ await ws.ConnectAsync(url, connectTokenSource.Token).ConfigureAwait(false);
if (ws.State == WebSocketState.Open)
{
@@ -491,14 +495,14 @@ public async Task ConnectAsync(Uri url, string token,
InitStatesOnConnect(ws);
// Do the authenticate and get the auhtorization response
- HassMessage result = await HandleConnectAndAuthenticate(token, connectTokenSource);
+ HassMessage result = await HandleConnectAndAuthenticate(token, connectTokenSource).ConfigureAwait(false);
switch (result.Type)
{
case "auth_ok":
if (getStatesOnConnect)
{
- await GetStates(connectTokenSource);
+ await GetStates(connectTokenSource).ConfigureAwait(false);
}
_logger.LogTrace($"Connected to websocket ({url}) on host {url.Host} and the api ({_apiUrl})");
@@ -533,7 +537,7 @@ public async Task ConnectAsync(Uri url, string token,
///
public async Task GetConfig()
{
- HassMessage hassResult = await SendCommandAndWaitForResponse(new GetConfigCommand());
+ HassMessage hassResult = await SendCommandAndWaitForResponse(new GetConfigCommand()).ConfigureAwait(false);
object resultMessage =
hassResult.Result ?? throw new ApplicationException("Unexpected response from command");
@@ -562,13 +566,11 @@ public async Task PingAsync(int timeout)
try
{
- if (SendMessage(new HassPingCommand()))
+ await SendMessage(new HassPingCommand()).ConfigureAwait(false);
+ HassMessage result = await _messageChannel.Reader.ReadAsync(pingTokenSource.Token).ConfigureAwait(false);
+ if (result.Type == "pong")
{
- HassMessage result = await _messageChannel.Reader.ReadAsync(pingTokenSource.Token);
- if (result.Type == "pong")
- {
- return true;
- }
+ return true;
}
}
catch (OperationCanceledException) { } // Do nothing
@@ -582,7 +584,7 @@ public async Task PingAsync(int timeout)
/// Set subscribeEvents=true on ConnectAsync to use.
/// OperationCanceledException if the operation is canceled.
/// Returns next event
- public async Task ReadEventAsync() => await _eventChannel.Reader.ReadAsync(CancelSource.Token);
+ public async Task ReadEventAsync() => await _eventChannel.Reader.ReadAsync(CancelSource.Token).ConfigureAwait(false);
///
/// Returns next incoming event and completes async operation
@@ -593,7 +595,7 @@ public async Task PingAsync(int timeout)
public async Task ReadEventAsync(CancellationToken token)
{
using var cancelSource = CancellationTokenSource.CreateLinkedTokenSource(CancelSource.Token, token);
- return await _eventChannel.Reader.ReadAsync(cancelSource.Token);
+ return await _eventChannel.Reader.ReadAsync(cancelSource.Token).ConfigureAwait(false);
}
public async Task SendEvent(string eventId, object? data = null)
@@ -639,11 +641,11 @@ public async Task SendEvent(string eventId, object? data = null)
{
var result = await _httpClient.PostAsync(new Uri(apiUrl),
new StringContent(content, Encoding.UTF8),
- CancelSource.Token);
+ CancelSource.Token).ConfigureAwait(false);
if (result.IsSuccessStatusCode)
{
- var hassState = await JsonSerializer.DeserializeAsync(await result.Content.ReadAsStreamAsync(),
+ var hassState = await JsonSerializer.DeserializeAsync(await result.Content.ReadAsStreamAsync().ConfigureAwait(false),
_defaultSerializerOptions);
return hassState;
@@ -691,14 +693,14 @@ public async Task SubscribeToEvents(EventType eventType = EventType.All)
}
}
- var result = await SendCommandAndWaitForResponse(command);
+ var result = await SendCommandAndWaitForResponse(command).ConfigureAwait(false);
return result.Success ?? false;
}
public async Task> GetServices()
{
- HassMessage servicesResult = await SendCommandAndWaitForResponse(new GetServicesCommand());
+ HassMessage servicesResult = await SendCommandAndWaitForResponse(new GetServicesCommand()).ConfigureAwait(false);
var resultMessage =
servicesResult.Result as IEnumerable
@@ -717,140 +719,54 @@ servicesResult.Result as IEnumerable
///
internal virtual async Task ProcessNextMessage()
{
- var pipe = new Pipe();
-
- using var cancelProcessNextMessage = new CancellationTokenSource();
-
- using var cancelTokenSource = CancellationTokenSource.CreateLinkedTokenSource(
- cancelProcessNextMessage.Token, CancelSource.Token);
+ if (_messagePipeline is null)
+ {
+ _logger.LogWarning("Processing message with no {pipeline} set! returning.", nameof(_messagePipeline));
+ return;
+ }
- await Task.WhenAll(
- Task.Run(ReadFromClientSocket, cancelTokenSource.Token),
- Task.Run(WriteMessagesToChannel, cancelTokenSource.Token)
- );
- // Task that reads the next message from websocket
- async Task ReadFromClientSocket()
+ try
{
- try
- {
- while (_ws != null && (!CancelSource.Token.IsCancellationRequested && !_ws.CloseStatus.HasValue))
- {
- Memory memory = pipe.Writer.GetMemory(DefaultReceiveBufferSize);
+ // ReSharper disable once AccessToDisposedClosure
+ var m = await _messagePipeline.GetNextMessageAsync(CancelSource.Token).ConfigureAwait(false);
- // ReSharper disable once AccessToDisposedClosure
- ValueWebSocketReceiveResult result = await _ws.ReceiveAsync(memory, cancelTokenSource.Token);
- if (!CancelSource.Token.IsCancellationRequested)
+ switch (m?.Type)
+ {
+ case "event":
+ if (m.Event != null)
{
- if (_ws.State == WebSocketState.CloseReceived && result.MessageType == WebSocketMessageType.Close)
- {
- await _ws.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, "Acknowledge Close frame", CancellationToken.None);
- CancelSource.Cancel();
- }
- else
- {
- if (_ws.State == WebSocketState.Open && result.MessageType != WebSocketMessageType.Close)
- {
- // Log incoming messages for correct loglevel and tracing is enabled
- if (_messageLogLevel != "None" && _logger.IsEnabled(LogLevel.Trace) && result.Count > 0)
- {
- var strMessageReceived = UTF8Encoding.UTF8.GetString(memory.Slice(0, result.Count).ToArray());
- if (_messageLogLevel == "All")
- _logger.LogTrace("ReadClientSocket, message: {strMessageReceived}", strMessageReceived);
- else if (_messageLogLevel == "Default")
- {
- // Log all but events
- if (strMessageReceived.Contains("\"type\": \"event\"") == false)
- _logger.LogTrace("ReadClientSocket, message: {strMessageReceived}", strMessageReceived);
- }
- }
- // Advance writer to the read ne of bytes
- pipe.Writer.Advance(result.Count);
-
- await pipe.Writer.FlushAsync();
-
- if (result.EndOfMessage)
- {
- // We have successfully read the whole message, make available to reader
- await pipe.Writer.CompleteAsync();
- break;
- }
- }
- }
+ _eventChannel.Writer.TryWrite(m.Event);
}
- }
- }
- catch (OperationCanceledException)
- {
- // Canceled the thread just leave
- throw;
- }
- catch (Exception e)
- {
- _logger.LogError(e, "Major failure in ReadFromClientSocket, exit...");
- // Make sure we always cancel the other task of any reason
- // ReSharper disable once AccessToDisposedClosure
- cancelProcessNextMessage.Cancel(true);
- throw;
+
+ break;
+ case "auth_required":
+ case "auth_ok":
+ case "auth_invalid":
+ case "call_service":
+ case "get_config":
+ case "pong":
+ _messageChannel.Writer.TryWrite(m);
+ break;
+ case "result":
+ var resultMessage = GetResultMessage(m);
+ if (resultMessage != null)
+ _messageChannel.Writer.TryWrite(resultMessage);
+ break;
+ default:
+ _logger.LogDebug($"Unexpected eventtype {m?.Type}, discarding message!");
+ break;
}
}
-
- // Task that deserializes the message and write the finished message to a channel
- async Task WriteMessagesToChannel()
+ catch (OperationCanceledException)
{
- try
- {
- // ReSharper disable once AccessToDisposedClosure
- HassMessage m = await JsonSerializer.DeserializeAsync(pipe.Reader.AsStream(),
- cancellationToken: cancelTokenSource.Token);
-
- await pipe.Reader.CompleteAsync();
- switch (m.Type)
- {
- case "event":
- if (m.Event != null)
- {
- _eventChannel.Writer.TryWrite(m.Event);
- }
-
- break;
- case "auth_required":
- case "auth_ok":
- case "auth_invalid":
- case "call_service":
- case "get_config":
- case "pong":
- _messageChannel.Writer.TryWrite(m);
- break;
- case "result":
- var resultMessage = GetResultMessage(m);
- if (resultMessage != null)
- _messageChannel.Writer.TryWrite(resultMessage);
- break;
- default:
- _logger.LogDebug($"Unexpected eventtype {m.Type}, discarding message!");
- break;
- }
- }
- catch (OperationCanceledException)
- {
- // Canceled the thread just leave
- }
- catch (Exception e)
- {
- // Todo: Log the seralizer error here later but continue receive
- // messages from the server. Then we can survive the server
- // Sending bad json messages
- _logger.LogDebug(e, "Error deserialize json response");
- // Make sure we put a small delay incase we have severe error so the loop
- // doesn't kill the server
-
- // ReSharper disable once AccessToDisposedClosure
- await Task.Delay(20, cancelTokenSource.Token);
- // ReSharper disable once AccessToDisposedClosure
- cancelProcessNextMessage.Cancel();
- }
+ // Canceled the thread just leave
+ }
+ catch (Exception e)
+ {
+ // Sending bad json messages
+ _logger.LogDebug(e, "Error deserialize json response");
}
}
@@ -864,15 +780,14 @@ internal virtual async ValueTask SendCommandAndWaitForResponse(Comm
try
{
- if (!SendMessage(message, waitForResponse))
- throw new ApplicationException($"Send message {message.Type} failed!");
+ await SendMessage(message, waitForResponse).ConfigureAwait(false);
if (!waitForResponse)
return new HassMessage { Success = true };
while (true)
{
- HassMessage result = await _messageChannel.Reader.ReadAsync(sendCommandTokenSource.Token);
+ HassMessage result = await _messageChannel.Reader.ReadAsync(sendCommandTokenSource.Token).ConfigureAwait(false);
if (result.Id == message.Id)
{
return result;
@@ -886,9 +801,8 @@ internal virtual async ValueTask SendCommandAndWaitForResponse(Comm
throw new ApplicationException("Failed to write to message channel!");
}
-
// Delay for a short period to let the message arrive we are searching for
- await Task.Delay(10);
+ await Task.Delay(10).ConfigureAwait(false);
}
}
catch (OperationCanceledException)
@@ -910,8 +824,14 @@ internal virtual async ValueTask SendCommandAndWaitForResponse(Comm
/// The message to send
/// True if sender expects response
/// True if successful
- internal virtual bool SendMessage(HassMessageBase message, bool waitForResponse = true)
+ internal virtual Task SendMessage(HassMessageBase message, bool waitForResponse = true)
{
+ if (_messagePipeline is null)
+ {
+ _logger.LogWarning("SendMessage called with no {pipeline} set!", nameof(_messagePipeline));
+ throw new ApplicationException($"SendMessage called with no {nameof(_messagePipeline)} set!");
+ }
+
_logger.LogTrace($"Sends message {message.Type}");
if (message is CommandMessage commandMessage)
{
@@ -927,7 +847,7 @@ internal virtual bool SendMessage(HassMessageBase message, bool waitForResponse
}
}
- return _writeChannel.Writer.TryWrite(message);
+ return _messagePipeline.SendMessageAsync(message, CancelSource.Token);
}
///
@@ -985,8 +905,8 @@ internal virtual bool SendMessage(HassMessageBase message, bool waitForResponse
private async Task GetStates(CancellationTokenSource connectTokenSource)
{
- SendMessage(new GetStatesCommand());
- HassMessage result = await _messageChannel.Reader.ReadAsync(connectTokenSource.Token);
+ await SendMessage(new GetStatesCommand()).ConfigureAwait(false);
+ HassMessage result = await _messageChannel.Reader.ReadAsync(connectTokenSource.Token).ConfigureAwait(false);
if (result?.Result is List wsResult)
{
foreach (HassState state in wsResult)
@@ -999,11 +919,11 @@ private async Task GetStates(CancellationTokenSource connectTokenSource)
private async Task HandleConnectAndAuthenticate(string token,
CancellationTokenSource connectTokenSource)
{
- HassMessage result = await _messageChannel.Reader.ReadAsync(connectTokenSource.Token);
+ HassMessage result = await _messageChannel.Reader.ReadAsync(connectTokenSource.Token).ConfigureAwait(false);
if (result.Type == "auth_required")
{
- SendMessage(new HassAuthMessage { AccessToken = token });
- result = await _messageChannel.Reader.ReadAsync(connectTokenSource.Token);
+ await SendMessage(new HassAuthMessage { AccessToken = token }).ConfigureAwait(false);
+ result = await _messageChannel.Reader.ReadAsync(connectTokenSource.Token).ConfigureAwait(false);
}
return result;
@@ -1012,18 +932,21 @@ private async Task HandleConnectAndAuthenticate(string token,
private void InitStatesOnConnect(IClientWebSocket ws)
{
_ws = ws;
+
_messageId = 1;
_isClosing = false;
+ _messagePipeline = _pipelineFactory?.CreateWebSocketMessagePipeline(_ws, _loggerFactory);
// Make sure we have new channels so we are not have old messages
_messageChannel = Channel.CreateBounded(DefaultChannelSize);
_eventChannel = Channel.CreateBounded(DefaultChannelSize);
CancelSource = new CancellationTokenSource();
_readMessagePumpTask = Task.Run(ReadMessagePump);
- _writeMessagePumpTask = Task.Run(WriteMessagePump);
+ // _writeMessagePumpTask = Task.Run(WriteMessagePump);
}
+
///
/// A pump that reads incoming messages and put them on the read channel.
///
@@ -1036,7 +959,7 @@ private async Task ReadMessagePump()
{
try
{
- await ProcessNextMessage();
+ await ProcessNextMessage().ConfigureAwait(false);
}
catch (OperationCanceledException)
{
@@ -1046,64 +969,22 @@ private async Task ReadMessagePump()
// Should never cast any other exception, if so it just not handle them here
}
+ // Cancel rest of operation
+ CancelSource.Cancel();
_logger.LogTrace("Exit ReadMessagePump");
}
- private async Task WriteMessagePump()
- {
- _logger.LogTrace("Start WriteMessagePump");
-
- while (_ws != null && (!CancelSource.IsCancellationRequested && !_ws.CloseStatus.HasValue))
- {
- try
- {
- HassMessageBase nextMessage = await _writeChannel.Reader.ReadAsync(CancelSource.Token);
-
- if (_ws.State != WebSocketState.Open && _ws.State != WebSocketState.CloseReceived)
- {
- _logger.LogTrace("WriteMessagePump, state not Open or CloseReceived, exiting WriteMessagePump: {socketState}", _ws.State.ToString());
- return;
- }
-
- byte[] result = JsonSerializer.SerializeToUtf8Bytes(nextMessage, nextMessage.GetType(),
- _defaultSerializerOptions);
-
- await _ws.SendAsync(result, WebSocketMessageType.Text, true, CancelSource.Token);
-
- if (_logger.IsEnabled(LogLevel.Trace))
- {
- if (nextMessage is HassAuthMessage == false)
- {
- // We log everything but AuthMessage due to security reasons
- _logger.LogTrace("SendAsync, message: {result}", UTF8Encoding.UTF8.GetString(result));
- }
- {
- _logger.LogTrace("Sending auth message, not shown for security reasons");
- }
- }
- }
- catch (OperationCanceledException)
- {
- // Canceled the thread
- break;
- }
- }
-
- _logger.LogTrace("Exit WriteMessagePump");
- }
public async ValueTask DisposeAsync()
{
try
{
- await CloseAsync();
+ await CloseAsync().ConfigureAwait(false);
}
catch
{
// Ignore errors
}
-
-
}
}
}
\ No newline at end of file
diff --git a/src/HassClient/Client/HassWebSocket.cs b/src/HassClient/Client/HassWebSocket.cs
index fca6561..b7e6e32 100644
--- a/src/HassClient/Client/HassWebSocket.cs
+++ b/src/HassClient/Client/HassWebSocket.cs
@@ -54,30 +54,30 @@ internal class HassWebSocket : IClientWebSocket
public WebSocketCloseStatus? CloseStatus => _ws.CloseStatus;
- public async Task ConnectAsync(Uri uri, CancellationToken cancel) => await _ws.ConnectAsync(uri, cancel);
+ public Task ConnectAsync(Uri uri, CancellationToken cancel) => _ws.ConnectAsync(uri, cancel);
- public async Task CloseAsync(WebSocketCloseStatus closeStatus, string statusDescription,
+ public Task CloseAsync(WebSocketCloseStatus closeStatus, string statusDescription,
CancellationToken cancellationToken) =>
- await _ws.CloseAsync(closeStatus, statusDescription, cancellationToken);
+ _ws.CloseAsync(closeStatus, statusDescription, cancellationToken);
- public async Task CloseOutputAsync(WebSocketCloseStatus closeStatus, string statusDescription,
+ public Task CloseOutputAsync(WebSocketCloseStatus closeStatus, string statusDescription,
CancellationToken cancellationToken) =>
- await _ws.CloseAsync(closeStatus, statusDescription, cancellationToken);
+ _ws.CloseAsync(closeStatus, statusDescription, cancellationToken);
- public async Task SendAsync(ArraySegment buffer, WebSocketMessageType messageType, bool endOfMessage,
+ public Task SendAsync(ArraySegment buffer, WebSocketMessageType messageType, bool endOfMessage,
CancellationToken cancellationToken) =>
- await _ws.SendAsync(buffer, messageType, endOfMessage, cancellationToken);
+ _ws.SendAsync(buffer, messageType, endOfMessage, cancellationToken);
public async ValueTask SendAsync(ReadOnlyMemory buffer, WebSocketMessageType messageType,
bool endOfMessage, CancellationToken cancellationToken) =>
await Task.FromException(new NotImplementedException());
- public async Task ReceiveAsync(ArraySegment buffer,
+ public Task ReceiveAsync(ArraySegment buffer,
CancellationToken cancellationToken) =>
- await Task.FromException(new NotImplementedException());
+ Task.FromException(new NotImplementedException());
- public async ValueTask ReceiveAsync(Memory buffer,
- CancellationToken cancellationToken) => await _ws.ReceiveAsync(buffer, cancellationToken);
+ public ValueTask ReceiveAsync(Memory buffer,
+ CancellationToken cancellationToken) => _ws.ReceiveAsync(buffer, cancellationToken);
#region IDisposable Support
diff --git a/src/HassClient/Client/internal/WebSocketMessagePipeline.cs b/src/HassClient/Client/internal/WebSocketMessagePipeline.cs
new file mode 100644
index 0000000..5c2fcbf
--- /dev/null
+++ b/src/HassClient/Client/internal/WebSocketMessagePipeline.cs
@@ -0,0 +1,372 @@
+using System;
+using System.IO.Pipelines;
+using System.Net.WebSockets;
+using System.Text.Json;
+using System.Threading;
+using System.Threading.Channels;
+using System.Threading.Tasks;
+using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Logging.Abstractions;
+
+namespace JoySoftware.HomeAssistant.Client
+{
+ public interface ITransportPipeline : IAsyncDisposable where T : class
+ {
+ ///
+ /// Gets next message from pipeline
+ ///
+ ValueTask GetNextMessageAsync(CancellationToken cancellationToken);
+
+ ///
+ /// Sends a message to the pipline
+ ///
+ ///
+ Task SendMessageAsync(T1 message, CancellationToken cancellationToken) where T1 : class;
+
+ ///
+ /// Close the pipeline, it will also close the underlying websocket
+ ///
+ Task CloseAsync();
+
+ ///
+ /// Returns true if the pipeline is accepting and receiving messages
+ ///
+ bool IsValid { get; }
+ }
+
+ public interface ITransportPipelineFactory where T : class
+ {
+ ITransportPipeline CreateWebSocketMessagePipeline(
+ IClientWebSocket webSocketClient,
+ ILoggerFactory? loggerFactory = null);
+
+
+ }
+
+ internal class WebSocketMessagePipelineFactory : ITransportPipelineFactory where T : class
+ {
+ public ITransportPipeline CreateWebSocketMessagePipeline(IClientWebSocket webSocketClient, ILoggerFactory? loggerFactory = null)
+ {
+ return WebSocketMessagePipeline.CreateWebSocketMessagePipeline(webSocketClient, loggerFactory);
+ }
+ }
+
+ internal class WebSocketMessagePipeline : ITransportPipeline where T : class
+ {
+ private readonly ILogger> _logger;
+
+ private readonly IClientWebSocket _ws;
+ private readonly Task _readMessagePumpTask;
+ private readonly Task _writeMessagePumpTask;
+
+ // Used on DisposeAsync to make sure the tasks are ended
+ private readonly CancellationTokenSource _internalCancellationSource = new CancellationTokenSource();
+ private readonly CancellationToken _internalCancelToken;
+
+ private readonly Pipe _pipe = new Pipe();
+
+ ///
+ /// The max time we will wait for the socket to gracefully close
+ ///
+ private const int MaxWaitTimeSocketClose = 5000; // 5 seconds
+
+ ///
+ /// Default size for channel
+ ///
+ private const int DefaultChannelSize = 200;
+
+ ///
+ /// Channel of the messages read
+ ///
+ private readonly Channel _inChannel =
+ Channel.CreateBounded(DefaultChannelSize);
+
+ ///
+ /// Channel of the messages sent
+ ///
+ private readonly Channel
- public class HomeAssistantMock : IDisposable
+ public class HomeAssistantMock : IAsyncDisposable
{
public static readonly int RecieiveBufferSize = 1024 * 4;
private readonly IHost _host;
@@ -30,8 +30,6 @@ public HomeAssistantMock()
_host.Start();
}
- public void Dispose() => _host?.Dispose();
-
///
/// Starts a websocket server in a generic host
///
@@ -47,11 +45,16 @@ public static IHostBuilder CreateHostBuilder() =>
///
/// Stops the fake Home Assistant server
///
- public void Stop()
+ public async Task Stop()
{
- _host.StopAsync();
+ await _host.StopAsync().ConfigureAwait(false);
_host.WaitForShutdown();
}
+
+ public async ValueTask DisposeAsync()
+ {
+ await Stop().ConfigureAwait(false);
+ }
}
///
@@ -119,11 +122,13 @@ private async Task ProcessWS(WebSocket webSocket)
// First send auth required to the client
byte[] authRequiredMessage = File.ReadAllBytes(Path.Combine(_mockTestdataPath, "auth_required.json"));
await webSocket.SendAsync(new ArraySegment(authRequiredMessage, 0, authRequiredMessage.Length),
- WebSocketMessageType.Text, true, cancelSource.Token);
+ WebSocketMessageType.Text, true, cancelSource.Token).ConfigureAwait(false);
// Wait for incoming messages
WebSocketReceiveResult result =
- await webSocket.ReceiveAsync(new ArraySegment(buffer), cancelSource.Token);
+ await webSocket.ReceiveAsync(new ArraySegment(buffer), cancelSource.Token).ConfigureAwait(false);
+
+ // Console.WriteLine($"SERVER: WebSocketState = {webSocket.State}, MessageType = {result.MessageType}");
while (!result.CloseStatus.HasValue)
{
HassMessage hassMessage =
@@ -141,7 +146,7 @@ await webSocket.SendAsync(new ArraySegment(authRequiredMessage, 0, authReq
// byte[] authOkMessage = File.ReadAllBytes (Path.Combine (this.mockTestdataPath, "auth_ok.json"));
await webSocket.SendAsync(
new ArraySegment(_authOkMessage, 0, _authOkMessage.Length),
- WebSocketMessageType.Text, true, CancellationToken.None);
+ WebSocketMessageType.Text, true, CancellationToken.None).ConfigureAwait(false);
}
else
{
@@ -150,7 +155,7 @@ await webSocket.SendAsync(
File.ReadAllBytes(Path.Combine(_mockTestdataPath, "auth_notok.json"));
await webSocket.SendAsync(
new ArraySegment(authNotOkMessage, 0, authNotOkMessage.Length),
- WebSocketMessageType.Text, true, CancellationToken.None);
+ WebSocketMessageType.Text, true, CancellationToken.None).ConfigureAwait(false);
// Hass will normally close session here but for the sake of testing it wont
}
@@ -158,7 +163,7 @@ await webSocket.SendAsync(
case "ping":
// Hardcoded to be correct for performance tests
await webSocket.SendAsync(new ArraySegment(_pongMessage, 0, _pongMessage.Length),
- WebSocketMessageType.Text, true, CancellationToken.None);
+ WebSocketMessageType.Text, true, CancellationToken.None).ConfigureAwait(false);
break;
case "subscribe_events":
SendCommandMessage subscribeEventMessage =
@@ -169,12 +174,12 @@ await webSocket.SendAsync(new ArraySegment(_pongMessage, 0, _pongMessage.L
byte[] responseString =
JsonSerializer.SerializeToUtf8Bytes(response, typeof(ResultMessage), serializeOptions);
await webSocket.SendAsync(new ArraySegment(responseString, 0, responseString.Length),
- WebSocketMessageType.Text, true, CancellationToken.None);
+ WebSocketMessageType.Text, true, CancellationToken.None).ConfigureAwait(false);
// For tests send a event message
byte[] newEventMessage = File.ReadAllBytes(Path.Combine(_mockTestdataPath, "event.json"));
await webSocket.SendAsync(
new ArraySegment(newEventMessage, 0, newEventMessage.Length),
- WebSocketMessageType.Text, true, CancellationToken.None);
+ WebSocketMessageType.Text, true, CancellationToken.None).ConfigureAwait(false);
// Hass will normally close session here but for the sake of testing it wont
break;
@@ -186,7 +191,7 @@ await webSocket.SendAsync(
File.ReadAllBytes(Path.Combine(_mockTestdataPath, "result_states.json"));
await webSocket.SendAsync(
new ArraySegment(stateReusultMessage, 0, stateReusultMessage.Length),
- WebSocketMessageType.Text, true, CancellationToken.None);
+ WebSocketMessageType.Text, true, CancellationToken.None).ConfigureAwait(false);
break;
@@ -198,7 +203,7 @@ await webSocket.SendAsync(
{
// Send close message (some bug n CloseAsync makes we have to do it this way)
await webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Closing",
- timeout.Token);
+ timeout.Token).ConfigureAwait(false);
// Wait for close message
//await webSocket.ReceiveAsync(new ArraySegment(buffer), timeout.Token);
}
@@ -210,15 +215,15 @@ await webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Closing",
}
// Wait for incoming messages
- result = await webSocket.ReceiveAsync(new ArraySegment(buffer), CancellationToken.None);
+ result = await webSocket.ReceiveAsync(new ArraySegment(buffer), CancellationToken.None).ConfigureAwait(false);
}
await webSocket.CloseAsync(result.CloseStatus.Value, result.CloseStatusDescription,
- CancellationToken.None);
+ CancellationToken.None).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
- await webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Normal", CancellationToken.None);
+ await webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Normal", CancellationToken.None).ConfigureAwait(false);
}
catch (Exception e)
{
diff --git a/tests/HassClient.Unit.Tests/HassClient.Unit.Tests.csproj b/tests/HassClient.Unit.Tests/HassClient.Unit.Tests.csproj
index 320750b..79312ff 100644
--- a/tests/HassClient.Unit.Tests/HassClient.Unit.Tests.csproj
+++ b/tests/HassClient.Unit.Tests/HassClient.Unit.Tests.csproj
@@ -16,8 +16,8 @@
all
-
-
+
+
all
diff --git a/tests/HassClient.Unit.Tests/HassClientTests.cs b/tests/HassClient.Unit.Tests/HassClientTests.cs
index 63e0e64..a0d002a 100644
--- a/tests/HassClient.Unit.Tests/HassClientTests.cs
+++ b/tests/HassClient.Unit.Tests/HassClientTests.cs
@@ -35,14 +35,14 @@ public async Task SubscriptToEventTypeShouldReturnEvent(EventType eventType)
// ARRANGE
var mock = new HassWebSocketMock();
// Get the connected hass client
- var hassClient = await mock.GetHassConnectedClient();
+ await using var hassClient = await mock.GetHassConnectedClient().ConfigureAwait(false);
// ACT AND ASSERT
var subscribeTask = hassClient.SubscribeToEvents(eventType);
mock.AddResponse(@"{""id"": 2, ""type"": ""result"", ""success"": true, ""result"": null}");
- Assert.True(await subscribeTask);
+ Assert.True(await subscribeTask.ConfigureAwait(false));
mock.AddResponse(HassWebSocketMock.EventMessage);
- HassEvent eventMsg = await hassClient.ReadEventAsync();
+ HassEvent eventMsg = await hassClient.ReadEventAsync().ConfigureAwait(false);
Assert.NotNull(eventMsg);
}
@@ -58,12 +58,12 @@ public async Task ConnectWithStateOtherThanOpenShouldReturnFalse(WebSocketState
// ARRANGE
var mock = new HassWebSocketMock();
// Get the default state hass client
- var hassClient = mock.GetHassClient();
+ await using var hassClient = mock.GetHassClient();
// Set Closed state to fake
mock.SetupGet(x => x.State).Returns(state);
// ACT and ASSERT
- Assert.False(await hassClient.ConnectAsync(new Uri("ws://anyurldoesntmatter.org"), "FAKETOKEN", false));
+ Assert.False(await hassClient.ConnectAsync(new Uri("ws://anyurldoesntmatter.org"), "FAKETOKEN", false).ConfigureAwait(false));
}
public class UnknownCommand : CommandMessage
@@ -77,7 +77,7 @@ public async Task CallServiceIfCanceledShouldThrowOperationCanceledException()
// ARRANGE
var mock = new HassWebSocketMock();
// Get the connected hass client
- var hassClient = await mock.GetHassConnectedClient();
+ await using var hassClient = await mock.GetHassConnectedClient().ConfigureAwait(false);
// Do not add a fake service call message result
@@ -86,7 +86,7 @@ public async Task CallServiceIfCanceledShouldThrowOperationCanceledException()
hassClient.CancelSource.Cancel();
// ASSERT
- await Assert.ThrowsAsync(async () => await callServiceTask);
+ await Assert.ThrowsAsync(async () => await callServiceTask.ConfigureAwait(false));
}
[Fact]
@@ -95,7 +95,7 @@ public async Task CallServiceSuccessfulReturnsTrue()
// ARRANGE
var mock = new HassWebSocketMock();
// Get the connected hass client
- var hassClient = await mock.GetHassConnectedClient();
+ await using var hassClient = await mock.GetHassConnectedClient().ConfigureAwait(false);
// Service call successful
mock.AddResponse(@"{
@@ -112,7 +112,7 @@ public async Task CallServiceSuccessfulReturnsTrue()
}");
// ACT
- var result = await hassClient.CallService("light", "turn_on", new { entity_id = "light.tomas_rum" });
+ var result = await hassClient.CallService("light", "turn_on", new { entity_id = "light.tomas_rum" }).ConfigureAwait(false);
// Assert
Assert.True(result);
@@ -124,7 +124,7 @@ public async Task CallServiceWithoutResponseShouldReturnSuccessWitoutReturnMessa
// ARRANGE
var mock = new HassWebSocketMock();
// Get the connected hass client
- var hassClient = await mock.GetHassConnectedClient();
+ await using var hassClient = await mock.GetHassConnectedClient().ConfigureAwait(false);
// Service call successful
mock.AddResponse(@"{
@@ -141,7 +141,7 @@ public async Task CallServiceWithoutResponseShouldReturnSuccessWitoutReturnMessa
}");
// ACT
- var result = await hassClient.CallService("light", "turn_on", new { entity_id = "light.tomas_rum" }, false);
+ var result = await hassClient.CallService("light", "turn_on", new { entity_id = "light.tomas_rum" }, false).ConfigureAwait(false);
// Assert
Assert.True(result);
@@ -172,13 +172,13 @@ public async Task CallServiceWithTimeoutShouldReturnFalse()
// ARRANGE
var mock = new HassWebSocketMock();
// Get the connected hass client
- var hassClient = await mock.GetHassConnectedClient();
+ await using var hassClient = await mock.GetHassConnectedClient().ConfigureAwait(false);
hassClient.SocketTimeout = 10;
// ACT AND ASSERT
// Do not add a message and force timeout
- Assert.False(await hassClient.CallService("light", "turn_on", new { entity_id = "light.tomas_rum" }));
+ Assert.False(await hassClient.CallService("light", "turn_on", new { entity_id = "light.tomas_rum" }).ConfigureAwait(false));
}
[Fact]
@@ -187,7 +187,7 @@ public async Task ClientGetUnexpectedMessageRecoversResultNotNull()
// ARRANGE
var mock = new HassWebSocketMock();
// Get the connected hass client
- var hassClient = await mock.GetHassConnectedClient();
+ await using var hassClient = await mock.GetHassConnectedClient().ConfigureAwait(false);
// ACT
var confTask = hassClient.GetConfig();
@@ -198,7 +198,7 @@ public async Task ClientGetUnexpectedMessageRecoversResultNotNull()
mock.AddResponse(HassWebSocketMock.ConfigMessage);
// ASSERT
- Assert.NotNull(await confTask);
+ Assert.NotNull(await confTask.ConfigureAwait(false));
}
[Fact]
@@ -207,9 +207,9 @@ public async Task CloseAsyncIsRanOnce()
// ARRANGE
var mock = new HassWebSocketMock();
// Get the connected hass client
- await using IHassClient hassClient = await mock.GetHassConnectedClient();
+ await using IHassClient hassClient = await mock.GetHassConnectedClient().ConfigureAwait(false);
- await hassClient.CloseAsync();
+ await hassClient.CloseAsync().ConfigureAwait(false);
// ASSERT
mock.Verify(
@@ -225,7 +225,7 @@ public async Task CloseAsyncWithTimeoutThrowsOperationCanceledExceotion()
// ARRANGE
var mock = new HassWebSocketMock();
// Get the connected hass client
- var hassClient = await mock.GetHassConnectedClient();
+ await using var hassClient = await mock.GetHassConnectedClient().ConfigureAwait(false);
mock.Setup(x =>
x.CloseAsync(It.IsAny(), It.IsAny(), It.IsAny()))
@@ -234,7 +234,7 @@ public async Task CloseAsyncWithTimeoutThrowsOperationCanceledExceotion()
hassClient.SocketTimeout = 20;
// ACT
- await hassClient.CloseAsync();
+ await hassClient.CloseAsync().ConfigureAwait(false);
// ASSERT
mock.Logger.AssertLogged(LogLevel.Trace, Times.AtLeastOnce());
@@ -246,7 +246,7 @@ public async Task CommandWithUnsuccessfulShouldThrowAggregateException()
// ARRANGE
var mock = new HassWebSocketMock();
// Get the connected hass client
- var hassClient = await mock.GetHassConnectedClient();
+ await using var hassClient = await mock.GetHassConnectedClient().ConfigureAwait(false);
// ACT
Task confTask = hassClient.GetConfig();
@@ -264,7 +264,7 @@ public async Task ConfigShouldBeCorrect()
// ARRANGE
var mock = new HassWebSocketMock();
// Get the connected hass client
- var hassClient = await mock.GetHassConnectedClient();
+ await using var hassClient = await mock.GetHassConnectedClient().ConfigureAwait(false);
// ACT
Task getConfigTask = hassClient.GetConfig();
@@ -300,13 +300,13 @@ public async Task ConnectAlreadyConnectedThrowsInvalidOperation()
// ARRANGE
var mock = new HassWebSocketMock();
// Get the connected hass client
- var hassClient = await mock.GetHassConnectedClient();
+ await using var hassClient = await mock.GetHassConnectedClient().ConfigureAwait(false);
// ACT AND ASSERT
// The hass client already connected and should assert error
await Assert.ThrowsAsync(async () =>
- await hassClient.ConnectAsync(new Uri("ws://localhost:8192/api/websocket"), "TOKEN", false));
+ await hassClient.ConnectAsync(new Uri("ws://localhost:8192/api/websocket"), "TOKEN", false).ConfigureAwait(false)).ConfigureAwait(false);
}
[Fact]
@@ -315,7 +315,7 @@ public async Task ConnectShouldReturnTrue()
// ARRANGE
var mock = new HassWebSocketMock();
// Get the default state hass client
- var hassClient = mock.GetHassClient();
+ await using var hassClient = mock.GetHassClient();
// First message from Home Assistant is auth required
mock.AddResponse(@"{""type"": ""auth_required""}");
@@ -324,7 +324,7 @@ public async Task ConnectShouldReturnTrue()
// ACT and ASSERT
// Calls connect without getting the states initially
- Assert.True(await hassClient.ConnectAsync(new Uri("ws://anyurldoesntmatter.org"), "FAKETOKEN", false));
+ Assert.True(await hassClient.ConnectAsync(new Uri("ws://anyurldoesntmatter.org"), "FAKETOKEN", false).ConfigureAwait(false));
}
[Fact]
@@ -333,13 +333,13 @@ public async Task ConnectTimeoutReturnsFalse()
// ARRANGE
var mock = new HassWebSocketMock();
// Get the default state hass client and we add no response messages
- var hassClient = mock.GetHassClient();
+ await using var hassClient = mock.GetHassClient();
// Set the timeout to a very low value for testing purposes
hassClient.SocketTimeout = 20;
// ACT AND ASSERT
- Assert.False(await hassClient.ConnectAsync(new Uri("ws://localhost:8192/api/websocket"), "TOKEN", false));
+ Assert.False(await hassClient.ConnectAsync(new Uri("ws://localhost:8192/api/websocket"), "TOKEN", false).ConfigureAwait(false));
}
[Fact]
@@ -348,7 +348,7 @@ public async Task ConnectWithAuthFailLogsErrorAndReturnFalse()
// ARRANGE
var mock = new HassWebSocketMock();
// Get the default state hass client
- var hassClient = mock.GetHassClient();
+ await using var hassClient = mock.GetHassClient();
// First message from Home Assistant is auth required
mock.AddResponse(@"{""type"": ""auth_required""}");
@@ -357,7 +357,7 @@ public async Task ConnectWithAuthFailLogsErrorAndReturnFalse()
// ACT and ASSERT
// Calls connect without getting the states initially
- Assert.False(await hassClient.ConnectAsync(new Uri("ws://anyurldoesntmatter.org"), "FAKETOKEN", false));
+ Assert.False(await hassClient.ConnectAsync(new Uri("ws://anyurldoesntmatter.org"), "FAKETOKEN", false).ConfigureAwait(false));
// Make sure we logged the error.
mock.Logger.AssertLogged(LogLevel.Error, Times.AtLeastOnce());
}
@@ -368,7 +368,7 @@ public async Task ConnectWithoutSslShouldStartWithWs()
// ARRANGE
var mock = new HassWebSocketMock();
// Get the default state hass client and we add no response messages
- var hassClient = mock.GetHassClient();
+ await using var hassClient = mock.GetHassClient();
// First message from Home Assistant is auth required
mock.AddResponse(@"{""type"": ""auth_required""}");
// Next one we fake it is auth ok
@@ -376,7 +376,7 @@ public async Task ConnectWithoutSslShouldStartWithWs()
// ACT and ASSERT
// Connect without ssl
- await hassClient.ConnectAsync("localhost", 8123, false, "FAKETOKEN", false);
+ await hassClient.ConnectAsync("localhost", 8123, false, "FAKETOKEN", false).ConfigureAwait(false);
mock.Verify(
n => n.ConnectAsync(new Uri("ws://localhost:8123/api/websocket"), It.IsAny()),
@@ -389,7 +389,7 @@ public async Task ConnectWithSslShouldStartWithWss()
// ARRANGE
var mock = new HassWebSocketMock();
// Get the default state hass client and we add no response messages
- var hassClient = mock.GetHassClient();
+ await using var hassClient = mock.GetHassClient();
// First message from Home Assistant is auth required
mock.AddResponse(@"{""type"": ""auth_required""}");
// Next one we fake it is auth ok
@@ -397,7 +397,7 @@ public async Task ConnectWithSslShouldStartWithWss()
// ACT and ASSERT
// Connect with ssl
- await hassClient.ConnectAsync("localhost", 8123, true, "FAKETOKEN", false);
+ await hassClient.ConnectAsync("localhost", 8123, true, "FAKETOKEN", false).ConfigureAwait(false);
mock.Verify(
n => n.ConnectAsync(new Uri("wss://localhost:8123/api/websocket"), It.IsAny()),
@@ -410,10 +410,10 @@ public async Task ConnectWithUriNullThrowsArgumentNullException()
// ARRANGE
var mock = new HassWebSocketMock();
// Get the default state hass client and we add no response messages
- var hassClient = mock.GetHassClient();
+ await using var hassClient = mock.GetHassClient();
await Assert.ThrowsAsync(
- async () => await hassClient.ConnectAsync(null, "lss", false));
+ async () => await hassClient.ConnectAsync(null, "lss", false).ConfigureAwait(false)).ConfigureAwait(false);
}
[Fact]
@@ -422,13 +422,13 @@ public async Task CustomEventShouldHaveCorrectObject()
// ARRANGE
var mock = new HassWebSocketMock();
// Get the connected hass client
- var hassClient = await mock.GetHassConnectedClient();
+ await using var hassClient = await mock.GetHassConnectedClient().ConfigureAwait(false);
// Add the service message fake , check service_event.json for reference
mock.AddResponse(HassWebSocketMock.CustomEventMessage);
// ACT
- var result = await hassClient.ReadEventAsync();
+ var result = await hassClient.ReadEventAsync().ConfigureAwait(false);
var customEvent = result?.Data;
@@ -445,13 +445,13 @@ public async Task EventWithStateBooleanShouldHaveCorrectTypeAndValue()
{
var mock = new HassWebSocketMock();
// Get the connected hass client
- var hassClient = await mock.GetHassConnectedClient();
+ await using var hassClient = await mock.GetHassConnectedClient().ConfigureAwait(false);
// Add response event message, see event.json as reference
mock.AddResponse(HassWebSocketMock.EventMessageBoolean);
// ACT
- HassEvent eventMsg = await hassClient.ReadEventAsync();
+ HassEvent eventMsg = await hassClient.ReadEventAsync().ConfigureAwait(false);
var stateMessage = eventMsg.Data as HassStateChangedEventData;
@@ -464,13 +464,13 @@ public async Task EventWithStateDoubleShouldHaveCorrectTypeAndValue()
{
var mock = new HassWebSocketMock();
// Get the connected hass client
- var hassClient = await mock.GetHassConnectedClient();
+ await using var hassClient = await mock.GetHassConnectedClient().ConfigureAwait(false);
// Add response event message, see event.json as reference
mock.AddResponse(HassWebSocketMock.EventMessageDouble);
// ACT
- HassEvent eventMsg = await hassClient.ReadEventAsync();
+ HassEvent eventMsg = await hassClient.ReadEventAsync().ConfigureAwait(false);
var stateMessage = eventMsg.Data as HassStateChangedEventData;
@@ -484,13 +484,13 @@ public async Task EventWithStateIntegerShouldHaveCorrectTypeAndValue()
{
var mock = new HassWebSocketMock();
// Get the connected hass client
- var hassClient = await mock.GetHassConnectedClient();
+ await using var hassClient = await mock.GetHassConnectedClient().ConfigureAwait(false);
// Add response event message, see event.json as reference
mock.AddResponse(HassWebSocketMock.EventMessageInteger);
// ACT
- HassEvent eventMsg = await hassClient.ReadEventAsync();
+ HassEvent eventMsg = await hassClient.ReadEventAsync().ConfigureAwait(false);
var stateMessage = eventMsg.Data as HassStateChangedEventData;
@@ -504,7 +504,7 @@ public async Task GetConfigGetUnexpectedMessageThrowsException()
// ARRANGE
var mock = new HassWebSocketMock();
// Get the connected hass client
- var hassClient = await mock.GetHassConnectedClient();
+ await using var hassClient = await mock.GetHassConnectedClient().ConfigureAwait(false);
// ACT
var getConfigTask = hassClient.GetConfig();
@@ -512,7 +512,7 @@ public async Task GetConfigGetUnexpectedMessageThrowsException()
// Fake return not expected message, check result_config.json for reference
mock.AddResponse(@"{""id"": 2,""type"": ""result"", ""success"": true}");
- await Assert.ThrowsAsync(async () => await getConfigTask);
+ await Assert.ThrowsAsync(async () => await getConfigTask.ConfigureAwait(false));
}
[Fact]
@@ -521,7 +521,7 @@ public async Task GetConfigGetUnexpectedResultThrowsException()
// ARRANGE
var mock = new HassWebSocketMock();
var mockHassClient =
- new Mock(mock.Logger.LoggerFactory,
+ new Mock(mock.Logger.LoggerFactory, new TransportPipelineFactoryMock().Object,
mock.WebSocketMockFactory.Object, null);
@@ -532,7 +532,7 @@ public async Task GetConfigGetUnexpectedResultThrowsException()
// Next one we fake it is auth ok
mock.AddResponse(@"{""type"": ""auth_ok""}");
- await mockHassClient.Object.ConnectAsync(new Uri("http://192.168.1.1"), "token", false);
+ await mockHassClient.Object.ConnectAsync(new Uri("http://192.168.1.1"), "token", false).ConfigureAwait(false);
mockHassClient.Setup(n =>
n.SendCommandAndWaitForResponse(
@@ -548,7 +548,7 @@ public async Task GetConfigGetUnexpectedResultThrowsException()
// ACT AND ASSERT
var getConfigTask = mockHassClient.Object.GetConfig();
- await Assert.ThrowsAsync(async () => await getConfigTask);
+ await Assert.ThrowsAsync(async () => await getConfigTask.ConfigureAwait(false));
}
[Fact]
@@ -557,12 +557,12 @@ public async Task NoPongMessagePingShouldReturnFalse()
// ARRANGE
var mock = new HassWebSocketMock();
// Get the default connected hass client
- var hassClient = await mock.GetHassConnectedClient();
+ await using var hassClient = await mock.GetHassConnectedClient().ConfigureAwait(false);
// No pong message is sent from server...
// ACT and ASSERT
- Assert.False(await hassClient.PingAsync(2));
+ Assert.False(await hassClient.PingAsync(2).ConfigureAwait(false));
}
[Fact]
@@ -571,13 +571,13 @@ public async Task PingShouldReturnTrue()
// ARRANGE
var mock = new HassWebSocketMock();
// Get the default connected hass client
- var hassClient = await mock.GetHassConnectedClient();
+ await using var hassClient = await mock.GetHassConnectedClient();
// Fake return pong message
mock.AddResponse(@"{""type"": ""pong""}");
// ACT and ASSERT
- Assert.True(await hassClient.PingAsync(1000));
+ Assert.True(await hassClient.PingAsync(1000).ConfigureAwait(false));
}
[Fact]
@@ -586,7 +586,7 @@ public async Task ReceiveAsyncThrowsExceptionProcessMessageShouldHandleException
// ARRANGE
var mock = new HassWebSocketMock();
// Get the connected hass client
- var hassClient = await mock.GetHassConnectedClient();
+ await using var hassClient = await mock.GetHassConnectedClient().ConfigureAwait(false);
mock.Setup(x => x.ReceiveAsync(It.IsAny>(), It.IsAny()))
.Returns((Memory buffer, CancellationToken token) =>
@@ -596,7 +596,7 @@ public async Task ReceiveAsyncThrowsExceptionProcessMessageShouldHandleException
// ACT AND ASSERT
-
+ var subscribeTask = hassClient.SubscribeToEvents();
// Service call successful
mock.AddResponse(@"{
@@ -611,8 +611,8 @@ public async Task ReceiveAsyncThrowsExceptionProcessMessageShouldHandleException
}
}
}");
- var subscribeTask = await hassClient.SubscribeToEvents();
+ await subscribeTask.ConfigureAwait(false);
mock.Logger.AssertLogged(LogLevel.Error, Times.Once());
}
@@ -622,7 +622,7 @@ public async Task ReturningStatesTheCountShouldBeNineteen()
// ARRANGE
var mock = new HassWebSocketMock();
// Get the non connected hass client
- var hassClient = mock.GetHassClientNotConnected();
+ await using var hassClient = mock.GetHassClientNotConnected();
hassClient.SocketTimeout = 50000;
// ACT
@@ -630,11 +630,11 @@ public async Task ReturningStatesTheCountShouldBeNineteen()
var connectTask = hassClient.ConnectAsync(new Uri("ws://localhost:8192/api/websocket"), "TOKEN");
// Wait until hassclient processes connect sequence
- await mock.WaitUntilConnected();
+ await mock.WaitUntilConnected().ConfigureAwait(false);
// Fake return states message
mock.AddResponse(HassWebSocketMock.StateMessage);
- await connectTask;
+ await connectTask.ConfigureAwait(false);
// ASSERT
Assert.Equal(19, hassClient.States.Count);
@@ -646,11 +646,11 @@ public async Task SendingUnknownMessageShouldDiscardAndLogDebug()
// ARRANGE
var mock = new HassWebSocketMock();
// Get the connected hass client
- var hassClient = await mock.GetHassConnectedClient();
+ await using var hassClient = await mock.GetHassConnectedClient().ConfigureAwait(false);
- hassClient.SendMessage(new UnknownCommand());
+ await hassClient.SendMessage(new UnknownCommand()).ConfigureAwait(false);
hassClient.SocketTimeout = 20;
- await hassClient.CallService("test", "test", null);
+ await hassClient.CallService("test", "test", null).ConfigureAwait(false);
mock.Logger.AssertLogged(LogLevel.Error, Times.Once());
}
@@ -660,7 +660,7 @@ public async Task SendMessageFailShouldThrowException()
// ARRANGE
var mock = new HassWebSocketMock();
var mockHassClient =
- new Mock(mock.Logger.LoggerFactory,
+ new Mock(mock.Logger.LoggerFactory, new TransportPipelineFactoryMock().Object,
mock.WebSocketMockFactory.Object, null);
@@ -671,13 +671,13 @@ public async Task SendMessageFailShouldThrowException()
// Next one we fake it is auth ok
mock.AddResponse(@"{""type"": ""auth_ok""}");
- await mockHassClient.Object.ConnectAsync(new Uri("http://192.168.1.1"), "token", false);
+ await mockHassClient.Object.ConnectAsync(new Uri("http://192.168.1.1"), "token", false).ConfigureAwait(false);
mockHassClient.Setup(n => n.SendMessage(It.IsAny(),
- It.IsAny())).Returns(false);
+ It.IsAny())).ThrowsAsync(new ApplicationException("Hello"));
// ACT AND ASSERT
await Assert.ThrowsAsync(async () =>
- await mockHassClient.Object.CallService("light", "turn_on", null));
+ await mockHassClient.Object.CallService("light", "turn_on", null).ConfigureAwait(false)).ConfigureAwait(false);
}
[Fact]
@@ -686,13 +686,13 @@ public async Task ServiceEventShouldHaveCorrectObject()
// ARRANGE
var mock = new HassWebSocketMock();
// Get the connected hass client
- var hassClient = await mock.GetHassConnectedClient();
+ await using var hassClient = await mock.GetHassConnectedClient().ConfigureAwait(false);
// Add the service message fake , check service_event.json for reference
mock.AddResponse(HassWebSocketMock.ServiceMessage);
// ACT
- var result = await hassClient.ReadEventAsync();
+ var result = await hassClient.ReadEventAsync().ConfigureAwait(false);
var serviceEvent = result?.Data as HassServiceEventData;
JsonElement? c = serviceEvent?.ServiceData?.GetProperty("entity_id");
@@ -711,7 +711,7 @@ public async Task GetServiceShouldHaveCorrectObject()
// ARRANGE
var mock = new HassWebSocketMock();
// Get the connected hass client
- var hassClient = await mock.GetHassConnectedClient();
+ await using var hassClient = await mock.GetHassConnectedClient().ConfigureAwait(false);
var task = hassClient.GetServices();
// Add the service message fake , check service_event.json for reference
@@ -719,7 +719,7 @@ public async Task GetServiceShouldHaveCorrectObject()
// ACT
// HassEvent eventMsg = await hassClient.ReadEventAsync();
- var result = await task;
+ var result = await task.ConfigureAwait(false);
var first = result.FirstOrDefault();
@@ -743,18 +743,18 @@ public async Task SubscribeToEventsReturnsCorrectEvent()
// ARRANGE
var mock = new HassWebSocketMock();
// Get the connected hass client
- var hassClient = await mock.GetHassConnectedClient();
+ await using var hassClient = await mock.GetHassConnectedClient().ConfigureAwait(false);
var subscribeTask = hassClient.SubscribeToEvents();
// Add result success
mock.AddResponse(@"{""id"": 2, ""type"": ""result"", ""success"": true, ""result"": null}");
- await subscribeTask;
+ await subscribeTask.ConfigureAwait(false);
// Add response event message, see event.json as reference
mock.AddResponse(HassWebSocketMock.EventMessage);
// ACT
- HassEvent eventMsg = await hassClient.ReadEventAsync();
+ HassEvent eventMsg = await hassClient.ReadEventAsync().ConfigureAwait(false);
// ASSERT, object multiple assertions
Assert.NotNull(eventMsg);
@@ -795,7 +795,7 @@ public async Task SubscribeToEventsReturnsTrue()
// ARRANGE
var mock = new HassWebSocketMock();
// Get the connected hass client
- var hassClient = await mock.GetHassConnectedClient();
+ await using var hassClient = await mock.GetHassConnectedClient().ConfigureAwait(false);
// ACT
var subscribeTask = hassClient.SubscribeToEvents();
@@ -803,7 +803,7 @@ public async Task SubscribeToEventsReturnsTrue()
mock.AddResponse(@"{""id"": 2, ""type"": ""result"", ""success"": true, ""result"": null}");
// ASSERT
- Assert.True(await subscribeTask);
+ Assert.True(await subscribeTask.ConfigureAwait(false));
}
[Fact]
@@ -812,13 +812,13 @@ public async Task UnsupportedCommandMessageShouldBeLogged()
// ARRANGE
var mock = new HassWebSocketMock();
// Get the connected hass client
- var hassClient = await mock.GetHassConnectedClient();
+ await using var hassClient = await mock.GetHassConnectedClient().ConfigureAwait(false);
- hassClient.SendMessage(new UnknownCommand());
+ await hassClient.SendMessage(new UnknownCommand()).ConfigureAwait(false);
//UnknownCommand
mock.AddResponse(@"{""id"": 2, ""type"": ""result"", ""success"": true, ""result"": null}");
- await Task.Delay(20);
+ await Task.Delay(20).ConfigureAwait(false);
mock.Logger.AssertLogged(LogLevel.Error, Times.Once());
}
@@ -829,12 +829,12 @@ public async Task ErrorCommandMessageShouldBeLogged()
// ARRANGE
var mock = new HassWebSocketMock();
// Get the connected hass client
- var hassClient = await mock.GetHassConnectedClient();
+ await using var hassClient = await mock.GetHassConnectedClient().ConfigureAwait(false);
- hassClient.SendMessage(new CallServiceCommand { Domain = "light", Service = "some_service" });
+ await hassClient.SendMessage(new CallServiceCommand { Domain = "light", Service = "some_service" }).ConfigureAwait(false);
mock.AddResponse(@"{""id"": 2, ""type"": ""result"", ""success"": false, ""result"": null, ""error"":{""code"": ""no_service"", ""message"": ""message""}}");
- await Task.Delay(20);
+ await Task.Delay(20).ConfigureAwait(false);
mock.Logger.AssertLogged(LogLevel.Warning, Times.Once());
}
@@ -845,12 +845,12 @@ public async Task ErrorCommandMessageCodeNonStringShouldBeLogged()
// ARRANGE
var mock = new HassWebSocketMock();
// Get the connected hass client
- var hassClient = await mock.GetHassConnectedClient();
+ await using var hassClient = await mock.GetHassConnectedClient().ConfigureAwait(false);
- hassClient.SendMessage(new CallServiceCommand { Domain = "light", Service = "some_service" });
+ await hassClient.SendMessage(new CallServiceCommand { Domain = "light", Service = "some_service" }).ConfigureAwait(false);
mock.AddResponse(@"{""id"": 2, ""type"": ""result"", ""success"": false, ""result"": null, ""error"":{""code"": 20, ""message"": ""message""}}");
- await Task.Delay(20);
+ await Task.Delay(20).ConfigureAwait(false);
mock.Logger.AssertLogged(LogLevel.Warning, Times.Once());
}
@@ -862,10 +862,10 @@ public async Task UnsupportedMessageReceivedShouldBeDebugLogged()
var mock = new HassWebSocketMock();
// Don“t remove, the client does stuff in the background while delay
// ReSharper disable once UnusedVariable
- var hassClient = await mock.GetHassConnectedClient();
+ await using var hassClient = await mock.GetHassConnectedClient().ConfigureAwait(false);
mock.AddResponse(@"{""type"": ""unknown""}");
- await Task.Delay(5);
+ await Task.Delay(5).ConfigureAwait(false);
mock.Logger.AssertLogged(LogLevel.Debug, Times.AtLeast(1));
}
@@ -874,17 +874,19 @@ public async Task WhenFactoryReturnsNullWebsocketReturnsFalseAndLogsError()
{
// ARRANGE
var websocketFactoryMock = new Mock();
+ var pipeMock = new TransportPipelineFactoryMock();
+
websocketFactoryMock.Setup(n => n.New()).Returns(() => null);
var loggerMock = new LoggerMock();
- var hassClient =
- new JoySoftware.HomeAssistant.Client.HassClient(loggerMock.LoggerFactory, websocketFactoryMock.Object,
+ await using var hassClient =
+ new JoySoftware.HomeAssistant.Client.HassClient(loggerMock.LoggerFactory, pipeMock.Object, websocketFactoryMock.Object,
null);
// ACT and ASSERT
// Calls returns false and logs error
- Assert.False(await hassClient.ConnectAsync(new Uri("ws://anyurldoesntmatter.org"), "FAKETOKEN", false));
+ Assert.False(await hassClient.ConnectAsync(new Uri("ws://anyurldoesntmatter.org"), "FAKETOKEN", false).ConfigureAwait(false));
loggerMock.AssertLogged(LogLevel.Error, Times.Once());
}
@@ -894,7 +896,7 @@ public async Task WrongMessagesFromHassShouldReturnFalse()
// ARRANGE
var mock = new HassWebSocketMock();
// Get the default state hass client
- var hassClient = mock.GetHassClient();
+ await using var hassClient = mock.GetHassClient();
// First message from Home Assistant is auth required
mock.AddResponse(@"{""type"": ""auth_required""}");
@@ -903,7 +905,7 @@ public async Task WrongMessagesFromHassShouldReturnFalse()
// ACT and ASSERT
// Calls connect without getting the states initially
- Assert.False(await hassClient.ConnectAsync(new Uri("ws://anyurldoesntmatter.org"), "FAKETOKEN", false));
+ Assert.False(await hassClient.ConnectAsync(new Uri("ws://anyurldoesntmatter.org"), "FAKETOKEN", false).ConfigureAwait(false));
}
[Fact]
@@ -924,9 +926,9 @@ public async Task HttpClientShouldCallCorrectHttpMessageHandler()
}); ;
// Get the default state hass client
- var hassClient = await mock.GetHassConnectedClient(false, httpMessageHandlerMock.Object);
+ await using var hassClient = await mock.GetHassConnectedClient(false, httpMessageHandlerMock.Object).ConfigureAwait(false);
- await hassClient.SetState("sensor.my_sensor", "new_state", new { attr1 = "hello" });
+ await hassClient.SetState("sensor.my_sensor", "new_state", new { attr1 = "hello" }).ConfigureAwait(false);
// ACT and ASSERT
// Calls connect without getting the states initially
@@ -962,9 +964,9 @@ public async Task SetStateNonSuccessHttpResponseCodeReturnNull()
});
// Get the default state hass client
- var hassClient = await mock.GetHassConnectedClient(false, httpMessageHandlerMock.Object);
+ await using var hassClient = await mock.GetHassConnectedClient(false, httpMessageHandlerMock.Object).ConfigureAwait(false);
- var result = await hassClient.SetState("sensor.my_sensor", "new_state", new { attr1 = "hello" });
+ var result = await hassClient.SetState("sensor.my_sensor", "new_state", new { attr1 = "hello" }).ConfigureAwait(false);
// ACT and ASSERT
Assert.Null(result);
@@ -1001,9 +1003,9 @@ public async Task SendEventHttpClientShouldCallCorrectHttpMessageHandler()
}); ;
// Get the default state hass client
- var hassClient = await mock.GetHassConnectedClient(false, httpMessageHandlerMock.Object);
+ await using var hassClient = await mock.GetHassConnectedClient(false, httpMessageHandlerMock.Object).ConfigureAwait(false);
- await hassClient.SendEvent("test_event", new { custom_data = "hello" });
+ await hassClient.SendEvent("test_event", new { custom_data = "hello" }).ConfigureAwait(false);
// ACT and ASSERT
// Calls connect without getting the states initially
@@ -1038,9 +1040,9 @@ public async Task SendEventFaileHttpClientShouldReturnFalse()
}); ;
// Get the default state hass client
- var hassClient = await mock.GetHassConnectedClient(false, httpMessageHandlerMock.Object);
+ await using var hassClient = await mock.GetHassConnectedClient(false, httpMessageHandlerMock.Object).ConfigureAwait(false);
- var result = await hassClient.SendEvent("test_event", new { custom_data = "hello" });
+ var result = await hassClient.SendEvent("test_event", new { custom_data = "hello" }).ConfigureAwait(false);
Assert.False(result);
}
@@ -1063,9 +1065,9 @@ public async Task SendEventNoDataHttpClientShouldCallCorrectHttpMessageHandler()
}); ;
// Get the default state hass client
- var hassClient = await mock.GetHassConnectedClient(false, httpMessageHandlerMock.Object);
+ await using var hassClient = await mock.GetHassConnectedClient(false, httpMessageHandlerMock.Object).ConfigureAwait(false);
- await hassClient.SendEvent("test_event");
+ await hassClient.SendEvent("test_event").ConfigureAwait(false);
// ACT and ASSERT
// Calls connect without getting the states initially
@@ -1087,10 +1089,10 @@ public async Task ReadEventShouldCancel()
{
var mock = new HassWebSocketMock();
// Get the connected hass client
- var hassClient = await mock.GetHassConnectedClient();
+ await using var hassClient = await mock.GetHassConnectedClient().ConfigureAwait(false);
var cancelSoon = new CancellationTokenSource(50);
// ACT & ASSERT
- await Assert.ThrowsAsync(async () => await hassClient.ReadEventAsync(cancelSoon.Token));
+ await Assert.ThrowsAsync(async () => await hassClient.ReadEventAsync(cancelSoon.Token).ConfigureAwait(false)).ConfigureAwait(false);
}
}
}
\ No newline at end of file
diff --git a/tests/HassClient.Unit.Tests/HassWebSocketMock.cs b/tests/HassClient.Unit.Tests/HassWebSocketMock.cs
index 01b3629..55479af 100644
--- a/tests/HassClient.Unit.Tests/HassWebSocketMock.cs
+++ b/tests/HassClient.Unit.Tests/HassWebSocketMock.cs
@@ -139,7 +139,10 @@ public void AddResponse(string message)
public JoySoftware.HomeAssistant.Client.HassClient GetHassClient(HttpMessageHandler httpMessageHandler = null)
{
- return new JoySoftware.HomeAssistant.Client.HassClient(Logger.LoggerFactory, WebSocketMockFactory.Object, httpMessageHandler ?? null);
+ return new JoySoftware.HomeAssistant.Client.HassClient(
+ Logger.LoggerFactory,
+ new WebSocketMessagePipelineFactory(),
+ WebSocketMockFactory.Object, httpMessageHandler ?? null);
}
///
diff --git a/tests/HassClient.Unit.Tests/TransportPipelineMock.cs b/tests/HassClient.Unit.Tests/TransportPipelineMock.cs
new file mode 100644
index 0000000..9e91893
--- /dev/null
+++ b/tests/HassClient.Unit.Tests/TransportPipelineMock.cs
@@ -0,0 +1,25 @@
+using JoySoftware.HomeAssistant.Client;
+using Moq;
+
+namespace HassClient.Unit.Tests
+{
+ public class TransportPipelineMock : Mock>
+ {
+
+
+ }
+
+ public class TransportPipelineFactoryMock : Mock>
+ {
+ TransportPipelineMock pipeline = new TransportPipelineMock();
+ HassWebSocketMock wsMock = new HassWebSocketMock();
+
+ LoggerMock loggerMock = new LoggerMock();
+ public TransportPipelineFactoryMock(IClientWebSocket client = null)
+ {
+ client = client ?? wsMock.Object;
+ Setup(n => n.CreateWebSocketMessagePipeline(client, loggerMock.LoggerFactory));
+ }
+
+ }
+}
\ No newline at end of file
diff --git a/tests/HassClient.Unit.Tests/WebSocketMessagePipelineTests.cs b/tests/HassClient.Unit.Tests/WebSocketMessagePipelineTests.cs
new file mode 100644
index 0000000..4a5b78f
--- /dev/null
+++ b/tests/HassClient.Unit.Tests/WebSocketMessagePipelineTests.cs
@@ -0,0 +1,40 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Net;
+using System.Net.Http;
+using System.Net.Http.Headers;
+using System.Net.WebSockets;
+using System.Text;
+using System.Text.Json;
+using System.Threading;
+using System.Threading.Tasks;
+using JoySoftware.HomeAssistant.Client;
+using Microsoft.Extensions.Logging;
+using Moq;
+using Moq.Protected;
+using Xunit;
+
+namespace HassClient.Unit.Tests
+{
+ public class WebSocketMessagePipelineTests
+ {
+ [Fact]
+ public async Task WrongMessagesFromHassShouldReturnFalse()
+ {
+ // ARRANGE
+ var mock = new HassWebSocketMock();
+
+ // First message from Home Assistant is auth required
+ mock.AddResponse(@"{""type"": ""any_kind_of_type""}");
+
+ await using var pipe = new WebSocketMessagePipeline(mock.Object);
+
+ // ACT
+ var msg = await pipe.GetNextMessageAsync(CancellationToken.None);
+
+ Assert.Equal("any_kind_of_type", msg.Type);
+ }
+
+ }
+}
\ No newline at end of file