
Job executor: Unmarshal job args late, after middlewares have run #783

Merged

brandur merged 1 commit into master from brandur-late-unmarshal-json on Feb 27, 2025
Conversation

Contributor

@brandur brandur commented Feb 21, 2025

Here, modify the job executor so that instead of exclusively invoking a
work unit's `UnmarshalJSON` implementation before starting to execute
the stack of work middleware, invoke it much later from within the
executor's `doInner` implementation. This gives middlewares the
opportunity to modify args before unmarshaling, thereby enabling use
cases like args encryption.

This does have the downside of having to modify the worker interface's
`Middleware` function from taking a generic job:

```go
type Worker[T JobArgs] interface {
    Middleware(job *Job[T]) []rivertype.WorkerMiddleware
```

To taking a `JobRow` instead:

```go
type Worker[T JobArgs] interface {
    Middleware(job *rivertype.JobRow) []rivertype.WorkerMiddleware
```

Because the full generic `Job[T]` is not yet available by the time we
need to extract a middleware stack.
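
For concreteness, here's a minimal, self-contained sketch of the new shape. The types below (`JobRow`, `WorkerMiddleware`, `Worker`, `runJob`) are simplified stand-ins for the real river/rivertype definitions, not the actual API:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Simplified stand-ins for the real River types; shapes here are
// assumptions for illustration only.
type JobRow struct {
	EncodedArgs []byte
}

type WorkerMiddleware interface{}

type JobArgs interface{ Kind() string }

type Job[T JobArgs] struct {
	Row  *JobRow
	Args T
}

// New-style worker: Middleware takes the raw *JobRow because the generic
// Job[T] hasn't been unmarshaled yet when the middleware stack is extracted.
type Worker[T JobArgs] interface {
	Middleware(job *JobRow) []WorkerMiddleware
	Work(job *Job[T]) error
}

type SortArgs struct {
	Strings []string `json:"strings"`
}

func (SortArgs) Kind() string { return "sort" }

type SortWorker struct{ lastWorked []string }

func (w *SortWorker) Middleware(job *JobRow) []WorkerMiddleware { return nil }

func (w *SortWorker) Work(job *Job[SortArgs]) error {
	w.lastWorked = job.Args.Strings
	return nil
}

// runJob mimics the executor: middleware is extracted from the bare row up
// front, and args are unmarshaled only later, just before Work runs.
func runJob(worker *SortWorker, row *JobRow) error {
	_ = worker.Middleware(row) // no Job[T] needed here anymore

	var args SortArgs // late unmarshal, after middleware had a chance to run
	if err := json.Unmarshal(row.EncodedArgs, &args); err != nil {
		return err
	}
	return worker.Work(&Job[SortArgs]{Row: row, Args: args})
}

func main() {
	worker := &SortWorker{}
	if err := runJob(worker, &JobRow{EncodedArgs: []byte(`{"strings":["b","a"]}`)}); err != nil {
		panic(err)
	}
	fmt.Println(worker.lastWorked)
}
```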

@brandur brandur changed the title Job executor: Unmarshal job args within middleware stack instead of before it Job executor: Unmarshal job args within middleware stack if changed Feb 21, 2025
@brandur brandur force-pushed the brandur-late-unmarshal-json branch 2 times, most recently from 3193df8 to 9a3668f Compare February 21, 2025 08:43
@brandur brandur requested a review from bgentry February 21, 2025 08:47
Comment thread: internal/jobexecutor/job_executor.go (outdated)
```go
// The middleware stack is allowed an opportunity to modify EncodedArgs.
// If we do detect a modification, unmarshal args again.
if !bytes.Equal(encodedArgs, e.JobRow.EncodedArgs) {
	if err := e.WorkUnit.UnmarshalJob(); err != nil {
		return err
	}
}

return e.WorkUnit.Work(ctx)
```
Contributor

I'm trying to think about the potential downsides of this. Obviously calling unmarshal twice isn't great, though this shouldn't have a high cost. And if we want to continue to allow args-driven timeouts, you'd have to do any pre-processing of args before the job timeout is computed. I think this means that encrypted args would be incompatible with having a job-specific timeout, because you wouldn't have access to the decrypted args when calling .Timeout() prior to the middleware chain 🤔 That's not great either, as it would complicate the matrix of which features can/can't work together.

This feels a bit like an ordering issue, where we have some middleware (like the one you're working on) which we want to run first before unmarshalling, and then subsequent middleware (application of the job timeout, most other middleware) that would run afterward. I don't know if it's a great idea to add something to the middleware interface to indicate when it should run, but it's an idea that came to mind while thinking about this.

Contributor Author

Yeah fair points. I went back over this again, and yeah, this approach seems clearly wrong to me now.

I switched to a new version that I think is much more clearly right — basically, all unmarshaling migrates into doInner, and this tends to be okay because the middleware only have access to the JobRow anyway, so they really don't need the unmarshaled Job so early.

The downside is that the Worker middleware function has to take a JobRow instead of a Job:

```diff
-	Middleware(job *Job[T]) []rivertype.WorkerMiddleware
+	Middleware(job *rivertype.JobRow) []rivertype.WorkerMiddleware
```

TBH though, that seems more consistent with the middleware API anyway.

It is less consistent with the other Worker interface functions though (they tend to take Job[T]). We could either leave it like that, or maybe it'd be better to separate the Middleware function out into its own interface given that it's probably far less used in a general sense anyway.

Want to take a look at my latest commit and LMK what you think?

It might be tempting not to change any of this stuff, but having played around with the encryption side quite a bit, I don't think there's any way to implement it as middleware given the current API. Given that an encryption middleware is about as basic of a middleware use as one can imagine, it seems to me that something's got to be changed here.

Comment on lines +206 to +200
```diff
 executeFunc := execution.MiddlewareChain(e.GlobalMiddleware, e.WorkUnit.Middleware(), doInner, e.JobRow)
 jobTimeout := valutil.FirstNonZero(e.WorkUnit.Timeout(), e.ClientJobTimeout)
 ctx, cancel := execution.MaybeApplyTimeout(ctx, jobTimeout)
 defer cancel()

-return &jobExecutorResult{Err: doInner(ctx), MetadataUpdates: metadataUpdates}
+return &jobExecutorResult{Err: executeFunc(ctx), MetadataUpdates: metadataUpdates}
```
Contributor

Huh, I think I fixed #752 as part of 0132e53 / #753 without explicitly calling it out. Looks like it's now applying the timeout to the entire chain.

Contributor Author

@brandur brandur Feb 22, 2025

True. Latest push though does change this back to extracting a timeout from within doInner, heh. See comment above.

Contributor

@bgentry bgentry left a comment

I think this makes sense. Obviously it's not great to break this interface but IMO we've found a worthy reason to do so given the use cases that can't be adequately solved without the change.

Needs a changelog callout and I think a fix for context cancellation (should that have a test?), but otherwise LGTM ✌️

Comment on lines -192 to -193
```go
ctx, cancel := execution.MaybeApplyTimeout(ctx, jobTimeout)
defer cancel()
```
Contributor

A notable and maybe unintentional change here: the context passed to middleware is no longer being cancelled at any point, because you're only canceling the innermost context given to the worker's Work(). Maybe this should be resolved by adding a context.WithCancel() and defer cancel() up at the top of execute() to ensure that the context is 100% always canceled for all layers once execution is done?

Contributor Author

@brandur brandur Feb 24, 2025

Hmm, okay, I think this might be a big difference in how you and I understand contexts/cancellations, but I still don't feel great about littering random context.WithCancel()s all over the place where the cancel() is never intended for use except on defer return.

Think about it this way: the only time cancel() is called is if we've successfully returned from middleware functions. If any middleware gets stuck somewhere waiting, the cancel will never be called anyway because it's only called on defer. So it has no effect in the success case, and no effect in the failure case either.

So the only thing we're protecting against by adding more cancels is the case where a middleware was able to return successfully, but improperly spawned off some extraneous goroutines that do properly respond to a cancelled context (meaning they'd have to be selecting on ctx.Done(), which is really easy to forget).

IMO, it shouldn't really be a framework's job to cancel that kind of stuff. I'd even go so far as to say that it'd be better if it didn't, because that'd potentially let the problem be found and fixed more easily, whereas context cancellations from River might paper over the problem for a long time in this one place while letting it still occur somewhere else.

I re-read the docs for WithCancel in case I've been missing something big all these years, but I can't find anything in there suggesting use of WithCancel in places where cancel() is not intended to be used (e.g. see the example there, which uses cancel to stop an associated goroutine when its work is no longer needed).

I looked into putting in some other sort of global context cancellation for middlewares, but I couldn't find any easy resolution for this one right now because no cancellation should be applied to doInner, and doInner is intrinsically linked to the rest of the middleware invocation.

IMO again, it might be better not to have one anyway, and instead encourage each middleware to set an appropriate timeout if it needs one. i.e. Maybe it makes sense for a concurrency limiter middleware to have one, but an encryption middleware should have no context timeout at all (one will be ignored if it's set anyway).

Thoughts on all that? Okay with leaving this out for the time being and continuing to refine this as we go?

Contributor

I guess I see it as being conceptually the same as the context on an incoming http.Request. From https://pkg.go.dev/net/http#Request.Context :

> For incoming server requests, the context is canceled when the client's connection closes, the request is canceled (with HTTP/2), or when the ServeHTTP method returns.

The idea being that as soon as user-controlled code is completed, the context is canceled to prevent any potential leaked resources. I see the same pattern is followed for gRPC.

To me it makes sense given all the ways in which a context is scoped to a specific job being executed, and it feels like a low cost nicety that might save users some pain vs having to remember to do it themselves when spawning concurrent goroutines in a job. But it’s not a hill I’ll die on, so your call.
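
That net/http behavior can be demonstrated with just the standard library (the helper name below is made up for this sketch):

```go
package main

import (
	"context"
	"fmt"
	"net/http"
	"net/http/httptest"
	"time"
)

// requestContextCanceled shows that an incoming request's context is
// canceled once the handler (ServeHTTP) returns, per the net/http docs.
func requestContextCanceled() bool {
	ctxCh := make(chan context.Context, 1)
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		ctxCh <- r.Context() // capture the request-scoped context
	}))
	defer srv.Close()

	resp, err := http.Get(srv.URL)
	if err != nil {
		return false
	}
	resp.Body.Close()

	ctx := <-ctxCh
	// Cancellation happens as ServeHTTP unwinds; poll briefly for it.
	for i := 0; i < 100; i++ {
		if ctx.Err() != nil {
			return true
		}
		time.Sleep(10 * time.Millisecond)
	}
	return false
}

func main() {
	fmt.Println(requestContextCanceled())
}
```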

Contributor Author

> I guess I see it as being conceptually the same as the context on an incoming http.Request. From https://pkg.go.dev/net/http#Request.Context :

K, I view this as quite a different situation. In the case of an HTTP handler, something might happen out of band (i.e. the connection for the original request that the handler is trying to serve gets closed) that makes the work being done by the handler superfluous, so it's a very useful feature for the context to be cancelled, because you're preventing additional work that no longer needs to be done.

In this case, the cancel() would only happen on return, so nothing would ever happen out-of-band. Unless I'm missing something here, there are no circumstances under which this would save additional work (as mentioned above, the middleware stack would have to return before cancel() could ever be invoked).

Going to merge as is for now, but we've got to retool how timeouts work on middleware stacks anyway, so let's get into that next.

@brandur brandur changed the title Job executor: Unmarshal job args within middleware stack if changed Job executor: Unmarshal job args late, after middlewares have run Feb 27, 2025
@brandur brandur force-pushed the brandur-late-unmarshal-json branch 2 times, most recently from ad07ad0 to ce1d253 Compare February 27, 2025 16:08
Contributor Author

brandur commented Feb 27, 2025

Added changelog entry and retooled the PR title/description to match changes since I opened this.

Thanks!

@brandur brandur force-pushed the brandur-late-unmarshal-json branch from ce1d253 to b8c9da9 Compare February 27, 2025 16:12
@brandur brandur force-pushed the brandur-late-unmarshal-json branch from b8c9da9 to 5fad4a4 Compare February 27, 2025 16:22
@brandur brandur merged commit 2685735 into master Feb 27, 2025
@brandur brandur deleted the brandur-late-unmarshal-json branch February 27, 2025 16:28
@riverqueue riverqueue deleted a comment from 4almrasla Feb 28, 2025