From 1a55b63894df905e2121e83c4eb41ac5c3dc3e96 Mon Sep 17 00:00:00 2001 From: Harsh Balyan Date: Wed, 29 Jan 2020 17:47:42 +0530 Subject: [PATCH] Add Flume.API.job_counts/1 to get count of jobs in the pipeline which are yet to be processed. --- CHANGELOG.md | 4 ++++ lib/flume/api.ex | 19 +++++++++++++++++++ lib/flume/queue/manager.ex | 18 +++++++++++++++++- lib/flume/redis/client.ex | 17 ++++++----------- lib/flume/redis/command.ex | 10 ++++++++++ mix.exs | 2 +- test/flume/queue/manager_test.exs | 23 +++++++++++++++++++++++ 7 files changed, 80 insertions(+), 13 deletions(-) create mode 100644 CHANGELOG.md diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..865f970 --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,4 @@ +# Changelog + +## v0.2.0 + * Add `Flume.API.job_counts/1` to get count of jobs in the pipeline which are yet to be processed. diff --git a/lib/flume/api.ex b/lib/flume/api.ex index 672b9f3..9032dfa 100644 --- a/lib/flume/api.ex +++ b/lib/flume/api.ex @@ -70,6 +70,25 @@ defmodule Flume.API do end def worker_context, do: Pipeline.Context.get() + + @doc """ + Returns count of jobs in the pipeline which are yet to be processed. + + ## Examples + ``` + Flume.API.job_counts(["queue-1", "queue-2"]) + {:ok, [2, 3]} + + Flume.API.job_counts(["queue-1", "not-a-queue-name"]) + {:ok, [2, 0]} + ``` + """ + @spec job_counts(nonempty_list(binary)) :: + {:ok, nonempty_list(Redix.Protocol.redis_value())} + | {:error, atom | Redix.Error.t() | Redix.ConnectionError.t()} + def job_counts(queues), do: Manager.job_counts(namespace(), queues) + + defp namespace, do: Config.namespace() end end end diff --git a/lib/flume/queue/manager.ex b/lib/flume/queue/manager.ex index ede83b1..136cfb5 100644 --- a/lib/flume/queue/manager.ex +++ b/lib/flume/queue/manager.ex @@ -1,7 +1,7 @@ defmodule Flume.Queue.Manager do require Flume.Logger - alias Flume.{Config, Event, Logger, Instrumentation, Utils} + alias Flume.{Config, Event, Logger, Instrumentation, Utils, Redis} alias Flume.Redis.Job alias Flume.Queue.Backoff alias Flume.Support.Time, as: TimeExtension @@ -66,6 +66,20 @@ defmodule Flume.Queue.Manager do schedule_job_at(queue_name, time_in_seconds, job) end + def job_counts(namespace, [_queue | _] = queues) do + queues + |> Enum.map(&(fully_qualified_queue_name(namespace, &1) |> Redis.Command.llen())) + |> Redis.Client.pipeline() + |> case do + {:ok, counts} -> + {:ok, Enum.zip(queues, counts)} + + {:error, reason} -> + Logger.error("Error in getting job counts #{inspect(reason)}") + {:error, reason} + end + end + def fetch_jobs( namespace, queue, @@ -263,6 +277,8 @@ defmodule Flume.Queue.Manager do [scheduled_key(namespace), retry_key(namespace)] end + defp fully_qualified_queue_name(namespace, queue_name), do: "#{namespace}:queue:#{queue_name}" + def processing_key(namespace, queue), do: full_key(namespace, "queue:processing:#{queue}") def rate_limit_key(namespace, queue, nil), do: full_key(namespace, "queue:limit:#{queue}") diff --git a/lib/flume/redis/client.ex b/lib/flume/redis/client.ex index a5e8938..4317341 100644 --- a/lib/flume/redis/client.ex +++ b/lib/flume/redis/client.ex @@ -19,7 +19,6 @@ defmodule Flume.Redis.Client do @ltrim "LTRIM" @rpush "RPUSH" @lrange "LRANGE" - @llen "LLEN" @lrem "LREM" @rpoplpush "RPOPLPUSH" @sadd "SADD" @@ -195,13 +194,9 @@ defmodule Flume.Redis.Client do iex> Flume.Redis.Client.llen!("flume:test:stack") 0 """ - def llen!(list_name) do - query!([@llen, list_name]) - end + def llen!(list_name), do: Command.llen(list_name) |> query!() - def llen(list_name) do - query([@llen, list_name]) - end + def llen(list_name), do: Command.llen(list_name) |> query() @doc """ Removes given values from the list. @@ -224,9 +219,9 @@ defmodule Flume.Redis.Client do {:error, reason} -> {:error, reason} - {:ok, reponses} -> + {:ok, responses} -> success_responses = - reponses + responses |> Enum.map(fn response -> case response do value when value in [:undefined, nil] -> @@ -269,9 +264,9 @@ defmodule Flume.Redis.Client do {:error, reason} -> {:error, reason} - {:ok, reponses} -> + {:ok, responses} -> success_responses = - reponses + responses |> Enum.map(fn response -> case response do value when value in [:undefined, nil] -> diff --git a/lib/flume/redis/command.ex b/lib/flume/redis/command.ex index c8a785f..b4d1af3 100644 --- a/lib/flume/redis/command.ex +++ b/lib/flume/redis/command.ex @@ -6,6 +6,7 @@ defmodule Flume.Redis.Command do @hmget "HMGET" @hscan "HSCAN" @hset "HSET" + @llen "LLEN" @doc """ Prepares HDEL commands for list of {hash, key} pairs @@ -60,4 +61,13 @@ defmodule Flume.Redis.Command do def hscan(hash, cursor, pattern), do: [@hscan, hash, cursor, @match, pattern] def hset(hash, key, value), do: [@hset, hash, key, value] + + @doc """ + Returns command for getting the length of a list. + + ## Examples + iex> Flume.Redis.Command.llen("flume:test:stack") + ["LLEN", "flume:test:stack"] + """ + def llen(key), do: [@llen, key] end diff --git a/mix.exs b/mix.exs index 5faa384..ab50cfe 100644 --- a/mix.exs +++ b/mix.exs @@ -4,7 +4,7 @@ defmodule Flume.Mixfile do def project do [ app: :flume, - version: "0.1.3", + version: "0.2.0", elixir: "~> 1.6", elixirc_paths: elixirc_paths(Mix.env()), start_permanent: Mix.env() == :prod, diff --git a/test/flume/queue/manager_test.exs b/test/flume/queue/manager_test.exs index cf8ac94..22e52a0 100644 --- a/test/flume/queue/manager_test.exs +++ b/test/flume/queue/manager_test.exs @@ -243,4 +243,27 @@ defmodule Flume.Queue.ManagerTest do ) end end + + describe "job_counts/2" do + test "returns counts of the requested queues" do + jobs = JobFactory.generate_jobs("Elixir.Worker", 10) + + [ + queue_1, + queue_2, + _ + ] = Enum.map(1..3, &"test_#{&1}") + + Enum.map(1..2, &Job.bulk_enqueue("#{@namespace}:queue:test_#{&1}", jobs)) + + assert { + :ok, + [ + {queue_1, 10}, + {queue_2, 10}, + {"unknown-queue", 0} + ] + } == Manager.job_counts(@namespace, [queue_1, queue_2, "unknown-queue"]) + end + end end