Skip to content

Commit

Permalink
Use ctxzap in GRPC services (#4816)
Browse files Browse the repository at this point in the history
## Motivation
The start of GRPC calls was manually logged. It can be automated and enriched with contextual information about the GRPC connection using middleware. We already use Zap's [UnaryServerInterceptor](https://pkg.go.dev/github.com/grpc-ecosystem/go-grpc-middleware@v1.4.0/logging/zap#UnaryServerInterceptor) that appends a logger into the `Context` of a GRPC call. It can be later extracted with [ctxzap](https://pkg.go.dev/github.com/grpc-ecosystem/go-grpc-middleware/logging/zap/ctxzap).

## Changes
- log start of GRPC call with middleware
- use context-aware logging in GRPC services with ctxzap
- give time to send a response on AdminService/Recover before shutting down
- shutdown with os.Exit instead of panic on /Recover

## Test Plan
- existing tests pass
- added test for AdminService/Recover
  • Loading branch information
poszu committed Aug 21, 2023
1 parent 403f8c4 commit 2582563
Show file tree
Hide file tree
Showing 19 changed files with 200 additions and 222 deletions.
23 changes: 11 additions & 12 deletions api/grpcserver/activation_service.go
Expand Up @@ -6,33 +6,31 @@ import (
"fmt"

"github.com/golang/protobuf/ptypes/empty"
"github.com/grpc-ecosystem/go-grpc-middleware/logging/zap/ctxzap"
pb "github.com/spacemeshos/api/release/go/spacemesh/v1"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/events"
"github.com/spacemeshos/go-spacemesh/log"
"github.com/spacemeshos/go-spacemesh/sql"
)

type activationService struct {
logger log.Logger
goldenAtx types.ATXID
atxProvider atxProvider
}

func NewActivationService(atxProvider atxProvider, goldenAtx types.ATXID, lg log.Logger) *activationService {
func NewActivationService(atxProvider atxProvider, goldenAtx types.ATXID) *activationService {
return &activationService{
logger: lg,
goldenAtx: goldenAtx,
atxProvider: atxProvider,
}
}

// RegisterService implements ServiceAPI.
func (s *activationService) RegisterService(server *Server) {
s.logger.Info("registering GRPC Activation Service")
pb.RegisterActivationServiceServer(server.GrpcServer, s)
}

Expand All @@ -45,18 +43,19 @@ func (s *activationService) Get(ctx context.Context, request *pb.GetRequest) (*p
atxId := types.ATXID(types.BytesToHash(request.Id))
atx, err := s.atxProvider.GetFullAtx(atxId)
if err != nil || atx == nil {
s.logger.With().Debug("failed to get ATX",
log.Stringer("atx id", atxId),
log.Err(err),
ctxzap.Debug(ctx, "failed to get ATX",
zap.Stringer("id", atxId),
zap.Error(err),
)
return nil, status.Error(codes.NotFound, "id was not found")
}
proof, err := s.atxProvider.GetMalfeasanceProof(atx.SmesherID)
if err != nil && !errors.Is(err, sql.ErrNotFound) {
s.logger.With().Error("failed to get malfeasance proof",
log.Stringer("smesher", atx.SmesherID),
log.Stringer("id", atxId),
log.Err(err),
ctxzap.Error(ctx, "failed to get malfeasance proof",
zap.Stringer("smesher", atx.SmesherID),
zap.Stringer("smesher", atx.SmesherID),
zap.Stringer("id", atxId),
zap.Error(err),
)
return nil, status.Error(codes.NotFound, "id was not found")
}
Expand Down
15 changes: 7 additions & 8 deletions api/grpcserver/activation_service_test.go
Expand Up @@ -16,15 +16,14 @@ import (
"github.com/spacemeshos/go-spacemesh/api/grpcserver"
"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/events"
"github.com/spacemeshos/go-spacemesh/log/logtest"
"github.com/spacemeshos/go-spacemesh/sql"
)

func Test_Highest_ReturnsGoldenAtxOnError(t *testing.T) {
ctrl := gomock.NewController(t)
atxProvider := grpcserver.NewMockatxProvider(ctrl)
goldenAtx := types.ATXID{2, 3, 4}
activationService := grpcserver.NewActivationService(atxProvider, goldenAtx, logtest.New(t).WithName("grpc.Activation"))
activationService := grpcserver.NewActivationService(atxProvider, goldenAtx)

atxProvider.EXPECT().MaxHeightAtx().Return(types.EmptyATXID, errors.New("blah"))
response, err := activationService.Highest(context.Background(), &empty.Empty{})
Expand All @@ -42,7 +41,7 @@ func Test_Highest_ReturnsMaxTickHeight(t *testing.T) {
ctrl := gomock.NewController(t)
atxProvider := grpcserver.NewMockatxProvider(ctrl)
goldenAtx := types.ATXID{2, 3, 4}
activationService := grpcserver.NewActivationService(atxProvider, goldenAtx, logtest.New(t).WithName("grpc.Activation"))
activationService := grpcserver.NewActivationService(atxProvider, goldenAtx)

atx := types.VerifiedActivationTx{
ActivationTx: &types.ActivationTx{
Expand Down Expand Up @@ -77,7 +76,7 @@ func Test_Highest_ReturnsMaxTickHeight(t *testing.T) {
func TestGet_RejectInvalidAtxID(t *testing.T) {
ctrl := gomock.NewController(t)
atxProvider := grpcserver.NewMockatxProvider(ctrl)
activationService := grpcserver.NewActivationService(atxProvider, types.ATXID{1}, logtest.New(t).WithName("grpc.Activation"))
activationService := grpcserver.NewActivationService(atxProvider, types.ATXID{1})

_, err := activationService.Get(context.Background(), &pb.GetRequest{Id: []byte{1, 2, 3}})
require.Error(t, err)
Expand All @@ -87,7 +86,7 @@ func TestGet_RejectInvalidAtxID(t *testing.T) {
func TestGet_AtxNotPresent(t *testing.T) {
ctrl := gomock.NewController(t)
atxProvider := grpcserver.NewMockatxProvider(ctrl)
activationService := grpcserver.NewActivationService(atxProvider, types.ATXID{1}, logtest.New(t).WithName("grpc.Activation"))
activationService := grpcserver.NewActivationService(atxProvider, types.ATXID{1})

id := types.RandomATXID()
atxProvider.EXPECT().GetFullAtx(id).Return(nil, nil)
Expand All @@ -100,7 +99,7 @@ func TestGet_AtxNotPresent(t *testing.T) {
func TestGet_AtxProviderReturnsFailure(t *testing.T) {
ctrl := gomock.NewController(t)
atxProvider := grpcserver.NewMockatxProvider(ctrl)
activationService := grpcserver.NewActivationService(atxProvider, types.ATXID{1}, logtest.New(t).WithName("grpc.Activation"))
activationService := grpcserver.NewActivationService(atxProvider, types.ATXID{1})

id := types.RandomATXID()
atxProvider.EXPECT().GetFullAtx(id).Return(&types.VerifiedActivationTx{}, errors.New(""))
Expand All @@ -113,7 +112,7 @@ func TestGet_AtxProviderReturnsFailure(t *testing.T) {
func TestGet_HappyPath(t *testing.T) {
ctrl := gomock.NewController(t)
atxProvider := grpcserver.NewMockatxProvider(ctrl)
activationService := grpcserver.NewActivationService(atxProvider, types.ATXID{1}, logtest.New(t).WithName("grpc.Activation"))
activationService := grpcserver.NewActivationService(atxProvider, types.ATXID{1})

id := types.RandomATXID()
atx := types.VerifiedActivationTx{
Expand Down Expand Up @@ -150,7 +149,7 @@ func TestGet_HappyPath(t *testing.T) {
func TestGet_IdentityCanceled(t *testing.T) {
ctrl := gomock.NewController(t)
atxProvider := grpcserver.NewMockatxProvider(ctrl)
activationService := grpcserver.NewActivationService(atxProvider, types.ATXID{1}, logtest.New(t).WithName("grpc.Activation"))
activationService := grpcserver.NewActivationService(atxProvider, types.ATXID{1})

smesher, proof := grpcserver.BallotMalfeasance(t, sql.InMemory())
id := types.RandomATXID()
Expand Down
22 changes: 15 additions & 7 deletions api/grpcserver/admin_service.go
Expand Up @@ -6,8 +6,10 @@ import (
"fmt"
"io"
"os"
"time"

"github.com/golang/protobuf/ptypes/empty"
"github.com/grpc-ecosystem/go-grpc-middleware/logging/zap/ctxzap"
pb "github.com/spacemeshos/api/release/go/spacemesh/v1"
"github.com/spf13/afero"
"google.golang.org/grpc/codes"
Expand All @@ -18,7 +20,6 @@ import (
"github.com/spacemeshos/go-spacemesh/checkpoint"
"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/events"
"github.com/spacemeshos/go-spacemesh/log"
"github.com/spacemeshos/go-spacemesh/sql"
)

Expand All @@ -29,19 +30,25 @@ const (

// AdminService exposes endpoints for node administration.
type AdminService struct {
logger log.Logger
db *sql.Database
dataDir string
recover func()
p peers
}

// NewAdminService creates a new admin grpc service.
func NewAdminService(db *sql.Database, dataDir string, lg log.Logger, p peers) *AdminService {
func NewAdminService(db *sql.Database, dataDir string, p peers) *AdminService {
return &AdminService{
logger: lg,
db: db,
dataDir: dataDir,
p: p,
recover: func() {
go func() {
// Allow time for the response to be sent.
time.Sleep(time.Second)
os.Exit(0)
}()
},
p: p,
}
}

Expand Down Expand Up @@ -95,8 +102,9 @@ func (a AdminService) CheckpointStream(req *pb.CheckpointStreamRequest, stream p
}
}

func (a AdminService) Recover(_ context.Context, _ *pb.RecoverRequest) (*empty.Empty, error) {
a.logger.Panic("going to recover from checkpoint")
func (a AdminService) Recover(ctx context.Context, _ *pb.RecoverRequest) (*empty.Empty, error) {
ctxzap.Info(ctx, "going to recover from checkpoint")
a.recover()
return &empty.Empty{}, nil
}

Expand Down
24 changes: 21 additions & 3 deletions api/grpcserver/admin_service_test.go
Expand Up @@ -4,14 +4,14 @@ import (
"context"
"errors"
"io"
"sync/atomic"
"testing"
"time"

pb "github.com/spacemeshos/api/release/go/spacemesh/v1"
"github.com/stretchr/testify/require"

"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/log/logtest"
"github.com/spacemeshos/go-spacemesh/sql"
"github.com/spacemeshos/go-spacemesh/sql/accounts"
"github.com/spacemeshos/go-spacemesh/sql/atxs"
Expand Down Expand Up @@ -56,7 +56,7 @@ func createMesh(tb testing.TB, db *sql.Database) {
func TestAdminService_Checkpoint(t *testing.T) {
db := sql.InMemory()
createMesh(t, db)
svc := NewAdminService(db, t.TempDir(), logtest.New(t), nil)
svc := NewAdminService(db, t.TempDir(), nil)
t.Cleanup(launchServer(t, cfg, svc))

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
Expand Down Expand Up @@ -91,7 +91,7 @@ func TestAdminService_Checkpoint(t *testing.T) {

func TestAdminService_CheckpointError(t *testing.T) {
db := sql.InMemory()
svc := NewAdminService(db, t.TempDir(), logtest.New(t), nil)
svc := NewAdminService(db, t.TempDir(), nil)
t.Cleanup(launchServer(t, cfg, svc))

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
Expand All @@ -104,3 +104,21 @@ func TestAdminService_CheckpointError(t *testing.T) {
_, err = stream.Recv()
require.ErrorContains(t, err, sql.ErrNotFound.Error())
}

func TestAdminService_Recovery(t *testing.T) {
db := sql.InMemory()
recoveryCalled := atomic.Bool{}
svc := NewAdminService(db, t.TempDir(), nil)
svc.recover = func() { recoveryCalled.Store(true) }

t.Cleanup(launchServer(t, cfg, svc))

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
conn := dialGrpc(ctx, t, cfg.PublicListener)
c := pb.NewAdminServiceClient(conn)

_, err := c.Recover(ctx, &pb.RecoverRequest{})
require.NoError(t, err)
require.True(t, recoveryCalled.Load())
}
13 changes: 5 additions & 8 deletions api/grpcserver/debug_service.go
Expand Up @@ -5,23 +5,23 @@ import (
"fmt"

"github.com/golang/protobuf/ptypes/empty"
"github.com/grpc-ecosystem/go-grpc-middleware/logging/zap/ctxzap"
pb "github.com/spacemeshos/api/release/go/spacemesh/v1"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"

"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/events"
"github.com/spacemeshos/go-spacemesh/log"
"github.com/spacemeshos/go-spacemesh/sql"
"github.com/spacemeshos/go-spacemesh/sql/accounts"
)

// DebugService exposes global state data, output from the STF.
type DebugService struct {
db *sql.Database
logger log.Logger
conState conservativeState
identity networkIdentity
oracle oracle
Expand All @@ -33,20 +33,17 @@ func (d DebugService) RegisterService(server *Server) {
}

// NewDebugService creates a new grpc service using config data.
func NewDebugService(db *sql.Database, conState conservativeState, host networkIdentity, oracle oracle, lg log.Logger) *DebugService {
func NewDebugService(db *sql.Database, conState conservativeState, host networkIdentity, oracle oracle) *DebugService {
return &DebugService{
db: db,
logger: lg,
conState: conState,
identity: host,
oracle: oracle,
}
}

// Accounts returns current counter and balance for all accounts.
func (d DebugService) Accounts(_ context.Context, in *pb.AccountsRequest) (*pb.AccountsResponse, error) {
d.logger.Info("GRPC DebugServices.Accounts")

func (d DebugService) Accounts(ctx context.Context, in *pb.AccountsRequest) (*pb.AccountsResponse, error) {
var (
accts []*types.Account
err error
Expand All @@ -57,7 +54,7 @@ func (d DebugService) Accounts(_ context.Context, in *pb.AccountsRequest) (*pb.A
accts, err = accounts.Snapshot(d.db, types.LayerID(in.Layer))
}
if err != nil {
d.logger.Error("Failed to get all accounts from state: %s", err)
ctxzap.Error(ctx, " Failed to get all accounts from state", zap.Error(err))
return nil, status.Errorf(codes.Internal, "error fetching accounts state")
}

Expand Down

0 comments on commit 2582563

Please sign in to comment.