diff --git a/.hgtags b/.hgtags new file mode 100644 index 0000000000..c3e7f9825f --- /dev/null +++ b/.hgtags @@ -0,0 +1,3 @@ +e716ebd150ff8698a89a1ae28bc868385a164497 riak_kv-0.13.0rc1 +04f5cfd0b8ca7c195e67658367afa5625c186218 riak_kv-0.13.0rc2 +a5e0a7b843b52fe846b8006543d1484b548b9a18 riak_kv-0.13.0rc3 diff --git a/ebin/riak_kv.app b/ebin/riak_kv.app index a3435360ab..635a20118f 100644 --- a/ebin/riak_kv.app +++ b/ebin/riak_kv.app @@ -3,8 +3,9 @@ {application, riak_kv, [ {description, "Riak Key/Value Store"}, - {vsn, "0.12.0"}, + {vsn, "0.13.0rc3"}, {modules, [ + lk, raw_link_walker, riak, riak_client, @@ -23,8 +24,12 @@ riak_kv_js_manager, riak_kv_js_sup, riak_kv_js_vm, + riak_kv_keylister, + riak_kv_keylister_master, + riak_kv_keylister_sup, riak_kv_keys_fsm, riak_kv_legacy_vnode, + riak_kv_lru, riak_kv_map_executor, riak_kv_map_localphase, riak_kv_map_phase, diff --git a/include/riak_kv_vnode.hrl b/include/riak_kv_vnode.hrl index f5491b69ae..f08e71fd30 100644 --- a/include/riak_kv_vnode.hrl +++ b/include/riak_kv_vnode.hrl @@ -15,6 +15,11 @@ bucket :: binary(), req_id :: non_neg_integer()}). +-record(riak_kv_listkeys_req_v2, { + bucket :: binary(), + req_id :: non_neg_integer(), + caller :: pid()}). + -record(riak_kv_delete_req_v1, { bkey :: {binary(), binary()}, req_id :: non_neg_integer()}). @@ -22,7 +27,8 @@ -record(riak_kv_map_req_v1, { bkey :: {binary(), binary()}, qterm :: term(), - keydata :: term()}). + keydata :: term(), + from :: term()}). -record(riak_kv_vclock_req_v1, { bkeys = [] :: [{binary(), binary()}] @@ -30,10 +36,7 @@ -define(KV_PUT_REQ, #riak_kv_put_req_v1). -define(KV_GET_REQ, #riak_kv_get_req_v1). --define(KV_LISTKEYS_REQ, #riak_kv_listkeys_req_v1). +-define(KV_LISTKEYS_REQ, #riak_kv_listkeys_req_v2). -define(KV_DELETE_REQ, #riak_kv_delete_req_v1). -define(KV_MAP_REQ, #riak_kv_map_req_v1). -define(KV_VCLOCK_REQ, #riak_kv_vclock_req_v1). 
- - - diff --git a/rebar.config b/rebar.config index 0552d631bb..9d3fbc9811 100644 --- a/rebar.config +++ b/rebar.config @@ -3,16 +3,16 @@ {erl_opts, [debug_info, fail_on_warning]}. {deps, [ - {riak_core, "0.12.0", {hg, "http://bitbucket.org/basho/riak_core", - "tip"}}, - {riakc, "0.2.0", {hg, "http://bitbucket.org/basho/riak-erlang-client", - "tip"}}, - {luke, "\.*", {hg, "http://bitbucket.org/basho/luke", - "tip"}}, - {erlang_js, "0\.4", {hg, "http://bitbucket.org/basho/erlang_js", - "erlang_js-0.4"}}, - {bitcask, "1.0.2", {hg, "http://bitbucket.org/basho/bitcask", - "bitcask-1.0.2"}}, - {ebloom, "1.0.1", {hg, "http://bitbucket.org/basho/ebloom", - "ebloom-1.0.1"}} + {riak_core, "0.13.0rc3", {hg, "http://bitbucket.org/basho/riak_core", + "riak_core-0.13.0rc3"}}, + {riakc, "1.0.0", {hg, "http://bitbucket.org/basho/riak-erlang-client", + "riakc-1.0.0"}}, + {luke, "0.2.1", {hg, "http://bitbucket.org/basho/luke", + "luke-0.2.1"}}, + {erlang_js, "0.4.1", {hg, "http://bitbucket.org/basho/erlang_js", + "90"}}, + {bitcask, "1.1.1", {hg, "http://bitbucket.org/basho/bitcask", + "bitcask-1.1.1"}}, + {ebloom, "1.0.2", {hg, "http://bitbucket.org/basho/ebloom", + "ebloom-1.0.2"}} ]}. diff --git a/src/lk.erl b/src/lk.erl new file mode 100644 index 0000000000..000b63a515 --- /dev/null +++ b/src/lk.erl @@ -0,0 +1,89 @@ +%% ------------------------------------------------------------------- +%% +%% lk: Helper functions for list keys +%% +%% Copyright (c) 2007-2010 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. 
You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- +-module(lk). + +-export([fsm/1, pn/1]). + +fsm(Bucket) -> + ReqId = random:uniform(10000), + Start = erlang:now(), + riak_kv_keys_fsm:start(ReqId, Bucket, 60000, plain, 0.0001, self()), + {ok, Count} = gather_fsm_results(ReqId, 0), + io:format("~n"), + End = erlang:now(), + Ms = erlang:round(timer:now_diff(End, Start) / 1000), + io:format("Found ~p keys in ~pms.~n", [Count, Ms]). + +pn(Bucket) -> + ReqId = random:uniform(10000), + {ok, Ring} = riak_core_ring_manager:get_my_ring(), + {ok, Bloom} = ebloom:new(10000000,0.0001,ReqId), + BucketProps = riak_core_bucket:get_bucket(Bucket, Ring), + N = proplists:get_value(n_val,BucketProps), + PLS = lists:flatten(riak_core_ring:all_preflists(Ring,N)), + Nodes = [node()|nodes()], + Start = erlang:now(), + start_listers(Nodes, ReqId, Bucket, PLS), + {ok, Count} = gather_pn_results(ReqId, Bloom, length(Nodes), 0), + End = erlang:now(), + Ms = erlang:round(timer:now_diff(End, Start) / 1000), + io:format("Found ~p keys in ~pms.~n", [Count, Ms]). + +gather_fsm_results(ReqId, Count) -> + receive + {ReqId, {keys, Keys}} -> + io:format("."), + gather_fsm_results(ReqId, Count + length(Keys)); + {ReqId, done} -> + {ok, Count} + after 120000 -> + {error, timeout} + end. + +start_listers([], _ReqId, _Bucket, _VNodes) -> + ok; +start_listers([H|T], ReqId, Bucket, VNodes) -> + riak_kv_keylister_master:start_keylist(H, ReqId, self(), Bucket, VNodes, 60000), + start_listers(T, ReqId, Bucket, VNodes). 
+ +gather_pn_results(_, BF, 0, Count) -> + ebloom:clear(BF), + {ok, Count}; +gather_pn_results(ReqId, BF, NodeCount, Count) -> + %%io:format("NodeCount: ~p, key count: ~p~n", [NodeCount, Count]), + receive + {ReqId, {kl, Keys0}} -> + F = fun(Key, Acc) -> + case ebloom:contains(BF, Key) of + false -> + ebloom:insert(BF, Key), + [Key|Acc]; + true -> + Acc + end end, + Keys = lists:foldl(F, [], Keys0), + gather_pn_results(ReqId, BF, NodeCount, Count + length(Keys)); + {ReqId, done} -> + gather_pn_results(ReqId, BF, NodeCount - 1, Count) + after 10000 -> + {error, timeout} + end. diff --git a/src/raw_link_walker.erl b/src/raw_link_walker.erl index ec7ba38193..7b83a53743 100644 --- a/src/raw_link_walker.erl +++ b/src/raw_link_walker.erl @@ -1,19 +1,24 @@ +%% ------------------------------------------------------------------- +%% +%% raw_link_walker: Backwards compatibility module for link traversal +%% +%% Copyright (c) 2007-2010 Basho Technologies, Inc. All Rights Reserved. +%% %% This file is provided to you under the Apache License, %% Version 2.0 (the "License"); you may not use this file %% except in compliance with the License. You may obtain %% a copy of the License at - +%% %% http://www.apache.org/licenses/LICENSE-2.0 - +%% %% Unless required by applicable law or agreed to in writing, %% software distributed under the License is distributed on an %% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY %% KIND, either express or implied. See the License for the %% specific language governing permissions and limitations %% under the License. - -%% Copyright (c) 2007-2010 Basho Technologies, Inc. All Rights Reserved. - +%% +%% ------------------------------------------------------------------- -module(raw_link_walker). -export([mapreduce_linkfun/3]). 
diff --git a/src/riak_client.erl b/src/riak_client.erl index a564964d3c..d7708498e6 100644 --- a/src/riak_client.erl +++ b/src/riak_client.erl @@ -29,6 +29,7 @@ -export([mapred_stream/2,mapred_stream/3,mapred_stream/4]). -export([mapred_bucket/2,mapred_bucket/3,mapred_bucket/4]). -export([mapred_bucket_stream/3,mapred_bucket_stream/4,mapred_bucket_stream/6]). +-export([mapred_dynamic_inputs_stream/3]). -export([get/2, get/3,get/4]). -export([put/1, put/2,put/3,put/4,put/5]). -export([delete/2,delete/3,delete/4]). @@ -163,9 +164,21 @@ mapred_bucket(Bucket, Query, ResultTransformer, Timeout, ErrorTolerance) -> ResultTransformer, Timeout, ErrorTolerance), luke_flow:collect_output(MR_ReqId, Timeout). -%% -%% +-define(PRINT(Var), io:format("DEBUG: ~p:~p - ~p~n~n ~p~n~n", [?MODULE, ?LINE, ??Var, Var])). + +%% An InputDef defines a Module and Function to call to generate +%% inputs for a map/reduce job. Should return {ok, +%% LukeReqID}. Ideally, we'd combine both the other input types (BKeys +%% and Bucket) into this approach, but postponing until after a code +%% review of Map/Reduce. +mapred_dynamic_inputs_stream(FSMPid, InputDef, Timeout) -> + case InputDef of + {modfun, Mod, Fun, Options} -> + Mod:Fun(FSMPid, Options, Timeout); + _ -> + throw({invalid_inputdef, InputDef}) + end. %% @spec get(riak_object:bucket(), riak_object:key()) -> %% {ok, riak_object:riak_object()} | @@ -431,7 +444,7 @@ get_bucket(BucketName) -> %% @spec reload_all(Module :: atom()) -> term() %% @doc Force all Riak nodes to reload Module. %% This is used when loading new modules for map/reduce functionality. -reload_all(Module) -> rpc:call(Node,riak_kv_util,reload_all,[Module]). +reload_all(Module) -> rpc:call(Node,riak_core_util,reload_all,[Module]). 
%% @spec remove_from_cluster(ExitingNode :: atom()) -> term() %% @doc Cause all partitions owned by ExitingNode to be taken over diff --git a/src/riak_kv_app.erl b/src/riak_kv_app.erl index 5014629a33..f0ad452d95 100644 --- a/src/riak_kv_app.erl +++ b/src/riak_kv_app.erl @@ -74,7 +74,7 @@ start(_Type, _StartArgs) -> case riak_kv_sup:start_link() of {ok, Pid} -> %% Go ahead and mark the riak_kv service as up in the node watcher. - %% The riak_kv_ring_handler blocks until all vnodes have been started + %% The riak_core_ring_handler blocks until all vnodes have been started %% synchronously. riak_core:register_vnode_module(riak_kv_vnode), riak_core_node_watcher:service_up(riak_kv, self()), diff --git a/src/riak_kv_backend.erl b/src/riak_kv_backend.erl index df32771dda..e9c6b3d7a1 100644 --- a/src/riak_kv_backend.erl +++ b/src/riak_kv_backend.erl @@ -1,3 +1,25 @@ +%% ------------------------------------------------------------------- +%% +%% riak_kv_backend: Riak backend behaviour +%% +%% Copyright (c) 2007-2010 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + -module(riak_kv_backend). -export([behaviour_info/1]). -export([callback_after/3]). 
diff --git a/src/riak_kv_bitcask_backend.erl b/src/riak_kv_bitcask_backend.erl index 448d187a60..d8ed1343ea 100644 --- a/src/riak_kv_bitcask_backend.erl +++ b/src/riak_kv_bitcask_backend.erl @@ -33,15 +33,17 @@ list/1, list_bucket/2, fold/3, + fold_keys/3, drop/1, is_empty/1, callback/3]). - -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). -endif. +-include_lib("bitcask/include/bitcask.hrl"). + -define(MERGE_CHECK_INTERVAL, timer:minutes(3)). start(Partition, _Config) -> @@ -115,15 +117,38 @@ list({Ref, _}) -> Other end. -list_bucket(State, {filter, Bucket, Fun}) -> - [K || {B, K} <- ?MODULE:list(State), - B =:= Bucket, - Fun(K)]; -list_bucket(State, '_') -> - [B || {B, _K} <- ?MODULE:list(State)]; -list_bucket(State, Bucket) -> - [K || {B, K} <- ?MODULE:list(State), B =:= Bucket]. - +list_bucket({Ref, _}, {filter, Bucket, Fun}) -> + bitcask:fold_keys(Ref, + fun(#bitcask_entry{key=BK},Acc) -> + {B,K} = binary_to_term(BK), + case B of + Bucket -> + case Fun(K) of + true -> [K|Acc]; + false -> Acc + end; + _ -> + Acc + end + end, []); +list_bucket({Ref, _}, '_') -> + bitcask:fold_keys(Ref, + fun(#bitcask_entry{key=BK},Acc) -> + {B,_K} = binary_to_term(BK), + case lists:member(B,Acc) of + true -> Acc; + false -> [B|Acc] + end + end, []); +list_bucket({Ref, _}, Bucket) -> + bitcask:fold_keys(Ref, + fun(#bitcask_entry{key=BK},Acc) -> + {B,K} = binary_to_term(BK), + case B of + Bucket -> [K|Acc]; + _ -> Acc + end + end, []). fold({Ref, _}, Fun0, Acc0) -> %% When folding across the bitcask, the bucket/key tuple must @@ -135,6 +160,11 @@ fold({Ref, _}, Fun0, Acc0) -> end, Acc0). +fold_keys({Ref, _}, Fun, Acc) -> + F = fun(#bitcask_entry{key=K}, Acc1) -> + Fun(binary_to_term(K), Acc1) end, + bitcask:fold_keys(Ref, F, Acc). + drop({Ref, BitcaskRoot}) -> %% todo: once bitcask has a more friendly drop function %% of its own, use that instead. 
@@ -146,12 +176,12 @@ drop({Ref, BitcaskRoot}) -> is_empty({Ref, _}) -> %% Determining if a bitcask is empty requires us to find at least - %% one value that is NOT a tombstone. Accomplish this by doing a fold - %% that forcibly bails on the very first k/v encountered. - F = fun(_K, _V, _Acc0) -> + %% one value that is NOT a tombstone. Accomplish this by doing a fold_keys + %% that forcibly bails on the very first key encountered. + F = fun(_K, _Acc0) -> throw(found_one_value) end, - case catch(bitcask:fold(Ref, F, undefined)) of + case catch(bitcask:fold_keys(Ref, F, undefined)) of found_one_value -> false; _ -> @@ -200,7 +230,6 @@ schedule_sync(Ref, SyncIntervalMs) when is_reference(Ref) -> schedule_merge(Ref) when is_reference(Ref) -> riak_kv_backend:callback_after(?MERGE_CHECK_INTERVAL, Ref, merge_check). - %% =================================================================== %% EUnit tests %% =================================================================== diff --git a/src/riak_kv_console.erl b/src/riak_kv_console.erl index 1e9feafcde..1ac4cde9da 100644 --- a/src/riak_kv_console.erl +++ b/src/riak_kv_console.erl @@ -24,7 +24,7 @@ -module(riak_kv_console). --export([join/1, leave/1, status/1, reip/1]). +-export([join/1, leave/1, status/1, reip/1, ringready/1, transfers/1]). join([NodeStr]) -> case riak:join(NodeStr) of @@ -73,6 +73,62 @@ reip([OldNode, NewNode]) -> io:format("New ring file written to ~p~n", [element(2, riak_core_ring_manager:find_latest_ringfile())]). +%% Check if all nodes in the cluster agree on the partition assignment +ringready([]) -> + case get_rings() of + {[], Rings} -> + {N1,R1}=hd(Rings), + case rings_match(hash_ring(R1), tl(Rings)) of + true -> + Nodes = [N || {N,_} <- Rings], + io:format("TRUE All nodes agree on the ring ~p\n", [Nodes]); + {false, N2} -> + io:format("FALSE Node ~p and ~p list different partition owners\n", [N1, N2]), + false % make nodetool exit 1 + end; + {Down, _Rings} -> + io:format("FALSE ~p down. 
All nodes need to be up to check.\n", [Down]), + false % make nodetool exit 1 + end. + +%% Provide a list of nodes with pending partition transfers (i.e. any secondary vnodes) +%% and list any owned vnodes that are *not* running +transfers([]) -> + {Down, Rings} = get_rings(), + case Down of + [] -> + ok; + _ -> + io:format("Nodes ~p are currently down.\n", [Down]) + end, + + %% Work out which vnodes are running and which partitions they claim + F = fun({N,R}, Acc) -> + {_Pri, Sec, Stopped} = partitions(N, R), + case Sec of + [] -> + []; + _ -> + io:format("~p waiting to handoff ~p partitions\n", [N, length(Sec)]), + [{waiting_to_handoff, N, length(Sec)}] + end ++ + case Stopped of + [] -> + []; + _ -> + io:format("~p does not have ~p primary partitions running\n", + [N, length(Stopped)]), + [{stopped, N}] + end ++ + Acc + end, + case lists:foldl(F, [], Rings) of + [] -> + io:format("No transfers active\n"); + _ -> + ok + end. + format_stats([], Acc) -> lists:reverse(Acc); @@ -80,4 +136,59 @@ format_stats([{vnode_gets, V}|T], Acc) -> format_stats(T, [io_lib:format("vnode gets : ~p~n", [V])|Acc]); format_stats([{Stat, V}|T], Acc) -> format_stats(T, [io_lib:format("~p : ~p~n", [Stat, V])|Acc]). + +%% Retrieve the rings for all other nodes by RPC +get_rings() -> + {ok, MyRing} = riak_core_ring_manager:get_my_ring(), + Nodes = riak_core_ring:all_members(MyRing), + {RawRings, Down} = rpc:multicall(Nodes, riak_core_ring_manager, get_my_ring, [], 30000), + Rings = orddict:from_list([{riak_core_ring:owner_node(R), R} || {ok, R} <- RawRings]), + {lists:sort(Down), Rings}. + +%% Produce a hash of the 'chash' portion of the ring +hash_ring(R) -> + erlang:phash2(riak_core_ring:all_owners(R)). + +%% Check if all rings match given a hash and a list of [{N,P}] to check +rings_match(_, []) -> + true; +rings_match(R1hash, [{N2, R2} | Rest]) -> + case hash_ring(R2) of + R1hash -> + rings_match(R1hash, Rest); + _ -> + {false, N2} + end. 
+ +%% Get a list of active partition numbers - regardless of vnode type +active_partitions(Node) -> + lists:foldl(fun({_,P}, Ps) -> + ordsets:add_element(P, Ps) + end, [], running_vnodes(Node)). + + +%% Get a list of running vnodes for a node +running_vnodes(Node) -> + Pids = vnode_pids(Node), + [rpc:call(Node, riak_core_vnode, get_mod_index, [Pid], 30000) || Pid <- Pids]. + +%% Get a list of vnode pids for a node +vnode_pids(Node) -> + [Pid || {_,Pid,_,_} <- supervisor:which_children({riak_core_vnode_sup, Node})]. + +%% Return a list of active primary partitions, active secondary partitions (to be handed off) +%% and stopped partitions that should be started +partitions(Node, Ring) -> + Owners = riak_core_ring:all_owners(Ring), + Owned = ordsets:from_list(owned_partitions(Owners, Node)), + Active = ordsets:from_list(active_partitions(Node)), + Stopped = ordsets:subtract(Owned, Active), + Secondary = ordsets:subtract(Active, Owned), + Primary = ordsets:subtract(Active, Secondary), + {Primary, Secondary, Stopped}. + +%% Return the list of partitions owned by a node +owned_partitions(Owners, Node) -> + [P || {P, Owner} <- Owners, Owner =:= Node]. + diff --git a/src/riak_kv_js_manager.erl b/src/riak_kv_js_manager.erl index b356dd3865..35f0c8a81a 100644 --- a/src/riak_kv_js_manager.erl +++ b/src/riak_kv_js_manager.erl @@ -21,83 +21,117 @@ %% ------------------------------------------------------------------- %% @doc dispatch work to JavaScript VMs - -module(riak_kv_js_manager). -behaviour(gen_server). %% API --export([start_link/1, dispatch/1, blocking_dispatch/1, add_to_manager/0, reload/1, reload/0]). +-export([start_link/1, + add_vm/0, + reload/0, + reload/1, + mark_idle/0, + reserve_vm/0, + dispatch/2, + blocking_dispatch/2, + pool_size/0]). %% gen_server callbacks --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). - --record(state, {tid}). 
- -dispatch(JSCall) -> - case select_random() of - no_vms -> - {error, no_vms}; - Target -> - JobId = {Target, make_ref()}, - riak_kv_js_vm:dispatch(Target, self(), JobId, JSCall), - {ok, JobId} - end. +-export([init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3]). -blocking_dispatch(JSCall) -> - case select_random() of - no_vms -> - {error, no_vms}; - Target -> - JobId = {Target, make_ref()}, - riak_kv_js_vm:blocking_dispatch(Target, JobId, JSCall) - end. +-define(SERVER, ?MODULE). + +-record('DOWN', {ref, type, pid, info}). +-record(vm_state, {pid, needs_reload=false}). +-record(state, {master, idle, reserve}). + +start_link(ChildCount) -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [ChildCount], []). -%% Hack to allow riak-admin to trigger a reload reload([]) -> reload(). - reload() -> - gen_server:call(?MODULE, reload_all_vm). + gen_server:call(?SERVER, reload_vms). -add_to_manager() -> - gen_server:cast(?MODULE, {add_child, self()}). +add_vm() -> + gen_server:cast(?SERVER, {add_vm, self()}). -start_link(ChildCount) -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [ChildCount], []). +mark_idle() -> + gen_server:call(?SERVER, {mark_idle, self()}). + +dispatch(JSCall, Tries) -> + dispatch(JSCall, Tries, Tries). + +blocking_dispatch(JSCall, Tries) -> + blocking_dispatch(JSCall, Tries, Tries). + +reserve_vm() -> + gen_server:call(?SERVER, reserve_vm). + +pool_size() -> + gen_server:call(?SERVER, pool_size). init([ChildCount]) -> - Tid = ets:new(?MODULE, [named_table]), - start_children(ChildCount), - {ok, #state{tid=Tid}}. - -handle_call(reload_all_vm, _From, #state{tid=Tid}=State) -> - ets:safe_fixtable(Tid, true), - reload_children(ets:first(Tid), Tid), - ets:safe_fixtable(Tid, false), + Master = ets:new(jsvm_master, [private, {keypos, 2}]), + Idle = ets:new(jsvm_idle, [private]), + start_vms(ChildCount), + {ok, #state{master=Master, idle=Idle}}. 
+ +handle_call({mark_idle, VM}, _From, #state{master=Master, + idle=Idle}=State) -> + case needs_reload(Master, VM) of + true -> + riak_kv_js_vm:reload(VM), + clear_reload(Master, VM); + false -> + ok + end, + ets:insert(Idle, {VM}), + {reply, ok, State}; + +handle_call(reload_vms, _From, #state{master=Master, idle=Idle}=State) -> + reload_idle_vms(Idle), + mark_pending_reloads(Master, Idle), riak_kv_vnode:purge_mapcaches(), {reply, ok, State}; +handle_call(reserve_vm, _From, #state{idle=Idle}=State) -> + Reply = case ets:first(Idle) of + '$end_of_table' -> + {error, no_vms}; + VM -> + ets:delete(Idle, VM), + {ok, VM} + end, + {reply, Reply, State}; + +handle_call(pool_size, _From, #state{idle=Idle}=State) -> + {reply, ets:info(Idle, size), State}; + handle_call(_Request, _From, State) -> {reply, ignore, State}. -handle_cast({add_child, ChildPid}, #state{tid=Tid}=State) -> - erlang:monitor(process, ChildPid), - ets:insert_new(Tid, {ChildPid}), +handle_cast({add_vm, VMPid}, #state{master=Master, idle=Idle}=State) -> + erlang:monitor(process, VMPid), + VMState = #vm_state{pid=VMPid}, + ets:insert(Master, VMState), + ets:insert(Idle, {VMPid}), {noreply, State}; + handle_cast(_Msg, State) -> {noreply, State}. -handle_info({'DOWN', _MRef, _Type, Pid, _Info}, #state{tid=Tid}=State) -> - case ets:lookup(Tid, Pid) of - [] -> - {noreply, State}; - [{Pid}] -> - ets:delete(?MODULE, Pid), - riak_kv_js_sup:start_js(self()), - {noreply, State} - end; +handle_info(#'DOWN'{pid=Pid}, #state{master=Master, idle=Idle}=State) -> + ets:delete(Master, Pid), + ets:delete(Idle, Pid), + riak_kv_js_sup:start_js(self()), + {noreply, State}; + handle_info(_Info, State) -> {noreply, State}. @@ -108,39 +142,80 @@ code_change(_OldVsn, State, _Extra) -> {ok, State}. %% Internal functions -start_children(0) -> - ok; -start_children(Count) -> - riak_kv_js_sup:start_js(self()), - start_children(Count - 1). 
+needs_reload(Master, VMPid) -> + [VMState] = ets:lookup(Master, VMPid), + VMState#vm_state.needs_reload. + +clear_reload(Master, VMPid) -> + [VMState] = ets:lookup(Master, VMPid), + VMState1 = VMState#vm_state{needs_reload=false}, + ets:insert(Master, VMState1). -select_random() -> - case ets:match(?MODULE, {'$1'}) of +is_vm_idle(Idle, VMPid) -> + case ets:lookup(Idle, {VMPid}) of [] -> - no_vms; - Members -> - {T1, T2, T3} = erlang:now(), - random:seed(T1, T2, T3), - Pos = pick_pos(erlang:get(?MODULE), length(Members)), - [Member] = lists:nth(Pos, Members), - Member + false; + _ -> + true end. -pick_pos(undefined, Size) -> - Pos = random:uniform(Size), - erlang:put(?MODULE, Pos), - Pos; -pick_pos(OldPos, Size) -> - case random:uniform(Size) of - OldPos -> - pick_pos(OldPos, Size); - Pos -> - erlang:put(?MODULE, Pos), - Pos - end. +start_vms(0) -> + ok; +start_vms(Count) -> + riak_kv_js_sup:start_js(self()), + start_vms(Count - 1). + +reload_idle_vms(Tid) -> + reload_idle_vms(ets:first(Tid), Tid). -reload_children('$end_of_table', _Tid) -> +reload_idle_vms('$end_of_table', _Tid) -> ok; -reload_children(Current, Tid) -> +reload_idle_vms(Current, Tid) -> riak_kv_js_vm:reload(Current), - reload_children(ets:next(Tid, Current), Tid). + reload_idle_vms(ets:next(Tid), Tid). + +mark_pending_reloads(Master, Idle) -> + mark_pending_reloads(ets:first(Master), Master, Idle). + +mark_pending_reloads('$end_of_table', _Master, _Idle) -> + ok; +mark_pending_reloads(VMState, Master, Idle) -> + case is_vm_idle(Idle, VMState#vm_state.pid) of + true -> + ok; + false -> + VMState1 = VMState#vm_state{needs_reload=true}, + ets:insert(Master, VMState1) + end, + mark_pending_reloads(ets:next(Master), Master, Idle). 
+ +dispatch(_JSCall, _MaxCount, 0) -> + error_logger:info_msg("JS call failed: All VMs are busy.~n"), + {error, no_vms}; +dispatch(JSCall, MaxCount, Count) -> + case reserve_vm() of + {ok, VM} -> + JobId = {VM, make_ref()}, + riak_kv_js_vm:dispatch(VM, self(), JobId, JSCall), + {ok, JobId}; + {error, no_vms} -> + ScalingFactor = (1 + (MaxCount - Count)) * + (0.1 + random:uniform(100) * 0.001), + timer:sleep(erlang:round(500 * ScalingFactor)), + dispatch(JSCall, MaxCount, Count - 1) + end. + +blocking_dispatch(_JSCall, _MaxCount, 0) -> + error_logger:info_msg("JS call failed: All VMs are busy.~n"), + {error, no_vms}; +blocking_dispatch(JSCall, MaxCount, Count) -> + case reserve_vm() of + {ok, VM} -> + JobId = {VM, make_ref()}, + riak_kv_js_vm:blocking_dispatch(VM, JobId, JSCall); + {error, no_vms} -> + ScalingFactor = (1 + (MaxCount - Count)) * + (0.1 + random:uniform(100) * 0.001), + timer:sleep(erlang:round(500 * ScalingFactor)), + blocking_dispatch(JSCall, MaxCount, Count - 1) + end. diff --git a/src/riak_kv_js_vm.erl b/src/riak_kv_js_vm.erl index 5e36561c89..8f751819fb 100644 --- a/src/riak_kv_js_vm.erl +++ b/src/riak_kv_js_vm.erl @@ -58,7 +58,7 @@ init([Manager]) -> {ok, Ctx} -> error_logger:info_msg("Spidermonkey VM (thread stack: ~pMB, max heap: ~pMB) host starting (~p)~n", [StackSize, HeapSize, self()]), - riak_kv_js_manager:add_to_manager(), + riak_kv_js_manager:add_vm(), erlang:monitor(process, Manager), {ok, #state{manager=Manager, ctx=Ctx}}; Error -> @@ -66,26 +66,34 @@ init([Manager]) -> end. 
%% Reduce phase with anonymous function -handle_call({dispatch, _JobId, {{jsanon, JS}, Reduced, Arg}}, _From, #state{ctx=Ctx}=State) -> - {Reply, UpdatedState} = case define_anon_js(JS, State) of - {ok, FunName, NewState} -> - case invoke_js(Ctx, FunName, [Reduced, Arg]) of - {ok, R} -> - {{ok, R}, NewState}; - Error -> - {Error, State} - end; - {Error, undefined, NewState} -> - {Error, NewState} - end, +handle_call({dispatch, _JobId, {{jsanon, JS}, Reduced, Arg}}, _From, State) -> + {Reply, UpdatedState} = define_invoke_anon_js(JS, [Reduced, Arg], State), + riak_kv_js_manager:mark_idle(), {reply, Reply, UpdatedState}; %% Reduce phase with named function handle_call({dispatch, _JobId, {{jsfun, JS}, Reduced, Arg}}, _From, #state{ctx=Ctx}=State) -> - {reply, invoke_js(Ctx, JS, [Reduced, Arg]), State}; + Reply = invoke_js(Ctx, JS, [Reduced, Arg]), + riak_kv_js_manager:mark_idle(), + {reply, Reply, State}; +%% General dispatch function for anonymous function with variable number of arguments +handle_call({dispatch, _JobId, {{jsanon, Source}, Args}}, _From, + State) when is_list(Args) -> + {Reply, UpdatedState} = define_invoke_anon_js(Source, Args, State), + riak_kv_js_manager:mark_idle(), + {reply, Reply, UpdatedState}; +%% General dispatch function for named function with variable number of arguments +handle_call({dispatch, _JobId, {{jsfun, JS}, Args}}, _From, + #state{ctx=Ctx}=State) when is_list(Args) -> + Reply = invoke_js(Ctx, JS, Args), + riak_kv_js_manager:mark_idle(), + {reply, Reply, State}; %% Pre-commit hook with named function handle_call({dispatch, _JobId, {{jsfun, JS}, Obj}}, _From, #state{ctx=Ctx}=State) -> - {reply, invoke_js(Ctx, JS, [riak_object:to_json(Obj)]), State}; -handle_call(_Request, _From, State) -> + Reply = invoke_js(Ctx, JS, [riak_object:to_json(Obj)]), + riak_kv_js_manager:mark_idle(), + {reply, Reply, State}; +handle_call(Request, _From, State) -> + io:format("Request: ~p~n", [Request]), {reply, ignore, State}. 
handle_cast(reload, #state{ctx=Ctx}=State) -> @@ -94,45 +102,38 @@ handle_cast(reload, #state{ctx=Ctx}=State) -> {noreply, State}; %% Map phase with anonymous function -handle_cast({dispatch, Requestor, _JobId, {Sender, {map, {jsanon, JS}, Arg, _Acc}, +handle_cast({dispatch, _Requestor, JobId, {Sender, {map, {jsanon, JS}, Arg, _Acc}, Value, - KeyData, _BKey}}, #state{ctx=Ctx}=State) -> - {Result, UpdatedState} = case define_anon_js(JS, State) of - {ok, FunName, NewState} -> - JsonValue = riak_object:to_json(Value), - JsonArg = jsonify_arg(Arg), - case invoke_js(Ctx, FunName, [JsonValue, KeyData, JsonArg]) of - {ok, R} -> - {{ok, R}, NewState}; - Error -> - {Error, State} - end; - {Error, undefined, NewState} -> - {Error, NewState} - end, - case Result of - {ok, ReturnValue} -> - riak_core_vnode:reply(Sender, {mapexec_reply, ReturnValue, Requestor}), - {noreply, UpdatedState}; - ErrorResult -> - riak_core_vnode:reply(Sender, {mapexec_error_noretry, Requestor, ErrorResult}), - {noreply, State} - end; + KeyData, _BKey}}, State) -> + JsonValue = riak_object:to_json(Value), + JsonArg = jsonify_arg(Arg), + {Result, UpdatedState} = define_invoke_anon_js(JS, [JsonValue, KeyData, JsonArg], State), + FinalState = case Result of + {ok, ReturnValue} -> + riak_core_vnode:send_command(Sender, {mapexec_reply, JobId, ReturnValue}), + UpdatedState; + ErrorResult -> + riak_core_vnode:send_command(Sender, {mapexec_error_noretry, JobId, ErrorResult}), + State + end, + riak_kv_js_manager:mark_idle(), + {noreply, FinalState}; %% Map phase with named function -handle_cast({dispatch, Requestor, _JobId, {Sender, {map, {jsfun, JS}, Arg, _Acc}, +handle_cast({dispatch, _Requestor, JobId, {Sender, {map, {jsfun, JS}, Arg, _Acc}, Value, - KeyData, BKey}}, #state{ctx=Ctx}=State) -> + KeyData, _BKey}}, #state{ctx=Ctx}=State) -> JsonValue = riak_object:to_json(Value), JsonArg = jsonify_arg(Arg), case invoke_js(Ctx, JS, [JsonValue, KeyData, JsonArg]) of {ok, R} -> %% Requestor should be the 
dispatching vnode - riak_kv_vnode:mapcache(Requestor, BKey, {JS, Arg, KeyData}, R), - riak_core_vnode:reply(Sender, {mapexec_reply, R, Requestor}); + %%riak_kv_vnode:mapcache(Requestor, BKey, {JS, Arg, KeyData}, R), + riak_core_vnode:send_command(Sender, {mapexec_reply, JobId, R}); Error -> - riak_core_vnode:reply(Sender, {mapexec_error_noretry, Requestor, Error}) + riak_core_vnode:send_command(Sender, {mapexec_error_noretry, JobId, Error}) end, + riak_kv_js_manager:mark_idle(), {noreply, State}; handle_cast(_Msg, State) -> {noreply, State}. @@ -151,6 +152,19 @@ code_change(_OldVsn, State, _Extra) -> {ok, State}. %% Internal functions +define_invoke_anon_js(JS, Args, #state{ctx=Ctx}=State) -> + case define_anon_js(JS, State) of + {ok, FunName, NewState} -> + case invoke_js(Ctx, FunName, Args) of + {ok, R} -> + {{ok, R}, NewState}; + Error -> + {Error, State} + end; + {Error, undefined, NewState} -> + {Error, NewState} + end. + invoke_js(Ctx, Js, Args) -> try case js:call(Ctx, Js, Args) of @@ -190,7 +204,7 @@ define_anon_js(JS, #state{ctx=Ctx, anon_funs=AnonFuns, next_funid=NextFunId}=Sta {ok, FunName, State#state{anon_funs=[{Hash, FunName}|AnonFuns], next_funid=NextFunId + 1}}; Error -> error_logger:warning_msg("Error defining anonymous Javascript function: ~p~n", [Error]), - {error, undefined, State} + {Error, undefined, State} end; FunName -> {ok, FunName, State} diff --git a/src/riak_kv_keylister.erl b/src/riak_kv_keylister.erl new file mode 100644 index 0000000000..7ebcb9b9b9 --- /dev/null +++ b/src/riak_kv_keylister.erl @@ -0,0 +1,103 @@ +%% ------------------------------------------------------------------- +%% +%% riak_kv_keylister: Manage streaming keys for a bucket from a +%% cluster node +%% +%% Copyright (c) 2007-2010 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. 
You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +-module(riak_kv_keylister). + +-behaviour(gen_fsm). + +%% API +-export([start_link/3, + list_keys/2]). + +%% States +-export([waiting/2]). + +%% gen_fsm callbacks +-export([init/1, state_name/2, state_name/3, handle_event/3, + handle_sync_event/4, handle_info/3, terminate/3, code_change/4]). + +-record(state, {reqid, + caller, + bucket, + bloom}). + +list_keys(ListerPid, VNode) -> + gen_fsm:send_event(ListerPid, {lk, VNode}). + +start_link(ReqId, Caller, Bucket) -> + gen_fsm:start_link(?MODULE, [ReqId, Caller, Bucket], []). + +init([ReqId, Caller, Bucket]) -> + erlang:monitor(process, Caller), + {ok, Bloom} = ebloom:new(10000000, 0.0001, crypto:rand_uniform(1, 5000)), + {ok, waiting, #state{reqid=ReqId, caller=Caller, bloom=Bloom, bucket=Bucket}}. + +waiting({lk, VNode}, #state{reqid=ReqId, bucket=Bucket}=State) -> + riak_kv_vnode:list_keys(VNode, ReqId, self(), Bucket), + {next_state, waiting, State}. + +state_name(_Event, State) -> + {next_state, waiting, State}. + +state_name(_Event, _From, State) -> + {reply, ignored, state_name, State}. + +handle_event(_Event, StateName, State) -> + {next_state, StateName, State}. + +handle_sync_event(_Event, _From, StateName, State) -> + {reply, ignored, StateName, State}. 
+
+%% @private Vnode replies: forward keys the bloom filter has not seen, relay `done'.
+handle_info({ReqId, {kl, Idx, Keys0}}, waiting,
+            #state{reqid=ReqId, bloom=Bloom, caller=Caller}=State) ->
+    case unseen_keys(Keys0, Bloom, []) of
+        [] -> ok;  %% nothing new in this batch, so nothing is sent
+        Fresh -> gen_fsm:send_event(Caller, {ReqId, {kl, Idx, Fresh}})
+    end,
+    {next_state, waiting, State};
+handle_info({ReqId, Idx, done}, waiting, #state{reqid=ReqId, caller=Caller}=State) ->
+    gen_fsm:send_event(Caller, {ReqId, Idx, done}),
+    {next_state, waiting, State};
+handle_info({'DOWN', _MRef, _Type, Caller, _Info}, waiting, #state{caller=Caller}=State) ->
+    {stop, normal, State};  %% the monitored caller is gone
+handle_info(_Info, StateName, State) ->
+    {next_state, StateName, State}.
+
+terminate(_Reason, _StateName, #state{bloom=Bloom}) ->
+    ebloom:clear(Bloom),
+    ok.
+
+code_change(_OldVsn, StateName, State, _Extra) ->
+    {ok, StateName, State}.
+
+%% Internal functions
+%% Insert each key missing from the filter; return those keys, last one first.
+unseen_keys([], _Bloom, Acc) -> Acc;
+unseen_keys([Key|Rest], Bloom, Acc) ->
+    case ebloom:contains(Bloom, Key) of
+        true -> unseen_keys(Rest, Bloom, Acc);
+        false ->
+            ebloom:insert(Bloom, Key),
+            unseen_keys(Rest, Bloom, [Key|Acc])
+    end.
diff --git a/src/riak_kv_keylister_master.erl b/src/riak_kv_keylister_master.erl
new file mode 100644
index 0000000000..6f585f9947
--- /dev/null
+++ b/src/riak_kv_keylister_master.erl
@@ -0,0 +1,69 @@
+%% -------------------------------------------------------------------
+%%
+%% riak_kv_keylister_master: Starts keylister processes on demand
+%%
+%% Copyright (c) 2007-2010 Basho Technologies, Inc. All Rights Reserved.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+-module(riak_kv_keylister_master).
+
+-behaviour(gen_server).
+
+%% API
+-export([start_link/0,
+         start_keylist/3]).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+         terminate/2, code_change/3]).
+
+-define(SERVER, ?MODULE).
+
+-record(state, {}).
+
+%% @doc Ask the master registered on Node to start a keylister for ReqId and
+%% Bucket, naming the calling process as the lister's caller. Whatever the
+%% remote riak_kv_keylister_sup:new_lister/3 call yields is returned unchanged.
+start_keylist(Node, ReqId, Bucket) ->
+    gen_server:call({?SERVER, Node}, {start_kl, ReqId, self(), Bucket}).
+
+%% @doc Start the master, registered locally under the module name.
+start_link() ->
+    gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
+
+init([]) ->
+    {ok, #state{}}.
+
+%% @private `start_kl' delegates to the keylister supervisor and replies with
+%% its result; every other call is answered with `ignore'.
+handle_call({start_kl, ReqId, Caller, Bucket}, _From, State) ->
+    {reply, riak_kv_keylister_sup:new_lister(ReqId, Caller, Bucket), State};
+
+handle_call(_Request, _From, State) ->
+    {reply, ignore, State}.
+
+handle_cast(_Msg, State) ->
+    {noreply, State}.
+
+handle_info(_Info, State) ->
+    {noreply, State}.
+
+terminate(_Reason, _State) ->
+    ok.
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
diff --git a/src/riak_kv_keylister_sup.erl b/src/riak_kv_keylister_sup.erl
new file mode 100644
index 0000000000..5089af11df
--- /dev/null
+++ b/src/riak_kv_keylister_sup.erl
@@ -0,0 +1,48 @@
+%% -------------------------------------------------------------------
+%%
+%% riak_kv_keylister_sup: Supervisor for starting keylister processes
+%%
+%% Copyright (c) 2007-2010 Basho Technologies, Inc. All Rights Reserved.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied.
See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+-module(riak_kv_keylister_sup).
+
+-behaviour(supervisor).
+
+%% API
+-export([start_link/0, new_lister/3]).
+
+%% Supervisor callbacks
+-export([init/1]).
+
+%% @doc Start a keylister child. The arguments are handed, in this order, to
+%% riak_kv_keylister:start_link/3, whose parameters are (ReqId, Caller, Bucket).
+new_lister(ReqId, Caller, Bucket) ->
+    start_child([ReqId, Caller, Bucket]).
+
+start_link() ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+init([]) ->
+    SupFlags = {simple_one_for_one, 0, 1},
+    Process = {undefined, {riak_kv_keylister, start_link, []},
+               temporary, brutal_kill, worker, [riak_kv_keylister]},
+    {ok, {SupFlags, [Process]}}.
+
+%% Internal functions
+start_child(Args) ->
+    supervisor:start_child(?MODULE, Args).
diff --git a/src/riak_kv_keys_fsm.erl b/src/riak_kv_keys_fsm.erl
index 7dacbfb7e2..2e89c13189 100644
--- a/src/riak_kv_keys_fsm.erl
+++ b/src/riak_kv_keys_fsm.erl
@@ -40,7 +40,8 @@ bucket :: riak_object:bucket(), timeout :: pos_integer(), req_id :: pos_integer(), - ring :: riak_core_ring:riak_core_ring() + ring :: riak_core_ring:riak_core_ring(), + listers :: [{atom(), pid()}] }). start(ReqId,Bucket,Timeout,ClientType,ErrorTolerance,From) -> @@ -49,30 +50,45 @@ start(ReqId,Bucket,Timeout,ClientType,ErrorTolerance,From) -> %% @private init([ReqId,Bucket,Timeout,ClientType,ErrorTolerance,Client]) -> + process_flag(trap_exit, true), {ok, Ring} = riak_core_ring_manager:get_my_ring(), {ok, Bloom} = ebloom:new(10000000,ErrorTolerance,ReqId), StateData = #state{client=Client, client_type=ClientType, timeout=Timeout, bloom=Bloom, req_id=ReqId, bucket=Bucket, ring=Ring}, + case ClientType of + %% Link to the mapred job so we die if the job dies + mapred -> + link(Client); + _ -> + ok + end, {ok,initialize,StateData,0}.
%% @private -initialize(timeout, StateData0=#state{bucket=Bucket, ring=Ring}) -> +initialize(timeout, StateData0=#state{bucket=Bucket, ring=Ring, req_id=ReqId}) -> BucketProps = riak_core_bucket:get_bucket(Bucket, Ring), N = proplists:get_value(n_val,BucketProps), PLS0 = riak_core_ring:all_preflists(Ring,N), - {LA1, LA2} = lists:partition(fun({A,_B}) -> A rem N == 0 end, - lists:zip(lists:seq(0,(length(PLS0)-1)), PLS0)), - {_, PLS} = lists:unzip(lists:append(LA1,LA2)), + {LA1, LA2} = lists:partition(fun({A,_B}) -> + A rem N == 0 orelse A rem (N + 1) == 0 + end, + lists:zip(lists:seq(0,(length(PLS0)-1)), PLS0)), + {_, PLS} = lists:unzip(LA1 ++ LA2), Simul_PLS = trunc(length(PLS) / N), - StateData = StateData0#state{pls=PLS,simul_pls=Simul_PLS, + Listers = start_listers(ReqId, Bucket), + StateData = StateData0#state{pls=PLS,simul_pls=Simul_PLS, listers=Listers, wait_pls=[],vns=sets:from_list([])}, reduce_pls(StateData). -waiting_kl({kl, Keys, Idx, ReqId}, - StateData0=#state{pls=PLS,vns=VNS0,wait_pls=WPL0,bloom=Bloom, +waiting_kl({ReqId, {kl, _Idx, Keys}}, + StateData=#state{bloom=Bloom, req_id=ReqId,client=Client,timeout=Timeout, bucket=Bucket,client_type=ClientType}) -> process_keys(Keys,Bucket,ClientType,Bloom,ReqId,Client), + {next_state, waiting_kl, StateData, Timeout}; + +waiting_kl({ReqId, Idx, done}, StateData0=#state{wait_pls=WPL0,vns=VNS0,pls=PLS, + req_id=ReqId,timeout=Timeout}) -> WPL = [{W_Idx,W_Node,W_PL} || {W_Idx,W_Node,W_PL} <- WPL0, W_Idx /= Idx], WNs = [W_Node || {W_Idx,W_Node,_W_PL} <- WPL0, W_Idx =:= Idx], Node = case WNs of @@ -90,6 +106,7 @@ waiting_kl({kl, Keys, Idx, ReqId}, _ -> reduce_pls(StateData) end; + waiting_kl(timeout, StateData=#state{pls=PLS,wait_pls=WPL}) -> NewPLS = lists:append(PLS, [W_PL || {_W_Idx,_W_Node,W_PL} <- WPL]), reduce_pls(StateData#state{pls=NewPLS,wait_pls=[]}). @@ -100,9 +117,9 @@ finish(StateData=#state{req_id=ReqId,client=Client,client_type=ClientType}) -> plain -> Client ! 
{ReqId, done} end, {stop,normal,StateData}. - -reduce_pls(StateData0=#state{timeout=Timeout, req_id=ReqId,wait_pls=WPL, - simul_pls=Simul_PLS, bucket=Bucket}) -> + +reduce_pls(StateData0=#state{timeout=Timeout, wait_pls=WPL, + listers=Listers, simul_pls=Simul_PLS}) -> case find_free_pl(StateData0) of {none_free,NewPLS} -> StateData = StateData0#state{pls=NewPLS}, @@ -111,20 +128,33 @@ reduce_pls(StateData0=#state{timeout=Timeout, req_id=ReqId,wait_pls=WPL, false -> {next_state, waiting_kl, StateData, Timeout} end; {[{Idx,Node}|RestPL],PLS} -> - case net_adm:ping(Node) of - pong -> - riak_kv_vnode:list_keys({Idx,Node},Bucket,ReqId), - WaitPLS = [{Idx,Node,RestPL}|WPL], - StateData = StateData0#state{pls=PLS, wait_pls=WaitPLS}, - case length(WaitPLS) > Simul_PLS of - true -> - {next_state, waiting_kl, StateData, Timeout}; - false -> - reduce_pls(StateData) - end; - pang -> - reduce_pls(StateData0#state{pls=[RestPL|PLS]}) - end + case riak_core_node_watcher:services(Node) of + [] -> + reduce_pls(StateData0#state{pls=[RestPL|PLS]}); + _ -> + %% Look up keylister for that node + case proplists:get_value(Node, Listers) of + undefined -> + %% Node is down or hasn't been removed from preflists yet + %% Log a warning, skip the node and continue sending + %% out key list requests + error_logger:warning_msg("Skipping keylist request for unknown node: ~p~n", [Node]), + WaitPLS = [{Idx,Node,RestPL}|WPL], + StateData = StateData0#state{pls=PLS, wait_pls=WaitPLS}, + reduce_pls(StateData); + LPid -> + %% Send the keylist request to the lister + riak_kv_keylister:list_keys(LPid, {Idx, Node}), + WaitPLS = [{Idx,Node,RestPL}|WPL], + StateData = StateData0#state{pls=PLS, wait_pls=WaitPLS}, + case length(WaitPLS) > Simul_PLS of + true -> + {next_state, waiting_kl, StateData, Timeout}; + false -> + reduce_pls(StateData) + end + end + end end. find_free_pl(StateData) -> find_free_pl1(StateData, []). 
@@ -158,13 +188,18 @@ process_keys(Keys,Bucket,ClientType,Bloom,ReqId,Client) -> %% @private process_keys([],Bucket,ClientType,_Bloom,ReqId,Client,Acc) -> case ClientType of - mapred -> luke_flow:add_inputs(Client, [{Bucket,K} || K <- Acc]); + mapred -> + try + luke_flow:add_inputs(Client, [{Bucket,K} || K <- Acc]) + catch _:_ -> + exit(self(), normal) + end; plain -> Client ! {ReqId, {keys, Acc}} end, ok; process_keys([K|Rest],Bucket,ClientType,Bloom,ReqId,Client,Acc) -> case ebloom:contains(Bloom,K) of - true -> + true -> process_keys(Rest,Bucket,ClientType, Bloom,ReqId,Client,Acc); false -> @@ -182,13 +217,27 @@ handle_sync_event(_Event, _From, _StateName, StateData) -> {stop,badmsg,StateData}. %% @private +handle_info({'EXIT', Pid, normal}, _StateName, #state{client=Pid}=StateData) -> + {stop,normal,StateData}; handle_info(_Info, _StateName, StateData) -> {stop,badmsg,StateData}. %% @private -terminate(Reason, _StateName, _State) -> +terminate(Reason, _StateName, #state{bloom=Bloom}) -> + ebloom:clear(Bloom), Reason. %% @private code_change(_OldVsn, StateName, State, _Extra) -> {ok, StateName, State}. + +%% @private +start_listers(ReqId, Bucket) -> + Nodes = riak_core_node_watcher:nodes(riak_kv), + start_listers(Nodes, ReqId, Bucket, []). + +start_listers([], _ReqId, _Bucket, Accum) -> + Accum; +start_listers([H|T], ReqId, Bucket, Accum) -> + {ok, Pid} = riak_kv_keylister_master:start_keylist(H, ReqId, Bucket), + start_listers(T, ReqId, Bucket, [{H, Pid}|Accum]). 
diff --git a/src/riak_kv_legacy_vnode.erl b/src/riak_kv_legacy_vnode.erl index 126d46dec5..0120761b93 100644 --- a/src/riak_kv_legacy_vnode.erl +++ b/src/riak_kv_legacy_vnode.erl @@ -71,8 +71,8 @@ rewrite_cast({vnode_get, {Partition,_Node}, rewrite_cast({vnode_list_bucket, {Partition,_Node}, {FSM_pid, Bucket, ReqID}}) -> Req = riak_core_vnode_master:make_request( - ?KV_LISTKEYS_REQ{ - bucket=Bucket, + #riak_kv_listkeys_req_v1{ + bucket=Bucket, req_id=ReqID}, {fsm, undefined, FSM_pid}, Partition), diff --git a/src/riak_kv_lru.erl b/src/riak_kv_lru.erl new file mode 100644 index 0000000000..339d611d70 --- /dev/null +++ b/src/riak_kv_lru.erl @@ -0,0 +1,221 @@ +%% ------------------------------------------------------------------- +%% +%% riak_kv_lru: ETS-based LRU cache +%% +%% Copyright (c) 2007-2010 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- +-module(riak_kv_lru). + +-ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). +-endif. + +-export([new/1, + put/4, + remove/3, + fetch/3, + size/1, + max_size/1, + clear/1, + clear_bkey/2, + destroy/1]). + +-record(kv_lru, {max_size, + bucket_idx, + age_idx, + cache}). + +-record(kv_lru_entry, {key, + value, + ts}). 
+
+%% @doc Create a cache holding at most Size entries; Size 0 yields a disabled
+%% cache without ETS tables. The tables are unnamed, so fixed name atoms are
+%% used instead of minting three new atoms per calling pid (atoms are never freed).
+new(0) ->
+    #kv_lru{max_size=0};
+new(Size) ->
+    Idx = ets:new(riak_kv_lru_age_idx, [ordered_set, private]),
+    BucketIdx = ets:new(riak_kv_lru_bucket_idx, [bag, private]),
+    Cache = ets:new(riak_kv_lru_cache, [private, {keypos, 2}]),
+    #kv_lru{max_size=Size, age_idx=Idx, bucket_idx=BucketIdx, cache=Cache}.
+
+%% @doc Cache Value under {BKey, Key}, replacing an old entry and pruning if over size.
+put(#kv_lru{max_size=0}, _BKey, _Key, _Value) ->
+    ok;
+put(#kv_lru{max_size=MaxSize, age_idx=Idx, bucket_idx=BucketIdx,
+            cache=Cache}, BKey, Key, Value) ->
+    remove_existing(Idx, BucketIdx, Cache, BKey, Key),
+    insert_value(Idx, BucketIdx, Cache, BKey, Key, Value),
+    prune_oldest_if_needed(MaxSize, Idx, BucketIdx, Cache).
+
+%% @doc Return the value cached under {BKey, Key}, or `notfound'.
+fetch(#kv_lru{max_size=0}, _BKey, _Key) ->
+    notfound;
+fetch(#kv_lru{cache=Cache}=LRU, BKey, Key) ->
+    case fetch_value(Cache, BKey, Key) of
+        notfound ->
+            notfound;
+        Value ->
+            %% Do a put to update the timestamp in the cache
+            riak_kv_lru:put(LRU, BKey, Key, Value),
+            Value
+    end.
+
+%% @doc Drop the entry cached under {BKey, Key}, if any. Always returns ok.
+remove(#kv_lru{max_size=0}, _BKey, _Key) ->
+    ok;
+remove(#kv_lru{age_idx=Idx, bucket_idx=BucketIdx, cache=Cache}, BKey, Key) ->
+    remove_existing(Idx, BucketIdx, Cache, BKey, Key),
+    ok.
+
+%% @doc Number of entries currently cached (row count of the age index).
+size(#kv_lru{max_size=0}) ->
+    0;
+size(#kv_lru{age_idx=Idx}) ->
+    ets:info(Idx, size).
+
+%% @doc The capacity the cache was created with.
+max_size(#kv_lru{max_size=MaxSize}) ->
+    MaxSize.
+
+%% @doc Empty the cache. The bucket index is emptied together with the age index
+%% and the cache table, so no index rows survive for entries that are gone.
+clear(#kv_lru{max_size=0}) ->
+    ok;
+clear(#kv_lru{age_idx=Idx, bucket_idx=BucketIdx, cache=Cache}) ->
+    lists:foreach(fun ets:delete_all_objects/1, [Idx, BucketIdx, Cache]).
+
+%% @doc Remove every cached entry that the bucket index lists under BKey.
+clear_bkey(#kv_lru{max_size=0}, _BKey) ->
+    ok;
+clear_bkey(#kv_lru{bucket_idx=BucketIdx}=LRU, BKey) ->
+    Keys = ets:match(BucketIdx, {BKey, '$1'}),
+    lists:foreach(fun([Key]) -> remove(LRU, BKey, Key) end, Keys).
+
+%% @doc Delete the cache's ETS tables; a zero-sized cache has none.
+destroy(#kv_lru{max_size=0}) ->
+    ok;
+destroy(#kv_lru{age_idx=Idx, bucket_idx=BucketIdx, cache=Cache}) ->
+    lists:foreach(fun ets:delete/1, [Idx, BucketIdx, Cache]).
+
+%% Internal functions
+%% Drop the entry for {BKey, Key}, if cached, from all three tables.
+%% bucket_idx is a bag keyed on BKey that stores {BKey, Key} objects, so the row
+%% has to be removed with ets:delete_object/2; ets:delete/2 would look up the key
+%% {BKey, Key}, match nothing and leave a stale index row behind.
+remove_existing(Idx, BucketIdx, Cache, BKey, Key) ->
+    CacheKey = {BKey, Key},
+    case ets:lookup(Cache, CacheKey) of
+        [#kv_lru_entry{ts=TS}] ->
+            ets:delete(Idx, TS),
+            ets:delete_object(BucketIdx, CacheKey),
+            ets:delete(Cache, CacheKey),
+            ok;
+        [] ->
+            ok
+    end.
+
+%% Store the value in the cache table, its timestamp in the age index and its key
+%% in the bucket index. erlang:now/0 never repeats a value, so TS is a unique key.
+insert_value(Idx, BucketIdx, Cache, BKey, Key, Value) ->
+    CacheKey = {BKey, Key},
+    TS = erlang:now(),
+    Entry = #kv_lru_entry{key=CacheKey, value=Value, ts=TS},
+    ets:insert_new(Cache, Entry),
+    ets:insert_new(Idx, {TS, CacheKey}),
+    ets:insert(BucketIdx, CacheKey).
+
+%% Evict the entry with the smallest timestamp once the cache holds MaxSize + 1.
+prune_oldest_if_needed(MaxSize, Idx, BucketIdx, Cache) ->
+    case ets:info(Idx, size) of
+        Size when Size =:= MaxSize + 1 ->
+            Oldest = ets:first(Idx),
+            [{Oldest, {BKey, Key}}] = ets:lookup(Idx, Oldest),
+            remove_existing(Idx, BucketIdx, Cache, BKey, Key);
+        _ -> ok
+    end.
+
+%% Plain lookup in the cache table; does not refresh the entry's timestamp.
+fetch_value(Cache, BKey, Key) ->
+    case ets:lookup(Cache, {BKey, Key}) of
+        [] -> notfound;
+        [#kv_lru_entry{value=Value}] -> Value
+    end.
+
+-ifdef(TEST).
+put_fetch_test() ->
+    BKey = {<<"test">>, <<"foo">>},
+    C = riak_kv_lru:new(5),
+    riak_kv_lru:put(C, BKey, <<"hello">>, <<"world">>),
+    <<"world">> = riak_kv_lru:fetch(C, BKey, <<"hello">>),
+    riak_kv_lru:destroy(C).
+
+delete_test() ->
+    BKey = {<<"test">>, <<"foo">>},
+    C = riak_kv_lru:new(5),
+    riak_kv_lru:put(C, BKey, "hello", "world"),
+    riak_kv_lru:remove(C, BKey, "hello"),
+    notfound = riak_kv_lru:fetch(C, BKey, "hello"),
+    riak_kv_lru:destroy(C).
+
+%% Six puts into a five-entry cache evict only the first key.
+size_test() ->
+    Bucket = {<<"test">>, <<"foo">>},
+    Lru = riak_kv_lru:new(5),
+    lists:foreach(fun(N) -> riak_kv_lru:put(Lru, Bucket, N, N) end, lists:seq(1, 6)),
+    notfound = riak_kv_lru:fetch(Lru, Bucket, 1),
+    {5, 5} = {riak_kv_lru:size(Lru), riak_kv_lru:max_size(Lru)},
+    2 = riak_kv_lru:fetch(Lru, Bucket, 2),
+    6 = riak_kv_lru:fetch(Lru, Bucket, 6),
+    riak_kv_lru:destroy(Lru).
+
+%% Fetching key 2 refreshes it, so the next put evicts key 1 instead.
+age_test() ->
+    Bucket = {<<"test">>, <<"foo">>},
+    Lru = riak_kv_lru:new(3),
+    lists:foreach(fun(N) -> riak_kv_lru:put(Lru, Bucket, N, N) end, lists:seq(1, 3)),
+    timer:sleep(500),
+    2 = riak_kv_lru:fetch(Lru, Bucket, 2),
+    riak_kv_lru:put(Lru, Bucket, 4, 4),
+    [2, 4] = [riak_kv_lru:fetch(Lru, Bucket, K) || K <- [2, 4]],
+    notfound = riak_kv_lru:fetch(Lru, Bucket, 1),
+    riak_kv_lru:destroy(Lru).
+
+%% Clearing one bucket/key pair leaves entries of the other pair untouched.
+clear_bkey_test() ->
+    Kept = {<<"test">>, <<"foo">>},
+    Cleared = {<<"test">>, <<"bar">>},
+    Lru = riak_kv_lru:new(10),
+    lists:foreach(fun(N) -> riak_kv_lru:put(Lru, Kept, N, N),
+                            riak_kv_lru:put(Lru, Cleared, N, N)
+                  end, lists:seq(1, 5)),
+    riak_kv_lru:clear_bkey(Lru, Cleared),
+    notfound = riak_kv_lru:fetch(Lru, Cleared, 3),
+    3 = riak_kv_lru:fetch(Lru, Kept, 3),
+    riak_kv_lru:destroy(Lru).
+
+%% A zero-sized cache accepts puts but never stores anything.
+zero_size_test() ->
+    Bucket = {<<"test">>, <<"foo">>},
+    Lru = riak_kv_lru:new(0),
+    ok = riak_kv_lru:put(Lru, Bucket, 1, 1),
+    {notfound, 0} = {riak_kv_lru:fetch(Lru, Bucket, 1), riak_kv_lru:size(Lru)},
+    riak_kv_lru:destroy(Lru).
+
+-endif.
diff --git a/src/riak_kv_map_executor.erl b/src/riak_kv_map_executor.erl index c948b78735..a843376740 100644 --- a/src/riak_kv_map_executor.erl +++ b/src/riak_kv_map_executor.erl @@ -90,9 +90,16 @@ try_vnode(#state{qterm=QTerm, bkey=BKey, keydata=KeyData, vnodes=[{P, VN}|VNs], false -> try_vnode(StateData#state{vnodes=VNs}); true -> - riak_kv_vnode:map({P,VN},self(),QTerm,BKey,KeyData), - {ok, TRef} = timer:send_after(VNodeTimeout, self(), timeout), - StateData#state{vnodes=VNs, vnode_timer=TRef} + case riak_kv_vnode:map({P,VN},self(),QTerm,BKey,KeyData) of + {mapexec_reply, executing, _} -> + {ok, TRef} = timer:send_after(VNodeTimeout, self(), timeout), + StateData#state{vnodes=VNs, vnode_timer=TRef}; + {error, no_vms} -> + try_vnode(StateData); + Msg -> + gen_fsm:send_event(self(), Msg), + StateData#state{vnodes=VNs} + end end. wait(timeout, StateData=#state{bkey=BKey, keydata=KD, phase_pid=PhasePid,vnodes=[]}) -> @@ -101,21 +108,21 @@ wait(timeout, StateData=#state{bkey=BKey, keydata=KD, phase_pid=PhasePid,vnodes= wait(timeout, #state{timeout=Timeout}=StateData) -> case try_vnode(StateData) of {error, no_vnodes} -> - {stop, normal, StateData}; + {stop, normal, StateData}; NewState -> {next_state, wait, NewState, Timeout} end; wait({mapexec_error, _VN, _ErrMsg}, StateData=#state{bkey=BKey, keydata=KD, phase_pid=PhasePid,vnodes=[], vnode_timer=TRef}) -> - timer:cancel(TRef), + cancel_timer(TRef), riak_kv_phase_proto:mapexec_result(PhasePid, [{not_found, BKey, KD}]), {stop,normal,StateData}; wait({mapexec_error_noretry, _VN, ErrMsg}, #state{phase_pid=PhasePid, vnode_timer=TRef}=StateData) -> - timer:cancel(TRef), + cancel_timer(TRef), riak_kv_phase_proto:mapexec_error(PhasePid, ErrMsg), {stop, normal, StateData}; wait({mapexec_error, _VN, _ErrMsg}, #state{timeout=Timeout, vnode_timer=TRef}=StateData) -> - timer:cancel(TRef), + cancel_timer(TRef), case try_vnode(StateData) of {error, no_vnodes} -> {stop, normal, StateData}; @@ -125,7 +132,7 @@ wait({mapexec_error, 
_VN, _ErrMsg}, #state{timeout=Timeout, vnode_timer=TRef}=St wait({mapexec_reply, executing, _}, #state{timeout=Timeout}=StateData) -> {next_state, wait, StateData, Timeout}; wait({mapexec_reply, RetVal, _VN}, StateData=#state{phase_pid=PhasePid, vnode_timer=TRef}) -> - timer:cancel(TRef), + cancel_timer(TRef), riak_kv_phase_proto:mapexec_result(PhasePid, RetVal), {stop,normal,StateData}. @@ -151,3 +158,8 @@ terminate(Reason, _StateName, _State) -> %% @private code_change(_OldVsn, StateName, State, _Extra) -> {ok, StateName, State}. + +cancel_timer(undefined) -> + ok; +cancel_timer(TRef) -> + timer:cancel(TRef). diff --git a/src/riak_kv_map_phase.erl b/src/riak_kv_map_phase.erl index acb8574593..aab29eebc7 100644 --- a/src/riak_kv_map_phase.erl +++ b/src/riak_kv_map_phase.erl @@ -58,7 +58,7 @@ handle_event({mapexec_reply, Reply, Executor}, #state{fsms=FSMs0}=State) -> FSMs = lists:delete(Executor, FSMs0), {output, Reply, State#state{fsms=FSMs}}; handle_event({mapexec_error, _Executor, Reply}, State) -> - {stop, Reply, State}; + {stop, Reply, State#state{ring=none, fsms=none, acc=none}}; handle_event(_Event, State) -> {no_output, State}. 
diff --git a/src/riak_kv_mapred_json.erl b/src/riak_kv_mapred_json.erl index 2928c0fa49..c8a10c8ee9 100644 --- a/src/riak_kv_mapred_json.erl +++ b/src/riak_kv_mapred_json.erl @@ -70,12 +70,32 @@ parse_inputs(Bucket) when is_binary(Bucket) -> {ok, Bucket}; parse_inputs(Targets) when is_list(Targets) -> parse_inputs(Targets, []); +parse_inputs({struct, ModFun}) -> + case {proplists:lookup(<<"module">>, ModFun), + proplists:lookup(<<"function">>, ModFun), + proplists:lookup(<<"arg">>, ModFun)} of + {{_, Module}, {_, Function}, {_, Options}} -> + {ok, {modfun, + binary_to_atom(Module, utf8), + binary_to_atom(Function, utf8), + Options}}; + _ -> + {error, ["Missing fields in modfun input specification.\n" + "Required fields are:\n" + " - module : string name of a module\n", + " - function : string name of a function in module\n", + " - arg : argument to pass function\n"]} + end; parse_inputs(Invalid) -> {error, ["Unrecognized format of \"inputs\" field:", " ",mochijson2:encode(Invalid), "\n\nValid formats are:\n" " - a bucket name, as a string\n" - " - a list of bucket/key pairs\n"]}. + " - a list of bucket/key pairs\n", + " - an object naming a module and function to run, ex:\n", + " {\"module\":\"my_module\",\n", + " \"function\":\"my_function\",\n", + " \"arg\":[\"my\",\"arguments\"]}\n"]}. 
parse_inputs([], Accum) -> if diff --git a/src/riak_kv_mapred_query.erl b/src/riak_kv_mapred_query.erl index 25ae1a5692..c6e55b8e4e 100644 --- a/src/riak_kv_mapred_query.erl +++ b/src/riak_kv_mapred_query.erl @@ -126,29 +126,15 @@ phase_behavior(map, _QueryFun, true) -> [accumulate]; phase_behavior(map, _QueryFun, false) -> []; -%% Turn off parallel converges for jsanon since -%% they take too long to execute and wind up -%% monopolizing the available JS VMs on a given node -phase_behavior(reduce, {FunType, _}, Accumulate) -> - CP = if - FunType =:= jsanon -> - 1; - true -> - 2 - end, - if - Accumulate =:= true -> - [{converge, CP}, accumulate]; +phase_behavior(reduce, _QueryFun, Accumulate) -> + Behaviors0 = [{converge, 2}], + case Accumulate of true -> - [{converge, CP}] - end; -phase_behavior(reduce, {modfun, _, _}, Accumulate) -> - if - Accumulate =:= true -> - [{converge, 2}, accumulate]; - true -> - [{converge, 2}] + [accumulate|Behaviors0]; + false -> + Behaviors0 end. + fetch_js(Bucket, Key) -> {ok, Client} = riak:local_client(), case Client:get(Bucket, Key, 1) of diff --git a/src/riak_kv_mapred_term.erl b/src/riak_kv_mapred_term.erl index fb6199deab..179a7ce005 100644 --- a/src/riak_kv_mapred_term.erl +++ b/src/riak_kv_mapred_term.erl @@ -60,8 +60,11 @@ valid_inputs(Bucket) when is_binary(Bucket) -> ok; valid_inputs(Targets) when is_list(Targets) -> valid_input_targets(Targets); +valid_inputs({modfun, Module, Function, _Options}) + when is_atom(Module), is_atom(Function) -> + ok; valid_inputs(Invalid) -> - {error, {"Inputs must be a binary bucket or a list of target tuples:", Invalid}}. + {error, {"Inputs must be a binary bucket, a list of target tuples, or a modfun tuple:", Invalid}}. 
valid_input_targets([]) -> ok; diff --git a/src/riak_kv_pb_socket.erl b/src/riak_kv_pb_socket.erl index 5daaa09ce3..835024e0df 100644 --- a/src/riak_kv_pb_socket.erl +++ b/src/riak_kv_pb_socket.erl @@ -229,6 +229,23 @@ process_message_w(#rpbmapredreq{request=MrReq, content_type=ContentType}=Req, {ok, ReqId} -> wait_for_mapred(ReqId, State#state{req = Req}, ?DEFAULT_TIMEOUT) + end; + is_tuple(Inputs), size(Inputs)==4, + element(1, Inputs) == modfun, + is_atom(element(2, Inputs)), + is_atom(element(3, Inputs)) -> + case C:mapred_stream(Query, self(), Timeout) of + {stop, Error} -> + send_error("~p", [Error], State); + + {ok, {ReqId, FSM}} -> + C:mapred_dynamic_inputs_stream( + FSM, Inputs, Timeout), + luke_flow:finish_inputs(FSM), + %% Pause incoming packets - map/reduce results + %% will be processed by handle_info, it will + %% set socket active again on completion of streaming. + wait_for_mapred(ReqId, State#state{req = Req}, ?DEFAULT_TIMEOUT) end end end. diff --git a/src/riak_kv_put_fsm.erl b/src/riak_kv_put_fsm.erl index 24bc2db052..17d0d1c2bc 100644 --- a/src/riak_kv_put_fsm.erl +++ b/src/riak_kv_put_fsm.erl @@ -335,7 +335,7 @@ invoke_hook(precommit, Mod0, Fun0, undefined, RObj) -> Fun = binary_to_atom(Fun0, utf8), wrap_hook(Mod, Fun, RObj); invoke_hook(precommit, undefined, undefined, JSName, RObj) -> - case riak_kv_js_manager:blocking_dispatch({{jsfun, JSName}, RObj}) of + case riak_kv_js_manager:blocking_dispatch({{jsfun, JSName}, RObj}, 5) of {ok, <<"fail">>} -> fail; {ok, [{<<"fail">>, Message}]} -> diff --git a/src/riak_kv_reduce_phase.erl b/src/riak_kv_reduce_phase.erl index 1f512fa86c..d6f926dc73 100644 --- a/src/riak_kv_reduce_phase.erl +++ b/src/riak_kv_reduce_phase.erl @@ -85,20 +85,22 @@ perform_reduce({Lang,{reduce,FunTerm,Arg,_Acc}}, try case {Lang, FunTerm} of {erlang, {qfun,F}} -> - {ok, F(Reduced,Arg)}; + Value = F(Reduced,Arg), + {ok, Value}; {erlang, {modfun,M,F}} -> - {ok, M:F(Reduced,Arg)}; + Value = M:F(Reduced,Arg), + {ok, Value}; 
{javascript, _} -> - case riak_kv_js_manager:blocking_dispatch({FunTerm, - [riak_kv_mapred_json:jsonify_not_found(R) || R <- Reduced], - Arg}) of - {ok, Data} when is_list(Data) -> - {ok, [riak_kv_mapred_json:dejsonify_not_found(Datum) || Datum <- Data]}; - Data -> - Data - end + case riak_kv_js_manager:blocking_dispatch({FunTerm, + [riak_kv_mapred_json:jsonify_not_found(R) || R <- Reduced], + Arg}, 5) of + {ok, Data} when is_list(Data) -> + Data1 = [riak_kv_mapred_json:dejsonify_not_found(Datum) || Datum <- Data], + {ok, Data1}; + Error -> + throw(Error) + end end catch _:R -> - error_logger:error_msg("Failed reduce: ~p~n", [R]), - {error, failed_reduce} + R end. diff --git a/src/riak_kv_sup.erl b/src/riak_kv_sup.erl index 8a87e8ea22..678a32df95 100644 --- a/src/riak_kv_sup.erl +++ b/src/riak_kv_sup.erl @@ -45,12 +45,10 @@ init([]) -> {riak_core_vnode_master, start_link, [riak_kv_vnode, riak_kv_legacy_vnode]}, permanent, 5000, worker, [riak_core_vnode_master]}, - RiakPb = [{riak_kv_pb_listener, - {riak_kv_pb_listener, start_link, []}, - permanent, 5000, worker, [riak_kv_pb_listener]}, - {riak_kv_pb_socket_sup, - {riak_kv_pb_socket_sup, start_link, []}, - permanent, infinity, supervisor, [riak_kv_pb_socket_sup]} + RiakPb = [ {riak_kv_pb_socket_sup, {riak_kv_pb_socket_sup, start_link, []}, + permanent, infinity, supervisor, [riak_kv_pb_socket_sup]}, + {riak_kv_pb_listener, {riak_kv_pb_listener, start_link, []}, + permanent, 5000, worker, [riak_kv_pb_listener]} ], RiakStat = {riak_kv_stat, {riak_kv_stat, start_link, []}, @@ -62,6 +60,14 @@ init([]) -> RiakJsSup = {riak_kv_js_sup, {riak_kv_js_sup, start_link, []}, permanent, infinity, supervisor, [riak_kv_js_sup]}, + KLMaster = {riak_kv_keylister_master, + {riak_kv_keylister_master, start_link, []}, + permanent, 30000, worker, [riak_kv_keylister_master]}, + KLSup = {riak_kv_keylister_sup, + {riak_kv_keylister_sup, start_link, []}, + permanent, infinity, supervisor, [riak_kv_keylister_sup]}, + + % Figure out which 
processes we should run... IsPbConfigured = (app_helper:get_env(riak_kv, pb_ip) /= undefined) andalso (app_helper:get_env(riak_kv, pb_port) /= undefined), @@ -73,6 +79,8 @@ init([]) -> ?IF(HasStorageBackend, VMaster, []), ?IF(IsPbConfigured, RiakPb, []), ?IF(IsStatEnabled, RiakStat, []), + KLSup, + KLMaster, RiakJsSup, RiakJsMgr ]), diff --git a/src/riak_kv_vnode.erl b/src/riak_kv_vnode.erl index 525c241a90..5f7560722e 100644 --- a/src/riak_kv_vnode.erl +++ b/src/riak_kv_vnode.erl @@ -1,3 +1,9 @@ +%% ------------------------------------------------------------------- +%% +%% riak_kv_vnode: VNode Implementation +%% +%% Copyright (c) 2007-2010 Basho Technologies, Inc. All Rights Reserved. +%% %% This file is provided to you under the Apache License, %% Version 2.0 (the "License"); you may not use this file %% except in compliance with the License. You may obtain @@ -11,19 +17,18 @@ %% KIND, either express or implied. See the License for the %% specific language governing permissions and limitations %% under the License. - -%% Copyright (c) 2007-2010 Basho Technologies, Inc. All Rights Reserved. - +%% +%% ------------------------------------------------------------------- -module(riak_kv_vnode). -behaviour(riak_core_vnode). %% API --export([start_vnode/1, - del/3, - put/6, - readrepair/6, - list_keys/3, - map/5, - fold/3, +-export([start_vnode/1, + del/3, + put/6, + readrepair/6, + list_keys/4, + map/5, + fold/3, get_vclocks/2, mapcache/4, purge_mapcaches/0]). @@ -49,20 +54,25 @@ -export([map_test/3]). -endif. --record(state, {idx :: partition(), +-record(mrjob, {cachekey :: term(), + bkey :: term(), + reqid :: term(), + target :: pid()}). + +-record(state, {idx :: partition(), mod :: module(), modstate :: term(), mapcache :: term(), + mrjobs :: term(), in_handoff = false :: boolean()}). 
-record(putargs, {returnbody :: boolean(), lww :: boolean(), bkey :: {binary(), binary()}, robj :: term(), - reqid :: non_neg_integer(), + reqid :: non_neg_integer(), bprops :: maybe_improper_list(), prunetime :: non_neg_integer()}). --define(CLEAR_MAPCACHE_INTERVAL, 60000). %% TODO: add -specs to all public API funcs, this module seems fragile? @@ -81,7 +91,7 @@ del(Preflist, BKey, ReqId) -> put(Preflist, BKey, Obj, ReqId, StartTime, Options) when is_integer(StartTime) -> put(Preflist, BKey, Obj, ReqId, StartTime, Options, {fsm, undefined, self()}). -put(Preflist, BKey, Obj, ReqId, StartTime, Options, Sender) +put(Preflist, BKey, Obj, ReqId, StartTime, Options, Sender) when is_integer(StartTime) -> riak_core_vnode_master:command(Preflist, ?KV_PUT_REQ{ @@ -94,25 +104,26 @@ put(Preflist, BKey, Obj, ReqId, StartTime, Options, Sender) riak_kv_vnode_master). %% Do a put without sending any replies -readrepair(Preflist, BKey, Obj, ReqId, StartTime, Options) -> +readrepair(Preflist, BKey, Obj, ReqId, StartTime, Options) -> put(Preflist, BKey, Obj, ReqId, StartTime, Options, ignore). -list_keys(Preflist, Bucket, ReqId) -> - riak_core_vnode_master:command(Preflist, - ?KV_LISTKEYS_REQ{ - bucket=Bucket, - req_id=ReqId}, - {fsm, undefined, self()}, - riak_kv_vnode_master). +list_keys(Preflist, ReqId, Caller, Bucket) -> + riak_core_vnode_master:command(Preflist, + ?KV_LISTKEYS_REQ{ + bucket=Bucket, + req_id=ReqId, + caller=Caller}, + ignore, + riak_kv_vnode_master). map(Preflist, ClientPid, QTerm, BKey, KeyData) -> - riak_core_vnode_master:command(Preflist, - ?KV_MAP_REQ{ - qterm=QTerm, - bkey=BKey, - keydata=KeyData}, - {fsm, undefined, ClientPid}, - riak_kv_vnode_master). + riak_core_vnode_master:sync_spawn_command(Preflist, + ?KV_MAP_REQ{ + qterm=QTerm, + bkey=BKey, + keydata=KeyData, + from=ClientPid}, + riak_kv_vnode_master). 
fold(Preflist, Fun, Acc0) -> riak_core_vnode_master:sync_spawn_command(Preflist, @@ -124,23 +135,24 @@ fold(Preflist, Fun, Acc0) -> purge_mapcaches() -> VNodes = riak_core_vnode_master:all_nodes(?MODULE), lists:foreach(fun(VNode) -> riak_core_vnode:send_command(VNode, purge_mapcache) end, VNodes). - + mapcache(Pid, BKey, What, R) -> riak_core_vnode:send_command(Pid, {mapcache, BKey, What, R}). - -get_vclocks(Preflist, BKeyList) -> + +get_vclocks(Preflist, BKeyList) -> riak_core_vnode_master:sync_spawn_command(Preflist, ?KV_VCLOCK_REQ{bkeys=BKeyList}, - riak_kv_vnode_master). + riak_kv_vnode_master). %% VNode callbacks init([Index]) -> Mod = app_helper:get_env(riak_kv, storage_backend), + CacheSize = app_helper:get_env(riak_kv, vnode_cache_entries, 100), Configuration = app_helper:get_env(riak_kv), {ok, ModState} = Mod:start(Index, Configuration), - schedule_clear_mapcache(), - {ok, #state{idx=Index, mod=Mod, modstate=ModState, mapcache=orddict:new()}}. + + {ok, #state{idx=Index, mod=Mod, modstate=ModState, mapcache=riak_kv_lru:new(CacheSize), mrjobs=dict:new()}}. 
handle_command(?KV_PUT_REQ{bkey=BKey, object=Object, @@ -150,33 +162,39 @@ handle_command(?KV_PUT_REQ{bkey=BKey, Sender, State=#state{idx=Idx,mapcache=Cache}) -> riak_core_vnode:reply(Sender, {w, Idx, ReqId}), do_put(Sender, BKey, Object, ReqId, StartTime, Options, State), - {noreply, State#state{mapcache=orddict:erase(BKey,Cache)}}; + riak_kv_lru:clear_bkey(Cache, BKey), + {noreply, State}; handle_command(?KV_GET_REQ{bkey=BKey,req_id=ReqId},Sender,State) -> do_get(Sender, BKey, ReqId, State); -handle_command(?KV_LISTKEYS_REQ{bucket=Bucket, req_id=ReqId}, _Sender, - State=#state{mod=Mod, modstate=ModState, idx=Idx}) -> +handle_command(#riak_kv_listkeys_req_v1{bucket=Bucket, req_id=ReqId}, _Sender, + State=#state{mod=Mod, modstate=ModState, idx=Idx}) -> do_list_bucket(ReqId,Bucket,Mod,ModState,Idx,State); -handle_command(?KV_DELETE_REQ{bkey=BKey, req_id=ReqId}, _Sender, - State=#state{mod=Mod, modstate=ModState, +handle_command(?KV_LISTKEYS_REQ{bucket=Bucket, req_id=ReqId, caller=Caller}, _Sender, + State=#state{mod=Mod, modstate=ModState, idx=Idx}) -> + do_list_keys(Caller,ReqId,Bucket,Idx,Mod,ModState), + {noreply, State}; + +handle_command(?KV_DELETE_REQ{bkey=BKey, req_id=ReqId}, _Sender, + State=#state{mod=Mod, modstate=ModState, idx=Idx, mapcache=Cache}) -> - NewState = State#state{mapcache=orddict:erase(BKey,Cache)}, + riak_kv_lru:clear_bkey(Cache, BKey), case Mod:delete(ModState, BKey) of ok -> - {reply, {del, Idx, ReqId}, NewState}; + {reply, {del, Idx, ReqId}, State}; {error, _Reason} -> - {reply, {fail, Idx, ReqId}, NewState} + {reply, {fail, Idx, ReqId}, State} end; -handle_command(?KV_MAP_REQ{bkey=BKey,qterm=QTerm,keydata=KeyData}, - Sender, State) -> - do_map(Sender,QTerm,BKey,KeyData,State,self()); +handle_command(?KV_MAP_REQ{bkey=BKey,qterm=QTerm,keydata=KeyData,from=From}, + _Sender, State) -> + do_map(From,QTerm,BKey,KeyData,State,self()); handle_command(?KV_VCLOCK_REQ{bkeys=BKeys}, _Sender, State) -> {reply, do_get_vclocks(BKeys, State), State}; 
handle_command(?FOLD_REQ{foldfun=Fun, acc0=Acc},_Sender,State) -> Reply = do_fold(Fun, Acc, State), {reply, Reply, State}; %% Commands originating from inside this vnode -handle_command({backend_callback, Ref, Msg}, _Sender, +handle_command({backend_callback, Ref, Msg}, _Sender, State=#state{mod=Mod, modstate=ModState}) -> Mod:callback(ModState, Ref, Msg), {noreply, State}; @@ -188,21 +206,42 @@ handle_command({mapcache, BKey,{FunName,Arg,KeyData}, MF_Res}, _Sender, end, KeyCache = orddict:store({FunName,Arg,KeyData},MF_Res,KeyCache0), {noreply, State#state{mapcache=orddict:store(BKey,KeyCache,Cache)}}; -handle_command({mapcache, BKey,{M,F,Arg,KeyData},MF_Res}, _Sender, +handle_command({mapcache, BKey,{M,F,Arg,KeyData},MF_Res}, _Sender, State=#state{mapcache=Cache}) -> - KeyCache0 = case orddict:find(BKey, Cache) of - error -> orddict:new(); - {ok,CDict} -> CDict - end, - KeyCache = orddict:store({M,F,Arg,KeyData},MF_Res,KeyCache0), - {noreply, State#state{mapcache=orddict:store(BKey,KeyCache,Cache)}}; -handle_command(purge_mapcache, _Sender, State) -> - {noreply, State#state{mapcache=orddict:new()}}; -handle_command(clear_mapcache, _Sender, State) -> - schedule_clear_mapcache(), - {noreply, State#state{mapcache=orddict:new()}}. 
- -handle_handoff_command(Req=?FOLD_REQ{}, Sender, State) -> + riak_kv_lru:put(Cache, BKey, {M,F,Arg,KeyData}, MF_Res), + {noreply, State}; +handle_command(purge_mapcache, _Sender, #state{mapcache=Cache}=State) -> + riak_kv_lru:clear(Cache), + {noreply, State}; +handle_command(clear_mapcache, _Sender, #state{mapcache=Cache}=State) -> + riak_kv_lru:clear(Cache), + {noreply, State}; +handle_command({mapexec_error_noretry, JobId, Err}, _Sender, #state{mrjobs=Jobs}=State) -> + NewState = case dict:find(JobId, Jobs) of + {ok, Job} -> + Jobs1 = dict:erase(JobId, Jobs), + #mrjob{target=Target} = Job, + gen_fsm:send_event(Target, {mapexec_error_noretry, self(), Err}), + State#state{mrjobs=Jobs1}; + error -> + State + end, + {noreply, NewState}; +handle_command({mapexec_reply, JobId, Result}, _Sender, #state{mrjobs=Jobs, + mapcache=MapCache}=State) -> + NewState = case dict:find(JobId, Jobs) of + {ok, Job} -> + Jobs1 = dict:erase(JobId, Jobs), + #mrjob{cachekey=CacheKey, target=Target, bkey=BKey} = Job, + riak_kv_lru:put(MapCache, BKey, CacheKey, Result), + gen_fsm:send_event(Target, {mapexec_reply, Result, self()}), + State#state{mrjobs=Jobs1}; + error -> + State + end, + {noreply, NewState}. + +handle_handoff_command(Req=?FOLD_REQ{}, Sender, State) -> handle_command(Req, Sender, State); handle_handoff_command(Req={backend_callback, _Ref, _Msg}, Sender, State) -> handle_command(Req, Sender, State); @@ -269,7 +308,7 @@ do_put(Sender, {Bucket,_Key}=BKey, RObj, ReqID, PruneTime, Options, State) -> riak_core_vnode:reply(Sender, Reply), riak_kv_stat:update(vnode_put). -prepare_put(#state{}, #putargs{lww=true, robj=RObj}) -> +prepare_put(#state{}, #putargs{lww=true, robj=RObj}) -> {true, RObj}; prepare_put(#state{mod=Mod,modstate=ModState}, #putargs{bkey=BKey, robj=RObj, @@ -366,6 +405,58 @@ do_list_bucket(ReqID,Bucket,Mod,ModState,Idx,State) -> RetVal = Mod:list_bucket(ModState,Bucket), {reply, {kl, RetVal, Idx, ReqID}, State}. 
+%% Use in-memory key list for bitcask backend +%% @private +do_list_keys(Caller,ReqId,Bucket,Idx,Mod,ModState) + when Mod =:= riak_kv_bitcask_backend -> + F = fun(BKey, Acc) -> + process_keys(Caller, ReqId, Idx, Bucket, BKey, Acc) end, + case Mod:fold_keys(ModState, F, []) of + [] -> + ok; + Remainder -> + Caller ! {ReqId, {kl, Idx, Remainder}} + end, + Caller ! {ReqId, Idx, done}; +%% @private +do_list_keys(Caller,ReqId,Bucket,Idx,Mod,ModState) -> + F = fun(BKey, _, Acc) -> + process_keys(Caller, ReqId, Idx, Bucket, BKey, Acc) end, + case Mod:fold(ModState, F, []) of + [] -> + ok; + Remainder -> + Caller ! {ReqId, {kl, Idx, Remainder}} + end, + Caller ! {ReqId, Idx, done}. + +%% @private +process_keys(Caller, ReqId, Idx, '_', {Bucket, _K}, Acc) -> + %% Bucket='_' means "list buckets" instead of "list keys" + buffer_key_result(Caller, ReqId, Idx, [Bucket|Acc]); +process_keys(Caller, ReqId, Idx, {filter, Bucket, Fun}, {Bucket, K}, Acc) -> + %% Bucket={filter,Bucket,Fun} means "only include keys + %% in Bucket that make Fun(K) return 'true'" + case Fun(K) of + true -> + buffer_key_result(Caller, ReqId, Idx, [K|Acc]); + false -> + Acc + end; +process_keys(Caller, ReqId, Idx, Bucket, {Bucket, K}, Acc) -> + buffer_key_result(Caller, ReqId, Idx, [K|Acc]); +process_keys(_Caller, _ReqId, _Idx, _Bucket, {_B, _K}, Acc) -> + Acc. + +buffer_key_result(Caller, ReqId, Idx, Acc) -> + case length(Acc) >= 100 of + true -> + Caller ! {ReqId, {kl, Idx, Acc}}, + []; + false -> + Acc + end. + %% @private do_fold(Fun, Acc0, _State=#state{mod=Mod, modstate=ModState}) -> Mod:fold(ModState, Fun, Acc0). @@ -399,10 +490,14 @@ do_diffobj_put(BKey={Bucket,_}, DiffObj, end. 
%% @private -do_map(Sender, QTerm, BKey, KeyData, #state{mod=Mod, modstate=ModState, mapcache=Cache}=State, VNode) -> - {Reply, NewState} = case do_map(QTerm, BKey, Mod, ModState, KeyData, Cache, VNode, Sender) of - map_executing -> - {{mapexec_reply, executing, self()}, State}; +do_map(Sender, QTerm, BKey, KeyData, #state{mrjobs=Jobs, mod=Mod, modstate=ModState, + mapcache=Cache}=State, VNode) -> + {Reply, NewState} = case do_map(QTerm, BKey, Mod, ModState, KeyData, Cache, VNode) of + {map_executing, BKey, CacheKey, ReqId} -> + J = #mrjob{reqid=ReqId, target=Sender, + bkey=BKey, cachekey=CacheKey}, + Jobs1 = dict:store(ReqId, J, Jobs), + {{mapexec_reply, executing, self()}, State#state{mrjobs=Jobs1}}; {ok, Retval} -> {{mapexec_reply, Retval, self()}, State}; {error, Error} -> @@ -410,25 +505,27 @@ do_map(Sender, QTerm, BKey, KeyData, #state{mod=Mod, modstate=ModState, mapcache end, {reply, Reply, NewState}. -do_map({erlang, {map, FunTerm, Arg, _Acc}}, BKey, Mod, ModState, KeyData, Cache, VNode, _Sender) -> +do_map({erlang, {map, FunTerm, Arg, _Acc}}, BKey, Mod, ModState, KeyData, Cache, _VNode) -> CacheKey = build_key(FunTerm, Arg, KeyData), - CacheVal = cache_fetch(BKey, CacheKey, Cache), - case CacheVal of - not_cached -> - uncached_map(BKey, Mod, ModState, FunTerm, Arg, KeyData, VNode); + case riak_kv_lru:fetch(Cache, BKey, CacheKey) of + notfound -> + uncached_map(BKey, Mod, ModState, FunTerm, Arg, KeyData); CV -> {ok, CV} end; -do_map({javascript, {map, FunTerm, Arg, _}=QTerm}, BKey, Mod, ModState, KeyData, Cache, _VNode, Sender) -> +do_map({javascript, {map, FunTerm, Arg, _}=QTerm}, BKey, Mod, ModState, KeyData, Cache, _VNode) -> CacheKey = build_key(FunTerm, Arg, KeyData), - CacheVal = cache_fetch(BKey, CacheKey, Cache), - case CacheVal of - not_cached -> + case riak_kv_lru:fetch(Cache, BKey, CacheKey) of + notfound -> case Mod:get(ModState, BKey) of {ok, Binary} -> V = binary_to_term(Binary), - riak_kv_js_manager:dispatch({Sender, QTerm, V, KeyData, 
BKey}), - map_executing; + case riak_kv_js_manager:dispatch({self(), QTerm, V, KeyData, BKey}, 10) of + {ok, JobId} -> + {map_executing, BKey, CacheKey, JobId}; + Error -> + Error + end; {error, notfound} -> {error, notfound} end; @@ -440,33 +537,23 @@ build_key({modfun, CMod, CFun}, Arg, KeyData) -> {CMod, CFun, Arg, KeyData}; build_key({jsfun, FunName}, Arg, KeyData) -> {FunName, Arg, KeyData}; +build_key({jsanon, Src}, Arg, KeyData) -> + {mochihex:to_hex(crypto:sha(Src)), Arg, KeyData}; build_key(_, _, _) -> no_key. -cache_fetch(_BKey, no_key, _Cache) -> - not_cached; -cache_fetch(BKey, CacheKey, Cache) -> - case orddict:find(BKey, Cache) of - error -> not_cached; - {ok,CDict} -> - case orddict:find(CacheKey,CDict) of - error -> not_cached; - {ok,CVal} -> CVal - end - end. - -uncached_map(BKey, Mod, ModState, FunTerm, Arg, KeyData, VNode) -> +uncached_map(BKey, Mod, ModState, FunTerm, Arg, KeyData) -> case Mod:get(ModState, BKey) of {ok, Binary} -> V = binary_to_term(Binary), - exec_map(V, FunTerm, Arg, BKey, KeyData, VNode); + exec_map(V, FunTerm, Arg, BKey, KeyData); {error, notfound} -> - exec_map({error, notfound}, FunTerm, Arg, BKey, KeyData, VNode); + exec_map({error, notfound}, FunTerm, Arg, BKey, KeyData); X -> {error, X} end. -exec_map(V, FunTerm, Arg, BKey, KeyData, _VNode) -> +exec_map(V, FunTerm, Arg, BKey, KeyData) -> try case FunTerm of {qfun, F} -> {ok, (F)(V,KeyData,Arg)}; @@ -480,10 +567,6 @@ exec_map(V, FunTerm, Arg, BKey, KeyData, _VNode) -> {error, Reason} end. -schedule_clear_mapcache() -> - riak_core_vnode:send_command_after(?CLEAR_MAPCACHE_INTERVAL, clear_mapcache). - - -ifdef(TEST). dummy_backend() -> @@ -491,7 +574,91 @@ dummy_backend() -> riak_core_ring_manager:set_ring_global(Ring), application:set_env(riak_kv, storage_backend, riak_kv_ets_backend), application:set_env(riak_core, default_bucket_props, []). 
- + +backend_with_known_key() -> + dummy_backend(), + {ok, S1} = init([0]), + B = <<"f">>, + K = <<"b">>, + O = riak_object:new(B, K, <<"z">>), + {noreply, S2} = handle_command(?KV_PUT_REQ{bkey={B,K}, + object=O, + req_id=123, + start_time=riak_core_util:moment(), + options=[]}, + {raw, 456, self()}, + S1), + {S2, B, K}. + +list_buckets_test() -> + {S, B, _K} = backend_with_known_key(), + Caller = new_result_listener(), + handle_command(?KV_LISTKEYS_REQ{bucket='_', + req_id=124, + caller=Caller}, + {raw, 456, self()}, S), + ?assertEqual({ok, [B]}, results_from_listener(Caller)), + flush_msgs(). + +filter_keys_test() -> + {S, B, K} = backend_with_known_key(), + + Caller1 = new_result_listener(), + handle_command(?KV_LISTKEYS_REQ{ + bucket={filter,B,fun(_) -> true end}, + req_id=124, + caller=Caller1}, + {raw, 456, self()}, S), + ?assertEqual({ok, [K]}, results_from_listener(Caller1)), + + Caller2 = new_result_listener(), + handle_command(?KV_LISTKEYS_REQ{ + bucket={filter,B,fun(_) -> false end}, + req_id=125, + caller=Caller2}, + {raw, 456, self()}, S), + ?assertEqual({ok, []}, results_from_listener(Caller2)), + + Caller3 = new_result_listener(), + handle_command(?KV_LISTKEYS_REQ{ + bucket={filter,<<"g">>,fun(_) -> true end}, + req_id=126, + caller=Caller3}, + {raw, 456, self()}, S), + ?assertEqual({ok, []}, results_from_listener(Caller3)), + + flush_msgs(). + +new_result_listener() -> + spawn(fun result_listener/0). + +result_listener() -> + result_listener_keys([]). + +result_listener_keys(Acc) -> + receive + {_,{kl,_,Keys}} -> + result_listener_keys(Keys++Acc); + {_, _, done} -> + result_listener_done(Acc) + after 5000 -> + result_listener_done({timeout, Acc}) + end. + +result_listener_done(Result) -> + receive + {get_results, Pid} -> + Pid ! {listener_results, Result} + end. + +results_from_listener(Listener) -> + Listener ! {get_results, self()}, + receive + {listener_results, Result} -> + {ok, Result} + after 5000 -> + {error, listener_timeout} + end. 
%% Make sure the mapcache gets cleared when the bkey is updated mapcache_put_test() -> @@ -499,10 +666,10 @@ mapcache_put_test() -> BKey = {<<"b">>,<<"k">>}, CacheKey = {mod,func,arg,keydata}, {ok, S1} = init([0]), - ?assertEqual(not_cached, cache_fetch(BKey, CacheKey, S1#state.mapcache)), + ?assertEqual(notfound, riak_kv_lru:fetch(S1#state.mapcache, BKey, CacheKey)), {noreply, S2} = handle_command({mapcache, BKey, CacheKey, result}, noreply, S1), - ?assertEqual(result, cache_fetch(BKey, CacheKey, S2#state.mapcache)), + ?assertEqual(result, riak_kv_lru:fetch(S2#state.mapcache, BKey, CacheKey)), O = riak_object:new(<<"b">>,<<"k">>,<<"v">>), {noreply, S3} = handle_command(?KV_PUT_REQ{bkey=BKey, @@ -511,7 +678,7 @@ mapcache_put_test() -> start_time=riak_core_util:moment(), options=[]}, {raw, 456, self()}, S2), - ?assertEqual(not_cached, cache_fetch(BKey, CacheKey, S3#state.mapcache)), + ?assertEqual(notfound, riak_kv_lru:fetch(S3#state.mapcache, BKey, CacheKey)), %% The put request generates a {w,...} and {dw,...} event flush_msgs(). @@ -521,15 +688,15 @@ mapcache_delete_test() -> BKey = {<<"b">>,<<"k">>}, CacheKey = {mod,func,arg,keydata}, {ok, S1} = init([0]), - ?assertEqual(not_cached, cache_fetch(BKey, CacheKey, S1#state.mapcache)), + ?assertEqual(notfound, riak_kv_lru:fetch(S1#state.mapcache, BKey, CacheKey)), {noreply, S2} = handle_command({mapcache, BKey, CacheKey, result}, noreply, S1), - ?assertEqual(result, cache_fetch(BKey, CacheKey, S2#state.mapcache)), + ?assertEqual(result, riak_kv_lru:fetch(S2#state.mapcache, BKey, CacheKey)), {reply, {del, 0, 123}, S3} = handle_command(?KV_DELETE_REQ{bkey=BKey, req_id=123}, {raw, 456, self()}, S2), - ?assertEqual(not_cached, cache_fetch(BKey, CacheKey, S3#state.mapcache)), + ?assertEqual(notfound, riak_kv_lru:fetch(S3#state.mapcache, BKey, CacheKey)), %% The put request generates a {w,...} and {dw,...} event flush_msgs(). 
@@ -559,7 +726,7 @@ purge_mapcaches_test() -> %% Prove nothing there FunTerm = {modfun, ?MODULE, map_test}, - Arg = arg, + Arg = arg, QTerm = {erlang, {map, FunTerm, Arg, acc}}, KeyData = keydata, CacheKey = build_key(FunTerm, Arg, KeyData), @@ -581,31 +748,28 @@ purge_mapcaches_test() -> riak_core_node_watcher:service_down(riak_kv), cleanup_servers(). - + cleanup_servers() -> riak_kv_test_util:stop_process(riak_core_node_watcher), riak_kv_test_util:stop_process(riak_core_node_watcher_events), riak_kv_test_util:stop_process(riak_core_ring_events), riak_kv_test_util:stop_process(riak_core_vnode_sup), riak_kv_test_util:stop_process(riak_kv_vnode_master). - + check_mapcache(Index, QTerm, BKey, KeyData, Expect) -> - map({Index,node()}, self(), QTerm, BKey, KeyData), - receive - Msg -> - {'$gen_event',{mapexec_reply,Result,_Pid}} = Msg, - ?assertEqual(Expect, Result) - after - 100 -> + case map({Index,node()}, self(), QTerm, BKey, KeyData) of + {mapexec_reply, Result, _Pid} -> + ?assertMatch(Expect, Result); + _ -> ?assert(false) end. - -%% Map identity function - returns what you give it + +%% Map identity function - returns what you give it map_test(Obj, _KeyData, _Arg) -> Obj. -flush_msgs() -> +flush_msgs() -> receive _Msg -> flush_msgs() @@ -613,7 +777,7 @@ flush_msgs() -> 0 -> ok end. - - + + -endif. % TEST diff --git a/src/riak_kv_wm_mapred.erl b/src/riak_kv_wm_mapred.erl index 05c7544f17..7d1814691d 100644 --- a/src/riak_kv_wm_mapred.erl +++ b/src/riak_kv_wm_mapred.erl @@ -1,6 +1,6 @@ %% ------------------------------------------------------------------- %% -%% mapred_resource: webmachine resource for mapreduce requests +%% riak_kv_wm_mapred: webmachine resource for mapreduce requests %% %% Copyright (c) 2007-2010 Basho Technologies, Inc. All Rights Reserved. 
%% @@ -76,19 +76,22 @@ nop(RD, State) -> process_post(RD, #state{inputs=Inputs, mrquery=Query, timeout=Timeout}=State) -> Me = self(), {ok, Client} = riak:local_client(), + ResultTransformer = fun riak_kv_mapred_json:jsonify_not_found/1, case wrq:get_qs_value("chunked", RD) of "true" -> {ok, ReqId} = if is_list(Inputs) -> - {ok, {RId, FSM}} = Client:mapred_stream(Query, Me, - fun riak_kv_mapred_json:jsonify_not_found/1, - Timeout), + {ok, {RId, FSM}} = Client:mapred_stream(Query, Me, ResultTransformer, Timeout), luke_flow:add_inputs(FSM, Inputs), luke_flow:finish_inputs(FSM), {ok, RId}; is_binary(Inputs) -> - Client:mapred_bucket_stream(Inputs, Query, Me, - Timeout) + Client:mapred_bucket_stream(Inputs, Query, Me, Timeout); + is_tuple(Inputs) -> + {ok, {RId, FSM}} = Client:mapred_stream(Query, Me, ResultTransformer, Timeout), + Client:mapred_dynamic_inputs_stream(FSM, Inputs, Timeout), + luke_flow:finish_inputs(FSM), + {ok, RId} end, Boundary = riak_core_util:unique_id_62(), RD1 = wrq:set_resp_header("Content-Type", "multipart/mixed;boundary=" ++ Boundary, RD), @@ -97,12 +100,18 @@ process_post(RD, #state{inputs=Inputs, mrquery=Query, timeout=Timeout}=State) -> Param when Param =:= "false"; Param =:= undefined -> Results = if is_list(Inputs) -> - Client:mapred(Inputs, Query, - fun riak_kv_mapred_json:jsonify_not_found/1, - ?DEFAULT_TIMEOUT); + Client:mapred(Inputs, Query, ResultTransformer, Timeout); is_binary(Inputs) -> - Client:mapred_bucket(Inputs, Query, fun riak_kv_mapred_json:jsonify_not_found/1, - ?DEFAULT_TIMEOUT) + Client:mapred_bucket(Inputs, Query, ResultTransformer, Timeout); + is_tuple(Inputs) -> + case Client:mapred_stream(Query,Me,ResultTransformer,Timeout) of + {ok, {ReqId, FlowPid}} -> + Client:mapred_dynamic_inputs_stream(FlowPid, Inputs, Timeout), + luke_flow:finish_inputs(FlowPid), + luke_flow:collect_output(ReqId, Timeout); + Error -> + Error + end end, RD1 = wrq:set_resp_header("Content-Type", "application/json", RD), case Results of diff 
--git a/src/riak_kv_wm_ping.erl b/src/riak_kv_wm_ping.erl index 7238f06ea2..5cfd3e4196 100644 --- a/src/riak_kv_wm_ping.erl +++ b/src/riak_kv_wm_ping.erl @@ -1,6 +1,6 @@ %% ------------------------------------------------------------------- %% -%% ping_http_resource: simple Webmachine resource for availability test +%% riak_kv_wm_ping: simple Webmachine resource for availability test %% %% Copyright (c) 2007-2010 Basho Technologies, Inc. All Rights Reserved. %% diff --git a/src/riak_object.erl b/src/riak_object.erl index 6374937e61..0f6d700661 100644 --- a/src/riak_object.erl +++ b/src/riak_object.erl @@ -60,7 +60,7 @@ %% @doc Constructor for new riak objects. -spec new(Bucket::bucket(), Key::key(), Value::value()) -> riak_object(). new(B, K, V) when is_binary(B), is_binary(K) -> - new(B, K, V, dict:new()). + new(B, K, V, no_initial_metadata). %% @doc Constructor for new riak objects with an initial content-type. -spec new(Bucket::bucket(), Key::key(), Value::value(), string() | dict()) -> riak_object(). @@ -76,9 +76,16 @@ new(B, K, V, MD) when is_binary(B), is_binary(K) -> true -> throw({error,key_too_large}); false -> - Contents = [#r_content{metadata=MD, value=V}], - #r_object{bucket=B,key=K,updatemetadata=MD, - contents=Contents,vclock=vclock:fresh()} + case MD of + no_initial_metadata -> + Contents = [#r_content{metadata=dict:new(), value=V}], + #r_object{bucket=B,key=K, + contents=Contents,vclock=vclock:fresh()}; + _ -> + Contents = [#r_content{metadata=MD, value=V}], + #r_object{bucket=B,key=K,updatemetadata=MD, + contents=Contents,vclock=vclock:fresh()} + end end. -spec equal(riak_object(), riak_object()) -> true | false.