Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ Subscriptions and scheduled work return `IDisposable`. ReactiveUI.Primitives inc
| `BooleanDisposable` | Track simple disposed state. |
| `CancellationDisposable` | Tie disposal to a `CancellationTokenSource`. |
| `MultipleDisposable` | Composite-disposable equivalent; add/remove multiple disposables. |
| `CompositeDisposable` | System.Reactive-compatible alias over `MultipleDisposable`. |
| `Pocket` | Named `MultipleDisposable` specialization. |
| `SingleDisposable` / `AssignmentSlot` | Single-assignment disposable container. |
| `SingleReplaceableDisposable` / `Slot` | Replaceable disposable container. |
Expand Down Expand Up @@ -165,8 +166,10 @@ Creation APIs live on `ReactiveUI.Primitives.Signals.Signal`.
| `Signal.Unfold<TState,TResult>(...)` | Generate a finite sequence from state. |
| `Signal.Use<TResource,T>(...)` | Tie a resource lifetime to a subscription. |
| `Signal.FromEnumerable<T>(IEnumerable<T>)` | Convert an enumerable. |
| `Signal.FromEnumerable<T>(IEnumerable<T>, CancellationToken)` | Convert an enumerable and stop synchronous enumeration when cancelled. |
| `Signal.FromAsyncEnumerable<T>(IAsyncEnumerable<T>, CancellationToken)` | Convert an async enumerable on modern TFMs. |
| `Signal.FromTask<T>(Task<T>)` | Convert a task to a signal. |
| `Signal.FromAsync<T>(...)` | Invoke a task factory per subscription. |
| `Signal.After(TimeSpan, ISequencer?)` | Emit one `long` tick after a delay. |
| `Signal.Every(TimeSpan, ISequencer?)` | Emit increasing `long` ticks repeatedly. |
| `Signal.Pulse(...)` | Alias of `Every`. |
Expand Down Expand Up @@ -342,7 +345,7 @@ ReactiveUI.Primitives uses explicit names instead of cloning every System.Reacti
| System.Reactive type | ReactiveUI.Primitives equivalent | Notes |
|---|---|---|
| `Subject<T>` | `Signal<T>` | Push values, errors, and completion to subscribers. |
| `BehaviorSubject<T>` | `BehaviourSignal<T>` or `StateSignal<T>` | Stores the latest value and emits it to new subscribers. `StateSignal<T>` adds a mutable `Value` setter and `Changed`. |
| `BehaviorSubject<T>` | `BehaviorSignal<T>`, or `StateSignal<T>` | Stores the latest value and emits it to new subscribers. `StateSignal<T>` adds a mutable `Value` setter and `Changed`. |
| `ReplaySubject<T>` | `ReplaySignal<T>` | Replays buffered values by size and/or time window. |
| `AsyncSubject<T>` | `AsyncSignal<T>` | Awaitable subject-like signal; also implements `IAwaitSignal<T>`. |
| `ReactiveProperty<T>` / state holder | `StateSignal<T>` plus `ReadOnlyState<T>` | Mutable state and read-only projected state. |
Expand Down Expand Up @@ -493,20 +496,21 @@ ReactiveUI.Primitives is not a byte-for-byte clone of System.Reactive. It keeps
| `Observable.Repeat(value)` | `Signal.Repeat(value)` | Indefinite repeat. |
| `Observable.Repeat(value, count)` | `Signal.Repeat(value, count)` | Fixed repeat. |
| `Observable.Defer(factory)` | `Signal.Defer(factory)` | Create source per subscription. |
| `Observable.FromAsync(...)` | `Signal.FromAsync(...)` | Invoke a task factory per subscription. |
| `Observable.Create<T>(...)` | `Signal.Create<T>(...)` or `Signal.CreateSafe<T>(...)` | Prefer `CreateSafe` for general custom sources. |
| `Observable.Using(...)` | `Signal.Use(...)` | Resource scoped to subscription. |
| `Observable.Timer(dueTime)` | `Signal.Timer(dueTime)` or `Signal.After(dueTime)` | Emits `long` tick `0`. |
| `Observable.Timer(dueTime, period)` | `Signal.Timer(dueTime, period)` | Periodic `long` ticks. |
| `Observable.Interval(period)` | `Signal.Interval(period)` or `Signal.Every(period)` | Repeating ticks. |
| `ToObservable()` from enumerable | `Signal.FromEnumerable(values)` or `values.ToSignal()` | `ToSignal` extension is available. |
| `ToObservable()` from enumerable | `Signal.FromEnumerable(values)`, `values.ToSignal()`, or `values.ToObservable()` | Cancellation-token overloads are available. |
| task conversion | `Signal.FromTask(task)` | Function-based task signals also exist. |

### Subject/state mapping

| System.Reactive | ReactiveUI.Primitives | Migration detail |
|---|---|---|
| `new Subject<T>()` | `new Signal<T>()` | Use `OnNext`, `OnError`, `OnCompleted`, and `Subscribe`. |
| `new BehaviorSubject<T>(initial)` | `new BehaviourSignal<T>(initial)` | Keeps `Value` getter and emits latest value to subscribers. |
| `new BehaviorSubject<T>(initial)` | `new BehaviorSignal<T>(initial)` | Keeps `Value` getter and emits latest value to subscribers. |
| mutable reactive property | `new StateSignal<T>(initial)` | Set `Value` to emit. Use `Changed` for observable state stream. |
| `new ReplaySubject<T>()` | `new ReplaySignal<T>()` | Unbounded replay. |
| `new ReplaySubject<T>(bufferSize)` | `new ReplaySignal<T>(bufferSize)` | Size-limited replay. |
Expand Down Expand Up @@ -546,6 +550,7 @@ ReactiveUI.Primitives is not a byte-for-byte clone of System.Reactive. It keeps
| `Buffer(count)` | `Buffer(count)` | Fixed-size buffers. |
| `ToList` / `ToArray` | `CollectList` / `CollectArray` | Signal results. |
| `FirstAsync` | `FirstAsync` | Task result. |
| `CountAsync` / `AnyAsync` | `CountAsync` / `AnyAsync` | Task-shaped terminal helpers, including cancellation overloads. |

### Disposable mapping

Expand Down
39 changes: 37 additions & 2 deletions src/ReactiveUI.Primitives.R3Bridge.Generator/R3BridgeGenerator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ internal static class R3SignalBridge
throw new global::System.ArgumentNullException(nameof(source));
}

return global::ReactiveUI.Primitives.Signals.Signal.Create<T>(observer => source.Subscribe(observer));
return global::ReactiveUI.Primitives.Signals.Signal.Create<T>(observer => source.Subscribe(new R3ToPrimitivesObserver<T>(observer)));
}

/// <summary>
Expand All @@ -66,7 +66,42 @@ internal static class R3SignalBridge
throw new global::System.ArgumentNullException(nameof(source));
}

return global::R3.Observable<T>.Create(observer => source.Subscribe(observer));
return global::R3.Observable.Create<T>(observer => source.Subscribe(new PrimitivesToR3Observer<T>(observer)));
}

private sealed class R3ToPrimitivesObserver<T> : global::R3.Observer<T>
{
private readonly global::System.IObserver<T> _observer;

public R3ToPrimitivesObserver(global::System.IObserver<T> observer) => _observer = observer;

protected override void OnNextCore(T value) => _observer.OnNext(value);

protected override void OnErrorResumeCore(global::System.Exception error) => _observer.OnError(error);

protected override void OnCompletedCore(global::R3.Result result)
{
if (result.IsFailure)
{
_observer.OnError(result.Exception);
return;
}

_observer.OnCompleted();
}
}

private sealed class PrimitivesToR3Observer<T> : global::System.IObserver<T>
{
private readonly global::R3.Observer<T> _observer;

public PrimitivesToR3Observer(global::R3.Observer<T> observer) => _observer = observer;

public void OnNext(T value) => _observer.OnNext(value);

public void OnError(global::System.Exception error) => _observer.OnCompleted(global::R3.Result.Failure(error));

public void OnCompleted() => _observer.OnCompleted(global::R3.Result.Success);
}
}
""";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ namespace ReactiveUI.Primitives.Concurrency;
/// <summary>
/// CurrentThreadSequencer.
/// </summary>
/// <seealso cref="ReactiveUI.Primitives.Concurrency.ISequencer" />
public sealed class CurrentThreadSequencer : ISequencer
/// <seealso cref="ISequencer" />
[DebuggerDisplay("{DebuggerDisplay,nq}")]
public sealed partial class CurrentThreadSequencer : ISequencer
{
/// <summary>
/// Singleton holder for the current-thread sequencer.
Expand Down Expand Up @@ -103,7 +104,7 @@ public IDisposable Schedule<TState>(TState state, Func<ISequencer, TState, IDisp
/// <returns>
/// The disposable object used to cancel the scheduled action (best effort).
/// </returns>
/// <exception cref="System.ArgumentNullException">action.</exception>
/// <exception cref="ArgumentNullException">action.</exception>
/// <exception cref="ArgumentNullException"><paramref name="action" /> is <c>null</c>.</exception>
public IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<ISequencer, TState, IDisposable> action)
{
Expand Down
3 changes: 2 additions & 1 deletion src/ReactiveUI.Primitives/Concurrency/DispatcherSequencer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ namespace ReactiveUI.Primitives.Concurrency;
/// DispatcherSequencer.
/// </summary>
/// <seealso cref="ReactiveUI.Primitives.Concurrency.ISequencer" />
public class DispatcherSequencer : ISequencer
[System.Diagnostics.DebuggerDisplay("{DebuggerDisplay,nq}")]
public partial class DispatcherSequencer : ISequencer
{
/// <summary>
/// Initializes a new instance of the <see cref="DispatcherSequencer"/> class.
Expand Down
5 changes: 3 additions & 2 deletions src/ReactiveUI.Primitives/Concurrency/ImmediateSequencer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ namespace ReactiveUI.Primitives.Concurrency;
/// <summary>
/// ImmediateSequencer.
/// </summary>
/// <seealso cref="ReactiveUI.Primitives.Concurrency.ISequencer" />
public sealed class ImmediateSequencer : ISequencer
/// <seealso cref="ISequencer" />
[System.Diagnostics.DebuggerDisplay("{DebuggerDisplay,nq}")]
public sealed partial class ImmediateSequencer : ISequencer
{
/// <summary>
/// Singleton holder for the immediate sequencer.
Expand Down
5 changes: 3 additions & 2 deletions src/ReactiveUI.Primitives/Concurrency/ScheduledItem.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ namespace ReactiveUI.Primitives.Concurrency;
/// Abstract base class for scheduled work items.
/// </summary>
/// <typeparam name="TAbsolute">Absolute time representation type.</typeparam>
public abstract class ScheduledItem<TAbsolute> : IScheduledItem<TAbsolute>, IComparable<ScheduledItem<TAbsolute>>, IsDisposed, IComparable
[System.Diagnostics.DebuggerDisplay("{DebuggerDisplay,nq}")]
public abstract partial class ScheduledItem<TAbsolute> : IScheduledItem<TAbsolute>, IComparable<ScheduledItem<TAbsolute>>, IsDisposed, IComparable
where TAbsolute : IComparable<TAbsolute>
{
/// <summary>
Expand All @@ -35,7 +36,7 @@ public abstract class ScheduledItem<TAbsolute> : IScheduledItem<TAbsolute>, ICom
/// </summary>
/// <param name="dueTime">Absolute time at which the work item has to be executed.</param>
/// <param name="comparer">Comparer used to compare work items based on their scheduled time.</param>
/// <exception cref="System.ArgumentNullException">comparer.</exception>
/// <exception cref="ArgumentNullException">comparer.</exception>
/// <exception cref="ArgumentNullException"><paramref name="comparer" /> is <c>null</c>.</exception>
protected ScheduledItem(TAbsolute dueTime, IComparer<TAbsolute> comparer)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ namespace ReactiveUI.Primitives.Concurrency;
/// </summary>
/// <typeparam name="TAbsolute">Absolute time representation type.</typeparam>
/// <typeparam name="TValue">Type of the state passed to the scheduled action.</typeparam>
public sealed class ScheduledItem<TAbsolute, TValue> : ScheduledItem<TAbsolute>
[System.Diagnostics.DebuggerDisplay("{DebuggerDisplay,nq}")]
public sealed partial class ScheduledItem<TAbsolute, TValue> : ScheduledItem<TAbsolute>
where TAbsolute : IComparable<TAbsolute>
{
/// <summary>
Expand Down
14 changes: 7 additions & 7 deletions src/ReactiveUI.Primitives/Concurrency/Sequencer.Simple.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public static IDisposable Schedule(this ISequencer scheduler, Action action)
/// <returns>
/// The disposable object used to cancel the scheduled action (best effort).
/// </returns>
/// <exception cref="System.ArgumentNullException">
/// <exception cref="ArgumentNullException">
/// scheduler
/// or
/// action.
Expand Down Expand Up @@ -72,7 +72,7 @@ public static IDisposable Schedule(this ISequencer scheduler, TimeSpan dueTime,
/// <returns>
/// The disposable object used to cancel the scheduled action (best effort).
/// </returns>
/// <exception cref="System.ArgumentNullException">
/// <exception cref="ArgumentNullException">
/// scheduler
/// or
/// action.
Expand Down Expand Up @@ -124,7 +124,7 @@ public static IDisposable Schedule(this ISequencer scheduler, Action<Action> act
/// <returns>
/// The disposable object used to cancel the scheduled action (best effort).
/// </returns>
/// <exception cref="System.ArgumentNullException">
/// <exception cref="ArgumentNullException">
/// scheduler
/// or
/// action.
Expand Down Expand Up @@ -190,7 +190,7 @@ internal static IDisposable ScheduleAction<TState>(this ISequencer scheduler, TS
/// <returns>
/// The disposable object used to cancel the scheduled action (best effort).
/// </returns>
/// <exception cref="System.ArgumentNullException">
/// <exception cref="ArgumentNullException">
/// scheduler
/// or
/// action.
Expand Down Expand Up @@ -222,7 +222,7 @@ internal static IDisposable ScheduleAction<TState>(this ISequencer scheduler, TS
/// <returns>
/// The disposable object used to cancel the scheduled action (best effort).
/// </returns>
/// <exception cref="System.ArgumentNullException">
/// <exception cref="ArgumentNullException">
/// scheduler
/// or
/// action.
Expand Down Expand Up @@ -254,7 +254,7 @@ internal static IDisposable ScheduleAction<TState>(this ISequencer scheduler, TS
/// <returns>
/// The disposable object used to cancel the scheduled action (best effort).
/// </returns>
/// <exception cref="System.ArgumentNullException">
/// <exception cref="ArgumentNullException">
/// scheduler
/// or
/// action.
Expand Down Expand Up @@ -286,7 +286,7 @@ internal static IDisposable ScheduleAction<TState>(this ISequencer scheduler, TS
/// <returns>
/// The disposable object used to cancel the scheduled action (best effort).
/// </returns>
/// <exception cref="System.ArgumentNullException">
/// <exception cref="ArgumentNullException">
/// scheduler
/// or
/// action.
Expand Down
3 changes: 2 additions & 1 deletion src/ReactiveUI.Primitives/Concurrency/SequencerQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ namespace ReactiveUI.Primitives.Concurrency;
/// </summary>
/// <typeparam name="TAbsolute">Absolute time representation type.</typeparam>
/// <remarks>This type is not thread safe; users should ensure proper synchronization.</remarks>
public class SequencerQueue<TAbsolute>
[System.Diagnostics.DebuggerDisplay("{DebuggerDisplay,nq}")]
public partial class SequencerQueue<TAbsolute>
where TAbsolute : IComparable<TAbsolute>
{
/// <summary>
Expand Down
5 changes: 3 additions & 2 deletions src/ReactiveUI.Primitives/Concurrency/TaskPoolSequencer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ namespace ReactiveUI.Primitives.Concurrency;
/// <summary>
/// TaskPoolSequencer.
/// </summary>
/// <seealso cref="ReactiveUI.Primitives.Concurrency.ISequencer" />
public sealed class TaskPoolSequencer : ISequencer
/// <seealso cref="ISequencer" />
[System.Diagnostics.DebuggerDisplay("{DebuggerDisplay,nq}")]
public sealed partial class TaskPoolSequencer : ISequencer
{
/// <summary>
/// Task factory used to schedule asynchronous work.
Expand Down
3 changes: 2 additions & 1 deletion src/ReactiveUI.Primitives/Concurrency/TestClock.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ namespace ReactiveUI.Primitives.Concurrency;
/// <summary>
/// Test-facing alias for <see cref="VirtualClock"/>.
/// </summary>
public sealed class TestClock : VirtualClock
[System.Diagnostics.DebuggerDisplay("{DebuggerDisplay,nq}")]
public sealed partial class TestClock : VirtualClock
{
/// <summary>
/// Initializes a new instance of the <see cref="TestClock"/> class at the default clock value.
Expand Down
14 changes: 8 additions & 6 deletions src/ReactiveUI.Primitives/Concurrency/ThreadPoolSequencer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,16 @@

using ReactiveUI.Primitives.Disposables;
using static ReactiveUI.Primitives.Disposables.Disposable;
using Timer = System.Threading.Timer;

namespace ReactiveUI.Primitives.Concurrency
{
/// <summary>
/// ThreadPoolSequencer.
/// </summary>
/// <seealso cref="ReactiveUI.Primitives.Concurrency.ISequencer" />
public sealed class ThreadPoolSequencer : ISequencer
/// <seealso cref="ISequencer" />
[System.Diagnostics.DebuggerDisplay("{DebuggerDisplay,nq}")]
public sealed partial class ThreadPoolSequencer : ISequencer
{
/// <summary>
/// Gets the shared thread-pool scheduler instance.
Expand All @@ -26,7 +28,7 @@ public sealed class ThreadPoolSequencer : ISequencer
/// <summary>
/// Keeps timers rooted until they fire or are cancelled.
/// </summary>
internal static readonly Dictionary<System.Threading.Timer, object> Timers = [];
internal static readonly Dictionary<Timer, object> Timers = [];

/// <summary>
/// Initializes a new instance of the <see cref="ThreadPoolSequencer"/> class.
Expand All @@ -49,7 +51,7 @@ private ThreadPoolSequencer()
/// <returns>
/// The disposable object used to cancel the scheduled action (best effort).
/// </returns>
/// <exception cref="System.ArgumentNullException">action.</exception>
/// <exception cref="ArgumentNullException">action.</exception>
public IDisposable Schedule<TState>(TState state, Func<ISequencer, TState, IDisposable> action)
{
if (action == null)
Expand Down Expand Up @@ -83,7 +85,7 @@ public IDisposable Schedule<TState>(TState state, Func<ISequencer, TState, IDisp
/// <returns>
/// The disposable object used to cancel the scheduled action (best effort).
/// </returns>
/// <exception cref="System.ArgumentNullException">action.</exception>
/// <exception cref="ArgumentNullException">action.</exception>
public IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<ISequencer, TState, IDisposable> action)
{
if (action == null)
Expand All @@ -94,7 +96,7 @@ public IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<ISequen
var dueTime1 = Sequencer.Normalize(dueTime);
var hasAdded = false;
var hasRemoved = false;
System.Threading.Timer timer = null!;
Timer timer = null!;
timer = new(
_ =>
{
Expand Down
3 changes: 2 additions & 1 deletion src/ReactiveUI.Primitives/Concurrency/VirtualClock.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ namespace ReactiveUI.Primitives.Concurrency;
/// <summary>
/// Deterministic virtual scheduler backed by <see cref="DateTimeOffset"/> and <see cref="TimeSpan"/>.
/// </summary>
public class VirtualClock : VirtualTimeSequencer<DateTimeOffset, TimeSpan>
[System.Diagnostics.DebuggerDisplay("{DebuggerDisplay,nq}")]
public partial class VirtualClock : VirtualTimeSequencer<DateTimeOffset, TimeSpan>
{
/// <summary>
/// Initializes a new instance of the <see cref="VirtualClock"/> class at the default clock value.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ namespace ReactiveUI.Primitives.Concurrency;
/// </summary>
/// <typeparam name="TAbsolute">Absolute time representation type.</typeparam>
/// <typeparam name="TRelative">Relative time representation type.</typeparam>
public abstract class VirtualTimeSequencerBase<TAbsolute, TRelative> : ISequencer, IServiceProvider, IStopwatchProvider
[System.Diagnostics.DebuggerDisplay("{DebuggerDisplay,nq}")]
public abstract partial class VirtualTimeSequencerBase<TAbsolute, TRelative> : ISequencer, IServiceProvider, IStopwatchProvider
where TAbsolute : IComparable<TAbsolute>
{
/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ namespace ReactiveUI.Primitives.Concurrency;
/// </summary>
/// <typeparam name="TAbsolute">Absolute time representation type.</typeparam>
/// <typeparam name="TRelative">Relative time representation type.</typeparam>
public abstract class VirtualTimeSequencer<TAbsolute, TRelative> : VirtualTimeSequencerBase<TAbsolute, TRelative>
[System.Diagnostics.DebuggerDisplay("{DebuggerDisplay,nq}")]
public abstract partial class VirtualTimeSequencer<TAbsolute, TRelative> : VirtualTimeSequencerBase<TAbsolute, TRelative>
where TAbsolute : IComparable<TAbsolute>
{
/// <summary>
Expand Down Expand Up @@ -47,7 +48,7 @@ protected VirtualTimeSequencer(TAbsolute initialClock, IComparer<TAbsolute> comp
/// <returns>
/// The disposable object used to cancel the scheduled action (best effort).
/// </returns>
/// <exception cref="System.ArgumentNullException">action.</exception>
/// <exception cref="ArgumentNullException">action.</exception>
/// <exception cref="ArgumentNullException"><paramref name="action" /> is <c>null</c>.</exception>
public override IDisposable ScheduleAbsolute<TState>(TState state, TAbsolute dueTime, Func<ISequencer, TState, IDisposable> action)
{
Expand Down
Loading
Loading