diff --git a/.gx/lastpubver b/.gx/lastpubver index 7579339..d3130c3 100644 --- a/.gx/lastpubver +++ b/.gx/lastpubver @@ -1 +1 @@ -1.0.12: QmWudFxN9c6GECgcVCTKRdPJPbBd5nABheBM8VBCh7LKN4 +1.0.13: QmenmbeGacw4oTgnYiFxLvZG3Wx8JCTu6iuWp72qyg2ZkH diff --git a/client.go b/client.go index 322a288..ad9fe45 100644 --- a/client.go +++ b/client.go @@ -7,6 +7,7 @@ import ( "sync" host "github.com/libp2p/go-libp2p-host" + inet "github.com/libp2p/go-libp2p-net" peer "github.com/libp2p/go-libp2p-peer" protocol "github.com/libp2p/go-libp2p-protocol" ) @@ -266,8 +267,6 @@ func (c *Client) send(call *Call) { call.doneWithError(err) return } - defer s.Close() - go call.watchContextWithStream(s) sWrap := wrapStream(s) @@ -289,18 +288,22 @@ func (c *Client) send(call *Call) { s.Reset() return } - receiveResponse(sWrap, call) + err := receiveResponse(sWrap, call) + if err != nil { + s.Reset() + return + } + go inet.FullClose(s) } // receiveResponse reads a response to an RPC call -func receiveResponse(s *streamWrap, call *Call) { +func receiveResponse(s *streamWrap, call *Call) error { logger.Debugf("waiting response for %s.%s to %s", call.SvcID.Name, call.SvcID.Method, call.Dest) var resp Response if err := s.dec.Decode(&resp); err != nil { call.doneWithError(err) - s.stream.Reset() - return + return err } defer call.done() @@ -312,7 +315,7 @@ func receiveResponse(s *streamWrap, call *Call) { // read if err := s.dec.Decode(call.Reply); err != nil && err != io.EOF { call.setError(err) - s.stream.Reset() + return err } - return + return nil } diff --git a/package.json b/package.json index d9f865a..0b11481 100644 --- a/package.json +++ b/package.json @@ -25,6 +25,6 @@ "license": "MIT/BSD", "name": "go-libp2p-gorpc", "releaseCmd": "git commit -a -m \"gx publish $VERSION\"", - "version": "1.0.12" + "version": "1.0.13" } diff --git a/server.go b/server.go index b49bc78..91c89e8 100644 --- a/server.go +++ b/server.go @@ -124,7 +124,7 @@ func NewServer(h host.Host, p protocol.ID) *Server { if h != nil { h.SetStreamHandler(p, func(stream inet.Stream) { sWrap := wrapStream(stream) - defer stream.Close() + defer inet.FullClose(stream) err := s.handle(sWrap) if err != nil { logger.Error("error handling RPC:", err) @@ -132,7 +132,6 @@ func NewServer(h host.Host, p protocol.ID) *Server { sendResponse(sWrap, resp, nil) } }) - } return s }