Permalink
Browse files

Let mongo_server decode packet framing

Mongo uses almost {packet,4} except the length
encoded on the wire includes the length of the
4 byte packet size, and it uses little endian
packet sizes.  This change makes
the tcp handling code responsible for packet 
decoding.
  • Loading branch information...
krestenkrab committed Apr 10, 2012
1 parent f99dfa7 commit c0870ffe7f2ccf7779292cac09640c0d4aeb906a
Showing with 41 additions and 19 deletions.
  1. +31 −12 src/riak_mongo_server.erl
  2. +10 −7 src/wire_protocol.erl
View
@@ -22,6 +22,8 @@
-module(riak_mongo_server).
+-include_lib ("bson/include/bson_binary.hrl").
+
-export([start_link/2, handle_info/2, new_connection/2, init/1, sock_opts/0]).
-behavior(gen_nb_server).
@@ -32,29 +34,46 @@ start_link(IpAddr, Port) ->
init(_Args) ->
{ok, ok}.
-handle_info({tcp, Sock, Data}, State) ->
- Me = self(),
- P = spawn(fun() -> worker(Me, Sock, Data) end),
- gen_tcp:controlling_process(Sock, P),
- {noreply, State};
-
handle_info(_Msg, State) ->
{noreply, State}.
new_connection(Sock, State) ->
Me = self(),
- P = spawn(fun() -> worker(Me, Sock) end),
+ P = spawn(fun() -> worker(Me) end),
gen_tcp:controlling_process(Sock, P),
+ P ! {set_socket, Sock},
{ok, State}.
sock_opts() ->
[binary, {active, once}, {packet, 0}].
-worker(Owner, Sock) ->
+%% this should really be a process under supervision
+
+worker(Owner) ->
+ receive {set_socket, Sock} -> ok end,
inet:setopts(Sock, [{active, once}]),
- gen_tcp:controlling_process(Sock, Owner).
+ worker_loop(Owner, Sock, <<>>).
+
+worker_loop(Owner, Sock, UnprocessedData) ->
+ receive
+ {tcp, Sock, Data} ->
+ {ok, Rest} = handle_data(Sock, <<UnprocessedData/binary, Data/binary>>),
+ worker_loop(Owner, Sock, Rest)
+
+ %% timeout?
+ end.
-worker(Owner, Sock, Data) ->
- wire_protocol:process_data(Sock, Data),
+handle_data(Sock, << ?get_int32(MsgLen), _/binary>>=RawData) when byte_size(RawData) >= MsgLen ->
+ PacketLen = MsgLen-4,
+ <<?get_int32(_), Packet:PacketLen/binary, Rest/binary>> = RawData,
+ wire_protocol:process_packet(Sock, Packet),
+ handle_data(Sock, Rest);
+
+handle_data(Sock, Rest) when is_binary(Rest) ->
inet:setopts(Sock, [{active, once}]),
- gen_tcp:controlling_process(Sock, Owner).
+ {ok, Rest}.
+
+
+
+
+
View
@@ -22,10 +22,13 @@
-module(wire_protocol).
--export([process_data/2]).
+-export([process_packet/2]).
--define(MSG(OP), <<_MessageLength:32, ID:32/little,
- _ResponseTo:32, OP:32/little, Rest/binary>>).
+-include_lib ("bson/include/bson_binary.hrl").
+
+-define(MSG(OP), <<?get_int32(ID),
+ ?get_int32(_ResponseTo),
+ ?get_int32(OP), Rest/binary>>).
-define(OP_REPLY, 1).
-define(OP_INSERT, 2002).
@@ -39,16 +42,16 @@
OP:32/little, F:32/little, C:64/little,
S:32/little, N:32/little, D/binary>>).
-process_data(Sock, ?MSG(?OP_QUERY)) ->
+process_packet(Sock, ?MSG(?OP_QUERY)) ->
process_query(Sock, ID, Rest);
-process_data(Sock, ?MSG(?OP_INSERT)) ->
+process_packet(Sock, ?MSG(?OP_INSERT)) ->
process_insert(Sock, ID, Rest);
-process_data(Sock, _) ->
+process_packet(Sock, _) ->
reply_error(Sock, 0, "unsupported message").
-process_insert(Sock, ID, ?INSERT) ->
+process_insert(_Sock, _ID, ?INSERT) ->
[Collection, Documents|_] = binary:split(Rest, <<0:8>>),
riak_mongo_logic:insert(Collection, bson_binary:get_document(Documents));

0 comments on commit c0870ff

Please sign in to comment.