Skip to content

Commit

Permalink
reorganized, insert has a weird bug though
Browse files Browse the repository at this point in the history
  • Loading branch information
pavlobaron committed Apr 11, 2012
1 parent 4d8f5fe commit 127c3dc
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 29 deletions.
4 changes: 2 additions & 2 deletions include/riak_mongo_protocol.hrl
Expand Up @@ -3,7 +3,7 @@
request_id :: integer(), request_id :: integer(),
dbcoll :: binary(), dbcoll :: binary(),
continueonerror = false :: boolean(), continueonerror = false :: boolean(),
documents :: [bson:document()] }). documents :: [tuple()] }).


-record (mongo_update, { -record (mongo_update, {
request_id :: integer(), request_id :: integer(),
Expand Down Expand Up @@ -37,7 +37,7 @@


skip = 0 :: integer(), skip = 0 :: integer(),
batchsize = 0 :: integer(), %% negative closes cursor batchsize = 0 :: integer(), %% negative closes cursor
selector :: bson:document(), selector :: tuple(),
projector = [] :: bson:document() }). projector = [] :: bson:document() }).


-record (mongo_getmore, { -record (mongo_getmore, {
Expand Down
22 changes: 14 additions & 8 deletions src/riak_mongo_bson.erl → src/riak_mongo_decenc.erl
Expand Up @@ -17,28 +17,34 @@
%% %%


%% @author Pavlo Baron <pb at pbit dot org> %% @author Pavlo Baron <pb at pbit dot org>
%% @doc This is the wrapper around the mongo-bson encoder/decoder %% @doc Utils around mongo-bson and JSON encoding/decoding
%% @copyright 2012 Pavlo Baron %% @copyright 2012 Pavlo Baron


-module(riak_mongo_bson). -module(riak_mongo_decenc).


-export([encode_bson/1, decode_bson/1]). -export([encode_struct/1, decode_bson/1]).


-include_lib("bson/include/bson_binary.hrl"). -include_lib("bson/include/bson_binary.hrl").


-spec decode_bson(binary()) -> {binary(), binary()}. -spec decode_bson(binary()) -> {binary(), binary()}.
decode_bson(<<>>) ->
[];
decode_bson(<<?get_int32(N), RawBson/binary>>) -> decode_bson(<<?get_int32(N), RawBson/binary>>) ->
S = N - 5, S = N - 5,
<<DB:S/binary, 0:8, _Rest/binary>> = RawBson, <<DB:S/binary, 0:8, Rest/binary>> = RawBson,
RawStruct = do_fields(DB), RawStruct = do_fields(DB),
ID = [V || {K, V} <- RawStruct, K =:= <<"_id">>], TID = lists:keyfind(<<"_id">>, 1, RawStruct),
{list_to_binary(ID), {struct, RawStruct}}. ID = case is_tuple(TID) of
true -> element(2, TID);
_ -> <<>>
end,
[{ID, {struct, RawStruct}}|decode_bson(Rest)].


do_fields(<<>>) -> []; do_fields(<<>>) -> [];
do_fields(B) -> do_fields(B) ->
{N, V, B1} = bson_binary:get_field(B), {N, V, B1} = bson_binary:get_field(B),
[{N, V} | do_fields(B1)]. [{N, V} | do_fields(B1)].


-spec encode_bson(binary()) -> binary(). -spec encode_struct(tuple()) -> binary().
encode_bson(Struct) -> encode_struct(Struct) ->
iolist_to_binary(mochijson2:encode(Struct)). iolist_to_binary(mochijson2:encode(Struct)).
16 changes: 14 additions & 2 deletions src/riak_mongo_message.erl
Expand Up @@ -27,17 +27,23 @@
-include ("riak_mongo_protocol.hrl"). -include ("riak_mongo_protocol.hrl").
-include_lib("riak_mongo_state.hrl"). -include_lib("riak_mongo_state.hrl").


process_message(#mongo_query{ dbcoll= <<"admin.$cmd">>, selector={whatsmyuri, 1}}, State) -> process_message(#mongo_query{ dbcoll= <<"admin.$cmd">>, selector={struct,[{<<"whatsmyuri">>,1}]}}, State) ->

{reply, #mongo_reply{ documents=[{binary_to_atom(iolist_to_binary(you(State)), utf8), 1}]}, State}; {reply, #mongo_reply{ documents=[{binary_to_atom(iolist_to_binary(you(State)), utf8), 1}]}, State};


process_message(#mongo_query{ dbcoll= <<"admin.$cmd">>, selector={replSetGetStatus, 1, forShell, 1}}, State) -> process_message(#mongo_query{ dbcoll= <<"admin.$cmd">>, selector={struct, [{<<"replSetGetStatus">>, 1},
{<<"forShell">>, 1}]}}, State) ->
{reply, #mongo_reply{ documents=[{binary_to_atom(iolist_to_binary("not running with --replSet"), {reply, #mongo_reply{ documents=[{binary_to_atom(iolist_to_binary("not running with --replSet"),
utf8), 1}]}, State}; utf8), 1}]}, State};


process_message(#mongo_query{}=Message, State) -> process_message(#mongo_query{}=Message, State) ->
error_logger:info_msg("unhandled query: ~p~n", [Message]), error_logger:info_msg("unhandled query: ~p~n", [Message]),
{reply, #mongo_reply{ queryerror=true }, State}; {reply, #mongo_reply{ queryerror=true }, State};


process_message(#mongo_insert{dbcoll=DbCol, documents=Documents}, State) ->
process_insert(DbCol, Documents),
{noreply, State};

process_message(Message, State) -> process_message(Message, State) ->
error_logger:info_msg("unhandled message: ~p~n", [Message]), error_logger:info_msg("unhandled message: ~p~n", [Message]),
{noreply, State}. {noreply, State}.
Expand All @@ -46,3 +52,9 @@ process_message(Message, State) ->
you(#state{peer=Peer}) -> you(#state{peer=Peer}) ->
{ok, {{A, B, C, D}, P}} = Peer, %IPv6??? {ok, {{A, B, C, D}, P}} = Peer, %IPv6???
io_lib:format("~p.~p.~p.~p:~p", [A, B, C, D, P]). io_lib:format("~p.~p.~p.~p:~p", [A, B, C, D, P]).

process_insert(_, []) ->
ok;
process_insert(DbCol, [Document|L]) ->
riak_mongo_store:insert(DbCol, Document),
process_insert(DbCol, L).
27 changes: 15 additions & 12 deletions src/riak_mongo_protocol.erl
Expand Up @@ -67,10 +67,10 @@ bit(false) -> 0.


decode_packet( << ?HDR(_, ?InsertOpcode), ?get_bits32(0,0,0,0,0,0,0,ContinueOnError), Rest/binary >> ) -> decode_packet( << ?HDR(_, ?InsertOpcode), ?get_bits32(0,0,0,0,0,0,0,ContinueOnError), Rest/binary >> ) ->
{DBColl, Rest1} = bson_binary:get_cstring(Rest), {DBColl, Rest1} = bson_binary:get_cstring(Rest),
BsonDocs = get_all_docs(Rest1), Docs = riak_mongo_decenc:decode_bson(Rest1),
{ok, #mongo_insert{ dbcoll=DBColl, {ok, #mongo_insert{ dbcoll=DBColl,
request_id=RequestId, request_id=RequestId,
documents=BsonDocs, documents=Docs,
continueonerror = bool(ContinueOnError) continueonerror = bool(ContinueOnError)
}}; }};


Expand Down Expand Up @@ -109,8 +109,11 @@ decode_packet(<< ?HDR(_, ?QueryOpcode),
Rest/binary >>) -> Rest/binary >>) ->
{DBColl, Rest1} = bson_binary:get_cstring(Rest), {DBColl, Rest1} = bson_binary:get_cstring(Rest),
<< ?get_int32(NumberToSkip), ?get_int32(NumberToReturn), Rest2/binary >> = Rest1, << ?get_int32(NumberToSkip), ?get_int32(NumberToReturn), Rest2/binary >> = Rest1,
[Query | ReturnFieldSelectors ] = get_all_docs(Rest2), [{_, Query} | TReturnFieldSelectors] = riak_mongo_decenc:decode_bson(Rest2),

ReturnFieldSelectors = case TReturnFieldSelectors of
[] -> {struct, []};
_ -> element(2, TReturnFieldSelectors)
end,
{ok, #mongo_query{ request_id=RequestId, {ok, #mongo_query{ request_id=RequestId,
dbcoll=DBColl, dbcoll=DBColl,
tailablecursor=bool(Tailable), tailablecursor=bool(Tailable),
Expand Down Expand Up @@ -165,14 +168,14 @@ encode_packet(#mongo_reply{
%% %%
%% %%
%% %%
get_all_docs(Binary) -> %get_all_docs(Binary) ->
get_all_docs(Binary, []). % get_all_docs(Binary, []).


get_all_docs(<<>>, Acc) -> %get_all_docs(<<>>, Acc) ->
lists:reverse(Acc); % lists:reverse(Acc);
get_all_docs(Data, Acc) -> %get_all_docs(Data, Acc) ->
{Doc, Rest} = bson_binary:get_document(Data), % {Doc, Rest} = riak:get_document(Data),
get_all_docs(Rest, [Doc|Acc]). % get_all_docs(Rest, [Doc|Acc]).




get_int64_list(Num, Binary) -> get_int64_list(Num, Binary) ->
Expand Down
11 changes: 6 additions & 5 deletions src/riak_mongo_store.erl
Expand Up @@ -22,12 +22,13 @@


-module(riak_mongo_store). -module(riak_mongo_store).


%-export([insert/1, find/1]). -export([insert/2]).


%insert(#mongo_insert{dbcol=Collection, documents=Doc}) -> -spec insert(binary(), tuple()) -> term().
% {ok, C} = riak:local_client(), insert(Bucket, {ID, Struct}) ->
% O = riak_object:new(Collection, ID, Doc), {ok, C} = riak:local_client(),
% C:put(O). O = riak_object:new(Bucket, ID, riak_mongo_decenc:encode_struct(Struct)),
C:put(O).


%find(#mongo_query=Query) -> %find(#mongo_query=Query) ->
% query!!! % query!!!
Expand Down

0 comments on commit 127c3dc

Please sign in to comment.