Skip to content

Commit

Permalink
Add logger specific to API (#2086)
Browse files Browse the repository at this point in the history
This adds code flexibility to add multiple miscellaneous options.
This separates all files into their own package to not have weird option type
naming like optionNode, optionMesh, etc.

Signed-off-by: hoenirvili <hoenirvili@gmail.com>
  • Loading branch information
hoenirvili committed Aug 2, 2020
1 parent 1c81878 commit 0f6edf9
Show file tree
Hide file tree
Showing 12 changed files with 447 additions and 343 deletions.
24 changes: 20 additions & 4 deletions api/grpcserver/http_server.go → api/grpc/gateway/gateway.go
@@ -1,14 +1,15 @@
package grpcserver
package gateway

import (
"fmt"
"net/http"

"github.com/grpc-ecosystem/grpc-gateway/runtime"
gw "github.com/spacemeshos/api/release/go/spacemesh/v1"
cmdp "github.com/spacemeshos/go-spacemesh/cmd"
"github.com/spacemeshos/go-spacemesh/log"
"golang.org/x/net/context"
"google.golang.org/grpc"
"net/http"
)

// JSONHTTPServer is a JSON http server providing the Spacemesh API.
Expand All @@ -17,11 +18,26 @@ type JSONHTTPServer struct {
Port int
GrpcPort int
server *http.Server
log log.Logger
}

// WithLogger set's the underlying logger to a custom logger. By default the logger is NoOp
func WithLogger(log log.Logger) Option {
return func(j *JSONHTTPServer) {
j.log = log
}
}

// Option type definds an callback that can set internal JSONHTTPServer fields
type Option func(j *JSONHTTPServer)

// NewJSONHTTPServer creates a new json http server.
func NewJSONHTTPServer(port int, grpcPort int) *JSONHTTPServer {
return &JSONHTTPServer{Port: port, GrpcPort: grpcPort}
func NewJSONHTTPServer(port int, grpcPort int, options ...Option) *JSONHTTPServer {
server := &JSONHTTPServer{Port: port, GrpcPort: grpcPort, log: log.NewNop()}
for _, option := range options {
option(server)
}
return server
}

// Close stops the server.
Expand Down
47 changes: 26 additions & 21 deletions api/grpcserver/grpcserver_test.go → api/grpc/grpcserver_test.go
@@ -1,17 +1,9 @@
package grpcserver
package grpc

import (
"bytes"
"errors"
"fmt"
"github.com/golang/protobuf/jsonpb"
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes/empty"
"github.com/spacemeshos/go-spacemesh/cmd"
"github.com/spacemeshos/go-spacemesh/miner"
"google.golang.org/genproto/googleapis/rpc/code"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"io/ioutil"
"math/big"
"net/http"
Expand All @@ -21,12 +13,25 @@ import (
"testing"
"time"

"github.com/golang/protobuf/jsonpb"
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes/empty"
"github.com/spacemeshos/go-spacemesh/cmd"
"github.com/spacemeshos/go-spacemesh/miner"
"google.golang.org/genproto/googleapis/rpc/code"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/p2p/p2pcrypto"
"github.com/stretchr/testify/require"

pb "github.com/spacemeshos/api/release/go/spacemesh/v1"
"github.com/spacemeshos/go-spacemesh/api/config"
"github.com/spacemeshos/go-spacemesh/api/grpc/gateway"
"github.com/spacemeshos/go-spacemesh/api/grpc/server"
servicemesh "github.com/spacemeshos/go-spacemesh/api/grpc/service/mesh"
servicenode "github.com/spacemeshos/go-spacemesh/api/grpc/service/node"
"github.com/spacemeshos/go-spacemesh/p2p/node"
"golang.org/x/net/context"
"google.golang.org/grpc"
Expand Down Expand Up @@ -249,14 +254,14 @@ type SyncerMock struct {
func (SyncerMock) IsSynced() bool { return false }
func (s *SyncerMock) Start() { s.startCalled = true }

func launchServer(t *testing.T, services ...ServiceAPI) func() {
func launchServer(t *testing.T, services ...server.API) func() {
networkMock.Broadcast("", []byte{0x00})
grpcService := NewServer(cfg.NewGrpcServerPort)
jsonService := NewJSONHTTPServer(cfg.NewJSONServerPort, cfg.NewGrpcServerPort)
grpcService := server.New(cfg.NewGrpcServerPort)
jsonService := gateway.NewJSONHTTPServer(cfg.NewJSONServerPort, cfg.NewGrpcServerPort)

// attach services
for _, svc := range services {
svc.RegisterService(grpcService)
svc.Register(grpcService)
}

// start gRPC and json servers
Expand Down Expand Up @@ -289,8 +294,8 @@ func TestNewServersConfig(t *testing.T) {
port2, err := node.GetUnboundedPort()
require.NoError(t, err, "Should be able to establish a connection on a port")

grpcService := NewServer(port1)
jsonService := NewJSONHTTPServer(port2, port1)
grpcService := server.New(port1)
jsonService := gateway.NewJSONHTTPServer(port2, port1)

require.Equal(t, port2, jsonService.Port, "Expected same port")
require.Equal(t, port1, jsonService.GrpcPort, "Expected same port")
Expand All @@ -299,7 +304,7 @@ func TestNewServersConfig(t *testing.T) {

func TestNodeService(t *testing.T) {
syncer := SyncerMock{}
grpcService := NewNodeService(&networkMock, txAPI, &genTime, &syncer)
grpcService := servicenode.New(&networkMock, txAPI, &genTime, &syncer)
shutDown := launchServer(t, grpcService)
defer shutDown()

Expand Down Expand Up @@ -389,7 +394,7 @@ func TestNodeService(t *testing.T) {
}

func TestMeshService(t *testing.T) {
grpcService := NewMeshService(&networkMock, txAPI, &genTime, &SyncerMock{})
grpcService := servicemesh.New(&networkMock, txAPI, &genTime, &SyncerMock{})
shutDown := launchServer(t, grpcService)
defer shutDown()

Expand All @@ -411,8 +416,8 @@ func TestMeshService(t *testing.T) {
}

func TestMultiService(t *testing.T) {
svc1 := NewNodeService(&networkMock, txAPI, &genTime, &SyncerMock{})
svc2 := NewMeshService(&networkMock, txAPI, &genTime, &SyncerMock{})
svc1 := servicenode.New(&networkMock, txAPI, &genTime, &SyncerMock{})
svc2 := servicemesh.New(&networkMock, txAPI, &genTime, &SyncerMock{})
shutDown := launchServer(t, svc1, svc2)
defer shutDown()

Expand Down Expand Up @@ -471,8 +476,8 @@ func TestJsonApi(t *testing.T) {
shutDown()

// enable services and try again
svc1 := NewNodeService(&networkMock, txAPI, &genTime, &SyncerMock{})
svc2 := NewMeshService(&networkMock, txAPI, &genTime, &SyncerMock{})
svc1 := servicenode.New(&networkMock, txAPI, &genTime, &SyncerMock{})
svc2 := servicemesh.New(&networkMock, txAPI, &genTime, &SyncerMock{})
cfg.StartNodeService = true
cfg.StartMeshService = true
shutDown = launchServer(t, svc1, svc2)
Expand Down
98 changes: 98 additions & 0 deletions api/grpc/server/grpc.go
@@ -0,0 +1,98 @@
package server

import (
"fmt"
"net"
"time"

"github.com/spacemeshos/go-spacemesh/log"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/reflection"
)

// ServerAPI allows individual grpc services to register the grpc server
type API interface {
// Register binds the grpc server with the desired proto spec
Register(*Server)
}

// Server is a very basic grpc server
type Server struct {
Port int
GrpcServer *grpc.Server
log log.Logger
options []grpc.ServerOption
}

// Option type definds an callback that can set internal Server fields
type Option func(s *Server)

// WithLogger set's the underlying logger to a custom logger. By default the logger is NoOp
func WithLogger(log log.Logger) Option {
return func(s *Server) {
s.log = log
}
}

// WithServerOptions set's the underlying grpc server options to use these cutom options
func WithServerOptions(options []grpc.ServerOption) Option {
return func(s *Server) {
s.options = options
}
}

// New creates and returns a new grpc Server
func New(port int, options ...Option) *Server {
s := &Server{
Port: port,
log: log.NewNop(),
options: []grpc.ServerOption{
// XXX: this is done to prevent routers from cleaning up our connections (e.g aws load balances..)
// TODO: these parameters work for now but we might need to revisit or add them as configuration
// TODO: Configure maxconns, maxconcurrentcons ..
grpc.KeepaliveParams(keepalive.ServerParameters{
MaxConnectionIdle: time.Minute * 120,
MaxConnectionAge: time.Minute * 180,
MaxConnectionAgeGrace: time.Minute * 10,
Time: time.Minute,
Timeout: time.Minute * 3,
}),
},
}
for _, option := range options {
option(s)
}
s.GrpcServer = grpc.NewServer(s.options...)
return s
}

// Start starts the server
func (s *Server) Start() {
s.log.Info("starting new grpc server")
go s.startInternal()
}

// Blocking, should be called in a goroutine
func (s *Server) startInternal() {
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", s.Port))
if err != nil {
s.log.Error("error listening on port", err)
return
}

// SubscribeOnNewConnections reflection service on gRPC server
reflection.Register(s.GrpcServer)

// start serving - this blocks until err or server is stopped
s.log.Info("starting new grpc server on port %d", s.Port)
if err := s.GrpcServer.Serve(lis); err != nil {
s.log.Error("error stopping grpc server: %v", err)
}
}

// Close stops the server
func (s *Server) Close() {
s.log.Info("Stopping new grpc server...")
s.GrpcServer.Stop()
}
126 changes: 126 additions & 0 deletions api/grpc/service/mesh/mesh_service.go
@@ -0,0 +1,126 @@
package mesh

import (
pb "github.com/spacemeshos/api/release/go/spacemesh/v1"
"github.com/spacemeshos/go-spacemesh/api"
"github.com/spacemeshos/go-spacemesh/api/grpc/server"
"github.com/spacemeshos/go-spacemesh/log"
"github.com/spacemeshos/go-spacemesh/p2p/peers"
"golang.org/x/net/context"
)

// Mesh is a grpc server providing the MeshService
type Mesh struct {
Network api.NetworkAPI // P2P Swarm
Tx api.TxAPI // Mesh
GenTime api.GenesisTimeAPI
PeerCounter api.PeerCounter
Syncer api.Syncer

log log.Logger
}

var _ server.API = (*Mesh)(nil)

// Register registers this service with a grpc server instance
func (m Mesh) Register(server *server.Server) {
pb.RegisterMeshServiceServer(server.GrpcServer, m)
}

// Option type definds an callback that can set internal Mesh fields
type Option func(m *Mesh)

// WithLogger set's the underlying logger to a custom logger. By default the logger is NoOp
func WithLogger(log log.Logger) Option {
return func(m *Mesh) {
m.log = log.WithName("grpc_server.Mesh")
}
}

// New creates a new grpc service using config data.
func New(net api.NetworkAPI, tx api.TxAPI, genTime api.GenesisTimeAPI, syncer api.Syncer, options ...Option) *Mesh {
m := &Mesh{
Network: net,
Tx: tx,
GenTime: genTime,
PeerCounter: peers.NewPeers(net, log.NewDefault("grpc_server.Mesh")),
Syncer: syncer,
log: log.NewNop(),
}
for _, option := range options {
option(m)
}
return m
}

// GenesisTime returns the network genesis time as UNIX time
func (m Mesh) GenesisTime(ctx context.Context, in *pb.GenesisTimeRequest) (*pb.GenesisTimeResponse, error) {
m.log.Info("GRPC Mesh.GenesisTime")
return &pb.GenesisTimeResponse{Unixtime: &pb.SimpleInt{
Value: uint64(m.GenTime.GetGenesisTime().Unix()),
}}, nil
}

// CurrentLayer returns the current layer number
func (m Mesh) CurrentLayer(ctx context.Context, in *pb.CurrentLayerRequest) (*pb.CurrentLayerResponse, error) {
m.log.Info("GRPC Mesh.CurrentLayer")
return nil, nil
}

// CurrentEpoch returns the current epoch number
func (m Mesh) CurrentEpoch(ctx context.Context, in *pb.CurrentEpochRequest) (*pb.CurrentEpochResponse, error) {
m.log.Info("GRPC Mesh.CurrentEpoch")
return nil, nil
}

// NetID returns the network ID
func (m Mesh) NetID(ctx context.Context, in *pb.NetIDRequest) (*pb.NetIDResponse, error) {
m.log.Info("GRPC Mesh.NetId")
return nil, nil
}

// EpochNumLayers returns the number of layers per epoch (a network parameter)
func (m Mesh) EpochNumLayers(ctx context.Context, in *pb.EpochNumLayersRequest) (*pb.EpochNumLayersResponse, error) {
m.log.Info("GRPC Mesh.EpochNumLayers")
return nil, nil
}

// LayerDuration returns the layer duration in seconds (a network parameter)
func (m Mesh) LayerDuration(ctx context.Context, in *pb.LayerDurationRequest) (*pb.LayerDurationResponse, error) {
m.log.Info("GRPC Mesh.LayerDuration")
return nil, nil
}

// MaxTransactionsPerSecond returns the max number of tx per sec (a network parameter)
func (m Mesh) MaxTransactionsPerSecond(ctx context.Context, in *pb.MaxTransactionsPerSecondRequest) (*pb.MaxTransactionsPerSecondResponse, error) {
m.log.Info("GRPC Mesh.MaxTransactionsPerSecond")
return nil, nil
}

// QUERIES

// AccountMeshDataQuery returns account data
func (m Mesh) AccountMeshDataQuery(ctx context.Context, in *pb.AccountMeshDataQueryRequest) (*pb.AccountMeshDataQueryResponse, error) {
m.log.Info("GRPC Mesh.AccountMeshDataQuery")
return nil, nil
}

// LayersQuery returns all mesh data, layer by layer
func (m Mesh) LayersQuery(ctx context.Context, in *pb.LayersQueryRequest) (*pb.LayersQueryResponse, error) {
m.log.Info("GRPC Mesh.LayersQuery")
return nil, nil
}

// STREAMS

// AccountMeshDataStream returns a stream of transactions and activations for an account
func (m Mesh) AccountMeshDataStream(request *pb.AccountMeshDataStreamRequest, stream pb.MeshService_AccountMeshDataStreamServer) error {
m.log.Info("GRPC Mesh.AccountMeshDataStream")
return nil
}

// LayerStream returns a stream of all mesh data per layer
func (m Mesh) LayerStream(request *pb.LayerStreamRequest, stream pb.MeshService_LayerStreamServer) error {
m.log.Info("GRPC Mesh.LayerStream")
return nil
}

0 comments on commit 0f6edf9

Please sign in to comment.