Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: client cannot write tag that within reserved range #815

Merged
merged 4 commits into from
May 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ai/function_call.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
)

// ReducerTag is the observed tag of the reducer
var ReducerTag uint32 = 0x61
var ReducerTag uint32 = 0xE001

// FunctionCall describes the data structure when invoking the sfn function
type FunctionCall struct {
Expand Down
2 changes: 2 additions & 0 deletions core/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ func TestConnectTo(t *testing.T) {
ClientTypeSource,
WithLogger(discardingLogger),
)
assert.Equal(t, "source", source.Name())
assert.Equal(t, source.clientID, source.ClientID())

_ = source.Connect(context.TODO())

Expand Down
12 changes: 12 additions & 0 deletions core/frame/frame.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package frame

import (
"context"
"errors"
"fmt"
"io"
"net"
Expand Down Expand Up @@ -174,6 +175,17 @@ type Writer interface {
WriteFrame(Frame) error
}

// ErrReservedTag is returned when write a reserved tag.
var ErrReservedTag = errors.New("[0xF000, 0xFFFF] is reserved; please do not write within this range")

// IsReservedTag returns error when write a reserved tag.
func IsReservedTag(tag Tag) error {
if tag >= 0xF000 && tag <= 0xFFFF {
return ErrReservedTag
}
return nil
}

// Listener accepts Conns.
type Listener interface {
// Accept accepts Conns.
Expand Down
8 changes: 6 additions & 2 deletions core/serverless/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ func (c *Context) Write(tag uint32, data []byte) error {
if data == nil {
return nil
}

if err := frame.IsReservedTag(tag); err != nil {
return err
}
mdBytes, err := c.md.Encode()
if err != nil {
return err
Expand All @@ -63,7 +65,9 @@ func (c *Context) WriteWithTarget(tag uint32, data []byte, target string) error
if data == nil {
return nil
}

if err := frame.IsReservedTag(tag); err != nil {
return err
}
if target != "" {
c.md.Set(metadata.TargetKey, target)
}
Expand Down
8 changes: 6 additions & 2 deletions core/serverless/cron_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ func (c *CronContext) Write(tag uint32, data []byte) error {
if data == nil {
return nil
}

if err := frame.IsReservedTag(tag); err != nil {
return err
}
dataFrame := &frame.DataFrame{
Tag: tag,
Metadata: c.mdBytes,
Expand All @@ -43,7 +45,9 @@ func (c *CronContext) WriteWithTarget(tag uint32, data []byte, target string) er
if data == nil {
return nil
}

if err := frame.IsReservedTag(tag); err != nil {
return err
}
if target == "" {
return c.Write(tag, data)
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ require (
github.com/quic-go/quic-go v0.43.1
github.com/reactivex/rxgo/v2 v2.5.0
github.com/robfig/cron/v3 v3.0.1
github.com/sashabaranov/go-openai v1.23.1
github.com/sashabaranov/go-openai v1.24.0
github.com/second-state/WasmEdge-go v0.13.4
github.com/shirou/gopsutil/v3 v3.24.2
github.com/spf13/cobra v1.8.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,8 @@ github.com/sagikazarmark/locafero v0.4.0 h1:HApY1R9zGo4DBgr7dqsTH/JJxLTTsOt7u6ke
github.com/sagikazarmark/locafero v0.4.0/go.mod h1:Pe1W6UlPYUk/+wc/6KFhbORCfqzgYEpgQ3O5fPuL3H4=
github.com/sagikazarmark/slog-shim v0.1.0 h1:diDBnUNK9N/354PgrxMywXnAwEr1QZcOr6gto+ugjYE=
github.com/sagikazarmark/slog-shim v0.1.0/go.mod h1:SrcSrq8aKtyuqEI1uvTDTK1arOWRIczQRv+GVI1AkeQ=
github.com/sashabaranov/go-openai v1.23.1 h1:b2IsEG9+BdJ3f6G3gGu9Lon2Mw/C0aYqME3YzwBHcls=
github.com/sashabaranov/go-openai v1.23.1/go.mod h1:lj5b/K+zjTSFxVLijLSTDZuP7adOgerWeFyZLUhAKRg=
github.com/sashabaranov/go-openai v1.24.0 h1:4H4Pg8Bl2RH/YSnU8DYumZbuHnnkfioor/dtNlB20D4=
github.com/sashabaranov/go-openai v1.24.0/go.mod h1:lj5b/K+zjTSFxVLijLSTDZuP7adOgerWeFyZLUhAKRg=
github.com/second-state/WasmEdge-go v0.13.4 h1:NHfJC+aayUW93ydAzlcX7Jx1WDRpI24KvY5SAbeTyvY=
github.com/second-state/WasmEdge-go v0.13.4/go.mod h1:HyBf9hVj1sRAjklsjc1Yvs9b5RcmthPG9z99dY78TKg=
github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
Expand Down
7 changes: 5 additions & 2 deletions source.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ func (s *yomoSource) Connect() error {

// Write writes data with specified tag.
func (s *yomoSource) Write(tag uint32, data []byte) error {
if err := frame.IsReservedTag(tag); err != nil {
return err
}
md := core.NewMetadata(s.client.ClientID(), id.New())
// add trace
tracer := trace.NewTracer("Source")
Expand Down Expand Up @@ -98,8 +101,8 @@ func (s *yomoSource) Write(tag uint32, data []byte) error {

// WritePayload writes `yomo.Payload` with specified tag.
func (s *yomoSource) WriteWithTarget(tag uint32, data []byte, target string) error {
if data == nil {
return nil
if err := frame.IsReservedTag(tag); err != nil {
return err
}
md := core.NewMetadata(s.client.ClientID(), id.New())
// add trace
Expand Down
7 changes: 7 additions & 0 deletions source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/stretchr/testify/assert"
"github.com/yomorun/yomo/core"
"github.com/yomorun/yomo/core/frame"
"github.com/yomorun/yomo/core/ylog"
)

Expand Down Expand Up @@ -35,6 +36,9 @@ func TestSource(t *testing.T) {
err := source.Connect()
assert.Nil(t, err)

err = source.Write(0xF001, []byte("reserved tag"))
assert.Equal(t, frame.ErrReservedTag, err)

// send data to zipper from source
err = source.Write(0x23, []byte("pipe test"))
assert.Nil(t, err)
Expand All @@ -43,6 +47,9 @@ func TestSource(t *testing.T) {
err = source.Write(0x21, []byte("test"))
assert.Nil(t, err)

err = source.WriteWithTarget(0xF002, []byte("reserved tag"), mockTargetString)
assert.Equal(t, frame.ErrReservedTag, err)

err = source.WriteWithTarget(0x22, []byte("message from source"), mockTargetString)
assert.Nil(t, err)

Expand Down