Skip to content

Commit

Permalink
Rebuilt code base, supervised worker (need tests), merged with drkrab
Browse files Browse the repository at this point in the history
  • Loading branch information
pavlobaron committed Apr 15, 2012
1 parent 782dfbf commit 4ceecd3
Show file tree
Hide file tree
Showing 13 changed files with 251 additions and 397 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,5 @@ deps/*

.eunit
*/*~

rebuild.sh
2 changes: 0 additions & 2 deletions README
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ And do some basic commands:
> db.things.findOne()
> db.things.insert({a:1, b:2})

getLastError is not yet supported.

Buckets in the Riak store will be named like "collection.things" - the prefix is thus the name of the database you connect from the Mongo shell to.

No auth, SSL and no IPv6 are considered yet - the implementiation is growing.
Expand Down
File renamed without changes.
4 changes: 2 additions & 2 deletions include/riak_mongo_state.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,6 @@
%% @doc Global state records
%% @copyright 2012 Pavlo Baron

-record(worker_state, {sock, request_id=0, peer, rest, lastError=[]}).
-record(worker_state, {sock, request_id=0, rest, lastError=[]}).

-record(server_state, {old_owner :: pid(), sock}).
-record(server_state, {owner :: pid(), sock}).
2 changes: 1 addition & 1 deletion src/riak_mongo.app.src
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,4 @@
{riak_mongo, []}
},
{env, []
}]}.
}]}.
4 changes: 2 additions & 2 deletions src/riak_mongo_bson2.erl → src/riak_mongo_bson.erl
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@
%%
%% @copyright 2012 Trifork

-module(riak_mongo_bson2).
-module(riak_mongo_bson).

-export([get_document/1, get_raw_document/1, get_cstring/1, value_type/1]).
-export([encode_document/1]).

-include("riak_mongo_bson2.hrl").
-include("riak_mongo_bson.hrl").

-define(DOUBLE_TAG, 16#01).
-define(STRING_TAG, 16#02).
Expand Down
61 changes: 41 additions & 20 deletions src/riak_mongo_message.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
%% This file is part of riak_mongo
%%
%% Copyright (c) 2012 by Pavlo Baron (pb at pbit dot org)
%% Copyright (c) 2012 by Trifork
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
Expand All @@ -23,14 +24,44 @@

-module(riak_mongo_message).

-export([process_message/2]).
-export([process_messages/2]).

-include ("riak_mongo_protocol.hrl").
-include_lib("riak_mongo_state.hrl").
-include("riak_mongo_state.hrl").
-include_lib("bson/include/bson_binary.hrl").

-define(CMD,<<"$cmd">>).
-define(ADM,<<"admin">>).

%%
%% loop over messages
%%
process_messages([], State) ->
State;
process_messages([Message|Rest], State) ->
error_logger:info_msg("processing ~p~n", [Message]),
case process_message(Message, State) of
{noreply, OutState} ->
ok;

{reply, Reply, #worker_state{sock=Sock}=State2} ->
MessageID = element(2, Message),
ReplyMessage = Reply#mongo_reply{ request_id = State2#worker_state.request_id,
reply_to = MessageID },
OutState = State2#worker_state{ request_id = (State2#worker_state.request_id+1) },

error_logger:info_msg("replying ~p~n", [ReplyMessage]),

{ok, Packet} = riak_mongo_protocol:encode_packet(ReplyMessage),
Size = byte_size(Packet),
gen_tcp:send(Sock, <<?put_int32(Size+4), Packet/binary>>)
end,

process_messages(Rest, OutState);
process_messages(A1,A2) ->
error_logger:info_msg("BAD ~p,~p~n", [A1,A2]),
exit({badarg,A1,A2}).

process_message(#mongo_query{ db=DataBase, coll=?CMD,
selector=Selector}, State) ->

Expand All @@ -44,36 +75,26 @@ process_message(#mongo_query{ db=DataBase, coll=?CMD,

process_message(#mongo_query{}=Message, State) ->

Result = riak_mongo_store:find(Message),
Result = riak_mongo_riak:find(Message),
{reply, #mongo_reply{ documents=Result, queryerror=false }, State};

process_message(#mongo_insert{}=Insert, State) ->
State2 = riak_mongo_store:insert(Insert, State),
State2 = riak_mongo_riak:insert(Insert, State),
{noreply, State2};

process_message(#mongo_delete{}=Delete, State) ->
State2 = riak_mongo_store:delete(Delete, State),
State2 = riak_mongo_riak:delete(Delete, State),
{noreply, State2};

process_message(#mongo_insert{dbcoll=DbCol, documents=Documents}, State) ->
process_insert(DbCol, Documents),
{noreply, State};

process_message(Message, State) ->
error_logger:info_msg("unhandled message: ~p~n", [Message]),
{noreply, State}.

%% internals
you(#state{peer=Peer}) ->
{ok, {{A, B, C, D}, P}} = Peer, %IPv6???
you(#worker_state{sock=Sock}) ->
{ok, {{A, B, C, D}, P}} = inet:peername(Sock), %IPv6???
io_lib:format("~p.~p.~p.~p:~p", [A, B, C, D, P]).

process_insert(_, []) ->
ok;
process_insert(DbCol, [Document|L]) ->
riak_mongo_riak:insert(DbCol, Document),
process_insert(DbCol, L).

db_command(?ADM, <<"whatsmyuri">>, _Options, State) ->
{ok, [{you, {utf8, you(State)}}, {ok, 1}], State};

Expand All @@ -82,11 +103,11 @@ db_command(?ADM, <<"replSetGetStatus">>, _Options, State) ->
{ok, [{ok, false}], State};

db_command(_DataBase, <<"getlasterror">>, _Options, State) ->
case State#state.lastError of
case State#worker_state.lastError of
[] ->
{ok, [{ok,true}], State#state{lastError=[]}};
{ok, [{ok,true}], State#worker_state{lastError=[]}};
MSG ->
{ok, [{err, io:format("~p", MSG)}], State#state{lastError=[]}}
{ok, [{err, io:format("~p", MSG)}], State#worker_state{lastError=[]}}
end;

db_command(DataBase, Command, _Options, State) ->
Expand Down
24 changes: 12 additions & 12 deletions src/riak_mongo_protocol.erl
Original file line number Diff line number Diff line change
Expand Up @@ -66,19 +66,19 @@ bit(false) -> 0.


decode_packet( << ?HDR(_, ?InsertOpcode), ?get_bits32(0,0,0,0,0,0,0,ContinueOnError), Rest/binary >> ) ->
{DBColl, Rest1} = riak_mongo_bson2:get_cstring(Rest),
{DBColl, Rest1} = riak_mongo_bson:get_cstring(Rest),
BsonDocs = get_all_raw_docs(Rest1),
{ok, #mongo_insert{ dbcoll=DBColl,
request_id=RequestId,
documents=Docs,
documents=BsonDocs,
continueonerror = bool(ContinueOnError)
}};

decode_packet(<< ?HDR(_, ?UpdateOpcode), 0:32, Rest/binary>> ) ->
{DBColl, Rest1} = riak_mongo_bson2:get_cstring(Rest),
{DBColl, Rest1} = riak_mongo_bson:get_cstring(Rest),
<<?get_bits32(0,0,0,0,0,0,MultiUpdate,Upsert), Rest2>> = Rest1,
{Selector, Rest3} = riak_mongo_bson2:get_document(Rest2),
{Update, <<>>} = riak_mongo_bson2:get_document(Rest3),
{Selector, Rest3} = riak_mongo_bson:get_document(Rest2),
{Update, <<>>} = riak_mongo_bson:get_document(Rest3),

{ok, #mongo_update{ dbcoll=DBColl,
request_id=RequestId,
Expand All @@ -90,9 +90,9 @@ decode_packet(<< ?HDR(_, ?UpdateOpcode), 0:32, Rest/binary>> ) ->
}};

decode_packet(<< ?HDR(_, ?DeleteOpcode), 0:32, Rest/binary >>) ->
{DBColl, Rest1} = riak_mongo_bson2:get_cstring(Rest),
{DBColl, Rest1} = riak_mongo_bson:get_cstring(Rest),
<<?get_bits32(0,0,0,0,0,0,0,SingleRemove), Rest2>> = Rest1,
{Selector, <<>>} = riak_mongo_bson2:get_document(Rest2),
{Selector, <<>>} = riak_mongo_bson:get_document(Rest2),

{ok, #mongo_delete{ dbcoll=DBColl,
request_id=RequestId,
Expand All @@ -107,7 +107,7 @@ decode_packet(<< ?HDR(_, ?KillcursorOpcode), 0:32, ?get_int32(NumCursorIDs), Res
decode_packet(<< ?HDR(_, ?QueryOpcode),
?get_bits32(Partial,Exhaust,AwaitData,NoCursorTimeout,OplogReplay,SlaveOK,Tailable,0),
Rest/binary >>) ->
{DBColl, Rest1} = riak_mongo_bson2:get_cstring(Rest),
{DBColl, Rest1} = riak_mongo_bson:get_cstring(Rest),
<< ?get_int32(NumberToSkip), ?get_int32(NumberToReturn), Rest2/binary >> = Rest1,
[Query | ReturnFieldSelectors ] = get_all_docs(Rest2),

Expand All @@ -129,7 +129,7 @@ decode_packet(<< ?HDR(_, ?QueryOpcode),
projector=ReturnFieldSelectors }};

decode_packet(<< ?HDR(_, ?GetmoreOpcode), 0:32, Rest/binary >>) ->
{DBColl, Rest1} = riak_mongo_bson2:get_cstring(Rest),
{DBColl, Rest1} = riak_mongo_bson:get_cstring(Rest),
<< ?get_int32(NumberToReturn), ?get_int64(CursorID) >> = Rest1,

{ok, #mongo_getmore{ request_id=RequestId,
Expand Down Expand Up @@ -162,7 +162,7 @@ encode_packet(#mongo_reply{
?put_int64(CursorID),
?put_int32(StartingFrom),
?put_int32(length(Documents)),
<< <<(riak_mongo_bson2:encode_document (Doc)) /binary>> || Doc <- Documents>> /binary >>}.
<< <<(riak_mongo_bson:encode_document (Doc)) /binary>> || Doc <- Documents>> /binary >>}.

%%
%%
Expand All @@ -173,7 +173,7 @@ get_all_docs(Binary) ->
get_all_docs(<<>>, Acc) ->
lists:reverse(Acc);
get_all_docs(Data, Acc) ->
{Doc, Rest} = riak_mongo_bson2:get_document(Data),
{Doc, Rest} = riak_mongo_bson:get_document(Data),
get_all_docs(Rest, [Doc|Acc]).

get_all_raw_docs(Binary) ->
Expand All @@ -182,7 +182,7 @@ get_all_raw_docs(Binary) ->
get_all_raw_docs(<<>>, Acc) ->
lists:reverse(Acc);
get_all_raw_docs(Data, Acc) ->
{Doc, Rest} = riak_mongo_bson2:get_raw_document(Data),
{Doc, Rest} = riak_mongo_bson:get_raw_document(Data),
get_all_raw_docs(Rest, [Doc|Acc]).

get_int64_list(Num, Binary) ->
Expand Down
4 changes: 2 additions & 2 deletions src/riak_mongo_query.erl
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

-module(riak_mongo_query).

-include("riak_mongo_bson2.hrl").
-include("riak_mongo_bson.hrl").
-include_lib("eunit/include/eunit.hrl").

-export([matches/2, compile/1]).
Expand Down Expand Up @@ -94,7 +94,7 @@ elem_match({_Key,Value}, '$size', QueryValue) when length(Value) == QueryValue -
true;

elem_match({_Key,Value}, '$type', QueryValue) ->
riak_mongo_bson2:value_type(Value) =:= QueryValue;
riak_mongo_bson:value_type(Value) =:= QueryValue;

elem_match(_KeyValue, '$exists', Bool) when is_boolean(Bool) ->
Bool;
Expand Down
Loading

0 comments on commit 4ceecd3

Please sign in to comment.