Skip to content

Commit

Permalink
fix(dot/telemetry): refactor telemetry to reduce CPU usage (ChainSafe…
Browse files Browse the repository at this point in the history
  • Loading branch information
edwardmack authored and timwu20 committed Dec 6, 2021
1 parent a00f881 commit 3445d84
Show file tree
Hide file tree
Showing 5 changed files with 181 additions and 160 deletions.
29 changes: 19 additions & 10 deletions dot/network/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,9 +313,15 @@ main:

case <-ticker.C:
o := s.host.bwc.GetBandwidthTotals()
telemetry.GetInstance().SendNetworkData(telemetry.NewNetworkData(s.host.peerCount(), o.RateIn, o.RateOut))
err := telemetry.GetInstance().SendMessage(telemetry.NewTelemetryMessage(
telemetry.NewKeyValue("bandwidth_download", o.RateIn),
telemetry.NewKeyValue("bandwidth_upload", o.RateOut),
telemetry.NewKeyValue("msg", "system.interval"),
telemetry.NewKeyValue("peers", s.host.peerCount())))
if err != nil {
logger.Debug("problem sending system.interval telemetry message", "error", err)
}
}

}
}

Expand All @@ -330,14 +336,17 @@ func (s *Service) sentBlockIntervalTelemetry() {
continue
}

telemetry.GetInstance().SendBlockIntervalData(&telemetry.BlockIntervalData{
BestHash: best.Hash(),
BestHeight: best.Number,
FinalizedHash: finalized.Hash(),
FinalizedHeight: finalized.Number,
TXCount: 0, // todo (ed) determine where to get tx count
UsedStateCacheSize: 0, // todo (ed) determine where to get used_state_cache_size
})
err = telemetry.GetInstance().SendMessage(telemetry.NewTelemetryMessage(
telemetry.NewKeyValue("best", best.Hash().String()),
telemetry.NewKeyValue("finalized_hash", finalized.Hash().String()), //nolint
telemetry.NewKeyValue("finalized_height", finalized.Number), //nolint
telemetry.NewKeyValue("height", best.Number),
telemetry.NewKeyValue("msg", "system.interval"),
telemetry.NewKeyValue("txcount", 0), // todo (ed) determine where to get tx count
telemetry.NewKeyValue("used_state_cache_size", 0))) // todo (ed) determine where to get used_state_cache_size
if err != nil {
logger.Debug("problem sending system.interval telemetry message", "error", err)
}
time.Sleep(s.telemetryInterval)
}
}
Expand Down
24 changes: 13 additions & 11 deletions dot/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,18 +350,20 @@ func NewNode(cfg *Config, ks *keystore.GlobalKeystore, stopFunc func()) (*Node,
}

telemetry.GetInstance().AddConnections(gd.TelemetryEndpoints)
data := &telemetry.ConnectionData{
Authority: cfg.Core.GrandpaAuthority,
Chain: sysSrvc.ChainName(),
GenesisHash: stateSrvc.Block.GenesisHash().String(),
SystemName: sysSrvc.SystemName(),
NodeName: cfg.Global.Name,
SystemVersion: sysSrvc.SystemVersion(),
NetworkID: networkSrvc.NetworkState().PeerID,
StartTime: strconv.FormatInt(time.Now().UnixNano(), 10),
}
telemetry.GetInstance().SendConnection(data)

err = telemetry.GetInstance().SendMessage(telemetry.NewTelemetryMessage(
telemetry.NewKeyValue("authority", cfg.Core.GrandpaAuthority),
telemetry.NewKeyValue("chain", sysSrvc.ChainName()),
telemetry.NewKeyValue("genesis_hash", stateSrvc.Block.GenesisHash().String()),
telemetry.NewKeyValue("implementation", sysSrvc.SystemName()),
telemetry.NewKeyValue("msg", "system.connected"),
telemetry.NewKeyValue("name", cfg.Global.Name),
telemetry.NewKeyValue("network_id", networkSrvc.NetworkState().PeerID),
telemetry.NewKeyValue("startup_time", strconv.FormatInt(time.Now().UnixNano(), 10)),
telemetry.NewKeyValue("version", sysSrvc.SystemVersion())))
if err != nil {
logger.Debug("problem sending system.connected telemetry message", "err", err)
}
return node, nil
}

Expand Down
9 changes: 8 additions & 1 deletion dot/sync/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,14 @@ func (s *Service) handleBlock(block *types.Block) error {
}
} else {
logger.Debug("🔗 imported block", "number", block.Header.Number, "hash", block.Header.Hash())
telemetry.GetInstance().SendBlockImport(block.Header.Hash().String(), block.Header.Number)
err := telemetry.GetInstance().SendMessage(telemetry.NewTelemetryMessage(
telemetry.NewKeyValue("best", block.Header.Hash().String()),
telemetry.NewKeyValue("height", block.Header.Number.Uint64()),
telemetry.NewKeyValue("msg", "block.import"),
telemetry.NewKeyValue("origin", "NetworkInitialSync")))
if err != nil {
logger.Debug("problem sending block.import telemetry message", "error", err)
}
}

// handle consensus digest for authority changes
Expand Down
200 changes: 86 additions & 114 deletions dot/telemetry/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,38 +17,38 @@
package telemetry

import (
"bytes"
"encoding/json"
"fmt"
"math/big"
"errors"
"sync"
"time"

"github.com/ChainSafe/gossamer/lib/common"
"github.com/ChainSafe/gossamer/lib/genesis"
log "github.com/ChainSafe/log15"
"github.com/gorilla/websocket"
log "github.com/sirupsen/logrus"
)

// Handler struct for holding telemetry related things
type Handler struct {
buf bytes.Buffer
wsConn []*websocket.Conn
sync.RWMutex
// telemetryConnection pairs the websocket connection of one telemetry
// endpoint with the verbosity configured for that endpoint.
type telemetryConnection struct {
// wsconn is the websocket connection that telemetry messages are written to.
wsconn *websocket.Conn
// verbosity is copied from the endpoint definition passed to AddConnections.
verbosity int
// Mutex is held by startListening while it writes to wsconn.
sync.Mutex
}

// MyJSONFormatter struct for defining JSON Formatter
type MyJSONFormatter struct {
// Message struct to hold telemetry message data. Instances are built with
// NewTelemetryMessage.
type Message struct {
// values maps each telemetry field name to its value; it becomes the
// payload of the encoded message.
values map[string]interface{}
}

// Format function for handling JSON formatting, this overrides default logging formatter to remove
// log level, line number and timestamp
func (f *MyJSONFormatter) Format(entry *log.Entry) ([]byte, error) {
serialised, err := json.Marshal(entry.Data)
if err != nil {
return nil, fmt.Errorf("failed to marshal fields to JSON, %w", err)
}
return append(serialised, '\n'), nil
// Handler struct for holding telemetry related things
type Handler struct {
// msg queues messages handed to SendMessage until startListening picks
// them up.
msg chan Message
// connections holds the telemetry endpoints added through AddConnections.
connections []*telemetryConnection
// log reports problems dialling endpoints or writing messages.
log log.Logger
}

// KeyValue object to hold key value pairs used in telemetry messages.
// Instances are built with NewKeyValue and consumed by NewTelemetryMessage.
type KeyValue struct {
// key is the field name under which value appears in the message.
key string
// value is the field's content; it may be of any type.
value interface{}
}

var (
Expand All @@ -57,126 +57,98 @@ var (
)

// GetInstance returns the singleton telemetry Handler, creating it on first use
func GetInstance() *Handler {
func GetInstance() *Handler { //nolint
if handlerInstance == nil {
once.Do(
func() {
handlerInstance = &Handler{
buf: bytes.Buffer{},
msg: make(chan Message, 256),
log: log.New("pkg", "telemetry"),
}
log.SetOutput(&handlerInstance.buf)
log.SetFormatter(new(MyJSONFormatter))
go handlerInstance.sender()
go handlerInstance.startListening()
})
}
return handlerInstance
}

// AddConnections adds connections to telemetry server
// NewTelemetryMessage builds a telemetry message from the given key/value
// pairs. If a key appears more than once the last value wins; nil entries are
// ignored.
func NewTelemetryMessage(values ...*KeyValue) *Message { //nolint
	// The number of pairs is known up front, so size the map to avoid regrowth.
	mvals := make(map[string]interface{}, len(values))
	for _, v := range values {
		if v == nil {
			// Skip rather than panic on a nil pointer dereference.
			continue
		}
		mvals[v.key] = v.value
	}
	return &Message{
		values: mvals,
	}
}

// NewKeyValue returns a pointer to a KeyValue holding the given key and value,
// ready to be passed to NewTelemetryMessage.
func NewKeyValue(key string, value interface{}) *KeyValue { //nolint
	pair := new(KeyValue)
	pair.key = key
	pair.value = value
	return pair
}

// AddConnections adds the given telemetry endpoints as listeners that will receive telemetry data
func (h *Handler) AddConnections(conns []*genesis.TelemetryEndpoint) {
for _, v := range conns {
c, _, err := websocket.DefaultDialer.Dial(v.Endpoint, nil)
if err != nil {
fmt.Printf("Error %v\n", err)
// todo (ed) try reconnecting if there is an error connecting
h.log.Debug("issue adding telemetry connection", "error", err)
continue
}
h.wsConn = append(h.wsConn, c)
tConn := &telemetryConnection{
wsconn: c,
verbosity: v.Verbosity,
}
h.connections = append(h.connections, tConn)
}
}

// ConnectionData struct to hold connection data
type ConnectionData struct {
Authority bool
Chain string
GenesisHash string
SystemName string
NodeName string
SystemVersion string
NetworkID string
StartTime string
}

// SendConnection sends connection request message to telemetry connection
func (h *Handler) SendConnection(data *ConnectionData) {
h.Lock()
defer h.Unlock()
payload := log.Fields{"authority": data.Authority, "chain": data.Chain, "config": "", "genesis_hash": data.GenesisHash,
"implementation": data.SystemName, "msg": "system.connected", "name": data.NodeName, "network_id": data.NetworkID, "startup_time": data.StartTime,
"version": data.SystemVersion}
telemetryLogger := log.WithFields(log.Fields{"id": 1, "payload": payload, "ts": time.Now()})
telemetryLogger.Print()
}

// SendBlockImport sends block imported message to telemetry connection
func (h *Handler) SendBlockImport(bestHash string, height *big.Int) {
h.Lock()
defer h.Unlock()
payload := log.Fields{"best": bestHash, "height": height.Int64(), "msg": "block.import", "origin": "NetworkInitialSync"}
telemetryLogger := log.WithFields(log.Fields{"id": 1, "payload": payload, "ts": time.Now()})
telemetryLogger.Print()
}
// SendMessage sends Message to connected telemetry listeners
func (h *Handler) SendMessage(msg *Message) error {
select {
case h.msg <- *msg:

// NetworkData struct to hold network data telemetry information
type NetworkData struct {
peers int
rateIn float64
rateOut float64
}

// NewNetworkData creates networkData struct
func NewNetworkData(peers int, rateIn, rateOut float64) *NetworkData {
return &NetworkData{
peers: peers,
rateIn: rateIn,
rateOut: rateOut,
case <-time.After(time.Second * 1):
return errors.New("timeout sending message")
}
return nil
}

// SendNetworkData send network data system.interval message to telemetry connection
func (h *Handler) SendNetworkData(data *NetworkData) {
h.Lock()
defer h.Unlock()
payload := log.Fields{"bandwidth_download": data.rateIn, "bandwidth_upload": data.rateOut, "msg": "system.interval", "peers": data.peers}
telemetryLogger := log.WithFields(log.Fields{"id": 1, "payload": payload, "ts": time.Now()})
telemetryLogger.Print()
}

// BlockIntervalData struct to hold data for block system.interval message
type BlockIntervalData struct {
BestHash common.Hash
BestHeight *big.Int
FinalizedHash common.Hash
FinalizedHeight *big.Int
TXCount int
UsedStateCacheSize int
// startListening consumes the messages queued by SendMessage and forwards
// each one to every registered telemetry connection. It returns once h.msg is
// closed.
func (h *Handler) startListening() {
	for msg := range h.msg {
		// Deliver asynchronously so that a slow endpoint does not stall the
		// queue and cause SendMessage to time out.
		go func(msg Message) {
			// Encode once per message rather than once per connection, so all
			// endpoints receive an identical payload and timestamp.
			payload := msgToBytes(msg)
			if payload == nil {
				// msgToBytes returns nil when JSON encoding fails; do not
				// push an empty frame to the endpoints.
				h.log.Warn("issue while encoding telemetry message")
				return
			}
			for _, conn := range h.connections {
				// The per-connection lock serialises writes coming from the
				// goroutines of different messages.
				conn.Lock()
				err := conn.wsconn.WriteMessage(websocket.TextMessage, payload)
				conn.Unlock()
				if err != nil {
					h.log.Warn("issue while sending telemetry message", "error", err)
				}
			}
		}(msg)
	}
}

// SendBlockIntervalData send block data system interval information to telemetry connection
func (h *Handler) SendBlockIntervalData(data *BlockIntervalData) {
h.Lock()
defer h.Unlock()
payload := log.Fields{"best": data.BestHash.String(), "finalized_hash": data.FinalizedHash.String(), // nolint
"finalized_height": data.FinalizedHeight, "height": data.BestHeight, "msg": "system.interval", "txcount": data.TXCount, // nolint
"used_state_cache_size": data.UsedStateCacheSize}
telemetryLogger := log.WithFields(log.Fields{"id": 1, "payload": payload, "ts": time.Now()})
telemetryLogger.Print()
// response is the JSON envelope written to telemetry endpoints; msgToBytes
// fills it from a Message.
type response struct {
// ID is currently always set to 1 by msgToBytes.
ID int `json:"id"`
// Payload carries the message's key/value pairs.
Payload map[string]interface{} `json:"payload"`
// Timestamp is the time at which the message was encoded.
Timestamp time.Time `json:"ts"`
}

func (h *Handler) sender() {
for {
h.RLock()
line, err := h.buf.ReadBytes(byte(10)) // byte 10 is newline character, used as delimiter
h.RUnlock()
if err != nil {
continue
}

for _, c := range h.wsConn {
err := c.WriteMessage(websocket.TextMessage, line)
if err != nil {
// TODO (ed) determine how to handle this error
fmt.Printf("ERROR connecting to telemetry %v\n", err)
}
}
// msgToBytes wraps the values of message in the telemetry response envelope,
// stamps it with the current time and encodes it as JSON. It returns nil when
// encoding fails.
func msgToBytes(message Message) []byte {
	encoded, err := json.Marshal(response{
		ID:        1, // todo (ed) determine how this is used
		Payload:   message.values,
		Timestamp: time.Now(),
	})
	if err != nil {
		return nil
	}
	return encoded
}
Loading

0 comments on commit 3445d84

Please sign in to comment.