Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - collect latency and failure from every request and adjust latency based on response size #5601

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ configuration is as follows:
If you are not using a remote post service you do not need to adjust anything. If you are using a remote setup
make sure your post service now connects to `grpc-post-listener` instead of `grpc-private-listener`. If you are
connecting to a remote post service over the internet we strongly recommend using mTLS via `grpc-tls-listener`.
* [#5601](https://github.com/spacemeshos/go-spacemesh/pull/5601) measure latency from all requests in sync
This improves the peer selection logic, mainly to prevent asking slow peers for collections of ATXs, which often blocks sync.

## Release v1.3.8

Expand Down
25 changes: 17 additions & 8 deletions fetch/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,22 @@ func (f *Fetch) loop() {
}
}

// meteredRequest issues a request to peer on the given protocol server and
// feeds the outcome into the peer tracker: a failure mark when the request
// errors, otherwise the observed round-trip latency together with the
// response size (used to normalize latency per transferred byte).
func (f *Fetch) meteredRequest(
	ctx context.Context,
	protocol string,
	peer p2p.Peer,
	req []byte,
) ([]byte, error) {
	began := time.Now()
	resp, err := f.servers[protocol].Request(ctx, peer, req)
	if err != nil {
		f.peers.OnFailure(peer)
		return resp, err
	}
	f.peers.OnLatency(peer, len(resp), time.Since(began))
	return resp, nil
}

// receive Data from message server and call response handlers accordingly.
func (f *Fetch) receiveResponse(data []byte, batch *batchInfo) {
if f.stopped() {
Expand Down Expand Up @@ -651,15 +667,8 @@ func (f *Fetch) sendBatch(peer p2p.Peer, batch *batchInfo) ([]byte, error) {
// Request is synchronous,
// it will return errors only if size of the bytes buffer is large
// or target peer is not connected
start := time.Now()
req := codec.MustEncode(&batch.RequestBatch)
data, err := f.servers[hashProtocol].Request(f.shutdownCtx, peer, req)
if err != nil {
f.peers.OnFailure(peer)
return nil, err
}
f.peers.OnLatency(peer, time.Since(start))
return data, nil
return f.meteredRequest(f.shutdownCtx, hashProtocol, peer, req)
}

// handleHashError is called when an error occurred processing batches of the following hashes.
Expand Down
38 changes: 11 additions & 27 deletions fetch/mesh_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,44 +217,32 @@ func (f *Fetch) GetPoetProof(ctx context.Context, id types.Hash32) error {
}

// GetMaliciousIDs requests the list of known-malicious identities from the
// given peer, sending an empty request body.
// NOTE(review): this span is a rendered diff hunk — the first return line is
// the removed pre-change call, the second is its replacement that routes the
// request through meteredRequest for latency/failure accounting.
func (f *Fetch) GetMaliciousIDs(ctx context.Context, peer p2p.Peer) ([]byte, error) {
return f.servers[malProtocol].Request(ctx, peer, []byte{})
return f.meteredRequest(ctx, malProtocol, peer, []byte{})
}

// GetLayerData get layer data from peers.
// GetLayerData get layer data from peers.
// NOTE(review): this span is a rendered diff hunk — the codec.Encode branch
// with explicit error handling is the removed code; the MustEncode +
// meteredRequest pair is its replacement, which also records latency and
// failures for the peer.
func (f *Fetch) GetLayerData(ctx context.Context, peer p2p.Peer, lid types.LayerID) ([]byte, error) {
lidBytes, err := codec.Encode(&lid)
if err != nil {
return nil, err
}
return f.servers[lyrDataProtocol].Request(ctx, peer, lidBytes)
lidBytes := codec.MustEncode(&lid)
return f.meteredRequest(ctx, lyrDataProtocol, peer, lidBytes)
}

// GetLayerOpinions requests the certified-opinion data for layer lid from the
// given peer.
// NOTE(review): this span is a rendered diff hunk — the removed version built
// an OpinionRequest and encoded it with codec.Encode (propagating the error);
// the replacement uses codec.MustEncode and sends via meteredRequest so the
// peer's latency/failure statistics are updated.
func (f *Fetch) GetLayerOpinions(ctx context.Context, peer p2p.Peer, lid types.LayerID) ([]byte, error) {
req := OpinionRequest{
reqData := codec.MustEncode(&OpinionRequest{
Layer: lid,
}
reqData, err := codec.Encode(&req)
if err != nil {
return nil, err
}
return f.servers[OpnProtocol].Request(ctx, peer, reqData)
})
return f.meteredRequest(ctx, OpnProtocol, peer, reqData)
}

// PeerEpochInfo get the epoch info published in the given epoch from the specified peer.
func (f *Fetch) PeerEpochInfo(ctx context.Context, peer p2p.Peer, epoch types.EpochID) (*EpochData, error) {
f.logger.WithContext(ctx).With().Debug("requesting epoch info from peer",
log.Stringer("peer", peer),
log.Stringer("epoch", epoch))

epochBytes, err := codec.Encode(epoch)
epochBytes := codec.MustEncode(epoch)
data, err := f.meteredRequest(ctx, atxProtocol, peer, epochBytes)
if err != nil {
return nil, err
}
data, err := f.servers[atxProtocol].Request(ctx, peer, epochBytes)
if err != nil {
return nil, err
}

var ed EpochData
if err := codec.Decode(data, &ed); err != nil {
return nil, fmt.Errorf("decoding epoch data: %w", err)
Expand All @@ -269,12 +257,8 @@ func (f *Fetch) PeerMeshHashes(ctx context.Context, peer p2p.Peer, req *MeshHash
log.Object("req", req),
)

reqData, err := codec.Encode(req)
if err != nil {
f.logger.With().Fatal("failed to encode mesh hash request", log.Err(err))
}

data, err := f.servers[meshHashProtocol].Request(ctx, peer, reqData)
reqData := codec.MustEncode(req)
data, err := f.meteredRequest(ctx, meshHashProtocol, peer, reqData)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -302,7 +286,7 @@ func (f *Fetch) GetCert(
reqData := codec.MustEncode(req)

for _, peer := range peers {
data, err := f.servers[OpnProtocol].Request(ctx, peer, reqData)
data, err := f.meteredRequest(ctx, OpnProtocol, peer, reqData)
if err != nil {
f.logger.With().Debug("failed to get cert", log.Stringer("peer", peer), log.Err(err))
continue
Expand Down
10 changes: 9 additions & 1 deletion fetch/peers/peers.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,15 @@
}

// OnLatency updates average peer and global latency.
func (p *Peers) OnLatency(id peer.ID, latency time.Duration) {
func (p *Peers) OnLatency(id peer.ID, size int, latency time.Duration) {
if size == 0 {
return
}

Check warning on line 88 in fetch/peers/peers.go

View check run for this annotation

Codecov / codecov/patch

fetch/peers/peers.go#L88

Added line #L88 was not covered by tests
// We assume that latency is proportional to the size of the message
// and define it as a duration to transmit 1kiB.
// To account for the additional overhead of transmitting small messages,
// we treat them as if they were 1kiB.
latency = latency / time.Duration(max(size/1024, 1))
p.mu.Lock()
defer p.mu.Unlock()
peer, exist := p.peers[id]
Expand Down
18 changes: 17 additions & 1 deletion fetch/peers/peers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,14 @@ import (
"github.com/stretchr/testify/require"
)

// testSize is an arbitrary non-zero response size used when a test case does not specify one.
// It is intentionally different from the minimal (1 kiB) size assumed by the latency adjustment.
const testSize = 100

type event struct {
id peer.ID
add, delete bool
size int
success int
failure int
latency time.Duration
Expand All @@ -30,7 +35,7 @@ func withEvents(events []event) *Peers {
tracker.OnFailure(ev.id)
}
for i := 0; i < ev.success; i++ {
tracker.OnLatency(ev.id, ev.latency)
tracker.OnLatency(ev.id, max(ev.size, testSize), ev.latency)
}
}
return tracker
Expand Down Expand Up @@ -70,6 +75,17 @@ func TestSelect(t *testing.T) {
selectFrom: []peer.ID{"b", "a"},
best: peer.ID("a"),
},
{
desc: "latency adjusted based on size",
events: []event{
{id: "a", success: 2, latency: 10, size: 1_000, add: true},
{id: "b", success: 2, latency: 20, size: 4_000, add: true},
},
n: 5,
expect: []peer.ID{"b", "a"},
selectFrom: []peer.ID{"a", "b"},
best: peer.ID("b"),
},
{
desc: "total number is larger then capacity",
events: []event{
Expand Down
Loading