Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
using System.Reactive.Subjects;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;
using System.Text.Json;
using System.Threading;
using Microsoft.Extensions.DependencyInjection;
Expand Down Expand Up @@ -57,61 +59,68 @@ private ServiceProvider CreateServiceProvider()


[Fact]
public void RegisterTrigger()
public async Task RegisterTrigger()
{
var incomingTriggersMock = _triggerManager.RegisterTrigger(new {}).SubscribeMock();
var incomingTriggersTask = _triggerManager.RegisterTrigger(new {}).FirstAsync().ToTask().ToFunc();

var message = new { }.AsJsonElement();

_messageSubject.OnNext(new HassMessage(){Id = nextMessageId, Event = new HassEvent(){Variables = new HassVariable()
{TriggerElement = message }}});

// Assert
incomingTriggersMock.Verify(e => e.OnNext(message));
await incomingTriggersTask.Should()
.CompleteWithinAsync(TimeSpan.FromSeconds(1), "the message should have arrived by now")
.WithResult(message);
}

[Fact]
public async void NoMoreTriggersAfterDispose()
public async Task NoMoreTriggersAfterDispose()
{
// Act
var incomingTriggersMock = _triggerManager.RegisterTrigger(new {}).SubscribeMock();
var incomingTriggersTask = _triggerManager.RegisterTrigger(new {}).FirstAsync().ToTask().ToFunc();

await ((IAsyncDisposable)_triggerManager).DisposeAsync().ConfigureAwait(false);

// Assert, Dispose should unsubscribe with HA AND stop any messages that do pass
_hassConnectionMock.Verify(m => m.SendCommandAndReturnHassMessageResponseAsync(
new UnsubscribeEventsCommand(nextMessageId), It.IsAny<CancellationToken>()));


_messageSubject.OnNext(new HassMessage(){Id = nextMessageId, Event = new HassEvent(){Variables = new HassVariable()
{TriggerElement = new JsonElement() }}});

incomingTriggersMock.VerifyNoOtherCalls();

await incomingTriggersTask.Should()
.NotCompleteWithinAsync(TimeSpan.FromSeconds(1), "no messages should arrive after being disposed");

_hassConnectionMock.Verify(m => m.SendCommandAndReturnHassMessageResponseAsync(
new UnsubscribeEventsCommand(nextMessageId), It.IsAny<CancellationToken>()));
}


[Fact]
public void RegisterTriggerCorrectMessagesPerSubscription()
public async Task RegisterTriggerCorrectMessagesPerSubscription()
{
nextMessageId = 6;
var incomingTriggersMock6 = _triggerManager.RegisterTrigger(new {}).SubscribeMock();
var incomingTriggersTask6 = _triggerManager.RegisterTrigger(new {}).FirstAsync().ToTask().ToFunc();

nextMessageId = 7;
var incomingTriggersMock7 = _triggerManager.RegisterTrigger(new {}).SubscribeMock();
var incomingTriggersTask7 = _triggerManager.RegisterTrigger(new {}).FirstAsync().ToTask().ToFunc();

var message6 = new { tag = "six" }.AsJsonElement();
var message7 = new { tag = "seven" }.AsJsonElement();

// Assert
_messageSubject.OnNext(new HassMessage{Id = 6, Event = new HassEvent(){Variables = new HassVariable()
{TriggerElement = message6 }}});


_messageSubject.OnNext(new HassMessage{Id = 7, Event = new HassEvent(){Variables = new HassVariable()
{TriggerElement = message7 }}});

incomingTriggersMock6.Verify(e => e.OnNext(message6), Times.Once);
incomingTriggersMock7.Verify(e => e.OnNext(message7), Times.Once);

// Assert
await incomingTriggersTask6.Should()
.CompleteWithinAsync(TimeSpan.FromSeconds(1), $"{nameof(message6)} should have arrived by now")
.WithResult(message6);

await incomingTriggersTask7.Should()
.CompleteWithinAsync(TimeSpan.FromSeconds(1), $"{nameof(message7)} should have arrived by now")
.WithResult(message7);
}


}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System.Linq.Expressions;
using System.Text;
using System.Text.Json;
using System.Threading;

namespace NetDaemon.HassModel.Tests.TestHelpers;

Expand Down Expand Up @@ -29,6 +30,8 @@ public static Mock<IObserver<T>> SubscribeMock<T>(this IObservable<T> observable
return observerMock;
}

public static Func<T> ToFunc<T>(this T input) => () => input;

public static async Task WaitForInvocationAndVerify<T>(this Mock<T> mock, Expression<Action<T>> expression)
where T : class
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Microsoft.Extensions.Hosting;
using NetDaemon.Client.Internal.HomeAssistant.Commands;
using NetDaemon.Infrastructure.ObservableHelpers;

namespace NetDaemon.HassModel;
Expand Down Expand Up @@ -30,6 +31,7 @@ internal static void AddScopedHaContext(this IServiceCollection services)
services.AddTransient<IHaContext>(s => s.GetRequiredService<AppScopedHaContextProvider>());
services.AddScoped<QueuedObservable<HassEvent>>();
services.AddScoped<IQueuedObservable<HassEvent>>(s => s.GetRequiredService<QueuedObservable<HassEvent>>());
services.AddTransient<ITriggerManager, TriggerManager>();
services.AddScoped<TriggerManager>();
services.AddTransient<ITriggerManager>(s => s.GetRequiredService<TriggerManager>());
}
}
17 changes: 11 additions & 6 deletions src/HassModel/NetDeamon.HassModel/Internal/TriggerManager.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
using System.Collections.Concurrent;
using System.Threading;
using NetDaemon.Client.HomeAssistant.Extensions;
using NetDaemon.Client.Internal.HomeAssistant.Commands;
using NetDaemon.Infrastructure.ObservableHelpers;

namespace NetDaemon.HassModel.Internal;

Expand All @@ -12,17 +14,20 @@ namespace NetDaemon.HassModel.Internal;
internal class TriggerManager : IAsyncDisposable, ITriggerManager
{
private readonly IHomeAssistantRunner _runner;
private readonly IQueuedObservable<HassMessage> _queuedObservable;
private readonly IBackgroundTaskTracker _tracker;
private readonly IHomeAssistantHassMessages _hassMessages;


private readonly ConcurrentBag<(int id, IDisposable disposable)> _subscriptions = new();
private bool _disposed;

public TriggerManager(IHomeAssistantRunner runner, IBackgroundTaskTracker tracker)
public TriggerManager(IHomeAssistantRunner runner, IBackgroundTaskTracker tracker, ILogger<IHaContext> logger)
{
_runner = runner;
_tracker = tracker;
_hassMessages = (IHomeAssistantHassMessages)runner.CurrentConnection!;

var hassMessages = (IHomeAssistantHassMessages)runner.CurrentConnection!;
_queuedObservable = new QueuedObservable<HassMessage>(logger);
_queuedObservable.Initialize(hassMessages.OnHassMessage);
}

public IObservable<JsonElement> RegisterTrigger(object triggerParams)
Expand All @@ -40,11 +45,10 @@ public IObservable<JsonElement> RegisterTrigger(object triggerParams)

private async Task SubscribeToTrigger(object triggerParams, Subject<JsonElement> subject)
{

var message = await _runner.CurrentConnection!.SubscribeToTriggerAsync(triggerParams, CancellationToken.None).ConfigureAwait(false);
var id = message.Id;

var subscribtion = _hassMessages.OnHassMessage
var subscribtion = _queuedObservable
.Where(m => m.Id == id)
.Select(n => n.Event?.Variables?.TriggerElement)
.Where(m => m.HasValue)
Expand All @@ -68,5 +72,6 @@ public async ValueTask DisposeAsync()
}

await Task.WhenAll(tasks).ConfigureAwait(false);
await _queuedObservable.DisposeAsync().ConfigureAwait(false);
}
}