Skip to content

Commit

Permalink
http: added to debug info to WebSocket pools
Browse files Browse the repository at this point in the history
Co-Authored-By: eranra@il.ibm.com
  • Loading branch information
hunchback committed Jan 9, 2018
1 parent b83408e commit f807b80
Show file tree
Hide file tree
Showing 8 changed files with 30 additions and 13 deletions.
2 changes: 1 addition & 1 deletion agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ type Agent struct {
// NewAnalyzerWSJSONClientPool creates a new http WebSocket client Pool
// with authentification
func NewAnalyzerWSJSONClientPool(authOptions *shttp.AuthenticationOpts) (*shttp.WSJSONClientPool, error) {
pool := shttp.NewWSJSONClientPool()
pool := shttp.NewWSJSONClientPool("AnalyzerClientPool")

addresses, err := config.GetAnalyzerServiceAddresses()
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion analyzer/topology_replication_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ func NewTopologyReplicationEndpoint(pool shttp.WSJSONSpeakerPool, auth *shttp.Au
Graph: g,
cached: cached,
in: pool,
out: shttp.NewWSJSONClientPool(),
out: shttp.NewWSJSONClientPool("TopologyReplicationEndpoint"),
conns: make(map[string]shttp.WSSpeaker),
}
t.replicateMsg.Store(true)
Expand Down
24 changes: 19 additions & 5 deletions http/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@ package http

import (
"math/rand"
"reflect"
"sync"
"sync/atomic"
"time"

"github.com/skydive-project/skydive/common"
"github.com/skydive-project/skydive/config"
"github.com/skydive-project/skydive/logging"
)

// WSSpeakerPool is the interface that WSSpeaker pools have to implement.
Expand All @@ -47,6 +49,7 @@ type WSSpeakerPool interface {
// WSPool is a connection container. It embed a list of WSSpeaker.
type WSPool struct {
sync.RWMutex
name string
quit chan bool
broadcast chan WSMessage
bulkMaxMsgs int
Expand Down Expand Up @@ -86,6 +89,7 @@ func (s *WSPool) OnConnected(c WSSpeaker) {

// OnDisconnected forwards the OnConnected event to event listeners of the pool.
func (s *WSPool) OnDisconnected(c WSSpeaker) {
logging.GetLogger().Debugf("OnDisconnected %s for pool %s ", c.GetHost(), s.GetName())
s.eventHandlersLock.RLock()
for _, h := range s.eventHandlers {
h.OnDisconnected(c)
Expand All @@ -102,6 +106,7 @@ func (s *wsIncomerPool) OnDisconnected(c WSSpeaker) {

// AddClient adds the given WSSpeaker to the pool.
func (s *WSPool) AddClient(c WSSpeaker) error {
logging.GetLogger().Debugf("AddClient %s for pool %s", c.GetHost(), s.GetName())
s.Lock()
s.speakers = append(s.speakers, c)
s.Unlock()
Expand All @@ -114,6 +119,7 @@ func (s *WSPool) AddClient(c WSSpeaker) error {

// AddClient adds the given WSSpeaker to the wsIncomerPool.
func (s *wsIncomerPool) AddClient(c WSSpeaker) error {
logging.GetLogger().Debugf("AddClient %s for pool %s", c.GetHost(), s.GetName())
s.Lock()
s.speakers = append(s.speakers, c)
s.Unlock()
Expand All @@ -138,10 +144,12 @@ func (s *WSPool) RemoveClient(c WSSpeaker) {
defer s.Unlock()
for i, ic := range s.speakers {
if ic.GetHost() == c.GetHost() {
logging.GetLogger().Debugf("Successfully removed client %s for pool %s", c.GetHost(), s.GetName())
s.speakers = append(s.speakers[:i], s.speakers[i+1:]...)
return
}
}
logging.GetLogger().Debugf("Failed to remove client %s for pool %s", c.GetHost(), s.GetName())
}

// GetStatus returns the states of the WebSocket clients
Expand All @@ -153,6 +161,11 @@ func (s *WSPool) GetStatus() map[string]WSConnStatus {
return clients
}

// GetName returns the name of the pool
func (s *WSPool) GetName() string {
return s.name + " type : [" + (reflect.TypeOf(s).String()) + "]"
}

// GetSpeakers returns the WSSpeakers of the pool.
func (s *WSPool) GetSpeakers() (speakers []WSSpeaker) {
s.RLock()
Expand Down Expand Up @@ -333,28 +346,29 @@ func (s *WSClientPool) ConnectAll() {
s.RUnlock()
}

func newWSPool() *WSPool {
func newWSPool(name string) *WSPool {
bulkMaxMsgs := config.GetConfig().GetInt("ws_bulk_maxmsgs")
bulkMaxDelay := config.GetConfig().GetInt("ws_bulk_maxdelay")

return &WSPool{
name: name,
broadcast: make(chan WSMessage, 100000),
quit: make(chan bool, 2),
bulkMaxMsgs: bulkMaxMsgs,
bulkMaxDelay: time.Duration(bulkMaxDelay) * time.Second,
}
}

func newWSIncomerPool() *wsIncomerPool {
func newWSIncomerPool(name string) *wsIncomerPool {
return &wsIncomerPool{
WSPool: newWSPool(),
WSPool: newWSPool(name),
}
}

// NewWSClientPool returns a new WSClientPool meaning a pool of outgoing WSClient.
func NewWSClientPool() *WSClientPool {
func NewWSClientPool(name string) *WSClientPool {
s := &WSClientPool{
WSPool: newWSPool(),
WSPool: newWSPool(name),
}

s.Start()
Expand Down
4 changes: 2 additions & 2 deletions http/wsjson.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,8 +344,8 @@ func (s *WSJSONClientPool) Request(host string, request *WSJSONMessage, timeout
}

// NewWSJSONClientPool returns a new WSJSONClientPool.
func NewWSJSONClientPool() *WSJSONClientPool {
pool := NewWSClientPool()
func NewWSJSONClientPool(name string) *WSJSONClientPool {
pool := NewWSClientPool(name)
return &WSJSONClientPool{
WSClientPool: pool,
wsJSONSpeakerPoolEventDispatcher: newWSJSONSpeakerPoolEventDispatcher(pool),
Expand Down
4 changes: 3 additions & 1 deletion http/wsmessage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (

"github.com/skydive-project/skydive/common"
"github.com/skydive-project/skydive/config"
"github.com/skydive-project/skydive/logging"
)

type fakeWSMessageServerSubscriptionHandler struct {
Expand Down Expand Up @@ -86,6 +87,7 @@ func (f *fakeWSMessageClientSubscriptionHandler) OnWSJSONMessage(c WSSpeaker, m
}

func TestWSMessageSubscription(t *testing.T) {
logging.InitLogging()
httpserver := NewServer("myhost", common.AnalyzerService, "localhost", 59999, NewNoAuthenticationBackend(), "")

go httpserver.ListenAndServe()
Expand All @@ -102,7 +104,7 @@ func TestWSMessageSubscription(t *testing.T) {

wsclient := NewWSClient("myhost", common.AgentService, config.GetURL("ws", "localhost", 59999, "/wstest"), nil, http.Header{}, 1000)

wspool := NewWSJSONClientPool()
wspool := NewWSJSONClientPool("TestWSMessageSubscription")
wspool.AddClient(wsclient)

clientHandler := &fakeWSMessageClientSubscriptionHandler{t: t, received: make(map[string]bool)}
Expand Down
3 changes: 2 additions & 1 deletion http/wsserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ func (s *WSServer) serveMessages(w http.ResponseWriter, r *auth.AuthenticatedReq
if host == "" {
host = r.RemoteAddr
}
logging.GetLogger().Debugf("Serving messages for client %s for pool %s", host, s.GetName())

s.wsIncomerPool.RLock()
c := s.GetSpeakerByHost(host)
Expand Down Expand Up @@ -95,7 +96,7 @@ func (s *WSServer) serveMessages(w http.ResponseWriter, r *auth.AuthenticatedReq
// NewWSServer returns a new WSServer.
func NewWSServer(server *Server, endpoint string) *WSServer {
s := &WSServer{
wsIncomerPool: newWSIncomerPool(), // server inherites from a WSSpeaker pool
wsIncomerPool: newWSIncomerPool(endpoint), // server inherites from a WSSpeaker pool
incomerHandler: func(c *websocket.Conn, a *auth.AuthenticatedRequest) WSSpeaker {
return defaultIncomerHandler(c, a)
},
Expand Down
2 changes: 1 addition & 1 deletion http/wsserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func TestSubscription(t *testing.T) {
defer wsserver.Stop()

wsclient := NewWSClient("myhost", common.AgentService, config.GetURL("ws", "localhost", 59999, "/wstest"), nil, http.Header{}, 1000)
wspool := NewWSClientPool()
wspool := NewWSClientPool("TestSubscription")

wspool.AddClient(wsclient)

Expand Down
2 changes: 1 addition & 1 deletion tests/topology_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -754,7 +754,7 @@ func TestQueryMetadata(t *testing.T) {
}

hostname, _ := os.Hostname()
wspool := shttp.NewWSJSONClientPool()
wspool := shttp.NewWSJSONClientPool("TestQueryMetadata")
for _, sa := range addresses {
authClient := shttp.NewAuthenticationClient(config.GetURL("http", sa.Addr, sa.Port, ""), authOptions)
client := shttp.NewWSClient(hostname+"-cli", common.UnknownService, config.GetURL("ws", sa.Addr, sa.Port, "/ws/publisher"), authClient, http.Header{}, 1000)
Expand Down

0 comments on commit f807b80

Please sign in to comment.