Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Repair Restart Postprocessing Logic #8782

Merged
merged 3 commits into from Apr 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 8 additions & 0 deletions changelog/unreleased/fix-postprocessing-restart.md
@@ -0,0 +1,8 @@
Bugfix: Fix restarting of postprocessing

When an upload is not found, the logic to restart postprocessing was bunked. Additionally we extended the upload sessions
command to be able to restart the uploads without using a second command.

NOTE: This also includes a breaking fix for the deprecated `ocis storage-users uploads list` command

https://github.com/owncloud/ocis/pull/8782
11 changes: 8 additions & 3 deletions services/postprocessing/README.md
Expand Up @@ -81,12 +81,17 @@ See the [cs3 org](https://github.com/cs3org/reva/blob/edge/pkg/events/postproces

If postprocessing fails in one step due to an unforseen error, current uploads will not be retried automatically. A system admin can instead run a CLI command to retry the failed upload which is a two step process:

- First find the upload ID of the failed upload.
- First list ongoing upload sessions
```bash
ocis storage-users uploads list
ocis storage-users uploads sessions
```

- Then use the restart command to resume postprocessing of the ID selected.
- If you want to restart all uploads just rerun the command with the `--restart` flag
```bash
ocis storage-users uploads sessions --restart
```

- If you want to restart only one upload use the postprocessing restart command
```bash
ocis postprocessing restart -u <uploadID>
```
Expand Down
5 changes: 3 additions & 2 deletions services/postprocessing/pkg/config/defaults/defaultconfig.go
Expand Up @@ -35,9 +35,10 @@ func DefaultConfig() *config.Config {
MaxRetries: 14,
},
Store: config.Store{
Store: "memory",
Store: "nats-js-kv",
Nodes: []string{"127.0.0.1:9233"},
Database: "postprocessing",
Table: "postprocessing",
Table: "",
kobergj marked this conversation as resolved.
Show resolved Hide resolved
},
}
}
Expand Down
Expand Up @@ -40,7 +40,7 @@ func New(config config.Postprocessing) *Postprocessing {
}

// Init is the first step of the postprocessing
func (pp *Postprocessing) Init(ev events.BytesReceived) interface{} {
func (pp *Postprocessing) Init(_ events.BytesReceived) interface{} {
if len(pp.Steps) == 0 {
return pp.finished(events.PPOutcomeContinue)
}
Expand Down
37 changes: 23 additions & 14 deletions services/postprocessing/pkg/service/service.go
Expand Up @@ -30,10 +30,12 @@ type PostprocessingService struct {
}

var (
// errFatal is returned when a fatal error occurs and we want to exit.
errFatal = errors.New("fatal error")
// ErrFatal is returned when a fatal error occurs and we want to exit.
ErrFatal = errors.New("fatal error")
// ErrEvent is returned when something went wrong with a specific event.
errEvent = errors.New("event error")
ErrEvent = errors.New("event error")
// ErrNotFound is returned when a postprocessing is not found in the store.
ErrNotFound = errors.New("postprocessing not found")
)

// NewPostprocessingService returns a new instance of a postprocessing service
Expand Down Expand Up @@ -67,9 +69,9 @@ func (pps *PostprocessingService) Run() error {
err := pps.processEvent(e)
if err != nil {
switch {
case errors.Is(err, errFatal):
case errors.Is(err, ErrFatal):
return err
case errors.Is(err, errEvent):
case errors.Is(err, ErrEvent):
continue
default:
pps.log.Fatal().Err(err).Msg("unknown error - exiting")
Expand Down Expand Up @@ -111,7 +113,7 @@ func (pps *PostprocessingService) processEvent(e events.Event) error {
pp, err = pps.getPP(pps.store, ev.UploadID)
if err != nil {
pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot get upload")
return fmt.Errorf("%w: cannot get upload", errEvent)
return fmt.Errorf("%w: cannot get upload", ErrEvent)
}
next = pp.NextStep(ev)

Expand Down Expand Up @@ -143,7 +145,7 @@ func (pps *PostprocessingService) processEvent(e events.Event) error {
pp, err = pps.getPP(pps.store, ev.UploadID)
if err != nil {
pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot get upload")
return fmt.Errorf("%w: cannot get upload", errEvent)
return fmt.Errorf("%w: cannot get upload", ErrEvent)
}
next = pp.Delay()
case events.UploadReady:
Expand All @@ -155,7 +157,7 @@ func (pps *PostprocessingService) processEvent(e events.Event) error {
// the storage provider thinks the upload is done - so no need to keep it any more
if err := pps.store.Delete(ev.UploadID); err != nil {
pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot delete upload")
return fmt.Errorf("%w: cannot delete upload", errEvent)
return fmt.Errorf("%w: cannot delete upload", ErrEvent)
}
case events.ResumePostprocessing:
return pps.handleResumePPEvent(ctx, ev)
Expand All @@ -166,14 +168,14 @@ func (pps *PostprocessingService) processEvent(e events.Event) error {

if err := storePP(pps.store, pp); err != nil {
pps.log.Error().Str("uploadID", pp.ID).Err(err).Msg("cannot store upload")
return fmt.Errorf("%w: cannot store upload", errEvent)
return fmt.Errorf("%w: cannot store upload", ErrEvent)
}
}

if next != nil {
if err := events.Publish(ctx, pps.pub, next); err != nil {
pps.log.Error().Err(err).Msg("unable to publish event")
return fmt.Errorf("%w: unable to publish event", errFatal) // we can't publish -> we are screwed
return fmt.Errorf("%w: unable to publish event", ErrFatal) // we can't publish -> we are screwed
}
}
return nil
Expand All @@ -182,10 +184,17 @@ func (pps *PostprocessingService) processEvent(e events.Event) error {
func (pps *PostprocessingService) getPP(sto store.Store, uploadID string) (*postprocessing.Postprocessing, error) {
recs, err := sto.Read(uploadID)
if err != nil {
if err == store.ErrNotFound {
return nil, ErrNotFound
}
return nil, err
}

if len(recs) != 1 {
if len(recs) == 0 {
return nil, ErrNotFound
}

if len(recs) > 1 {
kobergj marked this conversation as resolved.
Show resolved Hide resolved
return nil, fmt.Errorf("expected only one result for '%s', got %d", uploadID, len(recs))
}

Expand Down Expand Up @@ -231,7 +240,7 @@ func (pps *PostprocessingService) handleResumePPEvent(ctx context.Context, ev ev
for _, id := range ids {
if err := pps.resumePP(ctx, id); err != nil {
pps.log.Error().Str("uploadID", id).Err(err).Msg("cannot resume upload")
return fmt.Errorf("%w: cannot resume upload", errEvent)
return fmt.Errorf("cannot resume upload: %w", err)
}
}
return nil
Expand All @@ -240,7 +249,7 @@ func (pps *PostprocessingService) handleResumePPEvent(ctx context.Context, ev ev
func (pps *PostprocessingService) resumePP(ctx context.Context, uploadID string) error {
pp, err := pps.getPP(pps.store, uploadID)
if err != nil {
if err == store.ErrNotFound {
if err == ErrNotFound {
if err := events.Publish(ctx, pps.pub, events.RestartPostprocessing{
UploadID: uploadID,
Timestamp: utils.TSNow(),
Expand All @@ -249,7 +258,7 @@ func (pps *PostprocessingService) resumePP(ctx context.Context, uploadID string)
}
return nil
}
return fmt.Errorf("%w: cannot get upload", errEvent)
return fmt.Errorf("cannot get upload: %w", err)
}

return events.Publish(ctx, pps.pub, pp.CurrentStep())
Expand Down
41 changes: 40 additions & 1 deletion services/storage-users/pkg/command/uploads.go
@@ -1,6 +1,7 @@
package command

import (
"context"
"encoding/json"
"fmt"
"os"
Expand All @@ -13,11 +14,14 @@ import (
"github.com/urfave/cli/v2"

userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/storage"
"github.com/cs3org/reva/v2/pkg/storage/fs/registry"
"github.com/cs3org/reva/v2/pkg/utils"
"github.com/owncloud/ocis/v2/ocis-pkg/config/configlog"
"github.com/owncloud/ocis/v2/services/storage-users/pkg/config"
"github.com/owncloud/ocis/v2/services/storage-users/pkg/config/parser"
"github.com/owncloud/ocis/v2/services/storage-users/pkg/event"
"github.com/owncloud/ocis/v2/services/storage-users/pkg/revaconfig"
)

Expand Down Expand Up @@ -69,7 +73,7 @@ func ListUploads(cfg *config.Config) *cli.Command {
fmt.Println("Incomplete uploads:")
for _, u := range uploads {
ref := u.Reference()
fmt.Printf(" - %s (Space: %s, Name: %s, Size: %d/%d, Expires: %s, Processing: %t)\n", ref.GetResourceId().GetSpaceId(), u.ID(), u.Filename(), u.Offset(), u.Size(), u.Expires(), u.IsProcessing())
fmt.Printf(" - %s (Space: %s, Name: %s, Size: %d/%d, Expires: %s, Processing: %t)\n", u.ID(), ref.GetResourceId().GetSpaceId(), u.Filename(), u.Offset(), u.Size(), u.Expires(), u.IsProcessing())
kobergj marked this conversation as resolved.
Show resolved Hide resolved
}
return nil
},
Expand Down Expand Up @@ -101,6 +105,10 @@ func ListUploadSessions(cfg *config.Config) *cli.Command {
Name: "json",
Usage: "output as json",
},
&cli.BoolFlag{
Name: "restart",
Usage: "send restart event for all listed sessions",
},
},
Before: func(c *cli.Context) error {
return configlog.ReturnFatal(parser.ParseConfig(cfg))
Expand All @@ -124,6 +132,15 @@ func ListUploadSessions(cfg *config.Config) *cli.Command {
os.Exit(1)
}

var stream events.Stream
if c.Bool("restart") {
stream, err = event.NewStream(cfg)
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to create event stream: %v\n", err)
os.Exit(1)
}
}

var b strings.Builder
filter := storage.UploadSessionFilter{}
if c.IsSet("processing") {
Expand Down Expand Up @@ -200,6 +217,17 @@ func ListUploadSessions(cfg *config.Config) *cli.Command {
fmt.Println(err)
}
fmt.Println(string(j))

if c.Bool("restart") {
if err := events.Publish(context.Background(), stream, events.ResumePostprocessing{
UploadID: u.ID(),
Timestamp: utils.TSNow(),
}); err != nil {
fmt.Fprintf(os.Stderr, "Failed to send restart event for upload session '%s'\n", u.ID())
// if publishing fails there is no need to try publishing other events - they will fail too.
os.Exit(1)
}
}
}
} else {

Expand All @@ -223,6 +251,17 @@ func ListUploadSessions(cfg *config.Config) *cli.Command {
u.Expires().Format(time.RFC3339),
strconv.FormatBool(u.IsProcessing()),
})

if c.Bool("restart") {
if err := events.Publish(context.Background(), stream, events.ResumePostprocessing{
UploadID: u.ID(),
Timestamp: utils.TSNow(),
}); err != nil {
fmt.Fprintf(os.Stderr, "Failed to send restart event for upload session '%s'\n", u.ID())
kobergj marked this conversation as resolved.
Show resolved Hide resolved
// if publishing fails there is no need to try publishing other events - they will fail too.
os.Exit(1)
}
}
}
table.Render()
}
Expand Down