From 73e99c99c60d096faeddb3f7e4059f3f1eb658d8 Mon Sep 17 00:00:00 2001 From: Edward Mack Date: Wed, 30 Jun 2021 11:48:10 -0400 Subject: [PATCH] feat(dot/telemetry): implement telemetry message network_state (#1618) * refactor telemetry messages to map format * add basic network state telemetry message * refactor message sender to handle interface{} types * refactor telemetry messages to be structs * lint * go fmt * lint * move msg building logic outside msg sending loop * make telemetry messages an interface * Lookup transactions count from TransactionsState * address comments * fix mocks for tests * lint * refactor TelemetryMessage to Message * update mock handler to return result * add TransactionsCount to mockhandler * move logic to build new network state message * lint * fix interface * update mockhandler * lint --- chain/dev/genesis.json | 9 +- chain/gssmr/genesis.json | 8 +- dot/core/messages.go | 5 + dot/core/messages_test.go | 6 +- dot/network/mock_transaction_handler.go | 14 ++ dot/network/service.go | 33 ++-- dot/network/service_test.go | 1 + dot/network/state.go | 1 + dot/network/test_helpers.go | 1 + dot/network/transaction_test.go | 1 + dot/node.go | 21 ++- dot/sync/syncer.go | 12 +- dot/telemetry/telemetry.go | 226 +++++++++++++++++++----- dot/telemetry/telemetry_test.go | 71 ++++---- 14 files changed, 289 insertions(+), 120 deletions(-) diff --git a/chain/dev/genesis.json b/chain/dev/genesis.json index a5a8ca24e5..6b76f85b45 100644 --- a/chain/dev/genesis.json +++ b/chain/dev/genesis.json @@ -3,7 +3,12 @@ "id": "dev", "chainType": "Local", "bootNodes": [], - "telemetryEndpoints": null, + "telemetryEndpoints": [ + [ + "wss://telemetry.polkadot.io/submit/", + 0 + ] + ], "protocolId": "/gossamer/dev/0", "genesis": { "raw": { @@ -32,4 +37,4 @@ "forkBlocks": null, "badBlocks": null, "consensusEngine": "" -} \ No newline at end of file +} diff --git a/chain/gssmr/genesis.json b/chain/gssmr/genesis.json index 2e6f157d90..080619e282 100644 --- a/chain/gssmr/genesis.json +++ b/chain/gssmr/genesis.json @@ -3,6 +3,12 @@ "id": "gssmr", "chainType": "Local", "bootNodes": [], + "telemetryEndpoints": [ + [ + "wss://telemetry.polkadot.io/submit/", + 0 + ] + ], "protocolId": "/gossamer/gssmr/0", "genesis": { "raw": { @@ -40,4 +46,4 @@ "forkBlocks": null, "badBlocks": null, "consensusEngine": "" -} \ No newline at end of file +} diff --git a/dot/core/messages.go b/dot/core/messages.go index 9fa5e97b51..318414eab8 100644 --- a/dot/core/messages.go +++ b/dot/core/messages.go @@ -57,3 +57,8 @@ func (s *Service) HandleTransactionMessage(msg *network.TransactionMessage) (boo return len(msg.Extrinsics) > 0, nil } + +// TransactionsCount returns number for pending transactions in pool +func (s *Service) TransactionsCount() int { + return len(s.transactionState.PendingInPool()) +} diff --git a/dot/core/messages_test.go b/dot/core/messages_test.go index 2717d3704e..6b718b0d5c 100644 --- a/dot/core/messages_test.go +++ b/dot/core/messages_test.go @@ -21,7 +21,7 @@ import ( "testing" "time" - . "github.com/ChainSafe/gossamer/dot/core/mocks" + . "github.com/ChainSafe/gossamer/dot/core/mocks" // nolint "github.com/ChainSafe/gossamer/dot/network" "github.com/ChainSafe/gossamer/dot/state" "github.com/ChainSafe/gossamer/dot/types" @@ -38,7 +38,7 @@ import ( func TestService_ProcessBlockAnnounceMessage(t *testing.T) { // TODO: move to sync package - net := new(MockNetwork) + net := new(MockNetwork) // nolint cfg := &Config{ Network: net, @@ -136,7 +136,7 @@ func TestService_HandleTransactionMessage(t *testing.T) { ks := keystore.NewGlobalKeystore() ks.Acco.Insert(kp) - bp := new(MockBlockProducer) + bp := new(MockBlockProducer) // nolint blockC := make(chan types.Block) bp.On("GetBlockChannel", nil).Return(blockC) diff --git a/dot/network/mock_transaction_handler.go b/dot/network/mock_transaction_handler.go index c3eaa07972..5fe1d7e597 100644 --- a/dot/network/mock_transaction_handler.go +++ b/dot/network/mock_transaction_handler.go @@ -29,3 +29,17 @@ func (_m *MockTransactionHandler) HandleTransactionMessage(_a0 *TransactionMessa return r0, r1 } + +// TransactionsCount provides a mock function with given fields: +func (_m *MockTransactionHandler) TransactionsCount() int { + ret := _m.Called() + + var r0 int + if rf, ok := ret.Get(0).(func() int); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(int) + } + + return r0 +} diff --git a/dot/network/service.go b/dot/network/service.go index 738c1e0fb3..429ec79c52 100644 --- a/dot/network/service.go +++ b/dot/network/service.go @@ -20,6 +20,7 @@ import ( "context" "errors" "io" + "math/big" "os" "sync" "time" @@ -315,11 +316,12 @@ main: case <-ticker.C: o := s.host.bwc.GetBandwidthTotals() - err := telemetry.GetInstance().SendMessage(telemetry.NewTelemetryMessage( - telemetry.NewKeyValue("bandwidth_download", o.RateIn), - telemetry.NewKeyValue("bandwidth_upload", o.RateOut), - telemetry.NewKeyValue("msg", "system.interval"), - telemetry.NewKeyValue("peers", s.host.peerCount()))) + err := telemetry.GetInstance().SendMessage(telemetry.NewBandwidthTM(o.RateIn, o.RateOut, s.host.peerCount())) + if err != nil { + logger.Debug("problem sending system.interval telemetry message", "error", err) + } + + err = telemetry.GetInstance().SendMessage(telemetry.NewNetworkStateTM(s.host.h, s.Peers())) if err != nil { logger.Debug("problem sending system.interval telemetry message", "error", err) } @@ -333,19 +335,22 @@ func (s *Service) sentBlockIntervalTelemetry() { if err != nil { continue } + bestHash := best.Hash() + finalized, err := s.blockState.GetFinalizedHeader(0, 0) //nolint if err != nil { continue } - - err = telemetry.GetInstance().SendMessage(telemetry.NewTelemetryMessage( - telemetry.NewKeyValue("best", best.Hash().String()), - telemetry.NewKeyValue("finalized_hash", finalized.Hash().String()), //nolint - telemetry.NewKeyValue("finalized_height", finalized.Number), //nolint - telemetry.NewKeyValue("height", best.Number), - telemetry.NewKeyValue("msg", "system.interval"), - telemetry.NewKeyValue("txcount", 0), // todo (ed) determine where to get tx count - telemetry.NewKeyValue("used_state_cache_size", 0))) // todo (ed) determine where to get used_state_cache_size + finalizedHash := finalized.Hash() + + err = telemetry.GetInstance().SendMessage(telemetry.NewBlockIntervalTM( + &bestHash, + best.Number, + &finalizedHash, + finalized.Number, + big.NewInt(int64(s.transactionHandler.TransactionsCount())), + big.NewInt(0), // todo (ed) determine where to get used_state_cache_size + )) if err != nil { logger.Debug("problem sending system.interval telemetry message", "error", err) } diff --git a/dot/network/service_test.go b/dot/network/service_test.go index db20565b1d..cbc44f64fd 100644 --- a/dot/network/service_test.go +++ b/dot/network/service_test.go @@ -84,6 +84,7 @@ func createTestService(t *testing.T, cfg *Config) (srvc *Service) { if cfg.TransactionHandler == nil { mocktxhandler := &MockTransactionHandler{} mocktxhandler.On("HandleTransactionMessage", mock.AnythingOfType("*TransactionMessage")).Return(nil) + mocktxhandler.On("TransactionsCount").Return(0) cfg.TransactionHandler = mocktxhandler } diff --git a/dot/network/state.go b/dot/network/state.go index 61c777526b..323fdf2f02 100644 --- a/dot/network/state.go +++ b/dot/network/state.go @@ -56,4 +56,5 @@ type Syncer interface { // TransactionHandler is the interface used by the transactions sub-protocol type TransactionHandler interface { HandleTransactionMessage(*TransactionMessage) (bool, error) + TransactionsCount() int } diff --git a/dot/network/test_helpers.go b/dot/network/test_helpers.go index d5ff38f2ff..cf513983ec 100644 --- a/dot/network/test_helpers.go +++ b/dot/network/test_helpers.go @@ -59,6 +59,7 @@ func NewMockSyncer() *MockSyncer { func NewMockTransactionHandler() *MockTransactionHandler { mocktxhandler := new(MockTransactionHandler) mocktxhandler.On("HandleTransactionMessage", mock.AnythingOfType("*network.TransactionMessage")).Return(nil) + mocktxhandler.On("TransactionsCount").Return(0) return mocktxhandler } diff --git a/dot/network/transaction_test.go b/dot/network/transaction_test.go index cb78794c31..41b2f38d63 100644 --- a/dot/network/transaction_test.go +++ b/dot/network/transaction_test.go @@ -56,6 +56,7 @@ func TestHandleTransactionMessage(t *testing.T) { basePath := utils.NewTestBasePath(t, "nodeA") mockhandler := &MockTransactionHandler{} mockhandler.On("HandleTransactionMessage", mock.AnythingOfType("*network.TransactionMessage")).Return(true, nil) + mockhandler.On("TransactionsCount").Return(0) config := &Config{ BasePath: basePath, diff --git a/dot/node.go b/dot/node.go index 768999bb40..9af3efdb8a 100644 --- a/dot/node.go +++ b/dot/node.go @@ -345,17 +345,16 @@ func NewNode(cfg *Config, ks *keystore.GlobalKeystore, stopFunc func()) (*Node, } telemetry.GetInstance().AddConnections(gd.TelemetryEndpoints) - - err = telemetry.GetInstance().SendMessage(telemetry.NewTelemetryMessage( - telemetry.NewKeyValue("authority", cfg.Core.GrandpaAuthority), - telemetry.NewKeyValue("chain", sysSrvc.ChainName()), - telemetry.NewKeyValue("genesis_hash", stateSrvc.Block.GenesisHash().String()), - telemetry.NewKeyValue("implementation", sysSrvc.SystemName()), - telemetry.NewKeyValue("msg", "system.connected"), - telemetry.NewKeyValue("name", cfg.Global.Name), - telemetry.NewKeyValue("network_id", networkSrvc.NetworkState().PeerID), - telemetry.NewKeyValue("startup_time", strconv.FormatInt(time.Now().UnixNano(), 10)), - telemetry.NewKeyValue("version", sysSrvc.SystemVersion()))) + genesisHash := stateSrvc.Block.GenesisHash() + err = telemetry.GetInstance().SendMessage(telemetry.NewSystemConnectedTM( + cfg.Core.GrandpaAuthority, + sysSrvc.ChainName(), + &genesisHash, + sysSrvc.SystemName(), + cfg.Global.Name, + networkSrvc.NetworkState().PeerID, + strconv.FormatInt(time.Now().UnixNano(), 10), + sysSrvc.SystemVersion())) if err != nil { logger.Debug("problem sending system.connected telemetry message", "err", err) } diff --git a/dot/sync/syncer.go b/dot/sync/syncer.go index c6e3b5cf5b..9ac360ed3d 100644 --- a/dot/sync/syncer.go +++ b/dot/sync/syncer.go @@ -346,13 +346,13 @@ func (s *Service) handleBlock(block *types.Block) error { logger.Debug("🔗 imported block", "number", block.Header.Number, "hash", block.Header.Hash()) - err = telemetry.GetInstance().SendMessage(telemetry.NewTelemetryMessage( // nolint - telemetry.NewKeyValue("best", block.Header.Hash().String()), - telemetry.NewKeyValue("height", block.Header.Number.Uint64()), - telemetry.NewKeyValue("msg", "block.import"), - telemetry.NewKeyValue("origin", "NetworkInitialSync"))) + blockHash := block.Header.Hash() + err = telemetry.GetInstance().SendMessage(telemetry.NewBlockImportTM( + &blockHash, + block.Header.Number, + "NetworkInitialSync")) if err != nil { - logger.Trace("problem sending block.import telemetry message", "error", err) + logger.Debug("problem sending block.import telemetry message", "error", err) } return nil diff --git a/dot/telemetry/telemetry.go b/dot/telemetry/telemetry.go index 22ba7286ef..2df26725b7 100644 --- a/dot/telemetry/telemetry.go +++ b/dot/telemetry/telemetry.go @@ -19,12 +19,16 @@ package telemetry import ( "encoding/json" "errors" + "fmt" + "math/big" "sync" "time" + "github.com/ChainSafe/gossamer/lib/common" "github.com/ChainSafe/gossamer/lib/genesis" log "github.com/ChainSafe/log15" "github.com/gorilla/websocket" + libp2phost "github.com/libp2p/go-libp2p-core/host" ) type telemetryConnection struct { @@ -33,11 +37,6 @@ type telemetryConnection struct { sync.Mutex } -// Message struct to hold telemetry message data -type Message struct { - values map[string]interface{} -} - // Handler struct for holding telemetry related things type Handler struct { msg chan Message @@ -46,12 +45,6 @@ type Handler struct { sendMessageTimeout time.Duration } -// KeyValue object to hold key value pairs used in telemetry messages -type KeyValue struct { - key string - value interface{} -} - var ( once sync.Once handlerInstance *Handler @@ -75,25 +68,6 @@ func GetInstance() *Handler { //nolint return handlerInstance } -// NewTelemetryMessage builds a telemetry message -func NewTelemetryMessage(values ...*KeyValue) *Message { //nolint - mvals := make(map[string]interface{}) - for _, v := range values { - mvals[v.key] = v.value - } - return &Message{ - values: mvals, - } -} - -// NewKeyValue builds a key value pair for telemetry messages -func NewKeyValue(key string, value interface{}) *KeyValue { //nolint - return &KeyValue{ - key: key, - value: value, - } -} - // AddConnections adds the given telemetry endpoint as listeners that will receive telemetry data func (h *Handler) AddConnections(conns []*genesis.TelemetryEndpoint) { for _, v := range conns { @@ -112,11 +86,11 @@ func (h *Handler) AddConnections(conns []*genesis.TelemetryEndpoint) { } // SendMessage sends Message to connected telemetry listeners -func (h *Handler) SendMessage(msg *Message) error { +func (h *Handler) SendMessage(msg Message) error { t := time.NewTicker(h.sendMessageTimeout) defer t.Stop() select { - case h.msg <- *msg: + case h.msg <- msg: case <-t.C: return errors.New("timeout sending message") @@ -128,33 +102,191 @@ func (h *Handler) startListening() { for { msg := <-h.msg go func() { + msgBytes, err := h.msgToJSON(msg) + if err != nil { + h.log.Debug("issue decoding telemetry message", "error", err) + return + } for _, conn := range h.connections { conn.Lock() - err := conn.wsconn.WriteMessage(websocket.TextMessage, msgToBytes(msg)) + defer conn.Unlock() + + err = conn.wsconn.WriteMessage(websocket.TextMessage, msgBytes) if err != nil { h.log.Warn("issue while sending telemetry message", "error", err) } - conn.Unlock() } }() } } -type response struct { - ID int `json:"id"` - Payload map[string]interface{} `json:"payload"` - Timestamp time.Time `json:"ts"` -} +func (h *Handler) msgToJSON(message Message) ([]byte, error) { + messageBytes, err := json.Marshal(message) + if err != nil { + return nil, err + } -func msgToBytes(message Message) []byte { - res := response{ - ID: 1, // todo (ed) determine how this is used - Payload: message.values, - Timestamp: time.Now(), + messageMap := make(map[string]interface{}) + err = json.Unmarshal(messageBytes, &messageMap) + if err != nil { + return nil, err } - resB, err := json.Marshal(res) + + messageMap["ts"] = time.Now() + + messageMap["msg"] = message.messageType() + + fullRes, err := json.Marshal(messageMap) if err != nil { - return nil + return nil, err } - return resB + return fullRes, nil +} + +// Message interface for Message functions +type Message interface { + messageType() string +} + +// SystemConnectedTM struct to hold system connected telemetry messages +type SystemConnectedTM struct { + Authority bool `json:"authority"` + Chain string `json:"chain"` + GenesisHash *common.Hash `json:"genesis_hash"` + Implementation string `json:"implementation"` + Msg string `json:"msg"` + Name string `json:"name"` + NetworkID string `json:"network_id"` + StartupTime string `json:"startup_time"` + Version string `json:"version"` +} + +// NewSystemConnectedTM function to create new System Connected Telemetry Message +func NewSystemConnectedTM(authority bool, chain string, genesisHash *common.Hash, + implementation, name, networkID, startupTime, version string) *SystemConnectedTM { + return &SystemConnectedTM{ + Authority: authority, + Chain: chain, + GenesisHash: genesisHash, + Implementation: implementation, + Msg: "system.connected", + Name: name, + NetworkID: networkID, + StartupTime: startupTime, + Version: version, + } +} +func (tm *SystemConnectedTM) messageType() string { + return tm.Msg +} + +// BlockImportTM struct to hold block import telemetry messages +type BlockImportTM struct { + BestHash *common.Hash `json:"best"` + Height *big.Int `json:"height"` + Msg string `json:"msg"` + Origin string `json:"origin"` +} + +// NewBlockImportTM function to create new Block Import Telemetry Message +func NewBlockImportTM(bestHash *common.Hash, height *big.Int, origin string) *BlockImportTM { + return &BlockImportTM{ + BestHash: bestHash, + Height: height, + Msg: "block.import", + Origin: origin, + } +} + +func (tm *BlockImportTM) messageType() string { + return tm.Msg +} + +// SystemIntervalTM struct to hold system interval telemetry messages +type SystemIntervalTM struct { + BandwidthDownload float64 `json:"bandwidth_download,omitempty"` + BandwidthUpload float64 `json:"bandwidth_upload,omitempty"` + Msg string `json:"msg"` + Peers int `json:"peers,omitempty"` + BestHash *common.Hash `json:"best,omitempty"` + BestHeight *big.Int `json:"height,omitempty"` + FinalisedHash *common.Hash `json:"finalized_hash,omitempty"` // nolint + FinalisedHeight *big.Int `json:"finalized_height,omitempty"` // nolint + TxCount *big.Int `json:"txcount,omitempty"` + UsedStateCacheSize *big.Int `json:"used_state_cache_size,omitempty"` +} + +// NewBandwidthTM function to create new Bandwidth Telemetry Message +func NewBandwidthTM(bandwidthDownload, bandwidthUpload float64, peers int) *SystemIntervalTM { + return &SystemIntervalTM{ + BandwidthDownload: bandwidthDownload, + BandwidthUpload: bandwidthUpload, + Msg: "system.interval", + Peers: peers, + } +} + +// NewBlockIntervalTM function to create new Block Interval Telemetry Message +func NewBlockIntervalTM(beshHash *common.Hash, bestHeight *big.Int, finalisedHash *common.Hash, + finalisedHeight, txCount, usedStateCacheSize *big.Int) *SystemIntervalTM { + return &SystemIntervalTM{ + Msg: "system.interval", + BestHash: beshHash, + BestHeight: bestHeight, + FinalisedHash: finalisedHash, + FinalisedHeight: finalisedHeight, + TxCount: txCount, + UsedStateCacheSize: usedStateCacheSize, + } +} + +func (tm *SystemIntervalTM) messageType() string { + return tm.Msg +} + +type peerInfo struct { + Roles byte `json:"roles"` + BestHash string `json:"bestHash"` + BestNumber uint64 `json:"bestNumber"` +} + +// NetworkStateTM struct to hold network state telemetry messages +type NetworkStateTM struct { + Msg string `json:"msg"` + State map[string]interface{} `json:"state"` +} + +// NewNetworkStateTM function to create new Network State Telemetry Message +func NewNetworkStateTM(host libp2phost.Host, peerInfos []common.PeerInfo) *NetworkStateTM { + netState := make(map[string]interface{}) + netState["peerId"] = host.ID() + hostAddrs := []string{} + for _, v := range host.Addrs() { + hostAddrs = append(hostAddrs, v.String()) + } + netState["externalAddressess"] = hostAddrs + listAddrs := []string{} + for _, v := range host.Network().ListenAddresses() { + listAddrs = append(listAddrs, fmt.Sprintf("%s/p2p/%s", v, host.ID())) + } + netState["listenedAddressess"] = listAddrs + + peers := make(map[string]interface{}) + for _, v := range peerInfos { + p := &peerInfo{ + Roles: v.Roles, + BestHash: v.BestHash.String(), + BestNumber: v.BestNumber, + } + peers[v.PeerID] = *p + } + netState["connectedPeers"] = peers + + return &NetworkStateTM{ + Msg: "system.network_state", + State: netState, + } +} +func (tm *NetworkStateTM) messageType() string { + return tm.Msg } diff --git a/dot/telemetry/telemetry_test.go b/dot/telemetry/telemetry_test.go index e2b7b9c05b..f9e606b9f8 100644 --- a/dot/telemetry/telemetry_test.go +++ b/dot/telemetry/telemetry_test.go @@ -2,6 +2,7 @@ package telemetry import ( "bytes" + "fmt" "log" "math/big" "net/http" @@ -11,6 +12,7 @@ import ( "testing" "time" + "github.com/ChainSafe/gossamer/lib/common" "github.com/ChainSafe/gossamer/lib/genesis" "github.com/gorilla/websocket" "github.com/stretchr/testify/require" @@ -46,55 +48,44 @@ func TestHandler_SendMulti(t *testing.T) { resultCh = make(chan []byte) go func() { - GetInstance().SendMessage(NewTelemetryMessage( - NewKeyValue("authority", false), - NewKeyValue("chain", "chain"), - NewKeyValue("genesis_hash", "hash"), - NewKeyValue("implementation", "systemName"), - NewKeyValue("msg", "system.connected"), - NewKeyValue("name", "nodeName"), - NewKeyValue("network_id", "netID"), - NewKeyValue("startup_time", "startTime"), - NewKeyValue("version", "version"))) + genesisHash := common.MustHexToHash("0x91b171bb158e2d3848fa23a9f1c25182fb8e20313b2c1eb49219da7a70ce90c3") + + GetInstance().SendMessage(NewSystemConnectedTM(false, "chain", &genesisHash, + "systemName", "nodeName", "netID", "startTime", "0.1")) + wg.Done() }() go func() { - GetInstance().SendMessage(NewTelemetryMessage( - NewKeyValue("best", "hash"), - NewKeyValue("height", big.NewInt(2)), - NewKeyValue("msg", "block.import"), - NewKeyValue("origin", "NetworkInitialSync"))) + bh := common.MustHexToHash("0x07b749b6e20fd5f1159153a2e790235018621dd06072a62bcd25e8576f6ff5e6") + GetInstance().SendMessage(NewBlockImportTM(&bh, big.NewInt(2), "NetworkInitialSync")) + wg.Done() }() go func() { - GetInstance().SendMessage(NewTelemetryMessage( - NewKeyValue("bandwidth_download", 2), - NewKeyValue("bandwidth_upload", 3), - NewKeyValue("msg", "system.interval"), - NewKeyValue("peers", 1))) + GetInstance().SendMessage(NewBandwidthTM(2, 3, 1)) + wg.Done() }() go func() { - GetInstance().SendMessage(NewTelemetryMessage( - NewKeyValue("best", "0x07b749b6e20fd5f1159153a2e790235018621dd06072a62bcd25e8576f6ff5e6"), - NewKeyValue("finalized_hash", "0x687197c11b4cf95374159843e7f46fbcd63558db981aaef01a8bac2a44a1d6b2"), // nolint - NewKeyValue("finalized_height", 32256), NewKeyValue("height", 32375), // nolint - NewKeyValue("msg", "system.interval"), NewKeyValue("txcount", 2), - NewKeyValue("used_state_cache_size", 1886357))) + bestHash := common.MustHexToHash("0x07b749b6e20fd5f1159153a2e790235018621dd06072a62bcd25e8576f6ff5e6") + finalisedHash := common.MustHexToHash("0x687197c11b4cf95374159843e7f46fbcd63558db981aaef01a8bac2a44a1d6b2") + GetInstance().SendMessage(NewBlockIntervalTM(&bestHash, big.NewInt(32375), &finalisedHash, + big.NewInt(32256), big.NewInt(0), big.NewInt(1234))) + wg.Done() }() wg.Wait() - expected1 := []byte(`{"id":1,"payload":{"bandwidth_download":2,"bandwidth_upload":3,"msg":"system.interval","peers":1},"ts":`) - expected2 := []byte(`{"id":1,"payload":{"best":"hash","height":2,"msg":"block.import","origin":"NetworkInitialSync"},"ts":`) - expected3 := []byte(`{"id":1,"payload":{"authority":false,"chain":"chain","genesis_hash":"hash","implementation":"systemName","msg":"system.connected","name":"nodeName","network_id":"netID","startup_time":"startTime","version":"version"},"ts":`) - expected4 := []byte(`{"id":1,"payload":{"best":"0x07b749b6e20fd5f1159153a2e790235018621dd06072a62bcd25e8576f6ff5e6","finalized_hash":"0x687197c11b4cf95374159843e7f46fbcd63558db981aaef01a8bac2a44a1d6b2","finalized_height":32256,"height":32375,"msg":"system.interval","txcount":2,"used_state_cache_size":1886357},"ts":`) // nolint + expected1 := []byte(`{"authority":false,"chain":"chain","genesis_hash":"0x91b171bb158e2d3848fa23a9f1c25182fb8e20313b2c1eb49219da7a70ce90c3","implementation":"systemName","msg":"system.connected","name":"nodeName","network_id":"netID","startup_time":"startTime","ts":`) + expected2 := []byte(`{"best":"0x07b749b6e20fd5f1159153a2e790235018621dd06072a62bcd25e8576f6ff5e6","height":2,"msg":"block.import","origin":"NetworkInitialSync","ts":`) + expected3 := []byte(`{"bandwidth_download":2,"bandwidth_upload":3,"msg":"system.interval","peers":1,"ts":`) + expected4 := []byte(`{"best":"0x07b749b6e20fd5f1159153a2e790235018621dd06072a62bcd25e8576f6ff5e6","finalized_hash":"0x687197c11b4cf95374159843e7f46fbcd63558db981aaef01a8bac2a44a1d6b2","finalized_height":32256,"height":32375,"msg":"system.interval","ts":`) // nolint - expected := [][]byte{expected3, expected1, expected4, expected2} + expected := [][]byte{expected1, expected3, expected4, expected2} var actual [][]byte for data := range resultCh { @@ -121,11 +112,9 @@ func TestListenerConcurrency(t *testing.T) { resultCh = make(chan []byte) for i := 0; i < qty; i++ { go func() { - GetInstance().SendMessage(NewTelemetryMessage( - NewKeyValue("best", "hash"), - NewKeyValue("height", big.NewInt(2)), - NewKeyValue("msg", "block.import"), - NewKeyValue("origin", "NetworkInitialSync"))) + bestHash := common.Hash{} + GetInstance().SendMessage(NewBlockImportTM(&bestHash, big.NewInt(2), "NetworkInitialSync")) + wg.Done() }() } @@ -139,6 +128,16 @@ func TestListenerConcurrency(t *testing.T) { } } +// TestInfiniteListener starts loop that print out data received on websocket ws://localhost:8001/ +// this can be useful to see what data is sent to telemetry server +func TestInfiniteListener(t *testing.T) { + t.Skip() + resultCh = make(chan []byte) + for data := range resultCh { + fmt.Printf("Data %s\n", data) + } +} + func listen(w http.ResponseWriter, r *http.Request) { c, err := upgrader.Upgrade(w, r, nil) if err != nil {