Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Changelog

## v0.2.0
* Add `Flume.API.job_counts/1` to get count of jobs in the pipeline which are yet to be processed.
19 changes: 19 additions & 0 deletions lib/flume/api.ex
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,25 @@ defmodule Flume.API do
end

def worker_context, do: Pipeline.Context.get()

@doc """
Returns count of jobs in the pipeline which are yet to be processed.

## Examples
```
Flume.API.job_counts(["queue-1", "queue-2"])
{:ok, [2, 3]}

Flume.API.job_counts(["queue-1", "not-a-queue-name"])
{:ok, [2, 0]}
```
"""
@spec job_counts(nonempty_list(binary)) ::
{:ok, nonempty_list(Redix.Protocol.redis_value())}
| {:error, atom | Redix.Error.t() | Redix.ConnectionError.t()}
def job_counts(queues), do: Manager.job_counts(namespace(), queues)

defp namespace, do: Config.namespace()
end
end
end
18 changes: 17 additions & 1 deletion lib/flume/queue/manager.ex
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
defmodule Flume.Queue.Manager do
require Flume.Logger

alias Flume.{Config, Event, Logger, Instrumentation, Utils}
alias Flume.{Config, Event, Logger, Instrumentation, Utils, Redis}
alias Flume.Redis.Job
alias Flume.Queue.Backoff
alias Flume.Support.Time, as: TimeExtension
Expand Down Expand Up @@ -66,6 +66,20 @@ defmodule Flume.Queue.Manager do
schedule_job_at(queue_name, time_in_seconds, job)
end

def job_counts(namespace, [_queue | _] = queues) do
queues
|> Enum.map(&(fully_qualified_queue_name(namespace, &1) |> Redis.Command.llen()))
|> Redis.Client.pipeline()
|> case do
{:ok, counts} ->
{:ok, Enum.zip(queues, counts)}

{:error, reason} ->
Logger.error("Error in getting job counts #{inspect(reason)}")
{:error, reason}
end
end

def fetch_jobs(
namespace,
queue,
Expand Down Expand Up @@ -263,6 +277,8 @@ defmodule Flume.Queue.Manager do
[scheduled_key(namespace), retry_key(namespace)]
end

defp fully_qualified_queue_name(namespace, queue_name), do: "#{namespace}:queue:#{queue_name}"

def processing_key(namespace, queue), do: full_key(namespace, "queue:processing:#{queue}")

def rate_limit_key(namespace, queue, nil), do: full_key(namespace, "queue:limit:#{queue}")
Expand Down
17 changes: 6 additions & 11 deletions lib/flume/redis/client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ defmodule Flume.Redis.Client do
@ltrim "LTRIM"
@rpush "RPUSH"
@lrange "LRANGE"
@llen "LLEN"
@lrem "LREM"
@rpoplpush "RPOPLPUSH"
@sadd "SADD"
Expand Down Expand Up @@ -195,13 +194,9 @@ defmodule Flume.Redis.Client do
iex> Flume.Redis.Client.llen!("flume:test:stack")
0
"""
def llen!(list_name) do
query!([@llen, list_name])
end
def llen!(list_name), do: Command.llen(list_name) |> query!()

def llen(list_name) do
query([@llen, list_name])
end
def llen(list_name), do: Command.llen(list_name) |> query()

@doc """
Removes given values from the list.
Expand All @@ -224,9 +219,9 @@ defmodule Flume.Redis.Client do
{:error, reason} ->
{:error, reason}

{:ok, reponses} ->
{:ok, responses} ->
success_responses =
reponses
responses
|> Enum.map(fn response ->
case response do
value when value in [:undefined, nil] ->
Expand Down Expand Up @@ -269,9 +264,9 @@ defmodule Flume.Redis.Client do
{:error, reason} ->
{:error, reason}

{:ok, reponses} ->
{:ok, responses} ->
success_responses =
reponses
responses
|> Enum.map(fn response ->
case response do
value when value in [:undefined, nil] ->
Expand Down
10 changes: 10 additions & 0 deletions lib/flume/redis/command.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ defmodule Flume.Redis.Command do
@hmget "HMGET"
@hscan "HSCAN"
@hset "HSET"
@llen "LLEN"

@doc """
Prepares HDEL commands for list of {hash, key} pairs
Expand Down Expand Up @@ -60,4 +61,13 @@ defmodule Flume.Redis.Command do
def hscan(hash, cursor, pattern), do: [@hscan, hash, cursor, @match, pattern]

def hset(hash, key, value), do: [@hset, hash, key, value]

@doc """
Returns command for getting the length of a list.

## Examples
iex> Flume.Redis.Command.llen("flume:test:stack")
["LLEN", "flume:test:stack"]
"""
def llen(key), do: [@llen, key]
end
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Flume.Mixfile do
def project do
[
app: :flume,
version: "0.1.3",
version: "0.2.0",
elixir: "~> 1.6",
elixirc_paths: elixirc_paths(Mix.env()),
start_permanent: Mix.env() == :prod,
Expand Down
23 changes: 23 additions & 0 deletions test/flume/queue/manager_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -243,4 +243,27 @@ defmodule Flume.Queue.ManagerTest do
)
end
end

describe "job_counts/2" do
test "returns counts of the requested queues" do
jobs = JobFactory.generate_jobs("Elixir.Worker", 10)

[
queue_1,
queue_2,
_
] = Enum.map(1..3, &"test_#{&1}")

Enum.map(1..2, &Job.bulk_enqueue("#{@namespace}:queue:test_#{&1}", jobs))

assert {
:ok,
[
{queue_1, 10},
{queue_2, 10},
{"unknown-queue", 0}
]
} == Manager.job_counts(@namespace, [queue_1, queue_2, "unknown-queue"])
end
end
end