Implement cursors

A cursor is embodied in a process that
asynchronously receives the corresponding
map/reduce replies.

Right now, the cursor process brutally kills
the pipe if it has enough results.  This causes
riak_pipe to print "error" messages, which are
really just cleanup messages.
commit 0b4232a90adb04cd175790fb2a9153e66dcc4f15 (1 parent: 969387e)
Authored by Kresten Krab Thorup (krestenkrab)
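The flow the commit message describes, seen from the worker side: a find spawns a cursor process, the first batch is returned together with a cursor id, and subsequent OP_GET_MORE requests drain the rest in chunks. Below is a rough, hypothetical sketch of that round trip in terms of the find/2 and getmore/2 functions added in this commit; the cursor_demo module, the run_query/2 and drain/3 helpers, and the batch size of 100 are illustrative only, and it assumes #mongo_reply's cursorid field defaults to 0 when no cursor is open:

    -module(cursor_demo).
    %% Hypothetical sketch, not part of this commit: how a worker-side caller
    %% could use find/2 and getmore/2 to stream a full result set.
    -include("riak_mongo_protocol.hrl").
    -export([run_query/2]).

    run_query(#mongo_query{} = Query, State0) ->
        %% find/2 spawns a cursor process and returns the first batch; a
        %% non-zero cursorid means more batches are buffered or still streaming.
        {ok, #mongo_reply{} = Reply, State1} = riak_mongo_riak:find(Query, State0),
        drain(Reply, [], State1).

    %% Assumes #mongo_reply.cursorid defaults to 0 when no cursor is open.
    drain(#mongo_reply{cursorid = 0, documents = Docs}, Acc, State) ->
        {lists:append(lists:reverse([Docs | Acc])), State};
    drain(#mongo_reply{cursorid = CursorID, documents = Docs}, Acc, State0) ->
        %% getmore/2 asks the cursor process held in the worker state for more.
        GetMore = #mongo_getmore{cursorid = CursorID, batchsize = 100},
        {ok, Next, State1} = riak_mongo_riak:getmore(GetMore, State0),
        drain(Next, [Docs | Acc], State1).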
README.md (6 changed lines)
@@ -39,14 +39,12 @@ Here are some details of the mapping
- Keys -- translated to strings, `ObjectId("xx")` becomes the riak key `"OID:xx"`, similarly for UUID and MD5 values. String keys map to themselves. Other key types are currently not supported.
- Objects -- Stored as raw BSON using content-type `application/bson`
- Queries -- translated to map/reduce jobs that interpret the query across objects in a bucket.
+- Cursors -- When a query calls for a cursor, `riak_mongo` creates a process that holds on to the query results. These results are then simply held in the server, and fed back to the client in chunks.
All this is work in progress; at present, only the most basic stuff works. We're planning to also support ...
- - Cursors -- When a query calls for a cursor, `riak_mongo` creates a process that holds on to the query results. These results are then simply held in the server, and fed back to the client in chunks.
- Indexes -- Become 2i Riak indexes, always "_bin" indexes holding the `sext:encode` value for the corresponding BSON Erlang term. `riak_mongo` will likely only support ascending indexes.
- - Map/reduce -- MongoDB uses runCommand to do this. We will evaluate
- if it makes more sense to map it to the low level Riak Erlang API
- or to exexute JavaScript coming from the Mongo client
+- Map/reduce -- MongoDB uses runCommand to do this. We will evaluate if it makes more sense to map it to the low-level Riak Erlang API or to execute JavaScript coming from the Mongo client
## Authors
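To illustrate the Keys bullet above: special BSON key types are tagged into binary riak keys, while plain string keys pass through unchanged. A hypothetical sketch only; the riak_key/1 helper and the {objectid, Hex} term shape are illustrative and not the actual representation used by riak_mongo's BSON layer:

    %% Hypothetical sketch of the key mapping described in the README above.
    riak_key({objectid, Hex}) when is_binary(Hex) -> <<"OID:", Hex/binary>>;
    riak_key(Key) when is_binary(Key)             -> Key.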
include/riak_mongo_state.hrl (2 changed lines)
@@ -20,4 +20,4 @@
%% @doc State records
%% @copyright 2012 Pavlo Baron
--record(worker_state, {sock, request_id=0, rest, lastError=[]}).
+-record(worker_state, {sock, request_id=0, rest, lastError=[], cursors=dict:new(), cursor_next=1}).
rebar.config (4 changed lines)
@@ -10,7 +10,9 @@
{deps, [
{edown, ".*", {git, "git://github.com/esl/edown.git", "HEAD"}},
{bson, ".*", {git, "git@github.com:pavlobaron/bson-erlang.git", "HEAD"}},
- {riak_core, ".*", {git, "https://github.com/basho/riak_core.git", "HEAD"}}
+ {riak_core, ".*", {git, "https://github.com/basho/riak_core.git", "HEAD"}},
+ {riak_pipe, ".*", {git, "https://github.com/basho/riak_pipe.git", "HEAD"}},
+ {lager, ".*", {git, "https://github.com/basho/lager.git", "HEAD"}}
]}.
{edoc_opts, [{doclet, edown_doclet}]}.
{sub_dirs, []}.
src/riak_mongo_message.erl (24 changed lines)
@@ -75,8 +75,28 @@ process_message(#mongo_query{ db=DataBase, coll=?CMD,
process_message(#mongo_query{}=Message, State) ->
- Result = riak_mongo_riak:find(Message),
- {reply, #mongo_reply{ documents=Result, queryerror=false }, State};
+ {ok, Reply, State2} = riak_mongo_riak:find(Message, State),
+ {reply, Reply, State2};
+
+process_message(#mongo_getmore{}=Message, State) ->
+ {ok, Reply, State2} = riak_mongo_riak:getmore(Message, State),
+ {reply, Reply, State2};
+
+process_message(#mongo_killcursor{ cursorids=IDs }, State = #worker_state { cursors=Dict0 }) ->
+ %% todo: move this to riak_mongo_riak
+ NewDict = lists:foldl(fun(CursorID, Dict) ->
+ case dict:find(CursorID, Dict) of
+ {ok, {Ref,PID}} ->
+ erlang:demonitor(Ref),
+ erlang:exit(PID, kill),
+ dict:erase(CursorID, Dict);
+ error ->
+ Dict
+ end
+ end,
+ Dict0,
+ IDs),
+ {noreply, State#worker_state{ cursors=NewDict }};
process_message(#mongo_insert{}=Insert, State) ->
State2 = riak_mongo_riak:insert(Insert, State),
src/riak_mongo_riak.erl (192 changed lines)
@@ -24,11 +24,17 @@
-module(riak_mongo_riak).
+-include_lib("riak_pipe/include/riak_pipe.hrl").
-include("riak_mongo_bson.hrl").
-include("riak_mongo_protocol.hrl").
-include("riak_mongo_state.hrl").
--export([insert/2, find/1, delete/2]).
+-compile([{parse_transform, lager_transform}]).
+
+-export([insert/2, find/2, getmore/2, delete/2]).
+
+-define(DEFAULT_TIMEOUT, 60000).
+-define(DEFAULT_FIND_SIZE, 101).
insert(#mongo_insert{dbcoll=Bucket, documents=Docs, continueonerror=ContinueOnError}, State) ->
@@ -53,7 +59,39 @@ insert(#mongo_insert{dbcoll=Bucket, documents=Docs, continueonerror=ContinueOnEr
State#worker_state{ lastError=Errors }.
-find(#mongo_query{dbcoll=Bucket, selector=Selector, projector=Projection, batchsize=BatchSize }) ->
+
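+%% OP_GET_MORE: look up the cursor process stored for CursorID in the worker's
+%% cursor dict and hand the client another chunk; the cursor is dropped from
+%% the worker state once the cursor process reports {done, ...}.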
+getmore(#mongo_getmore{ cursorid=CursorID, batchsize=BatchSize }, #worker_state{ cursors=Dict } = State) ->
+
+ case dict:find(CursorID, Dict) of
+ {ok, {_, CursorPID}} ->
+
+ case cursor_get_results(CursorPID, BatchSize) of
+
+ {more, StartingFrom, Documents} ->
+ {ok,
+ #mongo_reply{ startingfrom = StartingFrom,
+ cursorid = CursorID,
+ documents = Documents },
+ State};
+
+ {done, StartingFrom, Documents} ->
+ {ok,
+ #mongo_reply{ startingfrom = StartingFrom,
+ cursorid = 0,
+ documents = Documents },
+ cursor_remove(CursorID, State) }
+ end;
+
+ error ->
+ {ok, #mongo_reply{ cursornotfound=true, documents=[] }, State}
+ end.
+
+
+find_reply(Documents,State) ->
+ {ok, #mongo_reply{ documents=Documents, queryerror=false }, State}.
+
+find(#mongo_query{dbcoll=Bucket, selector=Selector, projector=Projection, batchsize=BatchSize,
+ nocursortimeout=NoTimeout, tailablecursor=_Tailable }, State) ->
Project = compute_projection_fun(Projection),
@@ -64,35 +102,149 @@ find(#mongo_query{dbcoll=Bucket, selector=Selector, projector=Projection, batchs
case C:get(Bucket, RiakKey) of
{ok, RiakObject} ->
case riak_to_bson_object(RiakObject) of
- {ok, Document} -> [Project(RiakObject, Document)];
- _ -> []
+ {ok, Document} ->
+ Result = [Project(RiakObject, Document)],
+ find_reply(Result, State);
+ _ ->
+ find_reply([], State)
end;
- _ -> []
+ _ ->
+ find_reply([], State)
end;
false ->
-
CompiledQuery = riak_mongo_query:compile(Selector),
- error_logger:info_msg("Find executed ~p, ~p, ~p~n", [Projection, CompiledQuery, Project]),
+ %% TODO: Server side does not know the LIMIT
+ if
+ BatchSize < 1 ->
+ Batch = ?DEFAULT_FIND_SIZE;
+ true ->
+ Batch = BatchSize
+ end,
- %% TODO: simple key-based read shouldn't go through mapred for speed
- {ok, Documents}
- = riak_kv_mrc_pipe:mapred(Bucket,
- [{map, {qfun, fun map_query/3}, {CompiledQuery, Project}, true}]),
+ error_logger:info_msg("Find executed ~p, ~p, ~p~n", [Projection, CompiledQuery, Project]),
- %% TODO: dig deeper here to find out if it's possible to limit the
- %% number of returned docs during mapred, not afterwards.
- %% TODO2: Find a way to handle cursors ... the elements removed by "limit"
- %% should be held by a cursor
- case BatchSize /= 0 of
- true ->
- error_logger:info_msg("Limiting result set to ~p docs~n", [BatchSize]),
- limit_docs(Documents, abs(BatchSize), 0);
- false -> Documents
+ Owner = self(),
+ CursorPID =
+ proc_lib:spawn(fun() ->
+ cursor_init(Owner, Bucket, CompiledQuery, Project,
+ NoTimeout)
+ end),
+
+ case cursor_get_results(CursorPID, Batch) of
+ {more, StartingFrom, Documents} ->
+
+ {ok, CursorID, State2} = cursor_add(CursorPID, State),
+ {ok,
+ #mongo_reply{ startingfrom = StartingFrom,
+ cursorid = CursorID,
+ documents = Documents },
+ State2};
+
+ {done, StartingFrom, Documents} ->
+ {ok,
+ #mongo_reply{ startingfrom = StartingFrom,
+ documents = Documents },
+ State}
end
end.
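+%% Cursor bookkeeping in the worker state: each cursor is kept as a
+%% {MonitorRef, PID} pair under a fresh integer id taken from cursor_next.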
+cursor_add(PID, #worker_state{ cursors=Dict, cursor_next=ID }=State) ->
+ MRef = erlang:monitor(process, PID),
+ {ok, ID, State#worker_state{ cursors=dict:store(ID,{MRef,PID},Dict), cursor_next=ID+1 }}.
+
+cursor_remove(CursorID, #worker_state{ cursors=Dict }=State) ->
+ {MRef,_PID} = dict:fetch(CursorID, Dict),
+ erlang:demonitor(MRef, [flush]),
+ State#worker_state{ cursors=dict:erase(CursorID, Dict) }.
+
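+%% Body of the cursor process: start a riak_pipe map/reduce job for the query,
+%% feed it the bucket as input, then buffer the replies in cursor_main_loop/7.
+%% The owning worker is monitored so the pipe can be torn down if it dies.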
+cursor_init(Owner, Bucket, CompiledQuery, Project, NoTimeout) ->
+
+ TimeOut = case NoTimeout of
+ true -> infinity;
+ false -> ?DEFAULT_TIMEOUT
+ end,
+
+ OwnerRef = erlang:monitor(process, Owner),
+
+ {{ok, Pipe}, _} =
+ riak_kv_mrc_pipe:mapred_stream([{map, {qfun, fun map_query/3}, {CompiledQuery, Project}, true}]),
+
+ case riak_kv_mrc_pipe:send_inputs(Pipe, Bucket, TimeOut) of
+ ok ->
+ collect_outputs(OwnerRef, Pipe, TimeOut);
+ Error ->
+ lager:error("pipe:send_inputs failed ~p~n", [Error]),
+ riak_pipe:eoi(Pipe),
+ collect_outputs(OwnerRef, Pipe, TimeOut)
+ end.
+
+
+collect_outputs(OwnerRef, Pipe, Timeout) ->
+ cursor_main_loop(OwnerRef, Pipe, queue:new(), Timeout, 0, 0, more).
+
+
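+%% Main loop of the cursor process: queue incoming #pipe_result{} messages,
+%% answer {next, From, N} requests from the worker with {more, ...} or
+%% {done, ...} chunks, and destroy the pipe if the worker dies or a 'die'
+%% message arrives.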
+cursor_main_loop(OwnerRef, #pipe{sink=#fitting{ref=FittingRef}} = Pipe, ResultQueue, Timeout, Sent, N, State) ->
+
+ receive
+ #pipe_result{ref=FittingRef, result=Result} ->
+ cursor_main_loop(OwnerRef, Pipe, queue:in(Result, ResultQueue), Timeout, Sent, N+1, State);
+ #pipe_log{ref=FittingRef, msg=Msg} ->
+ lager:info("riak_mongo: ~s~n", [Msg]),
+ cursor_main_loop(OwnerRef, Pipe, ResultQueue, Timeout, Sent, N, State);
+ #pipe_eoi{ref=FittingRef} ->
+ cursor_main_loop(OwnerRef, Pipe, ResultQueue, Timeout, Sent, N, done);
+
+ {'DOWN', OwnerRef, _, _, _} ->
+ %% worker died
+ riak_pipe:destroy(Pipe),
+ ok;
+
+ die ->
+ riak_pipe:destroy(Pipe),
+ ok;
+
+ {next, {PID, ReplyRef}, NUM} when N >= NUM ->
+ {Q1,Q2} = queue:split(min(NUM,N), ResultQueue),
+ case State of
+ more ->
+ PID ! {more, ReplyRef, Sent, queue:to_list(Q1)},
+ cursor_main_loop(OwnerRef, Pipe, Q2, Timeout, Sent + NUM, N-NUM, done);
+ done ->
+ PID ! {done, ReplyRef, Sent, queue:to_list(Q1)},
+ ok
+ end;
+
+ {next, {PID, ReplyRef}, _} when State =:= done ->
+ PID ! {done, ReplyRef, Sent, queue:to_list(ResultQueue)},
+ ok;
+
+ MSG when tuple_size(MSG) =/= 3, element(1,MSG) =/= next ->
+ error_logger:info_msg("cursor_main_loop.6 ~p~n", [MSG]),
+ ok
+
+
+ after Timeout ->
+ cursor_main_loop(OwnerRef, Pipe, ResultQueue, infinity, Sent, N, done)
+
+ end.
+
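+%% Synchronous helper used by find/2 and getmore/2: ask the cursor process for
+%% up to HowMany documents and wait for its {more, ...} or {done, ...} reply.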
+cursor_get_results(CursorPID, HowMany) ->
+ Ref = erlang:monitor(process, CursorPID),
+ CursorPID ! {next, {self(), Ref}, HowMany},
+ receive
+ {more, Ref, StartingFrom, Documents} ->
+ erlang:demonitor(Ref, [flush]),
+ {more, StartingFrom, Documents};
+ {done, Ref, StartingFrom, Documents} ->
+ erlang:demonitor(Ref, [flush]),
+ {done, StartingFrom, Documents};
+ {'DOWN', Ref, _, _, Reason} ->
+ {error, Reason}
+ end.
+
+
id_document({struct, [{<<"_id">>, ID}]}) ->
case
src/riak_mongo_worker.erl (11 changed lines)
@@ -58,6 +58,17 @@ handle_info(?CONTROL_MSG, State) ->
inet:setopts(State#worker_state.sock, ?SOCK_OPTS),
{noreply, State};
+
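+%% A monitored cursor process exited; drop its entry from the cursor dict.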
+handle_info({'DOWN',Ref,_,_,_}, State=#worker_state{ cursors=CursorDict }) ->
+
+ Dict2 = dict:filter(fun(_, {MRef,_}) ->
+ MRef =/= Ref
+ end,
+ CursorDict),
+
+ {noreply, State#worker_state{ cursors=Dict2 }};
+
+
handle_info(Msg, State) ->
error_logger:info_msg("unknown message in worker callback: ~p~n", [Msg]),
{noreply, State}.