Skip to content
This repository has been archived by the owner on Oct 31, 2024. It is now read-only.

Commit

Permalink
feat: add topos sequencer grpc client to polygon edge (#6)
Browse files Browse the repository at this point in the history
  • Loading branch information
atanmarko committed Sep 15, 2023
1 parent 639edc9 commit daf4ac8
Show file tree
Hide file tree
Showing 11 changed files with 203 additions and 17 deletions.
7 changes: 4 additions & 3 deletions command/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@ var (
)

const (
JSONOutputFlag = "json"
GRPCAddressFlag = "grpc-address"
JSONRPCFlag = "jsonrpc"
JSONOutputFlag = "json"
GRPCAddressFlag = "grpc-address"
JSONRPCFlag = "jsonrpc"
FrostSequencerFlag = "frost-sequencer"
)

// GRPCAddressFlagLEGACY Legacy flag that needs to be present to preserve backwards
Expand Down
2 changes: 2 additions & 0 deletions command/server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type Config struct {
JSONRPCBlockRangeLimit uint64 `json:"json_rpc_block_range_limit" yaml:"json_rpc_block_range_limit"`
JSONLogFormat bool `json:"json_log_format" yaml:"json_log_format"`
CorsAllowedOrigins []string `json:"cors_allowed_origins" yaml:"cors_allowed_origins"`
ToposSequencerAddr string `json:"topos_sequencer_addr" yaml:"topos_sequencer_addr"`

Relayer bool `json:"relayer" yaml:"relayer"`
NumBlockConfirmations uint64 `json:"num_block_confirmations" yaml:"num_block_confirmations"`
Expand Down Expand Up @@ -136,6 +137,7 @@ func DefaultConfig() *Config {
ConcurrentRequestsDebug: DefaultConcurrentRequestsDebug,
WebSocketReadLimit: DefaultWebSocketReadLimit,
RelayerTrackerPollInterval: DefaultRelayerTrackerPollInterval,
ToposSequencerAddr: "",
}
}

Expand Down
4 changes: 4 additions & 0 deletions command/server/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ const (
devFlag = "dev"
corsOriginFlag = "access-control-allow-origins"
logFileLocationFlag = "log-to"
toposSequencerFlag = "topos-sequencer"

relayerFlag = "relayer"
numBlockConfirmationsFlag = "num-block-confirmations"
Expand Down Expand Up @@ -93,6 +94,8 @@ type serverParams struct {

logFileLocation string

toposSequencerAddr string

relayer bool
}

Expand Down Expand Up @@ -186,6 +189,7 @@ func (p *serverParams) generateConfig() *server.Config {
LogLevel: hclog.LevelFromString(p.rawConfig.LogLevel),
JSONLogFormat: p.rawConfig.JSONLogFormat,
LogFilePath: p.logFileLocation,
ToposSequencerAddr: p.toposSequencerAddr,

Relayer: p.relayer,
NumBlockConfirmations: p.rawConfig.NumBlockConfirmations,
Expand Down
7 changes: 7 additions & 0 deletions command/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,13 @@ func setFlags(cmd *cobra.Command) {
"write all logs to the file at specified location instead of writing them to console",
)

cmd.Flags().StringVar(
&params.toposSequencerAddr,
toposSequencerFlag,
defaultConfig.ToposSequencerAddr,
"grpc address of the topos-sequencer",
)

cmd.Flags().BoolVar(
&params.rawConfig.Relayer,
relayerFlag,
Expand Down
21 changes: 11 additions & 10 deletions consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,16 +69,17 @@ type Config struct {
}

type Params struct {
Context context.Context
Config *Config
TxPool *txpool.TxPool
Network *network.Server
Blockchain *blockchain.Blockchain
Executor *state.Executor
Grpc *grpc.Server
Logger hclog.Logger
SecretsManager secrets.SecretsManager
BlockTime uint64
Context context.Context
Config *Config
TxPool *txpool.TxPool
Network *network.Server
Blockchain *blockchain.Blockchain
Executor *state.Executor
Grpc *grpc.Server
Logger hclog.Logger
SecretsManager secrets.SecretsManager
BlockTime uint64
ToposSequencerAddr string

NumBlockConfirmations uint64
}
Expand Down
95 changes: 95 additions & 0 deletions consensus/ibft/frost/frost_backend.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package frost

import (
"github.com/0xPolygon/go-ibft/core"
"github.com/topos-network/go-topos-sequencer-client/frostclient"
protofrost "github.com/topos-network/go-topos-sequencer-client/frostclient/proto"
)

type FrostTransport interface {
MulticastFrost(msg *protofrost.FrostMessage)
}

type FrostBackend struct {
// GRPC address of the topos sequencer service
address string

// Public validator account address
validatorID string

// log is the logger instance
log core.Logger

// grpc client for topos-sequencer service
client *frostclient.FrostServiceClient

// Network interface for frost topic
transport FrostTransport
}

func (fb *FrostBackend) GetToposSequencerAddr() string {
return fb.address
}

func NewFrostBackend(toposSequencerAddr string, logger core.Logger) (*FrostBackend, error) {
return &FrostBackend{
address: toposSequencerAddr,
log: logger,
}, nil
}

func (fb *FrostBackend) Initialize(serverAddress string, validatorAccount string, transport FrostTransport) error {
fb.transport = transport

frostServiceClient, err := frostclient.NewFrostServiceClient(serverAddress, validatorAccount)
if err != nil {
fb.log.Error("could not instantiate frost client: %v", err)

return err
}

fb.client = frostServiceClient

// Start loop for listening messages from frost-sequencer
go func() {
for {
select {
case message := <-frostServiceClient.Inbox:
fb.log.Info("Received message from frost-sequencer:", message)

switch op := message.Event.(type) {
case *protofrost.WatchFrostMessagesResponse_FrostMessagePushed_:
// New froost message received from local topos sequencer
err := fb.PublishFrost(op.FrostMessagePushed.FrostMessage)
if err != nil {
fb.log.Error("unable to publish frost message: %v", err)
}
case *protofrost.WatchFrostMessagesResponse_StreamOpened_:
// Connection to local topos sequencer is opened
fb.log.Info("stream opened to service ", serverAddress)
}
}
}
}()

return nil
}

func (fb *FrostBackend) PublishFrost(message *protofrost.FrostMessage) error {
fb.transport.MulticastFrost(message)

return nil
}

func (fb *FrostBackend) ProcessGossipedMessages(message *protofrost.FrostMessage) error {
request := &protofrost.SubmitFrostMessageRequest{
FrostMessage: message,
}

_, err := fb.client.Client.SubmitFrostMessage(fb.client.Ctx, request)
if err != nil {
fb.log.Error("unable to submit frost message: %v", err)
}

return nil
}
23 changes: 22 additions & 1 deletion consensus/ibft/ibft.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/0xPolygon/polygon-edge/chain"
"github.com/0xPolygon/polygon-edge/consensus"
"github.com/0xPolygon/polygon-edge/consensus/ibft/fork"
"github.com/0xPolygon/polygon-edge/consensus/ibft/frost"
"github.com/0xPolygon/polygon-edge/consensus/ibft/proto"
"github.com/0xPolygon/polygon-edge/consensus/ibft/signer"
"github.com/0xPolygon/polygon-edge/helper/progress"
Expand All @@ -28,7 +29,8 @@ const (
IbftKeyName = "validator.key"
KeyEpochSize = "epochSize"

ibftProto = "/ibft/0.2"
ibftProto = "/ibft/0.2"
frostProto = "/frost/0.1"

// consensusMetrics is a prefix used for consensus-related metrics
consensusMetrics = "consensus"
Expand Down Expand Up @@ -83,6 +85,7 @@ type backendIBFT struct {
currentSigner signer.Signer // Signer at current sequence
currentValidators validators.Validators // signer at current sequence
currentHooks fork.HooksInterface // Hooks at current sequence
frostBackend *frost.FrostBackend // Reference to Frost messages gossip mechanism

// Configurations
config *consensus.Config // Consensus configuration
Expand Down Expand Up @@ -138,6 +141,15 @@ func Factory(params *consensus.Params) (consensus.Consensus, error) {
return nil, err
}

frostBackend, err := frost.NewFrostBackend(
params.ToposSequencerAddr,
logger.Named("frost"),
)

if err != nil {
return nil, err
}

p := &backendIBFT{
// References
logger: logger,
Expand All @@ -154,6 +166,7 @@ func Factory(params *consensus.Params) (consensus.Consensus, error) {
secretsManager: params.SecretsManager,
Grpc: params.Grpc,
forkManager: forkManager,
frostBackend: frostBackend,

// Configurations
config: params.Config,
Expand Down Expand Up @@ -200,6 +213,14 @@ func (i *backendIBFT) Initialize() error {
i,
)

// Initialize frost backend if topos sequencer address is set
if i.frostBackend.GetToposSequencerAddr() != "" {
if err := i.frostBackend.Initialize(i.frostBackend.GetToposSequencerAddr(),
i.currentSigner.Address().String(), i); err != nil {
return err
}
}

// Ensure consensus takes into account user configured block production time
i.consensus.ExtendRoundTimeout(i.blockTime)

Expand Down
56 changes: 54 additions & 2 deletions consensus/ibft/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,39 @@ import (
"github.com/0xPolygon/polygon-edge/network"
"github.com/0xPolygon/polygon-edge/types"
"github.com/libp2p/go-libp2p/core/peer"
protofrost "github.com/topos-network/go-topos-sequencer-client/frostclient/proto"
)

type transport interface {
Multicast(msg *proto.Message) error
MulticastFrost(msg *protofrost.FrostMessage) error
}

type gossipTransport struct {
topic *network.Topic
topic *network.Topic
topicFrost *network.Topic
}

func (g *gossipTransport) Multicast(msg *proto.Message) error {
return g.topic.Publish(msg)
}

func (g *gossipTransport) MulticastFrost(msg *protofrost.FrostMessage) error {
return g.topicFrost.Publish(msg)
}

func (i *backendIBFT) Multicast(msg *proto.Message) {
if err := i.transport.Multicast(msg); err != nil {
i.logger.Error("fail to gossip", "err", err)
}
}

func (i *backendIBFT) MulticastFrost(msg *protofrost.FrostMessage) {
if err := i.transport.MulticastFrost(msg); err != nil {
i.logger.Error("fail to gossip frost message", "err", err)
}
}

// setupTransport sets up the gossip transport protocol
func (i *backendIBFT) setupTransport() error {
// Define a new topic
Expand Down Expand Up @@ -61,7 +74,46 @@ func (i *backendIBFT) setupTransport() error {
return err
}

i.transport = &gossipTransport{topic: topic}
// Define a new topic for frost
topicFrost, err := i.network.NewTopic(frostProto, &protofrost.FrostMessage{})
if err != nil {
return err
}

// Subscribe to the frost Topic
if err := topicFrost.Subscribe(
func(obj interface{}, _ peer.ID) {
if !i.isActiveValidator() {
return
}

msg, ok := obj.(*protofrost.FrostMessage)
if !ok {
i.logger.Error("invalid type assertion for message request")

return
}
i.logger.Info("Frost message gossiped to this node:", msg)

err := i.frostBackend.ProcessGossipedMessages(msg)
if err != nil {
i.logger.Error("Failed to process gossiped message:", err)

return
}

i.logger.Debug(
"frost message received from validator network",
"message id: ", msg.MessageId,
" from: ", msg.From,
" data: ", msg.Data,
)
},
); err != nil {
return err
}

i.transport = &gossipTransport{topic: topic, topicFrost: topicFrost}

return nil
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ require (
github.com/richardartoul/molecule v1.0.1-0.20221107223329-32cfee06a052 // indirect
github.com/rogpeppe/go-internal v1.10.0 // indirect
github.com/secure-systems-lab/go-securesystemslib v0.7.0 // indirect
github.com/topos-protocol/go-topos-sequencer-client v0.0.0-20230719150134-37636f549da4 // indirect
go.uber.org/dig v1.16.1 // indirect
go.uber.org/fx v1.19.2 // indirect
golang.org/x/exp v0.0.0-20230725012225-302865e7556b // indirect
Expand Down
3 changes: 2 additions & 1 deletion server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ type Config struct {

LogFilePath string

Relayer bool
ToposSequencerAddr string // Address of topos serquencer service
Relayer bool

NumBlockConfirmations uint64
RelayerTrackerPollInterval time.Duration
Expand Down
1 change: 1 addition & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,7 @@ func (s *Server) setupConsensus() error {
SecretsManager: s.secretsManager,
BlockTime: uint64(blockTime.Seconds()),
NumBlockConfirmations: s.config.NumBlockConfirmations,
ToposSequencerAddr: s.config.ToposSequencerAddr,
},
)

Expand Down

0 comments on commit daf4ac8

Please sign in to comment.