Skip to content

Commit

Permalink
Let's try this
Browse files Browse the repository at this point in the history
  • Loading branch information
terencechain committed Feb 2, 2020
1 parent 962be9b commit 5767f07
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 11 deletions.
2 changes: 1 addition & 1 deletion beacon-chain/state/types.go
Expand Up @@ -86,7 +86,7 @@ func (b *BeaconState) Copy() *BeaconState {
dirtyFields: make(map[fieldIndex]interface{}, 20),

// Copy on write validator index map.
valIdxMap: b.valIdxMap,
valIdxMap: b.valIdxMap,
}

for i := range b.dirtyFields {
Expand Down
37 changes: 29 additions & 8 deletions beacon-chain/sync/pending_attestations_queue.go
Expand Up @@ -45,7 +45,9 @@ func (s *Service) processPendingAtts(ctx context.Context) error {

for bRoot, attestations := range s.blkRootToPendingAtts {
// Has the pending attestation's missing block arrived yet?
if s.db.HasBlock(ctx, bRoot) {
beaconRoot := bytesutil.ToBytes32(bRoot[:32])
targetRoot := bytesutil.ToBytes32(bRoot[32:])
if s.db.HasBlock(ctx, beaconRoot) && s.db.HasBlock(ctx, targetRoot) {
numberOfBlocksRecoveredFromAtt.Inc()
for _, att := range attestations {
// The pending attestations can arrive in both aggregated and unaggregated forms,
Expand Down Expand Up @@ -103,10 +105,20 @@ func (s *Service) processPendingAtts(ctx context.Context) error {
}
}

req := [][32]byte{bRoot}
if err := s.sendRecentBeaconBlocksRequest(ctx, req, pid); err != nil {
traceutil.AnnotateError(span, err)
log.Errorf("Could not send recent block request: %v", err)
if !s.db.HasBlock(ctx, beaconRoot) {
req := [][32]byte{beaconRoot}
if err := s.sendRecentBeaconBlocksRequest(ctx, req, pid); err != nil {
traceutil.AnnotateError(span, err)
log.Errorf("Could not send recent block request: %v", err)
}
}

if !s.db.HasBlock(ctx, targetRoot) {
req := [][32]byte{targetRoot}
if err := s.sendRecentBeaconBlocksRequest(ctx, req, pid); err != nil {
traceutil.AnnotateError(span, err)
log.Errorf("Could not send recent block request: %v", err)
}
}
}
}
Expand All @@ -121,14 +133,23 @@ func (s *Service) savePendingAtt(att *ethpb.AggregateAttestationAndProof) {
defer s.pendingAttsLock.Unlock()

root := bytesutil.ToBytes32(att.Aggregate.Data.BeaconBlockRoot)
targetRoot := params.BeaconConfig().ZeroHash
if !s.db.HasBlock(context.Background(), bytesutil.ToBytes32(att.Aggregate.Data.Target.Root)) {
targetRoot = bytesutil.ToBytes32(att.Aggregate.Data.Target.Root)
}

roots := make([]byte, 64)
copy(root[:32], root[:])
copy(root[32:], targetRoot[:])
roots64 := bytesutil.ToBytes64(roots)

_, ok := s.blkRootToPendingAtts[root]
_, ok := s.blkRootToPendingAtts[roots64]
if !ok {
s.blkRootToPendingAtts[root] = []*ethpb.AggregateAttestationAndProof{att}
s.blkRootToPendingAtts[roots64] = []*ethpb.AggregateAttestationAndProof{att}
return
}

s.blkRootToPendingAtts[root] = append(s.blkRootToPendingAtts[root], att)
s.blkRootToPendingAtts[roots64] = append(s.blkRootToPendingAtts[roots64], att)
}

// This validates the pending attestations in the queue are still valid.
Expand Down
4 changes: 2 additions & 2 deletions beacon-chain/sync/service.go
Expand Up @@ -56,7 +56,7 @@ func NewRegularSync(cfg *Config) *Service {
initialSync: cfg.InitialSync,
slotToPendingBlocks: make(map[uint64]*ethpb.SignedBeaconBlock),
seenPendingBlocks: make(map[[32]byte]bool),
blkRootToPendingAtts: make(map[[32]byte][]*ethpb.AggregateAttestationAndProof),
blkRootToPendingAtts: make(map[[64]byte][]*ethpb.AggregateAttestationAndProof),
stateNotifier: cfg.StateNotifier,
blocksRateLimiter: leakybucket.NewCollector(allowedBlocksPerSecond, allowedBlocksBurst, false /* deleteEmptyBuckets */),
}
Expand All @@ -79,7 +79,7 @@ type Service struct {
chain blockchainService
slotToPendingBlocks map[uint64]*ethpb.SignedBeaconBlock
seenPendingBlocks map[[32]byte]bool
blkRootToPendingAtts map[[32]byte][]*ethpb.AggregateAttestationAndProof
blkRootToPendingAtts map[[64]byte][]*ethpb.AggregateAttestationAndProof
pendingAttsLock sync.RWMutex
pendingQueueLock sync.RWMutex
chainStarted bool
Expand Down
5 changes: 5 additions & 0 deletions beacon-chain/sync/validate_aggregate_proof.go
Expand Up @@ -81,6 +81,11 @@ func (r *Service) validateAggregatedAtt(ctx context.Context, a *ethpb.AggregateA
return false
}

if !r.db.HasBlock(ctx, bytesutil.ToBytes32(a.Aggregate.Data.Target.Root)) {
r.savePendingAtt(a)
return false
}

// Verify attestation slot is within the last ATTESTATION_PROPAGATION_SLOT_RANGE slots.
currentSlot := uint64(roughtime.Now().Unix()-r.chain.GenesisTime().Unix()) / params.BeaconConfig().SecondsPerSlot
if attSlot > currentSlot || currentSlot > attSlot+params.BeaconConfig().AttestationPropagationSlotRange {
Expand Down
9 changes: 9 additions & 0 deletions shared/bytesutil/bytes.go
Expand Up @@ -122,6 +122,15 @@ func ToBytes48(x []byte) [48]byte {
return y
}

// ToBytes64 is a convenience method for converting a byte slice to a fix
// sized 64 byte array. This method will truncate the input if it is larger
// than 64 bytes.
func ToBytes64(x []byte) [64]byte {
var y [64]byte
copy(y[:], x)
return y
}

// FromBytes32 is a convenience method for converting a fixed-size byte array
// to a byte slice.
func FromBytes32(x [32]byte) []byte {
Expand Down

0 comments on commit 5767f07

Please sign in to comment.