Skip to content

Commit

Permalink
checking in core code
Browse files Browse the repository at this point in the history
  • Loading branch information
Jacob Vorreuter committed Sep 18, 2009
1 parent d84d7bf commit 549cbe5
Show file tree
Hide file tree
Showing 7 changed files with 218 additions and 20 deletions.
12 changes: 12 additions & 0 deletions include/emongo.hrl
@@ -0,0 +1,12 @@
-record(pool, {id, host, port, database, size=1, conn_pids=[], req_id=1}).
-record(header, {messageLength, requestID, responseTo, opCode}).
-record(response, {header, responseFlag, cursorID, startingFrom, numberReturned, documents}).

-define(OP_REPLY, 1).
-define(OP_MSG, 1000).
-define(OP_UPDATE, 2001).
-define(OP_INSERT, 2002).
-define(OP_QUERY, 2004).
-define(OP_GET_MORE, 2005).
-define(OP_DELETE, 2006).
-define(OP_KILL_CURSORS, 2007).
10 changes: 10 additions & 0 deletions priv/example.config
@@ -0,0 +1,10 @@
[{emongo, [
{pools, [
{test1, [
{size, 1},
{host, "localhost"},
{port, 27017},
{database, "testdatabase"}
]}
]}
]}].
85 changes: 67 additions & 18 deletions src/emongo.erl
Expand Up @@ -26,7 +26,9 @@
-export([start_link/0, init/1, handle_call/3, handle_cast/2, -export([start_link/0, init/1, handle_call/3, handle_cast/2,
handle_info/2, terminate/2, code_change/3]). handle_info/2, terminate/2, code_change/3]).


-export([find/2, find_one/2, insert/2, save/2, remove/2]). -export([pools/0, add_pool/5, insert/3]).

-include("emongo.hrl").


%%==================================================================== %%====================================================================
%% API %% API
Expand All @@ -38,6 +40,12 @@
start_link() -> start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).


pools() ->
gen_server:call(?MODULE, pools, infinity).

add_pool(PoolId, Host, Port, Database, Size) ->
gen_server:call(?MODULE, {add_pool, PoolId, Host, Port, Database, Size}, infinity).

%%show_dbs() -> ok. %%show_dbs() -> ok.


%%show_collections(Database) -> ok. %%show_collections(Database) -> ok.
Expand All @@ -48,26 +56,26 @@ start_link() ->


%%use_db(PoolId) -> ok. %%use_db(PoolId) -> ok.


find(PoolId, {obj, Props}) -> %find(PoolId, Collection, {obj, _}=Obj) ->
gen_server:call(?MODULE, {PoolId, {find, {obj, Props}}}, infinity).


find_one(PoolId, {obj, Props}) -> ok. %find_one(PoolId, Collection, {obj, _}=Obj) ->


insert(PoolId, {obj, Props}) -> ok. insert(PoolId, Collection, {obj, _}=Obj) ->
{Pid, Pool} = gen_server:call(?MODULE, {pid, PoolId}, infinity),
Packet = emongo_packet:insert(Pool#pool.database, Collection, Pool#pool.req_id, Obj),
emongo_conn:send(Pid, Pool#pool.req_id, Packet).


%%update %%update


save(PoolId, {obj, Props}) -> ok. %save(PoolId, {obj, Props}) -> ok.


remove(PoolId, {obj, Props}) -> ok. %remove(PoolId, {obj, Props}) -> ok.


%%ensure_index %%ensure_index


%%count %%count


%%drop_collection %drop_collection(PoolId, Collection) when is_atom(PoolId), is_list(Collection) ->

%%clear


%%==================================================================== %%====================================================================
%% gen_server callbacks %% gen_server callbacks
Expand All @@ -81,7 +89,7 @@ remove(PoolId, {obj, Props}) -> ok.
%% Description: Initiates the server %% Description: Initiates the server
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
init(_) -> init(_) ->
{ok, []}. {ok, initialize_pools()}.


%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} | %% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
Expand All @@ -92,13 +100,33 @@ init(_) ->
%% {stop, Reason, State} %% {stop, Reason, State}
%% Description: Handling call messages %% Description: Handling call messages
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
handle_call({PoolId, {find, {obj, Props}}}, _From, Pools) -> handle_call(pools, _From, Pools) ->
Pool = proplists:get_value(PoolId, Pools), {reply, Pools, Pools};

{reply, ok, Pools};


handle_call({add_pool, PoolId, Host, Port, Database, Size}, _From, Pools) ->
{Result, Pools1} =
case proplists:is_defined(PoolId, Pools) of
true ->
{{error, pool_already_exists}, Pools};
false ->
Pool = #pool{
id=PoolId,
host=Host,
port=Port,
database=Database,
size=Size
},
Pool1 = open_connections(Pool),
{ok, [{PoolId, Pool1}|Pools]}
end,
{reply, Result, Pools1};



handle_call({pid, PoolId}, _From, Pools) ->
Pool = proplists:get_value(PoolId, Pools),
Pool1 = Pool#pool{req_id = ((Pool#pool.req_id)+1)},
OtherPools = proplists:delete(PoolId, Pools),
Pid = get_conn_pid(Pool),
{reply, {Pid, Pool}, [Pool1|OtherPools]};


handle_call(_, _From, State) -> {reply, {error, invalid_call}, State}. handle_call(_, _From, State) -> {reply, {error, invalid_call}, State}.


Expand Down Expand Up @@ -140,6 +168,27 @@ code_change(_OldVsn, State, _Extra) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%%% Internal functions %%% Internal functions
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
initialize_pools() ->
case application:get_env(emongo, pools) of
undefined ->
[];
{ok, Pools} ->
[begin
Pool = #pool{
id = PoolId,
size = proplists:get_value(size, Props, 1),
host = proplists:get_value(host, Props, "localhost"),
port = proplists:get_value(port, Props, 27017),
database = proplists:get_value(database, Props, "test")
},
{PoolId, open_connections(Pool)}
end || {PoolId, Props} <- Pools]
end.

open_connections(Pool) ->
ConnPids = [begin
emongo_conn:start_link(Pool#pool.host, Pool#pool.port)
end || _ <- lists:seq(1, Pool#pool.size)],
Pool#pool{conn_pids=ConnPids}.



get_conn_pid(#pool{conn_pids=[Pid|_]}) -> Pid.

2 changes: 1 addition & 1 deletion src/emongo_app.erl
Expand Up @@ -32,7 +32,7 @@ stop(_) -> ok.


init(_) -> init(_) ->
{ok, {{one_for_one, 10, 10}, [ {ok, {{one_for_one, 10, 10}, [
%{ex_scheduler, {ex_scheduler, start_link, []}, permanent, 5000, worker, [ex_scheduler]} {emongo, {emongo, start_link, []}, permanent, 5000, worker, [emongo]}
]}}. ]}}.


build_rel() -> build_rel() ->
Expand Down
76 changes: 76 additions & 0 deletions src/emongo_conn.erl
@@ -0,0 +1,76 @@
%% Copyright (c) 2009 Jacob Vorreuter <jacob.vorreuter@gmail.com>
%%
%% Permission is hereby granted, free of charge, to any person
%% obtaining a copy of this software and associated documentation
%% files (the "Software"), to deal in the Software without
%% restriction, including without limitation the rights to use,
%% copy, modify, merge, publish, distribute, sublicense, and/or sell
%% copies of the Software, and to permit persons to whom the
%% Software is furnished to do so, subject to the following
%% conditions:
%%
%% The above copyright notice and this permission notice shall be
%% included in all copies or substantial portions of the Software.
%%
%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
%% EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
%% OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
%% NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
%% HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
%% WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
%% FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
%% OTHER DEALINGS IN THE SOFTWARE.
-module(emongo_conn).

-export([start_link/2, init/3, send/3, send_recv/3]).

-record(request, {req_id, requestor}).
-record(state, {socket, requests}).

-include("emongo.hrl").

start_link(Host, Port) ->
proc_lib:start_link(?MODULE, init, [Host, Port, self()]).

init(Host, Port, Parent) ->
Socket = open_socket(Host, Port),
proc_lib:init_ack(Parent, self()),
loop(#state{socket=Socket, requests=[]}).

send(Pid, ReqID, Packet) ->
gen:call(Pid, '$emongo_conn_send', {ReqID, Packet}).

send_recv(Pid, ReqID, Packet) ->
gen:call(Pid, '$emongo_conn_send_recv', {ReqID, Packet}).

loop(State) ->
receive
{'$emongo_conn_send', {From, Mref}, {_ReqID, Packet}} ->
gen_tcp:send(State#state.socket, Packet),
gen:reply({From, Mref}, ok),
loop(State);
{'$emongo_conn_send_recv', {From, Mref}, {ReqID, Packet}} ->
gen_tcp:send(State#state.socket, Packet),
gen:reply({From, Mref}, ok),
Request = #request{req_id=ReqID, requestor={From, Mref}},
State1 = State#state{requests=[Request|State#state.requests]},
loop(State1);
{tcp, _Sock, Data} ->
Resp = emongo_packet:decode_response(Data),
ResponseTo = (Resp#response.header)#header.responseTo,
case proplists:get_value(ResponseTo, State#state.requests) of
undefined ->
ok;
Requestor ->
gen:reply(Requestor, Resp)
end,
loop(State)
end.

open_socket(Host, Port) ->
case gen_tcp:connect(Host, Port, [binary, {active, true}]) of
{ok, Sock} ->
Sock;
{error, Reason} ->
exit({failed_to_open_socket, Reason})
end.
49 changes: 49 additions & 0 deletions src/emongo_packet.erl
@@ -0,0 +1,49 @@
%% Copyright (c) 2009 Jacob Vorreuter <jacob.vorreuter@gmail.com>
%%
%% Permission is hereby granted, free of charge, to any person
%% obtaining a copy of this software and associated documentation
%% files (the "Software"), to deal in the Software without
%% restriction, including without limitation the rights to use,
%% copy, modify, merge, publish, distribute, sublicense, and/or sell
%% copies of the Software, and to permit persons to whom the
%% Software is furnished to do so, subject to the following
%% conditions:
%%
%% The above copyright notice and this permission notice shall be
%% included in all copies or substantial portions of the Software.
%%
%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
%% EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
%% OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
%% NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
%% HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
%% WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
%% FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
%% OTHER DEALINGS IN THE SOFTWARE.
-module(emongo_packet).

-export([insert/4, decode_response/1]).

-include("emongo.hrl").

insert(Database, Collection, ReqID, {obj, Props}) ->
FullName = mongodb_bson:encode_cstring(lists:concat([Database, ".", Collection])),
EncodedDocument = mongodb_bson:encode({obj, Props}),
Message = <<0:32, FullName/binary, EncodedDocument/binary>>,
Length = byte_size(Message),
<<(Length+16):32/little-signed, ReqID:32/little-signed, 0:32, ?OP_INSERT:32/little-signed, Message/binary>>.

decode_response(<<Length:32/little-signed, ReqID:32/little-signed, RespTo:32/little-signed, Op:32/little-signed, Message/binary>>) ->
<<RespFlag:32/little-signed,
CursorID:64/little-signed,
StartingFrom:32/little-signed,
NumRet:32/little-signed,
Documents/binary>> = Message,
#response{
header = {header, Length, ReqID, RespTo, Op},
responseFlag = RespFlag,
cursorID = CursorID,
startingFrom = StartingFrom,
numberReturned = NumRet,
documents = mongodb_bson:decode(Documents)
}.
4 changes: 3 additions & 1 deletion t/001-load.t
@@ -1,10 +1,12 @@
#!/usr/local/bin/escript #!/usr/local/bin/escript
%% -*- erlang -*- %% -*- erlang -*-
%%! -pa ebin -sasl errlog_type error -boot start_sasl -noshell %%! -pa ebin -sasl errlog_type error -boot start_sasl -noshell -config priv/example


main(_) -> main(_) ->
etap:plan(unknown), etap:plan(unknown),
error_logger:tty(false), error_logger:tty(false),
etap_application:start_ok(emongo, "application 'emongo' started ok"), etap_application:start_ok(emongo, "application 'emongo' started ok"),


etap:is(length(emongo:pools()), 1, "one pool exists in state"),

etap:end_tests(). etap:end_tests().

0 comments on commit 549cbe5

Please sign in to comment.