Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Increase announcer's receiver buffer to 512 #3423

Merged
merged 2 commits into from
Nov 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 13 additions & 8 deletions pkg/protocol/announcer/announcer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,14 @@ import (
"google.golang.org/protobuf/proto"
)

// announceReceiveBuffer is a buffer for messages received from the broadcast
// channel needed when the announcer's consumer is temporarily too slow to
// handle them. Keep in mind that although we expect only 51 announce messages,
// it may happen that the announcer receives retransmissions of messages from
// the previous signing protocol and before they are filtered out as not
// interesting for the announcer, they are buffered in the channel.
const announceReceiveBuffer = 512

// announcementMessage represents a message that is used to announce
// member's participation in the given session of the protocol.
type announcementMessage struct {
Expand Down Expand Up @@ -59,19 +67,16 @@ func (am *announcementMessage) Type() string {
// readiness announcement over the provided broadcast channel.
type Announcer struct {
protocolID string
groupSize int
broadcastChannel net.BroadcastChannel
membershipValidator *group.MembershipValidator
}

// New creates a new instance of the Announcer. It expects a unique protocol
// identifier, the size of the group performing the protocol, a broadcast
// channel configured to mediate between group members, and a membership
// validator configured to validate the group membership of announcements
// senders.
// identifier, a broadcast channel configured to mediate between group members,
// and a membership validator configured to validate the group membership of
// announcements senders.
func New(
protocolID string,
groupSize int,
broadcastChannel net.BroadcastChannel,
membershipValidator *group.MembershipValidator,
) *Announcer {
Expand All @@ -81,7 +86,6 @@ func New(

return &Announcer{
protocolID: protocolID,
groupSize: groupSize,
broadcastChannel: broadcastChannel,
membershipValidator: membershipValidator,
}
Expand All @@ -100,7 +104,8 @@ func (a *Announcer) Announce(
[]group.MemberIndex,
error,
) {
messagesChan := make(chan net.Message, a.groupSize)
messagesChan := make(chan net.Message, announceReceiveBuffer)
lukasz-zimnoch marked this conversation as resolved.
Show resolved Hide resolved

a.broadcastChannel.Recv(ctx, func(message net.Message) {
messagesChan <- message
})
Expand Down
10 changes: 5 additions & 5 deletions pkg/protocol/announcer/announcer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@ package announcer

import (
"context"
"math/big"
"reflect"
"sync"
"testing"

fuzz "github.com/google/gofuzz"
"github.com/keep-network/keep-core/pkg/chain"
"github.com/keep-network/keep-core/pkg/chain/local_v1"
Expand All @@ -10,10 +15,6 @@ import (
"github.com/keep-network/keep-core/pkg/net/local"
"github.com/keep-network/keep-core/pkg/operator"
"github.com/keep-network/keep-core/pkg/protocol/group"
"math/big"
"reflect"
"sync"
"testing"
)

func TestAnnouncementMessage_MarshalingRoundtrip(t *testing.T) {
Expand Down Expand Up @@ -150,7 +151,6 @@ func TestAnnouncer(t *testing.T) {

announcer := New(
protocolID,
groupSize,
broadcastChannel,
membershipValidator,
)
Expand Down
1 change: 0 additions & 1 deletion pkg/tbtc/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,6 @@ func (n *node) joinDKGIfEligible(seed *big.Int, startBlockNumber uint64) {

announcer := announcer.New(
fmt.Sprintf("%v-%v", ProtocolName, "dkg"),
n.chain.GetConfig().GroupSize,
broadcastChannel,
membershipValidator,
)
Expand Down
1 change: 0 additions & 1 deletion pkg/tbtc/signing.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,6 @@ func (se *signingExecutor) sign(

announcer := announcer.New(
fmt.Sprintf("%v-%v", ProtocolName, "signing"),
se.chainConfig.GroupSize,
se.broadcastChannel,
se.membershipValidator,
)
Expand Down