Skip to content

Commit

Permalink
docs: Add examples for callbacks back
Browse files Browse the repository at this point in the history
  • Loading branch information
pojntfx committed May 6, 2023
1 parent 9aa3edc commit 3760bf9
Show file tree
Hide file tree
Showing 3 changed files with 268 additions and 8 deletions.
118 changes: 118 additions & 0 deletions cmd/dudirekta-example-callbacks-callee/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package main

import (
"context"
"flag"
"fmt"
"log"
"net"
"sync/atomic"
"time"

"github.com/pojntfx/dudirekta/pkg/rpc"
)

type local struct {
counter int64

Peers func() map[string]remote
}

func (s *local) Increment(ctx context.Context, delta int64) (int64, error) {
peerID := rpc.GetRemoteID(ctx)

for candidateIP, peer := range s.Peers() {
if candidateIP == peerID {
peer.Println(ctx, fmt.Sprintf("Incrementing counter by %v", delta))
}
}

return atomic.AddInt64(&s.counter, delta), nil
}

type remote struct {
Println func(ctx context.Context, msg string) error
}

func main() {
addr := flag.String("addr", "localhost:1337", "Listen or remote address")
listen := flag.Bool("listen", true, "Whether to allow connecting to peers by listening or dialing")

flag.Parse()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

service := &local{}

clients := 0
registry := rpc.NewRegistry(
service,
remote{},

time.Second*10,
ctx,
&rpc.Options{
ResponseBufferLen: rpc.DefaultResponseBufferLen,
OnClientConnect: func(remoteID string) {
clients++

log.Printf("%v clients connected", clients)
},
OnClientDisconnect: func(remoteID string) {
clients--

log.Printf("%v clients connected", clients)
},
},
)
service.Peers = registry.Peers

if *listen {
lis, err := net.Listen("tcp", *addr)
if err != nil {
panic(err)
}
defer lis.Close()

log.Println("Listening on", lis.Addr())

for {
func() {
conn, err := lis.Accept()
if err != nil {
log.Println("could not accept connection, continuing:", err)

return
}

go func() {

defer func() {
_ = conn.Close()

if err := recover(); err != nil {
log.Printf("Client disconnected with error: %v", err)
}
}()

if err := registry.Link(conn); err != nil {
panic(err)
}
}()
}()
}
} else {
conn, err := net.Dial("tcp", *addr)
if err != nil {
panic(err)
}
defer conn.Close()

log.Println("Connected to", conn.RemoteAddr())

if err := registry.Link(conn); err != nil {
panic(err)
}
}
}
150 changes: 150 additions & 0 deletions cmd/dudirekta-example-callbacks-caller/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
package main

import (
"bufio"
"context"
"flag"
"log"
"net"
"os"
"time"

"github.com/pojntfx/dudirekta/pkg/rpc"
)

type local struct{}

func (s *local) Println(ctx context.Context, msg string) error {
log.Printf("Printing message for peer with ID %v: %v", rpc.GetRemoteID(ctx), msg)

return nil
}

type remote struct {
Increment func(ctx context.Context, delta int64) (int64, error)
}

func main() {
addr := flag.String("addr", "localhost:1337", "Listen or remote address")
listen := flag.Bool("listen", false, "Whether to allow connecting to peers by listening or dialing")

flag.Parse()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

clients := 0

registry := rpc.NewRegistry(
&local{},
remote{},

time.Second*10,
ctx,
&rpc.Options{
ResponseBufferLen: rpc.DefaultResponseBufferLen,
OnClientConnect: func(remoteID string) {
clients++

log.Printf("%v clients connected", clients)
},
OnClientDisconnect: func(remoteID string) {
clients--

log.Printf("%v clients connected", clients)
},
},
)

go func() {
log.Println(`Enter one of the following letters followed by <ENTER> to run a function on the remote(s):
- a: Increment remote counter by one
- b: Decrement remote counter by one`)

stdin := bufio.NewReader(os.Stdin)

for {
line, err := stdin.ReadString('\n')
if err != nil {
panic(err)
}

for peerID, peer := range registry.Peers() {
log.Println("Calling functions for peer with ID", peerID)

switch line {
case "a\n":
new, err := peer.Increment(ctx, 1)
if err != nil {
log.Println("Got error for Increment func:", err)

continue
}

log.Println(new)
case "b\n":
new, err := peer.Increment(ctx, -1)
if err != nil {
log.Println("Got error for Increment func:", err)

continue
}

log.Println(new)
default:
log.Printf("Unknown letter %v, ignoring input", line)

continue
}
}
}
}()

if *listen {
lis, err := net.Listen("tcp", *addr)
if err != nil {
panic(err)
}
defer lis.Close()

log.Println("Listening on", lis.Addr())

for {
func() {
conn, err := lis.Accept()
if err != nil {
log.Println("could not accept connection, continuing:", err)

return
}

go func() {
defer func() {
_ = conn.Close()

if err := recover(); err != nil {
log.Printf("Client disconnected with error: %v", err)
}
}()

if err := registry.Link(conn); err != nil {
panic(err)
}
}()
}()
}
} else {
conn, err := net.Dial("tcp", *addr)
if err != nil {
panic(err)
}
defer conn.Close()

log.Println("Connected to", conn.RemoteAddr())

if err := registry.Link(conn); err != nil {
panic(err)
}
}
}
8 changes: 0 additions & 8 deletions cmd/dudirekta-example-closures-callee/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,6 @@ func main() {
},
)

lis, err := net.Listen("tcp", *addr)
if err != nil {
panic(err)
}
defer lis.Close()

log.Println("Listening on", lis.Addr())

if *listen {
lis, err := net.Listen("tcp", *addr)
if err != nil {
Expand Down

0 comments on commit 3760bf9

Please sign in to comment.