Skip to content

Commit

Permalink
Move to using ETS for Cachex states
Browse files Browse the repository at this point in the history
  • Loading branch information
whitfin committed Jun 14, 2016
1 parent d7b9351 commit ce608bc
Show file tree
Hide file tree
Showing 72 changed files with 299 additions and 389 deletions.
254 changes: 85 additions & 169 deletions lib/cachex.ex

Large diffs are not rendered by default.

6 changes: 4 additions & 2 deletions lib/cachex/application.ex
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
defmodule Cachex.Application do
use Application

@moduledoc false
# Application callback to start any needed resources

def start(_type, _args) do
Cachex.State.init()
Cachex.State.start_link()
end

end
2 changes: 1 addition & 1 deletion lib/cachex/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Cachex.Connection do
# replication and synchronicity between the nodes. Currently this module is
# rather small, but has been separated out in anticipation of further work.

# alias options
# alias internals
alias Cachex.Options

@doc """
Expand Down
3 changes: 2 additions & 1 deletion lib/cachex/inspector.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ defmodule Cachex.Inspector do
# but at least we can be sure we're getting an accurate view.

# alias some modules
alias Cachex.State
alias Cachex.Util
alias Cachex.Worker

Expand Down Expand Up @@ -98,7 +99,7 @@ defmodule Cachex.Inspector do
to block the worker process.
"""
def inspect(cache, option) when option in [ :state, :worker ] do
{ :ok, GenServer.call(cache, { :state }) }
{ :ok, State.get(cache) }
end

@doc """
Expand Down
5 changes: 4 additions & 1 deletion lib/cachex/janitor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ defmodule Cachex.Janitor do
# import utils for convenience
import Cachex.Util

# add a worker alias
alias Cachex.Worker

@moduledoc false
# The main TTL cleanup for Cachex, providing a very basic task scheduler to
# repeatedly cleanup the cache table for all records which have expired. This
Expand Down Expand Up @@ -80,7 +83,7 @@ defmodule Cachex.Janitor do
# Schedules a check to occur after the designated interval. Once scheduled,
# returns the state - this is just sugar for pipelining with a state.
defp update_evictions({ :ok, evictions } = result, state) when evictions > 0 do
GenServer.cast(state.cache, { :broadcast, { { :purge, [] }, result } })
Worker.broadcast(state.cache, { :purge, [] }, result)
state
end
defp update_evictions(_other, state), do: state
Expand Down
38 changes: 32 additions & 6 deletions lib/cachex/state.ex
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,43 @@ defmodule Cachex.State do
@state_table :cachex_state_table

# transaction manager
@transaction_manager :cachex_transaction_manager
@transaction_manager :cachex_state_tm

@doc false
def init do
def start_link do
setup = fn ->
:ets.new(@state_table, [
:named_table,
:public,
{ :read_concurrency, true },
{ :write_concurrency, true }
])
end
Agent.start_link(setup, [ name: @transaction_manager ])
end

@doc false
def start do
with { :ok, pid } <- start_link do
:erlang.unlink(pid) && { :ok, pid }
end
end

@doc """
Removes a state from the local state table.
"""
@spec del(cache :: atom) :: true
def del(cache) when is_atom(cache) do
:ets.delete(@state_table, cache)
end

@doc """
Retrieves a state from the local state table, or `nil` if none exists.
"""
@spec get(cache :: atom) :: state :: Worker.t | nil
def get(cache) when is_atom(cache) do
case :ets.lookup(@state_table, cache) do
[{ @state_table, ^cache, state }] ->
[{ ^cache, state }] ->
state
_other ->
nil
Expand All @@ -57,7 +73,7 @@ defmodule Cachex.State do
"""
@spec set(cache :: atom, state :: Worker.t) :: true
def set(cache, %Worker{ } = state) when is_atom(cache) do
:ets.insert(@state_table, { @state_table, cache, state })
:ets.insert(@state_table, { cache, state })
end

@doc """
Expand All @@ -76,15 +92,25 @@ defmodule Cachex.State do
@state_table
end

@doc """
Carries out a blocking set of actions against the state table.
"""
@spec transaction(cache :: atom, function :: fun) :: any
def transaction(cache, fun) when is_atom(cache) and is_function(fun, 0) do
Agent.get(@transaction_manager, fn(_) ->
fun.()
end)
end

@doc """
Updates a state inside the local state table.
This is atomic and happens inside a transaction to ensure that we don't get
out of sync. Hooks are notified of the change, and the new state is returned.
"""
@spec update(cache :: atom, function :: (Worker.t -> Worker.t)) :: state :: Worker.t
def update(cache, fun) when is_atom(cache) and is_function(fun) do
Agent.get(@transaction_manager, fn(_) ->
def update(cache, fun) when is_atom(cache) and is_function(fun, 1) do
transaction(cache, fn ->
cstate = get(cache)
nstate = fun.(cstate)

Expand Down
120 changes: 26 additions & 94 deletions lib/cachex/worker.ex
Original file line number Diff line number Diff line change
@@ -1,8 +1,4 @@
defmodule Cachex.Worker do
# use GenDelegate and GenServer
use GenDelegate
use GenServer

@moduledoc false
# The main worker for Cachex, providing access to the backing tables using a
# GenServer implementation. This is separated into a new process as we store a
Expand All @@ -15,6 +11,7 @@ defmodule Cachex.Worker do
alias Cachex.Janitor
alias Cachex.Notifier
alias Cachex.Options
alias Cachex.State
alias Cachex.Stats
alias Cachex.Util

Expand All @@ -29,27 +26,16 @@ defmodule Cachex.Worker do
# define some types
@type record :: { atom, any, number, number | nil, any }

@doc """
Simple initialization for use in the main owner process in order to start an
instance of a worker. All options are passed throught to the initialization
function, and the GenServer options are passed straight to GenServer to deal
with.
"""
def start_link(options \\ %Options { }, gen_options \\ []) do
GenServer.start_link(__MODULE__, options, gen_options)
end

@doc """
Main initialization phase of a worker, plucking out the options we care about
and storing them internally for later use by this worker.
"""
def init(options \\ %Options { }) do
state = %__MODULE__{
%__MODULE__{
actions: options.remote && __MODULE__.Remote || __MODULE__.Local,
cache: options.cache,
options: options
}
{ :ok, state }
end

###
Expand Down Expand Up @@ -389,89 +375,46 @@ defmodule Cachex.Worker do
@callback take(__MODULE__, any, list) :: { :ok | :missing, any }

###
# GenServer delegate functions for call/cast.
###

gen_delegate get(state, key, options), type: :call
gen_delegate get_and_update(state, key, update_fun, options), type: :call
gen_delegate set(state, key, value, options), type: [ :call, :cast ]
gen_delegate update(state, key, value, options), type: [ :call, :cast ]
gen_delegate del(state, key, options), type: [ :call, :cast ]
gen_delegate clear(state, options), type: [ :call, :cast ]
gen_delegate count(state, options), type: :call
gen_delegate execute(state, operations, options), type: [ :call, :cast ]
gen_delegate exists?(state, key, options), type: :call
gen_delegate expire(state, key, expiration, options), type: [ :call, :cast ]
gen_delegate keys(state, options), type: :call
gen_delegate incr(state, key, options), type: [ :call, :cast ]
gen_delegate purge(state, options), type: [ :call, :cast ]
gen_delegate refresh(state, key, options), type: [ :call, :cast ]
gen_delegate reset(state, options), type: [ :call, :cast ]
gen_delegate size(state, options), type: :call
gen_delegate stats(state, options), type: :call
gen_delegate stream(state, options), type: :call
gen_delegate take(state, key, options), type: :call
gen_delegate transaction(state, operations, options), type: [ :call, :cast ]
gen_delegate ttl(state, key, options), type: :call

###
# GenServer manual handlers for call/cast.
# Functions designed to only be used internally (i.e. those not forwarded to
# the main Cachex interfaces).
###

@doc """
Very tiny wrapper to retrieve the current state of a cache
"""
def handle_call({ :state }, _ctx, state),
do: { :reply, state, state }

@doc """
Handler for adding a node to the worker, to ensure that we use the correct
actions.
"""
def handle_call({ :add_node, new_node }, _ctx, state) do
new_options = %Options{ state.options |
remote: true,
nodes: if Enum.member?(state.options.nodes, new_node) do
state.options.nodes
else
[new_node|state.options.nodes]
end
}
def add_node(cache, new_node) when is_atom(cache) and is_atom(new_node) do
if State.member?(cache) do
State.update(cache, fn(state) ->
new_options = %Options{ state.options |
remote: true,
nodes: if Enum.member?(state.options.nodes, new_node) do
state.options.nodes
else
[new_node|state.options.nodes]
end
}

new_state = if state.options.remote do
%__MODULE__{ state | options: new_options }
else
%__MODULE__{ state | actions: __MODULE__.Remote, options: new_options }
if state.options.remote do
%__MODULE__{ state | options: new_options }
else
%__MODULE__{ state | actions: __MODULE__.Remote, options: new_options }
end
end)
end

modify_hooks(new_state)

{ :reply, { :ok, true }, new_state }
end

@doc """
Handler for broadcasting a set of actions and results to all registered hooks.
This is fired by out-of-proc calls (i.e. Janitors) which need to notify hooks.
"""
def handle_cast({ :broadcast, { action, result } }, state) do
do_action(state, action, fn -> result end)
{ :noreply, state }
end

@doc """
Provides a way to update the internal state from an external process. The state
as it currently exists is fed into the function for modification. This is used
for hook modification and should not be used externally.
"""
def handle_cast({ :modify, func }, state) when is_function(func) do
{ :noreply, state |> func.() |> modify_hooks }
def broadcast(cache, action, result) when is_atom(cache) do
case State.get(cache) do
nil -> false
val -> do_action(val, action, fn -> result end)
end
end

###
# Functions designed to only be used internally (i.e. those not forwarded to
# the main Cachex interfaces).
###

@doc """
Retrieves and updates a raw record in the database. This is used in several
places in order to allow easy modification. The record is fed to an update
Expand Down Expand Up @@ -552,17 +495,6 @@ defmodule Cachex.Worker do
result
end

# A binding for the update of hooks requiring anything of this cache. As it
# stands this is just the worker, but we call from multiple places to it makes
# sense to break out into a function.
defp modify_hooks(%__MODULE__{ } = state) do
state
|> combine_hooks
|> Enum.filter(&(&1.provide |> List.wrap |> Enum.member?(:worker)))
|> Enum.each(&(Hook.provision(&1, { :worker, state })))
state
end

# A small helper for resetting a cache only when defined in the list of items
# to reset. If the key `:cache` lives inside the list of things to reset, we
# simply call the internal `clear/2` function with notifications turned off.
Expand Down
2 changes: 1 addition & 1 deletion lib/mix/cachex.ex
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ defmodule Mix.Cachex do

:rpc.call(name, :mnesia, :start, [])
:rpc.call(name, :code, :add_paths, [:code.get_path])
:rpc.call(name, State, :init, [])
:rpc.call(name, State, :start, [])
end)
end

Expand Down
2 changes: 0 additions & 2 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,6 @@ defmodule Cachex.Mixfile do
# Type "mix help deps" for more examples and options
defp deps do
[
# Production dependencies
{ :gen_delegate, "~> 1.0.0" },
# Local dependencies
{ :benchfella, "~> 0.3.2", optional: true, only: [ :dev, :test ] },
{ :benchwarmer, "~> 0.0.2", optional: true, only: [ :dev, :test ] },
Expand Down
2 changes: 1 addition & 1 deletion test/cachex_test/abort.exs → test/cachex/abort_test.exs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
defmodule CachexTest.Abort do
defmodule Cachex.AbortTest do
use PowerAssert

test "abort does nothing when not in a transaction" do
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
defmodule CachexTest.Clear.Local do
defmodule Cachex.Clear.LocalTest do
use PowerAssert

setup do
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
defmodule CachexTest.Clear.Remote do
defmodule Cachex.Clear.RemoteTest do
use PowerAssert

setup do
Expand Down
File renamed without changes.
2 changes: 1 addition & 1 deletion test/cachex_test/count.exs → test/cachex/count_test.exs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
defmodule CachexTest.Count do
defmodule Cachex.CountTest do
use PowerAssert

setup do
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
defmodule CachexTest.Decr.Local do
defmodule Cachex.Decr.LocalTest do
use PowerAssert

setup do
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
defmodule CachexTest.Decr.Remote do
defmodule Cachex.Decr.RemoteTest do
use PowerAssert

setup do
Expand Down
2 changes: 1 addition & 1 deletion test/cachex_test/decr.exs → test/cachex/decr_test.exs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
defmodule CachexTest.Decr do
defmodule Cachex.DecrTest do
use PowerAssert

setup do
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
defmodule CachexTest.Del.Local do
defmodule Cachex.Del.LocalTest do
use PowerAssert

setup do
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
defmodule CachexTest.Del.Remote do
defmodule Cachex.Del.RemoteTest do
use PowerAssert

setup do
Expand Down
2 changes: 1 addition & 1 deletion test/cachex_test/del.exs → test/cachex/del_test.exs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
defmodule CachexTest.Del do
defmodule Cachex.DelTest do
use PowerAssert

setup do
Expand Down
Loading

0 comments on commit ce608bc

Please sign in to comment.