Skip to content

Commit

Permalink
Merge #1303
Browse files Browse the repository at this point in the history
1303: Add jitter to DKG start and upon processing phase 1 broadcast messages r=jordanschalm a=jordanschalm

Introduce random delays prior to expensive operations in the DKG:
* `Start` - ~700ms
* `HandleBroadcastMessage` - ~2.5s / message

[Results Link](https://www.notion.so/dapperlabs/Test-12-150-Node-DKG-Buffered-VerifVector-Processing-23deece06df2408a822648eb2888254b)

Co-authored-by: Jordan Schalm <jordan@dapperlabs.com>
  • Loading branch information
bors[bot] and jordanschalm committed Sep 29, 2021
2 parents 7925ced + 7edb23f commit 13ef07a
Show file tree
Hide file tree
Showing 6 changed files with 234 additions and 3 deletions.
5 changes: 5 additions & 0 deletions cmd/consensus/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ func main() {
requiredApprovalsForSealVerification uint
requiredApprovalsForSealConstruction uint
emergencySealing bool
dkgControllerConfig dkgmodule.ControllerConfig
startupTimeString string
startupTime time.Time

Expand Down Expand Up @@ -147,6 +148,9 @@ func main() {
flags.StringVar(&accessAddress, "access-address", "", "the address of an access node")
flags.StringVar(&secureAccessNodeID, "secure-access-node-id", "", "the node ID of the secure access GRPC server")
flags.BoolVar(&insecureAccessAPI, "insecure-access-api", true, "required if insecure GRPC connection should be used")
flags.DurationVar(&dkgControllerConfig.BaseStartDelay, "dkg-controller-base-start-delay", dkgmodule.DefaultBaseStartDelay, "used to define the range for jitter prior to DKG start (eg. 500µs) - the base value is scaled quadratically with the # of DKG participants")
flags.DurationVar(&dkgControllerConfig.BaseHandleFirstBroadcastDelay, "dkg-controller-base-handle-first-broadcast-delay", dkgmodule.DefaultBaseHandleFirstBroadcastDelay, "used to define the range for jitter prior to DKG handling the first broadcast messages (eg. 50ms) - the base value is scaled quadratically with the # of DKG participants")
flags.DurationVar(&dkgControllerConfig.HandleSubsequentBroadcastDelay, "dkg-controller-handle-subsequent-broadcast-delay", dkgmodule.DefaultHandleSubsequentBroadcastDelay, "used to define the constant delay introduced prior to DKG handling subsequent broadcast messages (eg. 2s)")
flags.StringVar(&startupTimeString, "hotstuff-startup-time", cmd.NotSet, "specifies date and time (in ISO 8601 format) after which the consensus participant may enter the first view (e.g 2006-01-02T15:04:05Z07:00)")
})

Expand Down Expand Up @@ -757,6 +761,7 @@ func main() {
node.Me,
dkgContractClient,
dkgBrokerTunnel,
dkgControllerConfig,
),
viewsObserver,
)
Expand Down
7 changes: 7 additions & 0 deletions integration/dkg/dkg_emulator_suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,12 @@ func (s *DKGSuite) initEngines(node *node, ids flow.IdentityList) {
controllerFactoryLogger = zerolog.New(os.Stdout).Hook(hook)
}

// create a config with no delays for tests
config := dkg.ControllerConfig{
BaseStartDelay: 0,
BaseHandleFirstBroadcastDelay: 0,
}

// the reactor engine reacts to new views being finalized and drives the
// DKG protocol
reactorEngine := dkgeng.NewReactorEngine(
Expand All @@ -448,6 +454,7 @@ func (s *DKGSuite) initEngines(node *node, ids flow.IdentityList) {
core.Me,
node.dkgContractClient,
brokerTunnel,
config,
),
viewsObserver,
)
Expand Down
7 changes: 7 additions & 0 deletions integration/dkg/dkg_whiteboard_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,12 @@ func createNode(
})
controllerFactoryLogger := zerolog.New(os.Stdout).Hook(hook)

// create a config with no delays for tests
config := dkg.ControllerConfig{
BaseStartDelay: 0,
BaseHandleFirstBroadcastDelay: 0,
}

// the reactor engine reacts to new views being finalized and drives the
// DKG protocol
reactorEngine := dkgeng.NewReactorEngine(
Expand All @@ -143,6 +149,7 @@ func createNode(
core.Me,
NewWhiteboardClient(id.NodeID, whiteboard),
brokerTunnel,
config,
),
viewsObserver,
)
Expand Down
148 changes: 147 additions & 1 deletion module/dkg/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ package dkg

import (
"fmt"
"math"
"math/rand"
"sync"
"time"

"github.com/rs/zerolog"

Expand All @@ -11,6 +14,57 @@ import (
"github.com/onflow/flow-go/module"
)

const (

// DefaultBaseStartDelay is the default base delay to use when introducing
// random delay to the DKG start process. See preStartDelay for details.
DefaultBaseStartDelay = 500 * time.Microsecond

// DefaultBaseHandleFirstBroadcastDelay is the default base to use when
// introducing random delay to processing the first DKG broadcast message.
// See preHandleFirstBroadcastDelay for details.
//
// For a 150-node DKG, we observe a cost of ~2.5s per message to process
// broadcast messages during phase 1, for a total of ~6m of total CPU time.
// We would like to target spreading this cost over a 30 minute period.
// With the default value for DefaultHandleSubsequentBroadcastDelay, this
// results in processing all phase 1 messages in 6m+6m=12m, so for a maximum
// total processing time of 30m, we sample the initial delay from [0,18m].
// We use 50ms as the default because 50ms*150^2 = 18.75m
//
DefaultBaseHandleFirstBroadcastDelay = 50 * time.Millisecond

// DefaultHandleSubsequentBroadcastDelay is the default delay to use before
// processing all DKG broadcasts after the first.
DefaultHandleSubsequentBroadcastDelay = 2500 * time.Millisecond
)

// ControllerConfig defines configuration for the DKG Controller. These define
// how the DKG controller introduces delays to expensive DKG computations.
//
// We introduce delays for two reasons:
// 1. Avoid running long-running expensive DKG computations consecutively.
// 2. Avoid synchronizing expensive DKG computations across the DKG committee.
//
// Delays introduced prior to DKG start and prior to processing the FIRST broadcast
// message are sampled uniformly from [0,m), where m=b*n^2
//
// b = base delay (from config)
// n = size of DKG committee
//
// Delays introduced prior to processing subsequent broadcast messages are constant.
//
type ControllerConfig struct {
// BaseStartDelay determines the maximum delay before starting the DKG.
BaseStartDelay time.Duration
// BaseHandleFirstBroadcastDelay determines the maximum delay before handling
// the first broadcast message.
BaseHandleFirstBroadcastDelay time.Duration
// HandleSubsequentBroadcastDelay determines the constant delay before handling
// all broadcast messages following the first.
HandleSubsequentBroadcastDelay time.Duration
}

// Controller implements the DKGController interface. It controls the execution
// of a Joint Feldman DKG instance. A new Controller must be instantiated for
// every epoch.
Expand Down Expand Up @@ -47,6 +101,9 @@ type Controller struct {

// artifactsLock protects access to artifacts
artifactsLock sync.Mutex

config ControllerConfig
once *sync.Once
}

// NewController instantiates a new Joint Feldman DKG controller.
Expand All @@ -56,6 +113,7 @@ func NewController(
dkg crypto.DKGState,
seed []byte,
broker module.DKGBroker,
config ControllerConfig,
) *Controller {

logger := log.With().
Expand All @@ -72,6 +130,8 @@ func NewController(
h2Ch: make(chan struct{}),
endCh: make(chan struct{}),
shutdownCh: make(chan struct{}),
once: new(sync.Once),
config: config,
}
}

Expand Down Expand Up @@ -231,13 +291,39 @@ func (c *Controller) doBackgroundWork() {
if err != nil {
c.log.Err(err).Msg("error processing DKG private message")
}

case msg := <-broadcastMsgCh:

// before processing a broadcast message during phase 1, sleep for a
// random delay to avoid synchronizing this expensive operation across
// all consensus nodes
state := c.GetState()
if state == Phase1 {

// introduce a large, uniformly sampled delay prior to processing
// the first message
isFirstMessage := false
c.once.Do(func() {
isFirstMessage = true
delay := c.preHandleFirstBroadcastDelay()
c.log.Info().Msgf("sleeping for %s before processing first phase 1 broadcast message", delay)
time.Sleep(delay)
})

if !isFirstMessage {
// introduce a constant delay for all subsequent messages
c.log.Debug().Msgf("sleeping for %s before processing subsequent phase 1 broadcast message", c.config.HandleSubsequentBroadcastDelay)
time.Sleep(c.config.HandleSubsequentBroadcastDelay)
}
}

c.dkgLock.Lock()
err := c.dkg.HandleBroadcastMsg(int(msg.Orig), msg.Data)
c.dkgLock.Unlock()
if err != nil {
c.log.Err(err).Msg("error processing DKG broadcast message")
}

case <-c.shutdownCh:
return
}
Expand All @@ -247,9 +333,15 @@ func (c *Controller) doBackgroundWork() {
func (c *Controller) start() error {
state := c.GetState()
if state != Init {
return fmt.Errorf("Cannot execute start routine in state %s", state)
return fmt.Errorf("cannot execute start routine in state %s", state)
}

// before starting the DKG, sleep for a random delay to avoid synchronizing
// this expensive operation across all consensus nodes
delay := c.preStartDelay()
c.log.Debug().Msgf("sleeping for %s before processing phase 1 broadcast message", delay)
time.Sleep(delay)

c.dkgLock.Lock()
err := c.dkg.Start(c.seed)
c.dkgLock.Unlock()
Expand Down Expand Up @@ -326,3 +418,57 @@ func (c *Controller) phase3() error {
}
}
}

// preStartDelay returns a duration to delay prior to starting the DKG process.
// This prevents synchronization of the DKG starting (an expensive operation)
// across the network, which can impact finalization.
func (c *Controller) preStartDelay() time.Duration {
delay := computePreprocessingDelay(c.config.BaseStartDelay, c.dkg.Size())
return delay
}

// preHandleFirstBroadcastDelay returns a duration to delay prior to handling
// the first broadcast message. This delay is used only during phase 1 of the DKG.
// This prevents synchronization of processing verification vectors (an
// expensive operation) across the network, which can impact finalization.
//
func (c *Controller) preHandleFirstBroadcastDelay() time.Duration {
delay := computePreprocessingDelay(c.config.BaseHandleFirstBroadcastDelay, c.dkg.Size())
return delay
}

// computePreprocessingDelay computes a random delay to introduce before an
// expensive operation.
//
// The maximum delay is m=b*n^2 where:
// * b is a configurable base delay
// * n is the size of the DKG committee
//
func computePreprocessingDelay(baseDelay time.Duration, dkgSize int) time.Duration {

maxDelay := computePreprocessingDelayMax(baseDelay, dkgSize)
if maxDelay <= 0 {
return 0
}
// select delay from [0,m)
delay := time.Duration(rand.Int63n(maxDelay.Nanoseconds()))
return delay
}

// computePreprocessingDelayMax computes the maximum dely for computePreprocessingDelay.
func computePreprocessingDelayMax(baseDelay time.Duration, dkgSize int) time.Duration {
// sanity checks
if baseDelay < 0 {
baseDelay = 0
}
if dkgSize < 0 {
dkgSize = 0
}

// m=b*n^2
maxDelay := time.Duration(math.Pow(float64(dkgSize), 2)) * baseDelay
if maxDelay <= 0 {
return 0
}
return maxDelay
}
7 changes: 6 additions & 1 deletion module/dkg/controller_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type ControllerFactory struct {
me module.Local
dkgContractClient module.DKGContractClient
tunnel *BrokerTunnel
config ControllerConfig
}

// NewControllerFactory creates a new factory that generates Controllers with
Expand All @@ -29,13 +30,16 @@ func NewControllerFactory(
log zerolog.Logger,
me module.Local,
dkgContractClient module.DKGContractClient,
tunnel *BrokerTunnel) *ControllerFactory {
tunnel *BrokerTunnel,
config ControllerConfig,
) *ControllerFactory {

return &ControllerFactory{
log: log,
me: me,
dkgContractClient: dkgContractClient,
tunnel: tunnel,
config: config,
}
}

Expand Down Expand Up @@ -80,6 +84,7 @@ func (f *ControllerFactory) Create(
dkg,
seed,
broker,
f.config,
)

return controller, nil
Expand Down
63 changes: 62 additions & 1 deletion module/dkg/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/rs/zerolog"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/onflow/flow-go/crypto"
Expand Down Expand Up @@ -246,12 +247,21 @@ func initNodes(t *testing.T, n int, phase1Duration, phase2Duration, phase3Durati
dkg, err := crypto.NewJointFeldman(n, signature.RandomBeaconThreshold(n), i, broker)
require.NoError(t, err)

// create a config with no delays for tests
config := ControllerConfig{
BaseStartDelay: 0,
BaseHandleFirstBroadcastDelay: 0,
HandleSubsequentBroadcastDelay: 0,
}

controller := NewController(
logger,
"dkg_test",
dkg,
seed,
broker)
broker,
config,
)
require.NoError(t, err)

node := newNode(i, controller, phase1Duration, phase2Duration, phase3Duration)
Expand Down Expand Up @@ -313,3 +323,54 @@ func checkArtifacts(t *testing.T, nodes []*node, totalNodes int) {
}
}
}

func TestDelay(t *testing.T) {

t.Run("should return 0 delay for <=0 inputs", func(t *testing.T) {
delay := computePreprocessingDelay(0, 100)
assert.Equal(t, delay, time.Duration(0))
delay = computePreprocessingDelay(time.Hour, 0)
assert.Equal(t, delay, time.Duration(0))
delay = computePreprocessingDelay(time.Millisecond, -1)
assert.Equal(t, delay, time.Duration(0))
delay = computePreprocessingDelay(-time.Millisecond, 100)
assert.Equal(t, delay, time.Duration(0))
})

// NOTE: this is a probabilistic test. It will (extremely infrequently) fail.
t.Run("should return different values for same inputs", func(t *testing.T) {
d1 := computePreprocessingDelay(time.Hour, 100)
d2 := computePreprocessingDelay(time.Hour, 100)
assert.NotEqual(t, d1, d2)
})

t.Run("should return values in expected range", func(t *testing.T) {
baseDelay := time.Second
dkgSize := 100
minDelay := time.Duration(0)
// m=b*n^2
expectedMaxDelay := time.Duration(int64(baseDelay) * int64(dkgSize) * int64(dkgSize))

maxDelay := computePreprocessingDelayMax(baseDelay, dkgSize)
assert.Equal(t, expectedMaxDelay, maxDelay)

delay := computePreprocessingDelay(baseDelay, dkgSize)
assert.LessOrEqual(t, minDelay, delay)
assert.GreaterOrEqual(t, expectedMaxDelay, delay)
})

t.Run("should return values in expected range for defaults", func(t *testing.T) {
baseDelay := DefaultBaseHandleFirstBroadcastDelay
dkgSize := 150
minDelay := time.Duration(0)
// m=b*n^2
expectedMaxDelay := time.Duration(int64(baseDelay) * int64(dkgSize) * int64(dkgSize))

maxDelay := computePreprocessingDelayMax(baseDelay, dkgSize)
assert.Equal(t, expectedMaxDelay, maxDelay)

delay := computePreprocessingDelay(baseDelay, dkgSize)
assert.LessOrEqual(t, minDelay, delay)
assert.GreaterOrEqual(t, expectedMaxDelay, delay)
})
}

0 comments on commit 13ef07a

Please sign in to comment.