Browse files

[debug] stdlib: Added connection monitor to socketpool.

  • Loading branch information...
1 parent 8940fc8 commit 16c38a3226e99eed21c15872c0e09320dd13d765 @nrs135 nrs135 committed Mar 14, 2012
Showing with 37 additions and 16 deletions.
  1. +36 −15 stdlib/apis/mongo/socketpool.opa
  2. +1 −1 stdlib/apis/mongo/view.opa
View
51 stdlib/apis/mongo/socketpool.opa
@@ -30,6 +30,8 @@ type SocketPool.result = outcome((bool,Socket.connection),Mongo.failure)
log: bool;
queue: Queue.t(continuation(SocketPool.result));
slaveok: bool;
+ open_connections: intset;
+ monitor_connections: bool;
}
@private type SocketPool.msg =
@@ -50,13 +52,27 @@ SocketPool = {{
@private ML = MongoLog
@private
- Socket_close(log)(conn) =
- do if log then ML.debug("SocketPool.handler","close {Socket.conn_id(conn)}",void)
- Socket.close(conn)
+ Socket_close(conn, state) =
+ conn_id = Socket.conn_id(conn)
+ do if state.log then ML.debug("SocketPool.handler","close {conn_id}",void)
+ do Socket.close(conn)
+ {state with open_connections=IntSet.remove(conn_id,state.open_connections)}
+
+ @private montor(from, state) =
+ if state.monitor_connections
+ then
+ do ML.debug("SocketPool.handler({from})",
+ "open={List.list_to_string(Int.to_string,IntSet.To.list(state.open_connections))}",void)
+ do ML.debug("SocketPool.handler({from})",
+ "sockets={List.list_to_string(Int.to_string,List.map(Socket.conn_id,state.sockets))}",void)
+ do ML.debug("SocketPool.handler({from})",
+ "allocated={List.list_to_string(Int.to_string,List.map(Socket.conn_id,state.allocated))}",void)
+ void
@private pool_handler(state:SocketPool.state, msg:SocketPool.msg): Session.instruction(SocketPool.state) =
match msg with
| {release=connection} ->
+ do monitor("release", state)
conn_id = Socket.conn_id(connection)
if List.exists((c -> Socket.conn_id(c) == conn_id),state.allocated)
then
@@ -73,9 +89,10 @@ SocketPool = {{
{set={state with ~queue}})
else
do if state.log then ML.debug("SocketPool.handler","drop socket {Socket.conn_id(connection)}",void)
- do Socket_close(state.log)(connection)
- {unchanged}
+ state = Socket_close(connection, state)
+ {set=state}
| {get=k} ->
+ do monitor("get", state)
(match state.sockets with
| [conn|sockets] ->
do if state.log then ML.debug("SocketPool.handler","reuse open socket {Socket.conn_id(conn)}",void)
@@ -90,6 +107,7 @@ SocketPool = {{
else
(match Socket.connect_with_err_cont(state.host.f1,state.host.f2) with
| {success=conn} ->
+ state = {state with open_connections=IntSet.add(Socket.conn_id(conn),state.open_connections)}
do if state.log then ML.debug("SocketPool.handler","successfully opened socket {Socket.conn_id(conn)}",void)
do Continuation.return(k, {success=(state.slaveok,conn)})
allocated = conn +> state.allocated
@@ -99,9 +117,10 @@ SocketPool = {{
do Continuation.return(k, {failure={Error="Got exception {str}"}})
{unchanged}))
| {reconnect=host} ->
+ do monitor("reconnect", state)
do if state.log then ML.debug("SocketPool.handler","reconnect({host.f1}.{host.f2})",void)
- do List.iter(Socket_close(state.log),state.sockets)
- do List.iter(Socket_close(state.log),state.allocated)
+ state = List.fold(Socket_close,state.sockets,state)
+ state = List.fold(Socket_close,state.allocated,state)
{set={state with ~host; cnt=0; sockets=[]; allocated=[]}}
| {gethost=k} ->
do Continuation.return(k, state.host)
@@ -113,18 +132,20 @@ SocketPool = {{
{unchanged}
| {close} ->
do if state.log then ML.debug("SocketPool.handler","close socket pool",void)
- do List.iter(Socket_close(state.log),state.sockets)
- do List.iter(Socket_close(state.log),state.allocated)
- {unchanged}
+ state = List.fold(Socket_close,state.sockets,state)
+ state = List.fold(Socket_close,state.allocated,state)
+ {set=state}
| {stop} ->
+ do monitor("stop", state)
do if state.log then ML.debug("SocketPool.handler","stop socket pool",void)
- do List.iter(Socket_close(state.log),state.sockets)
- do List.iter(Socket_close(state.log),state.allocated)
+ state = List.fold(Socket_close,state.sockets,state)
+ _state = List.fold(Socket_close,state.allocated,state)
{stop}
- make(host:Mongo.mongo_host, max:int, log:bool): SocketPool.t =
- Session.make(({~host; ~max; ~log; cnt=0; sockets=[]; allocated=[]; queue=Queue.empty;
- slaveok=false;}:SocketPool.state),
+ make(host:Mongo.mongo_host, max:int, _log:bool): SocketPool.t =
+ do ML.debug("SocketPool.make","{host}",void)
+ Session.make(({~host; ~max; log=true; cnt=0; sockets=[]; allocated=[]; queue=Queue.empty;
+ slaveok=false; open_connections=IntSet.empty; monitor_connections=false}:SocketPool.state),
pool_handler)
get(pool:SocketPool.t) : SocketPool.result =
View
2 stdlib/apis/mongo/view.opa
@@ -98,7 +98,7 @@ MongoView = {{
then ML.fatal("View.type_from_fields","Fields failed to validate",-1)
else
tst =
- match List.unique_list_of(List.map((e -> Bson.int_of_value(e.value)),fields)) with
+ match List.unique_list_of(List.map((e -> Bson.int_of_value(e.value)),List.filter((e -> e.name != "_id"),fields))) with
| [{some=num}] -> (match num with 0 -> not | _ -> (tf -> tf))
| _ -> ML.fatal("View.type_from_fields","Bad fields value {fields}",-1)
dfields = List.map((e -> String.explode(".",e.name)),fields)

0 comments on commit 16c38a3

Please sign in to comment.