Skip to content

Commit

Permalink
Merge pull request #13 from zackehh/issue-12
Browse files Browse the repository at this point in the history
Ensure nodes can reconnect after dropping out
  • Loading branch information
whitfin committed Apr 15, 2016
2 parents 403e1fa + 15b21fb commit 9443af4
Show file tree
Hide file tree
Showing 17 changed files with 580 additions and 37 deletions.
9 changes: 7 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,12 @@ otp_release:
- 18.2
- 18.1
- 18.0
before_script:
- epmd -daemon
script:
- mix test --trace
- mix cachex.test --trace
after_success:
- mix coveralls.travis
- mix cachex.coveralls.travis
branches:
only:
- master
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ If you feel something can be improved, or have any questions about certain behav
If you *do* make changes to the codebase, please make sure you test your changes thoroughly, and include any unit tests alongside new or changed behaviours. Cachex currently uses the excellent [excoveralls](https://github.com/parroty/excoveralls) to track code coverage.

```elixir
$ mix test --trace
$ mix coveralls
$ mix coveralls.html && open cover/excoveralls.html
$ mix cachex.test
$ mix cachex.coveralls
$ mix cachex.coveralls.html && open cover/excoveralls.html
```
7 changes: 4 additions & 3 deletions coveralls.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,15 @@
"^\\s+use\\s+"
],
"custom_stop_words": [
"defwrap.+(.+\\\\.+),?"
"defwrap.+"
],
"coverage_options": {
"treat_no_relevant_lines_as_covered": true
},
"skip_files": [
"lib/cachex/macros.ex",
"lib/cachex/macros/boilerplate.ex",
"lib/cachex/macros/gen_server.ex"
"lib/cachex/macros/*",
"lib/mix/cachex.ex",
"lib/mix/tasks/*"
]
}
97 changes: 82 additions & 15 deletions lib/cachex.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,13 @@ defmodule Cachex do
use Cachex.Macros.Boilerplate
use Supervisor

# import Connection
import Cachex.Connection

# add some aliases
alias Cachex.Inspector
alias Cachex.Janitor
alias Cachex.Options
alias Cachex.Util
alias Cachex.Worker

Expand Down Expand Up @@ -42,7 +47,8 @@ defmodule Cachex do
@type status :: :ok | :error | :missing

@doc """
Initialize the Mnesia table and supervision tree for this cache.
Initialize the Mnesia table and supervision tree for this cache, linking the
cache to the current process.
We also allow the user to define their own options for the cache. We start a
Supervisor to look after all internal workers backing the cache, in order to
Expand Down Expand Up @@ -152,26 +158,41 @@ defmodule Cachex do
"""
@spec start_link(options, options) :: { atom, pid }
def start_link(options \\ [], supervisor_options \\ []) do
with { :ok, true } <- ensure_not_started(options[:name]),
{ :ok, opts } <- parse_options(options),
{ :ok, true } <- start_table(opts),
do: Supervisor.start_link(__MODULE__, opts, supervisor_options)
with { :ok, opts } <- setup_env(options) do
Supervisor.start_link(__MODULE__, opts, supervisor_options)
end
end

@doc """
Initialize the Mnesia table and the processes backing this cache, without
linking the cache to the current process.

Unlike `start_link/2`, no Supervisor is started here: the Janitor and the Worker
are started directly and unlinked, registered under `opts.janitor` and
`opts.cache` respectively.

Supports all the same options as `start_link/2`. This is mainly used for testing
in order to keep caches around when processes may be torn down.

On success this returns `{ :ok, pid }` where `pid` is the *calling* process. If
the environment setup fails, the non-matching value produced by the setup is
returned as-is.
"""
@spec start(options) :: { atom, pid }
def start(options \\ []) do
  with { :ok, opts } <- setup_env(options) do
    # NOTE(review): the results of both start calls are discarded, so a failure
    # to start either process is not reported to the caller - confirm intended.
    Janitor.start(opts, [ name: opts.janitor ])
    Worker.start(opts, [ name: opts.cache ])

    # explicit parentheses: a bare `self` is ambiguous with a variable and is
    # deprecated as a zero-arity call
    { :ok, self() }
  end
end

@doc false
# Basic initialization phase, being passed arguments by the Supervisor.
#
# This function sets up the Mnesia table and options are parsed before being used
# to setup the internal workers. Workers are then given to `supervise/2`.
@spec init(Cachex.Options) :: { status, any }
def init(%Cachex.Options{ } = options) do
@spec init(Options) :: { status, any }
def init(%Options{ } = options) do
ttl_workers = case options.ttl_interval do
nil -> []
_other -> [worker(Cachex.Janitor, [options])]
_other -> [worker(Janitor, [options, [ name: options.janitor ]])]
end

children = ttl_workers ++ [
worker(Cachex.Worker, [options, [name: options.cache]])
worker(Worker, [options, [ name: options.cache ]])
]

supervise(children, strategy: :one_for_one)
Expand Down Expand Up @@ -369,12 +390,47 @@ defmodule Cachex do
"""
@spec abort(cache, any, options) :: Exception
def abort(cache, reason, options \\ []) when is_list(options) do
defwrap abort(cache, reason, options \\ []) when is_list(options) do
do_action(cache, fn(_) ->
:mnesia.is_transaction && :mnesia.abort(reason)
{ :ok, :mnesia.is_transaction && :mnesia.abort(reason) }
end)
end

@doc """
Adds a remote node to this cache. This should typically only be called internally.

Calling `add_node/2` will ping the provided node and, if it is reachable, add it
to Mnesia and then create a new RAM replica of the cache table on the node. We
update the worker with knowledge of the node change to ensure consistency.

The `cache` argument may either be the name of the cache (an atom), or a value
carrying the name of the cache under a `cache` field.

Returns `{ :ok, true }` when the node could be pinged, and an error tuple with a
binary message when the node is unreachable.

## Examples

    iex> Cachex.add_node(:my_cache, :node@remotehost)
    { :ok, true }

"""
@spec add_node(cache, atom) :: { status, true | false | binary }
defwrap add_node(cache, node) when is_atom(node) do
  case :net_adm.ping(node) do
    :pong ->
      # Resolve the cache name before touching Mnesia: the table and the worker
      # are both addressed by name, so handing Mnesia anything other than the
      # atom name would not refer to the cache table.
      server = case cache do
        val when is_atom(val) ->
          val
        val ->
          val.cache
      end

      # NOTE(review): the Mnesia results are intentionally left unchecked here,
      # as in the original implementation (e.g. the copy may already exist).
      :mnesia.change_config(:extra_db_nodes, [node])
      :mnesia.add_table_copy(server, node, :ram_copies)

      GenServer.call(server, { :add_node, node })

      { :ok, true }
    :pang ->
      { :error, "Unable to reach remote node!" }
  end
end

@doc """
Removes all key/value pairs from the cache.
Expand Down Expand Up @@ -1091,14 +1147,25 @@ defmodule Cachex do
# Parses a keyword list of options into a Cachex Options structure. We return
# it in a tuple just to avoid compiler warnings when using it with the `with` block.
defp parse_options(options) when is_list(options),
do: { :ok, Cachex.Options.parse(options) }
do: { :ok, Options.parse(options) }

# Shared bootstrap used by both `start_link/2` and `start/1`. In order, this:
#
#   1. verifies a cache with the requested name is not already started
#   2. parses the keyword list into a Cachex options structure
#   3. ensures the connection to any remote nodes
#   4. sets up the local table
#
# The parsed options are returned on success; the first step which does not
# produce the expected value short-circuits and has its result returned as-is.
defp setup_env(options) when is_list(options) do
  cache_name = options[:name]

  with { :ok, true }   <- ensure_not_started(cache_name),
       { :ok, parsed } <- parse_options(options),
       { :ok, true }   <- ensure_connection(parsed),
       { :ok, true }   <- start_table(parsed) do
    { :ok, parsed }
  end
end

# Starts up an Mnesia table based on the provided options. If an error occurs
# when setting up the table, we return an error tuple to represent the issue.
defp start_table(%Cachex.Options{ } = options) do
defp start_table(%Options{ } = options) do
table_create = :mnesia.create_table(options.cache, [
{ :ram_copies, options.nodes },
{ :attributes, [ :key, :touched, :ttl, :value ]},
{ :attributes, [ :key, :touched, :ttl, :value ] },
{ :type, :set },
{ :storage_properties, [ { :ets, options.ets_opts } ] }
])
Expand Down
43 changes: 43 additions & 0 deletions lib/cachex/connection.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
defmodule Cachex.Connection do
  @moduledoc false
  # Module to handle the connections between remote Cachex nodes and to enforce
  # replication and synchronicity between the nodes. Currently this module is
  # rather small, but has been separated out in anticipation of further work.

  # alias options
  alias Cachex.Options

  @doc """
  Ensures the local node is (re)joined to the remote nodes of a cache.

  We go through all supposed connected nodes and ping them to ensure they're
  reachable - we then RPC to the online nodes, until one succeeds, asking it to
  add the current node via `Cachex.add_node/2`. This will sync up the nodes and
  replication, and bring the node back into the cluster.

  This is a best-effort operation: the result is always `{ :ok, true }`, whether
  or not any remote node was reachable or accepted the request.
  """
  def ensure_connection(%Options{ cache: cache, nodes: nodes }) do
    case find_online_nodes(nodes) do
      [] -> { :ok, true }
      # `|| true` deliberately masks a failed reconnection attempt
      li -> { :ok, reconnect_node(cache, li, node()) || true }
    end
  end

  # Searches a list of nodes to find those which are online (i.e. those which
  # return :pong when pinged). We filter out the local node name to avoid pinging
  # ourselves unnecessarily. The checks are ordered so that a ping is only ever
  # attempted for atom names which are not the local node.
  defp find_online_nodes(nodes) do
    local = node()

    Enum.filter(nodes, fn(remote) ->
      is_atom(remote) and remote != local and :net_adm.ping(remote) == :pong
    end)
  end

  # Loops through a list of nodes and attempts to reconnect to this node from that
  # node. We do it this way as the other nodes can safely replicate to this node
  # before we create our local tables - ensuring consistency. Stops at the first
  # remote node which reports success, returning whether any did.
  defp reconnect_node(cache, nodes, node) do
    Enum.any?(nodes, fn(remote_node) ->
      :rpc.call(remote_node, Cachex, :add_node, [cache, node]) == { :ok, true }
    end)
  end

end
13 changes: 12 additions & 1 deletion lib/cachex/janitor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,18 @@ defmodule Cachex.Janitor do
with.
"""
def start_link(options \\ %Cachex.Options { }, gen_options \\ []) do
GenServer.start(__MODULE__, options, gen_options)
if options.ttl_interval do
GenServer.start_link(__MODULE__, options, gen_options)
end
end

@doc """
Starts a Janitor process without linking it to the calling process.

Accepts the same arguments as `start_link/2`. When the provided options carry no
`ttl_interval`, no process is started and `nil` is returned.
"""
def start(options \\ %Cachex.Options { }, gen_options \\ []) do
  case options.ttl_interval do
    disabled when disabled in [nil, false] ->
      nil
    _interval ->
      GenServer.start(__MODULE__, options, gen_options)
  end
end

@doc """
Expand Down
10 changes: 8 additions & 2 deletions lib/cachex/options.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ defmodule Cachex.Options do
default_fallback: nil, # the default fallback implementation
default_ttl: nil, # any default ttl values to use
fallback_args: nil, # arguments to pass to a cache loader
janitor: nil, # the name of the janitor attached (if any)
pre_hooks: nil, # any pre hooks to attach
post_hooks: nil, # any post hooks to attach
nodes: nil, # a list of nodes to connect to
Expand Down Expand Up @@ -49,7 +50,11 @@ defmodule Cachex.Options do
nil
end

remote_node_list = Util.get_opt_list(options, :nodes, [node()])
janitor = if ttl_interval do
Util.janitor_for_cache(cache)
end

remote_node_list = Enum.uniq([ node | Util.get_opt_list(options, :nodes, [])])
default_fallback = Util.get_opt_function(options, :default_fallback)

fallback_args =
Expand All @@ -69,7 +74,7 @@ defmodule Cachex.Options do
type: :post,
results: true,
server_args: [
name: Cachex.Util.stats_for_cache(cache)
name: Util.stats_for_cache(cache)
]
})
false ->
Expand All @@ -91,6 +96,7 @@ defmodule Cachex.Options do
"default_fallback": default_fallback,
"default_ttl": default_ttl,
"fallback_args": fallback_args,
"janitor": janitor,
"nodes": remote_node_list,
"pre_hooks": pre_hooks,
"post_hooks": post_hooks,
Expand Down
38 changes: 37 additions & 1 deletion lib/cachex/util.ex
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ defmodule Cachex.Util do
"""
def reply(value, state), do: { :reply, value, state }

@doc """
Builds a new atom by appending a binary suffix to the name of an existing atom.
"""
def atom_append(atom, suffix) do
  prefix = to_string(atom)
  String.to_atom(prefix <> suffix)
end

@doc """
Converts a number of memory bytes to a binary representation.
"""
Expand All @@ -42,6 +48,20 @@ defmodule Cachex.Util do
|> IO.iodata_to_binary
end

@doc """
Creates a long node name (`name@hostname`) as an atom from a provided name.

The name may be given as either an atom or a binary. If a hostname is given it
will be used - otherwise we default to using the local node's hostname.
"""
def create_node_name(name, hostname \\ nil)
def create_node_name(name, hostname) when is_atom(name) do
  create_node_name(to_string(name), hostname)
end
def create_node_name(name, hostname) when is_binary(name) do
  host = if hostname == nil, do: local_hostname(), else: hostname
  String.to_atom(name <> "@" <> host)
end

@doc """
Creates an input record based on a key, value and expiration. If the value
passed is nil, then we apply any defaults. Otherwise we add the value
Expand Down Expand Up @@ -214,6 +234,13 @@ defmodule Cachex.Util do
updated_map
end

@doc """
Derives the name of the Janitor attached to a cache, which is simply the name of
the cache with "_janitor" appended.
"""
def janitor_for_cache(cache) when is_atom(cache) do
  atom_append(cache, "_janitor")
end

@doc """
Retrieves the last item in a Tuple. This is just shorthand around sizeof and
pulling the last element.
Expand All @@ -232,6 +259,15 @@ defmodule Cachex.Util do
def list_to_tuple(list) when is_list(list),
do: Enum.reduce(list, {}, &(Tuple.append(&2, &1)))

@doc """
Retrieves the local hostname of this node, converted to a binary.
"""
def local_hostname do
  hostname_result = :inet.gethostname()
  to_string(elem(hostname_result, 1))
end

@doc """
Returns a selection to return the designated value for all rows. Enables things
like finding all stored keys and all stored values.
Expand All @@ -257,7 +293,7 @@ defmodule Cachex.Util do
create the name of a stats hook automatically.
"""
def stats_for_cache(cache) when is_atom(cache),
do: String.to_atom(to_string(cache) <> "_stats")
do: atom_append(cache, "_stats")

@doc """
Very small unwrapper for an Mnesia start result. We accept already started tables
Expand Down
Loading

0 comments on commit 9443af4

Please sign in to comment.