Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New web-sockets connector #5170

Merged
merged 17 commits into from
May 18, 2023
10 changes: 8 additions & 2 deletions cmd/node/config/external.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,16 @@
# marshalled structures in block events data
MarshallerType = "json"

[WebSocketConnector]
[HostDriverConfig]
# This flag shall only be used for observer nodes
Enabled = false
URL = "localhost:22111"
# This flag will start the WebSocket connector as server or client
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe "ServerMode = false/true" ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or Mode = "server"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changed in Mode

IsServer = false
# The url of the WebSocket client/server
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should clarify that this URL is used when using the driver in client mode.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will be used in both cases because the URL is needed also for the server when it starts.

URL = "127.0.0.1:22111"
WithAcknowledge = true
# Currently, only "json" is supported. In the future, "gogo protobuf" could also be supported
MarshallerType = "json"
# The number of seconds when the client will try again to send the data
RetryDurationInSec = 5
BlockingAckOnError = false
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add a descriptive comment on what this does?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added

17 changes: 10 additions & 7 deletions config/externalConfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ package config
type ExternalConfig struct {
ElasticSearchConnector ElasticSearchConfig
EventNotifierConnector EventNotifierConfig
WebSocketConnector WebSocketDriverConfig
HostDriverConfig HostDriverConfig
}

// ElasticSearchConfig will hold the configuration for the elastic search
Expand Down Expand Up @@ -38,10 +38,13 @@ type CovalentConfig struct {
RouteAcknowledgeData string
}

// WebSocketDriverConfig will hold the configuration for web socket driver
type WebSocketDriverConfig struct {
Enabled bool
WithAcknowledge bool
URL string
MarshallerType string
// HostDriverConfig will hold the configuration for WebSocket driver
type HostDriverConfig struct {
Enabled bool
IsServer bool
WithAcknowledge bool
BlockingAckOnError bool
URL string
MarshallerType string
RetryDurationInSec int
}
29 changes: 9 additions & 20 deletions factory/status/statusComponents.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ import (
nodeData "github.com/multiversx/mx-chain-core-go/data"
outportCore "github.com/multiversx/mx-chain-core-go/data/outport"
factoryMarshalizer "github.com/multiversx/mx-chain-core-go/marshal/factory"
"github.com/multiversx/mx-chain-core-go/websocketOutportDriver/data"
wsDriverFactory "github.com/multiversx/mx-chain-core-go/websocketOutportDriver/factory"
indexerFactory "github.com/multiversx/mx-chain-es-indexer-go/process/factory"
"github.com/multiversx/mx-chain-go/common"
"github.com/multiversx/mx-chain-go/common/statistics"
Expand Down Expand Up @@ -205,7 +203,7 @@ func (pc *statusComponents) Close() error {
// createOutportDriver creates a new outport.OutportHandler which is used to register outport drivers
// once a driver is subscribed it will receive data through the implemented outport.Driver methods
func (scf *statusComponentsFactory) createOutportDriver() (outport.OutportHandler, error) {
webSocketSenderDriverFactoryArgs, err := scf.makeWebSocketDriverArgs()
hostDriverArgs, err := scf.makeHostDriverArgs()
if err != nil {
return nil, err
}
Expand All @@ -219,10 +217,7 @@ func (scf *statusComponentsFactory) createOutportDriver() (outport.OutportHandle
RetrialInterval: common.RetrialIntervalForOutportDriver,
ElasticIndexerFactoryArgs: scf.makeElasticIndexerArgs(),
EventNotifierFactoryArgs: eventNotifierArgs,
WebSocketSenderDriverFactoryArgs: outportDriverFactory.WrappedOutportDriverWebSocketSenderFactoryArgs{
Enabled: scf.externalConfig.WebSocketConnector.Enabled,
OutportDriverWebSocketSenderFactoryArgs: webSocketSenderDriverFactoryArgs,
},
HostDriverArgs: hostDriverArgs,
}

return outportDriverFactory.CreateOutport(outportFactoryArgs)
Expand Down Expand Up @@ -266,24 +261,18 @@ func (scf *statusComponentsFactory) makeEventNotifierArgs() (*outportDriverFacto
}, nil
}

func (scf *statusComponentsFactory) makeWebSocketDriverArgs() (wsDriverFactory.OutportDriverWebSocketSenderFactoryArgs, error) {
if !scf.externalConfig.WebSocketConnector.Enabled {
return wsDriverFactory.OutportDriverWebSocketSenderFactoryArgs{}, nil
func (scf *statusComponentsFactory) makeHostDriverArgs() (outportDriverFactory.ArgsHostDriverFactory, error) {
if !scf.externalConfig.HostDriverConfig.Enabled {
return outportDriverFactory.ArgsHostDriverFactory{}, nil
}

marshaller, err := factoryMarshalizer.NewMarshalizer(scf.externalConfig.WebSocketConnector.MarshallerType)
marshaller, err := factoryMarshalizer.NewMarshalizer(scf.externalConfig.HostDriverConfig.MarshallerType)
if err != nil {
return wsDriverFactory.OutportDriverWebSocketSenderFactoryArgs{}, err
return outportDriverFactory.ArgsHostDriverFactory{}, err
}

return wsDriverFactory.OutportDriverWebSocketSenderFactoryArgs{
return outportDriverFactory.ArgsHostDriverFactory{
Marshaller: marshaller,
WebSocketConfig: data.WebSocketConfig{
URL: scf.externalConfig.WebSocketConnector.URL,
WithAcknowledge: scf.externalConfig.WebSocketConnector.WithAcknowledge,
},
Uint64ByteSliceConverter: scf.coreComponents.Uint64ByteSliceConverter(),
Log: log,
WithAcknowledge: scf.externalConfig.WebSocketConnector.WithAcknowledge,
HostConfig: scf.externalConfig.HostDriverConfig,
}, nil
}
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ require (
github.com/google/gops v0.3.18
github.com/gorilla/websocket v1.5.0
github.com/mitchellh/mapstructure v1.5.0
github.com/multiversx/mx-chain-core-go v1.2.1-0.20230330105824-932a718276f6
github.com/multiversx/mx-chain-communication-go v0.0.0-20230511105730-3400290e42c0
github.com/multiversx/mx-chain-core-go v1.2.1-0.20230510143029-ab37792342df
github.com/multiversx/mx-chain-crypto-go v1.2.5
github.com/multiversx/mx-chain-es-indexer-go v1.4.1-0.20230331083741-0fd8a2156e96
github.com/multiversx/mx-chain-logger-go v1.0.11
Expand All @@ -26,7 +27,7 @@ require (
github.com/pelletier/go-toml v1.9.3
github.com/pkg/errors v0.9.1
github.com/shirou/gopsutil v3.21.11+incompatible
github.com/stretchr/testify v1.8.1
github.com/stretchr/testify v1.8.2
github.com/urfave/cli v1.22.10
golang.org/x/crypto v0.5.0
gopkg.in/go-playground/validator.v8 v8.18.2
Expand Down
9 changes: 6 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -609,13 +609,15 @@ github.com/multiformats/go-varint v0.0.6 h1:gk85QWKxh3TazbLxED/NlDVv8+q+ReFJk7Y2
github.com/multiformats/go-varint v0.0.6/go.mod h1:3Ls8CIEsrijN6+B7PbrXRPxHRPuXSrVKRY101jdMZYE=
github.com/multiversx/concurrent-map v0.1.4 h1:hdnbM8VE4b0KYJaGY5yJS2aNIW9TFFsUYwbO0993uPI=
github.com/multiversx/concurrent-map v0.1.4/go.mod h1:8cWFRJDOrWHOTNSqgYCUvwT7c7eFQ4U2vKMOp4A/9+o=
github.com/multiversx/mx-chain-communication-go v0.0.0-20230511105730-3400290e42c0 h1:dZa9ZfN9R605VZYJNhC36eSXJumADO6bHNZMhMdMLfg=
github.com/multiversx/mx-chain-communication-go v0.0.0-20230511105730-3400290e42c0/go.mod h1:GPHOm4HSXbvC0IotMziWXQmhtsUe69ScBPYsb+mF9bk=
github.com/multiversx/mx-chain-core-go v1.1.30/go.mod h1:8gGEQv6BWuuJwhd25qqhCOZbBSv9mk+hLeKvinSaSMk=
github.com/multiversx/mx-chain-core-go v1.1.31/go.mod h1:8gGEQv6BWuuJwhd25qqhCOZbBSv9mk+hLeKvinSaSMk=
github.com/multiversx/mx-chain-core-go v1.1.34/go.mod h1:8gGEQv6BWuuJwhd25qqhCOZbBSv9mk+hLeKvinSaSMk=
github.com/multiversx/mx-chain-core-go v1.2.0/go.mod h1:8gGEQv6BWuuJwhd25qqhCOZbBSv9mk+hLeKvinSaSMk=
github.com/multiversx/mx-chain-core-go v1.2.1-0.20230329082847-b78e96c3ad5a/go.mod h1:/lovncjwo+pXQ7IAERwNzwCifeH7SAWk0DGqjorX2bc=
github.com/multiversx/mx-chain-core-go v1.2.1-0.20230330105824-932a718276f6 h1:4Nv0uxJbfSZ1fqWcQEYyQ1SdAAluDEbHjTi0X8ZFXFs=
github.com/multiversx/mx-chain-core-go v1.2.1-0.20230330105824-932a718276f6/go.mod h1:/lovncjwo+pXQ7IAERwNzwCifeH7SAWk0DGqjorX2bc=
github.com/multiversx/mx-chain-core-go v1.2.1-0.20230510143029-ab37792342df h1:ADV4QOB2Tg42SYyVmYNq4FBXCc4bzD5EA66IFhF+fb0=
github.com/multiversx/mx-chain-core-go v1.2.1-0.20230510143029-ab37792342df/go.mod h1:jzYFSiYBuO0dGpGFXnZWSwcwcKP7Flyn/X41y4zIQrQ=
github.com/multiversx/mx-chain-crypto-go v1.2.5 h1:tuq3BUNMhKud5DQbZi9DiVAAHUXypizy8zPH0NpTGZk=
github.com/multiversx/mx-chain-crypto-go v1.2.5/go.mod h1:teqhNyWEqfMPgNn8sgWXlgtJ1a36jGCnhs/tRpXW6r4=
github.com/multiversx/mx-chain-es-indexer-go v1.4.1-0.20230331083741-0fd8a2156e96 h1:okIfLr+NqX04eHNp9k97KuLhpYfLJOjmGZaOia9xcGg=
Expand Down Expand Up @@ -788,8 +790,9 @@ github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8=
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ=
github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 h1:epCh84lMvA70Z7CTTCmYQn2CKbY8j86K7/FAIr141uY=
github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7/go.mod h1:q4W45IWZaF22tdD+VEXcAWRA037jwmWEB5VWYORlTpc=
Expand Down
41 changes: 41 additions & 0 deletions outport/factory/hostDriverFactory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package factory

import (
"github.com/multiversx/mx-chain-communication-go/websocket/data"
"github.com/multiversx/mx-chain-communication-go/websocket/factory"
"github.com/multiversx/mx-chain-core-go/marshal"
"github.com/multiversx/mx-chain-go/config"
"github.com/multiversx/mx-chain-go/outport"
"github.com/multiversx/mx-chain-go/outport/host"
logger "github.com/multiversx/mx-chain-logger-go"
)

type ArgsHostDriverFactory struct {
HostConfig config.HostDriverConfig
Marshaller marshal.Marshalizer
}

var log = logger.GetOrCreate("outport/factory/hostdriver")

func CreateHostDriver(args ArgsHostDriverFactory) (outport.Driver, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missing comments in the whole file

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added

wsHost, err := factory.CreateWebSocketHost(factory.ArgsWebSocketHost{
WebSocketConfig: data.WebSocketConfig{
URL: args.HostConfig.URL,
WithAcknowledge: args.HostConfig.WithAcknowledge,
IsServer: args.HostConfig.IsServer,
RetryDurationInSec: args.HostConfig.RetryDurationInSec,
BlockingAckOnError: args.HostConfig.BlockingAckOnError,
},
Marshaller: args.Marshaller,
Log: log,
})
if err != nil {
return nil, err
}

return host.NewHostDriver(host.ArgsHostDriver{
Marshaller: args.Marshaller,
SenderHost: wsHost,
Log: log,
})
}
1 change: 1 addition & 0 deletions outport/factory/hostDriverFactory_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package factory
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missing unit tests

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added

32 changes: 10 additions & 22 deletions outport/factory/outportFactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,16 @@ package factory
import (
"time"

wsDriverFactory "github.com/multiversx/mx-chain-core-go/websocketOutportDriver/factory"
indexerFactory "github.com/multiversx/mx-chain-es-indexer-go/process/factory"
"github.com/multiversx/mx-chain-go/outport"
)

// WrappedOutportDriverWebSocketSenderFactoryArgs extends the wsDriverFactory.OutportDriverWebSocketSenderFactoryArgs structure with the Enabled field
type WrappedOutportDriverWebSocketSenderFactoryArgs struct {
Enabled bool
wsDriverFactory.OutportDriverWebSocketSenderFactoryArgs
}

// OutportFactoryArgs holds the factory arguments of different outport drivers
type OutportFactoryArgs struct {
RetrialInterval time.Duration
ElasticIndexerFactoryArgs indexerFactory.ArgsIndexerFactory
EventNotifierFactoryArgs *EventNotifierFactoryArgs
WebSocketSenderDriverFactoryArgs WrappedOutportDriverWebSocketSenderFactoryArgs
RetrialInterval time.Duration
ElasticIndexerFactoryArgs indexerFactory.ArgsIndexerFactory
EventNotifierFactoryArgs *EventNotifierFactoryArgs
HostDriverArgs ArgsHostDriverFactory
}

// CreateOutport will create a new instance of OutportHandler
Expand Down Expand Up @@ -53,7 +46,7 @@ func createAndSubscribeDrivers(outport outport.OutportHandler, args *OutportFact
return err
}

return createAndSubscribeWebSocketDriver(outport, args.WebSocketSenderDriverFactoryArgs)
return createAndSubscribeHostDriverIfNeeded(outport, args.HostDriverArgs)
}

func createAndSubscribeElasticDriverIfNeeded(
Expand Down Expand Up @@ -96,23 +89,18 @@ func checkArguments(args *OutportFactoryArgs) error {
return nil
}

func createAndSubscribeWebSocketDriver(
func createAndSubscribeHostDriverIfNeeded(
outport outport.OutportHandler,
args WrappedOutportDriverWebSocketSenderFactoryArgs,
args ArgsHostDriverFactory,
) error {
if !args.Enabled {
if !args.HostConfig.Enabled {
return nil
}

wsFactory, err := wsDriverFactory.NewOutportDriverWebSocketSenderFactory(args.OutportDriverWebSocketSenderFactoryArgs)
if err != nil {
return err
}

wsDriver, err := wsFactory.Create()
hostDriver, err := CreateHostDriver(args)
if err != nil {
return err
}

return outport.SubscribeDriver(wsDriver)
return outport.SubscribeDriver(hostDriver)
}
114 changes: 114 additions & 0 deletions outport/host/driver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package host

import (
"fmt"

"github.com/multiversx/mx-chain-core-go/core"
"github.com/multiversx/mx-chain-core-go/core/atomic"
"github.com/multiversx/mx-chain-core-go/core/check"
"github.com/multiversx/mx-chain-core-go/data/outport"
"github.com/multiversx/mx-chain-core-go/marshal"
)

// ArgsHostDriver holds the arguments needed for creating a new hostDriver
type ArgsHostDriver struct {
Marshaller marshal.Marshalizer
SenderHost SenderHost
Log core.Logger
}

type hostDriver struct {
marshaller marshal.Marshalizer
senderHost SenderHost
isClosed atomic.Flag
log core.Logger
}

func NewHostDriver(args ArgsHostDriver) (*hostDriver, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missing comment

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added

if check.IfNil(args.SenderHost) {
return nil, ErrNilHost
}
if check.IfNil(args.Marshaller) {
return nil, core.ErrNilMarshalizer
}
if check.IfNil(args.Log) {
return nil, core.ErrNilLogger
}

return &hostDriver{
marshaller: args.Marshaller,
senderHost: args.SenderHost,
log: args.Log,
isClosed: atomic.Flag{},
}, nil
}

func (o *hostDriver) SaveBlock(outportBlock *outport.OutportBlock) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missing comment

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added

return o.handleAction(outportBlock, outport.TopicSaveBlock)
}

// RevertIndexedBlock will handle the action of reverting the indexed block
func (o *hostDriver) RevertIndexedBlock(blockData *outport.BlockData) error {
return o.handleAction(blockData, outport.TopicRevertIndexedBlock)
}

// SaveRoundsInfo will handle the saving of rounds
func (o *hostDriver) SaveRoundsInfo(roundsInfos *outport.RoundsInfo) error {
return o.handleAction(roundsInfos, outport.TopicSaveRoundsInfo)
}

// SaveValidatorsPubKeys will handle the saving of the validators' public keys
func (o *hostDriver) SaveValidatorsPubKeys(validatorsPubKeys *outport.ValidatorsPubKeys) error {
return o.handleAction(validatorsPubKeys, outport.TopicSaveValidatorsPubKeys)
}

// SaveValidatorsRating will handle the saving of the validators' rating
func (o *hostDriver) SaveValidatorsRating(validatorsRating *outport.ValidatorsRating) error {
return o.handleAction(validatorsRating, outport.TopicSaveValidatorsRating)
}

// SaveAccounts will handle the accounts' saving
func (o *hostDriver) SaveAccounts(accounts *outport.Accounts) error {
return o.handleAction(accounts, outport.TopicSaveAccounts)
}

// FinalizedBlock will handle the finalized block
func (o *hostDriver) FinalizedBlock(finalizedBlock *outport.FinalizedBlock) error {
return o.handleAction(finalizedBlock, outport.TopicFinalizedBlock)
}

// GetMarshaller returns the internal marshaller
func (o *hostDriver) GetMarshaller() marshal.Marshalizer {
return o.marshaller
}

func (o *hostDriver) handleAction(args interface{}, topic string) error {
if o.isClosed.IsSet() {
return ErrHostIsClosed
}

marshalledPayload, err := o.marshaller.Marshal(args)
if err != nil {
o.log.Error("cannot marshal block", "topic", topic, "error", err)
return fmt.Errorf("%w while marshaling block for topic %s", err, topic)
}

err = o.senderHost.Send(marshalledPayload, topic)
if err != nil {
o.log.Error("cannot send on route", "topic", topic, "error", err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this still can happen while closing the node, I would filter out log.Error prints only if they are not of type closed error.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed

return fmt.Errorf("%w while sending data on route for topic %s", err, topic)
}

return nil
}

// Close will handle the closing of the outport driver web socket sender
func (o *hostDriver) Close() error {
o.isClosed.SetValue(true)
return o.senderHost.Close()
}

// IsInterfaceNil returns true if there is no value under the interface
func (o *hostDriver) IsInterfaceNil() bool {
return o == nil
}