Skip to content

Commit

Permalink
NDFirstOrTimeout and Delay implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
helto4real committed Dec 31, 2020
1 parent 050ff3d commit 1e7c017
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 0 deletions.
20 changes: 20 additions & 0 deletions src/App/NetDaemon.App/Common/Reactive/INetDaemonReactive.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Threading;
using NetDaemon.Common.Fluent;

namespace NetDaemon.Common.Reactive
Expand Down Expand Up @@ -73,6 +74,25 @@ public interface INetDaemonRxApp : INetDaemonAppBase, IRxEntity
/// </summary>
/// <param name="script">Script to call</param>
void RunScript(params string[] script);

/// <summary>
/// Delays timeout time
/// </summary>
/// <remarks>
/// When the app stops it will cancel wait with OperationanceledException
/// </remarks>
/// <param name="timeout">Time to delay execution</param>
void Delay(TimeSpan timeout);

/// <summary>
/// Delays timeout time
/// </summary>
/// <remarks>
/// When the app stops it will cancel wait with OperationanceledException
/// </remarks>
/// <param name="timeout">Time to delay execution</param>
/// <param name="token">Token to cancel any delays</param>
void Delay(TimeSpan timeout, CancellationToken token);
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,24 @@ public void CallService(string domain, string service, dynamic? data)
Daemon.CallService(domain, service, data);
}

/// <inheritdoc/>
public void Delay(TimeSpan timeout)
{
// We use Task.Delay instead of Thread.Sleep so we can stop timers on cancellation tokens
Task.Delay(timeout, _cancelTimers.Token).Wait(_cancelTimers.Token);
Logger.LogError("WE REACHED END OF DELAY!");
}

/// <inheritdoc/>
public void Delay(TimeSpan timeout, CancellationToken token)
{
// We combine timer with provided token so we cancel when app is stopped
using var combinedToken = CancellationTokenSource.CreateLinkedTokenSource(_cancelTimers.Token, token);
// We use Task.Delay instead of Thread.Sleep so we can stop timers on cancellation tokens
Task.Delay(timeout, combinedToken.Token).Wait(combinedToken.Token);
Logger.LogError("WE REACHED END OF DELAY2!");
}

/// <summary>
/// Implements the async dispose pattern
/// </summary>
Expand Down Expand Up @@ -226,6 +244,11 @@ public IDisposable RunIn(TimeSpan timespan, Action action)
{
// Do nothing
}
catch (TimeoutException)
{
// Ignore
LogWarning("Timeout Exception thrown in RunIn APP: {app}, please catch it in your code", Id ?? "unknown");
}
catch (Exception e)
{
LogError(e, "Error, RunIn APP: {app}", Id ?? "unknown");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,5 +37,23 @@ public static IObservable<(EntityState Old, EntityState New)> NDWaitForState(thi
public static IObservable<(EntityState Old, EntityState New)> NDWaitForState(this IObservable<(EntityState Old, EntityState New)> observable) => observable
.Timeout(TimeSpan.FromSeconds(5),
Observable.Return((new EntityState() { State = "TimeOut" }, new EntityState() { State = "TimeOut" }))).Take(1);


/// <summary>
/// Returns first occurence or null if timedout
/// </summary>
/// <param name="observable">Extended object</param>
/// <param name="timeout">The time to wait before timeout.</param>
public static (EntityState Old, EntityState New)? NDFirstOrTimeout(this IObservable<(EntityState Old, EntityState New)> observable, TimeSpan timeout)
{
try
{
return observable.Timeout(timeout).Take(1).Wait();
}
catch (TimeoutException)
{
return null;
}
}
}
}
82 changes: 82 additions & 0 deletions tests/NetDaemon.Daemon.Tests/Reactive/RxAppTest.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
using System;
using System.Diagnostics;
using System.Dynamic;
using System.Globalization;
using System.Linq;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;
using Moq;
using NetDaemon.Common;
using NetDaemon.Common.Exceptions;
using NetDaemon.Common.Reactive;
using Xunit;
Expand Down Expand Up @@ -402,6 +405,85 @@ public async Task GetDataShouldReturnCachedValue()
DefaultDataRepositoryMock.Verify(n => n.Save(It.IsAny<string>(), It.IsAny<string>()), Times.Once);
}

[Fact]
public void DelayShouldDelaySyncronyslyWithToken()
{
// ARRANGE
Stopwatch s = new();
using var tokenSource = new CancellationTokenSource();
// ARRANGE
s.Start();
// ACT
DefaultDaemonRxApp.Delay(TimeSpan.FromMilliseconds(50), tokenSource.Token);
s.Stop();
// ASSERT
Assert.NotInRange(s.ElapsedMilliseconds, 0, 49);
}

[Fact]
public void DelayShouldDelaySyncronysly()
{
// ARRANGE
Stopwatch s = new();
s.Start();
// ACT
DefaultDaemonRxApp.Delay(TimeSpan.FromMilliseconds(50));
s.Stop();
// ASSERT
Assert.NotInRange(s.ElapsedMilliseconds, 0, 49);
}

[Fact]
public async Task NDFirstOrTimeOutShouldReturnCorrectStateChange()
{
// ARRANGE
await InitializeFakeDaemon().ConfigureAwait(false);
(EntityState Old, EntityState New)? result = null;

// ACT
DefaultDaemonRxApp.Entity("binary_sensor.pir")
.StateChanges
.Subscribe(_ => result = DefaultDaemonRxApp.Entity("binary_sensor.pir2").StateChanges.NDFirstOrTimeout(TimeSpan.FromMilliseconds(100)));

DefaultHassClientMock.AddChangedEvent("binary_sensor.pir", "off", "on");
await Task.Delay(50).ConfigureAwait(false);
DefaultHassClientMock.AddChangedEvent("binary_sensor.pir2", "on", "off");

await RunFakeDaemonUntilTimeout().ConfigureAwait(false);

// ASSERT
Assert.NotNull(result);
}

[Fact]
public async Task NDFirstOrTimeOutShouldReturnCorrectNullOnTimeout()
{
// ARRANGE
await InitializeFakeDaemon().ConfigureAwait(false);
(EntityState Old, EntityState New)? result = null;

// ACT
DefaultDaemonRxApp.Entity("binary_sensor.pir")
.StateChanges
.Subscribe(_ => result = DefaultDaemonRxApp.Entity("binary_sensor.pir2").StateChanges.NDFirstOrTimeout(TimeSpan.FromMilliseconds(100)));

DefaultHassClientMock.AddChangedEvent("binary_sensor.pir", "off", "on");

await RunFakeDaemonUntilTimeout().ConfigureAwait(false);

// ASSERT
Assert.Null(result);
}

[Fact]
public void DelayShouldCancelWithToken()
{
// ARRANGE
using var tokenSource = new CancellationTokenSource(50);
// ACT & ASSERT
Assert.Throws<OperationCanceledException>(() => DefaultDaemonRxApp.Delay(TimeSpan.FromMilliseconds(300), tokenSource.Token));
}

private interface ITestGetService
{
string TestString { get; }
Expand Down

0 comments on commit 1e7c017

Please sign in to comment.