Skip to content

Commit

Permalink
Merge pull request #1732 from ipfs-cluster/fix/goroutine-leak-adder
Browse files Browse the repository at this point in the history
Fix: leaking goroutines on aborted /add requests
  • Loading branch information
hsanjuan committed Jul 8, 2022
2 parents b2ce7d9 + d19c7fa commit c4d78d5
Show file tree
Hide file tree
Showing 8 changed files with 61 additions and 12 deletions.
3 changes: 3 additions & 0 deletions adder/adder.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ type ClusterDAGService interface {
// Finalize receives the IPFS content root CID as
// returned by the ipfs adder.
Finalize(ctx context.Context, ipfsRoot api.Cid) (api.Cid, error)
// Close performs any necessary cleanups and should be called
// whenever the DAGService is not going to be used anymore.
Close() error
// Allocations returns the allocations made by the cluster DAG service
// for the added content.
Allocations() []peer.ID
Expand Down
6 changes: 6 additions & 0 deletions adder/adder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func TestAdder(t *testing.T) {
expectedCids := test.ShardingDirCids[:]

dags := newMockCDAGServ()
defer dags.Close()

adder := New(dags, p, nil)

Expand Down Expand Up @@ -88,6 +89,7 @@ func TestAdder_DoubleStart(t *testing.T) {
p := api.DefaultAddParams()

dags := newMockCDAGServ()
defer dags.Close()

adder := New(dags, p, nil)
_, err := adder.FromFiles(context.Background(), f)
Expand Down Expand Up @@ -124,6 +126,7 @@ func TestAdder_ContextCancelled(t *testing.T) {
p := api.DefaultAddParams()

dags := newMockCDAGServ()
defer dags.Close()

ctx, cancel := context.WithCancel(context.Background())
adder := New(dags, p, nil)
Expand Down Expand Up @@ -176,6 +179,8 @@ func TestAdder_CAR(t *testing.T) {

// Add the car, discarding old dags.
dags = newMockCDAGServ()
defer dags.Close()

p.Format = "car"
adder = New(dags, p, nil)
root2, err := adder.FromMultipart(ctx, carMr)
Expand Down Expand Up @@ -217,6 +222,7 @@ func TestAdder_LargeFolder(t *testing.T) {
p.Wrap = true

dags := newMockCDAGServ()
defer dags.Close()

adder := New(dags, p, nil)
_, err := adder.FromFiles(context.Background(), slf)
Expand Down
1 change: 1 addition & 0 deletions adder/adderutils/adderutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func AddMultipartHTTPHandler(
} else {
dags = single.New(ctx, rpc, params, params.Local)
}
defer dags.Close()

if outputTransform == nil {
outputTransform = func(in api.AddedOutput) interface{} { return in }
Expand Down
9 changes: 9 additions & 0 deletions adder/sharding/dag_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,15 @@ func (dgs *DAGService) Add(ctx context.Context, node ipld.Node) error {
return dgs.ingestBlock(ctx, node)
}

// Close performs cleanup and should be called when the DAGService is not
// going to be used anymore.
func (dgs *DAGService) Close() error {
if dgs.currentShard != nil {
dgs.currentShard.Close()
}
return nil
}

// Finalize finishes sharding, creates the cluster DAG and pins it along
// with the meta pin for the root node of the content.
func (dgs *DAGService) Finalize(ctx context.Context, dataRoot api.Cid) (api.Cid, error) {
Expand Down
28 changes: 20 additions & 8 deletions adder/sharding/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ package sharding
import (
"context"
"fmt"
"sync"

ipld "github.com/ipfs/go-ipld-format"
"github.com/ipfs-cluster/ipfs-cluster/adder"
"github.com/ipfs-cluster/ipfs-cluster/api"
ipld "github.com/ipfs/go-ipld-format"

cid "github.com/ipfs/go-cid"
peer "github.com/libp2p/go-libp2p-core/peer"
Expand All @@ -19,12 +20,13 @@ import (
// a peer to be block-put and will be part of the same shard in the
// cluster DAG.
type shard struct {
ctx context.Context
rpc *rpc.Client
allocations []peer.ID
pinOptions api.PinOptions
bs *adder.BlockStreamer
blocks chan api.NodeWithMeta
ctx context.Context
rpc *rpc.Client
allocations []peer.ID
pinOptions api.PinOptions
bs *adder.BlockStreamer
blocks chan api.NodeWithMeta
closeBlocksOnce sync.Once
// dagNode represents a node with links and will be converted
// to Cbor.
dagNode map[string]cid.Cid
Expand Down Expand Up @@ -93,6 +95,14 @@ func (sh *shard) sendBlock(ctx context.Context, n ipld.Node) error {
}
}

// Close stops any ongoing block streaming.
func (sh *shard) Close() error {
sh.closeBlocksOnce.Do(func() {
close(sh.blocks)
})
return nil
}

// Flush completes the allocation of this shard by building a CBOR node
// and adding it to IPFS, then pinning it in cluster. It returns the Cid of the
// shard.
Expand All @@ -110,7 +120,9 @@ func (sh *shard) Flush(ctx context.Context, shardN int, prev cid.Cid) (cid.Cid,
return cid.Undef, err
}
}
close(sh.blocks)

sh.Close()

select {
case <-ctx.Done():
return cid.Undef, ctx.Err()
Expand Down
20 changes: 16 additions & 4 deletions adder/single/dag_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package single

import (
"context"
"sync"

adder "github.com/ipfs-cluster/ipfs-cluster/adder"
"github.com/ipfs-cluster/ipfs-cluster/api"
Expand Down Expand Up @@ -31,9 +32,10 @@ type DAGService struct {
addParams api.AddParams
local bool

bs *adder.BlockStreamer
blocks chan api.NodeWithMeta
recentBlocks *recentBlocks
bs *adder.BlockStreamer
blocks chan api.NodeWithMeta
closeBlocksOnce sync.Once
recentBlocks *recentBlocks
}

// New returns a new Adder with the given rpc Client. The client is used
Expand Down Expand Up @@ -109,10 +111,20 @@ func (dgs *DAGService) Add(ctx context.Context, node ipld.Node) error {
}
}

// Close cleans up the DAGService.
func (dgs *DAGService) Close() error {
dgs.closeBlocksOnce.Do(func() {
close(dgs.blocks)
})
return nil
}

// Finalize pins the last Cid added to this DAGService.
func (dgs *DAGService) Finalize(ctx context.Context, root api.Cid) (api.Cid, error) {
close(dgs.blocks)
// Close the blocks channel
dgs.Close()

// Wait for the BlockStreamer to finish.
select {
case <-dgs.ctx.Done():
return root, ctx.Err()
Expand Down
1 change: 1 addition & 0 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1744,6 +1744,7 @@ func (c *Cluster) AddFile(ctx context.Context, reader *multipart.Reader, params
} else {
dags = single.New(ctx, c.rpcClient, params, params.Local)
}
defer dags.Close()
add := adder.New(dags, params, nil)
return add.FromMultipart(ctx, reader)
}
Expand Down
5 changes: 5 additions & 0 deletions test/sharding.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,11 @@ func NewMockDAGService(writeOnly bool) *MockDAGService {
}
}

// Close closes the DAGService.
func (d *MockDAGService) Close() error {
return nil
}

// Get reads a node.
func (d *MockDAGService) Get(ctx context.Context, cid cid.Cid) (format.Node, error) {
if d.writeOnly {
Expand Down

0 comments on commit c4d78d5

Please sign in to comment.