Skip to content

Commit

Permalink
Merge pull request #2369 from lebauce/peering
Browse files Browse the repository at this point in the history
Add cluster peering support
  • Loading branch information
lebauce committed Feb 16, 2023
2 parents 645135a + 842d08a commit 46a5fcb
Show file tree
Hide file tree
Showing 25 changed files with 1,187 additions and 301 deletions.
35 changes: 32 additions & 3 deletions analyzer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,16 +251,43 @@ func NewServerFromConfig() (*Server, error) {
return nil, err
}

peers, err := config.GetAnalyzerServiceAddresses()
replicationPeers, err := config.GetAnalyzerServiceAddresses()
if err != nil {
return nil, fmt.Errorf("Unable to get the analyzers list: %s", err)
return nil, fmt.Errorf("unable to get the analyzers list: %s", err)
}

wsClientOpts, err := config.NewWSClientOpts(ClusterAuthenticationOpts())
if err != nil {
return nil, err
}

clusterPeers := make(map[string]*hub.PeeringOpts)
peeredClusters := config.GetStringMapString("analyzer.peers")
for clusterName := range peeredClusters {
peerConfig := "analyzer.peers." + clusterName
endpoints, err := config.GetServiceAddresses(peerConfig + ".endpoints")
if err != nil {
return nil, fmt.Errorf("unable to get peers endpoints for cluster '%s': %w", clusterName, err)
}
peeringOpts := &hub.PeeringOpts{
Endpoints: endpoints,
PublisherFilter: config.GetString(peerConfig + ".publish_filter"),
SubscriptionFilter: config.GetString(peerConfig + ".subscribe_filter"),
}

if wsClientOpts, err := config.NewWSClientOpts(
&shttp.AuthenticationOpts{
Username: config.GetString(peerConfig + ".username"),
Password: config.GetString(peerConfig + ".password"),
},
); err == nil {
peeringOpts.WebsocketClientOpts = *wsClientOpts
}

clusterPeers[clusterName] = peeringOpts

}

probeBundle, err := NewTopologyProbeBundleFromConfig(g)
if err != nil {
return nil, err
Expand All @@ -281,11 +308,13 @@ func NewServerFromConfig() (*Server, error) {
GraphValidator: topology.SchemaValidator,
StatusReporter: s,
TLSConfig: tlsConfig,
Peers: peers,
ReplicationPeers: replicationPeers,
ClusterPeers: clusterPeers,
EtcdClient: etcdClient,
TopologyMarshallers: api.TopologyMarshallers,
Assets: &statics.Assets,
Version: version.Version,
ClusterName: config.GetString("analyzer.cluster"),
}

if config.GetBool("etcd.embedded") {
Expand Down
3 changes: 1 addition & 2 deletions cmd/client/edgerule.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"os"

api "github.com/skydive-project/skydive/api/types"
gclient "github.com/skydive-project/skydive/graffiti/cmd/client"
"github.com/skydive-project/skydive/graffiti/graph"
"github.com/skydive-project/skydive/graffiti/logging"
"github.com/skydive-project/skydive/validator"
Expand Down Expand Up @@ -57,7 +56,7 @@ var EdgeRuleCreate = &cobra.Command{
}
},
Run: func(cmd *cobra.Command, args []string) {
m, err := gclient.DefToMetadata(metadata, graph.Metadata{})
m, err := graph.DefToMetadata(metadata, graph.Metadata{})
if err != nil {
exitOnError(err)
}
Expand Down
3 changes: 1 addition & 2 deletions cmd/client/noderule.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"os"

api "github.com/skydive-project/skydive/api/types"
gclient "github.com/skydive-project/skydive/graffiti/cmd/client"
"github.com/skydive-project/skydive/graffiti/graph"
"github.com/skydive-project/skydive/graffiti/logging"
"github.com/skydive-project/skydive/validator"
Expand Down Expand Up @@ -57,7 +56,7 @@ var NodeRuleCreate = &cobra.Command{
SilenceUsage: false,

Run: func(cmd *cobra.Command, args []string) {
m, err := gclient.DefToMetadata(metadata, graph.Metadata{})
m, err := graph.DefToMetadata(metadata, graph.Metadata{})
if err != nil {
exitOnError(err)
}
Expand Down
7 changes: 3 additions & 4 deletions cmd/seed/seed.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ import (

"github.com/skydive-project/skydive/agent"
"github.com/skydive-project/skydive/config"
"github.com/skydive-project/skydive/graffiti/clients"
"github.com/skydive-project/skydive/graffiti/graph"
"github.com/skydive-project/skydive/graffiti/http"
"github.com/skydive-project/skydive/graffiti/logging"
"github.com/skydive-project/skydive/graffiti/seed"
"github.com/skydive-project/skydive/graffiti/websocket"
"github.com/skydive-project/skydive/probe"
tp "github.com/skydive-project/skydive/topology/probes"
Expand Down Expand Up @@ -104,7 +104,7 @@ var SeedCmd = &cobra.Command{
os.Exit(1)
}

origin := graph.Origin(hostID, seed.Service)
origin := graph.Origin(hostID, clients.SeedService)
g := graph.NewGraph(hostID, memory, origin)

probeBundle = probe.NewBundle()
Expand All @@ -121,13 +121,12 @@ var SeedCmd = &cobra.Command{
TLSConfig: tlsConfig,
}

seed, err := seed.NewSeed(g, seed.Service, agentAddr, subscriberFilter, *wsOpts, logging.GetLogger())
seed, err := clients.NewSeed(g, clients.SeedService, agentAddr, subscriberFilter, "", *wsOpts, logging.GetLogger())
if err != nil {
logging.GetLogger().Errorf("Failed to start seed: %s", err)
os.Exit(1)
}

seed.AddEventHandler(&seedHandler{g: g, probes: args})
seed.Start()

logging.GetLogger().Notice("Skydive seed started")
Expand Down
32 changes: 26 additions & 6 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,15 +365,15 @@ func (c *SkydiveConfig) SetDefault(key string, value interface{}) {
c.Viper.SetDefault(key, value)
}

// GetAnalyzerServiceAddresses returns a list of connectable Analyzers
func GetAnalyzerServiceAddresses() ([]service.Address, error) {
return cfg.GetAnalyzerServiceAddresses()
// GetServiceAddresses returns a list of services
func GetServiceAddresses(section string) ([]service.Address, error) {
return cfg.GetServiceAddresses(section)
}

// GetAnalyzerServiceAddresses returns a list of connectable Analyzers
func (c *SkydiveConfig) GetAnalyzerServiceAddresses() ([]service.Address, error) {
// GetServiceAddresses returns a list of services
func (c *SkydiveConfig) GetServiceAddresses(section string) ([]service.Address, error) {
var addresses []service.Address
for _, a := range c.GetStringSlice("analyzers") {
for _, a := range c.GetStringSlice(section) {
sa, err := service.AddressFromString(a)
if err != nil {
return nil, err
Expand All @@ -390,6 +390,16 @@ func (c *SkydiveConfig) GetAnalyzerServiceAddresses() ([]service.Address, error)
return addresses, nil
}

// GetAnalyzerServiceAddresses returns a list of connectable Analyzers
func GetAnalyzerServiceAddresses() ([]service.Address, error) {
return cfg.GetAnalyzerServiceAddresses()
}

// GetAnalyzerServiceAddresses returns a list of analyzers to connect to
func (c *SkydiveConfig) GetAnalyzerServiceAddresses() ([]service.Address, error) {
return c.GetServiceAddresses("analyzers")
}

// GetOneAnalyzerServiceAddress returns a random connectable Analyzer
func GetOneAnalyzerServiceAddress() (service.Address, error) {
return cfg.GetOneAnalyzerServiceAddress()
Expand Down Expand Up @@ -557,6 +567,16 @@ func (c *SkydiveConfig) GetStringMapString(key string) map[string]string {
return c.Viper.GetStringMapString(realKey(key))
}

// GetStringMap returns a map of strings from the configuration
func GetStringMap(key string) map[string]interface{} {
return cfg.GetStringMap(key)
}

// GetStringMapString returns a map of strings from the configuration
func (c *SkydiveConfig) GetStringMap(key string) map[string]interface{} {
return c.Viper.GetStringMap(realKey(key))
}

// BindPFlag binds a command line flag to a configuration value
func BindPFlag(key string, flag *pflag.Flag) error {
return cfg.BindPFlag(key, flag)
Expand Down
2 changes: 2 additions & 0 deletions config/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package config

import (
"net/http"
"net/url"
"time"

Expand All @@ -39,6 +40,7 @@ func NewWSClientOpts(authOpts *shttp.AuthenticationOpts) (*websocket.ClientOpts,
WriteCompression: GetBool("http.ws.enable_write_compression"),
TLSConfig: tlsConfig,
AuthOpts: authOpts,
Headers: http.Header{},
}, nil
}

Expand Down
45 changes: 43 additions & 2 deletions graffiti/forwarder/forwarder.go → graffiti/clients/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,12 @@
*
*/

package forwarder
package clients

import (
"sync/atomic"

"github.com/skydive-project/skydive/graffiti/filters"
"github.com/skydive-project/skydive/graffiti/graph"
"github.com/skydive-project/skydive/graffiti/logging"
"github.com/skydive-project/skydive/graffiti/messages"
Expand All @@ -31,6 +34,8 @@ type Forwarder struct {
masterElection *ws.MasterElection
graph *graph.Graph
logger logging.Logger
nodeFilter *filters.Filter
inhibit atomic.Value
}

func (t *Forwarder) triggerResync() {
Expand Down Expand Up @@ -68,6 +73,10 @@ func (t *Forwarder) OnNewMaster(c ws.Speaker) {

// OnNodeUpdated graph node updated event. Implements the EventListener interface.
func (t *Forwarder) OnNodeUpdated(n *graph.Node, ops []graph.PartiallyUpdatedOp) {
if t.inhibit.Load() == true || (t.nodeFilter != nil && !t.nodeFilter.Eval(n)) {
return
}

t.masterElection.SendMessageToMaster(
messages.NewStructMessage(
messages.NodePartiallyUpdatedMsgType,
Expand All @@ -83,20 +92,34 @@ func (t *Forwarder) OnNodeUpdated(n *graph.Node, ops []graph.PartiallyUpdatedOp)

// OnNodeAdded graph node added event. Implements the EventListener interface.
func (t *Forwarder) OnNodeAdded(n *graph.Node) {
if t.inhibit.Load() == true || (t.nodeFilter != nil && !t.nodeFilter.Eval(n)) {
return
}

t.masterElection.SendMessageToMaster(
messages.NewStructMessage(messages.NodeAddedMsgType, n),
)
}

// OnNodeDeleted graph node deleted event. Implements the EventListener interface.
func (t *Forwarder) OnNodeDeleted(n *graph.Node) {
if t.inhibit.Load() == true || (t.nodeFilter != nil && !t.nodeFilter.Eval(n)) {
return
}

t.masterElection.SendMessageToMaster(
messages.NewStructMessage(messages.NodeDeletedMsgType, n),
)
}

// OnEdgeUpdated graph edge updated event. Implements the EventListener interface.
func (t *Forwarder) OnEdgeUpdated(e *graph.Edge, ops []graph.PartiallyUpdatedOp) {
if t.inhibit.Load() == true || (t.nodeFilter != nil &&
!t.nodeFilter.Eval(t.graph.GetEdge(e.Parent)) &&
!t.nodeFilter.Eval(t.graph.GetEdge(e.Child))) {
return
}

t.masterElection.SendMessageToMaster(
messages.NewStructMessage(
messages.EdgePartiallyUpdatedMsgType,
Expand All @@ -112,26 +135,43 @@ func (t *Forwarder) OnEdgeUpdated(e *graph.Edge, ops []graph.PartiallyUpdatedOp)

// OnEdgeAdded graph edge added event. Implements the EventListener interface.
func (t *Forwarder) OnEdgeAdded(e *graph.Edge) {
if t.inhibit.Load() == true || (t.nodeFilter != nil &&
!t.nodeFilter.Eval(t.graph.GetEdge(e.Parent)) &&
!t.nodeFilter.Eval(t.graph.GetEdge(e.Child))) {
return
}

t.masterElection.SendMessageToMaster(
messages.NewStructMessage(messages.EdgeAddedMsgType, e),
)
}

// OnEdgeDeleted graph edge deleted event. Implements the EventListener interface.
func (t *Forwarder) OnEdgeDeleted(e *graph.Edge) {
if t.inhibit.Load() == true || (t.nodeFilter != nil &&
!t.nodeFilter.Eval(t.graph.GetEdge(e.Parent)) &&
!t.nodeFilter.Eval(t.graph.GetEdge(e.Child))) {
return
}

t.masterElection.SendMessageToMaster(
messages.NewStructMessage(messages.EdgeDeletedMsgType, e),
)
}

// Inhib node and edge forwarding
func (t *Forwarder) Inhib(c ws.Speaker) {
t.inhibit.Store(c != nil)
}

// GetMaster returns the current analyzer the agent is sending its events to
func (t *Forwarder) GetMaster() ws.Speaker {
return t.masterElection.GetMaster()
}

// NewForwarder returns a new Graph forwarder which forwards event of the given graph
// to the given WebSocket JSON speakers.
func NewForwarder(g *graph.Graph, pool ws.StructSpeakerPool, logger logging.Logger) *Forwarder {
func NewForwarder(g *graph.Graph, pool ws.SpeakerPool, nodeFilter *filters.Filter, logger logging.Logger) *Forwarder {
if logger == nil {
logger = logging.GetLogger()
}
Expand All @@ -142,6 +182,7 @@ func NewForwarder(g *graph.Graph, pool ws.StructSpeakerPool, logger logging.Logg
masterElection: masterElection,
graph: g,
logger: logger,
nodeFilter: nodeFilter,
}

masterElection.AddEventHandler(t)
Expand Down
Loading

0 comments on commit 46a5fcb

Please sign in to comment.