Permalink
Browse files

Merge branch 'buffered_writes'

  • Loading branch information...
2 parents 4fabb7c + 97b3431 commit 949e4042ca0fbc8e5041ee56eee16420d50bd50f Cliff Moon committed Feb 22, 2009
Showing with 144 additions and 29 deletions.
  1. +3 −2 elibs/configuration.erl
  2. +39 −24 elibs/storage_server.erl
  3. +5 −1 etest/mock.erl
  4. +96 −1 etest/storage_server_test.erl
  5. +1 −1 include/config.hrl
View
5 elibs/configuration.erl
@@ -54,7 +54,8 @@ set_config(Config) ->
gen_server:call(configuration, {set_config, Config}).
stop() ->
- gen_server:cast(configuration, stop).
+ erase(config),
+ gen_server:cast(configuration, stop).
%%====================================================================
@@ -155,7 +156,7 @@ pick_node_and_merge(Config, Nodes) ->
merge_configs(Remote, Local) ->
%we need to merge in any cluster invariants
- merge_configs([n, r, w, q, storage_mod, blocksize], Remote, Local).
+ merge_configs([n, r, w, q, storage_mod, blocksize, buffered_writes], Remote, Local).
merge_configs([], Remote, Merged) -> Merged;
View
63 elibs/storage_server.erl
@@ -24,6 +24,7 @@
-include("chunk_size.hrl").
-include("common.hrl").
+-include("config.hrl").
-ifdef(TEST).
-include("etest/storage_server_test.erl").
@@ -130,10 +131,10 @@ close(Name, Timeout) ->
%% @end
%%--------------------------------------------------------------------
init({StorageModule,DbKey,Name,Min,Max,BlockSize}) ->
- %% ?debugMsg("storage_server init"),
+ Config = configuration:get_config(),
+ load_config_into_dict(Config),
process_flag(trap_exit, true), % need to trap exits to deal with merkle issues. gotta do shit the hard way.
{ok, Table} = StorageModule:open(DbKey,Name),
- %% ?debugFmt("storage table ~p", [Table]),
DMName = filename:join([DbKey, "dmerkle.idx"]),
V = if
BlockSize == undefined -> {ok, undefined};
@@ -153,8 +154,6 @@ init({StorageModule,DbKey,Name,Min,Max,BlockSize}) ->
end),
T
end,
-
- %% ?debugFmt("dmerkle tree ~p", [Tree]),
{ok, #storage{module=StorageModule,dbkey=DbKey,blocksize=BlockSize,table=Table,name=Name,tree=Tree}}.
%%--------------------------------------------------------------------
@@ -189,22 +188,8 @@ handle_call({get, Key}, {RemotePid, _Tag}, State = #storage{module=Module,table=
end;
handle_call({put, Key, Context, ValIn}, _From, State = #storage{module=Module,table=Table,tree=Tree}) ->
- %% ?debugFmt("handle_call put ~p", [Key]),
- Values = lib_misc:listify(ValIn),
- ?prof(outer_put),
- R = case Context of
- {clobber, Context2} -> internal_put(Key, Context2, Values, Tree, Table, Module, State);
- _ ->
- case (catch Module:get(sanitize_key(Key), Table)) of
- {ok, {ReadContext, ReadValues}} ->
- {ResolvedContext, ResolvedValues} = vector_clock:resolve({ReadContext, ReadValues}, {Context, Values}),
- internal_put(Key, ResolvedContext, ResolvedValues, Tree, Table, Module, State);
- {ok, not_found} -> internal_put(Key, Context, Values, Tree, Table, Module, State);
- Failure -> {reply, Failure, State}
- end
- end,
- ?forp(outer_put),
- R;
+ {Reply, NewState} = inside_process_put(Key, Context, ValIn, State),
+ {reply, Reply, NewState};
handle_call({has_key, Key}, _From, State = #storage{module=Module,table=Table}) ->
{reply, catch Module:has_key(sanitize_key(Key),Table), State};
@@ -274,6 +259,10 @@ handle_call(close, _From, State) ->
%% @doc Handling cast messages
%% @end
%%--------------------------------------------------------------------
+handle_cast({put, Key, Context, ValIn}, State) ->
+ {_, NewState} = inside_process_put(Key, Context, ValIn, State),
+ {noreply, NewState};
+
handle_cast(_Msg, State) ->
{noreply, State}.
@@ -314,12 +303,38 @@ code_change(_OldVsn, State, _Extra) ->
%%% Internal functions
%%--------------------------------------------------------------------
+load_config_into_dict(#config{buffered_writes=BufferedWrites}) ->
+ put(buffered_writes, BufferedWrites).
+
int_put(Name, Key, Context, Value, Timeout) ->
- gen_server:call(Name, {put, Key, Context, Value}, Timeout).
+ Config = configuration:get_config(),
+ case Config#config.buffered_writes of
+ true ->
+ gen_server:cast(Name, {put, Key, Context, Value});
+ undefined -> gen_server:call(Name, {put, Key, Context, Value}, Timeout)
+ end.
+
+inside_process_put(Key, Context, ValIn, State = #storage{module=Module,table=Table,tree=Tree}) ->
+ Values = lib_misc:listify(ValIn),
+ ?prof(outer_put),
+ R = case Context of
+ {clobber, Context2} -> internal_put(Key, Context2, Values, Tree, Table, Module, State);
+ _ ->
+ case (catch Module:get(sanitize_key(Key), Table)) of
+ {ok, {ReadContext, ReadValues}} ->
+ {ResolvedContext, ResolvedValues} = vector_clock:resolve({ReadContext, ReadValues}, {Context, Values}),
+ internal_put(Key, ResolvedContext, ResolvedValues, Tree, Table, Module, State);
+ {ok, not_found} -> internal_put(Key, Context, Values, Tree, Table, Module, State);
+ Failure -> {Failure, State}
+ end
+ end,
+ ?forp(outer_put),
+ R.
% we want to pre-arrange a rendevous so as to not block the storage server
% blocking whomever is local is perfectly ok
stream(Name, Key, Context, Value) ->
+ Config = configuration:get_config(),
Ref = make_ref(),
Pid = gen_server:call(Name, {streaming_put, Ref}),
stream:send(Pid, Ref, {{Key, Context}, lib_misc:listify(Value)}),
@@ -340,7 +355,7 @@ internal_put(Key, Context, Values, Tree, Table, Module, State) ->
?forp(dmerkle_update),
T
end,
- TableFun = fun() ->
+ TableFun = fun() ->
?prof(put),
T = Module:put(sanitize_key(Key), vector_clock:truncate(Context), Values, Table),
?forp(put),
@@ -350,8 +365,8 @@ internal_put(Key, Context, Values, Tree, Table, Module, State) ->
case TableResult of
{ok, ModifiedTable} ->
stats_server:request(put, iolist_size(Values)),
- {reply, ok, State#storage{table=ModifiedTable,tree=UpdatedTree}};
- Failure -> {reply, Failure, State}
+ {ok, State#storage{table=ModifiedTable,tree=UpdatedTree}};
+ Failure -> {Failure, State}
end.
sanitize_key(Key) when is_atom(Key) -> atom_to_list(Key);
View
6 etest/mock.erl
@@ -12,7 +12,7 @@
-author('cliff@powerset.com').
%% API
--export([mock/1, proxy_call/2, proxy_call/3, expects/4, expects/5, verify/1, stub_proxy_call/3, stub_function/4, stop/1]).
+-export([mock/1, proxy_call/2, proxy_call/3, expects/4, expects/5, verify_and_stop/1, verify/1, stub_proxy_call/3, stub_function/4, stop/1]).
-include("common.hrl").
-include_lib("eunit/include/eunit.hrl").
@@ -79,6 +79,10 @@ expects(Module, Function, Args, Ret, Times) ->
stub(Module, Function, Args, Ret) ->
gen_server:call(mod_to_name(Module), {stub, Function, Args, Ret}).
+verify_and_stop(Module) ->
+ verify(Module),
+ stop(Module).
+
verify(Module) ->
?assertEqual(ok, gen_server:call(mod_to_name(Module), verify)).
View
97 etest/storage_server_test.erl
@@ -1,15 +1,19 @@
-include_lib("eunit.hrl").
store_conflicting_versions_test() ->
+ configuration:start_link(#config{}),
{ok, Pid} = storage_server:start_link(dets_storage, db_key(confl), store, 0, (2 bsl 31), 4096),
A = vector_clock:create(a),
B = vector_clock:create(b),
storage_server:put(Pid, "key", A, ["blah"]),
storage_server:put(Pid, "key", B, ["blah2"]),
?assertMatch({ok, {_, ["blah", "blah2"]}}, storage_server:get(Pid, "key")),
+ configuration:stop(),
+ timer:sleep(1),
storage_server:close(Pid).
couch_storage_test() ->
+ configuration:start_link(#config{}),
CouchFile = filename:join(priv_dir(), "couch"),
{ok, State} = couch_storage:open(CouchFile, storage_test),
{ok, St2} = couch_storage:put("key_one", context, <<"value one">>, State),
@@ -24,9 +28,12 @@ couch_storage_test() ->
{ok, false} = couch_storage:has_key("key_one", St5),
{ok, true} = couch_storage:has_key("key_two", St5),
{ok, St6} = couch_storage:delete("key_two", St5),
+ configuration:stop(),
+ timer:sleep(1),
couch_storage:close(St6).
dict_storage_test() ->
+ configuration:start_link(#config{}),
{ok, Pid} = storage_server:start_link(
dict_storage, db_key(dict), store, 0, (2 bsl 31), 4096),
?debugFmt("storage server at ~p", [Pid]),
@@ -39,9 +46,12 @@ dict_storage_test() ->
storage_server:delete(store, "key"),
{ok, false} = storage_server:has_key(store, "key"),
storage_server:close(store),
+ configuration:stop(),
+ timer:sleep(1),
receive _ -> true end.
mnesia_storage_test() ->
+ configuration:start_link(#config{}),
mnesia:stop(),
application:set_env(mnesia, dir, priv_dir()),
{ok, Pid} = storage_server:start_link(mnesia_storage, db_key(mnesia), store2, 0, (2 bsl 31), 4096),
@@ -63,16 +73,21 @@ mnesia_storage_test() ->
{ok,true} = storage_server:has_key(store2, "key_two"),
storage_server:delete(store2, "key_two"),
exit(Pid, shutdown),
+ configuration:stop(),
+ timer:sleep(1),
receive _ -> true end.
mnesia_large_value_test() ->
+ configuration:start_link(#config{}),
crypto:start(), %% filename generation uses crypto:sha
mnesia:stop(),
application:set_env(mnesia, dir, priv_dir()),
{ok, Pid} = storage_server:start_link(mnesia_storage, db_key(mnesia), store3, 0, (2 bsl 31), 4096),
Val = big_val(2048),
ok = storage_server:put(store3, "key_one", [], Val),
- {ok, {_Context, [Val]}} = storage_server:get(store3, "key_one").
+ {ok, {_Context, [Val]}} = storage_server:get(store3, "key_one"),
+ configuration:stop(),
+ timer:sleep(1).
% concurrent_update_test() ->
% process_flag(trap_exit, true),
@@ -93,6 +108,7 @@ mnesia_large_value_test() ->
% receive {'EXIT', P2, _} -> ok end.
rebuild_merkle_trees_test() ->
+ configuration:start_link(#config{}),
{ok, _} = mock:mock(dmerkle),
{ok, _} = mock:mock(dets_storage),
mock:expects(dmerkle, open, fun(_) -> true end, {error, "Poop, fart. Balls."}),
@@ -108,9 +124,12 @@ rebuild_merkle_trees_test() ->
mock:verify(dets_storage),
mock:stop(dmerkle),
mock:stop(dets_storage),
+ configuration:stop(),
+ timer:sleep(1),
storage_server:close(Pid).
streaming_put_test() ->
+ configuration:start_link(#config{}),
{ok, _} = mock:mock(dmerkle),
{ok, _} = mock:mock(dets_storage),
mock:expects(dmerkle, open, fun(_) -> true end, {ok, pid}),
@@ -128,8 +147,84 @@ streaming_put_test() ->
mock:verify(dets_storage),
mock:stop(dmerkle),
mock:stop(dets_storage),
+ configuration:stop(),
+ timer:sleep(1),
storage_server:close(Pid).
+buffered_test_loop(Called) ->
+ receive
+ put -> buffered_test_loop(true);
+ {called, Pid} ->
+ Pid ! {called, Called},
+ buffered_test_loop(Called)
+ end.
+
+interrogate_test_loop(Pid) ->
+ Pid ! {called, self()},
+ receive
+ {called, Called} -> Called
+ end.
+
+buffered_small_write_test() ->
+ configuration:start_link(#config{buffered_writes=true}),
+ {ok, _} = mock:mock(dmerkle),
+ {ok, _} = mock:mock(dets_storage),
+ mock:expects(dmerkle, open, fun(_) -> true end, {ok, pid}),
+ mock:expects(dets_storage, open, fun(_) -> true end, {ok, table}),
+ Bits = 10 * 8,
+ Bin = <<0:Bits>>,
+ % race conditions ahoy cap'n
+ Pid = spawn(fun() -> buffered_test_loop(false) end),
+ mock:expects(dets_storage, put, fun({_, _, [Val], table}) -> Val == Bin end, fun(_, _) ->
+ timer:sleep(200), %we sleep to simulate a long write so we can test that shit returns b4 write is complete
+ ?debugMsg("processing put"),
+ Pid ! put, %hence a buffered write. hopefully this won't cause the cluster to explode
+ {ok, table}
+ end),
+ mock:expects(dets_storage, get, fun({Key, Table}) -> Key == "key" end, {ok, not_found}),
+ mock:expects(dmerkle, update, fun(_) -> true end, fun(_, _) -> self() end),
+ {ok, Store} = storage_server:start_link(dets_storage, db_key(buff_test), store7, 0, (2 bsl 31), 4096),
+ int_put(Store, "key", ctx, Bin, 1000),
+ ?assertEqual(false, interrogate_test_loop(Pid)),
+ timer:sleep(100), %this should work, yes? icky.
+ mock:verify_and_stop(dmerkle),
+ mock:verify_and_stop(dets_storage),
+ ?assertEqual(true, interrogate_test_loop(Pid)),
+ exit(Pid, shutdown),
+ configuration:stop(),
+ timer:sleep(1),
+ storage_server:close(Store).
+
+buffered_stream_write_test() ->
+ configuration:start_link(#config{buffered_writes=true}),
+ {ok, _} = mock:mock(dmerkle),
+ {ok, _} = mock:mock(dets_storage),
+ mock:expects(dmerkle, open, fun(_) -> true end, {ok, pid}),
+ mock:expects(dets_storage, open, fun(_) -> true end, {ok, table}),
+ Bits = 10000 * 8,
+ Bin = <<0:Bits>>,
+ % race conditions ahoy cap'n
+ Pid = spawn(fun() -> buffered_test_loop(false) end),
+ mock:expects(dets_storage, put, fun({_, _, [Val], table}) -> Val == Bin end, fun(_, _) ->
+ timer:sleep(200), %we sleep to simulate a long write so we can test that shit returns b4 write is complete
+ ?debugMsg("processing put"),
+ Pid ! put, %hence a buffered write. hopefully this won't cause the cluster to explode
+ {ok, table}
+ end),
+ mock:expects(dets_storage, get, fun({Key, Table}) -> Key == "key" end, {ok, not_found}),
+ mock:expects(dmerkle, update, fun(_) -> true end, fun(_, _) -> self() end),
+ {ok, Store} = storage_server:start_link(dets_storage, db_key(buff_test), store7, 0, (2 bsl 31), 4096),
+ stream(Store, "key", ctx, Bin),
+ ?debugHere,
+ ?assertEqual(false, interrogate_test_loop(Pid)),
+ timer:sleep(100), %this should work, yes? icky.
+ mock:verify_and_stop(dmerkle),
+ mock:verify_and_stop(dets_storage),
+ ?assertEqual(true, interrogate_test_loop(Pid)),
+ exit(Pid, shutdown),
+ configuration:stop(),
+ timer:sleep(1),
+ storage_server:close(Store).
%
% local_fs_storage_test() ->
% {ok, State} = fs_storage:open("/Users/cliff/data/storage_test", storage_test),
View
2 include/config.hrl
@@ -3,6 +3,6 @@
-define(CONFIG_HRL, true).
%we don't want to turn protocol buffers on by default, since the library is not included
%it should be very easy for new users to start up an instance
--record(config, {n=3, r=1, w=1, q=6, directory, web_port, text_port=11222, storage_mod=dets_storage, blocksize=4096, thrift_port=9200, pb_port=undefined}).
+-record(config, {n=3, r=1, w=1, q=6, directory, web_port, text_port=11222, storage_mod=dets_storage, blocksize=4096, thrift_port=9200, pb_port=undefined, buffered_writes=undefined}).
-endif.

0 comments on commit 949e404

Please sign in to comment.