From c4d68677f2a31a6e0fd6e865301185692ddf635b Mon Sep 17 00:00:00 2001 From: Almog De Paz Date: Mon, 25 Feb 2019 12:11:59 +0200 Subject: [PATCH] Ms1 fixes (#577) * remove panics, better logs * integrate sync into app * send layer clock to mesh * name change * use layerTimer as syncTrigger * fix sync test * add state root print for flaky test * fix test change to 10 instances * fix concurrent layer handling --- app/app_test.go | 29 +++-- app/cmd/root.go | 2 +- app/main.go | 23 ++-- config/config.go | 2 +- consensus/algorithm.go | 9 +- consensus/ninja_tortoise.go | 4 +- consensus/ninja_tortoise_test.go | 10 +- consensus/tortoise_test.go | 2 +- hare/hare.go | 8 +- hare/hare_test.go | 4 +- hare/orphan_mock.go | 6 +- mesh/mesh.go | 58 +++------- mesh/mesh_test.go | 44 +++---- mesh/meshdb.go | 6 +- miner/block_builder.go | 36 +++--- miner/builder_test.go | 6 +- p2p/server/msgserver.go | 2 +- sync/block_listener.go | 25 +--- sync/block_listener_test.go | 3 +- sync/syncer.go | 174 ++++++++++++++-------------- sync/syncer_test.go | 189 +++++++++++++++++-------------- timesync/clock_test.go | 3 +- 22 files changed, 310 insertions(+), 335 deletions(-) diff --git a/app/app_test.go b/app/app_test.go index 576440cc8e..e70c93e3cd 100644 --- a/app/app_test.go +++ b/app/app_test.go @@ -5,6 +5,7 @@ import ( "github.com/spacemeshos/go-spacemesh/address" "github.com/spacemeshos/go-spacemesh/api/config" "github.com/spacemeshos/go-spacemesh/hare" + "github.com/spacemeshos/go-spacemesh/log" "github.com/spacemeshos/go-spacemesh/mesh" "github.com/spacemeshos/go-spacemesh/miner" "github.com/spacemeshos/go-spacemesh/oracle" @@ -107,7 +108,7 @@ func (app *AppTestSuite) TestMultipleNodes() { txbytes, _ := mesh.TransactionAsBytes(&tx) path := "../tmp/test/state_" + time.Now().String() - app.initMultipleInstances(app.T(), 2, path) + app.initMultipleInstances(app.T(), 10, path) for _, a := range app.apps { a.startServices() } @@ -121,22 +122,26 @@ func (app *AppTestSuite) TestMultipleNodes() { case <-timeout: app.T().Fatal("timed out ") default: - for _, ap := range app.apps { - ok := 0 - + for idx, ap := range app.apps { if big.NewInt(10).Cmp(ap.state.GetBalance(dst)) == 0 { - for _, ap2 := range app.apps { - assert.Equal(app.T(), ap.state.IntermediateRoot(false), ap2.state.IntermediateRoot(false)) - if ap.state.IntermediateRoot(false) == ap2.state.IntermediateRoot(false) { - ok++ + ok := 0 + for idx2, ap2 := range app.apps { + if idx != idx2 { + r1 := ap.state.IntermediateRoot(false).String() + r2 := ap2.state.IntermediateRoot(false).String() + if r1 == r2 { + log.Info("%d roots confirmed out of %d", ok, len(app.apps)) + ok++ + } + } + if ok == len(app.apps)-1 { + return } } - if ok == len(app.apps) { - return - } + } } - time.Sleep(1 * time.Second) + time.Sleep(1 * time.Millisecond) } } } diff --git a/app/cmd/root.go b/app/cmd/root.go index 9072ddbb00..daea5c1799 100644 --- a/app/cmd/root.go +++ b/app/cmd/root.go @@ -35,7 +35,7 @@ func init() { config.OracleServerWorldId, "The worldid to use with the oracle server (temporary) ") RootCmd.PersistentFlags().StringVar(&config.GenesisTime, "genesis-time", config.GenesisTime, "Time of the genesis layer in 2019-13-02T17:02:00+00:00 format") - RootCmd.PersistentFlags().Uint32Var(&config.LayerDurationSec, "layer-duration-sec", + RootCmd.PersistentFlags().IntVar(&config.LayerDurationSec, "layer-duration-sec", config.LayerDurationSec, "Duration between layers in seconds") /** ======================== P2P Flags ========================== **/ RootCmd.PersistentFlags().IntVar(&config.P2P.SecurityParam, "security-param", diff --git a/app/main.go b/app/main.go index 25317d0435..81515ede9a 100644 --- a/app/main.go +++ b/app/main.go @@ -48,7 +48,7 @@ type SpacemeshApp struct { NodeInitCallback chan bool grpcAPIService *api.SpacemeshGrpcService jsonAPIService *api.JSONHTTPServer - + syncer *sync.Syncer blockListener *sync.BlockListener db database.Database state *state.StateDB @@ -317,26 +317,28 @@ func (app *SpacemeshApp) initServices(instanceName string, swarm server.Service, rng := rand.New(mt19937.New()) processor := state.NewTransactionProcessor(rng, st, lg) - //trtl := consensus.NewTortoise(50, 100) - trtl := consensus.NewAlgorithm(consensus.NewNinjaTortoise(layerSize)) - mesh := mesh.NewMesh(db, db, db, trtl, processor, lg) //todo: what to do with the logger? - coinToss := consensus.WeakCoin{} gTime, err := time.Parse(time.RFC3339, app.Config.GenesisTime) if err != nil { return err } - clock := timesync.NewTicker(timesync.RealClock{}, time.Duration(app.Config.LayerDurationSec)*time.Second, gTime) + ld := time.Duration(app.Config.LayerDurationSec) * time.Second + clock := timesync.NewTicker(timesync.RealClock{}, ld, gTime) + trtl := consensus.NewAlgorithm(consensus.NewNinjaTortoise(layerSize, lg)) + msh := mesh.NewMesh(db, db, db, trtl, processor, lg) //todo: what to do with the logger? - blockListener := sync.NewBlockListener(swarm, blockOracle, mesh, 1*time.Second, 1, clock, lg) + conf := sync.Configuration{SyncInterval: 1 * time.Second, Concurrency: 4, LayerSize: int(layerSize), RequestTimeout: 100 * time.Millisecond} + syncer := sync.NewSync(swarm, msh, blockOracle, conf, clock.Subscribe(), lg) - ha := hare.New(app.Config.HARE, swarm, sgn, mesh, hareOracle, clock.Subscribe(), lg) + ha := hare.New(app.Config.HARE, swarm, sgn, msh, hareOracle, clock.Subscribe(), lg) - blockProducer := miner.NewBlockBuilder(instanceName, swarm, clock.Subscribe(), coinToss, mesh, ha, blockOracle, lg) + blockProducer := miner.NewBlockBuilder(instanceName, swarm, clock.Subscribe(), coinToss, msh, ha, blockOracle, lg) + blockListener := sync.NewBlockListener(swarm, blockOracle, msh, 2*time.Second, 4, lg) app.blockProducer = &blockProducer app.blockListener = blockListener - app.mesh = mesh + app.mesh = msh + app.syncer = syncer app.clock = clock app.state = st app.db = db @@ -348,6 +350,7 @@ func (app *SpacemeshApp) initServices(instanceName string, swarm server.Service, func (app *SpacemeshApp) startServices() { app.blockListener.Start() + app.syncer.Start() err := app.hare.Start() if err != nil { panic("cannot start hare") diff --git a/config/config.go b/config/config.go index d7d1a8617b..c0e0f72a7f 100644 --- a/config/config.go +++ b/config/config.go @@ -63,7 +63,7 @@ type BaseConfig struct { OracleServerWorldId uint64 `mapstructure:"oracle_server_worldid"` GenesisTime string `mapstructure:"genesis-time"` - LayerDurationSec uint32 `mapstructure:"layer-duration-sec"` + LayerDurationSec int `mapstructure:"layer-duration-sec"` } // DefaultConfig returns the default configuration for a spacemesh node diff --git a/consensus/algorithm.go b/consensus/algorithm.go index 8cbfd4fe92..2167413b9e 100644 --- a/consensus/algorithm.go +++ b/consensus/algorithm.go @@ -18,9 +18,10 @@ type Tortoise interface { } func NewAlgorithm(trtl Tortoise) *Algorithm { - return &Algorithm{Tortoise: trtl} + alg := &Algorithm{Tortoise: trtl} + alg.HandleIncomingLayer(GenesisLayer()) + return alg } - func (alg *Algorithm) HandleLateBlock(b *mesh.Block) { //todo feed all layers from b's layer to tortoise log.Info("received block with layer Id %v block id: %v ", b.Layer(), b.ID()) @@ -55,7 +56,6 @@ func (alg *Algorithm) ContextualValidity(id mesh.BlockID) bool { } func CreateGenesisBlock() *mesh.Block { - log.Info("Creating genesis") bl := &mesh.Block{ Id: mesh.BlockID(config.GenesisId), LayerIndex: 0, @@ -64,8 +64,7 @@ func CreateGenesisBlock() *mesh.Block { return bl } -func createGenesisLayer() *mesh.Layer { - log.Info("Creating genesis") +func GenesisLayer() *mesh.Layer { l := mesh.NewLayer(Genesis) l.AddBlock(CreateGenesisBlock()) return l diff --git a/consensus/ninja_tortoise.go b/consensus/ninja_tortoise.go index 9b5bae61b0..29fab5dc29 100644 --- a/consensus/ninja_tortoise.go +++ b/consensus/ninja_tortoise.go @@ -82,9 +82,9 @@ type ninjaTortoise struct { tPatSupport map[votingPattern]map[mesh.LayerID]votingPattern //pattern support count } -func NewNinjaTortoise(layerSize uint32) *ninjaTortoise { +func NewNinjaTortoise(layerSize uint32, log log.Log) *ninjaTortoise { return &ninjaTortoise{ - Log: log.New("optimized tortoise ", "", ""), + Log: log, avgLayerSize: layerSize, pBase: votingPattern{}, blocks: map[mesh.BlockID]*mesh.Block{}, diff --git a/consensus/ninja_tortoise_test.go b/consensus/ninja_tortoise_test.go index 1d4fb763d2..8a8f9fa7f8 100644 --- a/consensus/ninja_tortoise_test.go +++ b/consensus/ninja_tortoise_test.go @@ -49,8 +49,8 @@ func TestNinjaTortoise_GlobalOpinion(t *testing.T) { func TestForEachInView(t *testing.T) { blocks := make(map[mesh.BlockID]*mesh.Block) - alg := NewNinjaTortoise(2) - l := createGenesisLayer() + alg := NewNinjaTortoise(2, log.New("TestForEachInView", "", "")) + l := GenesisLayer() for _, b := range l.Blocks() { blocks[b.ID()] = b } @@ -93,8 +93,8 @@ func TestNinjaTortoise_UpdatePatternTally(t *testing.T) { func TestNinjaTortoise_Sanity1(t *testing.T) { layerSize := 30 patternSize := layerSize - alg := NewNinjaTortoise(uint32(layerSize)) - l1 := createGenesisLayer() + alg := NewNinjaTortoise(uint32(layerSize), log.New("TestNinjaTortoise_Sanity1", "", "")) + l1 := GenesisLayer() genesisId := l1.Blocks()[0].ID() alg.handleIncomingLayer(l1) l := createLayerWithRandVoting(l1.Index()+1, []*mesh.Layer{l1}, layerSize, 1) @@ -116,7 +116,7 @@ func TestNinjaTortoise_Sanity1(t *testing.T) { //vote explicitly for two previous layers //correction vectors compensate for double count func TestNinjaTortoise_Sanity2(t *testing.T) { - alg := NewNinjaTortoise(uint32(3)) + alg := NewNinjaTortoise(uint32(3), log.New("TestNinjaTortoise_Sanity2", "", "")) l := createMulExplicitLayer(0, map[mesh.LayerID]*mesh.Layer{}, nil, 1) l1 := createMulExplicitLayer(1, map[mesh.LayerID]*mesh.Layer{l.Index(): l}, map[mesh.LayerID][]int{0: {0}}, 3) l2 := createMulExplicitLayer(2, map[mesh.LayerID]*mesh.Layer{l1.Index(): l1}, map[mesh.LayerID][]int{1: {0, 1, 2}}, 3) diff --git a/consensus/tortoise_test.go b/consensus/tortoise_test.go index 938172ed90..bcb96d239d 100644 --- a/consensus/tortoise_test.go +++ b/consensus/tortoise_test.go @@ -13,7 +13,7 @@ func TestAlgorithm_Sanity(t *testing.T) { cachedLayers := 100 alg := NewTortoise(uint32(layerSize), uint32(cachedLayers)) - l := createGenesisLayer() + l := GenesisLayer() alg.HandleIncomingLayer(l) alg.RegisterLayerCallback(func(id mesh.LayerID) {}) for i := 0; i < 11-1; i++ { diff --git a/hare/hare.go b/hare/hare.go index 448de58c4a..83534c226f 100644 --- a/hare/hare.go +++ b/hare/hare.go @@ -36,7 +36,7 @@ type TerminationOutput interface { } type orphanBlockProvider interface { - GetUnverifiedLayerBlocks(layerId mesh.LayerID) []mesh.BlockID + GetUnverifiedLayerBlocks(layerId mesh.LayerID) ([]mesh.BlockID, error) } // Hare is an orchestrator that shoots consensus processes and collects their termination output @@ -160,9 +160,9 @@ func (h *Hare) onTick(id mesh.LayerID) { } // retrieve set form orphan blocks - blocks := h.obp.GetUnverifiedLayerBlocks(h.lastLayer) - if len(blocks) == 0 { - log.Info("No blocks for consensus on layer %v", id) + blocks, err := h.obp.GetUnverifiedLayerBlocks(h.lastLayer) + if err != nil { + log.Error("No blocks for consensus on layer %v %v", id, err) return } diff --git a/hare/hare_test.go b/hare/hare_test.go index d5115af89a..3741802612 100644 --- a/hare/hare_test.go +++ b/hare/hare_test.go @@ -284,7 +284,7 @@ func TestHare_onTick(t *testing.T) { wg.Add(2) go func() { wg.Done() - layerTicker <- mesh.LayerID(0) + layerTicker <- 0 <-createdChan <-nmcp.CloseChannel() wg.Done() @@ -304,7 +304,7 @@ func TestHare_onTick(t *testing.T) { wg.Add(2) go func() { wg.Done() - layerTicker <- mesh.LayerID(1) + layerTicker <- 1 h.Close() wg.Done() }() diff --git a/hare/orphan_mock.go b/hare/orphan_mock.go index dfb553e1a9..d0fb9e4e1f 100644 --- a/hare/orphan_mock.go +++ b/hare/orphan_mock.go @@ -13,9 +13,9 @@ func (op *orphanMock) GetOrphanBlocks() []mesh.BlockID { return []mesh.BlockID{} } -func (op *orphanMock) GetUnverifiedLayerBlocks(l mesh.LayerID) []mesh.BlockID { +func (op *orphanMock) GetUnverifiedLayerBlocks(l mesh.LayerID) ([]mesh.BlockID, error) { if op.f != nil { - return op.f() + return op.f(), nil } - return []mesh.BlockID{} + return []mesh.BlockID{}, nil } diff --git a/mesh/mesh.go b/mesh/mesh.go index 95fc52b24b..0b72dbb2da 100644 --- a/mesh/mesh.go +++ b/mesh/mesh.go @@ -39,6 +39,7 @@ type Mesh struct { tortoise MeshValidator state StateUpdater orphMutex sync.RWMutex + done chan struct{} } func NewMesh(layers, blocks, validity database.DB, mesh MeshValidator, state StateUpdater, logger log.Log) *Mesh { @@ -47,8 +48,10 @@ func NewMesh(layers, blocks, validity database.DB, mesh MeshValidator, state Sta Log: logger, tortoise: mesh, state: state, - meshDB: NewMeshDB(layers, blocks, validity), + done: make(chan struct{}), + meshDB: NewMeshDB(layers, blocks, validity, logger), } + return ll } @@ -71,10 +74,6 @@ func (m *Mesh) VerifiedLayer() uint32 { return atomic.LoadUint32(&m.verifiedLayer) } -func (m *Mesh) LatestReceivedLayer() uint32 { //maybe switch names with latestlayer? - return atomic.LoadUint32(&m.lastSeenLayer) -} - func (m *Mesh) LatestLayer() uint32 { defer m.lkMutex.RUnlock() m.lkMutex.RLock() @@ -85,31 +84,11 @@ func (m *Mesh) SetLatestLayer(idx uint32) { defer m.lkMutex.Unlock() m.lkMutex.Lock() if idx > m.latestLayer { - m.Debug("set latest known layer to ", idx) + m.Info("set latest known layer to ", idx) m.latestLayer = idx } } -func (m *Mesh) AddLayer(layer *Layer) error { - m.lMutex.Lock() - defer m.lMutex.Unlock() - count := LayerID(m.LatestReceivedLayer()) - if count > layer.Index() { - m.Debug("can't add layer ", layer.Index(), "(already exists)") - return errors.New("can't add layer (already exists)") - } - - if count+1 < layer.Index() { - m.Debug("can't add layer", layer.Index(), " missing previous layers") - return errors.New("can't add layer missing previous layers") - } - atomic.StoreUint32(&m.lastSeenLayer, uint32(layer.Index())) - m.addLayer(layer) - m.Log.With().Info("added layer", log.Uint32("id", uint32(layer.Index())), log.Int("num of blocks", len(layer.blocks))) - m.SetLatestLayer(uint32(layer.Index())) - return nil -} - func (m *Mesh) ValidateLayer(layer *Layer) { m.Info("Validate layer %d", layer.Index()) oldPbase, newPbase := m.tortoise.HandleIncomingLayer(layer) @@ -163,9 +142,9 @@ func (m *Mesh) GetLayer(i LayerID) (*Layer, error) { } func (m *Mesh) AddBlock(block *Block) error { - log.Debug("add block ", block.ID()) + m.Debug("add block %d", block.ID()) if err := m.addBlock(block); err != nil { - m.Error("failed to add block ", block.ID(), " ", err) + m.Error("failed to add block %v %v", block.ID(), err) return err } m.SetLatestLayer(uint32(block.Layer())) @@ -197,24 +176,23 @@ func (m *Mesh) handleOrphanBlocks(block *Block) { } } -func (m *Mesh) GetUnverifiedLayerBlocks(l LayerID) []BlockID { +func (m *Mesh) GetUnverifiedLayerBlocks(l LayerID) ([]BlockID, error) { x, err := m.meshDB.layers.Get(l.ToBytes()) if err != nil { - panic(fmt.Sprintf("could not retrive latest layer = %d blocks ", l)) + return nil, errors.New(fmt.Sprintf("could not retrive layer = %d blocks ", l)) } blockIds, err := bytesToBlockIds(x) if err != nil { - panic(fmt.Sprintf("could bytes to id array for layer %d ", l)) + return nil, errors.New(fmt.Sprintf("could not desirialize layer to id array for layer %d ", l)) } arr := make([]BlockID, 0, len(blockIds)) for bid := range blockIds { arr = append(arr, bid) } - return arr + return arr, nil } -//todo better thread safety -func (m *Mesh) GetOrphanBlocksExcept(l LayerID) []BlockID { +func (m *Mesh) GetOrphanBlocksBefore(l LayerID) ([]BlockID, error) { m.orphMutex.RLock() defer m.orphMutex.RUnlock() ids := map[BlockID]struct{}{} @@ -226,14 +204,14 @@ func (m *Mesh) GetOrphanBlocksExcept(l LayerID) []BlockID { } } - prevLayer, err := m.getLayer(l - 1) + blocks, err := m.GetUnverifiedLayerBlocks(l - 1) if err != nil { - panic(fmt.Sprint("failed getting latest layer ", err)) + return nil, errors.New(fmt.Sprint("failed getting latest layer ", err)) } //add last layer blocks - for _, b := range prevLayer.Blocks() { - ids[b.ID()] = struct{}{} + for _, b := range blocks { + ids[b] = struct{}{} } idArr := make([]BlockID, 0, len(ids)) @@ -244,11 +222,11 @@ func (m *Mesh) GetOrphanBlocksExcept(l LayerID) []BlockID { sort.Slice(idArr, func(i, j int) bool { return idArr[i] < idArr[j] }) m.Info("orphans for layer %d are %v", l, idArr) - return idArr + return idArr, nil } func (m *Mesh) GetBlock(id BlockID) (*Block, error) { - m.Debug("get block ", id) + m.Debug("get block %d", id) return m.getBlock(id) } diff --git a/mesh/mesh_test.go b/mesh/mesh_test.go index a0d3a4796d..a976971b2a 100644 --- a/mesh/mesh_test.go +++ b/mesh/mesh_test.go @@ -31,7 +31,6 @@ func getMesh(id string) *Mesh { bdb := database.NewMemDatabase() ldb := database.NewMemDatabase() cdb := database.NewMemDatabase() - layers := NewMesh(ldb, bdb, cdb, &MeshValidatorMock{}, &MockState{}, log.New(id, "", "")) return layers } @@ -69,7 +68,11 @@ func TestLayers_AddLayer(t *testing.T) { l, err := layers.GetLayer(id) assert.True(t, err != nil, "error: ", err) - err = layers.AddLayer(NewExistingLayer(1, []*Block{block1, block2, block3})) + err = layers.AddBlock(block1) + assert.NoError(t, err) + err = layers.AddBlock(block2) + assert.NoError(t, err) + err = layers.AddBlock(block3) assert.NoError(t, err) l, err = layers.GetLayer(id) assert.NoError(t, err) @@ -84,14 +87,12 @@ func TestLayers_AddWrongLayer(t *testing.T) { block2 := NewBlock(true, nil, time.Now(), 2) block3 := NewBlock(true, nil, time.Now(), 4) l1 := NewExistingLayer(1, []*Block{block1}) - layers.AddLayer(l1) + layers.AddBlock(block1) layers.ValidateLayer(l1) l2 := NewExistingLayer(2, []*Block{block2}) - layers.AddLayer(l2) + layers.AddBlock(block2) layers.ValidateLayer(l2) - l3 := NewExistingLayer(4, []*Block{block3}) - layers.AddLayer(l3) - layers.ValidateLayer(l3) + layers.AddBlock(block3) _, err := layers.GetVerifiedLayer(1) assert.True(t, err == nil, "error: ", err) _, err1 := layers.GetVerifiedLayer(2) @@ -107,30 +108,16 @@ func TestLayers_GetLayer(t *testing.T) { block2 := NewBlock(true, nil, time.Now(), 1) block3 := NewBlock(true, nil, time.Now(), 1) l1 := NewExistingLayer(1, []*Block{block1}) - layers.AddLayer(l1) + layers.AddBlock(block1) layers.ValidateLayer(l1) l, err := layers.GetVerifiedLayer(0) - layers.AddLayer(NewExistingLayer(3, []*Block{block2})) - layers.AddLayer(NewExistingLayer(2, []*Block{block3})) + layers.AddBlock(block2) + layers.AddBlock(block3) l, err = layers.GetVerifiedLayer(1) assert.True(t, err == nil, "error: ", err) assert.True(t, l.Index() == 1, "wrong layer") } -func TestLayers_LocalLayerCount(t *testing.T) { - layers := getMesh("t5") - defer layers.Close() - block1 := NewBlock(true, nil, time.Now(), 1) - block2 := NewBlock(true, nil, time.Now(), 4) - block3 := NewBlock(true, nil, time.Now(), 2) - block4 := NewBlock(true, nil, time.Now(), 1) - layers.AddLayer(NewExistingLayer(1, []*Block{block1})) - layers.AddLayer(NewExistingLayer(4, []*Block{block2})) - layers.AddLayer(NewExistingLayer(2, []*Block{block3})) - layers.AddLayer(NewExistingLayer(3, []*Block{block4})) - assert.Equal(t, uint32(3), layers.LatestLayer(), "wrong layer count") -} - func TestLayers_LatestKnownLayer(t *testing.T) { layers := getMesh("t6") defer layers.Close() @@ -165,10 +152,13 @@ func TestLayers_OrphanBlocks(t *testing.T) { layers.AddBlock(block2) layers.AddBlock(block3) layers.AddBlock(block4) - assert.True(t, len(layers.GetOrphanBlocksExcept(3)) == 4, "wrong layer") - assert.Equal(t, len(layers.GetOrphanBlocksExcept(2)), 2) + arr, _ := layers.GetOrphanBlocksBefore(3) + assert.True(t, len(arr) == 4, "wrong layer") + arr2, _ := layers.GetOrphanBlocksBefore(2) + assert.Equal(t, len(arr2), 2) layers.AddBlock(block5) time.Sleep(1 * time.Second) - assert.True(t, len(layers.GetOrphanBlocksExcept(4)) == 1, "wrong layer") + arr3, _ := layers.GetOrphanBlocksBefore(4) + assert.True(t, len(arr3) == 1, "wrong layer") } diff --git a/mesh/meshdb.go b/mesh/meshdb.go index 08361ee9ff..a63c408bbf 100644 --- a/mesh/meshdb.go +++ b/mesh/meshdb.go @@ -15,6 +15,7 @@ type layerMutex struct { } type meshDB struct { + log.Log layers database.DB blocks database.DB contextualValidity database.DB //map blockId to contextualValidation state of block @@ -24,8 +25,9 @@ type meshDB struct { lhMutex sync.Mutex } -func NewMeshDB(layers, blocks, validity database.DB) *meshDB { +func NewMeshDB(layers, blocks, validity database.DB, log log.Log) *meshDB { ll := &meshDB{ + Log: log, blocks: blocks, layers: layers, contextualValidity: validity, @@ -161,7 +163,7 @@ func (m *meshDB) updateLayerWithBlock(block *Block) error { return errors.New("could not get all blocks from database ") } } - log.Info("added block %v to layer %v", block.ID(), block.LayerIndex) + m.Info("added block %v to layer %v", block.ID(), block.LayerIndex) blockIds[block.ID()] = true w, err := blockIdsAsBytes(blockIds) if err != nil { diff --git a/miner/block_builder.go b/miner/block_builder.go index 804014ab1d..5353b02364 100644 --- a/miner/block_builder.go +++ b/miner/block_builder.go @@ -2,6 +2,7 @@ package miner import ( "bytes" + "errors" "fmt" "github.com/spacemeshos/go-spacemesh/address" "github.com/spacemeshos/go-spacemesh/common" @@ -109,7 +110,7 @@ type WeakCoinProvider interface { } type OrphanBlockProvider interface { - GetUnverifiedLayerBlocks(l mesh.LayerID) []mesh.BlockID + GetOrphanBlocksBefore(l mesh.LayerID) ([]mesh.BlockID, error) } //used from external API call? @@ -121,22 +122,25 @@ func (t *BlockBuilder) AddTransaction(nonce uint64, origin, destination address. return nil } -func (t *BlockBuilder) createBlock(id mesh.LayerID, txs []mesh.SerializableTransaction) mesh.Block { +func (t *BlockBuilder) createBlock(id mesh.LayerID, txs []mesh.SerializableTransaction) (*mesh.Block, error) { var res []mesh.BlockID = nil var err error - - if id == 0 { - panic("cannot create block in layer 0") - } - - if id == 1 { + if id == config.Genesis { + return nil, errors.New("cannot create block in genesis layer ") + } else if id == config.Genesis+1 { res = append(res, config.GenesisId) } else { res, err = t.hareResult.GetResult(id - 1) if err != nil { - t.Log.Error("didnt receive hare result for layer %v", id-1) + return nil, errors.New(fmt.Sprintf("didnt receive hare result for layer %v", id-1)) } } + + viewEdges, err := t.orphans.GetOrphanBlocksBefore(id) + if err != nil { + return nil, err + } + b := mesh.Block{ MinerID: t.minerID, Id: mesh.BlockID(rand.Int63()), @@ -146,11 +150,11 @@ func (t *BlockBuilder) createBlock(id mesh.LayerID, txs []mesh.SerializableTrans Timestamp: time.Now().UnixNano(), Txs: txs, BlockVotes: res, - ViewEdges: t.orphans.GetUnverifiedLayerBlocks(id - 1), + ViewEdges: viewEdges, } t.Log.Info("Iv'e created block in layer %v id %v, num of transactions %v votes %d viewEdges %d", b.LayerIndex, b.Id, len(b.Txs), len(b.BlockVotes), len(b.ViewEdges)) - return b + return &b, nil } func (t *BlockBuilder) listenForTx() { @@ -182,15 +186,19 @@ func (t *BlockBuilder) acceptBlockData() { return case id := <-t.beginRoundEvent: - if !t.blockOracle.BlockEligible(id, t.minerID) { + if !t.blockOracle.BlockEligible(mesh.LayerID(id), t.minerID) { break } txList := t.transactionQueue[:common.Min(len(t.transactionQueue), MaxTransactionsPerBlock)] t.transactionQueue = t.transactionQueue[common.Min(len(t.transactionQueue), MaxTransactionsPerBlock):] - blk := t.createBlock(id, txList) + blk, err := t.createBlock(mesh.LayerID(id), txList) + if err != nil { + t.Error("cannot create new block, %v ", err) + continue + } go func() { - bytes, err := mesh.BlockAsBytes(blk) + bytes, err := mesh.BlockAsBytes(*blk) if err != nil { t.Log.Error("cannot serialize block %v", err) return diff --git a/miner/builder_test.go b/miner/builder_test.go index a31113983c..f9ab4a50cb 100644 --- a/miner/builder_test.go +++ b/miner/builder_test.go @@ -34,8 +34,8 @@ type MockOrphans struct { st []mesh.BlockID } -func (m MockOrphans) GetUnverifiedLayerBlocks(l mesh.LayerID) []mesh.BlockID { - return m.st +func (m MockOrphans) GetOrphanBlocksBefore(l mesh.LayerID) ([]mesh.BlockID, error) { + return m.st, nil } type mockBlockOracle struct { @@ -104,7 +104,7 @@ func TestBlockBuilder_CreateBlock(t *testing.T) { builder.AddTransaction(trans[1].AccountNonce, trans[1].Origin, *trans[1].Recipient, big.NewInt(0).SetBytes(trans[1].Price)) builder.AddTransaction(trans[2].AccountNonce, trans[2].Origin, *trans[2].Recipient, big.NewInt(0).SetBytes(trans[2].Price)) - go func() { beginRound <- mesh.LayerID(2) }() + go func() { beginRound <- 2 }() select { case output := <-receiver.RegisterGossipProtocol(sync.NewBlockProtocol): diff --git a/p2p/server/msgserver.go b/p2p/server/msgserver.go index 52b9c650fd..6d5ed2d31d 100644 --- a/p2p/server/msgserver.go +++ b/p2p/server/msgserver.go @@ -176,7 +176,7 @@ func (p *MessageServer) SendRequest(msgType MessageType, payload []byte, address p.removeFromPending(reqID) return sendErr } - log.Debug("sent request number %v", reqID) + p.Debug("sent request id: %v", reqID) return nil } diff --git a/sync/block_listener.go b/sync/block_listener.go index 313405aaa7..8851bc9668 100644 --- a/sync/block_listener.go +++ b/sync/block_listener.go @@ -46,7 +46,6 @@ func (bl *BlockListener) Start() { if atomic.CompareAndSwapUint32(&bl.startLock, 0, 1) { go bl.run() go bl.ListenToGossipBlocks() - go bl.onTick() } } @@ -54,7 +53,7 @@ func (bl *BlockListener) OnNewBlock(b *mesh.Block) { bl.addUnknownToQueue(b) } -func NewBlockListener(net server.Service, bv BlockValidator, layers *mesh.Mesh, timeout time.Duration, concurrency int, clock TickProvider, logger log.Log) *BlockListener { +func NewBlockListener(net server.Service, bv BlockValidator, layers *mesh.Mesh, timeout time.Duration, concurrency int, logger log.Log) *BlockListener { bl := BlockListener{ BlockValidator: bv, @@ -66,7 +65,6 @@ func NewBlockListener(net server.Service, bv BlockValidator, layers *mesh.Mesh, unknownQueue: make(chan mesh.BlockID, 200), //todo tune buffer size + get buffer from config exit: make(chan struct{}), receivedGossipBlocks: net.RegisterGossipProtocol(NewBlockProtocol), - tick: clock.Subscribe(), } bl.RegisterMsgHandler(BLOCK, newBlockRequestHandler(layers, logger)) @@ -119,27 +117,6 @@ func (bl *BlockListener) run() { } } -func (bl *BlockListener) onTick() { - for { - select { - case <-bl.exit: - bl.Logger.Info("run stopped") - return - case newLayer := <-bl.tick: //todo: should this be here or in own loop? - if newLayer == 0 { - break - } - //log.Info("new layer tick in listener") - l, err := bl.GetLayer(newLayer - 1) //bl.CreateLayer(newLayer - 1) - if err != nil { - log.Error("error getting layer %v : %v", newLayer-1, err) - break - } - go bl.Mesh.ValidateLayer(l) - } - } -} - //todo handle case where no peer knows the block func (bl *BlockListener) FetchBlock(id mesh.BlockID) { for _, p := range bl.GetPeers() { diff --git a/sync/block_listener_test.go b/sync/block_listener_test.go index 930a7df6a2..3f91be8a89 100644 --- a/sync/block_listener_test.go +++ b/sync/block_listener_test.go @@ -33,9 +33,10 @@ func (pm PeersMock) GetPeers() []p2p.Peer { func (pm PeersMock) Close() { return } + func ListenerFactory(serv server.Service, peers p2p.Peers, name string) *BlockListener { - nbl := NewBlockListener(serv, BlockValidatorMock{}, getMesh(memoryDB, "TestBlockListener_"+name), 1*time.Second, 2, &ClockMock{}, log.New(name, "", "")) + nbl := NewBlockListener(serv, BlockValidatorMock{}, getMesh(memoryDB, "TestBlockListener_"+name), 1*time.Second, 2, log.New(name, "", "")) nbl.Peers = peers //override peers with mock return nbl } diff --git a/sync/syncer.go b/sync/syncer.go index 3135bf3736..edc437f1f8 100644 --- a/sync/syncer.go +++ b/sync/syncer.go @@ -10,6 +10,7 @@ import ( "github.com/spacemeshos/go-spacemesh/p2p/server" "github.com/spacemeshos/go-spacemesh/p2p/service" "github.com/spacemeshos/go-spacemesh/sync/pb" + "github.com/spacemeshos/go-spacemesh/timesync" "sync" "sync/atomic" "time" @@ -20,11 +21,10 @@ type BlockValidator interface { } type Configuration struct { - hdist uint32 //dist of consensus layers from newst layer - syncInterval time.Duration - concurrency int //number of workers for sync method - layerSize int - requestTimeout time.Duration + SyncInterval time.Duration + Concurrency int //number of workers for sync method + LayerSize int + RequestTimeout time.Duration } type Syncer struct { @@ -34,10 +34,12 @@ type Syncer struct { Configuration log.Log *server.MessageServer - SyncLock uint32 - startLock uint32 - forceSync chan bool - exit chan struct{} + currentLayer uint32 + SyncLock uint32 + startLock uint32 + forceSync chan bool + clock timesync.LayerTimer + exit chan struct{} } func (s *Syncer) ForceSync() { @@ -76,41 +78,40 @@ func (s *Syncer) Start() { //fires a sync every sm.syncInterval or on force space from outside func (s *Syncer) run() { - syncTicker := time.NewTicker(s.syncInterval) + foo := func() { + if atomic.CompareAndSwapUint32(&s.SyncLock, IDLE, RUNNING) { + s.Synchronise() + atomic.StoreUint32(&s.SyncLock, IDLE) + } + } for { - doSync := false select { case <-s.exit: - log.Debug("run stoped") + s.Debug("run stoped") return - case doSync = <-s.forceSync: - case <-syncTicker.C: - doSync = true - } - if doSync { - go func() { - if atomic.CompareAndSwapUint32(&s.SyncLock, IDLE, RUNNING) { - s.Debug("do sync") - s.Synchronise() - atomic.StoreUint32(&s.SyncLock, IDLE) - } - }() + case <-s.forceSync: + go foo() + case layer := <-s.clock: + atomic.StoreUint32(&s.currentLayer, uint32(layer)) + s.Info("sync got tick for layer %v", layer) + go foo() } } } //fires a sync every sm.syncInterval or on force space from outside -func NewSync(srv server.Service, layers *mesh.Mesh, bv BlockValidator, conf Configuration, logger log.Log) *Syncer { +func NewSync(srv server.Service, layers *mesh.Mesh, bv BlockValidator, conf Configuration, clock timesync.LayerTimer, logger log.Log) *Syncer { s := Syncer{ BlockValidator: bv, Configuration: conf, Log: logger, Mesh: layers, Peers: p2p.NewPeers(srv), - MessageServer: server.NewMsgServer(srv, syncProtocol, conf.requestTimeout-time.Millisecond*30, make(chan service.DirectMessage, config.ConfigValues.BufferSize), logger), + MessageServer: server.NewMsgServer(srv, syncProtocol, conf.RequestTimeout-time.Millisecond*30, make(chan service.DirectMessage, config.ConfigValues.BufferSize), logger), SyncLock: 0, startLock: 0, forceSync: make(chan bool), + clock: clock, exit: make(chan struct{}), } @@ -122,65 +123,54 @@ func NewSync(srv server.Service, layers *mesh.Mesh, bv BlockValidator, conf Conf } func (s *Syncer) maxSyncLayer() uint32 { - if uint32(s.LatestLayer()) < s.hdist { - return 0 - } - - return s.LatestLayer() - s.hdist + return atomic.LoadUint32(&s.currentLayer) } func (s *Syncer) Synchronise() { - log.Info("syncing layer %v to layer %v ", s.LatestReceivedLayer(), s.maxSyncLayer()) - for i := s.LatestReceivedLayer(); i < s.maxSyncLayer(); i++ { - blockIds, err := s.getLayerBlockIDs(mesh.LayerID(i + 1)) //returns a set of all known blocks in the mesh - if err != nil { - log.Error("could not get layer block ids: ", err) - log.Debug("synchronise failed, local layer index is ", i) - return + for currenSyncLayer := s.VerifiedLayer() + 1; currenSyncLayer < s.maxSyncLayer(); currenSyncLayer++ { + s.Info("syncing layer %v to layer %v current consensus layer is %d", s.VerifiedLayer(), currenSyncLayer, atomic.LoadUint32(&s.currentLayer)) + lyr, _ := s.GetLayer(mesh.LayerID(currenSyncLayer)) + if lyr == nil { + lyr, _ = s.getLayerFromNeighbors(currenSyncLayer) } + s.ValidateLayer(lyr) + } +} - output := make(chan *mesh.Block) - // each worker goroutine tries to fetch a block iteratively from each peer - count := int32(s.concurrency) - - for i := 0; i < s.concurrency; i++ { - go func() { - for id := range blockIds { - for _, p := range s.GetPeers() { - if bCh, err := sendBlockRequest(s.MessageServer, p, mesh.BlockID(id), s.Log); err == nil { - if b := <-bCh; b != nil && s.BlockEligible(b.LayerIndex, b.MinerID) { //some validation testing - s.Debug("received block", b) - output <- b - break - } +func (s *Syncer) getLayerFromNeighbors(currenSyncLayer uint32) (*mesh.Layer, error) { + blockIds, err := s.getLayerBlockIDs(mesh.LayerID(currenSyncLayer)) + if err != nil { + return nil, err + } + blocksArr := make([]*mesh.Block, 0, len(blockIds)) + // each worker goroutine tries to fetch a block iteratively from each peer + output := make(chan *mesh.Block) + count := int32(s.Concurrency) + for j := 0; j < s.Concurrency; j++ { + go func() { + for id := range blockIds { + for _, p := range s.GetPeers() { + if bCh, err := sendBlockRequest(s.MessageServer, p, mesh.BlockID(id), s.Log); err == nil { + if b := <-bCh; b != nil && s.BlockEligible(b.LayerIndex, b.MinerID) { //some validation testing + s.Debug("received block", b) + output <- b + break } } } - if atomic.AddInt32(&count, -1); atomic.LoadInt32(&count) == 0 { // last one closes the channel - close(output) - } - }() - } - - blocks := make([]*mesh.Block, 0, len(blockIds)) - - for block := range output { - s.Debug("add block to layer", block) - blocks = append(blocks, block) - } - - s.Debug("add layer ", i) - - l := mesh.NewExistingLayer(mesh.LayerID(i+1), blocks) - err = s.AddLayer(l) - if err != nil { - log.Error("cannot insert layer to db because %v", err) - continue - } - go s.ValidateLayer(l) + } + if atomic.AddInt32(&count, -1); atomic.LoadInt32(&count) == 0 { // last one closes the channel + close(output) + } + }() + } + for block := range output { + s.Debug("add block to layer %v", block) + s.AddBlock(block) + blocksArr = append(blocksArr, block) } - s.Debug("synchronise done, local layer index is ", s.VerifiedLayer(), "most recent is ", s.LatestReceivedLayer()) + return mesh.NewExistingLayer(mesh.LayerID(currenSyncLayer), blocksArr), nil } type peerHashPair struct { @@ -222,7 +212,7 @@ func (s *Syncer) getLayerBlockIDs(index mesh.LayerID) (chan mesh.BlockID, error) m, err := s.getLayerHashes(index) if err != nil { - s.Error("could not get LayerHashes for layer: ", index, err) + s.Error("could not get LayerHashes for layer: ", index) return nil, err } return s.getIdsForHash(m, index) @@ -242,15 +232,16 @@ func (s *Syncer) getIdsForHash(m map[string]p2p.Peer, index mesh.LayerID) (chan reqCounter++ wg.Add(1) go func() { - v := <-c - ch <- v + if v := <-c; v != nil { + ch <- v + } wg.Done() }() } go func() { wg.Wait(); close(ch) }() - idSet := make(map[mesh.BlockID]bool, s.layerSize) //change uint32 to BlockId - timeout := time.After(s.requestTimeout) + idSet := make(map[mesh.BlockID]bool, s.LayerSize) //change uint32 to BlockId + timeout := time.After(s.RequestTimeout) for reqCounter > 0 { select { case b := <-ch: @@ -292,7 +283,7 @@ func (s *Syncer) getLayerHashes(index mesh.LayerID) (map[string]p2p.Peer, error) } // request hash from all wg := sync.WaitGroup{} - ch := make(chan peerHashPair, len(peers)) + ch := make(chan *peerHashPair, len(peers)) resCounter := len(peers) for _, p := range peers { @@ -304,17 +295,21 @@ func (s *Syncer) getLayerHashes(index mesh.LayerID) (map[string]p2p.Peer, error) //merge channels and close when done wg.Add(1) go func() { - v := <-c - ch <- v + if v := <-c; v != nil { + ch <- v + } wg.Done() }() } go func() { wg.Wait(); close(ch) }() - timeout := time.After(s.requestTimeout) + timeout := time.After(s.RequestTimeout) for resCounter > 0 { // Got a timeout! fail with a timeout error select { case pair := <-ch: + if pair == nil { //do nothing on close channel + continue + } m[string(pair.hash)] = pair.peer resCounter-- case <-timeout: @@ -328,9 +323,9 @@ func (s *Syncer) getLayerHashes(index mesh.LayerID) (map[string]p2p.Peer, error) return m, nil } -func (s *Syncer) sendLayerHashRequest(peer p2p.Peer, layer mesh.LayerID) (chan peerHashPair, error) { - s.Debug("send Layer hash request Peer: ", peer, " layer: ", layer) - ch := make(chan peerHashPair) +func (s *Syncer) sendLayerHashRequest(peer p2p.Peer, layer mesh.LayerID) (chan *peerHashPair, error) { + s.Debug("send Layer hash request Peer: %v layer: %v", peer, layer) + ch := make(chan *peerHashPair) data := &pb.LayerHashReq{Layer: uint32(layer)} payload, err := proto.Marshal(data) if err != nil { @@ -349,7 +344,8 @@ func (s *Syncer) sendLayerHashRequest(peer p2p.Peer, layer mesh.LayerID) (chan p s.Error("could not unmarshal layer hash response ", err) return } - ch <- peerHashPair{peer: peer, hash: res.Hash} + s.Debug("got hash response from %v hash: %v layer: %d", peer, res.Hash, layer) + ch <- &peerHashPair{peer: peer, hash: res.Hash} } return ch, s.SendRequest(LAYER_HASH, payload, peer, foo) } @@ -418,7 +414,7 @@ func newLayerHashRequestHandler(layers *mesh.Mesh, logger log.Log) func(msg []by layer, err := layers.GetLayer(mesh.LayerID(req.Layer)) if err != nil { - logger.Error("Error handling layer ", req.Layer, " request message with error:", err) + logger.Error("Error handling layer %d request message with error: %v", req.Layer, err) return nil } diff --git a/sync/syncer_test.go b/sync/syncer_test.go index 86ffe9db41..a72f01e7ae 100644 --- a/sync/syncer_test.go +++ b/sync/syncer_test.go @@ -4,12 +4,14 @@ import ( "errors" "fmt" "github.com/google/uuid" + "github.com/spacemeshos/go-spacemesh/consensus" "github.com/spacemeshos/go-spacemesh/database" "github.com/spacemeshos/go-spacemesh/log" "github.com/spacemeshos/go-spacemesh/mesh" "github.com/spacemeshos/go-spacemesh/p2p" "github.com/spacemeshos/go-spacemesh/p2p/service" "github.com/spacemeshos/go-spacemesh/state" + "github.com/spacemeshos/go-spacemesh/timesync" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/suite" "sync/atomic" @@ -17,26 +19,39 @@ import ( "time" ) -var conf = Configuration{2, 1 * time.Second, 1, 300, 10 * time.Millisecond} +var conf = Configuration{2 * time.Second, 1, 300, 100 * time.Millisecond} const ( levelDB = "LevelDB" memoryDB = "MemoryDB" ) +type MockTimer struct { +} + +func (MockTimer) Now() time.Time { + layout := "2006-01-02T15:04:05.000Z" + str := "2018-11-12T11:45:26.371Z" + start, _ := time.Parse(layout, str) + return start +} + func SyncMockFactory(number int, conf Configuration, name string, dbType string) (syncs []*Syncer, p2ps []*service.Node) { nodes := make([]*Syncer, 0, number) p2ps = make([]*service.Node, 0, number) sim := service.NewSimulator() + tick := 200 * time.Millisecond + layout := "2006-01-02T15:04:05.000Z" + str := "2018-11-12T11:45:26.371Z" + start, _ := time.Parse(layout, str) + ts := timesync.NewTicker(MockTimer{}, tick, start) + tk := ts.Subscribe() for i := 0; i < number; i++ { net := sim.NewNode() name := fmt.Sprintf(name+"_%d", i) l := log.New(name, "", "") - - sync := NewSync(net, getMesh(name+"_"+time.Now().String(), dbType), BlockValidatorMock{}, conf, l) - - //sync := NewSync(net, getMesh(name+"_"+time.Now().String()),conf, l) - + sync := NewSync(net, getMesh(name+"_"+time.Now().String(), dbType), BlockValidatorMock{}, conf, tk, l) + ts.Start() nodes = append(nodes, sync) p2ps = append(p2ps, net) } @@ -71,7 +86,6 @@ func getMeshWithLevelDB(id string) *mesh.Mesh { ldb := database.NewLevelDbStore("layers_test_"+id, nil, nil) cv := database.NewLevelDbStore("contextually_valid_test_"+id, nil, nil) //odb := database.NewLevelDbStore("orphans_test_"+id+"_"+time.String(), nil, nil) - layers := mesh.NewMesh(ldb, bdb, cv, &MeshValidatorMock{}, &stateMock{}, log.New(id, "", "")) return layers } @@ -139,7 +153,7 @@ func TestSyncProtocol_BlockRequest(t *testing.T) { defer syncObj.Close() lid := mesh.LayerID(1) block := mesh.NewExistingBlock(mesh.BlockID(uuid.New().ID()), lid, []byte("data data data")) - syncObj.AddLayer(mesh.NewExistingLayer(lid, []*mesh.Block{block})) + syncObj.AddBlock(block) ch, err := sendBlockRequest(syncObj2.MessageServer, nodes[0].Node.PublicKey(), block.ID(), syncObj.Log) timeout := time.NewTimer(2 * time.Second) @@ -160,9 +174,7 @@ func TestSyncProtocol_LayerHashRequest(t *testing.T) { syncObj2 := syncs[1] defer syncObj2.Close() lid := mesh.LayerID(1) - l := mesh.NewExistingLayer(lid, make([]*mesh.Block, 0, 10)) - l.AddBlock(mesh.NewExistingBlock(mesh.BlockID(123), lid, nil)) - syncObj1.AddLayer(l) + syncObj1.AddBlock(mesh.NewExistingBlock(mesh.BlockID(123), lid, nil)) //syncObj1.ValidateLayer(l) //this is to simulate the approval of the tortoise... timeout := time.NewTimer(2 * time.Second) ch, err := syncObj2.sendLayerHashRequest(nodes[0].Node.PublicKey(), lid) @@ -188,7 +200,12 @@ func TestSyncProtocol_LayerIdsRequest(t *testing.T) { layer.AddBlock(mesh.NewExistingBlock(mesh.BlockID(132), lid, nil)) layer.AddBlock(mesh.NewExistingBlock(mesh.BlockID(111), lid, nil)) layer.AddBlock(mesh.NewExistingBlock(mesh.BlockID(222), lid, nil)) - syncObj1.AddLayer(layer) + + syncObj1.AddBlock(mesh.NewExistingBlock(mesh.BlockID(123), lid, nil)) + syncObj1.AddBlock(mesh.NewExistingBlock(mesh.BlockID(132), lid, nil)) + syncObj1.AddBlock(mesh.NewExistingBlock(mesh.BlockID(111), lid, nil)) + syncObj1.AddBlock(mesh.NewExistingBlock(mesh.BlockID(222), lid, nil)) + ch, err := syncObj.sendLayerIDsRequest(nodes[1].Node.PublicKey(), lid) timeout := time.NewTimer(2 * time.Second) @@ -214,19 +231,6 @@ func TestSyncProtocol_LayerIdsRequest(t *testing.T) { } -func verifyChannelReadWithTimeout(t *testing.T, ch chan interface{}) interface{} { - timeout := time.NewTimer(3 * time.Second) - select { - - case <-timeout.C: - t.Error("timed out ") - return nil - case val := <-ch: - return val - - } -} - func TestSyncProtocol_FetchBlocks(t *testing.T) { syncs, nodes := SyncMockFactory(2, conf, "TestSyncProtocol_FetchBlocks_", memoryDB) syncObj1 := syncs[0] @@ -240,13 +244,13 @@ func TestSyncProtocol_FetchBlocks(t *testing.T) { block2 := mesh.NewExistingBlock(mesh.BlockID(321), 1, nil) block3 := mesh.NewExistingBlock(mesh.BlockID(222), 2, nil) - syncObj1.AddLayer(mesh.NewExistingLayer(0, []*mesh.Block{block1})) - syncObj1.AddLayer(mesh.NewExistingLayer(1, []*mesh.Block{block2})) - syncObj1.AddLayer(mesh.NewExistingLayer(2, []*mesh.Block{block3})) + syncObj1.AddBlock(block1) + syncObj1.AddBlock(block2) + syncObj1.AddBlock(block3) ch, err := syncObj2.sendLayerHashRequest(n1.PublicKey(), 0) timeout := time.NewTimer(3 * time.Second) - var hash peerHashPair + var hash *peerHashPair select { case <-timeout.C: @@ -317,8 +321,6 @@ func TestSyncProtocol_SyncTwoNodes(t *testing.T) { syncObj2.Peers = pm2 //override peers with mock defer syncObj2.Close() - block1 := mesh.NewExistingBlock(mesh.BlockID(111), 0, nil) - block2 := mesh.NewExistingBlock(mesh.BlockID(222), 0, nil) block3 := mesh.NewExistingBlock(mesh.BlockID(333), 1, nil) block4 := mesh.NewExistingBlock(mesh.BlockID(444), 1, nil) block5 := mesh.NewExistingBlock(mesh.BlockID(555), 2, nil) @@ -327,16 +329,16 @@ func TestSyncProtocol_SyncTwoNodes(t *testing.T) { block8 := mesh.NewExistingBlock(mesh.BlockID(888), 3, nil) block9 := mesh.NewExistingBlock(mesh.BlockID(999), 4, nil) block10 := mesh.NewExistingBlock(mesh.BlockID(101), 5, nil) - syncObj1.AddLayer(mesh.NewExistingLayer(0, []*mesh.Block{block1, block2})) - syncObj1.AddLayer(mesh.NewExistingLayer(1, []*mesh.Block{block3, block4})) - syncObj1.AddLayer(mesh.NewExistingLayer(2, []*mesh.Block{block5, block6})) - syncObj1.AddLayer(mesh.NewExistingLayer(3, []*mesh.Block{block7, block8})) - syncObj1.AddLayer(mesh.NewExistingLayer(4, []*mesh.Block{block9})) - syncObj1.AddLayer(mesh.NewExistingLayer(5, []*mesh.Block{block10})) - - timeout := time.After(5 * time.Second) - syncObj2.SetLatestLayer(5) - syncObj1.Start() + syncObj1.AddBlock(block3) + syncObj1.AddBlock(block4) + syncObj1.AddBlock(block5) + syncObj1.AddBlock(block6) + syncObj1.AddBlock(block7) + syncObj1.AddBlock(block8) + syncObj1.AddBlock(block9) + syncObj1.AddBlock(block10) + timeout := time.After(120 * time.Second) + //syncObj1.Start() syncObj2.Start() // Keep trying until we're timed out or got a result or got an error @@ -348,11 +350,9 @@ loop: t.Error("timed out ") return default: - if syncObj2.LatestReceivedLayer() == 3 { + if syncObj2.VerifiedLayer() == 5 { t.Log("done!") break loop - } else { - syncObj2.LatestReceivedLayer() } time.Sleep(100 * time.Millisecond) } @@ -386,8 +386,6 @@ func syncTest(dpType string, t *testing.T) { syncObj3.Peers = getPeersMock([]p2p.Peer{n1.PublicKey(), n2.PublicKey(), n4.PublicKey()}) syncObj4.Peers = getPeersMock([]p2p.Peer{n1.PublicKey(), n2.PublicKey()}) - block1 := mesh.NewExistingBlock(mesh.BlockID(111), 0, nil) - block2 := mesh.NewExistingBlock(mesh.BlockID(222), 0, nil) block3 := mesh.NewExistingBlock(mesh.BlockID(333), 1, nil) block4 := mesh.NewExistingBlock(mesh.BlockID(444), 1, nil) block5 := mesh.NewExistingBlock(mesh.BlockID(555), 2, nil) @@ -397,17 +395,21 @@ func syncTest(dpType string, t *testing.T) { block9 := mesh.NewExistingBlock(mesh.BlockID(999), 4, nil) block10 := mesh.NewExistingBlock(mesh.BlockID(101), 4, nil) - syncObj1.AddLayer(mesh.NewExistingLayer(0, []*mesh.Block{block1, block2})) - syncObj1.AddLayer(mesh.NewExistingLayer(1, []*mesh.Block{block3, block4})) - syncObj1.AddLayer(mesh.NewExistingLayer(2, []*mesh.Block{block5, block6})) - syncObj1.AddLayer(mesh.NewExistingLayer(3, []*mesh.Block{block7, block8})) - syncObj1.AddLayer(mesh.NewExistingLayer(4, []*mesh.Block{block9, block10})) + syncObj1.Mesh.ValidateLayer(consensus.GenesisLayer()) + syncObj2.Mesh.ValidateLayer(consensus.GenesisLayer()) + syncObj3.Mesh.ValidateLayer(consensus.GenesisLayer()) + syncObj4.Mesh.ValidateLayer(consensus.GenesisLayer()) + syncObj1.AddBlock(block3) + syncObj1.AddBlock(block4) + syncObj1.AddBlock(block5) + syncObj1.AddBlock(block6) + syncObj1.AddBlock(block7) + syncObj1.AddBlock(block8) + syncObj1.AddBlock(block9) + syncObj1.AddBlock(block10) - syncObj2.SetLatestLayer(5) syncObj2.Start() - syncObj3.SetLatestLayer(5) syncObj3.Start() - syncObj4.SetLatestLayer(5) syncObj4.Start() // Keep trying until we're timed out or got a result or got an error @@ -420,9 +422,9 @@ loop: case <-timeout: t.Error("timed out ") default: - if syncObj2.LatestReceivedLayer() == 3 && syncObj3.LatestReceivedLayer() == 3 { + if syncObj2.VerifiedLayer() == 4 && syncObj3.VerifiedLayer() == 4 { t.Log("done!") - t.Log(syncObj2.LatestReceivedLayer(), " ", syncObj3.LatestReceivedLayer()) + t.Log(syncObj2.VerifiedLayer(), " ", syncObj3.VerifiedLayer()) break loop } } @@ -463,10 +465,17 @@ func Test_TwoNodes_SyncIntegrationSuite(t *testing.T) { sis.NeighborsCount = 2 sis.name = t.Name() i := uint32(1) + tick := 200 * time.Millisecond + layout := "2006-01-02T15:04:05.000Z" + str := "2018-11-12T11:45:26.371Z" + start, _ := time.Parse(layout, str) + ts := timesync.NewTicker(MockTimer{}, tick, start) + tk := ts.Subscribe() sis.BeforeHook = func(idx int, s p2p.NodeTestInstance) { l := log.New(fmt.Sprintf("%s_%d", sis.name, atomic.LoadUint32(&i)), "", "") - sync := NewSync(s, getMesh(memoryDB, fmt.Sprintf("%s_%s", sis.name, time.Now())), BlockValidatorMock{}, conf, l) + sync := NewSync(s, getMesh(memoryDB, fmt.Sprintf("%s_%s", sis.name, time.Now())), BlockValidatorMock{}, conf, tk, l) sis.syncers = append(sis.syncers, sync) + ts.Start() atomic.AddUint32(&i, 1) } suite.Run(t, sis) @@ -492,11 +501,16 @@ func (sis *syncIntegrationTwoNodes) TestSyncProtocol_TwoNodes() { syncObj2 := sis.syncers[2] defer syncObj2.Close() - syncObj2.AddLayer(mesh.NewExistingLayer(0, []*mesh.Block{block1, block2})) - syncObj2.AddLayer(mesh.NewExistingLayer(1, []*mesh.Block{block3, block4})) - syncObj2.AddLayer(mesh.NewExistingLayer(2, []*mesh.Block{block5, block6})) - syncObj2.AddLayer(mesh.NewExistingLayer(3, []*mesh.Block{block7, block8})) - syncObj2.AddLayer(mesh.NewExistingLayer(4, []*mesh.Block{block9, block10})) + syncObj2.AddBlock(block1) + syncObj2.AddBlock(block2) + syncObj2.AddBlock(block3) + syncObj2.AddBlock(block4) + syncObj2.AddBlock(block5) + syncObj2.AddBlock(block6) + syncObj2.AddBlock(block7) + syncObj2.AddBlock(block8) + syncObj2.AddBlock(block9) + syncObj2.AddBlock(block10) timeout := time.After(60 * time.Second) syncObj1.SetLatestLayer(5) syncObj1.Start() @@ -509,10 +523,11 @@ func (sis *syncIntegrationTwoNodes) TestSyncProtocol_TwoNodes() { t.Error("timed out ") return default: - if syncObj1.LatestReceivedLayer() == 3 { + if syncObj1.VerifiedLayer() == 5 { t.Log("done!") return } + time.Sleep(100 * time.Millisecond) } } } @@ -531,9 +546,16 @@ func Test_Multiple_SyncIntegrationSuite(t *testing.T) { sis.NeighborsCount = 3 sis.name = t.Name() i := uint32(1) + tick := 2 * time.Second + layout := "2006-01-02T15:04:05.000Z" + str := "2018-11-12T11:45:26.371Z" + start, _ := time.Parse(layout, str) + ts := timesync.NewTicker(MockTimer{}, tick, start) + tk := ts.Subscribe() sis.BeforeHook = func(idx int, s p2p.NodeTestInstance) { l := log.New(fmt.Sprintf("%s_%d", sis.name, atomic.LoadUint32(&i)), "", "") - sync := NewSync(s, getMesh(memoryDB, fmt.Sprintf("%s_%d_%s", sis.name, atomic.LoadUint32(&i), time.Now())), BlockValidatorMock{}, conf, l) + sync := NewSync(s, getMesh(memoryDB, fmt.Sprintf("%s_%d_%s", sis.name, atomic.LoadUint32(&i), time.Now())), BlockValidatorMock{}, conf, tk, l) + ts.Start() sis.syncers = append(sis.syncers, sync) atomic.AddUint32(&i, 1) } @@ -543,16 +565,13 @@ func Test_Multiple_SyncIntegrationSuite(t *testing.T) { func (sis *syncIntegrationMultipleNodes) TestSyncProtocol_MultipleNodes() { t := sis.T() - block1 := mesh.NewExistingBlock(mesh.BlockID(111), 0, nil) block2 := mesh.NewExistingBlock(mesh.BlockID(222), 1, nil) block3 := mesh.NewExistingBlock(mesh.BlockID(333), 2, nil) block4 := mesh.NewExistingBlock(mesh.BlockID(444), 2, nil) block5 := mesh.NewExistingBlock(mesh.BlockID(555), 3, nil) block6 := mesh.NewExistingBlock(mesh.BlockID(666), 3, nil) - //block7 := mesh.NewExistingBlock(mesh.BlockID(777), 4, nil) - //block8 := mesh.NewExistingBlock(mesh.BlockID(888), 4, nil) - //block9 := mesh.NewExistingBlock(mesh.BlockID(999), 5, nil) - //block10 := mesh.NewExistingBlock(mesh.BlockID(101), 5, nil) + block7 := mesh.NewExistingBlock(mesh.BlockID(777), 4, nil) + block8 := mesh.NewExistingBlock(mesh.BlockID(888), 4, nil) syncObj1 := sis.syncers[0] defer syncObj1.Close() @@ -565,21 +584,18 @@ func (sis *syncIntegrationMultipleNodes) TestSyncProtocol_MultipleNodes() { syncObj5 := sis.syncers[4] defer syncObj5.Close() - syncObj4.AddLayer(mesh.NewExistingLayer(0, []*mesh.Block{block1})) - syncObj4.AddLayer(mesh.NewExistingLayer(1, []*mesh.Block{block2})) - syncObj4.AddLayer(mesh.NewExistingLayer(2, []*mesh.Block{block3, block4})) - syncObj4.AddLayer(mesh.NewExistingLayer(3, []*mesh.Block{block5, block6})) - - timeout := time.After(2 * 60 * time.Second) - syncObj2.SetLatestLayer(5) - syncObj3.SetLatestLayer(5) - syncObj1.SetLatestLayer(5) - syncObj5.SetLatestLayer(5) + syncObj4.AddBlock(block2) + syncObj4.AddBlock(block3) + syncObj4.AddBlock(block4) + syncObj4.AddBlock(block5) + syncObj4.AddBlock(block6) + syncObj4.AddBlock(block7) + syncObj4.AddBlock(block8) + timeout := time.After(30 * time.Second) syncObj1.Start() syncObj2.Start() syncObj3.Start() - syncObj4.Start() syncObj5.Start() // Keep trying until we're timed out or got a result or got an error @@ -590,17 +606,18 @@ func (sis *syncIntegrationMultipleNodes) TestSyncProtocol_MultipleNodes() { t.Error("timed out ") goto end default: - if syncObj2.LatestReceivedLayer() == 3 || syncObj4.LatestReceivedLayer() == 3 { + if syncObj1.VerifiedLayer() >= 3 || syncObj2.VerifiedLayer() >= 3 || syncObj3.VerifiedLayer() >= 3 || syncObj5.VerifiedLayer() >= 3 { t.Log("done!") goto end } + time.Sleep(1 * time.Millisecond) } } end: - log.Debug("sync 1 ", syncObj1.LatestReceivedLayer()) - log.Debug("sync 2 ", syncObj2.LatestReceivedLayer()) - log.Debug("sync 3 ", syncObj3.LatestReceivedLayer()) - log.Debug("sync 4 ", syncObj4.LatestReceivedLayer()) - log.Debug("sync 5 ", syncObj5.LatestReceivedLayer()) + log.Debug("sync 1 ", syncObj1.VerifiedLayer()) + log.Debug("sync 2 ", syncObj2.VerifiedLayer()) + log.Debug("sync 3 ", syncObj3.VerifiedLayer()) + log.Debug("sync 4 ", syncObj4.VerifiedLayer()) + log.Debug("sync 5 ", syncObj5.VerifiedLayer()) return } diff --git a/timesync/clock_test.go b/timesync/clock_test.go index ab21afca7e..8e1d2814a8 100644 --- a/timesync/clock_test.go +++ b/timesync/clock_test.go @@ -1,7 +1,6 @@ package timesync import ( - "github.com/spacemeshos/go-spacemesh/mesh" "github.com/stretchr/testify/assert" "testing" "time" @@ -65,6 +64,6 @@ func TestTicker_StartClock_LayerID(t *testing.T) { ts := NewTicker(MockTimer{}, tick, start) ts.updateLayerID() - assert.Equal(t, mesh.LayerID(6), ts.currentLayer) + assert.Equal(t, 6, int(ts.currentLayer)) ts.Stop() }