Skip to content

Commit

Permalink
improve tests
Browse files Browse the repository at this point in the history
  • Loading branch information
yuyang0 committed Apr 8, 2024
1 parent 34271ac commit 56a3540
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 40 deletions.
50 changes: 30 additions & 20 deletions examples/multiple_node/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,26 +19,8 @@ import (
const redisAddr = "127.0.0.1:6379"

func main() {
logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
Level: slog.LevelDebug,
}))

svc, err := service.New(&types.Config{
Redis: types.RedisConfig{
Addr: redisAddr,
Expire: 120,
},
}, logger)
if err != nil {
log.Fatal("failed to create service", err)
}
flowName := "f2"
mux := asynq.NewServeMux()
if err := svc.RegisterFlowsWithDefinitor(mux, map[string]flow.Definitor{
flowName: prepareFlow,
}); err != nil {
log.Fatal("failed to prepare flow", err)
}
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: redisAddr},
asynq.Config{
Expand All @@ -53,6 +35,8 @@ func main() {
// See the godoc for other configuration options
},
)
flowName := "f2"
createSvc(mux, flowName)
// ...register other handlers...

var wg sync.WaitGroup
Expand All @@ -63,14 +47,15 @@ func main() {
log.Fatalf("could not run server: %v", err)
}
}()
clientSVC := createClientSVC(flowName)
intialV := 10
expectV := (10 + 1 + 1) * (10 + 1 - 1) * 2
sessID, err := svc.Submit("f2", []byte(fmt.Sprintf(`%d`, intialV)))
sessID, err := clientSVC.Submit("f2", []byte(fmt.Sprintf(`%d`, intialV)))
if err != nil {
log.Fatal("failed to submit task", err)
}
time.Sleep(15 * time.Second)
resMap, err := svc.GetResult("f2", sessID)
resMap, err := clientSVC.GetResult("f2", sessID)
if err != nil {
log.Fatal("failed to get result", err)
}
Expand All @@ -83,6 +68,31 @@ func main() {
wg.Wait()
}

func createSvc(mux *asynq.ServeMux, flowName string) *service.Service {
logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
Level: slog.LevelDebug,
}))
svc, err := service.New(&types.Config{
Redis: types.RedisConfig{
Addr: redisAddr,
Expire: 120,
},
}, logger)
if err != nil {
log.Fatal("failed to create service", err)
}
if err := svc.RegisterFlowsWithDefinitor(mux, map[string]flow.Definitor{
flowName: prepareFlow,
}); err != nil {
log.Fatal("failed to prepare flow", err)
}
return svc
}

func createClientSVC(flowName string) *service.Service {
svc := createSvc(nil, flowName)
return svc
}
func incOp(data []byte, option map[string][]string) ([]byte, error) {
var i int
if err := json.Unmarshal(data, &i); err != nil {
Expand Down
56 changes: 36 additions & 20 deletions examples/one_node/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,31 +6,14 @@ import (
"sync"

"github.com/hibiken/asynq"
"github.com/yuyang0/dagflow/flow"
"github.com/yuyang0/dagflow/service"
"github.com/yuyang0/dagflow/types"
)

const redisAddr = "127.0.0.1:6379"

func main() {
svc, err := service.New(&types.Config{
Redis: types.RedisConfig{
Addr: redisAddr,
Expire: 120,
},
}, nil)
if err != nil {
log.Fatal("failed to create service", err)
}
f, err := svc.NewFlow("f1")
if err != nil {
log.Fatal("failed to create flow", err)
}
err = f.Node("n1", incOp)
if err != nil {
log.Fatal("failed to create node", err)
}

srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: redisAddr},
asynq.Config{
Expand All @@ -46,7 +29,8 @@ func main() {
},
)
mux := asynq.NewServeMux()
svc.RegisterFlows(mux, f)
flowName, nodeName := "f1", "n1"
createSvcAndFlow(mux, flowName, nodeName)
// ...register other handlers...

var wg sync.WaitGroup
Expand All @@ -57,10 +41,42 @@ func main() {
log.Fatalf("could not run server: %v", err)
}
}()
svc.Submit("f1", []byte(`1`))
clientSVC := createClientSVC(flowName, nodeName)
if _, err := clientSVC.Submit(flowName, []byte(`1`)); err != nil {
log.Fatal("failed to submit task", err)
}
wg.Wait()
}

func createSvcAndFlow(mux *asynq.ServeMux, flowName, nodeName string) (*service.Service, *flow.Flow) {
svc, err := service.New(&types.Config{
Redis: types.RedisConfig{
Addr: redisAddr,
Expire: 120,
},
}, nil)
if err != nil {
log.Fatal("failed to create service", err)
}
f, err := svc.NewFlow(flowName)
if err != nil {
log.Fatal("failed to create flow", err)
}
err = f.Node(nodeName, incOp)
if err != nil {
log.Fatal("failed to create node", err)
}
if err := svc.RegisterFlows(mux, f); err != nil {
log.Fatalf("failed to register flow: %v", err)
}
return svc, f
}

func createClientSVC(flowName, nodeName string) *service.Service {
svc, _ := createSvcAndFlow(nil, flowName, nodeName)
return svc
}

func incOp(data []byte, option map[string][]string) ([]byte, error) {
var i int
if err := json.Unmarshal(data, &i); err != nil {
Expand Down

0 comments on commit 56a3540

Please sign in to comment.