-
Notifications
You must be signed in to change notification settings - Fork 0
/
workflow_test.go
56 lines (50 loc) · 1.5 KB
/
workflow_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
package workflow
import (
"context"
pcode "github.com/protoflow-labs/protoflow/gen/code"
"github.com/protoflow-labs/protoflow/pkg/graph/edge"
"github.com/protoflow-labs/protoflow/pkg/graph/node/base"
code2 "github.com/protoflow-labs/protoflow/pkg/graph/node/code"
"github.com/reactivex/rxgo/v2"
"github.com/rs/zerolog/log"
"testing"
)
// TestRun builds a workflow out of two function nodes (each backed by
// code2.InMemoryObserver), connects them to a NodeJS server node through
// "provides" edges and to each other through a "map" edge, then feeds a single
// "input" item into the first function node and drains the resulting
// observable. Any error emitted by the observable fails the test.
func TestRun(t *testing.T) {
	// TODO breadchris start server to listen for localhost:8000?
	r := code2.NewServer(base.NewNode("test 1"), code2.NewServerProto(pcode.Runtime_NODEJS).GetServer())

	n1 := code2.NewFunctionNode(
		base.NewNode("test 2"),
		code2.NewFunctionProto().GetFunction(),
		code2.WithFunction(code2.InMemoryObserver("test 1")),
	)
	n2 := code2.NewFunctionNode(
		base.NewNode("test 3"),
		code2.NewFunctionProto().GetFunction(),
		code2.WithFunction(code2.InMemoryObserver("test 2")),
	)

	// NOTE(review): r is referenced by the provides edges below but is not
	// passed to WithBuiltNodes — confirm that this is intentional.
	a, err := Default().
		WithBuiltNodes(n1, n2).
		WithBuiltEdges(
			edge.New(edge.NewProvidesProto(r.ID(), n1.ID())),
			edge.New(edge.NewProvidesProto(r.ID(), n2.ID())),
			edge.New(edge.NewMapProto(n1.ID(), n2.ID())),
		).
		Build()
	if err != nil {
		t.Fatal(err)
	}

	// Single-item input stream handed to the first function node.
	input := rxgo.Defer([]rxgo.Producer{func(ctx context.Context, next chan<- rxgo.Item) {
		next <- rxgo.Of("input")
	}})

	obs, err := a.WireNodes(context.Background(), n1.ID(), input)
	if err != nil {
		t.Fatal(err)
	}

	// Block on the channel returned by ForEach so the callbacks have run
	// before the test function returns.
	<-obs.ForEach(func(item any) {
		log.Info().Interface("item", item).Msg("trace")
	}, func(streamErr error) {
		// Previously this was only logged, so a failing stream left the test
		// green. t.Errorf (unlike t.Fatal) may be called from a goroutine
		// other than the one running the test.
		t.Errorf("workflow stream error: %v", streamErr)
	}, func() {
		log.Info().Msg("complete")
	})
}