Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

How to prevent some nodes from performing jobs? #82

Closed
halostatue opened this issue Oct 18, 2019 · 8 comments

Comments

@halostatue
Copy link

@halostatue halostatue commented Oct 18, 2019

How would one indicate that, for a given context, I don’t want a particular node to take on any workers? I’m thinking that I have a temporary node whose responsibility is to run mix tasks that can insert jobs into Oban queues, but should not process jobs from those queues.

As an enhanced form of the question, is it possible for a node to specifically subscribe to (or exclude subscription from) particular queues? That is, if I wanted to have two worker nodes, each one handling a different subset of queues, how would I do so?

@sorentwo

This comment has been minimized.

Copy link
Owner

@sorentwo sorentwo commented Oct 18, 2019

How would one indicate that, for a given context, I don’t want a particular node to take on any workers? I’m thinking that I have a temporary node whose responsibility is to run mix tasks that can insert jobs into Oban queues, but should not process jobs from those queues.

You would pass queues: false on startup for that particular node. That will start Oban and let you enqueue jobs, but it won't run any specific queues.

As an enhanced form of the question, is it possible for a node to specifically subscribe to (or exclude subscription from) particular queues? That is, if I wanted to have two worker nodes, each one handling a different subset of queues, how would I do so?

This is a normal thing to do when you have some queues that run "expensive" jobs. On one of the apps I work with we use environment variables to control which queues are ran. The implementation looks a bit like this (I'm putting it all in the application module for illustration purposes, you could do it in releases.exs instead):

defmodule MyApp.Application do
  @moduledoc false

  use Application

  alias MyApp.{Endpoint, Repo}

  def start(_type, _args) do
    children = [
      Repo,
      Endpoint,
      {Oban, oban_opts()}
    ]

    Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)
  end

  defp oban_opts do
    env_queues = System.get_env("OBAN_QUEUES")

    :my_app
    |> Application.get_env(Oban)
    |> Keyword.put(:queues, queues(env_queues))
  end

  defp queues(values) when is_binary(values) and byte_size(values) > 3 do
    values
    |> String.split(" ", trim: true)
    |> Enum.map(&String.split(&1, ",", trim: true))
    |> Keyword.new(fn [queue, limit] ->
      {String.to_existing_atom(queue), String.to_integer(limit)}
    end)
  end

  defp queues(_value), do: false
end

That parses the queue names and limits out of a string that is formatted like "QUEUE,LIMIT" and separated by spaces.

Now start the node with just the queues that you want:

OBAN_QUEUES="default,10 media,5" mix phx.server
@halostatue

This comment has been minimized.

Copy link
Author

@halostatue halostatue commented Oct 18, 2019

Can I suggest that this be part of your Oban usage blog series and/or added to a documentation page for advanced usage?

@sorentwo

This comment has been minimized.

Copy link
Owner

@sorentwo sorentwo commented Oct 18, 2019

Can I suggest that this be part of your Oban usage blog series and/or added to a documentation page for advanced usage?

Absolutely! I was thinking the same thing. I'll leave this issue open as a reminder 👍

@halostatue

This comment has been minimized.

Copy link
Author

@halostatue halostatue commented Oct 18, 2019

I ended up doing this a bit differently than you posted, but (currently untested):

  defp oban_opts do
    config = Application.get_env(:my_app, Oban)
    queues =
      config
      |> Keyword.get(:queues, false)
      |> oban_queues(System.get_env("OBAN_QUEUES"))

    Keyword.put(config, :queues, queues)
  end

  defp oban_queues(config_queues, "*"), do: config_queues

  defp oban_queues(_, env_queues) when is_binary(env_queues) and byte_size(env_queues) > 3 do
    env_queues
    |> String.split(" ", trim: true)
    |> Enum.map(&String.split(&1, ",", trim: true))
    |> Keyword.new(fn [queue, limit] ->
      {String.to_existing_atom(queue), String.to_integer(limit)}
    end)
  end

  defp oban_queues(_config_queues, _env_queues), do: false

That way, I can do OBAN_QUEUES=* mix phx.server and I will get the queues that are preconfigured in config.exs while still making it an explicit opt-in to processing the queues.

@halostatue

This comment has been minimized.

Copy link
Author

@halostatue halostatue commented Oct 18, 2019

One other question. With queues disabled, should the beats still be happening?

13:20:34.115 [info]  QUERY OK source="oban_beats" db=54.7ms queue=1.1ms
DELETE FROM "public"."oban_beats" AS o0 WHERE (o0."inserted_at" < $1) [#DateTime<2019-10-18 16:20:34.040562Z>]

13:20:34.118 [info]  QUERY OK source="oban_jobs" db=1.2ms queue=1.3ms
DELETE FROM "public"."oban_jobs" AS o0 USING (SELECT o0."id" AS "id", o0."state" AS "state", o0."queue" AS "queue", o0."worker" AS "worker", o0."args" AS "args", o0."errors" AS "errors", o0."attempt" AS "attempt", o0."attempted_by" AS "attempted_by", o0."max_attempts" AS "max_attempts", o0."attempted_at" AS "attempted_at", o0."completed_at" AS "completed_at", o0."inserted_at" AS "inserted_at", o0."scheduled_at" AS "scheduled_at" FROM "public"."oban_jobs" AS o0 WHERE (((o0."state" = 'completed') AND (o0."completed_at" < $1))) OR ((o0."state" = 'discarded') AND (o0."attempted_at" < $2)) ORDER BY o0."id" DESC LIMIT $3) AS s1 WHERE (o0."id" = s1."id") [#DateTime<2019-10-18 14:32:34.115219Z>, #DateTime<2019-10-18 14:32:34.115219Z>, 5000]
@halostatue

This comment has been minimized.

Copy link
Author

@halostatue halostatue commented Oct 18, 2019

And if I haven’t said it enough, Oban rocks.

@sorentwo

This comment has been minimized.

Copy link
Owner

@sorentwo sorentwo commented Oct 18, 2019

One other question. With queues disabled, should the beats still be happening?

That is expected, it is the pruner running to clean up outdated beats. Each node will do that regardless of which queues are running.

@sorentwo

This comment has been minimized.

Copy link
Owner

@sorentwo sorentwo commented Nov 5, 2019

@sorentwo sorentwo closed this Nov 5, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
2 participants
You can’t perform that action at this time.