Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 70 additions & 20 deletions src/HassModel/NetDaemon.HassModel.Tests/ObservableExtensionsTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,61 +9,111 @@ namespace NetDaemon.HassModel.Tests;

public class ObservableExtensionsTest
{

[Fact]
public void TestSubscribeAsyncShouldCallAsyncFunction()
{
var testSubject = new Subject<string>();
var observerMock = new Mock<IObserver<string>>();
var str = string.Empty;
testSubject.SubscribeAsync((a) =>

testSubject.SubscribeAsync(a =>
{
str = a;
return Task.CompletedTask;
});


testSubject.OnNext("hello");
str.Should().Be("hello");
}

[Fact]
public void TestSubscribeSafeShouldCallFunction()
{
var testSubject = new Subject<string>();
var str = string.Empty;

testSubject.SubscribeSafe(a => { str = a; });

testSubject.OnNext("hello");
str.Should().Be("hello");
}
}

[Fact]
public void TestSubscribeAsyncConcurrentShouldCallAsyncFunction()
{
var testSubject = new Subject<string>();
var observerMock = new Mock<IObserver<string>>();
var str = string.Empty;
testSubject.SubscribeAsyncConcurrent((a) =>

testSubject.SubscribeAsyncConcurrent(a =>
{
str = a;
return Task.CompletedTask;
});

testSubject.OnNext("hello");
str.Should().Be("hello");
}
}

[Fact]
public void TestSubscribeAsyncExceptionShouldCallErrorCallback()
{
var testSubject = new Subject<string>();
bool errorCalled = false;
var subscriber = testSubject.SubscribeAsync((a) => throw new InvalidOperationException(),
var errorCalled = false;
var subscriber = testSubject.SubscribeAsync(a => throw new InvalidOperationException(),
e => errorCalled = true);

testSubject.OnNext("hello");
errorCalled.Should().BeTrue();
}

}

[Fact]
public void TestSubscribeAsyncExceptionShouldNotStopAfterException()
{
var testSubject = new Subject<string>();
var nrCalled = 0;
var subscriber = testSubject.SubscribeAsync(a => throw new InvalidOperationException(),
e => nrCalled++);

testSubject.OnNext("hello");
testSubject.OnNext("hello");

nrCalled.Should().Be(2);
}

[Fact]
public void TestSubscribeSafeExceptionShouldCallErrorCallback()
{
var testSubject = new Subject<string>();
var errorCalled = false;
var subscriber = testSubject.SubscribeSafe(a => throw new InvalidOperationException(),
e => errorCalled = true);

testSubject.OnNext("hello");
errorCalled.Should().BeTrue();
}

[Fact]
public void TestSubscribeSafeMultipleExceptionShouldNotStopAfterException()
{
var testSubject = new Subject<string>();
var nrCalled = 0;
var subscriber = testSubject.SubscribeSafe(a => throw new InvalidOperationException(),
e => nrCalled++);

testSubject.OnNext("hello");
testSubject.OnNext("hello");

nrCalled.Should().Be(2);
}

[Fact]
public void TestSubscribeAsyncConcurrentExceptionShouldCallErrorCallback()
{
var testSubject = new Subject<string>();
bool errorCalled = false;
var subscriber = testSubject.SubscribeAsyncConcurrent((a) => throw new InvalidOperationException(),
var errorCalled = false;
var subscriber = testSubject.SubscribeAsyncConcurrent(a => throw new InvalidOperationException(),
e => errorCalled = true);

testSubject.OnNext("hello");
errorCalled.Should().BeTrue();
}
Expand Down
124 changes: 101 additions & 23 deletions src/HassModel/NetDeamon.HassModel/ObservableExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,33 +9,93 @@ public static class ObservableExtensions
/// Allows calling async function but does this in a serial way. Long running tasks will block
/// other subscriptions
/// </summary>
public static IDisposable SubscribeAsync<T>(this IObservable<T> source, Func<T, Task> onNextAsync, Action<Exception>? onError = null)
public static IDisposable SubscribeAsync<T>(this IObservable<T> source, Func<T, Task> onNextAsync,
Action<Exception>? onError = null)
{
return source
.Select(e => Observable.FromAsync(() => HandleTask(onNextAsync, e, onError)))
.Select(e => Observable.FromAsync(() => HandleTaskSafeAsync(onNextAsync, e, onError, null)))
.Concat()
.Subscribe();
}

/// <summary>
/// Allows calling async function but does this in a serial way. Long running tasks will block
/// other subscriptions
/// </summary>
public static IDisposable SubscribeAsync<T>(this IObservable<T> source, Func<T, Task> onNextAsync,
ILogger logger)
{
return source
.Select(e => Observable.FromAsync(() => HandleTaskSafeAsync(onNextAsync, e, null, logger)))
.Concat()
.Subscribe();
}

// public static IDisposable Subscribe<T>([NotNull] this IObservable<T> source, [NotNull] Action<T> onNext)
// in class ObservableExtensions
/// <summary>
/// Allows calling async function. Order of messages is not guaranteed
/// </summary>
public static IDisposable SubscribeAsyncConcurrent<T>(this IObservable<T> source, Func<T, Task> onNextAsync,
Action<Exception>? onError = null)
{
return source
.Select(async e => await Observable.FromAsync(() => HandleTask(onNextAsync, e, onError)))
.Select(async e => await Observable.FromAsync(() => HandleTaskSafeAsync(onNextAsync, e, onError, null)))
.Merge()
.Subscribe();
}

private static async Task HandleTask<T>(Func<T, Task> onNextAsync, T e, Action<Exception>? onError = null)
}

/// <summary>
/// Allows calling async function with max nr of concurrent messages. Order of messages is not guaranteed
/// </summary>
public static IDisposable SubscribeAsyncConcurrent<T>(this IObservable<T> source, Func<T, Task> onNextAsync,
int maxConcurrent, Action<Exception>? onError = null)
{
return source
.Select(e => Observable.FromAsync(() => HandleTaskSafeAsync(onNextAsync, e, onError, null)))
.Merge(maxConcurrent)
.Subscribe();
}

/// <summary>
/// Allows calling async function with max nr of concurrent messages. Order of messages is not guaranteed
/// </summary>
public static IDisposable SubscribeAsyncConcurrent<T>(this IObservable<T> source, Func<T, Task> onNextAsync,
int maxConcurrent, ILogger? logger = null)
{
return source
.Select(e => Observable.FromAsync(() => HandleTaskSafeAsync(onNextAsync, e, onError:null, logger)))
.Merge(maxConcurrent)
.Subscribe();
}

/// <summary>
/// Subscribe safely where unhandled exception does not unsubscribe and always log error
/// </summary>
public static IDisposable SubscribeSafe<T>(this IObservable<T> source, Action<T> onNext,
Action<Exception>? onError = null)
{
return source
.Select(e => HandleTaskSafe(onNext, e, onError, null))
.Subscribe();
}

/// <summary>
/// Subscribe safely where unhandled exception does not unsubscribe and always log error
/// to provided ILogger
/// </summary>
public static IDisposable SubscribeSafe<T>(this IObservable<T> source, Action<T> onNext,
ILogger logger)
{
return source
.Select(e => HandleTaskSafe(onNext, e, null, logger))
.Subscribe();
}

[SuppressMessage("", "CA1031")]
private static T HandleTaskSafe<T>(Action<T> onNext, T e, Action<Exception>? onError = null, ILogger? logger = null)
{
try
{
await onNextAsync(e).ConfigureAwait(false);
onNext(e);
}
catch (Exception ex)
{
Expand All @@ -45,24 +105,42 @@ private static async Task HandleTask<T>(Func<T, Task> onNextAsync, T e, Action<E
}
else
{
var logger = LoggerFactory.Create(x => { x.AddConsole(); }).CreateLogger<IHaContext>();

if (logger is null)
{
using var loggerFactory = LoggerFactory.Create(x => { x.AddConsole(); });
logger = loggerFactory.CreateLogger<IHaContext>();
}
logger.LogError(ex,
"SubscribeConcurrent throws an unhandled Exception, please use error callback function to do proper logging");
throw;
"Error on SubscribeSafe");
}
}

return e;
}

/// <summary>
/// Allows calling async function with max nr of concurrent messages. Order of messages is not guaranteed
/// </summary>
public static IDisposable SubscribeAsyncConcurrent<T>(this IObservable<T> source, Func<T, Task> onNextAsync,
int maxConcurrent)
[SuppressMessage("", "CA1031")]
private static async Task HandleTaskSafeAsync<T>(Func<T, Task> onNextAsync, T e, Action<Exception>? onError = null, ILogger? logger = null)
{
return source
.Select(e => Observable.FromAsync(() => HandleTask(onNextAsync, e)))
.Merge(maxConcurrent)
.Subscribe();
try
{
await onNextAsync(e).ConfigureAwait(false);
}
catch (Exception ex)
{
if (onError is not null)
{
onError(ex);
}
else
{
if (logger is null)
{
using var loggerFactory = LoggerFactory.Create(x => { x.AddConsole(); });
logger = loggerFactory.CreateLogger<IHaContext>();
}
logger.LogError(ex,
"SubscribeConcurrent throws an unhandled Exception, please use error callback function to do proper logging");
}
}
}
}
}