Merge f5b10b4 into be8c983
cmwaters committed Jun 18, 2021
2 parents be8c983 + f5b10b4 commit 2f8d512
Showing 7 changed files with 41 additions and 29 deletions.
CHANGELOG_PENDING.md (5 changes: 3 additions & 2 deletions)

```diff
@@ -25,8 +25,9 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermi
 ### IMPROVEMENTS
 
 - [statesync] \#6566 Allow state sync fetchers and request timeout to be configurable. (@alexanderbez)
-- [statesync] \#6378 Retry requests for snapshots and add a minimum discovery time (5s) for new snapshots.
+- [statesync] \#6378 Retry requests for snapshots and add a minimum discovery time (5s) for new snapshots. (@tychoish)
+- [statesync] \#6582 Increase chunk priority and add multiple retry chunk requests (@cmwaters)
 
 ### BUG FIXES
 
-- [evidence] \#6375 Fix bug with inconsistent LightClientAttackEvidence hashing (cmwaters)
+- [evidence] \#6375 Fix bug with inconsistent LightClientAttackEvidence hashing (@cmwaters)
```
config/config.go (4 changes: 2 additions & 2 deletions)

```diff
@@ -789,8 +789,8 @@ func (cfg *StateSyncConfig) ValidateBasic() error {
         return fmt.Errorf("invalid trusted_hash: %w", err)
     }
 
-    if cfg.ChunkRequestTimeout < time.Second {
-        return errors.New("chunk_request_timeout must be least a one second")
+    if cfg.ChunkRequestTimeout < 5*time.Second {
+        return errors.New("chunk_request_timeout must be at least 5 seconds")
     }
 
     if cfg.ChunkFetchers <= 0 {
```
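
The floor for `chunk_request_timeout` rises from one second to five, and the old error message's typo ("must be least a one second") goes away with it. Below is a standalone sketch of just the changed guard, assuming nothing beyond the diff above; the real `ValidateBasic` also checks `rpc_servers`, trust height, and trust hash before reaching this point.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// validateChunkRequestTimeout mirrors only the guard changed above; it is an
// illustrative stand-in, not the actual StateSyncConfig.ValidateBasic.
func validateChunkRequestTimeout(d time.Duration) error {
	if d < 5*time.Second {
		return errors.New("chunk_request_timeout must be at least 5 seconds")
	}
	return nil
}

func main() {
	fmt.Println(validateChunkRequestTimeout(3 * time.Second))  // rejected under the new floor
	fmt.Println(validateChunkRequestTimeout(10 * time.Second)) // <nil>: accepted
}
```
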
statesync/reactor.go (4 changes: 2 additions & 2 deletions)

```diff
@@ -69,8 +69,8 @@ func (r *Reactor) GetChannels() []*p2p.ChannelDescriptor {
         },
         {
             ID:                  ChunkChannel,
-            Priority:            1,
-            SendQueueCapacity:   4,
+            Priority:            3,
+            SendQueueCapacity:   10,
             RecvMessageCapacity: chunkMsgSize,
         },
     }
```
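
Raising `Priority` from 1 to 3 tells the p2p layer to schedule chunk traffic ahead of lower-priority channels, and the larger `SendQueueCapacity` lets more outbound chunk messages queue before a send blocks. A self-contained sketch of what these fields mean, using a local stand-in type rather than the real `p2p.ChannelDescriptor`; the channel ID and message-size values are placeholders:

```go
package main

import "fmt"

// channelDescriptor is a local stand-in for p2p.ChannelDescriptor,
// with field names matching the diff above.
type channelDescriptor struct {
	ID                  byte
	Priority            int // relative weight when multiplexing sends across channels
	SendQueueCapacity   int // outbound messages buffered before the sender blocks
	RecvMessageCapacity int // largest inbound message accepted, in bytes
}

func main() {
	chunkChannel := channelDescriptor{
		ID:                  0x61,       // placeholder for ChunkChannel
		Priority:            3,          // was 1
		SendQueueCapacity:   10,         // was 4
		RecvMessageCapacity: 16_000_000, // placeholder for chunkMsgSize
	}
	fmt.Printf("%+v\n", chunkChannel)
}
```
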
statesync/syncer.go (47 changes: 29 additions & 18 deletions)

```diff
@@ -50,13 +50,14 @@ var (
 // sync all snapshots in the pool (pausing to discover new ones), or Sync() to sync a specific
 // snapshot. Snapshots and chunks are fed via AddSnapshot() and AddChunk() as appropriate.
 type syncer struct {
-    cfg           config.StateSyncConfig
     logger        log.Logger
     stateProvider StateProvider
     conn          proxy.AppConnSnapshot
     connQuery     proxy.AppConnQuery
     snapshots     *snapshotPool
     tempDir       string
+    chunkFetchers int32
+    retryTimeout  time.Duration
 
     mtx    tmsync.RWMutex
     chunks *chunkQueue
@@ -73,13 +74,14 @@ func newSyncer(
 ) *syncer {
 
     return &syncer{
-        cfg:           cfg,
         logger:        logger,
         stateProvider: stateProvider,
         conn:          conn,
         connQuery:     connQuery,
         snapshots:     newSnapshotPool(stateProvider),
         tempDir:       tempDir,
+        chunkFetchers: cfg.ChunkFetchers,
+        retryTimeout:  cfg.ChunkRequestTimeout,
     }
 }
@@ -250,7 +252,7 @@ func (s *syncer) Sync(snapshot *snapshot, chunks *chunkQueue) (sm.State, *types.
     // Spawn chunk fetchers. They will terminate when the chunk queue is closed or context cancelled.
     ctx, cancel := context.WithCancel(context.Background())
     defer cancel()
-    for i := int32(0); i < s.cfg.ChunkFetchers; i++ {
+    for i := int32(0); i < s.chunkFetchers; i++ {
         go s.fetchChunks(ctx, snapshot, chunks)
     }
@@ -383,36 +385,45 @@ func (s *syncer) applyChunks(chunks *chunkQueue) error {
 // fetchChunks requests chunks from peers, receiving allocations from the chunk queue. Chunks
 // will be received from the reactor via syncer.AddChunks() to chunkQueue.Add().
 func (s *syncer) fetchChunks(ctx context.Context, snapshot *snapshot, chunks *chunkQueue) {
+    var (
+        next  = true
+        index uint32
+        err   error
+    )
+
     for {
-        index, err := chunks.Allocate()
-        if err == errDone {
-            // Keep checking until the context is cancelled (restore is done), in case any
-            // chunks need to be refetched.
-            select {
-            case <-ctx.Done():
-                return
-            default:
+        if next {
+            index, err = chunks.Allocate()
+            if errors.Is(err, errDone) {
+                // Keep checking until the context is canceled (restore is done), in case any
+                // chunks need to be refetched.
+                select {
+                case <-ctx.Done():
+                    return
+                default:
+                }
+                time.Sleep(2 * time.Second)
+                continue
             }
-            time.Sleep(2 * time.Second)
-            continue
-        }
-        if err != nil {
-            s.logger.Error("Failed to allocate chunk from queue", "err", err)
-            return
+            if err != nil {
+                s.logger.Error("Failed to allocate chunk from queue", "err", err)
+                return
+            }
+            s.logger.Info("Fetching snapshot chunk", "height", snapshot.Height,
+                "format", snapshot.Format, "chunk", index, "total", chunks.Size())
         }
-        s.logger.Info("Fetching snapshot chunk", "height", snapshot.Height,
-            "format", snapshot.Format, "chunk", index, "total", chunks.Size())
 
-        ticker := time.NewTicker(s.cfg.ChunkRequestTimeout)
+        ticker := time.NewTicker(s.retryTimeout)
         defer ticker.Stop()
 
         s.requestChunk(snapshot, index)
 
         select {
         case <-chunks.WaitFor(index):
+            next = true
 
         case <-ticker.C:
             s.requestChunk(snapshot, index)
+            next = false
 
         case <-ctx.Done():
             return
```
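
This is the core of the change: each fetcher now tracks whether its last chunk actually arrived. When the retry ticker fires before `chunks.WaitFor(index)` delivers, the loop sets `next = false` and re-requests the same index on the next iteration instead of allocating a new one, so a silent peer can no longer strand a chunk until restore times out. Below is a self-contained sketch of that retry pattern, with hypothetical `request` and `arrived` stand-ins for `syncer.requestChunk` and `chunkQueue.WaitFor`:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// fetchWithRetry requests one item and, if it has not arrived within
// retryTimeout, re-requests the same index rather than giving up or moving on.
// request and arrived are hypothetical stand-ins for the syncer's peer request
// and the chunk queue's delivery signal.
func fetchWithRetry(ctx context.Context, index uint32, retryTimeout time.Duration,
	request func(uint32), arrived <-chan struct{}) {

	ticker := time.NewTicker(retryTimeout)
	defer ticker.Stop()

	request(index)
	for {
		select {
		case <-arrived:
			return // delivered; the caller would now allocate the next index
		case <-ticker.C:
			request(index) // timed out: retry the same chunk
		case <-ctx.Done():
			return // restore finished or aborted
		}
	}
}

func main() {
	arrived := make(chan struct{})
	go func() { time.Sleep(250 * time.Millisecond); close(arrived) }() // simulate a slow peer
	fetchWithRetry(context.Background(), 7, 100*time.Millisecond,
		func(i uint32) { fmt.Println("requesting chunk", i) }, arrived)
}
```

One wrinkle visible in the diff itself: `defer ticker.Stop()` sits inside the `for` loop, so one deferred call accumulates per iteration until `fetchChunks` returns; the sketch avoids this by creating the ticker once.
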
test/e2e/generator/generate.go (6 changes: 3 additions & 3 deletions)

```diff
@@ -82,10 +82,10 @@ func generateTestnet(r *rand.Rand, opt map[string]interface{}) (e2e.Manifest, er
         numValidators = 4
     case "large":
         // FIXME Networks are kept small since large ones use too much CPU.
-        numSeeds = r.Intn(3)
+        numSeeds = r.Intn(2)
         numLightClients = r.Intn(3)
-        numValidators = 4 + r.Intn(7)
-        numFulls = r.Intn(5)
+        numValidators = 4 + r.Intn(4)
+        numFulls = r.Intn(4)
     default:
         return manifest, fmt.Errorf("unknown topology %q", opt["topology"])
     }
```
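
Since `rand.Intn(n)` is uniform over `[0, n)`, the "large" topology now draws 0 to 1 seeds (was 0 to 2), 4 to 7 validators (was 4 to 10), and 0 to 3 full nodes (was 0 to 4). A quick sketch of the new bounds:

```go
package main

import (
	"fmt"
	"math/rand"
)

func main() {
	r := rand.New(rand.NewSource(1)) // fixed seed, deterministic for illustration
	numSeeds := r.Intn(2)            // 0..1 (was 0..2)
	numValidators := 4 + r.Intn(4)   // 4..7 (was 4..10)
	numFulls := r.Intn(4)            // 0..3 (was 0..4)
	fmt.Println(numSeeds, numValidators, numFulls)
}
```
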
test/e2e/runner/load.go (2 changes: 1 addition & 1 deletion)

```diff
@@ -81,7 +81,7 @@ func loadGenerate(ctx context.Context, chTx chan<- types.Tx, multiplier int) {
 
         select {
         case chTx <- tx:
-            time.Sleep(time.Duration(100/multiplier) * time.Millisecond)
+            time.Sleep(time.Second / time.Duration(multiplier))
 
         case <-ctx.Done():
             close(chTx)
```
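
The old pacing slept `100/multiplier` milliseconds between transactions, an integer division that targets roughly 10×`multiplier` tx/s and silently truncates to a zero sleep once `multiplier` exceeds 100. The new form divides a full second by the multiplier, targeting roughly `multiplier` tx/s with no truncation hazard. A small comparison of the two formulas (the rate intent is a reading of the arithmetic, not stated in the commit):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	for _, m := range []int{1, 2, 10, 200} {
		before := time.Duration(100/m) * time.Millisecond // integer division: 0 when m > 100
		after := time.Second / time.Duration(m)
		fmt.Printf("multiplier=%-4d before=%-6v after=%v\n", m, before, after)
	}
}
```
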
test/e2e/runner/rpc.go (2 changes: 1 addition & 1 deletion)

```diff
@@ -84,7 +84,7 @@ func waitForNode(node *e2e.Node, height int64, timeout time.Duration) (*rpctypes
             return status, nil
         }
 
-        time.Sleep(200 * time.Millisecond)
+        time.Sleep(300 * time.Millisecond)
     }
 }
```
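
`waitForNode` polls the node's RPC status until it reports the target height or the timeout elapses; this change simply polls less aggressively. A generic sketch of the same poll-until pattern, with a hypothetical `check` callback standing in for the RPC status query:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// pollUntil retries check at the given interval until it succeeds or the
// deadline passes; it mirrors the shape of waitForNode, not its exact API.
func pollUntil(check func() bool, interval, timeout time.Duration) error {
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		if check() {
			return nil
		}
		time.Sleep(interval) // 300ms in waitForNode after this change
	}
	return errors.New("timed out")
}

func main() {
	start := time.Now()
	err := pollUntil(func() bool { return time.Since(start) > time.Second },
		300*time.Millisecond, 5*time.Second)
	fmt.Println(err) // <nil>: the condition flips within the timeout
}
```
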
