Permalink
Browse files

startet supervision, some reorga again

  • Loading branch information...
1 parent 60afd4b commit 8dc593533fa5a58aa96d25ca327de38ed74fc858 @pavlobaron committed Apr 14, 2012
View
22 README
@@ -1,10 +1,14 @@
-riak_mongo makes Riak look like Mongo to clients. In the first step, it will allow Mongo drivers to seamlessly connect to it using Mongo Wire Protocol and to map to the underlying Riak data store. In the next step it also might be interesting to have a Mongo based Riak backend.
+riak_mongo makes Riak look like Mongo to clients.
+
+In the first step, it will allow Mongo drivers to seamlessly connect to it using Mongo Wire Protocol and to map to the underlying Riak data store. This can help migrate the data store of existing MongoDB based applications to Riak.
+
+In the next step it also might be interesting to have a Mongo based Riak backend.
Run it in a Riak node instance using:
> riak_mongo:start(temporary, "127.0.0.1", 32323).
-I use the low level riak_kv Erlang API to talk to the local store. Right now, you can connect to it from the Mongo shell on the same machine using:
+We use the low level riak_kv Erlang API to talk to the local store. You can connect to it from the Mongo shell (in this case, on the same machine) using:
$ mongo --verbose -port 32323 collection
@@ -13,11 +17,15 @@ And do some basic commands:
> db.things.findOne()
> db.things.insert({a:1, b:2})
-though getLastError is not yet supported.
+getLastError is not yet supported.
+
+Buckets in the Riak store will be named like "collection.things" - the prefix is thus the name of the database you connect from the Mongo shell to.
+
+No auth, SSL and no IPv6 are considered yet - the implementiation is growing.
-Buckets in the Riak store must be named like "collection.things" - the prefix is thus the name of the database you connect from the Mongo shell to.
+Authors:
-No auth, SSL and no IPv6 are considered yet - the implementiation is growing. Also the beauty of the code can be discussed, but shouldn't be yet.
+Pavlo Baron (pb@pbit.org)
+Kresten "the coding god" Krab Thorup (krab@trifork.com)
-The very huge task will of course be to understand and translate to Riak all the
-mongo commands. But hey, the first step is done.
+Feedback and helpers are welcome at any time.
View
4 include/riak_mongo_state.hrl
@@ -20,4 +20,6 @@
%% @doc Global state records
%% @copyright 2012 Pavlo Baron
--record(state, {owner :: pid(), sock, request_id=0, peer}).
+-record(worker_state, {sock, request_id=0, peer, rest}).
+
+-record(server_state, {old_owner :: pid(), sock}).
View
2 src/riak_mongo_bson2.erl
@@ -307,7 +307,7 @@ encode_element(Name, {md5, Value}) when is_binary(Value) ->
encode_element(Name, undefined) ->
<<?UNDEFINED_TAG, (encode_cstring(Name))/binary>>;
-encode_element(Name, {objectid, ID}) when byte_size(ID) =:= 12 ->
+encode_element(Name, {objectid, ID}) ->
<<?OBJECTID_TAG, (encode_cstring(Name))/binary, ID/binary>>;
encode_element(Name, true) ->
View
54 src/riak_mongo_decenc.erl
@@ -1,54 +0,0 @@
-%%
-%% This file is part of riak_mongo
-%%
-%% Copyright (c) 2012 by Pavlo Baron (pb at pbit dot org)
-%%
-%% Licensed under the Apache License, Version 2.0 (the "License");
-%% you may not use this file except in compliance with the License.
-%% You may obtain a copy of the License at
-%%
-%% http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an "AS IS" BASIS,
-%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-%% See the License for the specific language governing permissions and
-%% limitations under the License.
-%%
-
-%% @author Pavlo Baron <pb at pbit dot org>
-%% @doc Utils around mongo-bson and JSON encoding/decoding
-%% @copyright 2012 Pavlo Baron
-
--module(riak_mongo_decenc).
-
--export([encode_struct/1, decode_bson/1]).
-
--include_lib("bson/include/bson_binary.hrl").
-
--spec decode_bson(binary()) -> {binary(), binary()}.
-decode_bson(<<>>) ->
- [];
-decode_bson(<<?get_int32(N), RawBson/binary>>) ->
- S = N - 5,
- <<DB:S/binary, 0:8, Rest/binary>> = RawBson,
- RawStruct = do_fields(DB),
- TID = lists:keyfind(<<"_id">>, 1, RawStruct),
- ID = case is_tuple(TID) of
- true -> element(2, TID);
- _ -> <<>>
- end,
- [{ID, {struct, RawStruct}}|decode_bson(Rest)].
-
-do_fields(<<>>) -> [];
-do_fields(B) ->
- {N, V, B1} = bson_binary:get_field(B),
- [{N, V} | do_fields(B1)].
-
--spec encode_struct(tuple()) -> binary().
-encode_struct(Struct) ->
-
-
-error_logger:info_msg("~p", [mochijson2:encode(Struct)]),
-
- iolist_to_binary(mochijson2:encode(Struct)).
View
2 src/riak_mongo_message.erl
@@ -58,7 +58,7 @@ you(#state{peer=Peer}) ->
process_insert(_, []) ->
ok;
process_insert(DbCol, [Document|L]) ->
- riak_mongo_store:insert(DbCol, Document),
+ riak_mongo_riak:insert(DbCol, Document),
process_insert(DbCol, L).
admin_command(<<"whatsmyuri">>, _Options, State) ->
View
2 src/riak_mongo_protocol.erl
@@ -67,7 +67,7 @@ bit(false) -> 0.
decode_packet( << ?HDR(_, ?InsertOpcode), ?get_bits32(0,0,0,0,0,0,0,ContinueOnError), Rest/binary >> ) ->
{DBColl, Rest1} = riak_mongo_bson2:get_cstring(Rest),
- BsonDocs = get_all_docs(Rest1),
+ Docs = get_all_docs(Rest1),
{ok, #mongo_insert{ dbcoll=DBColl,
request_id=RequestId,
documents=Docs,
View
95 src/riak_mongo_riak.erl
@@ -0,0 +1,95 @@
+%%
+%% This file is part of riak_mongo
+%%
+%% Copyright (c) 2012 by Pavlo Baron (pb at pbit dot org)
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%
+
+%% @author Pavlo Baron <pb at pbit dot org>
+%% @doc Here we speak to the Riak store
+%% @copyright 2012 Pavlo Baron
+
+-module(riak_mongo_riak).
+
+-export([insert/2]).
+
+-include("riak_mongo_bson2.hrl").
+
+-spec insert(bson_objectid(), bson_document()) ->
+ ok |
+ {error, too_many_fails} |
+ {error, timeout} |
+ {error, {n_val_violation, N::integer()}}.
+insert(Bucket, Document) ->
+ %{ok, C} = riak:local_client(),
+ %O = riak_object:new(Bucket, ID, to_json(Document)),
+ %C:put(O).
+ to_json(Document).
+
+%find(#mongo_query=Query) ->
+ % query!!!
+ % if NumberToReturn < 0 then we close the cursor (cursor? state?)
+% {ok, C} = riak:local_client(),
+% {ok, L} = C:list_keys(Collection),
+% collect_objects(C, L).
+
+%collect_objects(_, []) ->
+% [];
+%collect_objects(C, [Key|L]) ->
+% {ok, O} = C:get(Collection, Key),
+% [riak_object:get_value(O)|collect_objects(C, L)].
+
+
+
+%% internals
+
+to_json(Document) ->
+
+
+error_logger:info_msg("~p~n", [term_to_binary(Document)]),
+
+ iolist_to_binary(mochijson2:encode(term_to_binary(Document))).
+
+encode_document(Document) ->
+ encode_element(Document).
+
+encode_elements([]) ->
+ [];
+encode_elements([Element|T]) ->
+ [encode_element(Element)|encode_elements(T)].
+
+encode_element(Element) when is_atom(Element) ->
+
+
+ error_logger:info_msg("atom E: ~p~n", [Element]),
+
+
+
+ %S = atom_to_list(Element),
+ <<"aaa">>;
+encode_element(Element) when is_list(Element) ->
+
+error_logger:info_msg("list E: ~p~n", [Element]),
+
+ case is_tuple(lists:nth(1, Element)) of
+ true -> encode_elements(Element);
+ false -> Element
+ end,
+ encode_elements(Element);
+encode_element(Element) when is_tuple(Element) ->
+
+error_logger:info_msg("tuple E: ~p~n", [Element]),
+
+ list_to_tuple(encode_elements(tuple_to_list(Element)));
+encode_element(Element) -> Element.
View
75 src/riak_mongo_server.erl
@@ -22,85 +22,28 @@
-module(riak_mongo_server).
--include_lib ("bson/include/bson_binary.hrl").
--include ("riak_mongo_protocol.hrl").
--include_lib("riak_mongo_state.hrl").
-
-export([start_link/2, handle_info/2, new_connection/2, init/1, sock_opts/0]).
-behavior(gen_nb_server).
+-include("riak_mongo_state.hrl").
+
start_link(IpAddr, Port) ->
gen_nb_server:start_link(?MODULE, IpAddr, Port, []).
init(_Args) ->
- {ok, ok}.
+ {ok, #server_state{old_owner=self()}}.
+handle_info({controlling_process, Pid}, State) ->
+ gen_tcp:controlling_process(State#server_state.sock, Pid),
+ {reply, State};
handle_info(_Msg, State) ->
{noreply, State}.
new_connection(Sock, State) ->
- Me = self(),
- P = spawn(fun() -> worker(Me) end),
- gen_tcp:controlling_process(Sock, P),
- P ! {set_socket, Sock},
- {ok, State}.
+ NewState = #server_state{old_owner=riak_mongo_worker_sup:new_worker(Sock, State),
+ sock=Sock},
+ {ok, NewState}.
sock_opts() ->
[binary, {active, once}, {packet, 0}].
-
-%% this should really be a process under supervision
-
-worker(Owner) ->
- receive {set_socket, Sock} -> ok end,
- inet:setopts(Sock, [{active, once}]),
- worker_loop(#state{ owner=Owner, sock=Sock, peer=inet:peername(Sock) }, <<>>).
-
-worker_loop(#state{sock=Sock}=State, UnprocessedData) ->
- receive
- {tcp, Sock, Data} ->
- {Messages, Rest} = riak_mongo_protocol:decode_wire(<<UnprocessedData/binary, Data/binary>>),
- State2 = process_messages(Messages, State),
- inet:setopts(Sock, [{active, once}]),
- worker_loop(State2, Rest);
-
- {tcp_closed, Sock} ->
- ok;
-
- Msg ->
- error_logger:info_msg("unknown message in worker loop: ~p~n", [Msg]),
- exit(bad_msg)
-
- %% timeout?
- end.
-
-
-%%
-%% loop over messages
-%%
-process_messages([], State) ->
- State;
-process_messages([Message|Rest], State) ->
- error_logger:info_msg("processing ~p~n", [Message]),
- case riak_mongo_message:process_message(Message, State) of
- {noreply, OutState} ->
- ok;
-
- {reply, Reply, #state{ sock=Sock }=State2} ->
- MessageID = element(2, Message),
- ReplyMessage = Reply#mongo_reply{ request_id = State2#state.request_id,
- reply_to = MessageID },
- OutState = State2#state{ request_id = (State2#state.request_id+1) },
-
- error_logger:info_msg("replying ~p~n", [ReplyMessage]),
-
- {ok, Packet} = riak_mongo_protocol:encode_packet(ReplyMessage),
- Size = byte_size(Packet),
- gen_tcp:send(Sock, <<?put_int32(Size+4), Packet/binary>>)
- end,
-
- process_messages(Rest, OutState);
-process_messages(A1,A2) ->
- error_logger:info_msg("BAD ~p,~p~n", [A1,A2]),
- exit({badarg,A1,A2}).
-
View
7 src/riak_mongo_sup.erl
@@ -16,7 +16,7 @@
%% limitations under the License.
%%
-%% @author Pavlo Baron <pb at pbit dot org> (borrowed from Kevin Smith)
+%% @author Pavlo Baron <pb at pbit dot org>
%% @doc This is the mail supervisor of riak_mongo
%% @copyright 2012 Pavlo Baron
@@ -32,4 +32,7 @@ init(Args) ->
ServerSpec = {server,
{riak_mongo_server, start_link, Args},
transient, 2000, worker, [riak_mongo_server]},
- {ok, {{one_for_one, 2, 10}, [ServerSpec]}}.
+ WorkerSupervisorSpec = {worker_sup,
+ {riak_mongo_worker_sup, start_link, []},
+ transient, 2000, worker, [riak_mongo_worker_sup]},
+ {ok, {{one_for_one, 2, 10}, [ServerSpec, WorkerSupervisorSpec]}}.
View
89 src/riak_mongo_worker.erl
@@ -0,0 +1,89 @@
+%%
+%% This file is part of riak_mongo
+%%
+%% Copyright (c) 2012 by Pavlo Baron (pb at pbit dot org)
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%
+
+%% @author Pavlo Baron <pb at pbit dot org>
+%% @doc This is the worker
+%% @copyright 2012 Pavlo Baron
+
+-module(riak_mongo_worker).
+
+-export([start_link/2, handle_info/2]).
+
+-behavior(gen_server).
+
+-include("riak_mongo_protocol.hrl").
+-include("riak_mongo_state.hrl").
+-include_lib("bson/include/bson_binary.hrl").
+
+start_link(Sock, OldOwner) ->
+ gen_server:start_link(?MODULE, [Sock, OldOwner]).
+
+init(Sock, OldOwner) ->
+ inet:setopts(Sock, [{active, once}]),
+ OldOwner ! {controlling_process, self()},
+ {ok, #worker_state{sock=Sock, peer=inet:peername(Sock)}}.
+
+handle_info(Msg, State) ->
+ case Msg of
+ {tcp, Sock, Data} ->
+ UnprocessedData = State#state.rest,
+ {Messages, Rest} = riak_mongo_protocol:decode_wire(<<UnprocessedData/binary, Data/binary>>),
+ State2 = process_messages(Messages, State),
+ inet:setopts(Sock, [{active, once}]),
+ State3 = State2#worker_state{rest=Rest},
+ {reply, State3};
+
+ {tcp_closed, Sock} ->
+ {reply, ok};
+
+ Msg ->
+ error_logger:info_msg("unknown message in worker callback: ~p~n", [Msg]),
+ {noreply, State}
+
+ %% timeout?
+ end.
+
+%%
+%% loop over messages
+%%
+process_messages([], State) ->
+ State;
+process_messages([Message|Rest], State) ->
+ error_logger:info_msg("processing ~p~n", [Message]),
+ case riak_mongo_message:process_message(Message, State) of
+ {noreply, OutState} ->
+ ok;
+
+ {reply, Reply, #worker_state{sock=Sock}=State2} ->
+ MessageID = element(2, Message),
+ ReplyMessage = Reply#mongo_reply{ request_id = State2#state.request_id,
+ reply_to = MessageID },
+ OutState = State2#worker_state{ request_id = (State2#state.request_id+1) },
+
+ error_logger:info_msg("replying ~p~n", [ReplyMessage]),
+
+ {ok, Packet} = riak_mongo_protocol:encode_packet(ReplyMessage),
+ Size = byte_size(Packet),
+ gen_tcp:send(Sock, <<?put_int32(Size+4), Packet/binary>>)
+ end,
+
+ process_messages(Rest, OutState);
+process_messages(A1,A2) ->
+ error_logger:info_msg("BAD ~p,~p~n", [A1,A2]),
+ exit({badarg,A1,A2}).
+
View
32 src/riak_mongo_store.erl → src/riak_mongo_worker_sup.erl
@@ -17,28 +17,22 @@
%%
%% @author Pavlo Baron <pb at pbit dot org>
-%% @doc Here we speak to the Riak store
+%% @doc This is the worker supervisor
%% @copyright 2012 Pavlo Baron
--module(riak_mongo_store).
+-module(riak_mongo_worker_sup).
+-behaviour(supervisor).
--export([insert/2]).
+-export([start_link/0, init/1, new_worker/2]).
--spec insert(binary(), tuple()) -> term().
-insert(Bucket, {ID, Struct}) ->
- {ok, C} = riak:local_client(),
- O = riak_object:new(Bucket, ID, riak_mongo_decenc:encode_struct(Struct)),
- C:put(O).
+start_link() ->
+ supervisor:start_link({local, ?MODULE}, ?MODULE, [Sock]).
-%find(#mongo_query=Query) ->
- % query!!!
- % if NumberToReturn < 0 then we close the cursor (cursor? state?)
-% {ok, C} = riak:local_client(),
-% {ok, L} = C:list_keys(Collection),
-% collect_objects(C, L).
+init(Sock) ->
+ WorkerSpec = {worker,
+ {riak_mongo_worker, start_link, [Sock, whereis(server)]},
+ temporary, brutal_kill, worker, [riak_mongo_worker]},
+ {ok, {{simple_one_for_one, 0, 1}, [WorkerSpec]}}.
-%collect_objects(_, []) ->
-% [];
-%collect_objects(C, [Key|L]) ->
-% {ok, O} = C:get(Collection, Key),
-% [riak_object:get_value(O)|collect_objects(C, L)].
+new_worker(Sock, OldOwner) ->
+ supervisor:start_child(worker_sup, [Sock, OldOwner]).

0 comments on commit 8dc5935

Please sign in to comment.