Skip to content

Commit

Permalink
Introduce basic distribution support
Browse files Browse the repository at this point in the history
  • Loading branch information
whitfin committed Oct 4, 2018
1 parent dfd2aef commit 7ed4639
Show file tree
Hide file tree
Showing 14 changed files with 339 additions and 79 deletions.
62 changes: 31 additions & 31 deletions lib/cachex.ex
Expand Up @@ -77,7 +77,7 @@ defmodule Cachex do
get: [ 2, 3 ],
get_and_update: [ 3, 4 ],
incr: [ 2, 3, 4 ],
inspect: [ 2 ],
inspect: [ 2, 3 ],
invoke: [ 3, 4 ],
keys: [ 1, 2 ],
load: [ 2, 3 ],
Expand Down Expand Up @@ -351,7 +351,7 @@ defmodule Cachex do
"""
@spec clear(cache, Keyword.t) :: { status, integer }
def clear(cache, options \\ []) when is_list(options),
do: Router.dispatch(cache, { :clear, [ options ] })
do: Router.call(cache, { :clear, [ options ] })

@doc """
Retrieves the number of unexpired records in a cache.
Expand All @@ -372,7 +372,7 @@ defmodule Cachex do
"""
@spec count(cache, Keyword.t) :: { status, number }
def count(cache, options \\ []) when is_list(options),
do: Router.dispatch(cache, { :count, [ options ] })
do: Router.call(cache, { :count, [ options ] })

@doc """
Decrements an entry in the cache.
Expand Down Expand Up @@ -430,7 +430,7 @@ defmodule Cachex do
"""
@spec del(cache, any, Keyword.t) :: { status, boolean }
def del(cache, key, options \\ []) when is_list(options),
do: Router.dispatch(cache, { :del, [ key, options ] })
do: Router.call(cache, { :del, [ key, options ] })

@doc """
Serializes a cache to a location on a filesystem.
Expand Down Expand Up @@ -466,7 +466,7 @@ defmodule Cachex do
@spec dump(cache, binary, Keyword.t) :: { status, any }
def dump(cache, path, options \\ [])
when is_binary(path) and is_list(options),
do: Router.dispatch(cache, { :dump, [ path, options ] })
do: Router.call(cache, { :dump, [ path, options ] })

@doc """
Determines whether a cache contains any entries.
Expand All @@ -488,7 +488,7 @@ defmodule Cachex do
"""
@spec empty?(cache, Keyword.t) :: { status, boolean }
def empty?(cache, options \\ []) when is_list(options),
do: Router.dispatch(cache, { :empty?, [ options ] })
do: Router.call(cache, { :empty?, [ options ] })

@doc """
Executes multiple functions in the context of a cache.
Expand Down Expand Up @@ -542,7 +542,7 @@ defmodule Cachex do
"""
@spec exists?(cache, any, Keyword.t) :: { status, boolean }
def exists?(cache, key, options \\ []) when is_list(options),
do: Router.dispatch(cache, { :exists?, [ key, options ] })
do: Router.call(cache, { :exists?, [ key, options ] })

@doc """
Places an expiration time on an entry in a cache.
Expand All @@ -566,7 +566,7 @@ defmodule Cachex do
@spec expire(cache, any, number, Keyword.t) :: { status, boolean }
def expire(cache, key, expiration, options \\ [])
when (is_nil(expiration) or is_number(expiration)) and is_list(options),
do: Router.dispatch(cache, { :expire, [ key, expiration, options ] })
do: Router.call(cache, { :expire, [ key, expiration, options ] })

@doc """
Updates an entry in a cache to expire at a given time.
Expand Down Expand Up @@ -645,7 +645,7 @@ defmodule Cachex do
Overseer.enforce(cache) do
case fallback || fallback(cache(cache, :fallback), :default) do
val when is_function(val) ->
Router.dispatch(cache, { :fetch, [ key, val, options ] })
Router.call(cache, { :fetch, [ key, val, options ] })
_na ->
error(:invalid_fallback)
end
Expand All @@ -667,7 +667,7 @@ defmodule Cachex do
"""
@spec get(cache, any, Keyword.t) :: { atom, any }
def get(cache, key, options \\ []) when is_list(options),
do: Router.dispatch(cache, { :get, [ key, options ] })
do: Router.call(cache, { :get, [ key, options ] })

@doc """
Retrieves and updates an entry in a cache.
Expand Down Expand Up @@ -696,7 +696,7 @@ defmodule Cachex do
@spec get_and_update(cache, any, function, Keyword.t) :: { :commit | :ignore, any }
def get_and_update(cache, key, update_function, options \\ [])
when is_function(update_function) and is_list(options),
do: Router.dispatch(cache, { :get_and_update, [ key, update_function, options ] })
do: Router.call(cache, { :get_and_update, [ key, update_function, options ] })

@doc """
Retrieves a list of all entry keys from a cache.
Expand All @@ -718,7 +718,7 @@ defmodule Cachex do
"""
@spec keys(cache, Keyword.t) :: { status, [ any ] }
def keys(cache, options \\ []) when is_list(options),
do: Router.dispatch(cache, { :keys, [ options ] })
do: Router.call(cache, { :keys, [ options ] })

@doc """
Increments an entry in the cache.
Expand Down Expand Up @@ -751,7 +751,7 @@ defmodule Cachex do
@spec incr(cache, any, integer, Keyword.t) :: { status, integer }
def incr(cache, key, amount \\ 1, options \\ [])
when is_integer(amount) and is_list(options),
do: Router.dispatch(cache, { :incr, [ key, amount, options ] })
do: Router.call(cache, { :incr, [ key, amount, options ] })

@doc """
Inspects various aspects of a cache.
Expand Down Expand Up @@ -848,9 +848,9 @@ defmodule Cachex do
{ :ok, 1328 }
"""
@spec inspect(cache, atom | tuple) :: { status, any }
def inspect(cache, option),
do: Router.dispatch(cache, { :inspect, [ option ] })
@spec inspect(cache, atom | tuple, Keyword.t) :: { status, any }
def inspect(cache, option, options \\ []),
do: Router.call(cache, { :inspect, [ option, options ] })

@doc """
Invokes a custom command against a cache entry.
Expand All @@ -876,7 +876,7 @@ defmodule Cachex do
"""
@spec invoke(cache, atom, any, Keyword.t) :: any
def invoke(cache, cmd, key, options \\ []) when is_list(options),
do: Router.dispatch(cache, { :invoke, [ cmd, key, options ] })
do: Router.call(cache, { :invoke, [ cmd, key, options ] })

@doc """
Deserializes a cache from a location on a filesystem.
Expand Down Expand Up @@ -911,7 +911,7 @@ defmodule Cachex do
@spec load(cache, binary, Keyword.t) :: { status, any }
def load(cache, path, options \\ [])
when is_binary(path) and is_list(options),
do: Router.dispatch(cache, { :load, [ path, options ] })
do: Router.call(cache, { :load, [ path, options ] })

@doc """
Removes an expiration time from an entry in a cache.
Expand Down Expand Up @@ -946,7 +946,7 @@ defmodule Cachex do
"""
@spec purge(cache, Keyword.t) :: { status, number }
def purge(cache, options \\ []) when is_list(options),
do: Router.dispatch(cache, { :purge, [ options ] })
do: Router.call(cache, { :purge, [ options ] })

@doc """
Places an entry in a cache.
Expand Down Expand Up @@ -975,7 +975,7 @@ defmodule Cachex do
# TODO: maybe rename TTL to be expiration?
@spec put(cache, any, any, Keyword.t) :: { status, boolean }
def put(cache, key, value, options \\ []) when is_list(options),
do: Router.dispatch(cache, { :put, [ key, value, options ] })
do: Router.call(cache, { :put, [ key, value, options ] })

@doc """
Places a batch of entries in a cache.
Expand Down Expand Up @@ -1007,7 +1007,7 @@ defmodule Cachex do
@spec put_many(cache, [ { any, any } ], Keyword.t) :: { status, boolean }
def put_many(cache, pairs, options \\ [])
when is_list(pairs) and is_list(options),
do: Router.dispatch(cache, { :put_many, [ pairs, options ] })
do: Router.call(cache, { :put_many, [ pairs, options ] })

@doc """
Refreshes an expiration for an entry in a cache.
Expand All @@ -1034,7 +1034,7 @@ defmodule Cachex do
"""
@spec refresh(cache, any, Keyword.t) :: { status, boolean }
def refresh(cache, key, options \\ []) when is_list(options),
do: Router.dispatch(cache, { :refresh, [ key, options ] })
do: Router.call(cache, { :refresh, [ key, options ] })

@doc """
Resets a cache by clearing the keyspace and restarting any hooks.
Expand Down Expand Up @@ -1076,7 +1076,7 @@ defmodule Cachex do
"""
@spec reset(cache, Keyword.t) :: { status, true }
def reset(cache, options \\ []) when is_list(options),
do: Router.dispatch(cache, { :reset, [ options ] })
do: Router.call(cache, { :reset, [ options ] })

@doc """
Deprecated implementation delegate of `put/4`.
Expand Down Expand Up @@ -1113,7 +1113,7 @@ defmodule Cachex do
"""
@spec size(cache, Keyword.t) :: { status, number }
def size(cache, options \\ []) when is_list(options),
do: Router.dispatch(cache, { :size, [ options ] })
do: Router.call(cache, { :size, [ options ] })

@doc """
Retrieves statistics about a cache.
Expand All @@ -1139,7 +1139,7 @@ defmodule Cachex do
"""
@spec stats(cache, Keyword.t) :: { status, %{ } }
def stats(cache, options \\ []) when is_list(options),
do: Router.dispatch(cache, { :stats, [ options ] })
do: Router.call(cache, { :stats, [ options ] })

@doc """
Creates a `Stream` of entries in a cache.
Expand Down Expand Up @@ -1187,7 +1187,7 @@ defmodule Cachex do
@spec stream(cache, any, Keyword.t) :: { status, Enumerable.t }
def stream(cache, query \\ Query.create(true), options \\ [])
when is_list(options),
do: Router.dispatch(cache, { :stream, [ query, options ] })
do: Router.call(cache, { :stream, [ query, options ] })

@doc """
Takes an entry from a cache.
Expand All @@ -1210,7 +1210,7 @@ defmodule Cachex do
"""
@spec take(cache, any, Keyword.t) :: { status, any }
def take(cache, key, options \\ []) when is_list(options),
do: Router.dispatch(cache, { :take, [ key, options ] })
do: Router.call(cache, { :take, [ key, options ] })

@doc """
Updates the last write time on a cache entry.
Expand All @@ -1220,7 +1220,7 @@ defmodule Cachex do
"""
@spec touch(cache, any, Keyword.t) :: { status, boolean }
def touch(cache, key, options \\ []) when is_list(options),
do: Router.dispatch(cache, { :touch, [ key, options ] })
do: Router.call(cache, { :touch, [ key, options ] })

@doc """
Executes multiple functions in the context of a transaction.
Expand Down Expand Up @@ -1257,7 +1257,7 @@ defmodule Cachex do
|> Overseer.update(&cache(&1, transactional: true))
end

Router.dispatch(trans_cache, { :transaction, [ keys, operation, options ] })
Router.call(trans_cache, { :transaction, [ keys, operation, options ] })
end
end

Expand All @@ -1282,7 +1282,7 @@ defmodule Cachex do
"""
@spec ttl(cache, any, Keyword.t) :: { status, number }
def ttl(cache, key, options \\ []) when is_list(options),
do: Router.dispatch(cache, { :ttl, [ key, options ] })
do: Router.call(cache, { :ttl, [ key, options ] })

@doc """
Updates an entry in a cache.
Expand All @@ -1308,7 +1308,7 @@ defmodule Cachex do
"""
@spec update(cache, any, any, Keyword.t) :: { status, any }
def update(cache, key, value, options \\ []) when is_list(options),
do: Router.dispatch(cache, { :update, [ key, value, options ] })
do: Router.call(cache, { :update, [ key, value, options ] })

###############
# Private API #
Expand Down
9 changes: 7 additions & 2 deletions lib/cachex/actions/clear.ex
Expand Up @@ -22,11 +22,16 @@ defmodule Cachex.Actions.Clear do
This action executes inside a transaction to ensure that there are no keys under
a lock - thus ensuring consistency (any locks are executed sequentially).
"""
def execute(cache(name: name) = cache, _options) do
def execute(cache(name: name) = cache, options) do
Locksmith.transaction(cache, [], fn ->
options =
options
|> Keyword.take([ :local ])
|> Enum.concat(const(:notify_false))

evicted =
cache
|> Cachex.size(const(:notify_false))
|> Cachex.size(options)
|> handle_evicted

true = :ets.delete_all_objects(name)
Expand Down
9 changes: 7 additions & 2 deletions lib/cachex/actions/empty.ex
Expand Up @@ -21,8 +21,13 @@ defmodule Cachex.Actions.Empty do
Internally this action is delegated through to the `size()` command and the
returned numeric value is just "cast" to a boolean value.
"""
def execute(cache() = cache, _options) do
{ :ok, size } = Cachex.size(cache, const(:notify_false))
def execute(cache() = cache, options) do
options =
options
|> Keyword.take([ :local ])
|> Enum.concat(const(:notify_false))

{ :ok, size } = Cachex.size(cache, options)
{ :ok, size == 0 }
end
end
26 changes: 13 additions & 13 deletions lib/cachex/actions/inspect.ex
Expand Up @@ -20,7 +20,7 @@ defmodule Cachex.Actions.Inspect do
@type option :: { :expired, :count } | { :expired, :keys } |
{ :janitor, :last } | { :memory, :bytes } |
{ :memory, :binary } | { :memory, :words } |
{ :record, any } | :cache
{ :entry, any } | :cache

# pre-calculated memory size
@memory_exponent :math.log(1024)
Expand Down Expand Up @@ -51,21 +51,21 @@ defmodule Cachex.Actions.Inspect do
There are many options broken up by function head, so please see the source
commands for definition for further documentation.
"""
def execute(cache, option)
def execute(cache, option, options)

# Retrieves the internal state of the cache.
#
# This is relatively easy to get via other methods, but it's available here
# as the "best" way for a developer to do so (outside of the internal API).
def execute(cache(name: name), :cache),
def execute(cache(name: name), :cache, _options),
do: { :ok, Overseer.retrieve(name) }

# Retrieves a raw entry from the cache table.
#
# This is useful when you need access to a record which may have expired. If
# the entry does not exist, a nil value will be returned instead. Expirations
# are not taken into account (either lazily or otherwise) on this read call.
def execute(cache(name: name), { :entry, key }) do
def execute(cache(name: name), { :entry, key }, _options) do
case :ets.lookup(name, key) do
[ ] -> { :ok, nil }
[e] -> { :ok, e }
Expand All @@ -77,15 +77,15 @@ defmodule Cachex.Actions.Inspect do
# The number of entries returned represents the number of records which will
# be removed on the next run of the Janitor service. It does not track the
# number of expired records which have already been purged or removed.
def execute(cache(name: name), { :expired, :count }),
def execute(cache(name: name), { :expired, :count }, _options),
do: { :ok, :ets.select_count(name, Query.expired(true)) }

# Returns the keys of expired entries currently inside the cache.
#
# This is essentially the same as the definition above, except that it will
# return the list of entry keys rather than just a count. Naturally this is
# an expensive call and should really only be used when debugging.
def execute(cache(name: name), { :expired, :keys }),
def execute(cache(name: name), { :expired, :keys }, _options),
do: { :ok, :ets.select(name, Query.expired(:key)) }

# Returns information about the last run of the Janitor service.
Expand All @@ -95,36 +95,36 @@ defmodule Cachex.Actions.Inspect do
# schema is defined in the `Cachex.Services.Janitor` module.
#
# In the case the Janitor service is not running, an error will be returned.
def execute(cache() = cache, { :janitor, :last }),
def execute(cache() = cache, { :janitor, :last }, _options),
do: Janitor.last_run(cache)

# Retrieves the current size of the backing cache table in bytes.
#
# This should be treated as an estimation as it's rounded based on
# the number of words used to maintain the cache.
def execute(cache() = cache, { :memory, :bytes }) do
{ :ok, mem_words } = execute(cache, { :memory, :words })
def execute(cache() = cache, { :memory, :bytes }, options) do
{ :ok, mem_words } = execute(cache, { :memory, :words }, options)
{ :ok, mem_words * :erlang.system_info(:wordsize) }
end

# Retrieves the current size of the backing cache table in a readable format.
#
# This should be treated as an estimation as it's rounded based on the number
# of words used to maintain the cache.
def execute(cache() = cache, { :memory, :binary }) do
{ :ok, bytes } = execute(cache, { :memory, :bytes })
def execute(cache() = cache, { :memory, :binary }, options) do
{ :ok, bytes } = execute(cache, { :memory, :bytes }, options)
{ :ok, bytes_to_readable(bytes) }
end

# Retrieves the current size of the backing cache table in machine words.
#
# It's unlikely the caller will want to use this directly, but as it's used
# by other inspection methods there's no harm in exposing it in the API.
def execute(cache(name: name), { :memory, :words }),
def execute(cache(name: name), { :memory, :words }, _options),
do: { :ok, :ets.info(name, :memory) }

# Catch-all to return an error.
def execute(_cache, _option),
def execute(_cache, _option, _options),
do: error(:invalid_option)

# Converts a number of bytes to a binary representation.
Expand Down

0 comments on commit 7ed4639

Please sign in to comment.