collect latency and failure from every request and adjust latency based on response size (#5601)

closes: #5265

The previous implementation was incomplete and had two flaws. It did not account for differences in response size: larger payloads naturally take more time to complete, so raw latency measurements skewed the peer selection logic towards peers that are in fact slower to respond.

The other flaw was measuring latency only on the hs/1 protocol. Peers are often blocked on ax/1 (where we fetch a collection of IDs, which is large and slow), and the lack of prioritization there may cause inefficient retries.
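A minimal, self-contained sketch of the size adjustment this commit introduces in Peers.OnLatency (the formula is taken from the diff below; the peers and numbers are illustrative only):

package main

import (
    "fmt"
    "time"
)

// normalize rescales a raw request latency to a per-1kiB duration,
// mirroring the adjustment in Peers.OnLatency: responses smaller than
// 1kiB are treated as 1kiB so that tiny messages do not report
// implausibly good per-byte latency. Uses the max builtin (Go 1.21+).
func normalize(latency time.Duration, size int) time.Duration {
    return latency / time.Duration(max(size/1024, 1))
}

func main() {
    // Peer A answers a 1kiB request in 10ms; peer B answers a 100kiB
    // request in 200ms. By raw latency A looks 20x faster, but per kiB
    // transmitted B is actually the better peer.
    fmt.Println(normalize(10*time.Millisecond, 1*1024))    // 10ms per kiB
    fmt.Println(normalize(200*time.Millisecond, 100*1024)) // 2ms per kiB
}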
dshulyak committed Feb 27, 2024
1 parent 75f021e commit 1d536c8
Showing 5 changed files with 56 additions and 37 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -110,6 +110,8 @@ configuration is as follows:
 If you are not using a remote post service you do not need to adjust anything. If you are using a remote setup
 make sure your post service now connects to `grpc-post-listener` instead of `grpc-private-listener`. If you are
 connecting to a remote post service over the internet we strongly recommend using mTLS via `grpc-tls-listener`.
+* [#5601](https://github.com/spacemeshos/go-spacemesh/pull/5601) measure latency from all requests in sync
+  This improves peer selection logic, mainly to prevent asking slow peers for the collection of ATXs, which often blocks sync.
 
 * [5602](https://github.com/spacemeshos/go-spacemesh/pull/5602) Optimize client side of fetcher to avoid encoding when
   not needed.
25 changes: 17 additions & 8 deletions fetch/fetch.go
@@ -413,6 +413,22 @@ func (f *Fetch) loop() {
     }
 }
 
+func (f *Fetch) meteredRequest(
+    ctx context.Context,
+    protocol string,
+    peer p2p.Peer,
+    req []byte,
+) ([]byte, error) {
+    start := time.Now()
+    resp, err := f.servers[protocol].Request(ctx, peer, req)
+    if err != nil {
+        f.peers.OnFailure(peer)
+    } else {
+        f.peers.OnLatency(peer, len(resp), time.Since(start))
+    }
+    return resp, err
+}
+
 // receive Data from message server and call response handlers accordingly.
 func (f *Fetch) receiveResponse(data []byte, batch *batchInfo) {
     if f.stopped() {
@@ -656,15 +672,8 @@ func (f *Fetch) sendBatch(peer p2p.Peer, batch *batchInfo) ([]byte, error) {
     // Request is synchronous,
     // it will return errors only if size of the bytes buffer is large
     // or target peer is not connected
-    start := time.Now()
     req := codec.MustEncode(&batch.RequestBatch)
-    data, err := f.servers[hashProtocol].Request(f.shutdownCtx, peer, req)
-    if err != nil {
-        f.peers.OnFailure(peer)
-        return nil, err
-    }
-    f.peers.OnLatency(peer, time.Since(start))
-    return data, nil
+    return f.meteredRequest(f.shutdownCtx, hashProtocol, peer, req)
 }
 
 // handleHashError is called when an error occurred processing batches of the following hashes.
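Centralizing the measurement in meteredRequest means every outgoing request, on every protocol, now reports either an OnFailure or a size-aware OnLatency sample to the peer tracker; the per-protocol helpers in mesh_data.go below accordingly collapse to single meteredRequest calls.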
38 changes: 11 additions & 27 deletions fetch/mesh_data.go
@@ -217,44 +217,32 @@ func (f *Fetch) GetPoetProof(ctx context.Context, id types.Hash32) error {
 }
 
 func (f *Fetch) GetMaliciousIDs(ctx context.Context, peer p2p.Peer) ([]byte, error) {
-    return f.servers[malProtocol].Request(ctx, peer, []byte{})
+    return f.meteredRequest(ctx, malProtocol, peer, []byte{})
 }
 
 // GetLayerData get layer data from peers.
 func (f *Fetch) GetLayerData(ctx context.Context, peer p2p.Peer, lid types.LayerID) ([]byte, error) {
-    lidBytes, err := codec.Encode(&lid)
-    if err != nil {
-        return nil, err
-    }
-    return f.servers[lyrDataProtocol].Request(ctx, peer, lidBytes)
+    lidBytes := codec.MustEncode(&lid)
+    return f.meteredRequest(ctx, lyrDataProtocol, peer, lidBytes)
 }
 
 func (f *Fetch) GetLayerOpinions(ctx context.Context, peer p2p.Peer, lid types.LayerID) ([]byte, error) {
-    req := OpinionRequest{
+    reqData := codec.MustEncode(&OpinionRequest{
         Layer: lid,
-    }
-    reqData, err := codec.Encode(&req)
-    if err != nil {
-        return nil, err
-    }
-    return f.servers[OpnProtocol].Request(ctx, peer, reqData)
+    })
+    return f.meteredRequest(ctx, OpnProtocol, peer, reqData)
 }
 
 // PeerEpochInfo get the epoch info published in the given epoch from the specified peer.
 func (f *Fetch) PeerEpochInfo(ctx context.Context, peer p2p.Peer, epoch types.EpochID) (*EpochData, error) {
     f.logger.WithContext(ctx).With().Debug("requesting epoch info from peer",
         log.Stringer("peer", peer),
         log.Stringer("epoch", epoch))
 
-    epochBytes, err := codec.Encode(epoch)
+    epochBytes := codec.MustEncode(epoch)
+    data, err := f.meteredRequest(ctx, atxProtocol, peer, epochBytes)
     if err != nil {
         return nil, err
     }
-    data, err := f.servers[atxProtocol].Request(ctx, peer, epochBytes)
-    if err != nil {
-        return nil, err
-    }
 
     var ed EpochData
     if err := codec.Decode(data, &ed); err != nil {
         return nil, fmt.Errorf("decoding epoch data: %w", err)
@@ -269,12 +257,8 @@ func (f *Fetch) PeerMeshHashes(ctx context.Context, peer p2p.Peer, req *MeshHash
         log.Object("req", req),
     )
 
-    reqData, err := codec.Encode(req)
-    if err != nil {
-        f.logger.With().Fatal("failed to encode mesh hash request", log.Err(err))
-    }
-
-    data, err := f.servers[meshHashProtocol].Request(ctx, peer, reqData)
+    reqData := codec.MustEncode(req)
+    data, err := f.meteredRequest(ctx, meshHashProtocol, peer, reqData)
     if err != nil {
         return nil, err
     }
@@ -302,7 +286,7 @@ func (f *Fetch) GetCert(
     reqData := codec.MustEncode(req)
 
     for _, peer := range peers {
-        data, err := f.servers[OpnProtocol].Request(ctx, peer, reqData)
+        data, err := f.meteredRequest(ctx, OpnProtocol, peer, reqData)
         if err != nil {
             f.logger.With().Debug("failed to get cert", log.Stringer("peer", peer), log.Err(err))
             continue
10 changes: 9 additions & 1 deletion fetch/peers/peers.go
@@ -83,7 +83,15 @@ func (p *Peers) OnFailure(id peer.ID) {
 }
 
 // OnLatency updates average peer and global latency.
-func (p *Peers) OnLatency(id peer.ID, latency time.Duration) {
+func (p *Peers) OnLatency(id peer.ID, size int, latency time.Duration) {
+    if size == 0 {
+        return
+    }
+    // We assume that latency is proportional to the size of the message
+    // and define it as a duration to transmit 1kiB.
+    // To account for the additional overhead of transmitting small messages,
+    // we treat them as if they were 1kiB.
+    latency = latency / time.Duration(max(size/1024, 1))
     p.mu.Lock()
     defer p.mu.Unlock()
     peer, exist := p.peers[id]
18 changes: 17 additions & 1 deletion fetch/peers/peers_test.go
@@ -10,9 +10,14 @@ import (
     "github.com/stretchr/testify/require"
 )
 
+// Any random non-zero number that will be used if size is not specified in the test case.
+// It is intentionally different from the assumed minimal size in the latency function.
+const testSize = 100
+
 type event struct {
     id          peer.ID
     add, delete bool
+    size        int
     success     int
     failure     int
     latency     time.Duration
@@ -30,7 +35,7 @@ func withEvents(events []event) *Peers {
             tracker.OnFailure(ev.id)
         }
         for i := 0; i < ev.success; i++ {
-            tracker.OnLatency(ev.id, ev.latency)
+            tracker.OnLatency(ev.id, max(ev.size, testSize), ev.latency)
         }
     }
     return tracker
@@ -70,6 +75,17 @@ func TestSelect(t *testing.T) {
             selectFrom: []peer.ID{"b", "a"},
             best:       peer.ID("a"),
         },
+        {
+            desc: "latency adjusted based on size",
+            events: []event{
+                {id: "a", success: 2, latency: 10, size: 1_000, add: true},
+                {id: "b", success: 2, latency: 20, size: 4_000, add: true},
+            },
+            n:          5,
+            expect:     []peer.ID{"b", "a"},
+            selectFrom: []peer.ID{"a", "b"},
+            best:       peer.ID("b"),
+        },
         {
             desc: "total number is larger than capacity",
             events: []event{
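The added test case pins down the integer arithmetic of the adjustment: peer "b" reports latency 20 for a 4,000-byte response, which normalizes to 20 / max(4000/1024, 1) = 20/3 ≈ 6 per kiB, while peer "a" reports 10 for 1,000 bytes, which floors to the 1kiB minimum and stays at 10. Despite its higher raw latency, "b" therefore ranks best.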
