Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

CBD-771: Stats archiver uses ETS to replace Mnesia

Stats are stored in memory resident ETS tables and are dumped to
backup files periodically. Cascade and truncate happen in memory
now. Stats backup files are saved in data/<node>/stats directory
which is parallel to data/<node>/mnesia.

Using ETS instead of mnesia noticeably reduces CPU usage. On an idle
2-node cluster with 3-5 buckets, I see CPU usage drop by at least
10%.

Based on work of Liang Guo.

Change-Id: If57932e0caae6e28e9b57b6317ba87e0b870b422
Reviewed-on: http://review.couchbase.org/22545
Tested-by: Aliaksey Kandratsenka <alkondratenko@gmail.com>
Reviewed-by: Aliaksey Kandratsenka <alkondratenko@gmail.com>
  • Loading branch information...
commit 9e0a980e538ee1414327c1b0f0cfc5fbb123a5a6 1 parent 4bbe56b
@aartamonau aartamonau authored alk committed
View
5 CHANGES
@@ -14,6 +14,11 @@ Between versions 2.0.1 and 2.*
grab bucket stats now is /pools/default/buckets/<bucket name>/stats
REST call.
+* (CBD-771) Stats archives are not stored in mnesia anymore.
+
+ Instead they are collected in ETS tables and saved to plain files
+ from time to time.
+
-----------------------------------------
Between versions 2.0.0 and 2.0.1
-----------------------------------------
View
3  Makefile
@@ -139,7 +139,6 @@ do-install:
cp cbbrowse_logs $(DESTDIR)$(PREFIX)/bin/cbbrowse_logs
cp cbcollect_info $(DESTDIR)$(PREFIX)/bin/cbcollect_info
chmod +x $(DESTDIR)$(PREFIX)/bin/couchbase-server $(DESTDIR)$(PREFIX)/bin/cbbrowse_logs $(DESTDIR)$(PREFIX)/bin/cbcollect_info
- mkdir -p $(DESTDIR)$(PREFIX)/var/lib/couchbase/mnesia
mkdir -p $(DESTDIR)$(PREFIX)/var/lib/couchbase/logs
cp ebucketmigrator $(DESTDIR)$(PREFIX)/bin/ebucketmigrator
chmod +x $(DESTDIR)$(PREFIX)/bin/ebucketmigrator
@@ -186,7 +185,7 @@ $(COUCHBASE_PLT): | all
do_build_plt:
dialyzer --output_plt $(COUCHBASE_PLT) --build_plt \
- --apps compiler crypto erts inets kernel mnesia os_mon sasl ssl stdlib xmerl \
+ --apps compiler crypto erts inets kernel os_mon sasl ssl stdlib xmerl \
$(COUCH_PATH)/src/mochiweb \
$(COUCH_PATH)/src/snappy $(COUCH_PATH)/src/etap $(realpath $(COUCH_PATH)/src/ibrowse) \
$(realpath $(COUCH_PATH)/src/lhttpc) \
View
4 cluster_run
@@ -76,10 +76,6 @@ def start_cluster(num_nodes, start_index, host, extra_args, args_prefix):
os.makedirs(logdir)
except:
pass
- try:
- os.stat("data/n_{0}/mnesia".format(i))
- except:
- os.makedirs("data/n_{0}/mnesia".format(i))
args = args_prefix + ["erl", "+MMmcs" "30",
"+A", "16", "+sbtu",
"+P", "327680", "-pa"] + ebin_path + [
View
5 doc/ns-server-hierarchy.txt
@@ -16,9 +16,6 @@
cookie changes as well")
(gen-server 'ns_cluster
"performs node join/leave requests")
- (supervisor 'mb_mnesia_sup :mode :one_for_one
- (gen-event 'mb_mnesia_events)
- (gen-server 'mb_mnesia))
(supervisor 'ns_config_sup :mode :rest_for_one
(gen-event 'ns_config_events
"fired when any ns_config variable is changed")
@@ -201,7 +198,7 @@
ns_stats_event"
(pubsub-link nil :to 'ns_stats_event))
(gen-server '(stats_reader "default")
- "used to read mnesia stats archive")
+ "used to read stats archive")
(event-handler '(failover_safeness_level "default") :to 'ns_stats_event
"estimates bucket's replication up-to-date-ness
'from this node' based on stats it sees"))))
View
580 src/mb_mnesia.erl
@@ -1,580 +0,0 @@
-%% @author Northscale <info@northscale.com>
-%% @copyright 2010 NorthScale, Inc.
-%%
-%% Licensed under the Apache License, Version 2.0 (the "License");
-%% you may not use this file except in compliance with the License.
-%% You may obtain a copy of the License at
-%%
-%% http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an "AS IS" BASIS,
-%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-%% See the License for the specific language governing permissions and
-%% limitations under the License.
-%%
-%% @doc Manage mnesia
-%%
--module(mb_mnesia).
-
--include("ns_common.hrl").
-
--include_lib("eunit/include/eunit.hrl").
-
--behaviour(gen_server).
-
--record(state, {peers = []}).
-
-%% API
--export([delete_schema/0,
- ensure_table/2,
- maybe_rename/1,
- start_link/0,
- truncate/2,
- wipe/0]).
-
-%% gen_server callbacks
--export([code_change/3, handle_call/3, handle_cast/2,
- handle_info/2, init/1, terminate/2]).
-
-
-%%
-%% API
-%%
-
-%% @doc Delete the current mnesia schema for joining/renaming purposes.
-delete_schema() ->
- false = misc:running(?MODULE),
- %% Shut down mnesia in case something else started it.
- stopped = mnesia:stop(),
- ok = mnesia:delete_schema([node()]),
- ?log_debug("Deleted schema.~nCurrent config: ~p",
- [mnesia:system_info(all)]).
-
-
-%% @doc Make sure table exists and has a copy on this node, creating it or
-%% adding a copy if it does not.
-ensure_table(TableName, Opts) ->
- gen_server:call(?MODULE, {ensure_table, TableName, Opts}, 30000).
-
-
-%% @doc Possibly rename the node, if it's not clustered. Returns true
-%% if it renamed the node, false otherwise.
-maybe_rename(NewAddr) ->
- gen_server:call(?MODULE, {maybe_rename, NewAddr}, 30000).
-
-
-%% @doc Start the gen_server
-start_link() ->
- gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
-
-
-%% @doc Truncate the given table to the last N records.
-truncate(Tab, N) ->
- {atomic, _M} = mnesia:transaction(
- fun () -> truncate(Tab, mnesia:last(Tab), N, 0) end).
-
-
-%% @doc Wipe all mnesia data.
-wipe() ->
- gen_server:call(?MODULE, wipe, 30000).
-
-
-%%
-%% gen_server callbacks
-%%
-
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
-
-
-handle_call({ensure_table, TableName, Opts}, From, State) ->
- try mnesia:table_info(TableName, disc_copies) of
- Nodes when is_list(Nodes) ->
- case lists:member(node(), Nodes) of
- true ->
- {reply, ok, State};
- false ->
- ?log_debug("Creating local copy of ~p",
- [TableName]),
- do_with_timeout(
- fun () ->
- {atomic, ok} =
- mnesia:add_table_copy(TableName, node(),
- disc_copies)
- end, From, 5000),
- {noreply, State}
- end
- catch exit:{aborted, {no_exists, _, _}} ->
- ?log_debug("Creating table ~p", [TableName]),
- do_with_timeout(
- fun () ->
- {atomic, ok} =
- mnesia:create_table(
- TableName, Opts ++ [{disc_copies, [node()]}])
- end, From, 5000),
- {noreply, State}
- end;
-
-handle_call({maybe_rename, NewAddr}, _From, State) ->
- %% We need to back up the db before changing the node name,
- %% because it will fail if the node name doesn't match the schema.
- backup(),
- OldName = node(),
- misc:executing_on_new_process(
- fun () ->
- %% prevent node disco events while we're in the middle
- %% of renaming
- ns_node_disco:register_node_renaming_txn(self()),
- case dist_manager:adjust_my_address(NewAddr) of
- nothing ->
- ?log_debug("Not renaming node. Deleting backup.", []),
- ok = file:delete(backup_file()),
- {reply, false, State};
- net_restarted ->
- master_activity_events:note_name_changed(),
- %% Make sure the cookie's still the same
- NewName = node(),
- ?log_debug("Renaming node from ~p to ~p.", [OldName, NewName]),
- rename_node_in_config(OldName, NewName),
- stopped = mnesia:stop(),
- change_node_name(OldName, NewName),
- ok = mnesia:delete_schema([NewName]),
- {ok, State1} = init([]),
- {reply, true, State1}
- end
- end);
-
-handle_call(wipe, _From, State) ->
- stopped = mnesia:stop(),
- ok = mnesia:delete_schema([node()]),
- start_mnesia([ensure_schema]),
- {reply, ok, State};
-
-handle_call(Request, From, State) ->
- ?log_warning("Unexpected call from ~p: ~p", [From, Request]),
- {reply, unhandled, State}.
-
-rename_node_in_config(Old, New) ->
- ns_config:update(fun ({K, V} = Pair) ->
- NewK = misc:rewrite_value(Old, New, K),
- NewV = misc:rewrite_value(Old, New, V),
- if
- NewK =/= K orelse NewV =/= V ->
- ale:debug(?CLUSTER_LOGGER,
- "renaming node conf ~p -> ~p:~n ~p ->~n ~p",
- [K, NewK, V, NewV]),
- {NewK, NewV};
- true ->
- Pair
- end
- end, erlang:make_ref()).
-
-
-handle_cast(Msg, State) ->
- ?log_warning("Unexpected cast: ~p", [Msg]),
- {noreply, State}.
-
-
-handle_info({mnesia_system_event, Event}, State) ->
- case Event of
- {mnesia_error, Format, Args} ->
- ?log_error("Error from Mnesia:~n" ++ Format, Args),
- {noreply, State};
- {mnesia_fatal, Format, Args, _} ->
- ?log_error("Fatal Mnesia error, exiting:~n" ++ Format, Args),
- timer:sleep(3000),
- {stop, mnesia_fatal, State};
- {mnesia_info, Format, Args} ->
- ?log_debug("Info from Mnesia:~n" ++ Format, Args),
- {noreply, State};
- {mnesia_down, Node} ->
- ?log_info("Saw Mnesia go down on ~p", [Node]),
- {noreply, State};
- {mnesia_up, Node} ->
- ?log_info("Saw Mnesia come up on ~p", [Node]),
- {noreply, State};
- {mnesia_overload, {What, Why}} ->
- ?log_warning("Mnesia detected overload during ~p because of ~p",
- [What, Why]),
- {noreply, State};
- {inconsistent_database, running_partitioned_network, Node} ->
- ?log_warning("Network partition detected with ~p. Restarting.",
- [Node]),
- {stop, partitioned, State};
- {inconsistent_database, starting_partitioned_network, Node} ->
- %% TODO do we need to do something in this case?
- ?log_warning("Starting partitioned network with ~p.", [Node]),
- {noreply, State};
- _ ->
- ?log_debug("Mnesia system event: ~p", [Event]),
- {noreply, State}
- end;
-
-handle_info({mnesia_table_event, {write, {schema, cluster, L}, _}},
- State) ->
- NewPeers = proplists:get_value(disc_copies, L),
- ?log_debug("Peers: ~p", [NewPeers]),
- gen_event:notify(mb_mnesia_events, {peers, NewPeers}),
- {noreply, State#state{peers=NewPeers}};
-
-handle_info({mnesia_table_event, Event}, State) ->
- ?log_debug("Mnesia table event:~n~p", [Event]),
- {noreply, State};
-
-handle_info({'EXIT', _Pid, Reason}, State) ->
- case Reason of
- normal ->
- {noreply, State};
- _ ->
- {stop, Reason, State}
- end;
-
-handle_info(Msg, State) ->
- ?log_warning("Unexpected message: ~p", [Msg]),
- {noreply, State}.
-
-
-init([]) ->
- process_flag(trap_exit, true),
- mnesia:set_debug_level(verbose),
- %% Don't hang forever if a node goes down when a transaction is in
- %% an unclear state
- application:set_env(mnesia, max_wait_for_decision, 10000),
- case is_mnesia_running() of
- yes ->
- ok;
- no ->
- case mnesia:system_info(db_nodes) of
- [Node] when Node == node() ->
- %% Check for a backup, but only if we're the only
- %% node
- BackupFile = backup_file(),
- case file:read_file_info(BackupFile) of
- {ok, _} ->
- ?log_info("Found backup. Restoring Mnesia database.", []),
- ok = mnesia:install_fallback(BackupFile),
- start_mnesia([]),
- ok = file:delete(BackupFile);
- {error, enoent} ->
- %% Crash if it's not one of these
- start_mnesia([ensure_schema])
- end;
- _ ->
- start_mnesia([ensure_schema])
- end
- end,
- Peers = ensure_config_tables(),
- ?log_debug("Current config:~n~p~nPeers: ~p",
- [mnesia:system_info(all), Peers]),
- %% Send an initial notification of our peer list.
- gen_event:notify(mb_mnesia_events, {peers, Peers}),
- {ok, #state{peers=Peers}}.
-
-
-terminate(Reason, _State) when Reason == normal; Reason == shutdown ->
- stopped = mnesia:stop(),
- ?log_info("Shut Mnesia down: ~p. Exiting.", [Reason]),
- ok;
-
-terminate(_Reason, _State) ->
- %% Don't shut down on crash
- ok.
-
-
-%%
-%% Internal functions
-%%
-
-%% @doc Back up the database to a standard location.
-backup() ->
- backup(5).
-
-backup(Tries) ->
- %% Only back up if there is a disc copy of the schema somewhere
- case mnesia:table_info(schema, disc_copies) of
- [] ->
- ?log_info("No local copy of the schema. Skipping backup.", []),
- ok;
- _ ->
- case mnesia:activate_checkpoint([{min, mnesia:system_info(tables)}]) of
- {ok, Name, _} ->
- ok = mnesia:backup_checkpoint(Name, backup_file()),
- ok = mnesia:deactivate_checkpoint(Name),
- ok;
- {error, Err} ->
- ?log_error("Error backing up: ~p (~B tries remaining)",
- [Err, Tries]),
- case Tries of
- 0 ->
- exit({backup_error, Err});
- _ ->
- backup(Tries-1)
- end
- end
- end.
-
-
-%% @doc The backup location.
-backup_file() ->
- tmpdir("mnesia_backup").
-
-
-%% Shamelessly stolen from Mnesia docs.
-change_node_name(From, To) ->
- Switch =
- fun(Node) when Node == From -> To;
- (Node) when Node == To -> throw({error, already_exists});
- (Node) -> Node
- end,
- Convert =
- fun({schema, db_nodes, Nodes}, Acc) ->
- {[{schema, db_nodes, lists:map(Switch,Nodes)}], Acc};
- ({schema, version, Version}, Acc) ->
- {[{schema, version, Version}], Acc};
- ({schema, cookie, Cookie}, Acc) ->
- {[{schema, cookie, Cookie}], Acc};
- ({schema, Tab, CreateList}, Acc) ->
- Keys = [ram_copies, disc_copies, disc_only_copies],
- OptSwitch =
- fun({Key, Val}) ->
- case lists:member(Key, Keys) of
- true -> {Key, lists:map(Switch, Val)};
- false-> {Key, Val}
- end
- end,
- {[{schema, Tab, lists:map(OptSwitch, CreateList)}], Acc};
- (Tuple, Acc) ->
- {[Tuple], Acc}
- end,
- Source = backup_file(),
- Target = tmpdir("post_rename"),
- {ok, switched} = mnesia:traverse_backup(Source, mnesia_backup, Target,
- mnesia_backup, Convert, switched),
- ok = file:rename(Target, Source),
- ok.
-
-
-%% @private
-%% @doc Exit if Fun hasn't returned within the specified timeout.
--spec do_with_timeout(fun(), {pid(), any()}, non_neg_integer()) ->
- ok.
-do_with_timeout(Fun, Client, Timeout) ->
- {Pid, Ref} =
- spawn_monitor(
- fun () ->
- Reply = Fun(),
- gen_server:reply(Client, Reply)
- end),
- receive
- {'DOWN', Ref, _, _, Reason} ->
- case Reason of
- normal ->
- ok;
- _ ->
- %% We'll just let the caller time out, since we
- %% have no idea if the worker actually tried to
- %% send a reply.
- ?log_error("Worker process exited with reason ~p",
- [Reason]),
- ok
- end
- after Timeout ->
- ?log_error("Worker process timed out. Killing it.", []),
- erlang:demonitor(Ref, [flush]),
- exit(Pid, kill),
- ok
- end.
-
-
-%% @private @doc Make sure the config tables exist for bootstrapping a
-%% new node. Returns the list of peers.
-ensure_config_tables() ->
- %% We distinguish between worker and peer nodes by whether or not
- %% the "cluster" table exists. This could as easily be any other
- %% table we can be sure will exist on the nodes with clustered
- %% Mnesia, but it's best to have a table we create here for that
- %% purpose. In order to distinguish between a brand new node that
- %% may have crashed between laying down the schema and creating
- %% the cluster table, we create a local content table called
- %% local_config that is expected to exist on *all* nodes.
- try mnesia:table_info(cluster, disc_copies)
- catch exit:{aborted, {no_exists, _, _}} ->
- try mnesia:table_info(local_config, disc_copies) of
- _ ->
- %% Not a new node, so a worker node
- []
- catch exit:{aborted, {no_exists, _, _}} ->
- %% New node; create cluster table first so we
- %% can't get fooled by a crash into thinking
- %% this is a worker node.
- {atomic, ok} =
- mnesia:create_table(
- cluster, [{disc_copies, [node()]}]),
- {atomic, ok} =
- mnesia:create_table(
- local_config, [{local_content, true},
- {disc_copies, [node()]}]),
- [node()]
- end
- end.
-
-
-%% @private
-%% @doc Make sure we have a disk copy of the schema and all disc tables.
-ensure_schema() ->
- %% Create a new on-disk schema if one doesn't already exist
- Nodes = mnesia:table_info(schema, disc_copies),
- case lists:member(node(), Nodes) of
- false ->
- case mnesia:change_table_copy_type(schema, node(), disc_copies) of
- {atomic, ok} ->
- ?log_debug("Committed schema to disk.", []);
- {aborted, {already_exists, _, _, _}} ->
- ?log_warning("Failed to write schema. Retrying.~n"
- "Config = ~p", [mnesia:system_info(all)]),
- timer:sleep(500),
- ensure_schema()
- end;
- true ->
- ?log_debug("Using existing disk schema on ~p.", [Nodes])
- end,
- %% Lay down copies of all the tables.
- Tables = mnesia:system_info(tables) -- [schema],
- lists:foreach(
- fun (Table) ->
- case mnesia:add_table_copy(Table, node(), disc_copies) of
- {atomic, ok} ->
- ?log_debug("Created local copy of ~p", [Table]);
- {aborted, {already_exists, _, _}} ->
- ?log_debug("Have local copy of ~p", [Table])
- end
- end, Tables),
- ok = mnesia:wait_for_tables(Tables, 2500).
-
-
-%% @doc Return yes or no depending on if mnesia is running, or {error,
-%% Reason} if something is wrong.
-is_mnesia_running() ->
- case mnesia:wait_for_tables([schema], 2500) of
- ok ->
- yes;
- {error, {node_not_running, _}} ->
- no;
- E -> E
- end.
-
-
-%% @doc Start mnesia and wait for it to come up.
-start_mnesia(Options) ->
- do_start_mnesia(Options, false).
-
-cleanup_and_start_mnesia(Options) ->
- MnesiaDir = path_config:component_path(data, "mnesia"),
- ToDelete = filelib:wildcard("*", MnesiaDir),
- ?log_info("Recovering from damaged mnesia files by deleting them:~n~p~n", [ToDelete]),
- lists:foreach(fun (BaseName) ->
- file:delete(filename:join(MnesiaDir, BaseName))
- end, ToDelete),
- do_start_mnesia(Options, true).
-
-do_start_mnesia(Options, Repeat) ->
- EnsureSchema = proplists:get_bool(ensure_schema, Options),
- MnesiaDir = path_config:component_path(data, "mnesia"),
- application:set_env(mnesia, dir, MnesiaDir),
- case mnesia:start() of
- ok ->
- {ok, _} = mnesia:subscribe(system),
- {ok, _} = mnesia:subscribe({table, schema, simple}),
- case mnesia:wait_for_tables([schema], 30000) of
- {error, _} when Repeat =/= true ->
- mnesia:unsubscribe({table, schema, simple}),
- mnesia:unsubscribe(system),
- stopped = mnesia:stop(),
- cleanup_and_start_mnesia(Options);
- ok -> ok
- end;
- {error, _} when Repeat =/= true ->
- cleanup_and_start_mnesia(Options)
- end,
- case {EnsureSchema, Repeat} of
- {true, false} ->
- try
- ensure_schema()
- catch T:E ->
- ?log_warning("ensure_schema failed: ~p:~p~n", [T,E]),
- mnesia:unsubscribe({table, schema, simple}),
- mnesia:unsubscribe(system),
- stopped = mnesia:stop(),
- cleanup_and_start_mnesia(Options)
- end;
- {true, _} ->
- ensure_schema();
- _ ->
- ok
- end.
-
-%% @doc Hack.
-tmpdir() ->
- path_config:component_path(tmp).
-
-
-tmpdir(Filename) ->
- Path = filename:join(tmpdir(), Filename),
- ok = filelib:ensure_dir(Path),
- Path.
-
-
-truncate(_Tab, '$end_of_table', N, M) ->
- case N of
- 0 -> M;
- _ -> -N
- end;
-truncate(Tab, Key, 0, M) ->
- NextKey = mnesia:prev(Tab, Key),
- ok = mnesia:delete({Tab, Key}),
- truncate(Tab, NextKey, 0, M + 1);
-truncate(Tab, Key, N, 0) ->
- truncate(Tab, mnesia:prev(Tab, Key), N - 1, 0).
-
-
-%%
-%% Tests
-%%
-
-shutdown_child(Pid) ->
- exit(Pid, shutdown),
- receive
- {'EXIT', Pid, shutdown} ->
- ok;
- {'EXIT', Pid, Reason} ->
- exit({shutdown_failed, Reason})
- after 5000 ->
- ?log_error("Mnesia shutdown timed out", []),
- exit(Pid, kill),
- exit(shutdown_timeout)
- end.
-
-startup_test_() ->
- {spawn, fun () ->
- ets:new(path_config_override, [public, named_table]),
- ets:insert(path_config_override, {path_config_tmpdir, "./tmp"}),
- ets:insert(path_config_override, {path_config_datadir, "./tmp"}),
- _ = file:delete(backup_file()),
- OldFlag = process_flag(trap_exit, true),
- try
- Node = node(),
- ok = mnesia:delete_schema([Node]),
- {ok, Pid} = mb_mnesia_sup:start_link(),
- yes = mnesia:system_info(is_running),
- [Node] = mnesia:table_info(schema, disc_copies),
- receive
- {'EXIT', Pid, Reason} ->
- exit({child_exited, Reason})
- after 0 -> ok
- end,
- shutdown_child(Pid)
- after
- process_flag(trap_exit, OldFlag)
- end
- end}.
View
44 src/mb_mnesia_sup.erl
@@ -1,44 +0,0 @@
-%% @author Northscale <info@northscale.com>
-%% @copyright 2010 NorthScale, Inc.
-%%
-%% Licensed under the Apache License, Version 2.0 (the "License");
-%% you may not use this file except in compliance with the License.
-%% You may obtain a copy of the License at
-%%
-%% http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an "AS IS" BASIS,
-%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-%% See the License for the specific language governing permissions and
-%% limitations under the License.
-%%
--module(mb_mnesia_sup).
-
--behavior(supervisor).
-
-%% API
--export ([start_link/0]).
-
-%% Supervisor callbacks
--export([init/1]).
-
-%%
-%% API
-%%
-
-%% @doc Start the supervisor.
-start_link() ->
- supervisor:start_link({local, ?MODULE}, ?MODULE, []).
-
-
-%%
-%% Supervisor callbacks
-%%
-
-init([]) ->
- {ok, {{one_for_one, 10, 1},
- [{mb_mnesia_events, {gen_event, start_link, [{local, mb_mnesia_events}]},
- permanent, 10, worker, dynamic},
- {mb_mnesia, {mb_mnesia, start_link, []},
- permanent, 5000, worker, [mb_mnesia]}]}}.
View
53 src/misc.erl
@@ -1196,34 +1196,41 @@ start_event_link(SubscriptionBody) ->
end
end)}.
-%% Writes to file atomically using write_file + rename trick.
-%% NB: this does not work on Windows
-%% (http://osdir.com/ml/racket.development/2011-01/msg00149.html).
+%% Writes to file atomically using write_file + atomic_rename
atomic_write_file(Path, Contents) ->
TmpPath = Path ++ ".tmp",
case file:write_file(TmpPath, Contents) of
ok ->
- case file:open(TmpPath, [raw, binary, read, write]) of
- {ok, IO} ->
- SyncRV =
- try
- file:sync(IO)
- after
- ok = file:close(IO)
- end,
- case SyncRV of
- ok ->
- %% NOTE: linux manpages also mention sync
- %% on directory, but erlang can't do that
- %% and that's not portable
- file:rename(TmpPath, Path);
- _ ->
- SyncRV
- end;
- Err ->
- Err
+ atomic_rename(TmpPath, Path);
+ X ->
+ X
+ end.
+
+%% Rename file (more or less) atomically.
+%% See https://lwn.net/Articles/351422/ for some details.
+%%
+%% NB: this does not work on Windows
+%% (http://osdir.com/ml/racket.development/2011-01/msg00149.html).
+atomic_rename(From, To) ->
+ case file:open(From, [raw, binary, read, write]) of
+ {ok, IO} ->
+ SyncRV =
+ try
+ file:sync(IO)
+ after
+ ok = file:close(IO)
+ end,
+ case SyncRV of
+ ok ->
+ %% NOTE: linux manpages also mention sync
+ %% on directory, but erlang can't do that
+ %% and that's not portable
+ file:rename(From, To);
+ _ ->
+ SyncRV
end;
- X -> X
+ Err ->
+ Err
end.
%% Return a list containing all but the last elements of the source list.
View
38 src/ns_cluster.erl
@@ -136,7 +136,7 @@ handle_call({complete_join, NodeKVList}, _From, State) ->
handle_cast(leave, State) ->
?cluster_log(0001, "Node ~p is leaving cluster.", [node()]),
ok = ns_server_cluster_sup:stop_cluster(),
- mb_mnesia:wipe(),
+ stats_archiver:wipe(),
NewCookie = ns_cookie_manager:cookie_gen(),
erlang:set_cookie(node(), NewCookie),
lists:foreach(fun erlang:disconnect_node/1, nodes()),
@@ -279,7 +279,7 @@ do_change_address(NewAddr) ->
ok;
{_, _} ->
?cluster_info("Decided to change address to ~p~n", [NewAddr1]),
- case mb_mnesia:maybe_rename(NewAddr1) of
+ case maybe_rename(NewAddr1) of
false ->
ok;
true ->
@@ -290,6 +290,40 @@ do_change_address(NewAddr) ->
ok
end.
+maybe_rename(NewAddr) ->
+ OldName = node(),
+ misc:executing_on_new_process(
+ fun () ->
+ %% prevent node disco events while we're in the middle
+ %% of renaming
+ ns_node_disco:register_node_renaming_txn(self()),
+ case dist_manager:adjust_my_address(NewAddr) of
+ nothing ->
+ ?cluster_debug("Not renaming node.", []),
+ false;
+ net_restarted ->
+ master_activity_events:note_name_changed(),
+ NewName = node(),
+ ?cluster_debug("Renaming node from ~p to ~p.", [OldName, NewName]),
+ rename_node_in_config(OldName, NewName),
+ true
+ end
+ end).
+
+rename_node_in_config(Old, New) ->
+ ns_config:update(fun ({K, V} = Pair) ->
+ NewK = misc:rewrite_value(Old, New, K),
+ NewV = misc:rewrite_value(Old, New, V),
+ if
+ NewK =/= K orelse NewV =/= V ->
+ ?cluster_debug("renaming node conf ~p -> ~p:~n ~p ->~n ~p",
+ [K, NewK, V, NewV]),
+ {NewK, NewV};
+ true ->
+ Pair
+ end
+ end, erlang:make_ref()).
+
check_add_possible(Body) ->
case menelaus_web:is_system_provisioned() of
false -> {error, system_not_provisioned,
View
2  src/ns_server_cluster_sup.erl
@@ -65,8 +65,6 @@ init([]) ->
permanent, 1000, worker, []},
{ns_cluster, {ns_cluster, start_link, []},
permanent, 5000, worker, [ns_cluster]},
- {mb_mnesia_sup, {mb_mnesia_sup, start_link, []},
- permanent, infinity, supervisor, [mb_mnesia_sup]},
{ns_config_sup, {ns_config_sup, start_link, []},
permanent, infinity, supervisor,
[ns_config_sup]},
View
217 src/stats_archiver.erl
@@ -13,32 +13,34 @@
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%
-%% @doc Store and aggregate statistics collected from stats_collector into
-%% mnesia, emitting 'sample_archived' events when aggregates are created.
+%% @doc Store and aggregate statistics collected from stats_collector into a
+%% collection of ETS tables, emitting 'sample_archived' events when aggregates
+%% are created. The contents of the ETS tables are periodically dumped to
+%% files that are then used to restore the tables after restart.
%%
-module(stats_archiver).
--include_lib("eunit/include/eunit.hrl").
--include_lib("stdlib/include/qlc.hrl").
-
-include("ns_common.hrl").
-include("ns_stats.hrl").
-behaviour(gen_server).
--define(RETRIES, 10).
-
-record(state, {bucket}).
-export([ start_link/1,
archives/0,
table/2,
- avg/2 ]).
+ avg/2,
+ latest_sample/2,
+ wipe/0 ]).
-export([code_change/3, init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2]).
+-define(BACKUP_INTERVAL,
+ ns_config_ets_dup:get_timeout(stats_archiver_backup_interval, 120000)).
+
%%
%% API
@@ -59,10 +61,39 @@ archives() ->
{year, 21600, 1464}]. % 366 days
-%% @doc Generate a suitable name for the Mnesia table.
+%% @doc Generate a suitable name for the ETS stats table.
table(Bucket, Period) ->
list_to_atom(fmt("~s-~s-~s", [?MODULE_STRING, Bucket, Period])).
+logger_file(Bucket, Period) ->
+ Name = io_lib:format("~s-~s.~s", [?MODULE_STRING, Bucket, Period]),
+ filename:join(stats_dir(), Name).
+
+%% Ensure directory for stats archiver ETS table backup files
+ensure_stats_storage() ->
+ StatsDir = stats_dir(),
+ R = case filelib:ensure_dir(StatsDir) of
+ ok ->
+ case file:make_dir(StatsDir) of
+ ok ->
+ ok;
+ {error, eexist} ->
+ ok;
+ Error ->
+ Error
+ end;
+ Error ->
+ Error
+ end,
+
+ case R of
+ ok ->
+ ok;
+ _ ->
+ ?log_error("Failed to create ETS stats directory with error: ~p~n", [R])
+ end,
+
+ R.
%% @doc Compute the average of a list of entries.
-spec avg(atom() | integer(), list()) -> #stat_entry{}.
@@ -79,6 +110,28 @@ avg(TS, [First|Rest]) ->
(_Key, Value) -> Value / Count
end, Sums)}.
+%% @doc Fetch the latest stats sample
+latest_sample(Bucket, Period) ->
+ Tab = table(Bucket, Period),
+ case ets:last(Tab) of
+ '$end_of_table' ->
+ {error, no_samples};
+ Key ->
+ {_, Sample} = hd(ets:lookup(Tab, Key)),
+ {ok, Sample}
+ end.
+
+%% This function is called when ns_server_sup is shut down. So we don't race
+%% with 'backup' handler here.
+wipe() ->
+ R = misc:rm_rf(stats_dir()),
+ case R of
+ ok ->
+ ?log_info("Deleted stats directory.");
+ _ ->
+ ?log_error("Failed to delete stats directory: ~p", [R])
+ end,
+ R.
%%
%% gen_server callbacks
@@ -89,8 +142,10 @@ code_change(_OldVsn, State, _Extra) ->
init(Bucket) ->
+ ok = ensure_stats_storage(),
start_timers(),
ns_pubsub:subscribe_link(ns_stats_event),
+ process_flag(trap_exit, true),
self() ! init,
{ok, #state{bucket=Bucket}}.
@@ -115,25 +170,40 @@ do_handle_info(init, State) ->
{noreply, State};
do_handle_info({stats, Bucket, Sample}, State = #state{bucket=Bucket}) ->
Tab = table(Bucket, minute),
- {atomic, ok} = mnesia:transaction(fun () ->
- mnesia:write(Tab, Sample, write)
- end, ?RETRIES),
+ #stat_entry{timestamp=TS} = Sample,
+ ets:insert(Tab, {TS, Sample}),
gen_event:notify(ns_stats_event, {sample_archived, Bucket, Sample}),
{noreply, State};
do_handle_info({sample_archived, _, _}, State) ->
{noreply, State};
do_handle_info({truncate, Period, N}, #state{bucket=Bucket} = State) ->
Tab = table(Bucket, Period),
- mb_mnesia:truncate(Tab, N),
+ truncate_logger(Tab, N),
{noreply, State};
do_handle_info({cascade, Prev, Period, Step}, #state{bucket=Bucket} = State) ->
- cascade(Bucket, Prev, Period, Step),
+ cascade_logger(Bucket, Prev, Period, Step),
+ {noreply, State};
+do_handle_info(backup, #state{bucket=Bucket} = State) ->
+ misc:flush(backup),
+ proc_lib:spawn_link(
+ fun () ->
+ backup_loggers(Bucket)
+ end),
+ {noreply, State};
+do_handle_info({'EXIT', _Pid, Reason} = Exit, State) ->
+ case Reason of
+ normal ->
+ ok;
+ _Other ->
+ ?log_warning("Process exited unexpectedly: ~p", [Exit])
+ end,
{noreply, State};
do_handle_info(_Msg, State) -> % Don't crash on delayed responses from calls
{noreply, State}.
-terminate(_Reason, _State) ->
+terminate(_Reason, #state{bucket=Bucket} = _State) ->
+ backup_loggers(Bucket),
ok.
@@ -141,47 +211,97 @@ terminate(_Reason, _State) ->
%% Internal functions
%%
-cascade(Bucket, Prev, Period, Step) ->
- PrevTab = table(Bucket, Prev),
- NextTab = table(Bucket, Period),
- {atomic, ok} = mnesia:transaction(
- fun () ->
- case last_chunk(PrevTab, Step) of
- false -> ok;
- Avg ->
- mnesia:write(NextTab, Avg, write)
- end
- end, ?RETRIES).
-
create_tables(Bucket) ->
- Table = [{record_name, stat_entry},
- {type, ordered_set},
- {local_content, true},
- {attributes, record_info(fields, stat_entry)}],
- [ mb_mnesia:ensure_table(table(Bucket, Period), Table)
- || {Period, _, _} <- archives() ].
+ %% create stats logger tables
+ [ check_logger(Bucket, Period) || {Period, _, _} <- archives() ].
+
+check_logger(Bucket, Period) ->
+ File = logger_file(Bucket, Period),
+ %% check existence of stats backup file in data/<node>/stats
+ Create =
+ case filelib:is_regular(File) of
+ true ->
+ case ets:file2tab(File) of
+ {error, Reason} ->
+ ?log_error("Failed to restore stats table from "
+ "file ~p with error ~p~n", [File, Reason]),
+ true;
+ {ok, _} ->
+ false
+ end;
+ false ->
+ true
+ end,
+
+ case Create of
+ true ->
+ ets:new(table(Bucket, Period), [ordered_set, protected, named_table]);
+ false ->
+ ok
+ end,
+ ok.
+backup_logger(Bucket, Period) ->
+ Tab = table(Bucket, Period),
+ File = logger_file(Bucket, Period),
+ TempFile = File ++ ".tmp",
+ case ets:tab2file(Tab, TempFile) of
+ {error, Reason} = Error ->
+ ?log_error("Failed to backup stats table ~p with error ~p~n", [Tab, Reason]),
+ Error;
+ _ ->
+ misc:atomic_rename(TempFile, File)
+ end.
-last_chunk(Tab, Step) ->
- case mnesia:last(Tab) of
- '$end_of_table' ->
- false;
- TS ->
- last_chunk(Tab, TS, Step, [])
+backup_loggers(Bucket) ->
+ lists:foreach(
+ fun ({Period, _, _}) ->
+ backup_logger(Bucket, Period)
+ end, archives()).
+
+%% keep the last N stats samples and delete the rest
+truncate_logger(Tab, NumToKeep) ->
+ ets:foldr(fun ({Key, _}, I) ->
+ case I >= NumToKeep of
+ true ->
+ ets:delete(Tab, Key);
+ false ->
+ ok
+ end,
+ I + 1
+ end, 0, Tab).
+
+cascade_logger(Bucket, Prev, Period, Step) ->
+ true = (Period =/= minute),
+
+ PrevTab = table(Bucket, Prev),
+ NextTab = table(Bucket, Period),
+ case coalesce_stats(PrevTab, Step) of
+ false ->
+ ok;
+ Avg ->
+ #stat_entry{timestamp=TS} = Avg,
+ ets:insert(NextTab, {TS, Avg})
+ end.
+
+coalesce_stats(Tab, Step) ->
+ case ets:last(Tab) of
+ '$end_of_table' -> false;
+ LastTS -> coalesce_stats(Tab, LastTS, Step, [])
end.
-last_chunk(Tab, TS, Step, Samples) ->
- Samples1 = [hd(mnesia:read(Tab, TS))|Samples],
- TS1 = mnesia:prev(Tab, TS),
+coalesce_stats(Tab, TS, Step, Samples) ->
+ [{_, OneSample}] = ets:lookup(Tab, TS),
+ Samples1 = [OneSample|Samples],
+ PrevTS = ets:prev(Tab, TS),
T = misc:trunc_ts(TS, Step),
- case TS1 == '$end_of_table' orelse misc:trunc_ts(TS1, Step) /= T of
+ case PrevTS == '$end_of_table' orelse misc:trunc_ts(PrevTS, Step) /= T of
false ->
- last_chunk(Tab, TS1, Step, Samples1);
+ coalesce_stats(Tab, PrevTS, Step, Samples1);
true ->
avg(T, Samples1)
end.
-
%% @doc Generate a suitable name for the per-bucket gen_server.
server(Bucket) ->
list_to_atom(?MODULE_STRING ++ "-" ++ Bucket).
@@ -195,7 +315,6 @@ start_cascade_timers([{Prev, _, _} | [{Next, Step, _} | _] = Rest]) ->
start_cascade_timers([_]) ->
ok.
-
%% @doc Start timers for various housekeeping tasks.
start_timers() ->
Archives = archives(),
@@ -205,8 +324,12 @@ start_timers() ->
% total samples
timer:send_interval(Interval, {truncate, Period, Samples})
end, Archives),
- start_cascade_timers(Archives).
+ start_cascade_timers(Archives),
+ timer:send_interval(?BACKUP_INTERVAL, backup).
-spec fmt(string(), list()) -> list().
fmt(Str, Args) ->
lists:flatten(io_lib:format(Str, Args)).
+
+stats_dir() ->
+ path_config:component_path(data, "stats").
View
29 src/stats_reader.erl
@@ -107,18 +107,7 @@ handle_call(Req, From, State) ->
end.
do_handle_call({latest, Period}, _From, #state{bucket=Bucket} = State) ->
- Reply = try mnesia:activity(
- async_dirty,
- fun () ->
- Tab = stats_archiver:table(Bucket, Period),
- Key = mnesia:last(Tab),
- hd(mnesia:read(Tab, Key))
- end, []) of
- Result ->
- {ok, Result}
- catch
- Type:Err -> {error, {Type, Err}}
- end,
+ Reply = stats_archiver:latest_sample(Bucket, Period),
{reply, Reply, State};
do_handle_call({latest, Period, N}, _From, #state{bucket=Bucket} = State) ->
Reply = try fetch_latest(Bucket, Period, N) of
@@ -160,14 +149,12 @@ fetch_latest(Bucket, Period, N) ->
{_, Interval, _} ->
Seconds = N * Interval,
Tab = stats_archiver:table(Bucket, Period),
- case mnesia:dirty_last(Tab) of
+ case ets:last(Tab) of
'$end_of_table' ->
{ok, []};
Key ->
Oldest = Key - Seconds * 1000 + 500,
- Handle = qlc:q([Sample || #stat_entry{timestamp=TS} = Sample
- <- mnesia:table(Tab), TS > Oldest]),
- case mnesia:activity(async_dirty, fun qlc:eval/1, [Handle]) of
+ case qlc:eval(qlc:q([Sample || {TS,Sample} <- ets:table(Tab), TS > Oldest])) of
{error, _, _} = Error ->
Error;
Results ->
@@ -196,13 +183,13 @@ log_bad_responses({Replies, Zombies}) ->
resample(Bucket, Period, Step, N) ->
Seconds = N * Step,
Tab = stats_archiver:table(Bucket, Period),
- case mnesia:dirty_last(Tab) of
+ case ets:last(Tab) of
'$end_of_table' ->
{ok, []};
Key ->
Oldest = Key - Seconds * 1000 + 500,
- Handle = qlc:q([Sample || #stat_entry{timestamp=TS} = Sample
- <- mnesia:table(Tab), TS > Oldest]),
+ Handle = qlc:q([Sample || {TS, Sample}
+ <- ets:table(Tab), TS > Oldest]),
F = fun (#stat_entry{timestamp = T} = Sample,
{T1, Acc, Chunk}) ->
case misc:trunc_ts(T, Step) of
@@ -214,9 +201,7 @@ resample(Bucket, Period, Step, N) ->
{T2, [avg(T1, Chunk)|Acc], [Sample]}
end
end,
- case mnesia:activity(async_dirty, fun qlc:fold/3,
- [F, {undefined, [], []},
- Handle]) of
+ case qlc:fold(F, {undefined, [], []}, Handle) of
{error, _, _} = Error ->
Error;
{undefined, [], []} ->
Please sign in to comment.
Something went wrong with that request. Please try again.