Permalink
Browse files

repair

  • Loading branch information...
pavlobaron committed Apr 21, 2012
2 parents 940d325 + 0b4232a commit 73d864fc62fdcf57de13e6486ef505cef9d486e8
Showing with 39 additions and 8 deletions.
  1. +2 −4 README.md
  2. +1 −1 include/riak_mongo_state.hrl
  3. +3 −1 rebar.config
  4. +22 −2 src/riak_mongo_message.erl
  5. +11 −0 src/riak_mongo_worker.erl
@@ -43,14 +43,12 @@ Here are some details of the mapping
- Keys -- translated so strings, `ObjectId("xx")` becomes the riak key `"OID:xx"`, similarly for UUID, and MD5 values. String keys map to themselves. Other key types are currently not supported.
- Objects -- Stored as raw BSON using content-type `application/bson`
- Queries -- translated to map/reduce jobs that interpret the query across objects in a bucket.
+- Cursors -- When a query calls for a cursor, `riak_mongo` creates a process that holds on to the query results. These results are then simply held in the server, and fed back to the client in chunks.
All this is work in progress, at the present state only the most basic stuff works. We're planning to also support ...
- - Cursors -- When a query calls for a cursor, `riak_mongo` creates a process that holds on to the query results. These results are then simply held in the server, and fed back to the client in chunks.
- Indexes -- Become 2i Riak indexes, always "_bin" indexes holding the `sext:encode` value for the corresponding BSON Erlang term. `riak_mongo` will likely only support ascending indexes.
- - Map/reduce -- MongoDB uses runCommand to do this. We will evaluate
- if it makes more sense to map it to the low level Riak Erlang API
- or to exexute JavaScript coming from the Mongo client
+- Map/reduce -- MongoDB uses runCommand to do this. We will evaluate if it makes more sense to map it to the low level Riak Erlang API or to exexute JavaScript coming from the Mongo client
## Authors
@@ -20,4 +20,4 @@
%% @doc State records
%% @copyright 2012 Pavlo Baron
--record(worker_state, {sock, request_id=0, rest, lastError=[]}).
+-record(worker_state, {sock, request_id=0, rest, lastError=[], cursors=dict:new(), cursor_next=1}).
View
@@ -10,7 +10,9 @@
{deps, [
{edown, ".*", {git, "git://github.com/esl/edown.git", "HEAD"}},
{bson, ".*", {git, "git@github.com:pavlobaron/bson-erlang.git", "HEAD"}},
- {riak_core, ".*", {git, "https://github.com/basho/riak_core.git", "HEAD"}}
+ {riak_core, ".*", {git, "https://github.com/basho/riak_core.git", "HEAD"}},
+ {riak_pipe, ".*", {git, "https://github.com/basho/riak_pipe.git", "HEAD"}},
+ {lager, ".*", {git, "https://github.com/basho/lager.git", "HEAD"}}
]}.
{edoc_opts, [{doclet, edown_doclet}]}.
{sub_dirs, []}.
View
@@ -75,8 +75,28 @@ process_message(#mongo_query{ db=DataBase, coll=?CMD,
process_message(#mongo_query{}=Message, State) ->
- Result = riak_mongo_riak:find(Message),
- {reply, #mongo_reply{ documents=Result, queryerror=false }, State};
+ {ok, Reply, State2} = riak_mongo_riak:find(Message, State),
+ {reply, Reply, State2};
+
+process_message(#mongo_getmore{}=Message, State) ->
+ {ok, Reply, State2} = riak_mongo_riak:getmore(Message, State),
+ {reply, Reply, State2};
+
+process_message(#mongo_killcursor{ cursorids=IDs }, State = #worker_state { cursors=Dict0 }) ->
+ %% todo: move this to riak_mongo_riak
+ NewDict = lists:foldl(fun(CursorID, Dict) ->
+ case dict:find(CursorID, Dict) of
+ {ok, {Ref,PID}} ->
+ erlang:demonitor(Ref),
+ erlang:kill(PID, kill),
+ dict:erase(CursorID, Dict);
+ error ->
+ Dict
+ end
+ end,
+ Dict0,
+ IDs),
+ {noreply, State#worker_state{ cursors=NewDict }};
process_message(#mongo_insert{}=Insert, State) ->
State2 = riak_mongo_riak:insert(Insert, State),
View
@@ -58,6 +58,17 @@ handle_info(?CONTROL_MSG, State) ->
inet:setopts(State#worker_state.sock, ?SOCK_OPTS),
{noreply, State};
+
+handle_info({'DOWN',Ref,_,_,_}, State=#worker_state{ cursors=CursorDict }) ->
+
+ Dict2 = dict:filter(fun(_, {MRef,_}) ->
+ MRef =/= Ref
+ end,
+ CursorDict),
+
+ {noreply, State#worker_state{ cursors=Dict2 }};
+
+
handle_info(Msg, State) ->
error_logger:info_msg("unknown message in worker callback: ~p~n", [Msg]),
{noreply, State}.

0 comments on commit 73d864f

Please sign in to comment.