Skip to content

Commit

Permalink
Merge branch 'fuzz_state_no_cache' of github.com:prysmaticlabs/prysm …
Browse files Browse the repository at this point in the history
…into fuzz_state_no_cache
  • Loading branch information
shayzluf committed Feb 18, 2020
2 parents 789d964 + d1358de commit 52324b1
Show file tree
Hide file tree
Showing 24 changed files with 900 additions and 158 deletions.
2 changes: 1 addition & 1 deletion WORKSPACE
Expand Up @@ -1272,7 +1272,7 @@ go_repository(

go_repository(
name = "com_github_prysmaticlabs_ethereumapis",
commit = "b7452dde4ca361809def4ed5924ab3cb7ad1299a",
commit = "53ccc146f7f488c5c7634530057f4aedf510a9ac",
importpath = "github.com/prysmaticlabs/ethereumapis",
patch_args = ["-p1"],
patches = [
Expand Down
22 changes: 21 additions & 1 deletion beacon-chain/BUILD.bazel
@@ -1,7 +1,7 @@
load("@io_bazel_rules_go//go:def.bzl", "go_binary", "go_library", "go_test")
load("@io_bazel_rules_docker//go:image.bzl", "go_image")
load("@io_bazel_rules_docker//container:container.bzl", "container_bundle")
load("//tools:binary_targets.bzl", "binary_targets")
load("//tools:binary_targets.bzl", "binary_targets", "go_image_debug")
load("@io_bazel_rules_docker//contrib:push-all.bzl", "docker_push")

go_library(
Expand Down Expand Up @@ -71,12 +71,32 @@ container_bundle(
tags = ["manual"],
)

go_image_debug(
name = "image_debug",
image = ":image",
)

container_bundle(
name = "image_bundle_debug",
images = {
"gcr.io/prysmaticlabs/prysm/beacon-chain:latest-debug": ":image_debug",
"gcr.io/prysmaticlabs/prysm/beacon-chain:{DOCKER_TAG}-debug": ":image_debug",
},
tags = ["manual"],
)

docker_push(
name = "push_images",
bundle = ":image_bundle",
tags = ["manual"],
)

docker_push(
name = "push_images_debug",
bundle = ":image_bundle_debug",
tags = ["manual"],
)

go_binary(
name = "beacon-chain",
embed = [":go_default_library"],
Expand Down
20 changes: 11 additions & 9 deletions beacon-chain/blockchain/chain_info.go
Expand Up @@ -32,7 +32,7 @@ type TimeFetcher interface {
type HeadFetcher interface {
HeadSlot() uint64
HeadRoot(ctx context.Context) ([]byte, error)
HeadBlock() *ethpb.SignedBeaconBlock
HeadBlock(ctx context.Context) (*ethpb.SignedBeaconBlock, error)
HeadState(ctx context.Context) (*state.BeaconState, error)
HeadValidatorsIndices(epoch uint64) ([]uint64, error)
HeadSeed(epoch uint64) ([32]byte, error)
Expand Down Expand Up @@ -135,23 +135,25 @@ func (s *Service) HeadRoot(ctx context.Context) ([]byte, error) {
}

// HeadBlock returns the head block of the chain.
func (s *Service) HeadBlock() *ethpb.SignedBeaconBlock {
return s.headBlock()
// If the head state is nil from service struct,
// it will attempt to get the head block from DB.
func (s *Service) HeadBlock(ctx context.Context) (*ethpb.SignedBeaconBlock, error) {
if s.hasHeadState() {
return s.headBlock(), nil
}

return s.beaconDB.HeadBlock(ctx)
}

// HeadState returns the head state of the chain.
// If the head state is nil from service struct,
// it will attempt to get from DB and error if nil again.
// it will attempt to get the head state from DB.
func (s *Service) HeadState(ctx context.Context) (*state.BeaconState, error) {
if s.hasHeadState() {
return s.headState(), nil
}

headState, err := s.beaconDB.HeadState(ctx)
if err != nil {
return nil, err
}
return headState, nil
return s.beaconDB.HeadState(ctx)
}

// HeadValidatorsIndices returns a list of active validator indices from the head view of a given epoch.
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/blockchain/chain_info_norace_test.go
Expand Up @@ -54,7 +54,7 @@ func TestHeadBlock_DataRace(t *testing.T) {
[32]byte{},
)
}()
s.HeadBlock()
s.HeadBlock(context.Background())
}

func TestHeadState_DataRace(t *testing.T) {
Expand Down
10 changes: 8 additions & 2 deletions beacon-chain/blockchain/chain_info_test.go
Expand Up @@ -145,10 +145,16 @@ func TestHeadRoot_CanRetrieve(t *testing.T) {

func TestHeadBlock_CanRetrieve(t *testing.T) {
b := &ethpb.SignedBeaconBlock{Block: &ethpb.BeaconBlock{Slot: 1}}
s, _ := state.InitializeFromProto(&pb.BeaconState{})
c := &Service{}
c.head = &head{block: b}
c.head = &head{block: b, state: s}

recevied, err := c.HeadBlock(context.Background())
if err != nil {
t.Fatal(err)
}

if !reflect.DeepEqual(b, c.HeadBlock()) {
if !reflect.DeepEqual(b, recevied) {
t.Error("incorrect head block received")
}
}
Expand Down
8 changes: 8 additions & 0 deletions beacon-chain/blockchain/process_block.go
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/blockchain/metrics"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/core/state"
"github.com/prysmaticlabs/prysm/beacon-chain/flags"
stateTrie "github.com/prysmaticlabs/prysm/beacon-chain/state"
"github.com/prysmaticlabs/prysm/shared/attestationutil"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
Expand Down Expand Up @@ -204,6 +205,13 @@ func (s *Service) onBlockInitialSyncStateTransition(ctx context.Context, signed
}
}

if flags.Get().EnableArchive {
atts := signed.Block.Body.Attestations
if err := s.beaconDB.SaveAttestations(ctx, atts); err != nil {
return errors.Wrapf(err, "could not save block attestations from slot %d", b.Slot)
}
}

// Update justified check point.
if postState.CurrentJustifiedCheckpoint().Epoch > s.justifiedCheckpt.Epoch {
if err := s.updateJustified(ctx, postState); err != nil {
Expand Down
12 changes: 10 additions & 2 deletions beacon-chain/blockchain/service_test.go
Expand Up @@ -305,7 +305,11 @@ func TestChainService_InitializeBeaconChain(t *testing.T) {
if _, err := bc.HeadState(ctx); err != nil {
t.Error(err)
}
if bc.HeadBlock() == nil {
headBlk, err := bc.HeadBlock(ctx)
if err != nil {
t.Fatal(err)
}
if headBlk == nil {
t.Error("Head state can't be nil after initialize beacon chain")
}
if bc.headRoot() == params.BeaconConfig().ZeroHash {
Expand Down Expand Up @@ -356,7 +360,11 @@ func TestChainService_InitializeChainInfo(t *testing.T) {
if err := c.initializeChainInfo(ctx); err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(c.HeadBlock(), headBlock) {
headBlk, err := c.HeadBlock(ctx)
if err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(headBlk, headBlock) {
t.Error("head block incorrect")
}
s, err := c.HeadState(ctx)
Expand Down
4 changes: 2 additions & 2 deletions beacon-chain/blockchain/testing/mock.go
Expand Up @@ -160,8 +160,8 @@ func (ms *ChainService) HeadRoot(ctx context.Context) ([]byte, error) {
}

// HeadBlock mocks HeadBlock method in chain service.
func (ms *ChainService) HeadBlock() *ethpb.SignedBeaconBlock {
return ms.Block
func (ms *ChainService) HeadBlock(context.Context) (*ethpb.SignedBeaconBlock, error) {
return ms.Block, nil
}

// HeadState mocks HeadState method in chain service.
Expand Down
3 changes: 2 additions & 1 deletion beacon-chain/core/state/state.go
Expand Up @@ -5,6 +5,7 @@ package state

import (
"context"
"fmt"

"github.com/pkg/errors"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
Expand Down Expand Up @@ -66,7 +67,7 @@ func GenesisBeaconState(deposits []*ethpb.Deposit, genesisTime uint64, eth1Data
leaves := [][]byte{}
for _, deposit := range deposits {
if deposit == nil || deposit.Data == nil {
return nil, errors.New("eth1data contains nil or deposits with nil data field")
return nil, fmt.Errorf("nil deposit or deposit with nil data cannot be processed: %v", deposit)
}
hash, err := ssz.HashTreeRoot(deposit.Data)
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions beacon-chain/rpc/beacon/BUILD.bazel
Expand Up @@ -30,6 +30,7 @@ go_library(
"//beacon-chain/operations/slashings:go_default_library",
"//beacon-chain/powchain:go_default_library",
"//proto/beacon/p2p/v1:go_default_library",
"//shared/attestationutil:go_default_library",
"//shared/bytesutil:go_default_library",
"//shared/hashutil:go_default_library",
"//shared/pagination:go_default_library",
Expand Down Expand Up @@ -73,6 +74,7 @@ go_test(
"//beacon-chain/rpc/testing:go_default_library",
"//beacon-chain/state:go_default_library",
"//proto/beacon/p2p/v1:go_default_library",
"//shared/attestationutil:go_default_library",
"//shared/params:go_default_library",
"//shared/slotutil/testing:go_default_library",
"@com_github_gogo_protobuf//proto:go_default_library",
Expand Down
103 changes: 103 additions & 0 deletions beacon-chain/rpc/beacon/attestations.go
Expand Up @@ -8,9 +8,12 @@ import (
ptypes "github.com/gogo/protobuf/types"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/go-ssz"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/db/filters"
"github.com/prysmaticlabs/prysm/beacon-chain/flags"
"github.com/prysmaticlabs/prysm/shared/attestationutil"
"github.com/prysmaticlabs/prysm/shared/pagination"
"github.com/prysmaticlabs/prysm/shared/params"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
Expand Down Expand Up @@ -110,6 +113,97 @@ func (bs *Server) ListAttestations(
}, nil
}

// ListIndexedAttestations retrieves indexed attestations by target epoch.
// IndexedAttestations are sorted by data slot by default. Either a target epoch filter
// or a boolean filter specifying a request for genesis epoch attestations may be used.
//
// The server may return an empty list when no attestations match the given
// filter criteria. This RPC should not return NOT_FOUND. Only one filter
// criteria should be used.
func (bs *Server) ListIndexedAttestations(
ctx context.Context, req *ethpb.ListIndexedAttestationsRequest,
) (*ethpb.ListIndexedAttestationsResponse, error) {
atts := make([]*ethpb.Attestation, 0)
var err error
epoch := helpers.SlotToEpoch(bs.GenesisTimeFetcher.CurrentSlot())
switch q := req.QueryFilter.(type) {
case *ethpb.ListIndexedAttestationsRequest_TargetEpoch:
atts, err = bs.BeaconDB.Attestations(ctx, filters.NewFilter().SetTargetEpoch(q.TargetEpoch))
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not fetch attestations: %v", err)
}
epoch = q.TargetEpoch
case *ethpb.ListIndexedAttestationsRequest_GenesisEpoch:
atts, err = bs.BeaconDB.Attestations(ctx, filters.NewFilter().SetTargetEpoch(0))
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not fetch attestations: %v", err)
}
epoch = 0
default:
return nil, status.Error(codes.InvalidArgument, "Must specify a filter criteria for fetching attestations")
}
// We sort attestations according to the Sortable interface.
sort.Sort(sortableAttestations(atts))
numAttestations := len(atts)

// If there are no attestations, we simply return a response specifying this.
// Otherwise, attempting to paginate 0 attestations below would result in an error.
if numAttestations == 0 {
return &ethpb.ListIndexedAttestationsResponse{
IndexedAttestations: make([]*ethpb.IndexedAttestation, 0),
TotalSize: int32(0),
NextPageToken: strconv.Itoa(0),
}, nil
}

committeesBySlot, _, err := bs.retrieveCommitteesForEpoch(ctx, epoch)
if err != nil {
return nil, status.Errorf(
codes.Internal,
"Could not retrieve committees for epoch %d: %v",
epoch,
err,
)
}

// We use the retrieved committees for the epoch to convert all attestations
// into indexed form effectively.
indexedAtts := make([]*ethpb.IndexedAttestation, numAttestations, numAttestations)
startSlot := helpers.StartSlot(epoch)
endSlot := startSlot + params.BeaconConfig().SlotsPerEpoch
for i := 0; i < len(indexedAtts); i++ {
att := atts[i]
// Out of range check, the attestation slot cannot be greater
// the last slot of the requested epoch or smaller than its start slot
// given committees are accessed as a map of slot -> commitees list, where there are
// SLOTS_PER_EPOCH keys in the map.
if att.Data.Slot < startSlot || att.Data.Slot > endSlot {
continue
}
committee := committeesBySlot[att.Data.Slot].Committees[att.Data.CommitteeIndex]
idxAtt, err := attestationutil.ConvertToIndexed(ctx, atts[i], committee.ValidatorIndices)
if err != nil {
return nil, status.Errorf(
codes.Internal,
"Could not convert attestation with slot %d to indexed form: %v",
att.Data.Slot,
err,
)
}
indexedAtts[i] = idxAtt
}

start, end, nextPageToken, err := pagination.StartAndEndPage(req.PageToken, int(req.PageSize), len(indexedAtts))
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not paginate attestations: %v", err)
}
return &ethpb.ListIndexedAttestationsResponse{
IndexedAttestations: indexedAtts[start:end],
TotalSize: int32(len(indexedAtts)),
NextPageToken: nextPageToken,
}, nil
}

// StreamAttestations to clients at the end of every slot. This method retrieves the
// aggregated attestations currently in the pool at the start of a slot and sends
// them over a gRPC stream.
Expand All @@ -133,6 +227,15 @@ func (bs *Server) StreamAttestations(
}
}

// StreamIndexedAttestations to clients at the end of every slot. This method retrieves the
// aggregated attestations currently in the pool, converts them into indexed form, and
// sends them over a gRPC stream.
func (bs *Server) StreamIndexedAttestations(
_ *ptypes.Empty, stream ethpb.BeaconChain_StreamIndexedAttestationsServer,
) error {
return status.Error(codes.Unimplemented, "Unimplemented")
}

// AttestationPool retrieves pending attestations.
//
// The server returns a list of attestations that have been seen but not
Expand Down

0 comments on commit 52324b1

Please sign in to comment.