Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 47 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ Flume is a job processing system backed by [GenStage](https://github.com/elixir-
- [Batch Processing](#batch-processing)
- [Pipeline Control](#pipeline-control)
- [Instrumentation](#instrumentation)
- [Testing](#testing)
- [Writing Tests](#writing-tests)
- [Roadmap](#roadmap)
- [References](#references)
- [Contributing](#contributing)
Expand Down Expand Up @@ -329,17 +329,57 @@ Following metrics are emitted:
- duration of a job/worker
- count, latency and payload_size of dequeued jobs

## Testing
## Writing Tests

Use these guidelines for running tests:

* Disable flume pipelines in test env
**To enable mock in the test environment**

**config/test.exs**

```elixir
config :flume,
pipelines: []
config :flume, mock: true
```

**To mock individual test**

```elixir
import Flume.Mock
...
describe "enqueue/4" do
test "mock works" do
with_flume_mock do
Flume.enqueue(:test, List, :last, [[1]])

assert_receive %{
queue: :test,
worker: List,
function_name: :last,
args: [[1]]
}
end
end
end
```

**To enable mock for all tests in a module**

```elixir
defmodule ListTest do
use ExUnit.Case, async: true
use Flume.Mock

describe "enqueue/4" do
test "mock works" do
Flume.enqueue(:test, List, :last, [[1]])

assert_receive %{
queue: :test,
worker: List,
function_name: :last,
args: [[1]]
}
end
end
end
```

## Roadmap
Expand Down
23 changes: 13 additions & 10 deletions lib/flume.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,19 @@ defmodule Flume do
end

def start_link do
children = [
supervisor(Flume.Redis.Supervisor, []),
worker(Flume.Queue.Scheduler, [Config.scheduler_opts()]),
supervisor(Flume.Pipeline.SystemEvent.Supervisor, []),
supervisor(Task.Supervisor, [[name: Flume.SafeApplySupervisor]])
]

# This order matters, first we need to start all redix worker processes
# then all other processes.
children = children ++ Flume.Support.Pipelines.list()
children =
if Config.mock() do
[]
else
# This order matters, first we need to start all redix worker processes
# then all other processes.
[
supervisor(Flume.Redis.Supervisor, []),
worker(Flume.Queue.Scheduler, [Config.scheduler_opts()]),
supervisor(Flume.Pipeline.SystemEvent.Supervisor, []),
supervisor(Task.Supervisor, [[name: Flume.SafeApplySupervisor]])
] ++ Flume.Support.Pipelines.list()
end

opts = [
strategy: :one_for_one,
Expand Down
59 changes: 34 additions & 25 deletions lib/flume/api.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,52 +5,61 @@ defmodule Flume.API do
alias Flume.{Config, Pipeline}
alias Flume.Queue.Manager

def enqueue(
queue,
worker,
function_name \\ :perform,
args,
opts \\ []
) do
Manager.enqueue(
namespace(),
def bulk_enqueue(queue, jobs) do
apply(Flume.Config.queue_api_module(), :bulk_enqueue, [queue, jobs])
end

def bulk_enqueue(queue, jobs, opts) do
apply(Flume.Config.queue_api_module(), :bulk_enqueue, [queue, jobs, opts])
end

def enqueue(queue, worker, args) do
apply(Flume.Config.queue_api_module(), :enqueue, [queue, worker, args])
end

def enqueue(queue, worker, function_name, args) do
apply(Flume.Config.queue_api_module(), :enqueue, [queue, worker, function_name, args])
end

def enqueue(queue, worker, function_name, args, opts) do
apply(Flume.Config.queue_api_module(), :enqueue, [
queue,
worker,
function_name,
args,
opts
)
])
end

def bulk_enqueue(queue, jobs, opts \\ []) do
Manager.bulk_enqueue(namespace(), queue, jobs, opts)
def enqueue_in(queue, time_in_seconds, worker, args) do
apply(Flume.Config.queue_api_module(), :enqueue_in, [queue, time_in_seconds, worker, args])
end

def enqueue_in(
queue,
time_in_seconds,
worker,
function_name \\ :perform,
args,
opts \\ []
) do
Manager.enqueue_in(
namespace(),
def enqueue_in(queue, time_in_seconds, worker, function_name, args) do
apply(Flume.Config.queue_api_module(), :enqueue_in, [
queue,
time_in_seconds,
worker,
function_name,
args
])
end

def enqueue_in(queue, time_in_seconds, worker, function_name, args, opts) do
apply(Flume.Config.queue_api_module(), :enqueue_in, [
queue,
time_in_seconds,
worker,
function_name,
args,
opts
)
])
end

def pause_all(temporary \\ true) do
Config.pipeline_names() |> Enum.map(&pause(&1, temporary))
end

defp namespace, do: Config.namespace()

defdelegate pause(pipeline_name, temporary \\ true), to: Pipeline.Event

defdelegate resume(pipeline_name, temporary \\ true), to: Pipeline.Event
Expand Down
11 changes: 11 additions & 0 deletions lib/flume/config.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ defmodule Flume.Config do
database: 0,
host: "127.0.0.1",
logger: Flume.DefaultLogger,
mock: false,
max_retries: 5,
name: Flume,
namespace: "flume",
Expand Down Expand Up @@ -92,4 +93,14 @@ defmodule Flume.Config do
def queues, do: Enum.map(pipelines(), & &1.queue)

def pipeline_names, do: Enum.map(pipelines(), & &1.name)

def queue_api_module do
case mock() do
false ->
Flume.Queue.DefaultAPI

true ->
Flume.Queue.MockAPI
end
end
end
27 changes: 27 additions & 0 deletions lib/flume/mock.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
defmodule Flume.Mock do
defmacro __using__(_) do
quote do
setup _mock do
Application.put_env(:flume, :mock, true)

on_exit(fn ->
Application.put_env(:flume, :mock, false)
end)

:ok
end
end
end

defmacro with_flume_mock(do: yield) do
quote do
Application.put_env(:flume, :mock, true)

on_exit(fn ->
Application.put_env(:flume, :mock, false)
end)

unquote(yield)
end
end
end
9 changes: 9 additions & 0 deletions lib/flume/queue/api.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
defmodule Flume.Queue.API do
@callback bulk_enqueue(String.t(), [any()], [any()]) :: {:ok, term} | {:error, String.t()}

@callback enqueue(String.t(), Atom.t(), Atom.t(), [any()], [any()]) ::
{:ok, term} | {:error, String.t()}

@callback enqueue_in(String.t(), integer, Atom.t(), Atom.t(), [any()], [any()]) ::
{:ok, term} | {:error, String.t()}
end
48 changes: 48 additions & 0 deletions lib/flume/queue/default_api.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
defmodule Flume.Queue.DefaultAPI do
@behaviour Flume.Queue.API

alias Flume.Config
alias Flume.Queue.Manager

def bulk_enqueue(queue, jobs, opts \\ []) do
Manager.bulk_enqueue(namespace(), queue, jobs, opts)
end

def enqueue(
queue,
worker,
function_name \\ :perform,
args,
opts \\ []
) do
Manager.enqueue(
namespace(),
queue,
worker,
function_name,
args,
opts
)
end

def enqueue_in(
queue,
time_in_seconds,
worker,
function_name \\ :perform,
args,
opts \\ []
) do
Manager.enqueue_in(
namespace(),
queue,
time_in_seconds,
worker,
function_name,
args,
opts
)
end

defp namespace, do: Config.namespace()
end
Loading