Skip to content

Commit

Permalink
NDFirstOrTimeout and Delay implementation (#271)
Browse files Browse the repository at this point in the history
- NDFirstOrTimeout 
- Delay implementation
  • Loading branch information
helto4real committed Dec 31, 2020
1 parent 050ff3d commit 03758bc
Show file tree
Hide file tree
Showing 5 changed files with 151 additions and 1 deletion.
20 changes: 20 additions & 0 deletions src/App/NetDaemon.App/Common/Reactive/INetDaemonReactive.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Threading;
using NetDaemon.Common.Fluent;

namespace NetDaemon.Common.Reactive
Expand Down Expand Up @@ -73,6 +74,25 @@ public interface INetDaemonRxApp : INetDaemonAppBase, IRxEntity
/// </summary>
/// <param name="script">Script to call</param>
void RunScript(params string[] script);

/// <summary>
/// Delays timeout time
/// </summary>
/// <remarks>
/// When the app stops it will cancel wait with OperationanceledException
/// </remarks>
/// <param name="timeout">Time to delay execution</param>
void Delay(TimeSpan timeout);

/// <summary>
/// Delays timeout time
/// </summary>
/// <remarks>
/// When the app stops it will cancel wait with OperationanceledException
/// </remarks>
/// <param name="timeout">Time to delay execution</param>
/// <param name="token">Token to cancel any delays</param>
void Delay(TimeSpan timeout, CancellationToken token);
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,24 @@ public void CallService(string domain, string service, dynamic? data)
Daemon.CallService(domain, service, data);
}

/// <inheritdoc/>
public void Delay(TimeSpan timeout)
{
// We use Task.Delay instead of Thread.Sleep so we can stop timers on cancellation tokens
Task.Delay(timeout, _cancelTimers.Token).Wait(_cancelTimers.Token);
Logger.LogError("WE REACHED END OF DELAY!");
}

/// <inheritdoc/>
public void Delay(TimeSpan timeout, CancellationToken token)
{
// We combine timer with provided token so we cancel when app is stopped
using var combinedToken = CancellationTokenSource.CreateLinkedTokenSource(_cancelTimers.Token, token);
// We use Task.Delay instead of Thread.Sleep so we can stop timers on cancellation tokens
Task.Delay(timeout, combinedToken.Token).Wait(combinedToken.Token);
Logger.LogError("WE REACHED END OF DELAY2!");
}

/// <summary>
/// Implements the async dispose pattern
/// </summary>
Expand Down Expand Up @@ -226,6 +244,11 @@ public IDisposable RunIn(TimeSpan timespan, Action action)
{
// Do nothing
}
catch (TimeoutException)
{
// Ignore
LogWarning("Timeout Exception thrown in RunIn APP: {app}, please catch it in your code", Id ?? "unknown");
}
catch (Exception e)
{
LogError(e, "Error, RunIn APP: {app}", Id ?? "unknown");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,5 +37,23 @@ public static IObservable<(EntityState Old, EntityState New)> NDWaitForState(thi
public static IObservable<(EntityState Old, EntityState New)> NDWaitForState(this IObservable<(EntityState Old, EntityState New)> observable) => observable
.Timeout(TimeSpan.FromSeconds(5),
Observable.Return((new EntityState() { State = "TimeOut" }, new EntityState() { State = "TimeOut" }))).Take(1);


/// <summary>
/// Returns first occurence or null if timedout
/// </summary>
/// <param name="observable">Extended object</param>
/// <param name="timeout">The time to wait before timeout.</param>
public static (EntityState Old, EntityState New)? NDFirstOrTimeout(this IObservable<(EntityState Old, EntityState New)> observable, TimeSpan timeout)
{
try
{
return observable.Timeout(timeout).Take(1).Wait();
}
catch (TimeoutException)
{
return null;
}
}
}
}
89 changes: 89 additions & 0 deletions tests/NetDaemon.Daemon.Tests/Reactive/RxAppTest.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
using System;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Dynamic;
using System.Globalization;
using System.Linq;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;
using Moq;
using NetDaemon.Common;
using NetDaemon.Common.Exceptions;
using NetDaemon.Common.Reactive;
using Xunit;
Expand Down Expand Up @@ -402,6 +406,91 @@ public async Task GetDataShouldReturnCachedValue()
DefaultDataRepositoryMock.Verify(n => n.Save(It.IsAny<string>(), It.IsAny<string>()), Times.Once);
}

[Fact]
public void DelayShouldDelaySyncronyslyWithToken()
{
// ARRANGE
var startTime = DateTime.Now;
using var tokenSource = new CancellationTokenSource();
// ACT
DefaultDaemonRxApp.Delay(TimeSpan.FromMilliseconds(100), tokenSource.Token);
// ASSERT
// Compensate that windows resolution is 15ms for system clock
bool isAfterTimeout = DateTime.Now.Subtract(startTime).TotalMilliseconds >= 84;
Assert.True(isAfterTimeout);
}

[Fact]
public void DelayShouldDelaySyncronysly()
{
// ARRANGE
var startTime = DateTime.Now;
// ACT
DefaultDaemonRxApp.Delay(TimeSpan.FromMilliseconds(100));
// Compensate that windows resolution is 15ms for system clock
bool isAfterTimeout = DateTime.Now.Subtract(startTime).TotalMilliseconds >= 84;
// ASSERT
Assert.True(isAfterTimeout);
}

[Fact]
public async Task NDFirstOrTimeOutShouldReturnCorrectStateChange()
{
// ARRANGE
using var waitFor = new CancellationTokenSource(300);

await InitializeFakeDaemon().ConfigureAwait(false);
var task = Task.Run(async () =>
{
await Task.Delay(20).ConfigureAwait(false);
DefaultHassClientMock.AddChangedEvent("binary_sensor.pir2", "on", "off");
});
var result = DefaultDaemonRxApp.Entity("binary_sensor.pir2").StateChanges.NDFirstOrTimeout(TimeSpan.FromMilliseconds(300));
await RunFakeDaemonUntilTimeout().ConfigureAwait(false);

// ASSERT
Assert.NotNull(result);
}

[SuppressMessage("", "CA1031")]
private static async Task WaitForTimeout(int ms, CancellationToken token)
{
try
{
await Task.Delay(ms, token).ConfigureAwait(false);
}
catch { } // Ignor error
}

[Fact]
public async Task NDFirstOrTimeOutShouldReturnCorrectNullOnTimeout()
{
// ARRANGE
await InitializeFakeDaemon().ConfigureAwait(false);
(EntityState Old, EntityState New)? result = null;

// ACT
DefaultDaemonRxApp.Entity("binary_sensor.pir")
.StateChanges
.Subscribe(_ => result = DefaultDaemonRxApp.Entity("binary_sensor.pir2").StateChanges.NDFirstOrTimeout(TimeSpan.FromMilliseconds(100)));

DefaultHassClientMock.AddChangedEvent("binary_sensor.pir", "off", "on");

await RunFakeDaemonUntilTimeout().ConfigureAwait(false);

// ASSERT
Assert.Null(result);
}

[Fact]
public void DelayShouldCancelWithToken()
{
// ARRANGE
using var tokenSource = new CancellationTokenSource(50);
// ACT & ASSERT
Assert.Throws<OperationCanceledException>(() => DefaultDaemonRxApp.Delay(TimeSpan.FromMilliseconds(300), tokenSource.Token));
}

private interface ITestGetService
{
string TestString { get; }
Expand Down
2 changes: 1 addition & 1 deletion tests/NetDaemon.Daemon.Tests/SchedulerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ await using (IScheduler scheduler = new Scheduler(mockTimeManager.Object))

// ASSERT
Assert.True(nrOfRuns == 0);
await Task.Delay(800).ConfigureAwait(false);
await Task.Delay(1000).ConfigureAwait(false);
Assert.True(nrOfRuns >= 1);
}

Expand Down

0 comments on commit 03758bc

Please sign in to comment.