Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

reorg, manual merge :(

  • Loading branch information...
commit 7821b6a99c91246362e49a6c012da949a0cb9137 1 parent d2944eb
@pavlobaron authored
Showing with 77 additions and 84 deletions.
  1. +77 −84 src/riak_mongo_riak.erl
View
161 src/riak_mongo_riak.erl
@@ -86,10 +86,6 @@ getmore(#mongo_getmore{ cursorid=CursorID, batchsize=BatchSize }, #worker_state{
{ok, #mongo_reply{ cursornotfound=true, documents=[] }, State}
end.
-
-find_reply(Documents,State) ->
- {ok, #mongo_reply{ documents=Documents, queryerror=false }, State}.
-
find(#mongo_query{dbcoll=Bucket, selector=Selector, projector=Projection, batchsize=BatchSize,
nocursortimeout=NoTimeout, tailablecursor=_Tailable }, State) ->
@@ -161,6 +157,80 @@ find(#mongo_query{dbcoll=Bucket, selector=Selector, projector=Projection, batchs
end
end.
+update(#mongo_update{dbcoll=Bucket, selector=Selector, updater=Updater, rawupdater=RawUpdater,
+ multiupdate=MultiUpdate, upsert=Upsert}, State) ->
+
+ error_logger:info_msg("About to update ~p, ~p, ~p~n", [Updater, MultiUpdate, Upsert]),
+
+ case id_document(Selector) of
+ {ok, RiakKey} when not Upsert ->
+ {ok, C} = riak:local_client(),
+ case C:get(Bucket, RiakKey) of
+ {ok, RiakObject} ->
+ NewObject = riak_object:update_value(RiakObject, RawUpdater),
+ case C:put(NewObject) of
+ ok -> State;
+ Err -> State#worker_state{ lastError=Err }
+ end;
+ Err -> State#worker_state{ lastError=Err }
+ end;
+ _ ->
+ error_logger:info_msg("This update variant is not yet supported~n", []),
+ State
+ end.
+
+delete(#mongo_delete{dbcoll=Bucket, selector=Selector, singleremove=SingleRemove}, State) ->
+
+ case id_document(Selector) of
+ {ok, RiakKey} ->
+ {ok, C} = riak:local_client(),
+ case C:delete(Bucket, RiakKey) of
+ ok -> State;
+ Err -> State#worker_state{ lastError=Err }
+ end;
+
+ false when not SingleRemove ->
+
+ Project = fun(RiakObject, _) ->
+ {ok, C} = riak:local_client(),
+ case C:delete(Bucket, riak_object:key(RiakObject)) of
+ ok -> ok;
+ Err -> Err
+ end
+ end,
+
+ CompiledQuery = riak_mongo_query:compile(Selector),
+
+ {ok, Errors}
+ = riak_kv_mrc_pipe:mapred(Bucket,
+ [{map, {qfun, fun map_query/3}, {CompiledQuery, Project}, true}]),
+
+
+ State#worker_state{ lastError=Errors };
+
+ false when SingleRemove ->
+
+ Project = fun(RiakObject, _) -> riak_object:key(RiakObject) end,
+ CompiledQuery = riak_mongo_query:compile(Selector),
+
+ case riak_kv_mrc_pipe:mapred(Bucket,
+ [{map, {qfun, fun map_query/3}, {CompiledQuery, Project}, true}])
+ of
+ {ok, []} ->
+ State;
+
+ {ok, [RiakKey|_]} ->
+ {ok, C} = riak:local_client(),
+ case C:delete(Bucket, RiakKey) of
+ ok -> State;
+ Err -> State#worker_state{ lastError=Err }
+ end
+ end
+ end.
+
+
+%% internals
+
cursor_add(PID, #worker_state{ cursors=Dict, cursor_next=ID }=State) ->
MRef = erlang:monitor(process, PID),
{ok, ID, State#worker_state{ cursors=dict:store(ID,{MRef,PID},Dict), cursor_next=ID+1 }}.
@@ -195,7 +265,7 @@ cursor_init(Owner, Bucket, CompiledQuery, Project, NoTimeout) ->
collect_outputs(OwnerRef, Pipe, Timeout) ->
cursor_main_loop(OwnerRef, Pipe, queue:new(), Timeout, 0, 0, more).
-
+%% TODO: check if it would make sense to have this processes under supervision (ETS for cursor?)
@krestenkrab Collaborator

If a cursor process dies,

  • All the map/reduce stuff is stopped appropriately (the riak_pipe is linked to it)
  • The worker notices see here, and makes to CursorID be invalid for further operations.

But it would probably make sense to make "cursor" be a separate gen_server, just because it would simplify interactions to be using gen_server:call and friends.

As for supervision ... it certainly does not make sense to restart a cursor as far as I can see.

@pavlobaron Owner

the BatchSize logic is the following: if it's negative (not only -1), the cursor gets closed and abs(BatchSize) objects get returned. Changed.

Supervision: the worker is currently just a restartable process. As far as I can see from the implementation (need to read more), cursors are also being stored within its state. So in the case of the worker restart (which is transparent to the Mongo client), it would be nice to have my cursors back again, wouldn't it? It sure can be done later, when we optimize and clean more

@krestenkrab Collaborator

I dunno why we're having this discussion here, but now I get it. Notice that there's a code path which makes sure the cursor process dies even if there are more query results

krestenkrab@32c16f1

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
cursor_main_loop(OwnerRef, #pipe{sink=#fitting{ref=FittingRef}} = Pipe, ResultQueue, Timeout, Sent, N, State) ->
receive
@@ -255,85 +325,8 @@ cursor_get_results(CursorPID, HowMany) ->
{error, Reason}
end.
-
-update(#mongo_update{dbcoll=Bucket, selector=Selector, updater=Updater, rawupdater=RawUpdater,
- multiupdate=MultiUpdate, upsert=Upsert}, State) ->
-
- error_logger:info_msg("About to update ~p, ~p, ~p~n", [Updater, MultiUpdate, Upsert]),
-
- case id_document(Selector) of
- {ok, RiakKey} when not Upsert ->
- {ok, C} = riak:local_client(),
- case C:get(Bucket, RiakKey) of
- {ok, RiakObject} ->
- NewObject = riak_object:update_value(RiakObject, RawUpdater),
- case C:put(NewObject) of
- ok -> State;
- Err -> State#worker_state{ lastError=Err }
- end;
- Err -> State#worker_state{ lastError=Err }
- end;
- _ ->
- error_logger:info_msg("This update variant is not yet supported~n", []),
- State
- end.
-
-
- %Documents = find(#mongo_query{dbcol=Bucket, selector=Selector, batchsize=BatchSize}),
-
- %State#worker_state{ lastError=Errors }.
-
-delete(#mongo_delete{dbcoll=Bucket, selector=Selector, singleremove=SingleRemove}, State) ->
-
- case id_document(Selector) of
- {ok, RiakKey} ->
- {ok, C} = riak:local_client(),
- case C:delete(Bucket, RiakKey) of
- ok -> State;
- Err -> State#worker_state{ lastError=Err }
- end;
-
- false when not SingleRemove ->
-
- Project = fun(RiakObject, _) ->
- {ok, C} = riak:local_client(),
- case C:delete(Bucket, riak_object:key(RiakObject)) of
- ok -> ok;
- Err -> Err
- end
- end,
-
- CompiledQuery = riak_mongo_query:compile(Selector),
-
- {ok, Errors}
- = riak_kv_mrc_pipe:mapred(Bucket,
- [{map, {qfun, fun map_query/3}, {CompiledQuery, Project}, true}]),
-
-
- State#worker_state{ lastError=Errors };
-
- false when SingleRemove ->
-
- Project = fun(RiakObject, _) -> riak_object:key(RiakObject) end,
- CompiledQuery = riak_mongo_query:compile(Selector),
-
- case riak_kv_mrc_pipe:mapred(Bucket,
- [{map, {qfun, fun map_query/3}, {CompiledQuery, Project}, true}])
- of
- {ok, []} ->
- State;
-
- {ok, [RiakKey|_]} ->
- {ok, C} = riak:local_client(),
- case C:delete(Bucket, RiakKey) of
- ok -> State;
- Err -> State#worker_state{ lastError=Err }
- end
- end
- end.
-
-
-%% internals
+find_reply(Documents,State) ->
+ {ok, #mongo_reply{ documents=Documents, queryerror=false }, State}.
id_document({struct, [{<<"_id">>, ID}]}) ->
case
@krestenkrab

If a cursor process dies,

  • All the map/reduce stuff is stopped appropriately (the riak_pipe is linked to it)
  • The worker notices see here, and makes to CursorID be invalid for further operations.

But it would probably make sense to make "cursor" be a separate gen_server, just because it would simplify interactions to be using gen_server:call and friends.

As for supervision ... it certainly does not make sense to restart a cursor as far as I can see.

@pavlobaron

the BatchSize logic is the following: if it's negative (not only -1), the cursor gets closed and abs(BatchSize) objects get returned. Changed.

Supervision: the worker is currently just a restartable process. As far as I can see from the implementation (need to read more), cursors are also being stored within its state. So in the case of the worker restart (which is transparent to the Mongo client), it would be nice to have my cursors back again, wouldn't it? It sure can be done later, when we optimize and clean more

@krestenkrab

I dunno why we're having this discussion here, but now I get it. Notice that there's a code path which makes sure the cursor process dies even if there are more query results

krestenkrab@32c16f1

Please sign in to comment.
Something went wrong with that request. Please try again.