Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: leaking goroutines on aborted /add requests #1732

Merged
merged 1 commit into from Jul 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions adder/adder.go
Expand Up @@ -43,6 +43,9 @@ type ClusterDAGService interface {
// Finalize receives the IPFS content root CID as
// returned by the ipfs adder.
Finalize(ctx context.Context, ipfsRoot api.Cid) (api.Cid, error)
// Close performs any necessary cleanups and should be called
// whenever the DAGService is not going to be used anymore.
Close() error
// Allocations returns the allocations made by the cluster DAG service
// for the added content.
Allocations() []peer.ID
Expand Down
6 changes: 6 additions & 0 deletions adder/adder_test.go
Expand Up @@ -55,6 +55,7 @@ func TestAdder(t *testing.T) {
expectedCids := test.ShardingDirCids[:]

dags := newMockCDAGServ()
defer dags.Close()

adder := New(dags, p, nil)

Expand Down Expand Up @@ -88,6 +89,7 @@ func TestAdder_DoubleStart(t *testing.T) {
p := api.DefaultAddParams()

dags := newMockCDAGServ()
defer dags.Close()

adder := New(dags, p, nil)
_, err := adder.FromFiles(context.Background(), f)
Expand Down Expand Up @@ -124,6 +126,7 @@ func TestAdder_ContextCancelled(t *testing.T) {
p := api.DefaultAddParams()

dags := newMockCDAGServ()
defer dags.Close()

ctx, cancel := context.WithCancel(context.Background())
adder := New(dags, p, nil)
Expand Down Expand Up @@ -176,6 +179,8 @@ func TestAdder_CAR(t *testing.T) {

// Add the car, discarding old dags.
dags = newMockCDAGServ()
defer dags.Close()

p.Format = "car"
adder = New(dags, p, nil)
root2, err := adder.FromMultipart(ctx, carMr)
Expand Down Expand Up @@ -217,6 +222,7 @@ func TestAdder_LargeFolder(t *testing.T) {
p.Wrap = true

dags := newMockCDAGServ()
defer dags.Close()

adder := New(dags, p, nil)
_, err := adder.FromFiles(context.Background(), slf)
Expand Down
1 change: 1 addition & 0 deletions adder/adderutils/adderutils.go
Expand Up @@ -39,6 +39,7 @@ func AddMultipartHTTPHandler(
} else {
dags = single.New(ctx, rpc, params, params.Local)
}
defer dags.Close()

if outputTransform == nil {
outputTransform = func(in api.AddedOutput) interface{} { return in }
Expand Down
9 changes: 9 additions & 0 deletions adder/sharding/dag_service.go
Expand Up @@ -77,6 +77,15 @@ func (dgs *DAGService) Add(ctx context.Context, node ipld.Node) error {
return dgs.ingestBlock(ctx, node)
}

// Close performs cleanup and should be called when the DAGService is not
// going to be used anymore.
func (dgs *DAGService) Close() error {
if dgs.currentShard != nil {
dgs.currentShard.Close()
}
return nil
}

// Finalize finishes sharding, creates the cluster DAG and pins it along
// with the meta pin for the root node of the content.
func (dgs *DAGService) Finalize(ctx context.Context, dataRoot api.Cid) (api.Cid, error) {
Expand Down
28 changes: 20 additions & 8 deletions adder/sharding/shard.go
Expand Up @@ -3,10 +3,11 @@ package sharding
import (
"context"
"fmt"
"sync"

ipld "github.com/ipfs/go-ipld-format"
"github.com/ipfs-cluster/ipfs-cluster/adder"
"github.com/ipfs-cluster/ipfs-cluster/api"
ipld "github.com/ipfs/go-ipld-format"

cid "github.com/ipfs/go-cid"
peer "github.com/libp2p/go-libp2p-core/peer"
Expand All @@ -19,12 +20,13 @@ import (
// a peer to be block-put and will be part of the same shard in the
// cluster DAG.
type shard struct {
ctx context.Context
rpc *rpc.Client
allocations []peer.ID
pinOptions api.PinOptions
bs *adder.BlockStreamer
blocks chan api.NodeWithMeta
ctx context.Context
rpc *rpc.Client
allocations []peer.ID
pinOptions api.PinOptions
bs *adder.BlockStreamer
blocks chan api.NodeWithMeta
closeBlocksOnce sync.Once
// dagNode represents a node with links and will be converted
// to Cbor.
dagNode map[string]cid.Cid
Expand Down Expand Up @@ -93,6 +95,14 @@ func (sh *shard) sendBlock(ctx context.Context, n ipld.Node) error {
}
}

// Close stops any ongoing block streaming.
func (sh *shard) Close() error {
sh.closeBlocksOnce.Do(func() {
close(sh.blocks)
})
return nil
}

// Flush completes the allocation of this shard by building a CBOR node
// and adding it to IPFS, then pinning it in cluster. It returns the Cid of the
// shard.
Expand All @@ -110,7 +120,9 @@ func (sh *shard) Flush(ctx context.Context, shardN int, prev cid.Cid) (cid.Cid,
return cid.Undef, err
}
}
close(sh.blocks)

sh.Close()

select {
case <-ctx.Done():
return cid.Undef, ctx.Err()
Expand Down
20 changes: 16 additions & 4 deletions adder/single/dag_service.go
Expand Up @@ -4,6 +4,7 @@ package single

import (
"context"
"sync"

adder "github.com/ipfs-cluster/ipfs-cluster/adder"
"github.com/ipfs-cluster/ipfs-cluster/api"
Expand Down Expand Up @@ -31,9 +32,10 @@ type DAGService struct {
addParams api.AddParams
local bool

bs *adder.BlockStreamer
blocks chan api.NodeWithMeta
recentBlocks *recentBlocks
bs *adder.BlockStreamer
blocks chan api.NodeWithMeta
closeBlocksOnce sync.Once
recentBlocks *recentBlocks
}

// New returns a new Adder with the given rpc Client. The client is used
Expand Down Expand Up @@ -109,10 +111,20 @@ func (dgs *DAGService) Add(ctx context.Context, node ipld.Node) error {
}
}

// Close cleans up the DAGService.
func (dgs *DAGService) Close() error {
dgs.closeBlocksOnce.Do(func() {
close(dgs.blocks)
})
return nil
}

// Finalize pins the last Cid added to this DAGService.
func (dgs *DAGService) Finalize(ctx context.Context, root api.Cid) (api.Cid, error) {
close(dgs.blocks)
// Close the blocks channel
dgs.Close()

// Wait for the BlockStreamer to finish.
select {
case <-dgs.ctx.Done():
return root, ctx.Err()
Expand Down
1 change: 1 addition & 0 deletions cluster.go
Expand Up @@ -1744,6 +1744,7 @@ func (c *Cluster) AddFile(ctx context.Context, reader *multipart.Reader, params
} else {
dags = single.New(ctx, c.rpcClient, params, params.Local)
}
defer dags.Close()
add := adder.New(dags, params, nil)
return add.FromMultipart(ctx, reader)
}
Expand Down
5 changes: 5 additions & 0 deletions test/sharding.go
Expand Up @@ -300,6 +300,11 @@ func NewMockDAGService(writeOnly bool) *MockDAGService {
}
}

// Close closes the DAGService.
func (d *MockDAGService) Close() error {
return nil
}

// Get reads a node.
func (d *MockDAGService) Get(ctx context.Context, cid cid.Cid) (format.Node, error) {
if d.writeOnly {
Expand Down