statesync: increase chunk priority and robustness #6582

Merged · 4 commits · Jun 18, 2021
5 changes: 3 additions & 2 deletions CHANGELOG_PENDING.md
@@ -25,8 +25,9 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermi
 ### IMPROVEMENTS

 - [statesync] \#6566 Allow state sync fetchers and request timeout to be configurable. (@alexanderbez)
-- [statesync] \#6378 Retry requests for snapshots and add a minimum discovery time (5s) for new snapshots.
+- [statesync] \#6378 Retry requests for snapshots and add a minimum discovery time (5s) for new snapshots. (@tychoish)
+- [statesync] \#6582 Increase chunk priority and add multiple retry chunk requests (@cmwaters)

 ### BUG FIXES

-- [evidence] \#6375 Fix bug with inconsistent LightClientAttackEvidence hashing (cmwaters)
+- [evidence] \#6375 Fix bug with inconsistent LightClientAttackEvidence hashing (@cmwaters)
4 changes: 2 additions & 2 deletions config/config.go
@@ -789,8 +789,8 @@ func (cfg *StateSyncConfig) ValidateBasic() error {
 			return fmt.Errorf("invalid trusted_hash: %w", err)
 		}

-		if cfg.ChunkRequestTimeout < time.Second {
-			return errors.New("chunk_request_timeout must be least a one second")
+		if cfg.ChunkRequestTimeout < 5*time.Second {
+			return errors.New("chunk_request_timeout must be at least 5 seconds")
 		}

 		if cfg.ChunkFetchers <= 0 {
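In practice, a chunk request timeout between one and five seconds validated fine before this change and now fails this check. A minimal standalone sketch of the tightened validation (this is not the real StateSyncConfig type, which has many more fields and validations; only the timeout rule from the hunk above is mirrored):

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Standalone sketch of the tightened check above; not the real config type.
type stateSyncConfig struct {
	ChunkRequestTimeout time.Duration
}

func (cfg stateSyncConfig) validateBasic() error {
	if cfg.ChunkRequestTimeout < 5*time.Second {
		return errors.New("chunk_request_timeout must be at least 5 seconds")
	}
	return nil
}

func main() {
	// 1s satisfied the old floor (>= 1s) but is rejected by the new one.
	fmt.Println(stateSyncConfig{ChunkRequestTimeout: time.Second}.validateBasic())
	// 15s passes (prints <nil>).
	fmt.Println(stateSyncConfig{ChunkRequestTimeout: 15 * time.Second}.validateBasic())
}
```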
4 changes: 2 additions & 2 deletions statesync/reactor.go
@@ -69,8 +69,8 @@ func (r *Reactor) GetChannels() []*p2p.ChannelDescriptor {
 		},
 		{
 			ID:                  ChunkChannel,
-			Priority:            1,
-			SendQueueCapacity:   4,
+			Priority:            3,
+			SendQueueCapacity:   10,
 			RecvMessageCapacity: chunkMsgSize,
 		},
 	}
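Raising the chunk channel's priority from 1 to 3 gives chunk traffic a larger share of a connection's outbound sends when several channels are backed up, and the larger send queue (10 instead of 4) lets more chunk messages sit queued per peer. As a rough illustration of the priority effect only (a toy calculation, not Tendermint's actual connection scheduler; the competing priority of 5 is an assumed number, not a value from this PR):

```go
package main

import "fmt"

// Toy model: if every channel always has data queued, a priority-weighted
// scheduler hands each channel roughly priority / sum(priorities) of the
// sends. The competing priority of 5 is an assumption for illustration only.
func chunkShare(chunkPriority, competingPriority float64) float64 {
	return chunkPriority / (chunkPriority + competingPriority)
}

func main() {
	fmt.Printf("chunk channel share before: %.0f%%\n", 100*chunkShare(1, 5)) // ~17%
	fmt.Printf("chunk channel share after:  %.0f%%\n", 100*chunkShare(3, 5)) // ~38%
}
```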
47 changes: 29 additions & 18 deletions statesync/syncer.go
@@ -50,13 +50,14 @@ var (
 // sync all snapshots in the pool (pausing to discover new ones), or Sync() to sync a specific
 // snapshot. Snapshots and chunks are fed via AddSnapshot() and AddChunk() as appropriate.
 type syncer struct {
-	cfg           config.StateSyncConfig
 	logger        log.Logger
 	stateProvider StateProvider
 	conn          proxy.AppConnSnapshot
 	connQuery     proxy.AppConnQuery
 	snapshots     *snapshotPool
 	tempDir       string
+	chunkFetchers int32
+	retryTimeout  time.Duration

 	mtx    tmsync.RWMutex
 	chunks *chunkQueue
@@ -73,13 +74,14 @@ func newSyncer(
 ) *syncer {

 	return &syncer{
-		cfg:           cfg,
 		logger:        logger,
 		stateProvider: stateProvider,
 		conn:          conn,
 		connQuery:     connQuery,
 		snapshots:     newSnapshotPool(stateProvider),
 		tempDir:       tempDir,
+		chunkFetchers: cfg.ChunkFetchers,
+		retryTimeout:  cfg.ChunkRequestTimeout,
 	}
 }

@@ -250,7 +252,7 @@ func (s *syncer) Sync(snapshot *snapshot, chunks *chunkQueue) (sm.State, *types.
 	// Spawn chunk fetchers. They will terminate when the chunk queue is closed or context cancelled.
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
-	for i := int32(0); i < s.cfg.ChunkFetchers; i++ {
+	for i := int32(0); i < s.chunkFetchers; i++ {
 		go s.fetchChunks(ctx, snapshot, chunks)
 	}

@@ -383,36 +385,45 @@ func (s *syncer) applyChunks(chunks *chunkQueue) error {
 // fetchChunks requests chunks from peers, receiving allocations from the chunk queue. Chunks
 // will be received from the reactor via syncer.AddChunks() to chunkQueue.Add().
 func (s *syncer) fetchChunks(ctx context.Context, snapshot *snapshot, chunks *chunkQueue) {
+	var (
+		next  = true
+		index uint32
+		err   error
+	)
+
 	for {
-		index, err := chunks.Allocate()
-		if err == errDone {
-			// Keep checking until the context is cancelled (restore is done), in case any
-			// chunks need to be refetched.
-			select {
-			case <-ctx.Done():
-				return
-			default:
-			}
-			time.Sleep(2 * time.Second)
-			continue
-		}
-		if err != nil {
-			s.logger.Error("Failed to allocate chunk from queue", "err", err)
-			return
+		if next {
+			index, err = chunks.Allocate()
+			if errors.Is(err, errDone) {
+				// Keep checking until the context is canceled (restore is done), in case any
+				// chunks need to be refetched.
+				select {
+				case <-ctx.Done():
+					return
+				default:
+				}
+				time.Sleep(2 * time.Second)
+				continue
+			}
+			if err != nil {
+				s.logger.Error("Failed to allocate chunk from queue", "err", err)
+				return
+			}
 		}
 		s.logger.Info("Fetching snapshot chunk", "height", snapshot.Height,
 			"format", snapshot.Format, "chunk", index, "total", chunks.Size())

-		ticker := time.NewTicker(s.cfg.ChunkRequestTimeout)
+		ticker := time.NewTicker(s.retryTimeout)
 		defer ticker.Stop()

 		s.requestChunk(snapshot, index)

 		select {
 		case <-chunks.WaitFor(index):
+			next = true

 		case <-ticker.C:
-			s.requestChunk(snapshot, index)
+			next = false

 		case <-ctx.Done():
 			return
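The behavioural change in this hunk: previously a fetcher re-requested a timed-out chunk once and then moved on to allocate the next index, whereas the new `next` flag keeps it on the same index, re-requesting it every `retryTimeout` until the chunk arrives or the sync is cancelled. A runnable toy demonstration of that pattern (heavily simplified: the `deliver` helper stands in for `requestChunk` plus `chunks.WaitFor`, and the errDone polling and logging are omitted):

```go
package main

import (
	"fmt"
	"time"
)

// Toy demonstration of the retry pattern added above: the fetcher keeps
// re-requesting the same chunk index every retryTimeout until it arrives,
// then advances to the next index. Chunk 0 only "arrives" on the third
// request here, so the loop retries it twice before moving on.
func main() {
	const (
		retryTimeout = 50 * time.Millisecond
		numChunks    = 3
	)

	attempts := map[uint32]int{}

	// deliver stands in for s.requestChunk + chunks.WaitFor(index): it returns
	// a channel that is closed only once the request is "answered".
	deliver := func(index uint32) <-chan struct{} {
		ch := make(chan struct{})
		attempts[index]++
		if index != 0 || attempts[index] >= 3 {
			close(ch)
		}
		return ch
	}

	next := true
	var index uint32
	for index < numChunks {
		if next {
			fmt.Printf("allocated chunk %d\n", index) // stands in for chunks.Allocate()
		}

		arrived := deliver(index)

		ticker := time.NewTicker(retryTimeout)
		select {
		case <-arrived:
			fmt.Printf("received chunk %d after %d request(s)\n", index, attempts[index])
			index++
			next = true
		case <-ticker.C:
			next = false // keep the same index; re-request it on the next pass
		}
		ticker.Stop()
	}
}
```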
6 changes: 3 additions & 3 deletions test/e2e/generator/generate.go
@@ -82,10 +82,10 @@ func generateTestnet(r *rand.Rand, opt map[string]interface{}) (e2e.Manifest, er
 		numValidators = 4
 	case "large":
 		// FIXME Networks are kept small since large ones use too much CPU.
-		numSeeds = r.Intn(3)
+		numSeeds = r.Intn(2)
 		numLightClients = r.Intn(3)
-		numValidators = 4 + r.Intn(7)
-		numFulls = r.Intn(5)
+		numValidators = 4 + r.Intn(4)
+		numFulls = r.Intn(4)
 	default:
 		return manifest, fmt.Errorf("unknown topology %q", opt["topology"])
 	}
2 changes: 1 addition & 1 deletion test/e2e/runner/load.go
@@ -81,7 +81,7 @@ func loadGenerate(ctx context.Context, chTx chan<- types.Tx, multiplier int) {

 	select {
 	case chTx <- tx:
-		time.Sleep(time.Duration(100/multiplier) * time.Millisecond)
+		time.Sleep(time.Second / time.Duration(multiplier))

 	case <-ctx.Done():
 		close(chTx)
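The old sleep used integer division: 100/multiplier milliseconds rounds down, so the delay gets coarse quickly and is already 0 for any multiplier above 100, which removes the pacing entirely. The new expression divides one second by the multiplier, spacing transactions at roughly `multiplier` per second. A quick comparison of the two formulas (illustration only):

```go
package main

import (
	"fmt"
	"time"
)

// Compare the old and new inter-transaction sleeps from loadGenerate for a
// few multiplier values (illustration only).
func main() {
	for _, multiplier := range []int{1, 3, 50, 150} {
		oldSleep := time.Duration(100/multiplier) * time.Millisecond // integer division truncates
		newSleep := time.Second / time.Duration(multiplier)          // ~multiplier tx per second
		fmt.Printf("multiplier=%-4d old=%-6v new=%v\n", multiplier, oldSleep, newSleep)
	}
}
```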
2 changes: 1 addition & 1 deletion test/e2e/runner/rpc.go
@@ -84,7 +84,7 @@ func waitForNode(node *e2e.Node, height int64, timeout time.Duration) (*rpctypes
 			return status, nil
 		}

-		time.Sleep(200 * time.Millisecond)
+		time.Sleep(300 * time.Millisecond)
 	}
 }
