Skip to content

Commit

Permalink
Merge pull request #26 from renproject/feat/sharding
Browse files Browse the repository at this point in the history
Add non-signatory node support in hyperdrive
  • Loading branch information
Yunshi Sun committed Oct 1, 2019
2 parents 7784324 + e0efb0f commit fe1d619
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 31 deletions.
4 changes: 3 additions & 1 deletion hyperdrive.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,9 @@ type hyperdrive struct {
func New(options Options, pStorage ProcessStorage, blockStorage BlockStorage, blockIterator BlockIterator, validator Validator, observer Observer, broadcaster Broadcaster, shards Shards, privKey ecdsa.PrivateKey) Hyperdrive {
replicas := make(map[Shard]Replica, len(shards))
for _, shard := range shards {
replicas[shard] = replica.New(options, pStorage, blockStorage, blockIterator, validator, observer, broadcaster, shard, privKey)
if observer.IsSignatory(shard) {
replicas[shard] = replica.New(options, pStorage, blockStorage, blockIterator, validator, observer, broadcaster, shard, privKey)
}
}
return &hyperdrive{
replicas: replicas,
Expand Down
63 changes: 36 additions & 27 deletions hyperdrive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
mrand "math/rand"
"sync"
"time"

"github.com/ethereum/go-ethereum/crypto"
"github.com/renproject/hyperdrive/block"
"github.com/renproject/hyperdrive/replica"
Expand All @@ -33,8 +33,10 @@ var _ = Describe("Hyperdrive", func() {
table := []struct {
shard int
f int
r int
}{
{1, 1},
{1, 2, 0},
{1, 2, 2},
}

for _, entry := range table {
Expand All @@ -43,14 +45,15 @@ var _ = Describe("Hyperdrive", func() {
shards[i] = RandomShard()
}
f := entry.f
r := entry.r

Context(fmt.Sprintf("when the network have %v nodes (f = %v) and %v shards", 3*f+1, f, len(shards)), func() {
Context(fmt.Sprintf("when the network have %v signatory nodes (f = %v), %v non-signatory nodes and %v shards", 3*f+1, f, r, len(shards)), func() {
Context("when all nodes have 100% live time", func() {
Context("when all nodes start at same time", func() {
It("should keep producing new blocks", func() {
options := DefaultOption
options.maxBootDelay = 0
network := NewNetwork(f, shards, options)
network := NewNetwork(f, r, shards, options)

network.Start()
defer network.Stop()
Expand All @@ -63,7 +66,7 @@ var _ = Describe("Hyperdrive", func() {

Context("when each node has a random delay when starting", func() {
It("should keep producing new blocks", func() {
network := NewNetwork(f, shards, DefaultOption)
network := NewNetwork(f, r, shards, DefaultOption)
network.Start()
defer network.Stop()

Expand All @@ -81,7 +84,7 @@ var _ = Describe("Hyperdrive", func() {
shuffledIndices := mrand.Perm(3*f + 1)
option.disableNodes = shuffledIndices[:f]

network := NewNetwork(f, shards, option)
network := NewNetwork(f, r, shards, option)
network.Start()
defer network.Stop()

Expand All @@ -95,7 +98,7 @@ var _ = Describe("Hyperdrive", func() {
Context("when some nodes are having network connection issue", func() {
Context("when they go back online after certain amount of time", func() {
It("should keep producing new blocks", func() {
network := NewNetwork(f, shards, DefaultOption)
network := NewNetwork(f, r, shards, DefaultOption)
network.Start()
defer network.Stop()

Expand All @@ -119,7 +122,7 @@ var _ = Describe("Hyperdrive", func() {

Context("when they fail to reconnect to the network", func() {
It("should keep producing blocks with the rest of the networks", func() {
network := NewNetwork(f, shards, DefaultOption)
network := NewNetwork(f, r, shards, DefaultOption)
network.Start()
defer network.Stop()

Expand All @@ -144,7 +147,7 @@ var _ = Describe("Hyperdrive", func() {
Context("when no more than f nodes crashed", func() {
Context("when they go back online after some time", func() {
It("should keep producing new blocks", func() {
network := NewNetwork(f, shards, DefaultOption)
network := NewNetwork(f, r, shards, DefaultOption)
network.Start()
defer network.Stop()

Expand All @@ -168,7 +171,7 @@ var _ = Describe("Hyperdrive", func() {

Context("when they fail to reconnect to the network", func() {
It("should keep producing new blocks", func() {
network := NewNetwork(f, shards, DefaultOption)
network := NewNetwork(f, r, shards, DefaultOption)
network.Start()
defer network.Stop()

Expand All @@ -194,7 +197,7 @@ var _ = Describe("Hyperdrive", func() {
Context("when more than f nodes crash,", func() {
Context("when they fail to reconnect to the network", func() {
It("should stop producing new blocks", func() {
network := NewNetwork(f, shards, DefaultOption)
network := NewNetwork(f, r, shards, DefaultOption)
network.Start()
defer network.Stop()

Expand All @@ -216,7 +219,7 @@ var _ = Describe("Hyperdrive", func() {

Context("when they successfully reconnect to the network", func() {
It("should start producing blocks again", func() {
network := NewNetwork(f, shards, DefaultOption)
network := NewNetwork(f, r, shards, DefaultOption)
network.Start()
defer network.Stop()

Expand Down Expand Up @@ -254,7 +257,7 @@ var _ = Describe("Hyperdrive", func() {
shuffledIndices := mrand.Perm(3*f + 1)
options.disableNodes = shuffledIndices[:f+1]

network := NewNetwork(f, shards, options)
network := NewNetwork(f, r, shards, options)
network.Start()
defer network.Stop()

Expand All @@ -271,7 +274,7 @@ var _ = Describe("Hyperdrive", func() {
shuffledIndices := mrand.Perm(3*f + 1)
options.disableNodes = shuffledIndices[:f+1]

network := NewNetwork(f, shards, options)
network := NewNetwork(f, r, shards, options)
network.Start()
defer network.Stop()

Expand Down Expand Up @@ -311,6 +314,7 @@ var DefaultOption = networkOptions{

type Network struct {
f int
r int
shards replica.Shards
options networkOptions

Expand All @@ -324,11 +328,11 @@ type Network struct {
Broadcaster *MockBroadcaster
}

func NewNetwork(f int, shards replica.Shards, options networkOptions) Network {
func NewNetwork(f, r int, shards replica.Shards, options networkOptions) Network {
if f <= 0 {
panic("f must be positive")
}
total := 3*f + 1
total := (3*f + 1) + r

// Generate keys for all the nodes
keys := make([]*ecdsa.PrivateKey, total)
Expand All @@ -343,7 +347,7 @@ func NewNetwork(f int, shards replica.Shards, options networkOptions) Network {
}

// Initialize all nodes
genesisBlock := testutil.GenesisBlock(sigs)
genesisBlock := testutil.GenesisBlock(sigs[r:])
broadcaster := NewMockBroadcaster(keys, options.minNetworkDelay, options.maxNetworkDelay)
nodes := make([]*Node, total)
for i := range nodes {
Expand All @@ -353,12 +357,13 @@ func NewNetwork(f int, shards replica.Shards, options networkOptions) Network {
}
store := NewMockPersistentStorage(shards)
store.Init(genesisBlock)
nodes[i] = NewNode(logger.WithField("node", i), shards, keys[i], broadcaster, store)
nodes[i] = NewNode(logger.WithField("node", i), shards, keys[i], broadcaster, store, i >= r)
}
ctx, cancel := context.WithCancel(context.Background())

return Network{
f: f,
r: r,
shards: shards,
options: options,
nodesMu: new(sync.RWMutex),
Expand Down Expand Up @@ -412,7 +417,7 @@ func (network *Network) StartNode(i int) {
network.nodesMu.Lock()
defer network.nodesMu.Unlock()

network.nodes[i] = NewNode(logger.WithField("node", i), network.shards, network.nodes[i].key, network.Broadcaster, store)
network.nodes[i] = NewNode(logger.WithField("node", i), network.shards, network.nodes[i].key, network.Broadcaster, store, i >= network.r)
network.startNode(i)
}

Expand Down Expand Up @@ -485,19 +490,23 @@ func (network *Network) HealthCheck(indexes []int) bool {
currentBlockHeights := make([]block.Height, len(nodes))
for _, shard := range network.shards {
for i, node := range nodes {
block := node.storage.LatestBlock(shard)
currentBlockHeights[i] = block.Header().Height()
if node.observer.IsSignatory(shard) {
block := node.storage.LatestBlock(shard)
currentBlockHeights[i] = block.Header().Height()
}
}
}

time.Sleep(5 * time.Second)

for _, shard := range network.shards {
for i, node := range nodes {
block := node.storage.LatestBlock(shard)
if block.Header().Height() <= currentBlockHeights[i] {
log.Printf("⚠️ node %v didn't progress ,old height = %v, new height = %v", i, currentBlockHeights[i], block.Header().Height())
return false
if node.observer.IsSignatory(shard) {
block := node.storage.LatestBlock(shard)
if block.Header().Height() <= currentBlockHeights[i] {
log.Printf("⚠️ node %v didn't progress ,old height = %v, new height = %v", i, currentBlockHeights[i], block.Header().Height())
return false
}
}
}
}
Expand All @@ -518,7 +527,7 @@ func (node Node) Signatory() id.Signatory {
return id.NewSignatory(node.key.PublicKey)
}

func NewNode(logger logrus.FieldLogger, shards Shards, pk *ecdsa.PrivateKey, broadcaster *MockBroadcaster, store *MockPersistentStorage) *Node {
func NewNode(logger logrus.FieldLogger, shards Shards, pk *ecdsa.PrivateKey, broadcaster *MockBroadcaster, store *MockPersistentStorage, isSignatory bool) *Node {
option := Options{
Logger: logger,
BackOffExp: 1,
Expand All @@ -527,7 +536,7 @@ func NewNode(logger logrus.FieldLogger, shards Shards, pk *ecdsa.PrivateKey, bro
}
iter := NewMockBlockIterator(store)
validator := NewMockValidator(store)
observer := NewMockObserver(store)
observer := NewMockObserver(store, isSignatory)
hd := New(option, store, store, iter, validator, observer, broadcaster, shards, *pk)

return &Node{
Expand Down
1 change: 1 addition & 0 deletions replica/rebase.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type Validator interface {

type Observer interface {
DidCommitBlock(block.Height, Shard)
IsSignatory(Shard) bool
}

type shardRebaser struct {
Expand Down
3 changes: 3 additions & 0 deletions replica/replica_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ func newMockObserver() Observer {

func (m mockObserver) DidCommitBlock(block.Height, Shard) {
}
func (m mockObserver) IsSignatory(Shard) bool {
return true
}

type mockProcessStorage struct {
}
Expand Down
12 changes: 9 additions & 3 deletions testutil/replica/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,14 @@ func (m *MockValidator) IsBlockValid(b block.Block, checkHistory bool, shard rep
}

type MockObserver struct {
store *MockPersistentStorage
store *MockPersistentStorage
isSignatory bool
}

func NewMockObserver(store *MockPersistentStorage) replica.Observer {
func NewMockObserver(store *MockPersistentStorage, isSignatory bool) replica.Observer {
return &MockObserver{
store: store,
store: store,
isSignatory: isSignatory,
}
}

Expand All @@ -126,6 +128,10 @@ func (m MockObserver) DidCommitBlock(height block.Height, shard replica.Shard) {
blockchain.InsertBlockStatAtHeight(height-1, prevBlock.PreviousState())
}

func (observer *MockObserver) IsSignatory(replica.Shard) bool {
return observer.isSignatory
}

type latestMessages struct {
Mu *sync.RWMutex
Height block.Height
Expand Down

0 comments on commit fe1d619

Please sign in to comment.