Skip to content

Commit

Permalink
Merge branch 'master' into gh-riak-core-197-delete-bucket-properties
Browse files Browse the repository at this point in the history
  • Loading branch information
jerith committed Aug 28, 2012
2 parents 88c0665 + db17f87 commit df1da36
Show file tree
Hide file tree
Showing 32 changed files with 888 additions and 503 deletions.
2 changes: 2 additions & 0 deletions .travis.yml
@@ -1,5 +1,7 @@
language: erlang
script: (rebar compile && rebar eunit skip_deps=true) || (find . -name "*.log" -print -exec cat \{\} \; && sh -c "exit 1")
notifications:
webhooks: http://basho-engbot.herokuapp.com/travis?key=ad9a6e51d706903e1fd0963c7b8e064b93e85b56
email: eng@basho.com
before_script:
- "ulimit -n 4096"
Expand Down
Binary file modified rebar
Binary file not shown.
3 changes: 1 addition & 2 deletions rebar.config
@@ -1,6 +1,7 @@
{cover_enabled, true}.
{edoc_opts, [{preprocess, true}]}.
{erl_opts, [warnings_as_errors, {parse_transform, lager_transform}]}.
{eunit_opts, [verbose]}.

{erl_first_files, [
"src/riak_kv_backend.erl",
Expand All @@ -21,7 +22,5 @@
{sext, ".*", {git, "git://github.com/esl/sext", "master"}},
{riak_pipe, ".*", {git, "git://github.com/basho/riak_pipe.git",
"master"}},
{basho_metrics, ".*", {git, "git://github.com/basho/basho_metrics.git",
"master"}},
{riak_api, ".*", {git, "git://github.com/basho/riak_api.git", "master"}}
]}.
2 changes: 1 addition & 1 deletion src/riak_client.erl
Expand Up @@ -796,7 +796,7 @@ wait_for_query_results(ReqId, Timeout, Acc) ->
{ReqId,{results, Res}} -> wait_for_query_results(ReqId, Timeout, [Res | Acc]);
{ReqId, Error} -> {error, Error}
after Timeout ->
{error, timeout, Acc}
{error, timeout}
end.

add_inputs(_FlowPid, []) ->
Expand Down
3 changes: 2 additions & 1 deletion src/riak_kv.app.src
Expand Up @@ -3,12 +3,13 @@
{application, riak_kv,
[
{description, "Riak Key/Value Store"},
{vsn, "1.1.2"},
{vsn, "1.2.0"},
{applications, [
kernel,
stdlib,
sasl,
crypto,
riak_api,
riak_core,
luke,
erlang_js,
Expand Down
21 changes: 11 additions & 10 deletions src/riak_kv_app.erl
Expand Up @@ -25,6 +25,13 @@
-behaviour(application).
-export([start/2,stop/1]).

-define(SERVICES, [{riak_kv_pb_object, 3, 6}, %% ClientID stuff
{riak_kv_pb_object, 9, 14}, %% Object requests
{riak_kv_pb_bucket, 15, 22}, %% Bucket requests
{riak_kv_pb_mapred, 23, 24}, %% MapReduce requests
{riak_kv_pb_index, 25, 26} %% Secondary index requests
]).

%% @spec start(Type :: term(), StartArgs :: term()) ->
%% {ok,Pid} | ignore | {error,Error}
%% @doc The application:start callback for riak.
Expand Down Expand Up @@ -118,23 +125,16 @@ start(_Type, _StartArgs) ->
mapred_2i_pipe,
[{true, true}, {false, false}]}),

%% register stats
riak_kv_stat:register_stats(),

%% Go ahead and mark the riak_kv service as up in the node watcher.
%% The riak_core_ring_handler blocks until all vnodes have been started
%% synchronously.
riak_core:register(riak_kv, [
{vnode_module, riak_kv_vnode},
{bucket_validator, riak_kv_bucket}
{bucket_validator, riak_kv_bucket},
{stat_mod, riak_kv_stat}
]),

ok = riak_api_pb_service:register([{riak_kv_pb_object, 3, 6}, %% ClientID stuff
{riak_kv_pb_object, 9, 14}, %% Object requests
{riak_kv_pb_bucket, 15, 22}, %% Bucket requests
{riak_kv_pb_mapred, 23, 24}, %% MapReduce requests
{riak_kv_pb_index, 25, 26} %% Secondary index requests
]),
ok = riak_api_pb_service:register(?SERVICES),

%% Add routes to webmachine
[ webmachine_router:add_route(R)
Expand All @@ -147,6 +147,7 @@ start(_Type, _StartArgs) ->
%% @spec stop(State :: term()) -> ok
%% @doc The application:stop callback for riak.
stop(_State) ->
ok = riak_api_pb_service:deregister(?SERVICES),
ok.

%% 719528 days from Jan 1, 0 to Jan 1, 1970
Expand Down
45 changes: 17 additions & 28 deletions src/riak_kv_bitcask_backend.erl
Expand Up @@ -124,7 +124,12 @@ start(Partition, Config) ->
%% @doc Stop the bitcask backend
-spec stop(state()) -> ok.
stop(#state{ref=Ref}) ->
bitcask:close(Ref).
case Ref of
undefined ->
ok;
_ ->
bitcask:close(Ref)
end.

%% @doc Retrieve an object from the bitcask backend
-spec get(riak_object:bucket(), riak_object:key(), state()) ->
Expand Down Expand Up @@ -304,8 +309,7 @@ fold_objects(FoldObjectsFun, Acc, Opts, #state{opts=BitcaskOpts,
-spec drop(state()) -> {ok, state()} | {error, term(), state()}.
drop(#state{ref=Ref,
partition=Partition,
root=DataRoot,
opts=BitcaskOpts}=State) ->
root=DataRoot}=State) ->
%% Close the bitcask reference
bitcask:close(Ref),

Expand All @@ -321,32 +325,17 @@ drop(#state{ref=Ref,
CleanupDir = check_for_cleanup_dir(DataRoot, auto),
move_unused_dirs(CleanupDir, PartitionDirs),

%% Spawn a process to cleanup the old data files.
%% The use of spawn is intentional. We do not
%% care if this process dies since any lingering
%% files will be cleaned up on the next drop.
%% The worst case is that the files hang
%% around and take up some disk space.
spawn(drop_data_cleanup(PartitionStr, CleanupDir)),

%% Make sure the data directory is now empty
data_directory_cleanup(PartitionDir),

case make_data_dir(filename:join([DataRoot,
PartitionStr])) of
{ok, DataDir} ->
%% Spawn a process to cleanup the old data files.
%% The use of spawn is intentional. We do not
%% care if this process dies since any lingering
%% files will be cleaned up on the next drop.
%% The worst case is that the files hang
%% around and take up some disk space.
spawn(drop_data_cleanup(PartitionStr, CleanupDir)),

%% Now open the bitcask and return an updated state
%% so this backend can continue processing.
case bitcask:open(filename:join(DataRoot, DataDir), BitcaskOpts) of
Ref1 when is_reference(Ref1) ->
{ok, State#state{data_dir=DataDir,
ref=Ref1}};
{error, Reason} ->
{error, Reason, State#state{data_dir=DataDir}}
end;
{error, Reason1} ->
{error, Reason1, State}
end.
{ok, State#state{ref = undefined}}.

%% @doc Returns true if this bitcasks backend contains any
%% non-tombstone values; otherwise returns false.
Expand Down Expand Up @@ -737,7 +726,7 @@ drop_test() ->
%% Stop the backend
ok = stop(State1),
os:cmd("rm -rf test/bitcask-backend/*"),
?assertEqual(["42", "auto_cleanup"], lists:sort(DataDirs)),
?assertEqual(["auto_cleanup"], lists:sort(DataDirs)),
%% The drop cleanup happens in a separate process so
%% there is no guarantee it has happened yet when
%% this test runs.
Expand Down
84 changes: 13 additions & 71 deletions src/riak_kv_delete.erl
Expand Up @@ -185,11 +185,10 @@ get_w_options(Bucket, Options) ->
-ifdef(TEST).

delete_test_() ->
cleanup(ignored_arg),
%% Execute the test cases
{ foreach,
fun setup/0,
fun cleanup/1,
{foreach,
setup(),
cleanup(),
[
fun invalid_r_delete/0,
fun invalid_rw_delete/0,
Expand Down Expand Up @@ -286,75 +285,18 @@ invalid_pw_delete() ->
?assert(false)
end.

setup() ->
%% Shut logging up - too noisy.
application:load(sasl),
application:set_env(sasl, sasl_error_logger, {file, "riak_kv_delete_test_sasl.log"}),
error_logger:tty(false),
error_logger:logfile({open, "riak_kv_delete_test.log"}),
%% Start erlang node
TestNode = list_to_atom("testnode" ++ integer_to_list(element(3, now())) ++
integer_to_list(element(2, now()))),
case net_kernel:start([TestNode, shortnames]) of
{ok, _} ->
ok;
{error, {already_started, _}} ->
ok
end,
do_dep_apps(start, dep_apps()),
application:set_env(riak_core, default_bucket_props, [{r, quorum},
{w, quorum}, {pr, 0}, {pw, 0}, {rw, quorum}, {n_val, 3}]),
%% There's some weird interaction with the quickcheck tests in put_fsm_eqc
%% that somehow makes the riak_kv_delete sup not be running if those tests
%% run before these. I'm sick of trying to figure out what is not being
%% cleaned up right, thus the following workaround.
case whereis(riak_kv_delete_sup) of
undefined ->
{ok, _} = riak_kv_delete_sup:start_link();
_ ->
ok
end,
riak_kv_get_fsm_sup:start_link(),
timer:sleep(500).

cleanup(_Pid) ->
do_dep_apps(stop, lists:reverse(dep_apps())),
catch exit(whereis(riak_kv_vnode_master), kill), %% Leaks occasionally
catch exit(whereis(riak_sysmon_filter), kill), %% Leaks occasionally
catch unlink(whereis(riak_kv_get_fsm_sup)),
catch unlink(whereis(riak_kv_delete_sup)),
catch exit(whereis(riak_kv_get_fsm_sup), kill), %% Leaks occasionally
catch exit(whereis(riak_kv_delete_sup), kill), %% Leaks occasionally
net_kernel:stop(),
%% Reset the riak_core vnode_modules
application:unset_env(riak_core, default_bucket_props),
application:unset_env(sasl, sasl_error_logger),
error_logger:tty(true),
application:set_env(riak_core, vnode_modules, []).
configure(load) ->
application:set_env(riak_core, default_bucket_props,
[{r, quorum}, {w, quorum}, {pr, 0}, {pw, 0},
{rw, quorum}, {n_val, 3}]),
application:set_env(riak_kv, storage_backend, riak_kv_memory_backend);
configure(_) -> ok.

dep_apps() ->
SetupFun =
fun(start) ->
%% Set some missing env vars that are normally
%% part of release packaging.
application:set_env(riak_core, ring_creation_size, 64),
application:set_env(riak_kv, storage_backend, riak_kv_memory_backend),
%% Create a fresh ring for the test
Ring = riak_core_ring:fresh(),
riak_core_ring_manager:set_ring_global(Ring),
setup() ->
riak_kv_test_util:common_setup(?MODULE, fun configure/1).

%% Start riak_kv
timer:sleep(500);
(stop) ->
ok
end,
XX = fun(_) -> error_logger:info_msg("Registered: ~w\n", [lists:sort(registered())]) end,
[sasl, crypto, riak_sysmon, webmachine, XX, riak_core, XX, luke, erlang_js,
inets, mochiweb, os_mon, SetupFun, riak_kv].
cleanup() ->
riak_kv_test_util:common_cleanup(?MODULE, fun configure/1).

do_dep_apps(StartStop, Apps) ->
lists:map(fun(A) when is_atom(A) -> application:StartStop(A);
(F) -> F(StartStop)
end, Apps).

-endif.
35 changes: 25 additions & 10 deletions src/riak_kv_eleveldb_backend.erl
Expand Up @@ -106,8 +106,13 @@ start(Partition, Config) ->

%% @doc Stop the eleveldb backend
-spec stop(state()) -> ok.
stop(_State) ->
%% No-op; GC handles cleanup
stop(State) ->
case State#state.ref of
undefined ->
ok;
_ ->
eleveldb:close(State#state.ref)
end,
ok.

%% @doc Retrieve an object from the eleveldb backend
Expand Down Expand Up @@ -277,14 +282,10 @@ fold_objects(FoldObjectsFun, Acc, Opts, #state{fold_opts=FoldOpts,
%% and return a fresh reference.
-spec drop(state()) -> {ok, state()} | {error, term(), state()}.
drop(State0) ->
eleveldb:close(State0#state.ref),
case eleveldb:destroy(State0#state.data_root, []) of
ok ->
case open_db(State0) of
{ok, State} ->
{ok, State};
{error, Reason} ->
{error, Reason, State0}
end;
{ok, State0#state{ref = undefined}};
{error, Reason} ->
{error, Reason, State0}
end.
Expand Down Expand Up @@ -341,6 +342,21 @@ init_state(DataRoot, Config) ->
%% Use read options for folding, but FORCE fill_cache to false
FoldOpts = lists:keystore(fill_cache, 1, ReadOpts, {fill_cache, false}),

%% Warn if block_size is set
SSTBS = proplists:get_value(sst_block_size, OpenOpts, false),
BS = proplists:get_value(block_size, OpenOpts, false),
case BS /= false andalso SSTBS == false of
true ->
lager:warning("eleveldb block_size has been renamed sst_block_size "
"and the current setting of ~p is being ignored. "
"Changing sst_block_size is strongly cautioned "
"against unless you know what you are doing. Remove "
"block_size from app.config to get rid of this "
"message.\n", [BS]);
_ ->
ok
end,

%% Generate a debug message with the options we'll use for each operation
lager:debug("Datadir ~s options for LevelDB: ~p\n",
[DataRoot, [{open, OpenOpts}, {read, ReadOpts}, {write, WriteOpts}, {fold, FoldOpts}]]),
Expand Down Expand Up @@ -542,8 +558,7 @@ eqc_test_() ->
[?_assertEqual(true,
backend_eqc:test(?MODULE, false,
[{data_root,
"test/eleveldb-backend"},
{async_folds, false}]))]},
"test/eleveldb-backend"}]))]},
{timeout, 60000,
[?_assertEqual(true,
backend_eqc:test(?MODULE, false,
Expand Down
13 changes: 7 additions & 6 deletions src/riak_kv_map_master.erl
Expand Up @@ -23,7 +23,7 @@
-module(riak_kv_map_master).
-include_lib("riak_kv_js_pools.hrl").

-behaviour(gen_server2).
-behaviour(riak_core_gen_server).

%% API
-export([start_link/0,
Expand All @@ -47,17 +47,18 @@
next}).

new_mapper({_, Node}=VNode, QTerm, MapInputs, PhasePid) ->
gen_server2:pcall({?SERVER, Node}, 5, {new_mapper, VNode,
QTerm, MapInputs, PhasePid}, infinity).
riak_core_gen_server:pcall({?SERVER, Node}, 5,
{new_mapper, VNode, QTerm, MapInputs, PhasePid},
infinity).

queue_depth() ->
Nodes = [node()|nodes()],
[{Node, gen_server2:pcall({?SERVER, Node}, 0, queue_depth,
infinity)} || Node <- Nodes].
[{Node, riak_core_gen_server:pcall({?SERVER, Node}, 0, queue_depth,
infinity)} || Node <- Nodes].


start_link() ->
gen_server2:start_link({local, ?SERVER}, ?MODULE, [], []).
riak_core_gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).

init([]) ->
process_flag(trap_exit, true),
Expand Down
1 change: 1 addition & 0 deletions src/riak_kv_multi_backend.erl
Expand Up @@ -603,6 +603,7 @@ extra_callback_test() ->
application:stop(bitcask).

bad_config_test() ->
application:unset_env(riak_kv, multi_backend),
ErrorReason = multi_backend_config_unset,
?assertEqual({error, ErrorReason}, start(0, [])).

Expand Down

0 comments on commit df1da36

Please sign in to comment.