diff --git a/README.md b/README.md index c573fb6..ace7b83 100644 --- a/README.md +++ b/README.md @@ -18,7 +18,7 @@ Flume is a job processing system backed by [GenStage](https://github.com/elixir- - [Batch Processing](#batch-processing) - [Pipeline Control](#pipeline-control) - [Instrumentation](#instrumentation) -- [Testing](#testing) +- [Writing Tests](#writing-tests) - [Roadmap](#roadmap) - [References](#references) - [Contributing](#contributing) @@ -329,17 +329,57 @@ Following metrics are emitted: - duration of a job/worker - count, latency and payload_size of dequeued jobs -## Testing +## Writing Tests -Use these guidelines for running tests: - -* Disable flume pipelines in test env +**To enable mock in the test environment** **config/test.exs** ```elixir -config :flume, - pipelines: [] +config :flume, mock: true +``` + +**To mock individual test** + +```elixir +import Flume.Mock +... +describe "enqueue/4" do + test "mock works" do + with_flume_mock do + Flume.enqueue(:test, List, :last, [[1]]) + + assert_receive %{ + queue: :test, + worker: List, + function_name: :last, + args: [[1]] + } + end + end +end +``` + +**To enable mock for all tests in a module** + +```elixir +defmodule ListTest do + use ExUnit.Case, async: true + use Flume.Mock + + describe "enqueue/4" do + test "mock works" do + Flume.enqueue(:test, List, :last, [[1]]) + + assert_receive %{ + queue: :test, + worker: List, + function_name: :last, + args: [[1]] + } + end + end +end ``` ## Roadmap diff --git a/lib/flume.ex b/lib/flume.ex index 56dcf14..0c8307f 100644 --- a/lib/flume.ex +++ b/lib/flume.ex @@ -16,16 +16,19 @@ defmodule Flume do end def start_link do - children = [ - supervisor(Flume.Redis.Supervisor, []), - worker(Flume.Queue.Scheduler, [Config.scheduler_opts()]), - supervisor(Flume.Pipeline.SystemEvent.Supervisor, []), - supervisor(Task.Supervisor, [[name: Flume.SafeApplySupervisor]]) - ] - - # This order matters, first we need to start all redix worker processes - # then all other processes. - children = children ++ Flume.Support.Pipelines.list() + children = + if Config.mock() do + [] + else + # This order matters, first we need to start all redix worker processes + # then all other processes. + [ + supervisor(Flume.Redis.Supervisor, []), + worker(Flume.Queue.Scheduler, [Config.scheduler_opts()]), + supervisor(Flume.Pipeline.SystemEvent.Supervisor, []), + supervisor(Task.Supervisor, [[name: Flume.SafeApplySupervisor]]) + ] ++ Flume.Support.Pipelines.list() + end opts = [ strategy: :one_for_one, diff --git a/lib/flume/api.ex b/lib/flume/api.ex index 5a1e1c7..672b9f3 100644 --- a/lib/flume/api.ex +++ b/lib/flume/api.ex @@ -5,52 +5,61 @@ defmodule Flume.API do alias Flume.{Config, Pipeline} alias Flume.Queue.Manager - def enqueue( - queue, - worker, - function_name \\ :perform, - args, - opts \\ [] - ) do - Manager.enqueue( - namespace(), + def bulk_enqueue(queue, jobs) do + apply(Flume.Config.queue_api_module(), :bulk_enqueue, [queue, jobs]) + end + + def bulk_enqueue(queue, jobs, opts) do + apply(Flume.Config.queue_api_module(), :bulk_enqueue, [queue, jobs, opts]) + end + + def enqueue(queue, worker, args) do + apply(Flume.Config.queue_api_module(), :enqueue, [queue, worker, args]) + end + + def enqueue(queue, worker, function_name, args) do + apply(Flume.Config.queue_api_module(), :enqueue, [queue, worker, function_name, args]) + end + + def enqueue(queue, worker, function_name, args, opts) do + apply(Flume.Config.queue_api_module(), :enqueue, [ queue, worker, function_name, args, opts - ) + ]) end - def bulk_enqueue(queue, jobs, opts \\ []) do - Manager.bulk_enqueue(namespace(), queue, jobs, opts) + def enqueue_in(queue, time_in_seconds, worker, args) do + apply(Flume.Config.queue_api_module(), :enqueue_in, [queue, time_in_seconds, worker, args]) end - def enqueue_in( - queue, - time_in_seconds, - worker, - function_name \\ :perform, - args, - opts \\ [] - ) do - Manager.enqueue_in( - namespace(), + def enqueue_in(queue, time_in_seconds, worker, function_name, args) do + apply(Flume.Config.queue_api_module(), :enqueue_in, [ + queue, + time_in_seconds, + worker, + function_name, + args + ]) + end + + def enqueue_in(queue, time_in_seconds, worker, function_name, args, opts) do + apply(Flume.Config.queue_api_module(), :enqueue_in, [ queue, time_in_seconds, worker, function_name, args, opts - ) + ]) end def pause_all(temporary \\ true) do Config.pipeline_names() |> Enum.map(&pause(&1, temporary)) end - defp namespace, do: Config.namespace() - defdelegate pause(pipeline_name, temporary \\ true), to: Pipeline.Event defdelegate resume(pipeline_name, temporary \\ true), to: Pipeline.Event diff --git a/lib/flume/config.ex b/lib/flume/config.ex index ae00650..c3881c6 100644 --- a/lib/flume/config.ex +++ b/lib/flume/config.ex @@ -5,6 +5,7 @@ defmodule Flume.Config do database: 0, host: "127.0.0.1", logger: Flume.DefaultLogger, + mock: false, max_retries: 5, name: Flume, namespace: "flume", @@ -92,4 +93,14 @@ defmodule Flume.Config do def queues, do: Enum.map(pipelines(), & &1.queue) def pipeline_names, do: Enum.map(pipelines(), & &1.name) + + def queue_api_module do + case mock() do + false -> + Flume.Queue.DefaultAPI + + true -> + Flume.Queue.MockAPI + end + end end diff --git a/lib/flume/mock.ex b/lib/flume/mock.ex new file mode 100644 index 0000000..d234977 --- /dev/null +++ b/lib/flume/mock.ex @@ -0,0 +1,27 @@ +defmodule Flume.Mock do + defmacro __using__(_) do + quote do + setup _mock do + Application.put_env(:flume, :mock, true) + + on_exit(fn -> + Application.put_env(:flume, :mock, false) + end) + + :ok + end + end + end + + defmacro with_flume_mock(do: yield) do + quote do + Application.put_env(:flume, :mock, true) + + on_exit(fn -> + Application.put_env(:flume, :mock, false) + end) + + unquote(yield) + end + end +end diff --git a/lib/flume/queue/api.ex b/lib/flume/queue/api.ex new file mode 100644 index 0000000..84c1127 --- /dev/null +++ b/lib/flume/queue/api.ex @@ -0,0 +1,9 @@ +defmodule Flume.Queue.API do + @callback bulk_enqueue(String.t(), [any()], [any()]) :: {:ok, term} | {:error, String.t()} + + @callback enqueue(String.t(), Atom.t(), Atom.t(), [any()], [any()]) :: + {:ok, term} | {:error, String.t()} + + @callback enqueue_in(String.t(), integer, Atom.t(), Atom.t(), [any()], [any()]) :: + {:ok, term} | {:error, String.t()} +end diff --git a/lib/flume/queue/default_api.ex b/lib/flume/queue/default_api.ex new file mode 100644 index 0000000..adfd5dc --- /dev/null +++ b/lib/flume/queue/default_api.ex @@ -0,0 +1,48 @@ +defmodule Flume.Queue.DefaultAPI do + @behaviour Flume.Queue.API + + alias Flume.Config + alias Flume.Queue.Manager + + def bulk_enqueue(queue, jobs, opts \\ []) do + Manager.bulk_enqueue(namespace(), queue, jobs, opts) + end + + def enqueue( + queue, + worker, + function_name \\ :perform, + args, + opts \\ [] + ) do + Manager.enqueue( + namespace(), + queue, + worker, + function_name, + args, + opts + ) + end + + def enqueue_in( + queue, + time_in_seconds, + worker, + function_name \\ :perform, + args, + opts \\ [] + ) do + Manager.enqueue_in( + namespace(), + queue, + time_in_seconds, + worker, + function_name, + args, + opts + ) + end + + defp namespace, do: Config.namespace() +end diff --git a/lib/flume/queue/mock_api.ex b/lib/flume/queue/mock_api.ex new file mode 100644 index 0000000..da2c47c --- /dev/null +++ b/lib/flume/queue/mock_api.ex @@ -0,0 +1,111 @@ +defmodule Flume.Queue.MockAPI do + @behaviour Flume.Queue.API + + def bulk_enqueue(queue, jobs, opts \\ []) + + def bulk_enqueue(queue, jobs, []) do + message = %{queue: queue, jobs: jobs} + + send(self(), message) + + {:ok, message} + end + + def bulk_enqueue(queue, jobs, opts) do + send(self(), %{queue: queue, jobs: jobs, options: opts}) + end + + def enqueue( + queue, + worker, + function_name \\ :perform, + args, + opts \\ [] + ) + + def enqueue( + queue, + worker, + function_name, + args, + [] + ) do + message = %{queue: queue, worker: worker, function_name: function_name, args: args} + + send(self(), message) + + {:ok, message} + end + + def enqueue( + queue, + worker, + function_name, + args, + opts + ) do + message = %{ + queue: queue, + worker: worker, + function_name: function_name, + args: args, + options: opts + } + + send(self(), message) + + {:ok, message} + end + + def enqueue_in( + queue, + time_in_seconds, + worker, + function_name \\ :perform, + args, + opts \\ [] + ) + + def enqueue_in( + queue, + time_in_seconds, + worker, + function_name, + args, + [] + ) do + message = %{ + schedule_in: time_in_seconds, + queue: queue, + worker: worker, + function_name: function_name, + args: args + } + + send(self(), message) + + {:ok, message} + end + + def enqueue_in( + queue, + time_in_seconds, + worker, + function_name, + args, + opts + ) do + message = %{ + schedule_in: time_in_seconds, + queue: queue, + worker: worker, + function_name: function_name, + args: args, + options: opts + } + + send(self(), message) + + {:ok, message} + end +end diff --git a/test/flume_test.exs b/test/flume_test.exs index c6d862e..030b729 100644 --- a/test/flume_test.exs +++ b/test/flume_test.exs @@ -1,6 +1,8 @@ defmodule FlumeTest do use Flume.TestWithRedis + import Flume.Mock + alias Flume.Redis.Job alias Flume.{Config, Pipeline, JobFactory} alias Flume.Pipeline.Event.{ProducerConsumer, Consumer, Producer} @@ -155,4 +157,57 @@ defmodule FlumeTest do GenStage.stop(producer) end end + + describe "enqueue/4" do + test "mock works" do + with_flume_mock do + Flume.enqueue(:test, List, :last, [[1]]) + + assert_receive %{ + queue: :test, + worker: List, + function_name: :last, + args: [[1]] + } + end + end + end + + describe "enqueue_in/5" do + test "mock works" do + with_flume_mock do + Flume.enqueue_in(:test, 10, List, :last, [[1]]) + + assert_receive %{ + schedule_in: 10, + queue: :test, + worker: List, + function_name: :last, + args: [[1]] + } + end + end + end + + describe "bulk_enqueue/4" do + test "mock works" do + with_flume_mock do + Flume.bulk_enqueue( + :test, + [ + [List, "last", [[1]]], + [List, "last", [[2, 3]]] + ] + ) + + assert_receive %{ + queue: :test, + jobs: [ + [List, "last", [[1]]], + [List, "last", [[2, 3]]] + ] + } + end + end + end end