
Support a `discarded` callback #27

Closed
andykent opened this issue Jul 8, 2019 · 14 comments

Comments

@andykent commented Jul 8, 2019

I have a requirement to take some action when a job reaches its max attempts and is discarded from the queue. I was hoping it would be possible to add a callback to Oban.Worker to handle this. I imagine something like this...

defmodule MyWorker do
  use Oban.Worker, max_attempts: 5

  require Logger

  def discarded(job) do
    Logger.info("job #{job.id} was discarded after #{job.attempts} attempts")
  end
end

note: my use case is more complex than simple logging but hopefully it illustrates the point.

If this is something you would be willing to support in Oban I can have a go at putting together a PR to add the functionality.

@sorentwo (Owner) commented Jul 9, 2019

This is currently possible through telemetry handlers, though it is a bit obtuse. This was recently asked on the forum, so it is clearly a common need.

To recap what was posted on the forum, the handler function would look like this:

def handle_discard([:oban, :failure], _timing, %{id: id, attempt: attempt, max_attempts: max}, _config) do
  if attempt == max do
    Logger.info("job #{id} was discarded after #{attempt} attempts")
  end
end
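For completeness, a handler like this only fires if it is attached at application start. A minimal sketch, assuming the handler above lives in a module named MyApp.ObanLogger (a hypothetical name):

```elixir
# Sketch: attach the discard handler once, e.g. from Application.start/2.
# "oban-discard-logger" is an arbitrary unique handler id; the fourth
# argument is handler config, which we don't use here.
:telemetry.attach(
  "oban-discard-logger",
  [:oban, :failure],
  &MyApp.ObanLogger.handle_discard/4,
  nil
)
```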

This could probably use some better documentation and/or more fleshed-out examples.

The reason I've promoted using telemetry events is because they allow you to handle events from multiple workers (or every worker) without duplicating the same code in each one. For example, you can define a single error handler for all of your workers in a central place rather than sprinkling callbacks everywhere.

@tcoopman commented Jul 9, 2019

A possible issue with telemetry is that it isn't 100% reliable. If telemetry or your listener or the app crashes, you will have missed the message.
For logging that's not an issue, but for business logic it might be.

@andykent (Author) commented Jul 9, 2019

Yep, I did think about using the telemetry handler, but it felt wrong for my use case. If that's the best option then maybe I should just use that, though.

To explain my use case... I have a scheduled job that I want to ensure gets attempted daily. I would like it to look something like the following...

defmodule MyWorker do
  use Oban.Worker, max_attempts: 5

  def perform(email) do
    send_notifications(email)
    reschedule(email)
  end

  def discarded(%{args: args}) do
    reschedule(args)
  end
end

Ideally, I was thinking that the discarded callback would run inside the same transaction as the DB update that sets the job as discarded. That way, if anything died while discarding a job, the job would not get marked as discarded in the DB, giving you a guarantee that a job wouldn't be marked as discarded without the callback running to completion.

note: assume I have built reschedule/1 to be idempotent in this example. (That would be nice functionality to have built into Oban, in fact.)

The telemetry solution is quite elegant and great for logging/reporting but there are a few things I don't like about it for my use-case...

  1. There are many more error cases to try and handle / reason about.
  2. It's a lot harder to test.
  3. In my case the logic is worker specific so it's quite nice to keep it contained within the worker.

@sorentwo (Owner) commented Jul 9, 2019

> A possible issue with telemetry is that it isn't 100% reliable. If telemetry or your listener or the app crashes, you will have missed the message.

@tcoopman That is an excellent point. For critical functionality telemetry may not be trustworthy.

> To explain my use case... I have a scheduled job that I want to ensure gets attempted daily.

@andykent Nice to hear it, I'm working on this exact feature currently! There is a bit of complexity to it because it is cron based and needs to handle distributed locking, but I aim to have it available this week.

> Ideally I was thinking that the discarded callback would run inside the same transaction as the DB update that sets the job as discarded, this would mean that if anything died during discarding a message then the message would not get discarded in the DB, giving you a guarantee that a job wouldn't be marked as discarded without the callback running to completion.

As you mentioned, your use case doesn't seem to fit with using a telemetry handler. I believe you can restructure your worker to ensure the job gets rescheduled regardless of whether it succeeds or not.

Introduce an additional argument that you can use for idempotency. The argument would allow you to retry within the same day if the job fails and would still allow you to schedule the job for the next day.

defmodule MyWorker do
  use Oban.Worker, max_attempts: 5

  def perform(%{email: email, date: date}) do
    reschedule(%{email: email, date: next_day(date)})
    send_notifications(email)
  end
end

  1. By moving the reschedule call before send_notifications, you guarantee that the next day is scheduled even if sending fails
  2. The reschedule function uses a calendar day (e.g., "2019-07-09") for uniqueness, keeping it idempotent within the same day

With those changes you shouldn't need to prevent discarding failed jobs, which seems like it could get rather messy (what happens if the callback raises during the discard transaction? How do we surface that error or exit if we catch it?).

> assume I have built reschedule/1 to be idempotent in this example. (that would be nice functionality to have in-built in Oban in fact)

I've written about that on the forum a bit as well. It is something I would love to support within Oban, though I believe it requires handling job insertion directly.

@tcoopman commented Jul 9, 2019

> @andykent Nice to hear it, I'm working on this exact feature currently! There is a bit of complexity to it because it is cron based and needs to handle distributed locking, but I aim to have it available this week.

I'm not sure if I'm missing the point or not, but should this be integrated into Oban? Isn't cron-like scheduling something the application itself, or a dependency like https://hexdocs.pm/quantum/readme.html, can do?

@sorentwo (Owner) commented Jul 9, 2019

> I'm not sure if I'm missing the point or not but should this be integrated into Oban? Isn't the Cron like scheduling something the application itself or a dependency like https://hexdocs.pm/quantum/readme.html can do?

You aren't missing anything, that is an excellent point. In the past I've been put off by the size and scope of Quantum, but that doesn't necessarily mean that Oban itself needs to be expanded to handle scheduling.

@andykent (Author) commented Jul 9, 2019

This is a great discussion. Thanks to both of you.

As someone who has just moved away from using Quantum in production, I feel like I may have some valuable input here. Quantum works great as a fixed scheduler on a single node, but it has serious reliability issues in multi-node setups: it relies on maintaining a single scheduler process in the cluster, and in my experience it was common to end up with two or zero schedulers when rolling instances, meaning jobs were missed or duplicated.

I switched to Oban because I think it is a much more stable and simpler foundation on which to build scheduling. Whether or not that should be part of the core library, I'm unsure. Personally, I think if it was treated as an additional layer built on top of the guarantees that the core Job primitive gives you, it could sit really nicely within the project.

To illustrate one big difference between the two approaches, here is my use case... I have users who are able to set their own notification schedules in my app.

  • Using Quantum I was forced into a fixed schedule, so a job ran hourly, queried the DB for user schedules that were due based on the current time, and then sent any needed notifications, essentially in a loop.
  • In Oban I queue jobs for each user independently, and each job is responsible for re-queueing itself depending on the user's schedule. This has lots of advantages around reliability, observability, testing & scalability; it's working out much nicer so far.

All this is to say, I do think there is a space for recurrent scheduling built on top of Oban, but I'm not sure it should look like how 'cron jobs' work in Quantum. Maybe it looks more like some better low level support for reliably re-scheduling a job?

@sorentwo (Owner) commented Jul 10, 2019

@andykent That is great feedback. Between the concerns voiced by @tcoopman and the use case you've shared I've decided not to continue to pursue cron-like scheduling within Oban itself.

> Maybe it looks more like some better low level support for reliably re-scheduling a job?

Picking up at the end of your comment, and jumping back to your original proposal, what primitives are you envisioning? I see two distinct features that could help support scheduling:

  1. Built in support for idempotent (unique) jobs. Providing a native way to declare that a job should be unique is generally useful, even outside the context of a scheduled job.
  2. Job lifecycle callbacks like completed, discarded and failed. These can be used to guarantee that business logic is called after a job has executed without relying on Telemetry events.

An essential part of the lifecycle callbacks, as you noted above, is that they run within the transaction that transitions the job to a new state. How do you envision that API and what are the guarantees? Off the top of my head, a few questions to consider:

  1. What happens if the callback crashes or errors?
  2. What state does a job transition to when a callback fails?
  3. Can the callback change which state a job should transition to? Meaning, can the discarded callback force the job to transition to retryable again? If so, how is that accomplished?

I see the value in locating callbacks with the worker (guaranteeing that they execute, colocating business logic with the worker), but I'm not convinced that the callback should be able to change how states are transitioned.

@andykent (Author) commented Jul 12, 2019

> Built in support for idempotent (unique) jobs. Providing a native way to declare that a job should be unique is generally useful, even outside the context of a scheduled job.

Yes it is. I wonder how this would work, though, as I guess uniqueness is kind of application specific. Maybe it's OK to just make uniqueness based on queue name + args? This would mean that if you wanted something to be uniquely queued within an hour, you would need to adjust your args accordingly, e.g. %{user: USER_ID, hour: 12} instead of just %{user: USER_ID}.
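As a concrete sketch of that hour-bucketing idea (the module and field names here are hypothetical, not Oban API):

```elixir
# Sketch: embed the current hour in the args so that uniqueness on
# queue + args naturally resets every hour.
defmodule MyApp.HourlyArgs do
  def build(user_id) do
    # Structs match against plain map patterns, so we can pull the
    # calendar fields straight out of the DateTime.
    %{year: y, month: m, day: d, hour: h} = DateTime.utc_now()

    %{user: user_id, bucket: "#{y}-#{m}-#{d}T#{h}"}
  end
end
```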

> Job lifecycle callbacks like completed, discarded and failed. These can be used to guarantee that business logic is called after a job has executed without relying on Telemetry events.

I've been thinking about this a bit more and I have an alternative proposal.

When we think through the callbacks....

  • completed - is largely not required, because you can easily just add whatever code you would put in there to the end of your perform function.
  • failed - again, this can be handled by rescuing in your perform if really needed, but most of the time you should let the job fail and let Oban handle it. I can't think of a use case for this that couldn't be handled by telemetry.
  • discarded - this could be useful and is not possible to handle using perform.

We could avoid adding the callback if we changed the signature of perform/1 so that it provided the full Job struct, or added the Job as a second arg in the form of perform/2. That would allow you to pattern match on the attempts and take different action on the final attempt. Maybe that's more helpful/flexible? It also has the advantage that the job guarantees remain the same.

defmodule MyWorker do
  @max_attempts 5
  use Oban.Worker, max_attempts: @max_attempts

  require Logger

  def perform(%{attempts: @max_attempts}) do
    Logger.info("do something different on final run")
  end

  def perform(%{args: _args}) do
    Logger.info("execute job")
  end
end

> What happens if the callback crashes or errors?

This is an interesting one, and why I am starting to think maybe callbacks aren't the solution. If added, I think documentation-wise we should push people to keep these callbacks as minimal as possible; ideally they should only be there to adjust the queue, which hopefully minimises the errors.

> What state does a job transition to when a callback fails?

Lots of queues have a concept of a dead letter queue where discarded messages go; maybe we need something like that, although that's largely handled by the discarded state currently. The issue is that you can't really block a transition to discarded. How is this handled currently? E.g. if the DB write that sets the state to discarded fails, what happens? Presumably the behaviour should be the same.

> Can the callback change which state a job should transition to? Meaning, can the discarded callback force the job to transition to retryable again? If so, how is that accomplished?

I think no, I don't think this is a good idea. If you want to run it again then a new job can be scheduled.


Having thought all this through whilst writing the above I am now coming around to your original proposal of

defmodule MyWorker do
  use Oban.Worker, max_attempts: 5

  def perform(%{email: email, date: date}) do
    reschedule(%{email: email, date: next_day(date)})
    send_notifications(email)
  end
end

This would avoid any of this additional complexity; it's easy to read and reason about, and the other proposals don't really give us any additional guarantees.


To summarise, I am now thinking the simplest solution might be...

  • add perform(args, job) - this allows conditional logic based on job state where needed but would maintain backwards compat, e.g. the default would delegate to perform/1.
  • add Job.insert_once(job) that only inserts a job if it doesn't already exist based on queue + args.
  • maybe add support for a unique: true arg to use Oban.Worker; this would add a validation to the changeset that state + queue + args are unique.
  • maybe add reschedule(job, at: time) as a nicer API around Job.insert_once/1

I think the biggest issue here is how to actually make Job.insert_once/1 idempotent. The issue I see is that I don't think we can add a DB constraint that would support this uniqueness check at the DB level, which would mean relying on read-then-write checks, which could introduce a race condition between workers. Maybe someone with more SQL fu than me can figure that out, though?
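To make that race concrete, a read-then-write insert_once might be sketched like this (hypothetical function, written with Ecto; the gap between the existence check and the insert is exactly where two workers can race):

```elixir
# Sketch of a read-then-write insert_once. Between Repo.exists?/1 and
# Repo.insert/1 another worker can insert the same job — a race condition
# unless the check and insert are backed by a DB-level constraint or lock.
import Ecto.Query

def insert_once(changeset) do
  queue = Ecto.Changeset.get_field(changeset, :queue)
  args = Ecto.Changeset.get_field(changeset, :args)

  duplicate? =
    Repo.exists?(
      from j in Oban.Job,
        where: j.queue == ^queue and j.args == ^args,
        where: j.state in ["available", "scheduled"]
    )

  if duplicate?, do: {:ok, :duplicate}, else: Repo.insert(changeset)
end
```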

Sorry for the ramble, hopefully there's something helpful in there though!

@sorentwo (Owner) commented Jul 15, 2019

@andykent That was an epic comment! Forgive me, but I'm going to focus my reply on the last segment.

Adding perform/2

I toyed with having perform/1 take the full job struct initially, but it seemed like a lot of noise when 95% of workers would be matching on only the args. Introducing perform/2 is an interesting way to work around that.

Another option, which would preserve the single-arity perform/1 and be backward compatible, would be to use before_compile to add a fallback clause that explicitly matches on the job struct and calls perform again with just the args. With that in place we can always call perform/1 with the full job without checking whether the module exports perform/2.
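That before_compile idea could be sketched roughly like this (illustrative only, not Oban's actual implementation):

```elixir
# Sketch: a __before_compile__ hook that appends a catch-all perform/1
# clause matching the full job struct and re-dispatching with the args.
# Because the clause is injected last, user-defined clauses match first.
defmodule WorkerSketch do
  defmacro __using__(_opts) do
    quote do
      @before_compile unquote(__MODULE__)
    end
  end

  defmacro __before_compile__(_env) do
    quote do
      def perform(%Oban.Job{args: args}), do: perform(args)
    end
  end
end
```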

Insert Once / Unique

> I think the biggest issue here is how to actually make Job.insert_once/1 idempotent. This issue I see is that I don't think we can add a DB constraint that would support this uniqueness check at the DB level which would mean we were relying on read then write checks which could introduce a race condition between workers. Maybe someone with more SQL fu than me can figure that out though?

As you implied, there are two ways to make it idempotent: a database constraint with reliance on upsert, and with an old-fashioned existence query followed by an insert. The database constraint has the advantage that it is guaranteed not to allow duplicate records, but it requires running migrations. The check for existence and then insert doesn't have those guarantees, but it is much easier to add and modify.
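The constraint-plus-upsert route could look roughly like this with Ecto (the index name and target columns here are assumptions for illustration):

```elixir
# Sketch: given a unique index on the fields chosen for uniqueness, e.g.
#   CREATE UNIQUE INDEX oban_jobs_unique_index ON oban_jobs (queue, args);
# a duplicate insert becomes a silent no-op instead of an error.
Repo.insert(changeset,
  on_conflict: :nothing,
  conflict_target: [:queue, :args]
)
```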

I think a top level Oban.insert/2 function is the solution. It would perform a repo insert call with on conflict handling in the case that the database has unique constraints, and it can also be set to do uniqueness calculations at the job level. This is the approach I believe should be pursued.

@axelson commented Jul 15, 2019

> I think a top level Oban.insert/2 function is the solution. It would perform a repo insert call with on conflict handling in the case that the database has unique constraints, and it can also be set to do uniqueness calculations at the job level. This is the approach I believe should be pursued.

Would anyone want to have two jobs that have different arguments be considered to be the "same" and not be queued? I could potentially see myself wanting that, although I don't have a concrete use-case in mind right now. One way to handle that would be that by default the uniqueness is checked via the arguments equality, but optionally the user could specify a function that will generate a string from the arguments, and then we check that string for uniqueness. Does that make sense?

@sorentwo (Owner) commented Jul 16, 2019

> Would anyone want to have two jobs that have different arguments be considered to be the "same" and not be queued? I could potentially see myself wanting that, although I don't have a concrete use-case in mind right now.

It is certainly possible, and not something that I would want to eliminate outright. Users should be able to choose, on a per-worker basis, which fields to consider for uniqueness. A sensible default would likely be [:worker, :queue, :args].

> One way to handle that would be that by default the uniqueness is checked via the arguments equality, but optionally the user could specify a function that will generate a string from the arguments, and then we check that string for uniqueness.

That is essentially how Kiq (and Sidekiq Enterprise) does it. It creates a checksum of the fields and uses that as a unique key. We should be able to do something similar using md5 or sha256 within Postgres.
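A rough sketch of that checksum approach in Elixir (the field list and the use of Jason for JSON encoding are assumptions, not Oban's actual scheme):

```elixir
# Sketch: hash the fields chosen for uniqueness into a single key that can
# be stored in an indexed column and constrained at the DB level.
def unique_key(worker, queue, args) do
  data = Enum.join([worker, queue, Jason.encode!(args)], "|")

  :crypto.hash(:sha256, data) |> Base.encode16(case: :lower)
end
```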

@andykent (Author) commented Jul 17, 2019

Sorry for the delayed reply. Just to say, I agree with the direction of all of this, and the hashing solution for uniqueness is nice and easily constrained at the DB level.

@sorentwo (Owner) commented Aug 1, 2019

Unique job support has landed! I'm finally closing this one out. Thanks for all the discussion and feedback.

@sorentwo closed this Aug 1, 2019
