Skip to content

Commit

Permalink
Merge pull request #41 from renproject/release/1.1.0
Browse files Browse the repository at this point in the history
Release v1.1.0
  • Loading branch information
jazg committed Mar 11, 2020
2 parents 174a073 + 81f40ed commit ab134d1
Show file tree
Hide file tree
Showing 17 changed files with 813 additions and 242 deletions.
2 changes: 1 addition & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ version: 2.1

_defaults: &defaults
docker:
- image: circleci/golang:1.11
- image: circleci/golang:1.13

jobs:
build:
Expand Down
78 changes: 50 additions & 28 deletions hyperdrive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ var _ = Describe("Hyperdrive", func() {
It("should keep producing new blocks", func() {
option := DefaultOption
shuffledIndices := mrand.Perm(3*f + 1)
option.disableNodes = shuffledIndices[:f]
option.disabledNodes = shuffledIndices[:f]

network := NewNetwork(f, r, shards, option)
network.Start()
Expand Down Expand Up @@ -241,25 +241,30 @@ var _ = Describe("Hyperdrive", func() {

Context("when they successfully reconnect to the network", func() {
It("should start producing blocks again", func() {
network := NewNetwork(f, r, shards, DefaultOption)
options := DefaultOption
options.debugLogger = []int{0}
options.timeoutProposers = []int{mrand.Intn(3*f + 1)} // Pick a random node to timeout when proposing blocks.
network := NewNetwork(f, r, shards, options)
network.Start()
defer network.Stop()

// Wait for all nodes reach consensus
// Wait for all nodes to reach consensus.
Eventually(func() bool {
return network.HealthCheck(nil)
}, 30*time.Second).Should(BeTrue())

// Crash f + 1 random nodes and expect no blocks produced after that
// Crash f+1 random nodes and expect no blocks to be
// produced.
shuffledIndices := mrand.Perm(3*f + 1)
crashedNodes := shuffledIndices[:f+1]
phi.ParForAll(crashedNodes, func(i int) {
SleepRandomSeconds(0, 2)
index := crashedNodes[i]
network.StopNode(index)
})
Expect(network.HealthCheck(nil)).Should(BeFalse())

// Restart the nodes after some time
// Restart the nodes at different times.
phi.ParForAll(crashedNodes, func(i int) {
SleepRandomSeconds(5, 10)
index := crashedNodes[i]
Expand All @@ -268,7 +273,7 @@ var _ = Describe("Hyperdrive", func() {

Eventually(func() bool {
return network.HealthCheck(nil)
}, 30*time.Second).Should(BeTrue())
}, 120*time.Second).Should(BeTrue())
})
})
})
Expand All @@ -280,7 +285,7 @@ var _ = Describe("Hyperdrive", func() {
// Start the network with more than f nodes offline
options := DefaultOption
shuffledIndices := mrand.Perm(3*f + 1)
options.disableNodes = shuffledIndices[:f+1]
options.disabledNodes = shuffledIndices[:f+1]

network := NewNetwork(f, r, shards, options)
network.Start()
Expand All @@ -297,7 +302,7 @@ var _ = Describe("Hyperdrive", func() {
// Start the network with more than f nodes offline
options := DefaultOption
shuffledIndices := mrand.Perm(3*f + 1)
options.disableNodes = shuffledIndices[:f+1]
options.disabledNodes = shuffledIndices[:f+1]

network := NewNetwork(f, r, shards, options)
network.Start()
Expand All @@ -319,22 +324,23 @@ var _ = Describe("Hyperdrive", func() {
})

type networkOptions struct {
minNetworkDelay int // minimum network latency when sending messages in milliseconds
maxNetworkDelay int // maximum network latency when sending messages in milliseconds
minBootDelay int // minimum delay when booting the node in seconds
maxBootDelay int // maximum delay when booting the node in seconds
debugLogger []int // indexes of the nodes which we want to enable the debug logger, nil for disable all
disableNodes []int // indexes of the nodes which we want to disable at the starting of the network, nil for enable all

minNetworkDelay int // minimum network latency when sending messages in milliseconds
maxNetworkDelay int // maximum network latency when sending messages in milliseconds
minBootDelay int // minimum delay when booting the node in seconds
maxBootDelay int // maximum delay when booting the node in seconds
debugLogger []int // indices of nodes that use a debug logger, nil to disable all
disabledNodes []int // indices of nodes that are disabled when the network starts, nil to enable all
timeoutProposers []int // indices of nodes that timeout prior to proposing blocks, nil for no invalid proposers
}

var DefaultOption = networkOptions{
minNetworkDelay: 100,
maxNetworkDelay: 500,
minBootDelay: 0,
maxBootDelay: 3,
debugLogger: nil,
disableNodes: nil,
minNetworkDelay: 100,
maxNetworkDelay: 500,
minBootDelay: 0,
maxBootDelay: 3,
debugLogger: nil,
disabledNodes: nil,
timeoutProposers: nil,
}

type Network struct {
Expand Down Expand Up @@ -378,11 +384,19 @@ func NewNetwork(f, r int, shards replica.Shards, options networkOptions) Network
for i := range nodes {
logger := logrus.New()
if Contain(options.debugLogger, i) {
logger.Infof("✏️ node %d has debug logs enabled", i)
logger.SetLevel(logrus.DebugLevel)
}
store := NewMockPersistentStorage(shards)
store.Init(genesisBlock)
nodes[i] = NewNode(logger.WithField("node", i), shards, keys[i], broadcaster, store, i < 3*f+1)

var timeoutProposer bool
if Contain(options.timeoutProposers, i) {
logger.Infof("✏️ node %d will time out when proposing", i)
timeoutProposer = true
}
iter := NewMockBlockIterator(store, timeoutProposer)
nodes[i] = NewNode(logger.WithField("node", i), shards, keys[i], iter, broadcaster, store, i < 3*f+1)
}
ctx, cancel := context.WithCancel(context.Background())

Expand All @@ -403,7 +417,7 @@ func NewNetwork(f, r int, shards replica.Shards, options networkOptions) Network

func (network Network) Start() {
phi.ParForAll(network.nodes, func(i int) {
if Contain(network.options.disableNodes, i) {
if Contain(network.options.disabledNodes, i) {
return
}
SleepRandomSeconds(network.options.minBootDelay, network.options.maxBootDelay)
Expand Down Expand Up @@ -438,11 +452,16 @@ func (network *Network) StartNode(i int) {
logger.SetLevel(logrus.DebugLevel)
}
store := network.nodes[i].storage
var timeoutProposer bool
if Contain(network.options.timeoutProposers, i) {
timeoutProposer = true
}
iter := NewMockBlockIterator(store, timeoutProposer)

network.nodesMu.Lock()
defer network.nodesMu.Unlock()

network.nodes[i] = NewNode(logger.WithField("node", i), network.shards, network.nodes[i].key, network.Broadcaster, store, i >= network.r)
network.nodes[i] = NewNode(logger.WithField("node", i), network.shards, network.nodes[i].key, iter, network.Broadcaster, store, i >= network.r)
network.startNode(i)
}

Expand All @@ -466,7 +485,11 @@ func (network *Network) startNode(i int) {

for {
select {
case message := <-messages:
case messageBytes := <-messages:
var message replica.Message
if err := message.UnmarshalBinary(messageBytes); err != nil {
panic(err)
}
hyperdrive.HandleMessage(message)
select {
case <-innerCtx.Done():
Expand Down Expand Up @@ -529,7 +552,7 @@ func (network *Network) HealthCheck(indexes []int) bool {
if node.observer.IsSignatory(shard) {
block := node.storage.LatestBlock(shard)
if block.Header().Height() <= currentBlockHeights[i] {
log.Printf("⚠️ node %v didn't progress ,old height = %v, new height = %v", i, currentBlockHeights[i], block.Header().Height())
node.logger.Infof("⚠️ node %d did not progress, old height = %d, new height = %d", i, currentBlockHeights[i], block.Header().Height())
return false
}
}
Expand All @@ -552,14 +575,13 @@ func (node Node) Signatory() id.Signatory {
return id.NewSignatory(node.key.PublicKey)
}

func NewNode(logger logrus.FieldLogger, shards Shards, pk *ecdsa.PrivateKey, broadcaster *MockBroadcaster, store *MockPersistentStorage, isSignatory bool) *Node {
func NewNode(logger logrus.FieldLogger, shards Shards, pk *ecdsa.PrivateKey, iter *MockBlockIterator, broadcaster *MockBroadcaster, store *MockPersistentStorage, isSignatory bool) *Node {
option := Options{
Logger: logger,
BackOffExp: 1,
BackOffBase: 3 * time.Second,
BackOffMax: 3 * time.Second,
}
iter := NewMockBlockIterator(store)
validator := NewMockValidator(store)
observer := NewMockObserver(store, isSignatory)
hd := New(option, store, store, iter, validator, observer, broadcaster, shards, *pk)
Expand Down
72 changes: 72 additions & 0 deletions process/marshal.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,78 @@ func (precommit *Precommit) UnmarshalBinary(data []byte) error {
return nil
}

// MarshalJSON implements the `json.Marshaler` interface for the `Resync` type.
func (resync Resync) MarshalJSON() ([]byte, error) {
return json.Marshal(struct {
Sig id.Signature `json:"sig"`
Signatory id.Signatory `json:"signatory"`
Height block.Height `json:"height"`
Round block.Round `json:"round"`
}{
resync.sig,
resync.signatory,
resync.height,
resync.round,
})
}

// UnmarshalJSON implements the `json.Unmarshaler` interface for the `Resync`
// type.
func (resync *Resync) UnmarshalJSON(data []byte) error {
tmp := struct {
Sig id.Signature `json:"sig"`
Signatory id.Signatory `json:"signatory"`
Height block.Height `json:"height"`
Round block.Round `json:"round"`
}{}
if err := json.Unmarshal(data, &tmp); err != nil {
return err
}
resync.sig = tmp.Sig
resync.signatory = tmp.Signatory
resync.height = tmp.Height
resync.round = tmp.Round
return nil
}

// MarshalBinary implements the `encoding.BinaryMarshaler` interface for the
// `Resync` type.
func (resync Resync) MarshalBinary() ([]byte, error) {
buf := new(bytes.Buffer)
if err := binary.Write(buf, binary.LittleEndian, resync.sig); err != nil {
return buf.Bytes(), fmt.Errorf("cannot write resync.sig: %v", err)
}
if err := binary.Write(buf, binary.LittleEndian, resync.signatory); err != nil {
return buf.Bytes(), fmt.Errorf("cannot write resync.signatory: %v", err)
}
if err := binary.Write(buf, binary.LittleEndian, resync.height); err != nil {
return buf.Bytes(), fmt.Errorf("cannot write resync.height: %v", err)
}
if err := binary.Write(buf, binary.LittleEndian, resync.round); err != nil {
return buf.Bytes(), fmt.Errorf("cannot write resync.round: %v", err)
}
return buf.Bytes(), nil
}

// UnmarshalBinary implements the `encoding.BinaryUnmarshaler` interface for the
// `Resync` type.
func (resync *Resync) UnmarshalBinary(data []byte) error {
buf := bytes.NewBuffer(data)
if err := binary.Read(buf, binary.LittleEndian, &resync.sig); err != nil {
return fmt.Errorf("cannot read resync.sig: %v", err)
}
if err := binary.Read(buf, binary.LittleEndian, &resync.signatory); err != nil {
return fmt.Errorf("cannot read resync.signatory: %v", err)
}
if err := binary.Read(buf, binary.LittleEndian, &resync.height); err != nil {
return fmt.Errorf("cannot read resync.height: %v", err)
}
if err := binary.Read(buf, binary.LittleEndian, &resync.round); err != nil {
return fmt.Errorf("cannot read resync.round: %v", err)
}
return nil
}

// MarshalJSON implements the `json.Marshaler` interface for the `Inbox` type.
func (inbox Inbox) MarshalJSON() ([]byte, error) {
return json.Marshal(struct {
Expand Down
13 changes: 13 additions & 0 deletions process/marshal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,17 @@ var _ = Describe("Marshaling", func() {
}
})
})

Context("when marshaling the same resync multiple times", func() {
It("should return the same bytes", func() {
resync := RandomMessage(ResyncMessageType)
resyncBytes, err := resync.MarshalBinary()
Expect(err).ToNot(HaveOccurred())
for i := 0; i < 100; i++ {
tmpResyncBytes, err := resync.MarshalBinary()
Expect(err).ToNot(HaveOccurred())
Expect(tmpResyncBytes).Should(Equal(resyncBytes))
}
})
})
})
63 changes: 63 additions & 0 deletions process/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"crypto/sha256"
"encoding"
"encoding/json"
"errors"
"fmt"

"github.com/ethereum/go-ethereum/crypto"
Expand All @@ -27,6 +28,9 @@ const (
// PrecommitMessageType is used by messages that are precommitting for block
// hashes (or nil precommitting).
PrecommitMessageType = 3
// ResyncMessageType is used by messages that query others for previous
// messages.
ResyncMessageType = 4
)

// Messages is a wrapper around the `[]Message` type.
Expand Down Expand Up @@ -67,6 +71,12 @@ type Message interface {
Type() MessageType
}

var (
// ErrBlockHashNotProvided is returned when querying the block hash for a
// message type that does not implement the function.
ErrBlockHashNotProvided = errors.New("block hash not provided")
)

// Sign a message using an ECDSA private key. The resulting signature will be
// stored inside the message.
func Sign(m Message, privKey ecdsa.PrivateKey) error {
Expand All @@ -90,6 +100,9 @@ func Sign(m Message, privKey ecdsa.PrivateKey) error {
case *Precommit:
m.signatory = signatory
copy(m.sig[:], sig)
case *Resync:
m.signatory = signatory
copy(m.sig[:], sig)
default:
panic(fmt.Errorf("invariant violation: unexpected message type=%T", m))
}
Expand Down Expand Up @@ -303,6 +316,56 @@ func (precommit *Precommit) String() string {
return fmt.Sprintf("Precommit(Height=%v,Round=%v,BlockHash=%v)", precommit.Height(), precommit.Round(), precommit.BlockHash())
}

// Resyncs is a wrapper around the `[]Resync` type.
type Resyncs []Resync

// Resync previous messages.
type Resync struct {
signatory id.Signatory
sig id.Signature
height block.Height
round block.Round
}

func NewResync(height block.Height, round block.Round) *Resync {
return &Resync{
height: height,
round: round,
}
}

func (resync *Resync) Signatory() id.Signatory {
return resync.signatory
}

func (resync *Resync) SigHash() id.Hash {
return sha256.Sum256([]byte(resync.String()))
}

func (resync *Resync) Sig() id.Signature {
return resync.sig
}

func (resync *Resync) Height() block.Height {
return resync.height
}

func (resync *Resync) Round() block.Round {
return resync.round
}

func (resync *Resync) BlockHash() id.Hash {
panic(ErrBlockHashNotProvided)
}

func (resync *Resync) Type() MessageType {
return ResyncMessageType
}

func (resync *Resync) String() string {
return fmt.Sprintf("Resync(Height=%v,Round=%v)", resync.Height(), resync.Round())
}

// An Inbox is storage container for one type message. Any type of message can
// be stored, but an attempt to store messages of different types in one inbox
// will cause a panic. Inboxes are used extensively by the consensus algorithm
Expand Down
Loading

0 comments on commit ab134d1

Please sign in to comment.