Fix: leaking goroutines on aborted /add requests
It has been observed that some peers accumulate a growing number of goroutines,
usually stuck in the go-libp2p-gorpc MultiStream() function, waiting to read
items from the arguments channel.

We suspect this is caused by aborted /add requests. When an add request is
aborted or fails, Finalize() is never called, so the blocks channel stays open,
MultiStream() can never exit, and the BlockStreamer never stops streaming.
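
To make the failure mode concrete, here is a minimal, self-contained Go sketch (illustrative only; the types and function names are made up and stand in for the real ipfs-cluster and go-libp2p-gorpc code) of a streaming goroutine left blocked on a channel that is never closed:

package main

import (
	"fmt"
	"time"
)

type block struct{ data []byte }

// streamBlocks stands in for MultiStream()/BlockStreamer: it only returns
// once the blocks channel is closed and drained.
func streamBlocks(blocks <-chan block) {
	for b := range blocks {
		fmt.Println("sent block of", len(b.data), "bytes")
	}
	fmt.Println("streamer finished")
}

// handleAdd simulates an aborted /add request: it starts the streamer,
// sends one block, then returns early without ever closing the channel.
// In a long-running peer, the streamer goroutine stays blocked on the
// receive forever -- one leaked goroutine per aborted request.
func handleAdd() {
	blocks := make(chan block)
	go streamBlocks(blocks)
	blocks <- block{data: []byte("hello")}
	// request aborted here: no Finalize(), no close(blocks)
}

func main() {
	handleAdd()
	time.Sleep(100 * time.Millisecond) // "streamer finished" is never printed
}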

As a fix, we now require calling Close() whenever a ClusterDAGService stops
being used (error or not). This ensures that the blocks channel is always
closed, not only on Finalize().
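
The sketch below shows that usage pattern in simplified form (dagService and handleAdd are hypothetical stand-ins, not the real types in adder/single or adder/sharding): Close() closes the blocks channel exactly once via sync.Once, and callers defer it right after constructing the service, so the channel is closed on every exit path, as the diff below does with defer dags.Close():

package main

import (
	"errors"
	"fmt"
	"sync"
)

// dagService is a stand-in for a ClusterDAGService implementation: it owns
// a blocks channel that a background streamer reads from.
type dagService struct {
	blocks          chan []byte
	closeBlocksOnce sync.Once
}

// Close closes the blocks channel exactly once, so it is safe to call it
// both from Finalize() and from a deferred cleanup.
func (d *dagService) Close() error {
	d.closeBlocksOnce.Do(func() {
		close(d.blocks)
	})
	return nil
}

// Finalize closes the channel too; sync.Once makes the double call safe.
func (d *dagService) Finalize() error {
	return d.Close()
}

// handleAdd defers Close() right after constructing the DAGService, so an
// abort or error can no longer leave the blocks channel open.
func handleAdd(abort bool) error {
	dags := &dagService{blocks: make(chan []byte)}
	defer dags.Close() // runs on success, error, or abort

	if abort {
		return errors.New("request aborted before Finalize()")
	}
	return dags.Finalize()
}

func main() {
	fmt.Println(handleAdd(true))  // channel still gets closed
	fmt.Println(handleAdd(false)) // Finalize + deferred Close: no double-close panic
}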
hsanjuan committed Jul 8, 2022
1 parent b2ce7d9 commit d19c7fa
Showing 8 changed files with 61 additions and 12 deletions.
3 changes: 3 additions & 0 deletions adder/adder.go
@@ -43,6 +43,9 @@ type ClusterDAGService interface {
// Finalize receives the IPFS content root CID as
// returned by the ipfs adder.
Finalize(ctx context.Context, ipfsRoot api.Cid) (api.Cid, error)
// Close performs any necessary cleanups and should be called
// whenever the DAGService is not going to be used anymore.
Close() error
// Allocations returns the allocations made by the cluster DAG service
// for the added content.
Allocations() []peer.ID
6 changes: 6 additions & 0 deletions adder/adder_test.go
@@ -55,6 +55,7 @@ func TestAdder(t *testing.T) {
expectedCids := test.ShardingDirCids[:]

dags := newMockCDAGServ()
defer dags.Close()

adder := New(dags, p, nil)

@@ -88,6 +89,7 @@ func TestAdder_DoubleStart(t *testing.T) {
p := api.DefaultAddParams()

dags := newMockCDAGServ()
defer dags.Close()

adder := New(dags, p, nil)
_, err := adder.FromFiles(context.Background(), f)
@@ -124,6 +126,7 @@ func TestAdder_ContextCancelled(t *testing.T) {
p := api.DefaultAddParams()

dags := newMockCDAGServ()
defer dags.Close()

ctx, cancel := context.WithCancel(context.Background())
adder := New(dags, p, nil)
@@ -176,6 +179,8 @@ func TestAdder_CAR(t *testing.T) {

// Add the car, discarding old dags.
dags = newMockCDAGServ()
defer dags.Close()

p.Format = "car"
adder = New(dags, p, nil)
root2, err := adder.FromMultipart(ctx, carMr)
@@ -217,6 +222,7 @@ func TestAdder_LargeFolder(t *testing.T) {
p.Wrap = true

dags := newMockCDAGServ()
defer dags.Close()

adder := New(dags, p, nil)
_, err := adder.FromFiles(context.Background(), slf)
1 change: 1 addition & 0 deletions adder/adderutils/adderutils.go
@@ -39,6 +39,7 @@ func AddMultipartHTTPHandler(
} else {
dags = single.New(ctx, rpc, params, params.Local)
}
defer dags.Close()

if outputTransform == nil {
outputTransform = func(in api.AddedOutput) interface{} { return in }
9 changes: 9 additions & 0 deletions adder/sharding/dag_service.go
@@ -77,6 +77,15 @@ func (dgs *DAGService) Add(ctx context.Context, node ipld.Node) error {
return dgs.ingestBlock(ctx, node)
}

// Close performs cleanup and should be called when the DAGService is not
// going to be used anymore.
func (dgs *DAGService) Close() error {
if dgs.currentShard != nil {
dgs.currentShard.Close()
}
return nil
}

// Finalize finishes sharding, creates the cluster DAG and pins it along
// with the meta pin for the root node of the content.
func (dgs *DAGService) Finalize(ctx context.Context, dataRoot api.Cid) (api.Cid, error) {
28 changes: 20 additions & 8 deletions adder/sharding/shard.go
@@ -3,10 +3,11 @@ package sharding
import (
"context"
"fmt"
"sync"

ipld "github.com/ipfs/go-ipld-format"
"github.com/ipfs-cluster/ipfs-cluster/adder"
"github.com/ipfs-cluster/ipfs-cluster/api"
ipld "github.com/ipfs/go-ipld-format"

cid "github.com/ipfs/go-cid"
peer "github.com/libp2p/go-libp2p-core/peer"
@@ -19,12 +20,13 @@ import (
// a peer to be block-put and will be part of the same shard in the
// cluster DAG.
type shard struct {
ctx context.Context
rpc *rpc.Client
allocations []peer.ID
pinOptions api.PinOptions
bs *adder.BlockStreamer
blocks chan api.NodeWithMeta
ctx context.Context
rpc *rpc.Client
allocations []peer.ID
pinOptions api.PinOptions
bs *adder.BlockStreamer
blocks chan api.NodeWithMeta
closeBlocksOnce sync.Once
// dagNode represents a node with links and will be converted
// to Cbor.
dagNode map[string]cid.Cid
@@ -93,6 +95,14 @@ func (sh *shard) sendBlock(ctx context.Context, n ipld.Node) error {
}
}

// Close stops any ongoing block streaming.
func (sh *shard) Close() error {
sh.closeBlocksOnce.Do(func() {
close(sh.blocks)
})
return nil
}

// Flush completes the allocation of this shard by building a CBOR node
// and adding it to IPFS, then pinning it in cluster. It returns the Cid of the
// shard.
@@ -110,7 +120,9 @@ func (sh *shard) Flush(ctx context.Context, shardN int, prev cid.Cid) (cid.Cid,
return cid.Undef, err
}
}
close(sh.blocks)

sh.Close()

select {
case <-ctx.Done():
return cid.Undef, ctx.Err()
20 changes: 16 additions & 4 deletions adder/single/dag_service.go
@@ -4,6 +4,7 @@ package single

import (
"context"
"sync"

adder "github.com/ipfs-cluster/ipfs-cluster/adder"
"github.com/ipfs-cluster/ipfs-cluster/api"
@@ -31,9 +32,10 @@ type DAGService struct {
addParams api.AddParams
local bool

bs *adder.BlockStreamer
blocks chan api.NodeWithMeta
recentBlocks *recentBlocks
bs *adder.BlockStreamer
blocks chan api.NodeWithMeta
closeBlocksOnce sync.Once
recentBlocks *recentBlocks
}

// New returns a new Adder with the given rpc Client. The client is used
@@ -109,10 +111,20 @@ func (dgs *DAGService) Add(ctx context.Context, node ipld.Node) error {
}
}

// Close cleans up the DAGService.
func (dgs *DAGService) Close() error {
dgs.closeBlocksOnce.Do(func() {
close(dgs.blocks)
})
return nil
}

// Finalize pins the last Cid added to this DAGService.
func (dgs *DAGService) Finalize(ctx context.Context, root api.Cid) (api.Cid, error) {
close(dgs.blocks)
// Close the blocks channel
dgs.Close()

// Wait for the BlockStreamer to finish.
select {
case <-dgs.ctx.Done():
return root, ctx.Err()
1 change: 1 addition & 0 deletions cluster.go
@@ -1744,6 +1744,7 @@ func (c *Cluster) AddFile(ctx context.Context, reader *multipart.Reader, params
} else {
dags = single.New(ctx, c.rpcClient, params, params.Local)
}
defer dags.Close()
add := adder.New(dags, params, nil)
return add.FromMultipart(ctx, reader)
}
5 changes: 5 additions & 0 deletions test/sharding.go
@@ -300,6 +300,11 @@ func NewMockDAGService(writeOnly bool) *MockDAGService {
}
}

// Close closes the DAGService.
func (d *MockDAGService) Close() error {
return nil
}

// Get reads a node.
func (d *MockDAGService) Get(ctx context.Context, cid cid.Cid) (format.Node, error) {
if d.writeOnly {