diff --git a/src/DynamicData/Cache/ObservableCacheEx.Adapt.cs b/src/DynamicData/Cache/ObservableCacheEx.Adapt.cs
new file mode 100644
index 00000000..f99f4674
--- /dev/null
+++ b/src/DynamicData/Cache/ObservableCacheEx.Adapt.cs
@@ -0,0 +1,69 @@
+// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved.
+// Roland Pheasant licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for full license information.
+
+using System.Collections.ObjectModel;
+using System.Collections.Specialized;
+using System.ComponentModel;
+using System.Diagnostics.CodeAnalysis;
+using System.Linq.Expressions;
+using System.Reactive;
+using System.Reactive.Concurrency;
+using System.Reactive.Disposables;
+using System.Reactive.Linq;
+using System.Runtime.CompilerServices;
+using DynamicData.Binding;
+using DynamicData.Cache;
+using DynamicData.Cache.Internal;
+
+// ReSharper disable once CheckNamespace
+
+namespace DynamicData;
+
+///
+/// Extensions for dynamic data.
+///
+public static partial class ObservableCacheEx
+{
+ ///
+ /// Injects a side effect into the changeset stream by calling .
+ /// for every changeset, then forwarding it downstream unchanged.
+ ///
+ /// The type of items in the cache.
+ /// The type of the key.
+ /// The source to observe and adapt.
+ /// The whose Adapt method is called for each changeset.
+ /// An observable that emits the same changesets as , after the adaptor has processed each one.
+ ///
+ ///
+ /// This is a thin wrapper around Rx's Do operator. The adaptor receives each changeset
+ /// as a side effect; the changeset itself is forwarded downstream unmodified.
+ ///
+ ///
+ /// or is .
+ ///
+ ///
+ public static IObservable> Adapt(this IObservable> source, IChangeSetAdaptor adaptor)
+ where TObject : notnull
+ where TKey : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+ adaptor.ThrowArgumentNullExceptionIfNull(nameof(adaptor));
+
+ return source.Do(adaptor.Adapt);
+ }
+
+ ///
+ /// The source to observe and adapt.
+ /// The whose Adapt method is called for each changeset.
+ /// This overload operates on . Delegates to Rx's Do operator.
+ public static IObservable> Adapt(this IObservable> source, ISortedChangeSetAdaptor adaptor)
+ where TObject : notnull
+ where TKey : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+ adaptor.ThrowArgumentNullExceptionIfNull(nameof(adaptor));
+
+ return source.Do(adaptor.Adapt);
+ }
+}
diff --git a/src/DynamicData/Cache/ObservableCacheEx.AddOrUpdate.cs b/src/DynamicData/Cache/ObservableCacheEx.AddOrUpdate.cs
new file mode 100644
index 00000000..3b217f6e
--- /dev/null
+++ b/src/DynamicData/Cache/ObservableCacheEx.AddOrUpdate.cs
@@ -0,0 +1,113 @@
+// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved.
+// Roland Pheasant licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for full license information.
+
+using System.Collections.ObjectModel;
+using System.Collections.Specialized;
+using System.ComponentModel;
+using System.Diagnostics.CodeAnalysis;
+using System.Linq.Expressions;
+using System.Reactive;
+using System.Reactive.Concurrency;
+using System.Reactive.Disposables;
+using System.Reactive.Linq;
+using System.Runtime.CompilerServices;
+using DynamicData.Binding;
+using DynamicData.Cache;
+using DynamicData.Cache.Internal;
+
+// ReSharper disable once CheckNamespace
+
+namespace DynamicData;
+
+///
+/// Extensions for dynamic data.
+///
+public static partial class ObservableCacheEx
+{
+ ///
+ /// Adds or updates the cache with the specified item, producing a changeset with a single Add
+ /// (if the key is new) or Update (if the key already exists).
+ ///
+ /// The type of the object.
+ /// The type of the key.
+ /// The to add or update items in.
+ /// The item to add or update.
+ ///
+ /// Convenience method that wraps a single-item mutation inside .
+ ///
+ /// EventBehavior
+ /// - AddProduced when the key does not already exist in the cache.
+ /// - UpdateProduced when the key already exists. The previous value is included in the changeset.
+ /// - RemoveNot produced by this method.
+ /// - RefreshNot produced by this method.
+ ///
+ ///
+ /// is .
+ ///
+ ///
+ public static void AddOrUpdate(this ISourceCache source, TObject item)
+ where TObject : notnull
+ where TKey : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+
+ source.Edit(updater => updater.AddOrUpdate(item));
+ }
+
+ ///
+ /// The to add or update items in.
+ /// The item to add or update.
+ /// The used to determine whether a new item is the same as an existing cached item. When equal, the update is skipped.
+ /// This overload uses to suppress no-op updates when the new value equals the existing one.
+ public static void AddOrUpdate(this ISourceCache source, TObject item, IEqualityComparer equalityComparer)
+ where TObject : notnull
+ where TKey : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+
+ source.Edit(updater => updater.AddOrUpdate(item, equalityComparer));
+ }
+
+ ///
+ /// The to add or update items in.
+ /// The of items to add or update.
+ /// Batch overload. All items are added/updated inside a single call, producing one changeset.
+ public static void AddOrUpdate(this ISourceCache source, IEnumerable items)
+ where TObject : notnull
+ where TKey : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+
+ source.Edit(updater => updater.AddOrUpdate(items));
+ }
+
+ ///
+ /// The to add or update items in.
+ /// The of items to add or update.
+ /// The used to determine whether a new item is the same as an existing cached item. When equal, the update is skipped.
+ /// Batch overload with equality comparison. All items are added/updated inside a single call.
+ public static void AddOrUpdate(this ISourceCache source, IEnumerable items, IEqualityComparer equalityComparer)
+ where TObject : notnull
+ where TKey : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+
+ source.Edit(updater => updater.AddOrUpdate(items, equalityComparer));
+ }
+
+ ///
+ /// The to add or update items in.
+ /// The item to add or update.
+ /// The key to associate with the item.
+ /// This overload operates on , which requires an explicit key parameter.
+ public static void AddOrUpdate(this IIntermediateCache source, TObject item, TKey key)
+ where TObject : notnull
+ where TKey : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+ item.ThrowArgumentNullExceptionIfNull(nameof(item));
+
+ source.Edit(updater => updater.AddOrUpdate(item, key));
+ }
+}
diff --git a/src/DynamicData/Cache/ObservableCacheEx.And.cs b/src/DynamicData/Cache/ObservableCacheEx.And.cs
new file mode 100644
index 00000000..511edb1a
--- /dev/null
+++ b/src/DynamicData/Cache/ObservableCacheEx.And.cs
@@ -0,0 +1,238 @@
+// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved.
+// Roland Pheasant licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for full license information.
+
+using System.Collections.ObjectModel;
+using System.Collections.Specialized;
+using System.ComponentModel;
+using System.Diagnostics.CodeAnalysis;
+using System.Linq.Expressions;
+using System.Reactive;
+using System.Reactive.Concurrency;
+using System.Reactive.Disposables;
+using System.Reactive.Linq;
+using System.Runtime.CompilerServices;
+using DynamicData.Binding;
+using DynamicData.Cache;
+using DynamicData.Cache.Internal;
+
+// ReSharper disable once CheckNamespace
+
+namespace DynamicData;
+
+///
+/// Extensions for dynamic data.
+///
+public static partial class ObservableCacheEx
+{
+ ///
+ /// Applied a logical And operator between the collections i.e items which are in all of the
+ /// sources are included.
+ ///
+ /// The type of the object.
+ /// The type of the key.
+ /// The source to combine.
+ /// The additional streams to combine with.
+ /// An observable which emits change sets.
+ /// source or others.
+ ///
+ public static IObservable> And(this IObservable> source, params IObservable>[] others)
+ where TObject : notnull
+ where TKey : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+
+ return others is null || others.Length == 0
+ ? throw new ArgumentNullException(nameof(others))
+ : source.Combine(CombineOperator.And, others);
+ }
+
+ ///
+ /// Applied a logical And operator between the collections i.e items which are in all of the sources are included.
+ ///
+ /// The type of the object.
+ /// The type of the key.
+ /// The of streams to combine.
+ /// An observable which emits change sets.
+ ///
+ /// source
+ /// or
+ /// others.
+ ///
+ public static IObservable> And(this ICollection>> sources)
+ where TObject : notnull
+ where TKey : notnull
+ {
+ sources.ThrowArgumentNullExceptionIfNull(nameof(sources));
+
+ return sources.Combine(CombineOperator.And);
+ }
+
+ ///
+ /// Dynamically apply a logical And operator between the items in the outer observable list.
+ /// Items which are in all of the sources are included in the result.
+ ///
+ /// The type of the object.
+ /// The type of the key.
+ /// The of streams to combine.
+ /// An observable which emits change sets.
+ public static IObservable> And(this IObservableList>> sources)
+ where TObject : notnull
+ where TKey : notnull
+ {
+ sources.ThrowArgumentNullExceptionIfNull(nameof(sources));
+
+ return sources.Combine(CombineOperator.And);
+ }
+
+ ///
+ /// Dynamically apply a logical And operator between the items in the outer observable list.
+ /// Items which are in all of the sources are included in the result.
+ ///
+ /// The type of the object.
+ /// The type of the key.
+ /// The of changeset streams to combine.
+ /// An observable which emits change sets.
+ public static IObservable> And(this IObservableList> sources)
+ where TObject : notnull
+ where TKey : notnull
+ {
+ sources.ThrowArgumentNullExceptionIfNull(nameof(sources));
+
+ return sources.Combine(CombineOperator.And);
+ }
+
+ ///
+ /// Dynamically apply a logical And operator between the items in the outer observable list.
+ /// Items which are in all of the sources are included in the result.
+ ///
+ /// The type of the object.
+ /// The type of the key.
+ /// The of changeset streams to combine.
+ /// An observable which emits change sets.
+ public static IObservable> And(this IObservableList> sources)
+ where TObject : notnull
+ where TKey : notnull
+ {
+ sources.ThrowArgumentNullExceptionIfNull(nameof(sources));
+
+ return sources.Combine(CombineOperator.And);
+ }
+
+ private static IObservable> Combine(this IObservableList> source, CombineOperator type)
+ where TObject : notnull
+ where TKey : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+
+ return Observable.Create>(
+ observer =>
+ {
+ var connections = source.Connect().Transform(x => x.Connect()).AsObservableList();
+ var subscriber = connections.Combine(type).SubscribeSafe(observer);
+ return new CompositeDisposable(connections, subscriber);
+ });
+ }
+
+ private static IObservable> Combine(this IObservableList> source, CombineOperator type)
+ where TObject : notnull
+ where TKey : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+
+ return Observable.Create>(
+ observer =>
+ {
+ var connections = source.Connect().Transform(x => x.Connect()).AsObservableList();
+ var subscriber = connections.Combine(type).SubscribeSafe(observer);
+ return new CompositeDisposable(connections, subscriber);
+ });
+ }
+
+ private static IObservable> Combine(this IObservableList>> source, CombineOperator type)
+ where TObject : notnull
+ where TKey : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+
+ return new DynamicCombiner(source, type).Run();
+ }
+
+ private static IObservable> Combine(this ICollection>> sources, CombineOperator type)
+ where TObject : notnull
+ where TKey : notnull
+ {
+ sources.ThrowArgumentNullExceptionIfNull(nameof(sources));
+
+ return Observable.Create>(
+ observer =>
+ {
+ void UpdateAction(IChangeSet updates)
+ {
+ try
+ {
+ observer.OnNext(updates);
+ }
+ catch (Exception ex)
+ {
+ observer.OnError(ex);
+ }
+ }
+
+ var subscriber = Disposable.Empty;
+ try
+ {
+ var combiner = new Combiner(type, UpdateAction);
+ subscriber = combiner.Subscribe([.. sources]);
+ }
+ catch (Exception ex)
+ {
+ observer.OnError(ex);
+ observer.OnCompleted();
+ }
+
+ return subscriber;
+ });
+ }
+
+ private static IObservable> Combine(this IObservable> source, CombineOperator type, params IObservable>[] combineTarget)
+ where TObject : notnull
+ where TKey : notnull
+ {
+ combineTarget.ThrowArgumentNullExceptionIfNull(nameof(combineTarget));
+
+ return Observable.Create>(
+ observer =>
+ {
+ void UpdateAction(IChangeSet updates)
+ {
+ try
+ {
+ observer.OnNext(updates);
+ }
+ catch (Exception ex)
+ {
+ observer.OnError(ex);
+ observer.OnCompleted();
+ }
+ }
+
+ var subscriber = Disposable.Empty;
+ try
+ {
+ var list = combineTarget.ToList();
+ list.Insert(0, source);
+
+ var combiner = new Combiner(type, UpdateAction);
+ subscriber = combiner.Subscribe([.. list]);
+ }
+ catch (Exception ex)
+ {
+ observer.OnError(ex);
+ observer.OnCompleted();
+ }
+
+ return subscriber;
+ });
+ }
+}
diff --git a/src/DynamicData/Cache/ObservableCacheEx.AsObservableCache.cs b/src/DynamicData/Cache/ObservableCacheEx.AsObservableCache.cs
new file mode 100644
index 00000000..3a9f5887
--- /dev/null
+++ b/src/DynamicData/Cache/ObservableCacheEx.AsObservableCache.cs
@@ -0,0 +1,78 @@
+// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved.
+// Roland Pheasant licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for full license information.
+
+using System.Collections.ObjectModel;
+using System.Collections.Specialized;
+using System.ComponentModel;
+using System.Diagnostics.CodeAnalysis;
+using System.Linq.Expressions;
+using System.Reactive;
+using System.Reactive.Concurrency;
+using System.Reactive.Disposables;
+using System.Reactive.Linq;
+using System.Runtime.CompilerServices;
+using DynamicData.Binding;
+using DynamicData.Cache;
+using DynamicData.Cache.Internal;
+
+// ReSharper disable once CheckNamespace
+
+namespace DynamicData;
+
+///
+/// Extensions for dynamic data.
+///
+public static partial class ObservableCacheEx
+{
+ ///
+ /// Wraps an in a read-only facade, hiding the mutable API.
+ ///
+ /// The type of the object.
+ /// The type of the key.
+ /// The to operate on.
+ /// A read-only .
+ /// is .
+ ///
+ public static IObservableCache AsObservableCache(this IObservableCache source)
+ where TObject : notnull
+ where TKey : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+
+ return new AnonymousObservableCache(source);
+ }
+
+ ///
+ /// Materializes a changeset stream into a queryable, read-only .
+ /// The cache subscribes to the source on first access and maintains a live snapshot of all items.
+ ///
+ /// The type of the object.
+ /// The type of the key.
+ /// The source to materialize into a read-only cache.
+ /// If (default), all cache operations are synchronized. Set to when the caller guarantees single-threaded access.
+ /// A read-only observable cache that reflects the current state of the pipeline.
+ ///
+ ///
+ /// Disposing the returned cache unsubscribes from the source stream. The cache's Connect()
+ /// method provides a changeset stream of its own, which re-emits the current state on each new subscriber.
+ ///
+ /// When is , a is used internally.
+ ///
+ /// is .
+ ///
+ ///
+ public static IObservableCache AsObservableCache(this IObservable> source, bool applyLocking = true)
+ where TObject : notnull
+ where TKey : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+
+ if (applyLocking)
+ {
+ return new AnonymousObservableCache(source);
+ }
+
+ return new LockFreeObservableCache(source);
+ }
+}
diff --git a/src/DynamicData/Cache/ObservableCacheEx.AsyncDisposeMany.cs b/src/DynamicData/Cache/ObservableCacheEx.AsyncDisposeMany.cs
new file mode 100644
index 00000000..bf87de8d
--- /dev/null
+++ b/src/DynamicData/Cache/ObservableCacheEx.AsyncDisposeMany.cs
@@ -0,0 +1,82 @@
+// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved.
+// Roland Pheasant licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for full license information.
+
+using System.Collections.ObjectModel;
+using System.Collections.Specialized;
+using System.ComponentModel;
+using System.Diagnostics.CodeAnalysis;
+using System.Linq.Expressions;
+using System.Reactive;
+using System.Reactive.Concurrency;
+using System.Reactive.Disposables;
+using System.Reactive.Linq;
+using System.Runtime.CompilerServices;
+using DynamicData.Binding;
+using DynamicData.Cache;
+using DynamicData.Cache.Internal;
+
+// ReSharper disable once CheckNamespace
+
+namespace DynamicData;
+
+///
+/// Extensions for dynamic data.
+///
+public static partial class ObservableCacheEx
+{
+#if SUPPORTS_ASYNC_DISPOSABLE
+ ///
+ ///
+ /// Disposes items implementing or when they are removed or replaced,
+ /// and disposes all tracked items when the stream completes, errors, or the subscription is disposed.
+ ///
+ ///
+ /// Individual items are disposed after the changeset has been forwarded downstream, so downstream operators
+ /// see the removal before disposal occurs. Items implementing neither disposal interface are ignored.
+ ///
+ ///
+ /// The type of items in the cache.
+ /// The type of the key.
+ /// The source to track for async disposal on removal.
+ ///
+ ///
+ /// Invoked once per subscription, providing an that signals when all
+ /// calls have finished. The signal emits a single value
+ /// and then completes.
+ ///
+ ///
+ /// This is delivered on a separate channel from the main changeset stream so it can be observed even
+ /// if the source stream errors.
+ ///
+ ///
+ /// A stream that forwards all changesets from unchanged.
+ ///
+ ///
+ /// Change reason handling:
+ ///
+ /// EventBehavior
+ /// - AddTracks the item. No disposal.
+ /// - UpdateDisposes the previous value (if it differs by reference from the current). Tracks the new value.
+ /// - RemoveDisposes the removed item.
+ /// - RefreshPassed through. No disposal.
+ ///
+ ///
+ ///
+ /// On stream completion, error, or subscription disposal, all items still in the cache are disposed.
+ /// items are disposed synchronously; items
+ /// are dispatched via the signal.
+ ///
+ ///
+ /// or is .
+ ///
+ public static IObservable> AsyncDisposeMany(
+ this IObservable> source,
+ Action> disposalsCompletedAccessor)
+ where TObject : notnull
+ where TKey : notnull
+ => Cache.Internal.AsyncDisposeMany.Create(
+ source: source,
+ disposalsCompletedAccessor: disposalsCompletedAccessor);
+#endif
+}
diff --git a/src/DynamicData/Cache/ObservableCacheEx.AutoRefresh.cs b/src/DynamicData/Cache/ObservableCacheEx.AutoRefresh.cs
new file mode 100644
index 00000000..1e7bfb42
--- /dev/null
+++ b/src/DynamicData/Cache/ObservableCacheEx.AutoRefresh.cs
@@ -0,0 +1,90 @@
+// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved.
+// Roland Pheasant licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for full license information.
+
+using System.Collections.ObjectModel;
+using System.Collections.Specialized;
+using System.ComponentModel;
+using System.Diagnostics.CodeAnalysis;
+using System.Linq.Expressions;
+using System.Reactive;
+using System.Reactive.Concurrency;
+using System.Reactive.Disposables;
+using System.Reactive.Linq;
+using System.Runtime.CompilerServices;
+using DynamicData.Binding;
+using DynamicData.Cache;
+using DynamicData.Cache.Internal;
+
+// ReSharper disable once CheckNamespace
+
+namespace DynamicData;
+
+///
+/// Extensions for dynamic data.
+///
+public static partial class ObservableCacheEx
+{
+ ///
+ /// Automatically refresh downstream operators when any properties change.
+ ///
+ /// The object of the change set.
+ /// The key of the change set.
+ /// The source to monitor for property-driven refresh signals.
+ /// An optional buffer duration. Batches multiple refresh signals into a single changeset, improving performance when many elements change in quick succession. This greatly increases performance when many elements have successive property changes.
+ /// An optional throttle applied to each item's property change notifications, preventing excessive refresh invocations.
+ /// An optional for scheduling work.
+ /// An observable change set with additional refresh changes.
+ ///
+ public static IObservable> AutoRefresh(this IObservable> source, TimeSpan? changeSetBuffer = null, TimeSpan? propertyChangeThrottle = null, IScheduler? scheduler = null)
+ where TObject : INotifyPropertyChanged
+ where TKey : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+
+ return source.AutoRefreshOnObservable(
+ (t, _) =>
+ {
+ if (propertyChangeThrottle is null)
+ {
+ return t.WhenAnyPropertyChanged();
+ }
+
+ return t.WhenAnyPropertyChanged().Throttle(propertyChangeThrottle.Value, scheduler ?? GlobalConfig.DefaultScheduler);
+ },
+ changeSetBuffer,
+ scheduler);
+ }
+
+ ///
+ /// Automatically refresh downstream operators when properties change.
+ ///
+ /// The object of the change set.
+ /// The key of the change set.
+ /// The type of the property.
+ /// The source to monitor for property-driven refresh signals.
+ /// A that specify a property to observe changes. When it changes a Refresh is invoked.
+ /// An optional buffer duration. Batches multiple refresh signals into a single changeset, improving performance when many elements change in quick succession. This greatly increases performance when many elements have successive property changes.
+ /// An optional throttle applied to each item's property change notifications, preventing excessive refresh invocations.
+ /// An optional for scheduling work.
+ /// An observable change set with additional refresh changes.
+ public static IObservable> AutoRefresh(this IObservable> source, Expression> propertyAccessor, TimeSpan? changeSetBuffer = null, TimeSpan? propertyChangeThrottle = null, IScheduler? scheduler = null)
+ where TObject : INotifyPropertyChanged
+ where TKey : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+
+ return source.AutoRefreshOnObservable(
+ (t, _) =>
+ {
+ if (propertyChangeThrottle is null)
+ {
+ return t.WhenPropertyChanged(propertyAccessor, false);
+ }
+
+ return t.WhenPropertyChanged(propertyAccessor, false).Throttle(propertyChangeThrottle.Value, scheduler ?? GlobalConfig.DefaultScheduler);
+ },
+ changeSetBuffer,
+ scheduler);
+ }
+}
diff --git a/src/DynamicData/Cache/ObservableCacheEx.AutoRefreshOnObservable.cs b/src/DynamicData/Cache/ObservableCacheEx.AutoRefreshOnObservable.cs
new file mode 100644
index 00000000..39a04294
--- /dev/null
+++ b/src/DynamicData/Cache/ObservableCacheEx.AutoRefreshOnObservable.cs
@@ -0,0 +1,67 @@
+// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved.
+// Roland Pheasant licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for full license information.
+
+using System.Collections.ObjectModel;
+using System.Collections.Specialized;
+using System.ComponentModel;
+using System.Diagnostics.CodeAnalysis;
+using System.Linq.Expressions;
+using System.Reactive;
+using System.Reactive.Concurrency;
+using System.Reactive.Disposables;
+using System.Reactive.Linq;
+using System.Runtime.CompilerServices;
+using DynamicData.Binding;
+using DynamicData.Cache;
+using DynamicData.Cache.Internal;
+
+// ReSharper disable once CheckNamespace
+
+namespace DynamicData;
+
+///
+/// Extensions for dynamic data.
+///
+public static partial class ObservableCacheEx
+{
+ ///
+ /// Automatically refresh downstream operator. The refresh is triggered when the observable receives a notification.
+ ///
+ /// The object of the change set.
+ /// The key of the change set.
+ /// The type of evaluation.
+ /// The source to monitor for observable-driven refresh signals.
+ /// The observable which acts on items within the collection and produces a value when the item should be refreshed.
+ /// An optional buffer duration. Batches multiple refresh signals into a single changeset, improving performance when many elements change in quick succession. This greatly increases performance when many elements require a refresh.
+ /// An optional for scheduling work.
+ /// An observable change set with additional refresh changes.
+ ///
+ public static IObservable> AutoRefreshOnObservable(this IObservable> source, Func> reevaluator, TimeSpan? changeSetBuffer = null, IScheduler? scheduler = null)
+ where TObject : notnull
+ where TKey : notnull => source.AutoRefreshOnObservable((t, _) => reevaluator(t), changeSetBuffer, scheduler);
+
+ ///
+ /// Automatically refresh downstream operator. The refresh is triggered when the observable receives a notification.
+ ///
+ /// The object of the change set.
+ /// The key of the change set.
+ /// The type of evaluation.
+ /// The source to monitor for observable-driven refresh signals.
+ /// The observable which acts on items within the collection and produces a value when the item should be refreshed.
+ /// An optional buffer duration. Batches multiple refresh signals into a single changeset, improving performance when many elements change in quick succession. This greatly increases performance when many elements require a refresh.
+ /// An optional for scheduling work.
+ /// An observable change set with additional refresh changes.
+ ///
+ /// Worth noting: Per-item observable errors are silently ignored (not forwarded to the downstream observer). Only source stream errors propagate.
+ ///
+ public static IObservable> AutoRefreshOnObservable(this IObservable> source, Func> reevaluator, TimeSpan? changeSetBuffer = null, IScheduler? scheduler = null)
+ where TObject : notnull
+ where TKey : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+ reevaluator.ThrowArgumentNullExceptionIfNull(nameof(reevaluator));
+
+ return new AutoRefresh(source, reevaluator, changeSetBuffer, scheduler).Run();
+ }
+}
diff --git a/src/DynamicData/Cache/ObservableCacheEx.Batch.cs b/src/DynamicData/Cache/ObservableCacheEx.Batch.cs
new file mode 100644
index 00000000..1ed843de
--- /dev/null
+++ b/src/DynamicData/Cache/ObservableCacheEx.Batch.cs
@@ -0,0 +1,64 @@
+// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved.
+// Roland Pheasant licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for full license information.
+
+using System.Collections.ObjectModel;
+using System.Collections.Specialized;
+using System.ComponentModel;
+using System.Diagnostics.CodeAnalysis;
+using System.Linq.Expressions;
+using System.Reactive;
+using System.Reactive.Concurrency;
+using System.Reactive.Disposables;
+using System.Reactive.Linq;
+using System.Runtime.CompilerServices;
+using DynamicData.Binding;
+using DynamicData.Cache;
+using DynamicData.Cache.Internal;
+
+// ReSharper disable once CheckNamespace
+
+namespace DynamicData;
+
+///
+/// Extensions for dynamic data.
+///
+public static partial class ObservableCacheEx
+{
+ ///
+ /// Collects changesets emitted within a time window and merges them into a single changeset.
+ /// Uses Rx's Buffer operator followed by .
+ ///
+ /// The type of the object.
+ /// The type of the key.
+ /// The source to batch.
+ /// The time window for batching.
+ /// The scheduler for timing. Defaults to .
+ /// An observable that emits merged changesets, one per time window.
+ ///
+ ///
+ /// All changesets received during the time window are concatenated into a single changeset.
+ /// This is useful for reducing UI update frequency when the source emits many rapid changes.
+ ///
+ ///
+ /// EventBehavior
+ /// - AddBuffered and included in the merged changeset at the end of the time window.
+ /// - UpdateBuffered and included in the merged changeset.
+ /// - RemoveBuffered and included in the merged changeset.
+ /// - RefreshBuffered and included in the merged changeset.
+ /// - OnCompletedAny remaining buffered changes are flushed, then completion is forwarded.
+ ///
+ /// Worth noting: The merged changeset may contain contradictory changes (e.g., Add then Remove for the same key). Downstream operators handle this correctly, but raw inspection of the changeset may be surprising.
+ ///
+ /// is .
+ ///
+ ///
+ public static IObservable> Batch(this IObservable> source, TimeSpan timeSpan, IScheduler? scheduler = null)
+ where TObject : notnull
+ where TKey : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+
+ return source.Buffer(timeSpan, scheduler ?? GlobalConfig.DefaultScheduler).FlattenBufferResult();
+ }
+}
diff --git a/src/DynamicData/Cache/ObservableCacheEx.BatchIf.cs b/src/DynamicData/Cache/ObservableCacheEx.BatchIf.cs
new file mode 100644
index 00000000..99fb5ee1
--- /dev/null
+++ b/src/DynamicData/Cache/ObservableCacheEx.BatchIf.cs
@@ -0,0 +1,97 @@
+// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved.
+// Roland Pheasant licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for full license information.
+
+using System.Collections.ObjectModel;
+using System.Collections.Specialized;
+using System.ComponentModel;
+using System.Diagnostics.CodeAnalysis;
+using System.Linq.Expressions;
+using System.Reactive;
+using System.Reactive.Concurrency;
+using System.Reactive.Disposables;
+using System.Reactive.Linq;
+using System.Runtime.CompilerServices;
+using DynamicData.Binding;
+using DynamicData.Cache;
+using DynamicData.Cache.Internal;
+
+// ReSharper disable once CheckNamespace
+
+namespace DynamicData;
+
+///
+/// Extensions for dynamic data.
+///
+public static partial class ObservableCacheEx
+{
+ ///
+ /// This overload delegates to the primary overload with initialPauseState: false.
+ public static IObservable> BatchIf(this IObservable> source, IObservable pauseIfTrueSelector, IScheduler? scheduler = null)
+ where TObject : notnull
+ where TKey : notnull => BatchIf(source, pauseIfTrueSelector, false, scheduler);
+
+ ///
+ /// This overload delegates to the primary overload with default initialPauseState: false.
+ public static IObservable> BatchIf(this IObservable> source, IObservable pauseIfTrueSelector, bool initialPauseState = false, IScheduler? scheduler = null)
+ where TObject : notnull
+ where TKey : notnull => new BatchIf(source, pauseIfTrueSelector, null, initialPauseState, scheduler: scheduler).Run();
+
+ ///
+ /// This overload omits initialPauseState (defaults to ) but accepts a timeout.
+ public static IObservable> BatchIf(this IObservable> source, IObservable pauseIfTrueSelector, TimeSpan? timeOut = null, IScheduler? scheduler = null)
+ where TObject : notnull
+ where TKey : notnull => BatchIf(source, pauseIfTrueSelector, false, timeOut, scheduler);
+
+ ///
+ /// Conditionally buffers changesets while a pause signal is active, then flushes all buffered
+ /// changes as a single merged changeset when the signal resumes.
+ ///
+ /// The type of the object.
+ /// The type of the key.
+ /// The source to conditionally buffer.
+ /// An that when , buffering begins. When , the buffer is flushed.
+ /// If , starts in a paused (buffering) state.
+ /// A that maximum time the buffer stays open. When elapsed, the buffer is flushed regardless of pause state.
+ /// The for timeout timing.
+ /// An observable that emits changesets, buffered or passthrough depending on pause state.
+ ///
+ ///
+ /// While paused, incoming changesets are accumulated. On resume (or timeout), all buffered changesets
+ /// are merged into a single changeset and emitted. While not paused, changesets pass through immediately.
+ ///
+ ///
+ /// EventBehavior
+ /// - AddBuffered while paused; forwarded immediately while active.
+ /// - UpdateBuffered while paused; forwarded immediately while active.
+ /// - RemoveBuffered while paused; forwarded immediately while active.
+ /// - RefreshBuffered while paused; forwarded immediately while active.
+ /// - OnErrorBuffered data is lost.
+ /// - OnCompletedAny remaining buffered data is flushed before completion.
+ ///
+ /// Worth noting: If the source completes while paused, buffered data IS flushed before OnCompleted. However, if the source errors while paused, buffered data is lost.
+ ///
+ /// or is .
+ ///
+ ///
+ public static IObservable> BatchIf(this IObservable> source, IObservable pauseIfTrueSelector, bool initialPauseState = false, TimeSpan? timeOut = null, IScheduler? scheduler = null)
+ where TObject : notnull
+ where TKey : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+ pauseIfTrueSelector.ThrowArgumentNullExceptionIfNull(nameof(pauseIfTrueSelector));
+
+ return new BatchIf(source, pauseIfTrueSelector, timeOut, initialPauseState, scheduler: scheduler).Run();
+ }
+
+ ///
+ /// The source to conditionally buffer.
+ /// An that controls buffering: begins buffering, flushes the buffer.
+ /// If , starts in a paused (buffering) state.
+ /// An optional timer. The buffer is flushed each time the timer produces a value, and buffering ceases when it completes.
+ /// An optional for scheduling work.
+ /// This overload accepts an explicit timer observable instead of a timeout.
+ public static IObservable> BatchIf(this IObservable> source, IObservable pauseIfTrueSelector, bool initialPauseState = false, IObservable? timer = null, IScheduler? scheduler = null)
+ where TObject : notnull
+ where TKey : notnull => new BatchIf(source, pauseIfTrueSelector, null, initialPauseState, timer, scheduler).Run();
+}
diff --git a/src/DynamicData/Cache/ObservableCacheEx.Bind.cs b/src/DynamicData/Cache/ObservableCacheEx.Bind.cs
new file mode 100644
index 00000000..ec591efa
--- /dev/null
+++ b/src/DynamicData/Cache/ObservableCacheEx.Bind.cs
@@ -0,0 +1,332 @@
+// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved.
+// Roland Pheasant licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for full license information.
+
+using System.Collections.ObjectModel;
+using System.Collections.Specialized;
+using System.ComponentModel;
+using System.Diagnostics.CodeAnalysis;
+using System.Linq.Expressions;
+using System.Reactive;
+using System.Reactive.Concurrency;
+using System.Reactive.Disposables;
+using System.Reactive.Linq;
+using System.Runtime.CompilerServices;
+using DynamicData.Binding;
+using DynamicData.Cache;
+using DynamicData.Cache.Internal;
+
+// ReSharper disable once CheckNamespace
+
+namespace DynamicData;
+
+///
+/// Extensions for dynamic data.
+///
+public static partial class ObservableCacheEx
+{
+ ///
+ /// Binds the results to the specified observable collection using the default update algorithm.
+ ///
+ /// The type of the object.
+ /// The type of the key.
+ /// The source to bind to a collection.
+ /// The that will receive the changes.
+ /// The number of changes before a reset notification is triggered.
+ /// An observable which will emit change sets.
+ /// source.
+ ///
+ public static IObservable> Bind(this IObservable> source, IObservableCollection destination, int refreshThreshold = BindingOptions.DefaultResetThreshold)
+ where TObject : notnull
+ where TKey : notnull
+ {
+ destination.ThrowArgumentNullExceptionIfNull(nameof(destination));
+
+ // if user has not specified different defaults, use system wide defaults instead.
+ // This is a hack to retro fit system wide defaults which override the hard coded defaults above
+ var defaults = DynamicDataOptions.Binding;
+
+ var options = refreshThreshold == BindingOptions.DefaultResetThreshold
+ ? defaults
+ : defaults with { ResetThreshold = refreshThreshold };
+
+ return source?.Bind(destination, new ObservableCollectionAdaptor(options)) ?? throw new ArgumentNullException(nameof(source));
+ }
+
+ ///
+ /// Binds the results to the specified observable collection using the default update algorithm.
+ ///
+ /// The type of the object.
+ /// The type of the key.
+ /// The source to bind to a collection.
+ /// The that will receive the changes.
+ /// The that controls binding behavior.
+ /// An observable which will emit change sets.
+ /// source.
+ public static IObservable> Bind(this IObservable> source, IObservableCollection destination, BindingOptions options)
+ where TObject : notnull
+ where TKey : notnull
+ {
+ destination.ThrowArgumentNullExceptionIfNull(nameof(destination));
+
+ return source?.Bind(destination, new ObservableCollectionAdaptor(options)) ?? throw new ArgumentNullException(nameof(source));
+ }
+
+ ///
+ /// Binds the results to the specified binding collection using the specified update algorithm.
+ ///
+ /// The type of the object.
+ /// The type of the key.
+ /// The source to bind to a collection.
+ /// The that will receive the changes.
+ /// The that applies changes to the bound collection.
+ /// An observable which will emit change sets.
+ /// source.
+ public static IObservable> Bind(this IObservable> source, IObservableCollection destination, IObservableCollectionAdaptor updater)
+ where TObject : notnull
+ where TKey : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+ destination.ThrowArgumentNullExceptionIfNull(nameof(destination));
+ updater.ThrowArgumentNullExceptionIfNull(nameof(updater));
+
+ return Observable.Create>(
+ observer =>
+ source.SynchronizeSafe(InternalEx.NewLock()).Select(
+ changes =>
+ {
+ updater.Adapt(changes, destination);
+ return changes;
+ }).SubscribeSafe(observer));
+ }
+
+ ///
+ /// Binds the results to the specified readonly observable collection using the default update algorithm.
+ ///
+ /// The type of the object.
+ /// The type of the key.
+ /// The source to bind to a collection.
+ /// The output that will be populated with the results.
+ /// The that controls binding behavior.
+ /// An observable which will emit change sets.
+ /// source.
+ public static IObservable> Bind(this IObservable> source, out ReadOnlyObservableCollection readOnlyObservableCollection, BindingOptions options)
+ where TObject : notnull
+ where TKey : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+
+ var target = new ObservableCollectionExtended();
+ readOnlyObservableCollection = new ReadOnlyObservableCollection(target);
+ return source.Bind(target, new ObservableCollectionAdaptor(options));
+ }
+
+ ///
+ /// Binds the results to the specified readonly observable collection using the default update algorithm.
+ ///
+ /// The type of the object.
+ /// The type of the key.
+ /// The source to bind to a collection.
+ /// The output that will be populated with the results.
+ /// The number of changes before a reset notification is triggered.
+ /// When , uses Replace instead of Remove/Add for updates in the bound collection. Not all platforms support replace notifications.
+ /// An optional that controls how the target collection is updated.
+ /// An observable which will emit change sets.
+ /// source.
+ public static IObservable> Bind(this IObservable> source, out ReadOnlyObservableCollection readOnlyObservableCollection, int resetThreshold = BindingOptions.DefaultResetThreshold, bool useReplaceForUpdates = BindingOptions.DefaultUseReplaceForUpdates, IObservableCollectionAdaptor? adaptor = null)
+ where TObject : notnull
+ where TKey : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+
+ if (adaptor is not null)
+ {
+ var target = new ObservableCollectionExtended();
+ readOnlyObservableCollection = new ReadOnlyObservableCollection(target);
+ return source.Bind(target, adaptor);
+ }
+
+ // if user has not specified different defaults, use system wide defaults instead.
+ // This is a hack to retro fit system wide defaults which override the hard coded defaults above
+ var defaults = DynamicDataOptions.Binding;
+
+ var options = resetThreshold == BindingOptions.DefaultResetThreshold && useReplaceForUpdates == BindingOptions.DefaultUseReplaceForUpdates
+ ? defaults
+ : defaults with { ResetThreshold = resetThreshold, UseReplaceForUpdates = useReplaceForUpdates };
+
+ return source.Bind(out readOnlyObservableCollection, options);
+ }
+
+ ///
+ /// Binds the results to the specified observable collection using the default update algorithm.
+ ///
+ /// The type of the object.
+ /// The type of the key.
+ /// The source to bind to a collection.
+ /// The that will receive the changes.
+ /// An observable which will emit change sets.
+ /// source.
+ public static IObservable> Bind(this IObservable> source, IObservableCollection destination)
+ where TObject : notnull
+ where TKey : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+ destination.ThrowArgumentNullExceptionIfNull(nameof(destination));
+
+ return source.Bind(destination, DynamicDataOptions.Binding);
+ }
+
+ ///
+ /// Binds the results to the specified observable collection using the default update algorithm.
+ ///
+ /// The type of the object.
+ /// The type of the key.
+ /// The source to bind to a collection.
+ /// The that will receive the changes.
+ /// The that controls binding behavior.
+ /// An observable which will emit change sets.
+ /// source.
+ public static IObservable> Bind(this IObservable> source, IObservableCollection destination, BindingOptions options)
+ where TObject : notnull
+ where TKey : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+ destination.ThrowArgumentNullExceptionIfNull(nameof(destination));
+
+ var updater = new SortedObservableCollectionAdaptor(options);
+ return source.Bind(destination, updater);
+ }
+
+ ///
+ /// Binds the results to the specified binding collection using the specified update algorithm.
+ ///
+ /// The type of the object.
+ /// The type of the key.
+ /// The source to bind to a collection.
+ /// The that will receive the changes.
+ /// The that applies changes to the bound collection.
+ /// An observable which will emit change sets.
+ /// source.
+ public static IObservable> Bind(this IObservable> source, IObservableCollection destination, ISortedObservableCollectionAdaptor updater)
+ where TObject : notnull
+ where TKey : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+ destination.ThrowArgumentNullExceptionIfNull(nameof(destination));
+ updater.ThrowArgumentNullExceptionIfNull(nameof(updater));
+
+ return Observable.Create>(
+ observer =>
+ source.SynchronizeSafe(InternalEx.NewLock()).Select(
+ changes =>
+ {
+ updater.Adapt(changes, destination);
+ return changes;
+ }).SubscribeSafe(observer));
+ }
+
+ ///
+ /// Binds the results to the specified readonly observable collection using the default update algorithm.
+ ///
+ /// The type of the object.
+ /// The type of the key.
+ /// The source to bind to a collection.
+ /// The output that will be populated with the results.
+ /// The that controls binding behavior.
+ /// An observable which will emit change sets.
+ /// source.
+ public static IObservable> Bind(this IObservable> source, out ReadOnlyObservableCollection readOnlyObservableCollection, BindingOptions options)
+ where TObject : notnull
+ where TKey : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+
+ var target = new ObservableCollectionExtended();
+ var result = new ReadOnlyObservableCollection(target);
+ var updater = new SortedObservableCollectionAdaptor(options);
+ readOnlyObservableCollection = result;
+ return source.Bind(target, updater);
+ }
+
+ ///
+ /// Binds the results to the specified readonly observable collection using the default update algorithm.
+ ///
+ /// The type of the object.
+ /// The type of the key.
+ /// The source to bind to a collection.
+ /// The output that will be populated with the results.
+ /// The number of changes before a reset event is called on the observable collection.
+ /// When , uses Replace instead of Remove/Add for updates in the bound collection. Not all platforms support replace notifications.
+ /// An that specify an adaptor to change the algorithm to update the target collection.
+ /// An observable which will emit change sets.
+ /// source.
+ public static IObservable> Bind(this IObservable> source, out ReadOnlyObservableCollection readOnlyObservableCollection, int resetThreshold = BindingOptions.DefaultResetThreshold, bool useReplaceForUpdates = BindingOptions.DefaultUseReplaceForUpdates, ISortedObservableCollectionAdaptor? adaptor = null)
+ where TObject : notnull
+ where TKey : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+
+ // if user has not specified different defaults, use system wide defaults instead.
+ // This is a hack to retro fit system wide defaults which override the hard coded defaults above
+ var defaults = DynamicDataOptions.Binding;
+ var options = resetThreshold == BindingOptions.DefaultResetThreshold && useReplaceForUpdates == BindingOptions.DefaultUseReplaceForUpdates
+ ? defaults
+ : defaults with { ResetThreshold = resetThreshold, UseReplaceForUpdates = useReplaceForUpdates };
+
+ adaptor ??= new SortedObservableCollectionAdaptor(options);
+
+ var target = new ObservableCollectionExtended();
+ readOnlyObservableCollection = new ReadOnlyObservableCollection(target);
+ return source.Bind(target, adaptor);
+ }
+
+#if SUPPORTS_BINDINGLIST
+ ///
+ /// Binds a clone of the observable change set to the target observable collection.
+ ///
+ /// The object type.
+ /// The key type.
+ /// The source to bind to a collection.
+ /// The that will receive the changes.
+ /// The reset threshold.
+ /// An observable which will emit change sets.
+ ///
+ /// source
+ /// or
+ /// targetCollection.
+ ///
+ public static IObservable> Bind<[DynamicallyAccessedMembers(DynamicallyAccessedMemberTypes.All)] TObject, TKey>(this IObservable> source, BindingList bindingList, int resetThreshold = BindingOptions.DefaultResetThreshold)
+ where TObject : notnull
+ where TKey : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+ bindingList.ThrowArgumentNullExceptionIfNull(nameof(bindingList));
+
+ return source.Adapt(new BindingListAdaptor(bindingList, resetThreshold));
+ }
+
+ ///
+ /// Binds a clone of the observable change set to the target observable collection.
+ ///
+ /// The object type.
+ /// The key type.
+ /// The source to bind to a collection.
+ /// The that will receive the changes.
+ /// The reset threshold.
+ /// An observable which will emit change sets.
+ ///
+ /// source
+ /// or
+ /// targetCollection.
+ ///
+ public static IObservable> Bind<[DynamicallyAccessedMembers(DynamicallyAccessedMemberTypes.All)] TObject, TKey>(this IObservable> source, BindingList bindingList, int resetThreshold = BindingOptions.DefaultResetThreshold)
+ where TObject : notnull
+ where TKey : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+ bindingList.ThrowArgumentNullExceptionIfNull(nameof(bindingList));
+
+ return source.Adapt(new SortedBindingListAdaptor(bindingList, resetThreshold));
+ }
+#endif
+}
diff --git a/src/DynamicData/Cache/ObservableCacheEx.BufferInitial.cs b/src/DynamicData/Cache/ObservableCacheEx.BufferInitial.cs
new file mode 100644
index 00000000..c8d709c8
--- /dev/null
+++ b/src/DynamicData/Cache/ObservableCacheEx.BufferInitial.cs
@@ -0,0 +1,56 @@
+// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved.
+// Roland Pheasant licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for full license information.
+
+using System.Collections.ObjectModel;
+using System.Collections.Specialized;
+using System.ComponentModel;
+using System.Diagnostics.CodeAnalysis;
+using System.Linq.Expressions;
+using System.Reactive;
+using System.Reactive.Concurrency;
+using System.Reactive.Disposables;
+using System.Reactive.Linq;
+using System.Runtime.CompilerServices;
+using DynamicData.Binding;
+using DynamicData.Cache;
+using DynamicData.Cache.Internal;
+
+// ReSharper disable once CheckNamespace
+
+namespace DynamicData;
+
+///
+/// Extensions for dynamic data.
+///
+public static partial class ObservableCacheEx
+{
+ ///
+ /// Buffers the initial burst of changesets for the specified duration, merges them into a single
+ /// changeset, then passes all subsequent changesets through without buffering.
+ ///
+ /// The object type.
+ /// The type of the key.
+ /// The source to buffer during the initial loading period.
+ /// The time window to buffer, measured from when the first changeset arrives.
+ /// The scheduler for timing. Defaults to .
+ /// An observable that emits one merged changeset for the initial burst, then passthrough for the rest.
+ ///
+ ///
+ /// Useful for aggregating the initial snapshot (which may arrive as many small changesets) into a
+ /// single changeset for efficient downstream processing, while leaving subsequent live updates untouched.
+ ///
+ /// Internally uses , Rx Buffer, and .
+ ///
+ ///
+ ///
+ public static IObservable> BufferInitial(this IObservable> source, TimeSpan initialBuffer, IScheduler? scheduler = null)
+ where TObject : notnull
+ where TKey : notnull => source.DeferUntilLoaded().Publish(
+ shared =>
+ {
+ var initial = shared.Buffer(initialBuffer, scheduler ?? GlobalConfig.DefaultScheduler).FlattenBufferResult().Take(1);
+
+ return initial.Concat(shared);
+ });
+}
diff --git a/src/DynamicData/Cache/ObservableCacheEx.Cast.cs b/src/DynamicData/Cache/ObservableCacheEx.Cast.cs
new file mode 100644
index 00000000..90aa68b0
--- /dev/null
+++ b/src/DynamicData/Cache/ObservableCacheEx.Cast.cs
@@ -0,0 +1,58 @@
+// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved.
+// Roland Pheasant licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for full license information.
+
+using System.Collections.ObjectModel;
+using System.Collections.Specialized;
+using System.ComponentModel;
+using System.Diagnostics.CodeAnalysis;
+using System.Linq.Expressions;
+using System.Reactive;
+using System.Reactive.Concurrency;
+using System.Reactive.Disposables;
+using System.Reactive.Linq;
+using System.Runtime.CompilerServices;
+using DynamicData.Binding;
+using DynamicData.Cache;
+using DynamicData.Cache.Internal;
+
+// ReSharper disable once CheckNamespace
+
+namespace DynamicData;
+
+///
+/// Extensions for dynamic data.
+///
+public static partial class ObservableCacheEx
+{
+ ///
+ /// Casts each item in the changeset to a new type using the provided converter function.
+ /// Equivalent to
+ /// but named for discoverability when a simple type cast or conversion is needed.
+ ///
+ /// The type of the source object.
+ /// The type of the key.
+ /// The type of the destination object.
+ /// The source to cast.
+ /// The conversion function applied to each item.
+ /// An observable changeset of converted items.
+ ///
+ ///
+ /// EventBehavior
+ /// - AddCalls and emits an Add with the converted item.
+ /// - UpdateCalls on the new value and emits an Update.
+ /// - RemoveEmits a Remove. The converter is not called.
+ /// - RefreshForwarded as Refresh. The converter is not called.
+ ///
+ ///
+ ///
+ public static IObservable> Cast(this IObservable> source, Func converter)
+ where TSource : notnull
+ where TKey : notnull
+ where TDestination : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+
+ return new Cast(source, converter).Run();
+ }
+}
diff --git a/src/DynamicData/Cache/ObservableCacheEx.ChangeKey.cs b/src/DynamicData/Cache/ObservableCacheEx.ChangeKey.cs
new file mode 100644
index 00000000..53dc96e5
--- /dev/null
+++ b/src/DynamicData/Cache/ObservableCacheEx.ChangeKey.cs
@@ -0,0 +1,84 @@
+// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved.
+// Roland Pheasant licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for full license information.
+
+using System.Collections.ObjectModel;
+using System.Collections.Specialized;
+using System.ComponentModel;
+using System.Diagnostics.CodeAnalysis;
+using System.Linq.Expressions;
+using System.Reactive;
+using System.Reactive.Concurrency;
+using System.Reactive.Disposables;
+using System.Reactive.Linq;
+using System.Runtime.CompilerServices;
+using DynamicData.Binding;
+using DynamicData.Cache;
+using DynamicData.Cache.Internal;
+
+// ReSharper disable once CheckNamespace
+
+namespace DynamicData;
+
+///
+/// Extensions for dynamic data.
+///
+public static partial class ObservableCacheEx
+{
+ ///
+ /// Re-keys each item in the changeset by applying to the current item.
+ /// The original change reason is preserved; only the key is remapped.
+ ///
+ /// The type of the object.
+ /// The type of the source key.
+ /// The type of the destination key.
+ /// The source to re-key.
+ /// The that computes the destination key from the item, e.g. (item) => item.NewId.
+ /// An observable changeset with items re-keyed using .
+ ///
+ ///
+ /// EventBehavior
+ /// - Add is called on the item. An Add is emitted with the destination key.
+ /// - Update is called on the current item. An Update is emitted with the destination key. If the key selector produces a different destination key for the updated value than it did for the original value, downstream consumers will see an Update for a key that may not match the original Add.
+ /// - Remove is called on the item. A Remove is emitted with the destination key.
+ /// - Refresh is called on the item. A Refresh is emitted with the destination key.
+ ///
+ ///
+ ///
+ public static IObservable> ChangeKey(this IObservable> source, Func keySelector)
+ where TObject : notnull
+ where TSourceKey : notnull
+ where TDestinationKey : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+ keySelector.ThrowArgumentNullExceptionIfNull(nameof(keySelector));
+
+ return source.Select(
+ updates =>
+ {
+ var changed = updates.Select(u => new Change(u.Reason, keySelector(u.Current), u.Current, u.Previous));
+ return new ChangeSet(changed);
+ });
+ }
+
+ ///
+ ///
+ /// This overload also provides the source key to ,
+ /// allowing the destination key to be derived from both the item and its original key.
+ ///
+ public static IObservable> ChangeKey(this IObservable> source, Func keySelector)
+ where TObject : notnull
+ where TSourceKey : notnull
+ where TDestinationKey : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+ keySelector.ThrowArgumentNullExceptionIfNull(nameof(keySelector));
+
+ return source.Select(
+ updates =>
+ {
+ var changed = updates.Select(u => new Change(u.Reason, keySelector(u.Key, u.Current), u.Current, u.Previous));
+ return new ChangeSet(changed);
+ });
+ }
+}
diff --git a/src/DynamicData/Cache/ObservableCacheEx.Clear.cs b/src/DynamicData/Cache/ObservableCacheEx.Clear.cs
new file mode 100644
index 00000000..d29301dc
--- /dev/null
+++ b/src/DynamicData/Cache/ObservableCacheEx.Clear.cs
@@ -0,0 +1,71 @@
+// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved.
+// Roland Pheasant licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for full license information.
+
+using System.Collections.ObjectModel;
+using System.Collections.Specialized;
+using System.ComponentModel;
+using System.Diagnostics.CodeAnalysis;
+using System.Linq.Expressions;
+using System.Reactive;
+using System.Reactive.Concurrency;
+using System.Reactive.Disposables;
+using System.Reactive.Linq;
+using System.Runtime.CompilerServices;
+using DynamicData.Binding;
+using DynamicData.Cache;
+using DynamicData.Cache.Internal;
+
+// ReSharper disable once CheckNamespace
+
+namespace DynamicData;
+
+///
+/// Extensions for dynamic data.
+///
+public static partial class ObservableCacheEx
+{
+ ///
+ /// Removes all items from the cache, producing a changeset with a Remove for every item.
+ ///
+ /// The type of the object.
+ /// The type of the key.
+ /// The to clear.
+ ///
+ ///
+ /// EventBehavior
+ /// - AddNot produced by this operation.
+ /// - UpdateNot produced by this operation.
+ /// - RemoveA Remove is emitted for every item currently in the cache.
+ /// - RefreshNot produced by this operation.
+ ///
+ ///
+ /// is .
+ public static void Clear(this ISourceCache source)
+ where TObject : notnull
+ where TKey : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+
+ source.Edit(updater => updater.Clear());
+ }
+
+ ///
+ public static void Clear(this IIntermediateCache source)
+ where TObject : notnull
+ where TKey : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+
+ source.Edit(updater => updater.Clear());
+ }
+
+ ///
+ public static void Clear(this LockFreeObservableCache source)
+ where TObject : notnull
+ where TKey : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+ source.Edit(updater => updater.Clear());
+ }
+}
diff --git a/src/DynamicData/Cache/ObservableCacheEx.Clone.cs b/src/DynamicData/Cache/ObservableCacheEx.Clone.cs
new file mode 100644
index 00000000..1f635fc5
--- /dev/null
+++ b/src/DynamicData/Cache/ObservableCacheEx.Clone.cs
@@ -0,0 +1,82 @@
+// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved.
+// Roland Pheasant licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for full license information.
+
+using System.Collections.ObjectModel;
+using System.Collections.Specialized;
+using System.ComponentModel;
+using System.Diagnostics.CodeAnalysis;
+using System.Linq.Expressions;
+using System.Reactive;
+using System.Reactive.Concurrency;
+using System.Reactive.Disposables;
+using System.Reactive.Linq;
+using System.Runtime.CompilerServices;
+using DynamicData.Binding;
+using DynamicData.Cache;
+using DynamicData.Cache.Internal;
+
+// ReSharper disable once CheckNamespace
+
+namespace DynamicData;
+
+///
+/// Extensions for dynamic data.
+///
+public static partial class ObservableCacheEx
+{
+ ///
+ /// Applies each change from the source changeset to the specified collection as a side effect.
+ /// The changeset is forwarded downstream unchanged.
+ ///
+ /// The type of the object.
+ /// The type of the key.
+ /// The source to clone.
+ /// The target collection to which changes are applied.
+ /// An observable that forwards all changesets from unchanged.
+ ///
+ ///
+ /// EventBehavior
+ /// - AddThe item is added to . Forwarded as Add.
+ /// - UpdateThe previous item is removed from and the current item is added. Forwarded as Update.
+ /// - RemoveThe item is removed from . Forwarded as Remove.
+ /// - RefreshIgnored ( has no concept of refresh). Forwarded as Refresh.
+ ///
+ ///
+ public static IObservable> Clone(this IObservable> source, ICollection target)
+ where TObject : notnull
+ where TKey : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+ target.ThrowArgumentNullExceptionIfNull(nameof(target));
+
+ return source.Do(
+ changes =>
+ {
+ foreach (var item in changes.ToConcreteType())
+ {
+ switch (item.Reason)
+ {
+ case ChangeReason.Add:
+ {
+ target.Add(item.Current);
+ }
+
+ break;
+
+ case ChangeReason.Update:
+ {
+ target.Remove(item.Previous.Value);
+ target.Add(item.Current);
+ }
+
+ break;
+
+ case ChangeReason.Remove:
+ target.Remove(item.Current);
+ break;
+ }
+ }
+ });
+ }
+}
diff --git a/src/DynamicData/Cache/ObservableCacheEx.Convert.cs b/src/DynamicData/Cache/ObservableCacheEx.Convert.cs
new file mode 100644
index 00000000..bf6d26c1
--- /dev/null
+++ b/src/DynamicData/Cache/ObservableCacheEx.Convert.cs
@@ -0,0 +1,53 @@
+// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved.
+// Roland Pheasant licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for full license information.
+
+using System.Collections.ObjectModel;
+using System.Collections.Specialized;
+using System.ComponentModel;
+using System.Diagnostics.CodeAnalysis;
+using System.Linq.Expressions;
+using System.Reactive;
+using System.Reactive.Concurrency;
+using System.Reactive.Disposables;
+using System.Reactive.Linq;
+using System.Runtime.CompilerServices;
+using DynamicData.Binding;
+using DynamicData.Cache;
+using DynamicData.Cache.Internal;
+
+// ReSharper disable once CheckNamespace
+
+namespace DynamicData;
+
+///
+/// Extensions for dynamic data.
+///
+public static partial class ObservableCacheEx
+{
+ ///
+ /// Obsolete: use instead.
+ ///
+ /// The type of the object.
+ /// The type of the key.
+ /// The type of the destination.
+ /// The source to convert.
+ /// The conversion factory.
+ /// An observable which emits change sets.
+ [Obsolete("This was an experiment that did not work. Use Transform instead")]
+ public static IObservable> Convert(this IObservable> source, Func conversionFactory)
+ where TObject : notnull
+ where TKey : notnull
+ where TDestination : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+ conversionFactory.ThrowArgumentNullExceptionIfNull(nameof(conversionFactory));
+
+ return source.Select(
+ changes =>
+ {
+ var transformed = changes.Select(change => new Change(change.Reason, change.Key, conversionFactory(change.Current), change.Previous.Convert(conversionFactory), change.CurrentIndex, change.PreviousIndex));
+ return new ChangeSet(transformed);
+ });
+ }
+}
diff --git a/src/DynamicData/Cache/ObservableCacheEx.DeferUntilLoaded.cs b/src/DynamicData/Cache/ObservableCacheEx.DeferUntilLoaded.cs
new file mode 100644
index 00000000..89cebf0d
--- /dev/null
+++ b/src/DynamicData/Cache/ObservableCacheEx.DeferUntilLoaded.cs
@@ -0,0 +1,58 @@
+// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved.
+// Roland Pheasant licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for full license information.
+
+using System.Collections.ObjectModel;
+using System.Collections.Specialized;
+using System.ComponentModel;
+using System.Diagnostics.CodeAnalysis;
+using System.Linq.Expressions;
+using System.Reactive;
+using System.Reactive.Concurrency;
+using System.Reactive.Disposables;
+using System.Reactive.Linq;
+using System.Runtime.CompilerServices;
+using DynamicData.Binding;
+using DynamicData.Cache;
+using DynamicData.Cache.Internal;
+
+// ReSharper disable once CheckNamespace
+
+namespace DynamicData;
+
+///
+/// Extensions for dynamic data.
+///
+public static partial class ObservableCacheEx
+{
+ ///
+ /// Suppresses all emissions until the first non-empty changeset arrives, then replays that changeset and all subsequent ones.
+ /// If the source never produces a non-empty changeset, the stream waits indefinitely.
+ ///
+ /// The type of the object.
+ /// The type of the key.
+ /// The source to defer until the first changeset arrives.
+ /// An observable that begins emitting changesets once the first non-empty changeset is received.
+ ///
+ /// Worth noting: Blocks indefinitely if the cache or stream never receives any data. Ensure the source will eventually emit at least one changeset.
+ ///
+ ///
+ public static IObservable> DeferUntilLoaded(this IObservable> source)
+ where TObject : notnull
+ where TKey : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+
+ return new DeferUntilLoaded(source).Run();
+ }
+
+ ///
+ public static IObservable> DeferUntilLoaded(this IObservableCache source)
+ where TObject : notnull
+ where TKey : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+
+ return new DeferUntilLoaded(source).Run();
+ }
+}
diff --git a/src/DynamicData/Cache/ObservableCacheEx.DisposeMany.cs b/src/DynamicData/Cache/ObservableCacheEx.DisposeMany.cs
new file mode 100644
index 00000000..52ad5127
--- /dev/null
+++ b/src/DynamicData/Cache/ObservableCacheEx.DisposeMany.cs
@@ -0,0 +1,71 @@
+// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved.
+// Roland Pheasant licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for full license information.
+
+using System.Collections.ObjectModel;
+using System.Collections.Specialized;
+using System.ComponentModel;
+using System.Diagnostics.CodeAnalysis;
+using System.Linq.Expressions;
+using System.Reactive;
+using System.Reactive.Concurrency;
+using System.Reactive.Disposables;
+using System.Reactive.Linq;
+using System.Runtime.CompilerServices;
+using DynamicData.Binding;
+using DynamicData.Cache;
+using DynamicData.Cache.Internal;
+
+// ReSharper disable once CheckNamespace
+
+namespace DynamicData;
+
+///
+/// Extensions for dynamic data.
+///
+public static partial class ObservableCacheEx
+{
+ ///
+ ///
+ /// Disposes items implementing when they are removed or replaced,
+ /// and disposes all tracked items when the stream completes, errors, or the subscription is disposed.
+ ///
+ ///
+ /// Individual items are disposed after the changeset has been forwarded downstream, so downstream operators
+ /// see the removal before disposal occurs. Items that do not implement are ignored.
+ ///
+ ///
+ /// The type of the object.
+ /// The type of the key.
+ /// The source to track for disposal on removal.
+ /// A stream that forwards all changesets from unchanged.
+ ///
+ ///
+ /// Change reason handling:
+ ///
+ /// EventBehavior
+ /// - AddTracks the item. No disposal.
+ /// - UpdateDisposes the previous value (if it differs by reference from the current). Tracks the new value.
+ /// - RemoveDisposes the removed item.
+ /// - RefreshPassed through. No disposal.
+ ///
+ ///
+ ///
+ /// On stream completion, error, or subscription disposal, all remaining tracked items are disposed.
+ /// All disposal is synchronous via .
+ /// For items that implement , use instead.
+ ///
+ ///
+ /// is .
+ ///
+ ///
+ ///
+ public static IObservable> DisposeMany(this IObservable> source)
+ where TObject : notnull
+ where TKey : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+
+ return new DisposeMany(source).Run();
+ }
+}
diff --git a/src/DynamicData/Cache/ObservableCacheEx.DistinctValues.cs b/src/DynamicData/Cache/ObservableCacheEx.DistinctValues.cs
new file mode 100644
index 00000000..6007f055
--- /dev/null
+++ b/src/DynamicData/Cache/ObservableCacheEx.DistinctValues.cs
@@ -0,0 +1,53 @@
+// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved.
+// Roland Pheasant licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for full license information.
+
+using System.Collections.ObjectModel;
+using System.Collections.Specialized;
+using System.ComponentModel;
+using System.Diagnostics.CodeAnalysis;
+using System.Linq.Expressions;
+using System.Reactive;
+using System.Reactive.Concurrency;
+using System.Reactive.Disposables;
+using System.Reactive.Linq;
+using System.Runtime.CompilerServices;
+using DynamicData.Binding;
+using DynamicData.Cache;
+using DynamicData.Cache.Internal;
+
+// ReSharper disable once CheckNamespace
+
+namespace DynamicData;
+
+///
+/// Extensions for dynamic data.
+///
+public static partial class ObservableCacheEx
+{
+ ///
+ /// Selects distinct values from the source.
+ ///
+ /// The type object from which the distinct values are selected.
+ /// The type of the key.
+ /// The type of the value.
+ /// The source to extract distinct values.
+ /// The value selector.
+ /// An observable which will emit distinct change sets.
+ ///
+ /// Due to it's nature only adds or removes can be returned.
+ /// Worth noting: Reference counting assumes value equality is transitive. Mutable value objects with inconsistent Equals implementations can corrupt ref counts.
+ ///
+ /// source.
+ ///
+ public static IObservable> DistinctValues(this IObservable> source, Func valueSelector)
+ where TObject : notnull
+ where TKey : notnull
+ where TValue : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+ valueSelector.ThrowArgumentNullExceptionIfNull(nameof(valueSelector));
+
+ return Observable.Create>(observer => new DistinctCalculator(source, valueSelector).Run().SubscribeSafe(observer));
+ }
+}
diff --git a/src/DynamicData/Cache/ObservableCacheEx.EditDiff.cs b/src/DynamicData/Cache/ObservableCacheEx.EditDiff.cs
new file mode 100644
index 00000000..86a68c29
--- /dev/null
+++ b/src/DynamicData/Cache/ObservableCacheEx.EditDiff.cs
@@ -0,0 +1,139 @@
+// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved.
+// Roland Pheasant licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for full license information.
+
+using System.Collections.ObjectModel;
+using System.Collections.Specialized;
+using System.ComponentModel;
+using System.Diagnostics.CodeAnalysis;
+using System.Linq.Expressions;
+using System.Reactive;
+using System.Reactive.Concurrency;
+using System.Reactive.Disposables;
+using System.Reactive.Linq;
+using System.Runtime.CompilerServices;
+using DynamicData.Binding;
+using DynamicData.Cache;
+using DynamicData.Cache.Internal;
+
+// ReSharper disable once CheckNamespace
+
+namespace DynamicData;
+
+///
+/// Extensions for dynamic data.
+///
+public static partial class ObservableCacheEx
+{
+ ///
+ /// The to diff and update.
+ /// The representing the complete desired state to diff against the cache.
+ /// An used to determine whether a new item is the same as an existing cached item.
+ ///
+ /// This overload uses an instead of a delegate
+ /// to determine item equality.
+ ///
+ public static void EditDiff(this ISourceCache source, IEnumerable allItems, IEqualityComparer equalityComparer)
+ where TObject : notnull
+ where TKey : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+ allItems.ThrowArgumentNullExceptionIfNull(nameof(allItems));
+ equalityComparer.ThrowArgumentNullExceptionIfNull(nameof(equalityComparer));
+
+ source.EditDiff(allItems, equalityComparer.Equals);
+ }
+
+ ///
+ /// Diffs a complete snapshot of items against the current cache contents, producing the minimal set of
+ /// Add, Update, and Remove changes needed to bring the cache in sync with the snapshot.
+ ///
+ /// The type of the object.
+ /// The type of the key.
+ /// The to diff and update.
+ /// The representing the complete desired state.
+ /// The that returns when the current and previous items are considered equal, e.g. (current, previous) => current.Version == previous.Version.
+ ///
+ ///
+ /// EventBehavior
+ /// - AddItems in whose key is not in the cache produce an Add.
+ /// - UpdateItems present in both and the cache that differ (per ) produce an Update.
+ /// - RemoveItems in the cache whose key is not in produce a Remove.
+ /// - RefreshNot produced by this operation.
+ ///
+ ///
+ /// , , or is .
+ public static void EditDiff(this ISourceCache source, IEnumerable allItems, Func areItemsEqual)
+ where TObject : notnull
+ where TKey : notnull
+ {
+ source.ThrowArgumentNullExceptionIfNull(nameof(source));
+ allItems.ThrowArgumentNullExceptionIfNull(nameof(allItems));
+ areItemsEqual.ThrowArgumentNullExceptionIfNull(nameof(areItemsEqual));
+
+ var editDiff = new EditDiff(source, areItemsEqual);
+ editDiff.Edit(allItems);
+ }
+
+ ///
+ /// Converts an of into a changeset stream by diffing each
+ /// emission against the previous one. Each emission replaces the entire dataset.
+ /// Counterpart to .
+ ///
+ /// The type of the object.
+ /// The type of the key.
+ /// The source to convert into a keyed changeset stream.
+ /// The that extracts the unique key from each item.
+ /// An optional for comparing items. Uses default equality if .
+ /// An observable changeset representing the incremental differences between successive snapshots.
+ ///
+ ///
+ ///