Skip to content

Commit

Permalink
Feat: optimiaze the sls producer instance
Browse files Browse the repository at this point in the history
Signed-off-by: FogDong <dongtianxin.tx@alibaba-inc.com>
  • Loading branch information
FogDong committed Dec 9, 2022
1 parent d272277 commit 01a415d
Show file tree
Hide file tree
Showing 6 changed files with 156 additions and 106 deletions.
25 changes: 8 additions & 17 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,6 @@ import (
"strings"
"time"

corev1 "k8s.io/api/core/v1"
kerrors "k8s.io/apimachinery/pkg/api/errors"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/crossplane/crossplane-runtime/pkg/event"
flag "github.com/spf13/pflag"
"k8s.io/apimachinery/pkg/runtime"
Expand All @@ -48,6 +44,7 @@ import (

"github.com/kubevela/workflow/api/v1alpha1"
"github.com/kubevela/workflow/controllers"
"github.com/kubevela/workflow/pkg/backup"
"github.com/kubevela/workflow/pkg/common"
"github.com/kubevela/workflow/pkg/cue/packages"
"github.com/kubevela/workflow/pkg/features"
Expand Down Expand Up @@ -216,18 +213,13 @@ func main() {
}

if feature.DefaultMutableFeatureGate.Enabled(features.EnableBackupWorkflowRecord) {
configSecret := &corev1.Secret{}
reader := mgr.GetAPIReader()
if err := reader.Get(context.Background(), client.ObjectKey{
Name: backupConfigSecretName,
Namespace: backupConfigSecretNamespace,
}, configSecret); err != nil && !kerrors.IsNotFound(err) {
klog.Error(err, "unable to find secret")
os.Exit(1)
if backupPersistType == "" {
klog.Warning("Backup persist type is empty, workflow record won't be persisted")
}
configData := configSecret.Data
if configData == nil {
configData = make(map[string][]byte)
persister, err := backup.NewPersister(context.Background(), kubeClient, backupPersistType, backupConfigSecretName, backupConfigSecretNamespace)
if err != nil {
klog.Error(err, "unable to create persister")
os.Exit(1)
}
if err = (&controllers.BackupReconciler{
Client: kubeClient,
Expand All @@ -237,8 +229,7 @@ func main() {
IgnoreStrategy: backupIgnoreStrategy,
CleanOnBackup: backupCleanOnBackup,
GroupByLabel: groupByLabel,
PersistType: backupPersistType,
PersistConfig: configData,
Persister: persister,
},
Args: controllerArgs,
}).SetupWithManager(mgr); err != nil {
Expand Down
8 changes: 3 additions & 5 deletions controllers/backup_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,11 @@ type BackupReconciler struct {

// BackupArgs is the args for backup
type BackupArgs struct {
PersistType string
Persister backup.PersistWorkflowRecord
BackupStrategy string
IgnoreStrategy string
GroupByLabel string
CleanOnBackup bool
PersistConfig map[string][]byte
}

const (
Expand Down Expand Up @@ -131,9 +130,8 @@ func (r *BackupReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
}

func (r *BackupReconciler) backup(ctx monitorContext.Context, cli client.Client, run *v1alpha1.WorkflowRun) error {
persister := backup.NewPersister(r.PersistType, r.PersistConfig)
if persister != nil {
if err := persister.Store(ctx, run); err != nil {
if r.Persister != nil {
if err := r.Persister.Store(ctx, run); err != nil {
return err
}
}
Expand Down
35 changes: 21 additions & 14 deletions pkg/backup/interface.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
package backup

import (
monitorContext "github.com/kubevela/pkg/monitor/context"
"context"
"fmt"

corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"

monitorContext "github.com/kubevela/pkg/monitor/context"
"github.com/kubevela/workflow/api/v1alpha1"
"github.com/kubevela/workflow/pkg/backup/sls"
)
Expand All @@ -13,24 +18,26 @@ const (
)

// NewPersister is a factory method for creating a persister.
func NewPersister(persistType string, config map[string][]byte) persistWorkflowRecord {
func NewPersister(ctx context.Context, cli client.Client, persistType, configName, configNamespace string) (PersistWorkflowRecord, error) {
secret := &corev1.Secret{}
if err := cli.Get(ctx, client.ObjectKey{Name: configName, Namespace: configNamespace}, secret); err != nil {
return nil, err
}
config := secret.Data
if config == nil {
return nil, fmt.Errorf("empty config")
}
switch persistType {
case PersistTypeSLS:
if config == nil {
return nil
}
return &sls.Handler{
LogStoreName: string(config["LogStoreName"]),
ProjectName: string(config["ProjectName"]),
Endpoint: string(config["Endpoint"]),
AccessKeyID: string(config["AccessKeyID"]),
AccessKeySecret: string(config["AccessKeySecret"]),
}
return sls.NewSLSHandler(config)
case "":
return nil, nil
default:
return nil
return nil, fmt.Errorf("unsupported persist type %s", persistType)
}
}

type persistWorkflowRecord interface {
// PersistWorkflowRecord is the interface for record persist
type PersistWorkflowRecord interface {
Store(ctx monitorContext.Context, run *v1alpha1.WorkflowRun) error
}
106 changes: 81 additions & 25 deletions pkg/backup/interface_test.go
Original file line number Diff line number Diff line change
@@ -1,44 +1,100 @@
package backup

import (
"reflect"
"context"
"testing"

"github.com/kubevela/workflow/pkg/backup/sls"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/scheme"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
)

func TestNewPersister(t *testing.T) {
type args struct {
cli := fake.NewFakeClientWithScheme(scheme.Scheme)
ctx := context.Background()
testCases := map[string]struct {
persistType string
config map[string][]byte
}
tests := []struct {
name string
args args
want persistWorkflowRecord
configName string
expectedErr string
secret *corev1.Secret
}{
{
name: "Empty config",
args: args{
persistType: "sls",
config: nil,
"no config": {
persistType: "sls",
configName: "invalid",
expectedErr: "not found",
},
"empty config": {
persistType: "sls",
secret: &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "valid",
Namespace: "default",
},
},
configName: "valid",
expectedErr: "empty config",
},
"invalid type": {
persistType: "invalid",
secret: &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "valid",
Namespace: "default",
},
Data: map[string][]byte{
"accessKeyID": []byte("accessKeyID"),
},
},
want: nil,
configName: "valid",
expectedErr: "unsupported persist type",
},
{
name: "Success",
args: args{
persistType: "sls",
config: make(map[string][]byte),
"sls-not-complete": {
persistType: "sls",
secret: &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "valid",
Namespace: "default",
},
Data: map[string][]byte{
"accessKeyID": []byte("accessKeyID"),
},
},
want: &sls.Handler{},
configName: "valid",
expectedErr: "invalid SLS config",
},
"sls-success": {
persistType: "sls",
secret: &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "valid",
Namespace: "default",
},
Data: map[string][]byte{
"AccessKeyID": []byte("accessKeyID"),
"AccessKeySecret": []byte("accessKeySecret"),
"Endpoint": []byte("endpoint"),
"ProjectName": []byte("project"),
"LogStoreName": []byte("logstore"),
},
},
configName: "valid",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := NewPersister(tt.args.persistType, tt.args.config); !reflect.DeepEqual(got, tt.want) {
t.Errorf("NewPersister() = %v, want %v", got, tt.want)
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
r := require.New(t)
if tc.secret != nil {
r.NoError(cli.Create(ctx, tc.secret))
defer cli.Delete(ctx, tc.secret)
}
_, err := NewPersister(ctx, cli, tc.persistType, tc.configName, "default")
if tc.expectedErr != "" {
r.Contains(err.Error(), tc.expectedErr)
return
}
r.NoError(err)
})
}
}
43 changes: 29 additions & 14 deletions pkg/backup/sls/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package sls

import (
"encoding/json"
"fmt"
"time"

monitorContext "github.com/kubevela/pkg/monitor/context"
Expand All @@ -12,29 +13,43 @@ import (

// Handler is sls config.
type Handler struct {
LogStoreName string
ProjectName string
Endpoint string
AccessKeyID string
AccessKeySecret string
LogStoreName string
ProjectName string
Producer *producer.Producer
}

// NewSLSHandler create a new sls handler
func NewSLSHandler(config map[string][]byte) (*Handler, error) {
endpoint := string(config["Endpoint"])
accessKeyID := string(config["AccessKeyID"])
accessKeySecret := string(config["AccessKeySecret"])
projectName := string(config["ProjectName"])
logStoreName := string(config["LogStoreName"])
if endpoint == "" || accessKeyID == "" || accessKeySecret == "" || projectName == "" || logStoreName == "" {
return nil, fmt.Errorf("invalid SLS config, please make sure endpoint/ak/sk/project/logstore are both provided correctly")
}
producerConfig := producer.GetDefaultProducerConfig()
producerConfig.Endpoint = endpoint
producerConfig.AccessKeyID = accessKeyID
producerConfig.AccessKeySecret = accessKeySecret

return &Handler{
Producer: producer.InitProducer(producerConfig),
LogStoreName: logStoreName,
ProjectName: projectName,
}, nil
}

// Store is store workflowRun to sls
func (s *Handler) Store(ctx monitorContext.Context, run *v1alpha1.WorkflowRun) error {
ctx.Info("Start Send workflow record to SLS")
producerConfig := producer.GetDefaultProducerConfig()
producerConfig.Endpoint = s.Endpoint
producerConfig.AccessKeyID = s.AccessKeyID
producerConfig.AccessKeySecret = s.AccessKeySecret

producerInstance := producer.InitProducer(producerConfig)
producerInstance.Start()
s.Producer.Start()
defer func(producerInstance *producer.Producer, timeoutMs int64) {
err := producerInstance.Close(timeoutMs)
if err != nil {
ctx.Error(err, "Close SLS fail")
}
}(producerInstance, 60000)
}(s.Producer, 60000)

data, err := json.Marshal(run)
if err != nil {
Expand All @@ -43,7 +58,7 @@ func (s *Handler) Store(ctx monitorContext.Context, run *v1alpha1.WorkflowRun) e
}

log := producer.GenerateLog(uint32(time.Now().Unix()), map[string]string{"content": string(data)})
err = producerInstance.SendLog(s.ProjectName, s.LogStoreName, "topic", "", log)
err = s.Producer.SendLog(s.ProjectName, s.LogStoreName, "topic", "", log)
if err != nil {
ctx.Error(err, "Send WorkflowRun Content to SLS fail")
return err
Expand Down
Loading

0 comments on commit 01a415d

Please sign in to comment.