Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NDFirstOrTimeout and Delay implementation #271

Merged
merged 9 commits into from
Dec 31, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
20 changes: 20 additions & 0 deletions src/App/NetDaemon.App/Common/Reactive/INetDaemonReactive.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Threading;
using NetDaemon.Common.Fluent;

namespace NetDaemon.Common.Reactive
Expand Down Expand Up @@ -73,6 +74,25 @@ public interface INetDaemonRxApp : INetDaemonAppBase, IRxEntity
/// </summary>
/// <param name="script">Script to call</param>
void RunScript(params string[] script);

/// <summary>
/// Delays timeout time
/// </summary>
/// <remarks>
/// When the app stops it will cancel wait with OperationanceledException
/// </remarks>
/// <param name="timeout">Time to delay execution</param>
void Delay(TimeSpan timeout);

/// <summary>
/// Delays timeout time
/// </summary>
/// <remarks>
/// When the app stops it will cancel wait with OperationanceledException
/// </remarks>
/// <param name="timeout">Time to delay execution</param>
/// <param name="token">Token to cancel any delays</param>
void Delay(TimeSpan timeout, CancellationToken token);
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,24 @@ public void CallService(string domain, string service, dynamic? data)
Daemon.CallService(domain, service, data);
}

/// <inheritdoc/>
public void Delay(TimeSpan timeout)
{
// We use Task.Delay instead of Thread.Sleep so we can stop timers on cancellation tokens
Task.Delay(timeout, _cancelTimers.Token).Wait(_cancelTimers.Token);
Logger.LogError("WE REACHED END OF DELAY!");
}

/// <inheritdoc/>
public void Delay(TimeSpan timeout, CancellationToken token)
{
// We combine timer with provided token so we cancel when app is stopped
using var combinedToken = CancellationTokenSource.CreateLinkedTokenSource(_cancelTimers.Token, token);
// We use Task.Delay instead of Thread.Sleep so we can stop timers on cancellation tokens
Task.Delay(timeout, combinedToken.Token).Wait(combinedToken.Token);
Logger.LogError("WE REACHED END OF DELAY2!");
}

/// <summary>
/// Implements the async dispose pattern
/// </summary>
Expand Down Expand Up @@ -226,6 +244,11 @@ public IDisposable RunIn(TimeSpan timespan, Action action)
{
// Do nothing
}
catch (TimeoutException)
{
// Ignore
LogWarning("Timeout Exception thrown in RunIn APP: {app}, please catch it in your code", Id ?? "unknown");
}
catch (Exception e)
{
LogError(e, "Error, RunIn APP: {app}", Id ?? "unknown");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,5 +37,23 @@ public static IObservable<(EntityState Old, EntityState New)> NDWaitForState(thi
public static IObservable<(EntityState Old, EntityState New)> NDWaitForState(this IObservable<(EntityState Old, EntityState New)> observable) => observable
.Timeout(TimeSpan.FromSeconds(5),
Observable.Return((new EntityState() { State = "TimeOut" }, new EntityState() { State = "TimeOut" }))).Take(1);


/// <summary>
/// Returns first occurence or null if timedout
/// </summary>
/// <param name="observable">Extended object</param>
/// <param name="timeout">The time to wait before timeout.</param>
public static (EntityState Old, EntityState New)? NDFirstOrTimeout(this IObservable<(EntityState Old, EntityState New)> observable, TimeSpan timeout)
{
try
{
return observable.Timeout(timeout).Take(1).Wait();
}
catch (TimeoutException)
{
return null;
}
}
}
}
89 changes: 89 additions & 0 deletions tests/NetDaemon.Daemon.Tests/Reactive/RxAppTest.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
using System;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Dynamic;
using System.Globalization;
using System.Linq;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;
using Moq;
using NetDaemon.Common;
using NetDaemon.Common.Exceptions;
using NetDaemon.Common.Reactive;
using Xunit;
Expand Down Expand Up @@ -402,6 +406,91 @@ public async Task GetDataShouldReturnCachedValue()
DefaultDataRepositoryMock.Verify(n => n.Save(It.IsAny<string>(), It.IsAny<string>()), Times.Once);
}

[Fact]
public void DelayShouldDelaySyncronyslyWithToken()
{
// ARRANGE
var startTime = DateTime.Now;
using var tokenSource = new CancellationTokenSource();
// ACT
DefaultDaemonRxApp.Delay(TimeSpan.FromMilliseconds(100), tokenSource.Token);
// ASSERT
// Compensate that windows resolution is 15ms for system clock
bool isAfterTimeout = DateTime.Now.Subtract(startTime).TotalMilliseconds >= 84;
Assert.True(isAfterTimeout);
}

[Fact]
public void DelayShouldDelaySyncronysly()
{
// ARRANGE
var startTime = DateTime.Now;
// ACT
DefaultDaemonRxApp.Delay(TimeSpan.FromMilliseconds(100));
// Compensate that windows resolution is 15ms for system clock
bool isAfterTimeout = DateTime.Now.Subtract(startTime).TotalMilliseconds >= 84;
// ASSERT
Assert.True(isAfterTimeout);
}

[Fact]
public async Task NDFirstOrTimeOutShouldReturnCorrectStateChange()
{
// ARRANGE
using var waitFor = new CancellationTokenSource(300);

await InitializeFakeDaemon().ConfigureAwait(false);
var task = Task.Run(async () =>
{
await Task.Delay(20).ConfigureAwait(false);
DefaultHassClientMock.AddChangedEvent("binary_sensor.pir2", "on", "off");
});
var result = DefaultDaemonRxApp.Entity("binary_sensor.pir2").StateChanges.NDFirstOrTimeout(TimeSpan.FromMilliseconds(300));
await RunFakeDaemonUntilTimeout().ConfigureAwait(false);

// ASSERT
Assert.NotNull(result);
}

[SuppressMessage("", "CA1031")]
private static async Task WaitForTimeout(int ms, CancellationToken token)
{
try
{
await Task.Delay(ms, token).ConfigureAwait(false);
}
catch { } // Ignor error
}

[Fact]
public async Task NDFirstOrTimeOutShouldReturnCorrectNullOnTimeout()
{
// ARRANGE
await InitializeFakeDaemon().ConfigureAwait(false);
(EntityState Old, EntityState New)? result = null;

// ACT
DefaultDaemonRxApp.Entity("binary_sensor.pir")
.StateChanges
.Subscribe(_ => result = DefaultDaemonRxApp.Entity("binary_sensor.pir2").StateChanges.NDFirstOrTimeout(TimeSpan.FromMilliseconds(100)));

DefaultHassClientMock.AddChangedEvent("binary_sensor.pir", "off", "on");

await RunFakeDaemonUntilTimeout().ConfigureAwait(false);

// ASSERT
Assert.Null(result);
}

[Fact]
public void DelayShouldCancelWithToken()
{
// ARRANGE
using var tokenSource = new CancellationTokenSource(50);
// ACT & ASSERT
Assert.Throws<OperationCanceledException>(() => DefaultDaemonRxApp.Delay(TimeSpan.FromMilliseconds(300), tokenSource.Token));
}

private interface ITestGetService
{
string TestString { get; }
Expand Down
2 changes: 1 addition & 1 deletion tests/NetDaemon.Daemon.Tests/SchedulerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ await using (IScheduler scheduler = new Scheduler(mockTimeManager.Object))

// ASSERT
Assert.True(nrOfRuns == 0);
await Task.Delay(800).ConfigureAwait(false);
await Task.Delay(1000).ConfigureAwait(false);
Assert.True(nrOfRuns >= 1);
}

Expand Down