Skip to content

Commit

Permalink
Merge pull request #340 from libp2p/feat/fewer-goroutines
Browse files Browse the repository at this point in the history
reduce background goroutines
  • Loading branch information
Stebalien committed May 24, 2019
2 parents 2d6dde4 + b55fad1 commit 978eca5
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 9 deletions.
14 changes: 8 additions & 6 deletions dht_net.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"time"

ggio "github.com/gogo/protobuf/io"
ctxio "github.com/jbenet/go-context/io"
"github.com/libp2p/go-libp2p-kad-dht/metrics"
pb "github.com/libp2p/go-libp2p-kad-dht/pb"
inet "github.com/libp2p/go-libp2p-net"
Expand All @@ -19,6 +18,7 @@ import (
)

var dhtReadMessageTimeout = time.Minute
var dhtStreamIdleTimeout = 10 * time.Minute
var ErrReadTimeout = fmt.Errorf("timed out reading response")

// The Protobuf writer performs multiple small writes when writing a message.
Expand Down Expand Up @@ -67,12 +67,12 @@ func (dht *IpfsDHT) handleNewStream(s inet.Stream) {
// Returns true on orderly completion of writes (so we can Close the stream).
func (dht *IpfsDHT) handleNewMessage(s inet.Stream) bool {
ctx := dht.ctx

cr := ctxio.NewReader(ctx, s) // ok to use. we defer close stream in this func
cw := ctxio.NewWriter(ctx, s) // ok to use. we defer close stream in this func
r := ggio.NewDelimitedReader(cr, inet.MessageSizeMax)
r := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
mPeer := s.Conn().RemotePeer()

timer := time.AfterFunc(dhtStreamIdleTimeout, func() { s.Reset() })
defer timer.Stop()

for {
var req pb.Message
switch err := r.ReadMsg(&req); err {
Expand All @@ -93,6 +93,8 @@ func (dht *IpfsDHT) handleNewMessage(s inet.Stream) bool {
case nil:
}

timer.Reset(dhtStreamIdleTimeout)

startTime := time.Now()
ctx, _ = tag.New(
ctx,
Expand Down Expand Up @@ -126,7 +128,7 @@ func (dht *IpfsDHT) handleNewMessage(s inet.Stream) bool {
}

// send out response msg
err = writeMsg(cw, resp)
err = writeMsg(s, resp)
if err != nil {
stats.Record(ctx, metrics.ReceivedMessageErrors.M(1))
logger.Debugf("error writing response: %v", err)
Expand Down
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ require (
github.com/ipfs/go-ipfs-util v0.0.1
github.com/ipfs/go-log v0.0.1
github.com/ipfs/go-todocounter v0.0.1
github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99
github.com/jbenet/goprocess v0.1.3
github.com/libp2p/go-libp2p v0.0.30
github.com/libp2p/go-libp2p-crypto v0.0.2
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,6 @@ github.com/jbenet/go-cienv v0.0.0-20150120210510-1bb1476777ec h1:DQqZhhDvrTrEQ3Q
github.com/jbenet/go-cienv v0.0.0-20150120210510-1bb1476777ec/go.mod h1:rGaEvXB4uRSZMmzKNLoXvTu1sfx+1kv/DojUlPrSZGs=
github.com/jbenet/go-cienv v0.1.0 h1:Vc/s0QbQtoxX8MwwSLWWh+xNNZvM3Lw7NsTcHrvvhMc=
github.com/jbenet/go-cienv v0.1.0/go.mod h1:TqNnHUmJgXau0nCzC7kXWeotg3J9W34CUv5Djy1+FlA=
github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 h1:BQSFePA1RWJOlocH6Fxy8MmwDt+yVQYULKfN0RoTN8A=
github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99/go.mod h1:1lJo3i6rXxKeerYnT8Nvf0QmHCRC1n8sfWVwXF2Frvo=
github.com/jbenet/go-temp-err-catcher v0.0.0-20150120210811-aac704a3f4f2 h1:vhC1OXXiT9R2pczegwz6moDvuRpggaroAXhPIseh57A=
github.com/jbenet/go-temp-err-catcher v0.0.0-20150120210811-aac704a3f4f2/go.mod h1:8GXXJV31xl8whumTzdZsTt3RnUIiPqzkyf7mxToRCMs=
github.com/jbenet/goprocess v0.0.0-20160826012719-b497e2f366b8 h1:bspPhN+oKYFk5fcGNuQzp6IGzYQSenLEgH3s6jkXrWw=
Expand Down

0 comments on commit 978eca5

Please sign in to comment.