Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New web-sockets connector #5170

Merged
merged 17 commits into from
May 18, 2023
13 changes: 11 additions & 2 deletions cmd/node/config/external.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,19 @@
# marshalled structures in block events data
MarshallerType = "json"

[WebSocketConnector]
[HostDriverConfig]
# This flag shall only be used for observer nodes
Enabled = false
URL = "localhost:22111"
# This flag will start the WebSocket connector as server or client (can be "client" or "server")
Mode = "client"
# URL for the WebSocket client/server connection
# This value represents the IP address and port number that the WebSocket client or server will use to establish a connection.
URL = "127.0.0.1:22111"
# After a message will be sent it will wait for an ack message if this flag is enabled
WithAcknowledge = true
# Currently, only "json" is supported. In the future, "gogo protobuf" could also be supported
MarshallerType = "json"
# The number of seconds when the client will try again to send the data
RetryDurationInSec = 5
# Sets if, in case of data payload processing error, we should block or not the advancement to the next processing event. Set this to true if you wish the node to stop processing blocks if the client/server encounters errors while processing requests.
BlockingAckOnError = false
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add a descriptive comment on what this does?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added

17 changes: 10 additions & 7 deletions config/externalConfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ package config
type ExternalConfig struct {
ElasticSearchConnector ElasticSearchConfig
EventNotifierConnector EventNotifierConfig
WebSocketConnector WebSocketDriverConfig
HostDriverConfig HostDriverConfig
}

// ElasticSearchConfig will hold the configuration for the elastic search
Expand Down Expand Up @@ -38,10 +38,13 @@ type CovalentConfig struct {
RouteAcknowledgeData string
}

// WebSocketDriverConfig will hold the configuration for web socket driver
type WebSocketDriverConfig struct {
Enabled bool
WithAcknowledge bool
URL string
MarshallerType string
// HostDriverConfig will hold the configuration for WebSocket driver
type HostDriverConfig struct {
Enabled bool
WithAcknowledge bool
BlockingAckOnError bool
URL string
MarshallerType string
Mode string
RetryDurationInSec int
}
29 changes: 9 additions & 20 deletions factory/status/statusComponents.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ import (
nodeData "github.com/multiversx/mx-chain-core-go/data"
outportCore "github.com/multiversx/mx-chain-core-go/data/outport"
factoryMarshalizer "github.com/multiversx/mx-chain-core-go/marshal/factory"
"github.com/multiversx/mx-chain-core-go/websocketOutportDriver/data"
wsDriverFactory "github.com/multiversx/mx-chain-core-go/websocketOutportDriver/factory"
indexerFactory "github.com/multiversx/mx-chain-es-indexer-go/process/factory"
"github.com/multiversx/mx-chain-go/common"
"github.com/multiversx/mx-chain-go/common/statistics"
Expand Down Expand Up @@ -205,7 +203,7 @@ func (pc *statusComponents) Close() error {
// createOutportDriver creates a new outport.OutportHandler which is used to register outport drivers
// once a driver is subscribed it will receive data through the implemented outport.Driver methods
func (scf *statusComponentsFactory) createOutportDriver() (outport.OutportHandler, error) {
webSocketSenderDriverFactoryArgs, err := scf.makeWebSocketDriverArgs()
hostDriverArgs, err := scf.makeHostDriverArgs()
if err != nil {
return nil, err
}
Expand All @@ -219,10 +217,7 @@ func (scf *statusComponentsFactory) createOutportDriver() (outport.OutportHandle
RetrialInterval: common.RetrialIntervalForOutportDriver,
ElasticIndexerFactoryArgs: scf.makeElasticIndexerArgs(),
EventNotifierFactoryArgs: eventNotifierArgs,
WebSocketSenderDriverFactoryArgs: outportDriverFactory.WrappedOutportDriverWebSocketSenderFactoryArgs{
Enabled: scf.externalConfig.WebSocketConnector.Enabled,
OutportDriverWebSocketSenderFactoryArgs: webSocketSenderDriverFactoryArgs,
},
HostDriverArgs: hostDriverArgs,
}

return outportDriverFactory.CreateOutport(outportFactoryArgs)
Expand Down Expand Up @@ -266,24 +261,18 @@ func (scf *statusComponentsFactory) makeEventNotifierArgs() (*outportDriverFacto
}, nil
}

func (scf *statusComponentsFactory) makeWebSocketDriverArgs() (wsDriverFactory.OutportDriverWebSocketSenderFactoryArgs, error) {
if !scf.externalConfig.WebSocketConnector.Enabled {
return wsDriverFactory.OutportDriverWebSocketSenderFactoryArgs{}, nil
func (scf *statusComponentsFactory) makeHostDriverArgs() (outportDriverFactory.ArgsHostDriverFactory, error) {
if !scf.externalConfig.HostDriverConfig.Enabled {
return outportDriverFactory.ArgsHostDriverFactory{}, nil
}

marshaller, err := factoryMarshalizer.NewMarshalizer(scf.externalConfig.WebSocketConnector.MarshallerType)
marshaller, err := factoryMarshalizer.NewMarshalizer(scf.externalConfig.HostDriverConfig.MarshallerType)
if err != nil {
return wsDriverFactory.OutportDriverWebSocketSenderFactoryArgs{}, err
return outportDriverFactory.ArgsHostDriverFactory{}, err
}

return wsDriverFactory.OutportDriverWebSocketSenderFactoryArgs{
return outportDriverFactory.ArgsHostDriverFactory{
Marshaller: marshaller,
WebSocketConfig: data.WebSocketConfig{
URL: scf.externalConfig.WebSocketConnector.URL,
WithAcknowledge: scf.externalConfig.WebSocketConnector.WithAcknowledge,
},
Uint64ByteSliceConverter: scf.coreComponents.Uint64ByteSliceConverter(),
Log: log,
WithAcknowledge: scf.externalConfig.WebSocketConnector.WithAcknowledge,
HostConfig: scf.externalConfig.HostDriverConfig,
}, nil
}
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ require (
github.com/google/gops v0.3.18
github.com/gorilla/websocket v1.5.0
github.com/mitchellh/mapstructure v1.5.0
github.com/multiversx/mx-chain-core-go v1.2.1-0.20230330105824-932a718276f6
github.com/multiversx/mx-chain-communication-go v0.0.0-20230512095548-5bc637293104
github.com/multiversx/mx-chain-core-go v1.2.1-0.20230510143029-ab37792342df
github.com/multiversx/mx-chain-crypto-go v1.2.5
github.com/multiversx/mx-chain-es-indexer-go v1.4.1-0.20230331083741-0fd8a2156e96
github.com/multiversx/mx-chain-logger-go v1.0.11
Expand All @@ -26,7 +27,7 @@ require (
github.com/pelletier/go-toml v1.9.3
github.com/pkg/errors v0.9.1
github.com/shirou/gopsutil v3.21.11+incompatible
github.com/stretchr/testify v1.8.1
github.com/stretchr/testify v1.8.2
github.com/urfave/cli v1.22.10
golang.org/x/crypto v0.5.0
gopkg.in/go-playground/validator.v8 v8.18.2
Expand Down
9 changes: 6 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -609,13 +609,15 @@ github.com/multiformats/go-varint v0.0.6 h1:gk85QWKxh3TazbLxED/NlDVv8+q+ReFJk7Y2
github.com/multiformats/go-varint v0.0.6/go.mod h1:3Ls8CIEsrijN6+B7PbrXRPxHRPuXSrVKRY101jdMZYE=
github.com/multiversx/concurrent-map v0.1.4 h1:hdnbM8VE4b0KYJaGY5yJS2aNIW9TFFsUYwbO0993uPI=
github.com/multiversx/concurrent-map v0.1.4/go.mod h1:8cWFRJDOrWHOTNSqgYCUvwT7c7eFQ4U2vKMOp4A/9+o=
github.com/multiversx/mx-chain-communication-go v0.0.0-20230512095548-5bc637293104 h1:oFsYNkebv7TQygdEjN4aGgQ8ICLPmS9bDJmzlOHtU2Y=
github.com/multiversx/mx-chain-communication-go v0.0.0-20230512095548-5bc637293104/go.mod h1:GPHOm4HSXbvC0IotMziWXQmhtsUe69ScBPYsb+mF9bk=
github.com/multiversx/mx-chain-core-go v1.1.30/go.mod h1:8gGEQv6BWuuJwhd25qqhCOZbBSv9mk+hLeKvinSaSMk=
github.com/multiversx/mx-chain-core-go v1.1.31/go.mod h1:8gGEQv6BWuuJwhd25qqhCOZbBSv9mk+hLeKvinSaSMk=
github.com/multiversx/mx-chain-core-go v1.1.34/go.mod h1:8gGEQv6BWuuJwhd25qqhCOZbBSv9mk+hLeKvinSaSMk=
github.com/multiversx/mx-chain-core-go v1.2.0/go.mod h1:8gGEQv6BWuuJwhd25qqhCOZbBSv9mk+hLeKvinSaSMk=
github.com/multiversx/mx-chain-core-go v1.2.1-0.20230329082847-b78e96c3ad5a/go.mod h1:/lovncjwo+pXQ7IAERwNzwCifeH7SAWk0DGqjorX2bc=
github.com/multiversx/mx-chain-core-go v1.2.1-0.20230330105824-932a718276f6 h1:4Nv0uxJbfSZ1fqWcQEYyQ1SdAAluDEbHjTi0X8ZFXFs=
github.com/multiversx/mx-chain-core-go v1.2.1-0.20230330105824-932a718276f6/go.mod h1:/lovncjwo+pXQ7IAERwNzwCifeH7SAWk0DGqjorX2bc=
github.com/multiversx/mx-chain-core-go v1.2.1-0.20230510143029-ab37792342df h1:ADV4QOB2Tg42SYyVmYNq4FBXCc4bzD5EA66IFhF+fb0=
github.com/multiversx/mx-chain-core-go v1.2.1-0.20230510143029-ab37792342df/go.mod h1:jzYFSiYBuO0dGpGFXnZWSwcwcKP7Flyn/X41y4zIQrQ=
github.com/multiversx/mx-chain-crypto-go v1.2.5 h1:tuq3BUNMhKud5DQbZi9DiVAAHUXypizy8zPH0NpTGZk=
github.com/multiversx/mx-chain-crypto-go v1.2.5/go.mod h1:teqhNyWEqfMPgNn8sgWXlgtJ1a36jGCnhs/tRpXW6r4=
github.com/multiversx/mx-chain-es-indexer-go v1.4.1-0.20230331083741-0fd8a2156e96 h1:okIfLr+NqX04eHNp9k97KuLhpYfLJOjmGZaOia9xcGg=
Expand Down Expand Up @@ -788,8 +790,9 @@ github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8=
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ=
github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 h1:epCh84lMvA70Z7CTTCmYQn2CKbY8j86K7/FAIr141uY=
github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7/go.mod h1:q4W45IWZaF22tdD+VEXcAWRA037jwmWEB5VWYORlTpc=
Expand Down
42 changes: 42 additions & 0 deletions outport/factory/hostDriverFactory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package factory

import (
"github.com/multiversx/mx-chain-communication-go/websocket/data"
"github.com/multiversx/mx-chain-communication-go/websocket/factory"
"github.com/multiversx/mx-chain-core-go/marshal"
"github.com/multiversx/mx-chain-go/config"
"github.com/multiversx/mx-chain-go/outport"
"github.com/multiversx/mx-chain-go/outport/host"
logger "github.com/multiversx/mx-chain-logger-go"
)

type ArgsHostDriverFactory struct {
HostConfig config.HostDriverConfig
Marshaller marshal.Marshalizer
}

var log = logger.GetOrCreate("outport/factory/hostdriver")

// CreateHostDriver will create a new instance of outport.Driver
func CreateHostDriver(args ArgsHostDriverFactory) (outport.Driver, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missing comments in the whole file

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added

wsHost, err := factory.CreateWebSocketHost(factory.ArgsWebSocketHost{
WebSocketConfig: data.WebSocketConfig{
URL: args.HostConfig.URL,
WithAcknowledge: args.HostConfig.WithAcknowledge,
Mode: args.HostConfig.Mode,
RetryDurationInSec: args.HostConfig.RetryDurationInSec,
BlockingAckOnError: args.HostConfig.BlockingAckOnError,
},
Marshaller: args.Marshaller,
Log: log,
})
if err != nil {
return nil, err
}

return host.NewHostDriver(host.ArgsHostDriver{
Marshaller: args.Marshaller,
SenderHost: wsHost,
Log: log,
})
}
30 changes: 30 additions & 0 deletions outport/factory/hostDriverFactory_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package factory
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missing unit tests

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added


import (
"fmt"
"testing"

"github.com/multiversx/mx-chain-communication-go/websocket/data"
"github.com/multiversx/mx-chain-go/config"
"github.com/multiversx/mx-chain-go/testscommon"
"github.com/stretchr/testify/require"
)

func TestCreateHostDriver(t *testing.T) {
t.Parallel()

args := ArgsHostDriverFactory{
HostConfig: config.HostDriverConfig{
URL: "localhost",
RetryDurationInSec: 1,
MarshallerType: "json",
Mode: data.ModeClient,
},
Marshaller: &testscommon.MarshalizerMock{},
}

driver, err := CreateHostDriver(args)
require.Nil(t, err)
require.NotNil(t, driver)
require.Equal(t, "*host.hostDriver", fmt.Sprintf("%T", driver))
}
32 changes: 10 additions & 22 deletions outport/factory/outportFactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,16 @@ package factory
import (
"time"

wsDriverFactory "github.com/multiversx/mx-chain-core-go/websocketOutportDriver/factory"
indexerFactory "github.com/multiversx/mx-chain-es-indexer-go/process/factory"
"github.com/multiversx/mx-chain-go/outport"
)

// WrappedOutportDriverWebSocketSenderFactoryArgs extends the wsDriverFactory.OutportDriverWebSocketSenderFactoryArgs structure with the Enabled field
type WrappedOutportDriverWebSocketSenderFactoryArgs struct {
Enabled bool
wsDriverFactory.OutportDriverWebSocketSenderFactoryArgs
}

// OutportFactoryArgs holds the factory arguments of different outport drivers
type OutportFactoryArgs struct {
RetrialInterval time.Duration
ElasticIndexerFactoryArgs indexerFactory.ArgsIndexerFactory
EventNotifierFactoryArgs *EventNotifierFactoryArgs
WebSocketSenderDriverFactoryArgs WrappedOutportDriverWebSocketSenderFactoryArgs
RetrialInterval time.Duration
ElasticIndexerFactoryArgs indexerFactory.ArgsIndexerFactory
EventNotifierFactoryArgs *EventNotifierFactoryArgs
HostDriverArgs ArgsHostDriverFactory
}

// CreateOutport will create a new instance of OutportHandler
Expand Down Expand Up @@ -53,7 +46,7 @@ func createAndSubscribeDrivers(outport outport.OutportHandler, args *OutportFact
return err
}

return createAndSubscribeWebSocketDriver(outport, args.WebSocketSenderDriverFactoryArgs)
return createAndSubscribeHostDriverIfNeeded(outport, args.HostDriverArgs)
}

func createAndSubscribeElasticDriverIfNeeded(
Expand Down Expand Up @@ -96,23 +89,18 @@ func checkArguments(args *OutportFactoryArgs) error {
return nil
}

func createAndSubscribeWebSocketDriver(
func createAndSubscribeHostDriverIfNeeded(
outport outport.OutportHandler,
args WrappedOutportDriverWebSocketSenderFactoryArgs,
args ArgsHostDriverFactory,
) error {
if !args.Enabled {
if !args.HostConfig.Enabled {
return nil
}

wsFactory, err := wsDriverFactory.NewOutportDriverWebSocketSenderFactory(args.OutportDriverWebSocketSenderFactoryArgs)
if err != nil {
return err
}

wsDriver, err := wsFactory.Create()
hostDriver, err := CreateHostDriver(args)
if err != nil {
return err
}

return outport.SubscribeDriver(wsDriver)
return outport.SubscribeDriver(hostDriver)
}