Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Couple of things

- packet-4 encoding for response
- use '_id' from incoming documents for insert
- support multi insert
  • Loading branch information...
commit e5b46d969dcf583a7a64b177408556018f86851d 1 parent c0870ff
@krestenkrab authored
View
12 src/riak_mongo_logic.erl
@@ -28,10 +28,16 @@ you(Peer) ->
{ok, {{A, B, C, D}, P}} = Peer, %IPv6???
io_lib:format("~p.~p.~p.~p:~p", [A, B, C, D, P]).
-insert(Collection, Documents) ->
+insert(Collection, Document) ->
{ok, C} = riak:local_client(),
- ID = riak_core_util:unique_id_62(),
- O = riak_object:new(Collection, list_to_binary(ID), Documents),
+ case bson:lookup ('_id', Document) of
+ {ID} ->
+ Doc = Document;
+ {} ->
+ ID = list_to_binary(riak_core_util:unique_id_62()),
+ Doc = bson:append({'_id', ID}, Document)
+ end,
+ O = riak_object:new(Collection, ID, Doc),
C:put(O).
find(Collection, NumberToReturn, _) when NumberToReturn == -1 ->
View
6 src/riak_mongo_server.erl
@@ -26,6 +26,8 @@
-export([start_link/2, handle_info/2, new_connection/2, init/1, sock_opts/0]).
+-export([send_packet/2]).
+
-behavior(gen_nb_server).
start_link(IpAddr, Port) ->
@@ -73,7 +75,9 @@ handle_data(Sock, Rest) when is_binary(Rest) ->
inet:setopts(Sock, [{active, once}]),
{ok, Rest}.
-
+send_packet(Sock, Data) ->
+ Size = byte_size(Data),
+ gen_tcp:send(Sock, <<?put_int32(Size+4), Data>>).
View
17 src/wire_protocol.erl
@@ -38,7 +38,7 @@
-define(INSERT, <<_Flags:32, Rest/binary>>).
-define(QUERY, <<_Flags:32, Rest/binary>>).
--define(REPLY(L, I, T, OP, F, C, S, N, D), <<L:32/little, I:32/little, T:32/little,
+-define(REPLY(I, T, OP, F, C, S, N, D), <<I:32/little, T:32/little,
OP:32/little, F:32/little, C:64/little,
S:32/little, N:32/little, D/binary>>).
@@ -52,12 +52,20 @@ process_packet(Sock, _) ->
reply_error(Sock, 0, "unsupported message").
process_insert(_Sock, _ID, ?INSERT) ->
- [Collection, Documents|_] = binary:split(Rest, <<0:8>>),
- riak_mongo_logic:insert(Collection, bson_binary:get_document(Documents));
+ {Collection, Documents} = get_cstring(Rest),
+ process_insert_loop(Collection, Documents);
process_insert(Sock, _, _) ->
reply_error(Sock, 0, "unsupported insert").
+process_insert_loop(Collection, <<>>) ->
+ ok;
+process_insert_loop(Collection, Documents) ->
+ {Document, Rest} = bson_binary:get_document(Documents),
+ riak_mongo_logic:insert(Collection, Document),
+ process_insert_loop(Collection, Rest).
+
+
process_query(Sock, ID, ?CMD) ->
process_cmd(Sock, ID, bson_binary:get_document(Rest));
@@ -85,8 +93,7 @@ process_cmd(Sock, _, _) ->
reply(Sock, ID, T) ->
Res = bson_binary:put_document(T),
- L = byte_size(Res) + 36,
- gen_tcp:send(Sock, ?REPLY(L, ID, ID, ?OP_REPLY, 8, 0, 0, 1, Res)).
+ riak_mongo_server:send_packet(Sock, ?REPLY(ID, ID, ?OP_REPLY, 8, 0, 0, 1, Res)).
reply_error(Sock, ID, S) ->
T = {errmsg, list_to_binary(S), ok, 0},

0 comments on commit e5b46d9

Please sign in to comment.
Something went wrong with that request. Please try again.