Skip to content

Commit

Permalink
add UseCustomStore config
Browse files Browse the repository at this point in the history
  • Loading branch information
yuyang0 committed Apr 8, 2024
1 parent 916a1ea commit 88ad2c8
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 12 deletions.
12 changes: 6 additions & 6 deletions flow/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,10 +177,10 @@ func (e *Executor) setExecResult(ctx context.Context, eRes *ExecResult) error {
if err != nil {
return err
}
if cfg.UseAsynqStore {
_, err = e.t.ResultWriter().Write(bs)
} else {
if cfg.UseCustomStore {
err = e.f.stor.Set(ctx, eRes.ID, bs)
} else {
_, err = e.t.ResultWriter().Write(bs)
}
return err
}
Expand All @@ -189,14 +189,14 @@ func (e *Executor) getExecResult(execID string) (eRes *ExecResult, err error) {
var bs []byte
f := e.f
cfg := e.f.cfg
if cfg.UseAsynqStore {
if cfg.UseCustomStore {
bs, err = f.stor.Get(context.TODO(), execID)
} else {
ti, err := f.insp.GetTaskInfo("default", execID)
if err != nil {
return nil, err
}
bs = ti.Result
} else {
bs, err = f.stor.Get(context.TODO(), execID)
}
if err != nil {
return nil, err
Expand Down
17 changes: 11 additions & 6 deletions types/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,15 @@ import (
)

type Config struct {
Store StoreConfig `json:"store"`
// dagflow stores result in asynq's taskinfo by default,
// if you want to store result in your own storage, set this to true
// and also set the store config properly.
UseCustomStore bool `json:"useCustomStore"`
Store StoreConfig `json:"store"`
// some configs related to asynq
Redis RedisConfig `json:"redis"`
UseAsynqStore bool `json:"useAsynqStore" default:"true"`
RetryCount int `json:"retryCount" default:"30"`
Timeout time.Duration `json:"timeout" default:"5h"`
Redis RedisConfig `json:"redis"`
RetryCount int `json:"retryCount" default:"30"`
Timeout time.Duration `json:"timeout" default:"5h"`
}

type StoreConfig struct {
Expand Down Expand Up @@ -41,7 +44,9 @@ func (cfg *Config) Refine() error {
if err := cfg.Redis.check(); err != nil {
return err
}
if cfg.Store.Redis.Addr == "" && len(cfg.Store.Redis.SentinelAddrs) == 0 {
if cfg.UseCustomStore &&
cfg.Store.Redis.Addr == "" &&
len(cfg.Store.Redis.SentinelAddrs) == 0 {
cfg.Store.Redis = cfg.Redis
}
defaults.SetDefaults(cfg)
Expand Down

0 comments on commit 88ad2c8

Please sign in to comment.