From 258256376abea67679b329dae45ade64db81e5f9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartosz=20R=C3=B3=C5=BCa=C5=84ski?= Date: Mon, 21 Aug 2023 21:52:43 +0000 Subject: [PATCH] Use ctxzap in GRPC services (#4816) ## Motivation The start of GRPC calls was manually logged. It can be automated and enriched with contextual information about the GRPC connection using middleware. We already use Zap's [UnaryServerInterceptor](https://pkg.go.dev/github.com/grpc-ecosystem/go-grpc-middleware@v1.4.0/logging/zap#UnaryServerInterceptor) that appends a logger into the `Context` of a GRPC call. It can be later extracted with [ctxzap](https://pkg.go.dev/github.com/grpc-ecosystem/go-grpc-middleware/logging/zap/ctxzap). ## Changes - log start of GRPC call with middleware - use context-aware logging in GRPC services with ctxzap - give time to send a response on AdminService/Recover before shutting down - shutdown with os.Exit instead of panic on /Recover ## Test Plan - existing tests pass - added test for AdminService/Recover --- api/grpcserver/activation_service.go | 23 ++++---- api/grpcserver/activation_service_test.go | 15 +++-- api/grpcserver/admin_service.go | 22 ++++--- api/grpcserver/admin_service_test.go | 24 +++++++- api/grpcserver/debug_service.go | 13 ++-- api/grpcserver/globalstate_service.go | 61 ++++++++----------- api/grpcserver/grpcserver_test.go | 44 +++++++------- api/grpcserver/mesh_service.go | 69 +++++++++------------- api/grpcserver/mesh_service_test.go | 4 +- api/grpcserver/node_service.go | 26 +++----- api/grpcserver/smesher_service.go | 34 +++-------- api/grpcserver/smesher_service_test.go | 13 ++-- api/grpcserver/transaction_service.go | 14 ++--- api/grpcserver/transaction_service_test.go | 7 +-- cmd/bootstrapper/generator_test.go | 2 +- common/types/activation.go | 10 ++++ log/zap.go | 2 + node/node.go | 34 +++++++---- systest/tests/checkpoint_test.go | 5 +- 19 files changed, 200 insertions(+), 222 deletions(-) diff --git a/api/grpcserver/activation_service.go b/api/grpcserver/activation_service.go index 28b4ce404b..b8e711f95b 100644 --- a/api/grpcserver/activation_service.go +++ b/api/grpcserver/activation_service.go @@ -6,25 +6,24 @@ import ( "fmt" "github.com/golang/protobuf/ptypes/empty" + "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap/ctxzap" pb "github.com/spacemeshos/api/release/go/spacemesh/v1" + "go.uber.org/zap" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "github.com/spacemeshos/go-spacemesh/common/types" "github.com/spacemeshos/go-spacemesh/events" - "github.com/spacemeshos/go-spacemesh/log" "github.com/spacemeshos/go-spacemesh/sql" ) type activationService struct { - logger log.Logger goldenAtx types.ATXID atxProvider atxProvider } -func NewActivationService(atxProvider atxProvider, goldenAtx types.ATXID, lg log.Logger) *activationService { +func NewActivationService(atxProvider atxProvider, goldenAtx types.ATXID) *activationService { return &activationService{ - logger: lg, goldenAtx: goldenAtx, atxProvider: atxProvider, } @@ -32,7 +31,6 @@ func NewActivationService(atxProvider atxProvider, goldenAtx types.ATXID, lg log // RegisterService implements ServiceAPI. func (s *activationService) RegisterService(server *Server) { - s.logger.Info("registering GRPC Activation Service") pb.RegisterActivationServiceServer(server.GrpcServer, s) } @@ -45,18 +43,19 @@ func (s *activationService) Get(ctx context.Context, request *pb.GetRequest) (*p atxId := types.ATXID(types.BytesToHash(request.Id)) atx, err := s.atxProvider.GetFullAtx(atxId) if err != nil || atx == nil { - s.logger.With().Debug("failed to get ATX", - log.Stringer("atx id", atxId), - log.Err(err), + ctxzap.Debug(ctx, "failed to get ATX", + zap.Stringer("id", atxId), + zap.Error(err), ) return nil, status.Error(codes.NotFound, "id was not found") } proof, err := s.atxProvider.GetMalfeasanceProof(atx.SmesherID) if err != nil && !errors.Is(err, sql.ErrNotFound) { - s.logger.With().Error("failed to get malfeasance proof", - log.Stringer("smesher", atx.SmesherID), - log.Stringer("id", atxId), - log.Err(err), + ctxzap.Error(ctx, "failed to get malfeasance proof", + zap.Stringer("smesher", atx.SmesherID), + zap.Stringer("smesher", atx.SmesherID), + zap.Stringer("id", atxId), + zap.Error(err), ) return nil, status.Error(codes.NotFound, "id was not found") } diff --git a/api/grpcserver/activation_service_test.go b/api/grpcserver/activation_service_test.go index 2f49241956..ff99e72e44 100644 --- a/api/grpcserver/activation_service_test.go +++ b/api/grpcserver/activation_service_test.go @@ -16,7 +16,6 @@ import ( "github.com/spacemeshos/go-spacemesh/api/grpcserver" "github.com/spacemeshos/go-spacemesh/common/types" "github.com/spacemeshos/go-spacemesh/events" - "github.com/spacemeshos/go-spacemesh/log/logtest" "github.com/spacemeshos/go-spacemesh/sql" ) @@ -24,7 +23,7 @@ func Test_Highest_ReturnsGoldenAtxOnError(t *testing.T) { ctrl := gomock.NewController(t) atxProvider := grpcserver.NewMockatxProvider(ctrl) goldenAtx := types.ATXID{2, 3, 4} - activationService := grpcserver.NewActivationService(atxProvider, goldenAtx, logtest.New(t).WithName("grpc.Activation")) + activationService := grpcserver.NewActivationService(atxProvider, goldenAtx) atxProvider.EXPECT().MaxHeightAtx().Return(types.EmptyATXID, errors.New("blah")) response, err := activationService.Highest(context.Background(), &empty.Empty{}) @@ -42,7 +41,7 @@ func Test_Highest_ReturnsMaxTickHeight(t *testing.T) { ctrl := gomock.NewController(t) atxProvider := grpcserver.NewMockatxProvider(ctrl) goldenAtx := types.ATXID{2, 3, 4} - activationService := grpcserver.NewActivationService(atxProvider, goldenAtx, logtest.New(t).WithName("grpc.Activation")) + activationService := grpcserver.NewActivationService(atxProvider, goldenAtx) atx := types.VerifiedActivationTx{ ActivationTx: &types.ActivationTx{ @@ -77,7 +76,7 @@ func Test_Highest_ReturnsMaxTickHeight(t *testing.T) { func TestGet_RejectInvalidAtxID(t *testing.T) { ctrl := gomock.NewController(t) atxProvider := grpcserver.NewMockatxProvider(ctrl) - activationService := grpcserver.NewActivationService(atxProvider, types.ATXID{1}, logtest.New(t).WithName("grpc.Activation")) + activationService := grpcserver.NewActivationService(atxProvider, types.ATXID{1}) _, err := activationService.Get(context.Background(), &pb.GetRequest{Id: []byte{1, 2, 3}}) require.Error(t, err) @@ -87,7 +86,7 @@ func TestGet_RejectInvalidAtxID(t *testing.T) { func TestGet_AtxNotPresent(t *testing.T) { ctrl := gomock.NewController(t) atxProvider := grpcserver.NewMockatxProvider(ctrl) - activationService := grpcserver.NewActivationService(atxProvider, types.ATXID{1}, logtest.New(t).WithName("grpc.Activation")) + activationService := grpcserver.NewActivationService(atxProvider, types.ATXID{1}) id := types.RandomATXID() atxProvider.EXPECT().GetFullAtx(id).Return(nil, nil) @@ -100,7 +99,7 @@ func TestGet_AtxNotPresent(t *testing.T) { func TestGet_AtxProviderReturnsFailure(t *testing.T) { ctrl := gomock.NewController(t) atxProvider := grpcserver.NewMockatxProvider(ctrl) - activationService := grpcserver.NewActivationService(atxProvider, types.ATXID{1}, logtest.New(t).WithName("grpc.Activation")) + activationService := grpcserver.NewActivationService(atxProvider, types.ATXID{1}) id := types.RandomATXID() atxProvider.EXPECT().GetFullAtx(id).Return(&types.VerifiedActivationTx{}, errors.New("")) @@ -113,7 +112,7 @@ func TestGet_AtxProviderReturnsFailure(t *testing.T) { func TestGet_HappyPath(t *testing.T) { ctrl := gomock.NewController(t) atxProvider := grpcserver.NewMockatxProvider(ctrl) - activationService := grpcserver.NewActivationService(atxProvider, types.ATXID{1}, logtest.New(t).WithName("grpc.Activation")) + activationService := grpcserver.NewActivationService(atxProvider, types.ATXID{1}) id := types.RandomATXID() atx := types.VerifiedActivationTx{ @@ -150,7 +149,7 @@ func TestGet_HappyPath(t *testing.T) { func TestGet_IdentityCanceled(t *testing.T) { ctrl := gomock.NewController(t) atxProvider := grpcserver.NewMockatxProvider(ctrl) - activationService := grpcserver.NewActivationService(atxProvider, types.ATXID{1}, logtest.New(t).WithName("grpc.Activation")) + activationService := grpcserver.NewActivationService(atxProvider, types.ATXID{1}) smesher, proof := grpcserver.BallotMalfeasance(t, sql.InMemory()) id := types.RandomATXID() diff --git a/api/grpcserver/admin_service.go b/api/grpcserver/admin_service.go index 6de184ca82..7934fcd99b 100644 --- a/api/grpcserver/admin_service.go +++ b/api/grpcserver/admin_service.go @@ -6,8 +6,10 @@ import ( "fmt" "io" "os" + "time" "github.com/golang/protobuf/ptypes/empty" + "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap/ctxzap" pb "github.com/spacemeshos/api/release/go/spacemesh/v1" "github.com/spf13/afero" "google.golang.org/grpc/codes" @@ -18,7 +20,6 @@ import ( "github.com/spacemeshos/go-spacemesh/checkpoint" "github.com/spacemeshos/go-spacemesh/common/types" "github.com/spacemeshos/go-spacemesh/events" - "github.com/spacemeshos/go-spacemesh/log" "github.com/spacemeshos/go-spacemesh/sql" ) @@ -29,19 +30,25 @@ const ( // AdminService exposes endpoints for node administration. type AdminService struct { - logger log.Logger db *sql.Database dataDir string + recover func() p peers } // NewAdminService creates a new admin grpc service. -func NewAdminService(db *sql.Database, dataDir string, lg log.Logger, p peers) *AdminService { +func NewAdminService(db *sql.Database, dataDir string, p peers) *AdminService { return &AdminService{ - logger: lg, db: db, dataDir: dataDir, - p: p, + recover: func() { + go func() { + // Allow time for the response to be sent. + time.Sleep(time.Second) + os.Exit(0) + }() + }, + p: p, } } @@ -95,8 +102,9 @@ func (a AdminService) CheckpointStream(req *pb.CheckpointStreamRequest, stream p } } -func (a AdminService) Recover(_ context.Context, _ *pb.RecoverRequest) (*empty.Empty, error) { - a.logger.Panic("going to recover from checkpoint") +func (a AdminService) Recover(ctx context.Context, _ *pb.RecoverRequest) (*empty.Empty, error) { + ctxzap.Info(ctx, "going to recover from checkpoint") + a.recover() return &empty.Empty{}, nil } diff --git a/api/grpcserver/admin_service_test.go b/api/grpcserver/admin_service_test.go index c7b3477235..eb8aab8623 100644 --- a/api/grpcserver/admin_service_test.go +++ b/api/grpcserver/admin_service_test.go @@ -4,6 +4,7 @@ import ( "context" "errors" "io" + "sync/atomic" "testing" "time" @@ -11,7 +12,6 @@ import ( "github.com/stretchr/testify/require" "github.com/spacemeshos/go-spacemesh/common/types" - "github.com/spacemeshos/go-spacemesh/log/logtest" "github.com/spacemeshos/go-spacemesh/sql" "github.com/spacemeshos/go-spacemesh/sql/accounts" "github.com/spacemeshos/go-spacemesh/sql/atxs" @@ -56,7 +56,7 @@ func createMesh(tb testing.TB, db *sql.Database) { func TestAdminService_Checkpoint(t *testing.T) { db := sql.InMemory() createMesh(t, db) - svc := NewAdminService(db, t.TempDir(), logtest.New(t), nil) + svc := NewAdminService(db, t.TempDir(), nil) t.Cleanup(launchServer(t, cfg, svc)) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) @@ -91,7 +91,7 @@ func TestAdminService_Checkpoint(t *testing.T) { func TestAdminService_CheckpointError(t *testing.T) { db := sql.InMemory() - svc := NewAdminService(db, t.TempDir(), logtest.New(t), nil) + svc := NewAdminService(db, t.TempDir(), nil) t.Cleanup(launchServer(t, cfg, svc)) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) @@ -104,3 +104,21 @@ func TestAdminService_CheckpointError(t *testing.T) { _, err = stream.Recv() require.ErrorContains(t, err, sql.ErrNotFound.Error()) } + +func TestAdminService_Recovery(t *testing.T) { + db := sql.InMemory() + recoveryCalled := atomic.Bool{} + svc := NewAdminService(db, t.TempDir(), nil) + svc.recover = func() { recoveryCalled.Store(true) } + + t.Cleanup(launchServer(t, cfg, svc)) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + conn := dialGrpc(ctx, t, cfg.PublicListener) + c := pb.NewAdminServiceClient(conn) + + _, err := c.Recover(ctx, &pb.RecoverRequest{}) + require.NoError(t, err) + require.True(t, recoveryCalled.Load()) +} diff --git a/api/grpcserver/debug_service.go b/api/grpcserver/debug_service.go index d7a3276945..964ee36e51 100644 --- a/api/grpcserver/debug_service.go +++ b/api/grpcserver/debug_service.go @@ -5,7 +5,9 @@ import ( "fmt" "github.com/golang/protobuf/ptypes/empty" + "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap/ctxzap" pb "github.com/spacemeshos/api/release/go/spacemesh/v1" + "go.uber.org/zap" "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" @@ -13,7 +15,6 @@ import ( "github.com/spacemeshos/go-spacemesh/common/types" "github.com/spacemeshos/go-spacemesh/events" - "github.com/spacemeshos/go-spacemesh/log" "github.com/spacemeshos/go-spacemesh/sql" "github.com/spacemeshos/go-spacemesh/sql/accounts" ) @@ -21,7 +22,6 @@ import ( // DebugService exposes global state data, output from the STF. type DebugService struct { db *sql.Database - logger log.Logger conState conservativeState identity networkIdentity oracle oracle @@ -33,10 +33,9 @@ func (d DebugService) RegisterService(server *Server) { } // NewDebugService creates a new grpc service using config data. -func NewDebugService(db *sql.Database, conState conservativeState, host networkIdentity, oracle oracle, lg log.Logger) *DebugService { +func NewDebugService(db *sql.Database, conState conservativeState, host networkIdentity, oracle oracle) *DebugService { return &DebugService{ db: db, - logger: lg, conState: conState, identity: host, oracle: oracle, @@ -44,9 +43,7 @@ func NewDebugService(db *sql.Database, conState conservativeState, host networkI } // Accounts returns current counter and balance for all accounts. -func (d DebugService) Accounts(_ context.Context, in *pb.AccountsRequest) (*pb.AccountsResponse, error) { - d.logger.Info("GRPC DebugServices.Accounts") - +func (d DebugService) Accounts(ctx context.Context, in *pb.AccountsRequest) (*pb.AccountsResponse, error) { var ( accts []*types.Account err error @@ -57,7 +54,7 @@ func (d DebugService) Accounts(_ context.Context, in *pb.AccountsRequest) (*pb.A accts, err = accounts.Snapshot(d.db, types.LayerID(in.Layer)) } if err != nil { - d.logger.Error("Failed to get all accounts from state: %s", err) + ctxzap.Error(ctx, " Failed to get all accounts from state", zap.Error(err)) return nil, status.Errorf(codes.Internal, "error fetching accounts state") } diff --git a/api/grpcserver/globalstate_service.go b/api/grpcserver/globalstate_service.go index c311aa42fd..12602b58e2 100644 --- a/api/grpcserver/globalstate_service.go +++ b/api/grpcserver/globalstate_service.go @@ -4,19 +4,19 @@ import ( "context" "fmt" + "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap/ctxzap" pb "github.com/spacemeshos/api/release/go/spacemesh/v1" + "go.uber.org/zap" "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" "github.com/spacemeshos/go-spacemesh/common/types" "github.com/spacemeshos/go-spacemesh/events" - "github.com/spacemeshos/go-spacemesh/log" ) // GlobalStateService exposes global state data, output from the STF. type GlobalStateService struct { - logger log.Logger mesh meshAPI conState conservativeState } @@ -27,9 +27,8 @@ func (s GlobalStateService) RegisterService(server *Server) { } // NewGlobalStateService creates a new grpc service using config data. -func NewGlobalStateService(msh meshAPI, conState conservativeState, lg log.Logger) *GlobalStateService { +func NewGlobalStateService(msh meshAPI, conState conservativeState) *GlobalStateService { return &GlobalStateService{ - logger: lg, mesh: msh, conState: conState, } @@ -37,8 +36,6 @@ func NewGlobalStateService(msh meshAPI, conState conservativeState, lg log.Logge // GlobalStateHash returns the latest layer and its computed global state hash. func (s GlobalStateService) GlobalStateHash(context.Context, *pb.GlobalStateHashRequest) (*pb.GlobalStateHashResponse, error) { - s.logger.Info("GRPC GlobalStateService.GlobalStateHash") - root, err := s.conState.GetStateRoot() if err != nil { return nil, err @@ -73,9 +70,7 @@ func (s GlobalStateService) getAccount(addr types.Address) (acct *pb.Account, er } // Account returns current and projected counter and balance for one account. -func (s GlobalStateService) Account(_ context.Context, in *pb.AccountRequest) (*pb.AccountResponse, error) { - s.logger.Info("GRPC GlobalStateService.Account") - +func (s GlobalStateService) Account(ctx context.Context, in *pb.AccountRequest) (*pb.AccountResponse, error) { if in.AccountId == nil { return nil, status.Errorf(codes.InvalidArgument, "`AccountId` must be provided") } @@ -87,25 +82,23 @@ func (s GlobalStateService) Account(_ context.Context, in *pb.AccountRequest) (* } acct, err := s.getAccount(addr) if err != nil { - s.logger.With().Error("unable to fetch projected account state", log.Err(err)) + ctxzap.Error(ctx, "unable to fetch projected account state", zap.Error(err)) return nil, status.Errorf(codes.Internal, "error fetching projected account data") } - s.logger.With().Debug("GRPC GlobalStateService.Account", - addr, - log.Uint64("balance", acct.StateCurrent.Balance.Value), - log.Uint64("counter", acct.StateCurrent.Counter), - log.Uint64("balance projected", acct.StateProjected.Balance.Value), - log.Uint64("counter projected", acct.StateProjected.Counter), + ctxzap.Debug(ctx, "GRPC GlobalStateService.Account", + addr.Field().Zap(), + zap.Uint64("balance", acct.StateCurrent.Balance.Value), + zap.Uint64("counter", acct.StateCurrent.Counter), + zap.Uint64("balance projected", acct.StateProjected.Balance.Value), + zap.Uint64("counter projected", acct.StateProjected.Counter), ) return &pb.AccountResponse{AccountWrapper: acct}, nil } // AccountDataQuery returns historical account data such as rewards and receipts. -func (s GlobalStateService) AccountDataQuery(_ context.Context, in *pb.AccountDataQueryRequest) (*pb.AccountDataQueryResponse, error) { - s.logger.Info("GRPC GlobalStateService.AccountDataQuery") - +func (s GlobalStateService) AccountDataQuery(ctx context.Context, in *pb.AccountDataQueryRequest) (*pb.AccountDataQueryResponse, error) { if in.Filter == nil { return nil, status.Errorf(codes.InvalidArgument, "`Filter` must be provided") } @@ -155,7 +148,7 @@ func (s GlobalStateService) AccountDataQuery(_ context.Context, in *pb.AccountDa if filterAccount { acct, err := s.getAccount(addr) if err != nil { - s.logger.With().Error("unable to fetch projected account state", log.Err(err)) + ctxzap.Error(ctx, "unable to fetch projected account state", zap.Error(err)) return nil, status.Errorf(codes.Internal, "error fetching projected account data") } res.AccountItem = append(res.AccountItem, &pb.AccountData{Datum: &pb.AccountData_AccountWrapper{ @@ -194,7 +187,6 @@ func (s GlobalStateService) AccountDataQuery(_ context.Context, in *pb.AccountDa // SmesherDataQuery returns historical info on smesher rewards. func (s GlobalStateService) SmesherDataQuery(_ context.Context, in *pb.SmesherDataQueryRequest) (*pb.SmesherDataQueryResponse, error) { - s.logger.Info("DEPRECATED GRPC GlobalStateService.SmesherDataQuery") return nil, status.Errorf(codes.Unimplemented, "DEPRECATED") } @@ -202,8 +194,6 @@ func (s GlobalStateService) SmesherDataQuery(_ context.Context, in *pb.SmesherDa // AccountDataStream exposes a stream of account-related data. func (s GlobalStateService) AccountDataStream(in *pb.AccountDataStreamRequest, stream pb.GlobalStateService_AccountDataStreamServer) error { - s.logger.Info("GRPC GlobalStateService.AccountDataStream") - if in.Filter == nil { return status.Errorf(codes.InvalidArgument, "`Filter` must be provided") } @@ -246,10 +236,10 @@ func (s GlobalStateService) AccountDataStream(in *pb.AccountDataStreamRequest, s for { select { case <-accountBufFull: - s.logger.Info("account buffer is full, shutting down") + ctxzap.Info(stream.Context(), "account buffer is full, shutting down") return status.Error(codes.Canceled, errAccountBufferFull) case <-rewardsBufFull: - s.logger.Info("rewards buffer is full, shutting down") + ctxzap.Info(stream.Context(), "rewards buffer is full, shutting down") return status.Error(codes.Canceled, errRewardsBufferFull) case updatedAccountEvent := <-accountCh: // Apply address filter @@ -259,7 +249,7 @@ func (s GlobalStateService) AccountDataStream(in *pb.AccountDataStreamRequest, s // nonce. acct, err := s.getAccount(addr) if err != nil { - s.logger.With().Error("unable to fetch projected account state", log.Err(err)) + ctxzap.Error(stream.Context(), "unable to fetch projected account state", zap.Error(err)) return status.Errorf(codes.Internal, "error fetching projected account data") } resp := &pb.AccountDataStreamResponse{Datum: &pb.AccountData{Datum: &pb.AccountData_AccountWrapper{ @@ -310,7 +300,7 @@ func (s GlobalStateService) AccountDataStream(in *pb.AccountDataStreamRequest, s } case <-stream.Context().Done(): - s.logger.Info("AccountDataStream closing stream, client disconnected") + ctxzap.Info(stream.Context(), "AccountDataStream closing stream, client disconnected") return nil } // TODO: do we need an additional case here for a context to indicate @@ -321,14 +311,11 @@ func (s GlobalStateService) AccountDataStream(in *pb.AccountDataStreamRequest, s // SmesherRewardStream exposes a stream of smesher rewards. func (s GlobalStateService) SmesherRewardStream(in *pb.SmesherRewardStreamRequest, stream pb.GlobalStateService_SmesherRewardStreamServer) error { - s.logger.Info("DEPRECATED GRPC GlobalStateService.SmesherRewardStream") return status.Errorf(codes.Unimplemented, "DEPRECATED") } // AppEventStream exposes a stream of emitted app events. func (s GlobalStateService) AppEventStream(*pb.AppEventStreamRequest, pb.GlobalStateService_AppEventStreamServer) error { - s.logger.Info("GRPC GlobalStateService.AppEventStream") - // TODO: implement me! We don't currently have any app events // See https://github.com/spacemeshos/go-spacemesh/issues/2074 @@ -337,8 +324,6 @@ func (s GlobalStateService) AppEventStream(*pb.AppEventStreamRequest, pb.GlobalS // GlobalStateStream exposes a stream of global data data items: rewards, receipts, account info, global state hash. func (s GlobalStateService) GlobalStateStream(in *pb.GlobalStateStreamRequest, stream pb.GlobalStateService_GlobalStateStreamServer) error { - s.logger.Info("GRPC GlobalStateService.GlobalStateStream") - if in.GlobalStateDataFlags == uint32(pb.GlobalStateDataFlag_GLOBAL_STATE_DATA_FLAG_UNSPECIFIED) { return status.Errorf(codes.InvalidArgument, "`GlobalStateDataFlags` must set at least one bitfield") } @@ -378,13 +363,13 @@ func (s GlobalStateService) GlobalStateStream(in *pb.GlobalStateStreamRequest, s for { select { case <-accountBufFull: - s.logger.Info("account buffer is full, shutting down") + ctxzap.Info(stream.Context(), "account buffer is full, shutting down") return status.Error(codes.Canceled, errAccountBufferFull) case <-rewardsBufFull: - s.logger.Info("rewards buffer is full, shutting down") + ctxzap.Info(stream.Context(), "rewards buffer is full, shutting down") return status.Error(codes.Canceled, errRewardsBufferFull) case <-layersBufFull: - s.logger.Info("layers buffer is full, shutting down") + ctxzap.Info(stream.Context(), "layers buffer is full, shutting down") return status.Error(codes.Canceled, errLayerBufferFull) case updatedAccount := <-accountCh: // The Reporter service just sends us the account address. We are responsible @@ -392,7 +377,7 @@ func (s GlobalStateService) GlobalStateStream(in *pb.GlobalStateStreamRequest, s // nonce. acct, err := s.getAccount(updatedAccount.Address) if err != nil { - s.logger.With().Error("unable to fetch projected account state", log.Err(err)) + ctxzap.Error(stream.Context(), "unable to fetch projected account state", zap.Error(err)) return status.Errorf(codes.Internal, "error fetching projected account data") } resp := &pb.GlobalStateStreamResponse{Datum: &pb.GlobalStateData{Datum: &pb.GlobalStateData_AccountWrapper{ @@ -422,7 +407,7 @@ func (s GlobalStateService) GlobalStateStream(in *pb.GlobalStateStreamRequest, s } root, err := s.conState.GetLayerStateRoot(layer.LayerID) if err != nil { - s.logger.With().Warning("error retrieving layer data", log.Err(err)) + ctxzap.Warn(stream.Context(), "error retrieving layer data", zap.Error(err)) root = types.Hash32{} } resp := &pb.GlobalStateStreamResponse{Datum: &pb.GlobalStateData{Datum: &pb.GlobalStateData_GlobalState{ @@ -435,7 +420,7 @@ func (s GlobalStateService) GlobalStateStream(in *pb.GlobalStateStreamRequest, s return fmt.Errorf("send to stream: %w", err) } case <-stream.Context().Done(): - s.logger.Info("AccountDataStream closing stream, client disconnected") + ctxzap.Info(stream.Context(), "AccountDataStream closing stream, client disconnected") return nil } // TODO: do we need an additional case here for a context to indicate diff --git a/api/grpcserver/grpcserver_test.go b/api/grpcserver/grpcserver_test.go index 7eff544c6c..eecf8054fc 100644 --- a/api/grpcserver/grpcserver_test.go +++ b/api/grpcserver/grpcserver_test.go @@ -555,7 +555,7 @@ func TestNodeService(t *testing.T) { version := "v0.0.0" build := "cafebabe" - grpcService := NewNodeService(peerCounter, meshAPIMock, genTime, syncer, version, build, logtest.New(t).WithName("grpc.Node")) + grpcService := NewNodeService(peerCounter, meshAPIMock, genTime, syncer, version, build) t.Cleanup(launchServer(t, cfg, grpcService)) conn := dialGrpc(ctx, t, cfg.PublicListener) @@ -642,7 +642,7 @@ func TestNodeService(t *testing.T) { } func TestGlobalStateService(t *testing.T) { - svc := NewGlobalStateService(meshAPIMock, conStateAPI, logtest.New(t).WithName("grpc.GlobalState")) + svc := NewGlobalStateService(meshAPIMock, conStateAPI) t.Cleanup(launchServer(t, cfg, svc)) ctx, cancel := context.WithTimeout(context.Background(), time.Second) @@ -916,7 +916,7 @@ func TestSmesherService(t *testing.T) { postProvider.EXPECT().Status().Return(&activation.PostSetupStatus{}).AnyTimes() postProvider.EXPECT().Providers().Return(nil, nil).AnyTimes() smeshingAPI := &SmeshingAPIMock{} - svc := NewSmesherService(postProvider, smeshingAPI, 10*time.Millisecond, activation.DefaultPostSetupOpts(), logtest.New(t).WithName("grpc.Smesher")) + svc := NewSmesherService(postProvider, smeshingAPI, 10*time.Millisecond, activation.DefaultPostSetupOpts()) t.Cleanup(launchServer(t, cfg, svc)) ctx, cancel := context.WithTimeout(context.Background(), time.Second) @@ -1043,7 +1043,7 @@ func TestMeshService(t *testing.T) { genesis := time.Unix(genTimeUnix, 0) genTime.EXPECT().GenesisTime().Return(genesis) genTime.EXPECT().CurrentLayer().Return(layerCurrent).AnyTimes() - grpcService := NewMeshService(datastore.NewCachedDB(sql.InMemory(), logtest.New(t)), meshAPIMock, conStateAPI, genTime, layersPerEpoch, types.Hash20{}, layerDuration, layerAvgSize, txsPerProposal, logtest.New(t).WithName("grpc.Mesh")) + grpcService := NewMeshService(datastore.NewCachedDB(sql.InMemory(), logtest.New(t)), meshAPIMock, conStateAPI, genTime, layersPerEpoch, types.Hash20{}, layerDuration, layerAvgSize, txsPerProposal) t.Cleanup(launchServer(t, cfg, grpcService)) ctx, cancel := context.WithTimeout(context.Background(), time.Second) @@ -1553,7 +1553,7 @@ func TestTransactionServiceSubmitUnsync(t *testing.T) { txHandler := NewMocktxValidator(ctrl) txHandler.EXPECT().VerifyAndCacheTx(gomock.Any(), gomock.Any()).Return(nil) - grpcService := NewTransactionService(sql.InMemory(), publisher, meshAPIMock, conStateAPI, syncer, txHandler, logtest.New(t).WithName("grpc.Transactions")) + grpcService := NewTransactionService(sql.InMemory(), publisher, meshAPIMock, conStateAPI, syncer, txHandler) t.Cleanup(launchServer(t, cfg, grpcService)) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) @@ -1591,7 +1591,7 @@ func TestTransactionServiceSubmitInvalidTx(t *testing.T) { txHandler := NewMocktxValidator(ctrl) txHandler.EXPECT().VerifyAndCacheTx(gomock.Any(), gomock.Any()).Return(errors.New("failed validation")) - grpcService := NewTransactionService(sql.InMemory(), publisher, meshAPIMock, conStateAPI, syncer, txHandler, logtest.New(t).WithName("grpc.Transactions")) + grpcService := NewTransactionService(sql.InMemory(), publisher, meshAPIMock, conStateAPI, syncer, txHandler) t.Cleanup(launchServer(t, cfg, grpcService)) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) @@ -1623,7 +1623,7 @@ func TestTransactionService_SubmitNoConcurrency(t *testing.T) { txHandler := NewMocktxValidator(ctrl) txHandler.EXPECT().VerifyAndCacheTx(gomock.Any(), gomock.Any()).Return(nil).Times(numTxs) - grpcService := NewTransactionService(sql.InMemory(), publisher, meshAPIMock, conStateAPI, syncer, txHandler, logtest.New(t).WithName("grpc.Transactions")) + grpcService := NewTransactionService(sql.InMemory(), publisher, meshAPIMock, conStateAPI, syncer, txHandler) t.Cleanup(launchServer(t, cfg, grpcService)) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) @@ -1650,7 +1650,7 @@ func TestTransactionService(t *testing.T) { txHandler := NewMocktxValidator(ctrl) txHandler.EXPECT().VerifyAndCacheTx(gomock.Any(), gomock.Any()).Return(nil).AnyTimes() - grpcService := NewTransactionService(sql.InMemory(), publisher, meshAPIMock, conStateAPI, syncer, txHandler, logtest.New(t).WithName("grpc.Transactions")) + grpcService := NewTransactionService(sql.InMemory(), publisher, meshAPIMock, conStateAPI, syncer, txHandler) t.Cleanup(launchServer(t, cfg, grpcService)) ctx, cancel := context.WithTimeout(context.Background(), time.Second) @@ -1989,7 +1989,7 @@ func TestAccountMeshDataStream_comprehensive(t *testing.T) { ctrl := gomock.NewController(t) genTime := NewMockgenesisTimeAPI(ctrl) - grpcService := NewMeshService(datastore.NewCachedDB(sql.InMemory(), logtest.New(t)), meshAPIMock, conStateAPI, genTime, layersPerEpoch, types.Hash20{}, layerDuration, layerAvgSize, txsPerProposal, logtest.New(t).WithName("grpc.Mesh")) + grpcService := NewMeshService(datastore.NewCachedDB(sql.InMemory(), logtest.New(t)), meshAPIMock, conStateAPI, genTime, layersPerEpoch, types.Hash20{}, layerDuration, layerAvgSize, txsPerProposal) t.Cleanup(launchServer(t, cfg, grpcService)) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) @@ -2044,7 +2044,7 @@ func TestAccountDataStream_comprehensive(t *testing.T) { events.InitializeReporter() t.Cleanup(events.CloseEventReporter) - svc := NewGlobalStateService(meshAPIMock, conStateAPI, logtest.New(t).WithName("grpc.GlobalState")) + svc := NewGlobalStateService(meshAPIMock, conStateAPI) t.Cleanup(launchServer(t, cfg, svc)) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) @@ -2102,7 +2102,7 @@ func TestGlobalStateStream_comprehensive(t *testing.T) { events.InitializeReporter() t.Cleanup(events.CloseEventReporter) - svc := NewGlobalStateService(meshAPIMock, conStateAPI, logtest.New(t).WithName("grpc.GlobalState")) + svc := NewGlobalStateService(meshAPIMock, conStateAPI) t.Cleanup(launchServer(t, cfg, svc)) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) @@ -2163,7 +2163,7 @@ func TestLayerStream_comprehensive(t *testing.T) { ctrl := gomock.NewController(t) genTime := NewMockgenesisTimeAPI(ctrl) - grpcService := NewMeshService(datastore.NewCachedDB(sql.InMemory(), logtest.New(t)), meshAPIMock, conStateAPI, genTime, layersPerEpoch, types.Hash20{}, layerDuration, layerAvgSize, txsPerProposal, logtest.New(t).WithName("grpc.Mesh")) + grpcService := NewMeshService(datastore.NewCachedDB(sql.InMemory(), logtest.New(t)), meshAPIMock, conStateAPI, genTime, layersPerEpoch, types.Hash20{}, layerDuration, layerAvgSize, txsPerProposal) t.Cleanup(launchServer(t, cfg, grpcService)) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) @@ -2304,8 +2304,8 @@ func TestMultiService(t *testing.T) { genTime := NewMockgenesisTimeAPI(ctrl) genesis := time.Unix(genTimeUnix, 0) genTime.EXPECT().GenesisTime().Return(genesis) - svc1 := NewNodeService(peerCounter, meshAPIMock, genTime, syncer, "v0.0.0", "cafebabe", logtest.New(t).WithName("grpc.Node")) - svc2 := NewMeshService(datastore.NewCachedDB(sql.InMemory(), logtest.New(t)), meshAPIMock, conStateAPI, genTime, layersPerEpoch, types.Hash20{}, layerDuration, layerAvgSize, txsPerProposal, logtest.New(t).WithName("grpc.Mesh")) + svc1 := NewNodeService(peerCounter, meshAPIMock, genTime, syncer, "v0.0.0", "cafebabe") + svc2 := NewMeshService(datastore.NewCachedDB(sql.InMemory(), logtest.New(t)), meshAPIMock, conStateAPI, genTime, layersPerEpoch, types.Hash20{}, layerDuration, layerAvgSize, txsPerProposal) shutDown := launchServer(t, cfg, svc1, svc2) t.Cleanup(shutDown) @@ -2359,8 +2359,8 @@ func TestJsonApi(t *testing.T) { genTime := NewMockgenesisTimeAPI(ctrl) genesis := time.Unix(genTimeUnix, 0) genTime.EXPECT().GenesisTime().Return(genesis) - svc1 := NewNodeService(peerCounter, meshAPIMock, genTime, syncer, "v0.0.0", "cafebabe", logtest.New(t).WithName("grpc.Node")) - svc2 := NewMeshService(datastore.NewCachedDB(sql.InMemory(), logtest.New(t)), meshAPIMock, conStateAPI, genTime, layersPerEpoch, types.Hash20{}, layerDuration, layerAvgSize, txsPerProposal, logtest.New(t).WithName("grpc.Mesh")) + svc1 := NewNodeService(peerCounter, meshAPIMock, genTime, syncer, "v0.0.0", "cafebabe") + svc2 := NewMeshService(datastore.NewCachedDB(sql.InMemory(), logtest.New(t)), meshAPIMock, conStateAPI, genTime, layersPerEpoch, types.Hash20{}, layerDuration, layerAvgSize, txsPerProposal) t.Cleanup(launchServer(t, cfg, svc1, svc2)) time.Sleep(time.Second) @@ -2385,7 +2385,7 @@ func TestDebugService(t *testing.T) { identity := NewMocknetworkIdentity(ctrl) mOracle := NewMockoracle(ctrl) db := sql.InMemory() - svc := NewDebugService(db, conStateAPI, identity, mOracle, logtest.New(t).WithName("grpc.Debug")) + svc := NewDebugService(db, conStateAPI, identity, mOracle) t.Cleanup(launchServer(t, cfg, svc)) ctx, cancel := context.WithTimeout(context.Background(), time.Second) @@ -2487,8 +2487,8 @@ func TestEventsReceived(t *testing.T) { events.InitializeReporter() t.Cleanup(events.CloseEventReporter) - txService := NewTransactionService(sql.InMemory(), nil, meshAPIMock, conStateAPI, nil, nil, logtest.New(t).WithName("grpc.Transactions")) - gsService := NewGlobalStateService(meshAPIMock, conStateAPI, logtest.New(t).WithName("grpc.GlobalState")) + txService := NewTransactionService(sql.InMemory(), nil, meshAPIMock, conStateAPI, nil, nil) + gsService := NewGlobalStateService(meshAPIMock, conStateAPI) t.Cleanup(launchServer(t, cfg, txService, gsService)) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) @@ -2566,7 +2566,7 @@ func TestTransactionsRewards(t *testing.T) { events.InitializeReporter() t.Cleanup(events.CloseEventReporter) - t.Cleanup(launchServer(t, cfg, NewGlobalStateService(meshAPIMock, conStateAPI, logtest.New(t).WithName("grpc.GlobalState")))) + t.Cleanup(launchServer(t, cfg, NewGlobalStateService(meshAPIMock, conStateAPI))) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) t.Cleanup(cancel) @@ -2632,7 +2632,7 @@ func TestVMAccountUpdates(t *testing.T) { require.NoError(t, err) t.Cleanup(func() { db.Close() }) svm := vm.New(db, vm.WithLogger(logtest.New(t))) - t.Cleanup(launchServer(t, cfg, NewGlobalStateService(nil, txs.NewConservativeState(svm, db), logtest.New(t).WithName("grpc.GlobalState")))) + t.Cleanup(launchServer(t, cfg, NewGlobalStateService(nil, txs.NewConservativeState(svm, db)))) keys := make([]*signing.EdSigner, 10) accounts := make([]types.Account, len(keys)) @@ -2728,7 +2728,7 @@ func TestMeshService_EpochStream(t *testing.T) { ctrl := gomock.NewController(t) genTime := NewMockgenesisTimeAPI(ctrl) db := sql.InMemory() - srv := NewMeshService(datastore.NewCachedDB(db, logtest.New(t)), meshAPIMock, conStateAPI, genTime, layersPerEpoch, types.Hash20{}, layerDuration, layerAvgSize, txsPerProposal, logtest.New(t).WithName("grpc.Mesh")) + srv := NewMeshService(datastore.NewCachedDB(db, logtest.New(t)), meshAPIMock, conStateAPI, genTime, layersPerEpoch, types.Hash20{}, layerDuration, layerAvgSize, txsPerProposal) t.Cleanup(launchServer(t, cfg, srv)) epoch := types.EpochID(3) diff --git a/api/grpcserver/mesh_service.go b/api/grpcserver/mesh_service.go index 64eb129f97..1c0bec36d2 100644 --- a/api/grpcserver/mesh_service.go +++ b/api/grpcserver/mesh_service.go @@ -7,7 +7,9 @@ import ( "fmt" "time" + "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap/ctxzap" pb "github.com/spacemeshos/api/release/go/spacemesh/v1" + "go.uber.org/zap" "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" @@ -15,13 +17,11 @@ import ( "github.com/spacemeshos/go-spacemesh/common/types" "github.com/spacemeshos/go-spacemesh/datastore" "github.com/spacemeshos/go-spacemesh/events" - "github.com/spacemeshos/go-spacemesh/log" "github.com/spacemeshos/go-spacemesh/sql" ) // MeshService exposes mesh data such as accounts, blocks, and transactions. type MeshService struct { - logger log.Logger cdb *datastore.CachedDB mesh meshAPI // Mesh conState conservativeState @@ -49,10 +49,8 @@ func NewMeshService( layerDuration time.Duration, layerAvgSize, txsPerProposal uint32, - lg log.Logger, ) *MeshService { return &MeshService{ - logger: lg, cdb: cdb, mesh: msh, conState: cstate, @@ -67,7 +65,6 @@ func NewMeshService( // GenesisTime returns the network genesis time as UNIX time. func (s MeshService) GenesisTime(context.Context, *pb.GenesisTimeRequest) (*pb.GenesisTimeResponse, error) { - s.logger.Info("GRPC MeshService.GenesisTime") return &pb.GenesisTimeResponse{Unixtime: &pb.SimpleInt{ Value: uint64(s.genTime.GenesisTime().Unix()), }}, nil @@ -75,7 +72,6 @@ func (s MeshService) GenesisTime(context.Context, *pb.GenesisTimeRequest) (*pb.G // CurrentLayer returns the current layer number. func (s MeshService) CurrentLayer(context.Context, *pb.CurrentLayerRequest) (*pb.CurrentLayerResponse, error) { - s.logger.Info("GRPC MeshService.CurrentLayer") return &pb.CurrentLayerResponse{Layernum: &pb.LayerNumber{ Number: s.genTime.CurrentLayer().Uint32(), }}, nil @@ -83,7 +79,6 @@ func (s MeshService) CurrentLayer(context.Context, *pb.CurrentLayerRequest) (*pb // CurrentEpoch returns the current epoch number. func (s MeshService) CurrentEpoch(context.Context, *pb.CurrentEpochRequest) (*pb.CurrentEpochResponse, error) { - s.logger.Info("GRPC MeshService.CurrentEpoch") curLayer := s.genTime.CurrentLayer() return &pb.CurrentEpochResponse{Epochnum: &pb.EpochNumber{ Number: curLayer.GetEpoch().Uint32(), @@ -92,13 +87,11 @@ func (s MeshService) CurrentEpoch(context.Context, *pb.CurrentEpochRequest) (*pb // GenesisID returns the network ID. func (s MeshService) GenesisID(context.Context, *pb.GenesisIDRequest) (*pb.GenesisIDResponse, error) { - s.logger.Info("GRPC MeshService.NetId") return &pb.GenesisIDResponse{GenesisId: s.genesisID.Bytes()}, nil } // EpochNumLayers returns the number of layers per epoch (a network parameter). func (s MeshService) EpochNumLayers(context.Context, *pb.EpochNumLayersRequest) (*pb.EpochNumLayersResponse, error) { - s.logger.Info("GRPC MeshService.EpochNumLayers") return &pb.EpochNumLayersResponse{Numlayers: &pb.LayerNumber{ Number: s.layersPerEpoch, }}, nil @@ -106,7 +99,6 @@ func (s MeshService) EpochNumLayers(context.Context, *pb.EpochNumLayersRequest) // LayerDuration returns the layer duration in seconds (a network parameter). func (s MeshService) LayerDuration(context.Context, *pb.LayerDurationRequest) (*pb.LayerDurationResponse, error) { - s.logger.Info("GRPC MeshService.LayerDuration") return &pb.LayerDurationResponse{Duration: &pb.SimpleInt{ Value: uint64(s.layerDuration.Seconds()), }}, nil @@ -114,7 +106,6 @@ func (s MeshService) LayerDuration(context.Context, *pb.LayerDurationRequest) (* // MaxTransactionsPerSecond returns the max number of tx per sec (a network parameter). func (s MeshService) MaxTransactionsPerSecond(context.Context, *pb.MaxTransactionsPerSecondRequest) (*pb.MaxTransactionsPerSecondResponse, error) { - s.logger.Info("GRPC MeshService.MaxTransactionsPerSecond") return &pb.MaxTransactionsPerSecondResponse{MaxTxsPerSecond: &pb.SimpleInt{ Value: uint64(s.txsPerProposal * s.layerAvgSize / uint32(s.layerDuration.Seconds())), }}, nil @@ -151,7 +142,7 @@ func (s MeshService) getFilteredActivations(ctx context.Context, startLayer type // Look up full data atxs, matxs := s.mesh.GetATXs(ctx, atxids) if len(matxs) != 0 { - s.logger.Error("could not find activations %v", matxs) + ctxzap.Error(ctx, "could not find activations", zap.Array("matxs", types.ATXIDs(matxs))) return nil, status.Errorf(codes.Internal, "error retrieving activations data") } for _, atx := range atxs { @@ -160,13 +151,11 @@ func (s MeshService) getFilteredActivations(ctx context.Context, startLayer type activations = append(activations, atx) } } - return + return activations, nil } // AccountMeshDataQuery returns account data. func (s MeshService) AccountMeshDataQuery(ctx context.Context, in *pb.AccountMeshDataQueryRequest) (*pb.AccountMeshDataQueryResponse, error) { - s.logger.Info("GRPC MeshService.AccountMeshDataQuery") - var startLayer types.LayerID if in.MinLayer != nil { startLayer = types.LayerID(in.MinLayer.Number) @@ -318,7 +307,7 @@ func (s MeshService) readLayer(ctx context.Context, layerID types.LayerID, layer // internal or an input error? For now, all missing layers produce // internal errors. if err != nil { - s.logger.With().Error("could not read layer from database", layerID, log.Err(err)) + ctxzap.Error(ctx, "could not read layer from database", layerID.Field().Zap(), zap.Error(err)) return nil, status.Errorf(codes.Internal, "error reading layer data") } @@ -329,8 +318,8 @@ func (s MeshService) readLayer(ctx context.Context, layerID types.LayerID, layer // TODO: Do we ever expect txs to be missing here? // E.g., if this node has not synced/received them yet. if len(missing) != 0 { - s.logger.With().Error("could not find transactions from layer", - log.String("missing", fmt.Sprint(missing)), layer.Index()) + ctxzap.Error(ctx, "could not find transactions from layer", + zap.String("missing", fmt.Sprint(missing)), layer.Index().Field().Zap()) return nil, status.Errorf(codes.Internal, "error retrieving tx data") } @@ -356,8 +345,12 @@ func (s MeshService) readLayer(ctx context.Context, layerID types.LayerID, layer // Add unique ATXIDs atxs, matxs := s.mesh.GetATXs(ctx, activations) if len(matxs) != 0 { - s.logger.With().Error("could not find activations from layer", - log.String("missing", fmt.Sprint(matxs)), layer.Index()) + ctxzap.Error( + ctx, + "could not find activations from layer", + zap.Array("missing", types.ATXIDs(matxs)), + layer.Index().Field().Zap(), + ) return nil, status.Errorf(codes.Internal, "error retrieving activations data") } for _, atx := range atxs { @@ -368,15 +361,15 @@ func (s MeshService) readLayer(ctx context.Context, layerID types.LayerID, layer if err != nil { // This is expected. We can only retrieve state root for a layer that was applied to state, // which only happens after it's approved/confirmed. - s.logger.With().Debug("no state root for layer", - layer, log.String("status", layerStatus.String()), log.Err(err)) + ctxzap.Debug(ctx, "no state root for layer", + layer.Field().Zap(), zap.Stringer("status", layerStatus), zap.Error(err)) } hash, err := s.mesh.MeshHash(layerID) if err != nil { // This is expected. We can only retrieve state root for a layer that was applied to state, // which only happens after it's approved/confirmed. - s.logger.With().Debug("no mesh hash at layer", - layer, log.String("status", layerStatus.String()), log.Err(err)) + ctxzap.Debug(ctx, "no mesh hash at layer", + layer.Field().Zap(), zap.Stringer("status", layerStatus), zap.Error(err)) } return &pb.Layer{ Number: &pb.LayerNumber{Number: layer.Index().Uint32()}, @@ -390,8 +383,6 @@ func (s MeshService) readLayer(ctx context.Context, layerID types.LayerID, layer // LayersQuery returns all mesh data, layer by layer. func (s MeshService) LayersQuery(ctx context.Context, in *pb.LayersQueryRequest) (*pb.LayersQueryResponse, error) { - s.logger.Info("GRPC MeshService.LayersQuery") - var startLayer, endLayer types.LayerID if in.StartLayer != nil { startLayer = types.LayerID(in.StartLayer.Number) @@ -426,7 +417,7 @@ func (s MeshService) LayersQuery(ctx context.Context, in *pb.LayersQueryRequest) // internal or an input error? For now, all missing layers produce // internal errors. if layer == nil || err != nil { - s.logger.With().Error("error retrieving layer data", log.Err(err)) + ctxzap.Error(ctx, "error retrieving layer data", zap.Error(err)) return nil, status.Errorf(codes.Internal, "error retrieving layer data") } @@ -443,8 +434,6 @@ func (s MeshService) LayersQuery(ctx context.Context, in *pb.LayersQueryRequest) // AccountMeshDataStream exposes a stream of transactions and activations for an account. func (s MeshService) AccountMeshDataStream(in *pb.AccountMeshDataStreamRequest, stream pb.MeshService_AccountMeshDataStreamServer) error { - s.logger.Info("GRPC MeshService.AccountMeshDataStream") - if in.Filter == nil { return status.Errorf(codes.InvalidArgument, "`Filter` must be provided") } @@ -484,10 +473,10 @@ func (s MeshService) AccountMeshDataStream(in *pb.AccountMeshDataStreamRequest, for { select { case <-txBufFull: - s.logger.Info("tx buffer is full, shutting down") + ctxzap.Info(stream.Context(), "tx buffer is full, shutting down") return status.Error(codes.Canceled, errTxBufferFull) case <-activationsBufFull: - s.logger.Info("activations buffer is full, shutting down") + ctxzap.Info(stream.Context(), "activations buffer is full, shutting down") return status.Error(codes.Canceled, errActivationsBufferFull) case activationEvent := <-activationsCh: activation := activationEvent.VerifiedActivationTx @@ -522,7 +511,7 @@ func (s MeshService) AccountMeshDataStream(in *pb.AccountMeshDataStreamRequest, } } case <-stream.Context().Done(): - s.logger.Info("AccountMeshDataStream closing stream, client disconnected") + ctxzap.Info(stream.Context(), "AccountMeshDataStream closing stream, client disconnected") return nil } // TODO: do we need an additional case here for a context to indicate @@ -532,8 +521,6 @@ func (s MeshService) AccountMeshDataStream(in *pb.AccountMeshDataStreamRequest, // LayerStream exposes a stream of all mesh data per layer. func (s MeshService) LayerStream(_ *pb.LayerStreamRequest, stream pb.MeshService_LayerStreamServer) error { - s.logger.Info("GRPC MeshService.LayerStream") - var ( layerCh <-chan events.LayerUpdate layersBufFull <-chan struct{} @@ -546,11 +533,11 @@ func (s MeshService) LayerStream(_ *pb.LayerStreamRequest, stream pb.MeshService for { select { case <-layersBufFull: - s.logger.Info("layer buffer is full, shutting down") + ctxzap.Info(stream.Context(), "layer buffer is full, shutting down") return status.Error(codes.Canceled, errAccountBufferFull) case layer, ok := <-layerCh: if !ok { - s.logger.Info("LayerStream closed, shutting down") + ctxzap.Info(stream.Context(), "LayerStream closed, shutting down") return nil } pbLayer, err := s.readLayer(stream.Context(), layer.LayerID, convertLayerStatus(layer.Status)) @@ -562,7 +549,7 @@ func (s MeshService) LayerStream(_ *pb.LayerStreamRequest, stream pb.MeshService return fmt.Errorf("send to stream: %w", err) } case <-stream.Context().Done(): - s.logger.Info("LayerStream closing stream, client disconnected") + ctxzap.Info(stream.Context(), "LayerStream closing stream, client disconnected") return nil } // TODO: do we need an additional case here for a context to indicate @@ -607,10 +594,10 @@ func (s MeshService) EpochStream(req *pb.EpochStreamRequest, stream pb.MeshServi }); err != nil { return status.Error(codes.Internal, err.Error()) } - s.logger.With().Info("epoch atxs streamed", - log.Uint32("target epoch", (epoch+1).Uint32()), - log.Int("total", total), - log.Int("malicious", mal), + ctxzap.Info(stream.Context(), "epoch atxs streamed", + zap.Uint32("target epoch", (epoch+1).Uint32()), + zap.Int("total", total), + zap.Int("malicious", mal), ) return nil } diff --git a/api/grpcserver/mesh_service_test.go b/api/grpcserver/mesh_service_test.go index b3003510cc..6834c1e8d5 100644 --- a/api/grpcserver/mesh_service_test.go +++ b/api/grpcserver/mesh_service_test.go @@ -141,7 +141,7 @@ func TestMeshService_MalfeasanceQuery(t *testing.T) { ctrl := gomock.NewController(t) genTime := NewMockgenesisTimeAPI(ctrl) db := sql.InMemory() - srv := NewMeshService(datastore.NewCachedDB(db, logtest.New(t)), meshAPIMock, conStateAPI, genTime, layersPerEpoch, types.Hash20{}, layerDuration, layerAvgSize, txsPerProposal, logtest.New(t).WithName("grpc.Mesh")) + srv := NewMeshService(datastore.NewCachedDB(db, logtest.New(t)), meshAPIMock, conStateAPI, genTime, layersPerEpoch, types.Hash20{}, layerDuration, layerAvgSize, txsPerProposal) t.Cleanup(launchServer(t, cfg, srv)) ctx, cancel := context.WithTimeout(context.Background(), time.Second) @@ -183,7 +183,7 @@ func TestMeshService_MalfeasanceStream(t *testing.T) { ctrl := gomock.NewController(t) genTime := NewMockgenesisTimeAPI(ctrl) db := sql.InMemory() - srv := NewMeshService(datastore.NewCachedDB(db, logtest.New(t)), meshAPIMock, conStateAPI, genTime, layersPerEpoch, types.Hash20{}, layerDuration, layerAvgSize, txsPerProposal, logtest.New(t).WithName("grpc.Mesh")) + srv := NewMeshService(datastore.NewCachedDB(db, logtest.New(t)), meshAPIMock, conStateAPI, genTime, layersPerEpoch, types.Hash20{}, layerDuration, layerAvgSize, txsPerProposal) t.Cleanup(launchServer(t, cfg, srv)) ctx, cancel := context.WithTimeout(context.Background(), time.Second) diff --git a/api/grpcserver/node_service.go b/api/grpcserver/node_service.go index cb10f881fd..34cf45bf35 100644 --- a/api/grpcserver/node_service.go +++ b/api/grpcserver/node_service.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/golang/protobuf/ptypes/empty" + "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap/ctxzap" pb "github.com/spacemeshos/api/release/go/spacemesh/v1" "go.uber.org/zap/zapcore" "google.golang.org/grpc/codes" @@ -13,14 +14,12 @@ import ( "github.com/spacemeshos/go-spacemesh/common/types" "github.com/spacemeshos/go-spacemesh/events" - "github.com/spacemeshos/go-spacemesh/log" ) // NodeService is a grpc server that provides the NodeService, which exposes node-related // data such as node status, software version, errors, etc. It can also be used to start // the sync process, or to shut down the node. type NodeService struct { - logger log.Logger mesh meshAPI genTime genesisTimeAPI peerCounter peerCounter @@ -42,10 +41,8 @@ func NewNodeService( syncer syncer, appVersion string, appCommit string, - lg log.Logger, ) *NodeService { return &NodeService{ - logger: lg, mesh: msh, genTime: genTime, peerCounter: peers, @@ -57,7 +54,6 @@ func NewNodeService( // Echo returns the response for an echo api request. It's used for E2E tests. func (s NodeService) Echo(_ context.Context, in *pb.EchoRequest) (*pb.EchoResponse, error) { - s.logger.Info("GRPC NodeService.Echo") if in.Msg != nil { return &pb.EchoResponse{Msg: &pb.SimpleString{Value: in.Msg.Value}}, nil } @@ -66,7 +62,6 @@ func (s NodeService) Echo(_ context.Context, in *pb.EchoRequest) (*pb.EchoRespon // Version returns the version of the node software as a semver string. func (s NodeService) Version(context.Context, *empty.Empty) (*pb.VersionResponse, error) { - s.logger.Info("GRPC NodeService.Version") return &pb.VersionResponse{ VersionString: &pb.SimpleString{Value: s.appVersion}, }, nil @@ -74,7 +69,6 @@ func (s NodeService) Version(context.Context, *empty.Empty) (*pb.VersionResponse // Build returns the build of the node software. func (s NodeService) Build(context.Context, *empty.Empty) (*pb.BuildResponse, error) { - s.logger.Info("GRPC NodeService.Build") return &pb.BuildResponse{ BuildString: &pb.SimpleString{Value: s.appCommit}, }, nil @@ -83,8 +77,6 @@ func (s NodeService) Build(context.Context, *empty.Empty) (*pb.BuildResponse, er // Status returns a status object providing information about the connected peers, sync status, // current and verified layer. func (s NodeService) Status(ctx context.Context, _ *pb.StatusRequest) (*pb.StatusResponse, error) { - s.logger.Info("GRPC NodeService.Status") - curLayer, latestLayer, verifiedLayer := s.getLayers() return &pb.StatusResponse{ Status: &pb.NodeStatus{ @@ -125,8 +117,6 @@ func (s NodeService) getLayers() (curLayer, latestLayer, verifiedLayer uint32) { // StatusStream exposes a stream of node status updates. func (s NodeService) StatusStream(_ *pb.StatusStreamRequest, stream pb.NodeService_StatusStreamServer) error { - s.logger.Info("GRPC NodeService.StatusStream") - var ( statusCh <-chan events.Status statusBufFull <-chan struct{} @@ -139,13 +129,13 @@ func (s NodeService) StatusStream(_ *pb.StatusStreamRequest, stream pb.NodeServi for { select { case <-statusBufFull: - s.logger.Info("status buffer is full, shutting down") + ctxzap.Info(stream.Context(), "status buffer is full, shutting down") return status.Error(codes.Canceled, errStatusBufferFull) case _, ok := <-statusCh: // statusCh works a bit differently than the other streams. It doesn't actually // send us data. Instead, it just notifies us that there's new data to be read. if !ok { - s.logger.Info("StatusStream closed, shutting down") + ctxzap.Info(stream.Context(), "StatusStream closed, shutting down") return nil } curLayer, latestLayer, verifiedLayer := s.getLayers() @@ -164,7 +154,7 @@ func (s NodeService) StatusStream(_ *pb.StatusStreamRequest, stream pb.NodeServi return fmt.Errorf("send to stream: %w", err) } case <-stream.Context().Done(): - s.logger.Info("StatusStream closing stream, client disconnected") + ctxzap.Info(stream.Context(), "StatusStream closing stream, client disconnected") return nil } // TODO: do we need an additional case here for a context to indicate @@ -174,8 +164,6 @@ func (s NodeService) StatusStream(_ *pb.StatusStreamRequest, stream pb.NodeServi // ErrorStream exposes a stream of node errors. func (s NodeService) ErrorStream(_ *pb.ErrorStreamRequest, stream pb.NodeService_ErrorStreamServer) error { - s.logger.Info("GRPC NodeService.ErrorStream") - var ( errorsCh <-chan events.NodeError errorsBufFull <-chan struct{} @@ -191,11 +179,11 @@ func (s NodeService) ErrorStream(_ *pb.ErrorStreamRequest, stream pb.NodeService for { select { case <-errorsBufFull: - s.logger.Info("errors buffer is full, shutting down") + ctxzap.Info(stream.Context(), "errors buffer is full, shutting down") return status.Error(codes.Canceled, errErrorsBufferFull) case nodeError, ok := <-errorsCh: if !ok { - s.logger.Info("ErrorStream closed, shutting down") + ctxzap.Info(stream.Context(), "ErrorStream closed, shutting down") return nil } @@ -209,7 +197,7 @@ func (s NodeService) ErrorStream(_ *pb.ErrorStreamRequest, stream pb.NodeService return fmt.Errorf("send to stream: %w", err) } case <-stream.Context().Done(): - s.logger.Info("ErrorStream closing stream, client disconnected") + ctxzap.Info(stream.Context(), "ErrorStream closing stream, client disconnected") return nil } // TODO: do we need an additional case here for a context to indicate diff --git a/api/grpcserver/smesher_service.go b/api/grpcserver/smesher_service.go index d1a5ad1a71..9425a7e90d 100644 --- a/api/grpcserver/smesher_service.go +++ b/api/grpcserver/smesher_service.go @@ -7,8 +7,10 @@ import ( "time" "github.com/golang/protobuf/ptypes/empty" + "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap/ctxzap" pb "github.com/spacemeshos/api/release/go/spacemesh/v1" "github.com/spacemeshos/post/config" + "go.uber.org/zap" "google.golang.org/genproto/googleapis/rpc/code" rpcstatus "google.golang.org/genproto/googleapis/rpc/status" "google.golang.org/grpc/codes" @@ -16,13 +18,10 @@ import ( "github.com/spacemeshos/go-spacemesh/activation" "github.com/spacemeshos/go-spacemesh/common/types" - "github.com/spacemeshos/go-spacemesh/log" ) // SmesherService exposes endpoints to manage smeshing. type SmesherService struct { - logger log.Logger - postSetupProvider postSetupProvider smeshingProvider activation.SmeshingProvider @@ -36,9 +35,8 @@ func (s SmesherService) RegisterService(server *Server) { } // NewSmesherService creates a new grpc service using config data. -func NewSmesherService(post postSetupProvider, smeshing activation.SmeshingProvider, streamInterval time.Duration, postOpts activation.PostSetupOpts, lg log.Logger) *SmesherService { +func NewSmesherService(post postSetupProvider, smeshing activation.SmeshingProvider, streamInterval time.Duration, postOpts activation.PostSetupOpts) *SmesherService { return &SmesherService{ - logger: lg, postSetupProvider: post, smeshingProvider: smeshing, streamInterval: streamInterval, @@ -48,14 +46,11 @@ func NewSmesherService(post postSetupProvider, smeshing activation.SmeshingProvi // IsSmeshing reports whether the node is smeshing. func (s SmesherService) IsSmeshing(context.Context, *empty.Empty) (*pb.IsSmeshingResponse, error) { - s.logger.Info("GRPC SmesherService.IsSmeshing") - return &pb.IsSmeshingResponse{IsSmeshing: s.smeshingProvider.Smeshing()}, nil } // StartSmeshing requests that the node begin smeshing. func (s SmesherService) StartSmeshing(ctx context.Context, in *pb.StartSmeshingRequest) (*pb.StartSmeshingResponse, error) { - s.logger.Info("GRPC SmesherService.StartSmeshing") if in.Coinbase == nil { return nil, status.Errorf(codes.InvalidArgument, "`Coinbase` must be provided") } @@ -93,7 +88,7 @@ func (s SmesherService) StartSmeshing(ctx context.Context, in *pb.StartSmeshingR } if err := s.smeshingProvider.StartSmeshing(coinbaseAddr, opts); err != nil { err := fmt.Sprintf("failed to start smeshing: %v", err) - s.logger.Error(err) + ctxzap.Error(ctx, err) return nil, status.Error(codes.Internal, err) } @@ -104,8 +99,6 @@ func (s SmesherService) StartSmeshing(ctx context.Context, in *pb.StartSmeshingR // StopSmeshing requests that the node stop smeshing. func (s SmesherService) StopSmeshing(ctx context.Context, in *pb.StopSmeshingRequest) (*pb.StopSmeshingResponse, error) { - s.logger.Info("GRPC SmesherService.StopSmeshing") - errchan := make(chan error, 1) go func() { errchan <- s.smeshingProvider.StopSmeshing(in.DeleteFiles) @@ -116,7 +109,7 @@ func (s SmesherService) StopSmeshing(ctx context.Context, in *pb.StopSmeshingReq case err := <-errchan: if err != nil { err := fmt.Sprintf("failed to stop smeshing: %v", err) - s.logger.Error(err) + ctxzap.Error(ctx, err) return nil, status.Error(codes.Internal, err) } } @@ -137,8 +130,6 @@ func (s SmesherService) Coinbase(context.Context, *empty.Empty) (*pb.CoinbaseRes // SetCoinbase sets the current coinbase setting of this node. func (s SmesherService) SetCoinbase(_ context.Context, in *pb.SetCoinbaseRequest) (*pb.SetCoinbaseResponse, error) { - s.logger.Info("GRPC SmesherService.SetCoinbase") - if in.Id == nil { return nil, status.Errorf(codes.InvalidArgument, "`Id` must be provided") } @@ -156,34 +147,27 @@ func (s SmesherService) SetCoinbase(_ context.Context, in *pb.SetCoinbaseRequest // MinGas returns the current mingas setting of this node. func (s SmesherService) MinGas(context.Context, *empty.Empty) (*pb.MinGasResponse, error) { - s.logger.Info("GRPC SmesherService.MinGas") return nil, status.Errorf(codes.Unimplemented, "this endpoint is not implemented") } // SetMinGas sets the mingas setting of this node. func (s SmesherService) SetMinGas(context.Context, *pb.SetMinGasRequest) (*pb.SetMinGasResponse, error) { - s.logger.Info("GRPC SmesherService.SetMinGas") return nil, status.Errorf(codes.Unimplemented, "this endpoint is not implemented") } // EstimatedRewards returns estimated smeshing rewards over the next epoch. func (s SmesherService) EstimatedRewards(context.Context, *pb.EstimatedRewardsRequest) (*pb.EstimatedRewardsResponse, error) { - s.logger.Info("GRPC SmesherService.EstimatedRewards") return nil, status.Errorf(codes.Unimplemented, "this endpoint is not implemented") } // PostSetupStatus returns post data status. -func (s SmesherService) PostSetupStatus(context.Context, *empty.Empty) (*pb.PostSetupStatusResponse, error) { - s.logger.Info("GRPC SmesherService.PostSetupStatus") - +func (s SmesherService) PostSetupStatus(ctx context.Context, _ *empty.Empty) (*pb.PostSetupStatusResponse, error) { status := s.postSetupProvider.Status() return &pb.PostSetupStatusResponse{Status: statusToPbStatus(status)}, nil } // PostSetupStatusStream exposes a stream of status updates during post setup. func (s SmesherService) PostSetupStatusStream(_ *empty.Empty, stream pb.SmesherService_PostSetupStatusStreamServer) error { - s.logger.Info("GRPC SmesherService.PostSetupStatusStream") - timer := time.NewTicker(s.streamInterval) defer timer.Stop() @@ -202,8 +186,6 @@ func (s SmesherService) PostSetupStatusStream(_ *empty.Empty, stream pb.SmesherS // PostSetupComputeProviders returns a list of available Post setup compute providers. func (s SmesherService) PostSetupProviders(ctx context.Context, in *pb.PostSetupProvidersRequest) (*pb.PostSetupProvidersResponse, error) { - s.logger.Info("GRPC SmesherService.PostSetupProviders") - providers, err := s.postSetupProvider.Providers() if err != nil { return nil, status.Errorf(codes.Internal, "failed to get OpenCL providers: %v", err) @@ -217,7 +199,7 @@ func (s SmesherService) PostSetupProviders(ctx context.Context, in *pb.PostSetup var err error hashesPerSec, err = s.postSetupProvider.Benchmark(p) if err != nil { - s.logger.Error("failed to benchmark provider: %v", err) + ctxzap.Error(ctx, "failed to benchmark provider", zap.Error(err)) return nil, status.Error(codes.Internal, "failed to benchmark provider") } } @@ -235,8 +217,6 @@ func (s SmesherService) PostSetupProviders(ctx context.Context, in *pb.PostSetup // PostConfig returns the Post protocol config. func (s SmesherService) PostConfig(context.Context, *empty.Empty) (*pb.PostConfigResponse, error) { - s.logger.Info("GRPC SmesherService.PostConfig") - cfg := s.postSetupProvider.Config() return &pb.PostConfigResponse{ diff --git a/api/grpcserver/smesher_service_test.go b/api/grpcserver/smesher_service_test.go index 2c727e5db1..ce78b0ff8d 100644 --- a/api/grpcserver/smesher_service_test.go +++ b/api/grpcserver/smesher_service_test.go @@ -15,7 +15,6 @@ import ( "github.com/spacemeshos/go-spacemesh/activation" "github.com/spacemeshos/go-spacemesh/api/grpcserver" "github.com/spacemeshos/go-spacemesh/common/types" - "github.com/spacemeshos/go-spacemesh/log/logtest" ) func TestPostConfig(t *testing.T) { @@ -23,7 +22,7 @@ func TestPostConfig(t *testing.T) { postSetupProvider := activation.NewMockpostSetupProvider(ctrl) smeshingProvider := activation.NewMockSmeshingProvider(ctrl) - svc := grpcserver.NewSmesherService(postSetupProvider, smeshingProvider, time.Second, activation.DefaultPostSetupOpts(), logtest.New(t).WithName("grpc.Smesher")) + svc := grpcserver.NewSmesherService(postSetupProvider, smeshingProvider, time.Second, activation.DefaultPostSetupOpts()) postConfig := activation.PostConfig{ MinNumUnits: rand.Uint32(), @@ -48,7 +47,7 @@ func TestStartSmeshingPassesCorrectSmeshingOpts(t *testing.T) { ctrl := gomock.NewController(t) postSetupProvider := activation.NewMockpostSetupProvider(ctrl) smeshingProvider := activation.NewMockSmeshingProvider(ctrl) - svc := grpcserver.NewSmesherService(postSetupProvider, smeshingProvider, time.Second, activation.DefaultPostSetupOpts(), logtest.New(t).WithName("grpc.Smesher")) + svc := grpcserver.NewSmesherService(postSetupProvider, smeshingProvider, time.Second, activation.DefaultPostSetupOpts()) types.SetNetworkHRP("stest") addr, err := types.StringToAddress("stest1qqqqqqrs60l66w5uksxzmaznwq6xnhqfv56c28qlkm4a5") @@ -82,7 +81,7 @@ func TestSmesherService_PostSetupProviders(t *testing.T) { ctrl := gomock.NewController(t) postSetupProvider := activation.NewMockpostSetupProvider(ctrl) smeshingProvider := activation.NewMockSmeshingProvider(ctrl) - svc := grpcserver.NewSmesherService(postSetupProvider, smeshingProvider, time.Second, activation.DefaultPostSetupOpts(), logtest.New(t).WithName("grpc.Smesher")) + svc := grpcserver.NewSmesherService(postSetupProvider, smeshingProvider, time.Second, activation.DefaultPostSetupOpts()) providers := []activation.PostSetupProvider{ { @@ -123,7 +122,7 @@ func TestSmesherService_PostSetupStatus(t *testing.T) { ctrl := gomock.NewController(t) postSetupProvider := activation.NewMockpostSetupProvider(ctrl) smeshingProvider := activation.NewMockSmeshingProvider(ctrl) - svc := grpcserver.NewSmesherService(postSetupProvider, smeshingProvider, time.Second, activation.DefaultPostSetupOpts(), logtest.New(t).WithName("grpc.Smesher")) + svc := grpcserver.NewSmesherService(postSetupProvider, smeshingProvider, time.Second, activation.DefaultPostSetupOpts()) postSetupProvider.EXPECT().Status().Return(&activation.PostSetupStatus{ State: activation.PostSetupStateComplete, @@ -141,7 +140,7 @@ func TestSmesherService_PostSetupStatus(t *testing.T) { ctrl := gomock.NewController(t) postSetupProvider := activation.NewMockpostSetupProvider(ctrl) smeshingProvider := activation.NewMockSmeshingProvider(ctrl) - svc := grpcserver.NewSmesherService(postSetupProvider, smeshingProvider, time.Second, activation.DefaultPostSetupOpts(), logtest.New(t).WithName("grpc.Smesher")) + svc := grpcserver.NewSmesherService(postSetupProvider, smeshingProvider, time.Second, activation.DefaultPostSetupOpts()) id := activation.PostProviderID{} id.SetInt64(1) @@ -172,7 +171,7 @@ func TestSmesherService_PostSetupStatus(t *testing.T) { ctrl := gomock.NewController(t) postSetupProvider := activation.NewMockpostSetupProvider(ctrl) smeshingProvider := activation.NewMockSmeshingProvider(ctrl) - svc := grpcserver.NewSmesherService(postSetupProvider, smeshingProvider, time.Second, activation.DefaultPostSetupOpts(), logtest.New(t).WithName("grpc.Smesher")) + svc := grpcserver.NewSmesherService(postSetupProvider, smeshingProvider, time.Second, activation.DefaultPostSetupOpts()) id := activation.PostProviderID{} id.SetInt64(100) diff --git a/api/grpcserver/transaction_service.go b/api/grpcserver/transaction_service.go index 7d4f2c8c22..e5fe2b34d3 100644 --- a/api/grpcserver/transaction_service.go +++ b/api/grpcserver/transaction_service.go @@ -7,7 +7,9 @@ import ( "fmt" "io" + "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap/ctxzap" pb "github.com/spacemeshos/api/release/go/spacemesh/v1" + "go.uber.org/zap" "google.golang.org/genproto/googleapis/rpc/code" rpcstatus "google.golang.org/genproto/googleapis/rpc/status" "google.golang.org/grpc/codes" @@ -17,7 +19,6 @@ import ( "github.com/spacemeshos/go-spacemesh/common/types" "github.com/spacemeshos/go-spacemesh/events" "github.com/spacemeshos/go-spacemesh/genvm/core" - "github.com/spacemeshos/go-spacemesh/log" "github.com/spacemeshos/go-spacemesh/p2p/pubsub" "github.com/spacemeshos/go-spacemesh/sql" "github.com/spacemeshos/go-spacemesh/sql/transactions" @@ -26,7 +27,6 @@ import ( // TransactionService exposes transaction data, and a submit tx endpoint. type TransactionService struct { db *sql.Database - logger log.Logger publisher pubsub.Publisher // P2P Swarm mesh meshAPI // Mesh conState conservativeState @@ -47,11 +47,9 @@ func NewTransactionService( conState conservativeState, syncer syncer, txHandler txValidator, - lg log.Logger, ) *TransactionService { return &TransactionService{ db: db, - logger: lg, publisher: publisher, mesh: msh, conState: conState, @@ -191,10 +189,10 @@ func (s TransactionService) TransactionsStateStream(in *pb.TransactionsStateStre for { select { case <-txBufFull: - s.logger.Info("tx buffer is full, shutting down") + ctxzap.Info(stream.Context(), "tx buffer is full, shutting down") return status.Error(codes.Canceled, errTxBufferFull) case <-layerBufFull: - s.logger.Info("layer buffer is full, shutting down") + ctxzap.Info(stream.Context(), "layer buffer is full, shutting down") return status.Error(codes.Canceled, errLayerBufferFull) case tx := <-txCh: // Filter @@ -236,7 +234,7 @@ func (s TransactionService) TransactionsStateStream(in *pb.TransactionsStateStre // In order to read transactions, we first need to read layer blocks layerObj, err := s.mesh.GetLayer(layer.LayerID) if err != nil { - s.logger.With().Error("error reading layer data for updated layer", layer.LayerID, log.Err(err)) + ctxzap.Error(stream.Context(), "error reading layer data for updated layer", layer.LayerID.Field().Zap(), zap.Error(err)) return status.Error(codes.Internal, "error reading layer data") } @@ -279,7 +277,7 @@ func (s TransactionService) TransactionsStateStream(in *pb.TransactionsStateStre if in.IncludeTransactions { tx, err := s.conState.GetMeshTransaction(txid) if err != nil { - s.logger.Error("could not find transaction %v from layer %v: %v", txid, layer, err) + ctxzap.Error(stream.Context(), "could not find transaction from layer", txid.Field().Zap(), layer.Field().Zap(), zap.Error(err)) return status.Error(codes.Internal, "error retrieving tx data") } diff --git a/api/grpcserver/transaction_service_test.go b/api/grpcserver/transaction_service_test.go index 68b005d024..aac2a526e6 100644 --- a/api/grpcserver/transaction_service_test.go +++ b/api/grpcserver/transaction_service_test.go @@ -21,7 +21,6 @@ import ( "github.com/spacemeshos/go-spacemesh/events" vm "github.com/spacemeshos/go-spacemesh/genvm" "github.com/spacemeshos/go-spacemesh/genvm/sdk/wallet" - "github.com/spacemeshos/go-spacemesh/log/logtest" "github.com/spacemeshos/go-spacemesh/signing" "github.com/spacemeshos/go-spacemesh/sql" "github.com/spacemeshos/go-spacemesh/sql/transactions" @@ -48,7 +47,7 @@ func TestTransactionService_StreamResults(t *testing.T) { return nil })) - svc := NewTransactionService(db, nil, nil, nil, nil, nil, logtest.New(t).WithName("grpc.Transactions")) + svc := NewTransactionService(db, nil, nil, nil, nil, nil) t.Cleanup(launchServer(t, cfg, svc)) conn := dialGrpc(ctx, t, cfg.PublicListener) @@ -163,7 +162,7 @@ func BenchmarkStreamResults(b *testing.B) { } require.NoError(b, tx.Commit()) require.NoError(b, tx.Release()) - svc := NewTransactionService(db, nil, nil, nil, nil, nil, logtest.New(b).WithName("grpc.Transactions")) + svc := NewTransactionService(db, nil, nil, nil, nil, nil) b.Cleanup(launchServer(b, cfg, svc)) conn := dialGrpc(ctx, b, cfg.PublicListener) @@ -220,7 +219,7 @@ func TestParseTransactions(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) t.Cleanup(cancel) vminst := vm.New(db) - t.Cleanup(launchServer(t, cfg, NewTransactionService(db, nil, nil, txs.NewConservativeState(vminst, db), nil, nil, logtest.New(t).WithName("grpc.Transactions")))) + t.Cleanup(launchServer(t, cfg, NewTransactionService(db, nil, nil, txs.NewConservativeState(vminst, db), nil, nil))) var ( conn = dialGrpc(ctx, t, cfg.PublicListener) client = pb.NewTransactionServiceClient(conn) diff --git a/cmd/bootstrapper/generator_test.go b/cmd/bootstrapper/generator_test.go index 8b113d2532..6c2edb6639 100644 --- a/cmd/bootstrapper/generator_test.go +++ b/cmd/bootstrapper/generator_test.go @@ -136,7 +136,7 @@ func createAtxs(tb testing.TB, db sql.Executor, epoch types.EpochID, atxids []ty func launchServer(tb testing.TB, cdb *datastore.CachedDB) func() { grpcService := grpcserver.New(fmt.Sprintf("127.0.0.1:%d", grpcPort), logtest.New(tb).Named("grpc")) jsonService := grpcserver.NewJSONHTTPServer(fmt.Sprintf("127.0.0.1:%d", jsonport), logtest.New(tb).WithName("grpc.JSON")) - s := grpcserver.NewMeshService(cdb, &MeshAPIMock{}, nil, nil, 0, types.Hash20{}, 0, 0, 0, logtest.New(tb).WithName("grpc.Mesh")) + s := grpcserver.NewMeshService(cdb, &MeshAPIMock{}, nil, nil, 0, types.Hash20{}, 0, 0, 0) pb.RegisterMeshServiceServer(grpcService.GrpcServer, s) // start gRPC and json servers diff --git a/common/types/activation.go b/common/types/activation.go index 54c83b73ba..46524530fb 100644 --- a/common/types/activation.go +++ b/common/types/activation.go @@ -68,6 +68,16 @@ func (t *ATXID) UnmarshalText(buf []byte) error { // EmptyATXID is a canonical empty ATXID. var EmptyATXID = ATXID{} +type ATXIDs []ATXID + +// impl zap's ArrayMarshaler interface. +func (ids ATXIDs) MarshalLogArray(enc log.ArrayEncoder) error { + for _, id := range ids { + enc.AppendString(id.String()) + } + return nil +} + // NIPostChallenge is the set of fields that's serialized, hashed and submitted to the PoET service to be included in the // PoET membership proof. type NIPostChallenge struct { diff --git a/log/zap.go b/log/zap.go index 044958d44a..ff13aa491e 100644 --- a/log/zap.go +++ b/log/zap.go @@ -60,6 +60,8 @@ type Field zap.Field // Field satisfies loggable field interface. func (f Field) Field() Field { return f } +func (f Field) Zap() zap.Field { return zap.Field(f) } + // Named is an alias to FieldNamed. // FieldNamed returns a field with the provided name instead of the default. var Named = FieldNamed diff --git a/node/node.go b/node/node.go index 019c143fce..98efd164ef 100644 --- a/node/node.go +++ b/node/node.go @@ -18,6 +18,7 @@ import ( "github.com/gofrs/flock" grpc_logsettable "github.com/grpc-ecosystem/go-grpc-middleware/logging/settable" grpczap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap" + "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap/ctxzap" grpctags "github.com/grpc-ecosystem/go-grpc-middleware/tags" "github.com/mitchellh/mapstructure" "github.com/pyroscope-io/pyroscope/pkg/agent/profiler" @@ -1040,32 +1041,41 @@ func (app *App) startServices(ctx context.Context) error { } func (app *App) initService(ctx context.Context, svc grpcserver.Service) (grpcserver.ServiceAPI, error) { - logger := app.addLogger(GRPCLogger, app.log) switch svc { case grpcserver.Debug: - return grpcserver.NewDebugService(app.db, app.conState, app.host, app.hOracle, logger.WithName("Debug")), nil + return grpcserver.NewDebugService(app.db, app.conState, app.host, app.hOracle), nil case grpcserver.GlobalState: - return grpcserver.NewGlobalStateService(app.mesh, app.conState, logger.WithName("GlobalState")), nil + return grpcserver.NewGlobalStateService(app.mesh, app.conState), nil case grpcserver.Mesh: - return grpcserver.NewMeshService(app.cachedDB, app.mesh, app.conState, app.clock, app.Config.LayersPerEpoch, app.Config.Genesis.GenesisID(), app.Config.LayerDuration, app.Config.LayerAvgSize, uint32(app.Config.TxsPerProposal), logger.WithName("Mesh")), nil + return grpcserver.NewMeshService(app.cachedDB, app.mesh, app.conState, app.clock, app.Config.LayersPerEpoch, app.Config.Genesis.GenesisID(), app.Config.LayerDuration, app.Config.LayerAvgSize, uint32(app.Config.TxsPerProposal)), nil case grpcserver.Node: - return grpcserver.NewNodeService(app.host, app.mesh, app.clock, app.syncer, cmd.Version, cmd.Commit, logger.WithName("Node")), nil + return grpcserver.NewNodeService(app.host, app.mesh, app.clock, app.syncer, cmd.Version, cmd.Commit), nil case grpcserver.Admin: - return grpcserver.NewAdminService(app.db, app.Config.DataDir(), logger.WithName("Admin"), app.host), nil + return grpcserver.NewAdminService(app.db, app.Config.DataDir(), app.host), nil case grpcserver.Smesher: - return grpcserver.NewSmesherService(app.postSetupMgr, app.atxBuilder, app.Config.API.SmesherStreamInterval, app.Config.SMESHING.Opts, logger.WithName("Smesher")), nil + return grpcserver.NewSmesherService(app.postSetupMgr, app.atxBuilder, app.Config.API.SmesherStreamInterval, app.Config.SMESHING.Opts), nil case grpcserver.Transaction: - return grpcserver.NewTransactionService(app.db, app.host, app.mesh, app.conState, app.syncer, app.txHandler, logger.WithName("Transaction")), nil + return grpcserver.NewTransactionService(app.db, app.host, app.mesh, app.conState, app.syncer, app.txHandler), nil case grpcserver.Activation: - return grpcserver.NewActivationService(app.cachedDB, types.ATXID(app.Config.Genesis.GoldenATX()), logger.WithName("Activation")), nil + return grpcserver.NewActivationService(app.cachedDB, types.ATXID(app.Config.Genesis.GoldenATX())), nil } return nil, fmt.Errorf("unknown service %s", svc) } +func unaryGrpcLogStart(ctx context.Context, req any, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) { + ctxzap.Info(ctx, "started unary call") + return handler(ctx, req) +} + +func streamingGrpcLogStart(srv any, stream grpc.ServerStream, _ *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + ctxzap.Info(stream.Context(), "started streaming call") + return handler(srv, stream) +} + func (app *App) newGrpc(logger log.Log, endpoint string) *grpcserver.Server { return grpcserver.New(endpoint, logger, - grpc.ChainStreamInterceptor(grpctags.StreamServerInterceptor(), grpczap.StreamServerInterceptor(logger.Zap())), - grpc.ChainUnaryInterceptor(grpctags.UnaryServerInterceptor(), grpczap.UnaryServerInterceptor(logger.Zap())), + grpc.ChainStreamInterceptor(grpctags.StreamServerInterceptor(), grpczap.StreamServerInterceptor(logger.Zap()), streamingGrpcLogStart), + grpc.ChainUnaryInterceptor(grpctags.UnaryServerInterceptor(), grpczap.UnaryServerInterceptor(logger.Zap()), unaryGrpcLogStart), grpc.MaxSendMsgSize(app.Config.API.GrpcSendMsgSize), grpc.MaxRecvMsgSize(app.Config.API.GrpcRecvMsgSize), ) @@ -1092,6 +1102,7 @@ func (app *App) startAPIServices(ctx context.Context) error { if err != nil { return err } + logger.Info("registering public service %s", svc) gsvc.RegisterService(app.grpcPublicService) public = append(public, gsvc) unique[svc] = struct{}{} @@ -1104,6 +1115,7 @@ func (app *App) startAPIServices(ctx context.Context) error { if err != nil { return err } + logger.Info("registering private service %s", svc) gsvc.RegisterService(app.grpcPrivateService) unique[svc] = struct{}{} } diff --git a/systest/tests/checkpoint_test.go b/systest/tests/checkpoint_test.go index 994a0f8bf8..4a2d886f87 100644 --- a/systest/tests/checkpoint_test.go +++ b/systest/tests/checkpoint_test.go @@ -277,10 +277,7 @@ func snapshotNode(ctx *testcontext.Context, client *cluster.NodeClient, snapshot func recoverNode(ctx *testcontext.Context, client *cluster.NodeClient) error { smshr := pb.NewAdminServiceClient(client) _, err := smshr.Recover(ctx, &pb.RecoverRequest{}) - if err == nil { - return errors.New("recover should return error but did not") - } - return nil + return err } type acctState struct {