Skip to content

Commit

Permalink
Support watches
Browse files Browse the repository at this point in the history
  • Loading branch information
keathley committed Dec 10, 2019
1 parent b096011 commit 0130b1c
Show file tree
Hide file tree
Showing 7 changed files with 245 additions and 72 deletions.
15 changes: 12 additions & 3 deletions lib/vapor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ defmodule Vapor do
Optional callback. Called when the configuration server starts. Passes the map
of the reified values.
"""
@callback init([{key, value}]) :: {:ok, value}
@callback init([{key, value}]) :: :ok

defmacro __using__(_opts) do
quote do
Expand Down Expand Up @@ -65,8 +65,8 @@ defmodule Vapor do
end
end

def init(values) do
{:ok, values}
def init(_values) do
:ok
end

defoverridable [init: 1]
Expand All @@ -93,6 +93,15 @@ defmodule Vapor do
end

def init({module, config}) do
table_opts = [
:set,
:public,
:named_table,
read_concurrency: true,
]

^module = :ets.new(module, table_opts)

children = [
{Watch.Supervisor, [name: Watch.Supervisor.sup_name(module)]},
{Store, {module, config}}
Expand Down
115 changes: 115 additions & 0 deletions lib/vapor/configuration.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
defmodule Vapor.Configuration do
@moduledoc false
# Manages a layered set of configuration values.
# Not meant to be consumed by the end user
import Norm

defstruct [
layers: %{overrides: %{}},
translations: []
]

def s do
schema(%__MODULE__{
layers: map_of(one_of([spec(is_atom()), spec(is_integer())]), spec(is_map())),
translations: coll_of({spec(is_atom()), spec(is_function())}),
})
end

@doc """
Returns a new configuration with an initial set of layers and a list of
initial actions to run.
"""
def new(layers, translations) do
# We're abusing term ordering here. The `:overrides` atom will always
# be the highest precedence simply because its an atom
configuration = conform!(%__MODULE__{
layers: Map.merge(%{overrides: %{}}, layers),
translations: translations,
}, s())

merged = materialize(configuration)

actions =
merged
|> Enum.map(fn {key, value} -> {:upsert, key, value} end)

{configuration, merged, actions}
end

@doc """
Overwrites a value at a given path. Overwrites always take precedence over
any other configuration values.
"""
def set(config, key, value) do
overrides = config.layers.overrides
update(config, :overrides, Map.put(overrides, key, value))
end

@doc """
Updates a specific layer in the configuration.
"""
def update(%{layers: ls}=config, layer, value) do
old_paths = materialize(config)
new_config = %{config | layers: Map.put(ls, layer, value)}
new_paths = materialize(new_config)
actions = diff(new_paths, old_paths)

{new_config, actions}
end

defp materialize(config) do
config
|> flatten()
|> Enum.map(& do_translation(&1, config.translations))
|> Enum.into(%{})
end

# Takes an old configuration and new configuration and returns a list of
# commands needed to convert the old config into the new config.
defp diff(new_paths, old_paths) when is_map(new_paths) and is_map(old_paths) do
new_list =
new_paths
|> Enum.to_list

old_list =
old_paths
|> Enum.to_list

# This is expensive but it allows us to only diff the meaningful bits
diff(new_list -- old_list, old_list -- new_list, [])
end

# If we're out of new paths then any remaining old paths are deletes.
defp diff([], old_paths, acc) do
acc ++ Enum.map(old_paths, fn {path, _} -> {:delete, path} end)
end

# If we're out of old paths then everything left is an upsert by default
defp diff(new_paths, [], acc) do
acc ++ Enum.map(new_paths, fn {path, value} -> {:upsert, path, value} end)
end

# If we get here then we know that we need to do an upsert and remove any
# old configs with a matching path to our new config. Then we can keep
# recursing
defp diff([{path, value} | nps], old_paths, acc) do
acc = [{:upsert, path, value} | acc]
old_paths = Enum.reject(old_paths, fn {old_path, _} -> path == old_path end)
diff(nps, old_paths, acc)
end

defp flatten(%{layers: layers}) do
layers
|> Enum.sort(fn {a, _}, {b, _} -> a < b end) # Ensure proper sorting
|> Enum.map(fn {_, map} -> map end)
|> Enum.reduce(%{}, fn map, acc -> Map.merge(acc, map) end)
end

defp do_translation({key, value}, translations) do
case Enum.find(translations, fn {k, _f} -> key == k end) do
{_, f} -> {key, f.(value)}
_ -> {key, value}
end
end
end
40 changes: 40 additions & 0 deletions lib/vapor/plan.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
defmodule Vapor.Plan do
@moduledoc false

alias Vapor.Provider

def new(providers) do
providers
|> Enum.map(fn provider -> if match?({_, _}, provider), do: provider, else: {provider, []} end)
|> Enum.with_index()
|> Enum.map(fn {provider, i} -> {i, provider} end)
|> Enum.into(%{})
end

def watches(plan) do
plan
|> Enum.filter(fn {_i, {_p, opts}} -> Keyword.get(opts, :watch) end)
end

def load(plan) do
results =
plan
|> Enum.map(fn {i, {provider, _ops}} -> {i, Provider.load(provider)} end)

errors =
results
|> Enum.map(fn {_i, result} -> result end)
|> Enum.filter(fn {result, _} -> result == :error end)

if Enum.any?(errors) do
{:error, errors}
else
layers =
results
|> Enum.map(fn {i, {:ok, v}} -> {i, v} end)
|> Enum.into(%{})

{:ok, layers}
end
end
end
7 changes: 0 additions & 7 deletions lib/vapor/providers/env.ex
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,5 @@ defmodule Vapor.Provider.Env do
{:ok, bound_envs}
end
end

defp normalize(str, prefix) do
str
|> String.replace_leading(prefix, "")
|> String.downcase()
|> String.split("_")
end
end
end
60 changes: 12 additions & 48 deletions lib/vapor/store.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ defmodule Vapor.Store do

alias Vapor.{
Configuration,
Provider,
Plan,
Watch,
}

Expand All @@ -23,28 +23,17 @@ defmodule Vapor.Store do
end

def init({module, config}) do
table_opts = [
:set,
:protected,
:named_table,
read_concurrency: true,
]
plan = Plan.new(config.plan)

^module = :ets.new(module, table_opts)

case load(config.plan) do
case Plan.load(plan) do
{:ok, layers} ->
translations = config[:translations] || []
{config, merged, actions} = Configuration.new(layers, config[:translations] || [])

merged =
layers
|> Enum.reduce(%{}, fn l, acc -> Map.merge(acc, Enum.into(l, %{})) end)
|> Enum.map(& do_translation(&1, translations))
|> Enum.into(%{})
with :ok <- module.init(merged) do
process_actions(actions, module)

with {:ok, values} <- module.init(merged) do
for {key, value} <- values do
:ets.insert(module, {key, value})
for watch <- Plan.watches(plan) do
start_watch(watch, module)
end

{:ok, %{config: config, table: module}}
Expand All @@ -63,8 +52,8 @@ defmodule Vapor.Store do
end

def handle_call({:set, key, value}, _from, %{config: config}=state) do
# {new_config, actions} = Configuration.set(config, key, value)
# process_actions(actions, state.table)
{new_config, actions} = Configuration.set(config, key, value)
process_actions(actions, state.table)

{:reply, {:ok, value}, %{state | config: new_config}}
end
Expand All @@ -81,32 +70,7 @@ defmodule Vapor.Store do
end)
end

defp load(providers) do
results =
providers
|> Enum.map(fn provider -> Provider.load(provider) end)

errors =
results
|> Enum.filter(fn {result, _} -> result == :error end)

if Enum.any?(errors) do
{:error, errors}
else
layers = Enum.into(results, [], fn {:ok, v} -> v end)

{:ok, layers}
end
end

defp do_translation({key, value}, translations) do
case Enum.find(translations, fn {k, _f} -> key == k end) do
{_, f} -> {key, f.(value)}
_ -> {key, value}
end
end

defp start_watch(layer, plan, module) do
Watch.Supervisor.start_child(module, %{layer: layer, plan: plan, store: module})
defp start_watch({layer, provider}, module) do
Watch.Supervisor.start_child(module, %{layer: layer, provider: provider, store: module})
end
end
18 changes: 9 additions & 9 deletions lib/vapor/watch.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,21 @@ defmodule Vapor.Watch do

def init(state) do
state = Map.merge(%{error_count: 0, current_alarm: false}, state)
schedule_refresh(state.plan)
schedule_refresh(state.provider)
{:ok, state}
end

def handle_info(:refresh, %{plan: plan, store: store, layer: layer}=state) do
case Provider.load(plan.provider) do
def handle_info(:refresh, %{provider: {provider, opts}, store: store, layer: layer}=state) do
case Provider.load(provider) do
{:ok, new_config} ->
# We don't mind if we repeatedly spam the logs when there's a problem,
# but if everything is okay, we ought not attempt to clear an alarm
# that might not exist because the attempt with spam the logs.
if state.current_alarm do
:alarm_handler.clear_alarm({:vapor, {layer, plan.provider}})
:alarm_handler.clear_alarm({:vapor, {layer, provider}})
end
:ok = Vapor.Store.update(store, layer, new_config)
schedule_refresh(plan)
schedule_refresh({provider, opts})
{:noreply, %{state | current_alarm: false}}

{:error, _reason} ->
Expand All @@ -37,14 +37,14 @@ defmodule Vapor.Watch do
# and we don't want those to end up in logs just because there was a
# temporary problem. By alarming with the layer and provider, we ought
# to give troubleshooters a head start in knowing where to investigate.
:alarm_handler.set_alarm({{:vapor, {layer, plan.provider}}, :redacted})
schedule_refresh(plan)
:alarm_handler.set_alarm({{:vapor, {layer, provider}}, :redacted})
schedule_refresh({provider, opts})
{:noreply, %{state | current_alarm: true, error_count: state.error_count + 1}}
end
end

defp schedule_refresh(%{opts: opts}) do
Process.send_after(self(), :refresh, opts[:refresh_interval])
defp schedule_refresh({_provider, opts}) do
Process.send_after(self(), :refresh, opts[:refresh_interval] || 3_000)
end
end

Loading

0 comments on commit 0130b1c

Please sign in to comment.