Skip to content

Commit

Permalink
chore: add backflow example (#339)
Browse files Browse the repository at this point in the history
* add backflow example

* clean basic example
  • Loading branch information
venjiang committed Jun 14, 2022
1 parent bfe58f9 commit 117577f
Show file tree
Hide file tree
Showing 11 changed files with 390 additions and 44 deletions.
33 changes: 10 additions & 23 deletions example/0-basic/Taskfile.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ output: "prefixed"
tasks:
run:
desc: run
deps: [zipper, source, sfn1, sfn2]
deps: [zipper, source, sfn]
cmds:
- echo 'basic example run'

Expand All @@ -18,8 +18,8 @@ tasks:
- rm -rf ./bin

build:
desc: build source, sfns and zipper
deps: [source-build, sfn1-build, sfn2-build]
desc: build source, sfn and zipper
deps: [source-build, sfn-build]
cmds:
- echo 'building done'

Expand All @@ -28,10 +28,10 @@ tasks:
cmds:
- "go build -o ./bin/source{{exeExt}} source/main.go"

sfn1-build:
desc: build sfn1
sfn-build:
desc: build sfn
cmds:
- "go build -o ./bin/sfn1{{exeExt}} sfn/main.go"
- "go build -o ./bin/sfn{{exeExt}} sfn/main.go"

source:
desc: run source
Expand All @@ -41,24 +41,11 @@ tasks:
env:
YOMO_LOG_LEVEL: error

sfn1:
desc: run sfn1
deps: [sfn1-build]
sfn:
desc: run sfn
deps: [sfn-build]
cmds:
- "./bin/sfn1{{exeExt}}"
env:
YOMO_LOG_LEVEL: error

sfn2-build:
desc: build sfn2
cmds:
- "go build -o ./bin/sfn2{{exeExt}} sfn2/main.go"

sfn2:
desc: run sfn2
deps: [sfn2-build]
cmds:
- "./bin/sfn2{{exeExt}}"
- "./bin/sfn{{exeExt}}"
env:
YOMO_LOG_LEVEL: error

Expand Down
7 changes: 3 additions & 4 deletions example/0-basic/sfn/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package main

import (
"encoding/json"
"fmt"
"os"

"github.com/yomorun/yomo"
Expand Down Expand Up @@ -49,10 +48,10 @@ func handler(data []byte) (byte, []byte) {
var model noiseData
err := json.Unmarshal(data, &model)
if err != nil {
logger.Errorf("[sfn1] json.Marshal err=%v", err)
logger.Errorf("[sfn] json.Marshal err=%v", err)
os.Exit(-2)
} else {
logger.Printf(">> [sfn1] got tag=0x33, data=%+v", model)
logger.Printf(">> [sfn] got tag=0x33, data=%+v", model)
}
return 0x34, []byte(fmt.Sprintf("sfn1 processed result: %v", model.Noise))
return 0x0, nil
}
5 changes: 0 additions & 5 deletions example/0-basic/source/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ func main() {
"yomo-source",
yomo.WithZipperAddr(addr),
yomo.WithLogger(logger),
yomo.WithObserveDataTags(0x34, 0x0),
)
err := source.Connect()
if err != nil {
Expand All @@ -46,10 +45,6 @@ func main() {
logger.Printf("[source] receive server error: %v", err)
os.Exit(1)
})
// set receive handler for the observe datatags
source.SetReceiveHandler(func(tag byte, data []byte) {
logger.Printf("[source] ♻️ receive backflow: tag=%#v, data=%v", tag, string(data))
})

// generate mock data and send it to YoMo-Zipper in every 100 ms.
err = generateAndSendData(source)
Expand Down
113 changes: 113 additions & 0 deletions example/5-backflow/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
# Backflow example

This example represents how [source](https://docs.yomo.run/source) receives stream functions processed results.

## Code structure

+ `source`: Mocking data of a Sound Sensor. [docs.yomo.run/source](https://docs.yomo.run/source)
+ `sfn-1`: Convert the noise value to `int` type in real-time. [docs.yomo.run/stream-function](https://docs.yomo.run/stream-fn)
+ `sfn-2`: Calculate 10 times the noise value in real-time. [docs.yomo.run/stream-function](https://docs.yomo.run/stream-fn)
+ `zipper`: Orchestrate a workflow that receives the data from `source`, stream computing in `stream-fn` [docs.yomo.run/zipper](https://docs.yomo.run/zipper)

## Prepare

Install YoMo CLI

### Binary (Recommended)

```bash
$ curl -fsSL "https://bina.egoist.sh/yomorun/cli?name=yomo" | sh

==> Resolved version latest to v1.1.0
==> Downloading asset for darwin amd64
==> Installing yomo to /usr/local/bin
==> Installation complete
```

### Or build from source

```bash
$ go install github.com/yomorun/cli/yomo@latest
$ yomo version
YoMo CLI Version: v1.1.0
Runtime Version: v1.8.0
```

## Option 1: Auto Run

`task run`

## Option 2: Manual

### Run [zipper](https://docs.yomo.run/zipper)

```bash
yomo serve -c ./workflow.yaml

2022-06-13 15:46:01.477 [yomo:zipper] Listening SIGUSR1, SIGUSR2, SIGTERM/SIGINT...
2022-06-13 15:46:01.479 [core:server] ✅ [backflow][71590] Listening on: 127.0.0.1:9000, MODE: DEVELOPMENT, QUIC: [v1 draft-29], AUTH: [none]
```

### Run [sfn-1](https://docs.yomo.run/stream-fn)

```bash
go run ./sfn-1/main.go

2022-06-13 15:53:17.486 [core:client] use credential: [none]
2022-06-13 15:53:17.496 [core:client] ❤️ [sfn-1][e6KHnVWboNz0x8Ffhvq-e]([::]:56117) is connected to YoMo-Zipper localhost:9000
```
### Run [sfn-2](https://docs.yomo.run/stream-fn)
```bash
go run ./sfn-2/main.go

2022-06-13 15:53:17.486 [core:client] use credential: [none]
2022-06-13 15:53:17.496 [core:client] ❤️ [sfn-2][e6KHnVWboNz0x8Ffhvq-e]([::]:56117) is connected to YoMo-Zipper localhost:9000
```

### Run [yomo-source](https://docs.yomo.run/source)

```bash
go run ./source/main.go

2022-06-13 16:00:10.440 [core:client] use credential: [none]
2022-06-13 16:00:10.447 [core:client] ❤️ [yomo-source][QqkNxX3tQlnw64Pg8JqZR]([::]:64036) is connected to YoMo-Zipper localhost:9000
...
```

### Results

The terminal of `yomo-srouce` will print the real-time receives value.

```bash
2022-06-13 16:06:48.690 [source] ✅ Emit 158.30 to YoMo-Zipper
2022-06-13 16:06:48.691 [source] ♻️ receive backflow: tag=0x34, data=158
2022-06-13 16:06:48.692 [source] ♻️ receive backflow: tag=0x35, data=1580
2022-06-13 16:06:49.691 [source] ✅ Emit 28.81 to YoMo-Zipper
2022-06-13 16:06:49.693 [source] ♻️ receive backflow: tag=0x34, data=28
2022-06-13 16:06:49.694 [source] ♻️ receive backflow: tag=0x35, data=280
2022-06-13 16:06:50.691 [source] ✅ Emit 3.81 to YoMo-Zipper
2022-06-13 16:06:50.694 [source] ♻️ receive backflow: tag=0x34, data=3
2022-06-13 16:06:50.694 [source] ♻️ receive backflow: tag=0x35, data=30
...
```

The terminal of `sfn-1` will print the real-time noise value.

```bash
2022-06-13 16:06:48.691 [sfn-1] got: tag=0x33, data=158.3, return: tag=0x34, data=158
2022-06-13 16:06:49.692 [sfn-1] got: tag=0x33, data=28.81, return: tag=0x34, data=28
2022-06-13 16:06:50.693 [sfn-1] got: tag=0x33, data=3.81, return: tag=0x34, data=3
...
```

The terminal of `sfn-2` will print the real-time noise value.

```bash
2022-06-13 16:06:48.692 [sfn-2] got: tag=0x34, data=158, return: tag=0x35, data=1580
2022-06-13 16:06:49.693 [sfn-2] got: tag=0x34, data=28, return: tag=0x35, data=280
2022-06-13 16:06:50.694 [sfn-2] got: tag=0x34, data=3, return: tag=0x35, data=30
...
```



70 changes: 70 additions & 0 deletions example/5-backflow/Taskfile.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
# https://taskfile.dev

version: "3"

output: "prefixed"

tasks:
run:
desc: run
deps: [zipper, source, sfn-1, sfn-2]
cmds:
- echo 'backflow example run'

# example cleanup
clean:
desc: clean
cmds:
- rm -rf ./bin

build:
desc: build source, sfns and zipper
deps: [source-build, sfn-1-build, sfn-2-build]
cmds:
- echo 'building done'

source:
desc: run source
deps: [source-build]
cmds:
- "./bin/source{{exeExt}}"
env:
YOMO_LOG_LEVEL: error

source-build:
desc: build source
cmds:
- "go build -o ./bin/source{{exeExt}} source/main.go"

sfn-1:
desc: run sfn-1
deps: [sfn-1-build]
cmds:
- "./bin/sfn-1{{exeExt}}"
env:
YOMO_LOG_LEVEL: error

sfn-1-build:
desc: build sfn-1
cmds:
- "go build -o ./bin/sfn-1{{exeExt}} sfn-1/main.go"

sfn-2:
desc: run sfn-2
deps: [sfn-2-build]
cmds:
- "./bin/sfn-2{{exeExt}}"
env:
YOMO_LOG_LEVEL: error

sfn-2-build:
desc: build sfn-2
cmds:
- "go build -o ./bin/sfn-2{{exeExt}} sfn-2/main.go"

zipper:
desc: run zipper
cmds:
- "yomo serve -c workflow.yaml"
env:
YOMO_LOG_LEVEL: error
59 changes: 59 additions & 0 deletions example/5-backflow/sfn-1/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package main

import (
"os"
"strconv"

"github.com/yomorun/yomo"
"github.com/yomorun/yomo/pkg/logger"
)

type noiseData struct {
Noise float32 `json:"noise"` // Noise value
Time int64 `json:"time"` // Timestamp (ms)
From string `json:"from"` // Source IP
}

func main() {
addr := "localhost:9000"
if v := os.Getenv("YOMO_ADDR"); v != "" {
addr = v
}
sfn := yomo.NewStreamFunction(
"sfn-1",
yomo.WithZipperAddr(addr),
yomo.WithObserveDataTags(0x33),
)
defer sfn.Close()

// set handler
sfn.SetHandler(handler)
// start
err := sfn.Connect()
if err != nil {
logger.Errorf("[sfn-1] connect err=%v", err)
os.Exit(1)
}
// set the error handler function when server error occurs
sfn.SetErrorHandler(func(err error) {
logger.Errorf("[sfn-1] receive server error: %v", err)
sfn.Close()
os.Exit(1)
})

select {}
}

func handler(data []byte) (byte, []byte) {
// got
noise, err := strconv.ParseFloat(string(data), 10)
if err != nil {
logger.Errorf("[sfn-1] got err=%v", err)
return 0x0, nil
}
// result
result := int(noise)
logger.Printf("[sfn-1] got: tag=0x33, data=%v, return: tag=0x34, data=%v", noise, result)

return 0x34, []byte(strconv.Itoa(result))
}

0 comments on commit 117577f

Please sign in to comment.