Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

count() now doesn't return tombstones

  • Loading branch information...
commit 90665d2b432eb83cd7eecd1b6afe5f13f89870aa 1 parent e1efa8e
@pavlobaron authored
Showing with 32 additions and 13 deletions.
  1. +6 −2 src/riak_mongo_message.erl
  2. +26 −11 src/riak_mongo_riak.erl
View
8 src/riak_mongo_message.erl
@@ -149,6 +149,9 @@ you(#worker_state{sock=Sock}) ->
{ok, {{A, B, C, D}, P}} = inet:peername(Sock), %IPv6???
io_lib:format("~p.~p.~p.~p:~p", [A, B, C, D, P]).
+clean_ok([ok|L]) -> clean_ok(L);
+clean_ok(L) -> L.
+
db_command(?ADM, <<"whatsmyuri">>, _Collection, _Options, State) ->
{ok, [{you, {utf8, you(State)}}, {ok, 1}], State};
@@ -157,11 +160,12 @@ db_command(?ADM, <<"replSetGetStatus">>, _Collection, _Options, State) ->
{ok, [{ok, false}], State};
db_command(_DataBase, <<"getlasterror">>, _Collection, _Options, State) ->
- case State#worker_state.lastError of
+ E = clean_ok(State#worker_state.lastError),
+ case E of
[] ->
{ok, [{ok,true}], State#worker_state{lastError=[]}};
MSG ->
- {ok, [{err, io:format("~p", MSG)}], State#worker_state{lastError=[]}}
+ {ok, [{err, io:format("~p", [MSG])}], State#worker_state{lastError=[]}}
end;
db_command(DataBase, ?DROP, Collection, _Options, State) ->
View
37 src/riak_mongo_riak.erl
@@ -191,7 +191,6 @@ delete(#mongo_delete{dbcoll=Bucket, selector=Selector, singleremove=SingleRemove
{ok, Errors}
= riak_kv_mrc_pipe:mapred(Bucket,
[{map, {qfun, fun map_query/3}, {CompiledQuery, Project}, true}]),
-
State#worker_state{ lastError=Errors };
false when SingleRemove ->
@@ -215,16 +214,34 @@ delete(#mongo_delete{dbcoll=Bucket, selector=Selector, singleremove=SingleRemove
end.
count(Bucket, State) ->
- {ok, C} = riak:local_client(),
- {ok, [Count]} = C:mapred(
- Bucket,
- [riak_kv_mapreduce:reduce_count_inputs(true)]
- ),
- Doc = [{n, Count}],
+ Doc = case riak_kv_mrc_pipe:mapred(Bucket,
+ [{map, {qfun, fun map_drop_tombstones/3},
+ none,
+ true},
+ {reduce, {qfun, fun reduce_count/2},
+ none, true}]) of
+ {ok, [[Count]]} -> [{n, Count}];
+ {ok, [_, [Count]]} -> [{n, Count}]
+ end,
{ok, Doc, State}.
%% internals
+reduce_count(Results, _) ->
+ [lists:foldl(fun input_counter_fold/2, 0, Results)].
+
+input_counter_fold(PrevCount, Acc) when is_integer(PrevCount) ->
+ PrevCount + Acc;
+input_counter_fold(_, Acc) ->
+ 1 + Acc.
+
+map_drop_tombstones(RiakObject, _KeyData, _) ->
+ Acc = [],
+ case riak_kv_util:is_x_deleted(RiakObject) of
+ true -> Acc;
+ false -> [1]
+ end.
+
cursor_add(PID, #worker_state{ cursors=Dict, cursor_next=ID }=State) ->
MRef = erlang:monitor(process, PID),
{ok, ID, State#worker_state{ cursors=dict:store(ID,{MRef,PID},Dict), cursor_next=ID+1 }}.
@@ -393,8 +410,7 @@ map_query(RiakObject, _KeyData, {CompiledQuery, Project}) ->
case riak_to_bson_object(RiakObject) of
{ok, Document} ->
do_mongo_match(RiakObject, Document, CompiledQuery, Project, Acc);
- _ ->
- Acc
+ _ -> Acc
end.
do_mongo_match(RiakObject,Document,CompiledQuery,Project,Acc) ->
@@ -402,8 +418,7 @@ do_mongo_match(RiakObject,Document,CompiledQuery,Project,Acc) ->
CompiledQuery) of
true ->
[Project(RiakObject, Document)|Acc];
- false ->
- Acc
+ false -> Acc
end.
compute_projection_fun(Projection) ->
Please sign in to comment.
Something went wrong with that request. Please try again.