Skip to content

Commit

Permalink
Syncer Fixes (#67)
Browse files Browse the repository at this point in the history
* feat: Add ability to query n random elements from sorted array of IDs

* chore: Add documentation

* fix: update Syncer to send sync requests in parallel

* fix: add wiggle timeout to allow redundant pending sync messages

After the recption of a successful sync message, the pendingContent
struct is deleted but a timeout is used before denying the content so
that any additional sync requests that are still pending can be fullfilled
respectfully. This allows honest nodes to still fulfill sync requests
without their connection being dropped

* fix: add wiggle timeout and following call to deny in goroutine so that call to Sync remains non-blocking

* test: Add test for parallel syncing from multiple peers
  • Loading branch information
Rahul Ghangas committed Feb 18, 2021
1 parent 61a5848 commit 9345629
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 40 deletions.
16 changes: 8 additions & 8 deletions peer/opt.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ import (
)

type SyncerOptions struct {
Logger *zap.Logger
Alpha int
Timeout time.Duration
Logger *zap.Logger
Alpha int
WiggleTimeout time.Duration
}

func DefaultSyncerOptions() SyncerOptions {
Expand All @@ -19,9 +19,9 @@ func DefaultSyncerOptions() SyncerOptions {
panic(err)
}
return SyncerOptions{
Logger: logger,
Alpha: DefaultAlpha,
Timeout: DefaultTimeout,
Logger: logger,
Alpha: DefaultAlpha,
WiggleTimeout: DefaultTimeout,
}
}

Expand All @@ -35,8 +35,8 @@ func (opts SyncerOptions) WithAlpha(alpha int) SyncerOptions {
return opts
}

func (opts SyncerOptions) WithTimeout(timeout time.Duration) SyncerOptions {
opts.Timeout = timeout
func (opts SyncerOptions) WithWiggleTimeout(timeout time.Duration) SyncerOptions {
opts.WiggleTimeout = timeout
return opts
}

Expand Down
55 changes: 24 additions & 31 deletions peer/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ package peer

import (
"context"
"encoding/base64"
"fmt"
"sync"
"time"

"github.com/renproject/aw/channel"
"github.com/renproject/aw/transport"
Expand Down Expand Up @@ -82,20 +82,25 @@ func (syncer *Syncer) Sync(ctx context.Context, contentID []byte, hint *id.Signa
}
syncer.pendingMu.Unlock()

// Allow synchronisation messages for the content ID. This is required in
// order for channel to not filter inbound content (of unknown size). At the
// end of the method, we Deny the content ID again, un-doing the Allow and
// blocking content again.
syncer.filter.Allow(contentID)
defer func() {
go func() {
time.Sleep(syncer.opts.WiggleTimeout)
syncer.filter.Deny(contentID)
}()
}()

// Ensure that pending content is removed.
defer func() {
syncer.pendingMu.Lock()
delete(syncer.pending, string(contentID))
syncer.pendingMu.Unlock()
}()

// Allow synchronisation messages for the content ID. This is required in
// order for channel to not filter inbound content (of unknown size). At the
// end of the method, we Deny the content ID again, un-doing the Allow and
// blocking content again.
syncer.filter.Allow(contentID)
defer syncer.filter.Deny(contentID)

if ok {
select {
case <-ctx.Done():
Expand All @@ -108,47 +113,35 @@ func (syncer *Syncer) Sync(ctx context.Context, contentID []byte, hint *id.Signa
// Get addresses close to our address. We will iterate over these addresses
// in order and attempt to synchronise content by sending them pull
// messages.
peers := syncer.transport.Table().Peers(syncer.opts.Alpha)
peers := syncer.transport.Table().RandomPeers(syncer.opts.Alpha)
if hint != nil {
peers = append([]id.Signatory{*hint}, peers...)
}

for _, peer := range peers {

for i := range peers {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}

content, err := func() ([]byte, error) {
innerCtx, innerCancel := context.WithTimeout(ctx, syncer.opts.Timeout)
defer innerCancel()

err := syncer.transport.Send(innerCtx, peer, wire.Msg{
p := peers[i]
go func() {
err := syncer.transport.Send(ctx, p, wire.Msg{
Version: wire.MsgVersion1,
Type: wire.MsgTypePull,
Data: contentID,
})
if err != nil {
return nil, fmt.Errorf("pulling: %v", err)
}

select {
case <-innerCtx.Done():
return nil, innerCtx.Err()
case content := <-pending.wait():
return content, nil
syncer.opts.Logger.Debug("sync", zap.String("peer", p.String()), zap.Error(fmt.Errorf("pulling: %v", err)))
}
}()
if err != nil {
syncer.opts.Logger.Debug("sync", zap.String("peer", peer.String()), zap.Error(err))
continue
}
}
select {
case <-ctx.Done():
return nil, ctx.Err()
case content := <-pending.wait():
return content, nil
}

return nil, fmt.Errorf("content not found: %v", base64.RawURLEncoding.EncodeToString(contentID))
}

func (syncer *Syncer) DidReceiveMessage(from id.Signatory, msg wire.Msg) error {
Expand Down
60 changes: 59 additions & 1 deletion peer/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ var _ = Describe("Peer", func() {
opts, peers, tables, contentResolvers, _, transports := setup(n)

for i := range opts {
opts[i].SyncerOptions = opts[i].SyncerOptions.WithTimeout(2 * time.Second)
opts[i].SyncerOptions = opts[i].SyncerOptions.WithWiggleTimeout(2 * time.Second)
peers[i] = peer.New(
opts[i],
transports[i])
Expand Down Expand Up @@ -55,4 +55,62 @@ var _ = Describe("Peer", func() {
Ω(msg).To(Equal([]byte(helloMsg)))
})
})

Context("when getting a successful sync response on sending multiple parallel sync requests", func() {
It("should not drop connections for additional sync responses", func() {

n := 5
opts, peers, tables, contentResolvers, _, transports := setup(n)

for i := range opts {
opts[i].SyncerOptions = opts[i].SyncerOptions.WithWiggleTimeout(2 * time.Second)
peers[i] = peer.New(
opts[i],
transports[i])
peers[i].Resolve(context.Background(), contentResolvers[i])
}

for i := range peers {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
go peers[i].Run(ctx)

for j := range peers {
if i != j {
tables[i].AddPeer(opts[j].PrivKey.Signatory(),
wire.NewUnsignedAddress(wire.TCP,
fmt.Sprintf("%v:%v", "localhost", uint16(3333+j)), uint64(time.Now().UnixNano())))
}
helloMsg := fmt.Sprintf("Hello from peer %d", j)
contentID := id.NewHash([]byte(helloMsg))
contentResolvers[i].InsertContent(contentID[:], []byte(helloMsg))
}
}

for i := range peers {
for j := range peers {
helloMsg := fmt.Sprintf("Hello from peer %d", j)
contentID := id.NewHash([]byte(helloMsg))

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
msg, err := peers[i].Sync(ctx, contentID[:], nil)

for {
if err == nil {
break
}
select {
case <-ctx.Done():
break
default:
msg, err = peers[i].Sync(ctx, contentID[:], nil)
}
}

Ω(msg).To(Equal([]byte(helloMsg)))
}
}
})
})
})

0 comments on commit 9345629

Please sign in to comment.