Skip to content

Commit

Permalink
feat: Add basic implementation of callee callback transparency
Browse files Browse the repository at this point in the history
  • Loading branch information
pojntfx committed May 6, 2023
1 parent d722e89 commit ae85d9b
Show file tree
Hide file tree
Showing 4 changed files with 214 additions and 143 deletions.
17 changes: 4 additions & 13 deletions cmd/dudirekta-example-closures-callee/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"net"
"time"

"github.com/pojntfx/dudirekta/pkg/closures"
"github.com/pojntfx/dudirekta/pkg/rpc"
)

Expand All @@ -18,26 +17,18 @@ type local struct {
func (s *local) Iterate(
ctx context.Context,
length int,
onIterationClosureID string,
onIteration func(ctx context.Context, i int) error,
) (int, error) {
peerID := rpc.GetRemoteID(ctx)

for i := 0; i < length; i++ {
for candidateIP, peer := range s.Peers() {
if candidateIP == peerID {
if _, err := peer.CallClosure(ctx, onIterationClosureID, []interface{}{i}); err != nil {
return -1, err
}
}
if err := onIteration(ctx, i); err != nil {
return -1, err
}
}

return length, nil
}

type remote struct {
CallClosure closures.CallClosureType
}
type remote struct{}

func main() {
addr := flag.String("addr", ":1337", "Listen address")
Expand Down
8 changes: 4 additions & 4 deletions cmd/dudirekta-example-closures-caller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ type remote struct {
Iterate func(
ctx context.Context,
length int,
onIteration func(i int) error,
onIteration func(ctx context.Context, i int) error,
) (int, error)
}

func Iterate(callee remote, ctx context.Context, length int, onIteration func(i int) error) (int, error) {
func Iterate(callee remote, ctx context.Context, length int, onIteration func(ctx context.Context, i int) error) (int, error) {
return callee.Iterate(ctx, length, onIteration)
}

Expand Down Expand Up @@ -73,7 +73,7 @@ func main() {
for _, peer := range registry.Peers() {
switch line {
case "a\n":
length, err := Iterate(peer, ctx, 5, func(i int) error {
length, err := Iterate(peer, ctx, 5, func(ctx context.Context, i int) error {
log.Println("In iteration", i)

return nil
Expand All @@ -86,7 +86,7 @@ func main() {

log.Println(length)
case "b\n":
length, err := Iterate(peer, ctx, 10, func(i int) error {
length, err := Iterate(peer, ctx, 10, func(ctx context.Context, i int) error {
log.Println("In iteration", i)

return nil
Expand Down
19 changes: 17 additions & 2 deletions pkg/closures/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,15 @@ import (
)

var (
errorType = reflect.TypeOf((*error)(nil)).Elem()
errorType = reflect.TypeOf((*error)(nil)).Elem()
contextType = reflect.TypeOf((*context.Context)(nil)).Elem()

ErrInvalidReturn = errors.New("invalid return, can only return an error or a value and an error")
ErrNotAFunction = errors.New("not a function")
ErrInvalidArgsCount = errors.New("invalid argument count")
ErrInvalidArg = errors.New("invalid argument")
ErrClosureDoesNotExist = errors.New("closure does not exist")
ErrInvalidArgs = errors.New("invalid arguments, first argument needs to be a context.Context")
)

type (
Expand All @@ -38,6 +40,14 @@ func createClosure(fn interface{}) (func(args ...interface{}) (interface{}, erro
return nil, ErrInvalidReturn
}

if functionType.NumIn() < 1 {
return nil, ErrInvalidArgs
}

if !functionType.In(0).Implements(contextType) {
return nil, ErrInvalidArgs
}

return func(args ...interface{}) (interface{}, error) {
if len(args) != functionType.NumIn() {
return nil, ErrInvalidArgsCount
Expand All @@ -49,7 +59,12 @@ func createClosure(fn interface{}) (func(args ...interface{}) (interface{}, erro
if argType.ConvertibleTo(functionType.In(i)) {
in[i] = reflect.ValueOf(arg).Convert(functionType.In(i))
} else {
return nil, ErrInvalidArg
// TODO: Drop this and the whole "there has to be a context in the callback function" requirementt
if functionType.In(i).Implements(contextType) {
in[i] = reflect.ValueOf(context.Background())
} else {
return nil, ErrInvalidArg
}
}
} else {
in[i] = reflect.ValueOf(arg)
Expand Down
Loading

0 comments on commit ae85d9b

Please sign in to comment.