Skip to content

Commit

Permalink
feat: Introduce two separate request/response Frisbee operations
Browse files Browse the repository at this point in the history
  • Loading branch information
pojntfx committed Oct 16, 2023
1 parent fac66c8 commit 53d6158
Show file tree
Hide file tree
Showing 3 changed files with 218 additions and 140 deletions.
90 changes: 48 additions & 42 deletions cmd/dudirekta-example-frisbee-client/main.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
package main

import (
"bufio"
"context"
"flag"
"fmt"
"log"
"net"
"os"
"time"

"github.com/loopholelabs/frisbee-go"
Expand All @@ -17,7 +15,8 @@ import (
)

const (
DUDIREKTA = uint16(10)
DUDIREKTA_REQUESTS = uint16(10)
DUDIREKTA_RESPONSES = uint16(11)
)

type local struct{}
Expand Down Expand Up @@ -66,57 +65,45 @@ func main() {
)

go func() {
log.Println(`Enter one of the following letters followed by <ENTER> to run a function on the remote(s):
- a: Increment remote counter by one
- b: Decrement remote counter by one`)

stdin := bufio.NewReader(os.Stdin)

for {
line, err := stdin.ReadString('\n')
if err != nil {
panic(err)
}
for _, peer := range registry.Peers() {
new, err := peer.Increment(ctx, 1)
if err != nil {
log.Println("Got error for Increment func:", err)

for peerID, peer := range registry.Peers() {
log.Println("Calling functions for peer with ID", peerID)

switch line {
case "a\n":
new, err := peer.Increment(ctx, 1)
if err != nil {
log.Println("Got error for Increment func:", err)

continue
}

log.Println(new)
case "b\n":
new, err := peer.Increment(ctx, -1)
if err != nil {
log.Println("Got error for Increment func:", err)
continue
}

continue
}
log.Println(new)

log.Println(new)
default:
log.Printf("Unknown letter %v, ignoring input", line)
new, err = peer.Increment(ctx, -1)
if err != nil {
log.Println("Got error for Increment func:", err)

continue
}

log.Println(new)
}
}
}()

handlers := make(frisbee.HandlerTable)

packets := make(chan []byte)
handlers[DUDIREKTA] = func(ctx context.Context, incoming *packet.Packet) (outgoing *packet.Packet, action frisbee.Action) {
requestPackets := make(chan []byte)
handlers[DUDIREKTA_REQUESTS] = func(ctx context.Context, incoming *packet.Packet) (outgoing *packet.Packet, action frisbee.Action) {
b := make([]byte, incoming.Metadata.ContentLength)
copy(b, incoming.Content.Bytes())
requestPackets <- b

return
}

responsePackets := make(chan []byte)
handlers[DUDIREKTA_RESPONSES] = func(ctx context.Context, incoming *packet.Packet) (outgoing *packet.Packet, action frisbee.Action) {
b := make([]byte, incoming.Metadata.ContentLength)
copy(b, incoming.Content.Bytes())
packets <- b
responsePackets <- b

return
}
Expand All @@ -138,14 +125,32 @@ func main() {
func(b []byte) error {
pkg := packet.Get()

pkg.Metadata.Operation = DUDIREKTA
pkg.Metadata.Operation = DUDIREKTA_REQUESTS
pkg.Content.Write(b)
pkg.Metadata.ContentLength = uint32(pkg.Content.Len())

return client.WritePacket(pkg)
},
func(b []byte) error {
pkg := packet.Get()

pkg.Metadata.Operation = DUDIREKTA_RESPONSES
pkg.Content.Write(b)
pkg.Metadata.ContentLength = uint32(pkg.Content.Len())

return client.WritePacket(pkg)
},

func() ([]byte, error) {
b, ok := <-requestPackets
if !ok {
return []byte{}, net.ErrClosed
}

return b, nil
},
func() ([]byte, error) {
b, ok := <-packets
b, ok := <-responsePackets
if !ok {
return []byte{}, net.ErrClosed
}
Expand All @@ -159,5 +164,6 @@ func main() {

<-client.CloseChannel()

close(packets)
close(requestPackets)
close(responsePackets)
}
122 changes: 80 additions & 42 deletions cmd/dudirekta-example-frisbee-server/main.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
package main

import (
"bufio"
"context"
"flag"
"log"
"net"
"os"
"sync"
"sync/atomic"
"time"
Expand All @@ -20,7 +18,8 @@ import (
type Key int

const (
DUDIREKTA = uint16(10)
DUDIREKTA_REQUESTS = uint16(10)
DUDIREKTA_RESPONSES = uint16(11)

ConnIDKey Key = iota
)
Expand Down Expand Up @@ -71,30 +70,10 @@ func main() {
)

go func() {
log.Println(`Enter one of the following letters followed by <ENTER> to run a function on the remote(s):
- a: Print "Hello, world!"`)

stdin := bufio.NewReader(os.Stdin)

for {
line, err := stdin.ReadString('\n')
if err != nil {
panic(err)
}

for peerID, peer := range registry.Peers() {
log.Println("Calling functions for peer with ID", peerID)

switch line {
case "a\n":
if err := peer.Println(ctx, "Hello, world!"); err != nil {
log.Println("Got error for Println func:", err)

continue
}
default:
log.Printf("Unknown letter %v, ignoring input", line)
for _, peer := range registry.Peers() {
if err := peer.Println(ctx, "Hello, world!"); err != nil {
log.Println("Got error for Println func:", err)

continue
}
Expand All @@ -104,20 +83,42 @@ func main() {

handlers := make(frisbee.HandlerTable)

var packetsLock sync.Mutex
packets := map[string]chan []byte{}
var requestPacketsLock sync.Mutex
requestPackets := map[string]chan []byte{}

handlers[DUDIREKTA_REQUESTS] = func(ctx context.Context, incoming *packet.Packet) (outgoing *packet.Packet, action frisbee.Action) {
connID := ctx.Value(ConnIDKey).(string)

requestPacketsLock.Lock()
p, ok := requestPackets[connID]
if !ok {
p = make(chan []byte)

requestPackets[connID] = p
}
requestPacketsLock.Unlock()

b := make([]byte, incoming.Metadata.ContentLength)
copy(b, incoming.Content.Bytes())
p <- b

handlers[DUDIREKTA] = func(ctx context.Context, incoming *packet.Packet) (outgoing *packet.Packet, action frisbee.Action) {
return
}

var responsePacketsLock sync.Mutex
responsePackets := map[string]chan []byte{}

handlers[DUDIREKTA_RESPONSES] = func(ctx context.Context, incoming *packet.Packet) (outgoing *packet.Packet, action frisbee.Action) {
connID := ctx.Value(ConnIDKey).(string)

packetsLock.Lock()
p, ok := packets[connID]
responsePacketsLock.Lock()
p, ok := responsePackets[connID]
if !ok {
p = make(chan []byte)

packets[connID] = p
responsePackets[connID] = p
}
packetsLock.Unlock()
responsePacketsLock.Unlock()

b := make([]byte, incoming.Metadata.ContentLength)
copy(b, incoming.Content.Bytes())
Expand All @@ -135,15 +136,25 @@ func main() {
server.SetOnClosed(func(a *frisbee.Async, err error) {
connID := a.RemoteAddr().String()

packetsLock.Lock()
defer packetsLock.Unlock()
requestPacketsLock.Lock()
defer requestPacketsLock.Unlock()

rqp, ok := requestPackets[connID]
if !ok {
return
}

close(rqp)

responsePacketsLock.Lock()
defer responsePacketsLock.Unlock()

p, ok := packets[connID]
rsp, ok := responsePackets[connID]
if !ok {
return
}

close(p)
close(rsp)
})

server.ConnContext = func(ctx context.Context, a *frisbee.Async) context.Context {
Expand All @@ -154,21 +165,48 @@ func main() {
func(b []byte) error {
pkg := packet.Get()

pkg.Metadata.Operation = DUDIREKTA
pkg.Metadata.Operation = DUDIREKTA_REQUESTS
pkg.Content.Write(b)
pkg.Metadata.ContentLength = uint32(pkg.Content.Len())

return a.WritePacket(pkg)
},
func(b []byte) error {
pkg := packet.Get()

pkg.Metadata.Operation = DUDIREKTA_RESPONSES
pkg.Content.Write(b)
pkg.Metadata.ContentLength = uint32(pkg.Content.Len())

return a.WritePacket(pkg)
},

func() ([]byte, error) {
requestPacketsLock.Lock()
p, ok := requestPackets[connID]
if !ok {
p = make(chan []byte)

requestPackets[connID] = p
}
requestPacketsLock.Unlock()

b, ok := <-p
if !ok {
return []byte{}, net.ErrClosed
}

return b, nil
},
func() ([]byte, error) {
packetsLock.Lock()
p, ok := packets[connID]
responsePacketsLock.Lock()
p, ok := responsePackets[connID]
if !ok {
p = make(chan []byte)

packets[connID] = p
responsePackets[connID] = p
}
packetsLock.Unlock()
responsePacketsLock.Unlock()

b, ok := <-p
if !ok {
Expand Down
Loading

0 comments on commit 53d6158

Please sign in to comment.