[full-ci] Retry postprocessing steps #7874

Merged: 3 commits, Dec 12, 2023
2 changes: 1 addition & 1 deletion go.mod
@@ -13,7 +13,7 @@ require (
github.com/coreos/go-oidc v2.2.1+incompatible
github.com/coreos/go-oidc/v3 v3.9.0
github.com/cs3org/go-cs3apis v0.0.0-20231023073225-7748710e0781
github.com/cs3org/reva/v2 v2.16.1-0.20231208083424-41aa50b4a2e8
github.com/cs3org/reva/v2 v2.16.1-0.20231212124908-ab6ed782de28
github.com/dhowden/tag v0.0.0-20230630033851-978a0926ee25
github.com/disintegration/imaging v1.6.2
github.com/dutchcoders/go-clamd v0.0.0-20170520113014-b970184f4d9e
4 changes: 2 additions & 2 deletions go.sum
@@ -1017,8 +1017,8 @@ github.com/crewjam/saml v0.4.14 h1:g9FBNx62osKusnFzs3QTN5L9CVA/Egfgm+stJShzw/c=
github.com/crewjam/saml v0.4.14/go.mod h1:UVSZCf18jJkk6GpWNVqcyQJMD5HsRugBPf4I1nl2mME=
github.com/cs3org/go-cs3apis v0.0.0-20231023073225-7748710e0781 h1:BUdwkIlf8IS2FasrrPg8gGPHQPOrQ18MS1Oew2tmGtY=
github.com/cs3org/go-cs3apis v0.0.0-20231023073225-7748710e0781/go.mod h1:UXha4TguuB52H14EMoSsCqDj7k8a/t7g4gVP+bgY5LY=
github.com/cs3org/reva/v2 v2.16.1-0.20231208083424-41aa50b4a2e8 h1:Z1i5VmeHNc6n0jIl/Iljfs+gt7bhdcVT/5cNxn1XIs4=
github.com/cs3org/reva/v2 v2.16.1-0.20231208083424-41aa50b4a2e8/go.mod h1:zcrrYVsBv/DwhpyO2/W5hoSZ/k6az6Z2EYQok65uqZY=
github.com/cs3org/reva/v2 v2.16.1-0.20231212124908-ab6ed782de28 h1:IhBjtl4F/aAUdbpfjWOy1jwzrh1wLOH50UToPPOqJy8=
github.com/cs3org/reva/v2 v2.16.1-0.20231212124908-ab6ed782de28/go.mod h1:zcrrYVsBv/DwhpyO2/W5hoSZ/k6az6Z2EYQok65uqZY=
github.com/cyberdelia/templates v0.0.0-20141128023046-ca7fffd4298c/go.mod h1:GyV+0YP4qX0UQ7r2MoYZ+AvYDp12OF5yg4q8rGnyNh4=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
3 changes: 3 additions & 0 deletions services/antivirus/pkg/service/service.go
@@ -170,7 +170,10 @@ func (av Antivirus) processEvent(e events.Event, s events.Publisher) error {
outcome = av.o
case !res.Infected && err == nil:
outcome = events.PPOutcomeContinue
case err != nil:
outcome = events.PPOutcomeRetry
default:
// Not sure what this is about. abort.
outcome = events.PPOutcomeAbort
}

11 changes: 9 additions & 2 deletions services/postprocessing/README.md
@@ -55,9 +55,16 @@ By using the envvar `POSTPROCESSING_STEPS`, custom postprocessing steps can be a
For using custom postprocessing steps you need a custom service listening to the configured event system (see `General Prerequisites`)

#### Workflow
When setting a custom postprocessing step (eg. `"customstep"`) the postprocessing service will eventually sent an event during postprocessing. The event will be of type `StartPostprocessingStep` with its field `StepToStart` set to `"customstep"`. When the custom service receives this event it can safely execute its actions, postprocessing service will wait until it has finished its work. The event contains further information (filename, executing user, size, ...) and also required tokens and urls to download the file in case byte inspection is necessary.
When defining a custom postprocessing step (e.g. `"customstep"`), the postprocessing service will eventually send an event during postprocessing. The event will be of type `StartPostprocessingStep` with its field `StepToStart` set to `"customstep"`. When the service defined as custom step receives this event, it can safely execute its actions. The postprocessing service will wait until it has finished its work. The event contains further information (filename, executing user, size, ...) and also the tokens and URLs required to download the file in case byte inspection is necessary.

Once the custom service has finished its work, it should sent an event of type `PostprocessingFinished` via the configured events system. This event needs to contain a `FinishedStep` field set to `"customstep"`. It also must contain the outcome of the step, which can be one of "delete" (abort postprocessing, delete the file), "abort" (abort postprocessing, keep the file) and "continue" (continue postprocessing, this is the success case).
Once the service defined as custom step has finished its work, it should send an event of type `PostprocessingStepFinished` via the configured events system back to the postprocessing service. This event needs to contain a `FinishedStep` field set to `"customstep"`. It also must contain the outcome of the step, which can be one of the following:

- `delete`: Abort postprocessing, delete the file.
- `abort`: Abort postprocessing, keep the file.
- `retry`: There was a problem that was most likely temporary and may be solved by trying again after some backoff duration. Retry runs automatically and is defined by the backoff behavior as described below.
- `continue`: Continue postprocessing, this is the success case.

The backoff behavior as mentioned in the `retry` outcome can be configured using the `POSTPROCESSING_RETRY_BACKOFF_DURATION` and `POSTPROCESSING_MAX_RETRIES` environment variables. The backoff duration is calculated using the following formula after each failure: `backoff_duration = POSTPROCESSING_RETRY_BACKOFF_DURATION * 2^(number of failures - 1)`. This means that the wait before each retry doubles with every failure until the maximum number of retries is reached. Steps that still don't succeed after the maximum number of retries will be automatically moved to the `abort` state.

See the [cs3 org](https://github.com/cs3org/reva/blob/edge/pkg/events/postprocessing.go) for up-to-date information on reserved step names and event definitions.

3 changes: 3 additions & 0 deletions services/postprocessing/pkg/config/config.go
@@ -28,6 +28,9 @@ type Postprocessing struct {
Events Events `yaml:"events"`
Steps []string `yaml:"steps" env:"POSTPROCESSING_STEPS" desc:"A list of postprocessing steps processed in order of their appearance. Currently supported values by the system are: 'virusscan', 'policies' and 'delay'. Custom steps are allowed. See the documentation for instructions. See the Environment Variable Types description for more details."`
Delayprocessing time.Duration `yaml:"delayprocessing" env:"POSTPROCESSING_DELAY" desc:"After uploading a file but before making it available for download, a delay step can be added. Intended for development purposes only. If a duration is set but the keyword 'delay' is not explicitly added to 'POSTPROCESSING_STEPS', the delay step will be processed as the last step. In such a case, a log entry will be written on service startup to remind the admin about that situation. See the Environment Variable Types description for more details."`

RetryBackoffDuration time.Duration `yaml:"retry_backoff_duration" env:"POSTPROCESSING_RETRY_BACKOFF_DURATION" desc:"The base for the exponential backoff duration before retrying a failed postprocessing step. See the Environment Variable Types description for more details."`
MaxRetries int `yaml:"max_retries" env:"POSTPROCESSING_MAX_RETRIES" desc:"The maximum number of retries for a failed postprocessing step."`
}

// Events combines the configuration options for the event bus.
4 changes: 4 additions & 0 deletions services/postprocessing/pkg/config/defaults/defaultconfig.go
@@ -1,6 +1,8 @@
package defaults

import (
"time"

"github.com/owncloud/ocis/v2/services/postprocessing/pkg/config"
)

@@ -29,6 +31,8 @@ func DefaultConfig() *config.Config {
Endpoint: "127.0.0.1:9233",
Cluster: "ocis-cluster",
},
RetryBackoffDuration: 5 * time.Second,
MaxRetries: 14,
},
Store: config.Store{
Store: "memory",
42 changes: 30 additions & 12 deletions services/postprocessing/pkg/postprocessing/postprocessing.go
@@ -1,11 +1,13 @@
package postprocessing

import (
"math"
"time"

user "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/cs3org/reva/v2/pkg/events"
"github.com/owncloud/ocis/v2/services/postprocessing/pkg/config"
)

// Postprocessing handles postprocessing of a file
@@ -18,7 +20,9 @@ type Postprocessing struct {
ResourceID *provider.ResourceId
Steps []events.Postprocessingstep
Status Status
PPDelay time.Duration
Failures int

config config.Postprocessing
}

// Status is helper struct to show current postprocessing status
@@ -28,16 +32,9 @@ type Status struct {
}

// New returns a new postprocessing instance
func New(uploadID string, uploadURL string, user *user.User, filename string, filesize uint64, resourceID *provider.ResourceId, steps []events.Postprocessingstep, delay time.Duration) *Postprocessing {
func New(config config.Postprocessing) *Postprocessing {
return &Postprocessing{
ID: uploadID,
URL: uploadURL,
User: user,
Filename: filename,
Filesize: filesize,
ResourceID: resourceID,
Steps: steps,
PPDelay: delay,
config: config,
}
}

@@ -55,9 +52,14 @@ func (pp *Postprocessing) NextStep(ev events.PostprocessingStepFinished) interfa
switch ev.Outcome {
case events.PPOutcomeContinue:
return pp.next(ev.FinishedStep)
case events.PPOutcomeRetry:
pp.Failures++
if pp.Failures > pp.config.MaxRetries {
return pp.finished(events.PPOutcomeAbort)
}
return pp.retry()
default:
return pp.finished(ev.Outcome)

}
}

@@ -71,10 +73,15 @@ func (pp *Postprocessing) CurrentStep() interface{} {

// Delay will sleep the configured time then continue
func (pp *Postprocessing) Delay(ev events.StartPostprocessingStep) interface{} {
time.Sleep(pp.PPDelay)
time.Sleep(pp.config.Delayprocessing)
return pp.next(events.PPStepDelay)
}

// BackoffDuration calculates the duration for exponential backoff based on the number of failures.
func (pp *Postprocessing) BackoffDuration() time.Duration {
return pp.config.RetryBackoffDuration * time.Duration(math.Pow(2, float64(pp.Failures-1)))
}

func (pp *Postprocessing) next(current events.Postprocessingstep) interface{} {
l := len(pp.Steps)
for i, s := range pp.Steps {
@@ -107,3 +114,14 @@ func (pp *Postprocessing) finished(outcome events.PostprocessingOutcome) events.
Outcome: outcome,
}
}

func (pp *Postprocessing) retry() events.PostprocessingRetry {
pp.Status.Outcome = events.PPOutcomeRetry
return events.PostprocessingRetry{
UploadID: pp.ID,
ExecutingUser: pp.User,
Filename: pp.Filename,
Failures: pp.Failures,
BackoffDuration: pp.BackoffDuration(),
}
}
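
The retry budget in `NextStep` can be tried out in isolation with the small Go sketch below. It is a simplified model, not the service code: `Outcome`, `tracker`, and `decide` are stand-ins invented for this example, and the returned strings only label which branch was taken. As in the hunk above, the failure counter is incremented on every `retry` outcome and nothing resets it when a step continues.

```go
package main

import "fmt"

// Outcome is a local stand-in for the outcome values named in the README.
type Outcome string

const (
	Continue Outcome = "continue"
	Retry    Outcome = "retry"
	Abort    Outcome = "abort"
	Delete   Outcome = "delete"
)

// tracker models the per-upload failure counter and the configured limit.
type tracker struct {
	failures   int
	maxRetries int
}

// decide mirrors the branching shown in NextStep: continue moves on,
// retry is allowed until the budget is exceeded, and every other
// outcome finishes postprocessing with that outcome.
func (t *tracker) decide(o Outcome) string {
	switch o {
	case Continue:
		return "next step"
	case Retry:
		t.failures++
		if t.failures > t.maxRetries {
			return "finished: abort"
		}
		return "retry"
	default:
		return "finished: " + string(o)
	}
}

func main() {
	tr := &tracker{maxRetries: 2}
	for i := 0; i < 3; i++ {
		fmt.Println(tr.decide(Retry))
	}
	// prints "retry", "retry", "finished: abort"
}
```

Because the comparison is `failures > maxRetries`, a limit of N allows N retries, and the failure after that aborts.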
50 changes: 43 additions & 7 deletions services/postprocessing/pkg/service/service.go
@@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"fmt"
"time"

"github.com/cs3org/reva/v2/pkg/events"
"github.com/owncloud/ocis/v2/ocis-pkg/log"
@@ -89,24 +90,54 @@ func (pps *PostprocessingService) processEvent(e events.Event) error {

switch ev := e.Event.(type) {
case events.BytesReceived:
pp = postprocessing.New(ev.UploadID, ev.URL, ev.ExecutingUser, ev.Filename, ev.Filesize, ev.ResourceID, pps.steps, pps.c.Delayprocessing)
pp = &postprocessing.Postprocessing{
ID: ev.UploadID,
URL: ev.URL,
User: ev.ExecutingUser,
Filename: ev.Filename,
Filesize: ev.Filesize,
ResourceID: ev.ResourceID,
Steps: pps.steps,
}
next = pp.Init(ev)
case events.PostprocessingStepFinished:
if ev.UploadID == "" {
// no current upload - this was an on demand scan
return nil
}
pp, err = getPP(pps.store, ev.UploadID)
pp, err = pps.getPP(pps.store, ev.UploadID)
if err != nil {
pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot get upload")
return fmt.Errorf("%w: cannot get upload", errEvent)
}
next = pp.NextStep(ev)

switch pp.Status.Outcome {
case events.PPOutcomeRetry:
// schedule retry
backoff := pp.BackoffDuration()
go func() {
time.Sleep(backoff)
retryEvent := events.StartPostprocessingStep{
UploadID: pp.ID,
URL: pp.URL,
ExecutingUser: pp.User,
Filename: pp.Filename,
Filesize: pp.Filesize,
ResourceID: pp.ResourceID,
StepToStart: pp.Status.CurrentStep,
}
err := events.Publish(ctx, pps.pub, retryEvent)
if err != nil {
pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot publish StartPostprocessingStep event")
}
}()
}
case events.StartPostprocessingStep:
if ev.StepToStart != events.PPStepDelay {
return nil
}
pp, err = getPP(pps.store, ev.UploadID)
pp, err = pps.getPP(pps.store, ev.UploadID)
if err != nil {
pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot get upload")
return fmt.Errorf("%w: cannot get upload", errEvent)
@@ -119,7 +150,7 @@ func (pps *PostprocessingService) processEvent(e events.Event) error {
return fmt.Errorf("%w: cannot delete upload", errEvent)
}
case events.ResumePostprocessing:
pp, err = getPP(pps.store, ev.UploadID)
pp, err = pps.getPP(pps.store, ev.UploadID)
if err != nil {
if err == store.ErrNotFound {
if err := events.Publish(ctx, pps.pub, events.RestartPostprocessing{
@@ -175,7 +206,7 @@ func storePP(sto store.Store, pp *postprocessing.Postprocessing) error {
})
}

func getPP(sto store.Store, uploadID string) (*postprocessing.Postprocessing, error) {
func (pps *PostprocessingService) getPP(sto store.Store, uploadID string) (*postprocessing.Postprocessing, error) {
recs, err := sto.Read(uploadID)
if err != nil {
return nil, err
@@ -185,6 +216,11 @@ func getPP(sto store.Store, uploadID string) (*postprocessing.Postprocessing, er
return nil, fmt.Errorf("expected only one result for '%s', got %d", uploadID, len(recs))
}

var pp postprocessing.Postprocessing
return &pp, json.Unmarshal(recs[0].Value, &pp)
pp := postprocessing.New(pps.c)
err = json.Unmarshal(recs[0].Value, pp)
if err != nil {
return nil, err
}

return pp, nil
}
18 changes: 18 additions & 0 deletions vendor/github.com/cs3org/reva/v2/pkg/events/postprocessing.go


12 changes: 0 additions & 12 deletions vendor/github.com/studio-b12/gowebdav/client.go


2 changes: 1 addition & 1 deletion vendor/modules.txt
@@ -359,7 +359,7 @@ github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1
github.com/cs3org/go-cs3apis/cs3/storage/registry/v1beta1
github.com/cs3org/go-cs3apis/cs3/tx/v1beta1
github.com/cs3org/go-cs3apis/cs3/types/v1beta1
# github.com/cs3org/reva/v2 v2.16.1-0.20231208083424-41aa50b4a2e8
# github.com/cs3org/reva/v2 v2.16.1-0.20231212124908-ab6ed782de28
## explicit; go 1.20
github.com/cs3org/reva/v2/cmd/revad/internal/grace
github.com/cs3org/reva/v2/cmd/revad/runtime