Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Admin] simplify admin server implementation #1364

Merged
merged 3 commits into from
Oct 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
83 changes: 19 additions & 64 deletions admin/command_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,7 @@ import (
pb "github.com/onflow/flow-go/admin/admin"
)

const (
CommandRunnerMaxQueueLength = 128
CommandRunnerNumWorkers = 1
CommandRunnerShutdownTimeout = 5 * time.Second
)
const CommandRunnerShutdownTimeout = 5 * time.Second

type CommandHandler func(ctx context.Context, data map[string]interface{}) error
type CommandValidator func(data map[string]interface{}) error
Expand Down Expand Up @@ -58,7 +54,6 @@ func (r *CommandRunnerBootstrapper) Bootstrap(logger zerolog.Logger, bindAddress
commandRunner := &CommandRunner{
handlers: r.handlers,
validators: r.validators,
commandQ: make(chan *CommandRequest, CommandRunnerMaxQueueLength),
grpcAddress: fmt.Sprintf("%s/flow-node-admin.sock", os.TempDir()),
httpAddress: bindAddress,
logger: logger.With().Str("admin", "command_runner").Logger(),
Expand Down Expand Up @@ -92,7 +87,6 @@ func (r *CommandRunnerBootstrapper) RegisterValidator(command string, validator
type CommandRunner struct {
handlers map[string]CommandHandler
validators map[string]CommandValidator
commandQ chan *CommandRequest
grpcAddress string
httpAddress string
tlsConfig *tls.Config
Expand Down Expand Up @@ -123,12 +117,6 @@ func (r *CommandRunner) Start(ctx context.Context) error {
return fmt.Errorf("failed to start admin server: %w", err)
}

for i := 0; i < CommandRunnerNumWorkers; i++ {
r.workersStarted.Add(1)
r.workersFinished.Add(1)
go r.processLoop(ctx)
}

close(r.startupCompleted)

return nil
Expand Down Expand Up @@ -177,7 +165,7 @@ func (r *CommandRunner) runAdminServer(ctx context.Context) error {
}

grpcServer := grpc.NewServer()
pb.RegisterAdminServer(grpcServer, NewAdminServer(r.commandQ))
pb.RegisterAdminServer(grpcServer, NewAdminServer(r))

r.workersStarted.Add(1)
r.workersFinished.Add(1)
Expand Down Expand Up @@ -236,7 +224,6 @@ func (r *CommandRunner) runAdminServer(ctx context.Context) error {
r.logger.Info().Msg("admin server shutting down")

grpcServer.Stop()
close(r.commandQ)

if httpServer != nil {
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), CommandRunnerShutdownTimeout)
Expand All @@ -252,61 +239,29 @@ func (r *CommandRunner) runAdminServer(ctx context.Context) error {
return nil
}

func (r *CommandRunner) processLoop(ctx context.Context) {
defer func() {
r.logger.Info().Msg("process loop shutting down")
func (r *CommandRunner) runCommand(ctx context.Context, command string, data map[string]interface{}) error {
r.logger.Info().Str("command", command).Msg("received new command")

// cleanup uncompleted requests from the command queue
for command := range r.commandQ {
close(command.responseChan)
if validator := r.getValidator(command); validator != nil {
if validationErr := validator(data); validationErr != nil {
return status.Error(codes.InvalidArgument, validationErr.Error())
}
}

r.workersFinished.Done()
}()

r.workersStarted.Done()

for {
select {
case command, ok := <-r.commandQ:
if !ok {
return
}

r.logger.Info().Str("command", command.command).Msg("received new command")

var err error

if validator := r.getValidator(command.command); validator != nil {
if validationErr := validator(command.data); validationErr != nil {
err = status.Error(codes.InvalidArgument, validationErr.Error())
goto sendResponse
}
}

if handler := r.getHandler(command.command); handler != nil {
// TODO: we can probably merge the command context with the worker context
// using something like: https://github.com/teivah/onecontext
if handleErr := handler(command.ctx, command.data); handleErr != nil {
if errors.Is(handleErr, context.Canceled) {
err = status.Error(codes.Canceled, "client canceled")
} else if errors.Is(handleErr, context.DeadlineExceeded) {
err = status.Error(codes.DeadlineExceeded, "request timed out")
} else {
s, _ := status.FromError(handleErr)
err = s.Err()
}
}
if handler := r.getHandler(command); handler != nil {
if handleErr := handler(ctx, data); handleErr != nil {
if errors.Is(handleErr, context.Canceled) {
return status.Error(codes.Canceled, "client canceled")
} else if errors.Is(handleErr, context.DeadlineExceeded) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can actually do without the if-elseif-else here since we have a return in each of those clauses

return status.Error(codes.DeadlineExceeded, "request timed out")
} else {
err = status.Error(codes.Unimplemented, "invalid command")
s, _ := status.FromError(handleErr)
return s.Err()
}

sendResponse:
command.responseChan <- &CommandResponse{err}
close(command.responseChan)
case <-ctx.Done():
return
}
} else {
return status.Error(codes.Unimplemented, "invalid command")
}

return nil
}
42 changes: 5 additions & 37 deletions admin/command_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"math/big"
"net"
"net/http"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -58,9 +57,6 @@ func (suite *CommandRunnerSuite) TearDownTest() {
suite.NoError(err)
suite.cancel()
<-suite.runner.Done()
suite.Len(suite.runner.commandQ, 0)
_, ok := <-suite.runner.commandQ
suite.False(ok)
}

func (suite *CommandRunnerSuite) SetupCommandRunner(opts ...CommandRunnerOption) {
Expand Down Expand Up @@ -271,7 +267,11 @@ func (suite *CommandRunnerSuite) TestHTTPServer() {
reqBody := bytes.NewBuffer([]byte(`{"commandName": "foo", "data": {"key": "value"}}`))
resp, err := http.Post(url, "application/json", reqBody)
suite.NoError(err)
defer resp.Body.Close()
defer func() {
if resp.Body != nil {
resp.Body.Close()
}
}()

suite.True(called)
suite.Equal("200 OK", resp.Status)
Expand Down Expand Up @@ -429,35 +429,3 @@ func (suite *CommandRunnerSuite) TestTLS() {
suite.True(called)
suite.Equal("200 OK", resp.Status)
}

func (suite *CommandRunnerSuite) TestCleanup() {
suite.bootstrapper.RegisterHandler("foo", func(ctx context.Context, data map[string]interface{}) error {
<-ctx.Done()
return ctx.Err()
})

suite.SetupCommandRunner()

data := make(map[string]interface{})
data["key"] = "value"
val, err := structpb.NewStruct(data)
suite.NoError(err)
request := &pb.RunCommandRequest{
CommandName: "foo",
Data: val,
}

var requestsDone sync.WaitGroup
for i := 0; i < CommandRunnerMaxQueueLength; i++ {
requestsDone.Add(1)
go func() {
defer requestsDone.Done()
_, err = suite.client.RunCommand(context.Background(), request)
suite.Error(err)
}()
}

suite.cancel()

requestsDone.Wait()
}
50 changes: 5 additions & 45 deletions admin/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,63 +2,23 @@ package admin

import (
"context"
"errors"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

pb "github.com/onflow/flow-go/admin/admin"
)

type adminServer struct {
pb.UnimplementedAdminServer
commandQ chan<- *CommandRequest
}

type CommandRequest struct {
ctx context.Context
command string
data map[string]interface{}
responseChan chan<- *CommandResponse
}

type CommandResponse struct {
err error
cr *CommandRunner
}

func (s *adminServer) RunCommand(ctx context.Context, in *pb.RunCommandRequest) (*pb.RunCommandResponse, error) {
resp := make(chan *CommandResponse, 1)

select {
case s.commandQ <- &CommandRequest{
ctx,
in.GetCommandName(),
in.GetData().AsMap(),
resp,
}:
case <-ctx.Done():
if errors.Is(ctx.Err(), context.Canceled) {
return nil, status.Error(codes.Canceled, "client canceled")
} else if errors.Is(ctx.Err(), context.DeadlineExceeded) {
return nil, status.Error(codes.DeadlineExceeded, "request timed out")
}

panic("context returned unexpected error after done channel was closed")
}

response, ok := <-resp
if !ok {
// response channel was closed without a response
return nil, status.Error(codes.Internal, "command terminated unexpectedly")
}

if response.err != nil {
return nil, response.err
if err := s.cr.runCommand(ctx, in.GetCommandName(), in.GetData().AsMap()); err != nil {
return nil, err
}

return &pb.RunCommandResponse{}, nil
}

func NewAdminServer(commandQ chan<- *CommandRequest) *adminServer {
return &adminServer{commandQ: commandQ}
func NewAdminServer(cr *CommandRunner) *adminServer {
return &adminServer{cr: cr}
}