Skip to content

Commit

Permalink
merged with drkrab
Browse files Browse the repository at this point in the history
  • Loading branch information
pavlobaron committed Apr 21, 2012
1 parent 706c5b8 commit d2944eb
Showing 1 changed file with 196 additions and 34 deletions.
230 changes: 196 additions & 34 deletions src/riak_mongo_riak.erl
Expand Up @@ -24,11 +24,17 @@


-module(riak_mongo_riak). -module(riak_mongo_riak).


-include_lib("riak_pipe/include/riak_pipe.hrl").
-include("riak_mongo_bson.hrl"). -include("riak_mongo_bson.hrl").
-include("riak_mongo_protocol.hrl"). -include("riak_mongo_protocol.hrl").
-include("riak_mongo_state.hrl"). -include("riak_mongo_state.hrl").


-export([insert/2, find/1, delete/2, update/2]). -compile([{parse_transform, lager_transform}]).

-export([insert/2, find/2, getmore/2, delete/2, update/2]).

-define(DEFAULT_TIMEOUT, 60000).
-define(DEFAULT_FIND_SIZE, 101).


insert(#mongo_insert{dbcoll=Bucket, documents=Docs, continueonerror=ContinueOnError}, State) -> insert(#mongo_insert{dbcoll=Bucket, documents=Docs, continueonerror=ContinueOnError}, State) ->


Expand All @@ -53,7 +59,39 @@ insert(#mongo_insert{dbcoll=Bucket, documents=Docs, continueonerror=ContinueOnEr


State#worker_state{ lastError=Errors }. State#worker_state{ lastError=Errors }.


find(#mongo_query{dbcoll=Bucket, selector=Selector, projector=Projection, batchsize=BatchSize }) ->
getmore(#mongo_getmore{ cursorid=CursorID, batchsize=BatchSize }, #worker_state{ cursors=Dict } = State) ->

case dict:find(CursorID, Dict) of
{ok, {_, CursorPID}} ->

case cursor_get_results(CursorPID, BatchSize) of

{more, StartingFrom, Documents} ->
{ok,
#mongo_reply{ startingfrom = StartingFrom,
cursorid = CursorID,
documents = Documents },
State};

{done, StartingFrom, Documents} ->
{ok,
#mongo_reply{ startingfrom = StartingFrom,
cursorid = 0,
documents = Documents },
cursor_remove(CursorID, State) }
end;

error ->
{ok, #mongo_reply{ cursornotfound=true, documents=[] }, State}
end.


find_reply(Documents,State) ->
{ok, #mongo_reply{ documents=Documents, queryerror=false }, State}.

find(#mongo_query{dbcoll=Bucket, selector=Selector, projector=Projection, batchsize=BatchSize,
nocursortimeout=NoTimeout, tailablecursor=_Tailable }, State) ->


Project = compute_projection_fun(Projection), Project = compute_projection_fun(Projection),


Expand All @@ -64,34 +102,160 @@ find(#mongo_query{dbcoll=Bucket, selector=Selector, projector=Projection, batchs
case C:get(Bucket, RiakKey) of case C:get(Bucket, RiakKey) of
{ok, RiakObject} -> {ok, RiakObject} ->
case riak_to_bson_object(RiakObject) of case riak_to_bson_object(RiakObject) of
{ok, Document} -> [Project(Document)]; {ok, Document} ->
_ -> [] Result = [Project(RiakObject, Document)],
find_reply(Result, State);
_ ->
find_reply([], State)
end; end;
_ -> [] _ ->
find_reply([], State)
end; end;


false -> false ->

CompiledQuery = riak_mongo_query:compile(Selector), CompiledQuery = riak_mongo_query:compile(Selector),


%% TODO: Server side does not know the LIMIT
if
BatchSize == 0 ->
Batch = ?DEFAULT_FIND_SIZE;
BatchSize == -1 ->
Batch = 1;
true ->
Batch = BatchSize
end,

error_logger:info_msg("Find executed ~p, ~p, ~p~n", [Projection, CompiledQuery, Project]), error_logger:info_msg("Find executed ~p, ~p, ~p~n", [Projection, CompiledQuery, Project]),


{ok, Documents} Owner = self(),
= riak_kv_mrc_pipe:mapred(Bucket, CursorPID =
[{map, {qfun, fun map_query/3}, {CompiledQuery, Project}, true}]), proc_lib:spawn(fun() ->
cursor_init(Owner, Bucket, CompiledQuery, Project,
NoTimeout)
end),

case cursor_get_results(CursorPID, Batch) of
{more, StartingFrom, Documents} ->

if BatchSize == -1 ->
CursorPID ! die,
{ok,
#mongo_reply{ startingfrom = StartingFrom,
documents = Documents },
State};

true ->
{ok, CursorID, State2} = cursor_add(CursorPID, State),
{ok,
#mongo_reply{ startingfrom = StartingFrom,
cursorid = CursorID,
documents = Documents },
State2}
end;


%% TODO: dig deeper here to find out if it's possible to limit the {done, StartingFrom, Documents} ->
%% number of returned docs during mapred, not afterwards. {ok,
%% TODO2: Find a way to handle cursors ... the elements removed by "limit" #mongo_reply{ startingfrom = StartingFrom,
%% should be held by a cursor documents = Documents },
case BatchSize /= 0 of State}
true ->
error_logger:info_msg("Limiting result set to ~p docs~n", [BatchSize]),
limit_docs(Documents, abs(BatchSize), 0);
false -> Documents
end end
end. end.


cursor_add(PID, #worker_state{ cursors=Dict, cursor_next=ID }=State) ->
MRef = erlang:monitor(process, PID),
{ok, ID, State#worker_state{ cursors=dict:store(ID,{MRef,PID},Dict), cursor_next=ID+1 }}.

cursor_remove(CursorID, #worker_state{ cursors=Dict }=State) ->
{MRef,_PID} = dict:fetch(CursorID, Dict),
erlang:demonitor(MRef, [flush]),
State#worker_state{ cursors=dict:erase(CursorID, Dict) }.

cursor_init(Owner, Bucket, CompiledQuery, Project, NoTimeout) ->

TimeOut = case NoTimeout of
true -> infinity;
false -> ?DEFAULT_TIMEOUT
end,

OwnerRef = erlang:monitor(process, Owner),

{{ok, Pipe}, _} =
riak_kv_mrc_pipe:mapred_stream([{map, {qfun, fun map_query/3}, {CompiledQuery, Project}, true}]),

case riak_kv_mrc_pipe:send_inputs(Pipe, Bucket, TimeOut) of
ok ->
collect_outputs(OwnerRef, Pipe, TimeOut);
Error ->
lager:error("pipe:send_inputs faild ~p~n", [Error]),
riak_pipe:eoi(Pipe),
collect_outputs(OwnerRef, Pipe, TimeOut)
end.


collect_outputs(OwnerRef, Pipe, Timeout) ->
cursor_main_loop(OwnerRef, Pipe, queue:new(), Timeout, 0, 0, more).


cursor_main_loop(OwnerRef, #pipe{sink=#fitting{ref=FittingRef}} = Pipe, ResultQueue, Timeout, Sent, N, State) ->

receive
#pipe_result{ref=FittingRef, result=Result} ->
cursor_main_loop(OwnerRef, Pipe, queue:in(Result, ResultQueue), Timeout, Sent, N+1, State);
#pipe_log{ref=FittingRef, msg=Msg} ->
lager:info("riak_mongo: ~s~n", [Msg]),
cursor_main_loop(OwnerRef, Pipe, ResultQueue, Timeout, Sent, N, State);
#pipe_eoi{ref=FittingRef} ->
cursor_main_loop(OwnerRef, Pipe, ResultQueue, Timeout, Sent, N, done);

{'DOWN', OwnerRef, _, _, _} ->
%% worker died
riak_pipe:destroy(Pipe),
ok;

die ->
riak_pipe:destroy(Pipe),
ok;

{next, {PID, ReplyRef}, NUM} when N >= NUM ->
{Q1,Q2} = queue:split(min(NUM,N), ResultQueue),
case State of
more ->
PID ! {more, ReplyRef, Sent, queue:to_list(Q1)},
cursor_main_loop(OwnerRef, Pipe, Q2, Timeout, Sent + NUM, N-NUM, done);
done ->
PID ! {done, ReplyRef, Sent, queue:to_list(Q1)},
ok
end;

{next, {PID, ReplyRef}, _} when State =:= done ->
PID ! {done, ReplyRef, Sent, queue:to_list(ResultQueue)},
ok;

MSG when tuple_size(MSG) =/= 3, element(1,MSG) =/= next ->
error_logger:info_msg("cursor_main_loop.6 ~p~n", [MSG]),
ok


after Timeout ->
cursor_main_loop(OwnerRef, Pipe, ResultQueue, infinity, Sent, N, done)

end.

cursor_get_results(CursorPID, HowMany) ->
Ref = erlang:monitor(process, CursorPID),
CursorPID ! {next, {self(), Ref}, HowMany},
receive
{more, Ref, StartingFrom, Documents} ->
erlang:demonitor(Ref, [flush]),
{more, StartingFrom, Documents};
{done, Ref, StartingFrom, Documents} ->
erlang:demonitor(Ref, [flush]),
{done, StartingFrom, Documents};
{'DOWN', Ref, _, _, Reason} ->
{error, Reason}
end.


update(#mongo_update{dbcoll=Bucket, selector=Selector, updater=Updater, rawupdater=RawUpdater, update(#mongo_update{dbcoll=Bucket, selector=Selector, updater=Updater, rawupdater=RawUpdater,
multiupdate=MultiUpdate, upsert=Upsert}, State) -> multiupdate=MultiUpdate, upsert=Upsert}, State) ->


Expand Down Expand Up @@ -131,12 +295,11 @@ delete(#mongo_delete{dbcoll=Bucket, selector=Selector, singleremove=SingleRemove


false when not SingleRemove -> false when not SingleRemove ->


Project = fun({struct, Elms}) -> Project = fun(RiakObject, _) ->
{<<"_id">>, ID} = lists:keyfind(<<"_id">>, 1, Elms),
{ok, C} = riak:local_client(), {ok, C} = riak:local_client(),
case C:delete(Bucket, bson_to_riak_key(ID)) of case C:delete(Bucket, riak_object:key(RiakObject)) of
ok -> []; ok -> ok;
Err -> [Err] Err -> Err
end end
end, end,


Expand All @@ -151,7 +314,7 @@ delete(#mongo_delete{dbcoll=Bucket, selector=Selector, singleremove=SingleRemove


false when SingleRemove -> false when SingleRemove ->


Project = fun(Doc) -> Doc end, Project = fun(RiakObject, _) -> riak_object:key(RiakObject) end,
CompiledQuery = riak_mongo_query:compile(Selector), CompiledQuery = riak_mongo_query:compile(Selector),


case riak_kv_mrc_pipe:mapred(Bucket, case riak_kv_mrc_pipe:mapred(Bucket,
Expand All @@ -160,10 +323,9 @@ delete(#mongo_delete{dbcoll=Bucket, selector=Selector, singleremove=SingleRemove
{ok, []} -> {ok, []} ->
State; State;


{ok, [{struct, Elms}|_]} -> {ok, [RiakKey|_]} ->
{<<"_id">>, ID} = lists:keyfind(<<"_id">>, 1, Elms),
{ok, C} = riak:local_client(), {ok, C} = riak:local_client(),
case C:delete(Bucket, bson_to_riak_key(ID)) of case C:delete(Bucket, RiakKey) of
ok -> State; ok -> State;
Err -> State#worker_state{ lastError=Err } Err -> State#worker_state{ lastError=Err }
end end
Expand Down Expand Up @@ -225,32 +387,32 @@ riak_to_bson_object(Object) ->
none none
end. end.


map_query(Object, _KeyData, {CompiledQuery, Project}) -> map_query(RiakObject, _KeyData, {CompiledQuery, Project}) ->
Acc = [], Acc = [],
case riak_to_bson_object(Object) of case riak_to_bson_object(RiakObject) of
{ok, Document} -> {ok, Document} ->
do_mongo_match(Document, CompiledQuery, Project, Acc); do_mongo_match(RiakObject, Document, CompiledQuery, Project, Acc);
_ -> _ ->
Acc Acc
end. end.


do_mongo_match(Document,CompiledQuery,Project,Acc) -> do_mongo_match(RiakObject,Document,CompiledQuery,Project,Acc) ->
case riak_mongo_query:matches(Document, case riak_mongo_query:matches(Document,
CompiledQuery) of CompiledQuery) of
true -> true ->
[Project(Document)|Acc]; [Project(RiakObject, Document)|Acc];
false -> false ->
Acc Acc
end. end.


compute_projection_fun(Projection) -> compute_projection_fun(Projection) ->
case Projection of case Projection of
[] -> [] ->
fun(O) -> O end; fun(_RiakObject, O) -> O end;


List when is_list(List) -> List when is_list(List) ->
SelectedKeys = get_projection_keys(Projection, []), SelectedKeys = get_projection_keys(Projection, []),
fun({struct, Elems}) -> fun(_RiakObject, {struct, Elems}) ->
{struct, {struct,
lists:foldl(fun(Key,Acc) -> lists:foldl(fun(Key,Acc) ->
case lists:keyfind(Key, 1, Elems) of case lists:keyfind(Key, 1, Elems) of
Expand Down

0 comments on commit d2944eb

Please sign in to comment.