Skip to content

Commit

Permalink
refactor: Adding logging
Browse files Browse the repository at this point in the history
  • Loading branch information
pojntfx committed Oct 13, 2023
1 parent ad05b22 commit e3fa42c
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 57 deletions.
40 changes: 5 additions & 35 deletions cmd/dudirekta-example-tcp-client/main.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
package main

import (
"bufio"
"context"
"flag"
"fmt"
"log"
"net"
"os"
"time"

"github.com/pojntfx/dudirekta/pkg/rpc"
Expand Down Expand Up @@ -60,46 +58,18 @@ func main() {
)

go func() {
log.Println(`Enter one of the following letters followed by <ENTER> to run a function on the remote(s):
- a: Increment remote counter by one
- b: Decrement remote counter by one`)

stdin := bufio.NewReader(os.Stdin)

for {
line, err := stdin.ReadString('\n')
if err != nil {
panic(err)
}

for peerID, peer := range registry.Peers() {
log.Println("Calling functions for peer with ID", peerID)

switch line {
case "a\n":
new, err := peer.Increment(ctx, 1)
if err != nil {
log.Println("Got error for Increment func:", err)

continue
}

log.Println(new)
case "b\n":
new, err := peer.Increment(ctx, -1)
if err != nil {
log.Println("Got error for Increment func:", err)

continue
}

log.Println(new)
default:
log.Printf("Unknown letter %v, ignoring input", line)
_, err := peer.Increment(ctx, 1)
if err != nil {
log.Println("Got error for Increment func:", err)

continue
}

time.Sleep(time.Millisecond * 10)
}
}
}()
Expand Down
26 changes: 4 additions & 22 deletions cmd/dudirekta-example-tcp-server/main.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
package main

import (
"bufio"
"context"
"flag"
"log"
"net"
"os"
"sync/atomic"
"time"

Expand Down Expand Up @@ -60,33 +58,17 @@ func main() {
)

go func() {
log.Println(`Enter one of the following letters followed by <ENTER> to run a function on the remote(s):
- a: Print "Hello, world!"`)

stdin := bufio.NewReader(os.Stdin)

for {
line, err := stdin.ReadString('\n')
if err != nil {
panic(err)
}

for peerID, peer := range registry.Peers() {
log.Println("Calling functions for peer with ID", peerID)

switch line {
case "a\n":
if err := peer.Println(ctx, "Hello, world!"); err != nil {
log.Println("Got error for Println func:", err)

continue
}
default:
log.Printf("Unknown letter %v, ignoring input", line)
if err := peer.Println(ctx, "Hello, world!"); err != nil {
log.Println("Got error for Println func:", err)

continue
}

time.Sleep(time.Millisecond * 10)
}
}
}()
Expand Down
33 changes: 33 additions & 0 deletions pkg/rpc/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"io"
"log"
"reflect"
"strings"
"sync"
Expand Down Expand Up @@ -149,6 +150,8 @@ func (r Registry[R]) makeRPC(
case <-t.C:
t.Stop()

log.Println("Timed out RPC", name, callID)

res <- response{"", nil, ErrCallTimedOut, true}

return
Expand All @@ -157,6 +160,8 @@ func (r Registry[R]) makeRPC(
return
}

log.Println("Received response", name, callID)

if msg.id == callID {
res <- msg

Expand All @@ -166,12 +171,16 @@ func (r Registry[R]) makeRPC(
}
}()

log.Println("Sending request", name, callID)

if _, err := conn.Write(b); err != nil {
errs <- err

return
}

log.Println("Sent request", name, callID)

returnValues := []reflect.Value{}
select {
case rawReturnValue := <-res:
Expand Down Expand Up @@ -291,6 +300,8 @@ func (r Registry[R]) Link(conn io.ReadWriteCloser) error {
d := json.NewDecoder(conn)

for {
log.Println("Receiving requests")

var res []json.RawMessage
if err := d.Decode(&res); err != nil {
errs <- err
Expand Down Expand Up @@ -327,6 +338,8 @@ func (r Registry[R]) Link(conn io.ReadWriteCloser) error {
return
}

log.Println("Received request", functionName, callID)

var functionArgs []json.RawMessage
if err := json.Unmarshal(res[3], &functionArgs); err != nil {
errs <- err
Expand Down Expand Up @@ -438,11 +451,15 @@ func (r Registry[R]) Link(conn io.ReadWriteCloser) error {
return
}

log.Println("Sending response", functionName, callID)

if _, err := conn.Write(b); err != nil {
errs <- err

return
}

log.Println("Sent response", functionName, callID)
case 1:
if res[0].Type().Implements(errorType) && !res[0].IsNil() {
b, err := json.Marshal([]interface{}{false, callID, nil, res[0].Interface().(error).Error()})
Expand All @@ -452,11 +469,15 @@ func (r Registry[R]) Link(conn io.ReadWriteCloser) error {
return
}

log.Println("Sending response", functionName, callID)

if _, err := conn.Write(b); err != nil {
errs <- err

return
}

log.Println("Sent response", functionName, callID)
} else {
v, err := json.Marshal(res[0].Interface())
if err != nil {
Expand All @@ -472,11 +493,15 @@ func (r Registry[R]) Link(conn io.ReadWriteCloser) error {
return
}

log.Println("Sending response", functionName, callID)

if _, err := conn.Write(b); err != nil {
errs <- err

return
}

log.Println("Sent response", functionName, callID)
}
case 2:
v, err := json.Marshal(res[0].Interface())
Expand All @@ -494,11 +519,15 @@ func (r Registry[R]) Link(conn io.ReadWriteCloser) error {
return
}

log.Println("Sending response", functionName, callID)

if _, err := conn.Write(b); err != nil {
errs <- err

return
}

log.Println("Sent response", functionName, callID)
} else {
b, err := json.Marshal([]interface{}{false, callID, json.RawMessage(string(v)), res[1].Interface().(error).Error()})
if err != nil {
Expand All @@ -507,11 +536,15 @@ func (r Registry[R]) Link(conn io.ReadWriteCloser) error {
return
}

log.Println("Sending response", functionName, callID)

if _, err := conn.Write(b); err != nil {
errs <- err

return
}

log.Println("Sent response", functionName, callID)
}
}
}()
Expand Down

0 comments on commit e3fa42c

Please sign in to comment.