diff --git a/src/HassModel/NetDaemon.HassModel.Tests/Internal/TriggerManagerTest.cs b/src/HassModel/NetDaemon.HassModel.Tests/Internal/TriggerManagerTest.cs index 689545f6e..39e6453fc 100644 --- a/src/HassModel/NetDaemon.HassModel.Tests/Internal/TriggerManagerTest.cs +++ b/src/HassModel/NetDaemon.HassModel.Tests/Internal/TriggerManagerTest.cs @@ -1,4 +1,6 @@ -using System.Reactive.Subjects; +using System.Reactive.Linq; +using System.Reactive.Subjects; +using System.Reactive.Threading.Tasks; using System.Text.Json; using System.Threading; using Microsoft.Extensions.DependencyInjection; @@ -57,61 +59,68 @@ private ServiceProvider CreateServiceProvider() [Fact] - public void RegisterTrigger() + public async Task RegisterTrigger() { - var incomingTriggersMock = _triggerManager.RegisterTrigger(new {}).SubscribeMock(); + var incomingTriggersTask = _triggerManager.RegisterTrigger(new {}).FirstAsync().ToTask().ToFunc(); var message = new { }.AsJsonElement(); _messageSubject.OnNext(new HassMessage(){Id = nextMessageId, Event = new HassEvent(){Variables = new HassVariable() {TriggerElement = message }}}); - + // Assert - incomingTriggersMock.Verify(e => e.OnNext(message)); + await incomingTriggersTask.Should() + .CompleteWithinAsync(TimeSpan.FromSeconds(1), "the message should have arrived by now") + .WithResult(message); } [Fact] - public async void NoMoreTriggersAfterDispose() + public async Task NoMoreTriggersAfterDispose() { // Act - var incomingTriggersMock = _triggerManager.RegisterTrigger(new {}).SubscribeMock(); + var incomingTriggersTask = _triggerManager.RegisterTrigger(new {}).FirstAsync().ToTask().ToFunc(); await ((IAsyncDisposable)_triggerManager).DisposeAsync().ConfigureAwait(false); // Assert, Dispose should unsubscribe with HA AND stop any messages that do pass - _hassConnectionMock.Verify(m => m.SendCommandAndReturnHassMessageResponseAsync( - new UnsubscribeEventsCommand(nextMessageId), It.IsAny())); - + _messageSubject.OnNext(new HassMessage(){Id = nextMessageId, Event = new HassEvent(){Variables = new HassVariable() {TriggerElement = new JsonElement() }}}); - - incomingTriggersMock.VerifyNoOtherCalls(); + + await incomingTriggersTask.Should() + .NotCompleteWithinAsync(TimeSpan.FromSeconds(1), "no messages should arrive after being disposed"); + + _hassConnectionMock.Verify(m => m.SendCommandAndReturnHassMessageResponseAsync( + new UnsubscribeEventsCommand(nextMessageId), It.IsAny())); } [Fact] - public void RegisterTriggerCorrectMessagesPerSubscription() + public async Task RegisterTriggerCorrectMessagesPerSubscription() { nextMessageId = 6; - var incomingTriggersMock6 = _triggerManager.RegisterTrigger(new {}).SubscribeMock(); + var incomingTriggersTask6 = _triggerManager.RegisterTrigger(new {}).FirstAsync().ToTask().ToFunc(); nextMessageId = 7; - var incomingTriggersMock7 = _triggerManager.RegisterTrigger(new {}).SubscribeMock(); + var incomingTriggersTask7 = _triggerManager.RegisterTrigger(new {}).FirstAsync().ToTask().ToFunc(); var message6 = new { tag = "six" }.AsJsonElement(); var message7 = new { tag = "seven" }.AsJsonElement(); - // Assert _messageSubject.OnNext(new HassMessage{Id = 6, Event = new HassEvent(){Variables = new HassVariable() {TriggerElement = message6 }}}); _messageSubject.OnNext(new HassMessage{Id = 7, Event = new HassEvent(){Variables = new HassVariable() {TriggerElement = message7 }}}); - - incomingTriggersMock6.Verify(e => e.OnNext(message6), Times.Once); - incomingTriggersMock7.Verify(e => e.OnNext(message7), Times.Once); + + // Assert + await incomingTriggersTask6.Should() + .CompleteWithinAsync(TimeSpan.FromSeconds(1), $"{nameof(message6)} should have arrived by now") + .WithResult(message6); + + await incomingTriggersTask7.Should() + .CompleteWithinAsync(TimeSpan.FromSeconds(1), $"{nameof(message7)} should have arrived by now") + .WithResult(message7); } - - } \ No newline at end of file diff --git a/src/HassModel/NetDaemon.HassModel.Tests/TestHelpers/Extensions.cs b/src/HassModel/NetDaemon.HassModel.Tests/TestHelpers/Extensions.cs index c25386e4f..b1ce2e0a3 100644 --- a/src/HassModel/NetDaemon.HassModel.Tests/TestHelpers/Extensions.cs +++ b/src/HassModel/NetDaemon.HassModel.Tests/TestHelpers/Extensions.cs @@ -1,6 +1,7 @@ using System.Linq.Expressions; using System.Text; using System.Text.Json; +using System.Threading; namespace NetDaemon.HassModel.Tests.TestHelpers; @@ -29,6 +30,8 @@ public static Mock> SubscribeMock(this IObservable observable return observerMock; } + public static Func ToFunc(this T input) => () => input; + public static async Task WaitForInvocationAndVerify(this Mock mock, Expression> expression) where T : class { diff --git a/src/HassModel/NetDeamon.HassModel/DependencyInjectionSetup.cs b/src/HassModel/NetDeamon.HassModel/DependencyInjectionSetup.cs index 88264ca4b..ad92dec3b 100644 --- a/src/HassModel/NetDeamon.HassModel/DependencyInjectionSetup.cs +++ b/src/HassModel/NetDeamon.HassModel/DependencyInjectionSetup.cs @@ -1,4 +1,5 @@ using Microsoft.Extensions.Hosting; +using NetDaemon.Client.Internal.HomeAssistant.Commands; using NetDaemon.Infrastructure.ObservableHelpers; namespace NetDaemon.HassModel; @@ -30,6 +31,7 @@ internal static void AddScopedHaContext(this IServiceCollection services) services.AddTransient(s => s.GetRequiredService()); services.AddScoped>(); services.AddScoped>(s => s.GetRequiredService>()); - services.AddTransient(); + services.AddScoped(); + services.AddTransient(s => s.GetRequiredService()); } } diff --git a/src/HassModel/NetDeamon.HassModel/Internal/TriggerManager.cs b/src/HassModel/NetDeamon.HassModel/Internal/TriggerManager.cs index 89b831e87..3d5cd00b2 100644 --- a/src/HassModel/NetDeamon.HassModel/Internal/TriggerManager.cs +++ b/src/HassModel/NetDeamon.HassModel/Internal/TriggerManager.cs @@ -1,6 +1,8 @@ using System.Collections.Concurrent; using System.Threading; using NetDaemon.Client.HomeAssistant.Extensions; +using NetDaemon.Client.Internal.HomeAssistant.Commands; +using NetDaemon.Infrastructure.ObservableHelpers; namespace NetDaemon.HassModel.Internal; @@ -12,17 +14,20 @@ namespace NetDaemon.HassModel.Internal; internal class TriggerManager : IAsyncDisposable, ITriggerManager { private readonly IHomeAssistantRunner _runner; + private readonly IQueuedObservable _queuedObservable; private readonly IBackgroundTaskTracker _tracker; - private readonly IHomeAssistantHassMessages _hassMessages; - + private readonly ConcurrentBag<(int id, IDisposable disposable)> _subscriptions = new(); private bool _disposed; - public TriggerManager(IHomeAssistantRunner runner, IBackgroundTaskTracker tracker) + public TriggerManager(IHomeAssistantRunner runner, IBackgroundTaskTracker tracker, ILogger logger) { _runner = runner; _tracker = tracker; - _hassMessages = (IHomeAssistantHassMessages)runner.CurrentConnection!; + + var hassMessages = (IHomeAssistantHassMessages)runner.CurrentConnection!; + _queuedObservable = new QueuedObservable(logger); + _queuedObservable.Initialize(hassMessages.OnHassMessage); } public IObservable RegisterTrigger(object triggerParams) @@ -40,11 +45,10 @@ public IObservable RegisterTrigger(object triggerParams) private async Task SubscribeToTrigger(object triggerParams, Subject subject) { - var message = await _runner.CurrentConnection!.SubscribeToTriggerAsync(triggerParams, CancellationToken.None).ConfigureAwait(false); var id = message.Id; - var subscribtion = _hassMessages.OnHassMessage + var subscribtion = _queuedObservable .Where(m => m.Id == id) .Select(n => n.Event?.Variables?.TriggerElement) .Where(m => m.HasValue) @@ -68,5 +72,6 @@ public async ValueTask DisposeAsync() } await Task.WhenAll(tasks).ConfigureAwait(false); + await _queuedObservable.DisposeAsync().ConfigureAwait(false); } } \ No newline at end of file