Skip to content

Commit

Permalink
Merge branch 'develop' into piersy/grpcserver-synchronous-start
Browse files Browse the repository at this point in the history
  • Loading branch information
piersy committed Aug 18, 2023
2 parents 96ccd2f + c479d5b commit 5ec4c6a
Show file tree
Hide file tree
Showing 48 changed files with 2,067 additions and 270 deletions.
3 changes: 1 addition & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
# go-spacemesh needs at least ubuntu 20.04 (because gpu-post and post-rs are linked to glibc 2.31)
# newer versions of ubuntu should work as well, so far only 22.04 has been tested
# go-spacemesh needs at least ubuntu 22.04. newer versions of ubuntu might work as well, but are untested
FROM ubuntu:22.04 AS linux
ENV DEBIAN_FRONTEND noninteractive
ENV SHELL /bin/bash
Expand Down
10 changes: 8 additions & 2 deletions Makefile-libs.Inc
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,17 @@ $(BINDIR_POSTRS_SETUP_LIBS): $(PROJ_DIR)$(POSTRS_SETUP_ZIP)
unzip -o -j $(PROJ_DIR)$(POSTRS_SETUP_ZIP) -d $(dir $@) $(notdir $@)
touch $@

CURL_OPTIONS = --retry 10 --retry-max-time 120
CURL_VERSION = $(shell curl --version 2>/dev/null | head -n 1 | cut -d' ' -f2)
ifeq ($(shell expr "$(CURL_VERSION)" \>= 7.71.0),1)
CURL_OPTIONS := $(CURL_OPTIONS) --retry-all-errors
endif

$(PROJ_DIR)$(POSTRS_SETUP_ZIP):
curl -sSL --retry 10 --retry-max-time 120 --retry-all-errors $(POSTRS_SETUP_URL_ZIP) -o $(PROJ_DIR)$(POSTRS_SETUP_ZIP)
curl -sSL $(CURL_OPTIONS) $(POSTRS_SETUP_URL_ZIP) -o $(PROJ_DIR)$(POSTRS_SETUP_ZIP)

$(BIN_DIR)$(POSTRS_PROFILER_BIN):
curl -sSL --retry 10 --retry-max-time 120 --retry-all-errors $(POSTRS_PROFILER_URL) -o $(PROJ_DIR)$(POSTRS_PROFILER_ZIP)
curl -sSL $(CURL_OPTIONS) $(POSTRS_PROFILER_URL) -o $(PROJ_DIR)$(POSTRS_PROFILER_ZIP)
unzip -o -j $(PROJ_DIR)$(POSTRS_PROFILER_ZIP) -d $(BIN_DIR)

get-postrs-lib: $(PROJ_DIR)$(POSTRS_SETUP_ZIP) $(BINDIR_POSTRS_SETUP_LIBS)
Expand Down
25 changes: 22 additions & 3 deletions api/grpcserver/activation_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package grpcserver

import (
"context"
"errors"
"fmt"

"github.com/golang/protobuf/ptypes/empty"
Expand All @@ -10,7 +11,9 @@ import (
"google.golang.org/grpc/status"

"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/events"
"github.com/spacemeshos/go-spacemesh/log"
"github.com/spacemeshos/go-spacemesh/sql"
)

type activationService struct {
Expand Down Expand Up @@ -42,12 +45,28 @@ func (s *activationService) Get(ctx context.Context, request *pb.GetRequest) (*p
atxId := types.ATXID(types.BytesToHash(request.Id))
atx, err := s.atxProvider.GetFullAtx(atxId)
if err != nil || atx == nil {
s.logger.With().Debug("failed to get the ATX", log.Err(err), log.Stringer("id", atxId))
s.logger.With().Debug("failed to get ATX",
log.Stringer("atx id", atxId),
log.Err(err),
)
return nil, status.Error(codes.NotFound, "id was not found")
}
return &pb.GetResponse{
proof, err := s.atxProvider.GetMalfeasanceProof(atx.SmesherID)
if err != nil && !errors.Is(err, sql.ErrNotFound) {
s.logger.With().Error("failed to get malfeasance proof",
log.Stringer("smesher", atx.SmesherID),
log.Stringer("id", atxId),
log.Err(err),
)
return nil, status.Error(codes.NotFound, "id was not found")
}
resp := &pb.GetResponse{
Atx: convertActivation(atx),
}, nil
}
if proof != nil {
resp.MalfeasanceProof = events.ToMalfeasancePB(atx.SmesherID, proof, false)
}
return resp, nil
}

func (s *activationService) Highest(ctx context.Context, req *empty.Empty) (*pb.HighestResponse, error) {
Expand Down
43 changes: 43 additions & 0 deletions api/grpcserver/activation_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ import (

"github.com/spacemeshos/go-spacemesh/api/grpcserver"
"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/events"
"github.com/spacemeshos/go-spacemesh/log/logtest"
"github.com/spacemeshos/go-spacemesh/sql"
)

func Test_Highest_ReturnsGoldenAtxOnError(t *testing.T) {
Expand Down Expand Up @@ -130,6 +132,7 @@ func TestGet_HappyPath(t *testing.T) {
}
atx.SetID(id)
atxProvider.EXPECT().GetFullAtx(id).Return(&atx, nil)
atxProvider.EXPECT().GetMalfeasanceProof(gomock.Any()).Return(nil, sql.ErrNotFound)

response, err := activationService.Get(context.Background(), &pb.GetRequest{Id: id.Bytes()})
require.NoError(t, err)
Expand All @@ -141,4 +144,44 @@ func TestGet_HappyPath(t *testing.T) {
require.Equal(t, atx.PrevATXID.Bytes(), response.Atx.PrevAtx.Id)
require.Equal(t, atx.NumUnits, response.Atx.NumUnits)
require.Equal(t, atx.Sequence, response.Atx.Sequence)
require.Nil(t, response.MalfeasanceProof)
}

func TestGet_IdentityCanceled(t *testing.T) {
ctrl := gomock.NewController(t)
atxProvider := grpcserver.NewMockatxProvider(ctrl)
activationService := grpcserver.NewActivationService(atxProvider, types.ATXID{1}, logtest.New(t).WithName("grpc.Activation"))

smesher, proof := grpcserver.BallotMalfeasance(t, sql.InMemory())
id := types.RandomATXID()
atx := types.VerifiedActivationTx{
ActivationTx: &types.ActivationTx{
InnerActivationTx: types.InnerActivationTx{
NIPostChallenge: types.NIPostChallenge{
Sequence: rand.Uint64(),
PrevATXID: types.RandomATXID(),
PublishEpoch: 0,
PositioningATX: types.RandomATXID(),
},
Coinbase: types.GenerateAddress(types.RandomBytes(32)),
NumUnits: rand.Uint32(),
},
SmesherID: smesher,
},
}
atx.SetID(id)
atxProvider.EXPECT().GetFullAtx(id).Return(&atx, nil)
atxProvider.EXPECT().GetMalfeasanceProof(smesher).Return(proof, nil)

response, err := activationService.Get(context.Background(), &pb.GetRequest{Id: id.Bytes()})
require.NoError(t, err)

require.Equal(t, atx.ID().Bytes(), response.Atx.Id.Id)
require.Equal(t, atx.PublishEpoch.Uint32(), response.Atx.Layer.Number)
require.Equal(t, atx.SmesherID.Bytes(), response.Atx.SmesherId.Id)
require.Equal(t, atx.Coinbase.String(), response.Atx.Coinbase.Address)
require.Equal(t, atx.PrevATXID.Bytes(), response.Atx.PrevAtx.Id)
require.Equal(t, atx.NumUnits, response.Atx.NumUnits)
require.Equal(t, atx.Sequence, response.Atx.Sequence)
require.Equal(t, events.ToMalfeasancePB(smesher, proof, false), response.MalfeasanceProof)
}
1 change: 1 addition & 0 deletions api/grpcserver/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type txValidator interface {
type atxProvider interface {
GetFullAtx(id types.ATXID) (*types.VerifiedActivationTx, error)
MaxHeightAtx() (types.ATXID, error)
GetMalfeasanceProof(id types.NodeID) (*types.MalfeasanceProof, error)
}

type postSetupProvider interface {
Expand Down
64 changes: 64 additions & 0 deletions api/grpcserver/mesh_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,21 @@ package grpcserver

import (
"context"
"encoding/hex"
"errors"
"fmt"
"time"

pb "github.com/spacemeshos/api/release/go/spacemesh/v1"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"

"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/datastore"
"github.com/spacemeshos/go-spacemesh/events"
"github.com/spacemeshos/go-spacemesh/log"
"github.com/spacemeshos/go-spacemesh/sql"
)

// MeshService exposes mesh data such as accounts, blocks, and transactions.
Expand Down Expand Up @@ -610,3 +614,63 @@ func (s MeshService) EpochStream(req *pb.EpochStreamRequest, stream pb.MeshServi
)
return nil
}

func (s MeshService) MalfeasanceQuery(ctx context.Context, req *pb.MalfeasanceRequest) (*pb.MalfeasanceResponse, error) {
parsed, err := hex.DecodeString(req.SmesherHex)
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
if l := len(parsed); l != types.NodeIDSize {
return nil, status.Error(codes.InvalidArgument,
fmt.Sprintf("invalid smesher id length (%d), expected (%d)", l, types.NodeIDSize))
}
id := types.BytesToNodeID(parsed)
proof, err := s.cdb.GetMalfeasanceProof(id)
if err != nil && !errors.Is(err, sql.ErrNotFound) {
return nil, status.Error(codes.Internal, err.Error())
}
return &pb.MalfeasanceResponse{
Proof: events.ToMalfeasancePB(id, proof, req.IncludeProof),
}, nil
}

func (s MeshService) MalfeasanceStream(req *pb.MalfeasanceStreamRequest, stream pb.MeshService_MalfeasanceStreamServer) error {
sub := events.SubscribeMalfeasance()
if sub == nil {
return status.Errorf(codes.FailedPrecondition, "event reporting is not enabled")
}
eventch, fullch := consumeEvents[events.EventMalfeasance](stream.Context(), sub)
if err := stream.SendHeader(metadata.MD{}); err != nil {
return status.Errorf(codes.Unavailable, "can't send header")
}

// first serve those already existed locally.
if err := s.cdb.IterateMalfeasanceProofs(func(id types.NodeID, mp *types.MalfeasanceProof) error {
select {
case <-stream.Context().Done():
return nil
default:
res := &pb.MalfeasanceStreamResponse{
Proof: events.ToMalfeasancePB(id, mp, req.IncludeProof),
}
return stream.Send(res)
}
}); err != nil {
return status.Error(codes.Internal, err.Error())
}

for {
select {
case <-stream.Context().Done():
return nil
case <-fullch:
return status.Errorf(codes.Canceled, "buffer is full")
case ev := <-eventch:
if err := stream.Send(&pb.MalfeasanceStreamResponse{
Proof: events.ToMalfeasancePB(ev.Smesher, ev.Proof, req.IncludeProof),
}); err != nil {
return status.Error(codes.Internal, fmt.Errorf("send to stream: %w", err).Error())
}
}
}
}

0 comments on commit 5ec4c6a

Please sign in to comment.