From 598d2ca03a7b22a63c0e22c56ddc3041ffeffd04 Mon Sep 17 00:00:00 2001 From: Sylvain Baubeau Date: Wed, 7 Apr 2021 10:41:16 +0200 Subject: [PATCH 1/8] websocket: use context.Context in client --- graffiti/cmd/client/topology.go | 3 ++- graffiti/websocket/client.go | 21 +++++++++++++++------ 2 files changed, 17 insertions(+), 7 deletions(-) diff --git a/graffiti/cmd/client/topology.go b/graffiti/cmd/client/topology.go index b238c86c4a..3681d3cc46 100644 --- a/graffiti/cmd/client/topology.go +++ b/graffiti/cmd/client/topology.go @@ -18,6 +18,7 @@ package client import ( + "context" "encoding/json" "errors" "fmt" @@ -74,7 +75,7 @@ var TopologyImport = &cobra.Command{ opts.Headers.Add("X-Persistence-Policy", string(endpoints.Persistent)) client := websocket.NewClient(Host, service.Type("CLI"), url, opts) - if err := client.Connect(); err != nil { + if err := client.Connect(context.Background()); err != nil { exitOnError(err) } diff --git a/graffiti/websocket/client.go b/graffiti/websocket/client.go index cf57a17106..a080ad476f 100644 --- a/graffiti/websocket/client.go +++ b/graffiti/websocket/client.go @@ -18,6 +18,7 @@ package websocket import ( + "context" "crypto/tls" "encoding/json" "errors" @@ -126,12 +127,13 @@ type Speaker interface { GetHeaders() http.Header GetURL() *url.URL IsConnected() bool - SendMessage(m Message) error + SendMessage(Message) error SendRaw(r []byte) error - Connect() error + Connect(context.Context) error Start() Stop() StopAndWait() + Run() AddEventHandler(SpeakerEventHandler) GetRemoteHost() string GetRemoteServiceType() service.Type @@ -182,6 +184,13 @@ type ClientOpts struct { Logger logging.Logger } +// NewClientOpts returns a new client option set +func NewClientOpts() ClientOpts { + return ClientOpts{ + Headers: http.Header{}, + } +} + // SpeakerEventHandler is the interface to be implement by the client events listeners. 
type SpeakerEventHandler interface { OnMessage(c Speaker, m Message) @@ -442,7 +451,7 @@ func (c *Conn) AddEventHandler(h SpeakerEventHandler) { } // Connect default implementation doing nothing as for incoming connection it is not used. -func (c *Conn) Connect() error { +func (c *Conn) Connect(context.Context) error { return nil } @@ -511,7 +520,7 @@ func (c *Client) scheme() string { } // Connect to the server -func (c *Client) Connect() error { +func (c *Client) Connect(ctx context.Context) error { var err error endpoint := c.URL.String() headers := http.Header{ @@ -539,7 +548,7 @@ func (c *Client) Connect() error { d.TLSClientConfig = c.TLSConfig var resp *http.Response - c.conn, resp, err = d.Dial(endpoint, headers) + c.conn, resp, err = d.DialContext(ctx, endpoint, headers) if err != nil { return fmt.Errorf("Unable to create a WebSocket connection %s : %s", endpoint, err) } @@ -576,7 +585,7 @@ func (c *Client) Connect() error { func (c *Client) Start() { go func() { for c.running.Load() == true { - if err := c.Connect(); err == nil { + if err := c.Connect(context.Background()); err == nil { c.Run() if c.running.Load() == true { c.wg.Wait() From 849a545477e187c3d98cd463da8625548487d593 Mon Sep 17 00:00:00 2001 From: Sylvain Baubeau Date: Wed, 7 Apr 2021 10:42:01 +0200 Subject: [PATCH 2/8] Create Subscriber type to subscribe to an analyzer using websocket --- graffiti/hub/subscriber.go | 107 +++++++++++++++++++++++++++++++++++++ graffiti/seed/seed.go | 74 ++----------------------- 2 files changed, 110 insertions(+), 71 deletions(-) create mode 100644 graffiti/hub/subscriber.go diff --git a/graffiti/hub/subscriber.go b/graffiti/hub/subscriber.go new file mode 100644 index 0000000000..5d34a41864 --- /dev/null +++ b/graffiti/hub/subscriber.go @@ -0,0 +1,107 @@ +/* + * Copyright (C) 2018 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package hub + +import ( + "net/http" + + "github.com/skydive-project/skydive/graffiti/graph" + "github.com/skydive-project/skydive/graffiti/logging" + "github.com/skydive-project/skydive/graffiti/messages" + "github.com/skydive-project/skydive/graffiti/websocket" +) + +type Subscriber struct { + *websocket.StructSpeaker + g *graph.Graph + logger logging.Logger +} + +// OnStructMessage applies a graph message received from a speaker to the local graph +func (s *Subscriber) OnStructMessage(c websocket.Speaker, msg *websocket.StructMessage) { + if msg.Status != http.StatusOK { + s.logger.Errorf("request error: %v", msg) + return + } + + origin := string(c.GetServiceType()) + if len(c.GetRemoteHost()) > 0 { + origin += "." 
+ c.GetRemoteHost() + } + + msgType, obj, err := messages.UnmarshalMessage(msg) + if err != nil { + s.logger.Errorf("unable to parse websocket message: %s", err) + return + } + + s.g.Lock() + defer s.g.Unlock() + + switch msgType { + case messages.SyncMsgType, messages.SyncReplyMsgType: + r := obj.(*messages.SyncMsg) + + s.g.DelNodes(graph.Metadata{"Origin": origin}) + + for _, n := range r.Nodes { + if s.g.GetNode(n.ID) == nil { + if err := s.g.NodeAdded(n); err != nil { + s.logger.Errorf("%s, %+v", err, n) + } + } + } + for _, e := range r.Edges { + if s.g.GetEdge(e.ID) == nil { + if err := s.g.EdgeAdded(e); err != nil { + s.logger.Errorf("%s, %+v", err, e) + } + } + } + case messages.NodeUpdatedMsgType: + err = s.g.NodeUpdated(obj.(*graph.Node)) + case messages.NodeDeletedMsgType: + err = s.g.NodeDeleted(obj.(*graph.Node)) + case messages.NodeAddedMsgType: + err = s.g.NodeAdded(obj.(*graph.Node)) + case messages.EdgeUpdatedMsgType: + err = s.g.EdgeUpdated(obj.(*graph.Edge)) + case messages.EdgeDeletedMsgType: + if err = s.g.EdgeDeleted(obj.(*graph.Edge)); err == graph.ErrEdgeNotFound { + return + } + case messages.EdgeAddedMsgType: + err = s.g.EdgeAdded(obj.(*graph.Edge)) + } + + if err != nil { + s.logger.Errorf("%s, %+v", err, msg) + } +} + +func NewSubscriber(client *websocket.Client, g *graph.Graph, logger logging.Logger) *Subscriber { + structSpeaker := client.UpgradeToStructSpeaker() + subscriber := &Subscriber{ + StructSpeaker: structSpeaker, + g: g, + logger: logger, + } + structSpeaker.AddEventHandler(subscriber) + subscriber.AddStructMessageHandler(subscriber, []string{messages.Namespace}) + return subscriber +} diff --git a/graffiti/seed/seed.go b/graffiti/seed/seed.go index 9614a58594..3087350693 100644 --- a/graffiti/seed/seed.go +++ b/graffiti/seed/seed.go @@ -19,12 +19,12 @@ package seed import ( "fmt" - "net/http" "net/url" "github.com/skydive-project/skydive/graffiti/endpoints" "github.com/skydive-project/skydive/graffiti/forwarder" 
"github.com/skydive-project/skydive/graffiti/graph" + "github.com/skydive-project/skydive/graffiti/hub" "github.com/skydive-project/skydive/graffiti/logging" "github.com/skydive-project/skydive/graffiti/messages" "github.com/skydive-project/skydive/graffiti/service" @@ -48,7 +48,7 @@ type Seed struct { forwarder *forwarder.Forwarder clientPool *ws.StructClientPool publisher *ws.Client - subscriber *ws.StructSpeaker + subscriber *hub.Subscriber g *graph.Graph logger logging.Logger listeners []EventHandler @@ -60,71 +60,6 @@ func (s *Seed) OnConnected(c ws.Speaker) error { return s.subscriber.SendMessage(messages.NewStructMessage(messages.SyncRequestMsgType, messages.SyncRequestMsg{})) } -// OnStructMessage callback -func (s *Seed) OnStructMessage(c ws.Speaker, msg *ws.StructMessage) { - if msg.Status != http.StatusOK { - s.logger.Errorf("request error: %v", msg) - return - } - - origin := string(c.GetServiceType()) - if len(c.GetRemoteHost()) > 0 { - origin += "." + c.GetRemoteHost() - } - - msgType, obj, err := messages.UnmarshalMessage(msg) - if err != nil { - s.logger.Error("unable to parse websocket message: %s", err) - return - } - - s.g.Lock() - defer s.g.Unlock() - - switch msgType { - case messages.SyncMsgType, messages.SyncReplyMsgType: - r := obj.(*messages.SyncMsg) - - s.g.DelNodes(graph.Metadata{"Origin": origin}) - - for _, n := range r.Nodes { - if s.g.GetNode(n.ID) == nil { - if err := s.g.NodeAdded(n); err != nil { - s.logger.Errorf("%s, %+v", err, n) - } - } - } - for _, e := range r.Edges { - if s.g.GetEdge(e.ID) == nil { - if err := s.g.EdgeAdded(e); err != nil { - s.logger.Errorf("%s, %+v", err, e) - } - } - } - for _, listener := range s.listeners { - listener.OnSynchronized() - } - case messages.NodeUpdatedMsgType: - err = s.g.NodeUpdated(obj.(*graph.Node)) - case messages.NodeDeletedMsgType: - err = s.g.NodeDeleted(obj.(*graph.Node)) - case messages.NodeAddedMsgType: - err = s.g.NodeAdded(obj.(*graph.Node)) - case messages.EdgeUpdatedMsgType: 
- err = s.g.EdgeUpdated(obj.(*graph.Edge)) - case messages.EdgeDeletedMsgType: - if err = s.g.EdgeDeleted(obj.(*graph.Edge)); err == graph.ErrEdgeNotFound { - return - } - case messages.EdgeAddedMsgType: - err = s.g.EdgeAdded(obj.(*graph.Edge)) - } - - if err != nil { - s.logger.Errorf("%s, %+v", err, msg) - } -} - // AddEventHandler register an event handler func (s *Seed) AddEventHandler(handler EventHandler) { s.listeners = append(s.listeners, handler) @@ -182,7 +117,7 @@ func NewSeed(g *graph.Graph, clientType service.Type, address, filter string, ws wsOpts.Headers.Add("X-Update-Policy", endpoints.PartialUpdates) subClient := ws.NewClient(g.GetHost(), clientType, url, wsOpts) - subscriber := subClient.UpgradeToStructSpeaker() + subscriber := hub.NewSubscriber(subClient, g, wsOpts.Logger) s := &Seed{ g: g, @@ -191,8 +126,5 @@ func NewSeed(g *graph.Graph, clientType service.Type, address, filter string, ws logger: wsOpts.Logger, } - subscriber.AddEventHandler(s) - subscriber.AddStructMessageHandler(s, []string{messages.Namespace}) - return s, nil } From 1b3824bae91371abc4dbd8681b0030b16932e378 Mon Sep 17 00:00:00 2001 From: Sylvain Baubeau Date: Wed, 7 Apr 2021 10:44:44 +0200 Subject: [PATCH 3/8] Add peering support --- analyzer/server.go | 37 ++++++++++- config/config.go | 32 +++++++-- config/websocket.go | 2 + graffiti/hub/hub.go | 158 +++++++++++++++++++++++++++++++++++--------- 4 files changed, 190 insertions(+), 39 deletions(-) diff --git a/analyzer/server.go b/analyzer/server.go index afa21b5e14..daee38ba30 100644 --- a/analyzer/server.go +++ b/analyzer/server.go @@ -251,9 +251,9 @@ func NewServerFromConfig() (*Server, error) { return nil, err } - peers, err := config.GetAnalyzerServiceAddresses() + replicationPeers, err := config.GetAnalyzerServiceAddresses() if err != nil { - return nil, fmt.Errorf("Unable to get the analyzers list: %s", err) + return nil, fmt.Errorf("unable to get the analyzers list: %s", err) } wsClientOpts, err := 
config.NewWSClientOpts(ClusterAuthenticationOpts()) @@ -261,6 +261,35 @@ func NewServerFromConfig() (*Server, error) { return nil, err } + clusterPeers := make(map[string]hub.PeeringOpts) + peeredClusters := config.GetStringMapString("analyzer.peers") + for clusterName := range peeredClusters { + peerConfig := "analyzer.peers." + clusterName + endpoints, err := config.GetServiceAddresses(peerConfig + ".endpoints") + if err != nil { + return nil, fmt.Errorf("unable to get peers endpoints for cluster '%s': %w", clusterName, err) + } + peeringOpts := hub.PeeringOpts{ + Endpoints: endpoints, + } + + if wsClientOpts, err := config.NewWSClientOpts( + &shttp.AuthenticationOpts{ + Username: config.GetString(peerConfig + ".username"), + Password: config.GetString(peerConfig + ".password"), + }, + ); err == nil { + peeringOpts.WebsocketClientOpts = *wsClientOpts + } + + if subscriptionFilter := config.GetString(peerConfig + ".subscription_filter"); subscriptionFilter != "" { + peeringOpts.WebsocketClientOpts.Headers.Add("X-Gremlin-Filter", subscriptionFilter) + } + + clusterPeers[clusterName] = peeringOpts + + } + probeBundle, err := NewTopologyProbeBundleFromConfig(g) if err != nil { return nil, err @@ -281,11 +310,13 @@ func NewServerFromConfig() (*Server, error) { GraphValidator: topology.SchemaValidator, StatusReporter: s, TLSConfig: tlsConfig, - Peers: peers, + ReplicationPeers: replicationPeers, + ClusterPeers: clusterPeers, EtcdClient: etcdClient, TopologyMarshallers: api.TopologyMarshallers, Assets: &statics.Assets, Version: version.Version, + ClusterName: config.GetString("analyzer.cluster"), } if config.GetBool("etcd.embedded") { diff --git a/config/config.go b/config/config.go index 172d3c7b9c..175c3f858d 100644 --- a/config/config.go +++ b/config/config.go @@ -365,15 +365,15 @@ func (c *SkydiveConfig) SetDefault(key string, value interface{}) { c.Viper.SetDefault(key, value) } -// GetAnalyzerServiceAddresses returns a list of connectable Analyzers -func 
GetAnalyzerServiceAddresses() ([]service.Address, error) { - return cfg.GetAnalyzerServiceAddresses() +// GetServiceAddresses returns the service addresses listed in the given configuration section +func GetServiceAddresses(section string) ([]service.Address, error) { + return cfg.GetServiceAddresses(section) } -// GetAnalyzerServiceAddresses returns a list of connectable Analyzers -func (c *SkydiveConfig) GetAnalyzerServiceAddresses() ([]service.Address, error) { +// GetServiceAddresses returns the service addresses listed in the given configuration section +func (c *SkydiveConfig) GetServiceAddresses(section string) ([]service.Address, error) { var addresses []service.Address - for _, a := range c.GetStringSlice("analyzers") { + for _, a := range c.GetStringSlice(section) { sa, err := service.AddressFromString(a) if err != nil { return nil, err @@ -390,6 +390,16 @@ func (c *SkydiveConfig) GetAnalyzerServiceAddresses() ([]service.Address, error) return addresses, nil } +// GetAnalyzerServiceAddresses returns a list of connectable Analyzers +func GetAnalyzerServiceAddresses() ([]service.Address, error) { + return cfg.GetAnalyzerServiceAddresses() +} + +// GetAnalyzerServiceAddresses returns a list of analyzers to connect to +func (c *SkydiveConfig) GetAnalyzerServiceAddresses() ([]service.Address, error) { + return c.GetServiceAddresses("analyzers") +} + // GetOneAnalyzerServiceAddress returns a random connectable Analyzer func GetOneAnalyzerServiceAddress() (service.Address, error) { return cfg.GetOneAnalyzerServiceAddress() @@ -557,6 +567,16 @@ func (c *SkydiveConfig) GetStringMapString(key string) map[string]string { return c.Viper.GetStringMapString(realKey(key)) } +// GetStringMap returns a map with string keys from the configuration +func GetStringMap(key string) map[string]interface{} { + return cfg.GetStringMap(key) +} + +// GetStringMap returns a map with string keys from the configuration +func (c *SkydiveConfig) GetStringMap(key string) map[string]interface{} { + return c.Viper.GetStringMap(realKey(key)) +} + // BindPFlag binds a command line flag 
to a configuration value func BindPFlag(key string, flag *pflag.Flag) error { return cfg.BindPFlag(key, flag) diff --git a/config/websocket.go b/config/websocket.go index 3839593662..7609354664 100644 --- a/config/websocket.go +++ b/config/websocket.go @@ -18,6 +18,7 @@ package config import ( + "net/http" "net/url" "time" @@ -39,6 +40,7 @@ func NewWSClientOpts(authOpts *shttp.AuthenticationOpts) (*websocket.ClientOpts, WriteCompression: GetBool("http.ws.enable_write_compression"), TLSConfig: tlsConfig, AuthOpts: authOpts, + Headers: http.Header{}, }, nil } diff --git a/graffiti/hub/hub.go b/graffiti/hub/hub.go index 0de84588f9..5dccb4dd3d 100644 --- a/graffiti/hub/hub.go +++ b/graffiti/hub/hub.go @@ -20,9 +20,11 @@ package hub import ( "context" "crypto/tls" + "errors" "fmt" "strconv" "strings" + "sync" "time" etcd "github.com/coreos/etcd/client" @@ -35,6 +37,7 @@ import ( etcdserver "github.com/skydive-project/skydive/graffiti/etcd/server" "github.com/skydive-project/skydive/graffiti/graph" "github.com/skydive-project/skydive/graffiti/graph/traversal" + "github.com/skydive-project/skydive/graffiti/http" shttp "github.com/skydive-project/skydive/graffiti/http" "github.com/skydive-project/skydive/graffiti/logging" "github.com/skydive-project/skydive/graffiti/schema" @@ -50,6 +53,7 @@ const ( type Opts struct { Hostname string Version string + ClusterName string WebsocketOpts websocket.ServerOpts WebsocketClientOpts websocket.ClientOpts APIValidator api.Validator @@ -58,7 +62,8 @@ type Opts struct { StatusReporter api.StatusReporter APIAuthBackend shttp.AuthenticationBackend ClusterAuthBackend shttp.AuthenticationBackend - Peers []service.Address + ReplicationPeers []service.Address + ClusterPeers map[string]PeeringOpts TLSConfig *tls.Config EtcdClient *etcdclient.Client EtcdServerOpts *etcdserver.EmbeddedServerOpts @@ -66,30 +71,88 @@ type Opts struct { Assets assets.Assets } -type podOrigin struct { - HostID string - ServiceType service.Type +type PeeringOpts 
struct { + Endpoints []service.Address + WebsocketClientOpts websocket.ClientOpts +} + +type clusterPeering struct { + currentPeer int + clusterName string + logger logging.Logger + masterElection etcdclient.MasterElection + peers *websocket.ClientPool + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup +} + +func (p *clusterPeering) OnStartAsMaster() { + p.connect() +} + +func (p *clusterPeering) OnSwitchToMaster() { + p.connect() +} + +func (p *clusterPeering) OnStartAsSlave() { +} + +func (p *clusterPeering) OnSwitchToSlave() { + p.cancel() + p.wg.Wait() +} + +func (p *clusterPeering) connect() { + p.ctx, p.cancel = context.WithCancel(context.Background()) + speakers := p.peers.GetSpeakers() + p.wg.Add(1) + + go func() { + defer p.wg.Done() + + for { + select { + case <-p.ctx.Done(): + return + default: + speaker := speakers[p.currentPeer] + if err := speaker.Connect(p.ctx); err == nil && p.masterElection.IsMaster() { + p.logger.Infof("Peered to cluster %s", p.clusterName) + speaker.Run() + } + + p.currentPeer = (p.currentPeer + 1) % len(speakers) + if p.currentPeer == 0 { + p.logger.Warningf("Failed to peer with cluster %s, retrying in 3 seconds", p.clusterName) + time.Sleep(3 * time.Second) + } + } + } + }() } // Hub describes a graph hub that accepts incoming connections // from pods, other hubs, subscribers or external publishers type Hub struct { - Graph *graph.Graph - cached *graph.CachedBackend - httpServer *shttp.Server - apiServer *api.Server - alertServer *alert.Server - embeddedEtcd *etcdserver.EmbeddedServer - etcdClient *etcdclient.Client - podWSServer *websocket.StructServer - publisherWSServer *websocket.StructServer - replicationWSServer *websocket.StructServer - replicationEndpoint *endpoints.ReplicationEndpoint - subscriberWSServer *websocket.StructServer - traversalParser *traversal.GremlinTraversalParser - expirationDelay time.Duration - quit chan bool - masterElection etcdclient.MasterElection + Graph *graph.Graph + cached 
*graph.CachedBackend + logger logging.Logger + httpServer *shttp.Server + apiServer *api.Server + alertServer *alert.Server + embeddedEtcd *etcdserver.EmbeddedServer + etcdClient *etcdclient.Client + podWSServer *websocket.StructServer + publisherWSServer *websocket.StructServer + replicationWSServer *websocket.StructServer + replicationEndpoint *endpoints.ReplicationEndpoint + subscriberWSServer *websocket.StructServer + traversalParser *traversal.GremlinTraversalParser + expirationDelay time.Duration + quit chan bool + originMasterElection etcdclient.MasterElection + clusterPeerings map[string]*clusterPeering } // ElectionStatus describes the status of an election @@ -141,7 +204,7 @@ func (h *Hub) OnStarted() { go h.watchOrigins() if err := h.httpServer.Start(); err != nil { - logging.GetLogger().Errorf("Error while starting http server: %s", err) + h.logger.Errorf("Error while starting http server: %s", err) return } @@ -161,7 +224,11 @@ func (h *Hub) Start() error { } } - h.masterElection.StartAndWait() + h.originMasterElection.StartAndWait() + + for _, peering := range h.clusterPeerings { + peering.masterElection.StartAndWait() + } if err := h.cached.Start(); err != nil { return err @@ -179,7 +246,10 @@ func (h *Hub) Stop() { h.subscriberWSServer.Stop() h.alertServer.Stop() h.cached.Stop() - h.masterElection.Stop() + h.originMasterElection.Stop() + for _, peering := range h.clusterPeerings { + peering.masterElection.Stop() + } if h.embeddedEtcd != nil { h.embeddedEtcd.Stop() } @@ -214,7 +284,7 @@ func (h *Hub) GremlinTraversalParser() *traversal.GremlinTraversalParser { func (h *Hub) OnPong(speaker websocket.Speaker) { key := fmt.Sprintf("%s/%s", etcPodPongPath, graph.ClientOrigin(speaker)) if err := h.etcdClient.SetInt64(key, time.Now().Unix()); err != nil { - logging.GetLogger().Errorf("Error while recording Pod pong time: %s", err) + h.logger.Errorf("Error while recording Pod pong time: %s", err) } } @@ -225,7 +295,7 @@ func (h *Hub) watchOrigins() { for { 
select { case <-tick.C: - if !h.masterElection.IsMaster() { + if !h.originMasterElection.IsMaster() { break } @@ -237,19 +307,19 @@ func (h *Hub) watchOrigins() { for _, node := range resp.Node.Nodes { t, _ := strconv.ParseInt(node.Value, 10, 64) - logging.GetLogger().Infof("TTL of pod of origin %s is %d", node.Key, t) + h.logger.Infof("TTL of pod of origin %s is %d", node.Key, t) if t+int64(h.expirationDelay.Seconds()) < time.Now().Unix() { origin := strings.TrimPrefix(node.Key, etcPodPongPath+"/") - logging.GetLogger().Infof("pod of origin %s expired, removing resources", origin) + h.logger.Infof("pod of origin %s expired, removing resources", origin) h.Graph.Lock() graph.DelSubGraphOfOrigin(h.Graph, origin) h.Graph.Unlock() if _, err := h.etcdClient.KeysAPI.Delete(context.Background(), node.Key, &etcd.DeleteOptions{}); err != nil { - logging.GetLogger().Infof("unable to delete pod entry %s: %s", node.Key, err) + h.logger.Infof("unable to delete pod entry %s: %s", node.Key, err) } } } @@ -266,6 +336,10 @@ func NewHub(id string, serviceType service.Type, listen string, g *graph.Graph, return nil, err } + if len(opts.ClusterPeers) > 0 && opts.ClusterName == "" { + return nil, errors.New("peering was requested but analyzer has no cluster name") + } + tr := traversal.NewGremlinTraversalParser() if opts.Logger == nil { @@ -275,6 +349,7 @@ func NewHub(id string, serviceType service.Type, listen string, g *graph.Graph, hub := &Hub{ Graph: g, cached: cached, + logger: opts.Logger, expirationDelay: opts.WebsocketOpts.PongTimeout * 5, quit: make(chan bool), } @@ -305,7 +380,7 @@ func NewHub(id string, serviceType service.Type, listen string, g *graph.Graph, repOpts.AuthBackend = opts.ClusterAuthBackend repOpts.PongListeners = []websocket.PongListener{hub} replicationWSServer := websocket.NewStructServer(websocket.NewServer(httpServer, "/ws/replication", repOpts)) - replicationEndpoint := endpoints.NewReplicationEndpoint(replicationWSServer, &opts.WebsocketClientOpts, 
cached, g, opts.Peers, opts.Logger) + replicationEndpoint := endpoints.NewReplicationEndpoint(replicationWSServer, &opts.WebsocketClientOpts, cached, g, opts.ReplicationPeers, opts.Logger) subOpts := opts.WebsocketOpts subOpts.AuthBackend = opts.APIAuthBackend @@ -328,7 +403,30 @@ func NewHub(id string, serviceType service.Type, listen string, g *graph.Graph, hub.etcdClient = opts.EtcdClient election := hub.etcdClient.NewElection("/elections/hub-origin-watcher") - hub.masterElection = election + hub.originMasterElection = election + + hub.clusterPeerings = make(map[string]*clusterPeering) + for remoteCluster, peeringOpts := range opts.ClusterPeers { + opts.Logger.Debugf("Peering with cluster %s and endpoints %+v", remoteCluster, peeringOpts.Endpoints) + + clientPool := websocket.NewClientPool("HubPeering-"+remoteCluster, websocket.PoolOpts{Logger: opts.WebsocketClientOpts.Logger}) + for _, peer := range peeringOpts.Endpoints { + url, _ := http.MakeURL("ws", peer.Addr, peer.Port, "/ws/subscriber", peeringOpts.WebsocketClientOpts.TLSConfig != nil) + client := websocket.NewClient(id, serviceType, url, peeringOpts.WebsocketClientOpts) + subscriber := NewSubscriber(client, g, opts.Logger) + clientPool.AddClient(subscriber) + } + + peering := &clusterPeering{ + clusterName: remoteCluster, + peers: clientPool, + logger: opts.Logger, + } + hub.clusterPeerings[remoteCluster] = peering + + peering.masterElection = hub.etcdClient.NewElection("/elections/hub-peering/" + opts.ClusterName + "/" + remoteCluster) + peering.masterElection.AddEventListener(peering) + } if opts.StatusReporter == nil { opts.StatusReporter = hub From 04c4142fb5c9861698c0350e5e2566d18ff95232 Mon Sep 17 00:00:00 2001 From: Sylvain Baubeau Date: Sun, 18 Jul 2021 17:00:15 +0200 Subject: [PATCH 4/8] Add pubsub endpoint for cluster peering --- analyzer/server.go | 8 +- cmd/client/edgerule.go | 3 +- cmd/client/noderule.go | 3 +- cmd/seed/seed.go | 7 +- graffiti/{forwarder => clients}/forwarder.go | 37 +++++- 
graffiti/{seed => clients}/seed.go | 71 +++++------ graffiti/{hub => clients}/subscriber.go | 2 +- graffiti/cmd/client/client.go | 25 ---- graffiti/cmd/client/edge.go | 2 +- graffiti/cmd/client/node.go | 2 +- graffiti/endpoints/pubsub_endpoint.go | 41 ++++++ graffiti/graph/metadata.go | 22 ++++ graffiti/hub/hub.go | 124 +++++++++---------- graffiti/hub/peering.go | 73 +++++++++++ graffiti/pod/pod.go | 8 +- rbac/policy.csv | 1 + 16 files changed, 276 insertions(+), 153 deletions(-) rename graffiti/{forwarder => clients}/forwarder.go (83%) rename graffiti/{seed => clients}/seed.go (58%) rename graffiti/{hub => clients}/subscriber.go (99%) create mode 100644 graffiti/endpoints/pubsub_endpoint.go create mode 100644 graffiti/hub/peering.go diff --git a/analyzer/server.go b/analyzer/server.go index daee38ba30..51cec9ee4c 100644 --- a/analyzer/server.go +++ b/analyzer/server.go @@ -270,7 +270,9 @@ func NewServerFromConfig() (*Server, error) { return nil, fmt.Errorf("unable to get peers endpoints for cluster '%s': %w", clusterName, err) } peeringOpts := hub.PeeringOpts{ - Endpoints: endpoints, + Endpoints: endpoints, + PublisherFilter: config.GetString(peerConfig + ".publish_filter"), + SubscriptionFilter: config.GetString(peerConfig + ".subscribe_filter"), } if wsClientOpts, err := config.NewWSClientOpts( @@ -282,10 +284,6 @@ func NewServerFromConfig() (*Server, error) { peeringOpts.WebsocketClientOpts = *wsClientOpts } - if subscriptionFilter := config.GetString(peerConfig + ".subscription_filter"); subscriptionFilter != "" { - peeringOpts.WebsocketClientOpts.Headers.Add("X-Gremlin-Filter", subscriptionFilter) - } - clusterPeers[clusterName] = peeringOpts } diff --git a/cmd/client/edgerule.go b/cmd/client/edgerule.go index 3ba511cf1c..3bc4e3a112 100644 --- a/cmd/client/edgerule.go +++ b/cmd/client/edgerule.go @@ -22,7 +22,6 @@ import ( "os" api "github.com/skydive-project/skydive/api/types" - gclient "github.com/skydive-project/skydive/graffiti/cmd/client" 
"github.com/skydive-project/skydive/graffiti/graph" "github.com/skydive-project/skydive/graffiti/logging" "github.com/skydive-project/skydive/validator" @@ -57,7 +56,7 @@ var EdgeRuleCreate = &cobra.Command{ } }, Run: func(cmd *cobra.Command, args []string) { - m, err := gclient.DefToMetadata(metadata, graph.Metadata{}) + m, err := graph.DefToMetadata(metadata, graph.Metadata{}) if err != nil { exitOnError(err) } diff --git a/cmd/client/noderule.go b/cmd/client/noderule.go index b324ca6bf3..c93cb1a4a6 100644 --- a/cmd/client/noderule.go +++ b/cmd/client/noderule.go @@ -23,7 +23,6 @@ import ( "os" api "github.com/skydive-project/skydive/api/types" - gclient "github.com/skydive-project/skydive/graffiti/cmd/client" "github.com/skydive-project/skydive/graffiti/graph" "github.com/skydive-project/skydive/graffiti/logging" "github.com/skydive-project/skydive/validator" @@ -57,7 +56,7 @@ var NodeRuleCreate = &cobra.Command{ SilenceUsage: false, Run: func(cmd *cobra.Command, args []string) { - m, err := gclient.DefToMetadata(metadata, graph.Metadata{}) + m, err := graph.DefToMetadata(metadata, graph.Metadata{}) if err != nil { exitOnError(err) } diff --git a/cmd/seed/seed.go b/cmd/seed/seed.go index 793cd7ff72..6d3a66683c 100644 --- a/cmd/seed/seed.go +++ b/cmd/seed/seed.go @@ -26,10 +26,10 @@ import ( "github.com/skydive-project/skydive/agent" "github.com/skydive-project/skydive/config" + "github.com/skydive-project/skydive/graffiti/clients" "github.com/skydive-project/skydive/graffiti/graph" "github.com/skydive-project/skydive/graffiti/http" "github.com/skydive-project/skydive/graffiti/logging" - "github.com/skydive-project/skydive/graffiti/seed" "github.com/skydive-project/skydive/graffiti/websocket" "github.com/skydive-project/skydive/probe" tp "github.com/skydive-project/skydive/topology/probes" @@ -104,7 +104,7 @@ var SeedCmd = &cobra.Command{ os.Exit(1) } - origin := graph.Origin(hostID, seed.Service) + origin := graph.Origin(hostID, clients.SeedService) g := 
graph.NewGraph(hostID, memory, origin) probeBundle = probe.NewBundle() @@ -121,13 +121,12 @@ var SeedCmd = &cobra.Command{ TLSConfig: tlsConfig, } - seed, err := seed.NewSeed(g, seed.Service, agentAddr, subscriberFilter, *wsOpts, logging.GetLogger()) + seed, err := clients.NewSeed(g, clients.SeedService, agentAddr, subscriberFilter, "", *wsOpts, logging.GetLogger()) if err != nil { logging.GetLogger().Errorf("Failed to start seed: %s", err) os.Exit(1) } - seed.AddEventHandler(&seedHandler{g: g, probes: args}) seed.Start() logging.GetLogger().Notice("Skydive seed started") diff --git a/graffiti/forwarder/forwarder.go b/graffiti/clients/forwarder.go similarity index 83% rename from graffiti/forwarder/forwarder.go rename to graffiti/clients/forwarder.go index c59f810fa3..ae04cbf929 100644 --- a/graffiti/forwarder/forwarder.go +++ b/graffiti/clients/forwarder.go @@ -15,9 +15,10 @@ * */ -package forwarder +package clients import ( + "github.com/skydive-project/skydive/graffiti/filters" "github.com/skydive-project/skydive/graffiti/graph" "github.com/skydive-project/skydive/graffiti/logging" "github.com/skydive-project/skydive/graffiti/messages" @@ -31,6 +32,7 @@ type Forwarder struct { masterElection *ws.MasterElection graph *graph.Graph logger logging.Logger + nodeFilter *filters.Filter } func (t *Forwarder) triggerResync() { @@ -68,6 +70,10 @@ func (t *Forwarder) OnNewMaster(c ws.Speaker) { // OnNodeUpdated graph node updated event. Implements the EventListener interface. func (t *Forwarder) OnNodeUpdated(n *graph.Node, ops []graph.PartiallyUpdatedOp) { + if t.nodeFilter != nil && !t.nodeFilter.Eval(n) { + return + } + t.masterElection.SendMessageToMaster( messages.NewStructMessage( messages.NodePartiallyUpdatedMsgType, @@ -83,6 +89,10 @@ func (t *Forwarder) OnNodeUpdated(n *graph.Node, ops []graph.PartiallyUpdatedOp) // OnNodeAdded graph node added event. Implements the EventListener interface. 
func (t *Forwarder) OnNodeAdded(n *graph.Node) { + if t.nodeFilter != nil && !t.nodeFilter.Eval(n) { + return + } + t.masterElection.SendMessageToMaster( messages.NewStructMessage(messages.NodeAddedMsgType, n), ) @@ -90,6 +100,10 @@ func (t *Forwarder) OnNodeAdded(n *graph.Node) { // OnNodeDeleted graph node deleted event. Implements the EventListener interface. func (t *Forwarder) OnNodeDeleted(n *graph.Node) { + if t.nodeFilter != nil && !t.nodeFilter.Eval(n) { + return + } + t.masterElection.SendMessageToMaster( messages.NewStructMessage(messages.NodeDeletedMsgType, n), ) @@ -97,6 +111,12 @@ func (t *Forwarder) OnNodeDeleted(n *graph.Node) { // OnEdgeUpdated graph edge updated event. Implements the EventListener interface. func (t *Forwarder) OnEdgeUpdated(e *graph.Edge, ops []graph.PartiallyUpdatedOp) { + if t.nodeFilter != nil && + !t.nodeFilter.Eval(t.graph.GetNode(e.Parent)) && + !t.nodeFilter.Eval(t.graph.GetNode(e.Child)) { + return + } + t.masterElection.SendMessageToMaster( messages.NewStructMessage( messages.EdgePartiallyUpdatedMsgType, @@ -112,6 +132,12 @@ func (t *Forwarder) OnEdgeUpdated(e *graph.Edge, ops []graph.PartiallyUpdatedOp) // OnEdgeAdded graph edge added event. Implements the EventListener interface. func (t *Forwarder) OnEdgeAdded(e *graph.Edge) { + if t.nodeFilter != nil && + !t.nodeFilter.Eval(t.graph.GetNode(e.Parent)) && + !t.nodeFilter.Eval(t.graph.GetNode(e.Child)) { + return + } + t.masterElection.SendMessageToMaster( messages.NewStructMessage(messages.EdgeAddedMsgType, e), ) @@ -119,6 +145,12 @@ func (t *Forwarder) OnEdgeAdded(e *graph.Edge) { // OnEdgeDeleted graph edge deleted event. Implements the EventListener interface. 
func (t *Forwarder) OnEdgeDeleted(e *graph.Edge) { + if t.nodeFilter != nil && + !t.nodeFilter.Eval(t.graph.GetEdge(e.Parent)) && + !t.nodeFilter.Eval(t.graph.GetEdge(e.Child)) { + return + } + t.masterElection.SendMessageToMaster( messages.NewStructMessage(messages.EdgeDeletedMsgType, e), ) @@ -131,7 +163,7 @@ func (t *Forwarder) GetMaster() ws.Speaker { // NewForwarder returns a new Graph forwarder which forwards event of the given graph // to the given WebSocket JSON speakers. -func NewForwarder(g *graph.Graph, pool ws.StructSpeakerPool, logger logging.Logger) *Forwarder { +func NewForwarder(g *graph.Graph, pool ws.SpeakerPool, nodeFilter *filters.Filter, logger logging.Logger) *Forwarder { if logger == nil { logger = logging.GetLogger() } @@ -142,6 +174,7 @@ func NewForwarder(g *graph.Graph, pool ws.StructSpeakerPool, logger logging.Logg masterElection: masterElection, graph: g, logger: logger, + nodeFilter: nodeFilter, } masterElection.AddEventHandler(t) diff --git a/graffiti/seed/seed.go b/graffiti/clients/seed.go similarity index 58% rename from graffiti/seed/seed.go rename to graffiti/clients/seed.go index 3087350693..ab6af1c84b 100644 --- a/graffiti/seed/seed.go +++ b/graffiti/clients/seed.go @@ -15,24 +15,23 @@ * */ -package seed +package clients import ( "fmt" "net/url" "github.com/skydive-project/skydive/graffiti/endpoints" - "github.com/skydive-project/skydive/graffiti/forwarder" + "github.com/skydive-project/skydive/graffiti/filters" "github.com/skydive-project/skydive/graffiti/graph" - "github.com/skydive-project/skydive/graffiti/hub" "github.com/skydive-project/skydive/graffiti/logging" "github.com/skydive-project/skydive/graffiti/messages" "github.com/skydive-project/skydive/graffiti/service" ws "github.com/skydive-project/skydive/graffiti/websocket" ) -// Service defines the seed service type -const Service service.Type = "seed" +// SeedService defines the seed service type +const SeedService service.Type = "seed" // EventHandler is the interface 
to be implemented by event handler type EventHandler interface { @@ -44,14 +43,11 @@ type EventHandler interface { // all its graph events to the agent. A filter can be used to // subscribe only to a part of the agent graph. type Seed struct { + *ws.Client ws.DefaultSpeakerEventHandler - forwarder *forwarder.Forwarder - clientPool *ws.StructClientPool - publisher *ws.Client - subscriber *hub.Subscriber + subscriber *Subscriber g *graph.Graph logger logging.Logger - listeners []EventHandler } // OnConnected websocket listener @@ -60,68 +56,59 @@ func (s *Seed) OnConnected(c ws.Speaker) error { return s.subscriber.SendMessage(messages.NewStructMessage(messages.SyncRequestMsgType, messages.SyncRequestMsg{})) } -// AddEventHandler register an event handler -func (s *Seed) AddEventHandler(handler EventHandler) { - s.listeners = append(s.listeners, handler) -} - -// RemoveEventHandler unregister an event handler -func (s *Seed) RemoveEventHandler(handler EventHandler) { - for i, el := range s.listeners { - if handler == el { - s.listeners = append(s.listeners[:i], s.listeners[i+1:]...) 
- break - } - } -} - // Start the seed func (s *Seed) Start() { s.subscriber.Start() - s.publisher.Start() + s.Client.Start() } // Stop the seed func (s *Seed) Stop() { - s.publisher.Stop() + s.Client.Stop() s.subscriber.Stop() } // NewSeed returns a new seed -func NewSeed(g *graph.Graph, clientType service.Type, address, filter string, wsOpts ws.ClientOpts, logger logging.Logger) (*Seed, error) { +func NewSeed(g *graph.Graph, clientType service.Type, address, subscribeFilter, publishFilter string, wsOpts ws.ClientOpts, logger logging.Logger) (*Seed, error) { wsOpts.Headers.Add("X-Websocket-Namespace", messages.Namespace) if len(address) == 0 { address = "127.0.0.1:8081" } - url, err := url.Parse("ws://" + address + "/ws/publisher") + url, err := url.Parse("ws://" + address + "/ws/pubsub") if err != nil { return nil, fmt.Errorf("unable to parse the Address: %s, please check the configuration file", address) } - pubClient := ws.NewClient(g.GetHost(), clientType, url, wsOpts) + wsOpts.Headers.Add("X-Gremlin-Filter", subscribeFilter) + wsOpts.Headers.Add("X-Update-Policy", endpoints.PartialUpdates) - pool := ws.NewStructClientPool("publisher", ws.PoolOpts{Logger: wsOpts.Logger}) - if err := pool.AddClient(pubClient); err != nil { - return nil, fmt.Errorf("failed to add client: %s", err) + pubsubClient := ws.NewClient(g.GetHost(), clientType, url, wsOpts) + + pool := ws.NewStructClientPool("pubsub", ws.PoolOpts{Logger: wsOpts.Logger}) + if err := pool.AddClient(pubsubClient); err != nil { + return nil, fmt.Errorf("failed to add client: %w", err) } - forwarder.NewForwarder(g, pool, logger) + var metadataFilter *filters.Filter + if publishFilter != "" { + publishMetadata := graph.Metadata{} + if _, err := graph.DefToMetadata(publishFilter, publishMetadata); err != nil { + return nil, fmt.Errorf("failed to create publish filter: %w", err) + } - if url, err = url.Parse("ws://" + address + "/ws/subscriber"); err != nil { - return nil, fmt.Errorf("unable to parse the 
Address: %s, please check the configuration file", address) + if metadataFilter, err = publishMetadata.Filter(); err != nil { + return nil, fmt.Errorf("failed to create publish filter: %w", err) + } } - wsOpts.Headers.Add("X-Gremlin-Filter", filter) - wsOpts.Headers.Add("X-Update-Policy", endpoints.PartialUpdates) - - subClient := ws.NewClient(g.GetHost(), clientType, url, wsOpts) - subscriber := hub.NewSubscriber(subClient, g, wsOpts.Logger) + NewForwarder(g, pool, metadataFilter, logger) + subscriber := NewSubscriber(pubsubClient, g, wsOpts.Logger) s := &Seed{ + Client: pubsubClient, g: g, - publisher: pubClient, subscriber: subscriber, logger: wsOpts.Logger, } diff --git a/graffiti/hub/subscriber.go b/graffiti/clients/subscriber.go similarity index 99% rename from graffiti/hub/subscriber.go rename to graffiti/clients/subscriber.go index 5d34a41864..f9ee793061 100644 --- a/graffiti/hub/subscriber.go +++ b/graffiti/clients/subscriber.go @@ -15,7 +15,7 @@ * */ -package hub +package clients import ( "net/http" diff --git a/graffiti/cmd/client/client.go b/graffiti/cmd/client/client.go index 85953672a2..395eb294e9 100644 --- a/graffiti/cmd/client/client.go +++ b/graffiti/cmd/client/client.go @@ -19,13 +19,10 @@ package client import ( "crypto/tls" - "fmt" "os" - "strings" "github.com/spf13/cobra" - "github.com/skydive-project/skydive/graffiti/graph" "github.com/skydive-project/skydive/graffiti/http" "github.com/skydive-project/skydive/graffiti/logging" "github.com/skydive-project/skydive/graffiti/service" @@ -94,28 +91,6 @@ func exitOnError(err error) { os.Exit(1) } -// DefToMetadata converts a string in k1=v1,k2=v2,... 
format to a metadata object -func DefToMetadata(def string, metadata graph.Metadata) (graph.Metadata, error) { - if def == "" { - return metadata, nil - } - - for _, pair := range strings.Split(def, ",") { - pair = strings.TrimSpace(pair) - - kv := strings.Split(pair, "=") - if len(kv)%2 != 0 { - return nil, fmt.Errorf("attributes must be defined by pair k=v: %v", def) - } - key := strings.Trim(kv[0], `"`) - value := strings.Trim(kv[1], `"`) - - metadata.SetField(key, value) - } - - return metadata, nil -} - func init() { ClientCmd.PersistentFlags().StringVarP(&AuthenticationOpts.Username, "username", "", AuthenticationOpts.Username, "username auth parameter") ClientCmd.PersistentFlags().StringVarP(&AuthenticationOpts.Password, "password", "", AuthenticationOpts.Password, "password auth parameter") diff --git a/graffiti/cmd/client/edge.go b/graffiti/cmd/client/edge.go index f8b7432881..b4b053a9fe 100644 --- a/graffiti/cmd/client/edge.go +++ b/graffiti/cmd/client/edge.go @@ -55,7 +55,7 @@ var EdgeCreate = &cobra.Command{ SilenceUsage: false, Run: func(cmd *cobra.Command, args []string) { - m, err := DefToMetadata(edgeMetadata, graph.Metadata{}) + m, err := graph.DefToMetadata(edgeMetadata, graph.Metadata{}) if err != nil { exitOnError(err) } diff --git a/graffiti/cmd/client/node.go b/graffiti/cmd/client/node.go index 44aae1eddf..386bd3ca36 100644 --- a/graffiti/cmd/client/node.go +++ b/graffiti/cmd/client/node.go @@ -55,7 +55,7 @@ var NodeCreate = &cobra.Command{ SilenceUsage: false, Run: func(cmd *cobra.Command, args []string) { - m, err := DefToMetadata(nodeMetadata, graph.Metadata{}) + m, err := graph.DefToMetadata(nodeMetadata, graph.Metadata{}) if err != nil { exitOnError(err) } diff --git a/graffiti/endpoints/pubsub_endpoint.go b/graffiti/endpoints/pubsub_endpoint.go new file mode 100644 index 0000000000..080b5ad730 --- /dev/null +++ b/graffiti/endpoints/pubsub_endpoint.go @@ -0,0 +1,41 @@ +/* + * Copyright (C) 2020 Sylvain Baubeau + * + * Licensed under the 
Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package endpoints + +import ( + "github.com/skydive-project/skydive/graffiti/api/server" + "github.com/skydive-project/skydive/graffiti/graph" + "github.com/skydive-project/skydive/graffiti/graph/traversal" + "github.com/skydive-project/skydive/graffiti/logging" + ws "github.com/skydive-project/skydive/graffiti/websocket" +) + +// PubSubEndpoint describes a WebSocket endpoint that can be used for both +// publishing and subscribing +type PubSubEndpoint struct { + publisherEndpoint *PublisherEndpoint + subscriberEndpoint *SubscriberEndpoint +} + +// NewPubSubEndpoint returns a new PubSub endpoint +func NewPubSubEndpoint(pool ws.StructSpeakerPool, validator server.Validator, g *graph.Graph, tr *traversal.GremlinTraversalParser, logger logging.Logger) *PubSubEndpoint { + return &PubSubEndpoint{ + publisherEndpoint: NewPublisherEndpoint(pool, g, validator, logger), + subscriberEndpoint: NewSubscriberEndpoint(pool, g, tr, logger), + } +} diff --git a/graffiti/graph/metadata.go b/graffiti/graph/metadata.go index 4050f4f8f9..f350f27d36 100644 --- a/graffiti/graph/metadata.go +++ b/graffiti/graph/metadata.go @@ -434,3 +434,25 @@ func GetMapField(obj map[string]interface{}, k string) (interface{}, error) { return obj, nil } + +// DefToMetadata converts a string in k1=v1,k2=v2,...
format to a metadata object +func DefToMetadata(def string, metadata Metadata) (Metadata, error) { + if def == "" { + return metadata, nil + } + + for _, pair := range strings.Split(def, ",") { + pair = strings.TrimSpace(pair) + + kv := strings.Split(pair, "=") + if len(kv)%2 != 0 { + return nil, fmt.Errorf("attributes must be defined by pair k=v: %v", def) + } + key := strings.Trim(kv[0], `"`) + value := strings.Trim(kv[1], `"`) + + metadata.SetField(key, value) + } + + return metadata, nil +} diff --git a/graffiti/hub/hub.go b/graffiti/hub/hub.go index 5dccb4dd3d..424b516349 100644 --- a/graffiti/hub/hub.go +++ b/graffiti/hub/hub.go @@ -24,7 +24,6 @@ import ( "fmt" "strconv" "strings" - "sync" "time" etcd "github.com/coreos/etcd/client" @@ -32,9 +31,11 @@ import ( "github.com/skydive-project/skydive/graffiti/alert" api "github.com/skydive-project/skydive/graffiti/api/server" "github.com/skydive-project/skydive/graffiti/assets" + "github.com/skydive-project/skydive/graffiti/clients" "github.com/skydive-project/skydive/graffiti/endpoints" etcdclient "github.com/skydive-project/skydive/graffiti/etcd/client" etcdserver "github.com/skydive-project/skydive/graffiti/etcd/server" + "github.com/skydive-project/skydive/graffiti/filters" "github.com/skydive-project/skydive/graffiti/graph" "github.com/skydive-project/skydive/graffiti/graph/traversal" "github.com/skydive-project/skydive/graffiti/http" @@ -74,62 +75,8 @@ type Opts struct { type PeeringOpts struct { Endpoints []service.Address WebsocketClientOpts websocket.ClientOpts -} - -type clusterPeering struct { - currentPeer int - clusterName string - logger logging.Logger - masterElection etcdclient.MasterElection - peers *websocket.ClientPool - ctx context.Context - cancel context.CancelFunc - wg sync.WaitGroup -} - -func (p *clusterPeering) OnStartAsMaster() { - p.connect() -} - -func (p *clusterPeering) OnSwitchToMaster() { - p.connect() -} - -func (p *clusterPeering) OnStartAsSlave() { -} - -func (p *clusterPeering) 
OnSwitchToSlave() { - p.cancel() - p.wg.Wait() -} - -func (p *clusterPeering) connect() { - p.ctx, p.cancel = context.WithCancel(context.Background()) - speakers := p.peers.GetSpeakers() - p.wg.Add(1) - - go func() { - defer p.wg.Done() - - for { - select { - case <-p.ctx.Done(): - return - default: - speaker := speakers[p.currentPeer] - if err := speaker.Connect(p.ctx); err == nil && p.masterElection.IsMaster() { - p.logger.Infof("Peered to cluster %s", p.clusterName) - speaker.Run() - } - - p.currentPeer = (p.currentPeer + 1) % len(speakers) - if p.currentPeer == 0 { - p.logger.Warningf("Failed to peer with cluster %s, retrying in 3 seconds", p.clusterName) - time.Sleep(3 * time.Second) - } - } - } - }() + PublisherFilter string + SubscriptionFilter string } // Hub describes a graph hub that accepts incoming connections @@ -387,6 +334,11 @@ func NewHub(id string, serviceType service.Type, listen string, g *graph.Graph, subscriberWSServer := websocket.NewStructServer(websocket.NewServer(httpServer, "/ws/subscriber", subOpts)) endpoints.NewSubscriberEndpoint(subscriberWSServer, g, tr, opts.Logger) + pubsubOpts := opts.WebsocketOpts + pubsubOpts.AuthBackend = opts.APIAuthBackend + pubsubWSServer := websocket.NewStructServer(websocket.NewServer(httpServer, "/ws/pubsub", pubsubOpts)) + endpoints.NewPubSubEndpoint(pubsubWSServer, opts.GraphValidator, g, tr, opts.Logger) + apiServer, err := api.NewAPI(httpServer, opts.EtcdClient, opts.Version, id, serviceType, opts.APIAuthBackend, opts.APIValidator) if err != nil { return nil, err @@ -410,21 +362,65 @@ func NewHub(id string, serviceType service.Type, listen string, g *graph.Graph, opts.Logger.Debugf("Peering with cluster %s and endpoints %+v", remoteCluster, peeringOpts.Endpoints) clientPool := websocket.NewClientPool("HubPeering-"+remoteCluster, websocket.PoolOpts{Logger: opts.WebsocketClientOpts.Logger}) - for _, peer := range peeringOpts.Endpoints { - url, _ := http.MakeURL("ws", peer.Addr, peer.Port, 
"/ws/subscriber", peeringOpts.WebsocketClientOpts.TLSConfig != nil) - client := websocket.NewClient(id, serviceType, url, peeringOpts.WebsocketClientOpts) - subscriber := NewSubscriber(client, g, opts.Logger) - clientPool.AddClient(subscriber) - } peering := &clusterPeering{ clusterName: remoteCluster, peers: clientPool, logger: opts.Logger, } - hub.clusterPeerings[remoteCluster] = peering peering.masterElection = hub.etcdClient.NewElection("/elections/hub-peering/" + opts.ClusterName + "/" + remoteCluster) + + for _, peer := range peeringOpts.Endpoints { + var client websocket.Speaker + switch { + case peeringOpts.SubscriptionFilter != "" && peeringOpts.PublisherFilter != "": + var subscriptionFilter, publisherFilter string + if peeringOpts.SubscriptionFilter != "*" { + subscriptionFilter = peeringOpts.SubscriptionFilter + } + if peeringOpts.PublisherFilter != "*" { + publisherFilter = peeringOpts.PublisherFilter + } + + client, err = clients.NewSeed(g, serviceType, fmt.Sprintf("%s:%d", peer.Addr, peer.Port), subscriptionFilter, publisherFilter, peeringOpts.WebsocketClientOpts, opts.Logger) + if err != nil { + return nil, err + } + case peeringOpts.SubscriptionFilter != "": + url, _ := http.MakeURL("ws", peer.Addr, peer.Port, "/ws/subscriber", peeringOpts.WebsocketClientOpts.TLSConfig != nil) + wsClient := websocket.NewClient(id, serviceType, url, peeringOpts.WebsocketClientOpts) + if peeringOpts.SubscriptionFilter != "*" { + peeringOpts.WebsocketClientOpts.Headers.Add("X-Gremlin-Filter", peeringOpts.SubscriptionFilter) + } + client = clients.NewSubscriber(wsClient, g, opts.Logger) + default: + url, _ := http.MakeURL("ws", peer.Addr, peer.Port, "/ws/publisher", peeringOpts.WebsocketClientOpts.TLSConfig != nil) + client = websocket.NewClient(id, serviceType, url, peeringOpts.WebsocketClientOpts) + } + clientPool.AddClient(client) + } + + if peeringOpts.SubscriptionFilter == "" { + opts.Logger.Debugf("Creating new forwarder for peering") + + var metadataFilter 
*filters.Filter + if peeringOpts.PublisherFilter != "*" { + publishMetadata := graph.Metadata{} + _, err := graph.DefToMetadata(peeringOpts.PublisherFilter, publishMetadata) + if err != nil { + return nil, err + } + + if metadataFilter, err = publishMetadata.Filter(); err != nil { + return nil, fmt.Errorf("failed to create publish filter: %w", err) + } + } + clients.NewForwarder(g, clientPool, metadataFilter, opts.Logger) + } + + hub.clusterPeerings[remoteCluster] = peering + peering.masterElection.AddEventListener(peering) } diff --git a/graffiti/hub/peering.go b/graffiti/hub/peering.go new file mode 100644 index 0000000000..5f66aef140 --- /dev/null +++ b/graffiti/hub/peering.go @@ -0,0 +1,73 @@ +package hub + +import ( + "context" + "sync" + "time" + + etcdclient "github.com/skydive-project/skydive/graffiti/etcd/client" + "github.com/skydive-project/skydive/graffiti/logging" + "github.com/skydive-project/skydive/graffiti/websocket" +) + +type clusterPeering struct { + currentPeer int + clusterName string + logger logging.Logger + masterElection etcdclient.MasterElection + peers *websocket.ClientPool + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup +} + +func (p *clusterPeering) OnStartAsMaster() { + p.connect() +} + +func (p *clusterPeering) OnSwitchToMaster() { + p.connect() +} + +func (p *clusterPeering) OnStartAsSlave() { +} + +func (p *clusterPeering) OnSwitchToSlave() { + p.cancel() + p.wg.Wait() +} + +func (p *clusterPeering) OnNewMaster(c websocket.Speaker) { +} + +func (p *clusterPeering) connect() { + websocketElection := websocket.NewMasterElection(p.peers) + websocketElection.AddEventHandler(p) + + p.ctx, p.cancel = context.WithCancel(context.Background()) + speakers := p.peers.GetSpeakers() + p.wg.Add(1) + + go func() { + defer p.wg.Done() + + for { + select { + case <-p.ctx.Done(): + return + default: + speaker := speakers[p.currentPeer] + if err := speaker.Connect(p.ctx); err == nil && p.masterElection.IsMaster() { + 
p.logger.Infof("Peered to cluster %s", p.clusterName) + speaker.Run() + } + + p.currentPeer = (p.currentPeer + 1) % len(speakers) + if p.currentPeer == 0 { + p.logger.Warningf("Failed to peer with cluster %s, retrying in 3 seconds", p.clusterName) + time.Sleep(3 * time.Second) + } + } + } + }() +} diff --git a/graffiti/pod/pod.go b/graffiti/pod/pod.go index 3717305a7f..6ef385d551 100644 --- a/graffiti/pod/pod.go +++ b/graffiti/pod/pod.go @@ -22,8 +22,8 @@ import ( "net/http" api "github.com/skydive-project/skydive/graffiti/api/server" + "github.com/skydive-project/skydive/graffiti/clients" "github.com/skydive-project/skydive/graffiti/endpoints" - "github.com/skydive-project/skydive/graffiti/forwarder" "github.com/skydive-project/skydive/graffiti/graph" "github.com/skydive-project/skydive/graffiti/graph/traversal" shttp "github.com/skydive-project/skydive/graffiti/http" @@ -55,7 +55,7 @@ type Pod struct { apiServer *api.Server subscriberWSServer *websocket.StructServer publisherWSServer *websocket.StructServer - forwarder *forwarder.Forwarder + forwarder *clients.Forwarder clientPool *websocket.StructClientPool traversalParser *traversal.GremlinTraversalParser } @@ -120,7 +120,7 @@ func (p *Pod) SubscriberServer() *websocket.StructServer { } // Forwarder returns the pod topology forwarder -func (p *Pod) Forwarder() *forwarder.Forwarder { +func (p *Pod) Forwarder() *clients.Forwarder { return p.forwarder } @@ -184,7 +184,7 @@ func NewPod(id string, serviceType service.Type, listen string, podEndpoint stri tr := traversal.NewGremlinTraversalParser() endpoints.NewSubscriberEndpoint(subscriberWSServer, g, tr, opts.Logger) - forwarder := forwarder.NewForwarder(g, clientPool, logging.GetLogger()) + forwarder := clients.NewForwarder(g, clientPool, nil, logging.GetLogger()) publisherWSServer := websocket.NewStructServer(newWSServer("/ws/publisher", opts.APIAuthBackend)) endpoints.NewPublisherEndpoint(publisherWSServer, g, opts.GraphValidator, opts.Logger) diff --git 
a/rbac/policy.csv b/rbac/policy.csv index 36850e207d..830126764a 100644 --- a/rbac/policy.csv +++ b/rbac/policy.csv @@ -17,6 +17,7 @@ p, admin, websocket, /ws/subscriber/flow, allow p, admin, websocket, /ws/publisher, allow p, admin, websocket, /ws/replication, allow p, admin, websocket, /ws/subscriber, allow +p, admin, websocket, /ws/pubsub, allow p, admin, node, read, allow p, admin, node, write, allow p, admin, noderule, read, allow From f6d27272f409161ece576df08a3b93690f60d29c Mon Sep 17 00:00:00 2001 From: Sylvain Baubeau Date: Tue, 10 Aug 2021 09:32:53 +0200 Subject: [PATCH 5/8] websocket: use default queue size if none was specified --- graffiti/hub/hub.go | 4 ++++ graffiti/websocket/client.go | 10 ++++++++-- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/graffiti/hub/hub.go b/graffiti/hub/hub.go index 424b516349..d7ec4ccc6d 100644 --- a/graffiti/hub/hub.go +++ b/graffiti/hub/hub.go @@ -293,6 +293,10 @@ func NewHub(id string, serviceType service.Type, listen string, g *graph.Graph, opts.Logger = logging.GetLogger() } + if opts.WebsocketClientOpts.Logger == nil { + opts.WebsocketClientOpts.Logger = opts.Logger + } + hub := &Hub{ Graph: g, cached: cached, diff --git a/graffiti/websocket/client.go b/graffiti/websocket/client.go index a080ad476f..5573fa4552 100644 --- a/graffiti/websocket/client.go +++ b/graffiti/websocket/client.go @@ -39,8 +39,9 @@ import ( ) const ( - maxMessageSize = 0 - writeWait = 10 * time.Second + maxMessageSize = 0 + writeWait = 10 * time.Second + defaultQueueSize = 10000 ) // ConnState describes the connection state @@ -188,6 +189,7 @@ type ClientOpts struct { func NewClientOpts() ClientOpts { return ClientOpts{ Headers: http.Header{}, + Logger: logging.GetLogger(), } } @@ -479,6 +481,10 @@ func newConn(host string, clientType service.Type, clientProtocol Protocol, url headers = http.Header{} } + if opts.QueueSize == 0 { + opts.QueueSize = defaultQueueSize + } + port, _ := strconv.Atoi(url.Port()) c := &Conn{ 
ConnStatus: ConnStatus{ From 07b5c7fea864f896655b38af726cd076ab7f0b3d Mon Sep 17 00:00:00 2001 From: Sylvain Baubeau Date: Tue, 10 Aug 2021 09:33:26 +0200 Subject: [PATCH 6/8] Report peering status --- graffiti/hub/hub.go | 42 ++++++++++++++++++++++++++++++++---------- 1 file changed, 32 insertions(+), 10 deletions(-) diff --git a/graffiti/hub/hub.go b/graffiti/hub/hub.go index d7ec4ccc6d..4aa9c648f4 100644 --- a/graffiti/hub/hub.go +++ b/graffiti/hub/hub.go @@ -113,13 +113,20 @@ type PeersStatus struct { Outgoers map[string]websocket.ConnStatus } +// PeeredClustersStatus describes the state of peering with an other cluster +type PeeredClustersStatus struct { + Election ElectionStatus + Outgoers []websocket.ConnStatus +} + // Status describes the status of a hub type Status struct { - Alerts ElectionStatus - Pods map[string]websocket.ConnStatus - Peers PeersStatus - Publishers map[string]websocket.ConnStatus - Subscribers map[string]websocket.ConnStatus + Alerts ElectionStatus + Pods map[string]websocket.ConnStatus + Peers PeersStatus + Publishers map[string]websocket.ConnStatus + Subscribers map[string]websocket.ConnStatus + PeeredClusters map[string]PeeredClustersStatus } // GetStatus returns the status of a hub @@ -137,12 +144,27 @@ func (h *Hub) GetStatus() interface{} { peersStatus.Outgoers[speaker.GetRemoteHost()] = speaker.GetStatus() } + peeredClusters := make(map[string]PeeredClustersStatus) + for cluster, peering := range h.clusterPeerings { + outgoers := make([]websocket.ConnStatus, len(peering.peers.GetSpeakers())) + for i, speaker := range peering.peers.GetSpeakers() { + outgoers[i] = speaker.GetStatus() + } + peeredClusters[cluster] = PeeredClustersStatus{ + Election: ElectionStatus{ + IsMaster: peering.masterElection.IsMaster(), + }, + Outgoers: outgoers, + } + } + return &Status{ - Pods: h.podWSServer.GetStatus(), - Peers: peersStatus, - Publishers: h.publisherWSServer.GetStatus(), - Subscribers: h.subscriberWSServer.GetStatus(), - Alerts: 
ElectionStatus{IsMaster: h.alertServer.IsMaster()}, + Pods: h.podWSServer.GetStatus(), + Peers: peersStatus, + Publishers: h.publisherWSServer.GetStatus(), + Subscribers: h.subscriberWSServer.GetStatus(), + Alerts: ElectionStatus{IsMaster: h.alertServer.IsMaster()}, + PeeredClusters: peeredClusters, } } From 67ea9da3100345d233f16a0a4d070da96f1d1cf5 Mon Sep 17 00:00:00 2001 From: Sylvain Baubeau Date: Tue, 10 Aug 2021 20:12:17 +0200 Subject: [PATCH 7/8] Do not forward message to the peer that sent it --- analyzer/server.go | 4 ++-- graffiti/clients/forwarder.go | 26 ++++++++++++++-------- graffiti/clients/seed.go | 4 ++-- graffiti/clients/subscriber.go | 15 +++++++++---- graffiti/endpoints/publisher_endpoint.go | 14 +++++++++++- graffiti/endpoints/pubsub_endpoint.go | 5 +++-- graffiti/endpoints/replication_endpoint.go | 4 ++++ graffiti/endpoints/subscriber_endpoint.go | 22 ++++++++++++++++++ graffiti/hub/hub.go | 8 +++---- graffiti/hub/peering.go | 7 ++++-- graffiti/pod/pod.go | 2 +- 11 files changed, 84 insertions(+), 27 deletions(-) diff --git a/analyzer/server.go b/analyzer/server.go index 51cec9ee4c..2a99162a38 100644 --- a/analyzer/server.go +++ b/analyzer/server.go @@ -261,7 +261,7 @@ func NewServerFromConfig() (*Server, error) { return nil, err } - clusterPeers := make(map[string]hub.PeeringOpts) + clusterPeers := make(map[string]*hub.PeeringOpts) peeredClusters := config.GetStringMapString("analyzer.peers") for clusterName := range peeredClusters { peerConfig := "analyzer.peers." 
+ clusterName @@ -269,7 +269,7 @@ func NewServerFromConfig() (*Server, error) { if err != nil { return nil, fmt.Errorf("unable to get peers endpoints for cluster '%s': %w", clusterName, err) } - peeringOpts := hub.PeeringOpts{ + peeringOpts := &hub.PeeringOpts{ Endpoints: endpoints, PublisherFilter: config.GetString(peerConfig + ".publish_filter"), SubscriptionFilter: config.GetString(peerConfig + ".subscribe_filter"), diff --git a/graffiti/clients/forwarder.go b/graffiti/clients/forwarder.go index ae04cbf929..ddd25f7b3f 100644 --- a/graffiti/clients/forwarder.go +++ b/graffiti/clients/forwarder.go @@ -18,6 +18,8 @@ package clients import ( + "sync/atomic" + "github.com/skydive-project/skydive/graffiti/filters" "github.com/skydive-project/skydive/graffiti/graph" "github.com/skydive-project/skydive/graffiti/logging" @@ -33,6 +35,7 @@ type Forwarder struct { graph *graph.Graph logger logging.Logger nodeFilter *filters.Filter + inhibit atomic.Value } func (t *Forwarder) triggerResync() { @@ -70,7 +73,7 @@ func (t *Forwarder) OnNewMaster(c ws.Speaker) { // OnNodeUpdated graph node updated event. Implements the EventListener interface. func (t *Forwarder) OnNodeUpdated(n *graph.Node, ops []graph.PartiallyUpdatedOp) { - if t.nodeFilter != nil && !t.nodeFilter.Eval(n) { + if t.inhibit.Load() == true || (t.nodeFilter != nil && !t.nodeFilter.Eval(n)) { return } @@ -89,7 +92,7 @@ func (t *Forwarder) OnNodeUpdated(n *graph.Node, ops []graph.PartiallyUpdatedOp) // OnNodeAdded graph node added event. Implements the EventListener interface. func (t *Forwarder) OnNodeAdded(n *graph.Node) { - if t.nodeFilter != nil && !t.nodeFilter.Eval(n) { + if t.inhibit.Load() == true || (t.nodeFilter != nil && !t.nodeFilter.Eval(n)) { return } @@ -100,7 +103,7 @@ func (t *Forwarder) OnNodeAdded(n *graph.Node) { // OnNodeDeleted graph node deleted event. Implements the EventListener interface. 
func (t *Forwarder) OnNodeDeleted(n *graph.Node) { - if t.nodeFilter != nil && !t.nodeFilter.Eval(n) { + if t.inhibit.Load() == true || (t.nodeFilter != nil && !t.nodeFilter.Eval(n)) { return } @@ -111,9 +114,9 @@ func (t *Forwarder) OnNodeDeleted(n *graph.Node) { // OnEdgeUpdated graph edge updated event. Implements the EventListener interface. func (t *Forwarder) OnEdgeUpdated(e *graph.Edge, ops []graph.PartiallyUpdatedOp) { - if t.nodeFilter != nil && + if t.inhibit.Load() == true || (t.nodeFilter != nil && !t.nodeFilter.Eval(t.graph.GetEdge(e.Parent)) && - !t.nodeFilter.Eval(t.graph.GetEdge(e.Child)) { + !t.nodeFilter.Eval(t.graph.GetEdge(e.Child))) { return } @@ -132,9 +135,9 @@ func (t *Forwarder) OnEdgeUpdated(e *graph.Edge, ops []graph.PartiallyUpdatedOp) // OnEdgeAdded graph edge added event. Implements the EventListener interface. func (t *Forwarder) OnEdgeAdded(e *graph.Edge) { - if t.nodeFilter != nil && + if t.inhibit.Load() == true || (t.nodeFilter != nil && !t.nodeFilter.Eval(t.graph.GetEdge(e.Parent)) && - !t.nodeFilter.Eval(t.graph.GetEdge(e.Child)) { + !t.nodeFilter.Eval(t.graph.GetEdge(e.Child))) { return } @@ -145,9 +148,9 @@ func (t *Forwarder) OnEdgeAdded(e *graph.Edge) { // OnEdgeDeleted graph edge deleted event. Implements the EventListener interface. 
func (t *Forwarder) OnEdgeDeleted(e *graph.Edge) { - if t.nodeFilter != nil && + if t.inhibit.Load() == true || (t.nodeFilter != nil && !t.nodeFilter.Eval(t.graph.GetEdge(e.Parent)) && - !t.nodeFilter.Eval(t.graph.GetEdge(e.Child)) { + !t.nodeFilter.Eval(t.graph.GetEdge(e.Child))) { return } @@ -156,6 +159,11 @@ func (t *Forwarder) OnEdgeDeleted(e *graph.Edge) { ) } +// Inhib node and edge forwarding +func (t *Forwarder) Inhib(c ws.Speaker) { + t.inhibit.Store(c != nil) +} + // GetMaster returns the current analyzer the agent is sending its events to func (t *Forwarder) GetMaster() ws.Speaker { return t.masterElection.GetMaster() diff --git a/graffiti/clients/seed.go b/graffiti/clients/seed.go index ab6af1c84b..0775e60459 100644 --- a/graffiti/clients/seed.go +++ b/graffiti/clients/seed.go @@ -103,8 +103,8 @@ func NewSeed(g *graph.Graph, clientType service.Type, address, subscribeFilter, } } - NewForwarder(g, pool, metadataFilter, logger) - subscriber := NewSubscriber(pubsubClient, g, wsOpts.Logger) + forwarder := NewForwarder(g, pool, metadataFilter, logger) + subscriber := NewSubscriber(pubsubClient, g, wsOpts.Logger, forwarder) s := &Seed{ Client: pubsubClient, diff --git a/graffiti/clients/subscriber.go b/graffiti/clients/subscriber.go index f9ee793061..0503763100 100644 --- a/graffiti/clients/subscriber.go +++ b/graffiti/clients/subscriber.go @@ -20,6 +20,7 @@ package clients import ( "net/http" + "github.com/skydive-project/skydive/graffiti/endpoints" "github.com/skydive-project/skydive/graffiti/graph" "github.com/skydive-project/skydive/graffiti/logging" "github.com/skydive-project/skydive/graffiti/messages" @@ -28,8 +29,9 @@ import ( type Subscriber struct { *websocket.StructSpeaker - g *graph.Graph - logger logging.Logger + g *graph.Graph + logger logging.Logger + inhibitor endpoints.Inhibitor } // OnStructMessage callback @@ -53,6 +55,11 @@ func (s *Subscriber) OnStructMessage(c websocket.Speaker, msg *websocket.StructM s.g.Lock() defer s.g.Unlock() + if 
s.inhibitor != nil { + s.inhibitor.Inhib(c) + defer s.inhibitor.Inhib(nil) + } + switch msgType { case messages.SyncMsgType, messages.SyncReplyMsgType: r := obj.(*messages.SyncMsg) @@ -94,14 +101,14 @@ func (s *Subscriber) OnStructMessage(c websocket.Speaker, msg *websocket.StructM } } -func NewSubscriber(client *websocket.Client, g *graph.Graph, logger logging.Logger) *Subscriber { +func NewSubscriber(client *websocket.Client, g *graph.Graph, logger logging.Logger, inibitor endpoints.Inhibitor) *Subscriber { structSpeaker := client.UpgradeToStructSpeaker() subscriber := &Subscriber{ StructSpeaker: structSpeaker, g: g, logger: logger, + inhibitor: inibitor, } - structSpeaker.AddEventHandler(subscriber) subscriber.AddStructMessageHandler(subscriber, []string{messages.Namespace}) return subscriber } diff --git a/graffiti/endpoints/publisher_endpoint.go b/graffiti/endpoints/publisher_endpoint.go index 9cf9a9200f..62679b91d2 100644 --- a/graffiti/endpoints/publisher_endpoint.go +++ b/graffiti/endpoints/publisher_endpoint.go @@ -39,6 +39,11 @@ const ( DeleteOnDisconnect PersistencePolicy = "DeleteOnDisconnect" ) +// Inhibitor is used to disable graph events on a forwarder +type Inhibitor interface { + Inhib(ws.Speaker) +} + // PublisherEndpoint serves the graph for external publishers, for instance // an external program that interacts with the Skydive graph. type PublisherEndpoint struct { @@ -49,6 +54,7 @@ type PublisherEndpoint struct { validator server.Validator authors map[string]bool logger logging.Logger + inhibitor Inhibitor } // OnDisconnected called when a publisher got disconnected. 
@@ -117,6 +123,11 @@ func (t *PublisherEndpoint) OnStructMessage(c ws.Speaker, msg *ws.StructMessage) t.Graph.Lock() defer t.Graph.Unlock() + if t.inhibitor != nil { + t.inhibitor.Inhib(c) + defer t.inhibitor.Inhib(nil) + } + switch msgType { case messages.SyncRequestMsgType: reply := msg.Reply(t.Graph, messages.SyncReplyMsgType, http.StatusOK) @@ -169,7 +180,7 @@ func (t *PublisherEndpoint) OnStructMessage(c ws.Speaker, msg *ws.StructMessage) } // NewPublisherEndpoint returns a new server for external publishers. -func NewPublisherEndpoint(pool ws.StructSpeakerPool, g *graph.Graph, validator server.Validator, logger logging.Logger) *PublisherEndpoint { +func NewPublisherEndpoint(pool ws.StructSpeakerPool, g *graph.Graph, validator server.Validator, logger logging.Logger, inhibitor Inhibitor) *PublisherEndpoint { if logger == nil { logger = logging.GetLogger() } @@ -180,6 +191,7 @@ func NewPublisherEndpoint(pool ws.StructSpeakerPool, g *graph.Graph, validator s validator: validator, authors: make(map[string]bool), logger: logger, + inhibitor: inhibitor, } pool.AddEventHandler(t) diff --git a/graffiti/endpoints/pubsub_endpoint.go b/graffiti/endpoints/pubsub_endpoint.go index 080b5ad730..74ebb5fca1 100644 --- a/graffiti/endpoints/pubsub_endpoint.go +++ b/graffiti/endpoints/pubsub_endpoint.go @@ -34,8 +34,9 @@ type PubSubEndpoint struct { // NewPubSubEndpoint returns a new PubSub endpoint func NewPubSubEndpoint(pool ws.StructSpeakerPool, validator server.Validator, g *graph.Graph, tr *traversal.GremlinTraversalParser, logger logging.Logger) *PubSubEndpoint { + subscriberEndpoint := NewSubscriberEndpoint(pool, g, tr, logger) return &PubSubEndpoint{ - publisherEndpoint: NewPublisherEndpoint(pool, g, validator, logger), - subscriberEndpoint: NewSubscriberEndpoint(pool, g, tr, logger), + publisherEndpoint: NewPublisherEndpoint(pool, g, validator, logger, subscriberEndpoint), + subscriberEndpoint: subscriberEndpoint, } } diff --git 
a/graffiti/endpoints/replication_endpoint.go b/graffiti/endpoints/replication_endpoint.go index aaa53cab89..d27b17e387 100644 --- a/graffiti/endpoints/replication_endpoint.go +++ b/graffiti/endpoints/replication_endpoint.go @@ -96,6 +96,8 @@ func (p *ReplicatorPeer) OnConnected(c ws.Speaker) error { return errAlreadyConnected } + p.endpoint.logger.Infof("Connected to peer %s for replication", host) + p.endpoint.peerStates[host] = state state.cnt++ @@ -372,6 +374,8 @@ func (t *ReplicationEndpoint) OnConnected(c ws.Speaker) error { return errAlreadyConnected } + t.logger.Infof("New replication incomer %s", host) + t.peerStates[host] = state state.cnt++ diff --git a/graffiti/endpoints/subscriber_endpoint.go b/graffiti/endpoints/subscriber_endpoint.go index 8fb5b1e466..e2368afa08 100644 --- a/graffiti/endpoints/subscriber_endpoint.go +++ b/graffiti/endpoints/subscriber_endpoint.go @@ -22,6 +22,7 @@ import ( "net/http" "strings" "sync" + "sync/atomic" "github.com/safchain/insanelock" @@ -29,6 +30,7 @@ import ( "github.com/skydive-project/skydive/graffiti/graph/traversal" "github.com/skydive-project/skydive/graffiti/logging" "github.com/skydive-project/skydive/graffiti/messages" + "github.com/skydive-project/skydive/graffiti/websocket" ws "github.com/skydive-project/skydive/graffiti/websocket" ) @@ -47,6 +49,7 @@ type subscriber struct { gremlinFilter string ts *traversal.GremlinTraversalSequence updatePolicy UpdatePolicy + inhibit atomic.Value } func (s *subscriber) getSubGraph(g *graph.Graph, lockGraph bool) (*graph.Graph, error) { @@ -79,6 +82,7 @@ type SubscriberEndpoint struct { gremlinParser *traversal.GremlinTraversalParser subscribers map[ws.Speaker]*subscriber logger logging.Logger + inhib atomic.Value } func (t *SubscriberEndpoint) newSubscriber(speaker ws.Speaker, gremlinFilter string, lockGraph bool) (s *subscriber, err error) { @@ -222,10 +226,17 @@ func (t *SubscriberEndpoint) notifyClients(typ string, i interface{}, ops []grap } t.RUnlock() +SUBSCRIBER: 
for _, subscriber := range subscribers { msg := i msgType := typ + if inhibitedSpeaker := t.inhib.Load(); inhibitedSpeaker.(string) != "" { + if inhibitedSpeaker.(string) == subscriber.Speaker.GetRemoteHost() { + continue SUBSCRIBER + } + } + if subscriber.updatePolicy == PartialUpdates { switch typ { case messages.NodeUpdatedMsgType: @@ -323,6 +334,15 @@ func (t *SubscriberEndpoint) OnEdgeDeleted(e *graph.Edge) { t.notifyClients(messages.EdgeDeletedMsgType, e, nil) } +// Inhib node and edge forwarding +func (t *SubscriberEndpoint) Inhib(c websocket.Speaker) { + remoteHost := "" + if c != nil { + remoteHost = c.GetRemoteHost() + } + t.inhib.Store(remoteHost) +} + // NewSubscriberEndpoint returns a new server to be used by external subscribers, // for instance the WebUI. func NewSubscriberEndpoint(pool ws.StructSpeakerPool, g *graph.Graph, tr *traversal.GremlinTraversalParser, logger logging.Logger) *SubscriberEndpoint { @@ -338,6 +358,8 @@ func NewSubscriberEndpoint(pool ws.StructSpeakerPool, g *graph.Graph, tr *traver logger: logger, } + t.inhib.Store("") + pool.AddEventHandler(t) // subscribe to the graph messages diff --git a/graffiti/hub/hub.go b/graffiti/hub/hub.go index 4aa9c648f4..3e009e6727 100644 --- a/graffiti/hub/hub.go +++ b/graffiti/hub/hub.go @@ -64,7 +64,7 @@ type Opts struct { APIAuthBackend shttp.AuthenticationBackend ClusterAuthBackend shttp.AuthenticationBackend ReplicationPeers []service.Address - ClusterPeers map[string]PeeringOpts + ClusterPeers map[string]*PeeringOpts TLSConfig *tls.Config EtcdClient *etcdclient.Client EtcdServerOpts *etcdserver.EmbeddedServerOpts @@ -342,12 +342,12 @@ func NewHub(id string, serviceType service.Type, listen string, g *graph.Graph, podOpts.AuthBackend = opts.ClusterAuthBackend podOpts.PongListeners = []websocket.PongListener{hub} podWSServer := websocket.NewStructServer(websocket.NewServer(httpServer, podEndpoint, podOpts)) - endpoints.NewPublisherEndpoint(podWSServer, g, nil, opts.Logger) + 
endpoints.NewPublisherEndpoint(podWSServer, g, nil, opts.Logger, nil) pubOpts := opts.WebsocketOpts pubOpts.AuthBackend = opts.APIAuthBackend publisherWSServer := websocket.NewStructServer(websocket.NewServer(httpServer, "/ws/publisher", pubOpts)) - endpoints.NewPublisherEndpoint(publisherWSServer, g, opts.GraphValidator, opts.Logger) + endpoints.NewPublisherEndpoint(publisherWSServer, g, opts.GraphValidator, opts.Logger, nil) repOpts := opts.WebsocketOpts repOpts.AuthBackend = opts.ClusterAuthBackend @@ -419,7 +419,7 @@ func NewHub(id string, serviceType service.Type, listen string, g *graph.Graph, if peeringOpts.SubscriptionFilter != "*" { peeringOpts.WebsocketClientOpts.Headers.Add("X-Gremlin-Filter", peeringOpts.SubscriptionFilter) } - client = clients.NewSubscriber(wsClient, g, opts.Logger) + client = clients.NewSubscriber(wsClient, g, opts.Logger, nil) default: url, _ := http.MakeURL("ws", peer.Addr, peer.Port, "/ws/publisher", peeringOpts.WebsocketClientOpts.TLSConfig != nil) client = websocket.NewClient(id, serviceType, url, peeringOpts.WebsocketClientOpts) diff --git a/graffiti/hub/peering.go b/graffiti/hub/peering.go index 5f66aef140..fd4bf1024d 100644 --- a/graffiti/hub/peering.go +++ b/graffiti/hub/peering.go @@ -33,8 +33,11 @@ func (p *clusterPeering) OnStartAsSlave() { } func (p *clusterPeering) OnSwitchToSlave() { - p.cancel() - p.wg.Wait() + if p.cancel != nil { + p.cancel() + p.wg.Wait() + p.cancel = nil + } } func (p *clusterPeering) OnNewMaster(c websocket.Speaker) { diff --git a/graffiti/pod/pod.go b/graffiti/pod/pod.go index 6ef385d551..ccbc70b50b 100644 --- a/graffiti/pod/pod.go +++ b/graffiti/pod/pod.go @@ -187,7 +187,7 @@ func NewPod(id string, serviceType service.Type, listen string, podEndpoint stri forwarder := clients.NewForwarder(g, clientPool, nil, logging.GetLogger()) publisherWSServer := websocket.NewStructServer(newWSServer("/ws/publisher", opts.APIAuthBackend)) - endpoints.NewPublisherEndpoint(publisherWSServer, g, 
opts.GraphValidator, opts.Logger) + endpoints.NewPublisherEndpoint(publisherWSServer, g, opts.GraphValidator, opts.Logger, nil) api.RegisterTopologyAPI(httpServer, g, tr, opts.APIAuthBackend, opts.TopologyMarshallers) From 842d08a46b3af837057c7567cf8b73a1910cf22e Mon Sep 17 00:00:00 2001 From: Sylvain Baubeau Date: Tue, 10 Aug 2021 20:26:44 +0200 Subject: [PATCH 8/8] Add tests on replication and peering --- graffiti/hub/hub_test.go | 474 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 474 insertions(+) create mode 100644 graffiti/hub/hub_test.go diff --git a/graffiti/hub/hub_test.go b/graffiti/hub/hub_test.go new file mode 100644 index 0000000000..55ab2fcf0e --- /dev/null +++ b/graffiti/hub/hub_test.go @@ -0,0 +1,474 @@ +/* + * Copyright (C) 2021 Sylvain Baubeau + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package hub + +import ( + "fmt" + "os" + "testing" + "time" + + "github.com/skydive-project/skydive/graffiti/api/types" + etcdclient "github.com/skydive-project/skydive/graffiti/etcd/client" + etcdserver "github.com/skydive-project/skydive/graffiti/etcd/server" + "github.com/skydive-project/skydive/graffiti/graph" + shttp "github.com/skydive-project/skydive/graffiti/http" + "github.com/skydive-project/skydive/graffiti/logging" + "github.com/skydive-project/skydive/graffiti/service" + "github.com/skydive-project/skydive/graffiti/websocket" +) + +var ( + hubCount = 0 + clusterCount = 0 +) + +type testCluster struct { + hubs []*testHub +} + +func (c *testCluster) start(t *testing.T) { + startHubs(t, c.hubs) +} + +func (c *testCluster) GetNode(id graph.Identifier) error { + for _, hub := range c.hubs { + if err := hub.GetNode(id); err != nil { + return err + } + } + return nil +} + +type testHub struct { + *Hub + crudClient *shttp.CrudClient +} + +func (h *testHub) GetNode(id graph.Identifier) error { + var node graph.Node + if err := h.crudClient.Get("node", string(id), &node); err != nil { + return err + } + return nil +} + +func newTestHub(index int, opts Opts) (*Hub, error) { + authBackend := shttp.NewNoAuthenticationBackend() + + etcdAddr := fmt.Sprintf("http://%s:%d", etcdclient.DefaultServer, etcdclient.DefaultPort+(clusterCount*2)) + + etcdClientOpts := etcdclient.Opts{ + Servers: []string{etcdAddr}, + Timeout: 5 * time.Second, + } + + hostname := opts.Hostname + if hostname == "" { + hostname, _ = os.Hostname() + } + opts.Hostname = hostname + + etcdClient, err := etcdclient.NewClient(hostname, etcdClientOpts) + if err != nil { + return nil, err + } + + origin := "graffiti-hub" + cached, err := graph.NewCachedBackend(nil, etcdClient, hostname, service.Type(origin)) + if err != nil { + return nil, err + } + + if opts.APIAuthBackend == nil { + opts.APIAuthBackend = authBackend + } + + if opts.ClusterAuthBackend == nil { + opts.ClusterAuthBackend = 
authBackend + } + + g := graph.NewGraph(hostname, cached, origin) + + opts.WebsocketOpts = websocket.ServerOpts{ + WriteCompression: false, + QueueSize: 10000, + PingDelay: time.Second * time.Duration(2), + PongTimeout: time.Second * time.Duration(10), + Logger: opts.Logger, + } + + opts.EtcdClient = etcdClient + + if index == 0 { + opts.EtcdServerOpts = &etcdserver.EmbeddedServerOpts{ + Name: "localhost", + Listen: fmt.Sprintf("127.0.0.1:%d", 12379+(clusterCount*2)), + DataDir: "/tmp/etcd-" + hostname, + Logger: opts.Logger, + } + } + + hub, err := NewHub(hostname, service.Type("Hub"), fmt.Sprintf("127.0.0.1:%d", 8082+hubCount), g, cached, "/ws/pod", opts) + if err != nil { + return nil, err + } + + return hub, nil +} + +func newTestCluster(t *testing.T, name string, count int, defaultOpts Opts) (*testCluster, error) { + hubs := make([]*testHub, count) + + oldHubCount := hubCount + for i := 0; i < count; i++ { + var replicationPeers []service.Address + if count > 1 { + for j := 0; j < count; j++ { + if i != j { + sa, err := service.AddressFromString(fmt.Sprintf("127.0.0.1:%d", 8082+oldHubCount+j)) + if err != nil { + return nil, err + } + replicationPeers = append(replicationPeers, sa) + } + } + } + + hostname := fmt.Sprintf("%s-%d", name, i) + logging.InitLogging(hostname, true, []*logging.LoggerConfig{logging.NewLoggerConfig(logging.NewStdioBackend(os.Stderr), "DEBUG", "")}) + + opts := defaultOpts + + for cluster := range opts.ClusterPeers { + peeringOpts := opts.ClusterPeers[cluster] + peeringOpts.WebsocketClientOpts = websocket.NewClientOpts() + } + + opts.ClusterName = name + opts.Hostname = hostname + opts.ReplicationPeers = replicationPeers + opts.Logger = logging.GetLogger() + + t.Logf("Hub %s options: %+v", name, opts) + + hub, err := newTestHub(i, opts) + if err != nil { + return nil, err + } + + hubCount++ + + crudClient, err := getHubClient(hub) + if err != nil { + return nil, err + } + + hubs[i] = &testHub{ + Hub: hub, + crudClient: crudClient, + } + 
} + + clusterCount++ + return &testCluster{ + hubs: hubs, + }, nil +} + +func getHubClient(hub *Hub) (*shttp.CrudClient, error) { + var authenticationOpts shttp.AuthenticationOpts + url, err := shttp.MakeURL("http", "127.0.0.1", hub.httpServer.Port, "/api/", false) + if err != nil { + return nil, err + } + + restClient := shttp.NewRestClient(url, &authenticationOpts, nil) + crudClient := shttp.NewCrudClient(restClient) + return crudClient, nil +} + +func TestReplication(t *testing.T) { + clusterSize := 2 + cluster, err := newTestCluster(t, "replication", clusterSize, Opts{}) + if err != nil { + t.Fatal(err) + } + + cluster.start(t) + + for _, hub := range cluster.hubs { + status := hub.GetStatus().(*Status) + + if len(status.Peers.Incomers)+len(status.Peers.Outgoers) != clusterSize-1 { + t.Errorf("Wrong number of incomers %d and outgoers %d", len(status.Peers.Incomers), len(status.Peers.Outgoers)) + } + + t.Logf("%+v", status) + } + + firstHub := cluster.hubs[0] + crudClient := firstHub.crudClient + + hostname := firstHub.httpServer.Host + origin := graph.Origin(hostname, "test") + metadata := graph.Metadata{ + "Name": "test-name", + "Type": "test-type", + } + id := graph.GenID() + node := types.Node(*graph.CreateNode(id, metadata, graph.Time(time.Now()), hostname, origin)) + + if err := crudClient.Create("node", &node, nil); err != nil { + t.Error(err) + } + + if err := crudClient.Get("node", string(id), &node); err != nil { + t.Error(err) + } + + crudClient2, err := getHubClient(cluster.hubs[1].Hub) + if err != nil { + t.Fatal(err) + } + + if err := crudClient2.Get("node", string(id), &node); err != nil { + t.Error(err) + } + + if err := crudClient2.Delete("node", string(id)); err != nil { + t.Error(err) + } + + if err := crudClient.Get("node", string(id), &node); err == nil { + t.Errorf("should not find node %s", id) + } + + t.Log(node) +} + +func createHubNode(hub *Hub, id graph.Identifier, metadata graph.Metadata) types.Node { + hostname := hub.httpServer.Host 
+ origin := graph.Origin(hostname, "test") + return types.Node(*graph.CreateNode(id, metadata, graph.Time(time.Now()), hostname, origin)) +} + +func startHubs(t *testing.T, hubs []*testHub) { + for i, hub := range hubs { + t.Logf("Starting hub %d", i) + + if err := hub.Start(); err != nil { + t.Error(err) + } + } + time.Sleep(3 * time.Second) +} + +func TestPeering(t *testing.T) { + clusterSize := 2 + + peering1Cluster, err := newTestCluster(t, "peering1", clusterSize, Opts{}) + if err != nil { + t.Fatal(err) + } + + sa := service.Address{ + Addr: peering1Cluster.hubs[0].httpServer.Addr, + Port: peering1Cluster.hubs[0].httpServer.Port, + } + + opts := Opts{ + ClusterPeers: map[string]*PeeringOpts{ + "peering1": { + Endpoints: []service.Address{sa}, + SubscriptionFilter: "*", + }, + }, + } + + peering2Cluster, err := newTestCluster(t, "peering2", clusterSize, opts) + if err != nil { + t.Fatal(err) + } + + hubs := append(peering1Cluster.hubs, peering2Cluster.hubs...) + + opts = Opts{ + ClusterPeers: map[string]*PeeringOpts{ + "peering1": { + Endpoints: []service.Address{sa}, + PublisherFilter: "*", + }, + }, + } + + peering3Cluster, err := newTestCluster(t, "peering3", clusterSize, opts) + if err != nil { + t.Fatal(err) + } + + hubs = append(hubs, peering3Cluster.hubs...) + + opts = Opts{ + ClusterPeers: map[string]*PeeringOpts{ + "peering1": { + Endpoints: []service.Address{sa}, + PublisherFilter: "*", + SubscriptionFilter: "*", + }, + }, + } + + peering4Cluster, err := newTestCluster(t, "peering4", clusterSize, opts) + if err != nil { + t.Fatal(err) + } + + hubs = append(hubs, peering4Cluster.hubs...) 
+ + startHubs(t, hubs) + + peering1Hub := peering1Cluster.hubs[0] + + peering3Hub := peering3Cluster.hubs[0] + peering4Hub := peering4Cluster.hubs[0] + + metadata := graph.Metadata{ + "Name": "test-name", + "Type": "test-type", + } + id := graph.GenID() + + node := createHubNode(peering1Hub.Hub, id, metadata) + + // Test subscription + t.Logf("Test subscription") + + if err := peering1Hub.crudClient.Create("node", &node, nil); err != nil { + t.Error(err) + } + + time.Sleep(200 * time.Millisecond) + + if err := peering1Cluster.GetNode(id); err != nil { + t.Error(err) + } + + status := peering2Cluster.hubs[0].GetStatus().(*Status) + + if len(status.PeeredClusters) != 1 { + t.Errorf("Should have one peered cluster, got %d", len(status.PeeredClusters)) + } + + t.Logf("%+v", status) + + status = peering2Cluster.hubs[1].GetStatus().(*Status) + t.Logf("%+v", status) + + if err := peering2Cluster.GetNode(id); err != nil { + t.Error(err) + } + + if err := peering1Hub.crudClient.Delete("node", string(id)); err != nil { + t.Error(err) + } + + time.Sleep(200 * time.Millisecond) + + if err := peering1Cluster.GetNode(id); err == nil { + t.Errorf("should not find node %s", id) + } + + if err := peering2Cluster.GetNode(id); err == nil { + t.Errorf("should not find node %s", id) + } + + // Test publishing + t.Logf("Test publishing") + + node = createHubNode(peering3Cluster.hubs[0].Hub, id, metadata) + + if err := peering3Hub.crudClient.Create("node", &node, nil); err != nil { + t.Error(err) + } + + time.Sleep(200 * time.Millisecond) + + if err := peering1Cluster.GetNode(id); err != nil { + t.Error(err) + } + + if err := peering2Cluster.GetNode(id); err != nil { + t.Error(err) + } + + if err := peering3Cluster.GetNode(id); err != nil { + t.Error(err) + } + + if err := peering4Cluster.GetNode(id); err != nil { + t.Error(err) + } + + if err := peering3Hub.crudClient.Delete("node", string(id)); err != nil { + t.Error(err) + } + + time.Sleep(200 * time.Millisecond) + + if err := 
peering1Cluster.GetNode(id); err == nil { + t.Errorf("should not find node %s", id) + } + + if err := peering2Cluster.GetNode(id); err == nil { + t.Errorf("should not find node %s", id) + } + + if err := peering3Cluster.GetNode(id); err == nil { + t.Errorf("should not find node %s", id) + } + + if err := peering4Cluster.GetNode(id); err == nil { + t.Errorf("should not find node %s", id) + } + + // Test pubsub + t.Logf("Test pubsub") + + node = createHubNode(peering4Cluster.hubs[0].Hub, id, metadata) + + if err := peering4Hub.crudClient.Create("node", &node, nil); err != nil { + t.Error(err) + } + + time.Sleep(200 * time.Millisecond) + + if err := peering1Cluster.GetNode(id); err != nil { + t.Error(err) + } + + if err := peering2Cluster.GetNode(id); err != nil { + t.Error(err) + } + + if err := peering3Cluster.GetNode(id); err == nil { + t.Errorf("should not find node %s", id) + } + + t.Log(node) +}