Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions src/ReactiveUI.Primitives.R3Bridge.Generator/R3BridgeGenerator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ namespace ReactiveUI.Primitives.R3Bridge.Generator;
[Generator(LanguageNames.CSharp)]
public sealed class R3BridgeGenerator : IIncrementalGenerator
{
/// <summary>
/// Attribute source emitted during post initialization so consumers can identify generated bridge output.
/// </summary>
private const string MarkerSource = """
// <auto-generated />
namespace ReactiveUI.Primitives.R3Bridge.Generated;
Expand All @@ -27,6 +30,9 @@ internal sealed class PrimitivesR3BridgeGeneratedAttribute : global::System.Attr
}
""";

/// <summary>
/// Bridge extension source emitted when the consumer compilation references R3.
/// </summary>
private const string BridgeSource = """
// <auto-generated />
#nullable enable
Expand Down Expand Up @@ -75,10 +81,12 @@ public void Initialize(IncrementalGeneratorInitializationContext context)

context.RegisterSourceOutput(context.CompilationProvider, static (output, compilation) =>
{
if (compilation.GetTypeByMetadataName("R3.Observable`1") != null)
if (compilation.GetTypeByMetadataName("R3.Observable`1") == null)
{
output.AddSource("R3SignalBridge.g.cs", SourceText.From(BridgeSource, Encoding.UTF8));
return;
}

output.AddSource("R3SignalBridge.g.cs", SourceText.From(BridgeSource, Encoding.UTF8));
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ namespace ReactiveUI.Primitives.SystemReactiveBridge.Generator;
[Generator(LanguageNames.CSharp)]
public sealed class SystemReactiveBridgeGenerator : IIncrementalGenerator
{
/// <summary>
/// Attribute source emitted during post initialization so consumers can identify generated bridge output.
/// </summary>
private const string MarkerSource = """
// <auto-generated />
namespace ReactiveUI.Primitives.SystemReactiveBridge.Generated;
Expand All @@ -27,6 +30,9 @@ internal sealed class PrimitivesSystemReactiveBridgeGeneratedAttribute : global:
}
""";

/// <summary>
/// Bridge extension source emitted when the consumer compilation references System.Reactive.
/// </summary>
private const string BridgeSource = """
// <auto-generated />
#nullable enable
Expand Down Expand Up @@ -75,10 +81,12 @@ public void Initialize(IncrementalGeneratorInitializationContext context)

context.RegisterSourceOutput(context.CompilationProvider, static (output, compilation) =>
{
if (compilation.GetTypeByMetadataName("System.Reactive.Linq.Observable") != null)
if (compilation.GetTypeByMetadataName("System.Reactive.Linq.Observable") == null)
{
output.AddSource("SystemReactiveSignalBridge.g.cs", SourceText.From(BridgeSource, Encoding.UTF8));
return;
}

output.AddSource("SystemReactiveSignalBridge.g.cs", SourceText.From(BridgeSource, Encoding.UTF8));
});
}
}
49 changes: 44 additions & 5 deletions src/ReactiveUI.Primitives/Concurrency/CurrentThreadSequencer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,32 @@ namespace ReactiveUI.Primitives.Concurrency;
[System.Diagnostics.CodeAnalysis.ExcludeFromCodeCoverage]
public sealed class CurrentThreadSequencer : ISequencer
{
/// <summary>
/// Singleton holder for the current-thread sequencer.
/// </summary>
private static readonly Lazy<CurrentThreadSequencer> StaticInstance = new(() => new CurrentThreadSequencer());

/// <summary>
/// Tracks whether the current thread is running scheduled work.
/// </summary>
[ThreadStatic]
private static bool _running;

/// <summary>
/// Holds recursive work queued for the current thread.
/// </summary>
[ThreadStatic]
private static SequencerQueue<TimeSpan>? _threadLocalQueue;

/// <summary>
/// Measures relative due times for the current thread.
/// </summary>
[ThreadStatic]
private static Stopwatch? clock;

/// <summary>
/// Initializes a new instance of the <see cref="CurrentThreadSequencer"/> class.
/// </summary>
private CurrentThreadSequencer()
{
}
Expand All @@ -45,8 +60,11 @@ private CurrentThreadSequencer()
/// <summary>
/// Gets the scheduler's notion of current time.
/// </summary>
public DateTimeOffset Now => DateTimeOffset.UtcNow;
public DateTimeOffset Now => Sequencer.Now;

/// <summary>
/// Gets elapsed time on the current thread.
/// </summary>
private static TimeSpan Time
{
get
Expand Down Expand Up @@ -100,7 +118,7 @@ public IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<ISequen
// There is no timed task and no task is currently running
if (!_running)
{
_running = true;
SetRunning(true);

if (dueTime > TimeSpan.Zero)
{
Expand All @@ -116,7 +134,7 @@ public IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<ISequen
catch
{
SetQueue(null);
_running = false;
SetRunning(false);
throw;
}

Expand All @@ -133,12 +151,12 @@ public IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<ISequen
finally
{
SetQueue(null);
_running = false;
SetRunning(false);
}
}
else
{
_running = false;
SetRunning(false);
}

return d;
Expand Down Expand Up @@ -177,12 +195,33 @@ public IDisposable Schedule<TState>(TState state, DateTimeOffset dueTime, Func<I
return Schedule(state, due, action);
}

/// <summary>
/// Gets the queued recursive work for the current thread.
/// </summary>
/// <returns>The current thread queue, if one exists.</returns>
private static SequencerQueue<TimeSpan>? GetQueue() => _threadLocalQueue;

/// <summary>
/// Sets the queued recursive work for the current thread.
/// </summary>
/// <param name="newQueue">The queue to assign.</param>
private static void SetQueue(SequencerQueue<TimeSpan>? newQueue) => _threadLocalQueue = newQueue;

/// <summary>
/// Sets the current-thread running marker.
/// </summary>
/// <param name="running">Value indicating whether work is running.</param>
private static void SetRunning(bool running) => _running = running;

/// <summary>
/// Runs queued current-thread work.
/// </summary>
private static class Trampoline
{
/// <summary>
/// Runs all work currently in the queue.
/// </summary>
/// <param name="queue">Queue to drain.</param>
public static void Run(SequencerQueue<TimeSpan> queue)
{
while (queue.Count > 0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<ISequen

var timeSpan = Sequencer.Normalize(dueTime);
var timer = new DispatcherTimer();
timer.Tick += (s, e) =>
timer.Tick += (_, _) =>
{
timer?.Stop();
timer = null;
Expand Down
2 changes: 1 addition & 1 deletion src/ReactiveUI.Primitives/Concurrency/IScheduledItem.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ namespace ReactiveUI.Primitives.Concurrency;
/// Represents a work item that has been scheduled.
/// </summary>
/// <typeparam name="TAbsolute">Absolute time representation type.</typeparam>
public interface IScheduledItem<TAbsolute>
public interface IScheduledItem<out TAbsolute>
{
/// <summary>
/// Gets the absolute time at which the item is due for invocation.
Expand Down
8 changes: 7 additions & 1 deletion src/ReactiveUI.Primitives/Concurrency/ImmediateSequencer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,14 @@ namespace ReactiveUI.Primitives.Concurrency;
[System.Diagnostics.CodeAnalysis.ExcludeFromCodeCoverage]
public sealed class ImmediateSequencer : ISequencer
{
/// <summary>
/// Singleton holder for the immediate sequencer.
/// </summary>
private static readonly Lazy<ImmediateSequencer> StaticInstance = new(static () => new ImmediateSequencer());

/// <summary>
/// Initializes a new instance of the <see cref="ImmediateSequencer"/> class.
/// </summary>
private ImmediateSequencer()
{
}
Expand All @@ -25,7 +31,7 @@ private ImmediateSequencer()
/// <summary>
/// Gets the scheduler's notion of current time.
/// </summary>
public DateTimeOffset Now => DateTimeOffset.UtcNow;
public DateTimeOffset Now => Sequencer.Now;

/// <summary>
/// Schedules the specified state.
Expand Down
60 changes: 52 additions & 8 deletions src/ReactiveUI.Primitives/Concurrency/ScheduledItem.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// ReactiveUI Association Incorporated licenses this file to you under the MIT license.
// See the LICENSE file in the project root for full license information.

using System.Runtime.CompilerServices;
using ReactiveUI.Primitives.Disposables;

namespace ReactiveUI.Primitives.Concurrency;
Expand All @@ -11,11 +12,22 @@ namespace ReactiveUI.Primitives.Concurrency;
/// </summary>
/// <typeparam name="TAbsolute">Absolute time representation type.</typeparam>
[System.Diagnostics.CodeAnalysis.ExcludeFromCodeCoverage]
public abstract class ScheduledItem<TAbsolute> : IScheduledItem<TAbsolute>, IComparable<ScheduledItem<TAbsolute>>, IsDisposed
public abstract class ScheduledItem<TAbsolute> : IScheduledItem<TAbsolute>, IComparable<ScheduledItem<TAbsolute>>, IsDisposed, IComparable
where TAbsolute : IComparable<TAbsolute>
{
/// <summary>
/// Compares scheduled items by due time.
/// </summary>
private readonly IComparer<TAbsolute> _comparer;

/// <summary>
/// Holds the disposable returned by the scheduled action.
/// </summary>
private IDisposable? _disposable;

/// <summary>
/// Tracks cancellation without taking a lock.
/// </summary>
private int _isDisposed;

/// <summary>
Expand Down Expand Up @@ -106,7 +118,11 @@ protected ScheduledItem(TAbsolute dueTime, IComparer<TAbsolute> comparer)
/// </summary>
/// <param name="other">Work item to compare the current work item to.</param>
/// <returns>Relative ordering between this and the specified work item.</returns>
/// <remarks>The inequality operators are overloaded to provide results consistent with the <see cref="IComparable"/> implementation. Equality operators implement traditional reference equality semantics.</remarks>
/// <remarks>
/// The inequality operators are overloaded to provide results consistent with the
/// <see cref="IComparable"/> implementation. Equality operators implement traditional
/// reference equality semantics.
/// </remarks>
public int CompareTo(ScheduledItem<TAbsolute>? other)
{
// MSDN: By definition, any object compares greater than null, and two null references compare equal to each other.
Expand All @@ -118,6 +134,26 @@ public int CompareTo(ScheduledItem<TAbsolute>? other)
return _comparer.Compare(DueTime, other.DueTime);
}

/// <summary>
/// Compares the current instance with another object of the same type and returns an integer that indicates relative ordering.
/// </summary>
/// <param name="obj">An object to compare with this instance.</param>
/// <returns>A value that indicates the relative order of the objects being compared.</returns>
public int CompareTo(object? obj)
{
if (obj == null)
{
return 1;
}

if (obj is ScheduledItem<TAbsolute> x)
{
return CompareTo(x);
}

throw new ArgumentException("Object must be a compatible scheduled item.", nameof(obj));
}

/// <summary>
/// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
/// </summary>
Expand All @@ -131,14 +167,18 @@ public void Dispose()
/// Determines whether a <see cref="ScheduledItem{TAbsolute}" /> object is equal to the specified object.
/// </summary>
/// <param name="obj">The object to compare to the current <see cref="ScheduledItem{TAbsolute}" /> object.</param>
/// <returns><c>true</c> if the obj parameter is a <see cref="ScheduledItem{TAbsolute}" /> object and is equal to the current <see cref="ScheduledItem{TAbsolute}" /> object; otherwise, <c>false</c>.</returns>
/// <returns>
/// <c>true</c> if the obj parameter is a <see cref="ScheduledItem{TAbsolute}" />
/// object and is equal to the current <see cref="ScheduledItem{TAbsolute}" />
/// object; otherwise, <c>false</c>.
/// </returns>
public override bool Equals(object? obj) => ReferenceEquals(this, obj);

/// <summary>
/// Returns the hash code for the current <see cref="ScheduledItem{TAbsolute}" /> object.
/// </summary>
/// <returns>A 32-bit signed integer hash code.</returns>
public override int GetHashCode() => base.GetHashCode();
public override int GetHashCode() => RuntimeHelpers.GetHashCode(this);

/// <summary>
/// Invokes the work item.
Expand All @@ -158,10 +198,12 @@ public void Invoke()
return;
}

if (IsDisposed)
if (!IsDisposed)
{
disposable.Dispose();
return;
}

disposable.Dispose();
}

/// <summary>
Expand All @@ -170,10 +212,12 @@ public void Invoke()
/// <param name="disposing"><c>true</c> to release both managed and unmanaged resources; <c>false</c> to release only unmanaged resources.</param>
protected virtual void Dispose(bool disposing)
{
if (disposing && Interlocked.Exchange(ref _isDisposed, 1) == 0)
if (!disposing || Interlocked.Exchange(ref _isDisposed, 1) != 0)
{
Interlocked.Exchange(ref _disposable, Disposable.Empty)?.Dispose();
return;
}

Interlocked.Exchange(ref _disposable, Disposable.Empty)?.Dispose();
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,19 @@ namespace ReactiveUI.Primitives.Concurrency;
public sealed class ScheduledItem<TAbsolute, TValue> : ScheduledItem<TAbsolute>
where TAbsolute : IComparable<TAbsolute>
{
/// <summary>
/// Sequencer passed to the scheduled action.
/// </summary>
private readonly ISequencer _scheduler;

/// <summary>
/// State passed to the scheduled action.
/// </summary>
private readonly TValue _state;

/// <summary>
/// Action invoked when the scheduled item runs.
/// </summary>
private readonly Func<ISequencer, TValue, IDisposable> _action;

/// <summary>
Expand Down
Loading
Loading