Skip to content

Commit

Permalink
improve example and readme
Browse files Browse the repository at this point in the history
  • Loading branch information
yuyang0 committed Apr 8, 2024
1 parent dc8c84d commit 34271ac
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 21 deletions.
24 changes: 10 additions & 14 deletions examples/multiple_node/main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"context"
"encoding/json"
"fmt"
"log"
Expand All @@ -23,22 +24,19 @@ func main() {
}))

svc, err := service.New(&types.Config{
Store: types.StoreConfig{
Type: "redis",
Redis: types.RedisConfig{
Addr: redisAddr,
Expire: 120,
},
Redis: types.RedisConfig{
Addr: redisAddr,
Expire: 120,
},
}, logger)
if err != nil {
log.Fatal("failed to create service", err)
}
f, err := svc.NewFlow("f2")
if err != nil {
log.Fatal("failed to create flow", err)
}
if prepareFlow(f); err != nil {
flowName := "f2"
mux := asynq.NewServeMux()
if err := svc.RegisterFlowsWithDefinitor(mux, map[string]flow.Definitor{
flowName: prepareFlow,
}); err != nil {
log.Fatal("failed to prepare flow", err)
}
srv := asynq.NewServer(
Expand All @@ -55,8 +53,6 @@ func main() {
// See the godoc for other configuration options
},
)
mux := asynq.NewServeMux()
svc.RegisterFlows(mux, f)
// ...register other handlers...

var wg sync.WaitGroup
Expand Down Expand Up @@ -128,7 +124,7 @@ func aggFn(dataMap map[string][]byte) ([]byte, error) {
return json.Marshal(i1 * i2)
}

func prepareFlow(f *flow.Flow) error {
func prepareFlow(_ context.Context, f *flow.Flow) error {
if err := f.Node("l1n1", incOp); err != nil {
return err
}
Expand Down
9 changes: 3 additions & 6 deletions examples/one_node/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,9 @@ const redisAddr = "127.0.0.1:6379"

func main() {
svc, err := service.New(&types.Config{
Store: types.StoreConfig{
Type: "redis",
Redis: types.RedisConfig{
Addr: redisAddr,
Expire: 120,
},
Redis: types.RedisConfig{
Addr: redisAddr,
Expire: 120,
},
}, nil)
if err != nil {
Expand Down
32 changes: 31 additions & 1 deletion readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
a DAG task engine based on asynq

## QuickStart
### server side
1. prepare asynq server mux
```golang
srv := asynq.NewServer(
Expand Down Expand Up @@ -52,6 +53,7 @@ a DAG task engine based on asynq
if err = f.Node("n1", incOp); err != nil {
log.Fatal("failed to create node", err)
}
// for complex dag, you can use RegisterFlowsWithDefinitor
svc.RegisterFlows(mux, f)
```
4. start asynq server
Expand All @@ -60,7 +62,35 @@ a DAG task engine based on asynq
log.Fatalf("could not run server: %v", err)
}
```
5. submit dagflow tasks
### client side
1. create dagflow service, same as step 2 in server side
```golang
svc, err := service.New(&types.Config{
Redis: types.RedisConfig{
Addr: "127.0.0.1:6379",
Expire: 120,
},
Store: types.StoreConfig{
Type: "redis",
},
}, nil)
if err != nil {
log.Fatal("failed to create service", err)
}
```
2. create a flow object and register it to dagflow service, same as step 3 in server side except `mux` should be set to `nil`
```golang
f, err := svc.NewFlow("f1")
if err != nil {
log.Fatal("failed to create flow", err)
}
if err = f.Node("n1", incOp); err != nil {
log.Fatal("failed to create node", err)
}
// for complex dag, you can use RegisterFlowsWithDefinitor
svc.RegisterFlows(nil, f)
```
3. submit dagflow tasks
```golang
svc.Submit("f1", []byte(`1`))
```
Expand Down

0 comments on commit 34271ac

Please sign in to comment.