From 7ed4639c6fcebad6353c229c424515bea5081080 Mon Sep 17 00:00:00 2001 From: Isaac Whitfield Date: Wed, 3 Oct 2018 21:19:07 -0700 Subject: [PATCH] Introduce basic distribution support --- lib/cachex.ex | 62 ++++----- lib/cachex/actions/clear.ex | 9 +- lib/cachex/actions/empty.ex | 9 +- lib/cachex/actions/inspect.ex | 26 ++-- lib/cachex/actions/reset.ex | 14 ++- lib/cachex/errors.ex | 29 +++-- lib/cachex/options.ex | 25 ++++ lib/cachex/router.ex | 223 ++++++++++++++++++++++++++++++--- lib/cachex/services/janitor.ex | 2 +- lib/cachex/spec.ex | 6 + mix.exs | 1 + test/cachex/errors_test.exs | 9 ++ test/cachex/spec_test.exs | 1 + test/cachex_test.exs | 2 +- 14 files changed, 339 insertions(+), 79 deletions(-) diff --git a/lib/cachex.ex b/lib/cachex.ex index 1ad20ecb..8b88604c 100644 --- a/lib/cachex.ex +++ b/lib/cachex.ex @@ -77,7 +77,7 @@ defmodule Cachex do get: [ 2, 3 ], get_and_update: [ 3, 4 ], incr: [ 2, 3, 4 ], - inspect: [ 2 ], + inspect: [ 2, 3 ], invoke: [ 3, 4 ], keys: [ 1, 2 ], load: [ 2, 3 ], @@ -351,7 +351,7 @@ defmodule Cachex do """ @spec clear(cache, Keyword.t) :: { status, integer } def clear(cache, options \\ []) when is_list(options), - do: Router.dispatch(cache, { :clear, [ options ] }) + do: Router.call(cache, { :clear, [ options ] }) @doc """ Retrieves the number of unexpired records in a cache. @@ -372,7 +372,7 @@ defmodule Cachex do """ @spec count(cache, Keyword.t) :: { status, number } def count(cache, options \\ []) when is_list(options), - do: Router.dispatch(cache, { :count, [ options ] }) + do: Router.call(cache, { :count, [ options ] }) @doc """ Decrements an entry in the cache. @@ -430,7 +430,7 @@ defmodule Cachex do """ @spec del(cache, any, Keyword.t) :: { status, boolean } def del(cache, key, options \\ []) when is_list(options), - do: Router.dispatch(cache, { :del, [ key, options ] }) + do: Router.call(cache, { :del, [ key, options ] }) @doc """ Serializes a cache to a location on a filesystem. @@ -466,7 +466,7 @@ defmodule Cachex do @spec dump(cache, binary, Keyword.t) :: { status, any } def dump(cache, path, options \\ []) when is_binary(path) and is_list(options), - do: Router.dispatch(cache, { :dump, [ path, options ] }) + do: Router.call(cache, { :dump, [ path, options ] }) @doc """ Determines whether a cache contains any entries. @@ -488,7 +488,7 @@ defmodule Cachex do """ @spec empty?(cache, Keyword.t) :: { status, boolean } def empty?(cache, options \\ []) when is_list(options), - do: Router.dispatch(cache, { :empty?, [ options ] }) + do: Router.call(cache, { :empty?, [ options ] }) @doc """ Executes multiple functions in the context of a cache. @@ -542,7 +542,7 @@ defmodule Cachex do """ @spec exists?(cache, any, Keyword.t) :: { status, boolean } def exists?(cache, key, options \\ []) when is_list(options), - do: Router.dispatch(cache, { :exists?, [ key, options ] }) + do: Router.call(cache, { :exists?, [ key, options ] }) @doc """ Places an expiration time on an entry in a cache. @@ -566,7 +566,7 @@ defmodule Cachex do @spec expire(cache, any, number, Keyword.t) :: { status, boolean } def expire(cache, key, expiration, options \\ []) when (is_nil(expiration) or is_number(expiration)) and is_list(options), - do: Router.dispatch(cache, { :expire, [ key, expiration, options ] }) + do: Router.call(cache, { :expire, [ key, expiration, options ] }) @doc """ Updates an entry in a cache to expire at a given time. @@ -645,7 +645,7 @@ defmodule Cachex do Overseer.enforce(cache) do case fallback || fallback(cache(cache, :fallback), :default) do val when is_function(val) -> - Router.dispatch(cache, { :fetch, [ key, val, options ] }) + Router.call(cache, { :fetch, [ key, val, options ] }) _na -> error(:invalid_fallback) end @@ -667,7 +667,7 @@ defmodule Cachex do """ @spec get(cache, any, Keyword.t) :: { atom, any } def get(cache, key, options \\ []) when is_list(options), - do: Router.dispatch(cache, { :get, [ key, options ] }) + do: Router.call(cache, { :get, [ key, options ] }) @doc """ Retrieves and updates an entry in a cache. @@ -696,7 +696,7 @@ defmodule Cachex do @spec get_and_update(cache, any, function, Keyword.t) :: { :commit | :ignore, any } def get_and_update(cache, key, update_function, options \\ []) when is_function(update_function) and is_list(options), - do: Router.dispatch(cache, { :get_and_update, [ key, update_function, options ] }) + do: Router.call(cache, { :get_and_update, [ key, update_function, options ] }) @doc """ Retrieves a list of all entry keys from a cache. @@ -718,7 +718,7 @@ defmodule Cachex do """ @spec keys(cache, Keyword.t) :: { status, [ any ] } def keys(cache, options \\ []) when is_list(options), - do: Router.dispatch(cache, { :keys, [ options ] }) + do: Router.call(cache, { :keys, [ options ] }) @doc """ Increments an entry in the cache. @@ -751,7 +751,7 @@ defmodule Cachex do @spec incr(cache, any, integer, Keyword.t) :: { status, integer } def incr(cache, key, amount \\ 1, options \\ []) when is_integer(amount) and is_list(options), - do: Router.dispatch(cache, { :incr, [ key, amount, options ] }) + do: Router.call(cache, { :incr, [ key, amount, options ] }) @doc """ Inspects various aspects of a cache. @@ -848,9 +848,9 @@ defmodule Cachex do { :ok, 1328 } """ - @spec inspect(cache, atom | tuple) :: { status, any } - def inspect(cache, option), - do: Router.dispatch(cache, { :inspect, [ option ] }) + @spec inspect(cache, atom | tuple, Keyword.t) :: { status, any } + def inspect(cache, option, options \\ []), + do: Router.call(cache, { :inspect, [ option, options ] }) @doc """ Invokes a custom command against a cache entry. @@ -876,7 +876,7 @@ defmodule Cachex do """ @spec invoke(cache, atom, any, Keyword.t) :: any def invoke(cache, cmd, key, options \\ []) when is_list(options), - do: Router.dispatch(cache, { :invoke, [ cmd, key, options ] }) + do: Router.call(cache, { :invoke, [ cmd, key, options ] }) @doc """ Deserializes a cache from a location on a filesystem. @@ -911,7 +911,7 @@ defmodule Cachex do @spec load(cache, binary, Keyword.t) :: { status, any } def load(cache, path, options \\ []) when is_binary(path) and is_list(options), - do: Router.dispatch(cache, { :load, [ path, options ] }) + do: Router.call(cache, { :load, [ path, options ] }) @doc """ Removes an expiration time from an entry in a cache. @@ -946,7 +946,7 @@ defmodule Cachex do """ @spec purge(cache, Keyword.t) :: { status, number } def purge(cache, options \\ []) when is_list(options), - do: Router.dispatch(cache, { :purge, [ options ] }) + do: Router.call(cache, { :purge, [ options ] }) @doc """ Places an entry in a cache. @@ -975,7 +975,7 @@ defmodule Cachex do # TODO: maybe rename TTL to be expiration? @spec put(cache, any, any, Keyword.t) :: { status, boolean } def put(cache, key, value, options \\ []) when is_list(options), - do: Router.dispatch(cache, { :put, [ key, value, options ] }) + do: Router.call(cache, { :put, [ key, value, options ] }) @doc """ Places a batch of entries in a cache. @@ -1007,7 +1007,7 @@ defmodule Cachex do @spec put_many(cache, [ { any, any } ], Keyword.t) :: { status, boolean } def put_many(cache, pairs, options \\ []) when is_list(pairs) and is_list(options), - do: Router.dispatch(cache, { :put_many, [ pairs, options ] }) + do: Router.call(cache, { :put_many, [ pairs, options ] }) @doc """ Refreshes an expiration for an entry in a cache. @@ -1034,7 +1034,7 @@ defmodule Cachex do """ @spec refresh(cache, any, Keyword.t) :: { status, boolean } def refresh(cache, key, options \\ []) when is_list(options), - do: Router.dispatch(cache, { :refresh, [ key, options ] }) + do: Router.call(cache, { :refresh, [ key, options ] }) @doc """ Resets a cache by clearing the keyspace and restarting any hooks. @@ -1076,7 +1076,7 @@ defmodule Cachex do """ @spec reset(cache, Keyword.t) :: { status, true } def reset(cache, options \\ []) when is_list(options), - do: Router.dispatch(cache, { :reset, [ options ] }) + do: Router.call(cache, { :reset, [ options ] }) @doc """ Deprecated implementation delegate of `put/4`. @@ -1113,7 +1113,7 @@ defmodule Cachex do """ @spec size(cache, Keyword.t) :: { status, number } def size(cache, options \\ []) when is_list(options), - do: Router.dispatch(cache, { :size, [ options ] }) + do: Router.call(cache, { :size, [ options ] }) @doc """ Retrieves statistics about a cache. @@ -1139,7 +1139,7 @@ defmodule Cachex do """ @spec stats(cache, Keyword.t) :: { status, %{ } } def stats(cache, options \\ []) when is_list(options), - do: Router.dispatch(cache, { :stats, [ options ] }) + do: Router.call(cache, { :stats, [ options ] }) @doc """ Creates a `Stream` of entries in a cache. @@ -1187,7 +1187,7 @@ defmodule Cachex do @spec stream(cache, any, Keyword.t) :: { status, Enumerable.t } def stream(cache, query \\ Query.create(true), options \\ []) when is_list(options), - do: Router.dispatch(cache, { :stream, [ query, options ] }) + do: Router.call(cache, { :stream, [ query, options ] }) @doc """ Takes an entry from a cache. @@ -1210,7 +1210,7 @@ defmodule Cachex do """ @spec take(cache, any, Keyword.t) :: { status, any } def take(cache, key, options \\ []) when is_list(options), - do: Router.dispatch(cache, { :take, [ key, options ] }) + do: Router.call(cache, { :take, [ key, options ] }) @doc """ Updates the last write time on a cache entry. @@ -1220,7 +1220,7 @@ defmodule Cachex do """ @spec touch(cache, any, Keyword.t) :: { status, boolean } def touch(cache, key, options \\ []) when is_list(options), - do: Router.dispatch(cache, { :touch, [ key, options ] }) + do: Router.call(cache, { :touch, [ key, options ] }) @doc """ Executes multiple functions in the context of a transaction. @@ -1257,7 +1257,7 @@ defmodule Cachex do |> Overseer.update(&cache(&1, transactional: true)) end - Router.dispatch(trans_cache, { :transaction, [ keys, operation, options ] }) + Router.call(trans_cache, { :transaction, [ keys, operation, options ] }) end end @@ -1282,7 +1282,7 @@ defmodule Cachex do """ @spec ttl(cache, any, Keyword.t) :: { status, number } def ttl(cache, key, options \\ []) when is_list(options), - do: Router.dispatch(cache, { :ttl, [ key, options ] }) + do: Router.call(cache, { :ttl, [ key, options ] }) @doc """ Updates an entry in a cache. @@ -1308,7 +1308,7 @@ defmodule Cachex do """ @spec update(cache, any, any, Keyword.t) :: { status, any } def update(cache, key, value, options \\ []) when is_list(options), - do: Router.dispatch(cache, { :update, [ key, value, options ] }) + do: Router.call(cache, { :update, [ key, value, options ] }) ############### # Private API # diff --git a/lib/cachex/actions/clear.ex b/lib/cachex/actions/clear.ex index 2a6d71d1..725dc538 100644 --- a/lib/cachex/actions/clear.ex +++ b/lib/cachex/actions/clear.ex @@ -22,11 +22,16 @@ defmodule Cachex.Actions.Clear do This action executes inside a transaction to ensure that there are no keys under a lock - thus ensuring consistency (any locks are executed sequentially). """ - def execute(cache(name: name) = cache, _options) do + def execute(cache(name: name) = cache, options) do Locksmith.transaction(cache, [], fn -> + options = + options + |> Keyword.take([ :local ]) + |> Enum.concat(const(:notify_false)) + evicted = cache - |> Cachex.size(const(:notify_false)) + |> Cachex.size(options) |> handle_evicted true = :ets.delete_all_objects(name) diff --git a/lib/cachex/actions/empty.ex b/lib/cachex/actions/empty.ex index d0e50423..92059bde 100644 --- a/lib/cachex/actions/empty.ex +++ b/lib/cachex/actions/empty.ex @@ -21,8 +21,13 @@ defmodule Cachex.Actions.Empty do Internally this action is delegated through to the `size()` command and the returned numeric value is just "cast" to a boolean value. """ - def execute(cache() = cache, _options) do - { :ok, size } = Cachex.size(cache, const(:notify_false)) + def execute(cache() = cache, options) do + options = + options + |> Keyword.take([ :local ]) + |> Enum.concat(const(:notify_false)) + + { :ok, size } = Cachex.size(cache, options) { :ok, size == 0 } end end diff --git a/lib/cachex/actions/inspect.ex b/lib/cachex/actions/inspect.ex index 5cdceecd..c6bf54e5 100644 --- a/lib/cachex/actions/inspect.ex +++ b/lib/cachex/actions/inspect.ex @@ -20,7 +20,7 @@ defmodule Cachex.Actions.Inspect do @type option :: { :expired, :count } | { :expired, :keys } | { :janitor, :last } | { :memory, :bytes } | { :memory, :binary } | { :memory, :words } | - { :record, any } | :cache + { :entry, any } | :cache # pre-calculated memory size @memory_exponent :math.log(1024) @@ -51,13 +51,13 @@ defmodule Cachex.Actions.Inspect do There are many options broken up by function head, so please see the source commands for definition for further documentation. """ - def execute(cache, option) + def execute(cache, option, options) # Retrieves the internal state of the cache. # # This is relatively easy to get via other methods, but it's available here # as the "best" way for a developer to do so (outside of the internal API). - def execute(cache(name: name), :cache), + def execute(cache(name: name), :cache, _options), do: { :ok, Overseer.retrieve(name) } # Retrieves a raw entry from the cache table. @@ -65,7 +65,7 @@ defmodule Cachex.Actions.Inspect do # This is useful when you need access to a record which may have expired. If # the entry does not exist, a nil value will be returned instead. Expirations # are not taken into account (either lazily or otherwise) on this read call. - def execute(cache(name: name), { :entry, key }) do + def execute(cache(name: name), { :entry, key }, _options) do case :ets.lookup(name, key) do [ ] -> { :ok, nil } [e] -> { :ok, e } @@ -77,7 +77,7 @@ defmodule Cachex.Actions.Inspect do # The number of entries returned represents the number of records which will # be removed on the next run of the Janitor service. It does not track the # number of expired records which have already been purged or removed. - def execute(cache(name: name), { :expired, :count }), + def execute(cache(name: name), { :expired, :count }, _options), do: { :ok, :ets.select_count(name, Query.expired(true)) } # Returns the keys of expired entries currently inside the cache. @@ -85,7 +85,7 @@ defmodule Cachex.Actions.Inspect do # This is essentially the same as the definition above, except that it will # return the list of entry keys rather than just a count. Naturally this is # an expensive call and should really only be used when debugging. - def execute(cache(name: name), { :expired, :keys }), + def execute(cache(name: name), { :expired, :keys }, _options), do: { :ok, :ets.select(name, Query.expired(:key)) } # Returns information about the last run of the Janitor service. @@ -95,15 +95,15 @@ defmodule Cachex.Actions.Inspect do # schema is defined in the `Cachex.Services.Janitor` module. # # In the case the Janitor service is not running, an error will be returned. - def execute(cache() = cache, { :janitor, :last }), + def execute(cache() = cache, { :janitor, :last }, _options), do: Janitor.last_run(cache) # Retrieves the current size of the backing cache table in bytes. # # This should be treated as an estimation as it's rounded based on # the number of words used to maintain the cache. - def execute(cache() = cache, { :memory, :bytes }) do - { :ok, mem_words } = execute(cache, { :memory, :words }) + def execute(cache() = cache, { :memory, :bytes }, options) do + { :ok, mem_words } = execute(cache, { :memory, :words }, options) { :ok, mem_words * :erlang.system_info(:wordsize) } end @@ -111,8 +111,8 @@ defmodule Cachex.Actions.Inspect do # # This should be treated as an estimation as it's rounded based on the number # of words used to maintain the cache. - def execute(cache() = cache, { :memory, :binary }) do - { :ok, bytes } = execute(cache, { :memory, :bytes }) + def execute(cache() = cache, { :memory, :binary }, options) do + { :ok, bytes } = execute(cache, { :memory, :bytes }, options) { :ok, bytes_to_readable(bytes) } end @@ -120,11 +120,11 @@ defmodule Cachex.Actions.Inspect do # # It's unlikely the caller will want to use this directly, but as it's used # by other inspection methods there's no harm in exposing it in the API. - def execute(cache(name: name), { :memory, :words }), + def execute(cache(name: name), { :memory, :words }, _options), do: { :ok, :ets.info(name, :memory) } # Catch-all to return an error. - def execute(_cache, _option), + def execute(_cache, _option, _options), do: error(:invalid_option) # Converts a number of bytes to a binary representation. diff --git a/lib/cachex/actions/reset.ex b/lib/cachex/actions/reset.ex index ef11f482..5f33a817 100644 --- a/lib/cachex/actions/reset.ex +++ b/lib/cachex/actions/reset.ex @@ -36,7 +36,7 @@ defmodule Cachex.Actions.Reset do |> Keyword.get(:only, [ :cache, :hooks ]) |> List.wrap - reset_cache(cache, only) + reset_cache(cache, only, options) reset_hooks(cache, only, options) { :ok, true } @@ -52,8 +52,16 @@ defmodule Cachex.Actions.Reset do # A cache is only emptied if the `:cache` property appears in the list of # cache components to reset. If not provided, this will short circut and # leave the cache table exactly as-is. - defp reset_cache(cache, only), - do: :cache in only and Clear.execute(cache, const(:notify_false)) + defp reset_cache(cache, only, options) do + with true <- :cache in only do + options = + options + |> Keyword.take([ :local ]) + |> Enum.concat(const(:notify_false)) + + Clear.execute(cache, options) + end + end # Handles reset of cache hooks. # diff --git a/lib/cachex/errors.ex b/lib/cachex/errors.ex index f6e945e4..5f36554f 100644 --- a/lib/cachex/errors.ex +++ b/lib/cachex/errors.ex @@ -12,14 +12,16 @@ defmodule Cachex.Errors do than bloating blocks with potentially large error messages. """ @known_errors [ - :invalid_command, :invalid_expiration, - :invalid_fallback, :invalid_hook, - :invalid_limit, :invalid_match, - :invalid_name, :invalid_option, - :invalid_pairs, :invalid_warmer, - :janitor_disabled, :no_cache, - :non_numeric_value, :not_started, - :stats_disabled, :unreachable_file + :cross_slot, :invalid_command, + :invalid_expiration, :invalid_fallback, + :invalid_hook, :invalid_limit, + :invalid_match, :invalid_name, + :invalid_nodes, :invalid_option, + :invalid_pairs, :invalid_warmer, + :janitor_disabled, :no_cache, + :non_distributed, :non_numeric_value, + :not_started, :stats_disabled, + :unreachable_file ] ########## @@ -53,11 +55,10 @@ defmodule Cachex.Errors do Error identifiers should be atoms and should be contained in the list of errors returned by `known/0`. The return type from this function will always be a binary. - - If an invalid error identifer is provided, there will simply be an error due - to no matching function head (and this is intended). """ @spec long_form(atom) :: binary + def long_form(:cross_slot), + do: "Target keys do not live on the same node" def long_form(:invalid_command), do: "Invalid command definition provided" def long_form(:invalid_expiration), @@ -72,6 +73,8 @@ defmodule Cachex.Errors do do: "Invalid match specification provided" def long_form(:invalid_name), do: "Invalid cache name provided" + def long_form(:invalid_nodes), + do: "Invalid nodes list provided" def long_form(:invalid_option), do: "Invalid option syntax provided" def long_form(:invalid_pairs), @@ -82,6 +85,8 @@ defmodule Cachex.Errors do do: "Specified janitor process running" def long_form(:no_cache), do: "Specified cache not running" + def long_form(:non_distributed), + do: "Attempted to use a local function across nodes" def long_form(:non_numeric_value), do: "Attempted arithmetic operations on a non-numeric value" def long_form(:not_started), @@ -90,4 +95,6 @@ defmodule Cachex.Errors do do: "Stats are not enabled for the specified cache" def long_form(:unreachable_file), do: "Unable to access provided file path" + def long_form(error), + do: error end diff --git a/lib/cachex/options.ex b/lib/cachex/options.ex index a3d59dff..53344cf9 100644 --- a/lib/cachex/options.ex +++ b/lib/cachex/options.ex @@ -17,6 +17,7 @@ defmodule Cachex.Options do # option parser order @option_parsers [ :name, + :nodes, :limit, :hooks, :commands, @@ -260,6 +261,30 @@ defmodule Cachex.Options do defp parse_type(:name, name, _options), do: cache(name: name) + # Configures any nodes assigned to the cache. + # + # This will enforce a non-empty list of nodes, containing at least + # the current local node. The list will be deduplicated, and sorted + # to ensure a deterministic ordering across nodes. + defp parse_type(:nodes, cache, options) do + nodes = + options + |> Keyword.get(:nodes, [ ]) + |> Enum.concat([ node() ]) + |> Enum.uniq + |> Enum.sort + + valid = + nodes + |> List.delete(node()) + |> Enum.all?(&Node.connect/1) + + case valid do + false -> error(:invalid_nodes) + true -> cache(cache, [ nodes: nodes ]) + end + end + # Configures a cache based on transaction flags. # # This will simply configure the `:transactional` field in the cache diff --git a/lib/cachex/router.ex b/lib/cachex/router.ex index cfe5eef5..dd415318 100644 --- a/lib/cachex/router.ex +++ b/lib/cachex/router.ex @@ -15,8 +15,13 @@ defmodule Cachex.Router do alias Services.Informant alias Services.Overseer - # actions which don't notify hooks for some reason - @quiet_actions [ :inspect, :reset, :stats, :transaction ] + # import macro stuff + import Cachex.Errors + import Cachex.Spec + + ############## + # Public API # + ############## @doc """ Dispatches a call to an appropriate execution environment. @@ -25,7 +30,7 @@ defmodule Cachex.Router do names are runtime, when they can be guaranteed at compile time much more easily. """ - defmacro dispatch(cache, { action, _arguments } = call) do + defmacro call(cache, { action, _arguments } = call) do act_name = action |> Kernel.to_string @@ -47,23 +52,33 @@ defmodule Cachex.Router do This macro should not be called externally; the only reason it remains public is due to the code injected by the `dispatch/2` macro. """ - defmacro execute(cache, module, call) - - # Quietly executes actions which do not broadcast. - defmacro execute(cache, module, { action, arguments }) - when action in @quiet_actions do + defmacro execute(cache, module, call) do quote do - apply(unquote(module), :execute, [ unquote(cache) | unquote(arguments) ]) + current = node() + case unquote(cache) do + cache(nodes: [ ^current ]) -> + unquote(configure_local(cache, module, call)) + cache(nodes: remote_nodes) -> + unquote(configure_remote(cache, module, call, quote(do: remote_nodes))) + end end end - # Executes a cache action, broadcasting to all hooks. + ############### + # Private API # + ############### + + # Provides handling for local actions on this node. # - # Hooks will be notified before and after the exection of the action. Rather - # than relying on this being hand-written everywhere, this is automatically - # injected to all dispatched actions at compile time to ensure that all cache - # notifications will be handled automatically. - defmacro execute(cache, module, { _action, arguments } = call) do + # This will provide handling of notifications across hooks before and after + # the execution of an action. This is taken from code formerly in the old + # `Cachex.Actions` module, but has been moved here as it's more appopriate. + # + # If `notify` is set to false, notifications are disabled and the call is + # simply executed as is. If `via` is provided, you can override the handle + # passed to the hooks (useful for re-use of functions). An example of this + # is `decr/4` which simply calls `incr/4` with `via: { :decr, arguments }`. + defp configure_local(cache, module, { _action, arguments } = call) do quote do option = List.last(unquote(arguments)) notify = Keyword.get(option, :notify, true) @@ -84,4 +99,182 @@ defmodule Cachex.Router do result end end + + # actions based on a key + @keyed_actions [ + :del, :exists?, :expire, :fetch, :get, :get_and_update, + :incr, :invoke, :put, :refresh, :take, :touch, + :ttl, :update + ] + + # Provides handling to key-based actions distributed to remote nodes. + # + # The algorithm here is simple; hash the key and slot the value using JCH into + # the total number of slots available (i.e. the count of the nodes). If it comes + # out to the local node, just execute the local code, otherwise RPC the base call + # to the remote node, and just assume that it'll correctly handle it. + defp configure_remote(cache, module, { action, [ key | _ ] } = call, nodes) + when action in @keyed_actions do + quote do + unquote(call_slot(cache, module, call, nodes, slot_key(key, nodes))) + end + end + + # actions which merge outputs + @merge_actions [ + :clear, :count, :empty?, :keys, + :purge, :reset, :size, :stats + ] + + # Provides handling of cross-node actions distributed over remote nodes. + # + # This will do an RPC call across all nodes to fetch their results and merge + # them with the results on the local node. The hooks will only be notified + # on the local node, due to an annoying recursion issue when handling the + # same across all nodes - seems to provide better logic though. + defp configure_remote(cache, module, { action, arguments } = call, nodes) + when action in @merge_actions do + quote do + # all calls have options we can use + options = List.last(unquote(arguments)) + + results = + # can force local node setting local: true + case Keyword.get(options, :local) do + true -> [] + _any -> + # don't want to execute on the local node + other_nodes = List.delete(unquote(nodes), node()) + + # execute the call on all other nodes + { results, _ } = :rpc.multicall( + other_nodes, + unquote(module), + :execute, + [ unquote(cache) | unquote(arguments) ] + ) + + results + end + + # execution on the local node, using the local macros and then unpack + { :ok, result } = (unquote(configure_local(cache, module, call))) + + # results merge + merge_result = + results + |> Enum.map(&elem(&1, 1)) + |> Enum.reduce(result, fn + # lists are always joined up + (result, acc) when is_list(acc) -> + acc ++ result + + # numbers are always summed + (result, acc) when is_number(acc) -> + acc + result + + # booleans are just and-ed + (result, acc) when is_boolean(acc) -> + acc && result + + # maps are always merged + (result, acc) when is_map(acc) -> + Map.merge(acc, result) + end) + + # return after merge + { :ok, merge_result } + end + end + + # Provides handling of `:inspect` operations. + # + # These operations are guaranteed to run on the local nodes. + defp configure_remote(cache, module, { :inspect, _arguments } = call, _nodes), + do: configure_local(cache, module, call) + + # Provides handling of `:put_many` operations. + # + # These operations can only execute if their keys slot to the same remote nodes. + defp configure_remote(cache, module, { :put_many, _arguments } = call, nodes), + do: multi_call_slot(cache, module, call, nodes, quote(do: &elem(&1, 0))) + + # Provides handling of `:transaction` operations. + # + # These operations can only execute if their keys slot to the same remote nodes. + defp configure_remote(cache, module, { :transaction, [ keys | _ ] } = call, nodes) do + case keys do + [] -> configure_local(cache, module, call) + __ -> multi_call_slot(cache, module, call, nodes, quote(do: &(&1))) + end + end + + # Any other actions are explicitly disabled in distributed environments. + defp configure_remote(_cache, _module, _call, _nodes), + do: error(:non_distributed) + + # Calls a slot for the provided cache action. + # + # This will determine a local slot and delegate locally if so, bypassing + # any RPC calls required. This function currently assumes that there is + # a local variable available named "remote_nodes" and "slot", until I + # figure out how to better improve the macro scoping in use locally. + defp call_slot(cache, module, { action, arguments } = call, nodes, slot) do + quote do + case Enum.at(unquote(nodes), unquote(slot)) do + ^current -> + unquote(configure_local(cache, module, call)) + + targeted -> + result = :rpc.call( + targeted, + Cachex, + unquote(action), + [ cache(unquote(cache), :name) | unquote(arguments) ] + ) + + with { :badrpc, reason } <- result do + { :error, reason } + end + end + end + end + + # Calls a slot for the provided cache action if all keys slot to the same node. + # + # This is a delegate handler for `call_slot/5`, but ensures that all keys slot to the + # same node to avoid the case where we have to fork a call out internally. + defp multi_call_slot(cache, module, { _action, [ keys | _ ] } = call, nodes, mapper) do + quote do + # map all keys to a slot in the nodes list + slots = Enum.map(unquote(keys), fn(key) -> + # basically just slot_key(mapper.(key), nodes) + unquote(slot_key(quote(do: unquote(mapper).(key)), nodes)) + end) + + # unique to avoid dups + case Enum.uniq(slots) do + # if there's a single slot it's safe to continue with the call to the remote + [ slot ] -> unquote(call_slot(cache, module, call, nodes, quote(do: slot))) + + # otherwise, cross_slot errors! + _disable -> error(:cross_slot) + end + end + end + + # Slots a key into the list of provided nodes. + # + # This uses `:erlang.phash2/1` to hash the key to a numeric value, + # as keys can be basically any type - so others hashes would be + # more expensive due to the serialization costs. Note that the + # collision possibility isn't really relevant, as long as there's + # a uniformly random collision possibility. + defp slot_key(key, nodes) do + quote do + unquote(key) + |> :erlang.phash2 + |> Jumper.slot(length(unquote(nodes))) + end + end end diff --git a/lib/cachex/services/janitor.ex b/lib/cachex/services/janitor.ex index 05feab8e..339ca229 100644 --- a/lib/cachex/services/janitor.ex +++ b/lib/cachex/services/janitor.ex @@ -105,7 +105,7 @@ defmodule Cachex.Services.Janitor do new_caches = Overseer.retrieve(name) { duration, { :ok, count } = result } = :timer.tc(fn -> - Cachex.purge(new_caches) + Cachex.purge(new_caches, const(:local)) end) case count do diff --git a/lib/cachex/spec.ex b/lib/cachex/spec.ex index 3a155f9b..a5b12d43 100644 --- a/lib/cachex/spec.ex +++ b/lib/cachex/spec.ex @@ -31,6 +31,7 @@ defmodule Cachex.Spec do fallback: fallback, hooks: hooks, limit: limit, + nodes: [ atom ], transactional: boolean, warmers: [ warmer ] ) @@ -107,6 +108,7 @@ defmodule Cachex.Spec do fallback: nil, hooks: nil, limit: nil, + nodes: [], transactional: false, warmers: [] @@ -314,6 +316,10 @@ defmodule Cachex.Spec do @spec const(atom) :: any defmacro const(key) + # Constant to only run locally. + defmacro const(:local), + do: quote(do: [ local: true ]) + # Constant to disable hook notifications. defmacro const(:notify_false), do: quote(do: [ notify: false ]) diff --git a/mix.exs b/mix.exs index 7e693394..a200cc1d 100644 --- a/mix.exs +++ b/mix.exs @@ -92,6 +92,7 @@ defmodule Cachex.Mixfile do [ # Production dependencies { :eternal, "~> 1.2" }, + { :jumper, "~> 1.0" }, { :unsafe, "~> 1.0" }, # Testing dependencies { :excoveralls, "~> 0.8", optional: true, only: [ :cover ] }, diff --git a/test/cachex/errors_test.exs b/test/cachex/errors_test.exs index aea59f4f..824e0aa0 100644 --- a/test/cachex/errors_test.exs +++ b/test/cachex/errors_test.exs @@ -6,6 +6,7 @@ defmodule Cachex.ErrorsTest do test "error generation and mapping" do # define all recognised errors errors = [ + cross_slot: "Target keys do not live on the same node", invalid_command: "Invalid command definition provided", invalid_expiration: "Invalid expiration definition provided", invalid_fallback: "Invalid fallback function provided", @@ -13,12 +14,14 @@ defmodule Cachex.ErrorsTest do invalid_limit: "Invalid limit fields provided", invalid_match: "Invalid match specification provided", invalid_name: "Invalid cache name provided", + invalid_nodes: "Invalid nodes list provided", invalid_option: "Invalid option syntax provided", invalid_pairs: "Invalid insertion pairs provided", invalid_warmer: "Invalid warmer definition provided", janitor_disabled: "Specified janitor process running", no_cache: "Specified cache not running", non_numeric_value: "Attempted arithmetic operations on a non-numeric value", + non_distributed: "Attempted to use a local function across nodes", not_started: "Cache table not active, have you started the Cachex application?", stats_disabled: "Stats are not enabled for the specified cache", unreachable_file: "Unable to access provided file path" @@ -36,4 +39,10 @@ defmodule Cachex.ErrorsTest do # make sure we're not missing any error definitions assert(length(Cachex.Errors.known()) == length(errors)) end + + # This just ensures that unrecognised errors are simply + # echoed back without change, in case of unknown errors. + test "unknown error echoing" do + assert(Cachex.Errors.long_form(:nodedown) == :nodedown) + end end diff --git a/test/cachex/spec_test.exs b/test/cachex/spec_test.exs index 13d5ffd9..e2e55cd3 100644 --- a/test/cachex/spec_test.exs +++ b/test/cachex/spec_test.exs @@ -23,6 +23,7 @@ defmodule Cachex.SpecTest do do: assert limit() == { :limit, nil, Cachex.Policy.LRW, 0.1, [] } test "generating constants via macros" do + assert const(:local) == [ local: true ] assert const(:notify_false) == [ notify: false ] assert const(:purge_override_call) == { :purge, [[]] } assert const(:purge_override_result) == { :ok, 1 } diff --git a/test/cachex_test.exs b/test/cachex_test.exs index 84f2c16b..1fcf8043 100644 --- a/test/cachex_test.exs +++ b/test/cachex_test.exs @@ -177,7 +177,7 @@ defmodule CachexTest do assert(is_even(length(definitions))) # verify the size to cause errors on addition/removal - assert(length(definitions) == 142) + assert(length(definitions) == 144) # validate all definitions for { name, arity } <- definitions do