From 13d5fce9790de90e7514723f128d1bbb59375597 Mon Sep 17 00:00:00 2001 From: Hector Sanjuan Date: Thu, 28 Jun 2018 11:22:12 +0200 Subject: [PATCH 1/2] Stream closing: improve logic with FullClose Using full close allows to ensure we end up discarding the stream when the other side has not closed it as expected. --- call.go | 2 +- client.go | 20 ++++++++++++-------- server.go | 3 +-- 3 files changed, 14 insertions(+), 11 deletions(-) diff --git a/call.go b/call.go index 4af07e6..22ef793 100644 --- a/call.go +++ b/call.go @@ -83,7 +83,7 @@ func (call *Call) watchContextWithStream(s inet.Stream) { // Close() instead of Reset(). This lets the other // write to the stream without printing errors to // the console (graceful fail). - s.Close() + inet.FullClose(s) call.doneWithError(call.ctx.Err()) } } diff --git a/client.go b/client.go index 322a288..a5c7ce2 100644 --- a/client.go +++ b/client.go @@ -7,6 +7,7 @@ import ( "sync" host "github.com/libp2p/go-libp2p-host" + inet "github.com/libp2p/go-libp2p-net" peer "github.com/libp2p/go-libp2p-peer" protocol "github.com/libp2p/go-libp2p-protocol" ) @@ -266,9 +267,8 @@ func (c *Client) send(call *Call) { call.doneWithError(err) return } - defer s.Close() - go call.watchContextWithStream(s) + go call.watchContextWithStream(s) sWrap := wrapStream(s) logger.Debugf("sending RPC %s.%s to %s", call.SvcID.Name, @@ -289,18 +289,22 @@ func (c *Client) send(call *Call) { s.Reset() return } - receiveResponse(sWrap, call) + err = receiveResponse(sWrap, call) + if err != nil { + s.Reset() + return + } + go inet.FullClose(s) } // receiveResponse reads a response to an RPC call -func receiveResponse(s *streamWrap, call *Call) { +func receiveResponse(s *streamWrap, call *Call) error { logger.Debugf("waiting response for %s.%s to %s", call.SvcID.Name, call.SvcID.Method, call.Dest) var resp Response if err := s.dec.Decode(&resp); err != nil { call.doneWithError(err) - s.stream.Reset() - return + return err } defer call.done() @@ -312,7 +316,7 @@ func receiveResponse(s *streamWrap, call *Call) { // read if err := s.dec.Decode(call.Reply); err != nil && err != io.EOF { call.setError(err) - s.stream.Reset() + return err } - return + return nil } diff --git a/server.go b/server.go index b49bc78..91c89e8 100644 --- a/server.go +++ b/server.go @@ -124,7 +124,7 @@ func NewServer(h host.Host, p protocol.ID) *Server { if h != nil { h.SetStreamHandler(p, func(stream inet.Stream) { sWrap := wrapStream(stream) - defer stream.Close() + defer inet.FullClose(stream) err := s.handle(sWrap) if err != nil { logger.Error("error handling RPC:", err) @@ -132,7 +132,6 @@ func NewServer(h host.Host, p protocol.ID) *Server { sendResponse(sWrap, resp, nil) } }) - } return s } From 84ba8f6560ea77ef4f539f5f31b1ee0960a94889 Mon Sep 17 00:00:00 2001 From: Hector Sanjuan Date: Thu, 28 Jun 2018 11:23:53 +0200 Subject: [PATCH 2/2] gx publish 1.0.13 --- .gx/lastpubver | 2 +- package.json | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.gx/lastpubver b/.gx/lastpubver index 7579339..702b5fb 100644 --- a/.gx/lastpubver +++ b/.gx/lastpubver @@ -1 +1 @@ -1.0.12: QmWudFxN9c6GECgcVCTKRdPJPbBd5nABheBM8VBCh7LKN4 +1.0.13: Qmb8tTGhxUEcevnFMmuoRHkJb1GzmQ64umAorJvMvqYcPG diff --git a/package.json b/package.json index d9f865a..0b11481 100644 --- a/package.json +++ b/package.json @@ -25,6 +25,6 @@ "license": "MIT/BSD", "name": "go-libp2p-gorpc", "releaseCmd": "git commit -a -m \"gx publish $VERSION\"", - "version": "1.0.12" + "version": "1.0.13" }