Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/develop' into piersy/add-peer-in…
Browse files Browse the repository at this point in the history
…fo-api
  • Loading branch information
piersy committed Aug 21, 2023
2 parents 7f1f409 + b23bc52 commit d99bf92
Show file tree
Hide file tree
Showing 20 changed files with 339 additions and 207 deletions.
32 changes: 13 additions & 19 deletions api/grpcserver/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"net"
"time"

"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/reflection"
Expand All @@ -26,6 +27,7 @@ type Server struct {
// been waited on)
BoundAddress string
GrpcServer *grpc.Server
grp errgroup.Group
}

// New creates and returns a new Server with port and interface.
Expand All @@ -39,7 +41,7 @@ func New(listener string, lg log.Logger, opts ...grpc.ServerOption) *Server {
}

// Start starts the server.
func (s *Server) Start() <-chan struct{} {
func (s *Server) Start() error {
s.logger.With().Info("starting grpc server",
log.String("address", s.Listener),
log.Array("services", log.ArrayMarshalerFunc(func(encoder log.ArrayEncoder) error {
Expand All @@ -49,36 +51,28 @@ func (s *Server) Start() <-chan struct{} {
return nil
})),
)

started := make(chan struct{})
go s.startInternal(started)

return started
}

// Blocking, should be called in a goroutine.
func (s *Server) startInternal(started chan<- struct{}) {
lis, err := net.Listen("tcp", s.Listener)
if err != nil {
s.logger.Error("error listening: %v", err)
return
return err
}
s.BoundAddress = lis.Addr().String()
reflection.Register(s.GrpcServer)
s.logger.Info("starting new grpc server on %s", s.Listener)
close(started)
if err := s.GrpcServer.Serve(lis); err != nil {
s.logger.Error("error stopping grpc server: %v", err)
}
s.grp.Go(func() error {
if err := s.GrpcServer.Serve(lis); err != nil {
s.logger.Error("error stopping grpc server: %v", err)
return err
}
return nil
})
return nil
}

// Close stops the server.
func (s *Server) Close() error {
s.logger.Info("stopping the grpc server")
s.GrpcServer.Stop()

// We don't return any errors but we want to conform to io.Closer so return a nil error
return nil
return s.grp.Wait()
}

// ServerOptions are shared by all grpc servers.
Expand Down
17 changes: 5 additions & 12 deletions api/grpcserver/grpcserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,18 +491,11 @@ func launchServer(tb testing.TB, cfg Config, services ...ServiceAPI) func() {
}

// start gRPC and json servers
grpcStarted := grpcService.Start()
jsonStarted := jsonService.StartService(context.Background(), services...)

timer := time.NewTimer(3 * time.Second)
defer timer.Stop()

// wait for server to be ready (critical on CI)
for _, ch := range []<-chan struct{}{grpcStarted, jsonStarted} {
select {
case <-ch:
case <-timer.C:
}
err := grpcService.Start()
require.NoError(tb, err)
if len(services) > 0 {
err = jsonService.StartService(context.Background(), services...)
require.NoError(tb, err)
}

return func() {
Expand Down
81 changes: 31 additions & 50 deletions api/grpcserver/http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ import (
"context"
"errors"
"fmt"
"net"
"net/http"
"sync"

"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
pb "github.com/spacemeshos/api/release/go/spacemesh/v1"
"golang.org/x/sync/errgroup"

"github.com/spacemeshos/go-spacemesh/log"
)
Expand All @@ -18,9 +19,9 @@ import (
type JSONHTTPServer struct {
logger log.Logger

mu sync.RWMutex
listener string
server *http.Server
grp errgroup.Group
}

// NewJSONHTTPServer creates a new json http server.
Expand All @@ -34,41 +35,32 @@ func NewJSONHTTPServer(listener string, lg log.Logger) *JSONHTTPServer {
// Shutdown stops the server.
func (s *JSONHTTPServer) Shutdown(ctx context.Context) error {
s.logger.Debug("stopping json-http service...")
server := s.getServer()
if server != nil {
err := server.Shutdown(ctx)
if s.server != nil {
err := s.server.Shutdown(ctx)
if errors.Is(err, http.ErrServerClosed) {
return nil
}
if err != nil {
return fmt.Errorf("shutdown: %w", err)
}
}

return nil
err := s.grp.Wait()
if errors.Is(err, http.ErrServerClosed) {
return nil
}
return err
}

// StartService starts the json api server and listens for status (started, stopped).
func (s *JSONHTTPServer) StartService(
ctx context.Context,
services ...ServiceAPI,
) <-chan struct{} {
started := make(chan struct{})

// This will block, so run it in a goroutine
go s.startInternal(
ctx,
started,
services...)

return started
}

func (s *JSONHTTPServer) startInternal(
ctx context.Context,
started chan<- struct{},
services ...ServiceAPI,
) {
) error {
// At least one service must be enabled
if len(services) == 0 {
s.logger.Error("not starting grpc gateway service; at least one service must be enabled")
return errors.New("no services provided")
}
ctx, cancel := context.WithCancel(ctx)

// This will close all downstream connections when the server closes
Expand Down Expand Up @@ -96,38 +88,27 @@ func (s *JSONHTTPServer) startInternal(
}
if err != nil {
s.logger.Error("registering %T with grpc gateway failed with %v", svc, err)
return err
}
serviceCount++
}

close(started)
s.logger.With().Info("starting grpc gateway server", log.String("address", s.listener))

// At least one service must be enabled
if serviceCount == 0 {
s.logger.Error("not starting grpc gateway service; at least one service must be enabled")
return
lis, err := net.Listen("tcp", s.listener)
if err != nil {
s.logger.Error("error listening: %v", err)
return err
}

s.logger.With().Info("starting grpc gateway server", log.String("address", s.listener))
s.setServer(&http.Server{
Addr: s.listener,
s.server = &http.Server{
Handler: mux,
}
s.grp.Go(func() error {
if err := s.server.Serve(lis); err != nil {
s.logger.Error("error from grpc http server: %v", err)
return err
}
return nil
})

// This will block
s.logger.Error("error from grpc http listener: %v", s.getServer().ListenAndServe())
}

func (s *JSONHTTPServer) getServer() *http.Server {
s.mu.RLock()
defer s.mu.RUnlock()

return s.server
}

func (s *JSONHTTPServer) setServer(server *http.Server) {
s.mu.Lock()
defer s.mu.Unlock()

s.server = server
return nil
}
35 changes: 33 additions & 2 deletions bootstrap/updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -212,7 +213,7 @@ func (u *Updater) addUpdate(epoch types.EpochID, suffix string) {
switch suffix {
case SuffixActiveSet, SuffixBeacon, SuffixBoostrap:
default:
u.logger.With().Fatal("unexpected suffix for fallback files", log.String("suffix", suffix))
return
}
if _, ok := u.updates[epoch]; !ok {
u.updates[epoch] = map[string]struct{}{}
Expand Down Expand Up @@ -419,6 +420,33 @@ func validateData(cfg Config, update *Update) (*VerifiedUpdate, error) {
return verified, nil
}

func renameLegacyFile(fs afero.Fs, path string) string {
var idx int
for _, suffix := range []string{SuffixBoostrap, SuffixBeacon, SuffixActiveSet} {
if strings.HasSuffix(path, suffix) {
return path
}
}
for _, suffix := range []string{SuffixBoostrap, SuffixBeacon, SuffixActiveSet} {
idx = strings.Index(path, fmt.Sprintf("-%s-", suffix))
if idx > -1 {
break
}
}
if idx < 0 {
return ""
}
newPath := path[:idx+suffixLen+1]
if exists, _ := afero.Exists(fs, newPath); exists {
_ = fs.Remove(path)
return ""
}
if err := fs.Rename(path, newPath); err != nil {
return ""
}
return newPath
}

func load(fs afero.Fs, cfg Config, current types.EpochID) ([]*VerifiedUpdate, error) {
dir := bootstrapDir(cfg.DataDir)
_, err := fs.Stat(dir)
Expand All @@ -437,7 +465,10 @@ func load(fs afero.Fs, cfg Config, current types.EpochID) ([]*VerifiedUpdate, er
return nil, fmt.Errorf("read epoch dir %v: %w", dir, err)
}
for _, f := range files {
persisted := filepath.Join(edir, f.Name())
persisted := renameLegacyFile(fs, filepath.Join(edir, f.Name()))
if persisted == "" {
continue
}
data, err := afero.ReadFile(fs, persisted)
if err != nil {
return nil, fmt.Errorf("read bootstrap file %v: %w", persisted, err)
Expand Down

0 comments on commit d99bf92

Please sign in to comment.