Skip to content

Custom Observable ChangeSets

Darrin W. Cullop edited this page Jan 3, 2024 · 1 revision

Introduction

The observable changesets from DynamicData can be created from a variety of situations thanks to the ObservableChangeSet.Create method. This method allows almost anything to be adapted to the ChangeSet model. This article will show how this can be accomplished.

Anything that has "Add" "Update" "Remove" events might be a good candidate for adapting to the ChangeSet model. This model provides many benefits including the ability to allow initial values and future values to be treated uniformly through a single code path instead of having to handle them separately.

How To Use

ObservableChangeSet.Create is very similar to Observable.Create in that it takes a function that is run when a new subscription is created that returns an IDisposable that manages the subscription. However, unlike Observable.Create, instead of receiving the IObserver<T>, the function receives an implicit SourceCache<T, TKey>. The function merely needs to set up whatever is needed to add/remove/update items from this cache, and the subscriber will receive all of those changes via the resulting observable changeset.

The provided cache is implicitly thread-safe so schedulers or other asynchronous methods can be used without the need for other synchronization.

Examples

File System as an Observable ChangeSet

DotNet provides the FileSystemWatcher that enables monitoring the file system for changes by firing events for adds, removes, etc. However, each of those events must be handled separately. Furthermore, it doesn't do anything for existing files so those must be dealt with explicitly.

However, one can combine DynamicData and FileSystemWatcher to create an Observable ChangeSet for the File System. Like other Observable ChangeSets, a new subscription receives an initial changeset with Add events for all of the initial values, and this allows current files and new files to be handled through a single code path.

Sample Code

The following static class exposes two methods:

  1. An extension method for FileSystemWatcher called AsObservableChangeSet for use with code that already has an instance of FileSystemWatcher
  2. A static helper method that completely hides the fact a FileSystemWatcher is used at all

Both methods return an IObservable<IChangeSet<FileSystemInfo, string>> where the Key for the ChangeSet is the full file path. FileSystemInfo is the base class for both DirectoryInfo and FileInfo and each value in the cache will actually be one of those concrete types.

public static class FileSystemObservable
{
    // Extension method to convert a FileSystemWatcher to an Observable ChangeSet
    public static IObservable<IChangeSet<FileSystemInfo, string>> AsObservableChangeSet(this FileSystemWatcher fsw) =>
        ObservableChangeSet.Create<FileSystemInfo, string>(cache =>
        {
            // Create an observable from the Changed event that updates the cache
            var observeChanged = Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>(
                                            o => fsw.Changed += o,
                                            o => fsw.Changed -= o)
                                    .Select(args => args.EventArgs)
                                    .Do(args => cache.AddOrUpdate(GetInfo(args.FullPath)));

            // Create an observable from the Created event that adds the file to the cache
            var observeCreated = Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>(
                                            o => fsw.Created += o,
                                            o => fsw.Created -= o)
                                    .Select(args => args.EventArgs)
                                    .Do(args => cache.AddOrUpdate(GetInfo(args.FullPath)));

            // Create an observable from the Deleted event that removes the file from the cache
            var observeDeleted = Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>(
                                            o => fsw.Deleted += o,
                                            o => fsw.Deleted -= o)
                                    .Select(args => args.EventArgs)
                                    .Do(args => cache.RemoveKey(args.FullPath));

            // Create an observable from the Renamed event that removes the old name and adds the new name
            // Use Edit so both changes are emitted together as a single ChangeSet
            var observeRenamed = Observable.FromEventPattern<RenamedEventHandler, RenamedEventArgs>(
                                            o => fsw.Renamed += o,
                                            o => fsw.Renamed -= o)
                                    .Select(args => args.EventArgs)
                                    .Do(args => cache.Edit(updater =>
                                    {
                                        updater.RemoveKey(args.OldFullPath);
                                        updater.AddOrUpdate(GetInfo(args.FullPath));
                                    }));
            
            // Subscribe to all the observables together
            var subscription = Observable.Merge(observeChanged, observeCreated, observeDeleted, observeRenamed).Subscribe();
            
            // Ensure the events are flowing
            fsw.EnableRaisingEvents = true;

            // Add the initial files
            var initialFiles = Directory.EnumerateFileSystemEntries(fsw.Path, fsw.Filter, fsw.IncludeSubdirectories ? SearchOption.AllDirectories : SearchOption.TopDirectoryOnly)
                                    .Select(p => GetInfo(p));									
            cache.AddOrUpdate(initialFiles);
            
            // Return the Disposable that controls the subscription
            return subscription;
        },
        fsi => fsi.FullName);

    // Helper function that manages the FileSystemWatcher instance automatically
    public static IObservable<IChangeSet<FileSystemInfo, string>> Create(string path, string filter, bool includeSubDirectories) =>
        Observable.Using(
            () => new FileSystemWatcher(path, filter){ IncludeSubdirectories = includeSubDirectories },
            fsw => fsw.AsObservableChangeSet());

    private static FileSystemInfo GetInfo(string fullPath) =>
        Directory.Exists(fullPath) ? new DirectoryInfo(fullPath) : new FileInfo(fullPath);
}

Explanation

The above code works but creating a separate observable for each of the 4 events from FileSystemWatcher needed to keep the cache up-to-date. It adds a handler to each one that applies the appropriate changes to the implicit cache that was provided to the function and then creates a single subscription that combines all 4 together.

All though the events may fire on multiple different threads, the cache synchronizes the changes implicitly.

It then pre-populates the cache by enumerating all the file system entries and adding them so that the initial changeset emitted by the observable contains all of the initial files.

The result is a single changeset that allows consumers to uniformly deal with current files and future changes.

Example Usage

Here is a test fixture to show all the events and existing files are handled:

void Main()
{
    var tempDir = Directory.CreateTempSubdirectory();

    File.WriteAllText(Path.Join(tempDir.FullName, "ExistingFile.txt"), "Show Existing Files Show Up");
    
    using var sub = FileSystemObservable.Create(tempDir.FullName, string.Empty, includeSubDirectories: true)
                        .OnItemAdded(fso => Console.WriteLine($"Added: {fso} [{fso.GetType()}]"))
                        .OnItemRemoved(fso => Console.WriteLine($"Removed: {fso} [{fso.GetType()}]"))
                        .OnItemUpdated((fso, _) => Console.WriteLine($"Updated: {fso} [{fso.GetType()}]"))
                        .Subscribe();

    try
    {
        var subDir = tempDir.CreateSubdirectory(UniqueStr());

        var files = Enumerable.Range(0, 10).Select(_ => Path.Join(subDir.FullName, UniqueStr() + ".temp")).ToList();
        files.ForEach(path => File.WriteAllText(path, "Hello World"));
        files.ForEach(path => File.AppendAllText(path, "Hello Again"));
        files.ForEach(path => File.Move(path, path + ".temp2"));
    }
    finally
    {
        tempDir.Delete(recursive: true);
    }
    
    Thread.Yield();
    
    static string UniqueStr() => $"{Guid.NewGuid():N}";
}