Skip to content

Commit

Permalink
Add flags to cluster pk manager (#4645)
Browse files Browse the repository at this point in the history
* Add flags to cluster pk manager
* Merge branch 'master' into cluster-pk-mgr
  • Loading branch information
prestonvanloon authored and prylabs-bulldozer[bot] committed Jan 24, 2020
1 parent 0f730b5 commit 10341cb
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 1 deletion.
2 changes: 2 additions & 0 deletions tools/cluster-pk-manager/server/BUILD.bazel
Expand Up @@ -37,6 +37,7 @@ go_library(
"@com_github_pkg_errors//:go_default_library",
"@com_github_prometheus_client_golang//prometheus:go_default_library",
"@com_github_prometheus_client_golang//prometheus/promauto:go_default_library",
"@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@io_k8s_api//core/v1:go_default_library",
"@io_k8s_apimachinery//pkg/apis/meta/v1:go_default_library",
Expand Down Expand Up @@ -103,6 +104,7 @@ go_image(
"@com_github_pkg_errors//:go_default_library",
"@com_github_prometheus_client_golang//prometheus:go_default_library",
"@com_github_prometheus_client_golang//prometheus/promauto:go_default_library",
"@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@io_k8s_api//core/v1:go_default_library",
"@io_k8s_apimachinery//pkg/apis/meta/v1:go_default_library",
Expand Down
15 changes: 15 additions & 0 deletions tools/cluster-pk-manager/server/db.go
Expand Up @@ -117,6 +117,20 @@ func (d *db) UnallocatedPKs(_ context.Context, numKeys uint64) (*pb.PrivateKeys,
return pks, nil
}

func (d *db) DeleteUnallocatedKey(_ context.Context, privateKey []byte) error {
return d.db.Update(func(tx *bolt.Tx) error {
if err := tx.Bucket(unassignedPkBucket).Delete(privateKey); err != nil {
return err
}
if err := tx.Bucket(deletedKeysBucket).Put(privateKey, dummyVal); err != nil {
return err
}
blacklistedPKCount.Inc()
allocatedPkCount.Dec()
return nil
})
}

// PodPK returns an assigned private key to the given pod name, if one exists.
func (d *db) PodPKs(_ context.Context, podName string) (*pb.PrivateKeys, error) {
pks := &pb.PrivateKeys{}
Expand Down Expand Up @@ -334,3 +348,4 @@ func (d *db) RemovePKFromPod(podName string, key []byte) error {
return tx.Bucket(assignedPkBucket).Put([]byte(podName), marshaled)
})
}

10 changes: 9 additions & 1 deletion tools/cluster-pk-manager/server/main.go
Expand Up @@ -24,6 +24,8 @@ var (
dbPath = flag.String("db-path", "", "The file path for database storage")
disableWatchtower = flag.Bool("disable-watchtower", false, "Disable kubernetes pod watcher. Useful for local testing")
verbose = flag.Bool("verbose", false, "Enable debug logging")
ensureDeposited = flag.Bool("ensure-deposited", false, "Ensure keys are deposited")
allowNewDeposits = flag.Bool("allow-new-deposits", true, "Allow cluster PK manager to send new deposits or generate new keys")
)

func main() {
Expand All @@ -33,9 +35,15 @@ func main() {
}
// use demo-config for cluster deployments
params.UseDemoBeaconConfig()
if *ensureDeposited {
log.Warn("--ensure-deposited: Ensuring all keys are deposited or deleting them from database!")
}
if !*allowNewDeposits {
log.Warn("Disallowing new deposits")
}

db := newDB(*dbPath)
srv := newServer(db, *rpcPath, *depositContractAddr, *privateKey, *depositAmount)
srv := newServer(db, *rpcPath, *depositContractAddr, *privateKey, *depositAmount, *beaconRPCPath)
if !*disableWatchtower {
wt := newWatchtower(db)
go wt.WatchPods()
Expand Down
42 changes: 42 additions & 0 deletions tools/cluster-pk-manager/server/server.go
Expand Up @@ -15,9 +15,13 @@ import (
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/rpc"
"github.com/pkg/errors"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
contracts "github.com/prysmaticlabs/prysm/contracts/deposit-contract"
pb "github.com/prysmaticlabs/prysm/proto/cluster"
"github.com/prysmaticlabs/prysm/shared/bls"
"github.com/prysmaticlabs/prysm/shared/keystore"
"go.opencensus.io/plugin/ocgrpc"
"google.golang.org/grpc"
)

var gasLimit = uint64(4000000)
Expand All @@ -29,6 +33,7 @@ type server struct {
depositAmount *big.Int
txPk *ecdsa.PrivateKey
client *ethclient.Client
beacon ethpb.BeaconNodeValidatorClient

clientLock sync.Mutex
}
Expand All @@ -39,6 +44,7 @@ func newServer(
depositContractAddr string,
funderPK string,
validatorDepositAmount int64,
beaconRPCAddr string,
) *server {
rpcClient, err := rpc.Dial(rpcAddr)
if err != nil {
Expand All @@ -58,12 +64,18 @@ func newServer(

depositAmount := big.NewInt(validatorDepositAmount)

conn, err := grpc.DialContext(context.Background(), beaconRPCAddr, grpc.WithInsecure(), grpc.WithStatsHandler(&ocgrpc.ClientHandler{}))
if err != nil {
log.Errorf("Could not dial endpoint: %s, %v", beaconRPCAddr, err)
}

return &server{
contract: contract,
client: client,
db: db,
depositAmount: depositAmount,
txPk: txPk,
beacon: ethpb.NewBeaconNodeValidatorClient(conn),
}
}

Expand Down Expand Up @@ -117,6 +129,33 @@ func (s *server) Request(ctx context.Context, req *pb.PrivateKeyRequest) (*pb.Pr

pks.PrivateKeys = append(pks.PrivateKeys, unallocated.PrivateKeys...)

if *ensureDeposited {
log.Debugf("Ensuring %d keys are deposited", len(pks.PrivateKeys))
ok := make([][]byte, 0, len(pks.PrivateKeys))
for _, pk := range pks.PrivateKeys {
sk, err := bls.SecretKeyFromBytes(pk)
if err != nil || sk == nil {
continue
}
pub := sk.PublicKey().Marshal()
req := &ethpb.ValidatorStatusRequest{PublicKey:pub}
res, err := s.beacon.ValidatorStatus(ctx, req)
if err != nil {
log.WithError(err).Error("Failed to get validator status")
continue
}
if res.Status == ethpb.ValidatorStatus_UNKNOWN_STATUS {
log.Warn("Deleting unknown deposit pubkey")
if err := s.db.DeleteUnallocatedKey(ctx, pk); err != nil {
log.WithError(err).Error("Failed to delete unallocated key")
}
} else {
ok = append(ok, pk)
}
}
pks.PrivateKeys = ok
}

if len(pks.PrivateKeys) < int(req.NumberOfKeys) {
c := int(req.NumberOfKeys) - len(pks.PrivateKeys)
newKeys, err := s.allocateNewKeys(ctx, req.PodName, c)
Expand All @@ -134,6 +173,9 @@ func (s *server) Request(ctx context.Context, req *pb.PrivateKeyRequest) (*pb.Pr
}

func (s *server) allocateNewKeys(ctx context.Context, podName string, numKeys int) (*pb.PrivateKeys, error) {
if !*allowNewDeposits {
return nil, errors.New("new deposits not allowed")
}
pks := make([][]byte, 0, numKeys)
txMap := make(map[*keystore.Key]*types.Transaction)

Expand Down

0 comments on commit 10341cb

Please sign in to comment.