Skip to content

Commit

Permalink
intro WebsocketStatsPusher to connect to linkstats
Browse files Browse the repository at this point in the history
  • Loading branch information
dimroc committed Mar 19, 2019
1 parent a59d631 commit c98c40b
Show file tree
Hide file tree
Showing 7 changed files with 314 additions and 15 deletions.
21 changes: 21 additions & 0 deletions internal/cltest/cltest.go
Expand Up @@ -1021,3 +1021,24 @@ func GetLastTxAttempt(t *testing.T, store *strpkg.Store) models.TxAttempt {
func JustError(_ interface{}, err error) error {
return err
}

func CallbackOrTimeout(t *testing.T, msg string, callback func(), durationParams ...time.Duration) {
t.Helper()

duration := 100 * time.Millisecond
if len(durationParams) > 0 {
duration = durationParams[0]
}

done := make(chan struct{})
go func() {
callback()
close(done)
}()

select {
case <-done:
case <-time.After(duration):
t.Fatal(fmt.Sprintf("CallbackOrTimeout: %s timed out", msg))
}
}
74 changes: 74 additions & 0 deletions internal/cltest/mocks.go
Expand Up @@ -9,6 +9,7 @@ import (
"log"
"net/http"
"net/http/httptest"
"net/url"
"reflect"
"strings"
"sync"
Expand All @@ -17,6 +18,7 @@ import (
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/gorilla/websocket"
"github.com/onsi/gomega"
"github.com/smartcontractkit/chainlink/cmd"
"github.com/smartcontractkit/chainlink/logger"
Expand Down Expand Up @@ -713,3 +715,75 @@ type MockPasswordPrompter struct {
func (m MockPasswordPrompter) Prompt() string {
return m.Password
}

type CountingWebsocketServer struct {
*httptest.Server
entries []string
m *sync.RWMutex
t *testing.T
Connected chan struct{}
Received chan string
URL *url.URL
}

func NewCountingWebsocketServer(t *testing.T) (*CountingWebsocketServer, func()) {
server := &CountingWebsocketServer{
entries: []string{},
m: &sync.RWMutex{},
t: t,
Connected: make(chan struct{}, 1), // have buffer of one for easier assertions after the event
Received: make(chan string, 1),
}

server.Server = httptest.NewServer(http.HandlerFunc(server.handler))
u, err := url.Parse(server.Server.URL)
if err != nil {
t.Fatal(err)
}
u.Scheme = "ws"
server.URL = u
return server, func() {
server.Close()
}
}

var upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool { return true },
}

func (c *CountingWebsocketServer) handler(w http.ResponseWriter, r *http.Request) {
select {
case c.Connected <- struct{}{}:
default:
}

var err error
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
c.t.Fatal(err)
}
for {
_, payload, err := conn.ReadMessage() // we only read
if err != nil && !websocket.IsCloseError(err) {
c.t.Fatal(err)
}

strp := string(payload)
c.m.Lock()
c.entries = append(c.entries)
c.m.Unlock()

select {
case c.Received <- strp:
default:
}
}
}

func (c *CountingWebsocketServer) Entries() []string {
c.m.RLock()
defer c.m.RUnlock()
tmp := make([]string, len(c.entries))
copy(tmp, c.entries)
return tmp
}
11 changes: 9 additions & 2 deletions logger/logger.go
Expand Up @@ -155,14 +155,21 @@ func Error(args ...interface{}) {
logger.Error(args...)
}

//WarnIf logs the error if present.
// WarnIf logs the error if present.
func WarnIf(err error) {
if err != nil {
logger.Warn(err)
}
}

//PanicIf logs the error if present.
// ErrorIf logs the error if present.
func ErrorIf(err error) {
if err != nil {
logger.Error(err)
}
}

// PanicIf logs the error if present.
func PanicIf(err error) {
if err != nil {
logger.Panic(err)
Expand Down
18 changes: 18 additions & 0 deletions store/config.go
Expand Up @@ -53,6 +53,7 @@ type ConfigSchema struct {
EthereumURL string `env:"ETH_URL" default:"ws://localhost:8546"`
JSONConsole bool `env:"JSON_CONSOLE" default:"false"`
LinkContractAddress string `env:"LINK_CONTRACT_ADDRESS" default:"0x514910771AF9Ca656af840dff83E8264EcF986CA"`
LinkstatsURL *url.URL `env:"LINKSTATS_URL"`
LogLevel LogLevel `env:"LOG_LEVEL" default:"info"`
LogToDisk bool `env:"LOG_TO_DISK" default:"true"`
MinIncomingConfirmations uint64 `env:"MIN_INCOMING_CONFIRMATIONS" default:"0"`
Expand Down Expand Up @@ -202,6 +203,20 @@ func (c Config) LinkContractAddress() string {
return c.viper.GetString(c.envVarName("LinkContractAddress"))
}

// LinkstatsURL returns the websocket URL for this node to push stats to, or nil.
func (c Config) LinkstatsURL() *url.URL {
rval := c.getWithFallback("LinkstatsURL", parseURL)
switch t := rval.(type) {
case nil:
return nil
case *url.URL:
return t
default:
logger.Panicf("invariant: LinkstatsURL returned as type %T", rval)
return nil
}
}

// OracleContractAddress represents the deployed Oracle contract's address.
func (c Config) OracleContractAddress() *common.Address {
if c.viper.GetString(c.envVarName("OracleContractAddress")) == "" {
Expand Down Expand Up @@ -359,6 +374,9 @@ func (c Config) defaultValue(name string) (string, bool) {
func (c Config) zeroValue(name string) interface{} {
schemaT := reflect.TypeOf(ConfigSchema{})
if item, ok := schemaT.FieldByName(name); ok {
if item.Type.Kind() == reflect.Ptr {
return nil
}
return reflect.New(item.Type).Interface()
}
log.Panicf("Invariant violated, no field of name %s found for zeroValue", name)
Expand Down
150 changes: 150 additions & 0 deletions store/stats_pusher.go
@@ -0,0 +1,150 @@
package store

import (
"fmt"
"net/url"
"sync"
"time"

"github.com/gorilla/websocket"
"github.com/smartcontractkit/chainlink/logger"
)

// StatsPusher encapsulates all the functionality needed to
// push run information to linkstats.
type StatsPusher interface {
Start() error
Close() error
}

// NewStatsPusher returns a functioning instance depending on the
// URL passed: nil is a noop instance, url assumes a websocket instance.
// No support for http.
func NewStatsPusher(url *url.URL) StatsPusher {
if url != nil {
return NewWebsocketStatsPusher(url)
}
return noopStatsPusher{}
}

type noopStatsPusher struct{}

func (noopStatsPusher) Start() error { return nil }
func (noopStatsPusher) Close() error { return nil }

type websocketStatsPusher struct {
url url.URL
conn *websocket.Conn
send chan []byte
boot *sync.Mutex
started bool
}

// NewWebsocketStatsPusher returns a stats pusher using a websocket for
// delivery.
func NewWebsocketStatsPusher(url *url.URL) StatsPusher {
return &websocketStatsPusher{
url: *url,
send: make(chan []byte),
boot: &sync.Mutex{},
}
}

// Start starts a write pump over a websocket.
func (w *websocketStatsPusher) Start() error {
w.boot.Lock()
defer w.boot.Unlock()

if w.started {
return nil
}

conn, _, err := websocket.DefaultDialer.Dial(w.url.String(), nil)
if err != nil {
return fmt.Errorf("websocketStatsPusher#Start(): %v", err)
}

w.conn = conn
go w.writePump()
w.started = true
return nil
}

const (
// Time allowed to write a message to the peer.
writeWait = 10 * time.Second

// Time allowed to read the next pong message from the peer.
pongWait = 60 * time.Second

// Send pings to peer with this period. Must be less than pongWait.
pingPeriod = (pongWait * 9) / 10
)

// Inspired by https://github.com/gorilla/websocket/blob/master/examples/chat/client.go
func (w *websocketStatsPusher) writePump() {
ticker := time.NewTicker(pingPeriod)
defer func() {
ticker.Stop()
wrapLoggerErrorIf(w.conn.Close())
}()
for {
select {
case message, open := <-w.send:
if !open { // channel closed
wrapLoggerErrorIf(w.conn.WriteMessage(websocket.CloseMessage, []byte{}))
return
}

wrapLoggerErrorIf(w.conn.SetWriteDeadline(time.Now().Add(writeWait)))
writer, err := w.conn.NextWriter(websocket.TextMessage)
if err != nil {
logger.Error(err)
return
}
_, err = writer.Write(message)
wrapLoggerErrorIf(err)

// Add queued messages to the current websocket message,
// batching sending for efficiency.
n := len(w.send)
for i := 0; i < n; i++ {
additionalMsg, open := <-w.send
if !open {
break
}
_, err = writer.Write(additionalMsg)
wrapLoggerErrorIf(err)
}

if err := writer.Close(); err != nil {
logger.Error(err)
return
}
case <-ticker.C:
wrapLoggerErrorIf(w.conn.SetWriteDeadline(time.Now().Add(writeWait)))
if err := w.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
wrapLoggerErrorIf(err)
return
}
}
}
}

func wrapLoggerErrorIf(err error) {
if err != nil && !websocket.IsCloseError(err) {
logger.Error(fmt.Sprintf("websocketStatsPusher: %v", err))
}
}

func (w *websocketStatsPusher) Close() error {
w.boot.Lock()
defer w.boot.Unlock()

if w.send != nil {
close(w.send)
w.send = nil
}
w.started = false
return nil
}
21 changes: 21 additions & 0 deletions store/stats_pusher_test.go
@@ -0,0 +1,21 @@
package store_test

import (
"testing"

"github.com/smartcontractkit/chainlink/internal/cltest"
"github.com/smartcontractkit/chainlink/store"
"github.com/stretchr/testify/require"
)

func TestWebsocketStatsPusher_New(t *testing.T) {
wsserver, cleanup := cltest.NewCountingWebsocketServer(t)
defer cleanup()

pusher := store.NewWebsocketStatsPusher(wsserver.URL)
require.NoError(t, pusher.Start())
cltest.CallbackOrTimeout(t, "stats pusher connects", func() {
<-wsserver.Connected
})
require.NoError(t, pusher.Close())
}

0 comments on commit c98c40b

Please sign in to comment.