Skip to content

Commit

Permalink
storagenode/pieces: fix concurrent empty and restore trash
Browse files Browse the repository at this point in the history
This ensures that empty trash and restore trash cannot run at the same
time.

Fixes #5416

Change-Id: I9d2e3aa3d66e61e5c8a7427a95208bb96089792d
  • Loading branch information
egonelbre committed Jan 3, 2023
1 parent 46d99a0 commit 9544a67
Show file tree
Hide file tree
Showing 8 changed files with 98 additions and 104 deletions.
2 changes: 1 addition & 1 deletion go.mod
Expand Up @@ -52,7 +52,7 @@ require (
golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e
gopkg.in/segmentio/analytics-go.v3 v3.1.0
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776
storj.io/common v0.0.0-20221215155610-3715c7f7ce66
storj.io/common v0.0.0-20221223153333-f5b4455d9cbe
storj.io/drpc v0.0.32
storj.io/monkit-jaeger v0.0.0-20220915074555-d100d7589f41
storj.io/private v0.0.0-20221108123115-3a27297f0b78
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Expand Up @@ -951,8 +951,8 @@ rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8
sourcegraph.com/sourcegraph/go-diff v0.5.0/go.mod h1:kuch7UrkMzY0X+p9CRK03kfuPQ2zzQcaEFbx8wA8rck=
sourcegraph.com/sqs/pbtypes v0.0.0-20180604144634-d3ebe8f20ae4/go.mod h1:ketZ/q3QxT9HOBeFhu6RdvsftgpsbFHBF5Cas6cDKZ0=
storj.io/common v0.0.0-20220719163320-cd2ef8e1b9b0/go.mod h1:mCYV6Ud5+cdbuaxdPD5Zht/HYaIn0sffnnws9ErkrMQ=
storj.io/common v0.0.0-20221215155610-3715c7f7ce66 h1:T9IqZ+Hi8EnAuZ0Te9FEPRR10UffbDhGY4D/SvZdmqg=
storj.io/common v0.0.0-20221215155610-3715c7f7ce66/go.mod h1:+gF7jbVvpjVIVHhK+EJFhfPbudX395lnPq/dKkj/Qys=
storj.io/common v0.0.0-20221223153333-f5b4455d9cbe h1:8WUqW2mefWGiTp6LguJJmC+ZZP0JZxWv6T4clr3E54o=
storj.io/common v0.0.0-20221223153333-f5b4455d9cbe/go.mod h1:+gF7jbVvpjVIVHhK+EJFhfPbudX395lnPq/dKkj/Qys=
storj.io/drpc v0.0.32 h1:5p5ZwsK/VOgapaCu+oxaPVwO6UwIs+iwdMiD50+R4PI=
storj.io/drpc v0.0.32/go.mod h1:6rcOyR/QQkSTX/9L5ZGtlZaE2PtXTTZl8d+ulSeeYEg=
storj.io/monkit-jaeger v0.0.0-20220915074555-d100d7589f41 h1:SVuEocEhZfFc13J1AmlVLitdGXTVrvmbzN4Z9C9Ms40=
Expand Down
179 changes: 85 additions & 94 deletions storagenode/pieces/trashchore.go
Expand Up @@ -9,7 +9,6 @@ import (
"time"

"go.uber.org/zap"
"golang.org/x/sync/errgroup"

"storj.io/common/storj"
"storj.io/common/sync2"
Expand All @@ -25,11 +24,19 @@ type TrashChore struct {

Cycle *sync2.Cycle

workers workersService
mu sync.Mutex
restoring map[storj.NodeID]bool
started sync2.Fence
root context.Context

mu sync.Mutex
done bool
satellites map[storj.NodeID]*sync2.Workplace
}

const (
jobEmptyTrash = 1
jobRestoreTrash = 2
)

// NewTrashChore instantiates a new TrashChore. choreInterval is how often this
// chore runs, and trashExpiryInterval is passed into the EmptyTrash method to
// determine which trashed pieces should be deleted.
Expand All @@ -40,127 +47,111 @@ func NewTrashChore(log *zap.Logger, choreInterval, trashExpiryInterval time.Dura
store: store,
trust: trust,

Cycle: sync2.NewCycle(choreInterval),
restoring: map[storj.NodeID]bool{},
Cycle: sync2.NewCycle(choreInterval),
satellites: map[storj.NodeID]*sync2.Workplace{},
}
}

// Run starts the cycle.
func (chore *TrashChore) Run(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)

var group errgroup.Group
chore.Cycle.Start(ctx, &group, func(ctx context.Context) error {
chore.log.Debug("starting to empty trash")
chore.root = ctx
chore.started.Release()

for _, satelliteID := range chore.trust.GetSatellites(ctx) {
// ignore satellites that are being restored
chore.mu.Lock()
isRestoring := chore.restoring[satelliteID]
chore.mu.Unlock()
if isRestoring {
continue
}
err = chore.Cycle.Run(ctx, func(ctx context.Context) error {
chore.log.Debug("starting to empty trash")

trashedBefore := time.Now().Add(-chore.trashExpiryInterval)
err := chore.store.EmptyTrash(ctx, satelliteID, trashedBefore)
if err != nil {
chore.log.Error("emptying trash failed", zap.Error(err))
// wg tracks the empty-trash jobs started in this cycle so the cycle
// does not return before they have finished.
var wg sync.WaitGroup
// limiter is a one-slot semaphore: only one empty-trash job may hold it
// at a time, so satellites are processed one after another.
limiter := make(chan struct{}, 1)
for _, satellite := range chore.trust.GetSatellites(ctx) {
	satellite := satellite
	place := chore.ensurePlace(satellite)
	wg.Add(1)
	ok := place.Start(chore.root, jobEmptyTrash, nil, func(ctx context.Context) {
		defer wg.Done()
		// don't allow multiple trash jobs at the same time
		select {
		case <-ctx.Done():
			return
		case limiter <- struct{}{}:
		}
		defer func() { <-limiter }()

		// This is the empty-trash job; the message must not claim that a
		// restore is running, otherwise it is indistinguishable in the logs
		// from the job started by StartRestore.
		chore.log.Info("emptying trash started", zap.Stringer("Satellite ID", satellite))
		trashedBefore := time.Now().Add(-chore.trashExpiryInterval)
		err := chore.store.EmptyTrash(ctx, satellite, trashedBefore)
		if err != nil {
			chore.log.Error("emptying trash failed", zap.Error(err))
		}
	})
	if !ok {
		// Start reported that the job was not started, so its deferred
		// wg.Done will presumably never run; balance the Add above here.
		wg.Done()
	}
}

wg.Wait()
return nil
})
group.Go(func() error {
chore.workers.Run(ctx)
return nil
})
return group.Wait()
}

// StartRestore starts restoring trash for the specified satellite.
func (chore *TrashChore) StartRestore(ctx context.Context, satellite storj.NodeID) {
chore.mu.Lock()
isRestoring := chore.restoring[satellite]
if isRestoring {
chore.mu.Unlock()
return
}
chore.restoring[satellite] = true
chore.done = true
chore.mu.Unlock()

ok := chore.workers.Go(ctx, func(ctx context.Context) {
chore.log.Info("restore trash started", zap.Stringer("Satellite ID", satellite))
err := chore.store.RestoreTrash(ctx, satellite)
if err != nil {
chore.log.Error("restore trash failed", zap.Stringer("Satellite ID", satellite), zap.Error(err))
} else {
chore.log.Info("restore trash finished", zap.Stringer("Satellite ID", satellite))
}

chore.mu.Lock()
delete(chore.restoring, satellite)
chore.mu.Unlock()
})
if !ok {
chore.log.Info("failed to start restore trash", zap.Stringer("Satellite ID", satellite))
for _, place := range chore.satellites {
place.Cancel()
}
for _, place := range chore.satellites {
<-place.Done()
}

return err
}

// Close the chore.
// Close closes the chore.
func (chore *TrashChore) Close() error {
chore.Cycle.Close()
return nil
}

// workersService allows to start workers with a different context.
type workersService struct {
started sync2.Fence
root context.Context
active sync.WaitGroup

mu sync.Mutex
closed bool
}

// Run starts waiting for worker requests with the specified context.
func (workers *workersService) Run(ctx context.Context) {
// setup root context that the workers are bound to
workers.root = ctx
workers.started.Release()
// StartRestore starts a satellite restore, if it hasn't already started and
// the chore is not shutting down.
func (chore *TrashChore) StartRestore(ctx context.Context, satellite storj.NodeID) error {
if !chore.started.Wait(ctx) {
return ctx.Err()
}

// wait until it's time to shut down:
<-workers.root.Done()
place := chore.ensurePlace(satellite)
if place == nil {
return context.Canceled
}

// ensure we don't allow starting workers after it's time to shut down
workers.mu.Lock()
workers.closed = true
workers.mu.Unlock()
place.Start(chore.root, jobRestoreTrash, func(jobID interface{}) bool {
return jobID == jobEmptyTrash
}, func(ctx context.Context) {
chore.log.Info("restore trash started", zap.Stringer("Satellite ID", satellite))
err := chore.store.RestoreTrash(ctx, satellite)
if err != nil {
chore.log.Error("restore trash failed", zap.Stringer("Satellite ID", satellite), zap.Error(err))
} else {
chore.log.Info("restore trash finished", zap.Stringer("Satellite ID", satellite))
}
})

// wait for any remaining workers
workers.active.Wait()
return nil
}

// Go tries to start a worker.
func (workers *workersService) Go(ctx context.Context, work func(context.Context)) bool {
// Wait until we can use workers.root.
if !workers.started.Wait(ctx) {
return false
// ensurePlace creates a work place for the specified satellite.
func (chore *TrashChore) ensurePlace(satellite storj.NodeID) *sync2.Workplace {
chore.mu.Lock()
defer chore.mu.Unlock()
if chore.done {
return nil
}

// check that we are still allowed to start new workers
workers.mu.Lock()
if workers.closed {
workers.mu.Unlock()
return false
place, ok := chore.satellites[satellite]
if !ok {
place = sync2.NewWorkPlace()
chore.satellites[satellite] = place
}
workers.active.Add(1)
workers.mu.Unlock()

go func() {
defer workers.active.Done()
work(workers.root)
}()

return true
return place
}
5 changes: 4 additions & 1 deletion storagenode/piecestore/endpoint.go
Expand Up @@ -819,7 +819,10 @@ func (endpoint *Endpoint) RestoreTrash(ctx context.Context, restoreTrashReq *pb.
return nil, rpcstatus.Error(rpcstatus.PermissionDenied, "RestoreTrash called with untrusted ID")
}

endpoint.trashChore.StartRestore(ctx, peer.ID)
err = endpoint.trashChore.StartRestore(ctx, peer.ID)
if err != nil {
return nil, rpcstatus.Error(rpcstatus.Internal, "failed to start restore")
}

return &pb.RestoreTrashResponse{}, nil
}
Expand Down
2 changes: 1 addition & 1 deletion testsuite/storjscan/go.mod
Expand Up @@ -10,7 +10,7 @@ require (
github.com/zeebo/errs v1.3.0
go.uber.org/zap v1.21.0
golang.org/x/sync v0.0.0-20220819030929-7fc1605a5dde
storj.io/common v0.0.0-20221215155610-3715c7f7ce66
storj.io/common v0.0.0-20221223153333-f5b4455d9cbe
storj.io/private v0.0.0-20221108123115-3a27297f0b78
storj.io/storj v1.63.1
storj.io/storjscan v0.0.0-20220926140643-1623c3b391b0
Expand Down
4 changes: 2 additions & 2 deletions testsuite/storjscan/go.sum
Expand Up @@ -1263,8 +1263,8 @@ storj.io/common v0.0.0-20220802175255-aae0c09ec9d4/go.mod h1:+gF7jbVvpjVIVHhK+EJ
storj.io/common v0.0.0-20220829171748-14b0a3c9565e/go.mod h1:+gF7jbVvpjVIVHhK+EJFhfPbudX395lnPq/dKkj/Qys=
storj.io/common v0.0.0-20220915180246-7826900e2b06/go.mod h1:+gF7jbVvpjVIVHhK+EJFhfPbudX395lnPq/dKkj/Qys=
storj.io/common v0.0.0-20221123115229-fed3e6651b63/go.mod h1:+gF7jbVvpjVIVHhK+EJFhfPbudX395lnPq/dKkj/Qys=
storj.io/common v0.0.0-20221215155610-3715c7f7ce66 h1:T9IqZ+Hi8EnAuZ0Te9FEPRR10UffbDhGY4D/SvZdmqg=
storj.io/common v0.0.0-20221215155610-3715c7f7ce66/go.mod h1:+gF7jbVvpjVIVHhK+EJFhfPbudX395lnPq/dKkj/Qys=
storj.io/common v0.0.0-20221223153333-f5b4455d9cbe h1:8WUqW2mefWGiTp6LguJJmC+ZZP0JZxWv6T4clr3E54o=
storj.io/common v0.0.0-20221223153333-f5b4455d9cbe/go.mod h1:+gF7jbVvpjVIVHhK+EJFhfPbudX395lnPq/dKkj/Qys=
storj.io/drpc v0.0.32 h1:5p5ZwsK/VOgapaCu+oxaPVwO6UwIs+iwdMiD50+R4PI=
storj.io/drpc v0.0.32/go.mod h1:6rcOyR/QQkSTX/9L5ZGtlZaE2PtXTTZl8d+ulSeeYEg=
storj.io/monkit-jaeger v0.0.0-20220726162929-c3a9898b5bca/go.mod h1:iK+dmHZZXQlW7ahKdNSOo+raMk5BDL2wbD62FIeXLWs=
Expand Down
2 changes: 1 addition & 1 deletion testsuite/ui/go.mod
Expand Up @@ -10,7 +10,7 @@ require (
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.7.0
go.uber.org/zap v1.17.0
storj.io/common v0.0.0-20221215155610-3715c7f7ce66
storj.io/common v0.0.0-20221223153333-f5b4455d9cbe
storj.io/gateway-mt v1.18.1-0.20211210081136-cada9a567d31
storj.io/private v0.0.0-20221108123115-3a27297f0b78
storj.io/storj v0.12.1-0.20221125175451-ef4b564b82f7
Expand Down
4 changes: 2 additions & 2 deletions testsuite/ui/go.sum
Expand Up @@ -1502,8 +1502,8 @@ storj.io/common v0.0.0-20211102144601-401a79f0706a/go.mod h1:a2Kw7Uipu929OFANfWK
storj.io/common v0.0.0-20220719163320-cd2ef8e1b9b0/go.mod h1:mCYV6Ud5+cdbuaxdPD5Zht/HYaIn0sffnnws9ErkrMQ=
storj.io/common v0.0.0-20220915180246-7826900e2b06/go.mod h1:+gF7jbVvpjVIVHhK+EJFhfPbudX395lnPq/dKkj/Qys=
storj.io/common v0.0.0-20221123115229-fed3e6651b63/go.mod h1:+gF7jbVvpjVIVHhK+EJFhfPbudX395lnPq/dKkj/Qys=
storj.io/common v0.0.0-20221215155610-3715c7f7ce66 h1:T9IqZ+Hi8EnAuZ0Te9FEPRR10UffbDhGY4D/SvZdmqg=
storj.io/common v0.0.0-20221215155610-3715c7f7ce66/go.mod h1:+gF7jbVvpjVIVHhK+EJFhfPbudX395lnPq/dKkj/Qys=
storj.io/common v0.0.0-20221223153333-f5b4455d9cbe h1:8WUqW2mefWGiTp6LguJJmC+ZZP0JZxWv6T4clr3E54o=
storj.io/common v0.0.0-20221223153333-f5b4455d9cbe/go.mod h1:+gF7jbVvpjVIVHhK+EJFhfPbudX395lnPq/dKkj/Qys=
storj.io/dotworld v0.0.0-20210324183515-0d11aeccd840/go.mod h1:KU9YvEgRrMMiWLvH8pzn1UkoCoxggKIPvQxmNdx7aXQ=
storj.io/drpc v0.0.11/go.mod h1:TiFc2obNjL9/3isMW1Rpxjy8V9uE0B2HMeMFGiiI7Iw=
storj.io/drpc v0.0.24/go.mod h1:ofQUDPQbbIymRDKE0tms48k8bLP5Y+dsI9CbXGv3gko=
Expand Down

0 comments on commit 9544a67

Please sign in to comment.