@@ -163,8 +163,9 @@ type Greeting struct {
163163
164164// Opts is a way to configure Connection
165165type Opts struct {
166- // Timeout for any particular request. If Timeout is zero request, any
167- // request can be blocked infinitely.
166+ // Timeout for response to a particular request. The timeout is reset when
167+ // push messages are received. If Timeout is zero, any request can be
168+ // blocked infinitely.
168169 // Also used to setup net.TCPConn.Set(Read|Write)Deadline.
169170 Timeout time.Duration
170171 // Timeout between reconnect attempts. If Reconnect is zero, no
@@ -568,8 +569,8 @@ func (conn *Connection) closeConnection(neterr error, forever bool) (err error)
568569 requests [pos ].first = nil
569570 requests [pos ].last = & requests [pos ].first
570571 for fut != nil {
571- fut .err = neterr
572- fut . markReady ( conn )
572+ fut .SetError ( neterr )
573+ conn . markDone ( fut )
573574 fut , fut .next = fut .next , nil
574575 }
575576 }
@@ -685,40 +686,61 @@ func (conn *Connection) reader(r *bufio.Reader, c net.Conn) {
685686 conn .reconnect (err , c )
686687 return
687688 }
688- if fut := conn .fetchFuture (resp .RequestId ); fut != nil {
689- fut .resp = resp
690- fut .markReady (conn )
689+
690+ var fut * Future = nil
691+ if resp .Code == PushCode {
692+ if fut = conn .peekFuture (resp .RequestId ); fut != nil {
693+ fut .AppendPush (resp )
694+ }
691695 } else {
696+ if fut = conn .fetchFuture (resp .RequestId ); fut != nil {
697+ fut .SetResponse (resp )
698+ conn .markDone (fut )
699+ }
700+ }
701+ if fut == nil {
692702 conn .opts .Logger .Report (LogUnexpectedResultId , conn , resp )
693703 }
694704 }
695705}
696706
697707func (conn * Connection ) newFuture (requestCode int32 ) (fut * Future ) {
698- fut = & Future {}
708+ fut = NewFuture ()
699709 if conn .rlimit != nil && conn .opts .RLimitAction == RLimitDrop {
700710 select {
701711 case conn .rlimit <- struct {}{}:
702712 default :
703- fut .err = ClientError {ErrRateLimited , "Request is rate limited on client" }
713+ fut .err = ClientError {
714+ ErrRateLimited ,
715+ "Request is rate limited on client" ,
716+ }
717+ fut .ready = nil
718+ fut .done = nil
704719 return
705720 }
706721 }
707- fut .ready = make (chan struct {})
708722 fut .requestId = conn .nextRequestId ()
709723 fut .requestCode = requestCode
710724 shardn := fut .requestId & (conn .opts .Concurrency - 1 )
711725 shard := & conn .shard [shardn ]
712726 shard .rmut .Lock ()
713727 switch conn .state {
714728 case connClosed :
715- fut .err = ClientError {ErrConnectionClosed , "using closed connection" }
729+ fut .err = ClientError {
730+ ErrConnectionClosed ,
731+ "using closed connection" ,
732+ }
716733 fut .ready = nil
734+ fut .done = nil
717735 shard .rmut .Unlock ()
718736 return
719737 case connDisconnected :
720- fut .err = ClientError {ErrConnectionNotReady , "client connection is not ready" }
738+ fut .err = ClientError {
739+ ErrConnectionNotReady ,
740+ "client connection is not ready" ,
741+ }
721742 fut .ready = nil
743+ fut .done = nil
722744 shard .rmut .Unlock ()
723745 return
724746 }
@@ -737,22 +759,38 @@ func (conn *Connection) newFuture(requestCode int32) (fut *Future) {
737759 runtime .Gosched ()
738760 select {
739761 case conn .rlimit <- struct {}{}:
740- case <- fut .ready :
762+ case <- fut .done :
741763 if fut .err == nil {
742- panic ("fut.ready is closed, but err is nil" )
764+ panic ("fut.done is closed, but err is nil" )
743765 }
744766 }
745767 }
746768 }
747769 return
748770}
749771
772+ func (conn * Connection ) sendFuture (fut * Future , body func (* msgpack.Encoder ) error ) * Future {
773+ if fut .ready == nil {
774+ return fut
775+ }
776+ conn .putFuture (fut , body )
777+ return fut
778+ }
779+
780+ func (conn * Connection ) failFuture (fut * Future , err error ) * Future {
781+ if f := conn .fetchFuture (fut .requestId ); f == fut {
782+ fut .SetError (err )
783+ conn .markDone (fut )
784+ }
785+ return fut
786+ }
787+
750788func (conn * Connection ) putFuture (fut * Future , body func (* msgpack.Encoder ) error ) {
751789 shardn := fut .requestId & (conn .opts .Concurrency - 1 )
752790 shard := & conn .shard [shardn ]
753791 shard .bufmut .Lock ()
754792 select {
755- case <- fut .ready :
793+ case <- fut .done :
756794 shard .bufmut .Unlock ()
757795 return
758796 default :
@@ -767,8 +805,8 @@ func (conn *Connection) putFuture(fut *Future, body func(*msgpack.Encoder) error
767805 shard .buf .Trunc (blen )
768806 shard .bufmut .Unlock ()
769807 if f := conn .fetchFuture (fut .requestId ); f == fut {
770- fut .markReady ( conn )
771- fut . err = err
808+ fut .SetError ( err )
809+ conn . markDone ( fut )
772810 } else if f != nil {
773811 /* in theory, it is possible. In practice, you have
774812 * to have race condition that lasts hours */
@@ -782,7 +820,7 @@ func (conn *Connection) putFuture(fut *Future, body func(*msgpack.Encoder) error
782820 // packing error is more important than connection
783821 // error, because it is indication of programmer's
784822 // mistake.
785- fut .err = err
823+ fut .SetError ( err )
786824 }
787825 }
788826 return
@@ -793,15 +831,40 @@ func (conn *Connection) putFuture(fut *Future, body func(*msgpack.Encoder) error
793831 }
794832}
795833
834+ func (conn * Connection ) markDone (fut * Future ) {
835+ if conn .rlimit != nil {
836+ <- conn .rlimit
837+ }
838+ }
839+
840+ func (conn * Connection ) peekFuture (reqid uint32 ) (fut * Future ) {
841+ shard := & conn .shard [reqid & (conn .opts .Concurrency - 1 )]
842+ pos := (reqid / conn .opts .Concurrency ) & (requestsMap - 1 )
843+ shard .rmut .Lock ()
844+ defer shard .rmut .Unlock ()
845+
846+ if conn .opts .Timeout > 0 {
847+ fut = conn .getFutureImp (reqid , true )
848+ pair := & shard .requests [pos ]
849+ * pair .last = fut
850+ pair .last = & fut .next
851+ fut .timeout = time .Since (epoch ) + conn .opts .Timeout
852+ } else {
853+ fut = conn .getFutureImp (reqid , false )
854+ }
855+
856+ return fut
857+ }
858+
796859func (conn * Connection ) fetchFuture (reqid uint32 ) (fut * Future ) {
797860 shard := & conn .shard [reqid & (conn .opts .Concurrency - 1 )]
798861 shard .rmut .Lock ()
799- fut = conn .fetchFutureImp (reqid )
862+ fut = conn .getFutureImp (reqid , true )
800863 shard .rmut .Unlock ()
801864 return fut
802865}
803866
804- func (conn * Connection ) fetchFutureImp (reqid uint32 ) * Future {
867+ func (conn * Connection ) getFutureImp (reqid uint32 , fetch bool ) * Future {
805868 shard := & conn .shard [reqid & (conn .opts .Concurrency - 1 )]
806869 pos := (reqid / conn .opts .Concurrency ) & (requestsMap - 1 )
807870 pair := & shard .requests [pos ]
@@ -812,11 +875,13 @@ func (conn *Connection) fetchFutureImp(reqid uint32) *Future {
812875 return nil
813876 }
814877 if fut .requestId == reqid {
815- * root = fut .next
816- if fut .next == nil {
817- pair .last = root
818- } else {
819- fut .next = nil
878+ if fetch {
879+ * root = fut .next
880+ if fut .next == nil {
881+ pair .last = root
882+ } else {
883+ fut .next = nil
884+ }
820885 }
821886 return fut
822887 }
@@ -851,11 +916,11 @@ func (conn *Connection) timeouts() {
851916 } else {
852917 fut .next = nil
853918 }
854- fut .err = ClientError {
919+ fut .SetError ( ClientError {
855920 Code : ErrTimeouted ,
856921 Msg : fmt .Sprintf ("client timeout for request %d" , fut .requestId ),
857- }
858- fut . markReady ( conn )
922+ })
923+ conn . markDone ( fut )
859924 shard .bufmut .Unlock ()
860925 }
861926 if pair .first != nil && pair .first .timeout < minNext {
0 commit comments