From d2944eb40a626d1417d3092a06cd75f965ed0e73 Mon Sep 17 00:00:00 2001 From: pavlobaron Date: Sun, 22 Apr 2012 01:31:46 +0200 Subject: [PATCH] merged with drkrab --- src/riak_mongo_riak.erl | 230 ++++++++++++++++++++++++++++++++++------ 1 file changed, 196 insertions(+), 34 deletions(-) diff --git a/src/riak_mongo_riak.erl b/src/riak_mongo_riak.erl index a820b5f..ebf32f0 100644 --- a/src/riak_mongo_riak.erl +++ b/src/riak_mongo_riak.erl @@ -24,11 +24,17 @@ -module(riak_mongo_riak). +-include_lib("riak_pipe/include/riak_pipe.hrl"). -include("riak_mongo_bson.hrl"). -include("riak_mongo_protocol.hrl"). -include("riak_mongo_state.hrl"). --export([insert/2, find/1, delete/2, update/2]). +-compile([{parse_transform, lager_transform}]). + +-export([insert/2, find/2, getmore/2, delete/2, update/2]). + +-define(DEFAULT_TIMEOUT, 60000). +-define(DEFAULT_FIND_SIZE, 101). insert(#mongo_insert{dbcoll=Bucket, documents=Docs, continueonerror=ContinueOnError}, State) -> @@ -53,7 +59,39 @@ insert(#mongo_insert{dbcoll=Bucket, documents=Docs, continueonerror=ContinueOnEr State#worker_state{ lastError=Errors }. -find(#mongo_query{dbcoll=Bucket, selector=Selector, projector=Projection, batchsize=BatchSize }) -> + +getmore(#mongo_getmore{ cursorid=CursorID, batchsize=BatchSize }, #worker_state{ cursors=Dict } = State) -> + + case dict:find(CursorID, Dict) of + {ok, {_, CursorPID}} -> + + case cursor_get_results(CursorPID, BatchSize) of + + {more, StartingFrom, Documents} -> + {ok, + #mongo_reply{ startingfrom = StartingFrom, + cursorid = CursorID, + documents = Documents }, + State}; + + {done, StartingFrom, Documents} -> + {ok, + #mongo_reply{ startingfrom = StartingFrom, + cursorid = 0, + documents = Documents }, + cursor_remove(CursorID, State) } + end; + + error -> + {ok, #mongo_reply{ cursornotfound=true, documents=[] }, State} + end. + + +find_reply(Documents,State) -> + {ok, #mongo_reply{ documents=Documents, queryerror=false }, State}. 
+ +find(#mongo_query{dbcoll=Bucket, selector=Selector, projector=Projection, batchsize=BatchSize, + nocursortimeout=NoTimeout, tailablecursor=_Tailable }, State) -> Project = compute_projection_fun(Projection), @@ -64,34 +102,160 @@ find(#mongo_query{dbcoll=Bucket, selector=Selector, projector=Projection, batchs case C:get(Bucket, RiakKey) of {ok, RiakObject} -> case riak_to_bson_object(RiakObject) of - {ok, Document} -> [Project(Document)]; - _ -> [] + {ok, Document} -> + Result = [Project(RiakObject, Document)], + find_reply(Result, State); + _ -> + find_reply([], State) end; - _ -> [] + _ -> + find_reply([], State) end; false -> - CompiledQuery = riak_mongo_query:compile(Selector), + %% TODO: Server side does not know the LIMIT + if + BatchSize == 0 -> + Batch = ?DEFAULT_FIND_SIZE; + BatchSize == -1 -> + Batch = 1; + true -> + Batch = BatchSize + end, + error_logger:info_msg("Find executed ~p, ~p, ~p~n", [Projection, CompiledQuery, Project]), - {ok, Documents} - = riak_kv_mrc_pipe:mapred(Bucket, - [{map, {qfun, fun map_query/3}, {CompiledQuery, Project}, true}]), + Owner = self(), + CursorPID = + proc_lib:spawn(fun() -> + cursor_init(Owner, Bucket, CompiledQuery, Project, + NoTimeout) + end), + + case cursor_get_results(CursorPID, Batch) of + {more, StartingFrom, Documents} -> + + if BatchSize == -1 -> + CursorPID ! die, + {ok, + #mongo_reply{ startingfrom = StartingFrom, + documents = Documents }, + State}; + + true -> + {ok, CursorID, State2} = cursor_add(CursorPID, State), + {ok, + #mongo_reply{ startingfrom = StartingFrom, + cursorid = CursorID, + documents = Documents }, + State2} + end; - %% TODO: dig deeper here to find out if it's possible to limit the - %% number of returned docs during mapred, not afterwards. - %% TODO2: Find a way to handle cursors ... 
the elements removed by "limit" - %% should be held by a cursor - case BatchSize /= 0 of - true -> - error_logger:info_msg("Limiting result set to ~p docs~n", [BatchSize]), - limit_docs(Documents, abs(BatchSize), 0); - false -> Documents + {done, StartingFrom, Documents} -> + {ok, + #mongo_reply{ startingfrom = StartingFrom, + documents = Documents }, + State} end end. +cursor_add(PID, #worker_state{ cursors=Dict, cursor_next=ID }=State) -> + MRef = erlang:monitor(process, PID), + {ok, ID, State#worker_state{ cursors=dict:store(ID,{MRef,PID},Dict), cursor_next=ID+1 }}. + +cursor_remove(CursorID, #worker_state{ cursors=Dict }=State) -> + {MRef,_PID} = dict:fetch(CursorID, Dict), + erlang:demonitor(MRef, [flush]), + State#worker_state{ cursors=dict:erase(CursorID, Dict) }. + +cursor_init(Owner, Bucket, CompiledQuery, Project, NoTimeout) -> + + TimeOut = case NoTimeout of + true -> infinity; + false -> ?DEFAULT_TIMEOUT + end, + + OwnerRef = erlang:monitor(process, Owner), + + {{ok, Pipe}, _} = + riak_kv_mrc_pipe:mapred_stream([{map, {qfun, fun map_query/3}, {CompiledQuery, Project}, true}]), + + case riak_kv_mrc_pipe:send_inputs(Pipe, Bucket, TimeOut) of + ok -> + collect_outputs(OwnerRef, Pipe, TimeOut); + Error -> + lager:error("pipe:send_inputs faild ~p~n", [Error]), + riak_pipe:eoi(Pipe), + collect_outputs(OwnerRef, Pipe, TimeOut) + end. + + +collect_outputs(OwnerRef, Pipe, Timeout) -> + cursor_main_loop(OwnerRef, Pipe, queue:new(), Timeout, 0, 0, more). 
%% @doc Receive loop of a cursor process.
%%
%% Buffers the results streamed by the map/reduce pipe and hands them out
%% in batches when asked to.
%%
%% Arguments:
%%   OwnerRef    - monitor reference; a matching 'DOWN' message destroys
%%                 the pipe and ends the cursor.
%%   Pipe        - #pipe{} record; the reference of its sink fitting tags
%%                 the #pipe_result{}, #pipe_log{} and #pipe_eoi{} messages
%%                 this loop accepts.
%%   ResultQueue - queue of results received but not yet handed out.
%%   Timeout     - `receive' timeout (milliseconds or `infinity'); when it
%%                 fires the cursor is treated as having no more input.
%%   Sent        - number of results handed out so far; reported to the
%%                 requester as the offset of each batch.
%%   N           - number of results currently held in ResultQueue.
%%   State       - `more' while the pipe may still deliver results, `done'
%%                 after end-of-input (or after the timeout fired).
%%
%% Request protocol: `{next, {Pid, ReplyRef}, Num}' is answered with
%% `{more, ReplyRef, Sent, Docs}' or `{done, ReplyRef, Sent, Docs}'.
%% A `done' answer is the cursor's last action. While State is `more', a
%% request for more results than are buffered matches no clause and stays
%% in the mailbox until enough results arrive or State becomes `done'.
%%
%% Returns `ok' when the cursor terminates.
cursor_main_loop(OwnerRef, #pipe{sink=#fitting{ref=FittingRef}} = Pipe,
                 ResultQueue, Timeout, Sent, N, State) ->
    receive
        #pipe_result{ref=FittingRef, result=Result} ->
            cursor_main_loop(OwnerRef, Pipe, queue:in(Result, ResultQueue),
                             Timeout, Sent, N + 1, State);

        #pipe_log{ref=FittingRef, msg=Msg} ->
            lager:info("riak_mongo: ~s~n", [Msg]),
            cursor_main_loop(OwnerRef, Pipe, ResultQueue, Timeout, Sent, N, State);

        #pipe_eoi{ref=FittingRef} ->
            cursor_main_loop(OwnerRef, Pipe, ResultQueue, Timeout, Sent, N, done);

        {'DOWN', OwnerRef, _, _, _} ->
            %% the monitored owner died: nobody will ask for results anymore
            riak_pipe:destroy(Pipe),
            ok;

        die ->
            riak_pipe:destroy(Pipe),
            ok;

        {next, {PID, ReplyRef}, NUM} when N >= NUM ->
            {Batch, Rest} = queue:split(NUM, ResultQueue),
            case State =:= done andalso queue:is_empty(Rest) of
                true ->
                    %% input is finished and this batch drains the buffer
                    PID ! {done, ReplyRef, Sent, queue:to_list(Batch)},
                    ok;
                false ->
                    %% Either the pipe may still deliver results or some are
                    %% still buffered: keep the cursor alive, keep the
                    %% current State, and keep the remaining results.
                    PID ! {more, ReplyRef, Sent, queue:to_list(Batch)},
                    cursor_main_loop(OwnerRef, Pipe, Rest, Timeout,
                                     Sent + NUM, N - NUM, State)
            end;

        {next, {PID, ReplyRef}, _} when State =:= done ->
            %% fewer results buffered than requested and no more to come
            PID ! {done, ReplyRef, Sent, queue:to_list(ResultQueue)},
            ok;

        %% Drain anything that is not a `next' request. Pending `next'
        %% requests must stay in the mailbox (see the protocol note above).
        MSG when not (is_tuple(MSG)
                      andalso tuple_size(MSG) =:= 3
                      andalso element(1, MSG) =:= next) ->
            error_logger:info_msg("cursor_main_loop.6 ~p~n", [MSG]),
            cursor_main_loop(OwnerRef, Pipe, ResultQueue, Timeout, Sent, N, State)

    after Timeout ->
            %% NOTE(review): the pipe is not destroyed here, so results that
            %% arrive later are still buffered - confirm this is intended.
            cursor_main_loop(OwnerRef, Pipe, ResultQueue, infinity, Sent, N, done)
    end.

%% @doc Ask the cursor process `CursorPID' for up to `HowMany' results and
%% wait for its answer.
%%
%% The cursor is monitored for the duration of the call; the monitor
%% reference doubles as the tag that correlates request and reply.
%%
%% Returns `{more, StartingFrom, Documents}' or
%% `{done, StartingFrom, Documents}' as answered by the cursor, or
%% `{error, Reason}' when the cursor process goes down before answering.
%%
%% NOTE(review): find/2 and getmore/2 in this patch match only `more' and
%% `done', so an `{error, Reason}' result raises `case_clause' there.
cursor_get_results(CursorPID, HowMany) ->
    Ref = erlang:monitor(process, CursorPID),
    CursorPID ! {next, {self(), Ref}, HowMany},
    receive
        {more, Ref, StartingFrom, Documents} ->
            erlang:demonitor(Ref, [flush]),
            {more, StartingFrom, Documents};
        {done, Ref, StartingFrom, Documents} ->
            erlang:demonitor(Ref, [flush]),
            {done, StartingFrom, Documents};
        {'DOWN', Ref, _, _, Reason} ->
            {error, Reason}
    end.
+ + update(#mongo_update{dbcoll=Bucket, selector=Selector, updater=Updater, rawupdater=RawUpdater, multiupdate=MultiUpdate, upsert=Upsert}, State) -> @@ -131,12 +295,11 @@ delete(#mongo_delete{dbcoll=Bucket, selector=Selector, singleremove=SingleRemove false when not SingleRemove -> - Project = fun({struct, Elms}) -> - {<<"_id">>, ID} = lists:keyfind(<<"_id">>, 1, Elms), + Project = fun(RiakObject, _) -> {ok, C} = riak:local_client(), - case C:delete(Bucket, bson_to_riak_key(ID)) of - ok -> []; - Err -> [Err] + case C:delete(Bucket, riak_object:key(RiakObject)) of + ok -> ok; + Err -> Err end end, @@ -151,7 +314,7 @@ delete(#mongo_delete{dbcoll=Bucket, selector=Selector, singleremove=SingleRemove false when SingleRemove -> - Project = fun(Doc) -> Doc end, + Project = fun(RiakObject, _) -> riak_object:key(RiakObject) end, CompiledQuery = riak_mongo_query:compile(Selector), case riak_kv_mrc_pipe:mapred(Bucket, @@ -160,10 +323,9 @@ delete(#mongo_delete{dbcoll=Bucket, selector=Selector, singleremove=SingleRemove {ok, []} -> State; - {ok, [{struct, Elms}|_]} -> - {<<"_id">>, ID} = lists:keyfind(<<"_id">>, 1, Elms), + {ok, [RiakKey|_]} -> {ok, C} = riak:local_client(), - case C:delete(Bucket, bson_to_riak_key(ID)) of + case C:delete(Bucket, RiakKey) of ok -> State; Err -> State#worker_state{ lastError=Err } end @@ -225,20 +387,20 @@ riak_to_bson_object(Object) -> none end. -map_query(Object, _KeyData, {CompiledQuery, Project}) -> +map_query(RiakObject, _KeyData, {CompiledQuery, Project}) -> Acc = [], - case riak_to_bson_object(Object) of + case riak_to_bson_object(RiakObject) of {ok, Document} -> - do_mongo_match(Document, CompiledQuery, Project, Acc); + do_mongo_match(RiakObject, Document, CompiledQuery, Project, Acc); _ -> Acc end. 
-do_mongo_match(Document,CompiledQuery,Project,Acc) -> +do_mongo_match(RiakObject,Document,CompiledQuery,Project,Acc) -> case riak_mongo_query:matches(Document, CompiledQuery) of true -> - [Project(Document)|Acc]; + [Project(RiakObject, Document)|Acc]; false -> Acc end. @@ -246,11 +408,11 @@ do_mongo_match(Document,CompiledQuery,Project,Acc) -> compute_projection_fun(Projection) -> case Projection of [] -> - fun(O) -> O end; + fun(_RiakObject, O) -> O end; List when is_list(List) -> SelectedKeys = get_projection_keys(Projection, []), - fun({struct, Elems}) -> + fun(_RiakObject, {struct, Elems}) -> {struct, lists:foldl(fun(Key,Acc) -> case lists:keyfind(Key, 1, Elems) of