Skip to content

Commit

Permalink
Merge remote-tracking branch 'mweibel/ssl-support'
Browse files Browse the repository at this point in the history
  • Loading branch information
christkv committed Apr 30, 2012
2 parents c938630 + ca60a3d commit bc41adb
Show file tree
Hide file tree
Showing 3 changed files with 183 additions and 41 deletions.
62 changes: 59 additions & 3 deletions src/mongo.erl
Expand Up @@ -5,8 +5,10 @@

-export_type ([host/0, connection/0]).
-export ([connect/1, connect/2, disconnect/1, connect_factory/1, connect_factory/2]).
-export([ssl_connect/1, ssl_connect/2, ssl_connect/3, ssl_connect_factory/1, ssl_connect_factory/2, ssl_connect_factory/3]).
-export_type ([replset/0, rs_connection/0]).
-export ([rs_connect/1, rs_connect/2, rs_disconnect/1, rs_connect_factory/1, rs_connect_factory/2]).
-export ([ssl_rs_connect/1, ssl_rs_connect/2, ssl_rs_connect/3, ssl_rs_connect_factory/1, ssl_rs_connect_factory/2, ssl_rs_connect_factory/3]).

-export_type ([action/1, db/0, write_mode/0, read_mode/0, failure/0]).
-export ([do/5, this_db/0]).
Expand Down Expand Up @@ -67,6 +69,7 @@ connect_factory (Host) -> connect_factory (Host, infinity).
%@doc Factory for use with a connection pool. See resource_pool module.
connect_factory (Host, TimeoutMS) -> {Host, fun (H) -> connect (H, TimeoutMS) end, fun disconnect/1, fun mongo_connect:is_closed/1}.


% Replica Set %

-type replset() :: mongo_replset:replset().
Expand All @@ -92,6 +95,59 @@ rs_connect_factory (ReplSet) -> rs_connect_factory (ReplSet, infinity).
%@doc Factory for use with a rs_connection pool. See resource_pool module.
rs_connect_factory (Replset, TimeoutMS) -> {Replset, fun (RS) -> RC = rs_connect (RS, TimeoutMS), {ok, RC} end, fun rs_disconnect/1, fun mongo_replset:is_closed/1}.

% SSL %
-spec ssl_connect(host()) -> {ok, connection()} | {error, reason()}.
%@doc Connect to given MongoDB server using SSL.
ssl_connect(Host) -> ssl_connect(Host, infinity, []).

-spec ssl_connect(host(), timeout()) -> {ok, connection()} | {error, reason()}.
%@doc Connect to given MongoDB server using SSL.
ssl_connect(Host, TimeoutMS) -> ssl_connect(Host, TimeoutMS, []).

-spec ssl_connect(host(), timeout(), ssl:ssloption()) -> {ok, connection()} | {error, reason()}.
%@doc Connect to given MongoDB server with SSL support.
ssl_connect(Host, TimeoutMS, Options) -> mongo_connect:ssl_connect(Host, TimeoutMS, Options).


-spec ssl_connect_factory (host()) -> resource_pool:factory(connection()).
%@doc Factory for use with a connection pool. See resource_pool module.
ssl_connect_factory (Host) -> ssl_connect_factory (Host, infinity, []).

-spec ssl_connect_factory (host(), timeout()) -> resource_pool:factory(connection()).
%@doc Factory for use with a connection pool. See resource_pool module.
ssl_connect_factory (Host, TimeoutMS) -> ssl_connect_factory (Host, TimeoutMS, []).

-spec ssl_connect_factory (host(), timeout(), ssl:ssloption()) -> resource_pool:factory(connection()).
%@doc Factory for use with a connection pool. See resource_pool module.
ssl_connect_factory (Host, TimeoutMS, Options) -> {Host, fun (H) -> ssl_connect (H, TimeoutMS, Options) end, fun disconnect/1, fun mongo_connect:is_closed/1}.


-spec ssl_rs_connect (replset()) -> rs_connection(). % IO
%@doc Create new cache of connections over SSL to replica set members starting with seed members. No connection attempted until rs_primary or rs_secondary_ok called.
ssl_rs_connect (Replset) -> mongo_replset:ssl_connect (Replset).

-spec ssl_rs_connect (replset(), timeout()) -> rs_connection(). % IO
%@doc Create new cache of connections over SSL to replica set members starting with seed members. No connection attempted until rs_primary or rs_secondary_ok called. Timeout used for initial connection and every query and safe write.
ssl_rs_connect (Replset, TimeoutMS) -> mongo_replset:ssl_connect (Replset, TimeoutMS).

-spec ssl_rs_connect (replset(), timeout(), ssl:ssloption()) -> rs_connection(). % IO
%@doc Create new cache of connections over SSL to replica set members starting with seed members. No connection attempted until rs_primary or rs_secondary_ok called. Timeout used for initial connection and every query and safe write.
ssl_rs_connect (Replset, TimeoutMS, Options) -> mongo_replset:ssl_connect (Replset, TimeoutMS, Options).


-spec ssl_rs_connect_factory (replset()) -> resource_pool:factory(rs_connection()).
%@doc Factory for use with a rs_connection pool over SSL. See resource_pool module.
ssl_rs_connect_factory (ReplSet) -> ssl_rs_connect_factory (ReplSet, infinity, []).

-spec ssl_rs_connect_factory (replset(), timeout()) -> resource_pool:factory(rs_connection()).
%@doc Factory for use with a rs_connection pool over SSL. See resource_pool module.
ssl_rs_connect_factory (ReplSet, TimeoutMS) -> ssl_rs_connect_factory (ReplSet, TimeoutMS, []).

-spec ssl_rs_connect_factory (replset(), timeout(), ssl:ssloption()) -> resource_pool:factory(rs_connection()).
%@doc Factory for use with a rs_connection pool over SSL. See resource_pool module.
ssl_rs_connect_factory (Replset, TimeoutMS, Options) -> {Replset, fun (RS) -> RC = ssl_rs_connect (RS, TimeoutMS, Options), {ok, RC} end, fun rs_disconnect/1, fun mongo_replset:is_closed/1}.


% Action %

-type action(A) :: fun (() -> A).
Expand Down Expand Up @@ -130,9 +186,9 @@ do (WriteMode, ReadMode, Connection, Database, Action) -> case connection_mode (

-spec connection_mode (read_mode(), connection() | rs_connection()) -> {ok, connection()} | {error, reason()}. % IO
%@doc For rs_connection return appropriate primary or secondary connection
connection_mode (_, Conn = {connection, _, _, _}) -> {ok, Conn};
connection_mode (master, RsConn = {rs_connection, _, _, _}) -> mongo_replset:primary (RsConn);
connection_mode (slave_ok, RsConn = {rs_connection, _, _, _}) -> mongo_replset:secondary_ok (RsConn).
connection_mode (_, Conn = {connection, _, _, _, _}) -> {ok, Conn};
connection_mode (master, RsConn = {rs_connection, _, _, _, _}) -> mongo_replset:primary (RsConn);
connection_mode (slave_ok, RsConn = {rs_connection, _, _, _, _}) -> mongo_replset:secondary_ok (RsConn).

-spec this_db () -> db(). % Action
%@doc Current db in context that we are querying
Expand Down
106 changes: 84 additions & 22 deletions src/mongo_connect.erl
Expand Up @@ -4,7 +4,7 @@
-export_type ([host/0, connection/0, dbconnection/0, failure/0]).

-export ([host_port/1, read_host/1, show_host/1]).
-export ([connect/1, connect/2, conn_host/1, close/1, is_closed/1]).
-export ([connect/1, connect/2, ssl_connect/2, ssl_connect/3, conn_host/1, close/1, is_closed/1]).

-export ([call/3, send/2]). % for mongo_query and mongo_cursor

Expand Down Expand Up @@ -37,7 +37,7 @@ read_host (UString) -> case string:tokens (bson:str (UString), ":") of

-type reason() :: any().

-opaque connection() :: {connection, host(), mvar:mvar (gen_tcp:socket()), timeout()}.
-opaque connection() :: {connection, host(), mvar:mvar (gen_tcp:socket()), mvar:mvar(ssl:sslsocket()), timeout()}.
% Thread-safe, TCP connection to a MongoDB server.
% Passive raw binary socket.
% Type not opaque to mongo:connection_mode/2
Expand All @@ -49,50 +49,94 @@ connect (Host) -> connect (Host, infinity).
-spec connect (host(), timeout()) -> {ok, connection()} | {error, reason()}. % IO
%@doc Create connection to given MongoDB server or return reason for connection failure. Timeout is used for initial connection and every call.
connect (Host, TimeoutMS) -> try mvar:create (fun () -> tcp_connect (host_port (Host), TimeoutMS) end, fun gen_tcp:close/1)
of VSocket -> {ok, {connection, host_port (Host), VSocket, TimeoutMS}}
of VSocket -> {ok, {connection, host_port (Host), VSocket, false, TimeoutMS}}
catch Reason -> {error, Reason} end.

ssl_connect(Host, Options) -> ssl_connect(Host, infinity, Options).

ssl_connect(Host, TimeoutMS, Options) -> try mvar:create(fun() -> tcp_ssl_connect(host_port(Host), TimeoutMS, Options) end, fun ssl:close/1)
of SslSocket -> {ok, {connection, host_port(Host), false, SslSocket, TimeoutMS}}
catch Reason -> {error, Reason} end.

-spec conn_host (connection()) -> host().
%@doc Host this is connected to
conn_host ({connection, Host, _VSocket, _}) -> Host.
conn_host ({connection, Host, _VSocket, _SslSocket, _}) -> Host.

-spec close (connection()) -> ok. % IO
%@doc Close connection.
close ({connection, _Host, VSocket, _}) -> mvar:terminate (VSocket).
close ({connection, _Host, VSocket, SslSocket, _}) ->
case VSocket =/= false of
true ->
mvar:terminate(VSocket);
_ ->
mvar:terminate(SslSocket)
end.

-spec is_closed (connection()) -> boolean(). % IO
%@doc Has connection been closed?
is_closed ({connection, _, VSocket, _}) -> mvar:is_terminated (VSocket).
is_closed ({connection, _, VSocket, SslSocket, _}) ->
case VSocket =/= false of
true ->
mvar:is_terminated (VSocket);
_ ->
mvar:is_terminated(SslSocket)
end.

-type dbconnection() :: {mongo_protocol:db(), connection()}.

-type failure() :: {connection_failure, connection(), reason()}.

-spec call (dbconnection(), [mongo_protocol:notice()], mongo_protocol:request()) -> mongo_protocol:reply(). % IO throws failure()
%@doc Synchronous send and reply. Notices are sent right before request in single block. Exclusive access to connection during entire call.
call ({Db, Conn = {connection, _Host, VSocket, TimeoutMS}}, Notices, Request) ->
call ({Db, Conn = {connection, _Host, VSocket, SslSocket, TimeoutMS}}, Notices, Request) ->
{MessagesBin, RequestId} = messages_binary (Db, Notices ++ [Request]),
Call = fun (Socket) ->
tcp_send (Socket, MessagesBin),
<<?get_int32 (N)>> = tcp_recv (Socket, 4, TimeoutMS),
tcp_recv (Socket, N-4, TimeoutMS) end,
try mvar:with (VSocket, Call) of
ReplyBin ->
{RequestId, Reply, <<>>} = mongo_protocol:get_reply (ReplyBin),
Reply % ^ ResponseTo must match RequestId
catch
throw: Reason -> close (Conn), throw ({connection_failure, Conn, Reason});
exit: {noproc, _} -> throw ({connection_failure, Conn, closed}) end.
SslCall = fun (Socket) ->
tcp_ssl_send (Socket, MessagesBin),
<<?get_int32 (N)>> = tcp_ssl_recv (Socket, 4, TimeoutMS),
tcp_ssl_recv (Socket, N-4, TimeoutMS) end,

case VSocket =/= false of
true ->
try mvar:with (VSocket, Call) of
ReplyBin ->
{RequestId, Reply, <<>>} = mongo_protocol:get_reply (ReplyBin),
Reply % ^ ResponseTo must match RequestId
catch
throw: Reason -> close (Conn), throw ({connection_failure, Conn, Reason});
exit: {noproc, _} -> throw ({connection_failure, Conn, closed}) end;
_ ->
try mvar:with (SslSocket, SslCall) of
ReplyBin ->
{RequestId, Reply, <<>>} = mongo_protocol:get_reply (ReplyBin),
Reply % ^ ResponseTo must match RequestId
catch
throw: Reason -> close (Conn), throw ({connection_failure, Conn, Reason});
exit: {noproc, _} -> throw ({connection_failure, Conn, closed}) end
end.

-spec send (dbconnection(), [mongo_protocol:notice()]) -> ok. % IO throws failure()
%@doc Asynchronous send (no reply). Don't know if send succeeded. Exclusive access to the connection during send.
send ({Db, Conn = {connection, _Host, VSocket, _}}, Notices) ->
send ({Db, Conn = {connection, _Host, VSocket, SslSocket, _}}, Notices) ->
{NoticesBin, _} = messages_binary (Db, Notices),
Send = fun (Socket) -> tcp_send (Socket, NoticesBin) end,
try mvar:with (VSocket, Send)
catch
throw: Reason -> close (Conn), throw ({connection_failure, Conn, Reason});
exit: {noproc, _} -> throw ({connection_failure, Conn, closed}) end.
SslSend = fun(Socket) -> tcp_ssl_send(Socket, NoticesBin) end,

case VSocket =/= false of
true ->
try mvar:with (VSocket, Send)
catch
throw: Reason -> close (Conn), throw ({connection_failure, Conn, Reason});
exit: {noproc, _} -> throw ({connection_failure, Conn, closed}) end;
_ ->
try mvar:with (SslSocket, SslSend)
catch
throw: Reason -> close (Conn), throw ({connection_failure, Conn, Reason});
exit: {noproc, _} -> throw ({connection_failure, Conn, closed}) end
end.

-spec messages_binary (mongo_protocol:db(), [mongo_protocol:message()]) -> {binary(), mongo_protocol:requestid()}.
%@doc Binary representation of messages
Expand All @@ -105,14 +149,32 @@ messages_binary (Db, Messages) ->

% Util %

tcp_connect ({Hostname, Port}, TimeoutMS) -> case gen_tcp:connect (Hostname, Port, [binary, {active, false}, {packet, 0}], TimeoutMS) of
{ok, Socket} -> Socket;
{error, Reason} -> throw (Reason) end.
tcp_connect ({Hostname, Port}, TimeoutMS) ->
case gen_tcp:connect (Hostname, Port, [binary, {active, false}, {packet, 0}], TimeoutMS) of
{ok, Socket} -> Socket;
{error, Reason} -> throw (Reason)
end.

tcp_ssl_connect ({Hostname, Port}, TimeoutMS, Options) ->
SslOptions =[Options | [binary, {active, false}, {packet, 0}]],
ssl:start(),
case ssl:connect(Hostname, Port, SslOptions, TimeoutMS) of
{ok, SslSocket} -> SslSocket;
{error, Reason} -> throw (Reason)
end.

tcp_send (Socket, Binary) -> case gen_tcp:send (Socket, Binary) of
ok -> ok;
{error, Reason} -> throw (Reason) end.

tcp_ssl_send (SslSocket, Binary) -> case ssl:send(SslSocket, Binary) of
ok -> ok;
{error, Reason} -> throw (Reason) end.

tcp_recv (Socket, N, TimeoutMS) -> case gen_tcp:recv (Socket, N, TimeoutMS) of
{ok, Binary} -> Binary;
{error, Reason} -> throw (Reason) end.

tcp_ssl_recv(SslSocket, N, TimeoutMS) -> case ssl:recv(SslSocket, N, TimeoutMS) of
{ok, Binary} -> Binary;
{error, Reason} -> throw(Reason) end.
56 changes: 40 additions & 16 deletions src/mongo_replset.erl
Expand Up @@ -2,7 +2,7 @@
-module (mongo_replset).

-export_type ([replset/0, rs_connection/0]).
-export ([connect/1, connect/2, primary/1, secondary_ok/1, close/1, is_closed/1]). % API
-export ([connect/1, connect/2, ssl_connect/1, ssl_connect/2, ssl_connect/3, primary/1, secondary_ok/1, close/1, is_closed/1]). % API

-type maybe(A) :: {A} | {}.
-type err_or(A) :: {ok, A} | {error, reason()}.
Expand Down Expand Up @@ -41,11 +41,25 @@ connect (ReplSet) -> connect (ReplSet, infinity).
%@doc Create new cache of connections to replica set members starting with seed members. No connection attempted until primary or secondary_ok called. Timeout used for initial connection and every call.
connect ({ReplName, Hosts}, TimeoutMS) ->
Dict = dict:from_list (lists:map (fun (Host) -> {mongo_connect:host_port (Host), {}} end, Hosts)),
{rs_connection, ReplName, mvar:new (Dict), TimeoutMS}.
{rs_connection, ReplName, mvar:new (Dict), TimeoutMS, false}.

-spec ssl_connect(replset()) -> rs_connection().
%@doc Create new cache of connections with SSL support.
ssl_connect(ReplSet) -> ssl_connect(ReplSet, infinity, []).

-spec ssl_connect(replset(), timeout()) -> rs_connection().
%@doc Create new cache of connections with SSL support.
ssl_connect(ReplSet, TimeoutMS) -> ssl_connect(ReplSet, TimeoutMS, []).

-spec ssl_connect(replset(), timeout(), ssl:ssloption()) -> rs_connection().
%@doc Create new cache of connections with SSL support.
ssl_connect({ReplName, Hosts}, TimeoutMS, SslOptions) ->
Dict = dict:from_list (lists:map (fun (Host) -> {mongo_connect:host_port (Host), {}} end, Hosts)),
{rs_connection, ReplName, mvar:new (Dict), TimeoutMS, SslOptions}.

%% Note(superbobry): having an 'opaque' type here causes 'dialyzer' to
%% crash (bug reported).
-type rs_connection() :: {rs_connection, rs_name(), mvar:mvar(connections()), timeout()}.
-type rs_connection() :: {rs_connection, rs_name(), mvar:mvar(connections()), timeout(), maybe(list())}.
% Maintains set of connections to some if not all of the replica set members. Opaque except to mongo:connect_mode
% Type not opaque to mongo:connection_mode/2
-type connections() :: dict:dictionary (host(), maybe(connection())).
Expand All @@ -71,14 +85,14 @@ secondary_ok (ReplConn) -> try

-spec close (rs_connection()) -> ok. % IO
%@doc Close replset connection
close ({rs_connection, _, VConns, _}) ->
close ({rs_connection, _, VConns, _, _}) ->
CloseConn = fun (_, MCon, _) -> case MCon of {Con} -> mongo_connect:close (Con); {} -> ok end end,
mvar:with (VConns, fun (Dict) -> dict:fold (CloseConn, ok, Dict) end),
mvar:terminate (VConns).

-spec is_closed (rs_connection()) -> boolean(). % IO
%@doc Has replset connection been closed?
is_closed ({rs_connection, _, VConns, _}) -> mvar:is_terminated (VConns).
is_closed ({rs_connection, _, VConns, _, _}) -> mvar:is_terminated (VConns).

% EIO = IO that may throw error of any type

Expand Down Expand Up @@ -106,7 +120,7 @@ secondary_ok_conn (ReplConn, Hosts) -> try

-spec fetch_member_info (rs_connection()) -> member_info(). % EIO
%@doc Retrieve isMaster info from a current known member in replica set. Update known list of members from fetched info.
fetch_member_info (ReplConn = {rs_connection, _ReplName, VConns, _}) ->
fetch_member_info (ReplConn = {rs_connection, _ReplName, VConns, _, _}) ->
OldHosts_ = dict:fetch_keys (mvar:read (VConns)),
{Conn, Info} = until_success (OldHosts_, fun (Host) -> connect_member (ReplConn, Host) end),
OldHosts = sets:from_list (OldHosts_),
Expand All @@ -133,27 +147,37 @@ remove_host (Host, Dict) ->

-spec connect_member (rs_connection(), host()) -> member_info(). % EIO
%@doc Connect to host and verify membership. Cache connection in rs_connection
connect_member ({rs_connection, ReplName, VConns, TimeoutMS}, Host) ->
Conn = get_connection (VConns, Host, TimeoutMS),
connect_member ({rs_connection, ReplName, VConns, TimeoutMS, SslOptions}, Host) ->
Conn = get_connection (VConns, Host, TimeoutMS, SslOptions),
Info = try get_member_info (Conn) catch _ ->
mongo_connect:close (Conn),
Conn1 = get_connection (VConns, Host, TimeoutMS),
Conn1 = get_connection (VConns, Host, TimeoutMS, SslOptions),
get_member_info (Conn1) end,
case bson:at (setName, Info) of
ReplName -> {Conn, Info};
_ ->
mongo_connect:close (Conn),
throw ({not_member, ReplName, Host, Info}) end.

get_connection (VConns, Host, TimeoutMS) -> mvar:modify (VConns, fun (Dict) ->
get_connection (VConns, Host, TimeoutMS, SslOptions) -> mvar:modify (VConns, fun (Dict) ->
case dict:find (Host, Dict) of
{ok, {Conn}} -> case mongo_connect:is_closed (Conn) of
false -> {Dict, Conn};
true -> new_connection (Dict, Host, TimeoutMS) end;
_ -> new_connection (Dict, Host, TimeoutMS) end end).

new_connection (Dict, Host, TimeoutMS) -> case mongo_connect:connect (Host, TimeoutMS) of
{ok, Conn} -> {dict:store (Host, {Conn}, Dict), Conn};
{error, Reason} -> throw ({cant_connect, Reason}) end.
true -> new_connection (Dict, Host, TimeoutMS, SslOptions) end;
_ -> new_connection (Dict, Host, TimeoutMS, SslOptions) end end).

new_connection (Dict, Host, TimeoutMS, SslOptions) ->
case SslOptions of
false ->
case mongo_connect:connect (Host, TimeoutMS) of
{ok, Conn} -> {dict:store (Host, {Conn}, Dict), Conn};
{error, Reason} -> throw ({cant_connect, Reason})
end;
_ ->
case mongo_connect:ssl_connect(Host, TimeoutMS, SslOptions) of
{ok, Conn} -> {dict:store (Host, {Conn}, Dict), Conn};
{error, Reason} -> throw ({cant_connect, Reason})
end
end.

get_member_info (Conn) -> mongo_query:command ({admin, Conn}, {isMaster, 1}, true).

0 comments on commit bc41adb

Please sign in to comment.