Leo/v0.22 merge master with consensus fix (#1433)
* fvm bench test

* bench test upgrade

* add create account and ft transfer bench

* Add a comparative bench

* add more benchmarks

* review fixes

* increase benchmarking string length

* Addition of AWS S3 uploader

* Add S3 bucket option to execution CLI

* Use single programs instance

* fix lint issues and revert unnecessary changes

* refactor - part1

* refactor - part 2

* let batchNFT benchmark accept block executor

* Enable transaction fees on mainnet

* adjust CLI flags

* go mod tidy

* simplify admin server implementation

* Extracted rootBlock command from finalize command for bootstrap util. Added encoding of the root block and votes and writing them to disk

* Implemented QC creation as part of finalizing cmd. Updated docs. Implemented reading/writing of DKG and root block

* Fixed typo, updated tests

* Updated QC building logic. Fixed signing of QC

* Updated godoc

* Updated comments. Reverted last commit

* Updated how root block is created. Separated root block and votes

* update QCVoter and DKG Broker to handle access node fallbacks

- update CLI for LN/SN nodes to remove access-address, and secure-access-node-api flags

- add --access-node-ids flag

- update unit tests

- add fallback logic to retry logic of Broadcast and Vote
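The fallback behavior added to Broadcast and Vote can be sketched as a loop over the configured access nodes, moving to the next one when a call fails. This is a hypothetical sketch, not the actual DKG Broker/QCVoter API — the `Client` type and `Broadcast` method below are illustrative stand-ins:

```go
package main

import (
	"errors"
	"fmt"
)

// Client is a stand-in for a per-access-node gRPC client.
type Client struct {
	id      string
	healthy bool
}

// Broadcast simulates sending a message via one access node.
func (c *Client) Broadcast(msg string) error {
	if !c.healthy {
		return errors.New("connection refused")
	}
	fmt.Printf("broadcast %q via %s\n", msg, c.id)
	return nil
}

// broadcastWithFallback tries each configured access node in order,
// falling back to the next one when a call fails.
func broadcastWithFallback(clients []*Client, msg string) error {
	var lastErr error
	for _, c := range clients {
		if err := c.Broadcast(msg); err != nil {
			lastErr = err
			continue // fall back to the next access node
		}
		return nil
	}
	return fmt.Errorf("all access nodes failed: %w", lastErr)
}

func main() {
	clients := []*Client{
		{id: "an-1", healthy: false}, // first AN is down
		{id: "an-2", healthy: true},  // fallback succeeds
	}
	if err := broadcastWithFallback(clients, "dkg-msg"); err != nil {
		fmt.Println("error:", err)
	}
}
```

The same shape applies to the Vote path; the key design choice is that the ordered node list (see `--access-node-ids`) determines fallback priority.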

* Added encoding of full DKG to construct QC in finalize step

* Updated reading of votes, dkg data in finalizer. Simplified logic for generating QC

* implement pull root block and push root block vote

* fix bug

* Updated tests

* lint fixes

* Fixed tests

* Linted. Fixed tests

* Update cmd/bootstrap/run/qc.go

* renamed methods for reading node infos to reflect the fact that we are _reading_ the infos and not generating them

* consolidated consistency checks

* cleanup

* added error check

* format

* update localnet/integration tests to generate votes separately

* put votes in same directory

* fix path join

* address comments

* fix cmd namings

* rename root block vote

* pass in identities separate from participant data

* add log

* Extracted rootBlock command from finalize command for bootstrap util. Added encoding of the root block and votes and writing them to disk

* unwrap encodable type of key for partners

* Implemented QC creation as part of finalizing cmd. Updated docs. Implemented reading/writing of DKG and root block

* Fixed typo, updated tests

* Updated QC building logic. Fixed signing of QC

* Updated godoc

* Updated comments. Reverted last commit

* Updated how root block is created. Separated root block and votes

* Added encoding of full DKG to construct QC in finalize step

* Updated reading of votes, dkg data in finalizer. Simplified logic for generating QC

* Updated tests

* Fixed tests

* Linted. Fixed tests

* Update cmd/bootstrap/run/qc.go

* renamed methods for reading node infos to reflect the fact that we are _reading_ the infos and not generating them

* consolidated consistency checks

* cleanup

* format

* update localnet/integration tests to generate votes separately

* put votes in same directory

* fix path join

* change file names

* Update finalize_test.go

* fix filename

* fixing makefile and other fixes (#1378)

* fixing makefile and other fixes

* removing special nocgo docker target for the transit script

* fix test (lint)

* Update README.md

* Linted and fixed unit tests

* Fixed network test

* Updated godoc

* fixing the rootblock.json download path

* fixing log message

* restrict fees to mainnet testnet and canary

* split sign and upload into two steps

* update localnet

- update local net flags for LN/SN nodes

- update default access node count to 2

- put flow client config prep in common function

* Update peerManager.go

* fix bug

* Update peerManager.go

* update integration tests

- update integration tests flags and container configs (from #1323)

* add missing error return when node fails to prepare flow client conn opts

* Update flow_client.go

* update dkg tests controller factory client arg

* update comments and log statement formatting

* Update flow_client.go

* add fallback to poll/submitResult

* add log

* fix bug creating flow client opts

we always used the public key from the first AN, making any fallback
attempts useless (wrong key)
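The bug described above can be illustrated by how the per-node client options are built. This is a hypothetical sketch — `BootstrapInfo` and the option-building functions are illustrative stand-ins, not the actual flow-go types:

```go
package main

import "fmt"

// BootstrapInfo pairs an access node's address with its own networking
// public key (represented as a string here for illustration).
type BootstrapInfo struct {
	Address   string
	PublicKey string
}

// clientOptsBuggy reproduces the bug: every option reuses the key of
// the FIRST access node, so fallback connections present the wrong key.
func clientOptsBuggy(nodes []BootstrapInfo) []BootstrapInfo {
	opts := make([]BootstrapInfo, 0, len(nodes))
	for _, n := range nodes {
		opts = append(opts, BootstrapInfo{Address: n.Address, PublicKey: nodes[0].PublicKey})
	}
	return opts
}

// clientOptsFixed pairs each address with its matching node's key, so
// fallback connections authenticate against the right server identity.
func clientOptsFixed(nodes []BootstrapInfo) []BootstrapInfo {
	opts := make([]BootstrapInfo, 0, len(nodes))
	for _, n := range nodes {
		opts = append(opts, BootstrapInfo{Address: n.Address, PublicKey: n.PublicKey})
	}
	return opts
}

func main() {
	nodes := []BootstrapInfo{
		{"an-1:9000", "key-1"},
		{"an-2:9000", "key-2"},
	}
	fmt.Println(clientOptsBuggy(nodes)[1].PublicKey) // key-1: wrong key for an-2
	fmt.Println(clientOptsFixed(nodes)[1].PublicKey) // key-2: correct
}
```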

* [Consensus] fix startup time (#1402)

* fix startup time

* remove log from debugging

* maintain ordering of ANs

* update var names

* update flag help string, improve logging,

- use trim prefix

* [crypto] Move ADX detection to crypto Makefile

Fixes #1229

* [crypto] De-quarantine BLST tests

Their flakiness was fixed by #1227

* Update broker.go

* [Consensus and Collection] Refactors guarantee dissemination (#1406)

* refactors pusher engine multicast

* refactors ingestion engine publish mechanism on consensus node side

* fixes lint

* fixes a comment

* [Consensus] Revert unsealed reason (#1332)

* revert unsealed reason

* fix linter

Co-authored-by: Alexander Hentschel <alex.hentschel@axiomzen.co>

* Update command_runner_test.go

* fix typo

Co-authored-by: Janez Podhostnik <janez.podhostnik@gmail.com>
Co-authored-by: Martin Gallagher <martin@martingallagher.com>
Co-authored-by: Janez Podhostnik <67895329+janezpodhostnik@users.noreply.github.com>
Co-authored-by: ramtinms <ramtin@axiomzen.co>
Co-authored-by: Ramtin M. Seraj <ramtinms@users.noreply.github.com>
Co-authored-by: François Garillot <francois.garillot@dapperlabs.com>
Co-authored-by: Simon Zhu <simon.zsiyan@gmail.com>
Co-authored-by: Yurii Oleksyshyn <yuraolex@gmail.com>
Co-authored-by: Khalil Claybon <khalil.claybon@dapperlabs.com>
Co-authored-by: Alexander Hentschel <alex.hentschel@axiomzen.co>
Co-authored-by: Jordan Schalm <jordan@dapperlabs.com>
Co-authored-by: Vishal <1117327+vishalchangrani@users.noreply.github.com>
Co-authored-by: bors[bot] <26634292+bors[bot]@users.noreply.github.com>
Co-authored-by: Yahya Hassanzadeh <yhassanzadeh13@ku.edu.tr>
15 people committed Oct 5, 2021
1 parent 1f625f6 commit e1659ae
Showing 32 changed files with 900 additions and 362 deletions.
83 changes: 19 additions & 64 deletions admin/command_runner.go
@@ -20,11 +20,7 @@ import (
pb "github.com/onflow/flow-go/admin/admin"
)

const (
CommandRunnerMaxQueueLength = 128
CommandRunnerNumWorkers = 1
CommandRunnerShutdownTimeout = 5 * time.Second
)
const CommandRunnerShutdownTimeout = 5 * time.Second

type CommandHandler func(ctx context.Context, data map[string]interface{}) error
type CommandValidator func(data map[string]interface{}) error
@@ -58,7 +54,6 @@ func (r *CommandRunnerBootstrapper) Bootstrap(logger zerolog.Logger, bindAddress
commandRunner := &CommandRunner{
handlers: r.handlers,
validators: r.validators,
commandQ: make(chan *CommandRequest, CommandRunnerMaxQueueLength),
grpcAddress: fmt.Sprintf("%s/flow-node-admin.sock", os.TempDir()),
httpAddress: bindAddress,
logger: logger.With().Str("admin", "command_runner").Logger(),
@@ -92,7 +87,6 @@ func (r *CommandRunnerBootstrapper) RegisterValidator(command string, validator
type CommandRunner struct {
handlers map[string]CommandHandler
validators map[string]CommandValidator
commandQ chan *CommandRequest
grpcAddress string
httpAddress string
tlsConfig *tls.Config
@@ -123,12 +117,6 @@ func (r *CommandRunner) Start(ctx context.Context) error {
return fmt.Errorf("failed to start admin server: %w", err)
}

for i := 0; i < CommandRunnerNumWorkers; i++ {
r.workersStarted.Add(1)
r.workersFinished.Add(1)
go r.processLoop(ctx)
}

close(r.startupCompleted)

return nil
@@ -177,7 +165,7 @@ func (r *CommandRunner) runAdminServer(ctx context.Context) error {
}

grpcServer := grpc.NewServer()
pb.RegisterAdminServer(grpcServer, NewAdminServer(r.commandQ))
pb.RegisterAdminServer(grpcServer, NewAdminServer(r))

r.workersStarted.Add(1)
r.workersFinished.Add(1)
@@ -236,7 +224,6 @@ func (r *CommandRunner) runAdminServer(ctx context.Context) error {
r.logger.Info().Msg("admin server shutting down")

grpcServer.Stop()
close(r.commandQ)

if httpServer != nil {
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), CommandRunnerShutdownTimeout)
@@ -252,61 +239,29 @@ func (r *CommandRunner) runAdminServer(ctx context.Context) error {
return nil
}

func (r *CommandRunner) processLoop(ctx context.Context) {
defer func() {
r.logger.Info().Msg("process loop shutting down")
func (r *CommandRunner) runCommand(ctx context.Context, command string, data map[string]interface{}) error {
r.logger.Info().Str("command", command).Msg("received new command")

// cleanup uncompleted requests from the command queue
for command := range r.commandQ {
close(command.responseChan)
if validator := r.getValidator(command); validator != nil {
if validationErr := validator(data); validationErr != nil {
return status.Error(codes.InvalidArgument, validationErr.Error())
}
}

r.workersFinished.Done()
}()

r.workersStarted.Done()

for {
select {
case command, ok := <-r.commandQ:
if !ok {
return
}

r.logger.Info().Str("command", command.command).Msg("received new command")

var err error

if validator := r.getValidator(command.command); validator != nil {
if validationErr := validator(command.data); validationErr != nil {
err = status.Error(codes.InvalidArgument, validationErr.Error())
goto sendResponse
}
}

if handler := r.getHandler(command.command); handler != nil {
// TODO: we can probably merge the command context with the worker context
// using something like: https://github.com/teivah/onecontext
if handleErr := handler(command.ctx, command.data); handleErr != nil {
if errors.Is(handleErr, context.Canceled) {
err = status.Error(codes.Canceled, "client canceled")
} else if errors.Is(handleErr, context.DeadlineExceeded) {
err = status.Error(codes.DeadlineExceeded, "request timed out")
} else {
s, _ := status.FromError(handleErr)
err = s.Err()
}
}
if handler := r.getHandler(command); handler != nil {
if handleErr := handler(ctx, data); handleErr != nil {
if errors.Is(handleErr, context.Canceled) {
return status.Error(codes.Canceled, "client canceled")
} else if errors.Is(handleErr, context.DeadlineExceeded) {
return status.Error(codes.DeadlineExceeded, "request timed out")
} else {
err = status.Error(codes.Unimplemented, "invalid command")
s, _ := status.FromError(handleErr)
return s.Err()
}

sendResponse:
command.responseChan <- &CommandResponse{err}
close(command.responseChan)
case <-ctx.Done():
return
}
} else {
return status.Error(codes.Unimplemented, "invalid command")
}

return nil
}
42 changes: 5 additions & 37 deletions admin/command_runner_test.go
@@ -15,7 +15,6 @@ import (
"math/big"
"net"
"net/http"
"sync"
"testing"
"time"

@@ -58,9 +57,6 @@ func (suite *CommandRunnerSuite) TearDownTest() {
suite.NoError(err)
suite.cancel()
<-suite.runner.Done()
suite.Len(suite.runner.commandQ, 0)
_, ok := <-suite.runner.commandQ
suite.False(ok)
}

func (suite *CommandRunnerSuite) SetupCommandRunner(opts ...CommandRunnerOption) {
@@ -271,7 +267,11 @@ func (suite *CommandRunnerSuite) TestHTTPServer() {
reqBody := bytes.NewBuffer([]byte(`{"commandName": "foo", "data": {"key": "value"}}`))
resp, err := http.Post(url, "application/json", reqBody)
suite.NoError(err)
defer resp.Body.Close()
defer func() {
if resp.Body != nil {
resp.Body.Close()
}
}()

suite.True(called)
suite.Equal("200 OK", resp.Status)
@@ -429,35 +429,3 @@ func (suite *CommandRunnerSuite) TestTLS() {
suite.True(called)
suite.Equal("200 OK", resp.Status)
}

func (suite *CommandRunnerSuite) TestCleanup() {
suite.bootstrapper.RegisterHandler("foo", func(ctx context.Context, data map[string]interface{}) error {
<-ctx.Done()
return ctx.Err()
})

suite.SetupCommandRunner()

data := make(map[string]interface{})
data["key"] = "value"
val, err := structpb.NewStruct(data)
suite.NoError(err)
request := &pb.RunCommandRequest{
CommandName: "foo",
Data: val,
}

var requestsDone sync.WaitGroup
for i := 0; i < CommandRunnerMaxQueueLength; i++ {
requestsDone.Add(1)
go func() {
defer requestsDone.Done()
_, err = suite.client.RunCommand(context.Background(), request)
suite.Error(err)
}()
}

suite.cancel()

requestsDone.Wait()
}
50 changes: 5 additions & 45 deletions admin/server.go
@@ -2,63 +2,23 @@ package admin

import (
"context"
"errors"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

pb "github.com/onflow/flow-go/admin/admin"
)

type adminServer struct {
pb.UnimplementedAdminServer
commandQ chan<- *CommandRequest
}

type CommandRequest struct {
ctx context.Context
command string
data map[string]interface{}
responseChan chan<- *CommandResponse
}

type CommandResponse struct {
err error
cr *CommandRunner
}

func (s *adminServer) RunCommand(ctx context.Context, in *pb.RunCommandRequest) (*pb.RunCommandResponse, error) {
resp := make(chan *CommandResponse, 1)

select {
case s.commandQ <- &CommandRequest{
ctx,
in.GetCommandName(),
in.GetData().AsMap(),
resp,
}:
case <-ctx.Done():
if errors.Is(ctx.Err(), context.Canceled) {
return nil, status.Error(codes.Canceled, "client canceled")
} else if errors.Is(ctx.Err(), context.DeadlineExceeded) {
return nil, status.Error(codes.DeadlineExceeded, "request timed out")
}

panic("context returned unexpected error after done channel was closed")
}

response, ok := <-resp
if !ok {
// response channel was closed without a response
return nil, status.Error(codes.Internal, "command terminated unexpectedly")
}

if response.err != nil {
return nil, response.err
if err := s.cr.runCommand(ctx, in.GetCommandName(), in.GetData().AsMap()); err != nil {
return nil, err
}

return &pb.RunCommandResponse{}, nil
}

func NewAdminServer(commandQ chan<- *CommandRequest) *adminServer {
return &adminServer{commandQ: commandQ}
func NewAdminServer(cr *CommandRunner) *adminServer {
return &adminServer{cr: cr}
}
11 changes: 0 additions & 11 deletions cmd/Dockerfile
@@ -81,17 +81,6 @@ COPY --from=build-debug /app/app /bin/app

ENTRYPOINT ["dlv", "--listen=:2345", "--headless=true", "--api-version=2", "--accept-multiclient", "exec", "/bin/app", "--"]

FROM build-env as build-transit-production-nocgo
WORKDIR /app

RUN --mount=type=cache,target=/go/pkg/mod \
--mount=type=cache,target=/root/.cache/go-build \
GO111MODULE=on CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -ldflags "-extldflags -static \
-X 'github.com/onflow/flow-go/cmd/build.commit=${COMMIT}' -X 'github.com/onflow/flow-go/cmd/build.semver=${VERSION}'" \
-o ./app ./cmd/${TARGET}

RUN chmod a+x /app/app

## (3) Add the statically linked binary to a distroless image
FROM gcr.io/distroless/base-debian10 as production-transit-nocgo

34 changes: 15 additions & 19 deletions cmd/bootstrap/cmd/constraints.go
@@ -6,18 +6,12 @@ import (
"github.com/onflow/flow-go/model/flow/filter"
)

// Checks constraints about the number of partner and internal nodes.
// Internal nodes must comprise >2/3 of consensus committee.
// Internal nodes must comprise >2/3 of each collector cluster.
func checkConstraints(partnerNodes, internalNodes []model.NodeInfo) {

partners := model.ToIdentityList(partnerNodes)
internals := model.ToIdentityList(internalNodes)
all := append(partners, internals...)

// ensureUniformNodeWeightsPerRole verifies that the following condition is satisfied for each role R:
// * all node with role R must have the same weight
func ensureUniformNodeWeightsPerRole(allNodes flow.IdentityList) {
// ensure all nodes of the same role have equal stake/weight
for _, role := range flow.Roles() {
withRole := all.Filter(filter.HasRole(role))
withRole := allNodes.Filter(filter.HasRole(role))
expectedStake := withRole[0].Stake
for _, node := range withRole {
if node.Stake != expectedStake {
@@ -28,16 +22,18 @@ func checkConstraints(partnerNodes, internalNodes []model.NodeInfo) {
}
}
}
}

// check consensus committee Byzantine threshold
partnerCONCount := partners.Filter(filter.HasRole(flow.RoleConsensus)).Count()
internalCONCount := internals.Filter(filter.HasRole(flow.RoleConsensus)).Count()
if internalCONCount <= partnerCONCount*2 {
log.Fatal().Msgf(
"will not bootstrap configuration without Byzantine majority of consensus nodes: "+
"(partners=%d, internals=%d, min_internals=%d)",
partnerCONCount, internalCONCount, partnerCONCount*2+1)
}
// Checks constraints about the number of partner and internal nodes.
// * Internal nodes must comprise >2/3 of each collector cluster.
// * for all roles R:
// all node with role R must have the same weight
func checkConstraints(partnerNodes, internalNodes []model.NodeInfo) {
partners := model.ToIdentityList(partnerNodes)
internals := model.ToIdentityList(internalNodes)
all := append(partners, internals...)

ensureUniformNodeWeightsPerRole(all)

// check collection committee Byzantine threshold for each cluster
// for checking Byzantine constraints, the seed doesn't matter
Expand Down
18 changes: 18 additions & 0 deletions cmd/bootstrap/cmd/dkg.go
@@ -7,6 +7,7 @@ import (
model "github.com/onflow/flow-go/model/bootstrap"
"github.com/onflow/flow-go/model/dkg"
"github.com/onflow/flow-go/model/encodable"
"github.com/onflow/flow-go/state/protocol/inmem"
)

func runDKG(nodes []model.NodeInfo) dkg.DKGData {
@@ -27,6 +28,12 @@ func runDKG(nodes []model.NodeInfo) dkg.DKGData {
}
log.Info().Msgf("finished running DKG")

pubKeyShares := make([]encodable.RandomBeaconPubKey, 0, len(dkgData.PubKeyShares))
for _, pubKey := range dkgData.PubKeyShares {
pubKeyShares = append(pubKeyShares, encodable.RandomBeaconPubKey{PublicKey: pubKey})
}

privKeyShares := make([]encodable.RandomBeaconPrivKey, 0, len(dkgData.PrivKeyShares))
for i, privKey := range dkgData.PrivKeyShares {
nodeID := nodes[i].NodeID

@@ -37,8 +44,19 @@
GroupIndex: i,
}

privKeyShares = append(privKeyShares, encKey)

writeJSON(fmt.Sprintf(model.PathRandomBeaconPriv, nodeID), privParticpant)
}

// write full DKG info that will be used to construct QC
writeJSON(model.PathRootDKGData, inmem.EncodableFullDKG{
GroupKey: encodable.RandomBeaconPubKey{
PublicKey: dkgData.PubGroupKey,
},
PubKeyShares: pubKeyShares,
PrivKeyShares: privKeyShares,
})

return dkgData
}
2 changes: 1 addition & 1 deletion cmd/bootstrap/cmd/final_list.go
@@ -44,7 +44,7 @@ func addFinalListFlags() {

func finalList(cmd *cobra.Command, args []string) {
// read public partner node infos
log.Info().Msgf("reading parnter public node information: %s", flagPartnerNodeInfoDir)
log.Info().Msgf("reading partner public node information: %s", flagPartnerNodeInfoDir)
partnerNodes := assemblePartnerNodesWithoutStake()

// read internal private node infos
