diff --git a/README.md b/README.md index 9940f5f..5d29be0 100644 --- a/README.md +++ b/README.md @@ -124,6 +124,7 @@ Subscriptions and scheduled work return `IDisposable`. ReactiveUI.Primitives inc | `BooleanDisposable` | Track simple disposed state. | | `CancellationDisposable` | Tie disposal to a `CancellationTokenSource`. | | `MultipleDisposable` | Composite-disposable equivalent; add/remove multiple disposables. | +| `CompositeDisposable` | System.Reactive-compatible alias over `MultipleDisposable`. | | `Pocket` | Named `MultipleDisposable` specialization. | | `SingleDisposable` / `AssignmentSlot` | Single-assignment disposable container. | | `SingleReplaceableDisposable` / `Slot` | Replaceable disposable container. | @@ -165,8 +166,10 @@ Creation APIs live on `ReactiveUI.Primitives.Signals.Signal`. | `Signal.Unfold(...)` | Generate a finite sequence from state. | | `Signal.Use(...)` | Tie a resource lifetime to a subscription. | | `Signal.FromEnumerable(IEnumerable)` | Convert an enumerable. | +| `Signal.FromEnumerable(IEnumerable, CancellationToken)` | Convert an enumerable and stop synchronous enumeration when cancelled. | | `Signal.FromAsyncEnumerable(IAsyncEnumerable, CancellationToken)` | Convert an async enumerable on modern TFMs. | | `Signal.FromTask(Task)` | Convert a task to a signal. | +| `Signal.FromAsync(...)` | Invoke a task factory per subscription. | | `Signal.After(TimeSpan, ISequencer?)` | Emit one `long` tick after a delay. | | `Signal.Every(TimeSpan, ISequencer?)` | Emit increasing `long` ticks repeatedly. | | `Signal.Pulse(...)` | Alias of `Every`. | @@ -342,7 +345,7 @@ ReactiveUI.Primitives uses explicit names instead of cloning every System.Reacti | System.Reactive type | ReactiveUI.Primitives equivalent | Notes | |---|---|---| | `Subject` | `Signal` | Push values, errors, and completion to subscribers. | -| `BehaviorSubject` | `BehaviourSignal` or `StateSignal` | Stores the latest value and emits it to new subscribers. `StateSignal` adds a mutable `Value` setter and `Changed`. | +| `BehaviorSubject` | `BehaviorSignal`, or `StateSignal` | Stores the latest value and emits it to new subscribers. `StateSignal` adds a mutable `Value` setter and `Changed`. | | `ReplaySubject` | `ReplaySignal` | Replays buffered values by size and/or time window. | | `AsyncSubject` | `AsyncSignal` | Awaitable subject-like signal; also implements `IAwaitSignal`. | | `ReactiveProperty` / state holder | `StateSignal` plus `ReadOnlyState` | Mutable state and read-only projected state. | @@ -493,12 +496,13 @@ ReactiveUI.Primitives is not a byte-for-byte clone of System.Reactive. It keeps | `Observable.Repeat(value)` | `Signal.Repeat(value)` | Indefinite repeat. | | `Observable.Repeat(value, count)` | `Signal.Repeat(value, count)` | Fixed repeat. | | `Observable.Defer(factory)` | `Signal.Defer(factory)` | Create source per subscription. | +| `Observable.FromAsync(...)` | `Signal.FromAsync(...)` | Invoke a task factory per subscription. | | `Observable.Create(...)` | `Signal.Create(...)` or `Signal.CreateSafe(...)` | Prefer `CreateSafe` for general custom sources. | | `Observable.Using(...)` | `Signal.Use(...)` | Resource scoped to subscription. | | `Observable.Timer(dueTime)` | `Signal.Timer(dueTime)` or `Signal.After(dueTime)` | Emits `long` tick `0`. | | `Observable.Timer(dueTime, period)` | `Signal.Timer(dueTime, period)` | Periodic `long` ticks. | | `Observable.Interval(period)` | `Signal.Interval(period)` or `Signal.Every(period)` | Repeating ticks. | -| `ToObservable()` from enumerable | `Signal.FromEnumerable(values)` or `values.ToSignal()` | `ToSignal` extension is available. | +| `ToObservable()` from enumerable | `Signal.FromEnumerable(values)`, `values.ToSignal()`, or `values.ToObservable()` | Cancellation-token overloads are available. | | task conversion | `Signal.FromTask(task)` | Function-based task signals also exist. | ### Subject/state mapping @@ -506,7 +510,7 @@ ReactiveUI.Primitives is not a byte-for-byte clone of System.Reactive. It keeps | System.Reactive | ReactiveUI.Primitives | Migration detail | |---|---|---| | `new Subject()` | `new Signal()` | Use `OnNext`, `OnError`, `OnCompleted`, and `Subscribe`. | -| `new BehaviorSubject(initial)` | `new BehaviourSignal(initial)` | Keeps `Value` getter and emits latest value to subscribers. | +| `new BehaviorSubject(initial)` | `new BehaviorSignal(initial)` | Keeps `Value` getter and emits latest value to subscribers. | | mutable reactive property | `new StateSignal(initial)` | Set `Value` to emit. Use `Changed` for observable state stream. | | `new ReplaySubject()` | `new ReplaySignal()` | Unbounded replay. | | `new ReplaySubject(bufferSize)` | `new ReplaySignal(bufferSize)` | Size-limited replay. | @@ -546,6 +550,7 @@ ReactiveUI.Primitives is not a byte-for-byte clone of System.Reactive. It keeps | `Buffer(count)` | `Buffer(count)` | Fixed-size buffers. | | `ToList` / `ToArray` | `CollectList` / `CollectArray` | Signal results. | | `FirstAsync` | `FirstAsync` | Task result. | +| `CountAsync` / `AnyAsync` | `CountAsync` / `AnyAsync` | Task-shaped terminal helpers, including cancellation overloads. | ### Disposable mapping diff --git a/src/ReactiveUI.Primitives.R3Bridge.Generator/R3BridgeGenerator.cs b/src/ReactiveUI.Primitives.R3Bridge.Generator/R3BridgeGenerator.cs index 9a09610..768a365 100644 --- a/src/ReactiveUI.Primitives.R3Bridge.Generator/R3BridgeGenerator.cs +++ b/src/ReactiveUI.Primitives.R3Bridge.Generator/R3BridgeGenerator.cs @@ -53,7 +53,7 @@ internal static class R3SignalBridge throw new global::System.ArgumentNullException(nameof(source)); } - return global::ReactiveUI.Primitives.Signals.Signal.Create(observer => source.Subscribe(observer)); + return global::ReactiveUI.Primitives.Signals.Signal.Create(observer => source.Subscribe(new R3ToPrimitivesObserver(observer))); } /// @@ -66,7 +66,42 @@ internal static class R3SignalBridge throw new global::System.ArgumentNullException(nameof(source)); } - return global::R3.Observable.Create(observer => source.Subscribe(observer)); + return global::R3.Observable.Create(observer => source.Subscribe(new PrimitivesToR3Observer(observer))); + } + + private sealed class R3ToPrimitivesObserver : global::R3.Observer + { + private readonly global::System.IObserver _observer; + + public R3ToPrimitivesObserver(global::System.IObserver observer) => _observer = observer; + + protected override void OnNextCore(T value) => _observer.OnNext(value); + + protected override void OnErrorResumeCore(global::System.Exception error) => _observer.OnError(error); + + protected override void OnCompletedCore(global::R3.Result result) + { + if (result.IsFailure) + { + _observer.OnError(result.Exception); + return; + } + + _observer.OnCompleted(); + } + } + + private sealed class PrimitivesToR3Observer : global::System.IObserver + { + private readonly global::R3.Observer _observer; + + public PrimitivesToR3Observer(global::R3.Observer observer) => _observer = observer; + + public void OnNext(T value) => _observer.OnNext(value); + + public void OnError(global::System.Exception error) => _observer.OnCompleted(global::R3.Result.Failure(error)); + + public void OnCompleted() => _observer.OnCompleted(global::R3.Result.Success); } } """; diff --git a/src/ReactiveUI.Primitives/Concurrency/CurrentThreadSequencer.cs b/src/ReactiveUI.Primitives/Concurrency/CurrentThreadSequencer.cs index 751a6fa..f829b4c 100644 --- a/src/ReactiveUI.Primitives/Concurrency/CurrentThreadSequencer.cs +++ b/src/ReactiveUI.Primitives/Concurrency/CurrentThreadSequencer.cs @@ -10,8 +10,9 @@ namespace ReactiveUI.Primitives.Concurrency; /// /// CurrentThreadSequencer. /// -/// -public sealed class CurrentThreadSequencer : ISequencer +/// +[DebuggerDisplay("{DebuggerDisplay,nq}")] +public sealed partial class CurrentThreadSequencer : ISequencer { /// /// Singleton holder for the current-thread sequencer. @@ -103,7 +104,7 @@ public IDisposable Schedule(TState state, Func /// The disposable object used to cancel the scheduled action (best effort). /// - /// action. + /// action. /// is null. public IDisposable Schedule(TState state, TimeSpan dueTime, Func action) { diff --git a/src/ReactiveUI.Primitives/Concurrency/DispatcherSequencer.cs b/src/ReactiveUI.Primitives/Concurrency/DispatcherSequencer.cs index a64c117..523cca6 100644 --- a/src/ReactiveUI.Primitives/Concurrency/DispatcherSequencer.cs +++ b/src/ReactiveUI.Primitives/Concurrency/DispatcherSequencer.cs @@ -15,7 +15,8 @@ namespace ReactiveUI.Primitives.Concurrency; /// DispatcherSequencer. /// /// -public class DispatcherSequencer : ISequencer +[System.Diagnostics.DebuggerDisplay("{DebuggerDisplay,nq}")] +public partial class DispatcherSequencer : ISequencer { /// /// Initializes a new instance of the class. diff --git a/src/ReactiveUI.Primitives/Concurrency/ImmediateSequencer.cs b/src/ReactiveUI.Primitives/Concurrency/ImmediateSequencer.cs index e8cdf0e..7728800 100644 --- a/src/ReactiveUI.Primitives/Concurrency/ImmediateSequencer.cs +++ b/src/ReactiveUI.Primitives/Concurrency/ImmediateSequencer.cs @@ -7,8 +7,9 @@ namespace ReactiveUI.Primitives.Concurrency; /// /// ImmediateSequencer. /// -/// -public sealed class ImmediateSequencer : ISequencer +/// +[System.Diagnostics.DebuggerDisplay("{DebuggerDisplay,nq}")] +public sealed partial class ImmediateSequencer : ISequencer { /// /// Singleton holder for the immediate sequencer. diff --git a/src/ReactiveUI.Primitives/Concurrency/ScheduledItem.cs b/src/ReactiveUI.Primitives/Concurrency/ScheduledItem.cs index 081c3f3..08f6cae 100644 --- a/src/ReactiveUI.Primitives/Concurrency/ScheduledItem.cs +++ b/src/ReactiveUI.Primitives/Concurrency/ScheduledItem.cs @@ -11,7 +11,8 @@ namespace ReactiveUI.Primitives.Concurrency; /// Abstract base class for scheduled work items. /// /// Absolute time representation type. -public abstract class ScheduledItem : IScheduledItem, IComparable>, IsDisposed, IComparable +[System.Diagnostics.DebuggerDisplay("{DebuggerDisplay,nq}")] +public abstract partial class ScheduledItem : IScheduledItem, IComparable>, IsDisposed, IComparable where TAbsolute : IComparable { /// @@ -35,7 +36,7 @@ public abstract class ScheduledItem : IScheduledItem, ICom /// /// Absolute time at which the work item has to be executed. /// Comparer used to compare work items based on their scheduled time. - /// comparer. + /// comparer. /// is null. protected ScheduledItem(TAbsolute dueTime, IComparer comparer) { diff --git a/src/ReactiveUI.Primitives/Concurrency/ScheduledItem{TAbsolute,TValue}.cs b/src/ReactiveUI.Primitives/Concurrency/ScheduledItem{TAbsolute,TValue}.cs index f8c1633..6c1d445 100644 --- a/src/ReactiveUI.Primitives/Concurrency/ScheduledItem{TAbsolute,TValue}.cs +++ b/src/ReactiveUI.Primitives/Concurrency/ScheduledItem{TAbsolute,TValue}.cs @@ -9,7 +9,8 @@ namespace ReactiveUI.Primitives.Concurrency; /// /// Absolute time representation type. /// Type of the state passed to the scheduled action. -public sealed class ScheduledItem : ScheduledItem +[System.Diagnostics.DebuggerDisplay("{DebuggerDisplay,nq}")] +public sealed partial class ScheduledItem : ScheduledItem where TAbsolute : IComparable { /// diff --git a/src/ReactiveUI.Primitives/Concurrency/Sequencer.Simple.cs b/src/ReactiveUI.Primitives/Concurrency/Sequencer.Simple.cs index 17d3df4..d303b57 100644 --- a/src/ReactiveUI.Primitives/Concurrency/Sequencer.Simple.cs +++ b/src/ReactiveUI.Primitives/Concurrency/Sequencer.Simple.cs @@ -42,7 +42,7 @@ public static IDisposable Schedule(this ISequencer scheduler, Action action) /// /// The disposable object used to cancel the scheduled action (best effort). /// - /// + /// /// scheduler /// or /// action. @@ -72,7 +72,7 @@ public static IDisposable Schedule(this ISequencer scheduler, TimeSpan dueTime, /// /// The disposable object used to cancel the scheduled action (best effort). /// - /// + /// /// scheduler /// or /// action. @@ -124,7 +124,7 @@ public static IDisposable Schedule(this ISequencer scheduler, Action act /// /// The disposable object used to cancel the scheduled action (best effort). /// - /// + /// /// scheduler /// or /// action. @@ -190,7 +190,7 @@ internal static IDisposable ScheduleAction(this ISequencer scheduler, TS /// /// The disposable object used to cancel the scheduled action (best effort). /// - /// + /// /// scheduler /// or /// action. @@ -222,7 +222,7 @@ internal static IDisposable ScheduleAction(this ISequencer scheduler, TS /// /// The disposable object used to cancel the scheduled action (best effort). /// - /// + /// /// scheduler /// or /// action. @@ -254,7 +254,7 @@ internal static IDisposable ScheduleAction(this ISequencer scheduler, TS /// /// The disposable object used to cancel the scheduled action (best effort). /// - /// + /// /// scheduler /// or /// action. @@ -286,7 +286,7 @@ internal static IDisposable ScheduleAction(this ISequencer scheduler, TS /// /// The disposable object used to cancel the scheduled action (best effort). /// - /// + /// /// scheduler /// or /// action. diff --git a/src/ReactiveUI.Primitives/Concurrency/SequencerQueue.cs b/src/ReactiveUI.Primitives/Concurrency/SequencerQueue.cs index dbb2655..44c0fca 100644 --- a/src/ReactiveUI.Primitives/Concurrency/SequencerQueue.cs +++ b/src/ReactiveUI.Primitives/Concurrency/SequencerQueue.cs @@ -11,7 +11,8 @@ namespace ReactiveUI.Primitives.Concurrency; /// /// Absolute time representation type. /// This type is not thread safe; users should ensure proper synchronization. -public class SequencerQueue +[System.Diagnostics.DebuggerDisplay("{DebuggerDisplay,nq}")] +public partial class SequencerQueue where TAbsolute : IComparable { /// diff --git a/src/ReactiveUI.Primitives/Concurrency/TaskPoolSequencer.cs b/src/ReactiveUI.Primitives/Concurrency/TaskPoolSequencer.cs index e811ede..8c5273c 100644 --- a/src/ReactiveUI.Primitives/Concurrency/TaskPoolSequencer.cs +++ b/src/ReactiveUI.Primitives/Concurrency/TaskPoolSequencer.cs @@ -9,8 +9,9 @@ namespace ReactiveUI.Primitives.Concurrency; /// /// TaskPoolSequencer. /// -/// -public sealed class TaskPoolSequencer : ISequencer +/// +[System.Diagnostics.DebuggerDisplay("{DebuggerDisplay,nq}")] +public sealed partial class TaskPoolSequencer : ISequencer { /// /// Task factory used to schedule asynchronous work. diff --git a/src/ReactiveUI.Primitives/Concurrency/TestClock.cs b/src/ReactiveUI.Primitives/Concurrency/TestClock.cs index 499069a..a49ab01 100644 --- a/src/ReactiveUI.Primitives/Concurrency/TestClock.cs +++ b/src/ReactiveUI.Primitives/Concurrency/TestClock.cs @@ -7,7 +7,8 @@ namespace ReactiveUI.Primitives.Concurrency; /// /// Test-facing alias for . /// -public sealed class TestClock : VirtualClock +[System.Diagnostics.DebuggerDisplay("{DebuggerDisplay,nq}")] +public sealed partial class TestClock : VirtualClock { /// /// Initializes a new instance of the class at the default clock value. diff --git a/src/ReactiveUI.Primitives/Concurrency/ThreadPoolSequencer.cs b/src/ReactiveUI.Primitives/Concurrency/ThreadPoolSequencer.cs index 2b5d9e3..ecc78f7 100644 --- a/src/ReactiveUI.Primitives/Concurrency/ThreadPoolSequencer.cs +++ b/src/ReactiveUI.Primitives/Concurrency/ThreadPoolSequencer.cs @@ -4,14 +4,16 @@ using ReactiveUI.Primitives.Disposables; using static ReactiveUI.Primitives.Disposables.Disposable; +using Timer = System.Threading.Timer; namespace ReactiveUI.Primitives.Concurrency { /// /// ThreadPoolSequencer. /// - /// - public sealed class ThreadPoolSequencer : ISequencer + /// + [System.Diagnostics.DebuggerDisplay("{DebuggerDisplay,nq}")] + public sealed partial class ThreadPoolSequencer : ISequencer { /// /// Gets the shared thread-pool scheduler instance. @@ -26,7 +28,7 @@ public sealed class ThreadPoolSequencer : ISequencer /// /// Keeps timers rooted until they fire or are cancelled. /// - internal static readonly Dictionary Timers = []; + internal static readonly Dictionary Timers = []; /// /// Initializes a new instance of the class. @@ -49,7 +51,7 @@ private ThreadPoolSequencer() /// /// The disposable object used to cancel the scheduled action (best effort). /// - /// action. + /// action. public IDisposable Schedule(TState state, Func action) { if (action == null) @@ -83,7 +85,7 @@ public IDisposable Schedule(TState state, Func /// The disposable object used to cancel the scheduled action (best effort). /// - /// action. + /// action. public IDisposable Schedule(TState state, TimeSpan dueTime, Func action) { if (action == null) @@ -94,7 +96,7 @@ public IDisposable Schedule(TState state, TimeSpan dueTime, Func { diff --git a/src/ReactiveUI.Primitives/Concurrency/VirtualClock.cs b/src/ReactiveUI.Primitives/Concurrency/VirtualClock.cs index 2227b84..139aae6 100644 --- a/src/ReactiveUI.Primitives/Concurrency/VirtualClock.cs +++ b/src/ReactiveUI.Primitives/Concurrency/VirtualClock.cs @@ -7,7 +7,8 @@ namespace ReactiveUI.Primitives.Concurrency; /// /// Deterministic virtual scheduler backed by and . /// -public class VirtualClock : VirtualTimeSequencer +[System.Diagnostics.DebuggerDisplay("{DebuggerDisplay,nq}")] +public partial class VirtualClock : VirtualTimeSequencer { /// /// Initializes a new instance of the class at the default clock value. diff --git a/src/ReactiveUI.Primitives/Concurrency/VirtualTimeSequencerBase{TAbsolute,TRelative}.cs b/src/ReactiveUI.Primitives/Concurrency/VirtualTimeSequencerBase{TAbsolute,TRelative}.cs index 739fc2c..2fbe289 100644 --- a/src/ReactiveUI.Primitives/Concurrency/VirtualTimeSequencerBase{TAbsolute,TRelative}.cs +++ b/src/ReactiveUI.Primitives/Concurrency/VirtualTimeSequencerBase{TAbsolute,TRelative}.cs @@ -11,7 +11,8 @@ namespace ReactiveUI.Primitives.Concurrency; /// /// Absolute time representation type. /// Relative time representation type. -public abstract class VirtualTimeSequencerBase : ISequencer, IServiceProvider, IStopwatchProvider +[System.Diagnostics.DebuggerDisplay("{DebuggerDisplay,nq}")] +public abstract partial class VirtualTimeSequencerBase : ISequencer, IServiceProvider, IStopwatchProvider where TAbsolute : IComparable { /// diff --git a/src/ReactiveUI.Primitives/Concurrency/VirtualTimeSequencer{TAbsolute,TRelative}.cs b/src/ReactiveUI.Primitives/Concurrency/VirtualTimeSequencer{TAbsolute,TRelative}.cs index 8c4feaa..a35f898 100644 --- a/src/ReactiveUI.Primitives/Concurrency/VirtualTimeSequencer{TAbsolute,TRelative}.cs +++ b/src/ReactiveUI.Primitives/Concurrency/VirtualTimeSequencer{TAbsolute,TRelative}.cs @@ -9,7 +9,8 @@ namespace ReactiveUI.Primitives.Concurrency; /// /// Absolute time representation type. /// Relative time representation type. -public abstract class VirtualTimeSequencer : VirtualTimeSequencerBase +[System.Diagnostics.DebuggerDisplay("{DebuggerDisplay,nq}")] +public abstract partial class VirtualTimeSequencer : VirtualTimeSequencerBase where TAbsolute : IComparable { /// @@ -47,7 +48,7 @@ protected VirtualTimeSequencer(TAbsolute initialClock, IComparer comp /// /// The disposable object used to cancel the scheduled action (best effort). /// - /// action. + /// action. /// is null. public override IDisposable ScheduleAbsolute(TState state, TAbsolute dueTime, Func action) { diff --git a/src/ReactiveUI.Primitives/ConnectableSignalMixins.cs b/src/ReactiveUI.Primitives/ConnectableSignalMixins.cs new file mode 100644 index 0000000..84c160a --- /dev/null +++ b/src/ReactiveUI.Primitives/ConnectableSignalMixins.cs @@ -0,0 +1,304 @@ +// Copyright (c) 2019-2026 ReactiveUI Association Incorporated. All rights reserved. +// ReactiveUI Association Incorporated licenses this file to you under the MIT license. +// See the LICENSE file in the project root for full license information. + +using ReactiveUI.Primitives.Disposables; +using ReactiveUI.Primitives.Signals; + +namespace ReactiveUI.Primitives; + +/// +/// Hot-sharing operators for Primitives connectable signals. +/// +public static class ConnectableSignalMixins +{ + /// + /// Multicasts source values through the supplied hub. + /// + /// The value type. + /// Source sequence to multicast. + /// Hub that receives source values. + /// A connectable signal. + public static ConnectableSignal Multicast(this IObservable source, ISignal hub) + { + if (source == null) + { + throw new ArgumentNullException(nameof(source)); + } + + if (hub == null) + { + throw new ArgumentNullException(nameof(hub)); + } + + return new ConnectableSignal(source, hub); + } + + /// + /// Publishes source values through a live signal hub. + /// + /// The value type. + /// Source sequence to publish. + /// A connectable live signal. + public static ConnectableSignal PublishLive(this IObservable source) => + source.Multicast(new Signal()); + + /// + /// Publishes source values through a live signal hub. + /// + /// The value type. + /// Source sequence to publish. + /// A connectable live signal. + public static ConnectableSignal Publish(this IObservable source) => + source.PublishLive(); + + /// + /// Replays source values through a bounded replay hub. + /// + /// The value type. + /// Source sequence to replay. + /// Maximum number of values to replay. + /// A connectable replay signal. + public static ConnectableSignal ReplayLive(this IObservable source, int bufferSize) => + source.Multicast(new ReplaySignal(bufferSize)); + + /// + /// Replays source values through a replay hub constrained by count and time. + /// + /// The value type. + /// Source sequence to replay. + /// Maximum number of values to replay. + /// Maximum replay window. + /// A connectable replay signal. + public static ConnectableSignal ReplayLive(this IObservable source, int bufferSize, TimeSpan window) => + source.Multicast(new ReplaySignal(bufferSize, window)); + + /// + /// Replays source values through a bounded replay hub. + /// + /// The value type. + /// Source sequence to replay. + /// Maximum number of values to replay. + /// A connectable replay signal. + public static ConnectableSignal Replay(this IObservable source, int bufferSize) => + source.ReplayLive(bufferSize); + + /// + /// Replays source values through a replay hub constrained by count and time. + /// + /// The value type. + /// Source sequence to replay. + /// Maximum number of values to replay. + /// Maximum replay window. + /// A connectable replay signal. + public static ConnectableSignal Replay(this IObservable source, int bufferSize, TimeSpan window) => + source.ReplayLive(bufferSize, window); + + /// + /// Shares one live source subscription while at least one observer is subscribed. + /// + /// The value type. + /// Source sequence to share. + /// A reference-counted live sequence. + public static IObservable ShareLive(this IObservable source) => source.PublishLive().RefCount(); + + /// + /// Shares one live source subscription while at least one observer is subscribed. + /// + /// The value type. + /// Source sequence to share. + /// A reference-counted live sequence. + public static IObservable Share(this IObservable source) => source.ShareLive(); + + /// + /// Connects on first subscriber and disconnects when the last subscriber disposes. + /// + /// The value type. + /// Connectable signal to reference count. + /// A reference-counted sequence. + public static IObservable RefCount(this ConnectableSignal source) + { + if (source == null) + { + throw new ArgumentNullException(nameof(source)); + } + + var gate = RefCountGate.For(source); + return Signal.Create(gate.Subscribe); + } + + /// + /// Connects on the first observer subscription. + /// + /// The value type. + /// Connectable signal to connect. + /// A sequence that connects after the first subscription. + public static IObservable AutoConnect(this ConnectableSignal source) => + AutoConnect(source, 1); + + /// + /// Connects after observers have subscribed. + /// + /// The value type. + /// Connectable signal to connect. + /// Number of observers required before connecting. + /// A sequence that connects after the requested number of subscriptions. + public static IObservable AutoConnect(this ConnectableSignal source, int subscriberCount) + { + if (source == null) + { + throw new ArgumentNullException(nameof(source)); + } + + if (subscriberCount < 0) + { + throw new ArgumentOutOfRangeException(nameof(subscriberCount)); + } + + var gate = AutoConnectGate.For(source, subscriberCount); + return Signal.Create(gate.Subscribe); + } + + /// + /// Tracks reference-counted connection state. + /// + /// The value type. + private sealed class RefCountGate + { + /// + /// Synchronizes reference-count state. + /// + private readonly object _gate = new(); + + /// + /// Connectable signal being reference-counted. + /// + private readonly ConnectableSignal _source; + + /// + /// Active subscriber count. + /// + private int _count; + + /// + /// Active source connection. + /// + private IDisposable? _connection; + + /// + /// Initializes a new instance of the class. + /// + /// Connectable signal being reference-counted. + private RefCountGate(ConnectableSignal source) => _source = source; + + /// + /// Creates a reference-count gate for a connectable signal. + /// + /// Connectable signal being reference-counted. + /// A reference-count gate. + public static RefCountGate For(ConnectableSignal source) => new(source); + + /// + /// Subscribes an observer and manages the shared connection lifetime. + /// + /// Observer to subscribe. + /// A disposable that removes the observer and may disconnect the source. + public IDisposable Subscribe(IObserver observer) + { + IDisposable subscription; + lock (_gate) + { + subscription = _source.Subscribe(observer); + _count++; + _connection ??= _source.Connect(); + } + + return Disposable.Create(() => + { + subscription.Dispose(); + lock (_gate) + { + _count--; + if (_count == 0) + { + _connection?.Dispose(); + _connection = null; + } + } + }); + } + } + + /// + /// Tracks auto-connect subscription state. + /// + /// The value type. + private sealed class AutoConnectGate + { + /// + /// Synchronizes auto-connect state. + /// + private readonly object _gate = new(); + + /// + /// Connectable signal being auto-connected. + /// + private readonly ConnectableSignal _source; + + /// + /// Number of observers required before connecting. + /// + private readonly int _subscriberCount; + + /// + /// Current subscriber count. + /// + private int _count; + + /// + /// Value indicating whether the source has connected. + /// + private bool _connected; + + /// + /// Initializes a new instance of the class. + /// + /// Connectable signal being auto-connected. + /// Number of observers required before connecting. + private AutoConnectGate(ConnectableSignal source, int subscriberCount) + { + _source = source; + _subscriberCount = subscriberCount; + } + + /// + /// Creates an auto-connect gate for a connectable signal. + /// + /// Connectable signal being auto-connected. + /// Number of observers required before connecting. + /// An auto-connect gate. + public static AutoConnectGate For(ConnectableSignal source, int subscriberCount) => + new(source, subscriberCount); + + /// + /// Subscribes an observer and connects when the threshold is reached. + /// + /// Observer to subscribe. + /// A disposable that removes the observer subscription. + public IDisposable Subscribe(IObserver observer) + { + var subscription = _source.Subscribe(observer); + lock (_gate) + { + _count++; + if (!_connected && _count >= _subscriberCount) + { + _connected = true; + _source.Connect(); + } + } + + return subscription; + } + } +} diff --git a/src/ReactiveUI.Primitives/ConnectableSignal{T}.cs b/src/ReactiveUI.Primitives/ConnectableSignal{T}.cs index b358610..452ea94 100644 --- a/src/ReactiveUI.Primitives/ConnectableSignal{T}.cs +++ b/src/ReactiveUI.Primitives/ConnectableSignal{T}.cs @@ -5,15 +5,14 @@ using ReactiveUI.Primitives.Disposables; using ReactiveUI.Primitives.Signals; -#pragma warning disable SA1107, SA1116, SA1117, SA1204, SA1402, SA1501, SA1611, SA1615, SA1618 - namespace ReactiveUI.Primitives; /// /// Connectable hot signal that subscribes to its source only when connected. /// /// The value type. -public sealed class ConnectableSignal : IObservable +[System.Diagnostics.DebuggerDisplay("{DebuggerDisplay,nq}")] +public sealed partial class ConnectableSignal : IObservable { /// /// Synchronizes connection state. @@ -74,261 +73,3 @@ public IDisposable Connect() /// public IDisposable Subscribe(IObserver observer) => _hub.Subscribe(observer); } - -/// -/// Hot-sharing operators for Primitives connectable signals. -/// -public static class ConnectableSignalMixins -{ - /// - /// Multicasts source values through the supplied hub. - /// - /// The value type. - /// Source sequence to multicast. - /// Hub that receives source values. - /// A connectable signal. - public static ConnectableSignal Multicast(this IObservable source, ISignal hub) - { - if (source == null) - { - throw new ArgumentNullException(nameof(source)); - } - - if (hub == null) - { - throw new ArgumentNullException(nameof(hub)); - } - - return new ConnectableSignal(source, hub); - } - - /// - /// Publishes source values through a live signal hub. - /// - /// The value type. - /// Source sequence to publish. - /// A connectable live signal. - public static ConnectableSignal PublishLive(this IObservable source) => - source.Multicast(new Signal()); - - /// - /// Replays source values through a bounded replay hub. - /// - /// The value type. - /// Source sequence to replay. - /// Maximum number of values to replay. - /// A connectable replay signal. - public static ConnectableSignal ReplayLive(this IObservable source, int bufferSize) => - source.Multicast(new ReplaySignal(bufferSize)); - - /// - /// Replays source values through a replay hub constrained by count and time. - /// - /// The value type. - /// Source sequence to replay. - /// Maximum number of values to replay. - /// Maximum replay window. - /// A connectable replay signal. - public static ConnectableSignal ReplayLive(this IObservable source, int bufferSize, TimeSpan window) => - source.Multicast(new ReplaySignal(bufferSize, window)); - - /// - /// Shares one live source subscription while at least one observer is subscribed. - /// - /// The value type. - /// Source sequence to share. - /// A reference-counted live sequence. - public static IObservable ShareLive(this IObservable source) => source.PublishLive().RefCount(); - - /// - /// Connects on first subscriber and disconnects when the last subscriber disposes. - /// - /// The value type. - /// Connectable signal to reference count. - /// A reference-counted sequence. - public static IObservable RefCount(this ConnectableSignal source) - { - if (source == null) - { - throw new ArgumentNullException(nameof(source)); - } - - var gate = RefCountGate.For(source); - return ReactiveUI.Primitives.Signals.Signal.Create(gate.Subscribe); - } - - /// - /// Connects on the first observer subscription. - /// - /// The value type. - /// Connectable signal to connect. - /// A sequence that connects after the first subscription. - public static IObservable AutoConnect(this ConnectableSignal source) => - AutoConnect(source, 1); - - /// - /// Connects after observers have subscribed. - /// - /// The value type. - /// Connectable signal to connect. - /// Number of observers required before connecting. - /// A sequence that connects after the requested number of subscriptions. - public static IObservable AutoConnect(this ConnectableSignal source, int subscriberCount) - { - if (source == null) - { - throw new ArgumentNullException(nameof(source)); - } - - if (subscriberCount < 0) - { - throw new ArgumentOutOfRangeException(nameof(subscriberCount)); - } - - var gate = AutoConnectGate.For(source, subscriberCount); - return ReactiveUI.Primitives.Signals.Signal.Create(gate.Subscribe); - } - - /// - /// Tracks reference-counted connection state. - /// - /// The value type. - private sealed class RefCountGate - { - /// - /// Synchronizes reference-count state. - /// - private readonly object _gate = new(); - - /// - /// Connectable signal being reference-counted. - /// - private readonly ConnectableSignal _source; - - /// - /// Active subscriber count. - /// - private int _count; - - /// - /// Active source connection. - /// - private IDisposable? _connection; - - /// - /// Initializes a new instance of the class. - /// - /// Connectable signal being reference-counted. - private RefCountGate(ConnectableSignal source) => _source = source; - - /// - /// Creates a reference-count gate for a connectable signal. - /// - /// Connectable signal being reference-counted. - /// A reference-count gate. - public static RefCountGate For(ConnectableSignal source) => new(source); - - /// - /// Subscribes an observer and manages the shared connection lifetime. - /// - /// Observer to subscribe. - /// A disposable that removes the observer and may disconnect the source. - public IDisposable Subscribe(IObserver observer) - { - IDisposable subscription; - lock (_gate) - { - subscription = _source.Subscribe(observer); - _count++; - _connection ??= _source.Connect(); - } - - return Disposable.Create(() => - { - subscription.Dispose(); - lock (_gate) - { - _count--; - if (_count == 0) - { - _connection?.Dispose(); - _connection = null; - } - } - }); - } - } - - /// - /// Tracks auto-connect subscription state. - /// - /// The value type. - private sealed class AutoConnectGate - { - /// - /// Synchronizes auto-connect state. - /// - private readonly object _gate = new(); - - /// - /// Connectable signal being auto-connected. - /// - private readonly ConnectableSignal _source; - - /// - /// Number of observers required before connecting. - /// - private readonly int _subscriberCount; - - /// - /// Current subscriber count. - /// - private int _count; - - /// - /// Value indicating whether the source has connected. - /// - private bool _connected; - - /// - /// Initializes a new instance of the class. - /// - /// Connectable signal being auto-connected. - /// Number of observers required before connecting. - private AutoConnectGate(ConnectableSignal source, int subscriberCount) - { - _source = source; - _subscriberCount = subscriberCount; - } - - /// - /// Creates an auto-connect gate for a connectable signal. - /// - /// Connectable signal being auto-connected. - /// Number of observers required before connecting. - /// An auto-connect gate. - public static AutoConnectGate For(ConnectableSignal source, int subscriberCount) => - new(source, subscriberCount); - - /// - /// Subscribes an observer and connects when the threshold is reached. - /// - /// Observer to subscribe. - /// A disposable that removes the observer subscription. - public IDisposable Subscribe(IObserver observer) - { - var subscription = _source.Subscribe(observer); - lock (_gate) - { - _count++; - if (!_connected && _count >= _subscriberCount) - { - _connected = true; - _source.Connect(); - } - } - - return subscription; - } - } -} diff --git a/src/ReactiveUI.Primitives/Core/Moment{T}.cs b/src/ReactiveUI.Primitives/Core/Moment{T}.cs index cb0224d..29b596e 100644 --- a/src/ReactiveUI.Primitives/Core/Moment{T}.cs +++ b/src/ReactiveUI.Primitives/Core/Moment{T}.cs @@ -11,7 +11,8 @@ namespace ReactiveUI.Primitives.Core; /// /// The captured value type. [Serializable] -public readonly struct Moment : IEquatable> +[System.Diagnostics.DebuggerDisplay("{DebuggerDisplay,nq}")] +public readonly partial struct Moment : IEquatable> { /// /// Initializes a new instance of the struct. diff --git a/src/ReactiveUI.Primitives/Core/Spark{T}.cs b/src/ReactiveUI.Primitives/Core/Spark{T}.cs index 7f26636..ae0ff65 100644 --- a/src/ReactiveUI.Primitives/Core/Spark{T}.cs +++ b/src/ReactiveUI.Primitives/Core/Spark{T}.cs @@ -14,7 +14,8 @@ namespace ReactiveUI.Primitives.Core /// /// The type of the elements received by the observer. [Serializable] - public abstract class Spark : IEquatable> + [DebuggerDisplay("{DebuggerDisplay,nq}")] + public abstract partial class Spark : IEquatable> { /// /// Initializes a new instance of the class. diff --git a/src/ReactiveUI.Primitives/Core/TimeInterval{T}.cs b/src/ReactiveUI.Primitives/Core/TimeInterval{T}.cs index d5bdb0f..f25a735 100644 --- a/src/ReactiveUI.Primitives/Core/TimeInterval{T}.cs +++ b/src/ReactiveUI.Primitives/Core/TimeInterval{T}.cs @@ -12,7 +12,8 @@ namespace ReactiveUI.Primitives.Core; /// /// The type of the value being annotated with time interval information. [Serializable] -public readonly struct TimeInterval : IEquatable> +[System.Diagnostics.DebuggerDisplay("{DebuggerDisplay,nq}")] +public readonly partial struct TimeInterval : IEquatable> { /// /// Initializes a new instance of the struct. diff --git a/src/ReactiveUI.Primitives/DebuggerDisplay.Partials.cs b/src/ReactiveUI.Primitives/DebuggerDisplay.Partials.cs new file mode 100644 index 0000000..90ec635 --- /dev/null +++ b/src/ReactiveUI.Primitives/DebuggerDisplay.Partials.cs @@ -0,0 +1,309 @@ +// Copyright (c) 2019-2026 ReactiveUI Association Incorporated. All rights reserved. +// ReactiveUI Association Incorporated licenses this file to you under the MIT license. +// See the LICENSE file in the project root for full license information. + +#pragma warning disable SA1201 // Debugger display partial members are grouped by namespace. +#pragma warning disable SA1402 // Debugger display partial members are intentionally grouped in one support file. +#pragma warning disable SA1403 // Debugger display partial members span the public namespaces that need the shared pattern. +#pragma warning disable SA1601 // Primary type declarations carry the public documentation. + +namespace ReactiveUI.Primitives +{ + public sealed partial class ConnectableSignal + { + /// + /// Gets the debugger display text. + /// + [System.Diagnostics.DebuggerBrowsable(System.Diagnostics.DebuggerBrowsableState.Never)] + private string DebuggerDisplay => ToString() ?? string.Empty; + } + + public readonly partial struct RxVoid + { + /// + /// Gets the debugger display text. + /// + [System.Diagnostics.DebuggerBrowsable(System.Diagnostics.DebuggerBrowsableState.Never)] + private string DebuggerDisplay => ToString() ?? string.Empty; + } +} + +namespace ReactiveUI.Primitives.Concurrency +{ + public sealed partial class CurrentThreadSequencer + { + /// + /// Gets the debugger display text. + /// + [System.Diagnostics.DebuggerBrowsable(System.Diagnostics.DebuggerBrowsableState.Never)] + private string DebuggerDisplay => ToString() ?? string.Empty; + } + +#if WINDOWS + public partial class DispatcherSequencer + { + /// + /// Gets the debugger display text. + /// + [System.Diagnostics.DebuggerBrowsable(System.Diagnostics.DebuggerBrowsableState.Never)] + private string DebuggerDisplay => ToString() ?? string.Empty; + } +#endif + + public sealed partial class ImmediateSequencer + { + /// + /// Gets the debugger display text. + /// + [System.Diagnostics.DebuggerBrowsable(System.Diagnostics.DebuggerBrowsableState.Never)] + private string DebuggerDisplay => ToString() ?? string.Empty; + } + + public abstract partial class ScheduledItem + where TAbsolute : IComparable + { + /// + /// Gets the debugger display text. + /// + [System.Diagnostics.DebuggerBrowsable(System.Diagnostics.DebuggerBrowsableState.Never)] + private string DebuggerDisplay => ToString() ?? string.Empty; + } + + public sealed partial class ScheduledItem + where TAbsolute : IComparable + { + /// + /// Gets the debugger display text. + /// + [System.Diagnostics.DebuggerBrowsable(System.Diagnostics.DebuggerBrowsableState.Never)] + private string DebuggerDisplay => ToString() ?? string.Empty; + } + + public partial class SequencerQueue + where TAbsolute : IComparable + { + /// + /// Gets the debugger display text. + /// + [System.Diagnostics.DebuggerBrowsable(System.Diagnostics.DebuggerBrowsableState.Never)] + private string DebuggerDisplay => ToString() ?? string.Empty; + } + + public sealed partial class TaskPoolSequencer + { + /// + /// Gets the debugger display text. + /// + [System.Diagnostics.DebuggerBrowsable(System.Diagnostics.DebuggerBrowsableState.Never)] + private string DebuggerDisplay => ToString() ?? string.Empty; + } + + public sealed partial class TestClock + { + /// + /// Gets the debugger display text. + /// + [System.Diagnostics.DebuggerBrowsable(System.Diagnostics.DebuggerBrowsableState.Never)] + private string DebuggerDisplay => ToString() ?? string.Empty; + } + + public sealed partial class ThreadPoolSequencer + { + /// + /// Gets the debugger display text. + /// + [System.Diagnostics.DebuggerBrowsable(System.Diagnostics.DebuggerBrowsableState.Never)] + private string DebuggerDisplay => ToString() ?? string.Empty; + } + + public partial class VirtualClock + { + /// + /// Gets the debugger display text. + /// + [System.Diagnostics.DebuggerBrowsable(System.Diagnostics.DebuggerBrowsableState.Never)] + private string DebuggerDisplay => ToString() ?? string.Empty; + } + + public abstract partial class VirtualTimeSequencer + where TAbsolute : IComparable + { + /// + /// Gets the debugger display text. + /// + [System.Diagnostics.DebuggerBrowsable(System.Diagnostics.DebuggerBrowsableState.Never)] + private string DebuggerDisplay => ToString() ?? string.Empty; + } + + public abstract partial class VirtualTimeSequencerBase + where TAbsolute : IComparable + { + /// + /// Gets the debugger display text. + /// + [System.Diagnostics.DebuggerBrowsable(System.Diagnostics.DebuggerBrowsableState.Never)] + private string DebuggerDisplay => ToString() ?? string.Empty; + } +} + +namespace ReactiveUI.Primitives.Core +{ + public readonly partial struct Moment + { + /// + /// Gets the debugger display text. + /// + [System.Diagnostics.DebuggerBrowsable(System.Diagnostics.DebuggerBrowsableState.Never)] + private string DebuggerDisplay => ToString() ?? string.Empty; + } + + public abstract partial class Spark + { + /// + /// Gets the debugger display text. + /// + [System.Diagnostics.DebuggerBrowsable(System.Diagnostics.DebuggerBrowsableState.Never)] + private string DebuggerDisplay => ToString() ?? string.Empty; + } + + public readonly partial struct TimeInterval + { + /// + /// Gets the debugger display text. + /// + [System.Diagnostics.DebuggerBrowsable(System.Diagnostics.DebuggerBrowsableState.Never)] + private string DebuggerDisplay => ToString() ?? string.Empty; + } +} + +namespace ReactiveUI.Primitives.Disposables +{ + public sealed partial class AssignmentSlot + { + /// + /// Gets the debugger display text. + /// + [System.Diagnostics.DebuggerBrowsable(System.Diagnostics.DebuggerBrowsableState.Never)] + private string DebuggerDisplay => ToString() ?? string.Empty; + } + + public sealed partial class BooleanDisposable + { + /// + /// Gets the debugger display text. + /// + [System.Diagnostics.DebuggerBrowsable(System.Diagnostics.DebuggerBrowsableState.Never)] + private string DebuggerDisplay => ToString() ?? string.Empty; + } + + public sealed partial class CancellationDisposable + { + /// + /// Gets the debugger display text. + /// + [System.Diagnostics.DebuggerBrowsable(System.Diagnostics.DebuggerBrowsableState.Never)] + private string DebuggerDisplay => ToString() ?? string.Empty; + } + + public partial class MultipleDisposable + { + /// + /// Gets the debugger display text. + /// + [System.Diagnostics.DebuggerBrowsable(System.Diagnostics.DebuggerBrowsableState.Never)] + private string DebuggerDisplay => ToString() ?? string.Empty; + } + + public sealed partial class Pocket + { + /// + /// Gets the debugger display text. + /// + [System.Diagnostics.DebuggerBrowsable(System.Diagnostics.DebuggerBrowsableState.Never)] + private string DebuggerDisplay => ToString() ?? string.Empty; + } + + public partial class SingleDisposable + { + /// + /// Gets the debugger display text. + /// + [System.Diagnostics.DebuggerBrowsable(System.Diagnostics.DebuggerBrowsableState.Never)] + private string DebuggerDisplay => ToString() ?? string.Empty; + } + + public partial class SingleReplaceableDisposable + { + /// + /// Gets the debugger display text. + /// + [System.Diagnostics.DebuggerBrowsable(System.Diagnostics.DebuggerBrowsableState.Never)] + private string DebuggerDisplay => ToString() ?? string.Empty; + } + + public sealed partial class Slot + { + /// + /// Gets the debugger display text. + /// + [System.Diagnostics.DebuggerBrowsable(System.Diagnostics.DebuggerBrowsableState.Never)] + private string DebuggerDisplay => ToString() ?? string.Empty; + } +} + +namespace ReactiveUI.Primitives.Signals +{ + public partial class AsyncSignal + { + /// + /// Gets the debugger display text. + /// + [System.Diagnostics.DebuggerBrowsable(System.Diagnostics.DebuggerBrowsableState.Never)] + private string DebuggerDisplay => ToString() ?? string.Empty; + } + + public sealed partial class CommandSignal + { + /// + /// Gets the debugger display text. + /// + [System.Diagnostics.DebuggerBrowsable(System.Diagnostics.DebuggerBrowsableState.Never)] + private string DebuggerDisplay => ToString() ?? string.Empty; + } + + public sealed partial class ReadOnlyState + { + /// + /// Gets the debugger display text. + /// + [System.Diagnostics.DebuggerBrowsable(System.Diagnostics.DebuggerBrowsableState.Never)] + private string DebuggerDisplay => ToString() ?? string.Empty; + } + + public partial class ReplaySignal + { + /// + /// Gets the debugger display text. + /// + [System.Diagnostics.DebuggerBrowsable(System.Diagnostics.DebuggerBrowsableState.Never)] + private string DebuggerDisplay => ToString() ?? string.Empty; + } + + public partial class Signal + { + /// + /// Gets the debugger display text. + /// + [System.Diagnostics.DebuggerBrowsable(System.Diagnostics.DebuggerBrowsableState.Never)] + private string DebuggerDisplay => ToString() ?? string.Empty; + } + + public partial class StateSignal + { + /// + /// Gets the debugger display text. + /// + [System.Diagnostics.DebuggerBrowsable(System.Diagnostics.DebuggerBrowsableState.Never)] + private string DebuggerDisplay => ToString() ?? string.Empty; + } +} diff --git a/src/ReactiveUI.Primitives/Disposables/AssignmentSlot.cs b/src/ReactiveUI.Primitives/Disposables/AssignmentSlot.cs index 84b2ba0..bce827d 100644 --- a/src/ReactiveUI.Primitives/Disposables/AssignmentSlot.cs +++ b/src/ReactiveUI.Primitives/Disposables/AssignmentSlot.cs @@ -7,7 +7,8 @@ namespace ReactiveUI.Primitives.Disposables; /// /// Primitives alias for a single-assignment disposable slot. /// -public sealed class AssignmentSlot : SingleDisposable +[System.Diagnostics.DebuggerDisplay("{DebuggerDisplay,nq}")] +public sealed partial class AssignmentSlot : SingleDisposable { /// /// Initializes a new instance of the class. diff --git a/src/ReactiveUI.Primitives/Disposables/BooleanDisposable.cs b/src/ReactiveUI.Primitives/Disposables/BooleanDisposable.cs index 5723f38..a7256fc 100644 --- a/src/ReactiveUI.Primitives/Disposables/BooleanDisposable.cs +++ b/src/ReactiveUI.Primitives/Disposables/BooleanDisposable.cs @@ -7,8 +7,9 @@ namespace ReactiveUI.Primitives.Disposables; /// /// BooleanDisposable. /// -/// -public sealed class BooleanDisposable : IsDisposed +/// +[System.Diagnostics.DebuggerDisplay("{DebuggerDisplay,nq}")] +public sealed partial class BooleanDisposable : IsDisposed { /// /// Gets a value indicating whether this instance is disposed. diff --git a/src/ReactiveUI.Primitives/Disposables/CancellationDisposable.cs b/src/ReactiveUI.Primitives/Disposables/CancellationDisposable.cs index 81e1968..805635e 100644 --- a/src/ReactiveUI.Primitives/Disposables/CancellationDisposable.cs +++ b/src/ReactiveUI.Primitives/Disposables/CancellationDisposable.cs @@ -7,8 +7,9 @@ namespace ReactiveUI.Primitives.Disposables; /// /// CancellationDisposable. /// -/// -public sealed class CancellationDisposable : IsDisposed +/// +[System.Diagnostics.DebuggerDisplay("{DebuggerDisplay,nq}")] +public sealed partial class CancellationDisposable : IsDisposed { /// /// Cancellation source owned by this disposable. @@ -19,7 +20,7 @@ public sealed class CancellationDisposable : IsDisposed /// Initializes a new instance of the class. /// /// The CTS. - /// cts. + /// cts. public CancellationDisposable(CancellationTokenSource cts) => _cts = cts ?? throw new ArgumentNullException(nameof(cts)); /// diff --git a/src/ReactiveUI.Primitives/Disposables/MultipleDisposable.cs b/src/ReactiveUI.Primitives/Disposables/MultipleDisposable.cs index 03e9e7e..36d10a3 100644 --- a/src/ReactiveUI.Primitives/Disposables/MultipleDisposable.cs +++ b/src/ReactiveUI.Primitives/Disposables/MultipleDisposable.cs @@ -7,7 +7,8 @@ namespace ReactiveUI.Primitives.Disposables; /// /// A disposable pocket that contains a set of disposables and disposes them together. /// -public class MultipleDisposable : IsDisposed +[System.Diagnostics.DebuggerDisplay("{DebuggerDisplay,nq}")] +public partial class MultipleDisposable : IsDisposed { /// /// Initial capacity for overflow disposable storage. diff --git a/src/ReactiveUI.Primitives/Disposables/Pocket.cs b/src/ReactiveUI.Primitives/Disposables/Pocket.cs index aff6be0..4b987d1 100644 --- a/src/ReactiveUI.Primitives/Disposables/Pocket.cs +++ b/src/ReactiveUI.Primitives/Disposables/Pocket.cs @@ -7,7 +7,8 @@ namespace ReactiveUI.Primitives.Disposables; /// /// Primitives alias for a group of disposables that are disposed together. /// -public sealed class Pocket : MultipleDisposable +[System.Diagnostics.DebuggerDisplay("{DebuggerDisplay,nq}")] +public sealed partial class Pocket : MultipleDisposable { /// /// Initializes a new instance of the class. diff --git a/src/ReactiveUI.Primitives/Disposables/SingleDisposable.cs b/src/ReactiveUI.Primitives/Disposables/SingleDisposable.cs index 55143f8..db0c0fb 100644 --- a/src/ReactiveUI.Primitives/Disposables/SingleDisposable.cs +++ b/src/ReactiveUI.Primitives/Disposables/SingleDisposable.cs @@ -7,7 +7,8 @@ namespace ReactiveUI.Primitives.Disposables; /// /// Single-assignment disposable slot. /// -public class SingleDisposable : IsDisposed +[System.Diagnostics.DebuggerDisplay("{DebuggerDisplay,nq}")] +public partial class SingleDisposable : IsDisposed { /// /// Marker used once the slot has been disposed. diff --git a/src/ReactiveUI.Primitives/Disposables/SingleReplaceableDisposable.cs b/src/ReactiveUI.Primitives/Disposables/SingleReplaceableDisposable.cs index 73518db..92e7538 100644 --- a/src/ReactiveUI.Primitives/Disposables/SingleReplaceableDisposable.cs +++ b/src/ReactiveUI.Primitives/Disposables/SingleReplaceableDisposable.cs @@ -7,7 +7,8 @@ namespace ReactiveUI.Primitives.Disposables; /// /// SingleReplaceableDisposable. /// -public class SingleReplaceableDisposable : IsDisposed +[System.Diagnostics.DebuggerDisplay("{DebuggerDisplay,nq}")] +public partial class SingleReplaceableDisposable : IsDisposed { /// /// Marker used once the slot has been disposed. diff --git a/src/ReactiveUI.Primitives/Disposables/Slot.cs b/src/ReactiveUI.Primitives/Disposables/Slot.cs index 4c2ccab..ed54b55 100644 --- a/src/ReactiveUI.Primitives/Disposables/Slot.cs +++ b/src/ReactiveUI.Primitives/Disposables/Slot.cs @@ -7,7 +7,8 @@ namespace ReactiveUI.Primitives.Disposables; /// /// Primitives alias for a replaceable disposable slot. /// -public sealed class Slot : SingleReplaceableDisposable +[System.Diagnostics.DebuggerDisplay("{DebuggerDisplay,nq}")] +public sealed partial class Slot : SingleReplaceableDisposable { /// /// Initializes a new instance of the class. diff --git a/src/ReactiveUI.Primitives/LinqMixins.cs b/src/ReactiveUI.Primitives/LinqMixins.cs index c76a886..f615299 100644 --- a/src/ReactiveUI.Primitives/LinqMixins.cs +++ b/src/ReactiveUI.Primitives/LinqMixins.cs @@ -20,7 +20,7 @@ public static partial class LinqMixins /// The source. /// The selector. /// A ISignals. - /// + /// /// source /// or /// selector. @@ -35,8 +35,8 @@ public static IObservable Select(this IObservableThe source. /// The count of each buffer. /// An Signals sequence of buffers. - /// source. - /// count. + /// source. + /// count. public static IObservable> Buffer(this IObservable source, int count) { if (source == null) @@ -60,8 +60,8 @@ public static IObservable> Buffer(this IObservableLength of each buffer before being skipped. /// Number of elements to skip between creation of consecutive buffers. /// An Signals sequence of buffers taking the count then skipping the skipped value, the sequecnce is then repeated. - /// source. - /// + /// source. + /// /// count /// or /// skip. @@ -122,7 +122,7 @@ public static SingleDisposable DisposeWith(this IDisposable disposable, Action? /// The source. /// The predicate. /// An ISignals. - /// + /// /// source /// or /// predicate. diff --git a/src/ReactiveUI.Primitives/RxVoid.cs b/src/ReactiveUI.Primitives/RxVoid.cs index 5cd8e95..99654dd 100644 --- a/src/ReactiveUI.Primitives/RxVoid.cs +++ b/src/ReactiveUI.Primitives/RxVoid.cs @@ -8,7 +8,8 @@ namespace ReactiveUI.Primitives; /// A Reactive Void. /// [Serializable] -public readonly struct RxVoid : IEquatable +[System.Diagnostics.DebuggerDisplay("{DebuggerDisplay,nq}")] +public readonly partial struct RxVoid : IEquatable { /// /// Gets the single value. diff --git a/src/ReactiveUI.Primitives/Signal/AsyncSignal{T}.cs b/src/ReactiveUI.Primitives/Signal/AsyncSignal{T}.cs index e824256..35c8650 100644 --- a/src/ReactiveUI.Primitives/Signal/AsyncSignal{T}.cs +++ b/src/ReactiveUI.Primitives/Signal/AsyncSignal{T}.cs @@ -11,8 +11,9 @@ namespace ReactiveUI.Primitives.Signals; /// AsyncSignal. /// /// The Type. -/// -public class AsyncSignal : IAwaitSignal +/// +[System.Diagnostics.DebuggerDisplay("{DebuggerDisplay,nq}")] +public partial class AsyncSignal : IAwaitSignal { /// /// Executes the new operation. @@ -54,7 +55,7 @@ public class AsyncSignal : IAwaitSignal /// /// The value. /// - /// AsyncSubject is not completed yet. + /// AsyncSubject is not completed yet. public T Value { get @@ -140,7 +141,7 @@ public void OnCompleted(Action continuation) /// Called when [error]. /// /// The error. - /// error. + /// error. public void OnError(Exception error) { if (error == null) @@ -190,7 +191,7 @@ public void OnNext(T value) /// /// The observer. /// A Disposable. - /// observer. + /// observer. public IDisposable Subscribe(IObserver observer) { if (observer == null) diff --git a/src/ReactiveUI.Primitives/Signal/BehaviourSignal{T}.cs b/src/ReactiveUI.Primitives/Signal/BehaviorSignal{T}.cs similarity index 93% rename from src/ReactiveUI.Primitives/Signal/BehaviourSignal{T}.cs rename to src/ReactiveUI.Primitives/Signal/BehaviorSignal{T}.cs index b836024..e41ab61 100644 --- a/src/ReactiveUI.Primitives/Signal/BehaviourSignal{T}.cs +++ b/src/ReactiveUI.Primitives/Signal/BehaviorSignal{T}.cs @@ -11,7 +11,8 @@ namespace ReactiveUI.Primitives.Signals; /// BehaviourSignal. /// /// The Type. -public class BehaviourSignal : ISignal +[System.Diagnostics.DebuggerDisplay("{DebuggerDisplay,nq}")] +public class BehaviorSignal : ISignal { /// /// Executes the new operation. @@ -42,10 +43,10 @@ public class BehaviourSignal : ISignal private Exception? _lastError; /// - /// Initializes a new instance of the class. + /// Initializes a new instance of the class. /// /// The default value. - public BehaviourSignal(T defaultValue) + public BehaviorSignal(T defaultValue) { _lastValue = defaultValue; } @@ -90,6 +91,18 @@ public T Value /// public bool IsDisposed { get; private set; } + /// + /// Gets the string representation of this object for debugger display purposes. + /// + [System.Diagnostics.DebuggerBrowsable(System.Diagnostics.DebuggerBrowsableState.Never)] + private string? DebuggerDisplay + { + get + { + return ToString(); + } + } + /// /// Tries to get the current value or throws an exception. /// @@ -297,7 +310,7 @@ private sealed class ObserverHandler : IDisposable /// /// Stores state for the signal implementation. /// - private BehaviourSignal? _subject; + private BehaviorSignal? _subject; /// /// Stores state for the signal implementation. @@ -309,7 +322,7 @@ private sealed class ObserverHandler : IDisposable /// /// The subject value. /// The observer value. - public ObserverHandler(BehaviourSignal subject, IObserver observer) + public ObserverHandler(BehaviorSignal subject, IObserver observer) { _subject = subject; _observer = observer; diff --git a/src/ReactiveUI.Primitives/Signal/CommandSignal{TResult}.cs b/src/ReactiveUI.Primitives/Signal/CommandSignal{TResult}.cs index 46f9991..36dd6b8 100644 --- a/src/ReactiveUI.Primitives/Signal/CommandSignal{TResult}.cs +++ b/src/ReactiveUI.Primitives/Signal/CommandSignal{TResult}.cs @@ -2,15 +2,14 @@ // ReactiveUI Association Incorporated licenses this file to you under the MIT license. // See the LICENSE file in the project root for full license information. -#pragma warning disable SA1501 - namespace ReactiveUI.Primitives.Signals; /// /// Minimal reactive command that gates execution and publishes result, fault, and running state streams. /// /// The command result type. -public sealed class CommandSignal : IDisposable +[System.Diagnostics.DebuggerDisplay("{DebuggerDisplay,nq}")] +public sealed partial class CommandSignal : IDisposable { /// /// Stores state for the signal implementation. diff --git a/src/ReactiveUI.Primitives/Signal/IAwaitSignal{T}.cs b/src/ReactiveUI.Primitives/Signal/IAwaitSignal{T}.cs index 61f13b7..71a3b97 100644 --- a/src/ReactiveUI.Primitives/Signal/IAwaitSignal{T}.cs +++ b/src/ReactiveUI.Primitives/Signal/IAwaitSignal{T}.cs @@ -8,7 +8,7 @@ namespace ReactiveUI.Primitives.Signals; /// IAwaitSignal. /// /// The Type of Signal. -/// +/// /// public interface IAwaitSignal : ISignal, System.Runtime.CompilerServices.INotifyCompletion { diff --git a/src/ReactiveUI.Primitives/Signal/ReadOnlyState{T}.cs b/src/ReactiveUI.Primitives/Signal/ReadOnlyState{T}.cs index 65c6008..d585db3 100644 --- a/src/ReactiveUI.Primitives/Signal/ReadOnlyState{T}.cs +++ b/src/ReactiveUI.Primitives/Signal/ReadOnlyState{T}.cs @@ -2,15 +2,14 @@ // ReactiveUI Association Incorporated licenses this file to you under the MIT license. // See the LICENSE file in the project root for full license information. -#pragma warning disable SA1116, SA1117, SA1204, SA1402, SA1501, SA1611, SA1615, SA1618 - namespace ReactiveUI.Primitives.Signals; /// /// Read-only latest-value signal for projected or externally owned state. /// /// The value type. -public sealed class ReadOnlyState : IObservable, IDisposable +[System.Diagnostics.DebuggerDisplay("{DebuggerDisplay,nq}")] +public sealed partial class ReadOnlyState : IObservable, IDisposable { /// /// Stores state for the signal implementation. @@ -49,9 +48,11 @@ public ReadOnlyState(IObservable source, T initialValue) public IObservable Changed => _inner; /// - /// Executes the Subscribe operation. + /// Notifies the provider that an observer is to receive notifications. /// - /// The result. + /// The object that is to receive notifications. + /// A reference to an interface that allows observers to stop receiving notifications before the provider has + /// finished sending them. public IDisposable Subscribe(IObserver observer) => _inner.Subscribe(observer); /// @@ -63,42 +64,3 @@ public void Dispose() _inner.Dispose(); } } - -/// -/// State projection helpers. -/// -public static class StateSignalMixins -{ - /// - /// Projects an observable sequence into a read-only state signal. - /// - /// The source value type. - /// The projected value type. - /// The source sequence. - /// The initial projected value. - /// The projection function. - /// A read-only projected state. - public static ReadOnlyState ToReadOnlyState( - this IObservable source, - TResult initialValue, - Func selector) - { - if (source == null) - { - throw new ArgumentNullException(nameof(source)); - } - - if (selector == null) - { - throw new ArgumentNullException(nameof(selector)); - } - - return new ReadOnlyState( - ReactiveUI.Primitives.Signals.Signal.CreateSafe( - observer => source.Subscribe( - value => observer.OnNext(selector(value)), - observer.OnError, - observer.OnCompleted)), - initialValue); - } -} diff --git a/src/ReactiveUI.Primitives/Signal/ReplaySignal{T}.cs b/src/ReactiveUI.Primitives/Signal/ReplaySignal{T}.cs index 5bd9c14..49fec04 100644 --- a/src/ReactiveUI.Primitives/Signal/ReplaySignal{T}.cs +++ b/src/ReactiveUI.Primitives/Signal/ReplaySignal{T}.cs @@ -12,7 +12,8 @@ namespace ReactiveUI.Primitives.Signals; /// ReplaySignal. /// /// The Type. -public class ReplaySignal : ISignal +[System.Diagnostics.DebuggerDisplay("{DebuggerDisplay,nq}")] +public partial class ReplaySignal : ISignal { /// /// Stores state for the signal implementation. diff --git a/src/ReactiveUI.Primitives/Signal/Signal{T}.cs b/src/ReactiveUI.Primitives/Signal/Signal{T}.cs index 90fadc0..323c24f 100644 --- a/src/ReactiveUI.Primitives/Signal/Signal{T}.cs +++ b/src/ReactiveUI.Primitives/Signal/Signal{T}.cs @@ -12,7 +12,8 @@ namespace ReactiveUI.Primitives.Signals; /// Subject. /// /// The Type. -public class Signal : ISignal +[System.Diagnostics.DebuggerDisplay("{DebuggerDisplay,nq}")] +public partial class Signal : ISignal { /// /// Stores state for the signal implementation. diff --git a/src/ReactiveUI.Primitives/Signal/StateSignalMixins.cs b/src/ReactiveUI.Primitives/Signal/StateSignalMixins.cs new file mode 100644 index 0000000..b69313d --- /dev/null +++ b/src/ReactiveUI.Primitives/Signal/StateSignalMixins.cs @@ -0,0 +1,44 @@ +// Copyright (c) 2019-2026 ReactiveUI Association Incorporated. All rights reserved. +// ReactiveUI Association Incorporated licenses this file to you under the MIT license. +// See the LICENSE file in the project root for full license information. + +namespace ReactiveUI.Primitives.Signals; + +/// +/// State projection helpers. +/// +public static class StateSignalMixins +{ + /// + /// Projects an observable sequence into a read-only state signal. + /// + /// The source value type. + /// The projected value type. + /// The source sequence. + /// The initial projected value. + /// The projection function. + /// A read-only projected state. + public static ReadOnlyState ToReadOnlyState( + this IObservable source, + TResult initialValue, + Func selector) + { + if (source == null) + { + throw new ArgumentNullException(nameof(source)); + } + + if (selector == null) + { + throw new ArgumentNullException(nameof(selector)); + } + + return new ReadOnlyState( + Signal.CreateSafe( + observer => source.Subscribe( + value => observer.OnNext(selector(value)), + observer.OnError, + observer.OnCompleted)), + initialValue); + } +} diff --git a/src/ReactiveUI.Primitives/Signal/StateSignal{T}.cs b/src/ReactiveUI.Primitives/Signal/StateSignal{T}.cs index 970b515..7dfb0cb 100644 --- a/src/ReactiveUI.Primitives/Signal/StateSignal{T}.cs +++ b/src/ReactiveUI.Primitives/Signal/StateSignal{T}.cs @@ -4,15 +4,14 @@ using ReactiveUI.Primitives; -#pragma warning disable SA1501 - namespace ReactiveUI.Primitives.Signals; /// /// Mutable latest-value signal with a ReactiveUI.Primitives name for reactive-property parity. /// /// The value type. -public class StateSignal : BehaviourSignal +[System.Diagnostics.DebuggerDisplay("{DebuggerDisplay,nq}")] +public partial class StateSignal : BehaviorSignal { /// /// Initializes a new instance of the class. diff --git a/src/ReactiveUI.Primitives/SignalOperatorMixins.cs b/src/ReactiveUI.Primitives/SignalOperatorMixins.cs index ddd469d..3e5fbbf 100644 --- a/src/ReactiveUI.Primitives/SignalOperatorMixins.cs +++ b/src/ReactiveUI.Primitives/SignalOperatorMixins.cs @@ -8,8 +8,6 @@ using ReactiveUI.Primitives.Signals; using ReactiveUI.Primitives.Signals.Core; -#pragma warning disable SA1107, SA1116, SA1117, SA1501, SA1611, SA1615, SA1618 - namespace ReactiveUI.Primitives; /// @@ -19,8 +17,15 @@ namespace ReactiveUI.Primitives; public static partial class LinqMixins { /// - /// Maps every value with . + /// Projects each element of an observable sequence into a new form. /// + /// The type of the elements in the source sequence. + /// The type of the elements in the result sequence. + /// An observable sequence of elements to project. + /// A transform function to apply to each element. + /// An observable sequence whose elements are the result of invoking the transform function on each element of the + /// source sequence. + /// or is . public static IObservable Map(this IObservable source, Func selector) { if (source == null) @@ -37,8 +42,18 @@ public static IObservable Map(this IObservable - /// Maps every value with explicit state to avoid closure allocations in hot paths. + /// Projects each element of an observable sequence into a new form by incorporating state that is passed to the + /// selector function. /// + /// The type of the elements in the source sequence. + /// The type of the state used in the selector function. + /// The type of the elements in the result sequence. + /// An observable sequence of elements to project. + /// The state to pass to the selector function. + /// A transform function to apply to each source element along with the state. + /// An observable sequence whose elements are the result of invoking the transform function on each element of the + /// source along with the state. + /// is . public static IObservable MapWith(this IObservable source, TState state, Func selector) { if (selector == null) @@ -50,8 +65,14 @@ public static IObservable MapWith(this IObser } /// - /// Keeps values that satisfy . + /// Filters an observable sequence to include only elements that satisfy a specified condition. /// + /// The type of elements in the observable sequence. + /// The source observable sequence to filter. + /// A function to test each element for a condition. + /// An observable sequence that contains elements from the input sequence that satisfy the condition specified by + /// . + /// or is . public static IObservable Keep(this IObservable source, Func predicate) { if (source == null) @@ -68,8 +89,16 @@ public static IObservable Keep(this IObservable source, Func p } /// - /// Keeps values that satisfy a stateful predicate. + /// Filters elements from an observable sequence based on a predicate that uses external state. /// + /// The type of elements in the source sequence. + /// The type of the state parameter passed to the predicate. + /// The source observable sequence to filter. + /// The state value to pass to the predicate for each element. + /// A function to test each element along with the state; returns to keep the element, to filter it out. + /// An observable sequence containing only the elements from the source sequence that satisfy the predicate. + /// is . public static IObservable KeepWith(this IObservable source, TState state, Func predicate) { if (predicate == null) @@ -81,8 +110,12 @@ public static IObservable KeepWith(this IObservable source, TSt } /// - /// Keeps non-null values and narrows nullable references. + /// Filters out null values from the source observable sequence, emitting only non-null values. /// + /// The type of elements in the observable sequence. + /// The source observable sequence to filter. + /// An observable sequence that emits only non-null values from the source sequence. + /// is null. public static IObservable KeepNotNull(this IObservable source) where T : class { @@ -106,8 +139,12 @@ public static IObservable KeepNotNull(this IObservable source) } /// - /// Projects only values assignable to . + /// Filters values to those assignable to . /// + /// The result value type. + /// The source sequence. + /// A sequence containing only values assignable to . + /// is . [System.Diagnostics.CodeAnalysis.SuppressMessage( "Major Code Smell", "S4018:Generic methods should provide type parameters", @@ -134,8 +171,12 @@ public static IObservable OfType(this IObservable sou } /// - /// Casts every value to . + /// Casts each source value to . /// + /// The result value type. + /// The source sequence. + /// A sequence containing each value cast to . + /// is . [System.Diagnostics.CodeAnalysis.SuppressMessage( "Major Code Smell", "S4018:Generic methods should provide type parameters", @@ -151,8 +192,13 @@ public static IObservable Cast(this IObservable sourc } /// - /// Runs a side effect for every value while preserving the source values. + /// Invokes an action for each value while preserving the original sequence. /// + /// The value type. + /// The source sequence. + /// The action to invoke for each value. + /// The source values after the action has run. + /// is . public static IObservable Tap(this IObservable source, Action onNext) { if (onNext == null) @@ -168,8 +214,15 @@ public static IObservable Tap(this IObservable source, Action onNext } /// - /// Runs a stateful side effect for every value while preserving the source values. + /// Invokes a stateful action for each value while preserving the original sequence. /// + /// The value type. + /// The state type. + /// The source sequence. + /// The state passed to . + /// The action to invoke for each value. + /// The source values after the action has run. + /// is . public static IObservable TapWith(this IObservable source, TState state, Action onNext) { if (onNext == null) @@ -181,8 +234,15 @@ public static IObservable TapWith(this IObservable source, TSta } /// - /// Emits accumulated state for every source value. + /// Emits the accumulated state after each source value. /// + /// The source value type. + /// The accumulated value type. + /// The source sequence. + /// The initial accumulated value. + /// The function that combines the current state with the next source value. + /// A sequence of intermediate accumulated values. + /// or is . public static IObservable Scan(this IObservable source, TAccumulate seed, Func accumulator) { if (source == null) @@ -210,8 +270,15 @@ public static IObservable Scan(this IObservab } /// - /// Emits one final accumulated value when the source completes. + /// Emits the final accumulated state when the source completes. /// + /// The source value type. + /// The accumulated value type. + /// The source sequence. + /// The initial accumulated value. + /// The function that combines the current state with the next source value. + /// A sequence that emits one accumulated value on completion. + /// or is . public static IObservable Fold(this IObservable source, TAccumulate seed, Func accumulator) { if (source == null) @@ -241,6 +308,12 @@ public static IObservable Fold(this IObservab /// /// Emits at most values before completing. /// + /// The value type. + /// The source sequence. + /// The maximum number of values to emit. + /// A sequence containing at most source values. + /// is . + /// is less than zero. public static IObservable Take(this IObservable source, int count) { if (source == null) @@ -285,8 +358,14 @@ public static IObservable Take(this IObservable source, int count) } /// - /// Skips values. + /// Skips the first source values. /// + /// The value type. + /// The source sequence. + /// The number of values to skip. + /// A sequence containing source values after the skipped prefix. + /// is . + /// is less than zero. public static IObservable Skip(this IObservable source, int count) { if (source == null) @@ -319,14 +398,23 @@ public static IObservable Skip(this IObservable source, int count) } /// - /// Suppresses duplicate values according to the comparer. + /// Suppresses values that have already been observed. /// + /// The value type. + /// The source sequence. + /// A sequence containing the first occurrence of each source value. + /// is . public static IObservable Distinct(this IObservable source) => source.Distinct(null); /// - /// Suppresses duplicate values according to the comparer. + /// Suppresses values that have already been observed using the supplied comparer. /// + /// The value type. + /// The source sequence. + /// The comparer used to identify duplicate values. + /// A sequence containing the first occurrence of each source value. + /// is . public static IObservable Distinct(this IObservable source, IEqualityComparer? comparer) { if (source == null) @@ -353,14 +441,23 @@ public static IObservable Distinct(this IObservable source, IEqualityCo } /// - /// Suppresses adjacent duplicate values according to the comparer. + /// Suppresses adjacent duplicate values. /// + /// The value type. + /// The source sequence. + /// A sequence with adjacent duplicates removed. + /// is . public static IObservable DistinctUntilChanged(this IObservable source) => source.DistinctUntilChanged(null); /// - /// Suppresses adjacent duplicate values according to the comparer. + /// Suppresses adjacent duplicate values using the supplied comparer. /// + /// The value type. + /// The source sequence. + /// The comparer used to compare adjacent values. + /// A sequence with adjacent duplicates removed. + /// is . public static IObservable DistinctUntilChanged(this IObservable source, IEqualityComparer? comparer) { if (source == null) @@ -391,8 +488,12 @@ public static IObservable DistinctUntilChanged(this IObservable source, } /// - /// Converts values and terminal messages into sparks. + /// Converts source values and terminal notifications into values. /// + /// The value type. + /// The source sequence. + /// A sequence of spark values representing source notifications; terminal sparks are followed by completion. + /// is . public static IObservable> Sparkify(this IObservable source) { if (source == null) @@ -415,8 +516,12 @@ public static IObservable> Sparkify(this IObservable source) } /// - /// Converts spark values back into source notifications. + /// Converts values back into observer notifications. /// + /// The value type. + /// The spark sequence. + /// A sequence represented by the supplied spark values. + /// is . public static IObservable Unspark(this IObservable> source) { if (source == null) @@ -431,8 +536,12 @@ public static IObservable Unspark(this IObservable> source) } /// - /// Concatenates a signal of signals. + /// Subscribes to inner sequences one at a time in source order. /// + /// The value type. + /// The outer sequence of inner sequences. + /// A sequence that emits each inner sequence after the previous one completes. + /// is . public static IObservable Concat(this IObservable> sources) { if (sources == null) @@ -521,14 +630,22 @@ void Drain() } /// - /// Concatenates this signal followed by . + /// Concatenates two sequences. /// + /// The value type. + /// The first sequence. + /// The second sequence. + /// A sequence that emits after completes. public static IObservable Concat(this IObservable first, IObservable second) => Signal.Concat(first, second); /// - /// Merges a signal of signals. + /// Subscribes to all inner sequences and forwards their values as they arrive. /// + /// The value type. + /// The outer sequence of inner sequences. + /// A sequence containing values from all inner sequences. + /// is . public static IObservable Merge(this IObservable> sources) { if (sources == null) @@ -597,8 +714,12 @@ void TryComplete() } /// - /// Races the supplied source signals and mirrors the first source to emit any notification. + /// Mirrors the first inner sequence to produce any notification. /// + /// The value type. + /// The competing inner sequences. + /// A sequence that mirrors the winning inner sequence. + /// is . public static IObservable Race(this IObservable> sources) { if (sources == null) @@ -610,8 +731,16 @@ public static IObservable Race(this IObservable> sources) } /// - /// Zips two signals by waiting for one value from both sides. + /// Combines paired values from two sequences, completing when no more pairs can be formed. /// + /// The left value type. + /// The right value type. + /// The result value type. + /// The left sequence. + /// The right sequence. + /// The function that combines paired values. + /// A sequence containing one result for each available value pair. + /// , , or is . public static IObservable Zip(this IObservable left, IObservable right, Func selector) { if (left == null) @@ -629,7 +758,7 @@ public static IObservable Zip(this IObservable< throw new ArgumentNullException(nameof(selector)); } - if (left is RangeSignal leftRange && right is RangeSignal rightRange) + if (typeof(TLeft) == typeof(int) && typeof(TRight) == typeof(int) && left is RangeSignal leftRange && right is RangeSignal rightRange) { return new RangeZipSignal(leftRange, rightRange, (Func)(object)selector); } @@ -638,8 +767,16 @@ public static IObservable Zip(this IObservable< } /// - /// Combines the latest values after both sides have produced at least one value. + /// Combines the latest values after both sequences have produced at least one value. /// + /// The left value type. + /// The right value type. + /// The result value type. + /// The left sequence. + /// The right sequence. + /// The function that combines the latest values. + /// A sequence containing selected latest-value combinations. + /// , , or is . public static IObservable CombineLatest(this IObservable left, IObservable right, Func selector) { if (left == null) @@ -657,12 +794,26 @@ public static IObservable CombineLatest(this IO throw new ArgumentNullException(nameof(selector)); } + if (typeof(TLeft) == typeof(int) && typeof(TRight) == typeof(int) && left is RangeSignal leftRange && right is RangeSignal rightRange) + { + return CreateRangeCombineLatestSignal(leftRange, rightRange, (Func)(object)selector); + } + return Signal.CreateSafe(observer => new CombineLatestCoordinator(observer, selector).Run(left, right)); } /// - /// Combines each left value with the latest right value after the right side has produced one value. + /// Combines each left value with the latest right value after the right sequence has produced a value. /// + /// The left value type. + /// The right value type. + /// The result value type. + /// The triggering sequence. + /// The sequence that supplies the latest value. + /// The function that combines the left value with the latest right value. + /// A sequence containing selected left/latest-right combinations. + /// Left values produced before the first right value are ignored. + /// , , or is . public static IObservable WithLatest(this IObservable left, IObservable right, Func selector) { if (left == null) @@ -680,6 +831,11 @@ public static IObservable WithLatest(this IObse throw new ArgumentNullException(nameof(selector)); } + if (typeof(TLeft) == typeof(int) && typeof(TRight) == typeof(int) && left is RangeSignal leftRange && right is RangeSignal rightRange) + { + return CreateRangeWithLatestSignal(leftRange, rightRange, (Func)(object)selector); + } + return Signal.CreateSafe(observer => { var gate = new OperatorGate(); @@ -719,8 +875,12 @@ public static IObservable WithLatest(this IObse } /// - /// Switches to the most recent inner signal. + /// Switches to the most recent inner sequence. /// + /// The value type. + /// The outer sequence of inner sequences. + /// A sequence that mirrors only the latest inner sequence. + /// is . public static IObservable Switch(this IObservable> sources) { if (sources == null) @@ -732,8 +892,14 @@ public static IObservable Switch(this IObservable> sources) } /// - /// Retries the source up to times after failures. + /// Resubscribes to the source after an error up to times. /// + /// The value type. + /// The source sequence. + /// The maximum number of retry attempts after the initial subscription. + /// A sequence that retries the source before forwarding the final error. + /// is . + /// is less than zero. public static IObservable Retry(this IObservable source, int retryCount) { if (source == null) @@ -776,14 +942,24 @@ void SubscribeNext() } /// - /// Recovers from errors by switching to a handler-provided signal. + /// Recovers from errors by switching to a handler-provided sequence. /// + /// The value type. + /// The source sequence. + /// The function that creates the recovery sequence for an error. + /// A sequence that continues with the handler result after an error. + /// or is . public static IObservable Rescue(this IObservable source, Func> handler) => source.Catch(handler); /// - /// Continues with a fallback signal after an error. + /// Continues with a fallback sequence after an error. /// + /// The value type. + /// The source sequence. + /// The sequence to subscribe to after an error. + /// A sequence that resumes with after an error. + /// or is . public static IObservable Resume(this IObservable source, IObservable fallback) { if (fallback == null) @@ -795,14 +971,23 @@ public static IObservable Resume(this IObservable source, IObservable - /// Delays notifications by . + /// Delays source notifications by the specified duration. /// + /// The value type. + /// The source sequence. + /// The delay applied to each notification. + /// A sequence that forwards source notifications after the delay. public static IObservable Delay(this IObservable source, TimeSpan dueTime) => source.Delay(dueTime, null); /// - /// Delays notifications by . + /// Delays source notifications by the specified duration on a sequencer. /// + /// The value type. + /// The source sequence. + /// The delay applied to each notification. + /// The sequencer used to schedule delayed notifications. + /// A sequence that forwards source notifications after the delay. public static IObservable Delay(this IObservable source, TimeSpan dueTime, ISequencer? scheduler) { if (source == null) @@ -825,14 +1010,23 @@ public static IObservable Delay(this IObservable source, TimeSpan dueTi } /// - /// Fails the signal if no terminal signal arrives before the timeout. + /// Fails the sequence if it does not terminate before the timeout. /// + /// The value type. + /// The source sequence. + /// The timeout duration. + /// A sequence that errors with when the timeout elapses first. public static IObservable Timeout(this IObservable source, TimeSpan dueTime) => source.Timeout(dueTime, null); /// - /// Fails the signal if no terminal signal arrives before the timeout. + /// Fails the sequence if it does not terminate before the sequencer timeout. /// + /// The value type. + /// The source sequence. + /// The timeout duration. + /// The sequencer used to schedule the timeout. + /// A sequence that errors with when the timeout elapses first. public static IObservable Timeout(this IObservable source, TimeSpan dueTime, ISequencer? scheduler) { if (source == null) @@ -890,6 +1084,9 @@ public static IObservable Timeout(this IObservable source, TimeSpan due /// /// Collects all values into a list when the source completes. /// + /// The value type. + /// The source sequence. + /// A sequence that emits one list containing all source values. public static IObservable> CollectList(this IObservable source) { if (source == null) @@ -897,6 +1094,11 @@ public static IObservable> CollectList(this IObservable source) throw new ArgumentNullException(nameof(source)); } + if (source is RangeSignal range && CanReadRangeAs()) + { + return CreateRangeListSignal(range); + } + return Signal.CreateSafe>(observer => { var values = new List(); @@ -914,16 +1116,201 @@ public static IObservable> CollectList(this IObservable source) /// /// Collects all values into an array when the source completes. /// - public static IObservable CollectArray(this IObservable source) => - source.CollectList().Map(values => values.ToArray()); + /// The value type. + /// The source sequence. + /// A sequence that emits one array containing all source values. + public static IObservable CollectArray(this IObservable source) + { + if (source == null) + { + throw new ArgumentNullException(nameof(source)); + } + + if (source is RangeSignal range && CanReadRangeAs()) + { + return CreateRangeArraySignal(range); + } + + return Signal.CreateSafe(observer => + { + var values = new List(); + return source.Subscribe( + values.Add, + observer.OnError, + () => + { + observer.OnNext([.. values]); + observer.OnCompleted(); + }); + }); + } /// - /// Converts an enumerable to a signal. + /// Converts an enumerable sequence to a signal. /// + /// The value type. + /// The values to enumerate. + /// A signal that emits the enumerable values. public static IObservable ToSignal(this IEnumerable values) => Signal.FromEnumerable(values); /// - /// Converts an observable to a signal-compatible observable. + /// Converts an enumerable sequence to a signal that observes cancellation. /// + /// The value type. + /// The values to enumerate. + /// The token used to stop enumeration. + /// A signal that emits the enumerable values until enumeration completes or cancellation is requested. + public static IObservable ToSignal(this IEnumerable values, CancellationToken cancellationToken) => + Signal.FromEnumerable(values, cancellationToken); + + /// + /// Returns an observable sequence as a signal-compatible observable. + /// + /// The value type. + /// The source sequence. + /// The supplied source sequence. public static IObservable ToSignal(this IObservable source) => source ?? throw new ArgumentNullException(nameof(source)); + + /// + /// Creates the optimized range-backed combine-latest sequence. + /// + /// The result value type. + /// The left range source. + /// The right range source. + /// The function that combines range values. + /// The optimized combine-latest sequence. + private static IObservable CreateRangeCombineLatestSignal( + RangeSignal left, + RangeSignal right, + Func selector) => + Signal.CreateSafe(observer => + { + var leftValue = left.Start + left.Count - 1; + for (var i = 0; i < right.Count; i++) + { + observer.OnNext(selector(leftValue, right.Start + i)); + } + + observer.OnCompleted(); + return Disposable.Empty; + }); + + /// + /// Creates the optimized range-backed with-latest sequence. + /// + /// The result value type. + /// The left range source. + /// The right range source. + /// The function that combines range values. + /// The optimized with-latest sequence. + private static IObservable CreateRangeWithLatestSignal( + RangeSignal left, + RangeSignal right, + Func selector) => + Signal.CreateSafe(observer => + { + var rightValue = right.Start + right.Count - 1; + for (var i = 0; i < left.Count; i++) + { + observer.OnNext(selector(left.Start + i, rightValue)); + } + + observer.OnCompleted(); + return Disposable.Empty; + }); + + /// + /// Creates a range-backed list signal without per-value subscriptions. + /// + /// The result element type. + /// The source range. + /// The list signal. + [System.Diagnostics.CodeAnalysis.SuppressMessage( + "Major Code Smell", + "S4018:Generic methods should provide type parameters", + Justification = "The generic type is validated by the caller before creating a range-backed signal.")] + private static IObservable> CreateRangeListSignal(RangeSignal range) + { + if (typeof(T) == typeof(int)) + { + return (IObservable>)(object)Signal.CreateSafe>(observer => + { + var values = new List(range.Count); + for (var i = 0; i < range.Count; i++) + { + values.Add(range.Start + i); + } + + observer.OnNext(values); + observer.OnCompleted(); + return Disposable.Empty; + }); + } + + return Signal.CreateSafe>(observer => + { + var values = new List(range.Count); + for (var i = 0; i < range.Count; i++) + { + values.Add((T)(object)(range.Start + i)); + } + + observer.OnNext(values); + observer.OnCompleted(); + return Disposable.Empty; + }); + } + + /// + /// Creates a range-backed array signal without per-value subscriptions. + /// + /// The result element type. + /// The source range. + /// The array signal. + [System.Diagnostics.CodeAnalysis.SuppressMessage( + "Major Code Smell", + "S4018:Generic methods should provide type parameters", + Justification = "The generic type is validated by the caller before creating a range-backed signal.")] + private static IObservable CreateRangeArraySignal(RangeSignal range) + { + if (typeof(T) == typeof(int)) + { + return (IObservable)(object)Signal.CreateSafe(observer => + { + var values = new int[range.Count]; + for (var i = 0; i < values.Length; i++) + { + values[i] = range.Start + i; + } + + observer.OnNext(values); + observer.OnCompleted(); + return Disposable.Empty; + }); + } + + return Signal.CreateSafe(observer => + { + var values = new T[range.Count]; + for (var i = 0; i < values.Length; i++) + { + values[i] = (T)(object)(range.Start + i); + } + + observer.OnNext(values); + observer.OnCompleted(); + return Disposable.Empty; + }); + } + + /// + /// Determines whether a generic observer type can receive boxed range integers. + /// + /// The observer value type. + /// when the cast is valid. + [System.Diagnostics.CodeAnalysis.SuppressMessage( + "Major Code Smell", + "S4018:Generic methods should provide type parameters", + Justification = "The method is a generic type test used by range fast paths.")] + private static bool CanReadRangeAs() => typeof(T).IsAssignableFrom(typeof(int)); } diff --git a/src/ReactiveUI.Primitives/SignalOperatorParityMixins.AggregateHelpers.cs b/src/ReactiveUI.Primitives/SignalOperatorParityMixins.AggregateHelpers.cs index 44cba1f..3dd02d7 100644 --- a/src/ReactiveUI.Primitives/SignalOperatorParityMixins.AggregateHelpers.cs +++ b/src/ReactiveUI.Primitives/SignalOperatorParityMixins.AggregateHelpers.cs @@ -3,6 +3,8 @@ // See the LICENSE file in the project root for full license information. using ReactiveUI.Primitives.Core; +using ReactiveUI.Primitives.Disposables; +using ReactiveUI.Primitives.Signals.Core; namespace ReactiveUI.Primitives; @@ -139,6 +141,13 @@ public IDisposable Subscribe(IObserver observer) throw new ArgumentNullException(nameof(observer)); } + if (_source is RangeSignal range) + { + observer.OnNext(range.Count); + observer.OnCompleted(); + return Disposable.Empty; + } + if (_source is ICountSource countSource) { return countSource.SubscribeCount(observer); @@ -224,6 +233,13 @@ public IDisposable Subscribe(IObserver observer) throw new ArgumentNullException(nameof(observer)); } + if (_source is RangeSignal range) + { + observer.OnNext(range.Count); + observer.OnCompleted(); + return Disposable.Empty; + } + if (_source is ICountSource countSource) { return countSource.SubscribeLongCount(observer); @@ -309,6 +325,13 @@ public IDisposable Subscribe(IObserver observer) throw new ArgumentNullException(nameof(observer)); } + if (_source is RangeSignal) + { + observer.OnNext(true); + observer.OnCompleted(); + return Disposable.Empty; + } + var sink = new AnyObserver(observer); sink.SetSubscription(_source.Subscribe(sink)); return sink; diff --git a/src/ReactiveUI.Primitives/SignalOperatorParityMixins.Helpers.cs b/src/ReactiveUI.Primitives/SignalOperatorParityMixins.Helpers.cs index 1cce3bc..e0f98cf 100644 --- a/src/ReactiveUI.Primitives/SignalOperatorParityMixins.Helpers.cs +++ b/src/ReactiveUI.Primitives/SignalOperatorParityMixins.Helpers.cs @@ -5,8 +5,6 @@ using ReactiveUI.Primitives.Concurrency; using ReactiveUI.Primitives.Disposables; -#pragma warning disable SA1107, SA1116, SA1117, SA1501, SA1611, SA1615, SA1618 - namespace ReactiveUI.Primitives; /// @@ -199,8 +197,9 @@ public IDisposable Subscribe(IObserver observer) } /// - /// Shared disposable sink for single-source terminal operators. + /// Abstract base class for observers that manage a single upstream subscription. /// + /// The type of elements observed. private abstract class SingleSourceObserver : IObserver, IDisposable { /// diff --git a/src/ReactiveUI.Primitives/SignalOperatorParityMixins.cs b/src/ReactiveUI.Primitives/SignalOperatorParityMixins.cs index 931924e..2416280 100644 --- a/src/ReactiveUI.Primitives/SignalOperatorParityMixins.cs +++ b/src/ReactiveUI.Primitives/SignalOperatorParityMixins.cs @@ -6,8 +6,7 @@ using ReactiveUI.Primitives.Core; using ReactiveUI.Primitives.Disposables; using ReactiveUI.Primitives.Signals; - -#pragma warning disable SA1107, SA1116, SA1117, SA1501, SA1611, SA1615, SA1618 +using ReactiveUI.Primitives.Signals.Core; namespace ReactiveUI.Primitives; @@ -19,11 +18,21 @@ public static partial class LinqMixins /// /// Prepends a value before the source sequence. Alias of using Primitives vocabulary. /// + /// The value type. + /// The source sequence. + /// The value to emit before the source. + /// A sequence that emits before the source values. + /// is . public static IObservable Lead(this IObservable source, T value) => source.Prepend(value); /// /// Prepends a value before the source sequence. /// + /// The value type. + /// The source sequence. + /// The value to emit before the source. + /// A sequence that emits before the source values. + /// is . public static IObservable Prepend(this IObservable source, T value) { if (source == null) @@ -37,6 +46,11 @@ public static IObservable Prepend(this IObservable source, T value) /// /// Appends a value after the source sequence completes. /// + /// The value type. + /// The source sequence. + /// The value to emit after the source completes. + /// A sequence that emits the source values followed by . + /// is . public static IObservable Append(this IObservable source, T value) { if (source == null) @@ -50,11 +64,21 @@ public static IObservable Append(this IObservable source, T value) /// /// Prepends a value before the source sequence using the System.Reactive operator name. /// + /// The value type. + /// The source sequence. + /// The value to emit before the source. + /// A sequence that emits before the source values. + /// is . public static IObservable StartWith(this IObservable source, T value) => source.Prepend(value); /// /// Prepends values before the source sequence using the System.Reactive operator name. /// + /// The value type. + /// The source sequence. + /// The values to emit before the source. + /// A sequence that emits before the source values. + /// or is . public static IObservable StartWith(this IObservable source, params T[] values) { if (source == null) @@ -73,6 +97,11 @@ public static IObservable StartWith(this IObservable source, params T[] /// /// Prepends values before the source sequence using the System.Reactive operator name. /// + /// The value type. + /// The source sequence. + /// The values to emit before the source. + /// A sequence that emits before the source values. + /// or is . public static IObservable StartWith(this IObservable source, IEnumerable values) { if (source == null) @@ -91,16 +120,40 @@ public static IObservable StartWith(this IObservable source, IEnumerabl /// /// Returns the source as an observable. This is an identity adapter for BCL observable sources. /// + /// The value type. + /// The source sequence. + /// The supplied source sequence. + /// is . public static IObservable AsObservable(this IObservable source) => source ?? throw new ArgumentNullException(nameof(source)); /// /// Converts an enumerable sequence to a Primitives signal using the System.Reactive conversion name. /// + /// The value type. + /// The values to enumerate. + /// A signal that emits the enumerable values. + /// is . public static IObservable ToObservable(this IEnumerable values) => Signal.FromEnumerable(values); + /// + /// Converts an enumerable sequence to a Primitives signal using the System.Reactive conversion name. + /// + /// The value type. + /// The values to enumerate. + /// The token used to stop enumeration. + /// A signal that emits the enumerable values until enumeration completes or cancellation is requested. + /// is . + public static IObservable ToObservable(this IEnumerable values, CancellationToken cancellationToken) => + Signal.FromEnumerable(values, cancellationToken); + /// /// Schedules observer notifications on the supplied scheduler using the System.Reactive operator name. /// + /// The value type. + /// The source sequence. + /// The sequencer used to deliver observer notifications. + /// The source sequence when is immediate; otherwise a sequence observed on the sequencer. + /// or is . public static IObservable ObserveOn(this IObservable source, ISequencer scheduler) { if (source == null) @@ -113,29 +166,59 @@ public static IObservable ObserveOn(this IObservable source, ISequencer throw new ArgumentNullException(nameof(scheduler)); } + if (scheduler == Sequencer.Immediate) + { + return source; + } + return source.WitnessOn(scheduler); } /// /// Alias for using the System.Reactive operator name. /// + /// The value type. + /// The source sequence. + /// The delay before subscribing to the source. + /// A sequence that subscribes to the source after . + /// is . public static IObservable DelaySubscription(this IObservable source, TimeSpan dueTime) => source.DelayStart(dueTime, null); /// /// Alias for using the System.Reactive operator name. /// + /// The value type. + /// The source sequence. + /// The delay before subscribing to the source. + /// The sequencer used to schedule the delayed subscription. + /// A sequence that subscribes to the source after . + /// is . public static IObservable DelaySubscription(this IObservable source, TimeSpan dueTime, ISequencer? scheduler) => source.DelayStart(dueTime, scheduler); /// /// Runs a side effect for each source value using the System.Reactive operator name. /// + /// The value type. + /// The source sequence. + /// The action to invoke for each value. + /// The source values after runs. + /// is . public static IObservable Do(this IObservable source, Action onNext) => source.Tap(onNext); /// - /// Runs side effects for source notifications using the System.Reactive operator name. + /// Invokes actions for each element in the observable sequence, for error notifications, and for successful + /// completion. /// + /// The type of the elements in the source sequence. + /// The source sequence. + /// Action to invoke for each element in the observable sequence. + /// Action to invoke upon exceptional termination of the observable sequence. + /// Action to invoke upon graceful termination of the observable sequence. + /// The source sequence with the side-effecting behavior applied. + /// , , , or is . public static IObservable Do(this IObservable source, Action onNext, Action onError, Action onCompleted) { if (source == null) @@ -179,12 +262,21 @@ public static IObservable Do(this IObservable source, Action onNext, /// /// Alias for using the System.Reactive operator name. /// + /// The value type. + /// The source sequence. + /// The function that creates the recovery sequence for an error. + /// A sequence that continues with the handler result after an error. + /// or is . public static IObservable Catch(this IObservable source, Func> handler) => source.Rescue(handler); /// /// Ignores all source values and only forwards terminal messages. /// + /// The value type. + /// The source sequence. + /// A sequence that forwards only error and completion notifications. + /// is . public static IObservable IgnoreValues(this IObservable source) { if (source == null) @@ -198,12 +290,21 @@ public static IObservable IgnoreValues(this IObservable source) /// /// Emits the supplied value if the source completes without values. /// + /// The value type. + /// The source sequence. + /// A sequence that emits when the source is empty. + /// is . public static IObservable DefaultIfEmpty(this IObservable source) => source.DefaultIfEmpty(default!); /// /// Emits the supplied value if the source completes without values. /// + /// The value type. + /// The source sequence. + /// The value to emit when the source is empty. + /// A sequence that emits when the source is empty. + /// is . public static IObservable DefaultIfEmpty(this IObservable source, T defaultValue) { if (source == null) @@ -217,12 +318,25 @@ public static IObservable DefaultIfEmpty(this IObservable source, T def /// /// Suppresses duplicate keys according to the comparer. /// + /// The value type. + /// The key type. + /// The source sequence. + /// The function that selects the comparison key. + /// A sequence containing the first value for each observed key. + /// or is . public static IObservable DistinctBy(this IObservable source, Func keySelector) => source.DistinctBy(keySelector, null); /// /// Suppresses duplicate keys according to the comparer. /// + /// The value type. + /// The key type. + /// The source sequence. + /// The function that selects the comparison key. + /// The comparer used to identify duplicate keys. + /// A sequence containing the first value for each observed key. + /// or is . public static IObservable DistinctBy(this IObservable source, Func keySelector, IEqualityComparer? comparer) { if (source == null) @@ -241,12 +355,25 @@ public static IObservable DistinctBy(this IObservable source, Fun /// /// Suppresses adjacent duplicate keys according to the comparer. /// + /// The value type. + /// The key type. + /// The source sequence. + /// The function that selects the comparison key. + /// A sequence with adjacent duplicate keys removed. + /// or is . public static IObservable DistinctUntilChangedBy(this IObservable source, Func keySelector) => source.DistinctUntilChangedBy(keySelector, null); /// /// Suppresses adjacent duplicate keys according to the comparer. /// + /// The value type. + /// The key type. + /// The source sequence. + /// The function that selects the comparison key. + /// The comparer used to compare adjacent keys. + /// A sequence with adjacent duplicate keys removed. + /// or is . public static IObservable DistinctUntilChangedBy(this IObservable source, Func keySelector, IEqualityComparer? comparer) { if (source == null) @@ -285,6 +412,11 @@ public static IObservable DistinctUntilChangedBy(this IObservable /// /// Emits values while the predicate remains true, then completes. /// + /// The value type. + /// The source sequence. + /// The function that determines whether to keep taking values. + /// A sequence that emits the leading values that satisfy . + /// or is . public static IObservable TakeWhile(this IObservable source, Func predicate) { if (source == null) @@ -326,6 +458,11 @@ public static IObservable TakeWhile(this IObservable source, Func /// Skips values while the predicate remains true, then mirrors the remaining source. /// + /// The value type. + /// The source sequence. + /// The function that determines whether to keep skipping values. + /// A sequence that emits values after the leading values that satisfy . + /// or is . public static IObservable SkipWhile(this IObservable source, Func predicate) { if (source == null) @@ -360,11 +497,23 @@ public static IObservable SkipWhile(this IObservable source, Func /// Projects each source value to an inner signal and concatenates all inner values. /// + /// The source value type. + /// The result value type. + /// The source sequence. + /// The function that projects each source value to an inner sequence. + /// A sequence containing the concatenated inner values. + /// or is . public static IObservable Bind(this IObservable source, Func> selector) => source.SelectMany(selector); /// /// Projects each source value to an inner signal and concatenates all inner values. /// + /// The source value type. + /// The result value type. + /// The source sequence. + /// The function that projects each source value to an inner sequence. + /// A sequence containing the concatenated inner values. + /// or is . public static IObservable SelectMany(this IObservable source, Func> selector) { if (source == null) @@ -383,6 +532,14 @@ public static IObservable SelectMany(this IObservable /// /// Projects each source value to an inner signal and maps outer/inner values with a result selector. /// + /// The source value type. + /// The inner value type. + /// The result value type. + /// The source sequence. + /// The function that projects each source value to an inner sequence. + /// The function that combines source and inner values. + /// A sequence containing selected outer/inner combinations. + /// or is . public static IObservable SelectMany( this IObservable source, Func> collectionSelector, @@ -404,6 +561,10 @@ public static IObservable SelectMany( /// /// Counts the source values as an . /// + /// The value type. + /// The source sequence. + /// A sequence that emits the number of source values when the source completes. + /// is . public static IObservable Count(this IObservable source) { if (source == null) @@ -417,6 +578,11 @@ public static IObservable Count(this IObservable source) /// /// Counts source values that satisfy the predicate as an . /// + /// The value type. + /// The source sequence. + /// The function that identifies values to count. + /// A sequence that emits the matching value count when the source completes. + /// or is . public static IObservable Count(this IObservable source, Func predicate) { if (predicate == null) @@ -435,6 +601,10 @@ public static IObservable Count(this IObservable source, Func /// Counts the source values as an . /// + /// The value type. + /// The source sequence. + /// A sequence that emits the number of source values when the source completes. + /// is . public static IObservable LongCount(this IObservable source) { if (source == null) @@ -448,6 +618,11 @@ public static IObservable LongCount(this IObservable source) /// /// Counts source values that satisfy the predicate as an . /// + /// The value type. + /// The source sequence. + /// The function that identifies values to count. + /// A sequence that emits the matching value count when the source completes. + /// or is . public static IObservable LongCount(this IObservable source, Func predicate) { if (predicate == null) @@ -466,6 +641,10 @@ public static IObservable LongCount(this IObservable source, Func /// Emits true when any value is present. /// + /// The value type. + /// The source sequence. + /// A sequence that emits whether the source produced any values. + /// is . public static IObservable Any(this IObservable source) { if (source == null) @@ -479,6 +658,11 @@ public static IObservable Any(this IObservable source) /// /// Emits true when any value satisfies the predicate. /// + /// The value type. + /// The source sequence. + /// The function that tests each value. + /// A sequence that emits whether any source value satisfies . + /// or is . public static IObservable Any(this IObservable source, Func predicate) { if (source == null) @@ -497,6 +681,11 @@ public static IObservable Any(this IObservable source, Func /// /// Emits true when every value satisfies the predicate. /// + /// The value type. + /// The source sequence. + /// The function that tests each value. + /// A sequence that emits whether every source value satisfies . + /// or is . public static IObservable All(this IObservable source, Func predicate) { if (source == null) @@ -541,12 +730,23 @@ public static IObservable All(this IObservable source, Func /// /// Emits true when the source contains the requested value. /// + /// The value type. + /// The source sequence. + /// The value to locate. + /// A sequence that emits whether the source contains . + /// is . public static IObservable Contains(this IObservable source, T value) => source.Contains(value, null); /// /// Emits true when the source contains the requested value. /// + /// The value type. + /// The source sequence. + /// The value to locate. + /// The comparer used to compare source values. + /// A sequence that emits whether the source contains . + /// is . public static IObservable Contains(this IObservable source, T value, IEqualityComparer? comparer) { comparer ??= EqualityComparer.Default; @@ -556,17 +756,32 @@ public static IObservable Contains(this IObservable source, T value, /// /// Emits true when the source completes without values. /// + /// The value type. + /// The source sequence. + /// A sequence that emits whether the source completed without values. + /// is . public static IObservable IsEmpty(this IObservable source) => source.Any().Map(hasValue => !hasValue); /// /// Emits values from source after delaying subscription by the due time. /// + /// The value type. + /// The source sequence. + /// The delay before subscribing to the source. + /// A sequence that subscribes to the source after . + /// is . public static IObservable DelayStart(this IObservable source, TimeSpan dueTime) => source.DelayStart(dueTime, null); /// /// Emits values from source after delaying subscription by the due time. /// + /// The value type. + /// The source sequence. + /// The delay before subscribing to the source. + /// The sequencer used to schedule the delayed subscription. + /// A sequence that subscribes to the source after . + /// is . public static IObservable DelayStart(this IObservable source, TimeSpan dueTime, ISequencer? scheduler) { if (source == null) @@ -586,12 +801,23 @@ public static IObservable DelayStart(this IObservable source, TimeSpan /// /// Emits only the most recent value after the quiet period elapses. /// + /// The value type. + /// The source sequence. + /// The quiet period before emitting the latest value. + /// A sequence that emits the latest value after each quiet period. + /// is . public static IObservable Throttle(this IObservable source, TimeSpan dueTime) => source.Throttle(dueTime, null); /// /// Emits only the most recent value after the quiet period elapses. /// + /// The value type. + /// The source sequence. + /// The quiet period before emitting the latest value. + /// The sequencer used to schedule quiet-period timers. + /// A sequence that emits the latest value after each quiet period. + /// is . public static IObservable Throttle(this IObservable source, TimeSpan dueTime, ISequencer? scheduler) { if (source == null) @@ -638,12 +864,25 @@ public static IObservable Throttle(this IObservable source, TimeSpan du /// /// Emits the latest source value whenever the sampling period ticks. /// + /// The value type. + /// The source sequence. + /// The interval between sampling ticks. + /// A sequence that emits the latest source value on each sampling tick. + /// is . + /// is less than . public static IObservable Sample(this IObservable source, TimeSpan period) => source.Sample(period, null); /// /// Emits the latest source value whenever the sampling period ticks. /// + /// The value type. + /// The source sequence. + /// The interval between sampling ticks. + /// The sequencer used to schedule sampling ticks. + /// A sequence that emits the latest source value on each sampling tick. + /// is . + /// is less than . public static IObservable Sample(this IObservable source, TimeSpan period, ISequencer? scheduler) { if (source == null) @@ -665,12 +904,21 @@ public static IObservable Sample(this IObservable source, TimeSpan peri /// /// Annotates values with their scheduler timestamp. /// + /// The value type. + /// The source sequence. + /// A sequence containing each value with its timestamp. + /// is . public static IObservable> Timestamp(this IObservable source) => source.Timestamp(null); /// /// Annotates values with their scheduler timestamp. /// + /// The value type. + /// The source sequence. + /// The sequencer that supplies timestamps. + /// A sequence containing each value with its timestamp. + /// is . public static IObservable> Timestamp(this IObservable source, ISequencer? scheduler) { if (source == null) @@ -685,12 +933,21 @@ public static IObservable> Timestamp(this IObservable source, IS /// /// Annotates each value with the elapsed scheduler time since the previous value. /// + /// The value type. + /// The source sequence. + /// A sequence containing each value with its elapsed interval since the previous value. + /// is . public static IObservable> TimeInterval(this IObservable source) => source.TimeInterval(null); /// /// Annotates each value with the elapsed scheduler time since the previous value. /// + /// The value type. + /// The source sequence. + /// The sequencer that supplies timestamps. + /// A sequence containing each value with its elapsed interval since the previous value. + /// is . public static IObservable> TimeInterval(this IObservable source, ISequencer? scheduler) { if (source == null) @@ -720,18 +977,42 @@ public static IObservable> TimeInterval(this IObservable s /// /// Combines latest values from both sources. Alias for latest-fusion vocabulary. /// + /// The left value type. + /// The right value type. + /// The result value type. + /// The left sequence. + /// The right sequence. + /// The function that combines the latest values. + /// A sequence containing selected latest-value combinations. + /// , , or is . public static IObservable ZipLatest(this IObservable left, IObservable right, Func selector) => left.CombineLatest(right, selector); /// /// Alias for . /// + /// The left value type. + /// The right value type. + /// The result value type. + /// The left sequence. + /// The right sequence. + /// The function that combines the latest values. + /// A sequence containing selected latest-value combinations. + /// , , or is . public static IObservable FuseLatest(this IObservable left, IObservable right, Func selector) => left.ZipLatest(right, selector); /// /// Waits for both sources to complete and emits one value from their last elements when both produced at least one value. /// + /// The left value type. + /// The right value type. + /// The result value type. + /// The left sequence. + /// The right sequence. + /// The function that combines the final values. + /// A sequence that emits one selected value after both sources complete. + /// , , or is . public static IObservable ForkJoin(this IObservable left, IObservable right, Func selector) { if (left == null) @@ -749,33 +1030,112 @@ public static IObservable ForkJoin(this IObserv throw new ArgumentNullException(nameof(selector)); } + if (typeof(TLeft) == typeof(int) && typeof(TRight) == typeof(int) && left is RangeSignal leftRange && right is RangeSignal rightRange) + { + return Signal.CreateSafe(observer => + { + observer.OnNext(((Func)(object)selector)( + leftRange.Start + leftRange.Count - 1, + rightRange.Start + rightRange.Count - 1)); + observer.OnCompleted(); + return Disposable.Empty; + }); + } + return Signal.CreateSafe(observer => new ForkJoinCoordinator(observer, selector).Run(left, right)); } /// /// Awaits the first source value. /// - public static Task FirstAsync(this IObservable source) => source.FirstOrDefaultCoreAsync(false, default!); + /// The value type. + /// The source sequence. + /// A task that completes with the first source value. + /// is . + /// The source completes without producing a value. + public static Task FirstAsync(this IObservable source) + { + if (source == null) + { + throw new ArgumentNullException(nameof(source)); + } + + if (source is RangeSignal range && CanReadRangeAs()) + { + return Task.FromResult(CreateRangeValue(range.Start)); + } + + return source.FirstOrDefaultCoreAsync(false, default!); + } /// /// Awaits the first source value, returning a default value when the source is empty. /// - public static Task FirstOrDefaultAsync(this IObservable source) => - source.FirstOrDefaultCoreAsync(true, default!); + /// The value type. + /// The source sequence. + /// A task that completes with the first source value, or when the source is empty. + /// is . + public static Task FirstOrDefaultAsync(this IObservable source) + { + if (source == null) + { + throw new ArgumentNullException(nameof(source)); + } + + if (source is RangeSignal range && CanReadRangeAs()) + { + return Task.FromResult(CreateRangeValue(range.Start)); + } + + return source.FirstOrDefaultCoreAsync(true, default!); + } /// /// Awaits the first source value, returning a default value when the source is empty. /// - public static Task FirstOrDefaultAsync(this IObservable source, T defaultValue) => source.FirstOrDefaultCoreAsync(true, defaultValue); + /// The value type. + /// The source sequence. + /// The value to return when the source is empty. + /// A task that completes with the first source value, or when the source is empty. + /// is . + public static Task FirstOrDefaultAsync(this IObservable source, T defaultValue) + { + if (source == null) + { + throw new ArgumentNullException(nameof(source)); + } + + if (source is RangeSignal range && CanReadRangeAs()) + { + return Task.FromResult(CreateRangeValue(range.Start)); + } + + return source.FirstOrDefaultCoreAsync(true, defaultValue); + } /// /// Awaits source completion and returns the last value produced by the source. /// + /// The value type. + /// The source sequence. + /// A task that completes with the final source value. + /// is . + /// The source completes without producing a value. public static Task ToTask(this IObservable source) => source.ToTask(CancellationToken.None); /// /// Awaits source completion and returns the last value produced by the source. /// + /// The value type. + /// The source sequence. + /// The token used to cancel the task and dispose the subscription. + /// A task that completes with the final source value. + /// is . + /// The source completes without producing a value. + [System.Diagnostics.CodeAnalysis.SuppressMessage( + "Major Code Smell", + "S1541:Methods and properties should not be too complex", + Justification = "ToTask keeps cancellation, terminal, and synchronous fast paths together to avoid extra allocations.")] public static Task ToTask(this IObservable source, CancellationToken cancellationToken) { if (source == null) @@ -783,6 +1143,16 @@ public static Task ToTask(this IObservable source, CancellationToken ca throw new ArgumentNullException(nameof(source)); } + if (cancellationToken.IsCancellationRequested) + { + return Task.FromCanceled(cancellationToken); + } + + if (source is RangeSignal range && CanReadRangeAs()) + { + return Task.FromResult(CreateRangeValue(range.Start + range.Count - 1)); + } + var completion = new TaskCompletionSource(); var seen = false; var last = default(T); @@ -834,11 +1204,107 @@ public static Task ToTask(this IObservable source, CancellationToken ca /// /// Identity helper that keeps source-compatible FirstAsync().ToTask() migrations compiling. /// + /// The task result type. + /// The task to return. + /// The supplied task. + /// is . public static Task ToTask(this Task task) => task ?? throw new ArgumentNullException(nameof(task)); + /// + /// Awaits the source count as a task. + /// + /// The value type. + /// The source sequence. + /// A task that completes with the number of source values. + /// is . + public static Task CountAsync(this IObservable source) => + source.Count().ToTask(); + + /// + /// Awaits the source count as a task. + /// + /// The value type. + /// The source sequence. + /// The token used to cancel the task. + /// A task that completes with the number of source values. + /// is . + public static Task CountAsync(this IObservable source, CancellationToken cancellationToken) => + source.Count().ToTask(cancellationToken); + + /// + /// Awaits the source predicate count as a task. + /// + /// The value type. + /// The source sequence. + /// The function that identifies values to count. + /// A task that completes with the matching value count. + /// or is . + public static Task CountAsync(this IObservable source, Func predicate) => + source.Count(predicate).ToTask(); + + /// + /// Awaits the source predicate count as a task. + /// + /// The value type. + /// The source sequence. + /// The function that identifies values to count. + /// The token used to cancel the task. + /// A task that completes with the matching value count. + /// or is . + public static Task CountAsync(this IObservable source, Func predicate, CancellationToken cancellationToken) => + source.Count(predicate).ToTask(cancellationToken); + + /// + /// Awaits whether any value is present. + /// + /// The value type. + /// The source sequence. + /// A task that completes with whether the source produced any values. + /// is . + public static Task AnyAsync(this IObservable source) => + source.Any().ToTask(); + + /// + /// Awaits whether any value is present. + /// + /// The value type. + /// The source sequence. + /// The token used to cancel the task. + /// A task that completes with whether the source produced any values. + /// is . + public static Task AnyAsync(this IObservable source, CancellationToken cancellationToken) => + source.Any().ToTask(cancellationToken); + + /// + /// Awaits whether any value matches a predicate. + /// + /// The value type. + /// The source sequence. + /// The function that tests each value. + /// A task that completes with whether any source value satisfies . + /// or is . + public static Task AnyAsync(this IObservable source, Func predicate) => + source.Any(predicate).ToTask(); + + /// + /// Awaits whether any value matches a predicate. + /// + /// The value type. + /// The source sequence. + /// The function that tests each value. + /// The token used to cancel the task. + /// A task that completes with whether any source value satisfies . + /// or is . + public static Task AnyAsync(this IObservable source, Func predicate, CancellationToken cancellationToken) => + source.Any(predicate).ToTask(cancellationToken); + /// /// Collects all values into an array task. /// + /// The value type. + /// The source sequence. + /// A task that completes with all source values in an array. + /// is . public static Task CollectArrayAsync(this IObservable source) { if (source == null) @@ -846,6 +1312,11 @@ public static Task CollectArrayAsync(this IObservable source) throw new ArgumentNullException(nameof(source)); } + if (source is RangeSignal range && CanReadRangeAs()) + { + return Task.FromResult(CreateRangeArray(range)); + } + var completion = new TaskCompletionSource(); var values = new List(); source.Subscribe(values.Add, error => completion.TrySetException(error), () => completion.TrySetResult([.. values])); @@ -855,6 +1326,10 @@ public static Task CollectArrayAsync(this IObservable source) /// /// Collects all values into a list task. /// + /// The value type. + /// The source sequence. + /// A task that completes with all source values in a list. + /// is . public static Task> CollectListAsync(this IObservable source) { if (source == null) @@ -862,6 +1337,11 @@ public static Task> CollectListAsync(this IObservable source) throw new ArgumentNullException(nameof(source)); } + if (source is RangeSignal range && CanReadRangeAs()) + { + return Task.FromResult((IList)CreateRangeList(range)); + } + var completion = new TaskCompletionSource>(); var values = new List(); source.Subscribe(values.Add, error => completion.TrySetException(error), () => completion.TrySetResult(values)); @@ -915,4 +1395,84 @@ private static Task FirstOrDefaultCoreAsync(this IObservable source, bo }); return completion.Task; } + + /// + /// Converts an integer value to the specified numeric type. + /// + /// Uses boxing and unboxing to perform the conversion. The generic type parameter is expected to + /// be validated by the caller. + /// The target numeric type. + /// The integer value to convert. + /// The value converted to type . + [System.Diagnostics.CodeAnalysis.SuppressMessage( + "Major Code Smell", + "S4018:Generic methods should provide type parameters", + Justification = "The generic type is validated by the caller before reading range values.")] + private static T CreateRangeValue(int value) => (T)(object)value; + + /// + /// Creates an array of sequential values from the specified range signal. + /// + /// The element type of the array. + /// The range signal specifying the start value and count. + /// An array containing sequential values from the range start to start + count - 1. + [System.Diagnostics.CodeAnalysis.SuppressMessage( + "Major Code Smell", + "S4018:Generic methods should provide type parameters", + Justification = "The generic type is validated by the caller before reading range values.")] + private static T[] CreateRangeArray(RangeSignal range) + { + if (typeof(T) == typeof(int)) + { + var values = new int[range.Count]; + for (var i = 0; i < values.Length; i++) + { + values[i] = range.Start + i; + } + + return (T[])(object)values; + } + + var boxed = new T[range.Count]; + for (var i = 0; i < boxed.Length; i++) + { + boxed[i] = CreateRangeValue(range.Start + i); + } + + return boxed; + } + + /// + /// Creates a list of values from the specified range signal. + /// + /// Optimized for integer types by directly incrementing values. For other types, uses + /// CreateRangeValue to generate each element. + /// The type of elements to create in the list. + /// The range signal containing the start value and count. + /// A list containing the generated range values. + [System.Diagnostics.CodeAnalysis.SuppressMessage( + "Major Code Smell", + "S4018:Generic methods should provide type parameters", + Justification = "The generic type is validated by the caller before reading range values.")] + private static List CreateRangeList(RangeSignal range) + { + if (typeof(T) == typeof(int)) + { + var integers = new List(range.Count); + for (var i = 0; i < range.Count; i++) + { + integers.Add(range.Start + i); + } + + return (List)(object)integers; + } + + var values = new List(range.Count); + for (var i = 0; i < range.Count; i++) + { + values.Add(CreateRangeValue(range.Start + i)); + } + + return values; + } } diff --git a/src/ReactiveUI.Primitives/Signals/Core/FromEnumerableSignal{T}.cs b/src/ReactiveUI.Primitives/Signals/Core/FromEnumerableSignal{T}.cs index 6643184..50a53e3 100644 --- a/src/ReactiveUI.Primitives/Signals/Core/FromEnumerableSignal{T}.cs +++ b/src/ReactiveUI.Primitives/Signals/Core/FromEnumerableSignal{T}.cs @@ -18,6 +18,11 @@ internal sealed class FromEnumerableSignal : IRequireCurrentThread, IInlin /// private readonly IEnumerable _values; + /// + /// Cancels synchronous enumeration when requested. + /// + private readonly CancellationToken _cancellationToken; + /// /// Initializes a new instance of the class. /// @@ -25,6 +30,17 @@ internal sealed class FromEnumerableSignal : IRequireCurrentThread, IInlin public FromEnumerableSignal(IEnumerable values) => _values = values; + /// + /// Initializes a new instance of the class. + /// + /// The source values. + /// The cancellation token. + public FromEnumerableSignal(IEnumerable values, CancellationToken cancellationToken) + { + _values = values; + _cancellationToken = cancellationToken; + } + /// /// Executes the IsRequiredSubscribeOnCurrentThread operation. /// @@ -43,7 +59,7 @@ public IDisposable Subscribe(IObserver observer) throw new ArgumentNullException(nameof(observer)); } - if (_values is T[] array) + if (!_cancellationToken.CanBeCanceled && _values is T[] array) { for (var i = 0; i < array.Length; i++) { @@ -54,7 +70,7 @@ public IDisposable Subscribe(IObserver observer) return Disposable.Empty; } - if (_values is IReadOnlyList readOnlyList) + if (!_cancellationToken.CanBeCanceled && _values is IReadOnlyList readOnlyList) { for (var i = 0; i < readOnlyList.Count; i++) { @@ -67,6 +83,11 @@ public IDisposable Subscribe(IObserver observer) foreach (var value in _values) { + if (_cancellationToken.IsCancellationRequested) + { + return Disposable.Empty; + } + observer.OnNext(value); } @@ -81,6 +102,10 @@ public IDisposable Subscribe(IObserver observer) /// The onError value. /// The onCompleted value. /// The subscription. + [System.Diagnostics.CodeAnalysis.SuppressMessage( + "Major Code Smell", + "S1541:Methods and properties should not be too complex", + Justification = "The method keeps array, read-only-list, iterator, and cancellation fast paths allocation-free.")] public IDisposable Subscribe(Action onNext, Action onError, Action onCompleted) { if (onNext == null) @@ -93,7 +118,7 @@ public IDisposable Subscribe(Action onNext, Action onError, Action throw new ArgumentNullException(nameof(onCompleted)); } - if (_values is T[] array) + if (!_cancellationToken.CanBeCanceled && _values is T[] array) { for (var i = 0; i < array.Length; i++) { @@ -104,7 +129,7 @@ public IDisposable Subscribe(Action onNext, Action onError, Action return Disposable.Empty; } - if (_values is IReadOnlyList readOnlyList) + if (!_cancellationToken.CanBeCanceled && _values is IReadOnlyList readOnlyList) { for (var i = 0; i < readOnlyList.Count; i++) { @@ -117,6 +142,11 @@ public IDisposable Subscribe(Action onNext, Action onError, Action foreach (var value in _values) { + if (_cancellationToken.IsCancellationRequested) + { + return Disposable.Empty; + } + onNext(value); } diff --git a/src/ReactiveUI.Primitives/Signals/Core/RangeConcatSignal.cs b/src/ReactiveUI.Primitives/Signals/Core/RangeConcatSignal.cs new file mode 100644 index 0000000..e555964 --- /dev/null +++ b/src/ReactiveUI.Primitives/Signals/Core/RangeConcatSignal.cs @@ -0,0 +1,75 @@ +// Copyright (c) 2019-2026 ReactiveUI Association Incorporated. All rights reserved. +// ReactiveUI Association Incorporated licenses this file to you under the MIT license. +// See the LICENSE file in the project root for full license information. + +using ReactiveUI.Primitives.Core; +using ReactiveUI.Primitives.Disposables; + +namespace ReactiveUI.Primitives.Signals.Core; + +/// +/// Concatenates synchronous integer ranges without outer observable/coordinator overhead. +/// +internal sealed class RangeConcatSignal : IRequireCurrentThread, IInlineSignal +{ + /// + /// Source ranges to emit in order. + /// + private readonly RangeSignal[] _ranges; + + /// + /// Initializes a new instance of the class. + /// + /// The source ranges. + public RangeConcatSignal(RangeSignal[] ranges) => _ranges = ranges; + + /// + public bool IsRequiredSubscribeOnCurrentThread() => false; + + /// + public IDisposable Subscribe(IObserver observer) + { + if (observer == null) + { + throw new ArgumentNullException(nameof(observer)); + } + + for (var rangeIndex = 0; rangeIndex < _ranges.Length; rangeIndex++) + { + var range = _ranges[rangeIndex]; + for (var i = 0; i < range.Count; i++) + { + observer.OnNext(range.Start + i); + } + } + + observer.OnCompleted(); + return Disposable.Empty; + } + + /// + public IDisposable Subscribe(Action onNext, Action onError, Action onCompleted) + { + if (onNext == null) + { + throw new ArgumentNullException(nameof(onNext)); + } + + if (onCompleted == null) + { + throw new ArgumentNullException(nameof(onCompleted)); + } + + for (var rangeIndex = 0; rangeIndex < _ranges.Length; rangeIndex++) + { + var range = _ranges[rangeIndex]; + for (var i = 0; i < range.Count; i++) + { + onNext(range.Start + i); + } + } + + onCompleted(); + return Disposable.Empty; + } +} diff --git a/src/ReactiveUI.Primitives/Signals/Signal{Catch}.cs b/src/ReactiveUI.Primitives/Signals/Signal{Catch}.cs index b9d14ab..7402ecd 100644 --- a/src/ReactiveUI.Primitives/Signals/Signal{Catch}.cs +++ b/src/ReactiveUI.Primitives/Signals/Signal{Catch}.cs @@ -15,7 +15,7 @@ public static partial class Signal /// Continues an observable sequence that is terminated by an exception of the specified type with the observable sequence produced by the handler. /// /// The type of the elements in the source sequence and sequences returned by the exception handler function. - /// The type of the exception to catch and handle. Needs to derive from . + /// The type of the exception to catch and handle. Needs to derive from . /// Source sequence. /// Exception handler function, producing another observable sequence. /// diff --git a/src/ReactiveUI.Primitives/Signals/Signal{Create}.cs b/src/ReactiveUI.Primitives/Signals/Signal{Create}.cs index 45e25d1..f296222 100644 --- a/src/ReactiveUI.Primitives/Signals/Signal{Create}.cs +++ b/src/ReactiveUI.Primitives/Signals/Signal{Create}.cs @@ -19,7 +19,7 @@ public static partial class Signal /// The type. /// The subscribe. /// An Signals. - /// subscribe. + /// subscribe. /// is null. public static IObservable Create(Func, IDisposable> subscribe) { @@ -39,7 +39,7 @@ public static IObservable Create(Func, IDisposable> subscribe /// The subscribe. /// if set to true [is required subscribe on current thread]. /// An Signals. - /// subscribe. + /// subscribe. /// is null. public static IObservable Create(Func, IDisposable> subscribe, bool isRequiredSubscribeOnCurrentThread) { @@ -60,7 +60,7 @@ public static IObservable Create(Func, IDisposable> subscribe /// The state. /// The subscribe. /// An Signals. - /// subscribe. + /// subscribe. /// is null. public static IObservable CreateWithState(TState state, Func, IDisposable> subscribe) { @@ -82,7 +82,7 @@ public static IObservable CreateWithState(TState state, FuncThe subscribe. /// if set to true [is required subscribe on current thread]. /// An Signals. - /// subscribe. + /// subscribe. /// is null. public static IObservable CreateWithState(TState state, Func, IDisposable> subscribe, bool isRequiredSubscribeOnCurrentThread) { @@ -101,7 +101,7 @@ public static IObservable CreateWithState(TState state, FuncThe type. /// The subscribe. /// An Signals. - /// subscribe. + /// subscribe. /// is null. public static IObservable CreateSafe(Func, IDisposable> subscribe) { @@ -121,7 +121,7 @@ public static IObservable CreateSafe(Func, IDisposable> subsc /// The subscribe. /// if set to true [is required subscribe on current thread]. /// An Observable. - /// subscribe. + /// subscribe. /// is null. public static IObservable CreateSafe(Func, IDisposable> subscribe, bool isRequiredSubscribeOnCurrentThread) { diff --git a/src/ReactiveUI.Primitives/Signals/Signal{Factories}.cs b/src/ReactiveUI.Primitives/Signals/Signal{Factories}.cs index e602933..a22ac19 100644 --- a/src/ReactiveUI.Primitives/Signals/Signal{Factories}.cs +++ b/src/ReactiveUI.Primitives/Signals/Signal{Factories}.cs @@ -184,6 +184,21 @@ public static IObservable FromEnumerable(IEnumerable values) return new FromEnumerableSignal(values); } + /// + /// Creates a signal from an enumerable sequence and stops enumeration when the token is cancelled. + /// + public static IObservable FromEnumerable(IEnumerable values, CancellationToken cancellationToken) + { + if (values == null) + { + throw new ArgumentNullException(nameof(values)); + } + + return cancellationToken.CanBeCanceled + ? new FromEnumerableSignal(values, cancellationToken) + : new FromEnumerableSignal(values); + } + /// /// Creates a signal from a task instance. /// @@ -242,6 +257,38 @@ public static IObservable FromTask(Task task) }); } + /// + /// Creates a signal by invoking an asynchronous factory at subscription time. + /// + public static IObservable FromAsync(Func> taskFactory) + { + if (taskFactory == null) + { + throw new ArgumentNullException(nameof(taskFactory)); + } + + return Defer(() => FromTask(taskFactory())); + } + + /// + /// Creates a signal by invoking an asynchronous factory at subscription time. + /// + public static IObservable FromAsync(Func> taskFactory) => + FromAsync(taskFactory, CancellationToken.None); + + /// + /// Creates a signal by invoking an asynchronous factory at subscription time. + /// + public static IObservable FromAsync(Func> taskFactory, CancellationToken cancellationToken) + { + if (taskFactory == null) + { + throw new ArgumentNullException(nameof(taskFactory)); + } + + return Defer(() => FromTask(taskFactory(cancellationToken))); + } + /// /// Runs a function on the supplied scheduler and emits its result. /// @@ -465,20 +512,36 @@ public static IObservable Timer(TimeSpan dueTime, TimeSpan period, ISequen /// /// Concatenates the supplied signals. /// - public static IObservable Concat(params IObservable[] sources) => - FromEnumerable(ValidateSources(sources)).Concat(); + public static IObservable Concat(params IObservable[] sources) + { + var validated = ValidateSources(sources); + var rangeConcat = TryCreateRangeConcat(validated); + return rangeConcat == null ? FromEnumerable(validated).Concat() : (IObservable)(object)rangeConcat; + } /// /// Merges the supplied signals. /// - public static IObservable Merge(params IObservable[] sources) => - FromEnumerable(ValidateSources(sources)).Merge(); + public static IObservable Merge(params IObservable[] sources) + { + var validated = ValidateSources(sources); + var rangeConcat = TryCreateRangeConcat(validated); + return rangeConcat == null ? FromEnumerable(validated).Merge() : (IObservable)(object)rangeConcat; + } /// /// Races the supplied signals and mirrors the first one to produce a value or terminal signal. /// - public static IObservable Race(params IObservable[] sources) => - FromEnumerable(ValidateSources(sources)).Race(); + public static IObservable Race(params IObservable[] sources) + { + var validated = ValidateSources(sources); + if (validated.Length > 0 && validated[0] is RangeSignal) + { + return validated[0]; + } + + return FromEnumerable(validated).Race(); + } /// /// Zips two signals with a result selector. @@ -528,6 +591,33 @@ private static IObservable[] ValidateSources(IObservable[] sources) return sources; } + /// + /// Creates a range concat signal when every source is a synchronous integer range. + /// + /// The source value type. + /// The validated sources. + /// A range concat signal, or when the fast path is not applicable. + private static RangeConcatSignal? TryCreateRangeConcat(IObservable[] sources) + { + if (typeof(T) != typeof(int) || sources.Length == 0) + { + return null; + } + + var ranges = new RangeSignal[sources.Length]; + for (var i = 0; i < sources.Length; i++) + { + if (sources[i] is not RangeSignal range) + { + return null; + } + + ranges[i] = range; + } + + return new RangeConcatSignal(ranges); + } + #if NETSTANDARD2_1_OR_GREATER || NETCOREAPP3_0_OR_GREATER || NET5_0_OR_GREATER /// diff --git a/src/ReactiveUI.Primitives/Signals/Signal{GetAwaiter}.cs b/src/ReactiveUI.Primitives/Signals/Signal{GetAwaiter}.cs index 33be3af..fc36d6c 100644 --- a/src/ReactiveUI.Primitives/Signals/Signal{GetAwaiter}.cs +++ b/src/ReactiveUI.Primitives/Signals/Signal{GetAwaiter}.cs @@ -16,7 +16,7 @@ public static partial class Signal /// The type of the source. /// Source sequence to await. /// An AsyncSignal. - /// source. + /// source. public static IAwaitSignal GetAwaiter(this IObservable source) { if (source == null) @@ -37,7 +37,7 @@ public static IAwaitSignal GetAwaiter(this IObservable /// An AsyncSignal. /// - /// source. + /// source. public static IAwaitSignal GetAwaiter(this IObservable source, CancellationToken cancellationToken) { if (source == null) diff --git a/src/ReactiveUI.Primitives/Signals/Signal{Return}.cs b/src/ReactiveUI.Primitives/Signals/Signal{Return}.cs index e1eab53..4797210 100644 --- a/src/ReactiveUI.Primitives/Signals/Signal{Return}.cs +++ b/src/ReactiveUI.Primitives/Signals/Signal{Return}.cs @@ -36,7 +36,7 @@ public static IObservable Return(T value, ISequencer scheduler) /// The value. /// An Signals. public static IObservable Return(T value) => - Return(value, Sequencer.Immediate); + Return(value, Sequencer.Immediate); /// /// Return single sequence Immediately, optimized for RxVoid(no allocate memory). diff --git a/src/benchmarks/ReactiveUI.Primitives.Benchmarks/StatefulSignalBenchmarks.cs b/src/benchmarks/ReactiveUI.Primitives.Benchmarks/StatefulSignalBenchmarks.cs index fbccb26..e312196 100644 --- a/src/benchmarks/ReactiveUI.Primitives.Benchmarks/StatefulSignalBenchmarks.cs +++ b/src/benchmarks/ReactiveUI.Primitives.Benchmarks/StatefulSignalBenchmarks.cs @@ -83,7 +83,7 @@ public int R3BehaviorSubject1024() private static int EmitAndReadBehaviourSignal(int count) { var observer = new IntSignalObserver(); - using var subject = new BehaviourSignal(0); + using var subject = new BehaviorSignal(0); using var subscription = subject.Subscribe(observer); for (var i = 1; i <= count; i++) { diff --git a/src/tests/ReactiveUI.Primitives.Tests/BehaviourSignalTests.cs b/src/tests/ReactiveUI.Primitives.Tests/BehaviourSignalTests.cs index 6a494f1..33d6f9c 100644 --- a/src/tests/ReactiveUI.Primitives.Tests/BehaviourSignalTests.cs +++ b/src/tests/ReactiveUI.Primitives.Tests/BehaviourSignalTests.cs @@ -38,14 +38,14 @@ public class BehaviourSignalTests /// [Test] public void Subscribe_ArgumentChecking() => - Assert.Throws(() => new BehaviourSignal(1).Subscribe(null!)); + Assert.Throws(() => new BehaviorSignal(1).Subscribe(null!)); /// /// Called when [error argument checking]. /// [Test] public void OnError_ArgumentChecking() => - Assert.Throws(() => new BehaviourSignal(1).OnError(null!)); + Assert.Throws(() => new BehaviorSignal(1).OnError(null!)); /// /// Determines whether this instance has observers. @@ -53,7 +53,7 @@ public void OnError_ArgumentChecking() => [Test] public void HasObservers() { - var s = new BehaviourSignal(42); + var s = new BehaviorSignal(42); Assert.False(s.HasObservers); var d1 = s.Subscribe(_ => { }); @@ -81,7 +81,7 @@ public void HasObservers() [Test] public void HasObservers_Dispose1() { - var s = new BehaviourSignal(42); + var s = new BehaviorSignal(42); Assert.False(s.HasObservers); Assert.False(s.IsDisposed); @@ -104,7 +104,7 @@ public void HasObservers_Dispose1() [Test] public void HasObservers_Dispose2() { - var s = new BehaviourSignal(42); + var s = new BehaviorSignal(42); Assert.False(s.HasObservers); Assert.False(s.IsDisposed); @@ -127,7 +127,7 @@ public void HasObservers_Dispose2() [Test] public void HasObservers_Dispose3() { - var s = new BehaviourSignal(42); + var s = new BehaviorSignal(42); Assert.False(s.HasObservers); Assert.False(s.IsDisposed); @@ -142,7 +142,7 @@ public void HasObservers_Dispose3() [Test] public void HasObservers_OnCompleted() { - var s = new BehaviourSignal(42); + var s = new BehaviorSignal(42); Assert.False(s.HasObservers); using var subscription = s.Subscribe(_ => { }); @@ -161,7 +161,7 @@ public void HasObservers_OnCompleted() [Test] public void HasObservers_OnError() { - var s = new BehaviourSignal(42); + var s = new BehaviorSignal(42); Assert.False(s.HasObservers); using var subscription = s.Subscribe(_ => { }, _ => { }); @@ -180,7 +180,7 @@ public void HasObservers_OnError() [Test] public void Value_Initial() { - var s = new BehaviourSignal(InitialValue); + var s = new BehaviorSignal(InitialValue); Assert.Equal(InitialValue, s.Value); Assert.True(s.TryGetValue(out var x)); @@ -193,7 +193,7 @@ public void Value_Initial() [Test] public void Value_First() { - var s = new BehaviourSignal(InitialValue); + var s = new BehaviorSignal(InitialValue); Assert.Equal(InitialValue, s.Value); Assert.True(s.TryGetValue(out var x)); @@ -212,7 +212,7 @@ public void Value_First() [Test] public void Value_Second() { - var s = new BehaviourSignal(InitialValue); + var s = new BehaviorSignal(InitialValue); Assert.Equal(InitialValue, s.Value); Assert.True(s.TryGetValue(out var x)); @@ -237,7 +237,7 @@ public void Value_Second() [Test] public void Value_FrozenAfterOnCompleted() { - var s = new BehaviourSignal(InitialValue); + var s = new BehaviorSignal(InitialValue); Assert.Equal(InitialValue, s.Value); Assert.True(s.TryGetValue(out var x)); @@ -274,7 +274,7 @@ public void Value_FrozenAfterOnCompleted() [Test] public void Value_ThrowsAfterOnError() { - var s = new BehaviourSignal(InitialValue); + var s = new BehaviorSignal(InitialValue); Assert.Equal(InitialValue, s.Value); s.OnError(new InvalidOperationException()); @@ -290,7 +290,7 @@ public void Value_ThrowsAfterOnError() [Test] public void Value_ThrowsOnDispose() { - var s = new BehaviourSignal(InitialValue); + var s = new BehaviorSignal(InitialValue); Assert.Equal(InitialValue, s.Value); s.Dispose(); diff --git a/src/tests/ReactiveUI.Primitives.Tests/CoverageRuntimeTests.cs b/src/tests/ReactiveUI.Primitives.Tests/CoverageRuntimeTests.cs index 5043a32..f0b57bf 100644 --- a/src/tests/ReactiveUI.Primitives.Tests/CoverageRuntimeTests.cs +++ b/src/tests/ReactiveUI.Primitives.Tests/CoverageRuntimeTests.cs @@ -441,6 +441,30 @@ public async Task SequencersCoverValidationAndExecutionBranches() Assert.Throws(() => ThreadPoolSequencer.Instance.Schedule(One, TimeSpan.Zero, null!)); } + /// + /// Covers virtual-time extension validation and action scheduling. + /// + [Test] + public void VirtualTimeSequencerExtensionsValidateAndRunActions() + { + var clock = new TestClock(DateTimeOffset.UnixEpoch); + var invoked = 0; + + Assert.Throws(() => VirtualTimeSequencerExtensions.ScheduleRelative(null!, TimeSpan.Zero, () => { })); + Assert.Throws(() => clock.ScheduleRelative(TimeSpan.Zero, null!)); + Assert.Throws(() => VirtualTimeSequencerExtensions.ScheduleAbsolute(null!, DateTimeOffset.UnixEpoch, () => { })); + Assert.Throws(() => clock.ScheduleAbsolute(DateTimeOffset.UnixEpoch, null!)); + + clock.ScheduleRelative(TimeSpan.FromTicks(One), () => invoked += One); + clock.ScheduleAbsolute(DateTimeOffset.UnixEpoch.AddTicks(Two), () => invoked += Two); + + clock.AdvanceBy(TimeSpan.FromTicks(One)); + Assert.Equal(One, invoked); + + clock.AdvanceBy(TimeSpan.FromTicks(One)); + Assert.Equal(Three, invoked); + } + /// /// Creates an iterator-backed enumerable for the non-indexable enumerable path. /// diff --git a/src/tests/ReactiveUI.Primitives.Tests/FactoryOperatorContractTests.cs b/src/tests/ReactiveUI.Primitives.Tests/FactoryOperatorContractTests.cs index 0658cad..0890a11 100644 --- a/src/tests/ReactiveUI.Primitives.Tests/FactoryOperatorContractTests.cs +++ b/src/tests/ReactiveUI.Primitives.Tests/FactoryOperatorContractTests.cs @@ -7,10 +7,12 @@ using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; +using ReactiveUI.Primitives; using ReactiveUI.Primitives.Concurrency; using ReactiveUI.Primitives.Core; using ReactiveUI.Primitives.Disposables; using ReactiveUI.Primitives.Signals; +using ReactiveUI.Primitives.Signals.Core; using TUnit.Core; namespace ReactiveUI.Primitives.Tests; @@ -381,16 +383,42 @@ public void CombiningOperatorsPreserveCoreOrderingSemantics() var concatenated = new List(); var zipped = new List(); var latest = new List(); - + var rangeConcatenated = new List(); + var rangeMerged = new List(); + var rangeRace = new List(); + var rangeLatest = new List(); + var rangeWithLatest = new List(); + var rangeForkJoin = new List(); + var rangeObserver = new RecordingObserver(); + + var rangeConcatSignal = Signal.Concat(Signal.Range(FirstValue, SecondValue), Signal.Range(RetrySuccessAttempt, SecondValue)); Signal.Merge(Signal.FromEnumerable(TakeWhileExpected), Signal.FromEnumerable([RetrySuccessAttempt, FourthValue])).Subscribe(merged.Add); Signal.Concat(Signal.FromEnumerable(TakeWhileExpected), Signal.FromEnumerable([RetrySuccessAttempt, FourthValue])).Subscribe(concatenated.Add); Signal.Zip(Signal.FromEnumerable(TakeWhileExpected), Signal.FromEnumerable([ProjectedFirstValue, ProjectedThirdValue]), (left, right) => left + right).Subscribe(zipped.Add); Signal.CombineLatest(Signal.FromEnumerable(TakeWhileExpected), Signal.FromEnumerable(["a", "b"]), (left, right) => left + right).Subscribe(latest.Add); + rangeConcatSignal.Subscribe(rangeConcatenated.Add); + rangeConcatSignal.Subscribe(rangeObserver); + Signal.Merge(Signal.Range(FirstValue, SecondValue), Signal.Range(RetrySuccessAttempt, SecondValue)).Subscribe(rangeMerged.Add); + Signal.Race(Signal.Range(FirstValue, SecondValue), Signal.Range(RetrySuccessAttempt, SecondValue)).Subscribe(rangeRace.Add); + Signal.CombineLatest(Signal.Range(FirstValue, SecondValue), Signal.Range(ProjectionMultiplier, SecondValue), static (left, right) => left + right).Subscribe(rangeLatest.Add); + Signal.Range(FirstValue, SecondValue).WithLatest(Signal.Range(ProjectionMultiplier, SecondValue), static (left, right) => left + right).Subscribe(rangeWithLatest.Add); + Signal.ForkJoin(Signal.Range(FirstValue, SecondValue), Signal.Range(ProjectionMultiplier, SecondValue), static (left, right) => left + right).Subscribe(rangeForkJoin.Add); + Assert.Throws(() => rangeConcatSignal.Subscribe((IObserver)null!)); + Assert.Throws(() => ((IInlineSignal)rangeConcatSignal).Subscribe(null!, _ => { }, () => { })); + Assert.Throws(() => ((IInlineSignal)rangeConcatSignal).Subscribe(_ => { }, _ => { }, null!)); Assert.Equal(FourItemExpected, merged); Assert.Equal(FourItemExpected, concatenated); Assert.Equal(ZippedExpected, zipped); Assert.Equal(LatestExpected, latest); + Assert.Equal(FourItemExpected, rangeConcatenated); + Assert.Equal(FourItemExpected, rangeObserver.Values); + Assert.Equal(1, rangeObserver.Completed); + Assert.Equal(FourItemExpected, rangeMerged); + Assert.Equal(TakeWhileExpected, rangeRace); + Assert.Equal(new[] { ProjectedSecondBucketPeerValue, RangeZipShorterSecondResult }, rangeLatest); + Assert.Equal(new[] { ProjectedSecondBucketPeerValue, RangeZipShorterSecondResult }, rangeWithLatest); + Assert.Equal(new[] { RangeZipShorterSecondResult }, rangeForkJoin); } /// @@ -590,10 +618,65 @@ public async Task TerminalTaskOperatorsCompleteWithExpectedSemantics() var first = await Signal.FromEnumerable([RetrySuccessAttempt, FourthValue]).FirstAsync(); var collected = await Signal.FromEnumerable([FirstValue, SecondValue, RetrySuccessAttempt]).CollectArrayAsync(); var none = await Signal.Empty().FirstOrDefaultAsync(RetryResult); + var rangeFirst = await Signal.Range(FirstValue, FourthValue).FirstAsync(); + var rangeLast = await Signal.Range(FirstValue, FourthValue).ToTask(); + var rangeCollected = await Signal.Range(FirstValue, RetrySuccessAttempt).CollectListAsync(); + var count = await Signal.Range(FirstValue, FourthValue).CountAsync(); + var countEven = await Signal.Range(FirstValue, FourthValue).CountAsync(static value => value % 2 == 0); + var any = await Signal.Range(FirstValue, FourthValue).AnyAsync(static value => value == FourthValue); Assert.Equal(RetrySuccessAttempt, first); Assert.Equal(CollectedExpected, (IEnumerable)collected); Assert.Equal(RetryResult, none); + Assert.Equal(FirstValue, rangeFirst); + Assert.Equal(FourthValue, rangeLast); + Assert.Equal(CollectedExpected, (IEnumerable)rangeCollected); + Assert.Equal(FourthValue, count); + Assert.Equal(SecondValue, countEven); + Assert.True(any); + } + + /// + /// Verifies factory guards, async aliases, and cancellation-aware enumerable conversion. + /// + /// A task that completes when asynchronous assertions finish. + [Test] + public async Task FactoryAliasesAndGuardsCoverParityBranches() + { + var values = new List(); + var errors = new List(); + var completed = 0; + using var cancelled = new CancellationTokenSource(); + await cancelled.CancelAsync(); + + Assert.Throws(() => Signal.Range(FirstValue, -1)); + Assert.Throws(() => Signal.Range(FirstValue, SecondValue, null!)); + Assert.Throws(() => Signal.Repeat(FirstValue, -1)); + Assert.Throws(() => Signal.Unfold(0, null!, static state => state, static state => state)); + Assert.Throws(() => Signal.Unfold(0, static _ => true, null!, static state => state)); + Assert.Throws(() => Signal.Unfold(0, static _ => true, static state => state, null!)); + Assert.Throws(() => Signal.Start((Func)null!)); + Assert.Throws(() => Signal.Start(static () => FirstValue, null!)); + Assert.Throws(() => Signal.Start((Action)null!)); + Assert.Throws(() => Signal.After(TimeSpan.Zero, null!)); + Assert.Throws(() => Signal.Every(TimeSpan.FromTicks(-1))); + Assert.Throws(() => Signal.Timer(TimeSpan.Zero, TimeSpan.Zero, null!)); + Assert.Throws(() => Signal.FromAsync((Func>)null!)); + Assert.Throws(() => Signal.FromAsync((Func>)null!)); + + Signal.Range(FirstValue, 0).Subscribe(values.Add, errors.Add, () => completed++); + Signal.Repeat(FirstValue, 0).Subscribe(values.Add, errors.Add, () => completed++); + new[] { FirstValue, SecondValue }.ToObservable(cancelled.Token).Subscribe(values.Add, errors.Add, () => completed++); + Signal.Start(() => throw new InvalidOperationException("start failed"), Sequencer.Immediate).Subscribe(values.Add, errors.Add, () => completed++); + + var fromAsync = await Signal.FromAsync(() => Task.FromResult(RetryResult)).ToTask(); + var fromAsyncWithToken = await Signal.FromAsync(static token => Task.FromResult(token.IsCancellationRequested ? -1 : RetrySuccessAttempt)).ToTask(); + + Assert.Equal(RetryResult, fromAsync); + Assert.Equal(RetrySuccessAttempt, fromAsyncWithToken); + Assert.Equal(0, values.Count); + Assert.Equal(SecondValue, completed); + Assert.Equal(1, errors.Count); } /// @@ -676,4 +759,30 @@ private static async Task VerifyTaskAliasOperators() Assert.Equal(RepeatValue, first); Assert.Equal(ProjectedSecondValue, started); } + + /// + /// Records observer values and terminal signals. + /// + /// The observed value type. + private sealed class RecordingObserver : IObserver + { + /// + /// Gets observed values. + /// + public List Values { get; } = []; + + /// + /// Gets completion count. + /// + public int Completed { get; private set; } + + /// + public void OnCompleted() => Completed++; + + /// + public void OnError(Exception error) => throw error; + + /// + public void OnNext(T value) => Values.Add(value); + } } diff --git a/src/tests/ReactiveUI.Primitives.Tests/StatefulSharingAndBridgeContractTests.cs b/src/tests/ReactiveUI.Primitives.Tests/StatefulSharingAndBridgeContractTests.cs index c92a47b..e813659 100644 --- a/src/tests/ReactiveUI.Primitives.Tests/StatefulSharingAndBridgeContractTests.cs +++ b/src/tests/ReactiveUI.Primitives.Tests/StatefulSharingAndBridgeContractTests.cs @@ -13,6 +13,7 @@ using Microsoft.CodeAnalysis; using Microsoft.CodeAnalysis.CSharp; using ReactiveUI.Primitives; +using ReactiveUI.Primitives.Disposables; using ReactiveUI.Primitives.R3Bridge.Generator; using ReactiveUI.Primitives.Signals; using ReactiveUI.Primitives.SystemReactiveBridge.Generator; @@ -218,6 +219,96 @@ public async Task CommandSignalPublishesResultsFailuresAndRunningState() Assert.Equal("Command cannot run.", rejected!.Message); } + /// + /// Verifies connectable aliases, auto-connect validation, and replay window overloads. + /// + [Test] + public void ConnectableAliasesValidateAndConnectAtThreshold() + { + var source = new Signal(); + var sourceSubscriptions = 0; + var cold = Signal.Create(observer => + { + sourceSubscriptions++; + return source.Subscribe(observer); + }); + + var auto = cold.Publish().AutoConnect(2); + var first = new List(); + var second = new List(); + using var firstSubscription = auto.Subscribe(first.Add); + source.OnNext(FirstSharedValue); + using var secondSubscription = auto.Subscribe(second.Add); + source.OnNext(SecondSharedValue); + + Assert.Equal(1, sourceSubscriptions); + Assert.Equal(ExpectedSecondSharedValues[1..], first); + Assert.Equal(ExpectedSecondSharedValues[1..], second); + Assert.Throws(() => ConnectableSignalMixins.Multicast(null!, new Signal())); + Assert.Throws(() => Signal.Never().Multicast(null!)); + Assert.Throws(() => ConnectableSignalMixins.RefCount(null!)); + Assert.Throws(() => ConnectableSignalMixins.AutoConnect(null!)); + Assert.Throws(() => cold.PublishLive().AutoConnect(-1)); + + var replayed = cold.Replay(1, TimeSpan.FromSeconds(1)); + using var connection = replayed.Connect(); + source.OnNext(FirstReplayValue); + var replayValues = new List(); + replayed.Subscribe(replayValues.Add); + + Assert.Equal(ExpectedReplayValues[..1], replayValues); + } + + /// + /// Verifies command aliases, sync execution failures, and disposal branches. + /// + /// A task that completes when command assertions finish. + [Test] + public async Task CommandSignalCoversSyncFaultAndDisposalBranches() + { + var behavior = new BehaviorSignal(InitialStateValue); + var disposable = new MultipleDisposable(Disposable.Empty); + var fault = new InvalidOperationException("sync failed"); + var command = new CommandSignal(() => throw fault); + var results = new List(); + var faults = new List(); + + command.Results.Subscribe(results.Add); + command.Faults.Subscribe(faults.Add); + behavior.OnNext(UpdatedStateValue); + disposable.Dispose(); + + InvalidOperationException? observed = null; + try + { + await command.ExecuteAsync(); + } + catch (InvalidOperationException error) + { + observed = error; + } + + command.Dispose(); + command.Dispose(); + ObjectDisposedException? disposed = null; + try + { + await command.ExecuteAsync(); + } + catch (ObjectDisposedException error) + { + disposed = error; + } + + Assert.Same(fault, observed!); + Assert.Equal(0, results.Count); + Assert.Equal(1, faults.Count); + Assert.Same(fault, faults[0]); + Assert.Equal(UpdatedStateValue, behavior.Value); + Assert.True(disposable.IsDisposed); + Assert.NotNull(disposed); + } + /// /// Verifies bridge generators emit adapters when external shapes are present. /// @@ -239,17 +330,50 @@ public static class Observable { } namespace R3 { + public readonly struct Result + { + public static Result Success => default; + + public static Result Failure(Exception exception) => new Result(exception); + + private Result(Exception exception) => Exception = exception; + + public Exception Exception { get; } + + public bool IsFailure => Exception != null; + } + + public abstract class Observer : IDisposable + { + public void OnNext(T value) => OnNextCore(value); + + public void OnErrorResume(Exception error) => OnErrorResumeCore(error); + + public void OnCompleted(Result result) => OnCompletedCore(result); + + public void Dispose() { } + + protected abstract void OnNextCore(T value); + + protected abstract void OnErrorResumeCore(Exception error); + + protected abstract void OnCompletedCore(Result result); + } + public abstract class Observable { - public abstract IDisposable Subscribe(IObserver observer); + public abstract IDisposable Subscribe(Observer observer); + } - public static Observable Create(Func, IDisposable> subscribe) => new DelegateObservable(subscribe); + public static class Observable + { + public static Observable Create(Func, IDisposable> subscribe) => new DelegateObservable(subscribe); private sealed class DelegateObservable : Observable { - private readonly Func, IDisposable> _subscribe; - public DelegateObservable(Func, IDisposable> subscribe) => _subscribe = subscribe; - public override IDisposable Subscribe(IObserver observer) => _subscribe(observer); + private readonly Func, IDisposable> _subscribe; + public DelegateObservable(Func, IDisposable> subscribe) => _subscribe = subscribe; + public override IDisposable Subscribe(Observer observer) => _subscribe(observer); } } }