merge pavlobaron/master

2 parents 0b4232a + 940d325, commit d3185c8a88d2cfbf4c448ab11fc3293ef171576a, committed by @krestenkrab on Apr 21, 2012
Showing with 75 additions and 28 deletions.
  1. +4 −0 README.md
  2. +2 −1 include/riak_mongo_protocol.hrl
  3. +4 −0 src/riak_mongo_message.erl
  4. +4 −3 src/riak_mongo_protocol.erl
  5. +61 −24 src/riak_mongo_riak.erl
README.md
@@ -26,6 +26,10 @@ And do some basic commands:
> db.things.find({a:1}, {b:1})
> db.things.remove({a:1})
> db.things.remove()
+ > db.things.insert({a:1})
+ > x = db.things.findOne()
+ > x.a = 2
+ > db.things.update({_id:x._id}, x, false)
Buckets in the Riak store will be named like "collection.things" - the prefix is thus the name of the database you connect to from the Mongo shell.
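For example, with the Mongo shell connected to a database named "test" (an assumed name, not part of this commit), the documents above end up in the Riak bucket <<"test.things">>. A minimal read-back sketch using the same local-client calls the patch itself uses; the key derivation (bson_to_riak_key/1) is internal to riak_mongo_riak.erl, so this simply takes the first key in the bucket:

    %% Illustrative sketch, not part of the commit: fetch one "things" document
    %% straight from Riak, assuming the database name "test".
    {ok, C} = riak:local_client(),
    {ok, Keys} = C:list_keys(<<"test.things">>),
    {ok, Obj} = C:get(<<"test.things">>, hd(Keys)),
    Stored = riak_object:get_value(Obj).   %% the value riak_mongo stores for the document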
include/riak_mongo_protocol.hrl
@@ -18,7 +18,8 @@
upsert = false :: boolean(),
multiupdate = false :: boolean(),
selector :: bson:document(),
- updater :: bson:document() }).
+ updater :: bson:document(),
+ rawupdater :: tuple()}).
-record (mongo_delete, {
request_id :: integer(),
src/riak_mongo_message.erl
@@ -106,6 +106,10 @@ process_message(#mongo_delete{}=Delete, State) ->
State2 = riak_mongo_riak:delete(Delete, State),
{noreply, State2};
+process_message(#mongo_update{}=Update, State) ->
+ State2 = riak_mongo_riak:update(Update, State),
+ {noreply, State2};
+
process_message(Message, State) ->
error_logger:info_msg("unhandled message: ~p~n", [Message]),
{noreply, State}.
src/riak_mongo_protocol.erl
@@ -26,6 +26,7 @@
-include_lib ("bson/include/bson_binary.hrl").
-include_lib ("riak_mongo_protocol.hrl").
+-include("riak_mongo_bson.hrl").
-define (ReplyOpcode, 1).
-define (UpdateOpcode, 2001).
@@ -76,15 +77,15 @@ decode_packet( << ?HDR(_, ?InsertOpcode), ?get_bits32(0,0,0,0,0,0,0,ContinueOnEr
decode_packet(<< ?HDR(_, ?UpdateOpcode), 0:32, Rest/binary>> ) ->
{DBColl, Rest1} = riak_mongo_bson:get_cstring(Rest),
- <<?get_bits32(0,0,0,0,0,0,MultiUpdate,Upsert), Rest2>> = Rest1,
+ <<?get_bits32(0,0,0,0,0,0,MultiUpdate,Upsert), Rest2/binary>> = Rest1,
{Selector, Rest3} = riak_mongo_bson:get_document(Rest2),
+ {#bson_raw_document{id=_BSON_ID, body=RawUpdate}, <<>>} = riak_mongo_bson:get_raw_document(Rest3),
{Update, <<>>} = riak_mongo_bson:get_document(Rest3),
-
{ok, #mongo_update{ dbcoll=DBColl,
request_id=RequestId,
selector=Selector,
updater=Update,
-
+ rawupdater=RawUpdate,
multiupdate = bool(MultiUpdate),
upsert = bool(Upsert)
}};
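Concretely, decoding the OP_UPDATE message produced by the README's db.things.update({_id:x._id}, x, false) now yields a record along these lines (values are illustrative: the database name "test" and the request id are assumed, and ObjectId / RawBson stand for the decoded _id and the undecoded document bytes). The point of the change is that the updater is carried twice, once parsed and once as raw BSON that can be written to Riak untouched:

    #mongo_update{ dbcoll      = <<"test.things">>,
                   request_id  = 7,
                   selector    = {struct, [{<<"_id">>, ObjectId}]},
                   updater     = {struct, [{<<"_id">>, ObjectId}, {<<"a">>, 2}]},
                   rawupdater  = RawBson,        %% same document as updater, still BSON-encoded
                   multiupdate = false,          %% shell default
                   upsert      = false }         %% third argument of update() above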
src/riak_mongo_riak.erl
@@ -31,7 +31,7 @@
-compile([{parse_transform, lager_transform}]).
--export([insert/2, find/2, getmore/2, delete/2]).
+-export([insert/2, find/2, getmore/2, delete/2, update/2]).
-define(DEFAULT_TIMEOUT, 60000).
-define(DEFAULT_FIND_SIZE, 101).
@@ -117,8 +117,10 @@ find(#mongo_query{dbcoll=Bucket, selector=Selector, projector=Projection, batchs
%% TODO: Server side does not know the LIMIT
if
- BatchSize < 1 ->
+ BatchSize == 0 ->
Batch = ?DEFAULT_FIND_SIZE;
+ BatchSize == -1 ->
+ Batch = 1;
true ->
Batch = BatchSize
end,
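This matches OP_QUERY's numberToReturn semantics: 0 means "use the server's default batch size", a negative value means "return that many documents and close the cursor", and -1 (what findOne sends) is the case the patch handles; the close-the-cursor half appears further down as the CursorPID ! die branch. A hypothetical helper restating the mapping, not part of the patch:

    %% Hypothetical restatement of the branch above as a pure function.
    -spec batch_plan(integer()) -> {pos_integer(), keep_cursor | close_cursor}.
    batch_plan(0)            -> {101, keep_cursor};    %% ?DEFAULT_FIND_SIZE
    batch_plan(-1)           -> {1,   close_cursor};   %% findOne: one document, no open cursor
    batch_plan(N) when N > 0 -> {N,   keep_cursor}.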
@@ -135,12 +137,21 @@ find(#mongo_query{dbcoll=Bucket, selector=Selector, projector=Projection, batchs
case cursor_get_results(CursorPID, Batch) of
{more, StartingFrom, Documents} ->
- {ok, CursorID, State2} = cursor_add(CursorPID, State),
- {ok,
- #mongo_reply{ startingfrom = StartingFrom,
- cursorid = CursorID,
- documents = Documents },
- State2};
+ if BatchSize == -1 ->
+ CursorPID ! die,
+ {ok,
+ #mongo_reply{ startingfrom = StartingFrom,
+ documents = Documents },
+ State};
+
+ true ->
+ {ok, CursorID, State2} = cursor_add(CursorPID, State),
+ {ok,
+ #mongo_reply{ startingfrom = StartingFrom,
+ cursorid = CursorID,
+ documents = Documents },
+ State2}
+ end;
{done, StartingFrom, Documents} ->
{ok,
@@ -245,23 +256,32 @@ cursor_get_results(CursorPID, HowMany) ->
end.
+update(#mongo_update{dbcoll=Bucket, selector=Selector, updater=Updater, rawupdater=RawUpdater,
+ multiupdate=MultiUpdate, upsert=Upsert}, State) ->
-id_document({struct, [{<<"_id">>, ID}]}) ->
- case
- case ID of
- {objectid, _} -> true;
- {binary, _} -> true;
- {md5, _} -> true;
- {uuid, _} -> true;
- _ when is_binary(ID) -> true;
- _ -> false
- end
- of
- true -> {ok, bson_to_riak_key(ID)};
- false -> false
- end;
-id_document(_) ->
- false.
+ error_logger:info_msg("About to update ~p, ~p, ~p~n", [Updater, MultiUpdate, Upsert]),
+
+ case id_document(Selector) of
+ {ok, RiakKey} when not Upsert ->
+ {ok, C} = riak:local_client(),
+ case C:get(Bucket, RiakKey) of
+ {ok, RiakObject} ->
+ NewObject = riak_object:update_value(RiakObject, RawUpdater),
+ case C:put(NewObject) of
+ ok -> State;
+ Err -> State#worker_state{ lastError=Err }
+ end;
+ Err -> State#worker_state{ lastError=Err }
+ end;
+ _ ->
+ error_logger:info_msg("This update variant is not yet supported~n", []),
+ State
+ end.
+
+
+ %Documents = find(#mongo_query{dbcol=Bucket, selector=Selector, batchsize=BatchSize}),
+
+ %State#worker_state{ lastError=Errors }.
delete(#mongo_delete{dbcoll=Bucket, selector=Selector, singleremove=SingleRemove}, State) ->
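Note that the new update/2 above only covers the narrow case the README exercises: a selector that is exactly {_id: ...} with upsert=false, in which case the raw updater document replaces the stored value via the usual get / update_value / put read-modify-write; upserts, multi-updates and non-_id selectors currently just log "not yet supported" and leave the state untouched. The selector check is id_document/1, re-added below under internals. Illustratively (the dummy objectid assumes bson_to_riak_key/1 accepts any 12-byte id):

    %% Illustrative: which selectors take the direct-key path in update/2.
    {ok, _RiakKey} = id_document({struct, [{<<"_id">>, {objectid, <<0:96>>}}]}),
    false          = id_document({struct, [{<<"a">>, 1}]}).   %% falls through to the unsupported branch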
@@ -315,6 +335,23 @@ delete(#mongo_delete{dbcoll=Bucket, selector=Selector, singleremove=SingleRemove
%% internals
+id_document({struct, [{<<"_id">>, ID}]}) ->
+ case
+ case ID of
+ {objectid, _} -> true;
+ {binary, _} -> true;
+ {md5, _} -> true;
+ {uuid, _} -> true;
+ _ when is_binary(ID) -> true;
+ _ -> false
+ end
+ of
+ true -> {ok, bson_to_riak_key(ID)};
+ false -> false
+ end;
+id_document(_) ->
+ false.
+
limit_docs(_, BatchSize, N) when N =:= BatchSize ->
[];
limit_docs([], _, _) ->
