diff --git a/include/emongo.hrl b/include/emongo.hrl index e69de29..2315c5f 100644 --- a/include/emongo.hrl +++ b/include/emongo.hrl @@ -0,0 +1,12 @@ +-record(pool, {id, host, port, database, size=1, conn_pids=[], req_id=1}). +-record(header, {messageLength, requestID, responseTo, opCode}). +-record(response, {header, responseFlag, cursorID, startingFrom, numberReturned, documents}). + +-define(OP_REPLY, 1). +-define(OP_MSG, 1000). +-define(OP_UPDATE, 2001). +-define(OP_INSERT, 2002). +-define(OP_QUERY, 2004). +-define(OP_GET_MORE, 2005). +-define(OP_DELETE, 2006). +-define(OP_KILL_CURSORS, 2007). \ No newline at end of file diff --git a/priv/example.config b/priv/example.config index e69de29..c5d41e5 100644 --- a/priv/example.config +++ b/priv/example.config @@ -0,0 +1,10 @@ +[{emongo, [ + {pools, [ + {test1, [ + {size, 1}, + {host, "localhost"}, + {port, 27017}, + {database, "testdatabase"} + ]} + ]} +]}]. \ No newline at end of file diff --git a/src/emongo.erl b/src/emongo.erl index cfd8817..fb5f444 100644 --- a/src/emongo.erl +++ b/src/emongo.erl @@ -26,7 +26,9 @@ -export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --export([find/2, find_one/2, insert/2, save/2, remove/2]). +-export([pools/0, add_pool/5, insert/3]). + +-include("emongo.hrl"). %%==================================================================== %% API @@ -38,6 +40,12 @@ start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). +pools() -> + gen_server:call(?MODULE, pools, infinity). + +add_pool(PoolId, Host, Port, Database, Size) -> + gen_server:call(?MODULE, {add_pool, PoolId, Host, Port, Database, Size}, infinity). + %%show_dbs() -> ok. %%show_collections(Database) -> ok. @@ -48,26 +56,26 @@ start_link() -> %%use_db(PoolId) -> ok. -find(PoolId, {obj, Props}) -> - gen_server:call(?MODULE, {PoolId, {find, {obj, Props}}}, infinity). +%find(PoolId, Collection, {obj, _}=Obj) -> -find_one(PoolId, {obj, Props}) -> ok. +%find_one(PoolId, Collection, {obj, _}=Obj) -> -insert(PoolId, {obj, Props}) -> ok. +insert(PoolId, Collection, {obj, _}=Obj) -> + {Pid, Pool} = gen_server:call(?MODULE, {pid, PoolId}, infinity), + Packet = emongo_packet:insert(Pool#pool.database, Collection, Pool#pool.req_id, Obj), + emongo_conn:send(Pid, Pool#pool.req_id, Packet). %%update -save(PoolId, {obj, Props}) -> ok. +%save(PoolId, {obj, Props}) -> ok. -remove(PoolId, {obj, Props}) -> ok. +%remove(PoolId, {obj, Props}) -> ok. %%ensure_index %%count -%%drop_collection - -%%clear +%drop_collection(PoolId, Collection) when is_atom(PoolId), is_list(Collection) -> %%==================================================================== %% gen_server callbacks @@ -81,7 +89,7 @@ remove(PoolId, {obj, Props}) -> ok. %% Description: Initiates the server %%-------------------------------------------------------------------- init(_) -> - {ok, []}. + {ok, initialize_pools()}. %%-------------------------------------------------------------------- %% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} | @@ -92,13 +100,33 @@ init(_) -> %% {stop, Reason, State} %% Description: Handling call messages %%-------------------------------------------------------------------- -handle_call({PoolId, {find, {obj, Props}}}, _From, Pools) -> - Pool = proplists:get_value(PoolId, Pools), - - {reply, ok, Pools}; +handle_call(pools, _From, Pools) -> + {reply, Pools, Pools}; +handle_call({add_pool, PoolId, Host, Port, Database, Size}, _From, Pools) -> + {Result, Pools1} = + case proplists:is_defined(PoolId, Pools) of + true -> + {{error, pool_already_exists}, Pools}; + false -> + Pool = #pool{ + id=PoolId, + host=Host, + port=Port, + database=Database, + size=Size + }, + Pool1 = open_connections(Pool), + {ok, [{PoolId, Pool1}|Pools]} + end, + {reply, Result, Pools1}; - +handle_call({pid, PoolId}, _From, Pools) -> + Pool = proplists:get_value(PoolId, Pools), + Pool1 = Pool#pool{req_id = ((Pool#pool.req_id)+1)}, + OtherPools = proplists:delete(PoolId, Pools), + Pid = get_conn_pid(Pool), + {reply, {Pid, Pool}, [Pool1|OtherPools]}; handle_call(_, _From, State) -> {reply, {error, invalid_call}, State}. @@ -140,6 +168,27 @@ code_change(_OldVsn, State, _Extra) -> %%-------------------------------------------------------------------- %%% Internal functions %%-------------------------------------------------------------------- +initialize_pools() -> + case application:get_env(emongo, pools) of + undefined -> + []; + {ok, Pools} -> + [begin + Pool = #pool{ + id = PoolId, + size = proplists:get_value(size, Props, 1), + host = proplists:get_value(host, Props, "localhost"), + port = proplists:get_value(port, Props, 27017), + database = proplists:get_value(database, Props, "test") + }, + {PoolId, open_connections(Pool)} + end || {PoolId, Props} <- Pools] + end. + +open_connections(Pool) -> + ConnPids = [begin + emongo_conn:start_link(Pool#pool.host, Pool#pool.port) + end || _ <- lists:seq(1, Pool#pool.size)], + Pool#pool{conn_pids=ConnPids}. - - +get_conn_pid(#pool{conn_pids=[Pid|_]}) -> Pid. \ No newline at end of file diff --git a/src/emongo_app.erl b/src/emongo_app.erl index 989fb27..73b6b8e 100644 --- a/src/emongo_app.erl +++ b/src/emongo_app.erl @@ -32,7 +32,7 @@ stop(_) -> ok. init(_) -> {ok, {{one_for_one, 10, 10}, [ - %{ex_scheduler, {ex_scheduler, start_link, []}, permanent, 5000, worker, [ex_scheduler]} + {emongo, {emongo, start_link, []}, permanent, 5000, worker, [emongo]} ]}}. build_rel() -> diff --git a/src/emongo_conn.erl b/src/emongo_conn.erl new file mode 100644 index 0000000..f87be7b --- /dev/null +++ b/src/emongo_conn.erl @@ -0,0 +1,76 @@ +%% Copyright (c) 2009 Jacob Vorreuter +%% +%% Permission is hereby granted, free of charge, to any person +%% obtaining a copy of this software and associated documentation +%% files (the "Software"), to deal in the Software without +%% restriction, including without limitation the rights to use, +%% copy, modify, merge, publish, distribute, sublicense, and/or sell +%% copies of the Software, and to permit persons to whom the +%% Software is furnished to do so, subject to the following +%% conditions: +%% +%% The above copyright notice and this permission notice shall be +%% included in all copies or substantial portions of the Software. +%% +%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +%% EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES +%% OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +%% NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT +%% HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, +%% WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +%% FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +%% OTHER DEALINGS IN THE SOFTWARE. +-module(emongo_conn). + +-export([start_link/2, init/3, send/3, send_recv/3]). + +-record(request, {req_id, requestor}). +-record(state, {socket, requests}). + +-include("emongo.hrl"). + +start_link(Host, Port) -> + proc_lib:start_link(?MODULE, init, [Host, Port, self()]). + +init(Host, Port, Parent) -> + Socket = open_socket(Host, Port), + proc_lib:init_ack(Parent, self()), + loop(#state{socket=Socket, requests=[]}). + +send(Pid, ReqID, Packet) -> + gen:call(Pid, '$emongo_conn_send', {ReqID, Packet}). + +send_recv(Pid, ReqID, Packet) -> + gen:call(Pid, '$emongo_conn_send_recv', {ReqID, Packet}). + +loop(State) -> + receive + {'$emongo_conn_send', {From, Mref}, {_ReqID, Packet}} -> + gen_tcp:send(State#state.socket, Packet), + gen:reply({From, Mref}, ok), + loop(State); + {'$emongo_conn_send_recv', {From, Mref}, {ReqID, Packet}} -> + gen_tcp:send(State#state.socket, Packet), + gen:reply({From, Mref}, ok), + Request = #request{req_id=ReqID, requestor={From, Mref}}, + State1 = State#state{requests=[Request|State#state.requests]}, + loop(State1); + {tcp, _Sock, Data} -> + Resp = emongo_packet:decode_response(Data), + ResponseTo = (Resp#response.header)#header.responseTo, + case proplists:get_value(ResponseTo, State#state.requests) of + undefined -> + ok; + Requestor -> + gen:reply(Requestor, Resp) + end, + loop(State) + end. + +open_socket(Host, Port) -> + case gen_tcp:connect(Host, Port, [binary, {active, true}]) of + {ok, Sock} -> + Sock; + {error, Reason} -> + exit({failed_to_open_socket, Reason}) + end. \ No newline at end of file diff --git a/src/emongo_packet.erl b/src/emongo_packet.erl new file mode 100644 index 0000000..dd914a4 --- /dev/null +++ b/src/emongo_packet.erl @@ -0,0 +1,49 @@ +%% Copyright (c) 2009 Jacob Vorreuter +%% +%% Permission is hereby granted, free of charge, to any person +%% obtaining a copy of this software and associated documentation +%% files (the "Software"), to deal in the Software without +%% restriction, including without limitation the rights to use, +%% copy, modify, merge, publish, distribute, sublicense, and/or sell +%% copies of the Software, and to permit persons to whom the +%% Software is furnished to do so, subject to the following +%% conditions: +%% +%% The above copyright notice and this permission notice shall be +%% included in all copies or substantial portions of the Software. +%% +%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +%% EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES +%% OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +%% NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT +%% HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, +%% WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +%% FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +%% OTHER DEALINGS IN THE SOFTWARE. +-module(emongo_packet). + +-export([insert/4, decode_response/1]). + +-include("emongo.hrl"). + +insert(Database, Collection, ReqID, {obj, Props}) -> + FullName = mongodb_bson:encode_cstring(lists:concat([Database, ".", Collection])), + EncodedDocument = mongodb_bson:encode({obj, Props}), + Message = <<0:32, FullName/binary, EncodedDocument/binary>>, + Length = byte_size(Message), + <<(Length+16):32/little-signed, ReqID:32/little-signed, 0:32, ?OP_INSERT:32/little-signed, Message/binary>>. + +decode_response(<>) -> + <> = Message, + #response{ + header = {header, Length, ReqID, RespTo, Op}, + responseFlag = RespFlag, + cursorID = CursorID, + startingFrom = StartingFrom, + numberReturned = NumRet, + documents = mongodb_bson:decode(Documents) + }. \ No newline at end of file diff --git a/t/001-load.t b/t/001-load.t index b2b1b2d..b15bf5a 100644 --- a/t/001-load.t +++ b/t/001-load.t @@ -1,10 +1,12 @@ #!/usr/local/bin/escript %% -*- erlang -*- -%%! -pa ebin -sasl errlog_type error -boot start_sasl -noshell +%%! -pa ebin -sasl errlog_type error -boot start_sasl -noshell -config priv/example main(_) -> etap:plan(unknown), error_logger:tty(false), etap_application:start_ok(emongo, "application 'emongo' started ok"), + etap:is(length(emongo:pools()), 1, "one pool exists in state"), + etap:end_tests(). \ No newline at end of file