Skip to content

Commit

Permalink
hashsync: convert from chunked streams to normal streams
Browse files Browse the repository at this point in the history
  • Loading branch information
ivan4th committed Mar 24, 2024
1 parent b60079a commit f8a021f
Show file tree
Hide file tree
Showing 4 changed files with 151 additions and 296 deletions.
271 changes: 93 additions & 178 deletions hashsync/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"errors"
"fmt"
"io"
"time"

"github.com/spacemeshos/go-spacemesh/codec"
"github.com/spacemeshos/go-spacemesh/common/types"
Expand Down Expand Up @@ -38,6 +37,7 @@ func (m *decodedItemBatchMessage) Keys() []Ordered {
}
return r
}

func (m *decodedItemBatchMessage) Values() []any {
r := make([]any, len(m.ContentValues))
for n, v := range m.ContentValues {

Check failure on line 43 in hashsync/handler.go

View workflow job for this annotation

GitHub Actions / lint

should use copy(to, from) instead of a loop (S1001)
Expand Down Expand Up @@ -80,136 +80,86 @@ func decodeItemBatchMessage(m *ItemBatchMessage, newValue NewValueFunc) (*decode
type conduitState int

Check failure on line 80 in hashsync/handler.go

View workflow job for this annotation

GitHub Actions / lint

type conduitState is unused (U1000)

type wireConduit struct {
i server.Interactor
pendingMsgs []SyncMessage
initReqBuf *bytes.Buffer
newValue NewValueFunc
stream io.ReadWriter
initReqBuf *bytes.Buffer
newValue NewValueFunc
// rmmePrint bool
}

var _ Conduit = &wireConduit{}

func (c *wireConduit) reset() {
c.pendingMsgs = nil
}

// receive receives a single frame from the Interactor and decodes one
// or more SyncMessages from it. The frames contain just one message
// except for the initial frame which may contain multiple messages
// b/c of the way Server handles the initial request
func (c *wireConduit) receive() (msgs []SyncMessage, err error) {
data, err := c.i.Receive()
if err != nil {
return nil, err
}
if len(data) == 0 {
return nil, errors.New("zero length sync message")
// NextMessage implements Conduit.
func (c *wireConduit) NextMessage() (SyncMessage, error) {
var b [1]byte
if _, err := io.ReadFull(c.stream, b[:]); err != nil {
if !errors.Is(err, io.EOF) {
return nil, err

Check warning on line 96 in hashsync/handler.go

View check run for this annotation

Codecov / codecov/patch

hashsync/handler.go#L95-L96

Added lines #L95 - L96 were not covered by tests
}
return nil, nil

Check warning on line 98 in hashsync/handler.go

View check run for this annotation

Codecov / codecov/patch

hashsync/handler.go#L98

Added line #L98 was not covered by tests
}
b := bytes.NewBuffer(data)
for {
code, err := b.ReadByte()
mtype := MessageType(b[0])
// fmt.Fprintf(os.Stderr, "QQQQQ: wireConduit: receive message type %s\n", mtype)
switch mtype {
case MessageTypeDone:
return &DoneMessage{}, nil
case MessageTypeEndRound:
return &EndRoundMessage{}, nil
case MessageTypeItemBatch:
var m ItemBatchMessage
if _, err := codec.DecodeFrom(c.stream, &m); err != nil {
return nil, err

Check warning on line 110 in hashsync/handler.go

View check run for this annotation

Codecov / codecov/patch

hashsync/handler.go#L110

Added line #L110 was not covered by tests
}
dm, err := decodeItemBatchMessage(&m, c.newValue)
if err != nil {
if !errors.Is(err, io.EOF) {
// this shouldn't really happen
return nil, err
}
// fmt.Fprintf(os.Stderr, "QQQQQ: wireConduit: decoded msgs: %#v\n", msgs)
return msgs, nil
return nil, err

Check warning on line 114 in hashsync/handler.go

View check run for this annotation

Codecov / codecov/patch

hashsync/handler.go#L114

Added line #L114 was not covered by tests
}
mtype := MessageType(code)
// fmt.Fprintf(os.Stderr, "QQQQQ: wireConduit: receive message type %s\n", mtype)
switch mtype {
case MessageTypeDone:
msgs = append(msgs, &DoneMessage{})
case MessageTypeEndRound:
msgs = append(msgs, &EndRoundMessage{})
case MessageTypeItemBatch:
var m ItemBatchMessage
if _, err := codec.DecodeFrom(b, &m); err != nil {
return nil, err
}
dm, err := decodeItemBatchMessage(&m, c.newValue)
if err != nil {
return nil, err
}
msgs = append(msgs, dm)
case MessageTypeEmptySet:
msgs = append(msgs, &EmptySetMessage{})
case MessageTypeEmptyRange:
var m EmptyRangeMessage
if _, err := codec.DecodeFrom(b, &m); err != nil {
return nil, err
}
msgs = append(msgs, &m)
case MessageTypeFingerprint:
var m FingerprintMessage
if _, err := codec.DecodeFrom(b, &m); err != nil {
return nil, err
}
msgs = append(msgs, &m)
case MessageTypeRangeContents:
var m RangeContentsMessage
if _, err := codec.DecodeFrom(b, &m); err != nil {
return nil, err
}
msgs = append(msgs, &m)
case MessageTypeQuery:
var m QueryMessage
if _, err := codec.DecodeFrom(b, &m); err != nil {
return nil, err
}
msgs = append(msgs, &m)
default:
return nil, fmt.Errorf("invalid message code %02x", code)
return dm, nil
case MessageTypeEmptySet:
return &EmptySetMessage{}, nil

Check warning on line 118 in hashsync/handler.go

View check run for this annotation

Codecov / codecov/patch

hashsync/handler.go#L117-L118

Added lines #L117 - L118 were not covered by tests
case MessageTypeEmptyRange:
var m EmptyRangeMessage
if _, err := codec.DecodeFrom(c.stream, &m); err != nil {
return nil, err

Check warning on line 122 in hashsync/handler.go

View check run for this annotation

Codecov / codecov/patch

hashsync/handler.go#L122

Added line #L122 was not covered by tests
}
return &m, nil
case MessageTypeFingerprint:
var m FingerprintMessage
if _, err := codec.DecodeFrom(c.stream, &m); err != nil {
return nil, err

Check warning on line 128 in hashsync/handler.go

View check run for this annotation

Codecov / codecov/patch

hashsync/handler.go#L128

Added line #L128 was not covered by tests
}
return &m, nil
case MessageTypeRangeContents:
var m RangeContentsMessage
if _, err := codec.DecodeFrom(c.stream, &m); err != nil {
return nil, err

Check warning on line 134 in hashsync/handler.go

View check run for this annotation

Codecov / codecov/patch

hashsync/handler.go#L134

Added line #L134 was not covered by tests
}
return &m, nil
case MessageTypeQuery:
var m QueryMessage
if _, err := codec.DecodeFrom(c.stream, &m); err != nil {
return nil, err

Check warning on line 140 in hashsync/handler.go

View check run for this annotation

Codecov / codecov/patch

hashsync/handler.go#L140

Added line #L140 was not covered by tests
}
return &m, nil
default:
return nil, fmt.Errorf("invalid message code %02x", b[0])

Check warning on line 144 in hashsync/handler.go

View check run for this annotation

Codecov / codecov/patch

hashsync/handler.go#L143-L144

Added lines #L143 - L144 were not covered by tests
}
}

func (c *wireConduit) send(m sendable) error {
// fmt.Fprintf(os.Stderr, "QQQQQ: wireConduit: sending %s m %#v\n", m.Type(), m)
msg := []byte{byte(m.Type())}
// if c.rmmePrint {
// fmt.Fprintf(os.Stderr, "QQQQQ: send: %s\n", SyncMessageToString(m))
// }
encoded, err := codec.Encode(m)
if err != nil {
return fmt.Errorf("error encoding %T: %w", m, err)
}
msg = append(msg, encoded...)
var stream io.Writer
if c.initReqBuf != nil {
c.initReqBuf.Write(msg)
stream = c.initReqBuf
} else if c.stream == nil {
panic("BUG: wireConduit: no stream")

Check warning on line 153 in hashsync/handler.go

View check run for this annotation

Codecov / codecov/patch

hashsync/handler.go#L153

Added line #L153 was not covered by tests
} else {
if err := c.i.Send(msg); err != nil {
return err
}
}
return nil
}

// NextMessage implements Conduit.
func (c *wireConduit) NextMessage() (SyncMessage, error) {
if len(c.pendingMsgs) != 0 {
m := c.pendingMsgs[0]
c.pendingMsgs = c.pendingMsgs[1:]
// if c.rmmePrint {
// fmt.Fprintf(os.Stderr, "QQQQQ: recv: %s\n", SyncMessageToString(m))
// }
return m, nil
stream = c.stream
}

msgs, err := c.receive()
if err != nil {
return nil, err
}
if len(msgs) == 0 {
return nil, nil
b := []byte{byte(m.Type())}
if _, err := stream.Write(b); err != nil {
return err

Check warning on line 159 in hashsync/handler.go

View check run for this annotation

Codecov / codecov/patch

hashsync/handler.go#L159

Added line #L159 was not covered by tests
}

c.pendingMsgs = msgs[1:]
// if c.rmmePrint {
// fmt.Fprintf(os.Stderr, "QQQQQ: recv: %s\n", SyncMessageToString(msgs[0]))
// }
return msgs[0], nil
_, err := codec.EncodeTo(stream, m)
return err
}

func (c *wireConduit) SendFingerprint(x, y Ordered, fingerprint any, count int) error {
Expand Down Expand Up @@ -289,37 +239,32 @@ func (c *wireConduit) withInitialRequest(toCall func(Conduit) error) ([]byte, er
return c.initReqBuf.Bytes(), nil
}

func makeHandler(rsr *RangeSetReconciler, c *wireConduit, done chan struct{}) server.InteractiveHandler {
return func(ctx context.Context, i server.Interactor) (time.Duration, error) {
defer func() {
if done != nil {
close(done)
}
}()
c.i = i
for {
c.reset()
// Process() will receive all items and messages from the peer
syncDone, err := rsr.Process(c)
if err != nil {
// do not close done if we're returning an
// error, as the channel will be closed in the
// error handler func
done = nil
return 0, err
} else if syncDone {
return 0, nil
}
func (c *wireConduit) handleStream(stream io.ReadWriter, rsr *RangeSetReconciler) error {
c.stream = stream
for {
// Process() will receive all items and messages from the peer
syncDone, err := rsr.Process(c)
if err != nil {
return err

Check warning on line 248 in hashsync/handler.go

View check run for this annotation

Codecov / codecov/patch

hashsync/handler.go#L248

Added line #L248 was not covered by tests
} else if syncDone {
return nil
}
}
}

func MakeServerHandler(is ItemStore, opts ...Option) server.InteractiveHandler {
return func(ctx context.Context, i server.Interactor) (time.Duration, error) {
func MakeServerHandler(is ItemStore, opts ...Option) server.StreamHandler {
return func(ctx context.Context, req []byte, stream io.ReadWriter) error {
c := wireConduit{newValue: is.New}
rsr := NewRangeSetReconciler(is, opts...)
h := makeHandler(rsr, &c, nil)
return h(ctx, i)
s := struct {
io.Reader
io.Writer
}{
// prepend the received request to data being read
Reader: io.MultiReader(bytes.NewBuffer(req), stream),
Writer: stream,
}
return c.handleStream(s, rsr)
}
}

Expand Down Expand Up @@ -349,22 +294,9 @@ func syncStore(ctx context.Context, r requester, peer p2p.Peer, is ItemStore, x,
if err != nil {
return err

Check warning on line 295 in hashsync/handler.go

View check run for this annotation

Codecov / codecov/patch

hashsync/handler.go#L295

Added line #L295 was not covered by tests
}
done := make(chan struct{}, 1)
h := makeHandler(rsr, &c, done)
var reqErr error
if err = r.InteractiveRequest(ctx, peer, initReq, h, func(err error) {
reqErr = err
close(done)
}); err != nil {
return err
}
select {
case <-ctx.Done():
<-done
return ctx.Err()
case <-done:
return reqErr
}
return r.StreamRequest(ctx, peer, initReq, func(ctx context.Context, stream io.ReadWriter) error {
return c.handleStream(stream, rsr)
})
}

func Probe(ctx context.Context, r requester, peer p2p.Peer, opts ...Option) (fp any, count int, err error) {
Expand Down Expand Up @@ -394,33 +326,16 @@ func boundedProbe(ctx context.Context, r requester, peer p2p.Peer, x, y *types.H
if err != nil {
return nil, 0, err

Check warning on line 327 in hashsync/handler.go

View check run for this annotation

Codecov / codecov/patch

hashsync/handler.go#L327

Added line #L327 was not covered by tests
}
done := make(chan struct{}, 2)
h := func(ctx context.Context, i server.Interactor) (time.Duration, error) {
defer func() {
done <- struct{}{}
}()
c.i = i
err = r.StreamRequest(ctx, peer, initReq, func(ctx context.Context, stream io.ReadWriter) error {
c.stream = stream
var err error
fp, count, err = rsr.HandleProbeResponse(&c)
return 0, err
}
var reqErr error
if err = r.InteractiveRequest(ctx, peer, initReq, h, func(err error) {
reqErr = err
done <- struct{}{}
}); err != nil {
return err
})
if err != nil {
return nil, 0, err

Check warning on line 336 in hashsync/handler.go

View check run for this annotation

Codecov / codecov/patch

hashsync/handler.go#L336

Added line #L336 was not covered by tests
}
select {
case <-ctx.Done():
<-done
return nil, 0, ctx.Err()
case <-done:
if reqErr != nil {
return nil, 0, reqErr
}
return fp, count, nil
}
return fp, count, nil
}

// TODO: request duration
Expand Down

0 comments on commit f8a021f

Please sign in to comment.