Skip to content

Commit

Permalink
Rename WSClient files and Capitalize Socket
Browse files Browse the repository at this point in the history
  • Loading branch information
j16r committed Mar 25, 2019
1 parent cbbf076 commit ceb22ee
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 33 deletions.
2 changes: 1 addition & 1 deletion integration/features_test.go
Expand Up @@ -618,7 +618,7 @@ func TestIntegration_CreateServiceAgreement(t *testing.T) {
}

func TestIntegration_SyncJobRuns(t *testing.T) {
wsserver, wsserverCleanup := cltest.NewEventWebsocketServer(t)
wsserver, wsserverCleanup := cltest.NewEventWebSocketServer(t)
defer wsserverCleanup()

t.Parallel()
Expand Down
20 changes: 10 additions & 10 deletions internal/cltest/event_websocket_server.go
Expand Up @@ -10,7 +10,7 @@ import (
"github.com/gorilla/websocket"
)

type EventWebsocketServer struct {
type EventWebSocketServer struct {
*httptest.Server
mutex *sync.RWMutex // shared mutex for safe access to arrays/maps.
t *testing.T
Expand All @@ -20,8 +20,8 @@ type EventWebsocketServer struct {
URL *url.URL
}

func NewEventWebsocketServer(t *testing.T) (*EventWebsocketServer, func()) {
server := &EventWebsocketServer{
func NewEventWebSocketServer(t *testing.T) (*EventWebSocketServer, func()) {
server := &EventWebSocketServer{
mutex: &sync.RWMutex{},
t: t,
Connected: make(chan struct{}, 1), // have buffer of one for easier assertions after the event
Expand All @@ -31,7 +31,7 @@ func NewEventWebsocketServer(t *testing.T) (*EventWebsocketServer, func()) {
server.Server = httptest.NewServer(http.HandlerFunc(server.handler))
u, err := url.Parse(server.Server.URL)
if err != nil {
t.Fatal("EventWebsocketServer: ", err)
t.Fatal("EventWebSocketServer: ", err)
}
u.Scheme = "ws"
server.URL = u
Expand All @@ -44,11 +44,11 @@ var upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool { return true },
}

func (wss *EventWebsocketServer) handler(w http.ResponseWriter, r *http.Request) {
func (wss *EventWebSocketServer) handler(w http.ResponseWriter, r *http.Request) {
var err error
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
wss.t.Fatal("EventWebsocketServer Upgrade: ", err)
wss.t.Fatal("EventWebSocketServer Upgrade: ", err)
}

wss.addConnection(conn)
Expand All @@ -60,7 +60,7 @@ func (wss *EventWebsocketServer) handler(w http.ResponseWriter, r *http.Request)
return
}
if err != nil {
wss.t.Fatal("EventWebsocketServer ReadMessage: ", err)
wss.t.Fatal("EventWebSocketServer ReadMessage: ", err)
}

select {
Expand All @@ -70,7 +70,7 @@ func (wss *EventWebsocketServer) handler(w http.ResponseWriter, r *http.Request)
}
}

func (wss *EventWebsocketServer) addConnection(conn *websocket.Conn) {
func (wss *EventWebSocketServer) addConnection(conn *websocket.Conn) {
wss.mutex.Lock()
wss.connections = append(wss.connections, conn)
wss.mutex.Unlock()
Expand All @@ -80,7 +80,7 @@ func (wss *EventWebsocketServer) addConnection(conn *websocket.Conn) {
}
}

func (wss *EventWebsocketServer) removeConnection(conn *websocket.Conn) {
func (wss *EventWebSocketServer) removeConnection(conn *websocket.Conn) {
newc := []*websocket.Conn{}
wss.mutex.Lock()
for _, connection := range wss.connections {
Expand All @@ -96,7 +96,7 @@ func (wss *EventWebsocketServer) removeConnection(conn *websocket.Conn) {
// Useful to emulate that the websocket server is shutting down without
// actually shutting down.
// This overcomes httptest.Server's inability to restart on the same URL:port.
func (wss *EventWebsocketServer) WriteCloseMessage() {
func (wss *EventWebSocketServer) WriteCloseMessage() {
wss.mutex.RLock()
for _, connection := range wss.connections {
err := connection.WriteMessage(
Expand Down
10 changes: 5 additions & 5 deletions store/event_queuer.go
Expand Up @@ -10,20 +10,20 @@ import (
"github.com/smartcontractkit/chainlink/store/orm"
)

// StatsPusher polls for events and pushes them via a WebsocketClient
// StatsPusher polls for events and pushes them via a WebSocketClient
type StatsPusher struct {
ORM *orm.ORM
WSClient WebsocketClient
WSClient WebSocketClient
cancel context.CancelFunc
Period time.Duration
}

// NewStatsPusher returns a new event queuer
func NewStatsPusher(orm *orm.ORM, url *url.URL) *StatsPusher {
var wsClient WebsocketClient
wsClient = noopWebsocketClient{}
var wsClient WebSocketClient
wsClient = noopWebSocketClient{}
if url != nil {
wsClient = NewWebsocketClient(url)
wsClient = NewWebSocketClient(url)
}
return &StatsPusher{
ORM: orm,
Expand Down
16 changes: 8 additions & 8 deletions store/stats_pusher.go → store/web_socket_client.go
Expand Up @@ -12,19 +12,19 @@ import (
"github.com/smartcontractkit/chainlink/utils"
)

// WebsocketClient encapsulates all the functionality needed to
// WebSocketClient encapsulates all the functionality needed to
// push run information to linkstats.
type WebsocketClient interface {
type WebSocketClient interface {
Start() error
Close() error
Send([]byte)
}

type noopWebsocketClient struct{}
type noopWebSocketClient struct{}

func (noopWebsocketClient) Start() error { return nil }
func (noopWebsocketClient) Close() error { return nil }
func (noopWebsocketClient) Send([]byte) {}
func (noopWebSocketClient) Start() error { return nil }
func (noopWebSocketClient) Close() error { return nil }
func (noopWebSocketClient) Send([]byte) {}

type websocketClient struct {
boot *sync.Mutex
Expand All @@ -36,9 +36,9 @@ type websocketClient struct {
url *url.URL
}

// NewWebsocketClient returns a stats pusher using a websocket for
// NewWebSocketClient returns a stats pusher using a websocket for
// delivery.
func NewWebsocketClient(url *url.URL) WebsocketClient {
func NewWebSocketClient(url *url.URL) WebSocketClient {
return &websocketClient{
url: url,
send: make(chan []byte, 100), // TODO: figure out a better buffer (circular FIFO?)
Expand Down
18 changes: 9 additions & 9 deletions store/stats_pusher_test.go → store/web_socket_client_test.go
Expand Up @@ -9,11 +9,11 @@ import (
"github.com/stretchr/testify/require"
)

func TestWebsocketStatsPusher_StartCloseStart(t *testing.T) {
wsserver, cleanup := cltest.NewEventWebsocketServer(t)
func TestWebSocketStatsPusher_StartCloseStart(t *testing.T) {
wsserver, cleanup := cltest.NewEventWebSocketServer(t)
defer cleanup()

pusher := store.NewWebsocketClient(wsserver.URL)
pusher := store.NewWebSocketClient(wsserver.URL)
require.NoError(t, pusher.Start())
cltest.CallbackOrTimeout(t, "stats pusher connects", func() {
<-wsserver.Connected
Expand All @@ -28,11 +28,11 @@ func TestWebsocketStatsPusher_StartCloseStart(t *testing.T) {
require.NoError(t, pusher.Close())
}

func TestWebsocketStatsPusher_ReconnectLoop(t *testing.T) {
wsserver, cleanup := cltest.NewEventWebsocketServer(t)
func TestWebSocketStatsPusher_ReconnectLoop(t *testing.T) {
wsserver, cleanup := cltest.NewEventWebSocketServer(t)
defer cleanup()

pusher := store.NewWebsocketClient(wsserver.URL)
pusher := store.NewWebSocketClient(wsserver.URL)
require.NoError(t, pusher.Start())
cltest.CallbackOrTimeout(t, "stats pusher connects", func() {
<-wsserver.Connected
Expand All @@ -46,11 +46,11 @@ func TestWebsocketStatsPusher_ReconnectLoop(t *testing.T) {
require.NoError(t, pusher.Close())
}

func TestWebsocketStatsPusher_Send(t *testing.T) {
wsserver, cleanup := cltest.NewEventWebsocketServer(t)
func TestWebSocketStatsPusher_Send(t *testing.T) {
wsserver, cleanup := cltest.NewEventWebSocketServer(t)
defer cleanup()

pusher := store.NewWebsocketClient(wsserver.URL)
pusher := store.NewWebSocketClient(wsserver.URL)
require.NoError(t, pusher.Start())
defer pusher.Close()

Expand Down

0 comments on commit ceb22ee

Please sign in to comment.