Skip to content

Commit

Permalink
Merge 84ba8f6 into 77fad75
Browse files Browse the repository at this point in the history
  • Loading branch information
hsanjuan authored Jun 28, 2018
2 parents 77fad75 + 84ba8f6 commit 4c85795
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 13 deletions.
2 changes: 1 addition & 1 deletion .gx/lastpubver
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.0.12: QmWudFxN9c6GECgcVCTKRdPJPbBd5nABheBM8VBCh7LKN4
1.0.13: Qmb8tTGhxUEcevnFMmuoRHkJb1GzmQ64umAorJvMvqYcPG
2 changes: 1 addition & 1 deletion call.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (call *Call) watchContextWithStream(s inet.Stream) {
// Close() instead of Reset(). This lets the other
// write to the stream without printing errors to
// the console (graceful fail).
s.Close()
inet.FullClose(s)
call.doneWithError(call.ctx.Err())
}
}
Expand Down
20 changes: 12 additions & 8 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"sync"

host "github.com/libp2p/go-libp2p-host"
inet "github.com/libp2p/go-libp2p-net"
peer "github.com/libp2p/go-libp2p-peer"
protocol "github.com/libp2p/go-libp2p-protocol"
)
Expand Down Expand Up @@ -266,9 +267,8 @@ func (c *Client) send(call *Call) {
call.doneWithError(err)
return
}
defer s.Close()
go call.watchContextWithStream(s)

go call.watchContextWithStream(s)
sWrap := wrapStream(s)

logger.Debugf("sending RPC %s.%s to %s", call.SvcID.Name,
Expand All @@ -289,18 +289,22 @@ func (c *Client) send(call *Call) {
s.Reset()
return
}
receiveResponse(sWrap, call)
err = receiveResponse(sWrap, call)
if err != nil {
s.Reset()
return
}
go inet.FullClose(s)
}

// receiveResponse reads a response to an RPC call
func receiveResponse(s *streamWrap, call *Call) {
func receiveResponse(s *streamWrap, call *Call) error {
logger.Debugf("waiting response for %s.%s to %s", call.SvcID.Name,
call.SvcID.Method, call.Dest)
var resp Response
if err := s.dec.Decode(&resp); err != nil {
call.doneWithError(err)
s.stream.Reset()
return
return err
}

defer call.done()
Expand All @@ -312,7 +316,7 @@ func receiveResponse(s *streamWrap, call *Call) {
// read
if err := s.dec.Decode(call.Reply); err != nil && err != io.EOF {
call.setError(err)
s.stream.Reset()
return err
}
return
return nil
}
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,6 @@
"license": "MIT/BSD",
"name": "go-libp2p-gorpc",
"releaseCmd": "git commit -a -m \"gx publish $VERSION\"",
"version": "1.0.12"
"version": "1.0.13"
}

3 changes: 1 addition & 2 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,15 +124,14 @@ func NewServer(h host.Host, p protocol.ID) *Server {
if h != nil {
h.SetStreamHandler(p, func(stream inet.Stream) {
sWrap := wrapStream(stream)
defer stream.Close()
defer inet.FullClose(stream)
err := s.handle(sWrap)
if err != nil {
logger.Error("error handling RPC:", err)
resp := &Response{ServiceID{}, err.Error()}
sendResponse(sWrap, resp, nil)
}
})

}
return s
}
Expand Down

0 comments on commit 4c85795

Please sign in to comment.