Skip to content

Commit

Permalink
Feat: add retry failed step operation (#101)
Browse files Browse the repository at this point in the history
Signed-off-by: FogDong <dongtianxin.tx@alibaba-inc.com>

Signed-off-by: FogDong <dongtianxin.tx@alibaba-inc.com>
  • Loading branch information
FogDong committed Dec 13, 2022
1 parent a8868ee commit fd1cdcc
Show file tree
Hide file tree
Showing 8 changed files with 941 additions and 51 deletions.
14 changes: 13 additions & 1 deletion cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,16 @@ import (

"github.com/crossplane/crossplane-runtime/pkg/event"
flag "github.com/spf13/pflag"
corev1 "k8s.io/api/core/v1"
kerrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apiserver/pkg/util/feature"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/klog/v2"
"k8s.io/klog/v2/klogr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/healthz"

velaclient "github.com/kubevela/pkg/controller/client"
Expand Down Expand Up @@ -233,7 +236,16 @@ func main() {
if backupPersistType == "" {
klog.Warning("Backup persist type is empty, workflow record won't be persisted")
}
persister, err := backup.NewPersister(context.Background(), kubeClient, backupPersistType, backupConfigSecretName, backupConfigSecretNamespace)
configSecret := &corev1.Secret{}
reader := mgr.GetAPIReader()
if err := reader.Get(context.Background(), client.ObjectKey{
Name: backupConfigSecretName,
Namespace: backupConfigSecretNamespace,
}, configSecret); err != nil && !kerrors.IsNotFound(err) {
klog.Error(err, "unable to find secret")
os.Exit(1)
}
persister, err := backup.NewPersister(configSecret.Data, backupPersistType)
if err != nil {
klog.Error(err, "unable to create persister")
os.Exit(1)
Expand Down
2 changes: 1 addition & 1 deletion controllers/backup_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func isLatestFailedRecord(ctx context.Context, cli client.Client, run *v1alpha1.
}
runs := &v1alpha1.WorkflowRunList{}
listOpt := &client.ListOptions{}
if groupByLabel != "" {
if groupByLabel != "" && run.Labels != nil && run.Labels[groupByLabel] != "" {
labels := &metav1.LabelSelector{
MatchLabels: map[string]string{
groupByLabel: run.Labels[groupByLabel],
Expand Down
2 changes: 1 addition & 1 deletion controllers/workflowrun_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,8 +245,8 @@ func (r *WorkflowRunReconciler) endWithNegativeCondition(ctx context.Context, wr
}

func (r *workflowRunPatcher) patchStatus(ctx context.Context, status *v1alpha1.WorkflowRunStatus, isUpdate bool) error {
wr := r.run
r.run.Status = *status
wr := r.run
if isUpdate {
if err := r.Status().Update(ctx, wr); err != nil {
executor.StepStatusCache.Store(fmt.Sprintf("%s-%s", wr.Name, wr.Namespace), -1)
Expand Down
11 changes: 1 addition & 10 deletions pkg/backup/interface.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,8 @@
package backup

import (
"context"
"fmt"

corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"

monitorContext "github.com/kubevela/pkg/monitor/context"
"github.com/kubevela/workflow/api/v1alpha1"
"github.com/kubevela/workflow/pkg/backup/sls"
Expand All @@ -18,12 +14,7 @@ const (
)

// NewPersister is a factory method for creating a persister.
func NewPersister(ctx context.Context, cli client.Client, persistType, configName, configNamespace string) (PersistWorkflowRecord, error) {
secret := &corev1.Secret{}
if err := cli.Get(ctx, client.ObjectKey{Name: configName, Namespace: configNamespace}, secret); err != nil {
return nil, err
}
config := secret.Data
func NewPersister(config map[string][]byte, persistType string) (PersistWorkflowRecord, error) {
if config == nil {
return nil, fmt.Errorf("empty config")
}
Expand Down
11 changes: 1 addition & 10 deletions pkg/backup/interface_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,6 @@ func TestNewPersister(t *testing.T) {
expectedErr string
secret *corev1.Secret
}{
"no config": {
persistType: "sls",
configName: "invalid",
expectedErr: "not found",
},
"empty config": {
persistType: "sls",
secret: &corev1.Secret{
Expand All @@ -33,7 +28,6 @@ func TestNewPersister(t *testing.T) {
Namespace: "default",
},
},
configName: "valid",
expectedErr: "empty config",
},
"invalid type": {
Expand All @@ -47,7 +41,6 @@ func TestNewPersister(t *testing.T) {
"accessKeyID": []byte("accessKeyID"),
},
},
configName: "valid",
expectedErr: "unsupported persist type",
},
"sls-not-complete": {
Expand All @@ -61,7 +54,6 @@ func TestNewPersister(t *testing.T) {
"accessKeyID": []byte("accessKeyID"),
},
},
configName: "valid",
expectedErr: "invalid SLS config",
},
"sls-success": {
Expand All @@ -79,7 +71,6 @@ func TestNewPersister(t *testing.T) {
"LogStoreName": []byte("logstore"),
},
},
configName: "valid",
},
}
for name, tc := range testCases {
Expand All @@ -89,7 +80,7 @@ func TestNewPersister(t *testing.T) {
r.NoError(cli.Create(ctx, tc.secret))
defer cli.Delete(ctx, tc.secret)
}
_, err := NewPersister(ctx, cli, tc.persistType, tc.configName, "default")
_, err := NewPersister(tc.secret.Data, tc.persistType)
if tc.expectedErr != "" {
r.Contains(err.Error(), tc.expectedErr)
return
Expand Down
39 changes: 25 additions & 14 deletions pkg/backup/sls/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,14 @@ import (

// Handler is sls config.
type Handler struct {
LogStoreName string
ProjectName string
Producer *producer.Producer
LogStoreName string
ProjectName string
ProducerConfig *producer.ProducerConfig
}

// Callback is for sls callback
type Callback struct {
ctx monitorContext.Context
}

// NewSLSHandler create a new sls handler
Expand All @@ -34,31 +39,37 @@ func NewSLSHandler(config map[string][]byte) (*Handler, error) {
producerConfig.AccessKeySecret = accessKeySecret

return &Handler{
Producer: producer.InitProducer(producerConfig),
LogStoreName: logStoreName,
ProjectName: projectName,
ProducerConfig: producerConfig,
LogStoreName: logStoreName,
ProjectName: projectName,
}, nil
}

// Fail is fail callback
func (callback *Callback) Fail(result *producer.Result) {
callback.ctx.Error(fmt.Errorf("failed to send log to sls"), result.GetErrorMessage(), "errorCode", result.GetErrorCode(), "requestId", result.GetRequestId())
}

// Success is success callback
func (callback *Callback) Success(result *producer.Result) {
}

// Store is store workflowRun to sls
func (s *Handler) Store(ctx monitorContext.Context, run *v1alpha1.WorkflowRun) error {
ctx.Info("Start Send workflow record to SLS")
s.Producer.Start()
defer func(producerInstance *producer.Producer, timeoutMs int64) {
err := producerInstance.Close(timeoutMs)
if err != nil {
ctx.Error(err, "Close SLS fail")
}
}(s.Producer, 60000)
p := producer.InitProducer(s.ProducerConfig)
p.Start()
defer p.SafeClose()

data, err := json.Marshal(run)
if err != nil {
ctx.Error(err, "Marshal WorkflowRun Content fail")
return err
}

callback := &Callback{ctx: ctx}
log := producer.GenerateLog(uint32(time.Now().Unix()), map[string]string{"content": string(data)})
err = s.Producer.SendLog(s.ProjectName, s.LogStoreName, "topic", "", log)
err = p.SendLogWithCallBack(s.ProjectName, s.LogStoreName, "topic", "", log, callback)
if err != nil {
ctx.Error(err, "Send WorkflowRun Content to SLS fail")
return err
Expand Down
Loading

0 comments on commit fd1cdcc

Please sign in to comment.