Skip to content

Commit

Permalink
Ms1 fixes (#577)
Browse files Browse the repository at this point in the history
* remove panics, better logs
* integrate sync into app
* send layer clock to mesh
* name change
* use layerTimer as syncTrigger
* fix sync test
* add state root print for flaky test
* fix test change to 10 instances
* fix concurrent layer handling
  • Loading branch information
almogdepaz committed Feb 25, 2019
1 parent c5ff047 commit c4d6867
Show file tree
Hide file tree
Showing 22 changed files with 310 additions and 335 deletions.
29 changes: 17 additions & 12 deletions app/app_test.go
Expand Up @@ -5,6 +5,7 @@ import (
"github.com/spacemeshos/go-spacemesh/address"
"github.com/spacemeshos/go-spacemesh/api/config"
"github.com/spacemeshos/go-spacemesh/hare"
"github.com/spacemeshos/go-spacemesh/log"
"github.com/spacemeshos/go-spacemesh/mesh"
"github.com/spacemeshos/go-spacemesh/miner"
"github.com/spacemeshos/go-spacemesh/oracle"
Expand Down Expand Up @@ -107,7 +108,7 @@ func (app *AppTestSuite) TestMultipleNodes() {

txbytes, _ := mesh.TransactionAsBytes(&tx)
path := "../tmp/test/state_" + time.Now().String()
app.initMultipleInstances(app.T(), 2, path)
app.initMultipleInstances(app.T(), 10, path)
for _, a := range app.apps {
a.startServices()
}
Expand All @@ -121,22 +122,26 @@ func (app *AppTestSuite) TestMultipleNodes() {
case <-timeout:
app.T().Fatal("timed out ")
default:
for _, ap := range app.apps {
ok := 0

for idx, ap := range app.apps {
if big.NewInt(10).Cmp(ap.state.GetBalance(dst)) == 0 {
for _, ap2 := range app.apps {
assert.Equal(app.T(), ap.state.IntermediateRoot(false), ap2.state.IntermediateRoot(false))
if ap.state.IntermediateRoot(false) == ap2.state.IntermediateRoot(false) {
ok++
ok := 0
for idx2, ap2 := range app.apps {
if idx != idx2 {
r1 := ap.state.IntermediateRoot(false).String()
r2 := ap2.state.IntermediateRoot(false).String()
if r1 == r2 {
log.Info("%d roots confirmed out of %d", ok, len(app.apps))
ok++
}
}
if ok == len(app.apps)-1 {
return
}
}
if ok == len(app.apps) {
return
}

}
}
time.Sleep(1 * time.Second)
time.Sleep(1 * time.Millisecond)
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion app/cmd/root.go
Expand Up @@ -35,7 +35,7 @@ func init() {
config.OracleServerWorldId, "The worldid to use with the oracle server (temporary) ")
RootCmd.PersistentFlags().StringVar(&config.GenesisTime, "genesis-time",
config.GenesisTime, "Time of the genesis layer in 2019-13-02T17:02:00+00:00 format")
RootCmd.PersistentFlags().Uint32Var(&config.LayerDurationSec, "layer-duration-sec",
RootCmd.PersistentFlags().IntVar(&config.LayerDurationSec, "layer-duration-sec",
config.LayerDurationSec, "Duration between layers in seconds")
/** ======================== P2P Flags ========================== **/
RootCmd.PersistentFlags().IntVar(&config.P2P.SecurityParam, "security-param",
Expand Down
23 changes: 13 additions & 10 deletions app/main.go
Expand Up @@ -48,7 +48,7 @@ type SpacemeshApp struct {
NodeInitCallback chan bool
grpcAPIService *api.SpacemeshGrpcService
jsonAPIService *api.JSONHTTPServer

syncer *sync.Syncer
blockListener *sync.BlockListener
db database.Database
state *state.StateDB
Expand Down Expand Up @@ -317,26 +317,28 @@ func (app *SpacemeshApp) initServices(instanceName string, swarm server.Service,
rng := rand.New(mt19937.New())
processor := state.NewTransactionProcessor(rng, st, lg)

//trtl := consensus.NewTortoise(50, 100)
trtl := consensus.NewAlgorithm(consensus.NewNinjaTortoise(layerSize))
mesh := mesh.NewMesh(db, db, db, trtl, processor, lg) //todo: what to do with the logger?

coinToss := consensus.WeakCoin{}
gTime, err := time.Parse(time.RFC3339, app.Config.GenesisTime)
if err != nil {
return err
}
clock := timesync.NewTicker(timesync.RealClock{}, time.Duration(app.Config.LayerDurationSec)*time.Second, gTime)
ld := time.Duration(app.Config.LayerDurationSec) * time.Second
clock := timesync.NewTicker(timesync.RealClock{}, ld, gTime)
trtl := consensus.NewAlgorithm(consensus.NewNinjaTortoise(layerSize, lg))
msh := mesh.NewMesh(db, db, db, trtl, processor, lg) //todo: what to do with the logger?

blockListener := sync.NewBlockListener(swarm, blockOracle, mesh, 1*time.Second, 1, clock, lg)
conf := sync.Configuration{SyncInterval: 1 * time.Second, Concurrency: 4, LayerSize: int(layerSize), RequestTimeout: 100 * time.Millisecond}
syncer := sync.NewSync(swarm, msh, blockOracle, conf, clock.Subscribe(), lg)

ha := hare.New(app.Config.HARE, swarm, sgn, mesh, hareOracle, clock.Subscribe(), lg)
ha := hare.New(app.Config.HARE, swarm, sgn, msh, hareOracle, clock.Subscribe(), lg)

blockProducer := miner.NewBlockBuilder(instanceName, swarm, clock.Subscribe(), coinToss, mesh, ha, blockOracle, lg)
blockProducer := miner.NewBlockBuilder(instanceName, swarm, clock.Subscribe(), coinToss, msh, ha, blockOracle, lg)
blockListener := sync.NewBlockListener(swarm, blockOracle, msh, 2*time.Second, 4, lg)

app.blockProducer = &blockProducer
app.blockListener = blockListener
app.mesh = mesh
app.mesh = msh
app.syncer = syncer
app.clock = clock
app.state = st
app.db = db
Expand All @@ -348,6 +350,7 @@ func (app *SpacemeshApp) initServices(instanceName string, swarm server.Service,

func (app *SpacemeshApp) startServices() {
app.blockListener.Start()
app.syncer.Start()
err := app.hare.Start()
if err != nil {
panic("cannot start hare")
Expand Down
2 changes: 1 addition & 1 deletion config/config.go
Expand Up @@ -63,7 +63,7 @@ type BaseConfig struct {
OracleServerWorldId uint64 `mapstructure:"oracle_server_worldid"`

GenesisTime string `mapstructure:"genesis-time"`
LayerDurationSec uint32 `mapstructure:"layer-duration-sec"`
LayerDurationSec int `mapstructure:"layer-duration-sec"`
}

// DefaultConfig returns the default configuration for a spacemesh node
Expand Down
9 changes: 4 additions & 5 deletions consensus/algorithm.go
Expand Up @@ -18,9 +18,10 @@ type Tortoise interface {
}

func NewAlgorithm(trtl Tortoise) *Algorithm {
return &Algorithm{Tortoise: trtl}
alg := &Algorithm{Tortoise: trtl}
alg.HandleIncomingLayer(GenesisLayer())
return alg
}

func (alg *Algorithm) HandleLateBlock(b *mesh.Block) {
//todo feed all layers from b's layer to tortoise
log.Info("received block with layer Id %v block id: %v ", b.Layer(), b.ID())
Expand Down Expand Up @@ -55,7 +56,6 @@ func (alg *Algorithm) ContextualValidity(id mesh.BlockID) bool {
}

func CreateGenesisBlock() *mesh.Block {
log.Info("Creating genesis")
bl := &mesh.Block{
Id: mesh.BlockID(config.GenesisId),
LayerIndex: 0,
Expand All @@ -64,8 +64,7 @@ func CreateGenesisBlock() *mesh.Block {
return bl
}

func createGenesisLayer() *mesh.Layer {
log.Info("Creating genesis")
func GenesisLayer() *mesh.Layer {
l := mesh.NewLayer(Genesis)
l.AddBlock(CreateGenesisBlock())
return l
Expand Down
4 changes: 2 additions & 2 deletions consensus/ninja_tortoise.go
Expand Up @@ -82,9 +82,9 @@ type ninjaTortoise struct {
tPatSupport map[votingPattern]map[mesh.LayerID]votingPattern //pattern support count
}

func NewNinjaTortoise(layerSize uint32) *ninjaTortoise {
func NewNinjaTortoise(layerSize uint32, log log.Log) *ninjaTortoise {
return &ninjaTortoise{
Log: log.New("optimized tortoise ", "", ""),
Log: log,
avgLayerSize: layerSize,
pBase: votingPattern{},
blocks: map[mesh.BlockID]*mesh.Block{},
Expand Down
10 changes: 5 additions & 5 deletions consensus/ninja_tortoise_test.go
Expand Up @@ -49,8 +49,8 @@ func TestNinjaTortoise_GlobalOpinion(t *testing.T) {

func TestForEachInView(t *testing.T) {
blocks := make(map[mesh.BlockID]*mesh.Block)
alg := NewNinjaTortoise(2)
l := createGenesisLayer()
alg := NewNinjaTortoise(2, log.New("TestForEachInView", "", ""))
l := GenesisLayer()
for _, b := range l.Blocks() {
blocks[b.ID()] = b
}
Expand Down Expand Up @@ -93,8 +93,8 @@ func TestNinjaTortoise_UpdatePatternTally(t *testing.T) {
func TestNinjaTortoise_Sanity1(t *testing.T) {
layerSize := 30
patternSize := layerSize
alg := NewNinjaTortoise(uint32(layerSize))
l1 := createGenesisLayer()
alg := NewNinjaTortoise(uint32(layerSize), log.New("TestNinjaTortoise_Sanity1", "", ""))
l1 := GenesisLayer()
genesisId := l1.Blocks()[0].ID()
alg.handleIncomingLayer(l1)
l := createLayerWithRandVoting(l1.Index()+1, []*mesh.Layer{l1}, layerSize, 1)
Expand All @@ -116,7 +116,7 @@ func TestNinjaTortoise_Sanity1(t *testing.T) {
//vote explicitly for two previous layers
//correction vectors compensate for double count
func TestNinjaTortoise_Sanity2(t *testing.T) {
alg := NewNinjaTortoise(uint32(3))
alg := NewNinjaTortoise(uint32(3), log.New("TestNinjaTortoise_Sanity2", "", ""))
l := createMulExplicitLayer(0, map[mesh.LayerID]*mesh.Layer{}, nil, 1)
l1 := createMulExplicitLayer(1, map[mesh.LayerID]*mesh.Layer{l.Index(): l}, map[mesh.LayerID][]int{0: {0}}, 3)
l2 := createMulExplicitLayer(2, map[mesh.LayerID]*mesh.Layer{l1.Index(): l1}, map[mesh.LayerID][]int{1: {0, 1, 2}}, 3)
Expand Down
2 changes: 1 addition & 1 deletion consensus/tortoise_test.go
Expand Up @@ -13,7 +13,7 @@ func TestAlgorithm_Sanity(t *testing.T) {
cachedLayers := 100

alg := NewTortoise(uint32(layerSize), uint32(cachedLayers))
l := createGenesisLayer()
l := GenesisLayer()
alg.HandleIncomingLayer(l)
alg.RegisterLayerCallback(func(id mesh.LayerID) {})
for i := 0; i < 11-1; i++ {
Expand Down
8 changes: 4 additions & 4 deletions hare/hare.go
Expand Up @@ -36,7 +36,7 @@ type TerminationOutput interface {
}

type orphanBlockProvider interface {
GetUnverifiedLayerBlocks(layerId mesh.LayerID) []mesh.BlockID
GetUnverifiedLayerBlocks(layerId mesh.LayerID) ([]mesh.BlockID, error)
}

// Hare is an orchestrator that shoots consensus processes and collects their termination output
Expand Down Expand Up @@ -160,9 +160,9 @@ func (h *Hare) onTick(id mesh.LayerID) {
}

// retrieve set form orphan blocks
blocks := h.obp.GetUnverifiedLayerBlocks(h.lastLayer)
if len(blocks) == 0 {
log.Info("No blocks for consensus on layer %v", id)
blocks, err := h.obp.GetUnverifiedLayerBlocks(h.lastLayer)
if err != nil {
log.Error("No blocks for consensus on layer %v %v", id, err)
return
}

Expand Down
4 changes: 2 additions & 2 deletions hare/hare_test.go
Expand Up @@ -284,7 +284,7 @@ func TestHare_onTick(t *testing.T) {
wg.Add(2)
go func() {
wg.Done()
layerTicker <- mesh.LayerID(0)
layerTicker <- 0
<-createdChan
<-nmcp.CloseChannel()
wg.Done()
Expand All @@ -304,7 +304,7 @@ func TestHare_onTick(t *testing.T) {
wg.Add(2)
go func() {
wg.Done()
layerTicker <- mesh.LayerID(1)
layerTicker <- 1
h.Close()
wg.Done()
}()
Expand Down
6 changes: 3 additions & 3 deletions hare/orphan_mock.go
Expand Up @@ -13,9 +13,9 @@ func (op *orphanMock) GetOrphanBlocks() []mesh.BlockID {
return []mesh.BlockID{}
}

func (op *orphanMock) GetUnverifiedLayerBlocks(l mesh.LayerID) []mesh.BlockID {
func (op *orphanMock) GetUnverifiedLayerBlocks(l mesh.LayerID) ([]mesh.BlockID, error) {
if op.f != nil {
return op.f()
return op.f(), nil
}
return []mesh.BlockID{}
return []mesh.BlockID{}, nil
}

0 comments on commit c4d6867

Please sign in to comment.