From 8fdcd1897423037f5c7361ba15e74183d698403c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomas=20Hellstr=C3=B6m?= Date: Fri, 22 Jul 2022 16:25:47 +0200 Subject: [PATCH 1/3] SubscribeSafe added. --- .../ObservableExtensionsTest.cs | 90 ++++++++++++++----- .../ObservableExtensions.cs | 82 ++++++++++++----- 2 files changed, 129 insertions(+), 43 deletions(-) diff --git a/src/HassModel/NetDaemon.HassModel.Tests/ObservableExtensionsTest.cs b/src/HassModel/NetDaemon.HassModel.Tests/ObservableExtensionsTest.cs index d814979df..26830840a 100644 --- a/src/HassModel/NetDaemon.HassModel.Tests/ObservableExtensionsTest.cs +++ b/src/HassModel/NetDaemon.HassModel.Tests/ObservableExtensionsTest.cs @@ -9,61 +9,111 @@ namespace NetDaemon.HassModel.Tests; public class ObservableExtensionsTest { - [Fact] public void TestSubscribeAsyncShouldCallAsyncFunction() { var testSubject = new Subject(); - var observerMock = new Mock>(); var str = string.Empty; - - testSubject.SubscribeAsync((a) => + + testSubject.SubscribeAsync(a => { str = a; return Task.CompletedTask; }); - + + testSubject.OnNext("hello"); + str.Should().Be("hello"); + } + + [Fact] + public void TestSubscribeSafeShouldCallFunction() + { + var testSubject = new Subject(); + var str = string.Empty; + + testSubject.SubscribeSafe(a => { str = a; }); + testSubject.OnNext("hello"); str.Should().Be("hello"); - } - + } + [Fact] public void TestSubscribeAsyncConcurrentShouldCallAsyncFunction() { var testSubject = new Subject(); var observerMock = new Mock>(); var str = string.Empty; - - testSubject.SubscribeAsyncConcurrent((a) => + + testSubject.SubscribeAsyncConcurrent(a => { str = a; return Task.CompletedTask; }); - + testSubject.OnNext("hello"); str.Should().Be("hello"); - } - + } + [Fact] public void TestSubscribeAsyncExceptionShouldCallErrorCallback() { var testSubject = new Subject(); - bool errorCalled = false; - var subscriber = testSubject.SubscribeAsync((a) => throw new InvalidOperationException(), + var errorCalled = false; + var subscriber = testSubject.SubscribeAsync(a => throw new InvalidOperationException(), e => errorCalled = true); - + testSubject.OnNext("hello"); errorCalled.Should().BeTrue(); - } - + } + + [Fact] + public void TestSubscribeAsyncExceptionShouldNotStopAfterException() + { + var testSubject = new Subject(); + var nrCalled = 0; + var subscriber = testSubject.SubscribeAsync(a => throw new InvalidOperationException(), + e => nrCalled++); + + testSubject.OnNext("hello"); + testSubject.OnNext("hello"); + + nrCalled.Should().Be(2); + } + + [Fact] + public void TestSubscribeSafeExceptionShouldCallErrorCallback() + { + var testSubject = new Subject(); + var errorCalled = false; + var subscriber = testSubject.SubscribeSafe(a => throw new InvalidOperationException(), + e => errorCalled = true); + + testSubject.OnNext("hello"); + errorCalled.Should().BeTrue(); + } + + [Fact] + public void TestSubscribeSafeMultipleExceptionShouldNotStopAfterException() + { + var testSubject = new Subject(); + var nrCalled = 0; + var subscriber = testSubject.SubscribeSafe(a => throw new InvalidOperationException(), + e => nrCalled++); + + testSubject.OnNext("hello"); + testSubject.OnNext("hello"); + + nrCalled.Should().Be(2); + } + [Fact] public void TestSubscribeAsyncConcurrentExceptionShouldCallErrorCallback() { var testSubject = new Subject(); - bool errorCalled = false; - var subscriber = testSubject.SubscribeAsyncConcurrent((a) => throw new InvalidOperationException(), + var errorCalled = false; + var subscriber = testSubject.SubscribeAsyncConcurrent(a => throw new InvalidOperationException(), e => errorCalled = true); - + testSubject.OnNext("hello"); errorCalled.Should().BeTrue(); } diff --git a/src/HassModel/NetDeamon.HassModel/ObservableExtensions.cs b/src/HassModel/NetDeamon.HassModel/ObservableExtensions.cs index 36117e0e5..fcd178643 100644 --- a/src/HassModel/NetDeamon.HassModel/ObservableExtensions.cs +++ b/src/HassModel/NetDeamon.HassModel/ObservableExtensions.cs @@ -9,16 +9,15 @@ public static class ObservableExtensions /// Allows calling async function but does this in a serial way. Long running tasks will block /// other subscriptions /// - public static IDisposable SubscribeAsync(this IObservable source, Func onNextAsync, Action? onError = null) + public static IDisposable SubscribeAsync(this IObservable source, Func onNextAsync, + Action? onError = null) { return source - .Select(e => Observable.FromAsync(() => HandleTask(onNextAsync, e, onError))) + .Select(e => Observable.FromAsync(() => HandleTaskSafeAsync(onNextAsync, e, onError))) .Concat() .Subscribe(); } - // public static IDisposable Subscribe([NotNull] this IObservable source, [NotNull] Action onNext) - // in class ObservableExtensions /// /// Allows calling async function. Order of messages is not guaranteed /// @@ -26,16 +25,40 @@ public static IDisposable SubscribeAsyncConcurrent(this IObservable source Action? onError = null) { return source - .Select(async e => await Observable.FromAsync(() => HandleTask(onNextAsync, e, onError))) + .Select(async e => await Observable.FromAsync(() => HandleTaskSafeAsync(onNextAsync, e, onError))) .Merge() .Subscribe(); - } - - private static async Task HandleTask(Func onNextAsync, T e, Action? onError = null) + } + + /// + /// Allows calling async function with max nr of concurrent messages. Order of messages is not guaranteed + /// + public static IDisposable SubscribeAsyncConcurrent(this IObservable source, Func onNextAsync, + int maxConcurrent) + { + return source + .Select(e => Observable.FromAsync(() => HandleTaskSafeAsync(onNextAsync, e))) + .Merge(maxConcurrent) + .Subscribe(); + } + + /// + /// Subscribe safely where unhandled exception does not unsubscribe and always log error + /// + public static IDisposable SubscribeSafe(this IObservable source, Action onNext, + Action? onError = null) + { + return source + .Select(e => HandleTaskSafe(onNext, e, onError)) + .Subscribe(); + } + + [SuppressMessage("", "CA1031")] + private static T HandleTaskSafe(Action onNext, T e, Action? onError = null) { try { - await onNextAsync(e).ConfigureAwait(false); + onNext(e); } catch (Exception ex) { @@ -45,24 +68,37 @@ private static async Task HandleTask(Func onNextAsync, T e, Action { x.AddConsole(); }).CreateLogger(); - + using var loggerFactory = LoggerFactory.Create(x => { x.AddConsole(); }); + var logger = loggerFactory.CreateLogger(); logger.LogError(ex, - "SubscribeConcurrent throws an unhandled Exception, please use error callback function to do proper logging"); - throw; + "Error on SubscribeSafe"); } } + + return e; } - /// - /// Allows calling async function with max nr of concurrent messages. Order of messages is not guaranteed - /// - public static IDisposable SubscribeAsyncConcurrent(this IObservable source, Func onNextAsync, - int maxConcurrent) + [SuppressMessage("", "CA1031")] + private static async Task HandleTaskSafeAsync(Func onNextAsync, T e, Action? onError = null) { - return source - .Select(e => Observable.FromAsync(() => HandleTask(onNextAsync, e))) - .Merge(maxConcurrent) - .Subscribe(); + try + { + await onNextAsync(e).ConfigureAwait(false); + } + catch (Exception ex) + { + if (onError is not null) + { + onError(ex); + } + else + { + using var loggerFactory = LoggerFactory.Create(x => { x.AddConsole(); }); + var logger = loggerFactory.CreateLogger(); + + logger.LogError(ex, + "SubscribeConcurrent throws an unhandled Exception, please use error callback function to do proper logging"); + } + } } -} \ No newline at end of file +} From abe92676c311571435bc90992cbda3ca7e5a01a9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomas=20Hellstr=C3=B6m?= Date: Sat, 23 Jul 2022 00:54:53 +0200 Subject: [PATCH 2/3] Added ILogger overload --- .../ObservableExtensions.cs | 55 ++++++++++++++++--- 1 file changed, 46 insertions(+), 9 deletions(-) diff --git a/src/HassModel/NetDeamon.HassModel/ObservableExtensions.cs b/src/HassModel/NetDeamon.HassModel/ObservableExtensions.cs index fcd178643..21fe32d65 100644 --- a/src/HassModel/NetDeamon.HassModel/ObservableExtensions.cs +++ b/src/HassModel/NetDeamon.HassModel/ObservableExtensions.cs @@ -13,7 +13,20 @@ public static IDisposable SubscribeAsync(this IObservable source, Func? onError = null) { return source - .Select(e => Observable.FromAsync(() => HandleTaskSafeAsync(onNextAsync, e, onError))) + .Select(e => Observable.FromAsync(() => HandleTaskSafeAsync(onNextAsync, e, onError, null))) + .Concat() + .Subscribe(); + } + + /// + /// Allows calling async function but does this in a serial way. Long running tasks will block + /// other subscriptions + /// + public static IDisposable SubscribeAsync(this IObservable source, Func onNextAsync, + ILogger logger) + { + return source + .Select(e => Observable.FromAsync(() => HandleTaskSafeAsync(onNextAsync, e, null, logger))) .Concat() .Subscribe(); } @@ -25,7 +38,7 @@ public static IDisposable SubscribeAsyncConcurrent(this IObservable source Action? onError = null) { return source - .Select(async e => await Observable.FromAsync(() => HandleTaskSafeAsync(onNextAsync, e, onError))) + .Select(async e => await Observable.FromAsync(() => HandleTaskSafeAsync(onNextAsync, e, onError, null))) .Merge() .Subscribe(); } @@ -34,10 +47,22 @@ public static IDisposable SubscribeAsyncConcurrent(this IObservable source /// Allows calling async function with max nr of concurrent messages. Order of messages is not guaranteed /// public static IDisposable SubscribeAsyncConcurrent(this IObservable source, Func onNextAsync, - int maxConcurrent) + int maxConcurrent, Action? onError = null) + { + return source + .Select(e => Observable.FromAsync(() => HandleTaskSafeAsync(onNextAsync, e, onError, null))) + .Merge(maxConcurrent) + .Subscribe(); + } + + /// + /// Allows calling async function with max nr of concurrent messages. Order of messages is not guaranteed + /// + public static IDisposable SubscribeAsyncConcurrent(this IObservable source, Func onNextAsync, + int maxConcurrent, ILogger? logger = null) { return source - .Select(e => Observable.FromAsync(() => HandleTaskSafeAsync(onNextAsync, e))) + .Select(e => Observable.FromAsync(() => HandleTaskSafeAsync(onNextAsync, e, onError:null, logger))) .Merge(maxConcurrent) .Subscribe(); } @@ -49,12 +74,24 @@ public static IDisposable SubscribeSafe(this IObservable source, Action Action? onError = null) { return source - .Select(e => HandleTaskSafe(onNext, e, onError)) + .Select(e => HandleTaskSafe(onNext, e, onError, null)) + .Subscribe(); + } + + /// + /// Subscribe safely where unhandled exception does not unsubscribe and always log error + /// to provided ILogger + /// + public static IDisposable SubscribeSafe(this IObservable source, Action onNext, + ILogger logger) + { + return source + .Select(e => HandleTaskSafe(onNext, e, null, logger)) .Subscribe(); } [SuppressMessage("", "CA1031")] - private static T HandleTaskSafe(Action onNext, T e, Action? onError = null) + private static T HandleTaskSafe(Action onNext, T e, Action? onError = null, ILogger? logger = null) { try { @@ -69,7 +106,7 @@ private static T HandleTaskSafe(Action onNext, T e, Action? onE else { using var loggerFactory = LoggerFactory.Create(x => { x.AddConsole(); }); - var logger = loggerFactory.CreateLogger(); + logger ??= loggerFactory.CreateLogger(); logger.LogError(ex, "Error on SubscribeSafe"); } @@ -79,7 +116,7 @@ private static T HandleTaskSafe(Action onNext, T e, Action? onE } [SuppressMessage("", "CA1031")] - private static async Task HandleTaskSafeAsync(Func onNextAsync, T e, Action? onError = null) + private static async Task HandleTaskSafeAsync(Func onNextAsync, T e, Action? onError = null, ILogger? logger = null) { try { @@ -94,7 +131,7 @@ private static async Task HandleTaskSafeAsync(Func onNextAsync, T e, else { using var loggerFactory = LoggerFactory.Create(x => { x.AddConsole(); }); - var logger = loggerFactory.CreateLogger(); + logger = logger ??= loggerFactory.CreateLogger(); logger.LogError(ex, "SubscribeConcurrent throws an unhandled Exception, please use error callback function to do proper logging"); From 0ffdd13d927563c9cabea6988115b5635e67cca4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomas=20Hellstr=C3=B6m?= Date: Sun, 24 Jul 2022 09:48:15 +0200 Subject: [PATCH 3/3] Fixed check if logger exists before creating factory --- .../NetDeamon.HassModel/ObservableExtensions.cs | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/src/HassModel/NetDeamon.HassModel/ObservableExtensions.cs b/src/HassModel/NetDeamon.HassModel/ObservableExtensions.cs index 21fe32d65..928021e8c 100644 --- a/src/HassModel/NetDeamon.HassModel/ObservableExtensions.cs +++ b/src/HassModel/NetDeamon.HassModel/ObservableExtensions.cs @@ -105,8 +105,11 @@ private static T HandleTaskSafe(Action onNext, T e, Action? onE } else { - using var loggerFactory = LoggerFactory.Create(x => { x.AddConsole(); }); - logger ??= loggerFactory.CreateLogger(); + if (logger is null) + { + using var loggerFactory = LoggerFactory.Create(x => { x.AddConsole(); }); + logger = loggerFactory.CreateLogger(); + } logger.LogError(ex, "Error on SubscribeSafe"); } @@ -130,9 +133,11 @@ private static async Task HandleTaskSafeAsync(Func onNextAsync, T e, } else { - using var loggerFactory = LoggerFactory.Create(x => { x.AddConsole(); }); - logger = logger ??= loggerFactory.CreateLogger(); - + if (logger is null) + { + using var loggerFactory = LoggerFactory.Create(x => { x.AddConsole(); }); + logger = loggerFactory.CreateLogger(); + } logger.LogError(ex, "SubscribeConcurrent throws an unhandled Exception, please use error callback function to do proper logging"); }