diff --git a/services/storage-users/pkg/command/uploads.go b/services/storage-users/pkg/command/uploads.go index 99d93e533b8..18671f5ac73 100644 --- a/services/storage-users/pkg/command/uploads.go +++ b/services/storage-users/pkg/command/uploads.go @@ -1,6 +1,7 @@ package command import ( + "context" "encoding/json" "fmt" "os" @@ -13,11 +14,14 @@ import ( "github.com/urfave/cli/v2" userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" + "github.com/cs3org/reva/v2/pkg/events" "github.com/cs3org/reva/v2/pkg/storage" "github.com/cs3org/reva/v2/pkg/storage/fs/registry" + "github.com/cs3org/reva/v2/pkg/utils" "github.com/owncloud/ocis/v2/ocis-pkg/config/configlog" "github.com/owncloud/ocis/v2/services/storage-users/pkg/config" "github.com/owncloud/ocis/v2/services/storage-users/pkg/config/parser" + "github.com/owncloud/ocis/v2/services/storage-users/pkg/event" "github.com/owncloud/ocis/v2/services/storage-users/pkg/revaconfig" ) @@ -101,6 +105,10 @@ func ListUploadSessions(cfg *config.Config) *cli.Command { Name: "json", Usage: "output as json", }, + &cli.BoolFlag{ + Name: "restart", + Usage: "send restart event for all listed sessions", + }, }, Before: func(c *cli.Context) error { return configlog.ReturnFatal(parser.ParseConfig(cfg)) @@ -124,6 +132,15 @@ func ListUploadSessions(cfg *config.Config) *cli.Command { os.Exit(1) } + var stream events.Stream + if c.Bool("restart") { + stream, err = event.NewStream(cfg) + if err != nil { + fmt.Fprintf(os.Stderr, "Failed to create event stream: %v\n", err) + os.Exit(1) + } + } + var b strings.Builder filter := storage.UploadSessionFilter{} if c.IsSet("processing") { @@ -200,6 +217,16 @@ func ListUploadSessions(cfg *config.Config) *cli.Command { fmt.Println(err) } fmt.Println(string(j)) + + if c.Bool("restart") { + if err := events.Publish(context.Background(), stream, events.ResumePostprocessing{ + UploadID: u.ID(), + Timestamp: utils.TSNow(), + }); err != nil { + fmt.Fprintf(os.Stderr, "Failed to send restart event for upload session '%s'\n", u.ID()) + os.Exit(1) + } + } } } else { @@ -223,6 +250,16 @@ func ListUploadSessions(cfg *config.Config) *cli.Command { u.Expires().Format(time.RFC3339), strconv.FormatBool(u.IsProcessing()), }) + + if c.Bool("restart") { + if err := events.Publish(context.Background(), stream, events.ResumePostprocessing{ + UploadID: u.ID(), + Timestamp: utils.TSNow(), + }); err != nil { + fmt.Fprintf(os.Stderr, "Failed to send restart event for upload session '%s'\n", u.ID()) + os.Exit(1) + } + } } table.Render() }