Skip to content

Commit

Permalink
Merge remote branch 'upstream/master'
Browse files Browse the repository at this point in the history
Conflicts:
	src/riak_kv_pb_socket.erl
  • Loading branch information
Mihai Balea committed Sep 27, 2010
2 parents 5f7b9c4 + 0e0e78e commit b0bc41f
Show file tree
Hide file tree
Showing 32 changed files with 1,471 additions and 384 deletions.
3 changes: 3 additions & 0 deletions .hgtags
@@ -0,0 +1,3 @@
e716ebd150ff8698a89a1ae28bc868385a164497 riak_kv-0.13.0rc1
04f5cfd0b8ca7c195e67658367afa5625c186218 riak_kv-0.13.0rc2
a5e0a7b843b52fe846b8006543d1484b548b9a18 riak_kv-0.13.0rc3
7 changes: 6 additions & 1 deletion ebin/riak_kv.app
Expand Up @@ -3,8 +3,9 @@
{application, riak_kv,
[
{description, "Riak Key/Value Store"},
{vsn, "0.12.0"},
{vsn, "0.13.0rc3"},
{modules, [
lk,
raw_link_walker,
riak,
riak_client,
Expand All @@ -23,8 +24,12 @@
riak_kv_js_manager,
riak_kv_js_sup,
riak_kv_js_vm,
riak_kv_keylister,
riak_kv_keylister_master,
riak_kv_keylister_sup,
riak_kv_keys_fsm,
riak_kv_legacy_vnode,
riak_kv_lru,
riak_kv_map_executor,
riak_kv_map_localphase,
riak_kv_map_phase,
Expand Down
13 changes: 8 additions & 5 deletions include/riak_kv_vnode.hrl
Expand Up @@ -15,25 +15,28 @@
bucket :: binary(),
req_id :: non_neg_integer()}).

-record(riak_kv_listkeys_req_v2, {
bucket :: binary(),
req_id :: non_neg_integer(),
caller :: pid()}).

-record(riak_kv_delete_req_v1, {
bkey :: {binary(), binary()},
req_id :: non_neg_integer()}).

-record(riak_kv_map_req_v1, {
bkey :: {binary(), binary()},
qterm :: term(),
keydata :: term()}).
keydata :: term(),
from :: term()}).

-record(riak_kv_vclock_req_v1, {
bkeys = [] :: [{binary(), binary()}]
}).

-define(KV_PUT_REQ, #riak_kv_put_req_v1).
-define(KV_GET_REQ, #riak_kv_get_req_v1).
-define(KV_LISTKEYS_REQ, #riak_kv_listkeys_req_v1).
-define(KV_LISTKEYS_REQ, #riak_kv_listkeys_req_v2).
-define(KV_DELETE_REQ, #riak_kv_delete_req_v1).
-define(KV_MAP_REQ, #riak_kv_map_req_v1).
-define(KV_VCLOCK_REQ, #riak_kv_vclock_req_v1).



24 changes: 12 additions & 12 deletions rebar.config
Expand Up @@ -3,16 +3,16 @@
{erl_opts, [debug_info, fail_on_warning]}.

{deps, [
{riak_core, "0.12.0", {hg, "http://bitbucket.org/basho/riak_core",
"tip"}},
{riakc, "0.2.0", {hg, "http://bitbucket.org/basho/riak-erlang-client",
"tip"}},
{luke, "\.*", {hg, "http://bitbucket.org/basho/luke",
"tip"}},
{erlang_js, "0\.4", {hg, "http://bitbucket.org/basho/erlang_js",
"erlang_js-0.4"}},
{bitcask, "1.0.2", {hg, "http://bitbucket.org/basho/bitcask",
"bitcask-1.0.2"}},
{ebloom, "1.0.1", {hg, "http://bitbucket.org/basho/ebloom",
"ebloom-1.0.1"}}
{riak_core, "0.13.0rc3", {hg, "http://bitbucket.org/basho/riak_core",
"riak_core-0.13.0rc3"}},
{riakc, "1.0.0", {hg, "http://bitbucket.org/basho/riak-erlang-client",
"riakc-1.0.0"}},
{luke, "0.2.1", {hg, "http://bitbucket.org/basho/luke",
"luke-0.2.1"}},
{erlang_js, "0.4.1", {hg, "http://bitbucket.org/basho/erlang_js",
"90"}},
{bitcask, "1.1.1", {hg, "http://bitbucket.org/basho/bitcask",
"bitcask-1.1.1"}},
{ebloom, "1.0.2", {hg, "http://bitbucket.org/basho/ebloom",
"ebloom-1.0.2"}}
]}.
89 changes: 89 additions & 0 deletions src/lk.erl
@@ -0,0 +1,89 @@
%% -------------------------------------------------------------------
%%
%% lk: Helper functions for list keys
%%
%% Copyright (c) 2007-2010 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(lk).

-export([fsm/1, pn/1]).

fsm(Bucket) ->
ReqId = random:uniform(10000),
Start = erlang:now(),
riak_kv_keys_fsm:start(ReqId, Bucket, 60000, plain, 0.0001, self()),
{ok, Count} = gather_fsm_results(ReqId, 0),
io:format("~n"),
End = erlang:now(),
Ms = erlang:round(timer:now_diff(End, Start) / 1000),
io:format("Found ~p keys in ~pms.~n", [Count, Ms]).

pn(Bucket) ->
ReqId = random:uniform(10000),
{ok, Ring} = riak_core_ring_manager:get_my_ring(),
{ok, Bloom} = ebloom:new(10000000,0.0001,ReqId),
BucketProps = riak_core_bucket:get_bucket(Bucket, Ring),
N = proplists:get_value(n_val,BucketProps),
PLS = lists:flatten(riak_core_ring:all_preflists(Ring,N)),
Nodes = [node()|nodes()],
Start = erlang:now(),
start_listers(Nodes, ReqId, Bucket, PLS),
{ok, Count} = gather_pn_results(ReqId, Bloom, length(Nodes), 0),
End = erlang:now(),
Ms = erlang:round(timer:now_diff(End, Start) / 1000),
io:format("Found ~p keys in ~pms.~n", [Count, Ms]).

gather_fsm_results(ReqId, Count) ->
receive
{ReqId, {keys, Keys}} ->
io:format("."),
gather_fsm_results(ReqId, Count + length(Keys));
{ReqId, done} ->
{ok, Count}
after 120000 ->
{error, timeout}
end.

start_listers([], _ReqId, _Bucket, _VNodes) ->
ok;
start_listers([H|T], ReqId, Bucket, VNodes) ->
riak_kv_keylister_master:start_keylist(H, ReqId, self(), Bucket, VNodes, 60000),
start_listers(T, ReqId, Bucket, VNodes).

gather_pn_results(_, BF, 0, Count) ->
ebloom:clear(BF),
{ok, Count};
gather_pn_results(ReqId, BF, NodeCount, Count) ->
%%io:format("NodeCount: ~p, key count: ~p~n", [NodeCount, Count]),
receive
{ReqId, {kl, Keys0}} ->
F = fun(Key, Acc) ->
case ebloom:contains(BF, Key) of
false ->
ebloom:insert(BF, Key),
[Key|Acc];
true ->
Acc
end end,
Keys = lists:foldl(F, [], Keys0),
gather_pn_results(ReqId, BF, NodeCount, Count + length(Keys));
{ReqId, done} ->
gather_pn_results(ReqId, BF, NodeCount - 1, Count)
after 10000 ->
{error, timeout}
end.
15 changes: 10 additions & 5 deletions src/raw_link_walker.erl
@@ -1,19 +1,24 @@
%% -------------------------------------------------------------------
%%
%% raw_link_walker: Backwards compatibility module for link traversal
%%
%% Copyright (c) 2007-2010 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at

%%
%% http://www.apache.org/licenses/LICENSE-2.0

%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.

%% Copyright (c) 2007-2010 Basho Technologies, Inc. All Rights Reserved.

%%
%% -------------------------------------------------------------------
-module(raw_link_walker).

-export([mapreduce_linkfun/3]).
Expand Down
19 changes: 16 additions & 3 deletions src/riak_client.erl
Expand Up @@ -29,6 +29,7 @@
-export([mapred_stream/2,mapred_stream/3,mapred_stream/4]).
-export([mapred_bucket/2,mapred_bucket/3,mapred_bucket/4]).
-export([mapred_bucket_stream/3,mapred_bucket_stream/4,mapred_bucket_stream/6]).
-export([mapred_dynamic_inputs_stream/3]).
-export([get/2, get/3,get/4]).
-export([put/1, put/2,put/3,put/4,put/5]).
-export([delete/2,delete/3,delete/4]).
Expand Down Expand Up @@ -163,9 +164,21 @@ mapred_bucket(Bucket, Query, ResultTransformer, Timeout, ErrorTolerance) ->
ResultTransformer, Timeout, ErrorTolerance),
luke_flow:collect_output(MR_ReqId, Timeout).

%%

%%
-define(PRINT(Var), io:format("DEBUG: ~p:~p - ~p~n~n ~p~n~n", [?MODULE, ?LINE, ??Var, Var])).

%% An InputDef defines a Module and Function to call to generate
%% inputs for a map/reduce job. Should return {ok,
%% LukeReqID}. Ideally, we'd combine both the other input types (BKeys
%% and Bucket) into this approach, but postponing until after a code
%% review of Map/Reduce.
mapred_dynamic_inputs_stream(FSMPid, InputDef, Timeout) ->
case InputDef of
{modfun, Mod, Fun, Options} ->
Mod:Fun(FSMPid, Options, Timeout);
_ ->
throw({invalid_inputdef, InputDef})
end.

%% @spec get(riak_object:bucket(), riak_object:key()) ->
%% {ok, riak_object:riak_object()} |
Expand Down Expand Up @@ -431,7 +444,7 @@ get_bucket(BucketName) ->
%% @spec reload_all(Module :: atom()) -> term()
%% @doc Force all Riak nodes to reload Module.
%% This is used when loading new modules for map/reduce functionality.
reload_all(Module) -> rpc:call(Node,riak_kv_util,reload_all,[Module]).
reload_all(Module) -> rpc:call(Node,riak_core_util,reload_all,[Module]).

%% @spec remove_from_cluster(ExitingNode :: atom()) -> term()
%% @doc Cause all partitions owned by ExitingNode to be taken over
Expand Down
2 changes: 1 addition & 1 deletion src/riak_kv_app.erl
Expand Up @@ -74,7 +74,7 @@ start(_Type, _StartArgs) ->
case riak_kv_sup:start_link() of
{ok, Pid} ->
%% Go ahead and mark the riak_kv service as up in the node watcher.
%% The riak_kv_ring_handler blocks until all vnodes have been started
%% The riak_core_ring_handler blocks until all vnodes have been started
%% synchronously.
riak_core:register_vnode_module(riak_kv_vnode),
riak_core_node_watcher:service_up(riak_kv, self()),
Expand Down
22 changes: 22 additions & 0 deletions src/riak_kv_backend.erl
@@ -1,3 +1,25 @@
%% -------------------------------------------------------------------
%%
%% riak_kv_backend: Riak backend behaviour
%%
%% Copyright (c) 2007-2010 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------

-module(riak_kv_backend).
-export([behaviour_info/1]).
-export([callback_after/3]).
Expand Down
59 changes: 44 additions & 15 deletions src/riak_kv_bitcask_backend.erl
Expand Up @@ -33,15 +33,17 @@
list/1,
list_bucket/2,
fold/3,
fold_keys/3,
drop/1,
is_empty/1,
callback/3]).


-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-endif.

-include_lib("bitcask/include/bitcask.hrl").

-define(MERGE_CHECK_INTERVAL, timer:minutes(3)).

start(Partition, _Config) ->
Expand Down Expand Up @@ -115,15 +117,38 @@ list({Ref, _}) ->
Other
end.

list_bucket(State, {filter, Bucket, Fun}) ->
[K || {B, K} <- ?MODULE:list(State),
B =:= Bucket,
Fun(K)];
list_bucket(State, '_') ->
[B || {B, _K} <- ?MODULE:list(State)];
list_bucket(State, Bucket) ->
[K || {B, K} <- ?MODULE:list(State), B =:= Bucket].

list_bucket({Ref, _}, {filter, Bucket, Fun}) ->
bitcask:fold_keys(Ref,
fun(#bitcask_entry{key=BK},Acc) ->
{B,K} = binary_to_term(BK),
case B of
Bucket ->
case Fun(K) of
true -> [K|Acc];
false -> Acc
end;
_ ->
Acc
end
end, []);
list_bucket({Ref, _}, '_') ->
bitcask:fold_keys(Ref,
fun(#bitcask_entry{key=BK},Acc) ->
{B,_K} = binary_to_term(BK),
case lists:member(B,Acc) of
true -> Acc;
false -> [B|Acc]
end
end, []);
list_bucket({Ref, _}, Bucket) ->
bitcask:fold_keys(Ref,
fun(#bitcask_entry{key=BK},Acc) ->
{B,K} = binary_to_term(BK),
case B of
Bucket -> [K|Acc];
_ -> Acc
end
end, []).

fold({Ref, _}, Fun0, Acc0) ->
%% When folding across the bitcask, the bucket/key tuple must
Expand All @@ -135,6 +160,11 @@ fold({Ref, _}, Fun0, Acc0) ->
end,
Acc0).

fold_keys({Ref, _}, Fun, Acc) ->
F = fun(#bitcask_entry{key=K}, Acc1) ->
Fun(binary_to_term(K), Acc1) end,
bitcask:fold_keys(Ref, F, Acc).

drop({Ref, BitcaskRoot}) ->
%% todo: once bitcask has a more friendly drop function
%% of its own, use that instead.
Expand All @@ -146,12 +176,12 @@ drop({Ref, BitcaskRoot}) ->

is_empty({Ref, _}) ->
%% Determining if a bitcask is empty requires us to find at least
%% one value that is NOT a tombstone. Accomplish this by doing a fold
%% that forcibly bails on the very first k/v encountered.
F = fun(_K, _V, _Acc0) ->
%% one value that is NOT a tombstone. Accomplish this by doing a fold_keys
%% that forcibly bails on the very first key encountered.
F = fun(_K, _Acc0) ->
throw(found_one_value)
end,
case catch(bitcask:fold(Ref, F, undefined)) of
case catch(bitcask:fold_keys(Ref, F, undefined)) of
found_one_value ->
false;
_ ->
Expand Down Expand Up @@ -200,7 +230,6 @@ schedule_sync(Ref, SyncIntervalMs) when is_reference(Ref) ->
schedule_merge(Ref) when is_reference(Ref) ->
riak_kv_backend:callback_after(?MERGE_CHECK_INTERVAL, Ref, merge_check).


%% ===================================================================
%% EUnit tests
%% ===================================================================
Expand Down

0 comments on commit b0bc41f

Please sign in to comment.