From 1324eee2f493e17fc4f8019cb8a2df79bc5ef316 Mon Sep 17 00:00:00 2001 From: Vasu Adari Date: Tue, 26 Nov 2019 14:59:50 +0530 Subject: [PATCH 1/6] Add mock module for queue APIs --- lib/flume/api.ex | 42 +--------------- lib/flume/queue/api.ex | 54 +++++++++++++++++++++ lib/flume/queue/mock_api.ex | 97 +++++++++++++++++++++++++++++++++++++ 3 files changed, 152 insertions(+), 41 deletions(-) create mode 100644 lib/flume/queue/api.ex create mode 100644 lib/flume/queue/mock_api.ex diff --git a/lib/flume/api.ex b/lib/flume/api.ex index 5a1e1c7..bb25c20 100644 --- a/lib/flume/api.ex +++ b/lib/flume/api.ex @@ -5,52 +5,12 @@ defmodule Flume.API do alias Flume.{Config, Pipeline} alias Flume.Queue.Manager - def enqueue( - queue, - worker, - function_name \\ :perform, - args, - opts \\ [] - ) do - Manager.enqueue( - namespace(), - queue, - worker, - function_name, - args, - opts - ) - end - - def bulk_enqueue(queue, jobs, opts \\ []) do - Manager.bulk_enqueue(namespace(), queue, jobs, opts) - end - - def enqueue_in( - queue, - time_in_seconds, - worker, - function_name \\ :perform, - args, - opts \\ [] - ) do - Manager.enqueue_in( - namespace(), - queue, - time_in_seconds, - worker, - function_name, - args, - opts - ) - end + import Flume.Queue.API def pause_all(temporary \\ true) do Config.pipeline_names() |> Enum.map(&pause(&1, temporary)) end - defp namespace, do: Config.namespace() - defdelegate pause(pipeline_name, temporary \\ true), to: Pipeline.Event defdelegate resume(pipeline_name, temporary \\ true), to: Pipeline.Event diff --git a/lib/flume/queue/api.ex b/lib/flume/queue/api.ex new file mode 100644 index 0000000..9ffcb79 --- /dev/null +++ b/lib/flume/queue/api.ex @@ -0,0 +1,54 @@ +defmodule Flume.Queue.API do + @callback bulk_enqueue(String.t(), [any()], [any()]) :: {:ok, term} | {:error, String.t()} + + @callback enqueue(String.t(), Atom.t(), Atom.t(), [any()], [any()]) :: + {:ok, term} | {:error, String.t()} + + @callback enqueue_in(String.t(), integer, Atom.t(), Atom.t(), [any()], [any()]) :: + {:ok, term} | {:error, String.t()} + + alias Flume.Config + alias Flume.Queue.Manager + + def bulk_enqueue(queue, jobs, opts \\ []) do + Manager.bulk_enqueue(namespace(), queue, jobs, opts) + end + + def enqueue( + queue, + worker, + function_name \\ :perform, + args, + opts \\ [] + ) do + Manager.enqueue( + namespace(), + queue, + worker, + function_name, + args, + opts + ) + end + + def enqueue_in( + queue, + time_in_seconds, + worker, + function_name \\ :perform, + args, + opts \\ [] + ) do + Manager.enqueue_in( + namespace(), + queue, + time_in_seconds, + worker, + function_name, + args, + opts + ) + end + + defp namespace, do: Config.namespace() +end diff --git a/lib/flume/queue/mock_api.ex b/lib/flume/queue/mock_api.ex new file mode 100644 index 0000000..e69420c --- /dev/null +++ b/lib/flume/queue/mock_api.ex @@ -0,0 +1,97 @@ +defmodule Flume.Queue.MockAPI do + @behaviour Flume.Queue.API + + def bulk_enqueue(queue, jobs, opts \\ []) + + def bulk_enqueue(queue, jobs, []) do + send(self(), %{queue: queue, jobs: jobs}) + end + + def bulk_enqueue(queue, jobs, opts) do + send(self(), %{queue: queue, jobs: jobs, options: opts}) + end + + def enqueue( + queue, + worker, + function_name \\ :perform, + args, + opts \\ [] + ) + + def enqueue( + queue, + worker, + function_name, + args, + [] + ) do + send( + self(), + %{queue: queue, worker: worker, function_name: function_name, args: args} + ) + end + + def enqueue( + queue, + worker, + function_name, + args, + opts + ) do + send( + self(), + %{queue: queue, worker: worker, function_name: function_name, args: args, options: opts} + ) + end + + def enqueue_in( + queue, + time_in_seconds, + worker, + function_name \\ :perform, + args, + opts \\ [] + ) + + def enqueue_in( + queue, + time_in_seconds, + worker, + function_name, + args, + [] + ) do + send( + self(), + %{ + schedule_in: time_in_seconds, + queue: queue, + worker: worker, + function_name: function_name, + args: args + } + ) + end + + def enqueue_in( + queue, + time_in_seconds, + worker, + function_name, + args, + opts + ) do + send( + self(), + %{ + schedule_in: time_in_seconds, + queue: queue, + worker: worker, + function_name: function_name, + args: args, + options: opts + } + ) + end +end From 397762808e617a6cc70f399cc9e88c9ae21760b5 Mon Sep 17 00:00:00 2001 From: Vasu Adari Date: Tue, 26 Nov 2019 15:11:03 +0530 Subject: [PATCH 2/6] Add config to enable mocking in test env. --- lib/flume/api.ex | 32 +++++++++++++++++++++- lib/flume/config.ex | 11 ++++++++ lib/flume/queue/mock_api.ex | 54 +++++++++++++++++++++---------------- 3 files changed, 73 insertions(+), 24 deletions(-) diff --git a/lib/flume/api.ex b/lib/flume/api.ex index bb25c20..c45964a 100644 --- a/lib/flume/api.ex +++ b/lib/flume/api.ex @@ -5,7 +5,37 @@ defmodule Flume.API do alias Flume.{Config, Pipeline} alias Flume.Queue.Manager - import Flume.Queue.API + def bulk_enqueue(queue, jobs) do + apply(Flume.Config.queue_api_module(), :bulk_enqueue, [queue, jobs]) + end + + def bulk_enqueue(queue, jobs, opts) do + apply(Flume.Config.queue_api_module(), :bulk_enqueue, [queue, jobs, opts]) + end + + def enqueue(queue, worker, args) do + apply(Flume.Config.queue_api_module(), :enqueue, [queue, worker, args]) + end + + def enqueue(queue, worker, function_name, args) do + apply(Flume.Config.queue_api_module(), :enqueue, [queue, worker, function_name, args]) + end + + def enqueue(queue, worker, function_name, args, opts) do + apply(Flume.Config.queue_api_module(), :enqueue, [queue, worker, function_name, args, opts]) + end + + def enqueue_in(queue, time_in_seconds, worker, args) do + apply(Flume.Config.queue_api_module(), :enqueue_in, [queue, time_in_seconds, worker, args]) + end + + def enqueue_in(queue, time_in_seconds, worker, function_name, args) do + apply(Flume.Config.queue_api_module(), :enqueue_in, [queue, time_in_seconds, worker, function_name, args]) + end + + def enqueue_in(queue, time_in_seconds, worker, function_name, args, opts) do + apply(Flume.Config.queue_api_module(), :enqueue_in, [queue, time_in_seconds, worker, function_name, args, opts]) + end def pause_all(temporary \\ true) do Config.pipeline_names() |> Enum.map(&pause(&1, temporary)) diff --git a/lib/flume/config.ex b/lib/flume/config.ex index ae00650..07475fb 100644 --- a/lib/flume/config.ex +++ b/lib/flume/config.ex @@ -5,6 +5,7 @@ defmodule Flume.Config do database: 0, host: "127.0.0.1", logger: Flume.DefaultLogger, + mock: false, max_retries: 5, name: Flume, namespace: "flume", @@ -92,4 +93,14 @@ defmodule Flume.Config do def queues, do: Enum.map(pipelines(), & &1.queue) def pipeline_names, do: Enum.map(pipelines(), & &1.name) + + def queue_api_module do + case mock() do + false -> + Flume.Queue.API + + true -> + Flume.Queue.MockAPI + end + end end diff --git a/lib/flume/queue/mock_api.ex b/lib/flume/queue/mock_api.ex index e69420c..daafab9 100644 --- a/lib/flume/queue/mock_api.ex +++ b/lib/flume/queue/mock_api.ex @@ -4,7 +4,11 @@ defmodule Flume.Queue.MockAPI do def bulk_enqueue(queue, jobs, opts \\ []) def bulk_enqueue(queue, jobs, []) do - send(self(), %{queue: queue, jobs: jobs}) + message = %{queue: queue, jobs: jobs} + + send(self(), message) + + {:ok, message} end def bulk_enqueue(queue, jobs, opts) do @@ -26,10 +30,11 @@ defmodule Flume.Queue.MockAPI do args, [] ) do - send( - self(), - %{queue: queue, worker: worker, function_name: function_name, args: args} - ) + message = %{queue: queue, worker: worker, function_name: function_name, args: args} + + send(self(), message) + + {:ok, message} end def enqueue( @@ -39,10 +44,11 @@ defmodule Flume.Queue.MockAPI do args, opts ) do - send( - self(), - %{queue: queue, worker: worker, function_name: function_name, args: args, options: opts} - ) + message = %{queue: queue, worker: worker, function_name: function_name, args: args, options: opts} + + send(self(), message) + + {:ok, message} end def enqueue_in( @@ -62,16 +68,17 @@ defmodule Flume.Queue.MockAPI do args, [] ) do - send( - self(), - %{ - schedule_in: time_in_seconds, - queue: queue, - worker: worker, - function_name: function_name, - args: args - } - ) + message = %{ + schedule_in: time_in_seconds, + queue: queue, + worker: worker, + function_name: function_name, + args: args + } + + send(self(), message) + + {:ok, message} end def enqueue_in( @@ -82,9 +89,7 @@ defmodule Flume.Queue.MockAPI do args, opts ) do - send( - self(), - %{ + message = %{ schedule_in: time_in_seconds, queue: queue, worker: worker, @@ -92,6 +97,9 @@ defmodule Flume.Queue.MockAPI do args: args, options: opts } - ) + + send(self(), message) + + {:ok, message} end end From a79b2784b0fd3505bce64c66e07cd4c824bf520d Mon Sep 17 00:00:00 2001 From: Vasu Adari Date: Wed, 27 Nov 2019 12:47:12 +0530 Subject: [PATCH 3/6] Add a test helper to mock Flume --- README.md | 54 ++++++++++++++++++++++++++++++++++++++------ lib/flume/mock.ex | 27 ++++++++++++++++++++++ test/flume_test.exs | 55 +++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 129 insertions(+), 7 deletions(-) create mode 100644 lib/flume/mock.ex diff --git a/README.md b/README.md index c573fb6..ace7b83 100644 --- a/README.md +++ b/README.md @@ -18,7 +18,7 @@ Flume is a job processing system backed by [GenStage](https://github.com/elixir- - [Batch Processing](#batch-processing) - [Pipeline Control](#pipeline-control) - [Instrumentation](#instrumentation) -- [Testing](#testing) +- [Writing Tests](#writing-tests) - [Roadmap](#roadmap) - [References](#references) - [Contributing](#contributing) @@ -329,17 +329,57 @@ Following metrics are emitted: - duration of a job/worker - count, latency and payload_size of dequeued jobs -## Testing +## Writing Tests -Use these guidelines for running tests: - -* Disable flume pipelines in test env +**To enable mock in the test environment** **config/test.exs** ```elixir -config :flume, - pipelines: [] +config :flume, mock: true +``` + +**To mock individual test** + +```elixir +import Flume.Mock +... +describe "enqueue/4" do + test "mock works" do + with_flume_mock do + Flume.enqueue(:test, List, :last, [[1]]) + + assert_receive %{ + queue: :test, + worker: List, + function_name: :last, + args: [[1]] + } + end + end +end +``` + +**To enable mock for all tests in a module** + +```elixir +defmodule ListTest do + use ExUnit.Case, async: true + use Flume.Mock + + describe "enqueue/4" do + test "mock works" do + Flume.enqueue(:test, List, :last, [[1]]) + + assert_receive %{ + queue: :test, + worker: List, + function_name: :last, + args: [[1]] + } + end + end +end ``` ## Roadmap diff --git a/lib/flume/mock.ex b/lib/flume/mock.ex new file mode 100644 index 0000000..d234977 --- /dev/null +++ b/lib/flume/mock.ex @@ -0,0 +1,27 @@ +defmodule Flume.Mock do + defmacro __using__(_) do + quote do + setup _mock do + Application.put_env(:flume, :mock, true) + + on_exit(fn -> + Application.put_env(:flume, :mock, false) + end) + + :ok + end + end + end + + defmacro with_flume_mock(do: yield) do + quote do + Application.put_env(:flume, :mock, true) + + on_exit(fn -> + Application.put_env(:flume, :mock, false) + end) + + unquote(yield) + end + end +end diff --git a/test/flume_test.exs b/test/flume_test.exs index c6d862e..030b729 100644 --- a/test/flume_test.exs +++ b/test/flume_test.exs @@ -1,6 +1,8 @@ defmodule FlumeTest do use Flume.TestWithRedis + import Flume.Mock + alias Flume.Redis.Job alias Flume.{Config, Pipeline, JobFactory} alias Flume.Pipeline.Event.{ProducerConsumer, Consumer, Producer} @@ -155,4 +157,57 @@ defmodule FlumeTest do GenStage.stop(producer) end end + + describe "enqueue/4" do + test "mock works" do + with_flume_mock do + Flume.enqueue(:test, List, :last, [[1]]) + + assert_receive %{ + queue: :test, + worker: List, + function_name: :last, + args: [[1]] + } + end + end + end + + describe "enqueue_in/5" do + test "mock works" do + with_flume_mock do + Flume.enqueue_in(:test, 10, List, :last, [[1]]) + + assert_receive %{ + schedule_in: 10, + queue: :test, + worker: List, + function_name: :last, + args: [[1]] + } + end + end + end + + describe "bulk_enqueue/4" do + test "mock works" do + with_flume_mock do + Flume.bulk_enqueue( + :test, + [ + [List, "last", [[1]]], + [List, "last", [[2, 3]]] + ] + ) + + assert_receive %{ + queue: :test, + jobs: [ + [List, "last", [[1]]], + [List, "last", [[2, 3]]] + ] + } + end + end + end end From 91a9fa41e2672ea29a15b81622b6244445a9cb74 Mon Sep 17 00:00:00 2001 From: Vasu Adari Date: Wed, 27 Nov 2019 16:57:23 +0530 Subject: [PATCH 4/6] Avoid starting supervisors when mock is enabled --- lib/flume.ex | 23 +++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/lib/flume.ex b/lib/flume.ex index 56dcf14..0c8307f 100644 --- a/lib/flume.ex +++ b/lib/flume.ex @@ -16,16 +16,19 @@ defmodule Flume do end def start_link do - children = [ - supervisor(Flume.Redis.Supervisor, []), - worker(Flume.Queue.Scheduler, [Config.scheduler_opts()]), - supervisor(Flume.Pipeline.SystemEvent.Supervisor, []), - supervisor(Task.Supervisor, [[name: Flume.SafeApplySupervisor]]) - ] - - # This order matters, first we need to start all redix worker processes - # then all other processes. - children = children ++ Flume.Support.Pipelines.list() + children = + if Config.mock() do + [] + else + # This order matters, first we need to start all redix worker processes + # then all other processes. + [ + supervisor(Flume.Redis.Supervisor, []), + worker(Flume.Queue.Scheduler, [Config.scheduler_opts()]), + supervisor(Flume.Pipeline.SystemEvent.Supervisor, []), + supervisor(Task.Supervisor, [[name: Flume.SafeApplySupervisor]]) + ] ++ Flume.Support.Pipelines.list() + end opts = [ strategy: :one_for_one, From 85caecdf88a0e92eb13af938abf2406e41ea1390 Mon Sep 17 00:00:00 2001 From: Vasu Adari Date: Tue, 3 Dec 2019 19:36:52 +0530 Subject: [PATCH 5/6] Move queue API implementation to DefaultAPI module --- lib/flume/config.ex | 2 +- lib/flume/queue/api.ex | 45 ------------------------------- lib/flume/queue/default_api.ex | 48 ++++++++++++++++++++++++++++++++++ 3 files changed, 49 insertions(+), 46 deletions(-) create mode 100644 lib/flume/queue/default_api.ex diff --git a/lib/flume/config.ex b/lib/flume/config.ex index 07475fb..c3881c6 100644 --- a/lib/flume/config.ex +++ b/lib/flume/config.ex @@ -97,7 +97,7 @@ defmodule Flume.Config do def queue_api_module do case mock() do false -> - Flume.Queue.API + Flume.Queue.DefaultAPI true -> Flume.Queue.MockAPI diff --git a/lib/flume/queue/api.ex b/lib/flume/queue/api.ex index 9ffcb79..84c1127 100644 --- a/lib/flume/queue/api.ex +++ b/lib/flume/queue/api.ex @@ -6,49 +6,4 @@ defmodule Flume.Queue.API do @callback enqueue_in(String.t(), integer, Atom.t(), Atom.t(), [any()], [any()]) :: {:ok, term} | {:error, String.t()} - - alias Flume.Config - alias Flume.Queue.Manager - - def bulk_enqueue(queue, jobs, opts \\ []) do - Manager.bulk_enqueue(namespace(), queue, jobs, opts) - end - - def enqueue( - queue, - worker, - function_name \\ :perform, - args, - opts \\ [] - ) do - Manager.enqueue( - namespace(), - queue, - worker, - function_name, - args, - opts - ) - end - - def enqueue_in( - queue, - time_in_seconds, - worker, - function_name \\ :perform, - args, - opts \\ [] - ) do - Manager.enqueue_in( - namespace(), - queue, - time_in_seconds, - worker, - function_name, - args, - opts - ) - end - - defp namespace, do: Config.namespace() end diff --git a/lib/flume/queue/default_api.ex b/lib/flume/queue/default_api.ex new file mode 100644 index 0000000..adfd5dc --- /dev/null +++ b/lib/flume/queue/default_api.ex @@ -0,0 +1,48 @@ +defmodule Flume.Queue.DefaultAPI do + @behaviour Flume.Queue.API + + alias Flume.Config + alias Flume.Queue.Manager + + def bulk_enqueue(queue, jobs, opts \\ []) do + Manager.bulk_enqueue(namespace(), queue, jobs, opts) + end + + def enqueue( + queue, + worker, + function_name \\ :perform, + args, + opts \\ [] + ) do + Manager.enqueue( + namespace(), + queue, + worker, + function_name, + args, + opts + ) + end + + def enqueue_in( + queue, + time_in_seconds, + worker, + function_name \\ :perform, + args, + opts \\ [] + ) do + Manager.enqueue_in( + namespace(), + queue, + time_in_seconds, + worker, + function_name, + args, + opts + ) + end + + defp namespace, do: Config.namespace() +end From 83065d28c7f555a93971f889f4fe647c2ddd764b Mon Sep 17 00:00:00 2001 From: Vasu Adari Date: Tue, 3 Dec 2019 19:39:32 +0530 Subject: [PATCH 6/6] Fix formatting using `mix format` --- lib/flume/api.ex | 25 ++++++++++++++++++++++--- lib/flume/queue/mock_api.ex | 22 ++++++++++++++-------- 2 files changed, 36 insertions(+), 11 deletions(-) diff --git a/lib/flume/api.ex b/lib/flume/api.ex index c45964a..672b9f3 100644 --- a/lib/flume/api.ex +++ b/lib/flume/api.ex @@ -22,7 +22,13 @@ defmodule Flume.API do end def enqueue(queue, worker, function_name, args, opts) do - apply(Flume.Config.queue_api_module(), :enqueue, [queue, worker, function_name, args, opts]) + apply(Flume.Config.queue_api_module(), :enqueue, [ + queue, + worker, + function_name, + args, + opts + ]) end def enqueue_in(queue, time_in_seconds, worker, args) do @@ -30,11 +36,24 @@ defmodule Flume.API do end def enqueue_in(queue, time_in_seconds, worker, function_name, args) do - apply(Flume.Config.queue_api_module(), :enqueue_in, [queue, time_in_seconds, worker, function_name, args]) + apply(Flume.Config.queue_api_module(), :enqueue_in, [ + queue, + time_in_seconds, + worker, + function_name, + args + ]) end def enqueue_in(queue, time_in_seconds, worker, function_name, args, opts) do - apply(Flume.Config.queue_api_module(), :enqueue_in, [queue, time_in_seconds, worker, function_name, args, opts]) + apply(Flume.Config.queue_api_module(), :enqueue_in, [ + queue, + time_in_seconds, + worker, + function_name, + args, + opts + ]) end def pause_all(temporary \\ true) do diff --git a/lib/flume/queue/mock_api.ex b/lib/flume/queue/mock_api.ex index daafab9..da2c47c 100644 --- a/lib/flume/queue/mock_api.ex +++ b/lib/flume/queue/mock_api.ex @@ -44,7 +44,13 @@ defmodule Flume.Queue.MockAPI do args, opts ) do - message = %{queue: queue, worker: worker, function_name: function_name, args: args, options: opts} + message = %{ + queue: queue, + worker: worker, + function_name: function_name, + args: args, + options: opts + } send(self(), message) @@ -90,13 +96,13 @@ defmodule Flume.Queue.MockAPI do opts ) do message = %{ - schedule_in: time_in_seconds, - queue: queue, - worker: worker, - function_name: function_name, - args: args, - options: opts - } + schedule_in: time_in_seconds, + queue: queue, + worker: worker, + function_name: function_name, + args: args, + options: opts + } send(self(), message)