Skip to content
This repository has been archived by the owner on Jan 21, 2022. It is now read-only.

Commit

Permalink
mongo_connect & mongo_cursor now use mvar instead of gen_server directly
Browse files Browse the repository at this point in the history
  • Loading branch information
Tony Hannan committed Dec 2, 2010
1 parent 763fec8 commit 1173c99
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 113 deletions.
4 changes: 2 additions & 2 deletions ebin/mongodb.app
Original file line number Original file line Diff line number Diff line change
@@ -1,8 +1,8 @@
{application, mongodb, {application, mongodb,
[{description, "Client interface to MongoDB, also known as the driver. See www.mongodb.org"}, [{description, "Client interface to MongoDB, also known as the driver. See www.mongodb.org"},
{vsn, "0.0"}, {vsn, "0.0"},
{modules, [mongodb_app, mongo, mongo_protocol, mongo_connect, mongo_query, mongo_cursor, mongodb_tests]}, {modules, [mongodb_app, mongo, mongo_protocol, mongo_connect, mongo_query, mongo_cursor, mvar, mongodb_tests]},
{registered, [oid_counter, oid_machineprocid, requestid_counter]}, {registered, []},
{applications, [kernel, stdlib]}, {applications, [kernel, stdlib]},
{mod, {mongodb_app, []}} {mod, {mongodb_app, []}}
]}. ]}.
91 changes: 34 additions & 57 deletions src/mongo_connect.erl
Original file line number Original file line Diff line number Diff line change
@@ -1,53 +1,60 @@
% Thread-safe TCP connection to a MongoDB server with synchronous call and asynchronous send interface. % Thread-safe TCP connection to a MongoDB server with synchronous call and asynchronous send interface.
-module(mongo_connect). -module (mongo_connect).


-export_type ([host/0, connection/0, failure/0, dbconnection/0]). -export_type ([host/0, connection/0, dbconnection/0, failure/0]).


-export([connect/1, close/1]). % API -export([connect/1, close/1]). % API
-export ([call/3, send/2]). % for mongo_query and mongo_cursor -export ([call/3, send/2]). % for mongo_query and mongo_cursor


-behaviour (gen_server).
-export ([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).

-include_lib ("bson/include/bson_binary.hrl"). -include_lib ("bson/include/bson_binary.hrl").


-type host() :: {inet:hostname() | inet:ip_address(), 0..65535}. -type host() :: {inet:hostname() | inet:ip_address(), 0..65535}.
% Address and port where address may be ip address or its domain name % Address and port where address may be ip address or its domain name

-type server(_) :: pid() | atom(). % Pid or register process name with parameterized state
-type reason() :: any(). -type reason() :: any().


-opaque connection() :: server (gen_tcp:socket()). -opaque connection() :: mvar:mvar (gen_tcp:socket()).
% Thread-safe, TCP connection to a MongoDB server. % Thread-safe, TCP connection to a MongoDB server.
% Passive raw binary socket. % Passive raw binary socket.


-type failure() :: {connection_failure, connection(), reason()}.

-type dbconnection() :: {mongo_protocol:db(), connection()}.

-spec connect (host()) -> {ok, connection()} | {error, reason()}. % IO -spec connect (host()) -> {ok, connection()} | {error, reason()}. % IO
% Create connection to given MongoDB server or return reason for connection failure. % Create connection to given MongoDB server or return reason for connection failure.
connect (Host) -> gen_server:start_link (?MODULE, Host, []). connect ({Addr, Port}) ->
Init = fun () -> case gen_tcp:connect (Addr, Port, [binary, {active, false}, {packet, 0}]) of
{ok, Socket} -> Socket;
{error, Reason} -> throw (Reason) end end,
try mvar:create (Init, fun gen_tcp:close/1)
of Var -> {ok, Var}
catch Reason -> {error, Reason} end.


-spec close (connection()) -> ok. % IO -spec close (connection()) -> ok. % IO
% Close connection. % Close connection.
close (Conn) -> gen_server:cast (Conn, stop). close (Conn) -> mvar:terminate (Conn).

-type dbconnection() :: {mongo_protocol:db(), connection()}.

-type failure() :: {connection_failure, connection(), reason()}.


-spec call (dbconnection(), [mongo_protocol:notice()], mongo_protocol:request()) -> mongo_protocol:reply(). % IO throws failure() -spec call (dbconnection(), [mongo_protocol:notice()], mongo_protocol:request()) -> mongo_protocol:reply(). % IO throws failure()
% Synchronous send and reply. Notices are sent right before request in single block. Exclusive access to connection during entire call. % Synchronous send and reply. Notices are sent right before request in single block. Exclusive access to connection during entire call.
call ({Db, Conn}, Notices, Request) -> call ({Db, Conn}, Notices, Request) ->
{MessagesBin, RequestId} = messages_binary (Db, Notices ++ [Request]), {MessagesBin, RequestId} = messages_binary (Db, Notices ++ [Request]),
case gen_server:call (Conn, {call, MessagesBin}) of Call = fun (Socket) ->
{error, Reason} -> throw ({connection_failure, Conn, Reason}); tcp_send (Socket, MessagesBin),
{ok, ReplyBin} -> <<?get_int32 (N)>> = tcp_recv (Socket, 4),
tcp_recv (Socket, N-4) end,
try mvar:with (Conn, Call)
of ReplyBin ->
{RequestId, Reply, <<>>} = mongo_protocol:get_reply (ReplyBin), {RequestId, Reply, <<>>} = mongo_protocol:get_reply (ReplyBin),
Reply end. % ^ ResponseTo must match RequestId Reply % ^ ResponseTo must match RequestId
catch Reason -> throw ({connection_failure, Conn, Reason}) end.


-spec send (dbconnection(), [mongo_protocol:notice()]) -> ok. % IO -spec send (dbconnection(), [mongo_protocol:notice()]) -> ok. % IO throws failure()
% Asynchronous send (no reply). Don't know if send succeeded. Exclusive access to the connection during send. % Asynchronous send (no reply). Don't know if send succeeded. Exclusive access to the connection during send.
send ({Db, Conn}, Notices) -> send ({Db, Conn}, Notices) ->
{NoticesBin, _} = messages_binary (Db, Notices), {NoticesBin, _} = messages_binary (Db, Notices),
gen_server:cast (Conn, {send, NoticesBin}). Send = fun (Socket) -> tcp_send (Socket, NoticesBin) end,
try mvar:with (Conn, Send)
catch Reason -> throw ({connection_failure, Conn, Reason}) end.


-spec messages_binary (mongo_protocol:db(), [mongo_protocol:message()]) -> {binary(), mongo_protocol:requestid()}. -spec messages_binary (mongo_protocol:db(), [mongo_protocol:message()]) -> {binary(), mongo_protocol:requestid()}.
% Binary representation of messages % Binary representation of messages
Expand All @@ -58,42 +65,12 @@ messages_binary (Db, Messages) ->
{<<Bin /binary, ?put_int32 (byte_size (MBin) + 4), MBin /binary>>, RequestId} end, {<<Bin /binary, ?put_int32 (byte_size (MBin) + 4), MBin /binary>>, RequestId} end,
lists:foldl (Build, {<<>>, 0}, Messages). lists:foldl (Build, {<<>>, 0}, Messages).


% gen_server callbacks % % Util %

-spec init (host()) -> {ok, gen_tcp:socket()} | {stop, reason()}. % IO
% Connect to host or stop with connection failure
init ({Address, Port}) ->
case gen_tcp:connect (Address, Port, [binary, {active, false}, {packet, 0}]) of
{error, Reason} -> {stop, Reason};
{ok, Socket} -> {ok, Socket} end.

-type tag() :: any(). % Unique tag

-spec handle_call ({call, binary()}, {pid(), tag()}, gen_tcp:socket()) -> {reply, {ok, binary()}, gen_tcp:socket()} | {stop, reason(), {err, reason()}, gen_tcp:socket()}. % IO
% Send request and wait and return reply
handle_call ({call, RequestBin}, _From, Socket) ->
case gen_tcp:send (Socket, RequestBin) of
{error, Reason} -> {stop, Reason, {error, Reason}, Socket};
ok -> case gen_tcp:recv (Socket, 4) of
{error, Reason} -> {stop, Reason, {error, Reason}, Socket};
{ok, <<?get_int32 (N)>>} -> case gen_tcp:recv (Socket, N - 4) of
{error, Reason} -> {stop, Reason, {error, Reason}, Socket};
{ok, ReplyBin} -> {reply, {ok, ReplyBin}, Socket} end end end.

-spec handle_cast
({send, binary()}, gen_tcp:socket()) -> {noreply, gen_tcp:socket()} | {stop, reason(), gen_tcp:socket()}; % IO
(stop, gen_tcp:socket()) -> {stop, normal, gen_tcp:socket()}.
% Send notice
handle_cast ({send, NoticeBin}, Socket) ->
case gen_tcp:send (Socket, NoticeBin) of
{error, Reason} -> {stop, Reason, Socket};
ok -> {noreply, Socket} end;
% Close connection
handle_cast (stop, Socket) -> {stop, normal, Socket}.

handle_info (_Info, Socket) -> {noreply, Socket}.


-spec terminate (reason(), gen_tcp:socket()) -> any(). % IO. Result ignored tcp_send (Socket, Binary) -> case gen_tcp:send (Socket, Binary) of
terminate (_Reason, Socket) -> gen_tcp:close (Socket). ok -> ok;
{error, Reason} -> throw (Reason) end.


code_change (_OldVsn, State, _Extra) -> {ok, State}. tcp_recv (Socket, N) -> case gen_tcp:recv (Socket, N) of
{ok, Binary} -> Binary;
{error, Reason} -> throw (Reason) end.
73 changes: 21 additions & 52 deletions src/mongo_cursor.erl
Original file line number Original file line Diff line number Diff line change
@@ -1,70 +1,49 @@
% A cursor references pending query results on a server. % A cursor references pending query results on a server.
% TODO: terminate cursor after idle for 10 minutes. % TODO: terminate cursor after idle for 10 minutes.
-module(mongo_cursor). -module (mongo_cursor).


-export_type ([cursor/0, expired/0, maybe/1]). -export_type ([maybe/1]).
-export_type ([cursor/0, expired/0]).


-export ([next/1, rest/1, close/1]). % API -export ([next/1, rest/1, close/1]). % API
-export ([cursor/4]). % for mongo_query -export ([cursor/4]). % for mongo_query


-behaviour (gen_server).
-export ([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).

-include ("mongo_protocol.hrl"). -include ("mongo_protocol.hrl").


-type server(_) :: pid() | atom(). % Pid or register process name with parameterized state
-type reason() :: any().

-type state() :: {env(), batch()}.
-type env() :: {mongo_connect:dbconnection(), collection(), batchsize()}.
-type batch() :: {cursorid(), [bson:document()]}.

-type maybe(A) :: {A} | {}. -type maybe(A) :: {A} | {}.


-opaque cursor() :: server (state()). -opaque cursor() :: mvar:mvar (state()).
% Thread-safe cursor, ie. access to query results % Thread-safe cursor, ie. access to query results


-type expired() :: {cursor_expired, cursor()}. -type expired() :: {cursor_expired, cursor()}.


-type state() :: {env(), batch()}.
-type env() :: {mongo_connect:dbconnection(), collection(), batchsize()}.
-type batch() :: {cursorid(), [bson:document()]}.

-spec cursor (mongo_connect:dbconnection(), collection(), batchsize(), {cursorid(), [bson:document()]}) -> cursor(). % IO -spec cursor (mongo_connect:dbconnection(), collection(), batchsize(), {cursorid(), [bson:document()]}) -> cursor(). % IO
% Create new cursor from result batch % Create new cursor from result batch
cursor (DbConn, Collection, BatchSize, Batch) -> cursor (DbConn, Collection, BatchSize, Batch) ->
{ok, Cursor} = gen_server:start_link (?MODULE, {{DbConn, Collection, BatchSize}, Batch}, []), mvar:new ({{DbConn, Collection, BatchSize}, Batch}, fun finalize/1).
Cursor.


-spec close (cursor()) -> ok. % IO -spec close (cursor()) -> ok. % IO
% Close cursor. % Close cursor
close (Cursor) -> gen_server:cast (Cursor, stop). close (Cursor) -> mvar:terminate (Cursor).

-spec next (cursor()) -> maybe (bson:document()). % IO throws expired() & mongo_connect:failure()
% Return next document in query result or nothing if finished.
next (Cursor) -> case gen_server:call (Cursor, next) of
{ok, Result} -> Result;
expired -> throw ({cursor_expired, Cursor});
F = {connection_failure, _, _} -> throw (F) end.


-spec rest (cursor()) -> [bson:document()]. % IO throws expired() & mongo_connect:failure() -spec rest (cursor()) -> [bson:document()]. % IO throws expired() & mongo_connect:failure()
% Return remaining documents in query result % Return remaining documents in query result
rest (Cursor) -> case next (Cursor) of rest (Cursor) -> case next (Cursor) of
{} -> []; {} -> [];
{Doc} -> [Doc | rest (Cursor)] end. {Doc} -> [Doc | rest (Cursor)] end.


% gen_server callbacks % -spec next (cursor()) -> maybe (bson:document()). % IO throws expired() & mongo_connect:failure()

% Return next document in query result or nothing if finished.
-spec init (state()) -> {ok, state()}. next (Cursor) ->
init (State) -> {ok, State}. Next = fun ({Env, Batch}) ->

{Batch1, MDoc} = xnext (Env, Batch),
-type tag() :: any(). % Unique tag {{Env, Batch1}, MDoc} end,

try mvar:modify (Cursor, Next)
-spec handle_call (next, {pid(), tag()}, gen_tcp:socket()) -> {reply, {ok, maybe (bson:document())}, state()} | {stop, expired, expired, state()} | {stop, connection_failure, mongo_connect:failure(), state()}. % IO catch expired -> throw ({cursor_expired, Cursor}) end.
% Get next document in cursor or return stop on expired or connection failure
handle_call (next, _From, {Env, Batch}) ->
try xnext (Env, Batch) of
{Batch1, MDoc} -> {reply, {ok, MDoc}, {Env, Batch1}}
catch
throw: expired -> {stop, expired, expired, {Env, Batch}};
throw: E = {connection_failure, _, _} -> {stop, connection_failure, E, {Env, Batch}}
end.


-spec xnext (env(), batch()) -> {batch(), maybe (bson:document())}. % IO throws expired & mongo_connect:failure() -spec xnext (env(), batch()) -> {batch(), maybe (bson:document())}. % IO throws expired & mongo_connect:failure()
% Get next document in cursor, fetching next batch from server if necessary % Get next document in cursor, fetching next batch from server if necessary
Expand All @@ -85,18 +64,8 @@ batch_reply (#reply {
CursorNotFound -> throw (expired); CursorNotFound -> throw (expired);
true -> {CursorId, Docs} end. true -> {CursorId, Docs} end.


-spec handle_cast (stop, state()) -> {stop, normal, state()}. -spec finalize (state()) -> ok. % IO. Result ignored
% Close cursor
handle_cast (stop, State) -> {stop, normal, State}.

handle_info (_Info, Socket) -> {noreply, Socket}.

-spec terminate (reason(), state()) -> any(). % IO. Result ignored
% Kill cursor on server if not already % Kill cursor on server if not already
terminate (expired, _) -> ok; finalize ({{DbConn, _, _}, {CursorId, _}}) -> case CursorId of
terminate (connection_failure, _) -> ok;
terminate (_Reason, {{DbConn, _, _}, {CursorId, _}}) -> case CursorId of
0 -> ok; 0 -> ok;
_ -> mongo_connect:send (DbConn, [#killcursor {cursorids = [CursorId]}]) end. _ -> mongo_connect:send (DbConn, [#killcursor {cursorids = [CursorId]}]) end.

code_change (_OldVsn, State, _Extra) -> {ok, State}.
26 changes: 24 additions & 2 deletions src/mongodb_tests.erl
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -10,12 +10,33 @@ test() -> eunit:test ({setup,
fun () -> application:start (mongodb), fun () -> application:start (mongodb),
io:format (user, "~n** Make sure mongod is running on 127.0.0.1:27017 **~n~n", []) end, io:format (user, "~n** Make sure mongod is running on 127.0.0.1:27017 **~n~n", []) end,
fun (_) -> application:stop (mongodb) end, fun (_) -> application:stop (mongodb) end,
[fun app_test/0, [fun var_test/0,
fun var_finalize_test/0,
fun app_test/0,
fun connect_test/0, fun connect_test/0,
fun mongo_test/0 fun mongo_test/0
]}). ]}).


% This test must be run first right after application start (assumes vars contain initial value) var_test() ->
Var = mvar:new (0),
0 = mvar:write (Var, 1),
1 = mvar:read (Var),
foo = mvar:modify (Var, fun (N) -> {N+1, foo} end),
2 = mvar:read (Var),
foo = (catch mvar:with (Var, fun (_) -> throw (foo) end)),
mvar:terminate (Var),
{exit, {noproc, _}} = try mvar:read (Var) catch C:E -> {C, E} end.

var_finalize_test() ->
Var0 = mvar:new ({}),
Var = mvar:new (0, fun (N) -> mvar:write (Var0, N) end),
{} = mvar:read (Var0),
0 = mvar:read (Var),
mvar:terminate (Var),
0 = mvar:read (Var0),
mvar:terminate (Var0).

% This test must be run first right after application start (assumes counter table contain initial values)
app_test() -> app_test() ->
1 = mongodb_app:next_requestid(), 1 = mongodb_app:next_requestid(),
UnixSecs = bson:unixtime_to_secs (bson:timenow()), UnixSecs = bson:unixtime_to_secs (bson:timenow()),
Expand All @@ -24,6 +45,7 @@ app_test() ->


% Mongod server must be running on 127.0.0.1:27017 % Mongod server must be running on 127.0.0.1:27017
connect_test() -> connect_test() ->
{error, _} = mongo_connect:connect ({"127.0.0.1", 26555}),
{ok, Conn} = mongo_connect:connect ({"127.0.0.1", 27017}), {ok, Conn} = mongo_connect:connect ({"127.0.0.1", 27017}),
DbConn = {test, Conn}, DbConn = {test, Conn},
Res = mongo_query:write (DbConn, #delete {collection = foo, selector = {}}, {}), Res = mongo_query:write (DbConn, #delete {collection = foo, selector = {}}, {}),
Expand Down

0 comments on commit 1173c99

Please sign in to comment.