diff --git a/include/riak_mongo_protocol.hrl b/include/riak_mongo_protocol.hrl index 48e1924..c3962b6 100644 --- a/include/riak_mongo_protocol.hrl +++ b/include/riak_mongo_protocol.hrl @@ -3,7 +3,7 @@ request_id :: integer(), dbcoll :: binary(), continueonerror = false :: boolean(), - documents :: [bson:document()] }). + documents :: [tuple()] }). -record (mongo_update, { request_id :: integer(), @@ -37,7 +37,7 @@ skip = 0 :: integer(), batchsize = 0 :: integer(), %% negative closes cursor - selector :: bson:document(), + selector :: tuple(), projector = [] :: bson:document() }). -record (mongo_getmore, { diff --git a/src/riak_mongo_bson.erl b/src/riak_mongo_decenc.erl similarity index 70% rename from src/riak_mongo_bson.erl rename to src/riak_mongo_decenc.erl index cc615e6..e0d5a1f 100644 --- a/src/riak_mongo_bson.erl +++ b/src/riak_mongo_decenc.erl @@ -17,28 +17,34 @@ %% %% @author Pavlo Baron -%% @doc This is the wrapper around the mongo-bson encoder/decoder +%% @doc Utils around mongo-bson and JSON encoding/decoding %% @copyright 2012 Pavlo Baron --module(riak_mongo_bson). +-module(riak_mongo_decenc). --export([encode_bson/1, decode_bson/1]). +-export([encode_struct/1, decode_bson/1]). -include_lib("bson/include/bson_binary.hrl"). -spec decode_bson(binary()) -> {binary(), binary()}. +decode_bson(<<>>) -> + []; decode_bson(<>) -> S = N - 5, - <> = RawBson, + <> = RawBson, RawStruct = do_fields(DB), - ID = [V || {K, V} <- RawStruct, K =:= <<"_id">>], - {list_to_binary(ID), {struct, RawStruct}}. + TID = lists:keyfind(<<"_id">>, 1, RawStruct), + ID = case is_tuple(TID) of + true -> element(2, TID); + _ -> <<>> + end, + [{ID, {struct, RawStruct}}|decode_bson(Rest)]. do_fields(<<>>) -> []; do_fields(B) -> {N, V, B1} = bson_binary:get_field(B), [{N, V} | do_fields(B1)]. --spec encode_bson(binary()) -> binary(). -encode_bson(Struct) -> +-spec encode_struct(tuple()) -> binary(). +encode_struct(Struct) -> iolist_to_binary(mochijson2:encode(Struct)). diff --git a/src/riak_mongo_message.erl b/src/riak_mongo_message.erl index 8f31faf..de67d99 100644 --- a/src/riak_mongo_message.erl +++ b/src/riak_mongo_message.erl @@ -27,10 +27,12 @@ -include ("riak_mongo_protocol.hrl"). -include_lib("riak_mongo_state.hrl"). -process_message(#mongo_query{ dbcoll= <<"admin.$cmd">>, selector={whatsmyuri, 1}}, State) -> +process_message(#mongo_query{ dbcoll= <<"admin.$cmd">>, selector={struct,[{<<"whatsmyuri">>,1}]}}, State) -> + {reply, #mongo_reply{ documents=[{binary_to_atom(iolist_to_binary(you(State)), utf8), 1}]}, State}; -process_message(#mongo_query{ dbcoll= <<"admin.$cmd">>, selector={replSetGetStatus, 1, forShell, 1}}, State) -> +process_message(#mongo_query{ dbcoll= <<"admin.$cmd">>, selector={struct, [{<<"replSetGetStatus">>, 1}, + {<<"forShell">>, 1}]}}, State) -> {reply, #mongo_reply{ documents=[{binary_to_atom(iolist_to_binary("not running with --replSet"), utf8), 1}]}, State}; @@ -38,6 +40,10 @@ process_message(#mongo_query{}=Message, State) -> error_logger:info_msg("unhandled query: ~p~n", [Message]), {reply, #mongo_reply{ queryerror=true }, State}; +process_message(#mongo_insert{dbcoll=DbCol, documents=Documents}, State) -> + process_insert(DbCol, Documents), + {noreply, State}; + process_message(Message, State) -> error_logger:info_msg("unhandled message: ~p~n", [Message]), {noreply, State}. @@ -46,3 +52,9 @@ process_message(Message, State) -> you(#state{peer=Peer}) -> {ok, {{A, B, C, D}, P}} = Peer, %IPv6??? io_lib:format("~p.~p.~p.~p:~p", [A, B, C, D, P]). + +process_insert(_, []) -> + ok; +process_insert(DbCol, [Document|L]) -> + riak_mongo_store:insert(DbCol, Document), + process_insert(DbCol, L). diff --git a/src/riak_mongo_protocol.erl b/src/riak_mongo_protocol.erl index 2518f61..0e6b415 100644 --- a/src/riak_mongo_protocol.erl +++ b/src/riak_mongo_protocol.erl @@ -67,10 +67,10 @@ bit(false) -> 0. decode_packet( << ?HDR(_, ?InsertOpcode), ?get_bits32(0,0,0,0,0,0,0,ContinueOnError), Rest/binary >> ) -> {DBColl, Rest1} = bson_binary:get_cstring(Rest), - BsonDocs = get_all_docs(Rest1), + Docs = riak_mongo_decenc:decode_bson(Rest1), {ok, #mongo_insert{ dbcoll=DBColl, request_id=RequestId, - documents=BsonDocs, + documents=Docs, continueonerror = bool(ContinueOnError) }}; @@ -109,8 +109,11 @@ decode_packet(<< ?HDR(_, ?QueryOpcode), Rest/binary >>) -> {DBColl, Rest1} = bson_binary:get_cstring(Rest), << ?get_int32(NumberToSkip), ?get_int32(NumberToReturn), Rest2/binary >> = Rest1, - [Query | ReturnFieldSelectors ] = get_all_docs(Rest2), - + [{_, Query} | TReturnFieldSelectors] = riak_mongo_decenc:decode_bson(Rest2), + ReturnFieldSelectors = case TReturnFieldSelectors of + [] -> {struct, []}; + _ -> element(2, TReturnFieldSelectors) + end, {ok, #mongo_query{ request_id=RequestId, dbcoll=DBColl, tailablecursor=bool(Tailable), @@ -165,14 +168,14 @@ encode_packet(#mongo_reply{ %% %% %% -get_all_docs(Binary) -> - get_all_docs(Binary, []). - -get_all_docs(<<>>, Acc) -> - lists:reverse(Acc); -get_all_docs(Data, Acc) -> - {Doc, Rest} = bson_binary:get_document(Data), - get_all_docs(Rest, [Doc|Acc]). +%get_all_docs(Binary) -> +% get_all_docs(Binary, []). + +%get_all_docs(<<>>, Acc) -> +% lists:reverse(Acc); +%get_all_docs(Data, Acc) -> +% {Doc, Rest} = riak:get_document(Data), +% get_all_docs(Rest, [Doc|Acc]). get_int64_list(Num, Binary) -> diff --git a/src/riak_mongo_store.erl b/src/riak_mongo_store.erl index e4403d6..ca82181 100644 --- a/src/riak_mongo_store.erl +++ b/src/riak_mongo_store.erl @@ -22,12 +22,13 @@ -module(riak_mongo_store). -%-export([insert/1, find/1]). +-export([insert/2]). -%insert(#mongo_insert{dbcol=Collection, documents=Doc}) -> -% {ok, C} = riak:local_client(), -% O = riak_object:new(Collection, ID, Doc), -% C:put(O). +-spec insert(binary(), tuple()) -> term(). +insert(Bucket, {ID, Struct}) -> + {ok, C} = riak:local_client(), + O = riak_object:new(Bucket, ID, riak_mongo_decenc:encode_struct(Struct)), + C:put(O). %find(#mongo_query=Query) -> % query!!!