TransformAsync enhancements #819

Merged 12 commits on Jan 22, 2024

Collaborator

RolandPheasant commented Jan 4, 2024

  1. Improve implementation of TransformAsync
  2. Add max concurrency param to prevent unbounded parallelisation.
  3. Add transform on refresh overloads
  4. Improve slow running concurrency test
  5. Use TransformAsyncOptions for primitive objects - also include for Safe variants.

TODO:

  1. Deal with large number of overloads required to include the extra optional params
  2. Maybe scrap 1 and use a TransformOptions object (similar to BindingOptions) and move away from the bewildering number of overloads.

@dwcullop this is what I meant regarding using pure rx to handle the async.

Func<TSource, Optional<TSource>, TKey, Task<TDestination>> transformFactory,
Action<Error<TSource, TKey>>? exceptionCallback,
IObservable<Func<TSource, TKey, bool>>? forceTransform = null,
int maximumConcurrency = 4,

Collaborator

The introduction of this parameter is the main performance improvement here, yeah?

I would have the default be null, for a couple of reasons:

  1. You're really just passing this parameter directly to .Merge() and .Merge() doesn't enforce a default, you either supply a limit, or there isn't one.
  2. 4 is the reasonable default that .NET usually uses for CPU parallelism, but this isn't about limiting CPU parallelism. The practical scenario that this supports is about I/O concurrency, e.g. database queries.

Otherwise, this functionally looks good to me.

Member

Yeah, I had no idea there was an overload of Merge that took a max concurrency parameter. But having looked it up, I agree it should be null, and if it is, then invoke the overload that doesn't use one.
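
A minimal sketch of the dispatch suggested above, assuming the System.Reactive package is referenced. `MergeWithOptionalLimit` is a hypothetical helper name used only for illustration, not the code in this PR: a null limit falls through to the `Merge` overload that takes no concurrency argument.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class MergeSketch
{
    // Hypothetical helper: null means "no limit", mirroring the fact that
    // Merge has no default of its own and either takes a limit or does not.
    public static IObservable<T> MergeWithOptionalLimit<T>(
        this IEnumerable<IObservable<T>> sources,
        int? maximumConcurrency = null)
    {
        return maximumConcurrency is int limit
            ? sources.Merge(limit)   // at most 'limit' inner subscriptions at once
            : sources.Merge();       // subscribe to every inner observable
    }
}
```

Callers who never pass a limit keep the unbounded behaviour, and the limited overload is only reached when a value is supplied.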

Collaborator Author

I only found out about the overload to limit concurrency recently myself. Rx is the gift that keeps giving.

You both make fair points, and the same also occurred to me. So it will be null.

What do you two think about point 2? Complex overloads are a cognitive pain which optional params tried to resolve. That's why I'm thinking about moving towards options objects for primitive optional values.

Collaborator

JakenVeina commented Jan 6, 2024

Options objects wouldn't be TOO crazy, but I'm not sure where the line is for how many parameters it takes to merit a full object. Is there any wisdom we can glean from Microsoft's usages of that pattern? If we did do this, we would definitely want to implement Options objects as readonly record struct I think.

In my personal opinion, I don't consider optional parameters as being cognitive pain at all, because when I invoke almost any method where the parameter purpose isn't self-evident at a glance, I do...

var result = SomeMethod(
    parameter1: value1,
    parameter2: value2,
    parameter3: value3);

This plays seamlessly with optional parameters, cause all I have to do is omit the ones I don't want.

This is also pretty much functionally and semantically identical to using an options struct.

var result = SomeMethod(new()
{
    Option1 = value1,
    Option2 = value2,
    Option3 = value3
});

Both of these offer essentially the exact same cost for the method call, as all the parameters/options (used or not) are allocated on the stack. The difference really is just syntax, and how much code we have to write in the library.

If the question is whether to use overloads or optional parameters to provide variable functionality, my vote is definitely for optional parameters, except for methods that already exist, since adding new optional parameters is a breaking change.

Member

record struct is fine, but not all versions of C# support it. Either way, make sure to pass it using in so that it isn't copied on the stack.

An options object is not a bad idea because it enables changes to be made without breaking things (adding new options, changing defaults) but we should also have overloads that allow a few commonly used parameters to be set without having to create the object.
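
One way the two suggestions above might fit together, as a rough sketch only. `TransformOptionsSketch` and its members are hypothetical names, not the options type added by this PR; a `readonly record struct` needs C# 10 or later.

```csharp
using System;

// Hypothetical options type; the name and members are illustrative only.
public readonly record struct TransformOptionsSketch
{
    public int? MaximumConcurrency { get; init; }   // null means no limit
    public bool TransformOnRefresh { get; init; }
}

public static class OptionsSketch
{
    // 'in' passes the struct by readonly reference instead of copying it.
    public static string Describe(in TransformOptionsSketch options)
    {
        string limit = options.MaximumConcurrency?.ToString() ?? "unbounded";
        return $"limit={limit}, refresh={options.TransformOnRefresh}";
    }

    // Convenience overload for a commonly used setting, so callers
    // do not have to build the options object themselves.
    public static string Describe(int maximumConcurrency)
        => Describe(new TransformOptionsSketch { MaximumConcurrency = maximumConcurrency });
}
```

Adding a new property to the struct later would not change either method signature, which is the non-breaking property mentioned above.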

Collaborator

> it enables changes to be made without breaking things

Oooh, an excellent point.

Member

dwcullop commented Jan 6, 2024

> @dwcullop this is what I meant regarding using pure rx to handle the async.

Yeah, this is good stuff. My work on TransformManyAsync does fall back onto TransformAsync, so this will improve that as well. I'm going to try and update it to follow the Rx-ness you've demonstrated here.

return ProcessUpdates(cache, transformed);
private IObservable<IChangeSet<TDestination, TKey>> DoTransform(
    ChangeAwareCache<TransformedItemContainer, TKey> cache, IChangeSet<TSource, TKey> changes)
{

Member

So, the point of allowing concurrency to be specified is for cases like if the transformation needs to do something like hit a database and only a certain number of DB connections are allowed at once? I guess that makes sense, but I really don't think it's DD's job to do that.
The same thing could be accomplished by having a Semaphore in the Transform function. So, maybe you don't need this new parameter at all.

Merge also has this overload (I also recently discovered):
IObservable<TSource> Merge<TSource>(IObservable<Task<TSource>> sources)

Which seems to be exactly what you're trying to achieve with the Defer / ToObservable combination. Unfortunately, I don't see a version of that which works with tasks AND has a concurrency limit, but I'm not really sure you need to use that.
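
For comparison, a minimal sketch of the semaphore alternative mentioned above, with the limit living inside the caller's transform function rather than in the operator. All names here are hypothetical, and the `Task.Delay` stands in for real I/O such as a database query.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class SemaphoreSketch
{
    // At most three callers may be inside the guarded section at once.
    private static readonly SemaphoreSlim Gate = new SemaphoreSlim(3, 3);
    private static int inFlight;
    private static int peak;

    public static int Peak => Volatile.Read(ref peak);

    // Hypothetical transform: stands in for a database call or similar I/O.
    public static async Task<string> TransformAsync(int source)
    {
        await Gate.WaitAsync();
        try
        {
            RecordPeak(Interlocked.Increment(ref inFlight));
            await Task.Delay(20);
            return $"item-{source}";
        }
        finally
        {
            Interlocked.Decrement(ref inFlight);
            Gate.Release();
        }
    }

    // Lock-free "store the maximum seen so far".
    private static void RecordPeak(int current)
    {
        int seen;
        while (current > (seen = Volatile.Read(ref peak))
               && Interlocked.CompareExchange(ref peak, current, seen) != seen)
        {
        }
    }

    // Starts every transform at once; the semaphore does the limiting.
    public static Task<string[]> RunAllAsync(int count)
        => Task.WhenAll(Enumerable.Range(0, count).Select(TransformAsync));
}
```

The trade-off is that every transform is still started eagerly and then waits at the gate, whereas a limit on `Merge` delays the subscription itself.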

Collaborator

JakenVeina commented Jan 6, 2024

I rather agree about this not being DD's responsibility, but from the perspective of "we're calling .Merge() so, let's expose the optimization parameter that .Merge() has", I think it makes sense.

Member

> I rather agree about this not being DD's responsibility, but from the perspective of "we're calling .Merge() so, let's expose the optimization parameter that .Merge() has", I think it makes sense.

I guess if the logic is in the middle of the operator, there aren't many good options for allowing consumers to customize it.

private IObservable<IChangeSet<TDestination, TKey>> DoTransform(
    ChangeAwareCache<TransformedItemContainer, TKey> cache, IChangeSet<TSource, TKey> changes)
{
    return changes.Select(change => Observable.Defer(() => Transform(change).ToObservable()))

Member

This threw me off at first... Using ToArray with an Observable. But if you're starting with an array, I guess it makes sense. Never seen that approach before so I wanted to call it out.

Collaborator

Good eye on this.

To me, this seems like a big indicator that something nonsensical is being done here. It seems like the only point of converting each change to its own observable is to be able to .Merge() them together, with the maxConcurrent parameter, yes? But that accomplishes nothing if we then do .ToArray() on the whole thing. That's gonna block the whole thread, is it not?

Member

No, it won't block the thread. ToArray returns an IObservable<T[]> that fires once (when the input sequence has completed). There's no need to block, you just don't get events downstream until all the merged streams have completed.

I think it does make sense. It's just a new technique to me.
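
A small sketch of that behaviour, assuming System.Reactive is referenced. The `ToArraySketch` class is hypothetical, and a `Subject<int>` stands in for the merged stream of transform results.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class ToArraySketch
{
    public static List<string> Run()
    {
        var log = new List<string>();
        using var source = new Subject<int>();

        // Subscribing returns immediately; nothing is blocked.
        using var subscription = source.ToArray()
            .Subscribe(items => log.Add($"OnNext: [{string.Join(", ", items)}]"));

        log.Add("subscribed");
        source.OnNext(1);
        source.OnNext(2);
        log.Add("source not completed yet");
        source.OnCompleted();   // only now does ToArray emit its single array
        return log;
    }
}
```

The returned log contains "subscribed", then "source not completed yet", then "OnNext: [1, 2]", so the single array arrives only after completion and the calling thread is never parked.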

var toTransform = cache.KeyValues.Where(kvp => shouldTransform(kvp.Value.Source, kvp.Key)).Select(kvp =>
    new Change<TSource, TKey>(ChangeReason.Update, kvp.Key, kvp.Value.Source, kvp.Value.Source)).ToArray();

return toTransform.Select(change => Observable.Defer(() => Transform(change).ToObservable()))

Member

I think Observable.FromAsync(() => Transform(change)) gives the same semantics as Defer plus ToObservable and more clearly expresses your intent.

Member

So, you could do:

return toTransform.ToObservable()
        .Select(change => Transform(change))
        .Merge()
        .ToArray()
        .Select(transformed => ProcessUpdates(cache, transformed));

Or, if you want to provide the maximum concurrency parameter:

return toTransform.Select(change => Observable.FromAsync(() => Transform(change)))
        .Merge(maximumConcurrency)
        .ToArray()
        .Select(transformed => ProcessUpdates(cache, transformed));

Collaborator

Agreed, I believe Defer() is redundant here.

Collaborator Author

Defer is necessary, otherwise the task is invoked immediately, which would result in the merge limiter having no effect as it would be the results being limited not the actual tasks.

Collaborator

JakenVeina commented Jan 7, 2024

No, Observable.FromAsync() already works that way. The async function returning Task (I.E. Transform) is not invoked until the observable is subscribed to. Which is the thing that maxConcurrent controls.

var whenCompletedSource = new TaskCompletionSource<int>();

var whenCompleted = Observable.FromAsync(() =>
{
    Console.WriteLine("Observable.FromAsync()");
    return whenCompletedSource.Task;
});

Console.WriteLine("whenCompleted.Subscribe()");
whenCompleted.Subscribe(x => Console.WriteLine($"whenCompleted.OnNext({x})"));

Console.WriteLine("whenCompletedSource.SetResult(7)");
whenCompletedSource.SetResult(7);

Thread.Sleep(TimeSpan.FromSeconds(1));

This produces the following output:

whenCompleted.Subscribe()
Observable.FromAsync()
whenCompletedSource.SetResult(7)
whenCompleted.OnNext(7)

Collaborator Author

Your example is for a single invocation. What I'd be interested to see is an attempt to limit, say, 1000 tasks in such a way that only 4 of them run concurrently.

Member

dwcullop commented Jan 8, 2024

The function passed to Observable.FromAsync gets run once per subscription but not until the subscription happens (like Defer) and Merge(x) does not subscribe to the X+1-th item until one of the previous streams completes.

You need Defer if you invoke a method to get a task and then convert to an observable, but I think Defer plus ToObservable is semantically the same as Observable.FromAsync, so coder's choice, but I do feel like Observable.FromAsync is clearer.

Code

I used this test... It's only 20, not 1000, but I don't think that changes anything...

async Task Main()
{
    // These two seem to be the same
    var obsAsync = Observable.FromAsync(AsyncThing);
    // var obsAsync = Observable.Defer(() => AsyncThing().ToObservable());

    var obs = Observable.Range(0, 20).Select(n => obsAsync.Spy($"Async #{n}"));
    
    // Only 3 AsyncThings at a time
    await obs.Merge(3);
    
    // Let them all fly
    //await obs.Merge();
}

static int current = 0;
static int counter = 0;

async Task<int> AsyncThing()
{
    int n = Interlocked.Increment(ref current);
    Console.WriteLine($"Starting AsyncThing #{n} ({Interlocked.Increment(ref counter)})");
    
    try
    {
        await Task.Delay(100);
        return n;
    }
    finally
    {
        Console.WriteLine($"Ending AsyncThing #{n} ({Interlocked.Decrement(ref counter)})");
    }
}

AsyncThing is invoked once per subscription. The Observable is created, but the async method is not invoked until the subscribe.

With Merge(), counter gets up to 20 because they're all running at once. With Merge(3), counter never gets bigger than 3.

Collaborator Author

Thanks for looking into this. I'll add the overload and some testing soon.

Collaborator Author

RolandPheasant commented Jan 15, 2024

@JakenVeina I can confirm that Observable.FromAsync() works like the Defer, as you suggested. I was not willing to change what I had until I had a test. I get similar run times with your suggested change as previously.

I have a question for you about the test.


public async Task WithMaxConcurrency(int maxConcurrency)
{
/* We need to test whether the max concurrency has any effect.

Collaborator Author

@JakenVeina any ideas how I can test concurrency apart from looking at the timings?
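
One possible approach, sketched under assumptions rather than taken from the merged test: count how many transform calls overlap and assert on the peak instead of on elapsed time. `ConcurrencyProbe` and its members are hypothetical names, and the pipeline below imitates the FromAsync plus Merge(maxConcurrent) shape discussed earlier in this thread.

```csharp
using System;
using System.Linq;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

public sealed class ConcurrencyProbe
{
    private int inFlight;
    private int peak;

    public int Peak => Volatile.Read(ref peak);

    // Hypothetical transform function that records how many calls overlap.
    public async Task<int> TransformAsync(int value)
    {
        RecordPeak(Interlocked.Increment(ref inFlight));
        try
        {
            await Task.Delay(10);
            return value;
        }
        finally
        {
            Interlocked.Decrement(ref inFlight);
        }
    }

    // Lock-free "store the maximum seen so far".
    private void RecordPeak(int current)
    {
        int seen;
        while (current > (seen = Volatile.Read(ref peak))
               && Interlocked.CompareExchange(ref peak, current, seen) != seen)
        {
        }
    }

    // Runs itemCount transforms through Merge(maxConcurrency) and reports the peak overlap.
    public static async Task<int> MeasurePeakAsync(int itemCount, int maxConcurrency)
    {
        var probe = new ConcurrencyProbe();
        await Enumerable.Range(0, itemCount)
            .Select(n => Observable.FromAsync(() => probe.TransformAsync(n)))
            .Merge(maxConcurrency)
            .ToArray();
        return probe.Peak;
    }
}
```

A test could then assert that the value returned by `MeasurePeakAsync(20, 3)` is no greater than 3, which does not depend on how fast the machine is.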

RolandPheasant changed the title from "WIP: TransformAsync enhancements" to "TransformAsync enhancements" Jan 16, 2024
RolandPheasant marked this pull request as ready for review January 16, 2024 07:51
glennawatson marked this pull request as draft January 16, 2024 22:38
RolandPheasant marked this pull request as ready for review January 19, 2024 07:28
RolandPheasant merged commit 5148d14 into main Jan 22, 2024
1 check passed
RolandPheasant deleted the feature/transform-async-enhancements branch January 22, 2024 07:24

github-actions bot commented Feb 6, 2024

This pull request has been automatically locked since there has not been any recent activity after it was closed. Please open a new issue for related bugs.

github-actions bot locked as resolved and limited conversation to collaborators Feb 6, 2024